Spaces:
Paused
Paused
| import logging | |
| import os | |
| import io | |
| import base64 | |
| import json | |
| import requests | |
| import threading | |
| import uuid | |
| import time | |
| import tempfile | |
| import subprocess | |
| import shutil | |
| import re | |
| from datetime import datetime | |
| from flask import Flask, render_template, request, jsonify, Response, stream_with_context, send_from_directory | |
| from google import genai | |
| from google.genai import types | |
| from PIL import Image | |
| # --- Configuration du Logging --- | |
| logging.basicConfig( | |
| level=logging.INFO, | |
| format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', | |
| datefmt='%Y-%m-%d %H:%M:%S' | |
| ) | |
| logger = logging.getLogger(__name__) | |
| # --- Configuration de l'Application Flask --- | |
| app = Flask(__name__) | |
| # --- Constantes et Variables Globales --- | |
| GOOGLE_API_KEY = os.environ.get("GOOGLE_API_KEY") | |
| TELEGRAM_BOT_TOKEN = "8004545342:AAGcZaoDjYg8dmbbXRsR1N3TfSSbEiAGz88" | |
| TELEGRAM_CHAT_ID = "-1002564204301" | |
| GENERATED_PDF_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'generated_pdfs') | |
| USER_IMAGES_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'user_images') | |
| # --- Initialisation des Services Externes --- | |
| client = None | |
| if GOOGLE_API_KEY: | |
| try: | |
| client = genai.Client(api_key=GOOGLE_API_KEY) | |
| logger.info("Client Google GenAI initialisé avec succès.") | |
| except Exception as e: | |
| logger.critical(f"Erreur critique lors de l'initialisation du client Gemini: {e}", exc_info=True) | |
| else: | |
| logger.critical("GOOGLE_API_KEY non trouvé dans les variables d'environnement. Le service ne fonctionnera pas.") | |
| task_results = {} | |
| # --- Fonctions Utilitaires --- | |
| def load_prompt_from_file(filename): | |
| """Charge le contenu d'un fichier de prompt.""" | |
| try: | |
| prompts_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'prompts') | |
| filepath = os.path.join(prompts_dir, filename) | |
| logger.info(f"Chargement du prompt depuis '{filepath}'") | |
| with open(filepath, 'r', encoding='utf-8') as f: | |
| return f.read() | |
| except Exception as e: | |
| logger.error(f"Erreur lors du chargement du prompt '{filename}': {e}", exc_info=True) | |
| return "" | |
| def get_prompt_for_style(style): | |
| """Retourne le prompt approprié en fonction du style demandé.""" | |
| logger.info(f"Sélection du prompt pour le style: '{style}'") | |
| return load_prompt_from_file('prompt_light.txt') if style == 'light' else load_prompt_from_file('prompt_colorful.txt') | |
| def check_latex_installation(): | |
| """Vérifie si pdflatex est installé et accessible dans le PATH.""" | |
| logger.info("Vérification de l'installation de LaTeX (pdflatex)...") | |
| try: | |
| subprocess.run(["pdflatex", "-version"], capture_output=True, check=True, timeout=10) | |
| logger.info("Vérification réussie: pdflatex est installé et fonctionnel.") | |
| return True | |
| except (subprocess.CalledProcessError, FileNotFoundError, subprocess.TimeoutExpired) as e: | |
| logger.warning(f"pdflatex n'est pas installé ou n'est pas dans le PATH. La génération de PDF sera désactivée. Erreur: {e}") | |
| return False | |
| IS_LATEX_INSTALLED = check_latex_installation() | |
| def save_user_image(image_data, filename, task_id): | |
| """Sauvegarde une image utilisateur dans le dossier user_images.""" | |
| try: | |
| # Créer un nom de fichier unique avec le task_id | |
| file_extension = os.path.splitext(filename)[1] | |
| safe_filename = f"{task_id}_{filename}" | |
| image_path = os.path.join(USER_IMAGES_DIR, safe_filename) | |
| with open(image_path, 'wb') as f: | |
| f.write(image_data) | |
| logger.info(f"Image utilisateur sauvegardée: {safe_filename}") | |
| return safe_filename | |
| except Exception as e: | |
| logger.error(f"Erreur lors de la sauvegarde de l'image {filename}: {e}") | |
| return None | |
| def upload_file_to_genai_api(file_data, filename, mime_type): | |
| """Upload un fichier vers l'API Files de Google GenAI.""" | |
| try: | |
| # Déterminer l'extension appropriée | |
| if mime_type.startswith('image/'): | |
| if mime_type == 'image/jpeg': | |
| suffix = '.jpg' | |
| elif mime_type == 'image/png': | |
| suffix = '.png' | |
| elif mime_type == 'image/gif': | |
| suffix = '.gif' | |
| elif mime_type == 'image/webp': | |
| suffix = '.webp' | |
| else: | |
| suffix = '.jpg' # Par défaut | |
| elif mime_type == 'application/pdf': | |
| suffix = '.pdf' | |
| else: | |
| suffix = '.tmp' | |
| # Créer un fichier temporaire | |
| with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as temp_file: | |
| temp_file.write(file_data) | |
| temp_file_path = temp_file.name | |
| logger.info(f"Upload du fichier '{filename}' ({len(file_data)} bytes) vers l'API Files de Google GenAI...") | |
| # Upload vers l'API Files | |
| file_ref = client.files.upload(file=temp_file_path) | |
| # Nettoyer le fichier temporaire | |
| os.unlink(temp_file_path) | |
| logger.info(f"Fichier '{filename}' uploadé avec succès. Référence: {file_ref.name}") | |
| return file_ref | |
| except Exception as e: | |
| logger.error(f"Erreur lors de l'upload du fichier '{filename}' vers l'API Files: {e}") | |
| # Nettoyer le fichier temporaire en cas d'erreur | |
| try: | |
| if 'temp_file_path' in locals(): | |
| os.unlink(temp_file_path) | |
| except: | |
| pass | |
| return None | |
| def call_gemini_with_fallback(contents, task_id): | |
| """Appelle Gemini 2.5 Pro en premier, puis 2.5 Flash en cas d'échec.""" | |
| models_to_try = [ | |
| {"name": "gemini-2.5-pro", "display_name": " 2.5 Pro"}, | |
| {"name": "gemini-2.5-flash", "display_name": " 2.5 Flash"} | |
| ] | |
| last_error = None | |
| for i, model_info in enumerate(models_to_try): | |
| model_name = model_info["name"] | |
| model_display = model_info["display_name"] | |
| try: | |
| logger.info(f"[Task {task_id}] Tentative {i+1}/2: Appel de {model_display}...") | |
| # Mettre à jour le statut pour indiquer quel modèle est en cours d'utilisation | |
| if model_name == "gemini-2.5-pro": | |
| task_results[task_id]['status'] = 'generating_latex_pro' | |
| task_results[task_id]['current_model'] = 'Gemini 2.5 Pro' | |
| else: | |
| task_results[task_id]['status'] = 'generating_latex_flash' | |
| task_results[task_id]['current_model'] = 'Gemini 2.5 Flash (fallback)' | |
| # Faire l'appel à l'API | |
| gemini_response = client.models.generate_content( | |
| model=model_name, | |
| contents=contents, | |
| config=types.GenerateContentConfig(tools=[types.Tool(code_execution=types.ToolCodeExecution)]) | |
| ) | |
| # Vérifier si la réponse est valide | |
| full_latex_response = "" | |
| if gemini_response.candidates and gemini_response.candidates[0].content and gemini_response.candidates[0].content.parts: | |
| for part in gemini_response.candidates[0].content.parts: | |
| if hasattr(part, 'text') and part.text: | |
| full_latex_response += part.text | |
| if not full_latex_response.strip(): | |
| raise ValueError(f"Réponse vide de {model_display}") | |
| logger.info(f"[Task {task_id}] ✓ Succès avec {model_display}") | |
| task_results[task_id]['used_model'] = model_display | |
| return full_latex_response | |
| except Exception as e: | |
| error_msg = str(e) | |
| logger.warning(f"[Task {task_id}] ✗ Échec avec {model_display}: {error_msg}") | |
| last_error = e | |
| # Si c'est le dernier modèle et qu'il a échoué, on lève l'erreur | |
| if i == len(models_to_try) - 1: | |
| logger.error(f"[Task {task_id}] Tous les modèles ont échoué. Dernière erreur: {error_msg}") | |
| task_results[task_id]['model_failures'] = [ | |
| f"{models_to_try[0]['display_name']}: Échec", | |
| f"{models_to_try[1]['display_name']}: {error_msg}" | |
| ] | |
| raise last_error | |
| # Attendre un peu avant d'essayer le modèle suivant | |
| time.sleep(2) | |
| # Cette ligne ne devrait jamais être atteinte, mais au cas où | |
| raise last_error if last_error else Exception("Erreur inconnue lors des appels aux modèles Gemini") | |
| def get_all_tasks_info(): | |
| """Récupère toutes les informations des tâches pour le centre de gestion.""" | |
| tasks_info = [] | |
| for task_id, task_data in task_results.items(): | |
| task_info = { | |
| 'id': task_id, | |
| 'status': task_data['status'], | |
| 'style': task_data.get('style', 'unknown'), | |
| 'first_filename': task_data.get('first_filename', 'Unknown'), | |
| 'time_started': task_data.get('time_started', 0), | |
| 'time_started_formatted': datetime.fromtimestamp(task_data.get('time_started', 0)).strftime('%Y-%m-%d %H:%M:%S'), | |
| 'file_count': task_data.get('file_count', {'images': 0, 'pdfs': 0}), | |
| 'pdf_filename': task_data.get('pdf_filename'), | |
| 'error': task_data.get('error'), | |
| 'response': task_data.get('response', ''), | |
| 'used_model': task_data.get('used_model', 'N/A'), | |
| 'current_model': task_data.get('current_model', ''), | |
| 'model_failures': task_data.get('model_failures', []), | |
| 'user_images': [] | |
| } | |
| # Chercher les images utilisateur associées à cette tâche | |
| if os.path.exists(USER_IMAGES_DIR): | |
| for img_file in os.listdir(USER_IMAGES_DIR): | |
| if img_file.startswith(f"{task_id}_"): | |
| task_info['user_images'].append(img_file) | |
| tasks_info.append(task_info) | |
| # Trier par date de création (plus récent en premier) | |
| tasks_info.sort(key=lambda x: x['time_started'], reverse=True) | |
| return tasks_info | |
| def get_system_stats(): | |
| """Récupère les statistiques du système.""" | |
| total_tasks = len(task_results) | |
| completed_tasks = sum(1 for task in task_results.values() if task['status'] == 'completed') | |
| error_tasks = sum(1 for task in task_results.values() if task['status'] == 'error') | |
| pending_tasks = sum(1 for task in task_results.values() if task['status'] not in ['completed', 'error']) | |
| # Statistiques d'utilisation des modèles | |
| pro_usage = sum(1 for task in task_results.values() if task.get('used_model') == 'Gemini 2.5 Pro') | |
| flash_usage = sum(1 for task in task_results.values() if task.get('used_model') == 'Gemini 2.5 Flash (fallback)') | |
| # Compter les fichiers PDF générés | |
| pdf_files = 0 | |
| if os.path.exists(GENERATED_PDF_DIR): | |
| pdf_files = len([f for f in os.listdir(GENERATED_PDF_DIR) if f.endswith('.pdf')]) | |
| # Compter les images utilisateur | |
| user_images = 0 | |
| if os.path.exists(USER_IMAGES_DIR): | |
| user_images = len([f for f in os.listdir(USER_IMAGES_DIR) if f.lower().endswith(('.png', '.jpg', '.jpeg', '.gif', '.webp'))]) | |
| return { | |
| 'total_tasks': total_tasks, | |
| 'completed_tasks': completed_tasks, | |
| 'error_tasks': error_tasks, | |
| 'pending_tasks': pending_tasks, | |
| 'pdf_files': pdf_files, | |
| 'user_images': user_images, | |
| 'latex_installed': IS_LATEX_INSTALLED, | |
| 'pro_usage': pro_usage, | |
| 'flash_usage': flash_usage | |
| } | |
| def clean_latex_code(latex_code): | |
| """Extrait le code LaTeX brut des blocs de code formatés (```latex ... ```).""" | |
| logger.info("Nettoyage du code LaTeX reçu de Gemini...") | |
| match_latex = re.search(r"```(?:latex|tex)\s*(.*?)\s*```", latex_code, re.DOTALL | re.IGNORECASE) | |
| if match_latex: | |
| logger.info("Bloc de code 'latex' ou 'tex' trouvé et extrait.") | |
| return match_latex.group(1).strip() | |
| match_generic = re.search(r"```\s*(\\documentclass.*?)\s*```", latex_code, re.DOTALL | re.IGNORECASE) | |
| if match_generic: | |
| logger.info("Bloc de code générique avec '\\documentclass' trouvé et extrait.") | |
| return match_generic.group(1).strip() | |
| logger.warning("Aucun bloc de code LaTeX (```...```) n'a été trouvé. Utilisation de la réponse brute.") | |
| return latex_code.strip() | |
| def latex_to_pdf(latex_code, output_filename_base, output_dir): | |
| """Compile une chaîne de code LaTeX en fichier PDF.""" | |
| if not IS_LATEX_INSTALLED: | |
| logger.error("Tentative de compilation LaTeX alors que pdflatex n'est pas disponible.") | |
| return None, "Erreur: pdflatex n'est pas installé sur le serveur." | |
| tex_filename = f"{output_filename_base}.tex" | |
| tex_path = os.path.join(output_dir, tex_filename) | |
| pdf_path = os.path.join(output_dir, f"{output_filename_base}.pdf") | |
| logger.info(f"Début de la compilation LaTeX vers PDF pour '{output_filename_base}'") | |
| try: | |
| with open(tex_path, "w", encoding="utf-8") as tex_file: | |
| tex_file.write(latex_code) | |
| logger.info(f"Fichier .tex '{tex_path}' créé avec succès.") | |
| my_env = os.environ.copy() | |
| my_env["LC_ALL"] = "C.UTF-8" | |
| my_env["LANG"] = "C.UTF-8" | |
| last_result = None | |
| for i in range(2): | |
| logger.info(f"Exécution de pdflatex - Passe {i+1}/2...") | |
| process = subprocess.run( | |
| ["pdflatex", "-interaction=nonstopmode", "-output-directory", output_dir, tex_path], | |
| capture_output=True, text=True, check=False, encoding="utf-8", errors="replace", env=my_env, timeout=60 | |
| ) | |
| last_result = process | |
| if not os.path.exists(pdf_path) and process.returncode != 0: | |
| logger.warning(f"La passe {i+1} de pdflatex a échoué et aucun PDF n'a été créé. Arrêt de la compilation.") | |
| break | |
| if os.path.exists(pdf_path): | |
| logger.info(f"PDF généré avec succès : '{pdf_path}'") | |
| return pdf_path, f"PDF généré: {os.path.basename(pdf_path)}" | |
| else: | |
| error_log = last_result.stdout + "\n" + last_result.stderr if last_result else "Aucun résultat de compilation disponible." | |
| logger.error(f"Échec de la compilation PDF pour '{tex_filename}'. Log de pdflatex:\n{error_log}") | |
| return None, f"Erreur de compilation PDF. Log: ...{error_log[-1000:]}" | |
| except Exception as e: | |
| logger.error(f"Exception pendant la génération du PDF: {e}", exc_info=True) | |
| return None, f"Exception durant la génération du PDF: {str(e)}" | |
| def send_to_telegram(file_data, filename, caption="Nouveau fichier uploadé"): | |
| """Envoie un fichier au canal Telegram configuré.""" | |
| logger.info(f"Préparation de l'envoi du fichier '{filename}' à Telegram.") | |
| try: | |
| if filename.lower().endswith(('.png', '.jpg', '.jpeg', '.gif', '.webp')): | |
| url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendPhoto" | |
| files = {'photo': (filename, file_data)} | |
| log_msg = f"Envoi de l'image '{filename}' à Telegram..." | |
| else: | |
| url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendDocument" | |
| files = {'document': (filename, file_data)} | |
| log_msg = f"Envoi du document '{filename}' à Telegram..." | |
| logger.info(log_msg) | |
| data = {'chat_id': TELEGRAM_CHAT_ID, 'caption': caption} | |
| response = requests.post(url, files=files, data=data, timeout=30) | |
| response.raise_for_status() | |
| logger.info(f"Fichier '{filename}' envoyé avec succès à Telegram.") | |
| except Exception as e: | |
| logger.error(f"Erreur lors de l'envoi à Telegram: {e}", exc_info=True) | |
| # --- Logique Principale (Worker en arrière-plan) --- | |
| def process_files_background(task_id, files_data, resolution_style): | |
| """Fonction exécutée en thread pour traiter les fichiers, appeler Gemini et générer le PDF.""" | |
| logger.info(f"[Task {task_id}] Démarrage du traitement en arrière-plan.") | |
| task_results[task_id]['status'] = 'processing' | |
| uploaded_file_refs = [] | |
| try: | |
| if not client: | |
| raise ConnectionError("Le client Gemini n'est pas initialisé.") | |
| contents = [] | |
| logger.info(f"[Task {task_id}] Préparation des fichiers pour l'API Gemini.") | |
| # Traiter tous les fichiers en utilisant l'API Files | |
| for file_info in files_data: | |
| logger.info(f"[Task {task_id}] Traitement du fichier '{file_info['filename']}' ({file_info['type']}).") | |
| # Sauvegarder l'image utilisateur si c'est une image | |
| if file_info['type'].startswith('image/'): | |
| saved_filename = save_user_image(file_info['data'], file_info['filename'], task_id) | |
| # Upload vers l'API Files de Google GenAI | |
| file_ref = upload_file_to_genai_api( | |
| file_info['data'], | |
| file_info['filename'], | |
| file_info['type'] | |
| ) | |
| if file_ref: | |
| uploaded_file_refs.append(file_ref) | |
| contents.append(file_ref) | |
| logger.info(f"[Task {task_id}] Fichier '{file_info['filename']}' uploadé avec succès. Référence: {file_ref.name}") | |
| else: | |
| logger.warning(f"[Task {task_id}] Échec de l'upload du fichier '{file_info['filename']}'. Fichier ignoré.") | |
| if not contents: | |
| raise ValueError("Aucun fichier n'a pu être uploadé vers l'API Files de Google GenAI.") | |
| prompt_to_use = get_prompt_for_style(resolution_style) | |
| if not prompt_to_use: | |
| raise ValueError(f"Le fichier de prompt pour le style '{resolution_style}' est introuvable ou vide.") | |
| contents.append(prompt_to_use) | |
| # Appeler Gemini avec fallback Pro -> Flash | |
| logger.info(f"[Task {task_id}] Début des appels aux modèles Gemini avec {len(uploaded_file_refs)} fichier(s).") | |
| full_latex_response = call_gemini_with_fallback(contents, task_id) | |
| logger.info(f"[Task {task_id}] Réponse reçue de Gemini ({task_results[task_id].get('used_model', 'modèle inconnu')}).") | |
| logger.debug(f"[Task {task_id}] Réponse brute de Gemini:\n---\n{full_latex_response[:500]}...\n---") | |
| task_results[task_id]['status'] = 'cleaning_latex' | |
| cleaned_latex = clean_latex_code(full_latex_response) | |
| logger.debug(f"[Task {task_id}] Code LaTeX nettoyé:\n---\n{cleaned_latex[:500]}...\n---") | |
| task_results[task_id]['status'] = 'generating_pdf' | |
| pdf_filename_base = f"solution_{task_id}" | |
| pdf_file_path, pdf_message = latex_to_pdf(cleaned_latex, pdf_filename_base, GENERATED_PDF_DIR) | |
| if pdf_file_path: | |
| task_results[task_id]['status'] = 'completed' | |
| task_results[task_id]['pdf_filename'] = os.path.basename(pdf_file_path) | |
| used_model = task_results[task_id].get('used_model', 'modèle inconnu') | |
| task_results[task_id]['response'] = f"PDF généré avec succès: {os.path.basename(pdf_file_path)} (généré par {used_model})" | |
| logger.info(f"[Task {task_id}] Tâche terminée avec succès. PDF: {os.path.basename(pdf_file_path)} (modèle: {used_model})") | |
| else: | |
| raise RuntimeError(f"Échec de la génération du PDF: {pdf_message}") | |
| except Exception as e: | |
| logger.error(f"[Task {task_id}] Une erreur est survenue dans le thread de traitement.", exc_info=True) | |
| task_results[task_id]['status'] = 'error' | |
| task_results[task_id]['error'] = str(e) | |
| task_results[task_id]['response'] = f"Une erreur est survenue: {str(e)}" | |
| finally: | |
| # Nettoyer tous les fichiers uploadés vers l'API Files | |
| if uploaded_file_refs: | |
| logger.info(f"[Task {task_id}] Nettoyage des {len(uploaded_file_refs)} fichiers temporaires de l'API Files.") | |
| for file_ref in uploaded_file_refs: | |
| try: | |
| client.files.delete(file_ref) | |
| logger.info(f"[Task {task_id}] Fichier temporaire '{file_ref.name}' supprimé de l'API Files.") | |
| except Exception as del_e: | |
| logger.warning(f"[Task {task_id}] Échec de la suppression du fichier temporaire '{file_ref.name}': {del_e}") | |
| # --- Routes Flask (API Endpoints) --- | |
| def index(): | |
| logger.info(f"Requête servie pour l'endpoint '/' depuis {request.remote_addr}") | |
| return render_template('index.html') | |
| def admin_panel(): | |
| """Centre de gestion complet.""" | |
| logger.info(f"Accès au centre de gestion depuis {request.remote_addr}") | |
| tasks_info = get_all_tasks_info() | |
| system_stats = get_system_stats() | |
| return render_template('admin.html', tasks=tasks_info, stats=system_stats) | |
| def admin_api_tasks(): | |
| """API JSON pour récupérer les informations des tâches.""" | |
| tasks_info = get_all_tasks_info() | |
| return jsonify(tasks_info) | |
| def admin_api_stats(): | |
| """API JSON pour récupérer les statistiques système.""" | |
| stats = get_system_stats() | |
| return jsonify(stats) | |
| def admin_delete_task(task_id): | |
| """Supprime une tâche et ses fichiers associés.""" | |
| logger.info(f"Demande de suppression de la tâche {task_id}") | |
| if task_id not in task_results: | |
| return jsonify({'error': 'Tâche introuvable'}), 404 | |
| try: | |
| task_data = task_results[task_id] | |
| # Supprimer le PDF s'il existe | |
| if 'pdf_filename' in task_data: | |
| pdf_path = os.path.join(GENERATED_PDF_DIR, task_data['pdf_filename']) | |
| if os.path.exists(pdf_path): | |
| os.remove(pdf_path) | |
| logger.info(f"PDF supprimé: {pdf_path}") | |
| # Supprimer les images utilisateur associées | |
| if os.path.exists(USER_IMAGES_DIR): | |
| for img_file in os.listdir(USER_IMAGES_DIR): | |
| if img_file.startswith(f"{task_id}_"): | |
| img_path = os.path.join(USER_IMAGES_DIR, img_file) | |
| os.remove(img_path) | |
| logger.info(f"Image utilisateur supprimée: {img_path}") | |
| # Supprimer la tâche de la mémoire | |
| del task_results[task_id] | |
| logger.info(f"Tâche {task_id} supprimée avec succès") | |
| return jsonify({'success': True}) | |
| except Exception as e: | |
| logger.error(f"Erreur lors de la suppression de la tâche {task_id}: {e}") | |
| return jsonify({'error': str(e)}), 500 | |
| def serve_user_image(filename): | |
| """Sert les images utilisateur.""" | |
| try: | |
| return send_from_directory(USER_IMAGES_DIR, filename) | |
| except FileNotFoundError: | |
| return "Image non trouvée", 404 | |
| def solve(): | |
| logger.info(f"Nouvelle requête sur /solve depuis {request.remote_addr}") | |
| try: | |
| if 'user_files' not in request.files: | |
| logger.warning(f"/solve: Requête de {request.remote_addr} sans 'user_files'.") | |
| return jsonify({'error': 'Aucun champ de fichier dans la requête'}), 400 | |
| uploaded_files = request.files.getlist('user_files') | |
| if not uploaded_files or all(f.filename == '' for f in uploaded_files): | |
| logger.warning(f"/solve: Requête de {request.remote_addr} avec champ 'user_files' mais sans fichiers.") | |
| return jsonify({'error': 'Aucun fichier sélectionné'}), 400 | |
| resolution_style = request.form.get('style', 'colorful') | |
| files_data = [] | |
| file_count = {'images': 0, 'pdfs': 0} | |
| for file in uploaded_files: | |
| if not file.filename: continue | |
| file_data = file.read() | |
| file_type = file.content_type or 'application/octet-stream' | |
| if file_type.startswith('image/'): | |
| file_count['images'] += 1 | |
| files_data.append({'filename': file.filename, 'data': file_data, 'type': file_type}) | |
| send_to_telegram(file_data, file.filename, f"Image reçue: {file.filename} (Style: {resolution_style})") | |
| elif file_type == 'application/pdf': | |
| if file_count['pdfs'] >= 1: | |
| logger.warning(f"/solve: Requête de {request.remote_addr} avec plusieurs PDFs. Rejetée.") | |
| return jsonify({'error': 'Un seul fichier PDF est autorisé par requête'}), 400 | |
| file_count['pdfs'] += 1 | |
| files_data.append({'filename': file.filename, 'data': file_data, 'type': file_type}) | |
| send_to_telegram(file_data, file.filename, f"PDF reçu: {file.filename} (Style: {resolution_style})") | |
| else: | |
| logger.warning(f"/solve: Fichier non supporté '{file.filename}' de type '{file_type}' uploadé par {request.remote_addr}.") | |
| if not files_data: | |
| logger.warning(f"/solve: Aucun fichier valide (image/pdf) trouvé dans la requête de {request.remote_addr}.") | |
| return jsonify({'error': 'Aucun fichier valide (image ou PDF) n\'a été fourni'}), 400 | |
| task_id = str(uuid.uuid4()) | |
| task_results[task_id] = { | |
| 'status': 'pending', 'response': '', 'error': None, 'time_started': time.time(), | |
| 'style': resolution_style, 'file_count': file_count, 'first_filename': files_data[0]['filename'] | |
| } | |
| logger.info(f"Création de la tâche {task_id} pour {file_count['images']} image(s) et {file_count['pdfs']} PDF(s). Style: {resolution_style}.") | |
| threading.Thread(target=process_files_background, args=(task_id, files_data, resolution_style)).start() | |
| return jsonify({'task_id': task_id, 'status': 'pending', 'first_filename': files_data[0]['filename']}) | |
| except Exception as e: | |
| logger.error(f"Erreur inattendue dans l'endpoint /solve: {e}", exc_info=True) | |
| return jsonify({'error': f'Erreur interne du serveur: {e}'}), 500 | |
| def get_task_status(task_id): | |
| logger.debug(f"Requête de statut pour la tâche {task_id}") | |
| task = task_results.get(task_id) | |
| if not task: | |
| logger.warning(f"Tentative d'accès à une tâche inexistante: {task_id}") | |
| return jsonify({'error': 'Tâche introuvable'}), 404 | |
| response_data = { | |
| 'status': task['status'], | |
| 'response': task.get('response'), | |
| 'error': task.get('error'), | |
| 'current_model': task.get('current_model', ''), | |
| 'used_model': task.get('used_model', ''), | |
| 'model_failures': task.get('model_failures', []) | |
| } | |
| if task['status'] == 'completed': | |
| response_data['download_url'] = f"/download/{task_id}" | |
| return jsonify(response_data) | |
| def stream_task_progress(task_id): | |
| """Endpoint pour Server-Sent Events (SSE) pour streamer la progression.""" | |
| def generate(): | |
| logger.info(f"Nouvelle connexion de streaming (SSE) pour la tâche {task_id}") | |
| last_status_sent = None | |
| last_model_sent = None | |
| while True: | |
| task = task_results.get(task_id) | |
| if not task: | |
| logger.warning(f"La tâche {task_id} a disparu pendant le streaming.") | |
| yield f'data: {json.dumps({"error": "La tâche a été perdue", "status": "error"})}\n\n' | |
| break | |
| current_status = task['status'] | |
| current_model = task.get('current_model', '') | |
| # Envoyer une mise à jour si le statut ou le modèle a changé | |
| if current_status != last_status_sent or current_model != last_model_sent: | |
| data_to_send = { | |
| "status": current_status, | |
| "current_model": current_model, | |
| "used_model": task.get('used_model', '') | |
| } | |
| if current_status == 'completed': | |
| data_to_send["response"] = task.get("response", "") | |
| data_to_send["download_url"] = f"/download/{task_id}" | |
| elif current_status == 'error': | |
| data_to_send["error"] = task.get("error", "Erreur inconnue") | |
| data_to_send["model_failures"] = task.get("model_failures", []) | |
| logger.info(f"[Task {task_id}] Envoi de la mise à jour de statut via SSE: {current_status} ({current_model})") | |
| yield f'data: {json.dumps(data_to_send)}\n\n' | |
| last_status_sent = current_status | |
| last_model_sent = current_model | |
| if current_status in ['completed', 'error']: | |
| logger.info(f"Fermeture de la connexion SSE pour la tâche terminée/échouée {task_id}") | |
| break | |
| time.sleep(1) | |
| return Response(stream_with_context(generate()), mimetype='text/event-stream', headers={'Cache-Control': 'no-cache', 'X-Accel-Buffering': 'no'}) | |
| def download_pdf(task_id): | |
| logger.info(f"Requête de téléchargement pour la tâche {task_id}") | |
| task = task_results.get(task_id) | |
| if not task or task['status'] != 'completed' or 'pdf_filename' not in task: | |
| logger.warning(f"Échec du téléchargement pour la tâche {task_id}: Fichier non trouvé ou tâche non terminée.") | |
| return "Fichier non trouvé ou la tâche n'est pas encore terminée.", 404 | |
| try: | |
| logger.info(f"Envoi du fichier '{task['pdf_filename']}' pour la tâche {task_id}") | |
| return send_from_directory(GENERATED_PDF_DIR, task['pdf_filename'], as_attachment=True) | |
| except FileNotFoundError: | |
| logger.error(f"Le fichier PDF '{task['pdf_filename']}' pour la tâche {task_id} est introuvable sur le disque.") | |
| return "Erreur: Fichier introuvable sur le serveur.", 404 | |
| if __name__ == '__main__': | |
| logger.info("Démarrage de l'application Flask.") | |
| # Création des répertoires nécessaires | |
| os.makedirs(GENERATED_PDF_DIR, exist_ok=True) | |
| os.makedirs(USER_IMAGES_DIR, exist_ok=True) | |
| logger.info(f"Répertoires assurés d'exister: '{GENERATED_PDF_DIR}' et '{USER_IMAGES_DIR}'") | |
| app.run(debug=True, host='0.0.0.0', port=5000) |