import threading from flask import Flask, jsonify, request from flask_cors import CORS from .db import DB class CM_API: def __init__(self): self.app = Flask(__name__) CORS(self.app) self._register_routes() def _get_database_connection(self): """Create a new database connection for use""" try: db = DB() return db except Exception as e: print(f"Database connection failed: {e}") return None def _close_database_connection(self, db): """Close database connection if it exists""" if db is not None: try: # Assuming DB class has a close method or similar cleanup if hasattr(db, 'close'): db.close() elif hasattr(db, 'connection') and hasattr(db.connection, 'close'): db.connection.close() except Exception as e: print(f"Error closing database connection: {e}") def _register_routes(self): # Account routes self.app.route('/acc/', methods=['GET'])(self.get_account) self.app.route('/acc/', methods=['GET'])(self.get_account) # User routes self.app.route('/user/', methods=['GET'])(self.get_user) self.app.route('/user/', methods=['GET'])(self.get_user) # Update routes self.app.route('/update-acc-data', methods=['POST'])(self.update_acc_data) self.app.route('/update-user-data', methods=['POST'])(self.update_user_data) def _check_database_available(self): db = self._get_database_connection() if db is None: return False, None, ("Database not available", 500) return True, db, None def _handle_error(self, error, message="An error occurred"): print(f"Error: {error}") return message, 500 def get_account(self, username=None): is_available, db, error_response = self._check_database_available() if not is_available: return error_response try: if username: query = "SELECT username, password, status, link FROM acc WHERE username = %s" query_params = [username] else: query = "SELECT username, password, status, link FROM acc" query_params = [] results = db.query(query, query_params) return jsonify(results) except Exception as error: return self._handle_error(error, "Not Found"), 404 finally: self._close_database_connection(db) def get_user(self, username=None): is_available, db, error_response = self._check_database_available() if not is_available: return error_response try: if username: query = "SELECT f_username, f_password, t_username, t_password FROM user WHERE f_username = %s" query_params = [username] else: query = "SELECT f_username, f_password, t_username, t_password, last_update_time FROM user" query_params = [] results = db.query(query, query_params) return jsonify(results) except Exception as error: return self._handle_error(error, "Not Found"), 404 finally: self._close_database_connection(db) def update_acc_data(self): is_available, db, error_response = self._check_database_available() if not is_available: return error_response try: data = request.get_json() username = data.get('username') password = data.get('password') status = data.get('status') link = data.get('link') if not username: return jsonify({"error": "Username is required"}) result = db.execute( "UPDATE acc SET password = %s, status = %s, link = %s WHERE username = %s", [password, status, link, username] ) if result: return jsonify("Data updated successfully") else: return jsonify("Error updating data") except Exception as error: return self._handle_error(error, "Error updating data"), 500 finally: self._close_database_connection(db) def update_user_data(self): is_available, db, error_response = self._check_database_available() if not is_available: return error_response try: data = request.get_json() f_username = data.get('f_username') f_password = data.get('f_password') t_username = data.get('t_username') t_password = data.get('t_password') if not f_username: return jsonify({"error": "f_username is required"}) result = db.execute( "UPDATE user SET f_password = %s, t_password = %s, t_username = %s, last_update_time = CURRENT_TIMESTAMP WHERE f_username = %s", [f_password, t_password, t_username, f_username] ) if result: return jsonify("Data updated successfully") else: return jsonify("Error updating data") except Exception as error: return self._handle_error(error, "Error updating data") finally: self._close_database_connection(db) def run(self, port=3000, debug=True): # Test database connection before starting server test_db = self._get_database_connection() if test_db is None: print("Cannot start server: Database not available") exit(1) self._close_database_connection(test_db) print(f'CM Bot DB API Listening at Port : {port}') self.app.run(host='0.0.0.0', port=port, debug=debug) def run_in_thread(self, port=3000, debug=False): """Run the Flask app in a separate thread""" # Test database connection before starting server test_db = self._get_database_connection() if test_db is None: print("Cannot start server: Database not available") return None self._close_database_connection(test_db) def run_app(): print(f'CM Bot DB API Listening at Port : {port}') self.app.run(host='0.0.0.0', port=port, debug=debug, use_reloader=False) thread = threading.Thread(target=run_app, daemon=True) thread.start() return thread if __name__ == '__main__': api = CM_API() api.run(port = 3000)