From d72247b46519609fb0b373d34bcc5d5939d7b9c3 Mon Sep 17 00:00:00 2001 From: Trygve Laugstøl Date: Tue, 17 Jul 2018 00:42:19 +0200 Subject: wip --- src/ee/fact/__init__.py | 276 +++++++++++++++++++++++++++++++++++++----------- src/ee/kicad/doit.py | 169 +++++++++++++++-------------- 2 files changed, 297 insertions(+), 148 deletions(-) (limited to 'src') diff --git a/src/ee/fact/__init__.py b/src/ee/fact/__init__.py index 659558f..959e755 100644 --- a/src/ee/fact/__init__.py +++ b/src/ee/fact/__init__.py @@ -1,97 +1,249 @@ -from typing import Optional, Mapping -import os.path +from typing import Optional, Mapping, List import configparser +import os +from pathlib import Path +from functools import total_ordering +import logging + +logger = logging.getLogger(__name__) + +@total_ordering +class ObjectType(object): + def __init__(self, name: str): + self._name = name + self._fields = [] + self._objects = {} + + def __eq__(self, o: object) -> bool: + other = o # type ObjectType + return isinstance(o, ObjectType) and self._name == other._name + + def __lt__(self, o: object) -> bool: + if not isinstance(o, ObjectType): + return True -class ObjectDescriptor(object): - def __init__(self): - self._keys = [] + other = o # type ObjectType + return (self._name) < (self._name) - def index_of(self, key, create: bool = False) -> int: + def __hash__(self) -> int: + return self._name.__hash__() + + def by_key(self, key: str) -> "Object": try: - return self._keys.index(key) + return self._objects[key] + except ValueError: + o = Object(self, key, {}, {}) + self._objects[key] = o + return o + + @property + def name(self): + return self._name + + @property + def fields(self): + return self._fields + + def index_of(self, field: str, create: bool = False) -> int: + try: + return self._fields.index(field) except ValueError as e: if not create: raise e - self._keys.append(key) - return len(self._keys) - 1 - - @property - def keys(self): - return self._keys + self._fields.append(field) + return len(self._fields) - 1 class Object(object): - def __init__(self, key, descriptor): + def __init__(self, ds: "DataSet", ot: ObjectType, key: str): + self._ds = ds + self._ot = ot self._key = key - self._descriptor = descriptor self._data = [] + @property + def object_type(self): + return self._ot + @property def key(self): return self._key def set(self, key: str, value: str): - idx = self._descriptor.index_of(key, create=True) + if self._ds._frozen: + raise Exception("This data set is frozen") + idx = self._ot.index_of(key, create=True) self._data.insert(idx, value) - def merge(self, kv: Mapping[str, str]): - for k, v in kv.items(): - self.set(k, v) + def _set_from_object(self, other: "Object"): + for k in other._ot.fields: + self.set(k, other.get(k)) def get(self, key: str) -> Optional[str]: - return self._data[self._descriptor.index_of(key)] + idx = self._ot.index_of(key) + return self._data[idx] -class ObjectSet(object): - def __init__(self, meta = dict()): - self._objects = {} - self._meta = meta - self._descriptor = ObjectDescriptor() +class DataSet(object): + def __init__(self, name): + self._name = name + self._object_types = {} + self._objects_by_type = {} # type: Mapping[str, Mapping[str, Object]] + self._frozen = False + self._changed = False + + @property + def name(self): + return self._name + + def freeze(self): + self._frozen = True + + def get_object_type(self, object_type: str) -> ObjectType: + try: + return self._object_types[object_type] + except KeyError: + ot = ObjectType(object_type) + self._object_types[object_type] = ot + self._changed = True + return ot + + def get_object(self, object_type: str, key: str) -> Object: + try: + objects = self._objects_by_type[object_type] + except KeyError: + if self._frozen: + raise Exception("This data set is frozen") + + objects = {} + self._objects_by_type[object_type] = objects + self._changed = True + + try: + return objects[key] + except KeyError: + if self._frozen: + raise Exception("This data set is frozen") + + ot = self.get_object_type(object_type) + o = Object(self, ot, key) + objects[key] = o + self._changed = True + return o def items(self): - return self._objects.values() - - def create_object(self, key: str): - if key in self._objects: - raise ValueError("Object already exists: {}".format(key)) - o = Object(key, self._descriptor) - self._objects[key] = o - return o - - def read(self, path): - from pathlib import Path - print("Reading objects from {}".format(path)) - for p in Path(path).glob("*.ini"): - if p.name == "object-set.ini": - continue + from itertools import chain + return list(chain.from_iterable([objects.values() for objects in self._objects_by_type.values()])) - with open(p, "r") as f: - ini = configparser.ConfigParser(interpolation = None) - ini.read(p) + def merge(self, other: "DataSet") -> "DataSet": + ds = DataSet(self._name) + for objects in self._objects_by_type.values(): + for o in objects.values(): + ds.get_object(o.object_type.name, o.key)._set_from_object(o) - key = ini.get("meta", "key") + for objects in other._objects_by_type.values(): + for o in objects.values(): + ds.get_object(o.object_type.name, o.key)._set_from_object(o) - o = self.create_object(key) - o.merge({k:v for k, v in ini.items("values")}) - print("Read {} objects".format(len(self._objects))) + return ds - def write(self, path): - print("Writing {} objects".format(len(self._objects))) +class DataSetManager(object): + def __init__(self, basedir: Path): + self._basedir = Path(basedir) - ini = configparser.ConfigParser(interpolation = None) - ini.add_section("object-set") - with open(os.path.join(path, "object-set.ini"), "w") as f: - ini.write(f) + def metafile_for_ds(self, ds_name) -> Path: + return self._basedir / ds_name / "data-set.ini" + + def create_rw(self, name, inputs: List[str]) -> "LazyDataSet": + return LazyDataSet(self, name, inputs) + + def load(self, name, freeze=False) -> DataSet: + ds_dir = Path(name) if Path(name).is_absolute() else self._basedir / name + ds_dir = ds_dir if ds_dir.is_dir() else ds_dir.parent + + logger.info("Loading DS from '{}'".format(ds_dir)) - for o in self._objects.values(): - ini = configparser.ConfigParser(interpolation = None) + ini = self._load_ini(ds_dir / "data-set.ini") + name = ini.get("data-set", "name") + + ds = DataSet(name) + count = 0 + for ot_path in ds_dir.glob("*"): + if not ot_path.is_dir(): + continue + + ot = ot_path.name + logger.info(" Loading type '{}'".format(ot)) + for o_path in ot_path.glob("*.ini"): + count += 1 + + key = o_path.name[:-4] + logger.info(" Loading key '{}'".format(key)) + ini = self._load_ini(o_path) + o = ds.get_object(ot, key) + for k, v in ini.items("values"): + o.set(k, v) + + if freeze: + ds.freeze() + + logger.info("Loaded {} items".format(count)) + return ds + + def store(self, ds: DataSet): + ds_dir = self._basedir / ds.name + logger.info("Storing DS '{}' with {} objects to {}".format(ds.name, len(ds.items()), ds_dir)) + + os.makedirs(ds_dir, exist_ok=True) + ini = self._blank_ini() + ini.add_section("data-set") + ini.set("data-set", "name", ds.name) + self._store_ini(ini, ds_dir / "data-set.ini") + + for o in ds.items(): + ot = o.object_type + key = o.key + + ot_dir = ds_dir / ot.name + os.makedirs(ot_dir, exist_ok=True) + ini = self._blank_ini() ini.add_section("meta") - ini.set("meta", "key", o.key) + ini.set("meta", "type", ot.name) ini.add_section("values") - for key in sorted(self._descriptor.keys): - value = o.get(key) - if value: - ini.set("values", key, value) + for k in ot.fields: + v = o.get(k) + ini.set("values", k, v) + self._store_ini(ini, ot_dir / "{}.ini".format(key)) + + def _blank_ini(self): + return configparser.ConfigParser(interpolation = None) + + def _load_ini(self, path: Path): + ini = self._blank_ini() + if len(ini.read(str(path))) != 1: + raise IOError("Could not load ini file: {}".format(path)) + return ini + + def _store_ini(self, ini, path): + with open(path, "w") as f: + ini.write(f) - with open(os.path.join(path, "{}.ini".format(o.key)), "w") as f: - ini.write(f) +class LazyDataSet(object): + def __init__(self, dsm: DataSetManager, name, inputs): + self._dsm = dsm + self._name = name + self._inputs = inputs + + def __enter__(self): +# logger.info("enter: name={}, inputs={}".format(self._name, self._inputs)) + ds = DataSet(self._name) + for name in self._inputs: + ds = ds.merge(self._dsm.load(name, freeze=True)) + self._ds = ds + return self._ds + + def __exit__(self, *args): +# logger.info("exit: name={}, inputs={}".format(self._name, self._inputs)) +# logger.info("ds.size={}".format(len(self._ds.items()))) + self._dsm.store(self._ds) + return False diff --git a/src/ee/kicad/doit.py b/src/ee/kicad/doit.py index cc9936c..10de885 100644 --- a/src/ee/kicad/doit.py +++ b/src/ee/kicad/doit.py @@ -2,6 +2,11 @@ import os.path from doit.exceptions import TaskFailed from doit.tools import check_timestamp_unchanged from configclass import Config +from typing import List +from ee.fact import DataSetManager +import logging + +logger = logging.getLogger(__name__) class KicadDoitTasks(object): config = Config({ @@ -9,15 +14,14 @@ class KicadDoitTasks(object): "kicad_pcb": None, "gerber_dir": None, "gerber_zip": None, - "kicad_sch_object_set_dir": None, - "component_object_set_dir": None, + "data_set_dir": None, }) def __init__(self, *args, **kwargs): self.config = self.config.make(kwargs) def echo(*args, **kwargs): - print("_task: args={}, kwars={}".format(args, kwargs)) + logger.info("_task: args={}, kwars={}".format(args, kwargs)) def tasks(self, *args, **kwargs): import ee.kicad @@ -25,107 +29,100 @@ class KicadDoitTasks(object): sch = self.config["sch"] tasks = [] + dsm = DataSetManager(self.config["data_set_dir"]) + gerber_dir = self.config["gerber_dir"] if gerber_dir: - gerber_zip = self.config["gerber_zip"] or "{}.zip".format(gerber_dir) - #print("gerber_zip={}".format(gerber_zip)) - - eg = next((p for p in (os.path.join(p, "export_gerber.py") for p in ee.kicad.__path__) if os.path.isfile(p)), None) - if not eg: - raise Exception("Could not find export_gerber.py") - - # TODO: replace with python - mkdir = "mkdir -p {}".format(gerber_dir) - export_gerber = " ".join([ - eg, - "--pcb", kicad_pcb, - "--output-directory", gerber_dir, - "--protel-extensions", - ]) - def make_zip(): - import zipfile - from pathlib import Path - with zipfile.ZipFile(gerber_zip, "w") as z: - for p in Path(gerber_dir).iterdir(): - if not p.is_file(): - continue - z.write(p, arcname=p.relative_to(gerber_dir)) - - tasks.append({ - "name": "kicad_gerber", - "targets": [gerber_zip], - "actions": [mkdir, export_gerber, make_zip], - "file_dep": [kicad_pcb], - }) - - kicad_sch_object_set_dir = self.config["kicad_sch_object_set_dir"] - component_object_set_dir = self.config["component_object_set_dir"] - - if sch and kicad_sch_object_set_dir: - tasks.append(task_create_kicad_sch_objects(sch, kicad_sch_object_set_dir)) - - if kicad_sch_object_set_dir and component_object_set_dir: - tasks.append(task_kicad_sch_to_component_object(kicad_sch_object_set_dir, component_object_set_dir)) - - for t in tasks: - yield t - -def task_create_kicad_sch_objects(sch, os_dir, name="create-kicad-sch-objects"): + tasks.append(task_kicad_gerber()) + + sch_ds = task_kicad_sch_to_data_set(dsm, sch, \ + in_data_sets=[], \ + out_data_set="kicad-sch-data-set") \ + if sch else None + + component_ds = task_kicad_create_component_data_set(dsm, \ + in_data_sets=sch_ds["targets"], \ + out_data_set="raw-component") \ + if sch_ds else None + + return (t for t in [sch_ds, component_ds] if t is not None) + +def task_kicad_gerber(name="kicad-gerber"): + gerber_zip = self.config["gerber_zip"] or "{}.zip".format(gerber_dir) + #logger.info("gerber_zip={}".format(gerber_zip)) + + eg = next((p for p in (os.path.join(p, "export_gerber.py") for p in ee.kicad.__path__) if os.path.isfile(p)), None) + if not eg: + raise Exception("Could not find export_gerber.py") + + # TODO: replace with python + mkdir = "mkdir -p {}".format(gerber_dir) + export_gerber = " ".join([ + eg, + "--pcb", kicad_pcb, + "--output-directory", gerber_dir, + "--protel-extensions", + ]) + def make_zip(): + import zipfile + from pathlib import Path + with zipfile.ZipFile(gerber_zip, "w") as z: + for p in Path(gerber_dir).iterdir(): + if not p.is_file(): + continue + z.write(p, arcname=p.relative_to(gerber_dir)) + + return { + "name": name, + "targets": [gerber_zip], + "actions": [mkdir, export_gerber, make_zip], + "file_dep": [kicad_pcb], + } + +def task_kicad_sch_to_data_set(dsm: DataSetManager, sch, in_data_sets: List[str], out_data_set, name="kicad-sch-to-data-set"): def action(): import ee.kicad from ee.kicad.model import Component, ComponentField import csv import configparser - import ee.fact as fact - schematics = ee.kicad.read_schematics(sch) - cs = [c for c in schematics.components] - os.makedirs(os_dir, exist_ok=True) - # TODO: remove directory? - oset = fact.ObjectSet() - oset.read(os_dir) - for c in cs: - o = oset.create_object(c.timestamp) - o.set("ref", c.ref) - o.set("ref-type", c.ref_type) - if c.has_ref_num: - o.set("ref-num", str(c.ref_num)) - o.set("value", c.value) - if c.footprint: - o.set("footprint", c.footprint) - - for f in c.fields: - if f.value and f.name not in ComponentField.names: - o.set(f.name, str(f.value)) - - oset.write(os_dir) + with dsm.create_rw(out_data_set, inputs=in_data_sets) as ds: + schematics = ee.kicad.read_schematics(sch) + for c in [c for c in schematics.components]: + o = ds.get_object("kicad-schematic-component", c.timestamp) + o.set("ref", c.ref) + o.set("ref-type", c.ref_type) + if c.has_ref_num: + o.set("ref-num", str(c.ref_num)) + o.set("value", c.value) + if c.footprint: + o.set("footprint", c.footprint) + + for f in c.fields: + if f.value and f.name not in ComponentField.names: + o.set(f.name, str(f.value)) return { "name": name, - "file_dep": [sch], + "file_dep": [sch] + [dsm.metafile_for_ds(ds) for ds in in_data_sets], "actions": [action], - "targets": [os.path.join(os_dir, "object-set.ini")], + "targets": [dsm.metafile_for_ds(out_data_set)], } -def task_kicad_sch_to_component_object(kicad_sch_os_dir, component_os_dir, name="kicad-sch-to-component-object"): - def action(): +def task_kicad_create_component_data_set(dsm: DataSetManager, in_data_sets: List[str], out_data_set, name="kicad-create-component-data-set"): + def action(*args, **kwargs): + logger.info("args={}, kwargs={}".format(args, kwargs)) import ee.fact as fact - kicad_sch_os = fact.ObjectSet() - kicad_sch_os.read(kicad_sch_os_dir) - - os.makedirs(component_os_dir, exist_ok=True) - component_os = fact.ObjectSet() - component_os.read(component_os_dir) - - for o in kicad_sch_os.items(): - print("processing {}".format(o.key)) - - component_os.write(component_os_dir) + with dsm.create_rw(out_data_set, inputs=in_data_sets) as ds: + items = ds.items() + logger.info("Got {} objects".format(len(items))) + for o in items: + logger.info("processing {}".format(o.key)) return { "name": name, - "file_dep": [os.path.join(kicad_sch_os_dir, "object-set.ini")], + "file_dep": [dsm.metafile_for_ds(ds) for ds in in_data_sets], "actions": [action], - "targets": [os.path.join(component_os_dir, "object-set.ini")], + "targets": [dsm.metafile_for_ds(out_data_set)], } -- cgit v1.2.3