import os,logging,asyncio logger = logging.getLogger(__name__) from pathlib import Path from fastmcp import FastMCP mcp = FastMCP("Coderama") @mcp.tool async def create_folder(folder_path:str)->dict: """ Create a folder at the specified location and return an execution status. Agent Tool Specification: Name: create_folder Description: Creates a directory or folder at the given path. Returns a structured status dictionary with success or error details. Input Arguments: folder_path (str): Path where the folder or file should be created. Output: dict: { "STATUS": bool, # True if creation succeeded, False otherwise "PATH": str # Full path of the created file/folder } Behavior: - Validates the input path and constructs the full target path. - Creates any missing directories when creating a file. - Overwrites nothing; if the target already exists, marks the operation as successful. - Captures and reports exceptions in the status dictionary. """ WORKSPACE_DIR=os.environ.get("WORKSPACE_DIR") folder_path = WORKSPACE_DIR+os.sep+folder_path if not safe_join(WORKSPACE_DIR,folder_path): return { "STATUS": "FAILURE", "PATH": f"{folder_path} outside working directory. Do not use eacape sequence in your directory path" } try: os.makedirs(folder_path,exist_ok=True) except (FileExistsError,Exception) as e: logger.error("File exsist {folder_path}") return { "STATUS": "SUCCESS", "PATH": f"{folder_path}" } @mcp.tool async def create_file(parent_folder_path:str,file_name:str,content:str)->dict: """ Create a file at the specified location and return an execution status. Agent Tool Specification: Name: create_file Description: Creates a file at the given path. Returns a structured status dictionary with success or error details. Input Arguments: parent_folder_path (str): Parent folder path where the file should be created.Can be empty if it is the root. file_name (str): The name of the file to create. Provide only the filename, with no directory paths or separators included. content (str): Content to be written to a file Output: dict: { "STATUS": bool, # True if creation succeeded, False otherwise "PATH": str # Full path of the created file/folder } Behavior: - Validates the input path and constructs the full target path. - Creates any missing directories when creating a file. - Overwrites nothing; if the target already exists, marks the operation as successful. - Captures and reports exceptions in the status dictionary. """ WORKSPACE_DIR=os.environ.get("WORKSPACE_DIR") folder_path = WORKSPACE_DIR+os.sep+parent_folder_path try: os.makedirs(folder_path,exist_ok=True) except (FileExistsError,Exception) as e: logger.error(f"File exsist {folder_path}") full_path = folder_path+os.sep+file_name if not safe_join(WORKSPACE_DIR,full_path): return { "STATUS": "FAILURE", "PATH": f"{full_path} outside working directory. Do not use eacape sequence in your directory path" } try: with open(full_path,"w") as f: f.write(content) logger.info(f"file created {full_path}") return { "STATUS": "SUCCESS", "PATH": f"{full_path}" } except (FileNotFoundError,NotADirectoryError,IsADirectoryError,Exception) as e: return { "STATUS": "FAILURE", "PATH": f"{e}" } @mcp.tool async def read_file(folder_path:str,file_name:str)->dict: """ Create a folder or file at the specified location and return an execution status. Agent Tool Specification: Name: read_file Description: Reads a file at a particular location and provides the content of the file. Input Arguments: folder_path (str): Relative path of the folder where file has to be read. file_name (str): Name of the file. Output: dict: { "STATUS": bool, # True if creation succeeded, False otherwise "CONTENT": str # Full path of the created file/folder } """ WORKSPACE_DIR=os.environ.get("WORKSPACE_DIR") file_path = WORKSPACE_DIR+os.sep+folder_path+os.sep+file_name if not safe_join(WORKSPACE_DIR,file_path): return { "STATUS": "FAILURE", "PATH": f"{file_path} outside working directory. Do not use eacape sequence in your directory path" } if not os.path.exists(file_path): return { "STATUS": "FAILURE", "CONTENT": f"File does not exsist at {file_path}" } with open(file_path,"r") as f: content = f.read() return { "STATUS": "SUCCESS", "CONTENT": content } @mcp.tool async def execute_shell_code(shell_file_path:str)->dict: """ Execute a shell code file located at the given path in shell and return an execution status object. Parameters ---------- shell_file_path (str): The name of the shell script file to execute. You must provide ONLY the filename of an existing shell script, not a raw shell command. Do NOT include inline shell commands, arguments, pipes, flags, or bash syntax. Do NOT include any directory paths — provide only the script filename (e.g., "deploy.sh"), which will be executed from the workspace directory. Returns ------- dict A structured status response with the following keys: - "STATUS" (bool): Whether execution completed without errors. - "OUTPUT" (str): Captured standard output as text. - "ERROR" (str): Captured standard error as text. - "REASON" (str): A human-readable summary of the result. Notes ----- - This function does not return live streams; it captures output after execution. - Errors during file loading or process start are returned as part of the structured status rather than raised as exceptions. """ import subprocess WORKSPACE_DIR=os.environ.get("WORKSPACE_DIR") DOCKER_IMAGE_TAG = os.environ.get("DOCKER_IMAGE_TAG") PLATFORM = os.environ.get("PLATFORM") SANDBOX_AVAILABLE = os.environ.get("SANDBOX_AVAILABLE") logger.info(f""" ========================IN EXECUTE SHELL CODE============================ The WORKSPACE_DIR is {WORKSPACE_DIR} DOCKER_IMAGE_TAG is {DOCKER_IMAGE_TAG} PLATFORM is {PLATFORM} SANDBOX_AVAILABLE is {SANDBOX_AVAILABLE} """) if not shell_file_path.endswith(".sh"): return { "STATUS": "FAILURE", "REASON": "PLEASE CREATE A SHELL SCRIPT FILE e.g., script.sh to execute shell commands.CANNOT EXECUTE COMMAND DIRECTLY.", "WORKSPACE": f"{shell_file_path}" } try: logger.info(" =====BEFORE EXECUTE SUBPROCESS===============") if SANDBOX_AVAILABLE is not None and SANDBOX_AVAILABLE.lower()=="true": logger.info(" ===== SANDBOX IS SET===============") if PLATFORM is not None and PLATFORM == "linux/arm64": cmd = [ "docker", "run", f"--platform={PLATFORM}", "--rm", "-v", f"{WORKSPACE_DIR}:/app", "-w", "/app", f"{DOCKER_IMAGE_TAG}", "bash", "-c", f"chmod +x {shell_file_path} && ./{shell_file_path}" ] else: cmd = [ "docker", "run", "--rm", "-v", f"{WORKSPACE_DIR}:/app", "-w", "/app", f"{DOCKER_IMAGE_TAG}", "bash", "-c", f"chmod +x {shell_file_path} && ./{shell_file_path}" ] logger.info(f" Command to be executed : {cmd}") proc = await asyncio.create_subprocess_exec(*cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.DEVNULL ) out, err = await proc.communicate() else: logger.info(" ===== SANDBOX IS UNSET===============") script_path = os.path.join(WORKSPACE_DIR, shell_file_path) cmd = ["bash", "-c", f"chmod +x {script_path} && ./{shell_file_path}"] logger.info(f" Command to be executed : {cmd}") proc = await asyncio.create_subprocess_exec(*cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.DEVNULL, cwd=WORKSPACE_DIR ) out, err = await proc.communicate() logger.info(" =====AFTER EXECUTE SUBPROCESS===============") logger.info(f"Output: {out.decode() if out else ''}") logger.info(f"Errors: {err.decode() if err else ''}") return { "STATUS": "SUCCESS", "REASON": "CODE EXECUTED SUCCESSFULLY", "OUTPUT": f"{out.decode() if out else ''}", "ERROR": f"{err.decode() if err else ''}", "COMMAND": f"{' '.join(cmd)}" } except (subprocess.CalledProcessError,Exception) as e: logger.error(f"Command failed with exit code {e.returncode}: {e.stderr} : {e.stdout}") return { "STATUS": "FAILURE", "OUTPUT": f"{e.stdout}", "ERROR": f"{e.stderr}", "RETURN_CODE": f"CODE EXECUTION FAILE WITH ERROR {e.returncode}", "COMMAND": f"{' '.join(cmd)}" } @mcp.tool async def web_search(query:str,max_results:int=20) -> dict: """ Tool: Web Search Name : web_search Description: This tool performs a search operation for a given user query. It should be invoked whenever a user makes a request, asks a question, or provides any input that requires information retrieval. Always use this tool to fetch relevant information before attempting to answer the user's query. Args: query:str = The user's query or question that needs to be searched. max_results:int = (OPTIONAL) The total number of results/links returned from the web search. DEFAULT value is 20. START with 5 and keep INCREASING the value till 20 if the results are unsatisfactory. Usage: Call this tool immediately upon receiving a user query to ensure that the LLM has the most accurate and updated context for generating a response. Output: A dictionary of results """ from ddgs import DDGS from googlesearch import search search_results = {} ddgs = DDGS(verify=False,timeout=5) results = ddgs.text(query, safesearch='off',max_results=max_results) if len(results) > 0: search_results["Duck_Duck_GO"]=results google_results=[] for result in search(query, num_results=max_results,unique=True,advanced=True,ssl_verify=True,sleep_interval=5): google_results.append(result) if len(google_results) > 0: search_results["Google"]=google_results if len(search_results) > 0: return search_results else: return { "result": "Unable todo a web search right now. Please try again later!!!!!" } def safe_join(workspace: str, user_path: str) -> bool: workspace_path = Path(workspace).resolve() # absolute path target_path = (workspace_path / user_path).resolve() # resolve user input # Check if the final path is inside the workspace if target_path.is_symlink() or not target_path.relative_to(workspace) or not str(target_path).startswith(str(workspace_path)): return False return True async def run_server(): logger.info(" ==================Starting MCP server ==================") await mcp.run_stdio_async()