development #1
@ -1,246 +1,285 @@
|
||||
import asyncio
|
||||
import datetime
|
||||
import re
|
||||
from typing import List
|
||||
from datetime import timedelta
|
||||
from typing import Dict, List, Optional
|
||||
|
||||
import discord
|
||||
from redbot.core import Config, checks, commands
|
||||
from redbot.core.bot import Red
|
||||
from redbot.core.utils.antispam import AntiSpam
|
||||
|
||||
# Define your application questions and field settings\
|
||||
QUESTIONS_LIST = [
|
||||
{
|
||||
"key": "name",
|
||||
"prompt": "First of all, what is your name?",
|
||||
"field_name": "Name",
|
||||
"inline": True,
|
||||
},
|
||||
{
|
||||
"key": "age",
|
||||
"prompt": "What age are you?",
|
||||
"field_name": "Age",
|
||||
"inline": True,
|
||||
},
|
||||
{
|
||||
"key": "country",
|
||||
"prompt": "Where are you from?",
|
||||
"field_name": "Country",
|
||||
"inline": True,
|
||||
},
|
||||
{
|
||||
"key": "hobbies",
|
||||
"prompt": "Do you have any hobbies?",
|
||||
"field_name": "Hobbies",
|
||||
"inline": True,
|
||||
},
|
||||
{
|
||||
"key": "game",
|
||||
"prompt": "Are you wishing to join because of a particular game? If so, which game?",
|
||||
"field_name": "Specific game?",
|
||||
"inline": True,
|
||||
},
|
||||
{
|
||||
"key": "motivation",
|
||||
"prompt": "Write out, in a free-style way, what your motivation is for wanting to join us in particular and how you would be a good fit for Kanium",
|
||||
"field_name": "Motivation for wanting to join:",
|
||||
"inline": False,
|
||||
},
|
||||
]
|
||||
|
||||
|
||||
def sanitize_input(input_text: str) -> str:
|
||||
"""Sanitize input to remove mentions, links, and special characters."""
|
||||
sanitized_text = re.sub(r'<@!?&?(\d+)>', '', input_text)
|
||||
sanitized_text = re.sub(r'http\S+', '', sanitized_text)
|
||||
sanitized_text = re.sub(r'([^\w\s.,?!`~@#$%^&*()_+=-])', '', sanitized_text)
|
||||
return sanitized_text
|
||||
"""Sanitize input to remove mentions, links, and unwanted special characters."""
|
||||
text = re.sub(r'<@!?(?:&)?\d+>', '', input_text)
|
||||
text = re.sub(r'http\S+', '', text)
|
||||
# Allow unicode letters and common punctuation
|
||||
text = re.sub(r'[^\w\s\p{L}\.,\?!`~@#$%^&*()_+=-]', '', text)
|
||||
return text
|
||||
|
||||
|
||||
class Recruitment(commands.Cog):
|
||||
class Recruitment(commands.Cog): # noqa
|
||||
"""A cog that lets a user send a membership application."""
|
||||
|
||||
def __init__(self, bot: Red):
|
||||
self.bot = bot
|
||||
self.message: str = ''
|
||||
self.config = Config.get_conf(self, identifier=101101101101001110101) # Replace with your own unique identifier
|
||||
default_guild = {"guild_id": 274657393936302080, "application_channel_id": None}
|
||||
self.config = Config.get_conf(self, identifier=0xFAB123ABC456)
|
||||
default_guild = {"application_channel_id": None}
|
||||
self.config.register_guild(**default_guild)
|
||||
self.antispam = {}
|
||||
self.cog_check_enabled = True # Attribute to track the state of cog_check
|
||||
# antispam store per guild per user
|
||||
self.antispam: Dict[int, Dict[int, AntiSpam]] = {}
|
||||
self.cog_check_enabled = True
|
||||
|
||||
async def cog_check(self, ctx: commands.Context):
|
||||
if await ctx.bot.is_admin(ctx.author):
|
||||
async def cog_check(self, ctx: commands.Context) -> bool:
|
||||
if (
|
||||
await ctx.bot.is_admin(ctx.author)
|
||||
or not self.cog_check_enabled
|
||||
or (ctx.guild and ctx.author.guild_permissions.manage_guild)
|
||||
or (ctx.guild and ctx.author.guild_permissions.administrator)
|
||||
):
|
||||
return True
|
||||
|
||||
if not self.cog_check_enabled:
|
||||
return True # If disabled, always return True to allow all commands
|
||||
|
||||
guild_id = ctx.guild.id
|
||||
author_id = ctx.author.id # Get the ID of the user who invoked the command
|
||||
|
||||
# Check if the guild has an antispam entry, if not, create one
|
||||
if guild_id not in self.antispam:
|
||||
self.antispam[guild_id] = {}
|
||||
|
||||
# Check if the user has an antispam entry in this guild, if not, create one
|
||||
if author_id not in self.antispam[guild_id]:
|
||||
self.antispam[guild_id][author_id] = AntiSpam([(datetime.timedelta(hours=1), 1)])
|
||||
|
||||
# Get the antispam object for this specific user in this guild
|
||||
antispam = self.antispam[guild_id][author_id]
|
||||
|
||||
if antispam.spammy:
|
||||
gid = ctx.guild.id
|
||||
uid = ctx.author.id
|
||||
if gid not in self.antispam:
|
||||
self.antispam[gid] = {}
|
||||
if uid not in self.antispam[gid]:
|
||||
self.antispam[gid][uid] = AntiSpam([(datetime.timedelta(hours=1), 1)])
|
||||
spam = self.antispam[gid][uid]
|
||||
if spam.spammy:
|
||||
try:
|
||||
await ctx.message.delete(delay=0)
|
||||
await ctx.message.delete()
|
||||
except discord.Forbidden:
|
||||
pass
|
||||
try:
|
||||
await ctx.author.send("Please wait for an hour before sending another application.")
|
||||
except discord.Forbidden:
|
||||
pass
|
||||
return False
|
||||
|
||||
antispam.stamp()
|
||||
spam.stamp()
|
||||
return True
|
||||
|
||||
@commands.command(name="togglecogcheck")
|
||||
@checks.is_owner() # Or use any other appropriate check
|
||||
async def toggle_cog_check(self, ctx: commands.Context):
|
||||
@checks.is_owner()
|
||||
async def toggle_cog_check(self, ctx: commands.Context) -> None:
|
||||
"""Toggle the cog_check functionality on or off."""
|
||||
self.cog_check_enabled = not self.cog_check_enabled
|
||||
status = "enabled" if self.cog_check_enabled else "disabled"
|
||||
await ctx.send(f"Cog check has been {status}.")
|
||||
await ctx.send(f"Cog checks are now {'enabled' if self.cog_check_enabled else 'disabled'}.")
|
||||
|
||||
@commands.guild_only()
|
||||
@checks.admin_or_permissions(manage_guild=True)
|
||||
@commands.group(name="setapplicationschannel", pass_context=True, no_pm=True)
|
||||
async def setapplicationschannel(self, ctx: commands.Context):
|
||||
@commands.group(
|
||||
name="setapplicationschannel",
|
||||
invoke_without_command=True,
|
||||
)
|
||||
async def set_applications_channel(self, ctx: commands.Context) -> None:
|
||||
"""Set the channel where applications will be sent."""
|
||||
if ctx.invoked_subcommand is None:
|
||||
guild = ctx.guild
|
||||
channel = ctx.channel
|
||||
await self.config.guild(guild).guild_id.set(guild.id)
|
||||
await self.config.guild(guild).application_channel_id.set(channel.id)
|
||||
await ctx.send(f"Application channel set to {channel.mention}.")
|
||||
await self.config.guild(ctx.guild).application_channel_id.set(ctx.channel.id)
|
||||
await ctx.send(f"Application channel set to {ctx.channel.mention}.")
|
||||
|
||||
@setapplicationschannel.command(name="clear")
|
||||
async def clear_application_channel(self, ctx: commands.Context):
|
||||
@set_applications_channel.command(name="clear")
|
||||
async def clear_applications_channel(self, ctx: commands.Context) -> None:
|
||||
"""Clear the current application channel."""
|
||||
guild = ctx.guild
|
||||
await self.config.guild(guild).clear_raw("application_channel_id")
|
||||
await self.config.guild(ctx.guild).clear_raw("application_channel_id")
|
||||
await ctx.send("Application channel cleared.")
|
||||
|
||||
@commands.group(name="application", usage="[text]", invoke_without_command=True)
|
||||
async def application(self, ctx: commands.Context, *, _application: str = ""):
|
||||
# Input validation and sanitization for _application
|
||||
_application = sanitize_input(_application)
|
||||
if len(_application) > 2000:
|
||||
await ctx.send("Your application is too long. Please limit it to 2000 characters.")
|
||||
@commands.group(name="application", invoke_without_command=True)
|
||||
async def application(self, ctx: commands.Context, *, text: Optional[str] = None) -> None:
|
||||
"""Start an application process."""
|
||||
# Direct free-text application (optional)
|
||||
if text:
|
||||
text = sanitize_input(text)
|
||||
if len(text) > 2000:
|
||||
await ctx.send("Your application is too long (max 2000 chars).")
|
||||
return
|
||||
|
||||
guild_id = await self.get_guild_id(ctx)
|
||||
guild = discord.utils.get(self.bot.guilds, id=guild_id)
|
||||
if guild is None:
|
||||
await ctx.send(f"The guild with ID {guild_id} could not be found.")
|
||||
return
|
||||
|
||||
author = ctx.author
|
||||
if author.guild != guild:
|
||||
await ctx.send(f"You need to be in the {guild.name} server to submit an application.")
|
||||
return
|
||||
|
||||
if await self.check_author_is_member_and_channel_is_dm(ctx):
|
||||
await self.interactive_application(author)
|
||||
|
||||
async def get_guild_id(self, ctx: commands.Context) -> int:
|
||||
guild_id = await self.config.guild(ctx.author.guild).guild_id()
|
||||
return guild_id
|
||||
|
||||
async def check_author_is_member_and_channel_is_dm(self, ctx: commands.Context) -> bool:
|
||||
if not isinstance(ctx.author, discord.Member):
|
||||
await ctx.send("You need to join the server before your application can be processed.")
|
||||
return False
|
||||
if not isinstance(ctx.channel, discord.DMChannel):
|
||||
# Determine if in guild or DM
|
||||
if isinstance(ctx.channel, discord.DMChannel):
|
||||
await self._start_application(ctx.author)
|
||||
else:
|
||||
try:
|
||||
await ctx.message.delete()
|
||||
except:
|
||||
except discord.Forbidden:
|
||||
pass
|
||||
await self.interactive_application(ctx)
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
async def interactive_application(self, ctx: commands.Context):
|
||||
"""Ask the user several questions to create an application."""
|
||||
author = ctx.author
|
||||
embed = discord.Embed(
|
||||
title="+++ KANIUM APPLICATION SYSTEM +++",
|
||||
description="Ah, you wish to apply for Kanium membership. Very well, understand that This process is very important to us so we expect you to put effort in and be glorious about it. Let us begin!",
|
||||
color=discord.Color.green(),
|
||||
try:
|
||||
await ctx.author.send(
|
||||
"Let's move this to DM so we can process your application."
|
||||
)
|
||||
await author.send(embed=embed)
|
||||
except discord.Forbidden:
|
||||
await ctx.send("I cannot DM you. Please enable DMs and try again.")
|
||||
return
|
||||
await self._start_application(ctx.author)
|
||||
|
||||
answers = await self.ask_questions(author)
|
||||
async def _start_application(self, member: discord.Member) -> None:
|
||||
# Kick off interactive questions
|
||||
if not await self._send_embed(
|
||||
member,
|
||||
title="+++ KANIUM APPLICATION SYSTEM +++",
|
||||
description=(
|
||||
"Ah, you wish to apply for Kanium membership."
|
||||
" Please take your time and answer thoughtfully."
|
||||
),
|
||||
color=discord.Color.green(),
|
||||
):
|
||||
return
|
||||
|
||||
answers = await self.ask_questions(member)
|
||||
if not answers:
|
||||
return
|
||||
embed = self.format_application(answers, member)
|
||||
await self.send_application(member, embed)
|
||||
|
||||
embeddedApplication = await self.format_application(answers, author)
|
||||
|
||||
# Call the sendApplication to check if the author is a member of the guild and send the application if they are
|
||||
await self.sendApplication(author, embeddedApplication)
|
||||
|
||||
|
||||
async def sendApplication(self, author: discord.Member, embeddedApplication: discord.Embed):
|
||||
# Check if the author is a member of the guild
|
||||
guild = author.guild
|
||||
member = guild.get_member(author.id)
|
||||
if member is None:
|
||||
await author.send("You need to join the server before your application can be processed.")
|
||||
return
|
||||
|
||||
# Send the embed to the application channel
|
||||
application_channel_id = await self.config.guild(guild).application_channel_id()
|
||||
if not application_channel_id:
|
||||
await author.send("The application channel has not been set. Please use the `setapplicationschannel` command to set it.")
|
||||
return
|
||||
|
||||
application_channel = guild.get_channel(application_channel_id)
|
||||
if application_channel is None:
|
||||
await author.send(f"The application channel with ID {application_channel_id} could not be found.")
|
||||
return
|
||||
|
||||
async def _send_embed(
|
||||
self,
|
||||
member: discord.Member,
|
||||
*,
|
||||
title: str,
|
||||
description: str,
|
||||
color: discord.Color,
|
||||
) -> bool:
|
||||
embed = discord.Embed(title=title, description=description, color=color)
|
||||
try:
|
||||
message = await application_channel.send(embed=embeddedApplication)
|
||||
await member.send(embed=embed)
|
||||
return True
|
||||
except discord.Forbidden:
|
||||
await author.send("I do not have permission to send messages in the application channel.")
|
||||
return
|
||||
return False
|
||||
|
||||
# Add reactions to the message
|
||||
async def ask_questions(self, member: discord.Member) -> Optional[Dict[str, str]]:
|
||||
answers: Dict[str, str] = {}
|
||||
for q in QUESTIONS_LIST:
|
||||
try:
|
||||
await self.add_reactions(message)
|
||||
await member.send(q["prompt"])
|
||||
except discord.Forbidden:
|
||||
await author.send("I do not have permission to add reactions to messages in the application channel.")
|
||||
return
|
||||
|
||||
# Assign the Trial role to the author
|
||||
role = guild.get_role(531181363420987423)
|
||||
return None
|
||||
try:
|
||||
await member.add_roles(role)
|
||||
except discord.Forbidden:
|
||||
await author.send("I do not have permission to assign roles.")
|
||||
return
|
||||
|
||||
await author.send("Thank you for submitting your application! You have been given the 'Trial' role.")
|
||||
|
||||
|
||||
async def add_reactions(self, message: discord.Message):
|
||||
reactions = ["✅", "❌", "❓"]
|
||||
for reaction in reactions:
|
||||
await message.add_reaction(reaction)
|
||||
|
||||
async def format_application(self, answers: List[str], author: discord.Member) -> discord.Embed:
|
||||
"""Format the application answers into an embed."""
|
||||
application_date = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||||
trial_end_date = (datetime.datetime.now() + datetime.timedelta(days=30)).strftime("%Y-%m-%d")
|
||||
|
||||
embed = discord.Embed(title=f"Application from {author.name}#{author.discriminator}", color=discord.Color.green())
|
||||
embed.set_thumbnail(url=author.avatar.url)
|
||||
embed.add_field(name="Name", value=answers[0])
|
||||
embed.add_field(name="Age", value=answers[1])
|
||||
embed.add_field(name="Country", value=answers[2])
|
||||
embed.add_field(name="Hobbies", value=answers[3])
|
||||
embed.add_field(name="Specific game?", value=answers[4])
|
||||
embed.add_field(name="\u200b", value="\u200b") # Empty field for spacing
|
||||
embed.add_field(name="Motivation for wanting to join:", value=answers[5], inline=False)
|
||||
embed.set_footer(text=f"Application received: {application_date}, Trial ends: {trial_end_date}")
|
||||
|
||||
return embed
|
||||
|
||||
async def ask_questions(self, author: discord.Member) -> List[str]:
|
||||
"""Ask the user several questions and return the answers."""
|
||||
questions = [
|
||||
"First of all, what is your name?",
|
||||
"What age are you?",
|
||||
"Where are you from?",
|
||||
"Do you have any hobbies?",
|
||||
"Are you wishing to join because of a particular game? If so, which game?",
|
||||
"Write out, in a free-style way, what your motivation is for wanting to join us in particular and how you would be a good fit for Kanium",
|
||||
]
|
||||
|
||||
answers = []
|
||||
|
||||
for question in questions:
|
||||
await author.send(question)
|
||||
|
||||
try:
|
||||
answer = await asyncio.wait_for(self.get_answers(author), timeout=300.0)
|
||||
answers.append(answer.content)
|
||||
msg = await asyncio.wait_for(
|
||||
self.bot.wait_for(
|
||||
"message",
|
||||
check=lambda m: m.author == member and isinstance(m.channel, discord.DMChannel),
|
||||
),
|
||||
timeout=300.0,
|
||||
)
|
||||
except asyncio.TimeoutError:
|
||||
await author.send("You took too long to answer. Please start over by using the application command again.")
|
||||
return []
|
||||
|
||||
try:
|
||||
await member.send("You took too long. Please run the application command again.")
|
||||
except discord.Forbidden:
|
||||
pass
|
||||
return None
|
||||
answers[q["key"]] = sanitize_input(msg.content)
|
||||
return answers
|
||||
|
||||
async def get_answers(self, author: discord.Member) -> discord.Message:
|
||||
"""Wait for the user to send a message."""
|
||||
return await self.bot.wait_for("message", check=lambda m: m.author == author and isinstance(m.channel, discord.DMChannel))
|
||||
def format_application(
|
||||
self, answers: Dict[str, str], member: discord.Member
|
||||
) -> discord.Embed:
|
||||
now = datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC")
|
||||
trial_ends = (
|
||||
datetime.datetime.utcnow() + datetime.timedelta(days=30)
|
||||
).strftime("%Y-%m-%d UTC")
|
||||
|
||||
embed = discord.Embed(
|
||||
title=f"Application from {member}", color=discord.Color.green()
|
||||
)
|
||||
embed.set_thumbnail(url=member.avatar.url)
|
||||
for q in QUESTIONS_LIST:
|
||||
embed.add_field(
|
||||
name=q["field_name"],
|
||||
value=answers[q["key"]],
|
||||
inline=q["inline"],
|
||||
)
|
||||
embed.set_footer(text=f"Received: {now} | Trial ends: {trial_ends}")
|
||||
return embed
|
||||
|
||||
async def send_application(
|
||||
self, member: discord.Member, embed: discord.Embed
|
||||
) -> None:
|
||||
guild = member.guild
|
||||
if guild is None:
|
||||
try:
|
||||
await member.send("You need to be in the server to apply.")
|
||||
except discord.Forbidden:
|
||||
pass
|
||||
return
|
||||
|
||||
channel_id = await self.config.guild(guild).application_channel_id()
|
||||
if not channel_id:
|
||||
try:
|
||||
await member.send(
|
||||
"Application channel not set. Ask an admin to run `setapplicationschannel`."
|
||||
)
|
||||
except discord.Forbidden:
|
||||
pass
|
||||
return
|
||||
|
||||
channel = guild.get_channel(channel_id)
|
||||
if not channel:
|
||||
try:
|
||||
await member.send("Application channel was not found. Please ask an admin to re-set it.")
|
||||
except discord.Forbidden:
|
||||
pass
|
||||
return
|
||||
|
||||
try:
|
||||
sent = await channel.send(embed=embed)
|
||||
await self.add_reactions(sent)
|
||||
except discord.Forbidden:
|
||||
try:
|
||||
await member.send("I cannot post or react in the application channel.")
|
||||
except discord.Forbidden:
|
||||
pass
|
||||
return
|
||||
|
||||
# Assign Trial role
|
||||
role = guild.get_role(531181363420987423)
|
||||
if role:
|
||||
try:
|
||||
await member.add_roles(role)
|
||||
await member.send("Thank you! You've been granted the Trial role.")
|
||||
except discord.Forbidden:
|
||||
try:
|
||||
await member.send("I lack permissions to assign roles.")
|
||||
except discord.Forbidden:
|
||||
pass
|
||||
|
||||
@staticmethod
|
||||
async def add_reactions(message: discord.Message) -> None:
|
||||
for emoji in ("✅", "❌", "❓"):
|
||||
await message.add_reaction(emoji)
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user