Spaces:
Running
Running
"""
Subagent Router

Orchestrates routing between specialized subagents using LangGraph's
delegation pattern.
"""
| from typing import Dict, Any, List, Literal | |
| from langchain_core.language_models import BaseChatModel | |
| from langchain_core.messages import HumanMessage, AIMessage, SystemMessage | |
| from langgraph.graph import StateGraph, MessagesState, START, END | |
| from langgraph.prebuilt import ToolNode | |
| from langgraph.checkpoint.memory import InMemorySaver | |
| from .subagent_config import SubAgentConfig | |
| from .subagent_factory import SubAgentFactory | |
# Ordered keyword rules for picking a specialist: the first rule containing a
# keyword that occurs in the message text wins, so rule order is significant.
_ROUTING_RULES = (
    ("image_identifier", ("identify", "what bird", "classify", "image", "photo")),
    ("species_explorer", ("audio", "sound", "call", "song", "find", "search")),
    (
        "taxonomy_specialist",
        ("family", "families", "conservation", "endangered", "taxonomy"),
    ),
)

# Specialist used when no keyword rule matches (general queries).
_DEFAULT_SPECIALIST = "species_explorer"


def _message_text(message: Any) -> str:
    """Return the textual content of a chat message as a single string.

    Message ``content`` may be a plain string or a list of content blocks
    (bare strings and/or dicts carrying a ``"text"`` entry). Blocks without
    text (e.g. image or tool-use blocks) are ignored. Any other content
    shape yields an empty string.
    """
    content = getattr(message, "content", "")
    if isinstance(content, str):
        return content
    if isinstance(content, list):
        parts = []
        for block in content:
            if isinstance(block, str):
                parts.append(block)
            elif isinstance(block, dict) and isinstance(block.get("text"), str):
                parts.append(block["text"])
        return " ".join(parts)
    return ""


def _select_specialist(text: str) -> str:
    """Return the subagent name whose keywords first match ``text``.

    Matching is a case-insensitive substring test against ``_ROUTING_RULES``
    in order; ``_DEFAULT_SPECIALIST`` is returned when nothing matches.
    """
    lowered = text.lower()
    for specialist, keywords in _ROUTING_RULES:
        if any(keyword in lowered for keyword in keywords):
            return specialist
    return _DEFAULT_SPECIALIST


async def create_router_agent(all_tools: List[Any], llm: BaseChatModel):
    """
    Create a router agent that orchestrates specialized subagents.

    The graph runs ``START -> router -> <one specialist> -> END``. The router
    node calls ``llm`` with the routing prompt, and the specialist is then
    chosen by keyword-matching the text of the last message in the state.

    Args:
        all_tools: Full list of available MCP tools
        llm: Language model for the router

    Returns:
        Compiled LangGraph workflow
    """

    async def router_node(state: MessagesState):
        """Main router that delegates to subagents."""
        # Prepend the routing instructions as a system message.
        router_prompt = SubAgentConfig.get_router_prompt()
        messages = [SystemMessage(content=router_prompt)] + state["messages"]
        # The router's reply is appended to the state; route_to_specialist
        # inspects it afterwards to choose the specialist.
        response = await llm.ainvoke(messages)
        return {"messages": [response]}

    def create_subagent_node(subagent_name: str):
        """Create a node function for a specific subagent."""

        async def subagent_node(state: MessagesState):
            # The subagent is built on every invocation of the node.
            # NOTE(review): if SubAgentFactory.create_subagent returns a
            # reusable object, caching it here would avoid repeated setup.
            subagent = await SubAgentFactory.create_subagent(
                subagent_name, all_tools, llm
            )
            return await subagent.ainvoke(state)

        return subagent_node

    def route_to_specialist(
        state: MessagesState,
    ) -> Literal["image_identifier", "species_explorer", "taxonomy_specialist", END]:
        """Route based on last message content."""
        # Simple keyword-based routing (could be improved with LLM
        # classification). Content is flattened to text first because it is
        # not guaranteed to be a plain string.
        return _select_specialist(_message_text(state["messages"][-1]))

    # Build the graph
    workflow = StateGraph(MessagesState)
    workflow.add_node("router", router_node)

    specialist_names = tuple(name for name, _ in _ROUTING_RULES)
    for name in specialist_names:
        workflow.add_node(name, create_subagent_node(name))

    # Connect nodes: every specialist terminates the run.
    workflow.add_edge(START, "router")
    workflow.add_conditional_edges("router", route_to_specialist)
    for name in specialist_names:
        workflow.add_edge(name, END)

    # Compile with memory for conversation context
    return workflow.compile(checkpointer=InMemorySaver())