OokamiPupV2/cmd_discord/customvc.py

502 lines
20 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

# cmd_discord/customvc.py
import discord
from discord.ext import commands
import datetime
import globals
from modules.permissions import has_custom_vc_permission
# Cog that creates a personal voice channel when a member joins the lobby
# channel, schedules custom channels for deletion once they are empty, and
# exposes the `!customvc` command group for managing them.
class CustomVCCog(commands.Cog):
def __init__(self, bot, guild_id: str = "896713616089309184"):
    """
    Load the custom-VC settings and initialise the in-memory bookkeeping.

    Args:
        bot: The bot instance this cog is attached to.
        guild_id: Key of the guild entry to read from
            ``discord_guilds_config.json``. Defaults to the guild that was
            previously hard-coded, so ``CustomVCCog(bot)`` keeps working.

    Raises:
        KeyError: Presumably (if the loaded settings behave like plain dicts)
            when ``guild_id`` or one of the ``customvc_settings`` keys is
            missing from the settings file.
    """
    self.bot = bot
    self.settings_data = globals.load_settings_file("discord_guilds_config.json")

    # Resolve the nested settings section once instead of on every lookup.
    vc_settings = self.settings_data[guild_id]["customvc_settings"]
    self.LOBBY_VC_ID = vc_settings["lobby_vc_id"]
    self.CUSTOM_VC_CATEGORY_ID = vc_settings["customvc_category_id"]
    self.USER_COOLDOWN_MINUTES = vc_settings["vc_creation_user_cooldown"]
    self.GLOBAL_CHANNELS_PER_MINUTE = vc_settings["vc_creation_global_per_min"]
    self.DELETE_DELAY_SECONDS = vc_settings["empty_vc_autodelete_delay"]
    self.MAX_CUSTOM_CHANNELS = vc_settings["customvc_max_limit"]

    # channel id -> dict with owner_id / locked / allowed_ids / denied_ids / ...
    self.CUSTOM_VC_INFO = {}
    # member id -> datetime of that member's most recent channel creation
    self.USER_LAST_CREATED = {}
    # datetimes of recent channel creations (pruned to the last 60 seconds)
    self.GLOBAL_CREATIONS = []
    # running count used to name newly created channels
    self.CHANNEL_COUNTER = 0
    # channel id -> task that will delete the channel if it stays empty
    self.PENDING_DELETIONS = {}
@commands.Cog.listener()
async def on_ready(self):
    """
    Scan for leftover custom voice channels the first time the bot is ready.

    discord.py may dispatch ``on_ready`` more than once per process (for
    example after a reconnect). Re-running the scan would delete empty
    channels immediately, bypassing the usual deletion delay, and would
    overwrite the stored owner of channels that are in use, so the scan is
    limited to the first dispatch.
    """
    if getattr(self, "_startup_scan_done", False):
        return
    # Set the flag before awaiting so an overlapping dispatch cannot start a
    # second scan while the first is still running.
    self._startup_scan_done = True
    await self.scan_existing_custom_vcs()
@commands.Cog.listener()
async def on_voice_state_update(self, member: discord.Member, before: discord.VoiceState, after: discord.VoiceState):
    """Forward each voice-state change to this cog's ``on_voice_update`` handler."""
    await self.on_voice_update(member, before, after)
@commands.group(name="customvc", invoke_without_command=True)
async def customvc(self, ctx):
    """
    Base ``!customvc`` command: reply with the help text when no subcommand is used.

    The group is declared with ``invoke_without_command=True``, so discord.py
    dispatches matching subcommands itself and only calls this callback when
    none matched. Re-invoking the context from here is therefore unnecessary
    (and would re-dispatch this same command), so no such branch exists.
    """
    # Defensive guard only; with invoke_without_command=True this is not
    # expected to trigger.
    if ctx.invoked_subcommand is not None:
        return

    # NOTE(review): "bitrate", "status" and "op" are advertised below, but no
    # matching subcommand is registered in the visible part of this file.
    msg = (
        "**Custom VC Help**\n"
        "Join the lobby to create a new voice channel automatically.\n"
        "Subcommands:\n"
        "- **name** <new_name> - rename\n"
        " - `!customvc name my_custom_vc`\n"
        "- **claim** - claim ownership\n"
        " - `!customvc claim`\n"
        "- **lock** - lock the channel\n"
        " - `!customvc lock`\n"
        "- **allow** <user> - allow user\n"
        " - `!customvc allow some_user`\n"
        "- **deny** <user> - deny user\n"
        " - `!customvc deny some_user`\n"
        "- **unlock** - unlock channel\n"
        " - `!customvc unlock`\n"
        "- **users** <count> - set user limit\n"
        " - `!customvc users 2`\n"
        "- **bitrate** <kbps> - set channel bitrate\n"
        " - `!customvc bitrate some_bitrate`\n"
        "- **status** <status_str> - set a custom status **(TODO)**\n"
        "- **op** <user> - co-owner\n"
        " - `!customvc op some_user`\n"
        "- **settings** - show config\n"
        " - `!customvc settings`\n"
    )
    await ctx.reply(msg)
# Subcommands:
@customvc.command(name="name")
async def rename_channel(self, ctx, *, new_name: str):
    """``!customvc name <new_name>``: rename the caller's custom VC."""
    await self.rename_channel_logic(ctx, new_name)
@customvc.command(name="claim")
async def claim_channel(self, ctx):
    """``!customvc claim``: take over ownership when the current owner is not in the channel."""
    await self.claim_channel_logic(ctx)
@customvc.command(name="lock")
async def lock_channel(self, ctx):
    """``!customvc lock``: lock the caller's custom VC."""
    await self.lock_channel_logic(ctx)
@customvc.command(name="allow")
async def allow_user(self, ctx, *, user: str):
    """``!customvc allow <user>``: let the given user connect to the caller's custom VC."""
    await self.allow_user_logic(ctx, user)
@customvc.command(name="deny")
async def deny_user(self, ctx, *, user: str):
    """``!customvc deny <user>``: stop the given user from connecting to the caller's custom VC."""
    await self.deny_user_logic(ctx, user)
@customvc.command(name="unlock")
async def unlock_channel(self, ctx):
    """``!customvc unlock``: unlock the caller's custom VC."""
    await self.unlock_channel_logic(ctx)
@customvc.command(name="settings")
async def show_settings(self, ctx):
    """``!customvc settings``: display the stored settings of the caller's custom VC."""
    await self.show_settings_logic(ctx)
@customvc.command(name="users")
async def set_users_limit(self, ctx, limit: int):
    """
    ``!customvc users <count>``: set the user limit of the caller's custom VC.

    Args:
        ctx: Invocation context.
        limit: Requested user limit, converted to ``int`` by the command
            parser. It must be forwarded because ``set_user_limit_logic``
            requires it; calling it without the value raises ``TypeError``.
    """
    await self.set_user_limit_logic(ctx, limit)
#
# Main voice update logic
#
async def on_voice_update(self, member, before, after):
    """
    React to a member moving between voice channels.

    - Joining the lobby channel creates a custom VC for the member (subject
      to the creation rate limits) and moves them into it.
    - Leaving a tracked custom VC that is now empty schedules it for deletion.
    - Joining a tracked custom VC cancels a deletion pending for it.

    Args:
        member: The member whose voice state changed.
        before: Voice state before the change.
        after: Voice state after the change.
    """
    if before.channel == after.channel:
        # Same channel on both sides: nothing for this cog to do.
        return

    # 1) + 2) Member just joined the lobby -> try to create their channel.
    if after.channel and after.channel.id == self.LOBBY_VC_ID:
        await self._handle_lobby_join(member, after.channel)

    # 3) Member left a custom VC -> maybe it's now empty.
    # This must run even when the lobby join above was rejected; otherwise a
    # member who hops from their own channel into the lobby while on cooldown
    # would leave an empty channel behind that is never scheduled for deletion.
    old_vc = before.channel
    if old_vc and old_vc.id in self.CUSTOM_VC_INFO:
        if len(old_vc.members) == 0:
            self.schedule_deletion(old_vc)

    # Member joined a custom VC that was pending deletion -> cancel it.
    joined = after.channel
    if joined and joined.id in self.CUSTOM_VC_INFO:
        pending = self.PENDING_DELETIONS.pop(joined.id, None)
        if pending is not None:
            pending.cancel()

async def _handle_lobby_join(self, member, lobby_chan):
    """Create a custom VC for a member who just joined the lobby, or disconnect them when rate-limited."""
    if not await self.can_create_vc(member):
        # Rate-limit => forcibly disconnect
        await member.move_to(None)
        # Mention them in the lobby's own chat when the channel supports sending.
        if lobby_chan and hasattr(lobby_chan, "send"):
            await lobby_chan.send(f"{member.mention}, you've exceeded custom VC creation limits. Try again later!")
        return

    category = member.guild.get_channel(self.CUSTOM_VC_CATEGORY_ID)
    if not category or not isinstance(category, discord.CategoryChannel):
        globals.log("Could not find a valid custom VC category.", "INFO")
        return

    # Only consume a channel number once we know a channel can be created.
    self.CHANNEL_COUNTER += 1
    vc_name = f"VC {self.CHANNEL_COUNTER}"
    if self.CHANNEL_COUNTER > self.MAX_CUSTOM_CHANNELS:
        vc_name = f"Overflow {self.CHANNEL_COUNTER}"

    new_vc = await category.create_voice_channel(name=vc_name)

    # Track the channel and the rate-limit timestamps before moving the
    # member, so the channel is never left untracked if the move fails.
    self.CUSTOM_VC_INFO[new_vc.id] = {
        "owner_id": member.id,
        "locked": False,
        "allowed_ids": set(),
        "denied_ids": set(),
        "user_limit": None,
        "bitrate": None,
    }
    now = datetime.datetime.utcnow()
    self.USER_LAST_CREATED[member.id] = now
    self.GLOBAL_CREATIONS.append(now)
    # prune entries older than the 60-second window
    cutoff = now - datetime.timedelta(seconds=60)
    self.GLOBAL_CREATIONS[:] = [t for t in self.GLOBAL_CREATIONS if t > cutoff]

    try:
        await member.move_to(new_vc)
    except discord.HTTPException:
        # The move was rejected; clean up the fresh channel instead of
        # leaving it orphaned, then let the error surface as before.
        self.schedule_deletion(new_vc)
        raise

    # Announce in the new VC's own chat when the channel supports sending.
    if hasattr(new_vc, "send"):
        await new_vc.send(
            f"{member.mention}, your custom voice channel is ready! "
            "Type `!customvc` here for help with subcommands."
        )
def schedule_deletion(self, vc: discord.VoiceChannel):
    """
    Schedule this custom VC for deletion if it is still empty after
    ``DELETE_DELAY_SECONDS``.

    The task is stored in ``PENDING_DELETIONS`` under the channel id so it
    can be cancelled when someone joins again. Must be called while the event
    loop is running (e.g. from an event handler or command).

    Args:
        vc: The voice channel to delete once it has stayed empty.
    """
    import asyncio

    async def delete_task():
        await asyncio.sleep(self.DELETE_DELAY_SECONDS)
        try:
            if len(vc.members) != 0:
                return
            try:
                await vc.delete()
            except discord.NotFound:
                # Already gone (deleted manually); just drop our records.
                pass
            except discord.HTTPException as exc:
                # Keep the channel tracked so a later empty-out can retry.
                globals.log(f"Failed to delete empty custom VC {vc.name}: {exc}", "INFO")
                return
            self.CUSTOM_VC_INFO.pop(vc.id, None)
        finally:
            # Only clear the entry if it still refers to this task.
            if self.PENDING_DELETIONS.get(vc.id) is task:
                del self.PENDING_DELETIONS[vc.id]

    # Never keep two timers for the same channel.
    previous = self.PENDING_DELETIONS.pop(vc.id, None)
    if previous is not None:
        previous.cancel()

    # Use the running loop rather than reaching into the library's private
    # connection state.
    task = asyncio.get_running_loop().create_task(delete_task())
    self.PENDING_DELETIONS[vc.id] = task
async def can_create_vc(self, member: discord.Member) -> bool:
    """
    Tell whether ``member`` may create a custom VC right now.

    Returns False when either limit is hit:
      - the member created a channel less than ``USER_COOLDOWN_MINUTES`` ago
      - ``GLOBAL_CHANNELS_PER_MINUTE`` or more channels were created in the
        last 60 seconds
    """
    current = datetime.datetime.utcnow()

    # per-user cooldown
    previous = self.USER_LAST_CREATED.get(member.id)
    if previous:
        elapsed = (current - previous).total_seconds()
        if elapsed < (self.USER_COOLDOWN_MINUTES * 60):
            return False

    # global limit over the trailing minute
    window_start = current - datetime.timedelta(seconds=60)
    recent_count = sum(1 for stamp in self.GLOBAL_CREATIONS if stamp > window_start)
    return not (recent_count >= self.GLOBAL_CHANNELS_PER_MINUTE)
async def scan_existing_custom_vcs(self):
    """
    On startup: check the custom VC category of every guild for leftover channels.

    - The lobby channel (``LOBBY_VC_ID``) is skipped and never deleted.
    - Empty channels are deleted immediately.
    - Non-empty channels are adopted: the first listed member becomes the
      owner and is mentioned in the channel's chat when possible.

    Deletion and announcement are best-effort; Discord API failures are
    logged and the scan continues with the next channel.
    """
    for guild in self.bot.guilds:
        category = guild.get_channel(self.CUSTOM_VC_CATEGORY_ID)
        if not category or not isinstance(category, discord.CategoryChannel):
            continue

        for ch in category.voice_channels:
            # skip the LOBBY
            if ch.id == self.LOBBY_VC_ID:
                continue

            if not ch.members:
                # Empty leftover: safe to delete. Only report success when
                # the delete actually went through.
                try:
                    await ch.delete()
                except discord.HTTPException as exc:
                    globals.log(f"Could not delete empty leftover channel {ch.name}: {exc}", "INFO")
                else:
                    globals.log(f"Deleted empty leftover channel: {ch.name}", "INFO")
                continue

            # Occupied leftover: pick the first occupant as the new owner.
            first = ch.members[0]
            self.CHANNEL_COUNTER += 1
            self.CUSTOM_VC_INFO[ch.id] = {
                "owner_id": first.id,
                "locked": False,
                "allowed_ids": set(),
                "denied_ids": set(),
                "user_limit": None,
                "bitrate": None,
            }
            globals.log(f"Assigned {first.display_name} as owner of leftover VC: {ch.name}", "INFO")

            if hasattr(ch, "send"):
                try:
                    await ch.send(
                        f"{first.mention}, you're now the owner of leftover channel **{ch.name}** after a restart. "
                        "Type `!customvc` here to see available commands."
                    )
                except discord.HTTPException as exc:
                    # The announcement is a courtesy; don't abort the scan.
                    globals.log(f"Could not announce new owner in {ch.name}: {exc}", "INFO")
#
# The subcommand logic below, referencing CUSTOM_VC_INFO
#
def get_custom_vc_for(self, ctx: commands.Context) -> discord.VoiceChannel:
    """
    Look up the tracked custom voice channel the command author is sitting in.

    Returns the channel from ``ctx.author.voice`` when the author is a guild
    member, is connected to voice, and that channel is present in
    ``CUSTOM_VC_INFO``; otherwise returns None.
    """
    author = ctx.author
    if isinstance(author, discord.Member):
        voice_state = author.voice
        channel = voice_state.channel if voice_state else None
        if channel and channel.id in self.CUSTOM_VC_INFO:
            return channel
    return None
def is_initiator_or_mod(self, ctx: commands.Context, vc_id: int) -> bool:
    """
    Tell whether the command author may manage the custom VC ``vc_id``.

    Returns False when the channel is not tracked. Otherwise the author is
    allowed when they were granted co-ownership (their id is in the channel's
    ``ops`` set, written by ``op_user_logic``) or when
    ``has_custom_vc_permission`` accepts them for the stored owner id.
    """
    info = self.CUSTOM_VC_INFO.get(vc_id)
    if not info:
        return False
    # Honour co-owners: without this check the "ops" set is recorded but
    # never consulted, so granting co-ownership would have no effect.
    if ctx.author.id in info.get("ops", ()):
        return True
    return has_custom_vc_permission(ctx.author, info["owner_id"])
async def rename_channel_logic(self, ctx: commands.Context, new_name: str):
    """Rename the author's custom VC to ``new_name`` after checking membership and permission."""
    channel = self.get_custom_vc_for(ctx)

    # Work out whether the request has to be refused, and why.
    refusal = None
    if not channel:
        refusal = "You are not in a custom voice channel."
    elif not self.is_initiator_or_mod(ctx, channel.id):
        refusal = "No permission to rename."
    if refusal is not None:
        return await ctx.send(refusal)

    await channel.edit(name=new_name)
    await ctx.send(f"Renamed channel to **{new_name}**.")
async def claim_channel_logic(self, ctx: commands.Context):
    """Make the author the owner of their custom VC, unless the stored owner is still connected to it."""
    channel = self.get_custom_vc_for(ctx)
    if not channel:
        return await ctx.send("Not in a custom VC.")

    record = self.CUSTOM_VC_INFO.get(channel.id)
    if not record:
        return await ctx.send("No memory for this channel?")

    # The claim is refused while the recorded owner is among the occupants.
    previous_owner = record["owner_id"]
    occupant_ids = {occupant.id for occupant in channel.members}
    if previous_owner in occupant_ids:
        return await ctx.send("The original owner is still here; you cannot claim.")

    record["owner_id"] = ctx.author.id
    await ctx.send("You are now the owner of this channel!")
async def lock_channel_logic(self, ctx: commands.Context):
    """
    Lock the author's custom VC so that ``@everyone`` can no longer connect.

    Only the ``connect`` flag of the channel's existing ``@everyone``
    overwrite is changed; other permissions already set on that overwrite
    are preserved. The stored ``locked`` flag is updated only after the
    channel edit succeeded.
    """
    vc = self.get_custom_vc_for(ctx)
    if not vc:
        return await ctx.send("You're not in a custom VC.")
    if not self.is_initiator_or_mod(ctx, vc.id):
        return await ctx.send("No permission to lock.")

    info = self.CUSTOM_VC_INFO[vc.id]
    everyone = ctx.guild.default_role
    overwrites = vc.overwrites or {}
    # Start from the current overwrite rather than a blank one, so flags
    # other than `connect` are not wiped.
    everyone_overwrite = vc.overwrites_for(everyone)
    everyone_overwrite.update(connect=False)
    overwrites[everyone] = everyone_overwrite
    await vc.edit(overwrites=overwrites)

    info["locked"] = True
    await ctx.send("Channel locked.")
async def allow_user_logic(self, ctx: commands.Context, user: str):
    """
    Allow ``user`` to connect to the author's custom VC.

    Args:
        ctx: Invocation context.
        user: Mention, id or name of the member, resolved via ``resolve_member``.

    The member's existing overwrite on the channel is kept and only its
    ``connect`` flag is set. The member is also removed from the channel's
    denied set, so a later unlock does not re-deny them.
    """
    vc = self.get_custom_vc_for(ctx)
    if not vc:
        return await ctx.send("Not in a custom VC.")
    if not self.is_initiator_or_mod(ctx, vc.id):
        return await ctx.send("No permission to allow a user.")
    mem = await self.resolve_member(ctx, user)
    if not mem:
        return await ctx.send(f"Could not find user: {user}.")

    info = self.CUSTOM_VC_INFO[vc.id]
    overwrites = vc.overwrites or {}
    # Preserve whatever else is already set for this member.
    member_overwrite = vc.overwrites_for(mem)
    member_overwrite.update(connect=True)
    overwrites[mem] = member_overwrite
    await vc.edit(overwrites=overwrites)

    # Record the change only once Discord accepted it.
    info["allowed_ids"].add(mem.id)
    info["denied_ids"].discard(mem.id)
    await ctx.send(f"Allowed **{mem.display_name}** to join.")
async def deny_user_logic(self, ctx: commands.Context, user: str):
    """
    Deny ``user`` from connecting to the author's custom VC.

    Args:
        ctx: Invocation context.
        user: Mention, id or name of the member, resolved via ``resolve_member``.

    Members for whom ``has_custom_vc_permission`` succeeds against the
    channel owner cannot be denied. The member's existing overwrite is kept
    and only its ``connect`` flag is cleared. The member is also removed
    from the allowed set so the two sets never contradict each other.
    """
    vc = self.get_custom_vc_for(ctx)
    if not vc:
        return await ctx.send("Not in a custom VC.")
    if not self.is_initiator_or_mod(ctx, vc.id):
        return await ctx.send("No permission to deny user.")
    mem = await self.resolve_member(ctx, user)
    if not mem:
        return await ctx.send(f"Could not find user: {user}.")

    info = self.CUSTOM_VC_INFO[vc.id]
    # check if they're mod or owner
    if has_custom_vc_permission(mem, info["owner_id"]):
        return await ctx.send("Cannot deny a moderator or the owner.")

    overwrites = vc.overwrites or {}
    # Preserve whatever else is already set for this member.
    member_overwrite = vc.overwrites_for(mem)
    member_overwrite.update(connect=False)
    overwrites[mem] = member_overwrite
    await vc.edit(overwrites=overwrites)

    # Record the change only once Discord accepted it.
    info["denied_ids"].add(mem.id)
    info["allowed_ids"].discard(mem.id)
    await ctx.send(f"Denied {mem.display_name} from connecting.")
async def unlock_channel_logic(self, ctx: commands.Context):
    """
    Unlock the author's custom VC so that ``@everyone`` may connect again.

    Members in the channel's denied set get ``connect`` re-denied in the
    same edit. Existing overwrites are updated in place rather than replaced,
    so unrelated permission flags on them are preserved. The stored
    ``locked`` flag is cleared only after the channel edit succeeded.
    """
    vc = self.get_custom_vc_for(ctx)
    if not vc:
        return await ctx.send("Not in a custom VC.")
    if not self.is_initiator_or_mod(ctx, vc.id):
        return await ctx.send("No permission to unlock.")

    info = self.CUSTOM_VC_INFO[vc.id]
    overwrites = vc.overwrites or {}

    everyone = ctx.guild.default_role
    everyone_overwrite = vc.overwrites_for(everyone)
    everyone_overwrite.update(connect=True)
    overwrites[everyone] = everyone_overwrite

    # Denied members stay denied; ids that no longer resolve are skipped.
    for denied_id in info["denied_ids"]:
        denied_member = ctx.guild.get_member(denied_id)
        if denied_member:
            member_overwrite = vc.overwrites_for(denied_member)
            member_overwrite.update(connect=False)
            overwrites[denied_member] = member_overwrite

    await vc.edit(overwrites=overwrites)

    info["locked"] = False
    await ctx.send("Unlocked the channel. Denied users still cannot join.")
async def set_user_limit_logic(self, ctx: commands.Context, limit: int):
    """
    Apply a user limit between 2 and 99 to the author's custom VC.

    The range is validated before the permission check; on success the limit
    is stored in the channel's record and applied to the channel.
    """
    channel = self.get_custom_vc_for(ctx)
    if not channel:
        return await ctx.send("Not in a custom VC.")

    # First failing check wins; the permission check only runs for a
    # limit that is within range.
    problem = None
    if limit < 2:
        problem = "Minimum limit is 2."
    elif limit > 99:
        problem = "Maximum limit is 99."
    elif not self.is_initiator_or_mod(ctx, channel.id):
        problem = "No permission to set user limit."
    if problem is not None:
        return await ctx.send(problem)

    record = self.CUSTOM_VC_INFO[channel.id]
    record["user_limit"] = limit
    await channel.edit(user_limit=limit)
    await ctx.send(f"User limit set to {limit}.")
async def set_bitrate_logic(self, ctx: commands.Context, kbps: int):
    """
    Set the bitrate of the author's custom VC.

    Args:
        ctx: Invocation context.
        kbps: Requested bitrate in kbps; must be between 8 and 128.

    The value is validated before anything is stored, and the channel's
    record is updated only after the channel edit succeeded, so a rejected
    value never shows up in the stored settings.
    """
    vc = self.get_custom_vc_for(ctx)
    if not vc:
        return await ctx.send("Not in a custom VC.")
    if not self.is_initiator_or_mod(ctx, vc.id):
        return await ctx.send("No permission to set bitrate.")
    if kbps < 8 or kbps > 128:
        return await ctx.send(f"Invalid bitrate of {kbps} kbps! Must be a number between `8` and `128`! Default is 64 kbps")

    info = self.CUSTOM_VC_INFO[vc.id]
    # The channel edit takes the bitrate in bits per second.
    await vc.edit(bitrate=kbps * 1000)
    info["bitrate"] = kbps
    await ctx.send(f"Bitrate set to {kbps} kbps.")
async def set_status_logic(self, ctx: commands.Context, status_str: str):
    """
    Remember a status string for the author's custom VC.

    Placeholder: the text is only stored in the channel's record; nothing is
    changed on the channel itself.
    """
    channel = self.get_custom_vc_for(ctx)

    refusal = None
    if not channel:
        refusal = "Not in a custom VC."
    elif not self.is_initiator_or_mod(ctx, channel.id):
        refusal = "No permission to set channel status."
    if refusal is not None:
        return await ctx.send(refusal)

    self.CUSTOM_VC_INFO[channel.id]["status"] = status_str
    await ctx.send(f"Channel status set to: **{status_str}** (placeholder).")
async def op_user_logic(self, ctx: commands.Context, user: str):
    """Add the resolved member's id to the ``ops`` set of the author's custom VC."""
    channel = self.get_custom_vc_for(ctx)
    if not channel:
        return await ctx.send("Not in a custom VC.")
    if not self.is_initiator_or_mod(ctx, channel.id):
        return await ctx.send("No permission to op someone.")

    target = await self.resolve_member(ctx, user)
    if not target:
        return await ctx.send(f"Could not find user: {user}.")

    # Create the "ops" set on first use, then record the member.
    record = self.CUSTOM_VC_INFO[channel.id]
    record.setdefault("ops", set()).add(target.id)
    await ctx.send(f"Granted co-ownership to {target.display_name}.")
async def show_settings_logic(self, ctx: commands.Context):
    """Send a summary of the stored settings for the author's custom VC."""
    channel = self.get_custom_vc_for(ctx)
    if not channel:
        return await ctx.send("Not in a custom VC.")
    info = self.CUSTOM_VC_INFO.get(channel.id)
    if not info:
        return await ctx.send("No data found for this channel?")

    # Owner is shown by display name, falling back to the raw id when the
    # member cannot be looked up in the guild.
    owner_id = info["owner_id"]
    owner = ctx.guild.get_member(owner_id)
    if owner:
        owner_str = owner.display_name
    else:
        owner_str = f"<ID {owner_id}>"

    if info["locked"]:
        locked_str = "Yes"
    else:
        locked_str = "No"

    # Unset (falsy) values are reported as defaults.
    user_lim = info["user_limit"] or "Default"
    bitrate_str = info["bitrate"] or "Default (64 kbps)"
    status_str = info.get("status", "None")

    # Allowed/denied members are listed by id.
    allowed_str = ", ".join(str(member_id) for member_id in info["allowed_ids"]) or "None specified"
    denied_str = ", ".join(str(member_id) for member_id in info["denied_ids"]) or "None specified"

    msg = (
        f"**Channel Settings**\n"
        f"Owner: {owner_str}\n"
        f"Locked: {locked_str}\n"
        f"User Limit: {user_lim}\n"
        f"Bitrate: {bitrate_str}\n"
        f"Status: {status_str}\n"
        f"Allowed: {allowed_str}\n"
        f"Denied: {denied_str}\n"
    )
    await ctx.send(msg)
async def resolve_member(self, ctx: commands.Context, user_str: str) -> discord.Member:
    """
    Resolve a user mention, numeric id, or exact (case-insensitive) name to a guild member.

    Args:
        ctx: Invocation context; the lookup uses ``ctx.guild``.
        user_str: Text such as ``<@123>``, ``<@!123>``, ``123`` or a
            username / display name.

    Returns:
        The matching member, or None when nothing matched.

    Takes ``self`` because every caller in this file invokes it as
    ``self.resolve_member(ctx, user)``; without it those calls fail.
    """
    user_str = user_str.strip()
    # `isdecimal` (rather than `isdigit`) so that characters such as
    # superscript digits cannot reach `int()` and raise ValueError.
    if user_str.startswith("<@") and user_str.endswith(">"):
        uid = user_str.strip("<@!>")
        if uid.isdecimal():
            return ctx.guild.get_member(int(uid))
    elif user_str.isdecimal():
        return ctx.guild.get_member(int(user_str))

    # Fall back to an exact, case-insensitive match on name or display name.
    lower = user_str.lower()
    for m in ctx.guild.members:
        if m.name.lower() == lower or m.display_name.lower() == lower:
            return m
    return None
async def setup(bot):
    """Extension entry point: build a ``CustomVCCog`` and add it to ``bot``."""
    cog = CustomVCCog(bot)
    await bot.add_cog(cog)