"""Digi-Key support: page download client, HTML parser and ini-file product repository."""
import configparser
import enum
import glob
import os
import os.path
import re
import urllib.parse
from functools import total_ordering
from pathlib import Path
from typing import List, Optional
from urllib.parse import urlparse, parse_qs

from lxml import html
from selenium import webdriver

import ee._utils
from ee.money import Money, get_default_context
from ee.tools import mk_parents

money = get_default_context()


def normalize_filename(part) -> str:
    """Return ``part`` with every ``/`` and space replaced by ``_``."""
    return part.replace('/', '_').replace(' ', '_')


def _clean(s) -> Optional[str]:
    """Strip ``s``; return ``None`` when ``s`` is ``None`` or empty after stripping."""
    if s is None:
        return None
    return s.strip() or None


def _to_string(e) -> str:
    """Concatenate all text found under element ``e`` (via ``itertext``) and strip the result."""
    return "".join(e.itertext()).strip()


def _parse_int(s) -> int:
    """Parse ``s`` as an integer after removing ``,`` and ``.`` characters.

    Raises ``ValueError`` when the remaining text is not a valid integer.
    """
    return int(s.replace(',', '').replace('.', ''))


def _to_int(s) -> Optional[int]:
    """Like ``_parse_int`` but return ``None`` instead of raising ``ValueError``."""
    try:
        return _parse_int(s)
    except ValueError:
        return None


def _id_from_url(url):
    """Return the digits following the last ``/`` that is directly followed by digits.

    Returns ``None`` when ``url`` is ``None`` or no such run of digits exists.
    """
    if url is None:
        return None
    m = re.search(r".*/([0-9]+)", url)
    return m.group(1) if m else None


def _first(collection, default=None):
    """Return the first item of ``collection`` or ``default`` if it is empty."""
    return next(iter(collection), default)


class DigikeyStore(object):
    """A Digi-Key store, identified by a ``BASEURL?store=<code>`` URL."""

    BASEURL = "http://purl.org/ee/supplier/digikey"

    def __init__(self, url, store, products_url):
        self.url = url
        self.store = store
        self.products_url = products_url

    @staticmethod
    def from_store_code(store_code):
        """Build the store for ``store_code``.

        The ``"us"`` code maps to the ``digikey.com`` products URL; any other
        code is used as the domain suffix (``digikey.<code>``).
        """
        url = "{}?store={}".format(DigikeyStore.BASEURL, store_code)
        if store_code == "us":
            products_url = "https://www.digikey.com/products/en"
        else:
            products_url = "https://www.digikey.{}/products/en".format(store_code)
        return DigikeyStore(url, store_code, products_url)

    @staticmethod
    def from_url(store_url) -> Optional["DigikeyStore"]:
        """Parse a store URL produced by ``from_store_code``.

        Returns ``None`` unless scheme, host and path equal those of
        ``BASEURL`` and the query consists of exactly one non-empty ``store``
        parameter.
        """
        base = urlparse(DigikeyStore.BASEURL)
        url = urlparse(store_url)

        if base.scheme != url.scheme or \
                base.netloc != url.netloc or \
                base.path != url.path:
            return None

        q = parse_qs((url.query or "").strip())
        store = q.get("store")
        if not store:
            return None

        # "store" is known to be present here, so more than one key means
        # there are unexpected extra query parameters.
        if len(q) > 1:
            return None

        return DigikeyStore.from_store_code(store[0])


class Digikey(object):
    """Registry of the attribute types seen so far, keyed by attribute id."""

    def __init__(self):
        self.attribute_types = {}

    def get_attribute_type(self, key, label):
        """Return the attribute type registered under ``key``, creating it on first use.

        ``label`` is only used when a new type is created; an existing entry
        keeps the label it was first registered with.
        """
        try:
            return self.attribute_types[key]
        except KeyError:
            a = DigikeyAttributeType(key, label)
            self.attribute_types[key] = a
            return a


class PriceBreak(object):
    """One row of a product's price table."""

    def __init__(self, quantity: int, per_piece_price: Money, per_quantity_price: Money):
        self.quantity = quantity
        self.per_piece_price = per_piece_price
        self.per_quantity_price = per_quantity_price


class Document(object):
    """A document link attached to a product."""

    def __init__(self, kind: str, title: str, url: str):
        self.kind = kind
        self.title = title
        self.url = url


@total_ordering
class DigikeyProduct(object):
    """A product; equality, ordering and hashing are all based on ``part_number``."""

    def __init__(self, part_number, mpn, url, attributes: List["DigikeyAttributeValue"] = None, categories=None):
        self.part_number = _clean(part_number)
        self.mpn = _clean(mpn)
        self.url = url
        self.attributes = attributes or []  # type: List["DigikeyAttributeValue"]
        self.categories = categories or []
        self.quantity_available = None
        self.description = None
        self.price_breaks: List[PriceBreak] = []
        self.documents: List[Document] = []

        assert self.part_number
        assert self.mpn

    def __eq__(self, other: "DigikeyProduct") -> bool:
        # Returning NotImplemented lets Python fall back to its default
        # comparison instead of failing with AttributeError on foreign types.
        if not isinstance(other, DigikeyProduct):
            return NotImplemented
        return self.part_number == other.part_number

    def __lt__(self, other: "DigikeyProduct") -> bool:
        if not isinstance(other, DigikeyProduct):
            return NotImplemented
        return self.part_number < other.part_number

    def __hash__(self):
        return hash(self.part_number)

    def attribute_by_id(self, _id):
        """Return the first attribute value whose type id equals ``_id``, or ``None``."""
        return next((a for a in self.attributes if a.attribute_type.id == _id), None)

    def to_ini(self, c: configparser.ConfigParser):
        """Write this product into the ``overview`` and ``attributes`` sections of ``c``.

        Returns ``c``.
        """
        def set_if_present(cfg, key, value):
            if value:
                cfg[key] = value

        c["overview"] = {}
        overview = c["overview"]
        overview["part_number"] = self.part_number
        set_if_present(overview, "url", self.url)
        if self.mpn:
            overview["mpn"] = self.mpn

        c["attributes"] = {}
        attributes = c["attributes"]
        for a in self.attributes:
            # Keys are "<type id>/<label>"; from_ini() splits them on the first "/".
            key = "{}/{}".format(a.attribute_type.id, a.attribute_type.label)
            key = key.replace("%", "_")
            # "%" is the interpolation character of ConfigParser and has to be doubled.
            value = a.value.replace("%", "%%")
            attributes[key] = value

        return c

    def _to_pandas_dict(self):
        """Return a flat dict with MPN, part number, URL and one entry per attribute label."""
        value = {
            "MPN": self.mpn,
            "Digi-Key": self.part_number,
            "URL": self.url,
        }

        for a in self.attributes:
            value[a.attribute_type.label] = a.value

        return value

    from_ini_r = re.compile("([^/]*)/(.*)")

    @staticmethod
    def from_ini(digikey, c):
        """Rebuild a product from a config parser in the layout written by ``to_ini``.

        Attribute types are resolved through ``digikey.get_attribute_type``.
        Raises ``KeyError`` if ``part_number`` or ``mpn`` is missing from the
        ``overview`` section.
        """
        def get(_c, key):
            try:
                return _c[key]
            except KeyError:
                return None

        overview = c["overview"]

        attributes = []
        for k, value in c.items("attributes"):
            (type_id, label) = DigikeyProduct.from_ini_r.match(k).groups()
            a_type = digikey.get_attribute_type(int(type_id), label)
            attributes.append(DigikeyAttributeValue(value, a_type))

        return DigikeyProduct(overview["part_number"], overview["mpn"], get(overview, "url"), attributes)


class DigikeyAttributeType(object):
    """An attribute type: an id and a label, both required to be truthy."""

    def __init__(self, _id, label):
        self.id = _id
        self.label = label

        assert self.id
        assert self.label


class DigikeyAttributeValue(object):
    """A value for one attribute type; both are required to be truthy."""

    def __init__(self, value, attribute_type: DigikeyAttributeType):
        self.value = value
        self.attribute_type = attribute_type

        assert self.value
        assert self.attribute_type


@total_ordering
class DigikeyProductCategory(object):
    """A product category; equal by ``id``, ordered by ``label``."""

    def __init__(self, _id, label, digikey_url=None, parent=None):
        self.id = _clean(_id)
        self.label = _clean(label)
        # URLs that do not start with "http" are prefixed with the digikey.com host.
        if digikey_url is None or digikey_url.startswith("http"):
            self.digikey_url = digikey_url
        else:
            self.digikey_url = "https://www.digikey.com" + digikey_url
        self.parent: DigikeyProductCategory = parent
        self.subCategories: List[DigikeyProductCategory] = []

        assert self.id
        assert self.label

    def __eq__(self, other: "DigikeyProductCategory"):
        if not isinstance(other, DigikeyProductCategory):
            return NotImplemented
        return self.id == other.id

    def __hash__(self):
        # Defining __eq__ alone makes a class unhashable; hash on the same
        # field that __eq__ compares.
        return hash(self.id)

    def __lt__(self, other: "DigikeyProductCategory") -> bool:
        if not isinstance(other, DigikeyProductCategory):
            return NotImplemented
        return self.label < other.label

    def add_sub_category(self, _id, label, digikey_url):
        """Create a child category with this category as parent and append it."""
        sc = DigikeyProductCategory(_id, label, digikey_url=digikey_url, parent=self)
        self.subCategories.append(sc)

    def find_sub_category_by_label(self, label):
        """Return the first direct sub category labelled ``label``, or ``None``."""
        return next((sc for sc in self.subCategories if sc.label == label), None)


class SearchResponseTypes(enum.Enum):
    MANY = 1  # A product table was returned.
    SINGLE = 2  # A product page was returned
    TOO_MANY = 3  # A listing of categories was given, the user is expected to narrow down the search
    NO_MATCHES = 4


class DigikeySearchResponse(object):
    """Result of parsing a search page: a count, a response type and the products found."""

    def __init__(self, count: int, response_type: SearchResponseTypes):
        self.count = count
        self.response_type = response_type
        self.products: List[DigikeyProduct] = list()

    def append(self, product: DigikeyProduct):
        self.products.append(product)


class DigikeyClient(object):
    """Fetches pages through a lazily started Chrome webdriver, consulting a cache first."""

    def __nop(self, message):
        pass

    def __init__(self, baseurl, cache_dir: Path = None, on_download=None):
        self.baseurl = baseurl
        # on_download is called with a progress message for every request.
        self.on_download = on_download or self.__nop
        self.cache = ee._utils.maybe_cache(cache_dir)
        self.driver: Optional[webdriver.Chrome] = None

    def search(self, query: str, page_size=10) -> str:
        """Alias for ``product_search``."""
        return self.product_search(query, page_size)

    def product_search(self, query: str, page_size=10) -> str:
        """Return the page source of a keyword search for ``query``.

        The cache key is the URL-quoted query.
        """
        params = {'lang': 'en', 'site': 'us', 'keywords': query, 'pageSize': str(page_size)}
        cache_key = urllib.parse.quote(query)
        return self._req(self.baseurl, cache_key=cache_key, params=params)

    def _req(self, url, cache_key, params=None):
        """Return the page source for ``url`` (plus encoded ``params``).

        A truthy cache entry for ``cache_key`` is returned as is; otherwise the
        page is loaded in the browser and its source is saved under
        ``cache_key``.
        """
        if params:
            url = url + "?" + urllib.parse.urlencode(params)

        cached = self.cache.lookup(cache_key)
        if cached:
            self.on_download("Using cached {}".format(url))
            return cached

        self.on_download("Downloading {}".format(url))

        if self.driver is None:
            # The browser is only started once a page actually has to be fetched.
            options = webdriver.ChromeOptions()
            # NOTE(review): newer Selenium releases name this keyword `options`;
            # confirm against the Selenium version this project pins.
            self.driver = webdriver.Chrome(chrome_options=options)

        self.driver.get(url)
        src = self.driver.page_source
        self.cache.save(cache_key, src)
        return src

    def get_for_product_url(self, url, product_number):
        """Fetch a product page, cached under ``product-<product_number>``."""
        return self._req(url, "product-{}".format(product_number))

    def get(self, url, cache_key, params=None):
        """Public wrapper around ``_req``."""
        return self._req(url, cache_key, params)


class DigikeyParser(object):
    """Turns downloaded page source into ``DigikeySearchResponse`` objects."""

    def __init__(self, digikey: Digikey):
        self.digikey = digikey or Digikey()

    @staticmethod
    def ensure_absolute_url(origin_url, url):
        """Resolve ``url`` against ``origin_url``; return ``None`` when ``url`` is ``None``.

        NOTE(review): the parsing methods of this class call this method but
        no definition was present in the class. Standard relative URL
        resolution (``urljoin``) is assumed to be the intended behaviour -
        confirm.
        """
        if url is None:
            return None
        return urllib.parse.urljoin(origin_url, url)

    def _search_process_single_result(self, origin_url, tree: html) -> Optional[DigikeyProduct]:
        """Parse a single product page.

        Returns ``None`` unless both a part number and an MPN were found.
        """
        attributes = []
        categories = []

        url = _first((link.get("href") for link in tree.xpath("/html/head/link[@rel='canonical' and @href]")))
        url = self.ensure_absolute_url(origin_url, url)

        part_number = mpn = None
        # If several elements match, the last one wins.
        for n in tree.xpath("//*[@itemprop='productID' and @content]"):
            part_number = n.get("content").replace('sku:', '')
        for n in tree.xpath("//*[@itemprop='name' and @content]"):
            mpn = n.get("content")

        for tr in tree.xpath("//table[@id='product-attribute-table']/*/tr[not(@id)]"):
            tds = tr.xpath("th|td")
            if len(tds) != 3:
                continue

            label = _to_string(tds[0])
            value = _to_string(tds[1])
            if len(label) == 0 or len(value) == 0:
                continue

            # The attribute type id is taken from the name of the row's
            # checkbox with any "pv" removed; rows without one are skipped.
            checkbox = tds[2].xpath("input[@type='checkbox' and @name]")
            if not checkbox:
                continue
            attribute_type_id = _to_int(checkbox[0].get("name").replace('pv', ''))

            if attribute_type_id:
                a_type = self.digikey.get_attribute_type(attribute_type_id, label)
                attributes.append(DigikeyAttributeValue(value, a_type))

        if not (part_number and mpn):
            return None

        p = DigikeyProduct(part_number, mpn, url, attributes, categories)
        p.price_breaks = self._parse_price_breaks(tree)
        p.documents = self._parse_documents(tree)
        # A page without a description element leaves the description as None
        # instead of failing on the missing element.
        description = _first(tree.xpath("//*[@id='product-overview']//*[@itemprop='description']"))
        p.description = _to_string(description) if description is not None else None
        return p

    @staticmethod
    def _find_currency(tree: html) -> Optional[str]:
        """Return the first non-empty text found in the two known currency locations, or ``None``."""
        for e in tree.xpath(".//div[@class='cur-dropdown']/ul"):
            s = _clean(e.text)
            if s:
                return s

        for e in tree.xpath("(//*[@class='locale--lang-cur']/*)[last()]"):
            s = _clean(e.text)
            if s:
                return s

        return None

    def _parse_price_breaks(self, tree: html) -> List[PriceBreak]:
        """Parse the ``product-dollars`` table.

        Rows that do not have exactly three cells are skipped. If any
        remaining row has an unparsable quantity or price the whole result is
        discarded and an empty list is returned.
        """
        currency = self._find_currency(tree)

        price_breaks = []
        for row in tree.xpath("//table[@class='product-dollars']//tr"):
            tds = list(row.xpath("./td"))
            if len(tds) != 3:
                continue

            texts = ["".join(td.xpath("./descendant-or-self::*/text()")) for td in tds]

            quantity = _to_int(texts[0])
            price = money.try_parse(texts[1], currency=currency)

            if quantity is None or price is None:
                return []

            price_breaks.append(PriceBreak(quantity=quantity,
                                           per_piece_price=price,
                                           per_quantity_price=price * quantity))

        return price_breaks

    @staticmethod
    def _parse_documents(tree: html) -> List[Document]:
        """Collect the links of the documents and media section.

        The stripped text of a row's first ``th`` text node is used as the
        kind of every link in that row. Links without text or without
        ``href`` are skipped.
        """
        docs = []
        for row in tree.xpath("//*[@class='product-details-documents-media product-details-section']//tr"):
            kind = _first(row.xpath(".//th/text()"))
            if not kind:
                continue
            kind = kind.strip()

            for a in row.xpath(".//td//a[not(contains(@class, '-expander-toggle'))]"):
                title = a.text
                if not title:
                    continue
                title = title.strip()

                href = a.get("href")
                if not href:
                    continue
                href = href.strip()

                # Protocol-relative links are turned into https links.
                if href.startswith("//"):
                    href = "https:" + href

                docs.append(Document(kind, title, href))

        return docs

    def _handle_product_table(self, origin_url, tree: html, res: DigikeySearchResponse):
        """Append every product of a product table to ``res``.

        Returns the number of product elements found, including those skipped
        because the part number or the name element was missing.
        """
        products = tree.xpath("//*[@itemtype='http://schema.org/Product']")

        for product in products:
            url = _first((a.get("href") for a in product.xpath(".//*[@class='tr-image']//a[@href]")))
            url = self.ensure_absolute_url(origin_url, url)
            part_number = _first(product.xpath(".//*[@itemprop='productid' and @content]"))
            mpn = _first(product.xpath(".//*[@itemprop='name']"))

            if part_number is not None and mpn is not None:
                res.append(DigikeyProduct(
                    part_number.get("content").strip().replace('sku:', ''),
                    mpn.text,
                    url))

        return len(products)

    def _handle_exact_part_list(self, origin_url, tree: html, res: DigikeySearchResponse):
        """Append every product of an exact part list to ``res``.

        Returns the number of ``exactPart`` rows found, including those
        skipped because the link or the part number element was missing.
        """
        products = tree.xpath(".//tr[@class='exactPart']")

        for product in products:
            link = _first(product.xpath(".//td/span/a[@href]"))
            part_number = _first(product.xpath(".//span[last()]"))

            if link is not None and part_number is not None:
                url = self.ensure_absolute_url(origin_url, link.get("href"))
                mpn = link.text
                res.append(DigikeyProduct(part_number.text, mpn, url))

        return len(products)

    def parse_string(self, origin_url, page_content: str):
        """Parse a downloaded page and classify it.

        With a non-zero matching-records count the page is treated as a
        product table, an exact part list or - if neither table is present - a
        category listing (``TOO_MANY``). Otherwise the page is parsed as a
        single product page, giving ``SINGLE`` or ``NO_MATCHES``.
        """
        tree = html.fromstring(page_content)

        count = _first([_parse_int(e.text) for e in tree.xpath("//span[@id='matching-records-count']") if e.text])

        if count:
            product_table = _first(tree.xpath("//table[@id='productTable']"))
            exact_part_list = _first(tree.xpath("//table[@id='exactPartList']"))

            if product_table is not None:
                res = DigikeySearchResponse(count, SearchResponseTypes.MANY)
                self._handle_product_table(origin_url, product_table, res)
                return res

            if exact_part_list is not None:
                res = DigikeySearchResponse(count, SearchResponseTypes.MANY)
                self._handle_exact_part_list(origin_url, exact_part_list, res)
                return res

            # If the search matches multiple product categories the user has to select the appropriate category
            # first
            return DigikeySearchResponse(count, SearchResponseTypes.TOO_MANY)

        p = self._search_process_single_result(origin_url, tree)
        if p:
            res = DigikeySearchResponse(1, SearchResponseTypes.SINGLE)
            res.append(p)
            return res

        return DigikeySearchResponse(1, SearchResponseTypes.NO_MATCHES)


class DigikeyRepository(object):
    """Stores products as ``<path>/<mpn>.ini`` files."""

    def __init__(self, digikey, path):
        self._digikey = digikey
        self._path = path
        self._products = {}

    def _mpn_to_path(self, mpn):
        """Return the ini file name used for ``mpn``."""
        return "{}/{}.ini".format(self._path, normalize_filename(mpn))

    def has_product(self, mpn=None, dpn=None):
        """Check whether a product is stored.

        With ``mpn`` the result is a bool telling whether the ini file exists.
        With only ``dpn`` all products are loaded and the product with that
        part number is returned, or ``None`` if there is none.
        """
        if mpn is not None:
            return os.path.isfile(self._mpn_to_path(mpn))

        if dpn is not None:
            for p in self.products:
                if p.part_number == dpn:
                    return p

        return None

    def save(self, product: DigikeyProduct):
        """Write ``product`` to its ini file, creating parent directories first."""
        c = self._make_configparser()
        y = product.to_ini(c)
        filename = self._mpn_to_path(product.mpn)
        mk_parents(filename)
        with open(filename, "w") as f:
            y.write(f)

    def load_all(self):
        """Load every ``*.ini`` file below the repository path."""
        # format() rather than "+" so that a pathlib.Path works here too, as
        # it already does in _mpn_to_path().
        for path in glob.glob("{}/*.ini".format(self._path)):
            self._load(path)

    @staticmethod
    def _make_configparser():
        c = configparser.ConfigParser()
        # Keep option names case sensitive; the default lower-cases them.
        c.optionxform = str
        return c

    def _load(self, path):
        """Load one ini file, register the product under its MPN and return it."""
        c = self._make_configparser()
        c.read(path)
        p = DigikeyProduct.from_ini(self._digikey, c)
        self._products[p.mpn] = p
        return p

    @property
    def products(self):
        """All products; every access reloads the ini files from disk."""
        self.load_all()
        return self._products.values()

    def find_by_mpn(self, mpn):
        """Return the product whose MPN equals ``mpn``, or ``None``."""
        for p in self.products:
            if p.mpn == mpn:
                return p

        return None

    def to_pandas(self):
        """Return a DataFrame with one row per product, indexed by MPN."""
        import pandas

        # Snapshot once: each access to self.products reloads from disk.
        parts = list(self.products)
        data = [part._to_pandas_dict() for part in parts]
        index = [part.mpn for part in parts]
        df = pandas.DataFrame(data=data, index=index)
        return ee._utils.ensure_has_columns(df, ["MPN", "Digi-Key", "URL"])