"""
Admin audit service for tracking administrative actions and security events.

Every public coroutine on :class:`AuditService` is best-effort: database
failures are reported through ``log_error_with_context`` and are not re-raised,
so auditing never breaks the calling request path. Writers return ``None`` (or
``bool`` where a status is documented); readers fall back to empty/zero results.
"""

import asyncio
import logging
from datetime import datetime, timedelta, timezone
from ipaddress import AddressValueError, ip_address
from typing import Any, Dict, List, Optional

from sqlalchemy import and_, desc, func, select

from database.connection import get_db_session
from database.models import (
    AdminAuditLog,
    AdminSession,
    FileOperationLog,
    PerformanceMetric,
    SecurityEvent,
)
from utils.logging import log_error_with_context

logger = logging.getLogger(__name__)


class AuditService:
    """Service for tracking admin actions and security events."""

    # Severities that additionally emit a WARNING-level application log line.
    _ALERT_SEVERITIES = ("error", "critical")

    @staticmethod
    def _iso(value: Optional[datetime]) -> Optional[str]:
        """Return ``value.isoformat()``, or ``None`` when the value is missing.

        Guards the read methods against rows whose timestamp is NULL; without
        it a single such row would raise and blank out the whole result list.
        """
        return value.isoformat() if value is not None else None

    @classmethod
    async def log_admin_action(
        cls,
        admin_user: str,
        action_type: str,
        resource_affected: Optional[str] = None,
        changes_made: Optional[Dict[str, Any]] = None,
        request_ip: Optional[str] = None,
        user_agent: Optional[str] = None,
        session_id: Optional[str] = None,
        success: bool = True,
        error_message: Optional[str] = None,
    ) -> None:
        """Persist one ``AdminAuditLog`` row describing an administrative action.

        Args:
            admin_user: Identifier of the admin who performed the action.
            action_type: Name of the action performed.
            resource_affected: Resource the action targeted, if any.
            changes_made: Details of the change; stored as ``{}`` when omitted.
            request_ip: Client IP; passed through ``_validate_ip`` before storing.
            user_agent: Client user-agent string.
            session_id: Admin session the action belongs to.
            success: Whether the action succeeded.
            error_message: Failure description when ``success`` is false.

        Errors are logged via ``log_error_with_context`` and swallowed.
        """
        try:
            async with get_db_session() as session:
                audit_log = AdminAuditLog(
                    admin_user=admin_user,
                    action_type=action_type,
                    resource_affected=resource_affected,
                    changes_made=changes_made or {},
                    request_ip=cls._validate_ip(request_ip),
                    user_agent=user_agent,
                    session_id=session_id,
                    success=success,
                    error_message=error_message,
                    timestamp=datetime.now(timezone.utc),
                )
                session.add(audit_log)
                await session.commit()

            logger.info(
                "Admin action logged: %s performed %s on %s",
                admin_user,
                action_type,
                resource_affected,
            )
        except Exception as e:
            log_error_with_context(e, {
                "admin_user": admin_user,
                "action_type": action_type,
                "component": "audit_service_admin_action",
            })

    @classmethod
    async def log_security_event(
        cls,
        event_type: str,
        severity: str = "info",
        source_ip: Optional[str] = None,
        user_identifier: Optional[str] = None,
        event_data: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Persist one ``SecurityEvent`` row.

        Args:
            event_type: Name of the event.
            severity: Severity label; ``"error"`` and ``"critical"`` also emit
                a WARNING log line after the row is committed.
            source_ip: Originating IP; passed through ``_validate_ip``.
            user_identifier: User the event relates to, if known.
            event_data: Extra payload; stored as ``{}`` when omitted.

        Errors are logged via ``log_error_with_context`` and swallowed.
        """
        try:
            async with get_db_session() as session:
                security_event = SecurityEvent(
                    event_type=event_type,
                    severity=severity,
                    source_ip=cls._validate_ip(source_ip),
                    user_identifier=user_identifier,
                    event_data=event_data or {},
                    timestamp=datetime.now(timezone.utc),
                )
                session.add(security_event)
                await session.commit()

            logger.info(
                "Security event logged: %s (severity: %s)", event_type, severity
            )

            # Alert on high severity events
            if severity in cls._ALERT_SEVERITIES:
                logger.warning(
                    "HIGH SEVERITY SECURITY EVENT: %s from %s",
                    event_type,
                    source_ip,
                )
        except Exception as e:
            log_error_with_context(e, {
                "event_type": event_type,
                "severity": severity,
                "component": "audit_service_security_event",
            })

    @classmethod
    async def log_performance_metric(
        cls,
        metric_name: str,
        metric_value: float,
        metric_unit: Optional[str] = None,
        character_id: Optional[int] = None,
        component: Optional[str] = None,
        additional_data: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Persist one ``PerformanceMetric`` row.

        Args:
            metric_name: Name of the metric.
            metric_value: Measured value.
            metric_unit: Unit label appended to the value in the debug log.
            character_id: Character the metric relates to, if any.
            component: Component that produced the metric.
            additional_data: Extra payload; stored as ``{}`` when omitted.

        Errors are logged via ``log_error_with_context`` and swallowed.
        """
        try:
            async with get_db_session() as session:
                performance_metric = PerformanceMetric(
                    metric_name=metric_name,
                    metric_value=metric_value,
                    metric_unit=metric_unit,
                    character_id=character_id,
                    component=component,
                    additional_data=additional_data or {},
                    timestamp=datetime.now(timezone.utc),
                )
                session.add(performance_metric)
                await session.commit()

            logger.debug(
                "Performance metric logged: %s=%s%s",
                metric_name,
                metric_value,
                metric_unit or "",
            )
        except Exception as e:
            log_error_with_context(e, {
                "metric_name": metric_name,
                "metric_value": metric_value,
                "component": "audit_service_performance_metric",
            })

    @classmethod
    async def log_file_operation(
        cls,
        character_id: int,
        operation_type: str,
        file_path: str,
        file_size: Optional[int] = None,
        success: bool = True,
        error_message: Optional[str] = None,
        mcp_server: Optional[str] = None,
        request_context: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Persist one ``FileOperationLog`` row.

        Args:
            character_id: Character that performed the operation.
            operation_type: Name of the operation.
            file_path: Path the operation targeted.
            file_size: Size of the file, if known.
            success: Whether the operation succeeded.
            error_message: Failure description when ``success`` is false.
            mcp_server: Name of the MCP server involved, if any.
            request_context: Extra payload; stored as ``{}`` when omitted.

        Errors are logged via ``log_error_with_context`` and swallowed.
        """
        try:
            async with get_db_session() as session:
                file_operation = FileOperationLog(
                    character_id=character_id,
                    operation_type=operation_type,
                    file_path=file_path,
                    file_size=file_size,
                    success=success,
                    error_message=error_message,
                    mcp_server=mcp_server,
                    request_context=request_context or {},
                    timestamp=datetime.now(timezone.utc),
                )
                session.add(file_operation)
                await session.commit()

            logger.debug(
                "File operation logged: %s on %s (success: %s)",
                operation_type,
                file_path,
                success,
            )
        except Exception as e:
            log_error_with_context(e, {
                "character_id": character_id,
                "operation_type": operation_type,
                "file_path": file_path,
                "component": "audit_service_file_operation",
            })

    @classmethod
    async def create_admin_session(
        cls,
        session_id: str,
        admin_user: str,
        expires_at: datetime,
        source_ip: Optional[str] = None,
        user_agent: Optional[str] = None,
    ) -> bool:
        """Create an ``AdminSession`` row and record a matching security event.

        Args:
            session_id: Identifier of the new session.
            admin_user: Admin the session belongs to.
            expires_at: Expiry instant stored on the session.
            source_ip: Client IP; passed through ``_validate_ip``.
            user_agent: Client user-agent string.

        Returns:
            ``True`` when the session row was committed, ``False`` when an
            exception was raised (it is logged, not propagated).
        """
        try:
            async with get_db_session() as session:
                # One timestamp so created_at and last_activity are identical
                # on a brand-new session.
                now = datetime.now(timezone.utc)
                admin_session = AdminSession(
                    session_id=session_id,
                    admin_user=admin_user,
                    expires_at=expires_at,
                    source_ip=cls._validate_ip(source_ip),
                    user_agent=user_agent,
                    created_at=now,
                    last_activity=now,
                )
                session.add(admin_session)
                await session.commit()

                # Log the session creation
                await cls.log_security_event(
                    event_type="admin_session_created",
                    severity="info",
                    source_ip=source_ip,
                    user_identifier=admin_user,
                    event_data={"session_id": session_id},
                )

                return True
        except Exception as e:
            log_error_with_context(e, {
                "session_id": session_id,
                "admin_user": admin_user,
                "component": "audit_service_create_session",
            })
            return False

    @classmethod
    async def update_session_activity(cls, session_id: str) -> bool:
        """Set ``last_activity`` to now on an active session.

        Returns:
            ``True`` when the session exists, is active, and was updated;
            ``False`` when it is missing, inactive, or an error occurred.
        """
        # NOTE(review): expires_at is not consulted here, so a session that has
        # expired but has not yet been swept by cleanup_old_sessions() still
        # reports True. Confirm callers do not treat this as a validity check.
        try:
            async with get_db_session() as session:
                admin_session = await session.get(AdminSession, session_id)
                if admin_session and admin_session.is_active:
                    admin_session.last_activity = datetime.now(timezone.utc)
                    await session.commit()
                    return True
                return False
        except Exception as e:
            log_error_with_context(e, {
                "session_id": session_id,
                "component": "audit_service_update_session",
            })
            return False

    @classmethod
    async def invalidate_session(
        cls, session_id: str, reason: str = "logout"
    ) -> None:
        """Mark an admin session inactive and record a security event.

        Does nothing when no session with ``session_id`` exists.

        Args:
            session_id: Identifier of the session to invalidate.
            reason: Free-form reason stored in the security event payload.

        Errors are logged via ``log_error_with_context`` and swallowed.
        """
        try:
            async with get_db_session() as session:
                admin_session = await session.get(AdminSession, session_id)
                if admin_session:
                    # Read the owner BEFORE committing: commit() may expire the
                    # instance's attributes, and touching an expired attribute
                    # on an async session triggers implicit I/O that fails.
                    admin_user = admin_session.admin_user

                    admin_session.is_active = False
                    await session.commit()

                    # Log the session invalidation
                    await cls.log_security_event(
                        event_type="admin_session_invalidated",
                        severity="info",
                        user_identifier=admin_user,
                        event_data={"session_id": session_id, "reason": reason},
                    )
        except Exception as e:
            log_error_with_context(e, {
                "session_id": session_id,
                "component": "audit_service_invalidate_session",
            })

    @classmethod
    async def get_recent_admin_actions(
        cls, limit: int = 50, admin_user: Optional[str] = None
    ) -> List[Dict[str, Any]]:
        """Return the most recent admin actions, newest first.

        Args:
            limit: Maximum number of rows to return.
            admin_user: When given, only actions by this admin are returned.

        Returns:
            A list of plain dicts (timestamps as ISO-8601 strings), or ``[]``
            when the query fails.
        """
        try:
            async with get_db_session() as session:
                query = select(AdminAuditLog)
                if admin_user:
                    query = query.where(AdminAuditLog.admin_user == admin_user)
                query = query.order_by(desc(AdminAuditLog.timestamp)).limit(limit)

                results = await session.scalars(query)

                return [
                    {
                        "id": action.id,
                        "admin_user": action.admin_user,
                        "action_type": action.action_type,
                        "resource_affected": action.resource_affected,
                        "changes_made": action.changes_made,
                        "timestamp": cls._iso(action.timestamp),
                        "success": action.success,
                        "error_message": action.error_message,
                    }
                    for action in results
                ]
        except Exception as e:
            log_error_with_context(
                e, {"component": "audit_service_get_recent_actions"}
            )
            return []

    @classmethod
    async def get_security_events(
        cls,
        limit: int = 50,
        severity: Optional[str] = None,
        resolved: Optional[bool] = None,
    ) -> List[Dict[str, Any]]:
        """Return security events, newest first.

        Args:
            limit: Maximum number of rows to return.
            severity: When given, only events with this severity are returned.
            resolved: When not ``None``, filter on the ``resolved`` flag.

        Returns:
            A list of plain dicts (timestamps as ISO-8601 strings), or ``[]``
            when the query fails.
        """
        try:
            async with get_db_session() as session:
                query = select(SecurityEvent)
                if severity:
                    query = query.where(SecurityEvent.severity == severity)
                # Explicit None check: resolved=False is a meaningful filter.
                if resolved is not None:
                    query = query.where(SecurityEvent.resolved == resolved)
                query = query.order_by(desc(SecurityEvent.timestamp)).limit(limit)

                results = await session.scalars(query)

                return [
                    {
                        "id": event.id,
                        "event_type": event.event_type,
                        "severity": event.severity,
                        "source_ip": event.source_ip,
                        "user_identifier": event.user_identifier,
                        "event_data": event.event_data,
                        "timestamp": cls._iso(event.timestamp),
                        "resolved": event.resolved,
                        "resolution_notes": event.resolution_notes,
                    }
                    for event in results
                ]
        except Exception as e:
            log_error_with_context(
                e, {"component": "audit_service_get_security_events"}
            )
            return []

    @classmethod
    async def get_performance_metrics(
        cls,
        metric_name: Optional[str] = None,
        component: Optional[str] = None,
        limit: int = 100,
    ) -> List[Dict[str, Any]]:
        """Return performance metrics, newest first.

        Args:
            metric_name: When given, only metrics with this name are returned.
            component: When given, only metrics from this component are returned.
            limit: Maximum number of rows to return.

        Returns:
            A list of plain dicts (timestamps as ISO-8601 strings), or ``[]``
            when the query fails.
        """
        try:
            async with get_db_session() as session:
                query = select(PerformanceMetric)
                if metric_name:
                    query = query.where(PerformanceMetric.metric_name == metric_name)
                if component:
                    query = query.where(PerformanceMetric.component == component)
                query = query.order_by(desc(PerformanceMetric.timestamp)).limit(limit)

                results = await session.scalars(query)

                return [
                    {
                        "id": metric.id,
                        "metric_name": metric.metric_name,
                        "metric_value": metric.metric_value,
                        "metric_unit": metric.metric_unit,
                        "character_id": metric.character_id,
                        "component": metric.component,
                        "timestamp": cls._iso(metric.timestamp),
                        "additional_data": metric.additional_data,
                    }
                    for metric in results
                ]
        except Exception as e:
            log_error_with_context(
                e, {"component": "audit_service_get_performance_metrics"}
            )
            return []

    @classmethod
    async def cleanup_old_sessions(cls) -> int:
        """Deactivate every active session whose ``expires_at`` is in the past.

        Returns:
            The number of sessions deactivated, or ``0`` when the operation
            fails (the error is logged, not propagated).
        """
        try:
            async with get_db_session() as session:
                now = datetime.now(timezone.utc)

                # Get expired sessions
                expired_query = select(AdminSession).where(
                    and_(
                        AdminSession.expires_at < now,
                        AdminSession.is_active == True,  # noqa: E712 - SQL expression
                    )
                )
                expired_sessions = await session.scalars(expired_query)

                count = 0
                for expired_session in expired_sessions:
                    expired_session.is_active = False
                    count += 1

                await session.commit()

                if count > 0:
                    logger.info("Cleaned up %s expired admin sessions", count)

                return count
        except Exception as e:
            log_error_with_context(
                e, {"component": "audit_service_cleanup_sessions"}
            )
            return 0

    @classmethod
    def _validate_ip(cls, ip_str: Optional[str]) -> Optional[str]:
        """Validate and normalize an IP address string.

        Returns:
            ``None`` for empty/``None`` input; the canonical string form for a
            valid IPv4 or IPv6 address; otherwise the input unchanged (after
            logging a warning) so the raw value is still recorded.
        """
        if not ip_str:
            return None

        try:
            # This will validate both IPv4 and IPv6
            return str(ip_address(ip_str))
        except (AddressValueError, ValueError):
            logger.warning("Invalid IP address provided: %s", ip_str)
            # Return as-is for logging purposes.
            # NOTE(review): the unvalidated string is what gets stored by the
            # callers; confirm the target columns accept arbitrary text.
            return ip_str

    @classmethod
    async def get_audit_summary(cls) -> Dict[str, Any]:
        """Return headline audit counters.

        Returns:
            A dict with the keys ``admin_actions_24h``,
            ``unresolved_security_events``, ``critical_security_events_24h``
            and ``active_admin_sessions``. All values are ``0`` when the
            queries fail (the error is logged, not propagated).
        """
        summary: Dict[str, Any] = {
            "admin_actions_24h": 0,
            "unresolved_security_events": 0,
            "critical_security_events_24h": 0,
            "active_admin_sessions": 0,
        }
        try:
            async with get_db_session() as session:
                # Start of the trailing 24-hour window.
                yesterday = datetime.now(timezone.utc) - timedelta(days=1)

                # Count admin actions in last 24 hours
                admin_actions_count = await session.scalar(
                    select(func.count(AdminAuditLog.id)).where(
                        AdminAuditLog.timestamp >= yesterday
                    )
                )

                # Count unresolved security events
                unresolved_security_count = await session.scalar(
                    select(func.count(SecurityEvent.id)).where(
                        SecurityEvent.resolved == False  # noqa: E712 - SQL expression
                    )
                )

                # Count critical security events in last 24 hours
                critical_security_count = await session.scalar(
                    select(func.count(SecurityEvent.id)).where(
                        and_(
                            SecurityEvent.timestamp >= yesterday,
                            SecurityEvent.severity == 'critical',
                        )
                    )
                )

                # Count active sessions
                # NOTE(review): other methods look sessions up with
                # session.get(AdminSession, session_id), which suggests
                # session_id is the primary key; confirm AdminSession also
                # defines an `id` column, otherwise this summary always
                # falls back to zeros.
                active_sessions_count = await session.scalar(
                    select(func.count(AdminSession.id)).where(
                        AdminSession.is_active == True  # noqa: E712 - SQL expression
                    )
                )

                summary["admin_actions_24h"] = admin_actions_count or 0
                summary["unresolved_security_events"] = unresolved_security_count or 0
                summary["critical_security_events_24h"] = critical_security_count or 0
                summary["active_admin_sessions"] = active_sessions_count or 0
                return summary
        except Exception as e:
            log_error_with_context(e, {"component": "audit_service_get_summary"})
            return {
                "admin_actions_24h": 0,
                "unresolved_security_events": 0,
                "critical_security_events_24h": 0,
                "active_admin_sessions": 0,
            }