Spaces:
Sleeping
Sleeping
| """Process October 2025 raw data into features for dataset extension. | |
| This script processes the October 2025 raw data (downloaded Nov 13) and generates | |
| feature files matching the 24-month dataset schema: | |
| - Weather features: 375 features | |
| - ENTSO-E features: ~1,863 features | |
| - JAO features: 276 features (if October data exists) | |
| Output files will be saved to data/processed/ with "_october" suffix. | |
| Author: Claude | |
| Date: 2025-11-14 | |
| """ | |
# Standard library
import sys
from pathlib import Path

# Third-party
import polars as pl

# Add src to path for imports (must run before the local imports below)
sys.path.append(str(Path(__file__).parent / "src"))

# Local feature-engineering modules (resolved via the sys.path tweak above)
from feature_engineering.engineer_weather_features import (
    engineer_grid_level_features,
    engineer_temporal_lags,
    engineer_derived_features,
)
from feature_engineering.engineer_entsoe_features import (
    engineer_generation_features,
    engineer_demand_features,
    engineer_price_features,
    engineer_hydro_storage_features,
    engineer_pumped_storage_features,
    engineer_load_forecast_features,
    engineer_transmission_outage_features,
)
def process_october_weather() -> pl.DataFrame:
    """Process October raw weather data into 375 grid-level features.

    Reads data/raw/weather_october_2025.parquet, applies the same three
    feature-engineering stages used for the 24-month dataset (grid-level
    aggregation, then temporal lags, then derived features — order matters
    because later stages build on earlier columns), and writes the result
    to data/processed/features_weather_october.parquet.

    Returns:
        pl.DataFrame: hourly weather features (timestamp + 375 feature
        columns expected).

    Raises:
        FileNotFoundError: if the raw October weather file is missing.
    """
    print("\n" + "=" * 80)
    print("PROCESSING OCTOBER WEATHER DATA")
    print("=" * 80)

    raw_file = Path("data/raw/weather_october_2025.parquet")
    if not raw_file.exists():
        raise FileNotFoundError(f"Missing: {raw_file}")

    # Load October weather data
    weather_df = pl.read_parquet(raw_file)
    print(f"\nLoaded weather data: {weather_df.shape}")
    print(f"Date range: {weather_df['timestamp'].min()} to {weather_df['timestamp'].max()}")

    # Engineer features using existing modules
    features = engineer_grid_level_features(weather_df)
    features = engineer_temporal_lags(features)
    features = engineer_derived_features(features)

    # Save to processed directory; create it if this is a fresh checkout
    # (write_parquet does not create missing parent directories).
    output_file = Path("data/processed/features_weather_october.parquet")
    output_file.parent.mkdir(parents=True, exist_ok=True)
    features.write_parquet(output_file)

    print(f"\n[OK] Weather features saved: {output_file}")
    print(f"  Shape: {features.shape}")
    print(f"  Features: {len(features.columns) - 1} (+ timestamp)")

    return features
def process_october_entsoe() -> pl.DataFrame:
    """Process October ENTSO-E raw data into ~1,863 features.

    Loads seven raw ENTSO-E parquet files (generation, demand, prices,
    hydro storage, pumped storage, load forecast, transmission outages),
    engineers features per category, joins them on a normalized hourly
    timestamp, resamples to hourly means, and reindexes to the complete
    Oct 1-14, 2025 range (336 hours) with forward-fill for missing hours.

    Returns:
        pl.DataFrame: 336 hourly rows of merged ENTSO-E features.

    Raises:
        FileNotFoundError: if the CNEC master list or any required raw
        ENTSO-E file is missing.
    """
    import datetime

    print("\n" + "=" * 80)
    print("PROCESSING OCTOBER ENTSO-E DATA")
    print("=" * 80)

    raw_dir = Path("data/raw")
    processed_dir = Path("data/processed")

    required_files = {
        'generation': raw_dir / "entsoe_generation_october_2025.parquet",
        'demand': raw_dir / "entsoe_demand_october_2025.parquet",
        'prices': raw_dir / "entsoe_prices_october_2025.parquet",
        'hydro_storage': raw_dir / "entsoe_hydro_storage_october_2025.parquet",
        'pumped_storage': raw_dir / "entsoe_pumped_storage_october_2025.parquet",
        'load_forecast': raw_dir / "entsoe_load_forecast_october_2025.parquet",
        'transmission_outages': raw_dir / "entsoe_transmission_outages_october_2025.parquet"
    }

    # Load CNEC master list (required for transmission outage features)
    cnec_master_path = processed_dir / "cnecs_master_176.csv"
    if not cnec_master_path.exists():
        raise FileNotFoundError(f"Missing CNEC master list: {cnec_master_path}")
    cnec_master_df = pl.read_csv(cnec_master_path)
    print(f"\nLoaded CNEC master list: {cnec_master_df.shape}")

    # Fail fast with one message listing every missing input, instead of
    # printing a warning and then crashing on the first read_parquet call.
    missing = [f"{name}: {path}" for name, path in required_files.items()
               if not path.exists()]
    if missing:
        raise FileNotFoundError(
            "Missing ENTSO-E raw files:\n  " + "\n  ".join(missing)
        )

    # Load all datasets
    print("\nLoading ENTSO-E datasets...")
    generation_df = pl.read_parquet(required_files['generation'])
    demand_df = pl.read_parquet(required_files['demand'])
    prices_df = pl.read_parquet(required_files['prices'])
    hydro_storage_df = pl.read_parquet(required_files['hydro_storage'])
    pumped_storage_df = pl.read_parquet(required_files['pumped_storage'])
    load_forecast_df = pl.read_parquet(required_files['load_forecast'])
    transmission_outages_df = pl.read_parquet(required_files['transmission_outages'])

    print(f"  Generation: {generation_df.shape}")
    print(f"  Demand: {demand_df.shape}")
    print(f"  Prices: {prices_df.shape}")
    print(f"  Hydro storage: {hydro_storage_df.shape}")
    print(f"  Pumped storage: {pumped_storage_df.shape}")
    print(f"  Load forecast: {load_forecast_df.shape}")
    print(f"  Transmission outages: {transmission_outages_df.shape}")

    def _october_hourly_range() -> pl.DataFrame:
        # Complete hourly index for Oct 1-14, 2025 (336 rows), cast to
        # nanosecond precision to match the normalized feature frames.
        return pl.DataFrame({
            'timestamp': pl.datetime_range(
                datetime.datetime(2025, 10, 1, 0, 0),
                datetime.datetime(2025, 10, 14, 23, 0),
                interval='1h',
                eager=True
            )
        }).with_columns(pl.col('timestamp').dt.cast_time_unit('ns'))

    def _normalize_timestamp(df: pl.DataFrame) -> pl.DataFrame:
        # Timezone-naive, nanosecond-precision timestamps so joins align.
        return df.with_columns(
            pl.col('timestamp').dt.replace_time_zone(None)
            .dt.cast_time_unit('ns').alias('timestamp')
        )

    # Engineer features for each category
    print("\nEngineering ENTSO-E features...")
    gen_features = engineer_generation_features(generation_df)                  # ~228 features
    demand_features = engineer_demand_features(demand_df)                       # 24 features
    price_features = engineer_price_features(prices_df)                         # 24 features
    hydro_features = engineer_hydro_storage_features(hydro_storage_df)          # 12 features
    pumped_features = engineer_pumped_storage_features(pumped_storage_df)       # 10 features
    load_forecast_features = engineer_load_forecast_features(load_forecast_df)  # 12 features
    # Transmission outage features (176 features - ALL CNECs) need the
    # full hourly grid to place outage windows on.
    transmission_features = engineer_transmission_outage_features(
        transmission_outages_df,
        cnec_master_df,
        _october_hourly_range()
    )

    # Merge all features, normalizing each frame's timestamp first.
    print("\nMerging all ENTSO-E features...")
    features = _normalize_timestamp(gen_features)
    for feat_df, name in [
        (demand_features, "demand"),
        (price_features, "prices"),
        (hydro_features, "hydro_storage"),
        (pumped_features, "pumped_storage"),
        (load_forecast_features, "load_forecast"),
        (transmission_features, "transmission_outages")
    ]:
        if 'timestamp' in feat_df.columns:
            feat_df = _normalize_timestamp(feat_df)
        features = features.join(feat_df, on='timestamp', how='left', coalesce=True)
        print(f"  Added {name}: {len(feat_df.columns) - 1} features")

    # Resample to hourly (some datasets have sub-hourly data): truncate to
    # the hour, then take the mean of any sub-hourly values.
    print("\nResampling to hourly...")
    features = features.with_columns(pl.col('timestamp').dt.truncate('1h'))
    agg_exprs = [pl.col(c).mean().alias(c) for c in features.columns if c != 'timestamp']
    features = features.group_by('timestamp').agg(agg_exprs).sort('timestamp')
    print(f"  Resampled to {len(features)} hourly rows")

    # Reindex to the complete 336-hour range (Oct 1-14) and forward-fill.
    complete_range = _october_hourly_range()
    resampled_rows = len(features)
    features = complete_range.join(features, on='timestamp', how='left')
    features = features.with_columns(pl.exclude('timestamp').forward_fill())

    # Hours absent before reindexing = hours the forward-fill had to create.
    # (The original computed this after the fill, which under-counted.)
    missing_count = len(complete_range) - resampled_rows
    if missing_count > 0:
        print(f"  Forward-filled {missing_count} missing hours")
    print(f"  Final shape: {len(features)} hourly rows (Oct 1-14)")

    # Save to processed directory
    output_file = Path("data/processed/features_entsoe_october.parquet")
    features.write_parquet(output_file)
    print(f"\n[OK] ENTSO-E features saved: {output_file}")
    print(f"  Shape: {features.shape}")
    print(f"  Features: {len(features.columns) - 1} (+ timestamp)")

    return features
def process_october_jao() -> pl.DataFrame | None:
    """Process October JAO data into 276 features, when raw data exists.

    Returns the engineered feature frame, or None when no October JAO
    parquet is present (JAO coverage may be historical only).
    """
    banner = "=" * 80
    print(f"\n{banner}")
    print("PROCESSING OCTOBER JAO DATA")
    print(banner)

    raw_file = Path("data/raw/jao_october_2025.parquet")
    if not raw_file.exists():
        # Optional input: skip gracefully rather than fail.
        print(f"\nINFO: No October JAO data found at {raw_file}")
        print("This is expected - JAO features may be historical only.")
        print("Skipping JAO feature engineering for October.")
        return None

    # Deferred import: only needed on the rare path where data exists.
    from feature_engineering.engineer_jao_features import engineer_jao_features_all

    jao_df = pl.read_parquet(raw_file)
    print(f"\nLoaded JAO data: {jao_df.shape}")
    features = engineer_jao_features_all(jao_df)

    # Persist alongside the other October feature files.
    output_file = Path("data/processed/features_jao_october.parquet")
    features.write_parquet(output_file)
    print(f"\n[OK] JAO features saved: {output_file}")
    print(f"  Shape: {features.shape}")
    return features
def validate_october_features() -> bool:
    """Validate October feature files against the expected schema.

    Checks row counts (14 days x 24 hours = 336), the Oct 1-14, 2025
    timestamp range, and reports feature counts plus null totals. JAO
    features are optional; when present their row count is validated too.

    Returns:
        bool: True when no validation issues were found.

    Raises:
        FileNotFoundError: if a required (weather / ENTSO-E) feature
        file is missing.
    """
    import datetime

    print("\n" + "=" * 80)
    print("VALIDATING OCTOBER FEATURES")
    print("=" * 80)

    weather_file = Path("data/processed/features_weather_october.parquet")
    entsoe_file = Path("data/processed/features_entsoe_october.parquet")
    jao_file = Path("data/processed/features_jao_october.parquet")

    # Fail fast with a clear message instead of a raw read_parquet error.
    for required in (weather_file, entsoe_file):
        if not required.exists():
            raise FileNotFoundError(f"Missing feature file: {required}")

    weather_df = pl.read_parquet(weather_file)
    entsoe_df = pl.read_parquet(entsoe_file)

    print(f"\nWeather features: {weather_df.shape}")
    print(f"  Rows (expected 336): {len(weather_df)}")
    print(f"  Features (expected 375): {len(weather_df.columns) - 1}")

    print(f"\nENTSO-E features: {entsoe_df.shape}")
    print(f"  Rows (expected 336): {len(entsoe_df)}")
    print(f"  Features (expected ~1,863): {len(entsoe_df.columns) - 1}")

    expected_rows = 336  # 14 days x 24 hours
    issues = []

    if jao_file.exists():
        jao_df = pl.read_parquet(jao_file)
        print(f"\nJAO features: {jao_df.shape}")
        print(f"  Rows (expected 336): {len(jao_df)}")
        print(f"  Features (expected 276): {len(jao_df.columns) - 1}")
        # Previously the JAO counts were printed but never validated.
        if len(jao_df) != expected_rows:
            issues.append(f"JAO rows: {len(jao_df)} (expected {expected_rows})")
    else:
        print("\nJAO features: Not generated (no October JAO data)")

    if len(weather_df) != expected_rows:
        issues.append(f"Weather rows: {len(weather_df)} (expected {expected_rows})")
    if len(entsoe_df) != expected_rows:
        issues.append(f"ENTSO-E rows: {len(entsoe_df)} (expected {expected_rows})")

    # Validate date range (Oct 1-14, 2025) - previously only printed.
    expected_start = datetime.datetime(2025, 10, 1, 0, 0)
    expected_end = datetime.datetime(2025, 10, 14, 23, 0)
    print("\nDate ranges:")
    for label, df in (("Weather", weather_df), ("ENTSO-E", entsoe_df)):
        start = df['timestamp'].min()
        end = df['timestamp'].max()
        print(f"  {label}: {start} to {end}")
        if start != expected_start or end != expected_end:
            issues.append(
                f"{label} range: {start} to {end} "
                f"(expected {expected_start} to {expected_end})"
            )

    # Check for null values (informational only; forward-fill upstream may
    # legitimately leave leading nulls).
    weather_nulls = weather_df.null_count().sum_horizontal().to_list()[0]
    entsoe_nulls = entsoe_df.null_count().sum_horizontal().to_list()[0]
    print("\nNull value counts:")
    print(f"  Weather: {weather_nulls} nulls")
    print(f"  ENTSO-E: {entsoe_nulls} nulls")

    # Report validation results
    if issues:
        print("\n[WARNING] Validation issues found:")
        for issue in issues:
            print(f"  - {issue}")
    else:
        print("\n[OK] All validation checks passed!")

    return len(issues) == 0
def main() -> None:
    """Main execution: process all October data, then validate the output.

    Exits with status 1 on validation failure or any unhandled processing
    error; otherwise prints the generated file list and next steps.
    """
    print("\n" + "=" * 80)
    print("OCTOBER 2025 FEATURE ENGINEERING")
    print("Processing raw data into features for dataset extension")
    print("=" * 80)

    try:
        # Each step writes its own output file; only the optional JAO
        # result is needed afterwards (weather/ENTSO-E returns were unused).
        process_october_weather()
        process_october_entsoe()
        jao_features = process_october_jao()  # None when no October JAO data

        if validate_october_features():
            print("\n" + "=" * 80)
            print("SUCCESS: October feature engineering complete!")
            print("=" * 80)
            print("\nGenerated files:")
            print("  - data/processed/features_weather_october.parquet")
            print("  - data/processed/features_entsoe_october.parquet")
            if jao_features is not None:
                print("  - data/processed/features_jao_october.parquet")
            print("\nNext steps:")
            print("  1. Merge October features into unified dataset")
            print("  2. Append to 24-month dataset (17,544 -> 17,880 rows)")
            print("  3. Upload extended dataset to HuggingFace")
        else:
            print("\n[ERROR] Validation failed - please review issues above")
            sys.exit(1)
    except Exception as e:
        # Replace non-ASCII characters so legacy Windows consoles (cp1252)
        # don't raise UnicodeEncodeError while reporting the failure.
        error_msg = str(e).encode('ascii', 'replace').decode('ascii')
        print(f"\n[ERROR] Feature engineering failed: {error_msg}")
        import traceback
        traceback.print_exc()
        sys.exit(1)


if __name__ == "__main__":
    main()