Spaces:
Paused
Paused
| from flask import Flask, render_template, request, jsonify, Response, stream_with_context | |
| from google import genai #hb Usevr's import | |
| import os | |
| from google.genai import types | |
| from PIL import Image | |
| import io | |
| import base64 | |
| import json | |
| import requests | |
| import threading | |
| import uuid | |
| import time | |
| import tempfile # Added | |
| import subprocess # Added | |
| import shutil # Added | |
| import re # Added | |
app = Flask(__name__)

# --- Configuration ---
# All credentials are read from the environment so that no secret lives in
# the source tree. A bot token was previously committed in this file; it must
# be treated as compromised and revoked, then supplied via TELEGRAM_BOT_TOKEN.
GOOGLE_API_KEY = os.environ.get("GOOGLE_API_KEY")
TELEGRAM_BOT_TOKEN = os.environ.get("TELEGRAM_BOT_TOKEN")
TELEGRAM_CHAT_ID = os.environ.get("TELEGRAM_CHAT_ID")

# Initialize the Gemini client. `client` stays None when the key is missing
# or the SDK refuses to build a client; callers check for None before use.
client = None
if GOOGLE_API_KEY:
    try:
        client = genai.Client(api_key=GOOGLE_API_KEY)
    except Exception as e:
        print(f"Erreur lors de l'initialisation du client Gemini: {e}")
else:
    # The message names the variable that is actually read above.
    print("GOOGLE_API_KEY non trouvé. Le client Gemini ne sera pas initialisé.")
| # --- PROMPT LOADING FUNCTIONS --- | |
def load_prompt_from_file(filename):
    """Return the text of a prompt file stored in the ``prompts`` directory.

    The directory is resolved relative to this module's own location.

    Args:
        filename: Name of the prompt file inside ``prompts/``.

    Returns:
        The file content decoded as UTF-8, or an empty string when the file
        is missing or any other error occurs while reading it (the error is
        printed).
    """
    try:
        # Resolve <module directory>/prompts/<filename>.
        module_dir = os.path.dirname(os.path.abspath(__file__))
        prompt_path = os.path.join(os.path.join(module_dir, 'prompts'), filename)
        with open(prompt_path, 'r', encoding='utf-8') as handle:
            content = handle.read()
        return content
    except FileNotFoundError:
        print(f"ERREUR: Fichier prompt '{filename}' introuvable dans le dossier prompts/")
    except Exception as e:
        print(f"ERREUR lors du chargement du prompt '{filename}': {e}")
    # Both failure paths fall through to the empty-prompt result.
    return ""
def get_prompt_for_style(style):
    """Return the prompt text matching the requested resolution style.

    Args:
        style: ``'light'`` selects ``prompt_light.txt``; any other value
            falls back to the colorful prompt.

    Returns:
        Whatever ``load_prompt_from_file`` returns for the selected file.
    """
    prompt_file = 'prompt_light.txt' if style == 'light' else 'prompt_colorful.txt'
    return load_prompt_from_file(prompt_file)
# In-memory registry of background tasks, keyed by task id. Each value is a
# dict holding the task's status, response and error fields.
task_results: dict = {}
| # --- LaTeX Helper Functions --- | |
def check_latex_installation():
    """Report whether ``pdflatex`` can be executed on this system.

    Runs ``pdflatex -version`` with a 10 second timeout.

    Returns:
        True when the command exits successfully; False when the binary is
        missing, times out, or exits with a non-zero status (a warning is
        printed in that case).
    """
    probe_command = ["pdflatex", "-version"]
    try:
        subprocess.run(probe_command, capture_output=True, check=True, timeout=10)
    except (FileNotFoundError, subprocess.TimeoutExpired, subprocess.CalledProcessError) as e:
        print(f"AVERTISSEMENT: pdflatex n'est pas installé ou ne fonctionne pas correctement: {e}")
        print("AVERTISSEMENT: La génération de PDF sera désactivée. Seuls les fichiers .tex seront envoyés.")
        return False
    else:
        print("INFO: pdflatex est installé et accessible.")
        return True


# Probed once at import time; the rest of the module reads this flag.
IS_LATEX_INSTALLED = check_latex_installation()
def clean_latex_code(latex_code):
    """Strip a surrounding markdown code fence from LaTeX text, if present.

    Two fence shapes are recognised, in this order:
      1. a fence tagged ``latex`` or ``tex`` (case-insensitive);
      2. an untagged fence whose content starts with ``\\documentclass``.

    Args:
        latex_code: Raw text that may wrap LaTeX in a fenced block.

    Returns:
        The content of the first matching fence, stripped of surrounding
        whitespace; otherwise the whole input stripped of whitespace.
    """
    search_flags = re.DOTALL | re.IGNORECASE
    fence_patterns = (
        r"```(?:latex|tex)\s*(.*?)\s*```",      # explicitly tagged fence
        r"```\s*(\\documentclass.*?)\s*```",    # untagged fence that looks like LaTeX
    )
    for pattern in fence_patterns:
        found = re.search(pattern, latex_code, search_flags)
        if found is not None:
            return found.group(1).strip()
    # No fence found: only trim surrounding whitespace.
    return latex_code.strip()
def _describe_latex_failure(error_log, output_filename_base):
    """Condense a pdflatex log into a short error message.

    Known failure signatures are checked in order (missing file, undefined
    control sequence, generic ``! LaTeX Error:``). When none matches, the
    full log is printed and the last 1000 characters are returned as a
    preview.
    """
    if "LaTeX Error: File `" in error_log:
        match = re.search(r"LaTeX Error: File `(.*?)' not found.", error_log)
        if match:
            return f"Erreur de compilation PDF: Fichier LaTeX manquant '{match.group(1)}'. Assurez-vous que tous les packages nécessaires (comme fontawesome5) sont installés."
    if "! Undefined control sequence." in error_log:
        return "Erreur de compilation PDF: Commande LaTeX non définie. Vérifiez le code LaTeX pour des erreurs de syntaxe."
    if "! LaTeX Error:" in error_log:
        match = re.search(r"! LaTeX Error: (.*?)\n", error_log)
        if match:
            return f"Erreur de compilation PDF: {match.group(1).strip()}"
    log_preview = error_log[-1000:]
    print(f"Erreur de compilation PDF complète pour {output_filename_base}:\n{error_log}")
    return f"Erreur lors de la compilation du PDF. Détails dans les logs du serveur. Aperçu: ...{log_preview}"


def latex_to_pdf(latex_code, output_filename_base="document", timeout=120):
    """Compile LaTeX source to a PDF with ``pdflatex``.

    Args:
        latex_code: Complete LaTeX document source.
        output_filename_base: Base name (without extension) used for the
            ``.tex`` and ``.pdf`` files inside the scratch directory.
        timeout: Maximum number of seconds allowed for EACH pdflatex run;
            ``None`` disables the limit. Without a limit a document that
            loops forever would block the calling thread indefinitely.

    Returns:
        A ``(pdf_path, message)`` tuple. ``pdf_path`` is the path of a
        temporary PDF file that the caller must delete, or ``None`` on
        failure; ``message`` describes the outcome.
    """
    if not IS_LATEX_INSTALLED:
        return None, "pdflatex n'est pas disponible sur le système."

    # All intermediate files live in a scratch directory that is removed
    # when this block exits; the final PDF is copied out before that.
    with tempfile.TemporaryDirectory() as temp_dir_compile:
        tex_path = os.path.join(temp_dir_compile, f"{output_filename_base}.tex")
        pdf_path_in_compile_dir = os.path.join(temp_dir_compile, f"{output_filename_base}.pdf")
        try:
            with open(tex_path, "w", encoding="utf-8") as tex_file:
                tex_file.write(latex_code)

            my_env = os.environ.copy()
            my_env["LC_ALL"] = "C.UTF-8"  # Ensure UTF-8 locale for pdflatex
            my_env["LANG"] = "C.UTF-8"

            last_result = None
            for _ in range(2):  # Run pdflatex twice for references, TOC, etc.
                last_result = subprocess.run(
                    ["pdflatex", "-interaction=nonstopmode", "-output-directory", temp_dir_compile, tex_path],
                    capture_output=True,
                    text=True,
                    check=False,
                    encoding="utf-8",
                    errors="replace",
                    env=my_env,
                    timeout=timeout,  # makes the TimeoutExpired handler below reachable
                )
                # A failed first pass that produced no PDF will not be
                # rescued by a second pass.
                if not os.path.exists(pdf_path_in_compile_dir) and last_result.returncode != 0:
                    break

            if os.path.exists(pdf_path_in_compile_dir):
                # Reserve a persistent temp name, close the handle, then copy:
                # copying onto a still-open file is not portable.
                with tempfile.NamedTemporaryFile(suffix=".pdf", delete=False) as temp_pdf_out_file:
                    final_pdf_path = temp_pdf_out_file.name
                try:
                    shutil.copy(pdf_path_in_compile_dir, final_pdf_path)
                except Exception:
                    # Do not leave an orphaned placeholder behind.
                    if os.path.exists(final_pdf_path):
                        os.remove(final_pdf_path)
                    raise
                return final_pdf_path, f"PDF généré avec succès: {os.path.basename(final_pdf_path)}"

            error_log = last_result.stdout + "\n" + last_result.stderr if last_result else "Aucun résultat de compilation."
            return None, _describe_latex_failure(error_log, output_filename_base)
        except subprocess.TimeoutExpired:
            print(f"Timeout lors de la compilation de {output_filename_base}.tex")
            return None, "Timeout lors de la compilation du PDF. Le document est peut-être trop complexe ou contient une boucle infinie."
        except Exception as e:
            print(f"Exception inattendue lors de la génération du PDF ({output_filename_base}): {e}")
            return None, f"Exception inattendue lors de la génération du PDF: {str(e)}"
| # --- Telegram Functions --- | |
def send_to_telegram(file_data, filename, caption="Nouveau fichier uploadé"):
    """Send a file to the configured Telegram chat.

    Files whose name ends with a known image extension are posted with
    ``sendPhoto``; everything else is posted with ``sendDocument``.

    Args:
        file_data: File content passed through to ``requests`` as the upload.
        filename: Name used for the upload and for choosing the endpoint.
        caption: Caption attached to the message.

    Returns:
        True when Telegram answers with HTTP 200, False otherwise (including
        when any exception is raised; the error is printed).
    """
    image_suffixes = ('.png', '.jpg', '.jpeg', '.gif', '.webp')
    try:
        looks_like_image = filename.lower().endswith(image_suffixes)
        # (API method, multipart field name) for the chosen endpoint.
        api_method, field_name = ('sendPhoto', 'photo') if looks_like_image else ('sendDocument', 'document')
        url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/{api_method}"
        files = {field_name: (filename, file_data)}
        data = {'chat_id': TELEGRAM_CHAT_ID, 'caption': caption}

        response = requests.post(url, files=files, data=data, timeout=30)
        if response.status_code != 200:
            print(f"Erreur lors de l'envoi de '{filename}' à Telegram: {response.status_code} - {response.text}")
            return False
        print(f"Fichier '{filename}' envoyé avec succès à Telegram")
        return True
    except requests.exceptions.RequestException as e:
        print(f"Exception Request lors de l'envoi de '{filename}' à Telegram: {e}")
        return False
    except Exception as e:
        print(f"Exception générale lors de l'envoi de '{filename}' à Telegram: {e}")
        return False
def send_document_to_telegram(content_or_path, filename="reponse.txt", caption="Réponse", is_pdf=False):
    """Send a text or PDF document to the configured Telegram chat.

    Args:
        content_or_path: When ``is_pdf`` is true, the path of a PDF file to
            read and upload; otherwise the text content itself, which is
            uploaded UTF-8 encoded.
        filename: Name given to the uploaded document.
        caption: Caption attached to the message.
        is_pdf: Selects how ``content_or_path`` is interpreted.

    Returns:
        True when Telegram answers with HTTP 200, False otherwise (including
        when any exception is raised; the error is printed).
    """
    def _scrub(exc):
        # The request URL embeds the bot token and can appear in exception
        # messages, so mask it before the text reaches the logs.
        text = str(exc)
        if TELEGRAM_BOT_TOKEN:
            text = text.replace(TELEGRAM_BOT_TOKEN, "<token>")
        return text

    try:
        url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendDocument"
        if is_pdf:
            with open(content_or_path, 'rb') as f_pdf:
                document = (filename, f_pdf.read(), 'application/pdf')
        else:  # Assuming text content
            document = (filename, content_or_path.encode('utf-8'), 'text/plain')
        files = {'document': document}
        data = {'chat_id': TELEGRAM_CHAT_ID, 'caption': caption}

        response = requests.post(url, files=files, data=data, timeout=60)
        if response.status_code == 200:
            print(f"Document '{filename}' envoyé avec succès à Telegram.")
            return True
        print(f"Erreur lors de l'envoi du document '{filename}' à Telegram: {response.status_code} - {response.text}")
        return False
    except requests.exceptions.RequestException as e:
        # Previously the exception was bound but never reported.
        print(f"Exception Request lors de l'envoi du document '{filename}' à Telegram: {_scrub(e)}")
        return False
    except Exception as e:
        print(f"Exception générale lors de l'envoi du document '{filename}' à Telegram: {_scrub(e)}")
        return False
| # --- Background File Processing --- | |
def _extract_gemini_text(gemini_response):
    """Collect the textual content of a Gemini response.

    Concatenates the ``text`` of every part of the first candidate; falls
    back to the candidate's own ``text`` and then to the response's ``text``
    attribute. Returns an empty string when nothing textual is found.
    """
    collected = ""
    if gemini_response.candidates:
        candidate = gemini_response.candidates[0]
        if candidate.content and candidate.content.parts:
            for part in candidate.content.parts:
                if hasattr(part, 'text') and part.text:
                    collected += part.text
        elif hasattr(candidate, 'text') and candidate.text:
            collected = candidate.text
    elif hasattr(gemini_response, 'text') and gemini_response.text:
        collected = gemini_response.text
    return collected


def process_files_background(task_id, files_data, resolution_style='colorful'):
    """Process uploaded files: generate LaTeX, compile to PDF, send via Telegram.

    Progress and results are written to ``task_results[task_id]`` (``status``,
    ``response``, ``error`` and, for PDF failures, ``error_detail``).

    Args:
        task_id: Key of an existing entry in ``task_results``.
        files_data: List of dicts with ``filename``, ``data`` (bytes) and
            ``type`` (MIME type). Images are sent inline as PNG; PDFs are
            uploaded through the Gemini files API; other types are skipped.
        resolution_style: Style name used to select the prompt.

    Returns:
        None. Exceptions are caught and recorded in the task entry.
    """
    pdf_file_to_clean = None
    try:
        task_results[task_id]['status'] = 'processing'

        if not client:
            raise ConnectionError("Client Gemini non initialisé. Vérifiez la clé API et la configuration.")

        # Build the content list for Gemini: one entry per usable file.
        contents = []
        for file_info in files_data:
            filename = file_info['filename']
            file_data = file_info['data']
            file_type = file_info['type']

            if file_type.startswith('image/'):
                # Re-encode every image as PNG and embed it base64-encoded.
                img = Image.open(io.BytesIO(file_data))
                buffered = io.BytesIO()
                img.save(buffered, format="PNG")
                img_base64_str = base64.b64encode(buffered.getvalue()).decode()
                contents.append({
                    'inline_data': {
                        'mime_type': 'image/png',
                        'data': img_base64_str
                    }
                })
            elif file_type == 'application/pdf':
                # The upload call takes a path, so the bytes go through a
                # temporary file.
                temp_pdf_path = None
                try:
                    with tempfile.NamedTemporaryFile(delete=False, suffix='.pdf') as temp_pdf:
                        temp_pdf.write(file_data)
                        temp_pdf_path = temp_pdf.name
                    file_ref = client.files.upload(file=temp_pdf_path)
                    contents.append(file_ref)
                    print(f"Task {task_id}: PDF '{filename}' uploadé vers Gemini avec succès")
                except Exception as e:
                    print(f"Task {task_id}: Erreur lors de l'upload du PDF '{filename}': {e}")
                    raise ValueError(f"Impossible d'uploader le PDF '{filename}': {str(e)}") from e
                finally:
                    # Remove the temporary copy on success AND on failure;
                    # previously a failed upload left it on disk.
                    if temp_pdf_path and os.path.exists(temp_pdf_path):
                        try:
                            os.unlink(temp_pdf_path)
                        except OSError as e_unlink:
                            print(f"Task {task_id}: Erreur lors de la suppression du PDF temporaire '{temp_pdf_path}': {e_unlink}")

        if not contents:
            raise ValueError("Aucun contenu valide trouvé dans les fichiers uploadés")

        # The prompt goes last, after all file contents.
        prompt_to_use = get_prompt_for_style(resolution_style)
        if not prompt_to_use:
            raise ValueError(f"Impossible de charger le prompt pour le style '{resolution_style}'")
        contents.append(prompt_to_use)

        try:
            task_results[task_id]['status'] = 'generating_latex'
            print(f"Task {task_id}: Génération LaTeX par Gemini (style: {resolution_style})...")
            print(f"Task {task_id}: Nombre d'éléments de contenu: {len(contents)}")

            gemini_response = client.models.generate_content(
                model="gemini-2.5-pro",
                contents=contents,
                config=types.GenerateContentConfig(
                    tools=[types.Tool(
                        code_execution=types.ToolCodeExecution
                    )]
                )
            )

            full_latex_response = _extract_gemini_text(gemini_response)
            if not full_latex_response.strip():
                raise ValueError(" a retourné une réponse vide ou sans contenu textuel.")
            print(f"Task {task_id}: LaTeX brut reçu de Gemini (longueur: {len(full_latex_response)}).")

            task_results[task_id]['status'] = 'cleaning_latex'
            cleaned_latex = clean_latex_code(full_latex_response)
            print(f"Task {task_id}: LaTeX nettoyé (longueur: {len(cleaned_latex)}).")

            if not IS_LATEX_INSTALLED:
                # No compiler available: deliver the .tex source only.
                print(f"Task {task_id}: pdflatex non disponible. Envoi du .tex uniquement.")
                send_document_to_telegram(
                    cleaned_latex,
                    filename=f"solution_{task_id}.tex",
                    caption=f"Code LaTeX pour tâche {task_id} (pdflatex non disponible)"
                )
                task_results[task_id]['status'] = 'completed_tex_only'
                task_results[task_id]['response'] = cleaned_latex
                return

            task_results[task_id]['status'] = 'generating_pdf'
            print(f"Task {task_id}: Génération du PDF...")
            pdf_filename_base = f"solution_{task_id}"
            pdf_file_to_clean, pdf_message = latex_to_pdf(cleaned_latex, output_filename_base=pdf_filename_base)

            if pdf_file_to_clean:
                print(f"Task {task_id}: PDF généré: {pdf_file_to_clean}. Envoi à Telegram...")
                send_document_to_telegram(
                    pdf_file_to_clean,
                    filename=f"{pdf_filename_base}.pdf",
                    caption=f"Solution PDF pour tâche {task_id}",
                    is_pdf=True
                )
                task_results[task_id]['status'] = 'completed'
                task_results[task_id]['response'] = cleaned_latex
            else:
                # Compilation failed: fall back to sending the .tex source.
                print(f"Task {task_id}: Échec de la génération PDF: {pdf_message}. Envoi du .tex en fallback.")
                task_results[task_id]['status'] = 'pdf_error'
                task_results[task_id]['error_detail'] = f"Erreur PDF: {pdf_message}"
                send_document_to_telegram(
                    cleaned_latex,
                    filename=f"solution_{task_id}.tex",
                    caption=f"Code LaTeX pour tâche {task_id} (Erreur PDF: {pdf_message[:150]})"
                )
                task_results[task_id]['response'] = cleaned_latex

        except Exception as e_gen:
            print(f"Task {task_id}: Erreur lors de la génération Gemini ou traitement PDF: {e_gen}")
            task_results[task_id]['status'] = 'error'
            # Include the exception text: previously both the `error` field
            # and the Telegram report ended at the colon with no detail.
            task_results[task_id]['error'] = f"Erreur de traitement: {str(e_gen)}"
            send_document_to_telegram(
                f"Erreur lors du traitement de la tâche {task_id}: {str(e_gen)}",
                filename=f"error_{task_id}.txt",
                caption=f"Erreur tâche {task_id}"
            )
            task_results[task_id]['response'] = f"Erreur: {str(e_gen)}"

    except Exception as e_outer:
        print(f"Task {task_id}: Exception majeure dans la tâche de fond: {e_outer}")
        task_results[task_id]['status'] = 'error'
        task_results[task_id]['error'] = f"Erreur système: {str(e_outer)}"
        task_results[task_id]['response'] = f"Erreur système: {str(e_outer)}"
    finally:
        # Remove the temporary PDF returned by latex_to_pdf, if any.
        if pdf_file_to_clean and os.path.exists(pdf_file_to_clean):
            try:
                os.remove(pdf_file_to_clean)
                print(f"Task {task_id}: Fichier PDF temporaire '{pdf_file_to_clean}' supprimé.")
            except Exception as e_clean:
                print(f"Task {task_id}: Erreur lors de la suppression du PDF temporaire '{pdf_file_to_clean}': {e_clean}")
        # Files uploaded to Gemini are not deleted here. The original code
        # only had a no-op placeholder loop for that, which has been removed.
| # --- Flask Routes --- | |
def index():
    """Render the ``index.html`` template.

    NOTE(review): no ``@app.route`` decorator is visible on this view in
    this copy of the file — confirm how it is registered with the app.
    """
    template_name = 'index.html'
    return render_template(template_name)
def free():
    """Render the ``index.html`` template (same page as ``index``).

    NOTE(review): no ``@app.route`` decorator is visible on this view in
    this copy of the file — confirm how it is registered with the app.
    """
    template_name = 'index.html'
    return render_template(template_name)
def solve():
    """Accept uploaded files and start a background solving task.

    Reads the ``user_files`` uploads and the optional ``style`` form field
    (default ``'colorful'``). Images and at most one PDF are accepted; other
    file types are ignored. Accepted files are forwarded to Telegram, a task
    entry is created in ``task_results`` and a worker thread is started.

    Returns:
        A JSON response with ``task_id``, ``status``, ``style`` and
        ``file_count`` on success; a JSON ``error`` with status 400 for
        invalid input, or 500 when an unexpected exception occurs.

    NOTE(review): no ``@app.route`` decorator is visible on this view in
    this copy of the file — confirm how it is registered with the app.
    """
    try:
        if 'user_files' not in request.files:
            return jsonify({'error': 'Aucun fichier fourni'}), 400

        uploaded_files = request.files.getlist('user_files')
        if not uploaded_files or all(f.filename == '' for f in uploaded_files):
            return jsonify({'error': 'Aucun fichier sélectionné'}), 400

        resolution_style = request.form.get('style', 'colorful')
        print(f"Style de résolution sélectionné: {resolution_style}")

        # Pass 1: validate and collect. Nothing leaves the server until the
        # whole request is known to be acceptable; previously files seen
        # before a rejected second PDF had already been sent to Telegram.
        files_data = []
        file_count = {'images': 0, 'pdfs': 0}
        for file in uploaded_files:
            if file.filename == '':
                continue
            file_type = file.content_type or 'application/octet-stream'

            if file_type.startswith('image/'):
                file_count['images'] += 1
            elif file_type == 'application/pdf':
                file_count['pdfs'] += 1
                if file_count['pdfs'] > 1:
                    return jsonify({'error': 'Vous ne pouvez uploader qu\'un seul fichier PDF à la fois'}), 400
            else:
                print(f"Type de fichier ignoré: {file_type} pour {file.filename}")
                continue

            files_data.append({
                'filename': file.filename,
                'data': file.read(),
                'type': file_type
            })

        if not files_data:
            return jsonify({'error': 'Aucun fichier valide trouvé (seules les images et les PDF sont acceptés)'}), 400

        # Pass 2: forward every accepted file to Telegram, in upload order.
        for entry in files_data:
            if entry['type'].startswith('image/'):
                caption = f" Mariam(Pro) - Style: {resolution_style}"
            else:
                caption = f"PDF uploadé - Style: {resolution_style}"
            send_to_telegram(entry['data'], entry['filename'], caption)

        print(f"Fichiers traités: {file_count['images']} image(s), {file_count['pdfs']} PDF(s)")

        task_id = str(uuid.uuid4())
        task_results[task_id] = {
            'status': 'pending',
            'response': '',
            'error': None,
            'time_started': time.time(),
            'style': resolution_style,
            'file_count': file_count
        }

        threading.Thread(
            target=process_files_background,
            args=(task_id, files_data, resolution_style)
        ).start()

        return jsonify({
            'task_id': task_id,
            'status': 'pending',
            'style': resolution_style,
            'file_count': file_count
        })
    except Exception as e:
        # Details go to the server log only; the client gets a generic message.
        print(f"Exception lors de la création de la tâche: {e}")
        return jsonify({'error': 'Une erreur serveur est survenue:'}), 500
def _purge_expired_tasks(now, max_age_seconds=3600):
    """Drop finished tasks that started more than ``max_age_seconds`` ago.

    Only tasks in a terminal state are removed, so entries still being
    updated by a worker thread are left alone.
    """
    finished_states = ('completed', 'error', 'pdf_error', 'completed_tex_only')
    # Iterate over a snapshot: the registry is modified during the sweep.
    for tid, entry in list(task_results.items()):
        is_finished = entry.get('status') in finished_states
        if is_finished and now - entry.get('time_started', 0) > max_age_seconds:
            task_results.pop(tid, None)


def get_task_status(task_id):
    """Return the current state of a task as JSON.

    Args:
        task_id: Key of the task in ``task_results``.

    Returns:
        A JSON response with ``status``, ``response`` and ``error`` (plus
        ``error_detail`` when set), or a 404 JSON error for an unknown id.

    Finished tasks older than one hour are evicted after the response is
    built; the original cleanup branch was a no-op, so the registry grew
    without bound. An expired task is therefore still answered once more.

    NOTE(review): no ``@app.route`` decorator is visible on this view in
    this copy of the file — confirm how it is registered with the app.
    """
    task = task_results.get(task_id)
    if task is None:
        return jsonify({'error': 'Tâche introuvable'}), 404

    response_data = {
        'status': task['status'],
        'response': task.get('response'),
        'error': task.get('error')
    }
    if task.get('error_detail'):
        response_data['error_detail'] = task.get('error_detail')

    # Sweep after reading, so the requested task is served even if expired.
    _purge_expired_tasks(time.time())
    return jsonify(response_data)
def stream_task_progress(task_id):
    """Stream a task's status changes as server-sent events.

    The task entry is polled once per second. An event is emitted each time
    the status differs from the last one sent, and the stream ends once a
    terminal status has been sent or the task is no longer in the registry.

    Args:
        task_id: Key of the task in ``task_results``.

    Returns:
        A ``text/event-stream`` response wrapping the event generator.

    NOTE(review): no ``@app.route`` decorator is visible on this view in
    this copy of the file — confirm how it is registered with the app.
    """
    success_states = ('completed', 'completed_tex_only')
    failure_states = ('error', 'pdf_error')

    def sse(payload):
        # One server-sent event carrying a JSON payload.
        return f'data: {json.dumps(payload)}\n\n'

    def generate():
        if task_id not in task_results:
            yield sse({"error": "Tâche introuvable", "status": "error"})
            return

        previous_status = None
        while True:
            task = task_results.get(task_id)
            if not task:
                yield sse({"error": "Tâche disparue ou nettoyée", "status": "error"})
                return

            status = task['status']
            if status != previous_status:
                payload = {"status": status}
                if status in success_states:
                    payload["response"] = task.get("response", "")
                elif status in failure_states:
                    payload["error"] = task.get("error", "Erreur inconnue")
                    if task.get("error_detail"):
                        payload["error_detail"] = task.get("error_detail")
                    if task.get("response"):
                        payload["response"] = task.get("response")
                yield sse(payload)
                previous_status = status

            # Terminal status already delivered: close the stream.
            if status in success_states or status in failure_states:
                return
            time.sleep(1)

    sse_headers = {
        'Cache-Control': 'no-cache',
        'X-Accel-Buffering': 'no',
        'Connection': 'keep-alive'
    }
    return Response(
        stream_with_context(generate()),
        mimetype='text/event-stream',
        headers=sse_headers
    )
if __name__ == '__main__':
    # Warn early about missing configuration; the app still starts.
    if not GOOGLE_API_KEY:
        print("CRITICAL: GOOGLE_API_KEY variable d'environnement non définie. L'application risque de ne pas fonctionner.")
    if not TELEGRAM_BOT_TOKEN or not TELEGRAM_CHAT_ID:
        print("CRITICAL: TELEGRAM_BOT_TOKEN ou TELEGRAM_CHAT_ID non définis. L'intégration Telegram échouera.")
    # The server listens on all interfaces, and Flask's debug mode exposes
    # an interactive debugger that can execute arbitrary code. Debug is
    # therefore opt-in through FLASK_DEBUG instead of always on.
    debug_enabled = os.environ.get("FLASK_DEBUG", "").strip().lower() in ("1", "true", "yes", "on")
    app.run(debug=debug_enabled, host='0.0.0.0', port=5000)