From d59fb211556cd9b5a2bc028c5cf8a37b891cbfb3 Mon Sep 17 00:00:00 2001
From: Trygve Laugstøl <trygvis@inamo.no>
Date: Sun, 3 Sep 2017 11:21:17 +0200
Subject: o Adding tools to download facts about parts from Digi-Key.

---
 src/ee/__main__.py                     |  53 ++++++++
 src/ee/digikey/__init__.py             | 240 +++++++++++++++++++++++++++++++++
 src/ee/tools/digikey_download_facts.py |  59 ++++++++
 3 files changed, 352 insertions(+)
 create mode 100644 src/ee/__main__.py
 create mode 100644 src/ee/digikey/__init__.py
 create mode 100644 src/ee/tools/digikey_download_facts.py

(limited to 'src/ee')

diff --git a/src/ee/__main__.py b/src/ee/__main__.py
new file mode 100644
index 0000000..75688c0
--- /dev/null
+++ b/src/ee/__main__.py
@@ -0,0 +1,53 @@
+from functools import total_ordering
+import ee.tools
+import importlib
+import logging
+import pkgutil
+import sys
+
+def eprint(*args, **kwargs):
+  print(*args, file=sys.stderr, **kwargs)
+
+@total_ordering
+class Tool(object):
+  def __init__(self, module_name, name):
+    self.module_name = module_name
+    self.name = name
+
+  def __eq__(self, other):
+    return self.name == other.name
+
+  def __lt__(self, other):
+    return self.name < other.name
+
+def find_tools():  # discover the modules under ee.tools; returns a list of Tool sorted by name
+  prefix = ee.tools.__name__ + '.'
+  ps = pkgutil.walk_packages(ee.tools.__path__, prefix)
+  tools = []
+  for (module_loader, module_name, ispkg) in ps:
+    name = module_name.replace(prefix, '').replace('_', '-')  # ee.tools.some_tool -> some-tool
+    tools.append(Tool(module_name, name))
+  return sorted(tools)
+
+if __name__ == "__main__":  # dispatcher: run the tool named by the first command line argument
+  logging.basicConfig()  # logging must be initialized, otherwise nothing from requests is shown
+  logging.getLogger().setLevel(logging.DEBUG)
+  requests_log = logging.getLogger("requests.packages.urllib3")
+  requests_log.setLevel(logging.DEBUG)
+  requests_log.propagate = True
+
+  tools = find_tools()
+  if len(sys.argv) < 2:  # no tool name given: print usage instead of raising IndexError
+    eprint("Usage: python -m ee TOOL [ARGS...]")
+    eprint("Available tools: {}".format(", ".join(t.name for t in tools)))
+    sys.exit(1)
+  name = sys.argv.pop(1)  # drop the tool name so the tool only sees its own arguments
+
+  for t in tools:
+    if t.name != name:
+      continue
+    sys.argv[0] = t.name
+    importlib.import_module(t.module_name)  # the tool module does its work at import time
+    sys.exit(0)
+  eprint("No such tool: {}".format(name))
+  sys.exit(1)
diff --git a/src/ee/digikey/__init__.py b/src/ee/digikey/__init__.py
new file mode 100644
index 0000000..a2ccd03
--- /dev/null
+++ b/src/ee/digikey/__init__.py
@@ -0,0 +1,240 @@
+import re
+from functools import total_ordering
+
+import requests
+from cachecontrol import CacheControl
+from cachecontrol import CacheControlAdapter
+from cachecontrol.caches.file_cache import FileCache
+from cachecontrol.heuristics import ExpiresAfter
+from lxml import html
+from typing import List
+
+
+def normalize_filename(part):
+  return part.replace('/', '_').replace(' ', '_')
+
+
+def _clean(s):
+  if s is None:
+    return None
+  s = s.strip()
+  return None if len(s) == 0 else s
+
+
+def _to_string(e):
+  s = ""
+  for t in e.itertext():
+    s += t
+  return s.strip()
+
+
+def _to_int(s):
+  try:
+    return int(s)
+  except ValueError:
+    return None
+
+
+def _id_from_url(url):
+  if url is None:
+    return None
+  m = re.search(r".*/([0-9]+)", url)
+  return m.group(1) if m else None
+
+
+class Digikey(object):
+  def __init__(self):
+    self.attribute_types = {}
+
+  def get_attribute_type(self, id, label):
+    try:
+      return self.attribute_types[id]
+    except KeyError:
+      a = DigikeyAttributeType(id, label)
+      self.attribute_types[id] = a
+      return a
+
+
+@total_ordering
+class DigikeyProduct(object):
+  def __init__(self, part_number, mpn, attributes, categories):
+    self.part_number = _clean(part_number)
+    self.mpn = _clean(mpn)
+    self.attributes = attributes
+    self.categories = categories
+    self.quantity_available = None
+    self.description = None
+
+    assert self.part_number
+    assert self.mpn
+
+  def __eq__(self, other):
+    # type: (DigikeyProduct, DigikeyProduct) -> bool
+    return self.part_number == other.part_number
+
+  def __lt__(self, other):
+    # type: (DigikeyProduct, DigikeyProduct) -> bool
+    return self.part_number < other.part_number
+
+  def __hash__(self):
+    return self.part_number.__hash__()
+
+  def to_yaml(self):
+    yaml = {"part_number": self.part_number}
+    if self.mpn:
+      yaml["mpn"] = self.mpn
+    yaml["attributes"] = [{"type": {"id": a.attribute_type.id, "label": a.attribute_type.label}, "value": a.value}
+                          for a in self.attributes]
+    return yaml
+
+
+class DigikeyAttributeType(object):
+  def __init__(self, id, label):
+    self.id = id
+    self.label = label
+
+    assert self.id
+    assert self.label
+
+
+class DigikeyAttributeValue(object):
+  def __init__(self, value, attribute_type):
+    self.value = value
+    self.attribute_type = attribute_type
+
+    assert self.value
+    assert self.attribute_type
+
+
+@total_ordering
+class DigikeyProductCategory(object):
+  def __init__(self, id, label, digikey_url=None, parent=None):
+    self.id = _clean(id)
+    self.label = _clean(label)
+    self.digikey_url = digikey_url if digikey_url is None or digikey_url.startswith("http") else \
+      "https://www.digikey.com" + digikey_url  # site-relative URLs are made absolute
+    self.parent = parent  # type: DigikeyProductCategory
+    self.subCategories = []  # type: List[DigikeyProductCategory]
+
+    assert self.id is not None  # _clean() turns blank strings into None
+    assert self.label is not None
+
+  def __eq__(self, other):
+    # type: (DigikeyProductCategory, DigikeyProductCategory) -> bool
+    return self.id == other.id
+
+  def __lt__(self, other):
+    # type: (DigikeyProductCategory, DigikeyProductCategory) -> bool
+    return self.label < other.label
+
+  def add_sub_category(self, id, label, digikey_url):
+    sc = DigikeyProductCategory(id, label, digikey_url=digikey_url, parent=self)
+    self.subCategories.append(sc)
+
+  def find_sub_category_by_label(self, label):
+    return next((sc for sc in self.subCategories if sc.label == label), None)
+
+
+class DigikeySearchResponse(object):
+  def __init__(self):
+    self.products = set()
+
+  def append(self, product):
+    self.products.add(product)
+
+
+class DigikeyClient(object):
+  def __nop(self, *args, **kwargs):  # default on_download callback: accepts the message and ignores it
+    pass
+
+  def __init__(self, digikey: Digikey, on_download=None):
+    self.digikey = digikey
+    self.on_download = on_download or self.__nop
+    cache = FileCache('digikey_cache', forever=True)
+    self.sess = CacheControl(requests.Session(), cache=cache, heuristic=ExpiresAfter(days=1))
+
+    # adapter = CacheControlAdapter(cache=cache, heuristic=ExpiresAfter(days=1))
+    # self.sess = requests.Session()
+    # self.sess.mount('http://', adapter)
+    # self.sess.mount('https://', adapter)
+
+  def req(self, url, params=None):  # GET url through the caching session and return the response
+    if not url.startswith("http"):
+      url = "https://www.digikey.com" + url  # site-relative URL: prepend the Digi-Key host
+    s = "" if not params else "?" + "&".join([k + "=" + v for k, v in params.items()])  # display only; values must be str
+    self.on_download("Downloading {}".format(url + s))
+    return self.sess.get(url, params=params)  # the session encodes params itself; s is not sent
+
+  def _search_process_single_result(self, url: str, tree: html) -> DigikeyProduct:
+    attributes = []
+    categories = []  # never filled here; passed to the product as an empty list
+
+    part_number = mpn = None
+    for n in tree.xpath("//*[@itemprop='productID' and @content]"):
+      part_number = n.get("content")
+      part_number = part_number.replace('sku:', '')
+    for n in tree.xpath("//*[@itemprop='name' and @content]"):
+      mpn = n.get("content")
+
+    for tr in tree.xpath("//table[@id='prod-att-table']/tr[not(@id='prod-att-title-row')]"):
+      tds = tr.xpath("th|td")
+      if len(tds) != 3:
+        continue
+      label = (tds[0].text or "").strip()  # .text is None when the cell has no leading text
+      value = (tds[1].text or "").strip()
+
+      if len(label) == 0 or len(value) == 0:
+        continue
+
+      checkbox = tds[2].xpath("input[@type='checkbox' and @name]")
+      try:
+        name = checkbox[0].get("name")
+        attribute_type_id = _to_int(name.replace('pv', ''))  # checkbox name is "pv" + attribute type id
+      except IndexError:
+        continue  # row without a checkbox: not treated as an attribute
+
+      if attribute_type_id:
+        a_type = self.digikey.get_attribute_type(attribute_type_id, label)
+        attributes.append(DigikeyAttributeValue(value, a_type))
+
+    if part_number and mpn:
+      p = DigikeyProduct(part_number, mpn, attributes, categories)
+      for n in tree.xpath("//*[@itemprop='description']"):
+        p.description = _to_string(n)
+      return p
+
+    return None  # the page carried no part number or no MPN
+
+  def _search_process_multiple_results(self, tree: html, res: DigikeySearchResponse):
+    # Searches for every product id listed on the page and adds the products to res; returns the id count.
+    product_ids = [e.get("content").strip().replace('sku:', '') for e in
+                   tree.xpath("//*[@itemprop='productid' and @content]")]
+
+    for product_id in product_ids:
+      tmp = self.search(product_id)
+      if isinstance(tmp, DigikeyProduct):
+        res.append(tmp)
+      elif tmp is not None:  # search() returns None when the product page could not be parsed
+        for p in tmp.products:
+          res.append(p)
+    return len(product_ids)
+
+  def search(self, query: str) -> DigikeySearchResponse:  # NOTE(review): can also return a DigikeyProduct or None
+    page_size = 10
+
+    # Example: http://www.digikey.com/products/en?x=0&y=0&lang=en&site=us&keywords=553-2320-1-ND
+    params = {'lang': 'en', 'site': 'us', 'keywords': query, 'pageSize': str(page_size)}
+    page = self.req("https://www.digikey.com/products/en", params=params)
+    # print("page: ")
+    # print(page.content)
+
+    tree = html.fromstring(page.content)
+
+    count = next(iter([int(e.text) for e in tree.xpath("//span[@id='matching-records-count']") if e.text]), 0)  # 0 if absent
+
+    if count == 0:  # no record count on the page: parse it as a single product page
+      return self._search_process_single_result(page.url, tree)
+    else:
+      res = DigikeySearchResponse()
+      self._search_process_multiple_results(tree, res)  # runs one further search per listed product id
+      return res
diff --git a/src/ee/tools/digikey_download_facts.py b/src/ee/tools/digikey_download_facts.py
new file mode 100644
index 0000000..f21f171
--- /dev/null
+++ b/src/ee/tools/digikey_download_facts.py
@@ -0,0 +1,59 @@
+from colors import color
+import argparse
+import sys
+import ee.digikey as dk
+import pandas
+from itertools import *
+import yaml
+import os.path
+
+parser = argparse.ArgumentParser(description="Download facts about parts from Digi-Key")
+
+parser.add_argument("parts",
+                    metavar="PART",
+                    nargs="+",
+                    help="The parts to download fact for")
+
+parser.add_argument("--out",
+                    required=True,
+                    metavar="OUTPUT_DIRECTORY",
+                    dest="out",
+                    action="store",
+                    help="A directory to store fact files")
+
+parser.add_argument("--force",
+                    dest="force",
+                    action="store_true",  # boolean flag: takes no value and defaults to False
+                    help="Always download fact even if there is a local file")
+
+args = parser.parse_args()
+
+digikey = dk.Digikey()
+client = dk.DigikeyClient(digikey, on_download=lambda s: print(color(s, 'grey')))
+
+def mpn_to_path(mpn):  # fact file path for a part; '/' and ' ' in the name are replaced so it stays one file name
+  return "{}/{}.yaml".format(args.out, dk.normalize_filename(mpn))
+
+def on_product(p):
+  y = p.to_yaml()
+  with open(mpn_to_path(p.mpn), "w") as f:
+    yaml.dump(y, f, encoding="utf-8", allow_unicode=True)
+
+for p in args.parts:
+  print(color("Searching for {}".format(p), "white"))
+  path = mpn_to_path(p)
+  if os.path.isfile(path) and not args.force:
+    continue  # a fact file for this part already exists and --force was not given
+
+  response = client.search(p)
+  if not response:
+    print(color("Part not found", "orange"))
+  elif isinstance(response, dk.DigikeyProduct):
+    print(color("Found {}".format(response.mpn), "white"))
+    on_product(response)
+  else:
+    # groupby() only merges adjacent items, so sort by MPN (then part number) before grouping.
+    products = sorted(response.products, key=lambda x: (x.mpn, x.part_number))
+    for k, g in groupby(products, lambda x: x.mpn):
+      print(color("Found {}".format(k), "white"))
+      on_product(next(g))  # one fact file per MPN, taken from the lowest part number
-- 
cgit v1.2.3