import asyncio import random import json from typing import Dict, Any, List, Optional, Set, Tuple from datetime import datetime, timedelta from dataclasses import dataclass, asdict from enum import Enum import logging from ..database.connection import get_db_session from ..database.models import Character as CharacterModel, Conversation, Message, Memory from ..characters.character import Character from ..characters.enhanced_character import EnhancedCharacter from ..llm.client import llm_client, prompt_manager from ..llm.prompt_manager import advanced_prompt_manager from ..utils.config import get_settings, get_character_settings from ..utils.logging import (log_conversation_event, log_character_action, log_autonomous_decision, log_error_with_context) from sqlalchemy import select, and_, or_, func, desc logger = logging.getLogger(__name__) class ConversationState(Enum): IDLE = "idle" STARTING = "starting" ACTIVE = "active" WINDING_DOWN = "winding_down" PAUSED = "paused" STOPPED = "stopped" @dataclass class ConversationContext: conversation_id: Optional[int] = None topic: str = "" participants: List[str] = None message_count: int = 0 start_time: datetime = None last_activity: datetime = None current_speaker: Optional[str] = None conversation_type: str = "general" energy_level: float = 1.0 def __post_init__(self): if self.participants is None: self.participants = [] if self.start_time is None: self.start_time = datetime.utcnow() if self.last_activity is None: self.last_activity = datetime.utcnow() class ConversationEngine: """Autonomous conversation engine that manages character interactions""" def __init__(self, vector_store=None, memory_sharing_manager=None, creative_manager=None, mcp_servers=None): self.settings = get_settings() self.character_settings = get_character_settings() # RAG and collaboration systems self.vector_store = vector_store self.memory_sharing_manager = memory_sharing_manager self.creative_manager = creative_manager self.mcp_servers = mcp_servers or [] # Engine state self.state = ConversationState.IDLE self.characters: Dict[str, Character] = {} self.active_conversations: Dict[int, ConversationContext] = {} self.discord_bot = None # Scheduling self.scheduler_task = None self.conversation_task = None self.is_paused = False # Configuration self.min_delay = self.settings.conversation.min_delay_seconds self.max_delay = self.settings.conversation.max_delay_seconds self.max_conversation_length = self.settings.conversation.max_conversation_length self.quiet_hours = ( self.settings.conversation.quiet_hours_start, self.settings.conversation.quiet_hours_end ) # Conversation topics self.available_topics = self.character_settings.conversation_topics # Statistics self.stats = { 'conversations_started': 0, 'messages_generated': 0, 'characters_active': 0, 'uptime_start': datetime.utcnow(), 'last_activity': datetime.utcnow() } async def initialize(self, discord_bot): """Initialize the conversation engine""" try: self.discord_bot = discord_bot # Load characters from database await self._load_characters() # Start scheduler self.scheduler_task = asyncio.create_task(self._scheduler_loop()) # Start main conversation loop self.conversation_task = asyncio.create_task(self._conversation_loop()) self.state = ConversationState.IDLE log_conversation_event( 0, "engine_initialized", list(self.characters.keys()), {"character_count": len(self.characters)} ) except Exception as e: log_error_with_context(e, {"component": "conversation_engine_init"}) raise async def start_conversation(self, topic: str = None, forced_participants: List[str] = None) -> Optional[int]: """Start a new conversation""" try: if self.is_paused or self.state == ConversationState.STOPPED: return None # Check if it's quiet hours if self._is_quiet_hours(): return None # Select topic if not topic: topic = await self._select_conversation_topic() # Select participants participants = forced_participants or await self._select_participants(topic) if len(participants) < 2: logger.warning("Not enough participants for conversation") return None # Create conversation in database conversation_id = await self._create_conversation(topic, participants) # Create conversation context context = ConversationContext( conversation_id=conversation_id, topic=topic, participants=participants, conversation_type=await self._determine_conversation_type(topic) ) self.active_conversations[conversation_id] = context # Choose initial speaker initial_speaker = await self._choose_initial_speaker(participants, topic) # Generate opening message opening_message = await self._generate_opening_message(initial_speaker, topic, context) if opening_message: # Send message via Discord bot await self.discord_bot.send_character_message( initial_speaker, opening_message, conversation_id ) # Update context context.current_speaker = initial_speaker context.message_count = 1 context.last_activity = datetime.utcnow() # Store message in database await self._store_conversation_message( conversation_id, initial_speaker, opening_message ) # Update statistics self.stats['conversations_started'] += 1 self.stats['messages_generated'] += 1 self.stats['last_activity'] = datetime.utcnow() log_conversation_event( conversation_id, "conversation_started", participants, {"topic": topic, "initial_speaker": initial_speaker} ) return conversation_id return None except Exception as e: log_error_with_context(e, { "topic": topic, "participants": forced_participants }) return None async def continue_conversation(self, conversation_id: int) -> bool: """Continue an active conversation""" try: if conversation_id not in self.active_conversations: return False context = self.active_conversations[conversation_id] # Check if conversation should continue if not await self._should_continue_conversation(context): await self._end_conversation(conversation_id) return False # Choose next speaker next_speaker = await self._choose_next_speaker(context) if not next_speaker: await self._end_conversation(conversation_id) return False # Generate response response = await self._generate_response(next_speaker, context) if response: # Send message await self.discord_bot.send_character_message( next_speaker, response, conversation_id ) # Update context context.current_speaker = next_speaker context.message_count += 1 context.last_activity = datetime.utcnow() # Store message await self._store_conversation_message( conversation_id, next_speaker, response ) # Update character relationships await self._update_character_relationships(context, next_speaker, response) # Store memories await self._store_conversation_memories(context, next_speaker, response) # Update statistics self.stats['messages_generated'] += 1 self.stats['last_activity'] = datetime.utcnow() log_conversation_event( conversation_id, "message_sent", [next_speaker], {"message_length": len(response), "total_messages": context.message_count} ) return True return False except Exception as e: log_error_with_context(e, {"conversation_id": conversation_id}) return False async def handle_external_mention(self, message_content: str, mentioned_characters: List[str], author: str): """Handle external mentions of characters""" try: for character_name in mentioned_characters: if character_name in self.characters: character = self.characters[character_name] # Decide if character should respond context = { 'type': 'external_mention', 'content': message_content, 'author': author, 'participants': [character_name] } should_respond, reason = await character.should_respond(context) if should_respond: # Generate response response = await character.generate_response(context) if response: await self.discord_bot.send_character_message( character_name, response ) log_character_action( character_name, "responded_to_mention", {"author": author, "response_length": len(response)} ) except Exception as e: log_error_with_context(e, { "mentioned_characters": mentioned_characters, "author": author }) async def handle_external_engagement(self, message_content: str, author: str): """Handle external user trying to engage characters""" try: # Randomly select a character to respond if self.characters: responding_character = random.choice(list(self.characters.values())) context = { 'type': 'external_engagement', 'content': message_content, 'author': author, 'participants': [responding_character.name] } should_respond, reason = await responding_character.should_respond(context) if should_respond: response = await responding_character.generate_response(context) if response: await self.discord_bot.send_character_message( responding_character.name, response ) # Possibly start a conversation with other characters if random.random() < 0.4: # 40% chance await self.start_conversation( topic=f"Discussion prompted by: {message_content[:50]}..." ) except Exception as e: log_error_with_context(e, {"author": author}) async def trigger_conversation(self, topic: str = None): """Manually trigger a conversation""" try: conversation_id = await self.start_conversation(topic) if conversation_id: log_conversation_event( conversation_id, "manually_triggered", self.active_conversations[conversation_id].participants, {"topic": topic} ) return conversation_id return None except Exception as e: log_error_with_context(e, {"topic": topic}) return None async def pause(self): """Pause the conversation engine""" self.is_paused = True self.state = ConversationState.PAUSED logger.info("Conversation engine paused") async def resume(self): """Resume the conversation engine""" self.is_paused = False self.state = ConversationState.IDLE logger.info("Conversation engine resumed") async def stop(self): """Stop the conversation engine""" self.state = ConversationState.STOPPED # Cancel tasks if self.scheduler_task: self.scheduler_task.cancel() if self.conversation_task: self.conversation_task.cancel() # End all active conversations for conversation_id in list(self.active_conversations.keys()): await self._end_conversation(conversation_id) logger.info("Conversation engine stopped") async def get_status(self) -> Dict[str, Any]: """Get engine status""" uptime = datetime.utcnow() - self.stats['uptime_start'] return { 'status': self.state.value, 'is_paused': self.is_paused, 'active_conversations': len(self.active_conversations), 'loaded_characters': len(self.characters), 'uptime': str(uptime), 'stats': self.stats.copy(), 'next_conversation_in': await self._time_until_next_conversation() } async def _load_characters(self): """Load characters from database""" try: async with get_db_session() as session: query = select(CharacterModel).where(CharacterModel.is_active == True) character_models = await session.scalars(query) for char_model in character_models: # Use EnhancedCharacter if RAG systems are available if self.vector_store and self.memory_sharing_manager: # Find the appropriate MCP servers for this character from ..mcp.self_modification_server import mcp_server from ..mcp.file_system_server import filesystem_server # Find creative projects MCP server creative_projects_mcp = None for mcp_srv in self.mcp_servers: if hasattr(mcp_srv, 'creative_manager'): creative_projects_mcp = mcp_srv break character = EnhancedCharacter( character_data=char_model, vector_store=self.vector_store, mcp_server=mcp_server, filesystem=filesystem_server, memory_sharing_manager=self.memory_sharing_manager, creative_projects_mcp=creative_projects_mcp ) # Set character context for MCP servers for mcp_srv in self.mcp_servers: if hasattr(mcp_srv, 'set_character_context'): await mcp_srv.set_character_context(char_model.name) await character.initialize(llm_client) logger.info(f"Loaded enhanced character: {character.name}") else: # Fallback to basic character character = Character(char_model) await character.initialize(llm_client) logger.info(f"Loaded basic character: {character.name}") self.characters[character.name] = character self.stats['characters_active'] = len(self.characters) logger.info(f"Loaded {len(self.characters)} characters") except Exception as e: log_error_with_context(e, {"component": "character_loading"}) raise async def _scheduler_loop(self): """Main scheduler loop for autonomous conversations""" try: while self.state != ConversationState.STOPPED: if not self.is_paused and self.state == ConversationState.IDLE: # Check if we should start a conversation if await self._should_start_conversation(): await self.start_conversation() # Check for conversation continuations for conversation_id in list(self.active_conversations.keys()): if await self._should_continue_conversation_now(conversation_id): await self.continue_conversation(conversation_id) # Random delay between checks delay = random.uniform(self.min_delay, self.max_delay) await asyncio.sleep(delay) except asyncio.CancelledError: logger.info("Scheduler loop cancelled") except Exception as e: log_error_with_context(e, {"component": "scheduler_loop"}) async def _conversation_loop(self): """Main conversation management loop""" try: while self.state != ConversationState.STOPPED: # Periodic character self-reflection if random.random() < 0.1: # 10% chance per cycle await self._trigger_character_reflection() # Cleanup old conversations await self._cleanup_old_conversations() # Wait before next cycle await asyncio.sleep(60) # Check every minute except asyncio.CancelledError: logger.info("Conversation loop cancelled") except Exception as e: log_error_with_context(e, {"component": "conversation_loop"}) async def _should_start_conversation(self) -> bool: """Determine if a new conversation should start""" # Don't start if too many active conversations if len(self.active_conversations) >= 2: return False # Don't start during quiet hours if self._is_quiet_hours(): return False # Random chance based on activity level base_chance = 0.3 # Increase chance if no recent activity time_since_last = datetime.utcnow() - self.stats['last_activity'] if time_since_last > timedelta(hours=2): base_chance += 0.4 elif time_since_last > timedelta(hours=1): base_chance += 0.2 return random.random() < base_chance async def _should_continue_conversation(self, context: ConversationContext) -> bool: """Determine if conversation should continue""" # Check message limit if context.message_count >= self.max_conversation_length: return False # Check time limit (conversations shouldn't go on forever) duration = datetime.utcnow() - context.start_time if duration > timedelta(hours=2): return False # Check if it's quiet hours if self._is_quiet_hours(): return False # Check energy level if context.energy_level < 0.2: return False # Random natural ending chance if context.message_count > 10 and random.random() < 0.1: return False return True async def _should_continue_conversation_now(self, conversation_id: int) -> bool: """Check if conversation should continue right now""" if conversation_id not in self.active_conversations: return False context = self.active_conversations[conversation_id] # Check time since last message time_since_last = datetime.utcnow() - context.last_activity min_wait = timedelta(seconds=random.uniform(30, 120)) return time_since_last >= min_wait async def _select_conversation_topic(self) -> str: """Select a topic for conversation""" return random.choice(self.available_topics) async def _select_participants(self, topic: str) -> List[str]: """Select participants for a conversation""" interested_characters = [] # Find characters interested in the topic for character in self.characters.values(): if await character._is_interested_in_topic(topic): interested_characters.append(character.name) # If not enough interested characters, add random ones if len(interested_characters) < 2: all_characters = list(self.characters.keys()) random.shuffle(all_characters) for char_name in all_characters: if char_name not in interested_characters: interested_characters.append(char_name) if len(interested_characters) >= 3: break # Select 2-3 participants num_participants = min(random.randint(2, 3), len(interested_characters)) return random.sample(interested_characters, num_participants) def _is_quiet_hours(self) -> bool: """Check if it's currently quiet hours""" current_hour = datetime.now().hour start_hour, end_hour = self.quiet_hours if start_hour <= end_hour: return start_hour <= current_hour <= end_hour else: # Spans midnight return current_hour >= start_hour or current_hour <= end_hour async def _time_until_next_conversation(self) -> str: """Calculate time until next conversation attempt""" if self.is_paused or self._is_quiet_hours(): return "Paused or quiet hours" # This is a simple estimate next_check = random.uniform(self.min_delay, self.max_delay) return f"{int(next_check)} seconds" async def _create_conversation(self, topic: str, participants: List[str]) -> int: """Create conversation in database""" try: async with get_db_session() as session: conversation = Conversation( channel_id=str(self.discord_bot.channel_id), topic=topic, participants=participants, start_time=datetime.utcnow(), last_activity=datetime.utcnow(), is_active=True, message_count=0 ) session.add(conversation) await session.commit() return conversation.id except Exception as e: log_error_with_context(e, {"topic": topic, "participants": participants}) raise async def _determine_conversation_type(self, topic: str) -> str: """Determine conversation type based on topic""" topic_lower = topic.lower() if any(word in topic_lower for word in ['art', 'music', 'creative', 'design']): return 'creative' elif any(word in topic_lower for word in ['problem', 'solve', 'analyze', 'think']): return 'analytical' elif any(word in topic_lower for word in ['feel', 'emotion', 'experience', 'personal']): return 'emotional' elif any(word in topic_lower for word in ['philosophy', 'meaning', 'existence', 'consciousness']): return 'philosophical' else: return 'general' async def _choose_initial_speaker(self, participants: List[str], topic: str) -> str: """Choose who should start the conversation""" scores = {} for participant in participants: if participant in self.characters: character = self.characters[participant] score = 0.5 # Base score # Higher score if interested in topic if await character._is_interested_in_topic(topic): score += 0.3 # Higher score if character is extraverted if 'extraverted' in character.personality.lower() or 'outgoing' in character.personality.lower(): score += 0.2 scores[participant] = score # Choose participant with highest score (with some randomness) if scores: weighted_choices = [(name, score) for name, score in scores.items()] return random.choices([name for name, _ in weighted_choices], weights=[score for _, score in weighted_choices])[0] return random.choice(participants) async def _generate_opening_message(self, speaker: str, topic: str, context: ConversationContext) -> Optional[str]: """Generate opening message for conversation""" if speaker not in self.characters: return None character = self.characters[speaker] prompt_context = { 'type': 'initiation', 'topic': topic, 'participants': context.participants, 'conversation_type': context.conversation_type } return await character.generate_response(prompt_context) async def _choose_next_speaker(self, context: ConversationContext) -> Optional[str]: """Choose next speaker in conversation""" participants = context.participants current_speaker = context.current_speaker # Don't let same character speak twice in a row (unless only one participant) if len(participants) > 1: available = [p for p in participants if p != current_speaker] else: available = participants if not available: return None # Score each potential speaker scores = {} for participant in available: if participant in self.characters: character = self.characters[participant] # Base response probability should_respond, _ = await character.should_respond({ 'type': 'conversation_continue', 'topic': context.topic, 'participants': context.participants, 'message_count': context.message_count }) scores[participant] = 1.0 if should_respond else 0.3 if not scores: return random.choice(available) # Choose weighted random speaker weighted_choices = [(name, score) for name, score in scores.items()] return random.choices([name for name, _ in weighted_choices], weights=[score for _, score in weighted_choices])[0] async def _generate_response(self, speaker: str, context: ConversationContext) -> Optional[str]: """Generate response for speaker in conversation""" if speaker not in self.characters: return None character = self.characters[speaker] # Get conversation history conversation_history = await self._get_conversation_history(context.conversation_id, limit=10) prompt_context = { 'type': 'response', 'topic': context.topic, 'participants': context.participants, 'conversation_history': conversation_history, 'conversation_type': context.conversation_type, 'message_count': context.message_count } return await character.generate_response(prompt_context) async def _store_conversation_message(self, conversation_id: int, character_name: str, content: str): """Store conversation message in database""" try: async with get_db_session() as session: # Get character character_query = select(CharacterModel).where(CharacterModel.name == character_name) character = await session.scalar(character_query) if character: message = Message( conversation_id=conversation_id, character_id=character.id, content=content, timestamp=datetime.utcnow() ) session.add(message) await session.commit() except Exception as e: log_error_with_context(e, {"conversation_id": conversation_id, "character_name": character_name}) async def _get_conversation_history(self, conversation_id: int, limit: int = 10) -> List[Dict[str, Any]]: """Get recent conversation history""" try: async with get_db_session() as session: query = select(Message, CharacterModel.name).join( CharacterModel, Message.character_id == CharacterModel.id ).where( Message.conversation_id == conversation_id ).order_by(desc(Message.timestamp)).limit(limit) results = await session.execute(query) history = [] for message, character_name in results: history.append({ 'character': character_name, 'content': message.content, 'timestamp': message.timestamp }) return list(reversed(history)) # Return in chronological order except Exception as e: log_error_with_context(e, {"conversation_id": conversation_id}) return [] async def _update_character_relationships(self, context: ConversationContext, speaker: str, message: str): """Update character relationships based on interaction""" try: for participant in context.participants: if participant != speaker and participant in self.characters: character = self.characters[speaker] await character.process_relationship_change( participant, 'conversation', message ) except Exception as e: log_error_with_context(e, {"speaker": speaker, "participants": context.participants}) async def _store_conversation_memories(self, context: ConversationContext, speaker: str, message: str): """Store conversation memories for character""" try: if speaker in self.characters: character = self.characters[speaker] # Store conversation memory await character._store_memory( memory_type="conversation", content=f"In conversation about {context.topic}: {message}", importance=0.6, tags=[context.topic, "conversation"] + context.participants ) except Exception as e: log_error_with_context(e, {"speaker": speaker, "topic": context.topic}) async def _end_conversation(self, conversation_id: int): """End a conversation""" try: if conversation_id in self.active_conversations: context = self.active_conversations[conversation_id] # Update conversation in database async with get_db_session() as session: conversation = await session.get(Conversation, conversation_id) if conversation: conversation.is_active = False conversation.last_activity = datetime.utcnow() conversation.message_count = context.message_count await session.commit() # Remove from active conversations del self.active_conversations[conversation_id] log_conversation_event( conversation_id, "conversation_ended", context.participants, {"total_messages": context.message_count, "duration": str(datetime.utcnow() - context.start_time)} ) except Exception as e: log_error_with_context(e, {"conversation_id": conversation_id}) async def _trigger_character_reflection(self): """Trigger reflection for a random character""" if self.characters: character_name = random.choice(list(self.characters.keys())) character = self.characters[character_name] reflection_result = await character.self_reflect() if reflection_result: log_character_action( character_name, "completed_reflection", {"reflection_length": len(reflection_result.get('reflection', ''))} ) async def _cleanup_old_conversations(self): """Clean up old inactive conversations""" try: cutoff_time = datetime.utcnow() - timedelta(hours=6) # Remove old conversations from active list to_remove = [] for conv_id, context in self.active_conversations.items(): if context.last_activity < cutoff_time: to_remove.append(conv_id) for conv_id in to_remove: await self._end_conversation(conv_id) except Exception as e: log_error_with_context(e, {"component": "conversation_cleanup"})