commit fe326b2cd58b29ff716c5e10afab82ca91052d7e Author: Alexander Date: Sat Aug 10 20:15:53 2024 -0400 init diff --git a/README.md b/README.md new file mode 100644 index 0000000..cbab95a --- /dev/null +++ b/README.md @@ -0,0 +1,22 @@ +# affection-captcha-bot + +Users are auto muted by default. They can unmute themself if they successfully click the correct 4 numbers. The number pad is randomized each time. Difficulty, attempts, and time to expiration can be set in the env file. + +## Getting Started + +### Bare Metal + +- Clone the repo +- Open the `app` folder +- Rename `sample.env` to `.env` +- Enter your Telegram bot token from @botfather +- Run by typing `python main.py` + +### Docker + +- Run by typing `docker compose up -d` in the repo's root folder + +### Telegram + +- Add the bot to your supergroup as admin +- The bot only works for supergroups (public) \ No newline at end of file diff --git a/app/Dockerfile b/app/Dockerfile new file mode 100644 index 0000000..ab8707f --- /dev/null +++ b/app/Dockerfile @@ -0,0 +1,8 @@ +FROM python:3.12 +RUN apt update +RUN apt upgrade -y +RUN apt install -y build-essential libtiff5-dev libjpeg62-turbo-dev zlib1g-dev libfreetype6-dev liblcms2-dev libwebp-dev tcl8.6-dev tk8.6-dev python3-tk +WORKDIR /app +COPY . . +RUN pip install -r requirements.txt +CMD ["sh", "-c", "python main.py"] diff --git a/app/custom.py b/app/custom.py new file mode 100644 index 0000000..8a9f592 --- /dev/null +++ b/app/custom.py @@ -0,0 +1,149 @@ +import json +import os +import random + +import httpx +from multicolorcaptcha import CaptchaGenerator +from telegram import InlineKeyboardButton, InlineKeyboardMarkup, InputMediaPhoto +from telegram.error import BadRequest +from telegram.ext import ContextTypes + + +def get_name_from_user(user): + return "@{}".format(user.username) if hasattr(user, 'username') else "{} {}".format(user.first_name, user.last_name) + + +async def delete_message(context: ContextTypes.DEFAULT_TYPE) -> None: + job = context.job + try: + await context.bot.delete_message(chat_id=job.data['chat_id'], message_id=job.data['message_id']) + except BadRequest: + pass + + +def generate_numpad(pressed=None): + if not pressed: + pressed = [] + numbers = list(range(0, 10)) + random.shuffle(numbers) + keyboard = [ + [InlineKeyboardButton("*{}*".format(str(n)) if n in pressed else str(n), callback_data="key_{}".format(str(n))) for n in numbers[0:3]], + [InlineKeyboardButton("*{}*".format(str(n)) if n in pressed else str(n), callback_data="key_{}".format(str(n))) for n in numbers[3:6]], + [InlineKeyboardButton("*{}*".format(str(n)) if n in pressed else str(n), callback_data="key_{}".format(str(n))) for n in numbers[6:9]], + [InlineKeyboardButton("🫣", callback_data="restart"), InlineKeyboardButton("*{}*".format(str(numbers[9])) if numbers[9] in pressed else str(numbers[9]), callback_data="key_{}".format(str(numbers[9]))), InlineKeyboardButton("😵‍💫", callback_data="regenerate")] + ] + reply_markup = InlineKeyboardMarkup(keyboard) + return reply_markup + + +async def solve_captcha(update, context, user_data): + text = os.getenv('CAPTCHA_TEXT') + reply_markup = generate_numpad() + if update.callback_query: + msg = await update.callback_query.edit_message_text(text=text, reply_markup=reply_markup) + else: + msg = await update.message.reply_photo("{}/{}".format(os.getenv('DATA_FOLDER'), user_data["captcha_file"]), caption=text, reply_markup=reply_markup) + context.job_queue.run_once(delete_message, int(os.getenv('CAPTCHA_EXPIRES')), data={'chat_id': msg.chat_id, 'message_id': msg.message_id}) + + +async def generate_captcha_image(user_id, mode): + generator = CaptchaGenerator(2) + images_folder = "{}/images".format(os.getenv('DATA_FOLDER')) + os.makedirs(images_folder, exist_ok=True) + if mode == "math": + math_captcha = generator.gen_math_captcha_image(difficult_level=int(os.getenv('CAPTCHA_DIFFICULTY'))) + math_image = math_captcha.image + math_equation_string = math_captcha.equation_str + math_equation_result = math_captcha.equation_result + math_image.save("{}/{}_math.png".format(images_folder, user_id), "png") + return math_equation_string, math_equation_result + elif mode == "random": + captcha = generator.gen_captcha_image(difficult_level=int(os.getenv('CAPTCHA_DIFFICULTY'))) + image = captcha.image + characters = captcha.characters + image.save("{}/{}_random.png".format(images_folder, user_id), "png") + return characters, characters + else: + raise Exception("No such mode for captchas") + + +async def generate_case_file(update, user, perms): + chat_id = update.message.chat_id + mode = os.getenv("CAPTCHA_MODE") + question, answer = await generate_captcha_image(user.id, mode) + user_data = { + "id": user.id, + "message_id": update.message.id, + "first_name": user.first_name, + "last_name": user.last_name, + "username": user.username, + "permissions": perms, + "captcha_mode": mode, + "captcha_file": "images/{}_{}.png".format(user.id, mode), + "captcha_question": question, + "captcha_answer": answer, + "captcha_answer_submitted": "", + "captcha_attempts": 0, + "captcha_solved": False + } + save_case_file(chat_id, user_data) + return user_data + + +async def update_caption_attempts(update, user_data): + text = os.getenv('CAPTCHA_TEXT') + if str(user_data["captcha_attempts"]) == os.getenv("CAPTCHA_MAX_ATTEMPTS"): + text += ". Last chance... " + text = "{} (Attempts: {})".format(text, user_data["captcha_attempts"]) + await update.callback_query.edit_message_caption( + caption=text, + reply_markup=generate_numpad() + ) + + +async def regenerate_captcha_for_case_file(update, user_data): + query = update.callback_query + mode = os.getenv("CAPTCHA_MODE") + question, answer = await generate_captcha_image(query.from_user.id, mode) + user_data["captcha_file"] = "images/{}_{}.png".format(query.from_user.id, mode) + user_data["captcha_question"] = question + user_data["captcha_answer"] = answer + user_data["captcha_answer_submitted"] = "" + user_data["captcha_attempts"] += 1 + # change the captcha photo + await update.callback_query.edit_message_media( + media=InputMediaPhoto(open("{}/{}".format(os.getenv('DATA_FOLDER'), user_data["captcha_file"]), "rb")), + reply_markup=generate_numpad() + ) + # save the user_data + save_case_file(query.message.chat.id, user_data) + return user_data + + +def load_case_file(chat_id, user_id): + channel_folder = "{}/channels/{}".format(os.getenv('DATA_FOLDER'), chat_id) + os.makedirs(channel_folder, exist_ok=True) + try: + return json.load(open("{}/{}.json".format(channel_folder, user_id), "r")) + except FileNotFoundError: + return {} + + +def save_case_file(chat_id, user_data): + channel_folder = "{}/channels/{}".format(os.getenv('DATA_FOLDER'), chat_id) + os.makedirs(channel_folder, exist_ok=True) + open("{}/{}.json".format(channel_folder, user_data["id"]), "w").write(json.dumps(user_data, indent=4)) + + +async def is_user_cas_banned(user_id): + r = httpx.get("https://api.cas.chat/check?user_id={}".format(user_id)) + try: + resp = r.json() + except json.decoder.JSONDecodeError: + return False + else: + if resp["ok"] is True: + if resp["result"]["offenses"] > 0: + return True + else: + return False diff --git a/app/main.py b/app/main.py new file mode 100644 index 0000000..49eaeb2 --- /dev/null +++ b/app/main.py @@ -0,0 +1,171 @@ +import logging +import sys +import traceback + +from dotenv import load_dotenv +from telegram import Update +from telegram.ext import ApplicationBuilder, CallbackQueryHandler +from telegram.ext import MessageHandler + +from custom import * + +load_dotenv() +log_file = '{}/app.log'.format(os.getenv('DATA_FOLDER')) +logging.basicConfig( + format='%(asctime)s %(name)s %(levelname)s %(message)s', + datefmt='%H:%M:%S', + level=logging.INFO, + handlers=[ + logging.FileHandler(log_file), + logging.StreamHandler(sys.stdout) + ] +) +for package in (silence_packages := ( + 'httpx', + 'requests', + 'apscheduler', +)): + logging.getLogger(package).setLevel(logging.ERROR) + + +def is_bot(update): + if message := update.edited_message if update.edited_message else update.message: + if message.from_user.is_bot: + return message.from_user.id + return False + + +async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: + # ignore other bots + if is_bot(update): + return + # ignore groups where muting isn't available + elif update.effective_chat.type not in ('supergroup',): + return + + restricted_perms = {"can_send_messages": False} + # new user join the chat + if update.message.new_chat_members: + for user in update.message.new_chat_members: + # skip if bot + if user.is_bot: + continue + # mute the user + await context.bot.restrict_chat_member( + update.message.chat_id, + update.message.from_user.id, + restricted_perms + ) + # checks if user is cas banned, do not unmute or give them a captcha + if await is_user_cas_banned(user.id): + # ban the user from this channel too + await context.bot.ban_chat_member( + update.message.chat_id, + user.id + ) + # announce user is already cas banned + await update.message.reply_text(text="{} is CAS Banned!".format(get_name_from_user(user))) + else: + # create a captcha file for the user + user_data = await generate_case_file(update, user, restricted_perms) + # show the captcha solver + await solve_captcha(update, context, user_data) + return + + +async def menu_button(update: Update, context: ContextTypes.DEFAULT_TYPE): + query = update.callback_query + # check if captcha file exists for the user + channel_folder = "{}/channels/{}".format(os.getenv('DATA_FOLDER'), query.message.chat.id) + os.makedirs(channel_folder, exist_ok=True) + if not os.path.exists("{}/{}.json".format(channel_folder, query.from_user.id)): + return + else: + # check if user's captcha is already solved + user_data = load_case_file(query.message.chat.id, query.from_user.id) + if user_data["captcha_solved"] is True: + return + if user_data["message_id"] != query.message.reply_to_message.message_id: + return + await query.answer() + # load the user data + user_data = load_case_file(query.message.chat.id, query.from_user.id) + # answer callback query + await context.bot.answer_callback_query(query.id) + # check for input + if not query.data.startswith("key_"): + if query.data not in ("regenerate", "restart",): + return + # do not allow refreshing the captcha too many times + if user_data["captcha_attempts"] + 1 > int(os.getenv('CAPTCHA_MAX_ATTEMPTS')): + return + match query.data: + case "regenerate": + # regenerate user data + await regenerate_captcha_for_case_file(update, user_data) + # update the caption but not the captcha + await update_caption_attempts(update, user_data) + case "restart": + # reset the answer input + user_data["captcha_answer_submitted"] = "" + user_data["captcha_attempts"] += 1 + # save the user_data + save_case_file(query.message.chat.id, user_data) + # update the caption but not the captcha + await update_caption_attempts(update, user_data) + else: + # get the number pressed + number = query.data.replace("key_", "") + # save the value + user_data["captcha_answer_submitted"] += str(number) + # check if the values match + if user_data["captcha_answer"] == user_data["captcha_answer_submitted"]: + # reverse perms + user_data["permissions"] = {k: True for k in user_data["permissions"]} + # unmute the user + await context.bot.restrict_chat_member( + query.message.chat.id, + user_data["id"], + user_data["permissions"] + ) + # log attempts + user_data["captcha_attempts"] += 1 + # mark captcha is solved + user_data["captcha_solved"] = True + # delete the captcha + await context.bot.delete_message(chat_id=query.message.chat.id, message_id=query.message.message_id) + # show welcome message to the joiner + await query.message.reply_to_message.reply_text(os.getenv('WELCOME_MESSAGE').format(get_name_from_user(query.from_user))) + # check if captcha is the same length but wrong, start over + elif len(user_data["captcha_answer_submitted"]) >= len(user_data["captcha_answer"]): + # user ran out of attempts + if user_data["captcha_attempts"] + 1 > int(os.getenv('CAPTCHA_MAX_ATTEMPTS')): + # log attempts + user_data["captcha_attempts"] += 1 + # delete the captcha + await context.bot.delete_message(chat_id=query.message.chat.id, message_id=query.message.message_id) + else: + # regenerate user data + await regenerate_captcha_for_case_file(update, user_data) + # update the caption but not the captcha + await update_caption_attempts(update, user_data) + # save the user_data + save_case_file(query.message.chat.id, user_data) + + +async def error_handler(update: object, context: ContextTypes.DEFAULT_TYPE) -> None: + logging.error("Exception while handling an update:", exc_info=context.error) + tb_list = traceback.format_exception(None, context.error, context.error.__traceback__) + tb_string = "".join(tb_list) + logging.error(tb_string) + + +if __name__ == '__main__': + logging.info("-" * 50) + logging.info("Affection TG Captcha Bot") + logging.info("-" * 50) + app = ApplicationBuilder().token(os.getenv('TELEGRAM_BOT_TOKEN')).build() + app.add_handler(MessageHandler(None, handle_message)) + app.add_handler(CallbackQueryHandler(menu_button)) + app.add_error_handler(error_handler) + app.run_polling(allowed_updates=[Update.MESSAGE, Update.CALLBACK_QUERY]) diff --git a/app/requirements.txt b/app/requirements.txt new file mode 100644 index 0000000..c4ab345 --- /dev/null +++ b/app/requirements.txt @@ -0,0 +1,4 @@ +httpx +python-telegram-bot[ext] +python-dotenv +multicolorcaptcha \ No newline at end of file diff --git a/app/sample.env b/app/sample.env new file mode 100644 index 0000000..c31f97e --- /dev/null +++ b/app/sample.env @@ -0,0 +1,8 @@ +DATA_FOLDER=../data +TELEGRAM_BOT_TOKEN= +CAPTCHA_DIFFICULTY=1 +CAPTCHA_EXPIRES=900 +CAPTCHA_MAX_ATTEMPTS=69 +CAPTCHA_MODE=random +CAPTCHA_TEXT=Click on 4 numbers as shown to talk +WELCOME_MESSAGE="Welcome, {} 🥰❤️‍🔥" \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..4b5e2e3 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,14 @@ +version: "3.1" +services: + app: + container_name: affection-captcha-bot + build: app + restart: unless-stopped + volumes: + - "./app:/app" + - "./data:/data" + environment: + - DATA_FOLDER=/data + command: ['python', 'main.py'] + env_file: + - ./app/.env \ No newline at end of file