import requests
from bs4 import BeautifulSoup
from urllib.parse import urlparse, urljoin
from markdownify import markdownify as md
import tempfile
import zipfile
import re
from typing import Tuple, Optional
import os
import gradio as gr
from collections import deque


# ===========================================================
# UTILITIES - Recursive crawler
# ===========================================================
def crawl_site_for_links(start_url: str, max_pages: int = 50, max_depth: int = 2):
    """Crawl the given site (up to max_depth) and return sets of HTML and PDF URLs."""
    visited = set()
    html_links = set()
    pdf_links = set()
    parsed_base = urlparse(start_url)
    domain = parsed_base.netloc
    queue = deque([(start_url, 0)])  # (url, depth)
    session = requests.Session()
    session.headers.update({
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0 Safari/537.36'
    })
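    # Breadth-first crawl: URLs are dequeued level by level until either the
    # page budget (max_pages) or the depth limit (max_depth) is reached.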
    while queue and len(visited) < max_pages:
        current_url, depth = queue.popleft()
        if current_url in visited or depth > max_depth:
            continue
        visited.add(current_url)
        try:
            response = session.get(current_url, timeout=10)
            if "text/html" not in response.headers.get("Content-Type", ""):
                continue
            soup = BeautifulSoup(response.content, "html.parser")
            for a in soup.find_all("a", href=True):
                href = a["href"]
                full_url = urljoin(current_url, href)
                parsed = urlparse(full_url)
                # Stay in same domain
                if parsed.netloc != domain:
                    continue
                if full_url.lower().endswith(".pdf"):
                    pdf_links.add(full_url)
                elif not href.startswith(("#", "javascript:", "mailto:", "tel:")):
                    html_links.add(full_url)
                    if full_url not in visited and depth + 1 <= max_depth:
                        queue.append((full_url, depth + 1))
        except Exception:
            continue
    return html_links, pdf_links
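
# Illustrative standalone use of the crawler (the URL below is a placeholder,
# not a tested target):
#
#   pages, pdfs = crawl_site_for_links("https://example.com", max_pages=20, max_depth=1)
#   print(f"Found {len(pages)} HTML pages and {len(pdfs)} PDF links")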


# ===========================================================
# MAIN FUNCTION - Extract text & PDFs into ZIP
# ===========================================================
def extract_all_content_as_zip(url: str, max_links: Optional[int] = None, max_depth: int = 2) -> Tuple[str, Optional[str]]:
    """
    Recursively extract text content and PDFs from all internal links found on a website.
    """
    try:
        if not url.startswith(("http://", "https://")):
            url = "https://" + url
        html_links, pdf_links = crawl_site_for_links(url, max_pages=(max_links or 50), max_depth=max_depth)
        if not html_links and not pdf_links:
            return "No internal links or PDFs found to extract.", None
        # Limit HTML pages if requested
        total_html = len(html_links)
        if max_links is not None:
            html_links = list(html_links)[:max_links]
            limited_message = f" (limited to {max_links} out of {total_html})"
        else:
            limited_message = ""
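        # delete=False keeps the temporary file on disk after the block exits,
        # so Gradio can still serve the finished ZIP for download.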
        with tempfile.NamedTemporaryFile(delete=False, suffix=".zip") as temp_zip:
            zip_path = temp_zip.name
        successful_html = 0
        failed_html = 0
        successful_pdfs = 0
        failed_pdfs = 0
        session = requests.Session()
        session.headers.update({
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64)"
        })
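        # One HTTP session is reused for every page and PDF download below.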
        with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zip_file:
            # Process HTML pages
            for i, link_url in enumerate(html_links, 1):
                try:
                    resp = session.get(link_url, timeout=10)
                    resp.raise_for_status()
                    soup = BeautifulSoup(resp.content, "html.parser")
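                    # Strip scripts, styles, and navigation chrome before the
                    # Markdown conversion so only page content remains.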
                    for tag in soup(["script", "style", "nav", "footer", "header", "aside"]):
                        tag.decompose()
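                    # Heuristic: prefer semantic containers (<main>, <article>),
                    # then common content wrappers, and finally fall back to <body>.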
                    main_content = (
                        soup.find("main") or
                        soup.find("article") or
                        soup.find("div", class_=re.compile(r"content|main|post|article")) or
                        soup.find("body")
                    )
                    if not main_content:
                        failed_html += 1
                        continue
                    markdown_text = md(str(main_content), heading_style="ATX")
                    markdown_text = re.sub(r"\n{3,}", "\n\n", markdown_text)
                    title = soup.find("title")
                    if title:
                        markdown_text = f"# {title.get_text().strip()}\n\n{markdown_text.strip()}"
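                    # Derive a filesystem-safe name from the URL path; fall back
                    # to a numbered name when the path yields nothing usable.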
                    filename = re.sub(r"[^\w\-_.]", "_", urlparse(link_url).path or f"page_{i}") + ".md"
                    # The site root ("/") sanitizes to "_.md", so treat it as unusable too.
                    if filename in (".md", "_.md", "index.md"):
                        filename = f"page_{i}.md"
                    zip_file.writestr(filename, f"<!-- Source: {link_url} -->\n\n{markdown_text.strip()}")
                    successful_html += 1
                except Exception:
                    failed_html += 1
                    continue
            # Process PDFs
            for j, pdf_url in enumerate(pdf_links, 1):
                try:
                    resp = session.get(pdf_url, timeout=20)
                    resp.raise_for_status()
                    pdf_filename = os.path.basename(urlparse(pdf_url).path)
                    if not pdf_filename.lower().endswith(".pdf"):
                        pdf_filename = f"document_{j}.pdf"
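                    # Store the raw PDF bytes under a pdfs/ folder inside the archive.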
                    zip_file.writestr(f"pdfs/{pdf_filename}", resp.content)
                    successful_pdfs += 1
                except Exception:
                    failed_pdfs += 1
                    continue
        status_message = (
            f"Extracted {successful_html} HTML pages{limited_message} and "
            f"downloaded {successful_pdfs} PDFs successfully."
        )
        if failed_html or failed_pdfs:
            status_message += f" ({failed_html} HTML and {failed_pdfs} PDF downloads failed.)"
        status_message += f" Created ZIP with {successful_html} markdown files and {successful_pdfs} PDFs."
        return status_message, zip_path
    except Exception as e:
        return f"Error: {str(e)}", None


# ===========================================================
# GRADIO UI
# ===========================================================
def gradio_extract(url, max_links, max_depth):
    # Gradio may pass numeric inputs as floats; cast so slicing with max_links works.
    max_links = int(max_links) if max_links else None
    max_depth = int(max_depth)
    message, zip_path = extract_all_content_as_zip(url, max_links, max_depth)
    return message, zip_path


gr_interface = gr.Interface(
    fn=gradio_extract,
    inputs=[
        gr.Textbox(label="Website URL", placeholder="https://example.com"),
        gr.Number(label="Max number of links (optional)", value=50),
        gr.Slider(label="Crawl depth", minimum=1, maximum=3, value=2, step=1)
    ],
    outputs=[
        gr.Textbox(label="Status Message"),
        gr.File(label="Download ZIP")
    ],
    title="Recursive Website Content & PDF Extractor",
    description="Recursively crawls a website to extract text and PDF files from internal pages and bundles them into a ZIP file."
)


# ===========================================================
# MCP SERVER STUB
# ===========================================================
class MCPServerApp:
    def __init__(self):
        self.name = "mcp_content_extractor"

    def launch(self, mcp_server=False):
        if mcp_server:
            print("MCP server running (stub mode) - ready for agent connections.")
        else:
            print("Launching Gradio UI...")
            gr_interface.launch(server_name="0.0.0.0", server_port=7860)


def create_mcp_interface():
    return MCPServerApp()


# ===========================================================
# ENTRY POINT
# ===========================================================
if __name__ == "__main__":
    app = create_mcp_interface()
    app.launch(mcp_server=False)
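
# The stub MCP mode only prints a readiness message and does not start a real
# MCP server; it can be selected instead of the default UI launch like this:
#
#   app = create_mcp_interface()
#   app.launch(mcp_server=True)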