From 2dfcda7b2d6e131db9c0b90070f2eebb316cdd36 Mon Sep 17 00:00:00 2001 From: nicole Date: Wed, 13 Nov 2024 17:32:07 +0000 Subject: [PATCH] update --- fastgate.py | 377 ++++++++++++++++++---------------------------------- 1 file changed, 131 insertions(+), 246 deletions(-) diff --git a/fastgate.py b/fastgate.py index d8aa22f..6e040c4 100644 --- a/fastgate.py +++ b/fastgate.py @@ -1,265 +1,150 @@ -import requests -import time -import json -import os +from .agent_based_api.v1 import register, Result, Service, State -thisfolder = os.path.dirname(os.path.abspath(__file__)) -cookies_file = os.path.join(thisfolder, "cookies.json") +############### +# DEVICE LIST # +############### -# Generate timestamp -timestamp = str(int(time.time() * 1000)) +def parse_fastgate_device_list(string_table): + result = {} -# Start a session to maintain cookies -session = requests.Session() - -# Function to generate current timestamp in milliseconds -def generate_timestamp(): - return str(int(time.time() * 1000)) - -def login_and_save_tokens(): - login_url = 'http://192.168.1.254/status.cgi?service=login_confirm' - # Extract cookies from the root request - sid_cookie = session.cookies.get('SID') - test_cookie = session.cookies.get('_TESTCOOKIESUPPORT') - xsrf_token = session.cookies.get('XSRF-TOKEN') - - # Log initial cookies (optional for debugging) - #print(f"Initial SID: {sid_cookie}, XSRF-TOKEN: {xsrf_token}") - - # Set headers and login data - headers = { - 'Accept': 'application/json, text/plain, */*', - 'Accept-Encoding': 'gzip, deflate', - 'Accept-Language': 'en-US,en;q=0.5', - 'Connection': 'keep-alive', - 'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8', - 'Origin': 'http://192.168.1.254', - 'Referer': 'http://192.168.1.254/', - 'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64; rv:132.0) Gecko/20100101 Firefox/132.0', - 'X-XSRF-TOKEN': xsrf_token - } - cookies = { - 'SID': sid_cookie, - '_TESTCOOKIESUPPORT': test_cookie, - 'XSRF-TOKEN': xsrf_token - } - login_data = { - 'cmd': '3', - 'username': 'admin', # Replace with actual username - 'password': '6453', # Replace with actual password - 'remember_me': '1', - 'act': 'nvset', - 'service': 'login_confirm', - 'sessionKey': 'NULL', - '_': generate_timestamp() - } - - # Step 2: Perform login request - response = session.post(login_url, headers=headers, cookies=cookies, data=login_data) - response_data = response.json() - - # Print response data for debugging - #print(response_data) - - # Check if login was successful - if response_data.get('login_confirm', {}).get('check_pwd') == '1': - # Step 3: After login, extract new cookies for XSRF-TOKEN and SID - new_sid_cookie = session.cookies.get('SID') - new_xsrf_token = session.cookies.get('XSRF-TOKEN') - - # Save the new tokens to a file (cookies.json) - cookies_data = { - 'SID': new_sid_cookie, - 'XSRF-TOKEN': new_xsrf_token + for line in string_table: + parts = line + mac, hostname, ip = parts + result[hostname] = { + "mac": str(mac), + "ip": str(ip) } + return result - with open(cookies_file, 'w') as json_file: - json.dump(cookies_data, json_file) +def discover_fastgate_device_list(section): + for s in section.keys(): + yield Service(item=s) -def get_port_mapping(): - while True: - with open(cookies_file, 'r') as json_file: - cookies_data = json.load(json_file) +def check_fastgate_device_list(item, section): + try: + if item in section: + mac = section[item]["mac"] + ip = section[item]["ip"] - # Access individual cookies - sid_cookie = cookies_data.get('SID') - test_cookie = cookies_data.get('_TESTCOOKIESUPPORT') - xsrf_token = cookies_data.get('XSRF-TOKEN') + yield Result(state=State.OK, summary=f"Device connected [{mac}] - IP Address [{ip}]") + + except Exception as e: + print(e) + +register.agent_section( + name="device_list", + parse_function=parse_fastgate_device_list, +) + +register.check_plugin( + name="device_list", + service_name="Device %s", + sections=["device_list"], + discovery_function=discover_fastgate_device_list, + check_function=check_fastgate_device_list +) - url = "http://192.168.1.254/status.cgi" - params = { - "_": generate_timestamp(), - "nvget": "virtual_server_list", - "sessionKey": sid_cookie +######### +# LINKS # +######### + +def parse_fastgate_links(string_table): + result = {} + + for line in string_table: + parts = line + eth, state, speed = parts + result [eth] = { + "state": int(state), + "speed": int(speed) } - headers = { - "User-Agent": "Mozilla/5.0 (X11; Linux x86_64; rv:132.0) Gecko/20100101 Firefox/132.0", - "Accept": "application/json, text/plain, */*", - "Accept-Language": "en-US,en;q=0.5", - "Accept-Encoding": "gzip, deflate", - "X-XSRF-TOKEN": xsrf_token, - "Sec-GPC": "1", - "Connection": "keep-alive", - "Referer": "http://192.168.1.254/" - } - cookies = { - "SID": sid_cookie, - "_TESTCOOKIESUPPORT": test_cookie, - "XSRF-TOKEN": xsrf_token + return result + +def discover_fastgate_links(section): + for s in section.keys(): + yield Service(item=s) + + +def check_fastgate_links(item, section): + try: + if item in section: + state = section[item]["state"] + speed = section[item]["speed"] + + if state == 1: + yield Result(state=State.OK, summary=f"Link up - {speed}Mbit/s") + else: + yield Result(state=State.OK, summary=f"Disconnected") + + except Exception as e: + print(e) + +register.agent_section( + name="links", + parse_function=parse_fastgate_links, +) + +register.check_plugin( + name="links", + service_name="Ethernet %s", + sections=["links"], + discovery_function=discover_fastgate_links, + check_function=check_fastgate_links +) + + +################ +# PORT MAPPING # +################ + +def parse_fastgate_port_mapping(string_table): + result = {} + + for line in string_table: + parts = line + name, state, start, end, ip, prot = parts + result [name] = { + "state": int(state), + "start": int(start), + "end": int(end), + "ip": str(ip), + "prot": str(prot) } + return result - response = requests.get(url, headers=headers, params=params, cookies=cookies) - if response.json().get('login_confirm', {}).get('login_status') == '0': - login_and_save_tokens() - else: - return response.json() - - -def get_device_list(): - while True: - with open(cookies_file, 'r') as json_file: - cookies_data = json.load(json_file) - - # Access individual cookies - sid_cookie = cookies_data.get('SID') - test_cookie = cookies_data.get('_TESTCOOKIESUPPORT') - xsrf_token = cookies_data.get('XSRF-TOKEN') - - # URL for the request - url = 'http://192.168.1.254/status.cgi' - - # Query parameters - params = { - '_': generate_timestamp(), - 'nvget': 'connected_device_list', - 'sessionKey': sid_cookie - } - - # Headers - headers = { - 'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64; rv:132.0) Gecko/20100101 Firefox/132.0', - 'Accept': 'application/json, text/plain, */*', - 'Accept-Language': 'en-US,en;q=0.5', - 'Accept-Encoding': 'gzip, deflate', - 'X-XSRF-TOKEN': xsrf_token, - 'Sec-GPC': '1', - 'Connection': 'keep-alive', - 'Referer': 'http://192.168.1.254/' - } - - # Cookies - cookies = { - 'SID': sid_cookie, - '_TESTCOOKIESUPPORT': test_cookie, - 'XSRF-TOKEN': xsrf_token - } - - # Making the GET request - response = requests.get(url, headers=headers, params=params, cookies=cookies) - if response.json().get('login_confirm', {}).get('login_status') == '0': - login_and_save_tokens() - else: - return response.json() - -def get_links(): - while True: - with open(cookies_file, 'r') as json_file: - cookies_data = json.load(json_file) - - # Access individual cookies - sid_cookie = cookies_data.get('SID') - test_cookie = cookies_data.get('_TESTCOOKIESUPPORT') - xsrf_token = cookies_data.get('XSRF-TOKEN') - - # URL for the request - url = 'http://192.168.1.254/status.cgi' - - # Query parameters - params = { - '_': generate_timestamp(), - 'nvget': 'diagnostic', - 'sessionKey': sid_cookie - } - - # Headers - headers = { - 'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64; rv:132.0) Gecko/20100101 Firefox/132.0', - 'Accept': 'application/json, text/plain, */*', - 'Accept-Language': 'en-US,en;q=0.5', - 'Accept-Encoding': 'gzip, deflate', - 'X-XSRF-TOKEN': xsrf_token, - 'Sec-GPC': '1', - 'Connection': 'keep-alive', - 'Referer': 'http://192.168.1.254/' - } - - # Cookies - cookies = { - 'SID': sid_cookie, - '_TESTCOOKIESUPPORT': test_cookie, - 'XSRF-TOKEN': xsrf_token - } - - # Making the GET request - response = requests.get(url, headers=headers, params=params, cookies=cookies) - if response.json().get('login_confirm', {}).get('login_status') == '0': - login_and_save_tokens() - else: - return response.json() +def discover_fastgate_port_mapping(section): + for s in section.keys(): + yield Service(item=s) -port_mappings = get_port_mapping() -device_list = get_device_list() -links = get_links() +def check_fastgate_port_mapping(item, section): + try: + if item in section: + state = section[item]["state"] + start = section[item]["start"] + end = section[item]["end"] + ip = section[item]["ip"] + protocol = section[item]["prot"] + if state == 1: + yield Result(state=State.OK, summary=f"{protocol}: {start}:{end} - ip: {ip}") + else: + yield Result(state=State.OK, summary=f"[DISABLED] {protocol}: {start}:{end} - ip: {ip}") + except Exception as e: + print(e) -def port_mapping(): - c = 0 - print("<<>>") - while True: - try: - descr = port_mappings["virtual_server_list"][f"svc_{c}_descr"] - enabled = port_mappings["virtual_server_list"][f"svc_{c}_enabled"] - ext_port_start = port_mappings["virtual_server_list"][f"svc_{c}_ext_port_start"] - ext_port_end = port_mappings["virtual_server_list"][f"svc_{c}_ext_port_end"] - hostname = port_mappings["virtual_server_list"][f"svc_{c}_hostname"] - ip = port_mappings["virtual_server_list"][f"svc_{c}_ip"] - proto = port_mappings["virtual_server_list"][f"svc_{c}_proto"] +register.agent_section( + name="port_mapping", + parse_function=parse_fastgate_port_mapping, +) - print(f"{descr}\t{enabled}\t{ext_port_start}\t{ext_port_end}\t{hostname}\t{ip}\t{proto}") - - c += 1 - except Exception as e: - break - -def devices_list(): - c = 0 - print("<<>>") - while True: - try: - ip = device_list["connected_device_list"][f"dev_{c}_ip"] - mac = device_list["connected_device_list"][f"dev_{c}_mac"] - name = device_list["connected_device_list"][f"dev_{c}_name"] - print(mac, "\t", name, "\t", ip) - c += 1 - except: - break - -def link(): - c = 1 - print("<<>>") - while True: - try: - eth = links["diagnostic"][f"eth{c}_link"] - eth_speed = links["diagnostic"][f"eth{c}_media_type"] - print(c, "\t", eth, "\t", eth_speed) - c += 1 - except: - break - -devices_list() -port_mapping() -link() \ No newline at end of file +register.check_plugin( + name="port_mapping", + service_name="Port Mapping - %s", + sections=["port_mapping"], + discovery_function=discover_fastgate_port_mapping, + check_function=check_fastgate_port_mapping +) \ No newline at end of file