import json
import os
import re
import sys
from typing import List, Optional, Tuple, Dict

print("="*60)
print("STARTUP: Importing gradio...")
print("="*60)
import gradio as gr
print(f"✓ Gradio imported successfully")
print(f"✓ Gradio version: {gr.__version__}")
print(f"✓ Python version: {sys.version}")
from huggingface_hub import HfApi, hf_hub_download
from huggingface_hub.utils import HfHubHTTPError
print(f"✓ HuggingFace Hub imported successfully")

# DEBUG: Print version information and inspect available parameters
print("="*60)
print("CHECKING AVAILABLE GRADIO PARAMETERS")
print("="*60)
import inspect


def _report_component_params(name: str, component, params_to_check: List[str]) -> None:
    """Print the constructor parameters of one Gradio component class.

    Inspects ``component.__init__`` and reports, for each name in
    ``params_to_check``, whether the installed Gradio version supports it.
    Any inspection failure is reported rather than raised so startup
    diagnostics never abort the app.
    """
    try:
        sig = inspect.signature(component.__init__)
        params = list(sig.parameters.keys())
        print(f"✓ gr.{name} parameters: {params}")
        for param in params_to_check:
            print(f" - '{param}' supported: {param in params}")
    except Exception as e:
        print(f"✗ Could not inspect gr.{name}: {e}")


# One call per component replaces five copy-pasted try/except blocks.
_report_component_params("Blocks", gr.Blocks, ["css", "title"])
_report_component_params("Textbox", gr.Textbox, ["show_copy_button", "info"])
_report_component_params("Button", gr.Button, ["variant", "scale"])
_report_component_params("Column", gr.Column, ["scale"])
_report_component_params("Dataframe", gr.Dataframe, ["height", "line_breaks", "wrap"])
print("="*60)
print()
# Hub dataset repo that stores the CircleCI failure summaries
# (overridable via the CIRCLECI_RESULTS_DATASET_ID env var).
DATASET_ID = os.environ.get(
    "CIRCLECI_RESULTS_DATASET_ID",
    "transformers-community/circleci-test-results",
)
# Upper bound on records returned by a single search.
MAX_ROWS = 200
# Get token from environment variable
HF_TOKEN = os.environ.get("HF_TOKEN") or os.environ.get("HUGGINGFACE_TOKEN")
# Shared Hub client; token may be None for anonymous (public-repo) access.
API = HfApi(token=HF_TOKEN)
# Smart cache: stores PR -> list of file paths
# Fetches from remote when specific data not found
_pr_files_cache: Dict[str, List[str]] = {}
def _fetch_files_for_pr(pr_number: str) -> List[str]:
    """
    Fetch all failure_summary.json files for a given PR from the remote dataset.

    Args:
        pr_number: PR number whose ``pr-{number}/...`` tree should be scanned.

    Returns:
        Repo-relative paths of failure_summary.json files for this PR; empty
        on any listing error (errors are logged, never raised).
    """
    prefix = f"pr-{pr_number}"
    print(f"DEBUG: Fetching files for PR {pr_number} with prefix: {prefix}")
    print(f"DEBUG: Dataset ID: {DATASET_ID}")
    try:
        # List all files in the repo and filter by prefix
        entries = API.list_repo_tree(
            repo_id=DATASET_ID,
            repo_type="dataset",
            revision="main",
            recursive=True,
        )
        print("DEBUG: Successfully called list_repo_tree")
    except HfHubHTTPError as error:
        print(f"ERROR: Failed to list repo tree: {error}")
        return []
    except Exception as error:
        print(f"ERROR: Unexpected error in list_repo_tree: {error}")
        import traceback
        traceback.print_exc()
        return []
    files: List[str] = []
    matching_paths: List[str] = []
    # A counter is enough: only the first 10 entries and the total are ever
    # used, so we no longer accumulate every entry in memory.
    entry_count = 0
    try:
        for entry in entries:
            entry_count += 1
            # Entries may be RepoFile/RepoFolder objects; fall back gracefully.
            entry_type = getattr(entry, "type", type(entry).__name__)
            entry_path = getattr(entry, "path", str(entry))
            # Debug: show first few entries
            if entry_count <= 10:
                print(f"DEBUG: Entry {entry_count}: {entry_path} (type: {entry_type})")
            # Everything for this PR lives under pr-{number}/; skip the rest.
            if not entry_path.startswith(prefix):
                continue
            matching_paths.append(entry_path)
            # Keep failure_summary.json entries that are actual files.
            if entry_path.endswith("failure_summary.json") and (
                "file" in entry_type.lower() or entry_type == "RepoFile"
            ):
                files.append(entry_path)
                print(f"DEBUG: Found matching file: {entry_path}")
        print(f"DEBUG: Total entries processed: {entry_count}")
        print(f"DEBUG: Entries with prefix '{prefix}': {len(matching_paths)}")
        print(f"DEBUG: failure_summary.json files found: {len(files)}")
        if matching_paths and len(files) == 0:
            print(f"DEBUG: Sample matching paths (first 5): {matching_paths[:5]}")
    except Exception as error:
        print(f"ERROR: Error processing entries: {error}")
        import traceback
        traceback.print_exc()
        return []
    return files
def _extract_commit_from_path(path: str) -> str:
"""Extract commit SHA from file path."""
parts = path.split("/")
if len(parts) >= 2 and parts[1].startswith("sha-"):
return parts[1][len("sha-") :]
return "unknown"
def _get_files_for_pr_and_sha(pr_number: str, sha: str = "") -> List[str]:
    """
    Get files for a PR, with smart caching.

    Strategy:
    1. If PR not in cache, fetch from remote and cache it
    2. If PR in cache but specific SHA not found, re-fetch and update cache
    3. Return matching files

    Args:
        pr_number: The PR number to query
        sha: Optional commit SHA to filter for

    Returns:
        List of file paths for this PR (optionally filtered by SHA)
    """
    sha_lower = sha.lower() if sha else ""

    if pr_number in _pr_files_cache:
        files = _pr_files_cache[pr_number]
        print(f"DEBUG: PR {pr_number} found in cache with {len(files)} files")
    else:
        print(f"DEBUG: PR {pr_number} not in cache, fetching from remote...")
        files = _fetch_files_for_pr(pr_number)
        _pr_files_cache[pr_number] = files
        print(f"DEBUG: Cached {len(files)} files for PR {pr_number}")

    if sha:
        # Prefix match in either direction handles both short and full SHAs.
        def _matches(file_path: str) -> bool:
            commit = _extract_commit_from_path(file_path).lower()
            return commit.startswith(sha_lower) or sha_lower.startswith(commit)

        if not any(_matches(file_path) for file_path in files):
            # Cache may be stale for this commit: refresh it once.
            print(f"DEBUG: SHA {sha} not found in cache for PR {pr_number}, re-fetching...")
            files = _fetch_files_for_pr(pr_number)
            _pr_files_cache[pr_number] = files
            print(f"DEBUG: Updated cache with {len(files)} files for PR {pr_number}")

    return files
def _load_payload(path: str) -> Optional[dict]:
    """Download one JSON file from the dataset repo and parse it.

    Returns the parsed payload, or None when either the download or the
    JSON parse fails (failures are logged, never raised).
    """
    local_path = None
    try:
        local_path = hf_hub_download(
            repo_id=DATASET_ID,
            filename=path,
            repo_type="dataset",
        )
    except Exception as error:
        print(f"Failed to download {path}: {error}")
    if local_path is None:
        return None

    payload = None
    try:
        with open(local_path) as handle:
            payload = json.load(handle)
    except Exception as error:
        print(f"Failed to load JSON for {path}: {error}")
    return payload
def _filter_records(repo: str, pr: str, sha: str) -> List[dict]:
    """Collect failure-summary records for a PR, filtered and newest-first.

    Filters by commit SHA (bidirectional prefix match) and, when both the
    filter and the metadata are present, by repository substring. Each kept
    payload is annotated with ``__source_path`` and ``__commit``.
    """
    repo = repo.strip().lower()
    pr = pr.strip()
    sha = sha.strip().lower()
    print(f"DEBUG: _filter_records called with repo='{repo}', pr='{pr}', sha='{sha}'")
    if not pr:
        return []

    # Use smart cache that auto-fetches if needed
    file_paths = _get_files_for_pr_and_sha(pr, sha)
    print(f"DEBUG: Found {len(file_paths)} file paths")

    records: List[dict] = []
    for file_path in file_paths:
        commit = _extract_commit_from_path(file_path)
        commit_lc = commit.lower()
        # Works with both short and full SHAs: either side may be a prefix.
        sha_ok = (not sha) or commit_lc.startswith(sha) or sha.startswith(commit_lc)
        if not sha_ok:
            print(f"DEBUG: Skipping {file_path} - commit {commit} doesn't match sha {sha}")
            continue

        payload = _load_payload(file_path)
        if payload is None:
            print(f"DEBUG: Skipping {file_path} - failed to load payload")
            continue

        metadata = payload.get("metadata") or {}
        repository = (metadata.get("repository") or "").lower()
        # Only filter by repo when both a filter AND metadata are available.
        if repo and repository and repo not in repository:
            print(f"DEBUG: Skipping {file_path} - repo '{repo}' not in repository '{repository}'")
            continue
        if repo and not repository:
            print(f"DEBUG: No repository metadata in {file_path}, including anyway since we can't filter")

        payload["__source_path"] = file_path
        payload["__commit"] = commit
        records.append(payload)

    # Newest first by metadata.collected_at; missing timestamps sort last.
    records.sort(
        key=lambda rec: (rec.get("metadata") or {}).get("collected_at") or "",
        reverse=True,
    )
    print(f"DEBUG: Returning {len(records)} records after filtering")
    return records[:MAX_ROWS]
def _generate_html_tables(record: dict) -> Tuple[str, str]:
"""Generate HTML tables with proper horizontal scrolling and full text display."""
# Warm color scheme: cream/beige background with dark brown/sepia text
# Background colors: cream (#FFF8E7), light beige (#FFF4E0)
# Text: dark brown (#3E2723)
# Accents: warm brown (#8D6E63), blue (#1976D2)
# Borders: medium brown (#A1887F)
# By test table
by_test_html = """
| Test |
Failures |
Full error(s) |
"""
by_test_data = record.get("by_test", {})
for idx, (test_name, test_info) in enumerate(by_test_data.items()):
count = test_info.get("count", 0)
errors = test_info.get("errors", {})
# Format errors with line breaks
error_lines = []
for err, cnt in errors.items():
error_lines.append(f"{cnt}× {err}
")
error_html = "".join(error_lines)
row_bg = "#FFF8E7" if idx % 2 == 0 else "#FFF4E0"
by_test_html += f"""
| {test_name} |
{count} |
{error_html} |
"""
by_test_html += """
"""
# By model table
by_model_html = """
| Model |
Failures |
Full error(s) |
"""
by_model_data = record.get("by_model", {})
for idx, (model_name, model_info) in enumerate(by_model_data.items()):
count = model_info.get("count", 0)
errors = model_info.get("errors", {})
# Format errors with line breaks
error_lines = []
for err, cnt in errors.items():
error_lines.append(f"{cnt}× {err}
")
error_html = "".join(error_lines)
row_bg = "#FFF8E7" if idx % 2 == 0 else "#FFF4E0"
by_model_html += f"""
| {model_name} |
{count} |
{error_html} |
"""
by_model_html += """
"""
return by_test_html, by_model_html
def _generate_markdown_summary(record: dict) -> str:
"""Generate markdown summary for copy-paste to GitHub."""
md = "# Failure summary\n\n"
# By test section
md += "## By test\n\n"
md += "| Test | Failures | Full error(s) |\n"
md += "| --- | --- | --- |\n"
by_test_data = record.get("by_test", {})
for test_name, test_info in by_test_data.items():
count = test_info.get("count", 0)
errors = test_info.get("errors", {})
error_list = [f"{cnt}× {err}" for err, cnt in errors.items()]
error_str = "; ".join(error_list)
md += f"| {test_name} | {count} | {error_str} |\n"
# By model section
md += "\n## By model\n\n"
md += "| Model | Failures | Full error(s) |\n"
md += "| --- | --- | --- |\n"
by_model_data = record.get("by_model", {})
for model_name, model_info in by_model_data.items():
count = model_info.get("count", 0)
errors = model_info.get("errors", {})
error_list = [f"{cnt}× {err}" for err, cnt in errors.items()]
error_str = "; ".join(error_list)
md += f"| {model_name} | {count} | {error_str} |\n"
return md
def _generate_pytest_commands(record: dict) -> str:
"""Generate helpful pytest commands based on the failures."""
commands = []
by_test_data = record.get("by_test", {})
by_model_data = record.get("by_model", {})
# Add header
commands.append("# Helpful pytest commands\n")
# Commands by test name pattern
if by_test_data:
commands.append("## Run specific test patterns:")
# Extract unique test name patterns (without the variants)
test_patterns = {} # Use dict to preserve one example per pattern
for test_name in by_test_data.keys():
if "::" in test_name:
parts = test_name.split("::")
if len(parts) >= 3:
# Extract method name without variant suffix
method = parts[2]
# Remove _XX_ variant suffixes (like _00_fp16_pad_left_sdpa_kernels)
method_base = re.sub(r'_\d+_.*$', '', method)
# Store the pattern with the original test as example
if method_base not in test_patterns:
test_patterns[method_base] = test_name
# Generate commands
for method_base in sorted(test_patterns.keys())[:5]: # Limit to 5 examples
commands.append(f"```bash\npytest -k {method_base}\n```")
# Add a note if there are more patterns
if len(test_patterns) > 5:
commands.append(f"\n*...and {len(test_patterns) - 5} more test patterns*")
# Commands by model
if by_model_data:
commands.append("\n## Run tests for specific models:")
for model_name in sorted(by_model_data.keys())[:5]: # Limit to 5 examples
commands.append(f"```bash\npytest tests/models/{model_name}/\n```")
# Add a note if there are more models
if len(by_model_data) > 5:
commands.append(f"\n*...and {len(by_model_data) - 5} more models*")
return "\n".join(commands)
def query(repo: str, pr: str, sha: str) -> Tuple[
    str,  # metadata_info
    str,  # by_test_html
    str,  # by_model_html
    str,  # pytest_commands
    str,  # raw_json
    str,  # status
    str,  # data_source_link
]:
    """Look up the latest failure summary for a PR and build every UI output.

    Args:
        repo: Optional repository filter (e.g. "huggingface/transformers").
        pr: Required PR number.
        sha: Optional commit SHA; must be at least 6 characters when given.

    Returns:
        A 7-tuple matching the Gradio output components (metadata markdown,
        two HTML tables, pytest commands markdown, raw JSON, status line,
        data-source link markdown).
    """

    def _early_exit(markdown_msg: str, json_error: str, status_msg: str) -> Tuple[
        str, str, str, str, str, str, str
    ]:
        # Shared 7-tuple shape for validation failures and empty results.
        return (
            markdown_msg,
            "",
            "",
            "",
            json.dumps({"error": json_error}, indent=2),
            status_msg,
            ""
        )

    repo = repo.strip()
    pr = pr.strip()
    sha = sha.strip()
    print(f"DEBUG: Query called with repo='{repo}', pr='{pr}', sha='{sha}'")

    # Validate SHA length if provided
    if sha and len(sha) < 6:
        return _early_exit(
            "**Error:** Commit SHA must be at least 6 characters.",
            "Commit SHA must be at least 6 characters.",
            "⚠️ Commit SHA must be at least 6 characters.",
        )
    if not pr:
        return _early_exit(
            "**Error:** PR number is required.",
            "PR number is required.",
            "❌ Provide a PR number to search.",
        )

    records = _filter_records(repo, pr, sha)
    print(f"DEBUG: _filter_records returned {len(records)} records")
    if not records:
        return _early_exit(
            f"**No records found** for PR {pr}.",
            "No records found.",
            f"❌ No records found for PR {pr}.",
        )

    # Use the latest record (records are sorted newest-first)
    latest_record = records[0]

    # Generate data source link
    source_path = latest_record.get("__source_path", "")
    data_source_link = f"""
---
**Data source:** [{source_path}](https://huggingface.co/datasets/{DATASET_ID}/blob/main/{source_path})
Files are organized as `pr-{{PR}}/sha-{{COMMIT}}/failure_summary.json`
"""

    # Generate simplified metadata info
    commit_sha = latest_record.get("__commit", "N/A")
    # Use repo from input/query, default to huggingface/transformers if not provided
    repo_display = repo if repo else "huggingface/transformers"
    metadata_lines = [
        f"**Repository:** {repo_display}",
        f"**PR:** [#{pr}](https://github.com/{repo_display}/pull/{pr})",
        f"**Commit:** `{commit_sha}`",
        f"**Total failures:** {len(latest_record.get('failures', []))}",
    ]
    metadata_info = "\n\n".join(metadata_lines)

    # Generate HTML tables
    by_test_html, by_model_html = _generate_html_tables(latest_record)
    # Generate pytest commands
    pytest_commands = _generate_pytest_commands(latest_record)
    # Raw JSON
    raw_json = json.dumps(latest_record, indent=2)
    status = f"✅ Showing test result | PR: {pr} - Commit: {commit_sha}"

    return (
        metadata_info,
        by_test_html,
        by_model_html,
        pytest_commands,
        raw_json,
        status,
        data_source_link
    )
def refresh_dataset() -> str:
    """Clear the PR-files cache so the next search re-fetches from the Hub.

    Returns a status message for the UI status bar.
    """
    _pr_files_cache.clear()
    return "✅ Cleared cached manifest. Data will be reloaded on next search."
print("="*60)
print("CREATING GRADIO INTERFACE")
print("="*60)

# UI definition: components built inside the Blocks context are registered
# with `demo`; event wiring (click/load/then) happens at the bottom.
with gr.Blocks(title="CircleCI Test Results Viewer") as demo:
    print("✓ gr.Blocks created successfully")
    gr.Markdown(
        """
# 🔍 CircleCI Test Results Viewer
Explore test failure summaries from the Transformers repository CI runs.
**Quick start:** Enter a PR number and click Search to see the latest test failures.
"""
    )
    # Debug info display
    with gr.Accordion("🐛 Debug Information", open=False):
        gr.Markdown(f"""
**Gradio Version:** `{gr.__version__}`
**Python Version:** `{sys.version.split()[0]}`
**Dataset ID:** `{DATASET_ID}`
**Note:** Check the application logs for detailed parameter availability.
""")
    # Search inputs: three equal-width columns.
    with gr.Row():
        with gr.Column(scale=1):
            repo_box = gr.Textbox(
                label="Repository",
                placeholder="huggingface/transformers",
                info="Optional: filter by repository name"
            )
        with gr.Column(scale=1):
            pr_box = gr.Textbox(
                label="PR Number",
                placeholder="42240",
                info="Required: PR number to search"
            )
        with gr.Column(scale=1):
            sha_box = gr.Textbox(
                label="Commit SHA",
                placeholder="50947fc (min 6 chars)",
                info="Optional: commit SHA (min 6 characters)"
            )
    with gr.Row():
        search_btn = gr.Button("🔎 Search", variant="primary", scale=2)
        refresh_btn = gr.Button("🔄 Clear Cache", scale=1)
    # Status line updated by both search and cache-clear actions.
    status_md = gr.Markdown("")
    with gr.Tabs() as tabs:
        with gr.Tab("📊 Summary"):
            metadata_box = gr.Markdown(label="Metadata")
            gr.Markdown("---")
            gr.Markdown("### 📝 By Test")
            gr.Markdown("*Scroll horizontally to see full test names and errors*")
            by_test_html = gr.HTML(label="Test Failures")
            gr.Markdown("---")
            gr.Markdown("### 🏷️ By Model")
            gr.Markdown("*Scroll horizontally to see full error messages*")
            by_model_html = gr.HTML(label="Model Failures")
        with gr.Tab("🧪 Pytest Commands"):
            gr.Markdown(
                """
Helpful pytest commands to run specific failing tests locally.
"""
            )
            pytest_output = gr.Markdown()
        with gr.Tab("🔧 Raw JSON"):
            gr.Markdown(
                """
Full JSON data for debugging or custom processing.
"""
            )
            json_view = gr.Code(
                label="Latest entry details",
                language="json",
                lines=20,
            )
    # Dynamic data source link (replaces the hardcoded one)
    data_source_md = gr.Markdown("")

    def get_url_params(request: gr.Request):
        """Get URL parameters from the request"""
        # Reads ?repo=&pr=&sha= from the page URL to pre-fill the inputs.
        try:
            params = dict(request.query_params)
            repo = params.get('repo', '')
            pr = params.get('pr', '')
            sha = params.get('sha', '')
            print(f"DEBUG: URL params from request: repo={repo}, pr={pr}, sha={sha}")
            return repo, pr, sha
        except Exception as e:
            print(f"DEBUG: Error getting URL params: {e}")
            return '', '', ''

    def auto_search_if_params(repo: str, pr: str, sha: str):
        """Automatically trigger search if PR is provided"""
        if pr:
            print(f"DEBUG: Auto-triggering search with repo={repo}, pr={pr}, sha={sha}")
            return query(repo, pr, sha)
        else:
            # No PR in the URL: show the idle prompt instead of searching.
            return (
                "Enter a PR number and click Search",
                "",
                "",
                "",
                "",
                "💡 Enter a PR number above to get started",
                ""
            )

    # Connect the search button
    search_btn.click(
        query,
        inputs=[repo_box, pr_box, sha_box],
        outputs=[
            metadata_box,
            by_test_html,
            by_model_html,
            pytest_output,
            json_view,
            status_md,
            data_source_md
        ]
    )
    # Connect the refresh button
    refresh_btn.click(refresh_dataset, outputs=status_md)
    # Load URL parameters when page loads, then auto-search if PR is present
    demo.load(
        get_url_params,
        outputs=[repo_box, pr_box, sha_box]
    ).then(
        auto_search_if_params,
        inputs=[repo_box, pr_box, sha_box],
        outputs=[
            metadata_box,
            by_test_html,
            by_model_html,
            pytest_output,
            json_view,
            status_md,
            data_source_md
        ]
    )

print("="*60)
print("✓ GRADIO INTERFACE CREATED SUCCESSFULLY")
print("="*60)

if __name__ == "__main__":
    print("Launching app...")
    # queue() caps concurrent requests; ssr_mode=False disables
    # server-side rendering at launch.
    demo.queue(max_size=20).launch(ssr_mode=False)
    print("✓ App launched")