Spaces:
Paused
Paused
| from flask import Flask, render_template, request, jsonify, Response, stream_with_context | |
| from google import genai | |
| import os | |
| from PIL import Image | |
| import io | |
| import base64 | |
| import json | |
| import requests | |
| import threading | |
| import uuid | |
| import time | |
app = Flask(__name__)

# Credentials are read from the environment so that no secret lives in the
# source tree. A bot token that was previously committed here must be
# considered compromised and revoked on the Telegram side.
GOOGLE_API_KEY = os.environ.get("GEMINI_API_KEY")
TELEGRAM_BOT_TOKEN = os.environ.get("TELEGRAM_BOT_TOKEN")
# The chat id is an identifier rather than a secret; keep the previous value
# as the default so existing deployments keep posting to the same chat.
TELEGRAM_CHAT_ID = os.environ.get("TELEGRAM_CHAT_ID", "-1002497861230")

# Gemini client shared by every background task.
client = genai.Client(api_key=GOOGLE_API_KEY)

# In-memory store of task state, keyed by task id. Each value is a dict with
# 'status', 'response', 'time_started' and, on failure, 'error'.
task_results = {}
def send_to_telegram(image_data, caption="Nouvelle image uploadée", timeout=30):
    """Send an image to the configured Telegram chat.

    This is a best-effort notification: every failure is logged and reported
    through the return value instead of being raised.

    Args:
        image_data: Image payload uploaded as the ``photo`` form field.
        caption: Caption attached to the photo.
        timeout: Seconds to wait for the Telegram API before giving up.

    Returns:
        True when Telegram answers with HTTP 200, False on any other status
        code or when the request raises.
    """
    try:
        url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendPhoto"
        files = {'photo': ('image.png', image_data)}
        data = {'chat_id': TELEGRAM_CHAT_ID, 'caption': caption}
        # requests never times out on its own; without an explicit timeout a
        # stalled connection would block the calling request handler forever.
        response = requests.post(url, files=files, data=data, timeout=timeout)
    except Exception as e:
        print(f"Exception lors de l'envoi à Telegram: {e}")
        return False

    if response.status_code == 200:
        print("Image envoyée avec succès à Telegram")
        return True

    print(f"Erreur lors de l'envoi à Telegram: {response.text}")
    return False
def send_document_to_telegram(text_content, filename="reponse.txt", caption="Réponse", timeout=30):
    """Send a text file to the configured Telegram chat.

    This is a best-effort notification: every failure is logged and reported
    through the return value instead of being raised.

    Args:
        text_content: Text to upload; it is encoded as UTF-8.
        filename: File name shown in Telegram for the document.
        caption: Caption attached to the document.
        timeout: Seconds to wait for the Telegram API before giving up.

    Returns:
        True when Telegram answers with HTTP 200, False on any other status
        code or when building or sending the request raises.
    """
    try:
        url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendDocument"
        files = {'document': (filename, text_content.encode('utf-8'), 'text/plain')}
        data = {'chat_id': TELEGRAM_CHAT_ID, 'caption': caption}
        # requests never times out on its own; without an explicit timeout a
        # stalled connection would keep the background worker alive forever.
        response = requests.post(url, files=files, data=data, timeout=timeout)
    except Exception as e:
        print(f"Exception lors de l'envoi du document à Telegram: {e}")
        return False

    if response.status_code == 200:
        print("Document envoyé avec succès à Telegram")
        return True

    print(f"Erreur lors de l'envoi du document à Telegram: {response.text}")
    return False
def process_image_background(task_id, image_data):
    """Solve the uploaded image with Gemini and record the outcome.

    Runs outside the request cycle. Progress and results are written to
    ``task_results[task_id]``: ``status`` moves to ``'processing'`` and then to
    ``'completed'`` (with ``response``) or ``'error'`` (with ``error``).
    The generated text is also sent to Telegram as a document.

    Args:
        task_id: Key of the task entry in ``task_results``.
        image_data: Raw bytes of the uploaded image.
    """
    try:
        task_results[task_id]['status'] = 'processing'

        # Re-encode the upload as PNG so the payload matches the mime type
        # declared in the request below.
        img = Image.open(io.BytesIO(image_data))
        buffered = io.BytesIO()
        img.save(buffered, format="PNG")
        img_str = base64.b64encode(buffered.getvalue()).decode()

        try:
            response = client.models.generate_content(
                model="gemini-2.5-pro-exp-03-25",
                contents=[
                    {'inline_data': {'mime_type': 'image/png', 'data': img_str}},
                    "Résous ça en français with rendering latex"
                ])

            # The candidate list, its content and its parts may all be
            # missing (e.g. a blocked prompt); fail with an explicit message
            # instead of an opaque IndexError / TypeError.
            candidates = response.candidates or []
            if not candidates or candidates[0].content is None:
                raise ValueError("Aucune réponse générée par le modèle")

            # Non-text parts carry text=None; skip them rather than letting
            # string concatenation raise and discard the usable text.
            parts = candidates[0].content.parts or []
            full_response = "".join(part.text for part in parts if part.text)

            send_document_to_telegram(
                full_response,
                filename=f"reponse_{task_id}.txt",
                caption=f"Réponse pour la tâche {task_id}"
            )

            # Store the payload BEFORE flipping the status: readers poll the
            # status from other threads and must never observe 'completed'
            # while the response is still the initial empty string.
            task_results[task_id]['response'] = full_response
            task_results[task_id]['status'] = 'completed'
        except Exception as e:
            print(f"Error during generation: {e}")
            # Same ordering rule as above: message first, status last.
            task_results[task_id]['error'] = str(e)
            task_results[task_id]['status'] = 'error'
    except Exception as e:
        print(f"Exception in background task: {e}")
        task_results[task_id]['error'] = str(e)
        task_results[task_id]['status'] = 'error'
def index():
    """Render the landing page from the ``index.html`` template."""
    # NOTE(review): no route decorator is visible on this view in the
    # reviewed text - confirm how it is registered with the app.
    page = render_template('index.html')
    return page
def solve():
    """Accept an uploaded image and start solving it in the background.

    Reads the ``image`` file from the request, forwards it to Telegram,
    registers a new entry in ``task_results`` and starts a worker thread
    running ``process_image_background``.

    Returns:
        JSON ``{'task_id': ..., 'status': 'pending'}`` on success,
        a 400 JSON error when no image (or an empty one) was uploaded,
        or a 500 JSON error when task creation fails unexpectedly.
    """
    try:
        # A missing form field is a client error. Indexing request.files
        # directly would raise inside this try block and be reported as a
        # 500 by the generic handler below.
        upload = request.files.get('image')
        if upload is None:
            return jsonify({'error': 'Aucune image fournie'}), 400

        image_data = upload.read()
        if not image_data:
            # An empty upload can never be decoded; reject it up front
            # instead of spawning a worker that is bound to fail.
            return jsonify({'error': 'Aucune image fournie'}), 400

        # Forward the raw upload to Telegram (best effort, result ignored).
        send_to_telegram(image_data, "Nouvelle image pour résolution")

        task_id = str(uuid.uuid4())

        # The entry must exist before the worker starts: the worker updates
        # it in place.
        task_results[task_id] = {
            'status': 'pending',
            'response': '',
            'time_started': time.time()
        }

        threading.Thread(
            target=process_image_background,
            args=(task_id, image_data)
        ).start()

        # Reply immediately; the client follows progress with the task id.
        return jsonify({
            'task_id': task_id,
            'status': 'pending'
        })
    except Exception as e:
        print(f"Exception during task creation: {e}")
        return jsonify({'error': 'Une erreur inattendue est survenue'}), 500
def get_task_status(task_id):
    """Return the current state of a task as JSON.

    Args:
        task_id: Key of the task entry in ``task_results``.

    Returns:
        A 404 JSON error when the task is unknown. Otherwise JSON with
        ``status``, plus ``response`` when the task completed or ``error``
        when it failed.
    """
    task = task_results.get(task_id)
    if task is None:
        return jsonify({'error': 'Tâche introuvable'}), 404

    # TODO: finished tasks are never evicted from task_results, so the store
    # grows for the lifetime of the process. Entries carry 'time_started' so
    # that an expiry (30 minutes was the intended delay) can be added later.

    # Read the status once: the worker thread may change it at any moment,
    # and re-reading it per branch could pair one status with the payload
    # belonging to another.
    status = task['status']
    payload = {'status': status}

    if status == 'completed':
        payload['response'] = task['response']
    elif status == 'error':
        payload['error'] = task.get('error', 'Une erreur inconnue est survenue')

    return jsonify(payload)
def stream_task_progress(task_id):
    """Stream the status changes of a task as server-sent events.

    The stream emits one event per status change and ends after the final
    ``completed`` or ``error`` event, or after a single error event when the
    task id is unknown. The task entry is polled every 0.5 seconds.
    """
    def sse(payload):
        # One server-sent event: a JSON "data:" line followed by a blank line.
        return "data: " + json.dumps(payload) + "\n\n"

    def generate():
        if task_id not in task_results:
            yield sse({"error": "Tâche introuvable"})
            return

        previous = None
        while True:
            entry = task_results.get(task_id)
            if not entry:
                yield sse({"error": "Tâche introuvable"})
                return

            status = entry['status']

            # Only report the status when it differs from the last one sent.
            if status != previous:
                yield sse({"status": status})
                previous = status

            # Terminal states: send the final payload and close the stream.
            if status == 'completed':
                yield sse({"status": "completed", "response": entry["response"]})
                return
            if status == 'error':
                yield sse({
                    "status": "error",
                    "error": entry.get("error", "Une erreur est survenue"),
                })
                return

            # Pause before polling the task entry again.
            time.sleep(0.5)

    event_stream = stream_with_context(generate())
    sse_headers = {
        'Cache-Control': 'no-cache',
        'X-Accel-Buffering': 'no'
    }
    return Response(event_stream, mimetype='text/event-stream', headers=sse_headers)
if __name__ == '__main__':
    # Debug mode enables the Werkzeug interactive debugger, which lets anyone
    # who can reach the server execute arbitrary code. Keep it off unless it
    # is explicitly requested through the FLASK_DEBUG environment variable.
    debug_enabled = os.environ.get("FLASK_DEBUG", "").lower() in ("1", "true", "yes")
    app.run(debug=debug_enabled)