import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler, MinMaxScaler, RobustScaler, LabelEncoder, PowerTransformer
import json
import pickle
from datetime import datetime
import warnings
warnings.filterwarnings('ignore')
class ImprovedStockDataNormalizer:
"""
    Enhanced normalization pipeline for stock feature data with improved handling of different feature types
"""
def __init__(self, preserve_symbol=True, handle_outliers=True, feature_engineering=True):
self.scalers = {}
self.encoders = {}
self.feature_info = {}
self.is_fitted = False
self.preserve_symbol = preserve_symbol
self.handle_outliers = handle_outliers
self.feature_engineering = feature_engineering
self.outlier_bounds = {}
def _detect_outliers(self, df, column):
"""Detect outliers using IQR method"""
Q1 = df[column].quantile(0.25)
Q3 = df[column].quantile(0.75)
IQR = Q3 - Q1
lower_bound = Q1 - 1.5 * IQR
upper_bound = Q3 + 1.5 * IQR
return lower_bound, upper_bound
def _handle_outliers(self, df, column, method='clip'):
"""Handle outliers in numerical data"""
if column not in self.outlier_bounds:
lower_bound, upper_bound = self._detect_outliers(df, column)
self.outlier_bounds[column] = (lower_bound, upper_bound)
else:
lower_bound, upper_bound = self.outlier_bounds[column]
if method == 'clip':
return df[column].clip(lower_bound, upper_bound)
        elif method == 'remove':
            # Mask out-of-bounds values as NaN (rows are kept; downstream fill logic handles them)
            return df[column].where((df[column] >= lower_bound) & (df[column] <= upper_bound))
return df[column]
def _categorize_features(self, df):
"""Enhanced feature categorization with better detection"""
# Core identification features
id_features = ['symbol', 'backup_id', '__index_level_0__']
# Timestamp features
timestamp_features = [col for col in df.columns if 'timestamp' in col.lower()]
# Binary features (0/1, True/False, or boolean-like)
binary_features = []
for col in df.columns:
if col not in id_features + timestamp_features:
# Skip columns with array-like values (unhashable)
try:
vals = df[col].dropna().unique()
# If any value is a list/array, skip this column
if any(isinstance(v, (list, np.ndarray)) for v in vals):
continue
unique_vals = set(vals)
except TypeError:
continue
if (df[col].dtype == bool or
(len(unique_vals) <= 2 and unique_vals.issubset({0, 1, True, False, np.nan})) or
col.startswith('is_')):
binary_features.append(col)
# Categorical features (strings, objects, or low cardinality integers)
categorical_features = []
for col in df.columns:
if (col not in id_features + binary_features + timestamp_features and
(df[col].dtype == 'object' or
df[col].dtype.name == 'category' or
(df[col].nunique() < 20 and df[col].dtype in ['int64', 'int32']))):
categorical_features.append(col)
# Price/volume features (need special handling)
price_volume_features = []
for col in df.columns:
if any(keyword in col.lower() for keyword in ['price', 'volume', 'vwap', 'market', 'cap']):
if col not in id_features + timestamp_features + binary_features + categorical_features:
price_volume_features.append(col)
# Technical indicator features
technical_features = []
tech_keywords = ['rsi', 'macd', 'ema', 'sma', 'bb_', 'cci', 'mfi', 'atr', 'stoch', 'roc']
for col in df.columns:
if any(keyword in col.lower() for keyword in tech_keywords):
if col not in id_features + timestamp_features + binary_features + categorical_features + price_volume_features:
technical_features.append(col)
# News/sentiment features
news_features = []
for col in df.columns:
if any(keyword in col.lower() for keyword in ['news', 'sentiment', 'pos', 'neg', 'neu']):
if col not in id_features + timestamp_features + binary_features + categorical_features + price_volume_features + technical_features:
news_features.append(col)
# Count/ratio features
count_features = []
for col in df.columns:
if any(keyword in col.lower() for keyword in ['count', 'size', 'ratio', 'change']):
if col not in id_features + timestamp_features + binary_features + categorical_features + price_volume_features + technical_features + news_features:
count_features.append(col)
# Remaining numerical features
numerical_features = []
all_categorized = (id_features + timestamp_features + binary_features +
categorical_features + price_volume_features +
technical_features + news_features + count_features)
for col in df.columns:
if (col not in all_categorized and
pd.api.types.is_numeric_dtype(df[col])):
numerical_features.append(col)
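        # Examples (hypothetical column names): 'trade_volume' -> price_volume_features,
        # 'rsi_14' -> technical_features, 'news_sentiment_mean' -> news_features,
        # 'is_etf' -> binary_features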
return {
'id_features': id_features,
'timestamp_features': timestamp_features,
'binary_features': binary_features,
'categorical_features': categorical_features,
'price_volume_features': price_volume_features,
'technical_features': technical_features,
'news_features': news_features,
'count_features': count_features,
'numerical_features': numerical_features
}
def _engineer_features(self, df, normalized_df):
"""Create additional engineered features"""
if not self.feature_engineering:
return normalized_df
# Price momentum features
if 'close' in df.columns and 'prev_close' in df.columns:
close = df['close'].replace([np.inf, -np.inf], np.nan)
prev_close = df['prev_close'].replace([np.inf, -np.inf], np.nan)
valid_mask = (prev_close > 0) & prev_close.notna() & close.notna()
if valid_mask.any():
                # Apply the validity mask so zero/negative or missing prev_close falls back to 0
                momentum = ((close - prev_close) / prev_close).where(valid_mask)
                normalized_df['price_momentum'] = momentum.fillna(0)
# Volume-price relationship
if 'volume' in df.columns and 'close' in df.columns:
volume = df['volume'].replace([np.inf, -np.inf], np.nan)
close = df['close'].replace([np.inf, -np.inf], np.nan)
valid_mask = (close > 0) & close.notna() & volume.notna()
if valid_mask.any():
                # Apply the validity mask so zero/negative or missing close falls back to 0
                ratio = (volume / close).where(valid_mask)
                normalized_df['volume_price_ratio'] = ratio.fillna(0)
# Volatility features
if 'high' in df.columns and 'low' in df.columns and 'close' in df.columns:
high = df['high'].replace([np.inf, -np.inf], np.nan)
low = df['low'].replace([np.inf, -np.inf], np.nan)
close = df['close'].replace([np.inf, -np.inf], np.nan)
valid_mask = (close > 0) & close.notna() & high.notna() & low.notna()
if valid_mask.any():
                # Apply the validity mask so zero/negative or missing close falls back to 0
                daily_range = ((high - low) / close).where(valid_mask)
                normalized_df['daily_range'] = daily_range.fillna(0)
# News sentiment aggregation
sentiment_cols = [col for col in df.columns if 'sentiment' in col.lower() and 'mean' in col.lower()]
if sentiment_cols:
sentiment_data = df[sentiment_cols].replace([np.inf, -np.inf], np.nan)
normalized_df['avg_sentiment'] = sentiment_data.mean(axis=1).fillna(0.5) # Neutral sentiment
# Technical indicator strength
tech_cols = [col for col in df.columns if any(tech in col.lower() for tech in ['rsi', 'macd', 'cci'])]
if tech_cols:
tech_data = df[tech_cols].replace([np.inf, -np.inf], np.nan)
normalized_df['technical_strength'] = tech_data.mean(axis=1).fillna(0)
return normalized_df
def fit(self, df):
"""Fit the normalizer on training data with enhanced preprocessing"""
if isinstance(df, dict):
df = pd.DataFrame([df])
self.feature_info = self._categorize_features(df)
# Fit scalers for different feature types
feature_types = ['price_volume_features', 'technical_features', 'news_features',
'count_features', 'numerical_features']
for feature_type in feature_types:
features = self.feature_info[feature_type]
if features:
# Filter existing columns
existing_features = [col for col in features if col in df.columns]
if existing_features:
# Choose appropriate scaler based on feature type
if feature_type == 'price_volume_features':
scaler = RobustScaler() # Robust to outliers
elif feature_type == 'technical_features':
                        scaler = StandardScaler() # Technical indicators are typically on comparable, bounded scales
elif feature_type in ['count_features', 'numerical_features']:
scaler = PowerTransformer(method='yeo-johnson') # Handle skewed distributions
else:
scaler = StandardScaler()
try:
# Handle outliers if enabled
if self.handle_outliers:
df_clean = df.copy()
for col in existing_features:
df_clean[col] = self._handle_outliers(df_clean, col)
else:
df_clean = df.copy()
# Comprehensive data cleaning for fitting
# Replace inf/-inf with NaN
df_clean[existing_features] = df_clean[existing_features].replace([np.inf, -np.inf], np.nan)
# Fill NaN with appropriate strategy based on feature type
if feature_type == 'price_volume_features':
# For price/volume data, use forward fill then median
for col in existing_features:
                                df_clean[col] = df_clean[col].ffill().fillna(df_clean[col].median()).fillna(0)
elif feature_type == 'technical_features':
# Technical indicators: use median for each column
for col in existing_features:
median_val = df_clean[col].median()
df_clean[col] = df_clean[col].fillna(median_val if not pd.isna(median_val) else 0)
elif feature_type == 'news_features':
# News features: neutral values
for col in existing_features:
if 'sentiment' in col.lower():
df_clean[col] = df_clean[col].fillna(0.5) # Neutral sentiment
elif 'count' in col.lower():
df_clean[col] = df_clean[col].fillna(0) # No news
else:
df_clean[col] = df_clean[col].fillna(df_clean[col].median()).fillna(0)
else:
# General strategy: median then 0
for col in existing_features:
df_clean[col] = df_clean[col].fillna(df_clean[col].median()).fillna(0)
# Ensure no infinite values remain
df_clean[existing_features] = df_clean[existing_features].replace([np.inf, -np.inf], 0)
# Fit the scaler
scaler.fit(df_clean[existing_features])
self.scalers[feature_type] = scaler
self.feature_info[f'{feature_type}_existing'] = existing_features
except Exception as e:
print(f"Warning: Could not fit scaler for {feature_type}: {e}")
# Skip this feature type if fitting fails
continue
# Fit encoders for categorical features
for col in self.feature_info['categorical_features']:
if col in df.columns:
self.encoders[col] = LabelEncoder()
                self.encoders[col].fit(df[col].fillna('unknown').astype(str))
self.is_fitted = True
return self
def transform(self, data):
"""Transform data using fitted normalizers with enhanced feature handling"""
if not self.is_fitted:
raise ValueError("Normalizer must be fitted before transform")
if isinstance(data, dict):
df = pd.DataFrame([data])
else:
df = data.copy()
normalized_df = pd.DataFrame(index=df.index)
# 1. Preserve symbol if requested
if self.preserve_symbol and 'symbol' in df.columns:
normalized_df['symbol'] = df['symbol']
# 2. Enhanced timestamp features
for col in self.feature_info['timestamp_features']:
if col in df.columns:
ts = pd.to_datetime(df[col], unit='ms', errors='coerce')
# More comprehensive time features
normalized_df[f'{col}_hour'] = ts.dt.hour / 23.0
normalized_df[f'{col}_day_of_week'] = ts.dt.dayofweek / 6.0
normalized_df[f'{col}_month'] = (ts.dt.month - 1) / 11.0
normalized_df[f'{col}_quarter'] = (ts.dt.quarter - 1) / 3.0
normalized_df[f'{col}_is_weekend'] = (ts.dt.dayofweek >= 5).astype(int)
normalized_df[f'{col}_is_market_hours'] = ((ts.dt.hour >= 9) & (ts.dt.hour <= 16) & (ts.dt.dayofweek < 5)).astype(int)
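                # Worked example (illustrative): 2023-11-14 15:30 (a Tuesday) maps to
                # hour 15/23 ≈ 0.652, day_of_week 1/6 ≈ 0.167, month 10/11 ≈ 0.909,
                # quarter 3/3 = 1.0, is_weekend 0, is_market_hours 1.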
# 3. Binary features (keep as is, fill NaN with 0)
for col in self.feature_info['binary_features']:
if col in df.columns:
normalized_df[col] = df[col].fillna(0).astype(int)
# 4. Categorical features with better encoding
for col in self.feature_info['categorical_features']:
if col in df.columns and col in self.encoders:
try:
# Handle unknown categories
                    values = df[col].fillna('unknown').astype(str)
encoded_values = []
for val in values:
try:
encoded_values.append(self.encoders[col].transform([val])[0])
except ValueError:
                            # Unknown category: fall back to encoder class index 0
                            encoded_values.append(0)
normalized_df[f'{col}_encoded'] = encoded_values
except Exception:
normalized_df[f'{col}_encoded'] = 0
# 5. Scale different feature types with appropriate scalers
feature_types = ['price_volume_features', 'technical_features', 'news_features',
'count_features', 'numerical_features']
for feature_type in feature_types:
if feature_type in self.scalers:
existing_features = self.feature_info.get(f'{feature_type}_existing', [])
available_features = [col for col in existing_features if col in df.columns]
if available_features:
try:
# Handle outliers if enabled
if self.handle_outliers:
df_clean = df.copy()
for col in available_features:
if col in self.outlier_bounds:
lower_bound, upper_bound = self.outlier_bounds[col]
df_clean[col] = df_clean[col].clip(lower_bound, upper_bound)
else:
df_clean = df.copy()
# Comprehensive data cleaning for transform
# Replace inf/-inf with NaN
df_clean[available_features] = df_clean[available_features].replace([np.inf, -np.inf], np.nan)
# Fill NaN with appropriate strategy based on feature type
if feature_type == 'price_volume_features':
# For price/volume data, use forward fill then back fill
for col in available_features:
                                df_clean[col] = df_clean[col].ffill().bfill().fillna(0)
elif feature_type == 'technical_features':
# Technical indicators: use neutral values
for col in available_features:
if 'rsi' in col.lower():
df_clean[col] = df_clean[col].fillna(50) # Neutral RSI
elif any(indicator in col.lower() for indicator in ['macd', 'cci']):
df_clean[col] = df_clean[col].fillna(0) # Neutral MACD/CCI
else:
df_clean[col] = df_clean[col].fillna(0)
elif feature_type == 'news_features':
# News features: neutral values
for col in available_features:
if 'sentiment' in col.lower():
df_clean[col] = df_clean[col].fillna(0.5) # Neutral sentiment
elif 'count' in col.lower():
df_clean[col] = df_clean[col].fillna(0) # No news
else:
df_clean[col] = df_clean[col].fillna(0)
else:
# General strategy: 0 (since we don't have training medians in transform)
df_clean[available_features] = df_clean[available_features].fillna(0)
# Ensure no infinite values remain
df_clean[available_features] = df_clean[available_features].replace([np.inf, -np.inf], 0)
# Transform the data
scaled_data = self.scalers[feature_type].transform(df_clean[available_features])
# Add scaled features with descriptive names
scaler_name = type(self.scalers[feature_type]).__name__.lower().replace('scaler', '').replace('transformer', '')
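                        # e.g. a RobustScaler-scaled 'volume' becomes 'volume_robust_scaled',
                        # and a PowerTransformer-scaled column gets the suffix '_power_scaled'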
for i, col in enumerate(available_features):
normalized_df[f'{col}_{scaler_name}_scaled'] = scaled_data[:, i]
except Exception as e:
print(f"Warning: Could not transform {feature_type}: {e}")
# If transformation fails, add original features with minimal processing
for col in available_features:
if col in df.columns:
clean_col = df[col].replace([np.inf, -np.inf], np.nan).fillna(0)
normalized_df[f'{col}_raw'] = clean_col
# 6. Feature engineering
normalized_df = self._engineer_features(df, normalized_df)
# 7. Final comprehensive cleanup of any remaining issues
# Replace any infinite values that might have been created
normalized_df = normalized_df.replace([np.inf, -np.inf], np.nan)
# Fill remaining NaN values with appropriate defaults
for col in normalized_df.columns:
if normalized_df[col].isna().any():
if col == 'symbol':
continue # Don't fill symbol
elif 'sentiment' in col.lower():
normalized_df[col] = normalized_df[col].fillna(0.5) # Neutral sentiment
elif 'ratio' in col.lower() or 'momentum' in col.lower():
normalized_df[col] = normalized_df[col].fillna(0) # No change/neutral
elif 'hour' in col or 'day_of_week' in col or 'month' in col or 'quarter' in col:
normalized_df[col] = normalized_df[col].fillna(0) # Time features
elif col.endswith('_encoded'):
normalized_df[col] = normalized_df[col].fillna(0) # Encoded categories
else:
normalized_df[col] = normalized_df[col].fillna(0) # General fallback
# Final validation - ensure no NaN or infinite values remain
try:
assert not normalized_df.isnull().any().any(), "Still contains NaN values after cleanup"
assert not np.isinf(normalized_df.select_dtypes(include=[np.number])).any().any(), "Still contains infinite values after cleanup"
except AssertionError as e:
print(f"Warning: {e}")
# Emergency cleanup
normalized_df = normalized_df.fillna(0).replace([np.inf, -np.inf], 0)
return normalized_df
def fit_transform(self, data):
"""Fit and transform in one step"""
return self.fit(data).transform(data)
def get_feature_importance_info(self):
"""Return information about feature categories for model interpretation"""
return {
'feature_categories': self.feature_info,
'scalers_used': {k: type(v).__name__ for k, v in self.scalers.items()},
            'total_features': sum(len(features) for key, features in self.feature_info.items()
                                  if isinstance(features, list) and not key.endswith('_existing'))
}
def save(self, filepath):
"""Save the fitted normalizer"""
with open(filepath, 'wb') as f:
pickle.dump({
'scalers': self.scalers,
'encoders': self.encoders,
'feature_info': self.feature_info,
'is_fitted': self.is_fitted,
'preserve_symbol': self.preserve_symbol,
'handle_outliers': self.handle_outliers,
'feature_engineering': self.feature_engineering,
'outlier_bounds': self.outlier_bounds
}, f)
def load(self, filepath):
"""Load a fitted normalizer"""
with open(filepath, 'rb') as f:
data = pickle.load(f)
self.scalers = data['scalers']
self.encoders = data['encoders']
self.feature_info = data['feature_info']
self.is_fitted = data['is_fitted']
self.preserve_symbol = data.get('preserve_symbol', True)
self.handle_outliers = data.get('handle_outliers', True)
self.feature_engineering = data.get('feature_engineering', True)
self.outlier_bounds = data.get('outlier_bounds', {})
return self
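

# --- Illustrative usage sketch (not part of the pipeline) ---
# A minimal, hedged example of fitting the normalizer on a toy DataFrame and
# inspecting the transformed output. The column names below ('fetch_timestamp',
# 'close', 'prev_close', 'volume', 'rsi_14') are hypothetical stand-ins for the
# real feature file's schema.
def _example_normalizer_usage():
    toy = pd.DataFrame({
        'symbol': ['AAPL'] * 5,
        'fetch_timestamp': [1700000000000 + i * 60_000 for i in range(5)],  # epoch milliseconds
        'close': [190.5, 191.2, 189.8, 192.4, 191.0],
        'prev_close': [188.0, 190.5, 191.2, 189.8, 192.4],
        'volume': [1.0e6, 1.2e6, 0.9e6, 1.5e6, 1.1e6],
        'rsi_14': [55.0, 58.0, 52.0, 61.0, 57.0],
    })
    normalizer = ImprovedStockDataNormalizer(preserve_symbol=True)
    normalized = normalizer.fit_transform(toy)
    print(f"Toy example produced {normalized.shape[1]} normalized columns")
    return normalized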
def cap_outliers(df, features=None, method='iqr', factor=1.5):
    """
    Cap outliers in the DataFrame for the given features using the IQR method
    (currently the only supported `method`). If features is None, all numeric
    columns are used.
    """
capped_df = df.copy()
if features is None:
features = capped_df.select_dtypes(include=[np.number]).columns
for col in features:
if col not in capped_df.columns:
continue
Q1 = capped_df[col].quantile(0.25)
Q3 = capped_df[col].quantile(0.75)
IQR = Q3 - Q1
lower = Q1 - factor * IQR
upper = Q3 + factor * IQR
capped_df[col] = np.clip(capped_df[col], lower, upper)
print(f"Capped outliers in {col}: [{lower:.3g}, {upper:.3g}]")
return capped_df
# Example usage after normalization:
# normalized_df = cap_outliers(normalized_df, features=['price_momentum', 'volume_price_ratio', 'daily_range', 'technical_strength'])
# (You can call this function in your pipeline after normalization, before saving or modeling.)
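# Minimal sketch on synthetic data (illustrative): the extreme value 100.0 is pulled
# back to the IQR-based upper bound, while well-behaved values are left untouched.
# demo = pd.DataFrame({'price_momentum': [0.01, -0.02, 0.03, 0.00, 100.0]})
# demo = cap_outliers(demo, features=['price_momentum'])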
def normalize_stock_data_file_improved(input_file, output_file, save_normalizer=True, **kwargs):
"""
Enhanced normalization function with better defaults
"""
# Load data
if input_file.endswith('.parquet'):
df = pd.read_parquet(input_file)
print(f"Loaded {len(df)} records with {len(df.columns)} features from parquet")
else:
data = []
with open(input_file, 'r') as f:
        for line in f:
            line = line.strip()
            if line:  # skip blank lines
                data.append(json.loads(line))
df = pd.DataFrame(data)
print(f"Loaded {len(df)} records with {len(df.columns)} features from jsonl")
# Initialize improved normalizer
normalizer = ImprovedStockDataNormalizer(**kwargs)
# Show feature categorization
feature_info = normalizer._categorize_features(df)
print("\nFeature Categorization:")
for category, features in feature_info.items():
if features:
print(f" {category}: {len(features)} features")
# Fit and transform
normalized_df = normalizer.fit_transform(df)
print(f"\nNormalized to {len(normalized_df.columns)} features")
print(f"Data shape: {normalized_df.shape}")
# Cap outliers in engineered features
engineered_features = ['price_momentum', 'volume_price_ratio', 'daily_range', 'technical_strength']
normalized_df = cap_outliers(normalized_df, features=[f for f in engineered_features if f in normalized_df.columns])
# Show feature importance info
importance_info = normalizer.get_feature_importance_info()
print(f"\nScalers used: {importance_info['scalers_used']}")
# Ensure output directory exists
    import os
    output_dir = os.path.dirname(output_file)
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)
# Save normalized data as pickle
pkl_output_file = output_file.replace('.csv', '.pkl')
normalized_df.to_pickle(pkl_output_file)
print(f"Saved normalized data to {pkl_output_file}")
# Save normalizer
if save_normalizer:
normalizer_file = pkl_output_file.replace('.pkl', '_improved_normalizer.pkl')
normalizer.save(normalizer_file)
print(f"Saved normalizer to {normalizer_file}")
return normalized_df, normalizer
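# Example call (hedged sketch): normalize a feature file end-to-end. The paths below
# mirror the CLI defaults in main(); adjust them to your layout.
# normalized_df, normalizer = normalize_stock_data_file_improved(
#     'data/merged/features/stocks_features.parquet',
#     'data/merged/features/norm/stocks_features_improved_normalized.pkl',
#     save_normalizer=True,
# )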
# CLI function
import argparse
def main():
parser = argparse.ArgumentParser(
description="Enhanced normalization for stock/crypto features with better handling of different feature types"
)
parser.add_argument('input', nargs='?', default='data/merged/features/stocks_features.parquet',
help='Input file (.parquet or .jsonl)')
parser.add_argument('output', nargs='?', default='data/merged/features/norm/stocks_features_improved_normalized.pkl',
help='Output pickle file for normalized features')
parser.add_argument('--no-save-normalizer', action='store_true',
help='Do not save the normalizer pickle')
parser.add_argument('--no-preserve-symbol', action='store_true',
help='Do not preserve symbol column')
parser.add_argument('--no-handle-outliers', action='store_true',
help='Do not handle outliers')
parser.add_argument('--no-feature-engineering', action='store_true',
help='Do not create engineered features')
parser.add_argument('--train', action='store_true',
help='Normalize the train file and save under train/norm/')
args = parser.parse_args()
kwargs = {
'preserve_symbol': not args.no_preserve_symbol,
'handle_outliers': not args.no_handle_outliers,
'feature_engineering': not args.no_feature_engineering
}
if args.train:
train_input = 'data/merged/train/stocks_features_train.parquet'
train_norm_dir = 'data/merged/train/norm'
import os
os.makedirs(train_norm_dir, exist_ok=True)
train_output = os.path.join(train_norm_dir, 'stocks_features_train_normalized.pkl')
print(f"[INFO] Normalizing train file: {train_input} -> {train_output}")
normalize_stock_data_file_improved(
train_input,
train_output,
save_normalizer=not args.no_save_normalizer,
**kwargs
)
else:
print(f"[INFO] Enhanced normalizing: {args.input} -> {args.output}")
print(f"[INFO] Options: {kwargs}")
normalize_stock_data_file_improved(
args.input,
args.output,
save_normalizer=not args.no_save_normalizer,
**kwargs
)
if __name__ == "__main__":
main()