diff --git a/cmd_discord/poe2trade.py b/cmd_discord/poe2trade.py new file mode 100644 index 0000000..b74a5ef --- /dev/null +++ b/cmd_discord/poe2trade.py @@ -0,0 +1,236 @@ +import discord +from discord.ext import commands +import globals +from globals import logger +from utility.PoE2_wishlist import PoeTradeWatcher +import re +from urllib.parse import quote + + +watcher = PoeTradeWatcher() + +class PoE2TradeCog(commands.Cog): + def __init__(self, bot): + self.bot = bot + watcher.set_update_callback(self.handle_poe2_updates) + self.bot.loop.create_task(watcher.start_auto_poll()) + + + @commands.command(name="poe2trade") + async def poe2trade(self, ctx, *, arg_str: str = ""): + if not arg_str: + await ctx.author.send(embed=self._get_help_embed()) + await ctx.reply("I've sent you a DM with usage instructions.") + return + + args = arg_str.strip().split() + subcommand = args[0].lower() + user_id = str(ctx.author.id) + + try: + if subcommand == "add" and len(args) >= 2: + filter_id = args[1] + custom_name = " ".join(args[2:]).strip() if len(args) > 2 else None + + # Validate name (only basic safe characters) + if custom_name and not re.fullmatch(r"[a-zA-Z0-9\s\-\_\!\?\(\)\[\]]+", custom_name): + await ctx.reply("❌ Invalid characters in custom name. Only letters, numbers, spaces, and basic punctuation are allowed.") + return + + result = await watcher.add_filter(user_id, filter_id, custom_name) + + if result["status"] == "success": + await ctx.reply(f"✅ Filter `{filter_id}` was successfully added to your watchlist.") + elif result["status"] == "exists": + await ctx.reply(f"⚠️ Filter `{filter_id}` is already on your watchlist.") + elif result["status"] == "rate_limited": + await ctx.reply("⏱️ You're adding filters too quickly. Please wait a bit before trying again.") + elif result["status"] == "limit_reached": + await ctx.reply("🚫 You've reached the maximum number of filters. Remove one before adding another.") + elif result["status"] == "warning": + await ctx.reply("⚠️ That filter returns too many results. Add it with better filters or use force mode (not currently supported).") + elif result["status"] == "invalid_session": + await ctx.reply("🔒 The session token is invalid or expired. Please update it via `!poe2trade set POESESSID ` (owner-only).") + else: + await ctx.reply(f"❌ Failed to add filter `{filter_id}`. Status: {result['status']}") + + elif subcommand == "remove" and len(args) >= 2: + filter_id = args[1] + result = watcher.remove_filter(user_id, filter_id) + if result["status"] == "removed": + await ctx.reply(f"✅ Filter `{filter_id}` was successfully removed.") + elif result["status"] == "not_found": + await ctx.reply(f"⚠️ Filter `{filter_id}` was not found on your watchlist.") + else: + await ctx.reply(f"❌ Failed to remove filter `{filter_id}`. Status: {result['status']}") + + elif subcommand == "list": + filters = watcher.watchlists.get(user_id, []) + if not filters: + await ctx.reply("You don't have any watchlisted filters.") + else: + embed = discord.Embed(title="Your PoE2 Watchlist", color=discord.Color.orange()) + for fid in filters: + name = watcher.get_filter_name(user_id, fid) + label = f"{fid} — *{name}*" if name else fid + + embed.add_field( + name=label, + value=f"[Open in Trade Site](https://www.pathofexile.com/trade2/search/poe2/Standard/{fid})\n`!poe2trade remove {fid}`", + inline=False + ) + await ctx.reply(embed=embed) + + elif subcommand == "set" and len(args) >= 3: + if ctx.author.id != ctx.guild.owner_id: + await ctx.reply("Only the server owner can modify settings.") + return + key, value = args[1], args[2] + parsed_value = int(value) if value.isdigit() else value + + result = await watcher.set_setting(key, parsed_value) + + if result["status"] == "ok": + logger.info(f"Setting updated: {key} = {value}") + await ctx.reply(f"✅ Setting `{key}` updated successfully.") + elif result["status"] == "invalid_key": + await ctx.reply(f"❌ Setting `{key}` is not recognized.") + else: + await ctx.reply("⚠️ Something went wrong while updating the setting.") + + elif subcommand == "settings": + if ctx.author.id != ctx.guild.owner_id: + await ctx.reply("Only the server owner can view settings.") + return + result = watcher.get_settings() + await ctx.reply(str(result)) + + elif subcommand == "pause": + if watcher.pause_user(user_id): + await ctx.reply("⏸️ Your filter notifications are now paused.") + else: + await ctx.reply("⏸️ Your filters were already paused.") + + elif subcommand == "resume": + if watcher.resume_user(user_id): + await ctx.reply("▶️ Filter notifications have been resumed.") + else: + await ctx.reply("▶️ Your filters were not paused.") + + else: + await ctx.reply("Unknown subcommand. Try `!poe2trade` to see help.") + + except Exception as e: + logger.error(f"Error in !poe2trade command: {e}", exc_info=True) + await ctx.reply("Something went wrong handling your command.") + + + def _get_help_embed(self): + embed = discord.Embed(title="PoE2 Trade Tracker Help", color=discord.Color.blue()) + embed.description = ( + "This bot allows you to track Path of Exile 2 trade listings.\n\n" + "**Usage:**\n" + "`!poe2trade add [filter_nickname]` — Adds a new filter to your watchlist.\n" + "`!poe2trade remove ` — Removes a filter or all.\n" + "`!poe2trade list` — Shows your currently watched filters.\n" + "`!poe2trade pause` — Temporarily stop notifications for your filters.\n" + "`!poe2trade resume` — Resume notifications for your filters.\n" + "`!poe2trade set ` — *(Owner only)* Change a system setting.\n" + "`!poe2trade settings` — *(Owner only)* View current settings.\n\n" + "**How to use filters:**\n" + "Go to the official trade site: https://www.pathofexile.com/trade2/search/poe2/Standard\n" + "Apply the filters you want (stat filters, name, price, etc).\n" + "Copy the **filter ID** at the end of the URL.\n" + "Example: `https://www.pathofexile.com/trade2/search/poe2/Standard/Ezn0zLzf5`\n" + "Filter ID = `Ezn0zLzf5`.\n" + "Then run: `!poe2trade add Ezn0zLzf5`\n" + ) + return embed + + + async def handle_poe2_updates(self, data): + EMBED_CHAR_LIMIT = 6000 + MAX_EMBEDS_PER_MESSAGE = 10 + + for (user_id, filter_id), payload in data.items(): + user = self.bot.get_user(int(user_id)) + if not user: + logger.warning(f"User ID {user_id} not found in cache.") + continue + + embeds = [] + current_char_count = 0 + + search_id = payload["search_id"] + items = payload["items"] + + for item in items: + try: + listing = item.get("listing", {}) + item_data = item.get("item", {}) + price_data = listing.get("price", {}) + account_data = listing.get("account", {}) + + item_name = item_data.get("name") or item_data.get("typeLine", "Unknown Item") + item_type = item_data.get("typeLine", "Unknown") + full_name = f"{item_name} ({item_type})" + + embed = discord.Embed( + title=full_name, + url=f"https://www.pathofexile.com/trade2/search/poe2/Standard/{search_id}/item/{item.get('id')}", + color=discord.Color.gold() + ) + + icon_url = item_data.get("icon") + if icon_url: + embed.set_thumbnail(url=icon_url) + + embed.add_field(name="Price", value=f"{price_data.get('amount', '?')} {price_data.get('currency', '?')}", inline=True) + + seller_name = account_data.get("name", "Unknown Seller") + seller_url = f"https://www.pathofexile.com/account/view-profile/{quote(seller_name)}" + embed.add_field(name="Seller", value=f"[{seller_name}]({seller_url})", inline=True) + + embed.add_field(name="Item Level", value=str(item_data.get("ilvl", "?")), inline=True) + + def clean_stat(stat: str) -> str: + stat = re.sub(r"\[.*?\|(.+?)\]", r"\1", stat) + stat = re.sub(r"\[(.+?)\]", r"\1", stat) + return stat + + stats = item_data.get("explicitMods", []) + if stats: + stats_str = "\n".join(clean_stat(stat) for stat in stats) + embed.add_field(name="Stats", value=stats_str, inline=False) + + embed.add_field(name="*Remove from watchlist*:", value=f"`!poe2trade remove {filter_id}`", inline=False) + filter_name = watcher.get_filter_name(str(user_id), filter_id) + footer_lines = [f"New PoE 2 trade listing"] + if filter_name: + footer_lines.insert(0, f"Filter name: {filter_name}") + + embed.set_footer(text="\n".join(footer_lines)) + + est_char_len = len(embed.title or "") + sum(len(f.name or "") + len(f.value or "") for f in embed.fields) + + # Send batch if limit exceeded + if (len(embeds) + 1 > MAX_EMBEDS_PER_MESSAGE) or (current_char_count + est_char_len > EMBED_CHAR_LIMIT): + await user.send(embeds=embeds) + embeds = [] + current_char_count = 0 + + embeds.append(embed) + current_char_count += est_char_len + + except Exception as e: + logger.warning(f"Failed to process or build embed for user {user_id}: {e}") + + if embeds: + try: + await user.send(embeds=embeds) + except Exception as e: + logger.warning(f"Failed to send batched embeds to user {user_id}: {e}") + + +async def setup(bot): + await bot.add_cog(PoE2TradeCog(bot)) diff --git a/utility/PoE2_wishlist.py b/utility/PoE2_wishlist.py new file mode 100644 index 0000000..901b33b --- /dev/null +++ b/utility/PoE2_wishlist.py @@ -0,0 +1,483 @@ +import aiohttp +import asyncio +import time +import json +import logging +from pathlib import Path +from collections import defaultdict + + +class PoeTradeWatcher: + """ + A watcher class for managing Path of Exile trade filters, querying updates, + and notifying clients of new item listings. + """ + + def __init__(self): + """ + Initializes the watcher with default settings and loads persistent state. + """ + self._poll_task = None + self._on_update_callback = None + self.logger = self._init_logger() + + self.BASE_URL = "https://www.pathofexile.com/api/trade2" + self.LEAGUE = "Standard" + self.HEADERS = { + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 " + "(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36", + "Accept": "application/json" + } + + self.COOKIES = {} + self.settings = { + "AUTO_POLL_INTERVAL": 300, + "MAX_SAFE_RESULTS": 100, + "USER_ADD_INTERVAL": 60, + "MAX_FILTERS_PER_USER": 5, + "RATE_LIMIT_INTERVAL": 5, + "POESESSID": "", + "LEGACY_WEBHOOK_URL": "https://discord.com/api/webhooks/1354003262709305364/afkTjeXcu1bfZXsQzFl-QqSb3R1MmQ4hdZhosR3vm4I__QVEyZ0jO9cqndUTQwb1mt5Z" + } + + base_path = Path(__file__).resolve().parent.parent / "local_storage" + base_path.mkdir(parents=True, exist_ok=True) + self.storage_file = base_path / "poe_trade_state.json" + self.log_file = base_path / "poe_trade.log" + + self.watchlists = defaultdict(list) + self.last_seen_items = defaultdict(set) + self.last_add_time = defaultdict(lambda: 0) + self.last_api_time = 0 + self.session_valid = True + self.filter_names = defaultdict(dict) # {user_id: {filter_id: custom_name}} + self.paused_users = set() + + self._load_state() + asyncio.create_task(self._validate_session()) + + + def _init_logger(self): + logger = logging.getLogger("PoeTradeWatcher") + logger.setLevel(logging.INFO) + log_path = Path(__file__).resolve().parent.parent / "local_storage" / "poe_trade.log" + if not logger.handlers: + handler = logging.FileHandler(log_path, encoding="utf-8") + formatter = logging.Formatter('%(asctime)s [%(levelname)s] %(message)s') + handler.setFormatter(formatter) + logger.addHandler(handler) + return logger + + + def should_notify(self, user_id, filter_id, item_id, price, currency) -> bool: + try: + entry = self.last_seen_items[user_id][filter_id][item_id] + return entry.get("price") != price or entry.get("currency") != currency + except KeyError: + return True + + + def mark_seen(self, user_id, filter_id, item_id, price, currency): + self.last_seen_items.setdefault(user_id, {}).setdefault(filter_id, {})[item_id] = { + "price": price, + "currency": currency + } + self._save_state() + + + async def start_auto_poll(self): + """ + Starts the automatic polling loop using AUTO_POLL_INTERVAL. + This will call the registered update callback when new items are found. + """ + if self._poll_task is not None: + return # Already running + + async def poll_loop(): + while True: + try: + await self.query_all() + except Exception as e: + self.logger.error(f"Error in auto poll loop: {e}") + await asyncio.sleep(self.settings.get("AUTO_POLL_INTERVAL", 300)) + + self._poll_task = asyncio.create_task(poll_loop()) + + + def pause(self): + """ + Pauses the auto-polling loop if it's currently running. + """ + if self._poll_task: + self._poll_task.cancel() + self._poll_task = None + self.logger.info("Auto-polling paused.") + + + def resume(self): + """ + Resumes auto-polling if it was previously paused. + """ + if self._poll_task is None: + asyncio.create_task(self.start_auto_poll()) + self.logger.info("Auto-polling resumed.") + + + def set_update_callback(self, callback): + """ + Sets the callback function that will be triggered when new results are found during polling. + + Args: + callback (Callable): An async function that receives a dict of new results. + """ + self._on_update_callback = callback + + + async def set_setting(self, key, value, force: bool = False): + """ + Updates a configuration setting and persists the state. + + Args: + key (str): Setting name to update. + value (Any): New value for the setting. + force (bool): Allows overriding even admin-only settings. + + Returns: + dict: Result with status and current setting value. + """ + result = self._template() + admin_only_keys = {"RATE_LIMIT_INTERVAL", "USER_ADD_INTERVAL", "MAX_FILTERS_PER_USER", "AUTO_POLL_INTERVAL", "MAX_SAFE_RESULTS"} + + try: + if key in admin_only_keys and not force: + result["status"] = "restricted" + return result + + if key == "POESESSID": + self.settings[key] = value + self.COOKIES = {"POESESSID": value} + await self._validate_session() + result["session"] = value + result["status"] = "ok" if self.session_valid else "invalid" + else: + self.settings[key] = type(self.settings.get(key, value))(value) + result["status"] = "ok" + + self._save_state() + except Exception as e: + result["status"] = "error" + self.logger.error(f"Failed to update setting {key}: {e}") + return result + + + def get_settings(self): + """ + Returns the current settings. + + Returns: + dict: A dictionary with current settings and status. + """ + return {"status": "ok", "settings": self.settings} + + + async def _validate_session(self): + """ + Performs a test request to verify if the current POESESSID is valid. + Updates internal `session_valid` flag. + """ + test_url = f"{self.BASE_URL}/search/poe2/{self.LEAGUE}/OzKEO5ltE" + try: + connector = aiohttp.TCPConnector(ssl=True) + async with aiohttp.ClientSession(headers=self.HEADERS, cookies=self.COOKIES, connector=connector) as session: + async with session.get(test_url) as resp: + if resp.status == 403: + self.session_valid = False + self.logger.error("POESESSID validation failed: status 403") + elif resp.status == 200: + self.session_valid = True + self.logger.info("POESESSID validated successfully.") + else: + self.session_valid = False + self.logger.error(f"POESESSID validation returned unexpected status: {resp.status}") + except Exception as e: + self.session_valid = False + self.logger.error(f"Session validation request failed: {e}") + + + def _template(self): + return { + "status": None, + "user": None, + "filter_id": None, + "result_count": None, + "summary": None, + "results": None, + "session": None, + "input": None, + "user_count": None, + "query_time": None, + "next_allowed_time": None, + "settings": None + } + + + def _load_state(self): + if self.storage_file.exists(): + try: + with open(self.storage_file, "r") as f: + data = json.load(f) + self.watchlists = defaultdict(list, data.get("watchlists", {})) + self.filter_names = defaultdict(dict, data.get("filter_names", {})) + self.last_seen_items = defaultdict(lambda: defaultdict(dict), data.get("seen", {})) + self.settings.update(data.get("settings", {})) + self.paused_users = set(data.get("paused_users", [])) + self.logger.info("State loaded. Active filters: %s", sum(len(v) for v in self.watchlists.values())) + except Exception as e: + self.logger.error(f"Failed to load state: {e}") + + + def _save_state(self): + try: + data = { + "watchlists": dict(self.watchlists), + "filter_names": self.filter_names, + "seen": self.last_seen_items, + "settings": self.settings, + "paused_users": list(self.paused_users) + } + with open(self.storage_file, "w") as f: + json.dump(data, f) + except Exception as e: + self.logger.error(f"Failed to save state: {e}") + + + async def add_filter(self, user_id: str, filter_id: str, custom_name: str = None, force: bool = False) -> dict: + """ + Adds a filter to the user's watchlist. + + Args: + user_id (str): Discord user ID. + filter_id (str): PoE trade filter ID. + force (bool, optional): Whether to force add the filter even if it's broad. Defaults to False. + + Returns: + dict: Result template with status, user, filter_id, and optional warnings. + """ + result = self._template() + result["user"] = user_id + result["filter_id"] = filter_id + now = time.time() + + if not self.session_valid: + result["status"] = "invalid_session" + return result + + if filter_id in self.watchlists[user_id]: + result["status"] = "exists" + return result + + if len(self.watchlists[user_id]) >= self.settings["MAX_FILTERS_PER_USER"]: + result["status"] = "limit_reached" + return result + + if now - self.last_add_time[user_id] < self.settings["USER_ADD_INTERVAL"]: + result["status"] = "cooldown" + result["next_allowed_time"] = self.last_add_time[user_id] + self.settings["USER_ADD_INTERVAL"] + return result + + query_url = f"{self.BASE_URL}/search/poe2/{self.LEAGUE}/{filter_id}" + try: + connector = aiohttp.TCPConnector(ssl=False) + async with aiohttp.ClientSession(headers=self.HEADERS, cookies=self.COOKIES, connector=connector) as session: + async with session.get(query_url) as resp: + if resp.status == 403: + self.session_valid = False + result["status"] = "invalid_session" + return result + elif resp.status != 200: + result["status"] = "error" + return result + data = await resp.json() + except Exception as e: + self.logger.error(f"Error while checking filter {filter_id}: {e}") + result["status"] = "error" + return result + + total_results = data.get("total", 0) + result["result_count"] = total_results + + if total_results > self.settings["MAX_SAFE_RESULTS"] and not force: + result["status"] = "too_broad" + return result + + self.watchlists[user_id].append(filter_id) + self.last_add_time[user_id] = now + result["status"] = "success" + if custom_name: + self.filter_names[user_id][filter_id] = custom_name + self._save_state() + return result + + + def remove_filter(self, user_id: str, filter_id: str) -> dict: + """ + Removes a specific filter from a user's watchlist. + + Args: + user_id (str): Discord user ID. + filter_id (str): Filter ID to remove. + + Returns: + dict: Result template with status and user info. + """ + result = self._template() + result["user"] = user_id + result["filter_id"] = filter_id + + if filter_id in self.watchlists[user_id]: + self.watchlists[user_id].remove(filter_id) + result["status"] = "removed" + else: + result["status"] = "not_found" + + self._save_state() + return result + + + def get_filters(self, user_id: str) -> dict: + """ + Returns all active filters for a user. + + Args: + user_id (str): Discord user ID. + + Returns: + dict: Result template with active filters list and status. + """ + result = self._template() + result["user"] = user_id + result["results"] = self.watchlists[user_id] + result["status"] = "ok" + return result + + + async def query_all(self): + if not self.session_valid: + self.logger.warning("Skipping query: POESESSID invalid.") + return + + found_items = {} + + async with aiohttp.ClientSession(headers=self.HEADERS, cookies=self.COOKIES) as session: + + for user_id, filters in self.watchlists.items(): + if str(user_id) in self.paused_users: + self.logger.debug(f"Skipping paused user {user_id}") + continue + for filter_id in filters: + try: + async with session.get(f"{self.BASE_URL}/search/poe2/{self.LEAGUE}/{filter_id}") as res: + if res.status != 200: + self.logger.warning(f"Failed to fetch filter {filter_id} for user {user_id}. Status: {res.status}") + continue + filter_data = await res.json() + query = {"query": filter_data.get("query", {})} + + async with session.post(f"{self.BASE_URL}/search/poe2/{self.LEAGUE}", json=query) as search_res: + if search_res.status != 200: + self.logger.warning(f"Query failed for {filter_id}. Status: {search_res.status}") + continue + result_data = await search_res.json() + + search_id = result_data.get("id") + result_ids = result_data.get("result", [])[:10] + self.logger.info(f"Filter {filter_id} returned {len(result_ids)} item IDs") + + if not result_ids: + continue + + joined_ids = ",".join(result_ids) + fetch_url = f"{self.BASE_URL}/fetch/{joined_ids}?query={search_id}" + async with session.get(fetch_url) as detail_res: + if detail_res.status != 200: + self.logger.warning(f"Failed to fetch items for {filter_id}. Status: {detail_res.status}") + continue + item_data = await detail_res.json() + + items = item_data.get("result", []) + filtered = [] + for item in items: + item_id = item.get("id") + price_data = item.get("listing", {}).get("price", {}) + price = price_data.get("amount") + currency = price_data.get("currency", "unknown") + + if item_id and self.should_notify(user_id, filter_id, item_id, price, currency): + filtered.append(item) + self.mark_seen(user_id, filter_id, item_id, price, currency) + + if filtered: + found_items[(user_id, filter_id)] = { + "search_id": search_id, + "items": filtered + } + + except Exception as e: + self.logger.error(f"Error while querying filter {filter_id}: {e}") + + if found_items and self._on_update_callback: + await self._on_update_callback(found_items) + elif not found_items: + self.logger.info("No new results; fallback webhook active.") + + + def cleanup_filters(self, max_age_seconds: int = 86400) -> dict: + """ + Cleans up filters that haven't had results in a specified time. + + Args: + max_age_seconds (int): Threshold of inactivity before filters are cleaned. + + Returns: + dict: Result with summary of removed filters. + """ + result = self._template() + now = time.time() + removed = {} + + for key in list(self.last_seen_items.keys()): + last_seen = max((now - self.last_add_time.get(key[0], now)), 0) + if last_seen > max_age_seconds: + user, filter_id = key + if filter_id in self.watchlists[user]: + self.watchlists[user].remove(filter_id) + removed.setdefault(user, []).append(filter_id) + del self.last_seen_items[key] + + self._save_state() + result["status"] = "ok" + result["results"] = removed + return result + + + def get_filter_name(self, user_id: str, filter_id: str) -> str | None: + return self.filter_names.get(user_id, {}).get(filter_id) + + + def pause_user(self, user_id: str) -> bool: + if user_id in self.paused_users: + return False + self.paused_users.add(user_id) + self._save_state() + return True + + def resume_user(self, user_id: str) -> bool: + if user_id not in self.paused_users: + return False + self.paused_users.remove(user_id) + self._save_state() + return True + + def is_paused(self, user_id: str) -> bool: + return user_id in self.paused_users + diff --git a/utility/PoE2_wishlist_webhook.py b/utility/PoE2_wishlist_webhook.py new file mode 100644 index 0000000..9e67ba2 --- /dev/null +++ b/utility/PoE2_wishlist_webhook.py @@ -0,0 +1,159 @@ +import requests +import time +import json + +# CONFIGURATION +LEAGUE = "Standard" # PoE2 league name +DISCORD_WEBHOOK_URL = "https://discord.com/api/webhooks/1354003262709305364/afkTjeXcu1bfZXsQzFl-QqSb3R1MmQ4hdZhosR3vm4I__QVEyZ0jO9cqndUTQwb1mt5Z" +SEARCH_INTERVAL = 300 # 5 minutes in seconds +POESESSID = "e6f8684e56b4ceb489b10225222640f4" # Test session ID + +# Memory to store previously seen listings +last_seen_ids = set() + +# Headers and cookies for authenticated requests +HEADERS = { + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64)" +} +COOKIES = { + "POESESSID": POESESSID +} + +# EXAMPLE WISHLIST FILTER for PoE2 +query = { + "query": { + "status": {"option": "online"}, + "name": "Taryn's Shiver", + "type": "Gelid Staff", + "stats": [ + { + "type": "and", + "filters": [ + { + "id": "skill.freezing_shards", + "value": {"min": 10}, + "disabled": False + }, + { + "id": "explicit.stat_3291658075", + "value": {"min": 100}, + "disabled": False + } + ], + "disabled": False + } + ], + "filters": { + "req_filters": { + "filters": { + "lvl": { + "max": 40, + "min": None + } + }, + "disabled": False + }, + "trade_filters": { + "filters": { + "price": { + "max": 1, + "min": None, + "option": None + } + }, + "disabled": True + } + } + } +} + +def search_items(): + global last_seen_ids + + # Step 1: Submit search query + response = requests.post(f"https://www.pathofexile.com/api/trade2/search/poe2/{LEAGUE}", json=query, headers=HEADERS, cookies=COOKIES) + response.raise_for_status() + data = response.json() + search_id = data.get("id") + result_ids = data.get("result", [])[:10] # Limit to first 10 results + + if not result_ids: + print("No results found.") + return + + new_ids = [item_id for item_id in result_ids if item_id not in last_seen_ids] + if not new_ids: + print("No new items since last check.") + return + + # Step 2: Fetch item details + joined_ids = ",".join(new_ids) + fetch_url = f"https://www.pathofexile.com/api/trade2/fetch/{joined_ids}?query={search_id}" + details = requests.get(fetch_url, headers=HEADERS, cookies=COOKIES).json() + + for item in details.get("result", []): + item_id = item.get("id") + last_seen_ids.add(item_id) + + item_info = item.get("item", {}) + listing = item.get("listing", {}) + + name = item_info.get("name", "") + type_line = item_info.get("typeLine", "") + price = listing.get("price", {}).get("amount", "?") + currency = listing.get("price", {}).get("currency", "?") + account = listing.get("account", {}).get("name", "") + whisper = listing.get("whisper", "") + icon = item_info.get("icon", "") + item_lvl = item_info.get("ilvl", "?") + required_lvl = item_info.get("requirements", []) + + # Requirements formatting + req_str = "" + for r in required_lvl: + if r.get("name") == "Level": + req_str += f"Level {r.get('values')[0][0]}" + else: + req_str += f", {r.get('values')[0][0]} {r.get('name')}" + + # Extract key stats for display (if available) + stats = item_info.get("explicitMods", []) + stat_text = "\n".join([f"{s}" for s in stats]) if stats else "No explicit stats." + + # Construct listing URL + listing_url = f"https://www.pathofexile.com/trade2/search/poe2/{LEAGUE}/{search_id}" + f"/item/{item_id}" + + embed = { + "embeds": [ + { + "author": { + "name": f"{name} — {type_line}" + }, + "title": f"{price} {currency} • Listed by {account}", + "url": listing_url, + "description": f"**Item Level:** {item_lvl}\n**Requirements:** {req_str}\n\n{stat_text}\n\n**Whisper:**\n`{whisper}`", + "thumbnail": {"url": icon}, + "color": 7506394, + "footer": {"text": "Click the title to view full item details."} + } + ] + } + send_to_discord_embed(embed) + + +def send_to_discord_embed(embed_payload): + response = requests.post(DISCORD_WEBHOOK_URL, json=embed_payload) + if response.status_code == 204: + print("Embed sent to Discord.") + else: + print(f"Failed to send embed: {response.status_code}\n{response.text}") + + +if __name__ == "__main__": + while True: + print("Searching for wishlist items...") + try: + search_items() + except Exception as e: + print(f"Error: {e}") + time.sleep(SEARCH_INTERVAL)