"""API client for HuggingFace Inference API and OpenAI."""
import httpx
import asyncio
from typing import List, Dict, Any, Optional
from .config_free import (
OPENAI_API_KEY,
HUGGINGFACE_API_KEY,
DEFAULT_TIMEOUT,
MAX_RETRIES,
RETRY_DELAY
)


async def query_openai_model(
model: str,
messages: List[Dict[str, str]],
timeout: float = DEFAULT_TIMEOUT,
max_retries: int = MAX_RETRIES
) -> Optional[Dict[str, Any]]:
"""
Query an OpenAI model.
Args:
model: OpenAI model name (e.g., "gpt-4o-mini")
messages: List of message dicts with 'role' and 'content'
timeout: Request timeout in seconds
max_retries: Maximum retry attempts
Returns:
Response dict with 'content', or None if failed
"""
headers = {
"Authorization": f"Bearer {OPENAI_API_KEY}",
"Content-Type": "application/json",
}
payload = {
"model": model,
"messages": messages,
"temperature": 0.7,
}
for attempt in range(max_retries + 1):
try:
async with httpx.AsyncClient(timeout=timeout) as client:
response = await client.post(
"https://api.openai.com/v1/chat/completions",
headers=headers,
json=payload
)
response.raise_for_status()
data = response.json()
content = data["choices"][0]["message"]["content"]
return {"content": content}
        except httpx.TimeoutException:
print(f"⏱️ Timeout querying OpenAI {model} (attempt {attempt + 1}/{max_retries + 1})")
if attempt < max_retries:
await asyncio.sleep(RETRY_DELAY * (attempt + 1))
continue
return None
except httpx.HTTPStatusError as e:
print(f"🚫 HTTP error querying OpenAI {model}: {e.response.status_code}")
if 400 <= e.response.status_code < 500:
return None
if attempt < max_retries:
await asyncio.sleep(RETRY_DELAY * (attempt + 1))
continue
return None
except Exception as e:
print(f"❌ Error querying OpenAI {model}: {e}")
if attempt < max_retries:
await asyncio.sleep(RETRY_DELAY)
continue
return None
return None
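

# Illustrative usage sketch (the values below are only examples, not part of
# this module): from an async context one could do, e.g.,
#   result = await query_openai_model("gpt-4o-mini",
#                                      [{"role": "user", "content": "Hello"}])
#   text = result["content"] if result else None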


async def query_huggingface_model(
model: str,
messages: List[Dict[str, str]],
timeout: float = DEFAULT_TIMEOUT,
max_retries: int = MAX_RETRIES
) -> Optional[Dict[str, Any]]:
"""
Query a HuggingFace model via Router (FREE).
Args:
model: HuggingFace model ID (e.g., "meta-llama/Llama-3.3-70B-Instruct")
messages: List of message dicts with 'role' and 'content'
timeout: Request timeout in seconds
max_retries: Maximum retry attempts
Returns:
Response dict with 'content', or None if failed
"""
headers = {
"Authorization": f"Bearer {HUGGINGFACE_API_KEY}",
"Content-Type": "application/json",
}
# Use OpenAI-compatible format for HuggingFace Router
payload = {
"model": model,
"messages": messages,
"max_tokens": 2048,
"temperature": 0.7,
"top_p": 0.9,
}
# Updated to use router.huggingface.co (new endpoint)
api_url = "https://router.huggingface.co/v1/chat/completions"
for attempt in range(max_retries + 1):
try:
async with httpx.AsyncClient(timeout=timeout) as client:
response = await client.post(api_url, headers=headers, json=payload)
response.raise_for_status()
data = response.json()
# Parse OpenAI-compatible response format
if "choices" in data and len(data["choices"]) > 0:
content = data["choices"][0]["message"]["content"]
return {"content": content}
else:
print(f"❌ Unexpected response format from HF {model}: {data}")
return None
        except httpx.TimeoutException:
print(f"⏱️ Timeout querying HF {model} (attempt {attempt + 1}/{max_retries + 1})")
if attempt < max_retries:
await asyncio.sleep(RETRY_DELAY * (attempt + 1))
continue
return None
except httpx.HTTPStatusError as e:
error_msg = e.response.text
print(f"🚫 HTTP {e.response.status_code} querying HF {model}: {error_msg[:200]}")
            # Model is loading / warming up - retry after a longer wait
            if "loading" in error_msg.lower() or "warming up" in error_msg.lower():
                if attempt < max_retries:
                    print("⏳ Model is loading, waiting 20s before retrying...")
                    await asyncio.sleep(20)
                    continue
                return None
# Don't retry on client errors (except loading)
if 400 <= e.response.status_code < 500:
return None
if attempt < max_retries:
await asyncio.sleep(RETRY_DELAY * (attempt + 1))
continue
return None
except Exception as e:
print(f"❌ Error querying HF {model}: {e}")
if attempt < max_retries:
await asyncio.sleep(RETRY_DELAY)
continue
return None
return None


async def query_model(
model_config: Dict[str, str],
messages: List[Dict[str, str]],
timeout: float = DEFAULT_TIMEOUT
) -> Optional[Dict[str, Any]]:
"""
Query a model based on its configuration (provider-agnostic).
Args:
model_config: Dict with 'provider' and 'model' keys
messages: List of message dicts
timeout: Request timeout
Returns:
Response dict or None
"""
provider = model_config["provider"]
model = model_config["model"]
if provider == "openai":
return await query_openai_model(model, messages, timeout)
elif provider == "huggingface":
return await query_huggingface_model(model, messages, timeout)
else:
print(f"❌ Unknown provider: {provider}")
return None
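

# Illustrative note: a model_config passed to query_model is expected to hold
# at least {"provider": "openai" | "huggingface", "model": "<model name>"};
# query_models_parallel below additionally requires an "id" key, e.g.
#   {"id": "llama-3.3-70b", "provider": "huggingface",
#    "model": "meta-llama/Llama-3.3-70B-Instruct"}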


async def query_model_stream(
model_config: Dict[str, str],
messages: List[Dict[str, str]],
timeout: float = DEFAULT_TIMEOUT
):
"""
Query a model and stream the response.
Args:
model_config: Dict with 'provider' and 'model' keys
messages: List of message dicts
timeout: Request timeout
Yields:
Content chunks
"""
provider = model_config["provider"]
model = model_config["model"]
if provider == "openai":
async for chunk in stream_openai_model(model, messages, timeout):
yield chunk
elif provider == "huggingface":
        # HF Inference API doesn't support streaming well, so fall back to the full response
response = await query_huggingface_model(model, messages, timeout)
if response:
yield response["content"]
else:
yield "[Error: Failed to get response]"
else:
yield f"[Error: Unknown provider {provider}]"


async def stream_openai_model(
model: str,
messages: List[Dict[str, str]],
timeout: float = DEFAULT_TIMEOUT
):
"""Stream OpenAI model response."""
headers = {
"Authorization": f"Bearer {OPENAI_API_KEY}",
"Content-Type": "application/json",
}
payload = {
"model": model,
"messages": messages,
"temperature": 0.7,
"stream": True,
}
try:
async with httpx.AsyncClient(timeout=timeout) as client:
async with client.stream(
"POST",
"https://api.openai.com/v1/chat/completions",
headers=headers,
json=payload
) as response:
response.raise_for_status()
async for line in response.aiter_lines():
if line.startswith("data: "):
data_str = line[6:]
if data_str.strip() == "[DONE]":
break
try:
data = json.loads(data_str)
delta = data["choices"][0]["delta"]
content = delta.get("content")
if content:
yield content
except (json.JSONDecodeError, KeyError):
pass
except Exception as e:
print(f"❌ Error streaming OpenAI {model}: {e}")
yield f"\n[Error: {str(e)}]"


async def query_models_parallel(
model_configs: List[Dict[str, str]],
messages: List[Dict[str, str]],
timeout: float = DEFAULT_TIMEOUT
) -> Dict[str, Optional[Dict[str, Any]]]:
"""
Query multiple models in parallel.
Args:
model_configs: List of model config dicts
messages: Messages to send to each model
timeout: Request timeout
Returns:
Dict mapping model ID to response
"""
print(f"πŸš€ Querying {len(model_configs)} models in parallel...")
tasks = [query_model(config, messages, timeout) for config in model_configs]
responses = await asyncio.gather(*tasks, return_exceptions=True)
result = {}
for config, response in zip(model_configs, responses):
model_id = config["id"]
if isinstance(response, Exception):
print(f"❌ Model {model_id} raised exception: {response}")
result[model_id] = None
else:
result[model_id] = response
status = "βœ…" if response else "❌"
print(f"{status} Model {model_id} completed")
successful = sum(1 for r in result.values() if r is not None)
print(f"πŸ“Š {successful}/{len(model_configs)} models responded successfully")
return result
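

if __name__ == "__main__":
    # Minimal manual test sketch, not part of the original module. It assumes
    # OPENAI_API_KEY / HUGGINGFACE_API_KEY are configured in config_free, and
    # the model IDs below are illustrative only.
    async def _demo() -> None:
        configs = [
            {"id": "gpt-4o-mini", "provider": "openai", "model": "gpt-4o-mini"},
            {
                "id": "llama-3.3-70b",
                "provider": "huggingface",
                "model": "meta-llama/Llama-3.3-70B-Instruct",
            },
        ]
        messages = [{"role": "user", "content": "Say hello in one sentence."}]
        results = await query_models_parallel(configs, messages)
        for model_id, response in results.items():
            print(f"{model_id}: {response['content'] if response else 'no response'}")

    asyncio.run(_demo())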