import configparser import csv import logging import os import shutil from functools import total_ordering from pathlib import Path from typing import MutableMapping, Optional, List, Tuple, Union, Iterator, Iterable logger = logging.getLogger(__name__) @total_ordering class ObjectType(object): def __init__(self, name: str): self._name = name self._fields = [] # type: List[str] def __eq__(self, o) -> bool: other = o # type: ObjectType return isinstance(o, ObjectType) and self._name == other._name def __lt__(self, o: object) -> bool: if not isinstance(o, ObjectType): return True other = o # type: ObjectType return self._name < other._name def __hash__(self) -> int: return self._name.__hash__() @property def name(self): return self._name @property def fields(self): return self._fields def index_of(self, field: str, create: bool = False) -> Optional[int]: try: return self._fields.index(field) except ValueError: if not create: return None self._fields.append(field) return len(self._fields) - 1 def has(self, *keys: str): return all([key in self._fields for key in keys]) class Object(object): class ValueList(list): """An auto-expanding version of list.""" def __setitem__(self, index, value): if index >= len(self): self.extend([None] * (index + 1 - len(self))) list.__setitem__(self, index, value) def __getitem__(self, index): if index >= len(self): self.extend([None] * (index + 1 - len(self))) return list.__getitem__(self, index) def __init__(self, ds: "DataSet", ot: ObjectType, key: str): self._ds = ds self._ot = ot self._key = key self._data = Object.ValueList() @property def object_type(self): return self._ot @property def key(self): return self._key def set(self, key: str, value: str) -> "Object": if self._ds._frozen: raise Exception("This data set is frozen") idx = self._ot.index_of(key, create=True) self._data[idx] = value return self def _set_from_object(self, other: "Object"): for k in other._ot.fields: self.set(k, other.get(k)) def has_values(self, *keys: str) -> bool: return all([len(value) > 0 for value in [self.get(key) for key in keys] if value is not None]) def values(self, *keys: str, strip: bool = False, required: bool = True) -> List[Optional[str]]: """Looks up all values for all keys. If required=True, strip is also set to True If strip is True, all values are stripped with str.strip(). None values are preserved. If required=True, all values has to have a len() > 0. If any fails the requirement, a list with only None values is returned. """ values = [] strip = True if required else strip for key in keys: v = self.get(key) if strip: v = v.strip() if v else v if required: if v is None or len(v) == 0: return [None] * len(keys) values.append(v) return values def get(self, key: str) -> Optional[str]: idx = self._ot.index_of(key) return self._data[idx] if idx is not None else None def get_req(self, key: str) -> str: idx = self._ot.index_of(key) if idx is not None and idx < len(self._data): return self._data[idx] else: raise Exception("No such field: {}".format(key)) def get_all(self, *keys: str) -> Optional[List[str]]: values = [] for key in keys: idx = self._ot.index_of(key) if not idx or idx >= len(self._data): return None values.append(self._data[idx]) return values class DataSet(object): def __init__(self): self._object_types = {} # type: MutableMapping[str, ObjectType] self._objects_by_type = {} # type: MutableMapping[ObjectType, MutableMapping[str, Object]] self._frozen = False def __len__(self): return sum((len(objects) for objects in self._objects_by_type.values())) def freeze(self): self._frozen = True def _assert_not_frozen(self): if self._frozen: raise Exception("This data set is frozen") def _check_object_type(self, object_type: str, create: bool) -> \ Optional[Tuple[ObjectType, MutableMapping[str, Object]]]: try: ot = self._object_types[object_type] objects = self._objects_by_type[ot] return ot, objects, except KeyError: if not create: return None self._assert_not_frozen() ot = ObjectType(object_type) self._object_types[object_type] = ot self._objects_by_type[ot] = objects = {} return ot, objects, def _check_object(self, object_type: str, key: str, create: bool) -> Optional[Object]: t = self._check_object_type(object_type, create) if not t: return None ot, objects = t try: return objects[key] except KeyError: self._assert_not_frozen() if not create: raise Exception("No such object: {}:{}".format(object_type, key)) o = Object(self, ot, key) objects[key] = o return o def get_object_type(self, object_type: str) -> ObjectType: t = self._check_object_type(object_type, False) if not t: raise Exception("No such object type: {}".format(object_type)) ot, objects = t return ot def get_object(self, object_type: str, key: str) -> Object: o = self._check_object(object_type, key, False) if not o: raise Exception("No such object: {}:{}".format(object_type, key)) return o def has_object(self, object_type: str, key: str) -> bool: t = self._check_object_type(object_type, False) if t: ot, objects = t return key in objects return False def get_or_create_object(self, object_type: str, key: str) -> Object: return self._check_object(object_type, key, True) def create_object(self, object_type: str, key: str, replace=False) -> Object: self._assert_not_frozen() if self.has_object(object_type, key): if not replace: raise Exception("Object already exist: {}:{}".format(object_type, key)) ot, objects = self._check_object_type(object_type, False) del self._objects_by_type[ot][key] return self._check_object(object_type, key, True) def items(self) -> Iterator[Object]: for objects in self._objects_by_type.values(): for o in objects.values(): yield o def merge(self, other: "DataSet") -> "DataSet": ds = DataSet() for objects in self._objects_by_type.values(): for o in objects.values(): ds.create_object(o.object_type.name, o.key)._set_from_object(o) for objects in other._objects_by_type.values(): for o in objects.values(): ds.get_or_create_object(o.object_type.name, o.key)._set_from_object(o) return ds def import_object(self, other: Object) -> Object: o = self._check_object(other.object_type.name, other.key, create=True) for k in other.object_type.fields: o.set(k, other.get(k)) return o class DataSetManager(object): def __init__(self, basedir: Union[Path, str]): self._basedir = Path(basedir) self._csv = {} # type: MutableMapping[str, Tuple[str, Path]] @property def all_data_sets(self): datasets = [ds.name for ds in self._basedir.iterdir() if (ds / "data-set.ini").is_file()] return list(self._csv.keys()) + datasets def cookie_for_ds(self, ds_name) -> Path: try: return self._csv[ds_name][1] except KeyError: return self._basedir / ds_name / "data-set.ini" def create_rw(self, name, clean: bool) -> "LazyRwDataSet": return LazyRwDataSet(self, name, clean) def load_data_sets(self, inputs: List[str], freeze: bool = True) -> DataSet: ds = DataSet() for name in inputs: ds = ds.merge(self.load(name, freeze=True)) if freeze: ds.freeze() return ds def register_ds(self, ds_type: str, name: str, object_type: str, path: str = None): if ds_type == "csv": if name in self._csv: raise Exception("Data source already exists: {}".format(name)) self._csv[name] = object_type, Path(path), else: raise Exception("Unknown data source type: {}".format(ds_type)) def ds_type(self, name: str): return "csv" if name in self._csv else "ini-dir" def load(self, path, freeze=False) -> DataSet: try: object_type, path = self._csv[path] if not freeze: raise Exception("CSV data sources must be frozen") return DataSetManager._load_csv(object_type, path, freeze) except KeyError: return self._load_ini_dir(path, freeze) @staticmethod def _load_csv(object_type: str, path: Path, freeze: bool) -> DataSet: # logger.debug("Loading CSV file {}".format(path)) ds = DataSet() with open(str(path), newline='') as f: r = csv.reader(f) header = next(r, None) for row in r: if len(row) == 0: continue key = row[0] o = ds.create_object(object_type, key) for idx, value in zip(range(0, min(len(row), len(header))), row): o.set(header[idx], value) if freeze: ds.freeze() # logger.debug("Loaded {} objects".format(len(ds))) return ds def _load_ini_dir(self, _path: str, freeze: bool) -> DataSet: ds_dir = Path(_path) if Path(_path).is_absolute() else self._basedir / _path ds_dir = ds_dir if ds_dir.is_dir() else ds_dir.parent # logger.debug("Loading DS from '{}'".format(ds_dir)) self._load_ini(ds_dir / "data-set.ini") ds = DataSet() count = 0 for ot_path in ds_dir.glob("*"): if not ot_path.is_dir(): continue ot = ot_path.name # logger.debug(" Loading type '{}'".format(ot)) for o_path in ot_path.glob("*.ini"): count += 1 key = o_path.name[:-4] # logger.debug(" Loading key '{}'".format(key)) ini = self._load_ini(o_path) o = ds.create_object(ot, key) for k, v in ini.items("values"): o.set(k, v) if freeze: ds.freeze() # logger.debug("Loaded {} items".format(count)) return ds def store(self, ds: DataSet, ds_name: str): ds_dir = self._basedir / ds_name items = list(ds.items()) # logger.info("Storing DS '{}' with {} objects to {}".format(ds_name, len(items), ds_dir)) os.makedirs(ds_dir, exist_ok=True) ini = self._blank_ini() ini.add_section("data-set") ini.set("data-set", "name", ds_name) self._store_ini(ini, ds_dir / "data-set.ini") for o in items: ot = o.object_type key = o.key ot_dir = ds_dir / ot.name os.makedirs(ot_dir, exist_ok=True) ini = self._blank_ini() ini.add_section("meta") ini.set("meta", "type", ot.name) ini.add_section("values") for k in ot.fields: v = o.get(k) if v: ini.set("values", k, str(v)) self._store_ini(ini, ot_dir / "{}.ini".format(key)) # noinspection PyMethodMayBeStatic def store_csv(self, path: Union[str, Path], ds: DataSet, object_type: str, order_by: Union[str, Iterable[str]] = None, fields: List[str] = None, include_extra_fields: bool = True): items = [o for o in ds.items() if o.object_type.name == object_type] if order_by: if isinstance(order_by, str): items = sorted(items, key=lambda o: o.get_req(order_by)) elif isinstance(order_by, Iterable): items = sorted(items, key=lambda o: [o.get_req(ob) for ob in order_by]) else: raise Exception("Unsupported order_by") with open(path, "w") as f: w = csv.writer(f, lineterminator=os.linesep) if len(items): if fields is not None: header = list(fields) if include_extra_fields: header.append(set(ds.get_object_type(object_type).fields) - set(header)) else: header = ds.get_object_type(object_type).fields w.writerow(header) for o in items: row = [o.get(k) for k in header] w.writerow(row) @staticmethod def _blank_ini(): parser = configparser.ConfigParser(interpolation=None) parser.optionxform = str return parser def _load_ini(self, path: Path): ini = self._blank_ini() if len(ini.read(str(path))) != 1: raise IOError("Could not load ini file: {}".format(path)) return ini @staticmethod def _store_ini(ini, path): with open(path, "w") as f: ini.write(f) def remove(self, name: str): try: object_type, path = self._csv[name] os.remove(str(path)) except KeyError: shutil.rmtree(self._basedir / name) class LazyRwDataSet(object): def __init__(self, dsm: DataSetManager, name, clean): self._dsm = dsm self._name = name self._clean = clean def __enter__(self) -> DataSet: cookie = self._dsm.cookie_for_ds(self._name) if cookie.exists(): if self._clean: self._dsm.remove(self._name) ds = DataSet() else: ds = self._dsm.load(self._name) else: ds = DataSet() self._ds = ds return ds def __exit__(self, *args): self._dsm.store(self._ds, self._name) return False def create_message(data_set: DataSet, message: str, level: str): return data_set.create_object("message", "message-{}".format(str(abs(hash(message))))). \ set("message", message). \ set("level", level)