diff options
author | Trygve Laugstøl <trygvis@inamo.no> | 2019-02-24 21:51:38 +0100 |
---|---|---|
committer | Trygve Laugstøl <trygvis@inamo.no> | 2019-02-24 21:51:38 +0100 |
commit | 80e0623913e87c6480049520590e424a831e0401 (patch) | |
tree | ff27e1d269cac886dd06ab4f9924719f84794e38 /src/ee | |
parent | 8aae5d032dd30118b6d992018391a8bd5be759e4 (diff) | |
download | ee-python-80e0623913e87c6480049520590e424a831e0401.tar.gz ee-python-80e0623913e87c6480049520590e424a831e0401.tar.bz2 ee-python-80e0623913e87c6480049520590e424a831e0401.tar.xz ee-python-80e0623913e87c6480049520590e424a831e0401.zip |
Digikey: replacing requests-based code with selenium.
Adding new tools: digikey-import-parts and digikey-refresh-parts.
Diffstat (limited to 'src/ee')
-rw-r--r-- | src/ee/__main__.py | 3 | ||||
-rw-r--r-- | src/ee/digikey/__init__.py | 79 | ||||
-rw-r--r-- | src/ee/digikey/import_parts.py | 100 | ||||
-rw-r--r-- | src/ee/digikey/refresh_parts.py | 97 | ||||
-rw-r--r-- | src/ee/kicad/make_bom.py | 13 | ||||
-rw-r--r-- | src/ee/tools/digikey_download_facts.py | 7 | ||||
-rw-r--r-- | src/ee/tools/digikey_import_parts.py | 17 | ||||
-rw-r--r-- | src/ee/tools/digikey_refresh_parts.py | 22 | ||||
-rw-r--r-- | src/ee/xml/bomFile.py | 582 | ||||
-rw-r--r-- | src/ee/xml/bom_file_utils.py | 50 | ||||
-rw-r--r-- | src/ee/xml/uris.py (renamed from src/ee/bom/part_type_uris.py) | 2 |
11 files changed, 933 insertions, 39 deletions
diff --git a/src/ee/__main__.py b/src/ee/__main__.py index a387b1d..177987d 100644 --- a/src/ee/__main__.py +++ b/src/ee/__main__.py @@ -38,9 +38,6 @@ def find_tools(): def main(): logging.basicConfig() # you need to initialize logging, otherwise you will not see anything from requests logging.getLogger().setLevel(logging.DEBUG) - requests_log = logging.getLogger("requests.packages.urllib3") - requests_log.setLevel(logging.DEBUG) - requests_log.propagate = True tools = find_tools() diff --git a/src/ee/digikey/__init__.py b/src/ee/digikey/__init__.py index 615d458..32308e5 100644 --- a/src/ee/digikey/__init__.py +++ b/src/ee/digikey/__init__.py @@ -6,13 +6,11 @@ import os.path import re import urllib.parse from functools import total_ordering +from pathlib import Path from typing import List, Optional -import requests -from cachecontrol import CacheControl -from cachecontrol.caches.file_cache import FileCache -from cachecontrol.heuristics import ExpiresAfter from lxml import html +from selenium import webdriver import ee._utils from ee.tools import mk_parents @@ -73,11 +71,11 @@ class Digikey(object): @total_ordering class DigikeyProduct(object): - def __init__(self, part_number, mpn, url, attributes=None, categories=None): + def __init__(self, part_number, mpn, url, attributes: List["DigikeyAttributeValue"] = None, categories=None): self.part_number = _clean(part_number) self.mpn = _clean(mpn) self.url = url - self.attributes = attributes or [] + self.attributes = attributes or [] # type: List["DigikeyAttributeValue"] self.categories = categories or [] self.quantity_available = None self.description = None @@ -156,7 +154,7 @@ class DigikeyAttributeType(object): class DigikeyAttributeValue(object): - def __init__(self, value, attribute_type): + def __init__(self, value, attribute_type: DigikeyAttributeType): self.value = value self.attribute_type = attribute_type @@ -171,8 +169,8 @@ class DigikeyProductCategory(object): self.label = _clean(label) self.digikey_url = digikey_url if digikey_url is None or digikey_url.startswith("http") else \ "https://www.digikey.com" + digikey_url - self.parent = parent # type: DigikeyProductCategory - self.subCategories = [] # type: List[DigikeyProductCategory] + self.parent: DigikeyProductCategory = parent + self.subCategories: List[DigikeyProductCategory] = [] assert self.id assert self.label @@ -203,7 +201,7 @@ class DigikeySearchResponse(object): self.count = count self.response_type = response_type - self.products = list() # type: List[DigikeyProduct] + self.products: List[DigikeyProduct] = list() def append(self, product: DigikeyProduct): self.products.append(product) @@ -213,18 +211,57 @@ class DigikeyClient(object): def __nop(self, message): pass - def __init__(self, digikey: Digikey, cache_dir=None, on_download=None): - self.digikey = digikey + def __init__(self, cache_dir: Path = None, on_download=None): self.on_download = on_download or self.__nop - cache = FileCache(cache_dir or 'digikey_cache', forever=True) - self.sess = CacheControl(requests.Session(), cache=cache, heuristic=ExpiresAfter(days=10*365)) + self.cache_dir = cache_dir or Path() + self.driver: webdriver.Chrome = None - def _req(self, url, params=None): + def search(self, query: str, page_size=10) -> str: + return self.product_search(query, page_size) + + def product_search(self, query: str, page_size=10) -> str: + params = {'lang': 'en', 'site': 'us', 'keywords': query, 'pageSize': str(page_size)} + cache_key = urllib.parse.quote(query) + page = self._req("https://www.digikey.com/products/en", cache_key=cache_key, params=params) + + return page + + def _req(self, url, cache_key, params=None): if not url.startswith("http"): url = "https://www.digikey.com" + url - s = "" if not params else "?" + urllib.parse.urlencode(params) - self.on_download("Downloading {}".format(url + s)) - return self.sess.get(url, params=params) + url = url + ("" if not params else "?" + urllib.parse.urlencode(params)) + + cache_path: Optional[Path] = None + if self.cache_dir: + cache_path = self.cache_dir / "{}.html".format(cache_key) + + if cache_path.exists(): + self.on_download("Using cached {}".format(url)) + with open(str(cache_path), "r") as f: + return f.read() + + self.on_download("Downloading {}".format(url)) + + if self.driver is None: + options = webdriver.ChromeOptions() + self.driver = webdriver.Chrome(chrome_options=options) + + self.driver.get(url) + + src = self.driver.page_source + if cache_path: + cache_path.parent.mkdir(parents=True, exist_ok=True) + + with open(str(cache_path), "w") as f: + f.write(src) + assert self.cache_dir.stat().st_size > 0 + + return src + + +class DigikeyParser(object): + def __init__(self, digikey: Digikey): + self.digikey = digikey or Digikey() def _search_process_single_result(self, tree: html) -> Optional[DigikeyProduct]: attributes = [] @@ -300,12 +337,6 @@ class DigikeyClient(object): return len(products) - def search(self, query: str, page_size=10) -> DigikeySearchResponse: - params = {'lang': 'en', 'site': 'us', 'keywords': query, 'pageSize': str(page_size)} - page = self._req("https://www.digikey.com/products/en", params=params) - - return self.parse_string(page.content) - def parse_string(self, page_content: str): tree = html.fromstring(page_content) diff --git a/src/ee/digikey/import_parts.py b/src/ee/digikey/import_parts.py new file mode 100644 index 0000000..748bbef --- /dev/null +++ b/src/ee/digikey/import_parts.py @@ -0,0 +1,100 @@ +import os +from typing import List, MutableMapping + +from ee.xml import bomFile +from ee.xml.bom_file_utils import * +from ee.xml.uris import DIGIKEY_URI + +__all__ = ["import_parts"] + + +class Entry(object): + def __init__(self, new: bool, part: bomFile.Part): + self.new = new + self.part = part + + self.pn = find_pn(part) + self.dpn = find_dpn(part, DIGIKEY_URI) + + +def import_parts(in_path, out_path): + print("in: {}, out: {}".format(in_path, out_path)) + + in_file = bomFile.parse(in_path, True) + if in_file.partsProp is None: + in_file.partsProp = bomFile.PartList() + in_part_list = in_file.partsProp.partProp # type: List[bomFile.Part] + + print("in file: {} parts".format(len(in_part_list))) + + if os.path.isfile(out_path): + out_file = bomFile.parse(out_path, True) + else: + out_file = bomFile.BomFile() + + if out_file.partsProp is None: + out_file.partsProp = bomFile.PartList() + out_part_list = out_file.partsProp.partProp # type: List[bomFile.Part] + print("out file: {} parts".format(len(out_part_list))) + + existing_parts = [] # type: List[Entry] + pn_index = {} # type: MutableMapping[str, Entry] + dpn_index = {} # type: MutableMapping[str, Entry] + new_entry_added = 0 + + def add_entry(e: Entry): + existing_parts.append(e) + pn_index[e.pn] = e + dpn_index[e.dpn] = e + + if e.new: + out_part_list.append(e.part) + nonlocal new_entry_added + new_entry_added = new_entry_added + 1 + + print("len(out_part_list)={}".format(len(out_part_list))) + for part in out_part_list: # type: bomFile.Part + entry = Entry(False, part) + add_entry(entry) + + print("loaded {} existing parts".format(len(existing_parts))) + + for part in in_part_list: + pn_value = find_pn(part) + + if pn_value is None: + print("Skipping part with no part number: id={}".format(part.idProp)) + continue + + entry = pn_index.get(pn_value) + + if entry is not None: + print("Already imported pn_value={}".format(pn_value)) + continue + + print("Importing {}".format(pn_value)) + + pns = bomFile.PartNumberList() + + if pn_value is not None: + pns.add_part_number(bomFile.PartNumber(value=pn_value)) + + dpn_value = find_dpn(part, DIGIKEY_URI) + if dpn_value is not None: + pns.add_part_number(bomFile.PartNumber(value=dpn_value, distributor=DIGIKEY_URI)) + + if len(pns.part_numberProp) == 0: + continue + + new_part = bomFile.Part(part_numbers=pns) + entry = Entry(True, new_part) + add_entry(entry) + + if new_entry_added: + print("Imported {} entries".format(new_entry_added)) + tmp_path = out_path + ".tmp" + with open(tmp_path, "w") as f: + out_file.export(f, 0, name_="bom-file") + os.rename(tmp_path, out_path) + else: + print("no new entries") diff --git a/src/ee/digikey/refresh_parts.py b/src/ee/digikey/refresh_parts.py new file mode 100644 index 0000000..87edf2f --- /dev/null +++ b/src/ee/digikey/refresh_parts.py @@ -0,0 +1,97 @@ +import os +from pathlib import Path +from typing import List + +from ee.digikey import Digikey, DigikeyParser, DigikeyClient, SearchResponseTypes, DigikeyProduct +from ee.xml import bomFile, bom_file_utils +from ee.xml.bomFile import DigikeyDistributorInfo +from ee.xml.uris import DIGIKEY_URI + +__all__ = ["refresh_parts"] + + +def resolved(di: DigikeyDistributorInfo, part: bomFile.Part, p: DigikeyProduct): + di.stateProp = "resolved" + + fact_set = bom_file_utils.find_fact_set(part, DIGIKEY_URI, create=True) + + # Remove the old list + fact_set.factsProp = bomFile.FactList() + facts: List[bomFile.Fact] = fact_set.factsProp.factProp + + for a in p.attributes: + facts.append(bomFile.Fact(key=a.attribute_type.id, label=a.attribute_type.label, value=a.value)) + + +def refresh_parts(in_path: Path, out_path: Path, cache_dir: Path, force_refresh: bool): + print("in: {}, out: {}".format(in_path, out_path)) + + in_file = bomFile.parse(str(in_path), True) + if in_file.partsProp is None: + in_file.partsProp = bomFile.PartList() + + parser = DigikeyParser(Digikey()) + client = DigikeyClient(cache_dir) + + for part in in_file.partsProp.partProp: # type: bomFile.Part + dpn = bom_file_utils.find_dpn(part, DIGIKEY_URI) + mpn = bom_file_utils.find_pn(part) + + is_mpn = query = None + + if dpn is not None: + query = dpn + is_mpn = False + elif mpn is not None: + query = mpn + is_mpn = True + + if query is None: + print("could not find pn or dpn: part.id={}".format(part.idProp)) + continue + + di = part.distributor_infoProp # type: DigikeyDistributorInfo + + if di is None: + di = bomFile.DigikeyDistributorInfo() + di.extensiontype_ = "DigikeyDistributorInfo" + di.original_tagname_ = "distributor-info" + part.distributor_infoProp = di + + if force_refresh or di.stateProp != "resolved": + text = client.search(query) + response = parser.parse_string(text) + + if response.response_type == SearchResponseTypes.SINGLE: + resolved(di, part, response.products[0]) + elif response.response_type == SearchResponseTypes.MANY: + + # find those with an exact match. Digikey uses a prefix search so a query for "FOO" will return "FOO" + # and "FOOT". + def get_field(p): + return p.mpn if is_mpn else p.part_number + + filtered_products = [p for p in response.products if get_field(p) == query] + + if len(filtered_products) == 0: + di.stateProp = "not-found" + else: + dpn = sorted(filtered_products, key=lambda p: p.part_number)[0].part_number + + response = parser.parse_string(client.search(dpn)) + if response.response_type == SearchResponseTypes.SINGLE: + resolved(di, part, response.products[0]) + else: + di.stateProp = "many" + + elif response.response_type == SearchResponseTypes.TOO_MANY: + di.stateProp = "too-many" + elif response.response_type == SearchResponseTypes.NO_MATCHES: + di.stateProp = "not-found" + + out_path = in_path + out_file = in_file + tmp_path = str(out_path) + ".tmp" + with open(tmp_path, "w") as f: + out_file.export(f, 0, name_="bom-file") + os.rename(tmp_path, str(out_path)) diff --git a/src/ee/kicad/make_bom.py b/src/ee/kicad/make_bom.py index e061f15..2f5a2ac 100644 --- a/src/ee/kicad/make_bom.py +++ b/src/ee/kicad/make_bom.py @@ -8,8 +8,7 @@ from ee.kicad.model import Component from ee.kicad.read_schematic import read_schematics from ee.kicad.to_bom import to_bom, to_bom_xml from ee.tools import mk_parents -from ee.xml import bomFile -from ee.bom import part_type_uris +from ee.xml import bomFile, uris __all__ = [ "StrategyCallable", @@ -45,15 +44,15 @@ def part_type_strategy(component: Component, part: bomFile.Part) -> bomFile.Part lib, part_name = fp.split(":") if lib == "Capacitor_SMD": - part.part_typeProp = part_type_uris.CAPACITOR + part.part_typeProp = uris.CAPACITOR elif lib == "Resistor_SMD": - part.part_typeProp = part_type_uris.RESISTOR + part.part_typeProp = uris.RESISTOR elif lib == "Diode_SMD": - part.part_typeProp = part_type_uris.DIODE + part.part_typeProp = uris.DIODE elif lib == "Inductor_SMD": - part.part_typeProp = part_type_uris.INDUCTOR + part.part_typeProp = uris.INDUCTOR elif lib == "Crystal": - part.part_typeProp = part_type_uris.CRYSTAL + part.part_typeProp = uris.CRYSTAL return part diff --git a/src/ee/tools/digikey_download_facts.py b/src/ee/tools/digikey_download_facts.py index ee4cf20..3ab8551 100644 --- a/src/ee/tools/digikey_download_facts.py +++ b/src/ee/tools/digikey_download_facts.py @@ -45,7 +45,8 @@ parser.add_argument("--force", args = parser.parse_args() digikey = dk.Digikey() -client = dk.DigikeyClient(digikey, on_download=log.debug) +client = dk.DigikeyClient(on_download=log.debug) +parser = dk.DigikeyParser(digikey) repo = dk.DigikeyRepository(digikey, args.out) @@ -79,7 +80,7 @@ for q in queries: continue log.info("Searching for {}".format(p)) - response = client.search(p) + response = parser.parse_string(client.search(p)) todos = [] @@ -108,7 +109,7 @@ for q in queries: log.warn("Part not found") for part_number in todos: - response = client.search(part_number) + response = parser.parse_string(client.search(part_number)) if response.response_type == SearchResponseTypes.SINGLE: p = sorted(response.products, key=lambda p: p.part_number)[0] diff --git a/src/ee/tools/digikey_import_parts.py b/src/ee/tools/digikey_import_parts.py new file mode 100644 index 0000000..77f87e8 --- /dev/null +++ b/src/ee/tools/digikey_import_parts.py @@ -0,0 +1,17 @@ +import argparse +from ee.digikey.import_parts import import_parts + +parser = argparse.ArgumentParser(description="Import all parts in XML file into Digi-Key parts list") + +parser.add_argument("--in", + dest="in_", + required=True, + metavar="FILE") + +parser.add_argument("--out", + required=True, + metavar="FILE") + +args = parser.parse_args() + +import_parts(args.in_, args.out) diff --git a/src/ee/tools/digikey_refresh_parts.py b/src/ee/tools/digikey_refresh_parts.py new file mode 100644 index 0000000..e3e3b35 --- /dev/null +++ b/src/ee/tools/digikey_refresh_parts.py @@ -0,0 +1,22 @@ +import argparse +from pathlib import Path + +from ee.digikey.refresh_parts import refresh_parts + +parser = argparse.ArgumentParser() + +parser.add_argument("--in", + dest="in_", + required=True, + metavar="FILE") + +parser.add_argument("--out", + required=True, + metavar="FILE") + +args = parser.parse_args() + +cache_dir = ".ee/cache" +force = True + +refresh_parts(Path(args.in_), Path(args.out), Path(cache_dir), force) diff --git a/src/ee/xml/bomFile.py b/src/ee/xml/bomFile.py index 4467cde..5a78173 100644 --- a/src/ee/xml/bomFile.py +++ b/src/ee/xml/bomFile.py @@ -804,16 +804,90 @@ class BomFile(GeneratedsSuper): # end class BomFile +class DistributorInfo(GeneratedsSuper): + subclass = None + superclass = None + def __init__(self, extensiontype_=None, **kwargs_): + self.original_tagname_ = None + self.parent_object_ = kwargs_.get('parent_object_') + self.extensiontype_ = extensiontype_ + def factory(*args_, **kwargs_): + if CurrentSubclassModule_ is not None: + subclass = getSubclassFromModule_( + CurrentSubclassModule_, DistributorInfo) + if subclass is not None: + return subclass(*args_, **kwargs_) + if DistributorInfo.subclass: + return DistributorInfo.subclass(*args_, **kwargs_) + else: + return DistributorInfo(*args_, **kwargs_) + factory = staticmethod(factory) + def get_extensiontype_(self): return self.extensiontype_ + def set_extensiontype_(self, extensiontype_): self.extensiontype_ = extensiontype_ + def hasContent_(self): + if ( + + ): + return True + else: + return False + def export(self, outfile, level, namespaceprefix_='', namespacedef_='', name_='DistributorInfo', pretty_print=True): + imported_ns_def_ = GenerateDSNamespaceDefs_.get('DistributorInfo') + if imported_ns_def_ is not None: + namespacedef_ = imported_ns_def_ + if pretty_print: + eol_ = '\n' + else: + eol_ = '' + if self.original_tagname_ is not None: + name_ = self.original_tagname_ + showIndent(outfile, level, pretty_print) + outfile.write('<%s%s%s' % (namespaceprefix_, name_, namespacedef_ and ' ' + namespacedef_ or '', )) + already_processed = set() + self.exportAttributes(outfile, level, already_processed, namespaceprefix_, name_='DistributorInfo') + if self.hasContent_(): + outfile.write('>%s' % (eol_, )) + self.exportChildren(outfile, level + 1, namespaceprefix_, namespacedef_, name_='DistributorInfo', pretty_print=pretty_print) + outfile.write('</%s%s>%s' % (namespaceprefix_, name_, eol_)) + else: + outfile.write('/>%s' % (eol_, )) + def exportAttributes(self, outfile, level, already_processed, namespaceprefix_='', name_='DistributorInfo'): + if self.extensiontype_ is not None and 'xsi:type' not in already_processed: + already_processed.add('xsi:type') + outfile.write(' xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"') + outfile.write(' xsi:type="%s"' % self.extensiontype_) + pass + def exportChildren(self, outfile, level, namespaceprefix_='', namespacedef_='', name_='DistributorInfo', fromsubclass_=False, pretty_print=True): + pass + def build(self, node): + already_processed = set() + self.buildAttributes(node, node.attrib, already_processed) + for child in node: + nodeName_ = Tag_pattern_.match(child.tag).groups()[-1] + self.buildChildren(child, node, nodeName_) + return self + def buildAttributes(self, node, attrs, already_processed): + value = find_attr_value_('xsi:type', node) + if value is not None and 'xsi:type' not in already_processed: + already_processed.add('xsi:type') + self.extensiontype_ = value + def buildChildren(self, child_, node, nodeName_, fromsubclass_=False): + pass +# end class DistributorInfo + + class Part(GeneratedsSuper): subclass = None superclass = None - def __init__(self, id=None, schema_reference=None, part_type=None, part_numbers=None, **kwargs_): + def __init__(self, id=None, schema_reference=None, part_type=None, part_numbers=None, distributor_info=None, fact_sets=None, **kwargs_): self.original_tagname_ = None self.parent_object_ = kwargs_.get('parent_object_') self.id = _cast(None, id) self.schema_reference = schema_reference self.part_type = part_type self.part_numbers = part_numbers + self.distributor_info = distributor_info + self.fact_sets = fact_sets def factory(*args_, **kwargs_): if CurrentSubclassModule_ is not None: subclass = getSubclassFromModule_( @@ -840,6 +914,16 @@ class Part(GeneratedsSuper): def set_part_numbers(self, part_numbers): self.part_numbers = part_numbers part_numbersProp = property(get_part_numbers, set_part_numbers) + def get_distributor_info(self): + return self.distributor_info + def set_distributor_info(self, distributor_info): + self.distributor_info = distributor_info + distributor_infoProp = property(get_distributor_info, set_distributor_info) + def get_fact_sets(self): + return self.fact_sets + def set_fact_sets(self, fact_sets): + self.fact_sets = fact_sets + fact_setsProp = property(get_fact_sets, set_fact_sets) def get_id(self): return self.id def set_id(self, id): @@ -849,7 +933,9 @@ class Part(GeneratedsSuper): if ( self.schema_reference is not None or self.part_type is not None or - self.part_numbers is not None + self.part_numbers is not None or + self.distributor_info is not None or + self.fact_sets is not None ): return True else: @@ -892,6 +978,10 @@ class Part(GeneratedsSuper): outfile.write('<%spart-type>%s</%spart-type>%s' % (namespaceprefix_ , self.gds_encode(self.gds_format_string(quote_xml(self.part_type), input_name='part-type')), namespaceprefix_ , eol_)) if self.part_numbers is not None: self.part_numbers.export(outfile, level, namespaceprefix_, namespacedef_='', name_='part-numbers', pretty_print=pretty_print) + if self.distributor_info is not None: + self.distributor_info.export(outfile, level, namespaceprefix_, namespacedef_='', pretty_print=pretty_print) + if self.fact_sets is not None: + self.fact_sets.export(outfile, level, namespaceprefix_, namespacedef_='', name_='fact-sets', pretty_print=pretty_print) def build(self, node): already_processed = set() self.buildAttributes(node, node.attrib, already_processed) @@ -918,6 +1008,30 @@ class Part(GeneratedsSuper): obj_.build(child_) self.part_numbers = obj_ obj_.original_tagname_ = 'part-numbers' + elif nodeName_ == 'distributor-info': + type_name_ = child_.attrib.get( + '{http://www.w3.org/2001/XMLSchema-instance}type') + if type_name_ is None: + type_name_ = child_.attrib.get('type') + if type_name_ is not None: + type_names_ = type_name_.split(':') + if len(type_names_) == 1: + type_name_ = type_names_[0] + else: + type_name_ = type_names_[1] + class_ = globals()[type_name_] + obj_ = class_.factory() + obj_.build(child_) + else: + raise NotImplementedError( + 'Class not implemented for <distributor_info> element') + self.distributor_info = obj_ + obj_.original_tagname_ = 'distributor-info' + elif nodeName_ == 'fact-sets': + obj_ = FactSetList.factory(parent_object_=self) + obj_.build(child_) + self.fact_sets = obj_ + obj_.original_tagname_ = 'fact-sets' # end class Part @@ -1191,6 +1305,464 @@ class PartNumberList(GeneratedsSuper): # end class PartNumberList +class FactSetList(GeneratedsSuper): + subclass = None + superclass = None + def __init__(self, fact_set=None, **kwargs_): + self.original_tagname_ = None + self.parent_object_ = kwargs_.get('parent_object_') + if fact_set is None: + self.fact_set = [] + else: + self.fact_set = fact_set + def factory(*args_, **kwargs_): + if CurrentSubclassModule_ is not None: + subclass = getSubclassFromModule_( + CurrentSubclassModule_, FactSetList) + if subclass is not None: + return subclass(*args_, **kwargs_) + if FactSetList.subclass: + return FactSetList.subclass(*args_, **kwargs_) + else: + return FactSetList(*args_, **kwargs_) + factory = staticmethod(factory) + def get_fact_set(self): + return self.fact_set + def set_fact_set(self, fact_set): + self.fact_set = fact_set + def add_fact_set(self, value): + self.fact_set.append(value) + def add_fact_set(self, value): + self.fact_set.append(value) + def insert_fact_set_at(self, index, value): + self.fact_set.insert(index, value) + def replace_fact_set_at(self, index, value): + self.fact_set[index] = value + fact_setProp = property(get_fact_set, set_fact_set) + def hasContent_(self): + if ( + self.fact_set + ): + return True + else: + return False + def export(self, outfile, level, namespaceprefix_='', namespacedef_='', name_='FactSetList', pretty_print=True): + imported_ns_def_ = GenerateDSNamespaceDefs_.get('FactSetList') + if imported_ns_def_ is not None: + namespacedef_ = imported_ns_def_ + if pretty_print: + eol_ = '\n' + else: + eol_ = '' + if self.original_tagname_ is not None: + name_ = self.original_tagname_ + showIndent(outfile, level, pretty_print) + outfile.write('<%s%s%s' % (namespaceprefix_, name_, namespacedef_ and ' ' + namespacedef_ or '', )) + already_processed = set() + self.exportAttributes(outfile, level, already_processed, namespaceprefix_, name_='FactSetList') + if self.hasContent_(): + outfile.write('>%s' % (eol_, )) + self.exportChildren(outfile, level + 1, namespaceprefix_, namespacedef_, name_='FactSetList', pretty_print=pretty_print) + showIndent(outfile, level, pretty_print) + outfile.write('</%s%s>%s' % (namespaceprefix_, name_, eol_)) + else: + outfile.write('/>%s' % (eol_, )) + def exportAttributes(self, outfile, level, already_processed, namespaceprefix_='', name_='FactSetList'): + pass + def exportChildren(self, outfile, level, namespaceprefix_='', namespacedef_='', name_='FactSetList', fromsubclass_=False, pretty_print=True): + if pretty_print: + eol_ = '\n' + else: + eol_ = '' + for fact_set_ in self.fact_set: + fact_set_.export(outfile, level, namespaceprefix_, namespacedef_='', name_='fact-set', pretty_print=pretty_print) + def build(self, node): + already_processed = set() + self.buildAttributes(node, node.attrib, already_processed) + for child in node: + nodeName_ = Tag_pattern_.match(child.tag).groups()[-1] + self.buildChildren(child, node, nodeName_) + return self + def buildAttributes(self, node, attrs, already_processed): + pass + def buildChildren(self, child_, node, nodeName_, fromsubclass_=False): + if nodeName_ == 'fact-set': + obj_ = FactSet.factory(parent_object_=self) + obj_.build(child_) + self.fact_set.append(obj_) + obj_.original_tagname_ = 'fact-set' +# end class FactSetList + + +class FactSet(GeneratedsSuper): + subclass = None + superclass = None + def __init__(self, source=None, facts=None, **kwargs_): + self.original_tagname_ = None + self.parent_object_ = kwargs_.get('parent_object_') + self.source = source + self.facts = facts + def factory(*args_, **kwargs_): + if CurrentSubclassModule_ is not None: + subclass = getSubclassFromModule_( + CurrentSubclassModule_, FactSet) + if subclass is not None: + return subclass(*args_, **kwargs_) + if FactSet.subclass: + return FactSet.subclass(*args_, **kwargs_) + else: + return FactSet(*args_, **kwargs_) + factory = staticmethod(factory) + def get_source(self): + return self.source + def set_source(self, source): + self.source = source + sourceProp = property(get_source, set_source) + def get_facts(self): + return self.facts + def set_facts(self, facts): + self.facts = facts + factsProp = property(get_facts, set_facts) + def hasContent_(self): + if ( + self.source is not None or + self.facts is not None + ): + return True + else: + return False + def export(self, outfile, level, namespaceprefix_='', namespacedef_='', name_='FactSet', pretty_print=True): + imported_ns_def_ = GenerateDSNamespaceDefs_.get('FactSet') + if imported_ns_def_ is not None: + namespacedef_ = imported_ns_def_ + if pretty_print: + eol_ = '\n' + else: + eol_ = '' + if self.original_tagname_ is not None: + name_ = self.original_tagname_ + showIndent(outfile, level, pretty_print) + outfile.write('<%s%s%s' % (namespaceprefix_, name_, namespacedef_ and ' ' + namespacedef_ or '', )) + already_processed = set() + self.exportAttributes(outfile, level, already_processed, namespaceprefix_, name_='FactSet') + if self.hasContent_(): + outfile.write('>%s' % (eol_, )) + self.exportChildren(outfile, level + 1, namespaceprefix_, namespacedef_, name_='FactSet', pretty_print=pretty_print) + showIndent(outfile, level, pretty_print) + outfile.write('</%s%s>%s' % (namespaceprefix_, name_, eol_)) + else: + outfile.write('/>%s' % (eol_, )) + def exportAttributes(self, outfile, level, already_processed, namespaceprefix_='', name_='FactSet'): + pass + def exportChildren(self, outfile, level, namespaceprefix_='', namespacedef_='', name_='FactSet', fromsubclass_=False, pretty_print=True): + if pretty_print: + eol_ = '\n' + else: + eol_ = '' + if self.source is not None: + showIndent(outfile, level, pretty_print) + outfile.write('<%ssource>%s</%ssource>%s' % (namespaceprefix_ , self.gds_encode(self.gds_format_string(quote_xml(self.source), input_name='source')), namespaceprefix_ , eol_)) + if self.facts is not None: + self.facts.export(outfile, level, namespaceprefix_, namespacedef_='', name_='facts', pretty_print=pretty_print) + def build(self, node): + already_processed = set() + self.buildAttributes(node, node.attrib, already_processed) + for child in node: + nodeName_ = Tag_pattern_.match(child.tag).groups()[-1] + self.buildChildren(child, node, nodeName_) + return self + def buildAttributes(self, node, attrs, already_processed): + pass + def buildChildren(self, child_, node, nodeName_, fromsubclass_=False): + if nodeName_ == 'source': + source_ = child_.text + source_ = self.gds_validate_string(source_, node, 'source') + self.source = source_ + elif nodeName_ == 'facts': + obj_ = FactList.factory(parent_object_=self) + obj_.build(child_) + self.facts = obj_ + obj_.original_tagname_ = 'facts' +# end class FactSet + + +class Fact(GeneratedsSuper): + subclass = None + superclass = None + def __init__(self, key=None, label=None, value=None, **kwargs_): + self.original_tagname_ = None + self.parent_object_ = kwargs_.get('parent_object_') + self.key = key + self.label = label + self.value = value + def factory(*args_, **kwargs_): + if CurrentSubclassModule_ is not None: + subclass = getSubclassFromModule_( + CurrentSubclassModule_, Fact) + if subclass is not None: + return subclass(*args_, **kwargs_) + if Fact.subclass: + return Fact.subclass(*args_, **kwargs_) + else: + return Fact(*args_, **kwargs_) + factory = staticmethod(factory) + def get_key(self): + return self.key + def set_key(self, key): + self.key = key + keyProp = property(get_key, set_key) + def get_label(self): + return self.label + def set_label(self, label): + self.label = label + labelProp = property(get_label, set_label) + def get_value(self): + return self.value + def set_value(self, value): + self.value = value + valueProp = property(get_value, set_value) + def hasContent_(self): + if ( + self.key is not None or + self.label is not None or + self.value is not None + ): + return True + else: + return False + def export(self, outfile, level, namespaceprefix_='', namespacedef_='', name_='Fact', pretty_print=True): + imported_ns_def_ = GenerateDSNamespaceDefs_.get('Fact') + if imported_ns_def_ is not None: + namespacedef_ = imported_ns_def_ + if pretty_print: + eol_ = '\n' + else: + eol_ = '' + if self.original_tagname_ is not None: + name_ = self.original_tagname_ + showIndent(outfile, level, pretty_print) + outfile.write('<%s%s%s' % (namespaceprefix_, name_, namespacedef_ and ' ' + namespacedef_ or '', )) + already_processed = set() + self.exportAttributes(outfile, level, already_processed, namespaceprefix_, name_='Fact') + if self.hasContent_(): + outfile.write('>%s' % (eol_, )) + self.exportChildren(outfile, level + 1, namespaceprefix_, namespacedef_, name_='Fact', pretty_print=pretty_print) + showIndent(outfile, level, pretty_print) + outfile.write('</%s%s>%s' % (namespaceprefix_, name_, eol_)) + else: + outfile.write('/>%s' % (eol_, )) + def exportAttributes(self, outfile, level, already_processed, namespaceprefix_='', name_='Fact'): + pass + def exportChildren(self, outfile, level, namespaceprefix_='', namespacedef_='', name_='Fact', fromsubclass_=False, pretty_print=True): + if pretty_print: + eol_ = '\n' + else: + eol_ = '' + if self.key is not None: + showIndent(outfile, level, pretty_print) + outfile.write('<%skey>%s</%skey>%s' % (namespaceprefix_ , self.gds_encode(self.gds_format_string(quote_xml(self.key), input_name='key')), namespaceprefix_ , eol_)) + if self.label is not None: + showIndent(outfile, level, pretty_print) + outfile.write('<%slabel>%s</%slabel>%s' % (namespaceprefix_ , self.gds_encode(self.gds_format_string(quote_xml(self.label), input_name='label')), namespaceprefix_ , eol_)) + if self.value is not None: + showIndent(outfile, level, pretty_print) + outfile.write('<%svalue>%s</%svalue>%s' % (namespaceprefix_ , self.gds_encode(self.gds_format_string(quote_xml(self.value), input_name='value')), namespaceprefix_ , eol_)) + def build(self, node): + already_processed = set() + self.buildAttributes(node, node.attrib, already_processed) + for child in node: + nodeName_ = Tag_pattern_.match(child.tag).groups()[-1] + self.buildChildren(child, node, nodeName_) + return self + def buildAttributes(self, node, attrs, already_processed): + pass + def buildChildren(self, child_, node, nodeName_, fromsubclass_=False): + if nodeName_ == 'key': + key_ = child_.text + key_ = self.gds_validate_string(key_, node, 'key') + self.key = key_ + elif nodeName_ == 'label': + label_ = child_.text + label_ = self.gds_validate_string(label_, node, 'label') + self.label = label_ + elif nodeName_ == 'value': + value_ = child_.text + value_ = self.gds_validate_string(value_, node, 'value') + self.value = value_ +# end class Fact + + +class FactList(GeneratedsSuper): + subclass = None + superclass = None + def __init__(self, fact=None, **kwargs_): + self.original_tagname_ = None + self.parent_object_ = kwargs_.get('parent_object_') + if fact is None: + self.fact = [] + else: + self.fact = fact + def factory(*args_, **kwargs_): + if CurrentSubclassModule_ is not None: + subclass = getSubclassFromModule_( + CurrentSubclassModule_, FactList) + if subclass is not None: + return subclass(*args_, **kwargs_) + if FactList.subclass: + return FactList.subclass(*args_, **kwargs_) + else: + return FactList(*args_, **kwargs_) + factory = staticmethod(factory) + def get_fact(self): + return self.fact + def set_fact(self, fact): + self.fact = fact + def add_fact(self, value): + self.fact.append(value) + def add_fact(self, value): + self.fact.append(value) + def insert_fact_at(self, index, value): + self.fact.insert(index, value) + def replace_fact_at(self, index, value): + self.fact[index] = value + factProp = property(get_fact, set_fact) + def hasContent_(self): + if ( + self.fact + ): + return True + else: + return False + def export(self, outfile, level, namespaceprefix_='', namespacedef_='', name_='FactList', pretty_print=True): + imported_ns_def_ = GenerateDSNamespaceDefs_.get('FactList') + if imported_ns_def_ is not None: + namespacedef_ = imported_ns_def_ + if pretty_print: + eol_ = '\n' + else: + eol_ = '' + if self.original_tagname_ is not None: + name_ = self.original_tagname_ + showIndent(outfile, level, pretty_print) + outfile.write('<%s%s%s' % (namespaceprefix_, name_, namespacedef_ and ' ' + namespacedef_ or '', )) + already_processed = set() + self.exportAttributes(outfile, level, already_processed, namespaceprefix_, name_='FactList') + if self.hasContent_(): + outfile.write('>%s' % (eol_, )) + self.exportChildren(outfile, level + 1, namespaceprefix_, namespacedef_, name_='FactList', pretty_print=pretty_print) + showIndent(outfile, level, pretty_print) + outfile.write('</%s%s>%s' % (namespaceprefix_, name_, eol_)) + else: + outfile.write('/>%s' % (eol_, )) + def exportAttributes(self, outfile, level, already_processed, namespaceprefix_='', name_='FactList'): + pass + def exportChildren(self, outfile, level, namespaceprefix_='', namespacedef_='', name_='FactList', fromsubclass_=False, pretty_print=True): + if pretty_print: + eol_ = '\n' + else: + eol_ = '' + for fact_ in self.fact: + fact_.export(outfile, level, namespaceprefix_, namespacedef_='', name_='fact', pretty_print=pretty_print) + def build(self, node): + already_processed = set() + self.buildAttributes(node, node.attrib, already_processed) + for child in node: + nodeName_ = Tag_pattern_.match(child.tag).groups()[-1] + self.buildChildren(child, node, nodeName_) + return self + def buildAttributes(self, node, attrs, already_processed): + pass + def buildChildren(self, child_, node, nodeName_, fromsubclass_=False): + if nodeName_ == 'fact': + obj_ = Fact.factory(parent_object_=self) + obj_.build(child_) + self.fact.append(obj_) + obj_.original_tagname_ = 'fact' +# end class FactList + + +class DigikeyDistributorInfo(DistributorInfo): + subclass = None + superclass = DistributorInfo + def __init__(self, state=None, **kwargs_): + self.original_tagname_ = None + self.parent_object_ = kwargs_.get('parent_object_') + super(DigikeyDistributorInfo, self).__init__( **kwargs_) + self.state = state + def factory(*args_, **kwargs_): + if CurrentSubclassModule_ is not None: + subclass = getSubclassFromModule_( + CurrentSubclassModule_, DigikeyDistributorInfo) + if subclass is not None: + return subclass(*args_, **kwargs_) + if DigikeyDistributorInfo.subclass: + return DigikeyDistributorInfo.subclass(*args_, **kwargs_) + else: + return DigikeyDistributorInfo(*args_, **kwargs_) + factory = staticmethod(factory) + def get_state(self): + return self.state + def set_state(self, state): + self.state = state + stateProp = property(get_state, set_state) + def hasContent_(self): + if ( + self.state is not None or + super(DigikeyDistributorInfo, self).hasContent_() + ): + return True + else: + return False + def export(self, outfile, level, namespaceprefix_='', namespacedef_='', name_='DigikeyDistributorInfo', pretty_print=True): + imported_ns_def_ = GenerateDSNamespaceDefs_.get('DigikeyDistributorInfo') + if imported_ns_def_ is not None: + namespacedef_ = imported_ns_def_ + if pretty_print: + eol_ = '\n' + else: + eol_ = '' + if self.original_tagname_ is not None: + name_ = self.original_tagname_ + showIndent(outfile, level, pretty_print) + outfile.write('<%s%s%s' % (namespaceprefix_, name_, namespacedef_ and ' ' + namespacedef_ or '', )) + already_processed = set() + self.exportAttributes(outfile, level, already_processed, namespaceprefix_, name_='DigikeyDistributorInfo') + if self.hasContent_(): + outfile.write('>%s' % (eol_, )) + self.exportChildren(outfile, level + 1, namespaceprefix_, namespacedef_, name_='DigikeyDistributorInfo', pretty_print=pretty_print) + showIndent(outfile, level, pretty_print) + outfile.write('</%s%s>%s' % (namespaceprefix_, name_, eol_)) + else: + outfile.write('/>%s' % (eol_, )) + def exportAttributes(self, outfile, level, already_processed, namespaceprefix_='', name_='DigikeyDistributorInfo'): + super(DigikeyDistributorInfo, self).exportAttributes(outfile, level, already_processed, namespaceprefix_, name_='DigikeyDistributorInfo') + def exportChildren(self, outfile, level, namespaceprefix_='', namespacedef_='', name_='DigikeyDistributorInfo', fromsubclass_=False, pretty_print=True): + super(DigikeyDistributorInfo, self).exportChildren(outfile, level, namespaceprefix_, name_, True, pretty_print=pretty_print) + if pretty_print: + eol_ = '\n' + else: + eol_ = '' + if self.state is not None: + showIndent(outfile, level, pretty_print) + outfile.write('<%sstate>%s</%sstate>%s' % (namespaceprefix_ , self.gds_encode(self.gds_format_string(quote_xml(self.state), input_name='state')), namespaceprefix_ , eol_)) + def build(self, node): + already_processed = set() + self.buildAttributes(node, node.attrib, already_processed) + for child in node: + nodeName_ = Tag_pattern_.match(child.tag).groups()[-1] + self.buildChildren(child, node, nodeName_) + return self + def buildAttributes(self, node, attrs, already_processed): + super(DigikeyDistributorInfo, self).buildAttributes(node, attrs, already_processed) + def buildChildren(self, child_, node, nodeName_, fromsubclass_=False): + if nodeName_ == 'state': + state_ = child_.text + state_ = self.gds_validate_string(state_, node, 'state') + self.state = state_ + super(DigikeyDistributorInfo, self).buildChildren(child_, node, nodeName_, True) +# end class DigikeyDistributorInfo + + GDSClassesMapping = { 'bom-file': BomFile, } @@ -1321,6 +1893,12 @@ if __name__ == '__main__': __all__ = [ "BomFile", + "DigikeyDistributorInfo", + "DistributorInfo", + "Fact", + "FactList", + "FactSet", + "FactSetList", "Part", "PartList", "PartNumber", diff --git a/src/ee/xml/bom_file_utils.py b/src/ee/xml/bom_file_utils.py new file mode 100644 index 0000000..f09a3dd --- /dev/null +++ b/src/ee/xml/bom_file_utils.py @@ -0,0 +1,50 @@ +from typing import List, Optional + +from ee.xml import bomFile + +__all__ = [ + "part_numbers", + "find_pn", + "find_dpn", +] + + +def part_numbers(part: bomFile.Part) -> List[bomFile.PartNumber]: + pns = part.part_numbersProp # type: bomFile.PartNumberList + + if pns is None: + return [] + + return pns.part_numberProp + + +def find_pn(part: bomFile.Part) -> str: + for pn in part_numbers(part): + if pn.distributor is None: + return pn.value + + +def find_dpn(part: bomFile.Part, distributor: str) -> str: + for pn in part_numbers(part): + if pn.distributor == distributor: + return pn.value + + +def find_fact_set(part: bomFile.Part, uri: str, create=False) -> Optional[bomFile.FactSet]: + fact_set_list: bomFile.FactSetList = part.fact_setsProp + + if fact_set_list is None: + if not create: + return + + fact_set_list = part.fact_setsProp = bomFile.FactSetList() + + for fs in fact_set_list.fact_setProp: + fact_set: bomFile.FactSet = fs + + if fact_set.sourceProp == uri: + return fact_set + + fact_set = bomFile.FactSet(source=uri) + fact_set_list.add_fact_set(fact_set) + return fact_set diff --git a/src/ee/bom/part_type_uris.py b/src/ee/xml/uris.py index b7f5e88..7bf0487 100644 --- a/src/ee/bom/part_type_uris.py +++ b/src/ee/xml/uris.py @@ -3,3 +3,5 @@ RESISTOR = "http://purl.org/ee/part-type#resistor" DIODE = "http://purl.org/ee/part-type#diode" INDUCTOR = "http://purl.org/ee/part-type#inductor" CRYSTAL = "http://purl.org/ee/part-type#inductor" + +DIGIKEY_URI = "https://digikey.com" |