fix(transfer): use dedicated /user/batch endpoint, alert+skip on fetch failure

Transfer bot was silently no-op'ing every Monday cycle since /user/
became paginated ({"rows": [...], "total": N}). The bot's silent
guard `len(items) if isinstance(items, list) else 0` collapsed every
contract mismatch and HTTP/JSON error into "0 items" with no signal.

- API: add /user/batch — bare-list, no pagination — for batch jobs.
  Keeps the paginated /user/ contract intact for the web UI.
- Bot: replace silent guard with raise_for_status + isinstance check.
  On any HTTP/JSON/contract failure, log + Telegram-alert + skip the
  cycle (next attempt in 10 min). Empty list still means "no work".
- Tests: 15 new tests pinning both sides of the contract, including a
  regression test that feeds the exact envelope shape that broke prod.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
yiekheng 2026-05-04 08:31:08 +08:00
parent 850fb71ddd
commit 1c3d4ef893
4 changed files with 383 additions and 7 deletions

View File

@ -60,6 +60,11 @@ class CM_API:
# User routes # User routes
self.app.route('/user/<username>', methods=['GET'])(self.get_user) self.app.route('/user/<username>', methods=['GET'])(self.get_user)
self.app.route('/user/', methods=['GET'])(self.get_user) self.app.route('/user/', methods=['GET'])(self.get_user)
# Batch endpoint for the transfer bot — returns ALL users as a
# bare list (no pagination envelope). Kept separate from the
# paginated UI endpoint so the two contracts can evolve
# independently.
self.app.route('/user/batch', methods=['GET'])(self.get_user_batch)
# Update routes # Update routes
self.app.route('/update-acc-data', methods=['POST'])(self.update_acc_data) self.app.route('/update-acc-data', methods=['POST'])(self.update_acc_data)
@ -176,7 +181,26 @@ class CM_API:
except Exception as error: except Exception as error:
return self._handle_error(error, "Not Found"), 404 return self._handle_error(error, "Not Found"), 404
def get_user_batch(self):
is_available, db, error_response = self._check_database_available()
if not is_available:
return error_response
try:
rows = db.query(
"SELECT f_username, f_password, t_username, t_password, last_update_time "
"FROM user "
"ORDER BY last_update_time DESC",
[],
)
return jsonify(rows)
except Exception as error:
# _handle_error already returns (message, 500); wrapping it
# again in a (..., 500) tuple yields a tuple-as-body that
# Flask refuses to coerce into a Response.
return self._handle_error(error, "Error fetching user batch")
def update_acc_data(self): def update_acc_data(self):
is_available, db, error_response = self._check_database_available() is_available, db, error_response = self._check_database_available()
if not is_available: if not is_available:

View File

@ -1,8 +1,9 @@
import logging, time, requests, json, os, threading import logging, time, requests, os, threading
from concurrent.futures import ThreadPoolExecutor from concurrent.futures import ThreadPoolExecutor
from datetime import datetime from datetime import datetime
from .cm_bot_hal import CM_BOT_HAL from .cm_bot_hal import CM_BOT_HAL
from .telegram_notifier import TelegramNotifier
# Suppress httpx logs # Suppress httpx logs
logging.getLogger("httpx").setLevel(logging.WARNING) logging.getLogger("httpx").setLevel(logging.WARNING)
@ -11,6 +12,8 @@ logging.getLogger("httpx").setLevel(logging.WARNING)
api_url = os.getenv('API_BASE_URL', 'http://api-server:3000') api_url = os.getenv('API_BASE_URL', 'http://api-server:3000')
max_threading = int(os.getenv('CM_TRANSFER_MAX_THREADS', '20')) max_threading = int(os.getenv('CM_TRANSFER_MAX_THREADS', '20'))
notifier = TelegramNotifier()
def transfer(data: dict, local_logger): def transfer(data: dict, local_logger):
bot = CM_BOT_HAL() bot = CM_BOT_HAL()
thread_name = threading.current_thread().name thread_name = threading.current_thread().name
@ -21,6 +24,30 @@ def transfer(data: dict, local_logger):
del bot del bot
time.sleep(5) time.sleep(5)
def fetch_users(local_logger):
"""Fetch the full user list from /user/batch.
Returns the list on success, or None to signal "skip this cycle"
after surfacing the failure via log + Telegram alert.
"""
endpoint = f'{api_url}/user/batch'
try:
response = requests.get(endpoint, timeout=30)
response.raise_for_status()
items = response.json()
if not isinstance(items, list):
raise TypeError(
f"contract violation at {endpoint}: expected list, "
f"got {type(items).__name__}: {response.text[:200]}"
)
return items
except (requests.RequestException, ValueError, TypeError) as exc:
msg = f"Transfer bot: failed to fetch users from {endpoint}{exc}"
local_logger.error(msg)
notifier.notify_generic_error(msg)
return None
def main(): def main():
while True: while True:
weekday = int(datetime.now().strftime("%w")) weekday = int(datetime.now().strftime("%w"))
@ -45,14 +72,15 @@ def main():
) )
local_logger.info("=" * 80) local_logger.info("=" * 80)
response = requests.get(f'{api_url}/user') items = fetch_users(local_logger)
items = json.loads(response.text) if items is None:
total_items = len(items) if isinstance(items, list) else 0 local_logger.info("Skipping cycle due to fetch error (alert sent).")
local_logger.info("=" * 80)
if total_items == 0: elif len(items) == 0:
local_logger.info("No items to process.") local_logger.info("No items to process.")
local_logger.info("=" * 80) local_logger.info("=" * 80)
else: else:
total_items = len(items)
local_logger.info(f"Processing {total_items} transfer items...") local_logger.info(f"Processing {total_items} transfer items...")
with ThreadPoolExecutor(max_workers=max_threading) as executor: with ThreadPoolExecutor(max_workers=max_threading) as executor:
list(executor.map(lambda item: transfer(item, local_logger), items)) list(executor.map(lambda item: transfer(item, local_logger), items))

View File

@ -0,0 +1,170 @@
"""Tests for fetch_users() in app.cm_transfer_credit.
fetch_users() is the bot's API-fetch boundary. It must:
- Return the list on a clean 200 with a JSON list body.
- Return [] on a 200 with empty list (caller distinguishes "no work"
from "fetch failed" empty list is NOT a failure).
- Return None and send a Telegram alert on every error mode:
HTTP error, network error, malformed JSON, contract violation.
- Hit the dedicated /user/batch endpoint with a finite timeout.
The previous implementation collapsed every error mode into "0 items"
silently these tests pin the new loud-skip behavior so the silent
failure can't regress.
"""
import logging
import unittest
from unittest import mock
import requests
import app.cm_transfer_credit as transfer
def _make_response(status_code=200, json_data=None, json_error=None, text=""):
"""Build a Mock that quacks like requests.Response for the paths
fetch_users actually calls: status_code, text, raise_for_status(),
json().
json_error: if set, response.json() raises this instead of returning
json_data. Use to simulate malformed JSON bodies.
"""
response = mock.Mock(spec=requests.Response)
response.status_code = status_code
response.text = text
if json_error is not None:
response.json.side_effect = json_error
else:
response.json.return_value = json_data
if 400 <= status_code < 600:
response.raise_for_status.side_effect = requests.HTTPError(
f"{status_code} Server Error", response=response
)
else:
response.raise_for_status.return_value = None
return response
class FetchUsersHappyPathTests(unittest.TestCase):
def setUp(self):
self.logger = mock.Mock(spec=logging.Logger)
@mock.patch("app.cm_transfer_credit.notifier")
@mock.patch("app.cm_transfer_credit.requests.get")
def test_returns_list_on_200_with_list_body(self, mock_get, mock_notifier):
rows = [
{"f_username": "13c1", "f_password": "p1", "t_username": "tA", "t_password": "pA"},
{"f_username": "13c2", "f_password": "p2", "t_username": "tB", "t_password": "pB"},
]
mock_get.return_value = _make_response(200, json_data=rows)
result = transfer.fetch_users(self.logger)
self.assertEqual(result, rows)
mock_notifier.notify_generic_error.assert_not_called()
self.logger.error.assert_not_called()
@mock.patch("app.cm_transfer_credit.notifier")
@mock.patch("app.cm_transfer_credit.requests.get")
def test_empty_list_is_not_treated_as_failure(self, mock_get, mock_notifier):
# An empty user table is a legitimate "no work" signal, not an
# error. Caller must see [] (truthy len()==0) not None.
mock_get.return_value = _make_response(200, json_data=[])
result = transfer.fetch_users(self.logger)
self.assertEqual(result, [])
self.assertIsNotNone(result)
mock_notifier.notify_generic_error.assert_not_called()
@mock.patch("app.cm_transfer_credit.notifier")
@mock.patch("app.cm_transfer_credit.requests.get")
def test_hits_batch_endpoint_with_30s_timeout(self, mock_get, mock_notifier):
# Pins two contracts the bot relies on:
# 1. URL ends in /user/batch (NOT /user/ — that returns
# {"rows": [...]} and would re-trigger the original bug).
# 2. A finite timeout exists, so a hung DB can't wedge the
# worker for the full sleep window.
mock_get.return_value = _make_response(200, json_data=[])
transfer.fetch_users(self.logger)
mock_get.assert_called_once()
url = mock_get.call_args.args[0]
self.assertTrue(url.endswith("/user/batch"), f"expected /user/batch, got {url!r}")
self.assertEqual(mock_get.call_args.kwargs.get("timeout"), 30)
class FetchUsersErrorPathTests(unittest.TestCase):
"""Every error mode must: return None, log via local_logger.error,
and call notifier.notify_generic_error exactly once."""
def setUp(self):
self.logger = mock.Mock(spec=logging.Logger)
def _assert_alerted_and_skipped(self, result, mock_notifier):
self.assertIsNone(result, "fetch_users must return None on failure")
self.assertEqual(self.logger.error.call_count, 1)
mock_notifier.notify_generic_error.assert_called_once()
@mock.patch("app.cm_transfer_credit.notifier")
@mock.patch("app.cm_transfer_credit.requests.get")
def test_http_500_alerts_and_returns_none(self, mock_get, mock_notifier):
mock_get.return_value = _make_response(500, text="Internal Server Error")
result = transfer.fetch_users(self.logger)
self._assert_alerted_and_skipped(result, mock_notifier)
@mock.patch("app.cm_transfer_credit.notifier")
@mock.patch("app.cm_transfer_credit.requests.get")
def test_connection_error_alerts_and_returns_none(self, mock_get, mock_notifier):
mock_get.side_effect = requests.ConnectionError("api-server unreachable")
result = transfer.fetch_users(self.logger)
self._assert_alerted_and_skipped(result, mock_notifier)
@mock.patch("app.cm_transfer_credit.notifier")
@mock.patch("app.cm_transfer_credit.requests.get")
def test_timeout_alerts_and_returns_none(self, mock_get, mock_notifier):
mock_get.side_effect = requests.Timeout("read timeout")
result = transfer.fetch_users(self.logger)
self._assert_alerted_and_skipped(result, mock_notifier)
@mock.patch("app.cm_transfer_credit.notifier")
@mock.patch("app.cm_transfer_credit.requests.get")
def test_malformed_json_alerts_and_returns_none(self, mock_get, mock_notifier):
mock_get.return_value = _make_response(
200, json_error=ValueError("Expecting value: line 1 column 1 (char 0)"),
text="<html>500 Bad Gateway</html>",
)
result = transfer.fetch_users(self.logger)
self._assert_alerted_and_skipped(result, mock_notifier)
@mock.patch("app.cm_transfer_credit.notifier")
@mock.patch("app.cm_transfer_credit.requests.get")
def test_envelope_dict_alerts_and_returns_none(self, mock_get, mock_notifier):
# Regression test for the actual bug: /user/ returns
# {"rows": [...], "total": N} which the old silent guard
# collapsed to "0 items". The new code must surface this loudly.
envelope = {"rows": [{"f_username": "13c1"}], "total": 1}
mock_get.return_value = _make_response(200, json_data=envelope)
result = transfer.fetch_users(self.logger)
self._assert_alerted_and_skipped(result, mock_notifier)
# The alert message should mention what was actually returned so
# an operator reading the Telegram alert can diagnose without
# opening logs.
alert_msg = mock_notifier.notify_generic_error.call_args.args[0]
self.assertIn("dict", alert_msg, f"alert should name the bad type; got: {alert_msg!r}")
if __name__ == "__main__":
unittest.main()

View File

@ -0,0 +1,154 @@
"""Tests for the /user/batch endpoint in app.cm_api.
This is the API side of the contract that fetch_users() consumes (see
test_transfer_fetch_users.py for the bot side). The two MUST stay in
lockstep: bot expects bare list, API must serve bare list. The previous
production bug happened because /user/ was silently changed to return
{"rows": [...], "total": N}; the bot's silent guard collapsed that to
"0 items". /user/batch is a dedicated batch endpoint that locks the
bare-list contract for back-end batch jobs, separate from the paginated
UI endpoint.
Tests use a small fixture (3 rows) production has thousands but the
contract is identical regardless of cardinality.
"""
import unittest
from unittest import mock
# Three rows is enough to exercise the contract — production has many
# more, but cardinality is irrelevant to what these tests verify.
SAMPLE_ROWS = [
{"f_username": "13c1", "f_password": "p1", "t_username": "tA",
"t_password": "pA", "last_update_time": "Mon, 04 May 2026 06:00:00 GMT"},
{"f_username": "13c2", "f_password": "p2", "t_username": "tB",
"t_password": "pB", "last_update_time": "Sun, 03 May 2026 06:00:00 GMT"},
{"f_username": "13c3", "f_password": "p3", "t_username": "tC",
"t_password": "pC", "last_update_time": "Sat, 02 May 2026 06:00:00 GMT"},
]
EXPECTED_USER_FIELDS = {"f_username", "f_password", "t_username",
"t_password", "last_update_time"}
def _make_client(query_result=None, query_side_effect=None, db_constructor_raises=None):
"""Build a Flask test client with the DB layer mocked.
Patches `app.cm_api.DB` (the imported symbol) and
`app.cm_api.verify_tables_once` (called by the before_request hook
on the first request). Returns (client, mock_db_instance) so tests
can assert on what queries were issued.
"""
db_patcher = mock.patch("app.cm_api.DB")
verify_patcher = mock.patch("app.cm_api.verify_tables_once")
mock_db_class = db_patcher.start()
verify_patcher.start()
if db_constructor_raises is not None:
mock_db_class.side_effect = db_constructor_raises
mock_db_instance = None
else:
mock_db_instance = mock.Mock()
if query_side_effect is not None:
mock_db_instance.query.side_effect = query_side_effect
else:
mock_db_instance.query.return_value = query_result if query_result is not None else []
mock_db_class.return_value = mock_db_instance
from app.cm_api import create_app
app = create_app()
app.testing = True
client = app.test_client()
return client, mock_db_instance, (db_patcher, verify_patcher)
class UserBatchEndpointTests(unittest.TestCase):
def tearDown(self):
# mock.patch.stopall() also unwinds patches started outside
# context managers above; safer than tracking each patcher.
mock.patch.stopall()
def test_returns_bare_list_not_envelope(self):
# THE contract test. The paginated /user/ returns
# {"rows": [...], "total": N} — that shape is what broke the
# transfer bot. /user/batch must return a bare list so the bot
# can len() it directly.
client, _, _ = _make_client(query_result=SAMPLE_ROWS)
response = client.get("/user/batch")
self.assertEqual(response.status_code, 200)
body = response.get_json()
self.assertIsInstance(
body, list,
f"contract violation: expected list, got {type(body).__name__}: {body!r}",
)
self.assertNotIsInstance(body, dict, "must NOT wrap in {'rows': ...}")
def test_returns_all_rows_with_expected_schema(self):
client, _, _ = _make_client(query_result=SAMPLE_ROWS)
response = client.get("/user/batch")
body = response.get_json()
self.assertEqual(len(body), 3)
for row in body:
self.assertEqual(set(row.keys()), EXPECTED_USER_FIELDS)
def test_empty_table_returns_empty_list_not_error(self):
# Empty user table is "no work to do", not a failure. Must be
# 200 + [] so the bot's empty-list branch fires (NOT the
# error-skip branch).
client, _, _ = _make_client(query_result=[])
response = client.get("/user/batch")
self.assertEqual(response.status_code, 200)
self.assertEqual(response.get_json(), [])
def test_query_has_no_pagination_clauses(self):
# Defensive: if someone "helpfully" adds LIMIT/OFFSET to the
# batch query later, the bot would silently process only a
# subset and the rest would be missed for a week. Pin that
# the SELECT against the user table is unbounded.
client, mock_db, _ = _make_client(query_result=SAMPLE_ROWS)
client.get("/user/batch")
sql = mock_db.query.call_args.args[0]
self.assertIn("FROM user", sql)
self.assertNotIn("LIMIT", sql.upper())
self.assertNotIn("OFFSET", sql.upper())
def test_db_unavailable_returns_500(self):
# DB() constructor raising should not 200 with empty list — must
# surface as an error so the bot's error branch fires + alerts.
client, _, _ = _make_client(db_constructor_raises=Exception("conn refused"))
response = client.get("/user/batch")
self.assertEqual(response.status_code, 500)
def test_query_exception_returns_500(self):
client, _, _ = _make_client(query_side_effect=Exception("query failed"))
response = client.get("/user/batch")
self.assertEqual(response.status_code, 500)
def test_route_resolves_to_batch_handler_not_username_lookup(self):
# Werkzeug should pick /user/batch (static segment) over
# /user/<username> (variable segment). If route order/specificity
# ever regresses, the bot would silently get back an empty list
# from a username lookup for the literal string "batch".
with mock.patch("app.cm_api.DB"), mock.patch("app.cm_api.verify_tables_once"):
from app.cm_api import create_app
app = create_app()
urls = app.url_map.bind("localhost")
endpoint, _ = urls.match("/user/batch", method="GET")
self.assertEqual(endpoint, "get_user_batch")
if __name__ == "__main__":
unittest.main()