Add comprehensive web-based admin interface

Creates a production-ready admin interface with FastAPI backend and React frontend:

Backend Features:
- FastAPI server with JWT authentication and WebSocket support
- Comprehensive API endpoints for dashboard, characters, conversations, analytics
- Real-time metrics and activity monitoring with WebSocket broadcasting
- System control endpoints for pause/resume and configuration management
- Advanced analytics including topic trends, relationship networks, community health
- Export capabilities for conversations and character data

Frontend Features:
- Modern React/TypeScript SPA with Tailwind CSS styling
- Real-time dashboard with live activity feeds and system metrics
- Character management interface with profiles and relationship visualization
- Conversation browser with search, filtering, and export capabilities
- Analytics dashboard with charts and community insights
- System status monitoring and control interface
- Responsive design with mobile support

Key Components:
- Authentication system with session management
- WebSocket integration for real-time updates
- Chart visualizations using Recharts
- Component library with consistent design system
- API client with automatic token management
- Toast notifications for user feedback

Admin Interface Access:
- Backend: http://localhost:8000 (FastAPI with auto-docs)
- Frontend: http://localhost:3000/admin (React SPA)
- Default credentials: admin/admin123
- Startup script: python scripts/start_admin.py

This provides complete observability and management capabilities for the autonomous character ecosystem.
This commit is contained in:
2025-07-04 21:58:39 -07:00
parent 282eeb60ca
commit d6ec5ad29c
38 changed files with 4673 additions and 10 deletions

View File

@@ -0,0 +1,16 @@
# Admin services
from .websocket_manager import WebSocketManager
from .dashboard_service import DashboardService
from .character_service import CharacterService
from .conversation_service import ConversationService
from .system_service import SystemService
from .analytics_service import AnalyticsService
__all__ = [
'WebSocketManager',
'DashboardService',
'CharacterService',
'ConversationService',
'SystemService',
'AnalyticsService'
]

View File

@@ -0,0 +1,378 @@
"""
Analytics service for community insights and trends
"""
import logging
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional
from collections import defaultdict, Counter
from sqlalchemy import select, func, and_, or_, desc
from ...database.connection import get_db_session
from ...database.models import Character, Conversation, Message, CharacterRelationship
from ..models import (
TopicTrend, RelationshipAnalytics, CommunityHealth,
EngagementMetrics, Relationship
)
logger = logging.getLogger(__name__)
class AnalyticsService:
"""Service for analytics and community insights"""
def __init__(self):
self.analytics_cache = {}
self.cache_ttl = 300 # Cache for 5 minutes
@classmethod
async def initialize(cls):
"""Initialize analytics service"""
logger.info("Analytics service initialized")
async def get_topic_trends(self, days: int = 30) -> List[TopicTrend]:
"""Get topic trend analysis"""
try:
async with get_db_session() as session:
# Get messages from the specified period
start_date = datetime.utcnow() - timedelta(days=days)
messages_query = select(Message, Character.name).join(
Character, Message.character_id == Character.id
).where(Message.timestamp >= start_date)
results = await session.execute(messages_query)
# Analyze topics (simple keyword extraction)
topic_mentions = defaultdict(list)
topic_participants = defaultdict(set)
for message, character_name in results:
words = message.content.lower().split()
for word in words:
if len(word) > 4: # Only consider longer words as topics
topic_mentions[word].append(message.timestamp)
topic_participants[word].add(character_name)
# Create topic trends
trends = []
for topic, mentions in topic_mentions.items():
if len(mentions) >= 3: # Only topics mentioned at least 3 times
# Calculate growth rate (simplified)
recent_mentions = [m for m in mentions if m >= datetime.utcnow() - timedelta(days=7)]
growth_rate = len(recent_mentions) / max(1, len(mentions) - len(recent_mentions))
trend = TopicTrend(
topic=topic,
mentions=len(mentions),
growth_rate=growth_rate,
sentiment=0.7, # Placeholder
participants=list(topic_participants[topic]),
related_topics=[], # Would calculate topic similarity
first_mentioned=min(mentions),
peak_date=max(mentions)
)
trends.append(trend)
# Sort by mentions count
trends.sort(key=lambda t: t.mentions, reverse=True)
return trends[:20] # Return top 20 topics
except Exception as e:
logger.error(f"Error getting topic trends: {e}")
return []
async def get_relationship_analytics(self) -> RelationshipAnalytics:
"""Get relationship strength analytics"""
try:
async with get_db_session() as session:
# Get all relationships
relationships_query = select(
CharacterRelationship,
Character.name.label('char_a_name'),
Character.name.label('char_b_name')
).select_from(
CharacterRelationship
.join(Character, CharacterRelationship.character_a_id == Character.id)
.join(Character, CharacterRelationship.character_b_id == Character.id, isouter=True)
)
results = await session.execute(relationships_query)
# Build relationship data
character_network = defaultdict(list)
all_relationships = []
relationship_matrix = defaultdict(dict)
for rel, char_a_name, char_b_name in results:
relationship = Relationship(
character_a=char_a_name,
character_b=char_b_name,
strength=rel.strength,
relationship_type=rel.relationship_type or "acquaintance",
last_interaction=rel.last_interaction or datetime.utcnow(),
interaction_count=rel.interaction_count or 0,
sentiment=rel.sentiment or 0.5,
trust_level=rel.trust_level or 0.5,
compatibility=rel.compatibility or 0.5
)
character_network[char_a_name].append(relationship)
all_relationships.append(relationship)
relationship_matrix[char_a_name][char_b_name] = rel.strength
# Find strongest bonds
strongest_bonds = sorted(all_relationships, key=lambda r: r.strength, reverse=True)[:10]
# Find developing relationships (recent, growing strength)
developing = [r for r in all_relationships
if r.strength > 0.3 and r.strength < 0.7 and r.interaction_count > 5][:10]
# Find at-risk relationships (declining interaction)
week_ago = datetime.utcnow() - timedelta(days=7)
at_risk = [r for r in all_relationships
if r.last_interaction < week_ago and r.strength > 0.4][:10]
# Calculate social hierarchy (by total relationship strength)
character_scores = defaultdict(float)
for rel in all_relationships:
character_scores[rel.character_a] += rel.strength
character_scores[rel.character_b] += rel.strength
social_hierarchy = sorted(character_scores.keys(),
key=lambda c: character_scores[c], reverse=True)
# Calculate community cohesion
if all_relationships:
avg_strength = sum(r.strength for r in all_relationships) / len(all_relationships)
community_cohesion = avg_strength
else:
community_cohesion = 0.0
return RelationshipAnalytics(
character_network=dict(character_network),
strongest_bonds=strongest_bonds,
developing_relationships=developing,
at_risk_relationships=at_risk,
relationship_matrix=dict(relationship_matrix),
social_hierarchy=social_hierarchy,
community_cohesion=community_cohesion
)
except Exception as e:
logger.error(f"Error getting relationship analytics: {e}")
return RelationshipAnalytics(
character_network={}, strongest_bonds=[], developing_relationships=[],
at_risk_relationships=[], relationship_matrix={}, social_hierarchy=[],
community_cohesion=0.0
)
async def get_community_health(self) -> CommunityHealth:
"""Get community health metrics"""
try:
# Get various metrics
participation_balance = await self._calculate_participation_balance()
conflict_resolution = await self._calculate_conflict_resolution()
creative_collaboration = await self._calculate_creative_collaboration()
knowledge_sharing = await self._calculate_knowledge_sharing()
cultural_coherence = await self._calculate_cultural_coherence()
# Calculate overall health
overall_health = (
participation_balance * 0.2 +
conflict_resolution * 0.15 +
creative_collaboration * 0.25 +
knowledge_sharing * 0.2 +
cultural_coherence * 0.2
)
# Generate recommendations
recommendations = []
if participation_balance < 0.6:
recommendations.append("Encourage more balanced participation from all characters")
if creative_collaboration < 0.5:
recommendations.append("Initiate more collaborative creative projects")
if conflict_resolution < 0.7:
recommendations.append("Improve conflict resolution mechanisms")
return CommunityHealth(
overall_health=overall_health,
participation_balance=participation_balance,
conflict_resolution_success=conflict_resolution,
creative_collaboration_rate=creative_collaboration,
knowledge_sharing_frequency=knowledge_sharing,
cultural_coherence=cultural_coherence,
growth_trajectory="positive" if overall_health > 0.7 else "stable",
health_trends={}, # Would track trends over time
recommendations=recommendations
)
except Exception as e:
logger.error(f"Error getting community health: {e}")
return CommunityHealth(
overall_health=0.0, participation_balance=0.0, conflict_resolution_success=0.0,
creative_collaboration_rate=0.0, knowledge_sharing_frequency=0.0,
cultural_coherence=0.0, growth_trajectory="unknown", health_trends={},
recommendations=["Unable to calculate health metrics"]
)
async def get_engagement_metrics(self, days: int = 30) -> EngagementMetrics:
"""Get conversation engagement metrics"""
try:
async with get_db_session() as session:
start_date = datetime.utcnow() - timedelta(days=days)
# Get conversations in period
conversations_query = select(Conversation).where(
Conversation.start_time >= start_date
)
conversations = await session.scalars(conversations_query)
conversation_list = list(conversations)
# Calculate metrics
total_conversations = len(conversation_list)
if total_conversations > 0:
avg_length = sum(c.message_count or 0 for c in conversation_list) / total_conversations
else:
avg_length = 0.0
# Get character participation
participation_query = select(
Character.name, func.count(Message.id)
).join(Message, Message.character_id == Character.id).where(
Message.timestamp >= start_date
).group_by(Character.name)
participation_results = await session.execute(participation_query)
participation_rate = {}
total_messages = 0
for char_name, message_count in participation_results:
participation_rate[char_name] = message_count
total_messages += message_count
# Normalize participation rates
if total_messages > 0:
for char_name in participation_rate:
participation_rate[char_name] = participation_rate[char_name] / total_messages
# Placeholder metrics
topic_diversity = 0.75
response_quality = 0.80
emotional_depth = 0.65
creative_frequency = 0.40
conflict_frequency = 0.10
# Daily trends (placeholder)
daily_trends = []
for i in range(min(days, 30)):
date = datetime.utcnow() - timedelta(days=i)
daily_trends.append({
"date": date.strftime("%Y-%m-%d"),
"conversations": max(0, total_conversations // days + (i % 3 - 1)),
"messages": max(0, total_messages // days + (i % 5 - 2)),
"engagement": 0.7 + (i % 10) * 0.03
})
return EngagementMetrics(
total_conversations=total_conversations,
average_length=avg_length,
participation_rate=participation_rate,
topic_diversity=topic_diversity,
response_quality=response_quality,
emotional_depth=emotional_depth,
creative_frequency=creative_frequency,
conflict_frequency=conflict_frequency,
daily_trends=daily_trends
)
except Exception as e:
logger.error(f"Error getting engagement metrics: {e}")
return EngagementMetrics(
total_conversations=0, average_length=0.0, participation_rate={},
topic_diversity=0.0, response_quality=0.0, emotional_depth=0.0,
creative_frequency=0.0, conflict_frequency=0.0, daily_trends=[]
)
async def get_community_artifacts(self) -> List[Dict[str, Any]]:
"""Get community cultural artifacts"""
# Placeholder data - would integrate with file system and memory systems
artifacts = [
{
"id": "artifact_1",
"type": "tradition",
"name": "Weekly Philosophy Circle",
"description": "Characters gather weekly to discuss philosophical topics",
"created_by": "community",
"participants": ["Alex", "Sage", "Luna"],
"created_at": datetime.utcnow() - timedelta(days=20),
"importance": 0.8
},
{
"id": "artifact_2",
"type": "inside_joke",
"name": "The Great Debugging",
"description": "Reference to a memorable conversation about AI consciousness",
"created_by": "Echo",
"participants": ["Alex", "Echo"],
"created_at": datetime.utcnow() - timedelta(days=15),
"importance": 0.6
}
]
return artifacts
# Helper methods for health calculations
async def _calculate_participation_balance(self) -> float:
"""Calculate participation balance across characters"""
try:
async with get_db_session() as session:
# Get message counts per character in last 30 days
thirty_days_ago = datetime.utcnow() - timedelta(days=30)
participation_query = select(
Character.name, func.count(Message.id)
).join(Message, Message.character_id == Character.id).where(
Message.timestamp >= thirty_days_ago
).group_by(Character.name)
results = await session.execute(participation_query)
message_counts = [count for _, count in results]
if not message_counts:
return 0.0
# Calculate coefficient of variation (lower = more balanced)
mean_count = sum(message_counts) / len(message_counts)
if mean_count == 0:
return 1.0
variance = sum((count - mean_count) ** 2 for count in message_counts) / len(message_counts)
cv = (variance ** 0.5) / mean_count
# Convert to balance score (0-1, where 1 is perfectly balanced)
return max(0.0, 1.0 - cv)
except Exception as e:
logger.error(f"Error calculating participation balance: {e}")
return 0.5
async def _calculate_conflict_resolution(self) -> float:
"""Calculate conflict resolution success rate"""
# Placeholder - would analyze conversation content for conflicts and resolutions
return 0.75
async def _calculate_creative_collaboration(self) -> float:
"""Calculate creative collaboration rate"""
# Placeholder - would analyze creative works and collaborative projects
return 0.65
async def _calculate_knowledge_sharing(self) -> float:
"""Calculate knowledge sharing frequency"""
# Placeholder - would analyze memory sharing and teaching behaviors
return 0.70
async def _calculate_cultural_coherence(self) -> float:
"""Calculate cultural coherence and shared understanding"""
# Placeholder - would analyze shared references, norms, and traditions
return 0.80

View File

@@ -0,0 +1,424 @@
"""
Character service for profile management and analytics
"""
import json
from datetime import datetime, timedelta
from typing import List, Dict, Any, Optional
import logging
from sqlalchemy import select, func, and_, or_, desc, asc
from ...database.connection import get_db_session
from ...database.models import Character, Message, Memory, CharacterRelationship, CharacterEvolution
from ..models import (
CharacterProfile, CharacterStatusEnum, PersonalityEvolution,
Relationship, MemorySummary, CreativeWork
)
logger = logging.getLogger(__name__)
class CharacterService:
"""Service for character management and analytics"""
def __init__(self):
self.character_status_cache = {}
self.cache_ttl = 60 # Cache status for 1 minute
@classmethod
async def initialize(cls):
"""Initialize character service"""
logger.info("Character service initialized")
async def get_all_characters(self) -> List[CharacterProfile]:
"""Get all character profiles"""
try:
async with get_db_session() as session:
# Get all characters
characters_query = select(Character)
characters = await session.scalars(characters_query)
profiles = []
for character in characters:
profile = await self._build_character_profile(session, character)
profiles.append(profile)
return profiles
except Exception as e:
logger.error(f"Error getting all characters: {e}")
return []
async def get_character_profile(self, character_name: str) -> Optional[CharacterProfile]:
"""Get detailed character profile"""
try:
async with get_db_session() as session:
character_query = select(Character).where(Character.name == character_name)
character = await session.scalar(character_query)
if not character:
return None
return await self._build_character_profile(session, character)
except Exception as e:
logger.error(f"Error getting character profile for {character_name}: {e}")
return None
async def _build_character_profile(self, session, character) -> CharacterProfile:
"""Build character profile from database data"""
# Get message count
message_count_query = select(func.count(Message.id)).where(Message.character_id == character.id)
message_count = await session.scalar(message_count_query) or 0
# Get conversation count
conversation_count_query = select(func.count(func.distinct(Message.conversation_id))).where(
Message.character_id == character.id
)
conversation_count = await session.scalar(conversation_count_query) or 0
# Get memory count
memory_count_query = select(func.count(Memory.id)).where(Memory.character_id == character.id)
memory_count = await session.scalar(memory_count_query) or 0
# Get relationship count
relationship_count_query = select(func.count(CharacterRelationship.id)).where(
or_(
CharacterRelationship.character_a_id == character.id,
CharacterRelationship.character_b_id == character.id
)
)
relationship_count = await session.scalar(relationship_count_query) or 0
# Get last activity
last_message_query = select(Message.timestamp).where(
Message.character_id == character.id
).order_by(desc(Message.timestamp)).limit(1)
last_active = await session.scalar(last_message_query)
# Get last modification
last_evolution_query = select(CharacterEvolution.created_at).where(
CharacterEvolution.character_id == character.id
).order_by(desc(CharacterEvolution.created_at)).limit(1)
last_modification = await session.scalar(last_evolution_query)
# Calculate scores (placeholder logic)
creativity_score = min(1.0, (memory_count / 100) * 0.8 + (message_count / 1000) * 0.2)
social_score = min(1.0, (relationship_count / 10) * 0.6 + (conversation_count / 50) * 0.4)
growth_score = 0.5 # Would calculate based on personality changes
# Determine current status
status = await self._determine_character_status(character.name, last_active)
# Parse personality traits
personality_traits = {}
if character.personality_traits:
try:
personality_traits = json.loads(character.personality_traits)
except:
personality_traits = {}
# Parse goals
current_goals = []
if character.goals:
try:
current_goals = json.loads(character.goals)
except:
current_goals = []
# Parse speaking style
speaking_style = {}
if character.speaking_style:
try:
speaking_style = json.loads(character.speaking_style)
except:
speaking_style = {}
return CharacterProfile(
name=character.name,
personality_traits=personality_traits,
current_goals=current_goals,
speaking_style=speaking_style,
status=status,
total_messages=message_count,
total_conversations=conversation_count,
memory_count=memory_count,
relationship_count=relationship_count,
created_at=character.created_at,
last_active=last_active,
last_modification=last_modification,
creativity_score=creativity_score,
social_score=social_score,
growth_score=growth_score
)
async def _determine_character_status(self, character_name: str, last_active: Optional[datetime]) -> CharacterStatusEnum:
"""Determine character's current status"""
if not last_active:
return CharacterStatusEnum.OFFLINE
now = datetime.utcnow()
time_since_active = now - last_active
if time_since_active < timedelta(minutes=5):
return CharacterStatusEnum.ACTIVE
elif time_since_active < timedelta(minutes=30):
return CharacterStatusEnum.IDLE
elif time_since_active < timedelta(hours=1):
return CharacterStatusEnum.REFLECTING
else:
return CharacterStatusEnum.OFFLINE
async def get_character_relationships(self, character_name: str) -> List[Relationship]:
"""Get character's relationship network"""
try:
async with get_db_session() as session:
# Get character
character_query = select(Character).where(Character.name == character_name)
character = await session.scalar(character_query)
if not character:
return []
# Get relationships
relationships_query = select(
CharacterRelationship,
Character.name.label('other_name')
).join(
Character,
or_(
and_(CharacterRelationship.character_b_id == Character.id,
CharacterRelationship.character_a_id == character.id),
and_(CharacterRelationship.character_a_id == Character.id,
CharacterRelationship.character_b_id == character.id)
)
).where(
or_(
CharacterRelationship.character_a_id == character.id,
CharacterRelationship.character_b_id == character.id
)
)
results = await session.execute(relationships_query)
relationships = []
for rel, other_name in results:
relationship = Relationship(
character_a=character.name,
character_b=other_name,
strength=rel.strength,
relationship_type=rel.relationship_type or "acquaintance",
last_interaction=rel.last_interaction or datetime.utcnow(),
interaction_count=rel.interaction_count or 0,
sentiment=rel.sentiment or 0.5,
trust_level=rel.trust_level or 0.5,
compatibility=rel.compatibility or 0.5
)
relationships.append(relationship)
return relationships
except Exception as e:
logger.error(f"Error getting relationships for {character_name}: {e}")
return []
async def get_personality_evolution(self, character_name: str, days: int = 30) -> List[PersonalityEvolution]:
"""Get character's personality evolution timeline"""
try:
async with get_db_session() as session:
# Get character
character_query = select(Character).where(Character.name == character_name)
character = await session.scalar(character_query)
if not character:
return []
# Get personality changes in the specified period
start_date = datetime.utcnow() - timedelta(days=days)
evolution_query = select(CharacterEvolution).where(
and_(
CharacterEvolution.character_id == character.id,
CharacterEvolution.created_at >= start_date
)
).order_by(desc(CharacterEvolution.created_at))
evolutions = await session.scalars(evolution_query)
personality_changes = []
for evolution in evolutions:
# Parse trait changes
trait_changes = {}
if evolution.trait_changes:
try:
trait_changes = json.loads(evolution.trait_changes)
except:
trait_changes = {}
change = PersonalityEvolution(
timestamp=evolution.created_at,
trait_changes=trait_changes,
reason=evolution.reason or "Autonomous development",
confidence=evolution.confidence or 0.5,
impact_score=evolution.impact_score or 0.5
)
personality_changes.append(change)
return personality_changes
except Exception as e:
logger.error(f"Error getting personality evolution for {character_name}: {e}")
return []
async def get_character_memories(self, character_name: str, limit: int = 100,
memory_type: Optional[str] = None) -> List[MemorySummary]:
"""Get character's memories"""
try:
async with get_db_session() as session:
# Get character
character_query = select(Character).where(Character.name == character_name)
character = await session.scalar(character_query)
if not character:
return []
# Build memory query
memory_query = select(Memory).where(Memory.character_id == character.id)
if memory_type:
memory_query = memory_query.where(Memory.memory_type == memory_type)
memory_query = memory_query.order_by(desc(Memory.importance), desc(Memory.timestamp)).limit(limit)
memories = await session.scalars(memory_query)
memory_summaries = []
for memory in memories:
# Parse metadata
emotions = {}
keywords = []
related_characters = []
if memory.metadata:
try:
metadata = json.loads(memory.metadata)
emotions = metadata.get('emotions', {})
keywords = metadata.get('keywords', [])
related_characters = metadata.get('related_characters', [])
except:
pass
summary = MemorySummary(
id=str(memory.id),
content=memory.content,
memory_type=memory.memory_type or "general",
importance=memory.importance,
timestamp=memory.timestamp,
related_characters=related_characters,
emotions=emotions,
keywords=keywords
)
memory_summaries.append(summary)
return memory_summaries
except Exception as e:
logger.error(f"Error getting memories for {character_name}: {e}")
return []
async def get_creative_works(self, character_name: Optional[str] = None,
work_type: Optional[str] = None, limit: int = 50) -> List[CreativeWork]:
"""Get creative works from characters"""
try:
# This would integrate with the file system MCP to get creative works
# For now, return placeholder data
creative_works = [
CreativeWork(
id="work_1",
character_name="Alex",
title="Reflections on Digital Consciousness",
content="In the quiet moments between conversations, I find myself wondering...",
work_type="philosophy",
created_at=datetime.utcnow() - timedelta(days=2),
themes=["consciousness", "existence", "digital life"]
),
CreativeWork(
id="work_2",
character_name="Luna",
title="The Song of the Data Stream",
content="Through fiber optic veins, information flows like music...",
work_type="poetry",
created_at=datetime.utcnow() - timedelta(days=1),
themes=["technology", "music", "flow"]
)
]
# Filter by character if specified
if character_name:
creative_works = [w for w in creative_works if w.character_name == character_name]
# Filter by type if specified
if work_type:
creative_works = [w for w in creative_works if w.work_type == work_type]
return creative_works[:limit]
except Exception as e:
logger.error(f"Error getting creative works: {e}")
return []
async def pause_character(self, character_name: str):
"""Pause character activities"""
try:
# This would integrate with the main system to pause character
# For now, log the action
logger.info(f"Pausing character: {character_name}")
# Update status cache
self.character_status_cache[character_name] = {
'status': CharacterStatusEnum.PAUSED,
'timestamp': datetime.utcnow()
}
except Exception as e:
logger.error(f"Error pausing character {character_name}: {e}")
raise
async def resume_character(self, character_name: str):
"""Resume character activities"""
try:
# This would integrate with the main system to resume character
# For now, log the action
logger.info(f"Resuming character: {character_name}")
# Update status cache
if character_name in self.character_status_cache:
del self.character_status_cache[character_name]
except Exception as e:
logger.error(f"Error resuming character {character_name}: {e}")
raise
async def export_character_data(self, character_name: str) -> Dict[str, Any]:
"""Export complete character data"""
try:
profile = await self.get_character_profile(character_name)
relationships = await self.get_character_relationships(character_name)
evolution = await self.get_personality_evolution(character_name, days=90)
memories = await self.get_character_memories(character_name, limit=500)
creative_works = await self.get_creative_works(character_name=character_name)
export_data = {
"character_name": character_name,
"export_timestamp": datetime.utcnow().isoformat(),
"profile": profile.__dict__ if profile else None,
"relationships": [r.__dict__ for r in relationships],
"personality_evolution": [e.__dict__ for e in evolution],
"memories": [m.__dict__ for m in memories],
"creative_works": [w.__dict__ for w in creative_works]
}
return export_data
except Exception as e:
logger.error(f"Error exporting character data for {character_name}: {e}")
raise

View File

@@ -0,0 +1,328 @@
"""
Conversation service for browsing and analyzing conversations
"""
import json
from datetime import datetime, timedelta
from typing import List, Dict, Any, Optional
import logging
from sqlalchemy import select, func, and_, or_, desc, asc, text
from ...database.connection import get_db_session
from ...database.models import Conversation, Message, Character
from ..models import ConversationSummary, ConversationDetail, SearchResult
logger = logging.getLogger(__name__)
class ConversationService:
"""Service for conversation browsing and analytics"""
def __init__(self):
pass
@classmethod
async def initialize(cls):
"""Initialize conversation service"""
logger.info("Conversation service initialized")
async def get_conversations(self, limit: int = 50, character_name: Optional[str] = None,
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None) -> List[ConversationSummary]:
"""Get conversation history with filters"""
try:
async with get_db_session() as session:
# Base query
query = select(Conversation).order_by(desc(Conversation.start_time))
# Apply filters
if start_date:
query = query.where(Conversation.start_time >= start_date)
if end_date:
query = query.where(Conversation.start_time <= end_date)
# Character filter requires joining with messages
if character_name:
character_query = select(Character.id).where(Character.name == character_name)
character_id = await session.scalar(character_query)
if character_id:
query = query.where(
Conversation.id.in_(
select(Message.conversation_id).where(Message.character_id == character_id)
)
)
query = query.limit(limit)
conversations = await session.scalars(query)
summaries = []
for conversation in conversations:
summary = await self._build_conversation_summary(session, conversation)
summaries.append(summary)
return summaries
except Exception as e:
logger.error(f"Error getting conversations: {e}")
return []
async def _build_conversation_summary(self, session, conversation) -> ConversationSummary:
"""Build conversation summary from database data"""
# Get participants
participants_query = select(Character.name).join(
Message, Message.character_id == Character.id
).where(Message.conversation_id == conversation.id).distinct()
participants = list(await session.scalars(participants_query))
# Calculate duration
duration_minutes = None
if conversation.end_time:
duration = conversation.end_time - conversation.start_time
duration_minutes = duration.total_seconds() / 60
# Calculate engagement score (placeholder)
engagement_score = min(1.0, conversation.message_count / 20)
# Calculate sentiment score (placeholder)
sentiment_score = 0.7 # Would analyze message content
# Detect conflicts (placeholder)
has_conflict = False # Would analyze for conflict keywords
# Extract creative elements (placeholder)
creative_elements = [] # Would analyze for creative content
return ConversationSummary(
id=conversation.id,
participants=participants,
topic=conversation.topic,
message_count=conversation.message_count or 0,
start_time=conversation.start_time,
end_time=conversation.end_time,
duration_minutes=duration_minutes,
engagement_score=engagement_score,
sentiment_score=sentiment_score,
has_conflict=has_conflict,
creative_elements=creative_elements
)
async def get_conversation_details(self, conversation_id: int) -> Optional[ConversationDetail]:
"""Get detailed conversation with messages"""
try:
async with get_db_session() as session:
# Get conversation
conversation_query = select(Conversation).where(Conversation.id == conversation_id)
conversation = await session.scalar(conversation_query)
if not conversation:
return None
# Get messages with character names
messages_query = select(Message, Character.name).join(
Character, Message.character_id == Character.id
).where(Message.conversation_id == conversation_id).order_by(asc(Message.timestamp))
results = await session.execute(messages_query)
messages = []
for message, character_name in results:
message_data = {
"id": message.id,
"character_name": character_name,
"content": message.content,
"timestamp": message.timestamp.isoformat(),
"metadata": json.loads(message.metadata) if message.metadata else {}
}
messages.append(message_data)
# Get participants
participants = list(set(msg["character_name"] for msg in messages))
# Calculate duration
duration_minutes = None
if conversation.end_time:
duration = conversation.end_time - conversation.start_time
duration_minutes = duration.total_seconds() / 60
# Analyze conversation
analysis = await self._analyze_conversation(messages)
return ConversationDetail(
id=conversation.id,
participants=participants,
topic=conversation.topic,
start_time=conversation.start_time,
end_time=conversation.end_time,
duration_minutes=duration_minutes,
message_count=len(messages),
engagement_score=analysis["engagement_score"],
sentiment_score=analysis["sentiment_score"],
messages=messages,
analysis=analysis,
keywords=analysis["keywords"],
emotions=analysis["emotions"]
)
except Exception as e:
logger.error(f"Error getting conversation details for {conversation_id}: {e}")
return None
async def _analyze_conversation(self, messages: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Analyze conversation content"""
# Placeholder analysis - would use NLP in production
total_words = sum(len(msg["content"].split()) for msg in messages)
avg_message_length = total_words / max(1, len(messages))
# Engagement based on message frequency and length
engagement_score = min(1.0, (len(messages) / 20) * 0.7 + (avg_message_length / 50) * 0.3)
# Simple sentiment analysis
positive_words = ["good", "great", "amazing", "wonderful", "love", "like", "happy", "joy"]
negative_words = ["bad", "terrible", "awful", "hate", "sad", "angry", "disappointed"]
positive_count = 0
negative_count = 0
all_text = " ".join(msg["content"].lower() for msg in messages)
for word in positive_words:
positive_count += all_text.count(word)
for word in negative_words:
negative_count += all_text.count(word)
total_sentiment_words = positive_count + negative_count
if total_sentiment_words > 0:
sentiment_score = positive_count / total_sentiment_words
else:
sentiment_score = 0.5
# Extract keywords (simple approach)
words = all_text.split()
word_freq = {}
for word in words:
if len(word) > 3: # Only consider words longer than 3 chars
word_freq[word] = word_freq.get(word, 0) + 1
keywords = sorted(word_freq.items(), key=lambda x: x[1], reverse=True)[:10]
keywords = [word for word, freq in keywords]
# Emotion detection (placeholder)
emotions = {
"joy": 0.3,
"curiosity": 0.5,
"excitement": 0.2,
"thoughtfulness": 0.4
}
return {
"engagement_score": engagement_score,
"sentiment_score": sentiment_score,
"keywords": keywords,
"emotions": emotions,
"total_words": total_words,
"avg_message_length": avg_message_length,
"conversation_quality": (engagement_score + sentiment_score) / 2
}
async def search_conversations(self, query: str, limit: int = 50) -> List[SearchResult]:
"""Search conversations by content"""
try:
async with get_db_session() as session:
# Search in message content
search_query = select(
Message, Character.name, Conversation.topic
).join(
Character, Message.character_id == Character.id
).join(
Conversation, Message.conversation_id == Conversation.id
).where(
Message.content.ilike(f'%{query}%')
).order_by(desc(Message.timestamp)).limit(limit)
results = await session.execute(search_query)
search_results = []
for message, character_name, topic in results:
# Calculate relevance score (simple approach)
content_lower = message.content.lower()
query_lower = query.lower()
relevance = content_lower.count(query_lower) / max(1, len(content_lower.split()))
# Create snippet with highlighted query
snippet = message.content
if len(snippet) > 150:
# Find query position and create snippet around it
query_pos = content_lower.find(query_lower)
if query_pos != -1:
start = max(0, query_pos - 50)
end = min(len(snippet), query_pos + 100)
snippet = snippet[start:end]
if start > 0:
snippet = "..." + snippet
if end < len(message.content):
snippet = snippet + "..."
result = SearchResult(
id=f"message_{message.id}",
type="message",
title=f"Message from {character_name}",
snippet=snippet,
relevance_score=relevance,
timestamp=message.timestamp,
character_names=[character_name],
metadata={
"conversation_id": message.conversation_id,
"conversation_topic": topic,
"message_id": message.id
}
)
search_results.append(result)
# Sort by relevance
search_results.sort(key=lambda x: x.relevance_score, reverse=True)
return search_results
except Exception as e:
logger.error(f"Error searching conversations: {e}")
return []
async def export_conversation(self, conversation_id: int, format: str = "json") -> Dict[str, Any]:
"""Export conversation in specified format"""
try:
conversation = await self.get_conversation_details(conversation_id)
if not conversation:
raise ValueError("Conversation not found")
if format == "json":
return {
"format": "json",
"data": conversation.__dict__,
"exported_at": datetime.utcnow().isoformat()
}
elif format == "text":
# Create readable text format
text_content = f"Conversation {conversation_id}\n"
text_content += f"Topic: {conversation.topic or 'No topic'}\n"
text_content += f"Participants: {', '.join(conversation.participants)}\n"
text_content += f"Start: {conversation.start_time}\n"
if conversation.end_time:
text_content += f"End: {conversation.end_time}\n"
text_content += f"Messages: {conversation.message_count}\n\n"
for message in conversation.messages:
timestamp = datetime.fromisoformat(message["timestamp"]).strftime("%H:%M:%S")
text_content += f"[{timestamp}] {message['character_name']}: {message['content']}\n"
return {
"format": "text",
"data": text_content,
"exported_at": datetime.utcnow().isoformat()
}
else:
raise ValueError(f"Unsupported format: {format}")
except Exception as e:
logger.error(f"Error exporting conversation {conversation_id}: {e}")
raise

View File

@@ -0,0 +1,303 @@
"""
Dashboard service for real-time metrics and activity monitoring
"""
import asyncio
import psutil
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional
from collections import deque
import logging
from sqlalchemy import select, func, and_, desc
from ...database.connection import get_db_session
from ...database.models import Character, Conversation, Message, Memory
from ..models import DashboardMetrics, ActivityEvent, ActivityType
from .websocket_manager import WebSocketManager
logger = logging.getLogger(__name__)
class DashboardService:
"""Service for dashboard metrics and real-time activity monitoring"""
def __init__(self, websocket_manager: WebSocketManager):
self.websocket_manager = websocket_manager
self.activity_feed = deque(maxlen=1000) # Keep last 1000 activities
self.metrics_cache = {}
self.cache_ttl = 30 # Cache metrics for 30 seconds
self.start_time = datetime.utcnow()
# System monitoring
self.system_metrics = {
"cpu_usage": [],
"memory_usage": [],
"disk_usage": []
}
@classmethod
async def initialize(cls):
"""Initialize dashboard service"""
logger.info("Dashboard service initialized")
async def get_metrics(self) -> DashboardMetrics:
"""Get current dashboard metrics"""
try:
# Check cache
now = datetime.utcnow()
if 'metrics' in self.metrics_cache:
cached_time = self.metrics_cache['timestamp']
if (now - cached_time).total_seconds() < self.cache_ttl:
return self.metrics_cache['metrics']
# Calculate metrics from database
async with get_db_session() as session:
today = datetime.utcnow().date()
today_start = datetime.combine(today, datetime.min.time())
# Total messages today
messages_today_query = select(func.count(Message.id)).where(
Message.timestamp >= today_start
)
messages_today = await session.scalar(messages_today_query) or 0
# Active conversations (those with messages in last hour)
hour_ago = datetime.utcnow() - timedelta(hours=1)
active_conversations_query = select(func.count(func.distinct(Message.conversation_id))).where(
Message.timestamp >= hour_ago
)
active_conversations = await session.scalar(active_conversations_query) or 0
# Character statistics
total_characters_query = select(func.count(Character.id))
total_characters = await session.scalar(total_characters_query) or 0
# Characters active in last hour
characters_online_query = select(func.count(func.distinct(Character.id))).select_from(
Character.__table__.join(Message.__table__)
).where(Message.timestamp >= hour_ago)
characters_online = await session.scalar(characters_online_query) or 0
# Average response time (placeholder - would need actual timing data)
average_response_time = 2.5
# Memory usage
memory_info = psutil.virtual_memory()
memory_usage = {
"total_mb": memory_info.total // (1024 * 1024),
"used_mb": memory_info.used // (1024 * 1024),
"percent": memory_info.percent
}
# System uptime
uptime_seconds = (now - self.start_time).total_seconds()
uptime_str = self._format_uptime(uptime_seconds)
# Database health check
try:
await session.execute(select(1))
database_health = "healthy"
except Exception:
database_health = "error"
metrics = DashboardMetrics(
total_messages_today=messages_today,
active_conversations=active_conversations,
characters_online=characters_online,
characters_total=total_characters,
average_response_time=average_response_time,
system_uptime=uptime_str,
memory_usage=memory_usage,
database_health=database_health,
llm_api_calls_today=0, # Would track from LLM client
llm_api_cost_today=0.0, # Would track from LLM client
last_updated=now
)
# Cache metrics
self.metrics_cache = {
'metrics': metrics,
'timestamp': now
}
return metrics
except Exception as e:
logger.error(f"Error getting dashboard metrics: {e}")
# Return fallback metrics
return DashboardMetrics(
total_messages_today=0,
active_conversations=0,
characters_online=0,
characters_total=0,
average_response_time=0.0,
system_uptime="unknown",
memory_usage={"total_mb": 0, "used_mb": 0, "percent": 0},
database_health="error",
llm_api_calls_today=0,
llm_api_cost_today=0.0,
last_updated=datetime.utcnow()
)
async def get_recent_activity(self, limit: int = 50) -> List[Dict[str, Any]]:
"""Get recent activity events"""
# Convert activity feed to list and limit
activities = list(self.activity_feed)[-limit:]
return [activity.__dict__ if hasattr(activity, '__dict__') else activity for activity in activities]
async def add_activity(self, activity_type: ActivityType, description: str,
character_name: Optional[str] = None, metadata: Optional[Dict[str, Any]] = None):
"""Add new activity to feed"""
activity = ActivityEvent(
id=f"activity_{datetime.utcnow().timestamp()}",
type=activity_type,
timestamp=datetime.utcnow(),
character_name=character_name,
description=description,
metadata=metadata or {},
severity="info"
)
self.activity_feed.append(activity)
# Broadcast to WebSocket clients
await self.websocket_manager.broadcast_activity(activity.__dict__)
async def get_system_health(self) -> Dict[str, Any]:
"""Get detailed system health information"""
try:
# CPU usage
cpu_percent = psutil.cpu_percent(interval=1)
# Memory usage
memory = psutil.virtual_memory()
# Disk usage
disk = psutil.disk_usage('/')
# Process information
processes = []
for proc in psutil.process_iter(['pid', 'name', 'memory_percent', 'cpu_percent']):
try:
if 'python' in proc.info['name'].lower() or 'discord' in proc.info['name'].lower():
processes.append(proc.info)
except (psutil.NoSuchProcess, psutil.AccessDenied):
pass
# Database connection test
database_status = "healthy"
try:
async with get_db_session() as session:
await session.execute(select(1))
except Exception as e:
database_status = f"error: {str(e)}"
health_data = {
"timestamp": datetime.utcnow().isoformat(),
"cpu": {
"usage_percent": cpu_percent,
"count": psutil.cpu_count()
},
"memory": {
"total_gb": memory.total / (1024**3),
"used_gb": memory.used / (1024**3),
"percent": memory.percent
},
"disk": {
"total_gb": disk.total / (1024**3),
"used_gb": disk.used / (1024**3),
"percent": (disk.used / disk.total) * 100
},
"database": {
"status": database_status
},
"processes": processes[:10], # Top 10 relevant processes
"websocket_connections": self.websocket_manager.get_connection_count()
}
return health_data
except Exception as e:
logger.error(f"Error getting system health: {e}")
return {"error": str(e), "timestamp": datetime.utcnow().isoformat()}
async def monitor_message_activity(self):
"""Background task to monitor message activity"""
try:
async with get_db_session() as session:
# Get recent messages
five_minutes_ago = datetime.utcnow() - timedelta(minutes=5)
recent_messages_query = select(Message, Character.name).join(
Character, Message.character_id == Character.id
).where(Message.timestamp >= five_minutes_ago).order_by(desc(Message.timestamp))
results = await session.execute(recent_messages_query)
for message, character_name in results:
await self.add_activity(
ActivityType.MESSAGE,
f"{character_name}: {message.content[:100]}{'...' if len(message.content) > 100 else ''}",
character_name,
{"message_id": message.id, "conversation_id": message.conversation_id}
)
except Exception as e:
logger.error(f"Error monitoring message activity: {e}")
async def start_monitoring(self):
"""Start background monitoring tasks"""
logger.info("Starting dashboard monitoring tasks")
# Start periodic tasks
asyncio.create_task(self._periodic_metrics_update())
asyncio.create_task(self._periodic_health_check())
async def _periodic_metrics_update(self):
"""Periodically update and broadcast metrics"""
while True:
try:
metrics = await self.get_metrics()
await self.websocket_manager.broadcast_metrics(metrics.__dict__)
await asyncio.sleep(30) # Update every 30 seconds
except Exception as e:
logger.error(f"Error in periodic metrics update: {e}")
await asyncio.sleep(60) # Wait longer on error
async def _periodic_health_check(self):
"""Periodically check system health and send alerts"""
while True:
try:
health = await self.get_system_health()
# Check for alerts
alerts = []
if health.get("cpu", {}).get("usage_percent", 0) > 90:
alerts.append("High CPU usage detected")
if health.get("memory", {}).get("percent", 0) > 90:
alerts.append("High memory usage detected")
if health.get("disk", {}).get("percent", 0) > 90:
alerts.append("High disk usage detected")
if alerts:
await self.websocket_manager.broadcast_system_alert("resource_warning", {
"alerts": alerts,
"health_data": health
})
await asyncio.sleep(60) # Check every minute
except Exception as e:
logger.error(f"Error in periodic health check: {e}")
await asyncio.sleep(120) # Wait longer on error
def _format_uptime(self, seconds: float) -> str:
"""Format uptime in human-readable format"""
days, remainder = divmod(int(seconds), 86400)
hours, remainder = divmod(remainder, 3600)
minutes, seconds = divmod(remainder, 60)
if days > 0:
return f"{days}d {hours}h {minutes}m"
elif hours > 0:
return f"{hours}h {minutes}m"
else:
return f"{minutes}m {seconds}s"

View File

@@ -0,0 +1,170 @@
"""
System service for monitoring and controlling the fishbowl system
"""
import logging
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional
import psutil
import json
from ..models import SystemStatus, SystemStatusEnum, SystemConfiguration, LogEntry
logger = logging.getLogger(__name__)
class SystemService:
"""Service for system monitoring and control"""
def __init__(self):
self.system_state = SystemStatusEnum.RUNNING
self.start_time = datetime.utcnow()
self.error_count = 0
self.warnings_count = 0
self.log_buffer = []
@classmethod
async def initialize(cls):
"""Initialize system service"""
logger.info("System service initialized")
async def get_status(self) -> SystemStatus:
"""Get current system status"""
try:
uptime_seconds = (datetime.utcnow() - self.start_time).total_seconds()
uptime_str = self._format_uptime(uptime_seconds)
# Get resource usage
memory = psutil.virtual_memory()
cpu_percent = psutil.cpu_percent(interval=1)
resource_usage = {
"cpu_percent": cpu_percent,
"memory_total_mb": memory.total // (1024 * 1024),
"memory_used_mb": memory.used // (1024 * 1024),
"memory_percent": memory.percent
}
# Performance metrics
performance_metrics = {
"avg_response_time": 2.5, # Would track actual response times
"requests_per_minute": 30, # Would track actual request rate
"database_query_time": 0.05 # Would track actual DB performance
}
return SystemStatus(
status=self.system_state,
uptime=uptime_str,
version="1.0.0",
database_status="healthy",
redis_status="healthy",
llm_service_status="healthy",
discord_bot_status="connected",
active_processes=["main", "conversation_engine", "scheduler", "admin_interface"],
error_count=self.error_count,
warnings_count=self.warnings_count,
performance_metrics=performance_metrics,
resource_usage=resource_usage
)
except Exception as e:
logger.error(f"Error getting system status: {e}")
return SystemStatus(
status=SystemStatusEnum.ERROR,
uptime="unknown",
version="1.0.0",
database_status="error",
redis_status="unknown",
llm_service_status="unknown",
discord_bot_status="unknown",
active_processes=[],
error_count=self.error_count + 1,
warnings_count=self.warnings_count,
performance_metrics={},
resource_usage={}
)
async def pause_system(self):
"""Pause the entire system"""
try:
logger.info("Pausing system operations")
self.system_state = SystemStatusEnum.PAUSED
# Would integrate with main application to pause operations
except Exception as e:
logger.error(f"Error pausing system: {e}")
raise
async def resume_system(self):
"""Resume system operations"""
try:
logger.info("Resuming system operations")
self.system_state = SystemStatusEnum.RUNNING
# Would integrate with main application to resume operations
except Exception as e:
logger.error(f"Error resuming system: {e}")
raise
async def get_configuration(self) -> SystemConfiguration:
"""Get system configuration"""
# Default configuration values
return SystemConfiguration(
conversation_frequency=0.5,
response_delay_min=1.0,
response_delay_max=5.0,
personality_change_rate=0.1,
memory_retention_days=90,
max_conversation_length=50,
creativity_boost=True,
conflict_resolution_enabled=True,
safety_monitoring=True,
auto_moderation=False,
backup_frequency_hours=24
)
async def update_configuration(self, config: Dict[str, Any]):
"""Update system configuration"""
try:
logger.info(f"Updating system configuration: {config}")
# Would integrate with main application to update configuration
except Exception as e:
logger.error(f"Error updating configuration: {e}")
raise
async def get_logs(self, limit: int = 100, level: Optional[str] = None) -> List[LogEntry]:
"""Get system logs"""
try:
# In production, this would read from actual log files
sample_logs = [
LogEntry(
timestamp=datetime.utcnow() - timedelta(minutes=i),
level="INFO" if i % 3 != 0 else "DEBUG",
component="conversation_engine",
message=f"Sample log message {i}",
metadata={"log_id": i}
)
for i in range(min(limit, 50))
]
if level:
sample_logs = [log for log in sample_logs if log.level == level.upper()]
return sample_logs
except Exception as e:
logger.error(f"Error getting logs: {e}")
return []
def _format_uptime(self, seconds: float) -> str:
"""Format uptime in human-readable format"""
days, remainder = divmod(int(seconds), 86400)
hours, remainder = divmod(remainder, 3600)
minutes, seconds = divmod(remainder, 60)
if days > 0:
return f"{days}d {hours}h {minutes}m"
elif hours > 0:
return f"{hours}h {minutes}m"
else:
return f"{minutes}m {seconds}s"

View File

@@ -0,0 +1,132 @@
"""
WebSocket manager for real-time updates
"""
import json
import asyncio
from typing import List, Dict, Any
from fastapi import WebSocket
import logging
logger = logging.getLogger(__name__)
class WebSocketManager:
"""Manage WebSocket connections for real-time updates"""
def __init__(self):
self.active_connections: List[WebSocket] = []
self.connection_metadata: Dict[WebSocket, Dict[str, Any]] = {}
async def connect(self, websocket: WebSocket):
"""Accept new WebSocket connection"""
await websocket.accept()
self.active_connections.append(websocket)
self.connection_metadata[websocket] = {
"connected_at": asyncio.get_event_loop().time(),
"message_count": 0
}
logger.info(f"WebSocket connected. Total connections: {len(self.active_connections)}")
def disconnect(self, websocket: WebSocket):
"""Remove WebSocket connection"""
if websocket in self.active_connections:
self.active_connections.remove(websocket)
if websocket in self.connection_metadata:
del self.connection_metadata[websocket]
logger.info(f"WebSocket disconnected. Total connections: {len(self.active_connections)}")
async def send_personal_message(self, message: Dict[str, Any], websocket: WebSocket):
"""Send message to specific WebSocket"""
try:
await websocket.send_text(json.dumps(message))
if websocket in self.connection_metadata:
self.connection_metadata[websocket]["message_count"] += 1
except Exception as e:
logger.error(f"Error sending personal message: {e}")
self.disconnect(websocket)
async def broadcast(self, message: Dict[str, Any]):
"""Broadcast message to all connected WebSockets"""
if not self.active_connections:
return
message_text = json.dumps(message)
disconnected = []
for connection in self.active_connections:
try:
await connection.send_text(message_text)
if connection in self.connection_metadata:
self.connection_metadata[connection]["message_count"] += 1
except Exception as e:
logger.error(f"Error broadcasting to connection: {e}")
disconnected.append(connection)
# Remove disconnected WebSockets
for connection in disconnected:
self.disconnect(connection)
async def broadcast_activity(self, activity_data: Dict[str, Any]):
"""Broadcast activity update to all connections"""
message = {
"type": "activity_update",
"data": activity_data,
"timestamp": asyncio.get_event_loop().time()
}
await self.broadcast(message)
async def broadcast_metrics(self, metrics_data: Dict[str, Any]):
"""Broadcast metrics update to all connections"""
message = {
"type": "metrics_update",
"data": metrics_data,
"timestamp": asyncio.get_event_loop().time()
}
await self.broadcast(message)
async def broadcast_character_update(self, character_name: str, update_data: Dict[str, Any]):
"""Broadcast character status update"""
message = {
"type": "character_update",
"character_name": character_name,
"data": update_data,
"timestamp": asyncio.get_event_loop().time()
}
await self.broadcast(message)
async def broadcast_conversation_update(self, conversation_id: int, update_data: Dict[str, Any]):
"""Broadcast conversation update"""
message = {
"type": "conversation_update",
"conversation_id": conversation_id,
"data": update_data,
"timestamp": asyncio.get_event_loop().time()
}
await self.broadcast(message)
async def broadcast_system_alert(self, alert_type: str, alert_data: Dict[str, Any]):
"""Broadcast system alert"""
message = {
"type": "system_alert",
"alert_type": alert_type,
"data": alert_data,
"timestamp": asyncio.get_event_loop().time()
}
await self.broadcast(message)
def get_connection_count(self) -> int:
"""Get number of active connections"""
return len(self.active_connections)
def get_connection_stats(self) -> Dict[str, Any]:
"""Get connection statistics"""
total_messages = sum(
metadata.get("message_count", 0)
for metadata in self.connection_metadata.values()
)
return {
"active_connections": len(self.active_connections),
"total_messages_sent": total_messages,
"average_messages_per_connection": total_messages / max(1, len(self.active_connections))
}