#!/usr/bin/env python3
"""
Gmail MCP Server with debug output - Modified version that shows startup info

Exposes a read-only view of a Gmail account over the Model Context Protocol:
three resources (inbox / unread / recent) and three tools (fetch, summarize,
search). All diagnostics go to stderr so that stdout stays reserved for the
MCP stdio transport.
"""

import asyncio
import base64
import email
import html
import json
import logging
import os
import re
import sys
import time
from collections import Counter
from dataclasses import dataclass
from datetime import datetime, timedelta
from email.mime.text import MIMEText
from email.utils import parseaddr
from typing import Any, Dict, List, Optional, Sequence

# Gmail API imports
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError

# MCP imports
from mcp.server import Server
from mcp.server.models import InitializationOptions
from mcp.types import (
    Resource,
    Tool,
    # TextContent,
    # ImageContent,
    # EmbeddedResource,
    # LoggingLevel
)
import mcp.server.stdio
import mcp.types as types

# Gmail API scopes
SCOPES = ['https://www.googleapis.com/auth/gmail.readonly']

# Upper bound accepted by the Gmail API for messages.list(maxResults=...)
GMAIL_MAX_PAGE_SIZE = 500

# <script>/<style> elements carry no readable text, so drop them whole
_HTML_INVISIBLE_RE = re.compile(
    r"<(script|style)\b[^>]*>.*?</\1\s*>", re.IGNORECASE | re.DOTALL
)
# [^>]* (unlike .*?) also matches tags that span several lines
_HTML_TAG_RE = re.compile(r"<[^>]*>")

# Configure logging to stderr so it doesn't interfere with MCP protocol
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[logging.StreamHandler(sys.stderr)]
)
logger = logging.getLogger("gmail-mcp-server")


def _truncate(text: str, limit: int) -> str:
    """Return *text* cut to *limit* characters, with "..." appended if cut."""
    return text[:limit] + "..." if len(text) > limit else text


def _bounded_int(value: Any, default: int, minimum: int, maximum: int) -> int:
    """Coerce *value* to an int clamped to [minimum, maximum].

    Falls back to *default* when the value cannot be converted at all.
    """
    try:
        number = int(value)
    except (TypeError, ValueError):
        return default
    return max(minimum, min(maximum, number))


def _sender_display_name(sender: str) -> str:
    """Return the display name of a From header, or the address if unnamed."""
    name, address = parseaddr(sender)
    return name or address or sender


# Add performance tracking
class PerformanceTracker:
    """Context manager that logs the start, duration and outcome of a step.

    Exceptions raised inside the ``with`` body are logged and then propagate
    unchanged (``__exit__`` returns ``None``).
    """

    def __init__(self, operation_name: str):
        self.operation_name = operation_name
        self.start_time = None

    def __enter__(self):
        # perf_counter is monotonic, so durations survive wall-clock changes
        self.start_time = time.perf_counter()
        logger.info(f"🚀 Starting operation: {self.operation_name}")
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        duration = time.perf_counter() - self.start_time
        if exc_type is None:
            logger.info(f"✅ Completed operation: {self.operation_name} in {duration:.2f}s")
        else:
            logger.error(f"❌ Failed operation: {self.operation_name} after {duration:.2f}s - {exc_val}")


@dataclass
class EmailData:
    """Structure to hold email data"""
    id: str
    thread_id: str
    subject: str
    sender: str
    recipient: str
    date: str
    body: str
    snippet: str
    labels: List[str]
    is_unread: bool


class GmailClient:
    """Gmail API client wrapper.

    Authenticates on construction, so creating an instance may read and write
    the token file, open a browser for the OAuth consent flow, and call the
    Gmail API.
    """

    def __init__(self, credentials_file: str = "credentials.json", token_file: str = "token.json"):
        self.credentials_file = credentials_file
        self.token_file = token_file
        self.service = None
        self.authenticated_user = None

        logger.info("🔧 Initializing Gmail client...")
        logger.info(f"📄 Using credentials file: {credentials_file}")
        logger.info(f"🔑 Using token file: {token_file}")

        self._authenticate()

    def _load_saved_credentials(self) -> Optional[Credentials]:
        """Load credentials from the token file; return None if unavailable."""
        try:
            creds = Credentials.from_authorized_user_file(self.token_file, SCOPES)
        except FileNotFoundError:
            logger.warning(f"⚠️ Token file '{self.token_file}' not found - first time setup")
            return None
        except Exception as e:
            # A corrupt token file must not block re-authentication
            logger.error(f"❌ Error loading token file: {e}")
            return None

        logger.info("📂 Successfully loaded existing credentials from token file")
        logger.info(f"🔍 Token expiry: {creds.expiry}")
        logger.info(f"🔄 Has refresh token: {bool(creds.refresh_token)}")
        return creds

    def _run_oauth_flow(self) -> Credentials:
        """Run the interactive OAuth flow and return fresh credentials.

        Raises:
            FileNotFoundError: the client-secrets file does not exist.
            Exception: anything the OAuth flow itself raises.
        """
        logger.info("🌐 Getting new credentials via OAuth flow...")
        try:
            flow = InstalledAppFlow.from_client_secrets_file(
                self.credentials_file, SCOPES)
            logger.info("🔗 Starting local OAuth server...")
            creds = flow.run_local_server(port=0)
        except FileNotFoundError:
            logger.error(f"❌ Credentials file '{self.credentials_file}' not found!")
            logger.error("📋 Please download your OAuth 2.0 credentials from Google Cloud Console")
            logger.error("🔗 Visit: https://console.cloud.google.com/apis/credentials")
            raise
        except Exception as e:
            logger.error(f"❌ OAuth flow failed: {e}")
            raise
        logger.info("✅ OAuth flow completed successfully")
        return creds

    def _save_credentials(self, creds: Credentials) -> None:
        """Persist credentials for the next run (best effort, never raises)."""
        try:
            # The token grants mailbox access: create the file owner-only
            fd = os.open(
                self.token_file, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600
            )
            with os.fdopen(fd, 'w', encoding='utf-8') as token:
                token.write(creds.to_json())
            logger.info(f"💾 Saved credentials to {self.token_file} for future use")
        except Exception as e:
            logger.error(f"❌ Failed to save credentials: {e}")

    def _authenticate(self):
        """Authenticate with Gmail API and build ``self.service``.

        Order of attempts: saved token, refresh of an expired token, then the
        interactive OAuth flow. A failed refresh falls back to the OAuth flow
        instead of continuing without credentials.

        Raises:
            Exception: if the OAuth flow fails or the service cannot be built.
        """
        with PerformanceTracker("Gmail Authentication"):
            logger.info("🔐 Starting Gmail authentication process...")

            creds = self._load_saved_credentials()

            # If no valid credentials, get new ones
            if not creds or not creds.valid:
                if creds and creds.expired and creds.refresh_token:
                    logger.info("🔄 Credentials expired, attempting to refresh...")
                    try:
                        creds.refresh(Request())
                        logger.info("✅ Successfully refreshed expired credentials")
                    except Exception as e:
                        logger.error(f"❌ Failed to refresh credentials: {e}")
                        creds = None

                if not creds or not creds.valid:
                    creds = self._run_oauth_flow()

                # Save credentials for next run
                self._save_credentials(creds)

            try:
                self.service = build('gmail', 'v1', credentials=creds)

                # Get user profile info
                profile = self.service.users().getProfile(userId='me').execute()
                self.authenticated_user = profile.get('emailAddress', 'Unknown')
                total_messages = profile.get('messagesTotal', 0)

                logger.info("🎉 Gmail authentication successful!")
                logger.info(f"👤 Authenticated user: {self.authenticated_user}")
                logger.info(f"📊 Total messages in account: {total_messages:,}")
                logger.info("🔧 Gmail API version: v1")
            except Exception as e:
                logger.error(f"❌ Failed to build Gmail service: {e}")
                raise

    def _decode_message_part(self, part: Dict) -> str:
        """Decode the base64url body of one message part.

        Returns an empty string when the part has no inline data or the data
        cannot be decoded.
        """
        try:
            data = part.get('body', {}).get('data', '')
            if not data:
                return ''
            # Restore any stripped '=' padding before decoding
            padded = data + '=' * (-len(data) % 4)
            return base64.urlsafe_b64decode(padded).decode('utf-8', errors='ignore')
        except (ValueError, TypeError, AttributeError) as e:
            logger.debug(f"⚠️ Failed to decode message part: {e}")
            return ''

    @staticmethod
    def _html_to_text(markup: str) -> str:
        """Simple HTML to text conversion: drop tags, unescape entities."""
        visible = _HTML_INVISIBLE_RE.sub('', markup)
        return html.unescape(_HTML_TAG_RE.sub('', visible))

    def _collect_text_parts(self, payload: Dict, plain: List[str], rich: List[str]) -> None:
        """Walk a payload tree, appending decoded text/plain and text/html leaves."""
        sub_parts = payload.get('parts')
        if sub_parts:
            # Containers (multipart/*) can nest, e.g. alternative inside mixed
            for sub_part in sub_parts:
                self._collect_text_parts(sub_part, plain, rich)
            return

        mime_type = payload.get('mimeType', '')
        if mime_type == 'text/plain':
            plain.append(self._decode_message_part(payload))
        elif mime_type == 'text/html':
            rich.append(self._decode_message_part(payload))

    def _extract_email_body(self, message: Dict) -> str:
        """Extract the readable body text of a message.

        Prefers the text/plain parts; when there are none (or they are blank)
        the text/html parts are converted to text instead, so a
        multipart/alternative message does not yield its content twice.
        Returns an empty string if nothing readable is found.
        """
        plain: List[str] = []
        rich: List[str] = []
        try:
            payload = message['payload']
            logger.debug(f"📄 Processing message payload: {payload.get('mimeType')}")
            self._collect_text_parts(payload, plain, rich)
        except (KeyError, TypeError, AttributeError) as e:
            logger.warning(f"⚠️ Error extracting email body: {e}")

        body = ''.join(plain)
        if not body.strip():
            body = self._html_to_text(''.join(rich))
        return body.strip()

    def _get_header_value(self, headers: List[Dict], name: str) -> str:
        """Get header value by name (case-insensitive); '' when absent."""
        wanted = name.lower()
        for header in headers:
            if header.get('name', '').lower() == wanted:
                return header.get('value', '')
        return ''

    def _fetch_message(self, message_id: str) -> EmailData:
        """Download one message in full format and convert it to EmailData."""
        message = self.service.users().messages().get(
            userId='me',
            id=message_id,
            format='full'
        ).execute()

        headers = message['payload'].get('headers', [])
        label_ids = message.get('labelIds', [])
        return EmailData(
            id=message['id'],
            thread_id=message['threadId'],
            subject=self._get_header_value(headers, 'Subject'),
            sender=self._get_header_value(headers, 'From'),
            recipient=self._get_header_value(headers, 'To'),
            date=self._get_header_value(headers, 'Date'),
            body=self._extract_email_body(message),
            snippet=message.get('snippet', ''),
            labels=label_ids,
            is_unread='UNREAD' in label_ids
        )

    def get_emails(self, query: str = '', max_results: int = 10) -> List[EmailData]:
        """Fetch emails from Gmail.

        Args:
            query: Gmail search query; empty matches all mail.
            max_results: maximum number of messages to list and download.

        Returns:
            The messages that could be downloaded and parsed. A message that
            fails individually is logged and skipped.

        Raises:
            HttpError: the list request was rejected by the Gmail API.
            Exception: any other failure of the list request.
        """
        with PerformanceTracker(f"Fetch emails (query='{query}', max={max_results})"):
            logger.info(f"📥 Fetching emails for user: {self.authenticated_user}")
            logger.info(f"🔍 Search query: '{query}' (empty = all emails)")
            logger.info(f"📊 Max results: {max_results}")

            try:
                # Search for messages
                search_start = time.perf_counter()
                results = self.service.users().messages().list(
                    userId='me',
                    q=query,
                    maxResults=max_results
                ).execute()
                search_time = time.perf_counter() - search_start

                messages = results.get('messages', [])
                total_found = results.get('resultSizeEstimate', 0)

                logger.info(f"🔍 Gmail search completed in {search_time:.2f}s")
                logger.info(f"📊 Found {len(messages)} messages to process (estimated total: {total_found})")

                if not messages:
                    logger.info("📭 No messages found matching the query")
                    return []

                emails: List[EmailData] = []
                fetch_start = time.perf_counter()

                for position, ref in enumerate(messages, 1):
                    message_id = ref.get('id')
                    logger.debug(f"📨 Processing message {position}/{len(messages)}: {message_id}")
                    try:
                        item = self._fetch_message(message_id)
                    except Exception as e:
                        # One bad message must not abort the whole batch
                        logger.error(f"❌ Error processing message {message_id}: {e}")
                        continue

                    emails.append(item)

                    # Log email details
                    status = "🔴 UNREAD" if item.is_unread else "✅ READ"
                    logger.debug(f" 📧 {status} | From: {item.sender[:50]}... | Subject: {item.subject[:50]}...")

                fetch_time = time.perf_counter() - fetch_start
                unread_count = sum(1 for item in emails if item.is_unread)

                logger.info(f"✅ Successfully processed {len(emails)} emails in {fetch_time:.2f}s")
                logger.info(f"📊 Email breakdown: {unread_count} unread, {len(emails) - unread_count} read")

                # Log sender summary
                senders = Counter(_sender_display_name(item.sender) for item in emails)
                if senders:
                    top_senders = senders.most_common(3)
                    logger.info(f"👥 Top senders: {', '.join([f'{s}({c})' for s, c in top_senders])}")

                return emails

            except HttpError as error:
                logger.error(f"❌ Gmail API HTTP error: {error}")
                logger.error(f"📊 Error details: {getattr(error, 'error_details', 'No details')}")
                raise
            except Exception as error:
                logger.error(f"❌ Unexpected error fetching emails: {error}")
                raise


class GmailMCPServer:
    """MCP Server for Gmail integration.

    The Gmail client is created lazily on the first request that needs it, so
    starting the server never triggers authentication by itself.
    """

    # Resource URI -> (Gmail search query, log line announcing the fetch)
    _RESOURCE_QUERIES = {
        "gmail://inbox": ("in:inbox", "📥 Fetching inbox messages..."),
        "gmail://unread": ("is:unread", "🔴 Fetching unread messages..."),
        "gmail://recent": ("newer_than:7d", "🕐 Fetching recent messages (last 7 days)..."),
    }

    def __init__(self):
        logger.info("🚀 Initializing Gmail MCP Server...")
        self.server = Server("gmail-mcp-server")
        self.gmail_client = None
        self.client_connections = 0
        self.total_requests = 0
        self._setup_handlers()
        logger.info("✅ Gmail MCP Server initialized successfully!")

    def _get_gmail_client(self, reason: str) -> GmailClient:
        """Return the shared Gmail client, creating it on first use."""
        if not self.gmail_client:
            logger.info(f"🔧 Initializing Gmail client for {reason}")
            self.gmail_client = GmailClient()
        return self.gmail_client

    @staticmethod
    def _search_term(operator: str, value: Any) -> str:
        """Format one ``operator:value`` Gmail search term.

        Multi-word values are grouped in parentheses so that every word is
        bound to the operator rather than only the first one.
        """
        text = str(value).strip()
        if any(ch.isspace() for ch in text):
            return f"{operator}:({text})"
        return f"{operator}:{text}"

    def _tool_fetch_emails(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Run the ``fetch_emails`` tool and return its JSON-ready result."""
        query = arguments.get("query", "") or ""
        # Enforce the 1..50 range advertised in the tool's input schema
        max_results = _bounded_int(arguments.get("max_results", 10), 10, 1, 50)

        logger.info(f"📥 Tool: fetch_emails | Query: '{query}' | Max: {max_results}")

        client = self._get_gmail_client("tool call")
        emails = client.get_emails(query=query, max_results=max_results)

        result = {
            "total_emails": len(emails),
            "query_used": query,
            "emails": [
                {
                    "id": item.id,
                    "subject": item.subject,
                    "sender": item.sender,
                    "date": item.date,
                    "snippet": item.snippet,
                    "is_unread": item.is_unread,
                    "body_preview": _truncate(item.body, 200),
                }
                for item in emails
            ],
        }

        logger.info(f"✅ fetch_emails completed: {len(emails)} emails returned")
        return result

    def _tool_summarize_emails(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Run the ``summarize_emails`` tool and return its JSON-ready result."""
        # Enforce the 1..30 range advertised in the tool's input schema
        days = _bounded_int(arguments.get("days", 1), 1, 1, 30)
        include_body = bool(arguments.get("include_body", False))

        logger.info(f"📊 Tool: summarize_emails | Days: {days} | Include body: {include_body}")

        client = self._get_gmail_client("tool call")
        emails = client.get_emails(query=f"newer_than:{days}d", max_results=50)

        # Count senders and keep the top 5
        sender_counts = Counter(_sender_display_name(item.sender) for item in emails)

        subjects = []
        # Add subjects and bodies if requested
        for item in emails[:10]:  # Limit to 10 for summary
            entry = {
                "subject": item.subject,
                "sender": item.sender,
                "date": item.date,
                "is_unread": item.is_unread,
            }
            if include_body:
                entry["body_preview"] = _truncate(item.body, 300)
            subjects.append(entry)

        summary = {
            "period": f"Last {days} days",
            "total_emails": len(emails),
            "unread_count": sum(1 for item in emails if item.is_unread),
            "top_senders": dict(sender_counts.most_common(5)),
            "subjects": subjects,
        }

        logger.info(f"✅ summarize_emails completed: {summary['total_emails']} emails, {summary['unread_count']} unread")
        return summary

    def _tool_search_emails(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Run the ``search_emails`` tool and return its JSON-ready result."""
        # Build search query
        query_parts = []

        if arguments.get("from_address"):
            query_parts.append(self._search_term("from", arguments["from_address"]))
            logger.info(f"🔍 Search filter: from={arguments['from_address']}")

        if arguments.get("subject_contains"):
            query_parts.append(self._search_term("subject", arguments["subject_contains"]))
            logger.info(f"🔍 Search filter: subject contains '{arguments['subject_contains']}'")

        if arguments.get("has_attachment"):
            query_parts.append("has:attachment")
            logger.info("🔍 Search filter: has attachment")

        if arguments.get("is_unread"):
            query_parts.append("is:unread")
            logger.info("🔍 Search filter: unread only")

        query = " ".join(query_parts)
        max_results = _bounded_int(
            arguments.get("max_results", 10), 10, 1, GMAIL_MAX_PAGE_SIZE
        )

        logger.info(f"🔍 Tool: search_emails | Combined query: '{query}' | Max: {max_results}")

        client = self._get_gmail_client("tool call")
        emails = client.get_emails(query=query, max_results=max_results)

        result = {
            "search_query": query,
            "search_filters": arguments,
            "total_results": len(emails),
            "emails": [
                {
                    "id": item.id,
                    "subject": item.subject,
                    "sender": item.sender,
                    "date": item.date,
                    "snippet": item.snippet,
                    "is_unread": item.is_unread,
                }
                for item in emails
            ],
        }

        logger.info(f"✅ search_emails completed: {len(emails)} matching emails found")
        return result

    def _setup_handlers(self):
        """Setup MCP server handlers"""
        logger.info("🔧 Setting up MCP handlers...")

        @self.server.list_resources()
        async def handle_list_resources() -> List[Resource]:
            """List available Gmail resources"""
            self.total_requests += 1
            logger.info(f"📋 [REQUEST #{self.total_requests}] Handling list_resources request")
            logger.info("📚 Available resources: gmail://inbox, gmail://unread, gmail://recent")

            resources = [
                Resource(
                    uri="gmail://inbox",
                    name="Gmail Inbox",
                    description="Access to Gmail inbox messages",
                    mimeType="application/json"
                ),
                Resource(
                    uri="gmail://unread",
                    name="Unread Emails",
                    description="Access to unread Gmail messages",
                    mimeType="application/json"
                ),
                Resource(
                    uri="gmail://recent",
                    name="Recent Emails",
                    description="Access to recent Gmail messages",
                    mimeType="application/json"
                )
            ]

            logger.info(f"✅ Returned {len(resources)} available resources")
            return resources

        @self.server.read_resource()
        async def handle_read_resource(uri: Any) -> str:
            """Read Gmail resource and return its emails as a JSON string.

            Raises:
                ValueError: the URI is not one of the advertised resources.
            """
            self.total_requests += 1
            logger.info(f"📖 [REQUEST #{self.total_requests}] Handling read_resource request")
            logger.info(f"🔗 Resource URI: {uri}")

            # The SDK passes a URL object, which never compares equal to a
            # str literal; normalise to text before looking it up.
            uri_text = str(uri).rstrip("/")

            try:
                if uri_text not in self._RESOURCE_QUERIES:
                    logger.error(f"❌ Unknown resource URI: {uri}")
                    raise ValueError(f"Unknown resource URI: {uri}")

                query, announcement = self._RESOURCE_QUERIES[uri_text]
                client = self._get_gmail_client("first-time use")
                logger.info(announcement)
                emails = client.get_emails(query=query, max_results=20)

                # Convert to JSON
                emails_data = [
                    {
                        "id": item.id,
                        "subject": item.subject,
                        "sender": item.sender,
                        "recipient": item.recipient,
                        "date": item.date,
                        "body": _truncate(item.body, 1000),
                        "snippet": item.snippet,
                        "is_unread": item.is_unread,
                        "labels": item.labels,
                    }
                    for item in emails
                ]

                result = json.dumps(emails_data, indent=2)
                logger.info(f"✅ Successfully processed resource {uri}")
                logger.info(f"📊 Returned {len(emails_data)} emails ({len(result)} characters)")
                return result

            except Exception as e:
                logger.error(f"❌ Error reading resource {uri}: {e}")
                raise

        @self.server.list_tools()
        async def handle_list_tools() -> List[Tool]:
            """List available Gmail tools"""
            self.total_requests += 1
            logger.info(f"🛠️ [REQUEST #{self.total_requests}] Handling list_tools request")

            tools = [
                Tool(
                    name="fetch_emails",
                    description="Fetch emails from Gmail with optional search query",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "query": {
                                "type": "string",
                                "description": "Gmail search query (optional)",
                                "default": ""
                            },
                            "max_results": {
                                "type": "integer",
                                "description": "Maximum number of emails to fetch",
                                "default": 10,
                                "minimum": 1,
                                "maximum": 50
                            }
                        }
                    }
                ),
                Tool(
                    name="summarize_emails",
                    description="Get a summary of recent emails",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "days": {
                                "type": "integer",
                                "description": "Number of days to look back",
                                "default": 1,
                                "minimum": 1,
                                "maximum": 30
                            },
                            "include_body": {
                                "type": "boolean",
                                "description": "Include email body in summary",
                                "default": False
                            }
                        }
                    }
                ),
                Tool(
                    name="search_emails",
                    description="Search emails with specific criteria",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "from_address": {
                                "type": "string",
                                "description": "Filter by sender email address"
                            },
                            "subject_contains": {
                                "type": "string",
                                "description": "Filter by subject containing text"
                            },
                            "has_attachment": {
                                "type": "boolean",
                                "description": "Filter emails with attachments"
                            },
                            "is_unread": {
                                "type": "boolean",
                                "description": "Filter unread emails only"
                            },
                            "max_results": {
                                "type": "integer",
                                "description": "Maximum number of results",
                                "default": 10
                            }
                        }
                    }
                )
            ]

            logger.info(f"🛠️ Available tools: {[tool.name for tool in tools]}")
            logger.info(f"✅ Returned {len(tools)} available tools")
            return tools

        @self.server.call_tool()
        async def handle_call_tool(name: str, arguments: Dict[str, Any]) -> List[types.TextContent]:
            """Handle tool calls and return the result as one JSON text item.

            Raises:
                ValueError: the tool name is not one of the advertised tools.
            """
            self.total_requests += 1
            arguments = arguments or {}
            logger.info(f"🛠️ [REQUEST #{self.total_requests}] Handling tool call: {name}")
            logger.info(f"📋 Tool arguments: {json.dumps(arguments, indent=2)}")

            tool_handlers = {
                "fetch_emails": self._tool_fetch_emails,
                "summarize_emails": self._tool_summarize_emails,
                "search_emails": self._tool_search_emails,
            }

            try:
                run_tool = tool_handlers.get(name)
                if run_tool is None:
                    logger.error(f"❌ Unknown tool requested: {name}")
                    raise ValueError(f"Unknown tool: {name}")

                payload = run_tool(arguments)
                return [types.TextContent(
                    type="text",
                    text=json.dumps(payload, indent=2)
                )]

            except Exception as e:
                logger.error(f"❌ Error executing tool '{name}': {e}")
                raise


async def main():
    """Main function to run the MCP server over the stdio transport."""
    logger.info("=" * 60)
    logger.info("🚀 Starting Gmail MCP Server...")
    logger.info("=" * 60)

    # Print startup information
    logger.info("📧 Gmail MCP Server v1.0.0")
    logger.info("🔧 Server capabilities:")
    logger.info(" - Fetch emails from Gmail")
    logger.info(" - Search emails with queries")
    logger.info(" - Summarize recent emails")
    logger.info(" - Access inbox, unread, and recent emails")
    logger.info("")
    logger.info("📋 Required files:")
    logger.info(" - credentials.json (OAuth 2.0 credentials from Google)")
    logger.info(" - token.json (will be created after first auth)")
    logger.info("")
    logger.info("🔌 Server is ready and waiting for MCP client connections...")
    logger.info(" Use this server with Claude Desktop, IDEs, or other MCP clients")
    logger.info("=" * 60)

    server_instance = GmailMCPServer()

    # Run the server using stdio transport
    async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
        logger.info("📡 MCP stdio server started - ready to accept connections")

        # Create basic capabilities
        capabilities = {
            "resources": {"subscribe": False, "listChanged": False},
            "tools": {"listChanged": False},
            "logging": {}
        }

        await server_instance.server.run(
            read_stream,
            write_stream,
            InitializationOptions(
                server_name="gmail-mcp-server",
                server_version="1.0.0",
                capabilities=capabilities
            )
        )


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        logger.info("👋 Server stopped by user")
    except Exception as e:
        # Top-level boundary: keep the traceback in the log before exiting
        logger.exception(f"❌ Server error: {e}")
        sys.exit(1)