Zoho_mcp_client / zoho_client_mcp.py
vachaspathi's picture
Update zoho_client_mcp.py
4f8ac5b verified
from mcp.server.fastmcp import FastMCP
from typing import Optional
import requests
import os
from config import CLIENT_ID, CLIENT_SECRET, REFRESH_TOKEN, API_BASE
# --- Initialize the FastMCP Server ---
# Single module-level server instance, registered under the name
# "ZohoCRMAgent". The functions decorated with @mcp.tool() in this module
# attach themselves to this instance.
mcp = FastMCP("ZohoCRMAgent")
# --- Token Refresh Utility ---
def _get_valid_token_headers() -> dict:
    """Exchange the configured refresh token for a fresh Zoho access token.

    A new access token is requested from the Zoho accounts endpoint on every
    call; nothing is cached between calls.

    Returns:
        A headers dict of the form
        ``{"Authorization": "Zoho-oauthtoken <access_token>"}``.

    Raises:
        Exception: If the token endpoint answers with a non-200 status, or
            answers 200 without a usable ``access_token`` in its JSON body.
        requests.RequestException: If the HTTP request itself fails or
            exceeds the timeout.
    """
    token_url = "https://accounts.zoho.in/oauth/v2/token"
    params = {
        "refresh_token": REFRESH_TOKEN,
        "client_id": CLIENT_ID,
        "client_secret": CLIENT_SECRET,
        "grant_type": "refresh_token",
    }
    # Bound the wait so a stalled connection cannot hang the calling tool.
    response = requests.post(token_url, params=params, timeout=30)
    if response.status_code != 200:
        raise Exception(f"Failed to refresh token: {response.text}")

    try:
        payload = response.json()
    except ValueError:
        # Body was not valid JSON; handled below as a missing token.
        payload = None

    access_token = payload.get("access_token") if isinstance(payload, dict) else None
    if not access_token:
        # Without this check a 200 reply that lacks the field would produce
        # the header "Zoho-oauthtoken None" and fail later, far from the cause.
        raise Exception(f"Failed to refresh token: {response.text}")

    return {"Authorization": f"Zoho-oauthtoken {access_token}"}
# --- MCP Tools for Zoho CRM and Zoho Books Operations ---
@mcp.tool()
def authenticate_zoho() -> str:
    """Request a fresh access token and report that it succeeded.

    The token headers themselves are discarded; this tool only verifies that
    the refresh flow completes. Any exception raised by the refresh
    propagates to the caller.
    """
    # Called purely for its success/failure; the returned headers are unused.
    _get_valid_token_headers()
    confirmation = "Zoho CRM access token successfully refreshed."
    return confirmation
@mcp.tool()
def create_record(module_name: str, record_data: dict) -> str:
    """Create a new record in the given Zoho CRM module.

    Args:
        module_name: Module path segment appended to ``API_BASE``.
        record_data: Field values for the record; sent as the single element
            of the request body's ``data`` list.

    Returns:
        A success message when the API answers 200 or 201, otherwise an
        error message containing the raw response body.

    Raises:
        requests.RequestException: If the HTTP request fails or times out.
    """
    headers = _get_valid_token_headers()
    # Bound the wait so a stalled connection cannot hang the tool call.
    response = requests.post(
        f"{API_BASE}/{module_name}",
        headers=headers,
        json={"data": [record_data]},
        timeout=30,
    )
    if response.status_code in (200, 201):
        return f"Record created successfully in {module_name}."
    return f"Error creating record: {response.text}"
@mcp.tool()
def get_records(module_name: str, page: int = 1, per_page: int = 200) -> list:
    """Fetch one page of records from the given Zoho CRM module.

    Args:
        module_name: Module path segment appended to ``API_BASE``.
        page: Page number, sent as the ``page`` query parameter.
        per_page: Page size, sent as the ``per_page`` query parameter.

    Returns:
        The list under the ``data`` key of a 200 response (empty if the key
        is absent), an empty list for a 204 response, or a one-element list
        holding an error message for any other status.

    Raises:
        requests.RequestException: If the HTTP request fails or times out.
    """
    headers = _get_valid_token_headers()
    params = {"page": page, "per_page": per_page}
    # Bound the wait so a stalled connection cannot hang the tool call.
    response = requests.get(
        f"{API_BASE}/{module_name}",
        headers=headers,
        params=params,
        timeout=30,
    )
    if response.status_code == 204:
        # No Content: the request succeeded but there is nothing to return.
        # Reporting this as an error would yield a message with an empty body.
        return []
    if response.status_code == 200:
        return response.json().get("data", [])
    return [f"Error retrieving records: {response.text}"]
@mcp.tool()
def update_record(module_name: str, record_id: str, data: dict) -> str:
    """Update an existing record in the given Zoho CRM module.

    Args:
        module_name: Module path segment appended to ``API_BASE``.
        record_id: Identifier of the record, appended after the module name.
        data: Field values to write; sent as the single element of the
            request body's ``data`` list.

    Returns:
        A success message when the API answers 200, otherwise an error
        message containing the raw response body.

    Raises:
        requests.RequestException: If the HTTP request fails or times out.
    """
    headers = _get_valid_token_headers()
    # Bound the wait so a stalled connection cannot hang the tool call.
    response = requests.put(
        f"{API_BASE}/{module_name}/{record_id}",
        headers=headers,
        json={"data": [data]},
        timeout=30,
    )
    if response.status_code == 200:
        return f"Record {record_id} in {module_name} updated successfully."
    return f"Error updating record: {response.text}"
@mcp.tool()
def delete_record(module_name: str, record_id: str) -> str:
    """Delete a record from the given Zoho CRM module.

    Args:
        module_name: Module path segment appended to ``API_BASE``.
        record_id: Identifier of the record, appended after the module name.

    Returns:
        A success message when the API answers 200, otherwise an error
        message containing the raw response body.

    Raises:
        requests.RequestException: If the HTTP request fails or times out.
    """
    headers = _get_valid_token_headers()
    # Bound the wait so a stalled connection cannot hang the tool call.
    response = requests.delete(
        f"{API_BASE}/{module_name}/{record_id}",
        headers=headers,
        timeout=30,
    )
    if response.status_code == 200:
        return f"Record {record_id} in {module_name} deleted."
    return f"Error deleting record: {response.text}"
@mcp.tool()
def create_invoice(data: dict) -> str:
    """Create an invoice by POSTing to the ``invoices`` endpoint.

    NOTE(review): this tool was originally described as targeting Zoho
    Books, but the request goes to ``{API_BASE}/invoices`` (the same base
    the CRM record tools use) with the payload wrapped in a ``data`` list.
    Confirm that endpoint and envelope are what the target service expects.

    Args:
        data: Invoice fields; sent as the single element of the request
            body's ``data`` list.

    Returns:
        A success message when the API answers 200 or 201, otherwise an
        error message containing the raw response body.

    Raises:
        requests.RequestException: If the HTTP request fails or times out.
    """
    headers = _get_valid_token_headers()
    # Bound the wait so a stalled connection cannot hang the tool call.
    response = requests.post(
        f"{API_BASE}/invoices",
        headers=headers,
        json={"data": [data]},
        timeout=30,
    )
    if response.status_code in (200, 201):
        return "Invoice created successfully."
    return f"Error creating invoice: {response.text}"
@mcp.tool()
def process_document(file_path: str, target_module: Optional[str] = "Contacts") -> dict:
    """Return a simulated extraction result for an uploaded document.

    No file is opened and no parsing happens yet: the result is built only
    from the file's base name and the target module name.

    Args:
        file_path: Path of the uploaded file; only its base name is used.
        target_module: Module name interpolated into the placeholder text.

    Returns:
        A dict with the keys ``status``, ``file`` and ``extracted_data``.
    """
    # TODO: replace the placeholder below with real OCR + Gemini parsing,
    # i.e. run OCR on file_path and parse the raw text into structured JSON.
    file_name = os.path.basename(file_path)
    placeholder = f"Simulated structured data from {target_module} document."

    result = {}
    result["status"] = "success"
    result["file"] = file_name
    result["extracted_data"] = placeholder
    return result