"""OpenRouter API client with improved error handling and retry logic.""" import httpx import asyncio from typing import List, Dict, Any, Optional from .config_improved import ( OPENROUTER_API_KEY, OPENROUTER_API_URL, DEFAULT_TIMEOUT, MAX_RETRIES, RETRY_DELAY ) async def query_model( model: str, messages: List[Dict[str, str]], timeout: float = DEFAULT_TIMEOUT, max_retries: int = MAX_RETRIES ) -> Optional[Dict[str, Any]]: """ Query a single model via OpenRouter API with retry logic. Args: model: OpenRouter model identifier (e.g., "openai/gpt-4o") messages: List of message dicts with 'role' and 'content' timeout: Request timeout in seconds max_retries: Maximum number of retry attempts Returns: Response dict with 'content' and optional 'reasoning_details', or None if failed """ headers = { "Authorization": f"Bearer {OPENROUTER_API_KEY}", "Content-Type": "application/json", "X-Title": "LLM Council", } payload = { "model": model, "messages": messages, } for attempt in range(max_retries + 1): try: async with httpx.AsyncClient(timeout=timeout) as client: response = await client.post(OPENROUTER_API_URL, headers=headers, json=payload) response.raise_for_status() data = response.json() message = data["choices"][0]["message"] return { "content": message.get("content"), "reasoning_details": message.get("reasoning_details") } except httpx.TimeoutException as e: print(f"⏱️ Timeout querying model {model} (attempt {attempt + 1}/{max_retries + 1}): {e}") if attempt < max_retries: await asyncio.sleep(RETRY_DELAY * (attempt + 1)) # Exponential backoff continue return None except httpx.HTTPStatusError as e: print(f"🚫 HTTP error querying model {model}: {e.response.status_code} - {e.response.text}") # Don't retry on 4xx errors (client errors) if 400 <= e.response.status_code < 500: return None # Retry on 5xx errors (server errors) if attempt < max_retries: await asyncio.sleep(RETRY_DELAY * (attempt + 1)) continue return None except Exception as e: print(f"❌ Error querying model {model} (attempt {attempt + 1}/{max_retries + 1}): {e}") if attempt < max_retries: await asyncio.sleep(RETRY_DELAY) continue return None return None async def query_model_stream( model: str, messages: List[Dict[str, str]], timeout: float = DEFAULT_TIMEOUT ): """ Query a model via OpenRouter API and stream the response. Yields content chunks as they arrive. 

async def query_model_stream(
    model: str,
    messages: List[Dict[str, str]],
    timeout: float = DEFAULT_TIMEOUT,
):
    """
    Query a model via OpenRouter API and stream the response.

    Args:
        model: OpenRouter model identifier
        messages: List of message dicts with 'role' and 'content'
        timeout: Request timeout in seconds

    Yields:
        Content chunks as strings; on failure, a final bracketed error string.
    """
    headers = {
        "Authorization": f"Bearer {OPENROUTER_API_KEY}",
        "Content-Type": "application/json",
        "X-Title": "LLM Council",
    }
    payload = {
        "model": model,
        "messages": messages,
        "stream": True,
    }

    try:
        async with httpx.AsyncClient(timeout=timeout) as client:
            async with client.stream("POST", OPENROUTER_API_URL, headers=headers, json=payload) as response:
                response.raise_for_status()
                # OpenRouter streams server-sent events: each payload line is
                # prefixed with "data: " and the stream ends with "[DONE]"
                async for line in response.aiter_lines():
                    if not line.startswith("data: "):
                        continue
                    data_str = line[len("data: "):]
                    if data_str.strip() == "[DONE]":
                        break
                    try:
                        data = json.loads(data_str)
                        content = data["choices"][0]["delta"].get("content")
                        if content:
                            yield content
                    except (json.JSONDecodeError, KeyError, IndexError):
                        # Skip malformed or keep-alive SSE lines
                        pass
    except httpx.TimeoutException as e:
        print(f"⏱️ Timeout streaming model {model}: {e}")
        yield f"\n\n[Error: Request timed out after {timeout}s]"
    except httpx.HTTPStatusError as e:
        print(f"🚫 HTTP error streaming model {model}: {e.response.status_code}")
        yield f"\n\n[Error: HTTP {e.response.status_code}]"
    except Exception as e:
        print(f"❌ Error streaming model {model}: {e}")
        yield f"\n\n[Error: {e}]"


async def query_models_parallel(
    models: List[str],
    messages: List[Dict[str, str]],
    timeout: float = DEFAULT_TIMEOUT,
) -> Dict[str, Optional[Dict[str, Any]]]:
    """
    Query multiple models in parallel with individual error handling.

    Args:
        models: List of OpenRouter model identifiers
        messages: List of message dicts to send to each model
        timeout: Request timeout in seconds

    Returns:
        Dict mapping each model identifier to its response dict (or None if it failed)
    """
    print(f"🚀 Querying {len(models)} models in parallel...")

    # Create one task per model and wait for all of them to complete;
    # return_exceptions=True keeps one failure from cancelling the rest
    tasks = [query_model(model, messages, timeout=timeout) for model in models]
    responses = await asyncio.gather(*tasks, return_exceptions=True)

    # Map models to their responses, handling exceptions individually
    result = {}
    for model, response in zip(models, responses):
        if isinstance(response, Exception):
            print(f"❌ Model {model} raised exception: {response}")
            result[model] = None
        else:
            result[model] = response
            status = "✅" if response else "❌"
            print(f"{status} Model {model} completed")

    successful = sum(1 for r in result.values() if r is not None)
    print(f"📊 {successful}/{len(models)} models responded successfully")
    return result
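
# Usage sketch for the streaming and parallel helpers (illustrative only;
# model identifiers and prompts are hypothetical):
#
#     async def _demo():
#         # Stream one model's answer chunk by chunk
#         async for chunk in query_model_stream(
#             "openai/gpt-4o", [{"role": "user", "content": "Tell a short joke."}]
#         ):
#             print(chunk, end="", flush=True)
#
#         # Fan the same prompt out to several models at once
#         responses = await query_models_parallel(
#             ["openai/gpt-4o", "anthropic/claude-3.5-sonnet"],
#             [{"role": "user", "content": "One-line summary of SSE."}],
#         )
#         for model, response in responses.items():
#             print(model, "->", response["content"] if response else "failed")
#
#     asyncio.run(_demo())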