Phase 2 Implementation Spec: Search Vertical Slice
Goal: Implement the "Eyes and Ears" of the agent by retrieving real biomedical data.
Philosophy: "Real data, mocked connections."
Prerequisite: Phase 1 complete (all tests passing).
1. The Slice Definition
This slice covers:
- Input: A string query (e.g., "metformin Alzheimer's disease").
- Process:
  - Fetch from PubMed (E-utilities API).
  - Fetch from Web (DuckDuckGo).
  - Normalize results into `Evidence` models.
- Output: A list of `Evidence` objects.
Files to Create:
- `src/utils/models.py` - Pydantic models (Evidence, Citation, SearchResult)
- `src/tools/pubmed.py` - PubMed E-utilities tool
- `src/tools/websearch.py` - DuckDuckGo search tool
- `src/tools/search_handler.py` - Orchestrates multiple tools
- `src/tools/__init__.py` - Exports
2. PubMed E-utilities API Reference
Base URL: https://eutils.ncbi.nlm.nih.gov/entrez/eutils/
Key Endpoints
| Endpoint | Purpose | Example |
|---|---|---|
| `esearch.fcgi` | Search for article IDs | `?db=pubmed&term=metformin+alzheimer&retmax=10` |
| `efetch.fcgi` | Fetch article details | `?db=pubmed&id=12345,67890&rettype=abstract&retmode=xml` |
Rate Limiting (CRITICAL!)
NCBI requires rate limiting:
- Without API key: 3 requests/second
- With API key: 10 requests/second
Get a free API key: https://www.ncbi.nlm.nih.gov/account/settings/
```bash
# Add to .env
NCBI_API_KEY=your-key-here  # Optional but recommended
```
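With an API key, the client-side delay could be relaxed to match the 10 requests/second allowance. A sketch of that tweak (hypothetical; the `PubMedTool` below keeps a fixed 0.34 s delay for simplicity):
```python
# Hypothetical: pick the inter-request delay based on key presence.
# 0.1 s with a key (10 req/s); 0.34 s without (~3 req/s).
RATE_LIMIT_DELAY = 0.1 if api_key else 0.34
```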
Example Search Flow
1. esearch: "metformin alzheimer" → [PMID: 12345, 67890, ...]
2. efetch: PMIDs → Full abstracts/metadata
3. Parse XML → Evidence objects
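This flow can be exercised directly with `httpx` before any tool code exists. A minimal sketch, assuming network access (no rate limiting, retries, or error handling here):
```python
"""Minimal E-utilities walkthrough: ESearch for PMIDs, then EFetch for XML."""
import asyncio
import httpx

BASE = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils"

async def demo() -> None:
    async with httpx.AsyncClient(timeout=30.0) as client:
        # Step 1: ESearch - query terms -> list of PMIDs (JSON)
        resp = await client.get(f"{BASE}/esearch.fcgi", params={
            "db": "pubmed", "term": "metformin alzheimer",
            "retmax": 3, "retmode": "json",
        })
        pmids = resp.json()["esearchresult"]["idlist"]
        # Step 2: EFetch - PMIDs -> abstracts and metadata (XML)
        resp = await client.get(f"{BASE}/efetch.fcgi", params={
            "db": "pubmed", "id": ",".join(pmids),
            "rettype": "abstract", "retmode": "xml",
        })
        print(resp.text[:300])  # raw XML; parsing is handled in section 4

asyncio.run(demo())
```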
3. Models (`src/utils/models.py`)
```python
"""Data models for the Search feature."""
from typing import Literal

from pydantic import BaseModel, Field


class Citation(BaseModel):
    """A citation to a source document."""

    source: Literal["pubmed", "web"] = Field(description="Where this came from")
    title: str = Field(min_length=1, max_length=500)
    url: str = Field(description="URL to the source")
    date: str = Field(description="Publication date (YYYY-MM-DD or 'Unknown')")
    authors: list[str] = Field(default_factory=list)

    @property
    def formatted(self) -> str:
        """Format as a citation string."""
        author_str = ", ".join(self.authors[:3])
        if len(self.authors) > 3:
            author_str += " et al."
        return f"{author_str} ({self.date}). {self.title}. {self.source.upper()}"


class Evidence(BaseModel):
    """A piece of evidence retrieved from search."""

    content: str = Field(min_length=1, description="The actual text content")
    citation: Citation
    relevance: float = Field(default=0.0, ge=0.0, le=1.0, description="Relevance score 0-1")

    class Config:
        frozen = True  # Immutable after creation


class SearchResult(BaseModel):
    """Result of a search operation."""

    query: str
    evidence: list[Evidence]
    # Plain str (not Literal) so the handler can record any tool's name,
    # including test doubles like "mock1" in the unit tests below.
    sources_searched: list[str]
    total_found: int
    errors: list[str] = Field(default_factory=list)
```
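A quick usage sketch for these models (all values made up):
```python
from src.utils.models import Citation, Evidence

ev = Evidence(
    content="Metformin shows neuroprotective properties...",
    citation=Citation(
        source="pubmed",
        title="Metformin in Alzheimer's Disease",
        url="https://pubmed.ncbi.nlm.nih.gov/12345678/",
        date="2024-01-15",
        authors=["Smith John", "Doe Jane"],
    ),
)
print(ev.citation.formatted)
# Smith John, Doe Jane (2024-01-15). Metformin in Alzheimer's Disease. PUBMED
```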
4. Tool Protocol (`src/tools/pubmed.py` and `src/tools/websearch.py`)
The Interface (Protocol) - Add to `src/tools/__init__.py`
```python
"""Search tools package."""
from typing import List, Protocol

from src.utils.models import Evidence

# Import implementations
from src.tools.pubmed import PubMedTool
from src.tools.websearch import WebTool
from src.tools.search_handler import SearchHandler

# Re-export
__all__ = ["SearchTool", "PubMedTool", "WebTool", "SearchHandler"]


class SearchTool(Protocol):
    """Protocol defining the interface for all search tools."""

    @property
    def name(self) -> str:
        """Human-readable name of this tool."""
        ...

    async def search(self, query: str, max_results: int = 10) -> List[Evidence]:
        """
        Execute a search and return evidence.

        Args:
            query: The search query string
            max_results: Maximum number of results to return

        Returns:
            List of Evidence objects

        Raises:
            SearchError: If the search fails
            RateLimitError: If we hit rate limits
        """
        ...
```
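Because `SearchTool` is a `Protocol`, conformance is structural: any class with a matching `name` property and `search` coroutine satisfies it, with no inheritance required. A hypothetical stub (useful as a test double):
```python
from src.utils.models import Evidence


class StubTool:
    """Hypothetical do-nothing tool; satisfies SearchTool structurally."""

    @property
    def name(self) -> str:
        return "stub"

    async def search(self, query: str, max_results: int = 10) -> list[Evidence]:
        return []
```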
PubMed Tool Implementation (`src/tools/pubmed.py`)
```python
"""PubMed search tool using NCBI E-utilities."""
import asyncio
from typing import List

import httpx
import xmltodict
from tenacity import retry, stop_after_attempt, wait_exponential

from src.utils.config import settings
from src.utils.exceptions import SearchError, RateLimitError
from src.utils.models import Evidence, Citation


class PubMedTool:
    """Search tool for PubMed/NCBI."""

    BASE_URL = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils"
    RATE_LIMIT_DELAY = 0.34  # ~3 requests/sec without API key

    def __init__(self, api_key: str | None = None):
        self.api_key = api_key or getattr(settings, "ncbi_api_key", None)
        self._last_request_time = 0.0

    @property
    def name(self) -> str:
        return "pubmed"

    async def _rate_limit(self) -> None:
        """Enforce NCBI rate limiting."""
        now = asyncio.get_event_loop().time()
        elapsed = now - self._last_request_time
        if elapsed < self.RATE_LIMIT_DELAY:
            await asyncio.sleep(self.RATE_LIMIT_DELAY - elapsed)
        self._last_request_time = asyncio.get_event_loop().time()

    def _build_params(self, **kwargs) -> dict:
        """Build request params with optional API key."""
        params = {**kwargs, "retmode": "json"}
        if self.api_key:
            params["api_key"] = self.api_key
        return params

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=1, max=10),
        reraise=True,
    )
    async def search(self, query: str, max_results: int = 10) -> List[Evidence]:
        """
        Search PubMed and return evidence.

        1. ESearch: Get PMIDs matching query
        2. EFetch: Get abstracts for those PMIDs
        3. Parse and return Evidence objects
        """
        await self._rate_limit()
        async with httpx.AsyncClient(timeout=30.0) as client:
            # Step 1: Search for PMIDs
            search_params = self._build_params(
                db="pubmed",
                term=query,
                retmax=max_results,
                sort="relevance",
            )
            try:
                search_resp = await client.get(
                    f"{self.BASE_URL}/esearch.fcgi",
                    params=search_params,
                )
                search_resp.raise_for_status()
            except httpx.HTTPStatusError as e:
                if e.response.status_code == 429:
                    raise RateLimitError("PubMed rate limit exceeded")
                raise SearchError(f"PubMed search failed: {e}")

            search_data = search_resp.json()
            pmids = search_data.get("esearchresult", {}).get("idlist", [])
            if not pmids:
                return []

            # Step 2: Fetch abstracts
            await self._rate_limit()
            fetch_params = self._build_params(
                db="pubmed",
                id=",".join(pmids),
                rettype="abstract",
            )
            # Use XML for fetch (more reliable parsing)
            fetch_params["retmode"] = "xml"
            fetch_resp = await client.get(
                f"{self.BASE_URL}/efetch.fcgi",
                params=fetch_params,
            )
            fetch_resp.raise_for_status()

            # Step 3: Parse XML to Evidence
            return self._parse_pubmed_xml(fetch_resp.text)

    def _parse_pubmed_xml(self, xml_text: str) -> List[Evidence]:
        """Parse PubMed XML into Evidence objects."""
        try:
            data = xmltodict.parse(xml_text)
        except Exception as e:
            raise SearchError(f"Failed to parse PubMed XML: {e}")

        articles = data.get("PubmedArticleSet", {}).get("PubmedArticle", [])
        # Handle single article (xmltodict returns dict instead of list)
        if isinstance(articles, dict):
            articles = [articles]

        evidence_list = []
        for article in articles:
            try:
                evidence = self._article_to_evidence(article)
                if evidence:
                    evidence_list.append(evidence)
            except Exception:
                continue  # Skip malformed articles
        return evidence_list

    def _article_to_evidence(self, article: dict) -> Evidence | None:
        """Convert a single PubMed article to Evidence."""
        medline = article.get("MedlineCitation", {})
        article_data = medline.get("Article", {})

        # Extract PMID
        pmid = medline.get("PMID", {})
        if isinstance(pmid, dict):
            pmid = pmid.get("#text", "")

        # Extract title
        title = article_data.get("ArticleTitle", "")
        if isinstance(title, dict):
            title = title.get("#text", str(title))

        # Extract abstract
        abstract_data = article_data.get("Abstract", {}).get("AbstractText", "")
        if isinstance(abstract_data, list):
            abstract = " ".join(
                item.get("#text", str(item)) if isinstance(item, dict) else str(item)
                for item in abstract_data
            )
        elif isinstance(abstract_data, dict):
            abstract = abstract_data.get("#text", str(abstract_data))
        else:
            abstract = str(abstract_data)

        if not abstract or not title:
            return None

        # Extract date
        pub_date = article_data.get("Journal", {}).get("JournalIssue", {}).get("PubDate", {})
        year = pub_date.get("Year", "Unknown")
        month = pub_date.get("Month", "01")
        day = pub_date.get("Day", "01")
        date_str = f"{year}-{month}-{day}" if year != "Unknown" else "Unknown"

        # Extract authors
        author_list = article_data.get("AuthorList", {}).get("Author", [])
        if isinstance(author_list, dict):
            author_list = [author_list]
        authors = []
        for author in author_list[:5]:  # Limit to 5 authors
            last = author.get("LastName", "")
            first = author.get("ForeName", "")
            if last:
                authors.append(f"{last} {first}".strip())

        return Evidence(
            content=abstract[:2000],  # Truncate long abstracts
            citation=Citation(
                source="pubmed",
                title=title[:500],
                url=f"https://pubmed.ncbi.nlm.nih.gov/{pmid}/",
                date=date_str,
                authors=authors,
            ),
        )
```
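A quick manual smoke test for the tool on its own (hits the live NCBI API, so it assumes network access):
```python
import asyncio
from src.tools.pubmed import PubMedTool

async def main() -> None:
    results = await PubMedTool().search("metformin alzheimer", max_results=3)
    for ev in results:
        print(ev.citation.formatted)

asyncio.run(main())
```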
DuckDuckGo Tool Implementation (`src/tools/websearch.py`)
```python
"""Web search tool using DuckDuckGo."""
import asyncio
from typing import List

from duckduckgo_search import DDGS

from src.utils.exceptions import SearchError
from src.utils.models import Evidence, Citation


class WebTool:
    """Search tool for general web search via DuckDuckGo."""

    @property
    def name(self) -> str:
        return "web"

    async def search(self, query: str, max_results: int = 10) -> List[Evidence]:
        """
        Search DuckDuckGo and return evidence.

        Note: duckduckgo-search is synchronous, so we run it in an executor.
        """
        loop = asyncio.get_event_loop()
        try:
            return await loop.run_in_executor(
                None,
                lambda: self._sync_search(query, max_results),
            )
        except Exception as e:
            raise SearchError(f"Web search failed: {e}")

    def _sync_search(self, query: str, max_results: int) -> List[Evidence]:
        """Synchronous search implementation."""
        evidence_list = []
        with DDGS() as ddgs:
            results = list(ddgs.text(query, max_results=max_results))
            for result in results:
                evidence_list.append(
                    Evidence(
                        content=result.get("body", "")[:1000],
                        citation=Citation(
                            source="web",
                            title=result.get("title", "Unknown")[:500],
                            url=result.get("href", ""),
                            date="Unknown",
                            authors=[],
                        ),
                    )
                )
        return evidence_list
```
5. Search Handler (`src/tools/search_handler.py`)
The handler orchestrates multiple tools using the Scatter-Gather pattern: searches fan out to every tool concurrently, and results are gathered without letting one tool's failure sink the rest.
```python
"""Search handler - orchestrates multiple search tools."""
import asyncio
from typing import List, Protocol

import structlog

from src.utils.exceptions import SearchError
from src.utils.models import Evidence, SearchResult

logger = structlog.get_logger()


class SearchTool(Protocol):
    """Protocol defining the interface for all search tools."""

    @property
    def name(self) -> str:
        ...

    async def search(self, query: str, max_results: int = 10) -> List[Evidence]:
        ...


def flatten(nested: List[List[Evidence]]) -> List[Evidence]:
    """Flatten a list of lists into a single list."""
    return [item for sublist in nested for item in sublist]


class SearchHandler:
    """Orchestrates parallel searches across multiple tools."""

    def __init__(self, tools: List[SearchTool], timeout: float = 30.0):
        """
        Initialize the search handler.

        Args:
            tools: List of search tools to use
            timeout: Timeout for each search in seconds
        """
        self.tools = tools
        self.timeout = timeout

    async def execute(self, query: str, max_results_per_tool: int = 10) -> SearchResult:
        """
        Execute search across all tools in parallel.

        Args:
            query: The search query
            max_results_per_tool: Max results from each tool

        Returns:
            SearchResult containing all evidence and metadata
        """
        logger.info("Starting search", query=query, tools=[t.name for t in self.tools])

        # Create tasks for parallel execution
        tasks = [
            self._search_with_timeout(tool, query, max_results_per_tool)
            for tool in self.tools
        ]

        # Gather results (don't fail if one tool fails)
        results = await asyncio.gather(*tasks, return_exceptions=True)

        # Process results
        all_evidence: List[Evidence] = []
        sources_searched: List[str] = []
        errors: List[str] = []

        for tool, result in zip(self.tools, results):
            if isinstance(result, Exception):
                errors.append(f"{tool.name}: {str(result)}")
                logger.warning("Search tool failed", tool=tool.name, error=str(result))
            else:
                all_evidence.extend(result)
                sources_searched.append(tool.name)
                logger.info("Search tool succeeded", tool=tool.name, count=len(result))

        return SearchResult(
            query=query,
            evidence=all_evidence,
            sources_searched=sources_searched,
            total_found=len(all_evidence),
            errors=errors,
        )

    async def _search_with_timeout(
        self,
        tool: SearchTool,
        query: str,
        max_results: int,
    ) -> List[Evidence]:
        """Execute a single tool search with timeout."""
        try:
            return await asyncio.wait_for(
                tool.search(query, max_results),
                timeout=self.timeout,
            )
        except asyncio.TimeoutError:
            raise SearchError(f"{tool.name} search timed out after {self.timeout}s")
```
6. TDD Workflow
Test File: `tests/unit/tools/test_pubmed.py`
```python
"""Unit tests for PubMed tool."""
import pytest
from unittest.mock import AsyncMock, MagicMock

# Sample PubMed XML response for mocking
SAMPLE_PUBMED_XML = """<?xml version="1.0" ?>
<PubmedArticleSet>
  <PubmedArticle>
    <MedlineCitation>
      <PMID>12345678</PMID>
      <Article>
        <ArticleTitle>Metformin in Alzheimer's Disease: A Systematic Review</ArticleTitle>
        <Abstract>
          <AbstractText>Metformin shows neuroprotective properties...</AbstractText>
        </Abstract>
        <AuthorList>
          <Author>
            <LastName>Smith</LastName>
            <ForeName>John</ForeName>
          </Author>
        </AuthorList>
        <Journal>
          <JournalIssue>
            <PubDate>
              <Year>2024</Year>
              <Month>01</Month>
            </PubDate>
          </JournalIssue>
        </Journal>
      </Article>
    </MedlineCitation>
  </PubmedArticle>
</PubmedArticleSet>
"""


class TestPubMedTool:
    """Tests for PubMedTool."""

    @pytest.mark.asyncio
    async def test_search_returns_evidence(self, mocker):
        """PubMedTool should return Evidence objects from search."""
        from src.tools.pubmed import PubMedTool

        # Mock the HTTP responses
        mock_search_response = MagicMock()
        mock_search_response.json.return_value = {
            "esearchresult": {"idlist": ["12345678"]}
        }
        mock_search_response.raise_for_status = MagicMock()

        mock_fetch_response = MagicMock()
        mock_fetch_response.text = SAMPLE_PUBMED_XML
        mock_fetch_response.raise_for_status = MagicMock()

        mock_client = AsyncMock()
        mock_client.get = AsyncMock(side_effect=[mock_search_response, mock_fetch_response])
        mock_client.__aenter__ = AsyncMock(return_value=mock_client)
        mock_client.__aexit__ = AsyncMock(return_value=None)
        mocker.patch("httpx.AsyncClient", return_value=mock_client)

        # Act
        tool = PubMedTool()
        results = await tool.search("metformin alzheimer")

        # Assert
        assert len(results) == 1
        assert results[0].citation.source == "pubmed"
        assert "Metformin" in results[0].citation.title
        assert "12345678" in results[0].citation.url

    @pytest.mark.asyncio
    async def test_search_empty_results(self, mocker):
        """PubMedTool should return empty list when no results."""
        from src.tools.pubmed import PubMedTool

        mock_response = MagicMock()
        mock_response.json.return_value = {"esearchresult": {"idlist": []}}
        mock_response.raise_for_status = MagicMock()

        mock_client = AsyncMock()
        mock_client.get = AsyncMock(return_value=mock_response)
        mock_client.__aenter__ = AsyncMock(return_value=mock_client)
        mock_client.__aexit__ = AsyncMock(return_value=None)
        mocker.patch("httpx.AsyncClient", return_value=mock_client)

        tool = PubMedTool()
        results = await tool.search("xyznonexistentquery123")

        assert results == []

    def test_parse_pubmed_xml(self):
        """PubMedTool should correctly parse XML."""
        from src.tools.pubmed import PubMedTool

        tool = PubMedTool()
        results = tool._parse_pubmed_xml(SAMPLE_PUBMED_XML)

        assert len(results) == 1
        assert results[0].citation.source == "pubmed"
        assert "Smith John" in results[0].citation.authors
```
Test File: `tests/unit/tools/test_websearch.py`
```python
"""Unit tests for WebTool."""
import pytest
from unittest.mock import MagicMock


class TestWebTool:
    """Tests for WebTool."""

    @pytest.mark.asyncio
    async def test_search_returns_evidence(self, mocker):
        """WebTool should return Evidence objects from search."""
        from src.tools.websearch import WebTool

        mock_results = [
            {
                "title": "Drug Repurposing Article",
                "href": "https://example.com/article",
                "body": "Some content about drug repurposing...",
            }
        ]
        mock_ddgs = MagicMock()
        mock_ddgs.__enter__ = MagicMock(return_value=mock_ddgs)
        mock_ddgs.__exit__ = MagicMock(return_value=None)
        mock_ddgs.text = MagicMock(return_value=mock_results)
        mocker.patch("src.tools.websearch.DDGS", return_value=mock_ddgs)

        tool = WebTool()
        results = await tool.search("drug repurposing")

        assert len(results) == 1
        assert results[0].citation.source == "web"
        assert "Drug Repurposing" in results[0].citation.title
```
Test File: `tests/unit/tools/test_search_handler.py`
```python
"""Unit tests for SearchHandler."""
import pytest
from unittest.mock import AsyncMock

from src.utils.models import Evidence, Citation
from src.utils.exceptions import SearchError


class TestSearchHandler:
    """Tests for SearchHandler."""

    @pytest.mark.asyncio
    async def test_execute_aggregates_results(self):
        """SearchHandler should aggregate results from all tools."""
        from src.tools.search_handler import SearchHandler

        # Create mock tools
        mock_tool_1 = AsyncMock()
        mock_tool_1.name = "mock1"
        mock_tool_1.search = AsyncMock(return_value=[
            Evidence(
                content="Result 1",
                citation=Citation(source="pubmed", title="T1", url="u1", date="2024"),
            )
        ])

        mock_tool_2 = AsyncMock()
        mock_tool_2.name = "mock2"
        mock_tool_2.search = AsyncMock(return_value=[
            Evidence(
                content="Result 2",
                citation=Citation(source="web", title="T2", url="u2", date="2024"),
            )
        ])

        handler = SearchHandler(tools=[mock_tool_1, mock_tool_2])
        result = await handler.execute("test query")

        assert result.total_found == 2
        assert "mock1" in result.sources_searched
        assert "mock2" in result.sources_searched
        assert len(result.errors) == 0

    @pytest.mark.asyncio
    async def test_execute_handles_tool_failure(self):
        """SearchHandler should continue if one tool fails."""
        from src.tools.search_handler import SearchHandler

        mock_tool_ok = AsyncMock()
        mock_tool_ok.name = "ok_tool"
        mock_tool_ok.search = AsyncMock(return_value=[
            Evidence(
                content="Good result",
                citation=Citation(source="pubmed", title="T", url="u", date="2024"),
            )
        ])

        mock_tool_fail = AsyncMock()
        mock_tool_fail.name = "fail_tool"
        mock_tool_fail.search = AsyncMock(side_effect=SearchError("API down"))

        handler = SearchHandler(tools=[mock_tool_ok, mock_tool_fail])
        result = await handler.execute("test")

        assert result.total_found == 1
        assert "ok_tool" in result.sources_searched
        assert len(result.errors) == 1
        assert "fail_tool" in result.errors[0]
```
7. Integration Test (Optional, Real API)
```python
# tests/integration/test_pubmed_live.py
"""Integration tests that hit real APIs (run manually)."""
import pytest


@pytest.mark.integration
@pytest.mark.slow
@pytest.mark.asyncio
async def test_pubmed_live_search():
    """Test real PubMed search (requires network)."""
    from src.tools.pubmed import PubMedTool

    tool = PubMedTool()
    results = await tool.search("metformin diabetes", max_results=3)

    assert len(results) > 0
    assert results[0].citation.source == "pubmed"
    assert "pubmed.ncbi.nlm.nih.gov" in results[0].citation.url

# Run with: uv run pytest tests/integration -m integration
```
8. Implementation Checklist
- Create `src/utils/models.py` with all Pydantic models (Evidence, Citation, SearchResult)
- Create `src/tools/__init__.py` with SearchTool Protocol and exports
- Implement `src/tools/pubmed.py` with PubMedTool class
- Implement `src/tools/websearch.py` with WebTool class
- Create `src/tools/search_handler.py` with SearchHandler class
- Write tests in `tests/unit/tools/test_pubmed.py`
- Write tests in `tests/unit/tools/test_websearch.py`
- Write tests in `tests/unit/tools/test_search_handler.py`
- Run `uv run pytest tests/unit/tools/ -v` (ALL TESTS MUST PASS)
- (Optional) Run integration test: `uv run pytest -m integration`
- Commit: `git commit -m "feat: phase 2 search slice complete"`
9. Definition of Done
Phase 2 is COMPLETE when:
- All unit tests pass: `uv run pytest tests/unit/tools/ -v`
- `SearchHandler` can execute with both tools
- Graceful degradation: if PubMed fails, WebTool results still return
- Rate limiting is enforced (verify no 429 errors)
- Can run this in a Python REPL:
```python
import asyncio
from src.tools.pubmed import PubMedTool
from src.tools.websearch import WebTool
from src.tools.search_handler import SearchHandler

async def test():
    handler = SearchHandler([PubMedTool(), WebTool()])
    result = await handler.execute("metformin alzheimer")
    print(f"Found {result.total_found} results")
    for e in result.evidence[:3]:
        print(f"- {e.citation.title}")

asyncio.run(test())
```
Proceed to Phase 3 ONLY after all checkboxes are complete.