aboutsummaryrefslogtreecommitdiff
path: root/ansible/netbox/sync-unifi.py
diff options
context:
space:
mode:
Diffstat (limited to 'ansible/netbox/sync-unifi.py')
-rw-r--r--ansible/netbox/sync-unifi.py363
1 files changed, 363 insertions, 0 deletions
diff --git a/ansible/netbox/sync-unifi.py b/ansible/netbox/sync-unifi.py
new file mode 100644
index 0000000..4427b20
--- /dev/null
+++ b/ansible/netbox/sync-unifi.py
@@ -0,0 +1,363 @@
+import os
+import sys
+from unifi_controller_api import UnifiController, UnifiDevice
+from unifi_controller_api.exceptions import UnifiAuthenticationError, UnifiAPIError
+from pprint import pprint
+
+import pynetbox
+from pynetbox.core.response import Record
+
+class Db():
+ def __init__(self):
+ self.devices = []
+ self.interfaces = []
+ self.ips = []
+ self.macs = []
+ self.cables = []
+
+class NotFoundException(Exception):
+ def __init__(self, msg):
+ super().__init__(msg)
+
+class Query():
+ def __init__(self, args, query, projection=lambda x: x.id):
+ self.args = args
+ self.query = query
+ self.projection = projection
+
+ def run(self, nb):
+ try:
+ ret = self.query()
+ except Exception as e:
+ print("Query failed: ")
+ print(f"Arguments: {value.args}")
+ print(e)
+ raise e
+
+ if ret is None:
+ raise NotFoundException(f"resource not found, args={self.args}")
+
+# if len(ret) != 1:
+# raise NotFoundException(f"multiple resources found, args={self.args}, result={ret}")
+
+ return self.projection(ret)
+
+def find_device(nb, name: str):
+ return Query({}, lambda: nb.dcim.devices.get(name=name))
+
+def find_interface_by_mac(nb, mac_address: str):
+ args = locals()
+ del args["nb"]
+ return Query(args, lambda: nb.dcim.interfaces.get(mac_address=mac_address))
+
+def find_interface(nb, device: str, name: str):
+ args = locals()
+ del args["nb"]
+ return Query(args, lambda: nb.dcim.interfaces.get(device=device, name=name))
+
+class NetboxCache():
+ def __init__(self, nb):
+ self.nb = nb
+ self.device_roles = {}
+ self.device_types = {}
+ self.ip_addresses = {}
+
+ def get_device_role(self, slug):
+ dt = self.device_roles.get(slug)
+
+ if dt is not None:
+ return dt
+
+ dt = self.nb.dcim.device_roles.get(slug=slug)
+ if dt is None:
+ raise Exception(f"No such device type: {slug}")
+
+ self.device_roles[slug] = dt
+ return dt
+
+ def get_device_type(self, slug):
+ dt = self.device_types.get(slug)
+
+ if dt is not None:
+ return dt
+
+ dt = self.nb.dcim.device_types.get(slug=slug)
+ if dt is None:
+ raise Exception(f"No such device type: {slug}")
+
+ self.device_types[slug] = dt
+ return dt
+
+ def get_or_create_ip_address(self, addr: str, vrf: Record | None, data):
+ vrf_id = vrf.id if vrf is not None else None
+ key = (addr, vrf_id)
+ ip = self.ip_addresses.get(key)
+ if ip is not None:
+ return ip
+
+ ip = self.nb.ipam.ip_addresses.get(address=addr, vrf_id=vrf_id)
+ if ip is not None:
+ print(f"Found IP address {ip.id} address={ip.address}, vrf={ip.vrf}")
+ ip.update(data)
+ ip = self.nb.ipam.ip_addresses.get(address=addr, vrf_id=vrf_id)
+ self.ip_addresses[key] = ip
+ return ip
+
+ ip = self.nb.ipam.ip_addresses.create(address=addr, vrf=vrf_id, status="active")
+ self.ip_addresses[key] = ip
+ return ip
+
+def create_or_update_device(nb, d):
+ device = nb.dcim.devices.get(name = d["name"])
+ if device is None:
+ device = nb.dcim.devices.create(d)
+ print(f"Created device id={device.id}, name={device.name}")
+
+ return device
+
+ print(f"Updating device id={device.id}, name={device.name}")
+ device.update(d)
+
+ return nb.dcim.devices.get(id=device.id)
+
+def create_or_update_interface(nb, i):
+ iface = nb.dcim.interfaces.get(device_id=i["device"], name=i["name"])
+ if iface is None:
+ iface = nb.dcim.interfaces.create(i)
+ print(f"Created interface id={iface.id}, name={iface.name}")
+
+ return iface
+
+ print(f"Updating interface id={iface.id}, name={iface.name}")
+ iface.update(i)
+ return nb.dcim.interfaces.get(id=iface.id)
+
+def create_or_update_mac_address(nb, data):
+ ma = nb.dcim.mac_addresses.get(mac_address=data["mac_address"])
+ if ma is None:
+ ma = nb.dcim.mac_addresses.create(data)
+ print(f"Created MAC address id={ma.id}, address={ma.mac_address}")
+
+ return ma
+
+ print(f"Updating MAC address id={ma.id}, address={ma.mac_address}")
+ ma.update(data)
+ return nb.dcim.mac_addresses.get(id=ma.id)
+
+def create_or_update_ip_address(nb, data):
+ ip = nb.ipam.ip_addresses.get(address=data["address"])
+ if ip is None:
+ ip = nb.ipam.ip_addresses.create(data)
+ print(f"Created IP address id={ip.id}, ip={ip.address}")
+
+ return ip
+
+ print(f"Updating IP address id={ip.id}, ip={ip.address}")
+ ip.update(data)
+ return nb.ipam.ip_addresses.get(id=ip.id)
+
+def create_or_update_cable(nb, data):
+ if len(data["a_terminations"]) == 1:
+ a = data["a_terminations"][0]
+ else:
+ raise Exception("only single termination is supported")
+
+ if len(data["b_terminations"]) == 1:
+ b = data["b_terminations"][0]
+ else:
+ raise Exception("only single termination is supported")
+
+ cable = nb.dcim.cables.get(
+ termination_a_type=a["object_type"],
+ termination_a_id=a["object_id"],
+ termination_b_type=b["object_type"],
+ termination_b_id=b["object_id"],
+ )
+ if cable is None:
+ cable = nb.dcim.cables.create(data)
+ print(f"Created Cable address id={ip.id}")
+
+ return cable
+
+ print(f"Updating cable id={ip.id}")
+ cable.update(data)
+ return nb.dcim.cables.get(id=cable.id)
+
+def process_switch(d: UnifiDevice, db: Db, nb: NetboxCache, site, vrf):
+# db.devices.append({
+# "name": d.name,
+# "device_type": nb.get_device_type("ubiquiti-us-8-150w").id,
+# "role": nb.get_device_role("switch").id,
+# "serial": d.serial,
+# "site_name": site,
+# })
+#
+# db.interfaces.append({
+# "device": find_device(nb.nb, name=d.name),
+# "name": "switch0",
+# "type": "virtual",
+# })
+#
+# db.ips.append({
+# "address": f"{d.ip}/32",
+# "is_primary": "true",
+# "vrf": vrf.id,
+# "assigned_object_id": find_interface(nb.nb, device=d.name, name="switch0"),
+# "assigned_object_type": "dcim.interface",
+## "is_primary": "true" TODO: does not work
+# })
+#
+# db.macs.append({
+# "mac_address": d.mac,
+# "assigned_object_id": find_interface(nb.nb, device=d.name, name="switch0"),
+# "assigned_object_type": "dcim.interface",
+## "is_primary": "true" TODO: does not work
+# })
+
+ pprint(d.lldp_info)
+
+ for e in d.lldp_info:
+ a = [
+ {
+ "object_type": "dcim.interface",
+ "object_id": find_interface(nb.nb, device=d.name, name=f"Port {e.local_port_idx} (PoE)"),
+ }
+ ]
+ b = [
+ {
+ "object_type": "dcim.interface",
+ "object_id": find_interface_by_mac(nb.nb, e.chassis_id),
+ }
+ ]
+
+ if e.chassis_id > d.mac:
+ a, b = b, a
+
+ db.cables.append({
+ "a_terminations": a,
+ "b_terminations": b,
+ "status": "connected",
+ })
+
+def sync_db(db: Db, nb):
+ def resolve_query(value):
+ if value is None or isinstance(value, str) or isinstance(value, int):
+ return value
+ elif isinstance(value, Query):
+ return value.run(nb)
+ elif isinstance(value, dict):
+ for k, v in value.items():
+ value[k] = resolve_query(v)
+ elif isinstance(value, list):
+ for i, item in enumerate(value):
+ value[i] = resolve_query(item)
+ else:
+ raise Exception(f"unsupported type: {value}")
+ return value
+
+ for device in db.devices:
+ device = resolve_query(device)
+ create_or_update_device(nb, device)
+
+ for iface in db.interfaces:
+ iface = resolve_query(iface)
+ create_or_update_interface(nb, iface)
+
+ for mac in db.macs:
+ mac = resolve_query(mac)
+ create_or_update_mac_address(nb, mac)
+
+ for ip in db.ips:
+ ip = resolve_query(ip)
+ create_or_update_ip_address(nb, ip)
+
+ for cable in db.cables:
+ try:
+ cable = resolve_query(cable)
+ pprint(cable)
+ create_or_update_cable(nb, cable)
+ except NotFoundException:
+ print("Cable failed, could not find endpoint")
+ continue
+
+def main():
+ unifi_url=os.getenv("UNIFI_URL")
+ unifi_username=os.getenv("UNIFI_USERNAME")
+ unifi_password=os.getenv("UNIFI_PASSWORD")
+ unifi_site=os.getenv("UNIFI_SITE")
+
+ netbox_url=os.getenv("NETBOX_URL")
+ netbox_token=os.getenv("NETBOX_TOKEN")
+ netbox_vrf_name=os.getenv("NETBOX_VRF")
+ netbox_site_name=os.getenv("NETBOX_SITE")
+
+ controller = controller_login(unifi_url, unifi_username, unifi_password)
+
+ (nb, netbox_site, netbox_vrf) = netbox_login(netbox_url, netbox_token, netbox_site_name, netbox_vrf_name)
+ status = nb.status()
+ print(f"NetBox status: {status}")
+
+ devices = collect_devices(controller, unifi_site)
+
+ nb_cache = NetboxCache(nb)
+ db = Db()
+ for d in devices:
+# pprint(d)
+ if d.model == "US8P150":
+ process_switch(d, db, nb_cache, netbox_site, netbox_vrf)
+
+ sync_db(db, nb)
+
+def controller_login(url, username, password) -> UnifiController:
+# try:
+ controller = UnifiController(
+ controller_url=url,
+ username=username,
+ password=password,
+ is_udm_pro=False,
+ verify_ssl=True,
+ )
+
+ # Just to check that there is a valid authentication
+ controller.get_unifi_site(include_health=False, raw=False)
+
+ return controller
+# except UnifiAuthenticationError:
+# print("Authentication failed - please check your UniFi Controller credentials and URL.")
+# except UnifiAPIError as e:
+# print(f"UniFi API error: {e}")
+# except Exception as e:
+# print(f"An unexpected error occurred: {e}")
+
+def collect_devices(controller: UnifiController, site_name: str) -> list[UnifiDevice]:
+ try:
+ return controller.get_unifi_site_device(site_name=site_name, detailed=True, raw=False)
+ except UnifiAPIError as e:
+ print(f"Error fetching device information: {e}")
+ except Exception as e:
+ print(f"An unexpected error occurred: {e}")
+
+def netbox_login(url: str, token: str, site_name: str, vrf_name: str) -> pynetbox.core.api.Api:
+ nb = pynetbox.api(url, token=token)
+
+ site = nb.dcim.sites.get(name=site_name)
+ if site is None:
+ site = nb.dcim.sites.get(slug=site_name)
+ if site is None:
+ print(f"Could not look up site by name or slug: {site_name}")
+ exit(1)
+ print(f"NetBox site {site.name}")
+
+ vrf = None
+ vrf_id = None
+ if vrf_name is not None:
+ vrf = nb.ipam.vrfs.get(site=site, name=vrf_name)
+ if vrf is None:
+ print(f"Could not look up VRF by slug: {vrf_name}")
+ exit(1)
+ vrf_id = vrf.id
+
+ return nb, site, vrf
+
+if __name__ == "__main__":
+ main()