From 1733547e0481f3dd2a500577126c5382dc6bb4f7 Mon Sep 17 00:00:00 2001 From: Trygve Laugstøl Date: Wed, 6 Sep 2017 22:19:03 +0200 Subject: o Better API for the Digikey downloader. --- requirements.txt | 1 + src/ee/digikey/__init__.py | 81 ++++++++++++++++++++++++---------- src/ee/tools/digikey_download_facts.py | 61 +++++++++++++------------ 3 files changed, 92 insertions(+), 51 deletions(-) diff --git a/requirements.txt b/requirements.txt index 22e94b5..2c5de4c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -11,6 +11,7 @@ parsec==3.3 Pillow==4.2.1 pytest==3.2.0 pyyaml==3.12 +requests==2.18.4 sympy==1.0 typing==3.6.2; python_version < '3.0' diff --git a/src/ee/digikey/__init__.py b/src/ee/digikey/__init__.py index 8acde6b..bd9a86b 100644 --- a/src/ee/digikey/__init__.py +++ b/src/ee/digikey/__init__.py @@ -1,12 +1,14 @@ -import re -from functools import total_ordering +import enum +from typing import List, Optional +import re import requests from cachecontrol import CacheControl from cachecontrol.caches.file_cache import FileCache from cachecontrol.heuristics import ExpiresAfter +from functools import total_ordering from lxml import html -from typing import List, Optional +import urllib.parse def normalize_filename(part): @@ -27,9 +29,13 @@ def _to_string(e): return s.strip() +def _parse_int(s): + return int(s.replace(',', '').replace('.', '')) + + def _to_int(s): try: - return int(s) + return _parse_int(s) except ValueError: return None @@ -41,6 +47,10 @@ def _id_from_url(url): return m.group(1) if m else None +def _first(collection, default=None): + return next(iter(collection), default) + + class Digikey(object): def __init__(self): self.attribute_types = {} @@ -86,8 +96,8 @@ class DigikeyProduct(object): class DigikeyAttributeType(object): - def __init__(self, id, label): - self.id = id + def __init__(self, _id, label): + self.id = _id self.label = label assert self.id @@ -105,16 +115,16 @@ class DigikeyAttributeValue(object): @total_ordering class DigikeyProductCategory(object): - def __init__(self, id, label, digikey_url=None, parent=None): - self.id = _clean(id) + def __init__(self, _id, label, digikey_url=None, parent=None): + self.id = _clean(_id) self.label = _clean(label) self.digikey_url = digikey_url if digikey_url is None or digikey_url.startswith("http") else \ "https://www.digikey.com" + digikey_url self.parent = parent # type: DigikeyProductCategory self.subCategories = [] # type: List[DigikeyProductCategory - assert self.id is not None - assert self.label is not None + assert self.id + assert self.label def __eq__(self, other: "DigikeyProductCategory"): return self.id == other.id @@ -122,16 +132,26 @@ class DigikeyProductCategory(object): def __lt__(self, other: "DigikeyProductCategory") -> bool: return self.label < other.label - def add_sub_category(self, id, label, digikey_url): - sc = DigikeyProductCategory(id, label, digikey_url=digikey_url, parent=self) + def add_sub_category(self, _id, label, digikey_url): + sc = DigikeyProductCategory(_id, label, digikey_url=digikey_url, parent=self) self.subCategories.append(sc) def find_sub_category_by_label(self, label): return next((sc for sc in self.subCategories if sc.label == label), None) +class SearchResponseTypes(enum.Enum): + MANY = 1 + SINGLE = 2 + TOO_MANY = 3 + NO_MATCHES = 4 + + class DigikeySearchResponse(object): - def __init__(self): + def __init__(self, count: int, response_type: SearchResponseTypes): + self.count = count + self.response_type = response_type + self.products = set() def append(self, product): @@ -139,7 +159,7 @@ class DigikeySearchResponse(object): class DigikeyClient(object): - def __nop(self): + def __nop(self, message): pass def __init__(self, digikey: Digikey, on_download=None): @@ -156,11 +176,11 @@ class DigikeyClient(object): def req(self, url, params=None): if not url.startswith("http"): url = "https://www.digikey.com" + url - s = "" if not params else "?" + "&".join([k + "=" + v for k, v in params.items()]) + s = "" if not params else "?" + urllib.parse.urlencode(params) self.on_download("Downloading {}".format(url + s)) return self.sess.get(url, params=params) - def _search_process_single_result(self, url: str, tree: html) -> Optional[DigikeyProduct]: + def _search_process_single_result(self, tree: html) -> Optional[DigikeyProduct]: attributes = [] categories = [] @@ -201,7 +221,6 @@ class DigikeyClient(object): return None def _search_process_multiple_results(self, tree: html, res: DigikeySearchResponse): - product_ids = [e.get("content").strip().replace('sku:', '') for e in tree.xpath("//*[@itemprop='productid' and @content]")] @@ -214,7 +233,7 @@ class DigikeyClient(object): return len(product_ids) - def search(self, query: str): + def search(self, query: str) -> DigikeySearchResponse: page_size = 10 # http://www.digikey.com/products/en?x=0&y=0&lang=en&site=us&keywords=553-2320-1-ND @@ -225,11 +244,25 @@ class DigikeyClient(object): tree = html.fromstring(page.content) - count = next(iter([int(e.text) for e in tree.xpath("//span[@id='matching-records-count']") if e.text]), 0) + count = _first([_parse_int(e.text) for e in tree.xpath("//span[@id='matching-records-count']") if e.text]) + + if count: + product_table = _first(tree.xpath("//table[@id='productTable']")) + + if product_table is not None: + res = DigikeySearchResponse(count, SearchResponseTypes.MANY) + self._search_process_multiple_results(product_table, res) + return res + else: + # If the search matches multiple product categories the user has to select the appropriate category + # first + return DigikeySearchResponse(count, SearchResponseTypes.TOO_MANY) - if count == 0: - return self._search_process_single_result(page.url, tree) else: - res = DigikeySearchResponse() - self._search_process_multiple_results(tree, res) - return res + p = self._search_process_single_result(tree) + if p: + res = DigikeySearchResponse(1, SearchResponseTypes.SINGLE) + res.append(p) + return res + else: + return DigikeySearchResponse(1, SearchResponseTypes.NO_MATCHES) diff --git a/src/ee/tools/digikey_download_facts.py b/src/ee/tools/digikey_download_facts.py index f21f171..a1e242b 100644 --- a/src/ee/tools/digikey_download_facts.py +++ b/src/ee/tools/digikey_download_facts.py @@ -1,11 +1,12 @@ -from colors import color import argparse -import sys -import ee.digikey as dk -import pandas from itertools import * -import yaml + import os.path +import yaml +from colors import color + +import ee.digikey as dk +from ee.digikey import SearchResponseTypes, DigikeyProduct parser = argparse.ArgumentParser(description="Download facts about parts from Digi-Key") @@ -31,29 +32,35 @@ args = parser.parse_args() digikey = dk.Digikey() client = dk.DigikeyClient(digikey, on_download=lambda s: print(color(s, 'grey'))) + def mpn_to_path(mpn): - return "{}/{}.yaml".format(args.out, mpn) + return "{}/{}.yaml".format(args.out, mpn) + + +def on_product(p: DigikeyProduct): + y = p.to_yaml() + with open(mpn_to_path(p.mpn), "w") as f: + yaml.dump(y, f, encoding="utf-8", allow_unicode=True) -def on_product(p): - y = p.to_yaml() - with open(mpn_to_path(p.mpn), "w") as f: - yaml.dump(y, f, encoding="utf-8", allow_unicode=True) for p in args.parts: - print(color("Searching for {}".format(p), "white")) - path = mpn_to_path(p) - - if os.path.isfile(path) and not args.force: - continue - - response = client.search(p) - - if not response: - print(color("Part not found", "orange")) - elif isinstance(response, dk.DigikeyProduct): - print(color("Found {}".format(response.mpn))) - on_product(response) - else: - for k, g in groupby(sorted(response.products), lambda p: p.mpn): - print(color("Found {}".format(k), "white")) - on_product(list(g)[0]) + print(color("Searching for {}".format(p), "white")) + path = mpn_to_path(p) + + if os.path.isfile(path) and not args.force: + continue + + response = client.search(p) + + if response.response_type == SearchResponseTypes.SINGLE: + p = response.products[0] + print(color("Found {}".format(p.mpn), "white")) + on_product(p) + elif response.response_type == SearchResponseTypes.MANY: + for k, g in groupby(sorted(response.products), lambda p: p.mpn): + print(color("Found {}".format(k), "white")) + on_product(list(g)[0]) + elif response.response_type == SearchResponseTypes.TOO_MANY: + print(color("Too many results ({}), select a category first".format(response.count), 'red')) + elif response.response_type == SearchResponseTypes.NO_MATCHES: + print(color("Part not found", "orange")) -- cgit v1.2.3