cm_bot_v2/app/cm_api.py
yiekheng 9eed051916 feat(web,api): row-level search, more sort columns, copy buttons
Three operator-quality-of-life features behind the same caching and
pagination contract that the existing tables already use:

- Search bar on /acc and /users (Find/Enter to apply, Clear to reset).
  Backed by a new `q` API param that filters via WHERE
  username/f_username LIKE 'q%' on both rows + count queries so the
  table header total stays consistent under a filter.
- Two more sortable columns: acc.status and user.t_username. Sort
  columns are whitelisted because sort_col is f-string'd into ORDER
  BY (parameterised binding doesn't apply to column names) — anything
  outside the allowed set falls back to the table's default.
- Copy button on every row that writes a multi-line credentials
  message to the clipboard. New lib/clipboard.ts helper tries
  navigator.clipboard.writeText() first and falls back to
  textarea+execCommand("copy") so it works over the internal-network
  HTTP deploy where the modern API is gated by secure-context rules.
  Acc message: Username/Password (+Link if set). User message:
  From/To username and password.

Also: inactive sort indicators now render ↓ (the direction they'll
sort on first click) instead of the more ambiguous ↕.

Test suite grows from 53 to 70: tests/test_user_search_filter.py
(9 tests) pins the q-filter contract on both /user/ and /acc/;
tests/test_sort_whitelist.py (8 tests) pins the allowed sort columns
and proves out-of-set values cannot reach the SQL parser.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-04 09:26:11 +08:00

429 lines
17 KiB
Python

import os
import threading
from flask import Flask, jsonify, request
from .db import DB, verify_tables_once
def _debug_enabled() -> bool:
    """Report whether debug mode was requested through ``CM_DEBUG``.

    The variable is read from the environment, stripped of surrounding
    whitespace and lower-cased. The values ``1``, ``true`` and ``yes``
    switch debug mode on; every other value, and an unset variable,
    leaves it off. Being off by default keeps the Werkzeug debugger
    unreachable in production containers unless someone opts in.
    """
    raw_flag = os.getenv("CM_DEBUG", "false")
    normalized = raw_flag.strip().lower()
    return normalized in ("1", "true", "yes")
class CM_API:
def __init__(self):
    """Build the Flask app, register its routes and arm the schema check."""
    # Latch read by _verify_schema_once; it stays False until the first
    # request has validated the schema. Verification is deferred to that
    # first request so that constructing the WSGI app (e.g., in tests,
    # or via gunicorn's preload phase before MySQL is reachable) does
    # not require the DB to be up.
    self._schema_verified = False

    # No CORS middleware on purpose: the api-server is internal-only
    # (no host port in prod compose, per C5). Browsers can't reach it
    # directly, and server-side fetches from the web service don't
    # trigger CORS. Leaving flask_cors out also removes a permissive
    # '*' origin default that would become an attack surface if a host
    # port were ever accidentally re-exposed.
    flask_app = Flask(__name__)
    self.app = flask_app

    self._register_routes()
    flask_app.before_request(self._verify_schema_once)
def _verify_schema_once(self):
    """Before-request hook: run ``verify_tables_once`` until it succeeds once.

    The latch is only flipped after ``verify_tables_once`` returns, so
    a failed verification is retried on the next request.
    """
    if not self._schema_verified:
        verify_tables_once()
        self._schema_verified = True
def _get_database_connection(self):
    """Hand back a ``DB`` handle, or ``None`` when one cannot be created.

    ``DB()`` is a near-zero-cost handle onto the shared, process-wide
    connection pool; each query()/execute() rents a connection and
    returns it, so there is nothing to clean up explicitly here.
    Construction failures are printed rather than raised.
    """
    try:
        handle = DB()
    except Exception as exc:
        print(f"Database connection failed: {exc}")
        return None
    return handle
def _register_routes(self):
    """Bind every URL rule of the API to its handler method on ``self.app``."""
    route_table = (
        # Account routes
        ('/acc/<username>', 'GET', self.get_account),
        ('/acc/', 'GET', self.get_account),
        # User routes
        ('/user/<username>', 'GET', self.get_user),
        ('/user/', 'GET', self.get_user),
        # Batch endpoint for the transfer bot: returns ALL users as a
        # bare list (no pagination envelope). It is kept apart from the
        # paginated UI endpoint so the two contracts can evolve
        # independently.
        ('/user/batch', 'GET', self.get_user_batch),
        # Update routes
        ('/update-acc-data', 'POST', self.update_acc_data),
        ('/update-user-data', 'POST', self.update_user_data),
        # Delete routes
        ('/delete-acc-data', 'POST', self.delete_acc_data),
        ('/delete-user-data', 'POST', self.delete_user_data),
        # Create routes (manual operator input)
        ('/create-acc-data', 'POST', self.create_acc_data),
        ('/create-user-data', 'POST', self.create_user_data),
    )
    for rule, verb, handler in route_table:
        self.app.route(rule, methods=[verb])(handler)
def _check_database_available(self):
    """Fetch a DB handle and report the outcome as a 3-tuple.

    Returns:
        ``(True, handle, None)`` when a handle was obtained, otherwise
        ``(False, None, ("Database not available", 500))`` where the
        last element is a ready-to-return (body, status) response.
    """
    handle = self._get_database_connection()
    if handle is not None:
        return True, handle, None
    return False, None, ("Database not available", 500)
def _handle_error(self, error, message="An error occurred"):
    """Print ``error`` and build a ``(message, 500)`` response tuple.

    The return value is already a complete (body, status) pair that a
    view can return as-is.
    """
    print(f"Error: {error}")
    status_code = 500
    return message, status_code
def get_account(self, username=None):
    """Serve ``GET /acc/<username>`` and the paginated ``GET /acc/`` listing.

    With ``username`` set, responds with the JSON-encoded result of an
    exact-match lookup on ``acc.username``.

    Without it, the listing is driven by these query-string arguments:

    * ``limit``  -- page size, clamped to 1..1000 (default 200).
    * ``offset`` -- rows to skip, floored at 0 (default 0).
    * ``q``      -- search filter; keeps usernames that start with it.
    * ``prefix`` -- ordering boost; usernames starting with it sort first.
    * ``sort``   -- ``username`` or ``status``; anything else falls back
      to ``username``.
    * ``dir``    -- ``asc`` (case-insensitive) sorts ascending, any other
      value sorts descending.

    Returns:
        ``{"rows": ..., "total": n}`` as JSON for the listing, where
        ``total`` counts the rows matching ``q`` regardless of paging;
        a 400 JSON error when ``limit``/``offset`` are not integers;
        the error tuple from ``_check_database_available`` when no DB
        handle is available; ``("Not Found", 404)`` when anything else
        raises.
    """
    is_available, db, error_response = self._check_database_available()
    if not is_available:
        return error_response
    try:
        if username:
            query = "SELECT username, password, status, link FROM acc WHERE username = %s"
            results = db.query(query, [username])
            return jsonify(results)

        # Listing path: pagination + prefix-priority sort.
        try:
            limit = max(1, min(int(request.args.get('limit', 200)), 1000))
            offset = max(0, int(request.args.get('offset', 0)))
        except (TypeError, ValueError):
            return jsonify({"error": "limit and offset must be integers"}), 400

        prefix = (request.args.get('prefix') or '').strip()
        # `q` is the user-facing search filter (WHERE), distinct from
        # `prefix` which is the deployment tenant boost (ORDER BY).
        q = (request.args.get('q') or '').strip()

        # Column and direction are interpolated into ORDER BY because
        # parameter binding cannot stand in for identifiers or keywords,
        # so both are restricted to a fixed set of literals first.
        sort_arg = request.args.get('sort', 'username')
        sort_col = sort_arg if sort_arg in ('username', 'status') else 'username'
        direction = 'ASC' if request.args.get('dir', 'desc').lower() == 'asc' else 'DESC'

        where_sql = "WHERE username LIKE %s" if q else ""
        where_params = [f"{q}%"] if q else []

        if prefix:
            query = (
                "SELECT username, password, status, link FROM acc "
                f"{where_sql} "
                "ORDER BY (CASE WHEN username LIKE %s THEN 0 ELSE 1 END), "
                f"{sort_col} {direction} "
                "LIMIT %s OFFSET %s"
            )
            params = [*where_params, f"{prefix}%", limit, offset]
        else:
            query = (
                "SELECT username, password, status, link FROM acc "
                f"{where_sql} "
                f"ORDER BY {sort_col} {direction} "
                "LIMIT %s OFFSET %s"
            )
            params = [*where_params, limit, offset]

        rows = db.query(query, params)
        # The count reuses the same WHERE so the reported total stays
        # consistent with the filtered rows.
        count_rows = db.query(f"SELECT COUNT(*) AS c FROM acc {where_sql}", where_params)
        total = int(count_rows[0]["c"]) if count_rows else 0
        return jsonify({"rows": rows, "total": total})
    except Exception as error:
        # _handle_error returns a (message, 500) pair. Nesting that pair
        # inside another (..., 404) tuple makes the body a tuple, which
        # Flask rejects as an invalid view return value, so only the
        # message is kept and paired with this endpoint's 404.
        message, _ = self._handle_error(error, "Not Found")
        return message, 404
def get_user(self, username=None):
    """Serve ``GET /user/<username>`` and the paginated ``GET /user/`` listing.

    With ``username`` set, responds with the JSON-encoded result of an
    exact-match lookup on ``user.f_username``.

    Without it, the listing is driven by these query-string arguments:

    * ``limit``  -- page size, clamped to 1..1000 (default 200).
    * ``offset`` -- rows to skip, floored at 0 (default 0).
    * ``q``      -- search filter; keeps f_usernames that start with it.
    * ``prefix`` -- ordering boost; f_usernames starting with it sort
      first.
    * ``sort``   -- ``f_username``, ``t_username`` or
      ``last_update_time``; anything else falls back to
      ``last_update_time``.
    * ``dir``    -- ``asc`` (case-insensitive) sorts ascending, any other
      value sorts descending.

    Returns:
        ``{"rows": ..., "total": n}`` as JSON for the listing, where
        ``total`` counts the rows matching ``q`` regardless of paging;
        a 400 JSON error when ``limit``/``offset`` are not integers;
        the error tuple from ``_check_database_available`` when no DB
        handle is available; ``("Not Found", 404)`` when anything else
        raises.
    """
    is_available, db, error_response = self._check_database_available()
    if not is_available:
        return error_response
    try:
        if username:
            query = "SELECT f_username, f_password, t_username, t_password FROM user WHERE f_username = %s"
            results = db.query(query, [username])
            return jsonify(results)

        try:
            limit = max(1, min(int(request.args.get('limit', 200)), 1000))
            offset = max(0, int(request.args.get('offset', 0)))
        except (TypeError, ValueError):
            return jsonify({"error": "limit and offset must be integers"}), 400

        prefix = (request.args.get('prefix') or '').strip()
        # `q` is the user-facing search filter (WHERE), distinct from
        # `prefix` which is the deployment tenant boost (ORDER BY).
        # Both can be active at once: search refines what the tenant
        # prefix already promotes.
        q = (request.args.get('q') or '').strip()

        # Whitelist the sort column: `sort_col` is f-string'd into the
        # ORDER BY clause, which can't be parameterised. Anything
        # outside the allowed set falls back to the default so a
        # malformed value never reaches the SQL parser.
        sort_arg = request.args.get('sort', 'last_update_time')
        sort_col = sort_arg if sort_arg in ('f_username', 't_username', 'last_update_time') else 'last_update_time'
        direction = 'ASC' if request.args.get('dir', 'desc').lower() == 'asc' else 'DESC'

        where_sql = "WHERE f_username LIKE %s" if q else ""
        where_params = [f"{q}%"] if q else []

        if prefix:
            query = (
                "SELECT f_username, f_password, t_username, t_password, last_update_time "
                f"FROM user {where_sql} "
                "ORDER BY (CASE WHEN f_username LIKE %s THEN 0 ELSE 1 END), "
                f"{sort_col} {direction} "
                "LIMIT %s OFFSET %s"
            )
            params = [*where_params, f"{prefix}%", limit, offset]
        else:
            query = (
                "SELECT f_username, f_password, t_username, t_password, last_update_time "
                f"FROM user {where_sql} "
                f"ORDER BY {sort_col} {direction} "
                "LIMIT %s OFFSET %s"
            )
            params = [*where_params, limit, offset]

        rows = db.query(query, params)
        # The count reuses the same WHERE so the reported total stays
        # consistent with the filtered rows.
        count_rows = db.query(f"SELECT COUNT(*) AS c FROM user {where_sql}", where_params)
        total = int(count_rows[0]["c"]) if count_rows else 0
        return jsonify({"rows": rows, "total": total})
    except Exception as error:
        # _handle_error returns a (message, 500) pair. Nesting that pair
        # inside another (..., 404) tuple makes the body a tuple, which
        # Flask rejects as an invalid view return value, so only the
        # message is kept and paired with this endpoint's 404.
        message, _ = self._handle_error(error, "Not Found")
        return message, 404
def get_user_batch(self):
    """Serve ``GET /user/batch``: every ``user`` row, newest update first.

    The query result is JSON-encoded as-is, without the
    ``{"rows", "total"}`` pagination envelope.
    """
    db_ready, handle, failure = self._check_database_available()
    if not db_ready:
        return failure

    select_all = (
        "SELECT f_username, f_password, t_username, t_password, last_update_time "
        "FROM user "
        "ORDER BY last_update_time DESC"
    )
    try:
        return jsonify(handle.query(select_all, []))
    except Exception as exc:
        # _handle_error already yields a complete (message, 500) pair;
        # wrapping it in a second (..., 500) tuple would produce a
        # tuple-as-body that Flask refuses to turn into a Response.
        return self._handle_error(exc, "Error fetching user batch")
def update_acc_data(self):
    """Handle ``POST /update-acc-data``: overwrite one ``acc`` row.

    Reads ``username``, ``password``, ``status`` and ``link`` from the
    JSON body and sets the three non-key columns on the row whose
    ``username`` matches.

    Returns:
        JSON ``"Data updated successfully"`` when ``db.execute``
        reports a truthy result and JSON ``"Error updating data"``
        otherwise; a 400 JSON error when ``username`` is missing; the
        error tuple from ``_check_database_available`` when no DB
        handle is available; ``("Error updating data", 500)`` when
        anything raises.
    """
    is_available, db, error_response = self._check_database_available()
    if not is_available:
        return error_response
    try:
        # A JSON `null` body parses to None; fall back to an empty dict
        # so the request is rejected by validation instead of crashing
        # on `.get` (same guard as the delete/create handlers).
        data = request.get_json() or {}
        username = data.get('username')
        password = data.get('password')
        status = data.get('status')
        link = data.get('link')
        if not username:
            # 400 matches the delete/create handlers' validation errors.
            return jsonify({"error": "Username is required"}), 400
        result = db.execute(
            "UPDATE acc SET password = %s, status = %s, link = %s WHERE username = %s",
            [password, status, link, username]
        )
        if result:
            return jsonify("Data updated successfully")
        return jsonify("Error updating data")
    except Exception as error:
        # _handle_error already returns (message, 500); adding another
        # ", 500" would nest the tuple and Flask would reject the body.
        return self._handle_error(error, "Error updating data")
def update_user_data(self):
    """Handle ``POST /update-user-data``: overwrite one ``user`` row.

    Reads ``f_username``, ``f_password``, ``t_username`` and
    ``t_password`` from the JSON body, updates the row whose
    ``f_username`` matches and stamps ``last_update_time`` with
    ``CURRENT_TIMESTAMP``.

    Returns:
        JSON ``"Data updated successfully"`` when ``db.execute``
        reports a truthy result and JSON ``"Error updating data"``
        otherwise; a 400 JSON error when ``f_username`` is missing; the
        error tuple from ``_check_database_available`` when no DB
        handle is available; ``("Error updating data", 500)`` when
        anything raises.
    """
    is_available, db, error_response = self._check_database_available()
    if not is_available:
        return error_response
    try:
        # A JSON `null` body parses to None; fall back to an empty dict
        # so the request is rejected by validation instead of crashing
        # on `.get` (same guard as the delete/create handlers).
        data = request.get_json() or {}
        f_username = data.get('f_username')
        f_password = data.get('f_password')
        t_username = data.get('t_username')
        t_password = data.get('t_password')
        if not f_username:
            # 400 matches the delete/create handlers' validation errors.
            return jsonify({"error": "f_username is required"}), 400
        result = db.execute(
            "UPDATE user SET f_password = %s, t_password = %s, t_username = %s, last_update_time = CURRENT_TIMESTAMP WHERE f_username = %s",
            [f_password, t_password, t_username, f_username]
        )
        if result:
            return jsonify("Data updated successfully")
        return jsonify("Error updating data")
    except Exception as error:
        return self._handle_error(error, "Error updating data")
def delete_acc_data(self):
    """Handle ``POST /delete-acc-data``: delete the ``acc`` row for a username.

    Returns:
        JSON ``{"deleted": username}`` when ``db.execute`` reports a
        truthy result; a 400 JSON error when ``username`` is missing; a
        500 JSON error when the result is falsy; the error tuple from
        ``_check_database_available`` when no DB handle is available;
        ``("Error deleting account", 500)`` when anything raises.
    """
    is_available, db, error_response = self._check_database_available()
    if not is_available:
        return error_response
    try:
        data = request.get_json() or {}
        username = data.get('username')
        if not username:
            return jsonify({"error": "Username is required"}), 400
        result = db.execute(
            "DELETE FROM acc WHERE username = %s",
            [username]
        )
        if result:
            return jsonify({"deleted": username})
        return jsonify({"error": "Failed to delete account"}), 500
    except Exception as error:
        # _handle_error already returns (message, 500); adding another
        # ", 500" would nest the tuple and Flask would reject the body.
        return self._handle_error(error, "Error deleting account")
def delete_user_data(self):
    """Handle ``POST /delete-user-data``: delete the ``user`` row for an f_username.

    Returns:
        JSON ``{"deleted": f_username}`` when ``db.execute`` reports a
        truthy result; a 400 JSON error when ``f_username`` is missing;
        a 500 JSON error when the result is falsy; the error tuple from
        ``_check_database_available`` when no DB handle is available;
        ``("Error deleting user", 500)`` when anything raises.
    """
    is_available, db, error_response = self._check_database_available()
    if not is_available:
        return error_response
    try:
        data = request.get_json() or {}
        f_username = data.get('f_username')
        if not f_username:
            return jsonify({"error": "f_username is required"}), 400
        result = db.execute(
            "DELETE FROM user WHERE f_username = %s",
            [f_username]
        )
        if result:
            return jsonify({"deleted": f_username})
        return jsonify({"error": "Failed to delete user"}), 500
    except Exception as error:
        # _handle_error already returns (message, 500); adding another
        # ", 500" would nest the tuple and Flask would reject the body.
        return self._handle_error(error, "Error deleting user")
def create_acc_data(self):
    """Handle ``POST /create-acc-data``: insert a new ``acc`` row.

    ``username`` is whitespace-trimmed; ``password``, ``status`` and
    ``link`` default to the empty string when absent or null.

    Returns:
        JSON ``{"created": username}`` when ``db.execute`` reports a
        truthy result; a 400 JSON error when ``username`` or
        ``password`` is empty; a 500 JSON error when the result is
        falsy; the error tuple from ``_check_database_available`` when
        no DB handle is available; ``("Error creating account", 500)``
        when anything raises.
    """
    is_available, db, error_response = self._check_database_available()
    if not is_available:
        return error_response
    try:
        data = request.get_json() or {}
        username = (data.get('username') or '').strip()
        password = data.get('password') or ''
        status = data.get('status') or ''
        link = data.get('link') or ''
        if not username or not password:
            return jsonify({"error": "Username and password are required"}), 400
        result = db.execute(
            "INSERT INTO acc (username, password, status, link) VALUES (%s, %s, %s, %s)",
            [username, password, status, link]
        )
        if result:
            return jsonify({"created": username})
        return jsonify({"error": "Failed to create account"}), 500
    except Exception as error:
        # _handle_error already returns (message, 500); adding another
        # ", 500" would nest the tuple and Flask would reject the body.
        return self._handle_error(error, "Error creating account")
def create_user_data(self):
    """Handle ``POST /create-user-data``: insert a new ``user`` row.

    ``f_username`` and ``t_username`` are whitespace-trimmed; the two
    passwords are taken as sent. All four fields must be non-empty.

    Returns:
        JSON ``{"created": f_username}`` when ``db.execute`` reports a
        truthy result; a 400 JSON error when any field is empty; a 500
        JSON error when the result is falsy; the error tuple from
        ``_check_database_available`` when no DB handle is available;
        ``("Error creating user", 500)`` when anything raises.
    """
    is_available, db, error_response = self._check_database_available()
    if not is_available:
        return error_response
    try:
        data = request.get_json() or {}
        f_username = (data.get('f_username') or '').strip()
        f_password = data.get('f_password') or ''
        t_username = (data.get('t_username') or '').strip()
        t_password = data.get('t_password') or ''
        if not f_username or not f_password or not t_username or not t_password:
            return jsonify({"error": "All fields are required"}), 400
        result = db.execute(
            "INSERT INTO user (f_username, f_password, t_username, t_password) VALUES (%s, %s, %s, %s)",
            [f_username, f_password, t_username, t_password]
        )
        if result:
            return jsonify({"created": f_username})
        return jsonify({"error": "Failed to create user"}), 500
    except Exception as error:
        # _handle_error already returns (message, 500); adding another
        # ", 500" would nest the tuple and Flask would reject the body.
        return self._handle_error(error, "Error creating user")
def run(self, port=3000, debug=None):
    """Verify the schema, then serve the app in the foreground on 0.0.0.0.

    Args:
        port: TCP port to listen on.
        debug: Flask debug flag. ``None`` means "decide from the
            environment" via ``_debug_enabled()``.

    Raises:
        SystemExit: with exit code 1 when ``verify_tables_once`` fails,
            so the process stops instead of serving without a verified
            schema.
    """
    if debug is None:
        debug = _debug_enabled()
    try:
        verify_tables_once()
    except Exception as e:
        print(f"Cannot start server: {e}")
        # Raise SystemExit directly: the bare `exit()` helper is
        # injected by the `site` module for interactive use and is
        # missing when the interpreter starts without it (python -S).
        raise SystemExit(1)
    print(f'CM Bot DB API Listening at Port : {port}')
    self.app.run(host='0.0.0.0', port=port, debug=debug)
def run_in_thread(self, port=3000, debug=False):
    """Verify the schema, then serve the app from a daemon thread.

    Returns:
        The started ``threading.Thread``, or ``None`` when
        ``verify_tables_once`` raised (the failure is printed).
    """
    try:
        verify_tables_once()
    except Exception as exc:
        print(f"Cannot start server: {exc}")
        return None

    def _serve():
        print(f'CM Bot DB API Listening at Port : {port}')
        # The reloader is switched off for the threaded server.
        self.app.run(host='0.0.0.0', port=port, debug=debug, use_reloader=False)

    worker = threading.Thread(target=_serve, daemon=True)
    worker.start()
    return worker
def create_app():
    """Build and return the Flask app for WSGI servers.

    This is the factory gunicorn loads (`app.cm_api:create_app()`).
    It never touches MySQL itself: schema verification runs lazily on
    the first request (see CM_API._verify_schema_once), so gunicorn's
    preload phase is unaffected by a momentarily-unavailable DB and
    unit tests can construct the app without DB env wiring.
    """
    api = CM_API()
    return api.app
if __name__ == '__main__':
    # Direct execution: serve the API in the foreground on port 3000.
    CM_API().run(port=3000)