Spaces:
Runtime error
Runtime error
| # ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. ========= | |
| # Licensed under the Apache License, Version 2.0 (the "License"); | |
| # you may not use this file except in compliance with the License. | |
| # You may obtain a copy of the License at | |
| # | |
| # http://www.apache.org/licenses/LICENSE-2.0 | |
| # | |
| # Unless required by applicable law or agreed to in writing, software | |
| # distributed under the License is distributed on an "AS IS" BASIS, | |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
| # See the License for the specific language governing permissions and | |
| # limitations under the License. | |
| # ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. ========= | |
| import json | |
| import os | |
| import time | |
| import uuid | |
| from typing import Any, Dict, List, Optional, Union | |
| import httpx | |
| from openai import OpenAI, Stream | |
| from camel.configs import ( | |
| SAMBA_CLOUD_API_PARAMS, | |
| SAMBA_VERSE_API_PARAMS, | |
| SambaCloudAPIConfig, | |
| ) | |
| from camel.messages import OpenAIMessage | |
| from camel.models import BaseModelBackend | |
| from camel.types import ( | |
| ChatCompletion, | |
| ChatCompletionChunk, | |
| CompletionUsage, | |
| ModelType, | |
| ) | |
| from camel.utils import ( | |
| BaseTokenCounter, | |
| OpenAITokenCounter, | |
| api_keys_required, | |
| ) | |
# AgentOps tracking is opt-in: it is wired up only when an API key is present
# in the environment AND the package imports cleanly. In every other case both
# names are bound to ``None`` so call sites can guard with a truthiness check
# and never hit an unbound ``record``.
try:
    if os.getenv("AGENTOPS_API_KEY") is not None:
        from agentops import LLMEvent, record
    else:
        # Reuse the except-branch below as the single "disabled" path.
        raise ImportError
except (ImportError, AttributeError):
    LLMEvent = None
    record = None
class SambaModel(BaseModelBackend):
    r"""SambaNova service interface.

    Args:
        model_type (Union[ModelType, str]): Model for which a SambaNova backend
            is created. Supported models via SambaNova Cloud:
            `https://community.sambanova.ai/t/supported-models/193`.
            Supported models via SambaVerse API are listed in
            `https://sambaverse.sambanova.ai/models`.
        model_config_dict (Optional[Dict[str, Any]], optional): A dictionary
            that will be fed into :obj:`openai.ChatCompletion.create()`. If
            :obj:`None`, :obj:`SambaCloudAPIConfig().as_dict()` will be used.
            (default: :obj:`None`)
        api_key (Optional[str], optional): The API key for authenticating
            with the SambaNova service. Falls back to the ``SAMBA_API_KEY``
            environment variable. (default: :obj:`None`)
        url (Optional[str], optional): The url to the SambaNova service.
            Falls back to the ``SAMBA_API_BASE_URL`` environment variable.
            Currently supported: SambaVerse API
            :obj:`"https://sambaverse.sambanova.ai/api/predict"` and
            SambaNova Cloud
            :obj:`"https://api.sambanova.ai/v1"`
            (default: :obj:`https://api.sambanova.ai/v1`)
        token_counter (Optional[BaseTokenCounter], optional): Token counter to
            use for the model. If not provided, :obj:`OpenAITokenCounter(
            ModelType.GPT_4O_MINI)` will be used.
    """

    # The two endpoints this backend dispatches on. Kept in one place so the
    # comparisons below cannot drift apart.
    _CLOUD_URL = "https://api.sambanova.ai/v1"
    _SAMBAVERSE_URL = "https://sambaverse.sambanova.ai/api/predict"

    def __init__(
        self,
        model_type: Union[ModelType, str],
        model_config_dict: Optional[Dict[str, Any]] = None,
        api_key: Optional[str] = None,
        url: Optional[str] = None,
        token_counter: Optional[BaseTokenCounter] = None,
    ) -> None:
        if model_config_dict is None:
            model_config_dict = SambaCloudAPIConfig().as_dict()
        # Explicit arguments win; the environment is only a fallback.
        api_key = api_key or os.environ.get("SAMBA_API_KEY")
        url = url or os.environ.get(
            "SAMBA_API_BASE_URL",
            self._CLOUD_URL,
        )
        super().__init__(
            model_type, model_config_dict, api_key, url, token_counter
        )

        # Only the Cloud endpoint is OpenAI-compatible; the SambaVerse path
        # issues raw HTTP requests and therefore needs no client object.
        if self._url == self._CLOUD_URL:
            self._client = OpenAI(
                timeout=180,
                max_retries=3,
                base_url=self._url,
                api_key=self._api_key,
            )

    # NOTE(review): this is a plain method in the text under review; the
    # unused `api_keys_required` import hints that decorator lines may have
    # been lost in extraction - confirm whether this should be a property.
    def token_counter(self) -> BaseTokenCounter:
        r"""Initialize the token counter for the model backend.

        Returns:
            BaseTokenCounter: The token counter following the model's
                tokenization style.
        """
        # Created lazily on first use when none was supplied.
        if not self._token_counter:
            self._token_counter = OpenAITokenCounter(ModelType.GPT_4O_MINI)
        return self._token_counter

    def check_model_config(self):
        r"""Check whether the model configuration contains any
        unexpected arguments to SambaNova API.

        Raises:
            ValueError: If the model configuration dictionary contains any
                unexpected arguments to SambaNova API, or if the configured
                url is neither of the two supported endpoints.
        """
        if self._url == self._SAMBAVERSE_URL:
            allowed_params, api_name = SAMBA_VERSE_API_PARAMS, "SambaVerse"
        elif self._url == self._CLOUD_URL:
            allowed_params, api_name = SAMBA_CLOUD_API_PARAMS, "SambaCloud"
        else:
            raise ValueError(
                f"{self._url} is not supported, please check the url to the"
                " SambaNova service"
            )

        for param in self.model_config_dict:
            if param not in allowed_params:
                raise ValueError(
                    f"Unexpected argument `{param}` is "
                    f"input into {api_name} API."
                )

    def run(  # type: ignore[misc]
        self, messages: List[OpenAIMessage]
    ) -> Union[ChatCompletion, Stream[ChatCompletionChunk]]:
        r"""Runs SambaNova's service.

        Args:
            messages (List[OpenAIMessage]): Message list with the chat history
                in OpenAI API format.

        Returns:
            Union[ChatCompletion, Stream[ChatCompletionChunk]]:
                `ChatCompletion` in the non-stream mode, or
                `Stream[ChatCompletionChunk]` in the stream mode.
        """
        # `tools` is stripped from the stored configuration before the
        # request is built. Note this mutates `self.model_config_dict`.
        if "tools" in self.model_config_dict:
            del self.model_config_dict["tools"]

        if self.model_config_dict.get("stream") is True:
            return self._run_streaming(messages)
        return self._run_non_streaming(messages)

    def _run_streaming(
        self, messages: List[OpenAIMessage]
    ) -> Stream[ChatCompletionChunk]:
        r"""Handles streaming inference with SambaNova's API.

        Args:
            messages (List[OpenAIMessage]): A list of messages representing the
                chat history in OpenAI API format.

        Returns:
            Stream[ChatCompletionChunk]: A generator yielding
                `ChatCompletionChunk` objects as they are received from the
                API.

        Raises:
            ValueError: If the API doesn't support stream mode.
            RuntimeError: If the configured url is not a known endpoint.
        """
        # Handle SambaNova's Cloud API
        if self._url == self._CLOUD_URL:
            # No AgentOps event is recorded here: in stream mode the call
            # returns a stream of chunks rather than a finished completion,
            # so the completion-level fields such an event needs (`id`,
            # `usage`, `choices`) are not available on the returned object.
            return self._client.chat.completions.create(
                messages=messages,
                model=self.model_type,
                **self.model_config_dict,
            )

        if self._url == self._SAMBAVERSE_URL:
            raise ValueError(
                f"{self._SAMBAVERSE_URL} doesn't support stream mode"
            )

        raise RuntimeError(f"Unknown URL: {self._url}")

    def _run_non_streaming(
        self, messages: List[OpenAIMessage]
    ) -> ChatCompletion:
        r"""Handles non-streaming inference with SambaNova's API.

        Args:
            messages (List[OpenAIMessage]): A list of messages representing the
                message in OpenAI API format.

        Returns:
            ChatCompletion: A `ChatCompletion` object containing the complete
                response from the API.

        Raises:
            RuntimeError: If the SambaVerse endpoint answers with an HTTP
                error status.
            ValueError: If the SambaVerse response body cannot be decoded as
                JSON (:obj:`json.JSONDecodeError` is a :obj:`ValueError`).
            KeyError: If the decoded SambaVerse response lacks the expected
                fields.
        """
        # Handle SambaNova's Cloud API
        if self._url == self._CLOUD_URL:
            response = self._client.chat.completions.create(
                messages=messages,
                model=self.model_type,
                **self.model_config_dict,
            )

            # Add AgentOps LLM Event tracking
            if LLMEvent:
                llm_event = LLMEvent(
                    thread_id=response.id,
                    prompt=" ".join(
                        [message.get("content") for message in messages]  # type: ignore[misc]
                    ),
                    prompt_tokens=response.usage.prompt_tokens,  # type: ignore[union-attr]
                    completion=response.choices[0].message.content,
                    completion_tokens=response.usage.completion_tokens,  # type: ignore[union-attr]
                    model=self.model_type,
                )
                record(llm_event)

            return response

        # Handle SambaNova's Sambaverse API (any non-Cloud url lands here).
        headers = {
            "Content-Type": "application/json",
            "key": str(self._api_key),
            "modelName": self.model_type,
        }

        config = self.model_config_dict
        # Every tunable is sent as a {"type", "value"} pair with the value
        # rendered as a string, except `stop_sequences`, which is forwarded
        # as stored in the configuration.
        data = {
            "instance": json.dumps(
                {
                    "conversation_id": str(uuid.uuid4()),
                    "messages": messages,
                }
            ),
            "params": {
                "do_sample": {"type": "bool", "value": "true"},
                "max_tokens_to_generate": {
                    "type": "int",
                    "value": str(config.get("max_tokens")),
                },
                "process_prompt": {"type": "bool", "value": "true"},
                "repetition_penalty": {
                    "type": "float",
                    "value": str(config.get("repetition_penalty")),
                },
                "return_token_count_only": {
                    "type": "bool",
                    "value": "false",
                },
                "select_expert": {
                    "type": "str",
                    # Takes the segment after the first '/', so the model
                    # name must contain at least one '/'.
                    "value": self.model_type.split('/')[1],
                },
                "stop_sequences": {
                    "type": "str",
                    "value": config.get("stop_sequences"),
                },
                "temperature": {
                    "type": "float",
                    "value": str(config.get("temperature")),
                },
                "top_k": {
                    "type": "int",
                    "value": str(config.get("top_k")),
                },
                "top_p": {
                    "type": "float",
                    "value": str(config.get("top_p")),
                },
            },
        }

        try:
            # Send the request and handle the response
            with httpx.Client() as client:
                response = client.post(
                    self._url,  # type: ignore[arg-type]
                    headers=headers,
                    json=data,
                )
                # Without this call an error status never raises, and the
                # handler below would be unreachable.
                response.raise_for_status()
                raw_text = response.text
        except httpx.HTTPStatusError as exc:
            raise RuntimeError(
                f"HTTP request failed: {exc.response.text}"
            ) from exc

        # The body may hold several JSON objects separated by a newline;
        # only the last one is converted. The split consumes the opening
        # brace of the last object, so it is restored - but only when a
        # separator was actually found, otherwise the body already is one
        # complete object.
        _, separator, tail = raw_text.rpartition('}\n{')
        last_json = '{' + tail if separator else raw_text

        # Parse the dictionary
        last_dict = json.loads(last_json)
        return self._sambaverse_to_openai_response(last_dict)  # type: ignore[arg-type]

    def _sambaverse_to_openai_response(
        self, samba_response: Dict[str, Any]
    ) -> ChatCompletion:
        r"""Converts SambaVerse API response into an OpenAI-compatible
        response.

        Args:
            samba_response (Dict[str, Any]): A dictionary representing
                responses from the SambaVerse API.

        Returns:
            ChatCompletion: A `ChatCompletion` object constructed from the
                aggregated response data.
        """
        # Only the first entry of the response list is used.
        first_response = samba_response['result']['responses'][0]

        choices = [
            dict(
                index=0,
                message={
                    "role": 'assistant',
                    "content": first_response['completion'],
                },
                finish_reason=first_response['stop_reason'],
            )
        ]

        obj = ChatCompletion.construct(
            id=None,
            choices=choices,
            created=int(time.time()),
            model=self.model_type,
            object="chat.completion",
            # SambaVerse API only provide `total_tokens`
            usage=CompletionUsage(
                completion_tokens=0,
                prompt_tokens=0,
                total_tokens=int(first_response['total_tokens_count']),
            ),
        )

        return obj

    # NOTE(review): plain method in the text under review - confirm whether a
    # property decorator was lost in extraction.
    def stream(self) -> bool:
        r"""Returns whether the model is in stream mode, which sends partial
        results each time.

        Returns:
            bool: Whether the model is in stream mode.
        """
        return self.model_config_dict.get('stream', False)