import os import json import time import uuid import sys from typing import List, Dict, Optional, Union, Generator, Any # --- Core Dependencies --- import uvicorn from fastapi import FastAPI, HTTPException, Request from fastapi.responses import JSONResponse, StreamingResponse from pydantic import BaseModel, Field from curl_cffi.requests import Session from curl_cffi import CurlError # --- Environment Configuration --- QODO_API_KEY = os.getenv("QODO_API_KEY") # No default key to encourage setting it explicitly QODO_URL = os.getenv("QODO_URL", "https://*") QODO_INFO_URL = os.getenv("QODO_INFO_URL", "*") # --- Recreated/Mocked webscout & OpenAI Dependencies --- # This section recreates the necessary classes and functions # to make the QodoAI provider self-contained. class exceptions: class FailedToGenerateResponseError(Exception): pass def sanitize_stream(data: Generator[bytes, None, None], content_extractor: callable, **kwargs: Any) -> Generator[str, None, None]: buffer = "" for byte_chunk in data: buffer += byte_chunk.decode('utf-8', errors='ignore') obj_start_indices = [i for i, char in enumerate(buffer) if char == '{'] if not obj_start_indices: continue start_index = 0 for obj_start in obj_start_indices: if obj_start < start_index: continue brace_count = 0 obj_end = -1 for i in range(obj_start, len(buffer)): if buffer[i] == '{': brace_count += 1 elif buffer[i] == '}': brace_count -= 1 if brace_count == 0: obj_end = i break if obj_end != -1: json_str = buffer[obj_start:obj_end + 1] try: json_obj = json.loads(json_str) content = content_extractor(json_obj) if content: yield content start_index = obj_end + 1 except json.JSONDecodeError: continue buffer = buffer[start_index:] # --- OpenAI-Compatible Pydantic Models --- # Request Models class ChatMessage(BaseModel): role: str content: str name: Optional[str] = None tool_calls: Optional[List[Dict]] = None tool_call_id: Optional[str] = None class Function(BaseModel): name: str description: Optional[str] = None parameters: Dict[str, Any] class Tool(BaseModel): type: str = "function" function: Function class ChatCompletionRequest(BaseModel): model: str messages: List[ChatMessage] max_tokens: Optional[int] = 2049 stream: bool = False temperature: Optional[float] = 1.0 top_p: Optional[float] = 1.0 tools: Optional[List[Tool]] = None tool_choice: Optional[Union[str, Dict]] = None # Response Models class ChatCompletionMessage(BaseModel): role: str content: Optional[str] = None tool_calls: Optional[List[Dict]] = None class ChoiceDelta(BaseModel): content: Optional[str] = None role: Optional[str] = None class Choice(BaseModel): index: int message: ChatCompletionMessage finish_reason: Optional[str] = "stop" class ChoiceStreaming(BaseModel): index: int delta: ChoiceDelta finish_reason: Optional[str] = None class CompletionUsage(BaseModel): prompt_tokens: int completion_tokens: int total_tokens: int class ChatCompletion(BaseModel): id: str choices: List[Choice] created: int model: str object: str = "chat.completion" usage: CompletionUsage class ChatCompletionChunk(BaseModel): id: str choices: List[ChoiceStreaming] created: int model: str object: str = "chat.completion.chunk" usage: Optional[CompletionUsage] = None # --- Base Provider Structure --- class BaseCompletions: def __init__(self, client: Any): self._client = client class BaseChat: def __init__(self, client: Any): self.completions = Completions(client) class OpenAICompatibleProvider: def __init__(self, **kwargs: Any): pass # --- QodoAI Provider Logic --- class Completions(BaseCompletions): def create( self, *, model: str, messages: List[Dict[str, Any]], stream: bool = False, **kwargs: Any ) -> Union[ChatCompletion, Generator[ChatCompletionChunk, None, None]]: # Warn about unsupported parameters unsupported_params = ['temperature', 'top_p', 'tools', 'tool_choice', 'max_tokens'] for param in unsupported_params: if param in kwargs: print(f"Warning: Parameter '{param}' is not supported by the QodoAI provider and will be ignored.", file=sys.stderr) user_prompt = "" for message in reversed(messages): if message.get("role") == "user": user_prompt = message.get("content", "") break if not user_prompt: raise ValueError("No user message with 'role': 'user' found in messages.") payload = self._client._build_payload(user_prompt, model) payload["stream"] = stream request_id = f"chatcmpl-{uuid.uuid4()}" created_time = int(time.time()) if stream: return self._create_stream(request_id, created_time, model, payload, user_prompt) else: return self._create_non_stream(request_id, created_time, model, payload, user_prompt) def _create_stream(self, request_id, created_time, model, payload, user_prompt) -> Generator[ChatCompletionChunk, None, None]: try: with self._client.session.post(self._client.url, json=payload, stream=True, timeout=self._client.timeout, impersonate="chrome110") as response: if response.status_code == 401: raise exceptions.FailedToGenerateResponseError("Invalid Qodo API key provided.") response.raise_for_status() for content_chunk in sanitize_stream(response.iter_content(chunk_size=8192), QodoAI._qodo_extractor): if content_chunk: delta = ChoiceDelta(content=content_chunk, role="assistant") choice = ChoiceStreaming(index=0, delta=delta, finish_reason=None) yield ChatCompletionChunk(id=request_id, choices=[choice], created=created_time, model=model) final_delta = ChoiceDelta() final_choice = ChoiceStreaming(index=0, delta=final_delta, finish_reason="stop") yield ChatCompletionChunk(id=request_id, choices=[final_choice], created=created_time, model=model) except Exception as e: raise exceptions.FailedToGenerateResponseError(f"Stream generation failed: {e}") def _create_non_stream(self, request_id, created_time, model, payload, user_prompt) -> ChatCompletion: try: payload["stream"] = False response = self._client.session.post(self._client.url, json=payload, timeout=self._client.timeout, impersonate="chrome110") if response.status_code == 401: raise exceptions.FailedToGenerateResponseError("Invalid Qodo API key provided.") response.raise_for_status() full_response = "".join(list(sanitize_stream(iter([response.content]), QodoAI._qodo_extractor))) prompt_tokens = len(user_prompt.split()) completion_tokens = len(full_response.split()) message = ChatCompletionMessage(role="assistant", content=full_response) choice = Choice(index=0, message=message, finish_reason="stop") usage = CompletionUsage(prompt_tokens=prompt_tokens, completion_tokens=completion_tokens, total_tokens=prompt_tokens + completion_tokens) return ChatCompletion(id=request_id, choices=[choice], created=created_time, model=model, usage=usage) except Exception as e: raise exceptions.FailedToGenerateResponseError(f"Non-stream generation failed: {e}") class Chat(BaseChat): def __init__(self, client: 'QodoAI'): self.completions = Completions(client) class QodoAI(OpenAICompatibleProvider): AVAILABLE_MODELS = ["gpt-4.1", "gpt-4o", "o3", "o4-mini", "claude-4-sonnet", "gemini-2.5-pro"] def __init__(self, api_key: str, **kwargs: Any): super().__init__(api_key=api_key, **kwargs) self.url, self.info_url, self.timeout, self.api_key = QODO_URL, QODO_INFO_URL, 600, api_key self.user_agent = "axios/1.10.0" self.session_id = self._get_session_id() self.headers = { "Authorization": f"Bearer {self.api_key}", "Content-Type": "application/json", "User-Agent": self.user_agent, "Session-id": self.session_id } self.session = Session(headers=self.headers) self.chat = Chat(self) @staticmethod def _qodo_extractor(chunk: Union[str, Dict[str, Any]]) -> Optional[str]: if isinstance(chunk, dict): data = chunk.get("data", {}) if isinstance(data, dict): content = data.get("content") or (data.get("tool_args", {}) or {}).get("content") if content: return content return None def _get_session_id(self) -> str: try: response = Session(headers={"Authorization": f"Bearer {self.api_key}"}).get(self.info_url, timeout=self.timeout) if response.status_code == 200: return response.json().get("session-id", f"fallback-{uuid.uuid4()}") elif response.status_code == 401: raise exceptions.FailedToGenerateResponseError("Invalid Qodo API key. Please check your QODO_API_KEY environment variable.") else: raise exceptions.FailedToGenerateResponseError(f"Failed to get session_id from Qodo: HTTP {response.status_code}") except Exception as e: raise exceptions.FailedToGenerateResponseError(f"Failed to connect to Qodo API to get session_id: {e}") def _build_payload(self, prompt: str, model: str) -> Dict[str, Any]: return {"agent_type": "cli", "session_id": self.session_id, "user_request": prompt, "custom_model": model, "stream": True} # --- FastAPI Application --- app = FastAPI( title="QodoAI OpenAI-Compatible API", description="Provides an OpenAI-compatible interface for the QodoAI service.", version="1.0.0" ) client: Optional[QodoAI] = None @app.on_event("startup") def startup_event(): global client if not QODO_API_KEY: raise RuntimeError("QODO_API_KEY environment variable not set. The server cannot start without an API key.") try: client = QodoAI(api_key=QODO_API_KEY) print("QodoAI client initialized successfully.") except exceptions.FailedToGenerateResponseError as e: raise RuntimeError(f"FATAL: Could not initialize QodoAI client: {e}") @app.get("/v1/models", response_model_exclude_none=True) async def list_models(): """Lists the available models from the QodoAI provider.""" models_data = [ {"id": model_id, "object": "model", "created": int(time.time()), "owned_by": "qodoai"} for model_id in QodoAI.AVAILABLE_MODELS ] return {"object": "list", "data": models_data} @app.post("/v1/chat/completions") async def create_chat_completion(request: ChatCompletionRequest): """Creates a chat completion, supporting both streaming and non-streaming modes.""" if client is None: raise HTTPException(status_code=503, detail="QodoAI client is not available or failed to initialize.") params = request.model_dump(exclude_none=True) try: if request.stream: async def stream_generator(): try: generator = client.chat.completions.create(**params) for chunk in generator: yield f"data: {chunk.model_dump_json()}\n\n" except exceptions.FailedToGenerateResponseError as e: error_payload = {"error": {"message": str(e), "type": "api_error", "code": 500}} yield f"data: {json.dumps(error_payload)}\n\n" finally: yield "data: [DONE]\n\n" return StreamingResponse(stream_generator(), media_type="text/event-stream") else: response = client.chat.completions.create(**params) return JSONResponse(content=response.model_dump()) except exceptions.FailedToGenerateResponseError as e: raise HTTPException(status_code=500, detail=str(e)) except ValueError as e: raise HTTPException(status_code=400, detail=str(e)) if __name__ == "__main__": if not QODO_API_KEY: print("Error: The QODO_API_KEY environment variable must be set.", file=sys.stderr) sys.exit(1) uvicorn.run(app, host="0.0.0.0", port=8000)