aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--.editorconfig2
-rw-r--r--src/ee/digikey/__init__.py355
2 files changed, 176 insertions, 181 deletions
diff --git a/.editorconfig b/.editorconfig
index 7267e19..43c9cff 100644
--- a/.editorconfig
+++ b/.editorconfig
@@ -1,2 +1,2 @@
[*.py]
-indent = 2
+indent = 4
diff --git a/src/ee/digikey/__init__.py b/src/ee/digikey/__init__.py
index a2ccd03..8acde6b 100644
--- a/src/ee/digikey/__init__.py
+++ b/src/ee/digikey/__init__.py
@@ -3,238 +3,233 @@ from functools import total_ordering
import requests
from cachecontrol import CacheControl
-from cachecontrol import CacheControlAdapter
from cachecontrol.caches.file_cache import FileCache
from cachecontrol.heuristics import ExpiresAfter
from lxml import html
-from typing import List
+from typing import List, Optional
def normalize_filename(part):
- return part.replace('/', '_').replace(' ', '_')
+ return part.replace('/', '_').replace(' ', '_')
def _clean(s):
- if s is None:
- return None
- s = s.strip()
- return None if len(s) == 0 else s
+ if s is None:
+ return None
+ s = s.strip()
+ return None if len(s) == 0 else s
def _to_string(e):
- s = ""
- for t in e.itertext():
- s += t
- return s.strip()
+ s = ""
+ for t in e.itertext():
+ s += t
+ return s.strip()
def _to_int(s):
- try:
- return int(s)
- except ValueError:
- return None
+ try:
+ return int(s)
+ except ValueError:
+ return None
def _id_from_url(url):
- if url is None:
- return None
- m = re.search(r".*/([0-9]+)", url)
- return m.group(1) if m else None
+ if url is None:
+ return None
+ m = re.search(r".*/([0-9]+)", url)
+ return m.group(1) if m else None
class Digikey(object):
- def __init__(self):
- self.attribute_types = {}
+ def __init__(self):
+ self.attribute_types = {}
- def get_attribute_type(self, id, label):
- try:
- return self.attribute_types[id]
- except KeyError:
- a = DigikeyAttributeType(id, label)
- self.attribute_types[id] = a
- return a
+ def get_attribute_type(self, key, label):
+ try:
+ return self.attribute_types[key]
+ except KeyError:
+ a = DigikeyAttributeType(key, label)
+ self.attribute_types[key] = a
+ return a
@total_ordering
class DigikeyProduct(object):
- def __init__(self, part_number, mpn, attributes, categories):
- self.part_number = _clean(part_number)
- self.mpn = _clean(mpn)
- self.attributes = attributes
- self.categories = categories
- self.quantity_available = None
- self.description = None
+ def __init__(self, part_number, mpn, attributes, categories):
+ self.part_number = _clean(part_number)
+ self.mpn = _clean(mpn)
+ self.attributes = attributes
+ self.categories = categories
+ self.quantity_available = None
+ self.description = None
- assert self.part_number
- assert self.mpn
+ assert self.part_number
+ assert self.mpn
- def __eq__(self, other):
- # type: (DigikeyProduct, DigikeyProduct) -> bool
- return self.part_number == other.part_number
+ def __eq__(self, other: "DigikeyProduct") -> bool:
+ return self.part_number == other.part_number
- def __lt__(self, other):
- # type: (DigikeyProduct, DigikeyProduct) -> bool
- return self.part_number < other.part_number
+ def __lt__(self, other: "DigikeyProduct") -> bool:
+ return self.part_number < other.part_number
- def __hash__(self):
- return self.part_number.__hash__()
+ def __hash__(self):
+ return self.part_number.__hash__()
- def to_yaml(self):
- yaml = {"part_number": self.part_number}
- if self.mpn:
- yaml["mpn"] = self.mpn
- yaml["attributes"] = [{"type": {"id": a.attribute_type.id, "label": a.attribute_type.label}, "value": a.value}
- for a in self.attributes]
- return yaml
+ def to_yaml(self):
+ yaml = {"part_number": self.part_number}
+ if self.mpn:
+ yaml["mpn"] = self.mpn
+ yaml["attributes"] = [{"type": {"id": a.attribute_type.id, "label": a.attribute_type.label}, "value": a.value}
+ for a in self.attributes]
+ return yaml
class DigikeyAttributeType(object):
- def __init__(self, id, label):
- self.id = id
- self.label = label
+ def __init__(self, id, label):
+ self.id = id
+ self.label = label
- assert self.id
- assert self.label
+ assert self.id
+ assert self.label
class DigikeyAttributeValue(object):
- def __init__(self, value, attribute_type):
- self.value = value
- self.attribute_type = attribute_type
+ def __init__(self, value, attribute_type):
+ self.value = value
+ self.attribute_type = attribute_type
- assert self.value
- assert self.attribute_type
+ assert self.value
+ assert self.attribute_type
@total_ordering
class DigikeyProductCategory(object):
- def __init__(self, id, label, digikey_url=None, parent=None):
- self.id = _clean(id)
- self.label = _clean(label)
- self.digikey_url = digikey_url if digikey_url is None or digikey_url.startswith("http") else \
- "https://www.digikey.com" + digikey_url
- self.parent = parent # type: DigikeyProductCategory
- self.subCategories = [] # type: List[DigikeyProductCategory
+ def __init__(self, id, label, digikey_url=None, parent=None):
+ self.id = _clean(id)
+ self.label = _clean(label)
+ self.digikey_url = digikey_url if digikey_url is None or digikey_url.startswith("http") else \
+ "https://www.digikey.com" + digikey_url
+ self.parent = parent # type: DigikeyProductCategory
+ self.subCategories = [] # type: List[DigikeyProductCategory
- assert self.id is not None
- assert self.label is not None
+ assert self.id is not None
+ assert self.label is not None
- def __eq__(self, other):
- # type: (DigikeyProductCategory, DigikeyProductCategory) -> bool
- return self.id == other.id
+ def __eq__(self, other: "DigikeyProductCategory"):
+ return self.id == other.id
- def __lt__(self, other):
- # type: (DigikeyProductCategory, DigikeyProductCategory) -> bool
- return self.label < other.label
+ def __lt__(self, other: "DigikeyProductCategory") -> bool:
+ return self.label < other.label
- def add_sub_category(self, id, label, digikey_url):
- sc = DigikeyProductCategory(id, label, digikey_url=digikey_url, parent=self)
- self.subCategories.append(sc)
+ def add_sub_category(self, id, label, digikey_url):
+ sc = DigikeyProductCategory(id, label, digikey_url=digikey_url, parent=self)
+ self.subCategories.append(sc)
- def find_sub_category_by_label(self, label):
- return next((sc for sc in self.subCategories if sc.label == label), None)
+ def find_sub_category_by_label(self, label):
+ return next((sc for sc in self.subCategories if sc.label == label), None)
class DigikeySearchResponse(object):
- def __init__(self):
- self.products = set()
+ def __init__(self):
+ self.products = set()
- def append(self, product):
- self.products.add(product)
+ def append(self, product):
+ self.products.add(product)
class DigikeyClient(object):
- def __nop(self):
- pass
-
- def __init__(self, digikey: Digikey, on_download=None):
- self.digikey = digikey
- self.on_download = on_download or self.__nop
- cache = FileCache('digikey_cache', forever=True)
- self.sess = CacheControl(requests.Session(), cache=cache, heuristic=ExpiresAfter(days=1))
-
- # adapter = CacheControlAdapter(cache=cache, heuristic=ExpiresAfter(days=1))
- # self.sess = requests.Session()
- # self.sess.mount('http://', adapter)
- # self.sess.mount('https://', adapter)
-
- def req(self, url, params=None):
- if not url.startswith("http"):
- url = "https://www.digikey.com" + url
- s = "" if not params else "?" + "&".join([k + "=" + v for k, v in params.items()])
- self.on_download("Downloading {}".format(url + s))
- return self.sess.get(url, params=params)
-
- def _search_process_single_result(self, url: str, tree: html) -> DigikeyProduct:
- attributes = []
- categories = []
-
- part_number = mpn = None
- for n in tree.xpath("//*[@itemprop='productID' and @content]"):
- part_number = n.get("content")
- part_number = part_number.replace('sku:', '')
- for n in tree.xpath("//*[@itemprop='name' and @content]"):
- mpn = n.get("content")
-
- for tr in tree.xpath("//table[@id='prod-att-table']/tr[not(@id='prod-att-title-row')]"):
- tds = tr.xpath("th|td")
- if len(tds) != 3:
- continue
- label = tds[0].text.strip()
- value = tds[1].text.strip()
-
- if len(label) == 0 or len(value) == 0:
- continue
-
- checkbox = tds[2].xpath("input[@type='checkbox' and @name]")
- try:
- name = checkbox[0].get("name")
- attribute_type_id = _to_int(name.replace('pv', ''))
- except IndexError:
- continue
-
- if attribute_type_id:
- a_type = self.digikey.get_attribute_type(attribute_type_id, label)
- attributes.append(DigikeyAttributeValue(value, a_type))
-
- if part_number and mpn:
- p = DigikeyProduct(part_number, mpn, attributes, categories)
- for n in tree.xpath("//*[@itemprop='description']"):
- p.description = _to_string(n)
- return p
-
- return None
-
- def _search_process_multiple_results(self, tree: html, res: DigikeySearchResponse):
-
- product_ids = [e.get("content").strip().replace('sku:', '') for e in
- tree.xpath("//*[@itemprop='productid' and @content]")]
-
- for product_id in product_ids:
- tmp = self.search(product_id)
- if isinstance(tmp, DigikeyProduct):
- res.append(tmp)
- else:
- [res.append(p) for p in tmp.products]
-
- return len(product_ids)
-
- def search(self, query: str) -> DigikeySearchResponse:
- page_size = 10
-
- # http://www.digikey.com/products/en?x=0&y=0&lang=en&site=us&keywords=553-2320-1-ND
- params = {'lang': 'en', 'site': 'us', 'keywords': query, 'pageSize': str(page_size)}
- page = self.req("https://www.digikey.com/products/en", params=params)
- # print("page: ")
- # print(page.content)
-
- tree = html.fromstring(page.content)
-
- count = next(iter([int(e.text) for e in tree.xpath("//span[@id='matching-records-count']") if e.text]), 0)
-
- if count == 0:
- return self._search_process_single_result(page.url, tree)
- else:
- res = DigikeySearchResponse()
- self._search_process_multiple_results(tree, res)
- return res
+ def __nop(self):
+ pass
+
+ def __init__(self, digikey: Digikey, on_download=None):
+ self.digikey = digikey
+ self.on_download = on_download or self.__nop
+ cache = FileCache('digikey_cache', forever=True)
+ self.sess = CacheControl(requests.Session(), cache=cache, heuristic=ExpiresAfter(days=1))
+
+ # adapter = CacheControlAdapter(cache=cache, heuristic=ExpiresAfter(days=1))
+ # self.sess = requests.Session()
+ # self.sess.mount('http://', adapter)
+ # self.sess.mount('https://', adapter)
+
+ def req(self, url, params=None):
+ if not url.startswith("http"):
+ url = "https://www.digikey.com" + url
+ s = "" if not params else "?" + "&".join([k + "=" + v for k, v in params.items()])
+ self.on_download("Downloading {}".format(url + s))
+ return self.sess.get(url, params=params)
+
+ def _search_process_single_result(self, url: str, tree: html) -> Optional[DigikeyProduct]:
+ attributes = []
+ categories = []
+
+ part_number = mpn = None
+ for n in tree.xpath("//*[@itemprop='productID' and @content]"):
+ part_number = n.get("content")
+ part_number = part_number.replace('sku:', '')
+ for n in tree.xpath("//*[@itemprop='name' and @content]"):
+ mpn = n.get("content")
+
+ for tr in tree.xpath("//table[@id='prod-att-table']/tr[not(@id='prod-att-title-row')]"):
+ tds = tr.xpath("th|td")
+ if len(tds) != 3:
+ continue
+ label = tds[0].text.strip()
+ value = tds[1].text.strip()
+
+ if len(label) == 0 or len(value) == 0:
+ continue
+
+ checkbox = tds[2].xpath("input[@type='checkbox' and @name]")
+ try:
+ name = checkbox[0].get("name")
+ attribute_type_id = _to_int(name.replace('pv', ''))
+ except IndexError:
+ continue
+
+ if attribute_type_id:
+ a_type = self.digikey.get_attribute_type(attribute_type_id, label)
+ attributes.append(DigikeyAttributeValue(value, a_type))
+
+ if part_number and mpn:
+ p = DigikeyProduct(part_number, mpn, attributes, categories)
+ for n in tree.xpath("//*[@itemprop='description']"):
+ p.description = _to_string(n)
+ return p
+
+ return None
+
+ def _search_process_multiple_results(self, tree: html, res: DigikeySearchResponse):
+
+ product_ids = [e.get("content").strip().replace('sku:', '') for e in
+ tree.xpath("//*[@itemprop='productid' and @content]")]
+
+ for product_id in product_ids:
+ tmp = self.search(product_id)
+ if isinstance(tmp, DigikeyProduct):
+ res.append(tmp)
+ else:
+ [res.append(p) for p in tmp.products]
+
+ return len(product_ids)
+
+ def search(self, query: str):
+ page_size = 10
+
+ # http://www.digikey.com/products/en?x=0&y=0&lang=en&site=us&keywords=553-2320-1-ND
+ params = {'lang': 'en', 'site': 'us', 'keywords': query, 'pageSize': str(page_size)}
+ page = self.req("https://www.digikey.com/products/en", params=params)
+ # print("page: ")
+ # print(page.content)
+
+ tree = html.fromstring(page.content)
+
+ count = next(iter([int(e.text) for e in tree.xpath("//span[@id='matching-records-count']") if e.text]), 0)
+
+ if count == 0:
+ return self._search_process_single_result(page.url, tree)
+ else:
+ res = DigikeySearchResponse()
+ self._search_process_multiple_results(tree, res)
+ return res