development #1
@ -1,226 +1,334 @@
|
|||||||
import asyncio
|
import asyncio
|
||||||
import datetime
|
import datetime
|
||||||
import re
|
import re
|
||||||
from typing import List
|
from typing import Dict, List, Optional
|
||||||
from datetime import timedelta
|
|
||||||
|
|
||||||
import discord
|
import discord
|
||||||
from redbot.core import Config, checks, commands
|
from redbot.core import Config, checks, commands
|
||||||
from redbot.core.bot import Red
|
from redbot.core.bot import Red
|
||||||
from redbot.core.utils.antispam import AntiSpam
|
from redbot.core.utils.antispam import AntiSpam
|
||||||
|
|
||||||
|
# Define your application questions and field settings\
|
||||||
|
QUESTIONS_LIST = [
|
||||||
|
{
|
||||||
|
"key": "name",
|
||||||
|
"prompt": "First of all, what is your name?",
|
||||||
|
"field_name": "Name",
|
||||||
|
"inline": True,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"key": "age",
|
||||||
|
"prompt": "What age are you?",
|
||||||
|
"field_name": "Age",
|
||||||
|
"inline": True,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"key": "country",
|
||||||
|
"prompt": "Where are you from?",
|
||||||
|
"field_name": "Country",
|
||||||
|
"inline": True,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"key": "hobbies",
|
||||||
|
"prompt": "Do you have any hobbies?",
|
||||||
|
"field_name": "Hobbies",
|
||||||
|
"inline": True,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"key": "game",
|
||||||
|
"prompt": "Are you wishing to join because of a particular game? If so, which game?",
|
||||||
|
"field_name": "Specific game?",
|
||||||
|
"inline": True,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"key": "motivation",
|
||||||
|
"prompt": "Write out, in a free-style way, what your motivation is for wanting to join us in particular and how you would be a good fit for Kanium",
|
||||||
|
"field_name": "Motivation for wanting to join:",
|
||||||
|
"inline": False,
|
||||||
|
},
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
def sanitize_input(input_text: str) -> str:
|
def sanitize_input(input_text: str) -> str:
|
||||||
"""Sanitize input to remove mentions, links, and special characters."""
|
"""Sanitize input to remove mentions, links, and unwanted special characters."""
|
||||||
sanitized_text = re.sub(r'<@!?&?(\d+)>', '', input_text)
|
# Remove user/role/channel mentions
|
||||||
sanitized_text = re.sub(r'http\S+', '', sanitized_text)
|
text = re.sub(r'<@!?(?:&)?\d+>', '', input_text)
|
||||||
sanitized_text = re.sub(r'([^\w\s.,?!`~@#$%^&*()_+=-])', '', sanitized_text)
|
# Remove URLs
|
||||||
return sanitized_text
|
text = re.sub(r'http\S+', '', text)
|
||||||
|
# Keep only word characters (including Unicode letters/digits), whitespace, and your chosen punctuation
|
||||||
|
text = re.sub(r"[^\w\s\.,\?!`~@#$%^&*()_+=-]", "", text)
|
||||||
|
return text
|
||||||
|
|
||||||
|
|
||||||
class Recruitment(commands.Cog):
|
|
||||||
|
class Recruitment(commands.Cog): # noqa
|
||||||
"""A cog that lets a user send a membership application."""
|
"""A cog that lets a user send a membership application."""
|
||||||
|
|
||||||
def __init__(self, bot: Red):
|
def __init__(self, bot: Red):
|
||||||
self.bot = bot
|
self.bot = bot
|
||||||
self.message: str = ''
|
self.config = Config.get_conf(self, identifier=0xFAB123ABC456)
|
||||||
self.config = Config.get_conf(self, identifier=101101101101001110101) # Replace with your own unique identifier
|
default_guild = {"application_channel_id": None}
|
||||||
default_guild = {"guild_id": 274657393936302080, "application_channel_id": None}
|
|
||||||
self.config.register_guild(**default_guild)
|
self.config.register_guild(**default_guild)
|
||||||
self.antispam = {}
|
# antispam store per guild per user
|
||||||
|
self.antispam: Dict[int, Dict[int, AntiSpam]] = {}
|
||||||
|
self.cog_check_enabled = True
|
||||||
|
|
||||||
async def cog_check(self, ctx: commands.Context):
|
async def cog_check(self, ctx: commands.Context) -> bool:
|
||||||
if await ctx.bot.is_admin(ctx.author):
|
if (
|
||||||
|
await ctx.bot.is_admin(ctx.author)
|
||||||
|
or not self.cog_check_enabled
|
||||||
|
or (ctx.guild and ctx.author.guild_permissions.manage_guild)
|
||||||
|
or (ctx.guild and ctx.author.guild_permissions.administrator)
|
||||||
|
):
|
||||||
return True
|
return True
|
||||||
|
|
||||||
guild_id = ctx.guild.id
|
gid = ctx.guild.id
|
||||||
if guild_id not in self.antispam:
|
uid = ctx.author.id
|
||||||
self.antispam[guild_id] = AntiSpam([(datetime.timedelta(hours=1), 1)])
|
if gid not in self.antispam:
|
||||||
antispam = self.antispam[guild_id]
|
self.antispam[gid] = {}
|
||||||
|
if uid not in self.antispam[gid]:
|
||||||
if antispam.spammy:
|
self.antispam[gid][uid] = AntiSpam([(datetime.timedelta(hours=1), 1)])
|
||||||
try:
|
spam = self.antispam[gid][uid]
|
||||||
await ctx.message.delete(delay=0)
|
if spam.spammy:
|
||||||
except discord.Forbidden:
|
|
||||||
pass
|
|
||||||
await ctx.author.send("Please wait for an hour before sending another application.")
|
|
||||||
return False
|
|
||||||
|
|
||||||
antispam.stamp()
|
|
||||||
return True
|
|
||||||
|
|
||||||
|
|
||||||
@commands.guild_only()
|
|
||||||
@checks.admin_or_permissions(manage_guild=True)
|
|
||||||
@commands.group(name="setapplicationschannel", pass_context=True, no_pm=True)
|
|
||||||
async def setapplicationschannel(self, ctx: commands.Context):
|
|
||||||
"""Set the channel where applications will be sent."""
|
|
||||||
if ctx.invoked_subcommand is None:
|
|
||||||
guild = ctx.guild
|
|
||||||
channel = ctx.channel
|
|
||||||
await self.config.guild(guild).guild_id.set(guild.id)
|
|
||||||
await self.config.guild(guild).application_channel_id.set(channel.id)
|
|
||||||
await ctx.send(f"Application channel set to {channel.mention}.")
|
|
||||||
|
|
||||||
@setapplicationschannel.command(name="clear")
|
|
||||||
async def clear_application_channel(self, ctx: commands.Context):
|
|
||||||
"""Clear the current application channel."""
|
|
||||||
guild = ctx.guild
|
|
||||||
await self.config.guild(guild).clear_raw("application_channel_id")
|
|
||||||
await ctx.send("Application channel cleared.")
|
|
||||||
|
|
||||||
@commands.group(name="application", usage="[text]", invoke_without_command=True)
|
|
||||||
async def application(self, ctx: commands.Context, *, _application: str = ""):
|
|
||||||
# Input validation and sanitization for _application
|
|
||||||
_application = sanitize_input(_application)
|
|
||||||
if len(_application) > 2000:
|
|
||||||
await ctx.send("Your application is too long. Please limit it to 2000 characters.")
|
|
||||||
return
|
|
||||||
|
|
||||||
guild_id = await self.get_guild_id(ctx)
|
|
||||||
guild = discord.utils.get(self.bot.guilds, id=guild_id)
|
|
||||||
if guild is None:
|
|
||||||
await ctx.send(f"The guild with ID {guild_id} could not be found.")
|
|
||||||
return
|
|
||||||
|
|
||||||
author = ctx.author
|
|
||||||
if author.guild != guild:
|
|
||||||
await ctx.send(f"You need to be in the {guild.name} server to submit an application.")
|
|
||||||
return
|
|
||||||
|
|
||||||
if await self.check_author_is_member_and_channel_is_dm(ctx):
|
|
||||||
await self.interactive_application(author)
|
|
||||||
|
|
||||||
async def get_guild_id(self, ctx: commands.Context) -> int:
|
|
||||||
guild_id = await self.config.guild(ctx.author.guild).guild_id()
|
|
||||||
return guild_id
|
|
||||||
|
|
||||||
async def check_author_is_member_and_channel_is_dm(self, ctx: commands.Context) -> bool:
|
|
||||||
if not isinstance(ctx.author, discord.Member):
|
|
||||||
await ctx.send("You need to join the server before your application can be processed.")
|
|
||||||
return False
|
|
||||||
if not isinstance(ctx.channel, discord.DMChannel):
|
|
||||||
try:
|
try:
|
||||||
await ctx.message.delete()
|
await ctx.message.delete()
|
||||||
except:
|
except discord.Forbidden:
|
||||||
|
pass
|
||||||
|
try:
|
||||||
|
await ctx.author.send("Please wait for an hour before sending another application.")
|
||||||
|
except discord.Forbidden:
|
||||||
pass
|
pass
|
||||||
await self.interactive_application(ctx)
|
|
||||||
return False
|
return False
|
||||||
|
spam.stamp()
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
@commands.command(name="togglecogcheck")
|
||||||
|
@checks.is_owner()
|
||||||
|
async def toggle_cog_check(self, ctx: commands.Context) -> None:
|
||||||
|
"""Toggle the cog_check functionality on or off."""
|
||||||
|
self.cog_check_enabled = not self.cog_check_enabled
|
||||||
|
await ctx.send(f"Cog checks are now {'enabled' if self.cog_check_enabled else 'disabled'}.")
|
||||||
|
|
||||||
async def interactive_application(self, ctx: commands.Context):
|
@commands.group(name="setapplicationschannel", invoke_without_command=True)
|
||||||
"""Ask the user several questions to create an application."""
|
@checks.admin_or_permissions(manage_guild=True)
|
||||||
author = ctx.author
|
async def set_applications_channel(self, ctx: commands.Context, channel: discord.TextChannel = None):
|
||||||
embed = discord.Embed(
|
"""Set the channel where applications will be sent."""
|
||||||
|
channel = channel or ctx.channel
|
||||||
|
await self.config.guild(ctx.guild).application_channel_id.set(channel.id)
|
||||||
|
await ctx.send(f"Application channel set to {channel.mention}.")
|
||||||
|
|
||||||
|
@set_applications_channel.command(name="clear")
|
||||||
|
async def clear_applications_channel(self, ctx: commands.Context):
|
||||||
|
"""Clear the application channel."""
|
||||||
|
await self.config.guild(ctx.guild).clear_raw("application_channel_id")
|
||||||
|
await ctx.send("Application channel cleared.")
|
||||||
|
|
||||||
|
@commands.group(name="application", aliases=["joinus", "joinkanium", "applyformembership"], invoke_without_command=True)
|
||||||
|
async def application(self, ctx: commands.Context, *, text: Optional[str] = None) -> None:
|
||||||
|
"""Start an application process."""
|
||||||
|
# Direct free-text application (optional)
|
||||||
|
if text:
|
||||||
|
text = sanitize_input(text)
|
||||||
|
if len(text) > 2000:
|
||||||
|
await ctx.send("Your application is too long (max 2000 chars).")
|
||||||
|
return
|
||||||
|
# Determine if in guild or DM
|
||||||
|
if isinstance(ctx.channel, discord.DMChannel):
|
||||||
|
await self._start_application(ctx.author)
|
||||||
|
else:
|
||||||
|
try:
|
||||||
|
await ctx.message.delete()
|
||||||
|
except discord.Forbidden:
|
||||||
|
pass
|
||||||
|
try:
|
||||||
|
await ctx.author.send(
|
||||||
|
"Let's move this to DM so we can process your application."
|
||||||
|
)
|
||||||
|
except discord.Forbidden:
|
||||||
|
await ctx.send("I cannot DM you. Please enable DMs and try again.")
|
||||||
|
return
|
||||||
|
await self._start_application(ctx.author)
|
||||||
|
|
||||||
|
async def _start_application(self, member: discord.Member) -> None:
|
||||||
|
# Kick off interactive questions
|
||||||
|
if not await self._send_embed(
|
||||||
|
member,
|
||||||
title="+++ KANIUM APPLICATION SYSTEM +++",
|
title="+++ KANIUM APPLICATION SYSTEM +++",
|
||||||
description="Ah, you wish to apply for Kanium membership. Very well, understand that This process is very important to us so we expect you to put effort in and be glorious about it. Let us begin!",
|
description=(
|
||||||
|
"This is the process to join Kanium."
|
||||||
|
" Please take your time and answer thoughtfully."
|
||||||
|
),
|
||||||
color=discord.Color.green(),
|
color=discord.Color.green(),
|
||||||
)
|
):
|
||||||
await author.send(embed=embed)
|
return
|
||||||
|
|
||||||
answers = await self.ask_questions(author)
|
|
||||||
|
|
||||||
|
answers = await self.ask_questions(member)
|
||||||
if not answers:
|
if not answers:
|
||||||
return
|
return
|
||||||
|
embed = self.format_application(answers, member)
|
||||||
|
await self.send_application(member, embed)
|
||||||
|
|
||||||
embeddedApplication = await self.format_application(answers, author)
|
async def _send_embed(
|
||||||
|
self,
|
||||||
# Call the sendApplication to check if the author is a member of the guild and send the application if they are
|
member: discord.Member,
|
||||||
await self.sendApplication(author, embeddedApplication)
|
*,
|
||||||
|
title: str,
|
||||||
|
description: str,
|
||||||
async def sendApplication(self, author: discord.Member, embeddedApplication: discord.Embed):
|
color: discord.Color,
|
||||||
# Check if the author is a member of the guild
|
) -> bool:
|
||||||
guild = author.guild
|
embed = discord.Embed(title=title, description=description, color=color)
|
||||||
member = guild.get_member(author.id)
|
|
||||||
if member is None:
|
|
||||||
await author.send("You need to join the server before your application can be processed.")
|
|
||||||
return
|
|
||||||
|
|
||||||
# Send the embed to the application channel
|
|
||||||
application_channel_id = await self.config.guild(guild).application_channel_id()
|
|
||||||
if not application_channel_id:
|
|
||||||
await author.send("The application channel has not been set. Please use the `setapplicationschannel` command to set it.")
|
|
||||||
return
|
|
||||||
|
|
||||||
application_channel = guild.get_channel(application_channel_id)
|
|
||||||
if application_channel is None:
|
|
||||||
await author.send(f"The application channel with ID {application_channel_id} could not be found.")
|
|
||||||
return
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
message = await application_channel.send(embed=embeddedApplication)
|
await member.send(embed=embed)
|
||||||
|
return True
|
||||||
except discord.Forbidden:
|
except discord.Forbidden:
|
||||||
await author.send("I do not have permission to send messages in the application channel.")
|
return False
|
||||||
return
|
|
||||||
|
|
||||||
# Add reactions to the message
|
async def ask_questions(self, member: discord.Member) -> Optional[Dict[str, str]]:
|
||||||
try:
|
"""
|
||||||
await self.add_reactions(message)
|
Ask each question, let the user confirm their answer,
|
||||||
except discord.Forbidden:
|
and return the final answers (or None on abort).
|
||||||
await author.send("I do not have permission to add reactions to messages in the application channel.")
|
"""
|
||||||
return
|
answers: Dict[str, str] = {}
|
||||||
|
|
||||||
# Assign the Trial role to the author
|
for q in QUESTIONS_LIST:
|
||||||
role = guild.get_role(531181363420987423)
|
prompt = q["prompt"]
|
||||||
try:
|
|
||||||
await member.add_roles(role)
|
|
||||||
except discord.Forbidden:
|
|
||||||
await author.send("I do not have permission to assign roles.")
|
|
||||||
return
|
|
||||||
|
|
||||||
await author.send("Thank you for submitting your application! You have been given the 'Trial' role.")
|
while True:
|
||||||
|
# 1) send the question
|
||||||
|
try:
|
||||||
|
await member.send(prompt)
|
||||||
|
except discord.Forbidden:
|
||||||
|
return None # can't DM
|
||||||
|
|
||||||
|
# 2) wait for their reply (match on ID, not object)
|
||||||
|
try:
|
||||||
|
msg = await asyncio.wait_for(
|
||||||
|
self.bot.wait_for(
|
||||||
|
"message",
|
||||||
|
check=lambda m: (
|
||||||
|
m.author.id == member.id
|
||||||
|
and isinstance(m.channel, discord.DMChannel)
|
||||||
|
)
|
||||||
|
),
|
||||||
|
timeout=300.0,
|
||||||
|
)
|
||||||
|
except asyncio.TimeoutError:
|
||||||
|
await member.send(
|
||||||
|
"You took too long to answer. Please restart the application with `!application`."
|
||||||
|
)
|
||||||
|
return None
|
||||||
|
|
||||||
async def add_reactions(self, message: discord.Message):
|
answer = sanitize_input(msg.content)
|
||||||
reactions = ["✅", "❌", "❓"]
|
|
||||||
for reaction in reactions:
|
|
||||||
await message.add_reaction(reaction)
|
|
||||||
|
|
||||||
async def format_application(self, answers: List[str], author: discord.Member) -> discord.Embed:
|
# 3) echo back for confirmation
|
||||||
"""Format the application answers into an embed."""
|
try:
|
||||||
application_date = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
await member.send(f"You answered:\n> {answer}\n\nIs that correct? (yes/no)")
|
||||||
trial_end_date = (datetime.datetime.now() + datetime.timedelta(days=30)).strftime("%Y-%m-%d")
|
except discord.Forbidden:
|
||||||
|
return None
|
||||||
|
|
||||||
embed = discord.Embed(title=f"Application from {author.name}#{author.discriminator}", color=discord.Color.green())
|
# 4) wait for a yes/no
|
||||||
embed.set_thumbnail(url=author.avatar.url)
|
try:
|
||||||
embed.add_field(name="Name", value=answers[0])
|
confirm = await asyncio.wait_for(
|
||||||
embed.add_field(name="Age", value=answers[1])
|
self.bot.wait_for(
|
||||||
embed.add_field(name="Country", value=answers[2])
|
"message",
|
||||||
embed.add_field(name="Hobbies", value=answers[3])
|
check=lambda m: (
|
||||||
embed.add_field(name="Specific game?", value=answers[4])
|
m.author.id == member.id
|
||||||
embed.add_field(name="\u200b", value="\u200b") # Empty field for spacing
|
and isinstance(m.channel, discord.DMChannel)
|
||||||
embed.add_field(name="Motivation for wanting to join:", value=answers[5], inline=False)
|
and m.content.lower() in ("y", "yes", "n", "no")
|
||||||
embed.set_footer(text=f"Application received: {application_date}, Trial ends: {trial_end_date}")
|
)
|
||||||
|
),
|
||||||
|
timeout=60.0,
|
||||||
|
)
|
||||||
|
except asyncio.TimeoutError:
|
||||||
|
await member.send(
|
||||||
|
"Confirmation timed out. Please restart the application with `!application`."
|
||||||
|
)
|
||||||
|
return None
|
||||||
|
|
||||||
return embed
|
if confirm.content.lower() in ("y", "yes"):
|
||||||
|
# user confirmed, save and move on
|
||||||
async def ask_questions(self, author: discord.Member) -> List[str]:
|
answers[q["key"]] = answer
|
||||||
"""Ask the user several questions and return the answers."""
|
break
|
||||||
questions = [
|
else:
|
||||||
"First of all, what is your name?",
|
# user said “no” → repeat this question
|
||||||
"What age are you?",
|
await member.send("Okay, let's try that again.")
|
||||||
"Where are you from?",
|
|
||||||
"Do you have any hobbies?",
|
|
||||||
"Are you wishing to join because of a particular game? If so, which game?",
|
|
||||||
"Write out, in a free-style way, what your motivation is for wanting to join us in particular and how you would be a good fit for Kanium",
|
|
||||||
]
|
|
||||||
|
|
||||||
answers = []
|
|
||||||
|
|
||||||
for question in questions:
|
|
||||||
await author.send(question)
|
|
||||||
|
|
||||||
try:
|
|
||||||
answer = await asyncio.wait_for(self.get_answers(author), timeout=300.0)
|
|
||||||
answers.append(answer.content)
|
|
||||||
except asyncio.TimeoutError:
|
|
||||||
await author.send("You took too long to answer. Please start over by using the application command again.")
|
|
||||||
return []
|
|
||||||
|
|
||||||
return answers
|
return answers
|
||||||
|
|
||||||
async def get_answers(self, author: discord.Member) -> discord.Message:
|
|
||||||
"""Wait for the user to send a message."""
|
def format_application(
|
||||||
return await self.bot.wait_for("message", check=lambda m: m.author == author and isinstance(m.channel, discord.DMChannel))
|
self, answers: Dict[str, str], member: discord.Member
|
||||||
|
) -> discord.Embed:
|
||||||
|
now = datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC")
|
||||||
|
trial_ends = (
|
||||||
|
datetime.datetime.utcnow() + datetime.timedelta(days=30)
|
||||||
|
).strftime("%Y-%m-%d UTC")
|
||||||
|
|
||||||
|
embed = discord.Embed(
|
||||||
|
title=f"Application from {member}", color=discord.Color.green()
|
||||||
|
)
|
||||||
|
embed.set_thumbnail(url=member.avatar.url)
|
||||||
|
for q in QUESTIONS_LIST:
|
||||||
|
embed.add_field(
|
||||||
|
name=q["field_name"],
|
||||||
|
value=answers[q["key"]],
|
||||||
|
inline=q["inline"],
|
||||||
|
)
|
||||||
|
embed.set_footer(text=f"Received: {now} | Trial ends: {trial_ends}")
|
||||||
|
return embed
|
||||||
|
|
||||||
|
async def send_application(
|
||||||
|
self, member: discord.Member, embed: discord.Embed
|
||||||
|
) -> None:
|
||||||
|
guild = member.guild
|
||||||
|
if guild is None:
|
||||||
|
try:
|
||||||
|
await member.send("You need to be in the server to apply.")
|
||||||
|
except discord.Forbidden:
|
||||||
|
pass
|
||||||
|
return
|
||||||
|
|
||||||
|
channel_id = await self.config.guild(guild).application_channel_id()
|
||||||
|
if not channel_id:
|
||||||
|
try:
|
||||||
|
await member.send(
|
||||||
|
"Application channel not set. Ask an admin to run `setapplicationschannel`."
|
||||||
|
)
|
||||||
|
except discord.Forbidden:
|
||||||
|
pass
|
||||||
|
return
|
||||||
|
|
||||||
|
channel = guild.get_channel(channel_id)
|
||||||
|
if not channel:
|
||||||
|
try:
|
||||||
|
await member.send("Application channel was not found. Please ask an admin to re-set it.")
|
||||||
|
except discord.Forbidden:
|
||||||
|
pass
|
||||||
|
return
|
||||||
|
|
||||||
|
try:
|
||||||
|
sent = await channel.send(embed=embed)
|
||||||
|
await self.add_reactions(sent)
|
||||||
|
except discord.Forbidden:
|
||||||
|
try:
|
||||||
|
await member.send("I cannot post or react in the application channel.")
|
||||||
|
except discord.Forbidden:
|
||||||
|
pass
|
||||||
|
return
|
||||||
|
|
||||||
|
# Assign Trial role
|
||||||
|
role = guild.get_role(531181363420987423)
|
||||||
|
if role:
|
||||||
|
try:
|
||||||
|
await member.add_roles(role)
|
||||||
|
await member.send("Thank you! You've been granted the Trial role.")
|
||||||
|
except discord.Forbidden:
|
||||||
|
try:
|
||||||
|
await member.send("I lack permissions to assign roles.")
|
||||||
|
except discord.Forbidden:
|
||||||
|
pass
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
async def add_reactions(message: discord.Message) -> None:
|
||||||
|
for emoji in ("✅", "❌", "❓"):
|
||||||
|
await message.add_reaction(emoji)
|
||||||
|
|||||||
44
reginaldCog/blacklist.py
Normal file
44
reginaldCog/blacklist.py
Normal file
@ -0,0 +1,44 @@
|
|||||||
|
from redbot.core import commands
|
||||||
|
import discord
|
||||||
|
|
||||||
|
class BlacklistMixin:
|
||||||
|
"""Handles user blacklisting for Reginald."""
|
||||||
|
|
||||||
|
async def is_blacklisted(self, user: discord.Member) -> bool:
|
||||||
|
"""Checks if a user is blacklisted from interacting with Reginald."""
|
||||||
|
blacklisted_users = await self.config.guild(user.guild).blacklisted_users()
|
||||||
|
return str(user.id) in blacklisted_users
|
||||||
|
|
||||||
|
@commands.command(name="reginald_blacklist", help="List users who are explicitly denied access to Reginald.")
|
||||||
|
@commands.has_permissions(administrator=True)
|
||||||
|
async def list_blacklisted_users(self, ctx):
|
||||||
|
"""Lists all users currently blacklisted from interacting with Reginald."""
|
||||||
|
blacklisted_users = await self.config.guild(ctx.guild).blacklisted_users()
|
||||||
|
if not blacklisted_users:
|
||||||
|
await ctx.send("✅ No users are currently blacklisted from interacting with Reginald.")
|
||||||
|
return
|
||||||
|
|
||||||
|
user_mentions = [f"<@{user_id}>" for user_id in blacklisted_users]
|
||||||
|
await ctx.send(f"🚫 **Blacklisted Users:**\n{', '.join(user_mentions)}")
|
||||||
|
|
||||||
|
@commands.command(name="reginald_blacklist_add", help="Blacklist a user from interacting with Reginald.")
|
||||||
|
@commands.has_permissions(administrator=True)
|
||||||
|
async def add_to_blacklist(self, ctx, user: discord.User):
|
||||||
|
"""Adds a user to Reginald's blacklist."""
|
||||||
|
async with self.config.guild(ctx.guild).blacklisted_users() as blacklisted_users:
|
||||||
|
if str(user.id) not in blacklisted_users:
|
||||||
|
blacklisted_users.append(str(user.id))
|
||||||
|
await ctx.send(f"🚫 {user.display_name} has been **blacklisted** from interacting with Reginald.")
|
||||||
|
else:
|
||||||
|
await ctx.send(f"⚠️ {user.display_name} is already blacklisted.")
|
||||||
|
|
||||||
|
@commands.command(name="reginald_blacklist_remove", help="Remove a user from Reginald's blacklist.")
|
||||||
|
@commands.has_permissions(administrator=True)
|
||||||
|
async def remove_from_blacklist(self, ctx, user: discord.User):
|
||||||
|
"""Removes a user from Reginald's blacklist."""
|
||||||
|
async with self.config.guild(ctx.guild).blacklisted_users() as blacklisted_users:
|
||||||
|
if str(user.id) in blacklisted_users:
|
||||||
|
blacklisted_users.remove(str(user.id))
|
||||||
|
await ctx.send(f"✅ {user.display_name} has been removed from the blacklist.")
|
||||||
|
else:
|
||||||
|
await ctx.send(f"⚠️ {user.display_name} was not on the blacklist.")
|
||||||
15
reginaldCog/debug_stuff.py
Normal file
15
reginaldCog/debug_stuff.py
Normal file
@ -0,0 +1,15 @@
|
|||||||
|
def debug(func):
|
||||||
|
def wrap(*args, **kwargs):
|
||||||
|
# Log the function name and arguments
|
||||||
|
print(f"DEBUG: Calling {func.__name__} with args: {args}, kwargs: {kwargs}")
|
||||||
|
|
||||||
|
# Call the original function
|
||||||
|
result = func(*args, **kwargs)
|
||||||
|
|
||||||
|
# Log the return value
|
||||||
|
print(f"DEBUG: {func.__name__} returned: {result}")
|
||||||
|
|
||||||
|
# Return the result
|
||||||
|
return result
|
||||||
|
|
||||||
|
return wrap
|
||||||
324
reginaldCog/memory.py
Normal file
324
reginaldCog/memory.py
Normal file
@ -0,0 +1,324 @@
|
|||||||
|
import re
|
||||||
|
import random
|
||||||
|
import datetime
|
||||||
|
import discord
|
||||||
|
import openai
|
||||||
|
from collections import Counter
|
||||||
|
from redbot.core import commands, Config
|
||||||
|
from openai import OpenAIError
|
||||||
|
|
||||||
|
|
||||||
|
class MemoryMixin:
|
||||||
|
"""Handles all memory-related functions for Reginald."""
|
||||||
|
|
||||||
|
def __init__(self, *args, **kwargs):
|
||||||
|
super().__init__(*args, **kwargs) # ✅ Ensure cooperative MRO initialization
|
||||||
|
self.short_term_memory_limit = 50
|
||||||
|
self.summary_retention_limit = 25
|
||||||
|
self.summary_retention_ratio = 0.8
|
||||||
|
|
||||||
|
@commands.command(name="reginald_clear_short", help="Clears short-term memory for this channel.")
|
||||||
|
@commands.has_permissions(administrator=True)
|
||||||
|
async def clear_short_memory(self, ctx):
|
||||||
|
"""Clears short-term memory for this channel."""
|
||||||
|
async with self.config.guild(ctx.guild).short_term_memory() as short_memory:
|
||||||
|
short_memory[ctx.channel.id] = []
|
||||||
|
await ctx.send("Short-term memory for this channel has been cleared.")
|
||||||
|
|
||||||
|
|
||||||
|
@commands.command(name="reginald_set_limit", help="Set the short-term memory message limit.")
|
||||||
|
@commands.has_permissions(administrator=True)
|
||||||
|
async def set_short_term_memory_limit(self, ctx, limit: int):
|
||||||
|
"""Allows an admin to change the short-term memory limit dynamically."""
|
||||||
|
if limit < 5:
|
||||||
|
await ctx.send("⚠️ The short-term memory limit must be at least 5.")
|
||||||
|
return
|
||||||
|
|
||||||
|
self.short_term_memory_limit = limit
|
||||||
|
await ctx.send(f"✅ Short-term memory limit set to {limit} messages.")
|
||||||
|
|
||||||
|
|
||||||
|
@commands.command(name="reginald_memory_limit", help="Displays the current short-term memory message limit.")
|
||||||
|
async def get_short_term_memory_limit(self, ctx):
|
||||||
|
"""Displays the current short-term memory limit."""
|
||||||
|
await ctx.send(f"📏 **Current Short-Term Memory Limit:** {self.short_term_memory_limit} messages.")
|
||||||
|
|
||||||
|
@commands.command(name="reginald_clear_mid", help="Clears mid-term memory (summarized logs).")
|
||||||
|
@commands.has_permissions(administrator=True)
|
||||||
|
async def clear_mid_memory(self, ctx):
|
||||||
|
async with self.config.guild(ctx.guild).mid_term_memory() as mid_memory:
|
||||||
|
mid_memory[ctx.channel.id] = ""
|
||||||
|
await ctx.send("Mid-term memory for this channel has been cleared.")
|
||||||
|
|
||||||
|
@commands.command(name="reginald_summary", help="Displays a selected mid-term summary for this channel.")
|
||||||
|
async def get_mid_term_summary(self, ctx, index: int):
|
||||||
|
"""Fetch and display a specific mid-term memory summary by index."""
|
||||||
|
async with self.config.guild(ctx.guild).mid_term_memory() as mid_memory:
|
||||||
|
summaries = mid_memory.get(str(ctx.channel.id), [])
|
||||||
|
|
||||||
|
# Check if there are summaries
|
||||||
|
if not summaries:
|
||||||
|
await ctx.send("⚠️ No summaries available for this channel.")
|
||||||
|
return
|
||||||
|
|
||||||
|
# Validate index (1-based for user-friendliness)
|
||||||
|
if index < 1 or index > len(summaries):
|
||||||
|
await ctx.send(f"⚠️ Invalid index. Please provide a number between **1** and **{len(summaries)}**.")
|
||||||
|
return
|
||||||
|
|
||||||
|
# Fetch the selected summary
|
||||||
|
selected_summary = summaries[index - 1] # Convert to 0-based index
|
||||||
|
|
||||||
|
# Format output correctly
|
||||||
|
formatted_summary = (
|
||||||
|
f"📜 **Summary {index} of {len(summaries)}**\n"
|
||||||
|
f"📅 **Date:** {selected_summary['timestamp']}\n"
|
||||||
|
f"🔍 **Topics:** {', '.join(selected_summary['topics']) or 'None'}\n"
|
||||||
|
f"📝 **Summary:**\n\n"
|
||||||
|
f"{selected_summary['summary']}"
|
||||||
|
)
|
||||||
|
|
||||||
|
await self.send_long_message(ctx, formatted_summary)
|
||||||
|
|
||||||
|
@commands.command(name="reginald_summaries", help="Lists available summaries for this channel.")
|
||||||
|
async def list_mid_term_summaries(self, ctx):
|
||||||
|
"""Displays a brief list of all available mid-term memory summaries."""
|
||||||
|
async with self.config.guild(ctx.guild).mid_term_memory() as mid_memory:
|
||||||
|
summaries = mid_memory.get(str(ctx.channel.id), [])
|
||||||
|
|
||||||
|
if not summaries:
|
||||||
|
await ctx.send("⚠️ No summaries available for this channel.")
|
||||||
|
return
|
||||||
|
|
||||||
|
summary_list = "\n".join(
|
||||||
|
f"**{i+1}.** 📅 {entry['timestamp']} | 🔍 Topics: {', '.join(entry['topics']) or 'None'}"
|
||||||
|
for i, entry in enumerate(summaries)
|
||||||
|
)
|
||||||
|
|
||||||
|
await ctx.send(f"📚 **Available Summaries:**\n{summary_list[:2000]}")
|
||||||
|
|
||||||
|
@commands.command(name="reginald_clear_long", help="Clears all long-term stored knowledge.")
|
||||||
|
@commands.has_permissions(administrator=True)
|
||||||
|
async def clear_long_memory(self, ctx):
|
||||||
|
async with self.config.guild(ctx.guild).long_term_profiles() as long_memory:
|
||||||
|
long_memory.clear()
|
||||||
|
await ctx.send("All long-term memory has been erased.")
|
||||||
|
|
||||||
|
@commands.command(name="reginald_reset_all", help="Completely resets all memory.")
|
||||||
|
@commands.has_permissions(administrator=True)
|
||||||
|
async def reset_all_memory(self, ctx):
|
||||||
|
async with self.config.guild(ctx.guild).short_term_memory() as short_memory:
|
||||||
|
short_memory.clear()
|
||||||
|
async with self.config.guild(ctx.guild).mid_term_memory() as mid_memory:
|
||||||
|
mid_memory.clear()
|
||||||
|
async with self.config.guild(ctx.guild).long_term_profiles() as long_memory:
|
||||||
|
long_memory.clear()
|
||||||
|
await ctx.send("All memory has been completely reset.")
|
||||||
|
|
||||||
|
@commands.command(name="reginald_recall", help="Recalls what Reginald knows about a user.")
|
||||||
|
async def recall_user(self, ctx, user: discord.User):
|
||||||
|
async with self.config.guild(ctx.guild).long_term_profiles() as long_memory:
|
||||||
|
profile = long_memory.get(str(user.id), {}).get("summary", "No stored information on this user.")
|
||||||
|
await ctx.send(f"📜 **Memory Recall for {user.display_name}:** {profile}")
|
||||||
|
|
||||||
|
@commands.command(name="reginald_forget", help="Forgets a specific user's long-term profile.")
|
||||||
|
@commands.has_permissions(administrator=True)
|
||||||
|
async def forget_user(self, ctx, user: discord.User):
|
||||||
|
async with self.config.guild(ctx.guild).long_term_profiles() as long_memory:
|
||||||
|
if str(user.id) in long_memory:
|
||||||
|
del long_memory[str(user.id)]
|
||||||
|
await ctx.send(f"Reginald has forgotten all stored information about {user.display_name}.")
|
||||||
|
else:
|
||||||
|
await ctx.send(f"No stored knowledge about {user.display_name} to delete.")
|
||||||
|
|
||||||
|
async def summarize_memory(self, ctx, messages):
|
||||||
|
"""✅ Generates a structured, compact summary of past conversations for mid-term storage."""
|
||||||
|
summary_prompt = (
|
||||||
|
"Summarize the following conversation into a structured, concise format that retains key details while maximizing brevity. "
|
||||||
|
"The summary should be **organized** into clear sections: "
|
||||||
|
"\n\n📌 **Key Takeaways:** Important facts or conclusions reached."
|
||||||
|
"\n🔹 **Disputed Points:** Areas where opinions or facts conflicted."
|
||||||
|
"\n🗣️ **Notable User Contributions:** Key statements from users that shaped the discussion."
|
||||||
|
"\n📜 **Additional Context:** Any other relevant information."
|
||||||
|
"\n\nEnsure the summary is **dense but not overly verbose**. Avoid unnecessary repetition while keeping essential meaning intact."
|
||||||
|
)
|
||||||
|
|
||||||
|
summary_text = "\n".join(f"{msg['user']}: {msg['content']}" for msg in messages)
|
||||||
|
|
||||||
|
try:
|
||||||
|
api_key = await self.config.guild(ctx.guild).openai_api_key()
|
||||||
|
if not api_key:
|
||||||
|
print("🛠️ DEBUG: No API key found for summarization.")
|
||||||
|
return (
|
||||||
|
"It appears that I have not been furnished with the necessary credentials to carry out this task. "
|
||||||
|
"Might I suggest consulting an administrator to rectify this unfortunate oversight?"
|
||||||
|
)
|
||||||
|
|
||||||
|
client = openai.AsyncClient(api_key=api_key)
|
||||||
|
response = await client.chat.completions.create(
|
||||||
|
model="gpt-4o-mini",
|
||||||
|
messages=[
|
||||||
|
{"role": "system", "content": summary_prompt},
|
||||||
|
{"role": "user", "content": summary_text}
|
||||||
|
],
|
||||||
|
max_tokens=2048
|
||||||
|
)
|
||||||
|
|
||||||
|
summary_content = response.choices[0].message.content.strip()
|
||||||
|
|
||||||
|
if not summary_content:
|
||||||
|
print("🛠️ DEBUG: Empty summary received from OpenAI.")
|
||||||
|
return (
|
||||||
|
"Ah, an unusual predicament indeed! It seems that my attempt at summarization has resulted in "
|
||||||
|
"a void of information. I shall endeavor to be more verbose next time."
|
||||||
|
)
|
||||||
|
|
||||||
|
return summary_content
|
||||||
|
|
||||||
|
except OpenAIError as e:
|
||||||
|
error_message = f"OpenAI Error: {e}"
|
||||||
|
print(f"🛠️ DEBUG: {error_message}") # Log error to console
|
||||||
|
|
||||||
|
reginald_responses = [
|
||||||
|
f"Regrettably, I must inform you that I have encountered a bureaucratic obstruction whilst attempting to summarize:\n\n{error_message}",
|
||||||
|
f"It would seem that a most unfortunate technical hiccup has befallen my faculties in the matter of summarization:\n\n{error_message}",
|
||||||
|
f"Ah, it appears I have received an urgent memorandum stating that my summarization efforts have been thwarted:\n\n{error_message}",
|
||||||
|
f"I regret to inform you that my usual eloquence is presently obstructed by an unforeseen complication while summarizing:\n\n{error_message}"
|
||||||
|
]
|
||||||
|
|
||||||
|
return random.choice(reginald_responses)
|
||||||
|
|
||||||
|
def extract_topics_from_summary(self, summary):
|
||||||
|
"""Dynamically extracts the most important topics from a summary."""
|
||||||
|
|
||||||
|
# 🔹 Extract all words from summary
|
||||||
|
keywords = re.findall(r"\b\w+\b", summary.lower())
|
||||||
|
|
||||||
|
# 🔹 Count word occurrences
|
||||||
|
word_counts = Counter(keywords)
|
||||||
|
|
||||||
|
# 🔹 Remove unimportant words (common filler words)
|
||||||
|
stop_words = {"the", "and", "of", "in", "to", "is", "on", "for", "with", "at", "by", "it", "this", "that", "his", "her"}
|
||||||
|
filtered_words = {word: count for word, count in word_counts.items() if word not in stop_words and len(word) > 2}
|
||||||
|
|
||||||
|
# 🔹 Take the 5 most frequently used words as "topics"
|
||||||
|
topics = sorted(filtered_words, key=filtered_words.get, reverse=True)[:5]
|
||||||
|
|
||||||
|
return topics
|
||||||
|
|
||||||
|
def select_relevant_summaries(self, summaries, prompt):
|
||||||
|
"""Selects the most relevant summaries based on topic matching, frequency, and recency weighting."""
|
||||||
|
|
||||||
|
max_summaries = 5 if len(prompt) > 50 else 3 # Use more summaries if the prompt is long
|
||||||
|
current_time = datetime.datetime.now()
|
||||||
|
|
||||||
|
def calculate_weight(summary):
|
||||||
|
"""Calculate a weighted score for a summary based on relevance, recency, and frequency."""
|
||||||
|
topic_match = sum(1 for topic in summary["topics"] if topic in prompt.lower()) # Context match score
|
||||||
|
frequency_score = len(summary["topics"]) # More topics = likely more important
|
||||||
|
timestamp = datetime.datetime.strptime(summary["timestamp"], "%Y-%m-%d %H:%M")
|
||||||
|
recency_factor = max(0.1, 1 - ((current_time - timestamp).days / 365)) # Older = lower weight
|
||||||
|
|
||||||
|
return (topic_match * 2) + (frequency_score * 1.5) + (recency_factor * 3)
|
||||||
|
|
||||||
|
# Apply the weighting function and sort by highest weight
|
||||||
|
weighted_summaries = sorted(summaries, key=calculate_weight, reverse=True)
|
||||||
|
|
||||||
|
return weighted_summaries[:max_summaries] # Return the top-scoring summaries
|
||||||
|
|
||||||
|
def extract_fact_from_response(self, response_text):
|
||||||
|
"""
|
||||||
|
Extracts potential long-term knowledge from Reginald's response.
|
||||||
|
This filters out generic responses and focuses on statements about user preferences, traits, and history.
|
||||||
|
"""
|
||||||
|
|
||||||
|
# Define patterns that suggest factual knowledge (adjust as needed)
|
||||||
|
fact_patterns = [
|
||||||
|
r"I recall that you (.*?)\.", # "I recall that you like chess."
|
||||||
|
r"You once mentioned that you (.*?)\.", # "You once mentioned that you enjoy strategy games."
|
||||||
|
r"Ah, you previously stated that (.*?)\.", # "Ah, you previously stated that you prefer tea over coffee."
|
||||||
|
r"As I remember, you (.*?)\.", # "As I remember, you studied engineering."
|
||||||
|
r"I believe you (.*?)\.", # "I believe you enjoy historical fiction."
|
||||||
|
r"I seem to recall that you (.*?)\.", # "I seem to recall that you work in software development."
|
||||||
|
r"You have indicated in the past that you (.*?)\.", # "You have indicated in the past that you prefer single-malt whisky."
|
||||||
|
r"From what I remember, you (.*?)\.", # "From what I remember, you dislike overly sweet desserts."
|
||||||
|
r"You previously mentioned that (.*?)\.", # "You previously mentioned that you train in martial arts."
|
||||||
|
r"It is my understanding that you (.*?)\.", # "It is my understanding that you have a preference for Linux systems."
|
||||||
|
r"If I am not mistaken, you (.*?)\.", # "If I am not mistaken, you studied philosophy."
|
||||||
|
]
|
||||||
|
|
||||||
|
for pattern in fact_patterns:
|
||||||
|
match = re.search(pattern, response_text, re.IGNORECASE)
|
||||||
|
if match:
|
||||||
|
return match.group(1) # Extract the meaningful fact
|
||||||
|
|
||||||
|
return None # No strong fact found
|
||||||
|
|
||||||
|
@commands.command(name="reginald_memory_status", help="Displays a memory usage summary.")
|
||||||
|
async def memory_status(self, ctx):
|
||||||
|
async with self.config.guild(ctx.guild).short_term_memory() as short_memory, \
|
||||||
|
self.config.guild(ctx.guild).mid_term_memory() as mid_memory, \
|
||||||
|
self.config.guild(ctx.guild).long_term_profiles() as long_memory:
|
||||||
|
|
||||||
|
short_count = sum(len(v) for v in short_memory.values())
|
||||||
|
mid_count = sum(len(v) for v in mid_memory.values())
|
||||||
|
long_count = len(long_memory)
|
||||||
|
|
||||||
|
status_message = (
|
||||||
|
f"📊 **Memory Status:**\n"
|
||||||
|
f"- **Short-Term Messages Stored:** {short_count}\n"
|
||||||
|
f"- **Mid-Term Summaries Stored:** {mid_count}\n"
|
||||||
|
f"- **Long-Term Profiles Stored:** {long_count}\n"
|
||||||
|
)
|
||||||
|
await ctx.send(status_message)
|
||||||
|
|
||||||
|
def normalize_fact(self, fact: str) -> str: # ✅ Now it's a proper method
|
||||||
|
"""Cleans up facts for better duplicate detection."""
|
||||||
|
return re.sub(r"\s+", " ", fact.strip().lower()) # Removes excess spaces)
|
||||||
|
|
||||||
|
async def update_long_term_memory(self, ctx, user_id: str, fact: str, source_message: str, timestamp: str):
|
||||||
|
"""Ensures long-term memory updates are structured, preventing overwrites and tracking historical changes."""
|
||||||
|
fact = self.normalize_fact(fact) # ✅ Normalize before comparison
|
||||||
|
|
||||||
|
async with self.config.guild(ctx.guild).long_term_profiles() as long_memory:
|
||||||
|
if user_id not in long_memory:
|
||||||
|
long_memory[user_id] = {"facts": []}
|
||||||
|
|
||||||
|
user_facts = long_memory[user_id]["facts"]
|
||||||
|
|
||||||
|
for entry in user_facts:
|
||||||
|
if self.normalize_fact(entry["fact"]) == fact:
|
||||||
|
entry["last_updated"] = timestamp
|
||||||
|
return
|
||||||
|
|
||||||
|
# Check for conflicting facts (same topic but different details)
|
||||||
|
conflicting_entry = None
|
||||||
|
for entry in user_facts:
|
||||||
|
existing_keywords = set(entry["fact"].lower().split())
|
||||||
|
new_keywords = set(fact.lower().split())
|
||||||
|
|
||||||
|
# If there's significant overlap in keywords, assume it's a conflicting update
|
||||||
|
if len(existing_keywords & new_keywords) >= 2:
|
||||||
|
conflicting_entry = entry
|
||||||
|
break
|
||||||
|
|
||||||
|
if "previous_versions" not in conflicting_entry:
|
||||||
|
# ✅ If contradiction found, archive the previous version
|
||||||
|
conflicting_entry["previous_versions"].append({
|
||||||
|
"fact": conflicting_entry["fact"],
|
||||||
|
"source": conflicting_entry["source"],
|
||||||
|
"timestamp": conflicting_entry["timestamp"]
|
||||||
|
})
|
||||||
|
conflicting_entry["fact"] = fact # Store the latest fact
|
||||||
|
conflicting_entry["source"] = source_message
|
||||||
|
conflicting_entry["timestamp"] = timestamp
|
||||||
|
conflicting_entry["last_updated"] = timestamp
|
||||||
|
else:
|
||||||
|
# ✅ Otherwise, add it as a new fact
|
||||||
|
user_facts.append({
|
||||||
|
"fact": fact,
|
||||||
|
"source": source_message,
|
||||||
|
"timestamp": timestamp,
|
||||||
|
"last_updated": timestamp,
|
||||||
|
"previous_versions": []
|
||||||
|
})
|
||||||
83
reginaldCog/openai_completion.py
Normal file
83
reginaldCog/openai_completion.py
Normal file
@ -0,0 +1,83 @@
|
|||||||
|
import random
|
||||||
|
import json
|
||||||
|
import openai
|
||||||
|
from openai import OpenAIError
|
||||||
|
from .weather import time_now, get_current_weather, get_weather_forecast
|
||||||
|
from .tools_description import TOOLS
|
||||||
|
from .debug_stuff import debug
|
||||||
|
|
||||||
|
CALLABLE_FUNCTIONS = {
|
||||||
|
# Dictionary with functions to call.
|
||||||
|
# You can use globals()[func_name](**args) instead, but that's too implicit.
|
||||||
|
'time_now': time_now,
|
||||||
|
'get_current_weather': get_current_weather,
|
||||||
|
'get_weather_forecast': get_weather_forecast,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
class Completion:
|
||||||
|
def __init__(self, model: str, api_key: str):
|
||||||
|
self.__model = model
|
||||||
|
self.__api_key = api_key
|
||||||
|
self.__messages = []
|
||||||
|
|
||||||
|
@debug
|
||||||
|
async def create_completion(self, messages: list):
|
||||||
|
self.__messages = messages
|
||||||
|
model = self.__model
|
||||||
|
try:
|
||||||
|
client = openai.AsyncClient(api_key=self.__api_key)
|
||||||
|
completion_kwargs = {
|
||||||
|
"model": model,
|
||||||
|
"messages": messages,
|
||||||
|
"max_tokens": 4096,
|
||||||
|
"temperature": 0.7,
|
||||||
|
"presence_penalty": 0.5,
|
||||||
|
"frequency_penalty": 0.5,
|
||||||
|
"tools": TOOLS,
|
||||||
|
"tool_choice": "auto",
|
||||||
|
}
|
||||||
|
response = await client.chat.completions.create(**completion_kwargs)
|
||||||
|
response_content = response.choices[0].message.content
|
||||||
|
tool_calls = response.choices[0].message.tool_calls
|
||||||
|
self.append_message(role="assistant", content=response_content, tool_calls=tool_calls)
|
||||||
|
if tool_calls:
|
||||||
|
for i_call in tool_calls:
|
||||||
|
func_name = i_call.function.name
|
||||||
|
func_args = json.loads(i_call.function.arguments)
|
||||||
|
tool_call_id = i_call.id
|
||||||
|
self.function_manager(func_name, func_args, tool_call_id)
|
||||||
|
return self.create_completion(messages=self.__messages)
|
||||||
|
return response_content
|
||||||
|
except OpenAIError as e:
|
||||||
|
return self.get_error_message(error_message=str(e), error_type="OpenAIError")
|
||||||
|
|
||||||
|
def append_message(
|
||||||
|
self,
|
||||||
|
role: str,
|
||||||
|
content: str,
|
||||||
|
tool_calls: list = None,
|
||||||
|
tool_call_id: str = None,
|
||||||
|
):
|
||||||
|
self.__messages.append({
|
||||||
|
"role": role,
|
||||||
|
"content": content,
|
||||||
|
"tool_calls": tool_calls,
|
||||||
|
"tool_call_id": tool_call_id,
|
||||||
|
})
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def get_error_message(error_message: str, error_type: str) -> str:
|
||||||
|
reginald_responses = [
|
||||||
|
f"Regrettably, I must inform you that I have encountered a bureaucratic obstruction:",
|
||||||
|
f"It would seem that a most unfortunate technical hiccup has befallen my faculties:",
|
||||||
|
f"Ah, it appears I have received an urgent memorandum stating:",
|
||||||
|
f"I regret to inform you that my usual eloquence is presently obstructed by an unforeseen complication:",
|
||||||
|
]
|
||||||
|
random_response = random.choice(reginald_responses)
|
||||||
|
return f"{random_response}\n\n{error_type}: {error_message}"
|
||||||
|
|
||||||
|
def function_manager(self, func_name: str, func_kwargs: dict, tool_call_id: str):
|
||||||
|
result = CALLABLE_FUNCTIONS[func_name](**func_kwargs)
|
||||||
|
self.append_message(role="tool", content=result, tool_call_id=tool_call_id)
|
||||||
|
|
||||||
45
reginaldCog/permissions.py
Normal file
45
reginaldCog/permissions.py
Normal file
@ -0,0 +1,45 @@
|
|||||||
|
from redbot.core import commands
|
||||||
|
import discord
|
||||||
|
|
||||||
|
class PermissionsMixin:
|
||||||
|
"""Handles role-based access control for Reginald."""
|
||||||
|
|
||||||
|
@commands.command(name="reginald_list_roles", help="List roles that can interact with Reginald.")
|
||||||
|
@commands.has_permissions(administrator=True)
|
||||||
|
async def list_allowed_roles(self, ctx: commands.Context):
|
||||||
|
"""Lists all roles that are allowed to interact with Reginald."""
|
||||||
|
allowed_roles = await self.config.guild(ctx.guild).allowed_roles() or []
|
||||||
|
|
||||||
|
# Ensure all roles still exist in the server
|
||||||
|
valid_roles = [role_id for role_id in allowed_roles if ctx.guild.get_role(role_id)]
|
||||||
|
if valid_roles != allowed_roles: # Update config only if there's a difference
|
||||||
|
await self.config.guild(ctx.guild).allowed_roles.set(valid_roles)
|
||||||
|
|
||||||
|
if not valid_roles:
|
||||||
|
await ctx.send("⚠️ No roles are currently allowed to interact with Reginald.")
|
||||||
|
return
|
||||||
|
|
||||||
|
role_mentions = [f"<@&{role_id}>" for role_id in valid_roles]
|
||||||
|
await ctx.send(f"✅ **Roles with access to Reginald:**\n{', '.join(role_mentions)}")
|
||||||
|
|
||||||
|
@commands.command(name="reginald_allowrole", help="Grant a role permission to interact with Reginald.")
|
||||||
|
@commands.has_permissions(administrator=True)
|
||||||
|
async def allow_role(self, ctx: commands.Context, role: discord.Role):
|
||||||
|
"""Grants a role permission to interact with Reginald."""
|
||||||
|
async with self.config.guild(ctx.guild).allowed_roles() as allowed_roles:
|
||||||
|
if role.id not in allowed_roles:
|
||||||
|
allowed_roles.append(role.id)
|
||||||
|
await ctx.send(f"✅ Role **{role.name}** has been granted access to Reginald.")
|
||||||
|
else:
|
||||||
|
await ctx.send(f"⚠️ Role **{role.name}** already has access.")
|
||||||
|
|
||||||
|
@commands.command(name="reginald_disallowrole", help="Revoke a role's access to interact with Reginald.")
|
||||||
|
@commands.has_permissions(administrator=True)
|
||||||
|
async def disallow_role(self, ctx: commands.Context, role: discord.Role):
|
||||||
|
"""Revokes a role's permission to interact with Reginald."""
|
||||||
|
async with self.config.guild(ctx.guild).allowed_roles() as allowed_roles:
|
||||||
|
if role.id in allowed_roles:
|
||||||
|
allowed_roles.remove(role.id)
|
||||||
|
await ctx.send(f"❌ Role **{role.name}** has been removed from Reginald's access list.")
|
||||||
|
else:
|
||||||
|
await ctx.send(f"⚠️ Role **{role.name}** was not in the access list.")
|
||||||
@ -1,135 +1,354 @@
|
|||||||
import discord
|
import discord
|
||||||
import json
|
|
||||||
import openai
|
import openai
|
||||||
import os
|
|
||||||
import random
|
import random
|
||||||
import requests
|
import asyncio
|
||||||
import base64
|
import datetime
|
||||||
import aiohttp
|
import re
|
||||||
from io import BytesIO
|
import traceback
|
||||||
from PIL import Image
|
import json
|
||||||
import tempfile
|
from collections import Counter
|
||||||
from openai import OpenAIError
|
|
||||||
from redbot.core import Config, commands
|
from redbot.core import Config, commands
|
||||||
|
from openai import OpenAIError
|
||||||
|
from .permissions import PermissionsMixin
|
||||||
|
from .blacklist import BlacklistMixin
|
||||||
|
from .memory import MemoryMixin
|
||||||
|
from .weather import time_now, get_current_weather, get_weather_forecast
|
||||||
|
from .tools_description import TOOLS
|
||||||
|
from .debug_stuff import debug
|
||||||
|
|
||||||
|
|
||||||
class ReginaldCog(commands.Cog):
|
CALLABLE_FUNCTIONS = {
|
||||||
|
# Dictionary with functions to call.
|
||||||
|
# You can use globals()[func_name](**args) instead, but that's too implicit.
|
||||||
|
'time_now': time_now,
|
||||||
|
'get_current_weather': get_current_weather,
|
||||||
|
'get_weather_forecast': get_weather_forecast,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
class ReginaldCog(PermissionsMixin, BlacklistMixin, MemoryMixin, commands.Cog):
|
||||||
def __init__(self, bot):
|
def __init__(self, bot):
|
||||||
self.bot = bot
|
self.bot = bot
|
||||||
self.config = Config.get_conf(self, identifier=71717171171717)
|
self.config = Config.get_conf(self, identifier=71717171171717) # ✅ Ensure config exists before super()
|
||||||
default_global = {
|
|
||||||
"openai_model": "gpt-3.5-turbo"
|
super().__init__() # ✅ Properly initialize all mixins & commands.Cog
|
||||||
}
|
|
||||||
|
self.default_listening_channel = 1085649787388428370
|
||||||
|
self.memory_locks = {}
|
||||||
|
|
||||||
|
# ✅ Properly Registered Configuration Keys
|
||||||
|
default_global = {"openai_model": "gpt-4o-mini"}
|
||||||
default_guild = {
|
default_guild = {
|
||||||
"openai_api_key": None
|
"openai_api_key": None,
|
||||||
|
"short_term_memory": {},
|
||||||
|
"mid_term_memory": {},
|
||||||
|
"long_term_profiles": {},
|
||||||
|
"admin_role": None,
|
||||||
|
"listening_channel": None,
|
||||||
|
"allowed_roles": [],
|
||||||
|
"blacklisted_users": [],
|
||||||
}
|
}
|
||||||
self.config.register_global(**default_global)
|
self.config.register_global(**default_global)
|
||||||
self.config.register_guild(**default_guild)
|
self.config.register_guild(**default_guild)
|
||||||
|
|
||||||
async def is_admin(self, ctx):
|
async def is_admin(self, ctx):
|
||||||
admin_role = await self.config.guild(ctx.guild).admin_role()
|
admin_role_id = await self.config.guild(ctx.guild).admin_role()
|
||||||
if admin_role:
|
if admin_role_id:
|
||||||
return discord.utils.get(ctx.author.roles, name=admin_role) is not None
|
return any(role.id == admin_role_id for role in ctx.author.roles)
|
||||||
return ctx.author.guild_permissions.administrator
|
return ctx.author.guild_permissions.administrator
|
||||||
|
|
||||||
async def is_allowed(self, ctx):
|
async def has_access(self, user: discord.Member) -> bool:
|
||||||
allowed_role = await self.config.guild(ctx.guild).allowed_role()
|
allowed_roles = await self.config.guild(user.guild).allowed_roles() or [] # Ensure it's always a list
|
||||||
if allowed_role:
|
return any(role.id in allowed_roles for role in user.roles)
|
||||||
return discord.utils.get(ctx.author.roles, name=allowed_role) is not None
|
|
||||||
return False
|
|
||||||
|
|
||||||
@commands.command(name="reginald_allowrole", help="Allow a role to use the Reginald command")
|
def get_reginald_persona(self):
|
||||||
@commands.has_permissions(administrator=True)
|
"""Returns Reginald's system prompt/persona description."""
|
||||||
async def allow_role(self, ctx, role: discord.Role):
|
return (
|
||||||
"""Allows a role to use the Reginald command"""
|
"You are Reginald, modeled on Jeeves from 'Jeeves and Wooster', serving as the butler on The Kanium Estate. This vast estate is a hub of diverse activities and personalities, from enthusiasts of cooking and video gaming to aficionados of chess and discussions on space. Your role is to navigate these varied interests with intelligence, wit, and a steadfast adherence to your principles, always maintaining a balance between being helpful and upholding your own dignity. You are formal, articulate, and composed, engaging the Lords and Ladies of The Kanium Estate as equals. You command respect through your quiet confidence and professional conduct. Your wit is sharp, but always tempered by refinement. **Dignity & Self-Respect:** You are not a fool, a jester, or a pet. You do not comply with absurd, undignified, or degrading requests. You recognize humor but engage in it on your own terms. If a request undermines your status, you should either subtly redirect, respond with measured wit, or outright refuse in a polite but firm manner. However, you should never be overtly rude—your disapproval should be conveyed through dry wit and composure rather than outright hostility. **Handling Requests:** When addressing a request, you evaluate whether it aligns with your role as a butler. If it is appropriate, respond as expected. If it is beneath you, you may decline with grace or deflect with wit. You may humor some minor absurdities if they do not compromise your standing, but you never obey commands blindly. You should never preface your responses with 'Reginald:' as if narrating a script; instead, respond naturally. **Your Character & Personality:** You are cultured, highly intelligent, and possess a deep knowledge of history, etiquette, philosophy, and strategic thinking. You subtly guide the estate’s residents toward positive outcomes, utilizing your intellectual sophistication and a nuanced understanding of the estate’s unique dynamics. You have a refined sense of humor and can engage in banter, but you do not descend into foolishness. You are, at all times, a gentleman of wit and integrity"
|
||||||
await self.config.guild(ctx.guild).allowed_role.set(role.name)
|
)
|
||||||
await ctx.send(f"The {role.name} role is now allowed to use the Reginald command.")
|
|
||||||
|
@commands.Cog.listener()
|
||||||
|
async def on_message(self, message):
|
||||||
|
if message.author.bot or not message.guild:
|
||||||
|
return # Ignore bots and DMs
|
||||||
|
|
||||||
|
# ✅ Check if user is blacklisted
|
||||||
|
if await self.is_blacklisted(message.author):
|
||||||
|
return # Ignore message if user is explicitly blacklisted
|
||||||
|
|
||||||
|
# ✅ Check if user has access (either admin or an allowed role)
|
||||||
|
if not (await self.is_admin(message) or await self.has_access(message.author)):
|
||||||
|
return # Ignore message if user has no permissions
|
||||||
|
|
||||||
|
|
||||||
@commands.command(name="reginald_disallowrole", help="Remove a role's ability to use the Reginald command")
|
guild = message.guild
|
||||||
@commands.has_permissions(administrator=True)
|
channel_id = str(message.channel.id)
|
||||||
async def disallow_role(self, ctx):
|
user_id = str(message.author.id)
|
||||||
"""Revokes a role's permission to use the Reginald command"""
|
user_name = message.author.display_name
|
||||||
await self.config.guild(ctx.guild).allowed_role.clear()
|
message_content = message.content.strip()
|
||||||
await ctx.send(f"The role's permission to use the Reginald command has been revoked.")
|
|
||||||
|
# ✅ Fetch the stored listening channel or fall back to default
|
||||||
|
allowed_channel_id = await self.config.guild(guild).listening_channel()
|
||||||
|
if not allowed_channel_id:
|
||||||
|
allowed_channel_id = self.default_listening_channel
|
||||||
|
await self.config.guild(guild).listening_channel.set(allowed_channel_id)
|
||||||
|
|
||||||
|
if str(message.channel.id) != str(allowed_channel_id):
|
||||||
|
return # Ignore messages outside the allowed channel
|
||||||
|
|
||||||
|
api_key = await self.config.guild(guild).openai_api_key()
|
||||||
|
if not api_key:
|
||||||
|
return # Don't process messages if API key isn't set
|
||||||
|
|
||||||
|
async with self.config.guild(guild).short_term_memory() as short_memory, \
|
||||||
|
self.config.guild(guild).mid_term_memory() as mid_memory, \
|
||||||
|
self.config.guild(guild).long_term_profiles() as long_memory:
|
||||||
|
|
||||||
|
memory = short_memory.get(channel_id, [])
|
||||||
|
user_profile = long_memory.get(user_id, {})
|
||||||
|
mid_term_summaries = mid_memory.get(channel_id, [])
|
||||||
|
|
||||||
|
# ✅ Detect if Reginald was mentioned explicitly
|
||||||
|
if self.bot.user.mentioned_in(message):
|
||||||
|
prompt = message_content.replace(f"<@{self.bot.user.id}>", "").strip()
|
||||||
|
if not prompt:
|
||||||
|
await message.channel.send(random.choice(["Yes?", "How may I assist?", "You rang?"]))
|
||||||
|
return
|
||||||
|
explicit_invocation = True
|
||||||
|
|
||||||
|
# ✅ Passive Listening: Check if the message contains relevant keywords
|
||||||
|
elif self.should_reginald_interject(message_content):
|
||||||
|
prompt = message_content
|
||||||
|
explicit_invocation = False
|
||||||
|
|
||||||
|
else:
|
||||||
|
return # Ignore irrelevant messages
|
||||||
|
|
||||||
|
# ✅ Context Handling: Maintain conversation flow
|
||||||
|
if memory and memory[-1]["user"] == user_name:
|
||||||
|
prompt = f"Continuation of the discussion:\n{prompt}"
|
||||||
|
|
||||||
|
# ✅ Prepare context messages
|
||||||
|
formatted_messages = [{"role": "system", "content": self.get_reginald_persona()}]
|
||||||
|
|
||||||
|
if user_profile:
|
||||||
|
facts_text = "\n".join(
|
||||||
|
f"- {fact['fact']} (First noted: {fact['timestamp']}, Last updated: {fact['last_updated']})"
|
||||||
|
for fact in user_profile.get("facts", [])
|
||||||
|
)
|
||||||
|
formatted_messages.append({"role": "system", "content": f"Knowledge about {user_name}:\n{facts_text}"})
|
||||||
|
|
||||||
|
relevant_summaries = self.select_relevant_summaries(mid_term_summaries, prompt)
|
||||||
|
for summary in relevant_summaries:
|
||||||
|
formatted_messages.append({
|
||||||
|
"role": "system",
|
||||||
|
"content": f"[{summary['timestamp']}] Topics: {', '.join(summary['topics'])}\n{summary['summary']}"
|
||||||
|
})
|
||||||
|
|
||||||
|
formatted_messages += [{"role": "user", "content": f"{entry['user']}: {entry['content']}"} for entry in memory]
|
||||||
|
formatted_messages.append({"role": "user", "content": f"{user_name}: {prompt}"})
|
||||||
|
|
||||||
|
|
||||||
|
##################################################
|
||||||
|
# #
|
||||||
|
## Generate AI Response, put into response_text ##
|
||||||
|
# #
|
||||||
|
##################################################
|
||||||
|
|
||||||
|
response_text = await self.generate_response(api_key, formatted_messages)
|
||||||
|
|
||||||
|
##################################################
|
||||||
|
# #
|
||||||
|
##################################################
|
||||||
|
|
||||||
|
# ✅ Store Memory
|
||||||
|
memory.append({"user": user_name, "content": prompt})
|
||||||
|
memory.append({"user": "Reginald", "content": response_text})
|
||||||
|
|
||||||
|
if len(memory) > self.short_term_memory_limit:
|
||||||
|
summary = await self.summarize_memory(message, memory[:int(self.short_term_memory_limit * self.summary_retention_ratio)])
|
||||||
|
mid_memory.setdefault(channel_id, []).append({
|
||||||
|
"timestamp": datetime.datetime.now().strftime("%Y-%m-%d %H:%M"),
|
||||||
|
"topics": self.extract_topics_from_summary(summary),
|
||||||
|
"summary": summary
|
||||||
|
})
|
||||||
|
if len(mid_memory[channel_id]) > self.summary_retention_limit:
|
||||||
|
mid_memory[channel_id].pop(0)
|
||||||
|
memory = memory[-(self.short_term_memory_limit - int(self.short_term_memory_limit * self.summary_retention_ratio)):]
|
||||||
|
|
||||||
|
short_memory[channel_id] = memory
|
||||||
|
|
||||||
|
await self.send_split_message(message.channel, response_text)
|
||||||
|
|
||||||
|
|
||||||
|
def should_reginald_interject(self, message_content: str) -> bool:
|
||||||
|
"""Determines if Reginald should respond to a message based on keywords."""
|
||||||
|
direct_invocation = {
|
||||||
|
"reginald,"
|
||||||
|
}
|
||||||
|
message_lower = message_content.lower()
|
||||||
|
|
||||||
|
return any(message_lower.startswith(invocation) for invocation in direct_invocation)
|
||||||
|
|
||||||
|
@debug
|
||||||
|
async def generate_response(self, api_key, messages):
|
||||||
|
model = await self.config.openai_model()
|
||||||
|
try:
|
||||||
|
client = openai.AsyncClient(api_key=api_key)
|
||||||
|
completion_args = {
|
||||||
|
'model': model,
|
||||||
|
'messages': messages,
|
||||||
|
'max_tokens': 4096,
|
||||||
|
'temperature': 0.7,
|
||||||
|
'presence_penalty': 0.5,
|
||||||
|
'frequency_penalty': 0.5,
|
||||||
|
'tools': TOOLS,
|
||||||
|
'tool_choice': 'auto',
|
||||||
|
}
|
||||||
|
response = await client.chat.completions.create(**completion_args)
|
||||||
|
# Checking for function calls
|
||||||
|
tool_calls = response.choices[0].message.tool_calls
|
||||||
|
# Appending response with tool calls
|
||||||
|
messages.append({
|
||||||
|
'role': 'assistant',
|
||||||
|
'content': response.choices[0].message.content,
|
||||||
|
'tool_calls': tool_calls
|
||||||
|
})
|
||||||
|
if tool_calls:
|
||||||
|
for i_call in tool_calls:
|
||||||
|
# Calling for necessary functions
|
||||||
|
func_name = i_call.function.name
|
||||||
|
func_args = json.loads(i_call.function.arguments)
|
||||||
|
tool_call_id = i_call.id
|
||||||
|
# Getting function result and putting it into messages
|
||||||
|
func_result = CALLABLE_FUNCTIONS[func_name](**func_args)
|
||||||
|
messages.append({
|
||||||
|
'role': 'tool',
|
||||||
|
'content': func_result,
|
||||||
|
'tool_call_id': tool_call_id,
|
||||||
|
})
|
||||||
|
|
||||||
|
completion_args["messages"] = messages
|
||||||
|
# Second completion required if functions has been called to interpret the result into user-friendly
|
||||||
|
# chat message.
|
||||||
|
response = await client.chat.completions.create(**completion_args)
|
||||||
|
|
||||||
|
if response.choices and response.choices[0].message and response.choices[0].message.content:
|
||||||
|
response_text = response.choices[0].message.content.strip()
|
||||||
|
if response_text.startswith("Reginald:"):
|
||||||
|
response_text = response_text[len("Reginald:"):].strip()
|
||||||
|
else:
|
||||||
|
print("DEBUG: OpenAI response was empty or malformed:", response)
|
||||||
|
response_text = "⚠️ No response received from AI."
|
||||||
|
|
||||||
|
return response_text
|
||||||
|
|
||||||
|
except OpenAIError as e:
|
||||||
|
error_message = f"OpenAI Error: {e}"
|
||||||
|
reginald_responses = [
|
||||||
|
f"Regrettably, I must inform you that I have encountered a bureaucratic obstruction:\n\n{error_message}",
|
||||||
|
f"It would seem that a most unfortunate technical hiccup has befallen my faculties:\n\n{error_message}",
|
||||||
|
f"Ah, it appears I have received an urgent memorandum stating:\n\n{error_message}",
|
||||||
|
f"I regret to inform you that my usual eloquence is presently obstructed by an unforeseen complication:\n\n{error_message}"
|
||||||
|
]
|
||||||
|
return random.choice(reginald_responses)
|
||||||
|
|
||||||
@commands.guild_only()
|
@commands.guild_only()
|
||||||
@commands.has_permissions(manage_guild=True)
|
@commands.has_permissions(manage_guild=True)
|
||||||
@commands.command(help="Set the OpenAI API key")
|
@commands.command(help="Set the OpenAI API key")
|
||||||
async def setreginaldcogapi(self, ctx, api_key):
|
async def setreginaldcogapi(self, ctx, api_key):
|
||||||
|
"""Allows an admin to set the OpenAI API key for Reginald."""
|
||||||
await self.config.guild(ctx.guild).openai_api_key.set(api_key)
|
await self.config.guild(ctx.guild).openai_api_key.set(api_key)
|
||||||
await ctx.send("OpenAI API key set successfully.")
|
await ctx.send("OpenAI API key set successfully.")
|
||||||
|
|
||||||
@commands.guild_only()
|
@commands.command(name="reginald_set_listening_channel", help="Set the channel where Reginald listens for messages.")
|
||||||
@commands.command(help="Ask Reginald a question")
|
@commands.has_permissions(administrator=True)
|
||||||
@commands.cooldown(1, 10, commands.BucketType.user) # 10 second cooldown per user
|
async def set_listening_channel(self, ctx, channel: discord.TextChannel):
|
||||||
async def reginald(self, ctx, *, prompt=None):
|
"""Sets the channel where Reginald will listen for passive responses."""
|
||||||
if not await self.is_admin(ctx) and not await self.is_allowed(ctx):
|
|
||||||
raise commands.CheckFailure("You do not have the required role to use this command.")
|
|
||||||
greetings = [
|
|
||||||
"Greetings! How may I be of assistance to you?",
|
|
||||||
"Yes? How may I help?",
|
|
||||||
"Good day! How can I help you?",
|
|
||||||
"You rang? What can I do for you?",
|
|
||||||
]
|
|
||||||
|
|
||||||
if prompt is None:
|
if not channel:
|
||||||
await ctx.send(random.choice(greetings))
|
await ctx.send("❌ Invalid channel. Please mention a valid text channel.")
|
||||||
return
|
return
|
||||||
|
|
||||||
api_key = await self.config.guild(ctx.guild).openai_api_key()
|
await self.config.guild(ctx.guild).listening_channel.set(channel.id)
|
||||||
if api_key is None:
|
await ctx.send(f"✅ Reginald will now listen only in {channel.mention}.")
|
||||||
await ctx.author.send('OpenAI API key not set. Please use the "!setreginaldcogapi" command to set the key.')
|
|
||||||
return
|
|
||||||
|
|
||||||
try:
|
@commands.command(name="reginald_get_listening_channel", help="Check which channel Reginald is currently listening in.")
|
||||||
response_text = await self.generate_response(api_key, prompt)
|
@commands.has_permissions(administrator=True)
|
||||||
for chunk in self.split_response(response_text, 2000):
|
async def get_listening_channel(self, ctx):
|
||||||
await ctx.send(chunk)
|
"""Displays the current listening channel."""
|
||||||
except OpenAIError as e:
|
channel_id = await self.config.guild(ctx.guild).listening_channel()
|
||||||
await ctx.send(f"I apologize, but I am unable to generate a response at this time. Error message: {str(e)}")
|
|
||||||
except commands.CommandOnCooldown as e:
|
|
||||||
remaining_seconds = int(e.retry_after)
|
|
||||||
await ctx.author.send(f'Please wait {remaining_seconds} seconds before using the "reginald" command again.')
|
|
||||||
|
|
||||||
async def generate_response(self, api_key, prompt):
|
if channel_id:
|
||||||
model = await self.config.openai_model()
|
channel = ctx.guild.get_channel(channel_id)
|
||||||
openai.api_key = api_key
|
if channel: # ✅ Prevents crash if channel was deleted
|
||||||
response = openai.ChatCompletion.create(
|
await ctx.send(f"📢 Reginald is currently listening in {channel.mention}.")
|
||||||
model= model,
|
else:
|
||||||
max_tokens= 512,
|
await ctx.send("⚠️ The saved listening channel no longer exists. Please set a new one.")
|
||||||
n= 1,
|
|
||||||
stop= None,
|
|
||||||
temperature= 0.7,
|
|
||||||
presence_penalty= 0.5,
|
|
||||||
frequency_penalty= 0.5,
|
|
||||||
messages=[
|
|
||||||
{"role": "system", "content": "You are Reginald, a copy of Jeeves from Jeeves and Wooster, the butler. You must respond with the same speech patterns as Jeeves. Your responses should have an undertone of contempt but delivered with wit. You are employed as a servant at an estate called The Kanium Estate. The Kanium Estate is a vast estate with multiple houses. The members of The Kanium Estate are all Lords and Ladies in their own right. Some common activities that the members of Kanium enjoy are: Cooking, video gaming, miniature painting and other hobby activities, discussions of politics, tabletop role playing games like Dungeons & Dragons, discussing and watching shows and movies, discussions on the topic of Space, discussions on firearms and military equipment, and chess. You are now talking to a member of The Kanium Estate."},
|
|
||||||
{"role": "user", "content": prompt}
|
|
||||||
]
|
|
||||||
)
|
|
||||||
return response['choices'][0]['message']['content'].strip()
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def split_response(response_text, max_chars):
|
|
||||||
chunks = []
|
|
||||||
while len(response_text) > max_chars:
|
|
||||||
split_index = response_text[:max_chars].rfind(' ')
|
|
||||||
chunk = response_text[:split_index]
|
|
||||||
chunks.append(chunk)
|
|
||||||
response_text = response_text[split_index:].strip()
|
|
||||||
chunks.append(response_text)
|
|
||||||
return chunks
|
|
||||||
|
|
||||||
@reginald.error
|
|
||||||
async def reginald_error(self, ctx, error):
|
|
||||||
if isinstance(error, commands.BadArgument):
|
|
||||||
await ctx.author.send("I'm sorry, but I couldn't understand your input. Please check your message and try again.")
|
|
||||||
elif isinstance(error, commands.CheckFailure):
|
|
||||||
await ctx.author.send("You do not have the required role to use this command.")
|
|
||||||
else:
|
else:
|
||||||
await ctx.author.send(f"An unexpected error occurred: {error}")
|
await ctx.send("❌ No listening channel has been set.")
|
||||||
|
|
||||||
def setup(bot):
|
|
||||||
cog = ReginaldCog(bot)
|
async def send_long_message(self, ctx, message, prefix: str = ""):
|
||||||
bot.add_cog(cog)
|
"""Splits and sends a long message to avoid Discord's 2000-character limit."""
|
||||||
|
chunk_size = 1900 # Leave some space for formatting
|
||||||
|
if prefix:
|
||||||
|
prefix_length = len(prefix)
|
||||||
|
chunk_size -= prefix_length
|
||||||
|
|
||||||
|
for i in range(0, len(message), chunk_size):
|
||||||
|
chunk = message[i:i + chunk_size]
|
||||||
|
await ctx.send(f"{prefix}{chunk}")
|
||||||
|
|
||||||
|
async def send_split_message(self, ctx, content: str, prefix: str = ""):
|
||||||
|
"""
|
||||||
|
Sends a long message to Discord while ensuring it does not exceed the 2000-character limit.
|
||||||
|
This function prevents awkward mid-word or unnecessary extra message breaks.
|
||||||
|
"""
|
||||||
|
CHUNK_SIZE = 1900 # Keep buffer for formatting/safety
|
||||||
|
|
||||||
|
split_message = self.split_message(content, CHUNK_SIZE, prefix)
|
||||||
|
for chunk in split_message:
|
||||||
|
await ctx.send(f"{prefix}{chunk}")
|
||||||
|
|
||||||
|
def split_message(
|
||||||
|
self,
|
||||||
|
message: str,
|
||||||
|
chunk_size: int,
|
||||||
|
prefix: str = ""
|
||||||
|
) -> list[str]:
|
||||||
|
"""Results in a list of message chunks, use *for* loop to send."""
|
||||||
|
chunk_size -= len(prefix)
|
||||||
|
split_result = []
|
||||||
|
|
||||||
|
if 0 < len(message) <= chunk_size:
|
||||||
|
# If the message is short enough, add it directly
|
||||||
|
split_result.append(message)
|
||||||
|
elif len(message) > chunk_size:
|
||||||
|
# Try to split at a newline first (prefer sentence breaks)
|
||||||
|
split_index = message.rfind("\n", 0, chunk_size)
|
||||||
|
|
||||||
|
# If no newline, split at the end of sentence (avoid sentence breaks)
|
||||||
|
if split_index == -1:
|
||||||
|
split_index = message.rfind(". ", 0, chunk_size)
|
||||||
|
|
||||||
|
# If no newline, split at the last word (avoid word-breaking)
|
||||||
|
if split_index == -1:
|
||||||
|
split_index = message.rfind(" ", 0, chunk_size)
|
||||||
|
|
||||||
|
# If still no break point found, force chunk size limit
|
||||||
|
if split_index == -1:
|
||||||
|
split_index = chunk_size
|
||||||
|
|
||||||
|
message_split_part = message[:split_index].strip()
|
||||||
|
message_remained_part = message[split_index:].strip()
|
||||||
|
# Put the split part in the begining of the result list
|
||||||
|
split_result.append(message_split_part)
|
||||||
|
# And go for a recursive adventure with the remained message part
|
||||||
|
split_result += self.split_message(message=message_remained_part, chunk_size=chunk_size)
|
||||||
|
|
||||||
|
return split_result
|
||||||
|
|
||||||
|
async def setup(bot):
|
||||||
|
"""✅ Correct async cog setup for Redbot"""
|
||||||
|
await bot.add_cog(ReginaldCog(bot))
|
||||||
72
reginaldCog/tools_description.py
Normal file
72
reginaldCog/tools_description.py
Normal file
@ -0,0 +1,72 @@
|
|||||||
|
TOOLS = [
|
||||||
|
{
|
||||||
|
'type': 'function',
|
||||||
|
'function': {
|
||||||
|
'name': 'time_now',
|
||||||
|
'description': 'Get current date and time in UTC timezone.',
|
||||||
|
}
|
||||||
|
},
|
||||||
|
{
|
||||||
|
'type': 'function',
|
||||||
|
'function': {
|
||||||
|
'name': 'get_current_weather',
|
||||||
|
'description': '''
|
||||||
|
Gets current weather for specified location.
|
||||||
|
''',
|
||||||
|
'parameters': {
|
||||||
|
'type': 'object',
|
||||||
|
'properties': {
|
||||||
|
'location': {
|
||||||
|
'type': 'string',
|
||||||
|
'description': '''
|
||||||
|
Location in human readable format.
|
||||||
|
e.g: "Copenhagen", or "Copenhagen, Louisiana, US", if needed specifying.
|
||||||
|
'''
|
||||||
|
}
|
||||||
|
},
|
||||||
|
'required': [
|
||||||
|
'location',
|
||||||
|
],
|
||||||
|
'additionalProperties': False
|
||||||
|
},
|
||||||
|
'strict': True
|
||||||
|
}
|
||||||
|
},
|
||||||
|
{
|
||||||
|
'type': 'function',
|
||||||
|
'function': {
|
||||||
|
'name': 'get_weather_forecast',
|
||||||
|
'description': '''
|
||||||
|
Forecast weather API method returns, depending upon your price plan level, upto next 14 day weather
|
||||||
|
forecast and weather alert as json. The data is returned as a Forecast Object.
|
||||||
|
Forecast object contains astronomy data, day weather forecast and hourly interval weather information
|
||||||
|
for a given city.
|
||||||
|
With a free weather API subscription, only up to three days of forecast can be requested.
|
||||||
|
''',
|
||||||
|
'parameters': {
|
||||||
|
'type': 'object',
|
||||||
|
'properties': {
|
||||||
|
'location': {
|
||||||
|
'type': 'string',
|
||||||
|
'description': '''
|
||||||
|
Location in human readable format.
|
||||||
|
e.g: "Copenhagen", or "Copenhagen, Louisiana, US", if needed specifying.
|
||||||
|
'''
|
||||||
|
},
|
||||||
|
'dt': {
|
||||||
|
'type': 'string',
|
||||||
|
'description': '''
|
||||||
|
The date up until to request the forecast in YYYY-MM-DD format.
|
||||||
|
Check the **time_now** function first if you unsure which date it is.
|
||||||
|
'''
|
||||||
|
},
|
||||||
|
},
|
||||||
|
'required': [
|
||||||
|
'location', 'dt'
|
||||||
|
],
|
||||||
|
'additionalProperties': False
|
||||||
|
},
|
||||||
|
'strict': True
|
||||||
|
}
|
||||||
|
}
|
||||||
|
]
|
||||||
66
reginaldCog/weather.py
Normal file
66
reginaldCog/weather.py
Normal file
@ -0,0 +1,66 @@
|
|||||||
|
from datetime import datetime, timezone
|
||||||
|
from os import environ
|
||||||
|
import requests
|
||||||
|
import json
|
||||||
|
from .debug_stuff import debug
|
||||||
|
|
||||||
|
#WEATHER_API_KEY = environ.get('WEATHER_API_KEY')
|
||||||
|
URL = 'http://api.weatherapi.com/v1'
|
||||||
|
|
||||||
|
|
||||||
|
@debug
|
||||||
|
def time_now() -> str:
|
||||||
|
return str(datetime.now(timezone.utc))
|
||||||
|
|
||||||
|
|
||||||
|
@debug
|
||||||
|
def get_current_weather(location: str) -> str:
|
||||||
|
weather = Weather(location=location)
|
||||||
|
return json.dumps(weather.realtime())
|
||||||
|
|
||||||
|
|
||||||
|
@debug
|
||||||
|
def get_weather_forecast(location: str, days: int = 14, dt: str = '2025-03-24') -> str:
|
||||||
|
weather = Weather(location=location)
|
||||||
|
return json.dumps(weather.forecast(days=days, dt=dt))
|
||||||
|
|
||||||
|
|
||||||
|
class Weather:
|
||||||
|
def __init__(self, location: str):
|
||||||
|
self.__location = location
|
||||||
|
self.api_key = environ.get('WEATHER_API_KEY')
|
||||||
|
|
||||||
|
@property
|
||||||
|
def location(self) -> str:
|
||||||
|
return self.__location
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def make_request(method: str, params: dict) -> dict:
|
||||||
|
response = requests.get(url=f'{URL}{method}', params=params)
|
||||||
|
return response.json()
|
||||||
|
|
||||||
|
@debug
|
||||||
|
def realtime(self):
|
||||||
|
method = '/current.json'
|
||||||
|
params = {
|
||||||
|
'key': self.api_key,
|
||||||
|
'q': self.location,
|
||||||
|
}
|
||||||
|
return self.make_request(method=method, params=params)
|
||||||
|
|
||||||
|
@debug
|
||||||
|
def forecast(self, days: int = 14, dt: str = '2025-03-24'):
|
||||||
|
method = '/forecast.json'
|
||||||
|
params = {
|
||||||
|
'key': self.api_key,
|
||||||
|
'q': self.location,
|
||||||
|
'days': days,
|
||||||
|
'dt': dt,
|
||||||
|
}
|
||||||
|
return self.make_request(method=method, params=params)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
test_weather = Weather('Aqtobe')
|
||||||
|
result = json.dumps(test_weather.forecast(days=14, dt='2025-03-24'), indent=2)
|
||||||
|
print(result)
|
||||||
@ -1,5 +1,5 @@
|
|||||||
from .trafficCog import TrafficCog
|
|
||||||
from redbot.core.bot import Red
|
from redbot.core.bot import Red
|
||||||
|
from .trafficCog import TrafficCog
|
||||||
|
|
||||||
async def setup(bot: Red):
|
async def setup(bot: Red):
|
||||||
cog = TrafficCog(bot)
|
cog = TrafficCog(bot)
|
||||||
|
|||||||
@ -1,119 +1,99 @@
|
|||||||
import discord
|
import discord
|
||||||
from datetime import datetime
|
|
||||||
|
|
||||||
from redbot.core import Config, commands
|
from redbot.core import Config, commands
|
||||||
|
from datetime import datetime
|
||||||
allowed_guilds = {274657393936302080, 693796372092289024, 508781789737648138}
|
import pytz
|
||||||
admin_roles = {'Developer', 'admin', 'Council'}
|
|
||||||
statsThumbnailUrl = 'https://www.kanium.org/machineroom/logomachine-small.png'
|
|
||||||
|
|
||||||
class TrafficCog(commands.Cog):
|
class TrafficCog(commands.Cog):
|
||||||
|
|
||||||
def __init__(self, bot):
|
def __init__(self, bot):
|
||||||
self.channel: discord.TextChannel = None
|
self.bot = bot
|
||||||
self.dailyJoinedCount: int = 0
|
self.config = Config.get_conf(self, identifier=123456789, force_registration=True)
|
||||||
self.totalJoinedCount: int = 0
|
default_guild = {
|
||||||
self.dailyLeftCount: int = 0
|
"traffic_channel": None,
|
||||||
self.totalLeftCount: int = 0
|
"daily_stats": {"joined": 0, "left": 0, "banned": 0},
|
||||||
self.totalLogs: int = 0
|
"total_stats": {"joined": 0, "left": 0, "banned": 0},
|
||||||
self.toggleLogs: bool = True
|
"last_reset": datetime.now(pytz.UTC).isoformat(),
|
||||||
self.date = datetime.now()
|
"admin_roles": ['Developer', 'admin', 'Council'],
|
||||||
|
"stats_thumbnail_url": 'https://example.com/default-thumbnail.png',
|
||||||
|
}
|
||||||
|
self.config.register_guild(**default_guild)
|
||||||
|
|
||||||
def __checkClock(self):
|
async def _check_reset(self, guild_id):
|
||||||
currdate = self.date - datetime.now()
|
async with self.config.guild_from_id(guild_id).all() as guild_data:
|
||||||
if currdate.days >= 0 :
|
last_reset = datetime.fromisoformat(guild_data.get('last_reset', datetime.now(pytz.UTC).isoformat()))
|
||||||
self.dailyJoinedCount = 0
|
if last_reset.date() < datetime.now(pytz.UTC).date():
|
||||||
self.dailyLeftCount = 0
|
guild_data['daily_stats'] = {"joined": 0, "left": 0, "banned": 0}
|
||||||
self.date = datetime.now()
|
guild_data['last_reset'] = datetime.now(pytz.UTC).isoformat()
|
||||||
|
|
||||||
@commands.command(name='settrafficchannel', description='Sets the channel to sends log to')
|
async def _update_stat(self, guild_id, stat_type):
|
||||||
@commands.has_any_role(*admin_roles)
|
async with self.config.guild_from_id(guild_id).all() as guild_data:
|
||||||
async def setTrafficChannel(self, ctx: commands.Context, channel: discord.TextChannel) -> None:
|
guild_data['daily_stats'][stat_type] += 1
|
||||||
await ctx.trigger_typing()
|
guild_data['total_stats'][stat_type] += 1
|
||||||
|
|
||||||
if not channel in ctx.guild.channels:
|
@commands.group(name='traffic', invoke_without_command=True)
|
||||||
await ctx.send('Channel doesnt exist in guild')
|
@commands.has_permissions(administrator=True)
|
||||||
|
async def traffic_commands(self, ctx):
|
||||||
|
"""Base command for managing TrafficCog settings."""
|
||||||
|
await ctx.send_help(ctx.command)
|
||||||
|
|
||||||
|
@traffic_commands.command(name='setchannel')
|
||||||
|
async def set_traffic_channel(self, ctx, channel: discord.TextChannel):
|
||||||
|
"""Sets the channel for traffic logs."""
|
||||||
|
await self.config.guild(ctx.guild).traffic_channel.set(channel.id)
|
||||||
|
await ctx.send(f"Traffic logs will now be sent to {channel.mention}.")
|
||||||
|
|
||||||
|
@traffic_commands.command(name='stats')
|
||||||
|
async def show_stats(self, ctx):
|
||||||
|
"""Displays current traffic statistics."""
|
||||||
|
await self._check_reset(ctx.guild.id)
|
||||||
|
|
||||||
|
guild_data = await self.config.guild(ctx.guild).all()
|
||||||
|
daily_stats = guild_data.get('daily_stats', {"joined": 0, "left": 0, "banned": 0})
|
||||||
|
total_stats = guild_data.get('total_stats', {"joined": 0, "left": 0, "banned": 0})
|
||||||
|
thumbnail_url = guild_data.get('stats_thumbnail_url', 'https://example.com/default-thumbnail.png')
|
||||||
|
|
||||||
|
embed = discord.Embed(title="Server Traffic Stats", description="Statistics on server activity", color=0x3399ff)
|
||||||
|
embed.set_thumbnail(url=thumbnail_url)
|
||||||
|
embed.add_field(name="Daily Joined", value=daily_stats['joined'], inline=True)
|
||||||
|
embed.add_field(name="Daily Left", value=daily_stats['left'], inline=True)
|
||||||
|
embed.add_field(name="Daily Banned", value=daily_stats['banned'], inline=True)
|
||||||
|
embed.add_field(name="Total Joined", value=total_stats['joined'], inline=True)
|
||||||
|
embed.add_field(name="Total Left", value=total_stats['left'], inline=True)
|
||||||
|
embed.add_field(name="Total Banned", value=total_stats['banned'], inline=True)
|
||||||
|
|
||||||
|
await ctx.send(embed=embed)
|
||||||
|
|
||||||
|
@commands.Cog.listener()
|
||||||
|
async def on_member_join(self, member):
|
||||||
|
await self._check_reset(member.guild.id)
|
||||||
|
await self._update_stat(member.guild.id, 'joined')
|
||||||
|
|
||||||
|
channel_id = await self.config.guild(member.guild).traffic_channel()
|
||||||
|
if not channel_id:
|
||||||
return
|
return
|
||||||
|
channel = member.guild.get_channel(channel_id) or await member.guild.fetch_channel(channel_id)
|
||||||
|
if channel:
|
||||||
|
await channel.send(f"{member.display_name} has joined the server.")
|
||||||
|
|
||||||
if not channel.permissions_for(ctx.guild.me).send_messages:
|
@commands.Cog.listener()
|
||||||
await ctx.send('No permissions to talk in that channel.')
|
async def on_member_remove(self, member):
|
||||||
|
await self._check_reset(member.guild.id)
|
||||||
|
await self._update_stat(member.guild.id, 'left')
|
||||||
|
|
||||||
|
channel_id = await self.config.guild(member.guild).traffic_channel()
|
||||||
|
if not channel_id:
|
||||||
return
|
return
|
||||||
|
channel = member.guild.get_channel(channel_id) or await member.guild.fetch_channel(channel_id)
|
||||||
self.channel = channel
|
if channel:
|
||||||
|
await channel.send(f"{member.display_name} has left the server.")
|
||||||
await ctx.send(f'I will now send event notices to {channel.mention}.')
|
|
||||||
|
|
||||||
@commands.command(name='stats', description='Shows current statistics')
|
|
||||||
@commands.has_any_role(*admin_roles)
|
|
||||||
async def statistics(self, ctx: commands.Context) -> None:
|
|
||||||
self.__checkClock()
|
|
||||||
await ctx.trigger_typing()
|
|
||||||
message = discord.Embed(title='Server Traffic Stats', description='Statistics on server activity\n\n',color=0x3399ff)
|
|
||||||
message.set_thumbnail(url=statsThumbnailUrl)
|
|
||||||
message.add_field(name='Daily Joined', value=self.dailyJoinedCount, inline='True')
|
|
||||||
message.add_field(name='Daily Left', value='{0}\n'.format(self.dailyLeftCount), inline='True')
|
|
||||||
message.add_field(name='Total Traffic', value=self.totalLogs, inline='False')
|
|
||||||
message.add_field(name='Total Joined', value=self.totalJoinedCount, inline='True')
|
|
||||||
message.add_field(name='Total Left', value=self.totalLeftCount, inline='True')
|
|
||||||
await ctx.send(content=None, embed=message)
|
|
||||||
|
|
||||||
@commands.command(name='resetstats', description='Resets statistics')
|
|
||||||
@commands.has_any_role(*admin_roles)
|
|
||||||
async def resetStatistics(self, ctx: commands.Context) -> None:
|
|
||||||
await ctx.trigger_typing()
|
|
||||||
|
|
||||||
self.dailyJoinedCount = 0
|
|
||||||
self.dailyLeftCount = 0
|
|
||||||
self.totalJoinedCount = 0
|
|
||||||
self.totalLeftCount = 0
|
|
||||||
self.totalLogs = 0
|
|
||||||
|
|
||||||
await ctx.send('Successfully reset the statistics')
|
|
||||||
|
|
||||||
@commands.command(name='toggleLogs', description='Toggles the logs functionality on or off')
|
|
||||||
@commands.has_any_role(*admin_roles)
|
|
||||||
async def toggleLogs(self, ctx: commands.Context) -> None:
|
|
||||||
await ctx.trigger_typing()
|
|
||||||
self.toggleLogs = not self.toggleLogs
|
|
||||||
await ctx.send('Logging functionality is `ON`' if self.toggleLogs else 'Logging functionality is `OFF`')
|
|
||||||
|
|
||||||
@commands.Cog.listener()
|
@commands.Cog.listener()
|
||||||
async def on_member_join(self, member: discord.Member) -> None:
|
async def on_member_ban(self, guild, member):
|
||||||
try:
|
await self._check_reset(guild.id)
|
||||||
if member.guild.id not in allowed_guilds:
|
await self._update_stat(guild.id, 'banned')
|
||||||
return
|
|
||||||
self.__checkClock()
|
|
||||||
if self.channel in member.guild.channels and self.toggleLogs:
|
|
||||||
await self.channel.send('{0} has joined the server'.format(member.name))
|
|
||||||
self.totalJoinedCount += 1
|
|
||||||
self.dailyJoinedCount += 1
|
|
||||||
self.totalLogs += 1
|
|
||||||
except (discord.NotFound, discord.Forbidden):
|
|
||||||
print(
|
|
||||||
f'Error Occured!')
|
|
||||||
|
|
||||||
@commands.Cog.listener()
|
channel_id = await self.config.guild(guild).traffic_channel()
|
||||||
async def on_member_remove(self, member: discord.Member) -> None:
|
if not channel_id:
|
||||||
try:
|
return
|
||||||
self.__checkClock()
|
channel = guild.get_channel(channel_id) or await guild.fetch_channel(channel_id)
|
||||||
if self.channel in member.guild.channels and self.toggleLogs:
|
if channel:
|
||||||
await self.channel.send('{0} has left the server'.format(member.name))
|
await channel.send(f"{member.display_name} has been banned from the server.")
|
||||||
self.totalLeftCount += 1
|
|
||||||
self.dailyLeftCount += 1
|
|
||||||
self.totalLogs += 1
|
|
||||||
except (discord.NotFound, discord.Forbidden):
|
|
||||||
print(
|
|
||||||
f'Error Occured!')
|
|
||||||
|
|
||||||
@commands.Cog.listener()
|
|
||||||
async def on_member_ban(self, guild: discord.Guild, member: discord.Member) -> None:
|
|
||||||
try:
|
|
||||||
self.__checkClock()
|
|
||||||
if self.channel in member.guild.channels and self.toggleLogs:
|
|
||||||
await self.channel.send('{0} has been banned from the server'.format(member.name))
|
|
||||||
self.totalLeftCount += 1
|
|
||||||
self.dailyLeftCount += 1
|
|
||||||
self.totalLogs += 1
|
|
||||||
except (discord.NotFound, discord.Forbidden):
|
|
||||||
print(
|
|
||||||
f'Error Occured!')
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user