Transfer bot was silently no-op'ing every Monday cycle since /user/
became paginated ({"rows": [...], "total": N}). The bot's silent
guard `len(items) if isinstance(items, list) else 0` collapsed every
contract mismatch and HTTP/JSON error into "0 items" with no signal.
- API: add /user/batch — bare-list, no pagination — for batch jobs.
Keeps the paginated /user/ contract intact for the web UI.
- Bot: replace silent guard with raise_for_status + isinstance check.
On any HTTP/JSON/contract failure, log + Telegram-alert + skip the
cycle (next attempt in 10 min). Empty list still means "no work".
- Tests: 15 new tests pinning both sides of the contract, including a
regression test that feeds the exact envelope shape that broke prod.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
94 lines
3.6 KiB
Python
94 lines
3.6 KiB
Python
import logging, time, requests, os, threading
|
|
from concurrent.futures import ThreadPoolExecutor
|
|
from datetime import datetime
|
|
|
|
from .cm_bot_hal import CM_BOT_HAL
|
|
from .telegram_notifier import TelegramNotifier
|
|
|
|
# Suppress httpx logs
|
|
logging.getLogger("httpx").setLevel(logging.WARNING)
|
|
|
|
# api_url = 'https://api.luckytown888.net'
|
|
api_url = os.getenv('API_BASE_URL', 'http://api-server:3000')
|
|
max_threading = int(os.getenv('CM_TRANSFER_MAX_THREADS', '20'))
|
|
|
|
notifier = TelegramNotifier()
|
|
|
|
def transfer(data: dict, local_logger):
|
|
bot = CM_BOT_HAL()
|
|
thread_name = threading.current_thread().name
|
|
local_logger.info(f"[Thread-{thread_name}] [Start] Transfer Credit from {data['f_username']} to {data['t_username']}")
|
|
result = bot.transfer_credit_api(data['f_username'], data['f_password'], data['t_username'], data['t_password'])
|
|
local_logger.info(f"[Thread-{thread_name}] {result}")
|
|
local_logger.info(f"[Thread-{thread_name}] [Done] {data['f_username']} transfer done!")
|
|
del bot
|
|
time.sleep(5)
|
|
|
|
|
|
def fetch_users(local_logger):
|
|
"""Fetch the full user list from /user/batch.
|
|
|
|
Returns the list on success, or None to signal "skip this cycle"
|
|
after surfacing the failure via log + Telegram alert.
|
|
"""
|
|
endpoint = f'{api_url}/user/batch'
|
|
try:
|
|
response = requests.get(endpoint, timeout=30)
|
|
response.raise_for_status()
|
|
items = response.json()
|
|
if not isinstance(items, list):
|
|
raise TypeError(
|
|
f"contract violation at {endpoint}: expected list, "
|
|
f"got {type(items).__name__}: {response.text[:200]}"
|
|
)
|
|
return items
|
|
except (requests.RequestException, ValueError, TypeError) as exc:
|
|
msg = f"Transfer bot: failed to fetch users from {endpoint} — {exc}"
|
|
local_logger.error(msg)
|
|
notifier.notify_generic_error(msg)
|
|
return None
|
|
|
|
def main():
|
|
while True:
|
|
weekday = int(datetime.now().strftime("%w"))
|
|
hour = int(datetime.now().strftime("%H"))
|
|
if weekday == 1 and (hour >= 6 and hour < 12):
|
|
local_logger = logging.getLogger(__name__)
|
|
if not local_logger.handlers:
|
|
handler = logging.StreamHandler()
|
|
handler.setLevel(logging.INFO)
|
|
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
|
|
handler.setFormatter(formatter)
|
|
local_logger.addHandler(handler)
|
|
local_logger.setLevel(logging.INFO)
|
|
|
|
local_logger.info("=" * 80)
|
|
local_logger.info(
|
|
"Transfer window triggered | weekday=%s | hour=%s | max_threads=%s | api_url=%s",
|
|
weekday,
|
|
hour,
|
|
max_threading,
|
|
api_url,
|
|
)
|
|
local_logger.info("=" * 80)
|
|
|
|
items = fetch_users(local_logger)
|
|
if items is None:
|
|
local_logger.info("Skipping cycle due to fetch error (alert sent).")
|
|
local_logger.info("=" * 80)
|
|
elif len(items) == 0:
|
|
local_logger.info("No items to process.")
|
|
local_logger.info("=" * 80)
|
|
else:
|
|
total_items = len(items)
|
|
local_logger.info(f"Processing {total_items} transfer items...")
|
|
with ThreadPoolExecutor(max_workers=max_threading) as executor:
|
|
list(executor.map(lambda item: transfer(item, local_logger), items))
|
|
local_logger.info(f"Completed processing {total_items} transfer items.")
|
|
local_logger.info("=" * 80)
|
|
time.sleep(10 * 60)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|