# Source: Gmail-AI-Agent / gmail_mcp_server.py
#!/usr/bin/env python3
"""
Gmail MCP Server with debug output - Modified version that shows startup info
"""
import asyncio
import json
import logging
import sys
import time
import os
from typing import Any, Dict, List, Optional, Sequence
from dataclasses import dataclass
from datetime import datetime, timedelta
import base64
import email
from email.mime.text import MIMEText
import re
# Gmail API imports
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
# MCP imports
from mcp.server import Server
from mcp.server.models import InitializationOptions
from mcp.types import (
Resource,
Tool,
# TextContent,
# ImageContent,
# EmbeddedResource,
# LoggingLevel
)
import mcp.server.stdio
import mcp.types as types
# Gmail API scopes
SCOPES = ['https://www.googleapis.com/auth/gmail.readonly']
# Configure logging to stderr so it doesn't interfere with MCP protocol
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[logging.StreamHandler(sys.stderr)]
)
logger = logging.getLogger("gmail-mcp-server")
# Lightweight timing helper used around Gmail API calls.
class PerformanceTracker:
    """Context manager that logs the wall-clock duration of a named operation.

    Emits a start line on entry and, on exit, either a success line with the
    elapsed seconds or an error line when the body raised. Exceptions are
    never suppressed (``__exit__`` returns a falsy value).
    """

    def __init__(self, operation_name: str):
        self.operation_name = operation_name
        self.start_time = None  # populated on __enter__

    def __enter__(self):
        self.start_time = time.time()
        logger.info(f"πŸš€ Starting operation: {self.operation_name}")
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        duration = time.time() - self.start_time
        if exc_type is None:
            logger.info(f"βœ… Completed operation: {self.operation_name} in {duration:.2f}s")
            return
        logger.error(f"❌ Failed operation: {self.operation_name} after {duration:.2f}s - {exc_val}")
@dataclass
class EmailData:
    """Structure to hold email data normalized from a Gmail API message."""
    id: str            # Gmail message id
    thread_id: str     # Gmail thread id
    subject: str       # 'Subject' header value ('' when absent)
    sender: str        # 'From' header value
    recipient: str     # 'To' header value
    date: str          # raw 'Date' header string (not parsed)
    body: str          # extracted text body (HTML tags stripped)
    snippet: str       # Gmail-provided preview snippet
    labels: List[str]  # Gmail labelIds attached to the message
    is_unread: bool    # True when 'UNREAD' is among the labelIds
class GmailClient:
    """Gmail API client wrapper.

    Handles OAuth2 authentication (cached token -> refresh -> interactive
    flow) and exposes :meth:`get_emails`, which fetches messages and
    normalizes them into :class:`EmailData` records.
    """

    # Crude HTML -> text conversion: strip tags only. Compiled once; re.S so
    # tags that span multiple lines are removed as well.
    _TAG_RE = re.compile(r"<.*?>", re.S)

    def __init__(self, credentials_file: str = "credentials.json", token_file: str = "token.json"):
        """Store file paths and immediately authenticate against Gmail."""
        self.credentials_file = credentials_file
        self.token_file = token_file
        self.service = None            # googleapiclient resource, set by _authenticate()
        self.authenticated_user = None  # email address of the authorized account
        logger.info("πŸ”§ Initializing Gmail client...")
        logger.info(f"πŸ“„ Using credentials file: {credentials_file}")
        logger.info(f"πŸ”‘ Using token file: {token_file}")
        self._authenticate()

    def _authenticate(self):
        """Authenticate with Gmail API.

        Attempts, in order: load the cached token file, refresh an expired
        token, run the interactive OAuth flow. Newly obtained credentials are
        written back to the token file (best effort). Raises when no usable
        credentials can be obtained or the service cannot be built.
        """
        with PerformanceTracker("Gmail Authentication"):
            logger.info("πŸ” Starting Gmail authentication process...")
            creds = None
            if not os.path.exists(self.token_file):
                logger.warning(f"⚠️ Token file '{self.token_file}' not found - checking if it was created by app.py")
            # Load existing token if available
            try:
                creds = Credentials.from_authorized_user_file(self.token_file, SCOPES)
                logger.info("πŸ“‚ Successfully loaded existing credentials from token file")
                logger.info(f"πŸ” Token expiry: {creds.expiry}")
                logger.info(f"πŸ”„ Has refresh token: {bool(creds.refresh_token)}")
            except FileNotFoundError:
                logger.warning(f"⚠️ Token file '{self.token_file}' not found - first time setup")
            except Exception as e:
                logger.error(f"❌ Error loading token file: {e}")
            # If no valid credentials, get new ones
            if not creds or not creds.valid:
                if creds and creds.expired and creds.refresh_token:
                    logger.info("πŸ”„ Credentials expired, attempting to refresh...")
                    try:
                        creds.refresh(Request())
                        logger.info("βœ… Successfully refreshed expired credentials")
                    except Exception as e:
                        logger.error(f"❌ Failed to refresh credentials: {e}")
                        creds = None  # fall through to the interactive flow below
                # BUGFIX: this was previously a plain `else`, so a failed
                # refresh left creds=None and the service build crashed.
                # Now the interactive OAuth flow runs whenever credentials
                # are still missing or invalid.
                if not creds or not creds.valid:
                    logger.info("🌐 Getting new credentials via OAuth flow...")
                    try:
                        flow = InstalledAppFlow.from_client_secrets_file(
                            self.credentials_file, SCOPES)
                        logger.info("πŸ”— Starting local OAuth server...")
                        creds = flow.run_local_server(port=0)
                        logger.info("βœ… OAuth flow completed successfully")
                    except FileNotFoundError:
                        logger.error(f"❌ Credentials file '{self.credentials_file}' not found!")
                        logger.error("πŸ“‹ Please download your OAuth 2.0 credentials from Google Cloud Console")
                        logger.error("πŸ”— Visit: https://console.cloud.google.com/apis/credentials")
                        raise
                    except Exception as e:
                        logger.error(f"❌ OAuth flow failed: {e}")
                        raise
                # Save credentials for next run (best effort - failure to save
                # should not abort an otherwise successful authentication).
                try:
                    with open(self.token_file, 'w') as token:
                        token.write(creds.to_json())
                    logger.info(f"πŸ’Ύ Saved credentials to {self.token_file} for future use")
                except Exception as e:
                    logger.error(f"❌ Failed to save credentials: {e}")
            try:
                self.service = build('gmail', 'v1', credentials=creds)
                # Get user profile info to confirm the credentials work.
                profile = self.service.users().getProfile(userId='me').execute()
                self.authenticated_user = profile.get('emailAddress', 'Unknown')
                total_messages = profile.get('messagesTotal', 0)
                logger.info("πŸŽ‰ Gmail authentication successful!")
                logger.info(f"πŸ‘€ Authenticated user: {self.authenticated_user}")
                logger.info(f"πŸ“Š Total messages in account: {total_messages:,}")
                logger.info(f"πŸ”§ Gmail API version: v1")
            except Exception as e:
                logger.error(f"❌ Failed to build Gmail service: {e}")
                raise

    def _decode_message_part(self, part: Dict) -> str:
        """Base64url-decode one MIME part's body; return '' if absent or undecodable."""
        try:
            # 'body' or 'data' can be missing (e.g. container/attachment parts).
            data = part.get('body', {}).get('data', '')
            if data:
                return base64.urlsafe_b64decode(data).decode('utf-8', errors='ignore')
        except Exception as e:
            logger.debug(f"⚠️ Failed to decode message part: {e}")
        return ''

    def _extract_email_body(self, message: Dict) -> str:
        """Extract a text body from a Gmail API message payload.

        Multipart messages: concatenates text/plain parts and tag-stripped
        text/html parts. Single-part messages: decodes the top-level payload
        when it is text/plain or text/html. Any other MIME type yields ''.
        """
        body = ""
        try:
            payload = message['payload']
            if 'parts' in payload:
                logger.debug(f"πŸ“„ Processing multipart message with {len(payload['parts'])} parts")
                for part in payload['parts']:
                    if part['mimeType'] == 'text/plain':
                        body += self._decode_message_part(part)
                    elif part['mimeType'] == 'text/html':
                        # Simple HTML to text conversion
                        body += self._TAG_RE.sub('', self._decode_message_part(part))
            else:
                logger.debug(f"πŸ“„ Processing single-part message: {payload['mimeType']}")
                if payload['mimeType'] in ['text/plain', 'text/html']:
                    body = self._decode_message_part(payload)
                    if payload['mimeType'] == 'text/html':
                        body = self._TAG_RE.sub('', body)
        except Exception as e:
            logger.warning(f"⚠️ Error extracting email body: {e}")
        return body.strip()

    def _get_header_value(self, headers: List[Dict], name: str) -> str:
        """Return the first header value matching `name` (case-insensitive), or ''."""
        target = name.lower()
        for header in headers:
            if header['name'].lower() == target:
                return header['value']
        return ''

    def get_emails(self, query: str = '', max_results: int = 10) -> List["EmailData"]:
        """Fetch emails from Gmail.

        Args:
            query: Gmail search query string ('' fetches all mail).
            max_results: cap on the number of messages fetched.

        Returns:
            List of EmailData. Messages that fail to parse are logged and
            skipped rather than aborting the whole fetch.

        Raises:
            HttpError: on Gmail API errors during the search.
        """
        with PerformanceTracker(f"Fetch emails (query='{query}', max={max_results})"):
            logger.info(f"πŸ“₯ Fetching emails for user: {self.authenticated_user}")
            logger.info(f"πŸ” Search query: '{query}' (empty = all emails)")
            logger.info(f"πŸ“Š Max results: {max_results}")
            try:
                # Search for message ids first; details are fetched per message.
                search_start = time.time()
                results = self.service.users().messages().list(
                    userId='me',
                    q=query,
                    maxResults=max_results
                ).execute()
                search_time = time.time() - search_start
                messages = results.get('messages', [])
                total_found = results.get('resultSizeEstimate', 0)
                logger.info(f"πŸ” Gmail search completed in {search_time:.2f}s")
                logger.info(f"πŸ“Š Found {len(messages)} messages to process (estimated total: {total_found})")
                if not messages:
                    logger.info("πŸ“­ No messages found matching the query")
                    return []
                emails = []
                fetch_start = time.time()
                for i, msg in enumerate(messages, 1):
                    logger.debug(f"πŸ“¨ Processing message {i}/{len(messages)}: {msg['id']}")
                    try:
                        # Get full message details
                        message = self.service.users().messages().get(
                            userId='me',
                            id=msg['id'],
                            format='full'
                        ).execute()
                        headers = message['payload'].get('headers', [])
                        subject = self._get_header_value(headers, 'Subject')
                        sender = self._get_header_value(headers, 'From')
                        labels = message.get('labelIds', [])
                        email_data = EmailData(
                            id=message['id'],
                            thread_id=message['threadId'],
                            subject=subject,
                            sender=sender,
                            recipient=self._get_header_value(headers, 'To'),
                            date=self._get_header_value(headers, 'Date'),
                            body=self._extract_email_body(message),
                            snippet=message.get('snippet', ''),
                            labels=labels,
                            is_unread='UNREAD' in labels
                        )
                        emails.append(email_data)
                        status = "πŸ”΄ UNREAD" if email_data.is_unread else "βœ… READ"
                        logger.debug(f" πŸ“§ {status} | From: {sender[:50]}... | Subject: {subject[:50]}...")
                    except Exception as e:
                        logger.error(f"❌ Error processing message {msg['id']}: {e}")
                        continue
                fetch_time = time.time() - fetch_start
                # BUGFIX: loop variables below were named `email`, shadowing
                # the stdlib `email` module imported at the top of the file.
                unread_count = sum(1 for item in emails if item.is_unread)
                logger.info(f"βœ… Successfully processed {len(emails)} emails in {fetch_time:.2f}s")
                logger.info(f"πŸ“Š Email breakdown: {unread_count} unread, {len(emails) - unread_count} read")
                # Log sender summary (display-name part only when available).
                senders = {}
                for item in emails:
                    sender_name = item.sender.split('<')[0].strip() if '<' in item.sender else item.sender
                    senders[sender_name] = senders.get(sender_name, 0) + 1
                if senders:
                    top_senders = sorted(senders.items(), key=lambda x: x[1], reverse=True)[:3]
                    logger.info(f"πŸ‘₯ Top senders: {', '.join([f'{s}({c})' for s, c in top_senders])}")
                return emails
            except HttpError as error:
                logger.error(f"❌ Gmail API HTTP error: {error}")
                logger.error(f"πŸ“Š Error details: {error.error_details if hasattr(error, 'error_details') else 'No details'}")
                raise
            except Exception as error:
                logger.error(f"❌ Unexpected error fetching emails: {error}")
                raise
class GmailMCPServer:
    """MCP Server for Gmail integration.

    Wraps an ``mcp`` Server instance and registers three resources
    (gmail://inbox, gmail://unread, gmail://recent) and three tools
    (fetch_emails, summarize_emails, search_emails). The GmailClient is
    created lazily on the first request that needs Gmail access, so the
    OAuth flow does not run until a client actually asks for mail.
    """

    def __init__(self):
        logger.info("πŸš€ Initializing Gmail MCP Server...")
        self.server = Server("gmail-mcp-server")
        self.gmail_client = None     # GmailClient, created lazily on first use
        self.client_connections = 0  # reserved counter; not updated in this file
        self.total_requests = 0      # incremented by every handler for log tracing
        self._setup_handlers()
        logger.info("βœ… Gmail MCP Server initialized successfully!")

    def _setup_handlers(self):
        """Register resource and tool handlers on the MCP server."""
        logger.info("πŸ”§ Setting up MCP handlers...")

        @self.server.list_resources()
        async def handle_list_resources() -> List[Resource]:
            """List available Gmail resources"""
            self.total_requests += 1
            logger.info(f"πŸ“‹ [REQUEST #{self.total_requests}] Handling list_resources request")
            logger.info("πŸ“š Available resources: gmail://inbox, gmail://unread, gmail://recent")
            resources = [
                Resource(
                    uri="gmail://inbox",
                    name="Gmail Inbox",
                    description="Access to Gmail inbox messages",
                    mimeType="application/json"
                ),
                Resource(
                    uri="gmail://unread",
                    name="Unread Emails",
                    description="Access to unread Gmail messages",
                    mimeType="application/json"
                ),
                Resource(
                    uri="gmail://recent",
                    name="Recent Emails",
                    description="Access to recent Gmail messages",
                    mimeType="application/json"
                )
            ]
            logger.info(f"βœ… Returned {len(resources)} available resources")
            return resources

        @self.server.read_resource()
        async def handle_read_resource(uri: str) -> str:
            """Read Gmail resource: returns a JSON array of email dicts."""
            self.total_requests += 1
            logger.info(f"πŸ“– [REQUEST #{self.total_requests}] Handling read_resource request")
            logger.info(f"πŸ”— Resource URI: {uri}")
            if not self.gmail_client:
                logger.info("πŸ”§ Initializing Gmail client for first-time use")
                self.gmail_client = GmailClient()
            try:
                if uri == "gmail://inbox":
                    logger.info("πŸ“₯ Fetching inbox messages...")
                    emails = self.gmail_client.get_emails(query="in:inbox", max_results=20)
                elif uri == "gmail://unread":
                    logger.info("πŸ”΄ Fetching unread messages...")
                    emails = self.gmail_client.get_emails(query="is:unread", max_results=20)
                elif uri == "gmail://recent":
                    logger.info("πŸ• Fetching recent messages (last 7 days)...")
                    emails = self.gmail_client.get_emails(query="newer_than:7d", max_results=20)
                else:
                    logger.error(f"❌ Unknown resource URI: {uri}")
                    raise ValueError(f"Unknown resource URI: {uri}")
                # Convert to JSON. BUGFIX: loop variable renamed from `email`,
                # which shadowed the stdlib `email` module imported at file top.
                emails_data = []
                for item in emails:
                    emails_data.append({
                        "id": item.id,
                        "subject": item.subject,
                        "sender": item.sender,
                        "recipient": item.recipient,
                        "date": item.date,
                        # Cap bodies at 1000 chars to keep resource payloads small.
                        "body": item.body[:1000] + "..." if len(item.body) > 1000 else item.body,
                        "snippet": item.snippet,
                        "is_unread": item.is_unread,
                        "labels": item.labels
                    })
                result = json.dumps(emails_data, indent=2)
                logger.info(f"βœ… Successfully processed resource {uri}")
                logger.info(f"πŸ“Š Returned {len(emails_data)} emails ({len(result)} characters)")
                return result
            except Exception as e:
                logger.error(f"❌ Error reading resource {uri}: {e}")
                raise

        @self.server.list_tools()
        async def handle_list_tools() -> List[Tool]:
            """List available Gmail tools"""
            self.total_requests += 1
            logger.info(f"πŸ› οΈ [REQUEST #{self.total_requests}] Handling list_tools request")
            tools = [
                Tool(
                    name="fetch_emails",
                    description="Fetch emails from Gmail with optional search query",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "query": {
                                "type": "string",
                                "description": "Gmail search query (optional)",
                                "default": ""
                            },
                            "max_results": {
                                "type": "integer",
                                "description": "Maximum number of emails to fetch",
                                "default": 10,
                                "minimum": 1,
                                "maximum": 50
                            }
                        }
                    }
                ),
                Tool(
                    name="summarize_emails",
                    description="Get a summary of recent emails",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "days": {
                                "type": "integer",
                                "description": "Number of days to look back",
                                "default": 1,
                                "minimum": 1,
                                "maximum": 30
                            },
                            "include_body": {
                                "type": "boolean",
                                "description": "Include email body in summary",
                                "default": False
                            }
                        }
                    }
                ),
                Tool(
                    name="search_emails",
                    description="Search emails with specific criteria",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "from_address": {
                                "type": "string",
                                "description": "Filter by sender email address"
                            },
                            "subject_contains": {
                                "type": "string",
                                "description": "Filter by subject containing text"
                            },
                            "has_attachment": {
                                "type": "boolean",
                                "description": "Filter emails with attachments"
                            },
                            "is_unread": {
                                "type": "boolean",
                                "description": "Filter unread emails only"
                            },
                            "max_results": {
                                "type": "integer",
                                "description": "Maximum number of results",
                                "default": 10
                            }
                        }
                    }
                )
            ]
            logger.info(f"πŸ› οΈ Available tools: {[tool.name for tool in tools]}")
            logger.info(f"βœ… Returned {len(tools)} available tools")
            return tools

        @self.server.call_tool()
        async def handle_call_tool(name: str, arguments: Dict[str, Any]) -> List[types.TextContent]:
            """Dispatch a tool call; each branch returns JSON wrapped in TextContent."""
            self.total_requests += 1
            logger.info(f"πŸ› οΈ [REQUEST #{self.total_requests}] Handling tool call: {name}")
            logger.info(f"πŸ“‹ Tool arguments: {json.dumps(arguments, indent=2)}")
            if not self.gmail_client:
                logger.info("πŸ”§ Initializing Gmail client for tool call")
                self.gmail_client = GmailClient()
            try:
                if name == "fetch_emails":
                    query = arguments.get("query", "")
                    max_results = arguments.get("max_results", 10)
                    logger.info(f"πŸ“₯ Tool: fetch_emails | Query: '{query}' | Max: {max_results}")
                    emails = self.gmail_client.get_emails(query=query, max_results=max_results)
                    result = {
                        "total_emails": len(emails),
                        "query_used": query,
                        "emails": []
                    }
                    # BUGFIX: loop variable renamed from `email` (shadowed the
                    # stdlib `email` module imported at the top of the file).
                    for item in emails:
                        result["emails"].append({
                            "id": item.id,
                            "subject": item.subject,
                            "sender": item.sender,
                            "date": item.date,
                            "snippet": item.snippet,
                            "is_unread": item.is_unread,
                            "body_preview": item.body[:200] + "..." if len(item.body) > 200 else item.body
                        })
                    logger.info(f"βœ… fetch_emails completed: {len(emails)} emails returned")
                    return [types.TextContent(
                        type="text",
                        text=json.dumps(result, indent=2)
                    )]
                elif name == "summarize_emails":
                    days = arguments.get("days", 1)
                    include_body = arguments.get("include_body", False)
                    logger.info(f"πŸ“Š Tool: summarize_emails | Days: {days} | Include body: {include_body}")
                    query = f"newer_than:{days}d"
                    emails = self.gmail_client.get_emails(query=query, max_results=50)
                    # Create summary
                    summary = {
                        "period": f"Last {days} days",
                        "total_emails": len(emails),
                        "unread_count": sum(1 for item in emails if item.is_unread),
                        "top_senders": {},
                        "subjects": []
                    }
                    # Count senders (display-name portion when present).
                    for item in emails:
                        sender_name = item.sender.split('<')[0].strip() if '<' in item.sender else item.sender
                        summary["top_senders"][sender_name] = summary["top_senders"].get(sender_name, 0) + 1
                    # Get top 5 senders
                    summary["top_senders"] = dict(sorted(
                        summary["top_senders"].items(),
                        key=lambda x: x[1],
                        reverse=True
                    )[:5])
                    # Add subjects and bodies if requested
                    for item in emails[:10]:  # Limit to 10 for summary
                        email_summary = {
                            "subject": item.subject,
                            "sender": item.sender,
                            "date": item.date,
                            "is_unread": item.is_unread
                        }
                        if include_body:
                            email_summary["body_preview"] = item.body[:300] + "..." if len(item.body) > 300 else item.body
                        summary["subjects"].append(email_summary)
                    logger.info(f"βœ… summarize_emails completed: {summary['total_emails']} emails, {summary['unread_count']} unread")
                    return [types.TextContent(
                        type="text",
                        text=json.dumps(summary, indent=2)
                    )]
                elif name == "search_emails":
                    # Build the Gmail search query from the individual filters.
                    query_parts = []
                    if "from_address" in arguments and arguments["from_address"]:
                        query_parts.append(f"from:{arguments['from_address']}")
                        logger.info(f"πŸ” Search filter: from={arguments['from_address']}")
                    if "subject_contains" in arguments and arguments["subject_contains"]:
                        query_parts.append(f"subject:{arguments['subject_contains']}")
                        logger.info(f"πŸ” Search filter: subject contains '{arguments['subject_contains']}'")
                    if arguments.get("has_attachment"):
                        query_parts.append("has:attachment")
                        logger.info("πŸ” Search filter: has attachment")
                    if arguments.get("is_unread"):
                        query_parts.append("is:unread")
                        logger.info("πŸ” Search filter: unread only")
                    query = " ".join(query_parts)
                    max_results = arguments.get("max_results", 10)
                    logger.info(f"πŸ” Tool: search_emails | Combined query: '{query}' | Max: {max_results}")
                    emails = self.gmail_client.get_emails(query=query, max_results=max_results)
                    result = {
                        "search_query": query,
                        "search_filters": arguments,
                        "total_results": len(emails),
                        "emails": []
                    }
                    for item in emails:
                        result["emails"].append({
                            "id": item.id,
                            "subject": item.subject,
                            "sender": item.sender,
                            "date": item.date,
                            "snippet": item.snippet,
                            "is_unread": item.is_unread
                        })
                    logger.info(f"βœ… search_emails completed: {len(emails)} matching emails found")
                    return [types.TextContent(
                        type="text",
                        text=json.dumps(result, indent=2)
                    )]
                else:
                    logger.error(f"❌ Unknown tool requested: {name}")
                    raise ValueError(f"Unknown tool: {name}")
            except Exception as e:
                logger.error(f"❌ Error executing tool '{name}': {e}")
                raise
async def main():
    """Main function to run the MCP server"""
    rule = "=" * 60
    # Startup banner: informational only, all written to stderr via `logger`.
    startup_lines = (
        rule,
        "πŸš€ Starting Gmail MCP Server...",
        rule,
        "πŸ“§ Gmail MCP Server v1.0.0",
        "πŸ”§ Server capabilities:",
        " - Fetch emails from Gmail",
        " - Search emails with queries",
        " - Summarize recent emails",
        " - Access inbox, unread, and recent emails",
        "",
        "πŸ“‹ Required files:",
        " - credentials.json (OAuth 2.0 credentials from Google)",
        " - token.json (will be created after first auth)",
        "",
        "πŸ”Œ Server is ready and waiting for MCP client connections...",
        " Use this server with Claude Desktop, IDEs, or other MCP clients",
        rule,
    )
    for line in startup_lines:
        logger.info(line)
    server_instance = GmailMCPServer()
    # Run the server using stdio transport
    async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
        logger.info("πŸ“‘ MCP stdio server started - ready to accept connections")
        # Create basic capabilities
        capabilities = {
            "resources": {"subscribe": False, "listChanged": False},
            "tools": {"listChanged": False},
            "logging": {}
        }
        init_options = InitializationOptions(
            server_name="gmail-mcp-server",
            server_version="1.0.0",
            capabilities=capabilities
        )
        await server_instance.server.run(read_stream, write_stream, init_options)
# Script entry point: run the async server loop until interrupted.
if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Ctrl-C: treated as a clean shutdown (exit code 0).
        logger.info("πŸ‘‹ Server stopped by user")
    except Exception as e:
        # Any other failure: log it and signal the error to the caller.
        logger.error(f"❌ Server error: {e}")
        sys.exit(1)