diff --git a/cmd_discord/customvc.py b/cmd_discord/customvc.py new file mode 100644 index 0000000..5eb0d9a --- /dev/null +++ b/cmd_discord/customvc.py @@ -0,0 +1,515 @@ +# cmd_discord/customvc.py + +import discord +from discord.ext import commands +import datetime + +from modules.permissions import has_custom_vc_permission + +# IDs +LOBBY_VC_ID = 1345509388651069583 # The lobby voice channel +CUSTOM_VC_CATEGORY_ID = 1345509307503874149 # Where new custom VCs go + +# Track data in memory +CUSTOM_VC_INFO = {} # voice_chan_id -> {owner_id, locked, allowed_ids, denied_ids, ...} +USER_LAST_CREATED = {} # user_id -> datetime (for rate-limits) +GLOBAL_CREATIONS = [] # list of datetime for global rate-limits +CHANNEL_COUNTER = 0 + +# Rate-limits +USER_COOLDOWN_MINUTES = 5 +GLOBAL_CHANNELS_PER_MINUTE = 2 + +# Auto-delete scheduling +PENDING_DELETIONS = {} +DELETE_DELAY_SECONDS = 10 + +# Arbitrary max, so new channels get “VC x” or “Overflow x” +MAX_CUSTOM_CHANNELS = 9 + + +def setup(bot: commands.Bot): + """ + Called by your `load_commands()` in bot_discord.py + """ + + @bot.listen("on_ready") + async def after_bot_ready(): + """ + When the bot starts, we scan leftover custom voice channels in the category: + - If it's the lobby => skip + - If it's empty => delete + - If it has users => pick first occupant as owner + Then mention them in that channel's built-in text chat (if supported). + """ + await scan_existing_custom_vcs(bot) + + @bot.listen("on_voice_state_update") + async def handle_voice_update(member: discord.Member, before: discord.VoiceState, after: discord.VoiceState): + # The logic for creation, auto-delete, etc. + await on_voice_state_update(bot, member, before, after) + + @bot.group(name="customvc", invoke_without_command=True) + async def customvc_base(ctx: commands.Context): + """ + Base !customvc command -> show help if no subcommand used + """ + msg = ( + "**Custom VC Help**\n" + "Join the lobby to create a new voice channel automatically.\n" + "Subcommands:\n" + "- **name** - rename\n" + " - `!customvc name my_custom_vc`\n" + "- **claim** - claim ownership\n" + " - `!customvc claim`\n" + "- **lock** - lock the channel\n" + " - `!customvc lock`\n" + "- **allow** - allow user\n" + " - `!customvc allow some_user`\n" + "- **deny** - deny user\n" + " - `!customvc deny some_user`\n" + "- **unlock** - unlock channel\n" + " - `!customvc unlock`\n" + "- **users** - set user limit\n" + " - `!customvc users 2`\n" + "- **bitrate** - set channel bitrate\n" + " - `!customvc bitrate some_bitrate`\n" + "- **status** - set a custom status **(TODO)**\n" + "- **op** - co-owner\n" + " - `!customvc op some_user`\n" + "- **settings** - show config\n" + " - `!customvc settings`\n" + ) + await ctx.send(msg) + + # Subcommands: + @customvc_base.command(name="name") + async def name_subcommand(ctx: commands.Context, *, new_name: str): + await rename_channel(ctx, new_name) + + @customvc_base.command(name="claim") + async def claim_subcommand(ctx: commands.Context): + await claim_channel(ctx) + + @customvc_base.command(name="lock") + async def lock_subcommand(ctx: commands.Context): + await lock_channel(ctx) + + @customvc_base.command(name="allow") + async def allow_subcommand(ctx: commands.Context, *, user: str): + await allow_user(ctx, user) + + @customvc_base.command(name="deny") + async def deny_subcommand(ctx: commands.Context, *, user: str): + await deny_user(ctx, user) + + @customvc_base.command(name="unlock") + async def unlock_subcommand(ctx: commands.Context): + await unlock_channel(ctx) + + @customvc_base.command(name="users") + async def users_subcommand(ctx: commands.Context, limit: int): + await set_user_limit(ctx, limit) + + @customvc_base.command(name="bitrate") + async def bitrate_subcommand(ctx: commands.Context, kbps: int): + await set_bitrate(ctx, kbps) + + @customvc_base.command(name="status") + async def status_subcommand(ctx: commands.Context, *, status_str: str): + await set_status(ctx, status_str) + + @customvc_base.command(name="op") + async def op_subcommand(ctx: commands.Context, *, user: str): + await op_user(ctx, user) + + @customvc_base.command(name="settings") + async def settings_subcommand(ctx: commands.Context): + await show_settings(ctx) + +# +# Main voice update logic +# + +async def on_voice_state_update(bot: commands.Bot, member: discord.Member, before: discord.VoiceState, after: discord.VoiceState): + global CHANNEL_COUNTER + + if before.channel != after.channel: + # 1) If user just joined the lobby + if after.channel and after.channel.id == LOBBY_VC_ID: + if not await can_create_vc(member): + # Rate-limit => forcibly disconnect + await member.move_to(None) + # Also mention them in the ephemeral chat of LOBBY_VC_ID if we want (if possible) + lobby_chan = after.channel + if lobby_chan and hasattr(lobby_chan, "send"): + # We'll try to mention them + await lobby_chan.send(f"{member.mention}, you’ve exceeded custom VC creation limits. Try again later!") + return + + # 2) Create a new custom VC + CHANNEL_COUNTER += 1 + vc_name = f"VC {CHANNEL_COUNTER}" + if CHANNEL_COUNTER > MAX_CUSTOM_CHANNELS: + vc_name = f"Overflow {CHANNEL_COUNTER}" + + category = member.guild.get_channel(CUSTOM_VC_CATEGORY_ID) + if not category or not isinstance(category, discord.CategoryChannel): + print("Could not find a valid custom VC category.") + return + + new_vc = await category.create_voice_channel(name=vc_name) + await member.move_to(new_vc) + + # Record memory + CUSTOM_VC_INFO[new_vc.id] = { + "owner_id": member.id, + "locked": False, + "allowed_ids": set(), + "denied_ids": set(), + "user_limit": None, + "bitrate": None, + } + + now = datetime.datetime.utcnow() + USER_LAST_CREATED[member.id] = now + GLOBAL_CREATIONS.append(now) + # prune old + cutoff = now - datetime.timedelta(seconds=60) + GLOBAL_CREATIONS[:] = [t for t in GLOBAL_CREATIONS if t > cutoff] + + # Announce in the new VC's ephemeral chat + if hasattr(new_vc, "send"): + await new_vc.send( + f"{member.mention}, your custom voice channel is ready! " + "Type `!customvc` here for help with subcommands." + ) + + # 3) If user left a custom VC -> maybe it’s now empty + if before.channel and before.channel.id in CUSTOM_VC_INFO: + old_vc = before.channel + if len(old_vc.members) == 0: + schedule_deletion(old_vc) + + # If user joined a custom VC that was pending deletion, cancel it + if after.channel and after.channel.id in CUSTOM_VC_INFO: + if after.channel.id in PENDING_DELETIONS: + PENDING_DELETIONS[after.channel.id].cancel() + del PENDING_DELETIONS[after.channel.id] + + +def schedule_deletion(vc: discord.VoiceChannel): + """ + Schedules this custom VC for deletion in 10s if it stays empty. + """ + import asyncio + + async def delete_task(): + await asyncio.sleep(DELETE_DELAY_SECONDS) + if len(vc.members) == 0: + CUSTOM_VC_INFO.pop(vc.id, None) + try: + await vc.delete() + except: + pass + PENDING_DELETIONS.pop(vc.id, None) + + loop = vc.guild._state.loop + task = loop.create_task(delete_task()) + PENDING_DELETIONS[vc.id] = task + +async def can_create_vc(member: discord.Member) -> bool: + """ + Return False if user or global rate-limits are hit: + - user can only create 1 channel every 5 minutes + - globally only 2 channels per minute + """ + now = datetime.datetime.utcnow() + + last_time = USER_LAST_CREATED.get(member.id) + if last_time: + diff = (now - last_time).total_seconds() + if diff < (USER_COOLDOWN_MINUTES * 60): + return False + + # global limit + cutoff = now - datetime.timedelta(seconds=60) + recent = [t for t in GLOBAL_CREATIONS if t > cutoff] + if len(recent) >= GLOBAL_CHANNELS_PER_MINUTE: + return False + + return True + + +async def scan_existing_custom_vcs(bot: commands.Bot): + """ + On startup: check the custom VC category for leftover channels: + - If channel.id == LOBBY_VC_ID -> skip (don't delete the main lobby). + - If empty, delete immediately. + - If non-empty, first occupant is new owner. Mention them in ephemeral chat if possible. + """ + for g in bot.guilds: + cat = g.get_channel(CUSTOM_VC_CATEGORY_ID) + if not cat or not isinstance(cat, discord.CategoryChannel): + continue + + for ch in cat.voice_channels: + # skip the LOBBY + if ch.id == LOBBY_VC_ID: + continue + + if len(ch.members) == 0: + # safe to delete + try: + await ch.delete() + except: + pass + print(f"Deleted empty leftover channel: {ch.name}") + else: + # pick first occupant + first = ch.members[0] + global CHANNEL_COUNTER + CHANNEL_COUNTER += 1 + CUSTOM_VC_INFO[ch.id] = { + "owner_id": first.id, + "locked": False, + "allowed_ids": set(), + "denied_ids": set(), + "user_limit": None, + "bitrate": None, + } + print(f"Assigned {first.display_name} as owner of leftover VC: {ch.name}") + if hasattr(ch, "send"): + try: + await ch.send( + f"{first.mention}, you're now the owner of leftover channel **{ch.name}** after a restart. " + "Type `!customvc` here to see available commands." + ) + except: + pass + + +# +# The subcommand logic below, referencing CUSTOM_VC_INFO +# + +def get_custom_vc_for(ctx: commands.Context) -> discord.VoiceChannel: + """ + Return the custom voice channel the command user is currently in (or None). + We'll look at ctx.author.voice. + """ + if not isinstance(ctx.author, discord.Member): + return None + vs = ctx.author.voice + if not vs or not vs.channel: + return None + vc = vs.channel + if vc.id not in CUSTOM_VC_INFO: + return None + return vc + +def is_initiator_or_mod(ctx: commands.Context, vc_id: int) -> bool: + info = CUSTOM_VC_INFO.get(vc_id) + if not info: + return False + return has_custom_vc_permission(ctx.author, info["owner_id"]) + +async def rename_channel(ctx: commands.Context, new_name: str): + vc = get_custom_vc_for(ctx) + if not vc: + return await ctx.send("You are not in a custom voice channel.") + if not is_initiator_or_mod(ctx, vc.id): + return await ctx.send("No permission to rename.") + await vc.edit(name=new_name) + await ctx.send(f"Renamed channel to **{new_name}**.") + +async def claim_channel(ctx: commands.Context): + vc = get_custom_vc_for(ctx) + if not vc: + return await ctx.send("Not in a custom VC.") + info = CUSTOM_VC_INFO.get(vc.id) + if not info: + return await ctx.send("No memory for this channel?") + + old = info["owner_id"] + # if old owner is still inside + if any(m.id == old for m in vc.members): + return await ctx.send("The original owner is still here; you cannot claim.") + info["owner_id"] = ctx.author.id + await ctx.send("You are now the owner of this channel!") + +async def lock_channel(ctx: commands.Context): + vc = get_custom_vc_for(ctx) + if not vc: + return await ctx.send("You're not in a custom VC.") + if not is_initiator_or_mod(ctx, vc.id): + return await ctx.send("No permission to lock.") + + info = CUSTOM_VC_INFO[vc.id] + info["locked"] = True + overwrites = vc.overwrites or {} + overwrites[ctx.guild.default_role] = discord.PermissionOverwrite(connect=False) + await vc.edit(overwrites=overwrites) + await ctx.send("Channel locked.") + +async def allow_user(ctx: commands.Context, user: str): + vc = get_custom_vc_for(ctx) + if not vc: + return await ctx.send("Not in a custom VC.") + if not is_initiator_or_mod(ctx, vc.id): + return await ctx.send("No permission to allow a user.") + + mem = await resolve_member(ctx, user) + if not mem: + return await ctx.send(f"Could not find user: {user}.") + info = CUSTOM_VC_INFO[vc.id] + info["allowed_ids"].add(mem.id) + overwrites = vc.overwrites or {} + overwrites[mem] = discord.PermissionOverwrite(connect=True) + await vc.edit(overwrites=overwrites) + await ctx.send(f"Allowed **{mem.display_name}** to join.") + +async def deny_user(ctx: commands.Context, user: str): + vc = get_custom_vc_for(ctx) + if not vc: + return await ctx.send("Not in a custom VC.") + if not is_initiator_or_mod(ctx, vc.id): + return await ctx.send("No permission to deny user.") + + mem = await resolve_member(ctx, user) + if not mem: + return await ctx.send(f"Could not find user: {user}.") + info = CUSTOM_VC_INFO[vc.id] + # check if they're mod or owner + if has_custom_vc_permission(mem, info["owner_id"]): + return await ctx.send("Cannot deny a moderator or the owner.") + info["denied_ids"].add(mem.id) + + overwrites = vc.overwrites or {} + overwrites[mem] = discord.PermissionOverwrite(connect=False) + await vc.edit(overwrites=overwrites) + await ctx.send(f"Denied {mem.display_name} from connecting.") + +async def unlock_channel(ctx: commands.Context): + vc = get_custom_vc_for(ctx) + if not vc: + return await ctx.send("Not in a custom VC.") + if not is_initiator_or_mod(ctx, vc.id): + return await ctx.send("No permission to unlock.") + + info = CUSTOM_VC_INFO[vc.id] + info["locked"] = False + overwrites = vc.overwrites or {} + overwrites[ctx.guild.default_role] = discord.PermissionOverwrite(connect=True) + + for d_id in info["denied_ids"]: + m = ctx.guild.get_member(d_id) + if m: + overwrites[m] = discord.PermissionOverwrite(connect=False) + + await vc.edit(overwrites=overwrites) + await ctx.send("Unlocked the channel. Denied users still cannot join.") + +async def set_user_limit(ctx: commands.Context, limit: int): + vc = get_custom_vc_for(ctx) + if not vc: + return await ctx.send("Not in a custom VC.") + if limit < 2: + return await ctx.send("Minimum limit is 2.") + if limit > 99: + return await ctx.send("Maximum limit is 99.") + if not is_initiator_or_mod(ctx, vc.id): + return await ctx.send("No permission to set user limit.") + info = CUSTOM_VC_INFO[vc.id] + info["user_limit"] = limit + await vc.edit(user_limit=limit) + await ctx.send(f"User limit set to {limit}.") + +async def set_bitrate(ctx: commands.Context, kbps: int): + vc = get_custom_vc_for(ctx) + if not vc: + return await ctx.send("Not in a custom VC.") + if not is_initiator_or_mod(ctx, vc.id): + return await ctx.send("No permission to set bitrate.") + + info = CUSTOM_VC_INFO[vc.id] + info["bitrate"] = kbps + if kbps < 8 or kbps > 128: + return await ctx.send(f"Invalid bitrate of {kbps} kbps! Must be a number between `8` and `128`! Default is 64 kbps") + await vc.edit(bitrate=kbps * 1000) + await ctx.send(f"Bitrate set to {kbps} kbps.") + +async def set_status(ctx: commands.Context, status_str: str): + vc = get_custom_vc_for(ctx) + if not vc: + return await ctx.send("Not in a custom VC.") + if not is_initiator_or_mod(ctx, vc.id): + return await ctx.send("No permission to set channel status.") + + info = CUSTOM_VC_INFO[vc.id] + info["status"] = status_str + await ctx.send(f"Channel status set to: **{status_str}** (placeholder).") + +async def op_user(ctx: commands.Context, user: str): + vc = get_custom_vc_for(ctx) + if not vc: + return await ctx.send("Not in a custom VC.") + if not is_initiator_or_mod(ctx, vc.id): + return await ctx.send("No permission to op someone.") + mem = await resolve_member(ctx, user) + if not mem: + return await ctx.send(f"Could not find user: {user}.") + info = CUSTOM_VC_INFO[vc.id] + if "ops" not in info: + info["ops"] = set() + info["ops"].add(mem.id) + await ctx.send(f"Granted co-ownership to {mem.display_name}.") + +async def show_settings(ctx: commands.Context): + vc = get_custom_vc_for(ctx) + if not vc: + return await ctx.send("Not in a custom VC.") + info = CUSTOM_VC_INFO.get(vc.id) + if not info: + return await ctx.send("No data found for this channel?") + + locked_str = "Yes" if info["locked"] else "No" + user_lim = info["user_limit"] if info["user_limit"] else "Default" + bitrate_str = info["bitrate"] if info["bitrate"] else "Default (64 kbps)" + status_str = info.get("status", "None") + owner_id = info["owner_id"] + owner = ctx.guild.get_member(owner_id) + owner_str = owner.display_name if owner else f"" + + allowed_str = ", ".join(map(str, info["allowed_ids"])) if info["allowed_ids"] else "None specified" + denied_str = ", ".join(map(str, info["denied_ids"])) if info["denied_ids"] else "None specified" + + msg = ( + f"**Channel Settings**\n" + f"Owner: {owner_str}\n" + f"Locked: {locked_str}\n" + f"User Limit: {user_lim}\n" + f"Bitrate: {bitrate_str}\n" + f"Status: {status_str}\n" + f"Allowed: {allowed_str}\n" + f"Denied: {denied_str}\n" + ) + await ctx.send(msg) + +async def resolve_member(ctx: commands.Context, user_str: str) -> discord.Member: + """ + Attempt to parse user mention/ID/partial name. + """ + user_str = user_str.strip() + if user_str.startswith("<@") and user_str.endswith(">"): + uid = user_str.strip("<@!>") + if uid.isdigit(): + return ctx.guild.get_member(int(uid)) + elif user_str.isdigit(): + return ctx.guild.get_member(int(user_str)) + + lower = user_str.lower() + for m in ctx.guild.members: + if m.name.lower() == lower or m.display_name.lower() == lower: + return m + return None diff --git a/modules/permissions.py b/modules/permissions.py index b350596..c26db52 100644 --- a/modules/permissions.py +++ b/modules/permissions.py @@ -1,5 +1,6 @@ import json import os +import discord PERMISSIONS_FILE = "permissions.json" @@ -73,3 +74,30 @@ def has_permission(command_name: str, user_id: str, user_roles: list, platform: return True return False + + +def has_custom_vc_permission(member: discord.Member, vc_owner_id: int = None) -> bool: + """ + Checks if the given member is either the channel's owner (vc_owner_id) + or a recognized moderator (one of the roles in MODERATOR_ROLE_IDS). + Returns True if they can manage the channel, False otherwise. + """ + + MODERATOR_ROLE_IDS = { + 896715419681947650, # Pack Owner + 1345410182674513920, # Pack Host + 958415559370866709, # Discord-specific moderator + 896715186357043200, # Pack-general moderator + } + + # 1) Check if user is the “owner” of the channel + if vc_owner_id is not None and member.id == vc_owner_id: + return True + + # 2) Check if the user has any of the allowed moderator roles + user_role_ids = {r.id for r in member.roles} + if MODERATOR_ROLE_IDS.intersection(user_role_ids): + return True + + # Otherwise, no permission + return False