Spaces:
Sleeping
Sleeping
| import os | |
| import logging | |
| import base64 | |
| import json | |
| import uuid | |
| import google.generativeai as genai | |
| from datetime import datetime | |
| from functools import wraps | |
| from flask import Flask, render_template, request, jsonify, session, redirect | |
| from dotenv import load_dotenv | |
| from werkzeug.utils import secure_filename | |
| # Configure logging | |
| logging.basicConfig(level=logging.DEBUG) | |
| logger = logging.getLogger(__name__) | |
| # Load environment variables | |
| load_dotenv() | |
| # Configure Google Gemini API | |
| api_key = os.environ.get("GEMINI_API_KEY") | |
| if not api_key: | |
| logger.warning("GEMINI_API_KEY not found in environment variables") | |
| else: | |
| logger.info("GEMINI_API_KEY found. API configured successfully.") | |
| genai.configure(api_key=api_key) | |
| # Initialize Flask app | |
| app = Flask(__name__) | |
| app.secret_key = os.environ.get("SESSION_SECRET", "default-dev-secret-key") | |
| app.config['UPLOAD_FOLDER'] = 'static/uploads' | |
| app.config['MAX_CONTENT_LENGTH'] = 10 * 1024 * 1024 # 10 MB max | |
| # Middleware to ensure user has a session_id | |
| def session_required(f): | |
| def decorated_function(*args, **kwargs): | |
| if 'session_id' not in session: | |
| session['session_id'] = str(uuid.uuid4()) | |
| logger.info(f"Created new session: {session['session_id']}") | |
| return f(*args, **kwargs) | |
| return decorated_function | |
| # Ensure upload directory exists | |
| os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True) | |
| # Configure Gemini model with specific parameters for better responses | |
| model = genai.GenerativeModel( | |
| model_name='gemini-2.0-flash', | |
| generation_config={ | |
| 'temperature': 0.7, # Slightly creative but still focused | |
| 'top_p': 0.9, # Diverse output but not too random | |
| 'top_k': 40, # Reasonable range of tokens to consider | |
| 'max_output_tokens': 2048 # Allow longer responses | |
| } | |
| ) | |
| # Configure Gemini vision model for image processing | |
| vision_model = genai.GenerativeModel('gemini-2.0-vision-flash') | |
| def index(): | |
| """Render the chat interface.""" | |
| return render_template('index.html') | |
| def chat(): | |
| """Process chat messages and get responses from Gemini API.""" | |
| try: | |
| data = request.json | |
| user_message = data.get('message', '') | |
| chat_history = data.get('history', []) | |
| image_data = data.get('image', None) | |
| if not user_message and not image_data: | |
| return jsonify({'error': 'Veuillez entrer un message ou joindre une image.'}), 400 | |
| # Log the incoming request (but not full chat history for privacy) | |
| session_id = session.get('session_id') | |
| logger.info(f"Received chat request from session {session_id}. Message length: {len(user_message)}") | |
| # Format conversation history for context | |
| formatted_history = [] | |
| for msg in chat_history[-15:]: # Use the last 15 messages for more context | |
| role = "user" if msg['sender'] == 'user' else "model" | |
| formatted_history.append({"role": role, "parts": [msg['text']]}) | |
| # Handle image processing if images are included | |
| if image_data: | |
| try: | |
| parts = [] | |
| # Process single image or multiple images | |
| if isinstance(image_data, list): | |
| # Handle multiple images | |
| for img in image_data: | |
| if isinstance(img, str) and ',' in img: | |
| # Extract the base64 part after the comma | |
| image_base64 = img.split(',')[1] | |
| # Create image part | |
| image = genai.types.Part.from_data( | |
| data=base64.b64decode(image_base64), | |
| mime_type="image/jpeg" | |
| ) | |
| parts.append(image) | |
| # Save each image | |
| session_id = session.get('session_id') | |
| session_dir = os.path.join(app.config['UPLOAD_FOLDER'], session_id) | |
| os.makedirs(session_dir, exist_ok=True) | |
| timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") | |
| filename = secure_filename(f"image_{timestamp}_{len(parts)}.jpg") | |
| filepath = os.path.join(session_dir, filename) | |
| with open(filepath, "wb") as f: | |
| f.write(base64.b64decode(image_base64)) | |
| else: | |
| # Handle single image | |
| image_base64 = image_data.split(',')[1] | |
| image = genai.types.Part.from_data( | |
| data=base64.b64decode(image_base64), | |
| mime_type="image/jpeg" | |
| ) | |
| parts.append(image) | |
| # Save image with timestamp in the session directory | |
| session_id = session.get('session_id') | |
| session_dir = os.path.join(app.config['UPLOAD_FOLDER'], session_id) | |
| os.makedirs(session_dir, exist_ok=True) | |
| timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") | |
| filename = secure_filename(f"image_{timestamp}.jpg") | |
| filepath = os.path.join(session_dir, filename) | |
| with open(filepath, "wb") as f: | |
| f.write(base64.b64decode(image_base64)) | |
| # Add text message if provided | |
| if user_message: | |
| parts.append(user_message) | |
| # Generate response using vision model | |
| response = vision_model.generate_content(parts) | |
| return jsonify({'response': response.text}) | |
| except Exception as img_error: | |
| logger.error(f"Error processing image: {str(img_error)}") | |
| return jsonify({ | |
| 'error': 'Désolé, une erreur est survenue lors du traitement de l\'image. Veuillez réessayer.' | |
| }), 500 | |
| else: | |
| # Text-only processing | |
| # Create a chat session with history | |
| chat = model.start_chat(history=formatted_history) | |
| # Generate response | |
| response = chat.send_message(user_message) | |
| # Log successful response | |
| logger.info(f"Generated response successfully. Response length: {len(response.text)}") | |
| # Return the response | |
| return jsonify({'response': response.text}) | |
| except genai.types.generation_types.BlockedPromptException as be: | |
| logger.warning(f"Content blocked: {str(be)}") | |
| return jsonify({ | |
| 'error': 'Votre message ou la conversation ne peut pas être traitée car elle contient du contenu potentiellement inapproprié.' | |
| }), 400 | |
| except Exception as e: | |
| logger.error(f"Error in chat endpoint: {str(e)}") | |
| return jsonify({ | |
| 'error': 'Désolé, j\'ai rencontré une erreur lors du traitement de votre demande. Veuillez réessayer.' | |
| }), 500 | |
| def save_chat(): | |
| """Save the current chat history.""" | |
| try: | |
| session_id = session.get('session_id') | |
| # Create session-specific directory | |
| session_dir = os.path.join(app.config['UPLOAD_FOLDER'], session_id) | |
| os.makedirs(session_dir, exist_ok=True) | |
| data = request.json | |
| chat_history = data.get('history', []) | |
| if not chat_history: | |
| return jsonify({'error': 'Aucune conversation à sauvegarder.'}), 400 | |
| # Generate filename with timestamp | |
| timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") | |
| filename = f"chat_{timestamp}.json" | |
| filepath = os.path.join(session_dir, filename) | |
| # Save chat history to file | |
| with open(filepath, 'w', encoding='utf-8') as f: | |
| json.dump(chat_history, f, ensure_ascii=False, indent=2) | |
| return jsonify({'success': True, 'filename': filename}) | |
| except Exception as e: | |
| logger.error(f"Error saving chat: {str(e)}") | |
| return jsonify({ | |
| 'error': 'Désolé, une erreur est survenue lors de la sauvegarde de la conversation.' | |
| }), 500 | |
| def load_chats(): | |
| """Get a list of saved chat files for current session.""" | |
| try: | |
| session_id = session.get('session_id') | |
| # Get session-specific directory | |
| session_dir = os.path.join(app.config['UPLOAD_FOLDER'], session_id) | |
| # If the directory doesn't exist yet, return empty list | |
| if not os.path.exists(session_dir): | |
| return jsonify({'chats': []}) | |
| chat_files = [] | |
| for filename in os.listdir(session_dir): | |
| if filename.startswith('chat_') and filename.endswith('.json'): | |
| # Extract timestamp from filename | |
| timestamp = filename[5:-5] # Remove 'chat_' and '.json' | |
| # Add to list | |
| chat_files.append({ | |
| 'filename': filename, | |
| 'timestamp': timestamp | |
| }) | |
| # Sort by timestamp (newest first) | |
| chat_files.sort(key=lambda x: x['timestamp'], reverse=True) | |
| logger.info(f"Loaded {len(chat_files)} chats for session {session_id}") | |
| return jsonify({'chats': chat_files}) | |
| except Exception as e: | |
| logger.error(f"Error loading chat list: {str(e)}") | |
| return jsonify({ | |
| 'error': 'Désolé, une erreur est survenue lors du chargement des conversations.' | |
| }), 500 | |
| def load_chat(filename): | |
| """Load a specific chat history file.""" | |
| try: | |
| session_id = session.get('session_id') | |
| # Load from session-specific directory | |
| session_dir = os.path.join(app.config['UPLOAD_FOLDER'], session_id) | |
| filepath = os.path.join(session_dir, secure_filename(filename)) | |
| if not os.path.exists(filepath): | |
| return jsonify({'error': 'Conversation introuvable.'}), 404 | |
| with open(filepath, 'r', encoding='utf-8') as f: | |
| chat_history = json.load(f) | |
| logger.info(f"Loaded chat {filename} for session {session_id}") | |
| return jsonify({'history': chat_history}) | |
| except Exception as e: | |
| logger.error(f"Error loading chat file: {str(e)}") | |
| return jsonify({ | |
| 'error': 'Désolé, une erreur est survenue lors du chargement de la conversation.' | |
| }), 500 | |
| if __name__ == '__main__': | |
| app.run(host='0.0.0.0', port=5000, debug=True) | |