diff --git a/app/cm_api.py b/app/cm_api.py index 05c8dc4..b8f25f7 100644 --- a/app/cm_api.py +++ b/app/cm_api.py @@ -60,6 +60,11 @@ class CM_API: # User routes self.app.route('/user/', methods=['GET'])(self.get_user) self.app.route('/user/', methods=['GET'])(self.get_user) + # Batch endpoint for the transfer bot — returns ALL users as a + # bare list (no pagination envelope). Kept separate from the + # paginated UI endpoint so the two contracts can evolve + # independently. + self.app.route('/user/batch', methods=['GET'])(self.get_user_batch) # Update routes self.app.route('/update-acc-data', methods=['POST'])(self.update_acc_data) @@ -176,7 +181,26 @@ class CM_API: except Exception as error: return self._handle_error(error, "Not Found"), 404 - + + def get_user_batch(self): + is_available, db, error_response = self._check_database_available() + if not is_available: + return error_response + + try: + rows = db.query( + "SELECT f_username, f_password, t_username, t_password, last_update_time " + "FROM user " + "ORDER BY last_update_time DESC", + [], + ) + return jsonify(rows) + except Exception as error: + # _handle_error already returns (message, 500); wrapping it + # again in a (..., 500) tuple yields a tuple-as-body that + # Flask refuses to coerce into a Response. + return self._handle_error(error, "Error fetching user batch") + def update_acc_data(self): is_available, db, error_response = self._check_database_available() if not is_available: diff --git a/app/cm_transfer_credit.py b/app/cm_transfer_credit.py index ae72fd6..1e2df0e 100644 --- a/app/cm_transfer_credit.py +++ b/app/cm_transfer_credit.py @@ -1,8 +1,9 @@ -import logging, time, requests, json, os, threading +import logging, time, requests, os, threading from concurrent.futures import ThreadPoolExecutor from datetime import datetime from .cm_bot_hal import CM_BOT_HAL +from .telegram_notifier import TelegramNotifier # Suppress httpx logs logging.getLogger("httpx").setLevel(logging.WARNING) @@ -11,6 +12,8 @@ logging.getLogger("httpx").setLevel(logging.WARNING) api_url = os.getenv('API_BASE_URL', 'http://api-server:3000') max_threading = int(os.getenv('CM_TRANSFER_MAX_THREADS', '20')) +notifier = TelegramNotifier() + def transfer(data: dict, local_logger): bot = CM_BOT_HAL() thread_name = threading.current_thread().name @@ -21,6 +24,30 @@ def transfer(data: dict, local_logger): del bot time.sleep(5) + +def fetch_users(local_logger): + """Fetch the full user list from /user/batch. + + Returns the list on success, or None to signal "skip this cycle" + after surfacing the failure via log + Telegram alert. + """ + endpoint = f'{api_url}/user/batch' + try: + response = requests.get(endpoint, timeout=30) + response.raise_for_status() + items = response.json() + if not isinstance(items, list): + raise TypeError( + f"contract violation at {endpoint}: expected list, " + f"got {type(items).__name__}: {response.text[:200]}" + ) + return items + except (requests.RequestException, ValueError, TypeError) as exc: + msg = f"Transfer bot: failed to fetch users from {endpoint} — {exc}" + local_logger.error(msg) + notifier.notify_generic_error(msg) + return None + def main(): while True: weekday = int(datetime.now().strftime("%w")) @@ -45,14 +72,15 @@ def main(): ) local_logger.info("=" * 80) - response = requests.get(f'{api_url}/user') - items = json.loads(response.text) - total_items = len(items) if isinstance(items, list) else 0 - - if total_items == 0: + items = fetch_users(local_logger) + if items is None: + local_logger.info("Skipping cycle due to fetch error (alert sent).") + local_logger.info("=" * 80) + elif len(items) == 0: local_logger.info("No items to process.") local_logger.info("=" * 80) else: + total_items = len(items) local_logger.info(f"Processing {total_items} transfer items...") with ThreadPoolExecutor(max_workers=max_threading) as executor: list(executor.map(lambda item: transfer(item, local_logger), items)) diff --git a/tests/test_transfer_fetch_users.py b/tests/test_transfer_fetch_users.py new file mode 100644 index 0000000..dc4ccf4 --- /dev/null +++ b/tests/test_transfer_fetch_users.py @@ -0,0 +1,170 @@ +"""Tests for fetch_users() in app.cm_transfer_credit. + +fetch_users() is the bot's API-fetch boundary. It must: + - Return the list on a clean 200 with a JSON list body. + - Return [] on a 200 with empty list (caller distinguishes "no work" + from "fetch failed" — empty list is NOT a failure). + - Return None and send a Telegram alert on every error mode: + HTTP error, network error, malformed JSON, contract violation. + - Hit the dedicated /user/batch endpoint with a finite timeout. + +The previous implementation collapsed every error mode into "0 items" +silently — these tests pin the new loud-skip behavior so the silent +failure can't regress. +""" + +import logging +import unittest +from unittest import mock + +import requests + +import app.cm_transfer_credit as transfer + + +def _make_response(status_code=200, json_data=None, json_error=None, text=""): + """Build a Mock that quacks like requests.Response for the paths + fetch_users actually calls: status_code, text, raise_for_status(), + json(). + + json_error: if set, response.json() raises this instead of returning + json_data. Use to simulate malformed JSON bodies. + """ + response = mock.Mock(spec=requests.Response) + response.status_code = status_code + response.text = text + if json_error is not None: + response.json.side_effect = json_error + else: + response.json.return_value = json_data + if 400 <= status_code < 600: + response.raise_for_status.side_effect = requests.HTTPError( + f"{status_code} Server Error", response=response + ) + else: + response.raise_for_status.return_value = None + return response + + +class FetchUsersHappyPathTests(unittest.TestCase): + def setUp(self): + self.logger = mock.Mock(spec=logging.Logger) + + @mock.patch("app.cm_transfer_credit.notifier") + @mock.patch("app.cm_transfer_credit.requests.get") + def test_returns_list_on_200_with_list_body(self, mock_get, mock_notifier): + rows = [ + {"f_username": "13c1", "f_password": "p1", "t_username": "tA", "t_password": "pA"}, + {"f_username": "13c2", "f_password": "p2", "t_username": "tB", "t_password": "pB"}, + ] + mock_get.return_value = _make_response(200, json_data=rows) + + result = transfer.fetch_users(self.logger) + + self.assertEqual(result, rows) + mock_notifier.notify_generic_error.assert_not_called() + self.logger.error.assert_not_called() + + @mock.patch("app.cm_transfer_credit.notifier") + @mock.patch("app.cm_transfer_credit.requests.get") + def test_empty_list_is_not_treated_as_failure(self, mock_get, mock_notifier): + # An empty user table is a legitimate "no work" signal, not an + # error. Caller must see [] (truthy len()==0) not None. + mock_get.return_value = _make_response(200, json_data=[]) + + result = transfer.fetch_users(self.logger) + + self.assertEqual(result, []) + self.assertIsNotNone(result) + mock_notifier.notify_generic_error.assert_not_called() + + @mock.patch("app.cm_transfer_credit.notifier") + @mock.patch("app.cm_transfer_credit.requests.get") + def test_hits_batch_endpoint_with_30s_timeout(self, mock_get, mock_notifier): + # Pins two contracts the bot relies on: + # 1. URL ends in /user/batch (NOT /user/ — that returns + # {"rows": [...]} and would re-trigger the original bug). + # 2. A finite timeout exists, so a hung DB can't wedge the + # worker for the full sleep window. + mock_get.return_value = _make_response(200, json_data=[]) + + transfer.fetch_users(self.logger) + + mock_get.assert_called_once() + url = mock_get.call_args.args[0] + self.assertTrue(url.endswith("/user/batch"), f"expected /user/batch, got {url!r}") + self.assertEqual(mock_get.call_args.kwargs.get("timeout"), 30) + + +class FetchUsersErrorPathTests(unittest.TestCase): + """Every error mode must: return None, log via local_logger.error, + and call notifier.notify_generic_error exactly once.""" + + def setUp(self): + self.logger = mock.Mock(spec=logging.Logger) + + def _assert_alerted_and_skipped(self, result, mock_notifier): + self.assertIsNone(result, "fetch_users must return None on failure") + self.assertEqual(self.logger.error.call_count, 1) + mock_notifier.notify_generic_error.assert_called_once() + + @mock.patch("app.cm_transfer_credit.notifier") + @mock.patch("app.cm_transfer_credit.requests.get") + def test_http_500_alerts_and_returns_none(self, mock_get, mock_notifier): + mock_get.return_value = _make_response(500, text="Internal Server Error") + + result = transfer.fetch_users(self.logger) + + self._assert_alerted_and_skipped(result, mock_notifier) + + @mock.patch("app.cm_transfer_credit.notifier") + @mock.patch("app.cm_transfer_credit.requests.get") + def test_connection_error_alerts_and_returns_none(self, mock_get, mock_notifier): + mock_get.side_effect = requests.ConnectionError("api-server unreachable") + + result = transfer.fetch_users(self.logger) + + self._assert_alerted_and_skipped(result, mock_notifier) + + @mock.patch("app.cm_transfer_credit.notifier") + @mock.patch("app.cm_transfer_credit.requests.get") + def test_timeout_alerts_and_returns_none(self, mock_get, mock_notifier): + mock_get.side_effect = requests.Timeout("read timeout") + + result = transfer.fetch_users(self.logger) + + self._assert_alerted_and_skipped(result, mock_notifier) + + @mock.patch("app.cm_transfer_credit.notifier") + @mock.patch("app.cm_transfer_credit.requests.get") + def test_malformed_json_alerts_and_returns_none(self, mock_get, mock_notifier): + mock_get.return_value = _make_response( + 200, json_error=ValueError("Expecting value: line 1 column 1 (char 0)"), + text="500 Bad Gateway", + ) + + result = transfer.fetch_users(self.logger) + + self._assert_alerted_and_skipped(result, mock_notifier) + + @mock.patch("app.cm_transfer_credit.notifier") + @mock.patch("app.cm_transfer_credit.requests.get") + def test_envelope_dict_alerts_and_returns_none(self, mock_get, mock_notifier): + # Regression test for the actual bug: /user/ returns + # {"rows": [...], "total": N} which the old silent guard + # collapsed to "0 items". The new code must surface this loudly. + envelope = {"rows": [{"f_username": "13c1"}], "total": 1} + mock_get.return_value = _make_response(200, json_data=envelope) + + result = transfer.fetch_users(self.logger) + + self._assert_alerted_and_skipped(result, mock_notifier) + # The alert message should mention what was actually returned so + # an operator reading the Telegram alert can diagnose without + # opening logs. + alert_msg = mock_notifier.notify_generic_error.call_args.args[0] + self.assertIn("dict", alert_msg, f"alert should name the bad type; got: {alert_msg!r}") + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_user_batch_endpoint.py b/tests/test_user_batch_endpoint.py new file mode 100644 index 0000000..3ffa943 --- /dev/null +++ b/tests/test_user_batch_endpoint.py @@ -0,0 +1,154 @@ +"""Tests for the /user/batch endpoint in app.cm_api. + +This is the API side of the contract that fetch_users() consumes (see +test_transfer_fetch_users.py for the bot side). The two MUST stay in +lockstep: bot expects bare list, API must serve bare list. The previous +production bug happened because /user/ was silently changed to return +{"rows": [...], "total": N}; the bot's silent guard collapsed that to +"0 items". /user/batch is a dedicated batch endpoint that locks the +bare-list contract for back-end batch jobs, separate from the paginated +UI endpoint. + +Tests use a small fixture (3 rows) — production has thousands but the +contract is identical regardless of cardinality. +""" + +import unittest +from unittest import mock + + +# Three rows is enough to exercise the contract — production has many +# more, but cardinality is irrelevant to what these tests verify. +SAMPLE_ROWS = [ + {"f_username": "13c1", "f_password": "p1", "t_username": "tA", + "t_password": "pA", "last_update_time": "Mon, 04 May 2026 06:00:00 GMT"}, + {"f_username": "13c2", "f_password": "p2", "t_username": "tB", + "t_password": "pB", "last_update_time": "Sun, 03 May 2026 06:00:00 GMT"}, + {"f_username": "13c3", "f_password": "p3", "t_username": "tC", + "t_password": "pC", "last_update_time": "Sat, 02 May 2026 06:00:00 GMT"}, +] + +EXPECTED_USER_FIELDS = {"f_username", "f_password", "t_username", + "t_password", "last_update_time"} + + +def _make_client(query_result=None, query_side_effect=None, db_constructor_raises=None): + """Build a Flask test client with the DB layer mocked. + + Patches `app.cm_api.DB` (the imported symbol) and + `app.cm_api.verify_tables_once` (called by the before_request hook + on the first request). Returns (client, mock_db_instance) so tests + can assert on what queries were issued. + """ + db_patcher = mock.patch("app.cm_api.DB") + verify_patcher = mock.patch("app.cm_api.verify_tables_once") + mock_db_class = db_patcher.start() + verify_patcher.start() + + if db_constructor_raises is not None: + mock_db_class.side_effect = db_constructor_raises + mock_db_instance = None + else: + mock_db_instance = mock.Mock() + if query_side_effect is not None: + mock_db_instance.query.side_effect = query_side_effect + else: + mock_db_instance.query.return_value = query_result if query_result is not None else [] + mock_db_class.return_value = mock_db_instance + + from app.cm_api import create_app + app = create_app() + app.testing = True + client = app.test_client() + return client, mock_db_instance, (db_patcher, verify_patcher) + + +class UserBatchEndpointTests(unittest.TestCase): + def tearDown(self): + # mock.patch.stopall() also unwinds patches started outside + # context managers above; safer than tracking each patcher. + mock.patch.stopall() + + def test_returns_bare_list_not_envelope(self): + # THE contract test. The paginated /user/ returns + # {"rows": [...], "total": N} — that shape is what broke the + # transfer bot. /user/batch must return a bare list so the bot + # can len() it directly. + client, _, _ = _make_client(query_result=SAMPLE_ROWS) + + response = client.get("/user/batch") + + self.assertEqual(response.status_code, 200) + body = response.get_json() + self.assertIsInstance( + body, list, + f"contract violation: expected list, got {type(body).__name__}: {body!r}", + ) + self.assertNotIsInstance(body, dict, "must NOT wrap in {'rows': ...}") + + def test_returns_all_rows_with_expected_schema(self): + client, _, _ = _make_client(query_result=SAMPLE_ROWS) + + response = client.get("/user/batch") + body = response.get_json() + + self.assertEqual(len(body), 3) + for row in body: + self.assertEqual(set(row.keys()), EXPECTED_USER_FIELDS) + + def test_empty_table_returns_empty_list_not_error(self): + # Empty user table is "no work to do", not a failure. Must be + # 200 + [] so the bot's empty-list branch fires (NOT the + # error-skip branch). + client, _, _ = _make_client(query_result=[]) + + response = client.get("/user/batch") + + self.assertEqual(response.status_code, 200) + self.assertEqual(response.get_json(), []) + + def test_query_has_no_pagination_clauses(self): + # Defensive: if someone "helpfully" adds LIMIT/OFFSET to the + # batch query later, the bot would silently process only a + # subset and the rest would be missed for a week. Pin that + # the SELECT against the user table is unbounded. + client, mock_db, _ = _make_client(query_result=SAMPLE_ROWS) + + client.get("/user/batch") + + sql = mock_db.query.call_args.args[0] + self.assertIn("FROM user", sql) + self.assertNotIn("LIMIT", sql.upper()) + self.assertNotIn("OFFSET", sql.upper()) + + def test_db_unavailable_returns_500(self): + # DB() constructor raising should not 200 with empty list — must + # surface as an error so the bot's error branch fires + alerts. + client, _, _ = _make_client(db_constructor_raises=Exception("conn refused")) + + response = client.get("/user/batch") + + self.assertEqual(response.status_code, 500) + + def test_query_exception_returns_500(self): + client, _, _ = _make_client(query_side_effect=Exception("query failed")) + + response = client.get("/user/batch") + + self.assertEqual(response.status_code, 500) + + def test_route_resolves_to_batch_handler_not_username_lookup(self): + # Werkzeug should pick /user/batch (static segment) over + # /user/ (variable segment). If route order/specificity + # ever regresses, the bot would silently get back an empty list + # from a username lookup for the literal string "batch". + with mock.patch("app.cm_api.DB"), mock.patch("app.cm_api.verify_tables_once"): + from app.cm_api import create_app + app = create_app() + urls = app.url_map.bind("localhost") + endpoint, _ = urls.match("/user/batch", method="GET") + self.assertEqual(endpoint, "get_user_batch") + + +if __name__ == "__main__": + unittest.main()