improvement
This commit is contained in:
116
scripts/diagnose_file_log.py
Normal file
116
scripts/diagnose_file_log.py
Normal file
@@ -0,0 +1,116 @@
|
||||
import re
|
||||
import requests
|
||||
from bs4 import BeautifulSoup
|
||||
from requests.exceptions import HTTPError, Timeout, RequestException
|
||||
|
||||
# FHEM server URL base
FHEM_URL_BASE = "https://fhem.auwiesen2.de/fhem"

# CSRF token (replace with your actual token)
# NOTE(review): FHEMWEB regenerates this token on restart, so a value
# committed to source control goes stale -- consider fetching it at
# runtime instead (see utilities.get_token elsewhere in this repo).
CSRF_TOKEN = "csrf_611440676390392"

# Headers including CSRF token
# FHEM expects the token in the X-FHEM-csrfToken header.
HEADERS = {
    "X-FHEM-csrfToken": CSRF_TOKEN,
    "Content-Type": "application/x-www-form-urlencoded",
    "Accept": "text/html"
}

# Session to handle requests (one Session reuses the TCP connection
# across the calls below)
session = requests.Session()
|
||||
|
||||
def fetch_device_log(device_id, year):
    """Retrieve and pre-filter the FHEM FileLog for one device and year.

    Returns a list of log lines that begin with a timestamp
    (``YYYY-MM-DD_HH:MM:SS``), or None when the log is missing or the
    request fails.
    """
    # FileLog_logWrapper takes '&'-separated arguments straight after the
    # command name (FHEM's own URL style, not standard query syntax).
    log_url = f"{FHEM_URL_BASE}/FileLog_logWrapper&dev=FileLog_{device_id}&type=text&file={device_id}-{year}.log"

    try:
        reply = session.get(log_url, headers=HEADERS, timeout=10)
        reply.raise_for_status()

        # The raw log text is embedded in a <pre class="log"> element.
        pre_node = BeautifulSoup(reply.text, 'html.parser').find('pre', class_='log')
        if not pre_node:
            print(f"No log data found for device {device_id} in year {year}.")
            return None

        # Keep only lines that start with the expected date format.
        stamp = re.compile(r'^\d{4}-\d{2}-\d{2}_\d{2}:\d{2}:\d{2}')
        return [ln for ln in pre_node.text.strip().splitlines() if stamp.match(ln)]

    except (HTTPError, Timeout) as err:
        print(f"Error fetching log for device {device_id}: {err}")
        return None
    except RequestException as req_err:
        print(f"An error occurred: {req_err}")
        return None
|
||||
|
||||
def analyze_log_format(log_lines):
    """Infer an SQL column type for every parameter found in FHEM log lines.

    Each line is expected to look like
    ``YYYY-MM-DD_HH:MM:SS MA_<id> key:value key:value ...``.
    Lines or tokens that do not match are skipped.

    Args:
        log_lines: raw log lines as returned by ``fetch_device_log``.

    Returns:
        dict mapping column name -> SQL type string, always starting with
        the fixed ``device_id``/``timestamp`` columns, or None when
        ``log_lines`` is empty/None.
    """
    if not log_lines:
        return None

    timestamp_pattern = r"(\d{4}-\d{2}-\d{2}_\d{2}:\d{2}:\d{2})"
    device_pattern = r"(MA_[a-zA-Z0-9]+)"
    # Compile once instead of re-matching the pattern text per line.
    line_re = re.compile(fr"{timestamp_pattern} {device_pattern} (.*)")
    float_re = re.compile(r"^\d+(\.\d+)?$")

    # BUGFIX: the original used first-value-wins, so a column whose first
    # reading was "21" stayed INT even when later readings were "21.5".
    # Types are now widened monotonically: INT -> FLOAT -> VARCHAR.
    _RANK = {"INT": 0, "FLOAT": 1, "VARCHAR(255)": 2}

    def _value_type(value):
        # Classify one reading.  NOTE: negative numbers fall through to
        # VARCHAR, matching the original isdigit()-based behaviour.
        if value.isdigit():
            return "INT"
        if float_re.match(value):
            return "FLOAT"
        return "VARCHAR(255)"

    parameter_patterns = {}
    for line in log_lines:
        match = line_re.match(line)
        if not match:
            continue
        for param in match.group(3).split():
            key_value = param.split(":")
            # Tokens with more or fewer than one ':' (e.g. bare words or
            # times like 12:30:45) are ignored, as before.
            if len(key_value) != 2:
                continue
            key, value = key_value
            new_type = _value_type(value)
            current = parameter_patterns.get(key)
            # Keep the widest type observed so far for this key.
            if current is None or _RANK[new_type] > _RANK[current]:
                parameter_patterns[key] = new_type

    # Propose table structure: fixed identifying columns first, then the
    # inferred parameter columns in first-seen order.
    table_structure = {
        "device_id": "VARCHAR(255)",
        "timestamp": "DATETIME"
    }
    table_structure.update(parameter_patterns)

    return table_structure
|
||||
|
||||
def print_table_structure(table_structure):
    """Print the proposed CREATE TABLE statement for the device log."""
    # Assemble the whole statement first, then emit it in one call; the
    # resulting stdout bytes are identical to printing line by line.
    body = [f"    {name} {sql_type}," for name, sql_type in table_structure.items()]
    statement = "\n".join(
        ["Proposed Table Structure:", "CREATE TABLE device_logs ("]
        + body
        # One (device, timestamp) reading per row -> composite key.
        + ["    PRIMARY KEY (device_id, timestamp)", ");"]
    )
    print(statement)
|
||||
|
||||
def main(device_id, year=2024):
    """Fetch one device's yearly log, preview it, and propose a schema."""
    log_lines = fetch_device_log(device_id, year)
    if not log_lines:
        print("No log data retrieved.")
        return

    # Preview: at most the first 100 lines, numbered from 1.
    print(f"First 100 lines of the log for device {device_id}:\n")
    for number, line in enumerate(log_lines[:100], start=1):
        print(f"{number}: {line}")

    table_structure = analyze_log_format(log_lines)
    if table_structure:
        print("\nAnalyzing log format...")
        print_table_structure(table_structure)
    else:
        print("No valid log data found to analyze.")
|
||||
|
||||
if __name__ == "__main__":
    # Smoke-test entry point: run against a known sample FHEM device ID.
    main("MA_030e8b3e5bc3")
|
@@ -1,37 +1,143 @@
|
||||
import mysql.connector
|
||||
from mysql.connector import Error
|
||||
import os
|
||||
import re
|
||||
import requests
|
||||
from bs4 import BeautifulSoup
|
||||
from requests.exceptions import HTTPError, Timeout, RequestException
|
||||
|
||||
# FHEM URL
# (previous LAN endpoint kept for reference)
# FHEM_URL = "http://192.168.2.200:8083/fhem"
FHEM_URL = "https://fhem.auwiesen2.de/fhem"
# Import the get_token function from the utilities module
# NOTE(review): mid-file import; also, get_token is not used in this
# block -- confirm it is needed further down or remove it.
from utilities.get_token import get_token

# Parameters including CSRF token
PARAMS = {
    "cmd": "jsonlist2",
    "XHR": "1",
    "fwcsrf": "csrf_334814865250639" # CSRF token as a parameter
}
# Database connection details
DB_HOST = 'mariadb'
DB_USER = 'root'
# NOTE(review): hard-coded password fallback is committed to source
# control -- prefer requiring MYSQL_ROOT_PASSWORD to be set.
DB_PASSWORD = os.getenv('MYSQL_ROOT_PASSWORD', 'S3raph1n!')
DB_NAME = 'fhem'
TABLE_NAME = 'devices'

# Headers including CSRF token
# NOTE(review): this token differs from the one in PARAMS above; this
# looks like a merge artifact -- verify which token is current.
HEADERS = {
    "X-FHEM-csrfToken": "csrf_822558611144652",
    "Content-Type": "application/x-www-form-urlencoded",
    "Accept": "application/json"
}
# FHEM server URL
FHEM_URL_BASE = "https://fhem.auwiesen2.de/fhem"

# Send the request with both parameters and headers
# NOTE(review): executes at import time (module-level network call).
response = requests.get(FHEM_URL, params=PARAMS, headers=HEADERS)
# Path to the log file created by the first script
LOG_FILE_PATH = 'fhem_script.log'

# Debugging: Check the status code and response content
print("Response Status Code:", response.status_code)
print("Response Content:", response.text)
# Session to handle requests
session = requests.Session()

try:
    # Attempt to parse the response as JSON
    data = response.json()
    devices = data['Results']
    for device in devices:
        print(f"Device: {device['Name']}, State: {device['Internals']['STATE']}")
except requests.exceptions.JSONDecodeError:
    print("Error: Failed to decode JSON response.")
except KeyError:
    print("Error: Expected keys not found in the JSON data.")
|
||||
def connect_to_db():
    """Open a MySQL connection using the module-level DB_* settings.

    Returns:
        The connection object, or None when connecting failed.
    """
    conn = None
    try:
        conn = mysql.connector.connect(
            host=DB_HOST,
            user=DB_USER,
            password=DB_PASSWORD,
            charset='utf8mb4',
            collation='utf8mb4_unicode_ci'
        )
        if conn.is_connected():
            print("Connected to MySQL Server.")
    except Error as exc:
        print(f"Error while connecting to MySQL: {exc}")
    # On failure conn is still None; on success it is the live connection.
    return conn
|
||||
|
||||
def create_database(connection):
    """Create the fhem database (idempotent) with utf8mb4 collation."""
    # DB_NAME is a module constant, not user input, so interpolation here
    # is not an injection vector.
    ddl = (
        f"CREATE DATABASE IF NOT EXISTS {DB_NAME} "
        "CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;"
    )
    try:
        connection.cursor().execute(ddl)
        print(f"Database '{DB_NAME}' created or already exists with utf8mb4_unicode_ci collation.")
    except Error as e:
        print(f"Error creating database: {e}")
|
||||
|
||||
def create_table(connection):
    """Create the devices table in the fhem database (idempotent)."""
    try:
        cur = connection.cursor()
        # Select the schema first so the unqualified table name resolves.
        cur.execute(f"USE {DB_NAME};")
        cur.execute(f"""
        CREATE TABLE IF NOT EXISTS {TABLE_NAME} (
            Device_id VARCHAR(255) PRIMARY KEY,
            Room VARCHAR(255),
            alias VARCHAR(255) NOT NULL
        ) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
        """)
        print(f"Table '{TABLE_NAME}' created or already exists in database '{DB_NAME}'.")
    except Error as e:
        print(f"Error creating table: {e}")
|
||||
|
||||
def insert_device_data(connection, device_id, room, alias):
    """Upsert one device row (insert, or refresh Room/alias on key clash)."""
    # Values go through placeholders; only the constant table name is
    # interpolated into the statement text.
    upsert = (
        f"INSERT INTO {TABLE_NAME} (Device_id, Room, alias) "
        "VALUES (%s, %s, %s) "
        "ON DUPLICATE KEY UPDATE Room = VALUES(Room), alias = VALUES(alias);"
    )
    try:
        cur = connection.cursor()
        cur.execute(upsert, (device_id, room, alias))
        connection.commit()
        print(f"Inserted/Updated Device: {device_id}, Room: {room}, Alias: {alias}")
    except Error as e:
        print(f"Error inserting data: {e}")
|
||||
|
||||
def get_device_details(device_name, csrf_token):
    """Scrape the FHEM detail page of *device_name* for alias and room.

    Returns:
        An (alias, room) tuple; each element is None when the attribute
        is missing or the request failed.
    """
    alias = room = None
    try:
        reply = session.get(
            f"{FHEM_URL_BASE}?detail={device_name}",
            headers={
                "X-FHEM-csrfToken": csrf_token,
                "Content-Type": "application/x-www-form-urlencoded",
                "Accept": "application/json"
            },
            timeout=10,
        )
        reply.raise_for_status()

        # Attribute rows live in <table class="block wide attributes">,
        # with the name in div.dname and the value in div.dval.
        table = BeautifulSoup(reply.text, 'html.parser').find(
            'table', class_='block wide attributes')
        if table:
            for row in table.find_all('tr'):
                name_cell = row.find('div', class_='dname')
                value_cell = row.find('div', class_='dval')
                if not (name_cell and value_cell):
                    continue
                attr = name_cell.text.strip()
                value = value_cell.text.strip()
                if attr == "alias":
                    alias = value
                elif attr == "room":
                    room = value

        return alias, room

    except (HTTPError, Timeout) as err:
        print(f"Error fetching details for device {device_name}: {err}")
        return None, None
    except RequestException as req_err:
        print(f"An error occurred: {req_err}")
        return None, None
|
||||
|
||||
def read_log_and_insert_into_db(connection, csrf_token):
    """Scan the capture log for MA_ devices and upsert each into the DB.

    Devices without an alias are skipped (alias is NOT NULL in the
    table); a missing room is stored as NULL.
    """
    device_re = re.compile(r'Device: (MA_[a-zA-Z0-9]+), State:')
    try:
        with open(LOG_FILE_PATH, 'r') as log_file:
            for line in log_file:
                hit = device_re.search(line)
                if not hit:
                    continue
                device_name = hit.group(1)
                # One HTTP round-trip to FHEM per matched device.
                alias, room = get_device_details(device_name, csrf_token)
                if alias:
                    insert_device_data(connection, device_name, room or None, alias)

    except FileNotFoundError:
        print(f"Log file '{LOG_FILE_PATH}' not found.")
|
||||
|
||||
def main():
|
||||
|
94
scripts/fhem_fetch_gradio.py
Normal file
94
scripts/fhem_fetch_gradio.py
Normal file
@@ -0,0 +1,94 @@
|
||||
import os
|
||||
import requests
|
||||
import gradio as gr
|
||||
import subprocess
|
||||
|
||||
# FHEM URL
FHEM_URL = "https://fhem.auwiesen2.de/fhem"

# Parameters including CSRF token
PARAMS = {
    "cmd": "jsonlist2",  # FHEM command: list all devices as JSON
    "XHR": "1",
    "fwcsrf": "csrf_611440676390392" # CSRF token as a parameter
}

# Headers including CSRF token
# NOTE(review): hard-coded token goes stale when FHEMWEB restarts --
# consider fetching it dynamically (utilities.get_token).
HEADERS = {
    "X-FHEM-csrfToken": "csrf_611440676390392",
    "Content-Type": "application/x-www-form-urlencoded",
    "Accept": "application/json"
}
|
||||
|
||||
def get_device_list():
    """Query FHEM's jsonlist2 and return a list of (name, state) tuples.

    On failure an error message STRING is returned instead of a list;
    callers distinguish the two by type (see display_devices).
    """
    try:
        # Send the request with both parameters and headers
        response = requests.get(FHEM_URL, params=PARAMS, headers=HEADERS, timeout=10)

        # Debugging: Check the status code and response content
        print("Response Status Code:", response.status_code)
        print("Response Content:", response.text)

        response.raise_for_status()  # Raise an HTTPError for bad responses (4xx and 5xx)

        # Attempt to parse the response as JSON
        data = response.json()
        print("Parsed JSON Data:", data)  # Debugging: Print the parsed JSON data

        # Prepare data for Gradio (convert dictionaries to tuples)
        return [
            (entry['Name'], entry['Internals'].get('STATE', 'Unknown'))
            for entry in data.get('Results', [])
        ]

    except requests.exceptions.JSONDecodeError:
        return "Error: Failed to decode JSON response."
    except KeyError:
        return "Error: Expected keys not found in the JSON data."
    except requests.exceptions.RequestException as e:
        return f"Error: {str(e)}"
|
||||
|
||||
def display_devices():
    """Gradio callback: return a device Dataframe, or an error string."""
    result = get_device_list()
    # get_device_list signals failure by returning a plain string.
    if isinstance(result, str):
        return result
    return gr.Dataframe(result, headers=["Device", "State"])
|
||||
|
||||
# Create a Gradio interface
# (no inputs: the device table is rebuilt on every invocation)
interface = gr.Interface(fn=display_devices, inputs=[], outputs=gr.Dataframe(headers=["Device", "State"]))

# List of ports to try
# Tried in order by the launch loop below; earlier entries win.
ports_to_try = [8081, 8082]
|
||||
|
||||
# Function to terminate processes using a specific port
def kill_process_on_port(port):
    """Terminate every process currently listening on *port*.

    Uses ``lsof -t`` to look the PIDs up.  BUGFIX: ``lsof -t`` prints ONE
    PID PER LINE; the original interpolated the whole (possibly
    multi-line) output into a single ``kill`` command, which only worked
    when exactly one process held the port.  Each PID is now killed
    individually.
    """
    try:
        # port comes from the hard-coded ports_to_try list, so the shell
        # interpolation is not attacker-controlled here.
        output = subprocess.check_output(f"lsof -t -i:{port}", shell=True).decode()
    except subprocess.CalledProcessError:
        # lsof exits non-zero when nothing listens on the port.
        print(f"No process found on port {port}")
        return

    for pid in output.split():
        os.system(f"kill {pid}")
        print(f"Terminated process {pid} on port {port}")
|
||||
|
||||
# Terminate any process that might be using the desired ports
# NOTE(review): runs at import time and kills matching processes
# unconditionally -- make sure nothing else legitimately uses these ports.
for port in ports_to_try:
    kill_process_on_port(port)

# Try each port until one is available
for port in ports_to_try:
    try:
        # Launch the interface with the specified port
        interface.launch(
            server_name="0.0.0.0",  # Bind to all IPs inside the Docker container
            server_port=port,  # Try the current port
            share=False,  # Do not create a public link since you use a custom domain
            inbrowser=False  # Do not automatically open a browser
        )
        break  # If launch is successful, break out of the loop
    except OSError:
        # launch raises OSError when the port is already bound.
        print(f"Port {port} is not available. Trying next port...")
|
@@ -1 +1,2 @@
|
||||
!pip install mysql-connector-python
|
||||
pip install mysql-connector-python
|
||||
pip install gradio
|
||||
|
116
scripts/read_devices.py
Normal file
116
scripts/read_devices.py
Normal file
@@ -0,0 +1,116 @@
|
||||
import re
|
||||
import requests
|
||||
import sys
|
||||
import os
|
||||
from bs4 import BeautifulSoup
|
||||
from requests.exceptions import HTTPError, Timeout, RequestException
|
||||
|
||||
# Set the working directory to parent
# NOTE(review): import-time side effect -- affects every relative path
# used later (e.g. log_file_path below).
os.chdir(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))

# Update sys.path to include the parent directory of the current script
# (must happen before the utilities import below can resolve)
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))

# Import the get_token function
from utilities.get_token import get_token

# FHEM URL base (replace with your actual FHEM server URL)
FHEM_URL_BASE = "https://fhem.auwiesen2.de/fhem"

# Retrieve the CSRF token dynamically
# NOTE(review): performs a network request at import time.
csrf_token = get_token(FHEM_URL_BASE)

# Headers including CSRF token
HEADERS = {
    "X-FHEM-csrfToken": csrf_token,
    "Content-Type": "application/x-www-form-urlencoded",
    "Accept": "application/json"
}

# Path to the log file created by the first script
# (relative to the parent directory selected by os.chdir above)
log_file_path = 'fhem_script.log'

# Define the pattern to match devices in the log file
# Group 1 captures the device name, group 2 its alias.
device_pattern = r'^attr (MA_[a-zA-Z0-9]+) alias (.+)$'

# Session to handle requests (reuses the TCP connection between calls)
session = requests.Session()
|
||||
|
||||
def get_device_attributes(device_name):
    """Scrape the FHEM detail page of *device_name* for alias and room.

    Returns:
        An (alias, room) tuple; each element is None when the attribute
        is missing or the request failed.
    """
    alias = room = None
    try:
        # Request the device's detail view and parse its HTML.
        reply = session.get(f"{FHEM_URL_BASE}?detail={device_name}",
                            headers=HEADERS, timeout=10)
        reply.raise_for_status()

        # Attribute rows sit in <table class="block wide attributes">;
        # the name is in div.dname, the value in div.dval.
        table = BeautifulSoup(reply.text, 'html.parser').find(
            'table', class_='block wide attributes')
        if table:
            for row in table.find_all('tr'):
                name_cell = row.find('div', class_='dname')
                value_cell = row.find('div', class_='dval')
                if not (name_cell and value_cell):
                    continue
                attr = name_cell.text.strip()
                value = value_cell.text.strip()
                if attr == "alias":
                    alias = value
                elif attr == "room":
                    room = value

        return alias, room

    except (HTTPError, Timeout) as err:
        print(f"Error fetching attributes for device {device_name}: {err}")
        return None, None
    except RequestException as req_err:
        print(f"An error occurred: {req_err}")
        return None, None
|
||||
|
||||
def list_ma_devices_with_aliases_and_rooms(log_file_path, device_pattern):
    """Collect (device, alias, room) triples for every MA_ device in the log.

    Returns:
        A list of (name, alias, room) tuples (possibly empty), or None
        when the log file does not exist.
    """
    try:
        with open(log_file_path, 'r') as log_file:
            found = []
            for line in log_file:
                # Only lines matching the "MA_..." device pattern count.
                hit = re.search(device_pattern, line)
                if not hit:
                    continue
                name = hit.group(1)
                # Each match costs one HTTP round-trip to FHEM.
                alias, room = get_device_attributes(name)
                found.append((name, alias, room))
        return found

    except FileNotFoundError:
        print(f"Log file '{log_file_path}' not found.")
        return None
|
||||
|
||||
def main():
    """Print every MA_ device found in the log with its alias and room."""
    devices = list_ma_devices_with_aliases_and_rooms(log_file_path, device_pattern)
    # Covers both "file missing" (None) and "no matches" (empty list).
    if not devices:
        print("No devices starting with 'MA_' found.")
        return

    print("Devices starting with 'MA_', their aliases, and rooms:")
    for name, alias, room in devices:
        print(f"Device: {name}, Alias: {alias}, Room: {room}")
|
||||
|
||||
if __name__ == "__main__":
    # Script entry point: only runs when executed directly.
    main()
|
@@ -1,38 +0,0 @@
|
||||
import mysql.connector
|
||||
from mysql.connector import errorcode
|
||||
|
||||
def connect_to_db():
    """Open a MariaDB connection with utf8mb4_unicode_ci session collation.

    Returns:
        The open connection, or None when connecting failed.
    """
    connection = None  # Initialize connection to None
    try:
        # Establish connection
        # NOTE(review): credentials are hard-coded in source control.
        connection = mysql.connector.connect(
            host='mariadb',  # Use the container name or the IP address of the host running the container
            user='neuer',
            password='S3raph1n!',
            charset='utf8mb4'  # Ensure the connection uses the correct charset
        )

        # Create a cursor object using the connection
        cursor = connection.cursor()

        # Set the session collation explicitly
        cursor.execute("SET SESSION collation_connection = 'utf8mb4_unicode_ci'")
        cursor.execute("SET NAMES 'utf8mb4' COLLATE 'utf8mb4_unicode_ci'")

        print("Connected to MariaDB with utf8mb4_unicode_ci collation")
        return connection

    except mysql.connector.Error as err:
        # Map the common error codes to friendlier messages.
        if err.errno == errorcode.ER_ACCESS_DENIED_ERROR:
            print("Something is wrong with your user name or password")
        elif err.errno == errorcode.ER_BAD_DB_ERROR:
            print("Database does not exist")
        else:
            print(err)
        # connection is still None here (connect raised before assignment
        # completed, or it never succeeded).
        return connection
|
||||
|
||||
# Example of using the connection
# NOTE(review): runs at import time -- connects to the DB whenever this
# module is imported.
conn = connect_to_db()
if conn:
    # Perform database operations here
    conn.close()
|
2
utilities/__init__.py
Normal file
2
utilities/__init__.py
Normal file
@@ -0,0 +1,2 @@
|
||||
# utilities/__init__.py -- marks this directory as a Python package.
# NOTE(review): this print fires on every import of the package; the
# message still says "db package", a leftover from a copied template.
print("Initializing the db package")
|
BIN
utilities/__pycache__/__init__.cpython-310.pyc
Normal file
BIN
utilities/__pycache__/__init__.cpython-310.pyc
Normal file
Binary file not shown.
BIN
utilities/__pycache__/get_token.cpython-310.pyc
Normal file
BIN
utilities/__pycache__/get_token.cpython-310.pyc
Normal file
Binary file not shown.
43
utilities/get_token.py
Normal file
43
utilities/get_token.py
Normal file
@@ -0,0 +1,43 @@
|
||||
import sys
|
||||
import urllib.request as urllib2
|
||||
import urllib.parse as urlparse
|
||||
import ssl
|
||||
|
||||
# Base URL of the FHEM web frontend (may embed user:pass@ credentials).
BASEURL = 'https://fhem.auwiesen2.de/fhem?'
# Default page used to harvest the CSRF token (the WEB device detail view).
url = BASEURL + 'detail=WEB'
|
||||
|
||||
def get_token(url):
    """Fetch the FHEM page at *url* and extract its CSRF token.

    Supports URLs of the form ``https://user:pass@host/...``: embedded
    credentials are stripped from the URL and moved into an HTTP
    Basic-Auth handler.

    Returns:
        The token string (``csrf_...``) on success, False on failure
        (request error or no token present in the page).
    """
    # Split the URL to extract the username and password
    parts = urlparse.urlsplit(url)
    username = parts.username
    password = parts.password

    # Reconstruct the URL without the credentials.  BUGFIX: the original
    # always built the "user:pass@" needle, producing "None:None@" for
    # credential-less URLs; guard on username instead.
    if username is not None:
        url = url.replace(f"{username}:{password}@", '')
    url = url.replace(" ", "%20")

    # Create an unverified HTTPS context for the self-signed FHEM cert.
    # NOTE(review): this monkey-patches ssl process-wide -- not
    # recommended for production.
    ssl._create_default_https_context = ssl._create_unverified_context

    # Setup HTTP Basic Authentication for subsequent urlopen calls.
    password_mgr = urllib2.HTTPPasswordMgrWithDefaultRealm()
    password_mgr.add_password(None, url, username, password)
    handler = urllib2.HTTPBasicAuthHandler(password_mgr)
    opener = urllib2.build_opener(handler)
    urllib2.install_opener(opener)

    try:
        # Send the request and retrieve the response
        with urllib2.urlopen(url, data=None, timeout=10) as uu:
            page = uu.read().decode('utf-8')
    except urllib2.URLError as e:
        print(f'URLError: {e.reason}')
        return False

    # The token appears in the page as e.g. fwcsrf='csrf_12345'.
    # BUGFIX: the original sliced with find() results without checking
    # for -1, returning garbage when the token (or its closing quote)
    # was missing.
    start = page.find('csrf_')
    if start == -1:
        return False
    token = page[start:]
    end = token.find("'")
    return token[:end] if end != -1 else token
|
||||
|
||||
# Example usage
# NOTE(review): executes at import time -- every import of this module
# performs a network request (read_devices.py imports get_token from here).
token = get_token(url)
print(f"Token: {token}")
|
Reference in New Issue
Block a user