aboutsummaryrefslogtreecommitdiff
path: root/src/ee/digikey
diff options
context:
space:
mode:
Diffstat (limited to 'src/ee/digikey')
-rw-r--r--src/ee/digikey/__init__.py240
1 files changed, 240 insertions, 0 deletions
diff --git a/src/ee/digikey/__init__.py b/src/ee/digikey/__init__.py
new file mode 100644
index 0000000..a2ccd03
--- /dev/null
+++ b/src/ee/digikey/__init__.py
@@ -0,0 +1,240 @@
+import re
+from functools import total_ordering
+
+import requests
+from cachecontrol import CacheControl
+from cachecontrol import CacheControlAdapter
+from cachecontrol.caches.file_cache import FileCache
+from cachecontrol.heuristics import ExpiresAfter
+from lxml import html
+from typing import List
+
+
+def normalize_filename(part):
+ return part.replace('/', '_').replace(' ', '_')
+
+
+def _clean(s):
+ if s is None:
+ return None
+ s = s.strip()
+ return None if len(s) == 0 else s
+
+
+def _to_string(e):
+ s = ""
+ for t in e.itertext():
+ s += t
+ return s.strip()
+
+
+def _to_int(s):
+ try:
+ return int(s)
+ except ValueError:
+ return None
+
+
+def _id_from_url(url):
+ if url is None:
+ return None
+ m = re.search(r".*/([0-9]+)", url)
+ return m.group(1) if m else None
+
+
+class Digikey(object):
+ def __init__(self):
+ self.attribute_types = {}
+
+ def get_attribute_type(self, id, label):
+ try:
+ return self.attribute_types[id]
+ except KeyError:
+ a = DigikeyAttributeType(id, label)
+ self.attribute_types[id] = a
+ return a
+
+
+@total_ordering
+class DigikeyProduct(object):
+ def __init__(self, part_number, mpn, attributes, categories):
+ self.part_number = _clean(part_number)
+ self.mpn = _clean(mpn)
+ self.attributes = attributes
+ self.categories = categories
+ self.quantity_available = None
+ self.description = None
+
+ assert self.part_number
+ assert self.mpn
+
+ def __eq__(self, other):
+ # type: (DigikeyProduct, DigikeyProduct) -> bool
+ return self.part_number == other.part_number
+
+ def __lt__(self, other):
+ # type: (DigikeyProduct, DigikeyProduct) -> bool
+ return self.part_number < other.part_number
+
+ def __hash__(self):
+ return self.part_number.__hash__()
+
+ def to_yaml(self):
+ yaml = {"part_number": self.part_number}
+ if self.mpn:
+ yaml["mpn"] = self.mpn
+ yaml["attributes"] = [{"type": {"id": a.attribute_type.id, "label": a.attribute_type.label}, "value": a.value}
+ for a in self.attributes]
+ return yaml
+
+
+class DigikeyAttributeType(object):
+ def __init__(self, id, label):
+ self.id = id
+ self.label = label
+
+ assert self.id
+ assert self.label
+
+
+class DigikeyAttributeValue(object):
+ def __init__(self, value, attribute_type):
+ self.value = value
+ self.attribute_type = attribute_type
+
+ assert self.value
+ assert self.attribute_type
+
+
+@total_ordering
+class DigikeyProductCategory(object):
+ def __init__(self, id, label, digikey_url=None, parent=None):
+ self.id = _clean(id)
+ self.label = _clean(label)
+ self.digikey_url = digikey_url if digikey_url is None or digikey_url.startswith("http") else \
+ "https://www.digikey.com" + digikey_url
+ self.parent = parent # type: DigikeyProductCategory
+ self.subCategories = [] # type: List[DigikeyProductCategory
+
+ assert self.id is not None
+ assert self.label is not None
+
+ def __eq__(self, other):
+ # type: (DigikeyProductCategory, DigikeyProductCategory) -> bool
+ return self.id == other.id
+
+ def __lt__(self, other):
+ # type: (DigikeyProductCategory, DigikeyProductCategory) -> bool
+ return self.label < other.label
+
+ def add_sub_category(self, id, label, digikey_url):
+ sc = DigikeyProductCategory(id, label, digikey_url=digikey_url, parent=self)
+ self.subCategories.append(sc)
+
+ def find_sub_category_by_label(self, label):
+ return next((sc for sc in self.subCategories if sc.label == label), None)
+
+
+class DigikeySearchResponse(object):
+ def __init__(self):
+ self.products = set()
+
+ def append(self, product):
+ self.products.add(product)
+
+
+class DigikeyClient(object):
+ def __nop(self):
+ pass
+
+ def __init__(self, digikey: Digikey, on_download=None):
+ self.digikey = digikey
+ self.on_download = on_download or self.__nop
+ cache = FileCache('digikey_cache', forever=True)
+ self.sess = CacheControl(requests.Session(), cache=cache, heuristic=ExpiresAfter(days=1))
+
+ # adapter = CacheControlAdapter(cache=cache, heuristic=ExpiresAfter(days=1))
+ # self.sess = requests.Session()
+ # self.sess.mount('http://', adapter)
+ # self.sess.mount('https://', adapter)
+
+ def req(self, url, params=None):
+ if not url.startswith("http"):
+ url = "https://www.digikey.com" + url
+ s = "" if not params else "?" + "&".join([k + "=" + v for k, v in params.items()])
+ self.on_download("Downloading {}".format(url + s))
+ return self.sess.get(url, params=params)
+
+ def _search_process_single_result(self, url: str, tree: html) -> DigikeyProduct:
+ attributes = []
+ categories = []
+
+ part_number = mpn = None
+ for n in tree.xpath("//*[@itemprop='productID' and @content]"):
+ part_number = n.get("content")
+ part_number = part_number.replace('sku:', '')
+ for n in tree.xpath("//*[@itemprop='name' and @content]"):
+ mpn = n.get("content")
+
+ for tr in tree.xpath("//table[@id='prod-att-table']/tr[not(@id='prod-att-title-row')]"):
+ tds = tr.xpath("th|td")
+ if len(tds) != 3:
+ continue
+ label = tds[0].text.strip()
+ value = tds[1].text.strip()
+
+ if len(label) == 0 or len(value) == 0:
+ continue
+
+ checkbox = tds[2].xpath("input[@type='checkbox' and @name]")
+ try:
+ name = checkbox[0].get("name")
+ attribute_type_id = _to_int(name.replace('pv', ''))
+ except IndexError:
+ continue
+
+ if attribute_type_id:
+ a_type = self.digikey.get_attribute_type(attribute_type_id, label)
+ attributes.append(DigikeyAttributeValue(value, a_type))
+
+ if part_number and mpn:
+ p = DigikeyProduct(part_number, mpn, attributes, categories)
+ for n in tree.xpath("//*[@itemprop='description']"):
+ p.description = _to_string(n)
+ return p
+
+ return None
+
+ def _search_process_multiple_results(self, tree: html, res: DigikeySearchResponse):
+
+ product_ids = [e.get("content").strip().replace('sku:', '') for e in
+ tree.xpath("//*[@itemprop='productid' and @content]")]
+
+ for product_id in product_ids:
+ tmp = self.search(product_id)
+ if isinstance(tmp, DigikeyProduct):
+ res.append(tmp)
+ else:
+ [res.append(p) for p in tmp.products]
+
+ return len(product_ids)
+
+ def search(self, query: str) -> DigikeySearchResponse:
+ page_size = 10
+
+ # http://www.digikey.com/products/en?x=0&y=0&lang=en&site=us&keywords=553-2320-1-ND
+ params = {'lang': 'en', 'site': 'us', 'keywords': query, 'pageSize': str(page_size)}
+ page = self.req("https://www.digikey.com/products/en", params=params)
+ # print("page: ")
+ # print(page.content)
+
+ tree = html.fromstring(page.content)
+
+ count = next(iter([int(e.text) for e in tree.xpath("//span[@id='matching-records-count']") if e.text]), 0)
+
+ if count == 0:
+ return self._search_process_single_result(page.url, tree)
+ else:
+ res = DigikeySearchResponse()
+ self._search_process_multiple_results(tree, res)
+ return res