from flask import Flask, render_template, request, jsonify, session, send_file
from werkzeug.utils import secure_filename
import os
from google import genai
from google.genai import types
import io
from reportlab.lib.pagesizes import letter
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
from reportlab.lib.units import inch
from reportlab.lib.colors import navy, black
from datetime import datetime
import secrets
import re
import sqlite3
import threading
import time
import uuid
app = Flask(__name__)
app.secret_key = secrets.token_hex(16)
app.config['UPLOAD_FOLDER'] = 'uploads'
app.config['MAX_CONTENT_LENGTH'] = 50 * 1024 * 1024 # 50MB max
app.config['DATABASE'] = 'tasks.db'
# Créer les dossiers nécessaires
os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)
# Configuration Gemini
GEMINI_API_KEY = os.environ.get('GEMINI_API_KEY', 'YOUR_API_KEY_HERE')
client = genai.Client(api_key=GEMINI_API_KEY)
MAX_VIDEO_TOKENS = 600000
ALLOWED_EXTENSIONS = {
'pdf': 'application/pdf',
'mp3': 'audio/mp3',
'mp4': 'video/mp4',
'wav': 'audio/wav',
'txt': 'text/plain',
'doc': 'application/msword',
'docx': 'application/vnd.openxmlformats-officedocument.wordprocessingml.document'
}
SUMMARY_TYPES = {
'court': 'Fais un résumé très court (2-3 paragraphes maximum) des points clés essentiels en français.',
'moyen': 'Fais un résumé détaillé structuré avec les points principaux et sous-points importants en français.',
'detaille': 'Fais un résumé exhaustif et détaillé avec tous les points importants, citations clés et analyse approfondie en français'
}
# =======================
# Base de données
# =======================
def init_db():
"""Initialise la base de données"""
conn = sqlite3.connect(app.config['DATABASE'])
c = conn.cursor()
c.execute('''
CREATE TABLE IF NOT EXISTS tasks (
task_id TEXT PRIMARY KEY,
user_session TEXT,
filename TEXT,
summary_type TEXT,
status TEXT,
progress INTEGER,
summary TEXT,
error TEXT,
created_at TEXT,
completed_at TEXT,
source_type TEXT,
source_path TEXT
)
''')
conn.commit()
conn.close()
def get_db_connection():
"""Retourne une connexion à la base de données"""
conn = sqlite3.connect(app.config['DATABASE'])
conn.row_factory = sqlite3.Row
return conn
def create_task(user_session, filename, summary_type, source_type, source_path):
"""Crée une nouvelle tâche dans la base de données"""
task_id = str(uuid.uuid4())
conn = get_db_connection()
conn.execute('''
INSERT INTO tasks (task_id, user_session, filename, summary_type, status,
progress, created_at, source_type, source_path)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
''', (task_id, user_session, filename, summary_type, 'pending', 0,
datetime.now().isoformat(), source_type, source_path))
conn.commit()
conn.close()
return task_id
def update_task_status(task_id, status, progress=None, summary=None, error=None):
"""Met à jour le statut d'une tâche"""
conn = get_db_connection()
if status == 'completed' or status == 'failed':
if progress is None:
progress = 100 if status == 'completed' else 0
conn.execute('''
UPDATE tasks
SET status = ?, progress = ?, summary = ?, error = ?, completed_at = ?
WHERE task_id = ?
''', (status, progress, summary, error, datetime.now().isoformat(), task_id))
else:
query = 'UPDATE tasks SET status = ?'
params = [status]
if progress is not None:
query += ', progress = ?'
params.append(progress)
query += ' WHERE task_id = ?'
params.append(task_id)
conn.execute(query, params)
conn.commit()
conn.close()
def get_task(task_id):
"""Récupère une tâche par son ID"""
conn = get_db_connection()
task = conn.execute('SELECT * FROM tasks WHERE task_id = ?', (task_id,)).fetchone()
conn.close()
return task
def get_user_tasks(user_session):
"""Récupère toutes les tâches d'un utilisateur"""
conn = get_db_connection()
tasks = conn.execute(
'SELECT * FROM tasks WHERE user_session = ? ORDER BY created_at DESC',
(user_session,)
).fetchall()
conn.close()
return tasks
def delete_old_tasks():
"""Supprime les tâches de plus de 7 jours"""
conn = get_db_connection()
seven_days_ago = datetime.now().timestamp() - (7 * 24 * 60 * 60)
conn.execute('''
DELETE FROM tasks
WHERE datetime(created_at) < datetime(?, 'unixepoch')
''', (seven_days_ago,))
conn.commit()
conn.close()
# =======================
# Fonctions utilitaires
# =======================
def allowed_file(filename):
return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS
def get_mime_type(filename):
ext = filename.rsplit('.', 1)[1].lower()
return ALLOWED_EXTENSIONS.get(ext, 'application/octet-stream')
def is_youtube_url(url):
"""Vérifie si l'URL est une URL YouTube valide"""
youtube_regex = r'(https?://)?(www\.)?(youtube\.com/watch\?v=|youtu\.be/)[\w-]+'
return bool(re.match(youtube_regex, url))
def get_session_id():
"""Récupère ou crée un ID de session"""
if 'session_id' not in session:
session['session_id'] = str(uuid.uuid4())
return session['session_id']
# =======================
# Traitement en arrière-plan
# =======================
def process_file_background(task_id, file_path, filename, summary_type):
"""Traite un fichier en arrière-plan"""
try:
update_task_status(task_id, 'processing', progress=10)
mime_type = get_mime_type(filename)
prompt = SUMMARY_TYPES.get(summary_type, SUMMARY_TYPES['moyen'])
update_task_status(task_id, 'processing', progress=30)
# Upload le fichier
uploaded_file = client.files.upload(file=file_path)
update_task_status(task_id, 'processing', progress=60)
# Génère le résumé
response = client.models.generate_content(
model="gemini-2.5-flash",
contents=[uploaded_file, prompt]
)
summary = response.text
update_task_status(task_id, 'completed', progress=100, summary=summary)
except Exception as e:
update_task_status(task_id, 'failed', error=str(e))
finally:
# Nettoie le fichier
if os.path.exists(file_path):
try:
os.remove(file_path)
except:
pass
def process_youtube_background(task_id, youtube_url, summary_type):
"""Traite une vidéo YouTube en arrière-plan"""
try:
update_task_status(task_id, 'processing', progress=10)
prompt = SUMMARY_TYPES.get(summary_type, SUMMARY_TYPES['moyen'])
update_task_status(task_id, 'processing', progress=30)
# Vérifier les informations du modèle
model_info = client.models.get(model="gemini-2.5-flash")
context_window = model_info.input_token_limit
update_task_status(task_id, 'processing', progress=50)
# Générer le contenu
response = client.models.generate_content(
model='gemini-2.5-flash',
contents=types.Content(
parts=[
types.Part(
file_data=types.FileData(file_uri=youtube_url)
),
types.Part(text=prompt)
]
)
)
summary = response.text
update_task_status(task_id, 'completed', progress=100, summary=summary)
except Exception as e:
error_msg = str(e)
if 'token' in error_msg.lower() or 'too large' in error_msg.lower():
error_msg = f"La vidéo est trop longue (dépasse la limite de {MAX_VIDEO_TOKENS:,} tokens). Veuillez utiliser une vidéo plus courte."
update_task_status(task_id, 'failed', error=error_msg)
# =======================
# Fonction PDF
# =======================
def create_pdf(summary_text, original_filename, summary_type):
"""
Crée un PDF du résumé avec un formatage amélioré qui interprète
la structure (titres, sous-titres, listes) du texte avec support markdown.
"""
buffer = io.BytesIO()
# Fonction pour ajouter le pied de page avec pagination
def add_page_number(canvas, doc):
canvas.saveState()
page_num = canvas.getPageNumber()
text = f"Page {page_num}"
canvas.setFont('Helvetica', 9)
canvas.setFillColor(black)
canvas.drawRightString(letter[0] - inch, 0.75 * inch, text)
# Ajouter le nom du fichier en bas à gauche
canvas.drawString(inch, 0.75 * inch, clean_filename[:50])
canvas.restoreState()
doc = SimpleDocTemplate(buffer, pagesize=letter,
leftMargin=inch, rightMargin=inch,
topMargin=inch, bottomMargin=inch)
# 1. Définir des styles de paragraphes personnalisés
styles = getSampleStyleSheet()
# Check if styles already exist before adding them
if 'MainTitle' not in styles:
styles.add(ParagraphStyle(
name='MainTitle',
parent=styles['h1'],
fontSize=20,
spaceAfter=20,
textColor=navy,
fontName='Helvetica-Bold',
alignment=1 # Centré
))
if 'SectionTitle' not in styles:
styles.add(ParagraphStyle(
name='SectionTitle',
parent=styles['h2'],
fontSize=14,
spaceAfter=12,
spaceBefore=12,
textColor=navy,
fontName='Helvetica-Bold'
))
if 'SubSectionTitle' not in styles:
styles.add(ParagraphStyle(
name='SubSectionTitle',
parent=styles['Normal'],
fontSize=12,
spaceAfter=8,
spaceBefore=8,
fontName='Helvetica-Bold'
))
if 'BodyText' not in styles:
styles.add(ParagraphStyle(
name='BodyText',
parent=styles['Normal'],
spaceAfter=10,
leading=16,
alignment=4, # Justifié
fontSize=11
))
if 'ListItem' not in styles:
styles.add(ParagraphStyle(
name='ListItem',
parent=styles['Normal'],
leftIndent=20,
spaceAfter=6,
leading=15,
fontSize=11
))
if 'BulletItem' not in styles:
styles.add(ParagraphStyle(
name='BulletItem',
parent=styles['Normal'],
leftIndent=25,
spaceAfter=6,
leading=15,
fontSize=11,
bulletIndent=10
))
if 'MetaInfo' not in styles:
styles.add(ParagraphStyle(
name='MetaInfo',
parent=styles['Normal'],
fontSize=10,
textColor=black,
spaceAfter=5,
alignment=1 # Centré
))
story = []
# 2. Nettoyer le nom du fichier pour éviter les doubles extensions ".pdf.pdf"
if original_filename and original_filename.lower().endswith('.pdf'):
clean_filename = original_filename[:-4]
else:
clean_filename = original_filename or "Document"
# 3. Ajouter le titre principal et les métadonnées du document
title = Paragraph(f"Résumé : {clean_filename}", styles['MainTitle'])
story.append(title)
info_text = f"Type de résumé : {summary_type.capitalize()} | Date : {datetime.now().strftime('%d/%m/%Y %H:%M')}"
info = Paragraph(info_text, styles['MetaInfo'])
story.append(info)
story.append(Spacer(1, 0.4 * inch))
# 4. Fonction pour convertir le markdown en HTML ReportLab
def convert_markdown_to_html(text):
"""Convertit le markdown simple en HTML pour ReportLab"""
# Échapper les caractères XML spéciaux d'abord
text = text.replace('&', '&').replace('<', '<').replace('>', '>')
# Convertir ***texte*** en gras+italique
text = re.sub(r'\*\*\*(.+?)\*\*\*', r'\1', text)
# Convertir **texte** en gras
text = re.sub(r'\*\*(.+?)\*\*', r'\1', text)
# Convertir *texte* en italique
text = re.sub(r'\*(.+?)\*', r'\1', text)
return text
# 5. Parcourir le résumé ligne par ligne et appliquer le style approprié
lines = summary_text.split('\n')
for i, line in enumerate(lines):
stripped_line = line.strip()
# Ignorer les lignes vides
if not stripped_line:
# Ajouter un petit espacement pour les lignes vides entre paragraphes
if i > 0 and i < len(lines) - 1:
story.append(Spacer(1, 0.1 * inch))
continue
# Gérer les lignes de séparation (---, ___, ***)
if stripped_line in ['---', '___', '***', '---', '___']:
story.append(Spacer(1, 0.15 * inch))
continue
# Détecter les titres de section : # Titre ou ## Titre
match_header = re.match(r'^(#{1,3})\s+(.+)$', stripped_line)
if match_header:
level = len(match_header.group(1))
content = convert_markdown_to_html(match_header.group(2))
if level == 1:
p = Paragraph(content, styles['SectionTitle'])
else:
p = Paragraph(content, styles['SubSectionTitle'])
story.append(p)
continue
# Détecter les titres en gras seul : **Titre Complet**
match_bold_title = re.match(r'^\*\*([^*]+)\*\*$', stripped_line)
if match_bold_title and len(match_bold_title.group(1)) < 100:
content = match_bold_title.group(1).strip()
# Si c'est court, c'est probablement un titre
if len(content.split()) < 15:
p = Paragraph(content, styles['SectionTitle'])
story.append(p)
continue
# Détecter les listes à puces : * item ou - item
match_bullet = re.match(r'^[\*\-]\s+(.+)$', stripped_line)
if match_bullet:
content = convert_markdown_to_html(match_bullet.group(1))
p = Paragraph(f"• {content}", styles['BulletItem'])
story.append(p)
continue
# Détecter les listes numérotées : 1. item ou 1) item
match_numbered = re.match(r'^(\d+)[\.\)]\s+(.+)$', stripped_line)
if match_numbered:
num = match_numbered.group(1)
content = convert_markdown_to_html(match_numbered.group(2))
p = Paragraph(f"{num}. {content}", styles['ListItem'])
story.append(p)
continue
# Pour tout le reste, utiliser le style de corps de texte avec conversion markdown
content = convert_markdown_to_html(stripped_line)
p = Paragraph(content, styles['BodyText'])
story.append(p)
# Construire le document PDF avec pagination
doc.build(story, onFirstPage=add_page_number, onLaterPages=add_page_number)
# Remettre le curseur au début du buffer pour la lecture
buffer.seek(0)
return buffer
# =======================
# Routes
# =======================
@app.route('/')
def index():
return render_template('index.html')
@app.route('/upload', methods=['POST'])
def upload_file():
user_session = get_session_id()
youtube_url = request.form.get('youtube_url', '').strip()
summary_type = request.form.get('summary_type', 'moyen')
# Traitement YouTube
if youtube_url:
if not is_youtube_url(youtube_url):
return jsonify({'error': 'URL YouTube invalide'}), 400
# Créer la tâche
task_id = create_task(
user_session=user_session,
filename='Vidéo YouTube',
summary_type=summary_type,
source_type='youtube',
source_path=youtube_url
)
# Lancer le traitement en arrière-plan
thread = threading.Thread(
target=process_youtube_background,
args=(task_id, youtube_url, summary_type)
)
thread.daemon = True
thread.start()
return jsonify({
'success': True,
'task_id': task_id,
'message': 'Traitement en cours...'
})
# Traitement fichier
if 'file' not in request.files:
return jsonify({'error': 'Aucun fichier ou URL YouTube fourni'}), 400
file = request.files['file']
if file.filename == '':
return jsonify({'error': 'Nom de fichier vide'}), 400
if not allowed_file(file.filename):
return jsonify({'error': 'Type de fichier non supporté'}), 400
filename = secure_filename(file.filename)
filepath = os.path.join(app.config['UPLOAD_FOLDER'], f"{uuid.uuid4()}_{filename}")
file.save(filepath)
# Créer la tâche
task_id = create_task(
user_session=user_session,
filename=filename,
summary_type=summary_type,
source_type='file',
source_path=filepath
)
# Lancer le traitement en arrière-plan
thread = threading.Thread(
target=process_file_background,
args=(task_id, filepath, filename, summary_type)
)
thread.daemon = True
thread.start()
return jsonify({
'success': True,
'task_id': task_id,
'message': 'Traitement en cours...'
})
@app.route('/task/')
def get_task_status(task_id):
"""Récupère le statut d'une tâche"""
task = get_task(task_id)
if not task:
return jsonify({'error': 'Tâche non trouvée'}), 404
return jsonify({
'task_id': task['task_id'],
'filename': task['filename'],
'summary_type': task['summary_type'],
'status': task['status'],
'progress': task['progress'],
'summary': task['summary'],
'error': task['error'],
'created_at': task['created_at'],
'completed_at': task['completed_at']
})
@app.route('/tasks')
def get_all_tasks():
"""Récupère toutes les tâches de l'utilisateur"""
user_session = get_session_id()
tasks = get_user_tasks(user_session)
task_list = []
for task in tasks:
task_list.append({
'task_id': task['task_id'],
'filename': task['filename'],
'summary_type': task['summary_type'],
'status': task['status'],
'progress': task['progress'],
'summary': task['summary'],
'error': task['error'],
'created_at': task['created_at'],
'completed_at': task['completed_at']
})
return jsonify(task_list)
@app.route('/download/')
def download_pdf(task_id):
"""Télécharge le PDF d'une tâche"""
task = get_task(task_id)
if not task:
return jsonify({'error': 'Tâche non trouvée'}), 404
if task['status'] != 'completed':
return jsonify({'error': 'Résumé non encore disponible'}), 400
pdf_buffer = create_pdf(task['summary'], task['filename'], task['summary_type'])
return send_file(
pdf_buffer,
mimetype='application/pdf',
as_attachment=True,
download_name=f"resume_{task['filename']}.pdf"
)
@app.route('/delete-task/', methods=['DELETE'])
def delete_task(task_id):
"""Supprime une tâche"""
user_session = get_session_id()
task = get_task(task_id)
if not task:
return jsonify({'error': 'Tâche non trouvée'}), 404
if task['user_session'] != user_session:
return jsonify({'error': 'Non autorisé'}), 403
conn = get_db_connection()
conn.execute('DELETE FROM tasks WHERE task_id = ?', (task_id,))
conn.commit()
conn.close()
return jsonify({'success': True})
@app.route('/clear-tasks', methods=['POST'])
def clear_all_tasks():
"""Supprime toutes les tâches de l'utilisateur"""
user_session = get_session_id()
conn = get_db_connection()
conn.execute('DELETE FROM tasks WHERE user_session = ?', (user_session,))
conn.commit()
conn.close()
return jsonify({'success': True})
# Ajouter cette route dans votre fichier app.py
# Ajouter cette route dans votre fichier app.py
@app.route('/stats')
def system_stats():
"""Affiche les statistiques du système"""
conn = get_db_connection()
# Nombre total de résumés
total = conn.execute('SELECT COUNT(*) as count FROM tasks').fetchone()['count']
# Nombre de résumés réussis
completed = conn.execute(
'SELECT COUNT(*) as count FROM tasks WHERE status = ?',
('completed',)
).fetchone()['count']
# Nombre de résumés en cours
processing = conn.execute(
'SELECT COUNT(*) as count FROM tasks WHERE status IN (?, ?)',
('pending', 'processing')
).fetchone()['count']
# Nombre de résumés échoués
failed = conn.execute(
'SELECT COUNT(*) as count FROM tasks WHERE status = ?',
('failed',)
).fetchone()['count']
# Statistiques par type de résumé
by_type = conn.execute('''
SELECT summary_type, COUNT(*) as count
FROM tasks
WHERE status = 'completed'
GROUP BY summary_type
''').fetchall()
# Statistiques par source
by_source = conn.execute('''
SELECT source_type, COUNT(*) as count
FROM tasks
WHERE status = 'completed'
GROUP BY source_type
''').fetchall()
# Récupérer tous les résumés complétés (limité aux 50 derniers)
all_summaries = conn.execute('''
SELECT task_id, filename, summary_type, summary, created_at, completed_at, source_type
FROM tasks
WHERE status = 'completed'
ORDER BY completed_at DESC
LIMIT 50
''').fetchall()
conn.close()
# Créer le HTML minimaliste
html = f'''
Statistiques Système
Statistiques du Système
Vue d'ensemble
- Total de résumés : {total}
- Résumés réussis : {completed}
- En cours : {processing}
- Échoués : {failed}
Par type de résumé
'''
for row in by_type:
html += f' - {row["summary_type"].capitalize()} : {row["count"]}
\n'
if not by_type:
html += ' - Aucune donnée
\n'
html += '''
Par source
'''
for row in by_source:
source_name = 'Fichier' if row['source_type'] == 'file' else 'YouTube'
html += f' - {source_name} : {row["count"]}
\n'
if not by_source:
html += ' - Aucune donnée
\n'
html += '''
Résumés des utilisateurs (50 derniers)
'''
if not all_summaries:
html += ' Aucun résumé disponible
\n'
else:
for i, summary in enumerate(all_summaries, 1):
source_name = 'Fichier' if summary['source_type'] == 'file' else 'YouTube'
created = summary['created_at'][:19] if summary['created_at'] else 'N/A'
completed = summary['completed_at'][:19] if summary['completed_at'] else 'N/A'
html += f'''
Résumé #{i}
- Fichier : {summary["filename"]}
- Type : {summary["summary_type"].capitalize()}
- Source : {source_name}
- Créé le : {created}
- Complété le : {completed}
Résumé :
{summary["summary"]}
'''
html += '''
Retour à l'accueil
'''
return html
# =======================
# Initialisation
# =======================
# Initialiser la base de données au démarrage
with app.app_context():
init_db()
# Nettoyage périodique des anciennes tâches
def cleanup_old_tasks():
while True:
time.sleep(86400) # Toutes les 24 heures
delete_old_tasks()
cleanup_thread = threading.Thread(target=cleanup_old_tasks)
cleanup_thread.daemon = True
cleanup_thread.start()
if __name__ == '__main__':
app.run(debug=True)