Spaces:
Sleeping
Sleeping
| """ | |
| Evolusis AI Agent - Unified Single Application | |
| All-in-one Streamlit app with integrated AI agent logic | |
| Run with: streamlit run app.py | |
| """ | |
| import streamlit as st | |
| import os | |
| import time | |
| import logging | |
| import json | |
| import re | |
| import copy | |
| import tempfile | |
| from datetime import datetime | |
| from typing import Optional, Dict, Any, List | |
| from collections import deque | |
| import requests | |
| from dotenv import load_dotenv | |
| from groq import Groq | |
| # Load environment variables | |
| load_dotenv() | |
| # Configure logging | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |
| # Environment variables | |
| GROQ_API_KEY = os.getenv("GROQ_API_KEY") | |
| OPENWEATHER_API_KEY = os.getenv("OPENWEATHER_API_KEY") | |
| NEWS_API_KEY = os.getenv("NEWS_API_KEY") | |
| # Initialize Groq | |
| groq_client = None | |
| logger.info("=== Initializing Groq Client ===") | |
| logger.info(f"GROQ_API_KEY present: {bool(GROQ_API_KEY)}") | |
| logger.info(f"GROQ_API_KEY length: {len(GROQ_API_KEY) if GROQ_API_KEY else 0}") | |
| if GROQ_API_KEY: | |
| try: | |
| logger.info("Starting Groq client initialization...") | |
| # Initialize Groq client for HF Spaces compatibility | |
| # Remove any proxy-related environment variables that might interfere | |
| env_backup = {} | |
| proxy_vars = ['HTTP_PROXY', 'HTTPS_PROXY', 'http_proxy', 'https_proxy', | |
| 'ALL_PROXY', 'all_proxy', 'NO_PROXY', 'no_proxy'] | |
| for var in proxy_vars: | |
| if var in os.environ: | |
| env_backup[var] = os.environ.pop(var) | |
| logger.info(f"Removed proxy variable: {var}") | |
| # Initialize Groq with minimal parameters | |
| groq_client = Groq(api_key=GROQ_API_KEY) | |
| # Restore environment variables | |
| for var, value in env_backup.items(): | |
| os.environ[var] = value | |
| logger.info("β Groq client initialized successfully") | |
| except Exception as e: | |
| logger.error(f"β Failed to initialize Groq: {type(e).__name__}: {e}") | |
| logger.error("Full error details:", exc_info=True) | |
| groq_client = None | |
| else: | |
| logger.error("β GROQ_API_KEY not found in environment variables") | |
| # Memory Store | |
| class MemoryStore: | |
| def __init__(self, max_size=10): | |
| self.sessions = {} | |
| self.max_size = max_size | |
| def add(self, session_id: str, query: str, response: str): | |
| if session_id not in self.sessions: | |
| self.sessions[session_id] = deque(maxlen=self.max_size) | |
| self.sessions[session_id].append({ | |
| "query": query, | |
| "response": response, | |
| "timestamp": datetime.now().isoformat() | |
| }) | |
| def get_history(self, session_id: str, limit: int = 5) -> List[Dict]: | |
| if session_id not in self.sessions: | |
| return [] | |
| return list(self.sessions[session_id])[-limit:] | |
| memory_store = MemoryStore() | |
| # Tool Registry | |
| class ToolRegistry: | |
| def transcribe_audio(audio_file) -> Optional[str]: | |
| """Transcribe audio using Whisper Large V3 Turbo""" | |
| temp_file_path = None | |
| try: | |
| logger.info("=== Starting audio transcription ===") | |
| if not groq_client: | |
| logger.error("Groq client not initialized") | |
| return None | |
| # Ensure file pointer is at the beginning | |
| if hasattr(audio_file, 'seek'): | |
| audio_file.seek(0) | |
| logger.info("Reset file pointer to beginning") | |
| # Get the original filename or create a default one with proper extension | |
| # Streamlit's audio_input typically records in WAV format | |
| filename = getattr(audio_file, 'name', 'audio.wav') | |
| logger.info(f"Original filename: {filename}") | |
| # Determine file extension | |
| file_ext = '.wav' | |
| if any(filename.lower().endswith(ext) for ext in ['.wav', '.mp3', '.webm', '.m4a', '.ogg']): | |
| file_ext = os.path.splitext(filename)[1] | |
| logger.info(f"Using file extension: {file_ext}") | |
| # Read the file contents | |
| file_contents = audio_file.read() | |
| file_size = len(file_contents) | |
| logger.info(f"Read {file_size} bytes from audio file") | |
| if file_size == 0: | |
| logger.error("Audio file is empty (0 bytes)") | |
| return None | |
| # Check if file is too small | |
| if file_size < 1000: | |
| logger.warning(f"Audio file very small: {file_size} bytes. May be too short.") | |
| # Save to temporary file (more reliable approach) | |
| with tempfile.NamedTemporaryFile(mode='wb', suffix=file_ext, delete=False) as temp_file: | |
| temp_file.write(file_contents) | |
| temp_file_path = temp_file.name | |
| logger.info(f"Saved audio to temporary file: {temp_file_path}") | |
| # Open the temporary file and send to Groq API | |
| logger.info(f"Sending to Groq API - Model: whisper-large-v3-turbo, Size: {file_size} bytes") | |
| with open(temp_file_path, 'rb') as audio_file_handle: | |
| try: | |
| transcription = groq_client.audio.transcriptions.create( | |
| file=(os.path.basename(temp_file_path), audio_file_handle.read()), | |
| model="whisper-large-v3-turbo", | |
| response_format="text", | |
| temperature=0.0 | |
| ) | |
| logger.info("API call completed successfully") | |
| except Exception as api_error: | |
| logger.error(f"Groq API call failed: {type(api_error).__name__}") | |
| logger.error(f"API error details: {str(api_error)}") | |
| # Try to extract more details if it's a Groq API error | |
| if hasattr(api_error, 'response'): | |
| logger.error(f"Response status: {getattr(api_error.response, 'status_code', 'N/A')}") | |
| logger.error(f"Response body: {getattr(api_error.response, 'text', 'N/A')}") | |
| raise # Re-raise to be caught by outer exception handler | |
| result = str(transcription) | |
| logger.info(f"Transcription successful. Length: {len(result)} characters") | |
| logger.info(f"Transcription preview: {result[:100]}...") | |
| return result | |
| except Exception as e: | |
| logger.error(f"=== Whisper API Error ===") | |
| logger.error(f"Error type: {type(e).__name__}") | |
| logger.error(f"Error message: {str(e)}") | |
| logger.error(f"Full error details:", exc_info=True) | |
| return None | |
| finally: | |
| # Clean up temporary file | |
| if temp_file_path and os.path.exists(temp_file_path): | |
| try: | |
| os.unlink(temp_file_path) | |
| logger.info(f"Cleaned up temporary file: {temp_file_path}") | |
| except Exception as cleanup_error: | |
| logger.warning(f"Failed to cleanup temp file: {cleanup_error}") | |
| def get_weather(city: str) -> Optional[Dict[str, Any]]: | |
| try: | |
| if not OPENWEATHER_API_KEY: | |
| return None | |
| url = f"http://api.openweathermap.org/data/2.5/weather" | |
| params = {"q": city, "appid": OPENWEATHER_API_KEY, "units": "metric"} | |
| response = requests.get(url, params=params, timeout=5) | |
| response.raise_for_status() | |
| data = response.json() | |
| return { | |
| "temperature": data["main"]["temp"], | |
| "description": data["weather"][0]["description"], | |
| "humidity": data["main"]["humidity"], | |
| "city": data["name"], | |
| "country": data["sys"]["country"] | |
| } | |
| except Exception as e: | |
| logger.error(f"Weather API error: {e}") | |
| return None | |
| def get_wikipedia(topic: str) -> Optional[str]: | |
| try: | |
| url = "https://en.wikipedia.org/api/rest_v1/page/summary/" + topic.replace(" ", "_") | |
| response = requests.get(url, timeout=5) | |
| response.raise_for_status() | |
| return response.json().get("extract", "No information found") | |
| except Exception as e: | |
| logger.error(f"Wikipedia API error: {e}") | |
| return None | |
| def get_news(query: str) -> Optional[List[Dict]]: | |
| try: | |
| if not NEWS_API_KEY: | |
| return None | |
| url = "https://newsapi.org/v2/everything" | |
| params = {"q": query, "apiKey": NEWS_API_KEY, "pageSize": 3, "sortBy": "publishedAt", "language": "en"} | |
| response = requests.get(url, params=params, timeout=5) | |
| response.raise_for_status() | |
| data = response.json() | |
| if data.get("articles"): | |
| return [{"title": a["title"], "description": a.get("description", ""), | |
| "source": a["source"]["name"], "url": a["url"]} for a in data["articles"][:3]] | |
| return None | |
| except Exception as e: | |
| logger.error(f"News API error: {e}") | |
| return None | |
| # AI Agent | |
| class AIAgent: | |
| def __init__(self): | |
| self.tools = ToolRegistry() | |
| def decide_tool(self, query: str, history: Optional[List[Dict]] = None) -> Dict[str, Any]: | |
| """Use LLM reasoning to decide which tools to use""" | |
| if not groq_client: | |
| # Fallback to simple pattern matching if LLM not available | |
| return self._fallback_decision(query, history) | |
| llm_decision = "" # Initialize to avoid unbound variable error | |
| try: | |
| # Build rich context from history | |
| context_str = "No previous conversation." | |
| if history and len(history) > 0: | |
| context_parts = [] | |
| for h in history[-3:]: | |
| q = h.get('query', '') | |
| r = h.get('response', '')[:150] # Truncate response | |
| context_parts.append(f"User: {q}\nAssistant: {r}") | |
| context_str = "\n\n".join(context_parts) | |
| # Create enhanced reasoning prompt | |
| reasoning_prompt = f"""You are an intelligent tool routing system that analyzes user queries to determine which external tools to invoke. | |
| **Available Tools:** | |
| 1. **WEATHER** - Fetch current weather data for any city/location/country | |
| - Use when: User mentions weather, temperature, climate, or location names (cities, countries, regions) | |
| - Examples: "weather in Paris", "how's the climate in Tokyo", "america?" (after weather context), "London temperature" | |
| - Can extract location from: explicit mentions, country names, city names, or follow-up questions | |
| 2. **WIKIPEDIA** - Retrieve factual knowledge about people, places, events, concepts | |
| - Use when: User asks "who is", "what is", "tell me about", "explain", historical/biographical queries | |
| - Examples: "who invented the telephone", "what is quantum physics", "Albert Einstein" | |
| 3. **NEWS** - Get latest news articles on specific topics | |
| - Use when: User asks about "news", "latest", "recent events", "headlines", current happenings | |
| - Examples: "latest AI news", "what's happening in tech", "recent developments" | |
| 4. **LLM_ONLY** - Use language model for general conversation, reasoning, explanations | |
| - Use when: No external data needed, creative/opinion questions, general chat | |
| **Recent Conversation Context:** | |
| {context_str} | |
| **Current Query:** {query} | |
| **Analysis Instructions:** | |
| - If query is very short (1-2 words) like "america?", "paris", "london" - check conversation history | |
| - If previous query was about weather, treat short location names as weather requests | |
| - Country names (USA, America, India, France, etc.) should trigger WEATHER when appropriate | |
| - Ambiguous queries should prefer external data sources over LLM-only responses | |
| - Use conversation context to resolve ambiguity in follow-up questions | |
| **Response Format (JSON only):** | |
| {{ | |
| "use_weather": true/false, | |
| "use_wikipedia": true/false, | |
| "use_news": true/false, | |
| "city": "city/country/location name if weather needed, else null", | |
| "topic": "topic name if wikipedia needed, else null", | |
| "news_query": "search terms if news needed, else null", | |
| "reasoning": "Clear explanation: What did you detect? Why these tools? What context influenced your decision?" | |
| }} | |
| **Important:** | |
| - For location queries (cities, countries, regions), always prefer WEATHER tool | |
| - "America" / "USA" should be treated as weather query if context suggests it | |
| - Be smart about follow-up questions - use conversation history | |
| - Provide detailed reasoning explaining your tool selection logic""" | |
| response = groq_client.chat.completions.create( | |
| model="llama-3.3-70b-versatile", | |
| messages=[ | |
| {"role": "system", "content": "You are an expert tool routing system. Analyze queries deeply and respond with valid JSON only. No markdown, no explanations outside JSON."}, | |
| {"role": "user", "content": reasoning_prompt} | |
| ], | |
| temperature=0.2, | |
| max_tokens=400 | |
| ) | |
| # Parse LLM response | |
| llm_decision = response.choices[0].message.content | |
| if not llm_decision: | |
| logger.warning("Empty LLM response, using fallback") | |
| return self._fallback_decision(query, history) | |
| # Extract JSON from response (handle potential markdown formatting) | |
| json_match = re.search(r'\{[\s\S]*\}', llm_decision) | |
| if json_match: | |
| decision_data = json.loads(json_match.group()) | |
| else: | |
| decision_data = json.loads(llm_decision) | |
| # Build decision object | |
| decision = { | |
| "use_weather": decision_data.get("use_weather", False), | |
| "use_wikipedia": decision_data.get("use_wikipedia", False), | |
| "use_news": decision_data.get("use_news", False), | |
| "use_llm": True, | |
| "extracted_params": {}, | |
| "reasoning": decision_data.get("reasoning", "LLM tool routing decision made") | |
| } | |
| # Extract parameters with validation | |
| if decision["use_weather"]: | |
| city = decision_data.get("city") | |
| if city and city != "null": | |
| decision["extracted_params"]["city"] = city | |
| logger.info(f"Weather tool selected for city: {city}") | |
| else: | |
| logger.warning("Weather tool selected but no city extracted") | |
| decision["use_weather"] = False | |
| if decision["use_wikipedia"]: | |
| topic = decision_data.get("topic") | |
| if topic and topic != "null": | |
| decision["extracted_params"]["topic"] = topic | |
| logger.info(f"Wikipedia tool selected for topic: {topic}") | |
| if decision["use_news"]: | |
| news_query = decision_data.get("news_query") | |
| if news_query and news_query != "null": | |
| decision["extracted_params"]["news_query"] = news_query | |
| logger.info(f"News tool selected for query: {news_query}") | |
| return decision | |
| except json.JSONDecodeError as e: | |
| logger.error(f"JSON parsing error in LLM response: {e}") | |
| if 'llm_decision' in locals(): | |
| logger.error(f"LLM response was: {llm_decision}") | |
| return self._fallback_decision(query, history) | |
| except Exception as e: | |
| logger.error(f"LLM reasoning error: {e}") | |
| # Fallback to pattern matching | |
| return self._fallback_decision(query, history) | |
| def _fallback_decision(self, query: str, history: Optional[List[Dict]] = None) -> Dict[str, Any]: | |
| """Enhanced fallback pattern-based decision making with better context awareness""" | |
| query_lower = query.lower().strip() | |
| decision = { | |
| "use_weather": False, | |
| "use_wikipedia": False, | |
| "use_news": False, | |
| "use_llm": True, | |
| "extracted_params": {}, | |
| "reasoning": "Pattern-based routing (LLM unavailable)" | |
| } | |
| # Analyze conversation history for context | |
| previous_context = "" | |
| was_weather_context = False | |
| was_wiki_context = False | |
| if history and len(history) > 0: | |
| last_query = history[-1].get("query", "").lower() | |
| previous_context = last_query | |
| # Check what the last query was about | |
| weather_keywords = ["weather", "temperature", "forecast", "climate", "hot", "cold", "rain", "sunny", "cloudy"] | |
| was_weather_context = any(kw in previous_context for kw in weather_keywords) | |
| wiki_keywords = ["who is", "what is", "tell me about", "explain"] | |
| was_wiki_context = any(kw in previous_context for kw in wiki_keywords) | |
| # Extract potential location/topic from query | |
| query_words = query_lower.strip("?,.! ").split() | |
| # WEATHER DETECTION | |
| weather_keywords = ["weather", "temperature", "forecast", "climate", "hot", "cold", "rain", "sunny", "cloudy"] | |
| has_weather_keyword = any(kw in query_lower for kw in weather_keywords) | |
| # Countries and major locations that should trigger weather | |
| location_names = [ | |
| "america", "usa", "united states", "india", "china", "japan", "france", "germany", | |
| "london", "paris", "tokyo", "mumbai", "delhi", "bangalore", "new york", "sydney", | |
| "chembur", "andheri", "bandra", "pune", "hyderabad", "chennai", "kolkata", | |
| "california", "texas", "florida", "europe", "asia", "africa" | |
| ] | |
| has_location = any(loc in query_lower for loc in location_names) | |
| # Short query after weather context = likely a location follow-up | |
| is_short_followup = len(query_words) <= 2 and was_weather_context | |
| # Decide if this is a weather query | |
| if has_weather_keyword or has_location or is_short_followup: | |
| decision["use_weather"] = True | |
| city = self._extract_city(query, previous_context) | |
| if city: | |
| decision["extracted_params"]["city"] = city | |
| decision["reasoning"] = f"Detected weather query for location: {city}" | |
| else: | |
| # If no city extracted, disable weather tool | |
| decision["use_weather"] = False | |
| # WIKIPEDIA DETECTION | |
| knowledge_keywords = ["who is", "what is", "tell me about", "explain", "who invented", "who discovered", "define"] | |
| if any(kw in query_lower for kw in knowledge_keywords): | |
| decision["use_wikipedia"] = True | |
| topic = self._extract_topic(query) | |
| decision["extracted_params"]["topic"] = topic | |
| decision["reasoning"] = f"Knowledge query detected for: {topic}" | |
| # NEWS DETECTION | |
| news_keywords = ["news", "latest", "recent", "happening", "current events", "headlines", "breaking"] | |
| if any(kw in query_lower for kw in news_keywords): | |
| decision["use_news"] = True | |
| news_query = self._extract_news_query(query) | |
| decision["extracted_params"]["news_query"] = news_query | |
| decision["reasoning"] = f"News query detected for: {news_query}" | |
| return decision | |
| def _extract_city(self, query: str, previous_context: str = "") -> Optional[str]: | |
| """Enhanced city extraction with country/region support""" | |
| query_lower = query.lower().strip() | |
| # Handle explicit "in" syntax: "weather in Paris" | |
| if " in " in query_lower: | |
| parts = query_lower.split(" in ") | |
| if len(parts) > 1: | |
| city_part = parts[1].strip("?,.! ").split()[0] | |
| return city_part.title() | |
| # Map country/region names to capitals or major cities for weather API | |
| country_to_city = { | |
| "america": "New York", | |
| "usa": "New York", | |
| "united states": "New York", | |
| "india": "Mumbai", | |
| "china": "Beijing", | |
| "japan": "Tokyo", | |
| "france": "Paris", | |
| "germany": "Berlin", | |
| "uk": "London", | |
| "united kingdom": "London", | |
| "australia": "Sydney", | |
| "canada": "Toronto", | |
| "brazil": "Rio de Janeiro", | |
| "russia": "Moscow", | |
| "italy": "Rome", | |
| "spain": "Madrid", | |
| "mexico": "Mexico City" | |
| } | |
| # Check for country names | |
| for country, city in country_to_city.items(): | |
| if country in query_lower: | |
| logger.info(f"Mapped country '{country}' to city '{city}'") | |
| return city | |
| # Common cities and locations | |
| cities = [ | |
| "london", "paris", "tokyo", "new york", "mumbai", "delhi", "bangalore", "sydney", | |
| "chembur", "andheri", "bandra", "pune", "hyderabad", "chennai", "kolkata", | |
| "berlin", "madrid", "rome", "beijing", "shanghai", "los angeles", "chicago", | |
| "toronto", "vancouver", "dubai", "singapore", "hong kong", "seoul" | |
| ] | |
| for city in cities: | |
| if city in query_lower: | |
| return city.title() | |
| # If query is very short (1-2 words) and looks like a location name, use it | |
| words = query.strip("?,. ").split() | |
| if len(words) <= 2: | |
| potential_city = query.strip("?,. ").title() | |
| # Additional validation: if all letters (no special chars), likely a location | |
| if potential_city.replace(" ", "").isalpha(): | |
| logger.info(f"Treating short query '{potential_city}' as location name") | |
| return potential_city | |
| return None | |
| def _extract_topic(self, query: str) -> str: | |
| stop_words = ["who is", "what is", "tell me about", "explain", "who invented", "who discovered", "what's", "the"] | |
| topic = query.lower() | |
| for word in stop_words: | |
| topic = topic.replace(word, "") | |
| return topic.strip("?,. ") | |
| def _extract_news_query(self, query: str) -> str: | |
| stop_words = ["news", "latest", "recent", "what's", "tell me", "about", "the"] | |
| topic = query.lower() | |
| for word in stop_words: | |
| topic = topic.replace(word, "") | |
| return topic.strip("?,. ") or "technology" | |
| def process_query(self, query: str, session_id: str) -> dict: | |
| start_time = time.time() | |
| tools_used = [] | |
| reasoning_parts = [] | |
| history = memory_store.get_history(session_id, limit=3) | |
| decision = self.decide_tool(query, history) | |
| # Add LLM reasoning if available | |
| if decision.get("reasoning"): | |
| reasoning_parts.append(f"π§ LLM Decision: {decision['reasoning']}") | |
| else: | |
| reasoning_parts.append(f"Analyzed query intent: {query}") | |
| external_data = [] | |
| if decision["use_weather"]: | |
| city = decision["extracted_params"].get("city") | |
| if city: | |
| reasoning_parts.append(f"Fetching weather for {city}") | |
| weather = self.tools.get_weather(city) | |
| if weather: | |
| tools_used.append("OpenWeather API") | |
| external_data.append(f"Weather in {weather['city']}, {weather['country']}: {weather['temperature']}Β°C, {weather['description']}, Humidity: {weather['humidity']}%") | |
| if decision["use_news"]: | |
| news_query = decision["extracted_params"].get("news_query", "technology") | |
| reasoning_parts.append(f"Fetching news about {news_query}") | |
| news = self.tools.get_news(news_query) | |
| if news: | |
| tools_used.append("NewsAPI") | |
| news_text = "\n".join([f"- {item['title']} ({item['source']})" for item in news]) | |
| external_data.append(f"Latest news:\n{news_text}") | |
| if decision["use_wikipedia"]: | |
| topic = decision["extracted_params"].get("topic") | |
| if topic: | |
| reasoning_parts.append(f"Fetching Wikipedia info for {topic}") | |
| wiki_data = self.tools.get_wikipedia(topic) | |
| if wiki_data: | |
| tools_used.append("Wikipedia API") | |
| external_data.append(f"Wikipedia: {wiki_data[:500]}") | |
| else: | |
| reasoning_parts.append("Wikipedia data not available") | |
| reasoning_parts.append("Generating response with GPT oss-120B") | |
| if groq_client: | |
| tools_used.append("GPT oss-120B (Groq)") | |
| llm_response = self._call_groq(query, external_data, history) | |
| else: | |
| llm_response = "AI model unavailable. Please configure GROQ_API_KEY." | |
| memory_store.add(session_id, query, llm_response) | |
| response_time = int((time.time() - start_time) * 1000) | |
| return { | |
| "reasoning": " β ".join(reasoning_parts), | |
| "answer": llm_response, | |
| "tools_used": tools_used, | |
| "response_time_ms": response_time, | |
| "session_id": session_id | |
| } | |
| def _call_groq(self, query: str, external_data: List[str], history: List[Dict]) -> str: | |
| if not groq_client: | |
| return "Groq client not initialized. Please check GROQ_API_KEY." | |
| try: | |
| prompt_parts = ["You are a helpful AI assistant."] | |
| if history: | |
| prompt_parts.append("\nHistory:") | |
| for item in history: | |
| prompt_parts.append(f"User: {item['query']}\nAssistant: {item['response'][:200]}...") | |
| if external_data: | |
| prompt_parts.append("\nExternal Data:") | |
| prompt_parts.extend(external_data) | |
| prompt_parts.append(f"\nUser: {query}\nProvide a helpful response:") | |
| response = groq_client.chat.completions.create( | |
| model="llama-3.3-70b-versatile", | |
| messages=[{"role": "system", "content": "You are a helpful AI assistant."}, | |
| {"role": "user", "content": "\n".join(prompt_parts)}], | |
| temperature=0.7, | |
| max_tokens=500 | |
| ) | |
| return response.choices[0].message.content or "No response generated." | |
| except Exception as e: | |
| logger.error(f"Groq error: {e}") | |
| return "Error generating response." | |
| # Initialize agent | |
| agent = AIAgent() | |
| # Streamlit UI starts here | |
| if True: | |
| # Page configuration | |
| st.set_page_config( | |
| page_title="Evolusis AI Agent", | |
| page_icon="π€", | |
| layout="wide", | |
| initial_sidebar_state="expanded" | |
| ) | |
| # Custom CSS | |
| st.markdown(""" | |
| <style> | |
| .main { | |
| background-color: #0e1117; | |
| } | |
| .stTextInput > div > div > input { | |
| background-color: #1e1e1e; | |
| color: #ffffff; | |
| border: 1px solid #333; | |
| border-radius: 8px; | |
| padding: 12px; | |
| } | |
| .user-message { | |
| background-color: #2b5278; | |
| color: white; | |
| padding: 12px 16px; | |
| border-radius: 12px; | |
| margin: 8px 0; | |
| margin-left: 20%; | |
| text-align: right; | |
| } | |
| .assistant-message { | |
| background-color: #1e1e1e; | |
| color: #e0e0e0; | |
| padding: 12px 16px; | |
| border-radius: 12px; | |
| margin: 8px 0; | |
| margin-right: 20%; | |
| } | |
| .metadata { | |
| font-size: 0.8em; | |
| color: #888; | |
| margin-top: 6px; | |
| } | |
| .tool-badge { | |
| display: inline-block; | |
| background-color: #333; | |
| color: #4CAF50; | |
| padding: 2px 8px; | |
| border-radius: 4px; | |
| margin-right: 6px; | |
| font-size: 0.75em; | |
| } | |
| .welcome-section { | |
| text-align: center; | |
| padding: 5px 20px; | |
| } | |
| .dev-info-card { | |
| text-align: center; | |
| padding: 5px; | |
| margin: 10px 0; | |
| color: #e0e0e0; | |
| font-size: 0.95em; | |
| } | |
| .dev-info-card a { | |
| color: #60a5fa; | |
| text-decoration: none; | |
| } | |
| .dev-info-card a:hover { | |
| text-decoration: underline; | |
| } | |
| .status-bar { | |
| padding: 8px 20px; | |
| margin-top: 30px; | |
| text-align: center; | |
| color: #888; | |
| font-size: 0.85em; | |
| } | |
| h1 { | |
| color: #4CAF50; | |
| } | |
| /* Sidebar conversation history - plain text style */ | |
| .stSidebar button[kind="secondary"] { | |
| background: transparent !important; | |
| border: none !important; | |
| box-shadow: none !important; | |
| padding: 4px 0 !important; | |
| text-align: left !important; | |
| color: #e0e0e0 !important; | |
| font-size: 0.9em !important; | |
| } | |
| .stSidebar button[kind="secondary"]:hover { | |
| background: transparent !important; | |
| color: #60a5fa !important; | |
| } | |
| </style> | |
| """, unsafe_allow_html=True) | |
| # Initialize session state | |
| if "messages" not in st.session_state: | |
| st.session_state.messages = [] | |
| if "session_id" not in st.session_state: | |
| st.session_state.session_id = f"session_{int(time.time())}_{hash(time.time())}" | |
| if "chat_history" not in st.session_state: | |
| st.session_state.chat_history = [] | |
| if "saved_conversations" not in st.session_state: | |
| st.session_state.saved_conversations = [] | |
| if "processing" not in st.session_state: | |
| st.session_state.processing = False | |
| if "last_input" not in st.session_state: | |
| st.session_state.last_input = "" | |
| # Example queries | |
| EXAMPLE_QUERIES = [ | |
| "What's the weather in Paris today?", | |
| "Who invented the telephone?", | |
| "Latest news about artificial intelligence", | |
| "Tell me about Albert Einstein", | |
| "Weather in Mumbai", | |
| "Recent technology news", | |
| "What is quantum computing?", | |
| "Explain the theory of relativity", | |
| "What are the benefits of meditation?", | |
| "How does photosynthesis work?" | |
| ] | |
| def call_agent(query: str) -> dict: | |
| """Call the AI agent directly""" | |
| try: | |
| return agent.process_query(query, st.session_state.session_id) | |
| except Exception as e: | |
| logger.error(f"Agent error: {e}") | |
| return { | |
| "answer": f"β Error: {str(e)}", | |
| "tools_used": [], | |
| "response_time_ms": 0, | |
| "reasoning": f"Error: {str(e)}" | |
| } | |
| def display_message(message: dict): | |
| """Display a chat message""" | |
| if message["role"] == "user": | |
| st.markdown(f'<div class="user-message">π€ {message["content"]}</div>', unsafe_allow_html=True) | |
| else: | |
| # Show reasoning in collapsible section | |
| if "reasoning" in message and message["reasoning"]: | |
| with st.expander("π§ Reasoning", expanded=False): | |
| st.markdown("**Decision Process:**") | |
| st.text(message["reasoning"]) | |
| # Show JSON response | |
| st.markdown("**JSON Response:**") | |
| json_output = { | |
| "reasoning": message["reasoning"], | |
| "answer": message["content"], | |
| "tools_used": message.get("tools_used", []), | |
| "response_time_ms": message.get("response_time_ms", 0), | |
| "timestamp": message.get("timestamp", "") | |
| } | |
| st.json(json_output) | |
| # Show main answer | |
| st.markdown(f'<div class="assistant-message">π€ {message["content"]}</div>', unsafe_allow_html=True) | |
| if "tools_used" in message and message["tools_used"]: | |
| tools_html = " ".join([f'<span class="tool-badge">{tool}</span>' for tool in message["tools_used"]]) | |
| metadata = f'<div class="metadata">{tools_html}' | |
| if "response_time_ms" in message and message["response_time_ms"] > 0: | |
| metadata += f' β±οΈ {message["response_time_ms"]}ms' | |
| metadata += '</div>' | |
| st.markdown(metadata, unsafe_allow_html=True) | |
| def process_query(query: str): | |
| """Process user query""" | |
| # Prevent processing if already processing | |
| if st.session_state.processing: | |
| return | |
| st.session_state.processing = True | |
| st.session_state.messages.append({ | |
| "role": "user", | |
| "content": query, | |
| "timestamp": datetime.now().isoformat() | |
| }) | |
| with st.spinner("π€ Thinking..."): | |
| response = call_agent(query) | |
| st.session_state.messages.append({ | |
| "role": "assistant", | |
| "content": response.get("answer", "No response"), | |
| "tools_used": response.get("tools_used", []), | |
| "response_time_ms": response.get("response_time_ms", 0), | |
| "reasoning": response.get("reasoning", ""), | |
| "timestamp": datetime.now().isoformat() | |
| }) | |
| # Save to chat history | |
| if len(st.session_state.messages) >= 2: | |
| st.session_state.chat_history.append({ | |
| "query": query, | |
| "timestamp": datetime.now().strftime("%H:%M:%S"), | |
| "session_id": st.session_state.session_id | |
| }) | |
| if len(st.session_state.chat_history) > 20: | |
| st.session_state.chat_history = st.session_state.chat_history[-20:] | |
| # Reset processing flag after a short delay | |
| st.session_state.processing = False | |
| # Header | |
| st.title("π€ Evolusis AI Agent") | |
| # Main content | |
| if len(st.session_state.messages) == 0: | |
| # Developer info card - right after title | |
| st.markdown(""" | |
| <div class="dev-info-card"> | |
| Yash Gori - +91 7718081766 / <a href="mailto:[email protected]">Email</a> / <a href="https://yashgori20.vercel.app/" target="_blank">Portfolio</a> | |
| </div> | |
| """, unsafe_allow_html=True) | |
| # Welcome screen with developer info and status | |
| st.markdown('<div class="welcome-section">', unsafe_allow_html=True) | |
| st.markdown("### Intelligent assistant combining LLM reasoning with real-time data") | |
| st.markdown("**Powered by:** GPT oss-120B β’ Whisper Large V3 Turbo β’ OpenWeather β’ Wikipedia β’ NewsAPI") | |
| st.markdown('</div>', unsafe_allow_html=True) | |
| # Chat input and audio upload | |
| st.markdown("---") | |
| # Audio recording for speech-to-text | |
| audio_input = st.audio_input("π€ Click to speak") | |
| if audio_input: | |
| with st.spinner("π§ Transcribing your voice..."): | |
| try: | |
| logger.info(f"Received audio input: {audio_input}") | |
| logger.info(f"Audio input type: {type(audio_input)}") | |
| logger.info(f"Audio input name: {getattr(audio_input, 'name', 'N/A')}") | |
| transcription = agent.tools.transcribe_audio(audio_input) | |
| if transcription: | |
| st.success(f"β You said: {transcription}") | |
| process_query(transcription) | |
| st.rerun() | |
| else: | |
| error_details = "Transcription returned None. Possible causes:\n" | |
| error_details += "- Audio file is empty or too short\n" | |
| error_details += "- Groq API key invalid or missing\n" | |
| error_details += "- Network connectivity issue\n" | |
| error_details += f"- Groq client initialized: {bool(groq_client)}" | |
| st.error(f"β Transcription failed\n\n{error_details}") | |
| logger.error("Transcription returned None") | |
| except Exception as e: | |
| error_msg = f"β Error during transcription\n\n" | |
| error_msg += f"**Error Type:** {type(e).__name__}\n\n" | |
| error_msg += f"**Error Message:** {str(e)}\n\n" | |
| # Add more context if available | |
| if hasattr(e, '__cause__') and e.__cause__: | |
| error_msg += f"**Cause:** {str(e.__cause__)}\n\n" | |
| st.error(error_msg) | |
| logger.error(f"Transcription UI error: {e}", exc_info=True) | |
| # Text input | |
| user_input = st.text_input("β¨οΈ Or type your question...", key="chat_input_text") | |
| if user_input and user_input != st.session_state.last_input: | |
| st.session_state.last_input = user_input | |
| process_query(user_input) | |
| st.rerun() | |
| st.markdown("---") | |
| st.markdown("π€οΈ **Weather Queries:**") | |
| col_a, col_b = st.columns(2) | |
| with col_a: | |
| if st.button(EXAMPLE_QUERIES[0], key="ex_0", use_container_width=True): | |
| st.session_state.last_input = EXAMPLE_QUERIES[0] | |
| process_query(EXAMPLE_QUERIES[0]) | |
| st.rerun() | |
| with col_b: | |
| if st.button(EXAMPLE_QUERIES[4], key="ex_4", use_container_width=True): | |
| st.session_state.last_input = EXAMPLE_QUERIES[4] | |
| process_query(EXAMPLE_QUERIES[4]) | |
| st.rerun() | |
| st.markdown("π **Knowledge Queries:**") | |
| col_c, col_d = st.columns(2) | |
| with col_c: | |
| if st.button(EXAMPLE_QUERIES[1], key="ex_1", use_container_width=True): | |
| st.session_state.last_input = EXAMPLE_QUERIES[1] | |
| process_query(EXAMPLE_QUERIES[1]) | |
| st.rerun() | |
| with col_d: | |
| if st.button(EXAMPLE_QUERIES[3], key="ex_3", use_container_width=True): | |
| st.session_state.last_input = EXAMPLE_QUERIES[3] | |
| process_query(EXAMPLE_QUERIES[3]) | |
| st.rerun() | |
| st.markdown("π° **News Queries:**") | |
| col_e, col_f = st.columns(2) | |
| with col_e: | |
| if st.button(EXAMPLE_QUERIES[2], key="ex_2", use_container_width=True): | |
| st.session_state.last_input = EXAMPLE_QUERIES[2] | |
| process_query(EXAMPLE_QUERIES[2]) | |
| st.rerun() | |
| with col_f: | |
| if st.button(EXAMPLE_QUERIES[5], key="ex_5", use_container_width=True): | |
| st.session_state.last_input = EXAMPLE_QUERIES[5] | |
| process_query(EXAMPLE_QUERIES[5]) | |
| st.rerun() | |
| st.markdown("π§ **Reasoning Queries (Groq LLM Only):**") | |
| col_g, col_h = st.columns(2) | |
| with col_g: | |
| if st.button(EXAMPLE_QUERIES[6], key="ex_6", use_container_width=True): | |
| st.session_state.last_input = EXAMPLE_QUERIES[6] | |
| process_query(EXAMPLE_QUERIES[6]) | |
| st.rerun() | |
| if st.button(EXAMPLE_QUERIES[8], key="ex_8", use_container_width=True): | |
| st.session_state.last_input = EXAMPLE_QUERIES[8] | |
| process_query(EXAMPLE_QUERIES[8]) | |
| st.rerun() | |
| with col_h: | |
| if st.button(EXAMPLE_QUERIES[7], key="ex_7", use_container_width=True): | |
| st.session_state.last_input = EXAMPLE_QUERIES[7] | |
| process_query(EXAMPLE_QUERIES[7]) | |
| st.rerun() | |
| if st.button(EXAMPLE_QUERIES[9], key="ex_9", use_container_width=True): | |
| st.session_state.last_input = EXAMPLE_QUERIES[9] | |
| process_query(EXAMPLE_QUERIES[9]) | |
| st.rerun() | |
| else: | |
| # Show conversation messages | |
| for message in st.session_state.messages: | |
| display_message(message) | |
| # Continue conversation input | |
| st.markdown("---") | |
| # Audio recording for follow-up | |
| audio_input = st.audio_input("π€ Click to speak", key="followup_audio") | |
| if audio_input and not st.session_state.processing: | |
| with st.spinner("π§ Transcribing your voice..."): | |
| try: | |
| logger.info(f"Received followup audio input: {audio_input}") | |
| logger.info(f"Audio input type: {type(audio_input)}") | |
| logger.info(f"Audio input name: {getattr(audio_input, 'name', 'N/A')}") | |
| transcription = agent.tools.transcribe_audio(audio_input) | |
| if transcription: | |
| st.success(f"β You said: {transcription}") | |
| process_query(transcription) | |
| st.rerun() | |
| else: | |
| error_details = "Transcription returned None. Possible causes:\n" | |
| error_details += "- Audio file is empty or too short\n" | |
| error_details += "- Groq API key invalid or missing\n" | |
| error_details += "- Network connectivity issue\n" | |
| error_details += f"- Groq client initialized: {bool(groq_client)}" | |
| st.error(f"β Transcription failed\n\n{error_details}") | |
| logger.error("Transcription returned None") | |
| except Exception as e: | |
| error_msg = f"β Error during transcription\n\n" | |
| error_msg += f"**Error Type:** {type(e).__name__}\n\n" | |
| error_msg += f"**Error Message:** {str(e)}\n\n" | |
| # Add more context if available | |
| if hasattr(e, '__cause__') and e.__cause__: | |
| error_msg += f"**Cause:** {str(e.__cause__)}\n\n" | |
| st.error(error_msg) | |
| logger.error(f"Transcription UI error: {e}", exc_info=True) | |
| # Text input for follow-up | |
| user_input = st.text_input("β¨οΈ Continue the conversation...", key="followup_text") | |
| if user_input and user_input != st.session_state.last_input and not st.session_state.processing: | |
| st.session_state.last_input = user_input | |
| process_query(user_input) | |
| st.rerun() | |
| # System Status Bar at the end | |
| status_groq = 'β' if groq_client is not None else 'β' | |
| status_weather = 'β' if OPENWEATHER_API_KEY else 'β' | |
| status_news = 'β' if NEWS_API_KEY else 'β' | |
| st.markdown(f""" | |
| <div class="status-bar"> | |
| Groq {status_groq} | Weather {status_weather} | News {status_news} | Wiki β | Sessions: {len(memory_store.sessions)} | |
| </div> | |
| """, unsafe_allow_html=True) | |
| # Left Sidebar - Chat History and Controls | |
| with st.sidebar: | |
| if st.button("π New Chat", use_container_width=True): | |
| # Save current conversation before starting new one | |
| if st.session_state.messages: | |
| first_query = st.session_state.messages[0]["content"] if st.session_state.messages else "Conversation" | |
| # Create a deep copy to avoid reference issues | |
| st.session_state.saved_conversations.append({ | |
| "title": first_query[:50] + "...", | |
| "messages": copy.deepcopy(st.session_state.messages), | |
| "session_id": st.session_state.session_id, # Keep original session ID for history | |
| "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M") | |
| }) | |
| # Keep only last 10 conversations | |
| if len(st.session_state.saved_conversations) > 10: | |
| st.session_state.saved_conversations = st.session_state.saved_conversations[-10:] | |
| st.session_state.messages = [] | |
| # Generate a new session ID for the new chat | |
| st.session_state.session_id = f"session_{int(time.time())}_{hash(time.time())}" | |
| st.session_state.last_input = "" | |
| st.rerun() | |
| if st.button("ποΈ Clear History", use_container_width=True): | |
| st.session_state.chat_history = [] | |
| st.session_state.saved_conversations = [] | |
| st.rerun() | |
| st.markdown("<div style='margin: 20px 0; border-top: 1px solid #333;'></div>", unsafe_allow_html=True) | |
| if st.session_state.saved_conversations: | |
| for idx, conv in enumerate(reversed(st.session_state.saved_conversations)): | |
| # Simple clickable text without button box styling | |
| if st.button(conv['title'], key=f"conv_{idx}"): | |
| # Restore this conversation with deep copy to avoid reference issues | |
| st.session_state.messages = copy.deepcopy(conv["messages"]) | |
| # Generate a new session ID to avoid memory store conflicts | |
| st.session_state.session_id = f"session_{int(time.time())}_{hash(time.time())}" | |
| st.rerun() | |
| # Minimal separator with less padding | |
| if idx < len(st.session_state.saved_conversations) - 1: | |
| st.markdown("<div style='margin: 4px 0; border-top: 1px solid #222;'></div>", unsafe_allow_html=True) | |
| else: | |
| st.markdown("<div style='text-align: center; color: #666; font-size: 0.85em; margin-top: 20px;'>No saved conversations</div>", unsafe_allow_html=True) | |