Source code for datumaro.plugins.yolo_format.extractor
# Copyright (C) 2019-2022 Intel Corporation
#
# SPDX-License-Identifier: MIT
from collections import OrderedDict
from typing import Dict
import os.path as osp
import re
from datumaro.components.annotation import AnnotationType, Bbox, LabelCategories
from datumaro.components.extractor import (
DatasetItem, Extractor, Importer, SourceExtractor,
)
from datumaro.components.format_detection import FormatDetectionContext
from datumaro.components.media import Image
from datumaro.util.image import (
DEFAULT_IMAGE_META_FILE_NAME, load_image_meta_file,
)
from datumaro.util.meta_file_util import has_meta_file, parse_meta_file
from datumaro.util.os_util import split_path
from .format import YoloPath
[docs]class YoloExtractor(SourceExtractor):
[docs] class Subset(Extractor):
[docs] def __init__(self, name, parent):
super().__init__()
self._name = name
self._parent = parent
self.items = OrderedDict()
[docs] def __init__(self, config_path, image_info=None, **kwargs):
super().__init__(**kwargs)
if not osp.isfile(config_path):
raise Exception("Can't read dataset descriptor file '%s'" %
config_path)
rootpath = osp.dirname(config_path)
self._path = rootpath
assert image_info is None or isinstance(image_info, (str, dict))
if image_info is None:
image_info = osp.join(rootpath, DEFAULT_IMAGE_META_FILE_NAME)
if not osp.isfile(image_info):
image_info = {}
if isinstance(image_info, str):
image_info = load_image_meta_file(image_info)
self._image_info = image_info
with open(config_path, 'r', encoding='utf-8') as f:
config_lines = f.readlines()
subsets = OrderedDict()
names_path = None
for line in config_lines:
match = re.match(r'(\w+)\s*=\s*(.+)$', line)
if not match:
continue
key = match.group(1)
value = match.group(2)
if key == 'names':
names_path = value
elif key in YoloPath.SUBSET_NAMES:
subsets[key] = value
else:
continue
if not names_path:
raise Exception("Failed to parse labels path from '%s'" % \
config_path)
for subset_name, list_path in subsets.items():
list_path = osp.join(self._path, self.localize_path(list_path))
if not osp.isfile(list_path):
raise Exception("Not found '%s' subset list file" % subset_name)
subset = YoloExtractor.Subset(subset_name, self)
with open(list_path, 'r', encoding='utf-8') as f:
subset.items = OrderedDict(
(self.name_from_path(p), self.localize_path(p))
for p in f if p.strip()
)
subsets[subset_name] = subset
self._subsets: Dict[str, YoloExtractor.Subset] = subsets
self._categories = {
AnnotationType.label:
self._load_categories(
osp.join(self._path, self.localize_path(names_path)))
}
[docs] @staticmethod
def localize_path(path):
path = osp.normpath(path).strip()
default_base = 'data' + osp.sep
if path.startswith(default_base): # default path
path = path[len(default_base) : ]
return path
[docs] @classmethod
def name_from_path(cls, path):
path = cls.localize_path(path)
parts = split_path(path)
if 1 < len(parts) and not osp.isabs(path):
# NOTE: when path is like [data/]<subset>_obj/<image_name>
# drop everything but <image name>
# <image name> can be <a/b/c/filename.ext>, so not just basename()
path = osp.join(*parts[1:]) # pylint: disable=no-value-for-parameter
return osp.splitext(path)[0]
def _get(self, item_id, subset_name):
subset = self._subsets[subset_name]
item = subset.items[item_id]
if isinstance(item, str):
image_size = self._image_info.get(item_id)
image = Image(path=osp.join(self._path, item), size=image_size)
anno_path = osp.splitext(image.path)[0] + '.txt'
annotations = self._parse_annotations(anno_path, image)
item = DatasetItem(id=item_id, subset=subset_name,
image=image, annotations=annotations)
subset.items[item_id] = item
return item
@staticmethod
def _parse_annotations(anno_path, image):
lines = []
with open(anno_path, 'r', encoding='utf-8') as f:
for line in f:
line = line.strip()
if line:
lines.append(line)
annotations = []
if lines:
# use image info as late as possible
if image.size is None:
raise Exception("Can't find image info for '%s'" % image.path)
image_height, image_width = image.size
for line in lines:
label_id, xc, yc, w, h = line.split()
label_id = int(label_id)
w = float(w)
h = float(h)
x = float(xc) - w * 0.5
y = float(yc) - h * 0.5
annotations.append(Bbox(
round(x * image_width, 1), round(y * image_height, 1),
round(w * image_width, 1), round(h * image_height, 1),
label=label_id
))
return annotations
@staticmethod
def _load_categories(names_path):
if has_meta_file(osp.dirname(names_path)):
return LabelCategories.from_iterable(
parse_meta_file(osp.dirname(names_path)).keys())
label_categories = LabelCategories()
with open(names_path, 'r', encoding='utf-8') as f:
for label in f:
label = label.strip()
if label:
label_categories.add(label)
return label_categories
[docs] def __iter__(self):
subsets = self._subsets
pbars = self._ctx.progress_reporter.split(len(subsets))
for pbar, (subset_name, subset) in zip(pbars, subsets.items()):
for item in pbar.iter(subset, desc=f"Parsing '{subset_name}'"):
yield item