#!/usr/bin/env python3 # Updated for Hugging Face Space compatibility """ Enhanced Track 3: Gmail AI Agent with Advanced Behaviors Sophisticated agent decision-making and workflow automation """ # ⚠️ IMPORTANT NOTICE ⚠️ # On first run, the app may take 120-200 seconds to respond to queries # This is because the Qwen LLM is hosted on Modal and requires time for cold start # Subsequent requests will be much faster after the initial cold start # Please be patient during the initial interaction with the AI assistant import gradio as gr import json import logging import os from typing import List, Dict, Any, Optional, Tuple from dataclasses import dataclass import pandas as pd from datetime import datetime, timedelta import re import requests # Added for Modal API requests import time # Added for retry logic from collections import Counter # Import Google API libraries for authentication from google.oauth2.credentials import Credentials from google.auth.transport.requests import Request from google_auth_oauthlib.flow import InstalledAppFlow # Import existing modules from dotenv import load_dotenv load_dotenv() from mcp_client import GmailMCPClientSync # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger("enhanced-gmail-agent") # Get environment variables from Hugging Face Secrets GMAIL_CLIENT_ID = os.getenv("GMAIL_CLIENT_ID") GMAIL_CLIENT_SECRET = os.getenv("GMAIL_CLIENT_SECRET") GMAIL_TOKEN_JSON = os.getenv("GMAIL_TOKEN_JSON") MODAL_API_URL = os.getenv("MODAL_API_URL") # Log environment variable status (without revealing values) logger.info(f"GMAIL_CLIENT_ID available: {GMAIL_CLIENT_ID is not None}") logger.info(f"GMAIL_CLIENT_SECRET available: {GMAIL_CLIENT_SECRET is not None}") logger.info(f"GMAIL_TOKEN_JSON available: {GMAIL_TOKEN_JSON is not None}") logger.info(f"MODAL_API_URL available: {MODAL_API_URL is not None}") # Create credentials.json from environment variables if not present if GMAIL_CLIENT_ID and GMAIL_CLIENT_SECRET: credentials_data = { "installed": { "client_id": GMAIL_CLIENT_ID, "client_secret": GMAIL_CLIENT_SECRET, "auth_uri": "https://accounts.google.com/o/oauth2/auth", "token_uri": "https://oauth2.googleapis.com/token", "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs", "redirect_uris": ["urn:ietf:wg:oauth:2.0:oob", "http://localhost"] } } # Write credentials.json file for Gmail MCP server with open("credentials.json", "w") as f: json.dump(credentials_data, f) logger.info("Created credentials.json from environment variables") # Create token.json from environment variable if not present if GMAIL_TOKEN_JSON: with open("token.json", "w") as f: f.write(GMAIL_TOKEN_JSON) logger.info("Created token.json from environment variable") class QwenClient: """Client for the Modal-hosted Qwen model - Updated to properly handle thinking model""" def __init__(self, api_url: str = None): self.api_url = api_url self.first_request = True logger.info(f"Initializing QwenClient with API URL: {self.api_url}") def _strip_thinking_tags(self, text: str) -> str: """Strip sections from the response to get clean output""" import re # Find and remove content between and or end of string return re.sub(r'.*?(?:|$)', '', text, flags=re.DOTALL).strip() def generate_content(self, prompt: str, max_tokens: int = 2048, temperature: float = 0.7, strip_thinking: bool = True, retries: int = 3, retry_delay: float = 2.0, timeout: float = 180.0) -> Any: """Generate content using the Modal-hosted Qwen model with proper thinking model support""" # Check if prompt is too long - truncate if needed if len(prompt) > 4000: logger.warning(f"Prompt too long ({len(prompt)} chars), truncating to 4000 chars") prompt = prompt[:4000] + "... [truncated]" # Show extra message for first request if self.first_request: logger.info("⚠️ First request might take longer due to cold start...") self.first_request = False for attempt in range(retries): try: payload = { "message": prompt, "max_tokens": max_tokens, "temperature": temperature, "top_p": 0.9, "strip_thinking": strip_thinking } logger.info(f"Sending request to Qwen API (attempt {attempt+1}/{retries}) with prompt length: {len(prompt)}") start_time = time.time() response = requests.post( self.api_url, json=payload, headers={"Content-Type": "application/json"}, timeout=timeout ) duration = time.time() - start_time logger.info(f"⏱️ Response received in {duration:.1f} seconds") # Check for successful response response.raise_for_status() result = response.json() response_text = result.get("response", "") logger.info(f"Received response from Qwen API: {len(response_text)} chars") # Only return if we actually got content if response_text and len(response_text.strip()) > 0: # Create a response object that mimics Gemini's response format class QwenResponse: def __init__(self, text): self.text = text # If strip_thinking was False, we already got clean response from API # If strip_thinking was True, we also got clean response from API # But let's double-check and clean locally if needed if not strip_thinking and '' in response_text: # User wants to see thinking, so keep everything final_text = response_text else: # Clean any remaining thinking tags final_text = self._strip_thinking_tags(response_text) # If after cleaning we have very little content, try to extract from original if len(final_text.strip()) < 10 and '' in response_text: # Extract content after tag parts = response_text.split('') if len(parts) > 1: final_text = parts[-1].strip() else: # No closing tag, take content after last > final_text = response_text.split('>')[-1].strip() # Log token usage if available if 'tokens_used' in result: logger.info(f"📊 Token usage: {result.get('input_tokens', 'N/A')} input, {result['tokens_used']} output") return QwenResponse(final_text) else: logger.warning(f"Empty response received from Qwen API (attempt {attempt+1}/{retries})") if attempt < retries - 1: logger.info(f"Retrying in {retry_delay} seconds...") time.sleep(retry_delay) except requests.exceptions.Timeout: logger.error(f"Timeout calling Qwen API (attempt {attempt+1}/{retries}) - request took too long") if attempt < retries - 1: next_delay = retry_delay * (attempt + 1) logger.info(f"Retrying in {next_delay} seconds...") time.sleep(next_delay) else: # Return a helpful fallback response on last attempt class QwenResponse: def __init__(self, text): self.text = text return QwenResponse("I apologize, but I'm having difficulty accessing enhanced AI capabilities right now due to timeout. Please try again later when the service may be more responsive.") except requests.RequestException as e: logger.error(f"Error calling Qwen API (attempt {attempt+1}/{retries}): {str(e)}") if hasattr(e, 'response') and e.response is not None: logger.error(f"Response: {e.response.text}") if attempt < retries - 1: next_delay = retry_delay * (attempt + 1) logger.info(f"Retrying in {next_delay} seconds...") time.sleep(next_delay) logger.error("All retry attempts failed for Qwen API request") # Create a fallback response when all attempts fail class QwenResponse: def __init__(self, text): self.text = text return QwenResponse("I apologize, but I can't access my advanced AI capabilities right now. Let me provide a simplified response based on your request. You might want to try again later when the network connection to the AI service is more stable.") @dataclass class AgentMemory: """Memory system for the agent to track context and user patterns""" user_preferences: Dict[str, Any] conversation_context: List[Dict[str, Any]] email_patterns: Dict[str, Any] workflow_history: List[Dict[str, Any]] class EnhancedGmailAgent: """Enhanced Gmail AI Agent with sophisticated behaviors""" def __init__(self, mcp_server_path: str = "gmail_mcp_server.py", modal_api_url: str = None): self.mcp_client = GmailMCPClientSync(mcp_server_path) self.memory = AgentMemory( user_preferences={}, conversation_context=[], email_patterns={}, workflow_history=[] ) # Initialize Qwen model - Use environment variable if no URL provided self.modal_api_url = modal_api_url or MODAL_API_URL try: logger.info("Initializing Qwen model client from Modal") if not self.modal_api_url: logger.warning("No Modal API URL provided, Qwen functionality will be limited") self.model = None self.model_status = "Not configured" else: self.model = QwenClient(api_url=self.modal_api_url) logger.info("Qwen model client initialized successfully") self.model_status = "Initialized" except Exception as e: logger.error(f"Error initializing Qwen model: {str(e)}") self.model = None self.model_status = "Error during initialization" # Initialize agent state self.last_analysis = {} self.active_workflows = [] def intelligent_email_triage(self, max_emails: int = 20) -> Dict[str, Any]: """ Sophisticated email triage with AI-powered categorization and priority scoring - SINGLE API CALL VERSION """ logger.info("🧠 Starting intelligent email triage...") try: # Fetch recent emails emails = self.mcp_client.fetch_emails(query="newer_than:3d", max_results=max_emails) if not emails or 'emails' not in emails: return {"error": "No emails to analyze"} email_list = emails['emails'] logger.info(f"Fetched {len(email_list)} emails for analysis") # Categorize and prioritize emails using AI - SINGLE API CALL triage_results = { "high_priority": [], "meetings_and_calls": [], "action_required": [], "newsletters_and_updates": [], "personal": [], "low_priority": [], "analysis_summary": "", "recommendations": [] } if self.model and len(email_list) > 0: logger.info(f"Processing ALL {len(email_list)} emails in SINGLE API call") # Create a comprehensive prompt for ALL emails at once email_summaries = [] for i, email in enumerate(email_list): # Create compact representations of emails subject = email.get('subject', 'No subject')[:80] # Limit subject length sender = self._clean_sender(email.get('sender', 'Unknown'))[:50] # Limit sender length snippet = email.get('snippet', 'No preview')[:60] # Limit snippet length is_unread = email.get('is_unread', False) email_summaries.append(f"Email {i+1}: Subject=\"{subject}\" | From={sender} | Preview=\"{snippet}\" | Unread={is_unread}") # Create the master prompt for all emails - IMPROVED FOR BETTER ANALYSIS master_prompt = f"""You are an expert email analyst and productivity coach. Analyze ALL {len(email_list)} emails below and provide detailed, actionable insights. EMAILS TO ANALYZE: {chr(10).join(email_summaries)} TASK: For each email, determine: 1. Category (select exactly one): - high_priority: Urgent emails needing immediate attention (deadlines, emergencies) - meetings_and_calls: Calendar items, meeting invites, calls - action_required: Emails requiring specific action but not urgent - newsletters_and_updates: Marketing, newsletters, product updates - personal: Personal communications - low_priority: Everything else with minimal importance 2. Priority score: 0.0 to 1.0 (0.0 = lowest, 1.0 = highest priority) - 0.9-1.0: Critical/urgent - requires immediate attention - 0.7-0.8: Important - handle today - 0.5-0.6: Moderate - handle within 48 hours - 0.3-0.4: Low - handle when convenient - 0.1-0.2: Very low - can be archived or ignored 3. Detailed reasoning: Explain WHY you categorized this way (deadline mentions, sender importance, action words, etc.) RESPONSE FORMAT - Respond with ONLY this JSON array (no extra text): [ {{ "email_num": 1, "category": "category_name", "priority_score": 0.8, "reasoning": "Detailed reason with specific insights from email content" }}, {{ "email_num": 2, "category": "category_name", "priority_score": 0.6, "reasoning": "Detailed reason with specific insights from email content" }}, ...continue for all {len(email_list)} emails... ] IMPORTANT ANALYSIS GUIDELINES: - Look for urgency indicators: words like "urgent", "ASAP", "deadline", "today", "overdue" - Consider sender importance: work contacts vs marketing emails - Identify action verbs: "confirm", "review", "approve", "respond", "complete" - Check for meeting details: times, dates, calendar invites - Detect personal communication markers: friendly tone, personal questions - Evaluate if the email requires a response or action - Consider unread status as potentially more important Respond with ONLY the JSON array - no introduction, explanation or additional text. """ try: # Single API call for all emails logger.info(f"Sending SINGLE request to Qwen API for ALL {len(email_list)} emails") response = self.model.generate_content( master_prompt, max_tokens=2048, # Increased for longer response temperature=0.2, # Lower temperature for more consistent JSON strip_thinking=True, # We want clean JSON output timeout=240.0 # Longer timeout for processing many emails ) if response and hasattr(response, 'text') and response.text: response_text = response.text.strip() logger.info(f"Received comprehensive response: {len(response_text)} chars") # Extract JSON from response json_start = response_text.find('[') json_end = response_text.rfind(']') + 1 if json_start >= 0 and json_end > json_start: try: json_text = response_text[json_start:json_end] results = json.loads(json_text) logger.info(f"Successfully parsed JSON with {len(results)} email analyses") # Process results and assign to categories valid_categories = ['high_priority', 'meetings_and_calls', 'action_required', 'newsletters_and_updates', 'personal', 'low_priority'] for result in results: try: email_num = int(result.get('email_num', 1)) - 1 # Convert to 0-based index if 0 <= email_num < len(email_list): email = email_list[email_num] # Validate and clean category category = result.get('category', '').lower() if category not in valid_categories: category = 'newsletters_and_updates' # Default fallback # Validate priority score try: priority_score = float(result.get('priority_score', 0.5)) if not (0.0 <= priority_score <= 1.0): priority_score = 0.5 except: priority_score = 0.5 reasoning = result.get('reasoning', 'AI analysis completed') # Create enhanced email data email_analysis = { **email, "priority_score": priority_score, "category": category, "ai_reasoning": reasoning, "suggested_actions": self._suggest_email_actions(email, category, priority_score) } triage_results[category].append(email_analysis) logger.info(f"✅ Email {email_num+1}: '{email.get('subject', '')[:40]}...' → {category} (score: {priority_score})") except Exception as e: logger.warning(f"Error processing email result {result}: {str(e)}") # Use fallback for this email if 0 <= email_num < len(email_list): self._apply_fallback_categorization(email_list[email_num], triage_results) # Check if we processed all emails, add fallback for any missing processed_count = sum(len(emails) for category, emails in triage_results.items() if category not in ['analysis_summary', 'recommendations']) if processed_count < len(email_list): logger.warning(f"Only processed {processed_count}/{len(email_list)} emails, using fallback for remaining") # Add fallback for unprocessed emails for i, email in enumerate(email_list): # Check if this email was already processed email_found = False for category in ['high_priority', 'meetings_and_calls', 'action_required', 'newsletters_and_updates', 'personal', 'low_priority']: if any(e.get('id') == email.get('id') for e in triage_results[category]): email_found = True break if not email_found: logger.info(f"Adding fallback categorization for email {i+1}") self._apply_fallback_categorization(email, triage_results) logger.info(f"🎉 Successfully analyzed {len(email_list)} emails with AI in single API call!") except json.JSONDecodeError as e: logger.warning(f"Failed to parse JSON from AI response: {e}") logger.warning(f"Response was: {response_text[:500]}...") # Use fallback for all emails for email in email_list: self._apply_fallback_categorization(email, triage_results) else: logger.warning("Could not find valid JSON array in AI response") logger.warning(f"Response was: {response_text[:500]}...") # Use fallback for all emails for email in email_list: self._apply_fallback_categorization(email, triage_results) else: logger.warning("Empty or invalid response from AI") # Use fallback for all emails for email in email_list: self._apply_fallback_categorization(email, triage_results) except Exception as e: logger.error(f"Error in AI analysis: {str(e)}") # Use fallback for all emails for email in email_list: self._apply_fallback_categorization(email, triage_results) else: # No AI model available, use fallback for all emails logger.info("No AI model available, using fallback categorization for all emails") for email in email_list: self._apply_fallback_categorization(email, triage_results) # Generate intelligent summary and recommendations triage_results["analysis_summary"] = self._generate_triage_summary(triage_results) triage_results["recommendations"] = self._generate_workflow_recommendations(triage_results) # Update agent memory self._update_email_patterns(email_list) # Format results for beautiful display logger.info("🎨 Formatting triage results for display") formatted_output = self._display_triage_results(triage_results) # Return both raw results and formatted display triage_results["formatted_display"] = formatted_output return triage_results except Exception as e: logger.error(f"Error in intelligent triage: {e}") return {"error": str(e)} def _apply_fallback_categorization(self, email: Dict, triage_results: Dict): """Apply fallback categorization and add to triage results""" category, priority_score, reasoning = self._fallback_categorization(email) email_analysis = { **email, "priority_score": priority_score, "category": category, "ai_reasoning": reasoning, "suggested_actions": self._suggest_email_actions(email, category, priority_score) } triage_results[category].append(email_analysis) def _analyze_email_batch_with_ai(self, emails: List[Dict]) -> List[Tuple[str, float, str]]: """Use AI to analyze a batch of emails at once""" if not self.model or not emails: return [self._fallback_categorization(email) for email in emails] try: # Create a more efficient batch prompt for analyzing multiple emails at once # Simplify and shorten the prompt to prevent timeouts email_prompts = [] for i, email in enumerate(emails): # Limit snippet length to avoid timeouts snippet = email.get('snippet', 'No preview') snippet = snippet[:50] if snippet else 'No preview' # Simpler prompt format email_prompts.append(f"""Email #{i+1}: Subject: "{email.get('subject', 'No subject')}" | From: {email.get('sender', 'Unknown')} | Preview: "{snippet}" | Unread: {email.get('is_unread', False)}""") batch_prompt = f""" You are an email categorization expert. Analyze each email below and categorize them. Respond ONLY with a JSON array containing one object per email. Emails to analyze: {chr(10).join(email_prompts)} Categories: high_priority, meetings_and_calls, action_required, newsletters_and_updates, personal, low_priority Response format (JSON array only): [ {{"category": "category_name", "priority_score": 0.1_to_1.0, "reasoning": "brief reason"}} ] """ # Attempt to get response with increased timeout for batch processing response = self.model.generate_content( batch_prompt, max_tokens=512, # Reduced token count to speed up generation temperature=0.2, # Lower temperature for more deterministic results timeout=120.0 ) # Validate response exists and has content if not response or not hasattr(response, 'text') or not response.text: logger.warning("Empty response from Qwen model for batch analysis") return [self._fallback_categorization(email) for email in emails] response_text = response.text.strip() logger.debug(f"Qwen batch response: {response_text}") # Try to extract JSON from response (in case there's extra text) json_start = response_text.find('[') json_end = response_text.rfind(']') + 1 if json_start == -1 or json_end == 0: logger.warning("No valid JSON array found in batch response") return [self._fallback_categorization(email) for email in emails] json_text = response_text[json_start:json_end] try: # Parse JSON with validation results = json.loads(json_text) if not isinstance(results, list) or len(results) == 0: logger.warning("JSON response is not a valid array") return [self._fallback_categorization(email) for email in emails] valid_categories = [ 'high_priority', 'meetings_and_calls', 'action_required', 'newsletters_and_updates', 'personal', 'low_priority' ] # Process and validate each result processed_results = [] for i, result in enumerate(results): if i >= len(emails): # Skip extra results break # Validate required fields if not all(key in result for key in ['category', 'priority_score', 'reasoning']): logger.warning(f"Missing required fields in JSON for email #{i+1}") processed_results.append(self._fallback_categorization(emails[i])) continue category = result['category'] if category not in valid_categories: logger.warning(f"Invalid category '{category}' returned for email #{i+1}") processed_results.append(self._fallback_categorization(emails[i])) continue # Validate priority score is a number between 0 and 1 try: priority_score = float(result['priority_score']) if not 0.0 <= priority_score <= 1.0: priority_score = 0.5 # Default if out of range except (ValueError, TypeError): priority_score = 0.5 reasoning = str(result.get('reasoning', 'AI analysis completed')) logger.info(f"AI categorized email '{emails[i].get('subject', '')[:50]}...' as {category} (score: {priority_score})") processed_results.append((category, priority_score, reasoning)) # Fill in any missing results with fallback categorization while len(processed_results) < len(emails): missing_idx = len(processed_results) processed_results.append(self._fallback_categorization(emails[missing_idx])) return processed_results except json.JSONDecodeError as e: logger.warning(f"JSON parsing failed for batch: {e}. Response was: {json_text}") return [self._fallback_categorization(email) for email in emails] except Exception as e: logger.warning(f"Batch AI analysis failed with error: {e}") return [self._fallback_categorization(email) for email in emails] def _analyze_email_with_ai(self, email: Dict) -> Tuple[str, float, str]: """Use AI to analyze email and determine category, priority, and reasoning""" if not self.model: logger.info("No Qwen model available, using rule-based categorization") return self._fallback_categorization(email) try: # Create a more detailed, explicit prompt for the model prompt = f""" You are an email categorization expert. Your task is to analyze the email details below and categorize it. You MUST respond with ONLY a valid JSON object in the exact format requested at the end. Email Details: - Subject: {email.get('subject', 'No subject')} - From: {email.get('sender', 'Unknown sender')} - Content Preview: {email.get('snippet', 'No preview')} - Is Unread: {email.get('is_unread', False)} - Date: {email.get('date', 'Unknown date')} Available Categories (choose exactly one): 1. high_priority: For urgent emails that need immediate attention 2. meetings_and_calls: For meeting invites, call schedules, or appointment-related emails 3. action_required: For emails that require a specific action or response, but aren't urgent 4. newsletters_and_updates: For subscription emails, product updates, marketing content 5. personal: For emails that are personal in nature but not urgent 6. low_priority: For emails that can be handled later or are low importance Your response MUST be ONLY a JSON object in this exact format: {{"category": "one_of_the_categories_above", "priority_score": 0.1_to_1.0, "reasoning": "Brief explanation of your categorization"}} Examples of valid responses: {{"category": "high_priority", "priority_score": 0.9, "reasoning": "Contains urgent deadline requiring immediate action"}} {{"category": "newsletters_and_updates", "priority_score": 0.3, "reasoning": "Marketing newsletter with no action required"}} """ # Attempt to get response with retry logic built into the client response = self.model.generate_content( prompt, max_tokens=512, temperature=0.3, retries=3 ) # Validate response exists and has content if not response or not hasattr(response, 'text') or not response.text: logger.warning("Empty response from Qwen model after retries") return self._fallback_categorization(email) response_text = response.text.strip() logger.debug(f"Qwen response: {response_text}") # Try to extract JSON from response (in case there's extra text) json_start = response_text.find('{') json_end = response_text.rfind('}') + 1 if json_start == -1 or json_end == 0: logger.warning("No valid JSON found in response") # Try to parse in a more lenient way if possible try: import re # Look for patterns like "category": "value" category_match = re.search(r'"category"\s*:\s*"([^"]+)"', response_text) score_match = re.search(r'"priority_score"\s*:\s*([\d.]+)', response_text) reasoning_match = re.search(r'"reasoning"\s*:\s*"([^"]+)"', response_text) if category_match and score_match: category = category_match.group(1) priority_score = float(score_match.group(1)) reasoning = reasoning_match.group(1) if reasoning_match else "AI analysis" valid_categories = [ 'high_priority', 'meetings_and_calls', 'action_required', 'newsletters_and_updates', 'personal', 'low_priority' ] if category in valid_categories and 0 <= priority_score <= 1: logger.info(f"Recovered partial JSON data from malformed response") return category, priority_score, reasoning except Exception as e: logger.warning(f"Could not recover data from malformed response: {e}") return self._fallback_categorization(email) json_text = response_text[json_start:json_end] try: # Parse JSON with validation result = json.loads(json_text) # Validate required fields if not all(key in result for key in ['category', 'priority_score', 'reasoning']): logger.warning("Missing required fields in JSON response") return self._fallback_categorization(email) # Validate category is valid valid_categories = [ 'high_priority', 'meetings_and_calls', 'action_required', 'newsletters_and_updates', 'personal', 'low_priority' ] category = result['category'] if category not in valid_categories: logger.warning(f"Invalid category '{category}' returned") return self._fallback_categorization(email) # Validate priority score is a number between 0 and 1 try: priority_score = float(result['priority_score']) if not 0.0 <= priority_score <= 1.0: priority_score = 0.5 # Default if out of range except (ValueError, TypeError): priority_score = 0.5 reasoning = str(result.get('reasoning', 'AI analysis completed')) logger.info(f"AI categorized email '{email.get('subject', '')[:50]}...' as {category} (score: {priority_score})") return category, priority_score, reasoning except json.JSONDecodeError as e: logger.warning(f"JSON parsing failed: {e}. Response was: {json_text}") return self._fallback_categorization(email) except Exception as e: logger.warning(f"AI analysis failed with error: {e}") return self._fallback_categorization(email) def _fallback_categorization(self, email: Dict) -> Tuple[str, float, str]: """Fallback categorization when AI is not available""" subject = email.get('subject', '').lower() sender = email.get('sender', '').lower() is_unread = email.get('is_unread', False) # Priority keywords if any(word in subject for word in ['urgent', 'asap', 'critical', 'emergency']): return "high_priority", 0.9, "Contains urgent keywords" elif any(word in subject for word in ['meeting', 'call', 'zoom', 'appointment']): return "meetings_and_calls", 0.7, "Meeting or call related" elif any(word in subject for word in ['action', 'required', 'todo', 'task']): return "action_required", 0.6, "Appears to require action" elif any(word in subject for word in ['newsletter', 'digest', 'update', 'notification']): return "newsletters_and_updates", 0.3, "Newsletter or update" elif is_unread: return "personal", 0.5, "Unread personal email" else: return "low_priority", 0.2, "Standard email" def _suggest_email_actions(self, email: Dict, category: str, priority_score: float) -> List[str]: """Suggest specific actions for each email""" actions = [] if category == "high_priority": actions.extend(["📞 Call sender immediately", "⚡ Respond within 1 hour", "📌 Add to priority list"]) elif category == "meetings_and_calls": actions.extend(["📅 Add to calendar", "✅ Send confirmation", "📋 Prepare agenda"]) elif category == "action_required": actions.extend(["✏️ Create task", "⏰ Set reminder", "📝 Draft response"]) elif category == "newsletters_and_updates": actions.extend(["📖 Schedule reading time", "🗂️ Archive after reading"]) elif priority_score > 0.6: actions.extend(["👀 Review carefully", "📝 Respond today"]) else: actions.extend(["📁 Archive if not important", "👁️ Quick scan"]) return actions def _generate_triage_summary(self, triage_results: Dict) -> str: """Generate intelligent summary of triage results based on actual email content""" total_emails = sum(len(emails) for key, emails in triage_results.items() if key not in ['analysis_summary', 'recommendations']) if total_emails == 0: return "📭 **No emails to analyze**\n\nYour inbox is empty or no emails match the search criteria." # Get specific counts high_priority_count = len(triage_results.get('high_priority', [])) action_required_count = len(triage_results.get('action_required', [])) meetings_count = len(triage_results.get('meetings_and_calls', [])) newsletters_count = len(triage_results.get('newsletters_and_updates', [])) personal_count = len(triage_results.get('personal', [])) # Get specific email subjects for personalized insights urgent_subjects = [email['subject'][:30] + "..." for email in triage_results.get('high_priority', [])[:2]] action_subjects = [email['subject'][:30] + "..." for email in triage_results.get('action_required', [])[:2]] # Extract senders for more personalized recommendations all_senders = [] for category in triage_results: if category in ['analysis_summary', 'recommendations']: continue for email in triage_results[category]: all_senders.append(self._clean_sender(email.get('sender', 'Unknown'))) top_senders = Counter(all_senders).most_common(3) # Calculate unread percentage unread_count = sum(1 for cat in triage_results.keys() if cat not in ['analysis_summary', 'recommendations'] for email in triage_results[cat] if email.get('is_unread', False)) unread_percentage = (unread_count / total_emails * 100) if total_emails > 0 else 0 # Generate personalized summary summary = f""" 🧠 **Intelligent Email Triage Analysis** 📊 **Overview**: Analyzed {total_emails} emails from the last 3 days """ # Personalized urgent section if high_priority_count > 0: summary += f"🚨 **Immediate Attention**: {high_priority_count} high-priority emails require urgent response\n" if urgent_subjects: summary += " • " + "\n • ".join(f'"{subject}"' for subject in urgent_subjects) if high_priority_count > len(urgent_subjects): summary += f"\n • ...and {high_priority_count - len(urgent_subjects)} more" summary += "\n\n" else: summary += "✅ **No urgent emails requiring immediate attention**\n\n" # Personalized action section if action_required_count > 0: summary += f"✅ **Action Items**: {action_required_count} emails need specific actions\n" if action_subjects: summary += " • " + "\n • ".join(f'"{subject}"' for subject in action_subjects) if action_required_count > len(action_subjects): summary += f"\n • ...and {action_required_count - len(action_subjects)} more" summary += "\n\n" # Add meeting info if meetings_count > 0: summary += f"📅 **Calendar Items**: {meetings_count} meeting-related emails\n\n" # Add distribution info summary += f"📦 **Email Distribution**:\n" if newsletters_count > 0: summary += f" • Newsletters/Updates: {newsletters_count} ({newsletters_count/total_emails*100:.0f}%)\n" if personal_count > 0: summary += f" • Personal: {personal_count} ({personal_count/total_emails*100:.0f}%)\n" if action_required_count > 0: summary += f" • Action Required: {action_required_count} ({action_required_count/total_emails*100:.0f}%)\n" if high_priority_count > 0: summary += f" • High Priority: {high_priority_count} ({high_priority_count/total_emails*100:.0f}%)\n" # Add unread status summary += f"\n📬 **Inbox Status**: {unread_count} unread emails ({unread_percentage:.0f}% of analyzed emails)\n" # Add top senders if available if top_senders: summary += f"\n👥 **Top Senders**:\n" for sender, count in top_senders: summary += f" • {sender}: {count} emails\n" # Add AI insights based on actual data summary += f""" 💡 **AI Insights**: - {'High' if unread_percentage > 70 else 'Moderate' if unread_percentage > 30 else 'Low'} unread email ratio ({unread_percentage:.0f}%) - {'High' if high_priority_count > 3 else 'Normal'} priority workload ({high_priority_count} urgent emails) - {'Consider' if action_required_count > 5 else 'Manageable'} task delegation for action items ({action_required_count} tasks) - {top_senders[0][0] if top_senders else 'No single sender'} is your most frequent correspondent ({top_senders[0][1] if top_senders else 0} emails) """ return summary def _generate_workflow_recommendations(self, triage_results: Dict) -> List[str]: """Generate intelligent workflow recommendations""" recommendations = [] high_priority_count = len(triage_results.get('high_priority', [])) action_count = len(triage_results.get('action_required', [])) if high_priority_count > 0: recommendations.append(f"🚨 Handle {high_priority_count} urgent emails first") if action_count > 5: recommendations.append("📋 Consider batching similar action items") recommendations.append("⏰ Set aside 2-3 hour block for email processing") recommendations.extend([ "📅 Schedule 15-min email review sessions", "🔄 Set up automated filters for newsletters", "📱 Enable smart notifications for high-priority senders" ]) return recommendations def _update_email_patterns(self, emails: List[Dict]): """Update agent memory with email patterns""" # Simple pattern tracking current_time = datetime.now().isoformat() self.memory.email_patterns[current_time] = { "total_emails": len(emails), "unread_count": sum(1 for e in emails if e.get('is_unread', False)), "top_senders": self._get_sender_stats(emails) } def _get_sender_stats(self, emails: List[Dict]) -> Dict[str, int]: """Get sender statistics""" senders = {} for email in emails: sender = email.get('sender', 'Unknown') senders[sender] = senders.get(sender, 0) + 1 return dict(sorted(senders.items(), key=lambda x: x[1], reverse=True)[:5]) def proactive_assistant_chat(self, user_message: str, chat_history: List) -> Tuple[str, List]: """Enhanced chat with proactive suggestions and agent reasoning""" try: # Analyze user intent intent, confidence = self._analyze_user_intent(user_message) # Add to chat history first to show user message immediately chat_history.append((user_message, None)) # Try to generate smart response with better timeout handling try: # Use a smaller query for recent emails with a shorter timeout logger.info("Fetching recent emails for chat context") recent_emails = self.mcp_client.fetch_emails(query="newer_than:3d", max_results=10) if recent_emails and 'emails' in recent_emails: # Use emails for context-aware response response = self._generate_smart_response_batch(user_message, intent, confidence, recent_emails['emails'][:5]) else: # Fallback if email fetching fails response = self._handle_intent_fallback(user_message, intent, confidence) # Update conversation context self.memory.conversation_context.append({ "user_message": user_message, "intent": intent, "confidence": confidence, "timestamp": datetime.now().isoformat() }) except requests.exceptions.Timeout: logger.error(f"Timeout when processing chat response") response = f"🧠 **AI Analysis**\n💭 *Understanding: {intent.replace('_', ' ').title()}*\n\nI apologize, but I'm having trouble connecting to the advanced AI service at the moment. Here's a simplified response based on your query:\n\n{self._handle_intent_fallback(user_message, intent, confidence)}" except Exception as e: logger.error(f"Error generating chat response: {e}") response = f"🧠 **AI Analysis**\n💭 *Understanding: {intent.replace('_', ' ').title()}*\n\nI apologize, but I encountered an error while analyzing your emails. Let me provide a simple response instead:\n\n{self._handle_intent_fallback(user_message, intent, confidence)}" # Replace the temporary thinking message with the actual response chat_history[-1] = (user_message, response) return "", chat_history except Exception as e: error_response = f"🧠 **Agent Analysis**\n\n❌ Error: {str(e)}\n\nI apologize for the inconvenience. Please try a different question or check the email connection status." # Handle the case where chat history might be empty or invalid if not chat_history: chat_history = [] # Add user message and error response chat_history.append((user_message, error_response)) return "", chat_history def _generate_smart_response_batch(self, message: str, intent: str, confidence: float, emails: List[Dict]) -> str: """Generate intelligent response based on intent analysis and email processing - SINGLE API CALL""" # Add agent header without confidence score response_header = f"🧠 **AI Analysis**\n" response_header += f"Understanding: {intent.replace('_', ' ').title()}\n\n" try: if self.model and emails: logger.info(f"Generating smart response with email context ({len(emails)} emails)") # Create simplified email context for chat email_summaries = [] for i, email in enumerate(emails[:5]): # Limit to 5 emails for chat context sender = self._clean_sender(email.get('sender', 'Unknown')) subject = email.get('subject', 'No subject')[:60] # Limit subject length is_unread = "unread" if email.get('is_unread', False) else "read" email_summaries.append(f"Email {i+1}: \"{subject}\" from {sender} ({is_unread})") # Create an improved chat prompt with better instructions chat_prompt = f"""You are an intelligent email assistant with access to the user's actual emails. The user asked: "{message}" I've detected their intent as: {intent} Here are their recent emails for context: {chr(10).join(email_summaries)} IMPORTANT GUIDELINES: 1. Be concise but informative - keep responses under 250 words 2. Reference specific emails by subject or sender when relevant 3. Format your response with clear sections and bullet points when appropriate 4. Provide actionable advice based on the actual emails 5. Use a friendly, helpful tone 6. For search queries, mention specific matching emails 7. For summaries, group similar emails together 8. For workflow questions, suggest specific organization strategies Make your response personalized to their actual emails. Be direct and helpful without unnecessary explanations. """ logger.info(f"Sending chat request to Qwen API with {len(emails)} email context") response = self.model.generate_content( chat_prompt, max_tokens=512, temperature=0.7, strip_thinking=True, # Clean response for chat timeout=120.0 ) if response and hasattr(response, 'text') and response.text: ai_response = response.text.strip() logger.info(f"Received chat response: {len(ai_response)} chars") # Make sure we have actual content if len(ai_response) > 10: # Add agent analysis header final_response = response_header + ai_response return final_response else: logger.warning("AI response too short, using fallback") return response_header + self._handle_intent_fallback(message, intent, confidence) else: # If we got no response, fall back to simpler intent handling logger.warning("Empty response from Qwen for chat, using fallback") return response_header + self._handle_intent_fallback(message, intent, confidence) else: # No model or emails available logger.info("Using fallback response (no model or emails available)") return response_header + self._handle_intent_fallback(message, intent, confidence) except requests.exceptions.Timeout: logger.warning(f"Timeout in Qwen response generation - using fallback") return response_header + "I'll help with your request using my basic capabilities:\n\n" + self._handle_intent_fallback(message, intent, confidence) except Exception as e: logger.warning(f"Error generating smart response: {e}") return response_header + self._handle_intent_fallback(message, intent, confidence) def _analyze_user_intent(self, message: str) -> Tuple[str, float]: """Analyze user intent and confidence level""" message_lower = message.lower() # Intent patterns with confidence scores intent_patterns = { "email_search": (["find", "search", "look for", "show me"], 0.8), "email_summary": (["summarize", "summary", "overview", "brief"], 0.9), "workflow_automation": (["automate", "organize", "cleanup", "triage"], 0.7), "meeting_prep": (["meeting", "call", "appointment", "schedule"], 0.8), "priority_focus": (["urgent", "important", "priority", "critical"], 0.9), "general_help": (["help", "how to", "what can", "assistance"], 0.6) } best_intent = "general_help" best_confidence = 0.3 for intent, (keywords, base_confidence) in intent_patterns.items(): matches = sum(1 for keyword in keywords if keyword in message_lower) if matches > 0: confidence = min(base_confidence + (matches - 1) * 0.1, 1.0) if confidence > best_confidence: best_intent = intent best_confidence = confidence return best_intent, best_confidence def _generate_smart_response(self, message: str, intent: str, confidence: float) -> str: """Generate intelligent response based on intent analysis using real email data""" # Add header without confidence response_header = f"🧠 **AI Analysis**\n" response_header += f"Understanding: {intent.replace('_', ' ').title()}\n\n" try: # Get real email context for better responses recent_emails = self.mcp_client.fetch_emails(query="newer_than:7d", max_results=15) if self.model and recent_emails and 'emails' in recent_emails: # Use Qwen with real email context return self._generate_qwen_response(message, intent, confidence, recent_emails['emails']) else: # Fallback to intent-based handling with real data if intent == "email_search" and confidence > 0.7: return response_header + self._handle_search_intent(message) elif intent == "email_summary" and confidence > 0.8: return response_header + self._handle_summary_intent(message) elif intent == "workflow_automation": return response_header + self._handle_automation_intent(message) elif intent == "meeting_prep": return response_header + self._handle_meeting_intent(message) elif intent == "priority_focus": return response_header + self._handle_priority_intent(message) else: return response_header + self._handle_general_intent(message) except Exception as e: return f"Error generating smart response: {str(e)}" def _handle_search_intent(self, message: str) -> str: """Handle search-related queries""" # Extract search terms from message search_terms = self._extract_search_terms(message) if search_terms: # Execute search results = self.mcp_client.search_emails( subject_contains=" OR ".join(search_terms), max_results=10 ) if results and 'emails' in results: count = len(results['emails']) return f"🔍 **Search Results**: Found {count} emails matching your criteria.\n\n" + \ f"**Search Terms Used**: {', '.join(search_terms)}\n\n" + \ "📧 **Top Matches**:\n" + \ "\n".join([f"• {email['subject']} (from {self._clean_sender(email['sender'])})" for email in results['emails'][:3]]) return "🔍 I can help you search your emails! Try being more specific, like:\n" + \ "• 'Find emails from John about the project'\n" + \ "• 'Search for meeting emails from last week'\n" + \ "• 'Show me emails with attachments'" def _handle_summary_intent(self, message: str) -> str: """Handle summary-related queries""" # Determine time period if "week" in message.lower(): days = 7 elif "month" in message.lower(): days = 30 elif "today" in message.lower(): days = 1 else: days = 3 # Default summary_result = self.mcp_client.summarize_emails(days=days, include_body=True) if summary_result: return f"📊 **Email Summary - Last {days} Days**\n\n" + \ f"📧 Total: {summary_result.get('total_emails', 0)} emails\n" + \ f"🔴 Unread: {summary_result.get('unread_count', 0)} emails\n\n" + \ "💡 **Agent Recommendation**: Focus on unread emails first for maximum efficiency." return f"📊 No emails found for the last {days} days." def _handle_automation_intent(self, message: str) -> str: """Handle workflow automation queries""" return """🤖 **Email Organization Options**: 📊 **Email Analysis** - Get insights about your emails 🎯 **Priority Focus** - Focus on high-priority emails 🧹 **Inbox Management** - Get organization suggestions 📅 **Meeting Preparation** - Find meeting-related emails 💡 **Try asking**: "Help me organize my inbox" or "Analyze my emails and provide recommendations" """ def _handle_meeting_intent(self, message: str) -> str: """Handle meeting-related queries""" meeting_workflow = self._meeting_preparation_workflow() return f"📅 **Meeting Assistant Activated**\n\n{meeting_workflow}" def _handle_priority_intent(self, message: str) -> str: """Handle priority/urgent email queries""" priority_workflow = self._priority_focus_workflow() return f"🚨 **Priority Mode Activated**\n\n{priority_workflow}" def _handle_general_intent(self, message: str) -> str: """Handle general queries with proactive suggestions""" return """🤖 **How can I help you today?** I'm your intelligent email assistant with advanced capabilities: 🧠 **Smart Features**: • Intent recognition and context awareness • Automated email triage and categorization • Intelligent recommendations • Email pattern analysis 💬 **Try asking me**: • "Show me my most important emails" • "Help me organize my inbox" • "What emails do I have from [specific sender]?" • "Summarize emails from this week" 🎯 **Proactive Suggestion**: Let me analyze your emails to provide personalized recommendations!""" def _extract_search_terms(self, message: str) -> List[str]: """Extract search terms from user message""" # Simple keyword extraction words = message.lower().split() stop_words = {'find', 'search', 'show', 'me', 'my', 'from', 'about', 'with', 'for', 'the', 'a', 'an'} return [word for word in words if word not in stop_words and len(word) > 2] def _clean_sender(self, sender: str) -> str: """Clean sender email for display""" if '<' in sender: email_match = re.search(r'<([^>]+)>', sender) if email_match: return email_match.group(1) return sender def _generate_qwen_response(self, message: str, intent: str, confidence: float, emails: List[Dict]) -> str: """Generate response using Qwen with real email context""" try: # Prepare email context for Qwen - MINIMIZE CONTENT TO AVOID TIMEOUT email_context = [] # Include at most 3 emails to avoid timeout issues for email in emails[:2]: # Further reduced from 3 to 2 email_context.append({ 'subject': email.get('subject', 'No subject')[:20], # Further limit subject length 'sender': self._clean_sender(email.get('sender', 'Unknown')), 'is_unread': email.get('is_unread', False) # Removed more fields to reduce prompt size }) # Create an even simpler system prompt to avoid timeouts system_prompt = f""" You are an email assistant. Answer the user's query about their emails briefly. USER QUERY: "{message}" INTENT: {intent} EMAIL SAMPLE: {len(email_context)} emails """ # Use a shorter max_tokens and higher temperature for faster responses response = self.model.generate_content( system_prompt, max_tokens=512, # Reduced from 1024 to 512 temperature=0.7, strip_thinking=True, retries=2 ) if response and hasattr(response, 'text') and response.text: ai_response = response.text.strip() # Add header without confidence final_response = f"🧠 **AI Analysis**\n" final_response += f"Understanding: {intent.replace('_', ' ').title()}\n\n" final_response += ai_response return final_response else: # If we got no response, fall back to simpler intent handling logger.warning("Empty response from Qwen, using fallback") return self._handle_intent_fallback(message, intent, confidence) except requests.exceptions.Timeout: logger.warning(f"Timeout in Qwen response generation - using fallback") response_header = f"🧠 **AI Analysis**\n" response_header += f"Understanding: {intent.replace('_', ' ').title()}\n\n" response_header += "I'll help with your request using my basic capabilities:\n\n" return response_header + self._handle_intent_fallback(message, intent, confidence) except Exception as e: logger.warning(f"Qwen response generation failed: {e}") return self._handle_intent_fallback(message, intent, confidence) def _handle_intent_fallback(self, message: str, intent: str, confidence: float) -> str: """Fallback intent handling when Qwen is not available""" response_header = f"🧠 **AI Analysis**\n" response_header += f"Understanding: {intent.replace('_', ' ').title()}\n\n" if intent == "email_search" and confidence > 0.7: return response_header + self._handle_search_intent(message) elif intent == "email_summary" and confidence > 0.8: return response_header + self._handle_summary_intent(message) elif intent == "workflow_automation": return response_header + self._handle_automation_intent(message) elif intent == "meeting_prep": return response_header + self._handle_meeting_intent(message) elif intent == "priority_focus": return response_header + self._handle_priority_intent(message) else: return response_header + self._handle_general_intent(message) def get_connection_status(self) -> Tuple[str, str]: """Get the current connection status of the MCP client""" try: # Test connection by attempting to fetch a small number of emails test_result = self.mcp_client.fetch_emails(query="newer_than:1d", max_results=1) if test_result and 'emails' in test_result: total_count = test_result.get('total_count', 0) return f"Connected ✅ ({total_count} emails accessible)", "success" else: return "Connected but no emails found 📭", "warning" except Exception as e: return f"Connection Error ❌ ({str(e)[:50]}...)", "error" def _display_triage_results(self, triage_results: Dict) -> str: """Format triage results for beautiful, readable display in Gradio""" if "error" in triage_results: return f"❌ **Error:** {triage_results['error']}" # Start with a clean, formatted header output = "# 🧠 **AI Email Analysis Results**\n\n" # Add the analysis summary with better formatting summary = triage_results.get("analysis_summary", "") if summary: output += summary + "\n\n" output += "---\n\n" # Add separator # Count emails by category for overview categories = [ ("high_priority", "🚨 **High Priority**", "Urgent attention needed"), ("meetings_and_calls", "📅 **Meetings & Calls**", "Calendar items"), ("action_required", "✅ **Action Required**", "Tasks to complete"), ("newsletters_and_updates", "📰 **Newsletters**", "Updates & info"), ("personal", "👤 **Personal**", "Personal messages"), ("low_priority", "📁 **Low Priority**", "Can wait") ] # Count total emails processed total_processed = sum(len(triage_results.get(cat, [])) for cat, _, _ in categories) if total_processed == 0: return output + "📭 **No emails found to analyze.**" # Add quick stats overview with visual bar chart output += f"## 📊 **Quick Overview** ({total_processed} emails analyzed)\n\n" # Create a visual bar chart representation output += "| Category | Count | Distribution |\n" output += "|----------|-------|-------------|\n" for category_key, category_icon, category_desc in categories: count = len(triage_results.get(category_key, [])) if total_processed > 0: percentage = (count / total_processed * 100) # Create a visual bar using emoji bar_length = int(percentage / 5) # 5% per character bar = "█" * bar_length if bar_length > 0 else "" output += f"| {category_icon} | {count} | {bar} {percentage:.0f}% |\n" output += "\n---\n\n" # Show detailed breakdown for important categories priority_categories = ["high_priority", "meetings_and_calls", "action_required"] for category_key, category_title, category_desc in categories: emails = triage_results.get(category_key, []) if not emails: continue # Only show full details for priority categories, summary for others show_full_details = category_key in priority_categories output += f"## {category_title} ({len(emails)} emails)\n" output += f"*{category_desc}*\n\n" if show_full_details: # Show detailed view for important emails for i, email in enumerate(emails[:5]): # Increased from 3 to 5 emails priority_score = email.get('priority_score', 0.5) priority_icon = "🔥" if priority_score > 0.8 else "⚠️" if priority_score > 0.6 else "📌" subject = email.get('subject', 'No subject') sender = self._clean_sender(email.get('sender', 'Unknown')) # Truncate for readability if len(subject) > 60: subject = subject[:57] + "..." if len(sender) > 35: sender = sender[:32] + "..." output += f"### {priority_icon} {subject}\n" output += f"📤 **From:** {sender} | ⭐ **Priority:** {priority_score:.1f}/1.0\n\n" # Add AI reasoning if available - improved formatting reasoning = email.get('ai_reasoning', '') if reasoning and len(reasoning) > 5: output += f"🧠 **AI Analysis:** {reasoning}\n\n" # Add suggested actions with better formatting actions = email.get('suggested_actions', []) if actions: output += "💡 **Suggested Actions:**\n" for action in actions[:3]: # Limit to 3 actions output += f"- {action}\n" output += "\n" output += "---\n\n" # Show count if there are more emails if len(emails) > 5: output += f"*...and {len(emails) - 5} more emails in this category*\n\n" else: # Show summary view for less important categories output += "
\n\n" output += "| Subject | From | Priority |\n" output += "|---------|------|----------|\n" for email in emails[:5]: # Show up to 5 emails subject = email.get('subject', 'No subject') if len(subject) > 40: subject = subject[:37] + "..." sender = self._clean_sender(email.get('sender', 'Unknown')) if len(sender) > 25: sender = sender[:22] + "..." priority = email.get('priority_score', 0.5) priority_icon = "🔥" if priority > 0.8 else "⚠️" if priority > 0.6 else "📌" output += f"| {subject} | {sender} | {priority_icon} {priority:.1f} |\n" output += "\n
\n\n" if len(emails) > 5: output += f"*...and {len(emails) - 5} more emails in this category*\n\n" # Add recommendations section with better formatting recommendations = triage_results.get("recommendations", []) if recommendations: output += "---\n\n" output += "## 💡 **AI Recommendations**\n\n" for i, rec in enumerate(recommendations[:5], 1): # Limit to 5 recommendations output += f"{i}. {rec}\n" output += "\n" # Add concise next steps with better visual separation high_count = len(triage_results.get('high_priority', [])) action_count = len(triage_results.get('action_required', [])) output += "---\n\n" output += "## 🎯 **What to Do Next**\n\n" output += "
\n\n" if high_count > 0: output += f"1. 🚨 **URGENT:** Handle {high_count} high-priority emails first\n\n" if action_count > 0: output += f"2. ✅ **TODAY:** Complete {action_count} action items\n\n" output += f"3. 📅 **SCHEDULE:** Review meeting emails and update calendar\n\n" output += f"4. 🗂️ **ORGANIZE:** Archive newsletters and low-priority items\n\n" output += "
\n\n" output += "---\n\n" output += "*🤖 Analysis powered by Qwen AI • Results based on your actual email content*" return output # Create the enhanced Gradio interface def create_enhanced_gradio_interface(modal_api_url: str = None): """Create the enhanced Gradio interface for the Gmail AI Agent""" # Use environment variable if no URL provided modal_api_url = modal_api_url or MODAL_API_URL # Initialize the agent agent = EnhancedGmailAgent(modal_api_url=modal_api_url) # Check if we have valid credentials mcp_status, model_status = agent.get_connection_status() # Theme configuration theme = gr.Theme( primary_hue="indigo", secondary_hue="blue", neutral_hue="slate", ) # Custom CSS for better styling custom_css = """ /* ... existing code ... */ """ with gr.Blocks(theme=theme, title="Enhanced Gmail AI Agent - Track 3") as app: # Enhanced Header gr.HTML("""

🤖 Enhanced Gmail AI Agent

Track 3: Sophisticated Agent Behaviors & Workflow Automation

Intelligent Decision-Making • Workflow Automation • Proactive Assistance

""") # Warning about initial response time gr.HTML("""

⚠️ IMPORTANT: Initial Loading Time

On first run, AI responses may take 120-200 seconds because the Qwen LLM is hosted on Modal and requires time for cold start.

Subsequent requests will be much faster after the initial cold start. Please be patient during your first interaction.

""") # Status section with Qwen model status with gr.Row(): with gr.Column(): status_text = gr.HTML(elem_classes=["status-card"]) def update_status(): email_status, email_status_type = agent.get_connection_status() # Add Qwen model status qwen_status = agent.model_status if hasattr(agent, 'model_status') else "Unknown" qwen_class = "status-connected" if agent.model else "status-error" return f'''
🧠 Qwen Model Status: {qwen_status}

Note: The Qwen API will be tested when you first use an AI feature

''' status_text.value = update_status() with gr.Tabs() as tabs: # Intelligent Triage Tab with loading with gr.Tab("🧠 Intelligent Triage"): with gr.Column(elem_classes=["content-card"]): gr.HTML('

🧠 AI-Powered Email Triage & Analysis

') gr.HTML("""

✨ Smart Email Analysis

The agent analyzes your emails using AI to categorize, prioritize, and suggest specific actions. Each email gets a priority score and reasoning.

⏱️ Expected Time: 20-30 seconds for thorough analysis

""") # First-run warning gr.HTML("""

⚠️ IMPORTANT: First-time AI operations may take 120-200 seconds due to Modal cold start.

""") with gr.Row(): max_emails_triage = gr.Slider( minimum=10, maximum=50, value=20, step=5, label="📊 Emails to Analyze", info="More emails = more thorough analysis but longer wait time" ) triage_btn = gr.Button("🧠 Run Intelligent Triage", variant="primary", size="lg") # Loading state for triage triage_loading = gr.HTML(visible=False) # Updated to use a scrollable container with proper styling triage_output = gr.Markdown( label="🎯 Triage Results", value="Click 'Run Intelligent Triage' to see AI-powered email analysis with specific recommendations", elem_classes=["triage-results"], show_label=True ) def run_triage_with_loading(max_emails): loading_html = f"""

🧠 Analyzing Your {max_emails} Most Recent Emails...

AI is categorizing, prioritizing, and generating recommendations

⏱️ This usually takes 20-30 seconds • Please wait patiently

📊 Processing: Fetch emails → AI analysis → Priority scoring → Recommendations

""" return ( gr.update(value=loading_html, visible=True), gr.update(visible=False), gr.update(interactive=False) ) def run_triage_complete(max_emails): results = agent.intelligent_email_triage(max_emails) if "error" in results: output = f"❌ **Error:** {results['error']}" else: # Use the beautifully formatted display instead of manual formatting output = results.get("formatted_display", "Analysis complete but no formatted output available.") return ( gr.update(visible=False), gr.update(value=output, visible=True), gr.update(interactive=True) ) triage_btn.click( run_triage_with_loading, inputs=[max_emails_triage], outputs=[triage_loading, triage_output, triage_btn] ).then( run_triage_complete, inputs=[max_emails_triage], outputs=[triage_loading, triage_output, triage_btn] ) # Proactive AI Assistant Tab with gr.Tab("🤖 Proactive Assistant"): with gr.Column(elem_classes=["content-card"]): gr.HTML('

🤖 Intelligent AI Assistant

') gr.HTML("""

🧠 Advanced AI Capabilities

This AI assistant analyzes your actual email data using Qwen model to provide personalized, specific recommendations:

""") # First-run warning gr.HTML("""

⚠️ IMPORTANT: First-time AI operations may take 120-200 seconds due to Modal cold start.

""") # Improved chatbot with proper scrolling and styling with gr.Column(elem_classes=["chat-container"]): chatbot = gr.Chatbot( height=500, container=True, value=[(None, "🤖 **Welcome to your Enhanced Email Assistant!**\n\nI'm powered by advanced AI and can analyze your actual emails to provide specific, personalized recommendations.\n\n⚠️ **IMPORTANT: First-time responses may take 120-200 seconds** due to the Qwen model's cold start on Modal. Please be patient!\n\n💡 **Try asking me:**\n• \"What emails should I focus on today?\"\n• \"Help me organize my inbox based on my recent emails\"\n• \"Show me my most important unread emails\"\n• \"Summarize my emails from this week\"\n\nI'll analyze your real email data and give you specific advice! ✨")], elem_classes=["enhanced-chatbot", "chat-messages"] ) with gr.Row(elem_classes=["chat-input-container"]): chat_input = gr.Textbox( placeholder="Ask me about your emails or request email analysis...", scale=4, lines=2, max_lines=8, elem_classes=["enhanced-input"], autofocus=True, show_label=False ) chat_send = gr.Button("📤 Send", variant="primary", scale=1, elem_classes=["send-button"]) chat_send.click( agent.proactive_assistant_chat, inputs=[chat_input, chatbot], outputs=[chat_input, chatbot] ) chat_input.submit( agent.proactive_assistant_chat, inputs=[chat_input, chatbot], outputs=[chat_input, chatbot] ) # Enhanced Footer gr.HTML(""" """) return app if __name__ == "__main__": print("🚀 Starting Enhanced Gmail AI Agent (Track 3)") print("🧠 Features: Intelligent Triage | Smart Workflows | Proactive Assistant") # Get Modal API URL from environment variable modal_api_url = MODAL_API_URL if modal_api_url: print(f"📡 Using Modal API URL from environment: {modal_api_url}") else: print("⚠️ No Modal API URL found in environment variables. Some features may be limited.") app = create_enhanced_gradio_interface(modal_api_url=modal_api_url) app.launch( server_name="0.0.0.0", # Use 0.0.0.0 to allow external connections (needed for Hugging Face Spaces) server_port=7860, # Use port 7860 for Hugging Face Spaces share=False, show_error=True ) # Create the Gradio app for Hugging Face Spaces app = create_enhanced_gradio_interface(modal_api_url=MODAL_API_URL)