OokamiPupV2/cmd_discord/customvc.py

872 lines
36 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

# cmd_discord/customvc.py
#
# TODO
# - Fix "allow" and "deny" subcommands not working (modifications not applied)
# - Fix "lock" subcommand not working (modifications not applied)
# - Add "video_bitrate" to subcommands list
# - Rename "bitrate" to "audio_bitrate"
# - Add automatic channel naming
# - Dynamic mode: displays the game currently (at any time) played by the owner, respecting conservative ratelimits
# - Static mode: names the channel according to pre-defined rules (used on creation if owner is not playing)
# - Add "autoname" to subcommands list
# - Sets the channel name to Dynamic Mode (see above)
# - Modify "settings" to display new information
import discord
from discord.ext import commands, tasks
import datetime
import globals
from globals import logger
from modules.permissions import has_custom_vc_permission
class CustomVCCog(commands.Cog):
def __init__(self, bot):
    """Load the custom-VC configuration, reset runtime state and start the rename loop."""
    self.bot = bot
    self.settings_data = globals.load_settings_file("discord_guilds_config.json")

    # Settings are looked up under this guild key; adjust based on your settings structure.
    guild_id = "896713616089309184"
    vc_settings = self.settings_data[guild_id]["customvc_settings"]
    self.LOBBY_VC_ID = vc_settings["lobby_vc_id"]
    self.CUSTOM_VC_CATEGORY_ID = vc_settings["customvc_category_id"]
    self.USER_COOLDOWN_MINUTES = vc_settings["vc_creation_user_cooldown"]
    self.GLOBAL_CHANNELS_PER_MINUTE = vc_settings["vc_creation_global_per_min"]
    self.DELETE_DELAY_SECONDS = vc_settings["empty_vc_autodelete_delay"]
    # NOTE(review): MAX_CUSTOM_CHANNELS is loaded here but no method visible
    # in this file reads it.
    self.MAX_CUSTOM_CHANNELS = vc_settings["customvc_max_limit"]

    # Maps voice-channel ID -> per-channel state. Each entry looks like:
    #   {
    #       "owner_id": <int>,
    #       "autoname": <bool>,
    #       "locked": <bool>,
    #       "allowed_ids": set(),
    #       "denied_ids": set(),
    #       "user_limit": <int or None>,
    #       "bitrate": <int or None>,
    #       "last_rename_time": datetime.datetime or None,  # rename cooldown
    #   }
    self.CUSTOM_VC_INFO = {}

    self.USER_LAST_CREATED = {}   # member ID -> time of their last channel creation
    self.GLOBAL_CREATIONS = []    # timestamps of recent channel creations
    self.CHANNEL_COUNTER = 0
    self.PENDING_DELETIONS = {}   # channel ID -> scheduled deletion task

    # Minimum number of seconds between two automatic renames of one channel.
    self.rename_cooldown_seconds = 300  # For example, 5 minutes
    # Start the auto-rename loop
    self.auto_rename_loop.start()
def cog_unload(self):
    """Cancel the auto-rename background loop when the cog is unloaded."""
    self.auto_rename_loop.cancel()
@tasks.loop(seconds=30.0)  # Desired interval
async def auto_rename_loop(self):
    """
    Periodically rename auto-named channels after their owner's current game.

    For every tracked channel whose "autoname" flag is set, the desired name
    is computed with ``get_desired_name``. The channel is renamed when the
    name differs and either it was never renamed before or at least
    ``rename_cooldown_seconds`` have passed since the last rename.

    A failed rename is logged and skipped rather than raised, so that one
    bad channel does not let an exception escape the task body and stop
    the whole loop.
    """
    logger.debug("[AutoRenameLoop] Starting iteration over all custom VCs...")
    # We assume the single guild from the JSON configuration.
    guild_id = 896713616089309184
    guild = self.bot.get_guild(guild_id)
    if not guild:
        logger.warning("[AutoRenameLoop] Guild not found!")
        return

    # Iterate over a snapshot: entries can be added or removed while we await.
    for vc_id, info in list(self.CUSTOM_VC_INFO.items()):
        if not info.get("autoname"):
            continue  # skip if auto-naming disabled

        vc = guild.get_channel(vc_id)
        if not vc or not isinstance(vc, discord.VoiceChannel):
            logger.debug(f"[AutoRenameLoop] VC {vc_id} not found or not a VoiceChannel.")
            continue

        owner_id = info["owner_id"]
        owner = guild.get_member(owner_id)
        if not owner:
            logger.debug(f"[AutoRenameLoop] Owner {owner_id} not found in guild.")
            continue

        # Determine what name we *want* the channel to have
        desired_name = self.get_desired_name(owner)
        current_name = vc.name
        if desired_name == current_name:
            logger.debug(f"[AutoRenameLoop] VC {vc.id} already at desired name '{current_name}'. No rename needed.")
            continue

        last_rename = info.get("last_rename_time")
        now = datetime.datetime.utcnow()
        if last_rename is None:
            # Never renamed before, so rename immediately.
            logger.debug(
                f"[AutoRenameLoop] VC {vc.id}: No last rename; renaming from '{current_name}' to '{desired_name}'."
            )
            reason = "Auto-naming initial rename."
        else:
            diff = (now - last_rename).total_seconds()
            logger.debug(
                f"[AutoRenameLoop] VC {vc.id} name differ. Last rename was {int(diff)}s ago. "
                f"Cooldown = {self.rename_cooldown_seconds}."
            )
            if diff < self.rename_cooldown_seconds:
                logger.debug(
                    f"[AutoRenameLoop] Skipping rename of VC {vc.id} because cooldown not reached ({int(diff)} < {self.rename_cooldown_seconds})."
                )
                continue
            logger.debug(
                f"[AutoRenameLoop] Renaming VC {vc.id} from '{current_name}' to '{desired_name}'"
            )
            reason = "Auto-naming update."

        try:
            await vc.edit(name=desired_name, reason=reason)
        except discord.HTTPException as exc:
            # Missing permissions, deleted channel, rejected name, ...
            logger.error(f"[AutoRenameLoop] Could not rename VC {vc.id}: {exc}", exc_info=True)
        # Stamp the attempt even on failure so a persistently failing channel
        # is retried once per cooldown instead of on every iteration.
        info["last_rename_time"] = now

    logger.debug("[AutoRenameLoop] Finished iteration.")
@auto_rename_loop.before_loop
async def before_auto_rename_loop(self):
    """
    Hold back the first loop iteration until the bot reports it is ready.

    NOTE(review): a second method with this same name and decorator is
    defined further down in this class; the later definition replaces
    this one.
    """
    await self.bot.wait_until_ready()
@commands.Cog.listener()
async def on_ready(self):
    """
    Scan for leftover custom voice channels when the bot becomes ready.

    NOTE(review): ``on_ready`` is defined again further down in this class;
    the later definition replaces this one.
    """
    await self.scan_existing_custom_vcs()
@commands.Cog.listener()
async def on_voice_state_update(self, member: discord.Member, before: discord.VoiceState, after: discord.VoiceState):
    """
    Forward voice-state changes to ``on_voice_update``.

    NOTE(review): ``on_voice_state_update`` is defined again further down in
    this class; the later definition replaces this one.
    """
    await self.on_voice_update(member, before, after)
@commands.group(name="customvc", invoke_without_command=True)
async def customvc(self, ctx):
    """
    Base !customvc command -> show help if no subcommand used.

    The group is declared with ``invoke_without_command=True``, so this
    callback only runs when no registered subcommand matched; subcommands
    are dispatched by the command framework itself. The previous fallback
    that called ``bot.invoke(ctx)`` from here was therefore unreachable and
    would have re-dispatched the command if it ever ran.
    """
    if ctx.invoked_subcommand is not None:
        # Defensive: a subcommand was resolved, so the framework handles it.
        return

    msg = (
        "**Custom VC Help**\n"
        "Join the lobby to create a new voice channel automatically.\n"
        "Subcommands:\n"
        "- **name** <new_name> - rename\n"
        " - `!customvc name my_custom_vc`\n"
        "- **autoname** - enables automatic renaming based on game presence\n"
        " - `!customvc autoname`\n"
        "- **claim** - claim ownership\n"
        " - `!customvc claim`\n"
        "- **lock** - lock the channel\n"
        " - `!customvc lock`\n"
        "- **allow** <user> - allow user\n"
        " - `!customvc allow some_user`\n"
        "- **deny** <user> - deny user\n"
        " - `!customvc deny some_user`\n"
        "- **unlock** - unlock channel\n"
        " - `!customvc unlock`\n"
        "- **users** <count> - set user limit\n"
        " - `!customvc users 2`\n"
        "- **audio_bitrate** <kbps> - set audio bitrate\n"
        " - `!customvc audio_bitrate 64`\n"
        "- **video_bitrate** <kbps> - set video bitrate\n"
        " - `!customvc video_bitrate 2500`\n"
        "- ~~**status** <status_str> - set a custom status~~\n"
        "- **op** <user> - co-owner\n"
        " - `!customvc op some_user`\n"
        "- **settings** - show channel settings\n"
        " - `!customvc settings`\n"
    )
    await ctx.reply(msg)
# Subcommands:
@customvc.command(name="name")
async def rename_channel(self, ctx, *, new_name: str):
    """Rename the caller's custom VC (``!customvc name <new_name>``).

    Delegates to ``rename_channel_logic``, which performs the membership
    and permission checks.
    """
    await self.rename_channel_logic(ctx, new_name)
@customvc.command(name="claim")
async def claim_channel(self, ctx):
    """Claim ownership of the caller's custom VC (``!customvc claim``).

    Delegates to ``claim_channel_logic``.
    """
    await self.claim_channel_logic(ctx)
@customvc.command(name="lock")
async def lock_channel(self, ctx):
    """Lock the caller's custom VC (``!customvc lock``).

    Delegates to ``lock_channel_logic``.
    """
    await self.lock_channel_logic(ctx)
@customvc.command(name="allow")
async def allow_user(self, ctx, *, user: str):
    """Allow a user to join the caller's custom VC (``!customvc allow <user>``).

    ``user`` is the raw remainder of the message; it is resolved to a member
    inside ``allow_user_logic``.
    """
    await self.allow_user_logic(ctx, user)
@customvc.command(name="deny")
async def deny_user(self, ctx, *, user: str):
    """Deny a user from joining the caller's custom VC (``!customvc deny <user>``).

    ``user`` is the raw remainder of the message; it is resolved to a member
    inside ``deny_user_logic``.
    """
    await self.deny_user_logic(ctx, user)
@customvc.command(name="unlock")
async def unlock_channel(self, ctx):
    """Unlock the caller's custom VC (``!customvc unlock``).

    Delegates to ``unlock_channel_logic``.
    """
    await self.unlock_channel_logic(ctx)
@customvc.command(name="settings")
async def show_settings(self, ctx):
    """Show the stored settings of the caller's custom VC (``!customvc settings``).

    Delegates to ``show_settings_logic``.
    """
    await self.show_settings_logic(ctx)
@customvc.command(name="users")
async def set_users_limit(self, ctx, *, limit: int):
    """Set the user limit of the caller's custom VC (``!customvc users <count>``).

    Range and permission checks happen in ``set_user_limit_logic``.
    """
    await self.set_user_limit_logic(ctx, limit)
@customvc.command(name="op")
async def op_user(self, ctx, *, user: str):
    """Make another user co-owner of the caller's custom VC (``!customvc op <user>``).

    ``user`` is the raw remainder of the message; it is resolved to a member
    inside ``op_user_logic``.
    """
    await self.op_user_logic(ctx, user)
#
# Main voice update logic
#
def get_desired_name(self, member: discord.Member) -> str:
    """Return the channel name that suits ``member`` right now.

    This is the name of the game reported by ``get_current_game`` when there
    is one, otherwise the fallback "<display name>'s Channel".
    """
    return self.get_current_game(member) or f"{member.display_name}'s Channel"
@auto_rename_loop.before_loop
async def before_auto_rename_loop(self):
    """
    Hold back the first iteration of the auto-rename loop until the bot is
    ready, logging before and after the wait.
    """
    logger.debug("[AutoRenameLoop] Waiting for bot to be ready...")
    await self.bot.wait_until_ready()
    logger.debug("[AutoRenameLoop] Bot is ready. Starting loop...")
@commands.Cog.listener()
async def on_ready(self):
    """Log the event and scan for leftover custom voice channels.

    The actual work is done by ``scan_existing_custom_vcs``.
    """
    logger.info("[CustomVCCog] on_ready triggered, scanning for existing custom VCs.")
    await self.scan_existing_custom_vcs()
def get_current_game(self, member: discord.Member) -> str | None:
    """
    Return the name of the first "playing" activity of ``member`` that has a
    non-empty name, or None when there is no such activity.

    NOTE(review): this method is defined more than once in this class with
    the same body; the last definition is the one that takes effect.
    """
    # A "game" is an activity whose type is ActivityType.playing.
    return next(
        (
            act.name
            for act in (member.activities or [])
            if act.type == discord.ActivityType.playing and act.name
        ),
        None,
    )
# --------------
# Helper: identify if user is playing a game
# --------------
def get_current_game(self, member: discord.Member) -> str | None:
    """
    Return the name of the first "playing" activity of ``member`` that has a
    non-empty name, or None when there is no such activity.

    NOTE(review): this method is defined more than once in this class with
    the same body; the last definition is the one that takes effect.
    """
    # A "game" is an activity whose type is ActivityType.playing.
    return next(
        (
            act.name
            for act in (member.activities or [])
            if act.type == discord.ActivityType.playing and act.name
        ),
        None,
    )
# --------------
# EVENT: on_voice_state_update
# --------------
@commands.Cog.listener()
async def on_voice_state_update(
    self, member: discord.Member, before: discord.VoiceState, after: discord.VoiceState
):
    """Event listener that forwards every voice-state change to ``on_voice_update``.

    ``before`` and ``after`` are the member's voice state prior to and
    following the change.
    """
    await self.on_voice_update(member, before, after)
async def on_voice_update(self, member, before, after):
    """
    React to a member moving between voice channels.

    - Leaving a tracked custom VC that is now empty schedules its deletion.
    - Joining a tracked custom VC cancels a pending deletion for it.
    - Joining the lobby creates a new custom VC and moves the member there.

    The leave/join bookkeeping runs before the lobby handling so that it is
    never skipped when channel creation is refused or fails (for example a
    member hopping from their own custom VC into the lobby while on
    cooldown would otherwise leave an empty channel that is never deleted).
    """
    if before.channel == after.channel:
        return  # same channel: nothing moved

    logger.debug(
        f"[on_voice_update] {member.display_name} changed channels "
        f"from '{getattr(before.channel, 'name', None)}' to '{getattr(after.channel, 'name', None)}'"
    )

    # If user left a channel that is tracked and it is now empty
    old_vc = before.channel
    if old_vc and old_vc.id in self.CUSTOM_VC_INFO and len(old_vc.members) == 0:
        self.schedule_deletion(old_vc)

    # If they joined a channel pending deletion, keep it alive
    if after.channel and after.channel.id in self.CUSTOM_VC_INFO:
        pending = self.PENDING_DELETIONS.pop(after.channel.id, None)
        if pending is not None:
            pending.cancel()

    # If user just joined the "lobby"
    if after.channel and after.channel.id == self.LOBBY_VC_ID:
        await self._create_vc_from_lobby(member, after.channel)


async def _create_vc_from_lobby(self, member, lobby_chan):
    """Create a custom VC for ``member`` who just joined ``lobby_chan`` and move them into it."""
    logger.debug(f"[on_voice_update] {member.display_name} joined the lobby.")

    # Respect rate-limits
    if not await self.can_create_vc(member):
        # forcibly disconnect
        await member.move_to(None)
        logger.debug(
            f"[on_voice_update] {member.display_name} exceeded creation limits. Disconnected."
        )
        if hasattr(lobby_chan, "send"):
            await lobby_chan.send(
                f"{member.mention}, youve exceeded custom VC creation limits. Try again later!"
            )
        return

    # Identify category
    category = member.guild.get_channel(self.CUSTOM_VC_CATEGORY_ID)
    if not category or not isinstance(category, discord.CategoryChannel):
        logger.error(
            f"[CustomVC] Could not find valid category for guild {member.guild.id}. Aborting creation."
        )
        return

    # Current game, or "<display name>'s Channel" as fallback (never empty).
    vc_name = self.get_desired_name(member)
    try:
        new_vc = await category.create_voice_channel(name=vc_name)
    except Exception as e:
        logger.error(f"[CustomVC] Failed to create voice channel: {e}", exc_info=True)
        return

    # Count only channels that were actually created.
    self.CHANNEL_COUNTER += 1
    logger.info(
        f"[CustomVC] Created new channel '{vc_name}' (ID {new_vc.id}) for {member.display_name}."
    )

    now = datetime.datetime.utcnow()
    self.USER_LAST_CREATED[member.id] = now
    self.GLOBAL_CREATIONS.append(now)
    # prune old from GLOBAL_CREATIONS
    cutoff = now - datetime.timedelta(seconds=60)
    self.GLOBAL_CREATIONS[:] = [t for t in self.GLOBAL_CREATIONS if t > cutoff]

    # Register the channel BEFORE moving the member so it is tracked even
    # if the move fails and so the move's own voice event sees it.
    self.CUSTOM_VC_INFO[new_vc.id] = {
        "owner_id": member.id,
        "autoname": False,
        "locked": False,
        "allowed_ids": set(),
        "denied_ids": set(),
        "user_limit": None,
        "bitrate": None,
        "last_rename_time": None,  # We haven't renamed it yet
    }

    try:
        await member.move_to(new_vc)
    except discord.HTTPException as e:
        # e.g. the member left voice while the channel was being created
        logger.warning(
            f"[CustomVC] Could not move {member.display_name} into VC {new_vc.id}: {e}"
        )
        self.schedule_deletion(new_vc)
        return

    if hasattr(new_vc, "send"):
        await new_vc.send(
            f"{member.display_name}, your custom voice channel is ready! "
            "Type `!customvc` here for help with subcommands."
        )
def schedule_deletion(self, vc: discord.VoiceChannel):
    """
    Schedules this custom VC for deletion in self.DELETE_DELAY_SECONDS if it stays empty.

    Any deletion already pending for the same channel is cancelled first,
    so at most one task per channel exists. The task is stored in
    ``PENDING_DELETIONS`` so that it can be cancelled when someone joins.

    Must be called while the event loop is running (it is invoked from the
    voice-state event handler).
    """
    import asyncio

    previous = self.PENDING_DELETIONS.pop(vc.id, None)
    if previous is not None:
        previous.cancel()

    async def delete_task():
        await asyncio.sleep(self.DELETE_DELAY_SECONDS)
        if len(vc.members) == 0:
            logger.info(f"[CustomVC] Deleting empty channel '{vc.name}' (ID {vc.id}).")
            self.CUSTOM_VC_INFO.pop(vc.id, None)
            try:
                await vc.delete()
            except Exception as e:
                logger.error(f"[CustomVC] Could not delete VC {vc.id}: {e}", exc_info=True)
        # Only drop the registry entry if it still refers to this task.
        if self.PENDING_DELETIONS.get(vc.id) is task:
            del self.PENDING_DELETIONS[vc.id]

    # Use the running loop instead of reaching into the library's private
    # ``guild._state.loop`` attribute.
    task = asyncio.get_running_loop().create_task(delete_task())
    self.PENDING_DELETIONS[vc.id] = task
    logger.debug(f"[CustomVC] Scheduled deletion for channel '{vc.name}' in {self.DELETE_DELAY_SECONDS}s.")
async def can_create_vc(self, member: discord.Member) -> bool:
    """
    Tell whether ``member`` may create a custom VC right now.

    Returns False when either limit is hit:
    - per user: one channel every ``USER_COOLDOWN_MINUTES`` minutes
    - globally: ``GLOBAL_CHANNELS_PER_MINUTE`` channels within the last 60 seconds
    """
    now = datetime.datetime.utcnow()

    # per-user cooldown
    previous = self.USER_LAST_CREATED.get(member.id)
    if previous:
        elapsed = (now - previous).total_seconds()
        if elapsed < (self.USER_COOLDOWN_MINUTES * 60):
            logger.debug(
                f"[can_create_vc] {member.display_name} last created a VC {int(elapsed)}s ago, "
                f"less than cooldown of {self.USER_COOLDOWN_MINUTES*60}s."
            )
            return False

    # global limit
    window_start = now - datetime.timedelta(seconds=60)
    recent_count = sum(1 for stamp in self.GLOBAL_CREATIONS if stamp > window_start)
    if recent_count >= self.GLOBAL_CHANNELS_PER_MINUTE:
        logger.debug(
            f"[can_create_vc] Global creation limit of {self.GLOBAL_CHANNELS_PER_MINUTE}/min reached."
        )
        return False

    return True
async def scan_existing_custom_vcs(self):
    """
    On startup: check the custom VC category for leftover channels:
    - If channel.id == LOBBY_VC_ID -> skip
    - If already tracked in CUSTOM_VC_INFO -> skip
    - If empty, delete immediately
    - If non-empty, first occupant is new owner

    The "already tracked" rule matters because this runs from ``on_ready``,
    which can fire more than once during the bot's lifetime; without it a
    later scan would reset the owner and settings of live channels and
    delete empty ones without waiting for the configured delay.
    """
    logger.debug("[scan_existing_custom_vcs] Checking leftover channels at startup...")
    for g in self.bot.guilds:
        cat = g.get_channel(self.CUSTOM_VC_CATEGORY_ID)
        if not cat or not isinstance(cat, discord.CategoryChannel):
            continue

        for ch in cat.voice_channels:
            if ch.id == self.LOBBY_VC_ID:
                continue
            if ch.id in self.CUSTOM_VC_INFO:
                continue  # managed already; keep its owner and settings

            if len(ch.members) == 0:
                # safe to delete
                try:
                    await ch.delete()
                    logger.info(
                        f"[scan_existing_custom_vcs] Deleted empty leftover channel: {ch.name} (ID {ch.id})"
                    )
                except Exception as e:
                    logger.error(
                        f"[scan_existing_custom_vcs] Could not delete leftover VC {ch.id}: {e}",
                        exc_info=True
                    )
                continue

            # pick first occupant as owner
            first = ch.members[0]
            self.CHANNEL_COUNTER += 1
            self.CUSTOM_VC_INFO[ch.id] = {
                "owner_id": first.id,
                "autoname": False,
                "locked": False,
                "allowed_ids": set(),
                "denied_ids": set(),
                "user_limit": None,
                "bitrate": None,
                "last_rename_time": None,
            }
            logger.info(
                f"[scan_existing_custom_vcs] Assigned {first.display_name} as owner "
                f"of leftover VC: {ch.name} (ID {ch.id})."
            )
            if hasattr(ch, "send"):
                try:
                    await ch.send(
                        f"{first.mention}, you're now the owner of leftover channel **{ch.name}** "
                        "after a restart. Type `!customvc` here to see available commands."
                    )
                except discord.HTTPException as e:
                    # The notice is best-effort: log it instead of silently
                    # swallowing every exception.
                    logger.warning(
                        f"[scan_existing_custom_vcs] Could not notify owner in VC {ch.id}: {e}"
                    )
#
# The subcommand logic below, referencing CUSTOM_VC_INFO
#
def get_custom_vc_for(self, ctx: commands.Context) -> discord.VoiceChannel:
    """
    Look up the tracked custom voice channel the command author is sitting in.

    Returns None when the author is not a guild member, is not connected to
    voice, or is in a channel that is not registered in CUSTOM_VC_INFO.
    """
    author = ctx.author
    if not isinstance(author, discord.Member):
        return None

    voice_state = author.voice
    channel = voice_state.channel if voice_state else None
    if channel and channel.id in self.CUSTOM_VC_INFO:
        return channel
    return None
def is_initiator_or_mod(self, ctx: commands.Context, vc_id: int) -> bool:
    """
    Tell whether the command author may manage the custom VC ``vc_id``.

    Returns False for untracked channels. Returns True when the author was
    made co-owner through the "op" subcommand (their ID is in the channel's
    "ops" set); that set was previously stored but never consulted, so
    co-owners gained nothing. Otherwise the decision is delegated to
    ``has_custom_vc_permission`` with the channel's owner ID.
    """
    info = self.CUSTOM_VC_INFO.get(vc_id)
    if not info:
        return False
    # "ops" only exists once someone has been opped on this channel.
    if ctx.author.id in info.get("ops", ()):
        return True
    return has_custom_vc_permission(ctx.author, info["owner_id"])
async def rename_channel_logic(self, ctx: commands.Context, new_name: str):
    """Rename the author's custom VC to ``new_name`` if they are allowed to manage it."""
    channel = self.get_custom_vc_for(ctx)
    if not channel:
        return await ctx.send("You are not in a custom voice channel.")

    allowed = self.is_initiator_or_mod(ctx, channel.id)
    if not allowed:
        return await ctx.send("No permission to rename.")

    audit_reason = f'{ctx.author.name} renamed the channel.'
    await channel.edit(name=new_name, reason=audit_reason)
    await ctx.send(f"Renamed channel to **{new_name}**.")
@customvc.command(name="autoname")
async def toggle_autoname(self, ctx: commands.Context):
    """
    Flip automatic renaming (based on the owner's current game) for the
    author's custom VC.

    - Turning it on renames the channel right away, ignoring the cooldown.
    - Turning it off only clears the flag; the current name is kept.
    """
    vc = self.get_custom_vc_for(ctx)
    if not vc:
        await ctx.send("You are not in a custom voice channel.")
        return
    if not self.is_initiator_or_mod(ctx, vc.id):
        await ctx.send("No permission to toggle auto-naming.")
        return

    info = self.CUSTOM_VC_INFO[vc.id]
    enable = not info["autoname"]
    info["autoname"] = enable

    if not enable:
        logger.info(f"[autoname] Disabled for VC {vc.id}.")
        await ctx.send("Auto-naming disabled. The channel name will remain as-is.")
        return

    logger.info(f"[autoname] Enabled for VC {vc.id}.")
    await ctx.send(
        "Auto-naming enabled! The channel will periodically update to reflect your current game."
    )
    # rename now, using the channel owner's presence
    owner = vc.guild.get_member(info["owner_id"])
    if owner:
        await self.update_channel_name(owner, vc, immediate=True)
def get_current_game(self, member: discord.Member) -> str | None:
    """
    Return the name of the first "playing" activity of ``member`` that has a
    non-empty name, or None when there is no such activity.

    NOTE(review): this method is defined more than once in this class with
    the same body; the last definition is the one that takes effect.
    """
    # A "game" is an activity whose type is ActivityType.playing.
    return next(
        (
            act.name
            for act in (member.activities or [])
            if act.type == discord.ActivityType.playing and act.name
        ),
        None,
    )
async def claim_channel_logic(self, ctx: commands.Context):
    """
    Transfer ownership of the author's custom VC to the author, provided
    the current owner is not in the channel.

    "owner_id" is stored as an integer ID, so it is compared against member
    IDs. (Comparing it with the member object never matched, and reading
    ``.name`` from the integer raised AttributeError whenever the owner was
    still present.)
    """
    vc = self.get_custom_vc_for(ctx)
    if not vc:
        return await ctx.send("Not in a custom VC.")
    info = self.CUSTOM_VC_INFO.get(vc.id)
    if not info:
        return await ctx.send("No memory for this channel?")

    owner_id = info["owner_id"]
    # if author is owner
    if owner_id == ctx.author.id:
        return await ctx.send("You're already the owner of this Custom VC!")

    # if old owner is still inside
    present_owner = next((m for m in vc.members if m.id == owner_id), None)
    if present_owner is not None:
        return await ctx.send(f"The current owner, {present_owner.name}, is still here!.")

    info["owner_id"] = ctx.author.id
    await ctx.send("You are now the owner of this channel!")
async def lock_channel_logic(self, ctx: commands.Context):
    """Lock the author's custom VC by denying "connect" to the guild's default role."""
    vc = self.get_custom_vc_for(ctx)
    if not vc:
        return await ctx.send("You're not in a custom VC.")

    may_manage = self.is_initiator_or_mod(ctx, vc.id)
    if not may_manage:
        return await ctx.send("No permission to lock.")

    self.CUSTOM_VC_INFO[vc.id]["locked"] = True

    # set_permissions() changes one overwrite and leaves the others alone
    everyone = ctx.guild.default_role
    audit_reason = f"{ctx.author.name} locked the Custom VC"
    await vc.set_permissions(everyone, connect=False, reason=audit_reason)
    await ctx.send("Channel locked. Only allowed users can join.")
async def allow_user_logic(self, ctx: commands.Context, user: str):
    """
    Give ``user`` an explicit "connect" permission on the author's custom VC.

    ``user`` is a mention, ID or name that is resolved with
    ``resolve_member``. The stored allow/deny sets are updated only after
    the permission change succeeded, and the member is removed from
    "denied_ids" so they are never listed as both allowed and denied.
    """
    vc = self.get_custom_vc_for(ctx)
    if not vc:
        return await ctx.send("Not in a custom VC.")
    if not self.is_initiator_or_mod(ctx, vc.id):
        return await ctx.send("No permission to allow a user.")

    mem = await self.resolve_member(ctx, user)
    if not mem:
        return await ctx.send(f"Could not find user: {user}.")

    # Set an explicit per-member overwrite instead of replacing all overwrites
    await vc.set_permissions(mem, connect=True, reason=f"{ctx.author.name} allowed {mem.display_name} into their Custom VC")

    info = self.CUSTOM_VC_INFO[vc.id]
    info["allowed_ids"].add(mem.id)
    info["denied_ids"].discard(mem.id)  # the new overwrite supersedes an earlier deny
    await ctx.send(f"Allowed **{mem.display_name}** to join.")
async def deny_user_logic(self, ctx: commands.Context, user: str):
    """
    Give ``user`` an explicit "connect" denial on the author's custom VC.

    ``user`` is a mention, ID or name that is resolved with
    ``resolve_member``. Members for whom ``has_custom_vc_permission``
    returns True (checked against the channel owner) cannot be denied. The
    stored allow/deny sets are updated only after the permission change
    succeeded, and the member is removed from "allowed_ids" so they are
    never listed as both allowed and denied.
    """
    vc = self.get_custom_vc_for(ctx)
    if not vc:
        return await ctx.send("Not in a custom VC.")
    if not self.is_initiator_or_mod(ctx, vc.id):
        return await ctx.send("No permission to deny user.")

    mem = await self.resolve_member(ctx, user)
    if not mem:
        return await ctx.send(f"Could not find user: {user}.")

    info = self.CUSTOM_VC_INFO[vc.id]
    if has_custom_vc_permission(mem, info["owner_id"]):
        return await ctx.send("Cannot deny a moderator or the owner.")

    # Explicitly deny permission
    await vc.set_permissions(mem, connect=False, reason=f"{ctx.author.name} denied {mem.display_name} from their Custom VC")

    info["denied_ids"].add(mem.id)
    info["allowed_ids"].discard(mem.id)  # the new overwrite supersedes an earlier allow
    await ctx.send(f"Denied **{mem.display_name}** from connecting.")
async def unlock_channel_logic(self, ctx: commands.Context):
    """
    Unlock the author's custom VC by replacing its permission overwrites
    with a copy of its category's overwrites.

    Because every channel-specific overwrite is replaced, per-member
    allow/deny entries stop applying; the stored "allowed_ids" and
    "denied_ids" sets are cleared so the settings display does not keep
    listing them. Stored state changes only after the edit succeeded.
    """
    vc = self.get_custom_vc_for(ctx)
    if not vc:
        return await ctx.send("Not in a custom VC.")
    if not self.is_initiator_or_mod(ctx, vc.id):
        return await ctx.send("No permission to unlock.")

    # Fetch category and its permissions
    category = vc.category
    if not category:
        return await ctx.send("Error: Could not determine category permissions.")

    # Copy category permissions
    overwrites = category.overwrites.copy()
    await vc.edit(overwrites=overwrites, reason=f'{ctx.author.name} unlocked their Custom VC')

    # Update stored channel info to match what is now in force
    info = self.CUSTOM_VC_INFO[vc.id]
    info["locked"] = False
    info["allowed_ids"].clear()
    info["denied_ids"].clear()
    await ctx.send("Unlocked the channel. Permissions now match the category default.")
async def set_user_limit_logic(self, ctx: commands.Context, limit: int):
    """
    Apply a user limit between 2 and 99 to the author's custom VC and
    remember it in the channel's stored settings.
    """
    lowest, highest = 2, 99

    vc = self.get_custom_vc_for(ctx)
    if not vc:
        return await ctx.send("Not in a custom VC.")

    # The range is validated before the permission check.
    if limit < lowest:
        return await ctx.send(f"Minimum limit is {lowest}.")
    if limit > highest:
        return await ctx.send(f"Maximum limit is {highest}.")
    if not self.is_initiator_or_mod(ctx, vc.id):
        return await ctx.send("No permission to set user limit.")

    self.CUSTOM_VC_INFO[vc.id]["user_limit"] = limit
    audit_reason = f'{ctx.author.name} changed users limit to {limit} in their Custom VC'
    await vc.edit(user_limit=limit, reason=audit_reason)
    await ctx.send(f"User limit set to {limit}.")
@customvc.command(name="audio_bitrate")
async def set_audio_bitrate(self, ctx: commands.Context, kbps: int):
    """
    Sets the audio bitrate (in kbps) for the author's custom VC.

    The accepted range is 8 kbps up to 128 kbps, lowered to the guild's own
    bitrate ceiling when that is smaller, so that a value the guild cannot
    use is rejected with a message instead of failing in the API call. The
    chosen value is stored under the channel's "bitrate" key so the
    settings display can show it.
    """
    vc = self.get_custom_vc_for(ctx)
    if not vc:
        return await ctx.send("Not in a custom VC.")
    if not self.is_initiator_or_mod(ctx, vc.id):
        return await ctx.send("No permission to set audio bitrate.")

    # Guild.bitrate_limit is expressed in bits per second.
    max_kbps = min(128, int(vc.guild.bitrate_limit) // 1000)
    if kbps < 8 or kbps > max_kbps:
        return await ctx.send(f"Invalid audio bitrate! Must be between 8 and {max_kbps} kbps.")

    await vc.edit(bitrate=kbps * 1000, reason=f"{ctx.author.name} changed audio bitrate")
    self.CUSTOM_VC_INFO[vc.id]["bitrate"] = kbps
    await ctx.send(f"Audio bitrate set to {kbps} kbps.")
@customvc.command(name="video_bitrate")
async def set_video_bitrate(self, ctx: commands.Context, kbps: int):
    """
    Handle ``!customvc video_bitrate <kbps>`` for the author's custom VC.

    A voice channel's ``bitrate`` setting is its AUDIO bitrate; there is no
    per-channel video bitrate to set. Passing ``kbps * 1000`` as ``bitrate``
    (as this command used to) therefore overwrote the audio bitrate with a
    value of 100,000 bps or more. The command now only switches the
    channel's video quality mode to "full" and tells the user that the
    requested number could not be applied.
    """
    vc = self.get_custom_vc_for(ctx)
    if not vc:
        return await ctx.send("Not in a custom VC.")
    if not self.is_initiator_or_mod(ctx, vc.id):
        return await ctx.send("No permission to set video bitrate.")
    if kbps < 100 or kbps > 8000:
        return await ctx.send("Invalid video bitrate! Must be between 100 and 8000 kbps.")

    await vc.edit(
        video_quality_mode=discord.VideoQualityMode.full,
        reason=f"{ctx.author.name} changed video bitrate",
    )
    await ctx.send(
        "Video quality mode set to full. Discord does not expose a per-channel "
        f"video bitrate, so the requested {kbps} kbps could not be applied."
    )
async def set_status_logic(self, ctx: commands.Context, status_str: str):
    """
    Store ``status_str`` under the "status" key of the author's custom VC.

    Placeholder: only the stored settings change; nothing is sent to
    Discord apart from the confirmation message.
    """
    vc = self.get_custom_vc_for(ctx)
    if not vc:
        return await ctx.send("Not in a custom VC.")

    permitted = self.is_initiator_or_mod(ctx, vc.id)
    if not permitted:
        return await ctx.send("No permission to set channel status.")

    self.CUSTOM_VC_INFO[vc.id]["status"] = status_str
    await ctx.send(f"Channel status set to: **{status_str}** (placeholder).")
async def update_channel_name(
    self, member: discord.Member, vc: discord.VoiceChannel, immediate: bool = False
):
    """
    Rename ``vc`` to the name ``get_desired_name`` yields for ``member``.

    Nothing happens when the channel already has that name or is not
    tracked. With ``immediate=True`` the rename cooldown is ignored (used
    when auto-naming is switched on); otherwise the rename is performed only
    if the channel was never renamed or ``rename_cooldown_seconds`` have
    elapsed since the last rename.
    """
    if not (vc and member):
        return

    desired_name = self.get_desired_name(member)
    current_name = vc.name
    if current_name == desired_name:
        logger.debug(
            f"[update_channel_name] VC {vc.id} already named '{current_name}', no change needed."
        )
        return

    info = self.CUSTOM_VC_INFO.get(vc.id)
    if not info:
        return  # safety check

    now = datetime.datetime.utcnow()
    last_rename = info.get("last_rename_time")

    # Pick the audit-log reason for the situation; bail out while cooling down.
    if immediate:
        logger.debug(
            f"[update_channel_name] Doing an immediate rename of VC {vc.id} to '{desired_name}'."
        )
        reason = "Manual immediate rename (autoname toggled on)."
    elif last_rename is None:
        logger.debug(
            f"[update_channel_name] VC {vc.id}: first rename => '{current_name}' -> '{desired_name}'."
        )
        reason = "Auto-naming initial rename."
    else:
        diff = (now - last_rename).total_seconds()
        if diff < self.rename_cooldown_seconds:
            logger.debug(
                f"[update_channel_name] Skipping rename for VC {vc.id}, only {int(diff)}s "
                f"since last rename (cooldown {self.rename_cooldown_seconds}s)."
            )
            return
        logger.debug(
            f"[update_channel_name] VC {vc.id} rename from '{current_name}' -> '{desired_name}'"
            f"(cooldown {int(diff)}s >= {self.rename_cooldown_seconds})."
        )
        reason = "Auto-naming update."

    await vc.edit(name=desired_name, reason=reason)
    info["last_rename_time"] = now
async def op_user_logic(self, ctx: commands.Context, user: str):
    """
    Record ``user`` as co-owner of the author's custom VC.

    The member's ID is added to the channel's "ops" set, which is created
    on first use.
    """
    vc = self.get_custom_vc_for(ctx)
    if not vc:
        return await ctx.send("Not in a custom VC.")
    if not self.is_initiator_or_mod(ctx, vc.id):
        return await ctx.send("No permission to op someone.")

    target = await self.resolve_member(ctx, user)
    if not target:
        return await ctx.send(f"Could not find user: {user}.")

    channel_info = self.CUSTOM_VC_INFO[vc.id]
    channel_info.setdefault("ops", set()).add(target.id)
    await ctx.send(f"Granted co-ownership to {target.display_name}.")
async def show_settings_logic(self, ctx: commands.Context):
    """Send a summary of the stored settings of the author's custom VC."""
    vc = self.get_custom_vc_for(ctx)
    if not vc:
        return await ctx.send("Not in a custom VC.")
    info = self.CUSTOM_VC_INFO.get(vc.id)
    if not info:
        return await ctx.send("No data found for this channel?")

    def yes_no(flag):
        return "Yes" if flag else "No"

    def id_list(ids):
        # IDs are shown as-is; an empty set gets a placeholder.
        return ", ".join(map(str, ids)) if ids else "None specified"

    owner_id = info["owner_id"]
    owner = ctx.guild.get_member(owner_id)
    owner_str = owner.display_name if owner else f"<ID {owner_id}>"

    lines = [
        "**Channel Settings**",
        f"Owner: {owner_str}",
        f"Auto rename: {yes_no(info['autoname'])}",
        f"Locked: {yes_no(info['locked'])}",
        f"User Limit: {info['user_limit'] or 'Default'}",
        f"Bitrate: {info['bitrate'] or 'Default (64 kbps)'}",
        f"Status: {info.get('status', 'None')}",
        f"Allowed: {id_list(info['allowed_ids'])}",
        f"Denied: {id_list(info['denied_ids'])}",
    ]
    await ctx.send("\n".join(lines) + "\n")
async def resolve_member(self, ctx: commands.Context, user_str: str) -> discord.Member | None:
    """
    Resolve ``user_str`` to a member of the guild the command was used in.

    Accepted forms, tried in this order:
    - a user mention (``<@id>`` or ``<@!id>``)
    - a bare numeric ID
    - an exact, case-insensitive match on the member's name or display name

    Returns None when nothing matches or the command was not used in a guild.

    The method takes ``self`` because every caller invokes it as
    ``self.resolve_member(ctx, user)``; without it those calls failed before
    any lookup happened.
    """
    guild = ctx.guild
    if guild is None:
        return None

    user_str = user_str.strip()
    if user_str.startswith("<@") and user_str.endswith(">"):
        uid = user_str.strip("<@!>")
        # isdecimal(): only characters that int() is able to parse
        if uid.isdecimal():
            return guild.get_member(int(uid))
    elif user_str.isdecimal():
        return guild.get_member(int(user_str))

    lower = user_str.lower()
    for m in guild.members:
        if m.name.lower() == lower or m.display_name.lower() == lower:
            return m
    return None
async def setup(bot):
    """Extension entry point: create a ``CustomVCCog`` for ``bot`` and register it."""
    await bot.add_cog(CustomVCCog(bot))