OokamiPupV2/modules/utility.py

596 lines
23 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import time
import os
import random
import json
import re
import functools
import inspect
import uuid
from modules.db import run_db_operation, lookup_user
try:
# 'regex' on PyPI supports `\p{L}`, `\p{N}`, etc.
import regex
USE_REGEX_LIB = True
except ImportError:
# Fallback to Python's built-in 're' if 'regex' isn't installed
USE_REGEX_LIB = False
DICTIONARY_PATH = "dictionary/" # Path to dictionary files
def monitor_cmds(log_func):
"""
Decorator that logs when a command starts and ends execution.
"""
def decorator(func):
@functools.wraps(func)
async def wrapper(*args, **kwargs):
start_time = time.time()
try:
# Extract a command name from the function name
cmd_name = str(func.__name__).split("_")[1]
log_func(f"Command '{cmd_name}' started execution.", "DEBUG")
# Await the actual command function
result = await func(*args, **kwargs)
end_time = time.time()
cmd_duration = round(end_time - start_time, 2)
log_func(f"Command '{cmd_name}' finished execution after {cmd_duration}s.", "DEBUG")
return result
except Exception as e:
end_time = time.time()
cmd_duration = round(end_time - start_time, 2)
log_func(f"Command '{cmd_name}' FAILED while executing after {cmd_duration}s: {e}", "CRITICAL")
# Explicitly preserve the original signature for slash command introspection
wrapper.__signature__ = inspect.signature(func)
return wrapper
return decorator
def format_uptime(seconds: float) -> tuple[str, int]:
"""
Convert seconds into a human-readable string:
- Example outputs:
"32 minutes"
"8 days, 4 hours"
"1 year, 3 months"
- Returns a tuple:
(Human-readable string, total seconds)
"""
seconds = int(seconds) # Ensure integer seconds
seconds_int = seconds
# Define time units
units = [
("year", 31536000), # 365 days
("month", 2592000), # 30 days
("day", 86400), # 24 hours
("hour", 3600), # 60 minutes
("minute", 60),
("second", 1)
]
# Compute time breakdown
time_values = []
for unit_name, unit_seconds in units:
value, seconds = divmod(seconds, unit_seconds)
if value > 0:
time_values.append(f"{value} {unit_name}{'s' if value > 1 else ''}") # Auto pluralize
# Return only the **two most significant** time units (e.g., "3 days, 4 hours")
return (", ".join(time_values[:2]), seconds_int) if time_values else ("0 seconds", 0)
def get_random_reply(dictionary_name: str, category: str, **variables) -> str:
"""
Fetches a random string from a given dictionary and category.
Supports variable substitution using keyword arguments.
:param dictionary_name: The name of the dictionary file (without .json)
:param category: The category (key) inside the dictionary to fetch a response from
:param variables: Keyword arguments to replace placeholders in the string
:return: A formatted string with the variables replaced
"""
file_path = os.path.join(DICTIONARY_PATH, f"{dictionary_name}.json")
# Ensure file exists
if not os.path.exists(file_path):
return f"[Error: Missing {dictionary_name}.json]"
try:
with open(file_path, "r", encoding="utf-8") as file:
data = json.load(file)
except json.JSONDecodeError:
return f"[Error: Failed to load {dictionary_name}.json]"
# Ensure category exists
if category not in data or not isinstance(data[category], list):
return f"[Error: No valid entries for {category} in {dictionary_name}.json]"
# Select a random reply
response = random.choice(data[category])
# Replace placeholders with provided variables
return response.format(**variables)
##############################
# Basic sanitization
# DO NOT RELY SOLELY ON THIS
##############################
def sanitize_user_input(
user_input: str,
usage: str = "GENERAL",
max_length: int = 500
):
"""
A whitelisting-based function for sanitizing user input.
Returns a tuple of:
(sanitized_str, sanitization_applied_bool, sanitization_reason, original_str)
:param user_input: The raw string from the user (e.g., from Twitch or Discord).
:param usage:
- 'CALC': Keep digits, math operators, parentheses, etc.
- 'GENERAL': Keep typical readable characters & punctuation.
:param max_length: Truncate the input if it exceeds this length.
:return: (sanitized_str, bool, reason_string, original_str)
======================
SECURITY RECOMMENDATIONS
======================
1) For database storage (MariaDB, etc.):
- **Always** use parameterized queries or an ORM with bound parameters.
- Do not rely solely on string sanitization to prevent SQL injection.
2) For code execution (e.g., 'eval'):
- Avoid using eval/exec on user input.
- If you must, consider a restricted math parser or an audited sandbox.
3) For HTML sanitization:
- Bleach is deprecated; research modern alternatives or frameworks that
safely sanitize HTML output. This function does *not* sanitize HTML tags.
"""
original_string = str(user_input)
reasons = []
sanitization_applied = False
# 1. Truncate and remove newlines, tabs, etc.
truncated = original_string[:max_length]
truncated = re.sub(r"[\r\n\t]+", " ", truncated)
sanitized = truncated
# 2. Choose how to filter based on usage
usage = usage.upper()
if usage == "CALC":
# Allow digits, +, -, *, /, %, parentheses, decimal points, ^ for exponent, spaces
# Remove everything else
pattern = r"[^0-9+\-*/%().^ \t]"
new_sanitized = re.sub(pattern, "", sanitized)
if new_sanitized != sanitized:
sanitization_applied = True
reasons.append("CALC: Removed non-math characters.")
sanitized = new_sanitized
else: # GENERAL usage
if USE_REGEX_LIB:
# Remove ASCII control chars (0-31, 127) first
step1 = re.sub(r"[\x00-\x1F\x7F]", "", sanitized)
# Then apply a fairly broad whitelist:
# \p{L}: letters; \p{N}: numbers; \p{P}: punctuation; \p{S}: symbols; \p{Z}: separators (including spaces).
# This keeps emojis, foreign characters, typical punctuation, etc.
pattern = r"[^\p{L}\p{N}\p{P}\p{S}\p{Z}]"
new_sanitized = regex.sub(pattern, "", step1)
if new_sanitized != sanitized:
sanitization_applied = True
reasons.append("GENERAL: Removed disallowed chars via regex.")
sanitized = new_sanitized
else:
# Fallback: If 'regex' is not installed, remove control chars and keep ASCII printable only.
step1 = re.sub(r"[\x00-\x1F\x7F]", "", sanitized)
pattern = r"[^ -~]" # Keep only ASCII 32-126
new_sanitized = re.sub(pattern, "", step1)
if new_sanitized != sanitized:
sanitization_applied = True
reasons.append("GENERAL: Removed non-ASCII or control chars (fallback).")
sanitized = new_sanitized
# 3. Final trim
sanitized = sanitized.strip()
# 4. Prepare output
reason_string = "; ".join(reasons)
return (sanitized, sanitization_applied, reason_string, original_string)
#####################
# Help command logic
#####################
async def handle_help_command(ctx, command_name, bot, is_discord, log_func):
"""
Called by the platform-specific help commands to provide the help text.
:param ctx: discord.py or twitchio context
:param command_name: e.g. "quote" or None if user typed just "!help"
:param bot: The current bot instance
:param is_discord: True for Discord, False for Twitch
:param log_func: The logging function
"""
# If there's no loaded help_data, we can't do much
if not hasattr(bot, "help_data") or not bot.help_data:
return await send_message(ctx, "No help data found.")
help_data = bot.help_data # The parsed JSON from e.g. help_discord.json
if "commands" not in help_data:
return await send_message(ctx, "Invalid help data structure (no 'commands' key).")
if not command_name:
# User typed just "!help" => list all known commands from this bot
loaded_cmds = get_loaded_commands(bot, log_func, is_discord)
if not loaded_cmds:
return await send_message(ctx, "I have no commands loaded.")
else:
if is_discord:
help_str = f"I currently offer these commands:"
for cmd in loaded_cmds:
help_str += f"\n- !{cmd}"
help_str += f"\n*Use '!help <command>' for more details.*"
return await send_message(
ctx,
help_str
)
else:
short_list = ", ".join(loaded_cmds)
# We can also mention "Use !help [command] for more info."
return await send_message(
ctx,
f"I currently offer these commands:{short_list}. \nUse '!help <command>' for details."
)
# 1) Check if the command is loaded
loaded = (command_name in get_loaded_commands(bot, log_func, is_discord))
# 2) Check if it has help info in the JSON
cmd_help = help_data["commands"].get(command_name, None)
if loaded and cmd_help:
# The command is loaded, and we have help info => show it
if is_discord:
msg = build_discord_help_message(command_name, cmd_help)
else:
msg = build_twitch_help_message(command_name, cmd_help)
await send_message(ctx, msg)
elif loaded and not cmd_help:
# The command is loaded but no help info => mention that
await send_message(ctx, f"The '{command_name}' command is loaded but has no help info yet.")
elif (not loaded) and cmd_help:
# The command is not loaded, but we have an entry => mention it's unloaded/deprecated
await send_message(ctx, f"The '{command_name}' command is not currently loaded (deprecated or unavailable).")
else:
# Not loaded, no help info => not found at all
await send_message(ctx, f"I'm sorry, I don't offer a command named '{command_name}'.")
def initialize_help_data(bot, help_json_path, is_discord, log_func):
"""
Loads help data from a JSON file, stores it in bot.help_data,
then verifies each loaded command vs. the help_data.
Logs any mismatches:
- Commands in help file but not loaded => "deprecated"
- Loaded commands not in help file => "missing help"
"""
platform_name = "Discord" if is_discord else "Twitch"
if not os.path.exists(help_json_path):
log_func(f"Help file '{help_json_path}' not found. No help data loaded.", "WARNING")
bot.help_data = {}
return
# Load the JSON
try:
with open(help_json_path, "r", encoding="utf-8") as f:
data = json.load(f)
except Exception as e:
log_func(f"Error parsing help JSON '{help_json_path}': {e}", "ERROR")
data = {}
bot.help_data = data
# Now cross-check the loaded commands vs. the data
loaded_cmds = set(get_loaded_commands(bot, log_func, is_discord))
if "commands" not in data:
log_func(f"No 'commands' key in {help_json_path}, skipping checks.", "ERROR")
return
file_cmds = set(data["commands"].keys())
# 1) Commands in file but not loaded
missing_cmds = file_cmds - loaded_cmds
for cmd in missing_cmds:
log_func(f"Help file has '{cmd}', but it's not loaded on this {platform_name} bot (deprecated?).", "WARNING")
# 2) Commands loaded but not in file
needed_cmds = loaded_cmds - file_cmds
for cmd in needed_cmds:
log_func(f"Command '{cmd}' is loaded on {platform_name} but no help info is provided in {help_json_path}.", "WARNING")
def get_loaded_commands(bot, log_func, is_discord):
from discord.ext import commands as discord_commands
from twitchio.ext import commands as twitch_commands
commands_list = []
try:
_bot_type = str(type(bot)).split("_")[1].split(".")[0]
log_func(f"Currently processing commands for {_bot_type} ...", "DEBUG")
except Exception as e:
log_func(f"Unable to determine current bot type: {e}", "WARNING")
# For Discord
if is_discord:
#if isinstance(bot, discord_commands.Bot):
try:
# 'bot.commands' is a set of Command objects
for cmd_obj in bot.commands:
commands_list.append(cmd_obj.name)
debug_level ="DEBUG" if len(commands_list) > 0 else "WARNING"
log_func(f"Discord commands body: {commands_list}", f"{debug_level}")
except Exception as e:
log_func(f"Error retrieving Discord commands: {e}", "ERROR")
elif not is_discord:
try:
for cmd_obj in bot._commands:
commands_list.append(cmd_obj)
debug_level ="DEBUG" if len(commands_list) > 0 else "WARNING"
log_func(f"Twitch commands body: {commands_list}", f"{debug_level}")
except Exception as e:
log_func(f"Error retrieving Twitch commands: {e}", "ERROR")
else:
log_func(f"Unable to determine platform in 'get_loaded_commands()'!", "CRITICAL")
log_func(f"... Finished processing commands for {_bot_type} ...", "DEBUG")
return sorted(commands_list)
def build_discord_help_message(cmd_name, cmd_help_dict):
"""
A verbose multi-line string for Discord.
"""
description = cmd_help_dict.get("description", "No description available.\n")
subcommands = cmd_help_dict.get("subcommands", {})
examples = cmd_help_dict.get("examples", [])
lines = [f"**Help for `{cmd_name}`**:",
f"Description: {description}"]
if subcommands:
lines.append("\n**Subcommands / Arguments:**")
for sub, detail in subcommands.items():
arg_part = detail.get("args", "")
desc_part = detail.get("desc", "")
lines.append(f"• **{sub}** {arg_part} **->** {desc_part}")
else:
lines.append("\n*No subcommands defined.*")
if examples:
lines.append("\nExample usage:")
for ex in examples:
ex_arr = ex.split(" : ", 1) # Split into max 2 parts
# Handle missing description case
ex_cmd = ex_arr[0]
ex_note = str(f"\n {ex_arr[1]}") if len(ex_arr) > 1 else ""
lines.append(f"- `{ex_cmd}`{ex_note}")
return "\n".join(lines)
def build_twitch_help_message(cmd_name, cmd_help_dict):
"""
A concise, possibly single-line help for Twitch.
"""
description = cmd_help_dict.get("description", "No description available.")
subcommands = cmd_help_dict.get("subcommands", {})
sub_line_parts = []
for sub, detail in subcommands.items():
if sub == "no subcommand":
sub_line_parts.append(f"!{cmd_name}")
else:
arg_part = detail.get("args", "")
if arg_part:
sub_line_parts.append(f"!{cmd_name} {sub} {arg_part}".strip())
else:
sub_line_parts.append(f"!{cmd_name} {sub}".strip())
usage_line = " | ".join(sub_line_parts) if sub_line_parts else f"!{cmd_name}"
return f"Help for !{cmd_name}: {description}. Usage: {usage_line}"
async def send_message(ctx, text):
"""
Minimal helper to send a message to either Discord or Twitch.
"""
await ctx.send(text)
def track_user_activity(
db_conn,
log_func,
platform: str,
user_id: str,
username: str,
display_name: str,
user_is_bot: bool = False
):
"""
Checks or creates/updates a user in the 'users' table for the given platform's message.
:param db_conn: The active DB connection
:param log_func: The logging function (message, level="INFO")
:param platform: "discord" or "twitch"
:param user_id: e.g., Discord user ID or Twitch user ID
:param username: The raw username (no #discriminator for Discord)
:param display_name: The users display name
:param user_is_bot: Boolean if the user is recognized as a bot on that platform
"""
log_func(f"UUI Lookup for: {username} - {user_id} ({platform.lower()}) ...", "DEBUG")
# Decide which column we use for the ID lookup
# "discord_user_id" or "twitch_user_id"
if platform.lower() in ("discord", "twitch"):
identifier_type = f"{platform.lower()}_user_id"
else:
log_func(f"Unknown platform '{platform}' in track_user_activity!", "WARNING")
return
# 1) Try to find an existing user row
user_data = lookup_user(db_conn, log_func, identifier=user_id, identifier_type=identifier_type)
if user_data:
# Found an existing row for that user ID on this platform
# Check if the username or display_name is different => if so, update
need_update = False
column_updates = []
params = []
log_func(f"... Returned {user_data}", "DEBUG")
if platform.lower() == "discord":
if user_data["discord_username"] != username:
need_update = True
column_updates.append("discord_username = ?")
params.append(username)
if user_data["discord_user_display_name"] != display_name:
need_update = True
column_updates.append("discord_user_display_name = ?")
params.append(display_name)
# Possibly check user_is_bot
# If it's different than what's stored, update
# (We must add a column in your table for that if you want it stored per-platform.)
# For demonstration, let's store it in "user_is_bot"
if user_data["user_is_bot"] != user_is_bot:
need_update = True
column_updates.append("user_is_bot = ?")
params.append(int(user_is_bot))
if need_update:
set_clause = ", ".join(column_updates)
update_sql = f"""
UPDATE users
SET {set_clause}
WHERE discord_user_id = ?
"""
params.append(user_id)
rowcount = run_db_operation(db_conn, "update", update_sql, params=params, log_func=log_func)
if rowcount and rowcount > 0:
log_func(f"Updated Discord user '{username}' (display '{display_name}') in 'users'.", "DEBUG")
elif platform.lower() == "twitch":
if user_data["twitch_username"] != username:
need_update = True
column_updates.append("twitch_username = ?")
params.append(username)
if user_data["twitch_user_display_name"] != display_name:
need_update = True
column_updates.append("twitch_user_display_name = ?")
params.append(display_name)
# Possibly store is_bot in user_is_bot
if user_data["user_is_bot"] != user_is_bot:
need_update = True
column_updates.append("user_is_bot = ?")
params.append(int(user_is_bot))
if need_update:
set_clause = ", ".join(column_updates)
update_sql = f"""
UPDATE users
SET {set_clause}
WHERE twitch_user_id = ?
"""
params.append(user_id)
rowcount = run_db_operation(db_conn, "update", update_sql, params=params, log_func=log_func)
if rowcount and rowcount > 0:
log_func(f"Updated Twitch user '{username}' (display '{display_name}') in 'users'.", "DEBUG")
else:
# 2) No row found => create a new user row
# Generate a new UUID for this user
new_uuid = str(uuid.uuid4())
if platform.lower() == "discord":
insert_sql = """
INSERT INTO users (
UUID,
discord_user_id,
discord_username,
discord_user_display_name,
user_is_bot
)
VALUES (?, ?, ?, ?, ?)
"""
params = (new_uuid, user_id, username, display_name, int(user_is_bot))
else: # "twitch"
insert_sql = """
INSERT INTO users (
UUID,
twitch_user_id,
twitch_username,
twitch_user_display_name,
user_is_bot
)
VALUES (?, ?, ?, ?, ?)
"""
params = (new_uuid, user_id, username, display_name, int(user_is_bot))
rowcount = run_db_operation(db_conn, "write", insert_sql, params, log_func=log_func)
if rowcount and rowcount > 0:
log_func(f"Created new user row for {platform} user '{username}' (display '{display_name}') with UUID={new_uuid}.", "DEBUG")
else:
log_func(f"Failed to create new user row for {platform} user '{username}'", "ERROR")
from modules.db import log_bot_event
def log_bot_startup(db_conn, log_func):
"""
Logs a bot startup event.
"""
log_bot_event(db_conn, log_func, "BOT_STARTUP", "Bot successfully started.")
def log_bot_shutdown(db_conn, log_func, intent: str = "Error/Crash"):
"""
Logs a bot shutdown event.
"""
log_bot_event(db_conn, log_func, "BOT_SHUTDOWN", f"Bot is shutting down - {intent}.")
def generate_link_code():
"""Generates a unique 8-character alphanumeric link code."""
import random, string
return ''.join(random.choices(string.ascii_uppercase + string.digits, k=8))
###############################################
# Development Test Function (called upon start)
###############################################
def dev_func(db_conn, log, enable: bool = False):
if enable:
id = "203190147582394369"
id_type = "discord_user_id"
uui_info = lookup_user(db_conn, log, identifier=id, identifier_type=id_type)
if uui_info:
return list(uui_info.values())
else:
return f"User with identifier '{id}' ({id_type}) not found"