|
|
import sys |
|
|
import ast |
|
|
import logging |
|
|
import traceback |
|
|
from pathlib import Path |
|
|
from mcp.server.fastmcp import FastMCP |
|
|
import json |
|
|
|
|
|
from services.code_generation_service import CodeGenerator |
|
|
from services.project_service import ProjectAnalyzer |
|
|
from services.refactoring_service import RefactoringAdvisor |
|
|
import os |
|
|
from dotenv import load_dotenv |
|
|
|
|
|
from services.sequence_service import ProjectSequenceAnalyzer, SequenceDiagramService |
|
|
from services.usecase_service import UseCaseDiagramService |
|
|
|
|
|
# Load environment variables (API keys, tokens) from a local .env file.
load_dotenv()

# Mirror Modal credentials into the process environment for libraries that
# read os.environ directly; setdefault() never overwrites existing values.
os.environ.setdefault("MODAL_TOKEN_ID", os.getenv("MODAL_TOKEN_ID", ""))
os.environ.setdefault("MODAL_TOKEN_SECRET", os.getenv("MODAL_TOKEN_SECRET", ""))

# Route all log output to stderr so it cannot interfere with stdout traffic.
logging.basicConfig(
    level=logging.INFO,
    stream=sys.stderr,
    format='%(message)s'
)

# Quiet down chatty third-party loggers.
for lib in ["httpx", "httpcore", "asyncio"]:
    logging.getLogger(lib).setLevel(logging.WARNING)

# Make sibling packages (services/, core/) importable regardless of CWD.
sys.path.insert(0, str(Path(__file__).parent))

# MCP server instance; the tool functions below register themselves on it
# via the @mcp.tool() decorator.
mcp = FastMCP("ArchitectAI", dependencies=["langchain-openai", "langchain-core"])
|
|
|
|
|
|
|
|
# Hard dependencies: the server cannot run without these, so a failed
# import is logged and re-raised to abort startup.
try:
    from core.llm_factory import create_sambanova_llm, create_nebius_llm, create_openai_llm, create_gemini_llm
    from services.filesystem_service import FileSystemVisitor, TreeFormatter
    from services.architecture_service import (
        ArchitectureVisitor, FastTypeEnricher, DeterministicPlantUMLConverter
    )
    logging.info("β All services imported successfully")
except ImportError as e:
    logging.error(f"β Critical Import Error: {e}")
    raise

# Optional dependency: when Modal is unavailable, modal_function stays None
# and apply_refactoring_safely() falls back to the local-LLM path.
try:
    import modal
    modal_function = modal.Function.from_name("architect-ai-surgeon", "safe_refactor_and_test")
    logging.info("βοΈ Modal Function imported successfully.")
except ImportError:
    logging.warning("β οΈ Could not import modal_executor function. Cloud features disabled.")
    modal_function = None
|
|
|
|
|
def _find_project_root(start_path: Path) -> Path:
    """
    Searches upwards for a marker that indicates the project root
    (like .git, pyproject.toml, or requirements.txt).
    Falls back to CWD if nothing is found.

    Args:
        start_path: Directory to start the upward search from.

    Returns:
        The first ancestor (including start_path itself) containing a
        marker, or the resolved current working directory as a fallback.
    """
    markers = (".git", "pyproject.toml", "requirements.txt")
    resolved = start_path.resolve()

    # Walk from the starting directory up through every ancestor.
    for candidate in (resolved, *resolved.parents):
        if any((candidate / marker).exists() for marker in markers):
            return candidate

    # No marker anywhere above us: fall back to the working directory.
    return Path.cwd().resolve()
|
|
|
|
|
|
|
|
# Anchor the sandbox at this script's folder, then search upward for the
# real project root (see _find_project_root).
_script_location = Path(__file__).parent

# All tool file operations are confined to this directory tree.
SANDBOX_ROOT = _find_project_root(_script_location)

logging.info(f"π Project Root Detected at: {SANDBOX_ROOT}")

# Paths that tools must never read or list, even though they live inside
# the sandbox (virtualenvs, VCS metadata, secrets, bytecode caches).
BLOCKED_PATHS = {
    SANDBOX_ROOT / ".venv",
    SANDBOX_ROOT / "venv",
    SANDBOX_ROOT / ".git",
    SANDBOX_ROOT / ".env",
    SANDBOX_ROOT / "__pycache__",
}
|
|
|
|
|
|
|
|
class LLMClientSingleton:
    """
    Singleton pattern for LLM client.

    Ensures only ONE instance of the LLM client is created and reused
    across tool invocations, so repeated calls do not pay the provider
    connection cost again.
    """

    # Class-level cache shared by every constructor call.
    _instance = None          # the single LLMClientSingleton object
    _llm_client = None        # cached LLM client, or None
    _current_provider = None  # provider name the cached client belongs to

    # Fallback order per preferred provider: the preferred provider is
    # tried first, then the remaining ones in the listed order.  These
    # orders are preserved exactly from the original branching logic.
    _FALLBACK_ORDERS = {
        "sambanova": ("sambanova", "gemini", "nebius", "openai"),
        "gemini": ("gemini", "sambanova", "nebius", "openai"),
        "nebius": ("nebius", "gemini", "openai", "sambanova"),
        "openai": ("openai", "gemini", "nebius", "sambanova"),
    }

    def __new__(cls):
        # Lazily create the one shared instance on first construction.
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            logging.info("ποΈ Creating LLM Singleton instance...")
        return cls._instance

    def get_client(self, preferred_provider: str = "sambanova", temperature: float = 0.0):
        """
        Get or initialize the LLM client.
        Only creates a new client if provider changes.

        Args:
            preferred_provider: Provider to try first ("sambanova",
                "gemini", "nebius", "openai"); unknown values use the
                default ("sambanova") fallback order.
            temperature: Sampling temperature passed to the factory.

        Returns:
            The connected client, or None when every provider fails.
        """
        # Cache hit: same provider as last time, client already built.
        if self._llm_client is not None and self._current_provider == preferred_provider:
            logging.debug(f"β»οΈ Reusing cached {preferred_provider} client")
            return self._llm_client

        logging.info(f"π Initializing {preferred_provider} LLM client...")

        # Factories looked up lazily inside the method so the class can be
        # defined even before the factory imports have run.
        factories = {
            "sambanova": create_sambanova_llm,
            "gemini": create_gemini_llm,
            "nebius": create_nebius_llm,
            "openai": create_openai_llm,
        }
        order = self._FALLBACK_ORDERS.get(preferred_provider, self._FALLBACK_ORDERS["sambanova"])

        # Try each provider in order; first successful connection wins
        # and is cached for subsequent calls.
        for name in order:
            try:
                logging.info(f"π Attempting to connect to {name}...")
                self._llm_client = factories[name](temperature=temperature)
                self._current_provider = name
                logging.info(f"β Connected to {name} (cached for reuse)")
                return self._llm_client
            except Exception as e:
                logging.warning(f"β οΈ {name} failed: {str(e)[:100]}...")

        # Every provider failed: clear the cache and signal with None.
        logging.error("β No LLM provider available!")
        self._llm_client = None
        self._current_provider = None
        return None

    def reset(self):
        """Force reset the cached client (useful for testing)."""
        logging.info("π Resetting LLM Singleton...")
        self._llm_client = None
        self._current_provider = None
|
|
|
|
|
|
|
|
_llm_singleton = LLMClientSingleton()  # module-level shared instance used by every tool
|
|
|
|
|
def _validate_path(user_path: str, operation: str = "read") -> tuple[bool, Path, str]:
    """
    Validates that a user-provided path is within the sandbox.

    Args:
        user_path: Raw path string supplied by the caller.
        operation: "read" expects a regular file, "list" expects a directory.

    Returns:
        (is_valid, resolved_path, error_message)
    """
    try:
        requested = Path(user_path).resolve()
        sandbox_root = SANDBOX_ROOT.resolve()

        # Guard 1: the path must live inside the project tree.
        if not requested.is_relative_to(sandbox_root):
            return False, requested, f"β Access Denied: Path '{user_path}' is outside project directory"

        # Guard 2: protected directories (venv, .git, secrets, ...) are off-limits,
        # whether hit directly or via a child path.
        blocked = requested in BLOCKED_PATHS or any(
            requested.is_relative_to(bp) for bp in BLOCKED_PATHS if bp.exists()
        )
        if blocked:
            return False, requested, f"β Access Denied: Cannot access '{requested.name}' (protected directory)"

        # Guard 3: the path must actually exist.
        if not requested.exists():
            return False, requested, f"β Path not found: '{user_path}'"

        # Guard 4: the path kind must match the requested operation.
        if operation == "read":
            if requested.is_dir():
                return False, requested, f"β '{user_path}' is a directory. Use list_project_structure() instead"
            if not requested.is_file():
                return False, requested, f"β '{user_path}' is not a regular file"
        elif operation == "list":
            if not requested.is_dir():
                return False, requested, f"β '{user_path}' is not a directory"

        logging.debug(f"β Path validation passed: {requested}")
        return True, requested, ""

    except Exception as e:
        # Any resolution failure is treated as a validation failure.
        return False, Path(user_path), f"β Path validation error: {str(e)}"
|
|
|
|
|
|
|
|
|
|
|
@mcp.tool()
def generate_architecture_diagram(code: str, enrich: bool = True, provider: str = "sambanova") -> str:
    """
    Analyzes Python code and generates a PlantUML Class Diagram.
    Uses singleton LLM client for efficiency.

    Args:
        code: Python source code to analyze.
        enrich: If True, uses the LLM to enrich the extracted structure.
        provider: Preferred LLM provider for enrichment.

    Returns:
        PlantUML class diagram string, or an error message.
    """
    try:
        # Static pass: parse the source and harvest the class structure.
        try:
            visitor = ArchitectureVisitor()
            visitor.visit(ast.parse(code))
        except SyntaxError as se:
            return f"β Syntax Error in code: {se}"
        logging.info(f"β Static analysis: {len(visitor.structure)} classes found")

        # Optional AI pass: enrichment failures are logged and skipped.
        if enrich and visitor.structure:
            llm = _llm_singleton.get_client(preferred_provider=provider, temperature=0.0)
            if llm is None:
                logging.warning("β οΈ Skipping enrichment: No LLM provider available")
            else:
                try:
                    visitor.structure = FastTypeEnricher(llm).enrich(code, visitor.structure)
                    logging.info("β Type enrichment complete")
                except Exception as e:
                    logging.error(f"β οΈ Enrichment failed (skipping): {e}")

        # Deterministic pass: render the structure as PlantUML.
        result = DeterministicPlantUMLConverter().convert(visitor.structure)
        logging.info("β Diagram generated successfully")
        return result

    except Exception as e:
        logging.error(f"β Unexpected error: {e}")
        traceback.print_exc(file=sys.stderr)
        return f"β Processing failed: {str(e)}"
|
|
|
|
|
@mcp.tool()
def list_project_structure(path: str = ".", style: str = "tree") -> str:
    """
    Deterministically maps the folder structure.

    Args:
        path: The root directory to list.
        style: 'tree' for visual string, 'json' for structured data (easier for coding).

    π‘οΈ Sandboxed: Can only list directories within the project.
    """
    try:
        is_valid, resolved_path, error_msg = _validate_path(path, operation="list")
        if not is_valid:
            logging.warning(f"π« Sandbox violation attempt: {error_msg}")
            return error_msg

        # Walk the directory once, then render it in the requested style.
        tree_data = FileSystemVisitor().visit(str(resolved_path))
        if style == "json":
            result = json.dumps(tree_data, indent=2)
        else:
            result = TreeFormatter().format(tree_data)

        logging.info(f"β Listed directory: {resolved_path} (style={style})")
        return result

    except Exception as e:
        logging.error(f"β Directory listing failed: {e}")
        return f"Error listing directory: {e}"
|
|
@mcp.tool()
def read_file(path: str) -> str:
    """
    Reads the full content of a specific file.
    Use this to inspect code before generating diagrams.

    π‘οΈ Sandboxed: Can only read files within the project.
    """
    try:
        ok, target, err = _validate_path(path, operation="read")
        if not ok:
            logging.warning(f"π« Sandbox violation attempt: {err}")
            return err

        # errors='replace' keeps reads working on odd encodings.
        text = target.read_text(encoding='utf-8', errors='replace')
        logging.info(f"β Read file: {target.name} ({len(text)} chars)")
        return text

    except Exception as e:
        logging.error(f"β Read failed: {e}")
        return f"β Error reading file: {str(e)}"
|
|
|
|
|
@mcp.tool()
def generate_full_project_diagram(path: str = ".", enrich: bool = False, provider: str = "sambanova") -> str:
    """
    Analyzes the ENTIRE project directory and generates a massive Class Diagram.

    Args:
        path: Root directory to analyze.
        enrich: Accepted for API compatibility, but AI enrichment is
            deliberately skipped at project scale to keep runs fast.
        provider: Accepted for API compatibility; unused while project-wide
            enrichment stays disabled.

    Returns:
        PlantUML class diagram string, or an error message.
    """
    try:
        is_valid, resolved_path, error_msg = _validate_path(path, operation="list")
        if not is_valid:
            return error_msg

        analyzer = ProjectAnalyzer(resolved_path)
        full_structure = analyzer.analyze()

        if not full_structure:
            return "β οΈ No Python code found in this directory."

        # Enrichment is intentionally a no-op at project scale: LLM type
        # inference over a whole repo is too slow, so only log the skip.
        if enrich:
            logging.info("βΉοΈ Skipping AI enrichment for full project to ensure speed.")

        converter = DeterministicPlantUMLConverter()
        puml_code = converter.convert(full_structure)

        logging.info(f"β Generated full project diagram ({len(full_structure)} items)")
        return puml_code

    except Exception as e:
        logging.error(f"β Project analysis failed: {e}")
        return f"Error analyzing project: {str(e)}"
|
|
|
|
|
|
|
|
@mcp.tool()
def propose_architecture_refactoring(path: str = ".") -> str:
    """
    Analyzes the project and returns a JSON proposal with 'proposed_uml'.
    Use this to visualize changes BEFORE applying them.
    """
    try:
        is_valid, resolved_path, error_msg = _validate_path(path, operation="list")
        if not is_valid:
            return error_msg

        # Extract the current structure, then ask the advisor for a proposal.
        structure = ProjectAnalyzer(resolved_path).analyze()
        if not structure:
            return "Error: No code found."

        proposal = RefactoringAdvisor().propose_improvement(structure)
        return json.dumps(proposal, indent=2)

    except Exception as e:
        return f"Error generating proposal: {e}"
|
|
|
|
|
|
|
|
@mcp.tool()
def apply_refactoring(file_path: str, instruction: str) -> str:
    """
    APPLIES CHANGES TO THE FILE.
    WARNING: This overwrites the file. Use only on 'ai_architect - Copy'.

    Args:
        file_path: Relative path (e.g., 'core/llm_factory.py').
        instruction: What to do (e.g., 'Refactor to Singleton pattern').
    """
    try:
        target_file = (SANDBOX_ROOT / file_path).resolve()

        # Sandbox guard: reject traversal paths (e.g. '../../etc/passwd')
        # that would escape the project root — matches the validation the
        # read-side tools perform via _validate_path().
        if not target_file.is_relative_to(SANDBOX_ROOT.resolve()):
            return f"β Access Denied: Path '{file_path}' is outside project directory"

        if not target_file.exists():
            return f"β File not found: {file_path}"

        original_code = target_file.read_text(encoding='utf-8')

        # Generate the refactored code and write it back via the generator.
        generator = CodeGenerator()
        new_code = generator.generate_refactored_code(original_code, instruction, file_path)
        saved_path = generator.save_code(file_path, new_code, SANDBOX_ROOT)

        return f"β File updated successfully: {saved_path}"

    except Exception as e:
        return f"β Error: {e}"
|
|
|
|
|
|
|
|
@mcp.tool()
def apply_refactoring_safely(
    file_path: str,
    instruction: str,
    test_file: str | None = None
) -> str:
    """
    Safely refactors code with automatic testing.
    Uses Modal sandbox if available, falls back to local LLM if not.

    Args:
        file_path: Path to file to refactor
        instruction: Refactoring instructions
        test_file: Optional path to test file

    Returns:
        A human-readable status string; the file is only overwritten when
        the Modal run succeeds with passing tests, or on the local-LLM
        fallback path (which cannot run tests).
    """
    try:
        # NOTE(review): unlike read_file/list_project_structure, this does
        # not go through _validate_path(), so a '..'-style file_path could
        # resolve outside SANDBOX_ROOT — confirm whether that is intended.
        target = SANDBOX_ROOT / file_path
        if not target.exists():
            return f"β File not found: {file_path}"

        original_code = target.read_text(encoding='utf-8')

        # Load the optional test file; a missing test file is silently ignored.
        test_code = None
        if test_file:
            test_path = SANDBOX_ROOT / test_file
            if test_path.exists():
                test_code = test_path.read_text(encoding='utf-8')

        # Preferred path: refactor + run tests inside the Modal cloud sandbox.
        if modal_function is not None:
            try:
                logging.info("π Sending to Modal sandbox...")
                result = modal_function.remote(
                    original_code,
                    instruction,
                    test_code
                )

                if not result["success"]:
                    return f"β Refactoring failed: {result['error']}"

                # Tests failed: report and leave the file untouched.
                if not result["test_results"]["passed"]:
                    return f"""
β οΈ Refactoring succeeded but tests FAILED:
{result['test_results']['output']}

Code was NOT saved. Review and fix tests first.
"""

                # Tests passed: persist the refactored code.
                target.write_text(result["new_code"], encoding='utf-8')

                return f"""
β
Refactoring completed successfully in Modal sandbox!
π Tests: PASSED β
πΎ File saved: {file_path}

Test output:
{result['test_results']['output']}
"""
            except Exception as modal_error:
                # Any Modal failure falls through to the local-LLM path below.
                logging.warning(f"β οΈ Modal sandbox failed: {str(modal_error)[:100]}... Falling back to local refactoring.")

        # Fallback path: refactor with the local LLM; no sandboxed test run.
        logging.info("π§ Using local LLM for refactoring (testing disabled)...")
        llm = _llm_singleton.get_client(preferred_provider="openai", temperature=0.0)

        if llm is None:
            return "β No LLM available for refactoring (Modal and local LLM both failed)"

        from langchain_core.messages import SystemMessage, HumanMessage

        system_msg = SystemMessage(content=
            "You are a Senior Python Refactoring Engineer. "
            "Rewrite the provided code to meet the user's architectural instructions "
            "while preserving the original business logic."
        )

        user_msg = HumanMessage(content=f"""
Refactor this Python code according to these instructions:

**Instructions:** {instruction}

**Original Code:**
```python
{original_code}
```

Return ONLY the refactored code, no explanations.
""")

        try:
            response = llm.invoke([system_msg, user_msg])
            new_code = response.content

            # Strip Markdown code fences if the model wrapped its answer.
            if "```python" in new_code:
                new_code = new_code.split("```python")[1].split("```")[0].strip()
            elif "```" in new_code:
                new_code = new_code.split("```")[1].split("```")[0].strip()

            # Overwrite the target file without a test gate (Modal is down).
            target.write_text(new_code, encoding='utf-8')

            return f"""
β
Refactoring completed successfully (Local LLM)!
πΎ File saved: {file_path}
β οΈ Note: Testing was skipped (Modal not available). Please verify manually.

Refactored code preview:
```python
{new_code[:500]}...
```
"""
        except Exception as llm_error:
            return f"β Local LLM refactoring failed: {str(llm_error)}"

    except Exception as e:
        logging.error(f"β Refactoring error: {e}")
        return f"β Error: {e}"
|
|
|
|
|
@mcp.tool()
def generate_usecase_diagram(code: str, enrich: bool = True, provider: str = "sambanova") -> str:
    """
    Analyzes Python code and generates a PlantUML Use Case Diagram.

    Detects:
    - Service classes (patterns: *Service, *Controller, *Handler, *API)
    - FastAPI/Flask endpoints (@app.get, @router.post, etc.)
    - Public methods as potential use cases

    Args:
        code: Python source code to analyze
        enrich: If True, uses AI to infer actors and relationships
        provider: LLM provider ("sambanova", "nebius", "openai")

    Returns:
        PlantUML use case diagram string
    """
    try:
        # Only pay for an LLM connection when enrichment was requested.
        llm = _llm_singleton.get_client(preferred_provider=provider, temperature=0.0) if enrich else None

        result = UseCaseDiagramService(llm=llm).generate(code, enrich=enrich)

        logging.info("β Use case diagram generated successfully")
        return result

    except Exception as e:
        logging.error(f"β Use case diagram failed: {e}")
        traceback.print_exc(file=sys.stderr)
        return f"' β Error: {str(e)}"
|
|
|
|
|
@mcp.tool()
def generate_full_project_usecase_diagram(path: str = ".", enrich: bool = True, provider: str = "sambanova") -> str:
    """
    Analyzes the ENTIRE project directory and generates a Use Case Diagram.

    Scans all .py files for services, controllers, and endpoints.

    Args:
        path: Root directory to analyze
        enrich: If True, uses AI to infer actors
        provider: LLM provider

    Returns:
        PlantUML use case diagram string
    """
    try:
        is_valid, resolved_path, error_msg = _validate_path(path, operation="list")
        if not is_valid:
            return error_msg

        # Concatenate every readable .py file, skipping hidden folders,
        # virtualenvs and caches.
        skip_names = {"venv", "env", "__pycache__", "node_modules"}
        combined_code = []
        file_count = 0
        for file_path in resolved_path.rglob("*.py"):
            if any(p.startswith(".") or p in skip_names for p in file_path.parts):
                continue
            try:
                code = file_path.read_text(encoding='utf-8', errors='replace')
            except Exception:
                continue
            combined_code.append(f"# === File: {file_path.name} ===\n{code}")
            file_count += 1

        if not combined_code:
            return "' β οΈ No Python files found in directory"

        llm = _llm_singleton.get_client(preferred_provider=provider, temperature=0.0) if enrich else None

        # Cap the combined source at 50k chars before handing it to the service.
        full_code = "\n\n".join(combined_code)[:50000]
        result = UseCaseDiagramService(llm=llm).generate(full_code, enrich=enrich)

        logging.info(f"β Project use case diagram generated ({file_count} files)")
        return result

    except Exception as e:
        logging.error(f"β Project use case diagram failed: {e}")
        return f"' β Error: {str(e)}"
|
|
|
|
|
@mcp.tool()
def generate_sequence_diagram(
    code: str,
    entry_method: str = None,
    enrich: bool = True,
    provider: str = "sambanova"
) -> str:
    """
    Analyzes Python code and generates a PlantUML Sequence Diagram.

    Traces method calls through:
    - self.dependency.method() patterns
    - Control flow (if/else, loops, try/except)
    - Async operations

    Args:
        code: Python source code to analyze
        entry_method: Starting point (e.g., "OrderService.create_order")
                      If None, auto-detects the method with most calls
        enrich: If True, uses AI to improve readability
        provider: LLM provider ("sambanova", "nebius", "openai")

    Returns:
        PlantUML sequence diagram string
    """
    try:
        # Connect an LLM only when AI enrichment was asked for.
        llm = _llm_singleton.get_client(preferred_provider=provider, temperature=0.0) if enrich else None

        diagram = SequenceDiagramService(llm=llm).generate(code, entry_method=entry_method, enrich=enrich)

        logging.info(f"β Sequence diagram generated (entry: {entry_method or 'auto'})")
        return diagram

    except Exception as e:
        logging.error(f"β Sequence diagram failed: {e}")
        traceback.print_exc(file=sys.stderr)
        return f"' β Error: {str(e)}"
|
|
|
|
|
|
|
|
@mcp.tool()
def generate_full_project_sequence_diagram(
    path: str = ".",
    entry_method: str = None,
    enrich: bool = False,
    provider: str = "sambanova"
) -> str:
    """
    Analyzes ENTIRE project and generates a cross-file Sequence Diagram.

    Scans all .py files and traces method calls across modules.

    Args:
        path: Root directory to analyze
        entry_method: Starting point (e.g., "OrderService.create_order")
        enrich: If True, uses AI to improve readability
        provider: LLM provider

    Returns:
        PlantUML sequence diagram string
    """
    try:
        is_valid, resolved_path, error_msg = _validate_path(path, operation="list")
        if not is_valid:
            return error_msg

        # Collect source text per file name, skipping env/VCS/cache folders.
        skip_names = {"venv", "env", "__pycache__", ".git"}
        file_contents = {}
        for file_path in resolved_path.rglob("*.py"):
            if any(p.startswith(".") or p in skip_names for p in file_path.parts):
                continue
            try:
                file_contents[file_path.name] = file_path.read_text(encoding='utf-8', errors='replace')
            except Exception:
                continue

        if not file_contents:
            return "' β οΈ No Python files found in directory"

        llm = _llm_singleton.get_client(preferred_provider=provider, temperature=0.0) if enrich else None

        analyzer = ProjectSequenceAnalyzer(llm=llm)
        analyzer.analyze_files(file_contents)

        # Auto-detect an entry point: the public method making the most calls.
        if not entry_method:
            candidates = [
                (m, len(c))
                for m, c in analyzer.global_visitor.call_sequences.items()
                if c and not m.startswith('_')
            ]
            if candidates:
                entry_method = max(candidates, key=lambda x: x[1])[0]

        if not entry_method:
            # Nothing suitable found: surface a sample of known methods.
            methods = list(analyzer.global_visitor.call_sequences.keys())[:10]
            return f"' β οΈ No entry method specified. Available: {methods}"

        result = analyzer.generate_diagram(entry_method, enrich=enrich)

        logging.info(f"β Project sequence diagram generated ({len(file_contents)} files)")
        return result

    except Exception as e:
        logging.error(f"β Project sequence diagram failed: {e}")
        return f"' β Error: {str(e)}"
|
|
|
|
|
@mcp.tool()
def list_sequence_entry_points(code: str) -> str:
    """
    Lists all available methods that can be used as sequence diagram entry points.

    Use this to discover what methods exist before generating a sequence diagram.

    Args:
        code: Python source code to analyze

    Returns:
        JSON list of available entry methods with their call counts
    """
    try:
        from services.sequence_service import CallGraphVisitor

        visitor = CallGraphVisitor()
        visitor.visit(ast.parse(code))

        # Summarize every public method: call count plus a sample of callees.
        entry_points = [
            {
                'method': method,
                'calls_count': len(calls),
                'calls_to': list(set(
                    c['target_type'] for c in calls if c.get('target_type')
                ))[:5]
            }
            for method, calls in visitor.call_sequences.items()
            if not method.startswith('_')
        ]

        # Busiest methods first — they yield the most informative diagrams.
        entry_points.sort(key=lambda x: x['calls_count'], reverse=True)

        logging.info(f"β Found {len(entry_points)} potential entry points")
        return json.dumps({
            'total_methods': len(entry_points),
            'entry_points': entry_points[:20]
        }, indent=2)

    except SyntaxError as se:
        return json.dumps({'error': f'Syntax error: {se}'})
    except Exception as e:
        return json.dumps({'error': str(e)})
|
|
|
|
|
|
|
|
@mcp.tool()
def list_project_entry_points(path: str = ".") -> str:
    """
    Lists all available methods across the ENTIRE project for sequence diagrams.

    Args:
        path: Root directory to analyze

    Returns:
        JSON list of available entry methods with their call counts
    """
    try:
        from services.sequence_service import CallGraphVisitor

        is_valid, resolved_path, error_msg = _validate_path(path, operation="list")
        if not is_valid:
            return json.dumps({'error': error_msg})

        # Merge each file's call graph into one project-wide visitor.
        skip_names = {"venv", "env", "__pycache__"}
        global_visitor = CallGraphVisitor()
        file_count = 0
        for file_path in resolved_path.rglob("*.py"):
            if any(p.startswith(".") or p in skip_names for p in file_path.parts):
                continue
            try:
                file_visitor = CallGraphVisitor()
                file_visitor.visit(ast.parse(file_path.read_text(encoding='utf-8', errors='replace')))

                global_visitor.classes.update(file_visitor.classes)
                for method, calls in file_visitor.call_sequences.items():
                    global_visitor.call_sequences[method].extend(calls)
                file_count += 1
            except Exception:
                # Unreadable or unparsable files are simply skipped.
                continue

        # Keep public methods only; '.visit_' entries are AST-visitor plumbing.
        entry_points = [
            {
                'method': method,
                'calls_count': len(calls),
                'calls_to': list(set(
                    c['target_type'] for c in calls if c.get('target_type')
                ))[:5]
            }
            for method, calls in global_visitor.call_sequences.items()
            if not (method.startswith('_') or '.visit_' in method)
        ]
        entry_points.sort(key=lambda x: x['calls_count'], reverse=True)

        logging.info(f"β Scanned {file_count} files, found {len(entry_points)} entry points")
        return json.dumps({
            'files_analyzed': file_count,
            'total_methods': len(entry_points),
            'entry_points': entry_points[:30]
        }, indent=2)

    except Exception as e:
        return json.dumps({'error': str(e)})
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Startup banner (goes to stderr) describing the configured environment.
    logging.info("π MCP Server 'ArchitectAI' starting...")
    logging.info(f"π Sandbox Root: {SANDBOX_ROOT}")
    logging.info(f"π« Protected Paths: {len(BLOCKED_PATHS)}")
    logging.info("π§ LLM Client: Singleton Pattern (created once, reused)")
    logging.info("π Available Diagram Types:")
    logging.info("   β’ Class Diagrams β generate_architecture_diagram()")
    logging.info("   β’ Use Case Diagrams β generate_usecase_diagram()")
    logging.info("   β’ Sequence Diagrams β generate_sequence_diagram()")
    logging.info("Available providers: sambanova, nebius, openai")

    # Run the MCP server loop until interrupted or a fatal error occurs.
    try:
        mcp.run()
    except KeyboardInterrupt:
        logging.info("π Server stopped manually.")
    except Exception as e:
        logging.critical(f"β Fatal server error: {e}")
        sys.exit(1)