- Fix remaining datetime timezone errors across all database operations - Implement dynamic vector database backend (Qdrant/ChromaDB) based on install.py configuration - Add LLM timeout handling with immediate fallback responses for slow self-hosted models - Use proper install.py configuration (2000 max tokens, 5min timeout, correct LLM endpoint) - Fix PostgreSQL schema to use timezone-aware columns throughout - Implement async LLM request handling with background processing - Add configurable prompt limits and conversation history controls - Start missing database services (PostgreSQL, Redis) automatically - Fix environment variable mapping between install.py and application code - Resolve all timezone-naive vs timezone-aware datetime conflicts. System now properly uses Qdrant vector database as specified in install.py instead of hardcoded ChromaDB. Characters respond immediately with fallback messages during long LLM processing times. All database timezone errors resolved with proper timestamptz columns.
743 lines
30 KiB
Python
743 lines
30 KiB
Python
import asyncio
|
|
import json
|
|
from typing import Dict, Any, List, Optional, Union
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
import aiofiles
|
|
from dataclasses import dataclass, asdict
|
|
|
|
from mcp.server.stdio import stdio_server
|
|
from mcp.server import Server
|
|
from mcp.types import Tool, TextContent, ImageContent, EmbeddedResource
|
|
|
|
from database.connection import get_db_session
|
|
from database.models import Character, CharacterEvolution
|
|
from utils.logging import log_character_action, log_error_with_context, log_autonomous_decision
|
|
from sqlalchemy import select
|
|
import logging
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
|
|
|
|
@dataclass
class ModificationRequest:
    """One requested change to a character, with its justification."""

    character_name: str
    modification_type: str
    old_value: Any
    new_value: Any
    reason: str
    confidence: float
    timestamp: datetime

    def to_dict(self) -> Dict[str, Any]:
        """Return a plain-dict view of this request.

        ``old_value`` and ``new_value`` are passed through ``str()`` and the
        timestamp is rendered with ``isoformat()``; the remaining fields are
        copied unchanged.
        """
        return dict(
            character_name=self.character_name,
            modification_type=self.modification_type,
            old_value=str(self.old_value),
            new_value=str(self.new_value),
            reason=self.reason,
            confidence=self.confidence,
            timestamp=self.timestamp.isoformat(),
        )
|
|
|
|
class SelfModificationMCPServer:
|
|
"""MCP Server for character self-modification capabilities"""
|
|
|
|
def __init__(self, data_dir: str = "./data/characters"):
    """Set up on-disk storage, validation rules and usage counters.

    Args:
        data_dir: Directory under which per-character JSON files are
            kept. It is created (including parents) if it is missing.
    """
    self.data_dir = Path(data_dir)
    self.data_dir.mkdir(parents=True, exist_ok=True)

    # Modification validation rules, keyed by modification type.
    # "max_change_per_day", "min_confidence" and "require_justification"
    # are enforced by _validate_modification; "reversible" is stored here
    # but not read anywhere in this class.
    self.modification_rules = {
        "personality_trait": {
            "max_change_per_day": 3,
            "min_confidence": 0.6,
            "require_justification": True,
            "reversible": True,
        },
        "speaking_style": {
            "max_change_per_day": 2,
            "min_confidence": 0.7,
            "require_justification": True,
            "reversible": True,
        },
        "interests": {
            "max_change_per_day": 5,
            "min_confidence": 0.5,
            "require_justification": False,
            "reversible": True,
        },
        "goals": {
            "max_change_per_day": 2,
            "min_confidence": 0.8,
            "require_justification": True,
            "reversible": False,
        },
        "memory_rule": {
            "max_change_per_day": 3,
            "min_confidence": 0.7,
            "require_justification": True,
            "reversible": True,
        },
    }

    # Track modifications per character per day:
    # character name -> ISO date -> modification type -> count.
    self.daily_modifications: Dict[str, Dict[str, Dict[str, int]]] = {}
|
|
|
|
async def create_server(self) -> Server:
    """Build the MCP server and attach every tool group to it."""
    mcp = Server("character-self-modification")

    # Tool groups are registered in this fixed order.
    for register in (
        self._register_modification_tools,
        self._register_config_tools,
        self._register_validation_tools,
    ):
        await register(mcp)

    return mcp
|
|
|
|
async def _register_modification_tools(self, server: Server):
    """Register character self-modification tools.

    Tools defined here: ``modify_personality_trait``, ``update_goals``,
    ``adjust_speaking_style`` and ``create_memory_rule``. Each one validates
    the request with ``_validate_modification``, applies the change, records
    usage with ``_track_modification`` and answers with a single
    ``TextContent``. Exceptions are logged and reported as text, not raised.

    NOTE(review): every tool is decorated with ``server.call_tool()``. In the
    low-level MCP Python SDK that decorator appears to install one dispatcher
    taking ``(name, arguments)``, so later registrations may replace earlier
    ones - confirm against the installed ``mcp`` version.
    """

    def reply(message: str) -> List[TextContent]:
        # Every tool answers with exactly one text item.
        return [TextContent(type="text", text=message)]

    def character_file(character_name: str, filename: str) -> Path:
        # character_name arrives from a tool call, so refuse anything that
        # could resolve outside the character's own folder under data_dir.
        folder = character_name.lower()
        unsafe = any(token in folder for token in ("/", "\\", "\0"))
        if unsafe or not folder.strip() or folder in (".", ".."):
            raise ValueError(f"Invalid character name: {character_name!r}")
        target = self.data_dir / folder / filename
        target.parent.mkdir(parents=True, exist_ok=True)
        return target

    @server.call_tool()
    async def modify_personality_trait(
        character_name: str,
        trait: str,
        new_value: str,
        reason: str,
        confidence: float = 0.7
    ) -> List[TextContent]:
        """Modify a specific personality trait"""
        try:
            verdict = await self._validate_modification(
                character_name, "personality_trait", trait, new_value, reason, confidence
            )
            if not verdict["valid"]:
                return reply(f"Modification rejected: {verdict['reason']}")

            current_personality = await self._get_current_personality(character_name)
            # Only None means the lookup produced nothing; an empty
            # personality string still belongs to an existing character.
            if current_personality is None:
                return reply(f"Character {character_name} not found")

            new_personality = await self._modify_personality_trait(
                current_personality, trait, new_value
            )

            modification = ModificationRequest(
                character_name=character_name,
                modification_type="personality_trait",
                old_value=current_personality,
                new_value=new_personality,
                reason=reason,
                confidence=confidence,
                timestamp=datetime.now(timezone.utc)
            )

            applied = await self._apply_personality_modification(
                character_name, new_personality, modification
            )
            if not applied:
                return reply("Failed to apply personality modification to database")

            # Usage is only counted once the change has been stored.
            await self._track_modification(character_name, "personality_trait")
            log_autonomous_decision(
                character_name,
                f"modified personality trait: {trait}",
                reason,
                {"confidence": confidence, "trait": trait}
            )
            return reply(
                f"Successfully modified personality trait '{trait}' for {character_name}. New personality updated."
            )

        except Exception as e:
            log_error_with_context(e, {
                "character": character_name,
                "trait": trait,
                "tool": "modify_personality_trait"
            })
            return reply(f"Error modifying personality trait: {str(e)}")

    @server.call_tool()
    async def update_goals(
        character_name: str,
        new_goals: List[str],
        reason: str,
        confidence: float = 0.8
    ) -> List[TextContent]:
        """Update character's goals and aspirations"""
        try:
            verdict = await self._validate_modification(
                character_name, "goals", "", json.dumps(new_goals), reason, confidence
            )
            if not verdict["valid"]:
                return reply(f"Goal update rejected: {verdict['reason']}")

            # Goals live in the character's personal config file.
            goals_file = character_file(character_name, "goals.json")

            current_goals = []
            if goals_file.exists():
                async with aiofiles.open(goals_file, 'r', encoding="utf-8") as f:
                    current_goals = json.loads(await f.read()).get("goals", [])

            # The previous goals are kept alongside the new ones.
            goals_data = {
                "goals": new_goals,
                "previous_goals": current_goals,
                "updated_at": datetime.now(timezone.utc).isoformat(),
                "reason": reason,
                "confidence": confidence
            }

            async with aiofiles.open(goals_file, 'w', encoding="utf-8") as f:
                await f.write(json.dumps(goals_data, indent=2))

            await self._track_modification(character_name, "goals")

            log_autonomous_decision(
                character_name,
                "updated goals",
                reason,
                {"new_goals": new_goals, "confidence": confidence}
            )

            return reply(
                f"Successfully updated goals for {character_name}: {', '.join(new_goals)}"
            )

        except Exception as e:
            log_error_with_context(e, {
                "character": character_name,
                "tool": "update_goals"
            })
            return reply(f"Error updating goals: {str(e)}")

    @server.call_tool()
    async def adjust_speaking_style(
        character_name: str,
        style_changes: Dict[str, str],
        reason: str,
        confidence: float = 0.7
    ) -> List[TextContent]:
        """Adjust character's speaking style"""
        try:
            verdict = await self._validate_modification(
                character_name, "speaking_style", "", json.dumps(style_changes), reason, confidence
            )
            if not verdict["valid"]:
                return reply(f"Speaking style change rejected: {verdict['reason']}")

            current_style = await self._get_current_speaking_style(character_name)
            # Only None means the lookup produced nothing; an empty style
            # string still belongs to an existing character.
            if current_style is None:
                return reply(f"Character {character_name} not found")

            new_style = await self._apply_speaking_style_changes(current_style, style_changes)

            modification = ModificationRequest(
                character_name=character_name,
                modification_type="speaking_style",
                old_value=current_style,
                new_value=new_style,
                reason=reason,
                confidence=confidence,
                timestamp=datetime.now(timezone.utc)
            )

            applied = await self._apply_speaking_style_modification(
                character_name, new_style, modification
            )
            if not applied:
                return reply("Failed to apply speaking style modification")

            await self._track_modification(character_name, "speaking_style")

            log_autonomous_decision(
                character_name,
                "adjusted speaking style",
                reason,
                {"changes": style_changes, "confidence": confidence}
            )

            return reply(f"Successfully adjusted speaking style for {character_name}")

        except Exception as e:
            log_error_with_context(e, {
                "character": character_name,
                "tool": "adjust_speaking_style"
            })
            return reply(f"Error adjusting speaking style: {str(e)}")

    @server.call_tool()
    async def create_memory_rule(
        character_name: str,
        memory_type: str,
        importance_weight: float,
        retention_days: int,
        rule_description: str,
        confidence: float = 0.7
    ) -> List[TextContent]:
        """Create a new memory management rule"""
        try:
            # The rule description doubles as the justification text.
            verdict = await self._validate_modification(
                character_name, "memory_rule", memory_type,
                f"weight:{importance_weight},retention:{retention_days}",
                rule_description, confidence
            )
            if not verdict["valid"]:
                return reply(f"Memory rule creation rejected: {verdict['reason']}")

            rules_file = character_file(character_name, "memory_rules.json")

            current_rules = {}
            if rules_file.exists():
                async with aiofiles.open(rules_file, 'r', encoding="utf-8") as f:
                    current_rules = json.loads(await f.read())

            now = datetime.now(timezone.utc)
            base_id = f"{memory_type}_{now.strftime('%Y%m%d_%H%M%S')}"
            # The id has one-second resolution; add a numeric suffix rather
            # than overwrite a rule created within the same second.
            rule_id, attempt = base_id, 1
            while rule_id in current_rules:
                attempt += 1
                rule_id = f"{base_id}_{attempt}"

            current_rules[rule_id] = {
                "memory_type": memory_type,
                "importance_weight": importance_weight,
                "retention_days": retention_days,
                "description": rule_description,
                "created_at": now.isoformat(),
                "confidence": confidence,
                "active": True
            }

            async with aiofiles.open(rules_file, 'w', encoding="utf-8") as f:
                await f.write(json.dumps(current_rules, indent=2))

            await self._track_modification(character_name, "memory_rule")

            log_autonomous_decision(
                character_name,
                "created memory rule",
                rule_description,
                {"memory_type": memory_type, "weight": importance_weight, "retention": retention_days}
            )

            return reply(
                f"Created memory rule '{rule_id}' for {character_name}: {rule_description}"
            )

        except Exception as e:
            log_error_with_context(e, {
                "character": character_name,
                "tool": "create_memory_rule"
            })
            return reply(f"Error creating memory rule: {str(e)}")
|
|
|
|
async def _register_config_tools(self, server: Server):
    """Register configuration management tools.

    Tools defined here: ``get_current_config`` (database fields plus any
    stored goals and memory rules) and ``get_modification_history`` (the
    most recent ``CharacterEvolution`` rows). Both answer with JSON text;
    exceptions are logged and reported as text, not raised.

    NOTE(review): every tool is decorated with ``server.call_tool()``. In the
    low-level MCP Python SDK that decorator appears to install one dispatcher
    taking ``(name, arguments)``, so later registrations may replace earlier
    ones - confirm against the installed ``mcp`` version.
    """

    def reply(message: str) -> List[TextContent]:
        # Every tool answers with exactly one text item.
        return [TextContent(type="text", text=message)]

    async def read_json(path: Path):
        # Parsed file content, or None when the file does not exist.
        if not path.exists():
            return None
        async with aiofiles.open(path, 'r', encoding="utf-8") as f:
            return json.loads(await f.read())

    def preview(value, max_chars: int = 100):
        # Stored values may be NULL; long ones are truncated for display.
        if value is None:
            return None
        return value[:max_chars] + "..." if len(value) > max_chars else value

    @server.call_tool()
    async def get_current_config(character_name: str) -> List[TextContent]:
        """Get character's current configuration"""
        try:
            async with get_db_session() as session:
                query = select(Character).where(Character.name == character_name)
                character = await session.scalar(query)

                if not character:
                    return reply(f"Character {character_name} not found")

                config = {
                    "name": character.name,
                    "personality": character.personality,
                    "speaking_style": character.speaking_style,
                    "interests": character.interests,
                    "background": character.background,
                    "is_active": character.is_active,
                    "last_active": character.last_active.isoformat() if character.last_active else None
                }

            # File reads happen after the session block so the database
            # session is not held open during disk I/O.
            character_dir = self.data_dir / character_name.lower()

            goals_data = await read_json(character_dir / "goals.json")
            if goals_data is not None:
                config["goals"] = goals_data.get("goals", [])

            rules_data = await read_json(character_dir / "memory_rules.json")
            if rules_data is not None:
                config["memory_rules"] = rules_data

            return reply(json.dumps(config, indent=2))

        except Exception as e:
            log_error_with_context(e, {
                "character": character_name,
                "tool": "get_current_config"
            })
            return reply(f"Error getting configuration: {str(e)}")

    @server.call_tool()
    async def get_modification_history(
        character_name: str,
        limit: int = 10
    ) -> List[TextContent]:
        """Get character's modification history"""
        try:
            async with get_db_session() as session:
                # Explicit scalar subquery instead of relying on implicit
                # coercion of a SELECT used as a comparison operand.
                character_id = (
                    select(Character.id)
                    .where(Character.name == character_name)
                    .scalar_subquery()
                )
                query = (
                    select(CharacterEvolution)
                    .where(CharacterEvolution.character_id == character_id)
                    .order_by(CharacterEvolution.timestamp.desc())
                    .limit(limit)
                )

                evolutions = await session.scalars(query)

                history = [
                    {
                        "timestamp": evolution.timestamp.isoformat() if evolution.timestamp else None,
                        "change_type": evolution.change_type,
                        "reason": evolution.reason,
                        "old_value": preview(evolution.old_value),
                        "new_value": preview(evolution.new_value)
                    }
                    for evolution in evolutions
                ]

            return reply(json.dumps(history, indent=2))

        except Exception as e:
            log_error_with_context(e, {
                "character": character_name,
                "tool": "get_modification_history"
            })
            return reply(f"Error getting modification history: {str(e)}")
|
|
|
|
async def _register_validation_tools(self, server: Server):
    """Attach the validation and quota-inspection tools to ``server``.

    ``validate_modification_request`` runs ``_validate_modification`` without
    applying anything; ``get_modification_limits`` reports today's usage
    against the configured rules. Both answer with JSON text, or with an
    error message if an exception occurs.
    """

    def reply(message: str) -> List[TextContent]:
        # Every tool answers with exactly one text item.
        return [TextContent(type="text", text=message)]

    @server.call_tool()
    async def validate_modification_request(
        character_name: str,
        modification_type: str,
        proposed_change: str,
        reason: str,
        confidence: float
    ) -> List[TextContent]:
        """Validate a proposed modification before applying it"""
        try:
            outcome = await self._validate_modification(
                character_name, modification_type, "", proposed_change, reason, confidence
            )
            return reply(json.dumps(outcome, indent=2))
        except Exception as e:
            return reply(f"Error validating modification: {str(e)}")

    @server.call_tool()
    async def get_modification_limits(character_name: str) -> List[TextContent]:
        """Get current modification limits and usage"""
        try:
            today = datetime.now(timezone.utc).date().isoformat()
            per_character = self.daily_modifications.get(character_name, {})
            usage = per_character.get(today, {})

            # Remaining quota per modification type, never below zero.
            remaining = {
                mod_type: max(0, rules["max_change_per_day"] - usage.get(mod_type, 0))
                for mod_type, rules in self.modification_rules.items()
            }

            limits_info = {
                "character": character_name,
                "date": today,
                "current_usage": usage,
                "limits": self.modification_rules,
                "remaining_modifications": remaining
            }
            return reply(json.dumps(limits_info, indent=2))
        except Exception as e:
            return reply(f"Error getting modification limits: {str(e)}")
|
|
|
|
async def _validate_modification(self, character_name: str, modification_type: str,
                                 field: str, new_value: str, reason: str,
                                 confidence: float) -> Dict[str, Any]:
    """Check a proposed modification against the configured rules.

    Checks run in this order: the modification type is known, the
    confidence is a number within [0, 1] and meets the type's minimum,
    today's (UTC) usage is below the daily limit, and the reason is at
    least 10 characters long when the type requires justification.

    This method only reads the usage counters; it does not record usage
    and does not create entries for unseen characters or days.

    Args:
        character_name: Character the change is for.
        modification_type: Key into ``self.modification_rules``.
        field: Name of the field being changed (not inspected here).
        new_value: Proposed value (not inspected here).
        reason: Justification text.
        confidence: Caller's confidence in the change.

    Returns:
        ``{"valid": bool, "reason": str}``. Unexpected errors are logged
        and reported as an invalid result instead of being raised.
    """

    def verdict(valid: bool, why: str) -> Dict[str, Any]:
        return {"valid": valid, "reason": why}

    try:
        rules = self.modification_rules.get(modification_type)
        if rules is None:
            return verdict(False, f"Modification type '{modification_type}' is not allowed")

        # NaN compares False against everything, so a bare `<` threshold
        # test would let it through; the range check rejects it first.
        if not 0.0 <= confidence <= 1.0:
            return verdict(False, f"Confidence {confidence} must be between 0.0 and 1.0")

        if confidence < rules["min_confidence"]:
            return verdict(False, f"Confidence {confidence} below minimum {rules['min_confidence']}")

        today = datetime.now(timezone.utc).date().isoformat()
        used_today = (
            self.daily_modifications
            .get(character_name, {})
            .get(today, {})
            .get(modification_type, 0)
        )
        if used_today >= rules["max_change_per_day"]:
            return verdict(
                False,
                f"Daily limit exceeded for {modification_type} ({used_today}/{rules['max_change_per_day']})"
            )

        # A missing reason counts as an empty one.
        if rules["require_justification"] and len((reason or "").strip()) < 10:
            return verdict(False, "Insufficient justification provided")

        return verdict(True, "Modification request is valid")

    except Exception as e:
        log_error_with_context(e, {"character": character_name, "modification_type": modification_type})
        return verdict(False, f"Validation error: {str(e)}")
|
|
|
|
async def _track_modification(self, character_name: str, modification_type: str):
    """Count one applied modification against today's (UTC) daily limit.

    Args:
        character_name: Character that was modified.
        modification_type: Kind of modification that was applied.
    """
    today = datetime.now(timezone.utc).date().isoformat()
    per_day = self.daily_modifications.setdefault(character_name, {})

    # The readers in this class (_validate_modification and the
    # get_modification_limits tool) only consult today's entry, so earlier
    # days are dropped here to keep the table from growing without bound.
    for stale_day in [day for day in per_day if day != today]:
        del per_day[stale_day]

    counts = per_day.setdefault(today, {})
    counts[modification_type] = counts.get(modification_type, 0) + 1
|
|
|
|
async def _get_current_personality(self, character_name: str) -> Optional[str]:
    """Look up the ``personality`` column for the named character.

    Returns the query's scalar result, or ``None`` if the lookup raises
    (the error is logged with the character name).
    """
    try:
        async with get_db_session() as db:
            stmt = select(Character.personality).where(Character.name == character_name)
            return await db.scalar(stmt)
    except Exception as exc:
        log_error_with_context(exc, {"character": character_name})
        return None
|
|
|
|
async def _get_current_speaking_style(self, character_name: str) -> Optional[str]:
    """Look up the ``speaking_style`` column for the named character.

    Returns the query's scalar result, or ``None`` if the lookup raises
    (the error is logged with the character name).
    """
    try:
        async with get_db_session() as db:
            stmt = select(Character.speaking_style).where(Character.name == character_name)
            return await db.scalar(stmt)
    except Exception as exc:
        log_error_with_context(exc, {"character": character_name})
        return None
|
|
|
|
async def _modify_personality_trait(self, current_personality: str, trait: str, new_value: str) -> str:
    """Return ``current_personality`` with ``trait`` set to ``new_value``.

    The text is treated as period-separated sentences. The first sentence
    that mentions ``trait`` (case-insensitive) is replaced by
    ``"<Trait>: <new_value>"``; further sentences mentioning it are removed
    so the statement is not duplicated. When no sentence mentions the trait,
    the statement is appended. A trailing period on the input is kept.

    Simple implementation - in production, this could use an LLM to modify
    the personality more intelligently. Splitting on "." also splits inside
    abbreviations and decimal numbers.

    Args:
        current_personality: Existing personality text.
        trait: Trait name to look for.
        new_value: New description of the trait.

    Raises:
        ValueError: If ``trait`` is empty or whitespace; an empty trait
            would match, and therefore overwrite, every sentence.
    """
    trait_key = trait.strip().lower()
    if not trait_key:
        raise ValueError("trait must not be empty")
    statement = f"{trait.strip().title()}: {new_value}"

    # Set the final period aside so splitting leaves no empty last segment
    # (which previously produced ".." when a new trait was appended).
    text = current_personality.rstrip()
    ends_with_period = text.endswith('.')
    if ends_with_period:
        text = text[:-1]

    sentences = []
    replaced = False
    for sentence in (text.split('.') if text else []):
        if trait_key not in sentence.lower():
            sentences.append(sentence)
        elif not replaced:
            # Keep the sentence's own leading whitespace so spacing after
            # the previous period is unchanged.
            lead = sentence[:len(sentence) - len(sentence.lstrip())]
            sentences.append(lead + statement)
            replaced = True

    if not replaced:
        sentences.append(f" {statement}" if sentences else statement)

    result = '.'.join(sentences)
    if ends_with_period and not result.endswith('.'):
        result += '.'
    return result
|
|
|
|
async def _apply_speaking_style_changes(self, current_style: str, changes: Dict[str, str]) -> str:
    """Append each requested change to the existing style text.

    Every ``aspect -> change`` pair becomes `` <Aspect>: <change>.`` and the
    pieces are added after ``current_style`` in the mapping's iteration
    order. Simple implementation - could be enhanced with an LLM.
    """
    additions = [f" {aspect.title()}: {change}." for aspect, change in changes.items()]
    return current_style + "".join(additions)
|
|
|
|
async def _apply_personality_modification(self, character_name: str, new_personality: str,
                                          modification: ModificationRequest) -> bool:
    """Store a new personality and log the change as an evolution row.

    Returns ``True`` once the update and its ``CharacterEvolution`` record
    are committed, ``False`` when the character is not found or an
    exception occurs (the exception is logged).
    """
    try:
        async with get_db_session() as db:
            record = await db.scalar(
                select(Character).where(Character.name == character_name)
            )
            if not record:
                return False

            # Read the stored value before overwriting it so the evolution
            # row records what was actually in the database.
            previous = record.personality
            record.personality = new_personality

            db.add(CharacterEvolution(
                character_id=record.id,
                change_type="personality",
                old_value=previous,
                new_value=new_personality,
                reason=modification.reason,
                timestamp=modification.timestamp
            ))
            await db.commit()

        return True
    except Exception as exc:
        log_error_with_context(exc, {"character": character_name})
        return False
|
|
|
|
async def _apply_speaking_style_modification(self, character_name: str, new_style: str,
                                             modification: ModificationRequest) -> bool:
    """Store a new speaking style and log the change as an evolution row.

    Returns ``True`` once the update and its ``CharacterEvolution`` record
    are committed, ``False`` when the character is not found or an
    exception occurs (the exception is logged).
    """
    try:
        async with get_db_session() as db:
            record = await db.scalar(
                select(Character).where(Character.name == character_name)
            )
            if not record:
                return False

            # Read the stored value before overwriting it so the evolution
            # row records what was actually in the database.
            previous = record.speaking_style
            record.speaking_style = new_style

            db.add(CharacterEvolution(
                character_id=record.id,
                change_type="speaking_style",
                old_value=previous,
                new_value=new_style,
                reason=modification.reason,
                timestamp=modification.timestamp
            ))
            await db.commit()

        return True
    except Exception as exc:
        log_error_with_context(exc, {"character": character_name})
        return False
|
|
|
|
# Global MCP server instance. Constructing it runs __init__, which creates
# the default data directory when this module is imported.
mcp_server = SelfModificationMCPServer()