import asyncio
import hashlib
import logging
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from pathlib import Path
from typing import Any, Dict, List, Optional

import chromadb
from sentence_transformers import SentenceTransformer

from ..utils.logging import log_error_with_context, log_character_action

logger = logging.getLogger(__name__)


class MemoryType(Enum):
    PERSONAL = "personal"
    RELATIONSHIP = "relationship"
    CREATIVE = "creative"
    COMMUNITY = "community"
    REFLECTION = "reflection"
    EXPERIENCE = "experience"


@dataclass
class VectorMemory:
    id: str
    content: str
    memory_type: MemoryType
    character_name: str
    timestamp: datetime
    importance: float
    metadata: Dict[str, Any]
    embedding: Optional[List[float]] = None

    def to_dict(self) -> Dict[str, Any]:
        return {
            "id": self.id,
            "content": self.content,
            "memory_type": self.memory_type.value,
            "character_name": self.character_name,
            "timestamp": self.timestamp.isoformat(),
            "importance": self.importance,
            "metadata": self.metadata,
        }
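
# Example of the round-trip shape (hypothetical values, for illustration only):
#
#   memory = VectorMemory(
#       id="ava_relationship_20240101_120000_ab12cd34",
#       content="Talked with Riley about the harvest festival",
#       memory_type=MemoryType.RELATIONSHIP,
#       character_name="Ava",
#       timestamp=datetime(2024, 1, 1, 12, 0, 0),
#       importance=0.7,
#       metadata={"location": "town_square"},
#   )
#   memory.to_dict()["memory_type"]  # -> "relationship"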


class VectorStoreManager:
    """Manages multi-layer vector databases for character memories."""

    def __init__(self, data_path: str = "./data/vector_stores"):
        self.data_path = Path(data_path)
        self.data_path.mkdir(parents=True, exist_ok=True)

        # Initialize embedding model
        self.embedding_model = SentenceTransformer('all-MiniLM-L6-v2')

        # Initialize ChromaDB client
        self.chroma_client = chromadb.PersistentClient(path=str(self.data_path))

        # Collection references
        self.personal_collections: Dict[str, chromadb.Collection] = {}
        self.community_collection = None
        self.creative_collections: Dict[str, chromadb.Collection] = {}

        # Memory importance decay
        self.importance_decay_rate = 0.95
        self.consolidation_threshold = 0.8

    async def initialize(self, character_names: List[str]):
        """Initialize collections for all characters."""
        try:
            for character_name in character_names:
                # Personal memory collection
                collection_name = f"personal_{character_name.lower()}"
                self.personal_collections[character_name] = self.chroma_client.get_or_create_collection(
                    name=collection_name,
                    metadata={"type": "personal", "character": character_name}
                )

                # Creative knowledge collection
                creative_collection_name = f"creative_{character_name.lower()}"
                self.creative_collections[character_name] = self.chroma_client.get_or_create_collection(
                    name=creative_collection_name,
                    metadata={"type": "creative", "character": character_name}
                )

            # Shared community collection
            self.community_collection = self.chroma_client.get_or_create_collection(
                name="community_knowledge",
                metadata={"type": "community"}
            )

            logger.info(f"Initialized vector stores for {len(character_names)} characters")

        except Exception as e:
            log_error_with_context(e, {"component": "vector_store_init"})
            raise

    async def store_memory(self, memory: VectorMemory) -> str:
        """Store a memory in the appropriate vector database."""
        try:
            # Generate embedding if the caller did not supply one
            if not memory.embedding:
                memory.embedding = await self._generate_embedding(memory.content)

            # Generate unique ID if not provided
            if not memory.id:
                memory.id = self._generate_memory_id(memory)

            # Select appropriate collection
            collection = self._get_collection_for_memory(memory)
            if not collection:
                raise ValueError(f"No collection found for memory type: {memory.memory_type}")

            # Prepare metadata
            metadata = memory.metadata.copy()
            metadata.update({
                "character_name": memory.character_name,
                "timestamp": memory.timestamp.isoformat(),
                "importance": memory.importance,
                "memory_type": memory.memory_type.value
            })

            # Store in collection
            collection.add(
                ids=[memory.id],
                embeddings=[memory.embedding],
                documents=[memory.content],
                metadatas=[metadata]
            )

            log_character_action(
                memory.character_name,
                "stored_vector_memory",
                {"memory_type": memory.memory_type.value, "importance": memory.importance}
            )

            return memory.id

        except Exception as e:
            log_error_with_context(e, {
                "character": memory.character_name,
                "memory_type": memory.memory_type.value
            })
            raise

    async def query_memories(self, character_name: str, query: str,
                             memory_types: Optional[List[MemoryType]] = None,
                             limit: int = 10,
                             min_importance: float = 0.0) -> List[VectorMemory]:
        """Query a character's memories using semantic search."""
        try:
            # Generate query embedding
            query_embedding = await self._generate_embedding(query)

            # Determine which collections to search
            if not memory_types:
                memory_types = [MemoryType.PERSONAL, MemoryType.RELATIONSHIP,
                                MemoryType.EXPERIENCE, MemoryType.REFLECTION]

            collections_to_search = []
            for memory_type in memory_types:
                collection = self._get_collection_for_type(character_name, memory_type)
                if collection:
                    collections_to_search.append((collection, memory_type))

            # Search each collection
            all_results = []
            for collection, memory_type in collections_to_search:
                try:
                    results = collection.query(
                        query_embeddings=[query_embedding],
                        n_results=limit,
                        where={"character_name": character_name} if memory_type != MemoryType.COMMUNITY else None
                    )

                    # Convert results to VectorMemory objects
                    for i, (doc, metadata, distance) in enumerate(zip(
                        results['documents'][0],
                        results['metadatas'][0],
                        results['distances'][0]
                    )):
                        if metadata.get('importance', 0) >= min_importance:
                            memory = VectorMemory(
                                id=results['ids'][0][i],
                                content=doc,
                                memory_type=MemoryType(metadata['memory_type']),
                                character_name=metadata['character_name'],
                                timestamp=datetime.fromisoformat(metadata['timestamp']),
                                importance=metadata['importance'],
                                metadata=metadata
                            )
                            # Convert distance to similarity
                            memory.metadata['similarity_score'] = 1 - distance
                            all_results.append(memory)

                except Exception as e:
                    logger.warning(f"Error querying collection {memory_type}: {e}")
                    continue

            # Sort by relevance (weighted blend of similarity and importance)
            all_results.sort(
                key=lambda m: m.metadata.get('similarity_score', 0) * 0.7 + m.importance * 0.3,
                reverse=True
            )

            return all_results[:limit]

        except Exception as e:
            log_error_with_context(e, {"character": character_name, "query": query})
            return []
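
    # Hypothetical usage sketch (the `manager` instance, character name, and
    # query text are illustrative only):
    #
    #   results = await manager.query_memories(
    #       "Ava", "conversations about the festival",
    #       memory_types=[MemoryType.PERSONAL, MemoryType.RELATIONSHIP],
    #       limit=5, min_importance=0.2,
    #   )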

    async def query_community_knowledge(self, query: str, limit: int = 5) -> List[VectorMemory]:
        """Query the shared community knowledge base."""
        try:
            if not self.community_collection:
                return []

            query_embedding = await self._generate_embedding(query)

            results = self.community_collection.query(
                query_embeddings=[query_embedding],
                n_results=limit
            )

            memories = []
            for i, (doc, metadata, distance) in enumerate(zip(
                results['documents'][0],
                results['metadatas'][0],
                results['distances'][0]
            )):
                memory = VectorMemory(
                    id=results['ids'][0][i],
                    content=doc,
                    memory_type=MemoryType.COMMUNITY,
                    character_name=metadata.get('character_name', 'community'),
                    timestamp=datetime.fromisoformat(metadata['timestamp']),
                    importance=metadata['importance'],
                    metadata=metadata
                )
                memory.metadata['similarity_score'] = 1 - distance
                memories.append(memory)

            return sorted(memories, key=lambda m: m.metadata.get('similarity_score', 0), reverse=True)

        except Exception as e:
            log_error_with_context(e, {"query": query, "component": "community_knowledge"})
            return []

    async def get_creative_knowledge(self, character_name: str, query: str,
                                     limit: int = 5) -> List[VectorMemory]:
        """Query a character's creative knowledge base."""
        try:
            if character_name not in self.creative_collections:
                return []

            collection = self.creative_collections[character_name]
            query_embedding = await self._generate_embedding(query)

            results = collection.query(
                query_embeddings=[query_embedding],
                n_results=limit
            )

            memories = []
            for i, (doc, metadata, distance) in enumerate(zip(
                results['documents'][0],
                results['metadatas'][0],
                results['distances'][0]
            )):
                memory = VectorMemory(
                    id=results['ids'][0][i],
                    content=doc,
                    memory_type=MemoryType.CREATIVE,
                    character_name=character_name,
                    timestamp=datetime.fromisoformat(metadata['timestamp']),
                    importance=metadata['importance'],
                    metadata=metadata
                )
                memory.metadata['similarity_score'] = 1 - distance
                memories.append(memory)

            return sorted(memories, key=lambda m: m.metadata.get('similarity_score', 0), reverse=True)

        except Exception as e:
            log_error_with_context(e, {"character": character_name, "query": query})
            return []

    async def consolidate_memories(self, character_name: str) -> Dict[str, Any]:
        """Consolidate similar memories to save space."""
        try:
            consolidated_count = 0

            # Get all personal memories for the character
            collection = self.personal_collections.get(character_name)
            if not collection:
                return {"consolidated_count": 0}

            all_memories = collection.get()

            if len(all_memories['ids']) < 10:  # Not enough memories to consolidate
                return {"consolidated_count": 0}

            # Find similar memory clusters
            clusters = await self._find_similar_clusters(all_memories)

            # Consolidate each cluster
            for cluster in clusters:
                if len(cluster) >= 3:  # Only consolidate if 3+ similar memories
                    consolidated_memory = await self._create_consolidated_memory(cluster, character_name)
                    if consolidated_memory:
                        # Store consolidated memory
                        await self.store_memory(consolidated_memory)

                        # Remove original memories
                        collection.delete(ids=[mem['id'] for mem in cluster])
                        consolidated_count += len(cluster) - 1

            log_character_action(
                character_name,
                "consolidated_memories",
                {"consolidated_count": consolidated_count}
            )

            return {"consolidated_count": consolidated_count}

        except Exception as e:
            log_error_with_context(e, {"character": character_name})
            return {"consolidated_count": 0}

    async def decay_memory_importance(self, character_name: str):
        """Apply time-based decay to memory importance."""
        try:
            collection = self.personal_collections.get(character_name)
            if not collection:
                return

            # Get all memories with their metadata
            all_memories = collection.get(include=['metadatas'])

            updates = []
            for memory_id, metadata in zip(all_memories['ids'], all_memories['metadatas']):
                # Calculate age in days
                timestamp = datetime.fromisoformat(metadata['timestamp'])
                age_days = (datetime.utcnow() - timestamp).days

                # Apply exponential decay
                current_importance = metadata['importance']
                decayed_importance = current_importance * (self.importance_decay_rate ** age_days)

                # Only update if the change is significant
                if abs(decayed_importance - current_importance) > 0.01:
                    metadata['importance'] = decayed_importance
                    updates.append((memory_id, metadata))

            # Apply the updates
            if updates:
                for memory_id, metadata in updates:
                    collection.update(
                        ids=[memory_id],
                        metadatas=[metadata]
                    )

                logger.info(f"Applied importance decay to {len(updates)} memories for {character_name}")

        except Exception as e:
            log_error_with_context(e, {"character": character_name})
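
    # Worked example of the decay formula above: with the default rate of 0.95,
    # a memory stored 30 days ago with importance 0.9 decays to roughly
    # 0.9 * 0.95**30 ≈ 0.9 * 0.215 ≈ 0.19.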

    async def _generate_embedding(self, text: str) -> List[float]:
        """Generate an embedding for text without blocking the event loop."""
        try:
            # Run the synchronous encoder in a thread pool executor
            loop = asyncio.get_event_loop()
            embedding = await loop.run_in_executor(
                None,
                lambda: self.embedding_model.encode(text).tolist()
            )
            return embedding

        except Exception as e:
            log_error_with_context(e, {"text_length": len(text)})
            # Return a zero embedding as fallback
            return [0.0] * 384  # all-MiniLM-L6-v2 embedding size

    def _get_collection_for_memory(self, memory: VectorMemory) -> Optional[chromadb.Collection]:
        """Get the appropriate collection for a memory."""
        if memory.memory_type == MemoryType.COMMUNITY:
            return self.community_collection
        elif memory.memory_type == MemoryType.CREATIVE:
            return self.creative_collections.get(memory.character_name)
        else:
            return self.personal_collections.get(memory.character_name)

    def _get_collection_for_type(self, character_name: str,
                                 memory_type: MemoryType) -> Optional[chromadb.Collection]:
        """Get the collection for a specific memory type and character."""
        if memory_type == MemoryType.COMMUNITY:
            return self.community_collection
        elif memory_type == MemoryType.CREATIVE:
            return self.creative_collections.get(character_name)
        else:
            return self.personal_collections.get(character_name)

    def _generate_memory_id(self, memory: VectorMemory) -> str:
        """Generate a unique ID for a memory."""
        content_hash = hashlib.md5(memory.content.encode()).hexdigest()[:8]
        timestamp_str = memory.timestamp.strftime("%Y%m%d_%H%M%S")
        return f"{memory.character_name}_{memory.memory_type.value}_{timestamp_str}_{content_hash}"

    async def _find_similar_clusters(self, memories: Dict[str, List]) -> List[List[Dict]]:
        """Find clusters of similar memories for consolidation."""
        # This is a simplified clustering - in production you'd use proper
        # clustering algorithms over the embeddings themselves
        clusters = []
        processed = set()

        for i, memory_id in enumerate(memories['ids']):
            if memory_id in processed:
                continue

            cluster = [{'id': memory_id,
                        'content': memories['documents'][i],
                        'metadata': memories['metadatas'][i]}]
            processed.add(memory_id)

            # Find similar memories (simplified similarity check)
            for j, other_id in enumerate(memories['ids'][i + 1:], i + 1):
                if other_id in processed:
                    continue

                # Simple similarity check based on content overlap
                content1 = memories['documents'][i].lower()
                content2 = memories['documents'][j].lower()
                words1 = set(content1.split())
                words2 = set(content2.split())
                overlap = len(words1 & words2) / len(words1 | words2) if words1 | words2 else 0

                if overlap > 0.3:  # 30% word overlap threshold
                    cluster.append({'id': other_id,
                                    'content': memories['documents'][j],
                                    'metadata': memories['metadatas'][j]})
                    processed.add(other_id)

            if len(cluster) > 1:
                clusters.append(cluster)

        return clusters
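
    # Worked example of the overlap metric above: "the old mill by the river"
    # vs. "the river near the mill" share {the, mill, river} out of a combined
    # vocabulary {the, old, mill, by, river, near}, a Jaccard overlap of
    # 3/6 = 0.5, which clears the 0.3 threshold.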

    async def _create_consolidated_memory(self, cluster: List[Dict],
                                          character_name: str) -> Optional[VectorMemory]:
        """Create a consolidated memory from a cluster of similar memories."""
        try:
            # Combine content, limited to the first 3 entries
            contents = [mem['content'] for mem in cluster]
            combined_content = f"Consolidated memory: {' | '.join(contents[:3])}"
            if len(cluster) > 3:
                combined_content += f" | ... and {len(cluster) - 3} more similar memories"

            # Calculate average importance
            avg_importance = sum(mem['metadata']['importance'] for mem in cluster) / len(cluster)

            # Get earliest timestamp
            timestamps = [datetime.fromisoformat(mem['metadata']['timestamp']) for mem in cluster]
            earliest_timestamp = min(timestamps)

            # Create consolidated memory
            consolidated = VectorMemory(
                id="",  # Will be generated on store
                content=combined_content,
                memory_type=MemoryType.PERSONAL,
                character_name=character_name,
                timestamp=earliest_timestamp,
                importance=avg_importance,
                metadata={
                    "consolidated": True,
                    "original_count": len(cluster),
                    "consolidation_date": datetime.utcnow().isoformat()
                }
            )

            return consolidated

        except Exception as e:
            log_error_with_context(e, {"character": character_name, "cluster_size": len(cluster)})
            return None

    def get_store_statistics(self, character_name: str) -> Dict[str, Any]:
        """Get statistics about a character's vector stores."""
        try:
            stats = {
                "personal_memories": 0,
                "creative_memories": 0,
                "community_memories": 0,
                "total_memories": 0
            }

            # Personal memories
            if character_name in self.personal_collections:
                personal_count = self.personal_collections[character_name].count()
                stats["personal_memories"] = personal_count
                stats["total_memories"] += personal_count

            # Creative memories
            if character_name in self.creative_collections:
                creative_count = self.creative_collections[character_name].count()
                stats["creative_memories"] = creative_count
                stats["total_memories"] += creative_count

            # Community memories (shared across characters)
            if self.community_collection:
                stats["community_memories"] = self.community_collection.count()

            return stats

        except Exception as e:
            log_error_with_context(e, {"character": character_name})
            return {"error": str(e)}


# Global vector store manager
vector_store_manager = VectorStoreManager()
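
# Minimal end-to-end usage sketch. Kept as a comment because the relative
# imports above mean this module cannot run as a script; character names and
# content are hypothetical:
#
#   import asyncio
#   from datetime import datetime
#
#   async def demo():
#       manager = VectorStoreManager()
#       await manager.initialize(["Ava", "Riley"])
#       memory_id = await manager.store_memory(VectorMemory(
#           id="",  # empty so _generate_memory_id supplies one
#           content="Ava learned a new folk song at the festival",
#           memory_type=MemoryType.PERSONAL,
#           character_name="Ava",
#           timestamp=datetime.utcnow(),
#           importance=0.8,
#           metadata={},
#       ))
#       matches = await manager.query_memories("Ava", "music at the festival")
#       print(memory_id, [m.content for m in matches])
#
#   asyncio.run(demo())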