381 lines
19 KiB
Python
381 lines
19 KiB
Python
import discord
|
||
from discord.ext import commands
|
||
import globals
|
||
from globals import logger
|
||
from utility.PoE2_wishlist import PoeTradeWatcher
|
||
import re, json, asyncio, os
|
||
from pathlib import Path
|
||
from urllib.parse import quote
|
||
|
||
|
||
# One watcher instance is shared by everything in this module.
watcher = PoeTradeWatcher()

# Currency metadata is read once at import time from
# <two directories above this file>/local_storage/poe_currency_data.json.
storage_path = Path(__file__).resolve().parents[1] / "local_storage"
currency_lookup_path = storage_path.joinpath("poe_currency_data.json")
CURRENCY_METADATA = json.loads(currency_lookup_path.read_text(encoding="utf-8"))
|
||
|
||
class PoE2TradeCog(commands.Cog):
    """Cog exposing the ``!poe2trade`` command and DM delivery of new listings.

    The cog registers ``handle_poe2_updates`` as the update callback of the
    module-level ``watcher`` and forwards every subcommand to it.
    """

    # Limits enforced by Discord on outgoing messages/embeds.
    EMBED_CHAR_LIMIT = 6000
    MAX_EMBEDS_PER_MESSAGE = 10
    FIELD_VALUE_LIMIT = 1024

    TRADE_SEARCH_URL = "https://www.pathofexile.com/trade2/search/poe2/Standard"

    # Allowed characters for a user-supplied filter nickname.
    _CUSTOM_NAME_RE = re.compile(r"[a-zA-Z0-9\s\-\_\!\?\(\)\[\]]+")

    # "[internal|Shown text]" -> "Shown text" and "[Shown text]" -> "Shown text".
    # The character classes exclude brackets so a match can never span two
    # separate bracket groups.
    _STAT_ALIAS_RE = re.compile(r"\[[^\[\]|]*\|([^\[\]]+)\]")
    _STAT_PLAIN_RE = re.compile(r"\[([^\[\]]+)\]")

    # status -> (reply template, logger method name, log template)
    _ADD_RESPONSES = {
        "success": (
            "✅ Filter `{filter_id}` was successfully added to your watchlist.",
            "info",
            "User {user_id} added filter {filter_id} ({name})",
        ),
        "exists": (
            "⚠️ Filter `{filter_id}` is already on your watchlist.",
            "info",
            "User {user_id} attempted to add existing filter {filter_id} ({name})",
        ),
        "rate_limited": (
            "⏱️ You're adding filters too quickly. Please wait a bit before trying again.",
            "warning",
            "User {user_id} adding filters too quickly! Filter {filter_id} ({name})",
        ),
        "limit_reached": (
            "🚫 You've reached the maximum number of filters. Remove one before adding another.",
            "info",
            "User {user_id} already at max filters when attempting to add filter {filter_id} ({name})",
        ),
        "warning": (
            "⚠️ That filter returns too many results. Add it with better filters or use force mode (not currently supported).",
            "info",
            "User {user_id} attempted to add too broad filter: {filter_id} ({name})",
        ),
        "invalid_session": (
            "🔒 The session token is invalid or expired. Please update it via `!poe2trade set POESESSID <token>` (owner-only).",
            "error",
            "POESESSID token invalid when user {user_id} attempted to add filter {filter_id} ({name})",
        ),
        "queued": (
            "🕑 The filter has been queued for addition",
            "info",  # queuing is a normal outcome, not an error
            "User {user_id} queued filter {filter_id} ({name}) for addition.",
        ),
    }
    _ADD_FALLBACK = (
        "❌ Failed to add filter `{filter_id}`. Status: {status}",
        "error",
        "An unknown error occured when user {user_id} attempted to add filter {filter_id} ({name})",
    )

    # Settings that must never be shown or logged in full:
    # key -> (visible leading chars, visible trailing chars)
    _SECRET_SETTINGS = {
        "POESESSID": (4, 4),
        "LEGACY_WEBHOOK_URL": (23, 10),
    }

    # Unit text appended to plain settings in the settings embed.
    _SETTING_SUFFIXES = {
        "AUTO_POLL_INTERVAL": "seconds",
        "MAX_SAFE_RESULTS": "max results",
        "USER_ADD_INTERVAL": "seconds",
        "MAX_FILTERS_PER_USER": "filters",
        "RATE_LIMIT_INTERVAL": "seconds",
    }

    def __init__(self, bot):
        """Store the bot and register this cog as the watcher's update callback."""
        self.bot = bot
        watcher.set_update_callback(self.handle_poe2_updates)

    # ------------------------------------------------------------------
    # Command entry point
    # ------------------------------------------------------------------
    @commands.command(name="poe2trade")
    async def poe2trade(self, ctx, *, arg_str: str = ""):
        """Dispatch ``!poe2trade <subcommand> [args...]``.

        With no arguments the help embed is sent to the author. Unknown
        subcommands, or known ones with too few arguments, get the
        "Unknown subcommand" reply. Any exception raised by a subcommand is
        logged and answered with a generic error message.
        """
        # split() with no separator also copes with whitespace-only input,
        # which would otherwise make args[0] raise IndexError.
        args = arg_str.split()
        if not args:
            await self._send_help(ctx)
            return

        subcommand = args[0].lower()
        user_id = str(ctx.author.id)

        # subcommand -> (handler, minimum number of tokens including the subcommand)
        handlers = {
            "add": (self._cmd_add, 2),
            "remove": (self._cmd_remove, 2),
            "list": (self._cmd_list, 1),
            "set": (self._cmd_set, 3),
            "settings": (self._cmd_settings, 1),
            "pause": (self._cmd_pause, 1),
            "resume": (self._cmd_resume, 1),
            "clearcache": (self._cmd_clearcache, 1),
        }
        handler, min_args = handlers.get(subcommand, (None, 0))

        try:
            if handler is None or len(args) < min_args:
                await ctx.reply("Unknown subcommand. Try `!poe2trade` to see help.")
            else:
                await handler(ctx, user_id, args)
        except Exception as e:
            # Top-level boundary for the command: log with traceback, tell the user.
            logger.error(f"Error in !poe2trade command: {e}", exc_info=True)
            await ctx.reply("Something went wrong handling your command.")

    # ------------------------------------------------------------------
    # Subcommand helpers (each receives ctx, the author's id as str, and the tokens)
    # ------------------------------------------------------------------
    async def _send_help(self, ctx):
        """DM the help embed to the author, falling back to the channel if DMs are closed."""
        embed = self._get_help_embed()
        try:
            await ctx.author.send(embed=embed)
        except discord.Forbidden:
            # The user does not accept DMs from the bot; show the help in place.
            await ctx.reply(embed=embed)
            return
        await ctx.reply("I've sent you a DM with usage instructions.")

    async def _cmd_add(self, ctx, user_id, args):
        """Handle ``add <filter_id> [nickname]``."""
        filter_id = args[1]
        custom_name = " ".join(args[2:]).strip() if len(args) > 2 else None

        # Validate name (only basic safe characters)
        if custom_name and not self._CUSTOM_NAME_RE.fullmatch(custom_name):
            await ctx.reply("❌ Invalid characters in custom name. Only letters, numbers, spaces, and basic punctuation are allowed.")
            return

        result = await watcher.add_filter(user_id, filter_id, custom_name)
        status = result["status"]

        reply_tpl, log_method, log_tpl = self._ADD_RESPONSES.get(status, self._ADD_FALLBACK)
        await ctx.reply(reply_tpl.format(filter_id=filter_id, status=status))
        getattr(logger, log_method)(
            log_tpl.format(user_id=user_id, filter_id=filter_id, name=custom_name or "no name")
        )

    async def _cmd_remove(self, ctx, user_id, args):
        """Handle ``remove <filter_id | all>``."""
        filter_id = args[1]

        if filter_id.lower() == "all":
            result = watcher.remove_all_filters(user_id)
            if result["status"] == "removed":
                await ctx.reply("✅ All filters have been removed from your watchlist.")
                logger.info(f"User {user_id} removed all filters.")
            else:
                await ctx.reply("⚠️ You don't have any filters to remove.")
            return

        result = watcher.remove_filter(user_id, filter_id)
        status = result["status"]
        if status == "removed":
            await ctx.reply(f"✅ Filter `{filter_id}` was successfully removed.")
            logger.info(f"User {user_id} removed filter {filter_id}")
        elif status == "not_found":
            await ctx.reply(f"⚠️ Filter `{filter_id}` was not found on your watchlist.")
        else:
            await ctx.reply(f"❌ Failed to remove filter `{filter_id}`. Status: {status}")

    async def _cmd_list(self, ctx, user_id, args):
        """Handle ``list``: show every watched filter with its next check time."""
        filters = watcher.watchlists.get(user_id, [])
        if not filters:
            await ctx.reply("You don't have any watchlisted filters.")
        else:
            embed = discord.Embed(title="Your PoE2 Watchlist", color=discord.Color.orange())
            for fid in filters:
                name = watcher.get_filter_name(user_id, fid)
                label = f"{fid} — *{name}*" if name else fid

                next_time = watcher.get_next_query_time(user_id, fid)
                time_line = f"Next check: ~<t:{next_time}:R>\n" if next_time else ""

                embed.add_field(
                    name=label,
                    value=f"{time_line}[Open in Trade Site]({self.TRADE_SEARCH_URL}/{fid})\n`!poe2trade remove {fid}`",
                    inline=False,
                )
            await ctx.reply(embed=embed)
        logger.info(f"User {user_id} requested their watchlist.")

    async def _cmd_set(self, ctx, user_id, args):
        """Handle ``set <key> <value>`` (owner only)."""
        if not self._is_owner(user_id):
            await ctx.reply("Only the server owner can modify settings.")
            return

        key, value = args[1], args[2]
        # isdecimal() only accepts characters int() can parse; isdigit() also
        # accepts e.g. superscript digits, which make int() raise ValueError.
        parsed_value = int(value) if value.isdecimal() else value

        result = await watcher.set_setting(key, parsed_value)
        status = result["status"]

        if status == "ok":
            # Never write session tokens / webhook URLs to the log.
            shown = "<redacted>" if key.upper() in self._SECRET_SETTINGS else value
            logger.info(f"Setting updated: {key} = {shown}")
            await ctx.reply(f"✅ Setting `{key}` updated successfully.")
        elif status == "invalid_key":
            await ctx.reply(f"❌ Setting `{key}` is not recognized.")
        else:
            await ctx.reply("⚠️ Something went wrong while updating the setting.")

    async def _cmd_settings(self, ctx, user_id, args):
        """Handle ``settings`` (owner only): show current settings with secrets masked."""
        if not self._is_owner(user_id):
            await ctx.reply("Only the server owner can view settings.")
            logger.info(f"User {user_id} requested (but not allowed) to see current settings.")
            return

        result = watcher.get_settings()
        status = result.get("status", "unknown")
        settings = result.get("settings", {})

        embed = discord.Embed(title="Current PoE 2 Trade Settings", color=discord.Color.teal())
        embed.add_field(name="Status", value=f"`{status}`", inline=False)

        # Only the owner reaches this point, so the caller's name is the owner's name.
        user = self.bot.get_user(int(user_id))
        owner_display_name = user.display_name if user else "Unknown User"

        for key, value in settings.items():
            embed.add_field(
                name=key,
                value=self._format_setting(key, value, owner_display_name),
                inline=False,
            )

        embed.set_footer(text="Use `!poe2trade set <key> <value>` to change a setting (owner-only)")

        await ctx.reply(embed=embed)
        logger.info(f"User {user_id} requested and received current settings.")

    async def _cmd_pause(self, ctx, user_id, args):
        """Handle ``pause``."""
        if watcher.pause_user(user_id):
            await ctx.reply("⏸️ Your filter notifications are now paused.")
            logger.info(f"User {user_id} paused their filters.")
        else:
            await ctx.reply("⏸️ Your filters were already paused.")

    async def _cmd_resume(self, ctx, user_id, args):
        """Handle ``resume``."""
        if watcher.resume_user(user_id):
            await ctx.reply("▶️ Filter notifications have been resumed.")
            logger.info(f"User {user_id} resumed their filters.")
        else:
            await ctx.reply("▶️ Your filters were not paused.")

    async def _cmd_clearcache(self, ctx, user_id, args):
        """Handle ``clearcache [-y]`` (owner only): two-step cache clear."""
        if not self._is_owner(user_id):
            await ctx.reply("❌ Only the bot owner can use this command.")
            return

        if "-y" in args:
            result = watcher.clear_seen_cache(user_id)
            if result["status"] == "success":
                await ctx.reply("🧹 All cached seen listings have been cleared.")
                logger.info(f"User {user_id} cleared their listing cache.")
            else:
                await ctx.reply("⚠️ Cache clearing failed or confirmation window expired.")
            return

        watcher.initiate_cache_clear(user_id)
        await ctx.reply(
            "⚠️ Are you sure you want to clear all listing cache?\n"
            "Run `!poe2trade clearcache -y` within 60 seconds to confirm."
        )
        logger.info(f"User {user_id} requested cache clear confirmation.")

    # ------------------------------------------------------------------
    # Formatting helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _obfuscate(value, head, tail):
        """Return ``value`` masked for display, keeping ``head``/``tail`` visible characters.

        Values that are unset, or too short to mask without revealing all of
        them, are never echoed back.
        """
        text = "" if value is None else str(value)
        if not text:
            return "*(not set)*"
        if len(text) <= head + tail:
            return "`***` (*Obfuscated*)"
        return f"`{text[:head]}...{text[-tail:]}` (*Obfuscated*)"

    def _format_setting(self, key, value, owner_display_name):
        """Return the embed text for one setting in the ``settings`` output."""
        if key in self._SECRET_SETTINGS:
            head, tail = self._SECRET_SETTINGS[key]
            return self._obfuscate(value, head, tail)
        if key == "OWNER_ID":
            return f"`{value}` (**{owner_display_name}**)"
        suffix = self._SETTING_SUFFIXES.get(key)
        if suffix:
            return f"`{value}` {suffix}"
        return f"{value}"

    @classmethod
    def _clean_stat(cls, stat):
        """Strip the ``[tag|Text]`` / ``[Text]`` markup from a mod line."""
        stat = cls._STAT_ALIAS_RE.sub(r"\1", stat)
        return cls._STAT_PLAIN_RE.sub(r"\1", stat)

    def _get_help_embed(self):
        """Build the usage/help embed shown when the command is run without arguments."""
        return discord.Embed(
            title="PoE2 Trade Tracker Help",
            color=discord.Color.blue(),
            description=(
                "This bot allows you to track Path of Exile 2 trade listings.\n\n"
                "**Usage:**\n"
                "`!poe2trade add <filter_id> [filter_nickname]` — Adds a new filter to your watchlist.\n"
                "`!poe2trade remove <filter_id | all>` — Removes a filter or all.\n"
                "`!poe2trade list` — Shows your currently watched filters.\n"
                "`!poe2trade pause` — Temporarily stop notifications for your filters.\n"
                "`!poe2trade resume` — Resume notifications for your filters.\n"
                "`!poe2trade set <key> <value>` — *(Owner only)* Change a system setting.\n"
                "`!poe2trade settings` — *(Owner only)* View current settings.\n"
                "`!poe2trade clearcache` — *(Owner only)* Clear current data cache\n\n"
                "**How to use filters:**\n"
                "Go to the official trade site: https://www.pathofexile.com/trade2/search/poe2/Standard\n"
                "Apply the filters you want (stat filters, name, price, etc).\n"
                "Copy the **filter ID** at the end of the URL.\n"
                "Example: `https://www.pathofexile.com/trade2/search/poe2/Standard/Ezn0zLzf5`\n"
                "Filter ID = `Ezn0zLzf5`.\n"
                "Then run: `!poe2trade add Ezn0zLzf5`\n\n"
                "**Important note**\n"
                "Only the first 10 results from the filter is checked.\n"
                "This means if you sort by eg. price, only the 10 cheapest results will be checked.\n"
                "If you sort by a stat from highest to lowest on the other hand, only the 10 results \n"
                "with the highest stat will be checked. So double-check your filter sorting!"
            ),
        )

    # ------------------------------------------------------------------
    # Watcher callback
    # ------------------------------------------------------------------
    async def handle_poe2_updates(self, data):
        """Send new listings to their users as batched DM embeds.

        ``data`` maps ``(user_id, filter_id)`` to a payload whose ``"items"``
        entry is a list of listing dicts. Embeds are grouped so that no
        message exceeds ``MAX_EMBEDS_PER_MESSAGE`` embeds or
        ``EMBED_CHAR_LIMIT`` characters. A listing that cannot be rendered is
        logged and skipped; a batch that cannot be sent is logged and dropped
        so the remaining listings are still delivered.
        """
        for (user_id, filter_id), payload in data.items():
            user = self.bot.get_user(int(user_id))
            if not user:
                logger.warning(f"User ID {user_id} not found in cache.")
                continue

            batch = []
            batch_chars = 0

            for item in payload.get("items") or []:
                try:
                    embed = self._build_listing_embed(user_id, filter_id, item)
                except Exception as e:
                    # One malformed listing must not block the others.
                    logger.warning(f"Failed to process or build embed for user {user_id}: {e}")
                    continue

                # len(Embed) counts title, description, fields, footer and author text.
                embed_chars = len(embed)
                batch_full = len(batch) >= self.MAX_EMBEDS_PER_MESSAGE
                too_long = batch_chars + embed_chars > self.EMBED_CHAR_LIMIT
                # Only flush a non-empty batch; sending an empty embed list is an error.
                if batch and (batch_full or too_long):
                    await self._send_embed_batch(user, user_id, filter_id, batch)
                    batch = []
                    batch_chars = 0

                batch.append(embed)
                batch_chars += embed_chars

            if batch:
                await self._send_embed_batch(user, user_id, filter_id, batch)

    async def _send_embed_batch(self, user, user_id, filter_id, batch):
        """DM one batch of embeds; delivery failures are logged, never raised."""
        try:
            await user.send(embeds=batch)
            logger.info(f"Sent {len(batch)} embeds to user {user_id} for filter {filter_id}")
        except Exception as e:
            # Boundary towards the watcher's polling code: never let a failed
            # DM (closed DMs, HTTP error, ...) propagate into the caller.
            logger.warning(f"Failed to send batched embeds to user {user_id}: {e}")

    def _build_listing_embed(self, user_id, filter_id, item):
        """Render one listing dict (``"listing"`` + ``"item"`` sub-dicts) as an embed."""
        # ``or {}`` also covers keys that are present but null.
        listing = item.get("listing") or {}
        item_data = item.get("item") or {}
        price_data = listing.get("price") or {}
        account_data = listing.get("account") or {}

        item_name = item_data.get("name") or item_data.get("typeLine", "Unknown Item")
        item_type = item_data.get("typeLine", "Unknown")
        if item_name.strip().lower() == item_type.strip().lower():
            full_name = item_name
        else:
            full_name = f"{item_name} ({item_type})"

        rarity_colors = {
            0: discord.Color.light_grey(),   # Normal
            1: discord.Color.blue(),         # Magic
            2: discord.Color.gold(),         # Rare
            3: discord.Color.dark_orange(),  # Unique
        }
        embed = discord.Embed(
            title=full_name,
            url=f"{self.TRADE_SEARCH_URL}/{filter_id}",
            color=rarity_colors.get(item_data.get("frameType", 2), discord.Color.gold()),
        )

        icon_url = item_data.get("icon")
        if icon_url:
            embed.set_thumbnail(url=icon_url)

        amount = price_data.get("amount", "?")
        currency = price_data.get("currency", "?")
        if currency in CURRENCY_METADATA:
            currency_field = f"{amount} × {CURRENCY_METADATA[currency]['name']}"
        else:
            currency_field = f"{amount} {currency}"
        embed.add_field(name="Price", value=currency_field, inline=True)

        seller_name = account_data.get("name", "Unknown Seller")
        seller_url = f"https://www.pathofexile.com/account/view-profile/{quote(seller_name)}"
        embed.add_field(name="Seller", value=f"[{seller_name}]({seller_url})", inline=True)
        embed.add_field(name="Item Level", value=str(item_data.get("ilvl", "?")), inline=True)

        stat_lines = []
        for header, key in (
            ("__**Implicit Mods**__", "implicitMods"),
            ("__**Explicit Mods**__", "explicitMods"),
        ):
            mods = item_data.get(key)
            if mods:
                stat_lines.append(header)
                stat_lines.extend(self._clean_stat(mod) for mod in mods)
        if stat_lines:
            stats_str = "\n".join(stat_lines)
            # An over-long field makes Discord reject the whole message.
            if len(stats_str) > self.FIELD_VALUE_LIMIT:
                stats_str = stats_str[: self.FIELD_VALUE_LIMIT - 1] + "…"
            embed.add_field(name="Stats", value=stats_str, inline=False)

        next_check = watcher.get_next_query_time(user_id, filter_id)
        next_check_text = f"~<t:{next_check}:R>" if next_check else "Unknown"
        embed.add_field(
            name="*Remove from watchlist*:",
            value=f"`!poe2trade remove {filter_id}`\nNext check: {next_check_text}",
            inline=False,
        )

        filter_name = watcher.get_filter_name(str(user_id), filter_id)
        footer_lines = ["New PoE 2 trade listing"]
        if filter_name:
            footer_lines.insert(0, f"Filter name: {filter_name}")
        embed.set_footer(text="\n".join(footer_lines))

        return embed

    # ------------------------------------------------------------------
    # Permissions
    # ------------------------------------------------------------------
    def _is_owner(self, user_id):
        """Return True when ``user_id`` equals the watcher's ``OWNER_ID`` setting.

        Both sides are compared as strings because the stored value may be an
        ``int`` (the ``set`` subcommand converts all-digit values). A missing
        or empty ``OWNER_ID`` is logged and treated as "not owner".
        """
        owner_id = watcher.settings.get("OWNER_ID")
        if owner_id is None or str(owner_id).strip() == "":
            logger.error(f"Failed to verify author as owner! user_id: '{user_id}', owner_id: '{owner_id}'")
            return False
        return str(user_id) == str(owner_id).strip()
|
||
|
||
|
||
async def setup(bot):
    """Extension entry point: create the PoE2 trade cog and add it to ``bot``."""
    cog = PoE2TradeCog(bot)
    await bot.add_cog(cog)
|