import functools
from pathlib import Path
from typing import List, MutableSet, Mapping, Set

from ee import EeException
from ee.db import ObjDb
from ee.digikey import Digikey, DigikeyParser, DigikeyClient, SearchResponseTypes, DigikeyProduct, DigikeyStore
from ee.part import PartDb, load_db, save_db, Part, Category
from ee.tools import mk_parents
from ee.xml import types, uris
from ee.xml.uris import make_digikey_fact_key

__all__ = ["search_parts"]


# Query wrappers: hashable, orderable keys so queries can be de-duplicated in sets and run in a stable order.
@functools.total_ordering
class QueryByPn(object):
    def __init__(self, pn: str):
        self.pn = pn

    def __eq__(self, other):
        return self.pn == other.pn

    def __lt__(self, other):
        return self.pn < other.pn

    def __hash__(self):
        return hash(self.pn)


@functools.total_ordering
class QueryBySpn(object):
    def __init__(self, spn: str):
        self.spn = spn

    def __eq__(self, other):
        return self.spn == other.spn

    def __lt__(self, other):
        return self.spn < other.spn

    def __hash__(self):
        return hash(self.spn)


@functools.total_ordering
class QueryByAttributes(object):
    def __init__(self, attributes: Mapping[str, str]):
        self.attributes = attributes

    def __eq__(self, other: "QueryByAttributes"):
        return self.attributes == other.attributes

    def __lt__(self, other):
        return sorted(self.attributes.items()) < sorted(other.attributes.items())

    def __hash__(self):
        return hash(frozenset(self.attributes.items()))


class QueryEngine(object):
    """Runs Digikey searches and accumulates the resolved parts and their categories."""

    def __init__(self, cache_dir, log, store_code):
        self.log = log
        self.store = DigikeyStore.from_store_code(store_code)
        self.parser = DigikeyParser(Digikey())
        self.client = DigikeyClient(self.store.frontpage_url, cache_dir)

        out_parts: ObjDb[Part] = ObjDb[Part]()
        self.uri_idx = out_parts.add_unique_index("uri", lambda p: p.uri)
        out_parts.add_index("pn", lambda p: [pn.value for pn in p.get_part_references()], multiple=True)
        out_parts.add_index("spn", lambda p: [pn.value for pn in p.get_spns()], multiple=True)
        self.out_parts = out_parts

        self.categories: Set[Category] = set()

    def pn_search(self, pn):
        return self.pn_spn_search(pn, False)

    def spn_search(self, pn):
        return self.pn_spn_search(pn, True)

    def _collect_categories(self, product: DigikeyProduct):
        self.categories.update(product.categories)

    def pn_spn_search(self, pn, is_spn):
        s = "Searching for '{}'".format(pn)
        print(s, file=self.log)
        print("-" * len(s) + "\n", file=self.log)

        out_part = None
        result = None

        text = self.client.search(pn)
        response = self.parser.parse_string(self.client.baseurl, text)

        if response.response_type == SearchResponseTypes.EXCEPTION:
            result = "exception"
        elif response.response_type == SearchResponseTypes.SINGLE:
            product = response.products[0]
            out_part = resolved(self.store.url, product)
            self._collect_categories(product)

            out_pn = out_part.get_exactly_one_spn() if is_spn else out_part.get_exactly_one_mpn()
            out_pn = out_pn.valueProp

            if out_pn != pn:
                raise EeException("Internal error: returned PN/SPN didn't match the expected part: "
                                  "query: {} vs returned: {}".format(pn, out_pn))

            result = "found"
        elif response.response_type == SearchResponseTypes.MANY:
            # TODO: order by spn/mpn so the output is consistent between runs.
            print("Got many responses:\n", file=self.log)
            from ee._utils import gen_rst_table
            data = [[p.part_number, p.mpn] for p in sorted(response.products, key=lambda x: x.part_number)]
            print(gen_rst_table(["DK", "MPN"], data), file=self.log)

            # Find the products with an exact match. Digikey uses a "contains" search, so a query for "FOO"
            # will return "FOO", "FOOT" and "AFOO".
            def get_field(p):
                return p.part_number if is_spn else p.mpn

            filtered_products = [p for p in response.products if get_field(p) == pn]

            if len(filtered_products) == 0:
                print("No items matched the query.\n", file=self.log)
                result = "not-found"
            else:
                part = sorted(filtered_products, key=lambda p: p.part_number)[0]

                print("Found {} matching products; since their facts are the same, picked ``{}`` for more info.\n".
                      format(len(filtered_products), part.part_number), file=self.log)

                page = self.client.get_for_product_url(part.url, part.part_number)
                response = self.parser.parse_string(self.client.baseurl, page)

                if response.response_type == SearchResponseTypes.SINGLE:
                    product = response.products[0]
                    out_part = resolved(self.store.url, product)
                    self._collect_categories(product)
                    result = "found"
                else:
                    print("Unable to narrow down the part, got {} new products. Giving up.\n".
                          format(len(response.products)), file=self.log)
                    result = "many"
        elif response.response_type == SearchResponseTypes.TOO_MANY:
            result = "too-many"
        elif response.response_type == SearchResponseTypes.NO_MATCHES:
            result = "not-found"

        if out_part:
            if out_part.uri not in self.uri_idx:
                self.out_parts.add(out_part)

        print("Result: {}".format(result), file=self.log)
        print("", file=self.log)

        return response.response_type


def resolved(supplier, p: DigikeyProduct) -> Part:
    """Convert a DigikeyProduct into a Part with links, part numbers, facts and price breaks."""
    # TODO: fix uri
    xml = types.Part(uri=p.uri, supplier=supplier, description=p.description,
                     category=types.CategoryList(), links=types.LinkList(), facts=types.FactList(),
                     references=types.ReferenceList())
    part = Part(xml)

    if p.url:
        part.get_links().append(types.Link(url=p.url, relation="canonical", media_type="text/html"))

    if len(p.categories):
        xml.set_category(p.categories[-1].uri)

    for d in p.documents:
        title = "{}: {}".format(d.section, d.title)
        relations = ["http://purl.org/ee/link-relation#documentation"]

        if "datasheet" in d.classes:
            relations.append(uris.make_link_rel("datasheet"))

        part.get_links().append(types.Link(url=d.url, relation=" ".join(relations), media_type="text/html",
                                           title=title))

    part.add_spn(p.part_number)

    if p.mpn:
        part.add_mpn(p.mpn)

    facts: List[types.Fact] = xml.factsProp.factProp
    for a in p.attributes:
        key = make_digikey_fact_key(a.attribute_type.id)
        facts.append(types.Fact(key=key, label=a.attribute_type.label, value=a.value))

    if len(p.price_breaks):
        xml.price_breaksProp = types.PriceBreakList()
        price_breaks: List[types.PriceBreak] = xml.price_breaksProp.price_break
        for pb in p.price_breaks:
            amount = types.Amount(value=str(pb.per_piece_price.amount), currency=pb.per_piece_price.currency)
            price_breaks.append(types.PriceBreak(pb.quantity, amount=amount))

    return part


def search_parts(in_path: Path, out_path: Path, log_path: Path, cache_dir: Path, store_code):
    mk_parents(log_path)
    with log_path.open("w") as log:
        run_search_parts(in_path, out_path, log, cache_dir, store_code)


def run_search_parts(in_path: Path, out_path: Path, log, cache_dir: Path, store_code):
    """Collect MPN/SPN queries from the input db, run them against Digikey and save the resolved parts."""
    in_db = load_db(in_path)

    engine = QueryEngine(cache_dir, log, store_code)

    pn_qs: MutableSet[QueryByPn] = set()
    spn_qs: MutableSet[QueryBySpn] = set()
    attr_qs: MutableSet[QueryByAttributes] = set()

    for xml in in_db.iterparts():
        part = Part(xml)

        found_any = False
        for pn in part.get_mpns():
            pn_qs.add(QueryByPn(pn.valueProp))
            found_any = True

        for pn in part.get_spns():
            spn_qs.add(QueryBySpn(pn.valueProp))
            found_any = True

        if not found_any:
            print("Could not find anything to search by; checked manufacturer product numbers and "
                  "supplier product numbers")

    pn_queries = list(sorted(pn_qs))
    spn_queries = list(sorted(spn_qs))

    s = "Manufacturer product number searches"
    print("{}\n{}\n".format(s, "=" * len(s)), file=log)
    print("Executing {} manufacturer product number searches:\n\n".format(len(pn_queries)), file=log)

    exception = False
    mpn_results = {}
    for q in pn_queries:
        res = engine.pn_search(q.pn)
        if res in mpn_results:
            mpn_results[res] = mpn_results[res] + 1
        else:
            mpn_results[res] = 1

        if res == SearchResponseTypes.EXCEPTION:
            exception = True
            break

    spn_results = {}
    if not exception:
        if len(spn_queries) == 0:
            print("Executing no supplier product number searches.\n\n", file=log)
        else:
            print("Executing {} supplier product number searches:\n\n".format(len(spn_queries)), file=log)

        for q in spn_queries:
            res = engine.spn_search(q.spn)
            if res in spn_results:
                spn_results[res] = spn_results[res] + 1
            else:
                spn_results[res] = 1

            if res == SearchResponseTypes.EXCEPTION:
                exception = True
                break

    header = "Statistics"
    print("{}\n{}\n".format(header, "=" * len(header)), file=log)

    for name, results in [("MPN", mpn_results), ("SPN", spn_results)]:
        print("{} Searches:\n".format(name), file=log)
        for res, count in sorted(results.items(), key=lambda x: x[0].value):
            print("* {}: {}".format(res.name, count), file=log)
        print("", file=log)

    part_db = PartDb()
    for part in engine.out_parts:
        part_db.add_entry(part, True)

    part_db.categories = engine.categories

    save_db(out_path, part_db, sort=True)