# modules/db.py import os import re import time import sqlite3 try: import mariadb except ImportError: mariadb = None # We handle gracefully if 'mariadb' isn't installed. def init_db_connection(config, log): """ Initializes a database connection based on config.json contents: - If config says 'use_mariadb', tries connecting to MariaDB. - If that fails (or not configured), falls back to SQLite. - Logs FATAL if neither can be established (the bot likely depends on DB). :param config: (dict) The loaded config.json data :param log: (function) Logging function (message, level="INFO") :return: a connection object (MariaDB or sqlite3.Connection), or None on failure """ db_settings = config.get("database", {}) use_mariadb = db_settings.get("use_mariadb", False) if use_mariadb and mariadb is not None: # Attempt MariaDB host = db_settings.get("mariadb_host", "localhost") user = db_settings.get("mariadb_user", "") password = db_settings.get("mariadb_password", "") dbname = db_settings.get("mariadb_dbname", "") port = int(db_settings.get("mariadb_port", 3306)) if user and password and dbname: try: conn = mariadb.connect( host=host, user=user, password=password, database=dbname, port=port ) conn.autocommit = False # We'll manage commits manually log(f"Database connection established using MariaDB (host={host}, db={dbname}).") return conn except mariadb.Error as e: log(f"Error connecting to MariaDB: {e}", "WARNING") else: log("MariaDB config incomplete. Falling back to SQLite...", "WARNING") else: if use_mariadb and mariadb is None: log("mariadb module not installed but use_mariadb=True. Falling back to SQLite...", "WARNING") # Fallback to local SQLite sqlite_path = db_settings.get("sqlite_path", "local_database.sqlite") try: conn = sqlite3.connect(sqlite_path) log(f"Database connection established using local SQLite: {sqlite_path}") return conn except sqlite3.Error as e: log(f"Could not open local SQLite database '{sqlite_path}': {e}", "WARNING") # If neither MariaDB nor SQLite connected, that's fatal for the bot log("No valid database connection could be established! Exiting...", "FATAL") return None def run_db_operation(conn, operation, query, params=None, log_func=None): """ Executes a parameterized query with basic screening for injection attempts: - 'operation' can be "read", "write", "update", "delete", "lookup", etc. - 'query' is the SQL statement, with placeholders (? in SQLite or %s in MariaDB both work). - 'params' is a tuple/list of parameters for the query (preferred for security). - 'log_func' is the logging function (message, level). 1) We do a minimal check for suspicious patterns, e.g. multiple statements or known bad keywords. 2) We execute the query with parameters, and commit on write/update/delete. 3) On read/lookup, we fetch and return rows. Otherwise, return rowcount. NOTE: - This is still not a replacement for well-structured queries and security best practices. - Always use parameterized queries wherever possible to avoid injection. """ if conn is None: if log_func: log_func("run_db_operation called but no valid DB connection!", "FATAL") return None if params is None: params = () # Basic screening for malicious usage (multiple statements, forced semicolons, suspicious SQL keywords, etc.) # This is minimal and can be expanded if needed. lowered = query.strip().lower() # Check for multiple statements separated by semicolons (beyond the last one) if lowered.count(";") > 1: if log_func: log_func("Query blocked: multiple SQL statements detected.", "WARNING") log_func(f"Offending query: {query}", "WARNING") return None # Potentially dangerous SQL keywords forbidden_keywords = ["drop table", "union select", "exec ", "benchmark(", "sleep("] for kw in forbidden_keywords: if kw in lowered: if log_func: log_func(f"Query blocked due to forbidden keyword: '{kw}'", "WARNING") log_func(f"Offending query: {query}", "WARNING") return None cursor = conn.cursor() try: cursor.execute(query, params) # If it's a write/update/delete, commit the changes write_ops = ("write", "insert", "update", "delete", "change") if operation.lower() in write_ops: conn.commit() if log_func: log_func(f"DB operation '{operation}' committed.", "DEBUG") # If it's read/lookup, fetch results read_ops = ("read", "lookup", "select") if operation.lower() in read_ops: rows = cursor.fetchall() return rows else: return cursor.rowcount # for insert/update/delete, rowcount can be helpful except Exception as e: # Rollback on any error conn.rollback() if log_func: log_func(f"Error during '{operation}' query execution: {e}", "ERROR") return None finally: cursor.close() ####################### # Ensure quotes table exists ####################### def ensure_quotes_table(db_conn, log_func): """ Checks if 'quotes' table exists. If not, attempts to create it. Raises an Exception or logs errors if creation fails. """ # 1) Determine if DB is sqlite or mariadb for the system table check is_sqlite = "sqlite3" in str(type(db_conn)).lower() # 2) Check existence if is_sqlite: # For SQLite: check the sqlite_master table check_sql = """ SELECT name FROM sqlite_master WHERE type='table' AND name='quotes' """ else: # For MariaDB/MySQL: check information_schema check_sql = """ SELECT table_name FROM information_schema.tables WHERE table_name = 'quotes' AND table_schema = DATABASE() """ from modules.db import run_db_operation rows = run_db_operation(db_conn, "read", check_sql, log_func=log_func) if rows and rows[0] and rows[0][0]: # The table 'quotes' already exists log_func("Table 'quotes' already exists, skipping creation.", "DEBUG") return # We can just return # 3) Table does NOT exist => create it log_func("Table 'quotes' does not exist; creating now...") if is_sqlite: create_table_sql = """ CREATE TABLE quotes ( ID INTEGER PRIMARY KEY AUTOINCREMENT, QUOTE_TEXT TEXT, QUOTEE TEXT, QUOTE_CHANNEL TEXT, QUOTE_DATETIME TEXT, QUOTE_GAME TEXT, QUOTE_REMOVED BOOLEAN DEFAULT 0, QUOTE_REMOVED_BY TEXT ) """ else: create_table_sql = """ CREATE TABLE quotes ( ID INT PRIMARY KEY AUTO_INCREMENT, QUOTE_TEXT TEXT, QUOTEE VARCHAR(100), QUOTE_CHANNEL VARCHAR(100), QUOTE_DATETIME DATETIME DEFAULT CURRENT_TIMESTAMP, QUOTE_GAME VARCHAR(200), QUOTE_REMOVED BOOLEAN DEFAULT FALSE, QUOTE_REMOVED_BY VARCHAR(100) ) """ result = run_db_operation(db_conn, "write", create_table_sql, log_func=log_func) if result is None: # If run_db_operation returns None on error, handle or raise: error_msg = "Failed to create 'quotes' table!" log_func(error_msg, "ERROR") raise RuntimeError(error_msg) log_func("Successfully created table 'quotes'.")