Experimental early implementation of expanded permissions system and runtime checks

experimental
Kami 2025-03-08 18:11:42 +01:00
parent d1faf7f214
commit d541f65804
6 changed files with 381 additions and 179 deletions

View File

@ -102,7 +102,7 @@ def handle_howl_normal(ctx, platform, author_id, author_display_name) -> str:
# Consistent UUID lookup
user_data = db.lookup_user(db_conn, identifier=author_id, identifier_type=f"{platform}_user_id")
if user_data:
user_uuid = user_data["UUID"]
user_uuid = user_data["uuid"]
db.insert_howl(db_conn, user_uuid, howl_val)
else:
globals.log(f"Could not find user by ID={author_id} on {platform}. Not storing howl.", "WARNING")
@ -140,11 +140,15 @@ def handle_howl_stats(ctx, platform, target_name) -> str:
# Otherwise, lookup a single user
user_data = db.lookup_user(db_conn, identifier=target_name, identifier_type=f"{platform}_username")
if not user_data:
user_data = db.lookup_user(db_conn, identifier=target_name, identifier_type=f"{platform}_displayname")
if not user_data:
user_data = db.lookup_user(db_conn, identifier=target_name, identifier_type=f"{platform}_user_id")
if not user_data:
utility.wfetl()
return f"I don't know that user: {target_name}"
user_uuid = user_data["UUID"]
user_uuid = user_data["uuid"]
stats = db.get_howl_stats(db_conn, user_uuid)
if not stats:
utility.wfetl()
@ -167,7 +171,7 @@ def lookup_user_by_name(db_conn, platform, name_str):
utility.wfstl()
if platform == "discord":
ud = db.lookup_user(db_conn, name_str, "discord_user_display_name")
ud = db.lookup_user(db_conn, name_str, "discord_display_name")
if ud:
utility.wfetl()
return ud
@ -176,7 +180,7 @@ def lookup_user_by_name(db_conn, platform, name_str):
return ud
elif platform == "twitch":
ud = db.lookup_user(db_conn, name_str, "twitch_user_display_name")
ud = db.lookup_user(db_conn, name_str, "twitch_display_name")
if ud:
utility.wfetl()
return ud
@ -412,7 +416,7 @@ async def add_new_quote(db_conn, is_discord, ctx, quote_text, game_name: str = N
utility.wfetl()
return "Could not save quote. Your user data is missing from the system."
user_uuid = user_data["UUID"]
user_uuid = user_data["uuid"]
channel_name = "Discord" if is_discord else ctx.channel.name
if is_discord or not game_name:
game_name = None
@ -455,7 +459,7 @@ async def remove_quote(db_conn, is_discord: bool, ctx, quote_id_str):
utility.wfetl()
return "Could not remove quote. Your user data is missing from the system."
user_uuid = user_data["UUID"]
user_uuid = user_data["uuid"]
quote_id = int(quote_id_str)
remover_user = str(user_uuid)

View File

@ -2,6 +2,7 @@
from twitchio.ext import commands
from cmd_common import common_commands as cc
from modules.utility import is_channel_live
from modules.permissions import has_permission
def setup(bot):
"""
@ -16,6 +17,10 @@ def setup(bot):
- !howl -> Attempts a howl.
- !howl stat <user> -> Looks up howling stats for a user.
"""
if not await is_channel_live(bot):
user_roles = ctx.author.badges.keys() # Extract Twitch user badges
if not has_permission("howl", str(ctx.author.id), user_roles, "twitch", ctx.channel.name):
await ctx.reply(f"You don't have permission to use this command.")
return
response = cc.handle_howl_command(ctx)
await ctx.reply(response)

View File

@ -258,50 +258,56 @@ def ensure_quotes_table(db_conn):
def ensure_users_table(db_conn):
"""
Checks if 'Users' table exists. If not, creates it.
Ensures the 'Users' table exists and has the necessary columns.
"""
is_sqlite = "sqlite3" in str(type(db_conn)).lower()
if is_sqlite:
check_sql = """
SELECT name
FROM sqlite_master
WHERE type='table'
AND name='Users'
"""
check_sql = "SELECT name FROM sqlite_master WHERE type='table' AND name='Users'"
else:
check_sql = """
SELECT table_name
FROM information_schema.tables
WHERE table_name = 'Users'
AND table_schema = DATABASE()
SELECT table_name FROM information_schema.tables
WHERE table_name = 'Users' AND table_schema = DATABASE()
"""
rows = run_db_operation(db_conn, "read", check_sql)
if rows and rows[0] and rows[0][0]:
globals.log("Table 'Users' already exists, skipping creation.", "DEBUG")
if rows and rows[0]:
globals.log("Table 'Users' already exists, checking for column updates.", "DEBUG")
# Ensure 'last_seen' column exists
column_check_sql = "PRAGMA table_info(Users)" if is_sqlite else """
SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_NAME = 'Users' AND COLUMN_NAME = 'last_seen'
"""
columns = run_db_operation(db_conn, "read", column_check_sql)
if not any("last_seen" in col for col in columns):
globals.log("Adding 'last_seen' column to 'Users'...", "INFO")
alter_sql = "ALTER TABLE Users ADD COLUMN last_seen TEXT DEFAULT CURRENT_TIMESTAMP" if is_sqlite else """
ALTER TABLE Users ADD COLUMN last_seen DATETIME DEFAULT CURRENT_TIMESTAMP
"""
run_db_operation(db_conn, "write", alter_sql)
return
globals.log("Table 'Users' does not exist; creating now...", "INFO")
if is_sqlite:
create_sql = """
CREATE TABLE Users (
UUID TEXT PRIMARY KEY,
Unified_Username TEXT,
datetime_linked TEXT,
user_is_banned BOOLEAN DEFAULT 0,
user_is_bot BOOLEAN DEFAULT 0
user_is_bot BOOLEAN DEFAULT 0,
last_seen TEXT DEFAULT CURRENT_TIMESTAMP
)
"""
else:
create_sql = """
""" if is_sqlite else """
CREATE TABLE Users (
UUID VARCHAR(36) PRIMARY KEY,
Unified_Username VARCHAR(100),
datetime_linked DATETIME,
user_is_banned BOOLEAN DEFAULT FALSE,
user_is_bot BOOLEAN DEFAULT FALSE
user_is_bot BOOLEAN DEFAULT FALSE,
last_seen DATETIME DEFAULT CURRENT_TIMESTAMP
)
"""
@ -313,14 +319,14 @@ def ensure_users_table(db_conn):
globals.log("Successfully created table 'Users'.", "INFO")
#######################
# Ensure 'platform_mapping' table
#######################
def ensure_platform_mapping_table(db_conn):
"""
Ensures the 'Platform_Mapping' table exists.
This table maps platform-specific user IDs to the universal UUID.
Ensures the 'Platform_Mapping' table exists and has the necessary columns.
"""
is_sqlite = "sqlite3" in str(type(db_conn)).lower()
@ -341,7 +347,27 @@ def ensure_platform_mapping_table(db_conn):
rows = run_db_operation(db_conn, "read", check_sql)
if rows and rows[0] and rows[0][0]:
globals.log("Table 'Platform_Mapping' already exists, skipping creation.", "DEBUG")
globals.log("Table 'Platform_Mapping' already exists, checking for column updates.", "DEBUG")
# Check if last_seen column exists
column_check_sql = """
PRAGMA table_info(Platform_Mapping)
""" if is_sqlite else """
SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_NAME = 'Platform_Mapping' AND COLUMN_NAME = 'last_seen'
"""
columns = run_db_operation(db_conn, "read", column_check_sql)
# If column doesn't exist, add it
if not any("last_seen" in col for col in columns):
globals.log("Adding 'last_seen' column to 'Platform_Mapping'...", "INFO")
alter_sql = """
ALTER TABLE Platform_Mapping ADD COLUMN last_seen TEXT DEFAULT CURRENT_TIMESTAMP
""" if is_sqlite else """
ALTER TABLE Platform_Mapping ADD COLUMN last_seen DATETIME DEFAULT CURRENT_TIMESTAMP
"""
run_db_operation(db_conn, "write", alter_sql)
return
globals.log("Table 'Platform_Mapping' does not exist; creating now...", "INFO")
@ -354,6 +380,7 @@ def ensure_platform_mapping_table(db_conn):
UUID TEXT,
Display_Name TEXT,
Username TEXT,
last_seen TEXT DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (Platform_User_ID, Platform_Type),
FOREIGN KEY (UUID) REFERENCES Users(UUID) ON DELETE CASCADE
)
@ -366,6 +393,7 @@ def ensure_platform_mapping_table(db_conn):
UUID VARCHAR(36),
Display_Name VARCHAR(100),
Username VARCHAR(100),
last_seen DATETIME DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (Platform_User_ID, Platform_Type),
FOREIGN KEY (UUID) REFERENCES Users(UUID) ON DELETE CASCADE
)
@ -386,12 +414,14 @@ def ensure_platform_mapping_table(db_conn):
def lookup_user(db_conn, identifier: str, identifier_type: str, target_identifier: str = None):
"""
Looks up a user in the 'Users' table using 'Platform_Mapping' for platform-specific IDs or UUID.
Looks up a user in the 'Users' table using 'Platform_Mapping' for platform-specific IDs, UUID, usernames, and display names.
identifier_type can be:
- "discord_user_id" or "discord"
- "twitch_user_id" or "twitch"
- "UUID" (to lookup by UUID directly)
- "uuid" (to lookup by UUID directly)
- "unified_username"
- "discord_user_id" / "twitch_user_id" (Platform-specific user ID)
- "discord_display_name" / "twitch_display_name" (Platform-specific display name)
- "discord_username" / "twitch_username" (Platform-specific raw username)
Returns:
If target_identifier is None: A dictionary with the following keys:
@ -415,17 +445,40 @@ def lookup_user(db_conn, identifier: str, identifier_type: str, target_identifie
# Debug: Log the inputs
if PRINT_QUERY_DEBUG: globals.log(f"lookup_user() called with: identifier='{identifier}', identifier_type='{identifier_type}', target_identifier='{target_identifier}'", "DEBUG")
# Define platform type and column mappings
platform_map = {
"discord": "Discord",
"discord_user_id": "Discord",
"twitch": "Twitch",
"twitch_user_id": "Twitch"
# Normalize identifier_type to lowercase
identifier_type = identifier_type.lower()
# Define valid identifier types with SQL column mappings
valid_identifier_types = {
"uuid": "u.UUID",
"unified_username": "u.Unified_Username",
"discord_user_id": "pm.Platform_User_ID",
"twitch_user_id": "pm.Platform_User_ID",
"discord_display_name": "pm.Display_Name",
"twitch_display_name": "pm.Display_Name",
"discord_username": "pm.Username",
"twitch_username": "pm.Username",
}
# Handle UUID case separately
if identifier_type.upper() == "UUID":
query = """
# Extract platform from identifier type (if applicable)
platform_map = {
"discord_user_id": "Discord",
"twitch_user_id": "Twitch",
"discord_display_name": "Discord",
"twitch_display_name": "Twitch",
"discord_username": "Discord",
"twitch_username": "Twitch",
}
if identifier_type not in valid_identifier_types:
globals.log(f"lookup_user error: invalid identifier_type '{identifier_type}'", "WARNING")
return None
column_to_lookup = valid_identifier_types[identifier_type]
platform_filter = platform_map.get(identifier_type, None)
# Construct query
query = f"""
SELECT
u.UUID,
u.Unified_Username,
@ -438,43 +491,23 @@ def lookup_user(db_conn, identifier: str, identifier_type: str, target_identifie
pm.Platform_Type
FROM Users u
LEFT JOIN Platform_Mapping pm ON u.UUID = pm.UUID
WHERE u.UUID = ?
LIMIT 1
WHERE {column_to_lookup} = ?
"""
params = (identifier,)
else:
# Handle platform-specific lookups
if identifier_type.lower() not in platform_map:
globals.log(f"lookup_user error: invalid identifier_type '{identifier_type}'", "WARNING")
return None
# Get the platform type (Discord or Twitch)
platform_type = platform_map[identifier_type.lower()]
params = [identifier]
# Use platform_user_id to lookup the UUID
query = """
SELECT
u.UUID,
u.Unified_Username,
u.datetime_linked,
u.user_is_banned,
u.user_is_bot,
pm.Platform_User_ID,
pm.Display_Name,
pm.Username,
pm.Platform_Type
FROM Users u
INNER JOIN Platform_Mapping pm ON u.UUID = pm.UUID
WHERE pm.Platform_Type = ? AND pm.Platform_User_ID = ?
LIMIT 1
"""
params = (platform_type, identifier)
# Apply platform filter if applicable
if platform_filter:
query += " AND pm.Platform_Type = ?"
params.append(platform_filter)
query += " LIMIT 1"
# Debug: Log the query and parameters
if PRINT_QUERY_DEBUG: globals.log(f"lookup_user() executing query: {query} with params={params}", "DEBUG")
# Run the query
rows = run_db_operation(db_conn, "read", query, params)
rows = run_db_operation(db_conn, "read", query, tuple(params))
# Debug: Log the result of the query
if PRINT_QUERY_DEBUG: globals.log(f"lookup_user() query result: {rows}", "DEBUG")
@ -487,7 +520,7 @@ def lookup_user(db_conn, identifier: str, identifier_type: str, target_identifie
# Convert the row to a dictionary
row = rows[0]
user_data = {
"UUID": row[0], # Make UUID consistently uppercase
"uuid": row[0], # Ensure UUID is uppercase
"unified_username": row[1],
"datetime_linked": row[2],
"user_is_banned": row[3],
@ -503,7 +536,7 @@ def lookup_user(db_conn, identifier: str, identifier_type: str, target_identifie
# If target_identifier is provided, return just that value
if target_identifier:
target_identifier = target_identifier.upper() # Force uppercase for consistency
target_identifier = target_identifier.lower()
if target_identifier in user_data:
if PRINT_QUERY_DEBUG: globals.log(f"lookup_user() returning target_identifier='{target_identifier}' with value='{user_data[target_identifier]}'", "DEBUG")
return user_data[target_identifier]
@ -514,6 +547,49 @@ def lookup_user(db_conn, identifier: str, identifier_type: str, target_identifie
if PRINT_QUERY_DEBUG: globals.log(f"lookup_user() returning full user_data: {user_data}", "DEBUG")
return user_data
def user_lastseen(db_conn, UUID: str, platform_name: str = None, platform_user_id: str | int = None, lookup: bool = False, update: bool = False):
"""
Handles user last seen updates and lookups.
- `lookup=True`: Fetches the last-seen timestamp.
- `update=True`: Updates the last-seen timestamp.
- If platform_name and platform_user_id are provided, the query will be scoped to that account.
- Otherwise, it applies to all accounts unified under the UUI system.
"""
if not UUID:
globals.log("UUID is required for user_lastseen()", "ERROR")
return None
if lookup:
query = """
SELECT last_seen FROM Platform_Mapping WHERE UUID = ?
""" if not platform_name or not platform_user_id else """
SELECT last_seen FROM Platform_Mapping WHERE UUID = ? AND Platform_Type = ? AND Platform_User_ID = ?
"""
params = (UUID,) if not platform_name or not platform_user_id else (UUID, platform_name, str(platform_user_id))
result = run_db_operation(db_conn, "read", query, params)
if update:
update_sql = """
UPDATE Platform_Mapping SET last_seen = CURRENT_TIMESTAMP WHERE UUID = ?
""" if not platform_name or not platform_user_id else """
UPDATE Platform_Mapping SET last_seen = CURRENT_TIMESTAMP WHERE UUID = ? AND Platform_Type = ? AND Platform_User_ID = ?
"""
params = (UUID,) if not platform_name or not platform_user_id else (UUID, platform_name, str(platform_user_id))
run_db_operation(db_conn, "write", update_sql, params)
globals.log(f"Updated last_seen timestamp for UUID={UUID}", "DEBUG")
if lookup:
if result and result[0]:
return result[0][0] # Return last seen datetime
return None # No data found
elif update and not lookup:
return True
return False # No action taken
def ensure_chatlog_table(db_conn):
"""
@ -590,7 +666,7 @@ def log_message(db_conn, identifier, identifier_type, message_content, platform,
globals.log(f"User not found for {identifier_type}='{identifier}'", "WARNING")
return
user_uuid = user_data["UUID"]
user_uuid = user_data["uuid"]
message_uuid = str(uuid.uuid4()) # Generate a new UUID for the entry
# Determine if a message ID is required for this platform
@ -835,7 +911,7 @@ def log_discord_activity(db_conn, guild_id, user_identifier, action, voice_chann
globals.log(f"User not found for Discord ID: {user_identifier}", "WARNING")
return
user_uuid = user_data["UUID"]
user_uuid = user_data["uuid"]
# Prepare the voice_channel value (if its an object with a name, use that).
channel_val = voice_channel.name if (voice_channel and hasattr(voice_channel, "name")) else voice_channel
@ -1174,7 +1250,7 @@ async def handle_community_event(db_conn, is_discord, ctx, args):
globals.log(f"User not found: {ctx.author.name} ({user_id}) on {platform}", "ERROR")
return "Could not log event: user data missing."
user_uuid = user_data["UUID"]
user_uuid = user_data["uuid"]
# Insert new event. Adjust for SQLite or MariaDB.
insert_sql = """

View File

@ -1,67 +1,97 @@
# modules/permissions.py
import json
import os
import globals
import discord
PERMISSIONS_FILE = "permissions.json"
TWITCH_CONFIG_FILE = "settings/twitch_channels_config.json"
# Load permission settings
def load_permissions():
"""Loads the permissions from JSON."""
if not os.path.exists(PERMISSIONS_FILE):
##########################
# Load Configurations
##########################
def load_json_file(file_path):
"""Loads JSON data from a file, returns an empty dict if missing or invalid."""
if not os.path.exists(file_path):
return {}
with open(PERMISSIONS_FILE, "r", encoding="utf-8") as file:
try:
with open(file_path, "r", encoding="utf-8") as file:
return json.load(file)
except json.JSONDecodeError as e:
globals.log(f"Error parsing JSON file {file_path}: {e}", "ERROR")
return {}
def map_roles(platform: str, user_roles: list) -> list:
def load_permissions():
"""Dynamically loads permissions from `permissions.json`."""
return load_json_file(PERMISSIONS_FILE)
def load_twitch_config():
"""Dynamically loads Twitch-specific command allow/deny lists."""
return load_json_file(TWITCH_CONFIG_FILE)
##########################
# Role Mapping
##########################
def map_roles(platform: str, user_roles: list, context_identifier: str = None) -> list:
"""
Maps platform-specific roles to a unified role system.
Supports per-guild (Discord) and per-channel (Twitch) overrides.
:param platform: "discord" or "twitch"
:param user_roles: A list of raw roles/badges from the platform
:return: A list of mapped roles based on the JSON role mapping
:param user_roles: List of raw roles/badges from the platform
:param context_identifier: Guild ID (for Discord) or Channel Name (for Twitch)
:return: List of mapped roles
"""
permissions = load_permissions()
role_mappings = permissions.get("role_mappings", {}).get(platform, {})
mapped_roles = []
for role in user_roles:
normalized_role = role.lower()
mapped_role = role_mappings.get(normalized_role, None)
if mapped_role:
mapped_roles.append(mapped_role)
# Allow per-context overrides
if context_identifier:
specific_mappings = permissions.get("role_mappings", {}).get(f"{platform}_{context_identifier}", {})
role_mappings.update(specific_mappings) # Override defaults
return mapped_roles if mapped_roles else ["everyone"]
mapped_roles = [role_mappings.get(role.lower(), role.lower()) for role in user_roles]
def has_permission(command_name: str, user_id: str, user_roles: list, platform: str) -> bool:
return list(set(mapped_roles)) if mapped_roles else ["everyone"]
##########################
# Permissions Checks
##########################
def has_permission(command_name: str, user_id: str, user_roles: list, platform: str, context_identifier: str = None) -> bool:
"""
Checks if a user has permission to run a command.
Checks if a user has permission to execute a command.
:param command_name: The name of the command being checked.
:param user_id: The ID of the user requesting the command.
:param user_roles: A list of roles/badges the user has (platform-specific).
:param command_name: The command to check
:param user_id: The user's ID
:param user_roles: The user's roles/badges
:param platform: "discord" or "twitch"
:return: True if the user has permission, otherwise False.
:param context_identifier: Guild ID (for Discord) or Channel Name (for Twitch)
:return: True if the user has permission, False otherwise
"""
permissions = load_permissions()
command_perms = permissions.get("commands", {}).get(command_name, {})
# Extract settings
# Extract permission settings
min_role = command_perms.get("min_role", "")
allowed_roles = command_perms.get("allowed_roles", [])
allowed_users = command_perms.get("allowed_users", [])
# If no min_role and no allowed_roles/users, it's open to everyone
# Auto-allow if no specific rules exist
if not min_role and not allowed_roles and not allowed_users:
return True
# Check if user is explicitly allowed
# Explicit user whitelist
if user_id in allowed_users:
return True # Bypass role check
return True
# Convert platform-specific roles to mapped roles
mapped_roles = map_roles(platform, user_roles)
# Convert platform roles
mapped_roles = map_roles(platform, user_roles, context_identifier)
# If a min_role is set, check against it
# Check minimum required role
if min_role:
role_hierarchy = ["everyone", "follower", "subscriber", "vip", "moderator", "admin", "owner"]
user_role_level = max([role_hierarchy.index(role) for role in mapped_roles if role in role_hierarchy], default=0)
@ -69,12 +99,113 @@ def has_permission(command_name: str, user_id: str, user_roles: list, platform:
if user_role_level >= min_role_level:
return True
# Check if the user has any explicitly allowed roles
# Check explicitly allowed roles
if any(role in allowed_roles for role in mapped_roles):
return True
return False
##########################
# Twitch Command Filtering
##########################
def is_command_allowed_twitch(command_name: str, channel_name: str) -> bool:
"""
Checks if a command is allowed in a specific Twitch channel.
:param command_name: The command being checked
:param channel_name: The Twitch channel name
:return: True if allowed, False if blocked
"""
twitch_config = load_twitch_config()
channel_config = twitch_config.get(channel_name.lower(), {})
if not channel_config:
return False # Default to deny if no config exists
mode = channel_config.get("commands_filter_mode", "exclude")
filtered_commands = channel_config.get("commands_filtered", [])
if mode == "exclude" and command_name in filtered_commands:
return False
if mode == "include" and command_name not in filtered_commands:
return False
return True
# modules/permissions.py
# Load permission settings
# def load_permissions():
# """Loads the permissions from JSON."""
# if not os.path.exists(PERMISSIONS_FILE):
# return {}
# with open(PERMISSIONS_FILE, "r", encoding="utf-8") as file:
# return json.load(file)
# def map_roles(platform: str, user_roles: list) -> list:
# """
# Maps platform-specific roles to a unified role system.
# :param platform: "discord" or "twitch"
# :param user_roles: A list of raw roles/badges from the platform
# :return: A list of mapped roles based on the JSON role mapping
# """
# permissions = load_permissions()
# role_mappings = permissions.get("role_mappings", {}).get(platform, {})
# mapped_roles = []
# for role in user_roles:
# normalized_role = role.lower()
# mapped_role = role_mappings.get(normalized_role, None)
# if mapped_role:
# mapped_roles.append(mapped_role)
# return mapped_roles if mapped_roles else ["everyone"]
# def has_permission(command_name: str, user_id: str, user_roles: list, platform: str) -> bool:
# """
# Checks if a user has permission to run a command.
# :param command_name: The name of the command being checked.
# :param user_id: The ID of the user requesting the command.
# :param user_roles: A list of roles/badges the user has (platform-specific).
# :param platform: "discord" or "twitch"
# :return: True if the user has permission, otherwise False.
# """
# permissions = load_permissions()
# command_perms = permissions.get("commands", {}).get(command_name, {})
# # Extract settings
# min_role = command_perms.get("min_role", "")
# allowed_roles = command_perms.get("allowed_roles", [])
# allowed_users = command_perms.get("allowed_users", [])
# # If no min_role and no allowed_roles/users, it's open to everyone
# if not min_role and not allowed_roles and not allowed_users:
# return True
# # Check if user is explicitly allowed
# if user_id in allowed_users:
# return True # Bypass role check
# # Convert platform-specific roles to mapped roles
# mapped_roles = map_roles(platform, user_roles)
# # If a min_role is set, check against it
# if min_role:
# role_hierarchy = ["everyone", "follower", "subscriber", "vip", "moderator", "admin", "owner"]
# user_role_level = max([role_hierarchy.index(role) for role in mapped_roles if role in role_hierarchy], default=0)
# min_role_level = role_hierarchy.index(min_role) if min_role in role_hierarchy else 0
# if user_role_level >= min_role_level:
# return True
# # Check if the user has any explicitly allowed roles
# if any(role in allowed_roles for role in mapped_roles):
# return True
# return False
def has_custom_vc_permission(member: discord.Member, vc_owner_id: int = None) -> bool:
"""

View File

@ -7,7 +7,7 @@ import functools
import inspect
import uuid
from typing import Union
from modules.db import run_db_operation, lookup_user, log_message
from modules.db import run_db_operation, lookup_user, log_message, user_lastseen
import modules.utility as utility
import discord
from functools import wraps
@ -525,8 +525,6 @@ def get_loaded_commands(bot, is_discord):
Side Effects:
Logs debug, warning, and error messages regarding the command processing.
"""
from discord.ext import commands as discord_commands
from twitchio.ext import commands as twitch_commands
commands_list = []
@ -616,7 +614,6 @@ def build_discord_help_message(cmd_name, cmd_help_dict):
return "\n".join(lines)
def build_twitch_help_message(cmd_name, cmd_help_dict):
"""
Build a concise Twitch help message for a command.
@ -710,11 +707,16 @@ def track_user_activity(
if user_data:
# Existing user found, update info if necessary
user_uuid = user_data["UUID"]
user_uuid = user_data["uuid"]
need_update = False
column_updates = []
params = []
# Update user last-seen timestamp
user_lastseen(db_conn, UUID=user_uuid, platform_name=platform, platform_user_id=user_id, update=True)
last_seen = user_lastseen(db_conn, UUID=user_uuid, platform_name=platform, platform_user_id=user_id, lookup=True)
globals.log(f"Updated last-seen datetime to {last_seen}", "DEBUG")
if user_data["platform_username"] != username:
need_update = True
column_updates.append("Username = ?")
@ -760,7 +762,6 @@ def track_user_activity(
globals.log(f"Failed to create user entry for {platform} user '{username}'.", "ERROR")
from modules.db import log_bot_event
def log_bot_startup(db_conn):
@ -981,47 +982,29 @@ def list_channels(self):
f"Currently connected to {num_connected_channels} Twitch channel(s): {connected_channels_str}"
)
from functools import wraps
import globals
from modules.permissions import is_command_allowed_twitch
def command_allowed_twitch(func):
"""
A custom check that allows a command to run based on channel settings.
It looks up the current channel in CHANNEL_CONFIG and either allows or denies
the command based on the filter mode and list.
Checks if a command is allowed based on Twitch channel settings.
"""
@wraps(func)
async def wrapper(ctx, *args, **kwargs):
# Load the full configuration.
full_config = globals.constants.twitch_channels_config()
# Get the channel name and then the channel-specific configuration.
channel_name = ctx.channel.name.lower()
channel_config = full_config.get(channel_name)
# If there's no configuration for this channel, block the command.
if not channel_config:
globals.log(f"No configuration found for Twitch channel '{channel_name}'. Blocking command '{ctx.command.name}'.")
return
mode = channel_config.get("commands_filter_mode")
filtered = channel_config.get("commands_filtered", [])
command_name = ctx.command.name
# Check based on filter mode.
if mode == "exclude":
if command_name in filtered:
globals.log(f"Command '{command_name}' is excluded on Twitch channel '{channel_name}'.")
return
elif mode == "include":
if command_name not in filtered:
globals.log(f"Command '{command_name}' is not allowed on Twitch channel '{channel_name}' (include mode).")
# Block command if it's not allowed in the channel
if not is_command_allowed_twitch(command_name, channel_name):
globals.log(f"Command '{command_name}' is blocked in '{channel_name}'.", "WARNING")
return
# If all checks pass, run the command.
return await func(ctx, *args, **kwargs)
return wrapper
###############################################
# Development Test Function (called upon start)
###############################################

View File

@ -15,10 +15,13 @@
}
},
"commands": {
"hi": {
"special_vip_command": {
"min_role": "",
"allowed_roles": ["broadcaster", "owner"],
"allowed_users": ["203190147582394369"]
"allowed_roles": ["broadcaster", "vip"]
},
"owner_only": {
"min_role": "owner",
"allowed_roles": []
}
}
}