diff options
-rw-r--r-- | src/ee/digikey/search_parts.py | 69 |
1 files changed, 45 insertions, 24 deletions
diff --git a/src/ee/digikey/search_parts.py b/src/ee/digikey/search_parts.py index 09f682d..c7ff1d6 100644 --- a/src/ee/digikey/search_parts.py +++ b/src/ee/digikey/search_parts.py @@ -1,5 +1,6 @@ +import functools from pathlib import Path -from typing import List +from typing import List, MutableSet from ee.db import ObjDb from ee.digikey import Digikey, DigikeyParser, DigikeyClient, SearchResponseTypes, DigikeyProduct, DigikeyStore @@ -10,6 +11,22 @@ from ee.xml.uris import make_digikey_fact_key __all__ = ["search_parts"] +@functools.total_ordering +class QueryByPn(object): + def __init__(self, pn: str, is_spn): + self.pn = pn + self.is_spn = is_spn + + def __eq__(self, other): + return self.pn == other.pn and self.is_spn == other.is_spn + + def __lt__(self, other): + return (self.pn, self.is_spn) < (other.pn, other.is_spn) + + def __hash__(self): + return hash((self.pn, self.is_spn)) + + def resolved(supplier, p: DigikeyProduct) -> Part: # TODO: fix uri xml = types.Part(uri="https://digikey.com/pn#{}".format(p.part_number), @@ -60,40 +77,44 @@ def run_search_parts(in_path: Path, out_path: Path, log, cache_dir: Path, store_ store = DigikeyStore.from_store_code(store_code) - parser = DigikeyParser(Digikey()) - client = DigikeyClient(store.products_url, cache_dir) + pn_qs: MutableSet[QueryByPn] = set() - out_parts: ObjDb[Part] = ObjDb[Part]() - uri_idx = out_parts.add_unique_index("uri", lambda p: p.uri) - out_parts.add_index("pn", lambda p: [pn.value for pn in p.get_part_references()], multiple=True) - out_parts.add_index("spn", lambda p: [pn.value for pn in p.get_spns()], multiple=True) for xml in in_db.iterparts(): part = Part(xml) - print("Searching for {}".format(part.printable_reference), file=log) + # print("Searching for {}".format(part.printable_reference), file=log) if xml.supplierProp is not None and xml.supplierProp != store.url: assert False, "Something is fishy" # Not sure why I made this rule - dpn = part.get_only_spn() - mpn = part.get_only_mpn() + found_any = False + for pn in part.get_mpns(): + pn_qs.add(QueryByPn(pn.valueProp, False)) + found_any = True + + for pn in part.get_spns(): + pn_qs.add(QueryByPn(pn.valueProp, True)) + found_any = True + + if not found_any: + print("Could not find anything for search by, checked product numbers and supplier product numbers") - is_mpn = query = None + pn_queries = list(sorted(pn_qs)) + + parser = DigikeyParser(Digikey()) + client = DigikeyClient(store.products_url, cache_dir) - if dpn is not None: - query = dpn.valueProp - is_mpn = False - elif mpn is not None: - query = mpn.valueProp - is_mpn = True + out_parts: ObjDb[Part] = ObjDb[Part]() + uri_idx = out_parts.add_unique_index("uri", lambda p: p.uri) + out_parts.add_index("pn", lambda p: [pn.value for pn in p.get_part_references()], multiple=True) + out_parts.add_index("spn", lambda p: [pn.value for pn in p.get_spns()], multiple=True) - if query is None: - print("Could not find pn or spn to search by", file=log) - continue + print("Executing {} product number searches\n\n".format(len(pn_queries)), file=log) + for q in pn_queries: out_part = None result = None - text = client.search(query) + text = client.search(q.pn) response = parser.parse_string(text) if response.response_type == SearchResponseTypes.SINGLE: @@ -104,9 +125,9 @@ def run_search_parts(in_path: Path, out_path: Path, log, cache_dir: Path, store_ # find those with an exact match. Digikey uses a prefix search so a query for "FOO" will return "FOO" # and "FOOT". def get_field(p): - return p.mpn if is_mpn else p.part_number + return p.part_number if q.is_spn else p.mpn - filtered_products = [p for p in response.products if get_field(p) == query] + filtered_products = [p for p in response.products if get_field(p) == q.pn] if len(filtered_products) == 0: result = "not-found" @@ -129,7 +150,7 @@ def run_search_parts(in_path: Path, out_path: Path, log, cache_dir: Path, store_ if out_part.uri not in uri_idx: out_parts.add(out_part) - print("{}: {}".format(query, result), file=log) + print("Searching for '{}': {}".format(q.pn, result), file=log) print("", file=log) part_db = PartDb() |