import os
import json
import time
import uuid
import sys
from typing import Any, Callable, Dict, Generator, List, Optional, Union

import uvicorn
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import JSONResponse, StreamingResponse
from pydantic import BaseModel, Field
from curl_cffi.requests import Session
from curl_cffi import CurlError

QODO_API_KEY = os.getenv("QODO_API_KEY")
QODO_URL = os.getenv("QODO_URL", "https://*")
QODO_INFO_URL = os.getenv("QODO_INFO_URL", "*")


class exceptions:
    class FailedToGenerateResponseError(Exception):
        pass


def sanitize_stream(
    data: Generator[bytes, None, None],
    content_extractor: Callable[[Dict[str, Any]], Optional[str]],
    **kwargs: Any,
) -> Generator[str, None, None]:
    """Incrementally parse complete JSON objects out of a byte stream and
    yield whatever `content_extractor` pulls from each one.

    Object boundaries are found by tracking brace depth, which assumes the
    payloads contain no unbalanced braces inside string values.
    """
    buffer = ""
    for byte_chunk in data:
        buffer += byte_chunk.decode('utf-8', errors='ignore')
        obj_start_indices = [i for i, char in enumerate(buffer) if char == '{']
        if not obj_start_indices:
            continue

        start_index = 0
        for obj_start in obj_start_indices:
            if obj_start < start_index:
                continue

            # Walk forward from the opening brace, tracking nesting depth.
            brace_count = 0
            obj_end = -1
            for i in range(obj_start, len(buffer)):
                if buffer[i] == '{':
                    brace_count += 1
                elif buffer[i] == '}':
                    brace_count -= 1
                    if brace_count == 0:
                        obj_end = i
                        break

            if obj_end != -1:
                json_str = buffer[obj_start:obj_end + 1]
                try:
                    json_obj = json.loads(json_str)
                    content = content_extractor(json_obj)
                    if content:
                        yield content
                    start_index = obj_end + 1
                except json.JSONDecodeError:
                    continue
        # Keep any incomplete trailing object in the buffer for the next chunk.
        buffer = buffer[start_index:]
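
# Illustrative behavior (not executed): two byte chunks that split one JSON
# object yield a single extracted string once the object completes, e.g.
#   chunks = iter([b'{"data": {"content": "he', b'llo"}}'])
#   list(sanitize_stream(chunks, QodoAI._qodo_extractor))  # -> ['hello']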


class ChatMessage(BaseModel):
    role: str
    content: str
    name: Optional[str] = None
    tool_calls: Optional[List[Dict]] = None
    tool_call_id: Optional[str] = None


class Function(BaseModel):
    name: str
    description: Optional[str] = None
    parameters: Dict[str, Any]


class Tool(BaseModel):
    type: str = "function"
    function: Function


class ChatCompletionRequest(BaseModel):
    model: str
    messages: List[ChatMessage]
    max_tokens: Optional[int] = 2049
    stream: bool = False
    temperature: Optional[float] = 1.0
    top_p: Optional[float] = 1.0
    tools: Optional[List[Tool]] = None
    tool_choice: Optional[Union[str, Dict]] = None


class ChatCompletionMessage(BaseModel):
    role: str
    content: Optional[str] = None
    tool_calls: Optional[List[Dict]] = None


class ChoiceDelta(BaseModel):
    content: Optional[str] = None
    role: Optional[str] = None


class Choice(BaseModel):
    index: int
    message: ChatCompletionMessage
    finish_reason: Optional[str] = "stop"


class ChoiceStreaming(BaseModel):
    index: int
    delta: ChoiceDelta
    finish_reason: Optional[str] = None


class CompletionUsage(BaseModel):
    prompt_tokens: int
    completion_tokens: int
    total_tokens: int


class ChatCompletion(BaseModel):
    id: str
    choices: List[Choice]
    created: int
    model: str
    object: str = "chat.completion"
    usage: CompletionUsage


class ChatCompletionChunk(BaseModel):
    id: str
    choices: List[ChoiceStreaming]
    created: int
    model: str
    object: str = "chat.completion.chunk"
    usage: Optional[CompletionUsage] = None
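
# The models above mirror the request/response shapes of OpenAI's
# /v1/chat/completions endpoint, so standard OpenAI clients can talk to this
# server unchanged.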


class BaseCompletions:
    def __init__(self, client: Any):
        self._client = client


class BaseChat:
    def __init__(self, client: Any):
        self.completions = Completions(client)


class OpenAICompatibleProvider:
    def __init__(self, **kwargs: Any):
        pass


class Completions(BaseCompletions):
    def create(
        self,
        *,
        model: str,
        messages: List[Dict[str, Any]],
        stream: bool = False,
        **kwargs: Any
    ) -> Union[ChatCompletion, Generator[ChatCompletionChunk, None, None]]:
        # The upstream endpoint accepts only a single prompt string, so the
        # standard OpenAI sampling/tool parameters cannot be forwarded.
        unsupported_params = ['temperature', 'top_p', 'tools', 'tool_choice', 'max_tokens']
        for param in unsupported_params:
            if param in kwargs:
                print(f"Warning: Parameter '{param}' is not supported by the QodoAI provider and will be ignored.", file=sys.stderr)

        # Use the most recent user message as the prompt.
        user_prompt = ""
        for message in reversed(messages):
            if message.get("role") == "user":
                user_prompt = message.get("content", "")
                break
        if not user_prompt:
            raise ValueError("No message with 'role': 'user' found in messages.")

        payload = self._client._build_payload(user_prompt, model)
        payload["stream"] = stream

        request_id = f"chatcmpl-{uuid.uuid4()}"
        created_time = int(time.time())

        if stream:
            return self._create_stream(request_id, created_time, model, payload, user_prompt)
        return self._create_non_stream(request_id, created_time, model, payload, user_prompt)

    def _create_stream(self, request_id, created_time, model, payload, user_prompt) -> Generator[ChatCompletionChunk, None, None]:
        try:
            with self._client.session.post(self._client.url, json=payload, stream=True, timeout=self._client.timeout, impersonate="chrome110") as response:
                if response.status_code == 401:
                    raise exceptions.FailedToGenerateResponseError("Invalid Qodo API key provided.")
                response.raise_for_status()

                for content_chunk in sanitize_stream(response.iter_content(chunk_size=8192), QodoAI._qodo_extractor):
                    if content_chunk:
                        delta = ChoiceDelta(content=content_chunk, role="assistant")
                        choice = ChoiceStreaming(index=0, delta=delta, finish_reason=None)
                        yield ChatCompletionChunk(id=request_id, choices=[choice], created=created_time, model=model)

                # Close the stream with an empty delta carrying finish_reason,
                # as the OpenAI streaming protocol expects.
                final_delta = ChoiceDelta()
                final_choice = ChoiceStreaming(index=0, delta=final_delta, finish_reason="stop")
                yield ChatCompletionChunk(id=request_id, choices=[final_choice], created=created_time, model=model)
        except exceptions.FailedToGenerateResponseError:
            raise
        except Exception as e:
            raise exceptions.FailedToGenerateResponseError(f"Stream generation failed: {e}")

    def _create_non_stream(self, request_id, created_time, model, payload, user_prompt) -> ChatCompletion:
        try:
            payload["stream"] = False
            response = self._client.session.post(self._client.url, json=payload, timeout=self._client.timeout, impersonate="chrome110")

            if response.status_code == 401:
                raise exceptions.FailedToGenerateResponseError("Invalid Qodo API key provided.")
            response.raise_for_status()

            full_response = "".join(sanitize_stream(iter([response.content]), QodoAI._qodo_extractor))

            # Token counts are approximated with whitespace-delimited word
            # counts; the upstream response is not inspected for real usage.
            prompt_tokens = len(user_prompt.split())
            completion_tokens = len(full_response.split())

            message = ChatCompletionMessage(role="assistant", content=full_response)
            choice = Choice(index=0, message=message, finish_reason="stop")
            usage = CompletionUsage(prompt_tokens=prompt_tokens, completion_tokens=completion_tokens, total_tokens=prompt_tokens + completion_tokens)
            return ChatCompletion(id=request_id, choices=[choice], created=created_time, model=model, usage=usage)
        except exceptions.FailedToGenerateResponseError:
            raise
        except Exception as e:
            raise exceptions.FailedToGenerateResponseError(f"Non-stream generation failed: {e}")


class Chat(BaseChat):
    def __init__(self, client: 'QodoAI'):
        self.completions = Completions(client)


class QodoAI(OpenAICompatibleProvider):
    AVAILABLE_MODELS = ["gpt-4.1", "gpt-4o", "o3", "o4-mini", "claude-4-sonnet", "gemini-2.5-pro"]

    def __init__(self, api_key: str, **kwargs: Any):
        super().__init__(api_key=api_key, **kwargs)
        self.url = QODO_URL
        self.info_url = QODO_INFO_URL
        self.timeout = 600
        self.api_key = api_key
        self.user_agent = "axios/1.10.0"
        self.session_id = self._get_session_id()
        self.headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json",
            "User-Agent": self.user_agent,
            "Session-id": self.session_id,
        }
        self.session = Session(headers=self.headers)
        self.chat = Chat(self)

    @staticmethod
    def _qodo_extractor(chunk: Union[str, Dict[str, Any]]) -> Optional[str]:
        # Text arrives either directly under data.content or nested under
        # data.tool_args.content.
        if isinstance(chunk, dict):
            data = chunk.get("data", {})
            if isinstance(data, dict):
                content = data.get("content") or (data.get("tool_args", {}) or {}).get("content")
                if content:
                    return content
        return None
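
    # Example event shapes handled above (inferred from the lookup logic; the
    # upstream schema is not otherwise documented here):
    #   {"data": {"content": "partial text"}}
    #   {"data": {"tool_args": {"content": "partial text"}}}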

    def _get_session_id(self) -> str:
        # self.session does not exist yet when __init__ calls this, so a
        # throwaway Session is used for the one-off info request.
        try:
            response = Session(headers={"Authorization": f"Bearer {self.api_key}"}).get(self.info_url, timeout=self.timeout)
            if response.status_code == 200:
                return response.json().get("session-id", f"fallback-{uuid.uuid4()}")
            elif response.status_code == 401:
                raise exceptions.FailedToGenerateResponseError("Invalid Qodo API key. Please check your QODO_API_KEY environment variable.")
            else:
                raise exceptions.FailedToGenerateResponseError(f"Failed to get session_id from Qodo: HTTP {response.status_code}")
        except exceptions.FailedToGenerateResponseError:
            raise
        except Exception as e:
            raise exceptions.FailedToGenerateResponseError(f"Failed to connect to Qodo API to get session_id: {e}")

    def _build_payload(self, prompt: str, model: str) -> Dict[str, Any]:
        return {
            "agent_type": "cli",
            "session_id": self.session_id,
            "user_request": prompt,
            "custom_model": model,
            "stream": True,
        }
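

# Example direct (non-HTTP) use of the provider, assuming QODO_API_KEY is set
# (the model name below is just one of QodoAI.AVAILABLE_MODELS):
#   provider = QodoAI(api_key=QODO_API_KEY)
#   result = provider.chat.completions.create(
#       model="gpt-4o",
#       messages=[{"role": "user", "content": "Hello"}],
#   )
#   print(result.choices[0].message.content)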


app = FastAPI(
    title="QodoAI OpenAI-Compatible API",
    description="Provides an OpenAI-compatible interface for the QodoAI service.",
    version="1.0.0"
)

client: Optional[QodoAI] = None


@app.on_event("startup")
def startup_event():
    global client
    if not QODO_API_KEY:
        raise RuntimeError("QODO_API_KEY environment variable not set. The server cannot start without an API key.")
    try:
        client = QodoAI(api_key=QODO_API_KEY)
        print("QodoAI client initialized successfully.")
    except exceptions.FailedToGenerateResponseError as e:
        raise RuntimeError(f"FATAL: Could not initialize QodoAI client: {e}")


@app.get("/v1/models", response_model_exclude_none=True)
async def list_models():
    """Lists the available models from the QodoAI provider."""
    models_data = [
        {"id": model_id, "object": "model", "created": int(time.time()), "owned_by": "qodoai"}
        for model_id in QodoAI.AVAILABLE_MODELS
    ]
    return {"object": "list", "data": models_data}


@app.post("/v1/chat/completions")
async def create_chat_completion(request: ChatCompletionRequest):
    """Creates a chat completion, supporting both streaming and non-streaming modes."""
    if client is None:
        raise HTTPException(status_code=503, detail="QodoAI client is not available or failed to initialize.")

    params = request.model_dump(exclude_none=True)

    try:
        if request.stream:
            async def stream_generator():
                try:
                    generator = client.chat.completions.create(**params)
                    # Frame each chunk as a server-sent event, matching the
                    # OpenAI streaming wire format.
                    for chunk in generator:
                        yield f"data: {chunk.model_dump_json()}\n\n"
                except exceptions.FailedToGenerateResponseError as e:
                    error_payload = {"error": {"message": str(e), "type": "api_error", "code": 500}}
                    yield f"data: {json.dumps(error_payload)}\n\n"
                finally:
                    yield "data: [DONE]\n\n"
            return StreamingResponse(stream_generator(), media_type="text/event-stream")
        else:
            response = client.chat.completions.create(**params)
            return JSONResponse(content=response.model_dump())

    except exceptions.FailedToGenerateResponseError as e:
        raise HTTPException(status_code=500, detail=str(e))
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))


if __name__ == "__main__":
    if not QODO_API_KEY:
        print("Error: The QODO_API_KEY environment variable must be set.", file=sys.stderr)
        sys.exit(1)
    uvicorn.run(app, host="0.0.0.0", port=8000)
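

# Example request against a running server (sketch; assumes the openai>=1.0
# Python client is installed -- the api_key passed here is a placeholder, since
# authentication happens server-side via QODO_API_KEY):
#   from openai import OpenAI
#   oai = OpenAI(base_url="http://localhost:8000/v1", api_key="unused")
#   resp = oai.chat.completions.create(
#       model="gpt-4o",
#       messages=[{"role": "user", "content": "Hello"}],
#   )
#   print(resp.choices[0].message.content)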