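"""Recursive website content extractor.

Crawls a site's internal links, converts HTML pages to Markdown, downloads
linked PDFs, and packages everything into a ZIP archive behind a Gradio UI
(with a stub hook for running as an MCP server).
"""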
import requests
from bs4 import BeautifulSoup
from urllib.parse import urlparse, urljoin
from markdownify import markdownify as md
import tempfile
import zipfile
import re
from typing import Optional, Set, Tuple
import os
import gradio as gr
from collections import deque
# ===========================================================
# UTILITIES - Recursive crawler
# ===========================================================
def crawl_site_for_links(start_url: str, max_pages: int = 50, max_depth: int = 2) -> Tuple[Set[str], Set[str]]:
    """Crawl the given site (up to max_depth) and return sets of HTML and PDF URLs."""
    visited = set()
    html_links = set()
    pdf_links = set()
    parsed_base = urlparse(start_url)
    domain = parsed_base.netloc
    queue = deque([(start_url, 0)])  # (url, depth)
    session = requests.Session()
    session.headers.update({
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0 Safari/537.36'
    })
    while queue and len(visited) < max_pages:
        current_url, depth = queue.popleft()
        if current_url in visited or depth > max_depth:
            continue
        visited.add(current_url)
        try:
            response = session.get(current_url, timeout=10)
            # Only parse HTML responses; skip binary and other non-HTML content.
            if "text/html" not in response.headers.get("Content-Type", ""):
                continue
            soup = BeautifulSoup(response.content, "html.parser")
            for a in soup.find_all("a", href=True):
                href = a["href"]
                full_url = urljoin(current_url, href)
                parsed = urlparse(full_url)
                # Stay in the same domain
                if parsed.netloc != domain:
                    continue
                if full_url.lower().endswith(".pdf"):
                    pdf_links.add(full_url)
                elif not href.startswith(("#", "javascript:", "mailto:", "tel:")):
                    html_links.add(full_url)
                    # Only HTML pages are enqueued for further crawling.
                    if full_url not in visited and depth + 1 <= max_depth:
                        queue.append((full_url, depth + 1))
        except Exception:
            continue
    return html_links, pdf_links
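# Example usage (a minimal sketch; assumes https://example.com is reachable):
#   pages, pdfs = crawl_site_for_links("https://example.com", max_pages=10, max_depth=1)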
# ===========================================================
# MAIN FUNCTION - Extract text & PDFs into ZIP
# ===========================================================
def extract_all_content_as_zip(url: str, max_links: Optional[int] = None, max_depth: int = 2) -> Tuple[str, Optional[str]]:
    """
    Recursively extract text content and PDFs from all internal links found on a website.
    Returns a status message and the path to the resulting ZIP file (or None on failure).
    """
    try:
        if not url.startswith(("http://", "https://")):
            url = "https://" + url
        html_links, pdf_links = crawl_site_for_links(url, max_pages=(max_links or 50), max_depth=max_depth)
        if not html_links and not pdf_links:
            return "No internal links or PDFs found to extract.", None
        # Limit HTML pages if requested
        total_html = len(html_links)
        if max_links is not None:
            html_links = list(html_links)[:max_links]
            limited_message = f" (limited to {max_links} out of {total_html})"
        else:
            limited_message = ""
        with tempfile.NamedTemporaryFile(delete=False, suffix=".zip") as temp_zip:
            zip_path = temp_zip.name
        successful_html = 0
        failed_html = 0
        successful_pdfs = 0
        failed_pdfs = 0
        session = requests.Session()
        session.headers.update({
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64)"
        })
        with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zip_file:
            # Process HTML pages
            for i, link_url in enumerate(html_links, 1):
                try:
                    resp = session.get(link_url, timeout=10)
                    resp.raise_for_status()
                    soup = BeautifulSoup(resp.content, "html.parser")
                    # Drop navigation and boilerplate elements before conversion.
                    for tag in soup(["script", "style", "nav", "footer", "header", "aside"]):
                        tag.decompose()
                    main_content = (
                        soup.find("main") or
                        soup.find("article") or
                        soup.find("div", class_=re.compile(r"content|main|post|article")) or
                        soup.find("body")
                    )
                    if not main_content:
                        failed_html += 1
                        continue
                    markdown_text = md(str(main_content), heading_style="ATX")
                    markdown_text = re.sub(r"\n{3,}", "\n\n", markdown_text)
                    title = soup.find("title")
                    if title:
                        markdown_text = f"# {title.get_text().strip()}\n\n{markdown_text.strip()}"
                    # Build a filesystem-safe filename from the URL path.
                    filename = re.sub(r"[^\w\-_.]", "_", urlparse(link_url).path.strip("/") or f"page_{i}") + ".md"
                    if filename in [".md", "index.md"]:
                        filename = f"page_{i}.md"
                    zip_file.writestr(filename, f"<!-- Source: {link_url} -->\n\n{markdown_text.strip()}")
                    successful_html += 1
                except Exception:
                    failed_html += 1
                    continue
            # Process PDFs
            for j, pdf_url in enumerate(pdf_links, 1):
                try:
                    resp = session.get(pdf_url, timeout=20)
                    resp.raise_for_status()
                    pdf_filename = os.path.basename(urlparse(pdf_url).path)
                    if not pdf_filename.lower().endswith(".pdf"):
                        pdf_filename = f"document_{j}.pdf"
                    zip_file.writestr(f"pdfs/{pdf_filename}", resp.content)
                    successful_pdfs += 1
                except Exception:
                    failed_pdfs += 1
                    continue
        status_message = (
            f"Extracted {successful_html} HTML pages{limited_message} and "
            f"downloaded {successful_pdfs} PDFs successfully."
        )
        if failed_html or failed_pdfs:
            status_message += f" ({failed_html} HTML and {failed_pdfs} PDF downloads failed.)"
        status_message += f" Created ZIP with {successful_html} markdown files and {successful_pdfs} PDFs."
        return status_message, zip_path
    except Exception as e:
        return f"Error: {str(e)}", None
# ===========================================================
# GRADIO UI
# ===========================================================
def gradio_extract(url, max_links, max_depth):
    # Gradio number inputs may arrive as floats; coerce to int before slicing/comparison.
    max_links = int(max_links) if max_links else None
    max_depth = int(max_depth)
    message, zip_path = extract_all_content_as_zip(url, max_links, max_depth)
    return message, zip_path
gr_interface = gr.Interface(
    fn=gradio_extract,
    inputs=[
        gr.Textbox(label="Website URL", placeholder="https://example.com"),
        gr.Number(label="Max number of links (optional)", value=50),
        gr.Slider(label="Crawl depth", minimum=1, maximum=3, value=2, step=1),
    ],
    outputs=[
        gr.Textbox(label="Status Message"),
        gr.File(label="Download ZIP"),
    ],
    title="Recursive Website Content & PDF Extractor",
    description="Recursively crawls a website to extract text and PDF files from internal pages and bundles them into a ZIP file.",
)
# ===========================================================
# MCP SERVER STUB
# ===========================================================
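# Note: the class below is only a stub. Recent Gradio releases (installed with the
# gradio[mcp] extra) can reportedly expose an Interface as a real MCP server via
# gr_interface.launch(mcp_server=True); that path is not wired up here.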
class MCPServerApp:
    def __init__(self):
        self.name = "mcp_content_extractor"

    def launch(self, mcp_server=False):
        if mcp_server:
            print("🚀 MCP server running (stub mode) - ready for agent connections.")
        else:
            print("Launching Gradio UI...")
            gr_interface.launch(server_name="0.0.0.0", server_port=7860)

def create_mcp_interface():
    return MCPServerApp()
# ===========================================================
# ENTRY POINT
# ===========================================================
if __name__ == "__main__":
    app = create_mcp_interface()
    app.launch(mcp_server=False)