EmProps Job Queue: System Architecture Overview
Executive Summary
EmProps Job Queue is a distributed AI workload broker designed to orchestrate computationally intensive tasks (image generation, video processing, LLM inference) across ephemeral, geographically distributed machines. Unlike traditional cloud infrastructure, this system addresses the unique challenges of spot instance markets (SALAD, vast.ai), where fleets scale elastically from 10 to 50 machines and back within a single day, machines share no storage, and jobs must be matched in under 10 seconds despite constant machine churn.
Core Value Proposition: Enable cost-effective, elastic AI workload processing by intelligently routing jobs to specialized machine pools while managing multi-gigabyte model downloads, ensuring high availability despite ephemeral infrastructure, and providing real-time progress visibility to end users.
Current State: Foundation phase with Redis-based job brokering, pull-based workers, PM2-managed machine services, and WebSocket monitoring operational.
Target State: Specialized machine pools (Fast Lane/Standard/Heavy) with predictive model placement achieving 95% optimal job routing and sub-10-second wait times for 95% of jobs.
System Overview
High-Level Architecture
Architectural Patterns
- Pull-Based Worker Model: Workers poll Redis for jobs they can handle (capability matching), eliminating the need for centralized worker management
- Redis as Single Source of Truth: All job state, worker status, and coordination happens through Redis (atomic operations + pub/sub)
- Atomic Job Claiming: Redis Lua functions ensure exactly-once job assignment even with concurrent worker requests
- Event-Driven Communication: WebSocket broadcasts + Redis pub/sub for real-time progress updates
- PM2 Service Orchestration: Each machine runs PM2 to manage multiple GPU-specific services and worker processes
- Connector Pattern: Pluggable service integrations (ComfyUI, Ollama, OpenAI) isolate external API concerns from core worker logic
- Stateless API Server: API server is a thin orchestration layer; all state lives in Redis
- Telemetry-First Design: OpenTelemetry instrumentation for distributed tracing across jobs, workflows, and machine lifecycles
Core Components
1. API Server (apps/api)
Purpose: Lightweight HTTP + WebSocket server for job submission, progress streaming, and system monitoring.
Key Responsibilities:
- Job submission endpoint (`POST /submit-job`)
- WebSocket connections for real-time progress (`/ws`)
- Monitor dashboard WebSocket (`/monitor`)
- Health checks and system status
- Machine restart commands (full machine + individual PM2 services)
- Redis pub/sub event relay to WebSocket clients
Technology Stack: Express.js, WebSocket (ws), ioredis, OpenTelemetry
Architecture Decisions:
- Why WebSocket? Sub-second latency for progress updates is critical for user experience (vs. polling or SSE)
- Why Stateless? Allows horizontal scaling; Redis holds all state
- Why No Database? Jobs are ephemeral (24-hour TTL); Redis provides sufficient persistence + speed
Critical Files:
- `/apps/api/src/index.ts` - Entry point with telemetry initialization
- `/apps/api/src/lightweight-api-server.ts` - Main server class with HTTP routes + WebSocket handling
- `/apps/api/src/hybrid-client.ts` - Combined Redis + WebSocket client for job submission
Data Flow Example (Job Submission):
1. Client → POST /submit-job → API Server
2. API Server → Generate job ID (UUID)
3. API Server → Store job data in Redis hash (`job:{id}`)
4. API Server → Add job to priority queue (ZADD jobs:pending)
5. API Server → Publish job_submitted event (Redis pub/sub)
6. API Server → Broadcast event to WebSocket clients
7. Return job ID to client

Design Constraints:
- No silent failures: Explicit errors preferred over fallbacks (e.g., a missing `AUTH_TOKEN` is fatal)
- Redis connection failures trigger exponential backoff retries (max 60s)
- WebSocket ping/pong tracking prevents zombie connections
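The submission steps above map to a handful of Redis calls. A minimal sketch using ioredis, assuming the key patterns and channel names described in this document (validation, auth, and error handling omitted):

```typescript
import { randomUUID } from 'crypto';
import Redis from 'ioredis';

const redis = new Redis(process.env.REDIS_URL ?? 'redis://localhost:6379');

// Sketch of the /submit-job flow described above; field names follow this document.
async function submitJob(submission: { service_required: string; priority: number; payload: unknown }) {
  const jobId = randomUUID();
  const now = Date.now();

  // 3. Store job data in a Redis hash
  await redis.hset(`job:${jobId}`, {
    id: jobId,
    service_required: submission.service_required,
    priority: submission.priority,
    payload: JSON.stringify(submission.payload),
    status: 'pending',
    created_at: now,
  });

  // 4. Add to the priority queue (score = priority * 1M + timestamp, per the Redis section below)
  await redis.zadd('jobs:pending', submission.priority * 1_000_000 + now, jobId);

  // 5. Publish the submission event for monitors / WebSocket relay
  await redis.publish('job_submitted', JSON.stringify({ job_id: jobId, service_required: submission.service_required }));

  // 7. Return the job ID to the caller
  return jobId;
}
```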
2. Worker Client (apps/worker)
Purpose: Job processor that claims jobs from Redis, executes them via connectors, and reports results.
Key Responsibilities:
- Register capabilities with Redis (services, hardware, models)
- Poll Redis for matching jobs (via the `findMatchingJob` Lua function)
- Execute jobs through appropriate connector
- Report progress updates (0-100%)
- Save output assets to Azure Blob Storage
- Handle job failures with retry logic
- Maintain heartbeat for liveness detection
Technology Stack: TypeScript, ioredis, connectors (per service type)
Architecture Decisions:
- Why Pull-Based? Workers know their own capacity; avoids centralized scheduling bottlenecks
- Why Redis Direct? Eliminates intermediate message queue; atomic operations prevent double-claiming
- Why Connector Pattern? Isolates service-specific logic (ComfyUI WebSocket, Ollama HTTP, OpenAI API); allows adding new services without core changes
Critical Files:
- `/apps/worker/src/redis-direct-worker-client.ts` - Main worker client with job lifecycle management
- `/apps/worker/src/redis-direct-worker.ts` - Entry point with capability registration
- `/apps/worker/src/connectors/` - Service-specific job processors (ComfyUI, Ollama, OpenAI, etc.)
- `/apps/worker/src/connectors/base-connector.ts` - Abstract base class with shared logic (progress reporting, error handling)
- `/apps/worker/src/connectors/asset-saver.ts` - Handles output asset uploads to Azure/S3
Connector Architecture:
```typescript
abstract class BaseConnector {
  abstract processJob(job: Job, progress: ProgressCallback): Promise<JobResult>

  // Shared utilities:
  protected reportProgress(percent: number, message: string)
  protected handleError(error: Error): FailureClassification
  protected saveAsset(data: Buffer, mimeType: string): Promise<string>
}

// Example implementation:
class ComfyUIConnector extends BaseConnector {
  async processJob(job: Job, progress: ProgressCallback) {
    // 1. Connect to ComfyUI WebSocket
    // 2. Submit workflow payload
    // 3. Monitor execution queue
    // 4. Stream progress events (0% → 100%)
    // 5. Download output images
    // 6. Upload to Azure Blob
    // 7. Return URLs
  }
}
```

Job Processing Flow:
1. Worker polls Redis → findMatchingJob(capabilities)
2. Redis Function finds matching job → Claims atomically
3. Worker receives job → Updates status to IN_PROGRESS
4. Worker delegates to connector → processJob()
5. Connector reports progress → Worker publishes to Redis
6. API Server relays progress → WebSocket clients
7. Connector completes → Worker saves assets
8. Worker publishes completion event → Redis
9. Webhook Service delivers notification → External client
10. Worker updates status to IDLE → Ready for next job

Failure Handling Strategy:
- Transient Failures (network errors, service timeouts): Retry with exponential backoff (max 3 attempts)
- Permanent Failures (invalid payload, missing models): Mark job as FAILED, publish event, no retry
- Failure Classification: Structured error analysis (timeout, validation, external_api_error, etc.) stored in Redis attestations
- Worker Attestations: Persistent records of job completions/failures for forensic analysis (7-day TTL)
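As a rough illustration of the strategy above, the sketch below pairs a simple failure classifier with exponential-backoff retries; the pattern list and helper names are illustrative, not the actual worker implementation:

```typescript
type FailureClass = 'transient' | 'permanent';

// Illustrative classification: network-style errors retry, everything else fails permanently.
function classifyFailure(error: Error): FailureClass {
  const transientPatterns = [/ECONNRESET/, /ETIMEDOUT/, /503/, /timeout/i];
  return transientPatterns.some((p) => p.test(error.message)) ? 'transient' : 'permanent';
}

async function processWithRetry(run: () => Promise<void>, maxAttempts = 3): Promise<void> {
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      return await run();
    } catch (err) {
      const error = err as Error;
      if (classifyFailure(error) === 'permanent' || attempt === maxAttempts) {
        throw error; // mark job FAILED, publish event, no further retries
      }
      // Exponential backoff between transient retries: 1s, 2s, 4s, ...
      await new Promise((r) => setTimeout(r, 1000 * 2 ** (attempt - 1)));
    }
  }
}
```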
3. Machine Service (apps/machine)
Purpose: PM2-based orchestrator for managing GPU services and worker processes on ephemeral machines.
Key Responsibilities:
- Install and configure ComfyUI with 64 custom nodes (parallel installation)
- Start PM2-managed services (per-GPU ComfyUI instances, worker processes)
- Monitor service health and restart on failures
- Aggregate machine status (unified reporting across all services)
- Manage machine lifecycle (startup, shutdown, restart)
- Install models and dependencies at runtime
Technology Stack: Node.js, PM2, Docker, ComfyUI, custom installers
Architecture Decisions:
- Why PM2? Process management with automatic restarts, log aggregation, and per-service isolation
- Why Per-GPU Services? Maximizes GPU utilization; each GPU runs independent ComfyUI instance
- Why Custom Node Installer? Supports recursive clones, parallel installation (5 concurrent), environment variable interpolation
- Why Not Bake Everything? Models (2-15GB) exceed practical container sizes; dynamic downloads required
Critical Files:
- `/apps/machine/src/index-pm2.js` - Main entry point with PM2 orchestration
- `/apps/machine/src/services/comfyui-installer.js` - Enhanced custom node installer
- `/apps/machine/src/services/comfyui-service.js` - Per-GPU ComfyUI service manager
- `/apps/machine/src/services/machine-status-aggregator.js` - Unified status reporting
- `/apps/machine/src/config/service-mapping.json` - Service-to-worker configuration
Machine Startup Flow:
1. Container starts → Load environment config
2. Initialize telemetry client → Register machine
3. Install ComfyUI custom nodes (64 nodes, parallel)
4. Parse WORKERS env (e.g., "comfyui:2,ollama:1")
5. Start PM2 services:
- comfyui-gpu0 (GPU 0)
- comfyui-gpu1 (GPU 1)
- worker-comfyui-0 (connects to gpu0)
- worker-comfyui-1 (connects to gpu1)
6. Services register with Redis
7. Machine status aggregator reports unified status
8. Workers begin polling for jobs

PM2 Service Configuration Example:
```javascript
{
  name: 'comfyui-gpu0',
  script: '/workspace/ComfyUI/main.py',
  interpreter: 'python',
  args: '--gpu-index 0 --port 8188',
  env: {
    CUDA_VISIBLE_DEVICES: '0',
    PYTORCH_CUDA_ALLOC_CONF: 'max_split_size_mb:512'
  },
  max_restarts: 10,
  restart_delay: 5000
}
```

Custom Node Installation:
- Supports `git clone` URLs with `${VARIABLE}` interpolation
- Parallel installation (5 concurrent) for 64 nodes
- Auto-installs requirements.txt if `requirements: true`
- Creates `.env` files with environment variables
- Recursive clone support for submodules
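A minimal sketch of the bounded-concurrency install described above; the `installNode` helper and the node list shape are assumptions, and the concurrency of 5 comes from this document:

```typescript
// Run installs through a fixed number of concurrent "lanes" draining a shared queue.
async function installAll(nodes: { url: string }[], concurrency = 5): Promise<void> {
  const queue = [...nodes];
  const lanes = Array.from({ length: concurrency }, async () => {
    while (queue.length > 0) {
      const node = queue.shift();
      if (!node) break;
      await installNode(node); // git clone, requirements.txt, .env creation
    }
  });
  await Promise.all(lanes);
}

async function installNode(node: { url: string }): Promise<void> {
  // Placeholder: clone the repo (with ${VARIABLE} interpolation applied to the URL),
  // install requirements if configured, and write any .env file.
}
```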
4. Redis Architecture
Purpose: Distributed coordination layer for job queuing, worker discovery, and real-time events.
Data Structures:
| Key Pattern | Type | Purpose | TTL |
|---|---|---|---|
| `job:{id}` | Hash | Job metadata, status, payload | 24h after completion |
| `jobs:pending` | Sorted Set | Priority queue (score = priority * 1M + timestamp) | N/A |
| `jobs:completed` | Hash | Completed job results | 24h |
| `jobs:failed` | Hash | Failed job records | 7 days |
| `worker:{id}` | Hash | Worker capabilities, status, heartbeat | 60s heartbeat |
| `workers:active` | Set | Active worker IDs | N/A |
| `worker:{id}:heartbeat` | String | Liveness check | 60s |
| `machine:{id}:status` | Hash | Unified machine status | 90s |
| `worker:completion:{job}:attempt:{n}` | String | Worker attestation | 7 days |
Pub/Sub Channels:
| Channel | Purpose | Subscribers |
|---|---|---|
| `update_job_progress` | Job progress updates (0-100%) | API Server → WebSocket clients |
| `complete_job` | Job completion events | API Server, Webhook Service |
| `job_failed` | Permanent job failures | API Server, Webhook Service |
| `worker:events` | Worker connect/disconnect | API Server → Monitor |
| `worker_status` | Worker status changes (idle/busy) | API Server → Monitor |
| `machine:startup:events` | Machine lifecycle events | API Server → Monitor |
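To illustrate the relay role in the subscriber column, here is a minimal sketch of an API-server-style bridge from Redis pub/sub to WebSocket clients using ioredis and ws; the broadcast message shape is an assumption:

```typescript
import Redis from 'ioredis';
import { WebSocket, WebSocketServer } from 'ws';

const sub = new Redis(process.env.REDIS_URL ?? 'redis://localhost:6379');
const wss = new WebSocketServer({ port: 8080 });

// Subscribe to the channels listed above and fan each message out to connected clients.
sub.subscribe('update_job_progress', 'complete_job', 'job_failed');
sub.on('message', (channel, message) => {
  const event = { type: channel, ...JSON.parse(message) };
  for (const client of wss.clients) {
    if (client.readyState === WebSocket.OPEN) {
      client.send(JSON.stringify(event));
    }
  }
});
```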
Redis Functions (Server-Side Lua):
`findMatchingJob(capabilities, maxScan)`:
- Atomically finds and claims first matching job
- Capability matching logic:
  - Service type match (`comfyui`, `ollama`, `openai`)
  - Hardware requirements (GPU memory, CPU cores)
  - Model availability (checks worker's `models.{service}` array)
  - Custom requirements (nested object comparison)
- Scoring: Priority-first, then FIFO within priority
- Updates worker status to `busy`, job status to `ASSIGNED`
- Returns `{ jobId, job }` or `null`
Why Redis Functions?
- Atomicity: Prevents race conditions (two workers claiming same job)
- Performance: Server-side execution eliminates network round-trips
- Consistency: All workers use identical matching logic
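From the worker's perspective, claiming a job is a single call to the server-side function. A minimal sketch with ioredis; the exact argument encoding (a JSON capabilities string plus a scan limit) is an assumption based on the `FCALL` invocation shown in the job-discovery narrative later in this document:

```typescript
import Redis from 'ioredis';

const redis = new Redis(process.env.REDIS_URL ?? 'redis://localhost:6379');

// redis.call sends a raw command; FCALL invokes the registered server-side function
// with 0 keys, the worker's capabilities, and a maximum number of jobs to scan.
async function claimNextJob(capabilities: object) {
  const raw = await redis.call('FCALL', 'findMatchingJob', '0', JSON.stringify(capabilities), '100');
  return raw ? JSON.parse(raw as string) : null; // { jobId, job } or null
}
```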
Example Lua Logic (simplified):
```lua
-- Scan pending jobs (sorted by priority)
local jobIds = redis.call('ZREVRANGE', 'jobs:pending', 0, maxScan)
for _, jobId in ipairs(jobIds) do
  local job = redis.call('HGETALL', 'job:' .. jobId)
  -- Check service type
  if job.service_required == capabilities.services[1] then
    -- Check hardware requirements
    if job.requirements.gpu_memory <= capabilities.hardware.gpu_memory_gb then
      -- Atomic claim: remove from pending queue
      redis.call('ZREM', 'jobs:pending', jobId)
      -- Assign to worker
      redis.call('HMSET', 'job:' .. jobId,
        'worker_id', capabilities.worker_id,
        'status', 'assigned',
        'assigned_at', timestamp
      )
      -- Update worker status
      redis.call('HMSET', 'worker:' .. capabilities.worker_id,
        'status', 'busy',
        'current_job_id', jobId
      )
      return cjson.encode({ jobId = jobId, job = job })
    end
  end
end
return nil -- No matching job found
```

5. Monitor Dashboard (apps/monitor)
Purpose: Real-time system monitoring and operations dashboard.
Key Features:
- Machine cards with status (online/offline/busy)
- Worker status and current jobs
- Job queue visualization (pending/in-progress/completed)
- Live logs viewer (PM2 service logs)
- Machine restart controls (full machine + individual services)
- Service connection status (ComfyUI, Ollama availability)
Technology Stack: Next.js 15, React, WebSocket, Tailwind CSS, shadcn/ui
Architecture Decisions:
- Why Next.js? Server-side rendering for initial state, WebSocket for real-time updates
- Why WebSocket? Sub-second latency for machine status changes
- Why No Polling? Polling is wasteful; the Redis pub/sub + WebSocket push model is more efficient
Critical Files:
- `/apps/monitor/src/app/page.tsx` - Main dashboard
- `/apps/monitor/src/lib/websocket-client.ts` - WebSocket connection manager
- `/apps/monitor/src/components/machine-card.tsx` - Individual machine display
Monitor WebSocket Protocol:
```
// Client → Server (subscribe to topics)
{
  type: 'monitor_subscribe',
  topics: ['machines', 'workers', 'jobs', 'system_status']
}

// Server → Client (machine status update)
{
  type: 'machine_status',
  machine_id: 'salad-001',
  status: 'online',
  services: [
    { name: 'comfyui-gpu0', status: 'running', pid: 1234 },
    { name: 'worker-0', status: 'running', jobs_completed: 42 }
  ],
  last_heartbeat: '2025-10-05T12:34:56Z'
}
```

6. Webhook Service (apps/webhook-service)
Purpose: Reliable HTTP notification delivery for job lifecycle events.
Key Responsibilities:
- Subscribe to Redis pub/sub events (`complete_job`, `job_failed`)
- Deliver HTTP POST notifications to registered webhooks
- Retry failed deliveries (exponential backoff, max 5 attempts)
- Track delivery status in Redis
- Emit telemetry for webhook delivery success/failure
Technology Stack: Express.js, ioredis, OpenTelemetry
Architecture Decisions:
- Why Separate Service? Decouples notification delivery from API server; allows independent scaling
- Why Redis Pub/Sub? Already available; no additional infrastructure needed
- Why Retry Logic? Network failures are common; ensures eventual delivery
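A minimal sketch of delivery with exponential backoff as described above (endpoint registration, signing, and Redis status tracking omitted; `fetch` assumes Node 18+):

```typescript
// Attempt delivery up to maxAttempts times, backing off 1s, 2s, 4s, ... between tries.
async function deliverWebhook(url: string, payload: unknown, maxAttempts = 5): Promise<boolean> {
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      const res = await fetch(url, {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify(payload),
      });
      if (res.ok) return true; // delivery succeeded
    } catch {
      // network error: fall through to backoff and retry
    }
    await new Promise((r) => setTimeout(r, 1000 * 2 ** (attempt - 1)));
  }
  return false; // record permanent delivery failure
}
```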
Webhook Payload Example:
```json
{
  "event_type": "job_completed",
  "job_id": "550e8400-e29b-41d4-a716-446655440000",
  "workflow_id": "wf_abc123",
  "status": "completed",
  "result": {
    "data": {
      "images": [
        "https://storage.azure.com/outputs/image1.png"
      ]
    }
  },
  "timestamp": 1696512000000
}
```

7. EmProps API Integration (apps/emprops-api)
Purpose: Platform-specific API for the EmProps ecosystem (user workflows, collections, payments).
Key Responsibilities:
- User authentication and authorization (JWT)
- Workflow composition (multi-step job orchestration)
- Collection management (reusable workflow templates)
- Payment processing (Stripe integration)
- Asset management (IPFS + Azure Blob)
- Database persistence (PostgreSQL via Prisma)
Technology Stack: Express.js, Prisma, PostgreSQL, Socket.io, Stripe, IPFS
Integration with Job Queue:
- Submits jobs to API Server via HTTP (`POST /submit-job`)
- Receives progress updates via WebSocket subscription
- Stores job results in PostgreSQL for user access
- Manages long-running workflows (multi-step sequences)
Why Separate from API Server?
- Separation of Concerns: Job queue is infrastructure; EmProps API is product layer
- Independent Scaling: User-facing API has different load patterns than job orchestration
- Database Coupling: EmProps API requires PostgreSQL; job queue uses Redis exclusively
8. Shared Packages (packages/)
@emp/core:
- Shared TypeScript types (`Job`, `WorkerCapabilities`, `JobStatus`, etc.)
- Redis function installer and manager
- Common utilities (logging, telemetry instrumentation)
- Event broadcaster for WebSocket relay
- Job and workflow instrumentation helpers
@emp/telemetry:
- OpenTelemetry client wrapper
- Unified logging (Winston + OTLP exporter)
- Distributed tracing utilities
- Log file monitoring and streaming
@emp/database (Prisma):
- PostgreSQL schema and migrations
- Prisma Client generation
- Database utilities
@emp/service-config:
- Environment configuration management
- Service mapping definitions (worker-to-service relationships)
Key Technical Decisions
1. Why Redis Instead of RabbitMQ/Kafka?
Decision: Use Redis for job queue + pub/sub + state management.
Rationale:
- Simplicity: Single dependency for queuing, state, and pub/sub
- Performance: Sub-millisecond latency for job matching (Lua functions run in-memory)
- Atomic Operations: Native support for atomic claims (ZREM + HMSET in Lua)
- Familiarity: Team expertise with Redis; lower operational complexity
- Cost: One managed Redis instance vs. separate queue + cache
Trade-offs Accepted:
- No durable message guarantees (acceptable: jobs are ephemeral, 24h TTL)
- Limited message replay (acceptable: real-time system, not event sourcing)
- Single point of failure (mitigated: Redis persistence + managed service uptime)
2. Why Pull-Based Workers Instead of Push-Based?
Decision: Workers poll Redis for jobs they can handle.
Rationale:
- Self-Governing Capacity: Workers know their own load; no centralized scheduler needed
- Capability Matching: Workers declare what they can process; Redis Function finds compatible jobs
- Resilience: No need to track worker availability; workers claim when ready
- Simplicity: Eliminates complex worker registry and assignment logic
Trade-offs Accepted:
- Polling overhead (mitigated: 1-second interval, conditional on idle status)
- No instant job assignment (acceptable: 1s average latency is tolerable)
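A minimal sketch of the poll loop implied by these trade-offs, assuming illustrative `claimNextJob` and `runJob` helpers:

```typescript
declare function claimNextJob(capabilities: object): Promise<{ job: unknown } | null>;
declare function runJob(job: unknown): Promise<void>;

// Poll only while idle, at the 1-second interval mentioned above;
// while a job is running the loop does not poll, giving natural backpressure.
async function pollLoop(capabilities: object): Promise<void> {
  while (true) {
    const match = await claimNextJob(capabilities); // FCALL findMatchingJob under the hood
    if (match) {
      await runJob(match.job); // delegate to the matching connector
    } else {
      await new Promise((r) => setTimeout(r, 1000)); // idle: wait before next poll
    }
  }
}
```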
3. Why PM2 Instead of Kubernetes?
Decision: Use PM2 for process management on individual machines.
Rationale:
- Ephemeral Infrastructure: Machines are spot instances (SALAD/vast.ai); no K8s cluster available
- Per-Machine Orchestration: Each machine independently manages its services
- GPU Isolation: Per-GPU services require explicit CUDA device assignment (PM2 env vars)
- Lightweight: PM2 has minimal overhead vs. K8s agent on already resource-constrained machines
- Proven: PM2 battle-tested for Node.js process management; extensive logging and restart policies
Trade-offs Accepted:
- No cross-machine orchestration (acceptable: Redis handles coordination)
- Manual service configuration (mitigated: templated PM2 ecosystem files)
4. Why Custom Connectors Instead of Direct API Calls?
Decision: Abstract service integrations behind connector interface.
Rationale:
- Service Diversity: ComfyUI uses WebSocket, Ollama uses HTTP streaming, OpenAI uses REST
- Error Handling: Each service has unique failure modes; connectors normalize them
- Progress Reporting: Connectors translate service-specific events to standard 0-100% progress
- Testing: Mock connectors for testing without running actual services
- Evolution: Add new services (Runway, Replicate) without changing worker core
Connector Interface:
```typescript
interface ConnectorInterface {
  processJob(job: Job, progress: ProgressCallback): Promise<JobResult>
  healthCheck(): Promise<HealthStatus>
  getCapabilities(): ServiceCapabilities
}
```

5. Why OpenTelemetry Instead of Custom Logging?
Decision: Use OpenTelemetry for distributed tracing and structured logging.
Rationale:
- Distributed Tracing: Track jobs across API → Worker → Connector → External Service
- Context Propagation: Trace IDs flow through entire system (HTTP headers, Redis fields, logs)
- Vendor Agnostic: Can send to Jaeger, Zipkin, Grafana, or custom backends
- Structured Logs: Automatic correlation of logs with traces and spans
- Industry Standard: Well-supported tooling and ecosystem
Implementation:
- Telemetry Collector service receives OTLP HTTP spans and logs
- Stores in PostgreSQL for querying and analysis
- Workers emit job-level spans with custom attributes (job_id, workflow_id, service_type)
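A minimal sketch of emitting a job-level span with custom attributes via `@opentelemetry/api`; the tracer name and attribute values are illustrative:

```typescript
import { trace } from '@opentelemetry/api';

const tracer = trace.getTracer('emp-worker');

// Wrap job execution in a span so child spans (connector, external service) nest under it.
async function processJobWithSpan(jobId: string, run: () => Promise<void>): Promise<void> {
  await tracer.startActiveSpan('job.process', async (span) => {
    span.setAttribute('job.id', jobId);
    span.setAttribute('job.service_required', 'comfyui');
    try {
      await run();
    } catch (err) {
      span.recordException(err as Error);
      throw err;
    } finally {
      span.end();
    }
  });
}
```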
Production Constraints and Solutions
Constraint 1: No Shared Storage
Problem: Machines are geographically distributed with no shared filesystem.
Solution:
- All output assets uploaded to Azure Blob Storage
- Workers receive pre-signed URLs for input assets
- Job payloads contain URLs, not raw data
- Asset Saver connector handles uploads with retry logic
Example Flow:
1. User uploads image → EmProps API → Azure Blob → Returns URL
2. EmProps API submits job with input_url in payload
3. Worker downloads from URL → Processes → Uploads output to Azure
4. Worker returns output URL in job result

Constraint 2: Ephemeral Machines
Problem: Machines spin up/down constantly; jobs must survive machine churn.
Solution:
- Job state lives in Redis (survives machine restarts)
- Workers are stateless (no local job storage)
- Job retry logic handles mid-flight failures
- Worker heartbeats detect stale machines; jobs auto-requeued
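A minimal sketch of the heartbeat-based requeue described in the last bullet, using the key patterns from the Redis architecture section; the scoring and cleanup details are simplified assumptions:

```typescript
import Redis from 'ioredis';

const redis = new Redis(process.env.REDIS_URL ?? 'redis://localhost:6379');

// If a worker's heartbeat key has expired, push its in-flight job back onto the pending queue.
async function requeueStaleWorkers(): Promise<void> {
  const workerIds = await redis.smembers('workers:active');
  for (const workerId of workerIds) {
    const alive = await redis.exists(`worker:${workerId}:heartbeat`);
    if (alive) continue;

    const jobId = await redis.hget(`worker:${workerId}`, 'current_job_id');
    if (jobId) {
      const priority = Number(await redis.hget(`job:${jobId}`, 'priority')) || 0;
      await redis.hset(`job:${jobId}`, { status: 'pending', worker_id: '' });
      await redis.zadd('jobs:pending', priority * 1_000_000 + Date.now(), jobId);
    }
    await redis.srem('workers:active', workerId); // drop the stale worker record
  }
}
```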
Failure Scenario:
1. Worker claims job → Starts processing
2. Machine crashes (spot instance reclaimed)
3. Worker heartbeat expires (60s TTL)
4. API server detects stale worker → Re-adds job to pending queue
5. Another worker claims and completes job

Constraint 3: Large Model Files
Problem: Models are 2-15GB; downloading on every job is impractical.
Current Solution (Phase 0):
- Models downloaded once per machine at startup
- Cached in `/workspace/models` (ephemeral, lost on restart)
- Workers check model availability before claiming jobs
Future Solution (Phase 2):
- Bake common models into container images (pool-specific)
- Predictive model placement based on job history
- Intelligent cache eviction for dynamic models
Constraint 4: Performance Heterogeneity
Problem: 1-second Ollama jobs vs. 10-minute video jobs cause resource contention.
Current Solution:
- All machines are uniform (same resource allocation)
- No job duration-based routing
Future Solution (Phase 1):
- Fast Lane Pool: CPU-optimized, 20-40GB storage, text/simple tasks
- Standard Pool: Balanced GPU, 80-120GB storage, typical workflows
- Heavy Pool: High-end GPU, 150-300GB storage, video/complex tasks
- Redis Function enhanced for pool-aware routing
Data Flow Narratives
Narrative 1: Single Job Execution
User Story: Client submits ComfyUI image generation job.
Step-by-Step Flow:
Job Submission:
- Client sends `POST /submit-job` with payload:

  ```json
  {
    "service_required": "comfyui",
    "priority": 50,
    "payload": { "workflow": { ... }, "positive": "a cat", "negative": "blurry" },
    "requirements": { "gpu_memory": 8 }
  }
  ```

- API Server generates job ID: `job_123`
- Stores in Redis: `HMSET job:job_123 {service_required: comfyui, status: pending, ...}`
- Adds to queue: `ZADD jobs:pending {score} job_123`
- Publishes event: `PUBLISH job_submitted {...}`
- Returns job ID to client

Job Discovery:
- Worker polls Redis: `FCALL findMatchingJob 0 {capabilities} 100`
- Redis Function scans pending jobs (sorted by priority)
- Finds `job_123` matches worker capabilities (has ComfyUI, 16GB GPU)
- Atomically claims: `ZREM jobs:pending job_123` + `HMSET job:job_123 {worker_id: worker_1, status: assigned}`
- Returns `{jobId: job_123, job: {...}}` to worker

Job Execution:
- Worker updates status: `HMSET job:job_123 {status: in_progress, started_at: ...}`
- Worker selects connector: `ComfyUIConnector`
- Connector connects to ComfyUI WebSocket: `ws://localhost:8188`
- Connector submits workflow: `{prompt: {workflow}, client_id: worker_1}`
- ComfyUI queues job: Returns `{prompt_id: prompt_456}`

Progress Streaming:
- ComfyUI sends progress events: `{type: progress, data: {value: 0.25}}`
- Connector calculates percent: `25%`
- Connector calls progress callback
- Worker publishes: `PUBLISH update_job_progress {job_id: job_123, progress: 25, message: "Generating..."}`
- API Server receives event → Broadcasts to WebSocket clients
- Client UI updates progress bar

Job Completion:
- ComfyUI finishes: `{type: executed, data: {output: {images: [...]}}}`
- Connector downloads output images from ComfyUI
- Worker invokes Asset Saver:
  - Uploads to Azure Blob: `PUT https://emprops.blob.core.windows.net/outputs/job_123_0.png`
  - Returns URL: `https://emprops.blob.core.windows.net/outputs/job_123_0.png`
- Worker creates completion attestation: `SET worker:completion:step-id:job_123 {...}`
- Worker updates job: `HMSET job:job_123 {status: completed, completed_at: ...}`
- Worker stores result: `HSET jobs:completed job_123 {success: true, data: {images: [url]}}`
- Worker publishes: `PUBLISH complete_job {job_id: job_123, result: {...}}`
- API Server broadcasts to client WebSocket
- Webhook Service delivers notification to registered endpoint

Cleanup:
- Worker updates status: `HMSET worker:worker_1 {status: idle, current_job_id: ""}`
- Worker polls for next job (1s interval)
- Redis expires job after 24 hours: `EXPIRE job:job_123 86400`
Total Latency (typical):
- Job submission → Job claimed: ~1s (poll interval)
- Job claimed → Execution start: ~100ms (worker processing)
- Execution duration: ~30s (ComfyUI generation)
- Asset upload: ~2s (image upload to Azure)
- Total: ~33s end-to-end
Narrative 2: Multi-Step Workflow
User Story: EmProps API submits 3-step workflow (text → image → upscale).
Workflow Definition:
```json
{
  "workflow_id": "wf_abc123",
  "steps": [
    {
      "step": 1,
      "service_required": "ollama",
      "payload": { "model": "llama3", "prompt": "describe a sunset" }
    },
    {
      "step": 2,
      "service_required": "comfyui",
      "payload": { "positive": "{STEP_1_OUTPUT}" }
    },
    {
      "step": 3,
      "service_required": "comfyui",
      "payload": { "workflow": "upscale", "input": "{STEP_2_OUTPUT}" }
    }
  ]
}
```

Execution Flow:
Workflow Submission:
- EmProps API submits step 1 to Job Queue API: `POST /submit-job`
- Job payload includes: `{workflow_id: wf_abc123, current_step: 1, total_steps: 3}`
- API creates workflow trace context: `workflowTraceContexts.set(wf_abc123, {traceId, spanId})`

Step 1 Execution (Ollama):
- Worker claims job → Processes via `OllamaConnector`
- Ollama generates text: `"A vibrant sunset with orange and purple hues..."`
- Worker completes job → Publishes `complete_job` event with workflow fields

Workflow Orchestration (EmProps API):
- Webhook Service receives completion → Delivers to EmProps API
- EmProps API checks: `current_step < total_steps` → More steps to go
- EmProps API substitutes output: `{positive: "A vibrant sunset with orange and purple hues..."}`
- EmProps API submits step 2: `POST /submit-job {workflow_id: wf_abc123, current_step: 2, ...}`

Step 2 Execution (ComfyUI Image Generation):
- Worker claims job → Generates image via ComfyUI
- Uploads to Azure: `https://.../wf_abc123_step2.png`
- Publishes completion with workflow context

Step 3 Execution (ComfyUI Upscale):
- EmProps API submits final step with image URL
- Worker upscales → Uploads final output
- Publishes completion: `{workflow_id: wf_abc123, current_step: 3, total_steps: 3}`
Workflow Completion:
- EmProps API receives final step → Marks workflow complete
- Stores final result in PostgreSQL
- Notifies user via platform notification
Workflow Failure Handling:
- If step 2 fails permanently → EmProps API marks workflow as failed
- Does NOT submit step 3 (cascading prevention)
- User receives failure notification with error details
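Step payloads reference earlier outputs through `{STEP_N_OUTPUT}` placeholders (see the workflow definition above). A minimal sketch of how that substitution could be performed; the recursive walk is an illustrative implementation, not the EmProps API code:

```typescript
// Replace {STEP_N_OUTPUT} placeholders in string values, recursing through arrays and objects.
function substituteStepOutputs(value: unknown, outputs: Record<number, string>): unknown {
  if (typeof value === 'string') {
    return value.replace(/\{STEP_(\d+)_OUTPUT\}/g, (match, step) => outputs[Number(step)] ?? match);
  }
  if (Array.isArray(value)) return value.map((v) => substituteStepOutputs(v, outputs));
  if (value && typeof value === 'object') {
    return Object.fromEntries(
      Object.entries(value as Record<string, unknown>).map(([k, v]) => [k, substituteStepOutputs(v, outputs)]),
    );
  }
  return value;
}

// Usage: step 2's payload references step 1's text output.
const step2Payload = substituteStepOutputs(
  { positive: '{STEP_1_OUTPUT}' },
  { 1: 'A vibrant sunset with orange and purple hues...' },
);
```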
Observability and Telemetry
Distributed Tracing
Trace Propagation:
```
HTTP Request → API Server (Span 1)
  → Redis Job Storage
  → Worker Claim (Span 2, parent: Span 1)
    → Connector Execution (Span 3, parent: Span 2)
      → External Service Call (Span 4, parent: Span 3)
```

Trace Context Storage:
- Job creation: API generates trace ID + span ID
- Stored in Redis: `job:job_123 {job_trace_id, job_span_id}`
- Worker retrieves trace context → Creates child span
- Connector propagates trace ID to external services (HTTP headers)
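A minimal sketch of the worker side of this propagation, reconstructing a parent context from the Redis fields named above via `@opentelemetry/api`; the span name and sampling flag are assumptions:

```typescript
import { context, trace, TraceFlags } from '@opentelemetry/api';

// Re-parent a worker span onto the trace stored with the job.
function startChildSpan(jobFields: { job_trace_id: string; job_span_id: string }) {
  const parentCtx = trace.setSpanContext(context.active(), {
    traceId: jobFields.job_trace_id,
    spanId: jobFields.job_span_id,
    traceFlags: TraceFlags.SAMPLED,
    isRemote: true,
  });
  return trace.getTracer('emp-worker').startSpan('worker.claim', undefined, parentCtx);
}
```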
Example Span Attributes:
```
{
  'job.id': 'job_123',
  'job.service_required': 'comfyui',
  'job.priority': 50,
  'workflow.id': 'wf_abc123',
  'workflow.current_step': 2,
  'workflow.total_steps': 3,
  'worker.id': 'worker_1',
  'machine.id': 'salad-001'
}
```

Logging Strategy
Log Levels:
- ERROR: Unrecoverable failures (Redis connection lost, job processing crash)
- WARN: Recoverable issues (transient network errors, retry attempts)
- INFO: Lifecycle events (job submitted, worker registered, machine startup)
- DEBUG: Detailed tracing (Redis calls, connector state changes) - only in development
Log Aggregation:
- Winston daily rotate files: `/workspace/logs/combined-{date}.log`
- PM2 service logs: `/workspace/logs/{service}-out.log`, `/workspace/logs/{service}-error.log`
- Telemetry Collector streams logs to PostgreSQL with trace correlation
- Monitor UI provides real-time log viewer (WebSocket streaming)
Key Metrics
| Metric | Description | Target |
|---|---|---|
| Job claim latency | Time from submission to worker claim | < 2s (p95) |
| Job processing time | Execution duration by service type | Varies by service |
| Worker utilization | % time spent processing vs. idle | > 80% |
| Job success rate | Completed / (Completed + Failed) | > 99% |
| Asset upload time | Time to save outputs to Azure | < 5s (p95) |
| WebSocket message latency | Progress event publish to client receive | < 100ms |
Evolution Path: Current → North Star
Phase 0: Foundation (Current State)
Achievements:
- ✅ Redis-based job broker with atomic matching
- ✅ Pull-based workers with capability registration
- ✅ PM2-managed machine services
- ✅ WebSocket monitoring with real-time updates
- ✅ ComfyUI with 64 custom nodes (parallel installation)
- ✅ Asset saving to Azure Blob Storage
- ✅ OpenTelemetry distributed tracing
Limitations:
- All machines are uniform (no specialization)
- Models downloaded at runtime (first-user wait times)
- No intelligent routing (basic FIFO within priority)
Phase 1: Pool Separation (Months 1-2)
Goal: Eliminate performance heterogeneity by routing jobs to specialized machine pools.
Key Changes:
Pool-Specific Containers:
- Fast Lane: Baked with CPU models (Ollama, small vision models)
- Standard: Baked with common Stable Diffusion models
- Heavy: Baked with video models (AnimateDiff, SVD)
Enhanced Redis Function:
```lua
function findMatchingJob(capabilities, maxScan)
  -- Determine worker pool type
  local poolType = getPoolType(capabilities)
  -- Filter jobs by pool compatibility
  for _, jobId in ipairs(pending) do
    local estimatedDuration = estimateDuration(job)
    if (poolType == 'fast' and estimatedDuration < 10)
        or (poolType == 'standard' and estimatedDuration < 300)
        or (poolType == 'heavy') then
      -- Claim job
    end
  end
end
```

Pool-Aware Routing:
- API Server tags jobs with estimated duration
- Workers register pool type in capabilities
- Cross-pool fallback for busy pools
Deliverable: 95% of jobs route to optimal pools; 50% reduction in queue wait times.
Phase 2: Model Intelligence (Months 3-4)
Goal: Eliminate first-user model download wait times through predictive placement.
Key Changes:
TypeScript Model Manager (replaces Python SQLite):
- Tracks model usage patterns
- Predicts which models to pre-download on which pools
- Manages cache eviction (LRU with usage decay)
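As a rough illustration of "LRU with usage decay", the sketch below decays each model's score over time and boosts it on use, evicting the lowest-scoring entry first; the half-life and formula are assumptions, not the planned design:

```typescript
interface ModelEntry { name: string; sizeGb: number; score: number; lastUsed: number; }

const HALF_LIFE_MS = 6 * 60 * 60 * 1000; // assumed 6-hour half-life for usage decay

// Boost a model's score when it is used, after decaying the old score.
function recordUse(entry: ModelEntry, now = Date.now()): void {
  const decay = Math.pow(0.5, (now - entry.lastUsed) / HALF_LIFE_MS);
  entry.score = entry.score * decay + 1;
  entry.lastUsed = now;
}

// Pick the model with the lowest decayed score as the eviction candidate.
function pickEvictionCandidate(cache: ModelEntry[], now = Date.now()): ModelEntry | undefined {
  return [...cache].sort(
    (a, b) =>
      a.score * Math.pow(0.5, (now - a.lastUsed) / HALF_LIFE_MS) -
      b.score * Math.pow(0.5, (now - b.lastUsed) / HALF_LIFE_MS),
  )[0];
}
```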
Model Affinity Routing:
- Redis Function prefers workers with required models already cached
- Falls back to workers that can download quickly
Baked Model Strategy:
- Top 10 models (80% of jobs) baked into pool images
- Long-tail models downloaded on-demand
- Persistent cache layer for popular models
Deliverable: 80% reduction in model download wait times; sub-10s wait for 95% of jobs.
Phase 3: Advanced Optimization (Months 5-6)
Goal: Resource optimization and specialization for 10x job volume support.
Key Changes:
ML-Based Demand Prediction:
- Predict job arrival patterns (time-of-day, service type)
- Pre-scale machine pools before demand spikes
- Auto-scale down during low utilization
Specialty Routing:
- Customer-specific machines for enterprise isolation
- GPU architecture-specific pools (A100 vs. H100)
- Geographic routing for latency-sensitive jobs
Intelligent Load Balancing:
- Job packing optimization (co-locate small jobs)
- Bin packing for GPU memory utilization
- Workflow-aware scheduling (reserve capacity for multi-step)
Deliverable: 99.9% job completion rate; 10x job volume vs. current capacity; 95% optimal routing.
Design Principles
1. Explicit Failures Over Silent Fallbacks
Philosophy: Errors should be loud and actionable, not hidden by default values.
Examples:
- ❌ Missing `AUTH_TOKEN` → Defaults to `dev-token` (hides misconfiguration)
- ✅ Missing `AUTH_TOKEN` → Throws fatal error with setup instructions
Why This Matters:
- Prevents "works on my machine" scenarios
- Guides users to correct solutions immediately
- Reduces debugging time (no wild goose chases)
2. Pull-Based Over Push-Based
Philosophy: Workers self-govern their capacity; no central scheduler.
Benefits:
- Workers know when they're ready (no external capacity tracking)
- Natural backpressure (workers don't poll when busy)
- Resilient to worker churn (no stale worker registries)
3. Redis as Single Source of Truth
Philosophy: All coordination happens through Redis; no split-brain scenarios.
Implications:
- Job state lives exclusively in Redis
- Worker status is Redis-authoritative (heartbeats)
- No dual writes to multiple data stores
4. Ephemeral Everything
Philosophy: Design for machine churn; assume nothing persists.
Constraints:
- Workers are stateless (no local job storage)
- Machines have no persistent volumes
- Job results expire after 24 hours (move to long-term storage if needed)
5. Telemetry-First Design
Philosophy: Observability is not an afterthought; it's foundational.
Practices:
- Every job creates a trace span
- All errors include structured context
- Metrics emitted for all critical paths
Success Metrics
Current System Performance
| Metric | Current | Target (North Star) |
|---|---|---|
| Job routing optimality | ~60% | 95% |
| Average wait time (p95) | ~15s | <10s |
| Job completion rate | ~95% | 99.9% |
| Machine utilization | ~65% | >80% |
| Model download wait time | ~120s (first user) | <10s (95%) |
Production Readiness Criteria
- [x] Atomic job claiming (no double-processing)
- [x] Worker heartbeat detection (stale worker cleanup)
- [x] Job retry logic (handle transient failures)
- [x] Asset persistence (outputs survive machine churn)
- [x] Real-time progress streaming (WebSocket)
- [ ] Pool-based routing (Phase 1)
- [ ] Predictive model placement (Phase 2)
- [ ] Auto-scaling based on demand (Phase 3)
Appendix: File Paths Reference
API Server
- Entry point: `/apps/api/src/index.ts`
- Main server: `/apps/api/src/lightweight-api-server.ts`
- Hybrid client: `/apps/api/src/hybrid-client.ts`
Worker
- Entry point: `/apps/worker/src/redis-direct-worker.ts`
- Worker client: `/apps/worker/src/redis-direct-worker-client.ts`
- Connectors: `/apps/worker/src/connectors/`
- Base connector: `/apps/worker/src/connectors/base-connector.ts`
- Asset saver: `/apps/worker/src/connectors/asset-saver.ts`
Machine
- Entry point: `/apps/machine/src/index-pm2.js`
- ComfyUI installer: `/apps/machine/src/services/comfyui-installer.js`
- Status aggregator: `/apps/machine/src/services/machine-status-aggregator.js`
- Service mapping: `/apps/machine/src/config/service-mapping.json`
Redis Functions
- Lua script: `/packages/core/src/redis-functions/functions/findMatchingJob.lua`
- Installer: `/packages/core/src/redis-functions/installer.ts`
Core Packages
- Types: `/packages/core/src/types/`
- Redis service: `/packages/core/src/services/redis-service.ts`
- Event broadcaster: `/packages/core/src/services/event-broadcaster.ts`
- Telemetry: `/packages/telemetry/src/`
Monitor
- Dashboard: `/apps/monitor/src/app/page.tsx`
- WebSocket client: `/apps/monitor/src/lib/websocket-client.ts`
- Machine card: `/apps/monitor/src/components/machine-card.tsx`
Webhook Service
- Entry point: `/apps/webhook-service/src/index.ts`
- Delivery logic: `/apps/webhook-service/src/webhook-delivery.ts`
Further Reading
For deeper dives into specific subsystems:
- Job Lifecycle: See `/apps/docs/src/02-how-it-works/job-lifecycle.md`
- Worker Selection: See `/apps/docs/src/02-how-it-works/worker-selection.md`
- Redis Functions: See `/packages/core/src/redis-functions/README.md`
- Connector Development: See `/apps/worker/src/connectors/README.md`
- Telemetry Setup: See `/apps/docs/src/09-observability/telemetry-setup-guide.md`
- North Star Architecture: See `/apps/docs/src/06-future-vision/planned-features.md`
