perf(api): pool MySQL connections + drop per-request schema check

Two wins, one root cause: every API request was opening TWO fresh MySQL
connections plus four wasted round-trips before the real query.

Old per-request shape (GET /acc/):
  1. DB() constructor → open conn, SHOW TABLES LIKE 'acc',
     SHOW TABLES LIKE 'user', close
  2. db.query() → open conn, run SELECT, close

That's ~4 round-trips for ~10 ms of useful work. With the dashboard's
30 s auto-refresh and two open tabs (accounts + users), the api-server
churned through ~10 fresh MySQL connections every minute even when
nothing changed.

Changes:
- app/db.py: introduce a process-wide MySQLConnectionPool (size 8 by
  default, override with DB_POOL_SIZE). DB() now just touches the cached
  pool — no schema check, no fresh handshake. query()/execute() rent a
  connection from the pool and return it via conn.close().
- app/db.py: extract the schema check into verify_tables_once() — runs
  once at WSGI boot inside create_app() so a misconfigured DB still
  fails fast at startup.
- app/cm_api.py: _close_database_connection() removed; the finally
  blocks that wrapped every route are gone too. Pool reclamation lives
  inside DB now.
- app/cm_api.py: create_app() and run() invoke verify_tables_once()
  once at startup instead of CM_API.__init__ doing nothing useful.

Net: ~4× round-trip reduction per request, no MySQL handshake on the
hot path. With two gunicorn workers × pool_size 8 = 16 max in-flight
connections, well under MySQL's default max_connections=151.

(The user asked about 'batching the queries' — but the queries already
return the full row set in one shot. The bottleneck was connection
churn, not query shape. If row count grows past the comfortable single-
fetch range later, swap to LIMIT/OFFSET pagination at the API + table
component layer.)
This commit is contained in:
yiekheng 2026-05-03 10:54:11 +08:00
parent 2871e04693
commit a42fdf54b0
2 changed files with 124 additions and 138 deletions

View File

@ -1,7 +1,7 @@
import os
import threading
from flask import Flask, jsonify, request
from .db import DB
from .db import DB, verify_tables_once
def _debug_enabled() -> bool:
@ -27,26 +27,18 @@ class CM_API:
self._register_routes()
def _get_database_connection(self):
"""Create a new database connection for use"""
"""Return a DB handle backed by the shared connection pool.
DB() is now a near-zero-cost handle (it just touches the cached
process-wide pool); each query()/execute() rents a connection
and returns it. There's nothing to clean up explicitly.
"""
try:
db = DB()
return db
return DB()
except Exception as e:
print(f"Database connection failed: {e}")
return None
def _close_database_connection(self, db):
"""Close database connection if it exists"""
if db is not None:
try:
# Assuming DB class has a close method or similar cleanup
if hasattr(db, 'close'):
db.close()
elif hasattr(db, 'connection') and hasattr(db.connection, 'close'):
db.connection.close()
except Exception as e:
print(f"Error closing database connection: {e}")
def _register_routes(self):
# Account routes
self.app.route('/acc/<username>', methods=['GET'])(self.get_account)
@ -96,8 +88,6 @@ class CM_API:
except Exception as error:
return self._handle_error(error, "Not Found"), 404
finally:
self._close_database_connection(db)
def get_user(self, username=None):
is_available, db, error_response = self._check_database_available()
@ -117,8 +107,6 @@ class CM_API:
except Exception as error:
return self._handle_error(error, "Not Found"), 404
finally:
self._close_database_connection(db)
def update_acc_data(self):
is_available, db, error_response = self._check_database_available()
@ -147,8 +135,6 @@ class CM_API:
except Exception as error:
return self._handle_error(error, "Error updating data"), 500
finally:
self._close_database_connection(db)
def update_user_data(self):
is_available, db, error_response = self._check_database_available()
@ -177,8 +163,6 @@ class CM_API:
except Exception as error:
return self._handle_error(error, "Error updating data")
finally:
self._close_database_connection(db)
def delete_acc_data(self):
is_available, db, error_response = self._check_database_available()
@ -203,8 +187,6 @@ class CM_API:
except Exception as error:
return self._handle_error(error, "Error deleting account"), 500
finally:
self._close_database_connection(db)
def delete_user_data(self):
is_available, db, error_response = self._check_database_available()
@ -229,8 +211,6 @@ class CM_API:
except Exception as error:
return self._handle_error(error, "Error deleting user"), 500
finally:
self._close_database_connection(db)
def create_acc_data(self):
is_available, db, error_response = self._check_database_available()
@ -258,8 +238,6 @@ class CM_API:
except Exception as error:
return self._handle_error(error, "Error creating account"), 500
finally:
self._close_database_connection(db)
def create_user_data(self):
is_available, db, error_response = self._check_database_available()
@ -287,35 +265,31 @@ class CM_API:
except Exception as error:
return self._handle_error(error, "Error creating user"), 500
finally:
self._close_database_connection(db)
def run(self, port=3000, debug=None):
if debug is None:
debug = _debug_enabled()
# Test database connection before starting server
test_db = self._get_database_connection()
if test_db is None:
print("Cannot start server: Database not available")
try:
verify_tables_once()
except Exception as e:
print(f"Cannot start server: {e}")
exit(1)
self._close_database_connection(test_db)
print(f'CM Bot DB API Listening at Port : {port}')
self.app.run(host='0.0.0.0', port=port, debug=debug)
def run_in_thread(self, port=3000, debug=False):
"""Run the Flask app in a separate thread"""
# Test database connection before starting server
test_db = self._get_database_connection()
if test_db is None:
print("Cannot start server: Database not available")
try:
verify_tables_once()
except Exception as e:
print(f"Cannot start server: {e}")
return None
self._close_database_connection(test_db)
def run_app():
print(f'CM Bot DB API Listening at Port : {port}')
self.app.run(host='0.0.0.0', port=port, debug=debug, use_reloader=False)
thread = threading.Thread(target=run_app, daemon=True)
thread.start()
return thread
@ -324,12 +298,13 @@ class CM_API:
def create_app():
"""WSGI factory used by gunicorn (`app.cm_api:create_app()`).
Returns the Flask app object so gunicorn can serve it. The
surrounding CM_API class still owns route registration and DB
connection management this just hands gunicorn the underlying
Flask instance.
Returns the Flask app object so gunicorn can serve it. Validates the
schema once at boot (so a misconfigured DB fails fast) request-time
handlers don't repeat the check.
"""
return CM_API().app
app = CM_API().app
verify_tables_once()
return app
if __name__ == '__main__':

187
app/db.py
View File

@ -1,8 +1,9 @@
import os
import threading
import time
import mysql.connector
from mysql.connector import Error
from mysql.connector import Error, pooling
def _get_required_env(name: str) -> str:
@ -12,112 +13,122 @@ def _get_required_env(name: str) -> str:
return value
class DB:
def __init__(self):
self.config = {
'host': _get_required_env('DB_HOST'),
'user': _get_required_env('DB_USER'),
'password': _get_required_env('DB_PASSWORD'),
'database': _get_required_env('DB_NAME'),
'port': int(_get_required_env('DB_PORT')),
'connection_timeout': int(_get_required_env('DB_CONNECTION_TIMEOUT'))
}
self.connect_retries = max(1, int(_get_required_env('DB_CONNECT_RETRIES')))
self.connect_retry_delay = float(_get_required_env('DB_CONNECT_RETRY_DELAY'))
self.init_database()
def get_connection(self):
"""Get MySQL database connection."""
for attempt in range(1, self.connect_retries + 1):
# Process-wide MySQL connection pool. Gunicorn forks workers; each worker
# gets its own pool (the global is rebuilt per process at first use).
_pool: "pooling.MySQLConnectionPool | None" = None
_pool_lock = threading.Lock()
def _build_pool() -> "pooling.MySQLConnectionPool":
config = {
"host": _get_required_env("DB_HOST"),
"user": _get_required_env("DB_USER"),
"password": _get_required_env("DB_PASSWORD"),
"database": _get_required_env("DB_NAME"),
"port": int(_get_required_env("DB_PORT")),
"connection_timeout": int(_get_required_env("DB_CONNECTION_TIMEOUT")),
"pool_name": "cm_pool",
"pool_size": int(os.getenv("DB_POOL_SIZE", "8")),
"pool_reset_session": True,
}
return pooling.MySQLConnectionPool(**config)
def _get_pool() -> "pooling.MySQLConnectionPool":
"""Lazily build the per-process pool with retry, then memoize."""
global _pool
if _pool is not None:
return _pool
with _pool_lock:
if _pool is not None:
return _pool
retries = max(1, int(_get_required_env("DB_CONNECT_RETRIES")))
delay = float(_get_required_env("DB_CONNECT_RETRY_DELAY"))
last_err: "Exception | None" = None
for attempt in range(1, retries + 1):
try:
connection = mysql.connector.connect(**self.config)
return connection
_pool = _build_pool()
return _pool
except Error as e:
print(f"Error connecting to MySQL: {e}")
if attempt < self.connect_retries:
last_err = e
if attempt < retries:
print(
f"Retrying MySQL connection ({attempt}/{self.connect_retries}) "
f"in {self.connect_retry_delay} seconds..."
f"MySQL pool init failed ({e}); "
f"retry {attempt}/{retries} in {delay}s..."
)
time.sleep(self.connect_retry_delay)
return None
def init_database(self):
"""Initialize the database connection."""
connection = self.get_connection()
if connection is None:
raise Exception("Failed to connect to database")
cursor = None
time.sleep(delay)
raise RuntimeError(
f"Failed to build MySQL pool after {retries} attempts: {last_err}"
)
def verify_tables_once() -> None:
"""Run once at app startup to confirm schema is present.
Previously the DB() constructor ran two SHOW TABLES LIKE queries on
every request wasted round-trips. Now the check happens once when
create_app() boots the WSGI app; subsequent requests just rent a
connection from the pool.
"""
conn = _get_pool().get_connection()
try:
cursor = conn.cursor()
try:
cursor = connection.cursor()
# Test connection by checking if required tables exist
cursor.execute("SHOW TABLES LIKE 'acc'")
if not cursor.fetchone():
raise Exception("Table 'acc' does not exist")
cursor.execute("SHOW TABLES LIKE 'user'")
if not cursor.fetchone():
raise Exception("Table 'user' does not exist")
# print("Database connection verified - required tables exist")
except Error as e:
print(f"Error verifying database: {e}")
raise Exception(f"Database verification failed: {e}")
finally:
if cursor is not None:
cursor.close()
if connection.is_connected():
connection.close()
def query(self, query, params=None):
"""Execute a query and return results."""
connection = self.get_connection()
if connection is None:
return []
cursor = None
cursor.close()
finally:
conn.close()
class DB:
"""Thin handle backed by the process-wide MySQL pool.
Constructing DB() is now ~free it just touches the (cached) pool.
Each query()/execute() rents a connection from the pool and returns
it on completion via conn.close() (which the pool intercepts and
releases the connection back instead of actually closing it).
Pool size caps in-flight queries per worker; tune with DB_POOL_SIZE
(default 8). Two gunicorn workers × pool_size 8 = 16 max
connections comfortably under MySQL's default max_connections.
"""
def __init__(self):
_get_pool()
def query(self, sql, params=None):
conn = _get_pool().get_connection()
try:
cursor = connection.cursor(dictionary=True)
if params:
cursor.execute(query, params)
else:
cursor.execute(query)
results = cursor.fetchall()
return results
cursor = conn.cursor(dictionary=True)
try:
cursor.execute(sql, params or ())
return cursor.fetchall()
finally:
cursor.close()
except Error as e:
print(f"Error executing query: {e}")
return []
finally:
if cursor is not None:
cursor.close()
if connection.is_connected():
connection.close()
def execute(self, query, params=None):
"""Execute a query that modifies data (INSERT, UPDATE, DELETE) and return success status."""
connection = self.get_connection()
if connection is None:
return False
cursor = None
conn.close()
def execute(self, sql, params=None):
conn = _get_pool().get_connection()
try:
cursor = connection.cursor()
if params:
cursor.execute(query, params)
else:
cursor.execute(query)
connection.commit()
return True
cursor = conn.cursor()
try:
cursor.execute(sql, params or ())
conn.commit()
return True
finally:
cursor.close()
except Error as e:
print(f"Error executing query: {e}")
return False
finally:
if cursor is not None:
cursor.close()
if connection.is_connected():
connection.close()
conn.close()