From 732896af73f7b1eec716277846aa278f3894ac9e Mon Sep 17 00:00:00 2001 From: nicole Date: Wed, 13 Nov 2024 17:31:19 +0000 Subject: [PATCH] First commit --- fastgate.py | 265 ++++++++++++++++++++++++++++++++++++++++++++++ fastgate_check.py | 151 ++++++++++++++++++++++++++ 2 files changed, 416 insertions(+) create mode 100644 fastgate.py create mode 100644 fastgate_check.py diff --git a/fastgate.py b/fastgate.py new file mode 100644 index 0000000..d8aa22f --- /dev/null +++ b/fastgate.py @@ -0,0 +1,265 @@ +import requests +import time +import json +import os + +thisfolder = os.path.dirname(os.path.abspath(__file__)) +cookies_file = os.path.join(thisfolder, "cookies.json") + +# Generate timestamp +timestamp = str(int(time.time() * 1000)) + +# Start a session to maintain cookies +session = requests.Session() + +# Function to generate current timestamp in milliseconds +def generate_timestamp(): + return str(int(time.time() * 1000)) + +def login_and_save_tokens(): + login_url = 'http://192.168.1.254/status.cgi?service=login_confirm' + # Extract cookies from the root request + sid_cookie = session.cookies.get('SID') + test_cookie = session.cookies.get('_TESTCOOKIESUPPORT') + xsrf_token = session.cookies.get('XSRF-TOKEN') + + # Log initial cookies (optional for debugging) + #print(f"Initial SID: {sid_cookie}, XSRF-TOKEN: {xsrf_token}") + + # Set headers and login data + headers = { + 'Accept': 'application/json, text/plain, */*', + 'Accept-Encoding': 'gzip, deflate', + 'Accept-Language': 'en-US,en;q=0.5', + 'Connection': 'keep-alive', + 'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8', + 'Origin': 'http://192.168.1.254', + 'Referer': 'http://192.168.1.254/', + 'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64; rv:132.0) Gecko/20100101 Firefox/132.0', + 'X-XSRF-TOKEN': xsrf_token + } + cookies = { + 'SID': sid_cookie, + '_TESTCOOKIESUPPORT': test_cookie, + 'XSRF-TOKEN': xsrf_token + } + login_data = { + 'cmd': '3', + 'username': 'admin', # Replace with actual username + 'password': '6453', # Replace with actual password + 'remember_me': '1', + 'act': 'nvset', + 'service': 'login_confirm', + 'sessionKey': 'NULL', + '_': generate_timestamp() + } + + # Step 2: Perform login request + response = session.post(login_url, headers=headers, cookies=cookies, data=login_data) + response_data = response.json() + + # Print response data for debugging + #print(response_data) + + # Check if login was successful + if response_data.get('login_confirm', {}).get('check_pwd') == '1': + # Step 3: After login, extract new cookies for XSRF-TOKEN and SID + new_sid_cookie = session.cookies.get('SID') + new_xsrf_token = session.cookies.get('XSRF-TOKEN') + + # Save the new tokens to a file (cookies.json) + cookies_data = { + 'SID': new_sid_cookie, + 'XSRF-TOKEN': new_xsrf_token + } + + with open(cookies_file, 'w') as json_file: + json.dump(cookies_data, json_file) + + +def get_port_mapping(): + while True: + with open(cookies_file, 'r') as json_file: + cookies_data = json.load(json_file) + + # Access individual cookies + sid_cookie = cookies_data.get('SID') + test_cookie = cookies_data.get('_TESTCOOKIESUPPORT') + xsrf_token = cookies_data.get('XSRF-TOKEN') + + + url = "http://192.168.1.254/status.cgi" + params = { + "_": generate_timestamp(), + "nvget": "virtual_server_list", + "sessionKey": sid_cookie + } + headers = { + "User-Agent": "Mozilla/5.0 (X11; Linux x86_64; rv:132.0) Gecko/20100101 Firefox/132.0", + "Accept": "application/json, text/plain, */*", + "Accept-Language": "en-US,en;q=0.5", + "Accept-Encoding": "gzip, deflate", + "X-XSRF-TOKEN": xsrf_token, + "Sec-GPC": "1", + "Connection": "keep-alive", + "Referer": "http://192.168.1.254/" + } + cookies = { + "SID": sid_cookie, + "_TESTCOOKIESUPPORT": test_cookie, + "XSRF-TOKEN": xsrf_token + } + + response = requests.get(url, headers=headers, params=params, cookies=cookies) + if response.json().get('login_confirm', {}).get('login_status') == '0': + login_and_save_tokens() + else: + return response.json() + + +def get_device_list(): + while True: + with open(cookies_file, 'r') as json_file: + cookies_data = json.load(json_file) + + # Access individual cookies + sid_cookie = cookies_data.get('SID') + test_cookie = cookies_data.get('_TESTCOOKIESUPPORT') + xsrf_token = cookies_data.get('XSRF-TOKEN') + + # URL for the request + url = 'http://192.168.1.254/status.cgi' + + # Query parameters + params = { + '_': generate_timestamp(), + 'nvget': 'connected_device_list', + 'sessionKey': sid_cookie + } + + # Headers + headers = { + 'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64; rv:132.0) Gecko/20100101 Firefox/132.0', + 'Accept': 'application/json, text/plain, */*', + 'Accept-Language': 'en-US,en;q=0.5', + 'Accept-Encoding': 'gzip, deflate', + 'X-XSRF-TOKEN': xsrf_token, + 'Sec-GPC': '1', + 'Connection': 'keep-alive', + 'Referer': 'http://192.168.1.254/' + } + + # Cookies + cookies = { + 'SID': sid_cookie, + '_TESTCOOKIESUPPORT': test_cookie, + 'XSRF-TOKEN': xsrf_token + } + + # Making the GET request + response = requests.get(url, headers=headers, params=params, cookies=cookies) + if response.json().get('login_confirm', {}).get('login_status') == '0': + login_and_save_tokens() + else: + return response.json() + +def get_links(): + while True: + with open(cookies_file, 'r') as json_file: + cookies_data = json.load(json_file) + + # Access individual cookies + sid_cookie = cookies_data.get('SID') + test_cookie = cookies_data.get('_TESTCOOKIESUPPORT') + xsrf_token = cookies_data.get('XSRF-TOKEN') + + # URL for the request + url = 'http://192.168.1.254/status.cgi' + + # Query parameters + params = { + '_': generate_timestamp(), + 'nvget': 'diagnostic', + 'sessionKey': sid_cookie + } + + # Headers + headers = { + 'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64; rv:132.0) Gecko/20100101 Firefox/132.0', + 'Accept': 'application/json, text/plain, */*', + 'Accept-Language': 'en-US,en;q=0.5', + 'Accept-Encoding': 'gzip, deflate', + 'X-XSRF-TOKEN': xsrf_token, + 'Sec-GPC': '1', + 'Connection': 'keep-alive', + 'Referer': 'http://192.168.1.254/' + } + + # Cookies + cookies = { + 'SID': sid_cookie, + '_TESTCOOKIESUPPORT': test_cookie, + 'XSRF-TOKEN': xsrf_token + } + + # Making the GET request + response = requests.get(url, headers=headers, params=params, cookies=cookies) + if response.json().get('login_confirm', {}).get('login_status') == '0': + login_and_save_tokens() + else: + return response.json() + + +port_mappings = get_port_mapping() +device_list = get_device_list() +links = get_links() + + + +def port_mapping(): + c = 0 + print("<<>>") + while True: + try: + descr = port_mappings["virtual_server_list"][f"svc_{c}_descr"] + enabled = port_mappings["virtual_server_list"][f"svc_{c}_enabled"] + ext_port_start = port_mappings["virtual_server_list"][f"svc_{c}_ext_port_start"] + ext_port_end = port_mappings["virtual_server_list"][f"svc_{c}_ext_port_end"] + hostname = port_mappings["virtual_server_list"][f"svc_{c}_hostname"] + ip = port_mappings["virtual_server_list"][f"svc_{c}_ip"] + proto = port_mappings["virtual_server_list"][f"svc_{c}_proto"] + + print(f"{descr}\t{enabled}\t{ext_port_start}\t{ext_port_end}\t{hostname}\t{ip}\t{proto}") + + c += 1 + except Exception as e: + break + +def devices_list(): + c = 0 + print("<<>>") + while True: + try: + ip = device_list["connected_device_list"][f"dev_{c}_ip"] + mac = device_list["connected_device_list"][f"dev_{c}_mac"] + name = device_list["connected_device_list"][f"dev_{c}_name"] + print(mac, "\t", name, "\t", ip) + c += 1 + except: + break + +def link(): + c = 1 + print("<<>>") + while True: + try: + eth = links["diagnostic"][f"eth{c}_link"] + eth_speed = links["diagnostic"][f"eth{c}_media_type"] + print(c, "\t", eth, "\t", eth_speed) + c += 1 + except: + break + +devices_list() +port_mapping() +link() \ No newline at end of file diff --git a/fastgate_check.py b/fastgate_check.py new file mode 100644 index 0000000..86e7df1 --- /dev/null +++ b/fastgate_check.py @@ -0,0 +1,151 @@ +from .agent_based_api.v1 import register, Result, Service, State + + +############### +# DEVICE LIST # +############### +def parse_fastgate_device_list(string_table): + result = {} + + for line in string_table: + parts = line + mac, hostname, ip = parts + result[hostname] = { + "mac": str(mac), + "ip": str(ip) + } + return result + +def discover_fastgate_device_list(section): + for s in section.keys(): + yield Service(item=s) + + +def check_fastgate_device_list(item, section): + try: + if item in section: + mac = section[item]["mac"] + ip = section[item]["ip"] + + yield Result(state=State.OK, summary=f"Device connected [{mac}] - IP Address [{ip}]") + + except Exception as e: + print(e) + +register.agent_section( + name="device_list", + parse_function=parse_fastgate_device_list, +) + +register.check_plugin( + name="device_list", + service_name="Device %s", + sections=["device_list"], + discovery_function=discover_fastgate_device_list, + check_function=check_fastgate_device_list +) + + +######### +# LINKS # +######### + +def parse_fastgate_links(string_table): + result = {} + + for line in string_table: + parts = line + eth, state, speed = parts + result [eth] = { + "state": int(state), + "speed": int(speed) + } + return result + +def discover_fastgate_links(section): + for s in section.keys(): + yield Service(item=s) + + +def check_fastgate_links(item, section): + try: + if item in section: + state = section[item]["state"] + speed = section[item]["speed"] + + if state == 1: + yield Result(state=State.OK, summary=f"Link up - {speed}Mbit/s") + else: + yield Result(state=State.OK, summary=f"Disconnected") + + except Exception as e: + print(e) + +register.agent_section( + name="links", + parse_function=parse_fastgate_links, +) + +register.check_plugin( + name="links", + service_name="Ethernet %s", + sections=["links"], + discovery_function=discover_fastgate_links, + check_function=check_fastgate_links +) + + +################ +# PORT MAPPING # +################ + +def parse_fastgate_port_mapping(string_table): + result = {} + + for line in string_table: + parts = line + name, state, start, end, device, ip, prot = parts + result [name] = { + "state": int(state), + "start": int(start), + "end": int(end), + "device": str(device), + "ip": str(ip), + "prot": str(prot) + + } + return result + +def discover_fastgate_port_mapping(section): + for s in section.keys(): + yield Service(item=s) + + +def check_fastgate_port_mapping(item, section): + try: + if item in section: + state = section[item]["state"] + start = section[item]["start"] + end = section[item]["end"] + device = section[item]["device"] + ip = section[item]["ip"] + protocol = section[item]["prot"] + + if state == 1: + yield Result(state=State.OK, summary=f"{protocol}: {start}:{end} - ip: {ip}({device})") + + except Exception as e: + print(e) + +register.agent_section( + name="port_mapping", + parse_function=parse_fastgate_port_mapping, +) + +register.check_plugin( + name="port_mapping", + service_name="Port Mapping - %s", + sections=["port_mapping"], + discovery_function=discover_fastgate_port_mapping, + check_function=check_fastgate_port_mapping +) \ No newline at end of file