1036 lines
43 KiB
Python
1036 lines
43 KiB
Python
import time
|
|
import os
|
|
import random
|
|
import json
|
|
import re
|
|
import functools
|
|
import inspect
|
|
import uuid
|
|
from typing import Union
|
|
from modules.db import run_db_operation, lookup_user, log_message
|
|
import modules.utility as utility
|
|
import discord
|
|
from functools import wraps
|
|
|
|
import globals
|
|
|
|
try:
|
|
# 'regex' on PyPI supports `\p{L}`, `\p{N}`, etc.
|
|
import regex
|
|
USE_REGEX_LIB = True
|
|
except ImportError:
|
|
# Fallback to Python's built-in 're' if 'regex' isn't installed
|
|
USE_REGEX_LIB = False
|
|
|
|
DICTIONARY_PATH = "dictionary/" # Path to dictionary files
|
|
|
|
# def monitor_cmds(log_func):
|
|
# """
|
|
# Decorator that logs when a command starts and ends execution.
|
|
# """
|
|
# def decorator(func):
|
|
# @functools.wraps(func)
|
|
# async def wrapper(*args, **kwargs):
|
|
# start_time = time.time()
|
|
# try:
|
|
# # Extract a command name from the function name
|
|
# cmd_name = str(func.__name__).split("_")[1]
|
|
# log_func(f"Command '{cmd_name}' started execution.", "DEBUG")
|
|
|
|
# # Await the actual command function
|
|
# result = await func(*args, **kwargs)
|
|
|
|
# end_time = time.time()
|
|
# cmd_duration = round(end_time - start_time, 2)
|
|
# log_func(f"Command '{cmd_name}' finished execution after {cmd_duration}s.", "DEBUG")
|
|
# return result
|
|
# except Exception as e:
|
|
# end_time = time.time()
|
|
# cmd_duration = round(end_time - start_time, 2)
|
|
# log_func(f"Command '{cmd_name}' FAILED while executing after {cmd_duration}s: {e}", "CRITICAL")
|
|
# # Explicitly preserve the original signature for slash command introspection
|
|
# wrapper.__signature__ = inspect.signature(func)
|
|
# return wrapper
|
|
# return decorator
|
|
|
|
# def monitor_cmds():
|
|
# """
|
|
# Decorator that logs when a command starts, completes, or fails.
|
|
# Uses `ctx.bot.log` dynamically.
|
|
# """
|
|
# def decorator(func):
|
|
# @functools.wraps(func)
|
|
# async def wrapper(*args, **kwargs):
|
|
# start_time = time.time()
|
|
# try:
|
|
# ctx = args[0] # First argument should be `ctx`
|
|
# cmd_name = ctx.command.name if hasattr(ctx, "command") else func.__name__
|
|
|
|
# log_func = ctx.bot.log
|
|
# log_func(f"Command '{cmd_name}' started by {ctx.author} in #{ctx.channel}", "DEBUG")
|
|
|
|
# # Execute the command
|
|
# result = await func(*args, **kwargs)
|
|
|
|
# # Log successful execution
|
|
# duration = round(time.time() - start_time, 2)
|
|
# log_func(f"Command '{cmd_name}' finished execution in {duration}s.", "DEBUG")
|
|
|
|
# return result
|
|
# except Exception as e:
|
|
# duration = round(time.time() - start_time, 2)
|
|
# log_func(f"Command execution failed: '{cmd_name}' after {duration}s: {e}", "CRITICAL")
|
|
|
|
# # Fix: Ensure Discord's command system keeps the correct function parameters
|
|
# wrapper.__signature__ = inspect.signature(func)
|
|
|
|
# return wrapper
|
|
# return decorator
|
|
|
|
# def monitor_msgs(log_func, db_conn):
|
|
# """
|
|
# Decorator that logs Discord messages and tracks user activity.
|
|
# Works for both commands and event handlers like on_message().
|
|
# """
|
|
# def decorator(func):
|
|
# @functools.wraps(func)
|
|
# async def wrapper(self, message: discord.Message, *args, **kwargs):
|
|
# start_time = time.time()
|
|
# try:
|
|
# # Ignore bot messages
|
|
# if message.author.bot:
|
|
# return
|
|
|
|
# user_id = str(message.author.id)
|
|
# user_name = message.author.name
|
|
# display_name = message.author.display_name
|
|
|
|
# # Track user activity
|
|
# track_user_activity(
|
|
# db_conn=db_conn,
|
|
# log_func=log_func,
|
|
# platform="discord",
|
|
# user_id=user_id,
|
|
# username=user_name,
|
|
# display_name=display_name,
|
|
# user_is_bot=False
|
|
# )
|
|
|
|
# log_func(f"Message from {user_name} in {message.channel} (Guild: {message.guild.name if message.guild else 'DM'})", "DEBUG")
|
|
|
|
# # Fetch user UUID
|
|
# user_data = lookup_user(db_conn, log_func, identifier=user_id, identifier_type="discord_user_id")
|
|
# user_uuid = user_data["UUID"] if user_data else "UNKNOWN"
|
|
|
|
# # Extract message details
|
|
# platform_str = f"discord-{message.guild.name}" if message.guild else "discord-DM"
|
|
# channel_str = message.channel.name if hasattr(message.channel, "name") else "DM"
|
|
# attachments = ", ".join(a.url for a in message.attachments) if message.attachments else ""
|
|
|
|
# # Log message in DB
|
|
# log_message(
|
|
# db_conn=db_conn,
|
|
# log_func=log_func,
|
|
# user_uuid=user_uuid,
|
|
# message_content=message.content or "",
|
|
# platform=platform_str,
|
|
# channel=channel_str,
|
|
# attachments=attachments
|
|
# )
|
|
|
|
# # Call the original `on_message()` function
|
|
# await func(self, message, *args, **kwargs)
|
|
|
|
# # Ensure the bot processes commands
|
|
# await self.process_commands(message)
|
|
|
|
# except Exception as e:
|
|
# log_func(f"Error processing message from {message.author}: {e}", "ERROR")
|
|
# finally:
|
|
# duration = round(time.time() - start_time, 2)
|
|
# log_func(f"Message processing complete in {duration}s.", "DEBUG")
|
|
|
|
# # Preserve original function signature
|
|
# wrapper.__signature__ = inspect.signature(func)
|
|
# return wrapper
|
|
# return decorator
|
|
|
|
|
|
def format_uptime(seconds: float) -> tuple[str, int]:
|
|
"""
|
|
Convert a duration in seconds into a human-readable string using up to two significant time units.
|
|
|
|
This function breaks down the given number of seconds into larger time units (years, months,
|
|
days, hours, minutes, and seconds) and returns a string representing the duration using only the
|
|
two most significant non-zero units. For example:
|
|
- 192000 seconds might be formatted as "2 days, 5 hours"
|
|
- 32 minutes will be formatted as "32 minutes"
|
|
- 37000000 seconds could be formatted as "1 year, 3 months"
|
|
|
|
Args:
|
|
seconds (float): The total number of seconds to convert.
|
|
|
|
Returns:
|
|
tuple(str, int):
|
|
A tuple containing:
|
|
- A human-readable string representing the duration (using up to two time units).
|
|
- The original duration in seconds as an integer.
|
|
|
|
Examples:
|
|
>>> format_uptime(192000)
|
|
('2 days, 5 hours', 192000)
|
|
>>> format_uptime(60)
|
|
('1 minute', 60)
|
|
"""
|
|
seconds = int(seconds) # Ensure integer seconds
|
|
seconds_int = seconds
|
|
|
|
# Define time units as tuples of (unit name, seconds per unit)
|
|
units = [
|
|
("year", 31536000), # 365 days
|
|
("month", 2592000), # 30 days
|
|
("day", 86400), # 24 hours
|
|
("hour", 3600), # 60 minutes
|
|
("minute", 60),
|
|
("second", 1)
|
|
]
|
|
|
|
# Compute time breakdown
|
|
time_values = []
|
|
for unit_name, unit_seconds in units:
|
|
value, seconds = divmod(seconds, unit_seconds)
|
|
if value > 0:
|
|
time_values.append(f"{value} {unit_name}{'s' if value > 1 else ''}") # Auto pluralize
|
|
|
|
# Return only the two most significant time units (e.g., "3 days, 4 hours")
|
|
return (", ".join(time_values[:2]), seconds_int) if time_values else ("0 seconds", 0)
|
|
|
|
|
|
def get_random_reply(dictionary_name: str, category: str, **variables) -> str:
|
|
"""
|
|
Fetch a random reply from a specified dictionary file and category.
|
|
|
|
This function loads a JSON file—named after the given dictionary (without the .json extension)
|
|
and located in the DICTIONARY_PATH directory—and retrieves a random reply from the list of
|
|
responses under the specified category. It then substitutes any placeholders in the reply
|
|
string with the provided keyword arguments. If the file does not exist, the JSON is invalid,
|
|
or the category is missing or not a list, the function returns an appropriate error message.
|
|
|
|
Args:
|
|
dictionary_name (str): The base name of the dictionary file (without the .json extension).
|
|
category (str): The key within the JSON file representing the category of replies.
|
|
**variables: Arbitrary keyword arguments for substituting placeholders in the reply string.
|
|
|
|
Returns:
|
|
str: A formatted reply string with the variables substituted, or an error message if the
|
|
file, category, or JSON data is invalid.
|
|
"""
|
|
file_path = os.path.join(DICTIONARY_PATH, f"{dictionary_name}.json")
|
|
|
|
# Ensure file exists
|
|
if not os.path.exists(file_path):
|
|
return f"[Error: Missing {dictionary_name}.json]"
|
|
|
|
try:
|
|
with open(file_path, "r", encoding="utf-8") as file:
|
|
data = json.load(file)
|
|
except json.JSONDecodeError:
|
|
return f"[Error: Failed to load {dictionary_name}.json]"
|
|
|
|
# Ensure category exists
|
|
if category not in data or not isinstance(data[category], list):
|
|
return f"[Error: No valid entries for {category} in {dictionary_name}.json]"
|
|
|
|
# Select a random reply
|
|
response = random.choice(data[category])
|
|
|
|
# Replace placeholders with provided variables
|
|
return response.format(**variables)
|
|
|
|
|
|
##############################
|
|
# Basic sanitization
|
|
# DO NOT RELY SOLELY ON THIS
|
|
##############################
|
|
def sanitize_user_input(
|
|
user_input: str,
|
|
usage: str = "GENERAL",
|
|
max_length: int = 500
|
|
):
|
|
"""
|
|
Sanitize user input using a whitelisting approach.
|
|
|
|
This function sanitizes the provided user input by applying a whitelist filter based on
|
|
the specified usage context. For inputs intended for calculation ("CALC"), it retains only
|
|
digits, mathematical operators, parentheses, and related characters. For general text
|
|
("GENERAL"), it permits a broader range of characters including letters, numbers, punctuation,
|
|
symbols, and separators. The function also truncates the input to a maximum length, removes
|
|
unwanted whitespace characters, and applies additional filtering using either a regex library
|
|
(if available) or a fallback method.
|
|
|
|
Args:
|
|
user_input (str): The raw input string from the user (e.g., from Twitch or Discord).
|
|
usage (str, optional): The context for sanitization. Accepted values are:
|
|
- "CALC": Retain only characters relevant for mathematical expressions.
|
|
- "GENERAL": Allow typical readable characters and punctuation.
|
|
Defaults to "GENERAL".
|
|
max_length (int, optional): The maximum allowed length of the input. Any characters beyond
|
|
this limit will be truncated. Defaults to 500.
|
|
|
|
Returns:
|
|
tuple: A tuple in the form:
|
|
(sanitized_str, sanitization_applied, reason_string, original_str)
|
|
where:
|
|
- sanitized_str (str): The sanitized version of the input.
|
|
- sanitization_applied (bool): True if any modifications were applied; otherwise, False.
|
|
- reason_string (str): A semicolon-separated string explaining the sanitization steps.
|
|
- original_str (str): The original, unmodified input string.
|
|
|
|
Security Recommendations:
|
|
1) For Database Storage (e.g., MariaDB):
|
|
- Always use parameterized queries or an ORM with bound parameters.
|
|
- Do not rely solely on string sanitization to prevent SQL injection.
|
|
|
|
2) For Code Execution (e.g., using eval):
|
|
- Avoid using eval/exec on user input.
|
|
- If execution of user input is required, consider a restricted math parser or an audited sandbox.
|
|
|
|
3) For HTML Sanitization:
|
|
- Bleach is deprecated; consider modern alternatives or frameworks that safely sanitize HTML output.
|
|
- Note: This function does not sanitize HTML tags.
|
|
"""
|
|
original_string = str(user_input)
|
|
reasons = []
|
|
sanitization_applied = False
|
|
|
|
# 1. Truncate and remove newlines, tabs, etc.
|
|
truncated = original_string[:max_length]
|
|
truncated = re.sub(r"[\r\n\t]+", " ", truncated)
|
|
|
|
sanitized = truncated
|
|
|
|
# 2. Choose how to filter based on usage
|
|
usage = usage.upper()
|
|
|
|
if usage == "CALC":
|
|
# Allow digits, +, -, *, /, %, parentheses, decimal points, ^ for exponent, spaces
|
|
# Remove everything else
|
|
pattern = r"[^0-9+\-*/%().^ \t]"
|
|
new_sanitized = re.sub(pattern, "", sanitized)
|
|
if new_sanitized != sanitized:
|
|
sanitization_applied = True
|
|
reasons.append("CALC: Removed non-math characters.")
|
|
sanitized = new_sanitized
|
|
|
|
else: # GENERAL usage
|
|
if USE_REGEX_LIB:
|
|
# Remove ASCII control chars (0-31, 127) first
|
|
step1 = re.sub(r"[\x00-\x1F\x7F]", "", sanitized)
|
|
# Then apply a broad whitelist:
|
|
# \p{L}: letters; \p{N}: numbers; \p{P}: punctuation; \p{S}: symbols; \p{Z}: separators (including spaces)
|
|
# This keeps emojis, foreign characters, typical punctuation, etc.
|
|
pattern = r"[^\p{L}\p{N}\p{P}\p{S}\p{Z}]"
|
|
new_sanitized = regex.sub(pattern, "", step1)
|
|
|
|
if new_sanitized != sanitized:
|
|
sanitization_applied = True
|
|
reasons.append("GENERAL: Removed disallowed chars via regex.")
|
|
sanitized = new_sanitized
|
|
else:
|
|
# Fallback: If 'regex' is not installed, remove control chars and keep ASCII printable only.
|
|
step1 = re.sub(r"[\x00-\x1F\x7F]", "", sanitized)
|
|
pattern = r"[^ -~]" # Keep only ASCII characters 32-126
|
|
new_sanitized = re.sub(pattern, "", step1)
|
|
|
|
if new_sanitized != sanitized:
|
|
sanitization_applied = True
|
|
reasons.append("GENERAL: Removed non-ASCII or control chars (fallback).")
|
|
sanitized = new_sanitized
|
|
|
|
# 3. Final trim
|
|
sanitized = sanitized.strip()
|
|
|
|
# 4. Prepare output
|
|
reason_string = "; ".join(reasons)
|
|
return (sanitized, sanitization_applied, reason_string, original_string)
|
|
|
|
|
|
#####################
|
|
# Help command logic
|
|
#####################
|
|
|
|
async def handle_help_command(ctx, command_name, bot, is_discord):
|
|
"""
|
|
Provide help text for a specific command or list all commands.
|
|
|
|
This asynchronous function is called by platform-specific help commands to generate and
|
|
return the appropriate help message. It retrieves the help data stored on the bot (parsed
|
|
from a JSON file) and checks whether the requested command exists in that data. If no
|
|
specific command is provided (i.e. `command_name` is None), it returns a list of all loaded
|
|
commands. Depending on the platform (Discord or Twitch), it formats the help message using
|
|
the corresponding builder function.
|
|
|
|
Args:
|
|
ctx: The context from the command invocation (from discord.py or twitchio).
|
|
command_name (str): The name of the command for which help is requested (e.g., "quote"). If
|
|
None, a list of all available commands is returned.
|
|
bot: The current bot instance, which should have a `help_data` attribute containing the help
|
|
information.
|
|
is_discord (bool): True if the bot is running on Discord; False if running on Twitch.
|
|
|
|
Returns:
|
|
str: A formatted help message string. This message may indicate one of the following:
|
|
- A list of all loaded commands (if no command name is provided),
|
|
- Detailed help for a specific command (if help data exists for that command),
|
|
- A notice that the command is loaded but lacks help data,
|
|
- A notice that the command is deprecated/unloaded, or
|
|
- An error message if the help data is missing or invalid.
|
|
|
|
Side Effects:
|
|
Logs warnings or errors if the help data is missing, improperly structured, or if the
|
|
command is not found.
|
|
"""
|
|
# If there's no loaded help_data, we can't do much
|
|
if not hasattr(bot, "help_data") or not bot.help_data:
|
|
return await "No help data found."
|
|
|
|
help_data = bot.help_data # The parsed JSON from e.g. help_discord.json
|
|
if "commands" not in help_data:
|
|
return "Invalid help data structure (no 'commands' key).\n*This is due to an error with the help file.*"
|
|
|
|
if not command_name:
|
|
# User typed just "!help" => list all known commands from this bot
|
|
loaded_cmds = get_loaded_commands(bot, is_discord)
|
|
if not loaded_cmds:
|
|
return "I have no commands loaded."
|
|
else:
|
|
if is_discord:
|
|
help_str = f"I currently offer these commands:"
|
|
for cmd in loaded_cmds:
|
|
help_str += f"\n- `{cmd}`"
|
|
help_str += f"\n*Use `!help <command>` for more details.*"
|
|
return help_str
|
|
else:
|
|
short_list = ", ".join(loaded_cmds)
|
|
# We can also mention "Use !help [command] for more info."
|
|
return f"I currently offer these commands: {short_list}. \nUse '!help <command>' for details."
|
|
|
|
# 1) Check if the command is loaded
|
|
loaded = (command_name in get_loaded_commands(bot, is_discord))
|
|
# 2) Check if it has help info in the JSON
|
|
cmd_help = help_data["commands"].get(command_name, None)
|
|
|
|
if loaded and cmd_help:
|
|
# The command is loaded, and we have help info => show it
|
|
if is_discord:
|
|
msg = build_discord_help_message(command_name, cmd_help)
|
|
else:
|
|
msg = build_twitch_help_message(command_name, cmd_help)
|
|
return msg
|
|
|
|
elif loaded and not cmd_help:
|
|
# The command is loaded but no help info => mention that
|
|
return f"The '{command_name}' command is loaded but has no help info yet."
|
|
elif (not loaded) and cmd_help:
|
|
# The command is not loaded, but we have an entry => mention it's unloaded/deprecated
|
|
return f"The '{command_name}' command is not currently loaded (deprecated or unavailable)."
|
|
else:
|
|
# Not loaded, no help info => not found at all
|
|
return f"I'm sorry, I don't offer a command named '{command_name}'."
|
|
|
|
|
|
|
|
def initialize_help_data(bot, help_json_path, is_discord):
|
|
"""
|
|
Load help data from a JSON file and verify bot commands against it.
|
|
|
|
This function loads help data from the specified JSON file and stores the parsed data
|
|
in the bot's `help_data` attribute. After loading the data, it cross-checks the commands
|
|
loaded on the bot with the commands defined in the help file. The function logs warnings
|
|
for any discrepancies:
|
|
- If a command is present in the help file but not loaded on the bot, it logs a warning
|
|
(indicating the command may be deprecated).
|
|
- If a command is loaded on the bot but missing from the help file, it logs a warning
|
|
(indicating missing help information).
|
|
|
|
Args:
|
|
bot: The bot instance, which will have its `help_data` attribute updated.
|
|
help_json_path (str): The file path to the JSON file containing help data.
|
|
is_discord (bool): A flag indicating whether the bot is a Discord bot (True) or a Twitch bot (False).
|
|
|
|
Returns:
|
|
None
|
|
|
|
Side Effects:
|
|
- Updates the bot's `help_data` attribute with the contents of the JSON file.
|
|
- Logs warnings or errors if the help file is missing, cannot be parsed, or if there are mismatches
|
|
between the loaded commands and the help data.
|
|
"""
|
|
platform_name = "Discord" if is_discord else "Twitch"
|
|
|
|
if not os.path.exists(help_json_path):
|
|
globals.log(f"Help file '{help_json_path}' not found. No help data loaded.", "WARNING")
|
|
bot.help_data = {}
|
|
return
|
|
|
|
# Load the JSON
|
|
try:
|
|
with open(help_json_path, "r", encoding="utf-8") as f:
|
|
data = json.load(f)
|
|
except Exception as e:
|
|
globals.log(f"Error parsing help JSON '{help_json_path}': {e}", "ERROR")
|
|
data = {}
|
|
|
|
bot.help_data = data
|
|
|
|
# Now cross-check the loaded commands vs. the data
|
|
loaded_cmds = set(get_loaded_commands(bot, is_discord))
|
|
if "commands" not in data:
|
|
globals.log(f"No 'commands' key in {help_json_path}, skipping checks.", "ERROR")
|
|
return
|
|
|
|
file_cmds = set(data["commands"].keys())
|
|
|
|
# 1) Commands in file but not loaded
|
|
missing_cmds = file_cmds - loaded_cmds
|
|
for cmd in missing_cmds:
|
|
globals.log(f"Help file has '{cmd}', but it's not loaded on this {platform_name} bot (deprecated?).", "WARNING")
|
|
|
|
# 2) Commands loaded but not in file
|
|
needed_cmds = loaded_cmds - file_cmds
|
|
for cmd in needed_cmds:
|
|
globals.log(f"Command '{cmd}' is loaded on {platform_name} but no help info is provided in {help_json_path}.", "WARNING")
|
|
|
|
|
|
|
|
def get_loaded_commands(bot, is_discord):
|
|
"""
|
|
Retrieve and sort the list of loaded commands from a bot instance.
|
|
|
|
This function examines the provided bot instance and extracts its registered commands.
|
|
For a Discord bot (when `is_discord` is True), it iterates over `bot.commands` and collects
|
|
the command names. For a Twitch bot (when `is_discord` is False), it iterates over `bot._commands`
|
|
and collects the command objects. Throughout the process, the function logs debug and error
|
|
messages to help trace the execution flow. Finally, it returns the list of commands in sorted order.
|
|
|
|
Args:
|
|
bot: The bot instance from which to retrieve commands. This may be either a Discord or Twitch bot.
|
|
is_discord (bool): Indicates whether the bot is a Discord bot. If False, the function assumes the bot
|
|
is a Twitch bot.
|
|
|
|
Returns:
|
|
list: A sorted list of commands. For a Discord bot, this list contains command names (strings);
|
|
for a Twitch bot, it contains command objects as stored in `bot._commands`.
|
|
|
|
Side Effects:
|
|
Logs debug, warning, and error messages regarding the command processing.
|
|
"""
|
|
from discord.ext import commands as discord_commands
|
|
from twitchio.ext import commands as twitch_commands
|
|
|
|
commands_list = []
|
|
|
|
try:
|
|
_bot_type = str(type(bot)).split("_")[1].split(".")[0]
|
|
globals.log(f"Currently processing commands for {_bot_type} ...", "DEBUG")
|
|
except Exception as e:
|
|
globals.log(f"Unable to determine current bot type: {e}", "WARNING")
|
|
|
|
# For Discord
|
|
if is_discord:
|
|
try:
|
|
# 'bot.commands' is a set of Command objects.
|
|
for cmd_obj in bot.commands:
|
|
commands_list.append(cmd_obj.name)
|
|
debug_level = "DEBUG" if len(commands_list) > 0 else "WARNING"
|
|
globals.log(f"Discord commands body: {commands_list}", f"{debug_level}")
|
|
except Exception as e:
|
|
globals.log(f"Error retrieving Discord commands: {e}", "ERROR")
|
|
elif not is_discord:
|
|
try:
|
|
for cmd_obj in bot._commands:
|
|
commands_list.append(cmd_obj)
|
|
debug_level = "DEBUG" if len(commands_list) > 0 else "WARNING"
|
|
globals.log(f"Twitch commands body: {commands_list}", f"{debug_level}")
|
|
except Exception as e:
|
|
globals.log(f"Error retrieving Twitch commands: {e}", "ERROR")
|
|
else:
|
|
globals.log(f"Unable to determine platform in 'get_loaded_commands()'!", "CRITICAL")
|
|
|
|
globals.log(f"... Finished processing commands for {_bot_type} ...", "DEBUG")
|
|
return sorted(commands_list)
|
|
|
|
|
|
def build_discord_help_message(cmd_name, cmd_help_dict):
|
|
"""
|
|
Build a verbose, multi-line help message for Discord.
|
|
|
|
This function constructs a detailed help message for a Discord command using the
|
|
provided command name and a dictionary of help information. The dictionary may include
|
|
a description, subcommands with their arguments and descriptions, and example usage
|
|
scenarios. The generated message is formatted as a multi-line string with Markdown
|
|
styling for use on Discord.
|
|
|
|
Args:
|
|
cmd_name (str): The name of the command.
|
|
cmd_help_dict (dict): A dictionary containing help details for the command.
|
|
Expected keys include:
|
|
- "description" (str): A detailed description of the command.
|
|
- "subcommands" (dict, optional): A mapping of subcommand names to a dictionary
|
|
with keys "args" (str) and "desc" (str) representing the arguments and a short
|
|
description of the subcommand.
|
|
- "examples" (list, optional): A list of example strings demonstrating how to use
|
|
the command. Each example should be a string that optionally contains a description
|
|
after a " : " separator.
|
|
|
|
Returns:
|
|
str: A multi-line string formatted for Discord that includes the command description,
|
|
subcommands (if any), and example usage.
|
|
"""
|
|
description = cmd_help_dict.get("description", "No description available.\n")
|
|
subcommands = cmd_help_dict.get("subcommands", {})
|
|
examples = cmd_help_dict.get("examples", [])
|
|
|
|
lines = [f"**Help for `{cmd_name}`**:",
|
|
f"Description: {description}"]
|
|
|
|
if subcommands:
|
|
lines.append("\n**Subcommands / Arguments:**")
|
|
for sub, detail in subcommands.items():
|
|
arg_part = detail.get("args", "")
|
|
desc_part = detail.get("desc", "")
|
|
lines.append(f"• **{sub}** {arg_part} **->** {desc_part}")
|
|
else:
|
|
lines.append("\n*No subcommands defined.*")
|
|
|
|
if examples:
|
|
lines.append("\nExample usage:")
|
|
for ex in examples:
|
|
ex_arr = ex.split(" : ", 1) # Split into max 2 parts
|
|
|
|
# Handle missing description case
|
|
ex_cmd = ex_arr[0]
|
|
ex_note = f"\n {ex_arr[1]}" if len(ex_arr) > 1 else ""
|
|
lines.append(f"- `{ex_cmd}`{ex_note}")
|
|
|
|
return "\n".join(lines)
|
|
|
|
|
|
|
|
def build_twitch_help_message(cmd_name, cmd_help_dict):
|
|
"""
|
|
Build a concise Twitch help message for a command.
|
|
|
|
This function constructs a help message string for a Twitch command based on the
|
|
provided command name and a dictionary of help details. The help dictionary should
|
|
contain a "description" of the command and may include a "subcommands" mapping. For
|
|
each subcommand, the function builds usage examples by appending the subcommand name
|
|
and its arguments (if any) to the base command. If no subcommands are defined, the
|
|
usage defaults to the base command.
|
|
|
|
Args:
|
|
cmd_name (str): The name of the command (without the "!" prefix).
|
|
cmd_help_dict (dict): A dictionary containing help details for the command.
|
|
Expected keys include:
|
|
- "description" (str): A brief description of the command.
|
|
- "subcommands" (dict, optional): A mapping of subcommand names to their details.
|
|
Each value should be a dictionary that may contain an "args" key (str) representing
|
|
additional command arguments.
|
|
|
|
Returns:
|
|
str: A formatted help message string that includes the command description and a usage
|
|
line with examples of how to invoke the command and its subcommands.
|
|
"""
|
|
description = cmd_help_dict.get("description", "No description available.")
|
|
subcommands = cmd_help_dict.get("subcommands", {})
|
|
sub_line_parts = []
|
|
for sub, detail in subcommands.items():
|
|
if sub == "no subcommand":
|
|
sub_line_parts.append(f"!{cmd_name}")
|
|
else:
|
|
arg_part = detail.get("args", "")
|
|
if arg_part:
|
|
sub_line_parts.append(f"!{cmd_name} {sub} {arg_part}".strip())
|
|
else:
|
|
sub_line_parts.append(f"!{cmd_name} {sub}".strip())
|
|
|
|
usage_line = " | ".join(sub_line_parts) if sub_line_parts else f"!{cmd_name}"
|
|
|
|
return f"Help for !{cmd_name}: {description}. Usage: {usage_line}"
|
|
|
|
|
|
|
|
async def send_message(ctx, text):
|
|
"""
|
|
Minimal helper to send a message to either Discord or Twitch.
|
|
"""
|
|
await ctx.send(text)
|
|
|
|
import uuid
|
|
from modules.db import run_db_operation, lookup_user
|
|
import globals
|
|
|
|
def track_user_activity(
|
|
db_conn,
|
|
platform: str,
|
|
user_id: str|int,
|
|
username: str,
|
|
display_name: str,
|
|
user_is_bot: bool = False
|
|
):
|
|
"""
|
|
Tracks or updates a user in the database.
|
|
|
|
This function:
|
|
- Checks if the user already exists in Platform_Mapping.
|
|
- If found, updates username/display_name if changed.
|
|
- If not found, adds a new user and platform mapping entry.
|
|
|
|
Args:
|
|
db_conn: Active database connection.
|
|
platform (str): The platform of the user ("discord" or "twitch").
|
|
user_id (str): The unique user identifier on the platform.
|
|
username (str): The platform-specific username.
|
|
display_name (str): The platform-specific display name.
|
|
user_is_bot (bool, optional): Indicates if the user is a bot. Defaults to False.
|
|
|
|
Returns:
|
|
None
|
|
"""
|
|
|
|
platform = platform.lower()
|
|
valid_platforms = {"discord", "twitch"}
|
|
|
|
if platform not in valid_platforms:
|
|
globals.log(f"Unknown platform '{platform}' in track_user_activity!", "WARNING")
|
|
return
|
|
|
|
# Look up user by platform-specific ID in Platform_Mapping
|
|
user_data = lookup_user(db_conn, identifier=user_id, identifier_type=f"{platform}_user_id")
|
|
|
|
if user_data:
|
|
# Existing user found, update info if necessary
|
|
user_uuid = user_data["UUID"]
|
|
need_update = False
|
|
column_updates = []
|
|
params = []
|
|
|
|
if user_data["platform_username"] != username:
|
|
need_update = True
|
|
column_updates.append("Username = ?")
|
|
params.append(username)
|
|
|
|
if user_data["platform_display_name"] != display_name:
|
|
need_update = True
|
|
column_updates.append("Display_Name = ?")
|
|
params.append(display_name)
|
|
|
|
if need_update:
|
|
update_sql = f"""
|
|
UPDATE Platform_Mapping
|
|
SET {", ".join(column_updates)}
|
|
WHERE Platform_User_ID = ? AND Platform_Type = ?
|
|
"""
|
|
params.extend([user_id, platform.capitalize()])
|
|
rowcount = run_db_operation(db_conn, "update", update_sql, params)
|
|
|
|
if rowcount and rowcount > 0:
|
|
globals.log(f"Updated {platform.capitalize()} user '{username}' (display '{display_name}') in Platform_Mapping.", "DEBUG")
|
|
return
|
|
|
|
# If user was not found in Platform_Mapping, check Users table
|
|
user_uuid = str(uuid.uuid4())
|
|
insert_user_sql = """
|
|
INSERT INTO Users (UUID, Unified_Username, user_is_banned, user_is_bot)
|
|
VALUES (?, ?, 0, ?)
|
|
"""
|
|
run_db_operation(db_conn, "write", insert_user_sql, (user_uuid, username, int(user_is_bot)))
|
|
|
|
# Insert into Platform_Mapping
|
|
insert_mapping_sql = """
|
|
INSERT INTO Platform_Mapping (Platform_User_ID, Platform_Type, UUID, Display_Name, Username)
|
|
VALUES (?, ?, ?, ?, ?)
|
|
"""
|
|
params = (user_id, platform.capitalize(), user_uuid, display_name, username)
|
|
rowcount = run_db_operation(db_conn, "write", insert_mapping_sql, params)
|
|
|
|
if rowcount and rowcount > 0:
|
|
globals.log(f"Created new user entry for {platform} user '{username}' (display '{display_name}') with UUID={user_uuid}.", "DEBUG")
|
|
else:
|
|
globals.log(f"Failed to create user entry for {platform} user '{username}'.", "ERROR")
|
|
|
|
|
|
|
|
from modules.db import log_bot_event
|
|
|
|
def log_bot_startup(db_conn):
|
|
"""
|
|
Logs a bot startup event.
|
|
"""
|
|
log_bot_event(db_conn, "BOT_STARTUP", "Bot successfully started.")
|
|
|
|
def log_bot_shutdown(db_conn, intent: str = "Error/Crash"):
|
|
"""
|
|
Logs a bot shutdown event.
|
|
"""
|
|
log_bot_event(db_conn, "BOT_SHUTDOWN", f"Bot is shutting down - {intent}.")
|
|
|
|
def generate_link_code():
|
|
"""Generates a unique 8-character alphanumeric link code."""
|
|
import random, string
|
|
return ''.join(random.choices(string.ascii_uppercase + string.digits, k=8))
|
|
|
|
def time_since(start, end=None, format=None):
|
|
"""
|
|
Calculate and format the elapsed time between two epoch timestamps.
|
|
|
|
This function computes the elapsed time between a starting epoch timestamp and an ending
|
|
epoch timestamp. If no end time is provided, the current time is used. The elapsed time is
|
|
formatted according to the specified unit:
|
|
- 's': whole seconds and remaining milliseconds.
|
|
- 'm': whole minutes and remaining seconds.
|
|
- 'h': whole hours and remaining minutes.
|
|
- 'd': whole days and remaining hours.
|
|
If an invalid format is given, a warning is logged and the default format 's' is used.
|
|
|
|
Args:
|
|
start (float): The starting epoch timestamp (in seconds). If None, an error is logged and
|
|
the function returns None.
|
|
end (float, optional): The ending epoch timestamp (in seconds). Defaults to the current time.
|
|
format (str, optional): A single character indicating the desired format of the elapsed time.
|
|
Must be one of:
|
|
's' for seconds (with milliseconds),
|
|
'm' for minutes (with seconds),
|
|
'h' for hours (with minutes), or
|
|
'd' for days (with hours).
|
|
Defaults to 's' if an invalid value is provided.
|
|
|
|
Returns:
|
|
tuple: A tuple (x, y, total_elapsed) where:
|
|
- For 's': x is a string representing whole seconds (with "s" appended) and y is a string
|
|
representing the remaining milliseconds (with "ms" appended).
|
|
- For 'm': x is a string representing whole minutes (with "m" appended) and y is a string
|
|
representing the remaining seconds (with "s" appended).
|
|
- For 'h': x is a string representing whole hours (with "h" appended) and y is a string
|
|
representing the remaining minutes (with "m" appended).
|
|
- For 'd': x is a string representing whole days (with "d" appended) and y is a string
|
|
representing the remaining hours (with "h" appended).
|
|
- total_elapsed (float): The total elapsed time in seconds.
|
|
"""
|
|
# Ensure a start time is provided.
|
|
if start is None:
|
|
globals.log("time_since() lacks a start value!", "ERROR")
|
|
return None
|
|
|
|
# If no end time is provided, use the current time.
|
|
if end is None:
|
|
end = time.time()
|
|
|
|
# Default to seconds if format is not valid.
|
|
if format not in ["s", "m", "h", "d"]:
|
|
globals.log("time_since() has incorrect format string. Defaulting to 's'", "WARNING")
|
|
format = "s"
|
|
|
|
# Compute the total elapsed time in seconds.
|
|
since = end - start
|
|
|
|
# Break down the elapsed time according to the requested format.
|
|
if format == "s":
|
|
# Format as seconds: whole seconds and remainder in milliseconds.
|
|
x = int(since)
|
|
y = int((since - x) * 1000)
|
|
x = str(x) + "s"
|
|
y = str(y) + "ms"
|
|
elif format == "m":
|
|
# Format as minutes: whole minutes and remainder in seconds.
|
|
x = int(since // 60)
|
|
y = int(since % 60)
|
|
x = str(x) + "m"
|
|
y = str(y) + "s"
|
|
elif format == "h":
|
|
# Format as hours: whole hours and remainder in minutes.
|
|
x = int(since // 3600)
|
|
y = int((since % 3600) // 60)
|
|
x = str(x) + "h"
|
|
y = str(y) + "m"
|
|
elif format == "d":
|
|
# Format as days: whole days and remainder in hours.
|
|
x = int(since // 86400)
|
|
y = int((since % 86400) // 3600)
|
|
x = str(x) + "d"
|
|
y = str(y) + "h"
|
|
|
|
return (x, y, since)
|
|
|
|
|
|
def wfstl():
|
|
"""
|
|
Write Function Start To Log (debug)
|
|
Writes the calling function to log under the DEBUG category.
|
|
"""
|
|
caller_function_name = inspect.currentframe().f_back.f_code.co_name
|
|
globals.log(f"Function {caller_function_name} started processing", "DEBUG")
|
|
|
|
def wfetl():
|
|
"""
|
|
Write Function End To Log (debug)
|
|
Writes the calling function to log under the DEBUG category.
|
|
"""
|
|
caller_function_name = inspect.currentframe().f_back.f_code.co_name
|
|
globals.log(f"Function {caller_function_name} finished processing", "DEBUG")
|
|
|
|
async def get_guild_info(bot: discord.Client, guild_id: Union[int, str]) -> dict:
|
|
"""
|
|
Retrieve information about a Discord guild given its ID.
|
|
|
|
This asynchronous function attempts to retrieve a guild's information by its ID. The provided
|
|
guild_id is first converted to an integer if it is not already one. The function then tries to
|
|
obtain the guild from the bot's cache using `bot.get_guild()`. If the guild is not found in the
|
|
cache, it fetches the guild from the Discord API using `bot.fetch_guild()`. If the guild cannot be
|
|
found or if an error occurs during the fetch, the function raises an appropriate exception.
|
|
|
|
The function returns a dictionary containing basic information about the guild, including:
|
|
- id (int): The unique identifier of the guild.
|
|
- name (str): The name of the guild.
|
|
- owner_id (int): The user ID of the guild's owner.
|
|
- member_count (int): The number of members in the guild.
|
|
- icon_url (Optional[str]): The URL of the guild's icon if available.
|
|
- created_at (Optional[str]): The ISO-formatted creation date of the guild if available.
|
|
|
|
Args:
|
|
bot (discord.Client): The Discord client instance.
|
|
guild_id (Union[int, str]): The ID of the guild. This can be provided as an integer or as a
|
|
string that represents an integer.
|
|
|
|
Returns:
|
|
dict: A dictionary containing the guild's information.
|
|
|
|
Raises:
|
|
ValueError: If the provided guild_id is not convertible to an integer or if the guild is not found.
|
|
RuntimeError: If an HTTP error occurs while fetching the guild.
|
|
"""
|
|
# Ensure guild_id is an integer.
|
|
try:
|
|
guild_id = int(guild_id)
|
|
except ValueError:
|
|
raise ValueError("guild_id must be an int or a string representing an int.")
|
|
|
|
# Try to get the guild from the cache first.
|
|
guild = bot.get_guild(guild_id)
|
|
|
|
# If not in cache, try to fetch it from the API.
|
|
if guild is None:
|
|
try:
|
|
guild = await bot.fetch_guild(guild_id)
|
|
except discord.NotFound:
|
|
raise ValueError("Guild not found.")
|
|
except discord.HTTPException as e:
|
|
raise RuntimeError(f"An error occurred while fetching the guild: {e}")
|
|
|
|
# Build a dictionary with some basic information.
|
|
info = {
|
|
"id": guild.id,
|
|
"name": guild.name,
|
|
"owner_id": guild.owner_id,
|
|
"member_count": guild.member_count,
|
|
"icon_url": guild.icon.url if guild.icon else None,
|
|
"created_at": guild.created_at.isoformat() if guild.created_at else None,
|
|
}
|
|
return info
|
|
|
|
async def get_current_twitch_game(bot, channel_name: str) -> str:
|
|
"""
|
|
Retrieve the name of the game currently being played on the given Twitch channel.
|
|
|
|
Parameters:
|
|
bot: A TwitchIO bot instance (or any object with a `fetch_streams` method).
|
|
channel_name (str): The Twitch channel's username.
|
|
|
|
Returns:
|
|
str: The game name as string
|
|
"""
|
|
# Fetch stream data for the specified channel.
|
|
streams = await bot.fetch_streams(user_logins=[channel_name])
|
|
|
|
if streams:
|
|
# Assume the first stream is the one we're interested in.
|
|
stream = streams[0]
|
|
# Depending on your TwitchIO version, the attribute may be `game_name` or `game`.
|
|
game_name = getattr(stream, 'game_name')
|
|
game_id = getattr(stream, 'game_id')
|
|
globals.log(f"'get_current_twitch_game()' result for Twitch channel '{channel_name}': Game ID: {game_id}, Game Name: {game_name}", "DEBUG")
|
|
return game_name
|
|
|
|
return ""
|
|
|
|
|
|
async def is_channel_live(bot=None, channel_name="ookamikuntv") -> bool:
|
|
"""
|
|
Returns True if the specified channel is live, otherwise False.
|
|
Defaults to "ookamikuntv" if no channel_name is provided.
|
|
"""
|
|
streams = await bot.fetch_streams(user_logins=[channel_name]) if bot else []
|
|
return bool(streams)
|
|
|
|
def list_channels(self):
|
|
"""Logs the names of all connected Twitch channels."""
|
|
channels_list = [channel.name for channel in self.connected_channels]
|
|
connected_channels_str = ", ".join(channels_list)
|
|
num_connected_channels = len(channels_list)
|
|
globals.log(
|
|
f"Currently connected to {num_connected_channels} Twitch channel(s): {connected_channels_str}"
|
|
)
|
|
|
|
def command_allowed_twitch(func):
|
|
"""
|
|
A custom check that allows a command to run based on channel settings.
|
|
It looks up the current channel in CHANNEL_CONFIG and either allows or denies
|
|
the command based on the filter mode and list.
|
|
"""
|
|
@wraps(func)
|
|
async def wrapper(ctx, *args, **kwargs):
|
|
# Load the full configuration.
|
|
full_config = globals.constants.twitch_channels_config()
|
|
|
|
# Get the channel name and then the channel-specific configuration.
|
|
channel_name = ctx.channel.name.lower()
|
|
channel_config = full_config.get(channel_name)
|
|
|
|
# If there's no configuration for this channel, block the command.
|
|
if not channel_config:
|
|
globals.log(f"No configuration found for Twitch channel '{channel_name}'. Blocking command '{ctx.command.name}'.")
|
|
return
|
|
|
|
mode = channel_config.get("commands_filter_mode")
|
|
filtered = channel_config.get("commands_filtered", [])
|
|
command_name = ctx.command.name
|
|
|
|
# Check based on filter mode.
|
|
if mode == "exclude":
|
|
if command_name in filtered:
|
|
globals.log(f"Command '{command_name}' is excluded on Twitch channel '{channel_name}'.")
|
|
return
|
|
elif mode == "include":
|
|
if command_name not in filtered:
|
|
globals.log(f"Command '{command_name}' is not allowed on Twitch channel '{channel_name}' (include mode).")
|
|
return
|
|
|
|
# If all checks pass, run the command.
|
|
return await func(ctx, *args, **kwargs)
|
|
|
|
return wrapper
|
|
|
|
|
|
|
|
###############################################
|
|
# Development Test Function (called upon start)
|
|
###############################################
|
|
def dev_func(db_conn, enable: bool = False):
|
|
if enable:
|
|
id = "203190147582394369"
|
|
id_type = "discord_user_id"
|
|
uui_info = lookup_user(db_conn, identifier=id, identifier_type=id_type)
|
|
if uui_info:
|
|
return list(uui_info.values())
|
|
else:
|
|
return f"User with identifier '{id}' ({id_type}) not found" |