import os
import time

import mysql.connector
from mysql.connector import Error


def _get_required_env(name: str) -> str:
    """Return the value of the environment variable *name*.

    Raises:
        RuntimeError: If the variable is unset or set to an empty string.
    """
    value = os.getenv(name)
    if not value:
        raise RuntimeError(f"Missing required environment variable: {name}")
    return value


def _get_required_number(name: str, cast):
    """Return the required environment variable *name* converted with *cast*.

    Args:
        name: Name of the environment variable to read.
        cast: Numeric constructor to apply to the raw string (``int`` or
            ``float``).

    Raises:
        RuntimeError: If the variable is unset or empty.
        ValueError: If the value cannot be converted; the message names the
            offending variable so a misconfiguration is easy to locate.
    """
    raw = _get_required_env(name)
    try:
        return cast(raw)
    except ValueError as err:
        raise ValueError(
            f"Environment variable {name} must be a valid {cast.__name__}, "
            f"got {raw!r}"
        ) from err


class DB:
    """Small MySQL helper that opens a fresh connection for every operation.

    Connection settings and retry behaviour are read from environment
    variables when the object is constructed.
    """

    # Tables that must exist for the application to start.
    _REQUIRED_TABLES = ("acc", "user")

    def __init__(self):
        """Read the configuration from the environment and verify the database.

        Raises:
            RuntimeError: If a required environment variable is missing.
            ValueError: If a numeric environment variable is not a number.
            Exception: If the database cannot be reached or a required table
                is missing (raised by ``init_database``).
        """
        self.config = {
            'host': _get_required_env('DB_HOST'),
            'user': _get_required_env('DB_USER'),
            'password': _get_required_env('DB_PASSWORD'),
            'database': _get_required_env('DB_NAME'),
            'port': _get_required_number('DB_PORT', int),
            'connection_timeout': _get_required_number('DB_CONNECTION_TIMEOUT', int),
        }
        # Always make at least one connection attempt.
        self.connect_retries = max(1, _get_required_number('DB_CONNECT_RETRIES', int))
        # time.sleep() rejects negative values; clamp so a misconfigured delay
        # cannot abort the retry loop with an unrelated ValueError.
        self.connect_retry_delay = max(
            0.0, _get_required_number('DB_CONNECT_RETRY_DELAY', float)
        )
        self.init_database()

    def get_connection(self):
        """Get MySQL database connection.

        Tries up to ``self.connect_retries`` times, sleeping
        ``self.connect_retry_delay`` seconds between attempts.

        Returns:
            The open connection, or ``None`` if every attempt failed.
        """
        for attempt in range(1, self.connect_retries + 1):
            try:
                return mysql.connector.connect(**self.config)
            except Error as e:
                print(f"Error connecting to MySQL: {e}")
                # Only wait when another attempt will follow.
                if attempt < self.connect_retries:
                    print(
                        f"Retrying MySQL connection ({attempt}/{self.connect_retries}) "
                        f"in {self.connect_retry_delay} seconds..."
                    )
                    time.sleep(self.connect_retry_delay)
        return None

    @staticmethod
    def _release(connection, cursor):
        """Close *cursor* (when one was created) and then *connection*.

        The connection is closed from a ``finally`` clause so that an error
        raised while closing the cursor cannot leak the connection.
        """
        try:
            if cursor is not None:
                cursor.close()
        finally:
            if connection.is_connected():
                connection.close()

    def init_database(self):
        """Initialize the database connection.

        Connects once and checks that every table in ``_REQUIRED_TABLES``
        exists.

        Raises:
            Exception: If no connection could be established, a required
                table is missing, or the verification queries fail.
        """
        connection = self.get_connection()
        if connection is None:
            raise Exception("Failed to connect to database")

        cursor = None
        try:
            cursor = connection.cursor()
            # Table names come from a class-level constant, never from user
            # input, so interpolating them into the statement is safe.
            for table in self._REQUIRED_TABLES:
                cursor.execute(f"SHOW TABLES LIKE '{table}'")
                if not cursor.fetchone():
                    raise Exception(f"Table '{table}' does not exist")
        except Error as e:
            print(f"Error verifying database: {e}")
            raise Exception(f"Database verification failed: {e}") from e
        finally:
            self._release(connection, cursor)

    def query(self, query, params=None):
        """Execute a query and return results.

        Args:
            query: SQL statement to run.
            params: Optional parameters bound to the statement's placeholders.

        Returns:
            A list of rows as dictionaries. An empty list is returned when the
            connection cannot be established or the statement fails.
        """
        connection = self.get_connection()
        if connection is None:
            return []

        cursor = None
        try:
            cursor = connection.cursor(dictionary=True)
            if params:
                cursor.execute(query, params)
            else:
                cursor.execute(query)
            return cursor.fetchall()
        except Error as e:
            print(f"Error executing query: {e}")
            return []
        finally:
            self._release(connection, cursor)

    def execute(self, query, params=None):
        """Execute a query that modifies data (INSERT, UPDATE, DELETE).

        Args:
            query: SQL statement to run.
            params: Optional parameters bound to the statement's placeholders.

        Returns:
            ``True`` when the statement ran and was committed, ``False`` when
            the connection could not be established or the statement failed.
        """
        connection = self.get_connection()
        if connection is None:
            return False

        cursor = None
        try:
            cursor = connection.cursor()
            if params:
                cursor.execute(query, params)
            else:
                cursor.execute(query)
            connection.commit()
            return True
        except Error as e:
            print(f"Error executing query: {e}")
            return False
        finally:
            self._release(connection, cursor)