Reworked internal logging

- All logging functionality has been moved into the Logger class
  - Initiated globally as globals.logger()
  - Logging format changed from `globals.log("message_str", "LEVEL")` => `logger.level("message_str")`
- Minor changes to Twitch authentication
experimental
Kami 2025-03-13 14:17:43 +01:00
parent d541f65804
commit 5023ea9919
10 changed files with 497 additions and 414 deletions

View File

@ -8,6 +8,7 @@ import json
import os
import globals
from globals import logger
import modules
import modules.utility
@ -21,11 +22,11 @@ class DiscordBot(commands.Bot):
super().__init__(command_prefix="!", intents=discord.Intents.all())
self.remove_command("help") # Remove built-in help function
self.config = globals.constants.config_data
self.log = globals.log # Use the logging function from bots.py
self.log = globals.logger # Use the logging function from bots.py
self.db_conn = None # We'll set this later
self.help_data = None # We'll set this later
globals.log("Discord bot initiated")
logger.info("Discord bot initiated")
# async def sync_slash_commands(self):
# """Syncs slash commands for the bot."""
@ -54,9 +55,9 @@ class DiscordBot(commands.Bot):
# Log which cogs got loaded
short_name = filename[:-3]
globals.log(f"Loaded Discord command cog '{short_name}'", "DEBUG")
logger.debug(f"Loaded Discord command cog '{short_name}'")
globals.log("All Discord command cogs loaded successfully.", "INFO")
logger.info("All Discord command cogs loaded successfully.")
# Now that cogs are all loaded, run any help file initialization:
help_json_path = "dictionary/help_discord.json"
@ -74,10 +75,10 @@ class DiscordBot(commands.Bot):
await ctx.bot.unload_extension(f"cmd_discord.{cog_name}")
await ctx.bot.load_extension(f"cmd_discord.{cog_name}")
await ctx.reply(f"✅ Reloaded `{cog_name}` successfully!")
globals.log(f"Successfully reloaded the command cog `{cog_name}`", "INFO")
logger.info(f"Successfully reloaded the command cog `{cog_name}`")
except Exception as e:
await ctx.reply(f"❌ Error reloading `{cog_name}`: {e}")
globals.log(f"Failed to reload the command cog `{cog_name}`", "ERROR")
logger.error(f"Failed to reload the command cog `{cog_name}`")
async def on_message(self, message):
if message.guild:
@ -87,7 +88,7 @@ class DiscordBot(commands.Bot):
guild_name = "DM"
channel_name = "Direct Message"
globals.log(f"Message detected by '{message.author.name}' in '{guild_name}' - #'{channel_name}'", "DEBUG")
logger.debug(f"Message detected by '{message.author.name}' in '{guild_name}' - #'{channel_name}'")
try:
is_bot = message.author.bot
@ -120,14 +121,14 @@ class DiscordBot(commands.Bot):
platform_message_id=str(message.id) # Include Discord message ID
)
except Exception as e:
globals.log(f"... UUI lookup failed: {e}", "WARNING")
logger.warning(f"... UUI lookup failed: {e}")
pass
try:
await self.process_commands(message)
globals.log(f"Command processing complete", "DEBUG")
logger.debug(f"Command processing complete")
except Exception as e:
globals.log(f"Command processing failed: {e}", "ERROR")
logger.error(f"Command processing failed: {e}")
# async def on_reaction_add(self, reaction, user):
# if user.bot:
@ -136,7 +137,7 @@ class DiscordBot(commands.Bot):
# guild_name = reaction.message.guild.name if reaction.message.guild else "DM"
# channel_name = reaction.message.channel.name if hasattr(reaction.message.channel, "name") else "Direct Message"
# globals.log(f"Reaction added by '{user.name}' on message '{reaction.message.id}' in '{guild_name}' - #{channel_name}", "DEBUG")
# logger.debug(f"Reaction added by '{user.name}' on message '{reaction.message.id}' in '{guild_name}' - #{channel_name}")
# modules.utility.track_user_activity(
# db_conn=self.db_conn,
@ -165,7 +166,7 @@ class DiscordBot(commands.Bot):
# guild_name = reaction.message.guild.name if reaction.message.guild else "DM"
# channel_name = reaction.message.channel.name if hasattr(reaction.message.channel, "name") else "Direct Message"
# globals.log(f"Reaction removed by '{user.name}' on message '{reaction.message.id}' in '{guild_name}' - #{channel_name}", "DEBUG")
# logger.debug(f"Reaction removed by '{user.name}' on message '{reaction.message.id}' in '{guild_name}' - #{channel_name}")
# modules.utility.track_user_activity(
# db_conn=self.db_conn,
@ -194,8 +195,8 @@ class DiscordBot(commands.Bot):
# guild_name = before.guild.name if before.guild else "DM"
# channel_name = before.channel.name if hasattr(before.channel, "name") else "Direct Message"
# globals.log(f"Message edited by '{before.author.name}' in '{guild_name}' - #{channel_name}", "DEBUG")
# globals.log(f"Before: {before.content}\nAfter: {after.content}", "DEBUG")
# logger.debug(f"Message edited by '{before.author.name}' in '{guild_name}' - #{channel_name}")
# logger.debug(f"Before: {before.content}\nAfter: {after.content}")
# modules.utility.track_user_activity(
# db_conn=self.db_conn,
@ -218,7 +219,7 @@ class DiscordBot(commands.Bot):
# )
# async def on_thread_create(self, thread):
# globals.log(f"Thread '{thread.name}' created in #{thread.parent.name}", "DEBUG")
# logger.debug(f"Thread '{thread.name}' created in #{thread.parent.name}")
# modules.utility.track_user_activity(
# db_conn=self.db_conn,
@ -239,7 +240,7 @@ class DiscordBot(commands.Bot):
# )
# async def on_thread_update(self, before, after):
# globals.log(f"Thread updated: '{before.name}' -> '{after.name}'", "DEBUG")
# logger.debug(f"Thread updated: '{before.name}' -> '{after.name}'")
# log_message(
# db_conn=self.db_conn,
@ -251,7 +252,7 @@ class DiscordBot(commands.Bot):
# )
# async def on_thread_delete(self, thread):
# globals.log(f"Thread '{thread.name}' deleted", "DEBUG")
# logger.debug(f"Thread '{thread.name}' deleted")
# log_message(
# db_conn=self.db_conn,
@ -269,7 +270,7 @@ class DiscordBot(commands.Bot):
with open("settings/discord_bot_settings.json", "r") as file:
return json.load(file)
except Exception as e:
self.log(f"Failed to load settings: {e}", "ERROR")
self.log.error(f"Failed to load settings: {e}")
return {
"activity_mode": 0,
"static_activity": {"type": "Playing", "name": "with my commands!"},
@ -282,8 +283,8 @@ class DiscordBot(commands.Bot):
"""Logs every command execution at DEBUG level."""
_cmd_args = str(ctx.message.content).split(" ")[1:]
channel_name = "Direct Message" if "Direct Message with" in str(ctx.channel) else ctx.channel
globals.log(f"Command '{ctx.command}' (Discord) initiated by {ctx.author} in #{channel_name}", "DEBUG")
if len(_cmd_args) > 1: globals.log(f"!{ctx.command} arguments: {_cmd_args}", "DEBUG")
logger.debug(f"Command '{ctx.command}' (Discord) initiated by {ctx.author} in #{channel_name}")
if len(_cmd_args) > 1: logger.debug(f"!{ctx.command} arguments: {_cmd_args}")
async def on_interaction(interaction: discord.Interaction):
# Only log application command (slash command) interactions.
@ -301,13 +302,10 @@ class DiscordBot(commands.Bot):
else:
channel_name = "Direct Message"
globals.log(
f"Command '{command_name}' (Discord) initiated by {interaction.user} in #{channel_name}",
"DEBUG"
)
logger.debug(f"Command '{command_name}' (Discord) initiated by {interaction.user} in #{channel_name}")
if option_values:
globals.log(f"Command '{command_name}' arguments: {option_values}", "DEBUG")
logger.debug(f"Command '{command_name}' arguments: {option_values}")
async def on_ready(self):
"""Runs when the bot successfully logs in."""
@ -322,7 +320,7 @@ class DiscordBot(commands.Bot):
try:
# Sync slash commands globally
#await self.tree.sync()
#globals.log("Discord slash commands synced.")
#logger.info("Discord slash commands synced.")
num_guilds = len(self.config["discord_guilds"])
cmd_tree_result = (await self.tree.sync(guild=primary_guild["object"]))
command_names = [command.name for command in cmd_tree_result] if cmd_tree_result else None
@ -332,22 +330,22 @@ class DiscordBot(commands.Bot):
primary_guild_name = guild_info["name"]
except Exception as e:
primary_guild_name = f"{primary_guild["id"]}"
globals.log(f"Guild lookup failed: {e}", "ERROR")
logger.error(f"Guild lookup failed: {e}")
_log_message = f"{num_guilds} guilds (global)" if num_guilds > 1 else f"guild: {primary_guild_name}"
globals.log(f"Discord slash commands force synced to {_log_message}")
globals.log(f"Discord slash commands that got synced: {command_names}")
logger.info(f"Discord slash commands force synced to {_log_message}")
logger.info(f"Discord slash commands that got synced: {command_names}")
else:
globals.log("Discord commands synced globally.")
logger.info("Discord commands synced globally.")
except Exception as e:
globals.log(f"Unable to sync Discord slash commands: {e}")
logger.error(f"Unable to sync Discord slash commands: {e}")
# Log successful bot startup
globals.log(f"Discord bot is online as {self.user}")
logger.info(f"Discord bot is online as {self.user}")
log_bot_event(self.db_conn, "DISCORD_RECONNECTED", "Discord bot logged in.")
async def on_disconnect(self):
globals.log("Discord bot has lost connection!", "WARNING")
logger.warning("Discord bot has lost connection!")
log_bot_event(self.db_conn, "DISCORD_DISCONNECTED", "Discord bot lost connection.")
async def update_activity(self):
@ -361,7 +359,7 @@ class DiscordBot(commands.Bot):
if mode == 0:
# Disable activity
await self.change_presence(activity=None)
self.log("Activity disabled", "DEBUG")
self.log.debug("Activity disabled")
elif mode == 1:
# Static activity
@ -369,10 +367,10 @@ class DiscordBot(commands.Bot):
if activity_data:
activity = self.get_activity(activity_data.get("type"), activity_data.get("name"))
await self.change_presence(activity=activity)
self.log(f"Static activity set: {activity_data['type']} {activity_data['name']}", "DEBUG")
self.log.debug(f"Static activity set: {activity_data['type']} {activity_data['name']}")
else:
await self.change_presence(activity=None)
self.log("No static activity defined", "DEBUG")
self.log.debug("No static activity defined")
elif mode == 2:
# Rotating activity
@ -380,24 +378,24 @@ class DiscordBot(commands.Bot):
if activities:
self.change_rotating_activity.change_interval(seconds=self.settings.get("rotation_interval", 300))
self.change_rotating_activity.start()
self.log("Rotating activity mode enabled", "DEBUG")
self.log.debug("Rotating activity mode enabled")
else:
self.log("No rotating activities defined, falling back to static.", "INFO")
self.log.info("No rotating activities defined, falling back to static.")
await self.update_activity_static()
elif mode == 3:
# Dynamic activity with fallback
if not await self.set_dynamic_activity():
self.log("Dynamic activity unavailable, falling back.", "INFO")
self.log.info("Dynamic activity unavailable, falling back.")
# Fallback to rotating or static
if self.settings.get("rotating_activities"):
self.change_rotating_activity.start()
self.log("Falling back to rotating activity.", "DEBUG")
self.log.debug("Falling back to rotating activity.")
else:
await self.update_activity_static()
else:
self.log("Invalid activity mode, defaulting to disabled.", "WARNING")
self.log.warning("Invalid activity mode, defaulting to disabled.")
await self.change_presence(activity=None)
async def update_activity_static(self):
@ -406,17 +404,17 @@ class DiscordBot(commands.Bot):
if activity_data:
activity = self.get_activity(activity_data.get("type"), activity_data.get("name"))
await self.change_presence(activity=activity)
self.log(f"Static activity set: {activity_data['type']} {activity_data['name']}", "DEBUG")
self.log.debug(f"Static activity set: {activity_data['type']} {activity_data['name']}")
else:
await self.change_presence(activity=None)
self.log("No static activity defined, activity disabled.", "DEBUG")
self.log.debug("No static activity defined, activity disabled.")
@tasks.loop(seconds=300) # Default to 5 minutes
async def change_rotating_activity(self):
"""Rotates activities every set interval."""
activities = self.settings.get("rotating_activities", [])
if not activities:
self.log("No rotating activities available, stopping rotation.", "INFO")
self.log.info("No rotating activities available, stopping rotation.")
self.change_rotating_activity.stop()
return
@ -426,7 +424,7 @@ class DiscordBot(commands.Bot):
activity = self.get_activity(activity_data.get("type"), activity_data.get("name"))
await self.change_presence(activity=activity)
self.log(f"Rotating activity: {activity_data['type']} {activity_data['name']}", "DEBUG")
self.log.debug(f"Rotating activity: {activity_data['type']} {activity_data['name']}")
async def set_dynamic_activity(self):
"""Sets a dynamic activity based on external conditions."""
@ -440,7 +438,7 @@ class DiscordBot(commands.Bot):
if activity_data:
activity = self.get_activity(activity_data.get("type"), activity_data.get("name"), activity_data.get("url"))
await self.change_presence(activity=activity)
self.log(f"Dynamic activity set: {activity_data['type']} {activity_data['name']}", "DEBUG")
self.log.debug(f"Dynamic activity set: {activity_data['type']} {activity_data['name']}")
return True # Dynamic activity was set
return False # No dynamic activity available
@ -469,7 +467,7 @@ class DiscordBot(commands.Bot):
user_uuid = modules.db.lookup_user(self.db_conn, identifier=discord_user_id, identifier_type="discord_user_id", target_identifier="UUID")
if not user_uuid:
globals.log(f"User {member.name} ({discord_user_id}) not found in 'users'. Attempting to add...", "INFO")
logger.info(f"User {member.name} ({discord_user_id}) not found in 'users'. Attempting to add...")
modules.utility.track_user_activity(
db_conn=self.db_conn,
platform="discord",
@ -480,10 +478,10 @@ class DiscordBot(commands.Bot):
)
user_uuid= modules.db.lookup_user(self.db_conn, identifier=discord_user_id, identifier_type="discord_user_id", target_identifier="UUID")
if not user_uuid:
globals.log(f"Failed to associate {member.name} ({discord_user_id}) with a UUID. Skipping activity log.", "WARNING")
logger.warning(f"Failed to associate {member.name} ({discord_user_id}) with a UUID. Skipping activity log.")
return # Prevent logging with invalid UUID
if user_uuid:
globals.log(f"Successfully added {member.name} ({discord_user_id}) to the UUI database.", "INFO")
logger.info(f"Successfully added {member.name} ({discord_user_id}) to the UUI database.")
# Detect join and leave events
if before.channel is None and after.channel is not None:
@ -541,7 +539,7 @@ class DiscordBot(commands.Bot):
)
if not user_uuid:
globals.log(f"User {after.name} ({discord_user_id}) not found in 'users'. Attempting to add...", "WARNING")
logger.info(f"User {after.name} ({discord_user_id}) not found in 'users'. Attempting to add...")
modules.utility.track_user_activity(
db_conn=self.db_conn,
platform="discord",
@ -552,10 +550,10 @@ class DiscordBot(commands.Bot):
)
user_uuid = modules.db.lookup_user(self.db_conn, identifier=discord_user_id, identifier_type="discord_user_id", target_identifier="UUID")
if not user_uuid:
globals.log(f"ERROR: Failed to associate {after.name} ({discord_user_id}) with a UUID. Skipping activity log.", "ERROR")
logger.error(f"Failed to associate {after.name} ({discord_user_id}) with a UUID. Skipping activity log.")
return
if user_uuid:
globals.log(f"Successfully added {after.name} ({discord_user_id}) to the UUI database.", "INFO")
logger.info(f"Successfully added {after.name} ({discord_user_id}) to the UUI database.")
# Check all activities
new_activity = None
@ -646,4 +644,4 @@ class DiscordBot(commands.Bot):
try:
await super().start(token)
except Exception as e:
globals.log(f"Discord bot error: {e}", "CRITICAL")
logger.critical(f"Discord bot error: {e}")

View File

@ -7,6 +7,7 @@ import importlib
import cmd_twitch
import globals
from globals import logger
import modules
import modules.utility
@ -20,7 +21,7 @@ class TwitchBot(commands.Bot):
self.client_secret = os.getenv("TWITCH_CLIENT_SECRET")
self.token = os.getenv("TWITCH_BOT_TOKEN")
self.refresh_token = os.getenv("TWITCH_REFRESH_TOKEN")
self.log = globals.log # Use the logging function from bots.py
self.log = globals.logger # Use the logging function from bots.py
self.config = globals.constants.config_data
self.db_conn = None # We'll set this later
self.help_data = None # We'll set this later
@ -32,7 +33,7 @@ class TwitchBot(commands.Bot):
initial_channels=twitch_channels
)
globals.log("Twitch bot initiated")
logger.info("Twitch bot initiated")
# 2) Then load commands
self.load_commands()
@ -61,7 +62,7 @@ class TwitchBot(commands.Bot):
user_name = author.name
display_name = author.display_name or user_name
globals.log(f"Message detected, attempting UUI lookup on {user_name} ...", "DEBUG")
logger.debug(f"Message detected, attempting UUI lookup on {user_name} ...")
modules.utility.track_user_activity(
db_conn=self.db_conn,
@ -72,7 +73,7 @@ class TwitchBot(commands.Bot):
user_is_bot=is_bot
)
globals.log("... UUI lookup complete.", "DEBUG")
logger.debug("... UUI lookup complete.")
log_message(
db_conn=self.db_conn,
@ -86,29 +87,25 @@ class TwitchBot(commands.Bot):
)
except Exception as e:
globals.log(f"... UUI lookup failed: {e}", "ERROR")
logger.error(f"... UUI lookup failed: {e}")
await self.handle_commands(message)
async def event_ready(self):
globals.log(f"Twitch bot is online as {self.nick}")
logger.info(f"Twitch bot is online as {self.nick}")
modules.utility.list_channels(self)
kami_status = "OokamiKunTV is currently LIVE" if await modules.utility.is_channel_live(self) else "OokamikunTV is currently not streaming"
globals.log(kami_status)
logger.info(kami_status)
log_bot_event(self.db_conn, "TWITCH_RECONNECTED", "Twitch bot logged in.")
async def event_disconnected(self):
globals.log("Twitch bot has lost connection!", "WARNING")
logger.warning("Twitch bot has lost connection!")
log_bot_event(self.db_conn, "TWITCH_DISCONNECTED", "Twitch bot lost connection.")
async def refresh_access_token(self, automatic=False):
"""
Refresh the Twitch access token using the stored refresh token.
If 'automatic' is True, do NOT shut down the bot or require manual restart.
Return True if success, False if not.
"""
self.log("Attempting to refresh Twitch token...")
async def refresh_access_token(self, automatic=False, retries=1):
"""Refresh the Twitch access token and ensure it is applied correctly."""
self.log.info("Attempting to refresh Twitch token...")
url = "https://id.twitch.tv/oauth2/token"
params = {
@ -118,58 +115,75 @@ class TwitchBot(commands.Bot):
"grant_type": "refresh_token"
}
try:
response = requests.post(url, params=params)
data = response.json()
self.log(f"Twitch token response: {data}", "DEBUG")
for attempt in range(retries + 1):
try:
response = requests.post(url, params=params)
data = response.json()
self.log.debug(f"Twitch token response: {data}")
if "access_token" in data:
self.token = data["access_token"]
self.refresh_token = data.get("refresh_token", self.refresh_token)
os.environ["TWITCH_BOT_TOKEN"] = self.token
os.environ["TWITCH_REFRESH_TOKEN"] = self.refresh_token
self.update_env_file()
if "access_token" in data:
_before_token = os.getenv("TWITCH_BOT_TOKEN", "")
# Validate newly refreshed token:
if not await self.validate_token():
self.log("New token is still invalid, re-auth required.", "CRITICAL")
self.token = data["access_token"]
self.refresh_token = data.get("refresh_token", self.refresh_token)
os.environ["TWITCH_BOT_TOKEN"] = self.token
os.environ["TWITCH_REFRESH_TOKEN"] = self.refresh_token
self.update_env_file()
await asyncio.sleep(1) # Allow Twitch API time to register the new token
# Ensure bot reloads the updated token
self.token = os.getenv("TWITCH_BOT_TOKEN")
if self.token == _before_token:
self.log.critical("Token did not change after refresh! Avoiding refresh loop.")
return False
self.log.info("Twitch token successfully refreshed.")
# Validate the new token
if not await self.validate_token():
self.log.critical("New token is still invalid, re-auth required.")
if not automatic:
await self.prompt_manual_token()
return False
return True # Successful refresh
elif "error" in data and data["error"] == "invalid_grant":
self.log.critical("Refresh token is invalid or expired; manual re-auth required.")
if not automatic:
await self.prompt_manual_token()
return False
else:
self.log.error(f"Unexpected refresh response: {data}")
if not automatic:
await self.prompt_manual_token()
return False
self.log("Twitch token refreshed successfully.")
return True
elif "error" in data and data["error"] == "invalid_grant":
self.log("Refresh token is invalid or expired; manual re-auth required.", "CRITICAL")
if not automatic:
await self.prompt_manual_token()
return False
else:
self.log(f"Unexpected refresh response: {data}", "ERROR")
if not automatic:
await self.prompt_manual_token()
return False
except Exception as e:
self.log(f"Twitch token refresh error: {e}", "ERROR")
if not automatic:
await self.prompt_manual_token()
return False
except Exception as e:
self.log.error(f"Twitch token refresh error: {e}")
if attempt < retries:
self.log.warning(f"Retrying token refresh in 2 seconds... (Attempt {attempt + 1}/{retries})")
await asyncio.sleep(2)
else:
if not automatic:
await self.prompt_manual_token()
return False
async def shutdown_gracefully(self):
"""
Gracefully shuts down the bot, ensuring all resources are cleaned up.
"""
self.log("Closing Twitch bot gracefully...", "INFO")
self.log.info("Closing Twitch bot gracefully...")
try:
await self.close() # Closes TwitchIO bot properly
self.log("Twitch bot closed successfully.", "INFO")
self.log.info("Twitch bot closed successfully.")
except Exception as e:
self.log(f"Error during bot shutdown: {e}", "ERROR")
self.log.error(f"Error during bot shutdown: {e}")
self.log("Bot has been stopped. Please restart it manually.", "FATAL")
self.log.fatal("Bot has been stopped. Please restart it manually.")
async def validate_token(self):
"""
@ -180,25 +194,25 @@ class TwitchBot(commands.Bot):
try:
response = requests.get(url, headers=headers)
self.log(f"Token validation response: {response.status_code}, {response.text}", "DEBUG")
self.log.debug(f"Token validation response: {response.status_code}, {response.text}")
return response.status_code == 200
except Exception as e:
self.log(f"Error during token validation: {e}", "ERROR")
self.log.error(f"Error during token validation: {e}")
return False
async def prompt_manual_token(self):
"""
Prompt the user in-terminal to manually enter a new Twitch access token.
"""
self.log("Prompting user for manual Twitch token input.", "WARNING")
self.log.warning("Prompting user for manual Twitch token input.")
new_token = input("Enter a new valid Twitch access token: ").strip()
if new_token:
self.token = new_token
os.environ["TWITCH_BOT_TOKEN"] = self.token
self.update_env_file()
self.log("New Twitch token entered manually. Please restart the bot.", "INFO")
self.log.info("New Twitch token entered manually. Please restart the bot.")
else:
self.log("No valid token entered. Bot cannot continue.", "FATAL")
self.log.fatal("No valid token entered. Bot cannot continue.")
async def try_refresh_and_reconnect(self) -> bool:
"""
@ -213,12 +227,12 @@ class TwitchBot(commands.Bot):
# If we got here, we have a valid new token.
# We can call self.start() again in the same run.
self.log("Re-initializing the Twitch connection with the new token...", "INFO")
self.log.info("Re-initializing the Twitch connection with the new token...")
self._http.token = self.token # Make sure TwitchIO sees the new token
await self.start()
return True
except Exception as e:
self.log(f"Auto-reconnect failed after token refresh: {e}", "ERROR")
self.log.error(f"Auto-reconnect failed after token refresh: {e}")
return False
@ -239,10 +253,10 @@ class TwitchBot(commands.Bot):
else:
file.write(line)
globals.log("Updated .env file with new Twitch token.")
logger.info("Updated .env file with new Twitch token.")
except Exception as e:
globals.log(f"Failed to update .env file: {e}", "ERROR")
logger.error(f"Failed to update .env file: {e}")
def load_commands(self):
"""
@ -250,7 +264,7 @@ class TwitchBot(commands.Bot):
"""
try:
cmd_twitch.setup(self)
globals.log("Twitch commands loaded successfully.")
logger.info("Twitch commands loaded successfully.")
# Now load the help info from dictionary/help_twitch.json
help_json_path = "dictionary/help_twitch.json"
@ -261,50 +275,51 @@ class TwitchBot(commands.Bot):
)
except Exception as e:
globals.log(f"Error loading Twitch commands: {e}", "ERROR")
logger.error(f"Error loading Twitch commands: {e}")
async def run(self):
"""
Attempt to start the bot once. If token is invalid, refresh it,
then re-instantiate a fresh TwitchBot in the same Python process.
This avoids any manual restarts or external managers.
Attempt to start the bot once. If the token is invalid, refresh it first,
then re-instantiate a fresh TwitchBot within the same Python process.
This avoids manual restarts or external managers.
"""
# Attempt token refresh before starting the bot
refresh_success = await self.refresh_access_token()
if not refresh_success:
self.log.shutdown("Token refresh failed. Shutting down in the same run.")
return
await asyncio.sleep(1) # Give Twitch a moment to recognize the refreshed token
try:
# Normal attempt: just call self.start()
await self.start()
except Exception as e:
self.log(f"Twitch bot failed to start: {e}", "CRITICAL")
self.log.critical(f"Twitch bot failed to start: {e}")
# Check if error is invalid token
if "Invalid or unauthorized Access Token passed." in str(e):
self.log("Attempting token refresh...", "WARNING")
refresh_success = await self.refresh_access_token()
if not refresh_success:
# If refresh truly failed => we can't proceed.
# Log a shutdown, do no external restart.
self.log("Refresh failed. Shutting down in same run. Token is invalid.", "SHUTDOWN")
self.log.warning("Token became invalid after refresh. Attempting another refresh...")
if not await self.refresh_access_token():
self.log.shutdown("Second refresh attempt failed. Shutting down.")
return
# If refresh succeeded, we have a new valid token in .env.
# Now we must forcibly close THIS bot instance.
try:
self.log("Closing old bot instance after refresh...", "DEBUG")
self.log.debug("Closing old bot instance after refresh...")
await self.close()
await asyncio.sleep(1) # give the old WebSocket time to fully close
except Exception as close_err:
self.log(f"Ignored close() error: {close_err}", "DEBUG")
self.log.debug(f"Ignored close() error: {close_err}")
# Create a brand-new instance, referencing the updated token from .env
self.log("Creating a fresh TwitchBot instance with the new token...", "INFO")
from bot_twitch import TwitchBot # Re-import or define
new_bot = TwitchBot() # Re-run __init__, loads new token from environment
self.log.info("Creating a fresh TwitchBot instance with the new token...")
from bot_twitch import TwitchBot
new_bot = TwitchBot()
new_bot.set_db_connection(self.db_conn)
self.log("Starting the new TwitchBot in the same run...", "INFO")
await new_bot.run() # Now call *its* run method
return # Our job is done
else:
# Unknown error => you can either do a SHUTDOWN or ignore
self.log("Could not connect due to an unknown error. Shutting down in same run...", "SHUTDOWN")
self.log.info("Starting the new TwitchBot in the same run in 3 seconds...")
await asyncio.sleep(1) # final delay
await new_bot.run()
return
else:
self.log.shutdown("Could not connect due to an unknown error. Shutting down.")
return

24
bots.py
View File

@ -12,6 +12,8 @@ import twitchio.ext
from discord.ext import commands
from dotenv import load_dotenv
from globals import logger
from bot_discord import DiscordBot
from bot_twitch import TwitchBot
@ -24,7 +26,7 @@ from modules import db, utility
load_dotenv()
# Clear previous current-run logfile
globals.reset_curlogfile()
logger.reset_curlogfile()
# Load bot configuration
config_data = globals.Constants.config_data
@ -37,14 +39,14 @@ async def main():
global discord_bot, twitch_bot, db_conn
# Log initial start
globals.log("--------------- BOT STARTUP ---------------")
logger.info("--------------- BOT STARTUP ---------------")
# Before creating your DiscordBot/TwitchBot, initialize DB
db_conn = globals.init_db_conn()
try: # Ensure FKs are enabled
db.checkenable_db_fk(db_conn)
except Exception as e:
globals.log(f"Unable to ensure Foreign keys are enabled: {e}", "WARNING")
logger.warning(f"Unable to ensure Foreign keys are enabled: {e}")
# auto-create the quotes table if it doesn't exist
tables = {
@ -62,11 +64,11 @@ async def main():
try:
for table, func in tables.items():
func() # Call the function with db_conn and log already provided
globals.log(f"{table} ensured.", "DEBUG")
logger.debug(f"{table} ensured.")
except Exception as e:
globals.log(f"Unable to ensure DB tables exist: {e}", "FATAL")
logger.fatal(f"Unable to ensure DB tables exist: {e}")
globals.log("Initializing bots...")
logger.info("Initializing bots...")
# Create both bots
discord_bot = DiscordBot()
@ -79,11 +81,11 @@ async def main():
try:
discord_bot.set_db_connection(db_conn)
twitch_bot.set_db_connection(db_conn)
globals.log(f"Initialized database connection to both bots")
logger.info(f"Initialized database connection to both bots")
except Exception as e:
globals.log(f"Unable to initialize database connection to one or both bots: {e}", "FATAL")
logger.fatal(f"Unable to initialize database connection to one or both bots: {e}")
globals.log("Starting Discord and Twitch bots...")
logger.info("Starting Discord and Twitch bots...")
discord_task = asyncio.create_task(discord_bot.run(os.getenv("DISCORD_BOT_TOKEN")))
twitch_task = asyncio.create_task(twitch_bot.run())
@ -92,7 +94,7 @@ async def main():
enable_dev_func = False
if enable_dev_func:
dev_func_result = dev_func(db_conn, enable_dev_func)
globals.log(f"dev_func output: {dev_func_result}")
logger.debug(f"dev_func output: {dev_func_result}")
await asyncio.gather(discord_task, twitch_task)
#await asyncio.gather(discord_task)
@ -104,5 +106,5 @@ if __name__ == "__main__":
utility.log_bot_shutdown(db_conn, intent="User Shutdown")
except Exception as e:
error_trace = traceback.format_exc()
globals.log(f"Fatal Error: {e}\n{error_trace}", "FATAL")
logger.fatal(f"Fatal Error: {e}\n{error_trace}")
utility.log_bot_shutdown(db_conn)

View File

@ -6,6 +6,8 @@ import globals
import json
import re
from globals import logger
from modules import db
#def howl(username: str) -> str:
@ -105,7 +107,7 @@ def handle_howl_normal(ctx, platform, author_id, author_display_name) -> str:
user_uuid = user_data["uuid"]
db.insert_howl(db_conn, user_uuid, howl_val)
else:
globals.log(f"Could not find user by ID={author_id} on {platform}. Not storing howl.", "WARNING")
logger.warning(f"Could not find user by ID={author_id} on {platform}. Not storing howl.")
utility.wfetl()
return reply
@ -189,7 +191,7 @@ def lookup_user_by_name(db_conn, platform, name_str):
return ud
else:
globals.log(f"Unknown platform '{platform}' in lookup_user_by_name", "WARNING")
logger.warning(f"Unknown platform '{platform}' in lookup_user_by_name")
utility.wfetl()
return None
@ -240,7 +242,7 @@ def create_quotes_table(db_conn):
"""
utility.wfstl()
if not db_conn:
globals.log("No database connection available to create quotes table!", "FATAL")
logger.fatal("No database connection available to create quotes table!")
utility.wfetl()
return
@ -321,7 +323,7 @@ async def handle_quote_command(db_conn, is_discord: bool, ctx, args, game_name=N
utility.wfetl()
return await retrieve_random_quote(db_conn)
except Exception as e:
globals.log(f"handle_quote_command() failed to retrieve a random quote: {e}", "ERROR", exec_info=True)
logger.error(f"handle_quote_command() failed to retrieve a random quote: {e}", exec_info=True)
sub = args[0].lower()
@ -335,7 +337,7 @@ async def handle_quote_command(db_conn, is_discord: bool, ctx, args, game_name=N
utility.wfetl()
return await add_new_quote(db_conn, is_discord, ctx, quote_text, game_name)
except Exception as e:
globals.log(f"handle_quote_command() failed to add a new quote: {e}", "ERROR", exec_info=True)
logger.error(f"handle_quote_command() failed to add a new quote: {e}", exec_info=True)
elif sub == "remove":
if len(args) < 2:
utility.wfetl()
@ -344,7 +346,7 @@ async def handle_quote_command(db_conn, is_discord: bool, ctx, args, game_name=N
utility.wfetl()
return await remove_quote(db_conn, is_discord, ctx, quote_id_str=args[1])
except Exception as e:
globals.log(f"handle_quote_command() failed to remove a quote: {e}", "ERROR", exec_info=True)
logger.error(f"handle_quote_command() failed to remove a quote: {e}", exec_info=True)
elif sub == "restore":
if len(args) < 2:
utility.wfetl()
@ -353,7 +355,7 @@ async def handle_quote_command(db_conn, is_discord: bool, ctx, args, game_name=N
utility.wfetl()
return await restore_quote(db_conn, is_discord, ctx, quote_id_str=args[1])
except Exception as e:
globals.log(f"handle_quote_command() failed to restore a quote: {e}", "ERROR", exec_info=True)
logger.error(f"handle_quote_command() failed to restore a quote: {e}", exec_info=True)
elif sub == "info":
if len(args) < 2:
utility.wfetl()
@ -366,7 +368,7 @@ async def handle_quote_command(db_conn, is_discord: bool, ctx, args, game_name=N
utility.wfetl()
return await retrieve_quote_info(db_conn, ctx, quote_id, is_discord)
except Exception as e:
globals.log(f"handle_quote_command() failed to retrieve quote info: {e}", "ERROR", exec_info=True)
logger.error(f"handle_quote_command() failed to retrieve quote info: {e}", exec_info=True)
elif sub == "search":
if len(args) < 2:
utility.wfetl()
@ -376,13 +378,13 @@ async def handle_quote_command(db_conn, is_discord: bool, ctx, args, game_name=N
utility.wfetl()
return await search_quote(db_conn, keywords, is_discord, ctx)
except Exception as e:
globals.log(f"handle_quote_command() failed to process quote search: {e}", "ERROR", exec_info=True)
logger.error(f"handle_quote_command() failed to process quote search: {e}", exec_info=True)
elif sub in ["last", "latest", "newest"]:
try:
utility.wfetl()
return await retrieve_latest_quote(db_conn)
except Exception as e:
globals.log(f"handle_quote_command() failed to retrieve latest quote: {e}", "ERROR", exec_info=True)
logger.error(f"handle_quote_command() failed to retrieve latest quote: {e}", exec_info=True)
else:
# Possibly a quote ID
if sub.isdigit():
@ -391,14 +393,14 @@ async def handle_quote_command(db_conn, is_discord: bool, ctx, args, game_name=N
utility.wfetl()
return await retrieve_specific_quote(db_conn, ctx, quote_id, is_discord)
except Exception as e:
globals.log(f"handle_quote_command() failed to retrieve a specific quote: {e}", "ERROR", exec_info=True)
logger.error(f"handle_quote_command() failed to retrieve a specific quote: {e}", exec_info=True)
else:
# unrecognized subcommand => fallback to random
try:
utility.wfetl()
return await retrieve_random_quote(db_conn)
except Exception as e:
globals.log(f"handle_quote_command() failed to retrieve a random quote: {e}", "ERROR", exec_info=True)
logger.error(f"handle_quote_command() failed to retrieve a random quote: {e}", exec_info=True)
async def add_new_quote(db_conn, is_discord, ctx, quote_text, game_name: str = None):
@ -412,7 +414,7 @@ async def add_new_quote(db_conn, is_discord, ctx, quote_text, game_name: str = N
# Lookup UUID from users table
user_data = db.lookup_user(db_conn, identifier=user_id, identifier_type=f"{platform}_user_id")
if not user_data:
globals.log(f"Could not find UUID for user {ctx.author.name} ({user_id}) on {platform}. Quote not saved.", "ERROR")
logger.error(f"Could not find UUID for user {ctx.author.name} ({user_id}) on {platform}. Quote not saved.")
utility.wfetl()
return "Could not save quote. Your user data is missing from the system."
@ -431,7 +433,7 @@ async def add_new_quote(db_conn, is_discord, ctx, quote_text, game_name: str = N
result = db.run_db_operation(db_conn, "write", insert_sql, params)
if result is not None:
quote_id = get_max_quote_id(db_conn)
globals.log(f"New quote added: {quote_text} ({quote_id})")
logger.info(f"New quote added: {quote_text} ({quote_id})")
utility.wfetl()
return f"Successfully added quote #{quote_id}"
else:
@ -455,7 +457,7 @@ async def remove_quote(db_conn, is_discord: bool, ctx, quote_id_str):
# Lookup UUID from users table
user_data = db.lookup_user(db_conn, identifier=user_id, identifier_type=f"{platform}_user_id")
if not user_data:
globals.log(f"Could not find UUID for user {ctx.author.name} ({user_id}) on {platform}. Quote not removed.", "ERROR")
logger.error(f"Could not find UUID for user {ctx.author.name} ({user_id}) on {platform}. Quote not removed.")
utility.wfetl()
return "Could not remove quote. Your user data is missing from the system."
@ -562,7 +564,7 @@ async def retrieve_specific_quote(db_conn, ctx, quote_id, is_discord):
# Lookup UUID from users table for the quoter
user_data = db.lookup_user(db_conn, identifier=quotee, identifier_type="UUID")
if not user_data:
globals.log(f"Could not find platform name for quotee UUID {quotee}. Default to 'Unknown'", "INFO")
logger.info(f"Could not find platform name for quotee UUID {quotee}. Default to 'Unknown'")
quotee_display = "Unknown"
else:
quotee_display = user_data[f"{platform}_user_display_name"]
@ -571,7 +573,7 @@ async def retrieve_specific_quote(db_conn, ctx, quote_id, is_discord):
# Lookup UUID for removed_by if removed
removed_user_data = db.lookup_user(db_conn, identifier=quote_removed_by, identifier_type="UUID")
if not removed_user_data:
globals.log(f"Could not find platform name for remover UUID {quote_removed_by}. Default to 'Unknown'", "INFO")
logger.warning(f"Could not find platform name for remover UUID {quote_removed_by}. Default to 'Unknown'")
quote_removed_by_display = "Unknown"
else:
quote_removed_by_display = removed_user_data[f"{platform}_user_display_name"]
@ -698,7 +700,7 @@ async def retrieve_quote_info(db_conn, ctx, quote_id, is_discord):
# Lookup display name for the quoter
user_data = db.lookup_user(db_conn, identifier=quotee, identifier_type="UUID")
if not user_data:
globals.log(f"Could not find display name for quotee UUID {quotee}.", "INFO")
logger.info(f"Could not find display name for quotee UUID {quotee}.")
quotee_display = "Unknown"
else:
# Use display name or fallback to platform username
@ -716,7 +718,7 @@ async def retrieve_quote_info(db_conn, ctx, quote_id, is_discord):
if quote_removed_by:
removed_user_data = db.lookup_user(db_conn, identifier=quote_removed_by, identifier_type="UUID")
if not removed_user_data:
globals.log(f"Could not find display name for remover UUID {quote_removed_by}.", "INFO")
logger.info(f"Could not find display name for remover UUID {quote_removed_by}.")
quote_removed_by_display = "Unknown"
else:
# Use display name or fallback to platform username
@ -782,7 +784,7 @@ async def search_quote(db_conn, keywords, is_discord, ctx):
func_end = time.time()
func_elapsed = utility.time_since(func_start, func_end, "s")
dbg_lvl = "DEBUG" if func_elapsed[2] < 5 else "WARNING"
globals.log(f"search_quote took {func_elapsed[0]}, {func_elapsed[1]} to complete", dbg_lvl)
logger.info(f"search_quote took {func_elapsed[0]}, {func_elapsed[1]} to complete", dbg_lvl)
utility.wfetl()
return "No quotes are created yet."
best_score = None
@ -801,7 +803,7 @@ async def search_quote(db_conn, keywords, is_discord, ctx):
# Use display name or fallback to platform username
quotee_display = user_data.get("platform_display_name", user_data.get("platform_username", "Unknown"))
else:
globals.log(f"Could not find display name for quotee UUID {quotee}.", "INFO")
logger.info(f"Could not find display name for quotee UUID {quotee}.")
quotee_display = "Unknown"
# For each keyword, check each field.
# Award 2 points for a whole word match and 1 point for a partial match.
@ -830,14 +832,14 @@ async def search_quote(db_conn, keywords, is_discord, ctx):
func_end = time.time()
func_elapsed = utility.time_since(func_start, func_end)
dbg_lvl = "DEBUG" if func_elapsed[2] < 5 else "WARNING"
globals.log(f"search_quote took {func_elapsed[0]}, {func_elapsed[1]} to complete", dbg_lvl)
logger.info(f"search_quote took {func_elapsed[0]}, {func_elapsed[1]} to complete", dbg_lvl)
utility.wfetl()
return "No matching quotes found."
chosen = random.choice(best_quotes)
func_end = time.time()
func_elapsed = utility.time_since(func_start, func_end, "s")
dbg_lvl = "DEBUG" if func_elapsed[2] < 5 else "WARNING"
globals.log(f"search_quote took {func_elapsed[0]}, {func_elapsed[1]} to complete", dbg_lvl)
logger.info(f"search_quote took {func_elapsed[0]}, {func_elapsed[1]} to complete", dbg_lvl)
utility.wfetl()
return f"Quote {chosen[0]}: {chosen[1]}"

View File

@ -17,6 +17,8 @@ import discord
from discord.ext import commands
import datetime
import globals
from globals import logger
from modules.permissions import has_custom_vc_permission
class CustomVCCog(commands.Cog):
@ -88,9 +90,9 @@ class CustomVCCog(commands.Cog):
else:
try:
await self.bot.invoke(ctx) # This will ensure subcommands get processed.
globals.log(f"{ctx.author.name} executed Custom VC subcommand '{ctx.invoked_subcommand}' in {ctx.channel.name}", "DEBUG")
logger.debug(f"{ctx.author.name} executed Custom VC subcommand '{ctx.invoked_subcommand}' in {ctx.channel.name}")
except Exception as e:
globals.log(f"'customvc {ctx.invoked_subcommand}' failed to execute: {e}", "ERROR")
logger.error(f"'customvc {ctx.invoked_subcommand}' failed to execute: {e}")
# Subcommands:
@ -165,7 +167,7 @@ class CustomVCCog(commands.Cog):
category = member.guild.get_channel(self.CUSTOM_VC_CATEGORY_ID)
if not category or not isinstance(category, discord.CategoryChannel):
globals.log("Could not find a valid custom VC category.", "INFO")
logger.info("Could not find a valid custom VC category.")
return
new_vc = await category.create_voice_channel(name=vc_name)
@ -276,7 +278,7 @@ class CustomVCCog(commands.Cog):
await ch.delete()
except:
pass
globals.log(f"Deleted empty leftover channel: {ch.name}", "INFO")
logger.info(f"Deleted empty leftover channel: {ch.name}")
else:
# pick first occupant
first = ch.members[0]
@ -289,7 +291,7 @@ class CustomVCCog(commands.Cog):
"user_limit": None,
"bitrate": None,
}
globals.log(f"Assigned {first.display_name} as owner of leftover VC: {ch.name}", "INFO")
logger.info(f"Assigned {first.display_name} as owner of leftover VC: {ch.name}")
if hasattr(ch, "send"):
try:
await ch.send(

View File

@ -3,6 +3,8 @@
import discord
from discord.ext import commands
import globals
from globals import logger
import cmd_common.common_commands as cc
class QuoteCog(commands.Cog):
@ -19,7 +21,7 @@ class QuoteCog(commands.Cog):
return
args = arg_str.split() if arg_str else []
globals.log(f"'quote' command initiated with arguments: {args}", "DEBUG")
logger.debug(f"'quote' command initiated with arguments: {args}")
result = await cc.handle_quote_command(
db_conn=globals.init_db_conn,
@ -29,7 +31,7 @@ class QuoteCog(commands.Cog):
game_name=None
)
globals.log(f"'quote' result: {result}", "DEBUG")
logger.debug(f"'quote' result: {result}")
if hasattr(result, "to_dict"):
await ctx.reply(embed=result)
else:

View File

@ -72,150 +72,205 @@ def load_settings_file(file: str):
file_path = os.path.join(SETTINGS_PATH, file)
if not os.path.exists(file_path):
log(f"Unable to read the settings file {file}!", "FATAL")
logger.fatal(f"Unable to read the settings file {file}!")
try:
with open(file_path, "r", encoding="utf-8") as f:
config_data = json.load(f)
return config_data
except json.JSONDecodeError as e:
log(f"Error parsing {file}: {e}", "FATAL")
logger.fatal(f"Error parsing {file}: {e}")
###############################
# Simple Logging System
###############################
def log(message: str, level="INFO", exec_info=False, linebreaks=False):
class Logger:
"""
Log a message with the specified log level.
Capable of logging individual levels to the terminal and/or logfile separately.
Can also append traceback information if needed, and is capable of preserving/removing
linebreaks from log messages as needed. Also prepends the calling function name.
Args:
message (str): The message to log.
level (str, optional): Log level of the message. Defaults to "INFO".
exec_info (bool, optional): If True, append traceback information. Defaults to False.
linebreaks (bool, optional): If True, preserve line breaks in the log. Defaults to False.
Available levels:
DEBUG - Information useful for debugging.
INFO - Informational messages.
WARNING - Something happened that may lead to issues.
ERROR - A non-critical error has occurred.
CRITICAL - A critical, but non-fatal, error occurred.
FATAL - Fatal error; program exits after logging this.
RESTART - Graceful restart.
SHUTDOWN - Graceful exit.
See:
config.json for further configuration options under "logging".
Example:
log("An error occured during processing", "ERROR", exec_info=True, linebreaks=False)
Custom logger class to handle different log levels, terminal & file output,
and system events (FATAL, RESTART, SHUTDOWN).
"""
# Hard-coded options/settings (can be expanded as needed)
default_level = "INFO" # Fallback log level
allowed_levels = {"DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL", "FATAL", "RESTART", "SHUTDOWN"}
# Ensure valid level
if level not in allowed_levels:
level = default_level
def __init__(self):
"""
Initializes the Logger with configurations from `config_data`.
"""
self.default_level = "INFO"
self.log_file_path = config_data["logging"]["logfile_path"]
self.current_log_file_path = f"cur_{self.log_file_path}"
self.log_to_terminal = config_data["logging"]["terminal"]["log_to_terminal"]
self.log_to_file = config_data["logging"]["file"]["log_to_file"]
self.enabled_log_levels = set(config_data["logging"]["log_levels"])
# Capture the calling function's name using inspect
try:
caller_frame = inspect.stack()[1]
caller_name = caller_frame.function
except Exception:
caller_name = "Unknown"
# Check if both logging outputs are disabled
if not self.log_to_terminal and not self.log_to_file:
print("!!! WARNING !!! LOGGING DISABLED !!! NO LOGS WILL BE PROVIDED !!!")
# Optionally remove linebreaks if not desired
if not linebreaks:
message = message.replace("\n", " ")
def log(self, message: str, level="INFO", exec_info=False, linebreaks=False):
"""
Logs a message at the specified log level.
# Get current timestamp and uptime
elapsed = time.time() - get_bot_start_time() # Assuming this function is defined elsewhere
from modules import utility
uptime_str, _ = utility.format_uptime(elapsed)
timestamp = time.strftime('%Y-%m-%d %H:%M:%S')
Args:
message (str): The message to log.
level (str, optional): Log level. Defaults to "INFO".
exec_info (bool, optional): Append traceback information if True. Defaults to False.
linebreaks (bool, optional): Preserve line breaks if True. Defaults to False.
"""
from modules.utility import format_uptime
# Prepend dynamic details including the caller name
log_message = f"[{timestamp} - {uptime_str}] [{level}] [Func: {caller_name}] {message}"
level = level.upper()
if level not in self.allowed_levels:
level = self.default_level
# Append traceback if required or for error levels
if exec_info or level in {"ERROR", "CRITICAL", "FATAL"}:
log_message += f"\n{traceback.format_exc()}"
# Capture calling function name
caller_name = self.get_caller_function()
# Read logging settings from the configuration
lfp = config_data["logging"]["logfile_path"] # Log File Path
clfp = f"cur_{lfp}" # Current Log File Path
# Remove line breaks if required
if not linebreaks:
message = message.replace("\n", " ")
if not (config_data["logging"]["terminal"]["log_to_terminal"] or
config_data["logging"]["file"]["log_to_file"]):
print("!!! WARNING !!! CONSOLE AND LOGFILE OUTPUT DISABLED !!!\n"
"!!! NO LOGS WILL BE PROVIDED !!!")
# Timestamp & uptime
elapsed = time.time() - get_bot_start_time() # Assuming this function exists
uptime_str, _ = format_uptime(elapsed)
timestamp = time.strftime('%Y-%m-%d %H:%M:%S')
# Check if this log level is enabled (or if it's a FATAL message which always prints)
if level in config_data["logging"]["log_levels"] or level == "FATAL":
# Terminal output
if config_data["logging"]["terminal"]["log_to_terminal"] or level == "FATAL":
config_level_format = f"log_{level.lower()}"
if config_data["logging"]["terminal"].get(config_level_format, False) or level == "FATAL":
# Format log message
log_message = f"[{timestamp} - {uptime_str}] [{level}] [Func: {caller_name}] {message}"
# Append traceback if needed
if exec_info or level in {"ERROR", "CRITICAL", "FATAL"}:
log_message += f"\n{traceback.format_exc()}"
# Log to terminal
self._log_to_terminal(log_message, level)
# Log to file
self._log_to_file(log_message, level)
# Handle special log levels
self._handle_special_levels(level)
def _log_to_terminal(self, log_message: str, level: str):
"""
Outputs log messages to the terminal if enabled.
"""
if self.log_to_terminal or level == "FATAL":
if config_data["logging"]["terminal"].get(f"log_{level.lower()}", False) or level == "FATAL":
print(log_message)
# File output
if config_data["logging"]["file"]["log_to_file"] or level == "FATAL":
config_level_format = f"log_{level.lower()}"
if config_data["logging"]["file"].get(config_level_format, False) or level == "FATAL":
def _log_to_file(self, log_message: str, level: str):
"""
Writes log messages to a file if enabled.
"""
if self.log_to_file or level == "FATAL":
if config_data["logging"]["file"].get(f"log_{level.lower()}", False) or level == "FATAL":
try:
with open(lfp, "a", encoding="utf-8") as logfile:
with open(self.log_file_path, "a", encoding="utf-8") as logfile:
logfile.write(f"{log_message}\n")
logfile.flush()
with open(clfp, "a", encoding="utf-8") as c_logfile:
with open(self.current_log_file_path, "a", encoding="utf-8") as c_logfile:
c_logfile.write(f"{log_message}\n")
c_logfile.flush()
except Exception as e:
print(f"[WARNING] Failed to write to logfile: {e}")
# Handle fatal errors with shutdown
if level == "FATAL":
print("!!! FATAL ERROR LOGGED, SHUTTING DOWN !!!")
sys.exit(1)
def _handle_special_levels(self, level: str):
"""
Handles special log levels like FATAL, RESTART, and SHUTDOWN.
"""
if level == "FATAL":
print("!!! FATAL ERROR LOGGED, SHUTTING DOWN !!!")
sys.exit(1)
if level == "RESTART":
print("!!! RESTART LOG LEVEL TRIGGERED, EXITING!!!")
sys.exit(0)
if level == "RESTART":
print("!!! RESTART LOG LEVEL TRIGGERED, EXITING !!!")
sys.exit(0)
if level == "SHUTDOWN":
print("!!! SHUTDOWN LOG LEVEL TRIGGERED, EXITING!!!")
sys.exit(0)
if level == "SHUTDOWN":
print("!!! SHUTDOWN LOG LEVEL TRIGGERED, EXITING !!!")
sys.exit(0)
def reset_curlogfile():
"""
Clear the current log file.
def get_caller_function(self):
"""
Retrieves the calling function's name using `inspect`.
"""
try:
caller_frame = inspect.stack()[2]
return caller_frame.function
except Exception:
return "Unknown"
This function constructs the current log file path by prepending 'cur_'
to the log file path specified in the configuration data under the "logging"
section. It then opens the file in write mode, effectively truncating and
clearing its contents.
def reset_curlogfile(self):
"""
Clears the current log file.
"""
try:
open(self.current_log_file_path, "w").close()
except Exception as e:
print(f"[WARNING] Failed to clear current-run logfile: {e}")
If an exception occurs while attempting to clear the log file, the error is
silently ignored.
"""
# Initiate logfile
lfp = config_data["logging"]["logfile_path"] # Log File Path
clfp = f"cur_{lfp}" # Current Log File Path
## Shorter, cleaner methods for each log level
def debug(self, message: str, exec_info=False, linebreaks=False):
"""
Debug message for development troubleshooting.
"""
self.log(message, "DEBUG", exec_info, linebreaks)
try:
open(clfp, "w")
# log(f"Current-run logfile cleared", "DEBUG")
except Exception as e:
# log(f"Failed to clear current-run logfile: {e}")
pass
def info(self, message: str, exec_info=False, linebreaks=False):
"""
General informational messages.
"""
self.log(message, "INFO", exec_info, linebreaks)
def warning(self, message: str, exec_info=False, linebreaks=False):
"""
Warning messages.
Something unusal happened, but shouldn't affect the program overall.
"""
self.log(message, "WARNING", exec_info, linebreaks)
def error(self, message: str, exec_info=True, linebreaks=False):
"""
Error messages.
Something didn't execute properly, but the bot overall should manage.
"""
self.log(message, "ERROR", exec_info, linebreaks)
def critical(self, message: str, exec_info=True, linebreaks=False):
"""
Critical error messages.
Something happened that is very likely to cause unintended behaviour and potential crashes.
"""
self.log(message, "CRITICAL", exec_info, linebreaks)
def fatal(self, message: str, exec_info=True, linebreaks=False):
"""
Fatal error messages.
Something happened that requires the bot to shut down somewhat nicely.
"""
self.log(message, "FATAL", exec_info, linebreaks)
def restart(self, message: str):
"""
Indicate bot restart log event.
"""
self.log(message, "RESTART")
def shutdown(self, message: str):
"""
Indicate bot shutdown log event.
"""
self.log(message, "SHUTDOWN")
# Instantiate Logger globally
logger = Logger()
#
#
#
def init_db_conn():
"""
@ -238,11 +293,11 @@ def init_db_conn():
db_conn = modules.db.init_db_connection(config_data)
if not db_conn:
# If we get None, it means a fatal error occurred.
log("Terminating bot due to no DB connection.", "FATAL")
logger.fatal("Terminating bot due to no DB connection.")
sys.exit(1)
return db_conn
except Exception as e:
log(f"Unable to initialize database!: {e}", "FATAL")
logger.fatal(f"Unable to initialize database!: {e}")
return None
class Constants:
@ -278,7 +333,7 @@ class Constants:
primary_guild_int = None
if not sync_commands_globally:
log("Discord commands sync set to single-guild in config")
logger.info("Discord commands sync set to single-guild in config")
primary_guild_int = int(config_data["discord_guilds"][0]) if len(config_data["discord_guilds"]) > 0 else None
if primary_guild_int:
primary_guild_object = discord.Object(id=primary_guild_int)

View File

@ -6,6 +6,7 @@ import sqlite3
import uuid
import globals
from globals import logger
try:
import mariadb
@ -25,12 +26,12 @@ def checkenable_db_fk(db_conn):
cursor.execute("PRAGMA foreign_keys = ON;")
cursor.close()
db_conn.commit()
globals.log("Enabled foreign key support in SQLite (PRAGMA foreign_keys=ON).", "DEBUG")
logger.debug("Enabled foreign key support in SQLite (PRAGMA foreign_keys=ON).")
except Exception as e:
globals.log(f"Failed to enable foreign key support in SQLite: {e}", "WARNING")
logger.warning(f"Failed to enable foreign key support in SQLite: {e}")
else:
# For MariaDB/MySQL, they're typically enabled with InnoDB
globals.log("Assuming DB is MariaDB/MySQL with FKs enabled", "DEBUG")
logger.debug("Assuming DB is MariaDB/MySQL with FKs enabled")
def init_db_connection(config):
@ -65,27 +66,27 @@ def init_db_connection(config):
port=port
)
conn.autocommit = False # We'll manage commits manually
globals.log(f"Database connection established using MariaDB (host={host}, db={dbname}).")
logger.info(f"Database connection established using MariaDB (host={host}, db={dbname}).")
return conn
except mariadb.Error as e:
globals.log(f"Error connecting to MariaDB: {e}", "WARNING")
logger.warning(f"Error connecting to MariaDB: {e}")
else:
globals.log("MariaDB config incomplete. Falling back to SQLite...", "WARNING")
logger.warning("MariaDB config incomplete. Falling back to SQLite...")
else:
if use_mariadb and mariadb is None:
globals.log("mariadb module not installed but use_mariadb=True. Falling back to SQLite...", "WARNING")
logger.warning("mariadb module not installed but use_mariadb=True. Falling back to SQLite...")
# Fallback to local SQLite
sqlite_path = db_settings.get("sqlite_path", "local_database.sqlite")
try:
conn = sqlite3.connect(sqlite_path)
globals.log(f"Database connection established using local SQLite: {sqlite_path}", "DEBUG")
logger.debug(f"Database connection established using local SQLite: {sqlite_path}")
return conn
except sqlite3.Error as e:
globals.log(f"Could not open local SQLite database '{sqlite_path}': {e}", "WARNING")
logger.warning(f"Could not open local SQLite database '{sqlite_path}': {e}")
# If neither MariaDB nor SQLite connected, that's fatal for the bot
globals.log("No valid database connection could be established! Exiting...", "FATAL")
logger.fatal("No valid database connection could be established! Exiting...")
return None
@ -105,8 +106,8 @@ def run_db_operation(conn, operation, query, params=None):
- Always use parameterized queries wherever possible to avoid injection.
"""
if conn is None:
if globals.log:
globals.log("run_db_operation called but no valid DB connection!", "FATAL")
if logger:
logger.fatal("run_db_operation called but no valid DB connection!")
return None
if params is None:
@ -118,18 +119,18 @@ def run_db_operation(conn, operation, query, params=None):
# Check for multiple statements separated by semicolons (beyond the last one)
if lowered.count(";") > 1:
if globals.log:
globals.log("Query blocked: multiple SQL statements detected.", "WARNING")
globals.log(f"Offending query: {query}", "WARNING")
if logger:
logger.warning("Query blocked: multiple SQL statements detected.")
logger.warning(f"Offending query: {query}")
return None
# Potentially dangerous SQL keywords
forbidden_keywords = ["drop table", "union select", "exec ", "benchmark(", "sleep("]
for kw in forbidden_keywords:
if kw in lowered:
if globals.log:
globals.log(f"Query blocked due to forbidden keyword: '{kw}'", "WARNING")
globals.log(f"Offending query: {query}", "WARNING")
if logger:
logger.warning(f"Query blocked due to forbidden keyword: '{kw}'")
logger.warning(f"Offending query: {query}")
return None
cursor = conn.cursor()
@ -140,15 +141,15 @@ def run_db_operation(conn, operation, query, params=None):
write_ops = ("write", "insert", "update", "delete", "change")
if operation.lower() in write_ops:
conn.commit()
if globals.log:
globals.log(f"DB operation '{operation}' committed.", "DEBUG")
if logger:
logger.debug(f"DB operation '{operation}' committed.")
# If the query is an INSERT, return the last inserted row ID
if query.strip().lower().startswith("insert"):
try:
return cursor.lastrowid
except Exception as e:
if globals.log:
globals.log(f"Error retrieving lastrowid: {e}", "ERROR")
if logger:
logger.error(f"Error retrieving lastrowid: {e}")
return cursor.rowcount
else:
return cursor.rowcount
@ -163,8 +164,8 @@ def run_db_operation(conn, operation, query, params=None):
except Exception as e:
# Rollback on any error
conn.rollback()
if globals.log:
globals.log(f"Error during '{operation}' query execution: {e}", "ERROR")
if logger:
logger.error(f"Error during '{operation}' query execution: {e}")
return None
finally:
cursor.close()
@ -204,11 +205,11 @@ def ensure_quotes_table(db_conn):
rows = run_db_operation(db_conn, "read", check_sql)
if rows and rows[0] and rows[0][0]:
# The table 'quotes' already exists
globals.log("Table 'quotes' already exists, skipping creation.", "DEBUG")
logger.debug("Table 'quotes' already exists, skipping creation.")
return # We can just return
# 3) Table does NOT exist => create it
globals.log("Table 'quotes' does not exist; creating now...")
logger.info("Table 'quotes' does not exist; creating now...")
if is_sqlite:
create_table_sql = """
@ -247,10 +248,10 @@ def ensure_quotes_table(db_conn):
if result is None:
# If run_db_operation returns None on error, handle or raise:
error_msg = "Failed to create 'quotes' table!"
globals.log(error_msg, "CRITICAL")
logger.critical(error_msg)
raise RuntimeError(error_msg)
globals.log("Successfully created table 'quotes'.")
logger.info("Successfully created table 'quotes'.")
#######################
# Ensure 'users' table
@ -272,7 +273,7 @@ def ensure_users_table(db_conn):
rows = run_db_operation(db_conn, "read", check_sql)
if rows and rows[0]:
globals.log("Table 'Users' already exists, checking for column updates.", "DEBUG")
logger.debug("Table 'Users' already exists, checking for column updates.")
# Ensure 'last_seen' column exists
column_check_sql = "PRAGMA table_info(Users)" if is_sqlite else """
@ -281,7 +282,7 @@ def ensure_users_table(db_conn):
"""
columns = run_db_operation(db_conn, "read", column_check_sql)
if not any("last_seen" in col for col in columns):
globals.log("Adding 'last_seen' column to 'Users'...", "INFO")
logger.info("Adding 'last_seen' column to 'Users'...")
alter_sql = "ALTER TABLE Users ADD COLUMN last_seen TEXT DEFAULT CURRENT_TIMESTAMP" if is_sqlite else """
ALTER TABLE Users ADD COLUMN last_seen DATETIME DEFAULT CURRENT_TIMESTAMP
"""
@ -289,7 +290,7 @@ def ensure_users_table(db_conn):
return
globals.log("Table 'Users' does not exist; creating now...", "INFO")
logger.info("Table 'Users' does not exist; creating now...")
create_sql = """
CREATE TABLE Users (
@ -314,10 +315,10 @@ def ensure_users_table(db_conn):
result = run_db_operation(db_conn, "write", create_sql)
if result is None:
error_msg = "Failed to create 'Users' table!"
globals.log(error_msg, "CRITICAL")
logger.critical(error_msg)
raise RuntimeError(error_msg)
globals.log("Successfully created table 'Users'.", "INFO")
logger.info("Successfully created table 'Users'.")
#######################
@ -347,7 +348,7 @@ def ensure_platform_mapping_table(db_conn):
rows = run_db_operation(db_conn, "read", check_sql)
if rows and rows[0] and rows[0][0]:
globals.log("Table 'Platform_Mapping' already exists, checking for column updates.", "DEBUG")
logger.debug("Table 'Platform_Mapping' already exists, checking for column updates.")
# Check if last_seen column exists
column_check_sql = """
@ -360,7 +361,7 @@ def ensure_platform_mapping_table(db_conn):
# If column doesn't exist, add it
if not any("last_seen" in col for col in columns):
globals.log("Adding 'last_seen' column to 'Platform_Mapping'...", "INFO")
logger.info("Adding 'last_seen' column to 'Platform_Mapping'...")
alter_sql = """
ALTER TABLE Platform_Mapping ADD COLUMN last_seen TEXT DEFAULT CURRENT_TIMESTAMP
""" if is_sqlite else """
@ -370,7 +371,7 @@ def ensure_platform_mapping_table(db_conn):
return
globals.log("Table 'Platform_Mapping' does not exist; creating now...", "INFO")
logger.info("Table 'Platform_Mapping' does not exist; creating now...")
if is_sqlite:
create_sql = """
@ -402,10 +403,10 @@ def ensure_platform_mapping_table(db_conn):
result = run_db_operation(db_conn, "write", create_sql)
if result is None:
error_msg = "Failed to create 'Platform_Mapping' table!"
globals.log(error_msg, "CRITICAL")
logger.critical(error_msg)
raise RuntimeError(error_msg)
globals.log("Successfully created table 'Platform_Mapping'.", "INFO")
logger.info("Successfully created table 'Platform_Mapping'.")
########################
@ -443,7 +444,7 @@ def lookup_user(db_conn, identifier: str, identifier_type: str, target_identifie
PRINT_QUERY_DEBUG = False
# Debug: Log the inputs
if PRINT_QUERY_DEBUG: globals.log(f"lookup_user() called with: identifier='{identifier}', identifier_type='{identifier_type}', target_identifier='{target_identifier}'", "DEBUG")
if PRINT_QUERY_DEBUG: logger.debug(f"lookup_user() called with: identifier='{identifier}', identifier_type='{identifier_type}', target_identifier='{target_identifier}'")
# Normalize identifier_type to lowercase
identifier_type = identifier_type.lower()
@ -471,7 +472,7 @@ def lookup_user(db_conn, identifier: str, identifier_type: str, target_identifie
}
if identifier_type not in valid_identifier_types:
globals.log(f"lookup_user error: invalid identifier_type '{identifier_type}'", "WARNING")
logger.warning(f"lookup_user error: invalid identifier_type '{identifier_type}'")
return None
column_to_lookup = valid_identifier_types[identifier_type]
@ -504,17 +505,17 @@ def lookup_user(db_conn, identifier: str, identifier_type: str, target_identifie
query += " LIMIT 1"
# Debug: Log the query and parameters
if PRINT_QUERY_DEBUG: globals.log(f"lookup_user() executing query: {query} with params={params}", "DEBUG")
if PRINT_QUERY_DEBUG: logger.debug(f"lookup_user() executing query: {query} with params={params}")
# Run the query
rows = run_db_operation(db_conn, "read", query, tuple(params))
# Debug: Log the result of the query
if PRINT_QUERY_DEBUG: globals.log(f"lookup_user() query result: {rows}", "DEBUG")
if PRINT_QUERY_DEBUG: logger.debug(f"lookup_user() query result: {rows}")
# Handle no result case
if not rows:
globals.log(f"lookup_user: No user found for {identifier_type}='{identifier}'", "INFO")
logger.info(f"lookup_user: No user found for {identifier_type}='{identifier}'")
return None
# Convert the row to a dictionary
@ -532,19 +533,19 @@ def lookup_user(db_conn, identifier: str, identifier_type: str, target_identifie
}
# Debug: Log the constructed user data
if PRINT_QUERY_DEBUG: globals.log(f"lookup_user() constructed user_data: {user_data}", "DEBUG")
if PRINT_QUERY_DEBUG: logger.debug(f"lookup_user() constructed user_data: {user_data}")
# If target_identifier is provided, return just that value
if target_identifier:
target_identifier = target_identifier.lower()
if target_identifier in user_data:
if PRINT_QUERY_DEBUG: globals.log(f"lookup_user() returning target_identifier='{target_identifier}' with value='{user_data[target_identifier]}'", "DEBUG")
if PRINT_QUERY_DEBUG: logger.debug(f"lookup_user() returning target_identifier='{target_identifier}' with value='{user_data[target_identifier]}'")
return user_data[target_identifier]
else:
globals.log(f"lookup_user error: target_identifier '{target_identifier}' not found in user_data. Available keys: {list(user_data.keys())}", "WARNING")
logger.warning(f"lookup_user error: target_identifier '{target_identifier}' not found in user_data. Available keys: {list(user_data.keys())}")
return None
if PRINT_QUERY_DEBUG: globals.log(f"lookup_user() returning full user_data: {user_data}", "DEBUG")
if PRINT_QUERY_DEBUG: logger.debug(f"lookup_user() returning full user_data: {user_data}")
return user_data
def user_lastseen(db_conn, UUID: str, platform_name: str = None, platform_user_id: str | int = None, lookup: bool = False, update: bool = False):
@ -557,7 +558,7 @@ def user_lastseen(db_conn, UUID: str, platform_name: str = None, platform_user_i
- Otherwise, it applies to all accounts unified under the UUI system.
"""
if not UUID:
globals.log("UUID is required for user_lastseen()", "ERROR")
logger.error("UUID is required for user_lastseen()")
return None
if lookup:
@ -579,7 +580,7 @@ def user_lastseen(db_conn, UUID: str, platform_name: str = None, platform_user_i
params = (UUID,) if not platform_name or not platform_user_id else (UUID, platform_name, str(platform_user_id))
run_db_operation(db_conn, "write", update_sql, params)
globals.log(f"Updated last_seen timestamp for UUID={UUID}", "DEBUG")
logger.debug(f"Updated last_seen timestamp for UUID={UUID}")
if lookup:
if result and result[0]:
@ -609,11 +610,11 @@ def ensure_chatlog_table(db_conn):
rows = run_db_operation(db_conn, "read", check_sql)
if rows and rows[0] and rows[0][0]:
globals.log("Table 'chat_log' already exists, skipping creation.", "DEBUG")
logger.debug("Table 'chat_log' already exists, skipping creation.")
return
# Table does not exist, create it
globals.log("Table 'chat_log' does not exist; creating now...", "INFO")
logger.info("Table 'chat_log' does not exist; creating now...")
create_sql = """
CREATE TABLE chat_log (
@ -644,10 +645,10 @@ def ensure_chatlog_table(db_conn):
result = run_db_operation(db_conn, "write", create_sql)
if result is None:
error_msg = "Failed to create 'chat_log' table!"
globals.log(error_msg, "CRITICAL")
logger.critical(error_msg)
raise RuntimeError(error_msg)
globals.log("Successfully created table 'chat_log'.", "INFO")
logger.info("Successfully created table 'chat_log'.")
@ -663,7 +664,7 @@ def log_message(db_conn, identifier, identifier_type, message_content, platform,
# Get UUID using lookup_user
user_data = lookup_user(db_conn, identifier, identifier_type)
if not user_data:
globals.log(f"User not found for {identifier_type}='{identifier}'", "WARNING")
logger.warning(f"User not found for {identifier_type}='{identifier}'")
return
user_uuid = user_data["uuid"]
@ -673,7 +674,7 @@ def log_message(db_conn, identifier, identifier_type, message_content, platform,
requires_message_id = platform.startswith("discord") or platform == "twitch"
if requires_message_id and not platform_message_id:
globals.log(f"Warning: Platform '{platform}' usually requires a message ID, but none was provided.", "WARNING")
logger.warning(f"Warning: Platform '{platform}' usually requires a message ID, but none was provided.")
if attachments is None or not "https://" in attachments:
attachments = ""
@ -693,9 +694,9 @@ def log_message(db_conn, identifier, identifier_type, message_content, platform,
rowcount = run_db_operation(db_conn, "write", insert_sql, params)
if rowcount and rowcount > 0:
globals.log(f"Logged message for UUID={user_uuid} with Message UUID={message_uuid}.", "DEBUG")
logger.debug(f"Logged message for UUID={user_uuid} with Message UUID={message_uuid}.")
else:
globals.log("Failed to log message in 'chat_log'.", "ERROR")
logger.error("Failed to log message in 'chat_log'.")
def ensure_userhowls_table(db_conn):
@ -723,10 +724,10 @@ def ensure_userhowls_table(db_conn):
rows = run_db_operation(db_conn, "read", check_sql)
if rows and rows[0] and rows[0][0]:
globals.log("Table 'user_howls' already exists, skipping creation.", "DEBUG")
logger.debug("Table 'user_howls' already exists, skipping creation.")
return
globals.log("Table 'user_howls' does not exist; creating now...", "INFO")
logger.info("Table 'user_howls' does not exist; creating now...")
if is_sqlite:
create_sql = """
@ -752,10 +753,10 @@ def ensure_userhowls_table(db_conn):
result = run_db_operation(db_conn, "write", create_sql)
if result is None:
err_msg = "Failed to create 'user_howls' table!"
globals.log(err_msg, "ERROR")
logger.error(err_msg)
raise RuntimeError(err_msg)
globals.log("Successfully created table 'user_howls'.", "INFO")
logger.info("Successfully created table 'user_howls'.")
def insert_howl(db_conn, user_uuid, howl_value):
"""
@ -769,9 +770,9 @@ def insert_howl(db_conn, user_uuid, howl_value):
params = (user_uuid, howl_value)
rowcount = run_db_operation(db_conn, "write", sql, params)
if rowcount and rowcount > 0:
globals.log(f"Recorded a {howl_value}% howl for UUID={user_uuid}.", "DEBUG")
logger.debug(f"Recorded a {howl_value}% howl for UUID={user_uuid}.")
else:
globals.log(f"Failed to record {howl_value}% howl for UUID={user_uuid}.", "ERROR")
logger.error(f"Failed to record {howl_value}% howl for UUID={user_uuid}.")
def get_howl_stats(db_conn, user_uuid):
"""
@ -849,10 +850,10 @@ def ensure_discord_activity_table(db_conn):
rows = run_db_operation(db_conn, "read", check_sql)
if rows and rows[0]:
globals.log("Table 'discord_activity' already exists, skipping creation.", "DEBUG")
logger.debug("Table 'discord_activity' already exists, skipping creation.")
return
globals.log("Creating 'discord_activity' table...", "INFO")
logger.info("Creating 'discord_activity' table...")
if is_sqlite:
create_sql = """
@ -892,12 +893,12 @@ def ensure_discord_activity_table(db_conn):
try:
result = run_db_operation(db_conn, "write", create_sql)
except Exception as e:
globals.log(f"Unable to create the table: discord_activity: {e}")
logger.error(f"Unable to create the table: discord_activity: {e}")
if result is None:
globals.log("Failed to create 'discord_activity' table!", "CRITICAL")
logger.critical("Failed to create 'discord_activity' table!")
raise RuntimeError("Database table creation failed.")
globals.log("Successfully created table 'discord_activity'.", "INFO")
logger.info("Successfully created table 'discord_activity'.")
def log_discord_activity(db_conn, guild_id, user_identifier, action, voice_channel, action_detail=None):
@ -908,7 +909,7 @@ def log_discord_activity(db_conn, guild_id, user_identifier, action, voice_chann
# Resolve UUID using the new Platform_Mapping
user_data = lookup_user(db_conn, user_identifier, identifier_type="discord_user_id")
if not user_data:
globals.log(f"User not found for Discord ID: {user_identifier}", "WARNING")
logger.warning(f"User not found for Discord ID: {user_identifier}")
return
user_uuid = user_data["uuid"]
@ -942,7 +943,7 @@ def log_discord_activity(db_conn, guild_id, user_identifier, action, voice_chann
try:
dt = datetime.datetime.strptime(dt_str, "%Y-%m-%d %H:%M:%S")
except Exception as e:
globals.log(f"Error parsing datetime '{dt_str}': {e}", "ERROR")
logger.error(f"Error parsing datetime '{dt_str}': {e}")
continue
normalized_existing = normalize_detail(detail)
if normalized_existing == normalized_new:
@ -956,7 +957,7 @@ def log_discord_activity(db_conn, guild_id, user_identifier, action, voice_chann
if last_same is not None:
if (last_different is None) or (last_same > last_different):
if now - last_same < DUPLICATE_THRESHOLD:
globals.log(f"Duplicate {action} event for {user_uuid} within threshold; skipping log.", "DEBUG")
logger.debug(f"Duplicate {action} event for {user_uuid} within threshold; skipping log.")
return
# Insert the new event
@ -972,9 +973,9 @@ def log_discord_activity(db_conn, guild_id, user_identifier, action, voice_chann
if rowcount and rowcount > 0:
detail_str = f" ({action_detail})" if action_detail else ""
globals.log(f"Logged Discord activity for UUID={user_uuid} in Guild {guild_id}: {action}{detail_str}", "DEBUG")
logger.debug(f"Logged Discord activity for UUID={user_uuid} in Guild {guild_id}: {action}{detail_str}")
else:
globals.log("Failed to log Discord activity.", "ERROR")
logger.error("Failed to log Discord activity.")
def ensure_bot_events_table(db_conn):
@ -991,10 +992,10 @@ def ensure_bot_events_table(db_conn):
rows = run_db_operation(db_conn, "read", check_sql)
if rows and rows[0]:
globals.log("Table 'bot_events' already exists, skipping creation.", "DEBUG")
logger.debug("Table 'bot_events' already exists, skipping creation.")
return
globals.log("Creating 'bot_events' table...", "INFO")
logger.info("Creating 'bot_events' table...")
# Define SQL Schema
create_sql = """
@ -1016,10 +1017,10 @@ def ensure_bot_events_table(db_conn):
# Create the table
result = run_db_operation(db_conn, "write", create_sql)
if result is None:
globals.log("Failed to create 'bot_events' table!", "CRITICAL")
logger.critical("Failed to create 'bot_events' table!")
raise RuntimeError("Database table creation failed.")
globals.log("Successfully created table 'bot_events'.", "INFO")
logger.info("Successfully created table 'bot_events'.")
def log_bot_event(db_conn, event_type, event_details):
"""
@ -1033,9 +1034,9 @@ def log_bot_event(db_conn, event_type, event_details):
rowcount = run_db_operation(db_conn, "write", sql, params)
if rowcount and rowcount > 0:
globals.log(f"Logged bot event: {event_type} - {event_details}", "DEBUG")
logger.debug(f"Logged bot event: {event_type} - {event_details}")
else:
globals.log("Failed to log bot event.", "ERROR")
logger.error("Failed to log bot event.")
def get_event_summary(db_conn, time_span="7d"):
"""
@ -1058,7 +1059,7 @@ def get_event_summary(db_conn, time_span="7d"):
}
if time_span not in time_mappings:
globals.log(f"Invalid time span '{time_span}', defaulting to '7d'", "WARNING")
logger.warning(f"Invalid time span '{time_span}', defaulting to '7d'")
time_span = "7d"
# Define SQL query
@ -1095,10 +1096,10 @@ def ensure_link_codes_table(db_conn):
rows = run_db_operation(db_conn, "read", check_sql)
if rows and rows[0]:
globals.log("Table 'link_codes' already exists, skipping creation.", "DEBUG")
logger.debug("Table 'link_codes' already exists, skipping creation.")
return
globals.log("Creating 'link_codes' table...", "INFO")
logger.info("Creating 'link_codes' table...")
create_sql = """
CREATE TABLE link_codes (
@ -1118,16 +1119,16 @@ def ensure_link_codes_table(db_conn):
result = run_db_operation(db_conn, "write", create_sql)
if result is None:
globals.log("Failed to create 'link_codes' table!", "CRITICAL")
logger.critical("Failed to create 'link_codes' table!")
raise RuntimeError("Database table creation failed.")
globals.log("Successfully created table 'link_codes'.", "INFO")
logger.info("Successfully created table 'link_codes'.")
def merge_uuid_data(db_conn, old_uuid, new_uuid):
"""
Merges data from old UUID to new UUID, updating references in Platform_Mapping.
"""
globals.log(f"Merging UUID data: {old_uuid} -> {new_uuid}", "INFO")
logger.info(f"Merging UUID data: {old_uuid} -> {new_uuid}")
# Update references in Platform_Mapping
update_mapping_sql = """
@ -1145,14 +1146,14 @@ def merge_uuid_data(db_conn, old_uuid, new_uuid):
for table in tables_to_update:
sql = f"UPDATE {table} SET UUID = ? WHERE UUID = ?"
rowcount = run_db_operation(db_conn, "update", sql, (new_uuid, old_uuid))
globals.log(f"Updated {rowcount} rows in {table} (transferred {old_uuid} -> {new_uuid})", "DEBUG")
logger.debug(f"Updated {rowcount} rows in {table} (transferred {old_uuid} -> {new_uuid})")
# Finally, delete the old UUID from Users table
delete_sql = "DELETE FROM Users WHERE UUID = ?"
rowcount = run_db_operation(db_conn, "write", delete_sql, (old_uuid,))
globals.log(f"Deleted old UUID {old_uuid} from 'Users' table ({rowcount} rows affected)", "INFO")
logger.info(f"Deleted old UUID {old_uuid} from 'Users' table ({rowcount} rows affected)")
globals.log(f"UUID merge complete: {old_uuid} -> {new_uuid}", "INFO")
logger.info(f"UUID merge complete: {old_uuid} -> {new_uuid}")
def ensure_community_events_table(db_conn):
@ -1181,10 +1182,10 @@ def ensure_community_events_table(db_conn):
from modules.db import run_db_operation
rows = run_db_operation(db_conn, "read", check_sql)
if rows and rows[0] and rows[0][0]:
globals.log("Table 'community_events' already exists, skipping creation.", "DEBUG")
logger.debug("Table 'community_events' already exists, skipping creation.")
return
globals.log("Table 'community_events' does not exist; creating now...", "DEBUG")
logger.debug("Table 'community_events' does not exist; creating now...")
if is_sqlite:
create_table_sql = """
CREATE TABLE community_events (
@ -1212,9 +1213,9 @@ def ensure_community_events_table(db_conn):
result = run_db_operation(db_conn, "write", create_table_sql)
if result is None:
error_msg = "Failed to create 'community_events' table!"
globals.log(error_msg, "CRITICAL")
logger.critical(error_msg)
raise RuntimeError(error_msg)
globals.log("Successfully created table 'community_events'.", "DEBUG")
logger.debug("Successfully created table 'community_events'.")
async def handle_community_event(db_conn, is_discord, ctx, args):
@ -1247,7 +1248,7 @@ async def handle_community_event(db_conn, is_discord, ctx, args):
# Get UUID using lookup_user()
user_data = lookup_user(db_conn, identifier=user_id, identifier_type=f"{platform.lower()}_user_id")
if not user_data:
globals.log(f"User not found: {ctx.author.name} ({user_id}) on {platform}", "ERROR")
logger.error(f"User not found: {ctx.author.name} ({user_id}) on {platform}")
return "Could not log event: user data missing."
user_uuid = user_data["uuid"]
@ -1266,7 +1267,7 @@ async def handle_community_event(db_conn, is_discord, ctx, args):
result = run_db_operation(db_conn, "write", insert_sql, params)
if result is not None:
globals.log(f"New event added: {event_type} by {ctx.author.name}", "DEBUG")
logger.debug(f"New event added: {event_type} by {ctx.author.name}")
return f"Successfully logged event: {event_type}"
else:
return "Failed to log event."

View File

@ -5,6 +5,8 @@ import os
import globals
import discord
from globals import logger
PERMISSIONS_FILE = "permissions.json"
TWITCH_CONFIG_FILE = "settings/twitch_channels_config.json"
@ -20,7 +22,7 @@ def load_json_file(file_path):
with open(file_path, "r", encoding="utf-8") as file:
return json.load(file)
except json.JSONDecodeError as e:
globals.log(f"Error parsing JSON file {file_path}: {e}", "ERROR")
logger.error(f"Error parsing JSON file {file_path}: {e}")
return {}
def load_permissions():

View File

@ -14,6 +14,8 @@ from functools import wraps
import globals
from globals import logger
try:
# 'regex' on PyPI supports `\p{L}`, `\p{N}`, etc.
import regex
@ -469,7 +471,7 @@ def initialize_help_data(bot, help_json_path, is_discord):
platform_name = "Discord" if is_discord else "Twitch"
if not os.path.exists(help_json_path):
globals.log(f"Help file '{help_json_path}' not found. No help data loaded.", "WARNING")
logger.warning(f"Help file '{help_json_path}' not found. No help data loaded.")
bot.help_data = {}
return
@ -478,7 +480,7 @@ def initialize_help_data(bot, help_json_path, is_discord):
with open(help_json_path, "r", encoding="utf-8") as f:
data = json.load(f)
except Exception as e:
globals.log(f"Error parsing help JSON '{help_json_path}': {e}", "ERROR")
logger.error(f"Error parsing help JSON '{help_json_path}': {e}")
data = {}
bot.help_data = data
@ -486,7 +488,7 @@ def initialize_help_data(bot, help_json_path, is_discord):
# Now cross-check the loaded commands vs. the data
loaded_cmds = set(get_loaded_commands(bot, is_discord))
if "commands" not in data:
globals.log(f"No 'commands' key in {help_json_path}, skipping checks.", "ERROR")
logger.error(f"No 'commands' key in {help_json_path}, skipping checks.")
return
file_cmds = set(data["commands"].keys())
@ -494,12 +496,12 @@ def initialize_help_data(bot, help_json_path, is_discord):
# 1) Commands in file but not loaded
missing_cmds = file_cmds - loaded_cmds
for cmd in missing_cmds:
globals.log(f"Help file has '{cmd}', but it's not loaded on this {platform_name} bot (deprecated?).", "WARNING")
logger.warning(f"Help file has '{cmd}', but it's not loaded on this {platform_name} bot (deprecated?).")
# 2) Commands loaded but not in file
needed_cmds = loaded_cmds - file_cmds
for cmd in needed_cmds:
globals.log(f"Command '{cmd}' is loaded on {platform_name} but no help info is provided in {help_json_path}.", "WARNING")
logger.warning(f"Command '{cmd}' is loaded on {platform_name} but no help info is provided in {help_json_path}.")
@ -530,9 +532,9 @@ def get_loaded_commands(bot, is_discord):
try:
_bot_type = str(type(bot)).split("_")[1].split(".")[0]
globals.log(f"Currently processing commands for {_bot_type} ...", "DEBUG")
logger.debug(f"Currently processing commands for {_bot_type} ...")
except Exception as e:
globals.log(f"Unable to determine current bot type: {e}", "WARNING")
logger.warning(f"Unable to determine current bot type: {e}")
# For Discord
if is_discord:
@ -540,22 +542,26 @@ def get_loaded_commands(bot, is_discord):
# 'bot.commands' is a set of Command objects.
for cmd_obj in bot.commands:
commands_list.append(cmd_obj.name)
debug_level = "DEBUG" if len(commands_list) > 0 else "WARNING"
globals.log(f"Discord commands body: {commands_list}", f"{debug_level}")
if len(commands_list) > 0:
logger.debug(f"Discord commands body: {commands_list}")
else:
logger.warning(f"Discord commands body: {commands_list}")
except Exception as e:
globals.log(f"Error retrieving Discord commands: {e}", "ERROR")
logger.error(f"Error retrieving Discord commands: {e}")
elif not is_discord:
try:
for cmd_obj in bot._commands:
commands_list.append(cmd_obj)
debug_level = "DEBUG" if len(commands_list) > 0 else "WARNING"
globals.log(f"Twitch commands body: {commands_list}", f"{debug_level}")
if len(commands_list) > 0:
logger.debug(f"Twitch commands body: {commands_list}")
else:
logger.warning(f"Twitch commands body: {commands_list}")
except Exception as e:
globals.log(f"Error retrieving Twitch commands: {e}", "ERROR")
logger.error(f"Error retrieving Twitch commands: {e}")
else:
globals.log(f"Unable to determine platform in 'get_loaded_commands()'!", "CRITICAL")
logger.critical(f"Unable to determine platform in 'get_loaded_commands()'!")
globals.log(f"... Finished processing commands for {_bot_type} ...", "DEBUG")
logger.debug(f"... Finished processing commands for {_bot_type} ...")
return sorted(commands_list)
@ -699,7 +705,7 @@ def track_user_activity(
valid_platforms = {"discord", "twitch"}
if platform not in valid_platforms:
globals.log(f"Unknown platform '{platform}' in track_user_activity!", "WARNING")
logger.warning(f"Unknown platform '{platform}' in track_user_activity!")
return
# Look up user by platform-specific ID in Platform_Mapping
@ -715,7 +721,7 @@ def track_user_activity(
# Update user last-seen timestamp
user_lastseen(db_conn, UUID=user_uuid, platform_name=platform, platform_user_id=user_id, update=True)
last_seen = user_lastseen(db_conn, UUID=user_uuid, platform_name=platform, platform_user_id=user_id, lookup=True)
globals.log(f"Updated last-seen datetime to {last_seen}", "DEBUG")
logger.debug(f"Updated last-seen datetime to {last_seen}")
if user_data["platform_username"] != username:
need_update = True
@ -737,7 +743,7 @@ def track_user_activity(
rowcount = run_db_operation(db_conn, "update", update_sql, params)
if rowcount and rowcount > 0:
globals.log(f"Updated {platform.capitalize()} user '{username}' (display '{display_name}') in Platform_Mapping.", "DEBUG")
logger.debug(f"Updated {platform.capitalize()} user '{username}' (display '{display_name}') in Platform_Mapping.")
return
# If user was not found in Platform_Mapping, check Users table
@ -757,9 +763,9 @@ def track_user_activity(
rowcount = run_db_operation(db_conn, "write", insert_mapping_sql, params)
if rowcount and rowcount > 0:
globals.log(f"Created new user entry for {platform} user '{username}' (display '{display_name}') with UUID={user_uuid}.", "DEBUG")
logger.debug(f"Created new user entry for {platform} user '{username}' (display '{display_name}') with UUID={user_uuid}.")
else:
globals.log(f"Failed to create user entry for {platform} user '{username}'.", "ERROR")
logger.error(f"Failed to create user entry for {platform} user '{username}'.")
from modules.db import log_bot_event
@ -820,7 +826,7 @@ def time_since(start, end=None, format=None):
"""
# Ensure a start time is provided.
if start is None:
globals.log("time_since() lacks a start value!", "ERROR")
logger.error("time_since() lacks a start value!")
return None
# If no end time is provided, use the current time.
@ -829,7 +835,7 @@ def time_since(start, end=None, format=None):
# Default to seconds if format is not valid.
if format not in ["s", "m", "h", "d"]:
globals.log("time_since() has incorrect format string. Defaulting to 's'", "WARNING")
logger.warning("time_since() has incorrect format string. Defaulting to 's'")
format = "s"
# Compute the total elapsed time in seconds.
@ -870,7 +876,7 @@ def wfstl():
Writes the calling function to log under the DEBUG category.
"""
caller_function_name = inspect.currentframe().f_back.f_code.co_name
globals.log(f"Function {caller_function_name} started processing", "DEBUG")
logger.debug(f"Function {caller_function_name} started processing")
def wfetl():
"""
@ -878,7 +884,7 @@ def wfetl():
Writes the calling function to log under the DEBUG category.
"""
caller_function_name = inspect.currentframe().f_back.f_code.co_name
globals.log(f"Function {caller_function_name} finished processing", "DEBUG")
logger.debug(f"Function {caller_function_name} finished processing")
async def get_guild_info(bot: discord.Client, guild_id: Union[int, str]) -> dict:
"""
@ -959,7 +965,7 @@ async def get_current_twitch_game(bot, channel_name: str) -> str:
# Depending on your TwitchIO version, the attribute may be `game_name` or `game`.
game_name = getattr(stream, 'game_name')
game_id = getattr(stream, 'game_id')
globals.log(f"'get_current_twitch_game()' result for Twitch channel '{channel_name}': Game ID: {game_id}, Game Name: {game_name}", "DEBUG")
logger.debug(f"'get_current_twitch_game()' result for Twitch channel '{channel_name}': Game ID: {game_id}, Game Name: {game_name}")
return game_name
return ""
@ -978,9 +984,7 @@ def list_channels(self):
channels_list = [channel.name for channel in self.connected_channels]
connected_channels_str = ", ".join(channels_list)
num_connected_channels = len(channels_list)
globals.log(
f"Currently connected to {num_connected_channels} Twitch channel(s): {connected_channels_str}"
)
logger.info(f"Currently connected to {num_connected_channels} Twitch channel(s): {connected_channels_str}")
from functools import wraps
import globals
@ -997,7 +1001,7 @@ def command_allowed_twitch(func):
# Block command if it's not allowed in the channel
if not is_command_allowed_twitch(command_name, channel_name):
globals.log(f"Command '{command_name}' is blocked in '{channel_name}'.", "WARNING")
logger.warning(f"Command '{command_name}' is blocked in '{channel_name}'.")
return
return await func(ctx, *args, **kwargs)