"""Search Digikey for the parts listed in a part database and save what was found."""
import functools
from pathlib import Path
from typing import List, MutableSet, Mapping

from ee.db import ObjDb
from ee.digikey import (
    Digikey,
    DigikeyParser,
    DigikeyClient,
    SearchResponseTypes,
    DigikeyProduct,
    DigikeyStore,
)
from ee.part import PartDb, load_db, save_db, Part
from ee.tools import mk_parents
from ee.xml import types, uris
from ee.xml.uris import make_digikey_fact_key

__all__ = ["search_parts"]


@functools.total_ordering
class QueryByPn(object):
    """A query keyed on a part number; equality, ordering and hash all use ``pn``."""

    def __init__(self, pn: str):
        self.pn = pn

    def __eq__(self, other):
        if not isinstance(other, QueryByPn):
            return NotImplemented
        return self.pn == other.pn

    def __lt__(self, other):
        if not isinstance(other, QueryByPn):
            return NotImplemented
        return self.pn < other.pn

    def __hash__(self):
        return hash(self.pn)


@functools.total_ordering
class QueryBySpn(object):
    """A query keyed on a supplier part number; equality, ordering and hash all use ``spn``."""

    def __init__(self, spn: str):
        self.spn = spn

    def __eq__(self, other):
        if not isinstance(other, QueryBySpn):
            return NotImplemented
        # Compare spn with spn. Instances have no ``pn`` attribute, so comparing against
        # ``other.pn`` raised AttributeError as soon as two equal queries met in a set.
        return self.spn == other.spn

    def __lt__(self, other):
        if not isinstance(other, QueryBySpn):
            return NotImplemented
        return self.spn < other.spn

    def __hash__(self):
        return hash(self.spn)


@functools.total_ordering
class QueryByAttributes(object):
    """A query keyed on a mapping of attributes.

    Mappings such as ``dict`` are neither hashable nor orderable, so hashing and ordering
    are done on the sorted list of ``(key, value)`` items. Equality still compares the
    mappings directly.
    """

    def __init__(self, attributes: Mapping[str, str]):
        self.attributes = attributes

    def _sorted_items(self):
        """Return the attribute items as a sorted list, used for ordering and hashing."""
        return sorted(self.attributes.items())

    def __eq__(self, other: "QueryByAttributes"):
        if not isinstance(other, QueryByAttributes):
            return NotImplemented
        return self.attributes == other.attributes

    def __lt__(self, other):
        if not isinstance(other, QueryByAttributes):
            return NotImplemented
        return self._sorted_items() < other._sorted_items()

    def __hash__(self):
        return hash(tuple(self._sorted_items()))


class QueryEngine(object):
    """Runs part number searches against a Digikey store and collects the resolved parts.

    Resolved parts are accumulated in ``out_parts``; progress is written to ``log``.
    """

    def __init__(self, cache_dir, log, store_code):
        """Set up the store, parser, client and the indexed collection of found parts.

        :param cache_dir: passed on to :class:`DigikeyClient`.
        :param log: file-like object that all progress output is printed to.
        :param store_code: passed to ``DigikeyStore.from_store_code`` to select the store.
        """
        self.log = log
        self.store = DigikeyStore.from_store_code(store_code)
        self.parser = DigikeyParser(Digikey())
        self.client = DigikeyClient(self.store.frontpage_url, cache_dir)

        out_parts: ObjDb[Part] = ObjDb[Part]()
        # The uri index is kept so a part that was already found is not added twice.
        self.uri_idx = out_parts.add_unique_index("uri", lambda p: p.uri)
        out_parts.add_index("pn", lambda p: [pn.value for pn in p.get_part_references()], multiple=True)
        out_parts.add_index("spn", lambda p: [pn.value for pn in p.get_spns()], multiple=True)
        self.out_parts = out_parts

    def pn_search(self, pn):
        """Search for ``pn``, matching multi-hit results on the product's ``mpn``."""
        return self.pn_spn_search(pn, False)

    def spn_search(self, pn):
        """Search for ``pn``, matching multi-hit results on the product's ``part_number``."""
        return self.pn_spn_search(pn, True)

    def pn_spn_search(self, pn, is_spn):
        """Search for ``pn`` and add the resolved part, if any, to ``out_parts``.

        :param pn: the number to search for.
        :param is_spn: when true, candidates of a multi-hit response are matched on
            ``part_number``; otherwise on ``mpn``.
        :return: the ``response_type`` of the last parsed response. When a multi-hit
            response was narrowed down this is the type of the product page response,
            not of the initial search.
        """
        s = "Searching for '{}'".format(pn)
        print(s, file=self.log)
        print("=" * len(s) + "\n", file=self.log)

        out_part = None
        result = None

        text = self.client.search(pn)
        response = self.parser.parse_string(self.client.baseurl, text)

        if response.response_type == SearchResponseTypes.EXCEPTION:
            result = "exception"
        elif response.response_type == SearchResponseTypes.SINGLE:
            out_part = resolved(self.store.url, response.products[0])
            result = "found"
        elif response.response_type == SearchResponseTypes.MANY:
            # TODO: order by spn/mpn so the output is consistent between runs.
            print("Got many responses:\n", file=self.log)

            from ee._utils import gen_rst_table
            data = [[p.part_number, p.mpn] for p in sorted(response.products, key=lambda x: x.part_number)]
            print(gen_rst_table(["DK", "MPN"], data), file=self.log)

            # find those with an exact match. Digikey uses a "contains" search so a query for "FOO" will return "FOO",
            # "FOOT" and "AFOO".
            def get_field(p):
                return p.part_number if is_spn else p.mpn

            filtered_products = [p for p in response.products if get_field(p) == pn]

            if not filtered_products:
                print("No items matched the query.", file=self.log)
                result = "not-found"
            else:
                # Pick the candidate with the lowest part number so the choice is deterministic.
                part = min(filtered_products, key=lambda p: p.part_number)

                print("Found {} matching products, but their facts are the same so picked ``{}`` for more info.".format(
                    len(filtered_products), part.part_number), file=self.log)

                # Fetch the product page of the chosen candidate and parse that instead.
                page = self.client.get_for_product_url(part.url, part.part_number)
                response = self.parser.parse_string(self.client.baseurl, page)
                if response.response_type == SearchResponseTypes.SINGLE:
                    out_part = resolved(self.store.url, response.products[0])
                    result = "found"
                else:
                    print("Unable to narrow down the part, got {} new products. Giving up.".format(
                        len(response.products)), file=self.log)
                    result = "many"
        elif response.response_type == SearchResponseTypes.TOO_MANY:
            result = "too-many"
        elif response.response_type == SearchResponseTypes.NO_MATCHES:
            result = "not-found"

        if out_part:
            if out_part.uri not in self.uri_idx:
                self.out_parts.add(out_part)

        print("\nResult: {}".format(result), file=self.log)
        print("", file=self.log)

        return response.response_type


def resolved(supplier, p: DigikeyProduct) -> Part:
    """Build a :class:`Part` from a Digikey product.

    Copies the product's url, documents, part numbers, attributes and price breaks into
    a new part XML object.

    :param supplier: value stored as the part's ``supplier``.
    :param p: the product to convert.
    :return: the new part.
    """
    # TODO: fix uri
    xml = types.Part(uri="https://digikey.com/pn#{}".format(p.part_number),
                     supplier=supplier,
                     description=p.description,
                     links=types.LinkList(),
                     facts=types.FactList(),
                     references=types.ReferenceList())
    part = Part(xml)

    if p.url:
        part.get_links().append(types.Link(url=p.url, relation="canonical", media_type="text/html"))

    for d in p.documents:
        title = "{}: {}".format(d.section, d.title)
        relations = ["http://purl.org/ee/link-relation#documentation"]
        if "datasheet" in d.classes:
            relations.append(uris.make_link_rel("datasheet"))
        # Multiple relations are stored as a single space separated string.
        part.get_links().append(
            types.Link(url=d.url, relation=" ".join(relations), media_type="text/html", title=title))

    part.add_spn(p.part_number)

    if p.mpn:
        part.add_mpn(p.mpn)

    facts: List[types.Fact] = xml.factsProp.factProp

    for a in p.attributes:
        key = make_digikey_fact_key(a.attribute_type.id)
        facts.append(types.Fact(key=key, label=a.attribute_type.label, value=a.value))

    # The price break list is only created when the product has price breaks.
    if p.price_breaks:
        xml.price_breaksProp = types.PriceBreakList()

        price_breaks: List[types.PriceBreak] = xml.price_breaksProp.price_break
        for pb in p.price_breaks:
            amount = types.Amount(value=str(pb.per_piece_price.amount), currency=pb.per_piece_price.currency)
            price_breaks.append(types.PriceBreak(pb.quantity, amount=amount))

    return part


def search_parts(in_path: Path, out_path: Path, log_path: Path, cache_dir: Path, store_code):
    """Run :func:`run_search_parts`, writing its log to ``log_path``.

    ``mk_parents`` is called on ``log_path`` before the file is opened for writing.
    """
    mk_parents(log_path)

    with log_path.open("w") as log:
        run_search_parts(in_path, out_path, log, cache_dir, store_code)


def run_search_parts(in_path: Path, out_path: Path, log, cache_dir: Path, store_code):
    """Search for every part number found in ``in_path`` and save the results to ``out_path``.

    Manufacturer part numbers are searched first, then supplier part numbers. Searching
    stops at the first response of type ``SearchResponseTypes.EXCEPTION``; whatever was
    found up to that point is still saved.

    :param in_path: part database to read the part numbers from.
    :param out_path: where the database of found parts is saved.
    :param log: file-like object for progress output.
    :param cache_dir: passed on to :class:`QueryEngine`.
    :param store_code: passed on to :class:`QueryEngine`.
    """
    in_db = load_db(in_path)

    engine = QueryEngine(cache_dir, log, store_code)

    # Sets remove duplicate numbers shared between parts.
    pn_qs: MutableSet[QueryByPn] = set()
    spn_qs: MutableSet[QueryBySpn] = set()

    for xml in in_db.iterparts():
        part = Part(xml)

        found_any = False

        for pn in part.get_mpns():
            pn_qs.add(QueryByPn(pn.valueProp))
            found_any = True

        for pn in part.get_spns():
            spn_qs.add(QueryBySpn(pn.valueProp))
            found_any = True

        if not found_any:
            # NOTE(review): this goes to stdout, unlike all other output which goes to the log - confirm intent.
            print("Could not find anything for search by, checked product numbers and supplier product numbers")

    # Sorted so the searches run in the same order on every run.
    pn_queries = sorted(pn_qs)
    spn_queries = sorted(spn_qs)

    print("Executing {} manufacturer product number searches\n\n".format(len(pn_queries)), file=log)
    exception = False
    for q in pn_queries:
        if engine.pn_search(q.pn) == SearchResponseTypes.EXCEPTION:
            exception = True
            break

    if not exception:
        print("Executing {} supplier product number searches\n\n".format(len(spn_queries)), file=log)
        for q in spn_queries:
            if engine.spn_search(q.spn) == SearchResponseTypes.EXCEPTION:
                break

    part_db = PartDb()
    for part in engine.out_parts:
        part_db.add_entry(part, True)

    save_db(out_path, part_db, sort=True)