Rewriting from scratch

This commit is contained in:
unknown 2023-03-12 21:26:26 +01:00
parent 7699b4ad94
commit 8bc564bb6c
2 changed files with 42 additions and 289 deletions

View File

@ -1,6 +1,7 @@
from redbot.core.bot import Red from redbot.core import commands
from .recruitment import Recruitment from .recruitment import Recruitment
def setup(bot: Red): def setup(bot: commands.Bot) -> None:
bot.add_cog(Recruitment(bot)) cog = Recruitment(bot)
bot.add_cog(cog)

View File

@ -12,306 +12,58 @@ from redbot.core.utils.chat_formatting import pagify, box
from redbot.core.utils.antispam import AntiSpam from redbot.core.utils.antispam import AntiSpam
from redbot.core.bot import Red from redbot.core.bot import Red
from redbot.core.utils.predicates import MessagePredicate from redbot.core.utils.predicates import MessagePredicate
from redbot.core.utils.tunnel import Tunnel
application_channel_id = 1023172488143839252
class Recruitment(commands.Cog): class Recruitment(commands.Cog):
"""Create user applications that server staff can respond to. """A cog that lets a user send a membership application.
Users can open an application using `[p]apply`. These are then sent Users can open an application using `[!]apply`. These are then sent
to a channel in the server for staff, and the application creator to a channel in the server for staff, and the application creator
gets a DM. Both can be used to communicate. gets a DM. Both can be used to communicate.
""" """
default_guild_settings = {"output_channel": None, "active": False, "next_application": 1} def __init__(self, bot):
default_application = {"application": {}}
# This can be made configurable later if it
# becomes an issue.
# Intervals should be a list of tuples in the form
# (period: timedelta, max_frequency: int)
# see redbot/core/utils/antispam.py for more details
intervals = [
(timedelta(seconds=5), 1),
(timedelta(minutes=5), 3),
(timedelta(hours=1), 10),
(timedelta(days=1), 24),
]
def __init__(self, bot: Red):
super().__init__()
self.bot = bot self.bot = bot
self.config = Config.get_conf(self, 42631423034200142, force_registration=True) self.message: str = ''
self.config.register_guild(**self.default_guild_settings) self.active = True
self.config.init_custom("RECRUITMENT", 2)
self.config.register_custom("RECRUITMENT", **self.default_application)
self.antispam = {}
self.user_cache = []
self.tunnel_store = {}
# (guild, ticket#):
# {'tun': Tunnel, 'msgs': List[int]}
@property
def tunnels(self):
return [x["tun"] for x in self.tunnel_store.values()]
@checks.admin_or_permissions(manage_guild=True) @commands.group(name="apply", usage="[text]", invoke_without_command=True)
@commands.guild_only() async def apply(self, ctx: commands.Context, *, _apply: str = ""):
@commands.group(name="applicationset")
async def applicationset(self, ctx: commands.Context):
"""+++Manage Applications+++"""
pass
@checks.admin_or_permissions(manage_guild=True)
@applicationset.command(name="output")
async def applicationset_output(
self, ctx: commands.Context, channel: Union[discord.TextChannel, discord.VoiceChannel]
):
"""Set the channel where applications will be sent."""
await self.config.guild(ctx.guild).output_channel.set(channel.id)
await ctx.send("The application channel has been set.")
@checks.admin_or_permissions(manage_guild=True)
@applicationset.command(name="toggle", aliases=["toggleactive"])
async def applicationset_toggle(self, ctx: commands.Context):
"""Enable or disable applications for this server."""
active = await self.config.guild(ctx.guild).active()
active = not active
await self.config.guild(ctx.guild).active.set(active)
if active:
await ctx.send("Applications are now enabled")
else:
await ctx.send("Applications are now disabled")
async def internal_filter(self, m: discord.Member, mod=False, perms=None):
if perms and m.guild_permissions >= perms:
return True
if mod and await self.bot.is_mod(m):
return True
# The following line is for consistency with how perms are handled
# in Red, though I'm not sure it makes sense to use here.
if await self.bot.is_owner(m):
return True
async def discover_guild(
self,
author: discord.User,
*,
mod: bool = False,
permissions: Union[discord.Permissions, dict] = None,
prompt: str = "",
):
"""
discovers which of shared guilds between the bot
and provided user based on conditions (mod or permissions is an or)
prompt is for providing a user prompt for selection
"""
shared_guilds = []
if permissions is None:
perms = discord.Permissions()
elif isinstance(permissions, discord.Permissions):
perms = permissions
else:
perms = discord.Permissions(**permissions)
async for guild in AsyncIter(self.bot.guilds, steps=100):
x = guild.get_member(author.id)
if x is not None:
if await self.internal_filter(x, mod, perms):
shared_guilds.append(guild)
if len(shared_guilds) == 0:
raise ValueError("No Qualifying Shared Guilds")
if len(shared_guilds) == 1:
return shared_guilds[0]
output = ""
guilds = sorted(shared_guilds, key=lambda g: g.name)
for i, guild in enumerate(guilds, 1):
output += "{}: {}\n".format(i, guild.name)
output += "\n{}".format(prompt)
for page in pagify(output, delims=["\n"]):
await author.send(box(page))
try:
message = await self.bot.wait_for(
"message",
check=MessagePredicate.same_context(channel=author.dm_channel, user=author),
timeout=45,
)
except asyncio.TimeoutError:
await author.send("You took too long to select. Try again later.")
return None
try:
message = int(message.content.strip())
guild = guilds[message - 1]
except (ValueError, IndexError):
await author.send("That wasn't a valid choice.")
return None
else:
return guild
async def send_application(self, ctx: commands.Context, msg: discord.Message, guild: discord.Guild):
author = guild.get_member(msg.author.id)
application = msg.clean_content
channel_id = await self.config.guild(guild).output_channel()
channel = guild.get_channel(channel_id)
if channel is None:
return None
files: List[discord.File] = await Tunnel.files_from_attach(msg)
application_number = await self.config.guild(guild).next_application()
await self.config.guild(guild).next_application.set(application_number + 1)
if await self.bot.embed_requested(channel):
em = discord.Embed(description=application, colour=await ctx.embed_colour())
em.set_author(
name="Application from {author}{maybe_nick}".format(
author=author, maybe_nick=(f" ({author.nick})" if author.nick else "")
),
icon_url=author.display_avatar,
)
em.set_footer(text="Application #{}".format(application_number))
send_content = None
else:
em = None
send_content = "Application from {author.mention} (Application #{number})".format(
author=author, number=application_number
)
send_content += "\n" + application
try:
await Tunnel.message_forwarder(
destination=channel, content=send_content, embed=em, files=files
)
except (discord.Forbidden, discord.HTTPException):
return None
return application_number
@commands.group(name="application", usage="[text]", invoke_without_command=True)
async def application(self, ctx: commands.Context, *, _application: str = ""):
"""Send an application. """Send an application.
Use without arguments for an interactive experience, or do Use without arguments for an interactive application creation flow, or do
`[p]apply [text]` to use it non-interactively. `[p]apply [text]` to use it non-interactively.
""" """
author = ctx.author author = ctx.author
guild = ctx.guild
if guild is None:
guild = await self.discover_guild(
author, prompt="Select a server to make an application in by number."
)
if guild is None:
return
g_active = await self.config.guild(guild).active()
if not g_active:
return await author.send("Applications are currently closed")
if guild.id not in self.antispam:
self.antispam[guild.id] = {}
if author.id not in self.antispam[guild.id]:
self.antispam[guild.id][author.id] = AntiSpam(self.intervals)
if self.antispam[guild.id][author.id].spammy:
return await author.send(
"You've sent too many applications recently. "
"Are you sure you are in the right place? "
)
if author.id in self.user_cache:
return await author.send(
"Please finish making your prior application before trying to make an "
"additional one!"
)
self.user_cache.append(author.id)
if _application: # If there is no text argument, use an interactive flow
_m = copy(ctx.message) if not _apply:
_m.content = _application # Send a DM to the author to initiate the application
_m.content = _m.clean_content await author.send("Please answer the following questions to complete your application.")
val = await self.send_application(ctx, _m, guild) questions = ["What's your name?", "What's your age?", "Why do you want to join our community?"]
answers = []
# Ask the user the questions
for question in questions:
await author.send(question)
# Wait for the user to respond
answer = await self.bot.wait_for(
"message", check=lambda m: m.author == author and m.guild is None
)
answers.append(answer.content)
# Combine the answers into a single message and send to the application channel
application_text = "\n".join([f"{question}: {answer}" for question, answer in zip(questions, answers)])
application_channel = self.bot.get_channel(application_channel_id)
await application_channel.send(application_text)
# Send a confirmation message to the author
await author.send("Thank you for submitting your application!")
else: else:
try: # If there is a text argument, use a non-interactive flow
await author.send( application_channel = self.bot.get_channel(application_channel_id)
"Please respond to this message with your application." await application_channel.send(_apply)
"\nYour application should be a single message" await author.send("Thank you for submitting your application!")
)
except discord.Forbidden:
return await ctx.send("This requires DMs enabled.")
try:
message = await self.bot.wait_for(
"message",
check=MessagePredicate.same_context(ctx, channel=author.dm_channel),
timeout=180,
)
except asyncio.TimeoutError:
return await author.send("You took too long. Try again later.")
else:
val = await self.send_application(ctx, message, guild)
# Get the role to assign using its ID
trialRole_id = 531181363420987423
role = get(ctx.guild.roles, id=trialRole_id)
# Assign the role to the user who sent the application
if role is not None:
await author.add_roles(role)
with contextlib.suppress(discord.Forbidden, discord.HTTPException):
if val is None:
if await self.config.guild(ctx.guild).output_channel() is None:
await author.send(
"Hmm, most embarrassing, it rather seems Hatt has neglected to tell me where the applications room is. Please contact him for me."
)
else:
await author.send("Drat! There was an error sending your application, please contact Hatt.")
else:
await author.send("Your application was submitted. (Application #{})".format(val))
self.antispam[guild.id][author.id].stamp()
@application.after_invoke
async def application_cleanup(self, ctx: commands.Context):
"""
The logic is cleaner this way
"""
if ctx.author.id in self.user_cache:
self.user_cache.remove(ctx.author.id)
if ctx.guild and ctx.invoked_subcommand is None:
if ctx.bot_permissions.manage_messages:
try:
await ctx.message.delete()
except discord.NotFound:
pass
@commands.Cog.listener()
async def on_message(self, message: discord.Message):
to_remove = []
for k, v in self.tunnel_store.items():
guild, application_number = k
if await self.bot.cog_disabled_in_guild(self, guild):
to_remove.append(k)
continue
topic = "Re: application# {application_number} in {guild.name}".format(
application_number=application_number, guild=guild
)
# Tunnels won't forward unintended messages, this is safe
msgs = await v["tun"].communicate(message=message, topic=topic)
if msgs:
self.tunnel_store[k]["msgs"] = msgs
for key in to_remove:
if tun := self.tunnel_store.pop(key, None):
guild, application = key
await tun["tun"].close_because_disabled(
"Correspondence about application# {application_number} in "
"{guild.name} has been ended due "
"to applications being closed."
.format(application_number=application, guild=guild)
)