Diffstat (limited to 'src/ee/ds/__init__.py')
-rw-r--r--  src/ee/ds/__init__.py  491
1 file changed, 0 insertions, 491 deletions
diff --git a/src/ee/ds/__init__.py b/src/ee/ds/__init__.py
deleted file mode 100644
index 915dd6f..0000000
--- a/src/ee/ds/__init__.py
+++ /dev/null
@@ -1,491 +0,0 @@
-import configparser
-import csv
-import logging
-import os
-import shutil
-from functools import total_ordering
-from pathlib import Path
-from typing import MutableMapping, Optional, List, Tuple, Union, Iterator, Iterable
-
-logger = logging.getLogger(__name__)
-
-
-@total_ordering
-class ObjectType(object):
- def __init__(self, name: str):
- self._name = name
- self._fields = [] # type: List[str]
-
-    def __eq__(self, o) -> bool:
-        return isinstance(o, ObjectType) and self._name == o._name
-
-    def __lt__(self, o: object) -> bool:
-        if not isinstance(o, ObjectType):
-            return NotImplemented
-
-        return self._name < o._name
-
- def __hash__(self) -> int:
- return self._name.__hash__()
-
- @property
- def name(self):
- return self._name
-
- @property
- def fields(self):
- return self._fields
-
- def index_of(self, field: str, create: bool = False) -> Optional[int]:
- try:
- return self._fields.index(field)
- except ValueError:
- if not create:
- return None
-
- self._fields.append(field)
- return len(self._fields) - 1
-
- def has(self, *keys: str):
- return all([key in self._fields for key in keys])
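-
-    # Sketch of field registration (hypothetical field names):
-    #
-    #     ot = ObjectType("artifact")
-    #     ot.index_of("ref", create=True)   # -> 0; "ref" is appended to fields
-    #     ot.index_of("ref")                # -> 0; the existing index is reused
-    #     ot.index_of("missing")            # -> None when create is False
-    #     ot.has("ref", "missing")          # -> False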
-
-
-class Object(object):
- class ValueList(list):
- """An auto-expanding version of list."""
-
- def __setitem__(self, index, value):
- if index >= len(self):
- self.extend([None] * (index + 1 - len(self)))
- list.__setitem__(self, index, value)
-
- def __getitem__(self, index):
- if index >= len(self):
- self.extend([None] * (index + 1 - len(self)))
- return list.__getitem__(self, index)
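-
-    # Sketch: ValueList auto-expands on both reads and writes, padding with
-    # None:
-    #
-    #     vl = Object.ValueList()
-    #     vl[3] = "x"   # vl is now [None, None, None, "x"]
-    #     vl[5]         # -> None; the list grows to length 6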
-
- def __init__(self, ds: "DataSet", ot: ObjectType, key: str):
- self._ds = ds
- self._ot = ot
- self._key = key
- self._data = Object.ValueList()
-
- @property
- def object_type(self):
- return self._ot
-
- @property
- def key(self):
- return self._key
-
- def set(self, key: str, value: str) -> "Object":
- if self._ds._frozen:
- raise Exception("This data set is frozen")
- idx = self._ot.index_of(key, create=True)
- self._data[idx] = value
-
- return self
-
- def _set_from_object(self, other: "Object"):
- for k in other._ot.fields:
- self.set(k, other.get(k))
-
-    def has_values(self, *keys: str) -> bool:
-        """True when every key has a non-empty value; missing (None) values count as empty."""
-        values = [self.get(key) for key in keys]
-        return all(value is not None and len(value) > 0 for value in values)
-
-    def values(self, *keys: str, strip: bool = False, required: bool = True) -> List[Optional[str]]:
-        """Looks up the values for all keys, in order.
-
-        If strip is True, all values are stripped with str.strip(); None values are preserved.
-
-        If required is True (the default), strip is forced to True and every value must have
-        len() > 0. If any value fails that requirement, a list containing only None values is
-        returned.
-        """
-
-        values = []
-
-        strip = strip or required
-
- for key in keys:
- v = self.get(key)
-
- if strip:
- v = v.strip() if v else v
-
- if required:
- if v is None or len(v) == 0:
- return [None] * len(keys)
-
- values.append(v)
-
- return values
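-
-    # Sketch of values() semantics (hypothetical object `o`):
-    #
-    #     o.set("a", " x ").set("b", "")
-    #     o.values("a")                  # -> ["x"]; required implies strip
-    #     o.values("a", "b")             # -> [None, None]; "b" is empty
-    #     o.values("a", required=False)  # -> [" x "]; no strip by default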
-
- def get(self, key: str) -> Optional[str]:
- idx = self._ot.index_of(key)
- return self._data[idx] if idx is not None else None
-
- def get_req(self, key: str) -> str:
- idx = self._ot.index_of(key)
- if idx is not None and idx < len(self._data):
- return self._data[idx]
- else:
- raise Exception("No such field: {}".format(key))
-
- def get_all(self, *keys: str) -> Optional[List[str]]:
- values = []
- for key in keys:
- idx = self._ot.index_of(key)
-            if idx is None or idx >= len(self._data):
- return None
- values.append(self._data[idx])
- return values
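-
-    # Sketch of field access (hypothetical dataset `ds`):
-    #
-    #     o = ds.create_object("artifact", "a-1").set("ref", "r1")
-    #     o.get("ref")          # -> "r1"
-    #     o.get("missing")      # -> None
-    #     o.get_req("missing")  # raises Exception("No such field: missing")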
-
-
-class DataSet(object):
- def __init__(self):
- self._object_types = {} # type: MutableMapping[str, ObjectType]
- self._objects_by_type = {} # type: MutableMapping[ObjectType, MutableMapping[str, Object]]
- self._frozen = False
-
- def __len__(self):
- return sum((len(objects) for objects in self._objects_by_type.values()))
-
- def freeze(self):
- self._frozen = True
-
- def _assert_not_frozen(self):
- if self._frozen:
- raise Exception("This data set is frozen")
-
- def _check_object_type(self, object_type: str, create: bool) -> \
- Optional[Tuple[ObjectType, MutableMapping[str, Object]]]:
- try:
- ot = self._object_types[object_type]
- objects = self._objects_by_type[ot]
- return ot, objects,
- except KeyError:
- if not create:
- return None
-
- self._assert_not_frozen()
-
- ot = ObjectType(object_type)
- self._object_types[object_type] = ot
- self._objects_by_type[ot] = objects = {}
- return ot, objects,
-
- def _check_object(self, object_type: str, key: str, create: bool) -> Optional[Object]:
- t = self._check_object_type(object_type, create)
-
- if not t:
- return None
-
- ot, objects = t
- try:
- return objects[key]
-        except KeyError:
-            if not create:
-                return None
-
-            self._assert_not_frozen()
-
- o = Object(self, ot, key)
- objects[key] = o
- return o
-
- def get_object_type(self, object_type: str) -> ObjectType:
- t = self._check_object_type(object_type, False)
-
- if not t:
- raise Exception("No such object type: {}".format(object_type))
-
- ot, objects = t
- return ot
-
- def get_object(self, object_type: str, key: str) -> Object:
- o = self._check_object(object_type, key, False)
-
- if not o:
- raise Exception("No such object: {}:{}".format(object_type, key))
-
- return o
-
- def has_object(self, object_type: str, key: str) -> bool:
- t = self._check_object_type(object_type, False)
-
- if t:
- ot, objects = t
- return key in objects
-
- return False
-
- def get_or_create_object(self, object_type: str, key: str) -> Object:
- return self._check_object(object_type, key, True)
-
- def create_object(self, object_type: str, key: str, replace=False) -> Object:
- self._assert_not_frozen()
-
- if self.has_object(object_type, key):
- if not replace:
- raise Exception("Object already exist: {}:{}".format(object_type, key))
-
- ot, objects = self._check_object_type(object_type, False)
- del self._objects_by_type[ot][key]
-
- return self._check_object(object_type, key, True)
-
- def items(self) -> Iterator[Object]:
- for objects in self._objects_by_type.values():
- for o in objects.values():
- yield o
-
- def merge(self, other: "DataSet") -> "DataSet":
- ds = DataSet()
- for objects in self._objects_by_type.values():
- for o in objects.values():
- ds.create_object(o.object_type.name, o.key)._set_from_object(o)
-
- for objects in other._objects_by_type.values():
- for o in objects.values():
- ds.get_or_create_object(o.object_type.name, o.key)._set_from_object(o)
-
- return ds
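-
-    # Merge sketch: both inputs are copied into a fresh DataSet, with values
-    # from `other` applied last, so they win on key collisions:
-    #
-    #     a = DataSet(); a.create_object("t", "k").set("f", "1")
-    #     b = DataSet(); b.create_object("t", "k").set("f", "2")
-    #     a.merge(b).get_object("t", "k").get("f")   # -> "2"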
-
- def import_object(self, other: Object) -> Object:
- o = self._check_object(other.object_type.name, other.key, create=True)
-
- for k in other.object_type.fields:
- o.set(k, other.get(k))
-
- return o
-
-
-class DataSetManager(object):
- def __init__(self, basedir: Union[Path, str]):
- self._basedir = Path(basedir)
- self._csv = {} # type: MutableMapping[str, Tuple[str, Path]]
-
- @property
- def all_data_sets(self):
- datasets = [ds.name for ds in self._basedir.iterdir() if (ds / "data-set.ini").is_file()]
- return list(self._csv.keys()) + datasets
-
- def cookie_for_ds(self, ds_name) -> Path:
- try:
- return self._csv[ds_name][1]
- except KeyError:
- return self._basedir / ds_name / "data-set.ini"
-
- def create_rw(self, name, clean: bool) -> "LazyRwDataSet":
- return LazyRwDataSet(self, name, clean)
-
- def load_data_sets(self, inputs: List[str], freeze: bool = True) -> DataSet:
- ds = DataSet()
- for name in inputs:
- ds = ds.merge(self.load(name, freeze=True))
-
- if freeze:
- ds.freeze()
-
- return ds
-
- def register_ds(self, ds_type: str, name: str, object_type: str, path: str = None):
- if ds_type == "csv":
- if name in self._csv:
- raise Exception("Data source already exists: {}".format(name))
-
- self._csv[name] = object_type, Path(path),
- else:
- raise Exception("Unknown data source type: {}".format(ds_type))
-
- def ds_type(self, name: str):
- return "csv" if name in self._csv else "ini-dir"
-
-    def load(self, path, freeze=False) -> DataSet:
-        try:
-            object_type, csv_path = self._csv[path]
-        except KeyError:
-            return self._load_ini_dir(path, freeze)
-
-        # Keep the try narrow so a stray KeyError from _load_csv is not
-        # silently rerouted to the ini-dir loader.
-        if not freeze:
-            raise Exception("CSV data sources must be frozen")
-
-        return DataSetManager._load_csv(object_type, csv_path, freeze)
-
- @staticmethod
- def _load_csv(object_type: str, path: Path, freeze: bool) -> DataSet:
- # logger.debug("Loading CSV file {}".format(path))
- ds = DataSet()
-
- with open(str(path), newline='') as f:
- r = csv.reader(f)
-
- header = next(r, None)
- for row in r:
- if len(row) == 0:
- continue
-
- key = row[0]
-
- o = ds.create_object(object_type, key)
-                for field, value in zip(header, row):
-                    o.set(field, value)
-
- if freeze:
- ds.freeze()
-
- # logger.debug("Loaded {} objects".format(len(ds)))
- return ds
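-
-    # Sketch of the CSV shape _load_csv expects: the header row names the
-    # fields and the first column doubles as the object key, e.g. for
-    # object_type "artifact":
-    #
-    #     ref,name,version
-    #     a-1,foo,1.0
-    #
-    # yields one object keyed "a-1" with ref="a-1", name="foo", version="1.0".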
-
- def _load_ini_dir(self, _path: str, freeze: bool) -> DataSet:
- ds_dir = Path(_path) if Path(_path).is_absolute() else self._basedir / _path
- ds_dir = ds_dir if ds_dir.is_dir() else ds_dir.parent
-
- # logger.debug("Loading DS from '{}'".format(ds_dir))
-
- self._load_ini(ds_dir / "data-set.ini")
-
- ds = DataSet()
- count = 0
- for ot_path in ds_dir.glob("*"):
- if not ot_path.is_dir():
- continue
-
- ot = ot_path.name
- # logger.debug(" Loading type '{}'".format(ot))
- for o_path in ot_path.glob("*.ini"):
- count += 1
-
- key = o_path.name[:-4]
- # logger.debug(" Loading key '{}'".format(key))
- ini = self._load_ini(o_path)
- o = ds.create_object(ot, key)
- for k, v in ini.items("values"):
- o.set(k, v)
-
- if freeze:
- ds.freeze()
-
- # logger.debug("Loaded {} items".format(count))
- return ds
-
- def store(self, ds: DataSet, ds_name: str):
- ds_dir = self._basedir / ds_name
- items = list(ds.items())
- # logger.info("Storing DS '{}' with {} objects to {}".format(ds_name, len(items), ds_dir))
-
- os.makedirs(ds_dir, exist_ok=True)
- ini = self._blank_ini()
- ini.add_section("data-set")
- ini.set("data-set", "name", ds_name)
- self._store_ini(ini, ds_dir / "data-set.ini")
-
- for o in items:
- ot = o.object_type
- key = o.key
-
- ot_dir = ds_dir / ot.name
- os.makedirs(ot_dir, exist_ok=True)
- ini = self._blank_ini()
- ini.add_section("meta")
- ini.set("meta", "type", ot.name)
-
- ini.add_section("values")
- for k in ot.fields:
- v = o.get(k)
- if v:
- ini.set("values", k, str(v))
- self._store_ini(ini, ot_dir / "{}.ini".format(key))
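-
-    # On-disk layout written by store() (and read back by _load_ini_dir):
-    #
-    #     <basedir>/<ds_name>/data-set.ini
-    #     <basedir>/<ds_name>/<object_type>/<key>.ini
-    #
-    # Each <key>.ini carries a [meta] section naming the type and a [values]
-    # section with one entry per non-empty field.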
-
- # noinspection PyMethodMayBeStatic
- def store_csv(self, path: Union[str, Path], ds: DataSet, object_type: str,
- order_by: Union[str, Iterable[str]] = None, fields: List[str] = None,
- include_extra_fields: bool = True):
- items = [o for o in ds.items() if o.object_type.name == object_type]
-
- if order_by:
- if isinstance(order_by, str):
- items = sorted(items, key=lambda o: o.get_req(order_by))
- elif isinstance(order_by, Iterable):
- items = sorted(items, key=lambda o: [o.get_req(ob) for ob in order_by])
- else:
- raise Exception("Unsupported order_by")
-
- with open(path, "w") as f:
- w = csv.writer(f, lineterminator=os.linesep)
-
-            if len(items):
-                if fields is not None:
-                    header = list(fields)
-
-                    if include_extra_fields:
-                        # Extra fields are appended in their declared order so
-                        # the header stays deterministic.
-                        header.extend(k for k in ds.get_object_type(object_type).fields
-                                      if k not in header)
-                else:
-                    header = ds.get_object_type(object_type).fields
-                w.writerow(header)
-
- for o in items:
- row = [o.get(k) for k in header]
- w.writerow(row)
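-
-    # Usage sketch (hypothetical path and fields): write all "artifact"
-    # objects ordered by "ref", listed columns first, remaining fields after:
-    #
-    #     dsm.store_csv("out.csv", ds, "artifact", order_by="ref",
-    #                   fields=["ref", "name"])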
-
- @staticmethod
- def _blank_ini():
- parser = configparser.ConfigParser(interpolation=None)
- parser.optionxform = str
- return parser
-
- def _load_ini(self, path: Path):
- ini = self._blank_ini()
- if len(ini.read(str(path))) != 1:
- raise IOError("Could not load ini file: {}".format(path))
- return ini
-
- @staticmethod
- def _store_ini(ini, path):
- with open(path, "w") as f:
- ini.write(f)
-
- def remove(self, name: str):
- try:
- object_type, path = self._csv[name]
- os.remove(str(path))
- except KeyError:
- shutil.rmtree(self._basedir / name)
-
-
-class LazyRwDataSet(object):
- def __init__(self, dsm: DataSetManager, name, clean):
- self._dsm = dsm
- self._name = name
- self._clean = clean
-
- def __enter__(self) -> DataSet:
- cookie = self._dsm.cookie_for_ds(self._name)
-
- if cookie.exists():
- if self._clean:
- self._dsm.remove(self._name)
- ds = DataSet()
- else:
- ds = self._dsm.load(self._name)
- else:
- ds = DataSet()
-
- self._ds = ds
- return ds
-
- def __exit__(self, *args):
- self._dsm.store(self._ds, self._name)
- return False
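-
-# Usage sketch: LazyRwDataSet loads (or wipes) the named data set on enter
-# and stores it back on exit:
-#
-#     dsm = DataSetManager("work")
-#     with dsm.create_rw("my-ds", clean=True) as ds:
-#         ds.create_object("artifact", "a-1").set("ref", "r1")
-#     # on exit, the data set is written to work/my-ds/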
-
-
-def create_message(data_set: DataSet, message: str, level: str):
-    return data_set.create_object("message", "message-{}".format(abs(hash(message)))). \
-        set("message", message). \
-        set("level", level)