import argparse
import json
import os
import pickle
import warnings
from datetime import datetime

import numpy as np
import pandas as pd
from sklearn.preprocessing import (
    LabelEncoder,
    MinMaxScaler,
    PowerTransformer,
    RobustScaler,
    StandardScaler,
)

warnings.filterwarnings('ignore')


class CryptoDataNormalizer:
    """
    Enhanced normalization pipeline for cryptocurrency features data
    with crypto-specific handling.

    Columns are grouped into categories by name/dtype heuristics
    (see ``_categorize_features``); each scaled category gets its own
    scikit-learn scaler, categorical columns are label-encoded, timestamp
    columns are expanded into calendar features, and optional engineered
    features are appended.
    """

    # Feature types that are IQR-clipped (when handle_outliers is on) and
    # forward-filled before scaling.
    _OUTLIER_CLIPPED_TYPES = ('crypto_specific_features', 'price_volume_features')

    # Feature types whose missing values are filled with 0.
    _ZERO_FILLED_TYPES = ('performance_features', 'rank_diff_features')

    # Order in which scaled feature types are emitted by transform().
    _SCALED_FEATURE_TYPES = (
        'crypto_specific_features',
        'price_volume_features',
        'exchange_features',
        'performance_features',
        'rank_diff_features',
        'technical_features',
        'social_features',
        'transaction_features',
        'quality_features',
        'numerical_features',
    )

    def __init__(self, preserve_symbol=True, handle_outliers=True, feature_engineering=True):
        """
        Args:
            preserve_symbol: copy the 'symbol' column into the output unchanged.
            handle_outliers: clip rank/dominance and price/volume columns to
                their IQR bounds before scaling.
            feature_engineering: append the engineered features built by
                ``_engineer_crypto_features``.
        """
        self.scalers = {}
        self.encoders = {}
        self.feature_info = {}
        self.is_fitted = False
        self.preserve_symbol = preserve_symbol
        self.handle_outliers = handle_outliers
        self.feature_engineering = feature_engineering
        self.outlier_bounds = {}

    def _detect_outliers(self, df, column):
        """Return the (lower, upper) IQR fences (1.5 * IQR) for df[column]."""
        Q1 = df[column].quantile(0.25)
        Q3 = df[column].quantile(0.75)
        IQR = Q3 - Q1
        lower_bound = Q1 - 1.5 * IQR
        upper_bound = Q3 + 1.5 * IQR
        return lower_bound, upper_bound

    def _handle_outliers(self, df, column, method='clip'):
        """
        Handle outliers in numerical data.

        Bounds are computed on first use for a column and cached in
        ``self.outlier_bounds`` so later calls reuse them.

        Args:
            df: frame containing ``column``.
            column: name of the column to process.
            method: 'clip' limits values to the bounds; 'remove' replaces
                out-of-bounds values with NaN; anything else returns the
                column untouched.

        Returns:
            The processed Series (``df`` itself is not modified).
        """
        if column not in self.outlier_bounds:
            self.outlier_bounds[column] = self._detect_outliers(df, column)
        lower_bound, upper_bound = self.outlier_bounds[column]

        if method == 'clip':
            return df[column].clip(lower_bound, upper_bound)
        if method == 'remove':
            return df[column].where((df[column] >= lower_bound) & (df[column] <= upper_bound))
        return df[column]

    @staticmethod
    def _as_category_strings(series):
        """
        Return ``series`` as strings with missing values mapped to 'unknown'.

        The missing-value replacement has to happen *before* the string
        conversion: ``astype(str)`` turns NaN into the literal 'nan', after
        which ``fillna`` has nothing left to fill.
        """
        return series.astype(object).where(series.notna(), 'unknown').astype(str)

    def _categorize_features(self, df):
        """
        Enhanced feature categorization for crypto data.

        Categories are assigned in a fixed order; keyword/dtype based
        categories skip any column already claimed by an earlier category.

        NOTE(review): the prefix-based categories ('exchangePrices.',
        'performance.', 'rankDiffs.') are not exclusion-checked, so a column
        can be listed twice - e.g. 'exchangePrices.*' also matches the
        'price' keyword and 'rankDiffs.*' also matches 'rank'. Such columns
        are scaled once per category. Kept as-is to preserve output columns.

        Returns:
            dict mapping category name -> list of column names.
        """
        columns = list(df.columns)

        # Core identification features (listed whether or not they exist in df)
        id_features = ['symbol', 'backup_id', '__index_level_0__', 'cg_id']

        # Timestamp features
        timestamp_features = [col for col in columns if 'timestamp' in col.lower()]

        # Every column claimed so far; grows as categories are assigned.
        taken = set(id_features) | set(timestamp_features)

        def claim(predicate):
            """Assign all unclaimed columns satisfying predicate to a new category."""
            selected = [col for col in columns if col not in taken and predicate(col)]
            taken.update(selected)
            return selected

        def with_prefix(prefix):
            """Assign all columns starting with prefix, claimed or not."""
            selected = [col for col in columns if col.startswith(prefix)]
            taken.update(selected)
            return selected

        def has_keyword(keywords):
            """Build a predicate: lower-cased column name contains any keyword."""
            return lambda col: any(keyword in col.lower() for keyword in keywords)

        def is_binary(col):
            """0/1, True/False, boolean dtype, or the known 'stable' flag."""
            series = df[col]
            try:
                unique_vals = set(series.dropna().unique())
            except TypeError:
                # Unhashable cell values (lists/dicts): cannot be a 0/1 column.
                unique_vals = None
            zero_one_only = (
                unique_vals is not None
                and len(unique_vals) <= 2
                and unique_vals.issubset({0, 1, True, False, np.nan})
            )
            return bool(series.dtype == bool or zero_one_only or col in ['stable'])

        def is_categorical(col):
            """Strings, objects, categories, or low-cardinality integers."""
            dtype = df[col].dtype
            if dtype == 'object' or dtype.name == 'category':
                return True
            return dtype in ['int64', 'int32'] and df[col].nunique() < 20

        binary_features = claim(is_binary)
        categorical_features = claim(is_categorical)

        # Crypto-specific features
        crypto_specific_features = claim(has_keyword(['dominance', 'rank']))

        # Price/volume/market features
        price_volume_features = claim(has_keyword(['price', 'volume', 'marketcap', 'open']))

        # Prefix-based groups (no exclusion check, see NOTE in the docstring)
        exchange_features = with_prefix('exchangePrices.')
        performance_features = with_prefix('performance.')
        rank_diff_features = with_prefix('rankDiffs.')

        # Technical indicator features
        technical_features = claim(has_keyword(
            ['rsi', 'macd', 'ema', 'sma', 'bb_', 'cci', 'mfi', 'atr', 'stoch', 'roc']
        ))

        # Social sentiment features
        social_features = claim(has_keyword(
            ['social', 'sentiment', 'confidence', 'pos', 'neg', 'neu']
        ))

        # Transaction/blockchain features
        transaction_features = claim(has_keyword(['transaction', 'tx_', 'gas', 'fees']))

        # Data quality features
        quality_features = claim(has_keyword(['completeness', 'quality', 'correlation']))

        # Remaining numerical features
        numerical_features = [
            col for col in columns
            if col not in taken and pd.api.types.is_numeric_dtype(df[col])
        ]

        return {
            'id_features': id_features,
            'timestamp_features': timestamp_features,
            'binary_features': binary_features,
            'categorical_features': categorical_features,
            'crypto_specific_features': crypto_specific_features,
            'price_volume_features': price_volume_features,
            'exchange_features': exchange_features,
            'performance_features': performance_features,
            'rank_diff_features': rank_diff_features,
            'technical_features': technical_features,
            'social_features': social_features,
            'transaction_features': transaction_features,
            'quality_features': quality_features,
            'numerical_features': numerical_features
        }

    def _engineer_crypto_features(self, df, normalized_df):
        """
        Create crypto-specific engineered features.

        Reads raw columns from ``df`` and adds derived columns to
        ``normalized_df`` (which is also returned). Does nothing when
        ``self.feature_engineering`` is False.
        """
        if not self.feature_engineering:
            return normalized_df

        infinities = [np.inf, -np.inf]

        # Exchange price spread analysis
        exchange_cols = [col for col in df.columns if col.startswith('exchangePrices.')]
        if len(exchange_cols) > 1:
            exchange_prices = df[exchange_cols].replace(infinities, np.nan)
            if not exchange_prices.empty and exchange_prices.notna().any().any():
                price_mean = exchange_prices.mean(axis=1)
                price_max = exchange_prices.max(axis=1)
                price_min = exchange_prices.min(axis=1)
                price_std = exchange_prices.std(axis=1)

                # Only calculate if at least one row has a usable mean price.
                # Rows with a zero mean may still yield inf here; transform()
                # scrubs those in its final cleanup.
                valid_mask = (price_mean > 0) & price_mean.notna()
                if valid_mask.any():
                    normalized_df['exchange_price_spread'] = (
                        (price_max - price_min) / price_mean
                    ).fillna(0)
                    normalized_df['exchange_price_std'] = (price_std / price_mean).fillna(0)

        # Performance momentum
        perf_cols = [col for col in df.columns if col.startswith('performance.')]
        perf_short_cols = [
            col for col in perf_cols
            if any(timeframe in col for timeframe in ['min1', 'min5', 'min15', 'hour'])
        ]
        perf_long_cols = [
            col for col in perf_cols
            if any(timeframe in col for timeframe in ['day', 'week', 'month'])
        ]

        if perf_short_cols:
            short_perf = df[perf_short_cols].replace(infinities, np.nan)
            normalized_df['short_term_momentum'] = short_perf.mean(axis=1).fillna(0)

        if perf_long_cols:
            long_perf = df[perf_long_cols].replace(infinities, np.nan)
            normalized_df['long_term_momentum'] = long_perf.mean(axis=1).fillna(0)

        # Rank stability
        rank_diff_cols = [col for col in df.columns if col.startswith('rankDiffs.')]
        if rank_diff_cols:
            rank_diffs = df[rank_diff_cols].replace(infinities, np.nan).fillna(0)
            # Small epsilon keeps the denominator strictly positive.
            normalized_df['rank_stability'] = 1 / (1 + rank_diffs.abs().sum(axis=1) + 1e-8)

        # Social sentiment aggregation
        social_sentiment_cols = [
            col for col in df.columns if 'social_sentiment' in col.lower()
        ]
        if social_sentiment_cols:
            social_data = df[social_sentiment_cols].replace(infinities, np.nan)
            # 0.5 is used as the neutral sentiment default.
            normalized_df['avg_social_sentiment'] = social_data.mean(axis=1).fillna(0.5)

        # Technical strength (similar to stocks but crypto-focused)
        tech_cols = [
            col for col in df.columns
            if any(tech in col.lower() for tech in ['rsi', 'macd', 'cci'])
        ]
        if tech_cols:
            tech_data = df[tech_cols].replace(infinities, np.nan)
            normalized_df['technical_strength'] = tech_data.mean(axis=1).fillna(0)

        # Volume-price relationship
        if 'volume' in df.columns and 'price' in df.columns:
            volume = df['volume'].replace(infinities, np.nan)
            price = df['price'].replace(infinities, np.nan)
            valid_mask = (price > 0) & price.notna() & volume.notna()
            if valid_mask.any():
                # Zero prices produce inf here; transform() scrubs them later.
                normalized_df['volume_price_ratio'] = (volume / price).fillna(0)

        # Market dominance relative to rank
        if 'dominance' in df.columns and 'rank' in df.columns:
            dominance = df['dominance'].replace(infinities, np.nan).fillna(0)
            # Unknown ranks are treated as a high (bad) rank of 1000.
            rank = df['rank'].replace(infinities, np.nan).fillna(1000)
            # NOTE(review): dividing by the reciprocal of rank is the same as
            # multiplying dominance by rank - confirm this is the intended
            # "ratio". Left unchanged to keep the feature values stable.
            rank_reciprocal = 1 / (rank + 1e-8)
            normalized_df['dominance_rank_ratio'] = (dominance / rank_reciprocal).fillna(0)

        return normalized_df

    @staticmethod
    def _build_scalers():
        """Return a fresh, unfitted scaler for every scaled feature type."""
        return {
            'crypto_specific_features': RobustScaler(),   # Rank and dominance can have outliers
            'price_volume_features': RobustScaler(),      # Price and volume data often has outliers
            'exchange_features': StandardScaler(),        # Exchange prices should be similar
            'performance_features': StandardScaler(),     # Performance percentages
            'rank_diff_features': StandardScaler(),       # Rank differences are usually small integers
            'technical_features': StandardScaler(),       # Technical indicators are usually normalized
            'social_features': StandardScaler(),          # Sentiment scores
            'transaction_features': PowerTransformer(),   # Transaction data can be very skewed
            'quality_features': MinMaxScaler(),           # Quality scores are usually 0-1
            'numerical_features': PowerTransformer()      # General numerical features
        }

    def fit(self, df):
        """
        Fit the normalizer on training data with crypto-specific preprocessing.

        Any state from a previous fit (scalers, encoders, cached outlier
        bounds) is discarded first so that re-fitting never reuses bounds
        computed on an older dataset.

        Args:
            df: training DataFrame, or a single record as a dict.

        Returns:
            self, to allow chaining.
        """
        if isinstance(df, dict):
            df = pd.DataFrame([df])

        # Start from a clean slate; _handle_outliers caches bounds per column.
        self.scalers = {}
        self.encoders = {}
        self.outlier_bounds = {}
        self.is_fitted = False

        self.feature_info = self._categorize_features(df)
        infinities = [np.inf, -np.inf]

        for feature_type, scaler in self._build_scalers().items():
            existing_features = [
                col for col in self.feature_info[feature_type] if col in df.columns
            ]
            if not existing_features:
                continue

            # Work on a copy of just the relevant columns; df stays untouched.
            df_clean = df[existing_features].copy()
            if self.handle_outliers and feature_type in self._OUTLIER_CLIPPED_TYPES:
                for col in existing_features:
                    df_clean[col] = self._handle_outliers(df_clean, col)

            # Best effort: a feature type whose scaler cannot be fitted is
            # reported and skipped rather than aborting the whole fit.
            try:
                # Replace inf/-inf with NaN
                df_clean[existing_features] = df_clean[existing_features].replace(
                    infinities, np.nan
                )

                # Fill NaN with a strategy that depends on the feature type
                if feature_type in self._OUTLIER_CLIPPED_TYPES:
                    # Forward fill, then the column median, then 0.
                    # Series.ffill() replaces the deprecated
                    # fillna(method='ffill').
                    for col in existing_features:
                        median = df_clean[col].median()
                        df_clean[col] = df_clean[col].ffill().fillna(median).fillna(0)
                elif feature_type in self._ZERO_FILLED_TYPES:
                    # Performance and rank diffs can be 0 when no change
                    df_clean[existing_features] = df_clean[existing_features].fillna(0)
                elif feature_type == 'quality_features':
                    # Quality features default to the middle of the range
                    df_clean[existing_features] = df_clean[existing_features].fillna(0.5)
                else:
                    # General strategy: median then 0
                    for col in existing_features:
                        median = df_clean[col].median()
                        df_clean[col] = df_clean[col].fillna(median).fillna(0)

                # Ensure no infinite values remain
                df_clean[existing_features] = df_clean[existing_features].replace(
                    infinities, 0
                )

                scaler.fit(df_clean[existing_features])
                self.scalers[feature_type] = scaler
                self.feature_info[f'{feature_type}_existing'] = existing_features
            except Exception as e:
                print(f"Warning: Could not fit scaler for {feature_type}: {e}")
                continue

        # Fit encoders for categorical features
        for col in self.feature_info['categorical_features']:
            if col in df.columns:
                self.encoders[col] = LabelEncoder()
                self.encoders[col].fit(self._as_category_strings(df[col]))

        self.is_fitted = True
        return self

    def transform(self, data):
        """
        Transform data using fitted normalizers with crypto-specific handling.

        Args:
            data: DataFrame, or a single record as a dict.

        Returns:
            A new DataFrame (same index as the input) holding, in order: the
            preserved symbol, timestamp-derived features, binary features,
            encoded categoricals, scaled features and engineered features.

        Raises:
            ValueError: if the normalizer has not been fitted.
        """
        if not self.is_fitted:
            raise ValueError("Normalizer must be fitted before transform")

        if isinstance(data, dict):
            df = pd.DataFrame([data])
        else:
            df = data.copy()

        normalized_df = pd.DataFrame(index=df.index)
        infinities = [np.inf, -np.inf]

        # 1. Preserve symbol if requested
        if self.preserve_symbol and 'symbol' in df.columns:
            normalized_df['symbol'] = df['symbol']

        # 2. Enhanced timestamp features
        for col in self.feature_info['timestamp_features']:
            if col not in df.columns:
                continue
            # Timestamps are interpreted as epoch milliseconds.
            ts = pd.to_datetime(df[col], unit='ms', errors='coerce')
            hour = ts.dt.hour
            day_of_week = ts.dt.dayofweek

            normalized_df[f'{col}_hour'] = hour / 23.0
            normalized_df[f'{col}_day_of_week'] = day_of_week / 6.0
            normalized_df[f'{col}_month'] = (ts.dt.month - 1) / 11.0
            normalized_df[f'{col}_quarter'] = (ts.dt.quarter - 1) / 3.0
            normalized_df[f'{col}_is_weekend'] = (day_of_week >= 5).astype(int)

            # Trading-session flags. NOTE(review): the windows overlap at
            # hours 8 and 16 (both ends inclusive) - confirm that is intended.
            normalized_df[f'{col}_is_asian_hours'] = ((hour >= 0) & (hour <= 8)).astype(int)
            normalized_df[f'{col}_is_european_hours'] = ((hour >= 8) & (hour <= 16)).astype(int)
            normalized_df[f'{col}_is_american_hours'] = ((hour >= 16) & (hour <= 24)).astype(int)

        # 3. Binary features (keep as is, fill NaN with 0)
        for col in self.feature_info['binary_features']:
            if col in df.columns:
                normalized_df[col] = df[col].fillna(0).astype(int)

        # 4. Categorical features
        for col in self.feature_info['categorical_features']:
            if col not in df.columns or col not in self.encoders:
                continue
            try:
                # One dict lookup per value instead of one
                # LabelEncoder.transform call per row. A label's code is its
                # position in the encoder's sorted classes_.
                code_by_label = {
                    str(label): code
                    for code, label in enumerate(self.encoders[col].classes_)
                }
                labels = self._as_category_strings(df[col])
                # Labels unseen during fit fall back to code 0, i.e. the
                # first class in sorted order (not the most frequent one).
                codes = labels.map(code_by_label).fillna(0).astype('int64')
                normalized_df[f'{col}_encoded'] = codes.to_numpy()
            except Exception as e:
                print(f"Warning: Could not encode {col}: {e}")
                normalized_df[f'{col}_encoded'] = 0

        # 5. Scale different feature types with appropriate scalers
        for feature_type in self._SCALED_FEATURE_TYPES:
            if feature_type not in self.scalers:
                continue

            existing_features = self.feature_info.get(f'{feature_type}_existing', [])
            available_features = [col for col in existing_features if col in df.columns]
            if not available_features:
                continue

            try:
                df_clean = df[available_features].copy()

                # Clip with the bounds learned during fit, if enabled
                if self.handle_outliers and feature_type in self._OUTLIER_CLIPPED_TYPES:
                    for col in available_features:
                        if col in self.outlier_bounds:
                            lower_bound, upper_bound = self.outlier_bounds[col]
                            df_clean[col] = df_clean[col].clip(lower_bound, upper_bound)

                # Replace inf/-inf with NaN
                df_clean[available_features] = df_clean[available_features].replace(
                    infinities, np.nan
                )

                # Fill NaN with a strategy that depends on the feature type
                if feature_type in self._OUTLIER_CLIPPED_TYPES:
                    # Forward fill, backward fill, then 0. ffill()/bfill()
                    # replace the deprecated fillna(method=...).
                    for col in available_features:
                        df_clean[col] = df_clean[col].ffill().bfill().fillna(0)
                elif feature_type == 'quality_features':
                    # Quality features default to the middle of the range
                    df_clean[available_features] = df_clean[available_features].fillna(0.5)
                else:
                    # Performance/rank diffs (0 = no change) and the general
                    # case (training medians are not stored) both use 0.
                    df_clean[available_features] = df_clean[available_features].fillna(0)

                # Ensure no infinite values remain
                df_clean[available_features] = df_clean[available_features].replace(
                    infinities, 0
                )

                scaled_data = self.scalers[feature_type].transform(df_clean[available_features])

                # Column suffix derived from the scaler class, e.g.
                # RobustScaler -> 'robust', PowerTransformer -> 'power'.
                scaler_name = (
                    type(self.scalers[feature_type]).__name__.lower()
                    .replace('scaler', '')
                    .replace('transformer', '')
                )
                for i, col in enumerate(available_features):
                    normalized_df[f'{col}_{scaler_name}_scaled'] = scaled_data[:, i]

            except Exception as e:
                print(f"Warning: Could not transform {feature_type}: {e}")
                # Fall back to the raw values with minimal cleaning
                for col in available_features:
                    clean_col = df[col].replace(infinities, np.nan).fillna(0)
                    normalized_df[f'{col}_raw'] = clean_col

        # 6. Crypto-specific feature engineering
        normalized_df = self._engineer_crypto_features(df, normalized_df)

        # 7. Final cleanup: turn any inf created above into NaN, then fill
        normalized_df = normalized_df.replace(infinities, np.nan)

        for col in normalized_df.columns:
            if col == 'symbol' or not normalized_df[col].isna().any():
                continue  # Never fill the symbol column here
            # Sentiment columns default to neutral (0.5), everything else to 0
            fill_value = 0.5 if 'sentiment' in col.lower() else 0
            normalized_df[col] = normalized_df[col].fillna(fill_value)

        # Final validation with explicit checks (assert statements would be
        # stripped under ``python -O``).
        problem = None
        if normalized_df.isnull().any().any():
            problem = "Still contains NaN values after cleanup"
        elif np.isinf(normalized_df.select_dtypes(include=[np.number])).any().any():
            problem = "Still contains infinite values after cleanup"

        if problem is not None:
            print(f"Warning: {problem}")
            # Emergency cleanup
            normalized_df = normalized_df.fillna(0).replace(infinities, 0)

        return normalized_df

    def fit_transform(self, data):
        """Fit and transform in one step."""
        return self.fit(data).transform(data)

    def get_feature_importance_info(self):
        """
        Return information about feature categories for model interpretation.

        Returns:
            dict with the category -> columns mapping, the scaler class name
            used per feature type, and the total number of categorized
            columns. The ``*_existing`` bookkeeping lists written by fit()
            repeat columns already listed under their category, so they are
            left out of the total.
        """
        return {
            'feature_categories': self.feature_info,
            'scalers_used': {k: type(v).__name__ for k, v in self.scalers.items()},
            'total_features': sum(
                len(features)
                for name, features in self.feature_info.items()
                if isinstance(features, list) and not name.endswith('_existing')
            )
        }

    def save(self, filepath):
        """Save the fitted normalizer state as a pickle file at ``filepath``."""
        state = {
            'scalers': self.scalers,
            'encoders': self.encoders,
            'feature_info': self.feature_info,
            'is_fitted': self.is_fitted,
            'preserve_symbol': self.preserve_symbol,
            'handle_outliers': self.handle_outliers,
            'feature_engineering': self.feature_engineering,
            'outlier_bounds': self.outlier_bounds
        }
        with open(filepath, 'wb') as f:
            pickle.dump(state, f)

    def load(self, filepath):
        """
        Load a fitted normalizer saved with ``save`` and return self.

        SECURITY: unpickling executes arbitrary code - only load files from
        a trusted source.
        """
        with open(filepath, 'rb') as f:
            data = pickle.load(f)

        self.scalers = data['scalers']
        self.encoders = data['encoders']
        self.feature_info = data['feature_info']
        self.is_fitted = data['is_fitted']
        # Options fall back to their defaults when the file lacks them.
        self.preserve_symbol = data.get('preserve_symbol', True)
        self.handle_outliers = data.get('handle_outliers', True)
        self.feature_engineering = data.get('feature_engineering', True)
        self.outlier_bounds = data.get('outlier_bounds', {})
        return self


def normalize_crypto_data_file(input_file, output_file, save_normalizer=True, **kwargs):
    """
    Enhanced normalization function for crypto data.

    Args:
        input_file: path to a '.parquet' file; any other path is read as
            JSON Lines (one JSON object per line, blank lines ignored).
        output_file: destination path; a '.csv' suffix is rewritten to
            '.pkl' because the result is stored as a pickle.
        save_normalizer: also pickle the fitted normalizer next to the data
            as '<output stem>_crypto_normalizer.pkl'.
        **kwargs: forwarded to ``CryptoDataNormalizer``.

    Returns:
        (normalized DataFrame, fitted CryptoDataNormalizer)
    """
    # Load data
    if input_file.endswith('.parquet'):
        df = pd.read_parquet(input_file)
        print(f"Loaded {len(df)} records with {len(df.columns)} features from parquet")
    else:
        with open(input_file, 'r', encoding='utf-8') as f:
            # Skip blank lines (e.g. a trailing newline) instead of crashing
            records = [json.loads(line.strip()) for line in f if line.strip()]
        df = pd.DataFrame(records)
        print(f"Loaded {len(df)} records with {len(df.columns)} features from jsonl")

    # Initialize crypto normalizer
    normalizer = CryptoDataNormalizer(**kwargs)

    # Show feature categorization
    feature_info = normalizer._categorize_features(df)
    print("\nCrypto Feature Categorization:")
    for category, features in feature_info.items():
        if features:
            print(f"  {category}: {len(features)} features")

    # Fit and transform
    normalized_df = normalizer.fit_transform(df)

    print(f"\nNormalized to {len(normalized_df.columns)} features")
    print(f"Data shape: {normalized_df.shape}")

    # Show feature importance info
    importance_info = normalizer.get_feature_importance_info()
    print(f"\nScalers used: {importance_info['scalers_used']}")

    # Save normalized data as pickle instead of CSV
    pkl_output_file = output_file.replace('.csv', '.pkl')

    # Ensure output directory exists
    output_dir = os.path.dirname(pkl_output_file)
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)

    normalized_df.to_pickle(pkl_output_file)
    print(f"Saved normalized data to {pkl_output_file}")

    # Save normalizer
    if save_normalizer:
        # Derive the name from the data file's stem. A plain
        # replace('.csv', ...) leaves a '.pkl' output path unchanged, which
        # made the normalizer overwrite the data file just written.
        output_stem, _ = os.path.splitext(pkl_output_file)
        normalizer_file = f"{output_stem}_crypto_normalizer.pkl"
        normalizer.save(normalizer_file)
        print(f"Saved normalizer to {normalizer_file}")

    return normalized_df, normalizer


# CLI function
def main():
    """Parse command-line options and run the normalization."""
    parser = argparse.ArgumentParser(
        description="Enhanced normalization for cryptocurrency features with crypto-specific handling"
    )
    parser.add_argument('input', nargs='?', default='data/merged/features/crypto_features.parquet',
                        help='Input file (.parquet or .jsonl)')
    parser.add_argument('output', nargs='?', default='data/merged/features/norm/crypto_features_normalized.pkl',
                        help='Output PKL file for normalized features')
    parser.add_argument('--no-save-normalizer', action='store_true',
                        help='Do not save the normalizer pickle')
    parser.add_argument('--no-preserve-symbol', action='store_true',
                        help='Do not preserve symbol column')
    parser.add_argument('--no-handle-outliers', action='store_true',
                        help='Do not handle outliers')
    parser.add_argument('--no-feature-engineering', action='store_true',
                        help='Do not create engineered features')
    parser.add_argument('--train', action='store_true',
                        help='Normalize the train file and save under train/norm/')

    args = parser.parse_args()

    kwargs = {
        'preserve_symbol': not args.no_preserve_symbol,
        'handle_outliers': not args.no_handle_outliers,
        'feature_engineering': not args.no_feature_engineering
    }

    if args.train:
        # --train ignores the positional input/output and uses fixed paths
        train_input = 'data/merged/train/crypto_features_train.parquet'
        train_norm_dir = 'data/merged/train/norm'
        os.makedirs(train_norm_dir, exist_ok=True)
        train_output = os.path.join(train_norm_dir, 'crypto_features_train_normalized.pkl')
        print(f"[INFO] Normalizing train file: {train_input} -> {train_output}")
        normalize_crypto_data_file(
            train_input,
            train_output,
            save_normalizer=not args.no_save_normalizer,
            **kwargs
        )
    else:
        print(f"[INFO] Enhanced crypto normalizing: {args.input} -> {args.output}")
        print(f"[INFO] Options: {kwargs}")
        normalize_crypto_data_file(
            args.input,
            args.output,
            save_normalizer=not args.no_save_normalizer,
            **kwargs
        )


if __name__ == "__main__":
    main()