"""Mirror UniFi switch LLDP neighbour links into NetBox as cables.

The script logs in to a UniFi controller and a NetBox instance (both
configured through environment variables, see ``main``), stages the objects
to write in a ``Db`` and then creates or updates them in NetBox.
"""

import os
import sys
from pprint import pprint

import pynetbox
from pynetbox.core.response import Record
from unifi_controller_api import UnifiController, UnifiDevice
from unifi_controller_api.exceptions import UnifiAuthenticationError, UnifiAPIError


class Db:
    """Staging area for the NetBox payloads collected before syncing.

    Every list holds plain ``dict`` payloads. Values inside a payload may be
    ``Query`` objects; ``sync_db`` resolves them just before writing.
    """

    def __init__(self):
        self.devices = []
        self.interfaces = []
        self.ips = []
        self.macs = []
        self.cables = []


class NotFoundException(Exception):
    """Raised when a deferred ``Query`` finds no matching NetBox object."""

    def __init__(self, msg):
        super().__init__(msg)


class Query:
    """A deferred NetBox lookup that is executed by ``run``.

    Lookups are deferred because the object being referenced (for example an
    interface) may only exist after earlier payloads have been synced.

    Args:
        args: Lookup arguments, used only for error reporting.
        query: Zero-argument callable performing the lookup.
        projection: Maps the found object to the value placed in the payload;
            defaults to the object's ``id``.
    """

    def __init__(self, args, query, projection=lambda x: x.id):
        self.args = args
        self.query = query
        self.projection = projection

    def run(self, nb):
        """Execute the lookup and return the projected result.

        ``nb`` is accepted for interface symmetry but unused: the query
        callable already closes over the API handle it needs.

        Raises:
            NotFoundException: If the lookup returns ``None``.
        """
        try:
            ret = self.query()
        except Exception as e:
            print("Query failed: ")
            # Was `value.args`, an undefined name that masked the real error.
            print(f"Arguments: {self.args}")
            print(e)
            raise

        if ret is None:
            raise NotFoundException(f"resource not found, args={self.args}")

        return self.projection(ret)


def find_device(nb, name: str):
    """Return a ``Query`` resolving to the id of the device called ``name``."""
    return Query({"name": name}, lambda: nb.dcim.devices.get(name=name))


def find_interface_by_mac(nb, mac_address: str):
    """Return a ``Query`` resolving to the id of the interface with this MAC."""
    return Query(
        {"mac_address": mac_address},
        lambda: nb.dcim.interfaces.get(mac_address=mac_address),
    )


def find_interface(nb, device: str, name: str):
    """Return a ``Query`` resolving to the id of interface ``name`` on ``device``."""
    return Query(
        {"device": device, "name": name},
        lambda: nb.dcim.interfaces.get(device=device, name=name),
    )


class NetboxCache:
    """Memoising wrapper around a pynetbox API handle.

    Device roles and device types are cached by slug; IP addresses are cached
    by ``(address, vrf id)``.
    """

    def __init__(self, nb):
        self.nb = nb
        self.device_roles = {}
        self.device_types = {}
        self.ip_addresses = {}

    def get_device_role(self, slug):
        """Return the device role with the given slug, raising if it is missing."""
        role = self.device_roles.get(slug)
        if role is not None:
            return role

        role = self.nb.dcim.device_roles.get(slug=slug)
        if role is None:
            raise Exception(f"No such device role: {slug}")

        self.device_roles[slug] = role
        return role

    def get_device_type(self, slug):
        """Return the device type with the given slug, raising if it is missing."""
        dt = self.device_types.get(slug)
        if dt is not None:
            return dt

        dt = self.nb.dcim.device_types.get(slug=slug)
        if dt is None:
            raise Exception(f"No such device type: {slug}")

        self.device_types[slug] = dt
        return dt

    def get_or_create_ip_address(self, addr: str, vrf: Record | None, data):
        """Return the IP address record for ``addr`` in ``vrf``, creating it if needed.

        An existing record is updated with ``data`` and re-fetched. A new
        record is created as ``active`` with ``data`` merged in; previously
        ``data`` was silently dropped on creation.
        """
        vrf_id = vrf.id if vrf is not None else None
        key = (addr, vrf_id)

        cached = self.ip_addresses.get(key)
        if cached is not None:
            return cached

        # NOTE(review): with vrf_id=None this lookup is presumably not limited
        # to the global table -- confirm against the NetBox filter semantics.
        ip = self.nb.ipam.ip_addresses.get(address=addr, vrf_id=vrf_id)
        if ip is not None:
            print(f"Found IP address {ip.id} address={ip.address}, vrf={ip.vrf}")
            if data:
                ip.update(data)
                ip = self.nb.ipam.ip_addresses.get(address=addr, vrf_id=vrf_id)
        else:
            # The lookup key (address, vrf) always wins over values in `data`.
            payload = {"status": "active", **(data or {}), "address": addr, "vrf": vrf_id}
            ip = self.nb.ipam.ip_addresses.create(payload)

        self.ip_addresses[key] = ip
        return ip


def create_or_update_device(nb, d):
    """Create the device described by ``d``, or update the one with the same name."""
    device = nb.dcim.devices.get(name=d["name"])
    if device is None:
        device = nb.dcim.devices.create(d)
        print(f"Created device id={device.id}, name={device.name}")
        return device

    print(f"Updating device id={device.id}, name={device.name}")
    device.update(d)
    # Re-fetch so the caller sees the record as stored after the update.
    return nb.dcim.devices.get(id=device.id)


def create_or_update_interface(nb, i):
    """Create the interface described by ``i``, or update the matching one.

    Interfaces are matched on ``i["device"]`` (a device id) and ``i["name"]``.
    """
    iface = nb.dcim.interfaces.get(device_id=i["device"], name=i["name"])
    if iface is None:
        iface = nb.dcim.interfaces.create(i)
        print(f"Created interface id={iface.id}, name={iface.name}")
        return iface

    print(f"Updating interface id={iface.id}, name={iface.name}")
    iface.update(i)
    return nb.dcim.interfaces.get(id=iface.id)


def create_or_update_mac_address(nb, data):
    """Create the MAC address described by ``data``, or update the matching one."""
    ma = nb.dcim.mac_addresses.get(mac_address=data["mac_address"])
    if ma is None:
        ma = nb.dcim.mac_addresses.create(data)
        print(f"Created MAC address id={ma.id}, address={ma.mac_address}")
        return ma

    print(f"Updating MAC address id={ma.id}, address={ma.mac_address}")
    ma.update(data)
    return nb.dcim.mac_addresses.get(id=ma.id)


def create_or_update_ip_address(nb, data):
    """Create the IP address described by ``data``, or update the matching one."""
    # NOTE(review): the lookup is by address only, not scoped to a VRF --
    # confirm this is safe if the same address exists in several VRFs.
    ip = nb.ipam.ip_addresses.get(address=data["address"])
    if ip is None:
        ip = nb.ipam.ip_addresses.create(data)
        print(f"Created IP address id={ip.id}, ip={ip.address}")
        return ip

    print(f"Updating IP address id={ip.id}, ip={ip.address}")
    ip.update(data)
    return nb.ipam.ip_addresses.get(id=ip.id)


def _single_termination(terminations):
    """Return the only entry of ``terminations``; reject any other length."""
    if len(terminations) != 1:
        raise Exception("only single termination is supported")
    return terminations[0]


def create_or_update_cable(nb, data):
    """Create the cable described by ``data``, or update the matching one.

    Cables are matched on their single A and single B termination.

    Raises:
        Exception: If either side does not have exactly one termination.
    """
    a = _single_termination(data["a_terminations"])
    b = _single_termination(data["b_terminations"])

    cable = nb.dcim.cables.get(
        termination_a_type=a["object_type"],
        termination_a_id=a["object_id"],
        termination_b_type=b["object_type"],
        termination_b_id=b["object_id"],
    )
    if cable is None:
        cable = nb.dcim.cables.create(data)
        # Was `ip.id`, an undefined name that raised after every create.
        print(f"Created cable id={cable.id}")
        return cable

    print(f"Updating cable id={cable.id}")
    cable.update(data)
    return nb.dcim.cables.get(id=cable.id)


def process_switch(d: UnifiDevice, db: Db, nb: NetboxCache, site, vrf):
    """Stage one cable per LLDP neighbour of switch ``d`` into ``db``.

    ``site`` and ``vrf`` are only used by the currently disabled payloads
    below.
    """
    # TODO: device / interface / IP / MAC staging is disabled for now.
    # db.devices.append({
    #     "name": d.name,
    #     "device_type": nb.get_device_type("ubiquiti-us-8-150w").id,
    #     "role": nb.get_device_role("switch").id,
    #     "serial": d.serial,
    #     "site_name": site,
    # })
    #
    # db.interfaces.append({
    #     "device": find_device(nb.nb, name=d.name),
    #     "name": "switch0",
    #     "type": "virtual",
    # })
    #
    # db.ips.append({
    #     "address": f"{d.ip}/32",
    #     "is_primary": "true",
    #     "vrf": vrf.id,
    #     "assigned_object_id": find_interface(nb.nb, device=d.name, name="switch0"),
    #     "assigned_object_type": "dcim.interface",
    #     # "is_primary": "true" TODO: does not work
    # })
    #
    # db.macs.append({
    #     "mac_address": d.mac,
    #     "assigned_object_id": find_interface(nb.nb, device=d.name, name="switch0"),
    #     "assigned_object_type": "dcim.interface",
    #     # "is_primary": "true" TODO: does not work
    # })

    pprint(d.lldp_info)

    # Treat a missing LLDP table as "no neighbours" instead of crashing.
    for e in d.lldp_info or []:
        local_end = [
            {
                "object_type": "dcim.interface",
                "object_id": find_interface(
                    nb.nb, device=d.name, name=f"Port {e.local_port_idx} (PoE)"
                ),
            }
        ]
        remote_end = [
            {
                "object_type": "dcim.interface",
                "object_id": find_interface_by_mac(nb.nb, e.chassis_id),
            }
        ]

        # Order the two ends by MAC; presumably so a link seen from either
        # switch yields the same A/B orientation -- confirm.
        a, b = local_end, remote_end
        if e.chassis_id > d.mac:
            a, b = b, a

        db.cables.append({
            "a_terminations": a,
            "b_terminations": b,
            "status": "connected",
        })


def sync_db(db: Db, nb):
    """Resolve every staged payload in ``db`` and write it to NetBox.

    Order is devices, interfaces, MACs, IPs, then cables. A cable whose
    endpoint lookup finds nothing is reported and skipped.
    """

    def resolve_query(value):
        """Replace every ``Query`` inside ``value`` (in place) by its result."""
        if value is None or isinstance(value, (str, int, float)):
            return value
        if isinstance(value, Query):
            return value.run(nb)
        if isinstance(value, dict):
            for k, v in value.items():
                value[k] = resolve_query(v)
            return value
        if isinstance(value, list):
            for i, item in enumerate(value):
                value[i] = resolve_query(item)
            return value
        raise Exception(f"unsupported type: {value}")

    for device in db.devices:
        create_or_update_device(nb, resolve_query(device))

    for iface in db.interfaces:
        create_or_update_interface(nb, resolve_query(iface))

    for mac in db.macs:
        create_or_update_mac_address(nb, resolve_query(mac))

    for ip in db.ips:
        create_or_update_ip_address(nb, resolve_query(ip))

    for cable in db.cables:
        try:
            cable = resolve_query(cable)
            pprint(cable)
            create_or_update_cable(nb, cable)
        except NotFoundException:
            print("Cable failed, could not find endpoint")


def _require_env(name: str) -> str:
    """Return environment variable ``name``; exit with a message if unset or empty."""
    value = os.getenv(name)
    if not value:
        sys.exit(f"Missing required environment variable: {name}")
    return value


def main():
    """Read the configuration from the environment and run one sync pass.

    Required variables: ``UNIFI_URL``, ``UNIFI_USERNAME``, ``UNIFI_PASSWORD``,
    ``UNIFI_SITE``, ``NETBOX_URL``, ``NETBOX_TOKEN``, ``NETBOX_SITE``.
    Optional: ``NETBOX_VRF``.
    """
    unifi_url = _require_env("UNIFI_URL")
    unifi_username = _require_env("UNIFI_USERNAME")
    unifi_password = _require_env("UNIFI_PASSWORD")
    unifi_site = _require_env("UNIFI_SITE")
    netbox_url = _require_env("NETBOX_URL")
    netbox_token = _require_env("NETBOX_TOKEN")
    netbox_site_name = _require_env("NETBOX_SITE")
    netbox_vrf_name = os.getenv("NETBOX_VRF")

    controller = controller_login(unifi_url, unifi_username, unifi_password)
    nb, netbox_site, netbox_vrf = netbox_login(
        netbox_url, netbox_token, netbox_site_name, netbox_vrf_name
    )

    status = nb.status()
    print(f"NetBox status: {status}")

    devices = collect_devices(controller, unifi_site)

    nb_cache = NetboxCache(nb)
    db = Db()
    for d in devices:
        # Only this switch model is handled so far.
        if d.model == "US8P150":
            process_switch(d, db, nb_cache, netbox_site, netbox_vrf)

    sync_db(db, nb)


def controller_login(url, username, password) -> UnifiController:
    """Create a UniFi controller client and verify that the login works.

    Errors raised by the library are not caught here, so a failed login
    aborts the run with a traceback.
    """
    controller = UnifiController(
        controller_url=url,
        username=username,
        password=password,
        is_udm_pro=False,
        verify_ssl=True,
    )

    # Just to check that there is a valid authentication
    controller.get_unifi_site(include_health=False, raw=False)
    return controller


def collect_devices(controller: UnifiController, site_name: str) -> list[UnifiDevice]:
    """Return the detailed device list of ``site_name``.

    Failures are reported on stdout and yield an empty list, so the caller
    can always iterate over the result.
    """
    try:
        return controller.get_unifi_site_device(site_name=site_name, detailed=True, raw=False)
    except UnifiAPIError as e:
        print(f"Error fetching device information: {e}")
    except Exception as e:
        print(f"An unexpected error occurred: {e}")
    return []


def netbox_login(
    url: str, token: str, site_name: str, vrf_name: str
) -> tuple[pynetbox.core.api.Api, Record, Record | None]:
    """Open a NetBox API handle and look up the target site and optional VRF.

    The site is looked up by name first, then by slug. The process exits
    with status 1 when the site, or a requested VRF, cannot be found.

    Returns:
        ``(api, site, vrf)`` where ``vrf`` is ``None`` if ``vrf_name`` is ``None``.
    """
    nb = pynetbox.api(url, token=token)

    site = nb.dcim.sites.get(name=site_name)
    if site is None:
        site = nb.dcim.sites.get(slug=site_name)
    if site is None:
        print(f"Could not look up site by name or slug: {site_name}")
        sys.exit(1)

    print(f"NetBox site {site.name}")

    vrf = None
    if vrf_name is not None:
        vrf = nb.ipam.vrfs.get(site=site, name=vrf_name)
        if vrf is None:
            print(f"Could not look up VRF by name: {vrf_name}")
            sys.exit(1)

    return nb, site, vrf


if __name__ == "__main__":
    main()