import argparse
import json
import os
import pickle
import warnings

import numpy as np
import pandas as pd
from sklearn.preprocessing import (LabelEncoder, PowerTransformer,
                                   RobustScaler, StandardScaler)

warnings.filterwarnings('ignore')


class ImprovedStockDataNormalizer:
    """
    Enhanced normalization pipeline for stock feature data.

    Categorizes columns by type (IDs, timestamps, binary flags, categoricals,
    price/volume, technical indicators, news/sentiment, counts, remaining
    numericals), fits an appropriate scaler per category, optionally clips
    outliers via the IQR rule, and can add a few engineered features.
    """

    def __init__(self, preserve_symbol=True, handle_outliers=True, feature_engineering=True):
        self.scalers = {}
        self.encoders = {}
        self.feature_info = {}
        self.is_fitted = False
        self.preserve_symbol = preserve_symbol
        self.handle_outliers = handle_outliers
        self.feature_engineering = feature_engineering
        self.outlier_bounds = {}

    def _detect_outliers(self, df, column):
        """Return (lower, upper) outlier bounds for a column using the IQR rule."""
        Q1 = df[column].quantile(0.25)
        Q3 = df[column].quantile(0.75)
        IQR = Q3 - Q1
        lower_bound = Q1 - 1.5 * IQR
        upper_bound = Q3 + 1.5 * IQR
        return lower_bound, upper_bound

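    # IQR sketch (illustrative values): for the series [1, 2, 3, 4, 100],
    # Q1 = 2 and Q3 = 4, so IQR = 2 and the bounds are
    # [2 - 1.5*2, 4 + 1.5*2] = [-1, 7]; the outlier 100 would be clipped
    # to 7 by _handle_outliers(method='clip') below.
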
    def _handle_outliers(self, df, column, method='clip'):
        """Clip or mask outliers in a numerical column, caching bounds on first use."""
        if column not in self.outlier_bounds:
            lower_bound, upper_bound = self._detect_outliers(df, column)
            self.outlier_bounds[column] = (lower_bound, upper_bound)
        else:
            lower_bound, upper_bound = self.outlier_bounds[column]

        if method == 'clip':
            return df[column].clip(lower_bound, upper_bound)
        elif method == 'remove':
            # Note: 'remove' masks out-of-bounds values to NaN; it does not drop rows.
            return df[column].where((df[column] >= lower_bound) & (df[column] <= upper_bound))
        return df[column]

    def _categorize_features(self, df):
        """Categorize columns into feature groups using dtype and name heuristics."""
        # Identifier columns that should never be scaled.
        id_features = ['symbol', 'backup_id', '__index_level_0__']

        # Any column whose name mentions a timestamp.
        timestamp_features = [col for col in df.columns if 'timestamp' in col.lower()]

        # Binary flags: bools, {0, 1} columns, or 'is_*' names.
        binary_features = []
        for col in df.columns:
            if col not in id_features + timestamp_features:
                try:
                    vals = df[col].dropna().unique()
                    # Skip list/array-valued columns, whose values are unhashable.
                    if any(isinstance(v, (list, np.ndarray)) for v in vals):
                        continue
                    unique_vals = set(vals)
                except TypeError:
                    continue
                if (df[col].dtype == bool or
                        (len(unique_vals) <= 2 and unique_vals.issubset({0, 1, True, False, np.nan})) or
                        col.startswith('is_')):
                    binary_features.append(col)

        # Categoricals: object/category dtype, or low-cardinality integers.
        categorical_features = []
        for col in df.columns:
            if (col not in id_features + binary_features + timestamp_features and
                    (df[col].dtype == 'object' or
                     df[col].dtype.name == 'category' or
                     (df[col].nunique() < 20 and df[col].dtype in ['int64', 'int32']))):
                categorical_features.append(col)

        # Price/volume columns, matched by keyword.
        price_volume_features = []
        for col in df.columns:
            if any(keyword in col.lower() for keyword in ['price', 'volume', 'vwap', 'market', 'cap']):
                if col not in id_features + timestamp_features + binary_features + categorical_features:
                    price_volume_features.append(col)

        # Technical indicators, matched by common indicator names.
        technical_features = []
        tech_keywords = ['rsi', 'macd', 'ema', 'sma', 'bb_', 'cci', 'mfi', 'atr', 'stoch', 'roc']
        for col in df.columns:
            if any(keyword in col.lower() for keyword in tech_keywords):
                if col not in id_features + timestamp_features + binary_features + categorical_features + price_volume_features:
                    technical_features.append(col)

        # News and sentiment columns.
        news_features = []
        for col in df.columns:
            if any(keyword in col.lower() for keyword in ['news', 'sentiment', 'pos', 'neg', 'neu']):
                if col not in id_features + timestamp_features + binary_features + categorical_features + price_volume_features + technical_features:
                    news_features.append(col)

        # Counts, sizes, ratios, and changes.
        count_features = []
        for col in df.columns:
            if any(keyword in col.lower() for keyword in ['count', 'size', 'ratio', 'change']):
                if col not in id_features + timestamp_features + binary_features + categorical_features + price_volume_features + technical_features + news_features:
                    count_features.append(col)

        # Everything numeric that was not caught above.
        numerical_features = []
        all_categorized = (id_features + timestamp_features + binary_features +
                           categorical_features + price_volume_features +
                           technical_features + news_features + count_features)
        for col in df.columns:
            if (col not in all_categorized and
                    pd.api.types.is_numeric_dtype(df[col])):
                numerical_features.append(col)

        return {
            'id_features': id_features,
            'timestamp_features': timestamp_features,
            'binary_features': binary_features,
            'categorical_features': categorical_features,
            'price_volume_features': price_volume_features,
            'technical_features': technical_features,
            'news_features': news_features,
            'count_features': count_features,
            'numerical_features': numerical_features,
        }

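    # Categorization sketch (hypothetical columns): given
    # ['symbol', 'news_timestamp', 'is_halted', 'vwap', 'rsi_14', 'news_count', 'close'],
    # the split is timestamp=['news_timestamp'], binary=['is_halted'],
    # price_volume=['vwap'], technical=['rsi_14'], and news=['news_count']
    # (the 'news' keyword is checked before 'count'). Note that 'close'
    # matches no keyword list, so it falls through to numerical_features.
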
    def _engineer_features(self, df, normalized_df):
        """Add a small set of engineered features derived from raw columns."""
        if not self.feature_engineering:
            return normalized_df

        # Price momentum: relative change from the previous close.
        if 'close' in df.columns and 'prev_close' in df.columns:
            close = df['close'].replace([np.inf, -np.inf], np.nan)
            prev_close = df['prev_close'].replace([np.inf, -np.inf], np.nan)
            valid_mask = (prev_close > 0) & prev_close.notna() & close.notna()
            if valid_mask.any():
                momentum = (close - prev_close) / prev_close
                normalized_df['price_momentum'] = momentum.fillna(0)

        # Volume-to-price ratio.
        if 'volume' in df.columns and 'close' in df.columns:
            volume = df['volume'].replace([np.inf, -np.inf], np.nan)
            close = df['close'].replace([np.inf, -np.inf], np.nan)
            valid_mask = (close > 0) & close.notna() & volume.notna()
            if valid_mask.any():
                ratio = volume / close
                normalized_df['volume_price_ratio'] = ratio.fillna(0)

        # Daily trading range relative to the close.
        if 'high' in df.columns and 'low' in df.columns and 'close' in df.columns:
            high = df['high'].replace([np.inf, -np.inf], np.nan)
            low = df['low'].replace([np.inf, -np.inf], np.nan)
            close = df['close'].replace([np.inf, -np.inf], np.nan)
            valid_mask = (close > 0) & close.notna() & high.notna() & low.notna()
            if valid_mask.any():
                daily_range = (high - low) / close
                normalized_df['daily_range'] = daily_range.fillna(0)

        # Average sentiment across all '*sentiment*mean*' columns.
        sentiment_cols = [col for col in df.columns if 'sentiment' in col.lower() and 'mean' in col.lower()]
        if sentiment_cols:
            sentiment_data = df[sentiment_cols].replace([np.inf, -np.inf], np.nan)
            normalized_df['avg_sentiment'] = sentiment_data.mean(axis=1).fillna(0.5)

        # Aggregate technical strength across RSI/MACD/CCI columns.
        tech_cols = [col for col in df.columns if any(tech in col.lower() for tech in ['rsi', 'macd', 'cci'])]
        if tech_cols:
            tech_data = df[tech_cols].replace([np.inf, -np.inf], np.nan)
            normalized_df['technical_strength'] = tech_data.mean(axis=1).fillna(0)

        return normalized_df

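    # Worked example (illustrative values): with close=105, prev_close=100,
    # high=110, low=100, price_momentum = (105 - 100) / 100 = 0.05 and
    # daily_range = (110 - 100) / 105 ≈ 0.095.
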
    def fit(self, df):
        """Fit scalers, encoders, and outlier bounds on training data."""
        if isinstance(df, dict):
            df = pd.DataFrame([df])

        self.feature_info = self._categorize_features(df)

        feature_types = ['price_volume_features', 'technical_features', 'news_features',
                         'count_features', 'numerical_features']

        for feature_type in feature_types:
            features = self.feature_info[feature_type]
            if features:
                existing_features = [col for col in features if col in df.columns]
                if existing_features:
                    # Pick a scaler suited to the feature family.
                    if feature_type == 'price_volume_features':
                        scaler = RobustScaler()
                    elif feature_type == 'technical_features':
                        scaler = StandardScaler()
                    elif feature_type in ['count_features', 'numerical_features']:
                        scaler = PowerTransformer(method='yeo-johnson')
                    else:
                        scaler = StandardScaler()

                    try:
                        # Clip outliers (and cache the bounds for transform).
                        if self.handle_outliers:
                            df_clean = df.copy()
                            for col in existing_features:
                                df_clean[col] = self._handle_outliers(df_clean, col)
                        else:
                            df_clean = df.copy()

                        # Replace infinities before imputation.
                        df_clean[existing_features] = df_clean[existing_features].replace([np.inf, -np.inf], np.nan)

                        # Impute per feature family.
                        if feature_type == 'price_volume_features':
                            for col in existing_features:
                                df_clean[col] = df_clean[col].ffill().fillna(df_clean[col].median()).fillna(0)
                        elif feature_type == 'technical_features':
                            for col in existing_features:
                                median_val = df_clean[col].median()
                                df_clean[col] = df_clean[col].fillna(median_val if not pd.isna(median_val) else 0)
                        elif feature_type == 'news_features':
                            for col in existing_features:
                                if 'sentiment' in col.lower():
                                    df_clean[col] = df_clean[col].fillna(0.5)  # neutral sentiment
                                elif 'count' in col.lower():
                                    df_clean[col] = df_clean[col].fillna(0)
                                else:
                                    df_clean[col] = df_clean[col].fillna(df_clean[col].median()).fillna(0)
                        else:
                            for col in existing_features:
                                df_clean[col] = df_clean[col].fillna(df_clean[col].median()).fillna(0)

                        # Final safety net against any remaining infinities.
                        df_clean[existing_features] = df_clean[existing_features].replace([np.inf, -np.inf], 0)

                        scaler.fit(df_clean[existing_features])
                        self.scalers[feature_type] = scaler
                        self.feature_info[f'{feature_type}_existing'] = existing_features
                    except Exception as e:
                        print(f"Warning: Could not fit scaler for {feature_type}: {e}")
                        continue

        # Fit label encoders for categoricals. NaN must be filled *before*
        # the cast to str, otherwise it becomes the string 'nan' and the
        # fillna is a no-op.
        for col in self.feature_info['categorical_features']:
            if col in df.columns:
                self.encoders[col] = LabelEncoder()
                self.encoders[col].fit(df[col].astype(object).fillna('unknown').astype(str))

        self.is_fitted = True
        return self

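    # Scaler choice, in brief: RobustScaler for heavy-tailed price/volume
    # columns (median/IQR based, so less sensitive to spikes), StandardScaler
    # for roughly symmetric technical indicators, and
    # PowerTransformer('yeo-johnson') to de-skew count-like features.
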
    def transform(self, data):
        """Transform data using the fitted scalers, encoders, and outlier bounds."""
        if not self.is_fitted:
            raise ValueError("Normalizer must be fitted before transform")

        if isinstance(data, dict):
            df = pd.DataFrame([data])
        else:
            df = data.copy()

        normalized_df = pd.DataFrame(index=df.index)

        # Pass the symbol column through untouched if requested.
        if self.preserve_symbol and 'symbol' in df.columns:
            normalized_df['symbol'] = df['symbol']

        # Expand each timestamp column (epoch milliseconds) into normalized
        # calendar parts.
        for col in self.feature_info['timestamp_features']:
            if col in df.columns:
                ts = pd.to_datetime(df[col], unit='ms', errors='coerce')
                normalized_df[f'{col}_hour'] = ts.dt.hour / 23.0
                normalized_df[f'{col}_day_of_week'] = ts.dt.dayofweek / 6.0
                normalized_df[f'{col}_month'] = (ts.dt.month - 1) / 11.0
                normalized_df[f'{col}_quarter'] = (ts.dt.quarter - 1) / 3.0
                normalized_df[f'{col}_is_weekend'] = (ts.dt.dayofweek >= 5).astype(int)
                normalized_df[f'{col}_is_market_hours'] = ((ts.dt.hour >= 9) & (ts.dt.hour <= 16) & (ts.dt.dayofweek < 5)).astype(int)

        # Binary flags pass through as 0/1.
        for col in self.feature_info['binary_features']:
            if col in df.columns:
                normalized_df[col] = df[col].fillna(0).astype(int)

        # Label-encode categoricals; values unseen during fit fall back to 0.
        for col in self.feature_info['categorical_features']:
            if col in df.columns and col in self.encoders:
                try:
                    values = df[col].astype(object).fillna('unknown').astype(str)
                    encoded_values = []
                    for val in values:
                        try:
                            encoded_values.append(self.encoders[col].transform([val])[0])
                        except ValueError:
                            # Value not seen during fit.
                            encoded_values.append(0)
                    normalized_df[f'{col}_encoded'] = encoded_values
                except Exception:
                    normalized_df[f'{col}_encoded'] = 0

        feature_types = ['price_volume_features', 'technical_features', 'news_features',
                         'count_features', 'numerical_features']

        for feature_type in feature_types:
            if feature_type in self.scalers:
                existing_features = self.feature_info.get(f'{feature_type}_existing', [])
                available_features = [col for col in existing_features if col in df.columns]

                if available_features:
                    try:
                        # Clip with the bounds cached during fit.
                        if self.handle_outliers:
                            df_clean = df.copy()
                            for col in available_features:
                                if col in self.outlier_bounds:
                                    lower_bound, upper_bound = self.outlier_bounds[col]
                                    df_clean[col] = df_clean[col].clip(lower_bound, upper_bound)
                        else:
                            df_clean = df.copy()

                        # Replace infinities before imputation.
                        df_clean[available_features] = df_clean[available_features].replace([np.inf, -np.inf], np.nan)

                        # Impute per feature family (mirrors fit).
                        if feature_type == 'price_volume_features':
                            for col in available_features:
                                df_clean[col] = df_clean[col].ffill().bfill().fillna(0)
                        elif feature_type == 'technical_features':
                            for col in available_features:
                                if 'rsi' in col.lower():
                                    df_clean[col] = df_clean[col].fillna(50)  # neutral RSI
                                else:
                                    # MACD, CCI, and other indicators default to 0.
                                    df_clean[col] = df_clean[col].fillna(0)
                        elif feature_type == 'news_features':
                            for col in available_features:
                                if 'sentiment' in col.lower():
                                    df_clean[col] = df_clean[col].fillna(0.5)  # neutral sentiment
                                else:
                                    df_clean[col] = df_clean[col].fillna(0)
                        else:
                            df_clean[available_features] = df_clean[available_features].fillna(0)

                        # Final safety net against any remaining infinities.
                        df_clean[available_features] = df_clean[available_features].replace([np.inf, -np.inf], 0)

                        scaled_data = self.scalers[feature_type].transform(df_clean[available_features])

                        # Name scaled columns after the scaler that produced them.
                        scaler_name = type(self.scalers[feature_type]).__name__.lower().replace('scaler', '').replace('transformer', '')
                        for i, col in enumerate(available_features):
                            normalized_df[f'{col}_{scaler_name}_scaled'] = scaled_data[:, i]
                    except Exception as e:
                        print(f"Warning: Could not transform {feature_type}: {e}")
                        # Fall back to cleaned raw values.
                        for col in available_features:
                            clean_col = df[col].replace([np.inf, -np.inf], np.nan).fillna(0)
                            normalized_df[f'{col}_raw'] = clean_col

        # Add engineered features from the raw frame.
        normalized_df = self._engineer_features(df, normalized_df)

        # Final cleanup: no NaN or infinities may leave this method.
        normalized_df = normalized_df.replace([np.inf, -np.inf], np.nan)

        for col in normalized_df.columns:
            if normalized_df[col].isna().any():
                if col == 'symbol':
                    continue
                elif 'sentiment' in col.lower():
                    normalized_df[col] = normalized_df[col].fillna(0.5)
                else:
                    # Ratios, momentum, calendar parts, encodings, and all
                    # remaining columns default to 0.
                    normalized_df[col] = normalized_df[col].fillna(0)

        # Sanity checks; warn rather than fail, then force-clean.
        try:
            assert not normalized_df.isnull().any().any(), "Still contains NaN values after cleanup"
            assert not np.isinf(normalized_df.select_dtypes(include=[np.number])).any().any(), "Still contains infinite values after cleanup"
        except AssertionError as e:
            print(f"Warning: {e}")

        normalized_df = normalized_df.fillna(0).replace([np.inf, -np.inf], 0)

        return normalized_df

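    # Output naming sketch (hypothetical columns): a raw 'vwap' column scaled
    # by RobustScaler becomes 'vwap_robust_scaled'; 'rsi_14' scaled by
    # StandardScaler becomes 'rsi_14_standard_scaled'; PowerTransformer
    # columns end in '_power_scaled'.
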
    def fit_transform(self, data):
        """Fit and transform in one step."""
        return self.fit(data).transform(data)

    def get_feature_importance_info(self):
        """Return feature-category metadata for model interpretation."""
        # Note: after fit(), total_features also counts the '*_existing'
        # bookkeeping lists, so it can exceed the raw column count.
        return {
            'feature_categories': self.feature_info,
            'scalers_used': {k: type(v).__name__ for k, v in self.scalers.items()},
            'total_features': sum(len(features) for features in self.feature_info.values() if isinstance(features, list)),
        }

    def save(self, filepath):
        """Pickle the fitted normalizer state to filepath."""
        with open(filepath, 'wb') as f:
            pickle.dump({
                'scalers': self.scalers,
                'encoders': self.encoders,
                'feature_info': self.feature_info,
                'is_fitted': self.is_fitted,
                'preserve_symbol': self.preserve_symbol,
                'handle_outliers': self.handle_outliers,
                'feature_engineering': self.feature_engineering,
                'outlier_bounds': self.outlier_bounds,
            }, f)

    def load(self, filepath):
        """Restore a fitted normalizer from a pickle created by save()."""
        with open(filepath, 'rb') as f:
            data = pickle.load(f)
        self.scalers = data['scalers']
        self.encoders = data['encoders']
        self.feature_info = data['feature_info']
        self.is_fitted = data['is_fitted']
        self.preserve_symbol = data.get('preserve_symbol', True)
        self.handle_outliers = data.get('handle_outliers', True)
        self.feature_engineering = data.get('feature_engineering', True)
        self.outlier_bounds = data.get('outlier_bounds', {})
        return self


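# Round-trip usage sketch (paths are illustrative, not the pipeline's real files):
#
#     df = pd.read_parquet('stocks_features.parquet')
#     normalizer = ImprovedStockDataNormalizer()
#     train_norm = normalizer.fit_transform(df)
#     normalizer.save('normalizer.pkl')
#
#     restored = ImprovedStockDataNormalizer().load('normalizer.pkl')
#     live_norm = restored.transform(df.tail(1))  # expects the same columns as at fit time

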
def cap_outliers(df, features=None, method='iqr', factor=1.5):
    """
    Cap outliers in the DataFrame for the given features using the IQR rule.

    If features is None, all numeric columns are used. The method argument is
    accepted for API compatibility, but only 'iqr' is implemented.
    """
    capped_df = df.copy()
    if features is None:
        features = capped_df.select_dtypes(include=[np.number]).columns
    for col in features:
        if col not in capped_df.columns:
            continue
        Q1 = capped_df[col].quantile(0.25)
        Q3 = capped_df[col].quantile(0.75)
        IQR = Q3 - Q1
        lower = Q1 - factor * IQR
        upper = Q3 + factor * IQR
        capped_df[col] = np.clip(capped_df[col], lower, upper)
        print(f"Capped outliers in {col}: [{lower:.3g}, {upper:.3g}]")
    return capped_df


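# cap_outliers sketch on a toy frame (illustrative values):
#
#     toy = pd.DataFrame({'x': [1, 2, 3, 4, 100]})
#     capped = cap_outliers(toy)   # prints "Capped outliers in x: [-1, 7]"
#     # capped['x'] -> [1, 2, 3, 4, 7]

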
def normalize_stock_data_file_improved(input_file, output_file, save_normalizer=True, **kwargs):
    """
    Load a parquet or JSONL feature file, normalize it, and save the result.

    Extra keyword arguments are forwarded to ImprovedStockDataNormalizer.
    """
    # Load from parquet or line-delimited JSON.
    if input_file.endswith('.parquet'):
        df = pd.read_parquet(input_file)
        print(f"Loaded {len(df)} records with {len(df.columns)} features from parquet")
    else:
        data = []
        with open(input_file, 'r') as f:
            for line in f:
                data.append(json.loads(line.strip()))
        df = pd.DataFrame(data)
        print(f"Loaded {len(df)} records with {len(df.columns)} features from jsonl")

    normalizer = ImprovedStockDataNormalizer(**kwargs)

    # Preview the feature categorization before fitting.
    feature_info = normalizer._categorize_features(df)
    print("\nFeature Categorization:")
    for category, features in feature_info.items():
        if features:
            print(f"  {category}: {len(features)} features")

    normalized_df = normalizer.fit_transform(df)

    print(f"\nNormalized to {len(normalized_df.columns)} features")
    print(f"Data shape: {normalized_df.shape}")

    # Engineered features are added after scaling, so cap their outliers separately.
    engineered_features = ['price_momentum', 'volume_price_ratio', 'daily_range', 'technical_strength']
    normalized_df = cap_outliers(normalized_df, features=[f for f in engineered_features if f in normalized_df.columns])

    importance_info = normalizer.get_feature_importance_info()
    print(f"\nScalers used: {importance_info['scalers_used']}")

    # Ensure the output directory exists.
    output_dir = os.path.dirname(output_file)
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)

    # Always save as a pickle, regardless of the extension given.
    pkl_output_file = os.path.splitext(output_file)[0] + '.pkl'
    normalized_df.to_pickle(pkl_output_file)
    print(f"Saved normalized data to {pkl_output_file}")

    if save_normalizer:
        normalizer_file = pkl_output_file.replace('.pkl', '_improved_normalizer.pkl')
        normalizer.save(normalizer_file)
        print(f"Saved normalizer to {normalizer_file}")

    return normalized_df, normalizer


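# Programmatic usage sketch (paths match the CLI defaults in main() below):
#
#     normalized_df, normalizer = normalize_stock_data_file_improved(
#         'data/merged/features/stocks_features.parquet',
#         'data/merged/features/norm/stocks_features_improved_normalized.pkl',
#         handle_outliers=True,
#     )

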
def main():
    parser = argparse.ArgumentParser(
        description="Enhanced normalization for stock/crypto features with better handling of different feature types"
    )
    parser.add_argument('input', nargs='?', default='data/merged/features/stocks_features.parquet',
                        help='Input file (.parquet or .jsonl)')
    parser.add_argument('output', nargs='?', default='data/merged/features/norm/stocks_features_improved_normalized.pkl',
                        help='Output pickle file for normalized features')
    parser.add_argument('--no-save-normalizer', action='store_true',
                        help='Do not save the normalizer pickle')
    parser.add_argument('--no-preserve-symbol', action='store_true',
                        help='Do not preserve the symbol column')
    parser.add_argument('--no-handle-outliers', action='store_true',
                        help='Do not handle outliers')
    parser.add_argument('--no-feature-engineering', action='store_true',
                        help='Do not create engineered features')
    parser.add_argument('--train', action='store_true',
                        help='Normalize the train file and save under train/norm/')

    args = parser.parse_args()

    kwargs = {
        'preserve_symbol': not args.no_preserve_symbol,
        'handle_outliers': not args.no_handle_outliers,
        'feature_engineering': not args.no_feature_engineering,
    }

    if args.train:
        train_input = 'data/merged/train/stocks_features_train.parquet'
        train_norm_dir = 'data/merged/train/norm'
        os.makedirs(train_norm_dir, exist_ok=True)
        train_output = os.path.join(train_norm_dir, 'stocks_features_train_normalized.pkl')
        print(f"[INFO] Normalizing train file: {train_input} -> {train_output}")
        normalize_stock_data_file_improved(
            train_input,
            train_output,
            save_normalizer=not args.no_save_normalizer,
            **kwargs
        )
    else:
        print(f"[INFO] Enhanced normalizing: {args.input} -> {args.output}")
        print(f"[INFO] Options: {kwargs}")
        normalize_stock_data_file_improved(
            args.input,
            args.output,
            save_normalizer=not args.no_save_normalizer,
            **kwargs
        )


if __name__ == "__main__":
    main()
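
# CLI sketch (the script filename is assumed; adjust to the actual file name):
#
#     python normalize_stock_data_improved.py                 # use the defaults above
#     python normalize_stock_data_improved.py --train         # normalize the train split
#     python normalize_stock_data_improved.py in.parquet out.pkl --no-feature-engineering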