import argparse from functools import total_ordering import ee.digikey as dk from ee.digikey import SearchResponseTypes, DigikeyProduct from ee.tools import log if True: raise Exception("This module is deprecated.") @total_ordering class Query(object): def __init__(self, is_mpn, query): self.is_mpn = is_mpn self.query = query def __eq__(self, other): return (self.is_mpn, self.query) == (other.is_mpn, other.query) def __lt__(self, other): return (self.is_mpn, self.query) < (other.is_mpn, other.query) def __hash__(self): return hash((self.is_mpn, self.query)) parser = argparse.ArgumentParser(description="Download facts about parts from Digi-Key") parser.add_argument("--out", required=True, metavar="OUTPUT_DIRECTORY", dest="out", action="store", help="A directory to store fact files") parser.add_argument("--part", nargs="+", help="the parts to download fact for") parser.add_argument("--sch") parser.add_argument("--force", dest="force", action="store", help="Always download fact even if there is a local file") args = parser.parse_args() digikey = dk.Digikey() client = dk.DigikeyClient("https://www.digikey.com/products/en", on_download=log.debug) parser = dk.DigikeyParser(digikey) repo = dk.DigikeyRepository(digikey, args.out) def on_product(product: DigikeyProduct): repo.save(product) queries = [] if args.part: [queries.append(Query(True, p)) for p in args.part] if args.sch: from ee.kicad import read_schematics, to_bom sch = read_schematics(args.sch) for c in to_bom(sch): digikey = c.get_field("digikey") if digikey: queries.append(Query(False, digikey.value)) mpn = c.get_field("mpn") if mpn: queries.append(Query(True, mpn.value)) queries = sorted(set(queries)) for q in queries: p = q.query if repo.has_product(mpn=p if q.is_mpn else None, dpn=p if not q.is_mpn else None) and not args.force: log.info("Already have facts for {}".format(p)) continue log.info("Searching for {}".format(p)) response = parser.parse_string(client.search(p)) todos = [] if response.response_type == SearchResponseTypes.SINGLE: p = response.products[0] log.info("Direct match mpn/dpn: {}/{}".format(p.mpn, p.part_number)) on_product(p) elif response.response_type == SearchResponseTypes.MANY: # A search for "FOO" might return products "FOO" and "FOOT" so we pick out the ones with the matching mpn/dpn. if q.is_mpn: viable_products = [p for p in response.products if p.mpn == q.query] else: viable_products = [p for p in response.products if p.part_number == q.query] if len(viable_products): # Pick the first one, should be as good as any part_number = sorted(viable_products, key=lambda p: p.part_number)[0].part_number log.info( "Got many hits for term '{}', will use {} for downloading attributes.".format(q.query, part_number)) todos.append(part_number) else: log.warn("Got many results: {}".format(", ".join([p.part_number for p in response.products]))) elif response.response_type == SearchResponseTypes.TOO_MANY: log.warn("Too many results ({}), select a category first".format(response.count)) elif response.response_type == SearchResponseTypes.NO_MATCHES: log.warn("Part not found") for part_number in todos: response = parser.parse_string(client.search(part_number)) if response.response_type == SearchResponseTypes.SINGLE: p = sorted(response.products, key=lambda p: p.part_number)[0] log.info("Downloaded {}".format(p.mpn)) on_product(p) else: log.warn("Got response of type {} when really expecting a single product.".format(response.response_type))