From 80e0623913e87c6480049520590e424a831e0401 Mon Sep 17 00:00:00 2001 From: Trygve Laugstøl Date: Sun, 24 Feb 2019 21:51:38 +0100 Subject: Digikey: replacing requests-based code with selenium. Adding new tools: digikey-import-parts and digikey-refresh-parts. --- src/ee/digikey/__init__.py | 79 +++++++++++++++++++++---------- src/ee/digikey/import_parts.py | 100 ++++++++++++++++++++++++++++++++++++++++ src/ee/digikey/refresh_parts.py | 97 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 252 insertions(+), 24 deletions(-) create mode 100644 src/ee/digikey/import_parts.py create mode 100644 src/ee/digikey/refresh_parts.py (limited to 'src/ee/digikey') diff --git a/src/ee/digikey/__init__.py b/src/ee/digikey/__init__.py index 615d458..32308e5 100644 --- a/src/ee/digikey/__init__.py +++ b/src/ee/digikey/__init__.py @@ -6,13 +6,11 @@ import os.path import re import urllib.parse from functools import total_ordering +from pathlib import Path from typing import List, Optional -import requests -from cachecontrol import CacheControl -from cachecontrol.caches.file_cache import FileCache -from cachecontrol.heuristics import ExpiresAfter from lxml import html +from selenium import webdriver import ee._utils from ee.tools import mk_parents @@ -73,11 +71,11 @@ class Digikey(object): @total_ordering class DigikeyProduct(object): - def __init__(self, part_number, mpn, url, attributes=None, categories=None): + def __init__(self, part_number, mpn, url, attributes: List["DigikeyAttributeValue"] = None, categories=None): self.part_number = _clean(part_number) self.mpn = _clean(mpn) self.url = url - self.attributes = attributes or [] + self.attributes = attributes or [] # type: List["DigikeyAttributeValue"] self.categories = categories or [] self.quantity_available = None self.description = None @@ -156,7 +154,7 @@ class DigikeyAttributeType(object): class DigikeyAttributeValue(object): - def __init__(self, value, attribute_type): + def __init__(self, value, attribute_type: 
class DigikeyClient(object):
    """Download Digi-Key pages with a Selenium-driven Chrome and cache the HTML on disk.

    Each page is stored as ``<cache_dir>/<cache_key>.html``.  A later request
    with the same cache key is answered from that file without touching the
    browser.  The browser is only started on the first cache miss.
    """

    def __nop(self, message):
        """Default ``on_download`` callback: ignore the message."""
        pass

    def __init__(self, cache_dir: Path = None, on_download=None):
        """
        :param cache_dir: directory holding the cached pages.  The current
            working directory is used when it is omitted.  A plain string is
            accepted as well.
        :param on_download: optional callable taking a single status message;
            it is invoked for every cache hit and for every download.
        """
        self.on_download = on_download or self.__nop
        # Path() is the current directory; wrapping also normalises str arguments.
        self.cache_dir = Path(cache_dir) if cache_dir else Path()
        self.driver = None  # type: Optional[webdriver.Chrome]

    def search(self, query: str, page_size=10) -> str:
        """Alias for :meth:`product_search`; returns the raw HTML of the result page."""
        return self.product_search(query, page_size)

    def product_search(self, query: str, page_size=10) -> str:
        """Run a keyword search and return the HTML of the result page.

        :param query: the keywords to search for; also used (URL-quoted) as the cache key.
        :param page_size: value sent as the ``pageSize`` query parameter.
        """
        params = {'lang': 'en', 'site': 'us', 'keywords': query, 'pageSize': str(page_size)}
        # NOTE: quote() keeps "/" by default, so a query containing a slash is cached in a sub-directory.
        cache_key = urllib.parse.quote(query)
        return self._req("https://www.digikey.com/products/en", cache_key=cache_key, params=params)

    def _req(self, url, cache_key, params=None):
        """Return the page source for *url*, from the cache when possible.

        :param url: absolute URL, or a path that is resolved against https://www.digikey.com.
        :param cache_key: file name (without ``.html``) of the cache entry.
        :param params: optional mapping appended to the URL as a query string.
        :return: the HTML of the page as text.
        """
        if not url.startswith("http"):
            url = "https://www.digikey.com" + url
        if params:
            url = url + "?" + urllib.parse.urlencode(params)

        cache_path = self.cache_dir / "{}.html".format(cache_key)

        if cache_path.exists():
            self.on_download("Using cached {}".format(url))
            # Explicit encoding: the page source is Unicode and must not depend on the locale.
            return cache_path.read_text(encoding="utf-8")

        self.on_download("Downloading {}".format(url))

        if self.driver is None:
            options = webdriver.ChromeOptions()
            # "options" is the supported keyword; "chrome_options" is deprecated.
            self.driver = webdriver.Chrome(options=options)

        self.driver.get(url)
        src = self.driver.page_source

        # Never cache an empty document: it would be served instead of the real page from then on.
        if src:
            cache_path.parent.mkdir(parents=True, exist_ok=True)
            cache_path.write_text(src, encoding="utf-8")

        return src

    def close(self):
        """Quit the browser if one was started.  Safe to call more than once."""
        if self.driver is not None:
            self.driver.quit()
            self.driver = None
class Entry(object):
    """One BOM part together with the part numbers used to look it up.

    ``pn`` and ``dpn`` hold whatever ``find_pn(part)`` and
    ``find_dpn(part, DIGIKEY_URI)`` return (presumably the manufacturer and
    Digi-Key part numbers, ``None`` when absent -- confirm in bom_file_utils).
    """

    def __init__(self, new: bool, part: bomFile.Part):
        self.new = new  # True when the part was created by the current import run
        self.part = part

        self.pn = find_pn(part)
        self.dpn = find_dpn(part, DIGIKEY_URI)


def import_parts(in_path, out_path):
    """Add the parts of the BOM at *in_path* that are missing from the BOM at *out_path*.

    A part is considered already imported when the output file has a part with
    the same part number.  Parts without a part number are skipped.  For every
    new part only its part number and, when present, its Digi-Key part number
    are copied.  The output file is rewritten (through a temporary file) only
    when at least one part was added.

    :param in_path: path of the BOM file to import from.
    :param out_path: path of the BOM file to update; created if it does not exist.
    """
    print("in: {}, out: {}".format(in_path, out_path))

    in_file = bomFile.parse(in_path, True)
    if in_file.partsProp is None:
        in_file.partsProp = bomFile.PartList()
    in_part_list = in_file.partsProp.partProp  # type: List[bomFile.Part]

    print("in file: {} parts".format(len(in_part_list)))

    if os.path.isfile(out_path):
        out_file = bomFile.parse(out_path, True)
    else:
        out_file = bomFile.BomFile()

    if out_file.partsProp is None:
        out_file.partsProp = bomFile.PartList()
    out_part_list = out_file.partsProp.partProp  # type: List[bomFile.Part]
    print("out file: {} parts".format(len(out_part_list)))

    existing_parts = []  # type: List[Entry]
    pn_index = {}  # type: MutableMapping[str, Entry]
    new_entry_added = 0

    def add_entry(e: Entry):
        nonlocal new_entry_added

        existing_parts.append(e)
        # Parts without a part number can never be matched, so keep them out of the index.
        if e.pn is not None:
            pn_index[e.pn] = e

        if e.new:
            out_part_list.append(e.part)
            new_entry_added += 1

    print("len(out_part_list)={}".format(len(out_part_list)))
    for part in out_part_list:  # type: bomFile.Part
        add_entry(Entry(False, part))

    print("loaded {} existing parts".format(len(existing_parts)))

    for part in in_part_list:
        pn_value = find_pn(part)

        if pn_value is None:
            print("Skipping part with no part number: id={}".format(part.idProp))
            continue

        if pn_index.get(pn_value) is not None:
            print("Already imported pn_value={}".format(pn_value))
            continue

        print("Importing {}".format(pn_value))

        pns = bomFile.PartNumberList()
        pns.add_part_number(bomFile.PartNumber(value=pn_value))

        dpn_value = find_dpn(part, DIGIKEY_URI)
        if dpn_value is not None:
            pns.add_part_number(bomFile.PartNumber(value=dpn_value, distributor=DIGIKEY_URI))

        if len(pns.part_numberProp) == 0:
            continue

        add_entry(Entry(True, bomFile.Part(part_numbers=pns)))

    if new_entry_added:
        print("Imported {} entries".format(new_entry_added))
        tmp_path = out_path + ".tmp"
        with open(tmp_path, "w") as f:
            out_file.export(f, 0, name_="bom-file")
        # os.replace overwrites an existing destination on every platform;
        # os.rename raises on Windows when out_path already exists.
        os.replace(tmp_path, out_path)
    else:
        print("no new entries")
def resolved(di: DigikeyDistributorInfo, part: bomFile.Part, p: DigikeyProduct):
    """Mark *di* as resolved and replace the part's Digi-Key facts with the attributes of *p*.

    :param di: distributor info of the part; its state is set to ``"resolved"``.
    :param part: the BOM part whose Digi-Key fact set is rewritten (created if missing).
    :param p: the product found for the part.
    """
    di.stateProp = "resolved"

    fact_set = bom_file_utils.find_fact_set(part, DIGIKEY_URI, create=True)

    # Start from an empty list so facts from an earlier refresh do not linger.
    fact_set.factsProp = bomFile.FactList()
    facts = fact_set.factsProp.factProp  # type: List[bomFile.Fact]

    for a in p.attributes:
        facts.append(bomFile.Fact(key=a.attribute_type.id, label=a.attribute_type.label, value=a.value))


def refresh_parts(in_path: Path, out_path: Path, cache_dir: Path, force_refresh: bool):
    """Look up every part of a BOM file on Digi-Key and record the outcome.

    Each part is searched by its Digi-Key part number, or by its part number
    when it has no Digi-Key one.  The part's distributor info state becomes
    ``"resolved"``, ``"not-found"``, ``"many"`` or ``"too-many"``; resolved
    parts also get their Digi-Key fact set replaced.  Parts that are already
    resolved are left alone unless *force_refresh* is set.

    :param in_path: BOM file to read.
    :param out_path: BOM file to write; ``None`` updates *in_path* in place.
    :param cache_dir: directory handed to :class:`DigikeyClient` for its page cache.
    :param force_refresh: also look up parts that are already resolved.
    """
    print("in: {}, out: {}".format(in_path, out_path))

    in_file = bomFile.parse(str(in_path), True)
    if in_file.partsProp is None:
        in_file.partsProp = bomFile.PartList()

    parser = DigikeyParser(Digikey())
    client = DigikeyClient(cache_dir)

    for part in in_file.partsProp.partProp:  # type: bomFile.Part
        dpn = bom_file_utils.find_dpn(part, DIGIKEY_URI)
        mpn = bom_file_utils.find_pn(part)

        # Prefer the distributor's own part number; fall back to the part number.
        if dpn is not None:
            query, is_mpn = dpn, False
        elif mpn is not None:
            query, is_mpn = mpn, True
        else:
            print("could not find pn or dpn: part.id={}".format(part.idProp))
            continue

        di = part.distributor_infoProp  # type: DigikeyDistributorInfo

        if di is None:
            di = bomFile.DigikeyDistributorInfo()
            di.extensiontype_ = "DigikeyDistributorInfo"
            di.original_tagname_ = "distributor-info"
            part.distributor_infoProp = di

        if not force_refresh and di.stateProp == "resolved":
            continue

        response = parser.parse_string(client.search(query))

        if response.response_type == SearchResponseTypes.SINGLE:
            resolved(di, part, response.products[0])
        elif response.response_type == SearchResponseTypes.MANY:
            # Find those with an exact match. Digikey uses a prefix search so a query for "FOO" will
            # return "FOO" and "FOOT".
            field = "mpn" if is_mpn else "part_number"
            exact = [p for p in response.products if getattr(p, field) == query]

            if not exact:
                di.stateProp = "not-found"
            else:
                # Several packagings can share one part number; settle on the lowest Digi-Key number.
                dpn = min(exact, key=lambda p: p.part_number).part_number

                response = parser.parse_string(client.search(dpn))
                if response.response_type == SearchResponseTypes.SINGLE:
                    resolved(di, part, response.products[0])
                else:
                    di.stateProp = "many"
        elif response.response_type == SearchResponseTypes.TOO_MANY:
            di.stateProp = "too-many"
        elif response.response_type == SearchResponseTypes.NO_MATCHES:
            di.stateProp = "not-found"

    # Honour the requested output path; the input is only overwritten when no output was given.
    target = in_path if out_path is None else out_path
    tmp_path = str(target) + ".tmp"
    with open(tmp_path, "w") as f:
        in_file.export(f, 0, name_="bom-file")
    # os.replace overwrites an existing destination on every platform, unlike os.rename on Windows.
    os.replace(tmp_path, str(target))