OokamiPupV2/modules/utility.py

1057 lines
44 KiB
Python

import time
import os
import random
import json
import re
import functools
import inspect
import uuid
from typing import Union
from modules.db import run_db_operation, lookup_user, log_message
import modules.utility as utility
import discord
from functools import wraps
import globals
try:
# 'regex' on PyPI supports `\p{L}`, `\p{N}`, etc.
import regex
USE_REGEX_LIB = True
except ImportError:
# Fallback to Python's built-in 're' if 'regex' isn't installed
USE_REGEX_LIB = False
DICTIONARY_PATH = "dictionary/" # Path to dictionary files
# def monitor_cmds(log_func):
# """
# Decorator that logs when a command starts and ends execution.
# """
# def decorator(func):
# @functools.wraps(func)
# async def wrapper(*args, **kwargs):
# start_time = time.time()
# try:
# # Extract a command name from the function name
# cmd_name = str(func.__name__).split("_")[1]
# log_func(f"Command '{cmd_name}' started execution.", "DEBUG")
# # Await the actual command function
# result = await func(*args, **kwargs)
# end_time = time.time()
# cmd_duration = round(end_time - start_time, 2)
# log_func(f"Command '{cmd_name}' finished execution after {cmd_duration}s.", "DEBUG")
# return result
# except Exception as e:
# end_time = time.time()
# cmd_duration = round(end_time - start_time, 2)
# log_func(f"Command '{cmd_name}' FAILED while executing after {cmd_duration}s: {e}", "CRITICAL")
# # Explicitly preserve the original signature for slash command introspection
# wrapper.__signature__ = inspect.signature(func)
# return wrapper
# return decorator
# def monitor_cmds():
# """
# Decorator that logs when a command starts, completes, or fails.
# Uses `ctx.bot.log` dynamically.
# """
# def decorator(func):
# @functools.wraps(func)
# async def wrapper(*args, **kwargs):
# start_time = time.time()
# try:
# ctx = args[0] # First argument should be `ctx`
# cmd_name = ctx.command.name if hasattr(ctx, "command") else func.__name__
# log_func = ctx.bot.log
# log_func(f"Command '{cmd_name}' started by {ctx.author} in #{ctx.channel}", "DEBUG")
# # Execute the command
# result = await func(*args, **kwargs)
# # Log successful execution
# duration = round(time.time() - start_time, 2)
# log_func(f"Command '{cmd_name}' finished execution in {duration}s.", "DEBUG")
# return result
# except Exception as e:
# duration = round(time.time() - start_time, 2)
# log_func(f"Command execution failed: '{cmd_name}' after {duration}s: {e}", "CRITICAL")
# # Fix: Ensure Discord's command system keeps the correct function parameters
# wrapper.__signature__ = inspect.signature(func)
# return wrapper
# return decorator
# def monitor_msgs(log_func, db_conn):
# """
# Decorator that logs Discord messages and tracks user activity.
# Works for both commands and event handlers like on_message().
# """
# def decorator(func):
# @functools.wraps(func)
# async def wrapper(self, message: discord.Message, *args, **kwargs):
# start_time = time.time()
# try:
# # Ignore bot messages
# if message.author.bot:
# return
# user_id = str(message.author.id)
# user_name = message.author.name
# display_name = message.author.display_name
# # Track user activity
# track_user_activity(
# db_conn=db_conn,
# log_func=log_func,
# platform="discord",
# user_id=user_id,
# username=user_name,
# display_name=display_name,
# user_is_bot=False
# )
# log_func(f"Message from {user_name} in {message.channel} (Guild: {message.guild.name if message.guild else 'DM'})", "DEBUG")
# # Fetch user UUID
# user_data = lookup_user(db_conn, log_func, identifier=user_id, identifier_type="discord_user_id")
# user_uuid = user_data["UUID"] if user_data else "UNKNOWN"
# # Extract message details
# platform_str = f"discord-{message.guild.name}" if message.guild else "discord-DM"
# channel_str = message.channel.name if hasattr(message.channel, "name") else "DM"
# attachments = ", ".join(a.url for a in message.attachments) if message.attachments else ""
# # Log message in DB
# log_message(
# db_conn=db_conn,
# log_func=log_func,
# user_uuid=user_uuid,
# message_content=message.content or "",
# platform=platform_str,
# channel=channel_str,
# attachments=attachments
# )
# # Call the original `on_message()` function
# await func(self, message, *args, **kwargs)
# # Ensure the bot processes commands
# await self.process_commands(message)
# except Exception as e:
# log_func(f"Error processing message from {message.author}: {e}", "ERROR")
# finally:
# duration = round(time.time() - start_time, 2)
# log_func(f"Message processing complete in {duration}s.", "DEBUG")
# # Preserve original function signature
# wrapper.__signature__ = inspect.signature(func)
# return wrapper
# return decorator
def format_uptime(seconds: float) -> tuple[str, int]:
"""
Convert a duration in seconds into a human-readable string using up to two significant time units.
This function breaks down the given number of seconds into larger time units (years, months,
days, hours, minutes, and seconds) and returns a string representing the duration using only the
two most significant non-zero units. For example:
- 192000 seconds might be formatted as "2 days, 5 hours"
- 32 minutes will be formatted as "32 minutes"
- 37000000 seconds could be formatted as "1 year, 3 months"
Args:
seconds (float): The total number of seconds to convert.
Returns:
tuple(str, int):
A tuple containing:
- A human-readable string representing the duration (using up to two time units).
- The original duration in seconds as an integer.
Examples:
>>> format_uptime(192000)
('2 days, 5 hours', 192000)
>>> format_uptime(60)
('1 minute', 60)
"""
seconds = int(seconds) # Ensure integer seconds
seconds_int = seconds
# Define time units as tuples of (unit name, seconds per unit)
units = [
("year", 31536000), # 365 days
("month", 2592000), # 30 days
("day", 86400), # 24 hours
("hour", 3600), # 60 minutes
("minute", 60),
("second", 1)
]
# Compute time breakdown
time_values = []
for unit_name, unit_seconds in units:
value, seconds = divmod(seconds, unit_seconds)
if value > 0:
time_values.append(f"{value} {unit_name}{'s' if value > 1 else ''}") # Auto pluralize
# Return only the two most significant time units (e.g., "3 days, 4 hours")
return (", ".join(time_values[:2]), seconds_int) if time_values else ("0 seconds", 0)
def get_random_reply(dictionary_name: str, category: str, **variables) -> str:
"""
Fetch a random reply from a specified dictionary file and category.
This function loads a JSON file—named after the given dictionary (without the .json extension)
and located in the DICTIONARY_PATH directory—and retrieves a random reply from the list of
responses under the specified category. It then substitutes any placeholders in the reply
string with the provided keyword arguments. If the file does not exist, the JSON is invalid,
or the category is missing or not a list, the function returns an appropriate error message.
Args:
dictionary_name (str): The base name of the dictionary file (without the .json extension).
category (str): The key within the JSON file representing the category of replies.
**variables: Arbitrary keyword arguments for substituting placeholders in the reply string.
Returns:
str: A formatted reply string with the variables substituted, or an error message if the
file, category, or JSON data is invalid.
"""
file_path = os.path.join(DICTIONARY_PATH, f"{dictionary_name}.json")
# Ensure file exists
if not os.path.exists(file_path):
return f"[Error: Missing {dictionary_name}.json]"
try:
with open(file_path, "r", encoding="utf-8") as file:
data = json.load(file)
except json.JSONDecodeError:
return f"[Error: Failed to load {dictionary_name}.json]"
# Ensure category exists
if category not in data or not isinstance(data[category], list):
return f"[Error: No valid entries for {category} in {dictionary_name}.json]"
# Select a random reply
response = random.choice(data[category])
# Replace placeholders with provided variables
return response.format(**variables)
##############################
# Basic sanitization
# DO NOT RELY SOLELY ON THIS
##############################
def sanitize_user_input(
user_input: str,
usage: str = "GENERAL",
max_length: int = 500
):
"""
Sanitize user input using a whitelisting approach.
This function sanitizes the provided user input by applying a whitelist filter based on
the specified usage context. For inputs intended for calculation ("CALC"), it retains only
digits, mathematical operators, parentheses, and related characters. For general text
("GENERAL"), it permits a broader range of characters including letters, numbers, punctuation,
symbols, and separators. The function also truncates the input to a maximum length, removes
unwanted whitespace characters, and applies additional filtering using either a regex library
(if available) or a fallback method.
Args:
user_input (str): The raw input string from the user (e.g., from Twitch or Discord).
usage (str, optional): The context for sanitization. Accepted values are:
- "CALC": Retain only characters relevant for mathematical expressions.
- "GENERAL": Allow typical readable characters and punctuation.
Defaults to "GENERAL".
max_length (int, optional): The maximum allowed length of the input. Any characters beyond
this limit will be truncated. Defaults to 500.
Returns:
tuple: A tuple in the form:
(sanitized_str, sanitization_applied, reason_string, original_str)
where:
- sanitized_str (str): The sanitized version of the input.
- sanitization_applied (bool): True if any modifications were applied; otherwise, False.
- reason_string (str): A semicolon-separated string explaining the sanitization steps.
- original_str (str): The original, unmodified input string.
Security Recommendations:
1) For Database Storage (e.g., MariaDB):
- Always use parameterized queries or an ORM with bound parameters.
- Do not rely solely on string sanitization to prevent SQL injection.
2) For Code Execution (e.g., using eval):
- Avoid using eval/exec on user input.
- If execution of user input is required, consider a restricted math parser or an audited sandbox.
3) For HTML Sanitization:
- Bleach is deprecated; consider modern alternatives or frameworks that safely sanitize HTML output.
- Note: This function does not sanitize HTML tags.
"""
original_string = str(user_input)
reasons = []
sanitization_applied = False
# 1. Truncate and remove newlines, tabs, etc.
truncated = original_string[:max_length]
truncated = re.sub(r"[\r\n\t]+", " ", truncated)
sanitized = truncated
# 2. Choose how to filter based on usage
usage = usage.upper()
if usage == "CALC":
# Allow digits, +, -, *, /, %, parentheses, decimal points, ^ for exponent, spaces
# Remove everything else
pattern = r"[^0-9+\-*/%().^ \t]"
new_sanitized = re.sub(pattern, "", sanitized)
if new_sanitized != sanitized:
sanitization_applied = True
reasons.append("CALC: Removed non-math characters.")
sanitized = new_sanitized
else: # GENERAL usage
if USE_REGEX_LIB:
# Remove ASCII control chars (0-31, 127) first
step1 = re.sub(r"[\x00-\x1F\x7F]", "", sanitized)
# Then apply a broad whitelist:
# \p{L}: letters; \p{N}: numbers; \p{P}: punctuation; \p{S}: symbols; \p{Z}: separators (including spaces)
# This keeps emojis, foreign characters, typical punctuation, etc.
pattern = r"[^\p{L}\p{N}\p{P}\p{S}\p{Z}]"
new_sanitized = regex.sub(pattern, "", step1)
if new_sanitized != sanitized:
sanitization_applied = True
reasons.append("GENERAL: Removed disallowed chars via regex.")
sanitized = new_sanitized
else:
# Fallback: If 'regex' is not installed, remove control chars and keep ASCII printable only.
step1 = re.sub(r"[\x00-\x1F\x7F]", "", sanitized)
pattern = r"[^ -~]" # Keep only ASCII characters 32-126
new_sanitized = re.sub(pattern, "", step1)
if new_sanitized != sanitized:
sanitization_applied = True
reasons.append("GENERAL: Removed non-ASCII or control chars (fallback).")
sanitized = new_sanitized
# 3. Final trim
sanitized = sanitized.strip()
# 4. Prepare output
reason_string = "; ".join(reasons)
return (sanitized, sanitization_applied, reason_string, original_string)
#####################
# Help command logic
#####################
async def handle_help_command(ctx, command_name, bot, is_discord):
"""
Provide help text for a specific command or list all commands.
This asynchronous function is called by platform-specific help commands to generate and
return the appropriate help message. It retrieves the help data stored on the bot (parsed
from a JSON file) and checks whether the requested command exists in that data. If no
specific command is provided (i.e. `command_name` is None), it returns a list of all loaded
commands. Depending on the platform (Discord or Twitch), it formats the help message using
the corresponding builder function.
Args:
ctx: The context from the command invocation (from discord.py or twitchio).
command_name (str): The name of the command for which help is requested (e.g., "quote"). If
None, a list of all available commands is returned.
bot: The current bot instance, which should have a `help_data` attribute containing the help
information.
is_discord (bool): True if the bot is running on Discord; False if running on Twitch.
Returns:
str: A formatted help message string. This message may indicate one of the following:
- A list of all loaded commands (if no command name is provided),
- Detailed help for a specific command (if help data exists for that command),
- A notice that the command is loaded but lacks help data,
- A notice that the command is deprecated/unloaded, or
- An error message if the help data is missing or invalid.
Side Effects:
Logs warnings or errors if the help data is missing, improperly structured, or if the
command is not found.
"""
# If there's no loaded help_data, we can't do much
if not hasattr(bot, "help_data") or not bot.help_data:
return await "No help data found."
help_data = bot.help_data # The parsed JSON from e.g. help_discord.json
if "commands" not in help_data:
return "Invalid help data structure (no 'commands' key).\n*This is due to an error with the help file.*"
if not command_name:
# User typed just "!help" => list all known commands from this bot
loaded_cmds = get_loaded_commands(bot, is_discord)
if not loaded_cmds:
return "I have no commands loaded."
else:
if is_discord:
help_str = f"I currently offer these commands:"
for cmd in loaded_cmds:
help_str += f"\n- `{cmd}`"
help_str += f"\n*Use `!help <command>` for more details.*"
return help_str
else:
short_list = ", ".join(loaded_cmds)
# We can also mention "Use !help [command] for more info."
return f"I currently offer these commands: {short_list}. \nUse '!help <command>' for details."
# 1) Check if the command is loaded
loaded = (command_name in get_loaded_commands(bot, is_discord))
# 2) Check if it has help info in the JSON
cmd_help = help_data["commands"].get(command_name, None)
if loaded and cmd_help:
# The command is loaded, and we have help info => show it
if is_discord:
msg = build_discord_help_message(command_name, cmd_help)
else:
msg = build_twitch_help_message(command_name, cmd_help)
return msg
elif loaded and not cmd_help:
# The command is loaded but no help info => mention that
return f"The '{command_name}' command is loaded but has no help info yet."
elif (not loaded) and cmd_help:
# The command is not loaded, but we have an entry => mention it's unloaded/deprecated
return f"The '{command_name}' command is not currently loaded (deprecated or unavailable)."
else:
# Not loaded, no help info => not found at all
return f"I'm sorry, I don't offer a command named '{command_name}'."
def initialize_help_data(bot, help_json_path, is_discord):
"""
Load help data from a JSON file and verify bot commands against it.
This function loads help data from the specified JSON file and stores the parsed data
in the bot's `help_data` attribute. After loading the data, it cross-checks the commands
loaded on the bot with the commands defined in the help file. The function logs warnings
for any discrepancies:
- If a command is present in the help file but not loaded on the bot, it logs a warning
(indicating the command may be deprecated).
- If a command is loaded on the bot but missing from the help file, it logs a warning
(indicating missing help information).
Args:
bot: The bot instance, which will have its `help_data` attribute updated.
help_json_path (str): The file path to the JSON file containing help data.
is_discord (bool): A flag indicating whether the bot is a Discord bot (True) or a Twitch bot (False).
Returns:
None
Side Effects:
- Updates the bot's `help_data` attribute with the contents of the JSON file.
- Logs warnings or errors if the help file is missing, cannot be parsed, or if there are mismatches
between the loaded commands and the help data.
"""
platform_name = "Discord" if is_discord else "Twitch"
if not os.path.exists(help_json_path):
globals.log(f"Help file '{help_json_path}' not found. No help data loaded.", "WARNING")
bot.help_data = {}
return
# Load the JSON
try:
with open(help_json_path, "r", encoding="utf-8") as f:
data = json.load(f)
except Exception as e:
globals.log(f"Error parsing help JSON '{help_json_path}': {e}", "ERROR")
data = {}
bot.help_data = data
# Now cross-check the loaded commands vs. the data
loaded_cmds = set(get_loaded_commands(bot, is_discord))
if "commands" not in data:
globals.log(f"No 'commands' key in {help_json_path}, skipping checks.", "ERROR")
return
file_cmds = set(data["commands"].keys())
# 1) Commands in file but not loaded
missing_cmds = file_cmds - loaded_cmds
for cmd in missing_cmds:
globals.log(f"Help file has '{cmd}', but it's not loaded on this {platform_name} bot (deprecated?).", "WARNING")
# 2) Commands loaded but not in file
needed_cmds = loaded_cmds - file_cmds
for cmd in needed_cmds:
globals.log(f"Command '{cmd}' is loaded on {platform_name} but no help info is provided in {help_json_path}.", "WARNING")
def get_loaded_commands(bot, is_discord):
"""
Retrieve and sort the list of loaded commands from a bot instance.
This function examines the provided bot instance and extracts its registered commands.
For a Discord bot (when `is_discord` is True), it iterates over `bot.commands` and collects
the command names. For a Twitch bot (when `is_discord` is False), it iterates over `bot._commands`
and collects the command objects. Throughout the process, the function logs debug and error
messages to help trace the execution flow. Finally, it returns the list of commands in sorted order.
Args:
bot: The bot instance from which to retrieve commands. This may be either a Discord or Twitch bot.
is_discord (bool): Indicates whether the bot is a Discord bot. If False, the function assumes the bot
is a Twitch bot.
Returns:
list: A sorted list of commands. For a Discord bot, this list contains command names (strings);
for a Twitch bot, it contains command objects as stored in `bot._commands`.
Side Effects:
Logs debug, warning, and error messages regarding the command processing.
"""
from discord.ext import commands as discord_commands
from twitchio.ext import commands as twitch_commands
commands_list = []
try:
_bot_type = str(type(bot)).split("_")[1].split(".")[0]
globals.log(f"Currently processing commands for {_bot_type} ...", "DEBUG")
except Exception as e:
globals.log(f"Unable to determine current bot type: {e}", "WARNING")
# For Discord
if is_discord:
try:
# 'bot.commands' is a set of Command objects.
for cmd_obj in bot.commands:
commands_list.append(cmd_obj.name)
debug_level = "DEBUG" if len(commands_list) > 0 else "WARNING"
globals.log(f"Discord commands body: {commands_list}", f"{debug_level}")
except Exception as e:
globals.log(f"Error retrieving Discord commands: {e}", "ERROR")
elif not is_discord:
try:
for cmd_obj in bot._commands:
commands_list.append(cmd_obj)
debug_level = "DEBUG" if len(commands_list) > 0 else "WARNING"
globals.log(f"Twitch commands body: {commands_list}", f"{debug_level}")
except Exception as e:
globals.log(f"Error retrieving Twitch commands: {e}", "ERROR")
else:
globals.log(f"Unable to determine platform in 'get_loaded_commands()'!", "CRITICAL")
globals.log(f"... Finished processing commands for {_bot_type} ...", "DEBUG")
return sorted(commands_list)
def build_discord_help_message(cmd_name, cmd_help_dict):
"""
Build a verbose, multi-line help message for Discord.
This function constructs a detailed help message for a Discord command using the
provided command name and a dictionary of help information. The dictionary may include
a description, subcommands with their arguments and descriptions, and example usage
scenarios. The generated message is formatted as a multi-line string with Markdown
styling for use on Discord.
Args:
cmd_name (str): The name of the command.
cmd_help_dict (dict): A dictionary containing help details for the command.
Expected keys include:
- "description" (str): A detailed description of the command.
- "subcommands" (dict, optional): A mapping of subcommand names to a dictionary
with keys "args" (str) and "desc" (str) representing the arguments and a short
description of the subcommand.
- "examples" (list, optional): A list of example strings demonstrating how to use
the command. Each example should be a string that optionally contains a description
after a " : " separator.
Returns:
str: A multi-line string formatted for Discord that includes the command description,
subcommands (if any), and example usage.
"""
description = cmd_help_dict.get("description", "No description available.\n")
subcommands = cmd_help_dict.get("subcommands", {})
examples = cmd_help_dict.get("examples", [])
lines = [f"**Help for `{cmd_name}`**:",
f"Description: {description}"]
if subcommands:
lines.append("\n**Subcommands / Arguments:**")
for sub, detail in subcommands.items():
arg_part = detail.get("args", "")
desc_part = detail.get("desc", "")
lines.append(f"• **{sub}** {arg_part} **->** {desc_part}")
else:
lines.append("\n*No subcommands defined.*")
if examples:
lines.append("\nExample usage:")
for ex in examples:
ex_arr = ex.split(" : ", 1) # Split into max 2 parts
# Handle missing description case
ex_cmd = ex_arr[0]
ex_note = f"\n {ex_arr[1]}" if len(ex_arr) > 1 else ""
lines.append(f"- `{ex_cmd}`{ex_note}")
return "\n".join(lines)
def build_twitch_help_message(cmd_name, cmd_help_dict):
"""
Build a concise Twitch help message for a command.
This function constructs a help message string for a Twitch command based on the
provided command name and a dictionary of help details. The help dictionary should
contain a "description" of the command and may include a "subcommands" mapping. For
each subcommand, the function builds usage examples by appending the subcommand name
and its arguments (if any) to the base command. If no subcommands are defined, the
usage defaults to the base command.
Args:
cmd_name (str): The name of the command (without the "!" prefix).
cmd_help_dict (dict): A dictionary containing help details for the command.
Expected keys include:
- "description" (str): A brief description of the command.
- "subcommands" (dict, optional): A mapping of subcommand names to their details.
Each value should be a dictionary that may contain an "args" key (str) representing
additional command arguments.
Returns:
str: A formatted help message string that includes the command description and a usage
line with examples of how to invoke the command and its subcommands.
"""
description = cmd_help_dict.get("description", "No description available.")
subcommands = cmd_help_dict.get("subcommands", {})
sub_line_parts = []
for sub, detail in subcommands.items():
if sub == "no subcommand":
sub_line_parts.append(f"!{cmd_name}")
else:
arg_part = detail.get("args", "")
if arg_part:
sub_line_parts.append(f"!{cmd_name} {sub} {arg_part}".strip())
else:
sub_line_parts.append(f"!{cmd_name} {sub}".strip())
usage_line = " | ".join(sub_line_parts) if sub_line_parts else f"!{cmd_name}"
return f"Help for !{cmd_name}: {description}. Usage: {usage_line}"
async def send_message(ctx, text):
"""
Minimal helper to send a message to either Discord or Twitch.
"""
await ctx.send(text)
def track_user_activity(
db_conn,
platform: str,
user_id: str|int,
username: str,
display_name: str,
user_is_bot: bool = False
):
"""
Create or update a user record in the database for a given platform.
This function checks whether a user with the specified user ID exists in the 'users'
table for the provided platform (either "discord" or "twitch"). If a matching record is found,
the function compares the provided username, display name, and bot status with the stored values,
updating the record if any discrepancies are detected. If no record exists, a new user record is
created with a generated UUID.
Args:
db_conn: The active database connection used to perform database operations.
platform (str): The platform to which the user belongs. Expected values are "discord" or "twitch".
user_id (str): The unique identifier of the user on the given platform.
username (str): The raw username of the user (for Discord, this excludes the discriminator).
display_name (str): The display name of the user.
user_is_bot (bool, optional): Indicates whether the user is a bot on the platform.
Defaults to False.
Returns:
None
Side Effects:
- Logs debugging and error messages via the global logger.
- Updates an existing user record if discrepancies are found.
- Inserts a new user record if no existing record is found.
"""
globals.log(f"UUI Lookup for: {username} - {user_id} ({platform.lower()}) ...", "DEBUG")
# Decide which column we use for the ID lookup ("discord_user_id" or "twitch_user_id")
if platform.lower() in ("discord", "twitch"):
identifier_type = f"{platform.lower()}_user_id"
else:
globals.log(f"Unknown platform '{platform}' in track_user_activity!", "WARNING")
return
# 1) Try to find an existing user row.
user_data = lookup_user(db_conn, identifier=user_id, identifier_type=identifier_type)
if user_data:
# Found an existing row for that user ID on this platform.
# Check if the username or display_name is different and update if necessary.
need_update = False
column_updates = []
params = []
globals.log(f"... Returned {user_data}", "DEBUG")
if platform.lower() == "discord":
if user_data["discord_username"] != username:
need_update = True
column_updates.append("discord_username = ?")
params.append(username)
if user_data["discord_user_display_name"] != display_name:
need_update = True
column_updates.append("discord_user_display_name = ?")
params.append(display_name)
if user_data["user_is_bot"] != user_is_bot:
need_update = True
column_updates.append("user_is_bot = ?")
params.append(int(user_is_bot))
if need_update:
set_clause = ", ".join(column_updates)
update_sql = f"""
UPDATE users
SET {set_clause}
WHERE discord_user_id = ?
"""
params.append(user_id)
rowcount = run_db_operation(db_conn, "update", update_sql, params=params)
if rowcount and rowcount > 0:
globals.log(f"Updated Discord user '{username}' (display '{display_name}') in 'users'.", "DEBUG")
elif platform.lower() == "twitch":
if user_data["twitch_username"] != username:
need_update = True
column_updates.append("twitch_username = ?")
params.append(username)
if user_data["twitch_user_display_name"] != display_name:
need_update = True
column_updates.append("twitch_user_display_name = ?")
params.append(display_name)
if user_data["user_is_bot"] != user_is_bot:
need_update = True
column_updates.append("user_is_bot = ?")
params.append(int(user_is_bot))
if need_update:
set_clause = ", ".join(column_updates)
update_sql = f"""
UPDATE users
SET {set_clause}
WHERE twitch_user_id = ?
"""
params.append(user_id)
rowcount = run_db_operation(db_conn, "update", update_sql, params=params)
if rowcount and rowcount > 0:
globals.log(f"Updated Twitch user '{username}' (display '{display_name}') in 'users'.", "DEBUG")
else:
# 2) No row found => create a new user row.
new_uuid = str(uuid.uuid4())
if platform.lower() == "discord":
insert_sql = """
INSERT INTO users (
UUID,
discord_user_id,
discord_username,
discord_user_display_name,
user_is_bot
)
VALUES (?, ?, ?, ?, ?)
"""
params = (new_uuid, user_id, username, display_name, int(user_is_bot))
else: # platform is "twitch"
insert_sql = """
INSERT INTO users (
UUID,
twitch_user_id,
twitch_username,
twitch_user_display_name,
user_is_bot
)
VALUES (?, ?, ?, ?, ?)
"""
params = (new_uuid, user_id, username, display_name, int(user_is_bot))
rowcount = run_db_operation(db_conn, "write", insert_sql, params)
if rowcount and rowcount > 0:
globals.log(f"Created new user row for {platform} user '{username}' (display '{display_name}') with UUID={new_uuid}.", "DEBUG")
else:
globals.log(f"Failed to create new user row for {platform} user '{username}'", "ERROR")
from modules.db import log_bot_event
def log_bot_startup(db_conn):
"""
Logs a bot startup event.
"""
log_bot_event(db_conn, "BOT_STARTUP", "Bot successfully started.")
def log_bot_shutdown(db_conn, intent: str = "Error/Crash"):
"""
Logs a bot shutdown event.
"""
log_bot_event(db_conn, "BOT_SHUTDOWN", f"Bot is shutting down - {intent}.")
def generate_link_code():
"""Generates a unique 8-character alphanumeric link code."""
import random, string
return ''.join(random.choices(string.ascii_uppercase + string.digits, k=8))
def time_since(start, end=None, format=None):
"""
Calculate and format the elapsed time between two epoch timestamps.
This function computes the elapsed time between a starting epoch timestamp and an ending
epoch timestamp. If no end time is provided, the current time is used. The elapsed time is
formatted according to the specified unit:
- 's': whole seconds and remaining milliseconds.
- 'm': whole minutes and remaining seconds.
- 'h': whole hours and remaining minutes.
- 'd': whole days and remaining hours.
If an invalid format is given, a warning is logged and the default format 's' is used.
Args:
start (float): The starting epoch timestamp (in seconds). If None, an error is logged and
the function returns None.
end (float, optional): The ending epoch timestamp (in seconds). Defaults to the current time.
format (str, optional): A single character indicating the desired format of the elapsed time.
Must be one of:
's' for seconds (with milliseconds),
'm' for minutes (with seconds),
'h' for hours (with minutes), or
'd' for days (with hours).
Defaults to 's' if an invalid value is provided.
Returns:
tuple: A tuple (x, y, total_elapsed) where:
- For 's': x is a string representing whole seconds (with "s" appended) and y is a string
representing the remaining milliseconds (with "ms" appended).
- For 'm': x is a string representing whole minutes (with "m" appended) and y is a string
representing the remaining seconds (with "s" appended).
- For 'h': x is a string representing whole hours (with "h" appended) and y is a string
representing the remaining minutes (with "m" appended).
- For 'd': x is a string representing whole days (with "d" appended) and y is a string
representing the remaining hours (with "h" appended).
- total_elapsed (float): The total elapsed time in seconds.
"""
# Ensure a start time is provided.
if start is None:
globals.log("time_since() lacks a start value!", "ERROR")
return None
# If no end time is provided, use the current time.
if end is None:
end = time.time()
# Default to seconds if format is not valid.
if format not in ["s", "m", "h", "d"]:
globals.log("time_since() has incorrect format string. Defaulting to 's'", "WARNING")
format = "s"
# Compute the total elapsed time in seconds.
since = end - start
# Break down the elapsed time according to the requested format.
if format == "s":
# Format as seconds: whole seconds and remainder in milliseconds.
x = int(since)
y = int((since - x) * 1000)
x = str(x) + "s"
y = str(y) + "ms"
elif format == "m":
# Format as minutes: whole minutes and remainder in seconds.
x = int(since // 60)
y = int(since % 60)
x = str(x) + "m"
y = str(y) + "s"
elif format == "h":
# Format as hours: whole hours and remainder in minutes.
x = int(since // 3600)
y = int((since % 3600) // 60)
x = str(x) + "h"
y = str(y) + "m"
elif format == "d":
# Format as days: whole days and remainder in hours.
x = int(since // 86400)
y = int((since % 86400) // 3600)
x = str(x) + "d"
y = str(y) + "h"
return (x, y, since)
def wfstl():
"""
Write Function Start To Log (debug)
Writes the calling function to log under the DEBUG category.
"""
caller_function_name = inspect.currentframe().f_back.f_code.co_name
globals.log(f"Function {caller_function_name} started processing", "DEBUG")
def wfetl():
"""
Write Function End To Log (debug)
Writes the calling function to log under the DEBUG category.
"""
caller_function_name = inspect.currentframe().f_back.f_code.co_name
globals.log(f"Function {caller_function_name} finished processing", "DEBUG")
async def get_guild_info(bot: discord.Client, guild_id: Union[int, str]) -> dict:
"""
Retrieve information about a Discord guild given its ID.
This asynchronous function attempts to retrieve a guild's information by its ID. The provided
guild_id is first converted to an integer if it is not already one. The function then tries to
obtain the guild from the bot's cache using `bot.get_guild()`. If the guild is not found in the
cache, it fetches the guild from the Discord API using `bot.fetch_guild()`. If the guild cannot be
found or if an error occurs during the fetch, the function raises an appropriate exception.
The function returns a dictionary containing basic information about the guild, including:
- id (int): The unique identifier of the guild.
- name (str): The name of the guild.
- owner_id (int): The user ID of the guild's owner.
- member_count (int): The number of members in the guild.
- icon_url (Optional[str]): The URL of the guild's icon if available.
- created_at (Optional[str]): The ISO-formatted creation date of the guild if available.
Args:
bot (discord.Client): The Discord client instance.
guild_id (Union[int, str]): The ID of the guild. This can be provided as an integer or as a
string that represents an integer.
Returns:
dict: A dictionary containing the guild's information.
Raises:
ValueError: If the provided guild_id is not convertible to an integer or if the guild is not found.
RuntimeError: If an HTTP error occurs while fetching the guild.
"""
# Ensure guild_id is an integer.
try:
guild_id = int(guild_id)
except ValueError:
raise ValueError("guild_id must be an int or a string representing an int.")
# Try to get the guild from the cache first.
guild = bot.get_guild(guild_id)
# If not in cache, try to fetch it from the API.
if guild is None:
try:
guild = await bot.fetch_guild(guild_id)
except discord.NotFound:
raise ValueError("Guild not found.")
except discord.HTTPException as e:
raise RuntimeError(f"An error occurred while fetching the guild: {e}")
# Build a dictionary with some basic information.
info = {
"id": guild.id,
"name": guild.name,
"owner_id": guild.owner_id,
"member_count": guild.member_count,
"icon_url": guild.icon.url if guild.icon else None,
"created_at": guild.created_at.isoformat() if guild.created_at else None,
}
return info
async def is_channel_live(bot = None) -> bool:
streams = await bot.fetch_streams(user_logins=["ookamikuntv"]) if bot else []
return bool(streams)
def list_channels(self):
# Command to list connected channels.
connected_channels = ", ".join(channel.name for channel in self.connected_channels)
globals.log(f"Currently connected to {connected_channels}")
def command_allowed_twitch(func):
"""
A custom check that allows a command to run based on channel settings.
It looks up the current channel in CHANNEL_CONFIG and either allows or denies
the command based on the filter mode and list.
"""
@wraps(func)
async def wrapper(ctx, *args, **kwargs):
# Load the full configuration.
full_config = globals.constants.twitch_channels_config()
# Get the channel name and then the channel-specific configuration.
channel_name = ctx.channel.name.lower()
channel_config = full_config.get(channel_name)
# If there's no configuration for this channel, block the command.
if not channel_config:
globals.log(f"No configuration found for Twitch channel '{channel_name}'. Blocking command '{ctx.command.name}'.")
return
mode = channel_config.get("commands_filter_mode")
filtered = channel_config.get("commands_filtered", [])
command_name = ctx.command.name
# Check based on filter mode.
if mode == "exclude":
if command_name in filtered:
globals.log(f"Command '{command_name}' is excluded on Twitch channel '{channel_name}'.")
return
elif mode == "include":
if command_name not in filtered:
globals.log(f"Command '{command_name}' is not allowed on Twitch channel '{channel_name}' (include mode).")
return
# If all checks pass, run the command.
return await func(ctx, *args, **kwargs)
return wrapper
###############################################
# Development Test Function (called upon start)
###############################################
def dev_func(db_conn, enable: bool = False):
if enable:
id = "203190147582394369"
id_type = "discord_user_id"
uui_info = lookup_user(db_conn, identifier=id, identifier_type=id_type)
if uui_info:
return list(uui_info.values())
else:
return f"User with identifier '{id}' ({id_type}) not found"