"""
Gmail MCP Server with debug output - Modified version that shows startup info
"""

import asyncio
import json
import logging
import sys
import time
import os
from typing import Any, Dict, List, Optional, Sequence
from dataclasses import dataclass
from datetime import datetime, timedelta
import base64
import email
from email.mime.text import MIMEText
import re

from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError

from mcp.server import Server
from mcp.server.models import InitializationOptions
from mcp.types import (
    Resource,
    Tool,
)
import mcp.server.stdio
import mcp.types as types

SCOPES = ['https://www.googleapis.com/auth/gmail.readonly']
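# If SCOPES is ever changed (for example to a broader Gmail scope), delete the cached
# token.json so the OAuth flow runs again and issues a token for the new scopes.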

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[logging.StreamHandler(sys.stderr)]
)
logger = logging.getLogger("gmail-mcp-server")


class PerformanceTracker:
    """Context manager that logs how long a named operation takes."""

    def __init__(self, operation_name: str):
        self.operation_name = operation_name
        self.start_time = None

    def __enter__(self):
        self.start_time = time.time()
        logger.info(f"Starting operation: {self.operation_name}")
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        duration = time.time() - self.start_time
        if exc_type is None:
            logger.info(f"Completed operation: {self.operation_name} in {duration:.2f}s")
        else:
            logger.error(f"Failed operation: {self.operation_name} after {duration:.2f}s - {exc_val}")


@dataclass
class EmailData:
    """Structure to hold email data"""
    id: str
    thread_id: str
    subject: str
    sender: str
    recipient: str
    date: str
    body: str
    snippet: str
    labels: List[str]
    is_unread: bool


class GmailClient:
    """Gmail API client wrapper"""

    def __init__(self, credentials_file: str = "credentials.json", token_file: str = "token.json"):
        self.credentials_file = credentials_file
        self.token_file = token_file
        self.service = None
        self.authenticated_user = None
        logger.info("Initializing Gmail client...")
        logger.info(f"Using credentials file: {credentials_file}")
        logger.info(f"Using token file: {token_file}")
        self._authenticate()

    def _authenticate(self):
        """Authenticate with Gmail API"""
        with PerformanceTracker("Gmail Authentication"):
            logger.info("Starting Gmail authentication process...")
            creds = None

            if not os.path.exists(self.token_file):
                logger.warning(f"Token file '{self.token_file}' not found - checking if it was created by app.py")

            try:
                creds = Credentials.from_authorized_user_file(self.token_file, SCOPES)
                logger.info("Successfully loaded existing credentials from token file")
                logger.info(f"Token expiry: {creds.expiry}")
                logger.info(f"Has refresh token: {bool(creds.refresh_token)}")
            except FileNotFoundError:
                logger.warning(f"Token file '{self.token_file}' not found - first time setup")
            except Exception as e:
                logger.error(f"Error loading token file: {e}")

            if not creds or not creds.valid:
                if creds and creds.expired and creds.refresh_token:
                    logger.info("Credentials expired, attempting to refresh...")
                    try:
                        creds.refresh(Request())
                        logger.info("Successfully refreshed expired credentials")
                    except Exception as e:
                        logger.error(f"Failed to refresh credentials: {e}")
                        creds = None
                else:
                    logger.info("Getting new credentials via OAuth flow...")
                    try:
                        flow = InstalledAppFlow.from_client_secrets_file(
                            self.credentials_file, SCOPES)
                        logger.info("Starting local OAuth server...")
                        creds = flow.run_local_server(port=0)
                        logger.info("OAuth flow completed successfully")
                    except FileNotFoundError:
                        logger.error(f"Credentials file '{self.credentials_file}' not found!")
                        logger.error("Please download your OAuth 2.0 credentials from Google Cloud Console")
                        logger.error("Visit: https://console.cloud.google.com/apis/credentials")
                        raise
                    except Exception as e:
                        logger.error(f"OAuth flow failed: {e}")
                        raise

                try:
                    with open(self.token_file, 'w') as token:
                        token.write(creds.to_json())
                    logger.info(f"Saved credentials to {self.token_file} for future use")
                except Exception as e:
                    logger.error(f"Failed to save credentials: {e}")

            try:
                self.service = build('gmail', 'v1', credentials=creds)

                profile = self.service.users().getProfile(userId='me').execute()
                self.authenticated_user = profile.get('emailAddress', 'Unknown')
                total_messages = profile.get('messagesTotal', 0)

                logger.info("Gmail authentication successful!")
                logger.info(f"Authenticated user: {self.authenticated_user}")
                logger.info(f"Total messages in account: {total_messages:,}")
                logger.info("Gmail API version: v1")

            except Exception as e:
                logger.error(f"Failed to build Gmail service: {e}")
                raise

    def _decode_message_part(self, part: Dict) -> str:
        """Decode email message part"""
        try:
            data = part['body'].get('data', '')
            if data:
                decoded = base64.urlsafe_b64decode(data).decode('utf-8', errors='ignore')
                return decoded
        except Exception as e:
            logger.debug(f"Failed to decode message part: {e}")
        return ''

    def _extract_email_body(self, message: Dict) -> str:
        """Extract email body from message"""
        body = ""

        try:
            if 'parts' in message['payload']:
                logger.debug(f"Processing multipart message with {len(message['payload']['parts'])} parts")
                for part in message['payload']['parts']:
                    if part['mimeType'] == 'text/plain':
                        body += self._decode_message_part(part)
                    elif part['mimeType'] == 'text/html':
                        html_body = self._decode_message_part(part)
                        # Strip HTML tags for a rough plain-text approximation
                        body += re.sub('<.*?>', '', html_body)
            else:
                logger.debug(f"Processing single-part message: {message['payload']['mimeType']}")
                if message['payload']['mimeType'] in ['text/plain', 'text/html']:
                    body = self._decode_message_part(message['payload'])
                    if message['payload']['mimeType'] == 'text/html':
                        body = re.sub('<.*?>', '', body)
        except Exception as e:
            logger.warning(f"Error extracting email body: {e}")

        return body.strip()

    def _get_header_value(self, headers: List[Dict], name: str) -> str:
        """Get header value by name"""
        for header in headers:
            if header['name'].lower() == name.lower():
                return header['value']
        return ''

    def get_emails(self, query: str = '', max_results: int = 10) -> List[EmailData]:
        """Fetch emails from Gmail"""
        with PerformanceTracker(f"Fetch emails (query='{query}', max={max_results})"):
            logger.info(f"Fetching emails for user: {self.authenticated_user}")
            logger.info(f"Search query: '{query}' (empty = all emails)")
            logger.info(f"Max results: {max_results}")

            try:
                search_start = time.time()
                results = self.service.users().messages().list(
                    userId='me',
                    q=query,
                    maxResults=max_results
                ).execute()
                search_time = time.time() - search_start

                messages = results.get('messages', [])
                total_found = results.get('resultSizeEstimate', 0)

                logger.info(f"Gmail search completed in {search_time:.2f}s")
                logger.info(f"Found {len(messages)} messages to process (estimated total: {total_found})")

                if not messages:
                    logger.info("No messages found matching the query")
                    return []

                emails = []
                fetch_start = time.time()

                for i, msg in enumerate(messages, 1):
                    logger.debug(f"Processing message {i}/{len(messages)}: {msg['id']}")

                    try:
                        message = self.service.users().messages().get(
                            userId='me',
                            id=msg['id'],
                            format='full'
                        ).execute()

                        headers = message['payload'].get('headers', [])
                        subject = self._get_header_value(headers, 'Subject')
                        sender = self._get_header_value(headers, 'From')

                        email_data = EmailData(
                            id=message['id'],
                            thread_id=message['threadId'],
                            subject=subject,
                            sender=sender,
                            recipient=self._get_header_value(headers, 'To'),
                            date=self._get_header_value(headers, 'Date'),
                            body=self._extract_email_body(message),
                            snippet=message.get('snippet', ''),
                            labels=message.get('labelIds', []),
                            is_unread='UNREAD' in message.get('labelIds', [])
                        )

                        emails.append(email_data)

                        status = "UNREAD" if email_data.is_unread else "READ"
                        logger.debug(f"  {status} | From: {sender[:50]}... | Subject: {subject[:50]}...")

                    except Exception as e:
                        logger.error(f"Error processing message {msg['id']}: {e}")
                        continue

                fetch_time = time.time() - fetch_start
                unread_count = sum(1 for email in emails if email.is_unread)

                logger.info(f"Successfully processed {len(emails)} emails in {fetch_time:.2f}s")
                logger.info(f"Email breakdown: {unread_count} unread, {len(emails) - unread_count} read")

                senders = {}
                for email in emails:
                    sender = email.sender.split('<')[0].strip() if '<' in email.sender else email.sender
                    senders[sender] = senders.get(sender, 0) + 1

                if senders:
                    top_senders = sorted(senders.items(), key=lambda x: x[1], reverse=True)[:3]
                    logger.info(f"Top senders: {', '.join([f'{s}({c})' for s, c in top_senders])}")

                return emails

            except HttpError as error:
                logger.error(f"Gmail API HTTP error: {error}")
                logger.error(f"Error details: {error.error_details if hasattr(error, 'error_details') else 'No details'}")
                raise
            except Exception as error:
                logger.error(f"Unexpected error fetching emails: {error}")
                raise
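
# Usage sketch for GmailClient.get_emails(): the `query` argument takes ordinary Gmail
# search syntax, the same operators the handlers below rely on (is:unread, from:,
# subject:, has:attachment, newer_than:Nd). The address and subject here are placeholders.
#
#   client = GmailClient()
#   unread = client.get_emails(query="is:unread", max_results=5)
#   invoices = client.get_emails(query="from:billing@example.com has:attachment subject:invoice")
#   recent = client.get_emails(query="newer_than:7d", max_results=20)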


class GmailMCPServer:
    """MCP Server for Gmail integration"""

    def __init__(self):
        logger.info("Initializing Gmail MCP Server...")
        self.server = Server("gmail-mcp-server")
        self.gmail_client = None
        self.client_connections = 0
        self.total_requests = 0
        self._setup_handlers()
        logger.info("Gmail MCP Server initialized successfully!")

    def _setup_handlers(self):
        """Setup MCP server handlers"""
        logger.info("Setting up MCP handlers...")

        @self.server.list_resources()
        async def handle_list_resources() -> List[Resource]:
            """List available Gmail resources"""
            self.total_requests += 1
            logger.info(f"[REQUEST #{self.total_requests}] Handling list_resources request")
            logger.info("Available resources: gmail://inbox, gmail://unread, gmail://recent")

            resources = [
                Resource(
                    uri="gmail://inbox",
                    name="Gmail Inbox",
                    description="Access to Gmail inbox messages",
                    mimeType="application/json"
                ),
                Resource(
                    uri="gmail://unread",
                    name="Unread Emails",
                    description="Access to unread Gmail messages",
                    mimeType="application/json"
                ),
                Resource(
                    uri="gmail://recent",
                    name="Recent Emails",
                    description="Access to recent Gmail messages",
                    mimeType="application/json"
                )
            ]

            logger.info(f"Returned {len(resources)} available resources")
            return resources

        @self.server.read_resource()
        async def handle_read_resource(uri: str) -> str:
            """Read Gmail resource"""
            self.total_requests += 1
            logger.info(f"[REQUEST #{self.total_requests}] Handling read_resource request")
            logger.info(f"Resource URI: {uri}")

            if not self.gmail_client:
                logger.info("Initializing Gmail client for first-time use")
                self.gmail_client = GmailClient()

            try:
                if uri == "gmail://inbox":
                    logger.info("Fetching inbox messages...")
                    emails = self.gmail_client.get_emails(query="in:inbox", max_results=20)
                elif uri == "gmail://unread":
                    logger.info("Fetching unread messages...")
                    emails = self.gmail_client.get_emails(query="is:unread", max_results=20)
                elif uri == "gmail://recent":
                    logger.info("Fetching recent messages (last 7 days)...")
                    emails = self.gmail_client.get_emails(query="newer_than:7d", max_results=20)
                else:
                    logger.error(f"Unknown resource URI: {uri}")
                    raise ValueError(f"Unknown resource URI: {uri}")

                emails_data = []
                for email in emails:
                    emails_data.append({
                        "id": email.id,
                        "subject": email.subject,
                        "sender": email.sender,
                        "recipient": email.recipient,
                        "date": email.date,
                        "body": email.body[:1000] + "..." if len(email.body) > 1000 else email.body,
                        "snippet": email.snippet,
                        "is_unread": email.is_unread,
                        "labels": email.labels
                    })

                result = json.dumps(emails_data, indent=2)
                logger.info(f"Successfully processed resource {uri}")
                logger.info(f"Returned {len(emails_data)} emails ({len(result)} characters)")
                return result

            except Exception as e:
                logger.error(f"Error reading resource {uri}: {e}")
                raise

        @self.server.list_tools()
        async def handle_list_tools() -> List[Tool]:
            """List available Gmail tools"""
            self.total_requests += 1
            logger.info(f"[REQUEST #{self.total_requests}] Handling list_tools request")

            tools = [
                Tool(
                    name="fetch_emails",
                    description="Fetch emails from Gmail with optional search query",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "query": {
                                "type": "string",
                                "description": "Gmail search query (optional)",
                                "default": ""
                            },
                            "max_results": {
                                "type": "integer",
                                "description": "Maximum number of emails to fetch",
                                "default": 10,
                                "minimum": 1,
                                "maximum": 50
                            }
                        }
                    }
                ),
                Tool(
                    name="summarize_emails",
                    description="Get a summary of recent emails",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "days": {
                                "type": "integer",
                                "description": "Number of days to look back",
                                "default": 1,
                                "minimum": 1,
                                "maximum": 30
                            },
                            "include_body": {
                                "type": "boolean",
                                "description": "Include email body in summary",
                                "default": False
                            }
                        }
                    }
                ),
                Tool(
                    name="search_emails",
                    description="Search emails with specific criteria",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "from_address": {
                                "type": "string",
                                "description": "Filter by sender email address"
                            },
                            "subject_contains": {
                                "type": "string",
                                "description": "Filter by subject containing text"
                            },
                            "has_attachment": {
                                "type": "boolean",
                                "description": "Filter emails with attachments"
                            },
                            "is_unread": {
                                "type": "boolean",
                                "description": "Filter unread emails only"
                            },
                            "max_results": {
                                "type": "integer",
                                "description": "Maximum number of results",
                                "default": 10
                            }
                        }
                    }
                )
            ]

            logger.info(f"Available tools: {[tool.name for tool in tools]}")
            logger.info(f"Returned {len(tools)} available tools")
            return tools

        @self.server.call_tool()
        async def handle_call_tool(name: str, arguments: Dict[str, Any]) -> List[types.TextContent]:
            """Handle tool calls"""
            self.total_requests += 1
            logger.info(f"[REQUEST #{self.total_requests}] Handling tool call: {name}")
            logger.info(f"Tool arguments: {json.dumps(arguments, indent=2)}")

            if not self.gmail_client:
                logger.info("Initializing Gmail client for tool call")
                self.gmail_client = GmailClient()

            try:
                if name == "fetch_emails":
                    query = arguments.get("query", "")
                    max_results = arguments.get("max_results", 10)

                    logger.info(f"Tool: fetch_emails | Query: '{query}' | Max: {max_results}")
                    emails = self.gmail_client.get_emails(query=query, max_results=max_results)

                    result = {
                        "total_emails": len(emails),
                        "query_used": query,
                        "emails": []
                    }

                    for email in emails:
                        result["emails"].append({
                            "id": email.id,
                            "subject": email.subject,
                            "sender": email.sender,
                            "date": email.date,
                            "snippet": email.snippet,
                            "is_unread": email.is_unread,
                            "body_preview": email.body[:200] + "..." if len(email.body) > 200 else email.body
                        })

                    logger.info(f"fetch_emails completed: {len(emails)} emails returned")
                    return [types.TextContent(
                        type="text",
                        text=json.dumps(result, indent=2)
                    )]

                elif name == "summarize_emails":
                    days = arguments.get("days", 1)
                    include_body = arguments.get("include_body", False)

                    logger.info(f"Tool: summarize_emails | Days: {days} | Include body: {include_body}")
                    query = f"newer_than:{days}d"
                    emails = self.gmail_client.get_emails(query=query, max_results=50)

                    summary = {
                        "period": f"Last {days} days",
                        "total_emails": len(emails),
                        "unread_count": sum(1 for email in emails if email.is_unread),
                        "top_senders": {},
                        "subjects": []
                    }

                    for email in emails:
                        sender = email.sender.split('<')[0].strip() if '<' in email.sender else email.sender
                        summary["top_senders"][sender] = summary["top_senders"].get(sender, 0) + 1

                    summary["top_senders"] = dict(sorted(
                        summary["top_senders"].items(),
                        key=lambda x: x[1],
                        reverse=True
                    )[:5])

                    for email in emails[:10]:
                        email_summary = {
                            "subject": email.subject,
                            "sender": email.sender,
                            "date": email.date,
                            "is_unread": email.is_unread
                        }
                        if include_body:
                            email_summary["body_preview"] = email.body[:300] + "..." if len(email.body) > 300 else email.body

                        summary["subjects"].append(email_summary)

                    logger.info(f"summarize_emails completed: {summary['total_emails']} emails, {summary['unread_count']} unread")
                    return [types.TextContent(
                        type="text",
                        text=json.dumps(summary, indent=2)
                    )]

                elif name == "search_emails":
                    query_parts = []

                    if "from_address" in arguments and arguments["from_address"]:
                        query_parts.append(f"from:{arguments['from_address']}")
                        logger.info(f"Search filter: from={arguments['from_address']}")

                    if "subject_contains" in arguments and arguments["subject_contains"]:
                        query_parts.append(f"subject:{arguments['subject_contains']}")
                        logger.info(f"Search filter: subject contains '{arguments['subject_contains']}'")

                    if arguments.get("has_attachment"):
                        query_parts.append("has:attachment")
                        logger.info("Search filter: has attachment")

                    if arguments.get("is_unread"):
                        query_parts.append("is:unread")
                        logger.info("Search filter: unread only")

                    query = " ".join(query_parts)
                    max_results = arguments.get("max_results", 10)

                    logger.info(f"Tool: search_emails | Combined query: '{query}' | Max: {max_results}")
                    emails = self.gmail_client.get_emails(query=query, max_results=max_results)

                    result = {
                        "search_query": query,
                        "search_filters": arguments,
                        "total_results": len(emails),
                        "emails": []
                    }

                    for email in emails:
                        result["emails"].append({
                            "id": email.id,
                            "subject": email.subject,
                            "sender": email.sender,
                            "date": email.date,
                            "snippet": email.snippet,
                            "is_unread": email.is_unread
                        })

                    logger.info(f"search_emails completed: {len(emails)} matching emails found")
                    return [types.TextContent(
                        type="text",
                        text=json.dumps(result, indent=2)
                    )]

                else:
                    logger.error(f"Unknown tool requested: {name}")
                    raise ValueError(f"Unknown tool: {name}")

            except Exception as e:
                logger.error(f"Error executing tool '{name}': {e}")
                raise


async def main():
    """Main function to run the MCP server"""
    logger.info("=" * 60)
    logger.info("Starting Gmail MCP Server...")
    logger.info("=" * 60)

    logger.info("Gmail MCP Server v1.0.0")
    logger.info("Server capabilities:")
    logger.info("  - Fetch emails from Gmail")
    logger.info("  - Search emails with queries")
    logger.info("  - Summarize recent emails")
    logger.info("  - Access inbox, unread, and recent emails")
    logger.info("")
    logger.info("Required files:")
    logger.info("  - credentials.json (OAuth 2.0 credentials from Google)")
    logger.info("  - token.json (will be created after first auth)")
    logger.info("")
    logger.info("Server is ready and waiting for MCP client connections...")
    logger.info("  Use this server with Claude Desktop, IDEs, or other MCP clients")
    logger.info("=" * 60)

    server_instance = GmailMCPServer()

    async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
        logger.info("MCP stdio server started - ready to accept connections")

        capabilities = {
            "resources": {"subscribe": False, "listChanged": False},
            "tools": {"listChanged": False},
            "logging": {}
        }
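        # The static capabilities above advertise no resource subscriptions and no
        # list-change notifications, which matches the fixed resource and tool sets
        # registered in _setup_handlers(). Passing a plain dict assumes the SDK will
        # coerce it into its ServerCapabilities model; if that assumption does not hold
        # for your SDK version, the Server.get_capabilities() helper is an alternative.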

        await server_instance.server.run(
            read_stream,
            write_stream,
            InitializationOptions(
                server_name="gmail-mcp-server",
                server_version="1.0.0",
                capabilities=capabilities
            )
        )


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        logger.info("Server stopped by user")
    except Exception as e:
        logger.error(f"Server error: {e}")
        sys.exit(1)