fastgate_checkmk_agent/fastgate_check.py

167 lines
No EOL
4.1 KiB
Python

from .agent_based_api.v1 import register, Result, Service, State
###############
# DEVICE LIST #
###############
def parse_fastgate_device_list(string_table):
    """Parse the ``device_list`` agent section into a single "Status" entry.

    Every row is read as ``mac, hostname..., ip``: the first field is the MAC
    address, the last field is the IP address and everything in between is
    the hostname. A plain three-field row therefore parses exactly as before.

    Args:
        string_table: List of rows, each row being a list of string fields.

    Returns:
        A dict of the form
        ``{"Status": {"count": <rows parsed>, "devices": {hostname: {"mac": ..., "ip": ...}}}}``.
        Rows with fewer than three fields are skipped.
    """
    devices = {}
    count = 0
    for line in string_table:
        if len(line) < 3:
            # Too few fields for mac / hostname / ip: drop this row instead of
            # letting a ValueError abort the whole section.
            continue
        # NOTE(review): joining with a blank assumes the extra fields come from
        # a hostname that was split on whitespace - confirm against the agent.
        mac, *hostname_parts, ip = line
        count += 1
        # Devices that share a hostname overwrite each other here but are
        # still counted individually.
        devices[" ".join(hostname_parts)] = {"mac": str(mac), "ip": str(ip)}
    return {"Status": {"count": count, "devices": devices}}
def discover_fastgate_device_list(section):
    """Yield one ``Service`` for every key of the parsed ``device_list`` section.

    Args:
        section: Mapping as returned by ``parse_fastgate_device_list``.
    """
    # Iterating a mapping yields its keys; no ``.keys()`` call is needed.
    for item in section:
        yield Service(item=item)
def check_fastgate_device_list(item, section):
    """Report the number of online devices and list them in the details.

    Args:
        item: Service item; a key of *section*.
        section: Mapping as returned by ``parse_fastgate_device_list``.

    Yields:
        One OK ``Result`` whose summary carries the device count and whose
        details hold one line per device. Nothing is yielded when *item* is
        not present in *section*.

    Errors are no longer printed and swallowed; they propagate to the caller.
    """
    data = section.get(item)
    if data is None:
        # Item vanished from the agent output: produce no result at all.
        return

    details = "\n".join(
        f"Hostname: {hostname}, IP: {info['ip']}, MAC: {info['mac']}"
        for hostname, info in data["devices"].items()
    )
    yield Result(
        state=State.OK,
        summary=f"{data['count']} Device(s) Online",
        details=details,
    )
# Hook the parser up to the "device_list" agent section.
register.agent_section(
    name="device_list",
    parse_function=parse_fastgate_device_list,
)

# Check plugin consuming the "device_list" section; services are named
# "Devices <item>".
register.check_plugin(
    name="device_list",
    service_name="Devices %s",
    sections=["device_list"],
    discovery_function=discover_fastgate_device_list,
    check_function=check_fastgate_device_list,
)
#########
# LINKS #
#########
def parse_fastgate_links(string_table):
    """Parse the ``links`` agent section into a per-interface mapping.

    Args:
        string_table: List of rows, each expected to be
            ``[interface, state, speed]`` with integer ``state`` and ``speed``.

    Returns:
        ``{interface: {"state": int, "speed": int}}``. Rows that do not have
        exactly three fields, or whose state/speed are not integers, are
        skipped so that one bad row does not abort parsing of the others.
    """
    result = {}
    for line in string_table:
        if len(line) != 3:
            continue
        eth, state, speed = line
        try:
            # The dict is built completely before assignment, so a failed
            # conversion never leaves a partial entry behind.
            result[eth] = {"state": int(state), "speed": int(speed)}
        except ValueError:
            continue
    return result
def discover_fastgate_links(section):
    """Yield one ``Service`` for every interface in the parsed ``links`` section.

    Args:
        section: Mapping as returned by ``parse_fastgate_links``.
    """
    # Iterating a mapping yields its keys; no ``.keys()`` call is needed.
    for item in section:
        yield Service(item=item)
def check_fastgate_links(item, section):
    """Report whether an Ethernet link is up and, if so, its speed.

    Args:
        item: Service item; an interface name that is a key of *section*.
        section: Mapping as returned by ``parse_fastgate_links``.

    Yields:
        One OK ``Result``: "Link up - <speed>Mbit/s" when the state is 1,
        otherwise "Disconnected". Nothing is yielded when *item* is not
        present in *section*.

    Errors are no longer printed and swallowed; they propagate to the caller.
    """
    link = section.get(item)
    if link is None:
        # Interface vanished from the agent output: produce no result at all.
        return

    if link["state"] == 1:
        yield Result(state=State.OK, summary=f"Link up - {link['speed']}Mbit/s")
    else:
        # A disconnected port is reported as OK, as in the original code.
        yield Result(state=State.OK, summary="Disconnected")
# Hook the parser up to the "links" agent section.
register.agent_section(
    name="links",
    parse_function=parse_fastgate_links,
)

# Check plugin consuming the "links" section; services are named
# "Ethernet <item>".
register.check_plugin(
    name="links",
    service_name="Ethernet %s",
    sections=["links"],
    discovery_function=discover_fastgate_links,
    check_function=check_fastgate_links,
)
################
# PORT MAPPING #
################
def parse_fastgate_port_mapping(string_table):
    """Parse the ``port_mapping`` agent section into a per-rule mapping.

    Every row is read as ``name..., state, start, end, ip, prot``: the last
    five fields are fixed and everything before them is the rule name. A
    plain six-field row therefore parses exactly as before.

    Args:
        string_table: List of rows, each row being a list of string fields.

    Returns:
        ``{name: {"state": int, "start": int, "end": int, "ip": str, "prot": str}}``.
        Rows with fewer than six fields, or whose state/start/end are not
        integers, are skipped so one bad row does not abort the others.
    """
    result = {}
    for line in string_table:
        if len(line) < 6:
            continue
        # NOTE(review): joining with a blank assumes the extra fields come from
        # a rule name that was split on whitespace - confirm against the agent.
        *name_parts, state, start, end, ip, prot = line
        try:
            entry = {
                "state": int(state),
                "start": int(start),
                "end": int(end),
                "ip": str(ip),
                "prot": str(prot),
            }
        except ValueError:
            continue
        result[" ".join(name_parts)] = entry
    return result
def discover_fastgate_port_mapping(section):
    """Yield one ``Service`` for every rule in the parsed ``port_mapping`` section.

    Args:
        section: Mapping as returned by ``parse_fastgate_port_mapping``.
    """
    # Iterating a mapping yields its keys; no ``.keys()`` call is needed.
    for item in section:
        yield Service(item=item)
def check_fastgate_port_mapping(item, section):
    """Report a port-mapping rule and whether it is enabled.

    Args:
        item: Service item; a rule name that is a key of *section*.
        section: Mapping as returned by ``parse_fastgate_port_mapping``.

    Yields:
        One OK ``Result`` with summary "<prot>: <start>:<end> - ip: <ip>",
        prefixed with "[DISABLED] " when the rule's state is not 1. Nothing
        is yielded when *item* is not present in *section*.

    Errors are no longer printed and swallowed; they propagate to the caller.
    """
    mapping = section.get(item)
    if mapping is None:
        # Rule vanished from the agent output: produce no result at all.
        return

    summary = f"{mapping['prot']}: {mapping['start']}:{mapping['end']} - ip: {mapping['ip']}"
    if mapping["state"] != 1:
        # A disabled rule is still reported as OK, only flagged in the text.
        summary = f"[DISABLED] {summary}"
    yield Result(state=State.OK, summary=summary)
# Hook the parser up to the "port_mapping" agent section.
register.agent_section(
    name="port_mapping",
    parse_function=parse_fastgate_port_mapping,
)

# Check plugin consuming the "port_mapping" section; services are named
# "Port Mapping - <item>".
register.check_plugin(
    name="port_mapping",
    service_name="Port Mapping - %s",
    sections=["port_mapping"],
    discovery_function=discover_fastgate_port_mapping,
    check_function=check_fastgate_port_mapping,
)