sonar_core_1 / train.py
Vu Anh
Clean up training runs and enhance model export functionality
742fa4d
#!/usr/bin/env python3
"""
Training script for Vietnamese text classification.
Supports both VNTC (news) and UTS2017_Bank (banking) datasets.
This script trains a TF-IDF + Logistic Regression model on Vietnamese text classification datasets.
"""
import argparse
import json
import logging
import os
import time
from datetime import datetime
import numpy as np
from datasets import load_dataset
import requests
import zipfile
from io import BytesIO
from sklearn.feature_extraction.text import CountVectorizer, TfidfTransformer
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from sklearn.svm import SVC
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier, AdaBoostClassifier
from sklearn.naive_bayes import MultinomialNB
from sklearn.neural_network import MLPClassifier
from sklearn.tree import DecisionTreeClassifier
import joblib
def setup_logging(run_name):
"""Setup logging to save all information to runs folder"""
runs_dir = "runs"
os.makedirs(runs_dir, exist_ok=True)
run_dir = os.path.join(runs_dir, run_name)
os.makedirs(run_dir, exist_ok=True)
log_file = os.path.join(run_dir, "training.log")
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
handlers=[logging.FileHandler(log_file), logging.StreamHandler()],
)
return run_dir
def load_vntc_data(split_ratio=0.2, random_state=42, n_samples=None):
"""Load and prepare VNTC dataset
Args:
split_ratio: Not used for VNTC (has predefined train/test split)
random_state: Not used for VNTC (has predefined train/test split)
n_samples: Optional limit on number of samples
Returns:
Tuple of (X_train, y_train), (X_test, y_test)
"""
print("Loading VNTC dataset...")
# Define dataset folder
dataset_folder = os.path.expanduser("~/.underthesea/VNTC")
os.makedirs(dataset_folder, exist_ok=True)
train_file = os.path.join(dataset_folder, "train.txt")
test_file = os.path.join(dataset_folder, "test.txt")
# Download if not exists
if not os.path.exists(train_file) or not os.path.exists(test_file):
print("Downloading VNTC dataset...")
url = "https://github.com/undertheseanlp/underthesea/releases/download/resources/VNTC.zip"
response = requests.get(url)
with zipfile.ZipFile(BytesIO(response.content)) as zip_file:
zip_file.extractall(dataset_folder)
print("Dataset downloaded and extracted.")
# Load train data
X_train = []
y_train = []
with open(train_file, 'r', encoding='utf-8') as f:
for line in f:
if line.strip():
parts = line.strip().split(' ', 1)
if len(parts) == 2:
label = parts[0].replace('__label__', '')
text = parts[1]
y_train.append(label)
X_train.append(text)
# Load test data
X_test = []
y_test = []
with open(test_file, 'r', encoding='utf-8') as f:
for line in f:
if line.strip():
parts = line.strip().split(' ', 1)
if len(parts) == 2:
label = parts[0].replace('__label__', '')
text = parts[1]
y_test.append(label)
X_test.append(text)
# Apply sample limit if specified with stratified sampling
if n_samples:
if n_samples < len(X_train):
# Use stratified sampling to maintain class distribution
X_train_array = np.array(X_train)
y_train_array = np.array(y_train)
indices = np.arange(len(X_train))
# Shuffle to get random stratified sample
np.random.seed(42)
shuffled_indices = np.random.permutation(indices)
# Take first n_samples
sample_indices = shuffled_indices[:n_samples]
X_train = X_train_array[sample_indices].tolist()
y_train = y_train_array[sample_indices].tolist()
if n_samples < len(X_test):
# Use stratified sampling for test set too
X_test_array = np.array(X_test)
y_test_array = np.array(y_test)
indices = np.arange(len(X_test))
# Shuffle to get random stratified sample
np.random.seed(42)
shuffled_indices = np.random.permutation(indices)
# Take first n_samples
sample_indices = shuffled_indices[:n_samples]
X_test = X_test_array[sample_indices].tolist()
y_test = y_test_array[sample_indices].tolist()
# Convert to numpy arrays
X_train = np.array(X_train)
y_train = np.array(y_train)
X_test = np.array(X_test)
y_test = np.array(y_test)
print(f"Dataset loaded: {len(X_train)} train samples, {len(X_test)} test samples")
print(f"Number of unique labels: {len(set(y_train))}")
return (X_train, y_train), (X_test, y_test)
def load_uts2017_data(split_ratio=0.2, random_state=42, n_samples=None):
"""Load and prepare UTS2017_Bank classification dataset
Args:
split_ratio: Ratio for train/test split
random_state: Random seed for reproducibility
n_samples: Optional limit on number of samples
Returns:
Tuple of (X_train, y_train), (X_test, y_test)
"""
print("Loading UTS2017_Bank dataset from Hugging Face...")
# Load the classification subset
dataset = load_dataset("undertheseanlp/UTS2017_Bank", "classification")
# Get the train split (the dataset only has a train split)
train_data = dataset["train"]
# Extract text and labels
texts = train_data["text"]
labels = train_data["label"]
# Apply sample limit if specified
if n_samples and n_samples < len(texts):
texts = texts[:n_samples]
labels = labels[:n_samples]
# Convert to numpy arrays for consistency
X = np.array(texts)
y = np.array(labels)
# Split into train and test sets
# Use stratify only if we have enough samples per class (at least 2)
min_samples_per_class = 2
unique_classes, class_counts = np.unique(y, return_counts=True)
can_stratify = all(count >= min_samples_per_class for count in class_counts)
if can_stratify:
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=split_ratio, random_state=random_state, stratify=y
)
else:
print(
f"Warning: Some classes have fewer than {min_samples_per_class} samples. Disabling stratification."
)
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=split_ratio, random_state=random_state
)
print(f"Dataset loaded: {len(X_train)} train samples, {len(X_test)} test samples")
print(f"Number of unique labels: {len(set(y))}")
return (X_train, y_train), (X_test, y_test)
def get_available_models():
"""Get available classifier options"""
return {
# Traditional algorithms
"logistic": LogisticRegression(max_iter=1000, random_state=42),
"svc_linear": SVC(kernel="linear", random_state=42, probability=True),
"svc_rbf": SVC(kernel="rbf", random_state=42, probability=True, gamma='scale'),
"naive_bayes": MultinomialNB(),
# Tree-based algorithms
"decision_tree": DecisionTreeClassifier(random_state=42, max_depth=10),
"random_forest": RandomForestClassifier(n_estimators=100, random_state=42, max_depth=10, n_jobs=-1),
# Boosting algorithms
"gradient_boost": GradientBoostingClassifier(n_estimators=100, random_state=42, max_depth=5),
"ada_boost": AdaBoostClassifier(n_estimators=100, random_state=42),
# Neural network
"mlp": MLPClassifier(hidden_layer_sizes=(100, 50), max_iter=500, random_state=42, early_stopping=True),
}
def train_model(
dataset="uts2017",
model_name="logistic",
max_features=20000,
ngram_range=(1, 2),
split_ratio=0.2,
n_samples=None,
export_model=False,
):
"""Train a single model with specified parameters
Args:
dataset: Dataset to use ('vntc' or 'uts2017')
model_name: Name of the model to train ('logistic' or 'svc')
max_features: Maximum number of features for TF-IDF vectorizer
ngram_range: N-gram range for feature extraction
split_ratio: Train/test split ratio
n_samples: Optional limit on number of samples
Returns:
Dictionary containing training results
"""
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
run_dir = setup_logging(timestamp)
logging.info(f"Starting training run: {timestamp}")
logging.info(f"Model: {model_name}")
logging.info(f"Max features: {max_features}")
logging.info(f"N-gram range: {ngram_range}")
if n_samples:
logging.info(f"Sample limit: {n_samples}")
# Create output folder for models
output_folder = os.path.join(run_dir, "models")
os.makedirs(output_folder, exist_ok=True)
# Load data
if dataset == "vntc":
logging.info("Loading VNTC dataset...")
(X_train, y_train), (X_test, y_test) = load_vntc_data(
split_ratio=split_ratio, n_samples=n_samples
)
dataset_name = "VNTC"
else:
logging.info("Loading UTS2017_Bank dataset...")
(X_train, y_train), (X_test, y_test) = load_uts2017_data(
split_ratio=split_ratio, n_samples=n_samples
)
dataset_name = "UTS2017_Bank"
# Get unique labels for reporting
unique_labels = sorted(set(y_train))
label_counts_train = {label: np.sum(y_train == label) for label in unique_labels}
label_counts_test = {label: np.sum(y_test == label) for label in unique_labels}
logging.info(f"Train samples: {len(X_train)}")
logging.info(f"Test samples: {len(X_test)}")
logging.info(f"Unique labels: {len(unique_labels)}")
logging.info(f"Label distribution (train): {label_counts_train}")
logging.info(f"Label distribution (test): {label_counts_test}")
# Get model
available_models = get_available_models()
if model_name not in available_models:
raise ValueError(
f"Model '{model_name}' not available. Choose from: {list(available_models.keys())}"
)
classifier = available_models[model_name]
clf_name = classifier.__class__.__name__
logging.info(f"Selected classifier: {clf_name}")
# Configuration name
config_name = f"{dataset_name}_{clf_name}_feat{max_features // 1000}k_ngram{ngram_range[0]}-{ngram_range[1]}"
logging.info("=" * 60)
logging.info(f"Training: {config_name}")
logging.info("=" * 60)
# Create TF-IDF pipeline
logging.info(
f"Creating pipeline with max_features={max_features}, ngram_range={ngram_range}"
)
text_clf = Pipeline(
[
(
"vect",
CountVectorizer(max_features=max_features, ngram_range=ngram_range),
),
("tfidf", TfidfTransformer(use_idf=True)),
("clf", classifier),
]
)
# Train the model
logging.info("Training model...")
start_time = time.time()
text_clf.fit(X_train, y_train)
train_time = time.time() - start_time
logging.info(f"Training completed in {train_time:.2f} seconds")
# Evaluate on training set
logging.info("Evaluating on training set...")
train_predictions = text_clf.predict(X_train)
train_accuracy = accuracy_score(y_train, train_predictions)
logging.info(f"Training accuracy: {train_accuracy:.4f}")
# Evaluate on test set
logging.info("Evaluating on test set...")
start_time = time.time()
test_predictions = text_clf.predict(X_test)
test_accuracy = accuracy_score(y_test, test_predictions)
prediction_time = time.time() - start_time
logging.info(f"Test accuracy: {test_accuracy:.4f}")
logging.info(f"Prediction time: {prediction_time:.2f} seconds")
# Classification report
logging.info("Classification Report:")
report = classification_report(y_test, test_predictions, zero_division=0)
logging.info(report)
print("\nClassification Report:")
print(report)
# Save classification report as dict
report_dict = classification_report(
y_test, test_predictions, zero_division=0, output_dict=True
)
# Confusion matrix
cm = confusion_matrix(y_test, test_predictions, labels=unique_labels)
logging.info(f"Confusion Matrix shape: {cm.shape}")
# Save the model
model_path = os.path.join(output_folder, "model.joblib")
joblib.dump(text_clf, model_path)
logging.info(f"Model saved to {model_path}")
print(f"Model saved to {model_path}")
# Save model with config name
config_model_path = os.path.join(output_folder, f"{config_name}.joblib")
joblib.dump(text_clf, config_model_path)
logging.info(f"Model also saved as {config_model_path}")
# Export model if requested
if export_model:
# Use new format: <datasetname>_classifier_<run_id>.joblib
run_id = os.path.basename(run_dir)
export_filename = f"{dataset_name.lower()}_classifier_{run_id}.joblib"
export_path = os.path.join(".", export_filename)
joblib.dump(text_clf, export_path)
logging.info(f"Model exported as {export_path}")
print(f"Model exported for distribution: {export_filename}")
# Save label mapping
label_mapping_path = os.path.join(output_folder, "labels.txt")
with open(label_mapping_path, "w", encoding="utf-8") as f:
for label in unique_labels:
f.write(f"{label}\n")
logging.info(f"Label mapping saved to {label_mapping_path}")
# Save metadata
metadata = {
"timestamp": timestamp,
"config_name": config_name,
"model_name": model_name,
"classifier": clf_name,
"max_features": max_features,
"ngram_range": list(ngram_range),
"split_ratio": split_ratio,
"n_samples": n_samples,
"train_samples": len(X_train),
"test_samples": len(X_test),
"unique_labels": len(unique_labels),
"labels": unique_labels,
"train_accuracy": float(train_accuracy),
"test_accuracy": float(test_accuracy),
"train_time": train_time,
"prediction_time": prediction_time,
"classification_report": report_dict,
"confusion_matrix": cm.tolist(),
}
metadata_path = os.path.join(run_dir, "metadata.json")
with open(metadata_path, "w", encoding="utf-8") as f:
json.dump(metadata, f, indent=2, ensure_ascii=False)
logging.info(f"Metadata saved to {metadata_path}")
# Print summary
print("\n" + "=" * 60)
print("Training Summary")
print("=" * 60)
print(f"Model: {clf_name}")
print(f"Training samples: {len(X_train)}")
print(f"Test samples: {len(X_test)}")
print(f"Number of classes: {len(unique_labels)}")
print(f"Training accuracy: {train_accuracy:.4f}")
print(f"Test accuracy: {test_accuracy:.4f}")
print(f"Training time: {train_time:.2f} seconds")
print(f"Model saved to: {model_path}")
print("=" * 60)
return metadata
def train_all_configurations(dataset="vntc", models=None, num_rows=None):
"""Train multiple model configurations and compare results"""
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
run_dir = setup_logging(timestamp)
logging.info(f"Starting comparison run: {timestamp}")
logging.info(f"Dataset: {dataset}")
if num_rows:
logging.info(f"Sample limit: {num_rows}")
if models is None:
# Define all available models for comparison
available_models = get_available_models()
models = list(available_models.keys())
logging.info(f"Models to compare: {models}")
# Define configurations to test - focusing on best performing settings
configurations = []
for model_name in models:
if model_name in ["svc_rbf", "gradient_boost", "ada_boost", "mlp"]:
# Use fewer features for computationally expensive models
configurations.append({
"dataset": dataset,
"model_name": model_name,
"max_features": 10000,
"ngram_range": (1, 2),
"n_samples": num_rows
})
else:
# Use more features for faster models
configurations.append({
"dataset": dataset,
"model_name": model_name,
"max_features": 20000,
"ngram_range": (1, 2),
"n_samples": num_rows
})
results = []
for config in configurations:
print(f"\nTraining configuration: {config}")
try:
result = train_model(**config)
results.append(result)
except Exception as e:
logging.error(f"Failed to train with config {config}: {e}")
print(f"Error training configuration: {e}")
# Save comparison results
comparison_path = os.path.join(run_dir, "comparison_results.json")
with open(comparison_path, "w", encoding="utf-8") as f:
json.dump(results, f, indent=2, ensure_ascii=False)
# Print comparison table
print("\n" + "=" * 80)
print("Model Comparison Results")
print("=" * 80)
print(
f"{'Model':<10} {'Features':<10} {'N-gram':<10} {'Train Acc':<12} {'Test Acc':<12}"
)
print("-" * 80)
for result in sorted(results, key=lambda x: x["test_accuracy"], reverse=True):
model = result["classifier"][:8]
features = f"{result['max_features'] // 1000}k"
ngram = f"{result['ngram_range'][0]}-{result['ngram_range'][1]}"
train_acc = result["train_accuracy"]
test_acc = result["test_accuracy"]
print(
f"{model:<10} {features:<10} {ngram:<10} {train_acc:<12.4f} {test_acc:<12.4f}"
)
print("=" * 80)
# Find best model
best_model = max(results, key=lambda x: x["test_accuracy"])
print(f"\nBest model: {best_model['config_name']}")
print(f"Test accuracy: {best_model['test_accuracy']:.4f}")
return results
def train_notebook(dataset="uts2017", model_name="logistic", max_features=20000, ngram_min=1, ngram_max=2,
split_ratio=0.2, n_samples=None, compare=False, export_model=False):
"""
Convenience function for training in Jupyter/Colab notebooks without argparse.
Example usage:
from train import train_notebook
train_notebook(dataset="vntc", model_name="logistic", max_features=20000, export_model=True)
"""
if compare:
print("Training and comparing multiple configurations...")
return train_all_configurations()
else:
dataset_name = "VNTC" if dataset == "vntc" else "UTS2017_Bank"
print(f"Training {model_name} model on {dataset_name} dataset...")
print(f"Configuration: max_features={max_features}, ngram=({ngram_min}, {ngram_max})")
return train_model(
dataset=dataset,
model_name=model_name,
max_features=max_features,
ngram_range=(ngram_min, ngram_max),
split_ratio=split_ratio,
n_samples=n_samples,
export_model=export_model,
)
def main():
"""Main function with argument parsing"""
# Detect if running in Jupyter/Colab
import sys
in_notebook = hasattr(sys, 'ps1') or 'ipykernel' in sys.modules or 'google.colab' in sys.modules
parser = argparse.ArgumentParser(
description="Train Vietnamese text classification model on VNTC or UTS2017_Bank dataset"
)
parser.add_argument(
"--dataset",
type=str,
choices=["vntc", "uts2017"],
default="uts2017",
help="Dataset to use for training (default: uts2017)",
)
parser.add_argument(
"--model",
type=str,
choices=["logistic", "svc_linear", "svc_rbf", "naive_bayes", "decision_tree", "random_forest", "gradient_boost", "ada_boost", "mlp"],
default="logistic",
help="Model type to train (default: logistic)",
)
parser.add_argument(
"--max-features",
type=int,
default=20000,
help="Maximum number of features for TF-IDF (default: 20000)",
)
parser.add_argument(
"--ngram-min", type=int, default=1, help="Minimum n-gram range (default: 1)"
)
parser.add_argument(
"--ngram-max", type=int, default=2, help="Maximum n-gram range (default: 2)"
)
parser.add_argument(
"--split-ratio", type=float, default=0.2, help="Test split ratio (default: 0.2)"
)
parser.add_argument(
"--num-rows",
type=int,
default=None,
help="Limit number of rows/samples for quick testing (default: None - use all data)",
)
parser.add_argument(
"--compare",
action="store_true",
help="Train and compare multiple configurations",
)
parser.add_argument(
"--compare-models",
nargs="+",
help="List of specific models to compare (e.g., --compare-models logistic random_forest svc_rbf)",
choices=["logistic", "svc_linear", "svc_rbf", "naive_bayes", "decision_tree", "random_forest", "gradient_boost", "ada_boost", "mlp"]
)
parser.add_argument(
"--compare-dataset",
type=str,
choices=["vntc", "uts2017"],
default="vntc",
help="Dataset to use for model comparison (default: vntc)"
)
parser.add_argument(
"--export-model",
action="store_true",
help="Export a copy of the trained model to project root for distribution/publishing"
)
# Use parse_known_args to ignore Jupyter/Colab kernel arguments
args, unknown = parser.parse_known_args()
# If running in notebook and there are unknown args, inform user
if in_notebook and unknown:
print(f"Note: Running in Jupyter/Colab environment. Ignoring kernel arguments: {unknown}")
if args.compare or args.compare_models:
if args.compare_models:
print(f"Training and comparing selected models: {args.compare_models}")
print(f"Dataset: {args.compare_dataset}")
if args.num_rows:
print(f"Using {args.num_rows} rows per dataset")
train_all_configurations(dataset=args.compare_dataset, models=args.compare_models, num_rows=args.num_rows)
else:
print("Training and comparing all available models...")
print(f"Dataset: {args.compare_dataset}")
if args.num_rows:
print(f"Using {args.num_rows} rows per dataset")
train_all_configurations(dataset=args.compare_dataset, num_rows=args.num_rows)
else:
dataset_name = "VNTC" if args.dataset == "vntc" else "UTS2017_Bank"
print(f"Training {args.model} model on {dataset_name} dataset...")
print(
f"Configuration: max_features={args.max_features}, ngram=({args.ngram_min}, {args.ngram_max})"
)
train_model(
dataset=args.dataset,
model_name=args.model,
max_features=args.max_features,
ngram_range=(args.ngram_min, args.ngram_max),
split_ratio=args.split_ratio,
n_samples=args.num_rows,
export_model=args.export_model,
)
if __name__ == "__main__":
main()