From d8b6719c628c7dfb4537ad2303c016884e9312f3 Mon Sep 17 00:00:00 2001 From: Trygve Laugstøl Date: Wed, 15 May 2019 20:52:04 +0200 Subject: digikey-search-parts: even better structure. --- src/ee/digikey/search_parts.py | 178 ++++++++++++++++++++++++++--------------- 1 file changed, 113 insertions(+), 65 deletions(-) (limited to 'src') diff --git a/src/ee/digikey/search_parts.py b/src/ee/digikey/search_parts.py index c7ff1d6..5319ba9 100644 --- a/src/ee/digikey/search_parts.py +++ b/src/ee/digikey/search_parts.py @@ -1,6 +1,6 @@ import functools from pathlib import Path -from typing import List, MutableSet +from typing import List, MutableSet, Mapping from ee.db import ObjDb from ee.digikey import Digikey, DigikeyParser, DigikeyClient, SearchResponseTypes, DigikeyProduct, DigikeyStore @@ -13,18 +13,111 @@ __all__ = ["search_parts"] @functools.total_ordering class QueryByPn(object): - def __init__(self, pn: str, is_spn): + def __init__(self, pn: str): self.pn = pn - self.is_spn = is_spn def __eq__(self, other): - return self.pn == other.pn and self.is_spn == other.is_spn + return self.pn == other.pn def __lt__(self, other): - return (self.pn, self.is_spn) < (other.pn, other.is_spn) + return self.pn < other.pn def __hash__(self): - return hash((self.pn, self.is_spn)) + return hash(self.pn) + + +@functools.total_ordering +class QueryBySpn(object): + def __init__(self, spn: str): + self.spn = spn + + def __eq__(self, other): + return self.spn == other.pn + + def __lt__(self, other): + return self.spn < other.spn + + def __hash__(self): + return hash(self.spn) + + +@functools.total_ordering +class QueryByAttributes(object): + def __init__(self, attributes: Mapping[str, str]): + self.attributes = attributes + + def __eq__(self, other: "QueryByAttributes"): + return self.attributes == other.attributes + + def __lt__(self, other): + return self.attributes < other.attributes + + def __hash__(self): + return hash(self.attributes) + + +class QueryEngine(object): + def __init__(self, cache_dir, log, store_code): + self.log = log + self.store = DigikeyStore.from_store_code(store_code) + self.parser = DigikeyParser(Digikey()) + self.client = DigikeyClient(self.store.products_url, cache_dir) + + out_parts: ObjDb[Part] = ObjDb[Part]() + self.uri_idx = out_parts.add_unique_index("uri", lambda p: p.uri) + out_parts.add_index("pn", lambda p: [pn.value for pn in p.get_part_references()], multiple=True) + out_parts.add_index("spn", lambda p: [pn.value for pn in p.get_spns()], multiple=True) + + self.out_parts = out_parts + + def pn_search(self, pn): + self.pn_spn_search(pn, False) + + def spn_search(self, pn): + self.pn_spn_search(pn, True) + + def pn_spn_search(self, pn, is_spn): + out_part = None + result = None + + text = self.client.search(pn) + response = self.parser.parse_string(text) + + if response.response_type == SearchResponseTypes.SINGLE: + out_part = resolved(self.store.url, response.products[0]) + result = "found" + elif response.response_type == SearchResponseTypes.MANY: + + # find those with an exact match. Digikey uses a prefix search so a query for "FOO" will return "FOO" + # and "FOOT". + def get_field(p): + return p.part_number if is_spn else p.mpn + + filtered_products = [p for p in response.products if get_field(p) == pn] + + if len(filtered_products) == 0: + result = "not-found" + else: + dpn = sorted(filtered_products, key=lambda p: p.part_number)[0].part_number + + response = self.parser.parse_string(self.client.search(dpn)) + if response.response_type == SearchResponseTypes.SINGLE: + out_part = resolved(self.store.url, response.products[0]) + result = "found" + else: + result = "many" + + elif response.response_type == SearchResponseTypes.TOO_MANY: + result = "too-many" + elif response.response_type == SearchResponseTypes.NO_MATCHES: + result = "not-found" + + if out_part: + if out_part.uri not in self.uri_idx: + self.out_parts.add(out_part) + + print("Searching for '{}': {}".format(pn, result), file=self.log) + print("", file=self.log) def resolved(supplier, p: DigikeyProduct) -> Part: @@ -67,93 +160,48 @@ def resolved(supplier, p: DigikeyProduct) -> Part: def search_parts(in_path: Path, out_path: Path, log_path: Path, cache_dir: Path, store_code): - print_result = True with log_path.open("w") as log: - run_search_parts(in_path, out_path, log, cache_dir, store_code, print_result) + run_search_parts(in_path, out_path, log, cache_dir, store_code) -def run_search_parts(in_path: Path, out_path: Path, log, cache_dir: Path, store_code, print_result): +def run_search_parts(in_path: Path, out_path: Path, log, cache_dir: Path, store_code): in_db = load_db(in_path) - store = DigikeyStore.from_store_code(store_code) + engine = QueryEngine(cache_dir, log, store_code) pn_qs: MutableSet[QueryByPn] = set() + spn_qs: MutableSet[QueryBySpn] = set() + attr_qs: MutableSet[QueryByAttributes] = set() for xml in in_db.iterparts(): part = Part(xml) - # print("Searching for {}".format(part.printable_reference), file=log) - - if xml.supplierProp is not None and xml.supplierProp != store.url: - assert False, "Something is fishy" # Not sure why I made this rule found_any = False for pn in part.get_mpns(): - pn_qs.add(QueryByPn(pn.valueProp, False)) + pn_qs.add(QueryByPn(pn.valueProp)) found_any = True for pn in part.get_spns(): - pn_qs.add(QueryByPn(pn.valueProp, True)) + spn_qs.add(QueryBySpn(pn.valueProp)) found_any = True if not found_any: print("Could not find anything for search by, checked product numbers and supplier product numbers") pn_queries = list(sorted(pn_qs)) + spn_queries = list(sorted(spn_qs)) - parser = DigikeyParser(Digikey()) - client = DigikeyClient(store.products_url, cache_dir) - - out_parts: ObjDb[Part] = ObjDb[Part]() - uri_idx = out_parts.add_unique_index("uri", lambda p: p.uri) - out_parts.add_index("pn", lambda p: [pn.value for pn in p.get_part_references()], multiple=True) - out_parts.add_index("spn", lambda p: [pn.value for pn in p.get_spns()], multiple=True) - - print("Executing {} product number searches\n\n".format(len(pn_queries)), file=log) + print("Executing {} manufacturer product number searches\n\n".format(len(pn_queries)), file=log) for q in pn_queries: - out_part = None - result = None - - text = client.search(q.pn) - response = parser.parse_string(text) - - if response.response_type == SearchResponseTypes.SINGLE: - out_part = resolved(store.url, response.products[0]) - result = "found" - elif response.response_type == SearchResponseTypes.MANY: + engine.pn_search(q.pn) - # find those with an exact match. Digikey uses a prefix search so a query for "FOO" will return "FOO" - # and "FOOT". - def get_field(p): - return p.part_number if q.is_spn else p.mpn - - filtered_products = [p for p in response.products if get_field(p) == q.pn] - - if len(filtered_products) == 0: - result = "not-found" - else: - dpn = sorted(filtered_products, key=lambda p: p.part_number)[0].part_number - - response = parser.parse_string(client.search(dpn)) - if response.response_type == SearchResponseTypes.SINGLE: - out_part = resolved(store.url, response.products[0]) - result = "found" - else: - result = "many" - - elif response.response_type == SearchResponseTypes.TOO_MANY: - result = "too-many" - elif response.response_type == SearchResponseTypes.NO_MATCHES: - result = "not-found" - - if out_part: - if out_part.uri not in uri_idx: - out_parts.add(out_part) + print("Executing {} supplier product number searches\n\n".format(len(spn_queries)), file=log) - print("Searching for '{}': {}".format(q.pn, result), file=log) - print("", file=log) + for q in spn_queries: + engine.spn_search(q.spn) part_db = PartDb() - for part in out_parts: + for part in engine.out_parts: part_db.add_entry(part, True) save_db(out_path, part_db, sort=True) -- cgit v1.2.3