from pathlib import Path from typing import List, MutableMapping, Optional, Iterator, Union from ee import EeException from ee.money import Money from ee.xml import types __all__ = [ "Part", "PartDb", "load_db", "save_db", ] class Reference(object): pass def to_xml(self): return None class PartReference(Reference): def __init__(self, uri: str): self.uri = uri def to_xml(self): return types.PartReference(part_uri=self.uri) class SchematicReference(Reference): def __init__(self, reference: str): self.reference = reference def to_xml(self): return types.SchematicReference(reference=self.reference) class PartNumber(Reference): def __init__(self, value: str): self.value = value def to_xml(self): return types.PartNumber(value=self.value) class SupplierPartNumber(Reference): def __init__(self, value: str): self.value = value def to_xml(self): return types.SupplierPartNumber(value=self.value) class ReferenceList(object): def __init__(self, part_uri): self.part_uri_ = part_uri self.part_references: List[PartReference] = [] self.schematic_references: List[SchematicReference] = [] self.mpns: List[PartNumber] = [] self.spns: List[SupplierPartNumber] = [] self.description_references: List[str] = [] def to_xml(self): part_references = [r.to_xml() for r in self.part_references if isinstance(r, PartReference)] schematic_references = [r.to_xml() for r in self.schematic_references if isinstance(r, SchematicReference)] mpns = [r.to_xml() for r in self.mpns if isinstance(r, PartNumber)] spns = [r.to_xml() for r in self.spns if isinstance(r, SupplierPartNumber)] description_references = self.description_references if len(part_references) or len(schematic_references) or len(mpns) or len(spns) or \ len(description_references): return types.ReferenceList(part_reference=part_references, schematic_reference=schematic_references, part_number=mpns, supplier_part_number=spns, description=description_references) # Part Reference def add_part_reference(self, uri): self.part_references.append(PartReference(uri)) def get_exactly_one_part_reference(self) -> PartReference: refs = self.part_references if len(refs) == 0: raise EeException("This part does not contain any part references{}". format(", uri=" + self.part_uri_ if self.part_uri_ else "")) if len(refs) != 1: raise EeException("This part does not contain exactly one part reference: {}". format(", ".join([ref.uri for ref in refs]))) return refs[0] # Schematic references def add_schematic_reference(self, ref): self.schematic_references.append(SchematicReference(reference=ref)) def get_only_schematic_reference(self) -> Optional[SchematicReference]: return next(iter(self.schematic_references), None) def get_exactly_one_schematic_reference(self) -> SchematicReference: refs = self.schematic_references if len(refs) == 0: raise EeException("This part does not contain any schematic references{}". format(", uri=" + self.part_uri_ if self.part_uri_ else "")) if len(refs) != 1: raise EeException("This part does not contain exactly one schematic reference: {}". format(", ".join([ref.reference for ref in refs]))) return refs[0] # MPNs def add_mpn(self, mpn: str): self.mpns.append(PartNumber(value=mpn)) def get_only_mpn(self) -> Optional[PartNumber]: return next(iter(self.mpns), None) def get_exactly_one_mpn(self) -> PartNumber: mpns = self.mpns if len(mpns) == 0: raise EeException("This part does not contain any manufacturer part numbers{}". format(", uri=" + self.part_uri_ if self.part_uri_ else "")) if len(mpns) != 1: raise EeException("This part does not contain exactly one mpn: {}". format(", ".join([mpn.value for mpn in mpns]))) return mpns[0] # SPNs def add_spn(self, mpn: str): self.spns.append(SupplierPartNumber(value=mpn)) def get_only_spn(self) -> Optional[SupplierPartNumber]: return next(iter(self.spns), None) def get_exactly_one_spn(self) -> SupplierPartNumber: spns = self.spns if len(spns) == 0: raise EeException("This part does not contain any supplier part numbers{}". format(", uri=" + self.part_uri_ if self.part_uri_ else "")) if len(spns) != 1: raise EeException("This part does not contain exactly one spn: {}". format(", ".join([spn.value for spn in spns]))) return spns[0] def add_description_reference(self, description: str): self.description_references.append(description) # TODO: Replace self.xml.referencesProp with ReferenceList class Part(object): def __init__(self, xml: types.Part): assert type(xml) == types.Part self.xml = xml xml.referencesProp = xml.referencesProp if xml.referencesProp is not None else types.ReferenceList() xml.price_breaksProp = xml.price_breaksProp if xml.price_breaksProp is not None else types.PriceBreakList() xml.linksProp = xml.linksProp if xml.linksProp is not None else types.LinkList() xml.factsProp = xml.factsProp if xml.factsProp is not None else types.FactList() self.facts = Facts(self) def clean_xml(self): x = self.xml if len(x.referencesProp.part_referenceProp) == 0 and \ len(x.referencesProp.schematic_referenceProp) == 0 and \ len(x.referencesProp.part_numberProp) == 0 and \ len(x.referencesProp.supplier_part_numberProp) == 0: x.referencesProp = None if len(x.price_breaksProp.price_break) == 0: x.price_breaksProp = None if len(x.linksProp.link) == 0: x.linksProp = None if len(x.factsProp.fact) == 0: x.factsProp = None @property def underlying(self) -> types.Part: return self.xml @property def uri(self) -> str: return self.xml.uriProp @property def supplier(self) -> str: return self.xml.supplierProp # Part number ref def add_part_reference(self, uri): self.get_part_references().append(types.PartReference(part_uri=uri)) def get_part_references(self) -> List[types.PartReference]: return self.xml.referencesProp.part_referenceProp def get_exactly_one_part_reference(self) -> types.PartReference: refs = self.get_part_references() if len(refs) == 0: raise EeException("This part does not contain any part references{}". format(", uri=" + self.uri if self.uri else "")) if len(refs) != 1: raise EeException("This part does not contain exactly one part reference: {}". format(", ".join([ref.part_uriProp for ref in refs]))) return refs[0] # Schematic references def add_schematic_reference(self, ref): self.get_schematic_references().append(types.SchematicReference(reference=ref)) def get_schematic_references(self) -> List[types.SchematicReference]: return self.xml.referencesProp.schematic_referenceProp def get_only_schematic_reference(self) -> Optional[types.SchematicReference]: return next(iter(self.get_schematic_references()), None) def get_exactly_one_schematic_reference(self) -> types.SchematicReference: refs = self.get_schematic_references() if len(refs) == 0: raise EeException("This part does not contain any schematic references{}". format(", uri=" + self.uri if self.uri else "")) if len(refs) != 1: raise EeException("This part does not contain exactly one schematic reference: {}". format(", ".join([ref.referenceProp for ref in refs]))) return refs[0] # MPNs def add_mpn(self, mpn: str): self.get_mpns().append(types.PartNumber(value=mpn)) def get_mpns(self) -> List[types.PartNumber]: return self.xml.referencesProp.part_numberProp def get_only_mpn(self) -> Optional[types.PartNumber]: return next(iter(self.get_mpns()), None) def get_exactly_one_mpn(self) -> types.PartNumber: mpns = self.get_mpns() if len(mpns) == 0: raise EeException("This part does not contain any manufacturer part numbers{}". format(", uri=" + self.uri if self.uri else "")) if len(mpns) != 1: raise EeException("This part does not contain exactly one mpn: {}". format(", ".join([mpn.valueProp for mpn in mpns]))) return mpns[0] # SPNs def add_spn(self, mpn: str): self.get_spns().append(types.SupplierPartNumber(value=mpn)) def get_spns(self) -> List[types.SupplierPartNumber]: return self.xml.referencesProp.supplier_part_numberProp def get_only_spn(self) -> Optional[types.SupplierPartNumber]: return next(iter(self.get_spns()), None) def get_exactly_one_spn(self) -> types.SupplierPartNumber: spns = self.get_spns() if len(spns) == 0: raise EeException("This part does not contain any manufacturer part numbers{}". format(", uri=" + self.uri if self.uri else "")) if len(spns) != 1: raise EeException("This part does not contain exactly one spn: {}". format(", ".join([spn.valueProp for spn in spns]))) return spns[0] # Price breaks def add_price_break(self, quantity, price: Money): amount = types.Amount(value=price.amount, currency=price.currency) pb = types.PriceBreak(quantity=quantity, amount=amount) self.xml.price_breaksProp.price_break.append(pb) # Links def get_links(self) -> List[types.Link]: return self.xml.linksProp.link # Facts def get_facts(self) -> List[types.Fact]: return self.xml.factsProp.fact def find_fact(self, key: str) -> Optional[types.Fact]: return next((f for f in self.get_facts() if f.keyProp == key), None) class Facts(object): def __init__(self, part): self.part = part def add(self, key: str, value: str, label=None): self.part.get_facts().append(types.Fact(key=key, label=label, value=value)) class Entry(object): def __init__(self, new: bool, part: types.Part): self.new = new self.part = part self.pn = next((p.valueProp for p in Part(part).get_mpns()), None) class AssemblyPart(object): def __init__(self, uri: Optional[str]): self.count = 0 self.sub_parts: List[AssemblyPart] = [] self.references = ReferenceList(uri) def add_sub_part(self, ap: "AssemblyPart"): self.sub_parts.append(ap) class Assembly(object): def __init__(self): self.parts: List[AssemblyPart] = [] class PartDb(object): def __init__(self): self.parts: List[Entry] = [] self.pn_index: MutableMapping[str, Entry] = {} self.new_entries = 0 self._assembly: Optional[Assembly] = None def add_entry(self, part: Union[Part, types.Part], new: bool): if isinstance(part, Part): part = part.underlying e = Entry(new, part) self.parts.append(e) if e.pn: self.pn_index[e.pn] = e if e.new: self.new_entries = self.new_entries + 1 def iterparts(self, sort=False) -> Iterator[types.Part]: it = (e.part for e in self.parts) return sorted(it, key=lambda p: p.uriProp) if sort else it def size(self) -> int: return len(self.parts) def find_by_pn(self, pn: str) -> Optional[types.Part]: entry = self.pn_index.get(pn, None) return entry.part if entry else None @property def has_assembly(self): return self._assembly is not None @property def assembly(self): if self._assembly is None: self._assembly = Assembly() return self._assembly def load_db(path: Path) -> PartDb: db = PartDb() with path.open("r") as f: part_db: types.PartDb = types.parse(f, silence=True) part_db.partsProp = part_db.partsProp or types.PartList() for p in part_db.partsProp.part: db.add_entry(p, False) return db def find_root_tag(root): return next((tag for tag, klass in types.GDSClassesMapping.items() if klass == type(root)), None) def save_db(path: Path, db: PartDb, sort=False): part_db = types.PartDb() if db.size() > 0: part_db.parts = types.PartList() for part in db.iterparts(sort=sort): p = Part(part) p.clean_xml() part_db.parts.partProp.append(p.underlying) if db.has_assembly: def to_xml(ap: AssemblyPart): xml = types.AssemblyPart() if ap.count != 0: xml.countProp = ap.count if ap.sub_parts: xml.sub_partsProp = types.AssemblyPartList([to_xml(ap_) for ap_ in ap.sub_parts]) xml.set_references(ap.references.to_xml()) return xml assembly = db.assembly part_list = types.AssemblyPartList() for ap in assembly.parts: part_list.add_assembly_part(to_xml(ap)) part_db.assemblyProp = types.Assembly(assembly_parts=part_list) with path.open("w") as f: part_db.export(outfile=f, level=0, name_=find_root_tag(part_db))