From 0851aec7da7f699dfd3347440374421f919491ef Mon Sep 17 00:00:00 2001 From: nicole Date: Wed, 13 Nov 2024 17:33:02 +0000 Subject: [PATCH] Update fastgate.py --- fastgate.py | 379 ++++++++++++++++++++++++++++++++++------------------ 1 file changed, 248 insertions(+), 131 deletions(-) diff --git a/fastgate.py b/fastgate.py index 6e040c4..2254f6b 100644 --- a/fastgate.py +++ b/fastgate.py @@ -1,150 +1,267 @@ -from .agent_based_api.v1 import register, Result, Service, State +#!/omd/sites/cmk/bin/python3 -############### -# DEVICE LIST # -############### +import requests +import time +import json +import os -def parse_fastgate_device_list(string_table): - result = {} +thisfolder = os.path.dirname(os.path.abspath(__file__)) +cookies_file = os.path.join(thisfolder, "cookies.json") - for line in string_table: - parts = line - mac, hostname, ip = parts - result[hostname] = { - "mac": str(mac), - "ip": str(ip) +# Generate timestamp +timestamp = str(int(time.time() * 1000)) + +# Start a session to maintain cookies +session = requests.Session() + +# Function to generate current timestamp in milliseconds +def generate_timestamp(): + return str(int(time.time() * 1000)) + +def login_and_save_tokens(): + login_url = 'http://192.168.1.254/status.cgi?service=login_confirm' + # Extract cookies from the root request + sid_cookie = session.cookies.get('SID') + test_cookie = session.cookies.get('_TESTCOOKIESUPPORT') + xsrf_token = session.cookies.get('XSRF-TOKEN') + + # Log initial cookies (optional for debugging) + #print(f"Initial SID: {sid_cookie}, XSRF-TOKEN: {xsrf_token}") + + # Set headers and login data + headers = { + 'Accept': 'application/json, text/plain, */*', + 'Accept-Encoding': 'gzip, deflate', + 'Accept-Language': 'en-US,en;q=0.5', + 'Connection': 'keep-alive', + 'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8', + 'Origin': 'http://192.168.1.254', + 'Referer': 'http://192.168.1.254/', + 'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64; rv:132.0) Gecko/20100101 Firefox/132.0', + 'X-XSRF-TOKEN': xsrf_token + } + cookies = { + 'SID': sid_cookie, + '_TESTCOOKIESUPPORT': test_cookie, + 'XSRF-TOKEN': xsrf_token + } + login_data = { + 'cmd': '3', + 'username': 'admin', # Replace with actual username + 'password': '6453', # Replace with actual password + 'remember_me': '1', + 'act': 'nvset', + 'service': 'login_confirm', + 'sessionKey': 'NULL', + '_': generate_timestamp() + } + + # Step 2: Perform login request + response = session.post(login_url, headers=headers, cookies=cookies, data=login_data) + response_data = response.json() + + # Print response data for debugging + #print(response_data) + + # Check if login was successful + if response_data.get('login_confirm', {}).get('check_pwd') == '1': + # Step 3: After login, extract new cookies for XSRF-TOKEN and SID + new_sid_cookie = session.cookies.get('SID') + new_xsrf_token = session.cookies.get('XSRF-TOKEN') + + # Save the new tokens to a file (cookies.json) + cookies_data = { + 'SID': new_sid_cookie, + 'XSRF-TOKEN': new_xsrf_token } - return result -def discover_fastgate_device_list(section): - for s in section.keys(): - yield Service(item=s) + with open(cookies_file, 'w') as json_file: + json.dump(cookies_data, json_file) -def check_fastgate_device_list(item, section): - try: - if item in section: - mac = section[item]["mac"] - ip = section[item]["ip"] +def get_port_mapping(): + while True: + with open(cookies_file, 'r') as json_file: + cookies_data = json.load(json_file) - yield Result(state=State.OK, summary=f"Device connected [{mac}] - IP Address [{ip}]") - - except Exception as e: - print(e) - -register.agent_section( - name="device_list", - parse_function=parse_fastgate_device_list, -) - -register.check_plugin( - name="device_list", - service_name="Device %s", - sections=["device_list"], - discovery_function=discover_fastgate_device_list, - check_function=check_fastgate_device_list -) + # Access individual cookies + sid_cookie = cookies_data.get('SID') + test_cookie = cookies_data.get('_TESTCOOKIESUPPORT') + xsrf_token = cookies_data.get('XSRF-TOKEN') -######### -# LINKS # -######### - -def parse_fastgate_links(string_table): - result = {} - - for line in string_table: - parts = line - eth, state, speed = parts - result [eth] = { - "state": int(state), - "speed": int(speed) + url = "http://192.168.1.254/status.cgi" + params = { + "_": generate_timestamp(), + "nvget": "virtual_server_list", + "sessionKey": sid_cookie } - return result - -def discover_fastgate_links(section): - for s in section.keys(): - yield Service(item=s) - - -def check_fastgate_links(item, section): - try: - if item in section: - state = section[item]["state"] - speed = section[item]["speed"] - - if state == 1: - yield Result(state=State.OK, summary=f"Link up - {speed}Mbit/s") - else: - yield Result(state=State.OK, summary=f"Disconnected") - - except Exception as e: - print(e) - -register.agent_section( - name="links", - parse_function=parse_fastgate_links, -) - -register.check_plugin( - name="links", - service_name="Ethernet %s", - sections=["links"], - discovery_function=discover_fastgate_links, - check_function=check_fastgate_links -) - - -################ -# PORT MAPPING # -################ - -def parse_fastgate_port_mapping(string_table): - result = {} - - for line in string_table: - parts = line - name, state, start, end, ip, prot = parts - result [name] = { - "state": int(state), - "start": int(start), - "end": int(end), - "ip": str(ip), - "prot": str(prot) + headers = { + "User-Agent": "Mozilla/5.0 (X11; Linux x86_64; rv:132.0) Gecko/20100101 Firefox/132.0", + "Accept": "application/json, text/plain, */*", + "Accept-Language": "en-US,en;q=0.5", + "Accept-Encoding": "gzip, deflate", + "X-XSRF-TOKEN": xsrf_token, + "Sec-GPC": "1", + "Connection": "keep-alive", + "Referer": "http://192.168.1.254/" + } + cookies = { + "SID": sid_cookie, + "_TESTCOOKIESUPPORT": test_cookie, + "XSRF-TOKEN": xsrf_token } - return result -def discover_fastgate_port_mapping(section): - for s in section.keys(): - yield Service(item=s) + response = requests.get(url, headers=headers, params=params, cookies=cookies) + if response.json().get('login_confirm', {}).get('login_status') == '0': + login_and_save_tokens() + else: + return response.json() + + +def get_device_list(): + while True: + with open(cookies_file, 'r') as json_file: + cookies_data = json.load(json_file) + + # Access individual cookies + sid_cookie = cookies_data.get('SID') + test_cookie = cookies_data.get('_TESTCOOKIESUPPORT') + xsrf_token = cookies_data.get('XSRF-TOKEN') + + # URL for the request + url = 'http://192.168.1.254/status.cgi' + + # Query parameters + params = { + '_': generate_timestamp(), + 'nvget': 'connected_device_list', + 'sessionKey': sid_cookie + } + + # Headers + headers = { + 'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64; rv:132.0) Gecko/20100101 Firefox/132.0', + 'Accept': 'application/json, text/plain, */*', + 'Accept-Language': 'en-US,en;q=0.5', + 'Accept-Encoding': 'gzip, deflate', + 'X-XSRF-TOKEN': xsrf_token, + 'Sec-GPC': '1', + 'Connection': 'keep-alive', + 'Referer': 'http://192.168.1.254/' + } + + # Cookies + cookies = { + 'SID': sid_cookie, + '_TESTCOOKIESUPPORT': test_cookie, + 'XSRF-TOKEN': xsrf_token + } + + # Making the GET request + response = requests.get(url, headers=headers, params=params, cookies=cookies) + if response.json().get('login_confirm', {}).get('login_status') == '0': + login_and_save_tokens() + else: + return response.json() + +def get_links(): + while True: + with open(cookies_file, 'r') as json_file: + cookies_data = json.load(json_file) + + # Access individual cookies + sid_cookie = cookies_data.get('SID') + test_cookie = cookies_data.get('_TESTCOOKIESUPPORT') + xsrf_token = cookies_data.get('XSRF-TOKEN') + + # URL for the request + url = 'http://192.168.1.254/status.cgi' + + # Query parameters + params = { + '_': generate_timestamp(), + 'nvget': 'diagnostic', + 'sessionKey': sid_cookie + } + + # Headers + headers = { + 'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64; rv:132.0) Gecko/20100101 Firefox/132.0', + 'Accept': 'application/json, text/plain, */*', + 'Accept-Language': 'en-US,en;q=0.5', + 'Accept-Encoding': 'gzip, deflate', + 'X-XSRF-TOKEN': xsrf_token, + 'Sec-GPC': '1', + 'Connection': 'keep-alive', + 'Referer': 'http://192.168.1.254/' + } + + # Cookies + cookies = { + 'SID': sid_cookie, + '_TESTCOOKIESUPPORT': test_cookie, + 'XSRF-TOKEN': xsrf_token + } + + # Making the GET request + response = requests.get(url, headers=headers, params=params, cookies=cookies) + if response.json().get('login_confirm', {}).get('login_status') == '0': + login_and_save_tokens() + else: + return response.json() -def check_fastgate_port_mapping(item, section): - try: - if item in section: - state = section[item]["state"] - start = section[item]["start"] - end = section[item]["end"] - ip = section[item]["ip"] - protocol = section[item]["prot"] +port_mappings = get_port_mapping() +device_list = get_device_list() +links = get_links() - if state == 1: - yield Result(state=State.OK, summary=f"{protocol}: {start}:{end} - ip: {ip}") - else: - yield Result(state=State.OK, summary=f"[DISABLED] {protocol}: {start}:{end} - ip: {ip}") - except Exception as e: - print(e) -register.agent_section( - name="port_mapping", - parse_function=parse_fastgate_port_mapping, -) +def port_mapping(): + c = 0 + print("<<>>") + while True: + try: + descr = port_mappings["virtual_server_list"][f"svc_{c}_descr"] + enabled = port_mappings["virtual_server_list"][f"svc_{c}_enabled"] + ext_port_start = port_mappings["virtual_server_list"][f"svc_{c}_ext_port_start"] + ext_port_end = port_mappings["virtual_server_list"][f"svc_{c}_ext_port_end"] + hostname = port_mappings["virtual_server_list"][f"svc_{c}_hostname"] + ip = port_mappings["virtual_server_list"][f"svc_{c}_ip"] + proto = port_mappings["virtual_server_list"][f"svc_{c}_proto"] -register.check_plugin( - name="port_mapping", - service_name="Port Mapping - %s", - sections=["port_mapping"], - discovery_function=discover_fastgate_port_mapping, - check_function=check_fastgate_port_mapping -) \ No newline at end of file + print(f"{descr.replace(' ', '-')}\t{enabled}\t{ext_port_start}\t{ext_port_end}\t{ip}\t{proto}") + + c += 1 + except Exception as e: + break + +def devices_list(): + c = 0 + print("<<>>") + while True: + try: + ip = device_list["connected_device_list"][f"dev_{c}_ip"] + mac = device_list["connected_device_list"][f"dev_{c}_mac"] + name = device_list["connected_device_list"][f"dev_{c}_name"] + print(mac, "\t", name, "\t", ip) + c += 1 + except: + break + +def link(): + c = 1 + print("<<>>") + while True: + try: + eth = links["diagnostic"][f"eth{c}_link"] + eth_speed = links["diagnostic"][f"eth{c}_media_type"] + print(c, "\t", eth, "\t", eth_speed) + c += 1 + except: + break + +devices_list() +port_mapping() +link() \ No newline at end of file