Maaroufabousaleh
Enhance health status check to handle malformed last_run values and improve logging
58c5ce0
import logging
import os
import sys
import time
from datetime import datetime

import gradio as gr
import psutil
# Add src to Python path for imports (must happen before the config import below)
sys.path.insert(0, '/app/src')
sys.path.insert(0, '/app')

# Internal config for paths and markers
try:
    from src.config import DATA_DIR, LOG_DIR, LAST_RUN_PATH
except Exception:
    # Fallbacks if import path differs in Spaces
    try:
        from config import DATA_DIR, LOG_DIR, LAST_RUN_PATH  # type: ignore
    except Exception:
        DATA_DIR = os.environ.get('DATA_DIR', '/data')
        LOG_DIR = os.environ.get('LOG_DIR', os.path.join(DATA_DIR, 'logs'))
        LAST_RUN_PATH = os.environ.get('LAST_RUN_PATH', '/tmp/last_run.txt')
# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[logging.StreamHandler(sys.stdout)]
)
logger = logging.getLogger(__name__)
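
# A possible extension (not in the original code): mirror this app's own log
# output into LOG_DIR so it shows up in the "Logs" tab below, which already
# looks for a monitor.log file. Guarded because LOG_DIR may not be writable
# in every environment.
try:
    os.makedirs(LOG_DIR, exist_ok=True)
    _file_handler = logging.FileHandler(os.path.join(LOG_DIR, 'monitor.log'))
    _file_handler.setFormatter(
        logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    )
    logger.addHandler(_file_handler)
except Exception:
    pass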
def get_health_status():
    """Get basic health status"""
    try:
        # Get process info. Note: psutil's cpu_percent() returns 0.0 on its
        # first call; later refreshes report usage since the previous call.
        process = psutil.Process()
        memory_mb = process.memory_info().rss / 1024 / 1024
        cpu_percent = process.cpu_percent()

        # Get system info
        memory = psutil.virtual_memory()
        disk = psutil.disk_usage('/')

        # Check scheduler status via the last-run heartbeat file
        scheduler_running = False
        last_run_time = "Unknown"
        try:
            if os.path.exists(LAST_RUN_PATH):
                with open(LAST_RUN_PATH, 'r') as f:
                    last_run_str = f.read().strip()
                if last_run_str:
                    try:
                        last_run = datetime.strptime(last_run_str, '%Y-%m-%d %H:%M:%S')
                        time_since_last_run = (datetime.now() - last_run).total_seconds()
                        scheduler_running = time_since_last_run < 2700  # 45 minutes
                        last_run_time = last_run_str
                    except Exception:
                        logger.debug("Malformed last_run value: %s", last_run_str)
                        scheduler_running = False
                        last_run_time = "Unknown"
        except Exception as e:
            logger.warning(f"Could not check scheduler status: {e}")

        return {
            "status": "healthy" if memory_mb < 400 else "warning",
            "timestamp": datetime.now().isoformat(),
            "process_memory_mb": round(memory_mb, 2),
            "process_cpu_percent": round(cpu_percent, 2),
            "system_memory_percent": round(memory.percent, 1),
            "system_memory_available_gb": round(memory.available / (1024**3), 2),
            "disk_free_gb": round(disk.free / (1024**3), 2),
            "scheduler_running": scheduler_running,
            "scheduler_last_run": last_run_time
        }
    except Exception as e:
        logger.error(f"Health check failed: {e}")
        return {
            "status": "error",
            "error": str(e),
            "timestamp": datetime.now().isoformat()
        }
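
# For reference, a minimal sketch of the writer side of this heartbeat
# protocol. The actual scheduler lives outside this file; write_heartbeat and
# its write-then-rename approach are illustrative assumptions, not real code.
def write_heartbeat(path=LAST_RUN_PATH):
    """Record the current time in the exact format get_health_status() parses."""
    tmp_path = path + '.tmp'
    with open(tmp_path, 'w') as f:
        f.write(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
    os.replace(tmp_path, path)  # atomic rename, so readers never see a partial write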
def get_pipeline_status():
    """Get data pipeline status"""
    try:
        data_dirs = [
            os.path.join(DATA_DIR, 'merged', 'features'),
            os.path.join(DATA_DIR, 'merged', 'train'),
            os.path.join(DATA_DIR, 'alpaca'),
            os.path.join(DATA_DIR, 'advisorai-data'),
        ]

        recent_files = 0
        total_size = 0
        for data_dir in data_dirs:
            if os.path.exists(data_dir):
                for root, dirs, files in os.walk(data_dir):
                    for file in files:
                        if file.endswith(('.json', '.parquet', '.csv')):
                            file_path = os.path.join(root, file)
                            try:
                                stat = os.stat(file_path)
                                # Count files modified in the last 24 hours
                                if time.time() - stat.st_mtime < 86400:
                                    recent_files += 1
                                    total_size += stat.st_size
                            except Exception:
                                continue

        return {
            "status": "running" if recent_files > 0 else "stale",
            "recent_files_24h": recent_files,
            "total_data_size_gb": round(total_size / (1024**3), 2),
            "last_check": datetime.now().isoformat()
        }
    except Exception as e:
        logger.error(f"Pipeline status check failed: {e}")
        return {
            "status": "error",
            "error": str(e),
            "last_check": datetime.now().isoformat()
        }
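
# Example of the dict shape returned above when data is flowing
# (values are illustrative, not real output):
#   {"status": "running", "recent_files_24h": 42,
#    "total_data_size_gb": 1.27, "last_check": "2025-01-01T12:00:00"}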
def get_recent_files():
    """Get list of recent files in the data directories"""
    try:
        base_paths = [
            os.path.join(DATA_DIR, 'merged', 'features'),
            os.path.join(DATA_DIR, 'merged', 'train'),
            os.path.join(DATA_DIR, 'alpaca'),
            os.path.join(DATA_DIR, 'advisorai-data', 'features'),
        ]

        recent_files = []
        for base_path in base_paths:
            if os.path.exists(base_path):
                for root, dirs, files in os.walk(base_path):
                    for file in files[:10]:  # Limit to 10 files per directory
                        file_path = os.path.join(root, file)
                        try:
                            stat = os.stat(file_path)
                            recent_files.append({
                                "File": file,
                                "Path": file_path.replace(DATA_DIR.rstrip('/') + '/', ""),
                                "Size": f"{stat.st_size / (1024**2):.2f} MB",
                                "Modified": datetime.fromtimestamp(stat.st_mtime).strftime("%Y-%m-%d %H:%M")
                            })
                        except Exception:
                            continue

        # Sort by modification time and keep the 20 most recent; the zero-padded
        # "%Y-%m-%d %H:%M" format makes lexicographic order match chronological order.
        recent_files.sort(key=lambda x: x["Modified"], reverse=True)
        return recent_files[:20]
    except Exception as e:
        logger.error(f"Error getting recent files: {e}")
        return [{"Error": str(e)}]
def get_logs():
    """Get recent log entries"""
    try:
        # Use LOG_DIR (configured above) rather than hardcoded /data/logs paths
        log_files = [
            os.path.join(LOG_DIR, 'scheduler.log'),
            os.path.join(LOG_DIR, 'data_pipeline.log'),
            os.path.join(LOG_DIR, 'monitor.log'),
        ]

        logs = []
        for log_file in log_files:
            if os.path.exists(log_file):
                try:
                    with open(log_file, 'r', encoding='utf-8') as f:
                        lines = f.readlines()
                    # Keep the last 10 lines (slicing handles shorter files)
                    recent_lines = lines[-10:]
                    logs.append(f"=== {os.path.basename(log_file)} ===\n")
                    logs.extend(recent_lines)
                    logs.append("\n")
                except Exception as e:
                    logs.append(f"Error reading {log_file}: {str(e)}\n")

        return "".join(logs) if logs else "No log files found"
    except Exception as e:
        logger.error(f"Error getting logs: {e}")
        return f"Error getting logs: {str(e)}"
# Create Gradio interface
with gr.Blocks(title="AdvisorAI Data Pipeline Monitor", theme=gr.themes.Soft()) as app:
    gr.Markdown("# 🤖 AdvisorAI Data Pipeline Monitor")
    gr.Markdown("Real-time monitoring of the AdvisorAI data collection and processing pipeline")

    with gr.Tabs():
        with gr.TabItem("📊 Dashboard"):
            with gr.Row():
                with gr.Column():
                    gr.Markdown("### Health Status")
                    health_display = gr.JSON(label="System Health & Status")
                with gr.Column():
                    gr.Markdown("### Pipeline Status")
                    pipeline_display = gr.JSON(label="Data Pipeline Status")
            with gr.Row():
                refresh_btn = gr.Button("🔄 Refresh", variant="primary")

        with gr.TabItem("📁 Recent Files"):
            gr.Markdown("### Recently Modified Data Files")
            files_display = gr.Dataframe(
                headers=["File", "Path", "Size", "Modified"],
                datatype=["str", "str", "str", "str"],
                label="Recent Files"
            )
            refresh_files_btn = gr.Button("🔄 Refresh Files")

        with gr.TabItem("📋 Logs"):
            gr.Markdown("### Recent Log Entries")
            logs_display = gr.Textbox(
                label="Recent Logs",
                lines=20,
                max_lines=30,
                show_copy_button=True
            )
            refresh_logs_btn = gr.Button("🔄 Refresh Logs")
    # Event handlers
    def refresh_dashboard():
        health = get_health_status()
        pipeline = get_pipeline_status()
        # JSON components accept dicts directly
        return health, pipeline

    def refresh_files():
        files = get_recent_files()
        if not files:
            return []
        if isinstance(files, list) and isinstance(files[0], dict) and "Error" not in files[0]:
            return [[f.get("File", ""), f.get("Path", ""), f.get("Size", ""), f.get("Modified", "")]
                    for f in files]
        return [["Error", str(files), "", ""]]

    def refresh_logs():
        return get_logs()

    # Connect event handlers
    refresh_btn.click(
        refresh_dashboard,
        outputs=[health_display, pipeline_display]
    )
    refresh_files_btn.click(
        refresh_files,
        outputs=[files_display]
    )
    refresh_logs_btn.click(
        refresh_logs,
        outputs=[logs_display]
    )

    # Auto-refresh on load
    app.load(
        refresh_dashboard,
        outputs=[health_display, pipeline_display]
    )
    app.load(
        refresh_files,
        outputs=[files_display]
    )
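
    # A possible extension (not in the original): Gradio listeners accept an
    # `every` argument for periodic polling, so the dashboard could refresh
    # itself, e.g. every 60 seconds, without the manual button:
    #
    #   app.load(refresh_dashboard, outputs=[health_display, pipeline_display], every=60)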
if __name__ == "__main__":
    logger.info("Starting Gradio app...")
    port = int(os.environ.get("PORT", "7860"))
    app.launch(
        server_name="0.0.0.0",
        server_port=port,
        share=False,
        show_error=True,
        quiet=False
    )
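
# To run locally, assuming this file is saved as app.py (both variables are
# optional; DATA_DIR defaults to /data and PORT to 7860):
#   DATA_DIR=./data PORT=7860 python app.py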