Bug tracking, minor tweaks

- Moved some constants to the globals.Constants class
  - config_data: returns the configuration dictionary
  - bot_start_time: returns the epoch bot start time
  - primary_discord_guild: returns a dict like `{"object": primary_guild_object, "id": primary_guild_int}`
  *These is initiated under globals.constants*
- Improved on docstrings. Google format now standard
- Reverted all commands to purely textual until further notice
  - `/help` is still additionally available as a slash command
kami_dev
Kami 2025-02-12 23:14:40 +01:00
parent 1b141c10fb
commit 01f002600c
7 changed files with 649 additions and 324 deletions

View File

@ -11,11 +11,14 @@ import modules
import modules.utility import modules.utility
from modules.db import log_message, lookup_user, log_bot_event from modules.db import log_message, lookup_user, log_bot_event
primary_guild = globals.constants.primary_discord_guild()["object"]
class DiscordBot(commands.Bot): class DiscordBot(commands.Bot):
def __init__(self, config): def __init__(self):
super().__init__(command_prefix="!", intents=discord.Intents.all()) super().__init__(command_prefix="!", intents=discord.Intents.all())
self.remove_command("help") # Remove built-in help function self.remove_command("help") # Remove built-in help function
self.config = config self.config = globals.constants.config_data
self.log = globals.log # Use the logging function from bots.py self.log = globals.log # Use the logging function from bots.py
self.db_conn = None # We'll set this later self.db_conn = None # We'll set this later
self.help_data = None # We'll set this later self.help_data = None # We'll set this later
@ -58,8 +61,29 @@ class DiscordBot(commands.Bot):
is_discord=True is_discord=True
) )
except Exception as e: except Exception as e:
globals.log(f"Error loading Discord commands: {e}", "ERROR", True) _result = f"Error loading Discord commands: {e}"
globals.log(_result, "ERROR", True)
@commands.command(name="cmd_reload")
@commands.is_owner()
async def cmd_reload(self, ctx: commands.Context):
try:
importlib.reload(cmd_discord) # Reload the commands file
cmd_discord.setup(self) # Ensure commands are registered
# Load help info
help_json_path = "dictionary/help_discord.json"
modules.utility.initialize_help_data(
bot=self,
help_json_path=help_json_path,
is_discord=True
)
_result = "Commands reloaded successfully"
globals.log("Discord commands reloaded successfully.")
except Exception as e:
_result = f"Error reloading Discord commands: {e}"
globals.log(_result, "ERROR", True)
await ctx.reply(_result)
async def on_message(self, message): async def on_message(self, message):
globals.log(f"Message detected by '{message.author.name}' in '{message.author.guild.name}' - #'{message.channel.name}'", "DEBUG") globals.log(f"Message detected by '{message.author.name}' in '{message.author.guild.name}' - #'{message.channel.name}'", "DEBUG")
@ -128,6 +152,30 @@ class DiscordBot(commands.Bot):
globals.log(f"Command '{ctx.command}' (Discord) initiated by {ctx.author} in #{channel_name}", "DEBUG") globals.log(f"Command '{ctx.command}' (Discord) initiated by {ctx.author} in #{channel_name}", "DEBUG")
if len(_cmd_args) > 1: globals.log(f"!{ctx.command} arguments: {_cmd_args}", "DEBUG") if len(_cmd_args) > 1: globals.log(f"!{ctx.command} arguments: {_cmd_args}", "DEBUG")
async def on_interaction(interaction: discord.Interaction):
# Only log application command (slash command) interactions.
if interaction.type == discord.InteractionType.application_command:
# Get the command name from the interaction data.
command_name = interaction.data.get("name")
# Get the options (arguments) if any.
options = interaction.data.get("options", [])
# Convert options to a list of values or key-value pairs.
option_values = [f'{opt.get("name")}: {opt.get("value")}' for opt in options]
# Determine the channel name (or DM).
if interaction.channel and hasattr(interaction.channel, "name"):
channel_name = interaction.channel.name
else:
channel_name = "Direct Message"
globals.log(
f"Command '{command_name}' (Discord) initiated by {interaction.user} in #{channel_name}",
"DEBUG"
)
if option_values:
globals.log(f"Command '{command_name}' arguments: {option_values}", "DEBUG")
async def on_ready(self): async def on_ready(self):
"""Runs when the bot successfully logs in.""" """Runs when the bot successfully logs in."""
# Sync Slash Commands # Sync Slash Commands
@ -136,9 +184,19 @@ class DiscordBot(commands.Bot):
#await self.tree.sync() #await self.tree.sync()
#globals.log("Discord slash commands synced.") #globals.log("Discord slash commands synced.")
primary_guild_int = int(self.config["discord_guilds"][0]) primary_guild_int = int(self.config["discord_guilds"][0])
primary_guild = discord.Object(id=primary_guild_int) num_guilds = len(self.config["discord_guilds"])
await self.tree.sync(guild=primary_guild) cmd_tree_result = (await self.tree.sync(guild=primary_guild))
globals.log(f"Discord slash commands force synced to guild: {primary_guild_int}") command_names = [command.name for command in cmd_tree_result] if cmd_tree_result else None
try:
guild_info = await modules.utility.get_guild_info(self, primary_guild_int)
primary_guild_name = guild_info["name"]
except Exception as e:
primary_guild_name = f"{primary_guild_int} (id)"
globals.log(f"Guild lookup failed: {e}", "ERROR")
_log_message = f"{num_guilds} guilds (global)" if num_guilds > 1 else f"guild: {primary_guild_name}"
globals.log(f"Discord slash commands force synced to {_log_message}")
globals.log(f"Discord slash commands that got synced: {command_names}")
except Exception as e: except Exception as e:
globals.log(f"Unable to sync Discord slash commands: {e}") globals.log(f"Unable to sync Discord slash commands: {e}")

View File

@ -12,14 +12,16 @@ import modules
import modules.utility import modules.utility
from modules.db import log_message, lookup_user, log_bot_event from modules.db import log_message, lookup_user, log_bot_event
twitch_channels = globals.constants.config_data["twitch_channels"]
class TwitchBot(commands.Bot): class TwitchBot(commands.Bot):
def __init__(self, config): def __init__(self):
self.client_id = os.getenv("TWITCH_CLIENT_ID") self.client_id = os.getenv("TWITCH_CLIENT_ID")
self.client_secret = os.getenv("TWITCH_CLIENT_SECRET") self.client_secret = os.getenv("TWITCH_CLIENT_SECRET")
self.token = os.getenv("TWITCH_BOT_TOKEN") self.token = os.getenv("TWITCH_BOT_TOKEN")
self.refresh_token = os.getenv("TWITCH_REFRESH_TOKEN") self.refresh_token = os.getenv("TWITCH_REFRESH_TOKEN")
self.log = globals.log # Use the logging function from bots.py self.log = globals.log # Use the logging function from bots.py
self.config = config self.config = globals.constants.config_data
self.db_conn = None # We'll set this self.db_conn = None # We'll set this
self.help_data = None # We'll set this later self.help_data = None # We'll set this later
@ -27,7 +29,7 @@ class TwitchBot(commands.Bot):
super().__init__( super().__init__(
token=self.token, token=self.token,
prefix="!", prefix="!",
initial_channels=config["twitch_channels"] initial_channels=twitch_channels
) )
globals.log("Twitch bot initiated") globals.log("Twitch bot initiated")

View File

@ -26,7 +26,7 @@ load_dotenv()
globals.reset_curlogfile() globals.reset_curlogfile()
# Load bot configuration # Load bot configuration
config_data = globals.load_config_file() config_data = globals.Constants.config_data
############################### ###############################
# Main Event Loop # Main Event Loop
@ -66,8 +66,8 @@ async def main():
globals.log("Initializing bots...") globals.log("Initializing bots...")
# Create both bots # Create both bots
discord_bot = DiscordBot(config_data) discord_bot = DiscordBot()
twitch_bot = TwitchBot(config_data) twitch_bot = TwitchBot()
# Log startup # Log startup
utility.log_bot_startup(db_conn) utility.log_bot_startup(db_conn)

View File

@ -1,204 +1,72 @@
# cmd_discord.py # cmd_discord.py
import discord import discord
from discord.ext import commands from discord.ext import commands
from discord import app_commands
from typing import Optional from typing import Optional
from discord import app_commands
from cmd_common import common_commands as cc from cmd_common import common_commands as cc
from modules.permissions import has_permission from modules.permissions import has_permission
from modules.utility import handle_help_command from modules.utility import handle_help_command
import globals import globals
import os
import json
import random
# Retrieve primary guild info if needed (for logging or other purposes)
primary_guild = globals.constants.primary_discord_guild() # e.g., {"object": discord.Object(id=1234567890), "id": 1234567890}
def setup(bot): def setup(bot):
""" """
Attach commands to the Discord bot, store references to db/log. Attach commands to the Discord bot and register textual commands.
This function loads configuration data, then registers several commands:
- sync_commands: (OWNER ONLY) Manually syncs the bots command tree.
- available: Lists commands available to the user.
- help: Provides detailed help information for commands.
- greet: Makes the bot greet the user.
- ping: Checks the bot's uptime and latency.
- howl: Makes the bot attempt a howl.
- hi: A development command to test permission handling.
- quote: Interact with the quotes system (multiple subcommands supported).
All commands here are invoked with your text command prefix (e.g. "!")
""" """
config_data = globals.load_config_file() config_data = globals.load_config_file()
@bot.command()
@commands.is_owner()
async def sync_commands(ctx):
"""
Trigger manual command sync. OWNER ONLY
"""
if commands.is_owner():
primary_guild_int = int(config_data["discord_guilds"][0])
primary_guild = discord.Object(id=primary_guild_int)
await ctx.send(f"Initiating slash sync to Discord Guild '{primary_guild_int}' ...")
try:
await bot.tree.sync(guild=primary_guild)
reply = "... Commands synced!"
except Exception as e:
reply = f"... Commands failed to sync! Error message:\n{e}"
globals.log(f"'sync_commands' failed to sync command tree\n{e}", "ERROR")
else:
reply = "You're not the registered owner of me!"
await ctx.send(reply)
@bot.hybrid_command(name="available", description="List commands available to you")
async def available(ctx):
available_cmds = []
for command in bot.commands:
try:
# This will return True if the command's checks pass for the given context.
if await command.can_run(ctx):
available_cmds.append(command.name)
except commands.CheckFailure:
# The command's checks did not pass.
pass
except Exception as e:
# In case some commands fail unexpectedly during checks.
globals.log(f"Error checking command {command.name}: {e}", "ERROR")
if available_cmds:
await ctx.send("Available commands: " + ", ".join(sorted(available_cmds)))
else:
await ctx.send("No commands are available to you at this time.")
@bot.hybrid_command(name="help", description="Get information about commands")
@app_commands.describe(
command="The command to get help info about. Defaults to 'help'"
)
async def cmd_help(ctx: commands.Context, *, command: str = ""):
"""
e.g. !help
!help quote
"""
result = await handle_help_command(ctx, command, bot, is_discord=True)
await ctx.send(result)
@bot.hybrid_command(name="greet", description="Make me greet you")
async def cmd_greet(ctx):
result = cc.greet(ctx.author.display_name, "Discord")
await ctx.send(result)
@bot.hybrid_command(name="ping", description="Check my uptime")
async def cmd_ping(ctx):
result = cc.ping()
# Get heartbeat latency. Discord only
latency = round(float(bot.latency) * 1000)
result += f" (*latency: {latency}ms*)"
await ctx.send(result)
@bot.hybrid_command(name="howl", description="Attempt a howl")
async def cmd_howl(ctx):
response = cc.handle_howl_command(ctx)
await ctx.send(response)
# @monitor_cmds(bot.log)
# @bot.hybrid_command(name="reload", description="Dynamically reload commands (INOP)")
# async def cmd_reload(ctx):
# """ Dynamically reloads Discord commands. """
# try:
# import cmd_discord
# import importlib
# importlib.reload(cmd_discord)
# cmd_discord.setup(bot)
# await ctx.send("Commands reloaded on first try!")
# except Exception as e:
# try:
# await bot.reload_extension("cmd_discord")
# await ctx.send("Commands reloaded on second try!")
# except Exception as e:
# try:
# await bot.unload_extension("cmd_discord")
# await bot.load_extension("cmd_discord")
# await ctx.send("Commands reloaded on third try!")
# except Exception as e:
# await ctx.send(f"Fallback reload failed: {e}")
@bot.hybrid_command(name="hi", description="Dev command for testing permissions system")
async def cmd_hi(ctx):
user_id = str(ctx.author.id)
user_roles = [role.name.lower() for role in ctx.author.roles] # Normalize to lowercase
if not has_permission("hi", user_id, user_roles, "discord"):
await ctx.send("You don't have permission to use this command.")
return
await ctx.send("Hello there!")
# @monitor_cmds(bot.log)
# @bot.hybrid_command(name="quote", description="Interact with the quotes system")
# async def cmd_quote(ctx, query: str = None):
# """
# !quote
# !quote add <text>
# !quote remove <id>
# !quote <id>
# """
# if not bot.db_conn:
# return await ctx.send("Database is unavailable, sorry.")
# args = query.split()
# # Send to our shared logic
# await cc.handle_quote_command(
# db_conn=bot.db_conn,
# log_func=bot.log,
# is_discord=True,
# ctx=ctx,
# args=list(args),
# get_twitch_game_for_channel=None # None for Discord
# )
# @monitor_cmds(bot.log)
# @bot.hybrid_group(name="quote", description="Interact with the quotes system", with_app_command=True)
# async def cmd_quote(ctx, query: str = None):
# """
# Usage:
# !quote -> get a random quote
# !quote <id> -> get a specific quote by number
# As a slash command, leave the query blank for a random quote or type a number.
# """
# if not bot.db_conn:
# return await ctx.send("Database is unavailable, sorry.")
# # Only process the base command if no subcommand was invoked.
# # When query is provided, split it into arguments (for a specific quote lookup).
# args = query.split() if query else []
# await cc.handle_quote_command(
# db_conn=bot.db_conn,
# log_func=bot.log,
# is_discord=True,
# ctx=ctx,
# args=args,
# get_twitch_game_for_channel=None # None for Discord
# )
# -------------------------------------------------------------------------
# TEXTUAL COMMAND: quote
# -------------------------------------------------------------------------
@bot.command(name="quote") @bot.command(name="quote")
async def cmd_quote(ctx, *, arg_str: str = ""): async def cmd_quote_text(ctx, *, arg_str: str = ""):
""" """
Handles the !quote command with multiple subcommands. Handle the '!quote' command with multiple subcommands.
Usage: Usage:
- `!quote` - !quote
-> Retrieves a random (non-removed) quote. -> Retrieves a random (non-removed) quote.
- `!quote <number>` - !quote <number>
-> Retrieves the specific quote by ID. -> Retrieves a specific quote by its ID.
- `!quote add <quote text>` - !quote add <quote text>
-> Adds a new quote and replies with its quote number. -> Adds a new quote and replies with its quote number.
- `!quote remove <number>` - !quote remove <number>
-> Removes the specified quote. -> Removes the specified quote.
- `!quote restore <number>` - !quote restore <number>
-> Restores a previously removed quote. -> Restores a previously removed quote.
- `!quote info <number>` - !quote info <number>
-> Displays stored information about the quote (as an embed on Discord). -> Displays stored information about the quote.
- `!quote search [keywords]` - !quote search [keywords]
-> Searches for the best matching quote based on the provided keywords. -> Searches for the best matching quote.
- `!quote last/latest/newest` - !quote latest
-> Retrieves the latest (most recent) non-removed quote. -> Retrieves the latest (most recent) non-removed quote.
""" """
if not globals.init_db_conn: if not globals.init_db_conn:
await ctx.send("Database is unavailable, sorry.") await ctx.reply("Database is unavailable, sorry.")
return return
# Parse the arguments from the message text
args = arg_str.split() if arg_str else [] args = arg_str.split() if arg_str else []
globals.log(f"'quote' command initiated with arguments: {args}", "DEBUG") globals.log(f"'quote' command initiated with arguments: {args}", "DEBUG")
result = await cc.handle_quote_command( result = await cc.handle_quote_command(
db_conn=globals.init_db_conn, db_conn=globals.init_db_conn,
is_discord=True, is_discord=True,
@ -206,22 +74,82 @@ def setup(bot):
args=args, args=args,
get_twitch_game_for_channel=None get_twitch_game_for_channel=None
) )
globals.log(f"'quote' result: {result}", "DEBUG") globals.log(f"'quote' result: {result}", "DEBUG")
# If the result is a discord.Embed, send it as an embed; otherwise, send plain text.
if hasattr(result, "to_dict"): if hasattr(result, "to_dict"):
await ctx.send(embed=result) await ctx.reply(embed=result)
else: else:
await ctx.send(result) await ctx.reply(result)
###################### # -------------------------------------------------------------------------
# The following log entry must be last in the file to verify commands loading as they should # TEXTUAL COMMAND: howl
###################### # -------------------------------------------------------------------------
# Debug: Print that commands are being registered @bot.command(name="howl")
async def cmd_howl_text(ctx):
"""
Handle the '!howl' command.
Usage:
- !howl
-> Attempts a howl.
- !howl stat <user>
-> Looks up howling stats for a user (defaults to self if not provided).
"""
result = cc.handle_howl_command(ctx)
await ctx.reply(result)
# -------------------------------------------------------------------------
# TEXTUAL COMMAND: help
# -------------------------------------------------------------------------
@bot.command(name="help")
async def cmd_help_text(ctx, *, command: str = ""):
"""
Get help information about commands.
Usage:
- !help
-> Provides a list of all commands with brief descriptions.
- !help <command>
-> Provides detailed help information for the specified command.
"""
result = await handle_help_command(ctx, command, bot, is_discord=True)
await ctx.reply(result)
# -------------------------------------------------------------------------
# SLASH COMMAND: help
# -------------------------------------------------------------------------
@bot.tree.command(name="help", description="Get information about commands", guild=primary_guild["object"])
@app_commands.describe(command="The command to get help info about. Defaults to 'help'")
async def cmd_help_slash(interaction: discord.Interaction, command: Optional[str] = ""):
result = await handle_help_command(interaction, command, bot, is_discord=True)
await interaction.response.send_message(result)
# -------------------------------------------------------------------------
# TEXTUAL COMMAND: ping
# -------------------------------------------------------------------------
@bot.command(name="ping")
async def cmd_ping_text(ctx):
"""
Check the bot's uptime and latency.
Usage:
- !ping
-> Returns the bot's uptime along with its latency in milliseconds.
"""
result = cc.ping()
latency = round(float(bot.latency) * 1000)
result += f" (*latency: {latency}ms*)"
await ctx.reply(result)
# -------------------------------------------------------------------------
# Final logging: list registered commands.
# -------------------------------------------------------------------------
try: try:
command_names = [cmd.name for cmd in bot.commands] # Extract command names command_names = [cmd.name for cmd in bot.commands] # Extract command names
globals.log(f"Registering commands for Discord: {command_names}", "DEBUG") globals.log(f"Registering commands for Discord: {command_names}", "DEBUG")
except Exception as e: except Exception as e:
globals.log(f"An error occured while printing registered commands for Discord: {e}", "WARNING") globals.log(f"An error occurred while printing registered commands for Discord: {e}", "WARNING")

View File

@ -1,5 +1,5 @@
{ {
"discord_guilds": [896713616089309184, 1011543769344135168], "discord_guilds": [896713616089309184],
"twitch_channels": ["OokamiKunTV", "ookamipup"], "twitch_channels": ["OokamiKunTV", "ookamipup"],
"command_modules": ["cmd_discord", "cmd_twitch", "cmd_common"], "command_modules": ["cmd_discord", "cmd_twitch", "cmd_common"],
"logging": { "logging": {

View File

@ -2,15 +2,39 @@ import time
import json import json
import sys import sys
import traceback import traceback
import discord
# Store the start time globally # Store the start time globally.
_bot_start_time = time.time() _bot_start_time = time.time()
def get_bot_start_time(): def get_bot_start_time():
"""Retrieve the bot's start time globally.""" """
Retrieve the bot's start time.
This function returns the Unix timestamp (in seconds) when the bot was started.
The timestamp is stored in the global variable `_bot_start_time`, which is set
when the module is first imported.
Returns:
float: The Unix timestamp representing the bot's start time.
"""
return _bot_start_time return _bot_start_time
def load_config_file(): def load_config_file():
"""
Load the configuration file.
This function attempts to read the JSON configuration from 'config.json'
in the current directory and return its contents as a dictionary. If the
file is not found or if the file contains invalid JSON, an error message
is printed and the program terminates with a non-zero exit code.
Returns:
dict: The configuration data loaded from 'config.json'.
Raises:
SystemExit: If 'config.json' is missing or cannot be parsed.
"""
CONFIG_PATH = "config.json" CONFIG_PATH = "config.json"
try: try:
with open(CONFIG_PATH, "r") as f: with open(CONFIG_PATH, "r") as f:
@ -23,6 +47,7 @@ def load_config_file():
print(f"Error parsing config.json: {e}") print(f"Error parsing config.json: {e}")
sys.exit(1) sys.exit(1)
# Load configuration file # Load configuration file
config_data = load_config_file() config_data = load_config_file()
@ -30,19 +55,32 @@ config_data = load_config_file()
# Simple Logging System # Simple Logging System
############################### ###############################
def log(message, level="INFO", exec_info=False): def log(message: str, level="INFO", exec_info=False, linebreaks=False):
""" """
A simple logging function with adjustable log levels. Log a message with the specified log level.
Logs messages in a structured format.
Available levels:\n Capable of logging individual levels to the terminal and/or logfile separately.
DEBUG = Information useful for debugging\n Can also append traceback information if needed, and is capable of preserving/removing linebreaks from log messages as needed.
INFO = Informational messages\n
WARNING = Something happened that may lead to issues\n Args:
ERROR = A non-critical error has happened\n message (str): The message to log.
CRITICAL = A critical, but non-fatal, error\n level (str, optional): Log level of the message. Defaults to "INFO".
FATAL = Fatal error. Program exits after logging this\n\n exec_info (bool, optional): If True, append traceback information. Defaults to False.
See 'config.json' for disabling/enabling logging levels linebreaks (bool, optional): If True, preserve line breaks in the log. Defaults to False.
Available levels:
DEBUG - Information useful for debugging.
INFO - Informational messages.
WARNING - Something happened that may lead to issues.
ERROR - A non-critical error has occurred.
CRITICAL - A critical, but non-fatal, error occurred.
FATAL - Fatal error; program exits after logging this.
See:
config.json for further configuration options under "logging".
Example:
log("An error occured during processing", "ERROR", exec_info=True, linebreaks=False)
""" """
# Initiate logfile # Initiate logfile
@ -99,26 +137,88 @@ def log(message, level="INFO", exec_info=False):
sys.exit(1) sys.exit(1)
def reset_curlogfile(): def reset_curlogfile():
"""
Clear the current log file.
This function constructs the current log file path by prepending 'cur_'
to the log file path specified in the configuration data under the "logging"
section. It then opens the file in write mode, effectively truncating and
clearing its contents.
If an exception occurs while attempting to clear the log file, the error is
silently ignored.
"""
# Initiate logfile # Initiate logfile
lfp = config_data["logging"]["logfile_path"] # Log File Path lfp = config_data["logging"]["logfile_path"] # Log File Path
clfp = f"cur_{lfp}" # Current Log File Path clfp = f"cur_{lfp}" # Current Log File Path
try: try:
open(clfp, "w") open(clfp, "w")
#log(f"Current-run logfile cleared", "DEBUG") # log(f"Current-run logfile cleared", "DEBUG")
except Exception as e: except Exception as e:
#log(f"Failed to clear current-run logfile: {e}") # log(f"Failed to clear current-run logfile: {e}")
pass pass
def init_db_conn(): def init_db_conn():
"""
Initialize and return a database connection.
This function reads the configuration settings and attempts to establish a
connection to the database by invoking `modules.db.init_db_connection()`. If
no valid connection is obtained (i.e. if the connection is None), it logs a
fatal error and terminates the program using sys.exit(1). If an exception is
raised during the initialization process, the error is logged and the function
returns None.
Returns:
DatabaseConnection or None: A valid database connection object if
successfully established; otherwise, None (or the program may exit if the
connection is missing).
"""
try: try:
import modules.db import modules.db
db_conn = modules.db.init_db_connection(config_data) db_conn = modules.db.init_db_connection(config_data)
if not db_conn: if not db_conn:
# If we get None, it means FATAL. We might sys.exit(1) or handle it differently. # If we get None, it means a fatal error occurred.
log("Terminating bot due to no DB connection.", "FATAL") log("Terminating bot due to no DB connection.", "FATAL")
sys.exit(1) sys.exit(1)
return db_conn return db_conn
except Exception as e: except Exception as e:
log(f"Unable to initialize database!: {e}", "FATAL") log(f"Unable to initialize database!: {e}", "FATAL")
return None return None
class Constants:
@property
def config_data(self) -> dict:
"""Returns a dictionary of the contents of the config.json config file"""
return load_config_file()
def bot_start_time(self) -> float:
"""Returns the bot epoch start time"""
return _bot_start_time
def primary_discord_guild(self) -> object | None:
"""
Retrieve the primary Discord guild from the configuration.
This function attempts to obtain the primary Discord guild based on the
configuration data stored in `config_data["discord_guilds"]`. It converts the first
guild ID in the list to an integer and then creates a `discord.Object` from it. If the
configuration defines more than one (or fewer than the expected number of) guilds, the function
returns `None` for the guild ID.
Returns:
dict: A dictionary with the following keys:
- "object": A `discord.Object` representing the primary Discord guild if exactly one
guild is defined; otherwise, `None`.
- "id": The integer ID of the primary guild if available; otherwise, `None`.
"""
primary_guild_object = None
primary_guild_int = int(config_data["discord_guilds"][0]) if len(config_data["discord_guilds"]) == 1 else None
if primary_guild_int:
primary_guild_object = discord.Object(id=primary_guild_int)
return_dict = {"object": primary_guild_object, "id": primary_guild_int}
return return_dict
constants = Constants()

View File

@ -6,6 +6,7 @@ import re
import functools import functools
import inspect import inspect
import uuid import uuid
from typing import Union
from modules.db import run_db_operation, lookup_user, log_message from modules.db import run_db_operation, lookup_user, log_message
import modules.utility as utility import modules.utility as utility
import discord import discord
@ -156,23 +157,39 @@ DICTIONARY_PATH = "dictionary/" # Path to dictionary files
def format_uptime(seconds: float) -> tuple[str, int]: def format_uptime(seconds: float) -> tuple[str, int]:
""" """
Convert seconds into a human-readable string: Convert a duration in seconds into a human-readable string using up to two significant time units.
- Example outputs:
"32 minutes" This function breaks down the given number of seconds into larger time units (years, months,
"8 days, 4 hours" days, hours, minutes, and seconds) and returns a string representing the duration using only the
"1 year, 3 months" two most significant non-zero units. For example:
- Returns a tuple: - 192000 seconds might be formatted as "2 days, 5 hours"
(Human-readable string, total seconds) - 32 minutes will be formatted as "32 minutes"
- 37000000 seconds could be formatted as "1 year, 3 months"
Args:
seconds (float): The total number of seconds to convert.
Returns:
tuple(str, int):
A tuple containing:
- A human-readable string representing the duration (using up to two time units).
- The original duration in seconds as an integer.
Examples:
>>> format_uptime(192000)
('2 days, 5 hours', 192000)
>>> format_uptime(60)
('1 minute', 60)
""" """
seconds = int(seconds) # Ensure integer seconds seconds = int(seconds) # Ensure integer seconds
seconds_int = seconds seconds_int = seconds
# Define time units # Define time units as tuples of (unit name, seconds per unit)
units = [ units = [
("year", 31536000), # 365 days ("year", 31536000), # 365 days
("month", 2592000), # 30 days ("month", 2592000), # 30 days
("day", 86400), # 24 hours ("day", 86400), # 24 hours
("hour", 3600), # 60 minutes ("hour", 3600), # 60 minutes
("minute", 60), ("minute", 60),
("second", 1) ("second", 1)
] ]
@ -184,18 +201,28 @@ def format_uptime(seconds: float) -> tuple[str, int]:
if value > 0: if value > 0:
time_values.append(f"{value} {unit_name}{'s' if value > 1 else ''}") # Auto pluralize time_values.append(f"{value} {unit_name}{'s' if value > 1 else ''}") # Auto pluralize
# Return only the **two most significant** time units (e.g., "3 days, 4 hours") # Return only the two most significant time units (e.g., "3 days, 4 hours")
return (", ".join(time_values[:2]), seconds_int) if time_values else ("0 seconds", 0) return (", ".join(time_values[:2]), seconds_int) if time_values else ("0 seconds", 0)
def get_random_reply(dictionary_name: str, category: str, **variables) -> str: def get_random_reply(dictionary_name: str, category: str, **variables) -> str:
""" """
Fetches a random string from a given dictionary and category. Fetch a random reply from a specified dictionary file and category.
Supports variable substitution using keyword arguments.
:param dictionary_name: The name of the dictionary file (without .json) This function loads a JSON filenamed after the given dictionary (without the .json extension)
:param category: The category (key) inside the dictionary to fetch a response from and located in the DICTIONARY_PATH directoryand retrieves a random reply from the list of
:param variables: Keyword arguments to replace placeholders in the string responses under the specified category. It then substitutes any placeholders in the reply
:return: A formatted string with the variables replaced string with the provided keyword arguments. If the file does not exist, the JSON is invalid,
or the category is missing or not a list, the function returns an appropriate error message.
Args:
dictionary_name (str): The base name of the dictionary file (without the .json extension).
category (str): The key within the JSON file representing the category of replies.
**variables: Arbitrary keyword arguments for substituting placeholders in the reply string.
Returns:
str: A formatted reply string with the variables substituted, or an error message if the
file, category, or JSON data is invalid.
""" """
file_path = os.path.join(DICTIONARY_PATH, f"{dictionary_name}.json") file_path = os.path.join(DICTIONARY_PATH, f"{dictionary_name}.json")
@ -219,6 +246,7 @@ def get_random_reply(dictionary_name: str, category: str, **variables) -> str:
# Replace placeholders with provided variables # Replace placeholders with provided variables
return response.format(**variables) return response.format(**variables)
############################## ##############################
# Basic sanitization # Basic sanitization
# DO NOT RELY SOLELY ON THIS # DO NOT RELY SOLELY ON THIS
@ -229,34 +257,47 @@ def sanitize_user_input(
max_length: int = 500 max_length: int = 500
): ):
""" """
A whitelisting-based function for sanitizing user input. Sanitize user input using a whitelisting approach.
Returns a tuple of: This function sanitizes the provided user input by applying a whitelist filter based on
(sanitized_str, sanitization_applied_bool, sanitization_reason, original_str) the specified usage context. For inputs intended for calculation ("CALC"), it retains only
digits, mathematical operators, parentheses, and related characters. For general text
("GENERAL"), it permits a broader range of characters including letters, numbers, punctuation,
symbols, and separators. The function also truncates the input to a maximum length, removes
unwanted whitespace characters, and applies additional filtering using either a regex library
(if available) or a fallback method.
:param user_input: The raw string from the user (e.g., from Twitch or Discord). Args:
:param usage: user_input (str): The raw input string from the user (e.g., from Twitch or Discord).
- 'CALC': Keep digits, math operators, parentheses, etc. usage (str, optional): The context for sanitization. Accepted values are:
- 'GENERAL': Keep typical readable characters & punctuation. - "CALC": Retain only characters relevant for mathematical expressions.
:param max_length: Truncate the input if it exceeds this length. - "GENERAL": Allow typical readable characters and punctuation.
:return: (sanitized_str, bool, reason_string, original_str) Defaults to "GENERAL".
max_length (int, optional): The maximum allowed length of the input. Any characters beyond
this limit will be truncated. Defaults to 500.
====================== Returns:
SECURITY RECOMMENDATIONS tuple: A tuple in the form:
====================== (sanitized_str, sanitization_applied, reason_string, original_str)
1) For database storage (MariaDB, etc.): where:
- **Always** use parameterized queries or an ORM with bound parameters. - sanitized_str (str): The sanitized version of the input.
- Do not rely solely on string sanitization to prevent SQL injection. - sanitization_applied (bool): True if any modifications were applied; otherwise, False.
- reason_string (str): A semicolon-separated string explaining the sanitization steps.
2) For code execution (e.g., 'eval'): - original_str (str): The original, unmodified input string.
- Avoid using eval/exec on user input.
- If you must, consider a restricted math parser or an audited sandbox. Security Recommendations:
1) For Database Storage (e.g., MariaDB):
3) For HTML sanitization: - Always use parameterized queries or an ORM with bound parameters.
- Bleach is deprecated; research modern alternatives or frameworks that - Do not rely solely on string sanitization to prevent SQL injection.
safely sanitize HTML output. This function does *not* sanitize HTML tags.
2) For Code Execution (e.g., using eval):
- Avoid using eval/exec on user input.
- If execution of user input is required, consider a restricted math parser or an audited sandbox.
3) For HTML Sanitization:
- Bleach is deprecated; consider modern alternatives or frameworks that safely sanitize HTML output.
- Note: This function does not sanitize HTML tags.
""" """
original_string = str(user_input) original_string = str(user_input)
reasons = [] reasons = []
sanitization_applied = False sanitization_applied = False
@ -284,8 +325,8 @@ def sanitize_user_input(
if USE_REGEX_LIB: if USE_REGEX_LIB:
# Remove ASCII control chars (0-31, 127) first # Remove ASCII control chars (0-31, 127) first
step1 = re.sub(r"[\x00-\x1F\x7F]", "", sanitized) step1 = re.sub(r"[\x00-\x1F\x7F]", "", sanitized)
# Then apply a fairly broad whitelist: # Then apply a broad whitelist:
# \p{L}: letters; \p{N}: numbers; \p{P}: punctuation; \p{S}: symbols; \p{Z}: separators (including spaces). # \p{L}: letters; \p{N}: numbers; \p{P}: punctuation; \p{S}: symbols; \p{Z}: separators (including spaces)
# This keeps emojis, foreign characters, typical punctuation, etc. # This keeps emojis, foreign characters, typical punctuation, etc.
pattern = r"[^\p{L}\p{N}\p{P}\p{S}\p{Z}]" pattern = r"[^\p{L}\p{N}\p{P}\p{S}\p{Z}]"
new_sanitized = regex.sub(pattern, "", step1) new_sanitized = regex.sub(pattern, "", step1)
@ -297,7 +338,7 @@ def sanitize_user_input(
else: else:
# Fallback: If 'regex' is not installed, remove control chars and keep ASCII printable only. # Fallback: If 'regex' is not installed, remove control chars and keep ASCII printable only.
step1 = re.sub(r"[\x00-\x1F\x7F]", "", sanitized) step1 = re.sub(r"[\x00-\x1F\x7F]", "", sanitized)
pattern = r"[^ -~]" # Keep only ASCII 32-126 pattern = r"[^ -~]" # Keep only ASCII characters 32-126
new_sanitized = re.sub(pattern, "", step1) new_sanitized = re.sub(pattern, "", step1)
if new_sanitized != sanitized: if new_sanitized != sanitized:
@ -312,43 +353,66 @@ def sanitize_user_input(
reason_string = "; ".join(reasons) reason_string = "; ".join(reasons)
return (sanitized, sanitization_applied, reason_string, original_string) return (sanitized, sanitization_applied, reason_string, original_string)
##################### #####################
# Help command logic # Help command logic
##################### #####################
async def handle_help_command(ctx, command_name, bot, is_discord): async def handle_help_command(ctx, command_name, bot, is_discord):
""" """
Called by the platform-specific help commands to provide the help text. Provide help text for a specific command or list all commands.
:param ctx: discord.py or twitchio context
:param command_name: e.g. "quote" or None if user typed just "!help"
:param bot: The current bot instance
:param is_discord: True for Discord, False for Twitch
"""
This asynchronous function is called by platform-specific help commands to generate and
return the appropriate help message. It retrieves the help data stored on the bot (parsed
from a JSON file) and checks whether the requested command exists in that data. If no
specific command is provided (i.e. `command_name` is None), it returns a list of all loaded
commands. Depending on the platform (Discord or Twitch), it formats the help message using
the corresponding builder function.
Args:
ctx: The context from the command invocation (from discord.py or twitchio).
command_name (str): The name of the command for which help is requested (e.g., "quote"). If
None, a list of all available commands is returned.
bot: The current bot instance, which should have a `help_data` attribute containing the help
information.
is_discord (bool): True if the bot is running on Discord; False if running on Twitch.
Returns:
str: A formatted help message string. This message may indicate one of the following:
- A list of all loaded commands (if no command name is provided),
- Detailed help for a specific command (if help data exists for that command),
- A notice that the command is loaded but lacks help data,
- A notice that the command is deprecated/unloaded, or
- An error message if the help data is missing or invalid.
Side Effects:
Logs warnings or errors if the help data is missing, improperly structured, or if the
command is not found.
"""
# If there's no loaded help_data, we can't do much # If there's no loaded help_data, we can't do much
if not hasattr(bot, "help_data") or not bot.help_data: if not hasattr(bot, "help_data") or not bot.help_data:
return await "No help data found." return await "No help data found."
help_data = bot.help_data # The parsed JSON from e.g. help_discord.json help_data = bot.help_data # The parsed JSON from e.g. help_discord.json
if "commands" not in help_data: if "commands" not in help_data:
return "Invalid help data structure (no 'commands' key).\n*This is due to an error with the help file.*" return "Invalid help data structure (no 'commands' key).\n*This is due to an error with the help file.*"
if not command_name: if not command_name:
# User typed just "!help" => list all known commands from this bot # User typed just "!help" => list all known commands from this bot
loaded_cmds = get_loaded_commands(bot, is_discord) loaded_cmds = get_loaded_commands(bot, is_discord)
if not loaded_cmds: if not loaded_cmds:
return "I have no commands loaded." return "I have no commands loaded."
else: else:
if is_discord: if is_discord:
help_str = f"I currently offer these commands:" help_str = f"I currently offer these commands:"
for cmd in loaded_cmds: for cmd in loaded_cmds:
help_str += f"\n- !{cmd}" help_str += f"\n- `{cmd}`"
help_str += f"\n*Use '!help <command>' for more details.*" help_str += f"\n*Use `!help <command>` for more details.*"
return help_str return help_str
else: else:
short_list = ", ".join(loaded_cmds) short_list = ", ".join(loaded_cmds)
# We can also mention "Use !help [command] for more info." # We can also mention "Use !help [command] for more info."
return f"I currently offer these commands:{short_list}. \nUse '!help <command>' for details." return f"I currently offer these commands: {short_list}. \nUse '!help <command>' for details."
# 1) Check if the command is loaded # 1) Check if the command is loaded
loaded = (command_name in get_loaded_commands(bot, is_discord)) loaded = (command_name in get_loaded_commands(bot, is_discord))
@ -374,15 +438,33 @@ async def handle_help_command(ctx, command_name, bot, is_discord):
return f"I'm sorry, I don't offer a command named '{command_name}'." return f"I'm sorry, I don't offer a command named '{command_name}'."
def initialize_help_data(bot, help_json_path, is_discord): def initialize_help_data(bot, help_json_path, is_discord):
""" """
Loads help data from a JSON file, stores it in bot.help_data, Load help data from a JSON file and verify bot commands against it.
then verifies each loaded command vs. the help_data.
Logs any mismatches:
- Commands in help file but not loaded => "deprecated"
- Loaded commands not in help file => "missing help"
"""
This function loads help data from the specified JSON file and stores the parsed data
in the bot's `help_data` attribute. After loading the data, it cross-checks the commands
loaded on the bot with the commands defined in the help file. The function logs warnings
for any discrepancies:
- If a command is present in the help file but not loaded on the bot, it logs a warning
(indicating the command may be deprecated).
- If a command is loaded on the bot but missing from the help file, it logs a warning
(indicating missing help information).
Args:
bot: The bot instance, which will have its `help_data` attribute updated.
help_json_path (str): The file path to the JSON file containing help data.
is_discord (bool): A flag indicating whether the bot is a Discord bot (True) or a Twitch bot (False).
Returns:
None
Side Effects:
- Updates the bot's `help_data` attribute with the contents of the JSON file.
- Logs warnings or errors if the help file is missing, cannot be parsed, or if there are mismatches
between the loaded commands and the help data.
"""
platform_name = "Discord" if is_discord else "Twitch" platform_name = "Discord" if is_discord else "Twitch"
if not os.path.exists(help_json_path): if not os.path.exists(help_json_path):
@ -419,7 +501,29 @@ def initialize_help_data(bot, help_json_path, is_discord):
globals.log(f"Command '{cmd}' is loaded on {platform_name} but no help info is provided in {help_json_path}.", "WARNING") globals.log(f"Command '{cmd}' is loaded on {platform_name} but no help info is provided in {help_json_path}.", "WARNING")
def get_loaded_commands(bot, is_discord): def get_loaded_commands(bot, is_discord):
"""
Retrieve and sort the list of loaded commands from a bot instance.
This function examines the provided bot instance and extracts its registered commands.
For a Discord bot (when `is_discord` is True), it iterates over `bot.commands` and collects
the command names. For a Twitch bot (when `is_discord` is False), it iterates over `bot._commands`
and collects the command objects. Throughout the process, the function logs debug and error
messages to help trace the execution flow. Finally, it returns the list of commands in sorted order.
Args:
bot: The bot instance from which to retrieve commands. This may be either a Discord or Twitch bot.
is_discord (bool): Indicates whether the bot is a Discord bot. If False, the function assumes the bot
is a Twitch bot.
Returns:
list: A sorted list of commands. For a Discord bot, this list contains command names (strings);
for a Twitch bot, it contains command objects as stored in `bot._commands`.
Side Effects:
Logs debug, warning, and error messages regarding the command processing.
"""
from discord.ext import commands as discord_commands from discord.ext import commands as discord_commands
from twitchio.ext import commands as twitch_commands from twitchio.ext import commands as twitch_commands
@ -433,12 +537,11 @@ def get_loaded_commands(bot, is_discord):
# For Discord # For Discord
if is_discord: if is_discord:
#if isinstance(bot, discord_commands.Bot):
try: try:
# 'bot.commands' is a set of Command objects # 'bot.commands' is a set of Command objects.
for cmd_obj in bot.commands: for cmd_obj in bot.commands:
commands_list.append(cmd_obj.name) commands_list.append(cmd_obj.name)
debug_level ="DEBUG" if len(commands_list) > 0 else "WARNING" debug_level = "DEBUG" if len(commands_list) > 0 else "WARNING"
globals.log(f"Discord commands body: {commands_list}", f"{debug_level}") globals.log(f"Discord commands body: {commands_list}", f"{debug_level}")
except Exception as e: except Exception as e:
globals.log(f"Error retrieving Discord commands: {e}", "ERROR") globals.log(f"Error retrieving Discord commands: {e}", "ERROR")
@ -446,7 +549,7 @@ def get_loaded_commands(bot, is_discord):
try: try:
for cmd_obj in bot._commands: for cmd_obj in bot._commands:
commands_list.append(cmd_obj) commands_list.append(cmd_obj)
debug_level ="DEBUG" if len(commands_list) > 0 else "WARNING" debug_level = "DEBUG" if len(commands_list) > 0 else "WARNING"
globals.log(f"Twitch commands body: {commands_list}", f"{debug_level}") globals.log(f"Twitch commands body: {commands_list}", f"{debug_level}")
except Exception as e: except Exception as e:
globals.log(f"Error retrieving Twitch commands: {e}", "ERROR") globals.log(f"Error retrieving Twitch commands: {e}", "ERROR")
@ -459,7 +562,29 @@ def get_loaded_commands(bot, is_discord):
def build_discord_help_message(cmd_name, cmd_help_dict): def build_discord_help_message(cmd_name, cmd_help_dict):
""" """
A verbose multi-line string for Discord. Build a verbose, multi-line help message for Discord.
This function constructs a detailed help message for a Discord command using the
provided command name and a dictionary of help information. The dictionary may include
a description, subcommands with their arguments and descriptions, and example usage
scenarios. The generated message is formatted as a multi-line string with Markdown
styling for use on Discord.
Args:
cmd_name (str): The name of the command.
cmd_help_dict (dict): A dictionary containing help details for the command.
Expected keys include:
- "description" (str): A detailed description of the command.
- "subcommands" (dict, optional): A mapping of subcommand names to a dictionary
with keys "args" (str) and "desc" (str) representing the arguments and a short
description of the subcommand.
- "examples" (list, optional): A list of example strings demonstrating how to use
the command. Each example should be a string that optionally contains a description
after a " : " separator.
Returns:
str: A multi-line string formatted for Discord that includes the command description,
subcommands (if any), and example usage.
""" """
description = cmd_help_dict.get("description", "No description available.\n") description = cmd_help_dict.get("description", "No description available.\n")
subcommands = cmd_help_dict.get("subcommands", {}) subcommands = cmd_help_dict.get("subcommands", {})
@ -484,16 +609,36 @@ def build_discord_help_message(cmd_name, cmd_help_dict):
# Handle missing description case # Handle missing description case
ex_cmd = ex_arr[0] ex_cmd = ex_arr[0]
ex_note = str(f"\n {ex_arr[1]}") if len(ex_arr) > 1 else "" ex_note = f"\n {ex_arr[1]}" if len(ex_arr) > 1 else ""
lines.append(f"- `{ex_cmd}`{ex_note}") lines.append(f"- `{ex_cmd}`{ex_note}")
return "\n".join(lines) return "\n".join(lines)
def build_twitch_help_message(cmd_name, cmd_help_dict): def build_twitch_help_message(cmd_name, cmd_help_dict):
""" """
A concise, possibly single-line help for Twitch. Build a concise Twitch help message for a command.
This function constructs a help message string for a Twitch command based on the
provided command name and a dictionary of help details. The help dictionary should
contain a "description" of the command and may include a "subcommands" mapping. For
each subcommand, the function builds usage examples by appending the subcommand name
and its arguments (if any) to the base command. If no subcommands are defined, the
usage defaults to the base command.
Args:
cmd_name (str): The name of the command (without the "!" prefix).
cmd_help_dict (dict): A dictionary containing help details for the command.
Expected keys include:
- "description" (str): A brief description of the command.
- "subcommands" (dict, optional): A mapping of subcommand names to their details.
Each value should be a dictionary that may contain an "args" key (str) representing
additional command arguments.
Returns:
str: A formatted help message string that includes the command description and a usage
line with examples of how to invoke the command and its subcommands.
""" """
description = cmd_help_dict.get("description", "No description available.") description = cmd_help_dict.get("description", "No description available.")
subcommands = cmd_help_dict.get("subcommands", {}) subcommands = cmd_help_dict.get("subcommands", {})
@ -513,6 +658,7 @@ def build_twitch_help_message(cmd_name, cmd_help_dict):
return f"Help for !{cmd_name}: {description}. Usage: {usage_line}" return f"Help for !{cmd_name}: {description}. Usage: {usage_line}"
async def send_message(ctx, text): async def send_message(ctx, text):
""" """
Minimal helper to send a message to either Discord or Twitch. Minimal helper to send a message to either Discord or Twitch.
@ -522,38 +668,53 @@ async def send_message(ctx, text):
def track_user_activity( def track_user_activity(
db_conn, db_conn,
platform: str, platform: str,
user_id: str, user_id: str|int,
username: str, username: str,
display_name: str, display_name: str,
user_is_bot: bool = False user_is_bot: bool = False
): ):
""" """
Checks or creates/updates a user in the 'users' table for the given platform's message. Create or update a user record in the database for a given platform.
:param db_conn: The active DB connection This function checks whether a user with the specified user ID exists in the 'users'
:param platform: "discord" or "twitch" table for the provided platform (either "discord" or "twitch"). If a matching record is found,
:param user_id: e.g., Discord user ID or Twitch user ID the function compares the provided username, display name, and bot status with the stored values,
:param username: The raw username (no #discriminator for Discord) updating the record if any discrepancies are detected. If no record exists, a new user record is
:param display_name: The users display name created with a generated UUID.
:param user_is_bot: Boolean if the user is recognized as a bot on that platform
Args:
db_conn: The active database connection used to perform database operations.
platform (str): The platform to which the user belongs. Expected values are "discord" or "twitch".
user_id (str): The unique identifier of the user on the given platform.
username (str): The raw username of the user (for Discord, this excludes the discriminator).
display_name (str): The display name of the user.
user_is_bot (bool, optional): Indicates whether the user is a bot on the platform.
Defaults to False.
Returns:
None
Side Effects:
- Logs debugging and error messages via the global logger.
- Updates an existing user record if discrepancies are found.
- Inserts a new user record if no existing record is found.
""" """
globals.log(f"UUI Lookup for: {username} - {user_id} ({platform.lower()}) ...", "DEBUG") globals.log(f"UUI Lookup for: {username} - {user_id} ({platform.lower()}) ...", "DEBUG")
# Decide which column we use for the ID lookup # Decide which column we use for the ID lookup ("discord_user_id" or "twitch_user_id")
# "discord_user_id" or "twitch_user_id"
if platform.lower() in ("discord", "twitch"): if platform.lower() in ("discord", "twitch"):
identifier_type = f"{platform.lower()}_user_id" identifier_type = f"{platform.lower()}_user_id"
else: else:
globals.log(f"Unknown platform '{platform}' in track_user_activity!", "WARNING") globals.log(f"Unknown platform '{platform}' in track_user_activity!", "WARNING")
return return
# 1) Try to find an existing user row # 1) Try to find an existing user row.
user_data = lookup_user(db_conn, identifier=user_id, identifier_type=identifier_type) user_data = lookup_user(db_conn, identifier=user_id, identifier_type=identifier_type)
if user_data: if user_data:
# Found an existing row for that user ID on this platform # Found an existing row for that user ID on this platform.
# Check if the username or display_name is different => if so, update # Check if the username or display_name is different and update if necessary.
need_update = False need_update = False
column_updates = [] column_updates = []
params = [] params = []
@ -571,10 +732,6 @@ def track_user_activity(
column_updates.append("discord_user_display_name = ?") column_updates.append("discord_user_display_name = ?")
params.append(display_name) params.append(display_name)
# Possibly check user_is_bot
# If it's different than what's stored, update
# (We must add a column in your table for that if you want it stored per-platform.)
# For demonstration, let's store it in "user_is_bot"
if user_data["user_is_bot"] != user_is_bot: if user_data["user_is_bot"] != user_is_bot:
need_update = True need_update = True
column_updates.append("user_is_bot = ?") column_updates.append("user_is_bot = ?")
@ -604,7 +761,6 @@ def track_user_activity(
column_updates.append("twitch_user_display_name = ?") column_updates.append("twitch_user_display_name = ?")
params.append(display_name) params.append(display_name)
# Possibly store is_bot in user_is_bot
if user_data["user_is_bot"] != user_is_bot: if user_data["user_is_bot"] != user_is_bot:
need_update = True need_update = True
column_updates.append("user_is_bot = ?") column_updates.append("user_is_bot = ?")
@ -624,8 +780,7 @@ def track_user_activity(
globals.log(f"Updated Twitch user '{username}' (display '{display_name}') in 'users'.", "DEBUG") globals.log(f"Updated Twitch user '{username}' (display '{display_name}') in 'users'.", "DEBUG")
else: else:
# 2) No row found => create a new user row # 2) No row found => create a new user row.
# Generate a new UUID for this user
new_uuid = str(uuid.uuid4()) new_uuid = str(uuid.uuid4())
if platform.lower() == "discord": if platform.lower() == "discord":
@ -640,8 +795,7 @@ def track_user_activity(
VALUES (?, ?, ?, ?, ?) VALUES (?, ?, ?, ?, ?)
""" """
params = (new_uuid, user_id, username, display_name, int(user_is_bot)) params = (new_uuid, user_id, username, display_name, int(user_is_bot))
else: # platform is "twitch"
else: # "twitch"
insert_sql = """ insert_sql = """
INSERT INTO users ( INSERT INTO users (
UUID, UUID,
@ -660,6 +814,7 @@ def track_user_activity(
else: else:
globals.log(f"Failed to create new user row for {platform} user '{username}'", "ERROR") globals.log(f"Failed to create new user row for {platform} user '{username}'", "ERROR")
from modules.db import log_bot_event from modules.db import log_bot_event
def log_bot_startup(db_conn): def log_bot_startup(db_conn):
@ -681,19 +836,40 @@ def generate_link_code():
def time_since(start, end=None, format=None): def time_since(start, end=None, format=None):
""" """
Returns the epoch time since the start value. Calculate and format the elapsed time between two epoch timestamps.
:param start: The epoch time to check against. This function computes the elapsed time between a starting epoch timestamp and an ending
:param end: The epoch time to compare with. epoch timestamp. If no end time is provided, the current time is used. The elapsed time is
Defaults to current time. formatted according to the specified unit:
:param format: One of 's', 'm', 'h', or 'd' corresponding to seconds, minutes, hours, or days. - 's': whole seconds and remaining milliseconds.
Defaults to "s" (seconds). - 'm': whole minutes and remaining seconds.
:return: A tuple (x, y, total_elapsed) where: - 'h': whole hours and remaining minutes.
- For "s": x is whole seconds and y is the remaining milliseconds. - 'd': whole days and remaining hours.
- For "m": x is whole minutes and y is the remaining seconds. If an invalid format is given, a warning is logged and the default format 's' is used.
- For "h": x is whole hours and y is the remaining minutes.
- For "d": x is whole days and y is the remaining hours. Args:
- total_elapsed is the complete elapsed time in seconds. start (float): The starting epoch timestamp (in seconds). If None, an error is logged and
the function returns None.
end (float, optional): The ending epoch timestamp (in seconds). Defaults to the current time.
format (str, optional): A single character indicating the desired format of the elapsed time.
Must be one of:
's' for seconds (with milliseconds),
'm' for minutes (with seconds),
'h' for hours (with minutes), or
'd' for days (with hours).
Defaults to 's' if an invalid value is provided.
Returns:
tuple: A tuple (x, y, total_elapsed) where:
- For 's': x is a string representing whole seconds (with "s" appended) and y is a string
representing the remaining milliseconds (with "ms" appended).
- For 'm': x is a string representing whole minutes (with "m" appended) and y is a string
representing the remaining seconds (with "s" appended).
- For 'h': x is a string representing whole hours (with "h" appended) and y is a string
representing the remaining minutes (with "m" appended).
- For 'd': x is a string representing whole days (with "d" appended) and y is a string
representing the remaining hours (with "h" appended).
- total_elapsed (float): The total elapsed time in seconds.
""" """
# Ensure a start time is provided. # Ensure a start time is provided.
if start is None: if start is None:
@ -740,13 +916,14 @@ def time_since(start, end=None, format=None):
return (x, y, since) return (x, y, since)
def wfstl(): def wfstl():
""" """
Write Function Start To Log (debug) Write Function Start To Log (debug)
Writes the calling function to log under the DEBUG category. Writes the calling function to log under the DEBUG category.
""" """
caller_function_name = inspect.currentframe().f_back.f_code.co_name caller_function_name = inspect.currentframe().f_back.f_code.co_name
globals.log(f"Function {caller_function_name} started processing") globals.log(f"Function {caller_function_name} started processing", "DEBUG")
def wfetl(): def wfetl():
""" """
@ -754,7 +931,67 @@ def wfetl():
Writes the calling function to log under the DEBUG category. Writes the calling function to log under the DEBUG category.
""" """
caller_function_name = inspect.currentframe().f_back.f_code.co_name caller_function_name = inspect.currentframe().f_back.f_code.co_name
globals.log(f"Function {caller_function_name} finished processing") globals.log(f"Function {caller_function_name} finished processing", "DEBUG")
async def get_guild_info(bot: discord.Client, guild_id: Union[int, str]) -> dict:
"""
Retrieve information about a Discord guild given its ID.
This asynchronous function attempts to retrieve a guild's information by its ID. The provided
guild_id is first converted to an integer if it is not already one. The function then tries to
obtain the guild from the bot's cache using `bot.get_guild()`. If the guild is not found in the
cache, it fetches the guild from the Discord API using `bot.fetch_guild()`. If the guild cannot be
found or if an error occurs during the fetch, the function raises an appropriate exception.
The function returns a dictionary containing basic information about the guild, including:
- id (int): The unique identifier of the guild.
- name (str): The name of the guild.
- owner_id (int): The user ID of the guild's owner.
- member_count (int): The number of members in the guild.
- icon_url (Optional[str]): The URL of the guild's icon if available.
- created_at (Optional[str]): The ISO-formatted creation date of the guild if available.
Args:
bot (discord.Client): The Discord client instance.
guild_id (Union[int, str]): The ID of the guild. This can be provided as an integer or as a
string that represents an integer.
Returns:
dict: A dictionary containing the guild's information.
Raises:
ValueError: If the provided guild_id is not convertible to an integer or if the guild is not found.
RuntimeError: If an HTTP error occurs while fetching the guild.
"""
# Ensure guild_id is an integer.
try:
guild_id = int(guild_id)
except ValueError:
raise ValueError("guild_id must be an int or a string representing an int.")
# Try to get the guild from the cache first.
guild = bot.get_guild(guild_id)
# If not in cache, try to fetch it from the API.
if guild is None:
try:
guild = await bot.fetch_guild(guild_id)
except discord.NotFound:
raise ValueError("Guild not found.")
except discord.HTTPException as e:
raise RuntimeError(f"An error occurred while fetching the guild: {e}")
# Build a dictionary with some basic information.
info = {
"id": guild.id,
"name": guild.name,
"owner_id": guild.owner_id,
"member_count": guild.member_count,
"icon_url": guild.icon.url if guild.icon else None,
"created_at": guild.created_at.isoformat() if guild.created_at else None,
}
return info
############################################### ###############################################