import os
import re

import requests
import streamlit as st
from langchain_huggingface import ChatHuggingFace, HuggingFaceEndpoint, HuggingFaceEmbeddings
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_community.vectorstores import FAISS
from langchain_core.prompts import PromptTemplate

# Environment variables
api_key = os.getenv("HF_API_KEY")
RAPIDAPI_KEY = (os.getenv("RAPIDAPI_KEY") or "").strip()
if not RAPIDAPI_KEY:
    st.error("RAPIDAPI_KEY not set")
    st.stop()  # the app cannot fetch transcripts without this key
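# On Hugging Face Spaces these values are typically stored as repository
# secrets, which the runtime exposes as environment variables (assumed
# deployment setup; adjust if you run the app elsewhere).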
# Fallback list used whenever the language lookup fails
COMMON_LANGUAGES = [
    ("en", "English (en)"),
    ("hi", "Hindi (hi)"),
    ("es", "Spanish (es)"),
    ("fr", "French (fr)"),
    ("de", "German (de)"),
    ("ja", "Japanese (ja)"),
    ("pt", "Portuguese (pt)"),
    ("ru", "Russian (ru)"),
]

# Check available languages via RapidAPI
def get_available_languages(video_id):
    """Check available transcript languages for a video via RapidAPI."""
    url = "https://youtube-transcript3.p.rapidapi.com/api/languages"
    querystring = {"videoId": video_id}
    headers = {
        "x-rapidapi-key": RAPIDAPI_KEY,
        "x-rapidapi-host": "youtube-transcript3.p.rapidapi.com"
    }
    try:
        response = requests.get(url, headers=headers, params=querystring, timeout=10)
        if response.status_code == 200:
            data = response.json()
            if data.get("success") and "languages" in data:
                languages = []
                for lang in data["languages"]:
                    code = lang.get("code", "")
                    name = lang.get("name", "")
                    languages.append((code, f"{name} ({code})"))
                return languages
        # Fall back to common languages if the API response is unusable
        return COMMON_LANGUAGES
    except Exception as e:
        st.warning(f"Could not fetch languages: {e}. Using common languages.")
        return COMMON_LANGUAGES
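# The success path above assumes a response of roughly this shape (inferred
# from the fields the code accesses, not from the documented API schema):
# {"success": true, "languages": [{"code": "en", "name": "English"}, ...]}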
# Transcript Fetcher
def get_transcript(video_id, language_code="en"):
    """Fetch a video's transcript via RapidAPI and join it into one string."""
    url = "https://youtube-transcript3.p.rapidapi.com/api/transcript"
    querystring = {"videoId": video_id, "lang": language_code}
    headers = {
        "x-rapidapi-key": RAPIDAPI_KEY,
        "x-rapidapi-host": "youtube-transcript3.p.rapidapi.com"
    }
    try:
        response = requests.get(url, headers=headers, params=querystring, timeout=10)
        if response.status_code != 200:
            st.error(f"API Error: {response.status_code}")
            return None
        data = response.json()
        if data.get("success") and "transcript" in data:
            return ' '.join(item.get('text', '') for item in data["transcript"])
        else:
            st.warning("Unexpected API response format")
            return None
    except Exception as e:
        st.error(f"Error: {str(e)}")
        return None
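# The parsing above implies a payload like the following (only "text" is
# relied on; any other per-segment fields may differ):
# {"success": true, "transcript": [{"text": "hello"}, {"text": "world"}, ...]}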
# Vector Store
def create_vector_store(transcript):
    """Split the transcript into overlapping chunks and index them in FAISS."""
    splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
    docs = splitter.create_documents([transcript])
    embeddings = HuggingFaceEmbeddings(
        model_name="intfloat/multilingual-e5-base",
        model_kwargs={"device": "cpu"}
    )
    return FAISS.from_documents(docs, embeddings)
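# Minimal usage sketch (hypothetical transcript text, for illustration only):
# store = create_vector_store("RAG combines retrieval with generation. ...")
# hits = store.similarity_search("What is RAG?", k=2)
# print(hits[0].page_content)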
# -------------------------------------------------
# 3️⃣ Model Builder
# -------------------------------------------------
MODEL_REPOS = {
    "Llama-3.2-1B": "meta-llama/Llama-3.2-1B-Instruct",
    "Gemma-2-3B": "google/gemma-2-2b-it",
    "DeepSeek-685B": "deepseek-ai/DeepSeek-V3.2-Exp",
    "OpenAI-20B": "openai/gpt-oss-20b",
}

def build_model(model_choice, temperature=0.7):
    """Return the selected model and a flag indicating whether it's chat-based."""
    # model_choice comes from the radio widget below, so it is always a valid key
    llm = HuggingFaceEndpoint(
        repo_id=MODEL_REPOS[model_choice],
        huggingfacehub_api_token=api_key,
        task="text-generation",
        max_new_tokens=500,
        temperature=temperature  # set on the endpoint so every model honors it
    )
    return ChatHuggingFace(llm=llm), True  # (model, is_chat)
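# Example call (hypothetical values, mirroring how the UI below uses it):
# model, is_chat = build_model("Llama-3.2-1B", temperature=0.5)
# reply = model.invoke("Say hello in one word.")
# print(reply.content)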
# -------------------------------------------------
# 4️⃣ Prompt Template
# -------------------------------------------------
prompt_template = PromptTemplate(
    template=(
        "You are a helpful assistant.\n\n"
        "Answer the question using the context provided below.\n"
        "If the context does not mention the topic, say clearly: 'There is no mention of the topic in the video you provided.'\n"
        "Then, based on your own knowledge, try to answer the question.\n"
        "If both the context and your knowledge are insufficient, say: 'I don't know.'\n\n"
        "Keep the answer format neat, clean, and human-readable.\n\n"
        "Context:\n{context}\n\n"
        "Question:\n{question}"
    ),
    input_variables=["context", "question"]
)
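# Filling the template yields a plain string prompt, e.g. (illustrative values):
# prompt_template.invoke({"context": "RAG stands for ...", "question": "What is RAG?"}).to_string()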
# -------------------------------------------------
# 5️⃣ Streamlit App UI
# -------------------------------------------------
def extract_video_id(url: str):
    """Extract the 11-character video ID from a YouTube URL, or accept a bare ID."""
    # Handles youtube.com/watch?v=... and youtu.be/... formats
    match = re.search(r"(?:v=|/)([0-9A-Za-z_-]{11})", url)
    if match:
        return match.group(1)
    # Fall back to treating the input itself as a bare 11-character video ID,
    # since the default value below is an ID rather than a full URL
    return url if re.fullmatch(r"[0-9A-Za-z_-]{11}", url) else None
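# A few inputs this accepts (illustrative):
#   extract_video_id("https://www.youtube.com/watch?v=lv1_-RER4_I")  -> "lv1_-RER4_I"
#   extract_video_id("https://youtu.be/lv1_-RER4_I")                 -> "lv1_-RER4_I"
#   extract_video_id("lv1_-RER4_I")                                  -> "lv1_-RER4_I"
#   extract_video_id("not a video")                                  -> None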
st.title("🎬 YouTube Transcript Chatbot (RAG)")
video_url = st.text_input("Enter YouTube Video URL or ID", value="lv1_-RER4_I")
video_id = extract_video_id(video_url)
query = st.text_area("Your Query", value="What is RAG?")
model_choice = st.radio("Model to Use", list(MODEL_REPOS.keys()))
temperature = st.slider("Temperature", 0.0, 1.0, value=0.5, step=0.01)
# Get available languages for this video
language_code = None
if video_id:
    with st.spinner("Checking available languages..."):
        available_languages = get_available_languages(video_id)
    if available_languages:
        st.success(f"Found {len(available_languages)} language(s)")
        lang_options = {label: code for code, label in available_languages}
        selected_label = st.selectbox("Select Language", options=list(lang_options.keys()))
        language_code = lang_options[selected_label]
    else:
        st.warning("No languages found for this video.")
# -------------------------------------------------
# 6️⃣ Run Chatbot
# -------------------------------------------------
if st.button("Run Chatbot"):
    if not video_id or not query or not language_code:
        st.warning("⚠️ Please fill in all fields and select a language.")
    else:
        with st.spinner("Fetching transcript..."):
            transcript = get_transcript(video_id, language_code)
        if not transcript:
            st.error("❌ Could not fetch transcript.")
        else:
            st.success(f"✅ Transcript fetched ({len(transcript)} characters).")
            with st.spinner("Creating knowledge base..."):
                # "mmr" (maximal marginal relevance) balances relevance against
                # diversity among the retrieved chunks
                retriever = create_vector_store(transcript).as_retriever(
                    search_type="mmr",
                    search_kwargs={"k": 5}
                )
                relevant_docs = retriever.invoke(query)
                context_text = "\n\n".join(doc.page_content for doc in relevant_docs)
                prompt = prompt_template.invoke({'context': context_text, 'question': query})
            with st.spinner(f"Generating response using {model_choice}..."):
                model, is_chat = build_model(model_choice, temperature)
                try:
                    if is_chat:
                        # Chat models return a message object with a .content attribute
                        response = model.invoke(prompt)
                        response_text = (
                            response.content if hasattr(response, "content") else str(response)
                        )
                    else:
                        # Non-chat fallback (unused with the current model list)
                        response = model(prompt)
                        if isinstance(response, list) and "generated_text" in response[0]:
                            response_text = response[0]["generated_text"]
                        else:
                            response_text = str(response)
                    st.text_area("🧠 Model Response", value=response_text, height=400)
                except Exception as e:
                    st.error(f"Model generation failed: {e}")