# bot_twitch.py
import asyncio
import importlib
import os

import requests
from twitchio.ext import commands

import cmd_twitch
import globals
import modules
import modules.utility
from modules.db import log_message, lookup_user, log_bot_event

twitch_channels = globals.constants.config_data["twitch_channels"]


class TwitchBot(commands.Bot):
    """
    Twitch chat bot built on twitchio's ``commands.Bot``.

    Credentials are read from the environment variables TWITCH_CLIENT_ID,
    TWITCH_CLIENT_SECRET, TWITCH_BOT_TOKEN and TWITCH_REFRESH_TOKEN. The bot
    joins the channels listed under ``twitch_channels`` in the project config,
    logs every chat message through ``modules.db.log_message`` and can refresh
    its own OAuth token when Twitch rejects it.
    """

    # Seconds to wait for id.twitch.tv before giving up on a token request.
    _HTTP_TIMEOUT = 10

    # Keys of the OAuth response whose values must never reach the logs.
    _SECRET_KEYS = ("access_token", "refresh_token")

    def __init__(self):
        self.client_id = os.getenv("TWITCH_CLIENT_ID")
        self.client_secret = os.getenv("TWITCH_CLIENT_SECRET")
        self.token = os.getenv("TWITCH_BOT_TOKEN")
        self.refresh_token = os.getenv("TWITCH_REFRESH_TOKEN")
        self.log = globals.log  # Use the logging function from bots.py
        self.config = globals.constants.config_data
        self.db_conn = None  # We'll set this later
        self.help_data = None  # We'll set this later

        # 1) Initialize the parent Bot FIRST
        super().__init__(
            token=self.token,
            prefix="!",
            initial_channels=twitch_channels
        )

        globals.log("Twitch bot initiated")

        # 2) Then load commands
        self.load_commands()

    def set_db_connection(self, db_conn):
        """
        Store the DB connection so that commands can use it.
        """
        self.db_conn = db_conn

    async def event_message(self, message):
        """
        Called every time a Twitch message is received (chat message in a channel).

        Echoes of the bot's own messages are ignored. For every other message
        the author is tracked, the message is written to the database and the
        message is then handed to the command handler. Tracking/logging errors
        are logged and never prevent command handling.
        """
        if message.echo:
            return

        try:
            author = message.author
            if not author:
                return

            is_bot = False
            user_id = str(author.id)
            user_name = author.name
            display_name = author.display_name or user_name

            globals.log(f"Message detected, attempting UUI lookup on {user_name} ...", "DEBUG")

            modules.utility.track_user_activity(
                db_conn=self.db_conn,
                platform="twitch",
                user_id=user_id,
                username=user_name,
                display_name=display_name,
                user_is_bot=is_bot
            )

            globals.log("... UUI lookup complete.", "DEBUG")

            log_message(
                db_conn=self.db_conn,
                identifier=user_id,
                identifier_type="twitch_user_id",
                message_content=message.content or "",
                platform="twitch",
                channel=message.channel.name,
                attachments="",
                platform_message_id=str(message.id)  # Include Twitch message ID
            )

        except Exception as e:
            globals.log(f"... UUI lookup failed: {e}", "ERROR")

        await self.handle_commands(message)

    async def event_ready(self):
        """
        Called once the bot is connected: log status and record the login event.
        """
        globals.log(f"Twitch bot is online as {self.nick}")
        modules.utility.list_channels(self)
        kami_status = "OokamiKunTV is currently LIVE" if await modules.utility.is_channel_live(self) else "OokamikunTV is currently not streaming"
        globals.log(kami_status)
        log_bot_event(self.db_conn, "TWITCH_RECONNECTED", "Twitch bot logged in.")

    async def event_disconnected(self):
        """
        Log the lost connection and record it as a bot event in the database.
        """
        globals.log("Twitch bot has lost connection!", "WARNING")
        log_bot_event(self.db_conn, "TWITCH_DISCONNECTED", "Twitch bot lost connection.")

    @classmethod
    def _redact_tokens(cls, payload):
        """
        Return a copy of an OAuth response that is safe to log.

        Values stored under the keys in ``_SECRET_KEYS`` are replaced with a
        placeholder; anything that is not a dict is returned unchanged.
        """
        if not isinstance(payload, dict):
            return payload
        return {
            key: ("<redacted>" if key in cls._SECRET_KEYS else value)
            for key, value in payload.items()
        }

    async def refresh_access_token(self, automatic=False):
        """
        Refresh the Twitch access token using the stored refresh token.
        If 'automatic' is True, do NOT shut down the bot or require manual restart.
        Return True if success, False if not.

        On success the new token pair is stored on the instance, exported to
        ``os.environ`` and written to the .env file. On any failure, and when
        'automatic' is False, the operator is prompted for a token in the
        terminal before False is returned.
        """
        self.log("Attempting to refresh Twitch token...")

        url = "https://id.twitch.tv/oauth2/token"
        params = {
            "client_id": self.client_id,
            "client_secret": self.client_secret,
            "refresh_token": self.refresh_token,
            "grant_type": "refresh_token"
        }

        try:
            # requests is blocking: run it in the default executor so chat
            # handling keeps working, and bound it with a timeout so a dead
            # connection cannot hang the refresh forever. The credentials go
            # in the form-encoded body (not the URL) so they do not end up in
            # proxy or server access logs.
            loop = asyncio.get_running_loop()
            response = await loop.run_in_executor(
                None,
                lambda: requests.post(url, data=params, timeout=self._HTTP_TIMEOUT),
            )
            data = response.json()
            # Never log the raw payload: it contains the fresh token pair.
            self.log(f"Twitch token response: {self._redact_tokens(data)}", "DEBUG")

            if "access_token" in data:
                self.token = data["access_token"]
                self.refresh_token = data.get("refresh_token", self.refresh_token)

                os.environ["TWITCH_BOT_TOKEN"] = self.token
                os.environ["TWITCH_REFRESH_TOKEN"] = self.refresh_token
                self.update_env_file()

                # Validate newly refreshed token:
                if not await self.validate_token():
                    self.log("New token is still invalid, re-auth required.", "CRITICAL")
                    if not automatic:
                        await self.prompt_manual_token()
                    return False

                self.log("Twitch token refreshed successfully.")
                return True

            if "error" in data and data["error"] == "invalid_grant":
                self.log("Refresh token is invalid or expired; manual re-auth required.", "CRITICAL")
            else:
                self.log(f"Unexpected refresh response: {self._redact_tokens(data)}", "ERROR")

            if not automatic:
                await self.prompt_manual_token()
            return False

        except Exception as e:
            self.log(f"Twitch token refresh error: {e}", "ERROR")
            if not automatic:
                await self.prompt_manual_token()
            return False

    async def shutdown_gracefully(self):
        """
        Gracefully shuts down the bot, ensuring all resources are cleaned up.

        Errors raised while closing are logged, not propagated.
        """
        self.log("Closing Twitch bot gracefully...", "INFO")
        try:
            await self.close()  # Closes TwitchIO bot properly
            self.log("Twitch bot closed successfully.", "INFO")
        except Exception as e:
            self.log(f"Error during bot shutdown: {e}", "ERROR")

        self.log("Bot has been stopped. Please restart it manually.", "FATAL")

    async def validate_token(self):
        """
        Validate the current Twitch token by making a test API request.

        Returns True only when the validate endpoint answers with HTTP 200;
        network errors and timeouts are logged and reported as False.
        """
        url = "https://id.twitch.tv/oauth2/validate"
        headers = {"Authorization": f"OAuth {self.token}"}

        try:
            # Blocking HTTP call: keep it off the event loop and bounded.
            loop = asyncio.get_running_loop()
            response = await loop.run_in_executor(
                None,
                lambda: requests.get(url, headers=headers, timeout=self._HTTP_TIMEOUT),
            )
            self.log(f"Token validation response: {response.status_code}, {response.text}", "DEBUG")
            return response.status_code == 200
        except Exception as e:
            self.log(f"Error during token validation: {e}", "ERROR")
            return False

    async def prompt_manual_token(self):
        """
        Prompt the user in-terminal to manually enter a new Twitch access token.

        A non-empty answer is stored on the instance, exported to
        ``os.environ`` and written to the .env file. An empty answer, or a
        closed stdin, is logged as fatal and changes nothing.
        """
        self.log("Prompting user for manual Twitch token input.", "WARNING")

        loop = asyncio.get_running_loop()
        try:
            # input() blocks until the operator answers; waiting in a worker
            # thread keeps the event loop responsive in the meantime.
            answer = await loop.run_in_executor(
                None, input, "Enter a new valid Twitch access token: "
            )
        except EOFError:
            # No interactive stdin (service, container): same as no answer.
            answer = ""

        new_token = answer.strip()
        if new_token:
            self.token = new_token
            os.environ["TWITCH_BOT_TOKEN"] = self.token
            self.update_env_file()
            self.log("New Twitch token entered manually. Please restart the bot.", "INFO")
        else:
            self.log("No valid token entered. Bot cannot continue.", "FATAL")

    async def try_refresh_and_reconnect(self) -> bool:
        """
        Attempts to refresh the token and reconnect the bot automatically.
        Returns True if successful, False if refresh/manual re-auth is needed.
        """
        try:
            # Refresh the token in the same manner as refresh_access_token()
            success = await self.refresh_access_token(automatic=True)
            if not success:
                return False

            # If we got here, we have a valid new token.
            # We can call self.start() again in the same run.
            self.log("Re-initializing the Twitch connection with the new token...", "INFO")
            self._http.token = self.token  # Make sure TwitchIO sees the new token
            await self.start()
            return True
        except Exception as e:
            self.log(f"Auto-reconnect failed after token refresh: {e}", "ERROR")
            return False

    def update_env_file(self):
        """
        Updates the .env file with the new Twitch token.

        Existing TWITCH_BOT_TOKEN= / TWITCH_REFRESH_TOKEN= lines are rewritten
        in place and keys that are not yet in the file are appended. A value
        that is None is left untouched rather than written as the literal
        text "None". Failures are logged, never raised.
        """
        updates = {
            key: value
            for key, value in (
                ("TWITCH_BOT_TOKEN", self.token),
                ("TWITCH_REFRESH_TOKEN", self.refresh_token),
            )
            if value is not None
        }

        try:
            with open(".env", "r") as file:
                lines = file.readlines()

            # Build the complete new content BEFORE reopening the file for
            # writing, so an error here cannot leave .env truncated.
            new_lines = []
            written = set()
            for line in lines:
                for key, value in updates.items():
                    if line.startswith(f"{key}="):
                        new_lines.append(f"{key}={value}\n")
                        written.add(key)
                        break
                else:
                    new_lines.append(line)

            missing = [key for key in updates if key not in written]
            if missing:
                # Do not glue the first appended key onto an unterminated line.
                if new_lines and not new_lines[-1].endswith("\n"):
                    new_lines[-1] += "\n"
                new_lines.extend(f"{key}={updates[key]}\n" for key in missing)

            with open(".env", "w") as file:
                file.writelines(new_lines)

            globals.log("Updated .env file with new Twitch token.")
        except Exception as e:
            globals.log(f"Failed to update .env file: {e}", "ERROR")

    def load_commands(self):
        """
        Load all commands from cmd_twitch.py

        Also initializes the help data from dictionary/help_twitch.json.
        Any error is logged and swallowed so construction can continue.
        """
        try:
            cmd_twitch.setup(self)
            globals.log("Twitch commands loaded successfully.")

            # Now load the help info from dictionary/help_twitch.json
            help_json_path = "dictionary/help_twitch.json"
            modules.utility.initialize_help_data(
                bot=self,
                help_json_path=help_json_path,
                is_discord=False  # Twitch
            )

        except Exception as e:
            globals.log(f"Error loading Twitch commands: {e}", "ERROR")

    async def run(self):
        """
        Attempt to start the bot once. If token is invalid, refresh it,
        then re-instantiate a fresh TwitchBot in the same Python process.
        This avoids any manual restarts or external managers.
        """
        try:
            # Normal attempt: just call self.start()
            await self.start()
            return
        except Exception as e:
            self.log(f"Twitch bot failed to start: {e}", "CRITICAL")
            # Keep only the text; recovery happens after leaving the handler
            # so later errors are not chained onto this one.
            failure_text = str(e)

        # Check if error is invalid token
        if "Invalid or unauthorized Access Token passed." not in failure_text:
            # Unknown error => you can either do a SHUTDOWN or ignore
            self.log("Could not connect due to an unknown error. Shutting down in same run...", "SHUTDOWN")
            return

        self.log("Attempting token refresh...", "WARNING")
        refresh_success = await self.refresh_access_token()
        if not refresh_success:
            # If refresh truly failed => we can't proceed.
            # Log a shutdown, do no external restart.
            self.log("Refresh failed. Shutting down in same run. Token is invalid.", "SHUTDOWN")
            return

        # If refresh succeeded, we have a new valid token in .env.
        # Now we must forcibly close THIS bot instance.
        try:
            self.log("Closing old bot instance after refresh...", "DEBUG")
            await self.close()
        except Exception as close_err:
            self.log(f"Ignored close() error: {close_err}", "DEBUG")

        # Create a brand-new instance; __init__ re-reads the refreshed token
        # from os.environ. type(self) is used instead of re-importing this
        # module by name, which would depend on how the module was loaded.
        self.log("Creating a fresh TwitchBot instance with the new token...", "INFO")
        new_bot = type(self)()
        new_bot.set_db_connection(self.db_conn)

        self.log("Starting the new TwitchBot in the same run...", "INFO")
        await new_bot.run()  # Now call *its* run method