"""FastMCP server exposing Zoho CRM operations as MCP tools.

Every tool obtains a fresh access token through the OAuth refresh-token flow
and then performs a single HTTP request against ``API_BASE``.
"""

import os
from typing import Optional

import requests
from mcp.server.fastmcp import FastMCP

from config import CLIENT_ID, CLIENT_SECRET, REFRESH_TOKEN, API_BASE

# Upper bound (seconds) for every outbound HTTP call. Without a timeout,
# ``requests`` waits forever on a stalled connection and the tool never returns.
REQUEST_TIMEOUT_SECONDS = 30

# --- Initialize the FastMCP Server ---
mcp = FastMCP("ZohoCRMAgent")


# --- Token Refresh Utility ---
def _get_valid_token_headers() -> dict:
    """Return request headers carrying a freshly issued Zoho access token.

    Uses the refresh-token flow: posts the configured client credentials and
    refresh token to the Zoho accounts endpoint and wraps the returned access
    token in an ``Authorization`` header.

    Returns:
        A dict of the form ``{"Authorization": "Zoho-oauthtoken <token>"}``.

    Raises:
        Exception: If the token endpoint does not answer with HTTP 200, or if
            the response body contains no ``access_token``.
        requests.RequestException: On network failure or timeout.
    """
    token_url = "https://accounts.zoho.in/oauth/v2/token"
    params = {
        "refresh_token": REFRESH_TOKEN,
        "client_id": CLIENT_ID,
        "client_secret": CLIENT_SECRET,
        "grant_type": "refresh_token",
    }
    response = requests.post(
        token_url, params=params, timeout=REQUEST_TIMEOUT_SECONDS
    )
    if response.status_code != 200:
        raise Exception(f"Failed to refresh token: {response.text}")

    # An HTTP 200 can still carry an error payload without a token; building
    # a header from a missing token would yield "Zoho-oauthtoken None" and
    # fail later with a misleading authorization error.
    access_token = response.json().get("access_token")
    if not access_token:
        raise Exception(f"Failed to refresh token: {response.text}")
    return {"Authorization": f"Zoho-oauthtoken {access_token}"}


# --- MCP Tools for Zoho CRM and Zoho Books Operations ---
@mcp.tool()
def authenticate_zoho() -> str:
    """Refreshes and confirms Zoho CRM access token availability.

    Returns:
        A confirmation message. Any failure surfaces as the exception raised
        by the token refresh.
    """
    _get_valid_token_headers()
    return "Zoho CRM access token successfully refreshed."


@mcp.tool()
def create_record(module_name: str, record_data: dict) -> str:
    """Creates a new record in the specified Zoho CRM module.

    Args:
        module_name: Module path segment appended to ``API_BASE``.
        record_data: Field values for the record; sent as the single element
            of the request's ``data`` list.

    Returns:
        A success message on HTTP 200/201, otherwise an error message that
        includes the response body.
    """
    headers = _get_valid_token_headers()
    response = requests.post(
        f"{API_BASE}/{module_name}",
        headers=headers,
        json={"data": [record_data]},
        timeout=REQUEST_TIMEOUT_SECONDS,
    )
    if response.status_code in (200, 201):
        return f"Record created successfully in {module_name}."
    return f"Error creating record: {response.text}"


@mcp.tool()
def get_records(module_name: str, page: int = 1, per_page: int = 200) -> list:
    """Fetches records from a specified Zoho CRM module.

    Args:
        module_name: Module path segment appended to ``API_BASE``.
        page: Page number sent as the ``page`` query parameter.
        per_page: Page size sent as the ``per_page`` query parameter.

    Returns:
        The ``data`` list from the response on HTTP 200, an empty list on
        HTTP 204, otherwise a one-element list holding an error message.
    """
    headers = _get_valid_token_headers()
    params = {"page": page, "per_page": per_page}
    response = requests.get(
        f"{API_BASE}/{module_name}",
        headers=headers,
        params=params,
        timeout=REQUEST_TIMEOUT_SECONDS,
    )
    if response.status_code == 200:
        return response.json().get("data", [])
    # 204 No Content means the request succeeded but there is nothing to
    # return (e.g. an empty module or a page past the end) - not an error.
    if response.status_code == 204:
        return []
    return [f"Error retrieving records: {response.text}"]


@mcp.tool()
def update_record(module_name: str, record_id: str, data: dict) -> str:
    """Updates a record in a Zoho CRM module.

    Args:
        module_name: Module path segment appended to ``API_BASE``.
        record_id: Identifier of the record, appended after the module name.
        data: Field values to update; sent as the single element of the
            request's ``data`` list.

    Returns:
        A success message on HTTP 200, otherwise an error message that
        includes the response body.
    """
    headers = _get_valid_token_headers()
    response = requests.put(
        f"{API_BASE}/{module_name}/{record_id}",
        headers=headers,
        json={"data": [data]},
        timeout=REQUEST_TIMEOUT_SECONDS,
    )
    if response.status_code == 200:
        return f"Record {record_id} in {module_name} updated successfully."
    return f"Error updating record: {response.text}"


@mcp.tool()
def delete_record(module_name: str, record_id: str) -> str:
    """Deletes a record from the specified Zoho CRM module.

    Args:
        module_name: Module path segment appended to ``API_BASE``.
        record_id: Identifier of the record, appended after the module name.

    Returns:
        A success message on HTTP 200, otherwise an error message that
        includes the response body.
    """
    headers = _get_valid_token_headers()
    response = requests.delete(
        f"{API_BASE}/{module_name}/{record_id}",
        headers=headers,
        timeout=REQUEST_TIMEOUT_SECONDS,
    )
    if response.status_code == 200:
        return f"Record {record_id} in {module_name} deleted."
    return f"Error deleting record: {response.text}"


@mcp.tool()
def create_invoice(data: dict) -> str:
    """Creates an invoice in Zoho Books.

    Args:
        data: Invoice fields; sent as the single element of the request's
            ``data`` list.

    Returns:
        A success message on HTTP 200/201, otherwise an error message that
        includes the response body.
    """
    # NOTE(review): this posts to the same API_BASE and uses the same payload
    # envelope as the CRM tools - confirm that matches the Zoho Books API.
    headers = _get_valid_token_headers()
    response = requests.post(
        f"{API_BASE}/invoices",
        headers=headers,
        json={"data": [data]},
        timeout=REQUEST_TIMEOUT_SECONDS,
    )
    if response.status_code in (200, 201):
        return "Invoice created successfully."
    return f"Error creating invoice: {response.text}"


@mcp.tool()
def process_document(file_path: str, target_module: Optional[str] = "Contacts") -> dict:
    """Extracts data from uploaded file (PDF/image) and returns structured info.

    This is currently a placeholder: the file is not opened and the returned
    ``extracted_data`` is a simulated string.

    Args:
        file_path: Path of the uploaded file; only its base name is used.
        target_module: Module name interpolated into the simulated result.

    Returns:
        A dict with the keys ``status``, ``file`` and ``extracted_data``.
    """
    # Placeholder for OCR + Gemini parsing logic
    # raw_text = perform_ocr(file_path)
    # structured_data = gemini_parse_json(raw_text)
    return {
        "status": "success",
        "file": os.path.basename(file_path),
        "extracted_data": f"Simulated structured data from {target_module} document.",
    }