# Phase 4 Implementation Spec: Orchestrator & UI

**Goal**: Connect the Brain and the Body, then give it a Face.

**Philosophy**: "Streaming is Trust."

**Prerequisite**: Phase 3 complete (all judge tests passing)

---

## 1. The Slice Definition

This slice connects:

1. **Orchestrator**: The state machine (while loop) calling Search -> Judge.
2. **UI**: Gradio interface that visualizes the loop.

**Files to Create/Modify**:

- `src/orchestrator.py` - Agent loop logic
- `src/app.py` - Gradio UI
- `tests/unit/test_orchestrator.py` - Unit tests
- `Dockerfile` - Container for deployment
- `README.md` - Usage instructions (update)

---

## 2. Agent Events (`src/utils/models.py`)

Add event types for streaming UI updates:

```python
"""Add to src/utils/models.py (after JudgeAssessment models)."""

from pydantic import BaseModel, Field
from typing import Literal, Any
from datetime import datetime


class AgentEvent(BaseModel):
    """Event emitted by the orchestrator for UI streaming."""

    type: Literal[
        "started",
        "searching",
        "search_complete",
        "judging",
        "judge_complete",
        "looping",
        "synthesizing",
        "complete",
        "error",
    ]
    message: str
    data: Any = None
    timestamp: datetime = Field(default_factory=datetime.now)
    iteration: int = 0

    def to_markdown(self) -> str:
        """Format event as markdown for chat display."""
        icons = {
            "started": "🚀",
            "searching": "🔍",
            "search_complete": "📚",
            "judging": "🧠",
            "judge_complete": "✅",
            "looping": "🔄",
            "synthesizing": "📝",
            "complete": "🎉",
            "error": "❌",
        }
        icon = icons.get(self.type, "•")
        return f"{icon} **{self.type.upper()}**: {self.message}"


class OrchestratorConfig(BaseModel):
    """Configuration for the orchestrator."""

    max_iterations: int = Field(default=5, ge=1, le=10)
    max_results_per_tool: int = Field(default=10, ge=1, le=50)
    search_timeout: float = Field(default=30.0, ge=5.0, le=120.0)
```
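A quick sanity check of these models (a minimal sketch that only uses the fields and defaults defined above):

```python
# Minimal sketch: exercising AgentEvent and OrchestratorConfig from the models above.
from src.utils.models import AgentEvent, OrchestratorConfig

event = AgentEvent(
    type="searching",
    message="Searching for: metformin alzheimer",
    iteration=1,
)
print(event.to_markdown())  # -> "🔍 **SEARCHING**: Searching for: metformin alzheimer"

config = OrchestratorConfig(max_iterations=3)
print(config.max_results_per_tool)  # -> 10 (default)
```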
---

## 3. The Orchestrator (`src/orchestrator.py`)

This is the "Agent" logic — the while loop that drives search and judgment.

```python
"""Orchestrator - the agent loop connecting Search and Judge."""

import asyncio
from typing import AsyncGenerator, List, Protocol

import structlog

from src.utils.models import (
    Evidence,
    SearchResult,
    JudgeAssessment,
    AgentEvent,
    OrchestratorConfig,
)

logger = structlog.get_logger()


class SearchHandlerProtocol(Protocol):
    """Protocol for search handler."""

    async def execute(self, query: str, max_results_per_tool: int = 10) -> SearchResult:
        ...


class JudgeHandlerProtocol(Protocol):
    """Protocol for judge handler."""

    async def assess(self, question: str, evidence: List[Evidence]) -> JudgeAssessment:
        ...


class Orchestrator:
    """
    The agent orchestrator - runs the Search -> Judge -> Loop cycle.

    This is a generator-based design that yields events for real-time UI updates.
    """

    def __init__(
        self,
        search_handler: SearchHandlerProtocol,
        judge_handler: JudgeHandlerProtocol,
        config: OrchestratorConfig | None = None,
    ):
        """
        Initialize the orchestrator.

        Args:
            search_handler: Handler for executing searches
            judge_handler: Handler for assessing evidence
            config: Optional configuration (uses defaults if not provided)
        """
        self.search = search_handler
        self.judge = judge_handler
        self.config = config or OrchestratorConfig()
        self.history: List[dict] = []

    async def run(self, query: str) -> AsyncGenerator[AgentEvent, None]:
        """
        Run the agent loop for a query.

        Yields AgentEvent objects for each step, allowing real-time UI updates.

        Args:
            query: The user's research question

        Yields:
            AgentEvent objects for each step of the process
        """
        logger.info("Starting orchestrator", query=query)

        yield AgentEvent(
            type="started",
            message=f"Starting research for: {query}",
            iteration=0,
        )

        all_evidence: List[Evidence] = []
        current_queries = [query]
        iteration = 0

        while iteration < self.config.max_iterations:
            iteration += 1
            logger.info("Iteration", iteration=iteration, queries=current_queries)

            # === SEARCH PHASE ===
            yield AgentEvent(
                type="searching",
                message=f"Searching for: {', '.join(current_queries[:3])}...",
                iteration=iteration,
            )

            try:
                # Execute searches for all current queries
                search_tasks = [
                    self.search.execute(q, self.config.max_results_per_tool)
                    for q in current_queries[:3]  # Limit to 3 queries per iteration
                ]
                search_results = await asyncio.gather(*search_tasks, return_exceptions=True)

                # Collect evidence from successful searches
                new_evidence: List[Evidence] = []
                errors: List[str] = []

                for q, result in zip(current_queries[:3], search_results):
                    if isinstance(result, Exception):
                        errors.append(f"Search for '{q}' failed: {str(result)}")
                    else:
                        new_evidence.extend(result.evidence)
                        errors.extend(result.errors)

                # Deduplicate evidence by URL
                seen_urls = {e.citation.url for e in all_evidence}
                unique_new = [e for e in new_evidence if e.citation.url not in seen_urls]
                all_evidence.extend(unique_new)

                yield AgentEvent(
                    type="search_complete",
                    message=f"Found {len(unique_new)} new sources ({len(all_evidence)} total)",
                    data={"new_count": len(unique_new), "total_count": len(all_evidence)},
                    iteration=iteration,
                )

                if errors:
                    logger.warning("Search errors", errors=errors)
                    # Surface per-query failures to the UI without aborting the iteration
                    yield AgentEvent(
                        type="error",
                        message=f"Some searches failed: {'; '.join(errors)}",
                        iteration=iteration,
                    )

            except Exception as e:
                logger.error("Search phase failed", error=str(e))
                yield AgentEvent(
                    type="error",
                    message=f"Search failed: {str(e)}",
                    iteration=iteration,
                )
                continue

            # === JUDGE PHASE ===
            yield AgentEvent(
                type="judging",
                message=f"Evaluating {len(all_evidence)} sources...",
                iteration=iteration,
            )

            try:
                assessment = await self.judge.assess(query, all_evidence)

                yield AgentEvent(
                    type="judge_complete",
                    message=f"Assessment: {assessment.recommendation} (confidence: {assessment.confidence:.0%})",
                    data={
                        "sufficient": assessment.sufficient,
                        "confidence": assessment.confidence,
                        "mechanism_score": assessment.details.mechanism_score,
                        "clinical_score": assessment.details.clinical_evidence_score,
                    },
                    iteration=iteration,
                )

                # Record this iteration in history
                self.history.append({
                    "iteration": iteration,
                    "queries": current_queries,
                    "evidence_count": len(all_evidence),
                    "assessment": assessment.model_dump(),
                })

                # === DECISION PHASE ===
                if assessment.sufficient and assessment.recommendation == "synthesize":
                    yield AgentEvent(
                        type="synthesizing",
                        message="Evidence sufficient! Preparing synthesis...",
                        iteration=iteration,
                    )

                    # Generate final response
                    final_response = self._generate_synthesis(query, all_evidence, assessment)

                    yield AgentEvent(
                        type="complete",
                        message=final_response,
                        data={
                            "evidence_count": len(all_evidence),
                            "iterations": iteration,
                            "drug_candidates": assessment.details.drug_candidates,
                            "key_findings": assessment.details.key_findings,
                        },
                        iteration=iteration,
                    )
                    return
                else:
                    # Need more evidence - prepare next queries
                    current_queries = assessment.next_search_queries or [
                        f"{query} mechanism of action",
                        f"{query} clinical evidence",
                    ]
                    yield AgentEvent(
                        type="looping",
                        message=f"Need more evidence. Next searches: {', '.join(current_queries[:2])}...",
                        data={"next_queries": current_queries},
                        iteration=iteration,
                    )

            except Exception as e:
                logger.error("Judge phase failed", error=str(e))
                yield AgentEvent(
                    type="error",
                    message=f"Assessment failed: {str(e)}",
                    iteration=iteration,
                )
                continue

        # Max iterations reached
        yield AgentEvent(
            type="complete",
            message=self._generate_partial_synthesis(query, all_evidence),
            data={
                "evidence_count": len(all_evidence),
                "iterations": iteration,
                "max_reached": True,
            },
            iteration=iteration,
        )

    def _generate_synthesis(
        self,
        query: str,
        evidence: List[Evidence],
        assessment: JudgeAssessment,
    ) -> str:
        """
        Generate the final synthesis response.

        Args:
            query: The original question
            evidence: All collected evidence
            assessment: The final assessment

        Returns:
            Formatted synthesis as markdown
        """
        drug_list = "\n".join([f"- **{d}**" for d in assessment.details.drug_candidates]) or "- No specific candidates identified"
        findings_list = "\n".join([f"- {f}" for f in assessment.details.key_findings]) or "- See evidence below"
        citations = "\n".join([
            f"{i+1}. [{e.citation.title}]({e.citation.url}) ({e.citation.source.upper()}, {e.citation.date})"
            for i, e in enumerate(evidence[:10])  # Limit to 10 citations
        ])

        return f"""## Drug Repurposing Analysis

### Question
{query}

### Drug Candidates
{drug_list}

### Key Findings
{findings_list}

### Assessment
- **Mechanism Score**: {assessment.details.mechanism_score}/10
- **Clinical Evidence Score**: {assessment.details.clinical_evidence_score}/10
- **Confidence**: {assessment.confidence:.0%}

### Reasoning
{assessment.reasoning}

### Citations ({len(evidence)} sources)
{citations}

---
*Analysis based on {len(evidence)} sources across {len(self.history)} iterations.*
"""

    def _generate_partial_synthesis(
        self,
        query: str,
        evidence: List[Evidence],
    ) -> str:
        """
        Generate a partial synthesis when max iterations reached.

        Args:
            query: The original question
            evidence: All collected evidence

        Returns:
            Formatted partial synthesis as markdown
        """
        citations = "\n".join([
            f"{i+1}. [{e.citation.title}]({e.citation.url}) ({e.citation.source.upper()})"
            for i, e in enumerate(evidence[:10])
        ])

        return f"""## Partial Analysis (Max Iterations Reached)

### Question
{query}

### Status
Maximum search iterations reached. The evidence gathered may be incomplete.

### Evidence Collected
Found {len(evidence)} sources. Consider refining your query for more specific results.

### Citations
{citations}

---
*Consider searching with more specific terms or drug names.*
"""
```
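Because the orchestrator depends only on the two `Protocol` types, any object with a matching `execute`/`assess` signature can drive it. A minimal sketch of that dependency injection with hand-rolled stubs (the `StubSearch`/`StubJudge` classes are hypothetical, and the model constructors assume the field sets shown in the test fixtures of Section 5):

```python
# Hypothetical in-memory stubs satisfying SearchHandlerProtocol / JudgeHandlerProtocol.
import asyncio

from src.orchestrator import Orchestrator
from src.utils.models import (
    AssessmentDetails,
    Citation,
    Evidence,
    JudgeAssessment,
    SearchResult,
)


class StubSearch:
    async def execute(self, query: str, max_results_per_tool: int = 10) -> SearchResult:
        evidence = Evidence(
            content="Stub abstract",
            citation=Citation(
                source="pubmed",
                title="Stub Title",
                url="https://example.org/stub",
                date="2024-01-01",
            ),
        )
        return SearchResult(
            query=query,
            evidence=[evidence],
            sources_searched=["pubmed"],
            total_found=1,
            errors=[],
        )


class StubJudge:
    async def assess(self, question: str, evidence: list[Evidence]) -> JudgeAssessment:
        return JudgeAssessment(
            details=AssessmentDetails(
                mechanism_score=8,
                mechanism_reasoning="stub",
                clinical_evidence_score=7,
                clinical_reasoning="stub",
                drug_candidates=["Drug A"],
                key_findings=["Finding 1"],
            ),
            sufficient=True,
            confidence=0.9,
            recommendation="synthesize",
            next_search_queries=[],
            reasoning="stub",
        )


async def demo() -> None:
    orchestrator = Orchestrator(search_handler=StubSearch(), judge_handler=StubJudge())
    async for event in orchestrator.run("metformin alzheimer"):
        print(event.to_markdown()[:80])  # truncate the long final synthesis


asyncio.run(demo())
```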
---

## 4. The Gradio UI (`src/app.py`)

Using the Gradio 5 generator pattern for real-time streaming.

```python
"""Gradio UI for DeepCritical agent."""

import os
from typing import AsyncGenerator

import gradio as gr

from src.orchestrator import Orchestrator
from src.tools.pubmed import PubMedTool
from src.tools.websearch import WebTool
from src.tools.search_handler import SearchHandler
from src.agent_factory.judges import JudgeHandler, MockJudgeHandler
from src.utils.models import OrchestratorConfig


def create_orchestrator(use_mock: bool = False) -> Orchestrator:
    """
    Create an orchestrator instance.

    Args:
        use_mock: If True, use MockJudgeHandler (no API key needed)

    Returns:
        Configured Orchestrator instance
    """
    # Create search tools
    search_handler = SearchHandler(
        tools=[PubMedTool(), WebTool()],
        timeout=30.0,
    )

    # Create judge (mock or real)
    if use_mock:
        judge_handler = MockJudgeHandler()
    else:
        judge_handler = JudgeHandler()

    # Create orchestrator
    config = OrchestratorConfig(
        max_iterations=5,
        max_results_per_tool=10,
    )

    return Orchestrator(
        search_handler=search_handler,
        judge_handler=judge_handler,
        config=config,
    )


async def research_agent(
    message: str,
    history: list[dict],
) -> AsyncGenerator[str, None]:
    """
    Gradio chat function that runs the research agent.

    Args:
        message: User's research question
        history: Chat history (Gradio format)

    Yields:
        Markdown-formatted responses for streaming
    """
    if not message.strip():
        yield "Please enter a research question."
        return

    # Create orchestrator (use mock if no API key)
    use_mock = not (os.getenv("OPENAI_API_KEY") or os.getenv("ANTHROPIC_API_KEY"))
    orchestrator = create_orchestrator(use_mock=use_mock)

    # Run the agent and stream events
    response_parts = []
    try:
        async for event in orchestrator.run(message):
            # Format event as markdown
            event_md = event.to_markdown()
            response_parts.append(event_md)

            # If complete, show full response
            if event.type == "complete":
                yield event.message
            else:
                # Show progress
                yield "\n\n".join(response_parts)
    except Exception as e:
        yield f"❌ **Error**: {str(e)}"


def create_demo() -> gr.Blocks:
    """
    Create the Gradio demo interface.

    Returns:
        Configured Gradio Blocks interface
    """
    with gr.Blocks(
        title="DeepCritical - Drug Repurposing Research Agent",
        theme=gr.themes.Soft(),
    ) as demo:
        gr.Markdown("""
        # 🧬 DeepCritical
        ## AI-Powered Drug Repurposing Research Agent

        Ask questions about potential drug repurposing opportunities.
        The agent will search PubMed and the web, evaluate evidence,
        and provide recommendations.

        **Example questions:**
        - "What drugs could be repurposed for Alzheimer's disease?"
        - "Is metformin effective for cancer treatment?"
        - "What existing medications show promise for Long COVID?"
        """)

        chatbot = gr.ChatInterface(
            fn=research_agent,
            type="messages",
            title="",
            examples=[
                "What drugs could be repurposed for Alzheimer's disease?",
                "Is metformin effective for treating cancer?",
                "What medications show promise for Long COVID treatment?",
                "Can statins be repurposed for neurological conditions?",
            ],
        )

        gr.Markdown("""
        ---
        **Note**: This is a research tool and should not be used for medical decisions.
        Always consult healthcare professionals for medical advice.

        Built with 🤖 PydanticAI + 🔬 PubMed + 🦆 DuckDuckGo
        """)

    return demo


def main():
    """Run the Gradio app."""
    demo = create_demo()
    demo.launch(
        server_name="0.0.0.0",
        server_port=7860,
        share=False,
    )


if __name__ == "__main__":
    main()
```
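The streaming behaviour comes from `gr.ChatInterface` accepting an async generator: each `yield` replaces the reply shown in the chat window. A stripped-down sketch of just that pattern, independent of the agent code (the `echo_stream` function is illustrative only):

```python
# Minimal sketch of the Gradio 5 streaming pattern used above.
import asyncio

import gradio as gr


async def echo_stream(message: str, history: list[dict]):
    """Illustrative async generator: each yield replaces the displayed reply."""
    partial = ""
    for word in message.split():
        partial += word + " "
        await asyncio.sleep(0.2)  # simulate work between updates
        yield partial


demo = gr.ChatInterface(fn=echo_stream, type="messages")

if __name__ == "__main__":
    demo.launch()
```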
---

## 5. TDD Workflow

### Test File: `tests/unit/test_orchestrator.py`

```python
"""Unit tests for Orchestrator."""

import pytest
from unittest.mock import AsyncMock, MagicMock

from src.utils.models import (
    Evidence,
    Citation,
    SearchResult,
    JudgeAssessment,
    AssessmentDetails,
    OrchestratorConfig,
)


class TestOrchestrator:
    """Tests for Orchestrator."""

    @pytest.fixture
    def mock_search_handler(self):
        """Create a mock search handler."""
        handler = AsyncMock()
        handler.execute = AsyncMock(return_value=SearchResult(
            query="test",
            evidence=[
                Evidence(
                    content="Test content",
                    citation=Citation(
                        source="pubmed",
                        title="Test Title",
                        url="https://pubmed.ncbi.nlm.nih.gov/12345/",
                        date="2024-01-01",
                    ),
                ),
            ],
            sources_searched=["pubmed"],
            total_found=1,
            errors=[],
        ))
        return handler

    @pytest.fixture
    def mock_judge_sufficient(self):
        """Create a mock judge that returns sufficient."""
        handler = AsyncMock()
        handler.assess = AsyncMock(return_value=JudgeAssessment(
            details=AssessmentDetails(
                mechanism_score=8,
                mechanism_reasoning="Good mechanism",
                clinical_evidence_score=7,
                clinical_reasoning="Good clinical",
                drug_candidates=["Drug A"],
                key_findings=["Finding 1"],
            ),
            sufficient=True,
            confidence=0.85,
            recommendation="synthesize",
            next_search_queries=[],
            reasoning="Evidence is sufficient",
        ))
        return handler

    @pytest.fixture
    def mock_judge_insufficient(self):
        """Create a mock judge that returns insufficient."""
        handler = AsyncMock()
        handler.assess = AsyncMock(return_value=JudgeAssessment(
            details=AssessmentDetails(
                mechanism_score=4,
                mechanism_reasoning="Weak mechanism",
                clinical_evidence_score=3,
                clinical_reasoning="Weak clinical",
                drug_candidates=[],
                key_findings=[],
            ),
            sufficient=False,
            confidence=0.3,
            recommendation="continue",
            next_search_queries=["more specific query"],
            reasoning="Need more evidence",
        ))
        return handler

    @pytest.mark.asyncio
    async def test_orchestrator_completes_with_sufficient_evidence(
        self,
        mock_search_handler,
        mock_judge_sufficient,
    ):
        """Orchestrator should complete when evidence is sufficient."""
        from src.orchestrator import Orchestrator

        config = OrchestratorConfig(max_iterations=5)
        orchestrator = Orchestrator(
            search_handler=mock_search_handler,
            judge_handler=mock_judge_sufficient,
            config=config,
        )

        events = []
        async for event in orchestrator.run("test query"):
            events.append(event)

        # Should have started, searched, judged, and completed
        event_types = [e.type for e in events]
        assert "started" in event_types
        assert "searching" in event_types
        assert "search_complete" in event_types
        assert "judging" in event_types
        assert "judge_complete" in event_types
        assert "complete" in event_types

        # Should only have 1 iteration
        complete_event = [e for e in events if e.type == "complete"][0]
        assert complete_event.iteration == 1

    @pytest.mark.asyncio
    async def test_orchestrator_loops_when_insufficient(
        self,
        mock_search_handler,
        mock_judge_insufficient,
    ):
        """Orchestrator should loop when evidence is insufficient."""
        from src.orchestrator import Orchestrator

        config = OrchestratorConfig(max_iterations=3)
        orchestrator = Orchestrator(
            search_handler=mock_search_handler,
            judge_handler=mock_judge_insufficient,
            config=config,
        )

        events = []
        async for event in orchestrator.run("test query"):
            events.append(event)

        # Should have looping events
        event_types = [e.type for e in events]
        assert event_types.count("looping") >= 2  # At least 2 loop events

        # Should hit max iterations
        complete_event = [e for e in events if e.type == "complete"][0]
        assert complete_event.data.get("max_reached") is True

    @pytest.mark.asyncio
    async def test_orchestrator_respects_max_iterations(
        self,
        mock_search_handler,
        mock_judge_insufficient,
    ):
        """Orchestrator should stop at max_iterations."""
        from src.orchestrator import Orchestrator

        config = OrchestratorConfig(max_iterations=2)
        orchestrator = Orchestrator(
            search_handler=mock_search_handler,
            judge_handler=mock_judge_insufficient,
            config=config,
        )

        events = []
        async for event in orchestrator.run("test query"):
            events.append(event)

        # Should have exactly 2 iterations
        max_iteration = max(e.iteration for e in events)
        assert max_iteration == 2

    @pytest.mark.asyncio
    async def test_orchestrator_handles_search_error(self):
        """Orchestrator should handle search errors gracefully."""
        from src.orchestrator import Orchestrator

        mock_search = AsyncMock()
        mock_search.execute = AsyncMock(side_effect=Exception("Search failed"))

        mock_judge = AsyncMock()
        mock_judge.assess = AsyncMock(return_value=JudgeAssessment(
            details=AssessmentDetails(
                mechanism_score=0,
                mechanism_reasoning="N/A",
                clinical_evidence_score=0,
                clinical_reasoning="N/A",
                drug_candidates=[],
                key_findings=[],
            ),
            sufficient=False,
            confidence=0.0,
            recommendation="continue",
            next_search_queries=["retry query"],
            reasoning="Search failed",
        ))

        config = OrchestratorConfig(max_iterations=2)
        orchestrator = Orchestrator(
            search_handler=mock_search,
            judge_handler=mock_judge,
            config=config,
        )

        events = []
        async for event in orchestrator.run("test query"):
            events.append(event)

        # Should have error events
        event_types = [e.type for e in events]
        assert "error" in event_types

    @pytest.mark.asyncio
    async def test_orchestrator_deduplicates_evidence(self, mock_judge_insufficient):
        """Orchestrator should deduplicate evidence by URL."""
        from src.orchestrator import Orchestrator

        # Search returns same evidence each time
        duplicate_evidence = Evidence(
            content="Duplicate content",
            citation=Citation(
                source="pubmed",
                title="Same Title",
                url="https://pubmed.ncbi.nlm.nih.gov/12345/",  # Same URL
                date="2024-01-01",
            ),
        )
        mock_search = AsyncMock()
        mock_search.execute = AsyncMock(return_value=SearchResult(
            query="test",
            evidence=[duplicate_evidence],
            sources_searched=["pubmed"],
            total_found=1,
            errors=[],
        ))

        config = OrchestratorConfig(max_iterations=2)
        orchestrator = Orchestrator(
            search_handler=mock_search,
            judge_handler=mock_judge_insufficient,
            config=config,
        )

        events = []
        async for event in orchestrator.run("test query"):
            events.append(event)

        # Second search_complete should show 0 new evidence
        search_complete_events = [e for e in events if e.type == "search_complete"]
        assert len(search_complete_events) == 2

        # First iteration should have 1 new
        assert search_complete_events[0].data["new_count"] == 1

        # Second iteration should have 0 new (duplicate)
        assert search_complete_events[1].data["new_count"] == 0


class TestAgentEvent:
    """Tests for AgentEvent."""

    def test_to_markdown(self):
        """AgentEvent should format to markdown correctly."""
        from src.utils.models import AgentEvent

        event = AgentEvent(
            type="searching",
            message="Searching for: metformin alzheimer",
            iteration=1,
        )

        md = event.to_markdown()
        assert "🔍" in md
        assert "SEARCHING" in md
        assert "metformin alzheimer" in md

    def test_complete_event_icon(self):
        """Complete event should have celebration icon."""
        from src.utils.models import AgentEvent

        event = AgentEvent(
            type="complete",
            message="Done!",
            iteration=3,
        )

        md = event.to_markdown()
        assert "🎉" in md
```
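Each test drains the event stream with the same `async for` loop; if that repetition grows, a small helper can be factored out. A hypothetical sketch, not part of the spec above:

```python
# Hypothetical test helper: drain an orchestrator run into a list of events.
from src.orchestrator import Orchestrator
from src.utils.models import AgentEvent


async def collect_events(orchestrator: Orchestrator, query: str) -> list[AgentEvent]:
    """Collect every event the orchestrator yields for a single query."""
    return [event async for event in orchestrator.run(query)]

# Usage inside a test:
#     events = await collect_events(orchestrator, "test query")
```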
---

## 6. Dockerfile

```dockerfile
# Dockerfile for DeepCritical
FROM python:3.11-slim

# Set working directory
WORKDIR /app

# Install system dependencies
RUN apt-get update && apt-get install -y \
    git \
    && rm -rf /var/lib/apt/lists/*

# Install uv
RUN pip install uv

# Copy project files
COPY pyproject.toml .
COPY src/ src/

# Install dependencies
RUN uv pip install --system .

# Expose port
EXPOSE 7860

# Set environment variables
ENV GRADIO_SERVER_NAME=0.0.0.0
ENV GRADIO_SERVER_PORT=7860

# Run the app
CMD ["python", "-m", "src.app"]
```
---

## 7. HuggingFace Spaces Configuration

Create the `README.md` header for HuggingFace Spaces:

```markdown
---
title: DeepCritical
emoji: 🧬
colorFrom: blue
colorTo: purple
sdk: gradio
sdk_version: 5.0.0
app_file: src/app.py
pinned: false
license: mit
---

# DeepCritical

AI-Powered Drug Repurposing Research Agent
```

---

## 8. Implementation Checklist

- [ ] Add `AgentEvent` and `OrchestratorConfig` models to `src/utils/models.py`
- [ ] Implement `src/orchestrator.py` with full Orchestrator class
- [ ] Implement `src/app.py` with Gradio interface
- [ ] Create `tests/unit/test_orchestrator.py` with all tests
- [ ] Create `Dockerfile` for deployment
- [ ] Update project `README.md` with usage instructions
- [ ] Run `uv run pytest tests/unit/test_orchestrator.py -v` — **ALL TESTS MUST PASS**
- [ ] Test locally: `uv run python -m src.app`
- [ ] Commit: `git commit -m "feat: phase 4 orchestrator and UI complete"`
---

## 9. Definition of Done

Phase 4 is **COMPLETE** when:

1. All unit tests pass: `uv run pytest tests/unit/test_orchestrator.py -v`
2. Orchestrator correctly loops Search -> Judge until sufficient
3. Max iterations limit is enforced
4. Graceful error handling throughout
5. Gradio UI streams events in real-time
6. Can run locally:

```bash
# Start the UI
uv run python -m src.app

# Open browser to http://localhost:7860
# Enter a question like "What drugs could be repurposed for Alzheimer's disease?"
# Watch the agent search, evaluate, and respond
```

7. Can run the full flow in Python:

```python
import asyncio

from src.orchestrator import Orchestrator
from src.tools.pubmed import PubMedTool
from src.tools.websearch import WebTool
from src.tools.search_handler import SearchHandler
from src.agent_factory.judges import MockJudgeHandler
from src.utils.models import OrchestratorConfig


async def test_full_flow():
    # Create components
    search_handler = SearchHandler([PubMedTool(), WebTool()])
    judge_handler = MockJudgeHandler()  # Use mock for testing
    config = OrchestratorConfig(max_iterations=3)

    # Create orchestrator
    orchestrator = Orchestrator(
        search_handler=search_handler,
        judge_handler=judge_handler,
        config=config,
    )

    # Run and collect events
    print("Starting agent...")
    async for event in orchestrator.run("metformin alzheimer"):
        print(event.to_markdown())

    print("\nDone!")


asyncio.run(test_full_flow())
```

---

## 10. Deployment Verification

After deployment to HuggingFace Spaces:

1. **Visit the Space URL** and verify the UI loads
2. **Test with example queries**:
   - "What drugs could be repurposed for Alzheimer's disease?"
   - "Is metformin effective for cancer treatment?"
3. **Verify streaming** - events should appear in real-time
4. **Check error handling** - try an empty query, verify graceful handling
5. **Monitor logs** for any errors
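For step 1, a scripted smoke check can stand in for a manual visit. A minimal sketch (the `SPACE_URL` value is a placeholder, and `httpx` is assumed to be available from the earlier phases):

```python
# Hypothetical smoke test: confirm the deployed Space serves the Gradio UI.
import httpx

SPACE_URL = "https://<your-username>-deepcritical.hf.space"  # placeholder URL

response = httpx.get(SPACE_URL, follow_redirects=True, timeout=30.0)
assert response.status_code == 200, f"Space did not load: HTTP {response.status_code}"
print("UI loaded OK")
```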
---

## Project Complete! 🎉

When Phase 4 is done, the DeepCritical MVP is complete:

- **Phase 1**: Foundation (uv, pytest, config) ✅
- **Phase 2**: Search Slice (PubMed, DuckDuckGo) ✅
- **Phase 3**: Judge Slice (PydanticAI, structured output) ✅
- **Phase 4**: Orchestrator + UI (Gradio, streaming) ✅

The agent can:

1. Accept a drug repurposing question
2. Search PubMed and the web for evidence
3. Evaluate evidence quality with an LLM
4. Loop until confident or max iterations
5. Synthesize a research-backed recommendation
6. Display real-time progress in a beautiful UI