init
This commit is contained in:
commit
fe326b2cd5
7 changed files with 376 additions and 0 deletions
22
README.md
Normal file
22
README.md
Normal file
|
|
@ -0,0 +1,22 @@
|
||||||
|
# affection-captcha-bot
|
||||||
|
|
||||||
|
Users are auto muted by default. They can unmute themself if they successfully click the correct 4 numbers. The number pad is randomized each time. Difficulty, attempts, and time to expiration can be set in the env file.
|
||||||
|
|
||||||
|
## Getting Started
|
||||||
|
|
||||||
|
### Bare Metal
|
||||||
|
|
||||||
|
- Clone the repo
|
||||||
|
- Open the `app` folder
|
||||||
|
- Rename `sample.env` to `.env`
|
||||||
|
- Enter your Telegram bot token from @botfather
|
||||||
|
- Run by typing `python main.py`
|
||||||
|
|
||||||
|
### Docker
|
||||||
|
|
||||||
|
- Run by typing `docker compose up -d` in the repo's root folder
|
||||||
|
|
||||||
|
### Telegram
|
||||||
|
|
||||||
|
- Add the bot to your supergroup as admin
|
||||||
|
- The bot only works for supergroups (public)
|
||||||
8
app/Dockerfile
Normal file
8
app/Dockerfile
Normal file
|
|
@ -0,0 +1,8 @@
|
||||||
|
FROM python:3.12
|
||||||
|
RUN apt update
|
||||||
|
RUN apt upgrade -y
|
||||||
|
RUN apt install -y build-essential libtiff5-dev libjpeg62-turbo-dev zlib1g-dev libfreetype6-dev liblcms2-dev libwebp-dev tcl8.6-dev tk8.6-dev python3-tk
|
||||||
|
WORKDIR /app
|
||||||
|
COPY . .
|
||||||
|
RUN pip install -r requirements.txt
|
||||||
|
CMD ["sh", "-c", "python main.py"]
|
||||||
149
app/custom.py
Normal file
149
app/custom.py
Normal file
|
|
@ -0,0 +1,149 @@
|
||||||
|
import json
|
||||||
|
import os
|
||||||
|
import random
|
||||||
|
|
||||||
|
import httpx
|
||||||
|
from multicolorcaptcha import CaptchaGenerator
|
||||||
|
from telegram import InlineKeyboardButton, InlineKeyboardMarkup, InputMediaPhoto
|
||||||
|
from telegram.error import BadRequest
|
||||||
|
from telegram.ext import ContextTypes
|
||||||
|
|
||||||
|
|
||||||
|
def get_name_from_user(user):
|
||||||
|
return "@{}".format(user.username) if hasattr(user, 'username') else "{} {}".format(user.first_name, user.last_name)
|
||||||
|
|
||||||
|
|
||||||
|
async def delete_message(context: ContextTypes.DEFAULT_TYPE) -> None:
|
||||||
|
job = context.job
|
||||||
|
try:
|
||||||
|
await context.bot.delete_message(chat_id=job.data['chat_id'], message_id=job.data['message_id'])
|
||||||
|
except BadRequest:
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
def generate_numpad(pressed=None):
|
||||||
|
if not pressed:
|
||||||
|
pressed = []
|
||||||
|
numbers = list(range(0, 10))
|
||||||
|
random.shuffle(numbers)
|
||||||
|
keyboard = [
|
||||||
|
[InlineKeyboardButton("*{}*".format(str(n)) if n in pressed else str(n), callback_data="key_{}".format(str(n))) for n in numbers[0:3]],
|
||||||
|
[InlineKeyboardButton("*{}*".format(str(n)) if n in pressed else str(n), callback_data="key_{}".format(str(n))) for n in numbers[3:6]],
|
||||||
|
[InlineKeyboardButton("*{}*".format(str(n)) if n in pressed else str(n), callback_data="key_{}".format(str(n))) for n in numbers[6:9]],
|
||||||
|
[InlineKeyboardButton("🫣", callback_data="restart"), InlineKeyboardButton("*{}*".format(str(numbers[9])) if numbers[9] in pressed else str(numbers[9]), callback_data="key_{}".format(str(numbers[9]))), InlineKeyboardButton("😵💫", callback_data="regenerate")]
|
||||||
|
]
|
||||||
|
reply_markup = InlineKeyboardMarkup(keyboard)
|
||||||
|
return reply_markup
|
||||||
|
|
||||||
|
|
||||||
|
async def solve_captcha(update, context, user_data):
|
||||||
|
text = os.getenv('CAPTCHA_TEXT')
|
||||||
|
reply_markup = generate_numpad()
|
||||||
|
if update.callback_query:
|
||||||
|
msg = await update.callback_query.edit_message_text(text=text, reply_markup=reply_markup)
|
||||||
|
else:
|
||||||
|
msg = await update.message.reply_photo("{}/{}".format(os.getenv('DATA_FOLDER'), user_data["captcha_file"]), caption=text, reply_markup=reply_markup)
|
||||||
|
context.job_queue.run_once(delete_message, int(os.getenv('CAPTCHA_EXPIRES')), data={'chat_id': msg.chat_id, 'message_id': msg.message_id})
|
||||||
|
|
||||||
|
|
||||||
|
async def generate_captcha_image(user_id, mode):
|
||||||
|
generator = CaptchaGenerator(2)
|
||||||
|
images_folder = "{}/images".format(os.getenv('DATA_FOLDER'))
|
||||||
|
os.makedirs(images_folder, exist_ok=True)
|
||||||
|
if mode == "math":
|
||||||
|
math_captcha = generator.gen_math_captcha_image(difficult_level=int(os.getenv('CAPTCHA_DIFFICULTY')))
|
||||||
|
math_image = math_captcha.image
|
||||||
|
math_equation_string = math_captcha.equation_str
|
||||||
|
math_equation_result = math_captcha.equation_result
|
||||||
|
math_image.save("{}/{}_math.png".format(images_folder, user_id), "png")
|
||||||
|
return math_equation_string, math_equation_result
|
||||||
|
elif mode == "random":
|
||||||
|
captcha = generator.gen_captcha_image(difficult_level=int(os.getenv('CAPTCHA_DIFFICULTY')))
|
||||||
|
image = captcha.image
|
||||||
|
characters = captcha.characters
|
||||||
|
image.save("{}/{}_random.png".format(images_folder, user_id), "png")
|
||||||
|
return characters, characters
|
||||||
|
else:
|
||||||
|
raise Exception("No such mode for captchas")
|
||||||
|
|
||||||
|
|
||||||
|
async def generate_case_file(update, user, perms):
|
||||||
|
chat_id = update.message.chat_id
|
||||||
|
mode = os.getenv("CAPTCHA_MODE")
|
||||||
|
question, answer = await generate_captcha_image(user.id, mode)
|
||||||
|
user_data = {
|
||||||
|
"id": user.id,
|
||||||
|
"message_id": update.message.id,
|
||||||
|
"first_name": user.first_name,
|
||||||
|
"last_name": user.last_name,
|
||||||
|
"username": user.username,
|
||||||
|
"permissions": perms,
|
||||||
|
"captcha_mode": mode,
|
||||||
|
"captcha_file": "images/{}_{}.png".format(user.id, mode),
|
||||||
|
"captcha_question": question,
|
||||||
|
"captcha_answer": answer,
|
||||||
|
"captcha_answer_submitted": "",
|
||||||
|
"captcha_attempts": 0,
|
||||||
|
"captcha_solved": False
|
||||||
|
}
|
||||||
|
save_case_file(chat_id, user_data)
|
||||||
|
return user_data
|
||||||
|
|
||||||
|
|
||||||
|
async def update_caption_attempts(update, user_data):
|
||||||
|
text = os.getenv('CAPTCHA_TEXT')
|
||||||
|
if str(user_data["captcha_attempts"]) == os.getenv("CAPTCHA_MAX_ATTEMPTS"):
|
||||||
|
text += ". Last chance... "
|
||||||
|
text = "{} (Attempts: {})".format(text, user_data["captcha_attempts"])
|
||||||
|
await update.callback_query.edit_message_caption(
|
||||||
|
caption=text,
|
||||||
|
reply_markup=generate_numpad()
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
async def regenerate_captcha_for_case_file(update, user_data):
|
||||||
|
query = update.callback_query
|
||||||
|
mode = os.getenv("CAPTCHA_MODE")
|
||||||
|
question, answer = await generate_captcha_image(query.from_user.id, mode)
|
||||||
|
user_data["captcha_file"] = "images/{}_{}.png".format(query.from_user.id, mode)
|
||||||
|
user_data["captcha_question"] = question
|
||||||
|
user_data["captcha_answer"] = answer
|
||||||
|
user_data["captcha_answer_submitted"] = ""
|
||||||
|
user_data["captcha_attempts"] += 1
|
||||||
|
# change the captcha photo
|
||||||
|
await update.callback_query.edit_message_media(
|
||||||
|
media=InputMediaPhoto(open("{}/{}".format(os.getenv('DATA_FOLDER'), user_data["captcha_file"]), "rb")),
|
||||||
|
reply_markup=generate_numpad()
|
||||||
|
)
|
||||||
|
# save the user_data
|
||||||
|
save_case_file(query.message.chat.id, user_data)
|
||||||
|
return user_data
|
||||||
|
|
||||||
|
|
||||||
|
def load_case_file(chat_id, user_id):
|
||||||
|
channel_folder = "{}/channels/{}".format(os.getenv('DATA_FOLDER'), chat_id)
|
||||||
|
os.makedirs(channel_folder, exist_ok=True)
|
||||||
|
try:
|
||||||
|
return json.load(open("{}/{}.json".format(channel_folder, user_id), "r"))
|
||||||
|
except FileNotFoundError:
|
||||||
|
return {}
|
||||||
|
|
||||||
|
|
||||||
|
def save_case_file(chat_id, user_data):
|
||||||
|
channel_folder = "{}/channels/{}".format(os.getenv('DATA_FOLDER'), chat_id)
|
||||||
|
os.makedirs(channel_folder, exist_ok=True)
|
||||||
|
open("{}/{}.json".format(channel_folder, user_data["id"]), "w").write(json.dumps(user_data, indent=4))
|
||||||
|
|
||||||
|
|
||||||
|
async def is_user_cas_banned(user_id):
|
||||||
|
r = httpx.get("https://api.cas.chat/check?user_id={}".format(user_id))
|
||||||
|
try:
|
||||||
|
resp = r.json()
|
||||||
|
except json.decoder.JSONDecodeError:
|
||||||
|
return False
|
||||||
|
else:
|
||||||
|
if resp["ok"] is True:
|
||||||
|
if resp["result"]["offenses"] > 0:
|
||||||
|
return True
|
||||||
|
else:
|
||||||
|
return False
|
||||||
171
app/main.py
Normal file
171
app/main.py
Normal file
|
|
@ -0,0 +1,171 @@
|
||||||
|
import logging
|
||||||
|
import sys
|
||||||
|
import traceback
|
||||||
|
|
||||||
|
from dotenv import load_dotenv
|
||||||
|
from telegram import Update
|
||||||
|
from telegram.ext import ApplicationBuilder, CallbackQueryHandler
|
||||||
|
from telegram.ext import MessageHandler
|
||||||
|
|
||||||
|
from custom import *
|
||||||
|
|
||||||
|
load_dotenv()
|
||||||
|
log_file = '{}/app.log'.format(os.getenv('DATA_FOLDER'))
|
||||||
|
logging.basicConfig(
|
||||||
|
format='%(asctime)s %(name)s %(levelname)s %(message)s',
|
||||||
|
datefmt='%H:%M:%S',
|
||||||
|
level=logging.INFO,
|
||||||
|
handlers=[
|
||||||
|
logging.FileHandler(log_file),
|
||||||
|
logging.StreamHandler(sys.stdout)
|
||||||
|
]
|
||||||
|
)
|
||||||
|
for package in (silence_packages := (
|
||||||
|
'httpx',
|
||||||
|
'requests',
|
||||||
|
'apscheduler',
|
||||||
|
)):
|
||||||
|
logging.getLogger(package).setLevel(logging.ERROR)
|
||||||
|
|
||||||
|
|
||||||
|
def is_bot(update):
|
||||||
|
if message := update.edited_message if update.edited_message else update.message:
|
||||||
|
if message.from_user.is_bot:
|
||||||
|
return message.from_user.id
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
|
||||||
|
# ignore other bots
|
||||||
|
if is_bot(update):
|
||||||
|
return
|
||||||
|
# ignore groups where muting isn't available
|
||||||
|
elif update.effective_chat.type not in ('supergroup',):
|
||||||
|
return
|
||||||
|
|
||||||
|
restricted_perms = {"can_send_messages": False}
|
||||||
|
# new user join the chat
|
||||||
|
if update.message.new_chat_members:
|
||||||
|
for user in update.message.new_chat_members:
|
||||||
|
# skip if bot
|
||||||
|
if user.is_bot:
|
||||||
|
continue
|
||||||
|
# mute the user
|
||||||
|
await context.bot.restrict_chat_member(
|
||||||
|
update.message.chat_id,
|
||||||
|
update.message.from_user.id,
|
||||||
|
restricted_perms
|
||||||
|
)
|
||||||
|
# checks if user is cas banned, do not unmute or give them a captcha
|
||||||
|
if await is_user_cas_banned(user.id):
|
||||||
|
# ban the user from this channel too
|
||||||
|
await context.bot.ban_chat_member(
|
||||||
|
update.message.chat_id,
|
||||||
|
user.id
|
||||||
|
)
|
||||||
|
# announce user is already cas banned
|
||||||
|
await update.message.reply_text(text="{} is CAS Banned!".format(get_name_from_user(user)))
|
||||||
|
else:
|
||||||
|
# create a captcha file for the user
|
||||||
|
user_data = await generate_case_file(update, user, restricted_perms)
|
||||||
|
# show the captcha solver
|
||||||
|
await solve_captcha(update, context, user_data)
|
||||||
|
return
|
||||||
|
|
||||||
|
|
||||||
|
async def menu_button(update: Update, context: ContextTypes.DEFAULT_TYPE):
|
||||||
|
query = update.callback_query
|
||||||
|
# check if captcha file exists for the user
|
||||||
|
channel_folder = "{}/channels/{}".format(os.getenv('DATA_FOLDER'), query.message.chat.id)
|
||||||
|
os.makedirs(channel_folder, exist_ok=True)
|
||||||
|
if not os.path.exists("{}/{}.json".format(channel_folder, query.from_user.id)):
|
||||||
|
return
|
||||||
|
else:
|
||||||
|
# check if user's captcha is already solved
|
||||||
|
user_data = load_case_file(query.message.chat.id, query.from_user.id)
|
||||||
|
if user_data["captcha_solved"] is True:
|
||||||
|
return
|
||||||
|
if user_data["message_id"] != query.message.reply_to_message.message_id:
|
||||||
|
return
|
||||||
|
await query.answer()
|
||||||
|
# load the user data
|
||||||
|
user_data = load_case_file(query.message.chat.id, query.from_user.id)
|
||||||
|
# answer callback query
|
||||||
|
await context.bot.answer_callback_query(query.id)
|
||||||
|
# check for input
|
||||||
|
if not query.data.startswith("key_"):
|
||||||
|
if query.data not in ("regenerate", "restart",):
|
||||||
|
return
|
||||||
|
# do not allow refreshing the captcha too many times
|
||||||
|
if user_data["captcha_attempts"] + 1 > int(os.getenv('CAPTCHA_MAX_ATTEMPTS')):
|
||||||
|
return
|
||||||
|
match query.data:
|
||||||
|
case "regenerate":
|
||||||
|
# regenerate user data
|
||||||
|
await regenerate_captcha_for_case_file(update, user_data)
|
||||||
|
# update the caption but not the captcha
|
||||||
|
await update_caption_attempts(update, user_data)
|
||||||
|
case "restart":
|
||||||
|
# reset the answer input
|
||||||
|
user_data["captcha_answer_submitted"] = ""
|
||||||
|
user_data["captcha_attempts"] += 1
|
||||||
|
# save the user_data
|
||||||
|
save_case_file(query.message.chat.id, user_data)
|
||||||
|
# update the caption but not the captcha
|
||||||
|
await update_caption_attempts(update, user_data)
|
||||||
|
else:
|
||||||
|
# get the number pressed
|
||||||
|
number = query.data.replace("key_", "")
|
||||||
|
# save the value
|
||||||
|
user_data["captcha_answer_submitted"] += str(number)
|
||||||
|
# check if the values match
|
||||||
|
if user_data["captcha_answer"] == user_data["captcha_answer_submitted"]:
|
||||||
|
# reverse perms
|
||||||
|
user_data["permissions"] = {k: True for k in user_data["permissions"]}
|
||||||
|
# unmute the user
|
||||||
|
await context.bot.restrict_chat_member(
|
||||||
|
query.message.chat.id,
|
||||||
|
user_data["id"],
|
||||||
|
user_data["permissions"]
|
||||||
|
)
|
||||||
|
# log attempts
|
||||||
|
user_data["captcha_attempts"] += 1
|
||||||
|
# mark captcha is solved
|
||||||
|
user_data["captcha_solved"] = True
|
||||||
|
# delete the captcha
|
||||||
|
await context.bot.delete_message(chat_id=query.message.chat.id, message_id=query.message.message_id)
|
||||||
|
# show welcome message to the joiner
|
||||||
|
await query.message.reply_to_message.reply_text(os.getenv('WELCOME_MESSAGE').format(get_name_from_user(query.from_user)))
|
||||||
|
# check if captcha is the same length but wrong, start over
|
||||||
|
elif len(user_data["captcha_answer_submitted"]) >= len(user_data["captcha_answer"]):
|
||||||
|
# user ran out of attempts
|
||||||
|
if user_data["captcha_attempts"] + 1 > int(os.getenv('CAPTCHA_MAX_ATTEMPTS')):
|
||||||
|
# log attempts
|
||||||
|
user_data["captcha_attempts"] += 1
|
||||||
|
# delete the captcha
|
||||||
|
await context.bot.delete_message(chat_id=query.message.chat.id, message_id=query.message.message_id)
|
||||||
|
else:
|
||||||
|
# regenerate user data
|
||||||
|
await regenerate_captcha_for_case_file(update, user_data)
|
||||||
|
# update the caption but not the captcha
|
||||||
|
await update_caption_attempts(update, user_data)
|
||||||
|
# save the user_data
|
||||||
|
save_case_file(query.message.chat.id, user_data)
|
||||||
|
|
||||||
|
|
||||||
|
async def error_handler(update: object, context: ContextTypes.DEFAULT_TYPE) -> None:
|
||||||
|
logging.error("Exception while handling an update:", exc_info=context.error)
|
||||||
|
tb_list = traceback.format_exception(None, context.error, context.error.__traceback__)
|
||||||
|
tb_string = "".join(tb_list)
|
||||||
|
logging.error(tb_string)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
logging.info("-" * 50)
|
||||||
|
logging.info("Affection TG Captcha Bot")
|
||||||
|
logging.info("-" * 50)
|
||||||
|
app = ApplicationBuilder().token(os.getenv('TELEGRAM_BOT_TOKEN')).build()
|
||||||
|
app.add_handler(MessageHandler(None, handle_message))
|
||||||
|
app.add_handler(CallbackQueryHandler(menu_button))
|
||||||
|
app.add_error_handler(error_handler)
|
||||||
|
app.run_polling(allowed_updates=[Update.MESSAGE, Update.CALLBACK_QUERY])
|
||||||
4
app/requirements.txt
Normal file
4
app/requirements.txt
Normal file
|
|
@ -0,0 +1,4 @@
|
||||||
|
httpx
|
||||||
|
python-telegram-bot[ext]
|
||||||
|
python-dotenv
|
||||||
|
multicolorcaptcha
|
||||||
8
app/sample.env
Normal file
8
app/sample.env
Normal file
|
|
@ -0,0 +1,8 @@
|
||||||
|
DATA_FOLDER=../data
|
||||||
|
TELEGRAM_BOT_TOKEN=
|
||||||
|
CAPTCHA_DIFFICULTY=1
|
||||||
|
CAPTCHA_EXPIRES=900
|
||||||
|
CAPTCHA_MAX_ATTEMPTS=69
|
||||||
|
CAPTCHA_MODE=random
|
||||||
|
CAPTCHA_TEXT=Click on 4 numbers as shown to talk
|
||||||
|
WELCOME_MESSAGE="Welcome, {} 🥰❤️🔥"
|
||||||
14
docker-compose.yml
Normal file
14
docker-compose.yml
Normal file
|
|
@ -0,0 +1,14 @@
|
||||||
|
version: "3.1"
|
||||||
|
services:
|
||||||
|
app:
|
||||||
|
container_name: affection-captcha-bot
|
||||||
|
build: app
|
||||||
|
restart: unless-stopped
|
||||||
|
volumes:
|
||||||
|
- "./app:/app"
|
||||||
|
- "./data:/data"
|
||||||
|
environment:
|
||||||
|
- DATA_FOLDER=/data
|
||||||
|
command: ['python', 'main.py']
|
||||||
|
env_file:
|
||||||
|
- ./app/.env
|
||||||
Loading…
Reference in a new issue