File size: 16,669 Bytes
cd46aca
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
# DeepCritical Project - Rules

## Project-Wide Rules

**Architecture**: Multi-agent research system using Pydantic AI for agent orchestration, supporting iterative and deep research patterns. Uses middleware for state management, budget tracking, and workflow coordination.

**Type Safety**: ALWAYS use complete type hints. All functions must have parameter and return type annotations. Use `mypy --strict` compliance. Use `TYPE_CHECKING` imports for circular dependencies: `from typing import TYPE_CHECKING; if TYPE_CHECKING: from src.services.embeddings import EmbeddingService`

**Async Patterns**: ALL I/O operations must be async (`async def`, `await`). Use `asyncio.gather()` for parallel operations. CPU-bound work must use `run_in_executor()`: `loop = asyncio.get_running_loop(); result = await loop.run_in_executor(None, cpu_bound_function, args)`. Never block the event loop.

**Error Handling**: Use custom exceptions from `src/utils/exceptions.py`: `DeepCriticalError`, `SearchError`, `RateLimitError`, `JudgeError`, `ConfigurationError`. Always chain exceptions: `raise SearchError(...) from e`. Log with structlog: `logger.error("Operation failed", error=str(e), context=value)`.

**Logging**: Use `structlog` for ALL logging (NOT `print` or `logging`). Import: `import structlog; logger = structlog.get_logger()`. Log with structured data: `logger.info("event", key=value)`. Use appropriate levels: DEBUG, INFO, WARNING, ERROR.

**Pydantic Models**: All data exchange uses Pydantic models from `src/utils/models.py`. Models are frozen (`model_config = {"frozen": True}`) for immutability. Use `Field()` with descriptions. Validate with `ge=`, `le=`, `min_length=`, `max_length=` constraints.

**Code Style**: Ruff with 100-char line length. Ignore rules: `PLR0913` (too many arguments), `PLR0912` (too many branches), `PLR0911` (too many returns), `PLR2004` (magic values), `PLW0603` (global statement), `PLC0415` (lazy imports).

**Docstrings**: Google-style docstrings for all public functions. Include Args, Returns, Raises sections. Use type hints in docstrings only if needed for clarity.

**Testing**: Unit tests in `tests/unit/` (mocked, fast). Integration tests in `tests/integration/` (real APIs, marked `@pytest.mark.integration`). Use `respx` for httpx mocking, `pytest-mock` for general mocking.

**State Management**: Use `ContextVar` in middleware for thread-safe isolation. Never use global mutable state (except singletons via `@lru_cache`). Use `WorkflowState` from `src/middleware/state_machine.py` for workflow state.

**Citation Validation**: ALWAYS validate references before returning reports. Use `validate_references()` from `src/utils/citation_validator.py`. Remove hallucinated citations. Log warnings for removed citations.

---

## src/agents/ - Agent Implementation Rules

**Pattern**: All agents use Pydantic AI `Agent` class. Agents have structured output types (Pydantic models) or return strings. Use factory functions in `src/agent_factory/agents.py` for creation.

**Agent Structure**: 
- System prompt as module-level constant (with date injection: `datetime.now().strftime("%Y-%m-%d")`)
- Agent class with `__init__(model: Any | None = None)`
- Main method (e.g., `async def evaluate()`, `async def write_report()`)
- Factory function: `def create_agent_name(model: Any | None = None) -> AgentName`

**Model Initialization**: Use `get_model()` from `src/agent_factory/judges.py` if no model provided. Support OpenAI/Anthropic/HF Inference via settings.

**Error Handling**: Return fallback values (e.g., `KnowledgeGapOutput(research_complete=False, outstanding_gaps=[...])`) on failure. Log errors with context. Use retry logic (3 retries) in Pydantic AI Agent initialization.

**Input Validation**: Validate query/inputs are not empty. Truncate very long inputs with warnings. Handle None values gracefully.

**Output Types**: Use structured output types from `src/utils/models.py` (e.g., `KnowledgeGapOutput`, `AgentSelectionPlan`, `ReportDraft`). For text output (writer agents), return `str` directly.

**Agent-Specific Rules**:
- `knowledge_gap.py`: Outputs `KnowledgeGapOutput`. Evaluates research completeness.
- `tool_selector.py`: Outputs `AgentSelectionPlan`. Selects tools (RAG/web/database).
- `writer.py`: Returns markdown string. Includes citations in numbered format.
- `long_writer.py`: Uses `ReportDraft` input/output. Handles section-by-section writing.
- `proofreader.py`: Takes `ReportDraft`, returns polished markdown.
- `thinking.py`: Returns observation string from conversation history.
- `input_parser.py`: Outputs `ParsedQuery` with research mode detection.

---

## src/tools/ - Search Tool Rules

**Protocol**: All tools implement `SearchTool` protocol from `src/tools/base.py`: `name` property and `async def search(query, max_results) -> list[Evidence]`.

**Rate Limiting**: Use `@retry` decorator from tenacity: `@retry(stop=stop_after_attempt(3), wait=wait_exponential(...))`. Implement `_rate_limit()` method for APIs with limits. Use shared rate limiters from `src/tools/rate_limiter.py`.

**Error Handling**: Raise `SearchError` or `RateLimitError` on failures. Handle HTTP errors (429, 500, timeout). Return empty list on non-critical errors (log warning).

**Query Preprocessing**: Use `preprocess_query()` from `src/tools/query_utils.py` to remove noise and expand synonyms.

**Evidence Conversion**: Convert API responses to `Evidence` objects with `Citation`. Extract metadata (title, url, date, authors). Set relevance scores (0.0-1.0). Handle missing fields gracefully.

**Tool-Specific Rules**:
- `pubmed.py`: Use NCBI E-utilities (ESearch β†’ EFetch). Rate limit: 0.34s between requests. Parse XML with `xmltodict`. Handle single vs. multiple articles.
- `clinicaltrials.py`: Use `requests` library (NOT httpx - WAF blocks httpx). Run in thread pool: `await asyncio.to_thread(requests.get, ...)`. Filter: Only interventional studies, active/completed.
- `europepmc.py`: Handle preprint markers: `[PREPRINT - Not peer-reviewed]`. Build URLs from DOI or PMID.
- `rag_tool.py`: Wraps `LlamaIndexRAGService`. Returns Evidence from RAG results. Handles ingestion.
- `search_handler.py`: Orchestrates parallel searches across multiple tools. Uses `asyncio.gather()` with `return_exceptions=True`. Aggregates results into `SearchResult`.

---

## src/middleware/ - Middleware Rules

**State Management**: Use `ContextVar` for thread-safe isolation. `WorkflowState` uses `ContextVar[WorkflowState | None]`. Initialize with `init_workflow_state(embedding_service)`. Access with `get_workflow_state()` (auto-initializes if missing).

**WorkflowState**: Tracks `evidence: list[Evidence]`, `conversation: Conversation`, `embedding_service: Any`. Methods: `add_evidence()` (deduplicates by URL), `async search_related()` (semantic search).

**WorkflowManager**: Manages parallel research loops. Methods: `add_loop()`, `run_loops_parallel()`, `update_loop_status()`, `sync_loop_evidence_to_state()`. Uses `asyncio.gather()` for parallel execution. Handles errors per loop (don't fail all if one fails).

**BudgetTracker**: Tracks tokens, time, iterations per loop and globally. Methods: `create_budget()`, `add_tokens()`, `start_timer()`, `update_timer()`, `increment_iteration()`, `check_budget()`, `can_continue()`. Token estimation: `estimate_tokens(text)` (~4 chars per token), `estimate_llm_call_tokens(prompt, response)`.

**Models**: All middleware models in `src/utils/models.py`. `IterationData`, `Conversation`, `ResearchLoop`, `BudgetStatus` are used by middleware.

---

## src/orchestrator/ - Orchestration Rules

**Research Flows**: Two patterns: `IterativeResearchFlow` (single loop) and `DeepResearchFlow` (plan β†’ parallel loops β†’ synthesis). Both support agent chains (`use_graph=False`) and graph execution (`use_graph=True`).

**IterativeResearchFlow**: Pattern: Generate observations β†’ Evaluate gaps β†’ Select tools β†’ Execute β†’ Judge β†’ Continue/Complete. Uses `KnowledgeGapAgent`, `ToolSelectorAgent`, `ThinkingAgent`, `WriterAgent`, `JudgeHandler`. Tracks iterations, time, budget.

**DeepResearchFlow**: Pattern: Planner β†’ Parallel iterative loops per section β†’ Synthesizer. Uses `PlannerAgent`, `IterativeResearchFlow` (per section), `LongWriterAgent` or `ProofreaderAgent`. Uses `WorkflowManager` for parallel execution.

**Graph Orchestrator**: Uses Pydantic AI Graphs (when available) or agent chains (fallback). Routes based on research mode (iterative/deep/auto). Streams `AgentEvent` objects for UI.

**State Initialization**: Always call `init_workflow_state()` before running flows. Initialize `BudgetTracker` per loop. Use `WorkflowManager` for parallel coordination.

**Event Streaming**: Yield `AgentEvent` objects during execution. Event types: "started", "search_complete", "judge_complete", "hypothesizing", "synthesizing", "complete", "error". Include iteration numbers and data payloads.

---

## src/services/ - Service Rules

**EmbeddingService**: Local sentence-transformers (NO API key required). All operations async-safe via `run_in_executor()`. ChromaDB for vector storage. Deduplication threshold: 0.85 (85% similarity = duplicate).

**LlamaIndexRAGService**: Uses OpenAI embeddings (requires `OPENAI_API_KEY`). Methods: `ingest_evidence()`, `retrieve()`, `query()`. Returns documents with metadata (source, title, url, date, authors). Lazy initialization with graceful fallback.

**StatisticalAnalyzer**: Generates Python code via LLM. Executes in Modal sandbox (secure, isolated). Library versions pinned in `SANDBOX_LIBRARIES` dict. Returns `AnalysisResult` with verdict (SUPPORTED/REFUTED/INCONCLUSIVE).

**Singleton Pattern**: Use `@lru_cache(maxsize=1)` for singletons: `@lru_cache(maxsize=1); def get_service() -> Service: return Service()`. Lazy initialization to avoid requiring dependencies at import time.

---

## src/utils/ - Utility Rules

**Models**: All Pydantic models in `src/utils/models.py`. Use frozen models (`model_config = {"frozen": True}`) except where mutation needed. Use `Field()` with descriptions. Validate with constraints.

**Config**: Settings via Pydantic Settings (`src/utils/config.py`). Load from `.env` automatically. Use `settings` singleton: `from src.utils.config import settings`. Validate API keys with properties: `has_openai_key`, `has_anthropic_key`.

**Exceptions**: Custom exception hierarchy in `src/utils/exceptions.py`. Base: `DeepCriticalError`. Specific: `SearchError`, `RateLimitError`, `JudgeError`, `ConfigurationError`. Always chain exceptions.

**LLM Factory**: Centralized LLM model creation in `src/utils/llm_factory.py`. Supports OpenAI, Anthropic, HF Inference. Use `get_model()` or factory functions. Check requirements before initialization.

**Citation Validator**: Use `validate_references()` from `src/utils/citation_validator.py`. Removes hallucinated citations (URLs not in evidence). Logs warnings. Returns validated report string.

---

## src/orchestrator_factory.py Rules

**Purpose**: Factory for creating orchestrators. Supports "simple" (legacy) and "advanced" (magentic) modes. Auto-detects mode based on API key availability.

**Pattern**: Lazy import for optional dependencies (`_get_magentic_orchestrator_class()`). Handles `ImportError` gracefully with clear error messages.

**Mode Detection**: `_determine_mode()` checks explicit mode or auto-detects: "advanced" if `settings.has_openai_key`, else "simple". Maps "magentic" β†’ "advanced".

**Function Signature**: `create_orchestrator(search_handler, judge_handler, config, mode) -> Any`. Simple mode requires handlers. Advanced mode uses MagenticOrchestrator.

**Error Handling**: Raise `ValueError` with clear messages if requirements not met. Log mode selection with structlog.

---

## src/orchestrator_hierarchical.py Rules

**Purpose**: Hierarchical orchestrator using middleware and sub-teams. Adapts Magentic ChatAgent to SubIterationTeam protocol.

**Pattern**: Uses `SubIterationMiddleware` with `ResearchTeam` and `LLMSubIterationJudge`. Event-driven via callback queue.

**State Initialization**: Initialize embedding service with graceful fallback. Use `init_magentic_state()` (deprecated, but kept for compatibility).

**Event Streaming**: Uses `asyncio.Queue` for event coordination. Yields `AgentEvent` objects. Handles event callback pattern with `asyncio.wait()`.

**Error Handling**: Log errors with context. Yield error events. Process remaining events after task completion.

---

## src/orchestrator_magentic.py Rules

**Purpose**: Magentic-based orchestrator using ChatAgent pattern. Each agent has internal LLM. Manager orchestrates agents.

**Pattern**: Uses `MagenticBuilder` with participants (searcher, hypothesizer, judge, reporter). Manager uses `OpenAIChatClient`. Workflow built in `_build_workflow()`.

**Event Processing**: `_process_event()` converts Magentic events to `AgentEvent`. Handles: `MagenticOrchestratorMessageEvent`, `MagenticAgentMessageEvent`, `MagenticFinalResultEvent`, `MagenticAgentDeltaEvent`, `WorkflowOutputEvent`.

**Text Extraction**: `_extract_text()` defensively extracts text from messages. Priority: `.content` β†’ `.text` β†’ `str(message)`. Handles buggy message objects.

**State Initialization**: Initialize embedding service with graceful fallback. Use `init_magentic_state()` (deprecated).

**Requirements**: Must call `check_magentic_requirements()` in `__init__`. Requires `agent-framework-core` and OpenAI API key.

**Event Types**: Maps agent names to event types: "search" β†’ "search_complete", "judge" β†’ "judge_complete", "hypothes" β†’ "hypothesizing", "report" β†’ "synthesizing".

---

## src/agent_factory/ - Factory Rules

**Pattern**: Factory functions for creating agents and handlers. Lazy initialization for optional dependencies. Support OpenAI/Anthropic/HF Inference.

**Judges**: `create_judge_handler()` creates `JudgeHandler` with structured output (`JudgeAssessment`). Supports `MockJudgeHandler`, `HFInferenceJudgeHandler` as fallbacks.

**Agents**: Factory functions in `agents.py` for all Pydantic AI agents. Pattern: `create_agent_name(model: Any | None = None) -> AgentName`. Use `get_model()` if model not provided.

**Graph Builder**: `graph_builder.py` contains utilities for building research graphs. Supports iterative and deep research graph construction.

**Error Handling**: Raise `ConfigurationError` if required API keys missing. Log agent creation. Handle import errors gracefully.

---

## src/prompts/ - Prompt Rules

**Pattern**: System prompts stored as module-level constants. Include date injection: `datetime.now().strftime("%Y-%m-%d")`. Format evidence with truncation (1500 chars per item).

**Judge Prompts**: In `judge.py`. Handle empty evidence case separately. Always request structured JSON output.

**Hypothesis Prompts**: In `hypothesis.py`. Use diverse evidence selection (MMR algorithm). Sentence-aware truncation.

**Report Prompts**: In `report.py`. Include full citation details. Use diverse evidence selection (n=20). Emphasize citation validation rules.

---

## Testing Rules

**Structure**: Unit tests in `tests/unit/` (mocked, fast). Integration tests in `tests/integration/` (real APIs, marked `@pytest.mark.integration`).

**Mocking**: Use `respx` for httpx mocking. Use `pytest-mock` for general mocking. Mock LLM calls in unit tests (use `MockJudgeHandler`).

**Fixtures**: Common fixtures in `tests/conftest.py`: `mock_httpx_client`, `mock_llm_response`.

**Coverage**: Aim for >80% coverage. Test error handling, edge cases, and integration paths.

---

## File-Specific Agent Rules

**knowledge_gap.py**: Outputs `KnowledgeGapOutput`. System prompt evaluates research completeness. Handles conversation history. Returns fallback on error.

**writer.py**: Returns markdown string. System prompt includes citation format examples. Validates inputs. Truncates long findings. Retry logic for transient failures.

**long_writer.py**: Uses `ReportDraft` input/output. Writes sections iteratively. Reformats references (deduplicates, renumbers). Reformats section headings.

**proofreader.py**: Takes `ReportDraft`, returns polished markdown. Removes duplicates. Adds summary. Preserves references.

**tool_selector.py**: Outputs `AgentSelectionPlan`. System prompt lists available agents (WebSearchAgent, SiteCrawlerAgent, RAGAgent). Guidelines for when to use each.

**thinking.py**: Returns observation string. Generates observations from conversation history. Uses query and background context.

**input_parser.py**: Outputs `ParsedQuery`. Detects research mode (iterative/deep). Extracts entities and research questions. Improves/refines query.