🔧 PipelineForge MCP
⚡ AWS Glue ETL Optimizer - Automatic Workflow ⚡
🎯 Each step automatically uses data from previous steps - no copy/paste needed!
""" PipelineForge MCP - Gradio MCP Server for AWS Glue ETL Optimization MCP 1st Birthday Hackathon Submission - AUTOMATIC DATA FLOW VERSION """ import os import gradio as gr from dotenv import load_dotenv import anthropic from elevenlabs.client import ElevenLabs import json from typing import List, Dict, Optional, Tuple from PIL import Image import io import base64 load_dotenv() # Initialize API clients anthropic_client = anthropic.Anthropic(api_key=os.getenv("ANTHROPIC_API_KEY")) elevenlabs_api_key = os.getenv("ELEVENLABS_API_KEY") elevenlabs_client = ElevenLabs(api_key=elevenlabs_api_key) if elevenlabs_api_key else None # Initialize RAG (using minimal implementation - no LangChain conflicts) template_store = None try: from rag_templates_minimal import get_template_store template_store = get_template_store() if template_store: print("✅ RAG template store initialized") else: print("⚠️ RAG template store returned None") except Exception as e: print(f"⚠️ RAG disabled: {e}") # =========================== # CORE MCP TOOLS # =========================== def analyze_screenshot(image: Image.Image, requirements: str) -> Dict: """Analyze AWS console screenshot and extract ETL pipeline requirements.""" try: buffered = io.BytesIO() image.save(buffered, format="PNG") img_str = base64.b64encode(buffered.getvalue()).decode() message = anthropic_client.messages.create( model="claude-3-opus-20240229", max_tokens=2048, messages=[{ "role": "user", "content": [ {"type": "image", "source": {"type": "base64", "media_type": "image/png", "data": img_str}}, {"type": "text", "text": f"""Analyze this AWS console screenshot and extract ETL pipeline requirements. Additional context: {requirements} Return JSON with: sources, transformations, targets, estimated_volume, frequency"""} ] }] ) response_text = message.content[0].text try: return json.loads(response_text) except: return {"analysis": response_text, "sources": [], "transformations": [], "targets": [], "estimated_volume": "50GB", "frequency": "daily"} except Exception as e: return {"error": f"Screenshot analysis failed: {str(e)}"} def generate_glue_script(requirements: Dict, optimization_level: str = "balanced") -> Dict: """Generate optimized AWS Glue PySpark ETL script.""" try: message = anthropic_client.messages.create( model="claude-3-opus-20240229", max_tokens=4096, messages=[{"role": "user", "content": f"""Generate an optimized AWS Glue PySpark ETL script: {json.dumps(requirements, indent=2)} Optimization: {optimization_level} Include error handling, logging, and AWS Glue best practices."""}] ) return {"script": message.content[0].text, "optimization_level": optimization_level} except Exception as e: return {"error": f"Script generation failed: {str(e)}"} def simulate_glue_cost(requirements: Dict, worker_type: str = "G.1X", num_workers: int = 2) -> Dict: """Simulate AWS Glue job cost.""" try: pricing = {"G.1X": 0.44, "G.2X": 0.88, "G.4X": 1.76, "G.8X": 3.52} volume_str = str(requirements.get("estimated_volume", "50GB")) if "GB" in volume_str: volume_gb = float(volume_str.replace("GB", "").strip()) base_time_minutes = volume_gb * 5 elif "TB" in volume_str: volume_tb = float(volume_str.replace("TB", "").strip()) base_time_minutes = volume_tb * 1000 * 5 else: base_time_minutes = 30 worker_multiplier = {"G.1X": 1.0, "G.2X": 0.6, "G.4X": 0.4, "G.8X": 0.25} estimated_time_hours = (base_time_minutes * worker_multiplier.get(worker_type, 1.0)) / 60 dpu_per_worker = {"G.1X": 1, "G.2X": 2, "G.4X": 4, "G.8X": 8} total_dpus = num_workers * dpu_per_worker.get(worker_type, 1) hourly_cost = total_dpus * pricing.get(worker_type, 0.44) total_cost = hourly_cost * estimated_time_hours recommendations = [] if total_cost > 10: recommendations.append("Consider G.1X workers for cost optimization") if estimated_time_hours > 2: recommendations.append("Consider partitioning data") return { "worker_type": worker_type, "num_workers": num_workers, "estimated_time_hours": round(estimated_time_hours, 2), "hourly_cost_usd": round(hourly_cost, 2), "total_cost_usd": round(total_cost, 2), "total_dpus": total_dpus, "recommendations": recommendations } except Exception as e: return {"error": f"Cost simulation failed: {str(e)}"} def generate_cdk_infrastructure(requirements: Dict, script: str) -> Dict: """Generate AWS CDK Python code.""" try: message = anthropic_client.messages.create( model="claude-3-opus-20240229", max_tokens=4096, messages=[{"role": "user", "content": f"""Generate AWS CDK Python code: Requirements: {json.dumps(requirements, indent=2)} Script: {script[:500]}... Include Glue job, IAM, S3, CloudWatch, EventBridge."""}] ) return {"cdk_code": message.content[0].text} except Exception as e: return {"error": f"CDK generation failed: {str(e)}"} def generate_voice_explanation(content: str) -> str: """Generate voice narration using ElevenLabs TTS.""" try: if not elevenlabs_client: return "ElevenLabs API key not configured" audio_generator = elevenlabs_client.text_to_speech.convert( text=content[:1000], voice_id="21m00Tcm4TlvDq8ikWAM", model_id="eleven_monolingual_v1" ) output_path = "narration.mp3" with open(output_path, "wb") as f: for chunk in audio_generator: f.write(chunk) return output_path except Exception as e: return f"Voice generation failed: {str(e)}" def find_similar_pipelines(query: str, top_k: int = 3) -> Dict: """Find similar ETL pipeline templates using RAG.""" try: if template_store is None: return {"error": "Template store not initialized"} similar = template_store.find_similar(query, top_k) return {"query": query, "similar_templates": similar, "count": len(similar)} except Exception as e: return {"error": f"Template search failed: {str(e)}"} # =========================== # GRADIO UI - AUTOMATIC DATA FLOW # =========================== def create_ui(): """Create Gradio interface with automatic data flow and colorful theme""" # Custom CSS for colorful, modern theme custom_css = """ /* Main container gradient background */ .gradio-container { background: linear-gradient(135deg, #667eea 0%, #764ba2 100%) !important; } /* Header styling */ .main-header { background: linear-gradient(135deg, #f093fb 0%, #f5576c 100%); padding: 2rem; border-radius: 15px; margin-bottom: 2rem; box-shadow: 0 8px 32px rgba(31, 38, 135, 0.37); backdrop-filter: blur(4px); border: 1px solid rgba(255, 255, 255, 0.18); } /* Tab styling */ .tab-nav button { background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); color: white !important; border: none; border-radius: 10px 10px 0 0; padding: 12px 24px; margin: 0 4px; transition: all 0.3s ease; font-weight: 600; } .tab-nav button:hover { transform: translateY(-3px); box-shadow: 0 5px 15px rgba(102, 126, 234, 0.4); } .tab-nav button.selected { background: linear-gradient(135deg, #f093fb 0%, #f5576c 100%); box-shadow: 0 5px 20px rgba(245, 87, 108, 0.5); } /* Button styling */ .primary-button { background: linear-gradient(135deg, #667eea 0%, #764ba2 100%) !important; border: none !important; color: white !important; padding: 14px 32px !important; border-radius: 12px !important; font-size: 16px !important; font-weight: 600 !important; transition: all 0.3s ease !important; box-shadow: 0 6px 20px rgba(102, 126, 234, 0.4) !important; } .primary-button:hover { transform: translateY(-2px) !important; box-shadow: 0 8px 30px rgba(102, 126, 234, 0.6) !important; } /* Card containers */ .gr-box, .gr-form, .gr-panel { background: rgba(255, 255, 255, 0.95) !important; border-radius: 15px !important; padding: 1.5rem !important; box-shadow: 0 8px 32px rgba(31, 38, 135, 0.15) !important; backdrop-filter: blur(4px) !important; border: 1px solid rgba(255, 255, 255, 0.3) !important; } /* Input fields */ input, textarea, select { border: 2px solid #e0e7ff !important; border-radius: 10px !important; padding: 10px !important; transition: all 0.3s ease !important; background: white !important; } input:focus, textarea:focus, select:focus { border-color: #667eea !important; box-shadow: 0 0 0 3px rgba(102, 126, 234, 0.1) !important; } /* Code blocks */ .code-container { background: linear-gradient(135deg, #1e3c72 0%, #2a5298 100%) !important; border-radius: 12px !important; padding: 1rem !important; box-shadow: 0 8px 32px rgba(30, 60, 114, 0.3) !important; } /* JSON output */ .json-container { background: linear-gradient(135deg, #134e5e 0%, #71b280 100%) !important; border-radius: 12px !important; padding: 1rem !important; color: white !important; } /* Success indicators */ .success-mark { color: #10b981; font-size: 1.2em; font-weight: bold; } /* Step numbers */ .step-number { background: linear-gradient(135deg, #f093fb 0%, #f5576c 100%); color: white; border-radius: 50%; width: 40px; height: 40px; display: inline-flex; align-items: center; justify-content: center; font-weight: bold; margin-right: 10px; } /* Footer */ .footer { background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); padding: 1.5rem; border-radius: 15px; margin-top: 2rem; color: white; text-align: center; box-shadow: 0 8px 32px rgba(102, 126, 234, 0.3); } /* Animations */ @keyframes pulse { 0%, 100% { opacity: 1; } 50% { opacity: 0.8; } } .animated-icon { animation: pulse 2s ease-in-out infinite; } """ with gr.Blocks(title="PipelineForge MCP") as demo: # Inject custom CSS gr.HTML(f""" """) gr.HTML("""
⚡ AWS Glue ETL Optimizer - Automatic Workflow ⚡
🎯 Each step automatically uses data from previous steps - no copy/paste needed!