Source code for datumaro.plugins.kitti_format.converter

# Copyright (C) 2021 Intel Corporation
#
# SPDX-License-Identifier: MIT

from collections import OrderedDict
from enum import Enum, auto
import logging as log
import os
import os.path as osp

import numpy as np

from datumaro.components.annotation import (
    AnnotationType, CompiledMask, LabelCategories,
)
from datumaro.components.converter import Converter
from datumaro.util import cast, parse_str_enum_value, str_to_bool
from datumaro.util.annotation_util import make_label_id_mapping
from datumaro.util.image import save_image
from datumaro.util.mask_tools import paint_mask
from datumaro.util.meta_file_util import is_meta_file, parse_meta_file

from .format import (
    KittiLabelMap, KittiPath, KittiTask, make_kitti_categories, parse_label_map,
    write_label_map,
)


[docs]class LabelmapType(Enum): kitti = auto() source = auto()
[docs]class KittiConverter(Converter): DEFAULT_IMAGE_EXT = KittiPath.IMAGE_EXT @staticmethod def _split_tasks_string(s): return [KittiTask[i.strip().lower()] for i in s.split(',')] @staticmethod def _get_labelmap(s): if osp.isfile(s): return s try: return LabelmapType[s.lower()].name except KeyError: import argparse raise argparse.ArgumentTypeError()
[docs] @classmethod def build_cmdline_parser(cls, **kwargs): parser = super().build_cmdline_parser(**kwargs) parser.add_argument('--apply-colormap', type=str_to_bool, default=True, help="Use colormap for class masks (default: %(default)s)") parser.add_argument('--label-map', type=cls._get_labelmap, default=None, help="Labelmap file path or one of %s" % \ ', '.join(t.name for t in LabelmapType)) parser.add_argument('--tasks', type=cls._split_tasks_string, help="KITTI task filter, comma-separated list of {%s} " "(default: all)" % ', '.join(t.name for t in KittiTask)) return parser
[docs] def __init__(self, extractor, save_dir, tasks=None, apply_colormap=True, allow_attributes=True, label_map=None, **kwargs): super().__init__(extractor, save_dir, **kwargs) assert tasks is None or isinstance(tasks, (KittiTask, list, set)) if tasks is None: tasks = set(KittiTask) elif isinstance(tasks, KittiTask): tasks = {tasks} else: tasks = set(parse_str_enum_value(t, KittiTask) for t in tasks) self._tasks = tasks self._apply_colormap = apply_colormap if label_map is None: label_map = LabelmapType.source.name if KittiTask.segmentation in self._tasks: self._load_categories(label_map) elif KittiTask.detection in self._tasks: self._categories = {AnnotationType.label: self._extractor.categories().get(AnnotationType.label, LabelCategories())}
[docs] def apply(self): os.makedirs(self._save_dir, exist_ok=True) for subset_name, subset in self._extractor.subsets().items(): if KittiTask.segmentation in self._tasks: os.makedirs(osp.join(self._save_dir, subset_name, KittiPath.INSTANCES_DIR), exist_ok=True) for item in subset: if self._save_images: self._save_image(item, subdir=osp.join(subset_name, KittiPath.IMAGES_DIR)) masks = [a for a in item.annotations if a.type == AnnotationType.mask] if masks and KittiTask.segmentation in self._tasks: compiled_class_mask = CompiledMask.from_instance_masks(masks, instance_labels=[self._label_id_mapping(m.label) for m in masks]) color_mask_path = osp.join(subset_name, KittiPath.SEMANTIC_RGB_DIR, item.id + KittiPath.MASK_EXT) self.save_mask(osp.join(self._save_dir, color_mask_path), compiled_class_mask.class_mask) labelids_mask_path = osp.join(subset_name, KittiPath.SEMANTIC_DIR, item.id + KittiPath.MASK_EXT) self.save_mask(osp.join(self._save_dir, labelids_mask_path), compiled_class_mask.class_mask, apply_colormap=False, dtype=np.int32) # TODO: optimize second merging compiled_instance_mask = CompiledMask.from_instance_masks(masks, instance_labels=[(self._label_id_mapping(m.label) << 8) \ + m.id for m in masks]) inst_path = osp.join(subset_name, KittiPath.INSTANCES_DIR, item.id + KittiPath.MASK_EXT) self.save_mask(osp.join(self._save_dir, inst_path), compiled_instance_mask.class_mask, apply_colormap=False, dtype=np.int32) bboxes = [a for a in item.annotations if a.type == AnnotationType.bbox] if bboxes and KittiTask.detection in self._tasks: labels_file = osp.join(self._save_dir, subset_name, KittiPath.LABELS_DIR, '%s.txt' % item.id) os.makedirs(osp.dirname(labels_file), exist_ok=True) with open(labels_file, 'w', encoding='utf-8') as f: for bbox in bboxes: label_line = [-1] * 16 label_line[0] = self.get_label(bbox.label) label_line[1] = cast(bbox.attributes.get('truncated'), float, KittiPath.DEFAULT_TRUNCATED) label_line[2] = cast(bbox.attributes.get('occluded'), int, KittiPath.DEFAULT_OCCLUDED) x, y, h, w = bbox.get_bbox() label_line[4:8] = x, y, x + h, y + w label_line[15] = cast(bbox.attributes.get('score'), float, KittiPath.DEFAULT_SCORE) label_line = ' '.join(str(v) for v in label_line) f.write('%s\n' % label_line) if KittiTask.segmentation in self._tasks: self.save_label_map()
[docs] def get_label(self, label_id): return self._extractor. \ categories()[AnnotationType.label].items[label_id].name
[docs] def save_label_map(self): if self._save_dataset_meta: self._save_meta_file(self._save_dir) else: path = osp.join(self._save_dir, KittiPath.LABELMAP_FILE) write_label_map(path, self._label_map)
def _load_categories(self, label_map_source): if label_map_source == LabelmapType.kitti.name: # use the default KITTI colormap label_map = KittiLabelMap elif label_map_source == LabelmapType.source.name and \ AnnotationType.mask not in self._extractor.categories(): # generate colormap for input labels labels = self._extractor.categories() \ .get(AnnotationType.label, LabelCategories()) label_map = OrderedDict((item.name, None) for item in labels.items) elif label_map_source == LabelmapType.source.name and \ AnnotationType.mask in self._extractor.categories(): # use source colormap labels = self._extractor.categories()[AnnotationType.label] colors = self._extractor.categories()[AnnotationType.mask] label_map = OrderedDict() for idx, item in enumerate(labels.items): color = colors.colormap.get(idx) if color is not None: label_map[item.name] = color elif isinstance(label_map_source, dict): label_map = OrderedDict( sorted(label_map_source.items(), key=lambda e: e[0])) elif isinstance(label_map_source, str) and osp.isfile(label_map_source): if is_meta_file(label_map_source): label_map = parse_meta_file(label_map_source) else: label_map = parse_label_map(label_map_source) else: raise Exception("Wrong labelmap specified, " "expected one of %s or a file path" % \ ', '.join(t.name for t in LabelmapType)) self._categories = make_kitti_categories(label_map) self._label_map = label_map self._label_id_mapping = self._make_label_id_map() def _make_label_id_map(self): map_id, id_mapping, src_labels, dst_labels = make_label_id_mapping( self._extractor.categories().get(AnnotationType.label), self._categories[AnnotationType.label]) void_labels = [src_label for src_id, src_label in src_labels.items() if src_label not in dst_labels] if void_labels: log.warning("The following labels are remapped to background: %s" % ', '.join(void_labels)) log.debug("Saving segmentations with the following label mapping: \n%s" % '\n'.join(["#%s '%s' -> #%s '%s'" % ( src_id, src_label, id_mapping[src_id], self._categories[AnnotationType.label] \ .items[id_mapping[src_id]].name ) for src_id, src_label in src_labels.items() ]) ) return map_id
[docs] def save_mask(self, path, mask, colormap=None, apply_colormap=True, dtype=np.uint8): if self._apply_colormap and apply_colormap: if colormap is None: colormap = self._categories[AnnotationType.mask].colormap mask = paint_mask(mask, colormap) save_image(path, mask, create_dir=True, dtype=dtype)
[docs]class KittiSegmentationConverter(KittiConverter):
[docs] def __init__(self, *args, **kwargs): kwargs['tasks'] = KittiTask.segmentation super().__init__(*args, **kwargs)
[docs]class KittiDetectionConverter(KittiConverter):
[docs] def __init__(self, *args, **kwargs): kwargs['tasks'] = KittiTask.detection super().__init__(*args, **kwargs)