Spaces:
Runtime error
Runtime error
| import os | |
| import gradio as gr | |
| import google.generativeai as genai | |
| from langchain_google_genai import ChatGoogleGenerativeAI, GoogleGenerativeAIEmbeddings | |
| from langchain_community.document_loaders import PyPDFLoader | |
| from langchain.text_splitter import RecursiveCharacterTextSplitter | |
| from langchain_community.vectorstores import FAISS | |
| from langchain.prompts import PromptTemplate | |
| from langchain.chains import LLMChain | |
| from datetime import datetime | |
| import pytz | |
| import time | |
# Read the Gemini API key from the environment (on Hugging Face Spaces it is
# expected to be supplied as a Space secret) and refuse to start without it.
google_api_key = os.getenv("GOOGLE_API_KEY")
if google_api_key is None or google_api_key == "":
    raise ValueError(
        "GOOGLE_API_KEY not found in environment variables. "
        "Please add it to Hugging Face Space secrets."
    )

# Hand the key to the google.generativeai client library.
genai.configure(api_key=google_api_key)
def get_current_datetime():
    """Return the current date and time in IST as display strings.

    Returns:
        A ``(date, time)`` tuple of strings formatted as ``"%B %d, %Y"`` and
        ``"%I:%M:%S %p"`` (e.g. ``("January 05, 2025", "03:07:09 PM")``),
        both expressed in the ``Asia/Kolkata`` timezone.
    """
    # Start from an aware UTC timestamp and shift it to the display zone;
    # change the zone name below to show a different local time.
    display_zone = pytz.timezone('Asia/Kolkata')
    local_now = datetime.now(pytz.UTC).astimezone(display_zone)
    return local_now.strftime("%B %d, %Y"), local_now.strftime("%I:%M:%S %p")
def initialize_retriever():
    """Build a FAISS-backed retriever over ``Team1.pdf`` in the working directory.

    The PDF is loaded with ``PyPDFLoader``, split into 500-character chunks
    with a 10-character overlap, embedded with the ``models/embedding-001``
    Google embedding model and indexed with FAISS. Progress is printed at
    every step so failures can be diagnosed from the logs.

    Returns:
        A retriever configured with ``k=4`` results per query. If any step
        fails (missing file, loader or embedding error, ...), the error is
        printed and a stand-in object is returned whose
        ``get_relevant_documents`` always yields an empty list, so callers
        keep working without a document index.
    """

    class DummyRetriever:
        """Fallback retriever that never finds any document."""

        def get_relevant_documents(self, query):
            return []

    try:
        working_dir = os.getcwd()
        print(f"Current working directory: {working_dir}")
        # The directory listing is printed purely as a debugging aid.
        print(f"Files in directory: {os.listdir(working_dir)}")

        pdf_path = os.path.join(working_dir, "Team1.pdf")
        print(f"Attempting to load PDF from: {pdf_path}")
        if not os.path.exists(pdf_path):
            raise FileNotFoundError(f"The file {pdf_path} does not exist")

        pages = PyPDFLoader(pdf_path).load()
        print(f"Successfully loaded {len(pages)} pages from the PDF")

        splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=10)
        chunks = splitter.split_documents(pages)
        print(f"Split into {len(chunks)} text chunks")

        embedder = GoogleGenerativeAIEmbeddings(model="models/embedding-001")
        index = FAISS.from_documents(chunks, embedder)
        print("Successfully created vector store")
        return index.as_retriever(search_kwargs={"k": 4})
    except Exception as exc:
        # Deliberate best-effort: report the problem and degrade gracefully
        # instead of preventing the UI from starting.
        print(f"Error in initialize_retriever: {str(exc)}")
        print("Returning dummy retriever due to error")
        return DummyRetriever()
def get_llm():
    """Create the Gemini chat model used to answer queries.

    Returns:
        A ``ChatGoogleGenerativeAI`` instance for ``gemini-2.0-flash`` with
        temperature 0, or ``None`` when construction fails; the failure is
        printed rather than raised.
    """
    try:
        chat_model = ChatGoogleGenerativeAI(model="gemini-2.0-flash", temperature=0)
    except Exception as exc:
        print(f"Error initializing LLM: {str(exc)}")
        return None
    return chat_model


# Shared model instance, created once when the module is imported.
llm = get_llm()
def rag_query(query, retriever):
    """Answer ``query`` from the indexed document (retrieval-augmented generation).

    Args:
        query: The user's question.
        retriever: Object exposing ``invoke(query)`` or the older
            ``get_relevant_documents(query)``; ``None`` means no retriever
            could be built.

    Returns:
        The model's answer text, or a human-readable message when the
        retriever or the language model is unavailable, when nothing
        relevant is found, or when any step raises.
    """
    if retriever is None:
        return "Error: Could not initialize document retriever. Please check if Team1.pdf exists."

    try:
        # Prefer the Runnable-style ``invoke``; fall back to the legacy
        # method so older or stand-in retrievers keep working.
        fetch = getattr(retriever, "invoke", None) or retriever.get_relevant_documents
        docs = fetch(query)
        if not docs:
            return "No relevant information found in the document. Try a general query instead."

        # get_llm() returns None on failure; report that clearly instead of
        # surfacing an AttributeError from ``None.invoke``.
        if llm is None:
            return "Error: Could not initialize language model. Please check your API key."

        context = "\n".join(doc.page_content for doc in docs)
        # Date/time are included so the model can resolve relative dates.
        current_date, current_time = get_current_datetime()
        prompt = (
            f"Context:\n{context}\n"
            f"Current Date: {current_date}\n"
            f"Current Time: {current_time}\n"
            f"Question: {query}\n"
            "Answer directly and concisely, using the current date and time information if relevant:"
        )
        response = llm.invoke(prompt)
        return response.content
    except Exception as e:
        return f"Error in RAG processing: {str(e)}"
def general_query(query):
    """Answer ``query`` directly with the language model, without document retrieval.

    Args:
        query: The user's question.

    Returns:
        The model's answer text, or a human-readable error message when the
        model is unavailable or the call fails.
    """
    if llm is None:
        return "Error: Could not initialize language model. Please check your API key."

    try:
        # Date/time are included so the model can resolve relative dates.
        current_date, current_time = get_current_datetime()
        prompt = (
            f"Current Date: {current_date}\n"
            f"Current Time: {current_time}\n"
            "Answer the following query, using the current date and time "
            f"information if relevant: {query}"
        )
        # Call the chat model directly, the same way rag_query does; this
        # replaces the deprecated LLMChain(...).run(...) wrapper.
        response = llm.invoke(prompt)
        return response.content
    except Exception as e:
        return f"Error in general query: {str(e)}"
def file_not_found_message():
    """Return the notice shown when Team Query is used without a loaded PDF."""
    sentences = [
        "The Team1.pdf file could not be found.",
        "Team Query mode will not work properly.",
        "Please ensure the PDF is correctly uploaded to the Hugging Face Space.",
    ]
    return " ".join(sentences)
def query_router(query, method, retriever):
    """Dispatch ``query`` to the handler matching the selected query type.

    Args:
        query: The user's question.
        method: ``"Team Query"`` (answer from the PDF) or ``"General Query"``
            (answer from the model alone).
        retriever: Retriever produced by ``initialize_retriever``; ``None``
            or its ``DummyRetriever`` stand-in means the document index is
            unavailable.

    Returns:
        The handler's response text, or ``"Invalid selection!"`` for any
        other ``method``.
    """
    if method == "Team Query":
        # initialize_retriever signals failure with a DummyRetriever
        # *instance*, which ``isinstance(retriever, type)`` never matches.
        # The class is local to that function, so it is recognised by name.
        index_unavailable = (
            retriever is None
            or isinstance(retriever, type)
            or type(retriever).__name__ == "DummyRetriever"
        )
        if index_unavailable:
            return file_not_found_message()
        return rag_query(query, retriever)
    if method == "General Query":
        return general_query(query)
    return "Invalid selection!"
def update_datetime():
    """Return fresh ``(date, time)`` strings for the two clock text boxes.

    Used as a Gradio callback; it simply forwards the pair produced by
    ``get_current_datetime``.
    """
    date_text, clock_text = get_current_datetime()
    return (date_text, clock_text)
def main():
    """Build the Gradio chat interface and launch it.

    Initializes the document retriever once, lays out the page (logo or
    text fallback, title, date/time display, query box, query-type selector
    and response box), wires the button callbacks and starts the server
    with ``share=True``.
    """
    print("Initializing retriever...")
    retriever = initialize_retriever()

    # Custom CSS for styling
    custom_css = """
    .gradio-container {
        background-color: #f0f0f0;
        text-align: center;
    }
    #logo img {
        display: block;
        margin: 0 auto;
        max-width: 200px;
    }
    .datetime-display {
        text-align: center;
        margin-bottom: 20px;
        font-size: 18px;
        font-weight: bold;
    }
    """

    logo_path = "equinix-sign.jpg"
    logo_exists = os.path.exists(logo_path)

    with gr.Blocks(css=custom_css) as ui:
        # Show the logo when the image file is present, otherwise a text header.
        if logo_exists:
            gr.Image(logo_path, elem_id="logo", show_label=False, height=100, width=200)
        else:
            gr.Markdown("<h2 style='text-align: center;'>Equinix</h2>")
            print(f"Warning: Logo file {logo_path} not found")

        # Title & Description
        gr.Markdown("<h1 style='text-align: center; color: black;'>Equinix Chatbot for Automation Team</h1>")

        # Date and Time Display, seeded through the constructors rather
        # than by mutating ``.value`` after the components are created.
        date_val, time_val = get_current_datetime()
        with gr.Row(elem_classes="datetime-display"):
            date_display = gr.Textbox(label="Date", value=date_val, interactive=False)
            time_display = gr.Textbox(label="Time", value=time_val, interactive=False)

        # The seed values are computed once at start-up; refresh them on
        # every page load so visitors do not see the server start time.
        ui.load(fn=update_datetime, inputs=None, outputs=[date_display, time_display])

        # Manual refresh button for the clock
        refresh_btn = gr.Button("Update Date & Time")
        refresh_btn.click(fn=update_datetime, inputs=[], outputs=[date_display, time_display])

        gr.Markdown("<p style='text-align: center; color: black;'>Ask me anything!</p>")

        # Input & Dropdown Section
        with gr.Row():
            query_input = gr.Textbox(label="Enter your query")
            query_method = gr.Dropdown(["Team Query", "General Query"], label="Select Query Type", value="Team Query")

        submit_button = gr.Button("Submit")
        output_box = gr.Textbox(label="Response", interactive=False)

        # Route the query; the retriever built above is captured by the closure.
        submit_button.click(
            lambda query, method: query_router(query, method, retriever),
            inputs=[query_input, query_method],
            outputs=output_box
        )
        # Also refresh the clock whenever the user submits a query.
        submit_button.click(
            fn=update_datetime,
            inputs=[],
            outputs=[date_display, time_display]
        )

    # Launch UI
    ui.launch(share=True)


if __name__ == "__main__":
    main()