Spaces:
Sleeping
Sleeping
| # --- START OF FILE GeminiChatbot/app.py --- | |
| import os | |
| import logging | |
| import base64 | |
| import json | |
| import uuid | |
| import google.generativeai as genai | |
| from datetime import datetime | |
| from functools import wraps | |
| from flask import Flask, render_template, request, jsonify, session, redirect | |
| from dotenv import load_dotenv | |
| from werkzeug.utils import secure_filename | |
# Logging is set up once, at import time, with DEBUG verbosity.
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

# Load environment variables (python-dotenv) before reading any of them.
load_dotenv()

# Hand the Gemini key to the SDK when one is available; otherwise only warn,
# so the module still imports and the views can report the problem.
api_key = os.environ.get("GEMINI_API_KEY")
if api_key:
    genai.configure(api_key=api_key)
    logger.info("GEMINI_API_KEY found. API configured successfully.")
else:
    logger.warning("GEMINI_API_KEY not found in environment variables")
# Initialize Flask app
app = Flask(__name__)

# The session cookie is signed with this key. The hard-coded fallback is only
# suitable for local development: anyone who knows it can forge a session
# (including the session_id used to build per-session upload paths), so make
# the fallback loud instead of silent.
_session_secret = os.environ.get("SESSION_SECRET")
if not _session_secret:
    logger.warning(
        "SESSION_SECRET not found in environment variables; "
        "falling back to the insecure development secret key"
    )
    _session_secret = "default-dev-secret-key"
app.secret_key = _session_secret

app.config['UPLOAD_FOLDER'] = 'static/uploads'
app.config['MAX_CONTENT_LENGTH'] = 10 * 1024 * 1024  # 10 MB max
# Middleware to ensure user has a session_id
def session_required(f):
    """Decorate a view so that a ``session_id`` exists before it runs.

    When the session has no ``session_id`` key, a new UUID4 string is stored
    under that key and logged; the wrapped view is then called with its
    original arguments and its return value is passed through unchanged.

    Args:
        f: The view function to wrap.

    Returns:
        The wrapping function. ``functools.wraps`` copies ``f``'s name and
        docstring onto it, so each decorated view keeps a distinct
        ``__name__`` (Flask derives the endpoint name from it; without this
        every decorated view would be called ``decorated_function``).
    """
    @wraps(f)
    def decorated_function(*args, **kwargs):
        if 'session_id' not in session:
            session['session_id'] = str(uuid.uuid4())
            logger.info(f"Created new session: {session['session_id']}")
        return f(*args, **kwargs)
    return decorated_function
# Make sure the upload directory is present before any view needs it.
os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)

# Both model handles stay None when no API key was configured; the views
# check for that before using them.
model = None
vision_model = None
if not api_key:
    logger.error("Cannot initialize Gemini models: API Key is missing.")
else:
    # Text model, tuned for focused but not fully deterministic answers.
    # NOTE(review): confirm 'gemini-1.0-pro' is still offered by the API;
    # older model ids get retired over time.
    model = genai.GenerativeModel(
        model_name='gemini-1.0-pro',
        generation_config={
            'temperature': 0.7,         # mildly creative
            'top_p': 0.9,               # nucleus sampling threshold
            'top_k': 40,                # candidate tokens considered per step
            'max_output_tokens': 2048,  # room for longer answers
        },
    )
    # Separate model used for requests that carry an image.
    vision_model = genai.GenerativeModel('gemini-1.5-flash')
def index():
    """Serve the chat page, or a plain-text 500 error when no API key is set."""
    if api_key:
        return render_template('index.html')
    return "Erreur: Clé API Gemini manquante. Veuillez configurer GEMINI_API_KEY.", 500
# MIME types accepted for inline image uploads. The subtype becomes the file
# extension of a file written below the upload folder (which lives under
# 'static/'), so an arbitrary client-chosen type such as 'text/html' must not
# be allowed to pick the extension.
# NOTE(review): list mirrors the image formats documented for the Gemini API;
# confirm against the current documentation before extending it.
_ALLOWED_IMAGE_MIME_TYPES = frozenset({
    "image/png",
    "image/jpeg",
    "image/webp",
    "image/heic",
    "image/heif",
})


def _parse_image_data_url(image_data):
    """Split a ``data:<mime>;base64,<payload>`` string into (mime_type, bytes).

    Raises:
        ValueError: if the value is not a string, is not shaped like a data
            URL, names a MIME type outside the allow-list, or carries an
            empty or undecodable payload (``binascii.Error`` is a
            ``ValueError`` subclass).
    """
    if not isinstance(image_data, str):
        raise ValueError("image data must be a string")
    header, separator, payload = image_data.partition(',')
    if not separator or not header.startswith('data:'):
        raise ValueError("image data is not a data URL")
    mime_type = header[len('data:'):].split(';', 1)[0].strip().lower()
    if mime_type not in _ALLOWED_IMAGE_MIME_TYPES:
        raise ValueError(f"unsupported image MIME type: {mime_type!r}")
    image_bytes = base64.b64decode(payload)
    if not image_bytes:
        raise ValueError("image payload is empty")
    return mime_type, image_bytes


def _save_uploaded_image(session_id, mime_type, image_bytes):
    """Write the uploaded image into the session's upload directory."""
    session_dir = os.path.join(app.config['UPLOAD_FOLDER'], session_id)
    os.makedirs(session_dir, exist_ok=True)
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    extension = mime_type.split('/')[-1]
    filename = secure_filename(f"image_{timestamp}.{extension}")
    filepath = os.path.join(session_dir, filename)
    with open(filepath, "wb") as f:
        f.write(image_bytes)
    logger.info(f"Saved uploaded image to {filepath}")
    return filepath


def _format_history(chat_history, limit=15):
    """Convert client history entries into role/parts dicts for ``start_chat``.

    Only the last ``limit`` entries are considered. Entries that are not
    dicts, have no ``sender``, or have no non-empty string ``text`` are
    skipped instead of aborting the whole request.
    """
    if not isinstance(chat_history, list):
        return []
    formatted = []
    for msg in chat_history[-limit:]:
        if not isinstance(msg, dict) or 'sender' not in msg:
            continue
        text = msg.get('text')
        if not text or not isinstance(text, str):
            continue
        role = "user" if msg['sender'] == 'user' else "model"
        formatted.append({"role": role, "parts": [text]})
    return formatted


def chat():
    """Process a chat message and return the Gemini response as JSON.

    The JSON request body may contain:
        message: the user's text (optional when an image is attached).
        history: earlier messages as dicts with ``sender`` and ``text``;
            only used for text-only requests.
        image: a single base64 data URL (``data:image/...;base64,...``).

    Returns:
        ``{'response': <text>}`` on success, otherwise ``{'error': <text>}``
        with status 400 (nothing to process, invalid image data, blocked
        prompt) or 500 (missing configuration, generation failure).
    """
    if not api_key:
        logger.error("Chat request failed: API Key is missing.")
        return jsonify({'error': 'Configuration serveur incomplète (clé API manquante).'}), 500
    if not model or not vision_model:
        logger.error("Chat request failed: Models not initialized.")
        return jsonify({'error': 'Configuration serveur incomplète (modèles non initialisés).'}), 500

    try:
        # silent=True: a missing or malformed JSON body becomes an empty
        # request (answered with 400 below) instead of an exception that the
        # catch-all at the bottom would report as a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}

        # `or` also covers explicit JSON nulls, which .get() defaults do not.
        user_message = data.get('message') or ''
        if not isinstance(user_message, str):
            user_message = str(user_message)
        chat_history = data.get('history') or []
        image_data = data.get('image')

        if not user_message and not image_data:
            return jsonify({'error': 'Veuillez entrer un message ou joindre une image.'}), 400

        # Log the incoming request (but not full chat history for privacy)
        session_id = session.get('session_id')
        logger.info(f"Received chat request from session {session_id}. Message length: {len(user_message)}. Image attached: {'Yes' if image_data else 'No'}")

        if image_data:
            # Only the decoding step maps to "invalid image data". Keeping it
            # in its own try block prevents a ValueError raised later (for
            # example by response.text on an empty response) from being
            # misreported as a bad upload.
            try:
                mime_type, image_bytes = _parse_image_data_url(image_data)
            except ValueError as decode_error:
                logger.error(f"Error decoding image data: {str(decode_error)}")
                return jsonify({'error': 'Format de données d\'image invalide.'}), 400

            try:
                image_part = {
                    "mime_type": mime_type,
                    "data": image_bytes
                }

                # Keeping a copy on disk needs a session directory; without a
                # session id there is nowhere to put it, so skip the copy
                # rather than fail the whole request.
                if session_id:
                    _save_uploaded_image(session_id, mime_type, image_bytes)
                else:
                    logger.warning("No session_id available; uploaded image was not saved.")

                # Text (when present) goes first, then the image.
                parts = []
                if user_message:
                    parts.append(user_message)
                parts.append(image_part)

                logger.debug(f"Sending parts to vision model: {[type(p) if not isinstance(p, dict) else 'dict(image)' for p in parts]}")
                response = vision_model.generate_content(parts)
                logger.info(f"Generated vision response successfully. Response length: {len(response.text)}")
                return jsonify({'response': response.text})
            except Exception as img_error:
                # Log the full traceback for better debugging
                logger.exception(f"Error processing image: {str(img_error)}")
                return jsonify({
                    'error': 'Désolé, une erreur est survenue lors du traitement de l\'image. Veuillez réessayer.'
                }), 500

        # Text-only processing.
        # Note: history does not include images sent on earlier turns.
        formatted_history = _format_history(chat_history)
        try:
            chat_session = model.start_chat(history=formatted_history)
            response = chat_session.send_message(user_message)
            logger.info(f"Generated text response successfully. Response length: {len(response.text)}")
            return jsonify({'response': response.text})
        except genai.types.generation_types.BlockedPromptException as be:
            logger.warning(f"Content blocked for session {session_id}: {str(be)}")
            return jsonify({
                'error': 'Votre message ou la conversation contient du contenu potentiellement inapproprié et ne peut pas être traité.'
            }), 400
        except Exception as e:
            logger.exception(f"Error during text generation for session {session_id}: {str(e)}")
            return jsonify({
                'error': 'Désolé, une erreur est survenue lors de la génération de la réponse texte.'
            }), 500
    except Exception as e:
        # Catch-all boundary so the client always receives a JSON error.
        logger.exception(f"Unhandled error in chat endpoint: {str(e)}")
        return jsonify({
            'error': 'Désolé, j\'ai rencontré une erreur inattendue. Veuillez réessayer.'
        }), 500
def save_chat():
    """Persist the chat history sent by the client as a timestamped JSON file.

    Reads ``history`` from the JSON request body and writes it to
    ``<UPLOAD_FOLDER>/<session_id>/chat_<YYYYMMDD_HHMMSS>.json``.

    Returns:
        JSON with ``success``, ``filename`` and ``timestamp`` on success;
        JSON ``error`` with status 400 when the session id is missing or the
        history is absent, empty or not a list; status 500 on unexpected
        failures.
    """
    # Bound before the try block so the except handler can always log it.
    session_id = None
    try:
        session_id = session.get('session_id')
        if not session_id:
            return jsonify({'error': 'Session introuvable.'}), 400

        # silent=True: a missing or malformed JSON body yields None instead
        # of raising, and is then reported as "nothing to save".
        data = request.get_json(silent=True)
        chat_history = data.get('history') if isinstance(data, dict) else None
        # load_chat only accepts a list, so do not persist anything else.
        if not chat_history or not isinstance(chat_history, list):
            return jsonify({'error': 'Aucune conversation à sauvegarder.'}), 400

        # Create the session directory only once there is something to
        # write, so rejected requests leave no empty directories behind.
        session_dir = os.path.join(app.config['UPLOAD_FOLDER'], session_id)
        os.makedirs(session_dir, exist_ok=True)

        # The name format must stay 'chat_%Y%m%d_%H%M%S.json': load_chats
        # parses the timestamp back out of it.
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"chat_{timestamp}.json"
        filepath = os.path.join(session_dir, filename)

        with open(filepath, 'w', encoding='utf-8') as f:
            json.dump(chat_history, f, ensure_ascii=False, indent=2)

        logger.info(f"Chat history saved for session {session_id} to {filename}")
        return jsonify({'success': True, 'filename': filename, 'timestamp': timestamp})
    except Exception as e:
        logger.exception(f"Error saving chat for session {session_id}: {str(e)}")
        return jsonify({
            'error': 'Désolé, une erreur est survenue lors de la sauvegarde de la conversation.'
        }), 500
def load_chats():
    """List the saved chat files of the current session, newest first.

    Returns JSON ``{'chats': [...]}`` where each entry carries ``filename``
    and the ``timestamp`` string taken from the file name; an empty list when
    the session directory does not exist; JSON ``error`` with status 400 when
    there is no session id, or 500 on unexpected failures.
    """
    try:
        session_id = session.get('session_id')
        if not session_id:
            return jsonify({'error': 'Session introuvable.'}), 400

        session_dir = os.path.join(app.config['UPLOAD_FOLDER'], session_id)
        if not os.path.exists(session_dir):
            # Nothing has been stored for this session yet.
            logger.info(f"No chat directory found for session {session_id}")
            return jsonify({'chats': []})

        prefix, suffix = 'chat_', '.json'
        entries = []
        for name in os.listdir(session_dir):
            # The directory also holds uploaded images; keep chat files only.
            if not (name.startswith(prefix) and name.endswith(suffix)):
                continue
            stamp = name[len(prefix):-len(suffix)]
            try:
                # Parsed only to validate the 'YYYYMMDD_HHMMSS' shape.
                datetime.strptime(stamp, "%Y%m%d_%H%M%S")
            except ValueError:
                logger.warning(f"Skipping file with unexpected format: {name} in {session_dir}")
                continue
            entries.append({'filename': name, 'timestamp': stamp})

        # The fixed-width timestamp sorts chronologically as plain text.
        entries = sorted(entries, key=lambda entry: entry['timestamp'], reverse=True)
        logger.info(f"Loaded {len(entries)} chats for session {session_id}")
        return jsonify({'chats': entries})
    except Exception as e:
        logger.exception(f"Error loading chat list for session {session_id}: {str(e)}")
        return jsonify({
            'error': 'Désolé, une erreur est survenue lors du chargement des conversations.'
        }), 500
def load_chat(filename):
    """Load one saved chat history file of the current session.

    Args:
        filename: Name of the chat file as supplied by the client; it is
            sanitised with ``secure_filename`` and must look like
            ``chat_*.json``.

    Returns:
        JSON ``{'history': [...]}`` on success; JSON ``error`` with status
        400 (no session or invalid name), 403 (path escapes the session
        directory), 404 (file not found) or 500 (corrupt or invalid file,
        unexpected failure).
    """
    # Bound before the try block so every except handler can log them.
    session_id = None
    safe_filename = None
    try:
        session_id = session.get('session_id')
        if not session_id:
            return jsonify({'error': 'Session introuvable.'}), 400

        # Secure the filename before using it
        safe_filename = secure_filename(filename)
        if not safe_filename.startswith('chat_') or not safe_filename.endswith('.json'):
            logger.warning(f"Attempt to load invalid chat filename: {filename} (secured: {safe_filename}) for session {session_id}")
            return jsonify({'error': 'Nom de fichier de conversation invalide.'}), 400

        session_dir = os.path.join(app.config['UPLOAD_FOLDER'], session_id)
        filepath = os.path.join(session_dir, safe_filename)

        # Containment check, done before touching the file system. The
        # sanitised name has no separators, so the resolved file must sit
        # directly inside the resolved session directory. Comparing
        # directories avoids the string-prefix pitfall where '/a/b' is a
        # prefix of '/a/bc'.
        if os.path.dirname(os.path.abspath(filepath)) != os.path.abspath(session_dir):
            logger.error(f"Attempt to access file outside session directory: {filepath}")
            return jsonify({'error': 'Accès non autorisé.'}), 403

        if not os.path.isfile(filepath):
            logger.warning(f"Chat file not found: {filepath} for session {session_id}")
            return jsonify({'error': 'Conversation introuvable.'}), 404

        with open(filepath, 'r', encoding='utf-8') as f:
            chat_history = json.load(f)

        # Basic validation of the loaded structure.
        if not isinstance(chat_history, list):
            raise ValueError("Invalid chat history format in file.")
        for item in chat_history:
            # Every message must be a dict with a 'sender'; 'text' is
            # optional. The type check comes first so 'in' is never applied
            # to a string (substring test) or a number (TypeError).
            if not isinstance(item, dict) or 'sender' not in item:
                raise ValueError("Invalid message format in chat history.")

        logger.info(f"Loaded chat {safe_filename} for session {session_id}")
        return jsonify({'history': chat_history})
    except json.JSONDecodeError:
        # Must precede the ValueError handler: JSONDecodeError subclasses it.
        logger.error(f"Error decoding JSON from chat file: {safe_filename} for session {session_id}")
        return jsonify({'error': 'Le fichier de conversation est corrompu.'}), 500
    except ValueError as ve:
        logger.error(f"Invalid content in chat file {safe_filename}: {str(ve)}")
        return jsonify({'error': f'Format invalide dans le fichier de conversation: {str(ve)}'}), 500
    except Exception as e:
        logger.exception(f"Error loading chat file {safe_filename} for session {session_id}: {str(e)}")
        return jsonify({
            'error': 'Désolé, une erreur est survenue lors du chargement de la conversation.'
        }), 500
if __name__ == '__main__':
    # Listen on every interface. The port comes from PORT (8080 when unset);
    # debug mode is on only when FLASK_DEBUG is 'true' in any letter case.
    listen_port = int(os.environ.get('PORT', 8080))
    debug_enabled = os.environ.get('FLASK_DEBUG', 'False').lower() == 'true'
    app.run(host='0.0.0.0', port=listen_port, debug=debug_enabled)
| # --- END OF FILE GeminiChatbot/app.py --- |