diff options
Diffstat (limited to 'ansible/netbox/sync-unifi.py')
-rw-r--r-- | ansible/netbox/sync-unifi.py | 363 |
1 files changed, 363 insertions, 0 deletions
diff --git a/ansible/netbox/sync-unifi.py b/ansible/netbox/sync-unifi.py new file mode 100644 index 0000000..4427b20 --- /dev/null +++ b/ansible/netbox/sync-unifi.py @@ -0,0 +1,363 @@ +import os +import sys +from unifi_controller_api import UnifiController, UnifiDevice +from unifi_controller_api.exceptions import UnifiAuthenticationError, UnifiAPIError +from pprint import pprint + +import pynetbox +from pynetbox.core.response import Record + +class Db(): + def __init__(self): + self.devices = [] + self.interfaces = [] + self.ips = [] + self.macs = [] + self.cables = [] + +class NotFoundException(Exception): + def __init__(self, msg): + super().__init__(msg) + +class Query(): + def __init__(self, args, query, projection=lambda x: x.id): + self.args = args + self.query = query + self.projection = projection + + def run(self, nb): + try: + ret = self.query() + except Exception as e: + print("Query failed: ") + print(f"Arguments: {value.args}") + print(e) + raise e + + if ret is None: + raise NotFoundException(f"resource not found, args={self.args}") + +# if len(ret) != 1: +# raise NotFoundException(f"multiple resources found, args={self.args}, result={ret}") + + return self.projection(ret) + +def find_device(nb, name: str): + return Query({}, lambda: nb.dcim.devices.get(name=name)) + +def find_interface_by_mac(nb, mac_address: str): + args = locals() + del args["nb"] + return Query(args, lambda: nb.dcim.interfaces.get(mac_address=mac_address)) + +def find_interface(nb, device: str, name: str): + args = locals() + del args["nb"] + return Query(args, lambda: nb.dcim.interfaces.get(device=device, name=name)) + +class NetboxCache(): + def __init__(self, nb): + self.nb = nb + self.device_roles = {} + self.device_types = {} + self.ip_addresses = {} + + def get_device_role(self, slug): + dt = self.device_roles.get(slug) + + if dt is not None: + return dt + + dt = self.nb.dcim.device_roles.get(slug=slug) + if dt is None: + raise Exception(f"No such device type: {slug}") + + self.device_roles[slug] = dt + return dt + + def get_device_type(self, slug): + dt = self.device_types.get(slug) + + if dt is not None: + return dt + + dt = self.nb.dcim.device_types.get(slug=slug) + if dt is None: + raise Exception(f"No such device type: {slug}") + + self.device_types[slug] = dt + return dt + + def get_or_create_ip_address(self, addr: str, vrf: Record | None, data): + vrf_id = vrf.id if vrf is not None else None + key = (addr, vrf_id) + ip = self.ip_addresses.get(key) + if ip is not None: + return ip + + ip = self.nb.ipam.ip_addresses.get(address=addr, vrf_id=vrf_id) + if ip is not None: + print(f"Found IP address {ip.id} address={ip.address}, vrf={ip.vrf}") + ip.update(data) + ip = self.nb.ipam.ip_addresses.get(address=addr, vrf_id=vrf_id) + self.ip_addresses[key] = ip + return ip + + ip = self.nb.ipam.ip_addresses.create(address=addr, vrf=vrf_id, status="active") + self.ip_addresses[key] = ip + return ip + +def create_or_update_device(nb, d): + device = nb.dcim.devices.get(name = d["name"]) + if device is None: + device = nb.dcim.devices.create(d) + print(f"Created device id={device.id}, name={device.name}") + + return device + + print(f"Updating device id={device.id}, name={device.name}") + device.update(d) + + return nb.dcim.devices.get(id=device.id) + +def create_or_update_interface(nb, i): + iface = nb.dcim.interfaces.get(device_id=i["device"], name=i["name"]) + if iface is None: + iface = nb.dcim.interfaces.create(i) + print(f"Created interface id={iface.id}, name={iface.name}") + + return iface + + print(f"Updating interface id={iface.id}, name={iface.name}") + iface.update(i) + return nb.dcim.interfaces.get(id=iface.id) + +def create_or_update_mac_address(nb, data): + ma = nb.dcim.mac_addresses.get(mac_address=data["mac_address"]) + if ma is None: + ma = nb.dcim.mac_addresses.create(data) + print(f"Created MAC address id={ma.id}, address={ma.mac_address}") + + return ma + + print(f"Updating MAC address id={ma.id}, address={ma.mac_address}") + ma.update(data) + return nb.dcim.mac_addresses.get(id=ma.id) + +def create_or_update_ip_address(nb, data): + ip = nb.ipam.ip_addresses.get(address=data["address"]) + if ip is None: + ip = nb.ipam.ip_addresses.create(data) + print(f"Created IP address id={ip.id}, ip={ip.address}") + + return ip + + print(f"Updating IP address id={ip.id}, ip={ip.address}") + ip.update(data) + return nb.ipam.ip_addresses.get(id=ip.id) + +def create_or_update_cable(nb, data): + if len(data["a_terminations"]) == 1: + a = data["a_terminations"][0] + else: + raise Exception("only single termination is supported") + + if len(data["b_terminations"]) == 1: + b = data["b_terminations"][0] + else: + raise Exception("only single termination is supported") + + cable = nb.dcim.cables.get( + termination_a_type=a["object_type"], + termination_a_id=a["object_id"], + termination_b_type=b["object_type"], + termination_b_id=b["object_id"], + ) + if cable is None: + cable = nb.dcim.cables.create(data) + print(f"Created Cable address id={ip.id}") + + return cable + + print(f"Updating cable id={ip.id}") + cable.update(data) + return nb.dcim.cables.get(id=cable.id) + +def process_switch(d: UnifiDevice, db: Db, nb: NetboxCache, site, vrf): +# db.devices.append({ +# "name": d.name, +# "device_type": nb.get_device_type("ubiquiti-us-8-150w").id, +# "role": nb.get_device_role("switch").id, +# "serial": d.serial, +# "site_name": site, +# }) +# +# db.interfaces.append({ +# "device": find_device(nb.nb, name=d.name), +# "name": "switch0", +# "type": "virtual", +# }) +# +# db.ips.append({ +# "address": f"{d.ip}/32", +# "is_primary": "true", +# "vrf": vrf.id, +# "assigned_object_id": find_interface(nb.nb, device=d.name, name="switch0"), +# "assigned_object_type": "dcim.interface", +## "is_primary": "true" TODO: does not work +# }) +# +# db.macs.append({ +# "mac_address": d.mac, +# "assigned_object_id": find_interface(nb.nb, device=d.name, name="switch0"), +# "assigned_object_type": "dcim.interface", +## "is_primary": "true" TODO: does not work +# }) + + pprint(d.lldp_info) + + for e in d.lldp_info: + a = [ + { + "object_type": "dcim.interface", + "object_id": find_interface(nb.nb, device=d.name, name=f"Port {e.local_port_idx} (PoE)"), + } + ] + b = [ + { + "object_type": "dcim.interface", + "object_id": find_interface_by_mac(nb.nb, e.chassis_id), + } + ] + + if e.chassis_id > d.mac: + a, b = b, a + + db.cables.append({ + "a_terminations": a, + "b_terminations": b, + "status": "connected", + }) + +def sync_db(db: Db, nb): + def resolve_query(value): + if value is None or isinstance(value, str) or isinstance(value, int): + return value + elif isinstance(value, Query): + return value.run(nb) + elif isinstance(value, dict): + for k, v in value.items(): + value[k] = resolve_query(v) + elif isinstance(value, list): + for i, item in enumerate(value): + value[i] = resolve_query(item) + else: + raise Exception(f"unsupported type: {value}") + return value + + for device in db.devices: + device = resolve_query(device) + create_or_update_device(nb, device) + + for iface in db.interfaces: + iface = resolve_query(iface) + create_or_update_interface(nb, iface) + + for mac in db.macs: + mac = resolve_query(mac) + create_or_update_mac_address(nb, mac) + + for ip in db.ips: + ip = resolve_query(ip) + create_or_update_ip_address(nb, ip) + + for cable in db.cables: + try: + cable = resolve_query(cable) + pprint(cable) + create_or_update_cable(nb, cable) + except NotFoundException: + print("Cable failed, could not find endpoint") + continue + +def main(): + unifi_url=os.getenv("UNIFI_URL") + unifi_username=os.getenv("UNIFI_USERNAME") + unifi_password=os.getenv("UNIFI_PASSWORD") + unifi_site=os.getenv("UNIFI_SITE") + + netbox_url=os.getenv("NETBOX_URL") + netbox_token=os.getenv("NETBOX_TOKEN") + netbox_vrf_name=os.getenv("NETBOX_VRF") + netbox_site_name=os.getenv("NETBOX_SITE") + + controller = controller_login(unifi_url, unifi_username, unifi_password) + + (nb, netbox_site, netbox_vrf) = netbox_login(netbox_url, netbox_token, netbox_site_name, netbox_vrf_name) + status = nb.status() + print(f"NetBox status: {status}") + + devices = collect_devices(controller, unifi_site) + + nb_cache = NetboxCache(nb) + db = Db() + for d in devices: +# pprint(d) + if d.model == "US8P150": + process_switch(d, db, nb_cache, netbox_site, netbox_vrf) + + sync_db(db, nb) + +def controller_login(url, username, password) -> UnifiController: +# try: + controller = UnifiController( + controller_url=url, + username=username, + password=password, + is_udm_pro=False, + verify_ssl=True, + ) + + # Just to check that there is a valid authentication + controller.get_unifi_site(include_health=False, raw=False) + + return controller +# except UnifiAuthenticationError: +# print("Authentication failed - please check your UniFi Controller credentials and URL.") +# except UnifiAPIError as e: +# print(f"UniFi API error: {e}") +# except Exception as e: +# print(f"An unexpected error occurred: {e}") + +def collect_devices(controller: UnifiController, site_name: str) -> list[UnifiDevice]: + try: + return controller.get_unifi_site_device(site_name=site_name, detailed=True, raw=False) + except UnifiAPIError as e: + print(f"Error fetching device information: {e}") + except Exception as e: + print(f"An unexpected error occurred: {e}") + +def netbox_login(url: str, token: str, site_name: str, vrf_name: str) -> pynetbox.core.api.Api: + nb = pynetbox.api(url, token=token) + + site = nb.dcim.sites.get(name=site_name) + if site is None: + site = nb.dcim.sites.get(slug=site_name) + if site is None: + print(f"Could not look up site by name or slug: {site_name}") + exit(1) + print(f"NetBox site {site.name}") + + vrf = None + vrf_id = None + if vrf_name is not None: + vrf = nb.ipam.vrfs.get(site=site, name=vrf_name) + if vrf is None: + print(f"Could not look up VRF by slug: {vrf_name}") + exit(1) + vrf_id = vrf.id + + return nb, site, vrf + +if __name__ == "__main__": + main() |