- move Python sources into app package and switch services to module entrypoints - relocate Dockerfiles under docker/, add buildx publish script, override compose for local builds - configure images to pull from gitea.04080616.xyz/yiekheng with env-driven tags and limits - harden installs and transfer worker logging/concurrency for cleaner container output
192 lines
6.8 KiB
Python
192 lines
6.8 KiB
Python
import threading
|
|
from flask import Flask, jsonify, request
|
|
from flask_cors import CORS
|
|
from .db import DB
|
|
|
|
|
|
class CM_API:
    """Flask HTTP API exposing the ``acc`` and ``user`` tables.

    Every request handler opens its own ``DB`` connection and closes it in a
    ``finally`` clause, so no connection object is shared between requests.
    """

    def __init__(self):
        """Create the Flask app, enable CORS on it and register all routes."""
        self.app = Flask(__name__)
        CORS(self.app)
        self._register_routes()

    # ------------------------------------------------------------------
    # Connection helpers
    # ------------------------------------------------------------------

    def _get_database_connection(self):
        """Create a new database connection.

        Returns:
            A new ``DB`` instance, or ``None`` when constructing it raised.
        """
        try:
            return DB()
        except Exception as e:
            print(f"Database connection failed: {e}")
            return None

    def _close_database_connection(self, db):
        """Close ``db`` if it is not ``None``; closing errors are only printed.

        Supports objects exposing ``close()`` directly as well as objects that
        keep the closable handle on a ``connection`` attribute.
        """
        if db is None:
            return
        try:
            if hasattr(db, 'close'):
                db.close()
            elif hasattr(db, 'connection') and hasattr(db.connection, 'close'):
                db.connection.close()
        except Exception as e:
            print(f"Error closing database connection: {e}")

    def _check_database_available(self):
        """Open a connection and report whether it succeeded.

        Returns:
            ``(True, db, None)`` on success, otherwise
            ``(False, None, ("Database not available", 500))`` where the last
            element can be returned from a view as-is.
        """
        db = self._get_database_connection()
        if db is None:
            return False, None, ("Database not available", 500)
        return True, db, None

    def _database_reachable(self):
        """Return True when a throw-away test connection can be opened."""
        test_db = self._get_database_connection()
        if test_db is None:
            return False
        self._close_database_connection(test_db)
        return True

    # ------------------------------------------------------------------
    # Routing and shared request plumbing
    # ------------------------------------------------------------------

    def _register_routes(self):
        """Bind every URL rule to its handler method."""
        routes = (
            # Account routes
            ('/acc/<username>', ['GET'], self.get_account),
            ('/acc/', ['GET'], self.get_account),
            # User routes
            ('/user/<username>', ['GET'], self.get_user),
            ('/user/', ['GET'], self.get_user),
            # Update routes
            ('/update-acc-data', ['POST'], self.update_acc_data),
            ('/update-user-data', ['POST'], self.update_user_data),
        )
        for rule, methods, handler in routes:
            self.app.route(rule, methods=methods)(handler)

    def _handle_error(self, error, message="An error occurred", status=500):
        """Print ``error`` and build a flat ``(message, status)`` response.

        The tuple must stay flat: wrapping it in another tuple (for example
        ``self._handle_error(...), 404``) yields a value Flask cannot convert
        into a response.
        """
        print(f"Error: {error}")
        return message, status

    def _run_select(self, query, query_params):
        """Run a read query on a fresh connection and return its JSON response.

        Returns the "Database not available" response when no connection can
        be opened, and ``("Not Found", 404)`` when the query or serialization
        raises.
        """
        is_available, db, error_response = self._check_database_available()
        if not is_available:
            return error_response

        try:
            results = db.query(query, query_params)
            return jsonify(results)
        except Exception as error:
            return self._handle_error(error, "Not Found", 404)
        finally:
            self._close_database_connection(db)

    def _run_update(self, key_field, missing_key_message, statement, param_fields):
        """Validate the JSON body and run an UPDATE statement with it.

        Args:
            key_field: Name of the JSON field that must be present and truthy.
            missing_key_message: Error text returned when ``key_field`` is missing.
            statement: SQL statement with ``%s`` placeholders.
            param_fields: JSON field names, in placeholder order.
        """
        is_available, db, error_response = self._check_database_available()
        if not is_available:
            return error_response

        try:
            # silent=True yields None instead of raising on a missing or
            # malformed JSON body, so bad input is reported as 400, not 500.
            data = request.get_json(silent=True)
            if not isinstance(data, dict):
                return jsonify({"error": "Request body must be a JSON object"}), 400
            if not data.get(key_field):
                return jsonify({"error": missing_key_message}), 400

            params = [data.get(field) for field in param_fields]
            result = db.execute(statement, params)

            if result:
                return jsonify("Data updated successfully")
            # NOTE(review): a falsy result is reported with HTTP 200, as before.
            # Whether it means "no row matched" or a failure depends on
            # DB.execute, which is not visible here - confirm before changing.
            return jsonify("Error updating data")
        except Exception as error:
            return self._handle_error(error, "Error updating data", 500)
        finally:
            self._close_database_connection(db)

    # ------------------------------------------------------------------
    # Route handlers
    # ------------------------------------------------------------------

    def get_account(self, username=None):
        """Return rows from ``acc``: the one matching ``username``, or all rows."""
        if username:
            query = "SELECT username, password, status, link FROM acc WHERE username = %s"
            query_params = [username]
        else:
            query = "SELECT username, password, status, link FROM acc"
            query_params = []
        return self._run_select(query, query_params)

    def get_user(self, username=None):
        """Return rows from ``user``: those matching ``f_username``, or all rows.

        NOTE(review): only the list-all query selects ``last_update_time``;
        the single-user query omits it. Kept as-is - confirm this is intended.
        """
        if username:
            query = "SELECT f_username, f_password, t_username, t_password FROM user WHERE f_username = %s"
            query_params = [username]
        else:
            query = "SELECT f_username, f_password, t_username, t_password, last_update_time FROM user"
            query_params = []
        return self._run_select(query, query_params)

    def update_acc_data(self):
        """Update ``password``, ``status`` and ``link`` of one ``acc`` row.

        Expects a JSON object body; ``username`` is required and selects the row.
        """
        return self._run_update(
            'username',
            "Username is required",
            "UPDATE acc SET password = %s, status = %s, link = %s WHERE username = %s",
            ['password', 'status', 'link', 'username'],
        )

    def update_user_data(self):
        """Update credentials of one ``user`` row and refresh ``last_update_time``.

        Expects a JSON object body; ``f_username`` is required and selects the row.
        """
        return self._run_update(
            'f_username',
            "f_username is required",
            "UPDATE user SET f_password = %s, t_password = %s, t_username = %s, last_update_time = CURRENT_TIMESTAMP WHERE f_username = %s",
            ['f_password', 't_password', 't_username', 'f_username'],
        )

    # ------------------------------------------------------------------
    # Server start-up
    # ------------------------------------------------------------------

    def run(self, port=3000, debug=True):
        """Serve the API in the calling thread after a database smoke test.

        Raises:
            SystemExit: with code 1 when the database cannot be reached.

        NOTE(review): the default ``debug=True`` combined with host
        ``0.0.0.0`` exposes the interactive debugger on every interface;
        pass ``debug=False`` anywhere outside local development.
        """
        # Test database connection before starting server
        if not self._database_reachable():
            print("Cannot start server: Database not available")
            # SystemExit instead of the site-provided exit(), which is not
            # guaranteed to exist (e.g. ``python -S`` or frozen builds).
            raise SystemExit(1)

        print(f'CM Bot DB API Listening at Port : {port}')
        self.app.run(host='0.0.0.0', port=port, debug=debug)

    def run_in_thread(self, port=3000, debug=False):
        """Run the Flask app in a separate daemon thread.

        Returns:
            The started ``threading.Thread``, or ``None`` when the database
            cannot be reached.
        """
        # Test database connection before starting server
        if not self._database_reachable():
            print("Cannot start server: Database not available")
            return None

        def run_app():
            print(f'CM Bot DB API Listening at Port : {port}')
            # The reloader is disabled because the server is started
            # outside the main thread.
            self.app.run(host='0.0.0.0', port=port, debug=debug, use_reloader=False)

        thread = threading.Thread(target=run_app, daemon=True)
        thread.start()
        return thread
|
|
|
|
|
|
if __name__ == '__main__':
    # Script entry point: build the API and serve it on port 3000.
    server = CM_API()
    server.run(port=3000)
|