aboutsummaryrefslogtreecommitdiff
path: root/src/ee/digikey
diff options
context:
space:
mode:
author Trygve Laugstøl <trygvis@inamo.no> 2019-02-24 21:51:38 +0100
committer Trygve Laugstøl <trygvis@inamo.no> 2019-02-24 21:51:38 +0100
commit 80e0623913e87c6480049520590e424a831e0401 (patch)
tree ff27e1d269cac886dd06ab4f9924719f84794e38 /src/ee/digikey
parent 8aae5d032dd30118b6d992018391a8bd5be759e4 (diff)
download ee-python-80e0623913e87c6480049520590e424a831e0401.tar.gz
ee-python-80e0623913e87c6480049520590e424a831e0401.tar.bz2
ee-python-80e0623913e87c6480049520590e424a831e0401.tar.xz
ee-python-80e0623913e87c6480049520590e424a831e0401.zip
Digikey: replacing requests-based code with selenium.
Adding new tools: digikey-import-parts and digikey-refresh-parts.
Diffstat (limited to 'src/ee/digikey')
-rw-r--r-- src/ee/digikey/__init__.py 79
-rw-r--r-- src/ee/digikey/import_parts.py 100
-rw-r--r-- src/ee/digikey/refresh_parts.py 97
3 files changed, 252 insertions, 24 deletions
diff --git a/src/ee/digikey/__init__.py b/src/ee/digikey/__init__.py
index 615d458..32308e5 100644
--- a/src/ee/digikey/__init__.py
+++ b/src/ee/digikey/__init__.py
@@ -6,13 +6,11 @@ import os.path
import re
import urllib.parse
from functools import total_ordering
+from pathlib import Path
from typing import List, Optional
-import requests
-from cachecontrol import CacheControl
-from cachecontrol.caches.file_cache import FileCache
-from cachecontrol.heuristics import ExpiresAfter
from lxml import html
+from selenium import webdriver
import ee._utils
from ee.tools import mk_parents
@@ -73,11 +71,11 @@ class Digikey(object):
@total_ordering
class DigikeyProduct(object):
- def __init__(self, part_number, mpn, url, attributes=None, categories=None):
+ def __init__(self, part_number, mpn, url, attributes: List["DigikeyAttributeValue"] = None, categories=None):
self.part_number = _clean(part_number)
self.mpn = _clean(mpn)
self.url = url
- self.attributes = attributes or []
+ self.attributes = attributes or [] # type: List["DigikeyAttributeValue"]
self.categories = categories or []
self.quantity_available = None
self.description = None
@@ -156,7 +154,7 @@ class DigikeyAttributeType(object):
class DigikeyAttributeValue(object):
- def __init__(self, value, attribute_type):
+ def __init__(self, value, attribute_type: DigikeyAttributeType):
self.value = value
self.attribute_type = attribute_type
@@ -171,8 +169,8 @@ class DigikeyProductCategory(object):
self.label = _clean(label)
self.digikey_url = digikey_url if digikey_url is None or digikey_url.startswith("http") else \
"https://www.digikey.com" + digikey_url
- self.parent = parent # type: DigikeyProductCategory
- self.subCategories = [] # type: List[DigikeyProductCategory]
+ self.parent: DigikeyProductCategory = parent
+ self.subCategories: List[DigikeyProductCategory] = []
assert self.id
assert self.label
@@ -203,7 +201,7 @@ class DigikeySearchResponse(object):
self.count = count
self.response_type = response_type
- self.products = list() # type: List[DigikeyProduct]
+ self.products: List[DigikeyProduct] = list()
def append(self, product: DigikeyProduct):
self.products.append(product)
@@ -213,18 +211,57 @@ class DigikeyClient(object):
def __nop(self, message):
+ # Fallback used for on_download when the caller supplies no callback:
+ # accepts the message and deliberately does nothing with it.
pass
- def __init__(self, digikey: Digikey, cache_dir=None, on_download=None):
- self.digikey = digikey
+ def __init__(self, cache_dir: Path = None, on_download=None):
+ # Set up a client that downloads pages through a Selenium-driven Chrome.
+ #
+ # cache_dir: directory under which downloaded pages are stored; when it is
+ # not given, Path() is used, i.e. paths relative to the current
+ # working directory.
+ # on_download: callable that receives one progress message per request;
+ # defaults to the no-op __nop.
self.on_download = on_download or self.__nop
- cache = FileCache(cache_dir or 'digikey_cache', forever=True)
- self.sess = CacheControl(requests.Session(), cache=cache, heuristic=ExpiresAfter(days=10*365))
+ self.cache_dir = cache_dir or Path()
+ # Left as None here; _req creates the Chrome driver the first time a page
+ # actually has to be downloaded.
+ self.driver: webdriver.Chrome = None
- def _req(self, url, params=None):
+    def search(self, query: str, page_size=10) -> str:
+        """Search Digikey for *query*; currently an alias for :meth:`product_search`.
+
+        :param query: text to search for.
+        :param page_size: number of results requested per page.
+        :return: whatever :meth:`product_search` returns for the same arguments.
+        """
+        page = self.product_search(query, page_size)
+        return page
+
+    def product_search(self, query: str, page_size=10) -> str:
+        """Fetch the Digikey product search page for *query*.
+
+        The request goes through :meth:`_req`, which may answer from the page
+        cache; the URL-quoted query is used as the cache key.
+
+        :param query: keywords to search for.
+        :param page_size: value sent as the ``pageSize`` request parameter.
+        :return: the value returned by :meth:`_req` for the search URL.
+        """
+        search_params = dict(lang='en', site='us', keywords=query, pageSize=str(page_size))
+        return self._req(
+            "https://www.digikey.com/products/en",
+            cache_key=urllib.parse.quote(query),
+            params=search_params,
+        )
+
+    def _req(self, url, cache_key, params=None):
+        """Return the page source for *url*, served from the on-disk cache when possible.
+
+        A *url* that does not start with ``http`` is taken to be relative to
+        ``https://www.digikey.com``. *params*, when given, are URL-encoded and
+        appended as the query string.
+
+        The cache entry is ``<cache_dir>/<cache_key>.html``. On a hit its content
+        is returned without touching the browser. On a miss the page is loaded in
+        Chrome (started on first use), its source is written to the cache entry
+        and then returned. Cache files are read and written as UTF-8.
+
+        :param url: absolute or site-relative URL.
+        :param cache_key: file name (without ``.html``) of the cache entry.
+        :param params: optional mapping of query parameters.
+        :return: the page source as text.
+        """
         if not url.startswith("http"):
             url = "https://www.digikey.com" + url
-        s = "" if not params else "?" + urllib.parse.urlencode(params)
-        self.on_download("Downloading {}".format(url + s))
-        return self.sess.get(url, params=params)
+        url = url + ("" if not params else "?" + urllib.parse.urlencode(params))
+
+        cache_path: Optional[Path] = None
+        if self.cache_dir:
+            cache_path = self.cache_dir / "{}.html".format(cache_key)
+
+            if cache_path.exists():
+                self.on_download("Using cached {}".format(url))
+                # Explicit encoding: page content must not depend on the locale.
+                with open(str(cache_path), "r", encoding="utf-8") as f:
+                    return f.read()
+
+        self.on_download("Downloading {}".format(url))
+
+        if self.driver is None:
+            options = webdriver.ChromeOptions()
+            # NOTE(review): newer Selenium releases expect `options=` instead of
+            # `chrome_options=` - confirm against the pinned Selenium version.
+            self.driver = webdriver.Chrome(chrome_options=options)
+
+        self.driver.get(url)
+
+        src = self.driver.page_source
+        if cache_path:
+            # The cache key may contain "/" and therefore name a sub-directory.
+            cache_path.parent.mkdir(parents=True, exist_ok=True)
+
+            with open(str(cache_path), "w", encoding="utf-8") as f:
+                f.write(src)
+            # Sanity-check the file that was just written (not the cache
+            # directory, whose size says nothing about this page).
+            assert cache_path.stat().st_size > 0, "empty cache file: {}".format(cache_path)
+
+        return src
+
+
+class DigikeyParser(object):
+    def __init__(self, digikey: Digikey):
+        """Create a parser bound to *digikey*.
+
+        A fresh ``Digikey()`` is created when *digikey* is falsy (e.g. ``None``).
+        """
+        if not digikey:
+            digikey = Digikey()
+        self.digikey = digikey
def _search_process_single_result(self, tree: html) -> Optional[DigikeyProduct]:
attributes = []
@@ -300,12 +337,6 @@ class DigikeyClient(object):
return len(products)
- def search(self, query: str, page_size=10) -> DigikeySearchResponse:
- params = {'lang': 'en', 'site': 'us', 'keywords': query, 'pageSize': str(page_size)}
- page = self._req("https://www.digikey.com/products/en", params=params)
-
- return self.parse_string(page.content)
-
def parse_string(self, page_content: str):
tree = html.fromstring(page_content)
diff --git a/src/ee/digikey/import_parts.py b/src/ee/digikey/import_parts.py
new file mode 100644
index 0000000..748bbef
--- /dev/null
+++ b/src/ee/digikey/import_parts.py
@@ -0,0 +1,100 @@
+import os
+from typing import List, MutableMapping
+
+from ee.xml import bomFile
+from ee.xml.bom_file_utils import *
+from ee.xml.uris import DIGIKEY_URI
+
+__all__ = ["import_parts"]
+
+
+class Entry:
+    """A BOM part together with the identifiers used to index it.
+
+    ``new`` tells whether the part was created by the current import (and so
+    still has to be appended to the output list) or was loaded from the
+    existing output file. ``pn`` and ``dpn`` are the results of ``find_pn``
+    and ``find_dpn`` (the latter for Digikey) and may be ``None``.
+    """
+
+    def __init__(self, new: bool, part: bomFile.Part):
+        self.part = part
+        self.new = new
+
+        # Looked up once here so that callers can index entries without
+        # re-scanning the part.
+        self.pn = find_pn(part)
+        self.dpn = find_dpn(part, DIGIKEY_URI)
+
+
+def import_parts(in_path, out_path):
+    """Copy parts from one BOM file into another, skipping already known part numbers.
+
+    Every part of ``in_path`` whose part number is not yet present in ``out_path`` is added to the output as a
+    new, minimal part that carries only its part number and, when the source part has one, its Digikey part
+    number. Parts without a part number are skipped. The output file is rewritten only when at least one part
+    was added.
+
+    :param in_path: BOM file to import parts from.
+    :param out_path: BOM file to extend; it is loaded first when it exists and created otherwise.
+    """
+    print("in: {}, out: {}".format(in_path, out_path))
+
+    in_file = bomFile.parse(in_path, True)
+    if in_file.partsProp is None:
+        in_file.partsProp = bomFile.PartList()
+    in_part_list = in_file.partsProp.partProp  # type: List[bomFile.Part]
+
+    print("in file: {} parts".format(len(in_part_list)))
+
+    if os.path.isfile(out_path):
+        out_file = bomFile.parse(out_path, True)
+    else:
+        out_file = bomFile.BomFile()
+
+    if out_file.partsProp is None:
+        out_file.partsProp = bomFile.PartList()
+    out_part_list = out_file.partsProp.partProp  # type: List[bomFile.Part]
+    print("out file: {} parts".format(len(out_part_list)))
+
+    existing_parts = []  # type: List[Entry]
+    pn_index = {}  # type: MutableMapping[str, Entry]
+    new_entry_added = 0
+
+    def add_entry(e: Entry):
+        nonlocal new_entry_added
+
+        existing_parts.append(e)
+        # Parts without a part number cannot be matched later, so do not index them under None.
+        if e.pn is not None:
+            pn_index[e.pn] = e
+
+        if e.new:
+            out_part_list.append(e.part)
+            new_entry_added += 1
+
+    print("len(out_part_list)={}".format(len(out_part_list)))
+    for part in out_part_list:  # type: bomFile.Part
+        add_entry(Entry(False, part))
+
+    print("loaded {} existing parts".format(len(existing_parts)))
+
+    for part in in_part_list:
+        pn_value = find_pn(part)
+
+        if pn_value is None:
+            print("Skipping part with no part number: id={}".format(part.idProp))
+            continue
+
+        if pn_value in pn_index:
+            print("Already imported pn_value={}".format(pn_value))
+            continue
+
+        print("Importing {}".format(pn_value))
+
+        # pn_value is known to be set here, so the new part always gets at least one part number.
+        pns = bomFile.PartNumberList()
+        pns.add_part_number(bomFile.PartNumber(value=pn_value))
+
+        dpn_value = find_dpn(part, DIGIKEY_URI)
+        if dpn_value is not None:
+            pns.add_part_number(bomFile.PartNumber(value=dpn_value, distributor=DIGIKEY_URI))
+
+        add_entry(Entry(True, bomFile.Part(part_numbers=pns)))
+
+    if new_entry_added:
+        print("Imported {} entries".format(new_entry_added))
+        # Write next to the target and swap the file in afterwards, so a failed export never leaves a
+        # half-written output file. os.replace also overwrites an existing target on Windows.
+        tmp_path = str(out_path) + ".tmp"
+        with open(tmp_path, "w", encoding="utf-8") as f:
+            out_file.export(f, 0, name_="bom-file")
+        os.replace(tmp_path, out_path)
+    else:
+        print("no new entries")
diff --git a/src/ee/digikey/refresh_parts.py b/src/ee/digikey/refresh_parts.py
new file mode 100644
index 0000000..87edf2f
--- /dev/null
+++ b/src/ee/digikey/refresh_parts.py
@@ -0,0 +1,97 @@
+import os
+from pathlib import Path
+from typing import List
+
+from ee.digikey import Digikey, DigikeyParser, DigikeyClient, SearchResponseTypes, DigikeyProduct
+from ee.xml import bomFile, bom_file_utils
+from ee.xml.bomFile import DigikeyDistributorInfo
+from ee.xml.uris import DIGIKEY_URI
+
+__all__ = ["refresh_parts"]
+
+
+def resolved(di: DigikeyDistributorInfo, part: bomFile.Part, p: DigikeyProduct):
+    """Mark *di* as resolved and replace the part's Digikey facts with the attributes of *p*.
+
+    :param di: distributor info whose state is set to ``"resolved"``.
+    :param part: part whose Digikey fact set is (created and) refilled.
+    :param p: product whose attributes become the new facts.
+    """
+    di.stateProp = "resolved"
+
+    digikey_facts = bom_file_utils.find_fact_set(part, DIGIKEY_URI, create=True)
+
+    # Start from an empty list so facts from an earlier refresh do not linger.
+    digikey_facts.factsProp = bomFile.FactList()
+    digikey_facts.factsProp.factProp.extend(
+        bomFile.Fact(key=attr.attribute_type.id, label=attr.attribute_type.label, value=attr.value)
+        for attr in p.attributes
+    )
+
+
+def refresh_parts(in_path: Path, out_path: Path, cache_dir: Path, force_refresh: bool):
+    """Look up every part of a BOM file on Digikey and record the outcome on the part.
+
+    The Digikey part number is used as the search query when the part has one, otherwise its part number.
+    After a search the part's distributor info is in one of the states ``resolved``, ``not-found``, ``many``
+    or ``too-many``; resolved parts get their Digikey facts replaced by the product's attributes.
+
+    :param in_path: BOM file to read.
+    :param out_path: where the updated BOM file is written; ``in_path`` is used when this is ``None``.
+    :param cache_dir: directory handed to :class:`DigikeyClient` for its page cache.
+    :param force_refresh: search again even for parts that are already ``resolved``.
+    """
+    print("in: {}, out: {}".format(in_path, out_path))
+
+    in_file = bomFile.parse(str(in_path), True)
+    if in_file.partsProp is None:
+        in_file.partsProp = bomFile.PartList()
+
+    parser = DigikeyParser(Digikey())
+    client = DigikeyClient(cache_dir)
+
+    try:
+        for part in in_file.partsProp.partProp:  # type: bomFile.Part
+            dpn = bom_file_utils.find_dpn(part, DIGIKEY_URI)
+            mpn = bom_file_utils.find_pn(part)
+
+            # Prefer the distributor's own number; fall back to the part number.
+            if dpn is not None:
+                query, is_mpn = dpn, False
+            elif mpn is not None:
+                query, is_mpn = mpn, True
+            else:
+                print("could not find pn or dpn: part.id={}".format(part.idProp))
+                continue
+
+            di = part.distributor_infoProp  # type: DigikeyDistributorInfo
+
+            if di is None:
+                di = bomFile.DigikeyDistributorInfo()
+                di.extensiontype_ = "DigikeyDistributorInfo"
+                di.original_tagname_ = "distributor-info"
+                part.distributor_infoProp = di
+
+            if not force_refresh and di.stateProp == "resolved":
+                continue
+
+            response = parser.parse_string(client.search(query))
+
+            if response.response_type == SearchResponseTypes.SINGLE:
+                resolved(di, part, response.products[0])
+            elif response.response_type == SearchResponseTypes.MANY:
+                # Keep only exact matches. Digikey uses a prefix search so a query for "FOO" will return
+                # "FOO" and "FOOT".
+                exact = [p for p in response.products if (p.mpn if is_mpn else p.part_number) == query]
+
+                if not exact:
+                    di.stateProp = "not-found"
+                else:
+                    # Several packagings may share the number; pick the first Digikey part number in
+                    # sort order and search again for exactly that one.
+                    dpn = min(p.part_number for p in exact)
+
+                    response = parser.parse_string(client.search(dpn))
+                    if response.response_type == SearchResponseTypes.SINGLE:
+                        resolved(di, part, response.products[0])
+                    else:
+                        di.stateProp = "many"
+
+            elif response.response_type == SearchResponseTypes.TOO_MANY:
+                di.stateProp = "too-many"
+            elif response.response_type == SearchResponseTypes.NO_MATCHES:
+                di.stateProp = "not-found"
+    finally:
+        # The client starts a browser on its first download; shut it down so it does not outlive this call.
+        driver = getattr(client, "driver", None)
+        if driver is not None:
+            driver.quit()
+
+    # Honour the requested output path; only fall back to updating the input file in place when none is given.
+    if out_path is None:
+        out_path = in_path
+
+    # Write next to the target and swap the file in afterwards. os.replace also overwrites an existing
+    # target on Windows, where os.rename would fail.
+    tmp_path = str(out_path) + ".tmp"
+    with open(tmp_path, "w", encoding="utf-8") as f:
+        in_file.export(f, 0, name_="bom-file")
+    os.replace(tmp_path, str(out_path))