Spaces:
Running
Running
| """Magentic-based orchestrator for DeepCritical. | |
| NOTE: Magentic mode currently requires OpenAI API keys. The MagenticBuilder's | |
| standard manager uses OpenAIChatClient. Anthropic support may be added when | |
| the agent_framework provides an AnthropicChatClient. | |
| """ | |
| from collections.abc import AsyncGenerator | |
| from typing import TYPE_CHECKING, Any | |
| import structlog | |
| if TYPE_CHECKING: | |
| from src.services.embeddings import EmbeddingService | |
| from agent_framework import ( | |
| MagenticAgentDeltaEvent, | |
| MagenticAgentMessageEvent, | |
| MagenticBuilder, | |
| MagenticFinalResultEvent, | |
| MagenticOrchestratorMessageEvent, | |
| WorkflowOutputEvent, | |
| ) | |
| from agent_framework.openai import OpenAIChatClient | |
| from src.agents.hypothesis_agent import HypothesisAgent | |
| from src.agents.judge_agent import JudgeAgent | |
| from src.agents.report_agent import ReportAgent | |
| from src.agents.search_agent import SearchAgent | |
| from src.orchestrator import JudgeHandlerProtocol, SearchHandlerProtocol | |
| from src.utils.config import settings | |
| from src.utils.exceptions import ConfigurationError | |
| from src.utils.models import AgentEvent, Evidence | |
| logger = structlog.get_logger() | |
| def _truncate(text: str, max_len: int = 100) -> str: | |
| """Truncate text with ellipsis only if needed.""" | |
| return f"{text[:max_len]}..." if len(text) > max_len else text | |
| class MagenticOrchestrator: | |
| """ | |
| Magentic-based orchestrator - same API as Orchestrator. | |
| Uses Microsoft Agent Framework's MagenticBuilder for multi-agent coordination. | |
| Note: | |
| Magentic mode requires OPENAI_API_KEY. The MagenticBuilder's standard | |
| manager currently only supports OpenAI. If you have only an Anthropic | |
| key, use the "simple" orchestrator mode instead. | |
| """ | |
| def __init__( | |
| self, | |
| search_handler: SearchHandlerProtocol, | |
| judge_handler: JudgeHandlerProtocol, | |
| max_rounds: int = 10, | |
| ) -> None: | |
| self._search_handler = search_handler | |
| self._judge_handler = judge_handler | |
| self._max_rounds = max_rounds | |
| self._evidence_store: dict[str, list[Evidence]] = {"current": []} | |
| def _init_embedding_service(self) -> "EmbeddingService | None": | |
| """Initialize embedding service if available.""" | |
| try: | |
| from src.services.embeddings import get_embedding_service | |
| service = get_embedding_service() | |
| logger.info("Embedding service enabled") | |
| return service | |
| except ImportError: | |
| logger.info("Embedding service not available (dependencies missing)") | |
| except Exception as e: | |
| logger.warning("Failed to initialize embedding service", error=str(e)) | |
| return None | |
| def _build_workflow( | |
| self, | |
| search_agent: SearchAgent, | |
| hypothesis_agent: HypothesisAgent, | |
| judge_agent: JudgeAgent, | |
| report_agent: ReportAgent, | |
| ) -> Any: | |
| """Build the Magentic workflow with participants.""" | |
| if not settings.openai_api_key: | |
| raise ConfigurationError( | |
| "Magentic mode requires OPENAI_API_KEY. " | |
| "Set the key or use mode='simple' with Anthropic." | |
| ) | |
| return ( | |
| MagenticBuilder() | |
| .participants( | |
| searcher=search_agent, | |
| hypothesizer=hypothesis_agent, | |
| judge=judge_agent, | |
| reporter=report_agent, | |
| ) | |
| .with_standard_manager( | |
| chat_client=OpenAIChatClient( | |
| model_id=settings.openai_model, api_key=settings.openai_api_key | |
| ), | |
| max_round_count=self._max_rounds, | |
| max_stall_count=3, | |
| max_reset_count=2, | |
| ) | |
| .build() | |
| ) | |
| def _format_task(self, query: str, has_embeddings: bool) -> str: | |
| """Format the task instruction for the manager.""" | |
| semantic_note = "" | |
| if has_embeddings: | |
| semantic_note = """ | |
| The system has semantic search enabled. When evidence is found: | |
| 1. Related concepts will be automatically surfaced | |
| 2. Duplicates are removed by meaning, not just URL | |
| 3. Use the surfaced related concepts to refine searches | |
| """ | |
| return f"""Research drug repurposing opportunities for: {query} | |
| {semantic_note} | |
| Workflow: | |
| 1. SearcherAgent: Find initial evidence from PubMed and web. SEND ONLY A SIMPLE KEYWORD QUERY. | |
| 2. HypothesisAgent: Generate mechanistic hypotheses (Drug -> Target -> Pathway -> Effect). | |
| 3. SearcherAgent: Use hypothesis-suggested queries for targeted search. | |
| 4. JudgeAgent: Evaluate if evidence supports hypotheses. | |
| 5. If sufficient -> ReportAgent: Generate structured research report. | |
| 6. If not sufficient -> Repeat from step 1 with refined queries. | |
| Focus on: | |
| - Identifying specific molecular targets | |
| - Understanding mechanism of action | |
| - Finding supporting/contradicting evidence for hypotheses | |
| The final output should be a complete research report with: | |
| - Executive summary | |
| - Methodology | |
| - Hypotheses tested | |
| - Mechanistic and clinical findings | |
| - Drug candidates | |
| - Limitations | |
| - Conclusion with references | |
| """ | |
| async def run(self, query: str) -> AsyncGenerator[AgentEvent, None]: | |
| """ | |
| Run the Magentic workflow - same API as simple Orchestrator. | |
| Yields AgentEvent objects for real-time UI updates. | |
| """ | |
| logger.info("Starting Magentic orchestrator", query=query) | |
| yield AgentEvent( | |
| type="started", | |
| message=f"Starting research (Magentic mode): {query}", | |
| iteration=0, | |
| ) | |
| # Initialize services and agents | |
| embedding_service = self._init_embedding_service() | |
| search_agent = SearchAgent( | |
| self._search_handler, self._evidence_store, embedding_service=embedding_service | |
| ) | |
| judge_agent = JudgeAgent(self._judge_handler, self._evidence_store) | |
| hypothesis_agent = HypothesisAgent( | |
| self._evidence_store, embedding_service=embedding_service | |
| ) | |
| report_agent = ReportAgent(self._evidence_store, embedding_service=embedding_service) | |
| # Build workflow and task | |
| workflow = self._build_workflow(search_agent, hypothesis_agent, judge_agent, report_agent) | |
| task = self._format_task(query, embedding_service is not None) | |
| iteration = 0 | |
| try: | |
| async for event in workflow.run_stream(task): | |
| agent_event = self._process_event(event, iteration) | |
| if agent_event: | |
| if isinstance(event, MagenticAgentMessageEvent): | |
| iteration += 1 | |
| yield agent_event | |
| except Exception as e: | |
| logger.error("Magentic workflow failed", error=str(e)) | |
| yield AgentEvent( | |
| type="error", | |
| message=f"Workflow error: {e!s}", | |
| iteration=iteration, | |
| ) | |
| def _process_event(self, event: Any, iteration: int) -> AgentEvent | None: | |
| """Process a workflow event and return an AgentEvent if applicable.""" | |
| if isinstance(event, MagenticOrchestratorMessageEvent): | |
| message_text = ( | |
| event.message.text if event.message and hasattr(event.message, "text") else "" | |
| ) | |
| kind = getattr(event, "kind", "manager") | |
| if message_text: | |
| return AgentEvent( | |
| type="judging", | |
| message=f"Manager ({kind}): {_truncate(message_text)}", | |
| iteration=iteration, | |
| ) | |
| elif isinstance(event, MagenticAgentMessageEvent): | |
| agent_name = event.agent_id or "unknown" | |
| msg_text = ( | |
| event.message.text if event.message and hasattr(event.message, "text") else "" | |
| ) | |
| return self._agent_message_event(agent_name, msg_text, iteration + 1) | |
| elif isinstance(event, MagenticFinalResultEvent): | |
| final_text = ( | |
| event.message.text | |
| if event.message and hasattr(event.message, "text") | |
| else "No result" | |
| ) | |
| return AgentEvent( | |
| type="complete", | |
| message=final_text, | |
| data={"iterations": iteration}, | |
| iteration=iteration, | |
| ) | |
| elif isinstance(event, MagenticAgentDeltaEvent): | |
| if event.text: | |
| return AgentEvent( | |
| type="streaming", | |
| message=event.text, | |
| data={"agent_id": event.agent_id}, | |
| iteration=iteration, | |
| ) | |
| elif isinstance(event, WorkflowOutputEvent): | |
| if event.data: | |
| return AgentEvent( | |
| type="complete", | |
| message=str(event.data), | |
| iteration=iteration, | |
| ) | |
| return None | |
| def _agent_message_event(self, agent_name: str, msg_text: str, iteration: int) -> AgentEvent: | |
| """Create an AgentEvent for an agent message.""" | |
| if "search" in agent_name.lower(): | |
| return AgentEvent( | |
| type="search_complete", | |
| message=f"Search agent: {_truncate(msg_text)}", | |
| iteration=iteration, | |
| ) | |
| elif "hypothes" in agent_name.lower(): | |
| return AgentEvent( | |
| type="hypothesizing", | |
| message=f"Hypothesis agent: {_truncate(msg_text)}", | |
| iteration=iteration, | |
| ) | |
| elif "judge" in agent_name.lower(): | |
| return AgentEvent( | |
| type="judge_complete", | |
| message=f"Judge agent: {_truncate(msg_text)}", | |
| iteration=iteration, | |
| ) | |
| elif "report" in agent_name.lower(): | |
| return AgentEvent( | |
| type="synthesizing", | |
| message=f"Report agent: {_truncate(msg_text)}" if msg_text else "Report generated.", | |
| iteration=iteration, | |
| ) | |
| return AgentEvent( | |
| type="judging", | |
| message=f"{agent_name}: {_truncate(msg_text)}", | |
| iteration=iteration, | |
| ) | |