diff options
Diffstat (limited to 'src/ee/digikey')
-rw-r--r-- | src/ee/digikey/__init__.py | 355 |
1 files changed, 175 insertions, 180 deletions
diff --git a/src/ee/digikey/__init__.py b/src/ee/digikey/__init__.py index a2ccd03..8acde6b 100644 --- a/src/ee/digikey/__init__.py +++ b/src/ee/digikey/__init__.py @@ -3,238 +3,233 @@ from functools import total_ordering import requests from cachecontrol import CacheControl -from cachecontrol import CacheControlAdapter from cachecontrol.caches.file_cache import FileCache from cachecontrol.heuristics import ExpiresAfter from lxml import html -from typing import List +from typing import List, Optional def normalize_filename(part): - return part.replace('/', '_').replace(' ', '_') + return part.replace('/', '_').replace(' ', '_') def _clean(s): - if s is None: - return None - s = s.strip() - return None if len(s) == 0 else s + if s is None: + return None + s = s.strip() + return None if len(s) == 0 else s def _to_string(e): - s = "" - for t in e.itertext(): - s += t - return s.strip() + s = "" + for t in e.itertext(): + s += t + return s.strip() def _to_int(s): - try: - return int(s) - except ValueError: - return None + try: + return int(s) + except ValueError: + return None def _id_from_url(url): - if url is None: - return None - m = re.search(r".*/([0-9]+)", url) - return m.group(1) if m else None + if url is None: + return None + m = re.search(r".*/([0-9]+)", url) + return m.group(1) if m else None class Digikey(object): - def __init__(self): - self.attribute_types = {} + def __init__(self): + self.attribute_types = {} - def get_attribute_type(self, id, label): - try: - return self.attribute_types[id] - except KeyError: - a = DigikeyAttributeType(id, label) - self.attribute_types[id] = a - return a + def get_attribute_type(self, key, label): + try: + return self.attribute_types[key] + except KeyError: + a = DigikeyAttributeType(key, label) + self.attribute_types[key] = a + return a @total_ordering class DigikeyProduct(object): - def __init__(self, part_number, mpn, attributes, categories): - self.part_number = _clean(part_number) - self.mpn = _clean(mpn) - self.attributes = attributes - self.categories = categories - self.quantity_available = None - self.description = None + def __init__(self, part_number, mpn, attributes, categories): + self.part_number = _clean(part_number) + self.mpn = _clean(mpn) + self.attributes = attributes + self.categories = categories + self.quantity_available = None + self.description = None - assert self.part_number - assert self.mpn + assert self.part_number + assert self.mpn - def __eq__(self, other): - # type: (DigikeyProduct, DigikeyProduct) -> bool - return self.part_number == other.part_number + def __eq__(self, other: "DigikeyProduct") -> bool: + return self.part_number == other.part_number - def __lt__(self, other): - # type: (DigikeyProduct, DigikeyProduct) -> bool - return self.part_number < other.part_number + def __lt__(self, other: "DigikeyProduct") -> bool: + return self.part_number < other.part_number - def __hash__(self): - return self.part_number.__hash__() + def __hash__(self): + return self.part_number.__hash__() - def to_yaml(self): - yaml = {"part_number": self.part_number} - if self.mpn: - yaml["mpn"] = self.mpn - yaml["attributes"] = [{"type": {"id": a.attribute_type.id, "label": a.attribute_type.label}, "value": a.value} - for a in self.attributes] - return yaml + def to_yaml(self): + yaml = {"part_number": self.part_number} + if self.mpn: + yaml["mpn"] = self.mpn + yaml["attributes"] = [{"type": {"id": a.attribute_type.id, "label": a.attribute_type.label}, "value": a.value} + for a in self.attributes] + return yaml class DigikeyAttributeType(object): - def __init__(self, id, label): - self.id = id - self.label = label + def __init__(self, id, label): + self.id = id + self.label = label - assert self.id - assert self.label + assert self.id + assert self.label class DigikeyAttributeValue(object): - def __init__(self, value, attribute_type): - self.value = value - self.attribute_type = attribute_type + def __init__(self, value, attribute_type): + self.value = value + self.attribute_type = attribute_type - assert self.value - assert self.attribute_type + assert self.value + assert self.attribute_type @total_ordering class DigikeyProductCategory(object): - def __init__(self, id, label, digikey_url=None, parent=None): - self.id = _clean(id) - self.label = _clean(label) - self.digikey_url = digikey_url if digikey_url is None or digikey_url.startswith("http") else \ - "https://www.digikey.com" + digikey_url - self.parent = parent # type: DigikeyProductCategory - self.subCategories = [] # type: List[DigikeyProductCategory + def __init__(self, id, label, digikey_url=None, parent=None): + self.id = _clean(id) + self.label = _clean(label) + self.digikey_url = digikey_url if digikey_url is None or digikey_url.startswith("http") else \ + "https://www.digikey.com" + digikey_url + self.parent = parent # type: DigikeyProductCategory + self.subCategories = [] # type: List[DigikeyProductCategory - assert self.id is not None - assert self.label is not None + assert self.id is not None + assert self.label is not None - def __eq__(self, other): - # type: (DigikeyProductCategory, DigikeyProductCategory) -> bool - return self.id == other.id + def __eq__(self, other: "DigikeyProductCategory"): + return self.id == other.id - def __lt__(self, other): - # type: (DigikeyProductCategory, DigikeyProductCategory) -> bool - return self.label < other.label + def __lt__(self, other: "DigikeyProductCategory") -> bool: + return self.label < other.label - def add_sub_category(self, id, label, digikey_url): - sc = DigikeyProductCategory(id, label, digikey_url=digikey_url, parent=self) - self.subCategories.append(sc) + def add_sub_category(self, id, label, digikey_url): + sc = DigikeyProductCategory(id, label, digikey_url=digikey_url, parent=self) + self.subCategories.append(sc) - def find_sub_category_by_label(self, label): - return next((sc for sc in self.subCategories if sc.label == label), None) + def find_sub_category_by_label(self, label): + return next((sc for sc in self.subCategories if sc.label == label), None) class DigikeySearchResponse(object): - def __init__(self): - self.products = set() + def __init__(self): + self.products = set() - def append(self, product): - self.products.add(product) + def append(self, product): + self.products.add(product) class DigikeyClient(object): - def __nop(self): - pass - - def __init__(self, digikey: Digikey, on_download=None): - self.digikey = digikey - self.on_download = on_download or self.__nop - cache = FileCache('digikey_cache', forever=True) - self.sess = CacheControl(requests.Session(), cache=cache, heuristic=ExpiresAfter(days=1)) - - # adapter = CacheControlAdapter(cache=cache, heuristic=ExpiresAfter(days=1)) - # self.sess = requests.Session() - # self.sess.mount('http://', adapter) - # self.sess.mount('https://', adapter) - - def req(self, url, params=None): - if not url.startswith("http"): - url = "https://www.digikey.com" + url - s = "" if not params else "?" + "&".join([k + "=" + v for k, v in params.items()]) - self.on_download("Downloading {}".format(url + s)) - return self.sess.get(url, params=params) - - def _search_process_single_result(self, url: str, tree: html) -> DigikeyProduct: - attributes = [] - categories = [] - - part_number = mpn = None - for n in tree.xpath("//*[@itemprop='productID' and @content]"): - part_number = n.get("content") - part_number = part_number.replace('sku:', '') - for n in tree.xpath("//*[@itemprop='name' and @content]"): - mpn = n.get("content") - - for tr in tree.xpath("//table[@id='prod-att-table']/tr[not(@id='prod-att-title-row')]"): - tds = tr.xpath("th|td") - if len(tds) != 3: - continue - label = tds[0].text.strip() - value = tds[1].text.strip() - - if len(label) == 0 or len(value) == 0: - continue - - checkbox = tds[2].xpath("input[@type='checkbox' and @name]") - try: - name = checkbox[0].get("name") - attribute_type_id = _to_int(name.replace('pv', '')) - except IndexError: - continue - - if attribute_type_id: - a_type = self.digikey.get_attribute_type(attribute_type_id, label) - attributes.append(DigikeyAttributeValue(value, a_type)) - - if part_number and mpn: - p = DigikeyProduct(part_number, mpn, attributes, categories) - for n in tree.xpath("//*[@itemprop='description']"): - p.description = _to_string(n) - return p - - return None - - def _search_process_multiple_results(self, tree: html, res: DigikeySearchResponse): - - product_ids = [e.get("content").strip().replace('sku:', '') for e in - tree.xpath("//*[@itemprop='productid' and @content]")] - - for product_id in product_ids: - tmp = self.search(product_id) - if isinstance(tmp, DigikeyProduct): - res.append(tmp) - else: - [res.append(p) for p in tmp.products] - - return len(product_ids) - - def search(self, query: str) -> DigikeySearchResponse: - page_size = 10 - - # http://www.digikey.com/products/en?x=0&y=0&lang=en&site=us&keywords=553-2320-1-ND - params = {'lang': 'en', 'site': 'us', 'keywords': query, 'pageSize': str(page_size)} - page = self.req("https://www.digikey.com/products/en", params=params) - # print("page: ") - # print(page.content) - - tree = html.fromstring(page.content) - - count = next(iter([int(e.text) for e in tree.xpath("//span[@id='matching-records-count']") if e.text]), 0) - - if count == 0: - return self._search_process_single_result(page.url, tree) - else: - res = DigikeySearchResponse() - self._search_process_multiple_results(tree, res) - return res + def __nop(self): + pass + + def __init__(self, digikey: Digikey, on_download=None): + self.digikey = digikey + self.on_download = on_download or self.__nop + cache = FileCache('digikey_cache', forever=True) + self.sess = CacheControl(requests.Session(), cache=cache, heuristic=ExpiresAfter(days=1)) + + # adapter = CacheControlAdapter(cache=cache, heuristic=ExpiresAfter(days=1)) + # self.sess = requests.Session() + # self.sess.mount('http://', adapter) + # self.sess.mount('https://', adapter) + + def req(self, url, params=None): + if not url.startswith("http"): + url = "https://www.digikey.com" + url + s = "" if not params else "?" + "&".join([k + "=" + v for k, v in params.items()]) + self.on_download("Downloading {}".format(url + s)) + return self.sess.get(url, params=params) + + def _search_process_single_result(self, url: str, tree: html) -> Optional[DigikeyProduct]: + attributes = [] + categories = [] + + part_number = mpn = None + for n in tree.xpath("//*[@itemprop='productID' and @content]"): + part_number = n.get("content") + part_number = part_number.replace('sku:', '') + for n in tree.xpath("//*[@itemprop='name' and @content]"): + mpn = n.get("content") + + for tr in tree.xpath("//table[@id='prod-att-table']/tr[not(@id='prod-att-title-row')]"): + tds = tr.xpath("th|td") + if len(tds) != 3: + continue + label = tds[0].text.strip() + value = tds[1].text.strip() + + if len(label) == 0 or len(value) == 0: + continue + + checkbox = tds[2].xpath("input[@type='checkbox' and @name]") + try: + name = checkbox[0].get("name") + attribute_type_id = _to_int(name.replace('pv', '')) + except IndexError: + continue + + if attribute_type_id: + a_type = self.digikey.get_attribute_type(attribute_type_id, label) + attributes.append(DigikeyAttributeValue(value, a_type)) + + if part_number and mpn: + p = DigikeyProduct(part_number, mpn, attributes, categories) + for n in tree.xpath("//*[@itemprop='description']"): + p.description = _to_string(n) + return p + + return None + + def _search_process_multiple_results(self, tree: html, res: DigikeySearchResponse): + + product_ids = [e.get("content").strip().replace('sku:', '') for e in + tree.xpath("//*[@itemprop='productid' and @content]")] + + for product_id in product_ids: + tmp = self.search(product_id) + if isinstance(tmp, DigikeyProduct): + res.append(tmp) + else: + [res.append(p) for p in tmp.products] + + return len(product_ids) + + def search(self, query: str): + page_size = 10 + + # http://www.digikey.com/products/en?x=0&y=0&lang=en&site=us&keywords=553-2320-1-ND + params = {'lang': 'en', 'site': 'us', 'keywords': query, 'pageSize': str(page_size)} + page = self.req("https://www.digikey.com/products/en", params=params) + # print("page: ") + # print(page.content) + + tree = html.fromstring(page.content) + + count = next(iter([int(e.text) for e in tree.xpath("//span[@id='matching-records-count']") if e.text]), 0) + + if count == 0: + return self._search_process_single_result(page.url, tree) + else: + res = DigikeySearchResponse() + self._search_process_multiple_results(tree, res) + return res |