From d541f65804fa38a3309a8283713e4cfdbfa9fd90 Mon Sep 17 00:00:00 2001 From: Kami Date: Sat, 8 Mar 2025 18:11:42 +0100 Subject: [PATCH] Experimental early implementation of expanded permissions system and runtime checks --- cmd_common/common_commands.py | 20 ++- cmd_twitch/howl.py | 11 +- modules/db.py | 274 ++++++++++++++++++++++------------ modules/permissions.py | 193 ++++++++++++++++++++---- modules/utility.py | 53 +++---- permissions.json | 9 +- 6 files changed, 381 insertions(+), 179 deletions(-) diff --git a/cmd_common/common_commands.py b/cmd_common/common_commands.py index 88f08c8..8203cd8 100644 --- a/cmd_common/common_commands.py +++ b/cmd_common/common_commands.py @@ -102,7 +102,7 @@ def handle_howl_normal(ctx, platform, author_id, author_display_name) -> str: # Consistent UUID lookup user_data = db.lookup_user(db_conn, identifier=author_id, identifier_type=f"{platform}_user_id") if user_data: - user_uuid = user_data["UUID"] + user_uuid = user_data["uuid"] db.insert_howl(db_conn, user_uuid, howl_val) else: globals.log(f"Could not find user by ID={author_id} on {platform}. Not storing howl.", "WARNING") @@ -141,10 +141,14 @@ def handle_howl_stats(ctx, platform, target_name) -> str: # Otherwise, lookup a single user user_data = db.lookup_user(db_conn, identifier=target_name, identifier_type=f"{platform}_username") if not user_data: - utility.wfetl() - return f"I don't know that user: {target_name}" + user_data = db.lookup_user(db_conn, identifier=target_name, identifier_type=f"{platform}_displayname") + if not user_data: + user_data = db.lookup_user(db_conn, identifier=target_name, identifier_type=f"{platform}_user_id") + if not user_data: + utility.wfetl() + return f"I don't know that user: {target_name}" - user_uuid = user_data["UUID"] + user_uuid = user_data["uuid"] stats = db.get_howl_stats(db_conn, user_uuid) if not stats: utility.wfetl() @@ -167,7 +171,7 @@ def lookup_user_by_name(db_conn, platform, name_str): utility.wfstl() if platform == "discord": - ud = db.lookup_user(db_conn, name_str, "discord_user_display_name") + ud = db.lookup_user(db_conn, name_str, "discord_display_name") if ud: utility.wfetl() return ud @@ -176,7 +180,7 @@ def lookup_user_by_name(db_conn, platform, name_str): return ud elif platform == "twitch": - ud = db.lookup_user(db_conn, name_str, "twitch_user_display_name") + ud = db.lookup_user(db_conn, name_str, "twitch_display_name") if ud: utility.wfetl() return ud @@ -412,7 +416,7 @@ async def add_new_quote(db_conn, is_discord, ctx, quote_text, game_name: str = N utility.wfetl() return "Could not save quote. Your user data is missing from the system." - user_uuid = user_data["UUID"] + user_uuid = user_data["uuid"] channel_name = "Discord" if is_discord else ctx.channel.name if is_discord or not game_name: game_name = None @@ -455,7 +459,7 @@ async def remove_quote(db_conn, is_discord: bool, ctx, quote_id_str): utility.wfetl() return "Could not remove quote. Your user data is missing from the system." - user_uuid = user_data["UUID"] + user_uuid = user_data["uuid"] quote_id = int(quote_id_str) remover_user = str(user_uuid) diff --git a/cmd_twitch/howl.py b/cmd_twitch/howl.py index ff773e4..a6ab00f 100644 --- a/cmd_twitch/howl.py +++ b/cmd_twitch/howl.py @@ -2,6 +2,7 @@ from twitchio.ext import commands from cmd_common import common_commands as cc from modules.utility import is_channel_live +from modules.permissions import has_permission def setup(bot): """ @@ -16,6 +17,10 @@ def setup(bot): - !howl -> Attempts a howl. - !howl stat -> Looks up howling stats for a user. """ - if not await is_channel_live(bot): - response = cc.handle_howl_command(ctx) - await ctx.reply(response) + user_roles = ctx.author.badges.keys() # Extract Twitch user badges + if not has_permission("howl", str(ctx.author.id), user_roles, "twitch", ctx.channel.name): + await ctx.reply(f"You don't have permission to use this command.") + return + + response = cc.handle_howl_command(ctx) + await ctx.reply(response) \ No newline at end of file diff --git a/modules/db.py b/modules/db.py index 3730baf..5331417 100644 --- a/modules/db.py +++ b/modules/db.py @@ -258,52 +258,58 @@ def ensure_quotes_table(db_conn): def ensure_users_table(db_conn): """ - Checks if 'Users' table exists. If not, creates it. + Ensures the 'Users' table exists and has the necessary columns. """ is_sqlite = "sqlite3" in str(type(db_conn)).lower() if is_sqlite: - check_sql = """ - SELECT name - FROM sqlite_master - WHERE type='table' - AND name='Users' - """ + check_sql = "SELECT name FROM sqlite_master WHERE type='table' AND name='Users'" else: check_sql = """ - SELECT table_name - FROM information_schema.tables - WHERE table_name = 'Users' - AND table_schema = DATABASE() + SELECT table_name FROM information_schema.tables + WHERE table_name = 'Users' AND table_schema = DATABASE() """ rows = run_db_operation(db_conn, "read", check_sql) - if rows and rows[0] and rows[0][0]: - globals.log("Table 'Users' already exists, skipping creation.", "DEBUG") + if rows and rows[0]: + globals.log("Table 'Users' already exists, checking for column updates.", "DEBUG") + + # Ensure 'last_seen' column exists + column_check_sql = "PRAGMA table_info(Users)" if is_sqlite else """ + SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS + WHERE TABLE_NAME = 'Users' AND COLUMN_NAME = 'last_seen' + """ + columns = run_db_operation(db_conn, "read", column_check_sql) + if not any("last_seen" in col for col in columns): + globals.log("Adding 'last_seen' column to 'Users'...", "INFO") + alter_sql = "ALTER TABLE Users ADD COLUMN last_seen TEXT DEFAULT CURRENT_TIMESTAMP" if is_sqlite else """ + ALTER TABLE Users ADD COLUMN last_seen DATETIME DEFAULT CURRENT_TIMESTAMP + """ + run_db_operation(db_conn, "write", alter_sql) + return globals.log("Table 'Users' does not exist; creating now...", "INFO") - if is_sqlite: - create_sql = """ - CREATE TABLE Users ( - UUID TEXT PRIMARY KEY, - Unified_Username TEXT, - datetime_linked TEXT, - user_is_banned BOOLEAN DEFAULT 0, - user_is_bot BOOLEAN DEFAULT 0 - ) - """ - else: - create_sql = """ - CREATE TABLE Users ( - UUID VARCHAR(36) PRIMARY KEY, - Unified_Username VARCHAR(100), - datetime_linked DATETIME, - user_is_banned BOOLEAN DEFAULT FALSE, - user_is_bot BOOLEAN DEFAULT FALSE - ) - """ + create_sql = """ + CREATE TABLE Users ( + UUID TEXT PRIMARY KEY, + Unified_Username TEXT, + datetime_linked TEXT, + user_is_banned BOOLEAN DEFAULT 0, + user_is_bot BOOLEAN DEFAULT 0, + last_seen TEXT DEFAULT CURRENT_TIMESTAMP + ) + """ if is_sqlite else """ + CREATE TABLE Users ( + UUID VARCHAR(36) PRIMARY KEY, + Unified_Username VARCHAR(100), + datetime_linked DATETIME, + user_is_banned BOOLEAN DEFAULT FALSE, + user_is_bot BOOLEAN DEFAULT FALSE, + last_seen DATETIME DEFAULT CURRENT_TIMESTAMP + ) + """ result = run_db_operation(db_conn, "write", create_sql) if result is None: @@ -313,14 +319,14 @@ def ensure_users_table(db_conn): globals.log("Successfully created table 'Users'.", "INFO") + ####################### # Ensure 'platform_mapping' table ####################### def ensure_platform_mapping_table(db_conn): """ - Ensures the 'Platform_Mapping' table exists. - This table maps platform-specific user IDs to the universal UUID. + Ensures the 'Platform_Mapping' table exists and has the necessary columns. """ is_sqlite = "sqlite3" in str(type(db_conn)).lower() @@ -341,7 +347,27 @@ def ensure_platform_mapping_table(db_conn): rows = run_db_operation(db_conn, "read", check_sql) if rows and rows[0] and rows[0][0]: - globals.log("Table 'Platform_Mapping' already exists, skipping creation.", "DEBUG") + globals.log("Table 'Platform_Mapping' already exists, checking for column updates.", "DEBUG") + + # Check if last_seen column exists + column_check_sql = """ + PRAGMA table_info(Platform_Mapping) + """ if is_sqlite else """ + SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS + WHERE TABLE_NAME = 'Platform_Mapping' AND COLUMN_NAME = 'last_seen' + """ + columns = run_db_operation(db_conn, "read", column_check_sql) + + # If column doesn't exist, add it + if not any("last_seen" in col for col in columns): + globals.log("Adding 'last_seen' column to 'Platform_Mapping'...", "INFO") + alter_sql = """ + ALTER TABLE Platform_Mapping ADD COLUMN last_seen TEXT DEFAULT CURRENT_TIMESTAMP + """ if is_sqlite else """ + ALTER TABLE Platform_Mapping ADD COLUMN last_seen DATETIME DEFAULT CURRENT_TIMESTAMP + """ + run_db_operation(db_conn, "write", alter_sql) + return globals.log("Table 'Platform_Mapping' does not exist; creating now...", "INFO") @@ -354,6 +380,7 @@ def ensure_platform_mapping_table(db_conn): UUID TEXT, Display_Name TEXT, Username TEXT, + last_seen TEXT DEFAULT CURRENT_TIMESTAMP, PRIMARY KEY (Platform_User_ID, Platform_Type), FOREIGN KEY (UUID) REFERENCES Users(UUID) ON DELETE CASCADE ) @@ -366,6 +393,7 @@ def ensure_platform_mapping_table(db_conn): UUID VARCHAR(36), Display_Name VARCHAR(100), Username VARCHAR(100), + last_seen DATETIME DEFAULT CURRENT_TIMESTAMP, PRIMARY KEY (Platform_User_ID, Platform_Type), FOREIGN KEY (UUID) REFERENCES Users(UUID) ON DELETE CASCADE ) @@ -386,13 +414,15 @@ def ensure_platform_mapping_table(db_conn): def lookup_user(db_conn, identifier: str, identifier_type: str, target_identifier: str = None): """ - Looks up a user in the 'Users' table using 'Platform_Mapping' for platform-specific IDs or UUID. - + Looks up a user in the 'Users' table using 'Platform_Mapping' for platform-specific IDs, UUID, usernames, and display names. + identifier_type can be: - - "discord_user_id" or "discord" - - "twitch_user_id" or "twitch" - - "UUID" (to lookup by UUID directly) - + - "uuid" (to lookup by UUID directly) + - "unified_username" + - "discord_user_id" / "twitch_user_id" (Platform-specific user ID) + - "discord_display_name" / "twitch_display_name" (Platform-specific display name) + - "discord_username" / "twitch_username" (Platform-specific raw username) + Returns: If target_identifier is None: A dictionary with the following keys: { @@ -415,67 +445,70 @@ def lookup_user(db_conn, identifier: str, identifier_type: str, target_identifie # Debug: Log the inputs if PRINT_QUERY_DEBUG: globals.log(f"lookup_user() called with: identifier='{identifier}', identifier_type='{identifier_type}', target_identifier='{target_identifier}'", "DEBUG") - # Define platform type and column mappings - platform_map = { - "discord": "Discord", - "discord_user_id": "Discord", - "twitch": "Twitch", - "twitch_user_id": "Twitch" + # Normalize identifier_type to lowercase + identifier_type = identifier_type.lower() + + # Define valid identifier types with SQL column mappings + valid_identifier_types = { + "uuid": "u.UUID", + "unified_username": "u.Unified_Username", + "discord_user_id": "pm.Platform_User_ID", + "twitch_user_id": "pm.Platform_User_ID", + "discord_display_name": "pm.Display_Name", + "twitch_display_name": "pm.Display_Name", + "discord_username": "pm.Username", + "twitch_username": "pm.Username", } + + # Extract platform from identifier type (if applicable) + platform_map = { + "discord_user_id": "Discord", + "twitch_user_id": "Twitch", + "discord_display_name": "Discord", + "twitch_display_name": "Twitch", + "discord_username": "Discord", + "twitch_username": "Twitch", + } + + if identifier_type not in valid_identifier_types: + globals.log(f"lookup_user error: invalid identifier_type '{identifier_type}'", "WARNING") + return None + + column_to_lookup = valid_identifier_types[identifier_type] + platform_filter = platform_map.get(identifier_type, None) + + # Construct query + query = f""" + SELECT + u.UUID, + u.Unified_Username, + u.datetime_linked, + u.user_is_banned, + u.user_is_bot, + pm.Platform_User_ID, + pm.Display_Name, + pm.Username, + pm.Platform_Type + FROM Users u + LEFT JOIN Platform_Mapping pm ON u.UUID = pm.UUID + WHERE {column_to_lookup} = ? + """ - # Handle UUID case separately - if identifier_type.upper() == "UUID": - query = """ - SELECT - u.UUID, - u.Unified_Username, - u.datetime_linked, - u.user_is_banned, - u.user_is_bot, - pm.Platform_User_ID, - pm.Display_Name, - pm.Username, - pm.Platform_Type - FROM Users u - LEFT JOIN Platform_Mapping pm ON u.UUID = pm.UUID - WHERE u.UUID = ? - LIMIT 1 - """ - params = (identifier,) - else: - # Handle platform-specific lookups - if identifier_type.lower() not in platform_map: - globals.log(f"lookup_user error: invalid identifier_type '{identifier_type}'", "WARNING") - return None + params = [identifier] - # Get the platform type (Discord or Twitch) - platform_type = platform_map[identifier_type.lower()] + # Apply platform filter if applicable + if platform_filter: + query += " AND pm.Platform_Type = ?" + params.append(platform_filter) - # Use platform_user_id to lookup the UUID - query = """ - SELECT - u.UUID, - u.Unified_Username, - u.datetime_linked, - u.user_is_banned, - u.user_is_bot, - pm.Platform_User_ID, - pm.Display_Name, - pm.Username, - pm.Platform_Type - FROM Users u - INNER JOIN Platform_Mapping pm ON u.UUID = pm.UUID - WHERE pm.Platform_Type = ? AND pm.Platform_User_ID = ? - LIMIT 1 - """ - params = (platform_type, identifier) + query += " LIMIT 1" # Debug: Log the query and parameters if PRINT_QUERY_DEBUG: globals.log(f"lookup_user() executing query: {query} with params={params}", "DEBUG") # Run the query - rows = run_db_operation(db_conn, "read", query, params) - + rows = run_db_operation(db_conn, "read", query, tuple(params)) + # Debug: Log the result of the query if PRINT_QUERY_DEBUG: globals.log(f"lookup_user() query result: {rows}", "DEBUG") @@ -487,7 +520,7 @@ def lookup_user(db_conn, identifier: str, identifier_type: str, target_identifie # Convert the row to a dictionary row = rows[0] user_data = { - "UUID": row[0], # Make UUID consistently uppercase + "uuid": row[0], # Ensure UUID is uppercase "unified_username": row[1], "datetime_linked": row[2], "user_is_banned": row[3], @@ -503,7 +536,7 @@ def lookup_user(db_conn, identifier: str, identifier_type: str, target_identifie # If target_identifier is provided, return just that value if target_identifier: - target_identifier = target_identifier.upper() # Force uppercase for consistency + target_identifier = target_identifier.lower() if target_identifier in user_data: if PRINT_QUERY_DEBUG: globals.log(f"lookup_user() returning target_identifier='{target_identifier}' with value='{user_data[target_identifier]}'", "DEBUG") return user_data[target_identifier] @@ -514,6 +547,49 @@ def lookup_user(db_conn, identifier: str, identifier_type: str, target_identifie if PRINT_QUERY_DEBUG: globals.log(f"lookup_user() returning full user_data: {user_data}", "DEBUG") return user_data +def user_lastseen(db_conn, UUID: str, platform_name: str = None, platform_user_id: str | int = None, lookup: bool = False, update: bool = False): + """ + Handles user last seen updates and lookups. + + - `lookup=True`: Fetches the last-seen timestamp. + - `update=True`: Updates the last-seen timestamp. + - If platform_name and platform_user_id are provided, the query will be scoped to that account. + - Otherwise, it applies to all accounts unified under the UUI system. + """ + if not UUID: + globals.log("UUID is required for user_lastseen()", "ERROR") + return None + + if lookup: + query = """ + SELECT last_seen FROM Platform_Mapping WHERE UUID = ? + """ if not platform_name or not platform_user_id else """ + SELECT last_seen FROM Platform_Mapping WHERE UUID = ? AND Platform_Type = ? AND Platform_User_ID = ? + """ + + params = (UUID,) if not platform_name or not platform_user_id else (UUID, platform_name, str(platform_user_id)) + result = run_db_operation(db_conn, "read", query, params) + + if update: + update_sql = """ + UPDATE Platform_Mapping SET last_seen = CURRENT_TIMESTAMP WHERE UUID = ? + """ if not platform_name or not platform_user_id else """ + UPDATE Platform_Mapping SET last_seen = CURRENT_TIMESTAMP WHERE UUID = ? AND Platform_Type = ? AND Platform_User_ID = ? + """ + + params = (UUID,) if not platform_name or not platform_user_id else (UUID, platform_name, str(platform_user_id)) + run_db_operation(db_conn, "write", update_sql, params) + globals.log(f"Updated last_seen timestamp for UUID={UUID}", "DEBUG") + + if lookup: + if result and result[0]: + return result[0][0] # Return last seen datetime + return None # No data found + elif update and not lookup: + return True + + return False # No action taken + def ensure_chatlog_table(db_conn): """ @@ -590,7 +666,7 @@ def log_message(db_conn, identifier, identifier_type, message_content, platform, globals.log(f"User not found for {identifier_type}='{identifier}'", "WARNING") return - user_uuid = user_data["UUID"] + user_uuid = user_data["uuid"] message_uuid = str(uuid.uuid4()) # Generate a new UUID for the entry # Determine if a message ID is required for this platform @@ -835,7 +911,7 @@ def log_discord_activity(db_conn, guild_id, user_identifier, action, voice_chann globals.log(f"User not found for Discord ID: {user_identifier}", "WARNING") return - user_uuid = user_data["UUID"] + user_uuid = user_data["uuid"] # Prepare the voice_channel value (if it’s an object with a name, use that). channel_val = voice_channel.name if (voice_channel and hasattr(voice_channel, "name")) else voice_channel @@ -1174,7 +1250,7 @@ async def handle_community_event(db_conn, is_discord, ctx, args): globals.log(f"User not found: {ctx.author.name} ({user_id}) on {platform}", "ERROR") return "Could not log event: user data missing." - user_uuid = user_data["UUID"] + user_uuid = user_data["uuid"] # Insert new event. Adjust for SQLite or MariaDB. insert_sql = """ diff --git a/modules/permissions.py b/modules/permissions.py index c26db52..2343626 100644 --- a/modules/permissions.py +++ b/modules/permissions.py @@ -1,67 +1,97 @@ +# modules/permissions.py + import json import os +import globals import discord PERMISSIONS_FILE = "permissions.json" +TWITCH_CONFIG_FILE = "settings/twitch_channels_config.json" -# Load permission settings -def load_permissions(): - """Loads the permissions from JSON.""" - if not os.path.exists(PERMISSIONS_FILE): +########################## +# Load Configurations +########################## + +def load_json_file(file_path): + """Loads JSON data from a file, returns an empty dict if missing or invalid.""" + if not os.path.exists(file_path): + return {} + try: + with open(file_path, "r", encoding="utf-8") as file: + return json.load(file) + except json.JSONDecodeError as e: + globals.log(f"Error parsing JSON file {file_path}: {e}", "ERROR") return {} - with open(PERMISSIONS_FILE, "r", encoding="utf-8") as file: - return json.load(file) -def map_roles(platform: str, user_roles: list) -> list: +def load_permissions(): + """Dynamically loads permissions from `permissions.json`.""" + return load_json_file(PERMISSIONS_FILE) + +def load_twitch_config(): + """Dynamically loads Twitch-specific command allow/deny lists.""" + return load_json_file(TWITCH_CONFIG_FILE) + +########################## +# Role Mapping +########################## + +def map_roles(platform: str, user_roles: list, context_identifier: str = None) -> list: """ Maps platform-specific roles to a unified role system. + Supports per-guild (Discord) and per-channel (Twitch) overrides. :param platform: "discord" or "twitch" - :param user_roles: A list of raw roles/badges from the platform - :return: A list of mapped roles based on the JSON role mapping + :param user_roles: List of raw roles/badges from the platform + :param context_identifier: Guild ID (for Discord) or Channel Name (for Twitch) + :return: List of mapped roles """ permissions = load_permissions() role_mappings = permissions.get("role_mappings", {}).get(platform, {}) - mapped_roles = [] - for role in user_roles: - normalized_role = role.lower() - mapped_role = role_mappings.get(normalized_role, None) - if mapped_role: - mapped_roles.append(mapped_role) + # Allow per-context overrides + if context_identifier: + specific_mappings = permissions.get("role_mappings", {}).get(f"{platform}_{context_identifier}", {}) + role_mappings.update(specific_mappings) # Override defaults - return mapped_roles if mapped_roles else ["everyone"] + mapped_roles = [role_mappings.get(role.lower(), role.lower()) for role in user_roles] + + return list(set(mapped_roles)) if mapped_roles else ["everyone"] -def has_permission(command_name: str, user_id: str, user_roles: list, platform: str) -> bool: +########################## +# Permissions Checks +########################## + +def has_permission(command_name: str, user_id: str, user_roles: list, platform: str, context_identifier: str = None) -> bool: """ - Checks if a user has permission to run a command. + Checks if a user has permission to execute a command. - :param command_name: The name of the command being checked. - :param user_id: The ID of the user requesting the command. - :param user_roles: A list of roles/badges the user has (platform-specific). + :param command_name: The command to check + :param user_id: The user's ID + :param user_roles: The user's roles/badges :param platform: "discord" or "twitch" - :return: True if the user has permission, otherwise False. + :param context_identifier: Guild ID (for Discord) or Channel Name (for Twitch) + :return: True if the user has permission, False otherwise """ permissions = load_permissions() command_perms = permissions.get("commands", {}).get(command_name, {}) - # Extract settings + # Extract permission settings min_role = command_perms.get("min_role", "") allowed_roles = command_perms.get("allowed_roles", []) allowed_users = command_perms.get("allowed_users", []) - # If no min_role and no allowed_roles/users, it's open to everyone + # Auto-allow if no specific rules exist if not min_role and not allowed_roles and not allowed_users: return True - # Check if user is explicitly allowed + # Explicit user whitelist if user_id in allowed_users: - return True # Bypass role check + return True - # Convert platform-specific roles to mapped roles - mapped_roles = map_roles(platform, user_roles) + # Convert platform roles + mapped_roles = map_roles(platform, user_roles, context_identifier) - # If a min_role is set, check against it + # Check minimum required role if min_role: role_hierarchy = ["everyone", "follower", "subscriber", "vip", "moderator", "admin", "owner"] user_role_level = max([role_hierarchy.index(role) for role in mapped_roles if role in role_hierarchy], default=0) @@ -69,12 +99,113 @@ def has_permission(command_name: str, user_id: str, user_roles: list, platform: if user_role_level >= min_role_level: return True - # Check if the user has any explicitly allowed roles + # Check explicitly allowed roles if any(role in allowed_roles for role in mapped_roles): return True return False +########################## +# Twitch Command Filtering +########################## + +def is_command_allowed_twitch(command_name: str, channel_name: str) -> bool: + """ + Checks if a command is allowed in a specific Twitch channel. + + :param command_name: The command being checked + :param channel_name: The Twitch channel name + :return: True if allowed, False if blocked + """ + twitch_config = load_twitch_config() + channel_config = twitch_config.get(channel_name.lower(), {}) + + if not channel_config: + return False # Default to deny if no config exists + + mode = channel_config.get("commands_filter_mode", "exclude") + filtered_commands = channel_config.get("commands_filtered", []) + + if mode == "exclude" and command_name in filtered_commands: + return False + if mode == "include" and command_name not in filtered_commands: + return False + + return True + +# modules/permissions.py + +# Load permission settings +# def load_permissions(): +# """Loads the permissions from JSON.""" +# if not os.path.exists(PERMISSIONS_FILE): +# return {} +# with open(PERMISSIONS_FILE, "r", encoding="utf-8") as file: +# return json.load(file) + +# def map_roles(platform: str, user_roles: list) -> list: +# """ +# Maps platform-specific roles to a unified role system. + +# :param platform: "discord" or "twitch" +# :param user_roles: A list of raw roles/badges from the platform +# :return: A list of mapped roles based on the JSON role mapping +# """ +# permissions = load_permissions() +# role_mappings = permissions.get("role_mappings", {}).get(platform, {}) + +# mapped_roles = [] +# for role in user_roles: +# normalized_role = role.lower() +# mapped_role = role_mappings.get(normalized_role, None) +# if mapped_role: +# mapped_roles.append(mapped_role) + +# return mapped_roles if mapped_roles else ["everyone"] + +# def has_permission(command_name: str, user_id: str, user_roles: list, platform: str) -> bool: +# """ +# Checks if a user has permission to run a command. + +# :param command_name: The name of the command being checked. +# :param user_id: The ID of the user requesting the command. +# :param user_roles: A list of roles/badges the user has (platform-specific). +# :param platform: "discord" or "twitch" +# :return: True if the user has permission, otherwise False. +# """ +# permissions = load_permissions() +# command_perms = permissions.get("commands", {}).get(command_name, {}) + +# # Extract settings +# min_role = command_perms.get("min_role", "") +# allowed_roles = command_perms.get("allowed_roles", []) +# allowed_users = command_perms.get("allowed_users", []) + +# # If no min_role and no allowed_roles/users, it's open to everyone +# if not min_role and not allowed_roles and not allowed_users: +# return True + +# # Check if user is explicitly allowed +# if user_id in allowed_users: +# return True # Bypass role check + +# # Convert platform-specific roles to mapped roles +# mapped_roles = map_roles(platform, user_roles) + +# # If a min_role is set, check against it +# if min_role: +# role_hierarchy = ["everyone", "follower", "subscriber", "vip", "moderator", "admin", "owner"] +# user_role_level = max([role_hierarchy.index(role) for role in mapped_roles if role in role_hierarchy], default=0) +# min_role_level = role_hierarchy.index(min_role) if min_role in role_hierarchy else 0 +# if user_role_level >= min_role_level: +# return True + +# # Check if the user has any explicitly allowed roles +# if any(role in allowed_roles for role in mapped_roles): +# return True + +# return False + def has_custom_vc_permission(member: discord.Member, vc_owner_id: int = None) -> bool: """ @@ -100,4 +231,4 @@ def has_custom_vc_permission(member: discord.Member, vc_owner_id: int = None) -> return True # Otherwise, no permission - return False + return False \ No newline at end of file diff --git a/modules/utility.py b/modules/utility.py index d462f60..cbea11f 100644 --- a/modules/utility.py +++ b/modules/utility.py @@ -7,7 +7,7 @@ import functools import inspect import uuid from typing import Union -from modules.db import run_db_operation, lookup_user, log_message +from modules.db import run_db_operation, lookup_user, log_message, user_lastseen import modules.utility as utility import discord from functools import wraps @@ -525,8 +525,6 @@ def get_loaded_commands(bot, is_discord): Side Effects: Logs debug, warning, and error messages regarding the command processing. """ - from discord.ext import commands as discord_commands - from twitchio.ext import commands as twitch_commands commands_list = [] @@ -616,7 +614,6 @@ def build_discord_help_message(cmd_name, cmd_help_dict): return "\n".join(lines) - def build_twitch_help_message(cmd_name, cmd_help_dict): """ Build a concise Twitch help message for a command. @@ -710,11 +707,16 @@ def track_user_activity( if user_data: # Existing user found, update info if necessary - user_uuid = user_data["UUID"] + user_uuid = user_data["uuid"] need_update = False column_updates = [] params = [] + # Update user last-seen timestamp + user_lastseen(db_conn, UUID=user_uuid, platform_name=platform, platform_user_id=user_id, update=True) + last_seen = user_lastseen(db_conn, UUID=user_uuid, platform_name=platform, platform_user_id=user_id, lookup=True) + globals.log(f"Updated last-seen datetime to {last_seen}", "DEBUG") + if user_data["platform_username"] != username: need_update = True column_updates.append("Username = ?") @@ -760,7 +762,6 @@ def track_user_activity( globals.log(f"Failed to create user entry for {platform} user '{username}'.", "ERROR") - from modules.db import log_bot_event def log_bot_startup(db_conn): @@ -981,45 +982,27 @@ def list_channels(self): f"Currently connected to {num_connected_channels} Twitch channel(s): {connected_channels_str}" ) +from functools import wraps +import globals +from modules.permissions import is_command_allowed_twitch + def command_allowed_twitch(func): """ - A custom check that allows a command to run based on channel settings. - It looks up the current channel in CHANNEL_CONFIG and either allows or denies - the command based on the filter mode and list. + Checks if a command is allowed based on Twitch channel settings. """ @wraps(func) async def wrapper(ctx, *args, **kwargs): - # Load the full configuration. - full_config = globals.constants.twitch_channels_config() - - # Get the channel name and then the channel-specific configuration. channel_name = ctx.channel.name.lower() - channel_config = full_config.get(channel_name) - - # If there's no configuration for this channel, block the command. - if not channel_config: - globals.log(f"No configuration found for Twitch channel '{channel_name}'. Blocking command '{ctx.command.name}'.") + command_name = ctx.command.name + + # Block command if it's not allowed in the channel + if not is_command_allowed_twitch(command_name, channel_name): + globals.log(f"Command '{command_name}' is blocked in '{channel_name}'.", "WARNING") return - mode = channel_config.get("commands_filter_mode") - filtered = channel_config.get("commands_filtered", []) - command_name = ctx.command.name - - # Check based on filter mode. - if mode == "exclude": - if command_name in filtered: - globals.log(f"Command '{command_name}' is excluded on Twitch channel '{channel_name}'.") - return - elif mode == "include": - if command_name not in filtered: - globals.log(f"Command '{command_name}' is not allowed on Twitch channel '{channel_name}' (include mode).") - return - - # If all checks pass, run the command. return await func(ctx, *args, **kwargs) - - return wrapper + return wrapper ############################################### diff --git a/permissions.json b/permissions.json index f87193c..4f00aac 100644 --- a/permissions.json +++ b/permissions.json @@ -15,10 +15,13 @@ } }, "commands": { - "hi": { + "special_vip_command": { "min_role": "", - "allowed_roles": ["broadcaster", "owner"], - "allowed_users": ["203190147582394369"] + "allowed_roles": ["broadcaster", "vip"] + }, + "owner_only": { + "min_role": "owner", + "allowed_roles": [] } } }