fastgate_checkmk_agent/fastgate.py
2024-11-13 17:32:07 +00:00

150 lines
No EOL
3.5 KiB
Python

from .agent_based_api.v1 import register, Result, Service, State
###############
# DEVICE LIST #
###############
def parse_fastgate_device_list(string_table):
    """Parse the ``device_list`` agent section into a per-host mapping.

    Every well-formed line carries exactly three fields, in this order:
    MAC address, hostname, IP address.

    Args:
        string_table: Iterable of already-split agent lines (each one a
            sequence of strings).

    Returns:
        A dict keyed by hostname; each value is ``{"mac": str, "ip": str}``.
        When a hostname occurs more than once, the last line wins.
    """
    result = {}
    for line in string_table:
        try:
            mac, hostname, ip = line
        except ValueError:
            # Wrong number of fields: skip this line instead of letting a
            # single malformed entry abort parsing of the whole section.
            continue
        result[hostname] = {"mac": str(mac), "ip": str(ip)}
    return result
def discover_fastgate_device_list(section):
    """Yield one ``Service`` per hostname found in the parsed section."""
    yield from (Service(item=hostname) for hostname in section)
def check_fastgate_device_list(item, section):
    """Report the MAC and IP address of one connected device.

    Args:
        item: Hostname of the device (the service item).
        section: Parsed ``device_list`` section, keyed by hostname.

    Yields:
        A single OK ``Result`` when the device is present in the section;
        nothing when it is absent.
    """
    device = section.get(item)
    if device is None:
        # Item not in the current agent data: yield no result.
        return

    # No blanket ``except`` here on purpose: printing and discarding an
    # exception hides real failures from whoever runs the check.
    mac = device["mac"]
    ip = device["ip"]
    yield Result(
        state=State.OK,
        summary=f"Device connected [{mac}] - IP Address [{ip}]",
    )
# Hook the "device_list" agent section up to its parser.
register.agent_section(
    parse_function=parse_fastgate_device_list,
    name="device_list",
)

# One "Device <hostname>" service per entry of the parsed section.
register.check_plugin(
    name="device_list",
    sections=["device_list"],
    service_name="Device %s",
    check_function=check_fastgate_device_list,
    discovery_function=discover_fastgate_device_list,
)
#########
# LINKS #
#########
def parse_fastgate_links(string_table):
    """Parse the ``links`` agent section into a per-interface mapping.

    Every well-formed line carries exactly three fields, in this order:
    interface name, state (integer), speed (integer).

    Args:
        string_table: Iterable of already-split agent lines (each one a
            sequence of strings).

    Returns:
        A dict keyed by interface name; each value is
        ``{"state": int, "speed": int}``. When an interface occurs more than
        once, the last line wins.
    """
    result = {}
    for line in string_table:
        try:
            eth, state, speed = line
            entry = {"state": int(state), "speed": int(speed)}
        except ValueError:
            # Wrong field count or a non-integer state/speed: skip the line
            # so one bad entry does not abort parsing of the whole section.
            continue
        result[eth] = entry
    return result
def discover_fastgate_links(section):
    """Yield one ``Service`` per interface name found in the parsed section."""
    yield from (Service(item=interface) for interface in section)
def check_fastgate_links(item, section):
    """Report whether one Ethernet link is up and, if so, its speed.

    Args:
        item: Interface name (the service item).
        section: Parsed ``links`` section, keyed by interface name.

    Yields:
        A single OK ``Result``: "Link up - <speed>Mbit/s" when the stored
        state equals 1, "Disconnected" otherwise. Nothing is yielded when the
        interface is absent from the section.
    """
    link = section.get(item)
    if link is None:
        # Item not in the current agent data: yield no result.
        return

    # No blanket ``except`` here on purpose: printing and discarding an
    # exception hides real failures from whoever runs the check.
    state = link["state"]
    speed = link["speed"]
    if state == 1:
        yield Result(state=State.OK, summary=f"Link up - {speed}Mbit/s")
    else:
        # A disconnected port is reported as OK, as in the original plugin.
        yield Result(state=State.OK, summary="Disconnected")
# Hook the "links" agent section up to its parser.
register.agent_section(
    parse_function=parse_fastgate_links,
    name="links",
)

# One "Ethernet <interface>" service per entry of the parsed section.
register.check_plugin(
    name="links",
    sections=["links"],
    service_name="Ethernet %s",
    check_function=check_fastgate_links,
    discovery_function=discover_fastgate_links,
)
################
# PORT MAPPING #
################
def parse_fastgate_port_mapping(string_table):
    """Parse the ``port_mapping`` agent section into a per-rule mapping.

    Every well-formed line carries exactly six fields, in this order:
    name, state (integer), start (integer), end (integer), IP address,
    protocol. ``start``/``end`` are presumably the bounds of the mapped port
    range -- confirm against the agent output.

    Args:
        string_table: Iterable of already-split agent lines (each one a
            sequence of strings).

    Returns:
        A dict keyed by rule name; each value is
        ``{"state": int, "start": int, "end": int, "ip": str, "prot": str}``.
        When a name occurs more than once, the last line wins.
    """
    result = {}
    for line in string_table:
        try:
            name, state, start, end, ip, prot = line
            entry = {
                "state": int(state),
                "start": int(start),
                "end": int(end),
                "ip": str(ip),
                "prot": str(prot),
            }
        except ValueError:
            # Wrong field count or a non-integer numeric field: skip the line
            # so one bad entry does not abort parsing of the whole section.
            continue
        result[name] = entry
    return result
def discover_fastgate_port_mapping(section):
    """Yield one ``Service`` per port-mapping name found in the parsed section."""
    yield from (Service(item=rule_name) for rule_name in section)
def check_fastgate_port_mapping(item, section):
    """Report the configuration of one port-mapping rule.

    Args:
        item: Rule name (the service item).
        section: Parsed ``port_mapping`` section, keyed by rule name.

    Yields:
        A single OK ``Result`` showing protocol, start:end and target IP; the
        summary is prefixed with "[DISABLED]" when the stored state is not 1.
        Nothing is yielded when the rule is absent from the section.
    """
    rule = section.get(item)
    if rule is None:
        # Item not in the current agent data: yield no result.
        return

    # No blanket ``except`` here on purpose: printing and discarding an
    # exception hides real failures from whoever runs the check.
    state = rule["state"]
    start = rule["start"]
    end = rule["end"]
    ip = rule["ip"]
    protocol = rule["prot"]
    if state == 1:
        yield Result(
            state=State.OK,
            summary=f"{protocol}: {start}:{end} - ip: {ip}",
        )
    else:
        # A disabled rule is reported as OK, as in the original plugin.
        yield Result(
            state=State.OK,
            summary=f"[DISABLED] {protocol}: {start}:{end} - ip: {ip}",
        )
# Hook the "port_mapping" agent section up to its parser.
register.agent_section(
    parse_function=parse_fastgate_port_mapping,
    name="port_mapping",
)

# One "Port Mapping - <name>" service per entry of the parsed section.
register.check_plugin(
    name="port_mapping",
    sections=["port_mapping"],
    service_name="Port Mapping - %s",
    check_function=check_fastgate_port_mapping,
    discovery_function=discover_fastgate_port_mapping,
)