|
|
|
|
|
|
|
import spaces |
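# NOTE: `import spaces` comes first so the ZeroGPU runtime can patch CUDA
# initialization before torch is imported (a Hugging Face Spaces requirement).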
|
|
|
|
|
@spaces.GPU |
|
def gpu_available(): |
|
import torch |
|
return torch.cuda.is_available() |
|
|
|
import os |
|
import sys |
|
import json |
|
import time |
|
import re |
|
import logging |
|
import traceback |
|
import subprocess |
|
from datetime import datetime |
|
from typing import List, Dict, Optional, Union |
|
from contextlib import asynccontextmanager |
|
|
|
import torch |
|
import uvicorn |
|
import threading |
|
import requests |
|
import gradio |
|
from fastapi import FastAPI, HTTPException, Request |
|
from fastapi.responses import JSONResponse, HTMLResponse |
|
from fastapi.middleware.cors import CORSMiddleware |
|
from pydantic import BaseModel, Field |
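
# Put the project root on sys.path so `src.*` (and later `tests.*`) imports
# resolve when this file is executed directly.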
|
|
|
|
|
|
|
|
|
|
|
script_dir = os.path.dirname(os.path.abspath(__file__)) |
|
sys.path.insert(0, script_dir) |
|
|
|
|
|
|
|
|
|
from src.utils import ( |
|
ProcessingContext, |
|
ProcessingResponse, |
|
ProcessedImage, |
|
setup_logging, |
|
get_system_info, |
|
cleanup_memory, |
|
custom_dumps, |
|
LOG_LEVEL_MAP, |
|
EMOJI_MAP |
|
) |
|
|
|
from src.models.model_loader import ( |
|
ensure_models_loaded, |
|
check_hardware_environment, |
|
MODELS_LOADED, |
|
LOAD_ERROR, |
|
DEVICE |
|
) |
|
|
|
from src.pipeline import run_functions_in_sequence, PIPELINE_STEPS |
|
|
|
|
|
|
|
|
|
from src.config import ( |
|
API_TITLE, |
|
API_VERSION, |
|
API_DESCRIPTION, |
|
API_HOST, |
|
API_PORT, |
|
GPU_DURATION_LONG, |
|
STATUS_SUCCESS, |
|
STATUS_ERROR, |
|
STATUS_PROCESSED, |
|
STATUS_NOT_PROCESSED, |
|
ERROR_NO_VALID_URLS, |
|
HTTP_OK, |
|
HTTP_BAD_REQUEST, |
|
HTTP_INTERNAL_SERVER_ERROR |
|
) |
|
|
|
|
|
|
|
|
|
try: |
|
from tests.config import RUN_TESTS |
|
except ImportError: |
|
try: |
|
sys.path.insert(0, os.path.join(script_dir, 'tests')) |
|
from config import RUN_TESTS |
|
except ImportError: |
|
RUN_TESTS = False |
|
print("Warning: Could not import RUN_TESTS from tests.config, defaulting to False") |
|
|
|
|
|
|
|
|
|
class ImageRequest(BaseModel): |
|
urls: Union[str, List[str]] = Field(..., description="Image URL(s)") |
|
product_type: str = Field("General", description="Product type") |
|
options: Optional[Dict] = Field(default_factory=dict, description="Processing options") |
|
|
|
class ShopifyWebhook(BaseModel): |
|
data: List = Field(..., description="Shopify webhook data") |
|
|
|
class HealthResponse(BaseModel): |
|
status: str |
|
timestamp: float |
|
device: str |
|
models_loaded: bool |
|
gpu_available: bool = False |
|
system_info: Dict |
|
|
|
|
|
|
|
|
|
@asynccontextmanager |
|
async def lifespan(app: FastAPI): |
|
setup_logging() |
|
logging.info(f"{EMOJI_MAP['INFO']} Starting {API_TITLE} v{API_VERSION}") |
|
|
|
|
|
app.state.quota_tracker = { |
|
"requests": [], |
|
"last_quota_error": None, |
|
"total_gpu_seconds_used": 0, |
|
"quota_recovery": { |
|
"base_quota": 300, |
|
"refill_rate": 30, |
|
"half_life": 7200, |
|
"last_recovery_check": time.time() |
|
}, |
|
"rate_limiting": { |
|
"requests_by_ip": {}, |
|
"last_quota_error_time": 0, |
|
"quota_cooldown_duration": 1800, |
|
"min_request_interval": 30, |
|
"infrastructure_quota_state": "available", |
|
"infrastructure_quota_reset_time": 0, |
|
"global_quota_exhausted_at": 0, |
|
"global_consecutive_errors": 0 |
|
} |
|
} |
|
|
|
check_hardware_environment() |
|
|
|
|
|
try: |
|
ensure_models_loaded() |
|
if os.getenv("SPACE_ID"): |
|
logging.info(f"{EMOJI_MAP['INFO']} Zero GPU environment - models will be loaded on first request") |
|
else: |
|
if MODELS_LOADED: |
|
logging.info(f"{EMOJI_MAP['SUCCESS']} Models loaded successfully") |
|
else: |
|
logging.warning(f"{EMOJI_MAP['WARNING']} Models not fully loaded") |
|
|
|
|
|
if os.getenv("SPACE_ID"): |
|
try: |
|
init_gpu() |
|
logging.info(f"{EMOJI_MAP['SUCCESS']} GPU initialization completed") |
|
except Exception as e: |
|
error_msg = str(e) |
|
if "GPU task aborted" in error_msg: |
|
logging.warning(f"{EMOJI_MAP['WARNING']} GPU initialization aborted (Zero GPU not ready yet) - this is normal during startup") |
|
logging.info("GPU will be initialized on first request") |
|
else: |
|
logging.warning(f"{EMOJI_MAP['WARNING']} GPU initialization failed: {error_msg}") |
|
|
|
except Exception as e: |
|
logging.error(f"{EMOJI_MAP['ERROR']} Failed to load models: {str(e)}") |
|
|
|
|
|
|
|
skip_startup_test = os.getenv("SKIP_STARTUP_TEST", "false").lower() == "true" |
|
if RUN_TESTS and os.environ.get("IN_PYTEST") != "true" and not skip_startup_test: |
|
logging.info(f"{EMOJI_MAP['INFO']} Running tests at startup...") |
|
|
|
|
|
def run_endpoint_test(): |
|
logging.info(f"{EMOJI_MAP['INFO']} Starting endpoint test with RUN_TESTS={RUN_TESTS}") |
|
|
|
|
|
max_retries = 5 |
|
retry_delay = 60 |
|
initial_delay = 45 |
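
        # Payload mirrors the /api/rb_and_crop webhook contract:
        # data = [[{"url": ...}, ...], product_type]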
|
|
|
|
|
payload = { |
|
"data": [ |
|
[{"url": "https://cdn.shopify.com/s/files/1/0505/0928/3527/files/hugging_face_test_image_shirt_product_type.jpg"}], |
|
"Shirt" |
|
] |
|
} |
|
|
|
|
|
if os.getenv("SPACE_ID"): |
|
logging.info(f"{EMOJI_MAP['INFO']} Zero GPU environment detected - waiting {initial_delay}s for GPU to warm up...") |
|
time.sleep(initial_delay) |
|
logging.info(f"{EMOJI_MAP['INFO']} Running full processing test with enhanced retry logic (max {max_retries} attempts)") |
|
|
|
for retry in range(max_retries): |
|
try: |
|
logging.info(f"{EMOJI_MAP['INFO']} Testing /api/rb_and_crop endpoint (attempt {retry + 1}/{max_retries})...") |
|
|
|
response = requests.post( |
|
"http://localhost:7860/api/rb_and_crop", |
|
json=payload, |
|
timeout=180 |
|
) |
|
|
|
if response.status_code == 200: |
|
data = response.json() |
|
if "processed_images" in data and data["processed_images"]: |
|
img = data["processed_images"][0] |
|
img_status = img.get('status') |
|
|
|
if img_status == STATUS_PROCESSED: |
|
logging.info(f"{EMOJI_MAP['SUCCESS']} Test passed! Image status: {img_status}") |
|
if img.get('base64_image'): |
|
logging.info(f"{EMOJI_MAP['SUCCESS']} Image processed and base64 encoded successfully") |
|
logging.info(f"{EMOJI_MAP['SUCCESS']} Full image processing test completed successfully") |
|
break |
|
elif img_status == STATUS_ERROR: |
|
error_detail = img.get('error', 'Unknown error') |
|
if "GPU task aborted" in error_detail or "GPU resources temporarily unavailable" in error_detail: |
|
logging.warning(f"{EMOJI_MAP['WARNING']} GPU task aborted during processing (attempt {retry + 1}/{max_retries})") |
|
logging.info(f"{EMOJI_MAP['INFO']} Zero GPU is warming up - this is expected during startup") |
|
if retry < max_retries - 1: |
|
logging.info(f"{EMOJI_MAP['INFO']} Waiting {retry_delay}s for GPU to stabilize...") |
|
time.sleep(retry_delay) |
|
continue |
|
else: |
|
logging.error(f"{EMOJI_MAP['ERROR']} Processing error: {error_detail}") |
|
else: |
|
logging.warning(f"{EMOJI_MAP['WARNING']} Unexpected image status: {img_status}") |
|
else: |
|
logging.warning(f"{EMOJI_MAP['WARNING']} Test returned no images") |
|
elif response.status_code == 503: |
|
|
|
logging.warning(f"{EMOJI_MAP['WARNING']} GPU resources unavailable (503), will retry...") |
|
if retry < max_retries - 1: |
|
logging.info(f"{EMOJI_MAP['INFO']} Waiting {retry_delay}s for GPU to become available...") |
|
time.sleep(retry_delay) |
|
continue |
|
elif response.status_code == 500: |
|
|
|
try: |
|
error_data = response.json() |
|
error_detail = error_data.get('error', '') |
|
if "GPU task aborted" in error_detail or "GPU resources temporarily unavailable" in error_detail: |
|
logging.warning(f"{EMOJI_MAP['WARNING']} GPU task aborted (500): {error_detail}") |
|
if retry < max_retries - 1: |
|
logging.info(f"{EMOJI_MAP['INFO']} Zero GPU is still warming up. Waiting {retry_delay}s before retry...") |
|
time.sleep(retry_delay) |
|
continue |
|
else: |
|
logging.error(f"{EMOJI_MAP['ERROR']} Server error (500): {error_detail}") |
|
                    except Exception:
|
logging.error(f"{EMOJI_MAP['ERROR']} Test failed with status 500: {response.text[:200]}") |
|
else: |
|
logging.error(f"{EMOJI_MAP['ERROR']} Test failed with status {response.status_code}") |
|
if response.text: |
|
try: |
|
error_data = response.json() |
|
logging.error(f"Error details: {error_data.get('error', 'Unknown error')}") |
|
                        except Exception:
|
logging.error(f"Response: {response.text[:200]}") |
|
|
|
except requests.exceptions.Timeout: |
|
logging.warning(f"{EMOJI_MAP['WARNING']} Request timeout on attempt {retry + 1} - GPU might be initializing") |
|
if retry < max_retries - 1: |
|
logging.info(f"{EMOJI_MAP['INFO']} Waiting {retry_delay}s before retry...") |
|
time.sleep(retry_delay) |
|
continue |
|
|
|
except Exception as e: |
|
error_msg = str(e) |
|
if "GPU task aborted" in error_msg or "503" in error_msg or "Connection refused" in error_msg: |
|
logging.warning(f"{EMOJI_MAP['WARNING']} Connection/GPU error on attempt {retry + 1}: {error_msg}") |
|
if retry < max_retries - 1: |
|
logging.info(f"{EMOJI_MAP['INFO']} Zero GPU warming up. Waiting {retry_delay}s before retry...") |
|
time.sleep(retry_delay) |
|
continue |
|
else: |
|
logging.error(f"{EMOJI_MAP['ERROR']} Test error: {error_msg}") |
|
if retry < max_retries - 1: |
|
logging.info(f"{EMOJI_MAP['INFO']} Will retry after {retry_delay}s...") |
|
time.sleep(retry_delay) |
|
continue |
|
|
|
|
|
|
|
if retry == max_retries - 1: |
|
logging.warning(f"{EMOJI_MAP['WARNING']} Full test failed after {max_retries} attempts") |
|
logging.info(f"{EMOJI_MAP['INFO']} This is normal for Zero GPU during startup - the GPU needs time to warm up") |
|
|
|
try: |
|
response = requests.get("http://localhost:7860/health", timeout=10) |
|
if response.status_code == 200: |
|
data = response.json() |
|
logging.info(f"{EMOJI_MAP['SUCCESS']} Health check passed - service is running and ready") |
|
logging.info(f"Device: {data.get('device')}, Models loaded: {data.get('models_loaded')}") |
|
logging.info(f"{EMOJI_MAP['INFO']} The GPU will be fully initialized on the first real request") |
|
else: |
|
logging.warning(f"{EMOJI_MAP['WARNING']} Health check returned status {response.status_code}") |
|
except Exception as e: |
|
logging.warning(f"{EMOJI_MAP['WARNING']} Health check failed: {str(e)}") |
|
|
|
logging.info(f"{EMOJI_MAP['INFO']} Service is available and will handle requests normally once GPU warms up") |
|
|
|
else: |
|
|
|
time.sleep(10) |
|
try: |
|
logging.info(f"{EMOJI_MAP['INFO']} Testing /api/rb_and_crop endpoint...") |
|
|
|
|
|
response = requests.post( |
|
"http://localhost:7860/api/rb_and_crop", |
|
json=payload, |
|
timeout=120 |
|
) |
|
|
|
if response.status_code == 200: |
|
data = response.json() |
|
if "processed_images" in data and data["processed_images"]: |
|
img = data["processed_images"][0] |
|
img_status = img.get('status') |
|
|
|
if img_status == STATUS_PROCESSED: |
|
logging.info(f"{EMOJI_MAP['SUCCESS']} Test passed! Image status: {img_status}") |
|
if img.get('base64_image'): |
|
logging.info(f"{EMOJI_MAP['SUCCESS']} Image processed and base64 encoded successfully") |
|
elif img_status == STATUS_ERROR: |
|
logging.error(f"{EMOJI_MAP['ERROR']} Processing error: {img.get('error', 'Unknown error')}") |
|
else: |
|
logging.warning(f"{EMOJI_MAP['WARNING']} Unexpected image status: {img_status}") |
|
else: |
|
logging.warning(f"{EMOJI_MAP['WARNING']} Test returned no images") |
|
else: |
|
logging.error(f"{EMOJI_MAP['ERROR']} Test failed with status {response.status_code}") |
|
if response.text: |
|
try: |
|
error_data = response.json() |
|
logging.error(f"Error details: {error_data.get('error', 'Unknown error')}") |
|
                        except Exception:
|
logging.error(f"Response: {response.text[:200]}") |
|
|
|
except Exception as e: |
|
logging.error(f"{EMOJI_MAP['ERROR']} Test error: {str(e)}") |
|
|
|
|
|
|
test_thread = threading.Thread(target=run_endpoint_test, daemon=True) |
|
test_thread.start() |
|
|
|
yield |
|
|
|
logging.info(f"{EMOJI_MAP['INFO']} API shutdown initiated") |
|
cleanup_memory() |
|
|
|
|
|
|
|
|
|
app = FastAPI( |
|
title=API_TITLE, |
|
version=API_VERSION, |
|
description=API_DESCRIPTION, |
|
docs_url="/api/docs", |
|
redoc_url="/api/redoc", |
|
lifespan=lifespan |
|
) |
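
# CORS is left wide open; callers are server-side webhook integrations
# (e.g. Shopify) rather than browsers.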
|
|
|
|
|
|
|
|
|
app.add_middleware( |
|
CORSMiddleware, |
|
allow_origins=["*"], |
|
allow_credentials=True, |
|
allow_methods=["*"], |
|
allow_headers=["*"], |
|
) |
|
|
|
|
|
|
|
|
|
@spaces.GPU(duration=GPU_DURATION_LONG) |
|
def init_gpu(): |
|
"""Initialize GPU for Spaces environment""" |
|
try: |
|
logging.info(f"{EMOJI_MAP['INFO']} Initializing GPU...") |
|
|
|
if torch.cuda.is_available(): |
|
torch.cuda.empty_cache() |
|
try: |
|
torch.cuda.ipc_collect() |
|
except Exception as e: |
|
logging.warning(f"IPC collect failed, continuing anyway: {e}") |
|
|
|
|
|
test_tensor = torch.tensor([1.0]).cuda() |
|
del test_tensor |
|
logging.info(f"{EMOJI_MAP['SUCCESS']} GPU is available and working") |
|
else: |
|
logging.warning(f"{EMOJI_MAP['WARNING']} CUDA not available in GPU context") |
|
|
|
return True |
|
except Exception as e: |
|
error_msg = str(e) |
|
if "GPU task aborted" in error_msg: |
|
logging.warning(f"{EMOJI_MAP['WARNING']} GPU initialization aborted - Zero GPU not ready") |
|
else: |
|
logging.error(f"{EMOJI_MAP['ERROR']} GPU initialization error: {error_msg}") |
|
raise |
|
|
|
|
|
|
|
|
|
def parse_retry_time_from_error(error_message: str) -> Optional[int]: |
|
"""Parse retry time from GPU quota error messages""" |
|
patterns = [ |
|
r"retry in (\d+):(\d+):(\d+)", |
|
r"Please retry in (\d+):(\d+):(\d+)", |
|
r"retry after (\d+)s", |
|
r"retry_after: (\d+)s", |
|
] |
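
    # H:MM:SS-style matches are converted to total seconds; "retry after Ns"
    # style matches yield N directly.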
|
|
|
for pattern in patterns: |
|
match = re.search(pattern, error_message) |
|
if match: |
|
if len(match.groups()) == 3: |
|
|
|
hours, minutes, seconds = map(int, match.groups()) |
|
return hours * 3600 + minutes * 60 + seconds |
|
else: |
|
|
|
return int(match.group(1)) |
|
|
|
|
|
if "quota" in error_message.lower() or "limit" in error_message.lower(): |
|
return 900 |
|
|
|
return None |
|
|
|
|
|
def calculate_quota_recovery(tracker: Dict, current_time: float) -> Dict: |
|
"""Calculate current quota recovery status based on HF ZeroGPU mechanics""" |
|
recovery_info = tracker.get("quota_recovery", {}) |
|
|
|
|
|
|
|
base_quota = recovery_info.get("base_quota", 300) |
|
refill_rate = 30 |
|
half_life = 7200 |
|
max_quota = 600 |
|
|
|
|
|
cutoff_time = current_time - (half_life * 4) |
|
recent_requests = [r for r in tracker.get("requests", []) if r["timestamp"] > cutoff_time] |
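    # Usage older than four half-lives (8 hours) has decayed below ~6% weight
    # and is dropped from the window entirely.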
|
|
|
|
|
effective_usage = 0 |
|
for req in recent_requests: |
|
if req.get("success", False): |
|
age = current_time - req["timestamp"] |
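
            # Half-life decay: e.g. a 60s job run 2 hours ago now counts as
            # 30s of effective usage.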
|
|
|
|
|
decay_factor = 2 ** (-age / half_life) |
|
effective_usage += req.get("duration", 0) * decay_factor |
|
|
|
|
|
oldest_request_time = min([r["timestamp"] for r in recent_requests], default=current_time) |
|
time_since_oldest = current_time - oldest_request_time |
|
|
|
|
|
recovered_quota = time_since_oldest / refill_rate |
|
|
|
|
|
|
|
available_from_decay = base_quota - effective_usage |
|
available_with_recovery = available_from_decay + recovered_quota |
|
|
|
|
|
estimated_available = min(max(0, available_with_recovery), max_quota) |
|
|
|
|
|
deficit = max(0, effective_usage - recovered_quota) |
|
time_to_60s = max(0, (60 - estimated_available) * refill_rate) if estimated_available < 60 else 0 |
|
time_to_full = max(0, deficit * refill_rate) |
|
|
|
return { |
|
"estimated_available_quota": estimated_available, |
|
"effective_usage": effective_usage, |
|
"recovered_quota": recovered_quota, |
|
"time_to_60s_quota": time_to_60s, |
|
"time_to_full_recovery": time_to_full, |
|
"refill_rate_info": "1 GPU second per 30 real seconds", |
|
"half_life_hours": half_life / 3600, |
|
"max_quota_cap": max_quota, |
|
"recent_requests_count": len(recent_requests), |
|
"quota_formula": "min(base_quota - decayed_usage + recovered_quota, 600)" |
|
} |
|
|
|
|
|
def calculate_retry_after_from_quota(error_message: str) -> int: |
|
"""Calculate appropriate retry-after time based on quota information""" |
|
|
|
pattern = r"(\d+)s left vs\. (\d+)s requested" |
|
match = re.search(pattern, error_message) |
|
|
|
if match: |
|
left = int(match.group(1)) |
|
requested = int(match.group(2)) |
|
deficit = requested - left |
|
|
|
|
|
|
|
retry_seconds = (deficit * 30) + 60 |
|
|
|
|
|
return min(retry_seconds, 3600) |
|
|
|
|
|
parsed_time = parse_retry_time_from_error(error_message) |
|
return parsed_time if parsed_time else 900 |
|
|
|
|
|
def estimate_quota_needed(num_images: int, product_type: str = "General") -> float:
    """Estimate GPU seconds needed for a batch, scaled by product-type complexity and a size-based safety factor."""

    base_time_per_image = 6.0
|
|
|
|
|
complexity_multipliers = { |
|
"General": 1.0, |
|
"Shirt": 1.1, |
|
"Dress": 1.2, |
|
"Jacket": 1.3, |
|
"Shoes": 1.1, |
|
"Accessories": 0.9 |
|
} |
|
|
|
multiplier = complexity_multipliers.get(product_type, 1.0) |
|
estimated_time = num_images * base_time_per_image * multiplier |
|
|
|
|
|
if num_images <= 3: |
|
safety_factor = 1.1 |
|
elif num_images <= 10: |
|
safety_factor = 1.2 |
|
else: |
|
safety_factor = 1.3 |
|
|
|
return estimated_time * safety_factor |
|
|
|
|
|
def calculate_processing_quota_limit(available_quota: float, safety_margin: float = 0.9) -> Dict: |
|
"""Calculate how many images can be processed with current quota""" |
|
|
|
usable_quota = available_quota * safety_margin |
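
    # Deliberately pessimistic per-image figure (covers download, model load and
    # inference); higher than the 6s base used by estimate_quota_needed().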
|
|
|
|
|
avg_time_per_image = 18 |
|
|
|
max_images = int(usable_quota / avg_time_per_image) |
|
|
|
return { |
|
"available_quota": available_quota, |
|
"usable_quota": usable_quota, |
|
"safety_margin": safety_margin, |
|
"avg_time_per_image": avg_time_per_image, |
|
"max_processable_images": max_images, |
|
"estimated_usage": max_images * avg_time_per_image, |
|
"quota_after_processing": available_quota - (max_images * avg_time_per_image) |
|
} |
|
|
|
|
|
def should_trigger_quota_recovery(available_quota: float, requested_images: int, product_type: str = "General") -> Dict: |
|
"""Determine if processing should be stopped to trigger quota recovery""" |
|
estimated_needed = estimate_quota_needed(requested_images, product_type) |
|
|
|
|
|
quota_threshold = available_quota * 0.9 |
|
|
|
should_wait = estimated_needed > quota_threshold |
|
|
|
if should_wait: |
|
|
|
full_recovery_wait = 9000 |
|
|
|
return { |
|
"should_wait": True, |
|
"reason": "quota_conservation", |
|
"available_quota": available_quota, |
|
"estimated_needed": estimated_needed, |
|
"quota_threshold": quota_threshold, |
|
"recovery_wait_seconds": full_recovery_wait, |
|
"recovery_wait_hours": full_recovery_wait / 3600, |
|
"requested_images": requested_images, |
|
"message": f"Processing {requested_images} images needs {estimated_needed}s but only {available_quota:.0f}s available. Triggering 2.5h recovery wait." |
|
} |
|
|
|
return { |
|
"should_wait": False, |
|
"available_quota": available_quota, |
|
"estimated_needed": estimated_needed, |
|
"quota_threshold": quota_threshold, |
|
"requested_images": requested_images, |
|
"safe_to_process": True |
|
} |
|
|
|
|
|
def validate_image_count_for_quota(available_quota: float, image_count: int, product_type: str = "General") -> Dict:
    """Validate that an image batch fits the estimated available quota, using tiered safety buffers."""
    estimated_needed = estimate_quota_needed(image_count, product_type)
|
|
|
|
|
if available_quota < 30: |
|
|
|
safety_buffer = 15 |
|
max_usage_percent = 0.6 |
|
elif available_quota < 100: |
|
|
|
safety_buffer = 20 |
|
max_usage_percent = 0.75 |
|
else: |
|
|
|
safety_buffer = 30 |
|
max_usage_percent = 0.85 |
|
|
|
usable_quota = max(0, available_quota - safety_buffer) |
|
max_safe_images = int(usable_quota / 8) |
|
|
|
|
|
if (estimated_needed > usable_quota or |
|
estimated_needed > (available_quota * max_usage_percent) or |
|
available_quota < 25): |
|
|
|
return { |
|
"valid": False, |
|
"reason": "quota_insufficient", |
|
"image_count": image_count, |
|
"available_quota": available_quota, |
|
"estimated_needed": estimated_needed, |
|
"safety_buffer": safety_buffer, |
|
"max_usage_percent": int(max_usage_percent * 100), |
|
"max_safe_images": max_safe_images, |
|
"recommended_action": "wait_for_quota_recovery", |
|
"message": f"Smart quota protection: {image_count} images need {estimated_needed:.1f}s but only {available_quota:.1f}s available. Max safe: {max_safe_images} images." |
|
} |
|
|
|
return { |
|
"valid": True, |
|
"image_count": image_count, |
|
"available_quota": available_quota, |
|
"estimated_needed": estimated_needed, |
|
"quota_after_processing": available_quota - estimated_needed, |
|
"efficiency": f"{(estimated_needed/available_quota)*100:.1f}%", |
|
"safe_to_process": True |
|
} |
|
|
|
|
|
def check_rate_limiting(client_ip: str, request_id: str = None) -> Dict: |
|
"""Check if request should be rate limited""" |
|
current_time = time.time() |
|
|
|
if not hasattr(app, "state") or not hasattr(app.state, "quota_tracker"): |
|
return {"allowed": True, "reason": "no_tracking"} |
|
|
|
rate_limits = app.state.quota_tracker.get("rate_limiting", {}) |
|
|
|
|
|
requests_by_ip = rate_limits.get("requests_by_ip", {}) |
|
|
|
if client_ip in requests_by_ip: |
|
ip_data = requests_by_ip[client_ip] |
|
|
|
|
|
|
|
usage_history = ip_data.get("usage_history", []) |
|
|
|
|
|
effective_used = 0 |
|
half_life = 7200 |
|
|
|
for usage in usage_history: |
|
age = current_time - usage["timestamp"] |
|
if age < 28800: |
|
decay_factor = 2 ** (-age / half_life) |
|
effective_used += usage["gpu_seconds"] * decay_factor |
|
|
|
|
|
last_gpu_usage = ip_data.get("last_gpu_usage", current_time) |
|
time_since_last = current_time - last_gpu_usage |
|
recovered = time_since_last / 30 |
|
|
|
|
|
base_quota = 300 |
|
max_quota = 600 |
|
|
|
available_quota = min(max(0, base_quota - effective_used + recovered), max_quota) |
|
|
|
|
|
ip_data["effective_gpu_seconds_used"] = effective_used |
|
ip_data["available_quota"] = available_quota |
|
|
|
|
|
logging.info(f"{EMOJI_MAP['INFO']} Quota calculation for IP {client_ip}: effective_used={effective_used:.1f}, recovered={recovered:.1f}, available={available_quota:.1f}") |
|
|
|
|
|
quota_recovery_until = ip_data.get("quota_recovery_until", 0) |
|
if current_time < quota_recovery_until: |
|
|
|
if available_quota >= 60: |
|
ip_data["quota_recovery_until"] = 0 |
|
logging.info(f"{EMOJI_MAP['SUCCESS']} Quota recovery completed for IP {client_ip}. Available: {available_quota:.1f} GPU seconds") |
|
else: |
|
time_remaining = int(quota_recovery_until - current_time) |
|
return { |
|
"allowed": False, |
|
"reason": "quota_cooldown", |
|
"wait_time_seconds": time_remaining, |
|
"message": f"GPU quota exhausted. Available: {available_quota:.0f}s, Need: 60s. Recovery in {time_remaining}s ({time_remaining//60} minutes).", |
|
"quota_exceeded": True, |
|
"available_quota": available_quota, |
|
"cooldown_until": datetime.fromtimestamp(quota_recovery_until).isoformat() |
|
} |
|
elif quota_recovery_until > 0 and current_time >= quota_recovery_until: |
|
|
|
if available_quota < 60: |
|
|
|
consecutive_errors = ip_data.get("consecutive_quota_errors", 0) |
|
if consecutive_errors >= 2: |
|
|
|
new_recovery_time = 5400 |
|
ip_data["quota_recovery_until"] = current_time + new_recovery_time |
|
logging.warning(f"{EMOJI_MAP['WARNING']} Infrastructure quota appears exhausted for IP {client_ip}. Extended recovery: {new_recovery_time//60} minutes") |
|
return { |
|
"allowed": False, |
|
"reason": "infrastructure_exhausted", |
|
"wait_time_seconds": new_recovery_time, |
|
"message": f"Infrastructure quota exhausted. Extended recovery needed: {new_recovery_time//60} minutes.", |
|
"quota_exceeded": True, |
|
"infrastructure_issue": True |
|
} |
|
else: |
|
|
|
ip_data["quota_recovery_until"] = 0 |
|
logging.info(f"{EMOJI_MAP['INFO']} Quota recovery period ended for IP {client_ip}. Available: {available_quota:.1f} GPU seconds") |
|
|
|
|
|
last_request_time = ip_data.get("last_request", 0) |
|
min_interval = rate_limits.get("min_request_interval", 30) |
|
|
|
if current_time - last_request_time < min_interval: |
|
wait_time = int(min_interval - (current_time - last_request_time)) |
|
return { |
|
"allowed": False, |
|
"reason": "rate_limited", |
|
"wait_time_seconds": wait_time, |
|
"message": f"Rate limited. Please wait {wait_time} seconds between requests." |
|
} |
|
|
|
|
|
return {"allowed": True, "available_quota": available_quota} |
|
|
|
|
|
if request_id and client_ip in requests_by_ip: |
|
recent_requests = requests_by_ip.get(client_ip, {}).get("recent_request_ids", []) |
|
|
|
recent_requests = [(rid, t) for rid, t in recent_requests if current_time - t < 10] |
|
|
|
|
|
for rid, req_time in recent_requests: |
|
if rid == request_id and current_time - req_time < 5: |
|
return { |
|
"allowed": False, |
|
"reason": "duplicate_request", |
|
"wait_time_seconds": 5, |
|
"message": "Duplicate request detected. Please wait before retrying." |
|
} |
|
|
|
return {"allowed": True, "available_quota": 300} |
|
|
|
|
|
def update_rate_limiting(client_ip: str, request_id: str = None, quota_error: bool = False, gpu_seconds_used: float = None, retry_after_override: int = None): |
|
"""Update rate limiting state with per-IP quota tracking""" |
|
current_time = time.time() |
|
|
|
if not hasattr(app, "state") or not hasattr(app.state, "quota_tracker"): |
|
return |
|
|
|
rate_limits = app.state.quota_tracker.get("rate_limiting", {}) |
|
requests_by_ip = rate_limits.get("requests_by_ip", {}) |
|
|
|
|
|
if client_ip not in requests_by_ip: |
|
requests_by_ip[client_ip] = { |
|
"first_request": current_time, |
|
"last_request": current_time, |
|
"request_count": 1, |
|
"recent_request_ids": [], |
|
"usage_history": [], |
|
"last_gpu_usage": current_time, |
|
"consecutive_quota_errors": 0 |
|
} |
|
else: |
|
requests_by_ip[client_ip]["last_request"] = current_time |
|
requests_by_ip[client_ip]["request_count"] += 1 |
|
|
|
|
|
if gpu_seconds_used: |
|
|
|
usage_history = requests_by_ip[client_ip].get("usage_history", []) |
|
usage_history.append({ |
|
"timestamp": current_time, |
|
"gpu_seconds": gpu_seconds_used |
|
}) |
|
|
|
|
|
cutoff_time = current_time - 28800 |
|
usage_history = [u for u in usage_history if u["timestamp"] > cutoff_time] |
|
|
|
requests_by_ip[client_ip]["usage_history"] = usage_history |
|
requests_by_ip[client_ip]["last_gpu_usage"] = current_time |
|
|
|
|
|
if quota_error: |
|
|
|
usage_history = requests_by_ip[client_ip].get("usage_history", []) |
|
effective_used = 0 |
|
half_life = 7200 |
|
|
|
for usage in usage_history: |
|
age = current_time - usage["timestamp"] |
|
if age < 28800: |
|
decay_factor = 2 ** (-age / half_life) |
|
effective_used += usage["gpu_seconds"] * decay_factor |
|
|
|
|
|
last_gpu_usage = requests_by_ip[client_ip].get("last_gpu_usage", current_time) |
|
time_since_last = current_time - last_gpu_usage |
|
recovered = time_since_last / 30 |
|
|
|
base_quota = 300 |
|
max_quota = 600 |
|
available = min(max(0, base_quota - effective_used + recovered), max_quota) |
|
|
|
|
|
target_quota = 60 |
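
        # Quota refills at 1 GPU second per 30 real seconds, so each missing
        # second of quota costs 30s of waiting; recovery is floored at 30 minutes.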
|
|
|
if retry_after_override: |
|
recovery_time = retry_after_override |
|
elif available < target_quota: |
|
|
|
deficit = target_quota - available |
|
recovery_time = int(deficit * 30) |
|
recovery_time = max(recovery_time, 1800) |
|
else: |
|
recovery_time = 1800 |
|
|
|
quota_recovery_until = current_time + recovery_time |
|
requests_by_ip[client_ip]["quota_recovery_until"] = quota_recovery_until |
|
|
|
logging.warning(f"{EMOJI_MAP['WARNING']} GPU quota exceeded for IP {client_ip}. Recovery time: {recovery_time}s ({recovery_time//60} minutes)") |
|
logging.info(f"Effective usage: {effective_used:.1f}s, Available: {available:.1f}s, Target: {target_quota}s") |
|
logging.info(f"Recovery until: {datetime.fromtimestamp(quota_recovery_until).isoformat()}") |
|
|
|
|
|
if request_id: |
|
recent_ids = requests_by_ip[client_ip].get("recent_request_ids", []) |
|
recent_ids.append((request_id, current_time)) |
|
|
|
requests_by_ip[client_ip]["recent_request_ids"] = recent_ids[-10:] |
|
|
|
|
|
cutoff_time = current_time - 7200 |
|
requests_by_ip = { |
|
ip: data for ip, data in requests_by_ip.items() |
|
if data["last_request"] > cutoff_time |
|
} |
|
|
|
|
|
rate_limits["requests_by_ip"] = requests_by_ip |
|
app.state.quota_tracker["rate_limiting"] = rate_limits |
|
|
|
|
|
def get_client_ip(request: Request) -> str: |
|
"""Extract client IP from request""" |
|
|
|
forwarded_for = request.headers.get("x-forwarded-for") |
|
if forwarded_for: |
|
|
|
return forwarded_for.split(",")[0].strip() |
|
|
|
|
|
real_ip = request.headers.get("x-real-ip") |
|
if real_ip: |
|
return real_ip |
|
|
|
|
|
return request.client.host if request.client else "unknown" |
|
|
|
|
|
def generate_request_id(urls: Union[str, List[str]], product_type: str) -> str: |
|
"""Generate a unique request ID for deduplication""" |
|
import hashlib |
|
|
|
if isinstance(urls, str): |
|
url_list = [url.strip() for url in urls.split(",") if url.strip()] |
|
else: |
|
url_list = urls |
|
|
|
|
|
content = "|".join(sorted(url_list)) + "|" + product_type |
|
return hashlib.md5(content.encode()).hexdigest()[:12] |
|
|
|
|
|
def check_quota_availability(urls: Union[str, List[str]], product_type: str) -> Dict: |
|
"""Check if current quota is sufficient for the request""" |
|
if isinstance(urls, str): |
|
url_list = [url.strip() for url in urls.split(",") if url.strip()] |
|
else: |
|
url_list = urls |
|
|
|
num_images = len(url_list) |
|
estimated_needed = estimate_quota_needed(num_images, product_type) |
|
|
|
current_time = time.time() |
|
result = { |
|
"num_images": num_images, |
|
"estimated_gpu_seconds_needed": estimated_needed, |
|
"quota_sufficient": True, |
|
"recommended_action": "proceed", |
|
"wait_time_seconds": 0 |
|
} |
|
|
|
|
|
if hasattr(app, "state") and hasattr(app.state, "quota_tracker"): |
|
recovery_info = calculate_quota_recovery(app.state.quota_tracker, current_time) |
|
available = recovery_info["estimated_available_quota"] |
|
|
|
result.update({ |
|
"estimated_available_quota": available, |
|
"quota_recovery_info": recovery_info |
|
}) |
|
|
|
if estimated_needed > available: |
|
result.update({ |
|
"quota_sufficient": False, |
|
"recommended_action": "wait", |
|
"wait_time_seconds": int((estimated_needed - available) * 30), |
|
"shortage_gpu_seconds": estimated_needed - available |
|
}) |
|
|
|
return result |
|
|
|
|
|
def _process_images_impl(urls: Union[str, List[str]], product_type: str) -> Dict:
    """Shared processing path: normalize URLs, run the pipeline, and collect per-image results."""
    start_time = time.time()
|
|
|
if isinstance(urls, str): |
|
url_list = [url.strip() for url in urls.split(",") if url.strip()] |
|
else: |
|
url_list = urls |
|
|
|
if not url_list: |
|
raise HTTPException(status_code=HTTP_BAD_REQUEST, detail=ERROR_NO_VALID_URLS) |
|
|
|
|
|
from src.processing.bounding_box.bounding_box import build_keywords |
|
|
|
|
|
keywords = build_keywords(product_type) |
|
|
|
contexts = [ProcessingContext(url=url, product_type=product_type, keywords=keywords) for url in url_list] |
|
batch_logs = [] |
|
|
|
try: |
|
ensure_models_loaded() |
|
|
|
run_functions_in_sequence(contexts, PIPELINE_STEPS) |
|
|
|
processed_images = [] |
|
for ctx in contexts: |
|
if hasattr(ctx, 'error') and ctx.error: |
|
processed_images.append({ |
|
"url": ctx.url, |
|
"status": STATUS_ERROR, |
|
"error": str(ctx.error) |
|
}) |
|
elif hasattr(ctx, 'skip_processing') and ctx.skip_processing: |
|
|
|
error_msg = "Processing skipped" |
|
if hasattr(ctx, 'processing_error'): |
|
error_msg = str(ctx.processing_error) |
|
processed_images.append({ |
|
"url": ctx.url, |
|
"status": STATUS_ERROR, |
|
"error": error_msg |
|
}) |
|
elif hasattr(ctx, 'result_image') and ctx.result_image: |
|
processed_images.append({ |
|
"url": ctx.url, |
|
"status": STATUS_PROCESSED, |
|
"base64_image": ctx.result_image, |
|
"metadata": ctx.metadata, |
|
"processing_logs": ctx.processing_logs |
|
}) |
|
else: |
|
processed_images.append({ |
|
"url": ctx.url, |
|
"status": STATUS_NOT_PROCESSED |
|
}) |
|
|
|
total_time = time.time() - start_time |
|
|
|
return { |
|
"status": "success", |
|
"processed_images": processed_images, |
|
"total_time": total_time, |
|
"batch_logs": batch_logs, |
|
"system_info": get_system_info() |
|
} |
|
|
|
except Exception as e: |
|
logging.error(f"{EMOJI_MAP['ERROR']} Processing failed: {str(e)}") |
|
raise HTTPException(status_code=500, detail=str(e)) |
|
|
|
|
|
@spaces.GPU(duration=GPU_DURATION_LONG) |
|
def process_images_gpu(urls: Union[str, List[str]], product_type: str) -> Dict: |
|
"""GPU-accelerated image processing for Spaces""" |
|
gpu_start_time = time.time() |
|
try: |
|
|
|
if not MODELS_LOADED: |
|
logging.info(f"{EMOJI_MAP['INFO']} Loading models in GPU context...") |
|
from src.models.model_loader import load_models |
|
try: |
|
load_models() |
|
logging.info(f"{EMOJI_MAP['SUCCESS']} Models loaded in GPU context") |
|
except Exception as e: |
|
logging.error(f"{EMOJI_MAP['ERROR']} Failed to load models in GPU context: {str(e)}") |
|
|
|
|
|
|
|
logging.info(f"{EMOJI_MAP['INFO']} Moving models to GPU...") |
|
from src.models.model_loader import move_models_to_gpu |
|
try: |
|
move_models_to_gpu() |
|
logging.info(f"{EMOJI_MAP['SUCCESS']} Models moved to GPU") |
|
except Exception as e: |
|
logging.warning(f"{EMOJI_MAP['WARNING']} Failed to move some models to GPU: {str(e)}") |
|
|
|
|
|
result = _process_images_impl(urls, product_type) |
|
|
|
|
|
gpu_duration = time.time() - gpu_start_time |
|
if hasattr(app, "state") and hasattr(app.state, "quota_tracker"): |
|
app.state.quota_tracker["total_gpu_seconds_used"] += gpu_duration |
|
app.state.quota_tracker["requests"].append({ |
|
"timestamp": gpu_start_time, |
|
"duration": gpu_duration, |
|
"success": True |
|
}) |
|
|
|
app.state.quota_tracker["requests"] = app.state.quota_tracker["requests"][-100:] |
|
|
|
|
|
recovery_info = calculate_quota_recovery(app.state.quota_tracker, time.time()) |
|
available_quota = recovery_info.get("estimated_available_quota", 0) |
|
|
|
logging.info(f"{EMOJI_MAP['INFO']} GPU processing completed in {gpu_duration:.2f}s") |
|
logging.info(f"{EMOJI_MAP['INFO']} Estimated remaining quota: {available_quota:.1f}s (after using {gpu_duration:.1f}s)") |
|
|
|
|
|
result["_gpu_seconds_used"] = gpu_duration |
|
|
|
return result |
|
except Exception as e: |
|
error_msg = str(e) |
|
|
|
|
|
gpu_duration = time.time() - gpu_start_time |
|
if hasattr(app, "state") and hasattr(app.state, "quota_tracker"): |
|
app.state.quota_tracker["requests"].append({ |
|
"timestamp": gpu_start_time, |
|
"duration": gpu_duration, |
|
"success": False, |
|
"error": error_msg[:100] |
|
}) |
|
|
|
app.state.quota_tracker["requests"] = app.state.quota_tracker["requests"][-100:] |
|
|
|
if "GPU task aborted" in error_msg: |
|
logging.error(f"{EMOJI_MAP['ERROR']} GPU task was aborted - Zero GPU might be overloaded or warming up") |
|
logging.info(f"{EMOJI_MAP['INFO']} This often happens during startup - the GPU will be ready soon") |
|
raise HTTPException( |
|
status_code=503, |
|
detail="GPU resources temporarily unavailable. Zero GPU is warming up. Please try again in 30-60 seconds." |
|
) |
|
else: |
|
raise |
|
|
|
|
|
def process_images_with_rate_limiting(urls: Union[str, List[str]], product_type: str, client_ip: str, request_id: str = None) -> Dict: |
|
"""Process images with rate limiting and quota management""" |
|
|
|
|
|
if isinstance(urls, str): |
|
url_list = [url.strip() for url in urls.split(",") if url.strip()] |
|
else: |
|
url_list = urls |
|
|
|
num_images = len(url_list) |
|
|
|
|
|
rate_check = check_rate_limiting(client_ip, request_id) |
|
if not rate_check.get("allowed", True): |
|
wait_time = rate_check.get("wait_time_seconds", 30) |
|
message = rate_check.get("message", "Rate limited") |
|
|
|
|
|
error_detail = { |
|
"error": message, |
|
"retry_after": wait_time, |
|
"quota_exceeded": rate_check.get("quota_exceeded", False), |
|
"cooldown_until": rate_check.get("cooldown_until", "") |
|
} |
|
|
|
|
|
raise HTTPException( |
|
status_code=429, |
|
detail={ |
|
"error": json.dumps(error_detail) |
|
}, |
|
headers={"Retry-After": str(wait_time)} |
|
) |
|
|
|
|
|
available_quota = rate_check.get("available_quota", 300) |
|
|
|
|
|
validation = validate_image_count_for_quota(available_quota, num_images, product_type) |
|
|
|
if not validation.get("valid", True): |
|
|
|
recovery_wait = 9000 |
|
|
|
logging.warning(f"{EMOJI_MAP['WARNING']} Image count validation failed for IP {client_ip}: {validation['message']}") |
|
|
|
|
|
update_rate_limiting(client_ip, request_id, quota_error=True, retry_after_override=recovery_wait) |
|
|
|
|
|
error_detail = { |
|
"error": "Image count exceeds quota threshold - 2.5 hour recovery wait initiated", |
|
"retry_after": recovery_wait, |
|
"quota_conservation": True, |
|
"image_count_exceeded": True, |
|
"image_count": validation["image_count"], |
|
"available_quota": validation["available_quota"], |
|
"estimated_needed": validation["estimated_needed"], |
|
"threshold_percent": validation["threshold_percent"], |
|
"max_safe_images": validation["max_safe_images"], |
|
"recovery_hours": recovery_wait / 3600, |
|
"message": validation["message"] |
|
} |
|
|
|
raise HTTPException( |
|
status_code=429, |
|
detail={ |
|
"error": json.dumps(error_detail) |
|
}, |
|
headers={"Retry-After": str(recovery_wait)} |
|
) |
|
|
|
|
|
quota_decision = should_trigger_quota_recovery(available_quota, num_images, product_type) |
|
|
|
if quota_decision.get("should_wait", False): |
|
|
|
recovery_wait = quota_decision["recovery_wait_seconds"] |
|
|
|
logging.warning(f"{EMOJI_MAP['WARNING']} Triggering quota recovery for IP {client_ip}: {quota_decision['message']}") |
|
|
|
|
|
update_rate_limiting(client_ip, request_id, quota_error=True, retry_after_override=recovery_wait) |
|
|
|
|
|
error_detail = { |
|
"error": "Quota conservation triggered - 2.5 hour recovery wait initiated", |
|
"retry_after": recovery_wait, |
|
"quota_conservation": True, |
|
"available_quota": available_quota, |
|
"estimated_needed": quota_decision["estimated_needed"], |
|
"quota_threshold": quota_decision["quota_threshold"], |
|
"recovery_hours": quota_decision["recovery_wait_hours"], |
|
"message": quota_decision["message"] |
|
} |
|
|
|
raise HTTPException( |
|
status_code=429, |
|
detail={ |
|
"error": json.dumps(error_detail) |
|
}, |
|
headers={"Retry-After": str(recovery_wait)} |
|
) |
|
|
|
|
|
update_rate_limiting(client_ip, request_id) |
|
|
|
if os.getenv("SPACE_ID"): |
|
try: |
|
result = process_images_gpu(urls, product_type) |
|
|
|
|
|
gpu_seconds = result.pop("_gpu_seconds_used", 0) |
|
if gpu_seconds > 0: |
|
update_rate_limiting(client_ip, request_id, gpu_seconds_used=gpu_seconds) |
|
|
|
|
|
if hasattr(app.state, "quota_tracker"): |
|
rate_limits = app.state.quota_tracker.get("rate_limiting", {}) |
|
requests_by_ip = rate_limits.get("requests_by_ip", {}) |
|
if client_ip in requests_by_ip: |
|
requests_by_ip[client_ip]["consecutive_quota_errors"] = 0 |
|
logging.info(f"{EMOJI_MAP['SUCCESS']} Successful GPU processing - quota error counter reset for IP {client_ip}") |
|
|
|
return result |
|
except Exception as e: |
|
error_msg = str(e) |
|
error_type = type(e).__name__ |
|
|
|
|
|
if isinstance(e, gradio.exceptions.Error) or "gradio.exceptions.Error" in str(type(e)) or error_type == "Error": |
|
if "GPU limit" in error_msg or "quota exceeded" in error_msg or "ZeroGPU quota exceeded" in error_msg: |
|
logging.warning(f"{EMOJI_MAP['WARNING']} GPU quota exceeded: {error_msg}") |
|
|
|
|
|
retry_after = 1800 |
|
|
|
if hasattr(app.state, "quota_tracker"): |
|
rate_limits = app.state.quota_tracker.get("rate_limiting", {}) |
|
|
|
|
|
current_time = time.time() |
|
last_global_error = rate_limits.get("global_quota_exhausted_at", 0) |
|
time_since_last_global_error = current_time - last_global_error |
|
|
|
|
|
if time_since_last_global_error < 300: |
|
rate_limits["global_consecutive_errors"] += 1 |
|
else: |
|
rate_limits["global_consecutive_errors"] = 1 |
|
|
|
rate_limits["global_quota_exhausted_at"] = current_time |
|
|
|
|
|
rate_limits["infrastructure_quota_state"] = "exhausted" |
|
rate_limits["infrastructure_quota_reset_time"] = current_time + 5400 |
|
|
|
|
|
requests_by_ip = rate_limits.get("requests_by_ip", {}) |
|
if client_ip in requests_by_ip: |
|
ip_data = requests_by_ip[client_ip] |
|
|
|
|
|
consecutive_errors = ip_data.get("consecutive_quota_errors", 0) + 1 |
|
ip_data["consecutive_quota_errors"] = consecutive_errors |
|
|
|
|
|
if consecutive_errors == 1: |
|
retry_after = 1800 |
|
logging.info(f"{EMOJI_MAP['INFO']} First quota error - standard recovery time") |
|
elif consecutive_errors == 2: |
|
retry_after = 3600 |
|
logging.warning(f"{EMOJI_MAP['WARNING']} Second consecutive quota error - extended recovery time") |
|
elif consecutive_errors == 3: |
|
retry_after = 5400 |
|
logging.error(f"{EMOJI_MAP['ERROR']} Third consecutive quota error - maximum recovery time") |
|
else: |
|
|
|
retry_after = 9000 |
|
logging.error(f"{EMOJI_MAP['ERROR']} Multiple consecutive quota errors ({consecutive_errors}) - suggesting full recovery wait") |
|
logging.info(f"{EMOJI_MAP['INFO']} Infrastructure quota appears exhausted. Recommend waiting 2.5 hours for full recovery.") |
|
else: |
|
|
|
requests_by_ip[client_ip] = {"consecutive_quota_errors": 1} |
|
|
|
|
|
update_rate_limiting(client_ip, request_id, quota_error=True, retry_after_override=retry_after) |
|
|
|
|
|
if hasattr(app, "state"): |
|
if not hasattr(app.state, "last_quota_error"): |
|
app.state.last_quota_error = {} |
|
|
|
app.state.last_quota_error = { |
|
"timestamp": time.time(), |
|
"retry_after": retry_after, |
|
"error_message": error_msg |
|
} |
|
|
|
|
|
cooldown_until = datetime.fromtimestamp(time.time() + retry_after).isoformat() |
|
error_detail = { |
|
"error": f"GPU quota exceeded. Recovery needed: {retry_after // 60} minutes", |
|
"retry_after": retry_after, |
|
"quota_exceeded": True, |
|
"cooldown_until": cooldown_until |
|
} |
|
|
|
|
|
raise HTTPException( |
|
status_code=429, |
|
detail={ |
|
"error": json.dumps(error_detail) |
|
}, |
|
headers={"Retry-After": str(retry_after)} |
|
) |
|
|
|
|
|
raise |
|
else: |
|
return _process_images_impl(urls, product_type) |
|
|
|
|
|
def process_images(urls: Union[str, List[str]], product_type: str) -> Dict: |
|
"""Backward compatibility wrapper - should not be used directly""" |
|
|
|
|
|
return _process_images_impl(urls, product_type) |
|
|
|
|
|
|
|
|
|
|
|
@app.get("/", response_class=HTMLResponse) |
|
async def root(): |
|
return f""" |
|
<html> |
|
<head> |
|
<title>{API_TITLE}</title> |
|
</head> |
|
<body> |
|
<h1>{API_TITLE} v{API_VERSION}</h1> |
|
<p>Visit <a href="/api/docs">/api/docs</a> for API documentation</p> |
|
</body> |
|
</html> |
|
""" |
|
|
|
@app.get("/health", response_model=HealthResponse) |
|
async def health(): |
|
|
|
gpu_available = False |
|
gpu_name = None |
|
|
|
try: |
|
if torch.cuda.is_available(): |
|
gpu_available = True |
|
gpu_name = torch.cuda.get_device_name(0) |
|
except: |
|
pass |
|
|
|
system_info = get_system_info() |
|
system_info["gpu_available"] = gpu_available |
|
system_info["gpu_name"] = gpu_name |
|
system_info["space_id"] = os.getenv("SPACE_ID", None) |
|
system_info["zero_gpu"] = bool(os.getenv("SPACE_ID")) |
|
|
|
return HealthResponse( |
|
status="healthy", |
|
timestamp=time.time(), |
|
device=DEVICE, |
|
models_loaded=MODELS_LOADED, |
|
gpu_available=gpu_available, |
|
system_info=system_info |
|
) |
|
|
|
@app.post("/api/wake") |
|
async def wake_up(): |
|
"""Lightweight endpoint for waking up the space""" |
|
logging.info(f"{EMOJI_MAP['INFO']} Wake-up request received") |
|
|
|
|
|
if os.getenv("SPACE_ID"): |
|
try: |
|
|
|
init_gpu() |
|
logging.info(f"{EMOJI_MAP['SUCCESS']} GPU initialized for wake-up") |
|
except Exception as e: |
|
logging.warning(f"{EMOJI_MAP['WARNING']} GPU initialization during wake-up: {str(e)}") |
|
|
|
|
|
try: |
|
ensure_models_loaded() |
|
logging.info(f"{EMOJI_MAP['SUCCESS']} Models loaded during wake-up") |
|
except Exception as e: |
|
logging.warning(f"{EMOJI_MAP['WARNING']} Model loading during wake-up: {str(e)}") |
|
|
|
return { |
|
"status": "awake", |
|
"timestamp": time.time(), |
|
"device": DEVICE, |
|
"models_loaded": MODELS_LOADED, |
|
"message": "Service is awake and ready" |
|
} |
|
|
|
@app.get("/api/quota-info") |
|
async def quota_info(): |
|
"""Provide information about GPU quota and current recovery status""" |
|
current_time = time.time() |
|
|
|
|
|
quota_status = { |
|
"status": "info", |
|
"quota_management": "Hugging Face ZeroGPU Infrastructure", |
|
"quota_details": { |
|
"total_seconds": 300, |
|
"refill_rate": "1 GPU second per 30 real seconds", |
|
"half_life": "2 hours", |
|
"full_recovery_time": "2.5 hours (9000 seconds)" |
|
}, |
|
"recovery_suggestions": { |
|
"light_usage": { |
|
"gpu_seconds": 30, |
|
"wait_minutes": 15, |
|
"suitable_for": "1-2 images" |
|
}, |
|
"moderate_usage": { |
|
"gpu_seconds": 60, |
|
"wait_minutes": 30, |
|
"suitable_for": "2-4 images" |
|
}, |
|
"heavy_usage": { |
|
"gpu_seconds": 120, |
|
"wait_minutes": 60, |
|
"suitable_for": "4-8 images" |
|
}, |
|
"full_quota": { |
|
"gpu_seconds": 300, |
|
"wait_minutes": 150, |
|
"suitable_for": "10+ images" |
|
} |
|
}, |
|
"user_type_quotas": { |
|
"anonymous": { |
|
"quota_seconds": 180, |
|
"description": "Not logged in" |
|
}, |
|
"authenticated": { |
|
"quota_seconds": 300, |
|
"description": "Logged in users" |
|
}, |
|
"pro": { |
|
"quota_seconds": 1500, |
|
"description": "PRO subscribers (5x quota)" |
|
} |
|
}, |
|
"timestamp": current_time |
|
} |
|
|
|
|
|
if hasattr(app.state, "quota_tracker"): |
|
recovery_info = calculate_quota_recovery(app.state.quota_tracker, current_time) |
|
quota_status["current_quota_estimate"] = recovery_info |
|
|
|
|
|
app.state.quota_tracker["quota_recovery"]["last_recovery_check"] = current_time |
|
|
|
|
|
if hasattr(app.state, "last_quota_error"): |
|
last_error = app.state.last_quota_error |
|
time_since_error = current_time - last_error.get("timestamp", 0) |
|
|
|
quota_status["last_quota_error"] = { |
|
"time_ago_seconds": int(time_since_error), |
|
"retry_after": last_error.get("retry_after", 900), |
|
"estimated_recovery": max(0, last_error.get("retry_after", 900) - time_since_error) |
|
} |
|
|
|
|
|
if hasattr(app.state, "quota_tracker"): |
|
tracker = app.state.quota_tracker |
|
recent_requests = tracker.get("requests", []) |
|
|
|
|
|
if recent_requests: |
|
last_hour_requests = [r for r in recent_requests if current_time - r["timestamp"] < 3600] |
|
successful_requests = [r for r in last_hour_requests if r.get("success", False)] |
|
failed_requests = [r for r in last_hour_requests if not r.get("success", False)] |
|
|
|
quota_status["usage_stats"] = { |
|
"last_hour": { |
|
"total_requests": len(last_hour_requests), |
|
"successful": len(successful_requests), |
|
"failed": len(failed_requests), |
|
"total_gpu_seconds": sum(r.get("duration", 0) for r in successful_requests), |
|
"average_duration": sum(r.get("duration", 0) for r in successful_requests) / len(successful_requests) if successful_requests else 0 |
|
}, |
|
"total_gpu_seconds_used": tracker.get("total_gpu_seconds_used", 0) |
|
} |
|
|
|
|
|
recent_errors = [r for r in failed_requests if "quota" in r.get("error", "").lower()][-3:] |
|
if recent_errors: |
|
quota_status["recent_quota_errors"] = recent_errors |
|
|
|
return quota_status |
|
|
|
@app.post("/api/quota-check") |
|
async def quota_check(request: ImageRequest): |
|
"""Check if current quota is sufficient for the request""" |
|
try: |
|
availability = check_quota_availability(request.urls, request.product_type) |
|
return { |
|
"status": "success", |
|
"quota_check": availability, |
|
"timestamp": time.time() |
|
} |
|
except Exception as e: |
|
return JSONResponse( |
|
status_code=400, |
|
content={ |
|
"status": "error", |
|
"error": str(e), |
|
"timestamp": time.time() |
|
} |
|
) |
|
|
|
@app.post("/api/predict", response_model=ProcessingResponse) |
|
async def predict(request: ImageRequest, http_request: Request): |
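
    # Hugging Face Spaces forwards an X-IP-Token header that ZeroGPU uses to
    # attribute quota to the end user; it is only logged here for diagnostics.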
|
|
|
x_ip_token = http_request.headers.get("x-ip-token") |
|
if x_ip_token: |
|
logging.info(f"{EMOJI_MAP['INFO']} Request received with X-IP-Token for quota tracking") |
|
|
|
|
|
client_ip = get_client_ip(http_request) |
|
request_id = generate_request_id(request.urls, request.product_type) |
|
|
|
|
|
logging.info(f"{EMOJI_MAP['INFO']} Processing request from {client_ip}, request_id: {request_id}") |
|
|
|
result = process_images_with_rate_limiting(request.urls, request.product_type, client_ip, request_id) |
|
|
|
return ProcessingResponse( |
|
status=result["status"], |
|
results=[ |
|
ProcessedImage( |
|
image_url=img["url"], |
|
status=img["status"], |
|
base64=img.get("base64_image", ""), |
|
format="png", |
|
type="processed", |
|
metadata=img.get("metadata", {}), |
|
error=img.get("error") |
|
) |
|
for img in result["processed_images"] |
|
], |
|
processed_count=len([img for img in result["processed_images"] if img["status"] == STATUS_PROCESSED]), |
|
total_time=result["total_time"], |
|
system_info=result["system_info"] |
|
) |
|
|
|
@app.post("/api/rb_and_crop") |
|
async def shopify_webhook(webhook: ShopifyWebhook, request: Request):
    """Webhook endpoint; expects data = [[{"url": ...}, ...], product_type]."""
    client_ip = get_client_ip(request)
|
|
|
if not webhook.data or len(webhook.data) < 2: |
|
raise HTTPException(status_code=HTTP_BAD_REQUEST, detail="Invalid webhook data") |
|
|
|
images_info = webhook.data[0] |
|
product_type = webhook.data[1] if len(webhook.data) > 1 else "General" |
|
|
|
if not isinstance(images_info, list): |
|
raise HTTPException(status_code=HTTP_BAD_REQUEST, detail="Invalid images data") |
|
|
|
urls = [] |
|
for img_dict in images_info: |
|
if isinstance(img_dict, dict) and "url" in img_dict: |
|
urls.append(img_dict["url"]) |
|
|
|
if not urls: |
|
raise HTTPException(status_code=HTTP_BAD_REQUEST, detail=ERROR_NO_VALID_URLS) |
|
|
|
|
|
num_images = len(urls) |
|
rate_check = check_rate_limiting(client_ip) |
|
|
|
if not rate_check.get("allowed", True): |
|
wait_time = rate_check.get("wait_time_seconds", 9000) |
|
message = rate_check.get("message", "Quota exceeded") |
|
|
|
error_detail = { |
|
"error": message, |
|
"retry_after": wait_time, |
|
"quota_exceeded": True, |
|
"recovery_hours": wait_time / 3600 |
|
} |
|
|
|
raise HTTPException( |
|
status_code=429, |
|
detail={"error": json.dumps(error_detail)}, |
|
headers={"Retry-After": str(wait_time)} |
|
) |
|
|
|
|
|
available_quota = rate_check.get("available_quota", 0) |
|
validation = validate_image_count_for_quota(available_quota, num_images, product_type) |
|
|
|
if not validation.get("valid", False): |
|
|
|
estimated_needed = validation.get("estimated_needed", 0) |
|
|
|
if estimated_needed <= 60: |
|
recovery_wait = 1800 |
|
elif estimated_needed <= 120: |
|
recovery_wait = 3600 |
|
else: |
|
recovery_wait = 7200 |
|
|
|
logging.warning(f"{EMOJI_MAP['WARNING']} Smart quota gate: {num_images} images need {estimated_needed:.1f}s, available: {available_quota:.1f}s") |
|
|
|
update_rate_limiting(client_ip, quota_error=True, retry_after_override=recovery_wait) |
|
|
|
error_detail = { |
|
"error": "Smart quota management: insufficient quota for safe processing", |
|
"retry_after": recovery_wait, |
|
"quota_exceeded": True, |
|
"image_count_exceeded": True, |
|
"recovery_hours": recovery_wait / 3600, |
|
"image_count": num_images, |
|
"available_quota": available_quota, |
|
"estimated_needed": estimated_needed, |
|
"max_safe_images": validation.get("max_safe_images", 0), |
|
"efficiency": validation.get("efficiency", "N/A"), |
|
"message": validation.get("message", "Quota insufficient") |
|
} |
|
|
|
raise HTTPException( |
|
status_code=429, |
|
detail={"error": json.dumps(error_detail)}, |
|
headers={"Retry-After": str(recovery_wait)} |
|
) |
|
|
|
|
|
request_id = generate_request_id(urls, product_type) |
|
|
|
|
|
x_ip_token = request.headers.get("x-ip-token") |
|
if x_ip_token: |
|
logging.info(f"{EMOJI_MAP['INFO']} Request received with X-IP-Token for quota tracking") |
|
|
|
|
|
logging.info(f"{EMOJI_MAP['INFO']} Shopify webhook from {client_ip}, request_id: {request_id}") |
|
|
|
|
|
if len(urls) == 1 and product_type == "Test" and "placeholder.com" in urls[0]: |
|
logging.info(f"{EMOJI_MAP['INFO']} Wake-up request detected, returning minimal response") |
|
return { |
|
"status": STATUS_SUCCESS, |
|
"processed_images": [{ |
|
"url": urls[0], |
|
"status": STATUS_PROCESSED, |
|
"base64_image": "iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAADUlEQVR42mNkYPhfDwAChwGA60e6kgAAAABJRU5ErkJggg==", |
|
"color": "#ffffff", |
|
"image_type": "wake_up", |
|
"artifacts": "false" |
|
}] |
|
} |
|
|
|
result = process_images_with_rate_limiting(urls, product_type, client_ip, request_id) |
|
|
|
return { |
|
"status": result["status"], |
|
"processed_images": [ |
|
{ |
|
"url": img["url"], |
|
"status": img["status"], |
|
"base64_image": img.get("base64_image", ""), |
|
"hex_color": img.get("metadata", {}).get("hex_color"), |
|
"image_type": img.get("metadata", {}).get("final_image_type"), |
|
"artifacts": img.get("metadata", {}).get("artifacts") |
|
} |
|
for img in result["processed_images"] |
|
] |
|
} |
|
|
|
@app.post("/api/batch") |
|
async def batch_process(requests: List[ImageRequest], http_request: Request): |
|
|
|
client_ip = get_client_ip(http_request) |
|
|
|
|
|
logging.info(f"{EMOJI_MAP['INFO']} Batch request from {client_ip} with {len(requests)} items") |
|
|
|
results = [] |
|
|
|
for i, req in enumerate(requests): |
|
try: |
|
|
|
request_id = f"batch_{generate_request_id(req.urls, req.product_type)}_{i}" |
|
|
|
result = process_images_with_rate_limiting(req.urls, req.product_type, client_ip, request_id) |
|
results.append(result) |
|
except HTTPException as e: |
|
|
|
if e.status_code == 429: |
|
results.append({ |
|
"status": "rate_limited", |
|
"error": e.detail, |
|
"urls": req.urls, |
|
"retry_after": e.headers.get("Retry-After", "30") |
|
}) |
|
|
|
logging.warning(f"{EMOJI_MAP['WARNING']} Batch processing stopped due to rate limiting") |
|
break |
|
else: |
|
results.append({ |
|
"status": "error", |
|
"error": str(e.detail), |
|
"urls": req.urls |
|
}) |
|
except Exception as e: |
|
results.append({ |
|
"status": "error", |
|
"error": str(e), |
|
"urls": req.urls |
|
}) |
|
|
|
return { |
|
"status": "success", |
|
"batch_results": results, |
|
"total_requests": len(requests), |
|
"processed_requests": len(results) |
|
} |
|
|
|
|
|
|
|
|
|
@app.exception_handler(HTTPException) |
|
async def http_exception_handler(request: Request, exc: HTTPException): |
|
return JSONResponse( |
|
status_code=exc.status_code, |
|
content={ |
|
"status": "error", |
|
"error": exc.detail, |
|
"timestamp": time.time() |
|
} |
|
) |
|
|
|
@app.exception_handler(Exception) |
|
async def general_exception_handler(request: Request, exc: Exception): |
|
|
|
if isinstance(exc, HTTPException) and exc.status_code == 429: |
|
return JSONResponse( |
|
status_code=429, |
|
content=exc.detail if isinstance(exc.detail, dict) else {"error": exc.detail}, |
|
headers=exc.headers if hasattr(exc, 'headers') else {"Retry-After": "900"} |
|
) |
|
|
|
|
|
error_type = "UNKNOWN_ERROR" |
|
error_message = str(exc) |
|
error_details = {} |
|
status_code = 500 |
|
|
|
|
|
if (isinstance(exc, gradio.exceptions.Error) and ("GPU limit" in error_message or "quota exceeded" in error_message)) or \ |
|
("GPU" in error_message and ("limit" in error_message or "quota" in error_message)) or \ |
|
"ZeroGPU quota exceeded" in error_message: |
|
|
|
logging.warning(f"{EMOJI_MAP['WARNING']} GPU quota exceeded: Space app has reached its GPU limit") |
|
|
|
error_type = "GPU_LIMIT_ERROR" |
|
error_details["gpu_error"] = True |
|
|
|
|
|
retry_after = calculate_retry_after_from_quota(error_message) |
|
error_details["retry_after"] = retry_after |
|
|
|
|
|
parsed_retry = parse_retry_time_from_error(error_message) |
|
if parsed_retry: |
|
error_details["retry_after"] = parsed_retry |
|
logging.info(f"{EMOJI_MAP['INFO']} Parsed retry time from error: {parsed_retry}s") |
|
|
|
|
|
error_details["quota_info"] = { |
|
"message": "GPU quota exceeded. ZeroGPU quota refills at 1 GPU second per 30 real seconds.", |
|
"recommended_wait_times": { |
|
"minimal": 900, |
|
"moderate": 1800, |
|
"full": 5400 |
|
}, |
|
"note": "Quota is managed by Hugging Face infrastructure, not this application.", |
|
"calculated_retry": error_details["retry_after"] |
|
} |
|
|
|
|
|
status_code = 429 |
|
elif "GPU task aborted" in error_message: |
|
logging.error(f"{EMOJI_MAP['ERROR']} GPU task aborted") |
|
error_type = "GPU_TASK_ABORTED" |
|
error_details["gpu_error"] = True |
|
elif "gradio.exceptions.Error" in str(type(exc)): |
|
error_type = "GRADIO_ERROR" |
|
error_details["gradio_error"] = True |
|
|
|
if "GPU limit" in error_message or "GPU quota" in error_message: |
|
logging.warning(f"{EMOJI_MAP['WARNING']} GPU quota exceeded: Space app has reached its GPU limit") |
|
error_type = "GPU_LIMIT_ERROR" |
|
error_details["gpu_error"] = True |
|
|
|
|
|
retry_after = calculate_retry_after_from_quota(error_message) |
|
error_details["retry_after"] = retry_after |
|
|
|
|
|
parsed_retry = parse_retry_time_from_error(error_message) |
|
if parsed_retry: |
|
error_details["retry_after"] = parsed_retry |
|
logging.info(f"{EMOJI_MAP['INFO']} Parsed retry time from Gradio error: {parsed_retry}s") |
|
|
|
|
|
status_code = 429 |
|
else: |
|
logging.error(f"{EMOJI_MAP['ERROR']} Unhandled exception: {str(exc)}") |
|
logging.error(traceback.format_exc()) |
|
elif isinstance(exc, ValueError): |
|
error_type = "VALIDATION_ERROR" |
|
logging.error(f"{EMOJI_MAP['ERROR']} Unhandled exception: {str(exc)}") |
|
logging.error(traceback.format_exc()) |
|
elif isinstance(exc, TimeoutError): |
|
error_type = "TIMEOUT_ERROR" |
|
logging.error(f"{EMOJI_MAP['ERROR']} Unhandled exception: {str(exc)}") |
|
logging.error(traceback.format_exc()) |
|
else: |
|
|
|
logging.error(f"{EMOJI_MAP['ERROR']} Unhandled exception: {str(exc)}") |
|
logging.error(traceback.format_exc()) |
|
|
|
|
|
error_response = { |
|
"status": "error", |
|
"error_type": error_type, |
|
"error_message": error_message, |
|
"error_details": error_details, |
|
"timestamp": time.time(), |
|
"request_path": str(request.url.path), |
|
"request_method": request.method |
|
} |
|
|
|
|
|
if error_type not in ["GPU_LIMIT_ERROR", "GPU_TASK_ABORTED"] and not ("GPU limit" in error_message) and not ("ZeroGPU quota exceeded" in error_message): |
|
tb_lines = traceback.format_exception(type(exc), exc, exc.__traceback__) |
|
error_response["traceback"] = ''.join(tb_lines) |
|
|
|
|
|
if error_type == "GPU_LIMIT_ERROR": |
|
logging.warning(f"{EMOJI_MAP['WARNING']} GPU quota limit response sent to client with retry_after: {error_details.get('retry_after', 900)}s") |
|
|
|
|
|
if not hasattr(app.state, "last_quota_error"): |
|
app.state.last_quota_error = {} |
|
|
|
app.state.last_quota_error = { |
|
"timestamp": time.time(), |
|
"retry_after": error_details.get("retry_after", 900), |
|
"error_message": error_message |
|
} |
|
else: |
|
|
|
logging.error(f"{EMOJI_MAP['ERROR']} Error response: {json.dumps(error_response, indent=2)}") |
|
|
|
|
|
headers = {} |
|
if error_type == "GPU_LIMIT_ERROR" and "retry_after" in error_details: |
|
headers["Retry-After"] = str(error_details["retry_after"]) |
|
|
|
return JSONResponse( |
|
status_code=status_code, |
|
content=error_response, |
|
headers=headers |
|
) |
|
|
|
|
|
|
|
|
|
if __name__ == "__main__": |
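
    # Align uvicorn's log format with the application logger and stop uvicorn's
    # loggers from double-reporting through the root logger.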
|
|
|
log_config = uvicorn.config.LOGGING_CONFIG |
|
log_config["formatters"]["default"]["fmt"] = "%(asctime)s [%(levelname)s] %(name)s: %(message)s" |
|
log_config["formatters"]["access"]["fmt"] = '%(asctime)s [%(levelname)s] %(name)s: %(client_addr)s - "%(request_line)s" %(status_code)s' |
|
|
|
|
|
log_config["loggers"]["uvicorn"]["propagate"] = False |
|
log_config["loggers"]["uvicorn.access"]["propagate"] = False |
|
|
|
uvicorn.run( |
|
app, |
|
host=API_HOST, |
|
port=API_PORT, |
|
log_level="info", |
|
log_config=log_config |
|
) |
|
|