"""Scraping client and data model for the Digi-Key product search pages."""

import enum
import re
import urllib.parse
from functools import total_ordering
from typing import List, Optional

import requests
from cachecontrol import CacheControl
from cachecontrol.caches.file_cache import FileCache
from cachecontrol.heuristics import ExpiresAfter
from lxml import html

# Prefix applied to site-relative URLs.
_BASE_URL = "https://www.digikey.com"


def normalize_filename(part):
    """Return *part* with every ``/`` and space replaced by an underscore."""
    return part.replace('/', '_').replace(' ', '_')


def _clean(s):
    """Strip *s*; return ``None`` for ``None`` or a blank string."""
    if s is None:
        return None
    s = s.strip()
    return s or None


def _to_string(e):
    """Return the concatenated text of element *e* and its descendants, stripped."""
    return "".join(e.itertext()).strip()


def _parse_int(s):
    """Parse *s* as an int, ignoring ``,`` and ``.`` separators.

    Raises:
        ValueError: if the remaining text is not an integer.
    """
    return int(s.replace(',', '').replace('.', ''))


def _to_int(s):
    """Like :func:`_parse_int`, but return ``None`` instead of raising ``ValueError``."""
    try:
        return _parse_int(s)
    except ValueError:
        return None


def _id_from_url(url):
    """Return the last all-digit path segment prefix of *url* as a string, or ``None``."""
    if url is None:
        return None
    m = re.search(r".*/([0-9]+)", url)
    return m.group(1) if m else None


def _first(collection, default=None):
    """Return the first item of *collection*, or *default* when it is empty."""
    return next(iter(collection), default)


class Digikey(object):
    """Registry of the attribute types seen so far, keyed by attribute id."""

    def __init__(self):
        self.attribute_types = {}

    def get_attribute_type(self, key, label):
        """Return the attribute type registered under *key*, creating it on first use.

        *label* is only used when a new type has to be created.
        """
        try:
            return self.attribute_types[key]
        except KeyError:
            a = DigikeyAttributeType(key, label)
            self.attribute_types[key] = a
            return a


@total_ordering
class DigikeyProduct(object):
    """A single product; identity and ordering are based on ``part_number``."""

    def __init__(self, part_number, mpn, attributes=None, categories=None):
        self.part_number = _clean(part_number)
        self.mpn = _clean(mpn)
        self.attributes = attributes or []
        self.categories = categories or []
        self.quantity_available = None
        self.description = None

        # Both identifiers are mandatory (blank strings were cleaned to None).
        assert self.part_number
        assert self.mpn

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, DigikeyProduct):
            return NotImplemented
        return self.part_number == other.part_number

    def __lt__(self, other: object) -> bool:
        if not isinstance(other, DigikeyProduct):
            return NotImplemented
        return self.part_number < other.part_number

    def __hash__(self):
        return hash(self.part_number)

    def to_yaml(self):
        """Return a plain dict (part number, mpn, attributes) suitable for YAML dumping."""
        doc = {"part_number": self.part_number}
        if self.mpn:
            doc["mpn"] = self.mpn
        doc["attributes"] = [
            {
                "type": {"id": a.attribute_type.id, "label": a.attribute_type.label},
                "value": a.value,
            }
            for a in self.attributes
        ]
        return doc


class DigikeyAttributeType(object):
    """An attribute kind, identified by ``id`` and described by ``label``."""

    def __init__(self, _id, label):
        self.id = _id
        self.label = label

        assert self.id
        assert self.label


class DigikeyAttributeValue(object):
    """A value paired with the :class:`DigikeyAttributeType` it belongs to."""

    def __init__(self, value, attribute_type):
        self.value = value
        self.attribute_type = attribute_type

        assert self.value
        assert self.attribute_type


@total_ordering
class DigikeyProductCategory(object):
    """A node in the product category tree.

    Equality and hashing use ``id``; ordering uses ``label``.
    """

    def __init__(self, _id, label, digikey_url=None, parent=None):
        self.id = _clean(_id)
        self.label = _clean(label)
        # Site-relative URLs are made absolute; None and absolute URLs are kept.
        if digikey_url is None or digikey_url.startswith("http"):
            self.digikey_url = digikey_url
        else:
            self.digikey_url = _BASE_URL + digikey_url
        self.parent = parent  # type: Optional[DigikeyProductCategory]
        self.subCategories = []  # type: List[DigikeyProductCategory]

        assert self.id
        assert self.label

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, DigikeyProductCategory):
            return NotImplemented
        return self.id == other.id

    def __lt__(self, other: object) -> bool:
        if not isinstance(other, DigikeyProductCategory):
            return NotImplemented
        return self.label < other.label

    def __hash__(self):
        # Defining __eq__ alone makes a class unhashable; hash on the same key
        # that __eq__ compares.
        return hash(self.id)

    def add_sub_category(self, _id, label, digikey_url):
        """Create a child category of this one and append it to ``subCategories``."""
        sc = DigikeyProductCategory(_id, label, digikey_url=digikey_url, parent=self)
        self.subCategories.append(sc)

    def find_sub_category_by_label(self, label):
        """Return the first direct sub-category whose label equals *label*, or ``None``."""
        return next((sc for sc in self.subCategories if sc.label == label), None)


class SearchResponseTypes(enum.Enum):
    """Outcome kinds reported by :meth:`DigikeyClient.search`."""

    MANY = 1
    SINGLE = 2
    TOO_MANY = 3
    NO_MATCHES = 4


class DigikeySearchResponse(object):
    """Result of a search: a match count, an outcome type and the parsed products."""

    def __init__(self, count: int, response_type: SearchResponseTypes):
        self.count = count
        self.response_type = response_type
        self.products = list()

    def append(self, product):
        """Add *product* to ``products``."""
        self.products.append(product)


class DigikeyClient(object):
    """Fetches Digi-Key search pages through a caching session and parses them."""

    def __nop(self, message):
        """Default ``on_download`` callback: ignore the message."""
        pass

    def __init__(self, digikey: Digikey, on_download=None):
        """
        Args:
            digikey: registry used to look up / create attribute types.
            on_download: optional callable taking one message string; it is
                invoked before every request.
        """
        self.digikey = digikey
        self.on_download = on_download or self.__nop
        # Responses go through cachecontrol with a 'digikey_cache' file cache
        # and an ExpiresAfter(days=1) heuristic.
        cache = FileCache('digikey_cache', forever=True)
        self.sess = CacheControl(requests.Session(), cache=cache, heuristic=ExpiresAfter(days=1))

    def _req(self, url, params=None):
        """GET *url* (made absolute if site-relative) and return the response.

        The full URL including the encoded query string is reported through
        ``on_download`` before the request is issued.
        """
        if not url.startswith("http"):
            url = _BASE_URL + url
        s = "" if not params else "?" + urllib.parse.urlencode(params)
        self.on_download("Downloading {}".format(url + s))
        return self.sess.get(url, params=params)

    def _search_process_single_result(self, tree: html.HtmlElement) -> Optional[DigikeyProduct]:
        """Parse a product detail page.

        Returns:
            The product, or ``None`` when the page carries no part number or
            manufacturer part number.
        """
        attributes = []
        categories = []

        # When several elements match, the last one wins.
        part_number = mpn = None
        for n in tree.xpath("//*[@itemprop='productID' and @content]"):
            part_number = n.get("content")
            part_number = part_number.replace('sku:', '')
        for n in tree.xpath("//*[@itemprop='name' and @content]"):
            mpn = n.get("content")

        for tr in tree.xpath("//table[@id='prod-att-table']/tr[not(@id='prod-att-title-row')]"):
            tds = tr.xpath("th|td")
            if len(tds) != 3:
                continue
            # .text is None when a cell has no direct text; treat that as blank
            # instead of failing on None.strip().
            label = (tds[0].text or "").strip()
            value = (tds[1].text or "").strip()
            if not label or not value:
                continue
            checkbox = tds[2].xpath("input[@type='checkbox' and @name]")
            if not checkbox:
                continue
            # The checkbox name is the attribute id with a 'pv' marker in it.
            attribute_type_id = _to_int(checkbox[0].get("name").replace('pv', ''))
            if attribute_type_id:
                a_type = self.digikey.get_attribute_type(attribute_type_id, label)
                attributes.append(DigikeyAttributeValue(value, a_type))

        if part_number and mpn:
            p = DigikeyProduct(part_number, mpn, attributes, categories)
            for n in tree.xpath("//*[@itemprop='description']"):
                p.description = _to_string(n)
            return p

        return None

    def _search_process_multiple_results(self, tree: html.HtmlElement, res: DigikeySearchResponse):
        """Append one product to *res* per result entry that has both identifiers.

        Returns:
            The number of schema.org Product elements found, including those
            that were skipped.
        """
        products = tree.xpath("//*[@itemtype='http://schema.org/Product']")

        for product in products:
            # The leading '.' keeps the lookup inside this product element; a
            # bare '//' is evaluated from the document root and would return
            # the first product's data for every entry.
            part_number = _first(product.xpath(".//*[@itemprop='productid' and @content]"))
            mpn = _first(product.xpath(".//*[@itemprop='name']"))

            if part_number is not None and mpn is not None:
                res.append(DigikeyProduct(
                    part_number.get("content").strip().replace('sku:', ''),
                    mpn.text))

        return len(products)

    def search(self, query: str, page_size=10) -> DigikeySearchResponse:
        """Search for *query* and classify the outcome.

        Returns:
            A response whose ``response_type`` is

            * ``MANY`` - a result table was found; ``products`` holds its rows,
            * ``TOO_MANY`` - a match count but no result table was found,
            * ``SINGLE`` - the page parsed as one product detail page,
            * ``NO_MATCHES`` - nothing could be parsed (``count`` is 0).
        """
        # Example: http://www.digikey.com/products/en?x=0&y=0&lang=en&site=us&keywords=553-2320-1-ND
        params = {'lang': 'en', 'site': 'us', 'keywords': query, 'pageSize': str(page_size)}
        page = self._req(_BASE_URL + "/products/en", params=params)
        tree = html.fromstring(page.content)

        count = _first([_parse_int(e.text) for e in tree.xpath("//span[@id='matching-records-count']") if e.text])

        if count:
            product_table = _first(tree.xpath("//table[@id='productTable']"))
            if product_table is None:
                # If the search matches multiple product categories the user
                # has to select the appropriate category first.
                return DigikeySearchResponse(count, SearchResponseTypes.TOO_MANY)

            res = DigikeySearchResponse(count, SearchResponseTypes.MANY)
            self._search_process_multiple_results(product_table, res)
            return res

        p = self._search_process_single_result(tree)
        if p is not None:
            res = DigikeySearchResponse(1, SearchResponseTypes.SINGLE)
            res.append(p)
            return res

        return DigikeySearchResponse(0, SearchResponseTypes.NO_MATCHES)