"""Scraping client and small data model for the Digi-Key product search pages."""
import re
from functools import total_ordering
from typing import List, Optional, Union

import requests
from cachecontrol import CacheControl
from cachecontrol import CacheControlAdapter
from cachecontrol.caches.file_cache import FileCache
from cachecontrol.heuristics import ExpiresAfter
from lxml import html

# Prefix applied to site-relative URLs.
_DIGIKEY_BASE_URL = "https://www.digikey.com"


def normalize_filename(part):
    """Return *part* with every '/' and ' ' replaced by '_'."""
    return part.replace('/', '_').replace(' ', '_')


def _clean(s):
    """Strip *s*; return None when *s* is None or blank after stripping."""
    if s is None:
        return None
    s = s.strip()
    return s if s else None


def _to_string(e):
    """Return all text inside element *e* (descendants included), stripped."""
    return "".join(e.itertext()).strip()


def _to_int(s):
    """Convert *s* to int, returning None when it cannot be converted.

    TypeError is caught as well as ValueError so that ``None`` (or any other
    non-numeric object) yields None instead of raising.
    """
    try:
        return int(s)
    except (TypeError, ValueError):
        return None


def _id_from_url(url):
    """Return the digits following the last '/<digits>' in *url*, or None.

    The result is a string, not an int. None is returned when *url* is None
    or contains no '/<digits>' sequence.
    """
    if url is None:
        return None
    m = re.search(r".*/([0-9]+)", url)
    return m.group(1) if m else None


class Digikey(object):
    """Registry of attribute types, keyed by attribute type id."""

    def __init__(self):
        self.attribute_types = {}

    def get_attribute_type(self, id, label):
        """Return the attribute type registered under *id*, creating it on first use.

        *label* is only used when the type does not exist yet; an already
        registered type keeps its original label.
        """
        try:
            return self.attribute_types[id]
        except KeyError:
            a = DigikeyAttributeType(id, label)
            self.attribute_types[id] = a
            return a


@total_ordering
class DigikeyProduct(object):
    """A single product; identity, ordering and hash are based on part_number."""

    def __init__(self, part_number, mpn, attributes, categories):
        self.part_number = _clean(part_number)
        self.mpn = _clean(mpn)
        self.attributes = attributes
        self.categories = categories
        self.quantity_available = None
        self.description = None

        # Both identifiers must be non-blank after cleaning.
        assert self.part_number
        assert self.mpn

    def __eq__(self, other):
        # Defer to the other operand for foreign types instead of raising
        # AttributeError on a missing part_number.
        if not isinstance(other, DigikeyProduct):
            return NotImplemented
        return self.part_number == other.part_number

    def __lt__(self, other):
        if not isinstance(other, DigikeyProduct):
            return NotImplemented
        return self.part_number < other.part_number

    def __hash__(self):
        return hash(self.part_number)

    def to_yaml(self):
        """Return a plain dict with part number, MPN and attribute list."""
        yaml = {"part_number": self.part_number}
        if self.mpn:
            yaml["mpn"] = self.mpn
        yaml["attributes"] = [
            {
                "type": {"id": a.attribute_type.id, "label": a.attribute_type.label},
                "value": a.value,
            }
            for a in self.attributes
        ]
        return yaml


class DigikeyAttributeType(object):
    """An attribute kind identified by a truthy id and a truthy label."""

    def __init__(self, id, label):
        self.id = id
        self.label = label

        assert self.id
        assert self.label


class DigikeyAttributeValue(object):
    """A value paired with the DigikeyAttributeType it belongs to."""

    def __init__(self, value, attribute_type):
        self.value = value
        self.attribute_type = attribute_type

        assert self.value
        assert self.attribute_type


@total_ordering
class DigikeyProductCategory(object):
    """A node in a category tree.

    Equality and hash use ``id``; ordering uses ``label``.
    """

    def __init__(self, id, label, digikey_url=None, parent=None):
        self.id = _clean(id)
        self.label = _clean(label)
        # Site-relative URLs are made absolute; None and http(s) URLs are kept.
        if digikey_url is not None and not digikey_url.startswith("http"):
            digikey_url = _DIGIKEY_BASE_URL + digikey_url
        self.digikey_url = digikey_url
        self.parent = parent  # type: Optional[DigikeyProductCategory]
        self.subCategories = []  # type: List[DigikeyProductCategory]

        assert self.id is not None
        assert self.label is not None

    def __eq__(self, other):
        if not isinstance(other, DigikeyProductCategory):
            return NotImplemented
        return self.id == other.id

    def __lt__(self, other):
        if not isinstance(other, DigikeyProductCategory):
            return NotImplemented
        return self.label < other.label

    def __hash__(self):
        # Defining __eq__ alone would make instances unhashable in Python 3;
        # hash on the same field that __eq__ compares.
        return hash(self.id)

    def add_sub_category(self, id, label, digikey_url):
        """Create a child category, attach it to this one and return it."""
        sc = DigikeyProductCategory(id, label, digikey_url=digikey_url, parent=self)
        self.subCategories.append(sc)
        return sc

    def find_sub_category_by_label(self, label):
        """Return the first direct child whose label equals *label*, or None."""
        return next((sc for sc in self.subCategories if sc.label == label), None)


class DigikeySearchResponse(object):
    """A set of products collected from a multi-result search."""

    def __init__(self):
        self.products = set()

    def append(self, product):
        """Add *product* to the result set (duplicates collapse)."""
        self.products.add(product)


class DigikeyClient(object):
    """HTTP client that fetches Digi-Key pages through a file-backed cache."""

    def __nop(self, *args, **kwargs):
        # Default on_download callback. It must accept the message argument
        # that req() passes, otherwise a client built without a callback
        # raises TypeError on its first request.
        pass

    def __init__(self, digikey: Digikey, on_download=None):
        """Create a client.

        :param digikey: registry used to look up / create attribute types.
        :param on_download: optional callable invoked with a message string
            before each request; defaults to a no-op.
        """
        self.digikey = digikey
        self.on_download = on_download or self.__nop
        cache = FileCache('digikey_cache', forever=True)
        self.sess = CacheControl(requests.Session(), cache=cache, heuristic=ExpiresAfter(days=1))
        # Alternative wiring kept for reference:
        # adapter = CacheControlAdapter(cache=cache, heuristic=ExpiresAfter(days=1))
        # self.sess = requests.Session()
        # self.sess.mount('http://', adapter)
        # self.sess.mount('https://', adapter)

    def req(self, url, params=None):
        """GET *url* (made absolute if site-relative) and return the response.

        The on_download callback is called first with a human-readable
        message; the query string in that message is for display only and is
        not URL-encoded.
        """
        if not url.startswith("http"):
            url = _DIGIKEY_BASE_URL + url
        query = ""
        if params:
            # format() rather than '+' so non-string values cannot break the message.
            query = "?" + "&".join("{}={}".format(k, v) for k, v in params.items())
        self.on_download("Downloading {}".format(url + query))
        return self.sess.get(url, params=params)

    def _search_process_single_result(self, url: str, tree: html) -> Optional[DigikeyProduct]:
        """Parse a product page into a DigikeyProduct.

        Returns None when the page carries no part number or no MPN.
        *url* is accepted for interface compatibility and is not used.
        """
        attributes = []
        categories = []

        part_number = mpn = None
        for n in tree.xpath("//*[@itemprop='productID' and @content]"):
            part_number = n.get("content")
            part_number = part_number.replace('sku:', '')

        for n in tree.xpath("//*[@itemprop='name' and @content]"):
            mpn = n.get("content")

        for tr in tree.xpath("//table[@id='prod-att-table']/tr[not(@id='prod-att-title-row')]"):
            tds = tr.xpath("th|td")
            if len(tds) != 3:
                continue
            # .text is None when a cell starts with a child element; treat
            # that as empty so the row is skipped instead of crashing.
            label = (tds[0].text or "").strip()
            value = (tds[1].text or "").strip()
            if not label or not value:
                continue

            checkbox = tds[2].xpath("input[@type='checkbox' and @name]")
            if not checkbox:
                continue
            # The checkbox name is the attribute type id prefixed with 'pv'.
            attribute_type_id = _to_int(checkbox[0].get("name").replace('pv', ''))
            if attribute_type_id:
                a_type = self.digikey.get_attribute_type(attribute_type_id, label)
                attributes.append(DigikeyAttributeValue(value, a_type))

        if part_number and mpn:
            p = DigikeyProduct(part_number, mpn, attributes, categories)
            for n in tree.xpath("//*[@itemprop='description']"):
                p.description = _to_string(n)
            return p

        return None

    def _search_process_multiple_results(self, tree: html, res: DigikeySearchResponse):
        """Search each product id listed on a result page and collect into *res*.

        Returns the number of product ids found on the page.
        """
        # NOTE(review): this matches itemprop='productid' (lower case) while the
        # single-result parser matches 'productID'; XPath string comparison is
        # case-sensitive -- confirm the listing page really uses this spelling.
        product_ids = [e.get("content").strip().replace('sku:', '')
                       for e in tree.xpath("//*[@itemprop='productid' and @content]")]

        for product_id in product_ids:
            found = self.search(product_id)
            if found is None:
                # search() yields None for a page without part number/MPN.
                continue
            if isinstance(found, DigikeyProduct):
                res.append(found)
            else:
                for p in found.products:
                    res.append(p)

        return len(product_ids)

    def search(self, query: str) -> Union[DigikeyProduct, DigikeySearchResponse, None]:
        """Search for *query*.

        Returns a DigikeyProduct (or None if it cannot be parsed) when the
        page has no non-zero matching-records count, otherwise a
        DigikeySearchResponse holding the products found via the listing.
        """
        page_size = 10

        # http://www.digikey.com/products/en?x=0&y=0&lang=en&site=us&keywords=553-2320-1-ND
        params = {'lang': 'en', 'site': 'us', 'keywords': query, 'pageSize': str(page_size)}
        page = self.req("https://www.digikey.com/products/en", params=params)
        tree = html.fromstring(page.content)

        counts = (int(e.text) for e in tree.xpath("//span[@id='matching-records-count']") if e.text)
        count = next(counts, 0)

        if count == 0:
            return self._search_process_single_result(page.url, tree)

        res = DigikeySearchResponse()
        self._search_process_multiple_results(tree, res)
        return res