OokamiPupV2/bot_discord.py

481 lines
22 KiB
Python

# bot_discord.py
import discord
from discord import app_commands
from discord.ext import commands, tasks
import importlib
import cmd_discord
import json
import globals
import modules
import modules.utility
from modules.db import log_message, lookup_user, log_bot_event
primary_guild = globals.constants.primary_discord_guild()
class DiscordBot(commands.Bot):
def __init__(self):
super().__init__(command_prefix="!", intents=discord.Intents.all())
self.remove_command("help") # Remove built-in help function
self.config = globals.constants.config_data
self.log = globals.log # Use the logging function from bots.py
self.db_conn = None # We'll set this later
self.help_data = None # We'll set this later
self.load_commands()
globals.log("Discord bot initiated")
# async def sync_slash_commands(self):
# """Syncs slash commands for the bot."""
# await self.wait_until_ready()
# try:
# await self.tree.sync()
# primary_guild = discord.Object(id=int(self.config["discord_guilds"][0]))
# await self.tree.sync(guild=primary_guild)
# self.log("Discord slash commands synced.")
# except Exception as e:
# self.log(f"Unable to sync Discord slash commands: {e}", "ERROR")
def set_db_connection(self, db_conn):
"""
Store the DB connection in the bot so commands can use it.
"""
self.db_conn = db_conn
def load_commands(self):
"""
Load all commands from cmd_discord.py
"""
try:
importlib.reload(cmd_discord) # Reload the commands file
cmd_discord.setup(self) # Ensure commands are registered
globals.log("Discord commands loaded successfully.")
# Load help info
help_json_path = "dictionary/help_discord.json"
modules.utility.initialize_help_data(
bot=self,
help_json_path=help_json_path,
is_discord=True
)
except Exception as e:
_result = f"Error loading Discord commands: {e}"
globals.log(_result, "ERROR", True)
@commands.command(name="cmd_reload")
@commands.is_owner()
async def cmd_reload(self, ctx: commands.Context):
try:
importlib.reload(cmd_discord) # Reload the commands file
cmd_discord.setup(self) # Ensure commands are registered
# Load help info
help_json_path = "dictionary/help_discord.json"
modules.utility.initialize_help_data(
bot=self,
help_json_path=help_json_path,
is_discord=True
)
_result = "Commands reloaded successfully"
globals.log("Discord commands reloaded successfully.")
except Exception as e:
_result = f"Error reloading Discord commands: {e}"
globals.log(_result, "ERROR", True)
await ctx.reply(_result)
async def on_message(self, message):
if message.guild:
guild_name = message.guild.name
channel_name = message.channel.name
else:
guild_name = "DM"
channel_name = "Direct Message"
globals.log(f"Message detected by '{message.author.name}' in '{guild_name}' - #'{channel_name}'", "DEBUG")
globals.log(f"Message content: '{message.content}'", "DEBUG")
globals.log(f"Attempting UUI lookup on '{message.author.name}' ...", "DEBUG")
try:
# If it's a bot message, ignore or pass user_is_bot=True
is_bot = message.author.bot
user_id = str(message.author.id)
user_name = message.author.name # no discriminator
display_name = message.author.display_name
# Track user activity first
modules.utility.track_user_activity(
db_conn=self.db_conn,
platform="discord",
user_id=user_id,
username=user_name,
display_name=display_name,
user_is_bot=is_bot
)
# Let log_message() handle UUID lookup internally
platform_str = f"discord-{guild_name}"
channel_str = channel_name
attachments = ", ".join(a.url for a in message.attachments) if message.attachments else ""
log_message(
db_conn=self.db_conn,
identifier=user_id,
identifier_type="discord_user_id",
message_content=message.content or "",
platform=platform_str,
channel=channel_str,
attachments=attachments
)
except Exception as e:
globals.log(f"... UUI lookup failed: {e}", "WARNING")
pass
try:
# Pass message contents to commands processing
await self.process_commands(message)
globals.log(f"Command processing complete", "DEBUG")
except Exception as e:
globals.log(f"Command processing failed: {e}", "ERROR")
def load_bot_settings(self):
"""Loads bot activity settings from JSON file."""
try:
with open("settings/discord_bot_settings.json", "r") as file:
return json.load(file)
except Exception as e:
self.log(f"Failed to load settings: {e}", "ERROR")
return {
"activity_mode": 0,
"static_activity": {"type": "Playing", "name": "with my commands!"},
"rotating_activities": [],
"dynamic_activities": {},
"rotation_interval": 600
}
async def on_command(self, ctx):
"""Logs every command execution at DEBUG level."""
_cmd_args = str(ctx.message.content).split(" ")[1:]
channel_name = "Direct Message" if "Direct Message with" in str(ctx.channel) else ctx.channel
globals.log(f"Command '{ctx.command}' (Discord) initiated by {ctx.author} in #{channel_name}", "DEBUG")
if len(_cmd_args) > 1: globals.log(f"!{ctx.command} arguments: {_cmd_args}", "DEBUG")
async def on_interaction(interaction: discord.Interaction):
# Only log application command (slash command) interactions.
if interaction.type == discord.InteractionType.application_command:
# Get the command name from the interaction data.
command_name = interaction.data.get("name")
# Get the options (arguments) if any.
options = interaction.data.get("options", [])
# Convert options to a list of values or key-value pairs.
option_values = [f'{opt.get("name")}: {opt.get("value")}' for opt in options]
# Determine the channel name (or DM).
if interaction.channel and hasattr(interaction.channel, "name"):
channel_name = interaction.channel.name
else:
channel_name = "Direct Message"
globals.log(
f"Command '{command_name}' (Discord) initiated by {interaction.user} in #{channel_name}",
"DEBUG"
)
if option_values:
globals.log(f"Command '{command_name}' arguments: {option_values}", "DEBUG")
async def on_ready(self):
"""Runs when the bot successfully logs in."""
# Load activity settings
self.settings = self.load_bot_settings()
# Set initial bot activity
await self.update_activity()
# Sync Slash Commands
try:
# Sync slash commands globally
#await self.tree.sync()
#globals.log("Discord slash commands synced.")
num_guilds = len(self.config["discord_guilds"])
cmd_tree_result = (await self.tree.sync(guild=primary_guild["object"]))
command_names = [command.name for command in cmd_tree_result] if cmd_tree_result else None
if primary_guild["id"]:
try:
guild_info = await modules.utility.get_guild_info(self, primary_guild["id"])
primary_guild_name = guild_info["name"]
except Exception as e:
primary_guild_name = f"{primary_guild["id"]}"
globals.log(f"Guild lookup failed: {e}", "ERROR")
_log_message = f"{num_guilds} guilds (global)" if num_guilds > 1 else f"guild: {primary_guild_name}"
globals.log(f"Discord slash commands force synced to {_log_message}")
globals.log(f"Discord slash commands that got synced: {command_names}")
else:
globals.log("Discord commands synced globally.")
except Exception as e:
globals.log(f"Unable to sync Discord slash commands: {e}")
# Log successful bot startup
globals.log(f"Discord bot is online as {self.user}")
log_bot_event(self.db_conn, "DISCORD_RECONNECTED", "Discord bot logged in.")
async def on_disconnect(self):
globals.log("Discord bot has lost connection!", "WARNING")
log_bot_event(self.db_conn, "DISCORD_DISCONNECTED", "Discord bot lost connection.")
async def update_activity(self):
"""Sets the bot's activity based on settings."""
mode = self.settings["activity_mode"]
if mode == 0:
await self.change_presence(activity=None)
self.log("Activity disabled", "DEBUG")
elif mode == 1:
# Set static activity
activity_data = self.settings["static_activity"]
activity = self.get_activity(activity_data["type"], activity_data["name"])
await self.change_presence(activity=activity)
self.log(f"Static activity set: {activity_data['type']} {activity_data['name']}", "DEBUG")
elif mode == 2:
# Start rotating activity task
if not self.change_rotating_activity.is_running():
self.change_rotating_activity.start()
self.log("Rotating activity mode enabled", "DEBUG")
elif mode == 3:
# Check for dynamic activity
await self.set_dynamic_activity()
@tasks.loop(seconds=600) # Default to 10 minutes
async def change_rotating_activity(self):
"""Rotates activities every set interval."""
activities = self.settings["rotating_activities"]
if not activities:
return
# Pick the next activity
activity_data = activities.pop(0)
activities.append(activity_data) # Move to the end of the list
activity = self.get_activity(activity_data["type"], activity_data["name"])
await self.change_presence(activity=activity)
self.log(f"Rotating activity: {activity_data['type']} {activity_data['name']}", "DEBUG")
async def set_dynamic_activity(self):
"""Sets a dynamic activity based on external conditions."""
twitch_live = await modules.utility.is_channel_live(self)
if twitch_live:
activity_data = self.settings["dynamic_activities"]["twitch_live"]
else:
# activity_data = self.settings["dynamic_activities"].get("default_idle", None)
if not self.change_rotating_activity.is_running():
self.change_rotating_activity.start()
if activity_data:
activity = self.get_activity(activity_data["type"], activity_data["name"], activity_data.get("url"))
await self.change_presence(activity=activity)
self.log(f"Dynamic activity set: {activity_data['type']} {activity_data['name']}", "DEBUG")
def get_activity(self, activity_type, name, url=None):
"""Returns a discord activity object based on type, including support for Custom Status."""
activity_map = {
"Playing": discord.Game(name=name),
"Streaming": discord.Streaming(name=name, url=url or "https://twitch.tv/OokamiKunTV"),
"Listening": discord.Activity(type=discord.ActivityType.listening, name=name),
"Watching": discord.Activity(type=discord.ActivityType.watching, name=name),
"Custom": discord.CustomActivity(name=name)
}
return activity_map.get(activity_type, discord.Game(name="around in Discord"))
async def on_voice_state_update(self, member, before, after):
"""
Tracks user joins, leaves, mutes, deafens, streams, and voice channel moves.
"""
guild_id = str(member.guild.id)
discord_user_id = str(member.id)
voice_channel = after.channel.name if after.channel else before.channel.name if before.channel else None
# Ensure user exists in the UUI system
user_uuid = modules.db.lookup_user(self.db_conn, identifier=discord_user_id, identifier_type="discord_user_id", target_identifier="UUID")
if not user_uuid:
globals.log(f"User {member.name} ({discord_user_id}) not found in 'users'. Attempting to add...", "WARNING")
modules.utility.track_user_activity(
db_conn=self.db_conn,
platform="discord",
user_id=discord_user_id,
username=member.name,
display_name=member.display_name,
user_is_bot=member.bot
)
user_uuid= modules.db.lookup_user(self.db_conn, identifier=discord_user_id, identifier_type="discord_user_id", target_identifier="UUID")
if not user_uuid:
globals.log(f"ERROR: Failed to associate {member.name} ({discord_user_id}) with a UUID. Skipping activity log.", "ERROR")
return # Prevent logging with invalid UUID
# Detect join and leave events
if before.channel is None and after.channel is not None:
modules.db.log_discord_activity(self.db_conn, guild_id, user_uuid, "JOIN", after.channel.name)
elif before.channel is not None and after.channel is None:
modules.db.log_discord_activity(self.db_conn, guild_id, user_uuid, "LEAVE", before.channel.name)
# Detect VC moves (self/moved)
if before.channel and after.channel and before.channel != after.channel:
move_detail = f"{before.channel.name} -> {after.channel.name}"
modules.db.log_discord_activity(self.db_conn, guild_id, user_uuid, "VC_MOVE", after.channel.name, move_detail)
# Detect mute/unmute
if before.self_mute != after.self_mute:
mute_action = "MUTE" if after.self_mute else "UNMUTE"
modules.db.log_discord_activity(self.db_conn, guild_id, user_uuid, mute_action, voice_channel)
# Detect deafen/undeafen
if before.self_deaf != after.self_deaf:
deaf_action = "DEAFEN" if after.self_deaf else "UNDEAFEN"
modules.db.log_discord_activity(self.db_conn, guild_id, user_uuid, deaf_action, voice_channel)
# Detect streaming
if before.self_stream != after.self_stream:
stream_action = "STREAM_START" if after.self_stream else "STREAM_STOP"
modules.db.log_discord_activity(self.db_conn, guild_id, user_uuid, stream_action, voice_channel)
# Detect camera usage
if before.self_video != after.self_video:
camera_action = "CAMERA_ON" if after.self_video else "CAMERA_OFF"
modules.db.log_discord_activity(self.db_conn, guild_id, user_uuid, camera_action, voice_channel)
async def on_presence_update(self, before, after):
"""
Detects when a user starts or stops a game, Spotify, or Discord activity.
Ensures the activity is logged using the correct UUID from the UUI system.
"""
if not after.guild: # Ensure it's in a guild (server)
return
guild_id = str(after.guild.id)
discord_user_id = str(after.id)
# Ensure user exists in the UUI system
user_uuid = modules.db.lookup_user(
self.db_conn,
identifier=discord_user_id,
identifier_type="discord_user_id",
target_identifier="UUID"
)
if not user_uuid:
globals.log(f"User {after.name} ({discord_user_id}) not found in 'users'. Attempting to add...", "WARNING")
modules.utility.track_user_activity(
db_conn=self.db_conn,
platform="discord",
user_id=discord_user_id,
username=after.name,
display_name=after.display_name,
user_is_bot=after.bot
)
user_uuid = modules.db.lookup_user(self.db_conn, identifier=discord_user_id, identifier_type="discord_user_id", target_identifier="UUID")
if not user_uuid:
globals.log(f"ERROR: Failed to associate {after.name} ({discord_user_id}) with a UUID. Skipping activity log.", "ERROR")
return
# Check all activities
new_activity = None
for n_activity in after.activities:
if isinstance(n_activity, discord.Game):
new_activity = ("GAME_START", n_activity.name)
elif isinstance(n_activity, discord.Spotify):
# Get artist name(s) and format as "{artist_name} - {song_title}"
artist_name = ", ".join(n_activity.artists)
song_name = n_activity.title
spotify_detail = f"{artist_name} - {song_name}"
new_activity = ("LISTENING_SPOTIFY", spotify_detail)
elif isinstance(n_activity, discord.Streaming):
new_activity = ("STREAM_START", n_activity.game or "Sharing screen")
# Check all activities
old_activity = None
for o_activity in before.activities:
if isinstance(o_activity, discord.Game):
old_activity = ("GAME_STOP", o_activity.name)
# IGNORE OLD SPOTIFY EVENTS
elif isinstance(o_activity, discord.Streaming):
old_activity = ("STREAM_STOP", o_activity.game or "Sharing screen")
if new_activity:
modules.db.log_discord_activity(self.db_conn, guild_id, user_uuid, new_activity[0], None, new_activity[1])
if old_activity:
modules.db.log_discord_activity(self.db_conn, guild_id, user_uuid, old_activity[0], None, old_activity[1])
# async def start_account_linking(self, interaction: discord.Interaction):
# """Starts the linking process by generating a link code and displaying instructions."""
# user_id = str(interaction.user.id)
# # Check if the user already has a linked account
# user_data = modules.db.lookup_user(self.db_conn, self.log, identifier=user_id, identifier_type="discord_user_id")
# if user_data and user_data["twitch_user_id"]:
# link_date = user_data["datetime_linked"]
# await interaction.response.send_message(
# f"Your Discord account is already linked to Twitch user **{user_data['twitch_user_display_name']}** "
# f"(linked on {link_date}). You must remove the link before linking another account.", ephemeral=True)
# return
# # Generate a unique link code
# link_code = modules.utility.generate_link_code()
# modules.db.run_db_operation(
# self.db_conn, "write",
# "INSERT INTO link_codes (DISCORD_USER_ID, LINK_CODE) VALUES (?, ?)",
# (user_id, link_code), self.log
# )
# # Show the user the link modal
# await interaction.response.send_message(
# f"To link your Twitch account, post the following message in Twitch chat:\n"
# f"`!acc_link {link_code}`\n\n"
# f"Then, return here and click 'Done'.", ephemeral=True
# )
# async def finalize_account_linking(self, interaction: discord.Interaction):
# """Finalizes the linking process by merging duplicate UUIDs."""
# from modules import db
# user_id = str(interaction.user.id)
# # Fetch the updated user info
# user_data = modules.db.lookup_user(self.db_conn, self.log, identifier=user_id, identifier_type="discord_user_id")
# if not user_data or not user_data["twitch_user_id"]:
# await interaction.response.send_message(
# "No linked Twitch account found. Please complete the linking process first.", ephemeral=True)
# return
# discord_uuid = user_data["UUID"]
# twitch_uuid = modules.db.lookup_user(self.db_conn, self.log, identifier=user_data["twitch_user_id"], identifier_type="twitch_user_id")["UUID"]
# if discord_uuid == twitch_uuid:
# await interaction.response.send_message("Your accounts are already fully linked.", ephemeral=True)
# return
# # Merge all records from `twitch_uuid` into `discord_uuid`
# db.merge_uuid_data(self.db_conn, self.log, old_uuid=twitch_uuid, new_uuid=discord_uuid)
# # Delete the old Twitch UUID entry
# db.run_db_operation(self.db_conn, "write", "DELETE FROM users WHERE UUID = ?", (twitch_uuid,), self.log)
# # Confirm the final linking
# await interaction.response.send_message("Your Twitch and Discord accounts are now fully linked.", ephemeral=True)
async def run(self, token):
try:
await super().start(token)
except Exception as e:
globals.log(f"Discord bot error: {e}", "CRITICAL")