ysharma HF Staff committed on
Commit
6af4299
·
verified ·
1 Parent(s): 460de3b

Create utils.py

Browse files
Files changed (1) hide show
  1. utils.py +205 -0
utils.py ADDED
@@ -0,0 +1,205 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Utility functions for Universal MCP Client
3
+ """
4
+ import re
5
+ import logging
6
+ from typing import List, Dict, Any, Optional
7
+ from pathlib import Path
8
+
9
+ logger = logging.getLogger(__name__)
10
+
11
def validate_huggingface_space_name(space_name: str) -> bool:
    """
    Validate HuggingFace space name format
    Expected format: username/space-name

    Both the username and the space name must be non-empty and consist
    only of ASCII letters, digits, hyphens and underscores.

    Args:
        space_name: Candidate identifier, e.g. "user/my-space".

    Returns:
        True if the value is a string with exactly one "/" separating two
        valid segments; False otherwise (including empty or non-string input).
    """
    if not space_name or not isinstance(space_name, str):
        return False

    # Exactly one "/" is required; this also covers the "no separator" case.
    parts = space_name.split("/")
    if len(parts) != 2:
        return False

    # fullmatch is used instead of match() with a trailing '$' because '$'
    # also matches just before a final newline, which would let
    # "user/space\n" through.
    segment_pattern = r'[a-zA-Z0-9\-_]+'
    return all(re.fullmatch(segment_pattern, part) for part in parts)
35
+
36
def sanitize_server_name(name: str) -> str:
    """
    Turn an arbitrary name into an MCP server identifier

    The name is lowercased, every character outside letters, digits and
    underscore becomes an underscore, runs of underscores are collapsed to
    one, and leading/trailing underscores are dropped. If nothing is left
    (or the input is empty) the placeholder "unnamed_server" is returned.
    """
    fallback = "unnamed_server"

    if name:
        lowered = name.lower()
        # Anything that is not a letter, digit or underscore -> underscore
        underscored = re.sub(r'[^a-zA-Z0-9_]', '_', lowered)
        # Squash repeated underscores into a single one
        collapsed = re.sub(r'_+', '_', underscored)
        trimmed = collapsed.strip('_')
        if trimmed:
            return trimmed

    return fallback
53
+
54
def format_file_size(size_bytes: int) -> str:
    """
    Render a byte count as a short human readable string

    A value of exactly 0 yields "0 B". Otherwise the value is divided by
    1024 until it drops below 1024 or the largest unit (TB) is reached, and
    is printed with one decimal place followed by the unit.
    """
    if size_bytes == 0:
        return "0 B"

    units = ["B", "KB", "MB", "GB", "TB"]
    value = size_bytes

    # Walk every unit except the last; the last one absorbs whatever remains.
    for unit in units[:-1]:
        if not (value >= 1024):
            return f"{value:.1f} {unit}"
        value /= 1024.0

    return f"{value:.1f} {units[-1]}"
68
+
69
def get_file_info(file_path: str) -> Dict[str, Any]:
    """
    Collect basic metadata for a path on disk

    On success the result holds the keys "name", "size" (bytes),
    "size_formatted", "extension" (lowercased suffix) and "exists" (True).
    If the path does not exist the result is {"error": "File not found"}.
    Any exception raised along the way is logged and returned as
    {"error": <message>}.
    """
    try:
        target = Path(file_path)

        if target.exists():
            size = target.stat().st_size
            info: Dict[str, Any] = {
                "name": target.name,
                "size": size,
                "size_formatted": format_file_size(size),
                "extension": target.suffix.lower(),
                "exists": True,
            }
            return info

        return {"error": "File not found"}
    except Exception as e:
        logger.error(f"Error getting file info for {file_path}: {e}")
        return {"error": str(e)}
90
+
91
def truncate_text(text: str, max_length: int = 100, suffix: str = "...") -> str:
    """
    Truncate text to a maximum length with suffix

    Args:
        text: The text to shorten. Falsy values (empty string, None) are
            returned unchanged.
        max_length: Upper bound on the length of the returned string.
        suffix: Marker appended when the text is cut; it counts towards
            max_length.

    Returns:
        The original text if it already fits, otherwise a string of at most
        max_length characters. When max_length is too small to hold the
        suffix, the text is hard-cut without a suffix.
    """
    if not text or len(text) <= max_length:
        return text

    keep = max_length - len(suffix)
    if keep < 0:
        # A negative slice bound would count from the end of the string and
        # produce a result longer than max_length, so cut without a suffix.
        return text[:max(max_length, 0)]

    return text[:keep] + suffix
99
+
100
def format_tool_description(tool_name: str, description: Optional[str], max_desc_length: int = 150) -> str:
    """
    Format tool description for display

    Args:
        tool_name: Tool identifier; underscores become spaces and the
            result is title-cased.
        description: Free-text description. A missing (None) description is
            treated as an empty string.
        max_desc_length: Maximum length of the description part, enforced
            via truncate_text.

    Returns:
        A Markdown string of the form "**Tool Name**: description".
    """
    formatted_name = tool_name.replace("_", " ").title()
    # Without the fallback a None description would be rendered as the
    # literal text "None" by the f-string below.
    truncated_desc = truncate_text(description or "", max_desc_length)

    return f"**{formatted_name}**: {truncated_desc}"
108
+
109
def extract_media_type_from_url(url: str) -> Optional[str]:
    """
    Extract media type from URL based on file extension

    For data URLs the MIME type declared in the header (the part before the
    first "," or ";") decides the result. For every other URL the file
    extension of the path is used; if the path has no known extension, the
    values of the query string are checked as well (e.g. "?file=a.png").

    Args:
        url: A data URL, a regular URL or a plain path.

    Returns:
        'image', 'audio' or 'video', or None when the type cannot be
        determined (including empty input and unparsable URLs).
    """
    import posixpath
    from urllib.parse import parse_qsl, urlsplit

    if not url:
        return None

    url = url.strip()

    # Handle data URLs: look only at the declared MIME type, never at the
    # payload, which may contain "image/" etc. by coincidence.
    if url[:5].lower() == 'data:':
        header = url[5:].split(',', 1)[0]
        mime_type = header.split(';', 1)[0].strip().lower()
        for kind in ('image', 'audio', 'video'):
            if mime_type.startswith(kind + '/'):
                return kind
        return None

    extension_kinds = {
        '.png': 'image', '.jpg': 'image', '.jpeg': 'image', '.gif': 'image',
        '.webp': 'image', '.bmp': 'image', '.svg': 'image',
        '.mp3': 'audio', '.wav': 'audio', '.ogg': 'audio', '.m4a': 'audio',
        '.flac': 'audio', '.aac': 'audio',
        '.mp4': 'video', '.avi': 'video', '.mov': 'video', '.mkv': 'video',
        '.webm': 'video',
    }

    def kind_for(candidate: str) -> Optional[str]:
        """Map the extension of the last path component to a media kind."""
        extension = posixpath.splitext(candidate)[1].lower()
        return extension_kinds.get(extension)

    # Handle regular URLs - split off host, query and fragment so that a
    # host such as "movies.com" is not mistaken for a ".mov" file.
    try:
        parts = urlsplit(url)
    except ValueError:
        return None

    kind = kind_for(parts.path)
    if kind:
        return kind

    # Download-style URLs often carry the file name in a query parameter.
    for _, value in parse_qsl(parts.query):
        kind = kind_for(value)
        if kind:
            return kind

    return None
137
+
138
def clean_html_for_display(html_text: str) -> str:
    """
    Clean HTML text for safe display in Gradio

    This is a best-effort regex filter, not a full HTML sanitizer: it
    removes <script>...</script> elements and inline "on*" event-handler
    attributes and leaves everything else (including URLs) untouched.

    Args:
        html_text: HTML fragment to clean. Falsy input yields "".

    Returns:
        The fragment with script elements and event-handler attributes
        removed.
    """
    if not html_text:
        return ""

    # Remove script tags for security. The closing tag may legally contain
    # whitespace before ">" (e.g. "</script >").
    html_text = re.sub(
        r'<script\b[^>]*>.*?</script\b[^>]*>',
        '',
        html_text,
        flags=re.IGNORECASE | re.DOTALL,
    )

    # Remove inline event handlers (onclick=..., onerror=...).
    # - The lookbehind requires the name to start an attribute, so ordinary
    #   attributes that merely contain "on" (content=, data-font=) survive.
    # - Double- and single-quoted values are matched separately so a value
    #   such as "alert('x')" is removed as a whole.
    # - Unquoted values are removed as well.
    html_text = re.sub(
        r'''(?<=[\s"'/])on\w+\s*=\s*(?:"[^"]*"|'[^']*'|[^\s>]+)''',
        '',
        html_text,
        flags=re.IGNORECASE,
    )

    return html_text
152
+
153
def generate_accordion_html(title: str, content: str, is_open: bool = False) -> str:
    """
    Build the HTML for a collapsible <details> section

    The title is placed in bold inside the <summary>; the content is placed
    inside a styled <div> below it. Both are inserted as given. When
    is_open is true the "open" attribute is set on <details>.
    """
    if is_open:
        open_flag = "open"
    else:
        open_flag = ""

    return f"""
    <details {open_flag} style="margin-bottom: 10px;">
        <summary style="cursor: pointer; padding: 8px; background: #e9ecef; border-radius: 4px;">
            <strong>{title}</strong>
        </summary>
        <div style="padding: 10px; border-left: 3px solid #007bff; margin-left: 10px; margin-top: 5px;">
            {content}
        </div>
    </details>
    """
169
+
170
class EventTracker:
    """Simple event tracking for debugging and monitoring

    Events are kept in memory in ``self.events`` (oldest first). Once more
    than ``self.max_events`` entries are stored, the oldest are discarded.
    """

    def __init__(self):
        self.events: List[Dict[str, Any]] = []
        self.max_events = 100

    def track_event(self, event_type: str, data: Optional[Dict[str, Any]] = None):
        """Track an event

        Args:
            event_type: Label stored under the "type" key.
            data: Optional payload stored under the "data" key; an empty
                dict is stored when omitted.
        """
        import datetime

        event = {
            "timestamp": datetime.datetime.now().isoformat(),
            "type": event_type,
            "data": data or {}
        }

        self.events.append(event)

        # Keep only the most recent events. Deleting the overflow from the
        # front (instead of slicing with a negative index) also behaves
        # correctly when max_events is 0.
        overflow = len(self.events) - self.max_events
        if overflow > 0:
            del self.events[:overflow]

        logger.debug("Event tracked: %s", event_type)

    def get_recent_events(self, count: int = 10) -> List[Dict[str, Any]]:
        """Get recent events

        Args:
            count: Maximum number of events to return.

        Returns:
            A new list with up to ``count`` of the newest events, oldest
            first. A count of zero or less yields an empty list.
        """
        # events[-0:] would be the whole list, so non-positive counts are
        # handled explicitly.
        if count <= 0:
            return []
        return self.events[-count:]

    def clear_events(self):
        """Clear all tracked events"""
        self.events.clear()


# Global event tracker instance
event_tracker = EventTracker()
205
+