GPU Workers
The GPU workers are specialized containers that handle AI inference tasks using NVIDIA GPUs.
Overview
Location: gpu/
Technology: Python + CUDA + Custom Binaries
Container: Velesio-gpu
Workers: LLM Worker (llm.py) + Stable Diffusion Worker (sd.py)
The GPU workers pull jobs from the Redis queue and execute AI inference using:
- LLM Worker: custom undreamai_server (llama.cpp fork) for text generation
- SD Worker: Automatic1111 WebUI for image generation
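Both engines consume work through the same Redis contract: a producer (typically the API service) pushes a JSON envelope onto the llama_queue list, and the worker writes the result to a result:<job_id> key with a one-hour TTL. The helpers below are a minimal, hypothetical sketch of the producer side of that contract; the field names (id, type, data) and key names are inferred from the polling loop and job router shown later on this page.

```python
import json
import time
import uuid

import redis

# Hypothetical producer-side helpers illustrating the queue contract the
# workers expect (see "Redis Job Polling" below); not part of the repository.
def enqueue_completion_job(prompt: str, redis_url: str = "redis://redis:6379") -> str:
    client = redis.Redis.from_url(redis_url)
    job_id = str(uuid.uuid4())
    job = {
        "id": job_id,
        "type": "completion",  # routed to the LLM worker by the job router
        "data": {"prompt": prompt, "max_tokens": 150, "temperature": 0.7},
    }
    client.lpush("llama_queue", json.dumps(job))  # worker BRPOPs from this list
    return job_id

def wait_for_result(job_id: str, redis_url: str = "redis://redis:6379", timeout: int = 300):
    client = redis.Redis.from_url(redis_url)
    deadline = time.time() + timeout
    while time.time() < deadline:
        raw = client.get(f"result:{job_id}")  # written by the worker with a 1 hour TTL
        if raw:
            return json.loads(raw)
        time.sleep(0.5)
    raise TimeoutError(f"No result for job {job_id}")
```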
Architecture
```
┌────────────────────────────────────────────┐
│                 GPU Worker                 │
│                                            │
│  ┌─────────────┐    ┌─────────────────┐    │
│  │ Job Poller  │    │  Result Writer  │    │
│  │   (Redis)   │    │     (Redis)     │    │
│  └──────┬──────┘    └────────▲────────┘    │
│         │                    │             │
│         ▼                    │             │
│  ┌───────────────────────────┴──────────┐  │
│  │             Job Processor            │  │
│  │                                      │  │
│  │  ┌─────────────┐   ┌───────────────┐ │  │
│  │  │ LLM Engine  │   │   SD Engine   │ │  │
│  │  │             │   │               │ │  │
│  │  │ undreamai_  │   │ Automatic1111 │ │  │
│  │  │ server      │   │     WebUI     │ │  │
│  │  └─────────────┘   └───────────────┘ │  │
│  └──────────────────────────────────────┘  │
│                                            │
│  ┌──────────────────────────────────────┐  │
│  │              GPU Memory              │  │
│  │  ┌─────────────┐   ┌───────────────┐ │  │
│  │  │ LLM Models  │   │   SD Models   │ │  │
│  │  │   (.gguf)   │   │(.safetensors) │ │  │
│  │  └─────────────┘   └───────────────┘ │  │
│  └──────────────────────────────────────┘  │
└────────────────────────────────────────────┘
```
LLM Worker (llm.py)
Core Technology
Engine: undreamai_server, a custom llama.cpp fork optimized for undream.ai
Key Features:
- GGUF model format support
- GPU layer offloading (CUDA)
- Streaming text generation
- Context window management
- Memory-efficient inference
Implementation
```python
import os
import subprocess
import json
import time

import redis
import requests
from typing import Dict, Any

class LLMWorker:
    def __init__(self):
        self.redis_client = redis.Redis(host='redis', port=6379)
        self.server_process = None
        self.model_loaded = False

    def start_server(self):
        """Start the undreamai_server process"""
        cmd = [
            './data/llama/undreamai_server',
            '--model', './data/models/text/model.gguf',
            '--ctx-size', '4096',
            '--n-gpu-layers', str(os.getenv('GPU_LAYERS', 35)),
            '--host', '0.0.0.0',
            '--port', '1337'
        ]
        self.server_process = subprocess.Popen(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE
        )

        # Wait for server to start
        self.wait_for_server()
        self.model_loaded = True

    def wait_for_server(self):
        """Wait for server to be ready"""
        for _ in range(60):  # 60 second timeout
            try:
                response = requests.get('http://localhost:1337/health')
                if response.status_code == 200:
                    return
            except requests.exceptions.RequestException:
                pass
            time.sleep(1)
        raise Exception("Server failed to start")
```
Job Processing
```python
def process_completion_job(self, job_data: Dict[str, Any]) -> Dict[str, Any]:
    """Process text completion job"""
    # Extract parameters
    prompt = job_data.get('prompt', '')
    max_tokens = job_data.get('max_tokens', 150)
    temperature = job_data.get('temperature', 0.7)

    # Prepare request to undreamai_server
    payload = {
        'prompt': prompt,
        'n_predict': max_tokens,
        'temperature': temperature,
        'top_k': job_data.get('top_k', 40),
        'top_p': job_data.get('top_p', 0.9),
        'repeat_penalty': 1.1,
        'stream': job_data.get('stream', False)
    }

    try:
        # Send request to local server
        response = requests.post(
            'http://localhost:1337/completion',
            json=payload,
            timeout=300
        )
        response.raise_for_status()
        result = response.json()

        # Format response
        return {
            'text': result['content'],
            'finish_reason': result.get('stop_reason', 'stop'),
            'usage': {
                'prompt_tokens': result.get('tokens_evaluated', 0),
                'completion_tokens': result.get('tokens_predicted', 0),
                'total_tokens': result.get('tokens_evaluated', 0) + result.get('tokens_predicted', 0)
            }
        }
    except Exception as e:
        return {'error': str(e)}
```
Streaming Support
```python
from typing import Iterator  # in addition to the imports shown above

def process_streaming_job(self, job_data: Dict[str, Any]) -> Iterator[Dict[str, Any]]:
    """Process streaming completion job"""
    payload = {
        'prompt': job_data.get('prompt', ''),
        'n_predict': job_data.get('max_tokens', 150),
        'temperature': job_data.get('temperature', 0.7),
        'stream': True
    }

    try:
        response = requests.post(
            'http://localhost:1337/completion',
            json=payload,
            stream=True,
            timeout=300
        )

        for line in response.iter_lines():
            if line:
                try:
                    data = json.loads(line.decode('utf-8'))
                    if 'content' in data:
                        yield {
                            'choices': [{
                                'text': data['content'],
                                'index': 0,
                                'finish_reason': None
                            }]
                        }
                except json.JSONDecodeError:
                    continue
    except Exception as e:
        yield {'error': str(e)}
```
Model Management
```python
def load_model(self, model_path: str):
    """Load a specific model"""
    # Stop current server if running
    if self.server_process:
        self.server_process.terminate()
        self.server_process.wait()

    # Start with new model
    self.start_server_with_model(model_path)

def get_model_info(self) -> Dict[str, Any]:
    """Get information about loaded model"""
    try:
        response = requests.get('http://localhost:1337/props')
        response.raise_for_status()
        props = response.json()
        return {
            'name': props.get('default_generation_settings', {}).get('model', 'unknown'),
            'context_length': props.get('default_generation_settings', {}).get('n_ctx', 0),
            'vocab_size': props.get('vocab_size', 0),
            'total_params': props.get('n_params', 0)
        }
    except Exception:
        return {'error': 'Failed to get model info'}
```
Stable Diffusion Worker (sd.py)
Core Technology
Engine: Automatic1111 WebUI
Framework: PyTorch + Diffusers
Acceleration: xFormers, Flash Attention
Implementation
```python
import subprocess
import time
import base64
import io

import redis
import requests
from PIL import Image

class SDWorker:
    def __init__(self):
        self.redis_client = redis.Redis(host='redis', port=6379)
        self.webui_url = 'http://localhost:7860'
        self.initialized = False

    def start_webui(self):
        """Start Automatic1111 WebUI"""
        cmd = [
            'python', 'webui.py',
            '--api',
            '--listen',
            '--port', '7860',
            '--xformers',
            '--no-gradio-queue'
        ]
        self.webui_process = subprocess.Popen(
            cmd,
            cwd='./data/sd',
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE
        )

        self.wait_for_webui()
        self.initialized = True

    def wait_for_webui(self):
        """Wait for WebUI to be ready"""
        for _ in range(120):  # 2 minute timeout
            try:
                response = requests.get(f'{self.webui_url}/sdapi/v1/progress')
                if response.status_code == 200:
                    return
            except requests.exceptions.RequestException:
                pass
            time.sleep(1)
        raise Exception("WebUI failed to start")
```
Text-to-Image Processing
```python
def process_txt2img_job(self, job_data: Dict[str, Any]) -> Dict[str, Any]:
    """Process text-to-image generation job"""
    payload = {
        'prompt': job_data.get('prompt', ''),
        'negative_prompt': job_data.get('negative_prompt', ''),
        'width': job_data.get('width', 512),
        'height': job_data.get('height', 512),
        'steps': job_data.get('steps', 20),
        'cfg_scale': job_data.get('cfg_scale', 7.5),
        'sampler_name': job_data.get('sampler_name', 'Euler a'),
        'seed': job_data.get('seed', -1),
        'batch_size': job_data.get('batch_size', 1),
        'n_iter': job_data.get('n_iter', 1)
    }

    try:
        response = requests.post(
            f'{self.webui_url}/sdapi/v1/txt2img',
            json=payload,
            timeout=600  # 10 minute timeout for image generation
        )
        response.raise_for_status()
        result = response.json()

        return {
            'images': result['images'],  # Base64 encoded images
            'parameters': {
                'prompt': payload['prompt'],
                'steps': payload['steps'],
                'seed': result.get('info', {}).get('seed', payload['seed']),
                'width': payload['width'],
                'height': payload['height']
            },
            'info': result.get('info', '')
        }
    except Exception as e:
        return {'error': str(e)}
```
Image-to-Image Processing
```python
def process_img2img_job(self, job_data: Dict[str, Any]) -> Dict[str, Any]:
    """Process image-to-image transformation job"""
    # Decode input images to validate them; the WebUI API itself
    # accepts the base64-encoded strings directly in the payload.
    init_images = []
    for img_b64 in job_data.get('init_images', []):
        img_data = base64.b64decode(img_b64)
        img = Image.open(io.BytesIO(img_data))
        init_images.append(img)

    payload = {
        'init_images': job_data.get('init_images', []),
        'prompt': job_data.get('prompt', ''),
        'negative_prompt': job_data.get('negative_prompt', ''),
        'denoising_strength': job_data.get('denoising_strength', 0.75),
        'width': job_data.get('width', 512),
        'height': job_data.get('height', 512),
        'steps': job_data.get('steps', 20),
        'cfg_scale': job_data.get('cfg_scale', 7.5)
    }

    try:
        response = requests.post(
            f'{self.webui_url}/sdapi/v1/img2img',
            json=payload,
            timeout=600
        )
        response.raise_for_status()
        result = response.json()

        return format_sd_response(result, payload)
    except Exception as e:
        return {'error': str(e)}
```
Model Management
```python
def list_models(self) -> Dict[str, Any]:
    """List available Stable Diffusion models"""
    try:
        response = requests.get(f'{self.webui_url}/sdapi/v1/sd-models')
        response.raise_for_status()
        models = response.json()

        return {
            'models': [
                {
                    'name': model['title'],
                    'filename': model['filename'],
                    'hash': model.get('hash', ''),
                    'loaded': model.get('model_name') == self.get_current_model()
                }
                for model in models
            ]
        }
    except Exception as e:
        return {'error': str(e)}

def switch_model(self, model_name: str) -> Dict[str, Any]:
    """Switch to a different SD model"""
    try:
        payload = {'sd_model_checkpoint': model_name}
        response = requests.post(
            f'{self.webui_url}/sdapi/v1/options',
            json=payload
        )
        response.raise_for_status()
        return {'success': True, 'model': model_name}
    except Exception as e:
        return {'error': str(e)}
```
Job Queue Integration
Redis Job Polling
```python
def main_worker_loop(self):
    """Main worker loop - polls Redis for jobs"""
    llm_worker = LLMWorker()
    sd_worker = SDWorker()

    # Initialize workers
    llm_worker.start_server()
    if os.getenv('RUN_SD', 'true').lower() == 'true':
        sd_worker.start_webui()

    while True:
        try:
            # Poll Redis queue (blocking pop with timeout)
            job_data = self.redis_client.brpop('llama_queue', timeout=10)

            if job_data:
                queue_name, job_json = job_data
                job = json.loads(job_json)

                # Process job based on type
                result = self.process_job(job, llm_worker, sd_worker)

                # Store result in Redis
                result_key = f"result:{job['id']}"
                self.redis_client.setex(
                    result_key,
                    3600,  # 1 hour TTL
                    json.dumps(result)
                )
        except Exception as e:
            print(f"Worker error: {e}")
            time.sleep(1)
```
Job Processing Router
```python
def process_job(self, job: Dict[str, Any], llm_worker: LLMWorker, sd_worker: SDWorker) -> Dict[str, Any]:
    """Route job to appropriate worker"""
    job_type = job.get('type', '')
    job_data = job.get('data', {})

    try:
        if job_type == 'completion':
            return llm_worker.process_completion_job(job_data)
        elif job_type == 'chat_completion':
            return llm_worker.process_chat_completion_job(job_data)
        elif job_type == 'txt2img':
            return sd_worker.process_txt2img_job(job_data)
        elif job_type == 'img2img':
            return sd_worker.process_img2img_job(job_data)
        else:
            return {'error': f'Unknown job type: {job_type}'}
    except Exception as e:
        return {'error': str(e)}
```
GPU Memory Management
Dynamic Memory Allocation
```python
def optimize_gpu_memory(self):
    """Optimize GPU memory usage"""
    # Get available GPU memory
    import pynvml
    pynvml.nvmlInit()
    handle = pynvml.nvmlDeviceGetHandleByIndex(0)
    mem_info = pynvml.nvmlDeviceGetMemoryInfo(handle)
    available_mb = mem_info.free // 1024 // 1024

    # Adjust model parameters based on available memory
    if available_mb < 4000:  # Less than 4GB
        self.gpu_layers = min(self.gpu_layers, 20)
        self.context_size = min(self.context_size, 2048)
    elif available_mb < 8000:  # Less than 8GB
        self.gpu_layers = min(self.gpu_layers, 30)
        self.context_size = min(self.context_size, 4096)
```
Model Unloading
```python
def unload_model_if_needed(self):
    """Unload model to free GPU memory"""
    # Check if no jobs processed recently
    last_job_time = self.redis_client.get('last_job_time')
    if last_job_time:
        elapsed = time.time() - float(last_job_time)
        if elapsed > 300:  # 5 minutes idle
            self.unload_models()

def unload_models(self):
    """Unload all models from GPU memory"""
    if self.llm_worker.server_process:
        self.llm_worker.server_process.terminate()
        self.llm_worker.model_loaded = False

    # Clear GPU cache
    import torch
    if torch.cuda.is_available():
        torch.cuda.empty_cache()
```
Performance Optimization
Batch Processing
```python
from typing import List  # in addition to the imports shown above

def process_batch_jobs(self, jobs: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Process multiple jobs in batch for efficiency"""
    # Group jobs by type
    txt2img_jobs = [j for j in jobs if j['type'] == 'txt2img']
    completion_jobs = [j for j in jobs if j['type'] == 'completion']

    results = []

    # Process image generation jobs in batch
    if txt2img_jobs:
        batch_result = self.sd_worker.process_batch_txt2img(txt2img_jobs)
        results.extend(batch_result)

    # Process text completion jobs sequentially (for now)
    for job in completion_jobs:
        result = self.llm_worker.process_completion_job(job['data'])
        results.append(result)

    return results
```
Caching
```python
import hashlib
from typing import Optional

def cache_frequently_used_prompts(self, prompt: str, result: Dict[str, Any]):
    """Cache results for frequently used prompts"""
    prompt_hash = hashlib.md5(prompt.encode()).hexdigest()
    cache_key = f"prompt_cache:{prompt_hash}"

    # Store in Redis with TTL
    self.redis_client.setex(
        cache_key,
        7200,  # 2 hours
        json.dumps(result)
    )

def get_cached_result(self, prompt: str) -> Optional[Dict[str, Any]]:
    """Get cached result for prompt"""
    prompt_hash = hashlib.md5(prompt.encode()).hexdigest()
    cache_key = f"prompt_cache:{prompt_hash}"

    cached = self.redis_client.get(cache_key)
    if cached:
        return json.loads(cached)
    return None
```
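A natural place to use these helpers is in front of the job router: check the cache before dispatching a completion job and populate it afterwards. The wrapper below is a hypothetical sketch of that wiring (process_job_with_cache is not part of the repository); it only reuses get_cached_result and cache_frequently_used_prompts as defined above.

```python
def process_job_with_cache(self, job: Dict[str, Any], llm_worker: 'LLMWorker') -> Dict[str, Any]:
    """Hypothetical wrapper: serve completion jobs from the prompt cache when possible."""
    job_data = job.get('data', {})
    prompt = job_data.get('prompt', '')

    # Only completion jobs with a non-empty prompt are worth caching here.
    if job.get('type') == 'completion' and prompt:
        cached = self.get_cached_result(prompt)
        if cached is not None:
            return cached

    result = llm_worker.process_completion_job(job_data)
    if 'error' not in result:
        self.cache_frequently_used_prompts(prompt, result)
    return result
```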
Configuration
Environment Variables
| Variable | Default | Description |
|---|---|---|
| GPU_LAYERS | 35 | Number of model layers offloaded to GPU |
| CUDA_VISIBLE_DEVICES | 0 | GPU device ID |
| MODEL_PATH | ./data/models/text/model.gguf | LLM model path |
| SD_MODEL_PATH | ./data/models/image/ | SD models directory |
| CONTEXT_SIZE | 4096 | LLM context window |
| RUN_SD | true | Enable Stable Diffusion |
| REDIS_URL | redis://redis:6379 | Redis connection |
| WORKER_TIMEOUT | 600 | Job processing timeout (seconds) |
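As a quick illustration of how a worker might consume these variables, the sketch below reads each one with the default from the table; the WorkerConfig dataclass is illustrative only and not part of the repository.

```python
import os
from dataclasses import dataclass

@dataclass
class WorkerConfig:
    # Defaults mirror the environment variable table above.
    gpu_layers: int = int(os.getenv('GPU_LAYERS', '35'))
    cuda_visible_devices: str = os.getenv('CUDA_VISIBLE_DEVICES', '0')
    model_path: str = os.getenv('MODEL_PATH', './data/models/text/model.gguf')
    sd_model_path: str = os.getenv('SD_MODEL_PATH', './data/models/image/')
    context_size: int = int(os.getenv('CONTEXT_SIZE', '4096'))
    run_sd: bool = os.getenv('RUN_SD', 'true').lower() == 'true'
    redis_url: str = os.getenv('REDIS_URL', 'redis://redis:6379')
    worker_timeout: int = int(os.getenv('WORKER_TIMEOUT', '600'))

config = WorkerConfig()
```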
Docker Configuration
```dockerfile
FROM nvidia/cuda:11.8-devel-ubuntu20.04

# Install Python and dependencies
RUN apt-get update && apt-get install -y \
    python3 python3-pip git wget curl

WORKDIR /app

# Install Python requirements
COPY requirements.txt .
RUN pip3 install -r requirements.txt

# Copy worker code
COPY llm.py sd.py ./
COPY data/ ./data/

# Set permissions
RUN chmod +x ./data/llama/undreamai_server

# Non-root user
RUN useradd -m -u 1000 worker
USER worker

# Health check
HEALTHCHECK --interval=60s --timeout=30s --start-period=120s --retries=3 \
    CMD python3 -c "import requests; requests.get('http://localhost:1337/health')"

# Start worker
CMD ["python3", "llm.py"]
```
Monitoring
Worker Health Checks
```python
def health_check(self) -> Dict[str, Any]:
    """Comprehensive worker health check"""
    health = {
        'timestamp': time.time(),
        'gpu_available': torch.cuda.is_available(),
        'models_loaded': {}
    }

    # Check LLM worker
    try:
        response = requests.get('http://localhost:1337/health', timeout=5)
        health['llm_worker'] = response.status_code == 200
        health['models_loaded']['llm'] = self.llm_worker.model_loaded
    except Exception:
        health['llm_worker'] = False
        health['models_loaded']['llm'] = False

    # Check SD worker
    if os.getenv('RUN_SD', 'true').lower() == 'true':
        try:
            response = requests.get('http://localhost:7860/sdapi/v1/progress', timeout=5)
            health['sd_worker'] = response.status_code == 200
            health['models_loaded']['sd'] = self.sd_worker.initialized
        except Exception:
            health['sd_worker'] = False
            health['models_loaded']['sd'] = False

    # GPU memory info
    if torch.cuda.is_available():
        health['gpu_memory'] = {
            'allocated': torch.cuda.memory_allocated(),
            'cached': torch.cuda.memory_reserved(),
            'total': torch.cuda.get_device_properties(0).total_memory
        }

    return health
```
Performance Metrics
```python
def log_performance_metrics(self, job_type: str, duration: float, success: bool):
    """Log performance metrics"""
    metrics = {
        'job_type': job_type,
        'duration': duration,
        'success': success,
        'timestamp': time.time(),
        'gpu_utilization': self.get_gpu_utilization()
    }

    # Store in Redis for monitoring
    metrics_key = f"metrics:{int(time.time())}"
    self.redis_client.setex(metrics_key, 86400, json.dumps(metrics))  # 24 hour TTL
```
Troubleshooting
Common Issues
- Model Loading Failures

```python
# Check model file exists and is readable
import os

model_path = './data/models/text/model.gguf'
if not os.path.exists(model_path):
    print(f"Model file not found: {model_path}")
elif not os.access(model_path, os.R_OK):
    print(f"Model file not readable: {model_path}")
```
- GPU Memory Issues

```python
# Monitor GPU memory usage
import pynvml

pynvml.nvmlInit()
handle = pynvml.nvmlDeviceGetHandleByIndex(0)
mem_info = pynvml.nvmlDeviceGetMemoryInfo(handle)
print(f"GPU Memory: {mem_info.used/1024**3:.1f}GB used, {mem_info.free/1024**3:.1f}GB free")
```
- Worker Communication Issues

```python
# Test Redis connectivity
try:
    self.redis_client.ping()
    print("Redis connection OK")
except Exception as e:
    print(f"Redis connection failed: {e}")
```
Debug Mode
```python
# Enable verbose logging
import logging
logging.basicConfig(level=logging.DEBUG)

# Add timing information
import time

def timed_function(func):
    def wrapper(*args, **kwargs):
        start = time.time()
        result = func(*args, **kwargs)
        duration = time.time() - start
        print(f"{func.__name__} took {duration:.2f} seconds")
        return result
    return wrapper
```
Next Steps
- Redis Queue - Understand job queue management
- API Service - Learn about API integration
- Monitoring - Set up GPU monitoring