# Provenance (reconstructed from a git patch):
#   commit:  d59fb211556cd9b5a2bc028c5cf8a37b891cbfb3
#   author:  Trygve Laugstøl
#   date:    Sun, 3 Sep 2017 11:21:17 +0200
#   subject: o Adding tools to download facts about parts from Digi-Key.
#   file:    src/ee/digikey/__init__.py (new file)
"""Tools to download facts about parts from Digi-Key."""
import re
from functools import total_ordering
from typing import List
from typing import Optional
from typing import Union

import requests
from cachecontrol import CacheControl
from cachecontrol import CacheControlAdapter
from cachecontrol.caches.file_cache import FileCache
from cachecontrol.heuristics import ExpiresAfter
from lxml import html

# Prefix applied to site-relative URLs.
_BASE_URL = "https://www.digikey.com"


def normalize_filename(part):
    """Return *part* with every '/' and ' ' replaced by '_'."""
    return part.replace('/', '_').replace(' ', '_')


def _clean(s):
    """Strip *s*; return None when *s* is None or blank after stripping."""
    if s is None:
        return None
    s = s.strip()
    return s if s else None


def _to_string(e):
    """Return the concatenated text of element *e* and its descendants, stripped."""
    return "".join(e.itertext()).strip()


def _to_int(s):
    """Convert *s* to int; return None when it cannot be converted.

    ``TypeError`` is caught as well so that ``_to_int(None)`` yields None
    instead of raising.
    """
    try:
        return int(s)
    except (TypeError, ValueError):
        return None


def _id_from_url(url):
    """Return the digits following the last '/<digits>' in *url*, or None.

    The result is a string, not an int. None is returned for a None URL or
    when no such path segment exists.
    """
    if url is None:
        return None
    m = re.search(r".*/([0-9]+)", url)
    return m.group(1) if m else None


class Digikey(object):
    """Registry of the attribute types seen so far, keyed by id."""

    def __init__(self):
        self.attribute_types = {}

    def get_attribute_type(self, id, label):
        """Return the attribute type registered under *id*, creating it if new.

        *label* is only used when the type is created; an already registered
        type keeps its original label.
        """
        try:
            return self.attribute_types[id]
        except KeyError:
            a = DigikeyAttributeType(id, label)
            self.attribute_types[id] = a
            return a


@total_ordering
class DigikeyProduct(object):
    """A product, identified and ordered by its part number."""

    def __init__(self, part_number, mpn, attributes, categories):
        self.part_number = _clean(part_number)
        self.mpn = _clean(mpn)
        self.attributes = attributes
        self.categories = categories
        self.quantity_available = None
        self.description = None

        assert self.part_number
        assert self.mpn

    def __eq__(self, other):
        # type: (DigikeyProduct, object) -> bool
        if not isinstance(other, DigikeyProduct):
            # Let Python try the reflected operation / fall back to identity
            # instead of failing with AttributeError on foreign types.
            return NotImplemented
        return self.part_number == other.part_number

    def __lt__(self, other):
        # type: (DigikeyProduct, object) -> bool
        if not isinstance(other, DigikeyProduct):
            return NotImplemented
        return self.part_number < other.part_number

    def __hash__(self):
        # Must agree with __eq__, which compares part numbers only.
        return hash(self.part_number)

    def to_yaml(self):
        """Return a plain dict with the part number, mpn and attributes."""
        yaml = {"part_number": self.part_number}
        if self.mpn:
            yaml["mpn"] = self.mpn
        yaml["attributes"] = [
            {"type": {"id": a.attribute_type.id, "label": a.attribute_type.label}, "value": a.value}
            for a in self.attributes]
        return yaml


class DigikeyAttributeType(object):
    """An attribute type: a non-empty id and a non-empty label."""

    def __init__(self, id, label):
        self.id = id
        self.label = label

        assert self.id
        assert self.label


class DigikeyAttributeValue(object):
    """A value together with the DigikeyAttributeType it belongs to."""

    def __init__(self, value, attribute_type):
        self.value = value
        self.attribute_type = attribute_type

        assert self.value
        assert self.attribute_type


@total_ordering
class DigikeyProductCategory(object):
    """A product category; equality is by id, ordering is by label."""

    def __init__(self, id, label, digikey_url=None, parent=None):
        self.id = _clean(id)
        self.label = _clean(label)
        # Site-relative URLs are made absolute; None and http(s) URLs are kept.
        if digikey_url is None or digikey_url.startswith("http"):
            self.digikey_url = digikey_url
        else:
            self.digikey_url = _BASE_URL + digikey_url
        self.parent = parent  # type: DigikeyProductCategory
        self.subCategories = []  # type: List[DigikeyProductCategory]

        assert self.id is not None
        assert self.label is not None

    def __eq__(self, other):
        # type: (DigikeyProductCategory, object) -> bool
        if not isinstance(other, DigikeyProductCategory):
            return NotImplemented
        return self.id == other.id

    def __lt__(self, other):
        # type: (DigikeyProductCategory, object) -> bool
        if not isinstance(other, DigikeyProductCategory):
            return NotImplemented
        return self.label < other.label

    def __hash__(self):
        # Defining __eq__ alone makes a class unhashable on Python 3; hash on
        # the same key that __eq__ compares.
        return hash(self.id)

    def add_sub_category(self, id, label, digikey_url):
        """Create a sub category with this category as parent and store it."""
        sc = DigikeyProductCategory(id, label, digikey_url=digikey_url, parent=self)
        self.subCategories.append(sc)

    def find_sub_category_by_label(self, label):
        """Return the first sub category whose label equals *label*, or None."""
        return next((sc for sc in self.subCategories if sc.label == label), None)


class DigikeySearchResponse(object):
    """The set of products found by a search."""

    def __init__(self):
        self.products = set()

    def append(self, product):
        """Add *product* to the response (duplicates collapse, it is a set)."""
        self.products.add(product)


class DigikeyClient(object):
    """Downloads and parses Digi-Key product pages.

    Requests go through a CacheControl session backed by a FileCache in the
    ``digikey_cache`` directory, configured with ``ExpiresAfter(days=1)``.
    """

    def __nop(self, *args, **kwargs):
        # Default on_download callback. It has to accept the message that
        # req() passes; a zero-argument version raises TypeError on every
        # request made without a user supplied callback.
        pass

    def __init__(self, digikey: Digikey, on_download=None):
        """
        :param digikey: registry used to look up / create attribute types.
        :param on_download: optional callable invoked with one message string
            before each request.
        """
        self.digikey = digikey
        self.on_download = on_download or self.__nop
        cache = FileCache('digikey_cache', forever=True)
        self.sess = CacheControl(requests.Session(), cache=cache, heuristic=ExpiresAfter(days=1))

        # Alternative wiring, kept for reference:
        # adapter = CacheControlAdapter(cache=cache, heuristic=ExpiresAfter(days=1))
        # self.sess = requests.Session()
        # self.sess.mount('http://', adapter)
        # self.sess.mount('https://', adapter)

    def req(self, url, params=None):
        """GET *url* (made absolute if site-relative) and return the response."""
        if not url.startswith("http"):
            url = _BASE_URL + url
        # This query string is only for the progress message; the session
        # builds the real one from params.
        query = "?" + "&".join("{}={}".format(k, v) for k, v in params.items()) if params else ""
        self.on_download("Downloading {}".format(url + query))
        return self.sess.get(url, params=params)

    def _search_process_single_result(self, url: str, tree: html.HtmlElement) -> Optional[DigikeyProduct]:
        """Parse a product page; return None if it lacks a part number or mpn.

        *url* is accepted but not used.
        """
        attributes = []
        categories = []

        # When several elements match, the last one wins.
        part_number = mpn = None
        for n in tree.xpath("//*[@itemprop='productID' and @content]"):
            part_number = n.get("content").replace('sku:', '')
        for n in tree.xpath("//*[@itemprop='name' and @content]"):
            mpn = n.get("content")

        for tr in tree.xpath("//table[@id='prod-att-table']/tr[not(@id='prod-att-title-row')]"):
            tds = tr.xpath("th|td")
            if len(tds) != 3:
                continue

            # .text is None when a cell starts with a child element; _clean
            # maps both None and blank text to None so such rows are skipped
            # instead of raising AttributeError.
            label = _clean(tds[0].text)
            value = _clean(tds[1].text)
            if label is None or value is None:
                continue

            checkbox = tds[2].xpath("input[@type='checkbox' and @name]")
            if not checkbox:
                continue
            attribute_type_id = _to_int(checkbox[0].get("name").replace('pv', ''))

            if attribute_type_id:
                a_type = self.digikey.get_attribute_type(attribute_type_id, label)
                attributes.append(DigikeyAttributeValue(value, a_type))

        # Clean before checking: DigikeyProduct asserts on the cleaned values,
        # so whitespace-only content must count as missing here.
        part_number = _clean(part_number)
        mpn = _clean(mpn)
        if part_number is None or mpn is None:
            return None

        p = DigikeyProduct(part_number, mpn, attributes, categories)
        for n in tree.xpath("//*[@itemprop='description']"):
            p.description = _to_string(n)
        return p

    def _search_process_multiple_results(self, tree: html.HtmlElement, res: DigikeySearchResponse):
        """Search for every product id listed in *tree* and collect the hits in *res*.

        Returns the number of product ids found on the page.
        """
        # NOTE(review): this matches itemprop='productid' while the single
        # result parser matches 'productID'; XPath string comparison is case
        # sensitive -- confirm the listing page really uses the lowercase form.
        product_ids = [e.get("content").strip().replace('sku:', '') for e in
                       tree.xpath("//*[@itemprop='productid' and @content]")]

        for product_id in product_ids:
            tmp = self.search(product_id)
            if tmp is None:
                # search() yields None for a page it could not parse as a product.
                continue
            if isinstance(tmp, DigikeyProduct):
                res.append(tmp)
            else:
                for p in tmp.products:
                    res.append(p)

        return len(product_ids)

    def search(self, query: str) -> Union[DigikeySearchResponse, DigikeyProduct, None]:
        """Search for *query*.

        Returns a DigikeySearchResponse when the page reports a non-zero
        matching-records count. Otherwise the page is parsed as a single
        product page, giving a DigikeyProduct, or None if that parse fails.
        """
        page_size = 10

        # http://www.digikey.com/products/en?x=0&y=0&lang=en&site=us&keywords=553-2320-1-ND
        params = {'lang': 'en', 'site': 'us', 'keywords': query, 'pageSize': str(page_size)}
        page = self.req("https://www.digikey.com/products/en", params=params)

        tree = html.fromstring(page.content)

        # First non-empty counter wins; a thousands separator is tolerated.
        count = 0
        for e in tree.xpath("//span[@id='matching-records-count']"):
            if e.text:
                count = int(e.text.replace(",", ""))
                break

        if count == 0:
            return self._search_process_single_result(page.url, tree)

        res = DigikeySearchResponse()
        self._search_process_multiple_results(tree, res)
        return res