cm_bot_v2/cm_telegram.py
2025-10-05 18:39:47 +08:00

109 lines
4.1 KiB
Python

import threading, logging, time
from telegram import ForceReply, Update
from telegram.ext import Application, CommandHandler, ContextTypes, MessageHandler, filters
from cm_bot_hal import CM_BOT_HAL
logging.basicConfig(
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", level=logging.INFO
)
logging.getLogger("httpx").setLevel(logging.WARNING)
logger = logging.getLogger(__name__)
creating_acc_now = False
async def menu_cmd_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
menu = [
'MENU',
'/1 - Get Acc',
'/2 <link> - Set Security Pin',
'/3 <agent username> <agent password> <player username> <player password>'
]
await update.message.reply_text('\n'.join(menu))
async def get_acc_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
while creating_acc_now == True:
await update.message.reply_text('CM account creation is running, queuing ...')
time.sleep(60)
creating_acc_now = True
await update.message.reply_text('Start Getting CM Account ...')
try:
bot = CM_BOT_HAL()
user = bot.get_user_api()
msg = [
f'Username: {user["username"]}',
f'Password: {user["password"]}',
f'Link: {user["link"]}'
]
await update.message.reply_text('\n'.join(msg))
except Exception as e:
await update.message.reply_text(f'Error: {e}')
finally:
creating_acc_now = False
async def set_security_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
if len(context.args) == 0 or len(context.args) > 1:
await update.message.reply_text('CMD is wrong, please check and retry ...')
return
bot = CM_BOT_HAL()
if bot.is_whatsapp_url(context.args[0]) == False:
await update.message.reply_text('Link Format Wrong, please check and retry ...')
return
await update.message.reply_text('Start Setting Security Pin ...')
try:
result = bot.set_security_pin_api(context.args[0])
del bot
await update.message.reply_text(f"Done setting Security Pin for {result['f_username']} - {result['t_username']} !")
except Exception as e:
await update.message.reply_text(f'Error: {e}')
async def insert_to_user_table_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
if len(context.args) == 0 or len(context.args) != 4:
await update.message.reply_text('CMD is wrong, please check and retry ...')
return
bot = CM_BOT_HAL()
f_username, f_password, t_username, t_password = context.args
bot.insert_user_to_table_user(
{
'f_username': f_username,
'f_password': f_password,
't_username': t_username,
't_password': t_password
}
)
await update.message.reply_text(f'Done insert {f_username} into user table.')
def monitor_amount_of_available_acc():
max_available = 20
while True:
bot = CM_BOT_HAL()
available_size = len(bot.get_all_available_acc())
print(available_size)
if available_size <= max_available:
for i in range(available_size, max_available):
bot.create_new_acc()
time.sleep(10 * 60)
del bot
def main() -> None:
"""Start the bot."""
# application = Application.builder().token("5327571437:AAFlowwnAysTEMx6LtYQNTevGCboKDZoYzY").build()
application = Application.builder().token("5315819168:AAH31xwNgPdnk123x97XalmTW6fQV5EUCFU").build()
application.add_handler(CommandHandler("menu", menu_cmd_handler))
application.add_handler(CommandHandler("1", get_acc_handler))
application.add_handler(CommandHandler("2", set_security_handler))
application.add_handler(CommandHandler("3", insert_to_user_table_handler))
# application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, echo))
# Start the Telegram bot
print("Starting Telegram bot...")
threading.Thread(target=monitor_amount_of_available_acc, args=()).start()
application.run_polling(allowed_updates=Update.ALL_TYPES)
if __name__ == "__main__":
main()