""" Minimal RAG Template Store - No LlamaIndex, No LangChain Uses only: ChromaDB + SentenceTransformers """ import os import chromadb from sentence_transformers import SentenceTransformer from typing import List, Dict, Optional import json class ETLTemplateStore: """Minimal RAG store using ChromaDB + SentenceTransformers directly""" def __init__(self, persist_dir: str = "./chroma_db"): """Initialize with direct ChromaDB and embeddings""" self.persist_dir = persist_dir self.collection = None self.model = None try: print("🔧 Initializing Minimal RAG Template Store...") # Load embeddings model (local, no API needed) self.model = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2') print("✓ Embeddings model loaded") # Initialize ChromaDB self.chroma_client = chromadb.PersistentClient(path=persist_dir) self.collection = self.chroma_client.get_or_create_collection( name="etl_templates", metadata={"hnsw:space": "cosine"} ) print("✓ ChromaDB initialized") # Add default templates if collection is empty if self.collection.count() == 0: self._add_default_templates() print("✓ Added default templates") else: print(f"✓ Loaded {self.collection.count()} existing templates") print("✅ RAG Template Store ready!") except Exception as e: print(f"⚠️ RAG initialization failed: {e}") raise def _add_default_templates(self): """Add 5 default AWS Glue ETL templates""" templates = [ { "id": "template_1", "name": "Daily Sales Aggregation", "description": "Aggregate daily sales data with customer join and date filtering for e-commerce reporting", "use_case": "E-commerce daily reporting with customer dimensions", "estimated_cost": "$5-10 per day", "worker_config": "G.1X with 5 workers", "data_volume": "50-100GB" }, { "id": "template_2", "name": "Real-time CDC Processing", "description": "Process change data capture events from DynamoDB streams for real-time sync", "use_case": "Real-time data sync between operational and analytical stores", "estimated_cost": "$20-40 per day", "worker_config": "G.2X with 3 workers", "data_volume": "Streaming 10K events/min" }, { "id": "template_3", "name": "Data Quality Validation", "description": "Validate data quality with Great Expectations before loading to ensure data integrity", "use_case": "Data quality checks in ETL pipelines", "estimated_cost": "$3-8 per run", "worker_config": "G.1X with 2 workers", "data_volume": "10-50GB" }, { "id": "template_4", "name": "Multi-Source Data Lake Integration", "description": "Combine data from S3, RDS, and Redshift into unified data lake", "use_case": "Centralized data lake from multiple sources", "estimated_cost": "$15-30 per run", "worker_config": "G.2X with 10 workers", "data_volume": "500GB-2TB" }, { "id": "template_5", "name": "Incremental ETL with Job Bookmarks", "description": "Process only new data using Glue job bookmarks for efficient incremental loading", "use_case": "Efficient incremental data processing", "estimated_cost": "$2-5 per run", "worker_config": "G.1X with 2 workers", "data_volume": "Incremental varies" } ] for template in templates: self.add_template(template) def add_template(self, template: Dict): """Add template to ChromaDB with embeddings""" try: # Create searchable text search_text = f"{template['name']} {template['description']} {template['use_case']}" # Generate embedding embedding = self.model.encode(search_text).tolist() # Add to ChromaDB self.collection.add( ids=[template['id']], embeddings=[embedding], documents=[search_text], metadatas=[template] ) print(f"✓ Added: {template['name']}") except Exception as e: print(f"⚠️ Error adding template: {e}") def find_similar(self, query: str, top_k: int = 3) -> List[Dict]: """Find similar templates using semantic search""" if not self.collection or not self.model: return [] try: # Generate query embedding query_embedding = self.model.encode(query).tolist() # Search ChromaDB results = self.collection.query( query_embeddings=[query_embedding], n_results=min(top_k, self.collection.count()) ) # Format results similar_templates = [] if results and results['metadatas'] and results['metadatas'][0]: for i, metadata in enumerate(results['metadatas'][0]): distance = results['distances'][0][i] if results.get('distances') else 0 # Convert distance to similarity score (1 - distance for cosine) similarity = 1.0 - distance similar_templates.append({ "template": metadata, "relevance_score": round(similarity, 3), "excerpt": results['documents'][0][i][:150] + "..." }) return similar_templates except Exception as e: print(f"⚠️ Search error: {e}") return [] def get_all_templates(self) -> List[Dict]: """Get all stored templates""" if not self.collection: return [] try: all_data = self.collection.get() return all_data.get('metadatas', []) except Exception as e: print(f"⚠️ Error retrieving templates: {e}") return [] # Singleton _template_store = None def get_template_store() -> Optional[ETLTemplateStore]: """Get or create global template store""" global _template_store if _template_store is None: try: _template_store = ETLTemplateStore() except Exception as e: print(f"⚠️ Could not initialize template store: {e}") return None return _template_store