Spaces:
Sleeping
Sleeping
| import os | |
| from dotenv import load_dotenv | |
| import json | |
| import gradio as gr | |
| import chromadb | |
| import openai | |
| import requests | |
| import tempfile | |
| from utils.calcules import cal_global | |
| # Try to import pysqlite3 for environments like Hugging Face Spaces | |
| try: | |
| __import__('pysqlite3') | |
| import sys | |
| sys.modules['sqlite3'] = sys.modules.pop('pysqlite3') | |
| except ImportError: | |
| # pysqlite3 not available, use standard sqlite3 (works on most environments) | |
| pass | |
| from dotenv import load_dotenv | |
| import json | |
| import gradio as gr | |
| import chromadb | |
| from utils.rag.retrieval import type_retrieval | |
| from utils.rag.chunking import index_markdown_folder_in_chroma | |
| from llama_index.core import ( | |
| VectorStoreIndex, | |
| StorageContext, | |
| Settings, | |
| download_loader, | |
| ) | |
| from llama_index.llms.mistralai import MistralAI | |
| from llama_index.embeddings.mistralai import MistralAIEmbedding | |
| from llama_index.vector_stores.chroma import ChromaVectorStore | |
| # PRODUCTS = [ | |
| # "aldecoc cmk", | |
| # "aldekol des", | |
| # "BioVX", | |
| # "DeterBio", | |
| # "DeterKlyn", | |
| # "Maskomal", | |
| # "Mentofin", | |
| # "TH5", | |
| # "Virkon H2O", | |
| # "Virkon LSP", | |
| # "Virkon H", | |
| # ] | |
| PRODUCTS=[ | |
| "th5", | |
| "aldecoc cmk", | |
| "virkon lsp", | |
| "aldekol des ff", | |
| "biovx" | |
| ] | |
| PRODUCTS_TO_PRINT = "" | |
| for p in PRODUCTS : | |
| PRODUCTS_TO_PRINT+=f"\n-{p}" | |
| PARAMS = {'surface': None, 'animal': None, 'type_de_sol': None, 'application': None, 'produit': None} | |
| def classify_message(message: str) -> str: | |
| prompt = f""" | |
| Tu es un classificateur de questions pour un chatbot de biosécurité. | |
| Ton rôle est de lire la question de l’utilisateur et de renvoyer UNE SEULE étiquette | |
| parmi les suivantes : | |
| - produit | |
| - dosage | |
| - selecteur | |
| - protocole | |
| - fallback | |
| - autre | |
| Règles : | |
| - Si la question mentionne explicitement un produit parmi {PRODUCTS}, étiquette => produit. | |
| - Si la question parle de dosage, quantité, dilution, calcul, préparation de solution => dosage. | |
| - Si la question demande quel produit utiliser mais ne cite pas de nom précis => selecteur. | |
| - Si la question demande un protocole, un processus, une aide étape par étape => protocole. | |
| - Sinon => fallback. | |
| LIMITES : | |
| On ne propose que des solutions de désinfection de surface ou d'environnement. Si on te questionne pour traiter des animaux, des personnes, des plantes, des arbres alors tu réponds => autre | |
| Question : {message} | |
| Étiquette : | |
| """ | |
| resp = llm.complete(prompt) | |
| label = resp.text.strip().lower() | |
| valid_labels = ["produit", "dosage", "selecteur", "protocole", "fallback", "autre"] | |
| if label not in valid_labels: | |
| label = "fallback" | |
| return label | |
| def orchestrator(message: str, categorie: str): | |
| if categorie=="produit": | |
| prompt = f""" | |
| Je possède cette liste de produits : {PRODUCTS} | |
| A partir de ce texte, extrait le produit présent dans la liste : | |
| {message} | |
| Il peut y avoir des fautes de frappes dans le message et le nom du produit, prend le en compte. | |
| SI c'est le cas ne renvoie que le nom du produit le plus proche. | |
| Si le produit ne correspond à la liste, retourne le nom du produit. | |
| Si aucun nom de produit n'est présent dans le message, répond "aucun". | |
| Tu ne dois répondre que et UNIQUEMENT le nom du produit ou aucun. | |
| """ | |
| resp = str(llm.complete(prompt)).lower() | |
| PRODUCTS_TO_PRINT = """Les produits pris en charge sont les suivants :""" | |
| for p in PRODUCTS : | |
| PRODUCTS_TO_PRINT+=f"\n-{p}" | |
| if resp == "aucun": | |
| return f"""Aucun produit n'est renseigné. Les produits pris en charge sont les suivants : {PRODUCTS_TO_PRINT}""" | |
| if resp not in PRODUCTS: | |
| return f"Le produit {resp} n'est pas pris en compte. La liste des produits est la suivante : {PRODUCTS_TO_PRINT}" | |
| donnees = type_retrieval(kind=categorie, query=message, product = resp) | |
| prompt = f""" | |
| Tu es un assistant virtuel **expert en biosécurité**. | |
| Répond à la question suivante en utilisant uniquement les informations issus du données : | |
| Question : {message} | |
| Données : {donnees} | |
| """ | |
| return llm.complete(prompt) | |
| elif categorie=="protocole": | |
| donnees = type_retrieval(kind=categorie, query=message, product = None) | |
| prompt = f""" | |
| Tu es un assistant virtuel **expert en biosécurité**. | |
| Répond à la question suivante en utilisant uniquement les informations issus du données : | |
| Question : {message} | |
| Données : {donnees} | |
| """ | |
| return llm.complete(prompt) | |
| elif categorie=="dosage": | |
| surface = None | |
| animal = None | |
| type_de_sol = None | |
| application = None | |
| produit = None | |
| prompt = f""" | |
| Tu es un assistant virtuel **expert en biosécurité**, un utilisateur a besoin d'un calul pour un dosage : {message} | |
| Les paramètres nécessaire pour le calcul sont les suivants : | |
| - nom du produits dans la liste : {PRODUCTS} | |
| - le type d'animal entre volaille et porcin | |
| - le type de sol entre terre et béton | |
| - le type d'application entre thermal fogging, spraying et foaming | |
| - la surface en m² (int) | |
| Les paramètre en mémoire sont : | |
| surface = {surface} | |
| animal = {animal} | |
| type_de_sol = {type_de_sol} | |
| application = {application} | |
| produit = {produit} | |
| Tu dois extraire les paramètres de question de l'utilisateur. | |
| Retourne **UNIQUEMENT** un json. | |
| Le JSON EST de la forme : | |
| {{"surface": 1000, "animal": "volaille", "type_de_sol": "terre", "application": "fogging", "produit": "TH5"}} | |
| Met les valeurs à 'null' pour les information que tu n'a pas extrait. | |
| """ | |
| r = llm.complete(prompt).text #.split("{")[1].split("}")[0] | |
| # Charger depuis une chaîne (et pas un fichier) | |
| params = json.loads(str(r)) | |
| print(params) | |
| for k, v in params.items(): | |
| if v is not None: | |
| PARAMS[k] = v | |
| print(PARAMS) | |
| manquant = [k for k, v in PARAMS.items() if v is None] | |
| print(manquant) | |
| if manquant != []: | |
| resp = "il manque les paramètres suivants :" | |
| for p in manquant: | |
| resp += f"\n- {p}" | |
| return resp | |
| return cal_global(produit=params['produit'], application=params['application'], | |
| animal=params['animal'], surface = params['surface'], | |
| type_de_sol = params['type_de_sol']) | |
| elif categorie=="selecteur": | |
| # return promptRAG_selecteur(type="selecteur",produit=None, message=message) | |
| return 1 | |
| elif categorie=="fallback": | |
| return "Désolé, je ne peux pas répondre à cette question car elle ne concerne pas la biosécurité en élevage." | |
| elif categorie=="autre": | |
| return "Cela n'est pas dans nos domaines d'applications. Rapprochez-vous de votre vétérinaire pour un conseil adapté." | |
| else: | |
| return 0 # Le message n'a pas été correctement classifié | |
| load_dotenv() | |
| env_api_key = os.environ.get("MISTRAL_API_KEY") | |
| openai.api_key = os.environ.get("OPENAI_API_KEY") | |
| title = "Gaia Mistral 8x22b Chat RAG PDF Demo" | |
| description = "Example of an assistant with Gradio, RAG from PDF documents and Mistral AI via its API" | |
| placeholder = ( | |
| "Vous pouvez me posez une question sur ce contexte, appuyer sur Entrée pour valider" | |
| ) | |
| placeholder_url = "Extract text from this url" | |
| llm_model = "open-mixtral-8x22b" | |
| env_api_key = os.environ.get("MISTRAL_API_KEY") | |
| query_engine = None | |
| # Define LLMs | |
| llm = MistralAI(api_key=env_api_key, model=llm_model) | |
| embed_model = MistralAIEmbedding(model_name="mistral-embed", api_key=env_api_key) | |
| # create client and a new collection | |
| db = index_markdown_folder_in_chroma('chatbot-lanxess/documents') | |
| Settings.llm = llm | |
| PDFReader = download_loader("PDFReader") | |
| loader = PDFReader() | |
| # ---------- Transcription audio ---------- | |
| def transcribe_audio(file_path): | |
| if not file_path: | |
| return "Aucun audio enregistré." | |
| try: | |
| with open(file_path, "rb") as f: | |
| result = openai.audio.transcriptions.create( | |
| model="gpt-4o-transcribe", | |
| file=f, | |
| response_format="text" | |
| ) | |
| return result.strip() | |
| except Exception as e: | |
| return f"Erreur transcription : {e}" | |
| import tempfile | |
| import openai | |
| def generate_tts(text): | |
| if not text: | |
| return None | |
| try: | |
| # Crée juste un nom de fichier temporaire | |
| temp_file_path = tempfile.mktemp(suffix=".mp3") | |
| # Appel correct de l'API TTS | |
| response = openai.audio.speech.create( | |
| model="gpt-4o-mini-tts", | |
| voice="sage", | |
| input=text | |
| ) | |
| # Récupère les bytes de l'audio | |
| audio_bytes = response.read() # c'est la bonne méthode pour HttpxBinaryResponseContent | |
| # Écriture du fichier MP3 | |
| with open(temp_file_path, "wb") as f: | |
| f.write(audio_bytes) | |
| return temp_file_path | |
| except Exception as e: | |
| print("Erreur TTS :", e) | |
| return None | |
| with gr.Blocks() as demo: | |
| gr.Markdown( | |
| """ # Welcome to Gaia Level 3 Demo | |
| Add a file before interacting with the Chat. | |
| This demo allows you to interact with a pdf file and then ask questions to Mistral APIs. | |
| Mistral will answer with the context extracted from your uploaded file. | |
| *The files will stay in the database unless there is 48h of inactivty or you re-build the space.* | |
| """ | |
| ) | |
| gr.Markdown(""" ### 2 / Ask a question about this context """) | |
| chatbot = gr.Chatbot() | |
| msg = gr.Textbox(placeholder=placeholder) | |
| clear = gr.ClearButton([msg, chatbot]) | |
| def paramsLoop(userResponse): | |
| surface = None | |
| animal = None | |
| type_de_sol = None | |
| application = None | |
| produit = None | |
| message = "coucou" | |
| prompt = f""" | |
| Tu es un assistant virtuel **expert en biosécurité**, un utilisateur a besoin d'un calul pour un dosage : {message} | |
| Les paramètres nécessaire pour le calcul sont les suivants : | |
| - nom du produits dans la liste : {PRODUCTS} | |
| - le type d'animal entre volaille et porcin | |
| - le type de sol entre terre et béton | |
| - le type d'application entre thermal fogging, spraying et foaming | |
| - la surface en m² (int) | |
| Les paramètre en mémoire sont : | |
| surface = {surface} | |
| animal = {animal} | |
| type_de_sol = {type_de_sol} | |
| application = {application} | |
| produit = {produit} | |
| Tu dois extraire les paramètres de question de l'utilisateur et retourner un json de la forme : | |
| {{ | |
| surface: 1000, | |
| animal: "volaille", | |
| type_de_sol: "terre", | |
| application: "fogging", | |
| produit: "TH5" | |
| }} | |
| si il manque des informations met la valeur à None | |
| """ | |
| params = llm.complete(prompt) | |
| stop = False | |
| manquant = [] | |
| k = 0 #Nombre d'itération | |
| while (manquant) and (not stop) and (k<5) : | |
| manquant = [k for k,v in params.items() if v == None] | |
| resp = """ il manque les paramètres suivants :""" | |
| for p in manquant : | |
| resp+=f"\n-{p}" | |
| msg.submit(respondParamsLoop, [msg, chatbot], [chatbot]) | |
| # Get responses | |
| # LLM prompt manquant | |
| json = llm.complete(prompt) | |
| # si k=2 et il y a un param a None : stop | |
| params.update(json) | |
| # return cal_global(produit=params['produit'], ...) | |
| def respondParamsLoop(message, chat_history): | |
| # chat_history.append((message, paramsLoop(message, nous))) | |
| return chat_history | |
| def respond(message, chat_history): | |
| resp = classify_message(message) | |
| response = str(orchestrator(message, resp)) | |
| chat_history.append((message, response)) | |
| tts_path = generate_tts(response) | |
| return chat_history, tts_path | |
| def handle_audio(file_path, chat_history): | |
| transcription = transcribe_audio(file_path) | |
| if "Erreur" in transcription or "Aucun audio" in transcription: | |
| return chat_history, transcription, None | |
| updated_history, tts_path = respond(transcription, chat_history) | |
| return updated_history, transcription, tts_path | |
| msg.submit(respond, [msg, chatbot], [chatbot]) | |
| chatbot = gr.Chatbot() | |
| msg = gr.Textbox(placeholder=placeholder) | |
| tts_output = gr.Audio(label="Réponse vocale", type="filepath", autoplay=True) | |
| clear = gr.ClearButton([msg, chatbot, tts_output]) | |
| msg.submit(respond, [msg, chatbot], [chatbot, tts_output]) | |
| # ---------- Section Audio ---------- | |
| gr.Markdown("### 3 / Record Audio and Transcribe") | |
| with gr.Row(): | |
| audio_input = gr.Audio(label="Enregistrer votre audio", type="filepath") | |
| transcription_output = gr.Textbox(label="Transcription", interactive=False) | |
| audio_input.change( | |
| fn=handle_audio, | |
| inputs=[audio_input, chatbot], | |
| outputs=[chatbot, transcription_output, tts_output] | |
| ) | |
| demo.title = title | |
| demo.launch(share=True) | |