OokamiPupV2/cmd_discord/poe2trade.py

381 lines
19 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import discord
from discord.ext import commands
import globals
from globals import logger
from utility.PoE2_wishlist import PoeTradeWatcher
import re, json, asyncio, os
from pathlib import Path
from urllib.parse import quote
watcher = PoeTradeWatcher()
# Load currency metadata from JSON
storage_path = Path(__file__).resolve().parent.parent / "local_storage"
currency_lookup_path = storage_path / "poe_currency_data.json"
with open(currency_lookup_path, "r", encoding="utf-8") as f:
CURRENCY_METADATA = json.load(f)
class PoE2TradeCog(commands.Cog):
def __init__(self, bot):
self.bot = bot
watcher.set_update_callback(self.handle_poe2_updates)
@commands.command(name="poe2trade")
async def poe2trade(self, ctx, *, arg_str: str = ""):
if not arg_str:
await ctx.author.send(embed=self._get_help_embed())
await ctx.reply("I've sent you a DM with usage instructions.")
return
args = arg_str.strip().split()
subcommand = args[0].lower()
user_id = str(ctx.author.id)
try:
if subcommand == "add" and len(args) >= 2:
filter_id = args[1]
custom_name = " ".join(args[2:]).strip() if len(args) > 2 else None
# Validate name (only basic safe characters)
if custom_name and not re.fullmatch(r"[a-zA-Z0-9\s\-\_\!\?\(\)\[\]]+", custom_name):
await ctx.reply("❌ Invalid characters in custom name. Only letters, numbers, spaces, and basic punctuation are allowed.")
return
result = await watcher.add_filter(user_id, filter_id, custom_name)
if result["status"] == "success":
await ctx.reply(f"✅ Filter `{filter_id}` was successfully added to your watchlist.")
logger.info(f"User {user_id} added filter {filter_id} ({custom_name or 'no name'})")
elif result["status"] == "exists":
await ctx.reply(f"⚠️ Filter `{filter_id}` is already on your watchlist.")
logger.info(f"User {user_id} attempted to add existing filter {filter_id} ({custom_name or 'no name'})")
elif result["status"] == "rate_limited":
await ctx.reply("⏱️ You're adding filters too quickly. Please wait a bit before trying again.")
logger.warning(f"User {user_id} adding filters too quickly! Filter {filter_id} ({custom_name or 'no name'})")
elif result["status"] == "limit_reached":
await ctx.reply("🚫 You've reached the maximum number of filters. Remove one before adding another.")
logger.info(f"User {user_id} already at max filters when attempting to add filter {filter_id} ({custom_name or 'no name'})")
elif result["status"] == "warning":
await ctx.reply("⚠️ That filter returns too many results. Add it with better filters or use force mode (not currently supported).")
logger.info(f"User {user_id} attempted to add too broad filter: {filter_id} ({custom_name or 'no name'})")
elif result["status"] == "invalid_session":
await ctx.reply("🔒 The session token is invalid or expired. Please update it via `!poe2trade set POESESSID <token>` (owner-only).")
logger.error(f"POESESSID token invalid when user {user_id} attempted to add filter {filter_id} ({custom_name or 'no name'})")
elif result["status"] == "queued":
await ctx.reply("🕑 The filter has been queued for addition")
logger.error(f"User {user_id} queued filter {filter_id} ({custom_name or 'no name'}) for addition.")
else:
await ctx.reply(f"❌ Failed to add filter `{filter_id}`. Status: {result['status']}")
logger.error(f"An unknown error occured when user {user_id} attempted to add filter {filter_id} ({custom_name or 'no name'})")
elif subcommand == "remove" and len(args) >= 2:
filter_id = args[1]
if filter_id.lower() == "all":
result = watcher.remove_all_filters(user_id)
if result["status"] == "removed":
await ctx.reply("✅ All filters have been removed from your watchlist.")
logger.info(f"User {user_id} removed all filters.")
else:
await ctx.reply("⚠️ You don't have any filters to remove.")
else:
result = watcher.remove_filter(user_id, filter_id)
if result["status"] == "removed":
await ctx.reply(f"✅ Filter `{filter_id}` was successfully removed.")
logger.info(f"User {user_id} removed filter {filter_id}")
elif result["status"] == "not_found":
await ctx.reply(f"⚠️ Filter `{filter_id}` was not found on your watchlist.")
else:
await ctx.reply(f"❌ Failed to remove filter `{filter_id}`. Status: {result['status']}")
elif subcommand == "list":
filters = watcher.watchlists.get(user_id, [])
if not filters:
await ctx.reply("You don't have any watchlisted filters.")
else:
embed = discord.Embed(title="Your PoE2 Watchlist", color=discord.Color.orange())
for fid in filters:
name = watcher.get_filter_name(user_id, fid)
label = f"{fid} — *{name}*" if name else fid
next_time = watcher.get_next_query_time(user_id, fid)
if next_time:
time_line = f"Next check: ~<t:{next_time}:R>\n"
else:
time_line = ""
embed.add_field(
name=label,
value=f"{time_line}[Open in Trade Site](https://www.pathofexile.com/trade2/search/poe2/Standard/{fid})\n`!poe2trade remove {fid}`",
inline=False
)
await ctx.reply(embed=embed)
logger.info(f"User {user_id} requested their watchlist.")
elif subcommand == "set" and len(args) >= 3:
if not self._is_owner(user_id):
await ctx.reply("Only the server owner can modify settings.")
return
key, value = args[1], args[2]
parsed_value = int(value) if value.isdigit() else value
result = await watcher.set_setting(key, parsed_value)
if result["status"] == "ok":
logger.info(f"Setting updated: {key} = {value}")
await ctx.reply(f"✅ Setting `{key}` updated successfully.")
elif result["status"] == "invalid_key":
await ctx.reply(f"❌ Setting `{key}` is not recognized.")
else:
await ctx.reply("⚠️ Something went wrong while updating the setting.")
elif subcommand == "settings":
if not self._is_owner(user_id):
await ctx.reply("Only the server owner can view settings.")
logger.info(f"User {user_id} requested (but not allowed) to see current settings.")
return
result = watcher.get_settings()
embed = discord.Embed(title="Current `PoE 2 Trade` Settings", color=discord.Color.teal())
embed.set_footer(text="Use `!poe2trade set <key> <value>` to change a setting (owner-only)")
status = result.get("status", "unknown")
settings = result.get("settings", {})
embed = discord.Embed(title="Current PoE 2 Trade Settings", color=discord.Color.teal())
embed.add_field(name="Status", value=f"`{status}`", inline=False)
user = self.bot.get_user(int(user_id))
if user:
owner_display_name = user.display_name # or user.name
else:
owner_display_name = f"Unknown User"
for key, value in settings.items():
if key == "POESESSID":
value = f"`{value[:4]}...{value[-4:]}` (*Obfuscated*)"
if key == "AUTO_POLL_INTERVAL":
value = f"`{value}` seconds"
if key == "MAX_SAFE_RESULTS":
value = f"`{value}` max results"
if key == "USER_ADD_INTERVAL":
value = f"`{value}` seconds"
if key == "MAX_FILTERS_PER_USER":
value = f"`{value}` filters"
if key == "RATE_LIMIT_INTERVAL":
value = f"`{value}` seconds"
if key == "OWNER_ID":
value = f"`{value}` (**{owner_display_name}**)"
if key == "LEGACY_WEBHOOK_URL":
value = f"`{value[:23]}...{value[-10:]}` (*Obfuscated*)"
embed.add_field(name=key, value=f"{value}", inline=False)
embed.set_footer(text="Use `!poe2trade set <key> <value>` to change a setting (owner-only)")
await ctx.reply(embed=embed)
logger.info(f"User {user_id} requested and received current settings.")
elif subcommand == "pause":
if watcher.pause_user(user_id):
await ctx.reply("⏸️ Your filter notifications are now paused.")
logger.info(f"User {user_id} paused their filters.")
else:
await ctx.reply("⏸️ Your filters were already paused.")
elif subcommand == "resume":
if watcher.resume_user(user_id):
await ctx.reply("▶️ Filter notifications have been resumed.")
logger.info(f"User {user_id} resumed their filters.")
else:
await ctx.reply("▶️ Your filters were not paused.")
elif subcommand == "clearcache":
if not self._is_owner(user_id):
await ctx.reply("❌ Only the bot owner can use this command.")
return
if "-y" in args:
result = watcher.clear_seen_cache(user_id)
if result["status"] == "success":
await ctx.reply("🧹 All cached seen listings have been cleared.")
logger.info(f"User {user_id} cleared their listing cache.")
else:
await ctx.reply("⚠️ Cache clearing failed or confirmation window expired.")
else:
watcher.initiate_cache_clear(user_id)
await ctx.reply(
"⚠️ Are you sure you want to clear all listing cache?\n"
"Run `!poe2trade clearcache -y` within 60 seconds to confirm."
)
logger.info(f"User {user_id} requested cache clear confirmation.")
else:
await ctx.reply("Unknown subcommand. Try `!poe2trade` to see help.")
except Exception as e:
logger.error(f"Error in !poe2trade command: {e}", exc_info=True)
await ctx.reply("Something went wrong handling your command.")
def _get_help_embed(self):
embed = discord.Embed(title="PoE2 Trade Tracker Help", color=discord.Color.blue())
embed.description = (
"This bot allows you to track Path of Exile 2 trade listings.\n\n"
"**Usage:**\n"
"`!poe2trade add <filter_id> [filter_nickname]` — Adds a new filter to your watchlist.\n"
"`!poe2trade remove <filter_id | all>` — Removes a filter or all.\n"
"`!poe2trade list` — Shows your currently watched filters.\n"
"`!poe2trade pause` — Temporarily stop notifications for your filters.\n"
"`!poe2trade resume` — Resume notifications for your filters.\n"
"`!poe2trade set <key> <value>` — *(Owner only)* Change a system setting.\n"
"`!poe2trade settings` — *(Owner only)* View current settings.\n"
"`!poe2trade clearcache` — *(Owner only)* Clear current data cache\n\n"
"**How to use filters:**\n"
"Go to the official trade site: https://www.pathofexile.com/trade2/search/poe2/Standard\n"
"Apply the filters you want (stat filters, name, price, etc).\n"
"Copy the **filter ID** at the end of the URL.\n"
"Example: `https://www.pathofexile.com/trade2/search/poe2/Standard/Ezn0zLzf5`\n"
"Filter ID = `Ezn0zLzf5`.\n"
"Then run: `!poe2trade add Ezn0zLzf5`\n\n"
"**Important note**\n"
"Only the first 10 results from the filter is checked.\n"
"This means if you sort by eg. price, only the 10 cheapest results will be checked.\n"
"If you sort by a stat from highest to lowest on the other hand, only the 10 results \n"
"with the highest stat will be checked. So double-check your filter sorting!"
)
return embed
async def handle_poe2_updates(self, data):
EMBED_CHAR_LIMIT = 6000
MAX_EMBEDS_PER_MESSAGE = 10
for (user_id, filter_id), payload in data.items():
user = self.bot.get_user(int(user_id))
if not user:
logger.warning(f"User ID {user_id} not found in cache.")
continue
embeds = []
current_char_count = 0
search_id = payload["search_id"]
items = payload["items"]
for item in items:
try:
listing = item.get("listing", {})
item_data = item.get("item", {})
price_data = listing.get("price", {})
account_data = listing.get("account", {})
item_name = item_data.get("name") or item_data.get("typeLine", "Unknown Item")
item_type = item_data.get("typeLine", "Unknown")
full_name = item_name if item_name.strip().lower() == item_type.strip().lower() else f"{item_name} ({item_type})"
rarity = item_data.get("frameType", 2)
rarity_colors = {
0: discord.Color.light_grey(), # Normal
1: discord.Color.blue(), # Magic
2: discord.Color.gold(), # Rare
3: discord.Color.dark_orange(), # Unique
}
embed = discord.Embed(
title=full_name,
url=f"https://www.pathofexile.com/trade2/search/poe2/Standard/{filter_id}",
color=rarity_colors.get(rarity, discord.Color.gold())
)
icon_url = item_data.get("icon")
if icon_url:
embed.set_thumbnail(url=icon_url)
amount = price_data.get("amount", "?")
currency = price_data.get("currency", "?")
if currency in CURRENCY_METADATA:
name = CURRENCY_METADATA[currency]["name"]
currency_field = f"{amount} × {name}"
else:
currency_field = f"{amount} {currency}"
embed.add_field(name="Price", value=currency_field, inline=True)
seller_name = account_data.get("name", "Unknown Seller")
seller_url = f"https://www.pathofexile.com/account/view-profile/{quote(seller_name)}"
embed.add_field(name="Seller", value=f"[{seller_name}]({seller_url})", inline=True)
embed.add_field(name="Item Level", value=str(item_data.get("ilvl", "?")), inline=True)
def clean_stat(stat: str) -> str:
stat = re.sub(r"\[.*?\|(.+?)\]", r"\1", stat)
stat = re.sub(r"\[(.+?)\]", r"\1", stat)
return stat
stats = []
if "implicitMods" in item["item"]:
stats.append("__**Implicit Mods**__")
stats.extend(item["item"]["implicitMods"])
if "explicitMods" in item["item"]:
stats.append("__**Explicit Mods**__")
stats.extend(item["item"]["explicitMods"])
if stats:
stats_str = "\n".join(clean_stat(stat) for stat in stats)
embed.add_field(name="Stats", value=stats_str, inline=False)
next_check = watcher.get_next_query_time(user_id, filter_id)
timestamp_str = f"\nNext check: ~<t:{next_check}:R>" if next_check else "Unknown"
embed.add_field(
name="*Remove from watchlist*:",
value=f"`!poe2trade remove {filter_id}`{timestamp_str}",
inline=False
)
filter_name = watcher.get_filter_name(str(user_id), filter_id)
footer_lines = ["New PoE 2 trade listing"]
if filter_name:
footer_lines.insert(0, f"Filter name: {filter_name}")
embed.set_footer(text="\n".join(footer_lines))
est_char_len = len(embed.title or "") + sum(len(f.name or "") + len(f.value or "") for f in embed.fields)
if (len(embeds) + 1 > MAX_EMBEDS_PER_MESSAGE) or (current_char_count + est_char_len > EMBED_CHAR_LIMIT):
await user.send(embeds=embeds)
embeds = []
current_char_count = 0
embeds.append(embed)
current_char_count += est_char_len
except Exception as e:
logger.warning(f"Failed to process or build embed for user {user_id}: {e}")
if embeds:
try:
await user.send(embeds=embeds)
logger.info(f"Sent {len(embeds)} embeds to user {user_id} for filter {filter_id}")
except Exception as e:
logger.warning(f"Failed to send batched embeds to user {user_id}: {e}")
def _is_owner(self, user_id):
"""
Returns True or False depending on user ID comparison to owner ID
"""
owner_id = watcher.settings.get("OWNER_ID")
if str(user_id) != owner_id:
return False
elif str(user_id) == owner_id:
return True
else:
logger.error(f"Failed to verify author as owner! user_id: '{user_id}', owner_id: '{owner_id}'")
async def setup(bot):
await bot.add_cog(PoE2TradeCog(bot))