# PoeTradeWatcher — watches Path of Exile trade-site saved searches and
# pushes new/re-priced listings to a registered async callback.
import aiohttp
|
|
import asyncio
|
|
import time
|
|
import json
|
|
import logging
|
|
from pathlib import Path
|
|
from collections import defaultdict
|
|
import base64
|
|
|
|
# --- Base64 encode/decode helpers ---
def encrypt_token(token: str) -> str:
    """Obfuscate *token* as base64 text for the on-disk state file.

    NOTE: base64 is reversible encoding, not real encryption — it only keeps
    the raw POESESSID out of plain sight in the JSON state file.
    """
    raw_bytes = token.encode()
    return base64.b64encode(raw_bytes).decode()
|
def decrypt_token(encoded: str) -> str:
    """Inverse of encrypt_token: recover the plain token from base64 text."""
    decoded_bytes = base64.b64decode(encoded.encode())
    return decoded_bytes.decode()
|
class PoeTradeWatcher:
    """
    A watcher class for managing Path of Exile trade filters, querying updates,
    and notifying clients of new item listings.
    """

    def __init__(self):
        """
        Initializes the watcher with default settings and loads persistent state.

        NOTE(review): the trailing asyncio.create_task() calls require an
        already-running event loop — constructing this class outside one
        raises RuntimeError. Confirm all call sites construct it inside the loop.
        """
        self._poll_task = None           # handle of the background poll-loop task
        self._on_update_callback = None  # async callback fed new listings
        self.logger = self._init_logger()

        self.BASE_URL = "https://www.pathofexile.com/api/trade2"
        self.LEAGUE = "Standard"
        self.HEADERS = {
            # Browser-like UA string; presumably the trade API rejects
            # generic clients — TODO confirm.
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
            "(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
            "Accept": "application/json"
        }

        self.COOKIES = {}
        # SECURITY NOTE(review): OWNER_ID and LEGACY_WEBHOOK_URL are hardcoded;
        # the webhook URL embeds its secret token and should be rotated and
        # moved into configuration rather than source.
        self.settings = {
            "AUTO_POLL_INTERVAL": 300,   # seconds (legacy; poll loop paces itself)
            "MAX_SAFE_RESULTS": 100,     # reject filters broader than this
            "USER_ADD_INTERVAL": 60,     # per-user cooldown for add_filter
            "MAX_FILTERS_PER_USER": 5,
            "RATE_LIMIT_INTERVAL": 90,   # base spacing between API queries
            "POESESSID": "",
            "OWNER_ID": "203190147582394369",
            "LEGACY_WEBHOOK_URL": "https://discord.com/api/webhooks/1354003262709305364/afkTjeXcu1bfZXsQzFl-QqSb3R1MmQ4hdZhosR3vm4I__QVEyZ0jO9cqndUTQwb1mt5Z"
        }

        # Grows when the API answers 429 with Retry-After (see query_single).
        self.dynamic_rate_limit = self.settings.get("RATE_LIMIT_INTERVAL", 90)
        base_path = Path(__file__).resolve().parent.parent / "local_storage"
        base_path.mkdir(parents=True, exist_ok=True)
        self.storage_file = base_path / "poe_trade_state.json"
        self.log_file = base_path / "poe_trade.log"

        self.watchlists = defaultdict(list)   # user_id -> [filter_id, ...]
        # NOTE(review): should_notify / mark_seen / _load_state all treat this
        # as user_id -> filter_id -> item_id -> {"price", "currency"};
        # defaultdict(set) looks like a stale default — verify.
        self.last_seen_items = defaultdict(set)
        self.last_add_time = defaultdict(lambda: 0)  # user_id -> last add_filter ts
        self.last_api_time = 0                       # ts of most recent API call
        self.session_valid = True                    # toggled by _validate_session
        self.filter_names = defaultdict(dict)  # {user_id: {filter_id: custom_name}}
        self.last_query_time = defaultdict(dict)  # user_id -> filter_id -> timestamp

        self.paused_users = set()  # user_ids (str) excluded from polling

        self.verification_queue = asyncio.Queue()  # pending (user, filter, name, cb)

        self._load_state()
        asyncio.create_task(self._validate_session())
        asyncio.create_task(self._start_verification_worker())
        asyncio.create_task(self.start_auto_poll())
|
def _init_logger(self):
|
|
logger = logging.getLogger("PoeTradeWatcher")
|
|
logger.setLevel(logging.INFO)
|
|
log_path = Path(__file__).resolve().parent.parent / "local_storage" / "poe_trade.log"
|
|
if not logger.handlers:
|
|
handler = logging.FileHandler(log_path, encoding="utf-8")
|
|
formatter = logging.Formatter('%(asctime)s [%(levelname)s] %(message)s')
|
|
handler.setFormatter(formatter)
|
|
logger.addHandler(handler)
|
|
return logger
|
|
|
|
|
|
def should_notify(self, user_id, filter_id, item_id, price, currency) -> bool:
|
|
try:
|
|
entry = self.last_seen_items[user_id][filter_id][item_id]
|
|
return entry.get("price") != price or entry.get("currency") != currency
|
|
except KeyError:
|
|
return True
|
|
|
|
|
|
def mark_seen(self, user_id, filter_id, item_id, price, currency):
|
|
self.last_seen_items.setdefault(user_id, {}).setdefault(filter_id, {})[item_id] = {
|
|
"price": price,
|
|
"currency": currency
|
|
}
|
|
self._save_state()
|
|
|
|
|
|
    async def start_auto_poll(self):
        """
        Starts a rolling polling system where each user/filter is queried one at a time.
        Automatically adjusts wait time based on rate limit headers.
        """
        if self._poll_task is not None:
            return  # Already running

        async def poll_loop():
            # Minimum spacing between API queries; dynamic_rate_limit can grow
            # above this after a 429 response (adjusted inside query_single).
            base_delay = self.settings.get("RATE_LIMIT_INTERVAL", 90)
            safety_margin = 2

            while True:
                # NOTE(review): iterates self.watchlists directly; a concurrent
                # add/remove of filters could raise RuntimeError — confirm.
                for user_id, filters in self.watchlists.items():
                    if str(user_id) in self.paused_users:
                        self.logger.debug(f"Skipping paused user {user_id}")
                        continue

                    for filter_id in filters:
                        try:
                            # Always wait at least base_delay
                            delay = max(base_delay, self.dynamic_rate_limit) + safety_margin

                            result = await self.query_single(user_id, filter_id)

                            if result == "ratelimited":
                                # dynamic_rate_limit has already been adjusted within query_single
                                delay = max(base_delay, self.dynamic_rate_limit) + safety_margin

                        except Exception as e:
                            self.logger.warning(f"Failed query for {filter_id} of user {user_id}: {e}")
                            delay = base_delay + safety_margin

                        # One sleep per filter: a full cycle therefore takes
                        # (total active filters) * delay seconds.
                        await asyncio.sleep(delay)

        self._poll_task = asyncio.create_task(poll_loop())
|
def pause(self):
|
|
"""
|
|
Pauses the auto-polling loop if it's currently running.
|
|
"""
|
|
if self._poll_task:
|
|
self._poll_task.cancel()
|
|
self._poll_task = None
|
|
self.logger.info("Auto-polling paused.")
|
|
|
|
|
|
def resume(self):
|
|
"""
|
|
Resumes auto-polling if it was previously paused.
|
|
"""
|
|
if self._poll_task is None:
|
|
asyncio.create_task(self.start_auto_poll())
|
|
self.logger.info("Auto-polling resumed.")
|
|
|
|
|
|
def set_update_callback(self, callback):
|
|
"""
|
|
Sets the callback function that will be triggered when new results are found during polling.
|
|
|
|
Args:
|
|
callback (Callable): An async function that receives a dict of new results.
|
|
"""
|
|
self._on_update_callback = callback
|
|
|
|
|
|
async def set_setting(self, key, value, force: bool = False):
|
|
"""
|
|
Updates a configuration setting and persists the state.
|
|
|
|
Args:
|
|
key (str): Setting name to update.
|
|
value (Any): New value for the setting.
|
|
force (bool): Allows overriding even admin-only settings.
|
|
|
|
Returns:
|
|
dict: Result with status and current setting value.
|
|
"""
|
|
result = self._template()
|
|
admin_only_keys = {"RATE_LIMIT_INTERVAL", "USER_ADD_INTERVAL", "MAX_FILTERS_PER_USER", "AUTO_POLL_INTERVAL", "MAX_SAFE_RESULTS"}
|
|
|
|
try:
|
|
if key in admin_only_keys and not force:
|
|
result["status"] = "restricted"
|
|
return result
|
|
|
|
if key == "POESESSID":
|
|
self.settings[key] = value
|
|
self.COOKIES = {"POESESSID": value}
|
|
await self._validate_session()
|
|
result["session"] = value
|
|
result["status"] = "ok" if self.session_valid else "invalid"
|
|
else:
|
|
self.settings[key] = type(self.settings.get(key, value))(value)
|
|
result["status"] = "ok"
|
|
|
|
self._save_state()
|
|
except Exception as e:
|
|
result["status"] = "error"
|
|
self.logger.error(f"Failed to update setting {key}: {e}")
|
|
return result
|
|
|
|
|
|
def get_settings(self):
|
|
"""
|
|
Returns the current settings.
|
|
|
|
Returns:
|
|
dict: A dictionary with current settings and status.
|
|
"""
|
|
return {"status": "ok", "settings": self.settings}
|
|
|
|
|
|
    async def _validate_session(self):
        """
        Performs a test request to verify if the current POESESSID is valid.
        Updates internal `session_valid` flag.
        Handles rate limit headers to avoid overloading the API at startup.
        """
        # Presumably a known public saved-search id; a 200 means the cookie
        # (or anonymous access) is accepted — TODO confirm the id stays valid.
        test_url = f"{self.BASE_URL}/search/poe2/{self.LEAGUE}/OzKEO5ltE"
        try:
            connector = aiohttp.TCPConnector(ssl=True)
            async with aiohttp.ClientSession(headers=self.HEADERS, cookies=self.COOKIES, connector=connector) as session:
                async with session.get(test_url) as resp:
                    # Collected purely for the debug log line below.
                    rate_info = {
                        "Status": resp.status,
                        "Retry-After": resp.headers.get("Retry-After"),
                        "X-Rate-Limit-Rules": resp.headers.get("X-Rate-Limit-Rules"),
                        "X-Rate-Limit-Ip": resp.headers.get("X-Rate-Limit-Ip"),
                        "X-Rate-Limit-State": resp.headers.get("X-Rate-Limit-State"),
                    }
                    self.logger.debug(f"[{resp.status}] GET {test_url} | Rate Info: {rate_info}")

                    if resp.status == 429:
                        # Back off and leave the session flagged invalid; a later
                        # set_setting("POESESSID", ...) call re-validates.
                        retry_after = int(resp.headers.get("Retry-After", 10))
                        self.session_valid = False
                        self.logger.warning(f"Rate limited during session validation. Sleeping for {retry_after + 2} seconds.")
                        await asyncio.sleep(retry_after + 2)
                        return

                    elif resp.status == 403:
                        self.session_valid = False
                        self.logger.error("POESESSID validation failed: status 403")

                    elif resp.status == 200:
                        self.session_valid = True
                        self.logger.info("POESESSID validated successfully.")

                    else:
                        self.session_valid = False
                        self.logger.error(f"POESESSID validation returned unexpected status: {resp.status}")

        except Exception as e:
            # Any network failure also disables polling until re-validated.
            self.session_valid = False
            self.logger.error(f"Session validation request failed: {e}")
|
def get_next_query_time(self, user_id: str, filter_id: str) -> int | None:
|
|
"""
|
|
Estimate the next time this filter will be queried, accounting for queued filters,
|
|
current ratelimit interval, and safety margins.
|
|
This version ensures the returned time is in the future based on queue order.
|
|
"""
|
|
if str(user_id) in self.paused_users:
|
|
return None
|
|
|
|
base_interval = self.dynamic_rate_limit
|
|
safety_margin = 2
|
|
total_delay = base_interval + safety_margin
|
|
|
|
# Build a linear list of all filters in scheduled order
|
|
full_queue = [
|
|
(uid, fid)
|
|
for uid, flist in self.watchlists.items()
|
|
if str(uid) not in self.paused_users
|
|
for fid in flist
|
|
]
|
|
|
|
if (user_id, filter_id) not in full_queue:
|
|
return None
|
|
|
|
# Filter position in the total queue
|
|
position = full_queue.index((user_id, filter_id))
|
|
|
|
# Always start from last API time and simulate future ticks
|
|
now = time.time()
|
|
last = self.last_api_time or now
|
|
|
|
# Keep incrementing until we find a future timestamp
|
|
next_query = last + (position * total_delay)
|
|
while next_query <= now:
|
|
next_query += len(full_queue) * total_delay
|
|
|
|
return int(next_query)
|
|
|
|
|
|
def _template(self):
|
|
return {
|
|
"status": None,
|
|
"user": None,
|
|
"filter_id": None,
|
|
"result_count": None,
|
|
"summary": None,
|
|
"results": None,
|
|
"session": None,
|
|
"input": None,
|
|
"user_count": None,
|
|
"query_time": None,
|
|
"next_allowed_time": None,
|
|
"settings": None
|
|
}
|
|
|
|
|
|
    def _load_state(self):
        """Restore watchlists, names, seen-cache, settings and paused users
        from the JSON state file.

        A missing file is fine (defaults stay); parse errors are logged and
        leave defaults intact.
        """
        if self.storage_file.exists():
            try:
                with open(self.storage_file, "r") as f:
                    data = json.load(f)
                self.watchlists = defaultdict(list, data.get("watchlists", {}))
                self.filter_names = defaultdict(dict, data.get("filter_names", {}))
                # Shape: user_id -> filter_id -> item_id -> {"price", "currency"}
                # (inner levels are plain dicts loaded from JSON).
                self.last_seen_items = defaultdict(lambda: defaultdict(dict), data.get("seen", {}))
                self.settings.update(data.get("settings", {}))

                # Decode POESESSID
                # (stored base64-obfuscated by _save_state; see encrypt_token).
                enc_token = self.settings.get("POESESSID")
                if enc_token:
                    try:
                        self.settings["POESESSID"] = decrypt_token(enc_token)
                        self.COOKIES = {"POESESSID": self.settings["POESESSID"]}
                    except Exception as de:
                        self.logger.warning(f"Could not decode POESESSID: {de}")

                self.paused_users = set(data.get("paused_users", []))
                self.logger.info("State loaded. Active filters: %s", sum(len(v) for v in self.watchlists.values()))
            except Exception as e:
                self.logger.error(f"Failed to load state: {e}")
|
def _save_state(self):
|
|
try:
|
|
settings_copy = self.settings.copy()
|
|
if "POESESSID" in settings_copy:
|
|
raw_token = settings_copy["POESESSID"]
|
|
settings_copy["POESESSID"] = encrypt_token(raw_token)
|
|
|
|
data = {
|
|
"watchlists": dict(self.watchlists),
|
|
"filter_names": self.filter_names,
|
|
"seen": self.last_seen_items,
|
|
"settings": settings_copy,
|
|
"paused_users": list(self.paused_users)
|
|
}
|
|
|
|
with open(self.storage_file, "w") as f:
|
|
json.dump(data, f)
|
|
except Exception as e:
|
|
self.logger.error(f"Failed to save state: {e}")
|
|
|
|
|
|
    async def _start_verification_worker(self):
        """Drain the verification queue forever, verifying one queued filter
        per rate-limit window and invoking the caller's callback with the result."""
        while True:
            user_id, filter_id, custom_name, response_callback = await self.verification_queue.get()
            result = await self._verify_and_add_filter(user_id, filter_id, custom_name)
            if callable(response_callback):
                await response_callback(result)
            # Space verifications out to respect the trade API rate limit.
            await asyncio.sleep(self.settings["RATE_LIMIT_INTERVAL"])
|
async def _verify_and_add_filter(self, user_id, filter_id, custom_name):
|
|
result = self._template()
|
|
result["user"] = user_id
|
|
result["filter_id"] = filter_id
|
|
|
|
query_url = f"{self.BASE_URL}/search/poe2/{self.LEAGUE}/{filter_id}"
|
|
try:
|
|
connector = aiohttp.TCPConnector(ssl=False)
|
|
async with aiohttp.ClientSession(headers=self.HEADERS, cookies=self.COOKIES, connector=connector) as session:
|
|
async with session.get(query_url) as resp:
|
|
if resp.status == 403:
|
|
self.session_valid = False
|
|
result["status"] = "invalid_session"
|
|
return result
|
|
elif resp.status != 200:
|
|
result["status"] = "error"
|
|
return result
|
|
data = await resp.json()
|
|
except Exception as e:
|
|
self.logger.error(f"Error while verifying filter {filter_id}: {e}")
|
|
result["status"] = "error"
|
|
return result
|
|
|
|
total_results = data.get("total", 0)
|
|
result["result_count"] = total_results
|
|
|
|
if total_results > self.settings["MAX_SAFE_RESULTS"]:
|
|
result["status"] = "too_broad"
|
|
return result
|
|
|
|
self.watchlists[user_id].append(filter_id)
|
|
if custom_name:
|
|
self.filter_names[user_id][filter_id] = custom_name
|
|
|
|
self._save_state()
|
|
result["status"] = "success"
|
|
return result
|
|
|
|
|
|
async def add_filter(self, user_id: str, filter_id: str, custom_name: str = None, response_callback=None) -> dict:
|
|
result = self._template()
|
|
result["user"] = user_id
|
|
result["filter_id"] = filter_id
|
|
now = time.time()
|
|
|
|
if not self.session_valid:
|
|
result["status"] = "invalid_session"
|
|
return result
|
|
|
|
if filter_id in self.watchlists[user_id]:
|
|
result["status"] = "exists"
|
|
return result
|
|
|
|
if len(self.watchlists[user_id]) >= self.settings["MAX_FILTERS_PER_USER"]:
|
|
result["status"] = "limit_reached"
|
|
return result
|
|
|
|
if now - self.last_add_time[user_id] < self.settings["USER_ADD_INTERVAL"]:
|
|
result["status"] = "cooldown"
|
|
result["next_allowed_time"] = self.last_add_time[user_id] + self.settings["USER_ADD_INTERVAL"]
|
|
return result
|
|
|
|
self.last_add_time[user_id] = now
|
|
await self.verification_queue.put((user_id, filter_id, custom_name, response_callback))
|
|
result["status"] = "queued"
|
|
return result
|
|
|
|
|
|
def remove_filter(self, user_id: str, filter_id: str) -> dict:
|
|
"""
|
|
Removes a specific filter from a user's watchlist and cleans up associated metadata.
|
|
|
|
Args:
|
|
user_id (str): Discord user ID.
|
|
filter_id (str): Filter ID to remove.
|
|
|
|
Returns:
|
|
dict: Result template with status and user info.
|
|
"""
|
|
result = self._template()
|
|
result["user"] = user_id
|
|
result["filter_id"] = filter_id
|
|
|
|
if filter_id in self.watchlists[user_id]:
|
|
self.watchlists[user_id].remove(filter_id)
|
|
self.filter_names[user_id].pop(filter_id, None)
|
|
self.last_seen_items[user_id].pop(filter_id, None)
|
|
self.last_query_time[user_id].pop(filter_id, None)
|
|
result["status"] = "removed"
|
|
else:
|
|
result["status"] = "not_found"
|
|
|
|
self._save_state()
|
|
return result
|
|
|
|
|
|
def remove_all_filters(self, user_id: str) -> dict:
|
|
"""
|
|
Removes all filters for the specified user and clears associated metadata.
|
|
|
|
Args:
|
|
user_id (str): Discord user ID.
|
|
|
|
Returns:
|
|
dict: Result template with summary of removed filters.
|
|
"""
|
|
result = self._template()
|
|
result["user"] = user_id
|
|
removed = self.watchlists.pop(user_id, [])
|
|
self.filter_names.pop(user_id, None)
|
|
self.last_seen_items.pop(user_id, None)
|
|
self.last_add_time.pop(user_id, None)
|
|
self.last_query_time.pop(user_id, None)
|
|
|
|
if removed:
|
|
result["status"] = "removed"
|
|
result["results"] = removed
|
|
result["summary"] = f"Removed {len(removed)} filters from your watchlist."
|
|
else:
|
|
result["status"] = "empty"
|
|
result["summary"] = "You have no filters to remove."
|
|
|
|
self._save_state()
|
|
return result
|
|
|
|
def get_filters(self, user_id: str) -> dict:
|
|
"""
|
|
Returns all active filters for a user.
|
|
|
|
Args:
|
|
user_id (str): Discord user ID.
|
|
|
|
Returns:
|
|
dict: Result template with active filters list and status.
|
|
"""
|
|
result = self._template()
|
|
result["user"] = user_id
|
|
result["results"] = self.watchlists[user_id]
|
|
result["status"] = "ok"
|
|
return result
|
|
|
|
|
|
    async def query_single(self, user_id, filter_id):
        """Poll one filter for one user.

        Resolves the saved search, re-posts its query, fetches the first ten
        listings, and fires the update callback for items that are new or
        re-priced (per should_notify). Returns "ratelimited" in one failure
        path (consumed by poll_loop) and None otherwise.
        """
        if not self.session_valid:
            self.logger.warning("Skipping query: POESESSID invalid.")
            return

        if str(user_id) in self.paused_users:
            return

        found_items = {}

        async with aiohttp.ClientSession(headers=self.HEADERS, cookies=self.COOKIES) as session:

            async def fetch_with_handling(url, method="GET", **kwargs):
                # Shared GET/POST wrapper: logs rate-limit headers, backs off
                # on 429 (raising self.dynamic_rate_limit), and returns parsed
                # JSON, the string "ratelimited", or None on other HTTP errors.
                async with getattr(session, method.lower())(url, **kwargs) as res:
                    rate_info = {
                        "Status": res.status,
                        "Retry-After": res.headers.get("Retry-After"),
                        "X-Rate-Limit-Rules": res.headers.get("X-Rate-Limit-Rules"),
                        "X-Rate-Limit-Ip": res.headers.get("X-Rate-Limit-Ip"),
                        "X-Rate-Limit-State": res.headers.get("X-Rate-Limit-State"),
                    }
                    self.logger.debug(f"[{res.status}] {method} {url} | Rate Info: {rate_info}")

                    if res.status == 429:
                        retry_after = int(res.headers.get("Retry-After", 10))
                        self.dynamic_rate_limit = retry_after + 2
                        self.logger.warning(f"Rate limited on {url}. Sleeping for {self.dynamic_rate_limit} seconds.")
                        await asyncio.sleep(self.dynamic_rate_limit)
                        return "ratelimited"
                    elif res.status >= 400:
                        self.logger.warning(f"HTTP {res.status} on {url}")
                        return None

                    return await res.json()

            # NOTE(review): "ratelimited" is a truthy string, so a 429 on this
            # call passes the `if not filter_data` guard and the .get() below
            # would raise AttributeError on a str — confirm and fix upstream.
            filter_data = await fetch_with_handling(
                f"{self.BASE_URL}/search/poe2/{self.LEAGUE}/{filter_id}", method="GET"
            )
            if not filter_data:
                return

            # Re-run the saved search's query to get a fresh result id set.
            query = {"query": filter_data.get("query", {})}
            result_data = await fetch_with_handling(
                f"{self.BASE_URL}/search/poe2/{self.LEAGUE}", method="POST", json=query
            )
            if not result_data:
                return

            search_id = result_data.get("id")
            # The fetch endpoint accepts at most 10 ids per call.
            result_ids = result_data.get("result", [])[:10]
            self.logger.info(f"Filter {filter_id} returned {len(result_ids)} item IDs")

            if not result_ids:
                return

            joined_ids = ",".join(result_ids)
            fetch_url = f"{self.BASE_URL}/fetch/{joined_ids}?query={search_id}"
            item_data = await fetch_with_handling(fetch_url, method="GET")
            if not item_data:
                # NOTE(review): this reports "ratelimited" for *any* failure
                # (including plain 4xx -> None), inflating the poll delay.
                return "ratelimited"

            items = item_data.get("result", [])
            filtered = []
            for item in items:
                item_id = item.get("id")
                price_data = item.get("listing", {}).get("price", {})
                price = price_data.get("amount")
                currency = price_data.get("currency", "unknown")

                # Only surface items that are new or changed price/currency.
                if item_id and self.should_notify(user_id, filter_id, item_id, price, currency):
                    filtered.append(item)
                    self.mark_seen(user_id, filter_id, item_id, price, currency)

            if filtered:
                found_items[(user_id, filter_id)] = {
                    "search_id": search_id,
                    "items": filtered
                }

            now = time.time()
            self.last_query_time[user_id][filter_id] = now
            self.last_api_time = now

            if found_items and self._on_update_callback:
                await self._on_update_callback(found_items)
|
def cleanup_filters(self, max_age_seconds: int = 86400) -> dict:
|
|
"""
|
|
Cleans up filters that haven't had results in a specified time.
|
|
|
|
Args:
|
|
max_age_seconds (int): Threshold of inactivity before filters are cleaned.
|
|
|
|
Returns:
|
|
dict: Result with summary of removed filters.
|
|
"""
|
|
result = self._template()
|
|
now = time.time()
|
|
removed = {}
|
|
|
|
for key in list(self.last_seen_items.keys()):
|
|
last_seen = max((now - self.last_add_time.get(key[0], now)), 0)
|
|
if last_seen > max_age_seconds:
|
|
user, filter_id = key
|
|
if filter_id in self.watchlists[user]:
|
|
self.watchlists[user].remove(filter_id)
|
|
removed.setdefault(user, []).append(filter_id)
|
|
del self.last_seen_items[key]
|
|
|
|
self._save_state()
|
|
result["status"] = "ok"
|
|
result["results"] = removed
|
|
return result
|
|
|
|
|
|
def get_filter_name(self, user_id: str, filter_id: str) -> str | None:
|
|
return self.filter_names.get(user_id, {}).get(filter_id)
|
|
|
|
|
|
def pause_user(self, user_id: str) -> bool:
|
|
if user_id in self.paused_users:
|
|
return False
|
|
self.paused_users.add(user_id)
|
|
self._save_state()
|
|
return True
|
|
|
|
def resume_user(self, user_id: str) -> bool:
|
|
if user_id not in self.paused_users:
|
|
return False
|
|
self.paused_users.remove(user_id)
|
|
self._save_state()
|
|
return True
|
|
|
|
def is_paused(self, user_id: str) -> bool:
|
|
return user_id in self.paused_users
|
|
|
|
def clear_cache(self, user_id: str, confirm: bool = False) -> dict:
|
|
"""
|
|
Clears all persistent and in-memory cache for a user after confirmation.
|
|
|
|
Args:
|
|
user_id (str): Discord user ID (must be the owner).
|
|
confirm (bool): If True, actually clears cache; otherwise sends confirmation.
|
|
|
|
Returns:
|
|
dict: Result with status and message.
|
|
"""
|
|
owner_id = self.settings.get("OWNER_ID", "203190147582394369")
|
|
result = self._template()
|
|
|
|
if str(user_id) != owner_id:
|
|
result["status"] = "unauthorized"
|
|
result["summary"] = "Only the bot owner may use this command."
|
|
return result
|
|
|
|
now = time.time()
|
|
if not hasattr(self, "_cache_clear_confirmations"):
|
|
self._cache_clear_confirmations = {}
|
|
|
|
if not confirm:
|
|
self._cache_clear_confirmations[user_id] = now
|
|
result["status"] = "confirm"
|
|
result["summary"] = "⚠️ This action will clear all filters, names, and seen cache.\nRun the same command again with `-y` within 60s to confirm."
|
|
return result
|
|
|
|
last = self._cache_clear_confirmations.get(user_id, 0)
|
|
if now - last > 60:
|
|
result["status"] = "expired"
|
|
result["summary"] = "Confirmation expired. Please run the command again."
|
|
return result
|
|
|
|
elif confirm:
|
|
result["status"] = "invalid"
|
|
result["summary"] = "⚠️ Run the command without the `-y` flag first!"
|
|
return result
|
|
|
|
# Reset all critical in-memory and persistent states
|
|
self.watchlists.clear()
|
|
self.filter_names.clear()
|
|
self.last_seen_items.clear()
|
|
self.last_add_time.clear()
|
|
self.last_query_time.clear()
|
|
self.paused_users.clear()
|
|
|
|
self._cache_clear_confirmations.pop(user_id, None)
|
|
self._save_state()
|
|
|
|
result["status"] = "cleared"
|
|
result["summary"] = "🧹 Cache successfully cleared for all users."
|
|
return result
|