# DeepCritical/src/orchestrator.py
"""Orchestrator - the agent loop connecting Search and Judge."""
import asyncio
from collections.abc import AsyncGenerator
from typing import Any, Protocol
import structlog
from src.utils.config import settings
from src.utils.models import (
AgentEvent,
Evidence,
JudgeAssessment,
OrchestratorConfig,
SearchResult,
)
logger = structlog.get_logger()
class SearchHandlerProtocol(Protocol):
"""Protocol for search handler."""
async def execute(self, query: str, max_results_per_tool: int = 10) -> SearchResult: ...
class JudgeHandlerProtocol(Protocol):
"""Protocol for judge handler."""
async def assess(self, question: str, evidence: list[Evidence]) -> JudgeAssessment: ...
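
# Illustrative sketch of handlers satisfying the protocols above (hypothetical names;
# the real implementations live elsewhere in the codebase):
#
#     class MySearchHandler:
#         async def execute(self, query: str, max_results_per_tool: int = 10) -> SearchResult:
#             ...  # fan out to the search tools and collect Evidence into a SearchResult
#
#     class MyJudgeHandler:
#         async def assess(self, question: str, evidence: list[Evidence]) -> JudgeAssessment:
#             ...  # score the evidence and return a JudgeAssessment
#
# Because these are typing.Protocol classes, any object with matching method
# signatures is accepted; no inheritance is required (structural subtyping).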
class Orchestrator:
"""
The agent orchestrator - runs the Search -> Judge -> Loop cycle.
This is a generator-based design that yields events for real-time UI updates.
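
    Example (illustrative sketch; assumes concrete search_handler and
    judge_handler objects are constructed elsewhere, with illustrative config values):

        config = OrchestratorConfig(max_iterations=3, max_results_per_tool=10)
        orchestrator = Orchestrator(search_handler, judge_handler, config=config)

        async for event in orchestrator.run("Which approved drugs might help in condition X?"):
            print(f"[{event.type}] {event.message}")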
"""
def __init__(
self,
search_handler: SearchHandlerProtocol,
judge_handler: JudgeHandlerProtocol,
config: OrchestratorConfig | None = None,
enable_analysis: bool = False,
):
"""
        Initialize the orchestrator.

        Args:
            search_handler: Handler for executing searches
            judge_handler: Handler for assessing evidence
            config: Optional configuration (uses defaults if not provided)
            enable_analysis: Whether to perform statistical analysis (if Modal available)
"""
self.search = search_handler
self.judge = judge_handler
self.config = config or OrchestratorConfig()
self.history: list[dict[str, Any]] = []
self._enable_analysis = enable_analysis and settings.modal_available
# Lazy-load analysis (NO agent_framework dependency!)
self._analyzer: Any = None
def _get_analyzer(self) -> Any:
"""Lazy initialization of StatisticalAnalyzer.
Note: This imports from src.services, NOT src.agents,
so it works without the magentic optional dependency.
"""
if self._analyzer is None:
from src.services.statistical_analyzer import get_statistical_analyzer
self._analyzer = get_statistical_analyzer()
return self._analyzer
async def _run_analysis_phase(
self, query: str, evidence: list[Evidence], iteration: int
) -> AsyncGenerator[AgentEvent, None]:
"""Run the optional analysis phase."""
if not self._enable_analysis:
return
yield AgentEvent(
type="analyzing",
message="Running statistical analysis in Modal sandbox...",
data={},
iteration=iteration,
)
try:
analyzer = self._get_analyzer()
# Run Modal analysis (no agent_framework needed!)
analysis_result = await analyzer.analyze(
query=query,
evidence=evidence,
hypothesis=None, # Could add hypothesis generation later
)
yield AgentEvent(
type="analysis_complete",
message=f"Analysis verdict: {analysis_result.verdict}",
data=analysis_result.model_dump(),
iteration=iteration,
)
except Exception as e:
logger.error("Modal analysis failed", error=str(e))
yield AgentEvent(
type="error",
message=f"Modal analysis failed: {e}",
data={"error": str(e)},
iteration=iteration,
)
async def run(self, query: str) -> AsyncGenerator[AgentEvent, None]:
"""
        Run the agent loop for a query.

        Yields AgentEvent objects for each step, allowing real-time UI updates.

        Args:
            query: The user's research question

        Yields:
            AgentEvent objects for each step of the process
"""
logger.info("Starting orchestrator", query=query)
yield AgentEvent(
type="started",
message=f"Starting research for: {query}",
iteration=0,
)
all_evidence: list[Evidence] = []
current_queries = [query]
iteration = 0
while iteration < self.config.max_iterations:
iteration += 1
logger.info("Iteration", iteration=iteration, queries=current_queries)
# === SEARCH PHASE ===
yield AgentEvent(
type="searching",
message=f"Searching for: {', '.join(current_queries[:3])}...",
iteration=iteration,
)
try:
# Execute searches for all current queries
search_tasks = [
self.search.execute(q, self.config.max_results_per_tool)
for q in current_queries[:3] # Limit to 3 queries per iteration
]
search_results = await asyncio.gather(*search_tasks, return_exceptions=True)
# Collect evidence from successful searches
new_evidence: list[Evidence] = []
errors: list[str] = []
for q, result in zip(current_queries[:3], search_results, strict=False):
if isinstance(result, Exception):
errors.append(f"Search for '{q}' failed: {result!s}")
elif isinstance(result, SearchResult):
new_evidence.extend(result.evidence)
errors.extend(result.errors)
else:
# Should not happen with return_exceptions=True but safe fallback
errors.append(f"Unknown result type for '{q}': {type(result)}")
                # Deduplicate evidence by URL (across iterations and within this batch)
                seen_urls = {e.citation.url for e in all_evidence}
                unique_new: list[Evidence] = []
                for e in new_evidence:
                    if e.citation.url not in seen_urls:
                        seen_urls.add(e.citation.url)
                        unique_new.append(e)
                all_evidence.extend(unique_new)
yield AgentEvent(
type="search_complete",
message=f"Found {len(unique_new)} new sources ({len(all_evidence)} total)",
data={
"new_count": len(unique_new),
"total_count": len(all_evidence),
},
iteration=iteration,
)
if errors:
logger.warning("Search errors", errors=errors)
except Exception as e:
logger.error("Search phase failed", error=str(e))
yield AgentEvent(
type="error",
message=f"Search failed: {e!s}",
iteration=iteration,
)
continue
# === JUDGE PHASE ===
yield AgentEvent(
type="judging",
message=f"Evaluating {len(all_evidence)} sources...",
iteration=iteration,
)
try:
assessment = await self.judge.assess(query, all_evidence)
yield AgentEvent(
type="judge_complete",
message=(
f"Assessment: {assessment.recommendation} "
f"(confidence: {assessment.confidence:.0%})"
),
data={
"sufficient": assessment.sufficient,
"confidence": assessment.confidence,
"mechanism_score": assessment.details.mechanism_score,
"clinical_score": assessment.details.clinical_evidence_score,
},
iteration=iteration,
)
# Record this iteration in history
self.history.append(
{
"iteration": iteration,
"queries": current_queries,
"evidence_count": len(all_evidence),
"assessment": assessment.model_dump(),
}
)
# === DECISION PHASE ===
if assessment.sufficient and assessment.recommendation == "synthesize":
# Optional Analysis Phase
async for event in self._run_analysis_phase(query, all_evidence, iteration):
yield event
yield AgentEvent(
type="synthesizing",
message="Evidence sufficient! Preparing synthesis...",
iteration=iteration,
)
# Generate final response
final_response = self._generate_synthesis(query, all_evidence, assessment)
yield AgentEvent(
type="complete",
message=final_response,
data={
"evidence_count": len(all_evidence),
"iterations": iteration,
"drug_candidates": assessment.details.drug_candidates,
"key_findings": assessment.details.key_findings,
},
iteration=iteration,
)
return
else:
# Need more evidence - prepare next queries
current_queries = assessment.next_search_queries or [
f"{query} mechanism of action",
f"{query} clinical evidence",
]
yield AgentEvent(
type="looping",
message=(
f"Need more evidence. "
f"Next searches: {', '.join(current_queries[:2])}..."
),
data={"next_queries": current_queries},
iteration=iteration,
)
except Exception as e:
logger.error("Judge phase failed", error=str(e))
yield AgentEvent(
type="error",
message=f"Assessment failed: {e!s}",
iteration=iteration,
)
continue
# Max iterations reached
yield AgentEvent(
type="complete",
message=self._generate_partial_synthesis(query, all_evidence),
data={
"evidence_count": len(all_evidence),
"iterations": iteration,
"max_reached": True,
},
iteration=iteration,
)
def _generate_synthesis(
self,
query: str,
evidence: list[Evidence],
assessment: JudgeAssessment,
) -> str:
"""
        Generate the final synthesis response.

        Args:
            query: The original question
            evidence: All collected evidence
            assessment: The final assessment

        Returns:
            Formatted synthesis as markdown
"""
drug_list = (
"\n".join([f"- **{d}**" for d in assessment.details.drug_candidates])
or "- No specific candidates identified"
)
findings_list = (
"\n".join([f"- {f}" for f in assessment.details.key_findings]) or "- See evidence below"
)
citations = "\n".join(
[
f"{i + 1}. [{e.citation.title}]({e.citation.url}) "
f"({e.citation.source.upper()}, {e.citation.date})"
for i, e in enumerate(evidence[:10]) # Limit to 10 citations
]
)
return f"""## Drug Repurposing Analysis

### Question

{query}

### Drug Candidates

{drug_list}

### Key Findings

{findings_list}

### Assessment

- **Mechanism Score**: {assessment.details.mechanism_score}/10
- **Clinical Evidence Score**: {assessment.details.clinical_evidence_score}/10
- **Confidence**: {assessment.confidence:.0%}

### Reasoning

{assessment.reasoning}

### Citations ({len(evidence)} sources)

{citations}

---

*Analysis based on {len(evidence)} sources across {len(self.history)} iterations.*
"""
def _generate_partial_synthesis(
self,
query: str,
evidence: list[Evidence],
) -> str:
"""
        Generate a partial synthesis when the maximum number of iterations is reached.

        Args:
            query: The original question
            evidence: All collected evidence

        Returns:
            Formatted partial synthesis as markdown
"""
citations = "\n".join(
[
f"{i + 1}. [{e.citation.title}]({e.citation.url}) ({e.citation.source.upper()})"
for i, e in enumerate(evidence[:10])
]
)
return f"""## Partial Analysis (Max Iterations Reached)

### Question

{query}

### Status

Maximum search iterations reached. The evidence gathered may be incomplete.

### Evidence Collected

Found {len(evidence)} sources. Consider refining your query for more specific results.

### Citations

{citations}

---

*Consider searching with more specific terms or drug names.*
"""