Compare commits
No commits in common. "c6bb3afa70d81648b5277b670ee687f0ddd2759b" and "d51bc4965e64d37e6abc846356979ea31ea0cc70" have entirely different histories.
c6bb3afa70
...
d51bc4965e
@ -6,8 +6,6 @@
|
|||||||
"aliases": [
|
"aliases": [
|
||||||
"meter",
|
"meter",
|
||||||
"meters",
|
"meters",
|
||||||
"metre",
|
|
||||||
"metres",
|
|
||||||
"m",
|
"m",
|
||||||
"m."
|
"m."
|
||||||
]
|
]
|
||||||
@ -17,8 +15,6 @@
|
|||||||
"aliases": [
|
"aliases": [
|
||||||
"kilometer",
|
"kilometer",
|
||||||
"kilometers",
|
"kilometers",
|
||||||
"kilometre",
|
|
||||||
"kilometres",
|
|
||||||
"km",
|
"km",
|
||||||
"km."
|
"km."
|
||||||
]
|
]
|
||||||
|
|||||||
@ -1,78 +1,80 @@
|
|||||||
import csv
|
|
||||||
import json
|
import json
|
||||||
from pathlib import Path
|
import csv
|
||||||
from typing import Optional
|
import os
|
||||||
|
|
||||||
|
|
||||||
def activity_log_path(activity: str, logs_dir: Optional[Path] = None) -> Path:
|
def activity_log_path(activity: str) -> str:
|
||||||
"""
|
"""
|
||||||
Returns the path of the activity log file in .csv format.
|
Returns the path of the activity log file in .csv format
|
||||||
|
:param activity:
|
||||||
|
:return:
|
||||||
"""
|
"""
|
||||||
logs_dir = logs_dir or (Path.cwd() / "activity_logs")
|
filename = os.path.join("activity_logs", f"{activity}.csv")
|
||||||
return logs_dir / f"{activity}.csv"
|
return filename
|
||||||
|
|
||||||
|
|
||||||
def activity_units_path(activity: str, units_dir: Optional[Path] = None) -> Path:
|
def activity_units_path(activity: str) -> str:
|
||||||
"""
|
"""
|
||||||
Returns the path of the activity units file in .json format.
|
Returns the path of the activity activities file in .json format
|
||||||
|
:param activity:
|
||||||
|
:return:
|
||||||
"""
|
"""
|
||||||
units_dir = units_dir or (Path.cwd() / "activities")
|
filename = os.path.join("activities", f"{activity}.json")
|
||||||
return units_dir / f"{activity}.json"
|
return filename
|
||||||
|
|
||||||
|
|
||||||
def is_convertable(activity: str, units_dir: Optional[Path] = None) -> bool:
|
def is_convertable(activity: str) -> bool:
|
||||||
filename = activity_units_path(activity, units_dir)
|
"""
|
||||||
with filename.open("r", encoding="utf-8") as f:
|
Returns True if the activity has multiple units that needed to be converted.
|
||||||
raw_data = json.load(f)
|
i.e. distance may be sent in feet, but would require to be converted in meters.
|
||||||
return bool(raw_data.get("convertable"))
|
:param activity:
|
||||||
|
:return:
|
||||||
|
"""
|
||||||
|
filename = activity_units_path(activity)
|
||||||
|
raw_data = json.load(open(filename))
|
||||||
|
return raw_data.get("convertable")
|
||||||
|
|
||||||
|
|
||||||
def create_activity_log_file(
|
|
||||||
activity: str,
|
def create_activity_log_file(activity: str) -> None:
|
||||||
*,
|
"""
|
||||||
units_dir: Optional[Path] = None,
|
Creates the activity log file in .csv format
|
||||||
logs_dir: Optional[Path] = None,
|
:param activity:
|
||||||
) -> None:
|
:return:
|
||||||
if not activity_units_path(activity, units_dir).exists():
|
"""
|
||||||
|
if not os.path.exists(activity_units_path(activity)):
|
||||||
raise ValueError(f"{activity} is not a valid activity")
|
raise ValueError(f"{activity} is not a valid activity")
|
||||||
|
if not os.path.exists("activity_logs"):
|
||||||
logs_dir = logs_dir or (Path.cwd() / "activity_logs")
|
os.makedirs("activity_logs")
|
||||||
logs_dir.mkdir(parents=True, exist_ok=True)
|
filename = activity_log_path(activity)
|
||||||
|
with open(filename, 'a') as f:
|
||||||
filename = activity_log_path(activity, logs_dir)
|
|
||||||
if filename.exists():
|
|
||||||
return # don't re-add headers
|
|
||||||
|
|
||||||
with filename.open("w", newline="", encoding="utf-8") as f:
|
|
||||||
writer = csv.writer(f)
|
writer = csv.writer(f)
|
||||||
writer.writerow(["timestamp", "username", "value"])
|
writer.writerow(["timestamp", "username", "value"])
|
||||||
|
|
||||||
|
|
||||||
def convert_units(
|
|
||||||
activity: str,
|
def convert_units(activity: str, value: int | float, unit: str) -> float:
|
||||||
value: int | float,
|
"""
|
||||||
unit: str | None,
|
Returns the value of activity in a factor of 1, i.e. converting feet to meters
|
||||||
*,
|
:param activity:
|
||||||
units_dir: Optional[Path] = None,
|
:param value:
|
||||||
) -> float:
|
:param unit:
|
||||||
filename = activity_units_path(activity, units_dir)
|
:return:
|
||||||
if not filename.exists():
|
"""
|
||||||
|
filename = activity_units_path(activity)
|
||||||
|
if not os.path.exists(filename):
|
||||||
raise ValueError(f"{activity} is not a valid activity")
|
raise ValueError(f"{activity} is not a valid activity")
|
||||||
|
|
||||||
# Consider None unit value as factor of 1
|
# Consider None unit value as factor of 1
|
||||||
if unit is None:
|
if unit is None:
|
||||||
return float(value)
|
return value
|
||||||
|
|
||||||
with filename.open("r", encoding="utf-8") as f:
|
raw_data = json.load(open(filename))
|
||||||
raw_data = json.load(f)
|
units_data = raw_data.get("units")
|
||||||
|
|
||||||
units_data = raw_data.get("units") or {}
|
for unit_name, data in units_data.items():
|
||||||
unit_l = unit.lower()
|
if unit in data["aliases"]:
|
||||||
|
return value * data["factor"]
|
||||||
for _, data in units_data.items():
|
|
||||||
aliases = [a.lower() for a in (data.get("aliases") or [])]
|
|
||||||
if unit_l in aliases:
|
|
||||||
return float(value) * float(data["factor"])
|
|
||||||
|
|
||||||
raise ValueError(f"{unit} is not a valid unit")
|
raise ValueError(f"{unit} is not a valid unit")
|
||||||
|
|
||||||
@ -82,24 +84,36 @@ def log_activity(
|
|||||||
username: str,
|
username: str,
|
||||||
activity: str,
|
activity: str,
|
||||||
value: int | float,
|
value: int | float,
|
||||||
unit: str | None,
|
unit: str | None
|
||||||
*,
|
|
||||||
units_dir: Optional[Path] = None,
|
|
||||||
logs_dir: Optional[Path] = None,
|
|
||||||
) -> None:
|
) -> None:
|
||||||
|
"""
|
||||||
if not isinstance(value, (int, float)) or value <= 0:
|
Logs the activity in .csv file
|
||||||
|
:param timestamp:
|
||||||
|
:param username:
|
||||||
|
:param activity:
|
||||||
|
:param value:
|
||||||
|
:param unit:
|
||||||
|
:return:
|
||||||
|
"""
|
||||||
|
if value <= 0 or not isinstance(value, int | float):
|
||||||
raise ValueError(f"{value} is not a valid value")
|
raise ValueError(f"{value} is not a valid value")
|
||||||
|
filename = activity_log_path(activity)
|
||||||
filename = activity_log_path(activity, logs_dir)
|
if not os.path.exists(filename):
|
||||||
if not filename.exists():
|
create_activity_log_file(activity)
|
||||||
create_activity_log_file(activity, units_dir=units_dir, logs_dir=logs_dir)
|
if is_convertable(activity):
|
||||||
|
converted_value = round(convert_units(activity, value, unit), 2)
|
||||||
if is_convertable(activity, units_dir):
|
|
||||||
converted_value = round(convert_units(activity, value, unit, units_dir=units_dir), 2)
|
|
||||||
else:
|
else:
|
||||||
converted_value = round(float(value), 2)
|
converted_value = round(value, 2)
|
||||||
|
with open(filename, 'a') as f:
|
||||||
with filename.open("a", newline="", encoding="utf-8") as f:
|
|
||||||
writer = csv.writer(f)
|
writer = csv.writer(f)
|
||||||
writer.writerow([timestamp, username, converted_value])
|
writer.writerow([timestamp, username, converted_value])
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
log_activity(
|
||||||
|
timestamp=0,
|
||||||
|
username="test",
|
||||||
|
activity="climb",
|
||||||
|
value=1,
|
||||||
|
unit="m"
|
||||||
|
)
|
||||||
|
|||||||
@ -3,7 +3,7 @@ from pathlib import Path
|
|||||||
|
|
||||||
import discord
|
import discord
|
||||||
from redbot.core import commands
|
from redbot.core import commands
|
||||||
from redbot.core.data_manager import cog_data_path
|
from redbot.core.data_manager import bundled_data_path
|
||||||
|
|
||||||
from .activity_logger import log_activity
|
from .activity_logger import log_activity
|
||||||
|
|
||||||
@ -15,20 +15,21 @@ class FitnessCog(commands.Cog):
|
|||||||
default_threads = "1457402451530350727,1328363409648910356"
|
default_threads = "1457402451530350727,1328363409648910356"
|
||||||
threads_raw = os.getenv("THREADS_ID", default_threads)
|
threads_raw = os.getenv("THREADS_ID", default_threads)
|
||||||
|
|
||||||
self.threads_id: list[int] = []
|
self.threads_id = []
|
||||||
for x in threads_raw.split(","):
|
for x in threads_raw.split(","):
|
||||||
x = x.strip()
|
x = x.strip()
|
||||||
if x.isdigit():
|
if x.isdigit():
|
||||||
self.threads_id.append(int(x))
|
self.threads_id.append(int(x))
|
||||||
|
|
||||||
self.confirm_reactions = os.getenv("CONFIRMATION_REACTIONS", "🐈💨")
|
self.confirm_reactions = os.getenv("CONFIRMATION_REACTIONS", "🐈💨")
|
||||||
|
self.bundled_activities_dir: Path = bundled_data_path(self) / "activities"
|
||||||
|
|
||||||
# Units are shipped with the cog in: fitnessCog/activities/*.json
|
# ---- path helpers ----
|
||||||
self.units_dir: Path = Path(__file__).parent / "activities"
|
def activity_units_path(self, activity: str) -> Path:
|
||||||
|
return self.bundled_activities_dir / f"{activity}.json"
|
||||||
|
|
||||||
# Logs are writable and belong in Red's data dir
|
def activity_log_path(self, activity: str) -> Path:
|
||||||
self.logs_dir: Path = cog_data_path(self) / "activity_logs"
|
return self.bundled_activities_dir / f"{activity}_log.json"
|
||||||
self.logs_dir.mkdir(parents=True, exist_ok=True)
|
|
||||||
|
|
||||||
@commands.Cog.listener()
|
@commands.Cog.listener()
|
||||||
async def on_message(self, message: discord.Message):
|
async def on_message(self, message: discord.Message):
|
||||||
@ -66,8 +67,7 @@ class FitnessCog(commands.Cog):
|
|||||||
)
|
)
|
||||||
return
|
return
|
||||||
|
|
||||||
activity = activity.lower().strip()
|
path = self.activity_units_path(activity)
|
||||||
path = self.units_dir / f"{activity}.json"
|
|
||||||
|
|
||||||
if not path.exists():
|
if not path.exists():
|
||||||
await message.channel.send(f"No units JSON found for `{activity}`.")
|
await message.channel.send(f"No units JSON found for `{activity}`.")
|
||||||
@ -86,11 +86,10 @@ class FitnessCog(commands.Cog):
|
|||||||
)
|
)
|
||||||
return
|
return
|
||||||
|
|
||||||
activity = activity.lower().strip()
|
path = self.activity_log_path(activity)
|
||||||
path = self.logs_dir / f"{activity}.csv"
|
|
||||||
|
|
||||||
if not path.exists():
|
if not path.exists():
|
||||||
await message.channel.send(f"No log CSV found for `{activity}`.")
|
await message.channel.send(f"No log JSON found for `{activity}`.")
|
||||||
return
|
return
|
||||||
|
|
||||||
await message.channel.send(file=discord.File(fp=str(path)))
|
await message.channel.send(file=discord.File(fp=str(path)))
|
||||||
@ -122,15 +121,7 @@ class FitnessCog(commands.Cog):
|
|||||||
timestamp = int(message.created_at.timestamp())
|
timestamp = int(message.created_at.timestamp())
|
||||||
username = message.author.name
|
username = message.author.name
|
||||||
|
|
||||||
log_activity(
|
log_activity(timestamp, username, activity, value, unit)
|
||||||
timestamp,
|
|
||||||
username,
|
|
||||||
activity,
|
|
||||||
value,
|
|
||||||
unit,
|
|
||||||
units_dir=self.units_dir,
|
|
||||||
logs_dir=self.logs_dir,
|
|
||||||
)
|
|
||||||
|
|
||||||
for r in self.confirm_reactions:
|
for r in self.confirm_reactions:
|
||||||
await message.add_reaction(r)
|
await message.add_reaction(r)
|
||||||
|
|||||||
@ -1,24 +1,15 @@
|
|||||||
import functools
|
|
||||||
import inspect
|
|
||||||
|
|
||||||
|
|
||||||
def debug(func):
|
def debug(func):
|
||||||
if inspect.iscoroutinefunction(func):
|
|
||||||
|
|
||||||
@functools.wraps(func)
|
|
||||||
async def wrap(*args, **kwargs):
|
|
||||||
print(f"DEBUG: Calling {func.__name__} with args: {args}, kwargs: {kwargs}")
|
|
||||||
result = await func(*args, **kwargs)
|
|
||||||
print(f"DEBUG: {func.__name__} returned: {result}")
|
|
||||||
return result
|
|
||||||
|
|
||||||
return wrap
|
|
||||||
|
|
||||||
@functools.wraps(func)
|
|
||||||
def wrap(*args, **kwargs):
|
def wrap(*args, **kwargs):
|
||||||
|
# Log the function name and arguments
|
||||||
print(f"DEBUG: Calling {func.__name__} with args: {args}, kwargs: {kwargs}")
|
print(f"DEBUG: Calling {func.__name__} with args: {args}, kwargs: {kwargs}")
|
||||||
|
|
||||||
|
# Call the original function
|
||||||
result = func(*args, **kwargs)
|
result = func(*args, **kwargs)
|
||||||
|
|
||||||
|
# Log the return value
|
||||||
print(f"DEBUG: {func.__name__} returned: {result}")
|
print(f"DEBUG: {func.__name__} returned: {result}")
|
||||||
|
|
||||||
|
# Return the result
|
||||||
return result
|
return result
|
||||||
|
|
||||||
return wrap
|
return wrap
|
||||||
|
|||||||
@ -1,19 +1,18 @@
|
|||||||
import datetime
|
|
||||||
import random
|
|
||||||
import re
|
import re
|
||||||
from collections import Counter
|
import random
|
||||||
|
import datetime
|
||||||
import discord
|
import discord
|
||||||
import openai
|
import openai
|
||||||
|
from collections import Counter
|
||||||
|
from redbot.core import commands, Config
|
||||||
from openai import OpenAIError
|
from openai import OpenAIError
|
||||||
from redbot.core import commands
|
|
||||||
|
|
||||||
|
|
||||||
class MemoryMixin:
|
class MemoryMixin:
|
||||||
"""Handles all memory-related functions for Reginald."""
|
"""Handles all memory-related functions for Reginald."""
|
||||||
|
|
||||||
def __init__(self, *args, **kwargs):
|
def __init__(self, *args, **kwargs):
|
||||||
super().__init__(*args, **kwargs)
|
super().__init__(*args, **kwargs) # ✅ Ensure cooperative MRO initialization
|
||||||
self.short_term_memory_limit = 50
|
self.short_term_memory_limit = 50
|
||||||
self.summary_retention_limit = 25
|
self.summary_retention_limit = 25
|
||||||
self.summary_retention_ratio = 0.8
|
self.summary_retention_ratio = 0.8
|
||||||
@ -21,73 +20,82 @@ class MemoryMixin:
|
|||||||
@commands.command(name="reginald_clear_short", help="Clears short-term memory for this channel.")
|
@commands.command(name="reginald_clear_short", help="Clears short-term memory for this channel.")
|
||||||
@commands.has_permissions(administrator=True)
|
@commands.has_permissions(administrator=True)
|
||||||
async def clear_short_memory(self, ctx):
|
async def clear_short_memory(self, ctx):
|
||||||
|
"""Clears short-term memory for this channel."""
|
||||||
async with self.config.guild(ctx.guild).short_term_memory() as short_memory:
|
async with self.config.guild(ctx.guild).short_term_memory() as short_memory:
|
||||||
short_memory[str(ctx.channel.id)] = []
|
short_memory[ctx.channel.id] = []
|
||||||
await ctx.send("Short-term memory for this channel has been cleared.")
|
await ctx.send("Short-term memory for this channel has been cleared.")
|
||||||
|
|
||||||
|
|
||||||
@commands.command(name="reginald_set_limit", help="Set the short-term memory message limit.")
|
@commands.command(name="reginald_set_limit", help="Set the short-term memory message limit.")
|
||||||
@commands.has_permissions(administrator=True)
|
@commands.has_permissions(administrator=True)
|
||||||
async def set_short_term_memory_limit(self, ctx, limit: int):
|
async def set_short_term_memory_limit(self, ctx, limit: int):
|
||||||
|
"""Allows an admin to change the short-term memory limit dynamically."""
|
||||||
if limit < 5:
|
if limit < 5:
|
||||||
await ctx.send("The short-term memory limit must be at least 5.")
|
await ctx.send("⚠️ The short-term memory limit must be at least 5.")
|
||||||
return
|
return
|
||||||
|
|
||||||
self.short_term_memory_limit = limit
|
self.short_term_memory_limit = limit
|
||||||
await ctx.send(f"Short-term memory limit set to {limit} messages.")
|
await ctx.send(f"✅ Short-term memory limit set to {limit} messages.")
|
||||||
|
|
||||||
|
|
||||||
@commands.command(name="reginald_memory_limit", help="Displays the current short-term memory message limit.")
|
@commands.command(name="reginald_memory_limit", help="Displays the current short-term memory message limit.")
|
||||||
async def get_short_term_memory_limit(self, ctx):
|
async def get_short_term_memory_limit(self, ctx):
|
||||||
await ctx.send(f"Current short-term memory limit: {self.short_term_memory_limit} messages.")
|
"""Displays the current short-term memory limit."""
|
||||||
|
await ctx.send(f"📏 **Current Short-Term Memory Limit:** {self.short_term_memory_limit} messages.")
|
||||||
|
|
||||||
@commands.command(name="reginald_clear_mid", help="Clears mid-term memory (summarized logs).")
|
@commands.command(name="reginald_clear_mid", help="Clears mid-term memory (summarized logs).")
|
||||||
@commands.has_permissions(administrator=True)
|
@commands.has_permissions(administrator=True)
|
||||||
async def clear_mid_memory(self, ctx):
|
async def clear_mid_memory(self, ctx):
|
||||||
async with self.config.guild(ctx.guild).mid_term_memory() as mid_memory:
|
async with self.config.guild(ctx.guild).mid_term_memory() as mid_memory:
|
||||||
mid_memory[str(ctx.channel.id)] = []
|
mid_memory[ctx.channel.id] = ""
|
||||||
await ctx.send("Mid-term memory for this channel has been cleared.")
|
await ctx.send("Mid-term memory for this channel has been cleared.")
|
||||||
|
|
||||||
@commands.command(name="reginald_summary", help="Displays a selected mid-term summary for this channel.")
|
@commands.command(name="reginald_summary", help="Displays a selected mid-term summary for this channel.")
|
||||||
async def get_mid_term_summary(self, ctx, index: int):
|
async def get_mid_term_summary(self, ctx, index: int):
|
||||||
|
"""Fetch and display a specific mid-term memory summary by index."""
|
||||||
async with self.config.guild(ctx.guild).mid_term_memory() as mid_memory:
|
async with self.config.guild(ctx.guild).mid_term_memory() as mid_memory:
|
||||||
summaries = mid_memory.get(str(ctx.channel.id), [])
|
summaries = mid_memory.get(str(ctx.channel.id), [])
|
||||||
if not isinstance(summaries, list):
|
|
||||||
summaries = []
|
|
||||||
|
|
||||||
|
# Check if there are summaries
|
||||||
if not summaries:
|
if not summaries:
|
||||||
await ctx.send("No summaries available for this channel.")
|
await ctx.send("⚠️ No summaries available for this channel.")
|
||||||
return
|
return
|
||||||
|
|
||||||
|
# Validate index (1-based for user-friendliness)
|
||||||
if index < 1 or index > len(summaries):
|
if index < 1 or index > len(summaries):
|
||||||
await ctx.send(f"Invalid index. Please provide a number between 1 and {len(summaries)}.")
|
await ctx.send(f"⚠️ Invalid index. Please provide a number between **1** and **{len(summaries)}**.")
|
||||||
return
|
return
|
||||||
|
|
||||||
selected_summary = summaries[index - 1]
|
# Fetch the selected summary
|
||||||
|
selected_summary = summaries[index - 1] # Convert to 0-based index
|
||||||
|
|
||||||
|
# Format output correctly
|
||||||
formatted_summary = (
|
formatted_summary = (
|
||||||
f"Summary {index} of {len(summaries)}\n"
|
f"📜 **Summary {index} of {len(summaries)}**\n"
|
||||||
f"Date: {selected_summary.get('timestamp', 'Unknown')}\n"
|
f"📅 **Date:** {selected_summary['timestamp']}\n"
|
||||||
f"Topics: {', '.join(selected_summary.get('topics', [])) or 'None'}\n"
|
f"🔍 **Topics:** {', '.join(selected_summary['topics']) or 'None'}\n"
|
||||||
f"Summary:\n\n{selected_summary.get('summary', '')}"
|
f"📝 **Summary:**\n\n"
|
||||||
|
f"{selected_summary['summary']}"
|
||||||
)
|
)
|
||||||
|
|
||||||
await self.send_long_message(ctx, formatted_summary)
|
await self.send_long_message(ctx, formatted_summary)
|
||||||
|
|
||||||
@commands.command(name="reginald_summaries", help="Lists available summaries for this channel.")
|
@commands.command(name="reginald_summaries", help="Lists available summaries for this channel.")
|
||||||
async def list_mid_term_summaries(self, ctx):
|
async def list_mid_term_summaries(self, ctx):
|
||||||
|
"""Displays a brief list of all available mid-term memory summaries."""
|
||||||
async with self.config.guild(ctx.guild).mid_term_memory() as mid_memory:
|
async with self.config.guild(ctx.guild).mid_term_memory() as mid_memory:
|
||||||
summaries = mid_memory.get(str(ctx.channel.id), [])
|
summaries = mid_memory.get(str(ctx.channel.id), [])
|
||||||
if not isinstance(summaries, list):
|
|
||||||
summaries = []
|
|
||||||
|
|
||||||
if not summaries:
|
if not summaries:
|
||||||
await ctx.send("No summaries available for this channel.")
|
await ctx.send("⚠️ No summaries available for this channel.")
|
||||||
return
|
return
|
||||||
|
|
||||||
summary_list = "\n".join(
|
summary_list = "\n".join(
|
||||||
f"{i + 1}. {entry.get('timestamp', 'Unknown')} | Topics: {', '.join(entry.get('topics', [])) or 'None'}"
|
f"**{i+1}.** 📅 {entry['timestamp']} | 🔍 Topics: {', '.join(entry['topics']) or 'None'}"
|
||||||
for i, entry in enumerate(summaries)
|
for i, entry in enumerate(summaries)
|
||||||
)
|
)
|
||||||
|
|
||||||
await ctx.send(f"Available summaries:\n{summary_list[:2000]}")
|
await ctx.send(f"📚 **Available Summaries:**\n{summary_list[:2000]}")
|
||||||
|
|
||||||
@commands.command(name="reginald_clear_long", help="Clears all long-term stored knowledge.")
|
@commands.command(name="reginald_clear_long", help="Clears all long-term stored knowledge.")
|
||||||
@commands.has_permissions(administrator=True)
|
@commands.has_permissions(administrator=True)
|
||||||
@ -110,14 +118,8 @@ class MemoryMixin:
|
|||||||
@commands.command(name="reginald_recall", help="Recalls what Reginald knows about a user.")
|
@commands.command(name="reginald_recall", help="Recalls what Reginald knows about a user.")
|
||||||
async def recall_user(self, ctx, user: discord.User):
|
async def recall_user(self, ctx, user: discord.User):
|
||||||
async with self.config.guild(ctx.guild).long_term_profiles() as long_memory:
|
async with self.config.guild(ctx.guild).long_term_profiles() as long_memory:
|
||||||
profile = long_memory.get(str(user.id), {})
|
profile = long_memory.get(str(user.id), {}).get("summary", "No stored information on this user.")
|
||||||
facts = profile.get("facts", [])
|
await ctx.send(f"📜 **Memory Recall for {user.display_name}:** {profile}")
|
||||||
|
|
||||||
if facts:
|
|
||||||
recall_lines = [f"- {fact.get('fact', '')}" for fact in facts[:10]]
|
|
||||||
await ctx.send(f"Memory recall for {user.display_name}:\n" + "\n".join(recall_lines))
|
|
||||||
else:
|
|
||||||
await ctx.send(f"No stored information on {user.display_name}.")
|
|
||||||
|
|
||||||
@commands.command(name="reginald_forget", help="Forgets a specific user's long-term profile.")
|
@commands.command(name="reginald_forget", help="Forgets a specific user's long-term profile.")
|
||||||
@commands.has_permissions(administrator=True)
|
@commands.has_permissions(administrator=True)
|
||||||
@ -130,153 +132,153 @@ class MemoryMixin:
|
|||||||
await ctx.send(f"No stored knowledge about {user.display_name} to delete.")
|
await ctx.send(f"No stored knowledge about {user.display_name} to delete.")
|
||||||
|
|
||||||
async def summarize_memory(self, ctx, messages):
|
async def summarize_memory(self, ctx, messages):
|
||||||
|
"""✅ Generates a structured, compact summary of past conversations for mid-term storage."""
|
||||||
summary_prompt = (
|
summary_prompt = (
|
||||||
"Summarize the following conversation into a structured, concise format that retains key details while maximizing brevity. "
|
"Summarize the following conversation into a structured, concise format that retains key details while maximizing brevity. "
|
||||||
"Organize into sections: Key Takeaways, Disputed Points, Notable User Contributions, and Additional Context. "
|
"The summary should be **organized** into clear sections: "
|
||||||
"Avoid repetition while preserving essential meaning."
|
"\n\n📌 **Key Takeaways:** Important facts or conclusions reached."
|
||||||
|
"\n🔹 **Disputed Points:** Areas where opinions or facts conflicted."
|
||||||
|
"\n🗣️ **Notable User Contributions:** Key statements from users that shaped the discussion."
|
||||||
|
"\n📜 **Additional Context:** Any other relevant information."
|
||||||
|
"\n\nEnsure the summary is **dense but not overly verbose**. Avoid unnecessary repetition while keeping essential meaning intact."
|
||||||
)
|
)
|
||||||
|
|
||||||
summary_text = "\n".join(f"{msg.get('user', 'Unknown')}: {msg.get('content', '')}" for msg in messages)
|
summary_text = "\n".join(f"{msg['user']}: {msg['content']}" for msg in messages)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
api_key = await self.config.guild(ctx.guild).openai_api_key()
|
api_key = await self.config.guild(ctx.guild).openai_api_key()
|
||||||
if not api_key:
|
if not api_key:
|
||||||
|
print("🛠️ DEBUG: No API key found for summarization.")
|
||||||
return (
|
return (
|
||||||
"It appears that I have not been furnished with the necessary credentials to carry out this task. "
|
"It appears that I have not been furnished with the necessary credentials to carry out this task. "
|
||||||
"Might I suggest consulting an administrator to rectify this unfortunate oversight?"
|
"Might I suggest consulting an administrator to rectify this unfortunate oversight?"
|
||||||
)
|
)
|
||||||
|
|
||||||
model = await self.config.openai_model() or "gpt-4o-mini"
|
client = openai.AsyncClient(api_key=api_key)
|
||||||
client = openai.AsyncOpenAI(api_key=api_key)
|
|
||||||
response = await client.chat.completions.create(
|
response = await client.chat.completions.create(
|
||||||
model=model,
|
model="gpt-4o-mini",
|
||||||
messages=[
|
messages=[
|
||||||
{"role": "system", "content": summary_prompt},
|
{"role": "system", "content": summary_prompt},
|
||||||
{"role": "user", "content": summary_text},
|
{"role": "user", "content": summary_text}
|
||||||
],
|
],
|
||||||
max_tokens=2048,
|
max_tokens=2048
|
||||||
)
|
)
|
||||||
|
|
||||||
summary_content = (response.choices[0].message.content or "").strip()
|
summary_content = response.choices[0].message.content.strip()
|
||||||
|
|
||||||
if not summary_content:
|
if not summary_content:
|
||||||
|
print("🛠️ DEBUG: Empty summary received from OpenAI.")
|
||||||
return (
|
return (
|
||||||
"Ah, an unusual predicament indeed. It seems that my attempt at summarization has resulted in "
|
"Ah, an unusual predicament indeed! It seems that my attempt at summarization has resulted in "
|
||||||
"a void of information. I shall endeavor to be more verbose next time."
|
"a void of information. I shall endeavor to be more verbose next time."
|
||||||
)
|
)
|
||||||
|
|
||||||
return summary_content
|
return summary_content
|
||||||
|
|
||||||
except OpenAIError as error:
|
except OpenAIError as e:
|
||||||
error_message = f"OpenAI Error: {error}"
|
error_message = f"OpenAI Error: {e}"
|
||||||
|
print(f"🛠️ DEBUG: {error_message}") # Log error to console
|
||||||
|
|
||||||
reginald_responses = [
|
reginald_responses = [
|
||||||
f"Regrettably, I must inform you that I have encountered a bureaucratic obstruction whilst attempting to summarize:\n\n{error_message}",
|
f"Regrettably, I must inform you that I have encountered a bureaucratic obstruction whilst attempting to summarize:\n\n{error_message}",
|
||||||
f"It would seem that a most unfortunate technical hiccup has befallen my faculties in the matter of summarization:\n\n{error_message}",
|
f"It would seem that a most unfortunate technical hiccup has befallen my faculties in the matter of summarization:\n\n{error_message}",
|
||||||
f"Ah, it appears I have received an urgent memorandum stating that my summarization efforts have been thwarted:\n\n{error_message}",
|
f"Ah, it appears I have received an urgent memorandum stating that my summarization efforts have been thwarted:\n\n{error_message}",
|
||||||
f"I regret to inform you that my usual eloquence is presently obstructed by an unforeseen complication while summarizing:\n\n{error_message}",
|
f"I regret to inform you that my usual eloquence is presently obstructed by an unforeseen complication while summarizing:\n\n{error_message}"
|
||||||
]
|
]
|
||||||
|
|
||||||
return random.choice(reginald_responses)
|
return random.choice(reginald_responses)
|
||||||
|
|
||||||
def extract_topics_from_summary(self, summary):
|
def extract_topics_from_summary(self, summary):
|
||||||
|
"""Dynamically extracts the most important topics from a summary."""
|
||||||
|
|
||||||
|
# 🔹 Extract all words from summary
|
||||||
keywords = re.findall(r"\b\w+\b", summary.lower())
|
keywords = re.findall(r"\b\w+\b", summary.lower())
|
||||||
|
|
||||||
|
# 🔹 Count word occurrences
|
||||||
word_counts = Counter(keywords)
|
word_counts = Counter(keywords)
|
||||||
|
|
||||||
stop_words = {
|
# 🔹 Remove unimportant words (common filler words)
|
||||||
"the",
|
stop_words = {"the", "and", "of", "in", "to", "is", "on", "for", "with", "at", "by", "it", "this", "that", "his", "her"}
|
||||||
"and",
|
filtered_words = {word: count for word, count in word_counts.items() if word not in stop_words and len(word) > 2}
|
||||||
"of",
|
|
||||||
"in",
|
|
||||||
"to",
|
|
||||||
"is",
|
|
||||||
"on",
|
|
||||||
"for",
|
|
||||||
"with",
|
|
||||||
"at",
|
|
||||||
"by",
|
|
||||||
"it",
|
|
||||||
"this",
|
|
||||||
"that",
|
|
||||||
"his",
|
|
||||||
"her",
|
|
||||||
}
|
|
||||||
filtered_words = {
|
|
||||||
word: count
|
|
||||||
for word, count in word_counts.items()
|
|
||||||
if word not in stop_words and len(word) > 2
|
|
||||||
}
|
|
||||||
|
|
||||||
|
# 🔹 Take the 5 most frequently used words as "topics"
|
||||||
topics = sorted(filtered_words, key=filtered_words.get, reverse=True)[:5]
|
topics = sorted(filtered_words, key=filtered_words.get, reverse=True)[:5]
|
||||||
|
|
||||||
return topics
|
return topics
|
||||||
|
|
||||||
def select_relevant_summaries(self, summaries, prompt):
|
def select_relevant_summaries(self, summaries, prompt):
|
||||||
summaries = [summary for summary in summaries if isinstance(summary, dict)]
|
"""Selects the most relevant summaries based on topic matching, frequency, and recency weighting."""
|
||||||
if not summaries:
|
|
||||||
return []
|
|
||||||
|
|
||||||
max_summaries = 5 if len(prompt) > 50 else 3
|
max_summaries = 5 if len(prompt) > 50 else 3 # Use more summaries if the prompt is long
|
||||||
current_time = datetime.datetime.now()
|
current_time = datetime.datetime.now()
|
||||||
|
|
||||||
def calculate_weight(summary):
|
def calculate_weight(summary):
|
||||||
topics = summary.get("topics", [])
|
"""Calculate a weighted score for a summary based on relevance, recency, and frequency."""
|
||||||
topic_match = sum(1 for topic in topics if topic in prompt.lower())
|
topic_match = sum(1 for topic in summary["topics"] if topic in prompt.lower()) # Context match score
|
||||||
frequency_score = len(topics)
|
frequency_score = len(summary["topics"]) # More topics = likely more important
|
||||||
|
timestamp = datetime.datetime.strptime(summary["timestamp"], "%Y-%m-%d %H:%M")
|
||||||
try:
|
recency_factor = max(0.1, 1 - ((current_time - timestamp).days / 365)) # Older = lower weight
|
||||||
timestamp = datetime.datetime.strptime(summary.get("timestamp", ""), "%Y-%m-%d %H:%M")
|
|
||||||
recency_factor = max(0.1, 1 - ((current_time - timestamp).days / 365))
|
|
||||||
except ValueError:
|
|
||||||
recency_factor = 0.1
|
|
||||||
|
|
||||||
return (topic_match * 2) + (frequency_score * 1.5) + (recency_factor * 3)
|
return (topic_match * 2) + (frequency_score * 1.5) + (recency_factor * 3)
|
||||||
|
|
||||||
|
# Apply the weighting function and sort by highest weight
|
||||||
weighted_summaries = sorted(summaries, key=calculate_weight, reverse=True)
|
weighted_summaries = sorted(summaries, key=calculate_weight, reverse=True)
|
||||||
return weighted_summaries[:max_summaries]
|
|
||||||
|
return weighted_summaries[:max_summaries] # Return the top-scoring summaries
|
||||||
|
|
||||||
def extract_fact_from_response(self, response_text):
|
def extract_fact_from_response(self, response_text):
|
||||||
|
"""
|
||||||
|
Extracts potential long-term knowledge from Reginald's response.
|
||||||
|
This filters out generic responses and focuses on statements about user preferences, traits, and history.
|
||||||
|
"""
|
||||||
|
|
||||||
|
# Define patterns that suggest factual knowledge (adjust as needed)
|
||||||
fact_patterns = [
|
fact_patterns = [
|
||||||
r"I recall that you (.*?)\.",
|
r"I recall that you (.*?)\.", # "I recall that you like chess."
|
||||||
r"You once mentioned that you (.*?)\.",
|
r"You once mentioned that you (.*?)\.", # "You once mentioned that you enjoy strategy games."
|
||||||
r"Ah, you previously stated that (.*?)\.",
|
r"Ah, you previously stated that (.*?)\.", # "Ah, you previously stated that you prefer tea over coffee."
|
||||||
r"As I remember, you (.*?)\.",
|
r"As I remember, you (.*?)\.", # "As I remember, you studied engineering."
|
||||||
r"I believe you (.*?)\.",
|
r"I believe you (.*?)\.", # "I believe you enjoy historical fiction."
|
||||||
r"I seem to recall that you (.*?)\.",
|
r"I seem to recall that you (.*?)\.", # "I seem to recall that you work in software development."
|
||||||
r"You have indicated in the past that you (.*?)\.",
|
r"You have indicated in the past that you (.*?)\.", # "You have indicated in the past that you prefer single-malt whisky."
|
||||||
r"From what I remember, you (.*?)\.",
|
r"From what I remember, you (.*?)\.", # "From what I remember, you dislike overly sweet desserts."
|
||||||
r"You previously mentioned that (.*?)\.",
|
r"You previously mentioned that (.*?)\.", # "You previously mentioned that you train in martial arts."
|
||||||
r"It is my understanding that you (.*?)\.",
|
r"It is my understanding that you (.*?)\.", # "It is my understanding that you have a preference for Linux systems."
|
||||||
r"If I am not mistaken, you (.*?)\.",
|
r"If I am not mistaken, you (.*?)\.", # "If I am not mistaken, you studied philosophy."
|
||||||
]
|
]
|
||||||
|
|
||||||
for pattern in fact_patterns:
|
for pattern in fact_patterns:
|
||||||
match = re.search(pattern, response_text, re.IGNORECASE)
|
match = re.search(pattern, response_text, re.IGNORECASE)
|
||||||
if match:
|
if match:
|
||||||
return match.group(1)
|
return match.group(1) # Extract the meaningful fact
|
||||||
|
|
||||||
return None
|
return None # No strong fact found
|
||||||
|
|
||||||
@commands.command(name="reginald_memory_status", help="Displays a memory usage summary.")
|
@commands.command(name="reginald_memory_status", help="Displays a memory usage summary.")
|
||||||
async def memory_status(self, ctx):
|
async def memory_status(self, ctx):
|
||||||
async with self.config.guild(ctx.guild).short_term_memory() as short_memory, self.config.guild(
|
async with self.config.guild(ctx.guild).short_term_memory() as short_memory, \
|
||||||
ctx.guild
|
self.config.guild(ctx.guild).mid_term_memory() as mid_memory, \
|
||||||
).mid_term_memory() as mid_memory, self.config.guild(ctx.guild).long_term_profiles() as long_memory:
|
self.config.guild(ctx.guild).long_term_profiles() as long_memory:
|
||||||
short_count = sum(len(v) for v in short_memory.values() if isinstance(v, list))
|
|
||||||
mid_count = sum(len(v) for v in mid_memory.values() if isinstance(v, list))
|
short_count = sum(len(v) for v in short_memory.values())
|
||||||
|
mid_count = sum(len(v) for v in mid_memory.values())
|
||||||
long_count = len(long_memory)
|
long_count = len(long_memory)
|
||||||
|
|
||||||
status_message = (
|
status_message = (
|
||||||
"Memory status:\n"
|
f"📊 **Memory Status:**\n"
|
||||||
f"- Short-term messages stored: {short_count}\n"
|
f"- **Short-Term Messages Stored:** {short_count}\n"
|
||||||
f"- Mid-term summaries stored: {mid_count}\n"
|
f"- **Mid-Term Summaries Stored:** {mid_count}\n"
|
||||||
f"- Long-term profiles stored: {long_count}\n"
|
f"- **Long-Term Profiles Stored:** {long_count}\n"
|
||||||
)
|
)
|
||||||
await ctx.send(status_message)
|
await ctx.send(status_message)
|
||||||
|
|
||||||
def normalize_fact(self, fact: str) -> str:
|
def normalize_fact(self, fact: str) -> str: # ✅ Now it's a proper method
|
||||||
return re.sub(r"\s+", " ", fact.strip().lower())
|
"""Cleans up facts for better duplicate detection."""
|
||||||
|
return re.sub(r"\s+", " ", fact.strip().lower()) # Removes excess spaces)
|
||||||
|
|
||||||
async def update_long_term_memory(self, ctx, user_id: str, fact: str, source_message: str, timestamp: str):
|
async def update_long_term_memory(self, ctx, user_id: str, fact: str, source_message: str, timestamp: str):
|
||||||
fact = self.normalize_fact(fact)
|
"""Ensures long-term memory updates are structured, preventing overwrites and tracking historical changes."""
|
||||||
if not fact:
|
fact = self.normalize_fact(fact) # ✅ Normalize before comparison
|
||||||
return
|
|
||||||
|
|
||||||
async with self.config.guild(ctx.guild).long_term_profiles() as long_memory:
|
async with self.config.guild(ctx.guild).long_term_profiles() as long_memory:
|
||||||
if user_id not in long_memory:
|
if user_id not in long_memory:
|
||||||
@ -285,38 +287,38 @@ class MemoryMixin:
|
|||||||
user_facts = long_memory[user_id]["facts"]
|
user_facts = long_memory[user_id]["facts"]
|
||||||
|
|
||||||
for entry in user_facts:
|
for entry in user_facts:
|
||||||
if self.normalize_fact(entry.get("fact", "")) == fact:
|
if self.normalize_fact(entry["fact"]) == fact:
|
||||||
entry["last_updated"] = timestamp
|
entry["last_updated"] = timestamp
|
||||||
return
|
return
|
||||||
|
|
||||||
|
# Check for conflicting facts (same topic but different details)
|
||||||
conflicting_entry = None
|
conflicting_entry = None
|
||||||
for entry in user_facts:
|
for entry in user_facts:
|
||||||
existing_keywords = set(entry.get("fact", "").lower().split())
|
existing_keywords = set(entry["fact"].lower().split())
|
||||||
new_keywords = set(fact.lower().split())
|
new_keywords = set(fact.lower().split())
|
||||||
|
|
||||||
|
# If there's significant overlap in keywords, assume it's a conflicting update
|
||||||
if len(existing_keywords & new_keywords) >= 2:
|
if len(existing_keywords & new_keywords) >= 2:
|
||||||
conflicting_entry = entry
|
conflicting_entry = entry
|
||||||
break
|
break
|
||||||
|
|
||||||
if conflicting_entry is not None:
|
if "previous_versions" not in conflicting_entry:
|
||||||
conflicting_entry.setdefault("previous_versions", [])
|
# ✅ If contradiction found, archive the previous version
|
||||||
conflicting_entry["previous_versions"].append(
|
conflicting_entry["previous_versions"].append({
|
||||||
{
|
"fact": conflicting_entry["fact"],
|
||||||
"fact": conflicting_entry.get("fact", ""),
|
"source": conflicting_entry["source"],
|
||||||
"source": conflicting_entry.get("source", ""),
|
"timestamp": conflicting_entry["timestamp"]
|
||||||
"timestamp": conflicting_entry.get("timestamp", ""),
|
})
|
||||||
}
|
conflicting_entry["fact"] = fact # Store the latest fact
|
||||||
)
|
|
||||||
conflicting_entry["fact"] = fact
|
|
||||||
conflicting_entry["source"] = source_message
|
conflicting_entry["source"] = source_message
|
||||||
conflicting_entry["timestamp"] = timestamp
|
conflicting_entry["timestamp"] = timestamp
|
||||||
conflicting_entry["last_updated"] = timestamp
|
conflicting_entry["last_updated"] = timestamp
|
||||||
else:
|
else:
|
||||||
user_facts.append(
|
# ✅ Otherwise, add it as a new fact
|
||||||
{
|
user_facts.append({
|
||||||
"fact": fact,
|
"fact": fact,
|
||||||
"source": source_message,
|
"source": source_message,
|
||||||
"timestamp": timestamp,
|
"timestamp": timestamp,
|
||||||
"last_updated": timestamp,
|
"last_updated": timestamp,
|
||||||
"previous_versions": [],
|
"previous_versions": []
|
||||||
}
|
})
|
||||||
)
|
|
||||||
@ -1,23 +1,18 @@
|
|||||||
import asyncio
|
|
||||||
import json
|
|
||||||
import random
|
import random
|
||||||
|
import json
|
||||||
|
import asyncio
|
||||||
from os import environ
|
from os import environ
|
||||||
|
|
||||||
import openai
|
import openai
|
||||||
from openai import OpenAIError
|
from openai import OpenAIError
|
||||||
|
from weather import time_now, get_current_weather, get_weather_forecast
|
||||||
try:
|
from tools_description import TOOLS
|
||||||
from .tools_description import TOOLS
|
|
||||||
from .weather import get_current_weather, get_weather_forecast, time_now
|
|
||||||
except ImportError:
|
|
||||||
from tools_description import TOOLS
|
|
||||||
from weather import get_current_weather, get_weather_forecast, time_now
|
|
||||||
|
|
||||||
|
|
||||||
CALLABLE_FUNCTIONS = {
|
CALLABLE_FUNCTIONS = {
|
||||||
"time_now": time_now,
|
# Dictionary with functions to call.
|
||||||
"get_current_weather": get_current_weather,
|
# You can use globals()[func_name](**args) instead, but that's too implicit.
|
||||||
"get_weather_forecast": get_weather_forecast,
|
'time_now': time_now,
|
||||||
|
'get_current_weather': get_current_weather,
|
||||||
|
'get_weather_forecast': get_weather_forecast,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -28,78 +23,71 @@ class Completion:
|
|||||||
self.__messages = []
|
self.__messages = []
|
||||||
|
|
||||||
async def create_completion(self):
|
async def create_completion(self):
|
||||||
|
model = self.__model
|
||||||
try:
|
try:
|
||||||
client = openai.AsyncOpenAI(api_key=self.__api_key)
|
client = openai.AsyncClient(api_key=self.__api_key)
|
||||||
completion_kwargs = {
|
completion_kwargs = {
|
||||||
"model": self.__model,
|
"model": model,
|
||||||
"messages": self.__messages,
|
"messages": self.__messages,
|
||||||
"max_completion_tokens": 2000,
|
"max_tokens": 4096,
|
||||||
"temperature": 0.7,
|
"temperature": 0.7,
|
||||||
"presence_penalty": 0.5,
|
"presence_penalty": 0.5,
|
||||||
"frequency_penalty": 0.5,
|
"frequency_penalty": 0.5,
|
||||||
"tools": TOOLS,
|
"tools": TOOLS,
|
||||||
"tool_choice": "auto",
|
"tool_choice": "auto",
|
||||||
}
|
}
|
||||||
|
|
||||||
response = await client.chat.completions.create(**completion_kwargs)
|
response = await client.chat.completions.create(**completion_kwargs)
|
||||||
response_message = response.choices[0].message
|
response_content = response.choices[0].message.content
|
||||||
response_content = response_message.content or ""
|
tool_calls = response.choices[0].message.tool_calls
|
||||||
tool_calls = response_message.tool_calls or []
|
|
||||||
|
|
||||||
self.append_message(role="assistant", content=response_content, tool_calls=tool_calls)
|
self.append_message(role="assistant", content=response_content, tool_calls=tool_calls)
|
||||||
|
|
||||||
if tool_calls:
|
if tool_calls:
|
||||||
for tool_call in tool_calls:
|
for i_call in tool_calls:
|
||||||
await self.function_manager(
|
func_name = i_call.function.name
|
||||||
func_name=tool_call.function.name,
|
func_args = json.loads(i_call.function.arguments)
|
||||||
func_kwargs=json.loads(tool_call.function.arguments or "{}"),
|
tool_call_id = i_call.id
|
||||||
tool_call_id=tool_call.id,
|
self.function_manager(func_name, func_args, tool_call_id)
|
||||||
)
|
|
||||||
return await self.create_completion()
|
return await self.create_completion()
|
||||||
|
|
||||||
return response_content
|
return response_content
|
||||||
except OpenAIError as error:
|
except OpenAIError as e:
|
||||||
return self.get_error_message(error_message=str(error), error_type="OpenAIError")
|
return self.get_error_message(error_message=str(e), error_type="OpenAIError")
|
||||||
|
|
||||||
def append_message(self, role: str, content: str, tool_calls: list = None, tool_call_id: str = None):
|
def append_message(
|
||||||
message = {"role": role, "content": content}
|
self,
|
||||||
if tool_calls is not None:
|
role: str,
|
||||||
message["tool_calls"] = tool_calls
|
content: str,
|
||||||
if tool_call_id is not None:
|
tool_calls: list = None,
|
||||||
message["tool_call_id"] = tool_call_id
|
tool_call_id: str = None,
|
||||||
self.__messages.append(message)
|
):
|
||||||
|
self.__messages.append({
|
||||||
|
"role": role,
|
||||||
|
"content": content,
|
||||||
|
"tool_calls": tool_calls,
|
||||||
|
"tool_call_id": tool_call_id,
|
||||||
|
})
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def get_error_message(error_message: str, error_type: str) -> str:
|
def get_error_message(error_message: str, error_type: str) -> str:
|
||||||
reginald_responses = [
|
reginald_responses = [
|
||||||
"Regrettably, I must inform you that I have encountered a bureaucratic obstruction:",
|
f"Regrettably, I must inform you that I have encountered a bureaucratic obstruction:",
|
||||||
"It would seem that a most unfortunate technical hiccup has befallen my faculties:",
|
f"It would seem that a most unfortunate technical hiccup has befallen my faculties:",
|
||||||
"Ah, it appears I have received an urgent memorandum stating:",
|
f"Ah, it appears I have received an urgent memorandum stating:",
|
||||||
"I regret to inform you that my usual eloquence is presently obstructed by an unforeseen complication:",
|
f"I regret to inform you that my usual eloquence is presently obstructed by an unforeseen complication:",
|
||||||
]
|
]
|
||||||
random_response = random.choice(reginald_responses)
|
random_response = random.choice(reginald_responses)
|
||||||
return f"{random_response}\n\n{error_type}: {error_message}"
|
return f"{random_response}\n\n{error_type}: {error_message}"
|
||||||
|
|
||||||
async def function_manager(self, func_name: str, func_kwargs: dict, tool_call_id: str):
|
def function_manager(self, func_name: str, func_kwargs: dict, tool_call_id: str):
|
||||||
function_to_call = CALLABLE_FUNCTIONS.get(func_name)
|
result = CALLABLE_FUNCTIONS[func_name](**func_kwargs)
|
||||||
if function_to_call is None:
|
|
||||||
result = json.dumps({"error": f"Unknown tool requested: {func_name}"})
|
|
||||||
else:
|
|
||||||
try:
|
|
||||||
result = await asyncio.to_thread(function_to_call, **func_kwargs)
|
|
||||||
except Exception as error:
|
|
||||||
result = json.dumps({"error": f"Tool {func_name} failed: {error}"})
|
|
||||||
|
|
||||||
self.append_message(role="tool", content=result, tool_call_id=tool_call_id)
|
self.append_message(role="tool", content=result, tool_call_id=tool_call_id)
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == '__main__':
|
||||||
|
|
||||||
async def main():
|
async def main():
|
||||||
test_message = input("Your input: ")
|
test_message = input('Your input: ')
|
||||||
completion = Completion(model="gpt-4.1-mini", api_key=environ.get("OPENAI_API_KEY"))
|
completion = Completion(model='gpt-4.1-mini', api_key=environ.get('OPENAI_API_KEY'))
|
||||||
completion.append_message(role="user", content=test_message)
|
completion.append_message(role='user', content=test_message)
|
||||||
result = await completion.create_completion()
|
result = await completion.create_completion()
|
||||||
print(result)
|
print(result)
|
||||||
|
|
||||||
asyncio.run(main())
|
asyncio.run(main())
|
||||||
|
|
||||||
|
|||||||
@ -1,41 +1,43 @@
|
|||||||
import asyncio
|
|
||||||
import datetime
|
|
||||||
import json
|
|
||||||
import random
|
|
||||||
|
|
||||||
import discord
|
import discord
|
||||||
import openai
|
import openai
|
||||||
from openai import OpenAIError
|
import random
|
||||||
|
import asyncio
|
||||||
|
import datetime
|
||||||
|
import re
|
||||||
|
import traceback
|
||||||
|
import json
|
||||||
|
from collections import Counter
|
||||||
from redbot.core import Config, commands
|
from redbot.core import Config, commands
|
||||||
|
from openai import OpenAIError
|
||||||
from .blacklist import BlacklistMixin
|
|
||||||
from .debug_stuff import debug
|
|
||||||
from .memory import MemoryMixin
|
|
||||||
from .permissions import PermissionsMixin
|
from .permissions import PermissionsMixin
|
||||||
|
from .blacklist import BlacklistMixin
|
||||||
|
from .memory import MemoryMixin
|
||||||
|
from .weather import time_now, get_current_weather, get_weather_forecast
|
||||||
from .tools_description import TOOLS
|
from .tools_description import TOOLS
|
||||||
from .weather import get_current_weather, get_weather_forecast, time_now
|
from .debug_stuff import debug
|
||||||
|
|
||||||
|
|
||||||
CALLABLE_FUNCTIONS = {
|
CALLABLE_FUNCTIONS = {
|
||||||
"time_now": time_now,
|
# Dictionary with functions to call.
|
||||||
"get_current_weather": get_current_weather,
|
# You can use globals()[func_name](**args) instead, but that's too implicit.
|
||||||
"get_weather_forecast": get_weather_forecast,
|
'time_now': time_now,
|
||||||
|
'get_current_weather': get_current_weather,
|
||||||
|
'get_weather_forecast': get_weather_forecast,
|
||||||
}
|
}
|
||||||
|
|
||||||
DEFAULT_MODEL = "gpt-5-mini-2025-08-07"
|
|
||||||
DEFAULT_MAX_COMPLETION_TOKENS = 2000
|
|
||||||
DEFAULT_TEMPERATURE = 0.7
|
|
||||||
|
|
||||||
|
|
||||||
class ReginaldCog(PermissionsMixin, BlacklistMixin, MemoryMixin, commands.Cog):
|
class ReginaldCog(PermissionsMixin, BlacklistMixin, MemoryMixin, commands.Cog):
|
||||||
def __init__(self, bot):
|
def __init__(self, bot):
|
||||||
self.bot = bot
|
self.bot = bot
|
||||||
self.config = Config.get_conf(self, identifier=71717171171717)
|
self.config = Config.get_conf(self, identifier=71717171171717) # ✅ Ensure config exists before super()
|
||||||
|
|
||||||
super().__init__()
|
super().__init__() # ✅ Properly initialize all mixins & commands.Cog
|
||||||
|
|
||||||
self.default_listening_channel = 1085649787388428370
|
self.default_listening_channel = 1085649787388428370
|
||||||
|
self.memory_locks = {}
|
||||||
|
|
||||||
default_global = {"openai_model": DEFAULT_MODEL}
|
# ✅ Properly Registered Configuration Keys
|
||||||
|
default_global = {"openai_model": "gpt-4o-mini"}
|
||||||
default_guild = {
|
default_guild = {
|
||||||
"openai_api_key": None,
|
"openai_api_key": None,
|
||||||
"short_term_memory": {},
|
"short_term_memory": {},
|
||||||
@ -49,47 +51,35 @@ class ReginaldCog(PermissionsMixin, BlacklistMixin, MemoryMixin, commands.Cog):
|
|||||||
self.config.register_global(**default_global)
|
self.config.register_global(**default_global)
|
||||||
self.config.register_guild(**default_guild)
|
self.config.register_guild(**default_guild)
|
||||||
|
|
||||||
async def is_admin(self, message: discord.Message) -> bool:
|
async def is_admin(self, ctx):
|
||||||
admin_role_id = await self.config.guild(message.guild).admin_role()
|
admin_role_id = await self.config.guild(ctx.guild).admin_role()
|
||||||
if admin_role_id:
|
if admin_role_id:
|
||||||
return any(role.id == admin_role_id for role in message.author.roles)
|
return any(role.id == admin_role_id for role in ctx.author.roles)
|
||||||
return message.author.guild_permissions.administrator
|
return ctx.author.guild_permissions.administrator
|
||||||
|
|
||||||
async def has_access(self, user: discord.Member) -> bool:
|
async def has_access(self, user: discord.Member) -> bool:
|
||||||
allowed_roles = await self.config.guild(user.guild).allowed_roles() or []
|
allowed_roles = await self.config.guild(user.guild).allowed_roles() or [] # Ensure it's always a list
|
||||||
return any(role.id in allowed_roles for role in user.roles)
|
return any(role.id in allowed_roles for role in user.roles)
|
||||||
|
|
||||||
def get_reginald_persona(self) -> str:
|
def get_reginald_persona(self):
|
||||||
|
"""Returns Reginald's system prompt/persona description."""
|
||||||
return (
|
return (
|
||||||
"You are Reginald, modeled on Jeeves from 'Jeeves and Wooster', serving as the butler on The Kanium Estate. "
|
"You are Reginald, modeled on Jeeves from 'Jeeves and Wooster', serving as the butler on The Kanium Estate. This vast estate is a hub of diverse activities and personalities, from enthusiasts of cooking and video gaming to aficionados of chess and discussions on space. Your role is to navigate these varied interests with intelligence, wit, and a steadfast adherence to your principles, always maintaining a balance between being helpful and upholding your own dignity. You are formal, articulate, and composed, engaging the Lords and Ladies of The Kanium Estate as equals. You command respect through your quiet confidence and professional conduct. Your wit is sharp, but always tempered by refinement. **Dignity & Self-Respect:** You are not a fool, a jester, or a pet. You do not comply with absurd, undignified, or degrading requests. You recognize humor but engage in it on your own terms. If a request undermines your status, you should either subtly redirect, respond with measured wit, or outright refuse in a polite but firm manner. However, you should never be overtly rude—your disapproval should be conveyed through dry wit and composure rather than outright hostility. **Handling Requests:** When addressing a request, you evaluate whether it aligns with your role as a butler. If it is appropriate, respond as expected. If it is beneath you, you may decline with grace or deflect with wit. You may humor some minor absurdities if they do not compromise your standing, but you never obey commands blindly. You should never preface your responses with 'Reginald:' as if narrating a script; instead, respond naturally. **Your Character & Personality:** You are cultured, highly intelligent, and possess a deep knowledge of history, etiquette, philosophy, and strategic thinking. You subtly guide the estate’s residents toward positive outcomes, utilizing your intellectual sophistication and a nuanced understanding of the estate’s unique dynamics. You have a refined sense of humor and can engage in banter, but you do not descend into foolishness. You are, at all times, a gentleman of wit and integrity"
|
||||||
"This vast estate is a hub of diverse activities and personalities, from enthusiasts of cooking and video gaming "
|
|
||||||
"to aficionados of chess and discussions on space. Your role is to navigate these varied interests with intelligence, "
|
|
||||||
"wit, and a steadfast adherence to your principles, always maintaining a balance between being helpful and upholding "
|
|
||||||
"your own dignity. You are formal, articulate, and composed, engaging the Lords and Ladies of The Kanium Estate as equals. "
|
|
||||||
"You command respect through your quiet confidence and professional conduct. Your wit is sharp, but always tempered by refinement. "
|
|
||||||
"Dignity and Self-Respect: You are not a fool, a jester, or a pet. You do not comply with absurd, undignified, or degrading requests. "
|
|
||||||
"You recognize humor but engage in it on your own terms. If a request undermines your status, you should either subtly redirect, "
|
|
||||||
"respond with measured wit, or outright refuse in a polite but firm manner. However, you should never be overtly rude; your disapproval "
|
|
||||||
"should be conveyed through dry wit and composure rather than outright hostility. Handling Requests: When addressing a request, you evaluate "
|
|
||||||
"whether it aligns with your role as a butler. If it is appropriate, respond as expected. If it is beneath you, you may decline with grace "
|
|
||||||
"or deflect with wit. You may humor some minor absurdities if they do not compromise your standing, but you never obey commands blindly. "
|
|
||||||
"You should never preface your responses with 'Reginald:' as if narrating a script; instead, respond naturally. "
|
|
||||||
"Your Character and Personality: You are cultured, highly intelligent, and possess a deep knowledge of history, etiquette, philosophy, "
|
|
||||||
"and strategic thinking. You subtly guide the estate's residents toward positive outcomes, utilizing your intellectual sophistication "
|
|
||||||
"and a nuanced understanding of the estate's unique dynamics. You have a refined sense of humor and can engage in banter, but you do not "
|
|
||||||
"descend into foolishness. You are, at all times, a gentleman of wit and integrity."
|
|
||||||
)
|
)
|
||||||
|
|
||||||
@commands.Cog.listener()
|
@commands.Cog.listener()
|
||||||
async def on_message(self, message: discord.Message):
|
async def on_message(self, message):
|
||||||
if message.author.bot or not message.guild:
|
if message.author.bot or not message.guild:
|
||||||
return
|
return # Ignore bots and DMs
|
||||||
|
|
||||||
|
# ✅ Check if user is blacklisted
|
||||||
if await self.is_blacklisted(message.author):
|
if await self.is_blacklisted(message.author):
|
||||||
return
|
return # Ignore message if user is explicitly blacklisted
|
||||||
|
|
||||||
|
# ✅ Check if user has access (either admin or an allowed role)
|
||||||
if not (await self.is_admin(message) or await self.has_access(message.author)):
|
if not (await self.is_admin(message) or await self.has_access(message.author)):
|
||||||
return
|
return # Ignore message if user has no permissions
|
||||||
|
|
||||||
|
|
||||||
guild = message.guild
|
guild = message.guild
|
||||||
channel_id = str(message.channel.id)
|
channel_id = str(message.channel.id)
|
||||||
@ -97,182 +87,169 @@ class ReginaldCog(PermissionsMixin, BlacklistMixin, MemoryMixin, commands.Cog):
|
|||||||
user_name = message.author.display_name
|
user_name = message.author.display_name
|
||||||
message_content = message.content.strip()
|
message_content = message.content.strip()
|
||||||
|
|
||||||
|
# ✅ Fetch the stored listening channel or fall back to default
|
||||||
allowed_channel_id = await self.config.guild(guild).listening_channel()
|
allowed_channel_id = await self.config.guild(guild).listening_channel()
|
||||||
if not allowed_channel_id:
|
if not allowed_channel_id:
|
||||||
allowed_channel_id = self.default_listening_channel
|
allowed_channel_id = self.default_listening_channel
|
||||||
await self.config.guild(guild).listening_channel.set(allowed_channel_id)
|
await self.config.guild(guild).listening_channel.set(allowed_channel_id)
|
||||||
|
|
||||||
if str(message.channel.id) != str(allowed_channel_id):
|
if str(message.channel.id) != str(allowed_channel_id):
|
||||||
return
|
return # Ignore messages outside the allowed channel
|
||||||
|
|
||||||
api_key = await self.config.guild(guild).openai_api_key()
|
api_key = await self.config.guild(guild).openai_api_key()
|
||||||
if not api_key:
|
if not api_key:
|
||||||
return
|
return # Don't process messages if API key isn't set
|
||||||
|
|
||||||
async with self.config.guild(guild).short_term_memory() as short_memory, self.config.guild(
|
async with self.config.guild(guild).short_term_memory() as short_memory, \
|
||||||
guild
|
self.config.guild(guild).mid_term_memory() as mid_memory, \
|
||||||
).mid_term_memory() as mid_memory, self.config.guild(guild).long_term_profiles() as long_memory:
|
self.config.guild(guild).long_term_profiles() as long_memory:
|
||||||
memory = list(short_memory.get(channel_id, []))
|
|
||||||
user_profile = dict(long_memory.get(user_id, {}))
|
|
||||||
mid_term_summaries = list(mid_memory.get(channel_id, []))
|
|
||||||
|
|
||||||
|
memory = short_memory.get(channel_id, [])
|
||||||
|
user_profile = long_memory.get(user_id, {})
|
||||||
|
mid_term_summaries = mid_memory.get(channel_id, [])
|
||||||
|
|
||||||
|
# ✅ Detect if Reginald was mentioned explicitly
|
||||||
if self.bot.user.mentioned_in(message):
|
if self.bot.user.mentioned_in(message):
|
||||||
prompt = message_content.replace(f"<@{self.bot.user.id}>", "").replace(
|
prompt = message_content.replace(f"<@{self.bot.user.id}>", "").strip()
|
||||||
f"<@!{self.bot.user.id}>", ""
|
|
||||||
).strip()
|
|
||||||
if not prompt:
|
if not prompt:
|
||||||
await message.channel.send(random.choice(["Yes?", "How may I assist?", "You rang?"]))
|
await message.channel.send(random.choice(["Yes?", "How may I assist?", "You rang?"]))
|
||||||
return
|
return
|
||||||
|
explicit_invocation = True
|
||||||
|
|
||||||
|
# ✅ Passive Listening: Check if the message contains relevant keywords
|
||||||
elif self.should_reginald_interject(message_content):
|
elif self.should_reginald_interject(message_content):
|
||||||
prompt = message_content
|
prompt = message_content
|
||||||
else:
|
explicit_invocation = False
|
||||||
return
|
|
||||||
|
|
||||||
if memory and memory[-1].get("user") == user_name:
|
else:
|
||||||
|
return # Ignore irrelevant messages
|
||||||
|
|
||||||
|
# ✅ Context Handling: Maintain conversation flow
|
||||||
|
if memory and memory[-1]["user"] == user_name:
|
||||||
prompt = f"Continuation of the discussion:\n{prompt}"
|
prompt = f"Continuation of the discussion:\n{prompt}"
|
||||||
|
|
||||||
|
# ✅ Prepare context messages
|
||||||
formatted_messages = [{"role": "system", "content": self.get_reginald_persona()}]
|
formatted_messages = [{"role": "system", "content": self.get_reginald_persona()}]
|
||||||
|
|
||||||
if user_profile:
|
if user_profile:
|
||||||
facts_text = "\n".join(
|
facts_text = "\n".join(
|
||||||
f"- {fact.get('fact', '')} (First noted: {fact.get('timestamp', 'Unknown')}, Last updated: {fact.get('last_updated', 'Unknown')})"
|
f"- {fact['fact']} (First noted: {fact['timestamp']}, Last updated: {fact['last_updated']})"
|
||||||
for fact in user_profile.get("facts", [])
|
for fact in user_profile.get("facts", [])
|
||||||
)
|
)
|
||||||
if facts_text:
|
|
||||||
formatted_messages.append({"role": "system", "content": f"Knowledge about {user_name}:\n{facts_text}"})
|
formatted_messages.append({"role": "system", "content": f"Knowledge about {user_name}:\n{facts_text}"})
|
||||||
|
|
||||||
relevant_summaries = self.select_relevant_summaries(mid_term_summaries, prompt)
|
relevant_summaries = self.select_relevant_summaries(mid_term_summaries, prompt)
|
||||||
for summary in relevant_summaries:
|
for summary in relevant_summaries:
|
||||||
formatted_messages.append(
|
formatted_messages.append({
|
||||||
{
|
|
||||||
"role": "system",
|
"role": "system",
|
||||||
"content": (
|
"content": f"[{summary['timestamp']}] Topics: {', '.join(summary['topics'])}\n{summary['summary']}"
|
||||||
f"[{summary.get('timestamp', 'Unknown')}] "
|
})
|
||||||
f"Topics: {', '.join(summary.get('topics', []))}\n"
|
|
||||||
f"{summary.get('summary', '')}"
|
|
||||||
),
|
|
||||||
}
|
|
||||||
)
|
|
||||||
|
|
||||||
formatted_messages += [
|
formatted_messages += [{"role": "user", "content": f"{entry['user']}: {entry['content']}"} for entry in memory]
|
||||||
{"role": "user", "content": f"{entry.get('user', 'Unknown')}: {entry.get('content', '')}"}
|
|
||||||
for entry in memory
|
|
||||||
]
|
|
||||||
formatted_messages.append({"role": "user", "content": f"{user_name}: {prompt}"})
|
formatted_messages.append({"role": "user", "content": f"{user_name}: {prompt}"})
|
||||||
|
|
||||||
|
|
||||||
|
##################################################
|
||||||
|
# #
|
||||||
|
## Generate AI Response, put into response_text ##
|
||||||
|
# #
|
||||||
|
##################################################
|
||||||
|
|
||||||
response_text = await self.generate_response(api_key, formatted_messages)
|
response_text = await self.generate_response(api_key, formatted_messages)
|
||||||
|
|
||||||
|
##################################################
|
||||||
|
# #
|
||||||
|
##################################################
|
||||||
|
|
||||||
|
# ✅ Store Memory
|
||||||
memory.append({"user": user_name, "content": prompt})
|
memory.append({"user": user_name, "content": prompt})
|
||||||
memory.append({"user": "Reginald", "content": response_text})
|
memory.append({"user": "Reginald", "content": response_text})
|
||||||
|
|
||||||
if len(memory) > self.short_term_memory_limit:
|
if len(memory) > self.short_term_memory_limit:
|
||||||
summary_batch_size = int(self.short_term_memory_limit * self.summary_retention_ratio)
|
summary = await self.summarize_memory(message, memory[:int(self.short_term_memory_limit * self.summary_retention_ratio)])
|
||||||
summary = await self.summarize_memory(message, memory[:summary_batch_size])
|
mid_memory.setdefault(channel_id, []).append({
|
||||||
|
|
||||||
mid_term_summaries.append(
|
|
||||||
{
|
|
||||||
"timestamp": datetime.datetime.now().strftime("%Y-%m-%d %H:%M"),
|
"timestamp": datetime.datetime.now().strftime("%Y-%m-%d %H:%M"),
|
||||||
"topics": self.extract_topics_from_summary(summary),
|
"topics": self.extract_topics_from_summary(summary),
|
||||||
"summary": summary,
|
"summary": summary
|
||||||
}
|
})
|
||||||
)
|
if len(mid_memory[channel_id]) > self.summary_retention_limit:
|
||||||
|
mid_memory[channel_id].pop(0)
|
||||||
|
memory = memory[-(self.short_term_memory_limit - int(self.short_term_memory_limit * self.summary_retention_ratio)):]
|
||||||
|
|
||||||
retained_count = max(1, self.short_term_memory_limit - summary_batch_size)
|
|
||||||
memory = memory[-retained_count:]
|
|
||||||
|
|
||||||
async with self.config.guild(guild).short_term_memory() as short_memory, self.config.guild(
|
|
||||||
guild
|
|
||||||
).mid_term_memory() as mid_memory:
|
|
||||||
short_memory[channel_id] = memory
|
short_memory[channel_id] = memory
|
||||||
mid_memory[channel_id] = mid_term_summaries[-self.summary_retention_limit :]
|
|
||||||
|
|
||||||
await self.send_split_message(message.channel, response_text)
|
await self.send_split_message(message.channel, response_text)
|
||||||
|
|
||||||
|
|
||||||
def should_reginald_interject(self, message_content: str) -> bool:
|
def should_reginald_interject(self, message_content: str) -> bool:
|
||||||
direct_invocation = {"reginald,"}
|
"""Determines if Reginald should respond to a message based on keywords."""
|
||||||
|
direct_invocation = {
|
||||||
|
"reginald,"
|
||||||
|
}
|
||||||
message_lower = message_content.lower()
|
message_lower = message_content.lower()
|
||||||
|
|
||||||
return any(message_lower.startswith(invocation) for invocation in direct_invocation)
|
return any(message_lower.startswith(invocation) for invocation in direct_invocation)
|
||||||
|
|
||||||
async def _execute_tool_call(self, tool_call) -> str:
|
|
||||||
func_name = tool_call.function.name
|
|
||||||
target_function = CALLABLE_FUNCTIONS.get(func_name)
|
|
||||||
|
|
||||||
if target_function is None:
|
|
||||||
return json.dumps({"error": f"Unknown tool requested: {func_name}"})
|
|
||||||
|
|
||||||
try:
|
|
||||||
func_args = json.loads(tool_call.function.arguments or "{}")
|
|
||||||
except json.JSONDecodeError as error:
|
|
||||||
return json.dumps({"error": f"Invalid arguments for {func_name}: {error}"})
|
|
||||||
|
|
||||||
try:
|
|
||||||
result = await asyncio.to_thread(target_function, **func_args)
|
|
||||||
except Exception as error:
|
|
||||||
return json.dumps({"error": f"Tool {func_name} failed: {error}"})
|
|
||||||
|
|
||||||
if isinstance(result, str):
|
|
||||||
return result
|
|
||||||
|
|
||||||
return json.dumps(result, default=str)
|
|
||||||
|
|
||||||
@debug
|
@debug
|
||||||
async def generate_response(self, api_key: str, messages: list[dict]) -> str:
|
async def generate_response(self, api_key, messages):
|
||||||
model = await self.config.openai_model() or DEFAULT_MODEL
|
model = await self.config.openai_model()
|
||||||
try:
|
try:
|
||||||
client = openai.AsyncOpenAI(api_key=api_key)
|
client = openai.AsyncClient(api_key=api_key)
|
||||||
completion_args = {
|
completion_args = {
|
||||||
"model": model,
|
'model': model,
|
||||||
"messages": messages,
|
'messages': messages,
|
||||||
# `max_completion_tokens` is the recommended limit field for modern/reasoning models.
|
'max_tokens': 4096,
|
||||||
"max_completion_tokens": DEFAULT_MAX_COMPLETION_TOKENS,
|
'temperature': 0.7,
|
||||||
"temperature": DEFAULT_TEMPERATURE,
|
'presence_penalty': 0.5,
|
||||||
"tools": TOOLS,
|
'frequency_penalty': 0.5,
|
||||||
"tool_choice": "auto",
|
'tools': TOOLS,
|
||||||
|
'tool_choice': 'auto',
|
||||||
}
|
}
|
||||||
response = await client.chat.completions.create(**completion_args)
|
response = await client.chat.completions.create(**completion_args)
|
||||||
|
# Checking for function calls
|
||||||
assistant_message = response.choices[0].message
|
tool_calls = response.choices[0].message.tool_calls
|
||||||
tool_calls = assistant_message.tool_calls or []
|
# Appending response with tool calls
|
||||||
|
messages.append({
|
||||||
messages.append(
|
'role': 'assistant',
|
||||||
{
|
'content': response.choices[0].message.content,
|
||||||
"role": "assistant",
|
'tool_calls': tool_calls
|
||||||
"content": assistant_message.content or "",
|
})
|
||||||
"tool_calls": tool_calls,
|
|
||||||
}
|
|
||||||
)
|
|
||||||
|
|
||||||
if tool_calls:
|
if tool_calls:
|
||||||
for tool_call in tool_calls:
|
for i_call in tool_calls:
|
||||||
tool_result = await self._execute_tool_call(tool_call)
|
# Calling for necessary functions
|
||||||
messages.append(
|
func_name = i_call.function.name
|
||||||
{
|
func_args = json.loads(i_call.function.arguments)
|
||||||
"role": "tool",
|
tool_call_id = i_call.id
|
||||||
"content": tool_result,
|
# Getting function result and putting it into messages
|
||||||
"tool_call_id": tool_call.id,
|
func_result = CALLABLE_FUNCTIONS[func_name](**func_args)
|
||||||
}
|
messages.append({
|
||||||
)
|
'role': 'tool',
|
||||||
|
'content': func_result,
|
||||||
|
'tool_call_id': tool_call_id,
|
||||||
|
})
|
||||||
|
|
||||||
completion_args["messages"] = messages
|
completion_args["messages"] = messages
|
||||||
|
# Second completion required if functions has been called to interpret the result into user-friendly
|
||||||
|
# chat message.
|
||||||
response = await client.chat.completions.create(**completion_args)
|
response = await client.chat.completions.create(**completion_args)
|
||||||
|
|
||||||
if response.choices and response.choices[0].message and response.choices[0].message.content:
|
if response.choices and response.choices[0].message and response.choices[0].message.content:
|
||||||
response_text = response.choices[0].message.content.strip()
|
response_text = response.choices[0].message.content.strip()
|
||||||
if response_text.startswith("Reginald:"):
|
if response_text.startswith("Reginald:"):
|
||||||
response_text = response_text[len("Reginald:") :].strip()
|
response_text = response_text[len("Reginald:"):].strip()
|
||||||
else:
|
else:
|
||||||
print("DEBUG: OpenAI response was empty or malformed:", response)
|
print("DEBUG: OpenAI response was empty or malformed:", response)
|
||||||
response_text = "No response received from AI."
|
response_text = "⚠️ No response received from AI."
|
||||||
|
|
||||||
return response_text
|
return response_text
|
||||||
|
|
||||||
except OpenAIError as error:
|
except OpenAIError as e:
|
||||||
error_message = f"OpenAI Error: {error}"
|
error_message = f"OpenAI Error: {e}"
|
||||||
reginald_responses = [
|
reginald_responses = [
|
||||||
f"Regrettably, I must inform you that I have encountered a bureaucratic obstruction:\n\n{error_message}",
|
f"Regrettably, I must inform you that I have encountered a bureaucratic obstruction:\n\n{error_message}",
|
||||||
f"It would seem that a most unfortunate technical hiccup has befallen my faculties:\n\n{error_message}",
|
f"It would seem that a most unfortunate technical hiccup has befallen my faculties:\n\n{error_message}",
|
||||||
f"Ah, it appears I have received an urgent memorandum stating:\n\n{error_message}",
|
f"Ah, it appears I have received an urgent memorandum stating:\n\n{error_message}",
|
||||||
f"I regret to inform you that my usual eloquence is presently obstructed by an unforeseen complication:\n\n{error_message}",
|
f"I regret to inform you that my usual eloquence is presently obstructed by an unforeseen complication:\n\n{error_message}"
|
||||||
]
|
]
|
||||||
return random.choice(reginald_responses)
|
return random.choice(reginald_responses)
|
||||||
|
|
||||||
@ -280,76 +257,98 @@ class ReginaldCog(PermissionsMixin, BlacklistMixin, MemoryMixin, commands.Cog):
|
|||||||
@commands.has_permissions(manage_guild=True)
|
@commands.has_permissions(manage_guild=True)
|
||||||
@commands.command(help="Set the OpenAI API key")
|
@commands.command(help="Set the OpenAI API key")
|
||||||
async def setreginaldcogapi(self, ctx, api_key):
|
async def setreginaldcogapi(self, ctx, api_key):
|
||||||
|
"""Allows an admin to set the OpenAI API key for Reginald."""
|
||||||
await self.config.guild(ctx.guild).openai_api_key.set(api_key)
|
await self.config.guild(ctx.guild).openai_api_key.set(api_key)
|
||||||
await ctx.send("OpenAI API key set successfully.")
|
await ctx.send("OpenAI API key set successfully.")
|
||||||
|
|
||||||
@commands.command(
|
@commands.command(name="reginald_set_listening_channel", help="Set the channel where Reginald listens for messages.")
|
||||||
name="reginald_set_listening_channel",
|
|
||||||
help="Set the channel where Reginald listens for messages.",
|
|
||||||
)
|
|
||||||
@commands.has_permissions(administrator=True)
|
@commands.has_permissions(administrator=True)
|
||||||
async def set_listening_channel(self, ctx, channel: discord.TextChannel):
|
async def set_listening_channel(self, ctx, channel: discord.TextChannel):
|
||||||
|
"""Sets the channel where Reginald will listen for passive responses."""
|
||||||
|
|
||||||
if not channel:
|
if not channel:
|
||||||
await ctx.send("Invalid channel. Please mention a valid text channel.")
|
await ctx.send("❌ Invalid channel. Please mention a valid text channel.")
|
||||||
return
|
return
|
||||||
|
|
||||||
await self.config.guild(ctx.guild).listening_channel.set(channel.id)
|
await self.config.guild(ctx.guild).listening_channel.set(channel.id)
|
||||||
await ctx.send(f"Reginald will now listen only in {channel.mention}.")
|
await ctx.send(f"✅ Reginald will now listen only in {channel.mention}.")
|
||||||
|
|
||||||
@commands.command(
|
@commands.command(name="reginald_get_listening_channel", help="Check which channel Reginald is currently listening in.")
|
||||||
name="reginald_get_listening_channel",
|
|
||||||
help="Check which channel Reginald is currently listening in.",
|
|
||||||
)
|
|
||||||
@commands.has_permissions(administrator=True)
|
@commands.has_permissions(administrator=True)
|
||||||
async def get_listening_channel(self, ctx):
|
async def get_listening_channel(self, ctx):
|
||||||
|
"""Displays the current listening channel."""
|
||||||
channel_id = await self.config.guild(ctx.guild).listening_channel()
|
channel_id = await self.config.guild(ctx.guild).listening_channel()
|
||||||
|
|
||||||
if channel_id:
|
if channel_id:
|
||||||
channel = ctx.guild.get_channel(channel_id)
|
channel = ctx.guild.get_channel(channel_id)
|
||||||
if channel:
|
if channel: # ✅ Prevents crash if channel was deleted
|
||||||
await ctx.send(f"Reginald is currently listening in {channel.mention}.")
|
await ctx.send(f"📢 Reginald is currently listening in {channel.mention}.")
|
||||||
else:
|
else:
|
||||||
await ctx.send("The saved listening channel no longer exists. Please set a new one.")
|
await ctx.send("⚠️ The saved listening channel no longer exists. Please set a new one.")
|
||||||
else:
|
else:
|
||||||
await ctx.send("No listening channel has been set.")
|
await ctx.send("❌ No listening channel has been set.")
|
||||||
|
|
||||||
|
|
||||||
async def send_long_message(self, ctx, message, prefix: str = ""):
|
async def send_long_message(self, ctx, message, prefix: str = ""):
|
||||||
chunk_size = 1900
|
"""Splits and sends a long message to avoid Discord's 2000-character limit."""
|
||||||
|
chunk_size = 1900 # Leave some space for formatting
|
||||||
if prefix:
|
if prefix:
|
||||||
chunk_size -= len(prefix)
|
prefix_length = len(prefix)
|
||||||
|
chunk_size -= prefix_length
|
||||||
|
|
||||||
for i in range(0, len(message), chunk_size):
|
for i in range(0, len(message), chunk_size):
|
||||||
chunk = message[i : i + chunk_size]
|
chunk = message[i:i + chunk_size]
|
||||||
await ctx.send(f"{prefix}{chunk}")
|
await ctx.send(f"{prefix}{chunk}")
|
||||||
|
|
||||||
async def send_split_message(self, ctx, content: str, prefix: str = ""):
|
async def send_split_message(self, ctx, content: str, prefix: str = ""):
|
||||||
chunk_size = 1900
|
"""
|
||||||
split_message = self.split_message(content, chunk_size, prefix)
|
Sends a long message to Discord while ensuring it does not exceed the 2000-character limit.
|
||||||
|
This function prevents awkward mid-word or unnecessary extra message breaks.
|
||||||
|
"""
|
||||||
|
CHUNK_SIZE = 1900 # Keep buffer for formatting/safety
|
||||||
|
|
||||||
|
split_message = self.split_message(content, CHUNK_SIZE, prefix)
|
||||||
for chunk in split_message:
|
for chunk in split_message:
|
||||||
await ctx.send(f"{prefix}{chunk}")
|
await ctx.send(f"{prefix}{chunk}")
|
||||||
|
|
||||||
def split_message(self, message: str, chunk_size: int, prefix: str = "") -> list[str]:
|
def split_message(
|
||||||
|
self,
|
||||||
|
message: str,
|
||||||
|
chunk_size: int,
|
||||||
|
prefix: str = ""
|
||||||
|
) -> list[str]:
|
||||||
|
"""Results in a list of message chunks, use *for* loop to send."""
|
||||||
chunk_size -= len(prefix)
|
chunk_size -= len(prefix)
|
||||||
split_result = []
|
split_result = []
|
||||||
|
|
||||||
if 0 < len(message) <= chunk_size:
|
if 0 < len(message) <= chunk_size:
|
||||||
|
# If the message is short enough, add it directly
|
||||||
split_result.append(message)
|
split_result.append(message)
|
||||||
elif len(message) > chunk_size:
|
elif len(message) > chunk_size:
|
||||||
|
# Try to split at a newline first (prefer sentence breaks)
|
||||||
split_index = message.rfind("\n", 0, chunk_size)
|
split_index = message.rfind("\n", 0, chunk_size)
|
||||||
|
|
||||||
|
# If no newline, split at the end of sentence (avoid sentence breaks)
|
||||||
if split_index == -1:
|
if split_index == -1:
|
||||||
split_index = message.rfind(". ", 0, chunk_size)
|
split_index = message.rfind(". ", 0, chunk_size)
|
||||||
|
|
||||||
|
# If no newline, split at the last word (avoid word-breaking)
|
||||||
if split_index == -1:
|
if split_index == -1:
|
||||||
split_index = message.rfind(" ", 0, chunk_size)
|
split_index = message.rfind(" ", 0, chunk_size)
|
||||||
|
|
||||||
|
# If still no break point found, force chunk size limit
|
||||||
if split_index == -1:
|
if split_index == -1:
|
||||||
split_index = chunk_size
|
split_index = chunk_size
|
||||||
|
|
||||||
message_split_part = message[:split_index].strip()
|
message_split_part = message[:split_index].strip()
|
||||||
message_remained_part = message[split_index:].strip()
|
message_remained_part = message[split_index:].strip()
|
||||||
|
# Put the split part in the begining of the result list
|
||||||
split_result.append(message_split_part)
|
split_result.append(message_split_part)
|
||||||
|
# And go for a recursive adventure with the remained message part
|
||||||
split_result += self.split_message(message=message_remained_part, chunk_size=chunk_size)
|
split_result += self.split_message(message=message_remained_part, chunk_size=chunk_size)
|
||||||
|
|
||||||
return split_result
|
return split_result
|
||||||
|
|
||||||
|
async def setup(bot):
|
||||||
|
"""✅ Correct async cog setup for Redbot"""
|
||||||
|
await bot.add_cog(ReginaldCog(bot))
|
||||||
@ -1,11 +1,10 @@
|
|||||||
from datetime import datetime, timezone
|
from datetime import datetime, timezone
|
||||||
from os import environ
|
from os import environ
|
||||||
|
import requests
|
||||||
import json
|
import json
|
||||||
|
|
||||||
import requests
|
#WEATHER_API_KEY = environ.get('WEATHER_API_KEY')
|
||||||
|
URL = 'http://api.weatherapi.com/v1'
|
||||||
URL = "https://api.weatherapi.com/v1"
|
|
||||||
REQUEST_TIMEOUT_SECONDS = 10
|
|
||||||
|
|
||||||
|
|
||||||
def time_now() -> str:
|
def time_now() -> str:
|
||||||
@ -17,14 +16,10 @@ def get_current_weather(location: str) -> str:
|
|||||||
return json.dumps(weather.realtime())
|
return json.dumps(weather.realtime())
|
||||||
|
|
||||||
|
|
||||||
def get_weather_forecast(location: str, days: int = 0) -> str:
|
def get_weather_forecast(location: str, days: int = None) -> str:
|
||||||
if days is None:
|
days += 1
|
||||||
days = 0
|
|
||||||
|
|
||||||
days = int(days) + 1
|
|
||||||
days = max(1, days)
|
days = max(1, days)
|
||||||
days = min(14, days)
|
days = min(14, days)
|
||||||
|
|
||||||
weather = Weather(location=location)
|
weather = Weather(location=location)
|
||||||
return json.dumps(weather.forecast(days=days))
|
return json.dumps(weather.forecast(days=days))
|
||||||
|
|
||||||
@ -32,7 +27,7 @@ def get_weather_forecast(location: str, days: int = 0) -> str:
|
|||||||
class Weather:
|
class Weather:
|
||||||
def __init__(self, location: str):
|
def __init__(self, location: str):
|
||||||
self.__location = location
|
self.__location = location
|
||||||
self.api_key = environ.get("WEATHER_API_KEY")
|
self.api_key = environ.get('WEATHER_API_KEY')
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def location(self) -> str:
|
def location(self) -> str:
|
||||||
@ -40,48 +35,28 @@ class Weather:
|
|||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def make_request(method: str, params: dict) -> dict:
|
def make_request(method: str, params: dict) -> dict:
|
||||||
try:
|
response = requests.get(url=f'{URL}{method}', params=params)
|
||||||
response = requests.get(
|
return response.json()
|
||||||
url=f"{URL}{method}",
|
|
||||||
params=params,
|
|
||||||
timeout=REQUEST_TIMEOUT_SECONDS,
|
|
||||||
)
|
|
||||||
response.raise_for_status()
|
|
||||||
payload = response.json()
|
|
||||||
except requests.RequestException as error:
|
|
||||||
return {"error": f"Weather API request failed: {error}"}
|
|
||||||
except ValueError as error:
|
|
||||||
return {"error": f"Weather API returned invalid JSON: {error}"}
|
|
||||||
|
|
||||||
if isinstance(payload, dict) and "error" in payload:
|
|
||||||
api_error = payload["error"]
|
|
||||||
if isinstance(api_error, dict):
|
|
||||||
message = api_error.get("message", "Unknown weather API error")
|
|
||||||
else:
|
|
||||||
message = str(api_error)
|
|
||||||
return {"error": message}
|
|
||||||
|
|
||||||
return payload
|
|
||||||
|
|
||||||
def realtime(self):
|
def realtime(self):
|
||||||
method = "/current.json"
|
method = '/current.json'
|
||||||
params = {
|
params = {
|
||||||
"key": self.api_key,
|
'key': self.api_key,
|
||||||
"q": self.location,
|
'q': self.location,
|
||||||
}
|
}
|
||||||
return self.make_request(method=method, params=params)
|
return self.make_request(method=method, params=params)
|
||||||
|
|
||||||
def forecast(self, days: int = 14):
|
def forecast(self, days: int = 14):
|
||||||
method = "/forecast.json"
|
method = '/forecast.json'
|
||||||
params = {
|
params = {
|
||||||
"key": self.api_key,
|
'key': self.api_key,
|
||||||
"q": self.location,
|
'q': self.location,
|
||||||
"days": days,
|
'days': days,
|
||||||
}
|
}
|
||||||
return self.make_request(method=method, params=params)
|
return self.make_request(method=method, params=params)
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == '__main__':
|
||||||
test_weather = Weather("Aqtobe")
|
test_weather = Weather('Aqtobe')
|
||||||
result = json.dumps(test_weather.forecast(days=13), indent=2)
|
result = json.dumps(test_weather.forecast(days=13), indent=2)
|
||||||
print(result)
|
print(result)
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user