From d59fb211556cd9b5a2bc028c5cf8a37b891cbfb3 Mon Sep 17 00:00:00 2001
From: Trygve Laugstøl
Date: Sun, 3 Sep 2017 11:21:17 +0200
Subject: o Adding tools to download facts about parts from Digi-Key.

---
Review notes (text between the separator above and the diffstat is ignored by git am):
 * The line structure of this patch was restored from its diff markers; every
   hunk header and the diffstat totals were re-checked against the result.
   Leading whitespace could not be recovered and follows the 2-space setting of
   the .editorconfig added here. The removed lines of test/test_digikey.py must
   match the pre-image whitespace before this patch is applied.
 * The fixes below replace lines one-for-one, so no hunk header changed. The
   blob ids on the "index" lines still describe the original content.
   - .editorconfig: "indent" is not an EditorConfig property; use "indent_size".
   - DigikeyClient.__nop: accept the message that req() passes to on_download.
   - DigikeyProductCategory: close the bracket in the "# type:" comment.
   - _search_process_multiple_results: skip a None search result and stop using
     a list comprehension for its side effects.
   - digikey_download_facts: --force is a flag (store_true); sort the products
     by the same key groupby() groups on.
   - test_digikey_2: DigikeyProduct has part_number, not part_id.
 * To confirm against the live pages: the XPath literals use
   itemprop='productID' for a single result but itemprop='productid' for the
   result list, and XPath string comparison is case-sensitive.

 .editorconfig                          |   2 +
 .gitignore                             |   6 +-
 requirements.txt                       |  15 ++-
 src/ee/__main__.py                     |  53 ++++++++
 src/ee/digikey/__init__.py             | 240 +++++++++++++++++++++++++++++++++
 src/ee/tools/digikey_download_facts.py |  59 ++++++++
 test/test_digikey.py                   |  36 +++--
 7 files changed, 391 insertions(+), 20 deletions(-)
 create mode 100644 .editorconfig
 create mode 100644 src/ee/__main__.py
 create mode 100644 src/ee/digikey/__init__.py
 create mode 100644 src/ee/tools/digikey_download_facts.py

diff --git a/.editorconfig b/.editorconfig
new file mode 100644
index 0000000..7267e19
--- /dev/null
+++ b/.editorconfig
@@ -0,0 +1,2 @@
+[*.py]
+indent_size = 2
diff --git a/.gitignore b/.gitignore
index 3b76ab3..074e9f9 100644
--- a/.gitignore
+++ b/.gitignore
@@ -4,6 +4,9 @@ env-*
 ee.egg-info
 .eggs
 .cache
+.tox
+.ipynb*/
+.idea
 
 *.png
 
@@ -11,6 +14,3 @@ ee.egg-info
 demo/*/*.net
 demo/*/*.raw
 demo/*/*.log
-
-.tox
-.ipynb*/
diff --git a/requirements.txt b/requirements.txt
index 627079e..22e94b5 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,13 +1,18 @@
-sympy==1.0
+ansicolors==1.1.8
+cachecontrol==0.12.3
+lockfile==0.12.2
+lxml==3.8.0
+matplotlib==2.0.2
 mpmath==0.19
+mypy==0.521; python_version >= '3.0'
 numpy==1.13.1
-parsec==3.3
-matplotlib==2.0.2
 pandas==0.20.3
+parsec==3.3
 Pillow==4.2.1
 pytest==3.2.0
-
-mypy==0.521; python_version >= '3.0'
+pyyaml==3.12
+sympy==1.0
+typing==3.6.2; python_version < '3.0'
 
 # for development
 jupyter==1.0.0
diff --git a/src/ee/__main__.py b/src/ee/__main__.py
new file mode 100644
index 0000000..75688c0
--- /dev/null
+++ b/src/ee/__main__.py
@@ -0,0 +1,53 @@
+from functools import total_ordering
+import ee.tools
+import importlib
+import logging
+import pkgutil
+import sys
+
+def eprint(*args, **kwargs):
+  print(*args, file=sys.stderr, **kwargs)
+
+@total_ordering
+class Tool(object):
+  def __init__(self, module_name, name):
+    self.module_name = module_name
+    self.name = name
+
+  def __eq__(self, other):
+    return self.name == other.name
+
+  def __lt__(self, other):
+    return self.name < other.name
+
+def find_tools():
+  prefix = ee.tools.__name__ + '.'
+  ps = pkgutil.walk_packages(ee.tools.__path__, prefix)
+  tools = []
+  for (module_loader, module_name, ispkg) in ps:
+    name = module_name.replace(prefix, '').replace('_', '-')
+    tools.append(Tool(module_name, name))
+  return sorted(tools)
+
+if __name__ == "__main__":
+
+  logging.basicConfig() # you need to initialize logging, otherwise you will not see anything from requests
+  logging.getLogger().setLevel(logging.DEBUG)
+  requests_log = logging.getLogger("requests.packages.urllib3")
+  requests_log.setLevel(logging.DEBUG)
+  requests_log.propagate = True
+
+  tools = find_tools()
+
+  name = sys.argv[1]
+  del sys.argv[1]
+
+  for t in tools:
+    if t.name != name:
+      continue
+    sys.argv[0] = t.name
+    importlib.import_module(t.module_name)
+    exit(0)
+
+  eprint("No such tool: {}".format(name))
+  exit(1)
diff --git a/src/ee/digikey/__init__.py b/src/ee/digikey/__init__.py
new file mode 100644
index 0000000..a2ccd03
--- /dev/null
+++ b/src/ee/digikey/__init__.py
@@ -0,0 +1,240 @@
+import re
+from functools import total_ordering
+
+import requests
+from cachecontrol import CacheControl
+from cachecontrol import CacheControlAdapter
+from cachecontrol.caches.file_cache import FileCache
+from cachecontrol.heuristics import ExpiresAfter
+from lxml import html
+from typing import List
+
+
+def normalize_filename(part):
+  return part.replace('/', '_').replace(' ', '_')
+
+
+def _clean(s):
+  if s is None:
+    return None
+  s = s.strip()
+  return None if len(s) == 0 else s
+
+
+def _to_string(e):
+  s = ""
+  for t in e.itertext():
+    s += t
+  return s.strip()
+
+
+def _to_int(s):
+  try:
+    return int(s)
+  except ValueError:
+    return None
+
+
+def _id_from_url(url):
+  if url is None:
+    return None
+  m = re.search(r".*/([0-9]+)", url)
+  return m.group(1) if m else None
+
+
+class Digikey(object):
+  def __init__(self):
+    self.attribute_types = {}
+
+  def get_attribute_type(self, id, label):
+    try:
+      return self.attribute_types[id]
+    except KeyError:
+      a = DigikeyAttributeType(id, label)
+      self.attribute_types[id] = a
+      return a
+
+
+@total_ordering
+class DigikeyProduct(object):
+  def __init__(self, part_number, mpn, attributes, categories):
+    self.part_number = _clean(part_number)
+    self.mpn = _clean(mpn)
+    self.attributes = attributes
+    self.categories = categories
+    self.quantity_available = None
+    self.description = None
+
+    assert self.part_number
+    assert self.mpn
+
+  def __eq__(self, other):
+    # type: (DigikeyProduct, DigikeyProduct) -> bool
+    return self.part_number == other.part_number
+
+  def __lt__(self, other):
+    # type: (DigikeyProduct, DigikeyProduct) -> bool
+    return self.part_number < other.part_number
+
+  def __hash__(self):
+    return self.part_number.__hash__()
+
+  def to_yaml(self):
+    yaml = {"part_number": self.part_number}
+    if self.mpn:
+      yaml["mpn"] = self.mpn
+    yaml["attributes"] = [{"type": {"id": a.attribute_type.id, "label": a.attribute_type.label}, "value": a.value}
+                          for a in self.attributes]
+    return yaml
+
+
+class DigikeyAttributeType(object):
+  def __init__(self, id, label):
+    self.id = id
+    self.label = label
+
+    assert self.id
+    assert self.label
+
+
+class DigikeyAttributeValue(object):
+  def __init__(self, value, attribute_type):
+    self.value = value
+    self.attribute_type = attribute_type
+
+    assert self.value
+    assert self.attribute_type
+
+
+@total_ordering
+class DigikeyProductCategory(object):
+  def __init__(self, id, label, digikey_url=None, parent=None):
+    self.id = _clean(id)
+    self.label = _clean(label)
+    self.digikey_url = digikey_url if digikey_url is None or digikey_url.startswith("http") else \
+      "https://www.digikey.com" + digikey_url
+    self.parent = parent # type: DigikeyProductCategory
+    self.subCategories = [] # type: List[DigikeyProductCategory]
+
+    assert self.id is not None
+    assert self.label is not None
+
+  def __eq__(self, other):
+    # type: (DigikeyProductCategory, DigikeyProductCategory) -> bool
+    return self.id == other.id
+
+  def __lt__(self, other):
+    # type: (DigikeyProductCategory, DigikeyProductCategory) -> bool
+    return self.label < other.label
+
+  def add_sub_category(self, id, label, digikey_url):
+    sc = DigikeyProductCategory(id, label, digikey_url=digikey_url, parent=self)
+    self.subCategories.append(sc)
+
+  def find_sub_category_by_label(self, label):
+    return next((sc for sc in self.subCategories if sc.label == label), None)
+
+
+class DigikeySearchResponse(object):
+  def __init__(self):
+    self.products = set()
+
+  def append(self, product):
+    self.products.add(product)
+
+
+class DigikeyClient(object):
+  def __nop(self, *args, **kwargs):  # default on_download hook: req() passes it a message
+    pass
+
+  def __init__(self, digikey: Digikey, on_download=None):
+    self.digikey = digikey
+    self.on_download = on_download or self.__nop
+    cache = FileCache('digikey_cache', forever=True)
+    self.sess = CacheControl(requests.Session(), cache=cache, heuristic=ExpiresAfter(days=1))
+
+    # adapter = CacheControlAdapter(cache=cache, heuristic=ExpiresAfter(days=1))
+    # self.sess = requests.Session()
+    # self.sess.mount('http://', adapter)
+    # self.sess.mount('https://', adapter)
+
+  def req(self, url, params=None):
+    if not url.startswith("http"):
+      url = "https://www.digikey.com" + url
+    s = "" if not params else "?" + "&".join([k + "=" + v for k, v in params.items()])
+    self.on_download("Downloading {}".format(url + s))
+    return self.sess.get(url, params=params)
+
+  def _search_process_single_result(self, url: str, tree: html) -> DigikeyProduct:
+    attributes = []
+    categories = []
+
+    part_number = mpn = None
+    for n in tree.xpath("//*[@itemprop='productID' and @content]"):
+      part_number = n.get("content")
+      part_number = part_number.replace('sku:', '')
+    for n in tree.xpath("//*[@itemprop='name' and @content]"):
+      mpn = n.get("content")
+
+    for tr in tree.xpath("//table[@id='prod-att-table']/tr[not(@id='prod-att-title-row')]"):
+      tds = tr.xpath("th|td")
+      if len(tds) != 3:
+        continue
+      label = tds[0].text.strip()
+      value = tds[1].text.strip()
+
+      if len(label) == 0 or len(value) == 0:
+        continue
+
+      checkbox = tds[2].xpath("input[@type='checkbox' and @name]")
+      try:
+        name = checkbox[0].get("name")
+        attribute_type_id = _to_int(name.replace('pv', ''))
+      except IndexError:
+        continue
+
+      if attribute_type_id:
+        a_type = self.digikey.get_attribute_type(attribute_type_id, label)
+        attributes.append(DigikeyAttributeValue(value, a_type))
+
+    if part_number and mpn:
+      p = DigikeyProduct(part_number, mpn, attributes, categories)
+      for n in tree.xpath("//*[@itemprop='description']"):
+        p.description = _to_string(n)
+      return p
+
+    return None
+
+  def _search_process_multiple_results(self, tree: html, res: DigikeySearchResponse):
+
+    product_ids = [e.get("content").strip().replace('sku:', '') for e in
+                   tree.xpath("//*[@itemprop='productid' and @content]")]
+
+    for product_id in product_ids:
+      tmp = self.search(product_id)
+      if isinstance(tmp, DigikeyProduct):
+        res.append(tmp)
+      elif tmp is not None:  # search() returns None when the page held no product
+        res.products.update(tmp.products)
+
+    return len(product_ids)
+
+  def search(self, query: str) -> DigikeySearchResponse:
+    page_size = 10
+
+    # http://www.digikey.com/products/en?x=0&y=0&lang=en&site=us&keywords=553-2320-1-ND
+    params = {'lang': 'en', 'site': 'us', 'keywords': query, 'pageSize': str(page_size)}
+    page = self.req("https://www.digikey.com/products/en", params=params)
+    # print("page: ")
+    # print(page.content)
+
+    tree = html.fromstring(page.content)
+
+    count = next(iter([int(e.text) for e in tree.xpath("//span[@id='matching-records-count']") if e.text]), 0)
+
+    if count == 0:
+      return self._search_process_single_result(page.url, tree)
+    else:
+      res = DigikeySearchResponse()
+      self._search_process_multiple_results(tree, res)
+      return res
diff --git a/src/ee/tools/digikey_download_facts.py b/src/ee/tools/digikey_download_facts.py
new file mode 100644
index 0000000..f21f171
--- /dev/null
+++ b/src/ee/tools/digikey_download_facts.py
@@ -0,0 +1,59 @@
+from colors import color
+import argparse
+import sys
+import ee.digikey as dk
+import pandas
+from itertools import *
+import yaml
+import os.path
+
+parser = argparse.ArgumentParser(description="Download facts about parts from Digi-Key")
+
+parser.add_argument("parts",
+                    metavar="PART",
+                    nargs="+",
+                    help="The parts to download fact for")
+
+parser.add_argument("--out",
+                    required=True,
+                    metavar="OUTPUT_DIRECTORY",
+                    dest="out",
+                    action="store",
+                    help="A directory to store fact files")
+
+parser.add_argument("--force",
+                    dest="force",
+                    action="store_true",
+                    help="Always download fact even if there is a local file")
+
+args = parser.parse_args()
+
+digikey = dk.Digikey()
+client = dk.DigikeyClient(digikey, on_download=lambda s: print(color(s, 'grey')))
+
+def mpn_to_path(mpn):
+  return "{}/{}.yaml".format(args.out, mpn)
+
+def on_product(p):
+  y = p.to_yaml()
+  with open(mpn_to_path(p.mpn), "w") as f:
+    yaml.dump(y, f, encoding="utf-8", allow_unicode=True)
+
+for p in args.parts:
+  print(color("Searching for {}".format(p), "white"))
+  path = mpn_to_path(p)
+
+  if os.path.isfile(path) and not args.force:
+    continue
+
+  response = client.search(p)
+
+  if not response:
+    print(color("Part not found", "orange"))
+  elif isinstance(response, dk.DigikeyProduct):
+    print(color("Found {}".format(response.mpn)))
+    on_product(response)
+  else:
+    for k, g in groupby(sorted(response.products, key=lambda p: p.mpn), lambda p: p.mpn):
+      print(color("Found {}".format(k), "white"))
+      on_product(list(g)[0])
diff --git a/test/test_digikey.py b/test/test_digikey.py
index 148f56e..0b79777 100644
--- a/test/test_digikey.py
+++ b/test/test_digikey.py
@@ -1,16 +1,28 @@
-from ee.kicad.bom import *
-from ee.kicad.bom.io import read_bom
-import ee.kicad.bom_tool as bom_tool
-import ee.kicad.bom_tool.predef as predef
+import ee.digikey as dk
 import os.path
-import pytest
+import yaml
+import sys
 
 basedir = os.path.dirname(os.path.abspath(__file__))
 
-@pytest.mark.skip(reason="disabled for now")
-def test_digikey():
-  print("")
-  bom = read_bom(basedir + '/../demo/kicad/bom/A64-OlinuXino_Rev_C.xml')
-  settings = bom_tool.Settings(suppliers = [predef.digikey])
-  pd = bom_tool.to_panda(bom, settings, predef.digikeyCsvFormat(predef.digikey))
-  print(pd.to_csv(index = False))
+digikey = dk.Digikey()
+client = dk.DigikeyClient(digikey)
+
+
+def test_digikey_1():
+  p = client.search("TCR2LF18LM(CTTR-ND")
+  assert isinstance(p, dk.DigikeyProduct)
+  assert p.part_number == "TCR2LF18LM(CTTR-ND"
+  assert len(p.attributes) > 5
+  x = p.to_yaml()
+  print(type(x))
+  print("{}".format(x))
+  yaml.dump(x, sys.stdout)
+
+
+def test_digikey_2():
+  response = client.search("TCR2LF")
+  [print(p.part_number) for p in response.products]
+  assert len(response.products) == 28
+  # p = products[0]
+  # assert p.part_number == "TCR2LF18LM(CTTR-ND"
-- 
cgit v1.2.3