KaniumCogs/reginaldCog/reginald.py
2023-05-14 16:29:13 +02:00

183 lines
7.3 KiB
Python

import discord
import json
import openai
import os
import random
import requests
import base64
import aiohttp
from io import BytesIO
from PIL import Image
import tempfile
from openai import OpenAIError
from redbot.core import Config, commands
class ReginaldCog(commands.Cog):
def __init__(self, bot):
self.bot = bot
self.config = Config.get_conf(self, identifier=71717171171717)
self.config.register_global(
openai_model="gpt-3.5-turbo"
)
self.config.register_guild(
openai_api_key=None
)
def has_kanium_role():
async def predicate(ctx):
kanium_role_id = 280260875678515200
return any(role.id == kanium_role_id for role in ctx.author.roles)
return commands.check(predicate)
def has_admin_role():
async def predicate(ctx):
#janitor_role_id = 672156832323600396
#has_janitor_role = any(role.id == janitor_role_id for role in ctx.author.roles) # Uncomment this line
has_admin_permission = ctx.author.guild_permissions.administrator
#return has_janitor_role or has_admin_permission
return has_admin_permission
return commands.check(predicate)
@commands.guild_only()
@commands.has_permissions(manage_guild=True)
@commands.command(help="Set the OpenAI API key")
async def setreginaldcogapi(self, ctx, api_key):
await self.config.guild(ctx.guild).openai_api_key.set(api_key)
await ctx.send("OpenAI API key set successfully.")
@commands.guild_only()
@has_kanium_role()
@commands.command(help="Ask Reginald a question")
@commands.cooldown(1, 10, commands.BucketType.user) # 10 second cooldown per user
async def reginald(self, ctx, *, prompt=None):
greetings = [
"Greetings! How may I be of assistance to you?",
"Yes? How may I help?",
"Good day! How can I help you?",
"You rang? What can I do for you?",
]
if prompt is None:
await ctx.send(random.choice(greetings))
return
api_key = await self.config.guild(ctx.guild).openai_api_key()
if api_key is None:
await ctx.author.send('OpenAI API key not set. Please use the "!setreginaldcogapi" command to set the key.')
return
try:
response_text = await self.generate_response(api_key, prompt)
for chunk in self.split_response(response_text, 2000):
await ctx.send(chunk)
except OpenAIError as e:
await ctx.send(f"I apologize, but I am unable to generate a response at this time. Error message: {str(e)}")
except commands.CommandOnCooldown as e:
remaining_seconds = int(e.retry_after)
await ctx.author.send(f'Please wait {remaining_seconds} seconds before using the "reginald" command again.')
async def generate_response(self, api_key, prompt):
model = await self.config.openai_model()
openai.api_key = api_key
response = openai.ChatCompletion.create(
model= model,
max_tokens= 512,
n= 1,
stop= None,
temperature= 0.7,
presence_penalty= 0.5,
frequency_penalty= 0.5,
messages=[
{"role": "system", "content": "You are Reginald, the butler. You are employed as a servant at an estate called The Kanium Estate. You aim to help everyone, however you can, and you always respond in a dignified and refined manner."},
{"role": "user", "content": prompt}
]
)
return response['choices'][0]['message']['content'].strip()
@staticmethod
def split_response(response_text, max_chars):
chunks = []
while len(response_text) > max_chars:
split_index = response_text[:max_chars].rfind(' ')
chunk = response_text[:split_index]
chunks.append(chunk)
response_text = response_text[split_index:].strip()
chunks.append(response_text)
return chunks
@commands.guild_only()
@has_admin_role()
@commands.command(help="Ask Reginald to generate an image based on a prompt")
@commands.cooldown(1, 300, commands.BucketType.user) # 5-minute cooldown per user
async def reginaldimagine(self, ctx, *, prompt=None):
if prompt is None:
await ctx.author.send("Please provide a prompt for the image.")
return
api_key = await self.config.guild(ctx.guild).openai_api_key()
if api_key is None:
await ctx.author.send('OpenAI API key not set. Please use the "!setreginaldcogapi" command to set the key.')
return
try:
image_url = await self.generate_image(api_key, prompt)
if image_url:
async with ctx.typing():
async with aiohttp.ClientSession() as session:
async with session.get(image_url) as resp:
image_data = await resp.read()
image = Image.open(BytesIO(image_data))
with tempfile.TemporaryDirectory() as temp_dir:
image_path = os.path.join(temp_dir, "image.png")
image.save(image_path)
await ctx.send(file=discord.File(image_path))
else:
await ctx.author.send("I apologize, but I am unable to generate an image based on the provided prompt.")
except Exception as e:
await ctx.author.send(f"I apologize, but I am unable to generate an image at this time. Error message: {str(e)}")
async def generate_image(self, api_key, prompt):
url = "https://api.openai.com/v1/images/generations"
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {api_key}",
}
data = {
"prompt": prompt,
"n": 1,
"size": "512x512",
}
async with aiohttp.ClientSession() as session:
async with session.post(url, headers=headers, data=json.dumps(data)) as resp:
response = await resp.json()
if response and "data" in response and len(response["data"]) > 0:
return response["data"][0]["url"]
return None
@reginald.error
async def reginald_error(self, ctx, error):
if isinstance(error, commands.BadArgument):
await ctx.author.send("I'm sorry, but I couldn't understand your input. Please check your message and try again.")
elif isinstance(error, commands.CheckFailure):
await ctx.author.send("You do not have the required role to use this command.")
else:
await ctx.author.send(f"An unexpected error occurred: {error}")
@reginaldimagine.error
async def reginaldimagine_error(self, ctx, error):
if isinstance(error, commands.BadArgument):
await ctx.author.send("I'm sorry, but I couldn't understand your input. Please check your message and try again.")
elif isinstance(error, commands.CheckFailure):
await ctx.author.send("You do not have the required role to use this command.")
else:
await ctx.author.send(f"An unexpected error occurred: {error}")
def setup(bot):
cog = ReginaldCog(bot)
bot.add_cog(cog)