API Service

The API service is a FastAPI-based web server that handles HTTP requests and manages job queuing to GPU workers.

Overview

Location: api/
Technology: FastAPI + Uvicorn
Port: 8000
Container: Velesio-api

The API service serves as the gateway between client applications and the AI inference workers, providing:

  • HTTP/REST endpoint handling
  • Bearer token authentication
  • Request validation and preprocessing
  • Redis job queue management
  • Response formatting and delivery
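
For example, a client can call the completion endpoint over HTTP with a bearer token. A minimal sketch using httpx (the host, port, and token value below are placeholders for your own deployment):

import httpx

# Placeholder values: point these at your own deployment
API_URL = "http://localhost:8000"
API_TOKEN = "your-api-token"

response = httpx.post(
    f"{API_URL}/completion",
    headers={"Authorization": f"Bearer {API_TOKEN}"},
    json={"prompt": "Hello, world!", "max_tokens": 50},
    timeout=60,
)
response.raise_for_status()
print(response.json()["choices"][0]["text"])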

Architecture

Client Request
      ↓
┌─────────────────┐
│  FastAPI App    │
│                 │
│ ┌─────────────┐ │
│ │   Auth      │ │ ← Bearer token validation
│ │ Middleware  │ │
│ └─────────────┘ │
│                 │
│ ┌─────────────┐ │
│ │ Request     │ │ ← Pydantic validation
│ │ Validation  │ │
│ └─────────────┘ │
│                 │
│ ┌─────────────┐ │
│ │   Redis     │ │ ← Job queue management
│ │   Client    │ │
│ └─────────────┘ │
└─────────────────┘
      ↓
   Redis Queue

Key Components

1. FastAPI Application (main.py)

The main application file defines all endpoints and middleware:

from fastapi import FastAPI, HTTPException, Depends
from fastapi.security import HTTPBearer
from pydantic import BaseModel
import redis.asyncio as redis

app = FastAPI(
    title="Velesio AI Server",
    description="Microservice-based AI inference server",
    version="1.0.0"
)

security = HTTPBearer()

2. Authentication Middleware

Token-based authentication using environment variables:

import os

from fastapi.security import HTTPAuthorizationCredentials

API_TOKENS = os.getenv("API_TOKENS", "").split(",")

async def verify_token(token: HTTPAuthorizationCredentials = Depends(security)):
    if token.credentials not in API_TOKENS:
        raise HTTPException(status_code=401, detail="Invalid token")
    return token.credentials

3. Request Models

Pydantic models ensure type safety and validation:

from typing import List, Literal, Optional

class CompletionRequest(BaseModel):
    prompt: str
    max_tokens: int = 150
    temperature: float = 0.7
    top_p: float = 0.9
    top_k: int = 40
    frequency_penalty: float = 0.0
    presence_penalty: float = 0.0
    stop: Optional[List[str]] = None
    stream: bool = False

class ChatMessage(BaseModel):
    role: Literal["user", "assistant", "system"]
    content: str

class ChatCompletionRequest(BaseModel):
    messages: List[ChatMessage]
    max_tokens: int = 150
    temperature: float = 0.7
    stream: bool = False

4. Redis Integration

Job queue management using Redis:

import json
import os
import time
import uuid

redis_client = redis.Redis(
    host=os.getenv("REDIS_HOST", "redis"),
    port=int(os.getenv("REDIS_PORT", 6379)),
    password=os.getenv("REDIS_PASSWORD"),
    decode_responses=True
)

async def enqueue_job(job_type: str, job_data: dict) -> str:
    job_id = str(uuid.uuid4())
    job = {
        "id": job_id,
        "type": job_type,
        "data": job_data,
        "timestamp": time.time()
    }

    await redis_client.lpush("llama_queue", json.dumps(job))
    return job_id

API Endpoints

Text Generation

POST /completion

OpenAI-compatible text completion endpoint:

@app.post("/completion")
async def completion(
    request: CompletionRequest,
    token: str = Depends(verify_token)
):
    # Enqueue job to Redis
    job_id = await enqueue_job("completion", request.dict())
    
    # Poll for results
    result = await poll_job_result(job_id)
    
    return {
        "id": f"cmpl-{job_id}",
        "object": "text_completion",
        "created": int(time.time()),
        "choices": [{
            "text": result["text"],
            "index": 0,
            "finish_reason": result.get("finish_reason", "stop")
        }],
        "usage": result.get("usage", {})
    }

POST /chat/completions

Chat-style completion with conversation history:

@app.post("/chat/completions")
async def chat_completions(
    request: ChatCompletionRequest,
    token: str = Depends(verify_token)
):
    # Convert chat messages to prompt
    prompt = format_chat_prompt(request.messages)
    
    # Create completion request
    completion_req = CompletionRequest(
        prompt=prompt,
        max_tokens=request.max_tokens,
        temperature=request.temperature,
        stream=request.stream
    )
    
    # Process similar to /completion
    job_id = await enqueue_job("chat_completion", completion_req.dict())
    result = await poll_job_result(job_id)
    
    return format_chat_response(result, job_id)
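
The format_chat_prompt and format_chat_response helpers referenced above are not shown in this section. A minimal sketch of what they might look like (the prompt template and response fields are assumptions, not the service's actual implementation):

def format_chat_prompt(messages: List[ChatMessage]) -> str:
    # Flatten the conversation into a single role-prefixed prompt
    lines = [f"{message.role}: {message.content}" for message in messages]
    lines.append("assistant:")
    return "\n".join(lines)

def format_chat_response(result: dict, job_id: str) -> dict:
    # Wrap the worker output in an OpenAI-style chat completion payload
    return {
        "id": f"chatcmpl-{job_id}",
        "object": "chat.completion",
        "created": int(time.time()),
        "choices": [{
            "index": 0,
            "message": {"role": "assistant", "content": result["text"]},
            "finish_reason": result.get("finish_reason", "stop")
        }],
        "usage": result.get("usage", {})
    }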

Image Generation

POST /sdapi/v1/txt2img

Stable Diffusion text-to-image generation:

@app.post("/sdapi/v1/txt2img")
async def txt2img(
    request: Txt2ImgRequest,
    token: str = Depends(verify_token)
):
    job_id = await enqueue_job("txt2img", request.dict())
    result = await poll_job_result(job_id, timeout=300)  # Longer timeout for images
    
    return {
        "images": result["images"],
        "parameters": result["parameters"],
        "info": result.get("info", "")
    }
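
The Txt2ImgRequest model is not defined in this section. A plausible sketch, mirroring common Stable Diffusion txt2img parameters (the exact field names and defaults are assumptions):

class Txt2ImgRequest(BaseModel):
    prompt: str
    negative_prompt: str = ""
    width: int = 512
    height: int = 512
    steps: int = 20
    cfg_scale: float = 7.0
    sampler_name: str = "Euler a"
    seed: int = -1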

Utility Endpoints

GET /health

Service health check:

from datetime import datetime

@app.get("/health")
async def health():
    try:
        # Check Redis connection
        await redis_client.ping()
        redis_status = "connected"
    except Exception:
        redis_status = "disconnected"

    # Check how many jobs are waiting in the queue
    queue_depth = await redis_client.llen("llama_queue")

    return {
        "status": "healthy" if redis_status == "connected" else "degraded",
        "timestamp": datetime.utcnow().isoformat(),
        "services": {
            "redis": redis_status,
            "queue_depth": queue_depth
        }
    }

GET /models

List available models:

@app.get("/models")
async def list_models(token: str = Depends(verify_token)):
    # Query workers for available models
    models_info = await get_worker_models()
    
    return {
        "text_models": models_info.get("text", []),
        "image_models": models_info.get("image", [])
    }
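
The get_worker_models helper is not shown above. One possible sketch, assuming workers publish their loaded models as JSON under well-known Redis keys (the key names here are assumptions):

async def get_worker_models() -> dict:
    # Assumed convention: workers write their model lists to these keys
    text_models = await redis_client.get("models:text")
    image_models = await redis_client.get("models:image")
    return {
        "text": json.loads(text_models) if text_models else [],
        "image": json.loads(image_models) if image_models else []
    }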

Job Management

Job Queuing

Jobs are queued in Redis with the following structure:

job = {
    "id": "uuid-string",
    "type": "completion|chat_completion|txt2img|img2img",
    "data": {
        # Request parameters
    },
    "timestamp": 1640995200.0,
    "priority": 1  # Optional priority
}
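
The GPU worker consumes jobs from the other end of this queue and writes its output to the result key that the API polls (see Result Polling below). A simplified sketch of that consumer loop, where run_inference is a hypothetical stand-in for the worker's actual processing and the result TTL is illustrative:

async def worker_loop():
    while True:
        # Blocking pop from the right end of the queue (the API pushes to the left)
        _, raw_job = await redis_client.brpop("llama_queue")
        job = json.loads(raw_job)

        result = await run_inference(job["type"], job["data"])  # hypothetical helper

        # Publish the result where the API's poll_job_result() looks for it
        await redis_client.set(f"result:{job['id']}", json.dumps(result), ex=600)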

Result Polling

The API polls Redis for job completion:

async def poll_job_result(job_id: str, timeout: int = 60) -> dict:
    start_time = time.time()
    
    while time.time() - start_time < timeout:
        # Check for result
        result_key = f"result:{job_id}"
        result = await redis_client.get(result_key)
        
        if result:
            # Parse and return result
            return json.loads(result)
        
        # Wait before next poll
        await asyncio.sleep(0.5)
    
    raise HTTPException(status_code=408, detail="Request timeout")

Streaming Responses

For streaming text generation:

from fastapi.responses import StreamingResponse

@app.post("/completion")
async def completion_stream(
    request: CompletionRequest,
    token: str = Depends(verify_token)
):
    if not request.stream:
        return await completion(request, token)

    # Server-Sent Events streaming
    async def generate():
        job_id = await enqueue_job("completion_stream", request.dict())

        async for chunk in poll_streaming_result(job_id):
            yield f"data: {json.dumps(chunk)}\n\n"

        yield "data: [DONE]\n\n"

    return StreamingResponse(generate(), media_type="text/event-stream")
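
The poll_streaming_result generator is not shown in this section. A rough sketch, assuming the worker pushes incremental chunks onto a per-job Redis list and marks the end of the stream with a sentinel field (both conventions are assumptions):

async def poll_streaming_result(job_id: str, timeout: int = 60):
    stream_key = f"stream:{job_id}"
    while True:
        item = await redis_client.blpop(stream_key, timeout=timeout)
        if item is None:
            raise HTTPException(status_code=408, detail="Stream timeout")
        _, raw_chunk = item
        chunk = json.loads(raw_chunk)
        if chunk.get("done"):  # assumed end-of-stream sentinel
            break
        yield chunk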

Configuration

Environment Variables

Variable           Default     Description
API_TOKENS         (required)  Comma-separated API tokens
REDIS_HOST         redis       Redis hostname
REDIS_PORT         6379        Redis port
REDIS_PASSWORD     None        Redis password
LOG_LEVEL          INFO        Logging level
CORS_ORIGINS       ["*"]       CORS allowed origins
MAX_QUEUE_SIZE     1000        Maximum queue depth
REQUEST_TIMEOUT    60          Default request timeout (seconds)
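
These variables are read from the environment at startup; a minimal sketch of collecting them in one place (the variable names match the table above, the grouping itself is illustrative):

import os

API_TOKENS = [t for t in os.getenv("API_TOKENS", "").split(",") if t]
REDIS_HOST = os.getenv("REDIS_HOST", "redis")
REDIS_PORT = int(os.getenv("REDIS_PORT", 6379))
REDIS_PASSWORD = os.getenv("REDIS_PASSWORD")
LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO")
CORS_ORIGINS = os.getenv("CORS_ORIGINS", "*").split(",")
MAX_QUEUE_SIZE = int(os.getenv("MAX_QUEUE_SIZE", 1000))
REQUEST_TIMEOUT = int(os.getenv("REQUEST_TIMEOUT", 60))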

Docker Configuration

FROM python:3.11-slim

WORKDIR /app

# curl is required by the health check below and is not in the slim base image
RUN apt-get update && apt-get install -y --no-install-recommends curl \
    && rm -rf /var/lib/apt/lists/*

# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application
COPY . .

# Non-root user
RUN useradd -m -u 1000 apiuser
USER apiuser

# Health check
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
    CMD curl -f http://localhost:8000/health || exit 1

# Start application
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]

Security Features

Rate Limiting

from fastapi import Request
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.errors import RateLimitExceeded
from slowapi.util import get_remote_address

limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

@app.post("/completion")
@limiter.limit("60/minute")
async def completion(request: Request, body: CompletionRequest):
    ...  # endpoint implementation; slowapi needs the raw `request: Request` argument

Input Validation

from pydantic import BaseModel, Field, validator

class CompletionRequest(BaseModel):
    prompt: str = Field(..., max_length=10000, description="Input prompt")
    max_tokens: int = Field(150, ge=1, le=4000, description="Maximum tokens")
    temperature: float = Field(0.7, ge=0.0, le=2.0, description="Temperature")

    @validator('prompt')
    def validate_prompt(cls, v):
        if not v.strip():
            raise ValueError('Prompt cannot be empty')
        return v
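
With these constraints in place, invalid input is rejected before a job is ever queued. For example, a whitespace-only prompt fails validation:

# Example: the validator above rejects whitespace-only prompts
try:
    CompletionRequest(prompt="   ")
except ValueError as exc:
    print(exc)  # pydantic reports "Prompt cannot be empty"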

CORS Configuration

from fastapi.middleware.cors import CORSMiddleware

app.add_middleware(
    CORSMiddleware,
    allow_origins=os.getenv("CORS_ORIGINS", "*").split(","),
    allow_credentials=True,
    allow_methods=["GET", "POST"],
    allow_headers=["*"],
)

Monitoring and Logging

Structured Logging

import structlog

logger = structlog.get_logger()

@app.post("/completion")
async def completion(request: CompletionRequest):
    logger.info(
        "completion_request",
        prompt_length=len(request.prompt),
        max_tokens=request.max_tokens,
        temperature=request.temperature
    )
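
structlog can be configured once at startup so these events are emitted as JSON lines; one possible configuration (the processor list can be adjusted to taste):

import structlog

structlog.configure(
    processors=[
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.add_log_level,
        structlog.processors.JSONRenderer()
    ]
)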

Metrics Collection

import time

from fastapi import Request, Response
from prometheus_client import Counter, Histogram, generate_latest

REQUEST_COUNT = Counter('api_requests_total', 'Total API requests', ['endpoint', 'status'])
REQUEST_DURATION = Histogram('api_request_duration_seconds', 'Request duration')

@app.middleware("http")
async def metrics_middleware(request: Request, call_next):
    start_time = time.time()
    response = await call_next(request)
    duration = time.time() - start_time

    REQUEST_COUNT.labels(endpoint=request.url.path, status=response.status_code).inc()
    REQUEST_DURATION.observe(duration)

    return response

@app.get("/metrics")
async def metrics():
    return Response(generate_latest(), media_type="text/plain")

Performance Optimization

Connection Pooling

import redis.asyncio as redis

# The standalone aioredis package is deprecated; redis-py's asyncio API
# (already used above) provides the same pooling behaviour.
redis_pool = redis.ConnectionPool.from_url(
    "redis://redis:6379",
    max_connections=100,
    retry_on_timeout=True
)

redis_client = redis.Redis(connection_pool=redis_pool)

Async/Await Usage

import asyncio

@app.post("/completion")
async def completion(request: CompletionRequest):
    # Non-blocking Redis operations
    job_id = await enqueue_job("completion", request.dict())

    # Bound the wait for the worker's result (REQUEST_TIMEOUT from the config table)
    result = await asyncio.wait_for(
        poll_job_result(job_id),
        timeout=int(os.getenv("REQUEST_TIMEOUT", 60))
    )

    return format_response(result)

Caching

import hashlib
from typing import Optional

# Simple in-process cache keyed by a hash of the prompt
_completion_cache: dict = {}

def get_cached_completion(prompt_hash: str) -> Optional[dict]:
    # Return a previously cached completion, if any
    return _completion_cache.get(prompt_hash)

def cache_result(prompt_hash: str, result: dict) -> None:
    _completion_cache[prompt_hash] = result

async def completion_with_cache(request: CompletionRequest):
    prompt_hash = hashlib.md5(request.prompt.encode()).hexdigest()

    # Check cache first
    cached = get_cached_completion(prompt_hash)
    if cached:
        return cached

    # Process normally and cache the result
    result = await process_completion(request)
    cache_result(prompt_hash, result)
    return result

Testing

Unit Tests

import pytest
from fastapi.testclient import TestClient

from main import app  # the FastAPI app defined in api/main.py

client = TestClient(app)

def test_completion_endpoint():
    response = client.post(
        "/completion",
        headers={"Authorization": "Bearer test-token"},
        json={
            "prompt": "Hello, world!",
            "max_tokens": 50
        }
    )
    assert response.status_code == 200
    assert "choices" in response.json()

def test_invalid_token():
    response = client.post(
        "/completion",
        headers={"Authorization": "Bearer invalid-token"},
        json={"prompt": "Hello"}
    )
    assert response.status_code == 401
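
As written, these tests need a reachable Redis instance and a running worker to produce results. For isolated unit tests, the queueing functions can be stubbed out instead; a sketch using pytest's monkeypatch (function names mirror those defined earlier on this page):

def test_completion_without_worker(monkeypatch):
    async def fake_enqueue_job(job_type, job_data):
        return "test-job-id"

    async def fake_poll_job_result(job_id, timeout=60):
        return {"text": "stubbed output", "finish_reason": "stop"}

    monkeypatch.setattr("main.enqueue_job", fake_enqueue_job)
    monkeypatch.setattr("main.poll_job_result", fake_poll_job_result)

    response = client.post(
        "/completion",
        headers={"Authorization": "Bearer test-token"},
        json={"prompt": "Hello", "max_tokens": 10}
    )
    assert response.status_code == 200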

Integration Tests

import pytest

@pytest.mark.asyncio  # requires the pytest-asyncio plugin
async def test_redis_integration():
    # Test Redis connection
    await redis_client.ping()

    # Test job queuing
    job_id = await enqueue_job("test", {"data": "test"})
    assert job_id is not None

    # Test job retrieval
    queue_length = await redis_client.llen("llama_queue")
    assert queue_length > 0

Deployment

Production Configuration

# docker-compose.prod.yml
services:
  api:
    image: velesio/velesio-api:latest   # image names must be lowercase
    restart: unless-stopped
    environment:
      - API_TOKENS=${API_TOKENS}
      - REDIS_HOST=redis
      - LOG_LEVEL=INFO
    deploy:
      replicas: 3
      resources:
        limits:
          memory: 1G
          cpus: '0.5'
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
      interval: 30s
      timeout: 10s
      retries: 3

Scaling

# Horizontal scaling
docker-compose up -d --scale api=5

# Load balancing with nginx
upstream api_backend {
    server api_1:8000;
    server api_2:8000;
    server api_3:8000;
}

Troubleshooting

Common Issues

  1. Redis Connection Errors
    • Check Redis service status
    • Verify network connectivity
    • Check authentication credentials
  2. High Memory Usage
    • Monitor request queue size
    • Implement request timeouts
    • Add memory limits to containers
  3. Slow Response Times
    • Check GPU worker availability
    • Monitor Redis queue depth
    • Tune result polling intervals and request timeouts

Debug Mode

# Enable debug logging
import logging
logging.basicConfig(level=logging.DEBUG)

# Add request/response logging
@app.middleware("http")
async def log_requests(request: Request, call_next):
    logger.debug(f"Request: {request.method} {request.url}")
    response = await call_next(request)
    logger.debug(f"Response: {response.status_code}")
    return response

Next Steps