Finalized database restructure, fixed Twitch auth issue

- Ironed out some issues with registering certain events into the database
- Fixed the old Twitch re-auth token issue. Now automatically renews it and initiates the bot with it.

These issues has been pestering the project a while now as they required some workarounds. Should work smoother from here on out! (I hope ...)
kami_dev
Kami 2025-03-01 23:14:10 +01:00
parent d0313a6a92
commit 50617ef9ab
6 changed files with 251 additions and 103 deletions

View File

@ -10,7 +10,7 @@ import globals
import modules
import modules.utility
from modules.db import log_message, lookup_user, log_bot_event
from modules.db import log_message, lookup_user, log_bot_event, log_discord_activity
primary_guild = globals.constants.primary_discord_guild()
@ -231,41 +231,79 @@ class DiscordBot(commands.Bot):
async def update_activity(self):
"""Sets the bot's activity based on settings."""
mode = self.settings["activity_mode"]
mode = self.settings.get("activity_mode", 0)
# Stop rotating activity loop if it's running
if self.change_rotating_activity.is_running():
self.change_rotating_activity.stop()
if mode == 0:
# Disable activity
await self.change_presence(activity=None)
self.log("Activity disabled", "DEBUG")
elif mode == 1:
# Set static activity
activity_data = self.settings["static_activity"]
activity = self.get_activity(activity_data["type"], activity_data["name"])
await self.change_presence(activity=activity)
self.log(f"Static activity set: {activity_data['type']} {activity_data['name']}", "DEBUG")
# Static activity
activity_data = self.settings.get("static_activity", {})
if activity_data:
activity = self.get_activity(activity_data.get("type"), activity_data.get("name"))
await self.change_presence(activity=activity)
self.log(f"Static activity set: {activity_data['type']} {activity_data['name']}", "DEBUG")
else:
await self.change_presence(activity=None)
self.log("No static activity defined", "DEBUG")
elif mode == 2:
# Start rotating activity task
if not self.change_rotating_activity.is_running():
# Rotating activity
activities = self.settings.get("rotating_activities", [])
if activities:
self.change_rotating_activity.change_interval(seconds=self.settings.get("rotation_interval", 300))
self.change_rotating_activity.start()
self.log("Rotating activity mode enabled", "DEBUG")
self.log("Rotating activity mode enabled", "DEBUG")
else:
self.log("No rotating activities defined, falling back to static.", "INFO")
await self.update_activity_static()
elif mode == 3:
# Check for dynamic activity
await self.set_dynamic_activity()
# Dynamic activity with fallback
if not await self.set_dynamic_activity():
self.log("Dynamic activity unavailable, falling back.", "INFO")
# Fallback to rotating or static
if self.settings.get("rotating_activities"):
self.change_rotating_activity.start()
self.log("Falling back to rotating activity.", "DEBUG")
else:
await self.update_activity_static()
@tasks.loop(seconds=600) # Default to 10 minutes
else:
self.log("Invalid activity mode, defaulting to disabled.", "WARNING")
await self.change_presence(activity=None)
async def update_activity_static(self):
"""Fallback to static activity if available."""
activity_data = self.settings.get("static_activity", {})
if activity_data:
activity = self.get_activity(activity_data.get("type"), activity_data.get("name"))
await self.change_presence(activity=activity)
self.log(f"Static activity set: {activity_data['type']} {activity_data['name']}", "DEBUG")
else:
await self.change_presence(activity=None)
self.log("No static activity defined, activity disabled.", "DEBUG")
@tasks.loop(seconds=300) # Default to 5 minutes
async def change_rotating_activity(self):
"""Rotates activities every set interval."""
activities = self.settings["rotating_activities"]
activities = self.settings.get("rotating_activities", [])
if not activities:
self.log("No rotating activities available, stopping rotation.", "INFO")
self.change_rotating_activity.stop()
return
# Pick the next activity
# Rotate activity
activity_data = activities.pop(0)
activities.append(activity_data) # Move to the end of the list
activity = self.get_activity(activity_data["type"], activity_data["name"])
activity = self.get_activity(activity_data.get("type"), activity_data.get("name"))
await self.change_presence(activity=activity)
self.log(f"Rotating activity: {activity_data['type']} {activity_data['name']}", "DEBUG")
@ -274,16 +312,17 @@ class DiscordBot(commands.Bot):
twitch_live = await modules.utility.is_channel_live(self)
if twitch_live:
activity_data = self.settings["dynamic_activities"]["twitch_live"]
activity_data = self.settings["dynamic_activities"].get("twitch_live")
else:
# activity_data = self.settings["dynamic_activities"].get("default_idle", None)
if not self.change_rotating_activity.is_running():
self.change_rotating_activity.start()
activity_data = self.settings["dynamic_activities"].get("default_idle")
if activity_data:
activity = self.get_activity(activity_data["type"], activity_data["name"], activity_data.get("url"))
activity = self.get_activity(activity_data.get("type"), activity_data.get("name"), activity_data.get("url"))
await self.change_presence(activity=activity)
self.log(f"Dynamic activity set: {activity_data['type']} {activity_data['name']}", "DEBUG")
return True # Dynamic activity was set
return False # No dynamic activity available
def get_activity(self, activity_type, name, url=None):
"""Returns a discord activity object based on type, including support for Custom Status."""
@ -296,6 +335,7 @@ class DiscordBot(commands.Bot):
}
return activity_map.get(activity_type, discord.Game(name="around in Discord"))
async def on_voice_state_update(self, member, before, after):
"""
Tracks user joins, leaves, mutes, deafens, streams, and voice channel moves.
@ -308,7 +348,7 @@ class DiscordBot(commands.Bot):
user_uuid = modules.db.lookup_user(self.db_conn, identifier=discord_user_id, identifier_type="discord_user_id", target_identifier="UUID")
if not user_uuid:
globals.log(f"User {member.name} ({discord_user_id}) not found in 'users'. Attempting to add...", "WARNING")
globals.log(f"User {member.name} ({discord_user_id}) not found in 'users'. Attempting to add...", "INFO")
modules.utility.track_user_activity(
db_conn=self.db_conn,
platform="discord",
@ -319,39 +359,41 @@ class DiscordBot(commands.Bot):
)
user_uuid= modules.db.lookup_user(self.db_conn, identifier=discord_user_id, identifier_type="discord_user_id", target_identifier="UUID")
if not user_uuid:
globals.log(f"ERROR: Failed to associate {member.name} ({discord_user_id}) with a UUID. Skipping activity log.", "ERROR")
globals.log(f"Failed to associate {member.name} ({discord_user_id}) with a UUID. Skipping activity log.", "WARNING")
return # Prevent logging with invalid UUID
if user_uuid:
globals.log(f"Successfully added {member.name} ({discord_user_id}) to the UUI database.", "INFO")
# Detect join and leave events
if before.channel is None and after.channel is not None:
modules.db.log_discord_activity(self.db_conn, guild_id, user_uuid, "JOIN", after.channel.name)
modules.db.log_discord_activity(self.db_conn, guild_id, discord_user_id, "JOIN", after.channel.name)
elif before.channel is not None and after.channel is None:
modules.db.log_discord_activity(self.db_conn, guild_id, user_uuid, "LEAVE", before.channel.name)
modules.db.log_discord_activity(self.db_conn, guild_id, discord_user_id, "LEAVE", before.channel.name)
# Detect VC moves (self/moved)
if before.channel and after.channel and before.channel != after.channel:
move_detail = f"{before.channel.name} -> {after.channel.name}"
modules.db.log_discord_activity(self.db_conn, guild_id, user_uuid, "VC_MOVE", after.channel.name, move_detail)
modules.db.log_discord_activity(self.db_conn, guild_id, discord_user_id, "VC_MOVE", after.channel.name, move_detail)
# Detect mute/unmute
if before.self_mute != after.self_mute:
mute_action = "MUTE" if after.self_mute else "UNMUTE"
modules.db.log_discord_activity(self.db_conn, guild_id, user_uuid, mute_action, voice_channel)
modules.db.log_discord_activity(self.db_conn, guild_id, discord_user_id, mute_action, voice_channel)
# Detect deafen/undeafen
if before.self_deaf != after.self_deaf:
deaf_action = "DEAFEN" if after.self_deaf else "UNDEAFEN"
modules.db.log_discord_activity(self.db_conn, guild_id, user_uuid, deaf_action, voice_channel)
modules.db.log_discord_activity(self.db_conn, guild_id, discord_user_id, deaf_action, voice_channel)
# Detect streaming
if before.self_stream != after.self_stream:
stream_action = "STREAM_START" if after.self_stream else "STREAM_STOP"
modules.db.log_discord_activity(self.db_conn, guild_id, user_uuid, stream_action, voice_channel)
modules.db.log_discord_activity(self.db_conn, guild_id, discord_user_id, stream_action, voice_channel)
# Detect camera usage
if before.self_video != after.self_video:
camera_action = "CAMERA_ON" if after.self_video else "CAMERA_OFF"
modules.db.log_discord_activity(self.db_conn, guild_id, user_uuid, camera_action, voice_channel)
modules.db.log_discord_activity(self.db_conn, guild_id, discord_user_id, camera_action, voice_channel)
async def on_presence_update(self, before, after):
@ -362,6 +404,10 @@ class DiscordBot(commands.Bot):
if not after.guild: # Ensure it's in a guild (server)
return
if before.activities == after.activities and before.status == after.status:
# No real changes, skip
return
guild_id = str(after.guild.id)
discord_user_id = str(after.id)
@ -387,6 +433,8 @@ class DiscordBot(commands.Bot):
if not user_uuid:
globals.log(f"ERROR: Failed to associate {after.name} ({discord_user_id}) with a UUID. Skipping activity log.", "ERROR")
return
if user_uuid:
globals.log(f"Successfully added {after.name} ({discord_user_id}) to the UUI database.", "INFO")
# Check all activities
new_activity = None
@ -412,9 +460,9 @@ class DiscordBot(commands.Bot):
old_activity = ("STREAM_STOP", o_activity.game or "Sharing screen")
if new_activity:
modules.db.log_discord_activity(self.db_conn, guild_id, user_uuid, new_activity[0], None, new_activity[1])
modules.db.log_discord_activity(self.db_conn, guild_id, discord_user_id, new_activity[0], None, new_activity[1])
if old_activity:
modules.db.log_discord_activity(self.db_conn, guild_id, user_uuid, old_activity[0], None, old_activity[1])
modules.db.log_discord_activity(self.db_conn, guild_id, discord_user_id, old_activity[0], None, old_activity[1])
# async def start_account_linking(self, interaction: discord.Interaction):
# """Starts the linking process by generating a link code and displaying instructions."""

View File

@ -22,7 +22,7 @@ class TwitchBot(commands.Bot):
self.refresh_token = os.getenv("TWITCH_REFRESH_TOKEN")
self.log = globals.log # Use the logging function from bots.py
self.config = globals.constants.config_data
self.db_conn = None # We'll set this
self.db_conn = None # We'll set this later
self.help_data = None # We'll set this later
# 1) Initialize the parent Bot FIRST
@ -89,7 +89,8 @@ class TwitchBot(commands.Bot):
from modules.db import log_message
log_message(
db_conn=self.db_conn,
user_uuid=user_uuid,
identifier=str(message.author.id),
identifier_type="twitch_user_id",
message_content=message.content or "",
platform="twitch",
channel=message.channel.name,
@ -114,12 +115,13 @@ class TwitchBot(commands.Bot):
globals.log("Twitch bot has lost connection!", "WARNING")
log_bot_event(self.db_conn, "TWITCH_DISCONNECTED", "Twitch bot lost connection.")
async def refresh_access_token(self):
async def refresh_access_token(self, automatic=False):
"""
Refreshes the Twitch access token using the stored refresh token.
Retries up to 3 times before logging a fatal error.
Refresh the Twitch access token using the stored refresh token.
If 'automatic' is True, do NOT shut down the bot or require manual restart.
Return True if success, False if not.
"""
globals.log("Attempting to refresh Twitch token...")
self.log("Attempting to refresh Twitch token...")
url = "https://id.twitch.tv/oauth2/token"
params = {
@ -129,39 +131,108 @@ class TwitchBot(commands.Bot):
"grant_type": "refresh_token"
}
for attempt in range(3): # Attempt up to 3 times
try:
response = requests.post(url, params=params)
data = response.json()
try:
response = requests.post(url, params=params)
data = response.json()
self.log(f"Twitch token response: {data}", "DEBUG")
if "access_token" in data:
self.token = data["access_token"]
self.refresh_token = data.get("refresh_token", self.refresh_token)
if "access_token" in data:
self.token = data["access_token"]
self.refresh_token = data.get("refresh_token", self.refresh_token)
os.environ["TWITCH_BOT_TOKEN"] = self.token
os.environ["TWITCH_REFRESH_TOKEN"] = self.refresh_token
self.update_env_file()
os.environ["TWITCH_BOT_TOKEN"] = self.token
os.environ["TWITCH_REFRESH_TOKEN"] = self.refresh_token
self.update_env_file()
# Validate newly refreshed token:
if not await self.validate_token():
self.log("New token is still invalid, re-auth required.", "CRITICAL")
if not automatic:
await self.prompt_manual_token()
return False
globals.log("Twitch token refreshed successfully. Restarting bot...")
self.log("Twitch token refreshed successfully.")
return True
# Restart the TwitchIO connection
try:
await self.close() # Close the old connection
except Exception as e:
globals.log(f"refresh_access_token() failed during close attempt: {e}", "WARNING")
await self.start() # Restart with the new token
return # Exit function after successful refresh
else:
globals.log(f"Twitch token refresh failed (Attempt {attempt+1}/3): {data}", "WARNING")
elif "error" in data and data["error"] == "invalid_grant":
self.log("Refresh token is invalid or expired; manual re-auth required.", "CRITICAL")
if not automatic:
await self.prompt_manual_token()
return False
else:
self.log(f"Unexpected refresh response: {data}", "ERROR")
if not automatic:
await self.prompt_manual_token()
return False
except Exception as e:
globals.log(f"Twitch token refresh error (Attempt {attempt+1}/3): {e}", "ERROR")
except Exception as e:
self.log(f"Twitch token refresh error: {e}", "ERROR")
if not automatic:
await self.prompt_manual_token()
return False
await asyncio.sleep(10) # Wait before retrying
# If all attempts fail, log error
globals.log("Twitch token refresh failed after 3 attempts.", "FATAL")
async def shutdown_gracefully(self):
"""
Gracefully shuts down the bot, ensuring all resources are cleaned up.
"""
self.log("Closing Twitch bot gracefully...", "INFO")
try:
await self.close() # Closes TwitchIO bot properly
self.log("Twitch bot closed successfully.", "INFO")
except Exception as e:
self.log(f"Error during bot shutdown: {e}", "ERROR")
self.log("Bot has been stopped. Please restart it manually.", "FATAL")
async def validate_token(self):
"""
Validate the current Twitch token by making a test API request.
"""
url = "https://id.twitch.tv/oauth2/validate"
headers = {"Authorization": f"OAuth {self.token}"}
try:
response = requests.get(url, headers=headers)
self.log(f"Token validation response: {response.status_code}, {response.text}", "DEBUG")
return response.status_code == 200
except Exception as e:
self.log(f"Error during token validation: {e}", "ERROR")
return False
async def prompt_manual_token(self):
"""
Prompt the user in-terminal to manually enter a new Twitch access token.
"""
self.log("Prompting user for manual Twitch token input.", "WARNING")
new_token = input("Enter a new valid Twitch access token: ").strip()
if new_token:
self.token = new_token
os.environ["TWITCH_BOT_TOKEN"] = self.token
self.update_env_file()
self.log("New Twitch token entered manually. Please restart the bot.", "INFO")
else:
self.log("No valid token entered. Bot cannot continue.", "FATAL")
async def try_refresh_and_reconnect(self) -> bool:
"""
Attempts to refresh the token and reconnect the bot automatically.
Returns True if successful, False if refresh/manual re-auth is needed.
"""
try:
# Refresh the token in the same manner as refresh_access_token()
success = await self.refresh_access_token(automatic=True)
if not success:
return False
# If we got here, we have a valid new token.
# We can call self.start() again in the same run.
self.log("Re-initializing the Twitch connection with the new token...", "INFO")
self._http.token = self.token # Make sure TwitchIO sees the new token
await self.start()
return True
except Exception as e:
self.log(f"Auto-reconnect failed after token refresh: {e}", "ERROR")
return False
def update_env_file(self):
@ -207,33 +278,46 @@ class TwitchBot(commands.Bot):
async def run(self):
"""
Run the Twitch bot, refreshing tokens if needed.
Attempt to start the bot once. If token is invalid, refresh it,
then re-instantiate a fresh TwitchBot in the same Python process.
This avoids any manual restarts or external managers.
"""
try:
# Normal attempt: just call self.start()
await self.start()
retries = 0
while True:
if retries > 3:
globals.log(f"Twitch bot failed to connect after {retries} attempts.", "CIRITCAL")
break # Break loop if repeatedly failing to connect to Twitch
try:
await self.start()
#while True:
# await self.refresh_access_token()
# await asyncio.sleep(10800) # Refresh every 3 hours
except Exception as e:
retries += 1
globals.log(f"Twitch bot failed to start: {e}", "CRITICAL")
if "Invalid or unauthorized Access Token passed." in str(e):
try:
await self.refresh_access_token()
globals.log("Retrying bot connection after token refresh...", "INFO")
await self.start() # Restart connection with new token
return # Exit retry loop
except Exception as e:
globals.log(f"Unable to refresh Twitch token! Twitch bot will be offline!", "CRITICAL")
if self._keeper:
self._keeper.cancel()
if "'NoneType' object has no attribute 'cancel'" in str(e):
globals.log(f"The Twitch bot experienced an initialization glitch. Try starting again", "FATAL")
await asyncio.sleep(5) # Wait before retrying to authenticate
except Exception as e:
self.log(f"Twitch bot failed to start: {e}", "CRITICAL")
# Check if error is invalid token
if "Invalid or unauthorized Access Token passed." in str(e):
self.log("Attempting token refresh...", "WARNING")
refresh_success = await self.refresh_access_token()
if not refresh_success:
# If refresh truly failed => we can't proceed.
# Log a shutdown, do no external restart.
self.log("Refresh failed. Shutting down in same run. Token is invalid.", "SHUTDOWN")
return
# If refresh succeeded, we have a new valid token in .env.
# Now we must forcibly close THIS bot instance.
try:
self.log("Closing old bot instance after refresh...", "DEBUG")
await self.close()
except Exception as close_err:
self.log(f"Ignored close() error: {close_err}", "DEBUG")
# Create a brand-new instance, referencing the updated token from .env
self.log("Creating a fresh TwitchBot instance with the new token...", "INFO")
from bot_twitch import TwitchBot # Re-import or define
new_bot = TwitchBot() # Re-run __init__, loads new token from environment
new_bot.set_db_connection(self.db_conn)
self.log("Starting the new TwitchBot in the same run...", "INFO")
await new_bot.run() # Now call *its* run method
return # Our job is done
else:
# Unknown error => you can either do a SHUTDOWN or ignore
self.log("Could not connect due to an unknown error. Shutting down in same run...", "SHUTDOWN")
return

10
bots.py
View File

@ -70,7 +70,7 @@ async def main():
# Create both bots
discord_bot = DiscordBot()
#twitch_bot = TwitchBot()
twitch_bot = TwitchBot()
# Log startup
utility.log_bot_startup(db_conn)
@ -78,7 +78,7 @@ async def main():
# Provide DB connection to both bots
try:
discord_bot.set_db_connection(db_conn)
#twitch_bot.set_db_connection(db_conn)
twitch_bot.set_db_connection(db_conn)
globals.log(f"Initialized database connection to both bots")
except Exception as e:
globals.log(f"Unable to initialize database connection to one or both bots: {e}", "FATAL")
@ -86,7 +86,7 @@ async def main():
globals.log("Starting Discord and Twitch bots...")
discord_task = asyncio.create_task(discord_bot.run(os.getenv("DISCORD_BOT_TOKEN")))
#twitch_task = asyncio.create_task(twitch_bot.run())
twitch_task = asyncio.create_task(twitch_bot.run())
from modules.utility import dev_func
enable_dev_func = False
@ -94,8 +94,8 @@ async def main():
dev_func_result = dev_func(db_conn, enable_dev_func)
globals.log(f"dev_func output: {dev_func_result}")
#await asyncio.gather(discord_task, twitch_task)
await asyncio.gather(discord_task)
await asyncio.gather(discord_task, twitch_task)
#await asyncio.gather(discord_task)
if __name__ == "__main__":
try:

View File

@ -77,6 +77,8 @@ def log(message: str, level="INFO", exec_info=False, linebreaks=False):
ERROR - A non-critical error has occurred.
CRITICAL - A critical, but non-fatal, error occurred.
FATAL - Fatal error; program exits after logging this.
RESTART - Graceful restart.
SHUTDOWN - Graceful exit.
See:
config.json for further configuration options under "logging".
@ -87,7 +89,7 @@ def log(message: str, level="INFO", exec_info=False, linebreaks=False):
# Hard-coded options/settings (can be expanded as needed)
default_level = "INFO" # Fallback log level
allowed_levels = {"DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL", "FATAL"}
allowed_levels = {"DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL", "FATAL", "RESTART", "SHUTDOWN"}
# Ensure valid level
if level not in allowed_levels:
@ -153,6 +155,14 @@ def log(message: str, level="INFO", exec_info=False, linebreaks=False):
print("!!! FATAL ERROR LOGGED, SHUTTING DOWN !!!")
sys.exit(1)
if level == "RESTART":
print("!!! RESTART LOG LEVEL TRIGGERED, EXITING!!!")
sys.exit(0)
if level == "SHUTDOWN":
print("!!! SHUTDOWN LOG LEVEL TRIGGERED, EXITING!!!")
sys.exit(0)
def reset_curlogfile():
"""
Clear the current log file.

View File

@ -409,8 +409,10 @@ def lookup_user(db_conn, identifier: str, identifier_type: str, target_identifie
If the lookup fails or the parameters are invalid: None.
"""
PRINT_QUERY_DEBUG = False
# Debug: Log the inputs
globals.log(f"lookup_user() called with: identifier='{identifier}', identifier_type='{identifier_type}', target_identifier='{target_identifier}'", "DEBUG")
if PRINT_QUERY_DEBUG: globals.log(f"lookup_user() called with: identifier='{identifier}', identifier_type='{identifier_type}', target_identifier='{target_identifier}'", "DEBUG")
# Define platform type and column mappings
platform_map = {
@ -468,13 +470,13 @@ def lookup_user(db_conn, identifier: str, identifier_type: str, target_identifie
params = (platform_type, identifier)
# Debug: Log the query and parameters
globals.log(f"lookup_user() executing query: {query} with params={params}", "DEBUG")
if PRINT_QUERY_DEBUG: globals.log(f"lookup_user() executing query: {query} with params={params}", "DEBUG")
# Run the query
rows = run_db_operation(db_conn, "read", query, params)
# Debug: Log the result of the query
globals.log(f"lookup_user() query result: {rows}", "DEBUG")
if PRINT_QUERY_DEBUG: globals.log(f"lookup_user() query result: {rows}", "DEBUG")
# Handle no result case
if not rows:
@ -496,19 +498,19 @@ def lookup_user(db_conn, identifier: str, identifier_type: str, target_identifie
}
# Debug: Log the constructed user data
globals.log(f"lookup_user() constructed user_data: {user_data}", "DEBUG")
if PRINT_QUERY_DEBUG: globals.log(f"lookup_user() constructed user_data: {user_data}", "DEBUG")
# If target_identifier is provided, return just that value
if target_identifier:
target_identifier = target_identifier.upper() # Force uppercase for consistency
if target_identifier in user_data:
globals.log(f"lookup_user() returning target_identifier='{target_identifier}' with value='{user_data[target_identifier]}'", "DEBUG")
if PRINT_QUERY_DEBUG: globals.log(f"lookup_user() returning target_identifier='{target_identifier}' with value='{user_data[target_identifier]}'", "DEBUG")
return user_data[target_identifier]
else:
globals.log(f"lookup_user error: target_identifier '{target_identifier}' not found in user_data. Available keys: {list(user_data.keys())}", "WARNING")
return None
globals.log(f"lookup_user() returning full user_data: {user_data}", "DEBUG")
if PRINT_QUERY_DEBUG: globals.log(f"lookup_user() returning full user_data: {user_data}", "DEBUG")
return user_data

View File

@ -6,5 +6,9 @@
"ookamipup": {
"commands_filter_mode": "exclude",
"commands_filtered": []
},
"packcommunity": {
"commands_filter_mode": "include",
"commands_filtered": ["howl"]
}
}