unknown
Fix: Remove @gr.mcp.tool() decorators and add numpy<2.0.0 for HuggingFace compatibility
65719ee
"""
PipelineForge MCP - Gradio MCP Server for AWS Glue ETL Optimization
MCP 1st Birthday Hackathon Submission - AUTOMATIC DATA FLOW VERSION
"""
import os
import gradio as gr
from dotenv import load_dotenv
import anthropic
from elevenlabs.client import ElevenLabs
import json
from typing import List, Dict, Optional, Tuple
from PIL import Image
import io
import base64
load_dotenv()

# Initialize API clients
anthropic_client = anthropic.Anthropic(api_key=os.getenv("ANTHROPIC_API_KEY"))
elevenlabs_api_key = os.getenv("ELEVENLABS_API_KEY")
# The ElevenLabs client is only built when a key is present; otherwise it is None.
if elevenlabs_api_key:
    elevenlabs_client = ElevenLabs(api_key=elevenlabs_api_key)
else:
    elevenlabs_client = None

# Initialize RAG (using minimal implementation - no LangChain conflicts)
template_store = None
try:
    # Imported here so a missing/broken RAG module only disables template search
    # instead of preventing the whole app from starting.
    from rag_templates_minimal import get_template_store

    template_store = get_template_store()
    rag_status = (
        "βœ… RAG template store initialized"
        if template_store
        else "⚠️ RAG template store returned None"
    )
    print(rag_status)
except Exception as exc:
    print(f"⚠️ RAG disabled: {exc}")
# ===========================
# CORE MCP TOOLS
# ===========================
def analyze_screenshot(image: Image.Image, requirements: str) -> Dict:
    """Analyze AWS console screenshot and extract ETL pipeline requirements.

    The image is re-encoded as PNG, base64-encoded and sent to Claude together
    with a prompt asking for a JSON description of the pipeline.

    Args:
        image: Screenshot to analyze (anything with a PIL-style ``save``).
        requirements: Free-text context appended to the prompt.

    Returns:
        The JSON object parsed from the model reply. If the reply contains no
        JSON object, a fallback dict carrying the raw reply under ``"analysis"``
        plus empty/default pipeline fields. On any failure, ``{"error": ...}``.
        This function never raises.
    """
    try:
        buffered = io.BytesIO()
        image.save(buffered, format="PNG")
        img_str = base64.b64encode(buffered.getvalue()).decode()
        message = anthropic_client.messages.create(
            model="claude-3-opus-20240229",
            max_tokens=2048,
            messages=[{
                "role": "user",
                "content": [
                    {"type": "image", "source": {"type": "base64", "media_type": "image/png", "data": img_str}},
                    {"type": "text", "text": f"""Analyze this AWS console screenshot and extract ETL pipeline requirements.
Additional context: {requirements}
Return JSON with: sources, transformations, targets, estimated_volume, frequency"""}
                ]
            }]
        )
        response_text = message.content[0].text
        parsed = _parse_json_object(response_text)
        if parsed is not None:
            return parsed
        # No usable JSON object in the reply: keep the raw text and fall back
        # to defaults so downstream steps still have the keys they read.
        return {"analysis": response_text, "sources": [], "transformations": [], "targets": [], "estimated_volume": "50GB", "frequency": "daily"}
    except Exception as e:
        return {"error": f"Screenshot analysis failed: {str(e)}"}


def _parse_json_object(text: str) -> Optional[Dict]:
    """Return the JSON object contained in *text*, or None if there is none.

    Tries the whole text first, then the span from the first ``{`` to the last
    ``}`` so that a reply wrapping the JSON in prose or a code fence still
    parses. Only dict results are accepted because callers use ``.get``.
    """
    candidates = [text]
    start, end = text.find("{"), text.rfind("}")
    if 0 <= start < end:
        candidates.append(text[start:end + 1])
    for candidate in candidates:
        try:
            parsed = json.loads(candidate)
        except ValueError:  # json.JSONDecodeError is a ValueError subclass
            continue
        if isinstance(parsed, dict):
            return parsed
    return None
def generate_glue_script(requirements: Dict, optimization_level: str = "balanced") -> Dict:
    """Ask Claude to write an AWS Glue PySpark ETL script for the requirements.

    Args:
        requirements: Pipeline requirements; serialized to JSON inside the prompt.
        optimization_level: Optimization hint inserted verbatim into the prompt.

    Returns:
        ``{"script": <model reply text>, "optimization_level": ...}`` on success,
        or ``{"error": ...}`` if anything fails. Never raises.
    """
    try:
        prompt = f"""Generate an optimized AWS Glue PySpark ETL script:
{json.dumps(requirements, indent=2)}
Optimization: {optimization_level}
Include error handling, logging, and AWS Glue best practices."""
        response = anthropic_client.messages.create(
            model="claude-3-opus-20240229",
            max_tokens=4096,
            messages=[{"role": "user", "content": prompt}],
        )
        generated_script = response.content[0].text
        return {"script": generated_script, "optimization_level": optimization_level}
    except Exception as exc:
        return {"error": f"Script generation failed: {exc!s}"}
def simulate_glue_cost(requirements: Dict, worker_type: str = "G.1X", num_workers: int = 2) -> Dict:
    """Simulate AWS Glue job cost.

    The model is deliberately simple: 5 minutes of base run time per GB of
    input (30 minutes when no volume can be parsed), scaled by a per-worker-type
    speed multiplier. The number of workers affects cost only, not run time.

    Args:
        requirements: Pipeline requirements; only ``"estimated_volume"`` is read
            (e.g. ``"50GB"``, ``"1.5 TB"``, ``"about 200 mb per day"``).
            Defaults to ``"50GB"`` when the key is missing.
        worker_type: Glue worker type. Unknown types are treated like G.1X.
        num_workers: Number of workers allocated to the job.

    Returns:
        A dict with ``worker_type``, ``num_workers``, ``estimated_time_hours``,
        ``hourly_cost_usd``, ``total_cost_usd``, ``total_dpus`` and
        ``recommendations``; or ``{"error": ...}`` on failure. Never raises.
    """
    import re  # local import so this block does not depend on file-level imports

    try:
        # Hourly price of ONE worker (0.44 USD per DPU-hour x DPUs in the worker).
        pricing = {"G.1X": 0.44, "G.2X": 0.88, "G.4X": 1.76, "G.8X": 3.52}
        dpu_per_worker = {"G.1X": 1, "G.2X": 2, "G.4X": 4, "G.8X": 8}
        worker_multiplier = {"G.1X": 1.0, "G.2X": 0.6, "G.4X": 0.4, "G.8X": 0.25}
        gb_per_unit = {"MB": 0.001, "GB": 1.0, "TB": 1000.0}

        # Drop thousands separators so "1,000GB" is not read as "000GB".
        volume_str = str(requirements.get("estimated_volume", "50GB")).replace(",", "")
        volume_match = re.search(r"(\d+(?:\.\d+)?)\s*(MB|GB|TB)", volume_str, re.IGNORECASE)
        if volume_match:
            volume_gb = float(volume_match.group(1)) * gb_per_unit[volume_match.group(2).upper()]
            base_time_minutes = volume_gb * 5
        else:
            base_time_minutes = 30

        estimated_time_hours = (base_time_minutes * worker_multiplier.get(worker_type, 1.0)) / 60
        total_dpus = num_workers * dpu_per_worker.get(worker_type, 1)
        # The price table is already per worker, so multiply by the worker
        # count; multiplying by total DPUs would count the DPU factor twice.
        hourly_cost = num_workers * pricing.get(worker_type, 0.44)
        total_cost = hourly_cost * estimated_time_hours

        recommendations = []
        # Suggesting G.1X is pointless when the job already runs on G.1X.
        if total_cost > 10 and worker_type != "G.1X":
            recommendations.append("Consider G.1X workers for cost optimization")
        if estimated_time_hours > 2:
            recommendations.append("Consider partitioning data")

        return {
            "worker_type": worker_type,
            "num_workers": num_workers,
            "estimated_time_hours": round(estimated_time_hours, 2),
            "hourly_cost_usd": round(hourly_cost, 2),
            "total_cost_usd": round(total_cost, 2),
            "total_dpus": total_dpus,
            "recommendations": recommendations
        }
    except Exception as e:
        return {"error": f"Cost simulation failed: {str(e)}"}
def generate_cdk_infrastructure(requirements: Dict, script: str) -> Dict:
    """Ask Claude to write AWS CDK Python code for deploying the pipeline.

    Args:
        requirements: Pipeline requirements; serialized to JSON inside the prompt.
        script: Generated Glue script; only its first 500 characters are sent.

    Returns:
        ``{"cdk_code": <model reply text>}`` on success, or ``{"error": ...}``
        if anything fails. Never raises.
    """
    try:
        requirements_json = json.dumps(requirements, indent=2)
        script_excerpt = script[:500]
        prompt = f"""Generate AWS CDK Python code:
Requirements: {requirements_json}
Script: {script_excerpt}...
Include Glue job, IAM, S3, CloudWatch, EventBridge."""
        response = anthropic_client.messages.create(
            model="claude-3-opus-20240229",
            max_tokens=4096,
            messages=[{"role": "user", "content": prompt}],
        )
        return {"cdk_code": response.content[0].text}
    except Exception as exc:
        return {"error": f"CDK generation failed: {exc!s}"}
def generate_voice_explanation(content: str) -> str:
    """Generate voice narration using ElevenLabs TTS.

    Only the first 1000 characters of *content* are narrated. The audio is
    written to ``narration.mp3`` in the current working directory.

    Returns:
        The path of the written MP3 on success. Otherwise a human-readable
        message (not a path): either that the API key is not configured, or a
        string starting with ``"Voice generation failed"``. Never raises.
    """
    try:
        if not elevenlabs_client:
            return "ElevenLabs API key not configured"

        narration_text = content[:1000]
        audio_chunks = elevenlabs_client.text_to_speech.convert(
            text=narration_text,
            voice_id="21m00Tcm4TlvDq8ikWAM",
            model_id="eleven_monolingual_v1",
        )

        output_path = "narration.mp3"
        # The client yields the audio in chunks; write them out as they arrive.
        with open(output_path, "wb") as audio_file:
            for piece in audio_chunks:
                audio_file.write(piece)
        return output_path
    except Exception as exc:
        return f"Voice generation failed: {exc!s}"
def find_similar_pipelines(query: str, top_k: int = 3) -> Dict:
    """Find similar ETL pipeline templates using RAG.

    Args:
        query: Free-text description of the desired pipeline.
        top_k: Number of templates requested from the store.

    Returns:
        ``{"query", "similar_templates", "count"}`` on success, or
        ``{"error": ...}`` when the store is unavailable or the search fails.
        Never raises.
    """
    try:
        if template_store is None:
            return {"error": "Template store not initialized"}
        matches = template_store.find_similar(query, top_k)
        response = {"query": query, "similar_templates": matches}
        response["count"] = len(matches)
        return response
    except Exception as exc:
        return {"error": f"Template search failed: {exc!s}"}
# ===========================
# GRADIO UI - AUTOMATIC DATA FLOW
# ===========================
def create_ui():
    """Create Gradio interface with automatic data flow and colorful theme.

    Builds six tabs (screenshot analysis, script generation, cost simulation,
    CDK generation, voice summary, template search). The first five share one
    per-session ``gr.State`` dict with the keys ``requirements``, ``script``,
    ``cost`` and ``cdk`` so each step can reuse the results of earlier steps.

    Returns:
        The assembled ``gr.Blocks`` app (not yet launched).
    """
    # Requirements used when no successful screenshot analysis is available.
    default_requirements = {"estimated_volume": "50GB"}

    # Custom CSS for colorful, modern theme
    custom_css = """
/* Main container gradient background */
.gradio-container {
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%) !important;
}
/* Header styling */
.main-header {
background: linear-gradient(135deg, #f093fb 0%, #f5576c 100%);
padding: 2rem;
border-radius: 15px;
margin-bottom: 2rem;
box-shadow: 0 8px 32px rgba(31, 38, 135, 0.37);
backdrop-filter: blur(4px);
border: 1px solid rgba(255, 255, 255, 0.18);
}
/* Tab styling */
.tab-nav button {
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
color: white !important;
border: none;
border-radius: 10px 10px 0 0;
padding: 12px 24px;
margin: 0 4px;
transition: all 0.3s ease;
font-weight: 600;
}
.tab-nav button:hover {
transform: translateY(-3px);
box-shadow: 0 5px 15px rgba(102, 126, 234, 0.4);
}
.tab-nav button.selected {
background: linear-gradient(135deg, #f093fb 0%, #f5576c 100%);
box-shadow: 0 5px 20px rgba(245, 87, 108, 0.5);
}
/* Button styling */
.primary-button {
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%) !important;
border: none !important;
color: white !important;
padding: 14px 32px !important;
border-radius: 12px !important;
font-size: 16px !important;
font-weight: 600 !important;
transition: all 0.3s ease !important;
box-shadow: 0 6px 20px rgba(102, 126, 234, 0.4) !important;
}
.primary-button:hover {
transform: translateY(-2px) !important;
box-shadow: 0 8px 30px rgba(102, 126, 234, 0.6) !important;
}
/* Card containers */
.gr-box, .gr-form, .gr-panel {
background: rgba(255, 255, 255, 0.95) !important;
border-radius: 15px !important;
padding: 1.5rem !important;
box-shadow: 0 8px 32px rgba(31, 38, 135, 0.15) !important;
backdrop-filter: blur(4px) !important;
border: 1px solid rgba(255, 255, 255, 0.3) !important;
}
/* Input fields */
input, textarea, select {
border: 2px solid #e0e7ff !important;
border-radius: 10px !important;
padding: 10px !important;
transition: all 0.3s ease !important;
background: white !important;
}
input:focus, textarea:focus, select:focus {
border-color: #667eea !important;
box-shadow: 0 0 0 3px rgba(102, 126, 234, 0.1) !important;
}
/* Code blocks */
.code-container {
background: linear-gradient(135deg, #1e3c72 0%, #2a5298 100%) !important;
border-radius: 12px !important;
padding: 1rem !important;
box-shadow: 0 8px 32px rgba(30, 60, 114, 0.3) !important;
}
/* JSON output */
.json-container {
background: linear-gradient(135deg, #134e5e 0%, #71b280 100%) !important;
border-radius: 12px !important;
padding: 1rem !important;
color: white !important;
}
/* Success indicators */
.success-mark {
color: #10b981;
font-size: 1.2em;
font-weight: bold;
}
/* Step numbers */
.step-number {
background: linear-gradient(135deg, #f093fb 0%, #f5576c 100%);
color: white;
border-radius: 50%;
width: 40px;
height: 40px;
display: inline-flex;
align-items: center;
justify-content: center;
font-weight: bold;
margin-right: 10px;
}
/* Footer */
.footer {
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
padding: 1.5rem;
border-radius: 15px;
margin-top: 2rem;
color: white;
text-align: center;
box-shadow: 0 8px 32px rgba(102, 126, 234, 0.3);
}
/* Animations */
@keyframes pulse {
0%, 100% { opacity: 1; }
50% { opacity: 0.8; }
}
.animated-icon {
animation: pulse 2s ease-in-out infinite;
}
"""
    with gr.Blocks(title="PipelineForge MCP") as demo:
        # Inject custom CSS
        gr.HTML(f"""
<style>
{custom_css}
</style>
""")
        gr.HTML("""
<div class="main-header">
<h1 style="color: white; margin: 0; font-size: 3em; text-shadow: 2px 2px 4px rgba(0,0,0,0.2);">
πŸ”§ PipelineForge MCP
</h1>
<p style="color: rgba(255,255,255,0.95); font-size: 1.3em; margin-top: 0.5rem;">
⚑ AWS Glue ETL Optimizer - Automatic Workflow ⚑
</p>
<p style="color: rgba(255,255,255,0.9); font-size: 1.1em; margin-top: 1rem;">
🎯 Each step automatically uses data from previous steps - no copy/paste needed!
</p>
</div>
""")
        # Shared state
        pipeline_state = gr.State({"requirements": {}, "script": "", "cost": {}, "cdk": ""})

        with gr.Tabs():
            # Tab 1: Screenshot Analysis
            with gr.Tab("1️⃣ Start: Analyze Screenshot"):
                gr.Markdown("### Extract requirements from AWS console image")
                with gr.Row():
                    with gr.Column():
                        img_input = gr.Image(type="pil", label="Upload Screenshot")
                        req_input = gr.Textbox(label="Description", placeholder="Daily sales ETL", lines=2)
                        analyze_btn = gr.Button("πŸ” Analyze", variant="primary", size="lg")
                    with gr.Column():
                        analysis_out = gr.JSON(label="βœ… Requirements Extracted")
                gr.Markdown("**Next:** Go to Script Generation tab β†’")

                def analyze_and_store(img, req, state):
                    """Analyze the screenshot (or use defaults) and remember the requirements."""
                    if img is not None:
                        result = analyze_screenshot(img, req)
                    else:
                        result = {"estimated_volume": "50GB", "sources": [], "transformations": [], "targets": []}
                    # Show a failed analysis to the user, but do not let the
                    # error dict become the requirements for later steps.
                    if "error" not in result:
                        state["requirements"] = result
                    return result, state

                analyze_btn.click(analyze_and_store, [img_input, req_input, pipeline_state], [analysis_out, pipeline_state])

            # Tab 2: Script Generation
            with gr.Tab("2️⃣ Generate Script"):
                gr.Markdown("### AI generates PySpark code automatically using requirements from Tab 1")
                with gr.Row():
                    with gr.Column():
                        gr.Markdown("**Using requirements from Screenshot Analysis** βœ…")
                        opt_level = gr.Radio(["cost", "performance", "balanced"], value="balanced", label="Optimization")
                        gen_script_btn = gr.Button("πŸš€ Generate Script", variant="primary", size="lg")
                    with gr.Column():
                        script_out = gr.Code(label="βœ… Generated PySpark Script", language="python", lines=20)
                gr.Markdown("**Next:** Go to Cost Simulation tab β†’")

                def generate_and_store(opt, state):
                    """Generate the Glue script from the stored requirements and remember it."""
                    # The state always has a "requirements" key, so fall back on
                    # emptiness rather than on a missing key.
                    reqs = state.get("requirements") or default_requirements
                    result = generate_glue_script(reqs, opt)
                    if "error" in result:
                        # Surface the failure instead of showing an empty editor.
                        raise gr.Error(result["error"])
                    script = result.get("script", "")
                    state["script"] = script
                    return script, state

                gen_script_btn.click(generate_and_store, [opt_level, pipeline_state], [script_out, pipeline_state])

            # Tab 3: Cost Simulation
            with gr.Tab("3️⃣ Simulate Cost"):
                gr.Markdown("### Calculate AWS Glue costs automatically using requirements from Tab 1")
                with gr.Row():
                    with gr.Column():
                        gr.Markdown("**Using requirements from Screenshot Analysis** βœ…")
                        worker_type = gr.Dropdown(["G.1X", "G.2X", "G.4X", "G.8X"], value="G.1X", label="Worker Type")
                        num_workers = gr.Slider(2, 50, 5, step=1, label="Workers")
                        cost_btn = gr.Button("πŸ’΅ Simulate Cost", variant="primary", size="lg")
                    with gr.Column():
                        cost_out = gr.JSON(label="βœ… Cost Breakdown")
                gr.Markdown("**Next:** Go to CDK Infrastructure tab β†’")

                def simulate_and_store(worker, workers, state):
                    """Run the cost simulation on the stored requirements and remember the result."""
                    reqs = state.get("requirements") or default_requirements
                    result = simulate_glue_cost(reqs, worker, workers)
                    state["cost"] = result
                    return result, state

                cost_btn.click(simulate_and_store, [worker_type, num_workers, pipeline_state], [cost_out, pipeline_state])

            # Tab 4: CDK Infrastructure
            with gr.Tab("4️⃣ Generate CDK"):
                gr.Markdown("### Generate deployment code automatically using requirements + script from previous tabs")
                with gr.Row():
                    with gr.Column():
                        gr.Markdown("**Using requirements from Tab 1 + script from Tab 2** βœ…")
                        cdk_btn = gr.Button("🏭 Generate CDK", variant="primary", size="lg")
                    with gr.Column():
                        cdk_out = gr.Code(label="βœ… AWS CDK Python Code", language="python", lines=20)
                gr.Markdown("**Next:** Go to Voice Summary tab β†’")

                def generate_cdk_and_store(state):
                    """Generate CDK code from the stored requirements and script and remember it."""
                    reqs = state.get("requirements", {})
                    script = state.get("script", "")
                    result = generate_cdk_infrastructure(reqs, script)
                    if "error" in result:
                        # Surface the failure instead of showing an empty editor.
                        raise gr.Error(result["error"])
                    cdk = result.get("cdk_code", "")
                    state["cdk"] = cdk
                    return cdk, state

                cdk_btn.click(generate_cdk_and_store, [pipeline_state], [cdk_out, pipeline_state])

            # Tab 5: Voice Summary
            with gr.Tab("5️⃣ Voice Summary"):
                gr.Markdown("### Automatic voice narration of your complete pipeline")
                with gr.Row():
                    with gr.Column():
                        gr.Markdown("**Automatically summarizes ALL previous tabs** βœ…")
                        voice_btn = gr.Button("🎀 Generate Voice Summary", variant="primary", size="lg")
                        summary_text = gr.Textbox(label="Summary (auto-generated)", lines=10)
                    with gr.Column():
                        audio_out = gr.Audio(label="βœ… Voice Narration")
                        status_out = gr.Textbox(label="Status")

                def generate_voice_summary(state):
                    """Build a text summary of the stored results and narrate it."""
                    parts = []
                    reqs = state.get("requirements", {})
                    if reqs:
                        volume = reqs.get("estimated_volume", "unknown")
                        # Model output may contain non-string or null sources;
                        # stringify so the join cannot raise.
                        sources = ", ".join(str(src) for src in (reqs.get("sources") or []))[:50]
                        parts.append(f"This ETL pipeline processes {volume} of data")
                        if sources:
                            parts.append(f"from {sources}")
                    cost = state.get("cost", {})
                    if cost and "total_cost_usd" in cost:
                        cost_val = cost["total_cost_usd"]
                        time_val = cost["estimated_time_hours"]
                        workers = cost["num_workers"]
                        parts.append(f"using {workers} workers. It costs ${cost_val:.2f} per run, taking {time_val:.1f} hours.")
                    if state.get("script"):
                        parts.append("Complete PySpark script has been generated with AWS Glue best practices.")
                    if state.get("cdk"):
                        parts.append("Infrastructure code is ready for deployment with CDK.")
                    summary = " ".join(parts) if parts else "Complete the previous tabs first to generate summary."
                    audio_path = generate_voice_explanation(summary)
                    # generate_voice_explanation returns a message instead of a
                    # path on ANY failure (including a missing API key), so only
                    # an existing file counts as success.
                    if os.path.isfile(audio_path):
                        return summary, audio_path, "βœ… Voice generated!"
                    return summary, None, "❌ " + audio_path

                voice_btn.click(generate_voice_summary, [pipeline_state], [summary_text, audio_out, status_out])

            # Tab 6: Template Library
            with gr.Tab("6️⃣ Find Similar Templates"):
                gr.Markdown("### Search ETL template library with RAG")
                with gr.Row():
                    with gr.Column():
                        # Dropdown with pre-defined queries
                        template_dropdown = gr.Dropdown(
                            choices=[
                                "Daily sales aggregation with customer join",
                                "Real-time CDC processing from DynamoDB",
                                "Data quality validation with Great Expectations",
                                "Multi-source data lake integration",
                                "Incremental ETL with job bookmarks",
                                "Custom query..."
                            ],
                            label="Select Template Type",
                            value="Daily sales aggregation with customer join"
                        )
                        custom_query = gr.Textbox(label="Or enter custom query", placeholder="Describe your ETL needs", lines=2)
                        search_btn = gr.Button("πŸ” Find Templates", variant="primary")
                    with gr.Column():
                        template_out = gr.JSON(label="βœ… Similar Templates")

                def search_templates(dropdown_val, custom_val):
                    """Search templates with the custom query if given, else the dropdown choice."""
                    custom = (custom_val or "").strip()
                    if custom:
                        return find_similar_pipelines(custom, 3)
                    if dropdown_val == "Custom query...":
                        # The placeholder entry is not a real query.
                        return {"error": "Enter a custom query or select a template type"}
                    return find_similar_pipelines(dropdown_val, 3)

                search_btn.click(search_templates, [template_dropdown, custom_query], template_out)

        gr.HTML("""
<div class="footer">
<h2 style="margin: 0; color: white; text-shadow: 2px 2px 4px rgba(0,0,0,0.2);">
πŸ† MCP 1st Birthday Hackathon
</h2>
<p style="margin-top: 0.5rem; font-size: 1.2em; color: rgba(255,255,255,0.95);">
✨ All 6 features with automatic data flow ✨
</p>
<p style="margin-top: 0.5rem; color: rgba(255,255,255,0.9);">
🎯 ChromaDB RAG | ⚑ Modal Testing | 🧠 Claude AI | πŸŽ™οΈ ElevenLabs TTS
</p>
</div>
""")
    return demo
if __name__ == "__main__":
    demo = create_ui()
    # Bind address and port come from the environment, defaulting to
    # localhost:7860; no public share link is requested.
    launch_options = {
        "server_name": os.getenv("GRADIO_SERVER_NAME", "127.0.0.1"),
        "server_port": int(os.getenv("GRADIO_SERVER_PORT", "7860")),
        "share": False,
    }
    demo.launch(**launch_options)