OokamiPupV2/bot_twitch.py

325 lines
12 KiB
Python

# bot_twitch.py
import os
import requests
import asyncio
from twitchio.ext import commands
import importlib
import cmd_twitch
import globals
from globals import logger
import modules
import modules.utility
from modules.db import log_message, lookup_user, log_bot_event
# Channels handed to the bot as `initial_channels`, read from the shared
# configuration exposed by `globals.constants.config_data`.
twitch_channels = globals.constants.config_data["twitch_channels"]
class TwitchBot(commands.Bot):
    """
    Twitch chat bot built on ``twitchio.ext.commands.Bot``.

    Responsibilities visible in this class:
      * record user activity and chat messages through the project DB helpers,
      * refresh / validate the Twitch OAuth token and persist it to ``.env``,
      * start the bot and, if the token is rejected, retry with a fresh instance.
    """

    # Seconds to wait on Twitch's OAuth endpoints before treating the call as failed.
    _OAUTH_TIMEOUT = 10
    # Response keys whose values must never reach the log output.
    _SENSITIVE_KEYS = ("access_token", "refresh_token")
    # Seconds to pause right before starting a replacement bot instance.
    _RESTART_DELAY = 1

    def __init__(self):
        """
        Read the Twitch credentials from the environment, initialise the
        parent bot (prefix ``!``, channels from ``twitch_channels``) and load
        the Twitch commands.
        """
        self.client_id = os.getenv("TWITCH_CLIENT_ID")
        self.client_secret = os.getenv("TWITCH_CLIENT_SECRET")
        self.token = os.getenv("TWITCH_BOT_TOKEN")
        self.refresh_token = os.getenv("TWITCH_REFRESH_TOKEN")
        self.log = globals.logger  # Same object as the module-level `logger`
        self.config = globals.constants.config_data
        self.db_conn = None  # Set later via set_db_connection()
        self.help_data = None  # We'll set this later

        # 1) Initialize the parent Bot FIRST
        super().__init__(
            token=self.token,
            prefix="!",
            initial_channels=twitch_channels
        )
        logger.info("Twitch bot initiated")

        # 2) Then load commands
        self.load_commands()

    @staticmethod
    async def _run_blocking(func):
        """Run the blocking callable *func* in the default executor and return its result."""
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, func)

    @classmethod
    def _redact_tokens(cls, data):
        """Return a copy of *data* that is safe to log (token values masked)."""
        if not isinstance(data, dict):
            return data
        return {
            key: ("<redacted>" if key in cls._SENSITIVE_KEYS else value)
            for key, value in data.items()
        }

    def set_db_connection(self, db_conn):
        """
        Store the DB connection so that commands can use it.
        """
        self.db_conn = db_conn

    async def event_message(self, message):
        """
        Called every time a Twitch message is received (chat message in a channel).

        Echoed messages and messages without an author are ignored. Otherwise
        the author's activity is tracked, the message is logged to the DB, and
        the message is passed on to command handling. Failures in tracking or
        logging are reported but never prevent command handling.
        """
        if message.echo:
            return

        author = message.author
        if not author:
            return

        try:
            is_bot = False
            user_id = str(author.id)
            user_name = author.name
            display_name = author.display_name or user_name

            logger.debug(f"Message detected, attempting UUI lookup on {user_name} ...")
            modules.utility.track_user_activity(
                db_conn=self.db_conn,
                platform="twitch",
                user_id=user_id,
                username=user_name,
                display_name=display_name,
                user_is_bot=is_bot
            )
        except Exception as e:
            # As before, a failed lookup means the message itself is not logged.
            logger.error(f"... UUI lookup failed: {e}")
        else:
            logger.debug("... UUI lookup complete.")
            try:
                log_message(
                    db_conn=self.db_conn,
                    identifier=user_id,
                    identifier_type="twitch_user_id",
                    message_content=message.content or "",
                    platform="twitch",
                    channel=message.channel.name,
                    attachments="",
                    platform_message_id=str(message.id)  # Include Twitch message ID
                )
            except Exception as e:
                # Reported separately so a DB problem is not mistaken for a lookup problem.
                logger.error(f"Failed to log Twitch message: {e}")

        await self.handle_commands(message)

    async def event_ready(self):
        """Log that the bot is online, list channels, report live status and record the event."""
        logger.info(f"Twitch bot is online as {self.nick}")
        modules.utility.list_channels(self)

        if await modules.utility.is_channel_live(self):
            kami_status = "OokamiKunTV is currently LIVE"
        else:
            kami_status = "OokamikunTV is currently not streaming"
        logger.info(kami_status)

        log_bot_event(self.db_conn, "TWITCH_RECONNECTED", "Twitch bot logged in.")

    async def event_disconnected(self):
        """Log the lost connection and record it as a bot event."""
        logger.warning("Twitch bot has lost connection!")
        log_bot_event(self.db_conn, "TWITCH_DISCONNECTED", "Twitch bot lost connection.")

    async def refresh_access_token(self, automatic=False, retries=1):
        """
        Refresh the Twitch access token and ensure it is applied correctly.

        Args:
            automatic: When True, never prompt on the terminal for a manual token.
            retries: Number of additional attempts after an exception
                (network error, unparsable response, ...).

        Returns:
            True when a new token was obtained, stored (instance, environment,
            ``.env``) and validated; False otherwise.
        """
        self.log.info("Attempting to refresh Twitch token...")
        url = "https://id.twitch.tv/oauth2/token"
        params = {
            "client_id": self.client_id,
            "client_secret": self.client_secret,
            "refresh_token": self.refresh_token,
            "grant_type": "refresh_token"
        }

        for attempt in range(retries + 1):
            try:
                # Credentials go in the form body (not the URL) and the blocking
                # HTTP call runs off the event loop with a timeout.
                response = await self._run_blocking(
                    lambda: requests.post(url, data=params, timeout=self._OAUTH_TIMEOUT)
                )
                data = response.json()
                self.log.debug(f"Twitch token response: {self._redact_tokens(data)}")

                if "access_token" in data:
                    _before_token = os.getenv("TWITCH_BOT_TOKEN", "")

                    self.token = data["access_token"]
                    self.refresh_token = data.get("refresh_token", self.refresh_token)

                    os.environ["TWITCH_BOT_TOKEN"] = self.token
                    if self.refresh_token:
                        # os.environ only accepts strings; skip a missing refresh token.
                        os.environ["TWITCH_REFRESH_TOKEN"] = self.refresh_token
                    self.update_env_file()

                    await asyncio.sleep(1)  # Allow Twitch API time to register the new token

                    # Ensure bot reloads the updated token
                    self.token = os.getenv("TWITCH_BOT_TOKEN")
                    if self.token == _before_token:
                        self.log.critical("Token did not change after refresh! Avoiding refresh loop.")
                        return False

                    self.log.info("Twitch token successfully refreshed.")

                    # Validate the new token
                    if not await self.validate_token():
                        self.log.critical("New token is still invalid, re-auth required.")
                        if not automatic:
                            await self.prompt_manual_token()
                        return False

                    return True  # Successful refresh

                elif "error" in data and data["error"] == "invalid_grant":
                    self.log.critical("Refresh token is invalid or expired; manual re-auth required.")
                    if not automatic:
                        await self.prompt_manual_token()
                    return False

                else:
                    self.log.error(f"Unexpected refresh response: {self._redact_tokens(data)}")
                    if not automatic:
                        await self.prompt_manual_token()
                    return False

            except Exception as e:
                self.log.error(f"Twitch token refresh error: {e}")
                if attempt < retries:
                    self.log.warning(f"Retrying token refresh in 2 seconds... (Attempt {attempt + 1}/{retries})")
                    await asyncio.sleep(2)
                else:
                    if not automatic:
                        await self.prompt_manual_token()
                    return False

        # Only reachable when `retries` is negative and the loop never ran.
        return False

    async def shutdown_gracefully(self):
        """
        Gracefully shuts down the bot, ensuring all resources are cleaned up.
        """
        self.log.info("Closing Twitch bot gracefully...")
        try:
            await self.close()  # Closes TwitchIO bot properly
            self.log.info("Twitch bot closed successfully.")
        except Exception as e:
            self.log.error(f"Error during bot shutdown: {e}")

        self.log.fatal("Bot has been stopped. Please restart it manually.")

    async def validate_token(self):
        """
        Validate the current Twitch token by making a test API request.

        Returns:
            True when Twitch's validate endpoint answers with HTTP 200,
            False on any other status or on a request error.
        """
        url = "https://id.twitch.tv/oauth2/validate"
        headers = {"Authorization": f"OAuth {self.token}"}

        try:
            # Blocking HTTP call runs off the event loop with a timeout.
            response = await self._run_blocking(
                lambda: requests.get(url, headers=headers, timeout=self._OAUTH_TIMEOUT)
            )
            self.log.debug(f"Token validation response: {response.status_code}, {response.text}")
            return response.status_code == 200
        except Exception as e:
            self.log.error(f"Error during token validation: {e}")
            return False

    async def prompt_manual_token(self):
        """
        Prompt the user in-terminal to manually enter a new Twitch access token.

        A non-empty answer is stored on the instance, in the environment and in
        ``.env``. An empty answer (or a closed stdin) is logged as fatal.
        """
        self.log.warning("Prompting user for manual Twitch token input.")
        try:
            # input() blocks, so keep it off the event loop.
            answer = await self._run_blocking(
                lambda: input("Enter a new valid Twitch access token: ")
            )
        except EOFError:
            # No interactive terminal available; treat as "nothing entered".
            answer = ""

        new_token = answer.strip()
        if new_token:
            self.token = new_token
            os.environ["TWITCH_BOT_TOKEN"] = self.token
            self.update_env_file()
            self.log.info("New Twitch token entered manually. Please restart the bot.")
        else:
            self.log.fatal("No valid token entered. Bot cannot continue.")

    async def try_refresh_and_reconnect(self) -> bool:
        """
        Attempts to refresh the token and reconnect the bot automatically.
        Returns True if successful, False if refresh/manual re-auth is needed.

        NOTE(review): ``await self.start()`` is awaited to completion before
        ``True`` is returned, so the result presumably only becomes available
        once the bot has stopped again -- confirm this is what callers expect.
        """
        try:
            # Refresh the token in the same manner as refresh_access_token()
            success = await self.refresh_access_token(automatic=True)
            if not success:
                return False

            # If we got here, we have a valid new token.
            # We can call self.start() again in the same run.
            self.log.info("Re-initializing the Twitch connection with the new token...")
            self._http.token = self.token  # Make sure TwitchIO sees the new token
            await self.start()
            return True
        except Exception as e:
            self.log.error(f"Auto-reconnect failed after token refresh: {e}")
            return False

    def update_env_file(self):
        """
        Updates the .env file with the new Twitch token.

        Existing ``TWITCH_BOT_TOKEN=`` / ``TWITCH_REFRESH_TOKEN=`` lines are
        rewritten, a key that is missing from the file is appended, and a key
        without a value on this instance is left untouched. Errors are logged,
        not raised.
        """
        # Drop unset values so a missing token is never persisted as "None".
        candidates = {
            "TWITCH_BOT_TOKEN": self.token,
            "TWITCH_REFRESH_TOKEN": self.refresh_token,
        }
        updates = {key: value for key, value in candidates.items() if value}

        try:
            with open(".env", "r") as file:
                lines = file.readlines()

            # Build the complete new content first so the file is only opened
            # for writing once everything is ready.
            new_lines = []
            written = set()
            for line in lines:
                for key, value in updates.items():
                    if line.startswith(f"{key}="):
                        new_lines.append(f"{key}={value}\n")
                        written.add(key)
                        break
                else:
                    new_lines.append(line)

            # Append keys the file did not contain yet, otherwise the new
            # token would be lost on the next restart.
            missing = [key for key in updates if key not in written]
            if missing and new_lines and not new_lines[-1].endswith("\n"):
                new_lines[-1] += "\n"
            for key in missing:
                new_lines.append(f"{key}={updates[key]}\n")

            with open(".env", "w") as file:
                file.writelines(new_lines)

            logger.info("Updated .env file with new Twitch token.")
        except Exception as e:
            logger.error(f"Failed to update .env file: {e}")

    def load_commands(self):
        """
        Load all commands from cmd_twitch.py

        Also initialises the help data from ``dictionary/help_twitch.json``.
        Any failure is logged and swallowed so construction can continue.
        """
        try:
            cmd_twitch.setup(self)
            logger.info("Twitch commands loaded successfully.")

            # Now load the help info from dictionary/help_twitch.json
            help_json_path = "dictionary/help_twitch.json"
            modules.utility.initialize_help_data(
                bot=self,
                help_json_path=help_json_path,
                is_discord=False  # Twitch
            )
        except Exception as e:
            logger.error(f"Error loading Twitch commands: {e}")

    async def run(self, *, max_restarts: int = 3):
        """
        Refresh the token, then start the bot. If Twitch rejects the token on
        start, refresh again, close this instance and run a fresh TwitchBot
        within the same Python process.
        This avoids manual restarts or external managers.

        This is a coroutine and must be awaited.

        Args:
            max_restarts: How many times a replacement instance may still be
                created after a rejected token. Prevents an endless
                refresh/restart cycle when Twitch keeps rejecting the token.

        NOTE(review): ``self.log.shutdown`` is not a standard
        ``logging.Logger`` method; presumably the project logger provides it.
        """
        # Attempt token refresh before starting the bot
        refresh_success = await self.refresh_access_token()
        if not refresh_success:
            self.log.shutdown("Token refresh failed. Shutting down in the same run.")
            return

        await asyncio.sleep(1)  # Give Twitch a moment to recognize the refreshed token

        try:
            await self.start()
        except Exception as e:
            self.log.critical(f"Twitch bot failed to start: {e}")

            if "Invalid or unauthorized Access Token passed." not in str(e):
                self.log.shutdown("Could not connect due to an unknown error. Shutting down.")
                return

            if max_restarts <= 0:
                self.log.shutdown("Token is still rejected and the restart limit was reached. Shutting down.")
                return

            self.log.warning("Token became invalid after refresh. Attempting another refresh...")
            if not await self.refresh_access_token():
                self.log.shutdown("Second refresh attempt failed. Shutting down.")
                return

            try:
                self.log.debug("Closing old bot instance after refresh...")
                await self.close()
                await asyncio.sleep(1)  # give the old WebSocket time to fully close
            except Exception as close_err:
                self.log.debug(f"Ignored close() error: {close_err}")

            self.log.info("Creating a fresh TwitchBot instance with the new token...")
            # The class is already available in this module; no self-import needed.
            new_bot = TwitchBot()
            new_bot.set_db_connection(self.db_conn)

            self.log.info(
                f"Starting the new TwitchBot in the same run in {self._RESTART_DELAY} second(s)..."
            )
            await asyncio.sleep(self._RESTART_DELAY)  # final delay
            await new_bot.run(max_restarts=max_restarts - 1)
            return