from flask import Flask, render_template, request, jsonify, session, send_file from werkzeug.utils import secure_filename import os from google import genai from google.genai import types import io from reportlab.lib.pagesizes import letter from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle from reportlab.lib.units import inch from reportlab.lib.colors import navy, black from datetime import datetime import secrets import re import sqlite3 import threading import time import uuid app = Flask(__name__) app.secret_key = secrets.token_hex(16) app.config['UPLOAD_FOLDER'] = 'uploads' app.config['MAX_CONTENT_LENGTH'] = 50 * 1024 * 1024 # 50MB max app.config['DATABASE'] = 'tasks.db' # Créer les dossiers nécessaires os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True) # Configuration Gemini GEMINI_API_KEY = os.environ.get('GEMINI_API_KEY', 'YOUR_API_KEY_HERE') client = genai.Client(api_key=GEMINI_API_KEY) MAX_VIDEO_TOKENS = 600000 ALLOWED_EXTENSIONS = { 'pdf': 'application/pdf', 'mp3': 'audio/mp3', 'mp4': 'video/mp4', 'wav': 'audio/wav', 'txt': 'text/plain', 'doc': 'application/msword', 'docx': 'application/vnd.openxmlformats-officedocument.wordprocessingml.document' } SUMMARY_TYPES = { 'court': 'Fais un résumé très court (2-3 paragraphes maximum) des points clés essentiels en français.', 'moyen': 'Fais un résumé détaillé structuré avec les points principaux et sous-points importants en français.', 'detaille': 'Fais un résumé exhaustif et détaillé avec tous les points importants, citations clés et analyse approfondie en français' } # ======================= # Base de données # ======================= def init_db(): """Initialise la base de données""" conn = sqlite3.connect(app.config['DATABASE']) c = conn.cursor() c.execute(''' CREATE TABLE IF NOT EXISTS tasks ( task_id TEXT PRIMARY KEY, user_session TEXT, filename TEXT, summary_type TEXT, status TEXT, progress INTEGER, summary TEXT, error TEXT, created_at TEXT, completed_at TEXT, source_type TEXT, source_path TEXT ) ''') conn.commit() conn.close() def get_db_connection(): """Retourne une connexion à la base de données""" conn = sqlite3.connect(app.config['DATABASE']) conn.row_factory = sqlite3.Row return conn def create_task(user_session, filename, summary_type, source_type, source_path): """Crée une nouvelle tâche dans la base de données""" task_id = str(uuid.uuid4()) conn = get_db_connection() conn.execute(''' INSERT INTO tasks (task_id, user_session, filename, summary_type, status, progress, created_at, source_type, source_path) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) ''', (task_id, user_session, filename, summary_type, 'pending', 0, datetime.now().isoformat(), source_type, source_path)) conn.commit() conn.close() return task_id def update_task_status(task_id, status, progress=None, summary=None, error=None): """Met à jour le statut d'une tâche""" conn = get_db_connection() if status == 'completed' or status == 'failed': if progress is None: progress = 100 if status == 'completed' else 0 conn.execute(''' UPDATE tasks SET status = ?, progress = ?, summary = ?, error = ?, completed_at = ? WHERE task_id = ? ''', (status, progress, summary, error, datetime.now().isoformat(), task_id)) else: query = 'UPDATE tasks SET status = ?' params = [status] if progress is not None: query += ', progress = ?' params.append(progress) query += ' WHERE task_id = ?' params.append(task_id) conn.execute(query, params) conn.commit() conn.close() def get_task(task_id): """Récupère une tâche par son ID""" conn = get_db_connection() task = conn.execute('SELECT * FROM tasks WHERE task_id = ?', (task_id,)).fetchone() conn.close() return task def get_user_tasks(user_session): """Récupère toutes les tâches d'un utilisateur""" conn = get_db_connection() tasks = conn.execute( 'SELECT * FROM tasks WHERE user_session = ? ORDER BY created_at DESC', (user_session,) ).fetchall() conn.close() return tasks def delete_old_tasks(): """Supprime les tâches de plus de 7 jours""" conn = get_db_connection() seven_days_ago = datetime.now().timestamp() - (7 * 24 * 60 * 60) conn.execute(''' DELETE FROM tasks WHERE datetime(created_at) < datetime(?, 'unixepoch') ''', (seven_days_ago,)) conn.commit() conn.close() # ======================= # Fonctions utilitaires # ======================= def allowed_file(filename): return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS def get_mime_type(filename): ext = filename.rsplit('.', 1)[1].lower() return ALLOWED_EXTENSIONS.get(ext, 'application/octet-stream') def is_youtube_url(url): """Vérifie si l'URL est une URL YouTube valide""" youtube_regex = r'(https?://)?(www\.)?(youtube\.com/watch\?v=|youtu\.be/)[\w-]+' return bool(re.match(youtube_regex, url)) def get_session_id(): """Récupère ou crée un ID de session""" if 'session_id' not in session: session['session_id'] = str(uuid.uuid4()) return session['session_id'] # ======================= # Traitement en arrière-plan # ======================= def process_file_background(task_id, file_path, filename, summary_type): """Traite un fichier en arrière-plan""" try: update_task_status(task_id, 'processing', progress=10) mime_type = get_mime_type(filename) prompt = SUMMARY_TYPES.get(summary_type, SUMMARY_TYPES['moyen']) update_task_status(task_id, 'processing', progress=30) # Upload le fichier uploaded_file = client.files.upload(file=file_path) update_task_status(task_id, 'processing', progress=60) # Génère le résumé response = client.models.generate_content( model="gemini-2.5-flash", contents=[uploaded_file, prompt] ) summary = response.text update_task_status(task_id, 'completed', progress=100, summary=summary) except Exception as e: update_task_status(task_id, 'failed', error=str(e)) finally: # Nettoie le fichier if os.path.exists(file_path): try: os.remove(file_path) except: pass def process_youtube_background(task_id, youtube_url, summary_type): """Traite une vidéo YouTube en arrière-plan""" try: update_task_status(task_id, 'processing', progress=10) prompt = SUMMARY_TYPES.get(summary_type, SUMMARY_TYPES['moyen']) update_task_status(task_id, 'processing', progress=30) # Vérifier les informations du modèle model_info = client.models.get(model="gemini-2.5-flash") context_window = model_info.input_token_limit update_task_status(task_id, 'processing', progress=50) # Générer le contenu response = client.models.generate_content( model='gemini-2.5-flash', contents=types.Content( parts=[ types.Part( file_data=types.FileData(file_uri=youtube_url) ), types.Part(text=prompt) ] ) ) summary = response.text update_task_status(task_id, 'completed', progress=100, summary=summary) except Exception as e: error_msg = str(e) if 'token' in error_msg.lower() or 'too large' in error_msg.lower(): error_msg = f"La vidéo est trop longue (dépasse la limite de {MAX_VIDEO_TOKENS:,} tokens). Veuillez utiliser une vidéo plus courte." update_task_status(task_id, 'failed', error=error_msg) # ======================= # Fonction PDF # ======================= def create_pdf(summary_text, original_filename, summary_type): """ Crée un PDF du résumé avec un formatage amélioré qui interprète la structure (titres, sous-titres, listes) du texte avec support markdown. """ buffer = io.BytesIO() # Fonction pour ajouter le pied de page avec pagination def add_page_number(canvas, doc): canvas.saveState() page_num = canvas.getPageNumber() text = f"Page {page_num}" canvas.setFont('Helvetica', 9) canvas.setFillColor(black) canvas.drawRightString(letter[0] - inch, 0.75 * inch, text) # Ajouter le nom du fichier en bas à gauche canvas.drawString(inch, 0.75 * inch, clean_filename[:50]) canvas.restoreState() doc = SimpleDocTemplate(buffer, pagesize=letter, leftMargin=inch, rightMargin=inch, topMargin=inch, bottomMargin=inch) # 1. Définir des styles de paragraphes personnalisés styles = getSampleStyleSheet() # Check if styles already exist before adding them if 'MainTitle' not in styles: styles.add(ParagraphStyle( name='MainTitle', parent=styles['h1'], fontSize=20, spaceAfter=20, textColor=navy, fontName='Helvetica-Bold', alignment=1 # Centré )) if 'SectionTitle' not in styles: styles.add(ParagraphStyle( name='SectionTitle', parent=styles['h2'], fontSize=14, spaceAfter=12, spaceBefore=12, textColor=navy, fontName='Helvetica-Bold' )) if 'SubSectionTitle' not in styles: styles.add(ParagraphStyle( name='SubSectionTitle', parent=styles['Normal'], fontSize=12, spaceAfter=8, spaceBefore=8, fontName='Helvetica-Bold' )) if 'BodyText' not in styles: styles.add(ParagraphStyle( name='BodyText', parent=styles['Normal'], spaceAfter=10, leading=16, alignment=4, # Justifié fontSize=11 )) if 'ListItem' not in styles: styles.add(ParagraphStyle( name='ListItem', parent=styles['Normal'], leftIndent=20, spaceAfter=6, leading=15, fontSize=11 )) if 'BulletItem' not in styles: styles.add(ParagraphStyle( name='BulletItem', parent=styles['Normal'], leftIndent=25, spaceAfter=6, leading=15, fontSize=11, bulletIndent=10 )) if 'MetaInfo' not in styles: styles.add(ParagraphStyle( name='MetaInfo', parent=styles['Normal'], fontSize=10, textColor=black, spaceAfter=5, alignment=1 # Centré )) story = [] # 2. Nettoyer le nom du fichier pour éviter les doubles extensions ".pdf.pdf" if original_filename and original_filename.lower().endswith('.pdf'): clean_filename = original_filename[:-4] else: clean_filename = original_filename or "Document" # 3. Ajouter le titre principal et les métadonnées du document title = Paragraph(f"Résumé : {clean_filename}", styles['MainTitle']) story.append(title) info_text = f"Type de résumé : {summary_type.capitalize()} | Date : {datetime.now().strftime('%d/%m/%Y %H:%M')}" info = Paragraph(info_text, styles['MetaInfo']) story.append(info) story.append(Spacer(1, 0.4 * inch)) # 4. Fonction pour convertir le markdown en HTML ReportLab def convert_markdown_to_html(text): """Convertit le markdown simple en HTML pour ReportLab""" # Échapper les caractères XML spéciaux d'abord text = text.replace('&', '&').replace('<', '<').replace('>', '>') # Convertir ***texte*** en gras+italique text = re.sub(r'\*\*\*(.+?)\*\*\*', r'\1', text) # Convertir **texte** en gras text = re.sub(r'\*\*(.+?)\*\*', r'\1', text) # Convertir *texte* en italique text = re.sub(r'\*(.+?)\*', r'\1', text) return text # 5. Parcourir le résumé ligne par ligne et appliquer le style approprié lines = summary_text.split('\n') for i, line in enumerate(lines): stripped_line = line.strip() # Ignorer les lignes vides if not stripped_line: # Ajouter un petit espacement pour les lignes vides entre paragraphes if i > 0 and i < len(lines) - 1: story.append(Spacer(1, 0.1 * inch)) continue # Gérer les lignes de séparation (---, ___, ***) if stripped_line in ['---', '___', '***', '---', '___']: story.append(Spacer(1, 0.15 * inch)) continue # Détecter les titres de section : # Titre ou ## Titre match_header = re.match(r'^(#{1,3})\s+(.+)$', stripped_line) if match_header: level = len(match_header.group(1)) content = convert_markdown_to_html(match_header.group(2)) if level == 1: p = Paragraph(content, styles['SectionTitle']) else: p = Paragraph(content, styles['SubSectionTitle']) story.append(p) continue # Détecter les titres en gras seul : **Titre Complet** match_bold_title = re.match(r'^\*\*([^*]+)\*\*$', stripped_line) if match_bold_title and len(match_bold_title.group(1)) < 100: content = match_bold_title.group(1).strip() # Si c'est court, c'est probablement un titre if len(content.split()) < 15: p = Paragraph(content, styles['SectionTitle']) story.append(p) continue # Détecter les listes à puces : * item ou - item match_bullet = re.match(r'^[\*\-]\s+(.+)$', stripped_line) if match_bullet: content = convert_markdown_to_html(match_bullet.group(1)) p = Paragraph(f"• {content}", styles['BulletItem']) story.append(p) continue # Détecter les listes numérotées : 1. item ou 1) item match_numbered = re.match(r'^(\d+)[\.\)]\s+(.+)$', stripped_line) if match_numbered: num = match_numbered.group(1) content = convert_markdown_to_html(match_numbered.group(2)) p = Paragraph(f"{num}. {content}", styles['ListItem']) story.append(p) continue # Pour tout le reste, utiliser le style de corps de texte avec conversion markdown content = convert_markdown_to_html(stripped_line) p = Paragraph(content, styles['BodyText']) story.append(p) # Construire le document PDF avec pagination doc.build(story, onFirstPage=add_page_number, onLaterPages=add_page_number) # Remettre le curseur au début du buffer pour la lecture buffer.seek(0) return buffer # ======================= # Routes # ======================= @app.route('/') def index(): return render_template('index.html') @app.route('/upload', methods=['POST']) def upload_file(): user_session = get_session_id() youtube_url = request.form.get('youtube_url', '').strip() summary_type = request.form.get('summary_type', 'moyen') # Traitement YouTube if youtube_url: if not is_youtube_url(youtube_url): return jsonify({'error': 'URL YouTube invalide'}), 400 # Créer la tâche task_id = create_task( user_session=user_session, filename='Vidéo YouTube', summary_type=summary_type, source_type='youtube', source_path=youtube_url ) # Lancer le traitement en arrière-plan thread = threading.Thread( target=process_youtube_background, args=(task_id, youtube_url, summary_type) ) thread.daemon = True thread.start() return jsonify({ 'success': True, 'task_id': task_id, 'message': 'Traitement en cours...' }) # Traitement fichier if 'file' not in request.files: return jsonify({'error': 'Aucun fichier ou URL YouTube fourni'}), 400 file = request.files['file'] if file.filename == '': return jsonify({'error': 'Nom de fichier vide'}), 400 if not allowed_file(file.filename): return jsonify({'error': 'Type de fichier non supporté'}), 400 filename = secure_filename(file.filename) filepath = os.path.join(app.config['UPLOAD_FOLDER'], f"{uuid.uuid4()}_{filename}") file.save(filepath) # Créer la tâche task_id = create_task( user_session=user_session, filename=filename, summary_type=summary_type, source_type='file', source_path=filepath ) # Lancer le traitement en arrière-plan thread = threading.Thread( target=process_file_background, args=(task_id, filepath, filename, summary_type) ) thread.daemon = True thread.start() return jsonify({ 'success': True, 'task_id': task_id, 'message': 'Traitement en cours...' }) @app.route('/task/') def get_task_status(task_id): """Récupère le statut d'une tâche""" task = get_task(task_id) if not task: return jsonify({'error': 'Tâche non trouvée'}), 404 return jsonify({ 'task_id': task['task_id'], 'filename': task['filename'], 'summary_type': task['summary_type'], 'status': task['status'], 'progress': task['progress'], 'summary': task['summary'], 'error': task['error'], 'created_at': task['created_at'], 'completed_at': task['completed_at'] }) @app.route('/tasks') def get_all_tasks(): """Récupère toutes les tâches de l'utilisateur""" user_session = get_session_id() tasks = get_user_tasks(user_session) task_list = [] for task in tasks: task_list.append({ 'task_id': task['task_id'], 'filename': task['filename'], 'summary_type': task['summary_type'], 'status': task['status'], 'progress': task['progress'], 'summary': task['summary'], 'error': task['error'], 'created_at': task['created_at'], 'completed_at': task['completed_at'] }) return jsonify(task_list) @app.route('/download/') def download_pdf(task_id): """Télécharge le PDF d'une tâche""" task = get_task(task_id) if not task: return jsonify({'error': 'Tâche non trouvée'}), 404 if task['status'] != 'completed': return jsonify({'error': 'Résumé non encore disponible'}), 400 pdf_buffer = create_pdf(task['summary'], task['filename'], task['summary_type']) return send_file( pdf_buffer, mimetype='application/pdf', as_attachment=True, download_name=f"resume_{task['filename']}.pdf" ) @app.route('/delete-task/', methods=['DELETE']) def delete_task(task_id): """Supprime une tâche""" user_session = get_session_id() task = get_task(task_id) if not task: return jsonify({'error': 'Tâche non trouvée'}), 404 if task['user_session'] != user_session: return jsonify({'error': 'Non autorisé'}), 403 conn = get_db_connection() conn.execute('DELETE FROM tasks WHERE task_id = ?', (task_id,)) conn.commit() conn.close() return jsonify({'success': True}) @app.route('/clear-tasks', methods=['POST']) def clear_all_tasks(): """Supprime toutes les tâches de l'utilisateur""" user_session = get_session_id() conn = get_db_connection() conn.execute('DELETE FROM tasks WHERE user_session = ?', (user_session,)) conn.commit() conn.close() return jsonify({'success': True}) # Ajouter cette route dans votre fichier app.py # Ajouter cette route dans votre fichier app.py @app.route('/stats') def system_stats(): """Affiche les statistiques du système""" conn = get_db_connection() # Nombre total de résumés total = conn.execute('SELECT COUNT(*) as count FROM tasks').fetchone()['count'] # Nombre de résumés réussis completed = conn.execute( 'SELECT COUNT(*) as count FROM tasks WHERE status = ?', ('completed',) ).fetchone()['count'] # Nombre de résumés en cours processing = conn.execute( 'SELECT COUNT(*) as count FROM tasks WHERE status IN (?, ?)', ('pending', 'processing') ).fetchone()['count'] # Nombre de résumés échoués failed = conn.execute( 'SELECT COUNT(*) as count FROM tasks WHERE status = ?', ('failed',) ).fetchone()['count'] # Statistiques par type de résumé by_type = conn.execute(''' SELECT summary_type, COUNT(*) as count FROM tasks WHERE status = 'completed' GROUP BY summary_type ''').fetchall() # Statistiques par source by_source = conn.execute(''' SELECT source_type, COUNT(*) as count FROM tasks WHERE status = 'completed' GROUP BY source_type ''').fetchall() # Récupérer tous les résumés complétés (limité aux 50 derniers) all_summaries = conn.execute(''' SELECT task_id, filename, summary_type, summary, created_at, completed_at, source_type FROM tasks WHERE status = 'completed' ORDER BY completed_at DESC LIMIT 50 ''').fetchall() conn.close() # Créer le HTML minimaliste html = f''' Statistiques Système

Statistiques du Système

Vue d'ensemble

  • Total de résumés : {total}
  • Résumés réussis : {completed}
  • En cours : {processing}
  • Échoués : {failed}

Par type de résumé

    ''' for row in by_type: html += f'
  • {row["summary_type"].capitalize()} : {row["count"]}
  • \n' if not by_type: html += '
  • Aucune donnée
  • \n' html += '''

Par source

    ''' for row in by_source: source_name = 'Fichier' if row['source_type'] == 'file' else 'YouTube' html += f'
  • {source_name} : {row["count"]}
  • \n' if not by_source: html += '
  • Aucune donnée
  • \n' html += '''

Résumés des utilisateurs (50 derniers)

''' if not all_summaries: html += '

Aucun résumé disponible

\n' else: for i, summary in enumerate(all_summaries, 1): source_name = 'Fichier' if summary['source_type'] == 'file' else 'YouTube' created = summary['created_at'][:19] if summary['created_at'] else 'N/A' completed = summary['completed_at'][:19] if summary['completed_at'] else 'N/A' html += f'''

Résumé #{i}

  • Fichier : {summary["filename"]}
  • Type : {summary["summary_type"].capitalize()}
  • Source : {source_name}
  • Créé le : {created}
  • Complété le : {completed}

Résumé :

{summary["summary"]}

''' html += '''

Retour à l'accueil

''' return html # ======================= # Initialisation # ======================= # Initialiser la base de données au démarrage with app.app_context(): init_db() # Nettoyage périodique des anciennes tâches def cleanup_old_tasks(): while True: time.sleep(86400) # Toutes les 24 heures delete_old_tasks() cleanup_thread = threading.Thread(target=cleanup_old_tasks) cleanup_thread.daemon = True cleanup_thread.start() if __name__ == '__main__': app.run(debug=True)