From a42fdf54b09bf0050ad9c6c13a56dcb314b54797 Mon Sep 17 00:00:00 2001 From: yiekheng Date: Sun, 3 May 2026 10:54:11 +0800 Subject: [PATCH] perf(api): pool MySQL connections + drop per-request schema check MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two wins, one root cause: every API request was opening TWO fresh MySQL connections plus four wasted round-trips before the real query. Old per-request shape (GET /acc/): 1. DB() constructor → open conn, SHOW TABLES LIKE 'acc', SHOW TABLES LIKE 'user', close 2. db.query() → open conn, run SELECT, close That's ~4 round-trips for ~10 ms of useful work. With the dashboard's 30 s auto-refresh and two open tabs (accounts + users), the api-server churned through ~10 fresh MySQL connections every minute even when nothing changed. Changes: - app/db.py: introduce a process-wide MySQLConnectionPool (size 8 by default, override with DB_POOL_SIZE). DB() now just touches the cached pool — no schema check, no fresh handshake. query()/execute() rent a connection from the pool and return it via conn.close(). - app/db.py: extract the schema check into verify_tables_once() — runs once at WSGI boot inside create_app() so a misconfigured DB still fails fast at startup. - app/cm_api.py: _close_database_connection() removed; the finally blocks that wrapped every route are gone too. Pool reclamation lives inside DB now. - app/cm_api.py: create_app() and run() invoke verify_tables_once() once at startup instead of CM_API.__init__ doing nothing useful. Net: ~4× round-trip reduction per request, no MySQL handshake on the hot path. With two gunicorn workers × pool_size 8 = 16 max in-flight connections, well under MySQL's default max_connections=151. (The user asked about 'batching the queries' — but the queries already return the full row set in one shot. The bottleneck was connection churn, not query shape. If row count grows past the comfortable single- fetch range later, swap to LIMIT/OFFSET pagination at the API + table component layer.) --- app/cm_api.py | 75 +++++++------------- app/db.py | 187 ++++++++++++++++++++++++++------------------------ 2 files changed, 124 insertions(+), 138 deletions(-) diff --git a/app/cm_api.py b/app/cm_api.py index e0ff4f1..f4a28c3 100644 --- a/app/cm_api.py +++ b/app/cm_api.py @@ -1,7 +1,7 @@ import os import threading from flask import Flask, jsonify, request -from .db import DB +from .db import DB, verify_tables_once def _debug_enabled() -> bool: @@ -27,26 +27,18 @@ class CM_API: self._register_routes() def _get_database_connection(self): - """Create a new database connection for use""" + """Return a DB handle backed by the shared connection pool. + + DB() is now a near-zero-cost handle (it just touches the cached + process-wide pool); each query()/execute() rents a connection + and returns it. There's nothing to clean up explicitly. + """ try: - db = DB() - return db + return DB() except Exception as e: print(f"Database connection failed: {e}") return None - def _close_database_connection(self, db): - """Close database connection if it exists""" - if db is not None: - try: - # Assuming DB class has a close method or similar cleanup - if hasattr(db, 'close'): - db.close() - elif hasattr(db, 'connection') and hasattr(db.connection, 'close'): - db.connection.close() - except Exception as e: - print(f"Error closing database connection: {e}") - def _register_routes(self): # Account routes self.app.route('/acc/', methods=['GET'])(self.get_account) @@ -96,8 +88,6 @@ class CM_API: except Exception as error: return self._handle_error(error, "Not Found"), 404 - finally: - self._close_database_connection(db) def get_user(self, username=None): is_available, db, error_response = self._check_database_available() @@ -117,8 +107,6 @@ class CM_API: except Exception as error: return self._handle_error(error, "Not Found"), 404 - finally: - self._close_database_connection(db) def update_acc_data(self): is_available, db, error_response = self._check_database_available() @@ -147,8 +135,6 @@ class CM_API: except Exception as error: return self._handle_error(error, "Error updating data"), 500 - finally: - self._close_database_connection(db) def update_user_data(self): is_available, db, error_response = self._check_database_available() @@ -177,8 +163,6 @@ class CM_API: except Exception as error: return self._handle_error(error, "Error updating data") - finally: - self._close_database_connection(db) def delete_acc_data(self): is_available, db, error_response = self._check_database_available() @@ -203,8 +187,6 @@ class CM_API: except Exception as error: return self._handle_error(error, "Error deleting account"), 500 - finally: - self._close_database_connection(db) def delete_user_data(self): is_available, db, error_response = self._check_database_available() @@ -229,8 +211,6 @@ class CM_API: except Exception as error: return self._handle_error(error, "Error deleting user"), 500 - finally: - self._close_database_connection(db) def create_acc_data(self): is_available, db, error_response = self._check_database_available() @@ -258,8 +238,6 @@ class CM_API: except Exception as error: return self._handle_error(error, "Error creating account"), 500 - finally: - self._close_database_connection(db) def create_user_data(self): is_available, db, error_response = self._check_database_available() @@ -287,35 +265,31 @@ class CM_API: except Exception as error: return self._handle_error(error, "Error creating user"), 500 - finally: - self._close_database_connection(db) def run(self, port=3000, debug=None): if debug is None: debug = _debug_enabled() - # Test database connection before starting server - test_db = self._get_database_connection() - if test_db is None: - print("Cannot start server: Database not available") + try: + verify_tables_once() + except Exception as e: + print(f"Cannot start server: {e}") exit(1) - self._close_database_connection(test_db) print(f'CM Bot DB API Listening at Port : {port}') self.app.run(host='0.0.0.0', port=port, debug=debug) - + def run_in_thread(self, port=3000, debug=False): """Run the Flask app in a separate thread""" - # Test database connection before starting server - test_db = self._get_database_connection() - if test_db is None: - print("Cannot start server: Database not available") + try: + verify_tables_once() + except Exception as e: + print(f"Cannot start server: {e}") return None - self._close_database_connection(test_db) - + def run_app(): print(f'CM Bot DB API Listening at Port : {port}') self.app.run(host='0.0.0.0', port=port, debug=debug, use_reloader=False) - + thread = threading.Thread(target=run_app, daemon=True) thread.start() return thread @@ -324,12 +298,13 @@ class CM_API: def create_app(): """WSGI factory used by gunicorn (`app.cm_api:create_app()`). - Returns the Flask app object so gunicorn can serve it. The - surrounding CM_API class still owns route registration and DB - connection management — this just hands gunicorn the underlying - Flask instance. + Returns the Flask app object so gunicorn can serve it. Validates the + schema once at boot (so a misconfigured DB fails fast) — request-time + handlers don't repeat the check. """ - return CM_API().app + app = CM_API().app + verify_tables_once() + return app if __name__ == '__main__': diff --git a/app/db.py b/app/db.py index 2f08b3c..8798f9f 100644 --- a/app/db.py +++ b/app/db.py @@ -1,8 +1,9 @@ import os +import threading import time import mysql.connector -from mysql.connector import Error +from mysql.connector import Error, pooling def _get_required_env(name: str) -> str: @@ -12,112 +13,122 @@ def _get_required_env(name: str) -> str: return value -class DB: - def __init__(self): - self.config = { - 'host': _get_required_env('DB_HOST'), - 'user': _get_required_env('DB_USER'), - 'password': _get_required_env('DB_PASSWORD'), - 'database': _get_required_env('DB_NAME'), - 'port': int(_get_required_env('DB_PORT')), - 'connection_timeout': int(_get_required_env('DB_CONNECTION_TIMEOUT')) - } - self.connect_retries = max(1, int(_get_required_env('DB_CONNECT_RETRIES'))) - self.connect_retry_delay = float(_get_required_env('DB_CONNECT_RETRY_DELAY')) - self.init_database() - - def get_connection(self): - """Get MySQL database connection.""" - for attempt in range(1, self.connect_retries + 1): +# Process-wide MySQL connection pool. Gunicorn forks workers; each worker +# gets its own pool (the global is rebuilt per process at first use). +_pool: "pooling.MySQLConnectionPool | None" = None +_pool_lock = threading.Lock() + + +def _build_pool() -> "pooling.MySQLConnectionPool": + config = { + "host": _get_required_env("DB_HOST"), + "user": _get_required_env("DB_USER"), + "password": _get_required_env("DB_PASSWORD"), + "database": _get_required_env("DB_NAME"), + "port": int(_get_required_env("DB_PORT")), + "connection_timeout": int(_get_required_env("DB_CONNECTION_TIMEOUT")), + "pool_name": "cm_pool", + "pool_size": int(os.getenv("DB_POOL_SIZE", "8")), + "pool_reset_session": True, + } + return pooling.MySQLConnectionPool(**config) + + +def _get_pool() -> "pooling.MySQLConnectionPool": + """Lazily build the per-process pool with retry, then memoize.""" + global _pool + if _pool is not None: + return _pool + with _pool_lock: + if _pool is not None: + return _pool + retries = max(1, int(_get_required_env("DB_CONNECT_RETRIES"))) + delay = float(_get_required_env("DB_CONNECT_RETRY_DELAY")) + last_err: "Exception | None" = None + for attempt in range(1, retries + 1): try: - connection = mysql.connector.connect(**self.config) - return connection + _pool = _build_pool() + return _pool except Error as e: - print(f"Error connecting to MySQL: {e}") - if attempt < self.connect_retries: + last_err = e + if attempt < retries: print( - f"Retrying MySQL connection ({attempt}/{self.connect_retries}) " - f"in {self.connect_retry_delay} seconds..." + f"MySQL pool init failed ({e}); " + f"retry {attempt}/{retries} in {delay}s..." ) - time.sleep(self.connect_retry_delay) - return None - - def init_database(self): - """Initialize the database connection.""" - connection = self.get_connection() - if connection is None: - raise Exception("Failed to connect to database") - cursor = None + time.sleep(delay) + raise RuntimeError( + f"Failed to build MySQL pool after {retries} attempts: {last_err}" + ) + + +def verify_tables_once() -> None: + """Run once at app startup to confirm schema is present. + + Previously the DB() constructor ran two SHOW TABLES LIKE queries on + every request — wasted round-trips. Now the check happens once when + create_app() boots the WSGI app; subsequent requests just rent a + connection from the pool. + """ + conn = _get_pool().get_connection() + try: + cursor = conn.cursor() try: - cursor = connection.cursor() - # Test connection by checking if required tables exist cursor.execute("SHOW TABLES LIKE 'acc'") if not cursor.fetchone(): raise Exception("Table 'acc' does not exist") - cursor.execute("SHOW TABLES LIKE 'user'") if not cursor.fetchone(): raise Exception("Table 'user' does not exist") - - # print("Database connection verified - required tables exist") - - except Error as e: - print(f"Error verifying database: {e}") - raise Exception(f"Database verification failed: {e}") finally: - if cursor is not None: - cursor.close() - if connection.is_connected(): - connection.close() - - def query(self, query, params=None): - """Execute a query and return results.""" - connection = self.get_connection() - if connection is None: - return [] - cursor = None + cursor.close() + finally: + conn.close() + + +class DB: + """Thin handle backed by the process-wide MySQL pool. + + Constructing DB() is now ~free — it just touches the (cached) pool. + Each query()/execute() rents a connection from the pool and returns + it on completion via conn.close() (which the pool intercepts and + releases the connection back instead of actually closing it). + + Pool size caps in-flight queries per worker; tune with DB_POOL_SIZE + (default 8). Two gunicorn workers × pool_size 8 = 16 max + connections — comfortably under MySQL's default max_connections. + """ + + def __init__(self): + _get_pool() + + def query(self, sql, params=None): + conn = _get_pool().get_connection() try: - cursor = connection.cursor(dictionary=True) - - if params: - cursor.execute(query, params) - else: - cursor.execute(query) - - results = cursor.fetchall() - return results - + cursor = conn.cursor(dictionary=True) + try: + cursor.execute(sql, params or ()) + return cursor.fetchall() + finally: + cursor.close() except Error as e: print(f"Error executing query: {e}") return [] finally: - if cursor is not None: - cursor.close() - if connection.is_connected(): - connection.close() - - def execute(self, query, params=None): - """Execute a query that modifies data (INSERT, UPDATE, DELETE) and return success status.""" - connection = self.get_connection() - if connection is None: - return False - cursor = None + conn.close() + + def execute(self, sql, params=None): + conn = _get_pool().get_connection() try: - cursor = connection.cursor() - - if params: - cursor.execute(query, params) - else: - cursor.execute(query) - - connection.commit() - return True - + cursor = conn.cursor() + try: + cursor.execute(sql, params or ()) + conn.commit() + return True + finally: + cursor.close() except Error as e: print(f"Error executing query: {e}") return False finally: - if cursor is not None: - cursor.close() - if connection.is_connected(): - connection.close() + conn.close()