|
|
|
""" |
|
MCP Client for Gmail Server |
|
Provides both async and sync interfaces for communicating with Gmail MCP server |
|
""" |
|
|
|
import json |
|
import logging |
|
import asyncio |
|
from typing import Dict, Any, List |
|
|
|
|
|
from mcp import ClientSession, StdioServerParameters |
|
from mcp.client.stdio import stdio_client |
|
|
|
|
|
logging.basicConfig(level=logging.INFO) |
|
logger = logging.getLogger("mcp-client") |
|
|
|
class GmailMCPClientAsync: |
|
""" |
|
Asynchronous MCP client for interacting with the Gmail MCP server. |
|
This client handles the low-level communication details for calling tools |
|
and reading resources provided by the server. |
|
""" |
|
|
|
def __init__(self, server_path: str = "gmail_mcp_server.py"): |
|
""" |
|
Initializes the async Gmail MCP client. |
|
|
|
Args: |
|
server_path (str): The path to the Gmail MCP server script. |
|
Defaults to "gmail_mcp_server.py". |
|
""" |
|
self.server_path = server_path |
|
|
|
async def _get_session(self): |
|
""" |
|
Establishes and returns an MCP client session with the server. |
|
This is a helper method primarily intended for internal use or more complex scenarios |
|
where direct session management is needed. |
|
""" |
|
|
|
server_params = StdioServerParameters( |
|
command="python", |
|
args=[self.server_path] |
|
) |
|
|
|
|
|
return stdio_client(server_params) |
|
|
|
async def list_available_tools(self) -> List[Dict[str, Any]]: |
|
""" |
|
Retrieves a list of available tools from the Gmail MCP server. |
|
|
|
Returns: |
|
List[Dict[str, Any]]: A list of dictionaries, each describing a tool. |
|
Returns an empty list if an error occurs or no tools are found. |
|
Raises: |
|
Exception: Propagates exceptions from the MCP communication layer. |
|
""" |
|
try: |
|
|
|
server_params = StdioServerParameters( |
|
command="python", |
|
args=[self.server_path] |
|
) |
|
|
|
|
|
async with stdio_client(server_params) as (read, write): |
|
async with ClientSession(read, write) as session: |
|
await session.initialize() |
|
result = await session.list_tools() |
|
return result.tools |
|
except Exception as e: |
|
logger.error(f"β Failed to list tools: {e}") |
|
raise |
|
|
|
async def list_available_resources(self) -> List[Dict[str, Any]]: |
|
""" |
|
Retrieves a list of available resources from the Gmail MCP server. |
|
|
|
Returns: |
|
List[Dict[str, Any]]: A list of dictionaries, each describing a resource. |
|
Returns an empty list if an error occurs or no resources are found. |
|
Raises: |
|
Exception: Propagates exceptions from the MCP communication layer. |
|
""" |
|
try: |
|
server_params = StdioServerParameters( |
|
command="python", |
|
args=[self.server_path] |
|
) |
|
|
|
async with stdio_client(server_params) as (read, write): |
|
async with ClientSession(read, write) as session: |
|
await session.initialize() |
|
result = await session.list_resources() |
|
return result.resources |
|
except Exception as e: |
|
logger.error(f"β Failed to list resources: {e}") |
|
raise |
|
|
|
async def fetch_emails(self, query: str = "", max_results: int = 10) -> Dict[str, Any]: |
|
""" |
|
Fetches emails from the Gmail account via the MCP server's 'fetch_emails' tool. |
|
|
|
Args: |
|
query (str): The Gmail search query string (e.g., "is:unread", "from:example@example.com"). |
|
max_results (int): The maximum number of emails to fetch. |
|
|
|
Returns: |
|
Dict[str, Any]: A dictionary containing the fetched emails or an error message. |
|
Example success: {"emails": [...], "total_emails": 5} |
|
Example error: {"error": "Error message"} |
|
""" |
|
try: |
|
server_params = StdioServerParameters( |
|
command="python", |
|
args=[self.server_path] |
|
) |
|
|
|
async with stdio_client(server_params) as (read, write): |
|
async with ClientSession(read, write) as session: |
|
await session.initialize() |
|
|
|
result = await session.call_tool( |
|
"fetch_emails", |
|
arguments={ |
|
"query": query, |
|
"max_results": max_results |
|
} |
|
) |
|
|
|
|
|
if result.content and len(result.content) > 0: |
|
content = result.content[0] |
|
if hasattr(content, 'text'): |
|
return json.loads(content.text) |
|
else: |
|
logger.warning("fetch_emails response content has no 'text' attribute.") |
|
return {"error": "No text content in response"} |
|
else: |
|
logger.warning("fetch_emails response has no content.") |
|
return {"error": "No content in response"} |
|
|
|
except Exception as e: |
|
logger.error(f"Error fetching emails: {e}") |
|
return {"error": str(e)} |
|
|
|
async def summarize_emails(self, days: int = 1, include_body: bool = False) -> Dict[str, Any]: |
|
""" |
|
Summarizes emails from the last N days via the MCP server's 'summarize_emails' tool. |
|
|
|
Args: |
|
days (int): The number of past days to consider for summarization. |
|
include_body (bool): Whether to include the email body in the summary process. |
|
|
|
Returns: |
|
Dict[str, Any]: A dictionary containing the email summaries or an error message. |
|
""" |
|
try: |
|
server_params = StdioServerParameters( |
|
command="python", |
|
args=[self.server_path] |
|
) |
|
|
|
async with stdio_client(server_params) as (read, write): |
|
async with ClientSession(read, write) as session: |
|
await session.initialize() |
|
|
|
result = await session.call_tool( |
|
"summarize_emails", |
|
arguments={ |
|
"days": days, |
|
"include_body": include_body |
|
} |
|
) |
|
|
|
if result.content and len(result.content) > 0: |
|
content = result.content[0] |
|
if hasattr(content, 'text'): |
|
return json.loads(content.text) |
|
else: |
|
logger.warning("summarize_emails response content has no 'text' attribute.") |
|
return {"error": "No text content in response"} |
|
else: |
|
logger.warning("summarize_emails response has no content.") |
|
return {"error": "No content in response"} |
|
|
|
except Exception as e: |
|
logger.error(f"Error summarizing emails: {e}") |
|
return {"error": str(e)} |
|
|
|
async def search_emails(self, from_address: str = "", subject_contains: str = "", |
|
has_attachment: bool = False, is_unread: bool = False, |
|
max_results: int = 10) -> Dict[str, Any]: |
|
""" |
|
Searches emails based on specified criteria via the MCP server's 'search_emails' tool. |
|
|
|
Args: |
|
from_address (str): Filter emails from this sender. |
|
subject_contains (str): Filter emails whose subject contains this text. |
|
has_attachment (bool): Filter emails that have attachments. |
|
is_unread (bool): Filter unread emails. |
|
max_results (int): The maximum number of search results to return. |
|
|
|
Returns: |
|
Dict[str, Any]: A dictionary containing the search results or an error message. |
|
""" |
|
try: |
|
server_params = StdioServerParameters( |
|
command="python", |
|
args=[self.server_path] |
|
) |
|
|
|
async with stdio_client(server_params) as (read, write): |
|
async with ClientSession(read, write) as session: |
|
await session.initialize() |
|
|
|
result = await session.call_tool( |
|
"search_emails", |
|
arguments={ |
|
"from_address": from_address, |
|
"subject_contains": subject_contains, |
|
"has_attachment": has_attachment, |
|
"is_unread": is_unread, |
|
"max_results": max_results |
|
} |
|
) |
|
|
|
if result.content and len(result.content) > 0: |
|
content = result.content[0] |
|
if hasattr(content, 'text'): |
|
return json.loads(content.text) |
|
else: |
|
logger.warning("search_emails response content has no 'text' attribute.") |
|
return {"error": "No text content in response"} |
|
else: |
|
logger.warning("search_emails response has no content.") |
|
return {"error": "No content in response"} |
|
|
|
except Exception as e: |
|
logger.error(f"Error searching emails: {e}") |
|
return {"error": str(e)} |
|
|
|
async def get_inbox_emails(self) -> List[Dict[str, Any]]: |
|
""" |
|
Retrieves emails directly from the 'gmail://inbox' resource on the MCP server. |
|
|
|
Returns: |
|
List[Dict[str, Any]]: A list of emails from the inbox, or an empty list on error. |
|
""" |
|
try: |
|
server_params = StdioServerParameters( |
|
command="python", |
|
args=[self.server_path] |
|
) |
|
|
|
async with stdio_client(server_params) as (read, write): |
|
async with ClientSession(read, write) as session: |
|
await session.initialize() |
|
|
|
result = await session.read_resource("gmail://inbox") |
|
|
|
if result.contents and len(result.contents) > 0: |
|
content = result.contents[0] |
|
if hasattr(content, 'text'): |
|
return json.loads(content.text) |
|
else: |
|
logger.warning("get_inbox_emails resource content has no 'text' attribute.") |
|
return [] |
|
else: |
|
logger.warning("get_inbox_emails resource has no content.") |
|
return [] |
|
|
|
except Exception as e: |
|
logger.error(f"Error getting inbox emails: {e}") |
|
return [] |
|
|
|
class GmailMCPClientSync: |
|
""" |
|
Synchronous wrapper for the GmailMCPClientAsync. |
|
Provides a blocking interface to the async client's methods, |
|
making it easier to use in synchronous codebases. |
|
""" |
|
|
|
def __init__(self, server_path: str = "gmail_mcp_server.py"): |
|
""" |
|
Initializes the synchronous Gmail MCP client. |
|
|
|
Args: |
|
server_path (str): The path to the Gmail MCP server script. |
|
""" |
|
self.async_client = GmailMCPClientAsync(server_path) |
|
|
|
def _run_async(self, coro): |
|
""" |
|
Runs an awaitable coroutine in a synchronous context. |
|
Handles existing asyncio event loops gracefully. |
|
|
|
Args: |
|
coro: The coroutine to run. |
|
|
|
Returns: |
|
The result of the coroutine. |
|
""" |
|
try: |
|
|
|
loop = asyncio.get_event_loop() |
|
if loop.is_running(): |
|
|
|
|
|
import concurrent.futures |
|
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor: |
|
future = executor.submit(asyncio.run, coro) |
|
return future.result() |
|
else: |
|
|
|
return loop.run_until_complete(coro) |
|
except RuntimeError: |
|
|
|
|
|
return asyncio.run(coro) |
|
|
|
def list_available_tools(self) -> List[Dict[str, Any]]: |
|
"""Synchronous version of list_available_tools.""" |
|
return self._run_async(self.async_client.list_available_tools()) |
|
|
|
def list_available_resources(self) -> List[Dict[str, Any]]: |
|
"""Synchronous version of list_available_resources.""" |
|
return self._run_async(self.async_client.list_available_resources()) |
|
|
|
def fetch_emails(self, query: str = "", max_results: int = 10) -> Dict[str, Any]: |
|
"""Synchronous version of fetch_emails.""" |
|
return self._run_async(self.async_client.fetch_emails(query, max_results)) |
|
|
|
def summarize_emails(self, days: int = 1, include_body: bool = False) -> Dict[str, Any]: |
|
"""Synchronous version of summarize_emails.""" |
|
return self._run_async(self.async_client.summarize_emails(days, include_body)) |
|
|
|
def search_emails(self, from_address: str = "", subject_contains: str = "", |
|
has_attachment: bool = False, is_unread: bool = False, |
|
max_results: int = 10) -> Dict[str, Any]: |
|
"""Synchronous version of search_emails.""" |
|
return self._run_async(self.async_client.search_emails( |
|
from_address, subject_contains, has_attachment, is_unread, max_results |
|
)) |
|
|
|
def get_inbox_emails(self) -> List[Dict[str, Any]]: |
|
"""Synchronous version of get_inbox_emails.""" |
|
return self._run_async(self.async_client.get_inbox_emails()) |
|
|
|
|
|
|
|
|
|
async def test_mcp_client_async(): |
|
""" |
|
Tests the functionality of the GmailMCPClientAsync. |
|
This function performs a series of calls to the async client methods |
|
and prints the results or error messages. |
|
""" |
|
print("π§ͺ Testing Async MCP Client...") |
|
|
|
client = GmailMCPClientAsync() |
|
|
|
try: |
|
|
|
print("\nπ Listing available tools (async)...") |
|
tools = await client.list_available_tools() |
|
if tools: |
|
print(f"β
Found {len(tools)} tools: {[tool.get('name', 'Unknown') for tool in tools]}") |
|
else: |
|
print("β οΈ No tools found or an error occurred.") |
|
|
|
|
|
print("\nπ Listing available resources (async)...") |
|
resources = await client.list_available_resources() |
|
if resources: |
|
print(f"β
Found {len(resources)} resources: {[resource.get('uri', 'Unknown') for resource in resources]}") |
|
else: |
|
print("β οΈ No resources found or an error occurred.") |
|
|
|
|
|
print("\nπ§ Fetching emails (async, query='is:unread', max_results=2)...") |
|
result = await client.fetch_emails(query="is:unread", max_results=2) |
|
if 'error' not in result: |
|
print(f"β
Fetched emails: {result.get('total_emails', 0)} total, showing {len(result.get('emails', []))} emails.") |
|
else: |
|
print(f"β Error fetching emails: {result['error']}") |
|
|
|
except Exception as e: |
|
print(f"β Async Test failed with an unexpected exception: {e}") |
|
|
|
def test_mcp_client_sync(): |
|
""" |
|
Tests the functionality of the GmailMCPClientSync. |
|
This function performs a series of calls to the sync client methods |
|
and prints the results or error messages. |
|
""" |
|
print("π§ͺ Testing Sync MCP Client...") |
|
|
|
client = GmailMCPClientSync() |
|
|
|
try: |
|
|
|
print("\nπ Listing available tools (sync)...") |
|
tools = client.list_available_tools() |
|
if tools: |
|
print(f"β
Found {len(tools)} tools: {[tool.get('name', 'Unknown') for tool in tools]}") |
|
else: |
|
print("β οΈ No tools found or an error occurred.") |
|
|
|
|
|
print("\nπ Listing available resources (sync)...") |
|
resources = client.list_available_resources() |
|
if resources: |
|
print(f"β
Found {len(resources)} resources: {[resource.get('uri', 'Unknown') for resource in resources]}") |
|
else: |
|
print("β οΈ No resources found or an error occurred.") |
|
|
|
|
|
print("\nπ§ Fetching emails (sync, query='is:unread', max_results=2)...") |
|
result = client.fetch_emails(query="is:unread", max_results=2) |
|
if 'error' not in result: |
|
print(f"β
Fetched emails: {result.get('total_emails', 0)} total, showing {len(result.get('emails', []))} emails.") |
|
else: |
|
print(f"β Error fetching emails: {result['error']}") |
|
|
|
except Exception as e: |
|
print(f"β Sync Test failed with an unexpected exception: {e}") |
|
|
|
if __name__ == "__main__": |
|
""" |
|
Main execution block. |
|
Runs tests for both synchronous and asynchronous MCP clients when the script is executed directly. |
|
This is useful for verifying client functionality during development. |
|
""" |
|
print("π§ Gmail MCP Client Test Suite") |
|
print("=" * 50) |
|
|
|
|
|
test_mcp_client_sync() |
|
|
|
print("\n" + "=" * 50 + "\n") |
|
|
|
|
|
|
|
asyncio.run(test_mcp_client_async()) |
|
|
|
print("\n" + "=" * 50) |
|
print("π Test suite finished.") |