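"""
AI-Driven Clinical Research Assistant (Gradio app for a Hugging Face Space).

Summarizes clinical text (OpenAI GPT-3.5), classifies it with a fine-tuned BERT
model, translates English <-> French (MarianMT), performs spaCy NER, fetches
literature from PubMed, Europe PMC, Crossref, and BioPortal, and generates PDF
reports with ReportLab.
"""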
import os
import io
import json
import csv
import asyncio
import xml.etree.ElementTree as ET
from typing import Any, Dict, Optional, Tuple, Union, List

import httpx
import gradio as gr
import torch
from dotenv import load_dotenv
from loguru import logger
from huggingface_hub import login
from openai import OpenAI
from reportlab.pdfgen import canvas
from transformers import (
    AutoTokenizer,
    AutoModelForSequenceClassification,
    MarianMTModel,
    MarianTokenizer,
)
import pandas as pd
import altair as alt
import spacy
import spacy.cli
import PyPDF2
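# Third-party packages used below (typically pinned in the Space's requirements.txt):
# httpx, gradio, torch, python-dotenv, loguru, huggingface_hub, openai, reportlab,
# transformers (plus sentencepiece for MarianMT), pandas, altair, spacy, PyPDF2,
# and openpyxl (needed by pandas.read_excel).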
###############################################################################
#                         1) ENVIRONMENT & LOGGING                            #
###############################################################################
# Ensure the spaCy English model (en_core_web_sm) is available; download on first run
try:
    nlp = spacy.load("en_core_web_sm")
except OSError:
    logger.info("Downloading spaCy 'en_core_web_sm' model...")
    spacy.cli.download("en_core_web_sm")
    nlp = spacy.load("en_core_web_sm")

# Logging
logger.add("error_logs.log", rotation="1 MB", level="ERROR")

# Load environment variables
load_dotenv()
HUGGINGFACE_TOKEN = os.getenv("HF_TOKEN")
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
BIOPORTAL_API_KEY = os.getenv("BIOPORTAL_API_KEY")  # For BioPortal integration
ENTREZ_EMAIL = os.getenv("ENTREZ_EMAIL")

if not HUGGINGFACE_TOKEN or not OPENAI_API_KEY:
    logger.error("Missing Hugging Face or OpenAI credentials.")
    raise ValueError("Missing credentials for Hugging Face or OpenAI.")

# Warn if the BioPortal key is missing
if not BIOPORTAL_API_KEY:
    logger.warning("BIOPORTAL_API_KEY is not set. BioPortal fetch calls will fail.")

# Hugging Face login
login(HUGGINGFACE_TOKEN)

# OpenAI client
client = OpenAI(api_key=OPENAI_API_KEY)

# Device: CPU or GPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
logger.info(f"Using device: {device}")
###############################################################################
#                2) HUGGING FACE & TRANSLATION MODEL SETUP                    #
###############################################################################
MODEL_NAME = "mgbam/bert-base-finetuned-mgbam"
try:
    model = AutoModelForSequenceClassification.from_pretrained(
        MODEL_NAME, use_auth_token=HUGGINGFACE_TOKEN
    ).to(device)
    tokenizer = AutoTokenizer.from_pretrained(
        MODEL_NAME, use_auth_token=HUGGINGFACE_TOKEN
    )
except Exception as e:
    logger.error(f"Model load error: {e}")
    raise
# Translation directions offered in the UI, mapped to (source, target) codes
LANGUAGE_MAP: Dict[str, Tuple[str, str]] = {
    "English to French": ("en", "fr"),
    "French to English": ("fr", "en"),
}

# Load one MarianMT model/tokenizer per direction so both dropdown options work
translation_models: Dict[str, MarianMTModel] = {}
translation_tokenizers: Dict[str, MarianTokenizer] = {}
try:
    for option, (src, tgt) in LANGUAGE_MAP.items():
        mt_name = f"Helsinki-NLP/opus-mt-{src}-{tgt}"
        translation_models[option] = MarianMTModel.from_pretrained(
            mt_name, use_auth_token=HUGGINGFACE_TOKEN
        ).to(device)
        translation_tokenizers[option] = MarianTokenizer.from_pretrained(
            mt_name, use_auth_token=HUGGINGFACE_TOKEN
        )
except Exception as e:
    logger.error(f"Translation model load error: {e}")
    raise
###############################################################################
#                        3) API ENDPOINTS & CONSTANTS                         #
###############################################################################
PUBMED_SEARCH_URL = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/esearch.fcgi"
PUBMED_FETCH_URL = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/efetch.fcgi"
EUROPE_PMC_BASE_URL = "https://www.ebi.ac.uk/europepmc/webservices/rest/search"
BIOPORTAL_API_BASE = "https://data.bioontology.org"
CROSSREF_API_URL = "https://api.crossref.org/works"
###############################################################################
#                            4) HELPER FUNCTIONS                              #
###############################################################################
def safe_json_parse(text: str) -> Optional[Dict[str, Any]]:
    """Safely parse JSON."""
    try:
        return json.loads(text)
    except json.JSONDecodeError as e:
        logger.error(f"JSON parsing error: {e}")
        return None


def parse_pubmed_xml(xml_data: str) -> List[Dict[str, Any]]:
    """Parse PubMed XML data into a structured list of articles."""
    root = ET.fromstring(xml_data)
    articles = []
    for article in root.findall(".//PubmedArticle"):
        pmid = article.findtext(".//PMID")
        title = article.findtext(".//ArticleTitle")
        abstract = article.findtext(".//AbstractText")
        journal = article.findtext(".//Journal/Title")
        pub_date_elem = article.find(".//JournalIssue/PubDate")
        pub_date = None
        if pub_date_elem is not None:
            year = pub_date_elem.findtext("Year")
            month = pub_date_elem.findtext("Month")
            day = pub_date_elem.findtext("Day")
            if year and month and day:
                pub_date = f"{year}-{month}-{day}"
            else:
                pub_date = year
        articles.append({
            "PMID": pmid,
            "Title": title,
            "Abstract": abstract,
            "Journal": journal,
            "PublicationDate": pub_date,
        })
    return articles
###############################################################################
#                          5) ASYNC FETCH FUNCTIONS                           #
###############################################################################
async def fetch_articles_by_nct_id(nct_id: str) -> Dict[str, Any]:
    """Europe PMC search for articles referencing a ClinicalTrials.gov NCT ID."""
    params = {"query": nct_id, "format": "json"}
    async with httpx.AsyncClient() as client_http:
        try:
            resp = await client_http.get(EUROPE_PMC_BASE_URL, params=params)
            resp.raise_for_status()
            return resp.json()
        except Exception as e:
            logger.error(f"Error fetching articles for {nct_id}: {e}")
            return {"error": str(e)}


async def fetch_articles_by_query(query_params: str) -> Dict[str, Any]:
    """Europe PMC query built from a JSON string of field:value pairs."""
    parsed_params = safe_json_parse(query_params)
    if not parsed_params or not isinstance(parsed_params, dict):
        return {"error": "Invalid JSON."}
    query_string = " AND ".join(f"{k}:{v}" for k, v in parsed_params.items())
    req_params = {"query": query_string, "format": "json"}
    async with httpx.AsyncClient() as client_http:
        try:
            resp = await client_http.get(EUROPE_PMC_BASE_URL, params=req_params)
            resp.raise_for_status()
            return resp.json()
        except Exception as e:
            logger.error(f"Error fetching articles: {e}")
            return {"error": str(e)}
async def fetch_pubmed_by_query(query_params: str) -> Dict[str, Any]:
    """Search PubMed (ESearch) and fetch the matching records as XML (EFetch)."""
    parsed_params = safe_json_parse(query_params)
    if not parsed_params or not isinstance(parsed_params, dict):
        return {"error": "Invalid JSON for PubMed."}
    search_params = {
        "db": "pubmed",
        "retmode": "json",
        "email": ENTREZ_EMAIL,
        "retmax": parsed_params.get("retmax", "10"),
        "term": parsed_params.get("term", ""),
    }
    async with httpx.AsyncClient() as client_http:
        try:
            # Search PubMed for matching IDs
            search_resp = await client_http.get(PUBMED_SEARCH_URL, params=search_params)
            search_resp.raise_for_status()
            data = search_resp.json()
            id_list = data.get("esearchresult", {}).get("idlist", [])
            if not id_list:
                return {"result": ""}
            # Fetch the full records as XML
            fetch_params = {
                "db": "pubmed",
                "id": ",".join(id_list),
                "retmode": "xml",
                "email": ENTREZ_EMAIL,
            }
            fetch_resp = await client_http.get(PUBMED_FETCH_URL, params=fetch_params)
            fetch_resp.raise_for_status()
            return {"result": fetch_resp.text}
        except Exception as e:
            logger.error(f"Error fetching PubMed articles: {e}")
            return {"error": str(e)}


async def fetch_crossref_by_query(query_params: str) -> Dict[str, Any]:
    """Query the Crossref works API with the given JSON parameters."""
    parsed_params = safe_json_parse(query_params)
    if not parsed_params or not isinstance(parsed_params, dict):
        return {"error": "Invalid JSON for Crossref."}
    async with httpx.AsyncClient() as client_http:
        try:
            resp = await client_http.get(CROSSREF_API_URL, params=parsed_params)
            resp.raise_for_status()
            return resp.json()
        except Exception as e:
            logger.error(f"Error fetching Crossref data: {e}")
            return {"error": str(e)}
async def fetch_bioportal_by_query(query_params: str) -> Dict[str, Any]:
    """
    BioPortal fetch for medical ontologies/terminologies.
    Expects JSON like: {"q": "cancer"}
    See: https://data.bioontology.org/documentation
    """
    if not BIOPORTAL_API_KEY:
        return {"error": "No BioPortal API Key set."}
    parsed_params = safe_json_parse(query_params)
    if not parsed_params or not isinstance(parsed_params, dict):
        return {"error": "Invalid JSON for BioPortal."}
    search_term = parsed_params.get("q", "")
    if not search_term:
        return {"error": "No 'q' found in JSON. Provide a search term."}
    url = f"{BIOPORTAL_API_BASE}/search"
    headers = {"Authorization": f"apikey token={BIOPORTAL_API_KEY}"}
    req_params = {"q": search_term}
    async with httpx.AsyncClient() as client_http:
        try:
            resp = await client_http.get(url, params=req_params, headers=headers)
            resp.raise_for_status()
            return resp.json()
        except Exception as e:
            logger.error(f"Error fetching BioPortal data: {e}")
            return {"error": str(e)}
###############################################################################
#                             6) CORE FUNCTIONS                               #
###############################################################################
def summarize_text(text: str) -> str:
    """OpenAI GPT-3.5 summarization."""
    if not text.strip():
        return "No text provided for summarization."
    try:
        response = client.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=[{"role": "user", "content": f"Summarize this clinical data:\n{text}"}],
            max_tokens=200,
            temperature=0.7,
        )
        return response.choices[0].message.content.strip()
    except Exception as e:
        logger.error(f"Summarization error: {e}")
        return "Summarization failed."
def predict_outcome(text: str) -> Union[Dict[str, float], str]:
    """Predict outcomes (classification) using a fine-tuned BERT model."""
    if not text.strip():
        return "No text provided for prediction."
    try:
        inputs = tokenizer(text, return_tensors="pt", truncation=True, padding=True)
        inputs = {k: v.to(device) for k, v in inputs.items()}
        with torch.no_grad():
            outputs = model(**inputs)
        probabilities = torch.nn.functional.softmax(outputs.logits, dim=-1)[0]
        return {f"Label {i+1}": float(prob.item()) for i, prob in enumerate(probabilities)}
    except Exception as e:
        logger.error(f"Prediction error: {e}")
        return "Prediction failed."
def generate_report(text: str, filename: str = "clinical_report.pdf") -> Optional[str]:
    """Generate a PDF report from the text."""
    try:
        if not text.strip():
            logger.warning("No text provided for the report.")
        c = canvas.Canvas(filename)
        c.drawString(100, 750, "Clinical Research Report")
        lines = text.split("\n")
        y = 730
        for line in lines:
            if y < 50:
                c.showPage()
                y = 750
            c.drawString(100, y, line)
            y -= 15
        c.save()
        logger.info(f"Report generated: {filename}")
        return filename
    except Exception as e:
        logger.error(f"Report generation error: {e}")
        return None
def visualize_predictions(predictions: Dict[str, float]) -> alt.Chart:
    """Simple Altair bar chart to visualize classification probabilities."""
    data = pd.DataFrame(list(predictions.items()), columns=["Label", "Probability"])
    chart = (
        alt.Chart(data)
        .mark_bar()
        .encode(
            x=alt.X("Label:N", sort=None),
            y="Probability:Q",
            tooltip=["Label", "Probability"],
        )
        .properties(title="Prediction Probabilities", width=500, height=300)
    )
    return chart
def translate_text(text: str, translation_option: str) -> str:
    """Translate text between English and French via MarianMT."""
    if not text.strip():
        return "No text provided for translation."
    if translation_option not in LANGUAGE_MAP:
        return "Unsupported translation option."
    try:
        # Pick the model/tokenizer matching the requested direction
        mt_tokenizer = translation_tokenizers[translation_option]
        mt_model = translation_models[translation_option]
        inputs = mt_tokenizer(text, return_tensors="pt", padding=True, truncation=True).to(device)
        translated_tokens = mt_model.generate(**inputs)
        return mt_tokenizer.decode(translated_tokens[0], skip_special_tokens=True)
    except Exception as e:
        logger.error(f"Translation error: {e}")
        return "Translation failed."
def perform_named_entity_recognition(text: str) -> str:
    """NER using spaCy (en_core_web_sm)."""
    if not text.strip():
        return "No text provided for NER."
    try:
        doc = nlp(text)
        entities = [(ent.text, ent.label_) for ent in doc.ents]
        if not entities:
            return "No named entities found."
        return "\n".join(f"{t} -> {lbl}" for t, lbl in entities)
    except Exception as e:
        logger.error(f"NER error: {e}")
        return "NER failed."
###############################################################################
#                    7) FILE PARSING (TXT, PDF, CSV, XLS)                     #
###############################################################################
def parse_pdf_file_as_str(file_up: gr.File) -> str:
    """Read PDF via PyPDF2. Attempt local path, else read from memory."""
    pdf_path = file_up.name
    if os.path.isfile(pdf_path):
        with open(pdf_path, "rb") as f:
            reader = PyPDF2.PdfReader(f)
            return "\n".join(page.extract_text() or "" for page in reader.pages)
    else:
        if not hasattr(file_up, "file"):
            raise ValueError("No .file attribute found for PDF.")
        pdf_bytes = file_up.file.read()
        reader = PyPDF2.PdfReader(io.BytesIO(pdf_bytes))
        return "\n".join(page.extract_text() or "" for page in reader.pages)


def parse_text_file_as_str(file_up: gr.File) -> str:
    """Read .txt from path or fallback to memory."""
    path = file_up.name
    if os.path.isfile(path):
        with open(path, "rb") as f:
            return f.read().decode("utf-8", errors="replace")
    else:
        if not hasattr(file_up, "file"):
            raise ValueError("No .file attribute for TXT.")
        return file_up.file.read().decode("utf-8", errors="replace")
def parse_csv_file_to_df(file_up: gr.File) -> pd.DataFrame:
    """
    Attempt multiple encodings for CSV: utf-8, utf-8-sig, latin1, ISO-8859-1.
    """
    path = file_up.name
    if os.path.isfile(path):
        for enc in ["utf-8", "utf-8-sig", "latin1", "ISO-8859-1"]:
            try:
                return pd.read_csv(path, encoding=enc)
            except UnicodeDecodeError:
                logger.warning(f"CSV parse failed (enc={enc}). Trying next...")
            except Exception as e:
                logger.warning(f"CSV parse error (enc={enc}): {e}")
        raise ValueError("Could not parse local CSV with known encodings.")
    else:
        if not hasattr(file_up, "file"):
            raise ValueError("No .file attribute for CSV.")
        raw_bytes = file_up.file.read()
        for enc in ["utf-8", "utf-8-sig", "latin1", "ISO-8859-1"]:
            try:
                # Decode strictly so a wrong encoding falls through to the next candidate
                text_decoded = raw_bytes.decode(enc)
                return pd.read_csv(io.StringIO(text_decoded))
            except UnicodeDecodeError:
                logger.warning(f"CSV in-memory parse failed (enc={enc}). Next...")
            except Exception as e:
                logger.warning(f"In-memory CSV error (enc={enc}): {e}")
        raise ValueError("Could not parse in-memory CSV with known encodings.")
def parse_excel_file_to_df(file_up: gr.File) -> pd.DataFrame:
    """Read Excel from local path or memory (openpyxl)."""
    path = file_up.name
    if os.path.isfile(path):
        return pd.read_excel(path, engine="openpyxl")
    else:
        if not hasattr(file_up, "file"):
            raise ValueError("No .file attribute for Excel.")
        excel_bytes = file_up.file.read()
        return pd.read_excel(io.BytesIO(excel_bytes), engine="openpyxl")
###############################################################################
#                        8) BUILDING THE GRADIO APP                           #
###############################################################################
with gr.Blocks() as demo:
    gr.Markdown("# 🏥 AI-Driven Clinical Research Assistant")
    gr.Markdown(
        """
**Highlights**:

- **Summarize** clinical text (OpenAI GPT-3.5)
- **Predict** with a specialized BERT-based model
- **Translate** (English ↔ French)
- **Named Entity Recognition** (spaCy)
- **Fetch** from PubMed, Crossref, Europe PMC, and **BioPortal**
- **Generate** professional PDF reports
"""
    )

    with gr.Row():
        text_input = gr.Textbox(label="Input Text", lines=5, placeholder="Enter clinical text or notes...")
        file_input = gr.File(
            label="Upload File (txt/csv/xls/xlsx/pdf)",
            file_types=[".txt", ".csv", ".xls", ".xlsx", ".pdf"],
        )

    action = gr.Radio(
        [
            "Summarize",
            "Predict Outcome",
            "Generate Report",
            "Translate",
            "Perform Named Entity Recognition",
            "Fetch Clinical Studies",
            "Fetch PubMed Articles (Legacy)",
            "Fetch PubMed by Query",
            "Fetch Crossref by Query",
            "Fetch BioPortal by Query",
        ],
        label="Select an Action",
    )
    translation_option = gr.Dropdown(
        choices=list(LANGUAGE_MAP.keys()),
        label="Translation Option",
        value="English to French",
    )
    query_params_input = gr.Textbox(
        label="Query Params (JSON)",
        placeholder='{"term": "cancer"} or {"q": "cancer"} for BioPortal',
    )
    nct_id_input = gr.Textbox(label="NCT ID")
    report_filename_input = gr.Textbox(label="Report Filename", value="clinical_report.pdf")
    export_format = gr.Dropdown(choices=["None", "CSV", "JSON"], label="Export Format")

    # Outputs
    output_text = gr.Textbox(label="Output", lines=8)
    with gr.Row():
        output_chart = gr.Plot(label="Chart 1")
        output_chart2 = gr.Plot(label="Chart 2")
    output_file = gr.File(label="Generated File")
    submit_btn = gr.Button("Submit")
    ################################################################
    #                9) MAIN ACTION HANDLER (ASYNC)                #
    ################################################################
    import traceback

    async def handle_action(
        action: str,
        txt: str,
        file_up: gr.File,
        translation_opt: str,
        query_str: str,
        nct_id: str,
        report_fn: str,
        exp_fmt: str,  # export format selection (currently unused in the handler)
    ) -> Tuple[Optional[str], Optional[Any], Optional[Any], Optional[str]]:
        """
        Master function to handle user actions.
        Returns a 4-tuple mapped to (output_text, output_chart, output_chart2, output_file).
        """
        try:
            combined_text = txt.strip()

            # 1) If user uploaded a file, parse minimal text from .txt/.pdf here
            if file_up is not None:
                ext = os.path.splitext(file_up.name)[1].lower()
                if ext == ".txt":
                    try:
                        txt_data = parse_text_file_as_str(file_up)
                        combined_text += "\n" + txt_data
                    except Exception as e:
                        return f"TXT parse error: {e}", None, None, None
                elif ext == ".pdf":
                    try:
                        pdf_data = parse_pdf_file_as_str(file_up)
                        combined_text += "\n" + pdf_data
                    except Exception as e:
                        return f"PDF parse error: {e}", None, None, None
            # CSV and Excel are parsed *within* certain actions (e.g. Summarize)

            # 2) Branch by action
| if action == "Summarize": | |
| if file_up: | |
| fx = file_up.name.lower() | |
| if fx.endswith(".csv"): | |
| try: | |
| df_csv = parse_csv_file_to_df(file_up) | |
| combined_text += "\n" + df_csv.to_csv(index=False) | |
| except Exception as e: | |
| return f"CSV parse error (Summarize): {e}", None, None, None | |
| elif fx.endswith((".xls", ".xlsx")): | |
| try: | |
| df_xl = parse_excel_file_to_df(file_up) | |
| combined_text += "\n" + df_xl.to_csv(index=False) | |
| except Exception as e: | |
| return f"Excel parse error (Summarize): {e}", None, None, None | |
| summary = summarize_text(combined_text) | |
| return summary, None, None, None | |
| elif action == "Predict Outcome": | |
| if file_up: | |
| fx = file_up.name.lower() | |
| if fx.endswith(".csv"): | |
| try: | |
| df_csv = parse_csv_file_to_df(file_up) | |
| combined_text += "\n" + df_csv.to_csv(index=False) | |
| except Exception as e: | |
| return f"CSV parse error (Predict): {e}", None, None, None | |
| elif fx.endswith((".xls", ".xlsx")): | |
| try: | |
| df_xl = parse_excel_file_to_df(file_up) | |
| combined_text += "\n" + df_xl.to_csv(index=False) | |
| except Exception as e: | |
| return f"Excel parse error (Predict): {e}", None, None, None | |
| preds = predict_outcome(combined_text) | |
| if isinstance(preds, dict): | |
| chart = visualize_predictions(preds) | |
| return json.dumps(preds, indent=2), chart, None, None | |
| return preds, None, None, None | |
| elif action == "Generate Report": | |
| if file_up: | |
| fx = file_up.name.lower() | |
| if fx.endswith(".csv"): | |
| try: | |
| df_csv = parse_csv_file_to_df(file_up) | |
| combined_text += "\n" + df_csv.to_csv(index=False) | |
| except Exception as e: | |
| return f"CSV parse error (Report): {e}", None, None, None | |
| elif fx.endswith((".xls", ".xlsx")): | |
| try: | |
| df_xl = parse_excel_file_to_df(file_up) | |
| combined_text += "\n" + df_xl.to_csv(index=False) | |
| except Exception as e: | |
| return f"Excel parse error (Report): {e}", None, None, None | |
| path = generate_report(combined_text, report_fn) | |
| msg = f"Report generated: {path}" if path else "Report generation failed." | |
| return msg, None, None, path | |
| elif action == "Translate": | |
| if file_up: | |
| fx = file_up.name.lower() | |
| if fx.endswith(".csv"): | |
| try: | |
| df_csv = parse_csv_file_to_df(file_up) | |
| combined_text += "\n" + df_csv.to_csv(index=False) | |
| except Exception as e: | |
| return f"CSV parse error (Translate): {e}", None, None, None | |
| elif fx.endswith((".xls", ".xlsx")): | |
| try: | |
| df_xl = parse_excel_file_to_df(file_up) | |
| combined_text += "\n" + df_xl.to_csv(index=False) | |
| except Exception as e: | |
| return f"Excel parse error (Translate): {e}", None, None, None | |
| translated = translate_text(combined_text, translation_opt) | |
| return translated, None, None, None | |
| elif action == "Perform Named Entity Recognition": | |
| if file_up: | |
| fx = file_up.name.lower() | |
| if fx.endswith(".csv"): | |
| try: | |
| df_csv = parse_csv_file_to_df(file_up) | |
| combined_text += "\n" + df_csv.to_csv(index=False) | |
| except Exception as e: | |
| return f"CSV parse error (NER): {e}", None, None, None | |
| elif fx.endswith((".xls", ".xlsx")): | |
| try: | |
| df_xl = parse_excel_file_to_df(file_up) | |
| combined_text += "\n" + df_xl.to_csv(index=False) | |
| except Exception as e: | |
| return f"Excel parse error (NER): {e}", None, None, None | |
| ner_result = perform_named_entity_recognition(combined_text) | |
| return ner_result, None, None, None | |
| elif action == "Fetch Clinical Studies": | |
| if nct_id: | |
| result = await fetch_articles_by_nct_id(nct_id) | |
| elif query_str: | |
| result = await fetch_articles_by_query(query_str) | |
| else: | |
| return "Provide either an NCT ID or valid query parameters.", None, None, None | |
| articles = result.get("resultList", {}).get("result", []) | |
| if not articles: | |
| return "No articles found.", None, None, None | |
| formatted = "\n\n".join( | |
| f"Title: {a.get('title')}\nJournal: {a.get('journalTitle')} ({a.get('pubYear')})" | |
| for a in articles | |
| ) | |
| return formatted, None, None, None | |
            elif action in ["Fetch PubMed Articles (Legacy)", "Fetch PubMed by Query"]:
                pubmed_result = await fetch_pubmed_by_query(query_str)
                xml_data = pubmed_result.get("result")
                if xml_data:
                    articles = parse_pubmed_xml(xml_data)
                    if not articles:
                        return "No articles found.", None, None, None
                    formatted = "\n\n".join(
                        f"{a['Title']} - {a['Journal']} ({a['PublicationDate']})"
                        for a in articles if a['Title']
                    )
                    return formatted if formatted else "No articles found.", None, None, None
                return "No articles found or error in fetching PubMed data.", None, None, None
| elif action == "Fetch Crossref by Query": | |
| crossref_result = await fetch_crossref_by_query(query_str) | |
| items = crossref_result.get("message", {}).get("items", []) | |
| if not items: | |
| return "No results found.", None, None, None | |
| crossref_formatted = "\n\n".join( | |
| f"Title: {it.get('title', ['No title'])[0]}, DOI: {it.get('DOI')}" | |
| for it in items | |
| ) | |
| return crossref_formatted, None, None, None | |
| elif action == "Fetch BioPortal by Query": | |
| bp_result = await fetch_bioportal_by_query(query_str) | |
| collection = bp_result.get("collection", []) | |
| if not collection: | |
| return "No BioPortal results found.", None, None, None | |
| # Format listing | |
| formatted = "\n\n".join( | |
| f"Label: {col.get('prefLabel')}, ID: {col.get('@id')}" | |
| for col in collection | |
| ) | |
| return formatted, None, None, None | |
| # Fallback | |
| return "Invalid action.", None, None, None | |
| except Exception as ex: | |
| # Catch all exceptions, log, and return traceback to 'output_text' | |
| tb_str = traceback.format_exc() | |
| logger.error(f"Exception in handle_action:\n{tb_str}") | |
| return f"Traceback:\n{tb_str}", None, None, None | |
    submit_btn.click(
        fn=handle_action,
        inputs=[
            action,
            text_input,
            file_input,
            translation_option,
            query_params_input,
            nct_id_input,
            report_filename_input,
            export_format,
        ],
        outputs=[output_text, output_chart, output_chart2, output_file],
    )

# Launch the Gradio interface
demo.launch(server_name="0.0.0.0", server_port=7860, share=True)