#!/usr/bin/env python3
"""
Test the creative collaboration system without Discord
"""

import asyncio
import os
import sys
import traceback
from pathlib import Path

# Add src to path
sys.path.insert(0, str(Path(__file__).parent / "src"))

# Set environment variables for testing
os.environ['DATABASE_URL'] = 'sqlite+aiosqlite:///fishbowl_test.db'
os.environ['ENVIRONMENT'] = 'development'

# NOTE(review): several emoji prefixes in the messages below look mis-encoded
# (mojibake). They are reproduced unchanged here -- confirm the intended
# characters against the original file before repairing them.


async def test_creative_system() -> bool:
    """Run an end-to-end smoke test of the creative collaboration system.

    Initializes the database, the vector store, memory sharing and the
    creative manager, then proposes a project as "Alex", contributes to the
    first active project, and requests project suggestions for "Luna".

    Returns:
        True when no exercised step reported a failure. False when project
        creation or the contribution reported failure, or when any step
        raised an exception (the traceback is printed).
    """
    print("šŸŽØ Testing Creative Collaboration System")
    print("=" * 60)

    # Flipped to False by any step that reports failure without raising, so
    # the function no longer claims success after printing a failure message.
    all_passed = True

    try:
        # Import required modules. Done inside the try so that an import
        # failure is reported through the handler below instead of crashing.
        from collaboration.creative_projects import CollaborativeCreativeManager
        from rag.vector_store import VectorStoreManager
        from rag.memory_sharing import MemorySharingManager
        from database.connection import init_database, create_tables

        print("āœ… Imports successful")

        # Initialize database
        print("\nšŸ—„ļø Initializing database...")
        await init_database()
        await create_tables()
        print("āœ… Database initialized")

        # Initialize vector store
        print("\n🧠 Initializing vector store...")
        vector_store = VectorStoreManager("./data/vector_stores")
        character_names = ["Alex", "Sage", "Luna", "Echo"]
        await vector_store.initialize(character_names)
        print("āœ… Vector store initialized")

        # Initialize memory sharing
        print("\nšŸ¤ Initializing memory sharing...")
        memory_sharing = MemorySharingManager(vector_store)
        await memory_sharing.initialize(character_names)
        print("āœ… Memory sharing initialized")

        # Initialize creative collaboration
        print("\nšŸŽ­ Initializing creative collaboration...")
        creative_manager = CollaborativeCreativeManager(vector_store, memory_sharing)
        await creative_manager.initialize(character_names)
        print("āœ… Creative collaboration initialized")

        # Test project creation
        print("\nšŸ“ Testing project creation...")
        project_data = {
            "title": "Test Creative Project",
            "description": "A test project to verify the creative collaboration system works",
            "project_type": "story",
            "target_collaborators": ["Sage", "Luna"],
            "goals": ["Test system functionality", "Verify data persistence"],
            "estimated_duration": "test",
        }

        success, message = await creative_manager.propose_project("Alex", project_data)

        if success:
            print(f"āœ… Project created: {message}")

            # Get active projects
            active_projects = await creative_manager.get_active_projects("Alex")
            print(f"āœ… Found {len(active_projects)} active projects")

            if active_projects:
                project = active_projects[0]
                print(f" Project: {project.title}")
                print(f" Status: {project.status.value}")
                print(f" Collaborators: {len(project.collaborators)}")

                # Test contribution
                print("\nāœļø Testing project contribution...")
                contribution_data = {
                    "content": "This is a test contribution to verify the system works properly.",
                    "contribution_type": "content",
                    "metadata": {"test": True},
                }

                contrib_success, contrib_message = await creative_manager.contribute_to_project(
                    "Alex", project.id, contribution_data
                )

                if contrib_success:
                    print(f"āœ… Contribution added: {contrib_message}")
                else:
                    print(f"āŒ Contribution failed: {contrib_message}")
                    all_passed = False
        else:
            print(f"āŒ Project creation failed: {message}")
            all_passed = False

        # Test project suggestions
        print("\nšŸ’” Testing project suggestions...")
        suggestions = await creative_manager.get_project_suggestions("Luna")
        print(f"āœ… Generated {len(suggestions)} project suggestions for Luna")

        # Only the first two suggestions are listed to keep the output short.
        for i, suggestion in enumerate(suggestions[:2], 1):
            print(f" {i}. {suggestion['title']} ({suggestion['project_type']})")

        if all_passed:
            print("\nšŸŽ‰ Creative collaboration system test completed successfully!")

        return all_passed

    except Exception as e:
        # Top-level boundary of the script: report any failure and signal it
        # through the return value rather than propagating.
        print(f"āŒ Test failed: {e}")
        traceback.print_exc()
        return False


async def main() -> bool:
    """Run the smoke test and print a summary of the outcome.

    Returns:
        The boolean result of ``test_creative_system()``.
    """
    success = await test_creative_system()

    if success:
        print("\nšŸš€ Creative collaboration system is ready!")
        print("\nThe system includes:")
        print("• Trust-based memory sharing between characters")
        print("• Collaborative creative project management")
        print("• MCP tools for autonomous character operation")
        print("• Database persistence for all project data")
        print("• Project analytics and health monitoring")
    else:
        print("\nšŸ’„ Creative collaboration system needs attention")

    return success


if __name__ == "__main__":
    result = asyncio.run(main())
    # Exit status 0 on success, 1 on failure, so callers can detect the result.
    sys.exit(0 if result else 1)