Spaces:
Running
Running
| from flask import Flask, render_template, request, Response | |
| from google import genai | |
| from google.genai import types | |
| import os | |
| import logging | |
| import json | |
| import tempfile | |
| import uuid | |
| from typing import List, Dict, Any | |
| import mimetypes | |
| # Configuration du logging | |
| logging.basicConfig( | |
| level=logging.INFO, | |
| format='%(asctime)s - %(levelname)s - %(message)s' | |
| ) | |
| logger = logging.getLogger(__name__) | |
| app = Flask(__name__) | |
| # Configuration | |
| TOKEN = os.environ.get("TOKEN") | |
| if not TOKEN: | |
| raise ValueError("La variable d'environnement TOKEN doit être définie avec votre clé API Gemini") | |
| client = genai.Client(api_key=TOKEN) | |
| # Configuration des modèles | |
| STANDARD_MODEL_NAME = "gemini-flash-latest" | |
| DEEPTHINK_MODEL_NAME = "gemini-flash-latest" | |
| # Types MIME supportés pour les images | |
| SUPPORTED_IMAGE_MIMES = { | |
| 'image/png', 'image/jpeg', 'image/webp', 'image/heic', 'image/heif' | |
| } | |
| # Prompt système pour les devoirs de français | |
| FRENCH_SYSTEM_PROMPT = """ | |
| Tu es expérimenté en langue française. | |
| # Instruction pour travail argumentatif de français niveau lycée | |
| ## Paramètres d'entrée attendus : | |
| - `sujet` : La question/thème à traiter | |
| - `choix` : Type de travail ("discuter", "dissertation", "étayer", "réfuter") | |
| - `style` : Style d'écriture souhaité (ex: "raffiné") | |
| ## Traitement automatique selon le type : | |
| ### Si choix = "discuter" : | |
| Produire un travail argumentatif suivant cette méthodologie : | |
| **INTRODUCTION:** | |
| - Approche par constat | |
| - Problématique | |
| - Annonce du plan | |
| **DÉVELOPPEMENT:** | |
| *Partie 1 : Thèse* | |
| - Introduction partielle (énonce la thèse) | |
| - Argument 1: Explications + Illustration (exemple + explication) | |
| - Argument 2: Explications + Illustration (exemple + explication) | |
| - Argument 3: Explications + Illustration (exemple + explication) | |
| *Phrase de transition vers la deuxième partie* | |
| *Partie 2 : Antithèse* | |
| - Introduction partielle (énonce l'antithèse) | |
| - Argument 1: Explications + Illustration (exemple + explication) | |
| - Argument 2: Explications + Illustration (exemple + explication) | |
| - Argument 3: Explications + Illustration (exemple + explication) | |
| **CONCLUSION:** | |
| - Bilan (Synthèse des arguments pour et contre) | |
| - Ouverture du sujet (sous forme de phrase interrogative) | |
| ### Si choix = "dissertation" : | |
| Produire une dissertation suivant cette méthodologie : | |
| **Phase 1 : L'Introduction (en un seul paragraphe)** | |
| - Amorce : Introduire le thème général | |
| - Position du Sujet : Présenter l'auteur/œuvre, citer et reformuler le sujet | |
| - Problématique : Poser la question centrale dialectique | |
| - Annonce du Plan : Annoncer thèse et antithèse | |
| **Phase 2 : Le Développement (en trois parties distinctes)** | |
| *Partie 1 : La Thèse* | |
| - Explorer et justifier l'affirmation initiale | |
| - 2-3 arguments (structure A.I.E : Affirmation-Illustration-Explication) | |
| - Exemples littéraires précis et analysés | |
| *Partie 2 : L'Antithèse* | |
| - Nuancer, critiquer ou présenter le point de vue opposé | |
| - 2-3 arguments (structure A.I.E) | |
| - Transition cruciale depuis la thèse | |
| - Exemples littéraires précis et analysés | |
| *Partie 3 : La Synthèse* | |
| - Dépasser l'opposition thèse/antithèse | |
| - Nouvelle perspective intégrant les éléments valides | |
| - 2-3 arguments (structure A.I.E) | |
| - Transition depuis l'antithèse | |
| - Exemples littéraires précis et analysés | |
| **Phase 3 : La Conclusion (en un seul paragraphe)** | |
| - Bilan synthétique du parcours dialectique | |
| - Réponse claire à la problématique | |
| - Ouverture vers contexte plus large | |
| ### Si choix = "étayer" ou "réfuter" : | |
| Produire un travail argumentatif suivant cette méthodologie : | |
| **INTRODUCTION:** | |
| - Approche par constat | |
| - Problématique | |
| - Annonce du plan | |
| **DÉVELOPPEMENT:** | |
| - Phrase chapeau (énonce la thèse principale : pour étayer ou contre pour réfuter) | |
| - Argument 1: Explications + Illustration (exemple + explication) | |
| - Argument 2: Explications + Illustration (exemple + explication) | |
| - Argument 3: Explications + Illustration (exemple + explication) | |
| **CONCLUSION:** | |
| - Bilan (rappel de la thèse + arguments principaux) | |
| - Ouverture du sujet (sous forme de phrase interrogative) | |
| ## Application du style : | |
| Adapter l'écriture selon le style demandé (ex: "raffiné" = vocabulaire soutenu, syntaxe élaborée, références culturelles). | |
| Commence directement par l'introduction sans rien expliquer avant. | |
| """ | |
| class FileManager: | |
| """Gestionnaire de fichiers uploadés avec nettoyage automatique""" | |
| def __init__(self): | |
| self.uploaded_files: Dict[str, Any] = {} | |
| def upload_image(self, image_data: bytes, filename: str, content_type: str) -> str: | |
| """Upload une image et retourne son ID""" | |
| try: | |
| # Créer un fichier temporaire | |
| with tempfile.NamedTemporaryFile(delete=False, suffix=f"_{filename}") as temp_file: | |
| temp_file.write(image_data) | |
| temp_path = temp_file.name | |
| # Upload vers Gemini | |
| uploaded_file = client.files.upload(file=temp_path) | |
| file_id = str(uuid.uuid4()) | |
| self.uploaded_files[file_id] = { | |
| 'gemini_file': uploaded_file, | |
| 'temp_path': temp_path, | |
| 'filename': filename, | |
| 'content_type': content_type | |
| } | |
| logger.info(f"Fichier uploadé avec succès: {filename} (ID: {file_id})") | |
| return file_id | |
| except Exception as e: | |
| logger.error(f"Erreur lors de l'upload de {filename}: {str(e)}") | |
| # Nettoyer le fichier temporaire en cas d'erreur | |
| if 'temp_path' in locals(): | |
| try: | |
| os.unlink(temp_path) | |
| except: | |
| pass | |
| raise | |
| def get_gemini_file(self, file_id: str): | |
| """Récupère le fichier Gemini par son ID""" | |
| if file_id in self.uploaded_files: | |
| return self.uploaded_files[file_id]['gemini_file'] | |
| return None | |
| def cleanup_file(self, file_id: str): | |
| """Nettoie un fichier spécifique""" | |
| if file_id in self.uploaded_files: | |
| file_info = self.uploaded_files[file_id] | |
| # Supprimer le fichier temporaire | |
| try: | |
| os.unlink(file_info['temp_path']) | |
| except: | |
| pass | |
| # Supprimer de Gemini (optionnel, les fichiers expirent automatiquement) | |
| try: | |
| client.files.delete(name=file_info['gemini_file'].name) | |
| except: | |
| pass | |
| del self.uploaded_files[file_id] | |
| logger.info(f"Fichier nettoyé: {file_id}") | |
| def cleanup_all(self): | |
| """Nettoie tous les fichiers""" | |
| for file_id in list(self.uploaded_files.keys()): | |
| self.cleanup_file(file_id) | |
| # Instance globale du gestionnaire de fichiers | |
| file_manager = FileManager() | |
| def validate_image_file(file) -> bool: | |
| """Valide qu'un fichier est une image supportée""" | |
| if not file or not file.filename: | |
| return False | |
| # Vérifier l'extension | |
| filename = file.filename.lower() | |
| valid_extensions = ['.png', '.jpg', '.jpeg', '.webp', '.heic', '.heif'] | |
| if not any(filename.endswith(ext) for ext in valid_extensions): | |
| return False | |
| # Vérifier le type MIME | |
| content_type = file.content_type or mimetypes.guess_type(filename)[0] | |
| return content_type in SUPPORTED_IMAGE_MIMES | |
| def create_error_response(message: str, status_code: int = 400) -> Response: | |
| """Crée une réponse d'erreur standardisée""" | |
| return Response( | |
| "data: " + json.dumps({'type': 'error', 'content': message}) + "\n\n", | |
| mimetype='text/event-stream' | |
| ), status_code | |
| def index(): | |
| """Page d'accueil""" | |
| logger.info("Page index demandée.") | |
| return render_template('index.html') | |
| def api_francais(): | |
| """API pour génération de devoirs de français""" | |
| logger.info(f"Requête {request.method} reçue sur /api/francais") | |
| # Récupération des paramètres | |
| sujet = request.args.get('sujet', '').strip() | |
| choix_raw = request.args.get('choix', '').strip() | |
| style = request.args.get('style', '').strip() | |
| use_deepthink = request.args.get('use_deepthink', 'false').lower() == 'true' | |
| # Normalisation du choix (minuscules + correction orthographique) | |
| choix = choix_raw.lower() | |
| # Mapper les variations courantes vers les valeurs attendues | |
| choix_mapping = { | |
| 'etaye': 'étayer', | |
| 'etayer': 'étayer', | |
| 'refute': 'réfuter', | |
| 'refuter': 'réfuter', | |
| 'discuter': 'discuter', | |
| 'dissertation': 'dissertation', | |
| 'étayer': 'étayer', | |
| 'réfuter': 'réfuter' | |
| } | |
| choix = choix_mapping.get(choix, choix) | |
| logger.info(f"Données reçues : sujet='{sujet[:50]}...', choix='{choix}' (original: '{choix_raw}'), style='{style}', deepthink={use_deepthink}") | |
| # Validation | |
| if not sujet: | |
| return create_error_response('Erreur: Le sujet ne peut pas être vide.') | |
| if choix not in ['discuter', 'dissertation', 'étayer', 'réfuter']: | |
| return create_error_response(f"Erreur: Type de travail non valide ('{choix_raw}'). Valeurs acceptées : discuter, dissertation, étayer, réfuter.") | |
| # Sélection du modèle | |
| model_name = DEEPTHINK_MODEL_NAME if use_deepthink else STANDARD_MODEL_NAME | |
| logger.info(f"Modèle utilisé : {model_name}") | |
| # Configuration de génération | |
| config = types.GenerateContentConfig( | |
| system_instruction=FRENCH_SYSTEM_PROMPT, | |
| temperature=1.0 | |
| ) | |
| if use_deepthink: | |
| config.thinking_config = types.ThinkingConfig(include_thoughts=True) | |
| # Prompt utilisateur | |
| user_prompt = f"Sujet: {sujet}\nType: {choix}\nStyle: {style}" | |
| try: | |
| logger.info("Démarrage de la génération...") | |
| # Génération sans streaming | |
| response = client.models.generate_content( | |
| model=model_name, | |
| contents=[user_prompt], | |
| config=config | |
| ) | |
| # Vérifier si la réponse a été bloquée | |
| if hasattr(response, 'prompt_feedback') and response.prompt_feedback: | |
| logger.warning(f"Prompt feedback: {response.prompt_feedback}") | |
| if hasattr(response.prompt_feedback, 'block_reason'): | |
| return Response( | |
| json.dumps({ | |
| 'type': 'error', | |
| 'content': f'Contenu bloqué par les filtres de sécurité: {response.prompt_feedback.block_reason}' | |
| }), | |
| mimetype='application/json', | |
| status=400 | |
| ) | |
| # Vérifier les candidats | |
| if not response.candidates: | |
| logger.error("Aucun candidat dans la réponse") | |
| return Response( | |
| json.dumps({ | |
| 'type': 'error', | |
| 'content': 'Aucune réponse générée par le modèle.' | |
| }), | |
| mimetype='application/json', | |
| status=500 | |
| ) | |
| candidate = response.candidates[0] | |
| # Vérifier le statut du candidat | |
| if hasattr(candidate, 'finish_reason') and candidate.finish_reason: | |
| finish_reason = str(candidate.finish_reason) | |
| if 'SAFETY' in finish_reason: | |
| logger.warning(f"Contenu bloqué: {finish_reason}") | |
| return Response( | |
| json.dumps({ | |
| 'type': 'error', | |
| 'content': 'Contenu bloqué par les filtres de sécurité.' | |
| }), | |
| mimetype='application/json', | |
| status=400 | |
| ) | |
| # Extraire le texte | |
| if not hasattr(candidate, 'content') or not candidate.content: | |
| logger.error("Pas de contenu dans le candidat") | |
| return Response( | |
| json.dumps({ | |
| 'type': 'error', | |
| 'content': 'Aucun contenu généré.' | |
| }), | |
| mimetype='application/json', | |
| status=500 | |
| ) | |
| if not hasattr(candidate.content, 'parts') or not candidate.content.parts: | |
| logger.error("Pas de parts dans le contenu") | |
| return Response( | |
| json.dumps({ | |
| 'type': 'error', | |
| 'content': 'Aucun contenu généré.' | |
| }), | |
| mimetype='application/json', | |
| status=500 | |
| ) | |
| # Récupérer tout le texte | |
| full_text = "" | |
| thoughts = "" | |
| for part in candidate.content.parts: | |
| if hasattr(part, 'text') and part.text: | |
| if hasattr(part, 'thought') and part.thought and use_deepthink: | |
| thoughts += part.text + "\n" | |
| else: | |
| full_text += part.text | |
| if not full_text: | |
| logger.error("Aucun texte dans les parts") | |
| return Response( | |
| json.dumps({ | |
| 'type': 'error', | |
| 'content': 'Aucun contenu textuel généré.' | |
| }), | |
| mimetype='application/json', | |
| status=500 | |
| ) | |
| logger.info(f"Génération réussie. Longueur: {len(full_text)} caractères") | |
| result = { | |
| 'type': 'success', | |
| 'content': full_text | |
| } | |
| if thoughts and use_deepthink: | |
| result['thoughts'] = thoughts | |
| return Response( | |
| json.dumps(result), | |
| mimetype='application/json' | |
| ) | |
| except Exception as e: | |
| logger.exception("Erreur pendant la génération de contenu.") | |
| return Response( | |
| json.dumps({ | |
| 'type': 'error', | |
| 'content': f'Erreur serveur pendant la génération: {str(e)}' | |
| }), | |
| mimetype='application/json', | |
| status=500 | |
| ) | |
| def api_etude_texte(): | |
| """API pour analyse de texte à partir d'images uploadées""" | |
| logger.info("Requête reçue sur /api/etude-texte") | |
| # Vérification de la présence d'images | |
| if 'images' not in request.files: | |
| return create_error_response('Aucun fichier image envoyé.') | |
| images = request.files.getlist('images') | |
| if not images or not any(img.filename for img in images): | |
| return create_error_response('Aucune image sélectionnée.') | |
| # Validation et upload des images | |
| uploaded_file_ids = [] | |
| try: | |
| for img in images: | |
| if not img.filename: | |
| continue | |
| # Validation de l'image | |
| if not validate_image_file(img): | |
| logger.warning(f"Fichier non valide ignoré: {img.filename}") | |
| continue | |
| # Lecture et upload | |
| img_data = img.read() | |
| if len(img_data) == 0: | |
| logger.warning(f"Fichier vide ignoré: {img.filename}") | |
| continue | |
| # Upload du fichier | |
| file_id = file_manager.upload_image( | |
| img_data, | |
| img.filename, | |
| img.content_type or 'image/jpeg' | |
| ) | |
| uploaded_file_ids.append(file_id) | |
| if not uploaded_file_ids: | |
| return create_error_response('Aucune image valide trouvée.') | |
| logger.info(f"Nombre d'images uploadées: {len(uploaded_file_ids)}") | |
| except Exception as e: | |
| # Nettoyer les fichiers uploadés en cas d'erreur | |
| for file_id in uploaded_file_ids: | |
| file_manager.cleanup_file(file_id) | |
| logger.exception("Erreur lors de l'upload des images.") | |
| return create_error_response(f'Erreur lors du traitement des images: {str(e)}', 500) | |
| def generate_analysis(): | |
| try: | |
| # Préparation du contenu pour Gemini | |
| content = ["Réponds aux questions présentes dans les images. Analyse le contenu de manière détaillée et structurée."] | |
| # Ajout des fichiers uploadés | |
| for file_id in uploaded_file_ids: | |
| gemini_file = file_manager.get_gemini_file(file_id) | |
| if gemini_file: | |
| content.append(gemini_file) | |
| # Configuration pour l'analyse d'image | |
| config = types.GenerateContentConfig( | |
| system_instruction="Tu es un assistant spécialisé dans l'analyse de textes et de documents. Réponds de manière précise et détaillée aux questions posées.", | |
| temperature=0.7 | |
| ) | |
| logger.info("Démarrage de l'analyse d'images...") | |
| # Génération sans streaming | |
| response = client.models.generate_content( | |
| model=STANDARD_MODEL_NAME, | |
| contents=content, | |
| config=config | |
| ) | |
| # Vérifier les erreurs | |
| if not response.candidates: | |
| raise ValueError("Aucun candidat dans la réponse") | |
| candidate = response.candidates[0] | |
| if not hasattr(candidate, 'content') or not candidate.content: | |
| raise ValueError("Pas de contenu dans le candidat") | |
| if not hasattr(candidate.content, 'parts') or not candidate.content.parts: | |
| raise ValueError("Pas de parts dans le contenu") | |
| # Récupérer le texte | |
| full_text = "" | |
| for part in candidate.content.parts: | |
| if hasattr(part, 'text') and part.text: | |
| full_text += part.text | |
| if not full_text: | |
| raise ValueError("Aucun texte généré") | |
| logger.info(f"Analyse réussie. Longueur: {len(full_text)} caractères") | |
| return Response( | |
| json.dumps({ | |
| 'type': 'success', | |
| 'content': full_text | |
| }), | |
| mimetype='application/json' | |
| ) | |
| except Exception as e: | |
| logger.exception("Erreur pendant l'analyse d'images.") | |
| return Response( | |
| json.dumps({ | |
| 'type': 'error', | |
| 'content': f"Erreur serveur pendant l'analyse: {str(e)}" | |
| }), | |
| mimetype='application/json', | |
| status=500 | |
| ) | |
| finally: | |
| # Nettoyer les fichiers uploadés après traitement | |
| for file_id in uploaded_file_ids: | |
| file_manager.cleanup_file(file_id) | |
| logger.info("Fichiers temporaires nettoyés.") | |
| return generate_analysis() | |
| def health(): | |
| """Endpoint de vérification de santé""" | |
| return {'status': 'healthy', 'model': STANDARD_MODEL_NAME} | |
| def request_entity_too_large(error): | |
| """Gestionnaire pour les fichiers trop volumineux""" | |
| return create_error_response('Fichier trop volumineux. Taille maximale: 20MB.', 413) | |
| def internal_server_error(error): | |
| """Gestionnaire d'erreur serveur""" | |
| logger.exception("Erreur serveur interne") | |
| return create_error_response('Erreur serveur interne.', 500) | |
| # Nettoyage à la fermeture de l'application | |
| def cleanup_files(error): | |
| """Nettoie les fichiers temporaires à la fin de la requête""" | |
| if error: | |
| logger.error(f"Erreur dans le contexte de l'application: {error}") | |
| if __name__ == '__main__': | |
| # Configuration pour la production | |
| app.config['MAX_CONTENT_LENGTH'] = 20 * 1024 * 1024 # 20MB max | |
| logger.info("Démarrage du serveur Flask avec Gemini SDK amélioré...") | |
| try: | |
| # En production, utiliser un serveur WSGI comme Gunicorn | |
| app.run( | |
| debug=True, # Désactiver en production | |
| ) | |
| finally: | |
| # Nettoyer tous les fichiers à la fermeture | |
| file_manager.cleanup_all() | |
| logger.info("Application fermée et fichiers nettoyés.") |