# modules/db.py
"""Database helpers for the bot.

Provides connection setup (MariaDB with SQLite fallback), a guarded query
runner, ``ensure_*_table`` schema bootstrap functions, and small logging /
statistics helpers built on top of them.
"""

import os
import re
import time
import datetime
import sqlite3

import globals

try:
    import mariadb
except ImportError:
    mariadb = None  # We handle gracefully if 'mariadb' isn't installed.


# Operation labels for which run_db_operation() commits after executing.
_WRITE_OPS = ("write", "insert", "update", "delete", "change")
# Operation labels for which run_db_operation() fetches and returns rows.
_READ_OPS = ("read", "lookup", "select")
# Substrings that cause run_db_operation() to refuse a query outright.
_FORBIDDEN_KEYWORDS = ("drop table", "union select", "exec ", "benchmark(", "sleep(")


def _is_sqlite(db_conn):
    """Return True if db_conn is a SQLite connection.

    Any other connection is treated as MariaDB/MySQL by the callers.
    The type-name check is kept as a fallback so wrapper classes whose
    type name mentions sqlite3 are still recognised.
    """
    if isinstance(db_conn, sqlite3.Connection):
        return True
    return "sqlite3" in str(type(db_conn)).lower()


def _table_exists(db_conn, table_name):
    """Return True if a table called table_name exists in the database.

    A failed lookup (run_db_operation returning None) is reported as
    "does not exist", so callers go on to attempt creation.
    """
    if _is_sqlite(db_conn):
        check_sql = """
            SELECT name
            FROM sqlite_master
            WHERE type='table' AND name = ?
        """
    else:
        check_sql = """
            SELECT table_name
            FROM information_schema.tables
            WHERE table_name = ? AND table_schema = DATABASE()
        """
    rows = run_db_operation(db_conn, "read", check_sql, (table_name,))
    return bool(rows and rows[0] and rows[0][0])


def checkenable_db_fk(db_conn):
    """
    Attempt to enable foreign key checks where it is relevant (i.e. in SQLite).
    For MariaDB/MySQL, nothing special is needed.

    Failures are logged as warnings and never raised.
    """
    if not _is_sqlite(db_conn):
        # For MariaDB/MySQL, they're typically enabled with InnoDB
        globals.log("Assuming DB is MariaDB/MySQL with FKs enabled", "DEBUG")
        return

    try:
        cursor = db_conn.cursor()
        # SQLite needs this pragma on every new connection.
        cursor.execute("PRAGMA foreign_keys = ON;")
        cursor.close()
        db_conn.commit()
        globals.log("Enabled foreign key support in SQLite (PRAGMA foreign_keys=ON).", "DEBUG")
    except Exception as e:
        globals.log(f"Failed to enable foreign key support in SQLite: {e}", "WARNING")


def init_db_connection(config):
    """
    Initializes a database connection based on config.json contents:
      - If config says 'use_mariadb', tries connecting to MariaDB.
      - If that fails (or not configured), falls back to SQLite.
      - Logs FATAL if neither can be established (the bot likely depends on DB).

    :param config: (dict) The loaded config.json data
    :return: a connection object (MariaDB or sqlite3.Connection), or None on failure
    """
    db_settings = config.get("database", {})
    use_mariadb = db_settings.get("use_mariadb", False)

    if use_mariadb and mariadb is None:
        globals.log("mariadb module not installed but use_mariadb=True. Falling back to SQLite...", "WARNING")
    elif use_mariadb:
        # Attempt MariaDB
        host = db_settings.get("mariadb_host", "localhost")
        user = db_settings.get("mariadb_user", "")
        password = db_settings.get("mariadb_password", "")
        dbname = db_settings.get("mariadb_dbname", "")
        port = int(db_settings.get("mariadb_port", 3306))

        if user and password and dbname:
            try:
                conn = mariadb.connect(
                    host=host,
                    user=user,
                    password=password,
                    database=dbname,
                    port=port
                )
                conn.autocommit = False  # We'll manage commits manually
                globals.log(f"Database connection established using MariaDB (host={host}, db={dbname}).")
                return conn
            except mariadb.Error as e:
                # Not fatal: fall through to the SQLite fallback below.
                globals.log(f"Error connecting to MariaDB: {e}", "WARNING")
        else:
            globals.log("MariaDB config incomplete. Falling back to SQLite...", "WARNING")

    # Fallback to local SQLite
    sqlite_path = db_settings.get("sqlite_path", "local_database.sqlite")
    try:
        conn = sqlite3.connect(sqlite_path)
        globals.log(f"Database connection established using local SQLite: {sqlite_path}")
        return conn
    except sqlite3.Error as e:
        globals.log(f"Could not open local SQLite database '{sqlite_path}': {e}", "WARNING")

    # If neither MariaDB nor SQLite connected, that's fatal for the bot
    globals.log("No valid database connection could be established! Exiting...", "FATAL")
    return None


def run_db_operation(conn, operation, query, params=None):
    """
    Executes a parameterized query with basic screening for injection attempts:
      - 'operation' can be "read", "write", "update", "delete", "lookup", etc.
      - 'query' is the SQL statement, with placeholders (? in SQLite or %s in MariaDB both work).
      - 'params' is a tuple/list of parameters for the query (preferred for security).

    1) We do a minimal check for suspicious patterns, e.g. multiple statements or known bad keywords.
    2) We execute the query with parameters, and commit on write/update/delete.
    3) On read/lookup, we fetch and return rows. Otherwise, return rowcount.

    Returns:
      - list of rows for read operations,
      - cursor.lastrowid for committed statements that start with INSERT,
      - cursor.rowcount for every other statement,
      - None if there is no connection, the query was blocked, or execution failed
        (the transaction is rolled back in that last case).

    NOTE:
      - This is still not a replacement for well-structured queries and security best practices.
      - Always use parameterized queries wherever possible to avoid injection.
    """
    if conn is None:
        if globals.log:
            globals.log("run_db_operation called but no valid DB connection!", "FATAL")
        return None

    if params is None:
        params = ()

    # Basic screening for malicious usage (multiple statements, suspicious SQL keywords, etc.)
    # This is minimal and can be expanded if needed.
    lowered = query.strip().lower()

    # One trailing semicolon is tolerated; more than one suggests stacked statements.
    if lowered.count(";") > 1:
        if globals.log:
            globals.log("Query blocked: multiple SQL statements detected.", "WARNING")
            globals.log(f"Offending query: {query}", "WARNING")
        return None

    # Potentially dangerous SQL keywords
    for kw in _FORBIDDEN_KEYWORDS:
        if kw in lowered:
            if globals.log:
                globals.log(f"Query blocked due to forbidden keyword: '{kw}'", "WARNING")
                globals.log(f"Offending query: {query}", "WARNING")
            return None

    op = operation.lower()
    cursor = conn.cursor()
    try:
        cursor.execute(query, params)

        # If it's a write/update/delete, commit the changes
        if op in _WRITE_OPS:
            conn.commit()
            if globals.log:
                globals.log(f"DB operation '{operation}' committed.", "DEBUG")

            # If the query is an INSERT, return the last inserted row ID
            if lowered.startswith("insert"):
                try:
                    return cursor.lastrowid
                except Exception as e:
                    if globals.log:
                        globals.log(f"Error retrieving lastrowid: {e}", "ERROR")
                    return cursor.rowcount
            return cursor.rowcount

        # If it's read/lookup, fetch results
        if op in _READ_OPS:
            return cursor.fetchall()

        # Unknown operation label: nothing is committed, rowcount is returned.
        return cursor.rowcount

    except Exception as e:
        # Rollback on any error. A failing rollback must not mask the original
        # error or escape to the caller, who expects None on failure.
        try:
            conn.rollback()
        except Exception as rollback_err:
            if globals.log:
                globals.log(f"Rollback failed after '{operation}' error: {rollback_err}", "ERROR")
        if globals.log:
            globals.log(f"Error during '{operation}' query execution: {e}", "ERROR")
        return None
    finally:
        cursor.close()


#######################
# Ensure quotes table exists
#######################

def ensure_quotes_table(db_conn):
    """
    Checks if 'quotes' table exists. If not, attempts to create it.

    Raises:
        RuntimeError: if the CREATE TABLE statement fails.
    """
    if _table_exists(db_conn, "quotes"):
        globals.log("Table 'quotes' already exists, skipping creation.", "DEBUG")
        return

    # Table does NOT exist => create it
    globals.log("Table 'quotes' does not exist; creating now...")

    if _is_sqlite(db_conn):
        create_table_sql = """
            CREATE TABLE quotes (
                ID INTEGER PRIMARY KEY AUTOINCREMENT,
                QUOTE_TEXT TEXT,
                QUOTEE TEXT,
                QUOTE_CHANNEL TEXT,
                QUOTE_DATETIME TEXT,
                QUOTE_GAME TEXT,
                QUOTE_REMOVED BOOLEAN DEFAULT 0,
                QUOTE_REMOVED_BY TEXT,
                QUOTE_REMOVED_DATETIME TEXT DEFAULT NULL,
                FOREIGN KEY (QUOTEE) REFERENCES users(UUID),
                FOREIGN KEY (QUOTE_REMOVED_BY) REFERENCES users(UUID)
            )
        """
    else:
        # The comma between the two FOREIGN KEY clauses is required;
        # without it MariaDB rejects the statement.
        create_table_sql = """
            CREATE TABLE quotes (
                ID INT PRIMARY KEY AUTO_INCREMENT,
                QUOTE_TEXT TEXT,
                QUOTEE VARCHAR(100),
                QUOTE_CHANNEL VARCHAR(100),
                QUOTE_DATETIME DATETIME DEFAULT CURRENT_TIMESTAMP,
                QUOTE_GAME VARCHAR(200),
                QUOTE_REMOVED BOOLEAN DEFAULT FALSE,
                QUOTE_REMOVED_BY VARCHAR(100),
                QUOTE_REMOVED_DATETIME DATETIME DEFAULT NULL,
                FOREIGN KEY (QUOTEE) REFERENCES users(UUID) ON DELETE SET NULL,
                FOREIGN KEY (QUOTE_REMOVED_BY) REFERENCES users(UUID) ON DELETE SET NULL
            )
        """

    result = run_db_operation(db_conn, "write", create_table_sql)
    if result is None:
        # run_db_operation returns None on error
        error_msg = "Failed to create 'quotes' table!"
        globals.log(error_msg, "CRITICAL")
        raise RuntimeError(error_msg)

    globals.log("Successfully created table 'quotes'.")


#######################
# Ensure 'users' table
#######################

def ensure_users_table(db_conn):
    """
    Checks if 'users' table exists. If not, creates it.

    The 'users' table tracks user linkage across platforms:
      - UUID: (PK) The universal ID for the user
      - discord_user_id, discord_username, discord_user_display_name
      - twitch_user_id, twitch_username, twitch_user_display_name
      - datetime_linked (DATE/TIME of row creation)
      - user_is_banned (BOOLEAN)
      - user_is_bot (BOOLEAN)

    This helps unify data for a single 'person' across Discord & Twitch.

    Raises:
        RuntimeError: if the CREATE TABLE statement fails.
    """
    if _table_exists(db_conn, "users"):
        globals.log("Table 'users' already exists, skipping creation.", "DEBUG")
        return

    # Table does NOT exist => create it
    globals.log("Table 'users' does not exist; creating now...")

    if _is_sqlite(db_conn):
        create_table_sql = """
            CREATE TABLE users (
                UUID TEXT PRIMARY KEY,
                discord_user_id TEXT,
                discord_username TEXT,
                discord_user_display_name TEXT,
                twitch_user_id TEXT,
                twitch_username TEXT,
                twitch_user_display_name TEXT,
                datetime_linked TEXT,
                user_is_banned BOOLEAN DEFAULT 0,
                user_is_bot BOOLEAN DEFAULT 0
            )
        """
    else:
        create_table_sql = """
            CREATE TABLE users (
                UUID VARCHAR(36) PRIMARY KEY,
                discord_user_id VARCHAR(100),
                discord_username VARCHAR(100),
                discord_user_display_name VARCHAR(100),
                twitch_user_id VARCHAR(100),
                twitch_username VARCHAR(100),
                twitch_user_display_name VARCHAR(100),
                datetime_linked DATETIME,
                user_is_banned BOOLEAN DEFAULT FALSE,
                user_is_bot BOOLEAN DEFAULT FALSE
            )
        """

    result = run_db_operation(db_conn, "write", create_table_sql)
    if result is None:
        error_msg = "Failed to create 'users' table!"
        globals.log(error_msg, "CRITICAL")
        raise RuntimeError(error_msg)

    globals.log("Successfully created table 'users'.")


########################
# Lookup user function
########################

def lookup_user(db_conn, identifier: str, identifier_type: str, target_identifier: str = None):
    """
    Looks up a user in the 'users' table based on the given identifier_type.

    The accepted identifier_type values are:
      - "uuid"
      - "discord_user_id" or alias "discord"
      - "discord_username"
      - "discord_user_display_name"
      - "twitch_user_id" or alias "twitch"
      - "twitch_username"
      - "twitch_user_display_name"

    Optionally, if target_identifier is provided (must be one of the accepted columns),
    only that column's value will be returned instead of the full user record.

    Returns:
      If target_identifier is None: A dictionary with the following keys:
          {
            "UUID": str,
            "discord_user_id": str or None,
            "discord_username": str or None,
            "discord_user_display_name": str or None,
            "twitch_user_id": str or None,
            "twitch_username": str or None,
            "twitch_user_display_name": str or None,
            "datetime_linked": str (or datetime as stored in the database),
            "user_is_banned": bool or int,
            "user_is_bot": bool or int
          }
      If target_identifier is provided: The value from the record corresponding to that column.
      If the lookup fails or the parameters are invalid: None.

    Note: empty username / display-name fields are filled with the other
    platform's value plus an "(... unlinked)" suffix, both in the full record
    and in single-column results.
    """
    # Valid columns for lookup and for target extraction.
    valid_cols = [
        "uuid", "discord_user_id", "discord_username",
        "twitch_user_id", "twitch_username", "discord", "twitch",
        "discord_user_display_name", "twitch_user_display_name"
    ]
    # Shorthand identifier types and the columns they stand for.
    aliases = {"discord": "discord_user_id", "twitch": "twitch_user_id"}

    # Ensure the provided identifier_type is acceptable.
    if identifier_type.lower() not in valid_cols:
        if globals.log:
            globals.log(f"lookup_user error: invalid identifier_type '{identifier_type}'", "WARNING")
        return None

    # Convert shorthand identifier types to their full column names.
    identifier_type = aliases.get(identifier_type.lower(), identifier_type)

    # If a target_identifier is provided, validate that too.
    if target_identifier is not None and target_identifier.lower() not in valid_cols:
        if globals.log:
            globals.log(f"lookup_user error: invalid target_identifier '{target_identifier}'", "WARNING")
        return None

    # The column name is interpolated, which is safe only because it was
    # checked against valid_cols above. The value itself is bound as a parameter.
    query = f"""
        SELECT
            UUID,
            discord_user_id,
            discord_username,
            discord_user_display_name,
            twitch_user_id,
            twitch_username,
            twitch_user_display_name,
            datetime_linked,
            user_is_banned,
            user_is_bot
        FROM users
        WHERE {identifier_type} = ?
        LIMIT 1
    """

    rows = run_db_operation(db_conn, "read", query, params=(identifier,))
    if not rows:
        if globals.log:
            globals.log(f"lookup_user: No user found for {identifier_type}='{identifier}'", "DEBUG")
        return None

    # Since we have a single row, convert it to a dictionary.
    row = rows[0]
    user_data = {
        "UUID": row[0],
        "discord_user_id": row[1],
        "discord_username": row[2] if row[2] else f"{row[5]} (Discord unlinked)",
        "discord_user_display_name": row[3] if row[3] else f"{row[6]} (Discord unlinked)",
        "twitch_user_id": row[4],
        "twitch_username": row[5] if row[5] else f"{row[2]} (Twitch unlinked)",
        "twitch_user_display_name": row[6] if row[6] else f"{row[3]} (Twitch unlinked)",
        "datetime_linked": row[7],
        "user_is_banned": row[8],
        "user_is_bot": row[9],
    }

    # Without a requested target column, return the full user record.
    if not target_identifier:
        return user_data

    # Translate aliases to the actual column name.
    target_identifier = aliases.get(target_identifier.lower(), target_identifier.lower())
    # The key for "uuid" is stored as "UUID" in our dict.
    if target_identifier == "uuid":
        target_identifier = "UUID"

    if target_identifier in user_data:
        return user_data[target_identifier]

    if globals.log:
        globals.log(f"lookup_user error: target_identifier '{target_identifier}' not present in user data", "WARNING")
    return None


def ensure_chatlog_table(db_conn):
    """
    Checks if 'chat_log' table exists. If not, creates it.

    The table layout:
      MESSAGE_ID (PK, auto increment)
      UUID (foreign key to users.UUID)
      MESSAGE_CONTENT (text)
      PLATFORM (string, e.g. 'twitch' or discord server name)
      CHANNEL (the twitch channel or discord channel name)
      DATETIME (defaults to current timestamp)
      ATTACHMENTS (text; store hyperlink(s) or empty)

    Raises:
        RuntimeError: if the CREATE TABLE statement fails.
    """
    if _table_exists(db_conn, "chat_log"):
        globals.log("Table 'chat_log' already exists, skipping creation.", "DEBUG")
        return

    # Table doesn't exist => create it
    globals.log("Table 'chat_log' does not exist; creating now...")

    if _is_sqlite(db_conn):
        create_sql = """
            CREATE TABLE chat_log (
                MESSAGE_ID INTEGER PRIMARY KEY AUTOINCREMENT,
                UUID TEXT,
                MESSAGE_CONTENT TEXT,
                PLATFORM TEXT,
                CHANNEL TEXT,
                DATETIME TEXT DEFAULT CURRENT_TIMESTAMP,
                ATTACHMENTS TEXT,
                FOREIGN KEY (UUID) REFERENCES users(UUID)
            )
        """
    else:
        create_sql = """
            CREATE TABLE chat_log (
                MESSAGE_ID INT PRIMARY KEY AUTO_INCREMENT,
                UUID VARCHAR(36),
                MESSAGE_CONTENT TEXT,
                PLATFORM VARCHAR(100),
                CHANNEL VARCHAR(100),
                DATETIME DATETIME DEFAULT CURRENT_TIMESTAMP,
                ATTACHMENTS TEXT,
                FOREIGN KEY (UUID) REFERENCES users(UUID) ON DELETE SET NULL
            )
        """

    result = run_db_operation(db_conn, "write", create_sql)
    if result is None:
        error_msg = "Failed to create 'chat_log' table!"
        globals.log(error_msg, "CRITICAL")
        raise RuntimeError(error_msg)

    globals.log("Successfully created table 'chat_log'.", "INFO")


def log_message(db_conn, user_uuid, message_content, platform, channel, attachments=None, username: str = "Unknown"):
    """
    Inserts a row into 'chat_log' with the given fields.
      user_uuid: The user's UUID from the 'users' table (string).
      message_content: The text of the message.
      platform: 'twitch' or discord server name, etc.
      channel: The channel name (Twitch channel, or Discord channel).
      attachments: Optional string of hyperlinks if available. Anything that
                   does not contain "https://" is stored as an empty string.
      username: Only used in the debug log line.

    DATETIME will default to current timestamp in the DB.
    Success or failure is logged; nothing is returned.
    """
    if attachments is None or "https://" not in attachments:
        attachments = ""

    insert_sql = """
        INSERT INTO chat_log (
            UUID,
            MESSAGE_CONTENT,
            PLATFORM,
            CHANNEL,
            ATTACHMENTS
        )
        VALUES (?, ?, ?, ?, ?)
    """
    params = (user_uuid, message_content, platform, channel, attachments)
    # For an INSERT, run_db_operation returns the new row id (or None on failure).
    rowcount = run_db_operation(db_conn, "write", insert_sql, params)

    if rowcount and rowcount > 0:
        globals.log(f"Logged message for UUID={user_uuid} ({username}) in 'chat_log'.", "DEBUG")
    else:
        globals.log("Failed to log message in 'chat_log'.", "ERROR")


def ensure_userhowls_table(db_conn):
    """
    Checks if 'user_howls' table exists; if not, creates it:
      ID (PK) | UUID (FK -> users.UUID) | HOWL (int) | DATETIME (auto timestamp)

    Raises:
        RuntimeError: if the CREATE TABLE statement fails.
    """
    if _table_exists(db_conn, "user_howls"):
        globals.log("Table 'user_howls' already exists, skipping creation.", "DEBUG")
        return

    globals.log("Table 'user_howls' does not exist; creating now...", "INFO")

    if _is_sqlite(db_conn):
        create_sql = """
            CREATE TABLE user_howls (
                ID INTEGER PRIMARY KEY AUTOINCREMENT,
                UUID TEXT,
                HOWL INT,
                DATETIME TEXT DEFAULT CURRENT_TIMESTAMP,
                FOREIGN KEY (UUID) REFERENCES users(UUID)
            )
        """
    else:
        create_sql = """
            CREATE TABLE user_howls (
                ID INT PRIMARY KEY AUTO_INCREMENT,
                UUID VARCHAR(36),
                HOWL INT,
                DATETIME DATETIME DEFAULT CURRENT_TIMESTAMP,
                FOREIGN KEY (UUID) REFERENCES users(UUID) ON DELETE SET NULL
            )
        """

    result = run_db_operation(db_conn, "write", create_sql)
    if result is None:
        err_msg = "Failed to create 'user_howls' table!"
        globals.log(err_msg, "ERROR")
        raise RuntimeError(err_msg)

    globals.log("Successfully created table 'user_howls'.", "INFO")


def insert_howl(db_conn, user_uuid, howl_value):
    """
    Insert a row into user_howls with the user's UUID, the integer 0-100,
    and DATETIME defaulting to now.

    Success or failure is logged; nothing is returned.
    """
    sql = """
        INSERT INTO user_howls (UUID, HOWL)
        VALUES (?, ?)
    """
    params = (user_uuid, howl_value)
    rowcount = run_db_operation(db_conn, "write", sql, params)
    if rowcount and rowcount > 0:
        globals.log(f"Recorded a {howl_value}% howl for UUID={user_uuid}.", "DEBUG")
    else:
        globals.log(f"Failed to record {howl_value}% howl for UUID={user_uuid}.", "ERROR")


def get_howl_stats(db_conn, user_uuid):
    """
    Returns a dict with { 'count': int, 'average': float, 'count_zero': int, 'count_hundred': int }
    or None if there are no rows at all for that UUID (or the query failed).
    """
    sql = """
        SELECT
            COUNT(*),
            AVG(HOWL),
            SUM(HOWL=0),
            SUM(HOWL=100)
        FROM user_howls
        WHERE UUID = ?
    """
    rows = run_db_operation(db_conn, "read", sql, (user_uuid,))
    if not rows:
        return None

    # (count, avg, zero_count, hundred_count); aggregates are NULL without rows.
    row = rows[0]
    count = row[0] if row[0] else 0
    avg = float(row[1]) if row[1] else 0.0
    zero_count = row[2] if row[2] else 0
    hundred_count = row[3] if row[3] else 0

    if count < 1:
        return None  # user has no howls

    return {
        "count": count,
        "average": avg,
        "count_zero": zero_count,
        "count_hundred": hundred_count
    }


def get_global_howl_stats(db_conn):
    """
    Returns a dictionary with total howls, average howl percentage, unique users,
    and counts of extreme (0% and 100%) howls.

    Returns None if the query fails. For an empty table the counts are 0 and
    the average is 0.0.
    """
    sql = """
        SELECT
            COUNT(*) AS total_howls,
            AVG(HOWL) AS average_howl,
            COUNT(DISTINCT UUID) AS unique_users,
            SUM(HOWL = 0) AS count_zero,
            SUM(HOWL = 100) AS count_hundred
        FROM user_howls
    """
    rows = run_db_operation(db_conn, "read", sql)

    if not rows or not rows[0] or rows[0][0] is None:
        return None  # No howl data exists

    row = rows[0]
    return {
        "total_howls": row[0],
        "average_howl": float(row[1]) if row[1] is not None else 0.0,
        "unique_users": row[2],
        # SUM() over zero rows is NULL; report it as 0 like the other counts.
        "count_zero": row[3] or 0,
        "count_hundred": row[4] or 0,
    }


def ensure_discord_activity_table(db_conn):
    """
    Ensures the 'discord_activity' table exists.
    Logs voice events, cameras, streaming, gaming, and Discord activities.

    Raises:
        RuntimeError: if the CREATE TABLE statement fails.
    """
    if _table_exists(db_conn, "discord_activity"):
        globals.log("Table 'discord_activity' already exists, skipping creation.", "DEBUG")
        return

    globals.log("Creating 'discord_activity' table...", "INFO")

    if _is_sqlite(db_conn):
        create_sql = """
            CREATE TABLE discord_activity (
                ID INTEGER PRIMARY KEY AUTOINCREMENT,
                UUID TEXT,
                ACTION TEXT CHECK(ACTION IN (
                    'JOIN', 'LEAVE', 'MUTE', 'UNMUTE', 'DEAFEN', 'UNDEAFEN',
                    'STREAM_START', 'STREAM_STOP', 'CAMERA_ON', 'CAMERA_OFF',
                    'GAME_START', 'GAME_STOP', 'LISTENING_SPOTIFY', 'DISCORD_ACTIVITY', 'VC_MOVE'
                )),
                GUILD_ID TEXT,
                VOICE_CHANNEL TEXT,
                ACTION_DETAIL TEXT DEFAULT NULL,
                DATETIME TEXT DEFAULT CURRENT_TIMESTAMP,
                FOREIGN KEY (UUID) REFERENCES users(UUID)
            )
        """
    else:
        create_sql = """
            CREATE TABLE discord_activity (
                ID INT PRIMARY KEY AUTO_INCREMENT,
                UUID VARCHAR(36),
                ACTION ENUM(
                    'JOIN', 'LEAVE', 'MUTE', 'UNMUTE', 'DEAFEN', 'UNDEAFEN',
                    'STREAM_START', 'STREAM_STOP', 'CAMERA_ON', 'CAMERA_OFF',
                    'GAME_START', 'GAME_STOP', 'LISTENING_SPOTIFY', 'DISCORD_ACTIVITY', 'VC_MOVE'
                ),
                GUILD_ID VARCHAR(36),
                VOICE_CHANNEL VARCHAR(100),
                ACTION_DETAIL TEXT DEFAULT NULL,
                DATETIME DATETIME DEFAULT CURRENT_TIMESTAMP,
                FOREIGN KEY (UUID) REFERENCES users(UUID) ON DELETE SET NULL
            )
        """

    # Pre-bind result so an exception still reaches the failure branch below
    # instead of raising UnboundLocalError.
    result = None
    try:
        result = run_db_operation(db_conn, "write", create_sql)
    except Exception as e:
        globals.log(f"Unable to create the table: discord_activity: {e}")

    if result is None:
        globals.log("Failed to create 'discord_activity' table!", "CRITICAL")
        raise RuntimeError("Database table creation failed.")

    globals.log("Successfully created table 'discord_activity'.", "INFO")


def log_discord_activity(db_conn, guild_id, user_uuid, action, voice_channel, action_detail=None):
    """
    Logs Discord activities (playing games, listening to Spotify, streaming).

    Duplicate detection:
      - Fetch the last NUM_RECENT_ENTRIES events for this user & action.
      - Normalize the ACTION_DETAIL values.
      - If the most recent event(s) all match the new event's detail (i.e. no intervening non-matching event)
        and the latest matching event was logged less than DUPLICATE_THRESHOLD ago, skip logging.
      - This allows a "reset": if the user changes state (e.g. changes song or channel) and then reverts,
        the new event is logged.

    Nothing is logged for a UUID that is not present in 'users'.
    Success or failure is logged; nothing is returned.
    """
    def normalize_detail(detail):
        """Return a normalized version of the detail for comparison (or None if detail is None)."""
        return detail.strip().lower() if detail else None

    # How long to consider an event "fresh" enough to be considered a duplicate.
    DUPLICATE_THRESHOLD = datetime.timedelta(minutes=5)
    # How many recent events to check.
    NUM_RECENT_ENTRIES = 5

    # Verify that the user exists in 'users' before proceeding.
    user_check = run_db_operation(
        db_conn, "read", "SELECT UUID FROM users WHERE UUID = ?", (user_uuid,)
    )
    if not user_check:
        globals.log(f"WARNING: Attempted to log activity for non-existent UUID: {user_uuid}", "WARNING")
        return  # Prevent foreign key issues.

    # "now" must use the same clock as the stored DATETIME column.
    # SQLite's CURRENT_TIMESTAMP is UTC text. MariaDB uses the session time
    # zone; NOTE(review): this assumes it matches the bot's local time.
    if _is_sqlite(db_conn):
        now = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
    else:
        now = datetime.datetime.now()
    normalized_new = normalize_detail(action_detail)

    # Query the last NUM_RECENT_ENTRIES events for this user and action.
    query = """
        SELECT DATETIME, ACTION_DETAIL
        FROM discord_activity
        WHERE UUID = ? AND ACTION = ?
        ORDER BY DATETIME DESC
        LIMIT ?
    """
    # A failed read returns None; treat it as "no history" and still log the event.
    rows = run_db_operation(
        db_conn, "read", query, params=(user_uuid, action, NUM_RECENT_ENTRIES)
    ) or []

    last_same = None       # Timestamp of the most recent event matching normalized_new.
    last_different = None  # Timestamp of the most recent event with a different detail.

    for dt_value, detail in rows:
        if isinstance(dt_value, datetime.datetime):
            # MariaDB returns DATETIME columns as datetime objects already.
            dt = dt_value
        else:
            try:
                dt = datetime.datetime.strptime(dt_value, "%Y-%m-%d %H:%M:%S")
            except (TypeError, ValueError) as e:
                globals.log(f"Error parsing datetime '{dt_value}': {e}", "ERROR")
                continue

        if normalize_detail(detail) == normalized_new:
            if last_same is None or dt > last_same:
                last_same = dt
        elif last_different is None or dt > last_different:
            last_different = dt

    # Skip only when the newest matching event is not superseded by a different
    # one (the state is unchanged) AND it is still inside the threshold window.
    state_unchanged = last_same is not None and (
        last_different is None or last_same > last_different
    )
    if state_unchanged and now - last_same < DUPLICATE_THRESHOLD:
        globals.log(
            f"Duplicate {action} event for user {user_uuid} (detail '{action_detail}') within threshold; skipping log.",
            "DEBUG",
        )
        return

    # Prepare the voice_channel value (if it's an object with a name, use that).
    channel_val = voice_channel.name if (voice_channel and hasattr(voice_channel, "name")) else voice_channel

    # Insert the new event.
    sql = """
        INSERT INTO discord_activity (UUID, ACTION, GUILD_ID, VOICE_CHANNEL, ACTION_DETAIL)
        VALUES (?, ?, ?, ?, ?)
    """
    params = (user_uuid, action, guild_id, channel_val, action_detail)
    rowcount = run_db_operation(db_conn, "write", sql, params)

    if rowcount and rowcount > 0:
        detail_str = f" ({action_detail})" if action_detail else ""
        globals.log(f"Logged Discord activity in Guild {guild_id}: {action}{detail_str}", "DEBUG")
    else:
        globals.log("Failed to log Discord activity.", "ERROR")


def ensure_bot_events_table(db_conn):
    """
    Ensures the 'bot_events' table exists, which logs major bot-related events.

    Raises:
        RuntimeError: if the CREATE TABLE statement fails.
    """
    if _table_exists(db_conn, "bot_events"):
        globals.log("Table 'bot_events' already exists, skipping creation.", "DEBUG")
        return

    globals.log("Creating 'bot_events' table...", "INFO")

    if _is_sqlite(db_conn):
        create_sql = """
            CREATE TABLE bot_events (
                EVENT_ID INTEGER PRIMARY KEY AUTOINCREMENT,
                EVENT_TYPE TEXT,
                EVENT_DETAILS TEXT,
                DATETIME TEXT DEFAULT CURRENT_TIMESTAMP
            )
        """
    else:
        create_sql = """
            CREATE TABLE bot_events (
                EVENT_ID INT PRIMARY KEY AUTO_INCREMENT,
                EVENT_TYPE VARCHAR(50),
                EVENT_DETAILS TEXT,
                DATETIME DATETIME DEFAULT CURRENT_TIMESTAMP
            )
        """

    result = run_db_operation(db_conn, "write", create_sql)
    if result is None:
        globals.log("Failed to create 'bot_events' table!", "CRITICAL")
        raise RuntimeError("Database table creation failed.")

    globals.log("Successfully created table 'bot_events'.", "INFO")


def log_bot_event(db_conn, event_type, event_details):
    """
    Logs a bot event (e.g., startup, shutdown, disconnection).

    Success or failure is logged; nothing is returned.
    """
    sql = """
        INSERT INTO bot_events (EVENT_TYPE, EVENT_DETAILS)
        VALUES (?, ?)
    """
    params = (event_type, event_details)
    rowcount = run_db_operation(db_conn, "write", sql, params)

    if rowcount and rowcount > 0:
        globals.log(f"Logged bot event: {event_type} - {event_details}", "DEBUG")
    else:
        globals.log("Failed to log bot event.", "ERROR")


def get_event_summary(db_conn, time_span="7d"):
    """
    Retrieves bot event statistics based on a given time span.
    Supports:
      - "7d"  (7 days)
      - "1m"  (1 month)
      - "24h" (last 24 hours)
    Any other value is logged and replaced by "7d".

    Returns:
        OrderedDict whose first key is "time_span", followed by one
        EVENT_TYPE -> count entry per event type, most frequent first.
        If the query fails, only the "time_span" entry is present.
    """
    from collections import OrderedDict

    # time span -> (SQLite datetime() modifier, MariaDB INTERVAL expression)
    time_mappings = {
        "7d": ("7 days", "7 DAY"),
        "1m": ("1 month", "1 MONTH"),
        "24h": ("24 hours", "24 HOUR"),
    }

    if time_span not in time_mappings:
        globals.log(f"Invalid time span '{time_span}', defaulting to '7d'", "WARNING")
        time_span = "7d"

    sqlite_modifier, mariadb_interval = time_mappings[time_span]

    # The cutoff is interpolated from the fixed mapping above, never from
    # caller input. datetime('now', ...) only exists in SQLite, so MariaDB
    # needs its own date arithmetic.
    if _is_sqlite(db_conn):
        cutoff_expr = f"datetime('now', '-{sqlite_modifier}')"
    else:
        cutoff_expr = f"DATE_SUB(NOW(), INTERVAL {mariadb_interval})"

    sql = f"""
        SELECT EVENT_TYPE, COUNT(*)
        FROM bot_events
        WHERE DATETIME >= {cutoff_expr}
        GROUP BY EVENT_TYPE
        ORDER BY COUNT(*) DESC
    """

    # A failed read returns None; report an empty summary instead of crashing.
    rows = run_db_operation(db_conn, "read", sql) or []

    summary = OrderedDict()
    summary["time_span"] = time_span
    for event_type, count in rows:
        summary[event_type] = count

    return summary


def ensure_link_codes_table(db_conn):
    """
    Ensures the 'link_codes' table exists.
    This table stores one-time-use account linking codes.

    Raises:
        RuntimeError: if the CREATE TABLE statement fails.
    """
    if _table_exists(db_conn, "link_codes"):
        globals.log("Table 'link_codes' already exists, skipping creation.", "DEBUG")
        return

    globals.log("Creating 'link_codes' table...", "INFO")

    if _is_sqlite(db_conn):
        create_sql = """
            CREATE TABLE link_codes (
                ID INTEGER PRIMARY KEY AUTOINCREMENT,
                DISCORD_USER_ID TEXT UNIQUE,
                LINK_CODE TEXT UNIQUE,
                CREATED_AT TEXT DEFAULT CURRENT_TIMESTAMP
            )
        """
    else:
        create_sql = """
            CREATE TABLE link_codes (
                ID INT PRIMARY KEY AUTO_INCREMENT,
                DISCORD_USER_ID VARCHAR(50) UNIQUE,
                LINK_CODE VARCHAR(50) UNIQUE,
                CREATED_AT DATETIME DEFAULT CURRENT_TIMESTAMP
            )
        """

    result = run_db_operation(db_conn, "write", create_sql)
    if result is None:
        globals.log("Failed to create 'link_codes' table!", "CRITICAL")
        raise RuntimeError("Database table creation failed.")

    globals.log("Successfully created table 'link_codes'.", "INFO")


def merge_uuid_data(db_conn, old_uuid, new_uuid):
    """
    Merges all records from the old UUID (Twitch account) into the new UUID (Discord account).
    This replaces all instances of the old UUID in all relevant tables with the new UUID,
    ensuring that no data is lost in the linking process.

    After merging, the old UUID entry is removed from the `users` table.

    Each statement is committed on its own by run_db_operation, so the merge
    is not atomic: a failure part-way through leaves earlier updates applied.
    """
    globals.log(f"Starting UUID merge: {old_uuid} -> {new_uuid}", "INFO")

    # (table, column) pairs that hold a reference to users.UUID, per the
    # schemas created in this module. 'bot_events' has no user column, and
    # 'quotes' references users through QUOTEE / QUOTE_REMOVED_BY, not UUID.
    # NOTE(review): 'voice_activity_log' is not created in this module; it is
    # kept in case it exists elsewhere. If it does not, the update fails and
    # is only logged.
    uuid_references = (
        ("voice_activity_log", "UUID"),
        ("discord_activity", "UUID"),
        ("chat_log", "UUID"),
        ("user_howls", "UUID"),
        ("quotes", "QUOTEE"),
        ("quotes", "QUOTE_REMOVED_BY"),
    )

    for table, column in uuid_references:
        # Identifiers come from the fixed tuple above; values are bound parameters.
        sql = f"UPDATE {table} SET {column} = ? WHERE {column} = ?"
        rowcount = run_db_operation(db_conn, "update", sql, (new_uuid, old_uuid))
        globals.log(
            f"Updated {rowcount} rows in {table}.{column} (transferring {old_uuid} -> {new_uuid})",
            "DEBUG",
        )

    # Finally, delete the old UUID from the `users` table. This runs after the
    # child rows were re-pointed so no foreign key still references it.
    delete_sql = "DELETE FROM users WHERE UUID = ?"
    rowcount = run_db_operation(db_conn, "write", delete_sql, (old_uuid,))

    globals.log(f"Deleted old UUID {old_uuid} from 'users' table ({rowcount} rows affected)", "INFO")

    globals.log(f"UUID merge complete: {old_uuid} -> {new_uuid}", "INFO")