fastgate_checkmk_agent/fastgate.py
2024-11-16 21:34:15 +00:00

280 lines
No EOL
8.7 KiB
Python

#!/omd/sites/cmk/bin/python3
import requests
import time
import json
import os
import argparse
# Command-line options: the router credentials (both flags are optional).
parser = argparse.ArgumentParser()
for option_flags in (("-u", "--user"), ("-p", "--password")):
    parser.add_argument(*option_flags)
# Parse the arguments
args = parser.parse_args()
user, password = args.user, args.password

# The session cookies are cached in a JSON file located next to this script.
thisfolder = os.path.dirname(os.path.abspath(__file__))
cookies_file = os.path.join(thisfolder, "cookies.json")

# Millisecond Unix timestamp captured once at start-up, kept as a string.
# NOTE(review): appears unused in the visible code (requests call
# generate_timestamp() instead) -- confirm before removing.
timestamp = f"{int(time.time() * 1000)}"

# One shared HTTP session so cookies are kept between requests made through it.
session = requests.Session()
# Function to generate current timestamp in milliseconds
def generate_timestamp() -> str:
    """Return the current Unix time in whole milliseconds as a decimal string."""
    return str(int(time.time() * 1000))
def login_and_save_tokens(timeout=10):
    """Log in to the router and save the session cookies to ``cookies_file``.

    Posts the login form (built from the module-level ``user`` and
    ``password``) through the shared ``session``. When the reply reports
    ``login_confirm.check_pwd == '1'`` the session's ``SID``, ``XSRF-TOKEN``
    and ``_TESTCOOKIESUPPORT`` cookies are written to ``cookies_file`` as JSON.

    Args:
        timeout: Seconds to wait for the router before ``requests`` gives up,
            so an unreachable device cannot block the agent indefinitely.

    Raises:
        RuntimeError: If the router does not confirm the login. Returning
            silently here would leave callers that retry until they are
            logged in looping forever.
    """
    login_url = 'http://192.168.1.254/status.cgi?service=login_confirm'

    # Reuse whatever cookies the shared session already holds. Nothing in this
    # function requests the router's root page first, so on a fresh session
    # all three values are None.
    # NOTE(review): the original comment referred to a "root request" --
    # confirm whether the device needs an initial GET before the login POST.
    sid_cookie = session.cookies.get('SID')
    test_cookie = session.cookies.get('_TESTCOOKIESUPPORT')
    xsrf_token = session.cookies.get('XSRF-TOKEN')

    # Headers copied from a browser login request; the XSRF token is sent both
    # as a header and as a cookie.
    headers = {
        'Accept': 'application/json, text/plain, */*',
        'Accept-Encoding': 'gzip, deflate',
        'Accept-Language': 'en-US,en;q=0.5',
        'Connection': 'keep-alive',
        'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8',
        'Origin': 'http://192.168.1.254',
        'Referer': 'http://192.168.1.254/',
        'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64; rv:132.0) Gecko/20100101 Firefox/132.0',
        'X-XSRF-TOKEN': xsrf_token
    }
    cookies = {
        'SID': sid_cookie,
        '_TESTCOOKIESUPPORT': test_cookie,
        'XSRF-TOKEN': xsrf_token
    }
    login_data = {
        'cmd': '3',
        'username': user,
        'password': password,
        'remember_me': '1',
        'act': 'nvset',
        'service': 'login_confirm',
        'sessionKey': 'NULL',
        '_': generate_timestamp()
    }

    # Perform the login request (bounded by ``timeout``).
    response = session.post(
        login_url,
        headers=headers,
        cookies=cookies,
        data=login_data,
        timeout=timeout,
    )
    response_data = response.json()

    # The router confirms a successful login with check_pwd == '1'.
    if response_data.get('login_confirm', {}).get('check_pwd') != '1':
        raise RuntimeError(
            "Router login was not confirmed (check_pwd != '1'); "
            "check the --user/--password arguments"
        )

    # After login the session carries the fresh cookies; persist them so later
    # runs can reuse the session. '_TESTCOOKIESUPPORT' is stored as well
    # because the data requests in this file read that key back from the
    # cookie file.
    cookies_data = {
        'SID': session.cookies.get('SID'),
        'XSRF-TOKEN': session.cookies.get('XSRF-TOKEN'),
        '_TESTCOOKIESUPPORT': session.cookies.get('_TESTCOOKIESUPPORT')
    }
    with open(cookies_file, 'w') as json_file:
        json.dump(cookies_data, json_file)
def _load_saved_cookies():
    """Return the cookie dict stored in ``cookies_file``, or ``None`` if unusable.

    A missing file (e.g. before the first successful login has written it) or
    a file that does not hold a JSON object is reported as ``None`` instead of
    raising, so the caller can fall back to logging in.
    """
    try:
        with open(cookies_file, 'r') as json_file:
            cookies_data = json.load(json_file)
    except (FileNotFoundError, json.JSONDecodeError):
        return None
    return cookies_data if isinstance(cookies_data, dict) else None


def _fetch_status(nvget, max_attempts=3, timeout=10):
    """Fetch one ``nvget`` data set from the router's ``status.cgi``.

    Uses the cookies saved in ``cookies_file``. When no usable cookies are
    saved, or the router answers with ``login_confirm.login_status == '0'``
    (handled as "not logged in"), ``login_and_save_tokens()`` is called and
    the request is retried.

    Args:
        nvget: Value of the ``nvget`` query parameter (name of the data set).
        max_attempts: Upper bound on request attempts, so a login that never
            takes effect cannot loop forever.
        timeout: Seconds to wait for the router on each request.

    Returns:
        The decoded JSON body of the first reply that is not a
        "not logged in" answer.

    Raises:
        RuntimeError: If no such reply was obtained within ``max_attempts``.
    """
    url = 'http://192.168.1.254/status.cgi'
    for attempt in range(1, max_attempts + 1):
        cookies_data = _load_saved_cookies()
        if cookies_data is not None:
            # Access individual cookies
            sid_cookie = cookies_data.get('SID')
            test_cookie = cookies_data.get('_TESTCOOKIESUPPORT')
            xsrf_token = cookies_data.get('XSRF-TOKEN')
            # Query parameters: the SID cookie doubles as the session key.
            params = {
                '_': generate_timestamp(),
                'nvget': nvget,
                'sessionKey': sid_cookie
            }
            headers = {
                'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64; rv:132.0) Gecko/20100101 Firefox/132.0',
                'Accept': 'application/json, text/plain, */*',
                'Accept-Language': 'en-US,en;q=0.5',
                'Accept-Encoding': 'gzip, deflate',
                'X-XSRF-TOKEN': xsrf_token,
                'Sec-GPC': '1',
                'Connection': 'keep-alive',
                'Referer': 'http://192.168.1.254/'
            }
            cookies = {
                'SID': sid_cookie,
                '_TESTCOOKIESUPPORT': test_cookie,
                'XSRF-TOKEN': xsrf_token
            }
            response = requests.get(
                url,
                headers=headers,
                params=params,
                cookies=cookies,
                timeout=timeout,
            )
            # Decode the body once and reuse it for both the check and the result.
            payload = response.json()
            if payload.get('login_confirm', {}).get('login_status') != '0':
                return payload
        # No usable session: log in again, unless there is no attempt left to
        # make use of the new cookies.
        if attempt < max_attempts:
            login_and_save_tokens()
    raise RuntimeError(
        f"could not fetch '{nvget}' from the router after {max_attempts} attempts"
    )


def get_port_mapping():
    """Return the router's ``virtual_server_list`` data as decoded JSON."""
    return _fetch_status('virtual_server_list')


def get_device_list():
    """Return the router's ``connected_device_list`` data as decoded JSON."""
    return _fetch_status('connected_device_list')


def get_links():
    """Return the router's ``diagnostic`` data as decoded JSON."""
    return _fetch_status('diagnostic')
# Query the router once per data set, in this order; the section printers
# defined below read these module-level results.
port_mappings, device_list, links = get_port_mapping(), get_device_list(), get_links()
def port_mapping(mappings=None):
    """Print the ``<<<port_mapping>>>`` section, one line per forwarding rule.

    Rules are read from ``mappings["virtual_server_list"]`` under the keys
    ``svc_<n>_descr``, ``svc_<n>_enabled``, ``svc_<n>_ext_port_start``,
    ``svc_<n>_ext_port_end``, ``svc_<n>_ip`` and ``svc_<n>_proto`` for
    n = 0, 1, 2, ... and printing stops at the first ``n`` for which one of
    those keys is missing. Fields are tab-separated; spaces in the
    description are replaced by ``-``.

    Args:
        mappings: Decoded router reply to print. Defaults to the module-level
            ``port_mappings``.
    """
    if mappings is None:
        mappings = port_mappings
    print("<<<port_mapping>>>")

    # A reply without a usable rule table yields just the section header.
    services = mappings.get("virtual_server_list") if isinstance(mappings, dict) else None
    if not isinstance(services, dict):
        return

    index = 0
    while True:
        try:
            descr = services[f"svc_{index}_descr"]
            enabled = services[f"svc_{index}_enabled"]
            ext_port_start = services[f"svc_{index}_ext_port_start"]
            ext_port_end = services[f"svc_{index}_ext_port_end"]
            ip = services[f"svc_{index}_ip"]
            proto = services[f"svc_{index}_proto"]
        except KeyError:
            # Only a missing key marks the end of the list; any other error
            # is a real problem and is allowed to propagate.
            break
        # str() keeps a non-string description from aborting the listing.
        label = str(descr).replace(' ', '-')
        print(f"{label}\t{enabled}\t{ext_port_start}\t{ext_port_end}\t{ip}\t{proto}")
        index += 1
def devices_list(devices=None):
    """Print the ``<<<device_list>>>`` section, one line per connected device.

    Devices are read from ``devices["connected_device_list"]`` under the keys
    ``dev_<n>_ip``, ``dev_<n>_mac`` and ``dev_<n>_name`` for n = 0, 1, 2, ...
    and printing stops at the first ``n`` for which one of those keys is
    missing. Each line is ``mac``, ``name``, ``ip`` joined by ``" \\t "``.

    Args:
        devices: Decoded router reply to print. Defaults to the module-level
            ``device_list``.
    """
    if devices is None:
        devices = device_list
    print("<<<device_list>>>")

    # A reply without a usable device table yields just the section header.
    connected = devices.get("connected_device_list") if isinstance(devices, dict) else None
    if not isinstance(connected, dict):
        return

    index = 0
    while True:
        try:
            ip = connected[f"dev_{index}_ip"]
            mac = connected[f"dev_{index}_mac"]
            name = connected[f"dev_{index}_name"]
        except KeyError:
            # A missing key marks the end of the list. Unlike a bare except,
            # this does not swallow KeyboardInterrupt/SystemExit or real bugs.
            break
        print(mac, "\t", name, "\t", ip)
        index += 1
def link(link_data=None):
    """Print the ``<<<links>>>`` section, one line per Ethernet port.

    Ports are read from ``link_data["diagnostic"]`` under the keys
    ``eth<n>_link`` and ``eth<n>_media_type`` for n = 1, 2, 3, ... (numbering
    starts at 1) and printing stops at the first ``n`` for which one of those
    keys is missing. Each line is the port number, link value and media type
    joined by ``" \\t "``.

    Args:
        link_data: Decoded router reply to print. Defaults to the module-level
            ``links``.
    """
    if link_data is None:
        link_data = links
    print("<<<links>>>")

    # A reply without a usable diagnostic table yields just the section header.
    diagnostic = link_data.get("diagnostic") if isinstance(link_data, dict) else None
    if not isinstance(diagnostic, dict):
        return

    port = 1
    while True:
        try:
            eth = diagnostic[f"eth{port}_link"]
            eth_speed = diagnostic[f"eth{port}_media_type"]
        except KeyError:
            # A missing key marks the end of the list. Unlike a bare except,
            # this does not swallow KeyboardInterrupt/SystemExit or real bugs.
            break
        print(port, "\t", eth, "\t", eth_speed)
        port += 1
# Emit the agent sections in a fixed order: devices, port mappings, links.
for print_section in (devices_list, port_mapping, link):
    print_section()