# Orchestrators Architecture
DeepCritical supports multiple orchestration patterns for research workflows.
## Research Flows
### IterativeResearchFlow
**File**: `src/orchestrator/research_flow.py`
**Pattern**: Generate observations → Evaluate gaps → Select tools → Execute → Judge → Continue/Complete
**Agents Used**:
- `KnowledgeGapAgent`: Evaluates research completeness
- `ToolSelectorAgent`: Selects tools for addressing gaps
- `ThinkingAgent`: Generates observations
- `WriterAgent`: Creates final report
- `JudgeHandler`: Assesses evidence sufficiency
**Features**:
- Tracks iterations, time, budget
- Supports graph execution (`use_graph=True`) and agent chains (`use_graph=False`)
- Iterates until research is complete or a constraint (iterations, time, or budget) is reached
**Usage**:
```python
from src.orchestrator.research_flow import IterativeResearchFlow
flow = IterativeResearchFlow(
    search_handler=search_handler,
    judge_handler=judge_handler,
    use_graph=False,
)
# Run inside a coroutine (e.g. an `async def` started with asyncio.run)
async for event in flow.run(query):
    # Handle events
    pass
```
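The stopping rule described above (iterate until research is complete or a constraint is hit) can be sketched as follows. This is a minimal illustration, not the `IterativeResearchFlow` implementation: `LoopConstraints`, `run_loop`, the default limits, and the `step` callback are all hypothetical names introduced here.

```python
import time
from dataclasses import dataclass, field
from typing import Callable, Tuple


@dataclass
class LoopConstraints:
    """Hypothetical tracker for the three limits: iterations, time, budget."""
    max_iterations: int = 5
    max_seconds: float = 600.0
    max_tokens: int = 100_000
    iterations: int = 0
    tokens_used: int = 0
    started_at: float = field(default_factory=time.monotonic)

    def exhausted(self) -> bool:
        # True as soon as any single limit has been reached.
        return (
            self.iterations >= self.max_iterations
            or self.tokens_used >= self.max_tokens
            or time.monotonic() - self.started_at >= self.max_seconds
        )


def run_loop(constraints: LoopConstraints, step: Callable[[], Tuple[int, bool]]) -> str:
    """Call step() until it reports completion or a constraint is hit.

    step() returns (tokens_spent, research_complete).
    """
    while not constraints.exhausted():
        constraints.iterations += 1
        tokens, complete = step()
        constraints.tokens_used += tokens
        if complete:
            return "complete"
    return "constraints_met"


# A step that never finishes: the loop stops on the iteration limit.
limits = LoopConstraints(max_iterations=3)
outcome = run_loop(limits, lambda: (10, False))
```

Checking the limits before each step, rather than after, means a flow that starts with an exhausted budget does no work at all.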
### DeepResearchFlow
**File**: `src/orchestrator/research_flow.py`
**Pattern**: Planner → Parallel iterative loops per section → Synthesizer
**Agents Used**:
- `PlannerAgent`: Breaks query into report sections
- `IterativeResearchFlow`: Per-section research (parallel)
- `LongWriterAgent` or `ProofreaderAgent`: Final synthesis
**Features**:
- Uses `WorkflowManager` for parallel execution
- Budget tracking per section and globally
- State synchronization across parallel loops
- Supports graph execution and agent chains
**Usage**:
```python
from src.orchestrator.research_flow import DeepResearchFlow
flow = DeepResearchFlow(
    search_handler=search_handler,
    judge_handler=judge_handler,
    use_graph=True,
)
# Run inside a coroutine (e.g. an `async def` started with asyncio.run)
async for event in flow.run(query):
    # Handle events
    pass
```
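The planner, parallel loops, and synthesizer pattern can be illustrated with plain `asyncio`. This sketch does not use `WorkflowManager` or any DeepCritical class; `research_section` and `deep_research` are hypothetical stand-ins, and the "research" is a placeholder string.

```python
import asyncio
from typing import List


async def research_section(section: str) -> str:
    """Stand-in for one per-section iterative research loop."""
    await asyncio.sleep(0)  # yield control, as a real I/O-bound loop would
    return f"Draft for {section}"


async def deep_research(sections: List[str]) -> str:
    # Run one loop per section concurrently; gather() returns results
    # in the same order as the inputs, so section order is preserved.
    drafts = await asyncio.gather(*(research_section(s) for s in sections))
    # Stand-in for the synthesizer step: join the section drafts.
    return "\n\n".join(drafts)


report = asyncio.run(deep_research(["Background", "Methods"]))
```

Because `asyncio.gather` preserves input order, the synthesizer can assemble sections in the order the planner produced them, regardless of which loop finishes first.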
## Graph Orchestrator
**File**: `src/orchestrator/graph_orchestrator.py`
**Purpose**: Graph-based execution using Pydantic AI agents as nodes
**Features**:
- Uses Pydantic AI Graphs (when available) or agent chains (fallback)
- Routes based on research mode (iterative/deep/auto)
- Streams `AgentEvent` objects for UI
**Node Types**:
- **Agent Nodes**: Execute Pydantic AI agents
- **State Nodes**: Update or read workflow state
- **Decision Nodes**: Make routing decisions
- **Parallel Nodes**: Execute multiple nodes concurrently
**Edge Types**:
- **Sequential Edges**: Always traversed
- **Conditional Edges**: Traversed based on condition
- **Parallel Edges**: Used for parallel execution branches
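To make the node and edge vocabulary concrete, here is a small sketch of sequential and conditional edges driving a traversal. It is an assumption-laden illustration and not the `graph_orchestrator.py` implementation: `Edge`, `traverse`, and the three node functions are hypothetical, and parallel nodes are left out for brevity.

```python
from dataclasses import dataclass
from typing import Callable, Dict, List, Optional

State = dict  # shared workflow state, mutated in place by nodes


@dataclass
class Edge:
    target: str
    # None means a sequential edge (always traversed);
    # otherwise the edge is conditional on the current state.
    condition: Optional[Callable[[State], bool]] = None


def traverse(nodes: Dict[str, Callable[[State], None]],
             edges: Dict[str, List[Edge]],
             start: str, state: State, max_steps: int = 20) -> List[str]:
    """Run nodes from `start`, following the first edge whose condition holds."""
    visited: List[str] = []
    current: Optional[str] = start
    while current is not None and len(visited) < max_steps:
        visited.append(current)
        nodes[current](state)
        current = next(
            (e.target for e in edges.get(current, [])
             if e.condition is None or e.condition(state)),
            None,  # no outgoing edge matched: stop
        )
    return visited


def search(state: State) -> None:
    state["evidence"] += 1


def judge(state: State) -> None:
    state["sufficient"] = state["evidence"] >= 2


def report(state: State) -> None:
    state["report"] = f"{state['evidence']} findings"


nodes = {"search": search, "judge": judge, "report": report}
edges = {
    "search": [Edge("judge")],
    "judge": [
        Edge("report", lambda s: s["sufficient"]),
        Edge("search", lambda s: not s["sufficient"]),
    ],
}
state = {"evidence": 0}
path = traverse(nodes, edges, "search", state)
```

Here `judge` plays the role of a decision node: its conditional edges either loop back to `search` or continue to `report`.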
## Orchestrator Factory
**File**: `src/orchestrator_factory.py`
**Purpose**: Factory for creating orchestrators
**Modes**:
- **Simple**: Legacy orchestrator (backward compatible)
- **Advanced**: Magentic orchestrator (requires OpenAI API key)
- **Auto-detect**: Chooses between Simple and Advanced based on API key availability
**Usage**:
```python
from src.orchestrator_factory import create_orchestrator
orchestrator = create_orchestrator(
    search_handler=search_handler,
    judge_handler=judge_handler,
    config={},
    mode="advanced",  # or "simple", or None for auto-detect
)
```
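Auto-detection could work along the following lines. This is a guess at the logic, not the code in `orchestrator_factory.py`: `resolve_mode` is a hypothetical helper, and reading the key from the `OPENAI_API_KEY` environment variable is an assumption.

```python
import os
from typing import Mapping, Optional

VALID_MODES = ("simple", "advanced")


def resolve_mode(mode: Optional[str],
                 env: Mapping[str, str] = os.environ) -> str:
    """Return the explicit mode, or pick one from API key availability."""
    if mode is not None:
        if mode not in VALID_MODES:
            raise ValueError(f"unknown mode: {mode!r}")
        return mode
    # Assumption: the advanced mode needs an OpenAI key, so fall back
    # to the legacy simple mode when none is configured.
    return "advanced" if env.get("OPENAI_API_KEY") else "simple"


without_key = resolve_mode(None, env={})
with_key = resolve_mode(None, env={"OPENAI_API_KEY": "sk-placeholder"})
```

An explicit `mode` always wins over detection, which keeps behavior predictable in tests and deployments where a key happens to be present.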
## Magentic Orchestrator
**File**: `src/orchestrator_magentic.py`
**Purpose**: Multi-agent coordination using Microsoft Agent Framework
**Features**:
- Uses `agent-framework-core`
- ChatAgent pattern with internal LLMs per agent
- `MagenticBuilder` with participants: searcher, hypothesizer, judge, reporter
- Manager orchestrates agents via `OpenAIChatClient`
- Requires OpenAI API key (function calling support)
- Event-driven: converts Magentic events to `AgentEvent` for UI streaming
**Requirements**:
- `agent-framework-core` package
- OpenAI API key
## Hierarchical Orchestrator
**File**: `src/orchestrator_hierarchical.py`
**Purpose**: Hierarchical orchestrator using middleware and sub-teams
**Features**:
- Uses `SubIterationMiddleware` with `ResearchTeam` and `LLMSubIterationJudge`
- Adapts Magentic ChatAgent to `SubIterationTeam` protocol
- Event-driven via `asyncio.Queue` for coordination
- Supports sub-iteration patterns for complex research tasks
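Coordination over an `asyncio.Queue` can be sketched as below. This shows only the general producer/consumer idea; it does not reproduce `SubIterationMiddleware` or the `SubIterationTeam` protocol, and `sub_team`, `coordinator`, and the `None` sentinel convention are hypothetical.

```python
import asyncio
from typing import List, Optional, Tuple

Message = Tuple[str, Optional[str]]  # (team name, payload or None when done)


async def sub_team(name: str, queue: "asyncio.Queue[Message]") -> None:
    """Stand-in for a sub-team that reports progress through the queue."""
    for step in range(2):
        await queue.put((name, f"step {step}"))
    await queue.put((name, None))  # sentinel: this team has finished


async def coordinator(team_names: List[str]) -> List[Tuple[str, str]]:
    queue: "asyncio.Queue[Message]" = asyncio.Queue()
    tasks = [asyncio.create_task(sub_team(n, queue)) for n in team_names]
    remaining = set(team_names)
    received: List[Tuple[str, str]] = []
    # Drain the queue until every team has sent its sentinel.
    while remaining:
        name, payload = await queue.get()
        if payload is None:
            remaining.discard(name)
        else:
            received.append((name, payload))
    await asyncio.gather(*tasks)  # surface any exception from a team
    return received


messages = asyncio.run(coordinator(["alpha", "beta"]))
```

A single queue gives the parent one place to observe all sub-team events, which makes it straightforward to forward them to a UI stream.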
## Legacy Simple Mode
**File**: `src/legacy_orchestrator.py`
**Purpose**: Linear search-judge-synthesize loop
**Features**:
- Uses `SearchHandlerProtocol` and `JudgeHandlerProtocol`
- Generator-based design yielding `AgentEvent` objects
- Backward compatibility for simple use cases
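A linear search-judge-synthesize loop written as an async generator might look like the sketch below. The protocol method names (`search`, `is_sufficient`), the dict-shaped events, and the fake handlers are assumptions made for illustration; the real `SearchHandlerProtocol`, `JudgeHandlerProtocol`, and `AgentEvent` may differ.

```python
import asyncio
from typing import Any, AsyncIterator, Dict, List, Protocol


class SearchHandler(Protocol):
    async def search(self, query: str) -> List[str]: ...


class JudgeHandler(Protocol):
    async def is_sufficient(self, query: str, evidence: List[str]) -> bool: ...


async def simple_loop(query: str, searcher: SearchHandler, judge: JudgeHandler,
                      max_iterations: int = 3) -> AsyncIterator[Dict[str, Any]]:
    evidence: List[str] = []
    yield {"type": "started", "iteration": None, "data": {"query": query}}
    for i in range(1, max_iterations + 1):
        evidence += await searcher.search(query)
        yield {"type": "search_complete", "iteration": i,
               "data": {"count": len(evidence)}}
        sufficient = await judge.is_sufficient(query, evidence)
        yield {"type": "judge_complete", "iteration": i,
               "data": {"sufficient": sufficient}}
        if sufficient:
            break
    yield {"type": "synthesizing", "iteration": None, "data": {}}
    yield {"type": "complete", "iteration": None,
           "data": {"report": f"{len(evidence)} sources"}}


class FakeSearch:
    async def search(self, query: str) -> List[str]:
        return [f"result for {query}"]


class FakeJudge:
    async def is_sufficient(self, query: str, evidence: List[str]) -> bool:
        return len(evidence) >= 2


async def collect() -> List[str]:
    return [e["type"] async for e in simple_loop("q", FakeSearch(), FakeJudge())]


event_types = asyncio.run(collect())
```

Typing the handlers as protocols lets any object with matching methods be passed in, which is what makes fakes like these usable in tests.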
## State Initialization
All orchestrators must initialize workflow state:
```python
from src.middleware.state_machine import init_workflow_state
from src.services.embeddings import get_embedding_service
embedding_service = get_embedding_service()
init_workflow_state(embedding_service)
```
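One way such state could be held is in a context variable, so that reading it before initialization fails loudly. This is a sketch under that assumption, not the contents of `src/middleware/state_machine.py`; `WorkflowState`, `init_state`, and `get_state` are hypothetical names.

```python
from contextvars import ContextVar
from dataclasses import dataclass, field
from typing import Any, List, Optional


@dataclass
class WorkflowState:
    """Hypothetical container for per-run workflow data."""
    embedding_service: Any = None
    evidence: List[str] = field(default_factory=list)


_state: ContextVar[Optional[WorkflowState]] = ContextVar(
    "workflow_state", default=None
)


def init_state(embedding_service: Any = None) -> WorkflowState:
    """Create a fresh state for the current context and return it."""
    state = WorkflowState(embedding_service=embedding_service)
    _state.set(state)
    return state


def get_state() -> WorkflowState:
    """Return the current state, or fail if init_state() was never called."""
    state = _state.get()
    if state is None:
        raise RuntimeError("workflow state not initialized")
    return state
```

With this shape, an orchestrator that skips initialization gets an immediate `RuntimeError` instead of silently sharing or missing state.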
## Event Streaming
All orchestrators yield `AgentEvent` objects:
**Event Types**:
- `started`: Research started
- `search_complete`: Search completed
- `judge_complete`: Evidence evaluation completed
- `hypothesizing`: Generating hypotheses
- `synthesizing`: Synthesizing results
- `complete`: Research completed
- `error`: Error occurred
**Event Structure**:
```python
from typing import Any

class AgentEvent:
    type: str
    iteration: int | None
    data: dict[str, Any]
```
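A consumer typically dispatches on the event `type`. The sketch below uses a stand-in dataclass with the same three fields; `Event`, `render`, and the `message` key inside `data` are hypothetical and chosen only for illustration.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Optional


@dataclass
class Event:
    """Stand-in with the same fields as the AgentEvent structure above."""
    type: str
    iteration: Optional[int] = None
    data: Dict[str, Any] = field(default_factory=dict)


def render(event: Event) -> str:
    """Turn an event into a one-line status message for a UI."""
    prefix = f"[iter {event.iteration}] " if event.iteration is not None else ""
    if event.type == "error":
        # Assumption: error details live under a "message" key.
        return f"{prefix}ERROR: {event.data.get('message', 'unknown error')}"
    if event.type == "complete":
        return f"{prefix}Done"
    # Default: show the event type in a readable form.
    return f"{prefix}{event.type.replace('_', ' ')}"


status = render(Event("search_complete", iteration=2))
failure = render(Event("error", data={"message": "boom"}))
```

Falling through to a generic rendering for unrecognized types keeps the UI working if new event types are added later.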
## See Also
- [Graph Orchestration](graph-orchestration.md) - Graph-based execution details
- [Graph Orchestration (Detailed)](graph_orchestration.md) - Detailed graph architecture
- [Workflows](workflows.md) - Workflow diagrams and patterns
- [Workflow Diagrams](workflow-diagrams.md) - Detailed workflow diagrams
- [API Reference - Orchestrators](../api/orchestrators.md) - API documentation