# BirdScopeAI / langgraph_agent / subagent_router.py (legacy)
# Provenance: initial commit to HF Space for hackathon (ff0e97f, verified)
"""
Subagent Router
Orchestrates routing between specialized subagents using LangGraph's
delegation pattern.
"""
from typing import Dict, Any, List, Literal
from langchain_core.language_models import BaseChatModel
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage
from langgraph.graph import StateGraph, MessagesState, START, END
from langgraph.prebuilt import ToolNode
from langgraph.checkpoint.memory import InMemorySaver
from .subagent_config import SubAgentConfig
from .subagent_factory import SubAgentFactory
async def create_router_agent(all_tools: List[Any], llm: BaseChatModel):
    """
    Create a router agent that orchestrates specialized subagents.

    The compiled graph runs a router node first (which prepends routing
    instructions for the LLM), then hands the conversation to exactly one
    specialist node chosen by keyword matching on the latest message.

    Args:
        all_tools: Full list of available MCP tools
        llm: Language model for the router

    Returns:
        Compiled LangGraph workflow with in-memory conversation checkpointing
    """
    async def router_node(state: MessagesState):
        """Main router that delegates to subagents."""
        # Get routing instructions
        router_prompt = SubAgentConfig.get_router_prompt()
        # Prepend a system message so the LLM sees the delegation options.
        messages = [SystemMessage(content=router_prompt)] + state["messages"]
        # Router decides which subagent to use
        response = await llm.ainvoke(messages)
        # Extract subagent name from response (you could make this more sophisticated)
        # For now, the router will use tools to delegate
        return {"messages": [response]}

    # Cache of lazily-built subagents: each specialist is constructed once per
    # compiled graph instead of on every invocation (factory calls are not free).
    subagent_cache: Dict[str, Any] = {}

    async def create_subagent_node(subagent_name: str):
        """Create a graph node that runs the named specialized subagent."""
        async def subagent_node(state: MessagesState):
            # Build the specialized subagent on first use, then reuse it.
            if subagent_name not in subagent_cache:
                subagent_cache[subagent_name] = await SubAgentFactory.create_subagent(
                    subagent_name, all_tools, llm
                )
            # Run the subagent on the current conversation state.
            return await subagent_cache[subagent_name].ainvoke(state)
        return subagent_node

    # Build the graph
    workflow = StateGraph(MessagesState)
    # Add nodes
    workflow.add_node("router", router_node)
    workflow.add_node("image_identifier", await create_subagent_node("image_identifier"))
    workflow.add_node("species_explorer", await create_subagent_node("species_explorer"))
    workflow.add_node("taxonomy_specialist", await create_subagent_node("taxonomy_specialist"))

    # Define routing logic
    def route_to_specialist(state: MessagesState) -> Literal["image_identifier", "species_explorer", "taxonomy_specialist", END]:
        """Route based on last message content."""
        last_message = state["messages"][-1]
        # message.content may be a plain string or a list of content blocks
        # (e.g. multimodal messages); coerce to text before keyword matching,
        # otherwise .lower() raises AttributeError on list content.
        raw = last_message.content
        if isinstance(raw, list):
            raw = " ".join(
                part.get("text", "") if isinstance(part, dict) else str(part)
                for part in raw
            )
        content = raw.lower()
        # Simple keyword-based routing (could be improved with LLM classification)
        if any(word in content for word in ["identify", "what bird", "classify", "image", "photo"]):
            return "image_identifier"
        elif any(word in content for word in ["audio", "sound", "call", "song", "find", "search"]):
            return "species_explorer"
        elif any(word in content for word in ["family", "families", "conservation", "endangered", "taxonomy"]):
            return "taxonomy_specialist"
        else:
            # Default to species explorer for general queries
            return "species_explorer"

    # Connect nodes: START -> router -> one specialist -> END
    workflow.add_edge(START, "router")
    workflow.add_conditional_edges("router", route_to_specialist)
    workflow.add_edge("image_identifier", END)
    workflow.add_edge("species_explorer", END)
    workflow.add_edge("taxonomy_specialist", END)

    # Compile with memory for conversation context
    return workflow.compile(checkpointer=InMemorySaver())