import json import os import re import sys from typing import List, Optional, Tuple, Dict print("="*60) print("STARTUP: Importing gradio...") print("="*60) import gradio as gr print(f"✓ Gradio imported successfully") print(f"✓ Gradio version: {gr.__version__}") print(f"✓ Python version: {sys.version}") from huggingface_hub import HfApi, hf_hub_download from huggingface_hub.utils import HfHubHTTPError print(f"✓ HuggingFace Hub imported successfully") # DEBUG: Print version information and inspect available parameters print("="*60) print("CHECKING AVAILABLE GRADIO PARAMETERS") print("="*60) import inspect try: blocks_sig = inspect.signature(gr.Blocks.__init__) blocks_params = list(blocks_sig.parameters.keys()) print(f"✓ gr.Blocks parameters: {blocks_params}") print(f" - 'css' supported: {'css' in blocks_params}") print(f" - 'title' supported: {'title' in blocks_params}") except Exception as e: print(f"✗ Could not inspect gr.Blocks: {e}") try: textbox_sig = inspect.signature(gr.Textbox.__init__) textbox_params = list(textbox_sig.parameters.keys()) print(f"✓ gr.Textbox parameters: {textbox_params}") print(f" - 'show_copy_button' supported: {'show_copy_button' in textbox_params}") print(f" - 'info' supported: {'info' in textbox_params}") except Exception as e: print(f"✗ Could not inspect gr.Textbox: {e}") try: button_sig = inspect.signature(gr.Button.__init__) button_params = list(button_sig.parameters.keys()) print(f"✓ gr.Button parameters: {button_params}") print(f" - 'variant' supported: {'variant' in button_params}") print(f" - 'scale' supported: {'scale' in button_params}") except Exception as e: print(f"✗ Could not inspect gr.Button: {e}") try: column_sig = inspect.signature(gr.Column.__init__) column_params = list(column_sig.parameters.keys()) print(f"✓ gr.Column parameters: {column_params}") print(f" - 'scale' supported: {'scale' in column_params}") except Exception as e: print(f"✗ Could not inspect gr.Column: {e}") try: dataframe_sig = inspect.signature(gr.Dataframe.__init__) dataframe_params = list(dataframe_sig.parameters.keys()) print(f"✓ gr.Dataframe parameters: {dataframe_params}") print(f" - 'height' supported: {'height' in dataframe_params}") print(f" - 'line_breaks' supported: {'line_breaks' in dataframe_params}") print(f" - 'wrap' supported: {'wrap' in dataframe_params}") except Exception as e: print(f"✗ Could not inspect gr.Dataframe: {e}") print("="*60) print() DATASET_ID = os.environ.get( "CIRCLECI_RESULTS_DATASET_ID", "transformers-community/circleci-test-results", ) MAX_ROWS = 200 # Get token from environment variable HF_TOKEN = os.environ.get("HF_TOKEN") or os.environ.get("HUGGINGFACE_TOKEN") API = HfApi(token=HF_TOKEN) # Smart cache: stores PR -> list of file paths # Fetches from remote when specific data not found _pr_files_cache: Dict[str, List[str]] = {} def _fetch_files_for_pr(pr_number: str) -> List[str]: """ Fetch all failure_summary.json files for a given PR from the remote dataset. Returns a list of file paths. """ prefix = f"pr-{pr_number}" print(f"DEBUG: Fetching files for PR {pr_number} with prefix: {prefix}") print(f"DEBUG: Dataset ID: {DATASET_ID}") try: # List all files in the repo and filter by prefix entries = API.list_repo_tree( repo_id=DATASET_ID, repo_type="dataset", revision="main", recursive=True, ) print("DEBUG: Successfully called list_repo_tree") except HfHubHTTPError as error: print(f"ERROR: Failed to list repo tree: {error}") return [] except Exception as error: print(f"ERROR: Unexpected error in list_repo_tree: {error}") import traceback traceback.print_exc() return [] files = [] matching_paths = [] all_entries = [] try: for entry in entries: all_entries.append(entry) entry_type = getattr(entry, "type", type(entry).__name__) entry_path = getattr(entry, "path", str(entry)) # Debug: show first few entries if len(all_entries) <= 10: print(f"DEBUG: Entry {len(all_entries)}: {entry_path} (type: {entry_type})") # Filter by prefix if entry_path.startswith(prefix): matching_paths.append(entry_path) # Look for failure_summary.json files if entry_path.startswith(prefix) and entry_path.endswith("failure_summary.json"): if "file" in entry_type.lower() or entry_type == "RepoFile": files.append(entry_path) print(f"DEBUG: Found matching file: {entry_path}") print(f"DEBUG: Total entries processed: {len(all_entries)}") print(f"DEBUG: Entries with prefix '{prefix}': {len(matching_paths)}") print(f"DEBUG: failure_summary.json files found: {len(files)}") if matching_paths and len(files) == 0: print(f"DEBUG: Sample matching paths (first 5): {matching_paths[:5]}") except Exception as error: print(f"ERROR: Error processing entries: {error}") import traceback traceback.print_exc() return [] return files def _extract_commit_from_path(path: str) -> str: """Extract commit SHA from file path.""" parts = path.split("/") if len(parts) >= 2 and parts[1].startswith("sha-"): return parts[1][len("sha-") :] return "unknown" def _get_files_for_pr_and_sha(pr_number: str, sha: str = "") -> List[str]: """ Get files for a PR, with smart caching. Strategy: 1. If PR not in cache, fetch from remote and cache it 2. If PR in cache but specific SHA not found, re-fetch and update cache 3. Return matching files Args: pr_number: The PR number to query sha: Optional commit SHA to filter for Returns: List of file paths for this PR (optionally filtered by SHA) """ sha_lower = sha.lower() if sha else "" # First check: Do we have this PR in cache? if pr_number not in _pr_files_cache: print(f"DEBUG: PR {pr_number} not in cache, fetching from remote...") files = _fetch_files_for_pr(pr_number) _pr_files_cache[pr_number] = files print(f"DEBUG: Cached {len(files)} files for PR {pr_number}") else: files = _pr_files_cache[pr_number] print(f"DEBUG: PR {pr_number} found in cache with {len(files)} files") # Second check: If SHA specified, do we have it in cache? if sha: sha_found = False for file_path in files: commit = _extract_commit_from_path(file_path) if commit.lower().startswith(sha_lower) or sha_lower.startswith(commit.lower()): sha_found = True break if not sha_found: print(f"DEBUG: SHA {sha} not found in cache for PR {pr_number}, re-fetching...") files = _fetch_files_for_pr(pr_number) _pr_files_cache[pr_number] = files print(f"DEBUG: Updated cache with {len(files)} files for PR {pr_number}") return files def _load_payload(path: str) -> Optional[dict]: try: local_path = hf_hub_download( repo_id=DATASET_ID, filename=path, repo_type="dataset", ) except Exception as error: print(f"Failed to download {path}: {error}") return None try: with open(local_path) as fp: return json.load(fp) except Exception as error: print(f"Failed to load JSON for {path}: {error}") return None def _filter_records(repo: str, pr: str, sha: str) -> List[dict]: repo = repo.strip().lower() pr = pr.strip() sha = sha.strip().lower() print(f"DEBUG: _filter_records called with repo='{repo}', pr='{pr}', sha='{sha}'") if not pr: return [] # Use smart cache that auto-fetches if needed file_paths = _get_files_for_pr_and_sha(pr, sha) print(f"DEBUG: Found {len(file_paths)} file paths") records: List[dict] = [] for file_path in file_paths: commit = _extract_commit_from_path(file_path) # Fixed SHA matching: works with both short and full SHAs # Check if stored commit starts with input SHA OR if input SHA starts with stored commit if sha and not (commit.lower().startswith(sha) or sha.startswith(commit.lower())): print(f"DEBUG: Skipping {file_path} - commit {commit} doesn't match sha {sha}") continue payload = _load_payload(file_path) if payload is None: print(f"DEBUG: Skipping {file_path} - failed to load payload") continue # Check if metadata exists and has repository field metadata = payload.get("metadata") or {} repository = (metadata.get("repository") or "").lower() # Only filter by repo if we have both a repo filter AND repository metadata if repo and repository and repo not in repository: print(f"DEBUG: Skipping {file_path} - repo '{repo}' not in repository '{repository}'") continue # If no metadata.repository exists, we don't filter by repo (include all records) if repo and not repository: print(f"DEBUG: No repository metadata in {file_path}, including anyway since we can't filter") payload["__source_path"] = file_path payload["__commit"] = commit records.append(payload) def _sort_key(record: dict) -> str: metadata = record.get("metadata") or {} return metadata.get("collected_at") or "" records.sort(key=_sort_key, reverse=True) print(f"DEBUG: Returning {len(records)} records after filtering") return records[:MAX_ROWS] def _generate_html_tables(record: dict) -> Tuple[str, str]: """Generate HTML tables with proper horizontal scrolling and full text display.""" # Warm color scheme: cream/beige background with dark brown/sepia text # Background colors: cream (#FFF8E7), light beige (#FFF4E0) # Text: dark brown (#3E2723) # Accents: warm brown (#8D6E63), blue (#1976D2) # Borders: medium brown (#A1887F) # By test table by_test_html = """
""" by_test_data = record.get("by_test", {}) for idx, (test_name, test_info) in enumerate(by_test_data.items()): count = test_info.get("count", 0) errors = test_info.get("errors", {}) # Format errors with line breaks error_lines = [] for err, cnt in errors.items(): error_lines.append(f"
{cnt}× {err}
") error_html = "".join(error_lines) row_bg = "#FFF8E7" if idx % 2 == 0 else "#FFF4E0" by_test_html += f""" """ by_test_html += """
Test Failures Full error(s)
{test_name} {count} {error_html}
""" # By model table by_model_html = """
""" by_model_data = record.get("by_model", {}) for idx, (model_name, model_info) in enumerate(by_model_data.items()): count = model_info.get("count", 0) errors = model_info.get("errors", {}) # Format errors with line breaks error_lines = [] for err, cnt in errors.items(): error_lines.append(f"
{cnt}× {err}
") error_html = "".join(error_lines) row_bg = "#FFF8E7" if idx % 2 == 0 else "#FFF4E0" by_model_html += f""" """ by_model_html += """
Model Failures Full error(s)
{model_name} {count} {error_html}
""" return by_test_html, by_model_html def _generate_markdown_summary(record: dict) -> str: """Generate markdown summary for copy-paste to GitHub.""" md = "# Failure summary\n\n" # By test section md += "## By test\n\n" md += "| Test | Failures | Full error(s) |\n" md += "| --- | --- | --- |\n" by_test_data = record.get("by_test", {}) for test_name, test_info in by_test_data.items(): count = test_info.get("count", 0) errors = test_info.get("errors", {}) error_list = [f"{cnt}× {err}" for err, cnt in errors.items()] error_str = "; ".join(error_list) md += f"| {test_name} | {count} | {error_str} |\n" # By model section md += "\n## By model\n\n" md += "| Model | Failures | Full error(s) |\n" md += "| --- | --- | --- |\n" by_model_data = record.get("by_model", {}) for model_name, model_info in by_model_data.items(): count = model_info.get("count", 0) errors = model_info.get("errors", {}) error_list = [f"{cnt}× {err}" for err, cnt in errors.items()] error_str = "; ".join(error_list) md += f"| {model_name} | {count} | {error_str} |\n" return md def _generate_pytest_commands(record: dict) -> str: """Generate helpful pytest commands based on the failures.""" commands = [] by_test_data = record.get("by_test", {}) by_model_data = record.get("by_model", {}) # Add header commands.append("# Helpful pytest commands\n") # Commands by test name pattern if by_test_data: commands.append("## Run specific test patterns:") # Extract unique test name patterns (without the variants) test_patterns = {} # Use dict to preserve one example per pattern for test_name in by_test_data.keys(): if "::" in test_name: parts = test_name.split("::") if len(parts) >= 3: # Extract method name without variant suffix method = parts[2] # Remove _XX_ variant suffixes (like _00_fp16_pad_left_sdpa_kernels) method_base = re.sub(r'_\d+_.*$', '', method) # Store the pattern with the original test as example if method_base not in test_patterns: test_patterns[method_base] = test_name # Generate commands for method_base in sorted(test_patterns.keys())[:5]: # Limit to 5 examples commands.append(f"```bash\npytest -k {method_base}\n```") # Add a note if there are more patterns if len(test_patterns) > 5: commands.append(f"\n*...and {len(test_patterns) - 5} more test patterns*") # Commands by model if by_model_data: commands.append("\n## Run tests for specific models:") for model_name in sorted(by_model_data.keys())[:5]: # Limit to 5 examples commands.append(f"```bash\npytest tests/models/{model_name}/\n```") # Add a note if there are more models if len(by_model_data) > 5: commands.append(f"\n*...and {len(by_model_data) - 5} more models*") return "\n".join(commands) def query(repo: str, pr: str, sha: str) -> Tuple[ str, # metadata_info str, # by_test_html str, # by_model_html str, # pytest_commands str, # raw_json str, # status str, # data_source_link ]: repo = repo.strip() pr = pr.strip() sha = sha.strip() print(f"DEBUG: Query called with repo='{repo}', pr='{pr}', sha='{sha}'") # Validate SHA length if provided if sha and len(sha) < 6: return ( "**Error:** Commit SHA must be at least 6 characters.", "", "", "", json.dumps({"error": "Commit SHA must be at least 6 characters."}, indent=2), "⚠️ Commit SHA must be at least 6 characters.", "" ) if not pr: return ( "**Error:** PR number is required.", "", "", "", json.dumps({"error": "PR number is required."}, indent=2), "❌ Provide a PR number to search.", "" ) records = _filter_records(repo, pr, sha) print(f"DEBUG: _filter_records returned {len(records)} records") if not records: return ( f"**No records found** for PR {pr}.", "", "", "", json.dumps({"error": "No records found."}, indent=2), f"❌ No records found for PR {pr}.", "" ) # Use the latest record latest_record = records[0] # Generate data source link source_path = latest_record.get("__source_path", "") data_source_link = f""" --- **Data source:** [{source_path}](https://huggingface.co/datasets/{DATASET_ID}/blob/main/{source_path}) Files are organized as `pr-{{PR}}/sha-{{COMMIT}}/failure_summary.json` """ metadata = latest_record.get("metadata", {}) # Generate simplified metadata info commit_sha = latest_record.get("__commit", "N/A") # Use repo from input/query, default to huggingface/transformers if not provided repo_display = repo if repo else "huggingface/transformers" metadata_lines = [ f"**Repository:** {repo_display}", f"**PR:** [#{pr}](https://github.com/{repo_display}/pull/{pr})", f"**Commit:** `{commit_sha}`", f"**Total failures:** {len(latest_record.get('failures', []))}", ] metadata_info = "\n\n".join(metadata_lines) # Generate HTML tables by_test_html, by_model_html = _generate_html_tables(latest_record) # Generate pytest commands pytest_commands = _generate_pytest_commands(latest_record) # Raw JSON raw_json = json.dumps(latest_record, indent=2) # Updated status message format status = f"✅ Showing test result | PR: {pr} - Commit: {commit_sha}" return ( metadata_info, by_test_html, by_model_html, pytest_commands, raw_json, status, data_source_link ) def refresh_dataset() -> str: _pr_files_cache.clear() return "✅ Cleared cached manifest. Data will be reloaded on next search." print("="*60) print("CREATING GRADIO INTERFACE") print("="*60) with gr.Blocks(title="CircleCI Test Results Viewer") as demo: print("✓ gr.Blocks created successfully") gr.Markdown( """ # 🔍 CircleCI Test Results Viewer Explore test failure summaries from the Transformers repository CI runs. **Quick start:** Enter a PR number and click Search to see the latest test failures. """ ) # Debug info display with gr.Accordion("🐛 Debug Information", open=False): gr.Markdown(f""" **Gradio Version:** `{gr.__version__}` **Python Version:** `{sys.version.split()[0]}` **Dataset ID:** `{DATASET_ID}` **Note:** Check the application logs for detailed parameter availability. """) with gr.Row(): with gr.Column(scale=1): repo_box = gr.Textbox( label="Repository", placeholder="huggingface/transformers", info="Optional: filter by repository name" ) with gr.Column(scale=1): pr_box = gr.Textbox( label="PR Number", placeholder="42240", info="Required: PR number to search" ) with gr.Column(scale=1): sha_box = gr.Textbox( label="Commit SHA", placeholder="50947fc (min 6 chars)", info="Optional: commit SHA (min 6 characters)" ) with gr.Row(): search_btn = gr.Button("🔎 Search", variant="primary", scale=2) refresh_btn = gr.Button("🔄 Clear Cache", scale=1) status_md = gr.Markdown("") with gr.Tabs() as tabs: with gr.Tab("📊 Summary"): metadata_box = gr.Markdown(label="Metadata") gr.Markdown("---") gr.Markdown("### 📝 By Test") gr.Markdown("*Scroll horizontally to see full test names and errors*") by_test_html = gr.HTML(label="Test Failures") gr.Markdown("---") gr.Markdown("### 🏷️ By Model") gr.Markdown("*Scroll horizontally to see full error messages*") by_model_html = gr.HTML(label="Model Failures") with gr.Tab("🧪 Pytest Commands"): gr.Markdown( """ Helpful pytest commands to run specific failing tests locally. """ ) pytest_output = gr.Markdown() with gr.Tab("🔧 Raw JSON"): gr.Markdown( """ Full JSON data for debugging or custom processing. """ ) json_view = gr.Code( label="Latest entry details", language="json", lines=20, ) # Dynamic data source link (replaces the hardcoded one) data_source_md = gr.Markdown("") def get_url_params(request: gr.Request): """Get URL parameters from the request""" try: params = dict(request.query_params) repo = params.get('repo', '') pr = params.get('pr', '') sha = params.get('sha', '') print(f"DEBUG: URL params from request: repo={repo}, pr={pr}, sha={sha}") return repo, pr, sha except Exception as e: print(f"DEBUG: Error getting URL params: {e}") return '', '', '' def auto_search_if_params(repo: str, pr: str, sha: str): """Automatically trigger search if PR is provided""" if pr: print(f"DEBUG: Auto-triggering search with repo={repo}, pr={pr}, sha={sha}") return query(repo, pr, sha) else: return ( "Enter a PR number and click Search", "", "", "", "", "💡 Enter a PR number above to get started", "" ) # Connect the search button search_btn.click( query, inputs=[repo_box, pr_box, sha_box], outputs=[ metadata_box, by_test_html, by_model_html, pytest_output, json_view, status_md, data_source_md ] ) # Connect the refresh button refresh_btn.click(refresh_dataset, outputs=status_md) # Load URL parameters when page loads, then auto-search if PR is present demo.load( get_url_params, outputs=[repo_box, pr_box, sha_box] ).then( auto_search_if_params, inputs=[repo_box, pr_box, sha_box], outputs=[ metadata_box, by_test_html, by_model_html, pytest_output, json_view, status_md, data_source_md ] ) print("="*60) print("✓ GRADIO INTERFACE CREATED SUCCESSFULLY") print("="*60) if __name__ == "__main__": print("Launching app...") demo.queue(max_size=20).launch(ssr_mode=False) print("✓ App launched")