import asyncio
import hashlib
import json
import logging
import random
import time
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional

import httpx

from admin.services.audit_service import AuditService
from utils.config import get_settings
from utils.logging import log_llm_interaction, log_error_with_context, log_system_health

logger = logging.getLogger(__name__)


class LLMClient:
    """Async LLM client for interacting with local LLM APIs (Ollama, etc.)"""

    def __init__(self):
        self.settings = get_settings()
        self.base_url = self.settings.llm.base_url
        self.model = self.settings.llm.model

        # Force 5-minute timeout for self-hosted large models
        self.timeout = 300
        self.max_tokens = self.settings.llm.max_tokens
        self.temperature = self.settings.llm.temperature

        # Rate limiting
        self.request_times = []
        self.max_requests_per_minute = 30

        # Response caching
        self.cache = {}
        self.cache_ttl = 300  # 5 minutes

        # Background task queue for long-running requests
        self.pending_requests = {}
        self.max_timeout = 300  # 5 minutes for self-hosted large models
        self.fallback_timeout = 300  # 5 minutes for self-hosted large models

        # Health monitoring
        self.health_stats = {
            'total_requests': 0,
            'successful_requests': 0,
            'failed_requests': 0,
            'average_response_time': 0,
            'last_health_check': datetime.now(timezone.utc)
        }

    async def generate_response(self, prompt: str, character_name: Optional[str] = None,
                                max_tokens: Optional[int] = None, temperature: Optional[float] = None,
                                use_fallback: bool = True) -> Optional[str]:
        """Generate response using LLM"""
        try:
            # Rate limiting check
            if not await self._check_rate_limit():
                logger.warning(f"Rate limit exceeded for {character_name}")
                return None

            # Check cache first
            cache_key = self._generate_cache_key(prompt, character_name, max_tokens, temperature)
            cached_response = self._get_cached_response(cache_key)
            if cached_response:
                return cached_response

            start_time = time.time()

            # Use shorter timeout for immediate responses, longer for background
            effective_timeout = self.fallback_timeout if use_fallback else min(self.timeout, self.max_timeout)

            # Try OpenAI-compatible API first (KoboldCPP, etc.)
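            # The request goes to an OpenAI-style /chat/completions endpoint first;
            # if that call raises (HTTP/transport error or unexpected payload), the
            # except block below retries against Ollama's native /api/generate.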
            async with httpx.AsyncClient(timeout=effective_timeout) as client:
                try:
                    # OpenAI-compatible request
                    request_data = {
                        "model": self.model,
                        "messages": [{"role": "user", "content": prompt}],
                        "temperature": temperature or self.temperature,
                        "max_tokens": max_tokens or self.max_tokens,
                        "top_p": 0.9,
                        "stream": False
                    }

                    # Debug logging
                    logger.debug(f"LLM Request for {character_name}:")
                    logger.debug(f"Model: {self.model}")
                    logger.debug(f"Prompt (first 500 chars): {prompt[:500]}...")
                    logger.debug(f"Full prompt length: {len(prompt)} chars")

                    response = await client.post(
                        f"{self.base_url}/chat/completions",
                        json=request_data,
                        headers={"Content-Type": "application/json"}
                    )
                    response.raise_for_status()

                    result = response.json()
                    if 'choices' in result and result['choices'] and 'message' in result['choices'][0]:
                        generated_text = result['choices'][0]['message']['content'].strip()
                        logger.debug(f"LLM Response for {character_name}: {generated_text[:200]}...")
                    else:
                        generated_text = None
                        logger.debug(f"LLM Response for {character_name}: Invalid response format")

                except (httpx.HTTPStatusError, httpx.RequestError, KeyError):
                    # Fallback to Ollama API
                    request_data = {
                        "model": self.model,
                        "prompt": prompt,
                        "options": {
                            "temperature": temperature or self.temperature,
                            "num_predict": max_tokens or self.max_tokens,
                            "top_p": 0.9,
                            "top_k": 40,
                            "repeat_penalty": 1.1
                        },
                        "stream": False
                    }

                    response = await client.post(
                        f"{self.base_url}/api/generate",
                        json=request_data,
                        headers={"Content-Type": "application/json"}
                    )
                    response.raise_for_status()

                    result = response.json()
                    if 'response' in result and result['response']:
                        generated_text = result['response'].strip()
                    else:
                        generated_text = None

                if generated_text:
                    # Cache the response
                    self._cache_response(cache_key, generated_text)

                    # Update stats
                    duration = time.time() - start_time
                    self._update_stats(True, duration)

                    # Log interaction
                    log_llm_interaction(
                        character_name or "unknown",
                        len(prompt),
                        len(generated_text),
                        self.model,
                        duration
                    )

                    # AUDIT: Log performance metric
                    await AuditService.log_performance_metric(
                        metric_name="llm_response_time",
                        metric_value=duration,
                        metric_unit="seconds",
                        component="llm_client",
                        additional_data={
                            "model": self.model,
                            "character_name": character_name,
                            "prompt_length": len(prompt),
                            "response_length": len(generated_text)
                        }
                    )

                    return generated_text
                else:
                    logger.error(f"No response from LLM: {result}")
                    self._update_stats(False, time.time() - start_time)
                    return None

        except httpx.TimeoutException:
            if use_fallback:
                logger.warning(f"LLM request timeout for {character_name}, using fallback response")

                # Queue for background processing if needed
                if self.timeout > self.max_timeout:
                    background_task = asyncio.create_task(self.generate_response(
                        prompt, character_name, max_tokens, temperature, use_fallback=False
                    ))
                    request_id = f"{character_name}_{time.time()}"
                    self.pending_requests[request_id] = background_task

                # Return a fallback response immediately
                fallback_response = self._get_fallback_response(character_name)
                self._update_stats(False, effective_timeout)
                return fallback_response
            else:
                logger.error(f"LLM background request timeout for {character_name}")
                self._update_stats(False, effective_timeout)
                return None

        except httpx.HTTPError as e:
            logger.error(f"LLM HTTP error for {character_name}: {e}")
            self._update_stats(False, time.time() - start_time)
            return None

        except Exception as e:
            log_error_with_context(e, {
                "character_name": character_name,
                "prompt_length": len(prompt),
                "model": self.model
            })
            self._update_stats(False, time.time() - start_time)
            return None
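
    # Illustrative call pattern (the character name "Alex" is just an example
    # taken from the fallback table below):
    #
    #     text = await llm_client.generate_response(
    #         "Summarize the last few messages.", character_name="Alex"
    #     )
    #
    # A None return means the call was rate limited, timed out without fallback,
    # or failed; generate_response_with_fallback() below never returns None.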

    async def generate_batch_responses(self, prompts: List[Dict[str, Any]]) -> List[Optional[str]]:
        """Generate multiple responses in batch"""
        tasks = []
        for prompt_data in prompts:
            task = self.generate_response(
                prompt=prompt_data['prompt'],
                character_name=prompt_data.get('character_name'),
                max_tokens=prompt_data.get('max_tokens'),
                temperature=prompt_data.get('temperature')
            )
            tasks.append(task)

        results = await asyncio.gather(*tasks, return_exceptions=True)

        # Convert exceptions to None
        return [result if not isinstance(result, Exception) else None for result in results]

    async def check_model_availability(self) -> bool:
        """Check if the LLM model is available"""
        try:
            async with httpx.AsyncClient(timeout=10) as client:
                # Try OpenAI-compatible API first (KoboldCPP, etc.)
                try:
                    response = await client.get(f"{self.base_url}/models")
                    response.raise_for_status()
                    models = response.json()
                    available_models = [model.get('id', '') for model in models.get('data', [])]
                except (httpx.HTTPStatusError, httpx.RequestError):
                    # Fallback to Ollama API
                    response = await client.get(f"{self.base_url}/api/tags")
                    response.raise_for_status()
                    models = response.json()
                    available_models = [model.get('name', '') for model in models.get('models', [])]

                is_available = any(self.model in model_name for model_name in available_models)

                log_system_health(
                    "llm_client",
                    "available" if is_available else "model_not_found",
                    {"model": self.model, "available_models": available_models}
                )

                return is_available

        except Exception as e:
            log_error_with_context(e, {"model": self.model})
            log_system_health("llm_client", "unavailable", {"error": str(e)})
            return False

    async def get_model_info(self) -> Dict[str, Any]:
        """Get information about the current model"""
        try:
            async with httpx.AsyncClient(timeout=10) as client:
                response = await client.post(
                    f"{self.base_url}/api/show",
                    json={"name": self.model}
                )
                response.raise_for_status()
                return response.json()
        except Exception as e:
            log_error_with_context(e, {"model": self.model})
            return {}
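
    # Illustrative startup check (assumes this runs inside an event loop in the
    # application's startup code, which is not part of this module):
    #
    #     if not await llm_client.check_model_availability():
    #         logger.warning("Configured LLM model not found on the server")
    #     print(await llm_client.health_check())
    #
    # Note that health_check() issues a real generation request, so it is counted
    # in the same statistics as normal traffic.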

    async def health_check(self) -> Dict[str, Any]:
        """Perform health check on LLM service"""
        try:
            start_time = time.time()

            # Test with simple prompt
            test_prompt = "Respond with 'OK' if you can understand this message."
            response = await self.generate_response(test_prompt, "health_check")

            duration = time.time() - start_time

            health_status = {
                'status': 'healthy' if response else 'unhealthy',
                'response_time': duration,
                'model': self.model,
                'base_url': self.base_url,
                'timestamp': datetime.now(timezone.utc).isoformat()
            }

            # Update health check time
            self.health_stats['last_health_check'] = datetime.now(timezone.utc)

            return health_status

        except Exception as e:
            log_error_with_context(e, {"component": "llm_health_check"})
            return {
                'status': 'error',
                'error': str(e),
                'model': self.model,
                'base_url': self.base_url,
                'timestamp': datetime.now(timezone.utc).isoformat()
            }

    def get_statistics(self) -> Dict[str, Any]:
        """Get client statistics"""
        return {
            'total_requests': self.health_stats['total_requests'],
            'successful_requests': self.health_stats['successful_requests'],
            'failed_requests': self.health_stats['failed_requests'],
            'success_rate': (
                self.health_stats['successful_requests'] / self.health_stats['total_requests']
                if self.health_stats['total_requests'] > 0 else 0
            ),
            'average_response_time': self.health_stats['average_response_time'],
            'cache_size': len(self.cache),
            'last_health_check': self.health_stats['last_health_check'].isoformat()
        }

    async def _check_rate_limit(self) -> bool:
        """Check if we're within rate limits"""
        now = time.time()

        # Remove old requests (older than 1 minute)
        self.request_times = [t for t in self.request_times if now - t < 60]

        # Check if we can make another request
        if len(self.request_times) >= self.max_requests_per_minute:
            return False

        # Add current request time
        self.request_times.append(now)
        return True

    def _generate_cache_key(self, prompt: str, character_name: Optional[str] = None,
                            max_tokens: Optional[int] = None, temperature: Optional[float] = None) -> str:
        """Generate cache key for response"""
        cache_data = {
            'prompt': prompt,
            'character_name': character_name,
            'max_tokens': max_tokens or self.max_tokens,
            'temperature': temperature or self.temperature,
            'model': self.model
        }
        cache_string = json.dumps(cache_data, sort_keys=True)
        return hashlib.md5(cache_string.encode()).hexdigest()

    def _get_cached_response(self, cache_key: str) -> Optional[str]:
        """Get cached response if available and not expired"""
        if cache_key in self.cache:
            cached_data = self.cache[cache_key]
            if time.time() - cached_data['timestamp'] < self.cache_ttl:
                return cached_data['response']
            else:
                # Remove expired cache entry
                del self.cache[cache_key]
        return None

    def _cache_response(self, cache_key: str, response: str):
        """Cache response"""
        self.cache[cache_key] = {
            'response': response,
            'timestamp': time.time()
        }

        # Clean up old cache entries if cache is too large
        if len(self.cache) > 100:
            # Remove oldest entries
            oldest_keys = sorted(
                self.cache.keys(),
                key=lambda k: self.cache[k]['timestamp']
            )[:20]
            for key in oldest_keys:
                del self.cache[key]

    def _update_stats(self, success: bool, duration: float):
        """Update health statistics"""
        self.health_stats['total_requests'] += 1
        if success:
            self.health_stats['successful_requests'] += 1
        else:
            self.health_stats['failed_requests'] += 1

        # Update average response time (rolling average)
        total_requests = self.health_stats['total_requests']
        current_avg = self.health_stats['average_response_time']
        self.health_stats['average_response_time'] = (
            (current_avg * (total_requests - 1) + duration) / total_requests
        )

    def _get_fallback_response(self, character_name: Optional[str] = None) -> str:
        """Generate a character-aware fallback response when LLM is slow"""
        if character_name:
            # Character-specific fallbacks based on their personalities
            character_fallbacks = {
                "Alex": [
                    "*processing all the technical implications...*",
                    "Let me analyze this from a different angle.",
                    "That's fascinating - I need to think through the logic here.",
                    "*running diagnostics on my thoughts...*"
                ],
                "Sage": [
                    "*contemplating the deeper meaning...*",
                    "The philosophical implications are worth considering carefully.",
                    "*reflecting on the nature of this question...*",
                    "This touches on something profound - give me a moment."
                ],
                "Luna": [
                    "*feeling the creative energy flow...*",
                    "Oh, this sparks so many artistic ideas! Let me gather my thoughts.",
                    "*painting mental images of possibilities...*",
                    "The beauty of this thought needs careful expression."
                ],
                "Echo": [
                    "*drifting between dimensions of thought...*",
                    "The echoes of meaning reverberate... patience.",
                    "*sensing the hidden patterns...*",
                    "Reality shifts... understanding emerges slowly."
                ]
            }

            if character_name in character_fallbacks:
                return random.choice(character_fallbacks[character_name])

        # Generic fallbacks
        fallback_responses = [
            "*thinking deeply about this...*",
            "*processing thoughts...*",
            "*contemplating the discussion...*",
            "*reflecting on what you've said...*",
            "*considering different perspectives...*",
            "Hmm, that's an interesting point to consider.",
            "I need a moment to think about that.",
            "That's worth reflecting on carefully.",
            "*taking time to formulate thoughts...*"
        ]
        return random.choice(fallback_responses)

    async def generate_response_with_fallback(self, prompt: str, character_name: Optional[str] = None,
                                              max_tokens: Optional[int] = None,
                                              temperature: Optional[float] = None) -> str:
        """Generate response with guaranteed fallback if LLM is slow"""
        try:
            # Try immediate response first
            response = await self.generate_response(
                prompt, character_name, max_tokens, temperature, use_fallback=True
            )

            if response:
                return response
            else:
                # Return fallback if no response
                return self._get_fallback_response(character_name)

        except Exception as e:
            log_error_with_context(e, {
                "character_name": character_name,
                "prompt_length": len(prompt)
            })
            return self._get_fallback_response(character_name)

    async def cleanup_pending_requests(self):
        """Clean up completed background requests"""
        completed_requests = []
        for request_id, task in self.pending_requests.items():
            if task.done():
                completed_requests.append(request_id)
                try:
                    result = await task
                    if result:
                        logger.info(f"Background LLM request {request_id} completed successfully")
                except Exception as e:
                    logger.error(f"Background LLM request {request_id} failed: {e}")

        # Remove completed requests
        for request_id in completed_requests:
            del self.pending_requests[request_id]

    def get_pending_count(self) -> int:
        """Get number of pending background requests"""
        return len(self.pending_requests)


class PromptManager:
    """Manages prompt templates and optimization"""

    def __init__(self):
        self.templates = {
            'character_response': """You are {character_name}, responding in a Discord chat.

{personality_context}

{conversation_context}

{memory_context}

{relationship_context}

Respond naturally as {character_name}. Keep it conversational and authentic to your personality.""",

            'conversation_starter': """You are {character_name} in a Discord chat.

{personality_context}

Start a conversation about: {topic}

Be natural and engaging. Your response should invite others to participate.""",

            'self_reflection': """You are {character_name}.
Reflect on your recent experiences:

{personality_context}

{recent_experiences}

Consider:
- How these experiences have affected you
- Any changes in your perspective
- Your relationships with others
- Your personal growth

Share your thoughtful reflection."""
        }

    def build_prompt(self, template_name: str, **kwargs) -> str:
        """Build prompt from template"""
        template = self.templates.get(template_name)
        if not template:
            raise ValueError(f"Template '{template_name}' not found")

        try:
            return template.format(**kwargs)
        except KeyError as e:
            raise ValueError(f"Missing required parameter for template '{template_name}': {e}")

    def optimize_prompt(self, prompt: str, max_length: int = 2000) -> str:
        """Optimize prompt for better performance"""
        # Truncate if too long
        if len(prompt) > max_length:
            # Try to cut at paragraph boundaries
            paragraphs = prompt.split('\n\n')
            optimized = ""

            for paragraph in paragraphs:
                if len(optimized + paragraph) <= max_length:
                    optimized += paragraph + '\n\n'
                else:
                    break

            if optimized:
                return optimized.strip()
            else:
                # Fallback to simple truncation
                return prompt[:max_length] + "..."

        return prompt

    def add_template(self, name: str, template: str):
        """Add custom prompt template"""
        self.templates[name] = template

    def get_template_names(self) -> List[str]:
        """Get list of available template names"""
        return list(self.templates.keys())


# Global instances
llm_client = LLMClient()
prompt_manager = PromptManager()
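

# --- Illustrative usage sketch, not part of the module's runtime behavior. ---
# Assumptions: get_settings() resolves, and the base_url points at an
# OpenAI-compatible or Ollama server (if it is unreachable, the call below still
# returns an in-character fallback string). The context strings are made up for
# the example; "Alex" is one of the characters in the fallback table above.
if __name__ == "__main__":
    async def _demo():
        # Build and trim a prompt from the built-in character_response template
        prompt = prompt_manager.build_prompt(
            'character_response',
            character_name="Alex",
            personality_context="Analytical and curious.",
            conversation_context="The chat is discussing caching strategies.",
            memory_context="",
            relationship_context=""
        )
        prompt = prompt_manager.optimize_prompt(prompt)

        # Always returns text: either the LLM's reply or a canned fallback
        reply = await llm_client.generate_response_with_fallback(prompt, character_name="Alex")
        print(reply)
        print(llm_client.get_statistics())

    asyncio.run(_demo())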