from .agent_based_api.v1 import register, Result, Service, State ############### # DEVICE LIST # ############### def parse_fastgate_device_list(string_table): result = {} devices = {} counter = 0 online = "Status" for line in string_table: counter += 1 parts = line mac, hostname, ip = parts devices[hostname] = { "mac" : str(mac), "ip" : str(ip) } result["Status"] = { "count" : int(counter), "devices" : devices } return result def discover_fastgate_device_list(section): for s in section.keys(): yield Service(item=s) def check_fastgate_device_list(item, section): try: if item in section: count = section[item]["count"] devices = section[item]["devices"] details = "\n".join ( f"Hostname: {hostname}, IP: {device_info['ip']}, MAC: {device_info['mac']}" for hostname, device_info in devices.items() ) yield Result(state=State.OK, summary=f"{count} Device(s) Online", details=details) #if item in section: #mac = section[item]["mac"] #ip = section[item]["ip"] #yield Result(state=State.OK, summary=f"Device connected [{mac}] - IP Address [{ip}]") except Exception as e: print(e) register.agent_section( name="device_list", parse_function=parse_fastgate_device_list, ) register.check_plugin( name="device_list", service_name="Devices %s", sections=["device_list"], discovery_function=discover_fastgate_device_list, check_function=check_fastgate_device_list ) ######### # LINKS # ######### def parse_fastgate_links(string_table): result = {} for line in string_table: parts = line eth, state, speed = parts result [eth] = { "state": int(state), "speed": int(speed) } return result def discover_fastgate_links(section): for s in section.keys(): yield Service(item=s) def check_fastgate_links(item, section): try: if item in section: state = section[item]["state"] speed = section[item]["speed"] if state == 1: yield Result(state=State.OK, summary=f"Link up - {speed}Mbit/s") else: yield Result(state=State.OK, summary=f"Disconnected") except Exception as e: print(e) register.agent_section( name="links", parse_function=parse_fastgate_links, ) register.check_plugin( name="links", service_name="Ethernet %s", sections=["links"], discovery_function=discover_fastgate_links, check_function=check_fastgate_links ) ################ # PORT MAPPING # ################ def parse_fastgate_port_mapping(string_table): result = {} for line in string_table: parts = line name, state, start, end, ip, prot = parts result [name] = { "state": int(state), "start": int(start), "end": int(end), "ip": str(ip), "prot": str(prot) } return result def discover_fastgate_port_mapping(section): for s in section.keys(): yield Service(item=s) def check_fastgate_port_mapping(item, section): try: if item in section: state = section[item]["state"] start = section[item]["start"] end = section[item]["end"] ip = section[item]["ip"] protocol = section[item]["prot"] if state == 1: yield Result(state=State.OK, summary=f"{protocol}: {start}:{end} - ip: {ip}") else: yield Result(state=State.OK, summary=f"[DISABLED] {protocol}: {start}:{end} - ip: {ip}") except Exception as e: print(e) register.agent_section( name="port_mapping", parse_function=parse_fastgate_port_mapping, ) register.check_plugin( name="port_mapping", service_name="Port Mapping - %s", sections=["port_mapping"], discovery_function=discover_fastgate_port_mapping, check_function=check_fastgate_port_mapping )