cm_bot_v2/app/cm_api.py
yiekheng ee74ebda64 feat(web): show real DB total in table header (replaces '200+')
The header used to show '200+' once the user had loaded a partial set
of pages — opaque, useless for an operator who actually needs to know
'how many accounts are in the system right now'.

Server (app/cm_api.py):
- /acc/ and /user/ list responses now wrap the rows alongside a
  COUNT(*) of the table: { rows: [...], total: N }. The single-row
  /acc/<username> path is unchanged (still returns Acc[] with one row).
- Each list request issues both queries (the page SELECT and the COUNT)
  through the same pooled DB handle. COUNT(*) on a 3k-row table is sub-ms;
  even when the cache misses, total request latency stays well under
  20ms on warm-cache MySQL.

Web client:
- web/lib/api.ts: Page<T> gains a `total` field; getAccountsPage and
  getUsersPage parse the new wrapped response.
- web/app/page.tsx + users/page.tsx: pass page.total down as
  initialTotal.
- web/components/{accounts,users}-table.tsx: hold total in state, sync
  it from every page fetch (initial, loadMore, sort change, force
  refresh) so cm99 monitor inserts during the session bump it correctly.
  Delete decrements it by 1 immediately so the header doesn't lie
  between the optimistic delete and the next refresh.
- PageHead now shows '<total>' as the big number. When loaded < total,
  a small zinc-400 line below reads 'Showing X of N — keep scrolling
  to load more'. Once the user reaches the end, the line goes away.

No new round trips for the count: it piggybacks on the same /acc/?...
or /user/?... request that already fetches the page. The 30s cache
covers the count too — so tab switches still don't hit MySQL.
2026-05-03 11:38:38 +08:00

382 lines
14 KiB
Python

import os
import threading
from flask import Flask, jsonify, request
from .db import DB, verify_tables_once
def _debug_enabled() -> bool:
    """Report whether debug mode was requested through the environment.

    Reads the ``CM_DEBUG`` environment variable, trims surrounding
    whitespace and lower-cases it. Only '1', 'true' and 'yes' count as
    enabled; any other value — or no value at all — yields False, so
    debug mode (and with it the Werkzeug debugger) stays off unless it
    is explicitly switched on.
    """
    raw_value = os.getenv("CM_DEBUG", "false")
    normalized = raw_value.strip().lower()
    return normalized in ("1", "true", "yes")
class CM_API:
    """Flask application exposing read/update/delete/create endpoints
    for the ``acc`` and ``user`` tables.

    The Flask app object is available as ``self.app``. Every handler
    obtains a DB handle per request, runs parameterised SQL through it
    and returns JSON (or a ``(message, status)`` tuple on failure).
    """

    def __init__(self):
        self.app = Flask(__name__)
        # No CORS middleware: api-server is internal-only (no host port
        # in prod compose, per C5). Browsers can't reach it directly,
        # and server-side fetches from the web service don't trigger
        # CORS. Removing flask_cors removes a permissive '*' origin
        # default that becomes an attack surface if a host port is ever
        # accidentally re-exposed.
        self._register_routes()
        # Schema verification is deferred to the first request so that
        # constructing the WSGI app (e.g., in tests, or via gunicorn's
        # preload phase before MySQL is reachable) doesn't require the
        # DB to be up. The first request hits this hook, validates the
        # schema, and flips the latch — subsequent requests skip it.
        self._schema_verified = False
        self.app.before_request(self._verify_schema_once)

    def _verify_schema_once(self):
        """Run ``verify_tables_once()`` on the first request only.

        The latch is set only after verification returns, so if it
        raises, the next request attempts verification again.
        """
        if self._schema_verified:
            return
        verify_tables_once()
        self._schema_verified = True

    def _get_database_connection(self):
        """Return a DB handle backed by the shared connection pool.

        DB() is now a near-zero-cost handle (it just touches the cached
        process-wide pool); each query()/execute() rents a connection
        and returns it. There's nothing to clean up explicitly.

        Returns None (after printing the error) if constructing the
        handle fails.
        """
        try:
            return DB()
        except Exception as e:
            print(f"Database connection failed: {e}")
            return None

    def _register_routes(self):
        """Bind every URL rule to its handler method."""
        route = self.app.route
        # Account routes
        route('/acc/<username>', methods=['GET'])(self.get_account)
        route('/acc/', methods=['GET'])(self.get_account)
        # User routes
        route('/user/<username>', methods=['GET'])(self.get_user)
        route('/user/', methods=['GET'])(self.get_user)
        # Update routes
        route('/update-acc-data', methods=['POST'])(self.update_acc_data)
        route('/update-user-data', methods=['POST'])(self.update_user_data)
        # Delete routes
        route('/delete-acc-data', methods=['POST'])(self.delete_acc_data)
        route('/delete-user-data', methods=['POST'])(self.delete_user_data)
        # Create routes (manual operator input)
        route('/create-acc-data', methods=['POST'])(self.create_acc_data)
        route('/create-user-data', methods=['POST'])(self.create_user_data)

    def _check_database_available(self):
        """Obtain a DB handle for the current request.

        Returns:
            ``(True, db, None)`` when a handle was obtained, otherwise
            ``(False, None, ("Database not available", 500))`` where the
            last element is ready to be returned from a view.
        """
        db = self._get_database_connection()
        if db is None:
            return False, None, ("Database not available", 500)
        return True, db, None

    def _handle_error(self, error, message="An error occurred", status=500):
        """Log ``error`` and build a ``(message, status)`` view response.

        Args:
            error: The exception (or any printable value) to log.
            message: Body text sent to the client.
            status: HTTP status code for the response; defaults to 500.

        Returns:
            A ``(message, status)`` tuple. Return it from the view
            as-is: wrapping it in another ``(..., status)`` tuple yields
            a nested tuple that Flask cannot convert into a response.
        """
        print(f"Error: {error}")
        return message, status

    def _read_paging_args(self):
        """Parse the shared listing query parameters.

        Returns:
            ``(limit, offset, prefix, direction)`` where ``limit`` is
            clamped to 1..1000 (default 200), ``offset`` is at least 0
            (default 0), ``prefix`` is the stripped ``prefix`` argument
            (empty string when absent) and ``direction`` is ``'ASC'`` or
            ``'DESC'``.

        Raises:
            ValueError, TypeError: if ``limit`` or ``offset`` is not an
            integer.
        """
        args = request.args
        limit = max(1, min(int(args.get('limit', 200)), 1000))
        offset = max(0, int(args.get('offset', 0)))
        prefix = (args.get('prefix') or '').strip()
        # Whitelist direction so it's safe to interpolate into the
        # ORDER BY clause (parameterised binding doesn't apply to
        # column names or sort directions).
        direction = 'ASC' if args.get('dir', 'desc').lower() == 'asc' else 'DESC'
        return limit, offset, prefix, direction

    def get_account(self, username=None):
        """Return one account or a page of accounts.

        With ``username`` the response is the JSON list of matching
        rows. Without it, the response is ``{"rows": [...], "total": N}``
        where ``rows`` is the requested page and ``total`` is the
        COUNT(*) of the ``acc`` table. Rows whose username starts with
        the ``prefix`` query argument, when given, sort first.

        Any exception raised while querying is logged and answered with
        "Not Found" / 404; non-integer ``limit``/``offset`` yields 400.
        """
        is_available, db, error_response = self._check_database_available()
        if not is_available:
            return error_response
        try:
            if username:
                query = "SELECT username, password, status, link FROM acc WHERE username = %s"
                results = db.query(query, [username])
                return jsonify(results)
            # Listing path — pagination + prefix-priority sort.
            try:
                limit, offset, prefix, direction = self._read_paging_args()
            except (TypeError, ValueError):
                return jsonify({"error": "limit and offset must be integers"}), 400
            if prefix:
                query = (
                    "SELECT username, password, status, link FROM acc "
                    "ORDER BY (CASE WHEN username LIKE %s THEN 0 ELSE 1 END), "
                    f"username {direction} "
                    "LIMIT %s OFFSET %s"
                )
                params = [f"{prefix}%", limit, offset]
            else:
                query = (
                    "SELECT username, password, status, link FROM acc "
                    f"ORDER BY username {direction} "
                    "LIMIT %s OFFSET %s"
                )
                params = [limit, offset]
            rows = db.query(query, params)
            count_rows = db.query("SELECT COUNT(*) AS c FROM acc", [])
            total = int(count_rows[0]["c"]) if count_rows else 0
            return jsonify({"rows": rows, "total": total})
        except Exception as error:
            # Pass the status into the helper rather than appending it
            # to the helper's tuple, which Flask would reject.
            return self._handle_error(error, "Not Found", 404)

    def get_user(self, username=None):
        """Return one user or a page of users.

        With ``username`` (matched against ``f_username``) the response
        is the JSON list of matching rows. Without it, the response is
        ``{"rows": [...], "total": N}``; the page is ordered by
        ``f_username`` when ``sort=f_username`` is passed and by
        ``last_update_time`` otherwise, with ``prefix`` matches first.

        Any exception raised while querying is logged and answered with
        "Not Found" / 404; non-integer ``limit``/``offset`` yields 400.
        """
        is_available, db, error_response = self._check_database_available()
        if not is_available:
            return error_response
        try:
            if username:
                query = "SELECT f_username, f_password, t_username, t_password FROM user WHERE f_username = %s"
                results = db.query(query, [username])
                return jsonify(results)
            try:
                limit, offset, prefix, direction = self._read_paging_args()
            except (TypeError, ValueError):
                return jsonify({"error": "limit and offset must be integers"}), 400
            # Only two column names can ever reach the ORDER BY clause.
            sort_arg = request.args.get('sort', 'last_update_time')
            sort_col = 'f_username' if sort_arg == 'f_username' else 'last_update_time'
            if prefix:
                query = (
                    "SELECT f_username, f_password, t_username, t_password, last_update_time "
                    "FROM user "
                    "ORDER BY (CASE WHEN f_username LIKE %s THEN 0 ELSE 1 END), "
                    f"{sort_col} {direction} "
                    "LIMIT %s OFFSET %s"
                )
                params = [f"{prefix}%", limit, offset]
            else:
                query = (
                    "SELECT f_username, f_password, t_username, t_password, last_update_time "
                    "FROM user "
                    f"ORDER BY {sort_col} {direction} "
                    "LIMIT %s OFFSET %s"
                )
                params = [limit, offset]
            rows = db.query(query, params)
            count_rows = db.query("SELECT COUNT(*) AS c FROM user", [])
            total = int(count_rows[0]["c"]) if count_rows else 0
            return jsonify({"rows": rows, "total": total})
        except Exception as error:
            return self._handle_error(error, "Not Found", 404)

    def update_acc_data(self):
        """Update password, status and link of the account named in the
        JSON body.

        Responds 400 when ``username`` is missing, and 500 with
        "Error updating data" when an exception is raised. A falsy
        result from ``db.execute`` is reported as the JSON string
        "Error updating data" with the default status.
        """
        is_available, db, error_response = self._check_database_available()
        if not is_available:
            return error_response
        try:
            # A JSON ``null`` body parses to None; treat it as empty.
            data = request.get_json() or {}
            username = data.get('username')
            password = data.get('password')
            status = data.get('status')
            link = data.get('link')
            if not username:
                return jsonify({"error": "Username is required"}), 400
            result = db.execute(
                "UPDATE acc SET password = %s, status = %s, link = %s WHERE username = %s",
                [password, status, link, username]
            )
            if result:
                return jsonify("Data updated successfully")
            return jsonify("Error updating data")
        except Exception as error:
            return self._handle_error(error, "Error updating data", 500)

    def update_user_data(self):
        """Update the credentials of the user row keyed by ``f_username``
        and stamp ``last_update_time`` with the current timestamp.

        Responds 400 when ``f_username`` is missing, and 500 with
        "Error updating data" when an exception is raised.
        """
        is_available, db, error_response = self._check_database_available()
        if not is_available:
            return error_response
        try:
            data = request.get_json() or {}
            f_username = data.get('f_username')
            f_password = data.get('f_password')
            t_username = data.get('t_username')
            t_password = data.get('t_password')
            if not f_username:
                return jsonify({"error": "f_username is required"}), 400
            result = db.execute(
                "UPDATE user SET f_password = %s, t_password = %s, t_username = %s, last_update_time = CURRENT_TIMESTAMP WHERE f_username = %s",
                [f_password, t_password, t_username, f_username]
            )
            if result:
                return jsonify("Data updated successfully")
            return jsonify("Error updating data")
        except Exception as error:
            return self._handle_error(error, "Error updating data")

    def delete_acc_data(self):
        """Delete the account named by ``username`` in the JSON body.

        Responds ``{"deleted": username}`` on a truthy ``db.execute``
        result, 400 when ``username`` is missing and 500 otherwise.
        """
        is_available, db, error_response = self._check_database_available()
        if not is_available:
            return error_response
        try:
            data = request.get_json() or {}
            username = data.get('username')
            if not username:
                return jsonify({"error": "Username is required"}), 400
            result = db.execute(
                "DELETE FROM acc WHERE username = %s",
                [username]
            )
            if result:
                return jsonify({"deleted": username})
            return jsonify({"error": "Failed to delete account"}), 500
        except Exception as error:
            return self._handle_error(error, "Error deleting account", 500)

    def delete_user_data(self):
        """Delete the user row keyed by ``f_username`` in the JSON body.

        Responds ``{"deleted": f_username}`` on a truthy ``db.execute``
        result, 400 when ``f_username`` is missing and 500 otherwise.
        """
        is_available, db, error_response = self._check_database_available()
        if not is_available:
            return error_response
        try:
            data = request.get_json() or {}
            f_username = data.get('f_username')
            if not f_username:
                return jsonify({"error": "f_username is required"}), 400
            result = db.execute(
                "DELETE FROM user WHERE f_username = %s",
                [f_username]
            )
            if result:
                return jsonify({"deleted": f_username})
            return jsonify({"error": "Failed to delete user"}), 500
        except Exception as error:
            return self._handle_error(error, "Error deleting user", 500)

    def create_acc_data(self):
        """Insert a new account from the JSON body.

        ``username`` (whitespace-trimmed) and ``password`` are required;
        ``status`` and ``link`` default to empty strings. Responds
        ``{"created": username}`` on a truthy ``db.execute`` result,
        400 on missing fields and 500 otherwise.
        """
        is_available, db, error_response = self._check_database_available()
        if not is_available:
            return error_response
        try:
            data = request.get_json() or {}
            username = (data.get('username') or '').strip()
            password = data.get('password') or ''
            status = data.get('status') or ''
            link = data.get('link') or ''
            if not username or not password:
                return jsonify({"error": "Username and password are required"}), 400
            result = db.execute(
                "INSERT INTO acc (username, password, status, link) VALUES (%s, %s, %s, %s)",
                [username, password, status, link]
            )
            if result:
                return jsonify({"created": username})
            return jsonify({"error": "Failed to create account"}), 500
        except Exception as error:
            return self._handle_error(error, "Error creating account", 500)

    def create_user_data(self):
        """Insert a new user row from the JSON body.

        All four fields are required; the two usernames are
        whitespace-trimmed before validation. Responds
        ``{"created": f_username}`` on a truthy ``db.execute`` result,
        400 on missing fields and 500 otherwise.
        """
        is_available, db, error_response = self._check_database_available()
        if not is_available:
            return error_response
        try:
            data = request.get_json() or {}
            f_username = (data.get('f_username') or '').strip()
            f_password = data.get('f_password') or ''
            t_username = (data.get('t_username') or '').strip()
            t_password = data.get('t_password') or ''
            if not f_username or not f_password or not t_username or not t_password:
                return jsonify({"error": "All fields are required"}), 400
            result = db.execute(
                "INSERT INTO user (f_username, f_password, t_username, t_password) VALUES (%s, %s, %s, %s)",
                [f_username, f_password, t_username, t_password]
            )
            if result:
                return jsonify({"created": f_username})
            return jsonify({"error": "Failed to create user"}), 500
        except Exception as error:
            return self._handle_error(error, "Error creating user", 500)

    def run(self, port=3000, debug=None):
        """Verify the schema, then serve on all interfaces (blocking).

        Args:
            port: TCP port to listen on.
            debug: Flask debug flag; when None it is taken from
                ``_debug_enabled()``.

        Raises:
            SystemExit: with code 1 if schema verification fails.
        """
        if debug is None:
            debug = _debug_enabled()
        try:
            verify_tables_once()
        except Exception as e:
            print(f"Cannot start server: {e}")
            # Raise directly instead of calling the ``exit`` helper,
            # which is only injected by the ``site`` module.
            raise SystemExit(1)
        print(f'CM Bot DB API Listening at Port : {port}')
        self.app.run(host='0.0.0.0', port=port, debug=debug)

    def run_in_thread(self, port=3000, debug=False):
        """Run the Flask app in a separate daemon thread.

        Returns:
            The started ``threading.Thread``, or None if schema
            verification failed (the error is printed).
        """
        try:
            verify_tables_once()
        except Exception as e:
            print(f"Cannot start server: {e}")
            return None

        def run_app():
            print(f'CM Bot DB API Listening at Port : {port}')
            # The reloader is disabled explicitly for the threaded server.
            self.app.run(host='0.0.0.0', port=port, debug=debug, use_reloader=False)

        thread = threading.Thread(target=run_app, daemon=True)
        thread.start()
        return thread
def create_app():
    """Build the Flask application for a WSGI server.

    Intended as the gunicorn factory target (`app.cm_api:create_app()`).
    Only the CM_API wrapper is constructed here; schema verification is
    left to the first incoming request (see CM_API._verify_schema_once),
    so calling this factory never touches MySQL. That keeps gunicorn's
    preload phase working while the DB is briefly unavailable and lets
    unit tests build the app without DB env wiring.

    Returns:
        The Flask app object owned by a fresh CM_API instance.
    """
    api = CM_API()
    return api.app
if __name__ == "__main__":
    # Script entry point: build the API wrapper and serve on port 3000.
    api = CM_API()
    api.run(port=3000)