cm_bot_v2/app/cm_api.py
yiekheng 6bb85222d1 perf(web): server-side pagination + infinite-scroll for accounts/users
For 3k+ row deployments, returning the full table in one shot is the
bottleneck — the JSON payload alone is hundreds of KB and the client
mounts thousands of EditableCell instances on every visit. Pagination
with auto-fetch on scroll shrinks both the wire payload and the initial
render to a single page (200 rows).

Server (app/cm_api.py):
- /acc/ and /user/ accept ?limit, ?offset, ?prefix, ?dir, plus ?sort on
  /user/ (f_username | last_update_time). Defaults: limit=200 (capped at
  1000), offset=0, dir=desc.
- ORDER BY done in SQL with prefix-priority: rows whose username starts
  with the configured CM_PREFIX_PATTERN come first, then asc/desc by the
  sort column. The 'dir' value is whitelisted to ASC|DESC before string
  interpolation; everything else goes through parameterised binding.
- Schema verification (verify_tables_once) deferred to first request via
  a Flask before_request hook — keeps create_app() free of MySQL touches
  so unit tests + gunicorn preload still work without a live DB.

Web client:
- web/lib/api.ts: getAccountsPage / getUsersPage return { rows, hasMore }.
  hasMore = (rows.length === PAGE_SIZE), so the client knows when to
  stop fetching. Each page is its own Next.js cache entry (the URL is
  the cache key) — caching from the previous commit still applies.
- web/app/actions.ts: loadMoreAccounts / loadMoreUsers Server Actions
  for next-page requests; refreshAccounts / refreshUsers force-evict the
  cache via revalidateTag before refetching page 1.
- web/app/page.tsx + users/page.tsx: only fetch the first page now.
- web/components/{accounts,users}-table.tsx: rewrote state model. Rows
  accumulate as the user scrolls. An IntersectionObserver on a sentinel
  div near the bottom triggers loadMore when it enters the viewport
  (300px rootMargin so the next page starts loading before the user
  reaches the end). useOptimistic wraps the accumulated rows for in-
  flight edits; on success the row is committed locally so the change
  survives even though we no longer router.refresh.
- Sort toggle now refetches from page 1 with the new dir/sort param.
  Local sort over a partial set would be inconsistent.
- Mutations: delete filters from local state; create + refresh both
  reset to page 1 so the row appears in its sorted position.
- Header count shows '<loaded>+' when more pages exist so the operator
  knows what they're seeing isn't the full table.

Removed AutoRefresh:
- web/app/layout.tsx no longer mounts AutoRefresh.
- web/components/auto-refresh.tsx deleted.
- Reason: router.refresh every 30s would yank the user back to page 1
  every time, losing scroll position and accumulated rows. Manual
  Refresh button replaces it (now wired to refreshAccounts/refreshUsers
  which evict cache + refetch).

Tests: deferred verify_tables_once() means tests.test_bot_cli's
CreateAppFactoryTests pass without DB env vars again. All 38 existing
tests pass.
2026-05-03 11:29:34 +08:00

376 lines
14 KiB
Python

import os
import threading
from flask import Flask, jsonify, request
from .db import DB, verify_tables_once
def _debug_enabled() -> bool:
"""Return True iff CM_DEBUG env var is set to a truthy value.
Truthy: '1', 'true', 'yes' (case-insensitive, whitespace-trimmed).
Anything else, including unset, is False. Default-off so the
Werkzeug debugger is never reachable in production containers.
"""
return os.getenv("CM_DEBUG", "false").strip().lower() in ("1", "true", "yes")
class CM_API:
def __init__(self):
self.app = Flask(__name__)
# No CORS middleware: api-server is internal-only (no host port
# in prod compose, per C5). Browsers can't reach it directly,
# and server-side fetches from the web service don't trigger
# CORS. Removing flask_cors removes a permissive '*' origin
# default that becomes an attack surface if a host port is ever
# accidentally re-exposed.
self._register_routes()
# Schema verification is deferred to the first request so that
# constructing the WSGI app (e.g., in tests, or via gunicorn's
# preload phase before MySQL is reachable) doesn't require the
# DB to be up. The first request hits this hook, validates the
# schema, and flips the latch — subsequent requests skip it.
self._schema_verified = False
self.app.before_request(self._verify_schema_once)
def _verify_schema_once(self):
if self._schema_verified:
return
verify_tables_once()
self._schema_verified = True
def _get_database_connection(self):
"""Return a DB handle backed by the shared connection pool.
DB() is now a near-zero-cost handle (it just touches the cached
process-wide pool); each query()/execute() rents a connection
and returns it. There's nothing to clean up explicitly.
"""
try:
return DB()
except Exception as e:
print(f"Database connection failed: {e}")
return None
def _register_routes(self):
# Account routes
self.app.route('/acc/<username>', methods=['GET'])(self.get_account)
self.app.route('/acc/', methods=['GET'])(self.get_account)
# User routes
self.app.route('/user/<username>', methods=['GET'])(self.get_user)
self.app.route('/user/', methods=['GET'])(self.get_user)
# Update routes
self.app.route('/update-acc-data', methods=['POST'])(self.update_acc_data)
self.app.route('/update-user-data', methods=['POST'])(self.update_user_data)
# Delete routes
self.app.route('/delete-acc-data', methods=['POST'])(self.delete_acc_data)
self.app.route('/delete-user-data', methods=['POST'])(self.delete_user_data)
# Create routes (manual operator input)
self.app.route('/create-acc-data', methods=['POST'])(self.create_acc_data)
self.app.route('/create-user-data', methods=['POST'])(self.create_user_data)
def _check_database_available(self):
db = self._get_database_connection()
if db is None:
return False, None, ("Database not available", 500)
return True, db, None
def _handle_error(self, error, message="An error occurred"):
print(f"Error: {error}")
return message, 500
def get_account(self, username=None):
is_available, db, error_response = self._check_database_available()
if not is_available:
return error_response
try:
if username:
query = "SELECT username, password, status, link FROM acc WHERE username = %s"
results = db.query(query, [username])
return jsonify(results)
# Listing path — pagination + prefix-priority sort.
try:
limit = max(1, min(int(request.args.get('limit', 200)), 1000))
offset = max(0, int(request.args.get('offset', 0)))
except (TypeError, ValueError):
return jsonify({"error": "limit and offset must be integers"}), 400
prefix = (request.args.get('prefix') or '').strip()
# Whitelist direction so it's safe to interpolate into the
# ORDER BY clause (parameterised binding doesn't apply to
# column names or sort directions).
direction = 'ASC' if request.args.get('dir', 'desc').lower() == 'asc' else 'DESC'
if prefix:
query = (
"SELECT username, password, status, link FROM acc "
"ORDER BY (CASE WHEN username LIKE %s THEN 0 ELSE 1 END), "
f"username {direction} "
"LIMIT %s OFFSET %s"
)
params = [f"{prefix}%", limit, offset]
else:
query = (
"SELECT username, password, status, link FROM acc "
f"ORDER BY username {direction} "
"LIMIT %s OFFSET %s"
)
params = [limit, offset]
return jsonify(db.query(query, params))
except Exception as error:
return self._handle_error(error, "Not Found"), 404
def get_user(self, username=None):
is_available, db, error_response = self._check_database_available()
if not is_available:
return error_response
try:
if username:
query = "SELECT f_username, f_password, t_username, t_password FROM user WHERE f_username = %s"
results = db.query(query, [username])
return jsonify(results)
try:
limit = max(1, min(int(request.args.get('limit', 200)), 1000))
offset = max(0, int(request.args.get('offset', 0)))
except (TypeError, ValueError):
return jsonify({"error": "limit and offset must be integers"}), 400
prefix = (request.args.get('prefix') or '').strip()
sort_arg = request.args.get('sort', 'last_update_time')
sort_col = 'f_username' if sort_arg == 'f_username' else 'last_update_time'
direction = 'ASC' if request.args.get('dir', 'desc').lower() == 'asc' else 'DESC'
if prefix:
query = (
"SELECT f_username, f_password, t_username, t_password, last_update_time "
"FROM user "
"ORDER BY (CASE WHEN f_username LIKE %s THEN 0 ELSE 1 END), "
f"{sort_col} {direction} "
"LIMIT %s OFFSET %s"
)
params = [f"{prefix}%", limit, offset]
else:
query = (
"SELECT f_username, f_password, t_username, t_password, last_update_time "
"FROM user "
f"ORDER BY {sort_col} {direction} "
"LIMIT %s OFFSET %s"
)
params = [limit, offset]
return jsonify(db.query(query, params))
except Exception as error:
return self._handle_error(error, "Not Found"), 404
def update_acc_data(self):
is_available, db, error_response = self._check_database_available()
if not is_available:
return error_response
try:
data = request.get_json()
username = data.get('username')
password = data.get('password')
status = data.get('status')
link = data.get('link')
if not username:
return jsonify({"error": "Username is required"})
result = db.execute(
"UPDATE acc SET password = %s, status = %s, link = %s WHERE username = %s",
[password, status, link, username]
)
if result:
return jsonify("Data updated successfully")
else:
return jsonify("Error updating data")
except Exception as error:
return self._handle_error(error, "Error updating data"), 500
def update_user_data(self):
is_available, db, error_response = self._check_database_available()
if not is_available:
return error_response
try:
data = request.get_json()
f_username = data.get('f_username')
f_password = data.get('f_password')
t_username = data.get('t_username')
t_password = data.get('t_password')
if not f_username:
return jsonify({"error": "f_username is required"})
result = db.execute(
"UPDATE user SET f_password = %s, t_password = %s, t_username = %s, last_update_time = CURRENT_TIMESTAMP WHERE f_username = %s",
[f_password, t_password, t_username, f_username]
)
if result:
return jsonify("Data updated successfully")
else:
return jsonify("Error updating data")
except Exception as error:
return self._handle_error(error, "Error updating data")
def delete_acc_data(self):
is_available, db, error_response = self._check_database_available()
if not is_available:
return error_response
try:
data = request.get_json() or {}
username = data.get('username')
if not username:
return jsonify({"error": "Username is required"}), 400
result = db.execute(
"DELETE FROM acc WHERE username = %s",
[username]
)
if result:
return jsonify({"deleted": username})
return jsonify({"error": "Failed to delete account"}), 500
except Exception as error:
return self._handle_error(error, "Error deleting account"), 500
def delete_user_data(self):
is_available, db, error_response = self._check_database_available()
if not is_available:
return error_response
try:
data = request.get_json() or {}
f_username = data.get('f_username')
if not f_username:
return jsonify({"error": "f_username is required"}), 400
result = db.execute(
"DELETE FROM user WHERE f_username = %s",
[f_username]
)
if result:
return jsonify({"deleted": f_username})
return jsonify({"error": "Failed to delete user"}), 500
except Exception as error:
return self._handle_error(error, "Error deleting user"), 500
def create_acc_data(self):
is_available, db, error_response = self._check_database_available()
if not is_available:
return error_response
try:
data = request.get_json() or {}
username = (data.get('username') or '').strip()
password = data.get('password') or ''
status = data.get('status') or ''
link = data.get('link') or ''
if not username or not password:
return jsonify({"error": "Username and password are required"}), 400
result = db.execute(
"INSERT INTO acc (username, password, status, link) VALUES (%s, %s, %s, %s)",
[username, password, status, link]
)
if result:
return jsonify({"created": username})
return jsonify({"error": "Failed to create account"}), 500
except Exception as error:
return self._handle_error(error, "Error creating account"), 500
def create_user_data(self):
is_available, db, error_response = self._check_database_available()
if not is_available:
return error_response
try:
data = request.get_json() or {}
f_username = (data.get('f_username') or '').strip()
f_password = data.get('f_password') or ''
t_username = (data.get('t_username') or '').strip()
t_password = data.get('t_password') or ''
if not f_username or not f_password or not t_username or not t_password:
return jsonify({"error": "All fields are required"}), 400
result = db.execute(
"INSERT INTO user (f_username, f_password, t_username, t_password) VALUES (%s, %s, %s, %s)",
[f_username, f_password, t_username, t_password]
)
if result:
return jsonify({"created": f_username})
return jsonify({"error": "Failed to create user"}), 500
except Exception as error:
return self._handle_error(error, "Error creating user"), 500
def run(self, port=3000, debug=None):
if debug is None:
debug = _debug_enabled()
try:
verify_tables_once()
except Exception as e:
print(f"Cannot start server: {e}")
exit(1)
print(f'CM Bot DB API Listening at Port : {port}')
self.app.run(host='0.0.0.0', port=port, debug=debug)
def run_in_thread(self, port=3000, debug=False):
"""Run the Flask app in a separate thread"""
try:
verify_tables_once()
except Exception as e:
print(f"Cannot start server: {e}")
return None
def run_app():
print(f'CM Bot DB API Listening at Port : {port}')
self.app.run(host='0.0.0.0', port=port, debug=debug, use_reloader=False)
thread = threading.Thread(target=run_app, daemon=True)
thread.start()
return thread
def create_app():
"""WSGI factory used by gunicorn (`app.cm_api:create_app()`).
Returns the Flask app object. Schema verification runs lazily on the
first request (see CM_API._verify_schema_once) so the factory itself
never touches MySQL — keeps gunicorn's preload phase unaffected by a
momentarily-unavailable DB and lets unit tests construct the app
without DB env wiring.
"""
return CM_API().app
if __name__ == '__main__':
api = CM_API()
api.run(port = 3000)