import configparser import enum import glob import os import os.path import re import urllib.parse from functools import total_ordering from typing import List, Optional import requests from cachecontrol import CacheControl from cachecontrol.caches.file_cache import FileCache from cachecontrol.heuristics import ExpiresAfter from lxml import html import ee._utils from ee.tools import mk_parents def normalize_filename(part): return part.replace('/', '_').replace(' ', '_') def _clean(s): if s is None: return None s = s.strip() return None if len(s) == 0 else s def _to_string(e): s = "" for t in e.itertext(): s += t return s.strip() def _parse_int(s): return int(s.replace(',', '').replace('.', '')) def _to_int(s): try: return _parse_int(s) except ValueError: return None def _id_from_url(url): if url is None: return None m = re.search(r".*/([0-9]+)", url) return m.group(1) if m else None def _first(collection, default=None): return next(iter(collection), default) class Digikey(object): def __init__(self): self.attribute_types = {} def get_attribute_type(self, key, label): try: return self.attribute_types[key] except KeyError: a = DigikeyAttributeType(key, label) self.attribute_types[key] = a return a @total_ordering class DigikeyProduct(object): def __init__(self, part_number, mpn, url, attributes=None, categories=None): self.part_number = _clean(part_number) self.mpn = _clean(mpn) self.url = url self.attributes = attributes or [] self.categories = categories or [] self.quantity_available = None self.description = None assert self.part_number assert self.mpn def __eq__(self, other: "DigikeyProduct") -> bool: return self.part_number == other.part_number def __lt__(self, other: "DigikeyProduct") -> bool: return self.part_number < other.part_number def __hash__(self): return self.part_number.__hash__() def attribute_by_id(self, _id): return next((a for a in self.attributes if a.attribute_type.id == _id), None) def to_ini(self, c: configparser.ConfigParser): def set(cfg, key, value): if value: cfg[key] = value c["overview"] = {} overview = c["overview"] overview["part_number"] = self.part_number set(overview, "url", self.url) if self.mpn: overview["mpn"] = self.mpn c["attributes"] = {} attributes = c["attributes"] for a in self.attributes: key = "{}/{}".format(a.attribute_type.id, a.attribute_type.label) key = key.replace("%", "_") value = a.value.replace("%", "%%") attributes[key] = value return c def _to_pandas_dict(self): value = { "MPN": self.mpn, "Digi-Key": self.part_number, "URL": self.url, } for a in self.attributes: value[a.attribute_type.label] = a.value return value from_ini_r = re.compile("([^/]*)/(.*)") @staticmethod def from_ini(digikey, c): def get(_c, key): try: return _c[key] except KeyError: return None overview = c["overview"] attributes = [] for k, value in c.items("attributes"): (type_id, label) = DigikeyProduct.from_ini_r.match(k).groups() a_type = digikey.get_attribute_type(int(type_id), label) attributes.append(DigikeyAttributeValue(value, a_type)) return DigikeyProduct(overview["part_number"], overview["mpn"], get(overview, "url"), attributes) class DigikeyAttributeType(object): def __init__(self, _id, label): self.id = _id self.label = label assert self.id assert self.label class DigikeyAttributeValue(object): def __init__(self, value, attribute_type): self.value = value self.attribute_type = attribute_type assert self.value assert self.attribute_type @total_ordering class DigikeyProductCategory(object): def __init__(self, _id, label, digikey_url=None, parent=None): self.id = _clean(_id) self.label = _clean(label) self.digikey_url = digikey_url if digikey_url is None or digikey_url.startswith("http") else \ "https://www.digikey.com" + digikey_url self.parent = parent # type: DigikeyProductCategory self.subCategories = [] # type: List[DigikeyProductCategory] assert self.id assert self.label def __eq__(self, other: "DigikeyProductCategory"): return self.id == other.id def __lt__(self, other: "DigikeyProductCategory") -> bool: return self.label < other.label def add_sub_category(self, _id, label, digikey_url): sc = DigikeyProductCategory(_id, label, digikey_url=digikey_url, parent=self) self.subCategories.append(sc) def find_sub_category_by_label(self, label): return next((sc for sc in self.subCategories if sc.label == label), None) class SearchResponseTypes(enum.Enum): MANY = 1 SINGLE = 2 TOO_MANY = 3 NO_MATCHES = 4 class DigikeySearchResponse(object): def __init__(self, count: int, response_type: SearchResponseTypes): self.count = count self.response_type = response_type self.products = list() # type: List[DigikeyProduct] def append(self, product: DigikeyProduct): self.products.append(product) class DigikeyClient(object): def __nop(self, message): pass def __init__(self, digikey: Digikey, cache_dir=None, on_download=None): self.digikey = digikey self.on_download = on_download or self.__nop cache = FileCache(cache_dir or 'digikey_cache', forever=True) self.sess = CacheControl(requests.Session(), cache=cache, heuristic=ExpiresAfter(days=10*365)) def _req(self, url, params=None): if not url.startswith("http"): url = "https://www.digikey.com" + url s = "" if not params else "?" + urllib.parse.urlencode(params) self.on_download("Downloading {}".format(url + s)) return self.sess.get(url, params=params) def _search_process_single_result(self, tree: html) -> Optional[DigikeyProduct]: attributes = [] categories = [] url = _first((link.get("href") for link in tree.xpath("/html/head/link[@rel='canonical' and @href]"))) part_number = mpn = None for n in tree.xpath("//*[@itemprop='productID' and @content]"): part_number = n.get("content") part_number = part_number.replace('sku:', '') for n in tree.xpath("//*[@itemprop='name' and @content]"): mpn = n.get("content") for tr in tree.xpath("//table[@id='prod-att-table']/tr[not(@id='prod-att-title-row')]"): tds = tr.xpath("th|td") if len(tds) != 3: continue label = _to_string(tds[0]) value = _to_string(tds[1]) if len(label) == 0 or len(value) == 0: continue checkbox = tds[2].xpath("input[@type='checkbox' and @name]") try: name = checkbox[0].get("name") attribute_type_id = _to_int(name.replace('pv', '')) except IndexError: continue if attribute_type_id: a_type = self.digikey.get_attribute_type(attribute_type_id, label) attributes.append(DigikeyAttributeValue(value, a_type)) if part_number and mpn: p = DigikeyProduct(part_number, mpn, url, attributes, categories) for n in tree.xpath("//*[@itemprop='description']"): p.description = _to_string(n) return p return None @staticmethod def _handle_product_table(tree: html, res: DigikeySearchResponse): products = tree.xpath("//*[@itemtype='http://schema.org/Product']") for product in products: url = _first((a.get("href") for a in product.xpath(".//*[@class='tr-image']//a[@href]"))) part_number = _first(product.xpath(".//*[@itemprop='productid' and @content]")) mpn = _first(product.xpath(".//*[@itemprop='name']")) if part_number is not None and mpn is not None: res.append(DigikeyProduct( part_number.get("content").strip().replace('sku:', ''), mpn.text, url)) return len(products) @staticmethod def _handle_exact_part_list(tree: html, res: DigikeySearchResponse): products = tree.xpath(".//tr[@class='exactPart']") for product in products: a = _first((a for a in product.xpath(".//td/span/a[@href]"))) part_number = _first(product.xpath(".//span[last()]")) if a is not None and part_number is not None: url = a.get("href") mpn = a.text res.append(DigikeyProduct(part_number.text, mpn, url)) return len(products) def search(self, query: str, page_size=10) -> DigikeySearchResponse: params = {'lang': 'en', 'site': 'us', 'keywords': query, 'pageSize': str(page_size)} page = self._req("https://www.digikey.com/products/en", params=params) return self.parse_string(page.content) def parse_string(self, page_content: str): tree = html.fromstring(page_content) count = _first([_parse_int(e.text) for e in tree.xpath("//span[@id='matching-records-count']") if e.text]) if count: product_table = _first(tree.xpath("//table[@id='productTable']")) exact_part_list = _first(tree.xpath("//table[@id='exactPartList']")) if product_table is not None: res = DigikeySearchResponse(count, SearchResponseTypes.MANY) self._handle_product_table(product_table, res) return res elif exact_part_list is not None: res = DigikeySearchResponse(count, SearchResponseTypes.MANY) self._handle_exact_part_list(exact_part_list, res) return res else: # If the search matches multiple product categories the user has to select the appropriate category # first return DigikeySearchResponse(count, SearchResponseTypes.TOO_MANY) else: p = self._search_process_single_result(tree) if p: res = DigikeySearchResponse(1, SearchResponseTypes.SINGLE) res.append(p) return res else: return DigikeySearchResponse(1, SearchResponseTypes.NO_MATCHES) class DigikeyRepository(object): def __init__(self, digikey, path): self._digikey = digikey self._path = path self._products = {} def _mpn_to_path(self, mpn): mpn = mpn.replace("/", "_").replace(" ", "_") return "{}/{}.ini".format(self._path, mpn) def has_product(self, mpn=None, dpn=None): if mpn is not None: filename = self._mpn_to_path(mpn) return os.path.isfile(filename) if dpn is not None: for p in self.products: if p.part_number == dpn: return p def save(self, product: DigikeyProduct): c = self._make_configparser() y = product.to_ini(c) filename = self._mpn_to_path(product.mpn) mk_parents(filename) with open(filename, "w") as f: y.write(f) def load_all(self): [self._load(path) for path in glob.glob(self._path + "/*.ini")] @staticmethod def _make_configparser(): c = configparser.ConfigParser() c.optionxform = str return c def _load(self, path): c = self._make_configparser() c.read(path) p = DigikeyProduct.from_ini(self._digikey, c) self._products[p.mpn] = p return p @property def products(self): self.load_all() return self._products.values() def find_by_mpn(self, mpn): for p in self.products: if p.mpn == mpn: return p def to_pandas(self): import pandas data = [part._to_pandas_dict() for part in self.products] index = [part.mpn for part in self.products] df = pandas.DataFrame(data=data, index=index) return ee._utils.ensure_has_columns(df, ["MPN", "Digi-Key", "URL"])