Program logging rework

- Logging functionality has been centralised to 'globals.log'

This should allow easy log integration into subsystems without the need of passing the function everywhere *phew*
kami_dev
Kami 2025-02-11 10:47:48 +01:00
parent 699d8d493e
commit 66f3d03bc6
9 changed files with 587 additions and 475 deletions

View File

@ -5,21 +5,23 @@ from discord.ext import commands
import importlib
import cmd_discord
import globals
import modules
import modules.utility
from modules.db import log_message, lookup_user, log_bot_event
class DiscordBot(commands.Bot):
def __init__(self, config, log_func):
def __init__(self, config):
super().__init__(command_prefix="!", intents=discord.Intents.all())
self.remove_command("help") # Remove built-in help function
self.config = config
self.log = log_func # Use the logging function from bots.py
self.log = globals.log # Use the logging function from bots.py
self.db_conn = None # We'll set this later
self.help_data = None # We'll set this later
self.load_commands()
self.log("Discord bot initiated")
globals.log("Discord bot initiated")
# async def sync_slash_commands(self):
# """Syncs slash commands for the bot."""
@ -46,22 +48,24 @@ class DiscordBot(commands.Bot):
try:
importlib.reload(cmd_discord) # Reload the commands file
cmd_discord.setup(self) # Ensure commands are registered
self.log("Discord commands loaded successfully.")
globals.log("Discord commands loaded successfully.")
# Load help info
help_json_path = "dictionary/help_discord.json"
modules.utility.initialize_help_data(
bot=self,
help_json_path=help_json_path,
is_discord=True,
log_func=self.log
is_discord=True
)
except Exception as e:
self.log(f"Error loading Discord commands: {e}", "ERROR", True)
globals.log(f"Error loading Discord commands: {e}", "ERROR", True)
async def on_message(self, message):
self.log(f"Message detected, attempting UUI lookup on {message.author.name} ...", "DEBUG")
globals.log(f"Message detected by '{message.author.name}' in '{message.author.guild.name}' - #'{message.channel.name}'", "DEBUG")
#globals.log(f"Message body:\n{message}\nMessage content: {message.content}", "DEBUG") # Full message debug
globals.log(f"Message content: '{message.content}'", "DEBUG") # Partial message debug (content only)
globals.log(f"Attempting UUI lookup on '{message.author.name}' ...", "DEBUG")
try:
# If it's a bot message, ignore or pass user_is_bot=True
is_bot = message.author.bot
@ -71,7 +75,6 @@ class DiscordBot(commands.Bot):
modules.utility.track_user_activity(
db_conn=self.db_conn,
log_func=self.log,
platform="discord",
user_id=user_id,
username=user_name,
@ -79,9 +82,9 @@ class DiscordBot(commands.Bot):
user_is_bot=is_bot
)
self.log(f"... UUI lookup complete", "DEBUG")
globals.log(f"... UUI lookup complete", "DEBUG")
user_data = lookup_user(db_conn=self.db_conn, log_func=self.log, identifier=user_id, identifier_type="discord_user_id")
user_data = lookup_user(db_conn=self.db_conn, identifier=user_id, identifier_type="discord_user_id")
user_uuid = user_data["UUID"] if user_data else "UNKNOWN"
if user_uuid:
# The "platform" can be e.g. "discord" or you can store the server name
@ -97,33 +100,33 @@ class DiscordBot(commands.Bot):
log_message(
db_conn=self.db_conn,
log_func=self.log,
user_uuid=user_uuid,
message_content=message.content or "",
platform=platform_str,
channel=channel_str,
attachments=attachments
attachments=attachments,
username=message.author.name
)
# PLACEHOLDER FOR FUTURE MESSAGE PROCESSING
except Exception as e:
self.log(f"... UUI lookup failed: {e}", "WARNING")
globals.log(f"... UUI lookup failed: {e}", "WARNING")
pass
try:
# Pass message contents to commands processing
await self.process_commands(message)
self.log(f"Command processing complete", "DEBUG")
globals.log(f"Command processing complete", "DEBUG")
except Exception as e:
self.log(f"Command processing failed: {e}", "ERROR")
globals.log(f"Command processing failed: {e}", "ERROR")
async def on_command(self, ctx):
"""Logs every command execution at DEBUG level."""
_cmd_args = str(ctx.message.content).split(" ")[1:]
channel_name = "Direct Message" if "Direct Message with" in str(ctx.channel) else ctx.channel
self.log(f"Command '{ctx.command}' (Discord) initiated by {ctx.author} in #{channel_name}", "DEBUG")
if len(_cmd_args) > 1: self.log(f"!{ctx.command} arguments: {_cmd_args}", "DEBUG")
globals.log(f"Command '{ctx.command}' (Discord) initiated by {ctx.author} in #{channel_name}", "DEBUG")
if len(_cmd_args) > 1: globals.log(f"!{ctx.command} arguments: {_cmd_args}", "DEBUG")
async def on_ready(self):
"""Runs when the bot successfully logs in."""
@ -131,21 +134,21 @@ class DiscordBot(commands.Bot):
try:
# Sync slash commands globally
#await self.tree.sync()
#self.log("Discord slash commands synced.")
#globals.log("Discord slash commands synced.")
primary_guild_int = int(self.config["discord_guilds"][0])
primary_guild = discord.Object(id=primary_guild_int)
await self.tree.sync(guild=primary_guild)
self.log(f"Discord slash commands force synced to guild: {primary_guild_int}")
globals.log(f"Discord slash commands force synced to guild: {primary_guild_int}")
except Exception as e:
self.log(f"Unable to sync Discord slash commands: {e}")
globals.log(f"Unable to sync Discord slash commands: {e}")
# Log successful bot startup
self.log(f"Discord bot is online as {self.user}")
log_bot_event(self.db_conn, self.log, "DISCORD_RECONNECTED", "Discord bot logged in.")
globals.log(f"Discord bot is online as {self.user}")
log_bot_event(self.db_conn, "DISCORD_RECONNECTED", "Discord bot logged in.")
async def on_disconnect(self):
self.log("Discord bot has lost connection!", "WARNING")
log_bot_event(self.db_conn, self.log, "DISCORD_DISCONNECTED", "Discord bot lost connection.")
globals.log("Discord bot has lost connection!", "WARNING")
log_bot_event(self.db_conn, "DISCORD_DISCONNECTED", "Discord bot lost connection.")
async def on_voice_state_update(self, member, before, after):
"""
@ -156,54 +159,53 @@ class DiscordBot(commands.Bot):
voice_channel = after.channel.name if after.channel else before.channel.name if before.channel else None
# Ensure user exists in the UUI system
user_uuid = modules.db.lookup_user(self.db_conn, self.log, identifier=discord_user_id, identifier_type="discord_user_id", target_identifier="UUID")
user_uuid = modules.db.lookup_user(self.db_conn, identifier=discord_user_id, identifier_type="discord_user_id", target_identifier="UUID")
if not user_uuid:
self.log(f"User {member.name} ({discord_user_id}) not found in 'users'. Attempting to add...", "WARNING")
globals.log(f"User {member.name} ({discord_user_id}) not found in 'users'. Attempting to add...", "WARNING")
modules.utility.track_user_activity(
db_conn=self.db_conn,
log_func=self.log,
platform="discord",
user_id=discord_user_id,
username=member.name,
display_name=member.display_name,
user_is_bot=member.bot
)
user_uuid= modules.db.lookup_user(self.db_conn, self.log, identifier=discord_user_id, identifier_type="discord_user_id", target_identifier="UUID")
user_uuid= modules.db.lookup_user(self.db_conn, identifier=discord_user_id, identifier_type="discord_user_id", target_identifier="UUID")
if not user_uuid:
self.log(f"ERROR: Failed to associate {member.name} ({discord_user_id}) with a UUID. Skipping activity log.", "ERROR")
globals.log(f"ERROR: Failed to associate {member.name} ({discord_user_id}) with a UUID. Skipping activity log.", "ERROR")
return # Prevent logging with invalid UUID
# Detect join and leave events
if before.channel is None and after.channel is not None:
modules.db.log_discord_activity(self.db_conn, self.log, guild_id, user_uuid, "JOIN", after.channel.name)
modules.db.log_discord_activity(self.db_conn, guild_id, user_uuid, "JOIN", after.channel.name)
elif before.channel is not None and after.channel is None:
modules.db.log_discord_activity(self.db_conn, self.log, guild_id, user_uuid, "LEAVE", before.channel.name)
modules.db.log_discord_activity(self.db_conn, guild_id, user_uuid, "LEAVE", before.channel.name)
# Detect VC moves (self/moved)
if before.channel and after.channel and before.channel != after.channel:
move_detail = f"{before.channel.name} -> {after.channel.name}"
modules.db.log_discord_activity(self.db_conn, self.log, guild_id, user_uuid, "VC_MOVE", after.channel.name, move_detail)
modules.db.log_discord_activity(self.db_conn, guild_id, user_uuid, "VC_MOVE", after.channel.name, move_detail)
# Detect mute/unmute
if before.self_mute != after.self_mute:
mute_action = "MUTE" if after.self_mute else "UNMUTE"
modules.db.log_discord_activity(self.db_conn, self.log, guild_id, user_uuid, mute_action, voice_channel)
modules.db.log_discord_activity(self.db_conn, guild_id, user_uuid, mute_action, voice_channel)
# Detect deafen/undeafen
if before.self_deaf != after.self_deaf:
deaf_action = "DEAFEN" if after.self_deaf else "UNDEAFEN"
modules.db.log_discord_activity(self.db_conn, self.log, guild_id, user_uuid, deaf_action, voice_channel)
modules.db.log_discord_activity(self.db_conn, guild_id, user_uuid, deaf_action, voice_channel)
# Detect streaming
if before.self_stream != after.self_stream:
stream_action = "STREAM_START" if after.self_stream else "STREAM_STOP"
modules.db.log_discord_activity(self.db_conn, self.log, guild_id, user_uuid, stream_action, voice_channel)
modules.db.log_discord_activity(self.db_conn, guild_id, user_uuid, stream_action, voice_channel)
# Detect camera usage
if before.self_video != after.self_video:
camera_action = "CAMERA_ON" if after.self_video else "CAMERA_OFF"
modules.db.log_discord_activity(self.db_conn, self.log, guild_id, user_uuid, camera_action, voice_channel)
modules.db.log_discord_activity(self.db_conn, guild_id, user_uuid, camera_action, voice_channel)
async def on_presence_update(self, before, after):
@ -220,26 +222,24 @@ class DiscordBot(commands.Bot):
# Ensure user exists in the UUI system
user_uuid = modules.db.lookup_user(
self.db_conn,
self.log,
identifier=discord_user_id,
identifier_type="discord_user_id",
target_identifier="UUID"
)
if not user_uuid:
self.log(f"User {after.name} ({discord_user_id}) not found in 'users'. Attempting to add...", "WARNING")
globals.log(f"User {after.name} ({discord_user_id}) not found in 'users'. Attempting to add...", "WARNING")
modules.utility.track_user_activity(
db_conn=self.db_conn,
log_func=self.log,
platform="discord",
user_id=discord_user_id,
username=after.name,
display_name=after.display_name,
user_is_bot=after.bot
)
user_uuid = modules.db.lookup_user(self.db_conn, self.log, identifier=discord_user_id, identifier_type="discord_user_id", target_identifier="UUID")
user_uuid = modules.db.lookup_user(self.db_conn, identifier=discord_user_id, identifier_type="discord_user_id", target_identifier="UUID")
if not user_uuid:
self.log(f"ERROR: Failed to associate {after.name} ({discord_user_id}) with a UUID. Skipping activity log.", "ERROR")
globals.log(f"ERROR: Failed to associate {after.name} ({discord_user_id}) with a UUID. Skipping activity log.", "ERROR")
return
# Check all activities
@ -266,9 +266,9 @@ class DiscordBot(commands.Bot):
old_activity = ("STREAM_STOP", o_activity.game or "Sharing screen")
if new_activity:
modules.db.log_discord_activity(self.db_conn, self.log, guild_id, user_uuid, new_activity[0], None, new_activity[1])
modules.db.log_discord_activity(self.db_conn, guild_id, user_uuid, new_activity[0], None, new_activity[1])
if old_activity:
modules.db.log_discord_activity(self.db_conn, self.log, guild_id, user_uuid, old_activity[0], None, old_activity[1])
modules.db.log_discord_activity(self.db_conn, guild_id, user_uuid, old_activity[0], None, old_activity[1])
# async def start_account_linking(self, interaction: discord.Interaction):
# """Starts the linking process by generating a link code and displaying instructions."""
@ -331,4 +331,4 @@ class DiscordBot(commands.Bot):
try:
await super().start(token)
except Exception as e:
self.log(f"Discord bot error: {e}", "CRITICAL")
globals.log(f"Discord bot error: {e}", "CRITICAL")

View File

@ -6,17 +6,19 @@ from twitchio.ext import commands
import importlib
import cmd_twitch
import globals
import modules
import modules.utility
from modules.db import log_message, lookup_user, log_bot_event
class TwitchBot(commands.Bot):
def __init__(self, config, log_func):
def __init__(self, config):
self.client_id = os.getenv("TWITCH_CLIENT_ID")
self.client_secret = os.getenv("TWITCH_CLIENT_SECRET")
self.token = os.getenv("TWITCH_BOT_TOKEN")
self.refresh_token = os.getenv("TWITCH_REFRESH_TOKEN")
self.log = log_func # Use the logging function from bots.py
self.log = globals.log # Use the logging function from bots.py
self.config = config
self.db_conn = None # We'll set this
self.help_data = None # We'll set this later
@ -28,7 +30,7 @@ class TwitchBot(commands.Bot):
initial_channels=config["twitch_channels"]
)
self.log("Twitch bot initiated")
globals.log("Twitch bot initiated")
# 2) Then load commands
self.load_commands()
@ -53,8 +55,8 @@ class TwitchBot(commands.Bot):
_cmd = message.content[1:] # Remove the leading "!"
_cmd_args = _cmd.split(" ")[1:]
_cmd = _cmd.split(" ", 1)[0]
self.log(f"Command '{_cmd}' (Twitch) initiated by {message.author.name} in #{message.channel.name}", "DEBUG")
if len(_cmd_args) > 1: self.log(f"!{_cmd} arguments: {_cmd_args}", "DEBUG")
globals.log(f"Command '{_cmd}' (Twitch) initiated by {message.author.name} in #{message.channel.name}", "DEBUG")
if len(_cmd_args) > 1: globals.log(f"!{_cmd} arguments: {_cmd_args}", "DEBUG")
try:
# Typically message.author is not None for normal chat messages
@ -67,11 +69,10 @@ class TwitchBot(commands.Bot):
user_name = author.name
display_name = author.display_name or user_name
self.log(f"Message detected, attempting UUI lookup on {user_name} ...", "DEBUG")
globals.log(f"Message detected, attempting UUI lookup on {user_name} ...", "DEBUG")
modules.utility.track_user_activity(
db_conn=self.db_conn,
log_func=self.log,
platform="twitch",
user_id=user_id,
username=user_name,
@ -79,14 +80,13 @@ class TwitchBot(commands.Bot):
user_is_bot=is_bot
)
self.log("... UUI lookup complete.", "DEBUG")
globals.log("... UUI lookup complete.", "DEBUG")
user_data = lookup_user(db_conn=self.db_conn, log_func=self.log, identifier=str(message.author.id), identifier_type="twitch_user_id")
user_data = lookup_user(db_conn=self.db_conn, identifier=str(message.author.id), identifier_type="twitch_user_id")
user_uuid = user_data["UUID"] if user_data else "UNKNOWN"
from modules.db import log_message
log_message(
db_conn=self.db_conn,
log_func=self.log,
user_uuid=user_uuid,
message_content=message.content or "",
platform="twitch",
@ -95,26 +95,26 @@ class TwitchBot(commands.Bot):
)
except Exception as e:
self.log(f"... UUI lookup failed: {e}", "ERROR")
globals.log(f"... UUI lookup failed: {e}", "ERROR")
# Pass message contents to commands processing
await self.handle_commands(message)
async def event_ready(self):
self.log(f"Twitch bot is online as {self.nick}")
log_bot_event(self.db_conn, self.log, "TWITCH_RECONNECTED", "Twitch bot logged in.")
globals.log(f"Twitch bot is online as {self.nick}")
log_bot_event(self.db_conn, "TWITCH_RECONNECTED", "Twitch bot logged in.")
async def event_disconnected(self):
self.log("Twitch bot has lost connection!", "WARNING")
log_bot_event(self.db_conn, self.log, "TWITCH_DISCONNECTED", "Twitch bot lost connection.")
globals.log("Twitch bot has lost connection!", "WARNING")
log_bot_event(self.db_conn, "TWITCH_DISCONNECTED", "Twitch bot lost connection.")
async def refresh_access_token(self):
"""
Refreshes the Twitch access token using the stored refresh token.
Retries up to 3 times before logging a fatal error.
"""
self.log("Attempting to refresh Twitch token...")
globals.log("Attempting to refresh Twitch token...")
url = "https://id.twitch.tv/oauth2/token"
params = {
@ -137,7 +137,7 @@ class TwitchBot(commands.Bot):
os.environ["TWITCH_REFRESH_TOKEN"] = self.refresh_token
self.update_env_file()
self.log("Twitch token refreshed successfully. Restarting bot...")
globals.log("Twitch token refreshed successfully. Restarting bot...")
# Restart the TwitchIO connection
await self.close() # Close the old connection
@ -145,15 +145,15 @@ class TwitchBot(commands.Bot):
return # Exit function after successful refresh
else:
self.log(f"Twitch token refresh failed (Attempt {attempt+1}/3): {data}", "WARNING")
globals.log(f"Twitch token refresh failed (Attempt {attempt+1}/3): {data}", "WARNING")
except Exception as e:
self.log(f"Twitch token refresh error (Attempt {attempt+1}/3): {e}", "ERROR")
globals.log(f"Twitch token refresh error (Attempt {attempt+1}/3): {e}", "ERROR")
await asyncio.sleep(10) # Wait before retrying
# If all attempts fail, log error
self.log("Twitch token refresh failed after 3 attempts.", "FATAL")
globals.log("Twitch token refresh failed after 3 attempts.", "FATAL")
def update_env_file(self):
@ -173,10 +173,10 @@ class TwitchBot(commands.Bot):
else:
file.write(line)
self.log("Updated .env file with new Twitch token.")
globals.log("Updated .env file with new Twitch token.")
except Exception as e:
self.log(f"Failed to update .env file: {e}", "ERROR")
globals.log(f"Failed to update .env file: {e}", "ERROR")
def load_commands(self):
"""
@ -184,19 +184,18 @@ class TwitchBot(commands.Bot):
"""
try:
cmd_twitch.setup(self)
self.log("Twitch commands loaded successfully.")
globals.log("Twitch commands loaded successfully.")
# Now load the help info from dictionary/help_twitch.json
help_json_path = "dictionary/help_twitch.json"
modules.utility.initialize_help_data(
bot=self,
help_json_path=help_json_path,
is_discord=False, # Twitch
log_func=self.log
is_discord=False # Twitch
)
except Exception as e:
self.log(f"Error loading Twitch commands: {e}", "ERROR")
globals.log(f"Error loading Twitch commands: {e}", "ERROR")
async def run(self):
"""
@ -205,7 +204,7 @@ class TwitchBot(commands.Bot):
retries = 0
while True:
if retries > 3:
self.log(f"Twitch bot failed to connect after {retries} attempts.", "CIRITCAL")
globals.log(f"Twitch bot failed to connect after {retries} attempts.", "CIRITCAL")
break # Break loop if repeatedly failing to connect to Twitch
try:
await self.start()
@ -214,18 +213,18 @@ class TwitchBot(commands.Bot):
# await asyncio.sleep(10800) # Refresh every 3 hours
except Exception as e:
retries += 1
self.log(f"Twitch bot failed to start: {e}", "CRITICAL")
globals.log(f"Twitch bot failed to start: {e}", "CRITICAL")
if "Invalid or unauthorized Access Token passed." in str(e):
try:
await self.refresh_access_token()
self.log("Retrying bot connection after token refresh...", "INFO")
globals.log("Retrying bot connection after token refresh...", "INFO")
await self.start() # Restart connection with new token
return # Exit retry loop
except Exception as e:
self.log(f"Unable to refresh Twitch token! Twitch bot will be offline!", "CRITICAL")
globals.log(f"Unable to refresh Twitch token! Twitch bot will be offline!", "CRITICAL")
if self._keeper:
self._keeper.cancel()
if "'NoneType' object has no attribute 'cancel'" in str(e):
self.log(f"The Twitch bot experienced an initialization glitch. Try starting again", "FATAL")
globals.log(f"The Twitch bot experienced an initialization glitch. Try starting again", "FATAL")
await asyncio.sleep(5) # Wait before retrying to authenticate

131
bots.py
View File

@ -22,82 +22,12 @@ from modules import db, utility
# Load environment variables
load_dotenv()
# Clear previous current-run logfile
globals.reset_curlogfile()
# Load bot configuration
config_data = globals.load_config_file()
# Initiate logfile
logfile_path = config_data["logging"]["logfile_path"]
logfile = open(logfile_path, "a")
cur_logfile_path = f"cur_{logfile_path}"
cur_logfile = open(cur_logfile_path, "w")
if not config_data["logging"]["terminal"]["log_to_terminal"] and not config_data["logging"]["file"]["log_to_file"]:
print(f"!!! WARNING !!! CONSOLE AND LOGFILE OUTPUT DISABLED !!!\n!!! NO LOGS WILL BE PROVIDED !!!")
###############################
# Simple Logging System
###############################
def log(message, level="INFO", exec_info=False):
"""
A simple logging function with adjustable log levels.
Logs messages in a structured format.
Available levels:\n
DEBUG = Information useful for debugging\n
INFO = Informational messages\n
WARNING = Something happened that may lead to issues\n
ERROR = A non-critical error has happened\n
CRITICAL = A critical, but non-fatal, error\n
FATAL = Fatal error. Program exits after logging this\n\n
See 'config.json' for disabling/enabling logging levels
"""
from modules import utility
log_levels = {"DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL", "FATAL"}
if level not in log_levels:
level = "INFO" # Default to INFO if an invalid level is provided
if level in config_data["logging"]["log_levels"] or level == "FATAL":
elapsed = time.time() - globals.get_bot_start_time()
uptime_str, _ = utility.format_uptime(elapsed)
timestamp = time.strftime('%Y-%m-%d %H:%M:%S')
log_message = f"[{timestamp} - {uptime_str}] [{level}] {message}"
# Include traceback for certain error levels
if exec_info or level in ["CRITICAL", "FATAL"]:
log_message += f"\n{traceback.format_exc()}"
# Print to terminal if enabled
# 'FATAL' errors override settings
# Checks config file to see enabled/disabled logging levels
if config_data["logging"]["terminal"]["log_to_terminal"] or level == "FATAL":
config_level_format = f"log_{level.lower()}"
if config_data["logging"]["terminal"][config_level_format] or level == "FATAL":
print(log_message)
# Write to file if enabled
# 'FATAL' errors override settings
# Checks config file to see enabled/disabled logging levels
if config_data["logging"]["file"]["log_to_file"] or level == "FATAL":
config_level_format = f"log_{level.lower()}"
if config_data["logging"]["file"][config_level_format] or level == "FATAL":
try:
lf = config_data["logging"]["logfile_path"]
clf = f"cur_{lf}"
with open(lf, "a", encoding="utf-8") as logfile: # Write to permanent logfile
logfile.write(f"{log_message}\n")
logfile.flush() # Ensure it gets written immediately
with open(clf, "a", encoding="utf-8") as c_logfile: # Write to this-run logfile
c_logfile.write(f"{log_message}\n")
c_logfile.flush() # Ensure it gets written immediately
except Exception as e:
print(f"[WARNING] Failed to write to logfile: {e}")
# Handle fatal errors with shutdown
if level == "FATAL":
print(f"!!! FATAL ERROR LOGGED, SHUTTING DOWN !!!")
sys.exit(1)
###############################
# Main Event Loop
###############################
@ -106,57 +36,58 @@ async def main():
global discord_bot, twitch_bot, db_conn
# Log initial start
log("--------------- BOT STARTUP ---------------")
globals.log("--------------- BOT STARTUP ---------------")
# Before creating your DiscordBot/TwitchBot, initialize DB
try:
db_conn = db.init_db_connection(config_data, log)
db_conn = db.init_db_connection(config_data)
if not db_conn:
# If we get None, it means FATAL. We might sys.exit(1) or handle it differently.
log("Terminating bot due to no DB connection.", "FATAL")
globals.log("Terminating bot due to no DB connection.", "FATAL")
sys.exit(1)
except Exception as e:
log(f"Unable to initialize database!: {e}", "FATAL")
globals.log(f"Unable to initialize database!: {e}", "FATAL")
try: # Ensure FKs are enabled
db.checkenable_db_fk(db_conn, log)
db.checkenable_db_fk(db_conn)
except Exception as e:
log(f"Unable to ensure Foreign keys are enabled: {e}", "WARNING")
globals.log(f"Unable to ensure Foreign keys are enabled: {e}", "WARNING")
# auto-create the quotes table if it doesn't exist
tables = {
"Bot events table": partial(db.ensure_bot_events_table, db_conn, log),
"Quotes table": partial(db.ensure_quotes_table, db_conn, log),
"Users table": partial(db.ensure_users_table, db_conn, log),
"Chatlog table": partial(db.ensure_chatlog_table, db_conn, log),
"Howls table": partial(db.ensure_userhowls_table, db_conn, log),
"Discord activity table": partial(db.ensure_discord_activity_table, db_conn, log),
"Account linking table": partial(db.ensure_link_codes_table, db_conn, log)
"Bot events table": partial(db.ensure_bot_events_table, db_conn),
"Quotes table": partial(db.ensure_quotes_table, db_conn),
"Users table": partial(db.ensure_users_table, db_conn),
"Chatlog table": partial(db.ensure_chatlog_table, db_conn),
"Howls table": partial(db.ensure_userhowls_table, db_conn),
"Discord activity table": partial(db.ensure_discord_activity_table, db_conn),
"Account linking table": partial(db.ensure_link_codes_table, db_conn)
}
try:
for table, func in tables.items():
func() # Call the function with db_conn and log already provided
log(f"{table} ensured.", "DEBUG")
globals.log(f"{table} ensured.", "DEBUG")
except Exception as e:
log(f"Unable to ensure DB tables exist: {e}", "FATAL")
globals.log(f"Unable to ensure DB tables exist: {e}", "FATAL")
log("Initializing bots...")
globals.log("Initializing bots...")
# Create both bots
discord_bot = DiscordBot(config_data, log)
twitch_bot = TwitchBot(config_data, log)
discord_bot = DiscordBot(config_data)
twitch_bot = TwitchBot(config_data)
# Log startup
utility.log_bot_startup(db_conn, log)
utility.log_bot_startup(db_conn)
# Provide DB connection to both bots
try:
discord_bot.set_db_connection(db_conn)
twitch_bot.set_db_connection(db_conn)
log(f"Initialized database connection to both bots")
globals.log(f"Initialized database connection to both bots")
except Exception as e:
log(f"Unable to initialize database connection to one or both bots: {e}", "FATAL")
globals.log(f"Unable to initialize database connection to one or both bots: {e}", "FATAL")
log("Starting Discord and Twitch bots...")
globals.log("Starting Discord and Twitch bots...")
discord_task = asyncio.create_task(discord_bot.run(os.getenv("DISCORD_BOT_TOKEN")))
twitch_task = asyncio.create_task(twitch_bot.run())
@ -164,8 +95,8 @@ async def main():
from modules.utility import dev_func
enable_dev_func = False
if enable_dev_func:
dev_func_result = dev_func(db_conn, log, enable_dev_func)
log(f"dev_func output: {dev_func_result}")
dev_func_result = dev_func(db_conn, enable_dev_func)
globals.log(f"dev_func output: {dev_func_result}")
await asyncio.gather(discord_task, twitch_task)
@ -173,8 +104,8 @@ if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
utility.log_bot_shutdown(db_conn, log, intent="User Shutdown")
utility.log_bot_shutdown(db_conn, intent="User Shutdown")
except Exception as e:
error_trace = traceback.format_exc()
log(f"Fatal Error: {e}\n{error_trace}", "FATAL")
utility.log_bot_shutdown(db_conn, log)
globals.log(f"Fatal Error: {e}\n{error_trace}", "FATAL")
utility.log_bot_shutdown(db_conn)

View File

@ -79,7 +79,6 @@ def handle_howl_normal(ctx, platform, author_id, author_display_name) -> str:
Normal usage: random generation, store in DB.
"""
db_conn = ctx.bot.db_conn
log_func = ctx.bot.log
# random logic
howl_val = random.randint(0, 100)
@ -97,21 +96,20 @@ def handle_howl_normal(ctx, platform, author_id, author_display_name) -> str:
)
# find user in DB by ID
user_data = db.lookup_user(db_conn, log_func, identifier=author_id, identifier_type=platform)
user_data = db.lookup_user(db_conn, identifier=author_id, identifier_type=platform)
if user_data:
db.insert_howl(db_conn, log_func, user_data["UUID"], howl_val)
db.insert_howl(db_conn, user_data["UUID"], howl_val)
else:
log_func(f"Could not find user by ID={author_id} on {platform}. Not storing howl.", "WARNING")
globals.log(f"Could not find user by ID={author_id} on {platform}. Not storing howl.", "WARNING")
return reply
def handle_howl_stats(ctx, platform, target_name) -> str:
db_conn = ctx.bot.db_conn
log_func = ctx.bot.log
# Check if requesting global stats
if target_name in ("_COMMUNITY_", "all", "global", "community"):
stats = db.get_global_howl_stats(db_conn, log_func)
stats = db.get_global_howl_stats(db_conn)
if not stats:
return "No howls have been recorded yet!"
@ -128,11 +126,11 @@ def handle_howl_stats(ctx, platform, target_name) -> str:
f"0% Howls: {count_zero}, 100% Howls: {count_hundred}")
# Otherwise, lookup a single user
user_data = lookup_user_by_name(db_conn, log_func, platform, target_name)
user_data = lookup_user_by_name(db_conn, platform, target_name)
if not user_data:
return f"I don't know that user: {target_name}"
stats = db.get_howl_stats(db_conn, log_func, user_data["UUID"])
stats = db.get_howl_stats(db_conn, user_data["UUID"])
if not stats:
return f"{target_name} hasn't howled yet! (Try `!howl` to get started.)"
@ -144,25 +142,25 @@ def handle_howl_stats(ctx, platform, target_name) -> str:
f"(0% x{z}, 100% x{h})")
def lookup_user_by_name(db_conn, log_func, platform, name_str):
def lookup_user_by_name(db_conn, platform, name_str):
"""
Attempt to find a user by name on that platform, e.g. 'discord_username' or 'twitch_username'.
"""
# same logic as before
if platform == "discord":
ud = db.lookup_user(db_conn, log_func, name_str, "discord_user_display_name")
ud = db.lookup_user(db_conn, name_str, "discord_user_display_name")
if ud:
return ud
ud = db.lookup_user(db_conn, log_func, name_str, "discord_username")
ud = db.lookup_user(db_conn, name_str, "discord_username")
return ud
elif platform == "twitch":
ud = db.lookup_user(db_conn, log_func, name_str, "twitch_user_display_name")
ud = db.lookup_user(db_conn, name_str, "twitch_user_display_name")
if ud:
return ud
ud = db.lookup_user(db_conn, log_func, name_str, "twitch_username")
ud = db.lookup_user(db_conn, name_str, "twitch_username")
return ud
else:
log_func(f"Unknown platform {platform} in lookup_user_by_name", "WARNING")
globals.log(f"Unknown platform {platform} in lookup_user_by_name", "WARNING")
return None
@ -201,14 +199,14 @@ def greet(target_display_name: str, platform_name: str) -> str:
# Quotes
######################
def create_quotes_table(db_conn, log_func):
def create_quotes_table(db_conn):
"""
Creates the 'quotes' table if it does not exist, with the columns:
ID, QUOTE_TEXT, QUOTEE, QUOTE_CHANNEL, QUOTE_DATETIME, QUOTE_GAME, QUOTE_REMOVED
Uses a slightly different CREATE statement depending on MariaDB vs SQLite.
"""
if not db_conn:
log_func("No database connection available to create quotes table!", "FATAL")
globals.log("No database connection available to create quotes table!", "FATAL")
return
# Detect if this is SQLite or MariaDB
@ -241,14 +239,13 @@ def create_quotes_table(db_conn, log_func):
)
"""
db.run_db_operation(db_conn, "write", create_table_sql, log_func=log_func)
db.run_db_operation(db_conn, "write", create_table_sql)
async def handle_quote_command(db_conn, log_func, is_discord: bool, ctx, args, get_twitch_game_for_channel=None):
async def handle_quote_command(db_conn, is_discord: bool, ctx, args, get_twitch_game_for_channel=None):
"""
Core logic for !quote command, shared by both Discord and Twitch.
- `db_conn`: your active DB connection
- `log_func`: your log(...) function
- `is_discord`: True if this command is being called from Discord, False if from Twitch
- `ctx`: the context object (discord.py ctx or twitchio context)
- `args`: a list of arguments (e.g. ["add", "some quote text..."] or ["remove", "3"] or ["2"] etc.)
@ -266,7 +263,7 @@ async def handle_quote_command(db_conn, log_func, is_discord: bool, ctx, args, g
"""
# If no subcommand, treat as "random"
if len(args) == 0:
return await retrieve_random_quote(db_conn, log_func, is_discord, ctx)
return await retrieve_random_quote(db_conn, is_discord, ctx)
sub = args[0].lower()
@ -274,23 +271,23 @@ async def handle_quote_command(db_conn, log_func, is_discord: bool, ctx, args, g
# everything after "add" is the quote text
quote_text = " ".join(args[1:]).strip()
if not quote_text:
return await send_message(ctx, "Please provide the quote text after 'add'.")
await add_new_quote(db_conn, log_func, is_discord, ctx, quote_text, get_twitch_game_for_channel)
return "Please provide the quote text after 'add'."
return await add_new_quote(db_conn, is_discord, ctx, quote_text, get_twitch_game_for_channel)
elif sub == "remove":
if len(args) < 2:
return await send_message(ctx, "Please specify which quote ID to remove.")
await remove_quote(db_conn, log_func, is_discord, ctx, quote_id_str=args[1])
return "Please specify which quote ID to remove."
return await remove_quote(db_conn, is_discord, ctx, quote_id_str=args[1])
else:
# Possibly a quote ID
if sub.isdigit():
quote_id = int(sub)
await retrieve_specific_quote(db_conn, log_func, ctx, quote_id, is_discord)
return await retrieve_specific_quote(db_conn, ctx, quote_id, is_discord)
else:
# unrecognized subcommand => fallback to random
await retrieve_random_quote(db_conn, log_func, is_discord, ctx)
return await retrieve_random_quote(db_conn, is_discord, ctx)
async def add_new_quote(db_conn, log_func, is_discord, ctx, quote_text, get_twitch_game_for_channel):
async def add_new_quote(db_conn, is_discord, ctx, quote_text, get_twitch_game_for_channel):
"""
Inserts a new quote with UUID instead of username.
"""
@ -298,11 +295,10 @@ async def add_new_quote(db_conn, log_func, is_discord, ctx, quote_text, get_twit
platform = "discord" if is_discord else "twitch"
# Lookup UUID from users table
user_data = db.lookup_user(db_conn, log_func, identifier=user_id, identifier_type=f"{platform}_user_id")
user_data = db.lookup_user(db_conn, identifier=user_id, identifier_type=f"{platform}_user_id")
if not user_data:
log_func(f"ERROR: Could not find UUID for user {ctx.author.name} ({user_id}) on {platform}. Quote not saved.", "ERROR")
await ctx.send("Could not save quote. Your user data is missing from the system.")
return
globals.log(f"ERROR: Could not find UUID for user {ctx.author.name} ({user_id}) on {platform}. Quote not saved.", "ERROR")
return "Could not save quote. Your user data is missing from the system."
user_uuid = user_data["UUID"]
channel_name = "Discord" if is_discord else ctx.channel.name
@ -317,29 +313,28 @@ async def add_new_quote(db_conn, log_func, is_discord, ctx, quote_text, get_twit
"""
params = (quote_text, user_uuid, channel_name, game_name)
result = db.run_db_operation(db_conn, "write", insert_sql, params, log_func=log_func)
result = db.run_db_operation(db_conn, "write", insert_sql, params)
if result is not None:
await ctx.send("Quote added successfully!")
return "Quote added successfully!"
else:
await ctx.send("Failed to add quote.")
return "Failed to add quote."
async def remove_quote(db_conn, log_func, is_discord: bool, ctx, quote_id_str):
async def remove_quote(db_conn, is_discord: bool, ctx, quote_id_str):
"""
Mark quote #ID as removed (QUOTE_REMOVED=1).
"""
if not quote_id_str.isdigit():
return await send_message(ctx, f"'{quote_id_str}' is not a valid quote ID.")
return f"'{quote_id_str}' is not a valid quote ID."
user_id = str(ctx.author.id)
platform = "discord" if is_discord else "twitch"
# Lookup UUID from users table
user_data = db.lookup_user(db_conn, log_func, identifier=user_id, identifier_type=f"{platform}_user_id")
user_data = db.lookup_user(db_conn, identifier=user_id, identifier_type=f"{platform}_user_id")
if not user_data:
log_func(f"ERROR: Could not find UUID for user {ctx.author.name} ({user_id}) on {platform}. Quote not removed.", "ERROR")
await ctx.send("Could not remove quote. Your user data is missing from the system.")
return
globals.log(f"ERROR: Could not find UUID for user {ctx.author.name} ({user_id}) on {platform}. Quote not removed.", "ERROR")
return "Could not remove quote. Your user data is missing from the system."
user_uuid = user_data["UUID"]
@ -355,24 +350,24 @@ async def remove_quote(db_conn, log_func, is_discord: bool, ctx, quote_id_str):
AND QUOTE_REMOVED = 0
"""
params = (remover_user, quote_id)
rowcount = db.run_db_operation(db_conn, "update", update_sql, params, log_func=log_func)
rowcount = db.run_db_operation(db_conn, "update", update_sql, params)
if rowcount and rowcount > 0:
await send_message(ctx, f"Removed quote #{quote_id}.")
return f"Removed quote #{quote_id}."
else:
await send_message(ctx, "Could not remove that quote (maybe it's already removed or doesn't exist).")
return "Could not remove that quote (maybe it's already removed or doesn't exist)."
async def retrieve_specific_quote(db_conn, log_func, ctx, quote_id, is_discord):
async def retrieve_specific_quote(db_conn, ctx, quote_id, is_discord):
"""
Retrieve a specific quote by ID, if not removed.
If not found, or removed, inform user of the valid ID range (1 - {max_id})
If no quotes exist at all, say "No quotes are created yet."
"""
# First, see if we have any quotes at all
max_id = get_max_quote_id(db_conn, log_func)
max_id = get_max_quote_id(db_conn)
if max_id < 1:
return await send_message(ctx, "No quotes are created yet.")
return "No quotes are created yet."
# Query for that specific quote
select_sql = """
@ -388,11 +383,11 @@ async def retrieve_specific_quote(db_conn, log_func, ctx, quote_id, is_discord):
FROM quotes
WHERE ID = ?
"""
rows = db.run_db_operation(db_conn, "read", select_sql, (quote_id,), log_func=log_func)
rows = db.run_db_operation(db_conn, "read", select_sql, (quote_id,))
if not rows:
# no match
return await send_message(ctx, f"I couldn't find that quote (1-{max_id}).")
return f"I couldn't find that quote (1-{max_id})."
row = rows[0]
quote_number = row[0]
@ -407,30 +402,30 @@ async def retrieve_specific_quote(db_conn, log_func, ctx, quote_id, is_discord):
platform = "discord" if is_discord else "twitch"
# Lookup UUID from users table
user_data = db.lookup_user(db_conn, log_func, identifier=quotee, identifier_type="UUID")
user_data = db.lookup_user(db_conn, identifier=quotee, identifier_type="UUID")
if not user_data:
log_func(f"ERROR: Could not find platform name for remover UUID {quote_removed_by} on UUI. Default to 'Unknown'", "ERROR")
globals.log(f"ERROR: Could not find platform name for remover UUID {quote_removed_by} on UUI. Default to 'Unknown'", "ERROR")
quote_removed_by = "Unknown"
else:
quote_removed_by = user_data[f"{platform}_user_display_name"]
if quote_removed == 1:
# It's removed
await send_message(ctx, f"Quote {quote_number}: [REMOVED by {quote_removed_by}]")
return f"Quote {quote_number}: [REMOVED by {quote_removed_by}]"
else:
# It's not removed
await send_message(ctx, f"Quote {quote_number}: {quote_text}")
return f"Quote {quote_number}: {quote_text}"
async def retrieve_random_quote(db_conn, log_func, is_discord, ctx):
async def retrieve_random_quote(db_conn, is_discord, ctx):
"""
Grab a random quote (QUOTE_REMOVED=0).
If no quotes exist or all removed, respond with "No quotes are created yet."
"""
# First check if we have any quotes
max_id = get_max_quote_id(db_conn, log_func)
max_id = get_max_quote_id(db_conn)
if max_id < 1:
return await send_message(ctx, "No quotes are created yet.")
return "No quotes are created yet."
# We have quotes, try selecting a random one from the not-removed set
if is_sqlite(db_conn):
@ -451,20 +446,20 @@ async def retrieve_random_quote(db_conn, log_func, is_discord, ctx):
LIMIT 1
"""
rows = db.run_db_operation(db_conn, "read", random_sql, log_func=log_func)
rows = db.run_db_operation(db_conn, "read", random_sql)
if not rows:
return await send_message(ctx, "No quotes are created yet.")
return "No quotes are created yet."
quote_number, quote_text = rows[0]
await send_message(ctx, f"Quote {quote_number}: {quote_text}")
await f"Quote {quote_number}: {quote_text}"
def get_max_quote_id(db_conn, log_func):
def get_max_quote_id(db_conn):
"""
Return the highest ID in the quotes table, or 0 if empty.
"""
sql = "SELECT MAX(ID) FROM quotes"
rows = db.run_db_operation(db_conn, "read", sql, log_func=log_func)
rows = db.run_db_operation(db_conn, "read", sql)
if rows and rows[0] and rows[0][0] is not None:
return rows[0][0]
return 0

View File

@ -1,15 +1,15 @@
# cmd_discord.py
import discord
from discord.ext import commands
from discord import app_commands
from typing import Optional
from cmd_common import common_commands as cc
from modules.permissions import has_permission
from modules.utility import handle_help_command
from modules.utility import monitor_cmds
import globals
def setup(bot, db_conn=None, log=None):
def setup(bot, db_conn=None):
"""
Attach commands to the Discord bot, store references to db/log.
"""
@ -17,7 +17,6 @@ def setup(bot, db_conn=None, log=None):
config_data = globals.load_config_file()
@bot.command()
@monitor_cmds(bot.log)
@commands.is_owner()
async def sync_commands(ctx):
"""
@ -32,11 +31,11 @@ def setup(bot, db_conn=None, log=None):
reply = "... Commands synced!"
except Exception as e:
reply = f"... Commands failed to sync! Error message:\n{e}"
globals.log(f"'sync_commands' failed to sync command tree\n{e}", "ERROR")
else:
reply = "You're not the registered owner of me!"
await ctx.send(reply)
@monitor_cmds(bot.log)
@bot.hybrid_command(name="available", description="List commands available to you")
async def available(ctx):
available_cmds = []
@ -50,37 +49,38 @@ def setup(bot, db_conn=None, log=None):
pass
except Exception as e:
# In case some commands fail unexpectedly during checks.
bot.log(f"Error checking command {command.name}: {e}", "ERROR")
globals.log(f"Error checking command {command.name}: {e}", "ERROR")
if available_cmds:
await ctx.send("Available commands: " + ", ".join(sorted(available_cmds)))
else:
await ctx.send("No commands are available to you at this time.")
@monitor_cmds(bot.log)
@bot.hybrid_command(name="help", description="Get information about commands")
async def cmd_help(ctx, *, cmd_name: str = ""):
@app_commands.describe(
command="The command to get help info about. Defaults to 'help'"
)
async def cmd_help(ctx: commands.Context, *, command: str = ""):
"""
e.g. !help
!help quote
"""
await handle_help_command(ctx, cmd_name, bot, is_discord=True, log_func=bot.log)
result = await handle_help_command(ctx, command, bot, is_discord=True)
await ctx.send(result)
@monitor_cmds(bot.log)
@bot.hybrid_command(name="greet", description="Make me greet you")
async def cmd_greet(ctx):
result = cc.greet(ctx.author.display_name, "Discord")
await ctx.send(result)
@monitor_cmds(bot.log)
@bot.hybrid_command(name="ping", description="Check my uptime")
async def cmd_ping(ctx):
result = cc.ping()
# Get heartbeat latency. Discord only
latency = round(float(bot.latency) * 1000)
result += f" (*latency: {latency}ms*)"
await ctx.send(result)
@monitor_cmds(bot.log)
@bot.hybrid_command(name="howl", description="Attempt a howl")
async def cmd_howl(ctx):
response = cc.handle_howl_command(ctx)
@ -108,7 +108,6 @@ def setup(bot, db_conn=None, log=None):
# except Exception as e:
# await ctx.send(f"Fallback reload failed: {e}")
@monitor_cmds(bot.log)
@bot.hybrid_command(name="hi", description="Dev command for testing permissions system")
async def cmd_hi(ctx):
user_id = str(ctx.author.id)
@ -169,7 +168,6 @@ def setup(bot, db_conn=None, log=None):
# get_twitch_game_for_channel=None # None for Discord
# )
@monitor_cmds(bot.log)
@bot.hybrid_group(name="quote", description="Interact with the quotes system", with_app_command=True)
async def cmd_quote(ctx, *, id: Optional[int] = None):
"""
@ -187,16 +185,20 @@ def setup(bot, db_conn=None, log=None):
else:
args = id.split() # Split query into arguments
await cc.handle_quote_command(
globals.log(f"'quote' command initiated with arguments: {args}", "DEBUG")
result = await cc.handle_quote_command(
db_conn=bot.db_conn,
log_func=bot.log,
is_discord=True,
ctx=ctx,
args=args,
get_twitch_game_for_channel=None
)
@monitor_cmds(bot.log)
globals.log(f"'quote' result: {result}")
await ctx.send(result)
@cmd_quote.command(name="add", description="Add a quote")
async def cmd_quote_add(ctx, *, text: str):
"""
@ -213,16 +215,16 @@ def setup(bot, db_conn=None, log=None):
args = ["add", text] # Properly format arguments
await cc.handle_quote_command(
result = await cc.handle_quote_command(
db_conn=bot.db_conn,
log_func=bot.log,
is_discord=True,
ctx=ctx,
args=args,
get_twitch_game_for_channel=None
)
@monitor_cmds(bot.log)
await ctx.send(result)
@cmd_quote.command(name="remove", description="Remove a quote by number")
async def cmd_quote_remove(ctx, id: int):
"""
@ -235,21 +237,22 @@ def setup(bot, db_conn=None, log=None):
args = ["remove", str(id)] # Properly pass the ID as an argument
await cc.handle_quote_command(
result = await cc.handle_quote_command(
db_conn=bot.db_conn,
log_func=bot.log,
is_discord=True,
ctx=ctx,
args=args,
get_twitch_game_for_channel=None
)
await ctx.send(result)
######################
# The following log entry must be last in the file to verify commands loading as they should
######################
# Debug: Print that commands are being registered
try:
command_names = [cmd.name for cmd in bot.commands] # Extract command names
bot.log(f"Registering commands for Discord: {command_names}", "DEBUG")
globals.log(f"Registering commands for Discord: {command_names}", "DEBUG")
except Exception as e:
bot.log(f"An error occured while printing registered commands for Discord: {e}", "WARNING")
globals.log(f"An error occured while printing registered commands for Discord: {e}", "WARNING")

View File

@ -2,28 +2,25 @@
from twitchio.ext import commands
import globals
from cmd_common import common_commands as cc
from modules.permissions import has_permission
from modules.utility import handle_help_command
from modules.utility import monitor_cmds
def setup(bot, db_conn=None, log=None):
def setup(bot, db_conn=None):
"""
This function is called to load/attach commands to the `bot`.
We also attach the db_conn and log so the commands can use them.
"""
@bot.command(name="greet")
@monitor_cmds(bot.log)
async def cmd_greet(ctx):
result = cc.greet(ctx.author.display_name, "Twitch")
await ctx.send(result)
@bot.command(name="ping")
@monitor_cmds(bot.log)
async def cmd_ping(ctx):
result = cc.ping()
latency = round(float(bot.latency) * 1000)
result += f" (*latency: {latency}ms*)"
await ctx.send(result)
@bot.command(name="howl")
@ -32,7 +29,6 @@ def setup(bot, db_conn=None, log=None):
await ctx.send(response)
@bot.command(name="hi")
@monitor_cmds(bot.log)
async def cmd_hi(ctx):
user_id = str(ctx.author.id) # Twitch user ID
user_roles = [role.lower() for role in ctx.author.badges.keys()] # "roles" from Twitch badges
@ -78,7 +74,6 @@ def setup(bot, db_conn=None, log=None):
@bot.command(name="quote")
@monitor_cmds(bot.log)
async def cmd_quote(ctx: commands.Context):
if not bot.db_conn:
return await ctx.send("Database is unavailable, sorry.")
@ -90,27 +85,26 @@ def setup(bot, db_conn=None, log=None):
# Placeholder for your actual logic to fetch the current game
return "SomeGame"
await cc.handle_quote_command(
result = await cc.handle_quote_command(
db_conn=bot.db_conn,
log_func=bot.log,
is_discord=False,
ctx=ctx,
args=args,
get_twitch_game_for_channel=get_twitch_game_for_channel
)
await ctx.send(result)
@bot.command(name="help")
@monitor_cmds(bot.log)
async def cmd_help(ctx):
parts = ctx.message.content.strip().split()
cmd_name = parts[1] if len(parts) > 1 else None
await handle_help_command(ctx, cmd_name, bot, is_discord=False, log_func=bot.log)
await handle_help_command(ctx, cmd_name, bot, is_discord=False)
######################
# The following log entry must be last in the file to verify commands loading as they should
######################
# Debug: Print that commands are being registered
try:
bot.log(f"Registering commands for Twitch: {list(bot.commands.keys())}", "DEBUG")
globals.log(f"Registering commands for Twitch: {list(bot.commands.keys())}", "DEBUG")
except Exception as e:
bot.log(f"An error occured while printing registered commands for Twitch: {e}", "WARNING")
globals.log(f"An error occured while printing registered commands for Twitch: {e}", "WARNING")

View File

@ -1,6 +1,7 @@
import time
import json
import sys
import traceback
# Store the start time globally
_bot_start_time = time.time()
@ -20,4 +21,94 @@ def load_config_file():
sys.exit(1)
except json.JSONDecodeError as e:
print(f"Error parsing config.json: {e}")
sys.exit(1)
sys.exit(1)
###############################
# Simple Logging System
###############################
def log(message, level="INFO", exec_info=False):
"""
A simple logging function with adjustable log levels.
Logs messages in a structured format.
Available levels:\n
DEBUG = Information useful for debugging\n
INFO = Informational messages\n
WARNING = Something happened that may lead to issues\n
ERROR = A non-critical error has happened\n
CRITICAL = A critical, but non-fatal, error\n
FATAL = Fatal error. Program exits after logging this\n\n
See 'config.json' for disabling/enabling logging levels
"""
# Load configuration file
config_data = load_config_file()
# Initiate logfile
lfp = config_data["logging"]["logfile_path"] # Log File Path
clfp = f"cur_{lfp}" # Current Log File Path
if not config_data["logging"]["terminal"]["log_to_terminal"] and not config_data["logging"]["file"]["log_to_file"]:
print(f"!!! WARNING !!! CONSOLE AND LOGFILE OUTPUT DISABLED !!!\n!!! NO LOGS WILL BE PROVIDED !!!")
from modules import utility
log_levels = {"DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL", "FATAL"}
if level not in log_levels:
level = "INFO" # Default to INFO if an invalid level is provided
if level in config_data["logging"]["log_levels"] or level == "FATAL":
elapsed = time.time() - get_bot_start_time()
uptime_str, _ = utility.format_uptime(elapsed)
timestamp = time.strftime('%Y-%m-%d %H:%M:%S')
log_message = f"[{timestamp} - {uptime_str}] [{level}] {message}"
# Include traceback for certain error levels
if exec_info or level in ["CRITICAL", "FATAL"]:
log_message += f"\n{traceback.format_exc()}"
# Print to terminal if enabled
# 'FATAL' errors override settings
# Checks config file to see enabled/disabled logging levels
if config_data["logging"]["terminal"]["log_to_terminal"] or level == "FATAL":
config_level_format = f"log_{level.lower()}"
if config_data["logging"]["terminal"][config_level_format] or level == "FATAL":
print(log_message)
# Write to file if enabled
# 'FATAL' errors override settings
# Checks config file to see enabled/disabled logging levels
if config_data["logging"]["file"]["log_to_file"] or level == "FATAL":
config_level_format = f"log_{level.lower()}"
if config_data["logging"]["file"][config_level_format] or level == "FATAL":
try:
lfp = config_data["logging"]["logfile_path"]
clfp = f"cur_{lfp}"
with open(lfp, "a", encoding="utf-8") as logfile: # Write to permanent logfile
logfile.write(f"{log_message}\n")
logfile.flush() # Ensure it gets written immediately
with open(clfp, "a", encoding="utf-8") as c_logfile: # Write to this-run logfile
c_logfile.write(f"{log_message}\n")
c_logfile.flush() # Ensure it gets written immediately
except Exception as e:
print(f"[WARNING] Failed to write to logfile: {e}")
# Handle fatal errors with shutdown
if level == "FATAL":
print(f"!!! FATAL ERROR LOGGED, SHUTTING DOWN !!!")
sys.exit(1)
def reset_curlogfile():
# Load configuration file
config_data = load_config_file()
# Initiate logfile
lfp = config_data["logging"]["logfile_path"] # Log File Path
clfp = f"cur_{lfp}" # Current Log File Path
try:
open(clfp, "w")
#log(f"Current-run logfile cleared", "DEBUG")
except Exception as e:
#log(f"Failed to clear current-run logfile: {e}")
pass

View File

@ -4,12 +4,14 @@ import re
import time, datetime
import sqlite3
import globals
try:
import mariadb
except ImportError:
mariadb = None # We handle gracefully if 'mariadb' isn't installed.
def checkenable_db_fk(db_conn, log_func):
def checkenable_db_fk(db_conn):
"""
Attempt to enable foreign key checks where it is relevant
(i.e. in SQLite). For MariaDB/MySQL, nothing special is needed.
@ -22,15 +24,15 @@ def checkenable_db_fk(db_conn, log_func):
cursor.execute("PRAGMA foreign_keys = ON;")
cursor.close()
db_conn.commit()
log_func("Enabled foreign key support in SQLite (PRAGMA foreign_keys=ON).", "DEBUG")
globals.log("Enabled foreign key support in SQLite (PRAGMA foreign_keys=ON).", "DEBUG")
except Exception as e:
log_func(f"Failed to enable foreign key support in SQLite: {e}", "WARNING")
globals.log(f"Failed to enable foreign key support in SQLite: {e}", "WARNING")
else:
# For MariaDB/MySQL, they're typically enabled with InnoDB
log_func("Assuming DB is MariaDB/MySQL with FKs enabled", "DEBUG")
globals.log("Assuming DB is MariaDB/MySQL with FKs enabled", "DEBUG")
def init_db_connection(config, log):
def init_db_connection(config):
"""
Initializes a database connection based on config.json contents:
- If config says 'use_mariadb', tries connecting to MariaDB.
@ -62,37 +64,36 @@ def init_db_connection(config, log):
port=port
)
conn.autocommit = False # We'll manage commits manually
log(f"Database connection established using MariaDB (host={host}, db={dbname}).")
globals.log(f"Database connection established using MariaDB (host={host}, db={dbname}).")
return conn
except mariadb.Error as e:
log(f"Error connecting to MariaDB: {e}", "WARNING")
globals.log(f"Error connecting to MariaDB: {e}", "WARNING")
else:
log("MariaDB config incomplete. Falling back to SQLite...", "WARNING")
globals.log("MariaDB config incomplete. Falling back to SQLite...", "WARNING")
else:
if use_mariadb and mariadb is None:
log("mariadb module not installed but use_mariadb=True. Falling back to SQLite...", "WARNING")
globals.log("mariadb module not installed but use_mariadb=True. Falling back to SQLite...", "WARNING")
# Fallback to local SQLite
sqlite_path = db_settings.get("sqlite_path", "local_database.sqlite")
try:
conn = sqlite3.connect(sqlite_path)
log(f"Database connection established using local SQLite: {sqlite_path}")
globals.log(f"Database connection established using local SQLite: {sqlite_path}")
return conn
except sqlite3.Error as e:
log(f"Could not open local SQLite database '{sqlite_path}': {e}", "WARNING")
globals.log(f"Could not open local SQLite database '{sqlite_path}': {e}", "WARNING")
# If neither MariaDB nor SQLite connected, that's fatal for the bot
log("No valid database connection could be established! Exiting...", "FATAL")
globals.log("No valid database connection could be established! Exiting...", "FATAL")
return None
def run_db_operation(conn, operation, query, params=None, log_func=None):
def run_db_operation(conn, operation, query, params=None):
"""
Executes a parameterized query with basic screening for injection attempts:
- 'operation' can be "read", "write", "update", "delete", "lookup", etc.
- 'query' is the SQL statement, with placeholders (? in SQLite or %s in MariaDB both work).
- 'params' is a tuple/list of parameters for the query (preferred for security).
- 'log_func' is the logging function (message, level).
1) We do a minimal check for suspicious patterns, e.g. multiple statements or known bad keywords.
2) We execute the query with parameters, and commit on write/update/delete.
@ -103,8 +104,8 @@ def run_db_operation(conn, operation, query, params=None, log_func=None):
- Always use parameterized queries wherever possible to avoid injection.
"""
if conn is None:
if log_func:
log_func("run_db_operation called but no valid DB connection!", "FATAL")
if globals.log:
globals.log("run_db_operation called but no valid DB connection!", "FATAL")
return None
if params is None:
@ -116,18 +117,18 @@ def run_db_operation(conn, operation, query, params=None, log_func=None):
# Check for multiple statements separated by semicolons (beyond the last one)
if lowered.count(";") > 1:
if log_func:
log_func("Query blocked: multiple SQL statements detected.", "WARNING")
log_func(f"Offending query: {query}", "WARNING")
if globals.log:
globals.log("Query blocked: multiple SQL statements detected.", "WARNING")
globals.log(f"Offending query: {query}", "WARNING")
return None
# Potentially dangerous SQL keywords
forbidden_keywords = ["drop table", "union select", "exec ", "benchmark(", "sleep("]
for kw in forbidden_keywords:
if kw in lowered:
if log_func:
log_func(f"Query blocked due to forbidden keyword: '{kw}'", "WARNING")
log_func(f"Offending query: {query}", "WARNING")
if globals.log:
globals.log(f"Query blocked due to forbidden keyword: '{kw}'", "WARNING")
globals.log(f"Offending query: {query}", "WARNING")
return None
cursor = conn.cursor()
@ -138,8 +139,8 @@ def run_db_operation(conn, operation, query, params=None, log_func=None):
write_ops = ("write", "insert", "update", "delete", "change")
if operation.lower() in write_ops:
conn.commit()
if log_func:
log_func(f"DB operation '{operation}' committed.", "DEBUG")
if globals.log:
globals.log(f"DB operation '{operation}' committed.", "DEBUG")
# If it's read/lookup, fetch results
read_ops = ("read", "lookup", "select")
@ -151,8 +152,8 @@ def run_db_operation(conn, operation, query, params=None, log_func=None):
except Exception as e:
# Rollback on any error
conn.rollback()
if log_func:
log_func(f"Error during '{operation}' query execution: {e}", "ERROR")
if globals.log:
globals.log(f"Error during '{operation}' query execution: {e}", "ERROR")
return None
finally:
cursor.close()
@ -161,7 +162,7 @@ def run_db_operation(conn, operation, query, params=None, log_func=None):
# Ensure quotes table exists
#######################
def ensure_quotes_table(db_conn, log_func):
def ensure_quotes_table(db_conn):
"""
Checks if 'quotes' table exists. If not, attempts to create it.
Raises an Exception or logs errors if creation fails.
@ -189,14 +190,14 @@ def ensure_quotes_table(db_conn, log_func):
"""
from modules.db import run_db_operation
rows = run_db_operation(db_conn, "read", check_sql, log_func=log_func)
rows = run_db_operation(db_conn, "read", check_sql)
if rows and rows[0] and rows[0][0]:
# The table 'quotes' already exists
log_func("Table 'quotes' already exists, skipping creation.", "DEBUG")
globals.log("Table 'quotes' already exists, skipping creation.", "DEBUG")
return # We can just return
# 3) Table does NOT exist => create it
log_func("Table 'quotes' does not exist; creating now...")
globals.log("Table 'quotes' does not exist; creating now...")
if is_sqlite:
create_table_sql = """
@ -229,20 +230,20 @@ def ensure_quotes_table(db_conn, log_func):
)
"""
result = run_db_operation(db_conn, "write", create_table_sql, log_func=log_func)
result = run_db_operation(db_conn, "write", create_table_sql)
if result is None:
# If run_db_operation returns None on error, handle or raise:
error_msg = "Failed to create 'quotes' table!"
log_func(error_msg, "CRITICAL")
globals.log(error_msg, "CRITICAL")
raise RuntimeError(error_msg)
log_func("Successfully created table 'quotes'.")
globals.log("Successfully created table 'quotes'.")
#######################
# Ensure 'users' table
#######################
def ensure_users_table(db_conn, log_func):
def ensure_users_table(db_conn):
"""
Checks if 'users' table exists. If not, creates it.
@ -274,13 +275,13 @@ def ensure_users_table(db_conn, log_func):
AND table_schema = DATABASE()
"""
rows = run_db_operation(db_conn, "read", check_sql, log_func=log_func)
rows = run_db_operation(db_conn, "read", check_sql)
if rows and rows[0] and rows[0][0]:
log_func("Table 'users' already exists, skipping creation.", "DEBUG")
globals.log("Table 'users' already exists, skipping creation.", "DEBUG")
return
# 2) Table does NOT exist => create it
log_func("Table 'users' does not exist; creating now...")
globals.log("Table 'users' does not exist; creating now...")
if is_sqlite:
create_table_sql = """
@ -313,20 +314,20 @@ def ensure_users_table(db_conn, log_func):
)
"""
result = run_db_operation(db_conn, "write", create_table_sql, log_func=log_func)
result = run_db_operation(db_conn, "write", create_table_sql)
if result is None:
error_msg = "Failed to create 'users' table!"
log_func(error_msg, "CRITICAL")
globals.log(error_msg, "CRITICAL")
raise RuntimeError(error_msg)
log_func("Successfully created table 'users'.")
globals.log("Successfully created table 'users'.")
########################
# Lookup user function
########################
def lookup_user(db_conn, log_func, identifier: str, identifier_type: str, target_identifier: str = None):
def lookup_user(db_conn, identifier: str, identifier_type: str, target_identifier: str = None):
"""
Looks up a user in the 'users' table based on the given identifier_type.
@ -370,8 +371,8 @@ def lookup_user(db_conn, log_func, identifier: str, identifier_type: str, target
# Ensure the provided identifier_type is acceptable.
if identifier_type.lower() not in valid_cols:
if log_func:
log_func(f"lookup_user error: invalid identifier_type '{identifier_type}'", "WARNING")
if globals.log:
globals.log(f"lookup_user error: invalid identifier_type '{identifier_type}'", "WARNING")
return None
# Convert shorthand identifier types to their full column names.
@ -383,8 +384,8 @@ def lookup_user(db_conn, log_func, identifier: str, identifier_type: str, target
# If a target_identifier is provided, validate that too.
if target_identifier is not None:
if target_identifier.lower() not in valid_cols:
if log_func:
log_func(f"lookup_user error: invalid target_identifier '{target_identifier}'", "WARNING")
if globals.log:
globals.log(f"lookup_user error: invalid target_identifier '{target_identifier}'", "WARNING")
return None
# Build the query using the (now validated) identifier_type.
@ -406,10 +407,10 @@ def lookup_user(db_conn, log_func, identifier: str, identifier_type: str, target
"""
# Execute the database operation. Adjust run_db_operation() as needed.
rows = run_db_operation(db_conn, "read", query, params=(identifier,), log_func=log_func)
rows = run_db_operation(db_conn, "read", query, params=(identifier,))
if not rows:
if log_func:
log_func(f"lookup_user: No user found for {identifier_type}='{identifier}'", "DEBUG")
if globals.log:
globals.log(f"lookup_user: No user found for {identifier_type}='{identifier}'", "DEBUG")
return None
# Since we have a single row, convert it to a dictionary.
@ -444,15 +445,15 @@ def lookup_user(db_conn, log_func, identifier: str, identifier_type: str, target
if target_identifier in user_data:
return user_data[target_identifier]
else:
if log_func:
log_func(f"lookup_user error: target_identifier '{target_identifier}' not present in user data", "WARNING")
if globals.log:
globals.log(f"lookup_user error: target_identifier '{target_identifier}' not present in user data", "WARNING")
return None
# Otherwise, return the full user record.
return user_data
def ensure_chatlog_table(db_conn, log_func):
def ensure_chatlog_table(db_conn):
"""
Checks if 'chat_log' table exists. If not, creates it.
@ -487,13 +488,13 @@ def ensure_chatlog_table(db_conn, log_func):
AND table_schema = DATABASE()
"""
rows = run_db_operation(db_conn, "read", check_sql, log_func=log_func)
rows = run_db_operation(db_conn, "read", check_sql)
if rows and rows[0] and rows[0][0]:
log_func("Table 'chat_log' already exists, skipping creation.", "DEBUG")
globals.log("Table 'chat_log' already exists, skipping creation.", "DEBUG")
return
# 2) Table doesn't exist => create it
log_func("Table 'chat_log' does not exist; creating now...")
globals.log("Table 'chat_log' does not exist; creating now...")
if is_sqlite:
create_sql = """
@ -522,16 +523,16 @@ def ensure_chatlog_table(db_conn, log_func):
)
"""
result = run_db_operation(db_conn, "write", create_sql, log_func=log_func)
result = run_db_operation(db_conn, "write", create_sql)
if result is None:
error_msg = "Failed to create 'chat_log' table!"
log_func(error_msg, "CRITICAL")
globals.log(error_msg, "CRITICAL")
raise RuntimeError(error_msg)
log_func("Successfully created table 'chat_log'.", "INFO")
globals.log("Successfully created table 'chat_log'.", "INFO")
def log_message(db_conn, log_func, user_uuid, message_content, platform, channel, attachments=None):
def log_message(db_conn, user_uuid, message_content, platform, channel, attachments=None, username: str = "Unknown"):
"""
Inserts a row into 'chat_log' with the given fields.
user_uuid: The user's UUID from the 'users' table (string).
@ -557,15 +558,15 @@ def log_message(db_conn, log_func, user_uuid, message_content, platform, channel
VALUES (?, ?, ?, ?, ?)
"""
params = (user_uuid, message_content, platform, channel, attachments)
rowcount = run_db_operation(db_conn, "write", insert_sql, params, log_func=log_func)
rowcount = run_db_operation(db_conn, "write", insert_sql, params)
if rowcount and rowcount > 0:
log_func(f"Logged message for UUID={user_uuid} in 'chat_log'.", "DEBUG")
globals.log(f"Logged message for UUID={user_uuid} ({username}) in 'chat_log'.", "DEBUG")
else:
log_func("Failed to log message in 'chat_log'.", "ERROR")
globals.log("Failed to log message in 'chat_log'.", "ERROR")
def ensure_userhowls_table(db_conn, log_func):
def ensure_userhowls_table(db_conn):
"""
Checks if 'user_howls' table exists; if not, creates it:
ID (PK) | UUID (FK -> users.UUID) | HOWL (int) | DATETIME (auto timestamp)
@ -588,12 +589,12 @@ def ensure_userhowls_table(db_conn, log_func):
AND table_schema = DATABASE()
"""
rows = run_db_operation(db_conn, "read", check_sql, log_func=log_func)
rows = run_db_operation(db_conn, "read", check_sql)
if rows and rows[0] and rows[0][0]:
log_func("Table 'user_howls' already exists, skipping creation.", "DEBUG")
globals.log("Table 'user_howls' already exists, skipping creation.", "DEBUG")
return
log_func("Table 'user_howls' does not exist; creating now...", "INFO")
globals.log("Table 'user_howls' does not exist; creating now...", "INFO")
if is_sqlite:
create_sql = """
@ -616,15 +617,15 @@ def ensure_userhowls_table(db_conn, log_func):
)
"""
result = run_db_operation(db_conn, "write", create_sql, log_func=log_func)
result = run_db_operation(db_conn, "write", create_sql)
if result is None:
err_msg = "Failed to create 'user_howls' table!"
log_func(err_msg, "ERROR")
globals.log(err_msg, "ERROR")
raise RuntimeError(err_msg)
log_func("Successfully created table 'user_howls'.", "INFO")
globals.log("Successfully created table 'user_howls'.", "INFO")
def insert_howl(db_conn, log_func, user_uuid, howl_value):
def insert_howl(db_conn, user_uuid, howl_value):
"""
Insert a row into user_howls with the user's UUID, the integer 0-100,
and DATETIME defaulting to now.
@ -634,13 +635,13 @@ def insert_howl(db_conn, log_func, user_uuid, howl_value):
VALUES (?, ?)
"""
params = (user_uuid, howl_value)
rowcount = run_db_operation(db_conn, "write", sql, params, log_func=log_func)
rowcount = run_db_operation(db_conn, "write", sql, params)
if rowcount and rowcount > 0:
log_func(f"Recorded a {howl_value}% howl for UUID={user_uuid}.", "DEBUG")
globals.log(f"Recorded a {howl_value}% howl for UUID={user_uuid}.", "DEBUG")
else:
log_func(f"Failed to record {howl_value}% howl for UUID={user_uuid}.", "ERROR")
globals.log(f"Failed to record {howl_value}% howl for UUID={user_uuid}.", "ERROR")
def get_howl_stats(db_conn, log_func, user_uuid):
def get_howl_stats(db_conn, user_uuid):
"""
Returns a dict with { 'count': int, 'average': float, 'count_zero': int, 'count_hundred': int }
or None if there are no rows at all for that UUID.
@ -654,7 +655,7 @@ def get_howl_stats(db_conn, log_func, user_uuid):
FROM user_howls
WHERE UUID = ?
"""
rows = run_db_operation(db_conn, "read", sql, (user_uuid,), log_func=log_func)
rows = run_db_operation(db_conn, "read", sql, (user_uuid,))
if not rows:
return None
@ -673,7 +674,7 @@ def get_howl_stats(db_conn, log_func, user_uuid):
"count_hundred": hundred_count
}
def get_global_howl_stats(db_conn, log_func):
def get_global_howl_stats(db_conn):
"""
Returns a dictionary with total howls, average howl percentage, unique users,
and counts of extreme (0% and 100%) howls.
@ -686,7 +687,7 @@ def get_global_howl_stats(db_conn, log_func):
SUM(HOWL = 100) AS count_hundred
FROM user_howls
"""
rows = run_db_operation(db_conn, "read", sql, log_func=log_func)
rows = run_db_operation(db_conn, "read", sql)
if not rows or not rows[0] or rows[0][0] is None:
return None # No howl data exists
@ -699,7 +700,7 @@ def get_global_howl_stats(db_conn, log_func):
"count_hundred": rows[0][4],
}
def ensure_discord_activity_table(db_conn, log_func):
def ensure_discord_activity_table(db_conn):
"""
Ensures the 'discord_activity' table exists.
Logs voice events, cameras, streaming, gaming, and Discord activities.
@ -714,12 +715,12 @@ def ensure_discord_activity_table(db_conn, log_func):
WHERE table_name = 'discord_activity' AND table_schema = DATABASE()
"""
rows = run_db_operation(db_conn, "read", check_sql, log_func=log_func)
rows = run_db_operation(db_conn, "read", check_sql)
if rows and rows[0]:
log_func("Table 'discord_activity' already exists, skipping creation.", "DEBUG")
globals.log("Table 'discord_activity' already exists, skipping creation.", "DEBUG")
return
log_func("Creating 'discord_activity' table...", "INFO")
globals.log("Creating 'discord_activity' table...", "INFO")
if is_sqlite:
create_sql = """
@ -757,17 +758,17 @@ def ensure_discord_activity_table(db_conn, log_func):
"""
try:
result = run_db_operation(db_conn, "write", create_sql, log_func=log_func)
result = run_db_operation(db_conn, "write", create_sql)
except Exception as e:
log_func(f"Unable to create the table: discord_activity: {e}")
globals.log(f"Unable to create the table: discord_activity: {e}")
if result is None:
log_func("Failed to create 'discord_activity' table!", "CRITICAL")
globals.log("Failed to create 'discord_activity' table!", "CRITICAL")
raise RuntimeError("Database table creation failed.")
log_func("Successfully created table 'discord_activity'.", "INFO")
globals.log("Successfully created table 'discord_activity'.", "INFO")
def log_discord_activity(db_conn, log_func, guild_id, user_uuid, action, voice_channel, action_detail=None):
def log_discord_activity(db_conn, guild_id, user_uuid, action, voice_channel, action_detail=None):
"""
Logs Discord activities (playing games, listening to Spotify, streaming).
@ -791,10 +792,10 @@ def log_discord_activity(db_conn, log_func, guild_id, user_uuid, action, voice_c
# Verify that the user exists in 'users' before proceeding.
user_check = run_db_operation(
db_conn, "read", "SELECT UUID FROM users WHERE UUID = ?", (user_uuid,), log_func
db_conn, "read", "SELECT UUID FROM users WHERE UUID = ?", (user_uuid,)
)
if not user_check:
log_func(f"WARNING: Attempted to log activity for non-existent UUID: {user_uuid}", "WARNING")
globals.log(f"WARNING: Attempted to log activity for non-existent UUID: {user_uuid}", "WARNING")
return # Prevent foreign key issues.
now = datetime.datetime.now()
@ -809,7 +810,7 @@ def log_discord_activity(db_conn, log_func, guild_id, user_uuid, action, voice_c
LIMIT ?
"""
rows = run_db_operation(
db_conn, "read", query, params=(user_uuid, action, NUM_RECENT_ENTRIES), log_func=log_func
db_conn, "read", query, params=(user_uuid, action, NUM_RECENT_ENTRIES)
)
# Determine the timestamp of the most recent event that matches the new detail,
@ -822,7 +823,7 @@ def log_discord_activity(db_conn, log_func, guild_id, user_uuid, action, voice_c
try:
dt = datetime.datetime.strptime(dt_str, "%Y-%m-%d %H:%M:%S")
except Exception as e:
log_func(f"Error parsing datetime '{dt_str}': {e}", "ERROR")
globals.log(f"Error parsing datetime '{dt_str}': {e}", "ERROR")
continue
normalized_existing = normalize_detail(detail)
if normalized_existing == normalized_new:
@ -853,15 +854,15 @@ def log_discord_activity(db_conn, log_func, guild_id, user_uuid, action, voice_c
VALUES (?, ?, ?, ?, ?)
"""
params = (user_uuid, action, guild_id, channel_val, action_detail)
rowcount = run_db_operation(db_conn, "write", sql, params, log_func)
rowcount = run_db_operation(db_conn, "write", sql, params)
if rowcount and rowcount > 0:
detail_str = f" ({action_detail})" if action_detail else ""
log_func(f"Logged Discord activity in Guild {guild_id}: {action}{detail_str}", "DEBUG")
globals.log(f"Logged Discord activity in Guild {guild_id}: {action}{detail_str}", "DEBUG")
else:
log_func("Failed to log Discord activity.", "ERROR")
globals.log("Failed to log Discord activity.", "ERROR")
def ensure_bot_events_table(db_conn, log_func):
def ensure_bot_events_table(db_conn):
"""
Ensures the 'bot_events' table exists, which logs major bot-related events.
"""
@ -873,12 +874,12 @@ def ensure_bot_events_table(db_conn, log_func):
WHERE table_name = 'bot_events' AND table_schema = DATABASE()
"""
rows = run_db_operation(db_conn, "read", check_sql, log_func=log_func)
rows = run_db_operation(db_conn, "read", check_sql)
if rows and rows[0]:
log_func("Table 'bot_events' already exists, skipping creation.", "DEBUG")
globals.log("Table 'bot_events' already exists, skipping creation.", "DEBUG")
return
log_func("Creating 'bot_events' table...", "INFO")
globals.log("Creating 'bot_events' table...", "INFO")
# Define SQL Schema
create_sql = """
@ -898,14 +899,14 @@ def ensure_bot_events_table(db_conn, log_func):
"""
# Create the table
result = run_db_operation(db_conn, "write", create_sql, log_func=log_func)
result = run_db_operation(db_conn, "write", create_sql)
if result is None:
log_func("Failed to create 'bot_events' table!", "CRITICAL")
globals.log("Failed to create 'bot_events' table!", "CRITICAL")
raise RuntimeError("Database table creation failed.")
log_func("Successfully created table 'bot_events'.", "INFO")
globals.log("Successfully created table 'bot_events'.", "INFO")
def log_bot_event(db_conn, log_func, event_type, event_details):
def log_bot_event(db_conn, event_type, event_details):
"""
Logs a bot event (e.g., startup, shutdown, disconnection).
"""
@ -914,14 +915,14 @@ def log_bot_event(db_conn, log_func, event_type, event_details):
VALUES (?, ?)
"""
params = (event_type, event_details)
rowcount = run_db_operation(db_conn, "write", sql, params, log_func)
rowcount = run_db_operation(db_conn, "write", sql, params)
if rowcount and rowcount > 0:
log_func(f"Logged bot event: {event_type} - {event_details}", "DEBUG")
globals.log(f"Logged bot event: {event_type} - {event_details}", "DEBUG")
else:
log_func("Failed to log bot event.", "ERROR")
globals.log("Failed to log bot event.", "ERROR")
def get_event_summary(db_conn, log_func, time_span="7d"):
def get_event_summary(db_conn, time_span="7d"):
"""
Retrieves bot event statistics based on a given time span.
Supports:
@ -942,7 +943,7 @@ def get_event_summary(db_conn, log_func, time_span="7d"):
}
if time_span not in time_mappings:
log_func(f"Invalid time span '{time_span}', defaulting to '7d'", "WARNING")
globals.log(f"Invalid time span '{time_span}', defaulting to '7d'", "WARNING")
time_span = "7d"
# Define SQL query
@ -954,7 +955,7 @@ def get_event_summary(db_conn, log_func, time_span="7d"):
ORDER BY COUNT(*) DESC
"""
rows = run_db_operation(db_conn, "read", sql, log_func=log_func)
rows = run_db_operation(db_conn, "read", sql)
# Organize data into OrderedDict
summary = OrderedDict()
@ -964,7 +965,7 @@ def get_event_summary(db_conn, log_func, time_span="7d"):
return summary
def ensure_link_codes_table(db_conn, log_func):
def ensure_link_codes_table(db_conn):
"""
Ensures the 'link_codes' table exists.
This table stores one-time-use account linking codes.
@ -976,12 +977,12 @@ def ensure_link_codes_table(db_conn, log_func):
WHERE table_name = 'link_codes' AND table_schema = DATABASE()
"""
rows = run_db_operation(db_conn, "read", check_sql, log_func=log_func)
rows = run_db_operation(db_conn, "read", check_sql)
if rows and rows[0]:
log_func("Table 'link_codes' already exists, skipping creation.", "DEBUG")
globals.log("Table 'link_codes' already exists, skipping creation.", "DEBUG")
return
log_func("Creating 'link_codes' table...", "INFO")
globals.log("Creating 'link_codes' table...", "INFO")
create_sql = """
CREATE TABLE link_codes (
@ -999,14 +1000,14 @@ def ensure_link_codes_table(db_conn, log_func):
)
"""
result = run_db_operation(db_conn, "write", create_sql, log_func=log_func)
result = run_db_operation(db_conn, "write", create_sql)
if result is None:
log_func("Failed to create 'link_codes' table!", "CRITICAL")
globals.log("Failed to create 'link_codes' table!", "CRITICAL")
raise RuntimeError("Database table creation failed.")
log_func("Successfully created table 'link_codes'.", "INFO")
globals.log("Successfully created table 'link_codes'.", "INFO")
def merge_uuid_data(db_conn, log_func, old_uuid, new_uuid):
def merge_uuid_data(db_conn, old_uuid, new_uuid):
"""
Merges all records from the old UUID (Twitch account) into the new UUID (Discord account).
This replaces all instances of the old UUID in all relevant tables with the new UUID,
@ -1014,7 +1015,7 @@ def merge_uuid_data(db_conn, log_func, old_uuid, new_uuid):
After merging, the old UUID entry is removed from the `users` table.
"""
log_func(f"Starting UUID merge: {old_uuid} -> {new_uuid}", "INFO")
globals.log(f"Starting UUID merge: {old_uuid} -> {new_uuid}", "INFO")
tables_to_update = [
"voice_activity_log",
@ -1026,13 +1027,13 @@ def merge_uuid_data(db_conn, log_func, old_uuid, new_uuid):
for table in tables_to_update:
sql = f"UPDATE {table} SET UUID = ? WHERE UUID = ?"
rowcount = run_db_operation(db_conn, "update", sql, (new_uuid, old_uuid), log_func)
log_func(f"Updated {rowcount} rows in {table} (transferring {old_uuid} -> {new_uuid})", "DEBUG")
rowcount = run_db_operation(db_conn, "update", sql, (new_uuid, old_uuid))
globals.log(f"Updated {rowcount} rows in {table} (transferring {old_uuid} -> {new_uuid})", "DEBUG")
# Finally, delete the old UUID from the `users` table
delete_sql = "DELETE FROM users WHERE UUID = ?"
rowcount = run_db_operation(db_conn, "write", delete_sql, (old_uuid,), log_func)
rowcount = run_db_operation(db_conn, "write", delete_sql, (old_uuid,))
log_func(f"Deleted old UUID {old_uuid} from 'users' table ({rowcount} rows affected)", "INFO")
globals.log(f"Deleted old UUID {old_uuid} from 'users' table ({rowcount} rows affected)", "INFO")
log_func(f"UUID merge complete: {old_uuid} -> {new_uuid}", "INFO")
globals.log(f"UUID merge complete: {old_uuid} -> {new_uuid}", "INFO")

View File

@ -6,7 +6,11 @@ import re
import functools
import inspect
import uuid
from modules.db import run_db_operation, lookup_user
from modules.db import run_db_operation, lookup_user, log_message
import modules.utility as utility
import discord
import globals
try:
# 'regex' on PyPI supports `\p{L}`, `\p{N}`, etc.
@ -18,34 +22,137 @@ except ImportError:
DICTIONARY_PATH = "dictionary/" # Path to dictionary files
def monitor_cmds(log_func):
"""
Decorator that logs when a command starts and ends execution.
"""
def decorator(func):
@functools.wraps(func)
async def wrapper(*args, **kwargs):
start_time = time.time()
try:
# Extract a command name from the function name
cmd_name = str(func.__name__).split("_")[1]
log_func(f"Command '{cmd_name}' started execution.", "DEBUG")
# def monitor_cmds(log_func):
# """
# Decorator that logs when a command starts and ends execution.
# """
# def decorator(func):
# @functools.wraps(func)
# async def wrapper(*args, **kwargs):
# start_time = time.time()
# try:
# # Extract a command name from the function name
# cmd_name = str(func.__name__).split("_")[1]
# log_func(f"Command '{cmd_name}' started execution.", "DEBUG")
# Await the actual command function
result = await func(*args, **kwargs)
# # Await the actual command function
# result = await func(*args, **kwargs)
# end_time = time.time()
# cmd_duration = round(end_time - start_time, 2)
# log_func(f"Command '{cmd_name}' finished execution after {cmd_duration}s.", "DEBUG")
# return result
# except Exception as e:
# end_time = time.time()
# cmd_duration = round(end_time - start_time, 2)
# log_func(f"Command '{cmd_name}' FAILED while executing after {cmd_duration}s: {e}", "CRITICAL")
# # Explicitly preserve the original signature for slash command introspection
# wrapper.__signature__ = inspect.signature(func)
# return wrapper
# return decorator
# def monitor_cmds():
# """
# Decorator that logs when a command starts, completes, or fails.
# Uses `ctx.bot.log` dynamically.
# """
# def decorator(func):
# @functools.wraps(func)
# async def wrapper(*args, **kwargs):
# start_time = time.time()
# try:
# ctx = args[0] # First argument should be `ctx`
# cmd_name = ctx.command.name if hasattr(ctx, "command") else func.__name__
# log_func = ctx.bot.log
# log_func(f"Command '{cmd_name}' started by {ctx.author} in #{ctx.channel}", "DEBUG")
# # Execute the command
# result = await func(*args, **kwargs)
# # Log successful execution
# duration = round(time.time() - start_time, 2)
# log_func(f"Command '{cmd_name}' finished execution in {duration}s.", "DEBUG")
# return result
# except Exception as e:
# duration = round(time.time() - start_time, 2)
# log_func(f"Command execution failed: '{cmd_name}' after {duration}s: {e}", "CRITICAL")
# # Fix: Ensure Discord's command system keeps the correct function parameters
# wrapper.__signature__ = inspect.signature(func)
# return wrapper
# return decorator
# def monitor_msgs(log_func, db_conn):
# """
# Decorator that logs Discord messages and tracks user activity.
# Works for both commands and event handlers like on_message().
# """
# def decorator(func):
# @functools.wraps(func)
# async def wrapper(self, message: discord.Message, *args, **kwargs):
# start_time = time.time()
# try:
# # Ignore bot messages
# if message.author.bot:
# return
# user_id = str(message.author.id)
# user_name = message.author.name
# display_name = message.author.display_name
# # Track user activity
# track_user_activity(
# db_conn=db_conn,
# log_func=log_func,
# platform="discord",
# user_id=user_id,
# username=user_name,
# display_name=display_name,
# user_is_bot=False
# )
# log_func(f"Message from {user_name} in {message.channel} (Guild: {message.guild.name if message.guild else 'DM'})", "DEBUG")
# # Fetch user UUID
# user_data = lookup_user(db_conn, log_func, identifier=user_id, identifier_type="discord_user_id")
# user_uuid = user_data["UUID"] if user_data else "UNKNOWN"
# # Extract message details
# platform_str = f"discord-{message.guild.name}" if message.guild else "discord-DM"
# channel_str = message.channel.name if hasattr(message.channel, "name") else "DM"
# attachments = ", ".join(a.url for a in message.attachments) if message.attachments else ""
# # Log message in DB
# log_message(
# db_conn=db_conn,
# log_func=log_func,
# user_uuid=user_uuid,
# message_content=message.content or "",
# platform=platform_str,
# channel=channel_str,
# attachments=attachments
# )
# # Call the original `on_message()` function
# await func(self, message, *args, **kwargs)
# # Ensure the bot processes commands
# await self.process_commands(message)
# except Exception as e:
# log_func(f"Error processing message from {message.author}: {e}", "ERROR")
# finally:
# duration = round(time.time() - start_time, 2)
# log_func(f"Message processing complete in {duration}s.", "DEBUG")
# # Preserve original function signature
# wrapper.__signature__ = inspect.signature(func)
# return wrapper
# return decorator
end_time = time.time()
cmd_duration = round(end_time - start_time, 2)
log_func(f"Command '{cmd_name}' finished execution after {cmd_duration}s.", "DEBUG")
return result
except Exception as e:
end_time = time.time()
cmd_duration = round(end_time - start_time, 2)
log_func(f"Command '{cmd_name}' FAILED while executing after {cmd_duration}s: {e}", "CRITICAL")
# Explicitly preserve the original signature for slash command introspection
wrapper.__signature__ = inspect.signature(func)
return wrapper
return decorator
def format_uptime(seconds: float) -> tuple[str, int]:
"""
@ -209,49 +316,42 @@ def sanitize_user_input(
# Help command logic
#####################
async def handle_help_command(ctx, command_name, bot, is_discord, log_func):
async def handle_help_command(ctx, command_name, bot, is_discord):
"""
Called by the platform-specific help commands to provide the help text.
:param ctx: discord.py or twitchio context
:param command_name: e.g. "quote" or None if user typed just "!help"
:param bot: The current bot instance
:param is_discord: True for Discord, False for Twitch
:param log_func: The logging function
"""
# If there's no loaded help_data, we can't do much
if not hasattr(bot, "help_data") or not bot.help_data:
return await send_message(ctx, "No help data found.")
return await "No help data found."
help_data = bot.help_data # The parsed JSON from e.g. help_discord.json
if "commands" not in help_data:
return await send_message(ctx, "Invalid help data structure (no 'commands' key).")
return "Invalid help data structure (no 'commands' key).\n*This is due to an error with the help file.*"
if not command_name:
# User typed just "!help" => list all known commands from this bot
loaded_cmds = get_loaded_commands(bot, log_func, is_discord)
loaded_cmds = get_loaded_commands(bot, is_discord)
if not loaded_cmds:
return await send_message(ctx, "I have no commands loaded.")
return "I have no commands loaded."
else:
if is_discord:
help_str = f"I currently offer these commands:"
for cmd in loaded_cmds:
help_str += f"\n- !{cmd}"
help_str += f"\n*Use '!help <command>' for more details.*"
return await send_message(
ctx,
help_str
)
return help_str
else:
short_list = ", ".join(loaded_cmds)
# We can also mention "Use !help [command] for more info."
return await send_message(
ctx,
f"I currently offer these commands:{short_list}. \nUse '!help <command>' for details."
)
return f"I currently offer these commands:{short_list}. \nUse '!help <command>' for details."
# 1) Check if the command is loaded
loaded = (command_name in get_loaded_commands(bot, log_func, is_discord))
loaded = (command_name in get_loaded_commands(bot, is_discord))
# 2) Check if it has help info in the JSON
cmd_help = help_data["commands"].get(command_name, None)
@ -261,20 +361,20 @@ async def handle_help_command(ctx, command_name, bot, is_discord, log_func):
msg = build_discord_help_message(command_name, cmd_help)
else:
msg = build_twitch_help_message(command_name, cmd_help)
await send_message(ctx, msg)
return msg
elif loaded and not cmd_help:
# The command is loaded but no help info => mention that
await send_message(ctx, f"The '{command_name}' command is loaded but has no help info yet.")
return f"The '{command_name}' command is loaded but has no help info yet."
elif (not loaded) and cmd_help:
# The command is not loaded, but we have an entry => mention it's unloaded/deprecated
await send_message(ctx, f"The '{command_name}' command is not currently loaded (deprecated or unavailable).")
return f"The '{command_name}' command is not currently loaded (deprecated or unavailable)."
else:
# Not loaded, no help info => not found at all
await send_message(ctx, f"I'm sorry, I don't offer a command named '{command_name}'.")
return f"I'm sorry, I don't offer a command named '{command_name}'."
def initialize_help_data(bot, help_json_path, is_discord, log_func):
def initialize_help_data(bot, help_json_path, is_discord):
"""
Loads help data from a JSON file, stores it in bot.help_data,
then verifies each loaded command vs. the help_data.
@ -286,7 +386,7 @@ def initialize_help_data(bot, help_json_path, is_discord, log_func):
platform_name = "Discord" if is_discord else "Twitch"
if not os.path.exists(help_json_path):
log_func(f"Help file '{help_json_path}' not found. No help data loaded.", "WARNING")
globals.log(f"Help file '{help_json_path}' not found. No help data loaded.", "WARNING")
bot.help_data = {}
return
@ -295,15 +395,15 @@ def initialize_help_data(bot, help_json_path, is_discord, log_func):
with open(help_json_path, "r", encoding="utf-8") as f:
data = json.load(f)
except Exception as e:
log_func(f"Error parsing help JSON '{help_json_path}': {e}", "ERROR")
globals.log(f"Error parsing help JSON '{help_json_path}': {e}", "ERROR")
data = {}
bot.help_data = data
# Now cross-check the loaded commands vs. the data
loaded_cmds = set(get_loaded_commands(bot, log_func, is_discord))
loaded_cmds = set(get_loaded_commands(bot, is_discord))
if "commands" not in data:
log_func(f"No 'commands' key in {help_json_path}, skipping checks.", "ERROR")
globals.log(f"No 'commands' key in {help_json_path}, skipping checks.", "ERROR")
return
file_cmds = set(data["commands"].keys())
@ -311,15 +411,15 @@ def initialize_help_data(bot, help_json_path, is_discord, log_func):
# 1) Commands in file but not loaded
missing_cmds = file_cmds - loaded_cmds
for cmd in missing_cmds:
log_func(f"Help file has '{cmd}', but it's not loaded on this {platform_name} bot (deprecated?).", "WARNING")
globals.log(f"Help file has '{cmd}', but it's not loaded on this {platform_name} bot (deprecated?).", "WARNING")
# 2) Commands loaded but not in file
needed_cmds = loaded_cmds - file_cmds
for cmd in needed_cmds:
log_func(f"Command '{cmd}' is loaded on {platform_name} but no help info is provided in {help_json_path}.", "WARNING")
globals.log(f"Command '{cmd}' is loaded on {platform_name} but no help info is provided in {help_json_path}.", "WARNING")
def get_loaded_commands(bot, log_func, is_discord):
def get_loaded_commands(bot, is_discord):
from discord.ext import commands as discord_commands
from twitchio.ext import commands as twitch_commands
@ -327,9 +427,9 @@ def get_loaded_commands(bot, log_func, is_discord):
try:
_bot_type = str(type(bot)).split("_")[1].split(".")[0]
log_func(f"Currently processing commands for {_bot_type} ...", "DEBUG")
globals.log(f"Currently processing commands for {_bot_type} ...", "DEBUG")
except Exception as e:
log_func(f"Unable to determine current bot type: {e}", "WARNING")
globals.log(f"Unable to determine current bot type: {e}", "WARNING")
# For Discord
if is_discord:
@ -339,21 +439,21 @@ def get_loaded_commands(bot, log_func, is_discord):
for cmd_obj in bot.commands:
commands_list.append(cmd_obj.name)
debug_level ="DEBUG" if len(commands_list) > 0 else "WARNING"
log_func(f"Discord commands body: {commands_list}", f"{debug_level}")
globals.log(f"Discord commands body: {commands_list}", f"{debug_level}")
except Exception as e:
log_func(f"Error retrieving Discord commands: {e}", "ERROR")
globals.log(f"Error retrieving Discord commands: {e}", "ERROR")
elif not is_discord:
try:
for cmd_obj in bot._commands:
commands_list.append(cmd_obj)
debug_level ="DEBUG" if len(commands_list) > 0 else "WARNING"
log_func(f"Twitch commands body: {commands_list}", f"{debug_level}")
globals.log(f"Twitch commands body: {commands_list}", f"{debug_level}")
except Exception as e:
log_func(f"Error retrieving Twitch commands: {e}", "ERROR")
globals.log(f"Error retrieving Twitch commands: {e}", "ERROR")
else:
log_func(f"Unable to determine platform in 'get_loaded_commands()'!", "CRITICAL")
globals.log(f"Unable to determine platform in 'get_loaded_commands()'!", "CRITICAL")
log_func(f"... Finished processing commands for {_bot_type} ...", "DEBUG")
globals.log(f"... Finished processing commands for {_bot_type} ...", "DEBUG")
return sorted(commands_list)
@ -421,7 +521,6 @@ async def send_message(ctx, text):
def track_user_activity(
db_conn,
log_func,
platform: str,
user_id: str,
username: str,
@ -432,7 +531,6 @@ def track_user_activity(
Checks or creates/updates a user in the 'users' table for the given platform's message.
:param db_conn: The active DB connection
:param log_func: The logging function (message, level="INFO")
:param platform: "discord" or "twitch"
:param user_id: e.g., Discord user ID or Twitch user ID
:param username: The raw username (no #discriminator for Discord)
@ -440,18 +538,18 @@ def track_user_activity(
:param user_is_bot: Boolean if the user is recognized as a bot on that platform
"""
log_func(f"UUI Lookup for: {username} - {user_id} ({platform.lower()}) ...", "DEBUG")
globals.log(f"UUI Lookup for: {username} - {user_id} ({platform.lower()}) ...", "DEBUG")
# Decide which column we use for the ID lookup
# "discord_user_id" or "twitch_user_id"
if platform.lower() in ("discord", "twitch"):
identifier_type = f"{platform.lower()}_user_id"
else:
log_func(f"Unknown platform '{platform}' in track_user_activity!", "WARNING")
globals.log(f"Unknown platform '{platform}' in track_user_activity!", "WARNING")
return
# 1) Try to find an existing user row
user_data = lookup_user(db_conn, log_func, identifier=user_id, identifier_type=identifier_type)
user_data = lookup_user(db_conn, identifier=user_id, identifier_type=identifier_type)
if user_data:
# Found an existing row for that user ID on this platform
@ -460,7 +558,7 @@ def track_user_activity(
column_updates = []
params = []
log_func(f"... Returned {user_data}", "DEBUG")
globals.log(f"... Returned {user_data}", "DEBUG")
if platform.lower() == "discord":
if user_data["discord_username"] != username:
@ -491,9 +589,9 @@ def track_user_activity(
"""
params.append(user_id)
rowcount = run_db_operation(db_conn, "update", update_sql, params=params, log_func=log_func)
rowcount = run_db_operation(db_conn, "update", update_sql, params=params)
if rowcount and rowcount > 0:
log_func(f"Updated Discord user '{username}' (display '{display_name}') in 'users'.", "DEBUG")
globals.log(f"Updated Discord user '{username}' (display '{display_name}') in 'users'.", "DEBUG")
elif platform.lower() == "twitch":
if user_data["twitch_username"] != username:
@ -521,9 +619,9 @@ def track_user_activity(
"""
params.append(user_id)
rowcount = run_db_operation(db_conn, "update", update_sql, params=params, log_func=log_func)
rowcount = run_db_operation(db_conn, "update", update_sql, params=params)
if rowcount and rowcount > 0:
log_func(f"Updated Twitch user '{username}' (display '{display_name}') in 'users'.", "DEBUG")
globals.log(f"Updated Twitch user '{username}' (display '{display_name}') in 'users'.", "DEBUG")
else:
# 2) No row found => create a new user row
@ -556,25 +654,25 @@ def track_user_activity(
"""
params = (new_uuid, user_id, username, display_name, int(user_is_bot))
rowcount = run_db_operation(db_conn, "write", insert_sql, params, log_func=log_func)
rowcount = run_db_operation(db_conn, "write", insert_sql, params)
if rowcount and rowcount > 0:
log_func(f"Created new user row for {platform} user '{username}' (display '{display_name}') with UUID={new_uuid}.", "DEBUG")
globals.log(f"Created new user row for {platform} user '{username}' (display '{display_name}') with UUID={new_uuid}.", "DEBUG")
else:
log_func(f"Failed to create new user row for {platform} user '{username}'", "ERROR")
globals.log(f"Failed to create new user row for {platform} user '{username}'", "ERROR")
from modules.db import log_bot_event
def log_bot_startup(db_conn, log_func):
def log_bot_startup(db_conn):
"""
Logs a bot startup event.
"""
log_bot_event(db_conn, log_func, "BOT_STARTUP", "Bot successfully started.")
log_bot_event(db_conn, "BOT_STARTUP", "Bot successfully started.")
def log_bot_shutdown(db_conn, log_func, intent: str = "Error/Crash"):
def log_bot_shutdown(db_conn, intent: str = "Error/Crash"):
"""
Logs a bot shutdown event.
"""
log_bot_event(db_conn, log_func, "BOT_SHUTDOWN", f"Bot is shutting down - {intent}.")
log_bot_event(db_conn, "BOT_SHUTDOWN", f"Bot is shutting down - {intent}.")
def generate_link_code():
"""Generates a unique 8-character alphanumeric link code."""
@ -585,11 +683,11 @@ def generate_link_code():
###############################################
# Development Test Function (called upon start)
###############################################
def dev_func(db_conn, log, enable: bool = False):
def dev_func(db_conn, enable: bool = False):
if enable:
id = "203190147582394369"
id_type = "discord_user_id"
uui_info = lookup_user(db_conn, log, identifier=id, identifier_type=id_type)
uui_info = lookup_user(db_conn, identifier=id, identifier_type=id_type)
if uui_info:
return list(uui_info.values())
else: