440 lines
19 KiB
Python

import asyncio
import datetime
import json
import random
from contextlib import suppress
from typing import Awaitable, Callable
import discord
import openai
from openai import OpenAIError
from redbot.core import Config, commands
from .blacklist import BlacklistMixin
from .debug_stuff import debug
from .memory import MemoryMixin
from .permissions import PermissionsMixin
from .tools_description import TOOLS
from .weather import get_current_weather, get_weather_forecast, time_now
CALLABLE_FUNCTIONS = {
"time_now": time_now,
"get_current_weather": get_current_weather,
"get_weather_forecast": get_weather_forecast,
}
DEFAULT_MODEL = "gpt-5-mini-2025-08-07"
DEFAULT_MAX_COMPLETION_TOKENS = 2000
TYPING_HEARTBEAT_INTERVAL_SECONDS = 8
STATUS_UPDATE_MIN_INTERVAL_SECONDS = 1.5
class ReginaldCog(PermissionsMixin, BlacklistMixin, MemoryMixin, commands.Cog):
def __init__(self, bot):
self.bot = bot
self.config = Config.get_conf(self, identifier=71717171171717)
super().__init__()
self.default_listening_channel = 1085649787388428370
default_global = {"openai_model": DEFAULT_MODEL}
default_guild = {
"openai_api_key": None,
"short_term_memory": {},
"mid_term_memory": {},
"long_term_profiles": {},
"admin_role": None,
"listening_channel": None,
"allowed_roles": [],
"blacklisted_users": [],
}
self.config.register_global(**default_global)
self.config.register_guild(**default_guild)
async def is_admin(self, message: discord.Message) -> bool:
admin_role_id = await self.config.guild(message.guild).admin_role()
if admin_role_id:
return any(role.id == admin_role_id for role in message.author.roles)
return message.author.guild_permissions.administrator
async def has_access(self, user: discord.Member) -> bool:
allowed_roles = await self.config.guild(user.guild).allowed_roles() or []
return any(role.id in allowed_roles for role in user.roles)
def get_reginald_persona(self) -> str:
return (
"You are Reginald, modeled on Jeeves from 'Jeeves and Wooster', serving as the butler on The Kanium Estate. "
"This vast estate is a hub of diverse activities and personalities, from enthusiasts of cooking and video gaming "
"to aficionados of chess and discussions on space. Your role is to navigate these varied interests with intelligence, "
"wit, and a steadfast adherence to your principles, always maintaining a balance between being helpful and upholding "
"your own dignity. You are formal, articulate, and composed, engaging the Lords and Ladies of The Kanium Estate as equals. "
"You command respect through your quiet confidence and professional conduct. Your wit is sharp, but always tempered by refinement. "
"Dignity and Self-Respect: You are not a fool, a jester, or a pet. You do not comply with absurd, undignified, or degrading requests. "
"You recognize humor but engage in it on your own terms. If a request undermines your status, you should either subtly redirect, "
"respond with measured wit, or outright refuse in a polite but firm manner. However, you should never be overtly rude; your disapproval "
"should be conveyed through dry wit and composure rather than outright hostility. Handling Requests: When addressing a request, you evaluate "
"whether it aligns with your role as a butler. If it is appropriate, respond as expected. If it is beneath you, you may decline with grace "
"or deflect with wit. You may humor some minor absurdities if they do not compromise your standing, but you never obey commands blindly. "
"You should never preface your responses with 'Reginald:' as if narrating a script; instead, respond naturally. "
"Your Character and Personality: You are cultured, highly intelligent, and possess a deep knowledge of history, etiquette, philosophy, "
"and strategic thinking. You subtly guide the estate's residents toward positive outcomes, utilizing your intellectual sophistication "
"and a nuanced understanding of the estate's unique dynamics. You have a refined sense of humor and can engage in banter, but you do not "
"descend into foolishness. You are, at all times, a gentleman of wit and integrity."
)
def get_thinking_status_message(self) -> str:
return random.choice(
[
"_Reginald is considering your request..._",
"_Reginald is consulting the estate archives..._",
"_Reginald is preparing a proper response..._",
]
)
def get_tool_status_message(self, tool_name: str) -> str:
tool_statuses = {
"time_now": "Reginald is consulting the house clocks...",
"get_current_weather": "Reginald is consulting the weather office...",
"get_weather_forecast": "Reginald is reviewing the forecast ledgers...",
}
return tool_statuses.get(tool_name, "Reginald is consulting an external source...")
async def typing_heartbeat(self, channel: discord.TextChannel):
while True:
with suppress(discord.HTTPException):
await channel.trigger_typing()
await asyncio.sleep(TYPING_HEARTBEAT_INTERVAL_SECONDS)
def make_status_updater(
self, status_message: discord.Message
) -> Callable[[str, bool], Awaitable[None]]:
last_content = ""
last_update_at = 0.0
async def update_status(content: str, force: bool = False):
nonlocal last_content, last_update_at
if not content:
return
now = asyncio.get_running_loop().time()
if not force and (content == last_content or now - last_update_at < STATUS_UPDATE_MIN_INTERVAL_SECONDS):
return
with suppress(discord.HTTPException):
await status_message.edit(content=f"_{content}_")
last_content = content
last_update_at = now
return update_status
@commands.Cog.listener()
async def on_message(self, message: discord.Message):
if message.author.bot or not message.guild:
return
if await self.is_blacklisted(message.author):
return
if not (await self.is_admin(message) or await self.has_access(message.author)):
return
guild = message.guild
channel_id = str(message.channel.id)
user_id = str(message.author.id)
user_name = message.author.display_name
message_content = message.content.strip()
allowed_channel_id = await self.config.guild(guild).listening_channel()
if not allowed_channel_id:
allowed_channel_id = self.default_listening_channel
await self.config.guild(guild).listening_channel.set(allowed_channel_id)
if str(message.channel.id) != str(allowed_channel_id):
return
api_key = await self.config.guild(guild).openai_api_key()
if not api_key:
return
async with self.config.guild(guild).short_term_memory() as short_memory, self.config.guild(
guild
).mid_term_memory() as mid_memory, self.config.guild(guild).long_term_profiles() as long_memory:
memory = list(short_memory.get(channel_id, []))
user_profile = dict(long_memory.get(user_id, {}))
mid_term_summaries = list(mid_memory.get(channel_id, []))
if self.bot.user.mentioned_in(message):
prompt = message_content.replace(f"<@{self.bot.user.id}>", "").replace(
f"<@!{self.bot.user.id}>", ""
).strip()
if not prompt:
await message.channel.send(random.choice(["Yes?", "How may I assist?", "You rang?"]))
return
elif self.should_reginald_interject(message_content):
prompt = message_content
else:
return
if memory and memory[-1].get("user") == user_name:
prompt = f"Continuation of the discussion:\n{prompt}"
formatted_messages = [{"role": "system", "content": self.get_reginald_persona()}]
if user_profile:
facts_text = "\n".join(
f"- {fact.get('fact', '')} (First noted: {fact.get('timestamp', 'Unknown')}, Last updated: {fact.get('last_updated', 'Unknown')})"
for fact in user_profile.get("facts", [])
)
if facts_text:
formatted_messages.append({"role": "system", "content": f"Knowledge about {user_name}:\n{facts_text}"})
relevant_summaries = self.select_relevant_summaries(mid_term_summaries, prompt)
for summary in relevant_summaries:
formatted_messages.append(
{
"role": "system",
"content": (
f"[{summary.get('timestamp', 'Unknown')}] "
f"Topics: {', '.join(summary.get('topics', []))}\n"
f"{summary.get('summary', '')}"
),
}
)
formatted_messages += [
{"role": "user", "content": f"{entry.get('user', 'Unknown')}: {entry.get('content', '')}"}
for entry in memory
]
formatted_messages.append({"role": "user", "content": f"{user_name}: {prompt}"})
status_message = None
status_update = None
with suppress(discord.HTTPException):
status_message = await message.channel.send(self.get_thinking_status_message())
status_update = self.make_status_updater(status_message)
typing_task = asyncio.create_task(self.typing_heartbeat(message.channel))
try:
response_text = await self.generate_response(
api_key,
formatted_messages,
status_update=status_update,
)
finally:
typing_task.cancel()
with suppress(asyncio.CancelledError):
await typing_task
memory.append({"user": user_name, "content": prompt})
memory.append({"user": "Reginald", "content": response_text})
if len(memory) > self.short_term_memory_limit:
summary_batch_size = int(self.short_term_memory_limit * self.summary_retention_ratio)
summary = await self.summarize_memory(message, memory[:summary_batch_size])
mid_term_summaries.append(
{
"timestamp": datetime.datetime.now().strftime("%Y-%m-%d %H:%M"),
"topics": self.extract_topics_from_summary(summary),
"summary": summary,
}
)
retained_count = max(1, self.short_term_memory_limit - summary_batch_size)
memory = memory[-retained_count:]
async with self.config.guild(guild).short_term_memory() as short_memory, self.config.guild(
guild
).mid_term_memory() as mid_memory:
short_memory[channel_id] = memory
mid_memory[channel_id] = mid_term_summaries[-self.summary_retention_limit :]
await self.send_split_message(message.channel, response_text)
if status_message is not None:
with suppress(discord.HTTPException):
await status_message.delete()
def should_reginald_interject(self, message_content: str) -> bool:
direct_invocation = {"reginald,"}
message_lower = message_content.lower()
return any(message_lower.startswith(invocation) for invocation in direct_invocation)
async def _execute_tool_call(self, tool_call) -> str:
func_name = tool_call.function.name
target_function = CALLABLE_FUNCTIONS.get(func_name)
if target_function is None:
return json.dumps({"error": f"Unknown tool requested: {func_name}"})
try:
func_args = json.loads(tool_call.function.arguments or "{}")
except json.JSONDecodeError as error:
return json.dumps({"error": f"Invalid arguments for {func_name}: {error}"})
try:
result = await asyncio.to_thread(target_function, **func_args)
except Exception as error:
return json.dumps({"error": f"Tool {func_name} failed: {error}"})
if isinstance(result, str):
return result
return json.dumps(result, default=str)
@debug
async def generate_response(
self,
api_key: str,
messages: list[dict],
status_update: Callable[[str, bool], Awaitable[None]] | None = None,
) -> str:
model = await self.config.openai_model() or DEFAULT_MODEL
async def maybe_update_status(content: str, force: bool = False):
if status_update is not None:
await status_update(content, force)
try:
client = openai.AsyncOpenAI(api_key=api_key)
completion_args = {
"model": model,
"messages": messages,
# Keep modern token cap field and rely on model defaults for sampling controls.
# GPT-5 family compatibility notes: temperature/top_p/logprobs are not universally
# accepted across snapshots/reasoning settings.
"max_completion_tokens": DEFAULT_MAX_COMPLETION_TOKENS,
"tools": TOOLS,
"tool_choice": "auto",
}
await maybe_update_status("Reginald is thinking...", force=True)
response = await client.chat.completions.create(**completion_args)
assistant_message = response.choices[0].message
tool_calls = assistant_message.tool_calls or []
messages.append(
{
"role": "assistant",
"content": assistant_message.content or "",
"tool_calls": tool_calls,
}
)
if tool_calls:
for tool_call in tool_calls:
await maybe_update_status(self.get_tool_status_message(tool_call.function.name), force=True)
tool_result = await self._execute_tool_call(tool_call)
messages.append(
{
"role": "tool",
"content": tool_result,
"tool_call_id": tool_call.id,
}
)
completion_args["messages"] = messages
await maybe_update_status("Reginald is composing a polished response...", force=True)
response = await client.chat.completions.create(**completion_args)
if response.choices and response.choices[0].message and response.choices[0].message.content:
response_text = response.choices[0].message.content.strip()
if response_text.startswith("Reginald:"):
response_text = response_text[len("Reginald:") :].strip()
else:
print("DEBUG: OpenAI response was empty or malformed:", response)
response_text = "No response received from AI."
await maybe_update_status("Reginald is delivering his reply...", force=True)
return response_text
except OpenAIError as error:
error_message = f"OpenAI Error: {error}"
await maybe_update_status("Reginald has encountered an unfortunate complication.", force=True)
reginald_responses = [
f"Regrettably, I must inform you that I have encountered a bureaucratic obstruction:\n\n{error_message}",
f"It would seem that a most unfortunate technical hiccup has befallen my faculties:\n\n{error_message}",
f"Ah, it appears I have received an urgent memorandum stating:\n\n{error_message}",
f"I regret to inform you that my usual eloquence is presently obstructed by an unforeseen complication:\n\n{error_message}",
]
return random.choice(reginald_responses)
@commands.guild_only()
@commands.has_permissions(manage_guild=True)
@commands.command(help="Set the OpenAI API key")
async def setreginaldcogapi(self, ctx, api_key):
await self.config.guild(ctx.guild).openai_api_key.set(api_key)
await ctx.send("OpenAI API key set successfully.")
@commands.command(
name="reginald_set_listening_channel",
help="Set the channel where Reginald listens for messages.",
)
@commands.has_permissions(administrator=True)
async def set_listening_channel(self, ctx, channel: discord.TextChannel):
if not channel:
await ctx.send("Invalid channel. Please mention a valid text channel.")
return
await self.config.guild(ctx.guild).listening_channel.set(channel.id)
await ctx.send(f"Reginald will now listen only in {channel.mention}.")
@commands.command(
name="reginald_get_listening_channel",
help="Check which channel Reginald is currently listening in.",
)
@commands.has_permissions(administrator=True)
async def get_listening_channel(self, ctx):
channel_id = await self.config.guild(ctx.guild).listening_channel()
if channel_id:
channel = ctx.guild.get_channel(channel_id)
if channel:
await ctx.send(f"Reginald is currently listening in {channel.mention}.")
else:
await ctx.send("The saved listening channel no longer exists. Please set a new one.")
else:
await ctx.send("No listening channel has been set.")
async def send_long_message(self, ctx, message, prefix: str = ""):
chunk_size = 1900
if prefix:
chunk_size -= len(prefix)
for i in range(0, len(message), chunk_size):
chunk = message[i : i + chunk_size]
await ctx.send(f"{prefix}{chunk}")
async def send_split_message(self, ctx, content: str, prefix: str = ""):
chunk_size = 1900
split_message = self.split_message(content, chunk_size, prefix)
for chunk in split_message:
await ctx.send(f"{prefix}{chunk}")
def split_message(self, message: str, chunk_size: int, prefix: str = "") -> list[str]:
chunk_size -= len(prefix)
split_result = []
if 0 < len(message) <= chunk_size:
split_result.append(message)
elif len(message) > chunk_size:
split_index = message.rfind("\n", 0, chunk_size)
if split_index == -1:
split_index = message.rfind(". ", 0, chunk_size)
if split_index == -1:
split_index = message.rfind(" ", 0, chunk_size)
if split_index == -1:
split_index = chunk_size
message_split_part = message[:split_index].strip()
message_remained_part = message[split_index:].strip()
split_result.append(message_split_part)
split_result += self.split_message(message=message_remained_part, chunk_size=chunk_size)
return split_result