#!/usr/bin/env python3
"""
Full Inference Run for Chronos 2 Zero-Shot Forecasting
Generates 14-day forecasts for all 38 FBMC borders
"""
import os
import time
from datetime import timedelta

import numpy as np
import pandas as pd
import polars as pl
import torch
from chronos import Chronos2Pipeline
from datasets import load_dataset

from src.forecasting.dynamic_forecast import DynamicForecast
from src.forecasting.feature_availability import FeatureAvailability
print("=" * 60)
print("CHRONOS 2 FULL INFERENCE - ALL BORDERS")
print("=" * 60)
total_start = time.time()

# Step 1: Load dataset
print("\n[1/7] Loading dataset from HuggingFace...")
start_time = time.time()

# Use HF token for private dataset access
hf_token = os.getenv("HF_TOKEN")
if not hf_token:
    raise ValueError("HF_TOKEN not found in environment. Please set HF_TOKEN.")
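
# The token is forwarded to load_dataset below so the private dataset
# resolves; reading it from the environment keeps the secret out of source
# control, so the same script runs locally, in CI, or on a HF Space where
# HF_TOKEN is configured as a repository secret.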
dataset = load_dataset(
    "evgueni-p/fbmc-features-24month",
    split="train",
    token=hf_token
)
df = pl.from_pandas(dataset.to_pandas())

# Ensure timestamp is datetime (convert only if needed)
if df['timestamp'].dtype == pl.String:
    df = df.with_columns(pl.col('timestamp').str.to_datetime())
elif df['timestamp'].dtype != pl.Datetime:
    df = df.with_columns(pl.col('timestamp').cast(pl.Datetime))
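
# Depending on how the dataset was serialized, the Arrow -> pandas -> polars
# round-trip can surface 'timestamp' as strings or as a non-Datetime dtype,
# so both cases are normalized above before any time-based slicing.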
| print(f"[OK] Loaded {len(df)} rows, {len(df.columns)} columns") | |
| print(f" Date range: {df['timestamp'].min()} to {df['timestamp'].max()}") | |
| print(f" Load time: {time.time() - start_time:.1f}s") | |
| # Feature categorization using FeatureAvailability module | |
| print("\n[Feature Categorization]") | |
| categories = FeatureAvailability.categorize_features(df.columns) | |
| # Validate categorization | |
| is_valid, warnings = FeatureAvailability.validate_categorization(categories, verbose=False) | |
| # Report categories | |
| print(f" Full-horizon D+14: {len(categories['full_horizon_d14'])} (temporal + weather + outages + LTA)") | |
| print(f" Partial D+1: {len(categories['partial_d1'])} (load forecasts)") | |
| print(f" Historical only: {len(categories['historical'])} (prices, generation, demand, lags, etc.)") | |
| print(f" Total features: {sum(len(v) for v in categories.values())}") | |
| if not is_valid: | |
| print("\n[!] WARNING: Feature categorization issues:") | |
| for w in warnings: | |
| print(f" - {w}") | |
| # For Chronos-2: combine full+partial for future covariates | |
| # (Chronos-2 supports partial availability via masking) | |
| known_future_cols = categories['full_horizon_d14'] + categories['partial_d1'] | |
| past_only_cols = categories['historical'] | |
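
# NOTE (assumption about the feature layout, not verified here): partial D+1
# covariates are expected to carry values only for roughly the first 24h of
# the horizon and NaN beyond that, relying on Chronos-2's masking of missing
# future covariate values rather than requiring all 336 hours to be populated.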
# Step 2: Identify all target borders
print("\n[2/7] Identifying target borders...")
target_cols = [col for col in df.columns if col.startswith('target_border_')]
borders = [col.replace('target_border_', '') for col in target_cols]
print(f"[OK] Found {len(borders)} borders")
print(f" Borders: {', '.join(borders[:5])}... (showing first 5)")

# Step 3: Prepare forecast parameters
print("\n[3/7] Setting up forecast parameters...")
# Use a run date that still has 14 days of future data available.
# The dataset ends at 2025-09-30 23:00, so the forecast must end at or
# before that timestamp; for a 336-hour (14-day) horizon, run_date can be
# at most 2025-09-16 23:00.
context_hours = 512
prediction_hours = 336  # 14 days (fixed)
max_date = df['timestamp'].max()
run_date = max_date - timedelta(hours=prediction_hours)
print(f" Run date: {run_date}")
print(f" Context window: {context_hours} hours")
print(f" Prediction horizon: {prediction_hours} hours (14 days, D+1 to D+14)")
print(f" Forecast range: {run_date + timedelta(hours=1)} to {run_date + timedelta(hours=prediction_hours)}")

# Initialize DynamicForecast once for all borders
forecaster = DynamicForecast(
    dataset=df,
    context_hours=context_hours,
    forecast_hours=prediction_hours
)
print("[OK] DynamicForecast initialized with time-aware data extraction")
# Step 4: Load model
print("\n[4/7] Loading Chronos 2 model on GPU...")
model_start = time.time()
pipeline = Chronos2Pipeline.from_pretrained(
    'amazon/chronos-2',
    device_map='cuda',
    dtype=torch.float32
)
model_time = time.time() - model_start
print(f"[OK] Model loaded in {model_time:.1f}s")
print(f" Device: {next(pipeline.model.parameters()).device}")
# Step 5: Run inference for all borders
print(f"\n[5/7] Running zero-shot inference for {len(borders)} borders...")
print(f" Prediction: {prediction_hours} hours (14 days) per border")
print(" Progress:")
all_forecasts = []
inference_times = []
for i, border in enumerate(borders, 1):
    border_start = time.time()
    try:
        # Prepare data with time-aware extraction
        context_data, future_data = forecaster.prepare_forecast_data(run_date, border)

        # Validate no data leakage (on the first border only, for performance)
        if i == 1:
            is_valid, errors = forecaster.validate_no_leakage(context_data, future_data, run_date)
            if not is_valid:
                print(f"\n[ERROR] Data leakage detected on first border ({border}):")
                for err in errors:
                    print(f" - {err}")
                # SystemExit subclasses BaseException, so the except clause
                # below does not swallow it and the script aborts as intended
                raise SystemExit(1)

        # Call the API with separate context and future dataframes
        forecasts = pipeline.predict_df(
            context_data,            # historical data (positional parameter)
            future_df=future_data,   # future covariates (named parameter)
            prediction_length=prediction_hours,
            id_column='border',
            timestamp_column='timestamp',
            target='target'
        )
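
        # The returned frame's schema is assumed (not verified here) to be
        # long-format with one row per forecast hour and point/quantile
        # columns; downstream steps only rely on a 'mean' column existing.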
        # Add border identifier
        forecasts['border'] = border
        all_forecasts.append(forecasts)
        border_time = time.time() - border_start
        inference_times.append(border_time)
        print(f" [{i:2d}/{len(borders)}] {border:15s} - {border_time:.2f}s")
    except Exception as e:
        print(f" [{i:2d}/{len(borders)}] {border:15s} - FAILED: {e}")
inference_time = time.time() - model_start - model_time
print("\n[OK] Inference complete!")
print(f" Total inference time: {inference_time:.1f}s")
if inference_times:
    print(f" Average per border: {np.mean(inference_times):.2f}s")
print(f" Successful forecasts: {len(all_forecasts)}/{len(borders)}")

# Step 6: Combine and save results
print("\n[6/7] Saving forecast results...")
if all_forecasts:
    # Combine all forecasts
    combined_forecasts = pd.concat(all_forecasts, ignore_index=True)

    # Save as parquet (efficient, compressed)
    output_file = '/tmp/chronos2_forecasts_14day.parquet'
    combined_forecasts.to_parquet(output_file)
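    # With pyarrow installed, pandas' to_parquet defaults to snappy
    # compression, so the file stays compact without an explicit codec.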
| print(f"[OK] Forecasts saved to: {output_file}") | |
| print(f" Shape: {combined_forecasts.shape}") | |
| print(f" Columns: {list(combined_forecasts.columns)}") | |
| print(f" File size: {os.path.getsize(output_file) / 1024 / 1024:.2f} MB") | |
| # Save summary statistics | |
| summary_file = '/tmp/chronos2_forecast_summary.csv' | |
| summary_data = [] | |
| for border in borders: | |
| border_forecasts = combined_forecasts[combined_forecasts['border'] == border] | |
| if len(border_forecasts) > 0 and 'mean' in border_forecasts.columns: | |
| summary_data.append({ | |
| 'border': border, | |
| 'forecast_points': len(border_forecasts), | |
| 'mean_forecast': border_forecasts['mean'].mean(), | |
| 'min_forecast': border_forecasts['mean'].min(), | |
| 'max_forecast': border_forecasts['mean'].max(), | |
| 'std_forecast': border_forecasts['mean'].std() | |
| }) | |
| summary_df = pd.DataFrame(summary_data) | |
| summary_df.to_csv(summary_file, index=False) | |
| print(f"[OK] Summary saved to: {summary_file}") | |
| else: | |
| print("[!] No successful forecasts to save") | |
| # Step 7: Validation | |
| print("\n[7/7] Validating results...") | |
| if all_forecasts: | |
| # Check for NaN values | |
| nan_count = combined_forecasts.isna().sum().sum() | |
| print(f" NaN values: {nan_count}") | |
| # Sanity checks on mean forecast | |
| if 'mean' in combined_forecasts.columns: | |
| mean_forecast = combined_forecasts['mean'] | |
| print(f" Overall statistics:") | |
| print(f" Mean: {mean_forecast.mean():.2f} MW") | |
| print(f" Min: {mean_forecast.min():.2f} MW") | |
| print(f" Max: {mean_forecast.max():.2f} MW") | |
| print(f" Std: {mean_forecast.std():.2f} MW") | |
| # Warnings | |
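        # The 20,000 MW ceiling is a loose sanity bound: individual FBMC
        # border capacities are typically far below this, so larger values
        # suggest a unit or scaling problem rather than a plausible forecast.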
| if mean_forecast.min() < 0: | |
| print(" [!] WARNING: Negative forecasts detected") | |
| if mean_forecast.max() > 20000: | |
| print(" [!] WARNING: Unreasonably high forecasts") | |
| if nan_count == 0 and mean_forecast.min() >= 0 and mean_forecast.max() < 20000: | |
| print(" [OK] Validation passed!") | |
| # Performance summary | |
| print("\n" + "="*60) | |
| print("FULL INFERENCE SUMMARY") | |
| print("="*60) | |
| print(f"Borders forecasted: {len(all_forecasts)}/{len(borders)}") | |
| print(f"Forecast horizon: {prediction_hours} hours (14 days)") | |
| print(f"Total inference time: {inference_time:.1f}s ({inference_time / 60:.2f} min)") | |
| print(f"Average per border: {np.mean(inference_times):.2f}s") | |
| print(f"Speed: {prediction_hours * len(all_forecasts) / inference_time:.1f} hours/second") | |
| # Target check | |
| if inference_time < 300: # 5 minutes | |
| print(f"\n[OK] Performance target met! (<5 min for full run)") | |
| else: | |
| print(f"\n[!] Performance slower than target (expected <5 min)") | |
| print("="*60) | |
| print("[OK] FULL INFERENCE COMPLETE!") | |
| print("="*60) | |
| # Total time | |
| total_time = time.time() - total_start | |
| print(f"\nTotal execution time: {total_time:.1f}s ({total_time / 60:.1f} min)") | |