""" PipelineForge MCP - Gradio MCP Server for AWS Glue ETL Optimization MCP 1st Birthday Hackathon Submission - AUTOMATIC DATA FLOW VERSION """ import os import gradio as gr from dotenv import load_dotenv import anthropic from elevenlabs.client import ElevenLabs import json from typing import List, Dict, Optional, Tuple from PIL import Image import io import base64 load_dotenv() # Initialize API clients anthropic_client = anthropic.Anthropic(api_key=os.getenv("ANTHROPIC_API_KEY")) elevenlabs_api_key = os.getenv("ELEVENLABS_API_KEY") elevenlabs_client = ElevenLabs(api_key=elevenlabs_api_key) if elevenlabs_api_key else None # Initialize RAG (using minimal implementation - no LangChain conflicts) template_store = None try: from rag_templates_minimal import get_template_store template_store = get_template_store() if template_store: print("✅ RAG template store initialized") else: print("⚠️ RAG template store returned None") except Exception as e: print(f"⚠️ RAG disabled: {e}") # =========================== # CORE MCP TOOLS # =========================== def analyze_screenshot(image: Image.Image, requirements: str) -> Dict: """Analyze AWS console screenshot and extract ETL pipeline requirements.""" try: buffered = io.BytesIO() image.save(buffered, format="PNG") img_str = base64.b64encode(buffered.getvalue()).decode() message = anthropic_client.messages.create( model="claude-3-opus-20240229", max_tokens=2048, messages=[{ "role": "user", "content": [ {"type": "image", "source": {"type": "base64", "media_type": "image/png", "data": img_str}}, {"type": "text", "text": f"""Analyze this AWS console screenshot and extract ETL pipeline requirements. Additional context: {requirements} Return JSON with: sources, transformations, targets, estimated_volume, frequency"""} ] }] ) response_text = message.content[0].text try: return json.loads(response_text) except: return {"analysis": response_text, "sources": [], "transformations": [], "targets": [], "estimated_volume": "50GB", "frequency": "daily"} except Exception as e: return {"error": f"Screenshot analysis failed: {str(e)}"} def generate_glue_script(requirements: Dict, optimization_level: str = "balanced") -> Dict: """Generate optimized AWS Glue PySpark ETL script.""" try: message = anthropic_client.messages.create( model="claude-3-opus-20240229", max_tokens=4096, messages=[{"role": "user", "content": f"""Generate an optimized AWS Glue PySpark ETL script: {json.dumps(requirements, indent=2)} Optimization: {optimization_level} Include error handling, logging, and AWS Glue best practices."""}] ) return {"script": message.content[0].text, "optimization_level": optimization_level} except Exception as e: return {"error": f"Script generation failed: {str(e)}"} def simulate_glue_cost(requirements: Dict, worker_type: str = "G.1X", num_workers: int = 2) -> Dict: """Simulate AWS Glue job cost.""" try: pricing = {"G.1X": 0.44, "G.2X": 0.88, "G.4X": 1.76, "G.8X": 3.52} volume_str = str(requirements.get("estimated_volume", "50GB")) if "GB" in volume_str: volume_gb = float(volume_str.replace("GB", "").strip()) base_time_minutes = volume_gb * 5 elif "TB" in volume_str: volume_tb = float(volume_str.replace("TB", "").strip()) base_time_minutes = volume_tb * 1000 * 5 else: base_time_minutes = 30 worker_multiplier = {"G.1X": 1.0, "G.2X": 0.6, "G.4X": 0.4, "G.8X": 0.25} estimated_time_hours = (base_time_minutes * worker_multiplier.get(worker_type, 1.0)) / 60 dpu_per_worker = {"G.1X": 1, "G.2X": 2, "G.4X": 4, "G.8X": 8} total_dpus = num_workers * 


def simulate_glue_cost(requirements: Dict, worker_type: str = "G.1X", num_workers: int = 2) -> Dict:
    """Simulate AWS Glue job cost."""
    try:
        # Approximate hourly cost per worker (DPU-hour rate x DPUs packed into each worker type)
        pricing = {"G.1X": 0.44, "G.2X": 0.88, "G.4X": 1.76, "G.8X": 3.52}
        volume_str = str(requirements.get("estimated_volume", "50GB"))
        if "GB" in volume_str:
            volume_gb = float(volume_str.replace("GB", "").strip())
            base_time_minutes = volume_gb * 5
        elif "TB" in volume_str:
            volume_tb = float(volume_str.replace("TB", "").strip())
            base_time_minutes = volume_tb * 1000 * 5
        else:
            base_time_minutes = 30
        worker_multiplier = {"G.1X": 1.0, "G.2X": 0.6, "G.4X": 0.4, "G.8X": 0.25}
        estimated_time_hours = (base_time_minutes * worker_multiplier.get(worker_type, 1.0)) / 60
        dpu_per_worker = {"G.1X": 1, "G.2X": 2, "G.4X": 4, "G.8X": 8}
        total_dpus = num_workers * dpu_per_worker.get(worker_type, 1)
        # pricing is already per worker-hour, so scale by worker count (not total DPUs)
        # to avoid double-counting the DPUs built into the larger worker types
        hourly_cost = num_workers * pricing.get(worker_type, 0.44)
        total_cost = hourly_cost * estimated_time_hours
        recommendations = []
        if total_cost > 10:
            recommendations.append("Consider G.1X workers for cost optimization")
        if estimated_time_hours > 2:
            recommendations.append("Consider partitioning data")
        return {
            "worker_type": worker_type,
            "num_workers": num_workers,
            "estimated_time_hours": round(estimated_time_hours, 2),
            "hourly_cost_usd": round(hourly_cost, 2),
            "total_cost_usd": round(total_cost, 2),
            "total_dpus": total_dpus,
            "recommendations": recommendations
        }
    except Exception as e:
        return {"error": f"Cost simulation failed: {str(e)}"}


def generate_cdk_infrastructure(requirements: Dict, script: str) -> Dict:
    """Generate AWS CDK Python code."""
    try:
        message = anthropic_client.messages.create(
            model="claude-3-opus-20240229",
            max_tokens=4096,
            messages=[{"role": "user", "content": f"""Generate AWS CDK Python code:
Requirements: {json.dumps(requirements, indent=2)}
Script: {script[:500]}...
Include Glue job, IAM, S3, CloudWatch, EventBridge."""}]
        )
        return {"cdk_code": message.content[0].text}
    except Exception as e:
        return {"error": f"CDK generation failed: {str(e)}"}


def generate_voice_explanation(content: str) -> str:
    """Generate voice narration using ElevenLabs TTS."""
    try:
        if not elevenlabs_client:
            return "ElevenLabs API key not configured"
        audio_generator = elevenlabs_client.text_to_speech.convert(
            text=content[:1000],
            voice_id="21m00Tcm4TlvDq8ikWAM",
            model_id="eleven_monolingual_v1"
        )
        output_path = "narration.mp3"
        with open(output_path, "wb") as f:
            for chunk in audio_generator:
                f.write(chunk)
        return output_path
    except Exception as e:
        return f"Voice generation failed: {str(e)}"


def find_similar_pipelines(query: str, top_k: int = 3) -> Dict:
    """Find similar ETL pipeline templates using RAG."""
    try:
        if template_store is None:
            return {"error": "Template store not initialized"}
        similar = template_store.find_similar(query, top_k)
        return {"query": query, "similar_templates": similar, "count": len(similar)}
    except Exception as e:
        return {"error": f"Template search failed: {str(e)}"}
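

# Worked example of the simulate_glue_cost heuristic above (illustrative only; the
# 5 min/GB baseline and per-worker speed-up factors are assumptions hard-coded in
# that function, not AWS billing data). For a 50GB job on 5 x G.1X workers:
#   base_time_minutes    = 50 * 5          = 250
#   estimated_time_hours = 250 * 1.0 / 60  ~ 4.17
#   total_dpus           = 5 * 1           = 5
#   hourly_cost_usd      = 5 * 0.44        = 2.20
#   total_cost_usd       = 2.20 * 4.17     ~ 9.17
# Because the estimated runtime exceeds 2 hours, the result also carries the
# "Consider partitioning data" recommendation.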


# ===========================
# GRADIO UI - AUTOMATIC DATA FLOW
# ===========================

def create_ui():
    """Create Gradio interface with automatic data flow and colorful theme"""
    # Custom CSS for colorful, modern theme
    custom_css = """
    /* Main container gradient background */
    .gradio-container { background: linear-gradient(135deg, #667eea 0%, #764ba2 100%) !important; }

    /* Header styling */
    .main-header { background: linear-gradient(135deg, #f093fb 0%, #f5576c 100%); padding: 2rem; border-radius: 15px; margin-bottom: 2rem; box-shadow: 0 8px 32px rgba(31, 38, 135, 0.37); backdrop-filter: blur(4px); border: 1px solid rgba(255, 255, 255, 0.18); }

    /* Tab styling */
    .tab-nav button { background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); color: white !important; border: none; border-radius: 10px 10px 0 0; padding: 12px 24px; margin: 0 4px; transition: all 0.3s ease; font-weight: 600; }
    .tab-nav button:hover { transform: translateY(-3px); box-shadow: 0 5px 15px rgba(102, 126, 234, 0.4); }
    .tab-nav button.selected { background: linear-gradient(135deg, #f093fb 0%, #f5576c 100%); box-shadow: 0 5px 20px rgba(245, 87, 108, 0.5); }

    /* Button styling */
    .primary-button { background: linear-gradient(135deg, #667eea 0%, #764ba2 100%) !important; border: none !important; color: white !important; padding: 14px 32px !important; border-radius: 12px !important; font-size: 16px !important; font-weight: 600 !important; transition: all 0.3s ease !important; box-shadow: 0 6px 20px rgba(102, 126, 234, 0.4) !important; }
    .primary-button:hover { transform: translateY(-2px) !important; box-shadow: 0 8px 30px rgba(102, 126, 234, 0.6) !important; }

    /* Card containers */
    .gr-box, .gr-form, .gr-panel { background: rgba(255, 255, 255, 0.95) !important; border-radius: 15px !important; padding: 1.5rem !important; box-shadow: 0 8px 32px rgba(31, 38, 135, 0.15) !important; backdrop-filter: blur(4px) !important; border: 1px solid rgba(255, 255, 255, 0.3) !important; }

    /* Input fields */
    input, textarea, select { border: 2px solid #e0e7ff !important; border-radius: 10px !important; padding: 10px !important; transition: all 0.3s ease !important; background: white !important; }
    input:focus, textarea:focus, select:focus { border-color: #667eea !important; box-shadow: 0 0 0 3px rgba(102, 126, 234, 0.1) !important; }

    /* Code blocks */
    .code-container { background: linear-gradient(135deg, #1e3c72 0%, #2a5298 100%) !important; border-radius: 12px !important; padding: 1rem !important; box-shadow: 0 8px 32px rgba(30, 60, 114, 0.3) !important; }

    /* JSON output */
    .json-container { background: linear-gradient(135deg, #134e5e 0%, #71b280 100%) !important; border-radius: 12px !important; padding: 1rem !important; color: white !important; }

    /* Success indicators */
    .success-mark { color: #10b981; font-size: 1.2em; font-weight: bold; }

    /* Step numbers */
    .step-number { background: linear-gradient(135deg, #f093fb 0%, #f5576c 100%); color: white; border-radius: 50%; width: 40px; height: 40px; display: inline-flex; align-items: center; justify-content: center; font-weight: bold; margin-right: 10px; }

    /* Footer */
    .footer { background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); padding: 1.5rem; border-radius: 15px; margin-top: 2rem; color: white; text-align: center; box-shadow: 0 8px 32px rgba(102, 126, 234, 0.3); }

    /* Animations */
    @keyframes pulse { 0%, 100% { opacity: 1; } 50% { opacity: 0.8; } }
    .animated-icon { animation: pulse 2s ease-in-out infinite; }
    """

    with gr.Blocks(title="PipelineForge MCP") as demo:
        # Inject custom CSS
        gr.HTML(f"""<style>{custom_css}</style>""")

        gr.HTML("""
        <div class="main-header">
            <h1>🔧 PipelineForge MCP</h1>
            <h3>⚡ AWS Glue ETL Optimizer - Automatic Workflow ⚡</h3>
            <p>🎯 Each step automatically uses data from previous steps - no copy/paste needed!</p>
        </div>
""") # Shared state pipeline_state = gr.State({"requirements": {}, "script": "", "cost": {}, "cdk": ""}) with gr.Tabs() as tabs: # Tab 1: Screenshot Analysis with gr.Tab("1️⃣ Start: Analyze Screenshot") as tab1: gr.Markdown("### Extract requirements from AWS console image") with gr.Row(): with gr.Column(): img_input = gr.Image(type="pil", label="Upload Screenshot") req_input = gr.Textbox(label="Description", placeholder="Daily sales ETL", lines=2) analyze_btn = gr.Button("🔍 Analyze", variant="primary", size="lg") with gr.Column(): analysis_out = gr.JSON(label="✅ Requirements Extracted") gr.Markdown("**Next:** Go to Script Generation tab →") def analyze_and_store(img, req, state): result = analyze_screenshot(img, req) if img else {"estimated_volume": "50GB", "sources": [], "transformations": [], "targets": []} state["requirements"] = result return result, state analyze_btn.click(analyze_and_store, [img_input, req_input, pipeline_state], [analysis_out, pipeline_state]) # Tab 2: Script Generation with gr.Tab("2️⃣ Generate Script") as tab2: gr.Markdown("### AI generates PySpark code automatically using requirements from Tab 1") with gr.Row(): with gr.Column(): gr.Markdown("**Using requirements from Screenshot Analysis** ✅") opt_level = gr.Radio(["cost", "performance", "balanced"], value="balanced", label="Optimization") gen_script_btn = gr.Button("🚀 Generate Script", variant="primary", size="lg") with gr.Column(): script_out = gr.Code(label="✅ Generated PySpark Script", language="python", lines=20) gr.Markdown("**Next:** Go to Cost Simulation tab →") def generate_and_store(opt, state): reqs = state.get("requirements", {"estimated_volume": "50GB"}) result = generate_glue_script(reqs, opt) script = result.get("script", "") state["script"] = script return script, state gen_script_btn.click(generate_and_store, [opt_level, pipeline_state], [script_out, pipeline_state]) # Tab 3: Cost Simulation with gr.Tab("3️⃣ Simulate Cost") as tab3: gr.Markdown("### Calculate AWS Glue costs automatically using requirements from Tab 1") with gr.Row(): with gr.Column(): gr.Markdown("**Using requirements from Screenshot Analysis** ✅") worker_type = gr.Dropdown(["G.1X", "G.2X", "G.4X", "G.8X"], value="G.1X", label="Worker Type") num_workers = gr.Slider(2, 50, 5, step=1, label="Workers") cost_btn = gr.Button("💵 Simulate Cost", variant="primary", size="lg") with gr.Column(): cost_out = gr.JSON(label="✅ Cost Breakdown") gr.Markdown("**Next:** Go to CDK Infrastructure tab →") def simulate_and_store(worker, workers, state): reqs = state.get("requirements", {"estimated_volume": "50GB"}) result = simulate_glue_cost(reqs, worker, workers) state["cost"] = result return result, state cost_btn.click(simulate_and_store, [worker_type, num_workers, pipeline_state], [cost_out, pipeline_state]) # Tab 4: CDK Infrastructure with gr.Tab("4️⃣ Generate CDK") as tab4: gr.Markdown("### Generate deployment code automatically using requirements + script from previous tabs") with gr.Row(): with gr.Column(): gr.Markdown("**Using requirements from Tab 1 + script from Tab 2** ✅") cdk_btn = gr.Button("🏭 Generate CDK", variant="primary", size="lg") with gr.Column(): cdk_out = gr.Code(label="✅ AWS CDK Python Code", language="python", lines=20) gr.Markdown("**Next:** Go to Voice Summary tab →") def generate_cdk_and_store(state): reqs = state.get("requirements", {}) script = state.get("script", "") result = generate_cdk_infrastructure(reqs, script) cdk = result.get("cdk_code", "") state["cdk"] = cdk return cdk, state 
                cdk_btn.click(generate_cdk_and_store, [pipeline_state], [cdk_out, pipeline_state])

            # Tab 5: Voice Summary
            with gr.Tab("5️⃣ Voice Summary") as tab5:
                gr.Markdown("### Automatic voice narration of your complete pipeline")
                with gr.Row():
                    with gr.Column():
                        gr.Markdown("**Automatically summarizes ALL previous tabs** ✅")
                        voice_btn = gr.Button("🎤 Generate Voice Summary", variant="primary", size="lg")
                        summary_text = gr.Textbox(label="Summary (auto-generated)", lines=10)
                    with gr.Column():
                        audio_out = gr.Audio(label="✅ Voice Narration")
                        status_out = gr.Textbox(label="Status")

                def generate_voice_summary(state):
                    parts = []
                    reqs = state.get("requirements", {})
                    if reqs:
                        volume = reqs.get("estimated_volume", "unknown")
                        sources = ", ".join(reqs.get("sources", []))[:50]
                        parts.append(f"This ETL pipeline processes {volume} of data")
                        if sources:
                            parts.append(f"from {sources}")
                    cost = state.get("cost", {})
                    if cost and "total_cost_usd" in cost:
                        cost_val = cost["total_cost_usd"]
                        time_val = cost["estimated_time_hours"]
                        workers = cost["num_workers"]
                        parts.append(f"using {workers} workers. It costs ${cost_val:.2f} per run, taking {time_val:.1f} hours.")
                    if state.get("script"):
                        parts.append("Complete PySpark script has been generated with AWS Glue best practices.")
                    if state.get("cdk"):
                        parts.append("Infrastructure code is ready for deployment with CDK.")
                    summary = " ".join(parts) if parts else "Complete the previous tabs first to generate summary."
                    audio_path = generate_voice_explanation(summary)
                    # Anything other than a generated .mp3 path is an error message
                    ok = audio_path.endswith(".mp3")
                    status = "✅ Voice generated!" if ok else "❌ " + audio_path
                    return summary, audio_path if ok else None, status

                voice_btn.click(generate_voice_summary, [pipeline_state], [summary_text, audio_out, status_out])

            # Tab 6: Template Library
            with gr.Tab("6️⃣ Find Similar Templates") as tab6:
                gr.Markdown("### Search ETL template library with RAG")
                with gr.Row():
                    with gr.Column():
                        # Dropdown with pre-defined queries
                        template_dropdown = gr.Dropdown(
                            choices=[
                                "Daily sales aggregation with customer join",
                                "Real-time CDC processing from DynamoDB",
                                "Data quality validation with Great Expectations",
                                "Multi-source data lake integration",
                                "Incremental ETL with job bookmarks",
                                "Custom query..."
                            ],
                            label="Select Template Type",
                            value="Daily sales aggregation with customer join"
                        )
                        custom_query = gr.Textbox(label="Or enter custom query", placeholder="Describe your ETL needs", lines=2)
                        search_btn = gr.Button("🔍 Find Templates", variant="primary")
                    with gr.Column():
                        template_out = gr.JSON(label="✅ Similar Templates")

                def search_templates(dropdown_val, custom_val):
                    query = custom_val if custom_val else dropdown_val
                    return find_similar_pipelines(query, 3)

                search_btn.click(search_templates, [template_dropdown, custom_query], template_out)

        gr.HTML(""" """)

    return demo


if __name__ == "__main__":
    demo = create_ui()
    demo.launch(
        server_name=os.getenv("GRADIO_SERVER_NAME", "127.0.0.1"),
        server_port=int(os.getenv("GRADIO_SERVER_PORT", "7860")),
        share=False
    )
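
# Note: as written, launch() serves only the web UI. If the intent is to also expose
# the functions under "CORE MCP TOOLS" over the Model Context Protocol, recent Gradio
# releases offer built-in MCP support via the `gradio[mcp]` extra (an assumption to
# verify against the installed Gradio version), roughly:
#
#     demo.launch(
#         server_name=os.getenv("GRADIO_SERVER_NAME", "127.0.0.1"),
#         server_port=int(os.getenv("GRADIO_SERVER_PORT", "7860")),
#         share=False,
#         mcp_server=True,  # assumption: requires `pip install "gradio[mcp]"`
#     )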