# Advanced ML Sentiment Lab - Streamlit App
import warnings
warnings.filterwarnings("ignore")
import os
from pathlib import Path
from collections import Counter
from typing import List, Dict, Tuple, Optional
from urllib.parse import urlparse
import numpy as np
import pandas as pd
import streamlit as st
import plotly.express as px
import plotly.graph_objects as go
from sklearn.model_selection import train_test_split
from sklearn.metrics import (
f1_score,
accuracy_score,
precision_score,
recall_score,
average_precision_score,
roc_auc_score,
roc_curve,
precision_recall_curve,
confusion_matrix,
)
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.naive_bayes import MultinomialNB
from scipy.sparse import hstack
import joblib
# =========================================================
# App configuration
# =========================================================
st.set_page_config(
page_title="Advanced ML Sentiment Lab",
page_icon="🚀",
layout="wide",
initial_sidebar_state="expanded",
)
# Base paths (works locally and on Hugging Face Spaces)
BASE_DIR = Path(__file__).resolve().parent
MODELS_DIR = BASE_DIR / "models_sentiment_lab"
MODELS_DIR.mkdir(exist_ok=True)
# =========================================================
# Premium CSS (SaaS-style)
# =========================================================
APP_CSS = """
"""
st.markdown(APP_CSS, unsafe_allow_html=True)
# =========================================================
# Utility functions
# =========================================================
def basic_clean(s: str) -> str:
import re, html
if not isinstance(s, str):
s = str(s)
s = html.unescape(s).lower()
s = re.sub(r"
", " ", s)
s = re.sub(r"http\S+|www\S+", " ", s)
s = re.sub(r"[^a-z0-9\s']", " ", s)
s = re.sub(r"\s+", " ", s).strip()
return s
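# Illustrative example (hypothetical input), assuming the <br /> pattern above:
#   basic_clean("Great movie!<br />Visit http://example.com")
#   -> "great movie visit"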
def _is_url(path: str) -> bool:
try:
parsed = urlparse(path)
return parsed.scheme in ("http", "https")
except Exception:
return False
@st.cache_data(show_spinner=True)
def load_default_sentiment_dataset() -> pd.DataFrame:
"""
Try to automatically load IMDB Dataset from the repo or environment.
Priority:
1) SENTIMENT_DATA_PATH / DATA_PATH / CSV_PATH env vars (file path)
2) SENTIMENT_DATA_URL / DATA_URL / CSV_URL env vars (URL)
3) data/IMDB Dataset.csv in common locations relative to this file.
"""
# 1) Env path hints
env_path = None
for k in ("SENTIMENT_DATA_PATH", "DATA_PATH", "CSV_PATH"):
v = os.getenv(k)
if v:
env_path = v.strip()
break
env_url = None
for k in ("SENTIMENT_DATA_URL", "DATA_URL", "CSV_URL"):
v = os.getenv(k)
if v:
env_url = v.strip()
break
candidates: List[str] = []
if env_path:
candidates.append(env_path)
if env_url:
candidates.append(env_url)
rel_default = "data/IMDB Dataset.csv"
candidates.append(rel_default)
cwd = Path.cwd()
candidates.append(str(cwd / rel_default))
# When file is under src/data or repo/data
candidates.append(str(BASE_DIR / "data" / "IMDB Dataset.csv"))
candidates.append(str(BASE_DIR.parent / "data" / "IMDB Dataset.csv"))
# Directly next to the app
candidates.append(str(BASE_DIR / "IMDB Dataset.csv"))
candidates.append(str(BASE_DIR.parent / "IMDB Dataset.csv"))
tried: List[str] = []
last_err: Optional[Exception] = None
for src in candidates:
if not src or src in tried:
continue
tried.append(src)
try:
if _is_url(src):
df = pd.read_csv(src)
else:
p = Path(src)
if not p.exists():
continue
df = pd.read_csv(p)
if df is not None and not df.empty:
return df
except Exception as e:
last_err = e
continue
msg_lines = [
"Could not find dataset at 'data/IMDB Dataset.csv'. Tried:",
*[f"- {t}" for t in tried],
]
if last_err is not None:
msg_lines.append(f"Last error: {last_err}")
raise FileNotFoundError("\n".join(msg_lines))
@st.cache_data(show_spinner=False)
def clean_df(
df: pd.DataFrame,
text_col: str,
label_col: str,
pos_label_str: str,
neg_label_str: str,
) -> Tuple[pd.DataFrame, np.ndarray]:
out = df.copy()
out["text_raw"] = out[text_col].astype(str)
out["text_clean"] = out["text_raw"].map(basic_clean)
lab = out[label_col].astype(str)
y = np.where(lab == pos_label_str, 1, 0).astype(int)
return out, y
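# Note: rows whose label equals pos_label_str map to 1; every other label
# (including neg_label_str) maps to 0. Example: with pos_label_str="positive",
# ["positive", "negative", "positive"] becomes y = [1, 0, 1].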
def build_advanced_features(
texts: List[str],
max_word_features: int,
use_char: bool,
char_max: int,
):
word_vec = TfidfVectorizer(
ngram_range=(1, 3),
max_features=max_word_features,
min_df=2,
max_df=0.95,
)
Xw = word_vec.fit_transform(texts)
vecs = [word_vec]
mats = [Xw]
if use_char:
char_vec = TfidfVectorizer(
analyzer="char",
ngram_range=(3, 6),
max_features=char_max,
min_df=2,
)
Xc = char_vec.fit_transform(texts)
vecs.append(char_vec)
mats.append(Xc)
X_all = hstack(mats) if len(mats) > 1 else mats[0]
return X_all, tuple(vecs)
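# Inference-time sketch (illustrative, not executed here): reuse the fitted
# vectorizers in the order they were returned and hstack the blocks exactly
# as during training.
#   word_vec, *rest = vecs
#   X_new = word_vec.transform(["some cleaned text"])
#   if rest:
#       X_new = hstack([X_new, rest[0].transform(["some cleaned text"])])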
def train_multiple_models(X_train, y_train, models_config: Dict) -> Dict:
models = {}
for name, cfg in models_config.items():
if not cfg.get("enabled", False):
continue
if name == "Logistic Regression":
model = LogisticRegression(
C=cfg["C"],
max_iter=1000,
solver="liblinear",
n_jobs=-1,
class_weight="balanced",
random_state=42,
)
elif name == "Random Forest":
model = RandomForestClassifier(
n_estimators=cfg["n_estimators"],
max_depth=cfg["max_depth"],
min_samples_split=cfg["min_samples_split"],
n_jobs=-1,
class_weight="balanced",
random_state=42,
)
elif name == "Gradient Boosting":
model = GradientBoostingClassifier(
n_estimators=cfg["n_estimators"],
learning_rate=cfg["learning_rate"],
max_depth=cfg["max_depth"],
random_state=42,
)
elif name == "Naive Bayes":
model = MultinomialNB(alpha=cfg["alpha"])
else:
continue
model.fit(X_train, y_train)
models[name] = model
return models
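# Note on the models above: MultinomialNB requires non-negative features,
# which TF-IDF provides; the other estimators accept the sparse matrices
# returned by hstack() directly, so nothing needs to be densified.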
def evaluate_model(model, X_val, y_val) -> Dict:
y_pred = model.predict(X_val)
try:
y_proba = model.predict_proba(X_val)[:, 1]
except Exception:
scores = model.decision_function(X_val)
y_proba = (scores - scores.min()) / (scores.max() - scores.min() + 1e-9)
metrics = {
"accuracy": accuracy_score(y_val, y_pred),
"precision": precision_score(y_val, y_pred, zero_division=0),
"recall": recall_score(y_val, y_pred, zero_division=0),
"f1": f1_score(y_val, y_pred, zero_division=0),
"roc_auc": roc_auc_score(y_val, y_proba),
"pr_auc": average_precision_score(y_val, y_proba),
"y_pred": y_pred,
"y_proba": y_proba,
}
return metrics
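# The decision_function fallback above min-max scales raw scores into [0, 1].
# That preserves ranking (so ROC-AUC / PR-AUC stay meaningful), but the values
# are not calibrated probabilities.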
def compute_threshold_view(
y_true: np.ndarray,
y_proba: np.ndarray,
threshold: float,
cost_fp: float,
cost_fn: float,
) -> Tuple[Dict, pd.DataFrame]:
y_pred_thr = (y_proba >= threshold).astype(int)
tn, fp, fn, tp = confusion_matrix(y_true, y_pred_thr).ravel()
metrics = {
"threshold": threshold,
"accuracy": accuracy_score(y_true, y_pred_thr),
"precision": precision_score(y_true, y_pred_thr, zero_division=0),
"recall": recall_score(y_true, y_pred_thr, zero_division=0),
"f1": f1_score(y_true, y_pred_thr, zero_division=0),
"specificity": tn / (tn + fp + 1e-9),
"fp": int(fp),
"fn": int(fn),
"tp": int(tp),
"tn": int(tn),
}
metrics["cost"] = metrics["fp"] * cost_fp + metrics["fn"] * cost_fn
grid = np.linspace(0.05, 0.95, 37)
rows = []
for t in grid:
y_pred_g = (y_proba >= t).astype(int)
tn_g, fp_g, fn_g, tp_g = confusion_matrix(y_true, y_pred_g).ravel()
f1_g = f1_score(y_true, y_pred_g, zero_division=0)
cost_g = fp_g * cost_fp + fn_g * cost_fn
rows.append(
{
"threshold": t,
"f1": f1_g,
"fp": fp_g,
"fn": fn_g,
"cost": cost_g,
}
)
df_curve = pd.DataFrame(rows)
return metrics, df_curve
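# Worked example (hypothetical counts): with fp=30, fn=12, cost_fp=1.0 and
# cost_fn=5.0, the expected cost is 30 * 1.0 + 12 * 5.0 = 90.0. The grid above
# recomputes this for thresholds 0.05..0.95 in steps of 0.025.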
# =========================================================
# Sidebar & dataset loading
# =========================================================
st.sidebar.markdown("### 🚀 Advanced ML Sentiment Lab")
st.sidebar.markdown("---")
st.sidebar.markdown("### Dataset source")
dataset_mode = st.sidebar.radio(
"How do you want to provide the dataset?",
options=["Auto (IMDB from repo)", "Upload CSV"],
index=0,
)
df: Optional[pd.DataFrame] = None
if dataset_mode == "Upload CSV":
upload = st.sidebar.file_uploader(
"Upload CSV dataset",
type=["csv"],
help="Small custom datasets work best here.",
)
if upload is not None:
try:
df = pd.read_csv(upload)
except Exception as e:
st.sidebar.error(f"Could not read uploaded CSV: {e}")
else:
try:
df = load_default_sentiment_dataset()
except Exception as e:
st.markdown(
"""
Advanced ML Sentiment Lab
Dataset could not be loaded automatically.
Make sure data/IMDB Dataset.csv exists in the repo
(or set SENTIMENT_DATA_PATH / DATA_PATH), or switch to "Upload CSV"
in the sidebar.
Text + binary label
TF-IDF word & character features
Threshold tuning with business cost
Artifacts saved under models_sentiment_lab/
""",
unsafe_allow_html=True,
)
st.error(f"Dataset error: {e}")
st.stop()
if df is None or df.empty:
st.error("No dataset available. Provide a CSV via the sidebar.")
st.stop()
all_cols = list(df.columns)
st.sidebar.markdown("### Column mapping")
# Guess text column
default_text_idx = 0
for i, c in enumerate(all_cols):
if str(c).lower() in ["review", "text", "comment", "content", "message", "body"]:
default_text_idx = i
break
text_col = st.sidebar.selectbox("Text column", all_cols, index=default_text_idx)
label_candidates = [c for c in all_cols if c != text_col]
if not label_candidates:
st.error("Dataset must have at least 2 columns (text + label).")
st.stop()
default_label_idx = 0
for i, c in enumerate(label_candidates):
if str(c).lower() in ["sentiment", "label", "target", "y", "class"]:
default_label_idx = i
break
label_col = st.sidebar.selectbox("Label column", label_candidates, index=default_label_idx)
label_values = df[label_col].astype(str).dropna().value_counts().index.tolist()
if len(label_values) < 2:
st.error("Label column must have at least 2 distinct values.")
st.stop()
st.sidebar.markdown("### Label mapping")
pos_label_str = st.sidebar.selectbox("Positive class (1)", label_values, index=0)
neg_label_str = st.sidebar.selectbox(
"Negative class (0)", label_values, index=1 if len(label_values) > 1 else 0
)
# Training sample size (to keep it fast)
st.sidebar.markdown("### Training subset")
max_train_rows = st.sidebar.slider(
"Max rows used for training",
min_value=5000,
max_value=50000,
value=10000,
step=5000,
help="Training uses a stratified subset to keep runtime under control.",
)
# =========================================================
# Data processing & dataset KPIs
# =========================================================
dfc, y = clean_df(
df,
text_col=text_col,
label_col=label_col,
pos_label_str=pos_label_str,
neg_label_str=neg_label_str,
)
n_rows = len(dfc)
n_pos = int((y == 1).sum())
n_neg = int((y == 0).sum())
pos_ratio = n_pos / max(1, n_rows)
avg_len = dfc["text_clean"].str.len().mean()
sample_vocab = len(set(" ".join(dfc["text_clean"].head(5000)).split()))
# =========================================================
# Hero + KPI cards
# =========================================================
st.markdown(
f"""
Advanced ML Sentiment Lab
Production-style sentiment analytics on {n_rows:,} samples.
Configure TF-IDF features, train multiple models, tune the decision threshold
under custom business costs, and inspect model errors.
Text column: {text_col}
Label column: {label_col}
Binary labels: {pos_label_str} / {neg_label_str}
""",
unsafe_allow_html=True,
)
k1, k2, k3, k4 = st.columns(4)
with k1:
st.markdown(
f"""
📊
Total samples
{n_rows:,}
Cleaned for modeling
""",
unsafe_allow_html=True,
)
with k2:
st.markdown(
f"""
✅
Positive share
{pos_ratio*100:.1f}%
{n_pos:,} positive / {n_neg:,} negative
""",
unsafe_allow_html=True,
)
with k3:
st.markdown(
f"""
📝
Avg text length
{avg_len:.0f}
characters per record
""",
unsafe_allow_html=True,
)
with k4:
st.markdown(
f"""
📚
Sample vocabulary
{sample_vocab:,}
unique tokens (first 5k rows)
""",
unsafe_allow_html=True,
)
# =========================================================
# Tabs
# =========================================================
tab_eda, tab_train, tab_threshold, tab_compare, tab_errors, tab_deploy = st.tabs(
["EDA", "Train & Validation", "Threshold & Cost", "Compare Models", "Error Analysis", "Deploy"]
)
# =========================================================
# TAB 1: EDA
# =========================================================
with tab_eda:
st.markdown(
'',
unsafe_allow_html=True,
)
st.markdown(
"Quick checks on class balance, text lengths, and token distribution.",
unsafe_allow_html=True,
)
col1, col2 = st.columns(2)
with col1:
dfc["len_tokens"] = dfc["text_clean"].str.split().map(len)
fig_len = px.histogram(
dfc,
x="len_tokens",
nbins=50,
title="Token length distribution",
)
fig_len.update_layout(
plot_bgcolor="rgba(0,0,0,0)",
paper_bgcolor="rgba(0,0,0,0)",
font=dict(color="#e5e7eb"),
xaxis_title="Tokens per text",
yaxis_title="Count",
)
st.plotly_chart(fig_len, width="stretch")
dist_data = pd.DataFrame(
{
"Class": [neg_label_str, pos_label_str],
"Count": [n_neg, n_pos],
}
)
fig_class = px.pie(
dist_data,
values="Count",
names="Class",
title="Class distribution",
)
fig_class.update_layout(
plot_bgcolor="rgba(0,0,0,0)",
paper_bgcolor="rgba(0,0,0,0)",
font=dict(color="#e5e7eb"),
)
st.plotly_chart(fig_class, width="stretch")
with col2:
sample_size = min(10000, len(dfc))
cnt = Counter()
for t in dfc["text_clean"].sample(sample_size, random_state=42):
cnt.update(t.split())
top_tokens = pd.DataFrame(cnt.most_common(25), columns=["Token", "Frequency"])
fig_tokens = px.bar(
top_tokens,
x="Frequency",
y="Token",
orientation="h",
title="Top tokens (sample)",
)
fig_tokens.update_layout(
plot_bgcolor="rgba(0,0,0,0)",
paper_bgcolor="rgba(0,0,0,0)",
font=dict(color="#e5e7eb"),
showlegend=False,
yaxis={"categoryorder": "total ascending"},
)
st.plotly_chart(fig_tokens, width="stretch")
st.markdown("**Length statistics by class**")
st.dataframe(
dfc.groupby(label_col)["len_tokens"].describe().round(2),
width="stretch",
)
# =========================================================
# TAB 2: Train & Validation
# =========================================================
with tab_train:
st.markdown(
'',
unsafe_allow_html=True,
)
st.markdown(
"Configure TF-IDF, select models, then run a stratified train/validation split on a capped subset for fast turnaround.",
unsafe_allow_html=True,
)
fe1, fe2, fe3 = st.columns(3)
with fe1:
max_word_features = st.slider(
"Max word features",
min_value=5000,
max_value=60000,
value=20000,
step=5000,
)
with fe2:
use_char = st.checkbox("Add character n-grams", value=True)
with fe3:
test_size = st.slider("Validation split (%)", 10, 40, 20, 5) / 100.0
st.markdown("---")
st.markdown("#### Model configuration")
models_config: Dict[str, Dict] = {}
mc1, mc2 = st.columns(2)
with mc1:
with st.expander("Logistic Regression", expanded=True):
en = st.checkbox("Enable Logistic Regression", value=True, key="lr_en_ultra")
C_val = st.slider(
"Regularization C", 0.1, 10.0, 2.0, 0.5, key="lr_C_ultra"
)
models_config["Logistic Regression"] = {"enabled": en, "C": C_val}
with st.expander("Random Forest"):
en = st.checkbox("Enable Random Forest", value=False, key="rf_en_ultra")
est = st.slider(
"n_estimators", 50, 300, 120, 50, key="rf_est_ultra"
)
depth = st.slider("max_depth", 5, 40, 18, 5, key="rf_depth_ultra")
split = st.slider(
"min_samples_split", 2, 20, 5, 1, key="rf_split_ultra"
)
models_config["Random Forest"] = {
"enabled": en,
"n_estimators": est,
"max_depth": depth,
"min_samples_split": split,
}
with mc2:
with st.expander("Gradient Boosting"):
en = st.checkbox("Enable Gradient Boosting", value=False, key="gb_en_ultra")
est = st.slider(
"n_estimators", 50, 300, 120, 50, key="gb_est_ultra"
)
lr = st.slider(
"learning_rate", 0.01, 0.3, 0.08, 0.01, key="gb_lr_ultra"
)
depth = st.slider("max_depth", 2, 8, 3, 1, key="gb_depth_ultra")
models_config["Gradient Boosting"] = {
"enabled": en,
"n_estimators": est,
"learning_rate": lr,
"max_depth": depth,
}
with st.expander("Naive Bayes"):
en = st.checkbox("Enable Naive Bayes", value=True, key="nb_en_ultra")
alpha = st.slider(
"alpha (smoothing)", 0.1, 3.0, 1.0, 0.1, key="nb_alpha_ultra"
)
models_config["Naive Bayes"] = {"enabled": en, "alpha": alpha}
st.markdown("---")
random_state = 42
if st.button("Train models", type="primary"):
enabled_models = [m for m, cfg in models_config.items() if cfg["enabled"]]
if not enabled_models:
st.warning("Enable at least one model before training.", icon="⚠️")
else:
progress = st.progress(0)
status = st.empty()
# Stratified subset for training
progress.progress(5)
status.markdown("Sampling rows for training (stratified)…")
n_total = len(dfc)
train_rows = min(max_train_rows, n_total)
indices = np.arange(n_total)
if train_rows < n_total:
sample_idx, _ = train_test_split(
indices,
train_size=train_rows,
stratify=y,
random_state=random_state,
)
else:
sample_idx = indices
df_train = dfc.iloc[sample_idx].copy()
y_sample = y[sample_idx]
status.markdown("Cleaning and vectorising text…")
progress.progress(20)
texts = df_train["text_clean"].tolist()
X_all, vecs = build_advanced_features(
texts,
max_word_features=max_word_features,
use_char=use_char,
char_max=20000,
)
status.markdown("Creating stratified train/validation split…")
progress.progress(40)
local_idx = np.arange(len(df_train))
train_loc, val_loc, y_train, y_val = train_test_split(
local_idx,
y_sample,
test_size=test_size,
stratify=y_sample,
random_state=random_state,
)
X_train = X_all[train_loc]
X_val = X_all[val_loc]
status.markdown("Training models…")
progress.progress(65)
trained_models = train_multiple_models(X_train, y_train, models_config)
status.markdown("Evaluating models on validation set…")
progress.progress(80)
all_results: Dict[str, Dict] = {}
for name, model in trained_models.items():
metrics = evaluate_model(model, X_val, y_val)
all_results[name] = {"model": model, "metrics": metrics}
status.markdown("Saving artifacts…")
progress.progress(92)
val_idx_global = df_train.index[val_loc]
joblib.dump(vecs, MODELS_DIR / "vectorizers.joblib")
joblib.dump(trained_models, MODELS_DIR / "models.joblib")
joblib.dump(all_results, MODELS_DIR / "results.joblib")
joblib.dump(
{
"pos_label": pos_label_str,
"neg_label": neg_label_str,
"val_idx": val_idx_global,
"y_val": y_val,
"text_col": text_col,
"label_col": label_col,
},
MODELS_DIR / "metadata.joblib",
)
progress.progress(100)
status.markdown("Training complete.")
st.success(f"Trained {len(trained_models)} model(s) on {len(df_train):,} rows.")
rows = []
for name, res in all_results.items():
m = res["metrics"]
rows.append(
{
"Model": name,
"Accuracy": f"{m['accuracy']:.4f}",
"Precision": f"{m['precision']:.4f}",
"Recall": f"{m['recall']:.4f}",
"F1 (validation)": f"{m['f1']:.4f}",
"ROC-AUC": f"{m['roc_auc']:.4f}",
"PR-AUC": f"{m['pr_auc']:.4f}",
}
)
res_df = pd.DataFrame(rows)
st.markdown("#### Training summary")
st.dataframe(res_df, width="stretch", hide_index=True)
# =========================================================
# TAB 3: Threshold & Cost
# =========================================================
with tab_threshold:
st.markdown(
'',
unsafe_allow_html=True,
)
st.markdown(
"Pick a model, move the decision threshold, and inspect how metrics and expected cost change.",
unsafe_allow_html=True,
)
results_path = MODELS_DIR / "results.joblib"
meta_path = MODELS_DIR / "metadata.joblib"
if not results_path.exists() or not meta_path.exists():
st.info("Train models in the previous tab to unlock threshold tuning.")
else:
all_results = joblib.load(results_path)
metadata = joblib.load(meta_path)
y_val = metadata["y_val"]
best_name = max(
all_results.keys(),
key=lambda n: all_results[n]["metrics"]["f1"],
)
model_name = st.selectbox(
"Model to analyse",
options=list(all_results.keys()),
index=list(all_results.keys()).index(best_name),
)
metrics_base = all_results[model_name]["metrics"]
y_proba = metrics_base["y_proba"]
col_thr, col_cost = st.columns([1.2, 1])
with col_thr:
threshold = st.slider(
"Decision threshold for positive class",
min_value=0.05,
max_value=0.95,
value=0.5,
step=0.01,
)
with col_cost:
cost_fp = st.number_input(
"Cost of a false positive (FP)", min_value=0.0, value=1.0, step=0.5
)
cost_fn = st.number_input(
"Cost of a false negative (FN)", min_value=0.0, value=5.0, step=0.5
)
thr_metrics, df_curve = compute_threshold_view(
y_true=y_val,
y_proba=y_proba,
threshold=threshold,
cost_fp=cost_fp,
cost_fn=cost_fn,
)
c1, c2, c3, c4 = st.columns(4)
with c1:
st.metric("Accuracy", f"{thr_metrics['accuracy']:.4f}")
with c2:
st.metric("Precision", f"{thr_metrics['precision']:.4f}")
with c3:
st.metric("Recall", f"{thr_metrics['recall']:.4f}")
with c4:
st.metric("F1", f"{thr_metrics['f1']:.4f}")
c5, c6, c7, c8 = st.columns(4)
with c5:
st.metric("Specificity", f"{thr_metrics['specificity']:.4f}")
with c6:
st.metric("FP", thr_metrics["fp"])
with c7:
st.metric("FN", thr_metrics["fn"])
with c8:
st.metric("Total cost", f"{thr_metrics['cost']:.2f}")
st.markdown("##### F1 over threshold")
fig_thr = px.line(
df_curve,
x="threshold",
y="f1",
title="F1 vs threshold",
)
fig_thr.update_layout(
plot_bgcolor="rgba(0,0,0,0)",
paper_bgcolor="rgba(0,0,0,0)",
font=dict(color="#e5e7eb"),
)
st.plotly_chart(fig_thr, width="stretch")
fig_cost = px.line(
df_curve,
x="threshold",
y="cost",
title=f"Estimated cost (FP cost={cost_fp}, FN cost={cost_fn})",
)
fig_cost.update_layout(
plot_bgcolor="rgba(0,0,0,0)",
paper_bgcolor="rgba(0,0,0,0)",
font=dict(color="#e5e7eb"),
)
st.plotly_chart(fig_cost, width="stretch")
# =========================================================
# TAB 4: Compare models
# =========================================================
with tab_compare:
st.markdown(
'',
unsafe_allow_html=True,
)
st.markdown(
"Side-by-side comparison of metrics, ROC / PR curves, and confusion matrices.",
unsafe_allow_html=True,
)
results_path = MODELS_DIR / "results.joblib"
meta_path = MODELS_DIR / "metadata.joblib"
if not results_path.exists() or not meta_path.exists():
st.info("Train models first to unlock comparison.")
else:
all_results = joblib.load(results_path)
metadata = joblib.load(meta_path)
y_val = metadata["y_val"]
st.markdown("#### Model cards")
cols = st.columns(len(all_results))
for (name, res), col in zip(all_results.items(), cols):
m = res["metrics"]
with col:
st.markdown(
f"**{name}**  \nF1: {m['f1']:.3f} · ROC-AUC: {m['roc_auc']:.3f} · PR-AUC: {m['pr_auc']:.3f}",
unsafe_allow_html=True,
)
r1, r2 = st.columns(2)
with r1:
st.markdown("##### ROC curves")
fig_roc = go.Figure()
for name, res in all_results.items():
fpr, tpr, _ = roc_curve(y_val, res["metrics"]["y_proba"])
auc_score = res["metrics"]["roc_auc"]
fig_roc.add_trace(
go.Scatter(
x=fpr,
y=tpr,
mode="lines",
name=f"{name} (AUC={auc_score:.3f})",
)
)
fig_roc.add_trace(
go.Scatter(
x=[0, 1],
y=[0, 1],
mode="lines",
name="Random",
line=dict(dash="dash", color="gray"),
)
)
fig_roc.update_layout(
xaxis_title="False positive rate",
yaxis_title="True positive rate",
plot_bgcolor="rgba(0,0,0,0)",
paper_bgcolor="rgba(0,0,0,0)",
font=dict(color="#e5e7eb"),
)
st.plotly_chart(fig_roc, width="stretch")
with r2:
st.markdown("##### Precision-Recall curves")
fig_pr = go.Figure()
for name, res in all_results.items():
prec, rec, _ = precision_recall_curve(
y_val, res["metrics"]["y_proba"]
)
pr_auc = res["metrics"]["pr_auc"]
fig_pr.add_trace(
go.Scatter(
x=rec,
y=prec,
mode="lines",
name=f"{name} (AUC={pr_auc:.3f})",
fill="tonexty",
)
)
fig_pr.update_layout(
xaxis_title="Recall",
yaxis_title="Precision",
plot_bgcolor="rgba(0,0,0,0)",
paper_bgcolor="rgba(0,0,0,0)",
font=dict(color="#e5e7eb"),
)
st.plotly_chart(fig_pr, width="stretch")
st.markdown("##### Confusion matrices (validation set)")
cm_cols = st.columns(len(all_results))
for (name, res), col in zip(all_results.items(), cm_cols):
m = res["metrics"]
cm = confusion_matrix(y_val, m["y_pred"])
fig_cm = px.imshow(
cm,
labels=dict(x="Predicted", y="Actual", color="Count"),
x=[metadata["neg_label"], metadata["pos_label"]],
y=[metadata["neg_label"], metadata["pos_label"]],
text_auto=True,
title=name,
)
fig_cm.update_layout(
plot_bgcolor="rgba(0,0,0,0)",
paper_bgcolor="rgba(0,0,0,0)",
font=dict(color="#e5e7eb"),
)
with col:
st.plotly_chart(fig_cm, width="stretch")
# =========================================================
# TAB 5: Error analysis
# =========================================================
with tab_errors:
st.markdown(
'',
unsafe_allow_html=True,
)
st.markdown(
"Browse misclassified texts to see where the model struggles and how confident it was.",
unsafe_allow_html=True,
)
results_path = MODELS_DIR / "results.joblib"
meta_path = MODELS_DIR / "metadata.joblib"
if not results_path.exists() or not meta_path.exists():
st.info("Train models first to unlock error analysis.")
else:
all_results = joblib.load(results_path)
metadata = joblib.load(meta_path)
y_val = metadata["y_val"]
val_idx = metadata["val_idx"]
best_name = max(
all_results.keys(),
key=lambda n: all_results[n]["metrics"]["f1"],
)
model_name = st.selectbox(
"Model to inspect",
options=list(all_results.keys()),
index=list(all_results.keys()).index(best_name),
)
m = all_results[model_name]["metrics"]
y_pred = m["y_pred"]
y_proba = m["y_proba"]
# Use .loc because val_idx is based on original index
val_df = dfc.loc[val_idx].copy()
val_df["true_label"] = np.where(
y_val == 1, metadata["pos_label"], metadata["neg_label"]
)
val_df["pred_label"] = np.where(
y_pred == 1, metadata["pos_label"], metadata["neg_label"]
)
val_df["proba_pos"] = y_proba
val_df["correct"] = (y_val == y_pred)
val_df["error_type"] = np.where(
val_df["correct"],
"Correct",
np.where(y_val == 1, "False negative", "False positive"),
)
col_f1, col_f2 = st.columns([1, 1])
with col_f1:
only_errors = st.checkbox("Show only misclassified samples", value=True)
with col_f2:
sort_mode = st.selectbox(
"Sort by",
options=[
"Most confident errors",
"Least confident predictions",
"Random",
],
)
df_view = val_df.copy()
if only_errors:
df_view = df_view[~df_view["correct"]]
if sort_mode == "Most confident errors":
df_view["conf"] = np.abs(df_view["proba_pos"] - 0.5)
df_view = df_view.sort_values("conf", ascending=False)
elif sort_mode == "Least confident predictions":
df_view["conf"] = np.abs(df_view["proba_pos"] - 0.5)
df_view = df_view.sort_values("conf", ascending=True)
else:
df_view = df_view.sample(frac=1, random_state=42)
top_n = st.slider("Rows to show", 10, 200, 50, 10)
cols_show = [
"text_raw",
"true_label",
"pred_label",
"proba_pos",
"error_type",
]
st.dataframe(
df_view[cols_show].head(top_n),
width="stretch",
)
# =========================================================
# TAB 6: Deploy
# =========================================================
with tab_deploy:
st.markdown(
'',
unsafe_allow_html=True,
)
st.markdown(
"Pick the best model, test arbitrary texts, and reuse the same logic in an API or batch job.",
unsafe_allow_html=True,
)
models_path = MODELS_DIR / "models.joblib"
vecs_path = MODELS_DIR / "vectorizers.joblib"
results_path = MODELS_DIR / "results.joblib"
meta_path = MODELS_DIR / "metadata.joblib"
if not (
models_path.exists()
and vecs_path.exists()
and results_path.exists()
and meta_path.exists()
):
st.info("Train models first to enable deployment.")
else:
models = joblib.load(models_path)
vecs = joblib.load(vecs_path)
all_results = joblib.load(results_path)
metadata = joblib.load(meta_path)
best_name = max(
all_results.keys(),
key=lambda n: all_results[n]["metrics"]["f1"],
)
model_choice = st.selectbox(
"Model for deployment",
options=["Best (by F1)"] + list(models.keys()),
index=0,
)
if model_choice == "Best (by F1)":
deploy_name = best_name
st.info(f"Using {best_name} (best F1 on validation).")
else:
deploy_name = model_choice
model = models[deploy_name]
word_vec = vecs[0]
char_vec = vecs[1] if len(vecs) > 1 else None
if "deploy_text" not in st.session_state:
st.session_state["deploy_text"] = ""
c_in, c_out = st.columns([1.4, 1.1])
with c_in:
st.markdown("#### Input text")
example_col1, example_col2, example_col3 = st.columns(3)
with example_col1:
if st.button("Positive example"):
st.session_state["deploy_text"] = (
"Absolutely loved this. Great quality, fast delivery, and "
"I would happily buy again."
)
with example_col2:
if st.button("Mixed example"):
st.session_state["deploy_text"] = (
"Some parts were decent, but overall it felt overpriced and a bit disappointing."
)
with example_col3:
if st.button("Negative example"):
st.session_state["deploy_text"] = (
"Terrible experience. Support was unhelpful and the product broke quickly."
)
text_input = st.text_area(
"Write or paste any text",
height=160,
value=st.session_state["deploy_text"],
)
predict_btn = st.button("Predict sentiment")
with c_out:
if predict_btn and text_input.strip():
clean_text = basic_clean(text_input)
Xw = word_vec.transform([clean_text])
if char_vec is not None:
Xc = char_vec.transform([clean_text])
X_test = hstack([Xw, Xc])
else:
X_test = Xw
try:
proba = float(model.predict_proba(X_test)[0, 1])
except Exception:
scores = model.decision_function(X_test)
proba = float(
(scores - scores.min()) / (scores.max() - scores.min() + 1e-9)
)
label_int = int(proba >= 0.5)
label_str = (
metadata["pos_label"] if label_int == 1 else metadata["neg_label"]
)
conf_pct = proba * 100.0 if label_int == 1 else (1.0 - proba) * 100.0
st.markdown(
"""
Predicted sentiment
""",
unsafe_allow_html=True,
)
cls = "prediction-positive" if label_int == 1 else "prediction-negative"
st.markdown(
f'<div class="{cls}">{label_str}</div>',  # cls selects the prediction-positive / prediction-negative style
unsafe_allow_html=True,
)
st.markdown(
f'{conf_pct:.1f}% confidence',
unsafe_allow_html=True,
)
width_pct = int(conf_pct)
st.progress(width_pct)  # render confidence as a simple progress bar
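# =========================================================
# Offline reuse sketch (not executed by the app)
# =========================================================
# A minimal batch-scoring sketch, assuming the artifacts saved above exist in
# models_sentiment_lab/ and that "models.joblib" holds the dict produced by
# train_multiple_models. Paths and the chosen model name are illustrative.
#
#   import joblib
#   from scipy.sparse import hstack
#
#   vecs = joblib.load("models_sentiment_lab/vectorizers.joblib")
#   models = joblib.load("models_sentiment_lab/models.joblib")
#   model = models["Logistic Regression"]          # pick any trained model
#   texts = [basic_clean(t) for t in ["great product", "awful support"]]
#   word_vec, *rest = vecs
#   X = word_vec.transform(texts)
#   if rest:
#       X = hstack([X, rest[0].transform(texts)])
#   probs = model.predict_proba(X)[:, 1]           # probability of the positive class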