import discord from discord.ext import commands import globals from globals import logger from utility.PoE2_wishlist import PoeTradeWatcher import re, json, asyncio, os from pathlib import Path from urllib.parse import quote watcher = PoeTradeWatcher() # Load currency metadata from JSON storage_path = Path(__file__).resolve().parent.parent / "local_storage" currency_lookup_path = storage_path / "poe_currency_data.json" with open(currency_lookup_path, "r", encoding="utf-8") as f: CURRENCY_METADATA = json.load(f) class PoE2TradeCog(commands.Cog): def __init__(self, bot): self.bot = bot watcher.set_update_callback(self.handle_poe2_updates) @commands.command(name="poe2trade") async def poe2trade(self, ctx, *, arg_str: str = ""): if not arg_str: await ctx.author.send(embed=self._get_help_embed()) await ctx.reply("I've sent you a DM with usage instructions.") return args = arg_str.strip().split() subcommand = args[0].lower() user_id = str(ctx.author.id) try: if subcommand == "add" and len(args) >= 2: filter_id = args[1] custom_name = " ".join(args[2:]).strip() if len(args) > 2 else None # Validate name (only basic safe characters) if custom_name and not re.fullmatch(r"[a-zA-Z0-9\s\-\_\!\?\(\)\[\]]+", custom_name): await ctx.reply("❌ Invalid characters in custom name. Only letters, numbers, spaces, and basic punctuation are allowed.") return result = await watcher.add_filter(user_id, filter_id, custom_name) if result["status"] == "success": await ctx.reply(f"✅ Filter `{filter_id}` was successfully added to your watchlist.") logger.info(f"User {user_id} added filter {filter_id} ({custom_name or 'no name'})") elif result["status"] == "exists": await ctx.reply(f"⚠️ Filter `{filter_id}` is already on your watchlist.") logger.info(f"User {user_id} attempted to add existing filter {filter_id} ({custom_name or 'no name'})") elif result["status"] == "rate_limited": await ctx.reply("⏱️ You're adding filters too quickly. Please wait a bit before trying again.") logger.warning(f"User {user_id} adding filters too quickly! Filter {filter_id} ({custom_name or 'no name'})") elif result["status"] == "limit_reached": await ctx.reply("🚫 You've reached the maximum number of filters. Remove one before adding another.") logger.info(f"User {user_id} already at max filters when attempting to add filter {filter_id} ({custom_name or 'no name'})") elif result["status"] == "warning": await ctx.reply("⚠️ That filter returns too many results. Add it with better filters or use force mode (not currently supported).") logger.info(f"User {user_id} attempted to add too broad filter: {filter_id} ({custom_name or 'no name'})") elif result["status"] == "invalid_session": await ctx.reply("🔒 The session token is invalid or expired. Please update it via `!poe2trade set POESESSID ` (owner-only).") logger.error(f"POESESSID token invalid when user {user_id} attempted to add filter {filter_id} ({custom_name or 'no name'})") elif result["status"] == "queued": await ctx.reply("🕑 The filter has been queued for addition") logger.error(f"User {user_id} queued filter {filter_id} ({custom_name or 'no name'}) for addition.") else: await ctx.reply(f"❌ Failed to add filter `{filter_id}`. Status: {result['status']}") logger.error(f"An unknown error occured when user {user_id} attempted to add filter {filter_id} ({custom_name or 'no name'})") elif subcommand == "remove" and len(args) >= 2: filter_id = args[1] if filter_id.lower() == "all": result = watcher.remove_all_filters(user_id) if result["status"] == "removed": await ctx.reply("✅ All filters have been removed from your watchlist.") logger.info(f"User {user_id} removed all filters.") else: await ctx.reply("⚠️ You don't have any filters to remove.") else: result = watcher.remove_filter(user_id, filter_id) if result["status"] == "removed": await ctx.reply(f"✅ Filter `{filter_id}` was successfully removed.") logger.info(f"User {user_id} removed filter {filter_id}") elif result["status"] == "not_found": await ctx.reply(f"⚠️ Filter `{filter_id}` was not found on your watchlist.") else: await ctx.reply(f"❌ Failed to remove filter `{filter_id}`. Status: {result['status']}") elif subcommand == "list": filters = watcher.watchlists.get(user_id, []) if not filters: await ctx.reply("You don't have any watchlisted filters.") else: embed = discord.Embed(title="Your PoE2 Watchlist", color=discord.Color.orange()) for fid in filters: name = watcher.get_filter_name(user_id, fid) label = f"{fid} — *{name}*" if name else fid next_time = watcher.get_next_query_time(user_id, fid) if next_time: time_line = f"Next check: ~\n" else: time_line = "" embed.add_field( name=label, value=f"{time_line}[Open in Trade Site](https://www.pathofexile.com/trade2/search/poe2/Standard/{fid})\n`!poe2trade remove {fid}`", inline=False ) await ctx.reply(embed=embed) logger.info(f"User {user_id} requested their watchlist.") elif subcommand == "set" and len(args) >= 3: if not self._is_owner(user_id): await ctx.reply("Only the server owner can modify settings.") return key, value = args[1], args[2] parsed_value = int(value) if value.isdigit() else value result = await watcher.set_setting(key, parsed_value) if result["status"] == "ok": logger.info(f"Setting updated: {key} = {value}") await ctx.reply(f"✅ Setting `{key}` updated successfully.") elif result["status"] == "invalid_key": await ctx.reply(f"❌ Setting `{key}` is not recognized.") else: await ctx.reply("⚠️ Something went wrong while updating the setting.") elif subcommand == "settings": if not self._is_owner(user_id): await ctx.reply("Only the server owner can view settings.") logger.info(f"User {user_id} requested (but not allowed) to see current settings.") return result = watcher.get_settings() embed = discord.Embed(title="Current `PoE 2 Trade` Settings", color=discord.Color.teal()) embed.set_footer(text="Use `!poe2trade set ` to change a setting (owner-only)") status = result.get("status", "unknown") settings = result.get("settings", {}) embed = discord.Embed(title="Current PoE 2 Trade Settings", color=discord.Color.teal()) embed.add_field(name="Status", value=f"`{status}`", inline=False) user = self.bot.get_user(int(user_id)) if user: owner_display_name = user.display_name # or user.name else: owner_display_name = f"Unknown User" for key, value in settings.items(): if key == "POESESSID": value = f"`{value[:4]}...{value[-4:]}` (*Obfuscated*)" if key == "AUTO_POLL_INTERVAL": value = f"`{value}` seconds" if key == "MAX_SAFE_RESULTS": value = f"`{value}` max results" if key == "USER_ADD_INTERVAL": value = f"`{value}` seconds" if key == "MAX_FILTERS_PER_USER": value = f"`{value}` filters" if key == "RATE_LIMIT_INTERVAL": value = f"`{value}` seconds" if key == "OWNER_ID": value = f"`{value}` (**{owner_display_name}**)" if key == "LEGACY_WEBHOOK_URL": value = f"`{value[:23]}...{value[-10:]}` (*Obfuscated*)" embed.add_field(name=key, value=f"{value}", inline=False) embed.set_footer(text="Use `!poe2trade set ` to change a setting (owner-only)") await ctx.reply(embed=embed) logger.info(f"User {user_id} requested and received current settings.") elif subcommand == "pause": if watcher.pause_user(user_id): await ctx.reply("⏸️ Your filter notifications are now paused.") logger.info(f"User {user_id} paused their filters.") else: await ctx.reply("⏸️ Your filters were already paused.") elif subcommand == "resume": if watcher.resume_user(user_id): await ctx.reply("▶️ Filter notifications have been resumed.") logger.info(f"User {user_id} resumed their filters.") else: await ctx.reply("▶️ Your filters were not paused.") elif subcommand == "clearcache": if not self._is_owner(user_id): await ctx.reply("❌ Only the bot owner can use this command.") return if "-y" in args: result = watcher.clear_seen_cache(user_id) if result["status"] == "success": await ctx.reply("🧹 All cached seen listings have been cleared.") logger.info(f"User {user_id} cleared their listing cache.") else: await ctx.reply("⚠️ Cache clearing failed or confirmation window expired.") else: watcher.initiate_cache_clear(user_id) await ctx.reply( "⚠️ Are you sure you want to clear all listing cache?\n" "Run `!poe2trade clearcache -y` within 60 seconds to confirm." ) logger.info(f"User {user_id} requested cache clear confirmation.") else: await ctx.reply("Unknown subcommand. Try `!poe2trade` to see help.") except Exception as e: logger.error(f"Error in !poe2trade command: {e}", exc_info=True) await ctx.reply("Something went wrong handling your command.") def _get_help_embed(self): embed = discord.Embed(title="PoE2 Trade Tracker Help", color=discord.Color.blue()) embed.description = ( "This bot allows you to track Path of Exile 2 trade listings.\n\n" "**Usage:**\n" "`!poe2trade add [filter_nickname]` — Adds a new filter to your watchlist.\n" "`!poe2trade remove ` — Removes a filter or all.\n" "`!poe2trade list` — Shows your currently watched filters.\n" "`!poe2trade pause` — Temporarily stop notifications for your filters.\n" "`!poe2trade resume` — Resume notifications for your filters.\n" "`!poe2trade set ` — *(Owner only)* Change a system setting.\n" "`!poe2trade settings` — *(Owner only)* View current settings.\n" "`!poe2trade clearcache` — *(Owner only)* Clear current data cache\n\n" "**How to use filters:**\n" "Go to the official trade site: https://www.pathofexile.com/trade2/search/poe2/Standard\n" "Apply the filters you want (stat filters, name, price, etc).\n" "Copy the **filter ID** at the end of the URL.\n" "Example: `https://www.pathofexile.com/trade2/search/poe2/Standard/Ezn0zLzf5`\n" "Filter ID = `Ezn0zLzf5`.\n" "Then run: `!poe2trade add Ezn0zLzf5`\n\n" "**Important note**\n" "Only the first 10 results from the filter is checked.\n" "This means if you sort by eg. price, only the 10 cheapest results will be checked.\n" "If you sort by a stat from highest to lowest on the other hand, only the 10 results \n" "with the highest stat will be checked. So double-check your filter sorting!" ) return embed async def handle_poe2_updates(self, data): EMBED_CHAR_LIMIT = 6000 MAX_EMBEDS_PER_MESSAGE = 10 for (user_id, filter_id), payload in data.items(): user = self.bot.get_user(int(user_id)) if not user: logger.warning(f"User ID {user_id} not found in cache.") continue embeds = [] current_char_count = 0 search_id = payload["search_id"] items = payload["items"] for item in items: try: listing = item.get("listing", {}) item_data = item.get("item", {}) price_data = listing.get("price", {}) account_data = listing.get("account", {}) item_name = item_data.get("name") or item_data.get("typeLine", "Unknown Item") item_type = item_data.get("typeLine", "Unknown") full_name = item_name if item_name.strip().lower() == item_type.strip().lower() else f"{item_name} ({item_type})" rarity = item_data.get("frameType", 2) rarity_colors = { 0: discord.Color.light_grey(), # Normal 1: discord.Color.blue(), # Magic 2: discord.Color.gold(), # Rare 3: discord.Color.dark_orange(), # Unique } embed = discord.Embed( title=full_name, url=f"https://www.pathofexile.com/trade2/search/poe2/Standard/{filter_id}", color=rarity_colors.get(rarity, discord.Color.gold()) ) icon_url = item_data.get("icon") if icon_url: embed.set_thumbnail(url=icon_url) amount = price_data.get("amount", "?") currency = price_data.get("currency", "?") if currency in CURRENCY_METADATA: name = CURRENCY_METADATA[currency]["name"] currency_field = f"{amount} × {name}" else: currency_field = f"{amount} {currency}" embed.add_field(name="Price", value=currency_field, inline=True) seller_name = account_data.get("name", "Unknown Seller") seller_url = f"https://www.pathofexile.com/account/view-profile/{quote(seller_name)}" embed.add_field(name="Seller", value=f"[{seller_name}]({seller_url})", inline=True) embed.add_field(name="Item Level", value=str(item_data.get("ilvl", "?")), inline=True) def clean_stat(stat: str) -> str: stat = re.sub(r"\[.*?\|(.+?)\]", r"\1", stat) stat = re.sub(r"\[(.+?)\]", r"\1", stat) return stat stats = [] if "implicitMods" in item["item"]: stats.append("__**Implicit Mods**__") stats.extend(item["item"]["implicitMods"]) if "explicitMods" in item["item"]: stats.append("__**Explicit Mods**__") stats.extend(item["item"]["explicitMods"]) if stats: stats_str = "\n".join(clean_stat(stat) for stat in stats) embed.add_field(name="Stats", value=stats_str, inline=False) next_check = watcher.get_next_query_time(user_id, filter_id) timestamp_str = f"\nNext check: ~" if next_check else "Unknown" embed.add_field( name="*Remove from watchlist*:", value=f"`!poe2trade remove {filter_id}`{timestamp_str}", inline=False ) filter_name = watcher.get_filter_name(str(user_id), filter_id) footer_lines = ["New PoE 2 trade listing"] if filter_name: footer_lines.insert(0, f"Filter name: {filter_name}") embed.set_footer(text="\n".join(footer_lines)) est_char_len = len(embed.title or "") + sum(len(f.name or "") + len(f.value or "") for f in embed.fields) if (len(embeds) + 1 > MAX_EMBEDS_PER_MESSAGE) or (current_char_count + est_char_len > EMBED_CHAR_LIMIT): await user.send(embeds=embeds) embeds = [] current_char_count = 0 embeds.append(embed) current_char_count += est_char_len except Exception as e: logger.warning(f"Failed to process or build embed for user {user_id}: {e}") if embeds: try: await user.send(embeds=embeds) logger.info(f"Sent {len(embeds)} embeds to user {user_id} for filter {filter_id}") except Exception as e: logger.warning(f"Failed to send batched embeds to user {user_id}: {e}") def _is_owner(self, user_id): """ Returns True or False depending on user ID comparison to owner ID """ owner_id = watcher.settings.get("OWNER_ID") if str(user_id) != owner_id: return False elif str(user_id) == owner_id: return True else: logger.error(f"Failed to verify author as owner! user_id: '{user_id}', owner_id: '{owner_id}'") async def setup(bot): await bot.add_cog(PoE2TradeCog(bot))