OokamiPupV2/cmd_discord/customvc.py

516 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

# cmd_discord/customvc.py
import discord
from discord.ext import commands
import datetime
from modules.permissions import has_custom_vc_permission
# IDs of the Discord channels this module works with
LOBBY_VC_ID = 1345509388651069583  # The lobby voice channel; joining it triggers creation of a custom VC
CUSTOM_VC_CATEGORY_ID = 1345509307503874149  # Category in which new custom VCs are created
# Track data in memory (module-level state; rebuilt by scan_existing_custom_vcs on startup)
# voice_chan_id -> {owner_id, locked, allowed_ids, denied_ids, user_limit, bitrate};
# the "status" and "ops" keys are added later by the corresponding subcommands.
CUSTOM_VC_INFO = {}
USER_LAST_CREATED = {}  # user_id -> datetime of that user's last channel creation (per-user rate limit)
GLOBAL_CREATIONS = []  # datetimes of recent creations (global rate limit); pruned to the last 60 s on each creation
# Running number used to name channels; bumped on every creation and for every
# occupied leftover channel adopted at startup. It is never decremented.
CHANNEL_COUNTER = 0
# Rate-limits
USER_COOLDOWN_MINUTES = 5  # minimum minutes between two creations by the same user
GLOBAL_CHANNELS_PER_MINUTE = 2  # maximum creations across all users within 60 seconds
# Auto-delete scheduling
PENDING_DELETIONS = {}  # voice_chan_id -> task that will delete the channel if it stays empty
DELETE_DELAY_SECONDS = 10  # how long an emptied channel is kept before deletion
# Naming threshold only (does not cap creation): counter values up to this are
# named "VC x", larger ones "Overflow x".
MAX_CUSTOM_CHANNELS = 9
def setup(bot: commands.Bot):
    """
    Called by your `load_commands()` in bot_discord.py

    Registers on the given bot:
    - an "on_ready" listener that scans for leftover custom voice channels,
    - an "on_voice_state_update" listener that drives creation/auto-deletion,
    - the `customvc` command group and its subcommands.

    Every handler is a thin wrapper that forwards to a module-level function.
    """
    @bot.listen("on_ready")
    async def after_bot_ready():
        """
        When the bot starts, we scan leftover custom voice channels in the category:
        - If it's the lobby => skip
        - If it's empty => delete
        - If it has users => pick first occupant as owner
        Then mention them in that channel's built-in text chat (if supported).
        """
        await scan_existing_custom_vcs(bot)

    @bot.listen("on_voice_state_update")
    async def handle_voice_update(member: discord.Member, before: discord.VoiceState, after: discord.VoiceState):
        # The logic for creation, auto-delete, etc.
        await on_voice_state_update(bot, member, before, after)

    # invoke_without_command=True: this callback only runs when no subcommand matched.
    @bot.group(name="customvc", invoke_without_command=True)
    async def customvc_base(ctx: commands.Context):
        """
        Base !customvc command -> show help if no subcommand used
        """
        msg = (
            "**Custom VC Help**\n"
            "Join the lobby to create a new voice channel automatically.\n"
            "Subcommands:\n"
            "- **name** <new_name> - rename\n"
            " - `!customvc name my_custom_vc`\n"
            "- **claim** - claim ownership\n"
            " - `!customvc claim`\n"
            "- **lock** - lock the channel\n"
            " - `!customvc lock`\n"
            "- **allow** <user> - allow user\n"
            " - `!customvc allow some_user`\n"
            "- **deny** <user> - deny user\n"
            " - `!customvc deny some_user`\n"
            "- **unlock** - unlock channel\n"
            " - `!customvc unlock`\n"
            "- **users** <count> - set user limit\n"
            " - `!customvc users 2`\n"
            "- **bitrate** <kbps> - set channel bitrate\n"
            " - `!customvc bitrate some_bitrate`\n"
            "- **status** <status_str> - set a custom status **(TODO)**\n"
            "- **op** <user> - co-owner\n"
            " - `!customvc op some_user`\n"
            "- **settings** - show config\n"
            " - `!customvc settings`\n"
        )
        await ctx.send(msg)

    # Subcommands:
    # NOTE: these callbacks are documented with `#` comments rather than
    # docstrings on purpose -- discord.py uses a command callback's docstring
    # as its help text, so adding docstrings would change bot output.

    # `*, new_name` is keyword-only, so the rest of the message (spaces included) is the name.
    @customvc_base.command(name="name")
    async def name_subcommand(ctx: commands.Context, *, new_name: str):
        await rename_channel(ctx, new_name)

    @customvc_base.command(name="claim")
    async def claim_subcommand(ctx: commands.Context):
        await claim_channel(ctx)

    @customvc_base.command(name="lock")
    async def lock_subcommand(ctx: commands.Context):
        await lock_channel(ctx)

    # The user is taken as raw text and resolved later by resolve_member.
    @customvc_base.command(name="allow")
    async def allow_subcommand(ctx: commands.Context, *, user: str):
        await allow_user(ctx, user)

    @customvc_base.command(name="deny")
    async def deny_subcommand(ctx: commands.Context, *, user: str):
        await deny_user(ctx, user)

    @customvc_base.command(name="unlock")
    async def unlock_subcommand(ctx: commands.Context):
        await unlock_channel(ctx)

    # `limit` and `kbps` are annotated `int`; range checks happen in the delegates.
    @customvc_base.command(name="users")
    async def users_subcommand(ctx: commands.Context, limit: int):
        await set_user_limit(ctx, limit)

    @customvc_base.command(name="bitrate")
    async def bitrate_subcommand(ctx: commands.Context, kbps: int):
        await set_bitrate(ctx, kbps)

    @customvc_base.command(name="status")
    async def status_subcommand(ctx: commands.Context, *, status_str: str):
        await set_status(ctx, status_str)

    @customvc_base.command(name="op")
    async def op_subcommand(ctx: commands.Context, *, user: str):
        await op_user(ctx, user)

    @customvc_base.command(name="settings")
    async def settings_subcommand(ctx: commands.Context):
        await show_settings(ctx)
#
# Main voice update logic
#
async def on_voice_state_update(bot: commands.Bot, member: discord.Member, before: discord.VoiceState, after: discord.VoiceState):
    """
    React to a member changing voice channels.

    1) Joining the lobby creates a custom VC for the member (subject to the
       rate limits checked by can_create_vc) and moves them into it.
    2) Leaving a tracked custom VC that is now empty schedules its deletion.
    3) Joining a tracked custom VC cancels a pending deletion for it.

    Steps 2 and 3 always run, even when the lobby join was refused, so a
    channel the member just left is never forgotten.

    :param bot: the running bot (unused here; kept for the caller's signature).
    :param member: the member whose voice state changed.
    :param before: voice state before the change.
    :param after: voice state after the change.
    """
    if before.channel == after.channel:
        # Mute/deafen/stream toggles etc. -- the member did not change channel.
        return

    if after.channel and after.channel.id == LOBBY_VC_ID:
        await _handle_lobby_join(member, after.channel)

    # If the member left a custom VC, it may now be empty.
    left = before.channel
    if left and left.id in CUSTOM_VC_INFO and not left.members:
        schedule_deletion(left)

    # If the member joined a custom VC that was pending deletion, cancel it.
    joined = after.channel
    if joined and joined.id in CUSTOM_VC_INFO:
        pending = PENDING_DELETIONS.pop(joined.id, None)
        if pending is not None:
            pending.cancel()


async def _handle_lobby_join(member: discord.Member, lobby_chan) -> None:
    """Create a custom VC for a member who joined the lobby, or refuse if rate-limited."""
    global CHANNEL_COUNTER

    if not await can_create_vc(member):
        # Rate-limit => forcibly disconnect, then tell them why.
        try:
            await member.move_to(None)
        except discord.HTTPException as exc:
            print(f"Could not disconnect {member.display_name} from the lobby: {exc}")
        await _send_best_effort(
            lobby_chan,
            f"{member.mention}, you've exceeded custom VC creation limits. Try again later!",
        )
        return

    category = member.guild.get_channel(CUSTOM_VC_CATEGORY_ID)
    if not isinstance(category, discord.CategoryChannel):
        print("Could not find a valid custom VC category.")
        return

    # Take the number only once we know we can create a channel. There is no
    # await between the check above and this increment, so numbers stay unique.
    CHANNEL_COUNTER += 1
    prefix = "Overflow" if CHANNEL_COUNTER > MAX_CUSTOM_CHANNELS else "VC"
    vc_name = f"{prefix} {CHANNEL_COUNTER}"

    try:
        new_vc = await category.create_voice_channel(name=vc_name)
    except discord.HTTPException as exc:
        print(f"Could not create custom VC {vc_name}: {exc}")
        return

    # Track the channel before moving the member so the leave/join handling
    # already recognises it while the move is in flight.
    CUSTOM_VC_INFO[new_vc.id] = {
        "owner_id": member.id,
        "locked": False,
        "allowed_ids": set(),
        "denied_ids": set(),
        "user_limit": None,
        "bitrate": None,
    }

    try:
        await member.move_to(new_vc)
    except discord.HTTPException as exc:
        # The member could not be moved (e.g. they already left voice):
        # remove the channel again instead of leaving an empty orphan behind.
        print(f"Could not move {member.display_name} into {vc_name}: {exc}")
        CUSTOM_VC_INFO.pop(new_vc.id, None)
        try:
            await new_vc.delete()
        except discord.HTTPException as del_exc:
            print(f"Could not delete unused custom VC {vc_name}: {del_exc}")
        return

    now = datetime.datetime.utcnow()
    USER_LAST_CREATED[member.id] = now
    GLOBAL_CREATIONS.append(now)
    # Prune entries older than the 60 s global rate-limit window.
    cutoff = now - datetime.timedelta(seconds=60)
    GLOBAL_CREATIONS[:] = [t for t in GLOBAL_CREATIONS if t > cutoff]

    # Announce in the new VC's built-in text chat.
    await _send_best_effort(
        new_vc,
        f"{member.mention}, your custom voice channel is ready! "
        "Type `!customvc` here for help with subcommands.",
    )


async def _send_best_effort(channel, text: str) -> None:
    """Send text to a channel if it supports messages; log instead of raising on failure."""
    if not channel or not hasattr(channel, "send"):
        return
    try:
        await channel.send(text)
    except discord.HTTPException as exc:
        print(f"Could not send message in {channel.name}: {exc}")
def schedule_deletion(vc: discord.VoiceChannel):
    """
    Schedule this custom VC for deletion after DELETE_DELAY_SECONDS if it
    is still empty by then.

    The task is stored in PENDING_DELETIONS under the channel id so that it
    can be cancelled when somebody joins again. Scheduling a channel that
    already has a pending task replaces (and cancels) the older task.

    :param vc: the voice channel to delete once it has stayed empty.
    """
    import asyncio

    # Only one pending task per channel: drop any earlier one first.
    previous = PENDING_DELETIONS.pop(vc.id, None)
    if previous is not None:
        previous.cancel()

    async def delete_task():
        try:
            await asyncio.sleep(DELETE_DELAY_SECONDS)
            if vc.members:
                # Somebody came back in the meantime; keep the channel.
                return
            CUSTOM_VC_INFO.pop(vc.id, None)
            try:
                await vc.delete()
            except discord.HTTPException as exc:
                # Covers "already deleted" and missing permissions; cancellation
                # is intentionally NOT caught so the task can be cancelled.
                print(f"Could not delete empty custom VC {vc.name}: {exc}")
        finally:
            # Remove our own entry only -- never one belonging to a newer task.
            if PENDING_DELETIONS.get(vc.id) is task:
                PENDING_DELETIONS.pop(vc.id, None)

    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        # Called from outside a coroutine: fall back to the loop discord.py holds.
        loop = vc.guild._state.loop
    task = loop.create_task(delete_task())
    PENDING_DELETIONS[vc.id] = task
async def can_create_vc(member: discord.Member) -> bool:
    """
    Tell whether this member may create a custom VC right now.

    Two limits apply:
    - per user: one creation every USER_COOLDOWN_MINUTES minutes
    - globally: at most GLOBAL_CHANNELS_PER_MINUTE creations in the last 60 s

    Returns True when neither limit is hit, False otherwise.
    """
    now = datetime.datetime.utcnow()

    # Per-user cooldown
    previous = USER_LAST_CREATED.get(member.id)
    if previous:
        elapsed_seconds = (now - previous).total_seconds()
        if elapsed_seconds < (USER_COOLDOWN_MINUTES * 60):
            return False

    # Global limit over the last minute
    window_start = now - datetime.timedelta(seconds=60)
    recent_count = sum(1 for created_at in GLOBAL_CREATIONS if created_at > window_start)
    return recent_count < GLOBAL_CHANNELS_PER_MINUTE
async def scan_existing_custom_vcs(bot: commands.Bot):
    """
    On startup: check the custom VC category for leftover channels:
    - If channel.id == LOBBY_VC_ID -> skip (don't delete the main lobby).
    - If the channel is already tracked in CUSTOM_VC_INFO -> skip. The
      "on_ready" event can fire more than once per process, and a repeated
      scan must not reset ownership/settings or inflate CHANNEL_COUNTER.
    - If empty, delete immediately.
    - If non-empty, first occupant is new owner. Mention them in the
      channel's text chat if possible.

    :param bot: the bot whose guilds are scanned.
    """
    global CHANNEL_COUNTER

    for guild in bot.guilds:
        category = guild.get_channel(CUSTOM_VC_CATEGORY_ID)
        if not isinstance(category, discord.CategoryChannel):
            continue

        for ch in category.voice_channels:
            # Never touch the lobby, and leave already-managed channels alone.
            if ch.id == LOBBY_VC_ID or ch.id in CUSTOM_VC_INFO:
                continue

            if not ch.members:
                # Safe to delete; only report success when it actually worked.
                try:
                    await ch.delete()
                except discord.HTTPException as exc:
                    print(f"Failed to delete empty leftover channel {ch.name}: {exc}")
                else:
                    print(f"Deleted empty leftover channel: {ch.name}")
                continue

            # Occupied: adopt it with the first occupant as owner.
            first = ch.members[0]
            CHANNEL_COUNTER += 1
            CUSTOM_VC_INFO[ch.id] = {
                "owner_id": first.id,
                "locked": False,
                "allowed_ids": set(),
                "denied_ids": set(),
                "user_limit": None,
                "bitrate": None,
            }
            print(f"Assigned {first.display_name} as owner of leftover VC: {ch.name}")

            if hasattr(ch, "send"):
                try:
                    await ch.send(
                        f"{first.mention}, you're now the owner of leftover channel **{ch.name}** after a restart. "
                        "Type `!customvc` here to see available commands."
                    )
                except discord.HTTPException as exc:
                    # Notification is best-effort; the channel is adopted either way.
                    print(f"Could not notify new owner in {ch.name}: {exc}")
#
# The subcommand logic below, referencing CUSTOM_VC_INFO
#
def get_custom_vc_for(ctx: commands.Context) -> discord.VoiceChannel:
    """
    Look up the tracked custom voice channel the command author is sitting in.

    Returns None when the author is not a guild member, is not connected to
    voice, or is in a channel that is not recorded in CUSTOM_VC_INFO.
    """
    author = ctx.author
    if isinstance(author, discord.Member):
        voice_state = author.voice
        if voice_state and voice_state.channel:
            channel = voice_state.channel
            if channel.id in CUSTOM_VC_INFO:
                return channel
    return None
def is_initiator_or_mod(ctx: commands.Context, vc_id: int) -> bool:
    """
    Tell whether the command author may manage the given custom VC.

    Access is granted to co-owners recorded under the channel's "ops" entry
    (written by the `op` subcommand) and to anyone accepted by
    has_custom_vc_permission for the channel's owner id.

    :param ctx: the invoking command context; ctx.author is checked.
    :param vc_id: id of the custom voice channel.
    :return: False when the channel is not tracked or the author lacks rights.
    """
    info = CUSTOM_VC_INFO.get(vc_id)
    if not info:
        return False
    # Co-owners granted via `!customvc op`; the key only exists once someone was opped.
    if ctx.author.id in info.get("ops", ()):
        return True
    return has_custom_vc_permission(ctx.author, info["owner_id"])
async def rename_channel(ctx: commands.Context, new_name: str):
    """
    Rename the author's current custom VC to new_name.

    Replies with an error message instead when the author is not in a custom
    VC or is not allowed to manage it.
    """
    channel = get_custom_vc_for(ctx)
    if not channel:
        await_result = await ctx.send("You are not in a custom voice channel.")
        return await_result
    if is_initiator_or_mod(ctx, channel.id):
        await channel.edit(name=new_name)
        await ctx.send(f"Renamed channel to **{new_name}**.")
        return None
    return await ctx.send("No permission to rename.")
async def claim_channel(ctx: commands.Context):
    """
    Hand ownership of the author's current custom VC to the author.

    Only possible while the recorded owner is not present in the channel.
    """
    channel = get_custom_vc_for(ctx)
    if not channel:
        return await ctx.send("Not in a custom VC.")

    record = CUSTOM_VC_INFO.get(channel.id)
    if not record:
        return await ctx.send("No memory for this channel?")

    # The claim is refused as long as the current owner is still connected.
    current_owner_id = record["owner_id"]
    present_ids = [occupant.id for occupant in channel.members]
    if current_owner_id in present_ids:
        return await ctx.send("The original owner is still here; you cannot claim.")

    record["owner_id"] = ctx.author.id
    await ctx.send("You are now the owner of this channel!")
async def lock_channel(ctx: commands.Context):
    """
    Lock the author's current custom VC by denying `connect` to @everyone.

    Only the `connect` flag of the @everyone overwrite is changed; any other
    flags already set on that overwrite are preserved. The channel is marked
    as locked only after Discord accepted the edit.
    """
    vc = get_custom_vc_for(ctx)
    if not vc:
        return await ctx.send("You're not in a custom VC.")
    if not is_initiator_or_mod(ctx, vc.id):
        return await ctx.send("No permission to lock.")

    everyone = ctx.guild.default_role
    overwrites = vc.overwrites
    # Update the existing overwrite in place instead of replacing it wholesale.
    everyone_overwrite = overwrites.get(everyone, discord.PermissionOverwrite())
    everyone_overwrite.update(connect=False)
    overwrites[everyone] = everyone_overwrite
    await vc.edit(overwrites=overwrites)

    CUSTOM_VC_INFO[vc.id]["locked"] = True
    await ctx.send("Channel locked.")
async def allow_user(ctx: commands.Context, user: str):
    """
    Explicitly allow a member to connect to the author's current custom VC.

    Allowing a member also lifts an earlier deny: they are removed from the
    channel's denied list, so a later unlock does not re-deny them.

    :param user: mention, id or name of the member (see resolve_member).
    """
    vc = get_custom_vc_for(ctx)
    if not vc:
        return await ctx.send("Not in a custom VC.")
    if not is_initiator_or_mod(ctx, vc.id):
        return await ctx.send("No permission to allow a user.")
    mem = await resolve_member(ctx, user)
    if not mem:
        return await ctx.send(f"Could not find user: {user}.")

    overwrites = vc.overwrites
    # Keep whatever other flags the member's overwrite already carries.
    member_overwrite = overwrites.get(mem, discord.PermissionOverwrite())
    member_overwrite.update(connect=True)
    overwrites[mem] = member_overwrite
    await vc.edit(overwrites=overwrites)

    # Record the change only after Discord accepted it.
    info = CUSTOM_VC_INFO[vc.id]
    info["allowed_ids"].add(mem.id)
    info["denied_ids"].discard(mem.id)
    await ctx.send(f"Allowed **{mem.display_name}** to join.")
async def deny_user(ctx: commands.Context, user: str):
    """
    Deny a member from connecting to the author's current custom VC.

    Moderators and the channel owner cannot be denied. Denying a member also
    removes them from the channel's allowed list so the two lists never
    contradict each other.

    :param user: mention, id or name of the member (see resolve_member).
    """
    vc = get_custom_vc_for(ctx)
    if not vc:
        return await ctx.send("Not in a custom VC.")
    if not is_initiator_or_mod(ctx, vc.id):
        return await ctx.send("No permission to deny user.")
    mem = await resolve_member(ctx, user)
    if not mem:
        return await ctx.send(f"Could not find user: {user}.")

    info = CUSTOM_VC_INFO[vc.id]
    # check if they're mod or owner
    if has_custom_vc_permission(mem, info["owner_id"]):
        return await ctx.send("Cannot deny a moderator or the owner.")

    overwrites = vc.overwrites
    # Keep whatever other flags the member's overwrite already carries.
    member_overwrite = overwrites.get(mem, discord.PermissionOverwrite())
    member_overwrite.update(connect=False)
    overwrites[mem] = member_overwrite
    await vc.edit(overwrites=overwrites)

    # Record the change only after Discord accepted it.
    info["denied_ids"].add(mem.id)
    info["allowed_ids"].discard(mem.id)
    await ctx.send(f"Denied {mem.display_name} from connecting.")
async def unlock_channel(ctx: commands.Context):
    """
    Unlock the author's current custom VC by allowing `connect` for @everyone.

    Members on the channel's denied list stay denied. Only the `connect`
    flag of each affected overwrite is changed; other flags are preserved.
    The channel is marked as unlocked only after Discord accepted the edit.
    """
    vc = get_custom_vc_for(ctx)
    if not vc:
        return await ctx.send("Not in a custom VC.")
    if not is_initiator_or_mod(ctx, vc.id):
        return await ctx.send("No permission to unlock.")

    info = CUSTOM_VC_INFO[vc.id]
    overwrites = vc.overwrites

    everyone = ctx.guild.default_role
    everyone_overwrite = overwrites.get(everyone, discord.PermissionOverwrite())
    everyone_overwrite.update(connect=True)
    overwrites[everyone] = everyone_overwrite

    # Re-assert the per-member denies so opening the channel does not let them in.
    for denied_id in info["denied_ids"]:
        denied_member = ctx.guild.get_member(denied_id)
        if denied_member:
            denied_overwrite = overwrites.get(denied_member, discord.PermissionOverwrite())
            denied_overwrite.update(connect=False)
            overwrites[denied_member] = denied_overwrite

    await vc.edit(overwrites=overwrites)
    info["locked"] = False
    await ctx.send("Unlocked the channel. Denied users still cannot join.")
async def set_user_limit(ctx: commands.Context, limit: int):
    """
    Set the user limit of the author's current custom VC.

    The limit must lie between 2 and 99 inclusive. The range is checked
    before the permission check, so out-of-range values are reported to
    anyone sitting in a custom VC.
    """
    channel = get_custom_vc_for(ctx)
    if not channel:
        return await ctx.send("Not in a custom VC.")

    # Range validation
    if limit < 2:
        return await ctx.send("Minimum limit is 2.")
    elif limit > 99:
        return await ctx.send("Maximum limit is 99.")

    allowed = is_initiator_or_mod(ctx, channel.id)
    if not allowed:
        return await ctx.send("No permission to set user limit.")

    CUSTOM_VC_INFO[channel.id]["user_limit"] = limit
    await channel.edit(user_limit=limit)
    await ctx.send(f"User limit set to {limit}.")
async def set_bitrate(ctx: commands.Context, kbps: int):
    """
    Set the bitrate of the author's current custom VC.

    :param kbps: bitrate in kbps; must be between 8 and 128 inclusive.

    The value is validated first and only recorded in CUSTOM_VC_INFO once
    the channel edit succeeded, so a rejected value never shows up in the
    channel's settings.
    """
    vc = get_custom_vc_for(ctx)
    if not vc:
        return await ctx.send("Not in a custom VC.")
    if not is_initiator_or_mod(ctx, vc.id):
        return await ctx.send("No permission to set bitrate.")
    if kbps < 8 or kbps > 128:
        return await ctx.send(f"Invalid bitrate of {kbps} kbps! Must be a number between `8` and `128`! Default is 64 kbps")

    # Discord expects bits per second.
    await vc.edit(bitrate=kbps * 1000)
    CUSTOM_VC_INFO[vc.id]["bitrate"] = kbps
    await ctx.send(f"Bitrate set to {kbps} kbps.")
async def set_status(ctx: commands.Context, status_str: str):
    """
    Remember a status text for the author's current custom VC.

    Placeholder: the text is only stored under the channel's "status" entry
    in CUSTOM_VC_INFO; nothing is changed on the channel itself.
    """
    channel = get_custom_vc_for(ctx)
    if not channel:
        return await ctx.send("Not in a custom VC.")

    if is_initiator_or_mod(ctx, channel.id):
        CUSTOM_VC_INFO[channel.id]["status"] = status_str
        await ctx.send(f"Channel status set to: **{status_str}** (placeholder).")
        return None

    return await ctx.send("No permission to set channel status.")
async def op_user(ctx: commands.Context, user: str):
    """
    Record a member as co-owner of the author's current custom VC.

    The member's id is added to the channel's "ops" set in CUSTOM_VC_INFO,
    which is created on first use.
    """
    channel = get_custom_vc_for(ctx)
    if not channel:
        return await ctx.send("Not in a custom VC.")
    if not is_initiator_or_mod(ctx, channel.id):
        return await ctx.send("No permission to op someone.")

    target = await resolve_member(ctx, user)
    if not target:
        return await ctx.send(f"Could not find user: {user}.")

    # setdefault creates the set the first time somebody is opped.
    co_owners = CUSTOM_VC_INFO[channel.id].setdefault("ops", set())
    co_owners.add(target.id)
    await ctx.send(f"Granted co-ownership to {target.display_name}.")
async def show_settings(ctx: commands.Context):
    """
    Post a summary of the settings recorded for the author's current custom VC:
    owner, lock state, user limit, bitrate, status, allowed and denied ids.
    """
    channel = get_custom_vc_for(ctx)
    if not channel:
        return await ctx.send("Not in a custom VC.")

    record = CUSTOM_VC_INFO.get(channel.id)
    if not record:
        return await ctx.send("No data found for this channel?")

    def describe_ids(ids):
        # Comma-separated raw ids, or a fixed text when the set is empty.
        if ids:
            return ", ".join(map(str, ids))
        return "None specified"

    if record["locked"]:
        locked_str = "Yes"
    else:
        locked_str = "No"
    user_lim = record["user_limit"] or "Default"
    bitrate_str = record["bitrate"] or "Default (64 kbps)"
    status_str = record.get("status", "None")

    # Fall back to the raw id when the owner is not found in the guild.
    owner_id = record["owner_id"]
    owner = ctx.guild.get_member(owner_id)
    if owner:
        owner_str = owner.display_name
    else:
        owner_str = f"<ID {owner_id}>"

    allowed_str = describe_ids(record["allowed_ids"])
    denied_str = describe_ids(record["denied_ids"])

    lines = [
        f"**Channel Settings**\n",
        f"Owner: {owner_str}\n",
        f"Locked: {locked_str}\n",
        f"User Limit: {user_lim}\n",
        f"Bitrate: {bitrate_str}\n",
        f"Status: {status_str}\n",
        f"Allowed: {allowed_str}\n",
        f"Denied: {denied_str}\n",
    ]
    await ctx.send("".join(lines))
async def resolve_member(ctx: commands.Context, user_str: str) -> discord.Member:
    """
    Turn user input into a guild member.

    Accepted forms, tried in this order:
    - a user mention (`<@id>` / `<@!id>`)
    - a bare numeric id
    - an exact, case-insensitive match on username or display name

    Returns None when nothing matches. A well-formed mention or numeric id
    that matches no member is NOT retried as a name.
    """
    query = user_str.strip()
    guild = ctx.guild

    looks_like_mention = query.startswith("<@") and query.endswith(">")
    if looks_like_mention:
        raw_id = query.strip("<@!>")
        if raw_id.isdigit():
            return guild.get_member(int(raw_id))
        # Malformed mention: fall through to the name lookup below.
    elif query.isdigit():
        return guild.get_member(int(query))

    needle = query.lower()
    matches = (
        candidate
        for candidate in guild.members
        if needle in (candidate.name.lower(), candidate.display_name.lower())
    )
    return next(matches, None)