Gmail-AI-Agent / mcp_client.py
Kayani9178's picture
New Files added
1136fac verified
#!/usr/bin/env python3
"""
MCP Client for Gmail Server
Provides both async and sync interfaces for communicating with Gmail MCP server
"""
import asyncio
import json
import logging
import sys
from typing import Dict, Any, List
# MCP (Model Context Protocol) specific imports
from mcp import ClientSession, StdioServerParameters # Core MCP components for client sessions and server configuration
from mcp.client.stdio import stdio_client # Utility to create a client connection via stdio
# Configure basic logging for the application
logging.basicConfig(level=logging.INFO) # Set default logging level to INFO
logger = logging.getLogger("mcp-client") # Get a logger instance specific to this client
class GmailMCPClientAsync:
    """
    Asynchronous MCP client for interacting with the Gmail MCP server.

    Each public method starts the server script as a child process over stdio,
    opens and initializes an MCP session, performs a single request and then
    shuts both down again. Nothing is cached between calls.
    """

    def __init__(self, server_path: str = "gmail_mcp_server.py"):
        """
        Initializes the async Gmail MCP client.

        Args:
            server_path (str): The path to the Gmail MCP server script.
                Defaults to "gmail_mcp_server.py".
        """
        self.server_path = server_path

    def _server_params(self):
        """Builds the stdio launch parameters for the server script."""
        # Start the server with the interpreter that is running this client so
        # it sees the same environment and installed packages. A bare "python"
        # may be absent from PATH or resolve to a different installation.
        return StdioServerParameters(
            command=sys.executable or "python",
            args=[self.server_path],
        )

    async def _get_session(self):
        """
        Returns the stdio transport context manager for the server.

        Nothing is launched by this call. The returned object has to be entered
        with ``async with``; doing so yields the ``(read, write)`` stream pair
        that a ``ClientSession`` is built on. Kept for callers that want to
        manage the connection themselves.
        """
        return stdio_client(self._server_params())

    async def _with_session(self, operation):
        """
        Runs one operation against a freshly started, initialized session.

        Args:
            operation: Callable that receives the ``ClientSession`` and returns
                an awaitable (for example ``lambda s: s.list_tools()``).

        Returns:
            Whatever the awaited operation returns.
        """
        async with stdio_client(self._server_params()) as (read, write):  # Manages server process lifecycle
            async with ClientSession(read, write) as session:  # Manages MCP session
                await session.initialize()
                return await operation(session)

    @staticmethod
    def _first_text(parts, label: str):
        """
        Extracts the text of the first content part of a response.

        Args:
            parts: The content list of a tool result or resource read.
            label (str): Prefix used in the warning log, e.g. "fetch_emails response".

        Returns:
            tuple: ``(text, None)`` on success, or ``(None, message)`` where
            ``message`` describes why no text could be extracted.
        """
        if not parts:
            logger.warning("%s has no content.", label)
            return None, "No content in response"
        first = parts[0]  # The payload is expected in the first content part
        if not hasattr(first, "text"):
            logger.warning("%s content has no 'text' attribute.", label)
            return None, "No text content in response"
        return first.text, None

    async def _call_tool(self, tool_name: str, arguments: Dict[str, Any],
                         failure_label: str) -> Dict[str, Any]:
        """
        Calls a server tool and decodes its JSON text payload.

        Args:
            tool_name (str): Name of the tool exposed by the server.
            arguments (Dict[str, Any]): Arguments forwarded to the tool.
            failure_label (str): Prefix for the error log line.

        Returns:
            Dict[str, Any]: The decoded payload, or ``{"error": "..."}`` when the
            call fails, the response is empty, or the payload is not valid JSON.
        """
        try:
            result = await self._with_session(
                lambda session: session.call_tool(tool_name, arguments=arguments)
            )
            text, problem = self._first_text(result.content, f"{tool_name} response")
            if problem is not None:
                return {"error": problem}
            if getattr(result, "isError", False):
                # The server flagged the call as failed. Its text is then an
                # error message rather than JSON, so report it as-is instead of
                # letting json.loads turn it into an opaque decode error.
                logger.error("%s: %s", failure_label, text)
                return {"error": text}
            return json.loads(text)
        except Exception as e:
            # Deliberate catch-all: tool methods report failures as a dict
            # instead of raising.
            logger.error("%s: %s", failure_label, e)
            return {"error": str(e)}

    async def list_available_tools(self) -> List[Any]:
        """
        Retrieves the list of tools advertised by the Gmail MCP server.

        Returns:
            List[Any]: The ``tools`` field of the server's list-tools response.
            NOTE(review): with the MCP SDK these are tool model objects with
            attributes such as ``name``, not plain dicts - confirm against the
            installed SDK version.

        Raises:
            Exception: Any failure is logged and re-raised to the caller.
        """
        try:
            result = await self._with_session(lambda session: session.list_tools())
            return result.tools
        except Exception as e:
            logger.error("❌ Failed to list tools: %s", e)
            raise  # Re-raise the exception to be handled by the caller

    async def list_available_resources(self) -> List[Any]:
        """
        Retrieves the list of resources advertised by the Gmail MCP server.

        Returns:
            List[Any]: The ``resources`` field of the server's list-resources
            response. NOTE(review): presumably SDK resource objects with
            attributes such as ``uri`` rather than dicts - confirm.

        Raises:
            Exception: Any failure is logged and re-raised to the caller.
        """
        try:
            result = await self._with_session(lambda session: session.list_resources())
            return result.resources
        except Exception as e:
            logger.error("❌ Failed to list resources: %s", e)
            raise

    async def fetch_emails(self, query: str = "", max_results: int = 10) -> Dict[str, Any]:
        """
        Fetches emails from the Gmail account via the server's 'fetch_emails' tool.

        Args:
            query (str): The Gmail search query string (e.g., "is:unread", "from:example@example.com").
            max_results (int): The maximum number of emails to fetch.

        Returns:
            Dict[str, Any]: The decoded tool payload or an error dictionary.
                Example success: {"emails": [...], "total_emails": 5}
                Example error: {"error": "Error message"}
        """
        return await self._call_tool(
            "fetch_emails",
            {"query": query, "max_results": max_results},
            "Error fetching emails",
        )

    async def summarize_emails(self, days: int = 1, include_body: bool = False) -> Dict[str, Any]:
        """
        Summarizes emails from the last N days via the server's 'summarize_emails' tool.

        Args:
            days (int): The number of past days to consider for summarization.
            include_body (bool): Whether to include the email body in the summary process.

        Returns:
            Dict[str, Any]: The decoded tool payload or ``{"error": "..."}``.
        """
        return await self._call_tool(
            "summarize_emails",
            {"days": days, "include_body": include_body},
            "Error summarizing emails",
        )

    async def search_emails(self, from_address: str = "", subject_contains: str = "",
                            has_attachment: bool = False, is_unread: bool = False,
                            max_results: int = 10) -> Dict[str, Any]:
        """
        Searches emails by the given criteria via the server's 'search_emails' tool.

        Args:
            from_address (str): Filter emails from this sender.
            subject_contains (str): Filter emails whose subject contains this text.
            has_attachment (bool): Filter emails that have attachments.
            is_unread (bool): Filter unread emails.
            max_results (int): The maximum number of search results to return.

        Returns:
            Dict[str, Any]: The decoded tool payload or ``{"error": "..."}``.
        """
        return await self._call_tool(
            "search_emails",
            {
                "from_address": from_address,
                "subject_contains": subject_contains,
                "has_attachment": has_attachment,
                "is_unread": is_unread,
                "max_results": max_results,
            },
            "Error searching emails",
        )

    async def get_inbox_emails(self) -> List[Dict[str, Any]]:
        """
        Reads the 'gmail://inbox' resource from the MCP server.

        Returns:
            List[Dict[str, Any]]: The JSON-decoded resource text, or an empty
            list when the resource is empty, has no text, or any error occurs.
        """
        try:
            result = await self._with_session(
                lambda session: session.read_resource("gmail://inbox")
            )
            text, problem = self._first_text(result.contents, "get_inbox_emails resource")
            if problem is not None:
                return []
            return json.loads(text)
        except Exception as e:
            # Deliberate catch-all: this method never raises, it degrades to [].
            logger.error("Error getting inbox emails: %s", e)
            return []
class GmailMCPClientSync:
    """
    Synchronous wrapper for the GmailMCPClientAsync.

    Every method builds the matching coroutine on the wrapped async client and
    blocks until it has finished, so the client can be used from code that is
    not itself async.
    """

    def __init__(self, server_path: str = "gmail_mcp_server.py"):
        """
        Initializes the synchronous Gmail MCP client.

        Args:
            server_path (str): The path to the Gmail MCP server script.
        """
        self.async_client = GmailMCPClientAsync(server_path)

    def _run_async(self, coro):
        """
        Runs a coroutine to completion from synchronous code.

        Args:
            coro: The coroutine to run.

        Returns:
            The result of the coroutine.

        Raises:
            Exception: Whatever the coroutine raises is propagated unchanged.
        """
        # Only the loop probe is guarded. Wrapping the execution itself in
        # `except RuntimeError` would catch errors raised by the coroutine and
        # retry an already-consumed coroutine, masking the real failure.
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            # No event loop is running in this thread: run the coroutine here.
            return asyncio.run(coro)

        # A loop is already running in this thread (e.g. a Jupyter notebook or
        # another async framework) and asyncio.run() cannot be nested inside it,
        # so give the coroutine its own loop on a worker thread. The calling
        # thread blocks on the result.
        import concurrent.futures
        with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
            return executor.submit(asyncio.run, coro).result()

    def list_available_tools(self) -> List[Any]:
        """Synchronous version of list_available_tools."""
        return self._run_async(self.async_client.list_available_tools())

    def list_available_resources(self) -> List[Any]:
        """Synchronous version of list_available_resources."""
        return self._run_async(self.async_client.list_available_resources())

    def fetch_emails(self, query: str = "", max_results: int = 10) -> Dict[str, Any]:
        """Synchronous version of fetch_emails."""
        return self._run_async(self.async_client.fetch_emails(query, max_results))

    def summarize_emails(self, days: int = 1, include_body: bool = False) -> Dict[str, Any]:
        """Synchronous version of summarize_emails."""
        return self._run_async(self.async_client.summarize_emails(days, include_body))

    def search_emails(self, from_address: str = "", subject_contains: str = "",
                      has_attachment: bool = False, is_unread: bool = False,
                      max_results: int = 10) -> Dict[str, Any]:
        """Synchronous version of search_emails."""
        return self._run_async(self.async_client.search_emails(
            from_address, subject_contains, has_attachment, is_unread, max_results
        ))

    def get_inbox_emails(self) -> List[Dict[str, Any]]:
        """Synchronous version of get_inbox_emails."""
        return self._run_async(self.async_client.get_inbox_emails())
# --- Test Functions ---
# These functions demonstrate how to use the async and sync clients.
async def test_mcp_client_async():
    """
    Smoke-tests GmailMCPClientAsync against a real server.

    Lists the tools, lists the resources and fetches up to two unread emails,
    printing the outcome of each step. Any exception is caught and printed
    rather than raised.
    """
    def _field(item, key):
        """Reads `key` from either a dict or an attribute-style object."""
        # The list_* calls may return SDK model objects, which have attributes
        # but no dict-style .get(); support both shapes.
        if isinstance(item, dict):
            return str(item.get(key, 'Unknown'))
        return str(getattr(item, key, 'Unknown'))

    print("🧪 Testing Async MCP Client...")
    client = GmailMCPClientAsync()  # Initialize the async client
    try:
        # Test listing available tools
        print("\n🔍 Listing available tools (async)...")
        tools = await client.list_available_tools()
        if tools:
            print(f"✅ Found {len(tools)} tools: {[_field(tool, 'name') for tool in tools]}")
        else:
            print("⚠️ No tools found or an error occurred.")

        # Test listing available resources
        print("\n📚 Listing available resources (async)...")
        resources = await client.list_available_resources()
        if resources:
            print(f"✅ Found {len(resources)} resources: {[_field(resource, 'uri') for resource in resources]}")
        else:
            print("⚠️ No resources found or an error occurred.")

        # Test fetching emails (small max_results keeps the test quick)
        print("\n📧 Fetching emails (async, query='is:unread', max_results=2)...")
        result = await client.fetch_emails(query="is:unread", max_results=2)
        if 'error' not in result:
            print(f"✅ Fetched emails: {result.get('total_emails', 0)} total, showing {len(result.get('emails', []))} emails.")
        else:
            print(f"❌ Error fetching emails: {result['error']}")
    except Exception as e:
        print(f"❌ Async Test failed with an unexpected exception: {e}")
def test_mcp_client_sync():
    """
    Smoke-tests GmailMCPClientSync against a real server.

    Lists the tools, lists the resources and fetches up to two unread emails,
    printing the outcome of each step. Any exception is caught and printed
    rather than raised.
    """
    def _field(item, key):
        """Reads `key` from either a dict or an attribute-style object."""
        # The list_* calls may return SDK model objects, which have attributes
        # but no dict-style .get(); support both shapes.
        if isinstance(item, dict):
            return str(item.get(key, 'Unknown'))
        return str(getattr(item, key, 'Unknown'))

    print("🧪 Testing Sync MCP Client...")
    client = GmailMCPClientSync()  # Initialize the sync client
    try:
        # Test listing available tools
        print("\n🔍 Listing available tools (sync)...")
        tools = client.list_available_tools()
        if tools:
            print(f"✅ Found {len(tools)} tools: {[_field(tool, 'name') for tool in tools]}")
        else:
            print("⚠️ No tools found or an error occurred.")

        # Test listing available resources
        print("\n📚 Listing available resources (sync)...")
        resources = client.list_available_resources()
        if resources:
            print(f"✅ Found {len(resources)} resources: {[_field(resource, 'uri') for resource in resources]}")
        else:
            print("⚠️ No resources found or an error occurred.")

        # Test fetching emails (small max_results keeps the test quick)
        print("\n📧 Fetching emails (sync, query='is:unread', max_results=2)...")
        result = client.fetch_emails(query="is:unread", max_results=2)
        if 'error' not in result:
            print(f"✅ Fetched emails: {result.get('total_emails', 0)} total, showing {len(result.get('emails', []))} emails.")
        else:
            print(f"❌ Error fetching emails: {result['error']}")
    except Exception as e:
        print(f"❌ Sync Test failed with an unexpected exception: {e}")
if __name__ == "__main__":
    # Main execution block.
    # Runs the synchronous client test first and the asynchronous one second
    # when the script is executed directly; useful for verifying client
    # functionality during development.
    print("🔧 Gmail MCP Client Test Suite")
    print("=" * 50)

    # Test synchronous client first
    test_mcp_client_sync()
    print("\n" + "=" * 50 + "\n")

    # Test asynchronous client; asyncio.run() drives the async test function
    asyncio.run(test_mcp_client_async())
    print("\n" + "=" * 50)
    print("🏁 Test suite finished.")