The previous default of 8 was a regression risk. cm_transfer_credit.py uses ThreadPoolExecutor with CM_TRANSFER_MAX_THREADS (default 20 in prod compose), so up to 20 threads concurrently call self.db.query(). With pool_size=8, the 9th–20th concurrent threads would hit PoolError ("pool exhausted") when renting a connection, making transfers fail with no obvious cause. get_connection() runs before the try block in query()/execute(), so that error propagates to the caller rather than being turned into []/False. The default is bumped to 24, which covers the 20-thread default with 4 in reserve. mysql.connector caps pool_size at 32, so larger values are clamped with a clear log line. A future operator who pushes CM_TRANSFER_MAX_THREADS too high then gets a readable message instead of a library traceback. Operator note: if you raise CM_TRANSFER_MAX_THREADS, also raise DB_POOL_SIZE to at least the same value (max 32). At 32 threads, 4 services × 32 = 128 connections total, still well under MySQL's default max_connections=151. This assumes one worker process per service; each gunicorn worker has its own pool, so multiply by the worker count otherwise.
148 lines
5.1 KiB
Python
148 lines
5.1 KiB
Python
import os
|
||
import threading
|
||
import time
|
||
|
||
import mysql.connector
|
||
from mysql.connector import Error, pooling
|
||
|
||
|
||
def _get_required_env(name: str) -> str:
    """Look up a mandatory environment variable.

    Args:
        name: Name of the environment variable to read.

    Returns:
        The variable's value, unmodified (surrounding whitespace is kept).

    Raises:
        RuntimeError: The variable is not set, or is set to the empty string.
    """
    found = os.environ.get(name)
    # `found` is either None (unset) or a str; both None and "" are falsy,
    # so a single truthiness test covers "missing" and "blank".
    if not found:
        raise RuntimeError(f"Missing required environment variable: {name}")
    return found
|
||
|
||
|
||
# Process-wide MySQL connection pool, created lazily by _get_pool() on first
# use and memoized here. Gunicorn forks workers; each worker ends up with its
# own pool only because nothing builds it at import time. Nothing in this
# module resets it after a fork.
# NOTE(review): if the app is preloaded in the gunicorn master (e.g.
# --preload) and verify_tables_once() runs there during create_app(), the pool
# would be built before the fork and its open connections inherited by every
# worker. Confirm the deployment does not preload.
_pool: "pooling.MySQLConnectionPool | None" = None

# Serializes first-time pool construction in _get_pool() across threads.
_pool_lock = threading.Lock()
|
||
|
||
|
||
def _build_pool() -> "pooling.MySQLConnectionPool":
    """Build the MySQL connection pool from environment configuration.

    Required variables: DB_HOST, DB_USER, DB_PASSWORD, DB_NAME, DB_PORT,
    DB_CONNECTION_TIMEOUT.

    Optional: DB_POOL_SIZE.
    - Default is 24; an unset or blank value means the default.
    - Values above mysql.connector's limit of 32 are clamped to 32, with a
      printed message.

    Returns:
        A new ``pooling.MySQLConnectionPool`` named "cm_pool".

    Raises:
        ValueError: DB_POOL_SIZE is not an integer or is below 1, or
            DB_PORT / DB_CONNECTION_TIMEOUT is not an integer.
        RuntimeError: A required variable is missing or empty.
        mysql.connector.Error: Raised by the pool constructor (e.g. the
            server is unreachable); _get_pool() retries on these.
    """
    # The default of 24 covers transfer-bot at full tilt. CM_TRANSFER_MAX_THREADS
    # defaults to 20, and each thread can hold one connection for the duration
    # of a transfer step.
    #
    # A smaller pool surfaces as PoolError ("pool exhausted") from
    # get_connection() in DB.query()/DB.execute(). That call sits outside their
    # try blocks, so the error reaches the caller rather than becoming
    # []/False, and transfer steps fail without obvious cause.
    #
    # If you bump CM_TRANSFER_MAX_THREADS, set DB_POOL_SIZE to at least the
    # same value, capped at 32.
    default_pool_size = 24
    max_pool_size = 32  # hard cap enforced by mysql.connector

    raw_size = os.getenv("DB_POOL_SIZE", "").strip()
    if raw_size:
        try:
            pool_size = int(raw_size)
        except ValueError:
            raise ValueError(
                f"DB_POOL_SIZE must be an integer, got {raw_size!r}"
            ) from None
    else:
        # Treat a blank value like an unset one (as _get_required_env does)
        # instead of crashing in int("").
        pool_size = default_pool_size

    if pool_size < 1:
        # mysql.connector rejects non-positive sizes itself, but with a
        # library message that does not name DB_POOL_SIZE.
        raise ValueError(f"DB_POOL_SIZE must be at least 1, got {pool_size}")
    if pool_size > max_pool_size:
        # Clamp rather than fail: 32 is the closest usable value. The message
        # tells the operator why their setting was not honored.
        print(
            f"DB_POOL_SIZE={pool_size} exceeds mysql.connector max "
            f"({max_pool_size}); clamping to {max_pool_size}"
        )
        pool_size = max_pool_size

    config = {
        "host": _get_required_env("DB_HOST"),
        "user": _get_required_env("DB_USER"),
        "password": _get_required_env("DB_PASSWORD"),
        "database": _get_required_env("DB_NAME"),
        "port": int(_get_required_env("DB_PORT")),
        "connection_timeout": int(_get_required_env("DB_CONNECTION_TIMEOUT")),
        "pool_name": "cm_pool",
        "pool_size": pool_size,
        "pool_reset_session": True,
    }
    return pooling.MySQLConnectionPool(**config)
|
||
|
||
|
||
def _get_pool() -> "pooling.MySQLConnectionPool":
    """Return this process's MySQL pool, building it on first use.

    The unlocked check is a fast path once the pool exists. First-time
    construction happens under _pool_lock, and the check is repeated there so
    concurrent first callers build only one pool.

    Construction is attempted DB_CONNECT_RETRIES times (at least once),
    sleeping DB_CONNECT_RETRY_DELAY seconds between attempts. Only
    mysql.connector ``Error``s are retried; configuration errors raised by
    _build_pool() propagate immediately.

    Returns:
        The memoized ``pooling.MySQLConnectionPool``.

    Raises:
        RuntimeError: Every attempt failed, with the last MySQL error chained
            as ``__cause__``. Also raised when a required environment
            variable is missing.
        ValueError: A retry setting or pool setting is malformed.
    """
    global _pool
    if _pool is not None:
        return _pool
    with _pool_lock:
        # Another thread may have finished building it while we waited.
        if _pool is not None:
            return _pool
        retries = max(1, int(_get_required_env("DB_CONNECT_RETRIES")))
        # A negative delay would make time.sleep() raise ValueError midway
        # through the retry loop; treat it as "retry immediately".
        delay = max(0.0, float(_get_required_env("DB_CONNECT_RETRY_DELAY")))
        last_err: "Exception | None" = None
        for attempt in range(1, retries + 1):
            try:
                _pool = _build_pool()
                return _pool
            except Error as e:
                last_err = e
                if attempt < retries:
                    print(
                        f"MySQL pool init failed ({e}); "
                        f"retry {attempt}/{retries} in {delay}s..."
                    )
                    time.sleep(delay)
        # This raise is outside the except block, so without an explicit
        # `from` the original MySQL traceback would be lost.
        raise RuntimeError(
            f"Failed to build MySQL pool after {retries} attempts: {last_err}"
        ) from last_err
|
||
|
||
|
||
def verify_tables_once() -> None:
    """Confirm the required tables ('acc' and 'user') exist.

    Meant to run once at app startup. Previously the DB() constructor ran
    these SHOW TABLES LIKE queries on every request, which wasted round-trips.
    Now the check happens once when create_app() boots the WSGI app, and
    subsequent requests just rent a connection from the pool.

    Raises:
        RuntimeError: A required table is missing (message
            "Table '<name>' does not exist"), or the pool could not be built.
            RuntimeError subclasses Exception, so existing
            ``except Exception`` handlers still catch it.
        mysql.connector.Error: The check queries themselves failed.
    """
    conn = _get_pool().get_connection()
    try:
        cursor = conn.cursor()
        try:
            for table in ("acc", "user"):
                # Parameterized: mysql.connector quotes the value client-side,
                # producing e.g. SHOW TABLES LIKE 'acc'.
                cursor.execute("SHOW TABLES LIKE %s", (table,))
                if not cursor.fetchone():
                    raise RuntimeError(f"Table '{table}' does not exist")
        finally:
            cursor.close()
    finally:
        # Pooled connection: close() hands it back to the pool.
        conn.close()
|
||
|
||
|
||
class DB:
    """Thin handle backed by the process-wide MySQL pool.

    Constructing DB() is cheap once the pool exists: it only makes sure the
    per-process pool has been built. The first call may block while
    _get_pool() builds it, retrying per DB_CONNECT_RETRIES.

    Each query()/execute() rents a connection from the pool and returns it on
    completion via conn.close(). The pool intercepts that call and releases
    the connection back instead of actually closing it.

    Pool size caps in-flight queries per process. Tune it with DB_POOL_SIZE
    (default 24, clamped to mysql.connector's maximum of 32). Every process
    that uses DB has its own pool, so the server sees roughly
    (worker processes across all services) × DB_POOL_SIZE connections. Keep
    that under MySQL's max_connections (151 by default).

    Error handling:
    - A mysql.connector Error raised while running the SQL is printed and
      reported as [] (query) or False (execute).
    - Acquiring the connection happens before that try block, so pool
      failures propagate to the caller (e.g. PoolError when every connection
      is already rented out).
    """

    def __init__(self):
        # Build (or reuse) the pool now, so configuration and connection
        # problems surface when the handle is created, not on the first query.
        _get_pool()

    def query(self, sql, params=None):
        """Execute *sql* and return every result row as a dict.

        Args:
            sql: Statement text. Placeholders follow mysql.connector's
                paramstyle (e.g. ``%s``).
            params: Values bound to the placeholders; None means none.

        Returns:
            A list of dict rows keyed by column name, or [] if a
            mysql.connector Error occurs (the error is printed).
        """
        conn = _get_pool().get_connection()
        try:
            cursor = conn.cursor(dictionary=True)
            try:
                cursor.execute(sql, params or ())
                return cursor.fetchall()
            finally:
                cursor.close()
        except Error as e:
            print(f"Error executing query: {e}")
            return []
        finally:
            # Returns the connection to the pool rather than closing it.
            conn.close()

    def execute(self, sql, params=None):
        """Execute a write statement and commit it.

        Args:
            sql: Statement text. Placeholders follow mysql.connector's
                paramstyle (e.g. ``%s``).
            params: Values bound to the placeholders; None means none.

        Returns:
            True once the statement has been committed. False if a
            mysql.connector Error occurs during execute or commit; the error
            is printed and this call does not commit the statement.
        """
        conn = _get_pool().get_connection()
        try:
            cursor = conn.cursor()
            try:
                cursor.execute(sql, params or ())
                conn.commit()
                return True
            finally:
                cursor.close()
        except Error as e:
            print(f"Error executing query: {e}")
            return False
        finally:
            # Returns the connection to the pool rather than closing it.
            conn.close()
|