import aiohttp import asyncio import time import json import logging from pathlib import Path from collections import defaultdict class PoeTradeWatcher: """ A watcher class for managing Path of Exile trade filters, querying updates, and notifying clients of new item listings. """ def __init__(self): """ Initializes the watcher with default settings and loads persistent state. """ self._poll_task = None self._on_update_callback = None self.logger = self._init_logger() self.BASE_URL = "https://www.pathofexile.com/api/trade2" self.LEAGUE = "Standard" self.HEADERS = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 " "(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36", "Accept": "application/json" } self.COOKIES = {} self.settings = { "AUTO_POLL_INTERVAL": 300, "MAX_SAFE_RESULTS": 100, "USER_ADD_INTERVAL": 60, "MAX_FILTERS_PER_USER": 5, "RATE_LIMIT_INTERVAL": 5, "POESESSID": "", "LEGACY_WEBHOOK_URL": "https://discord.com/api/webhooks/1354003262709305364/afkTjeXcu1bfZXsQzFl-QqSb3R1MmQ4hdZhosR3vm4I__QVEyZ0jO9cqndUTQwb1mt5Z" } base_path = Path(__file__).resolve().parent.parent / "local_storage" base_path.mkdir(parents=True, exist_ok=True) self.storage_file = base_path / "poe_trade_state.json" self.log_file = base_path / "poe_trade.log" self.watchlists = defaultdict(list) self.last_seen_items = defaultdict(set) self.last_add_time = defaultdict(lambda: 0) self.last_api_time = 0 self.session_valid = True self.filter_names = defaultdict(dict) # {user_id: {filter_id: custom_name}} self.paused_users = set() self._load_state() asyncio.create_task(self._validate_session()) def _init_logger(self): logger = logging.getLogger("PoeTradeWatcher") logger.setLevel(logging.INFO) log_path = Path(__file__).resolve().parent.parent / "local_storage" / "poe_trade.log" if not logger.handlers: handler = logging.FileHandler(log_path, encoding="utf-8") formatter = logging.Formatter('%(asctime)s [%(levelname)s] %(message)s') handler.setFormatter(formatter) logger.addHandler(handler) return logger def should_notify(self, user_id, filter_id, item_id, price, currency) -> bool: try: entry = self.last_seen_items[user_id][filter_id][item_id] return entry.get("price") != price or entry.get("currency") != currency except KeyError: return True def mark_seen(self, user_id, filter_id, item_id, price, currency): self.last_seen_items.setdefault(user_id, {}).setdefault(filter_id, {})[item_id] = { "price": price, "currency": currency } self._save_state() async def start_auto_poll(self): """ Starts the automatic polling loop using AUTO_POLL_INTERVAL. This will call the registered update callback when new items are found. """ if self._poll_task is not None: return # Already running async def poll_loop(): while True: try: await self.query_all() except Exception as e: self.logger.error(f"Error in auto poll loop: {e}") await asyncio.sleep(self.settings.get("AUTO_POLL_INTERVAL", 300)) self._poll_task = asyncio.create_task(poll_loop()) def pause(self): """ Pauses the auto-polling loop if it's currently running. """ if self._poll_task: self._poll_task.cancel() self._poll_task = None self.logger.info("Auto-polling paused.") def resume(self): """ Resumes auto-polling if it was previously paused. """ if self._poll_task is None: asyncio.create_task(self.start_auto_poll()) self.logger.info("Auto-polling resumed.") def set_update_callback(self, callback): """ Sets the callback function that will be triggered when new results are found during polling. Args: callback (Callable): An async function that receives a dict of new results. """ self._on_update_callback = callback async def set_setting(self, key, value, force: bool = False): """ Updates a configuration setting and persists the state. Args: key (str): Setting name to update. value (Any): New value for the setting. force (bool): Allows overriding even admin-only settings. Returns: dict: Result with status and current setting value. """ result = self._template() admin_only_keys = {"RATE_LIMIT_INTERVAL", "USER_ADD_INTERVAL", "MAX_FILTERS_PER_USER", "AUTO_POLL_INTERVAL", "MAX_SAFE_RESULTS"} try: if key in admin_only_keys and not force: result["status"] = "restricted" return result if key == "POESESSID": self.settings[key] = value self.COOKIES = {"POESESSID": value} await self._validate_session() result["session"] = value result["status"] = "ok" if self.session_valid else "invalid" else: self.settings[key] = type(self.settings.get(key, value))(value) result["status"] = "ok" self._save_state() except Exception as e: result["status"] = "error" self.logger.error(f"Failed to update setting {key}: {e}") return result def get_settings(self): """ Returns the current settings. Returns: dict: A dictionary with current settings and status. """ return {"status": "ok", "settings": self.settings} async def _validate_session(self): """ Performs a test request to verify if the current POESESSID is valid. Updates internal `session_valid` flag. """ test_url = f"{self.BASE_URL}/search/poe2/{self.LEAGUE}/OzKEO5ltE" try: connector = aiohttp.TCPConnector(ssl=True) async with aiohttp.ClientSession(headers=self.HEADERS, cookies=self.COOKIES, connector=connector) as session: async with session.get(test_url) as resp: if resp.status == 403: self.session_valid = False self.logger.error("POESESSID validation failed: status 403") elif resp.status == 200: self.session_valid = True self.logger.info("POESESSID validated successfully.") else: self.session_valid = False self.logger.error(f"POESESSID validation returned unexpected status: {resp.status}") except Exception as e: self.session_valid = False self.logger.error(f"Session validation request failed: {e}") def _template(self): return { "status": None, "user": None, "filter_id": None, "result_count": None, "summary": None, "results": None, "session": None, "input": None, "user_count": None, "query_time": None, "next_allowed_time": None, "settings": None } def _load_state(self): if self.storage_file.exists(): try: with open(self.storage_file, "r") as f: data = json.load(f) self.watchlists = defaultdict(list, data.get("watchlists", {})) self.filter_names = defaultdict(dict, data.get("filter_names", {})) self.last_seen_items = defaultdict(lambda: defaultdict(dict), data.get("seen", {})) self.settings.update(data.get("settings", {})) self.paused_users = set(data.get("paused_users", [])) self.logger.info("State loaded. Active filters: %s", sum(len(v) for v in self.watchlists.values())) except Exception as e: self.logger.error(f"Failed to load state: {e}") def _save_state(self): try: data = { "watchlists": dict(self.watchlists), "filter_names": self.filter_names, "seen": self.last_seen_items, "settings": self.settings, "paused_users": list(self.paused_users) } with open(self.storage_file, "w") as f: json.dump(data, f) except Exception as e: self.logger.error(f"Failed to save state: {e}") async def add_filter(self, user_id: str, filter_id: str, custom_name: str = None, force: bool = False) -> dict: """ Adds a filter to the user's watchlist. Args: user_id (str): Discord user ID. filter_id (str): PoE trade filter ID. force (bool, optional): Whether to force add the filter even if it's broad. Defaults to False. Returns: dict: Result template with status, user, filter_id, and optional warnings. """ result = self._template() result["user"] = user_id result["filter_id"] = filter_id now = time.time() if not self.session_valid: result["status"] = "invalid_session" return result if filter_id in self.watchlists[user_id]: result["status"] = "exists" return result if len(self.watchlists[user_id]) >= self.settings["MAX_FILTERS_PER_USER"]: result["status"] = "limit_reached" return result if now - self.last_add_time[user_id] < self.settings["USER_ADD_INTERVAL"]: result["status"] = "cooldown" result["next_allowed_time"] = self.last_add_time[user_id] + self.settings["USER_ADD_INTERVAL"] return result query_url = f"{self.BASE_URL}/search/poe2/{self.LEAGUE}/{filter_id}" try: connector = aiohttp.TCPConnector(ssl=False) async with aiohttp.ClientSession(headers=self.HEADERS, cookies=self.COOKIES, connector=connector) as session: async with session.get(query_url) as resp: if resp.status == 403: self.session_valid = False result["status"] = "invalid_session" return result elif resp.status != 200: result["status"] = "error" return result data = await resp.json() except Exception as e: self.logger.error(f"Error while checking filter {filter_id}: {e}") result["status"] = "error" return result total_results = data.get("total", 0) result["result_count"] = total_results if total_results > self.settings["MAX_SAFE_RESULTS"] and not force: result["status"] = "too_broad" return result self.watchlists[user_id].append(filter_id) self.last_add_time[user_id] = now result["status"] = "success" if custom_name: self.filter_names[user_id][filter_id] = custom_name self._save_state() return result def remove_filter(self, user_id: str, filter_id: str) -> dict: """ Removes a specific filter from a user's watchlist. Args: user_id (str): Discord user ID. filter_id (str): Filter ID to remove. Returns: dict: Result template with status and user info. """ result = self._template() result["user"] = user_id result["filter_id"] = filter_id if filter_id in self.watchlists[user_id]: self.watchlists[user_id].remove(filter_id) result["status"] = "removed" else: result["status"] = "not_found" self._save_state() return result def get_filters(self, user_id: str) -> dict: """ Returns all active filters for a user. Args: user_id (str): Discord user ID. Returns: dict: Result template with active filters list and status. """ result = self._template() result["user"] = user_id result["results"] = self.watchlists[user_id] result["status"] = "ok" return result async def query_all(self): if not self.session_valid: self.logger.warning("Skipping query: POESESSID invalid.") return found_items = {} async with aiohttp.ClientSession(headers=self.HEADERS, cookies=self.COOKIES) as session: for user_id, filters in self.watchlists.items(): if str(user_id) in self.paused_users: self.logger.debug(f"Skipping paused user {user_id}") continue for filter_id in filters: try: async with session.get(f"{self.BASE_URL}/search/poe2/{self.LEAGUE}/{filter_id}") as res: if res.status != 200: self.logger.warning(f"Failed to fetch filter {filter_id} for user {user_id}. Status: {res.status}") continue filter_data = await res.json() query = {"query": filter_data.get("query", {})} async with session.post(f"{self.BASE_URL}/search/poe2/{self.LEAGUE}", json=query) as search_res: if search_res.status != 200: self.logger.warning(f"Query failed for {filter_id}. Status: {search_res.status}") continue result_data = await search_res.json() search_id = result_data.get("id") result_ids = result_data.get("result", [])[:10] self.logger.info(f"Filter {filter_id} returned {len(result_ids)} item IDs") if not result_ids: continue joined_ids = ",".join(result_ids) fetch_url = f"{self.BASE_URL}/fetch/{joined_ids}?query={search_id}" async with session.get(fetch_url) as detail_res: if detail_res.status != 200: self.logger.warning(f"Failed to fetch items for {filter_id}. Status: {detail_res.status}") continue item_data = await detail_res.json() items = item_data.get("result", []) filtered = [] for item in items: item_id = item.get("id") price_data = item.get("listing", {}).get("price", {}) price = price_data.get("amount") currency = price_data.get("currency", "unknown") if item_id and self.should_notify(user_id, filter_id, item_id, price, currency): filtered.append(item) self.mark_seen(user_id, filter_id, item_id, price, currency) if filtered: found_items[(user_id, filter_id)] = { "search_id": search_id, "items": filtered } except Exception as e: self.logger.error(f"Error while querying filter {filter_id}: {e}") if found_items and self._on_update_callback: await self._on_update_callback(found_items) elif not found_items: self.logger.info("No new results; fallback webhook active.") def cleanup_filters(self, max_age_seconds: int = 86400) -> dict: """ Cleans up filters that haven't had results in a specified time. Args: max_age_seconds (int): Threshold of inactivity before filters are cleaned. Returns: dict: Result with summary of removed filters. """ result = self._template() now = time.time() removed = {} for key in list(self.last_seen_items.keys()): last_seen = max((now - self.last_add_time.get(key[0], now)), 0) if last_seen > max_age_seconds: user, filter_id = key if filter_id in self.watchlists[user]: self.watchlists[user].remove(filter_id) removed.setdefault(user, []).append(filter_id) del self.last_seen_items[key] self._save_state() result["status"] = "ok" result["results"] = removed return result def get_filter_name(self, user_id: str, filter_id: str) -> str | None: return self.filter_names.get(user_id, {}).get(filter_id) def pause_user(self, user_id: str) -> bool: if user_id in self.paused_users: return False self.paused_users.add(user_id) self._save_state() return True def resume_user(self, user_id: str) -> bool: if user_id not in self.paused_users: return False self.paused_users.remove(user_id) self._save_state() return True def is_paused(self, user_id: str) -> bool: return user_id in self.paused_users