import time import os import random import json import re import functools import inspect import uuid from typing import Union from modules.db import run_db_operation, lookup_user, log_message import modules.utility as utility import discord from functools import wraps import globals try: # 'regex' on PyPI supports `\p{L}`, `\p{N}`, etc. import regex USE_REGEX_LIB = True except ImportError: # Fallback to Python's built-in 're' if 'regex' isn't installed USE_REGEX_LIB = False DICTIONARY_PATH = "dictionary/" # Path to dictionary files # def monitor_cmds(log_func): # """ # Decorator that logs when a command starts and ends execution. # """ # def decorator(func): # @functools.wraps(func) # async def wrapper(*args, **kwargs): # start_time = time.time() # try: # # Extract a command name from the function name # cmd_name = str(func.__name__).split("_")[1] # log_func(f"Command '{cmd_name}' started execution.", "DEBUG") # # Await the actual command function # result = await func(*args, **kwargs) # end_time = time.time() # cmd_duration = round(end_time - start_time, 2) # log_func(f"Command '{cmd_name}' finished execution after {cmd_duration}s.", "DEBUG") # return result # except Exception as e: # end_time = time.time() # cmd_duration = round(end_time - start_time, 2) # log_func(f"Command '{cmd_name}' FAILED while executing after {cmd_duration}s: {e}", "CRITICAL") # # Explicitly preserve the original signature for slash command introspection # wrapper.__signature__ = inspect.signature(func) # return wrapper # return decorator # def monitor_cmds(): # """ # Decorator that logs when a command starts, completes, or fails. # Uses `ctx.bot.log` dynamically. # """ # def decorator(func): # @functools.wraps(func) # async def wrapper(*args, **kwargs): # start_time = time.time() # try: # ctx = args[0] # First argument should be `ctx` # cmd_name = ctx.command.name if hasattr(ctx, "command") else func.__name__ # log_func = ctx.bot.log # log_func(f"Command '{cmd_name}' started by {ctx.author} in #{ctx.channel}", "DEBUG") # # Execute the command # result = await func(*args, **kwargs) # # Log successful execution # duration = round(time.time() - start_time, 2) # log_func(f"Command '{cmd_name}' finished execution in {duration}s.", "DEBUG") # return result # except Exception as e: # duration = round(time.time() - start_time, 2) # log_func(f"Command execution failed: '{cmd_name}' after {duration}s: {e}", "CRITICAL") # # Fix: Ensure Discord's command system keeps the correct function parameters # wrapper.__signature__ = inspect.signature(func) # return wrapper # return decorator # def monitor_msgs(log_func, db_conn): # """ # Decorator that logs Discord messages and tracks user activity. # Works for both commands and event handlers like on_message(). # """ # def decorator(func): # @functools.wraps(func) # async def wrapper(self, message: discord.Message, *args, **kwargs): # start_time = time.time() # try: # # Ignore bot messages # if message.author.bot: # return # user_id = str(message.author.id) # user_name = message.author.name # display_name = message.author.display_name # # Track user activity # track_user_activity( # db_conn=db_conn, # log_func=log_func, # platform="discord", # user_id=user_id, # username=user_name, # display_name=display_name, # user_is_bot=False # ) # log_func(f"Message from {user_name} in {message.channel} (Guild: {message.guild.name if message.guild else 'DM'})", "DEBUG") # # Fetch user UUID # user_data = lookup_user(db_conn, log_func, identifier=user_id, identifier_type="discord_user_id") # user_uuid = user_data["UUID"] if user_data else "UNKNOWN" # # Extract message details # platform_str = f"discord-{message.guild.name}" if message.guild else "discord-DM" # channel_str = message.channel.name if hasattr(message.channel, "name") else "DM" # attachments = ", ".join(a.url for a in message.attachments) if message.attachments else "" # # Log message in DB # log_message( # db_conn=db_conn, # log_func=log_func, # user_uuid=user_uuid, # message_content=message.content or "", # platform=platform_str, # channel=channel_str, # attachments=attachments # ) # # Call the original `on_message()` function # await func(self, message, *args, **kwargs) # # Ensure the bot processes commands # await self.process_commands(message) # except Exception as e: # log_func(f"Error processing message from {message.author}: {e}", "ERROR") # finally: # duration = round(time.time() - start_time, 2) # log_func(f"Message processing complete in {duration}s.", "DEBUG") # # Preserve original function signature # wrapper.__signature__ = inspect.signature(func) # return wrapper # return decorator def format_uptime(seconds: float) -> tuple[str, int]: """ Convert a duration in seconds into a human-readable string using up to two significant time units. This function breaks down the given number of seconds into larger time units (years, months, days, hours, minutes, and seconds) and returns a string representing the duration using only the two most significant non-zero units. For example: - 192000 seconds might be formatted as "2 days, 5 hours" - 32 minutes will be formatted as "32 minutes" - 37000000 seconds could be formatted as "1 year, 3 months" Args: seconds (float): The total number of seconds to convert. Returns: tuple(str, int): A tuple containing: - A human-readable string representing the duration (using up to two time units). - The original duration in seconds as an integer. Examples: >>> format_uptime(192000) ('2 days, 5 hours', 192000) >>> format_uptime(60) ('1 minute', 60) """ seconds = int(seconds) # Ensure integer seconds seconds_int = seconds # Define time units as tuples of (unit name, seconds per unit) units = [ ("year", 31536000), # 365 days ("month", 2592000), # 30 days ("day", 86400), # 24 hours ("hour", 3600), # 60 minutes ("minute", 60), ("second", 1) ] # Compute time breakdown time_values = [] for unit_name, unit_seconds in units: value, seconds = divmod(seconds, unit_seconds) if value > 0: time_values.append(f"{value} {unit_name}{'s' if value > 1 else ''}") # Auto pluralize # Return only the two most significant time units (e.g., "3 days, 4 hours") return (", ".join(time_values[:2]), seconds_int) if time_values else ("0 seconds", 0) def get_random_reply(dictionary_name: str, category: str, **variables) -> str: """ Fetch a random reply from a specified dictionary file and category. This function loads a JSON file—named after the given dictionary (without the .json extension) and located in the DICTIONARY_PATH directory—and retrieves a random reply from the list of responses under the specified category. It then substitutes any placeholders in the reply string with the provided keyword arguments. If the file does not exist, the JSON is invalid, or the category is missing or not a list, the function returns an appropriate error message. Args: dictionary_name (str): The base name of the dictionary file (without the .json extension). category (str): The key within the JSON file representing the category of replies. **variables: Arbitrary keyword arguments for substituting placeholders in the reply string. Returns: str: A formatted reply string with the variables substituted, or an error message if the file, category, or JSON data is invalid. """ file_path = os.path.join(DICTIONARY_PATH, f"{dictionary_name}.json") # Ensure file exists if not os.path.exists(file_path): return f"[Error: Missing {dictionary_name}.json]" try: with open(file_path, "r", encoding="utf-8") as file: data = json.load(file) except json.JSONDecodeError: return f"[Error: Failed to load {dictionary_name}.json]" # Ensure category exists if category not in data or not isinstance(data[category], list): return f"[Error: No valid entries for {category} in {dictionary_name}.json]" # Select a random reply response = random.choice(data[category]) # Replace placeholders with provided variables return response.format(**variables) ############################## # Basic sanitization # DO NOT RELY SOLELY ON THIS ############################## def sanitize_user_input( user_input: str, usage: str = "GENERAL", max_length: int = 500 ): """ Sanitize user input using a whitelisting approach. This function sanitizes the provided user input by applying a whitelist filter based on the specified usage context. For inputs intended for calculation ("CALC"), it retains only digits, mathematical operators, parentheses, and related characters. For general text ("GENERAL"), it permits a broader range of characters including letters, numbers, punctuation, symbols, and separators. The function also truncates the input to a maximum length, removes unwanted whitespace characters, and applies additional filtering using either a regex library (if available) or a fallback method. Args: user_input (str): The raw input string from the user (e.g., from Twitch or Discord). usage (str, optional): The context for sanitization. Accepted values are: - "CALC": Retain only characters relevant for mathematical expressions. - "GENERAL": Allow typical readable characters and punctuation. Defaults to "GENERAL". max_length (int, optional): The maximum allowed length of the input. Any characters beyond this limit will be truncated. Defaults to 500. Returns: tuple: A tuple in the form: (sanitized_str, sanitization_applied, reason_string, original_str) where: - sanitized_str (str): The sanitized version of the input. - sanitization_applied (bool): True if any modifications were applied; otherwise, False. - reason_string (str): A semicolon-separated string explaining the sanitization steps. - original_str (str): The original, unmodified input string. Security Recommendations: 1) For Database Storage (e.g., MariaDB): - Always use parameterized queries or an ORM with bound parameters. - Do not rely solely on string sanitization to prevent SQL injection. 2) For Code Execution (e.g., using eval): - Avoid using eval/exec on user input. - If execution of user input is required, consider a restricted math parser or an audited sandbox. 3) For HTML Sanitization: - Bleach is deprecated; consider modern alternatives or frameworks that safely sanitize HTML output. - Note: This function does not sanitize HTML tags. """ original_string = str(user_input) reasons = [] sanitization_applied = False # 1. Truncate and remove newlines, tabs, etc. truncated = original_string[:max_length] truncated = re.sub(r"[\r\n\t]+", " ", truncated) sanitized = truncated # 2. Choose how to filter based on usage usage = usage.upper() if usage == "CALC": # Allow digits, +, -, *, /, %, parentheses, decimal points, ^ for exponent, spaces # Remove everything else pattern = r"[^0-9+\-*/%().^ \t]" new_sanitized = re.sub(pattern, "", sanitized) if new_sanitized != sanitized: sanitization_applied = True reasons.append("CALC: Removed non-math characters.") sanitized = new_sanitized else: # GENERAL usage if USE_REGEX_LIB: # Remove ASCII control chars (0-31, 127) first step1 = re.sub(r"[\x00-\x1F\x7F]", "", sanitized) # Then apply a broad whitelist: # \p{L}: letters; \p{N}: numbers; \p{P}: punctuation; \p{S}: symbols; \p{Z}: separators (including spaces) # This keeps emojis, foreign characters, typical punctuation, etc. pattern = r"[^\p{L}\p{N}\p{P}\p{S}\p{Z}]" new_sanitized = regex.sub(pattern, "", step1) if new_sanitized != sanitized: sanitization_applied = True reasons.append("GENERAL: Removed disallowed chars via regex.") sanitized = new_sanitized else: # Fallback: If 'regex' is not installed, remove control chars and keep ASCII printable only. step1 = re.sub(r"[\x00-\x1F\x7F]", "", sanitized) pattern = r"[^ -~]" # Keep only ASCII characters 32-126 new_sanitized = re.sub(pattern, "", step1) if new_sanitized != sanitized: sanitization_applied = True reasons.append("GENERAL: Removed non-ASCII or control chars (fallback).") sanitized = new_sanitized # 3. Final trim sanitized = sanitized.strip() # 4. Prepare output reason_string = "; ".join(reasons) return (sanitized, sanitization_applied, reason_string, original_string) ##################### # Help command logic ##################### async def handle_help_command(ctx, command_name, bot, is_discord): """ Provide help text for a specific command or list all commands. This asynchronous function is called by platform-specific help commands to generate and return the appropriate help message. It retrieves the help data stored on the bot (parsed from a JSON file) and checks whether the requested command exists in that data. If no specific command is provided (i.e. `command_name` is None), it returns a list of all loaded commands. Depending on the platform (Discord or Twitch), it formats the help message using the corresponding builder function. Args: ctx: The context from the command invocation (from discord.py or twitchio). command_name (str): The name of the command for which help is requested (e.g., "quote"). If None, a list of all available commands is returned. bot: The current bot instance, which should have a `help_data` attribute containing the help information. is_discord (bool): True if the bot is running on Discord; False if running on Twitch. Returns: str: A formatted help message string. This message may indicate one of the following: - A list of all loaded commands (if no command name is provided), - Detailed help for a specific command (if help data exists for that command), - A notice that the command is loaded but lacks help data, - A notice that the command is deprecated/unloaded, or - An error message if the help data is missing or invalid. Side Effects: Logs warnings or errors if the help data is missing, improperly structured, or if the command is not found. """ # If there's no loaded help_data, we can't do much if not hasattr(bot, "help_data") or not bot.help_data: return await "No help data found." help_data = bot.help_data # The parsed JSON from e.g. help_discord.json if "commands" not in help_data: return "Invalid help data structure (no 'commands' key).\n*This is due to an error with the help file.*" if not command_name: # User typed just "!help" => list all known commands from this bot loaded_cmds = get_loaded_commands(bot, is_discord) if not loaded_cmds: return "I have no commands loaded." else: if is_discord: help_str = f"I currently offer these commands:" for cmd in loaded_cmds: help_str += f"\n- `{cmd}`" help_str += f"\n*Use `!help ` for more details.*" return help_str else: short_list = ", ".join(loaded_cmds) # We can also mention "Use !help [command] for more info." return f"I currently offer these commands: {short_list}. \nUse '!help ' for details." # 1) Check if the command is loaded loaded = (command_name in get_loaded_commands(bot, is_discord)) # 2) Check if it has help info in the JSON cmd_help = help_data["commands"].get(command_name, None) if loaded and cmd_help: # The command is loaded, and we have help info => show it if is_discord: msg = build_discord_help_message(command_name, cmd_help) else: msg = build_twitch_help_message(command_name, cmd_help) return msg elif loaded and not cmd_help: # The command is loaded but no help info => mention that return f"The '{command_name}' command is loaded but has no help info yet." elif (not loaded) and cmd_help: # The command is not loaded, but we have an entry => mention it's unloaded/deprecated return f"The '{command_name}' command is not currently loaded (deprecated or unavailable)." else: # Not loaded, no help info => not found at all return f"I'm sorry, I don't offer a command named '{command_name}'." def initialize_help_data(bot, help_json_path, is_discord): """ Load help data from a JSON file and verify bot commands against it. This function loads help data from the specified JSON file and stores the parsed data in the bot's `help_data` attribute. After loading the data, it cross-checks the commands loaded on the bot with the commands defined in the help file. The function logs warnings for any discrepancies: - If a command is present in the help file but not loaded on the bot, it logs a warning (indicating the command may be deprecated). - If a command is loaded on the bot but missing from the help file, it logs a warning (indicating missing help information). Args: bot: The bot instance, which will have its `help_data` attribute updated. help_json_path (str): The file path to the JSON file containing help data. is_discord (bool): A flag indicating whether the bot is a Discord bot (True) or a Twitch bot (False). Returns: None Side Effects: - Updates the bot's `help_data` attribute with the contents of the JSON file. - Logs warnings or errors if the help file is missing, cannot be parsed, or if there are mismatches between the loaded commands and the help data. """ platform_name = "Discord" if is_discord else "Twitch" if not os.path.exists(help_json_path): globals.log(f"Help file '{help_json_path}' not found. No help data loaded.", "WARNING") bot.help_data = {} return # Load the JSON try: with open(help_json_path, "r", encoding="utf-8") as f: data = json.load(f) except Exception as e: globals.log(f"Error parsing help JSON '{help_json_path}': {e}", "ERROR") data = {} bot.help_data = data # Now cross-check the loaded commands vs. the data loaded_cmds = set(get_loaded_commands(bot, is_discord)) if "commands" not in data: globals.log(f"No 'commands' key in {help_json_path}, skipping checks.", "ERROR") return file_cmds = set(data["commands"].keys()) # 1) Commands in file but not loaded missing_cmds = file_cmds - loaded_cmds for cmd in missing_cmds: globals.log(f"Help file has '{cmd}', but it's not loaded on this {platform_name} bot (deprecated?).", "WARNING") # 2) Commands loaded but not in file needed_cmds = loaded_cmds - file_cmds for cmd in needed_cmds: globals.log(f"Command '{cmd}' is loaded on {platform_name} but no help info is provided in {help_json_path}.", "WARNING") def get_loaded_commands(bot, is_discord): """ Retrieve and sort the list of loaded commands from a bot instance. This function examines the provided bot instance and extracts its registered commands. For a Discord bot (when `is_discord` is True), it iterates over `bot.commands` and collects the command names. For a Twitch bot (when `is_discord` is False), it iterates over `bot._commands` and collects the command objects. Throughout the process, the function logs debug and error messages to help trace the execution flow. Finally, it returns the list of commands in sorted order. Args: bot: The bot instance from which to retrieve commands. This may be either a Discord or Twitch bot. is_discord (bool): Indicates whether the bot is a Discord bot. If False, the function assumes the bot is a Twitch bot. Returns: list: A sorted list of commands. For a Discord bot, this list contains command names (strings); for a Twitch bot, it contains command objects as stored in `bot._commands`. Side Effects: Logs debug, warning, and error messages regarding the command processing. """ from discord.ext import commands as discord_commands from twitchio.ext import commands as twitch_commands commands_list = [] try: _bot_type = str(type(bot)).split("_")[1].split(".")[0] globals.log(f"Currently processing commands for {_bot_type} ...", "DEBUG") except Exception as e: globals.log(f"Unable to determine current bot type: {e}", "WARNING") # For Discord if is_discord: try: # 'bot.commands' is a set of Command objects. for cmd_obj in bot.commands: commands_list.append(cmd_obj.name) debug_level = "DEBUG" if len(commands_list) > 0 else "WARNING" globals.log(f"Discord commands body: {commands_list}", f"{debug_level}") except Exception as e: globals.log(f"Error retrieving Discord commands: {e}", "ERROR") elif not is_discord: try: for cmd_obj in bot._commands: commands_list.append(cmd_obj) debug_level = "DEBUG" if len(commands_list) > 0 else "WARNING" globals.log(f"Twitch commands body: {commands_list}", f"{debug_level}") except Exception as e: globals.log(f"Error retrieving Twitch commands: {e}", "ERROR") else: globals.log(f"Unable to determine platform in 'get_loaded_commands()'!", "CRITICAL") globals.log(f"... Finished processing commands for {_bot_type} ...", "DEBUG") return sorted(commands_list) def build_discord_help_message(cmd_name, cmd_help_dict): """ Build a verbose, multi-line help message for Discord. This function constructs a detailed help message for a Discord command using the provided command name and a dictionary of help information. The dictionary may include a description, subcommands with their arguments and descriptions, and example usage scenarios. The generated message is formatted as a multi-line string with Markdown styling for use on Discord. Args: cmd_name (str): The name of the command. cmd_help_dict (dict): A dictionary containing help details for the command. Expected keys include: - "description" (str): A detailed description of the command. - "subcommands" (dict, optional): A mapping of subcommand names to a dictionary with keys "args" (str) and "desc" (str) representing the arguments and a short description of the subcommand. - "examples" (list, optional): A list of example strings demonstrating how to use the command. Each example should be a string that optionally contains a description after a " : " separator. Returns: str: A multi-line string formatted for Discord that includes the command description, subcommands (if any), and example usage. """ description = cmd_help_dict.get("description", "No description available.\n") subcommands = cmd_help_dict.get("subcommands", {}) examples = cmd_help_dict.get("examples", []) lines = [f"**Help for `{cmd_name}`**:", f"Description: {description}"] if subcommands: lines.append("\n**Subcommands / Arguments:**") for sub, detail in subcommands.items(): arg_part = detail.get("args", "") desc_part = detail.get("desc", "") lines.append(f"• **{sub}** {arg_part} **->** {desc_part}") else: lines.append("\n*No subcommands defined.*") if examples: lines.append("\nExample usage:") for ex in examples: ex_arr = ex.split(" : ", 1) # Split into max 2 parts # Handle missing description case ex_cmd = ex_arr[0] ex_note = f"\n {ex_arr[1]}" if len(ex_arr) > 1 else "" lines.append(f"- `{ex_cmd}`{ex_note}") return "\n".join(lines) def build_twitch_help_message(cmd_name, cmd_help_dict): """ Build a concise Twitch help message for a command. This function constructs a help message string for a Twitch command based on the provided command name and a dictionary of help details. The help dictionary should contain a "description" of the command and may include a "subcommands" mapping. For each subcommand, the function builds usage examples by appending the subcommand name and its arguments (if any) to the base command. If no subcommands are defined, the usage defaults to the base command. Args: cmd_name (str): The name of the command (without the "!" prefix). cmd_help_dict (dict): A dictionary containing help details for the command. Expected keys include: - "description" (str): A brief description of the command. - "subcommands" (dict, optional): A mapping of subcommand names to their details. Each value should be a dictionary that may contain an "args" key (str) representing additional command arguments. Returns: str: A formatted help message string that includes the command description and a usage line with examples of how to invoke the command and its subcommands. """ description = cmd_help_dict.get("description", "No description available.") subcommands = cmd_help_dict.get("subcommands", {}) sub_line_parts = [] for sub, detail in subcommands.items(): if sub == "no subcommand": sub_line_parts.append(f"!{cmd_name}") else: arg_part = detail.get("args", "") if arg_part: sub_line_parts.append(f"!{cmd_name} {sub} {arg_part}".strip()) else: sub_line_parts.append(f"!{cmd_name} {sub}".strip()) usage_line = " | ".join(sub_line_parts) if sub_line_parts else f"!{cmd_name}" return f"Help for !{cmd_name}: {description}. Usage: {usage_line}" async def send_message(ctx, text): """ Minimal helper to send a message to either Discord or Twitch. """ await ctx.send(text) import uuid from modules.db import run_db_operation, lookup_user import globals def track_user_activity( db_conn, platform: str, user_id: str|int, username: str, display_name: str, user_is_bot: bool = False ): """ Tracks or updates a user in the database. This function: - Checks if the user already exists in Platform_Mapping. - If found, updates username/display_name if changed. - If not found, adds a new user and platform mapping entry. Args: db_conn: Active database connection. platform (str): The platform of the user ("discord" or "twitch"). user_id (str): The unique user identifier on the platform. username (str): The platform-specific username. display_name (str): The platform-specific display name. user_is_bot (bool, optional): Indicates if the user is a bot. Defaults to False. Returns: None """ platform = platform.lower() valid_platforms = {"discord", "twitch"} if platform not in valid_platforms: globals.log(f"Unknown platform '{platform}' in track_user_activity!", "WARNING") return # Look up user by platform-specific ID in Platform_Mapping user_data = lookup_user(db_conn, identifier=user_id, identifier_type=f"{platform}_user_id") if user_data: # Existing user found, update info if necessary user_uuid = user_data["UUID"] need_update = False column_updates = [] params = [] if user_data["platform_username"] != username: need_update = True column_updates.append("Username = ?") params.append(username) if user_data["platform_display_name"] != display_name: need_update = True column_updates.append("Display_Name = ?") params.append(display_name) if need_update: update_sql = f""" UPDATE Platform_Mapping SET {", ".join(column_updates)} WHERE Platform_User_ID = ? AND Platform_Type = ? """ params.extend([user_id, platform.capitalize()]) rowcount = run_db_operation(db_conn, "update", update_sql, params) if rowcount and rowcount > 0: globals.log(f"Updated {platform.capitalize()} user '{username}' (display '{display_name}') in Platform_Mapping.", "DEBUG") return # If user was not found in Platform_Mapping, check Users table user_uuid = str(uuid.uuid4()) insert_user_sql = """ INSERT INTO Users (UUID, Unified_Username, user_is_banned, user_is_bot) VALUES (?, ?, 0, ?) """ run_db_operation(db_conn, "write", insert_user_sql, (user_uuid, username, int(user_is_bot))) # Insert into Platform_Mapping insert_mapping_sql = """ INSERT INTO Platform_Mapping (Platform_User_ID, Platform_Type, UUID, Display_Name, Username) VALUES (?, ?, ?, ?, ?) """ params = (user_id, platform.capitalize(), user_uuid, display_name, username) rowcount = run_db_operation(db_conn, "write", insert_mapping_sql, params) if rowcount and rowcount > 0: globals.log(f"Created new user entry for {platform} user '{username}' (display '{display_name}') with UUID={user_uuid}.", "DEBUG") else: globals.log(f"Failed to create user entry for {platform} user '{username}'.", "ERROR") from modules.db import log_bot_event def log_bot_startup(db_conn): """ Logs a bot startup event. """ log_bot_event(db_conn, "BOT_STARTUP", "Bot successfully started.") def log_bot_shutdown(db_conn, intent: str = "Error/Crash"): """ Logs a bot shutdown event. """ log_bot_event(db_conn, "BOT_SHUTDOWN", f"Bot is shutting down - {intent}.") def generate_link_code(): """Generates a unique 8-character alphanumeric link code.""" import random, string return ''.join(random.choices(string.ascii_uppercase + string.digits, k=8)) def time_since(start, end=None, format=None): """ Calculate and format the elapsed time between two epoch timestamps. This function computes the elapsed time between a starting epoch timestamp and an ending epoch timestamp. If no end time is provided, the current time is used. The elapsed time is formatted according to the specified unit: - 's': whole seconds and remaining milliseconds. - 'm': whole minutes and remaining seconds. - 'h': whole hours and remaining minutes. - 'd': whole days and remaining hours. If an invalid format is given, a warning is logged and the default format 's' is used. Args: start (float): The starting epoch timestamp (in seconds). If None, an error is logged and the function returns None. end (float, optional): The ending epoch timestamp (in seconds). Defaults to the current time. format (str, optional): A single character indicating the desired format of the elapsed time. Must be one of: 's' for seconds (with milliseconds), 'm' for minutes (with seconds), 'h' for hours (with minutes), or 'd' for days (with hours). Defaults to 's' if an invalid value is provided. Returns: tuple: A tuple (x, y, total_elapsed) where: - For 's': x is a string representing whole seconds (with "s" appended) and y is a string representing the remaining milliseconds (with "ms" appended). - For 'm': x is a string representing whole minutes (with "m" appended) and y is a string representing the remaining seconds (with "s" appended). - For 'h': x is a string representing whole hours (with "h" appended) and y is a string representing the remaining minutes (with "m" appended). - For 'd': x is a string representing whole days (with "d" appended) and y is a string representing the remaining hours (with "h" appended). - total_elapsed (float): The total elapsed time in seconds. """ # Ensure a start time is provided. if start is None: globals.log("time_since() lacks a start value!", "ERROR") return None # If no end time is provided, use the current time. if end is None: end = time.time() # Default to seconds if format is not valid. if format not in ["s", "m", "h", "d"]: globals.log("time_since() has incorrect format string. Defaulting to 's'", "WARNING") format = "s" # Compute the total elapsed time in seconds. since = end - start # Break down the elapsed time according to the requested format. if format == "s": # Format as seconds: whole seconds and remainder in milliseconds. x = int(since) y = int((since - x) * 1000) x = str(x) + "s" y = str(y) + "ms" elif format == "m": # Format as minutes: whole minutes and remainder in seconds. x = int(since // 60) y = int(since % 60) x = str(x) + "m" y = str(y) + "s" elif format == "h": # Format as hours: whole hours and remainder in minutes. x = int(since // 3600) y = int((since % 3600) // 60) x = str(x) + "h" y = str(y) + "m" elif format == "d": # Format as days: whole days and remainder in hours. x = int(since // 86400) y = int((since % 86400) // 3600) x = str(x) + "d" y = str(y) + "h" return (x, y, since) def wfstl(): """ Write Function Start To Log (debug) Writes the calling function to log under the DEBUG category. """ caller_function_name = inspect.currentframe().f_back.f_code.co_name globals.log(f"Function {caller_function_name} started processing", "DEBUG") def wfetl(): """ Write Function End To Log (debug) Writes the calling function to log under the DEBUG category. """ caller_function_name = inspect.currentframe().f_back.f_code.co_name globals.log(f"Function {caller_function_name} finished processing", "DEBUG") async def get_guild_info(bot: discord.Client, guild_id: Union[int, str]) -> dict: """ Retrieve information about a Discord guild given its ID. This asynchronous function attempts to retrieve a guild's information by its ID. The provided guild_id is first converted to an integer if it is not already one. The function then tries to obtain the guild from the bot's cache using `bot.get_guild()`. If the guild is not found in the cache, it fetches the guild from the Discord API using `bot.fetch_guild()`. If the guild cannot be found or if an error occurs during the fetch, the function raises an appropriate exception. The function returns a dictionary containing basic information about the guild, including: - id (int): The unique identifier of the guild. - name (str): The name of the guild. - owner_id (int): The user ID of the guild's owner. - member_count (int): The number of members in the guild. - icon_url (Optional[str]): The URL of the guild's icon if available. - created_at (Optional[str]): The ISO-formatted creation date of the guild if available. Args: bot (discord.Client): The Discord client instance. guild_id (Union[int, str]): The ID of the guild. This can be provided as an integer or as a string that represents an integer. Returns: dict: A dictionary containing the guild's information. Raises: ValueError: If the provided guild_id is not convertible to an integer or if the guild is not found. RuntimeError: If an HTTP error occurs while fetching the guild. """ # Ensure guild_id is an integer. try: guild_id = int(guild_id) except ValueError: raise ValueError("guild_id must be an int or a string representing an int.") # Try to get the guild from the cache first. guild = bot.get_guild(guild_id) # If not in cache, try to fetch it from the API. if guild is None: try: guild = await bot.fetch_guild(guild_id) except discord.NotFound: raise ValueError("Guild not found.") except discord.HTTPException as e: raise RuntimeError(f"An error occurred while fetching the guild: {e}") # Build a dictionary with some basic information. info = { "id": guild.id, "name": guild.name, "owner_id": guild.owner_id, "member_count": guild.member_count, "icon_url": guild.icon.url if guild.icon else None, "created_at": guild.created_at.isoformat() if guild.created_at else None, } return info async def get_current_twitch_game(bot, channel_name: str) -> str: """ Retrieve the name of the game currently being played on the given Twitch channel. Parameters: bot: A TwitchIO bot instance (or any object with a `fetch_streams` method). channel_name (str): The Twitch channel's username. Returns: str: The game name as string """ # Fetch stream data for the specified channel. streams = await bot.fetch_streams(user_logins=[channel_name]) if streams: # Assume the first stream is the one we're interested in. stream = streams[0] # Depending on your TwitchIO version, the attribute may be `game_name` or `game`. game_name = getattr(stream, 'game_name') game_id = getattr(stream, 'game_id') globals.log(f"'get_current_twitch_game()' result for Twitch channel '{channel_name}': Game ID: {game_id}, Game Name: {game_name}", "DEBUG") return game_name return "" async def is_channel_live(bot=None, channel_name="ookamikuntv") -> bool: """ Returns True if the specified channel is live, otherwise False. Defaults to "ookamikuntv" if no channel_name is provided. """ streams = await bot.fetch_streams(user_logins=[channel_name]) if bot else [] return bool(streams) def list_channels(self): """Logs the names of all connected Twitch channels.""" channels_list = [channel.name for channel in self.connected_channels] connected_channels_str = ", ".join(channels_list) num_connected_channels = len(channels_list) globals.log( f"Currently connected to {num_connected_channels} Twitch channel(s): {connected_channels_str}" ) def command_allowed_twitch(func): """ A custom check that allows a command to run based on channel settings. It looks up the current channel in CHANNEL_CONFIG and either allows or denies the command based on the filter mode and list. """ @wraps(func) async def wrapper(ctx, *args, **kwargs): # Load the full configuration. full_config = globals.constants.twitch_channels_config() # Get the channel name and then the channel-specific configuration. channel_name = ctx.channel.name.lower() channel_config = full_config.get(channel_name) # If there's no configuration for this channel, block the command. if not channel_config: globals.log(f"No configuration found for Twitch channel '{channel_name}'. Blocking command '{ctx.command.name}'.") return mode = channel_config.get("commands_filter_mode") filtered = channel_config.get("commands_filtered", []) command_name = ctx.command.name # Check based on filter mode. if mode == "exclude": if command_name in filtered: globals.log(f"Command '{command_name}' is excluded on Twitch channel '{channel_name}'.") return elif mode == "include": if command_name not in filtered: globals.log(f"Command '{command_name}' is not allowed on Twitch channel '{channel_name}' (include mode).") return # If all checks pass, run the command. return await func(ctx, *args, **kwargs) return wrapper ############################################### # Development Test Function (called upon start) ############################################### def dev_func(db_conn, enable: bool = False): if enable: id = "203190147582394369" id_type = "discord_user_id" uui_info = lookup_user(db_conn, identifier=id, identifier_type=id_type) if uui_info: return list(uui_info.values()) else: return f"User with identifier '{id}' ({id_type}) not found"