
EmProps Job Queue: System Architecture Overview

Executive Summary

EmProps Job Queue is a distributed AI workload broker designed to orchestrate computationally intensive tasks (image generation, video processing, LLM inference) across ephemeral, geographically distributed machines. Unlike traditional cloud infrastructure, this system solves the unique challenges of spot instance markets (SALAD, vast.ai), where fleets scale elastically from 10 to 50 machines and back daily, machines have no shared storage, and the system must still deliver sub-10-second job matching despite constant machine churn.

Core Value Proposition: Enable cost-effective, elastic AI workload processing by intelligently routing jobs to specialized machine pools while managing multi-gigabyte model downloads, ensuring high availability despite ephemeral infrastructure, and providing real-time progress visibility to end users.

Current State: Foundation phase with Redis-based job brokering, pull-based workers, PM2-managed machine services, and WebSocket monitoring operational.

Target State: Specialized machine pools (Fast Lane/Standard/Heavy) with predictive model placement achieving 95% optimal job routing and sub-10-second wait times for 95% of jobs.


System Overview

High-Level Architecture

Architectural Patterns

  1. Pull-Based Worker Model: Workers poll Redis for jobs they can handle (capability matching), eliminating the need for centralized worker management
  2. Redis as Single Source of Truth: All job state, worker status, and coordination happens through Redis (atomic operations + pub/sub)
  3. Atomic Job Claiming: Redis Lua functions ensure exactly-once job assignment even with concurrent worker requests
  4. Event-Driven Communication: WebSocket broadcasts + Redis pub/sub for real-time progress updates
  5. PM2 Service Orchestration: Each machine runs PM2 to manage multiple GPU-specific services and worker processes
  6. Connector Pattern: Pluggable service integrations (ComfyUI, Ollama, OpenAI) isolate external API concerns from core worker logic
  7. Stateless API Server: API server is a thin orchestration layer; all state lives in Redis
  8. Telemetry-First Design: OpenTelemetry instrumentation for distributed tracing across jobs, workflows, and machine lifecycles

Core Components

1. API Server (apps/api)

Purpose: Lightweight HTTP + WebSocket server for job submission, progress streaming, and system monitoring.

Key Responsibilities:

  • Job submission endpoint (POST /submit-job)
  • WebSocket connections for real-time progress (/ws)
  • Monitor dashboard WebSocket (/monitor)
  • Health checks and system status
  • Machine restart commands (full machine + individual PM2 services)
  • Redis pub/sub event relay to WebSocket clients

Technology Stack: Express.js, WebSocket (ws), ioredis, OpenTelemetry

Architecture Decisions:

  • Why WebSocket? Sub-second latency for progress updates is critical for user experience (vs. polling or SSE)
  • Why Stateless? Allows horizontal scaling; Redis holds all state
  • Why No Database? Jobs are ephemeral (24-hour TTL); Redis provides sufficient persistence + speed

Critical Files:

  • /apps/api/src/index.ts - Entry point with telemetry initialization
  • /apps/api/src/lightweight-api-server.ts - Main server class with HTTP routes + WebSocket handling
  • /apps/api/src/hybrid-client.ts - Combined Redis + WebSocket client for job submission

Data Flow Example (Job Submission):

1. Client → POST /submit-job → API Server
2. API Server → Generate job ID (UUID)
3. API Server → Store job data in Redis hash (`job:{id}`)
4. API Server → Add job to priority queue (ZADD jobs:pending)
5. API Server → Publish job_submitted event (Redis pub/sub)
6. API Server → Broadcast event to WebSocket clients
7. Return job ID to client
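
The flow above maps to a handful of Redis operations. Below is a minimal sketch, assuming an Express route and an ioredis client; the field names and score formula mirror the Redis data structures described later, and the handler itself is illustrative rather than the actual implementation.

typescript
import { randomUUID } from 'crypto';
import express from 'express';
import Redis from 'ioredis';

const redis = new Redis(process.env.REDIS_URL ?? 'redis://localhost:6379');
const app = express();
app.use(express.json());

app.post('/submit-job', async (req, res) => {
  const { service_required, priority = 50, payload, requirements } = req.body;
  const jobId = randomUUID();
  const now = Date.now();

  // Store job metadata in a Redis hash
  await redis.hset(`job:${jobId}`, {
    service_required,
    priority,
    payload: JSON.stringify(payload),
    requirements: JSON.stringify(requirements ?? {}),
    status: 'pending',
    created_at: now,
  });

  // Add to the priority queue (score = priority * 1M + timestamp)
  await redis.zadd('jobs:pending', priority * 1_000_000 + now, jobId);

  // Publish the submission event for relay to WebSocket clients
  await redis.publish('job_submitted', JSON.stringify({ job_id: jobId, service_required }));

  res.json({ job_id: jobId });
});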

Design Constraints:

  • No silent failures: Explicit errors preferred over fallbacks (e.g., missing AUTH_TOKEN is fatal)
  • Redis connection failures trigger exponential backoff retries (max 60s)
  • WebSocket ping/pong tracking prevents zombie connections
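
The ping/pong constraint above can be implemented with the ws library's built-in ping support. A minimal sketch (the 30-second interval is an assumption, not the project's actual setting):

typescript
import { WebSocketServer, WebSocket } from 'ws';

const wss = new WebSocketServer({ port: 8080 });
const alive = new WeakMap<WebSocket, boolean>();

wss.on('connection', ws => {
  alive.set(ws, true);
  ws.on('pong', () => alive.set(ws, true)); // client answered our last ping
});

// Periodically ping every client; terminate any that never answered the previous ping
setInterval(() => {
  for (const ws of wss.clients) {
    if (alive.get(ws) === false) {
      ws.terminate(); // zombie connection
      continue;
    }
    alive.set(ws, false);
    ws.ping();
  }
}, 30_000);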

2. Worker Client (apps/worker)

Purpose: Job processor that claims jobs from Redis, executes them via connectors, and reports results.

Key Responsibilities:

  • Register capabilities with Redis (services, hardware, models)
  • Poll Redis for matching jobs (via findMatchingJob Lua function)
  • Execute jobs through appropriate connector
  • Report progress updates (0-100%)
  • Save output assets to Azure Blob Storage
  • Handle job failures with retry logic
  • Maintain heartbeat for liveness detection

Technology Stack: TypeScript, ioredis, connectors (per service type)

Architecture Decisions:

  • Why Pull-Based? Workers know their own capacity; avoids centralized scheduling bottlenecks
  • Why Redis Direct? Eliminates intermediate message queue; atomic operations prevent double-claiming
  • Why Connector Pattern? Isolates service-specific logic (ComfyUI WebSocket, Ollama HTTP, OpenAI API); allows adding new services without core changes

Critical Files:

  • /apps/worker/src/redis-direct-worker-client.ts - Main worker client with job lifecycle management
  • /apps/worker/src/redis-direct-worker.ts - Entry point with capability registration
  • /apps/worker/src/connectors/ - Service-specific job processors (ComfyUI, Ollama, OpenAI, etc.)
  • /apps/worker/src/connectors/base-connector.ts - Abstract base class with shared logic (progress reporting, error handling)
  • /apps/worker/src/connectors/asset-saver.ts - Handles output asset uploads to Azure/S3

Connector Architecture:

typescript
abstract class BaseConnector {
  abstract processJob(job: Job, progress: ProgressCallback): Promise<JobResult>

  // Shared utilities (implemented in the base class; signatures shown for brevity):
  protected reportProgress(percent: number, message: string): void
  protected handleError(error: Error): FailureClassification
  protected saveAsset(data: Buffer, mimeType: string): Promise<string>
}

// Example implementation:
class ComfyUIConnector extends BaseConnector {
  async processJob(job: Job, progress: ProgressCallback): Promise<JobResult> {
    // 1. Connect to the ComfyUI WebSocket
    // 2. Submit the workflow payload
    // 3. Monitor the execution queue
    // 4. Stream progress events (0% → 100%)
    // 5. Download output images
    // 6. Upload them to Azure Blob Storage
    // 7. Return the output URLs
  }
}

Job Processing Flow:

1. Worker polls Redis → findMatchingJob(capabilities)
2. Redis Function finds matching job → Claims atomically
3. Worker receives job → Updates status to IN_PROGRESS
4. Worker delegates to connector → processJob()
5. Connector reports progress → Worker publishes to Redis
6. API Server relays progress → WebSocket clients
7. Connector completes → Worker saves assets
8. Worker publishes completion event → Redis
9. Webhook Service delivers notification → External client
10. Worker updates status to IDLE → Ready for next job
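
A sketch of that lifecycle from the worker's side, assuming an ioredis client; selectConnector and the progress-callback signature are hypothetical stand-ins for the real worker client:

typescript
import Redis from 'ioredis';
import type { WorkerCapabilities } from '@emp/core';

async function pollLoop(redis: Redis, capabilities: WorkerCapabilities): Promise<void> {
  for (;;) {
    // Steps 1-2: ask the server-side function for a matching job (0 = no key arguments)
    const reply = (await redis.call(
      'FCALL', 'findMatchingJob', '0', JSON.stringify(capabilities), '100'
    )) as string | null;

    if (!reply) {
      await new Promise(r => setTimeout(r, 1000)); // nothing matched: poll again in ~1s
      continue;
    }
    const { jobId, job } = JSON.parse(reply);

    // Step 3: mark the job in progress
    await redis.hset(`job:${jobId}`, { status: 'in_progress', started_at: Date.now() });

    try {
      // Steps 4-7: delegate to the connector and relay its progress to Redis pub/sub
      const connector = selectConnector(job.service_required); // hypothetical factory
      const result = await connector.processJob(job, (percent: number, message: string) =>
        redis.publish('update_job_progress',
          JSON.stringify({ job_id: jobId, progress: percent, message }))
      );

      // Step 8: record and publish completion
      await redis.hset(`job:${jobId}`, { status: 'completed', completed_at: Date.now() });
      await redis.publish('complete_job', JSON.stringify({ job_id: jobId, result }));
    } catch (err) {
      await redis.publish('job_failed', JSON.stringify({ job_id: jobId, error: String(err) }));
    }

    // Step 10: back to idle, ready for the next claim
    await redis.hset(`worker:${capabilities.worker_id}`, { status: 'idle', current_job_id: '' });
  }
}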

Failure Handling Strategy:

  • Transient Failures (network errors, service timeouts): Retry with exponential backoff (max 3 attempts)
  • Permanent Failures (invalid payload, missing models): Mark job as FAILED, publish event, no retry
  • Failure Classification: Structured error analysis (timeout, validation, external_api_error, etc.) stored in Redis attestations
  • Worker Attestations: Persistent records of job completions/failures for forensic analysis (7-day TTL)
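
A compact sketch of how such a classification might look; the match patterns and backoff schedule are illustrative, not the actual rules:

typescript
type FailureClass = 'timeout' | 'validation' | 'external_api_error' | 'unknown';

function classifyFailure(err: Error): { failureClass: FailureClass; retryable: boolean } {
  if (/timeout|ETIMEDOUT|ECONNRESET/i.test(err.message)) {
    return { failureClass: 'timeout', retryable: true };            // transient: retry with backoff
  }
  if (/invalid payload|missing model/i.test(err.message)) {
    return { failureClass: 'validation', retryable: false };        // permanent: fail immediately
  }
  if (/HTTP 5\d\d/.test(err.message)) {
    return { failureClass: 'external_api_error', retryable: true }; // upstream hiccup: retry
  }
  return { failureClass: 'unknown', retryable: false };
}

// Backoff schedule for transient failures (max 3 attempts per the strategy above)
const backoffMs = (attempt: number) => Math.min(1000 * 2 ** attempt, 30_000);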

3. Machine Service (apps/machine)

Purpose: PM2-based orchestrator for managing GPU services and worker processes on ephemeral machines.

Key Responsibilities:

  • Install and configure ComfyUI with 64 custom nodes (parallel installation)
  • Start PM2-managed services (per-GPU ComfyUI instances, worker processes)
  • Monitor service health and restart on failures
  • Aggregate machine status (unified reporting across all services)
  • Manage machine lifecycle (startup, shutdown, restart)
  • Install models and dependencies at runtime

Technology Stack: Node.js, PM2, Docker, ComfyUI, custom installers

Architecture Decisions:

  • Why PM2? Process management with automatic restarts, log aggregation, and per-service isolation
  • Why Per-GPU Services? Maximizes GPU utilization; each GPU runs independent ComfyUI instance
  • Why Custom Node Installer? Supports recursive clones, parallel installation (5 concurrent), environment variable interpolation
  • Why Not Bake Everything? Models (2-15GB) exceed practical container sizes; dynamic downloads required

Critical Files:

  • /apps/machine/src/index-pm2.js - Main entry point with PM2 orchestration
  • /apps/machine/src/services/comfyui-installer.js - Enhanced custom node installer
  • /apps/machine/src/services/comfyui-service.js - Per-GPU ComfyUI service manager
  • /apps/machine/src/services/machine-status-aggregator.js - Unified status reporting
  • /apps/machine/src/config/service-mapping.json - Service-to-worker configuration

Machine Startup Flow:

1. Container starts → Load environment config
2. Initialize telemetry client → Register machine
3. Install ComfyUI custom nodes (64 nodes, parallel)
4. Parse WORKERS env (e.g., "comfyui:2,ollama:1")
5. Start PM2 services:
   - comfyui-gpu0 (GPU 0)
   - comfyui-gpu1 (GPU 1)
   - worker-comfyui-0 (connects to gpu0)
   - worker-comfyui-1 (connects to gpu1)
6. Services register with Redis
7. Machine status aggregator reports unified status
8. Workers begin polling for jobs

PM2 Service Configuration Example:

javascript
{
  name: 'comfyui-gpu0',
  script: '/workspace/ComfyUI/main.py',
  interpreter: 'python',
  args: '--gpu-index 0 --port 8188',
  env: {
    CUDA_VISIBLE_DEVICES: '0',
    PYTORCH_CUDA_ALLOC_CONF: 'max_split_size_mb:512'
  },
  max_restarts: 10,
  restart_delay: 5000
}
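
How the WORKERS string from step 4 of the startup flow could be expanded into app definitions like the one above is sketched below; script paths, port assignment, and naming are assumptions for illustration:

typescript
function buildPm2Apps(workersEnv: string): Record<string, unknown>[] {
  const apps: Record<string, unknown>[] = [];
  for (const entry of workersEnv.split(',')) {           // e.g. "comfyui:2,ollama:1"
    const [service, countStr] = entry.split(':');
    const count = Number(countStr ?? 1);
    for (let i = 0; i < count; i++) {
      if (service === 'comfyui') {
        // one ComfyUI instance per GPU index
        apps.push({
          name: `comfyui-gpu${i}`,
          script: '/workspace/ComfyUI/main.py',
          interpreter: 'python',
          args: `--gpu-index ${i} --port ${8188 + i}`,
          env: { CUDA_VISIBLE_DEVICES: String(i) },
        });
      }
      // one worker process bound to each service instance
      apps.push({
        name: `worker-${service}-${i}`,
        script: './dist/redis-direct-worker.js',          // hypothetical build output path
        env: { WORKER_SERVICE: service, WORKER_INDEX: String(i) },
      });
    }
  }
  return apps;
}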

Custom Node Installation:

  • Supports git clone URLs with ${VARIABLE} interpolation
  • Parallel installation (5 concurrent) for 64 nodes
  • Auto-installs requirements.txt if requirements: true
  • Creates .env files with environment variables
  • Recursive clone support for submodules
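
A sketch of the concurrency-limited installation described above (five parallel clones over the node list); the working directory and clone flags are assumptions:

typescript
import { exec } from 'child_process';
import { promisify } from 'util';

const run = promisify(exec);

async function installCustomNodes(repos: string[], concurrency = 5): Promise<void> {
  const queue = [...repos];
  const lanes = Array.from({ length: concurrency }, async () => {
    while (queue.length > 0) {
      const repo = queue.shift()!;
      // ${VARIABLE} interpolation against the current environment
      const url = repo.replace(/\$\{(\w+)\}/g, (_, name) => process.env[name] ?? '');
      await run(`git clone --recursive ${url}`, { cwd: '/workspace/ComfyUI/custom_nodes' });
    }
  });
  await Promise.all(lanes);
}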

4. Redis Architecture

Purpose: Distributed coordination layer for job queuing, worker discovery, and real-time events.

Data Structures:

| Key Pattern | Type | Purpose | TTL |
|---|---|---|---|
| job:{id} | Hash | Job metadata, status, payload | 24h after completion |
| jobs:pending | Sorted Set | Priority queue (score = priority * 1M + timestamp) | N/A |
| jobs:completed | Hash | Completed job results | 24h |
| jobs:failed | Hash | Failed job records | 7 days |
| worker:{id} | Hash | Worker capabilities, status, heartbeat | 60s heartbeat |
| workers:active | Set | Active worker IDs | N/A |
| worker:{id}:heartbeat | String | Liveness check | 60s |
| machine:{id}:status | Hash | Unified machine status | 90s |
| worker:completion:{job}:attempt:{n} | String | Worker attestation | 7 days |

Pub/Sub Channels:

| Channel | Purpose | Subscribers |
|---|---|---|
| update_job_progress | Job progress updates (0-100%) | API Server → WebSocket clients |
| complete_job | Job completion events | API Server, Webhook Service |
| job_failed | Permanent job failures | API Server, Webhook Service |
| worker:events | Worker connect/disconnect | API Server → Monitor |
| worker_status | Worker status changes (idle/busy) | API Server → Monitor |
| machine:startup:events | Machine lifecycle events | API Server → Monitor |
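
On the API server side, relaying these channels to WebSocket clients takes one dedicated subscriber connection. A minimal sketch (the channel list and message shape are illustrative):

typescript
import Redis from 'ioredis';
import { WebSocketServer, WebSocket } from 'ws';

const subscriber = new Redis(process.env.REDIS_URL ?? 'redis://localhost:6379');
const wss = new WebSocketServer({ port: 8080 });

subscriber
  .subscribe('update_job_progress', 'complete_job', 'job_failed')
  .catch(err => console.error('subscribe failed', err));

subscriber.on('message', (channel, message) => {
  const event = JSON.stringify({ channel, data: JSON.parse(message) });
  for (const client of wss.clients) {
    if (client.readyState === WebSocket.OPEN) client.send(event);
  }
});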

Redis Functions (Server-Side Lua):

findMatchingJob(capabilities, maxScan):

  • Atomically finds and claims first matching job
  • Capability matching logic:
    • Service type match (comfyui, ollama, openai)
    • Hardware requirements (GPU memory, CPU cores)
    • Model availability (checks worker's models.{service} array)
    • Custom requirements (nested object comparison)
  • Scoring: Priority-first, then FIFO within priority
  • Updates worker status to busy, job status to ASSIGNED
  • Returns { jobId, job } or null

Why Redis Functions?

  • Atomicity: Prevents race conditions (two workers claiming same job)
  • Performance: Server-side execution eliminates network round-trips
  • Consistency: All workers use identical matching logic

Example Lua Logic (simplified):

lua
-- Simplified: the HGETALL reply and the capabilities argument are shown as decoded tables
-- Scan pending jobs (sorted by priority)
local jobIds = redis.call('ZREVRANGE', 'jobs:pending', 0, maxScan)

for _, jobId in ipairs(jobIds) do
  local job = redis.call('HGETALL', 'job:' .. jobId)

  -- Check service type
  if job.service_required == capabilities.services[1] then
    -- Check hardware requirements
    if job.requirements.gpu_memory <= capabilities.hardware.gpu_memory_gb then
      -- Atomic claim: remove from pending queue
      redis.call('ZREM', 'jobs:pending', jobId)

      -- Assign to worker
      redis.call('HMSET', 'job:' .. jobId,
        'worker_id', capabilities.worker_id,
        'status', 'assigned',
        'assigned_at', timestamp
      )

      -- Update worker status
      redis.call('HMSET', 'worker:' .. capabilities.worker_id,
        'status', 'busy',
        'current_job_id', jobId
      )

      return cjson.encode({ jobId = jobId, job = job })
    end
  end
end

return nil -- No matching job found
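
For the registration side, Redis 7 installs function libraries with FUNCTION LOAD. A sketch of what the installer in @emp/core might do, assuming the .lua file carries the required `#!lua name=...` header and registers findMatchingJob via redis.register_function (boilerplate the simplified listing above omits):

typescript
import { readFile } from 'fs/promises';
import Redis from 'ioredis';

async function installRedisFunctions(redis: Redis): Promise<void> {
  const lua = await readFile(
    'packages/core/src/redis-functions/functions/findMatchingJob.lua', 'utf8'
  );
  // REPLACE keeps re-installation idempotent across API/worker restarts
  await redis.call('FUNCTION', 'LOAD', 'REPLACE', lua);
}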

5. Monitor Dashboard (apps/monitor)

Purpose: Real-time system monitoring and operations dashboard.

Key Features:

  • Machine cards with status (online/offline/busy)
  • Worker status and current jobs
  • Job queue visualization (pending/in-progress/completed)
  • Live logs viewer (PM2 service logs)
  • Machine restart controls (full machine + individual services)
  • Service connection status (ComfyUI, Ollama availability)

Technology Stack: Next.js 15, React, WebSocket, Tailwind CSS, shadcn/ui

Architecture Decisions:

  • Why Next.js? Server-side rendering for initial state, WebSocket for real-time updates
  • Why WebSocket? Sub-second latency for machine status changes
  • Why No Polling? Polling is wasteful; the Redis pub/sub + WebSocket push model is more efficient

Critical Files:

  • /apps/monitor/src/app/page.tsx - Main dashboard
  • /apps/monitor/src/lib/websocket-client.ts - WebSocket connection manager
  • /apps/monitor/src/components/machine-card.tsx - Individual machine display

Monitor WebSocket Protocol:

typescript
// Client → Server (subscribe to topics)
{
  type: 'monitor_subscribe',
  topics: ['machines', 'workers', 'jobs', 'system_status']
}

// Server → Client (machine status update)
{
  type: 'machine_status',
  machine_id: 'salad-001',
  status: 'online',
  services: [
    { name: 'comfyui-gpu0', status: 'running', pid: 1234 },
    { name: 'worker-0', status: 'running', jobs_completed: 42 }
  ],
  last_heartbeat: '2025-10-05T12:34:56Z'
}

6. Webhook Service (apps/webhook-service)

Purpose: Reliable HTTP notification delivery for job lifecycle events.

Key Responsibilities:

  • Subscribe to Redis pub/sub events (complete_job, job_failed)
  • Deliver HTTP POST notifications to registered webhooks
  • Retry failed deliveries (exponential backoff, max 5 attempts)
  • Track delivery status in Redis
  • Emit telemetry for webhook delivery success/failure

Technology Stack: Express.js, ioredis, OpenTelemetry

Architecture Decisions:

  • Why Separate Service? Decouples notification delivery from API server; allows independent scaling
  • Why Redis Pub/Sub? Already available; no additional infrastructure needed
  • Why Retry Logic? Network failures are common; ensures eventual delivery

Webhook Payload Example:

json
{
  "event_type": "job_completed",
  "job_id": "550e8400-e29b-41d4-a716-446655440000",
  "workflow_id": "wf_abc123",
  "status": "completed",
  "result": {
    "data": {
      "images": [
        "https://storage.azure.com/outputs/image1.png"
      ]
    }
  },
  "timestamp": 1696512000000
}
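
The retry policy (max 5 attempts, exponential backoff) can be expressed as a simple delivery loop; this sketch uses Node's built-in fetch and illustrative timings:

typescript
async function deliverWebhook(url: string, payload: unknown): Promise<boolean> {
  for (let attempt = 1; attempt <= 5; attempt++) {
    try {
      const res = await fetch(url, {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify(payload),
      });
      if (res.ok) return true;                                        // 2xx: delivered
    } catch {
      // network error: fall through and retry
    }
    await new Promise(r => setTimeout(r, 1000 * 2 ** (attempt - 1))); // 1s, 2s, 4s, 8s, 16s
  }
  return false;                                                       // exhausted: record failure in Redis
}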

7. EmProps API Integration (apps/emprops-api)

Purpose: Platform-specific API for the EmProps ecosystem (user workflows, collections, payments).

Key Responsibilities:

  • User authentication and authorization (JWT)
  • Workflow composition (multi-step job orchestration)
  • Collection management (reusable workflow templates)
  • Payment processing (Stripe integration)
  • Asset management (IPFS + Azure Blob)
  • Database persistence (PostgreSQL via Prisma)

Technology Stack: Express.js, Prisma, PostgreSQL, Socket.io, Stripe, IPFS

Integration with Job Queue:

  • Submits jobs to API Server via HTTP (POST /submit-job)
  • Receives progress updates via WebSocket subscription
  • Stores job results in PostgreSQL for user access
  • Manages long-running workflows (multi-step sequences)

Why Separate from API Server?

  • Separation of Concerns: Job queue is infrastructure; EmProps API is product layer
  • Independent Scaling: User-facing API has different load patterns than job orchestration
  • Database Coupling: EmProps API requires PostgreSQL; job queue uses Redis exclusively

8. Shared Packages (packages/)

@emp/core:

  • Shared TypeScript types (Job, WorkerCapabilities, JobStatus, etc.)
  • Redis function installer and manager
  • Common utilities (logging, telemetry instrumentation)
  • Event broadcaster for WebSocket relay
  • Job and workflow instrumentation helpers

@emp/telemetry:

  • OpenTelemetry client wrapper
  • Unified logging (Winston + OTLP exporter)
  • Distributed tracing utilities
  • Log file monitoring and streaming

@emp/database (Prisma):

  • PostgreSQL schema and migrations
  • Prisma Client generation
  • Database utilities

@emp/service-config:

  • Environment configuration management
  • Service mapping definitions (worker-to-service relationships)

Key Technical Decisions

1. Why Redis Instead of RabbitMQ/Kafka?

Decision: Use Redis for job queue + pub/sub + state management.

Rationale:

  • Simplicity: Single dependency for queuing, state, and pub/sub
  • Performance: Sub-millisecond latency for job matching (Lua functions run in-memory)
  • Atomic Operations: Native support for atomic claims (ZREM + HMSET in Lua)
  • Familiarity: Team expertise with Redis; lower operational complexity
  • Cost: One managed Redis instance vs. separate queue + cache

Trade-offs Accepted:

  • No durable message guarantees (acceptable: jobs are ephemeral, 24h TTL)
  • Limited message replay (acceptable: real-time system, not event sourcing)
  • Single point of failure (mitigated: Redis persistence + managed service uptime)

2. Why Pull-Based Workers Instead of Push-Based?

Decision: Workers poll Redis for jobs they can handle.

Rationale:

  • Self-Governing Capacity: Workers know their own load; no centralized scheduler needed
  • Capability Matching: Workers declare what they can process; Redis Function finds compatible jobs
  • Resilience: No need to track worker availability; workers claim when ready
  • Simplicity: Eliminates complex worker registry and assignment logic

Trade-offs Accepted:

  • Polling overhead (mitigated: 1-second interval, conditional on idle status)
  • No instant job assignment (acceptable: 1s average latency is tolerable)

3. Why PM2 Instead of Kubernetes?

Decision: Use PM2 for process management on individual machines.

Rationale:

  • Ephemeral Infrastructure: Machines are spot instances (SALAD/vast.ai); no K8s cluster available
  • Per-Machine Orchestration: Each machine independently manages its services
  • GPU Isolation: Per-GPU services require explicit CUDA device assignment (PM2 env vars)
  • Lightweight: PM2 has minimal overhead vs. K8s agent on already resource-constrained machines
  • Proven: PM2 battle-tested for Node.js process management; extensive logging and restart policies

Trade-offs Accepted:

  • No cross-machine orchestration (acceptable: Redis handles coordination)
  • Manual service configuration (mitigated: templated PM2 ecosystem files)

4. Why Custom Connectors Instead of Direct API Calls?

Decision: Abstract service integrations behind connector interface.

Rationale:

  • Service Diversity: ComfyUI uses WebSocket, Ollama uses HTTP streaming, OpenAI uses REST
  • Error Handling: Each service has unique failure modes; connectors normalize them
  • Progress Reporting: Connectors translate service-specific events to standard 0-100% progress
  • Testing: Mock connectors for testing without running actual services
  • Evolution: Add new services (Runway, Replicate) without changing worker core

Connector Interface:

typescript
interface ConnectorInterface {
  processJob(job: Job, progress: ProgressCallback): Promise<JobResult>
  healthCheck(): Promise<HealthStatus>
  getCapabilities(): ServiceCapabilities
}
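
The "Testing" rationale above translates into trivially small mock connectors. A sketch, assuming the types are exported from @emp/core and using loose casts where the exact shapes are not shown here:

typescript
import type {
  ConnectorInterface, Job, JobResult, ProgressCallback, HealthStatus, ServiceCapabilities,
} from '@emp/core';

class MockConnector implements ConnectorInterface {
  async processJob(job: Job, progress: ProgressCallback): Promise<JobResult> {
    progress(50, 'halfway');                                   // exercise progress reporting
    progress(100, 'done');
    return { success: true, data: { echoed: job.payload } } as unknown as JobResult;
  }

  async healthCheck(): Promise<HealthStatus> {
    return { healthy: true } as unknown as HealthStatus;
  }

  getCapabilities(): ServiceCapabilities {
    return { services: ['mock'] } as unknown as ServiceCapabilities;
  }
}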

5. Why OpenTelemetry Instead of Custom Logging?

Decision: Use OpenTelemetry for distributed tracing and structured logging.

Rationale:

  • Distributed Tracing: Track jobs across API → Worker → Connector → External Service
  • Context Propagation: Trace IDs flow through entire system (HTTP headers, Redis fields, logs)
  • Vendor Agnostic: Can send to Jaeger, Zipkin, Grafana, or custom backends
  • Structured Logs: Automatic correlation of logs with traces and spans
  • Industry Standard: Well-supported tooling and ecosystem

Implementation:

  • Telemetry Collector service receives OTLP HTTP spans and logs
  • Stores in PostgreSQL for querying and analysis
  • Workers emit job-level spans with custom attributes (job_id, workflow_id, service_type)
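
A sketch of the worker-side span creation using @opentelemetry/api, attaching the custom attributes listed above:

typescript
import { trace, SpanStatusCode } from '@opentelemetry/api';

const tracer = trace.getTracer('emp-worker');

async function processWithSpan(
  job: { id: string; service_required: string; workflow_id?: string },
  run: () => Promise<void>
): Promise<void> {
  await tracer.startActiveSpan('job.process', async span => {
    span.setAttribute('job.id', job.id);
    span.setAttribute('job.service_required', job.service_required);
    if (job.workflow_id) span.setAttribute('workflow.id', job.workflow_id);
    try {
      await run();
    } catch (err) {
      span.setStatus({ code: SpanStatusCode.ERROR, message: String(err) });
      throw err;
    } finally {
      span.end();
    }
  });
}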

Production Constraints and Solutions

Constraint 1: No Shared Storage

Problem: Machines are geographically distributed with no shared filesystem.

Solution:

  • All output assets uploaded to Azure Blob Storage
  • Workers receive pre-signed URLs for input assets
  • Job payloads contain URLs, not raw data
  • Asset Saver connector handles uploads with retry logic

Example Flow:

1. User uploads image → EmProps API → Azure Blob → Returns URL
2. EmProps API submits job with input_url in payload
3. Worker downloads from URL → Processes → Uploads output to Azure
4. Worker returns output URL in job result
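
The upload half of that flow, sketched with @azure/storage-blob (the container name, blob naming, and connection-string env var are assumptions):

typescript
import { BlobServiceClient } from '@azure/storage-blob';

async function uploadOutput(jobId: string, index: number, data: Buffer, mimeType: string): Promise<string> {
  const service = BlobServiceClient.fromConnectionString(process.env.AZURE_STORAGE_CONNECTION_STRING!);
  const container = service.getContainerClient('outputs');
  const blob = container.getBlockBlobClient(`${jobId}_${index}.png`);

  await blob.uploadData(data, {
    blobHTTPHeaders: { blobContentType: mimeType },
  });

  return blob.url; // the job result carries this URL, never the raw bytes
}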

Constraint 2: Ephemeral Machines

Problem: Machines spin up/down constantly; jobs must survive machine churn.

Solution:

  • Job state lives in Redis (survives machine restarts)
  • Workers are stateless (no local job storage)
  • Job retry logic handles mid-flight failures
  • Worker heartbeats detect stale machines; jobs auto-requeued

Failure Scenario:

1. Worker claims job → Starts processing
2. Machine crashes (spot instance reclaimed)
3. Worker heartbeat expires (60s TTL)
4. API server detects stale worker → Re-adds job to pending queue
5. Another worker claims and completes job
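
A sketch of the stale-worker sweep behind steps 3-4, using the key patterns from the Redis section; the exact requeue policy shown is illustrative:

typescript
import Redis from 'ioredis';

async function requeueStaleJobs(redis: Redis): Promise<void> {
  const workerIds = await redis.smembers('workers:active');
  for (const id of workerIds) {
    const heartbeatAlive = await redis.exists(`worker:${id}:heartbeat`); // 60s TTL key
    if (heartbeatAlive) continue;

    const jobId = await redis.hget(`worker:${id}`, 'current_job_id');
    if (jobId) {
      const priority = Number(await redis.hget(`job:${jobId}`, 'priority')) || 0;
      await redis.hset(`job:${jobId}`, { status: 'pending', worker_id: '' });
      await redis.zadd('jobs:pending', priority * 1_000_000 + Date.now(), jobId);
    }
    await redis.srem('workers:active', id); // drop the stale worker record
  }
}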

Constraint 3: Large Model Files

Problem: Models are 2-15GB; downloading on every job is impractical.

Current Solution (Phase 0):

  • Models downloaded once per machine at startup
  • Cached in /workspace/models (ephemeral, lost on restart)
  • Workers check model availability before claiming jobs

Future Solution (Phase 2):

  • Bake common models into container images (pool-specific)
  • Predictive model placement based on job history
  • Intelligent cache eviction for dynamic models

Constraint 4: Performance Heterogeneity

Problem: 1-second Ollama jobs vs. 10-minute video jobs cause resource contention.

Current Solution:

  • All machines are uniform (same resource allocation)
  • No job duration-based routing

Future Solution (Phase 1):

  • Fast Lane Pool: CPU-optimized, 20-40GB storage, text/simple tasks
  • Standard Pool: Balanced GPU, 80-120GB storage, typical workflows
  • Heavy Pool: High-end GPU, 150-300GB storage, video/complex tasks
  • Redis Function enhanced for pool-aware routing

Data Flow Narratives

Narrative 1: Single Job Execution

User Story: Client submits ComfyUI image generation job.

Step-by-Step Flow:

  1. Job Submission:

    • Client sends POST /submit-job with payload:
      json
      {
        "service_required": "comfyui",
        "priority": 50,
        "payload": {
          "workflow": { ... },
          "positive": "a cat",
          "negative": "blurry"
        },
        "requirements": {
          "gpu_memory": 8
        }
      }
    • API Server generates job ID: job_123
    • Stores in Redis: HMSET job:job_123 {service_required: comfyui, status: pending, ...}
    • Adds to queue: ZADD jobs:pending {score} job_123
    • Publishes event: PUBLISH job_submitted {...}
    • Returns job ID to client
  2. Job Discovery:

    • Worker polls Redis: FCALL findMatchingJob 0 {capabilities} 100
    • Redis Function scans pending jobs (sorted by priority)
    • Finds job_123 matches worker capabilities (has ComfyUI, 16GB GPU)
    • Atomically claims: ZREM jobs:pending job_123 + HMSET job:job_123 {worker_id: worker_1, status: assigned}
    • Returns {jobId: job_123, job: {...}} to worker
  3. Job Execution:

    • Worker updates status: HMSET job:job_123 {status: in_progress, started_at: ...}
    • Worker selects connector: ComfyUIConnector
    • Connector connects to ComfyUI WebSocket: ws://localhost:8188
    • Connector submits workflow: {prompt: {workflow}, client_id: worker_1}
    • ComfyUI queues job: Returns {prompt_id: prompt_456}
  4. Progress Streaming:

    • ComfyUI sends progress events: {type: progress, data: {value: 0.25}}
    • Connector calculates percent: 25%
    • Connector calls progress callback
    • Worker publishes: PUBLISH update_job_progress {job_id: job_123, progress: 25, message: "Generating..."}
    • API Server receives event → Broadcasts to WebSocket clients
    • Client UI updates progress bar
  5. Job Completion:

    • ComfyUI finishes: {type: executed, data: {output: {images: [...]}}}
    • Connector downloads output images from ComfyUI
    • Worker invokes Asset Saver:
      • Uploads to Azure Blob: PUT https://emprops.blob.core.windows.net/outputs/job_123_0.png
      • Returns URL: https://emprops.blob.core.windows.net/outputs/job_123_0.png
    • Worker creates completion attestation: SET worker:completion:step-id:job_123 {...}
    • Worker updates job: HMSET job:job_123 {status: completed, completed_at: ...}
    • Worker stores result: HSET jobs:completed job_123 {success: true, data: {images: [url]}}
    • Worker publishes: PUBLISH complete_job {job_id: job_123, result: {...}}
    • API Server broadcasts to client WebSocket
    • Webhook Service delivers notification to registered endpoint
  6. Cleanup:

    • Worker updates status: HMSET worker:worker_1 {status: idle, current_job_id: ""}
    • Worker polls for next job (1s interval)
    • Redis expires job after 24 hours: EXPIRE job:job_123 86400

Total Latency (typical):

  • Job submission → Job claimed: ~1s (poll interval)
  • Job claimed → Execution start: ~100ms (worker processing)
  • Execution duration: ~30s (ComfyUI generation)
  • Asset upload: ~2s (image upload to Azure)
  • Total: ~33s end-to-end

Narrative 2: Multi-Step Workflow

User Story: EmProps API submits 3-step workflow (text → image → upscale).

Workflow Definition:

json
{
  "workflow_id": "wf_abc123",
  "steps": [
    {
      "step": 1,
      "service_required": "ollama",
      "payload": { "model": "llama3", "prompt": "describe a sunset" }
    },
    {
      "step": 2,
      "service_required": "comfyui",
      "payload": { "positive": "{STEP_1_OUTPUT}" }
    },
    {
      "step": 3,
      "service_required": "comfyui",
      "payload": { "workflow": "upscale", "input": "{STEP_2_OUTPUT}" }
    }
  ]
}

Execution Flow:

  1. Workflow Submission:

    • EmProps API submits step 1 to Job Queue API: POST /submit-job
    • Job payload includes: {workflow_id: wf_abc123, current_step: 1, total_steps: 3}
    • API creates workflow trace context: workflowTraceContexts.set(wf_abc123, {traceId, spanId})
  2. Step 1 Execution (Ollama):

    • Worker claims job → Processes via OllamaConnector
    • Ollama generates text: "A vibrant sunset with orange and purple hues..."
    • Worker completes job → Publishes complete_job event with workflow fields
  3. Workflow Orchestration (EmProps API):

    • Webhook Service receives completion → Delivers to EmProps API
    • EmProps API checks: current_step < total_steps → More steps to go
    • EmProps API substitutes output: {positive: "A vibrant sunset with orange and purple hues..."}
    • EmProps API submits step 2: POST /submit-job {workflow_id: wf_abc123, current_step: 2, ...}
  4. Step 2 Execution (ComfyUI Image Generation):

    • Worker claims job → Generates image via ComfyUI
    • Uploads to Azure: https://.../wf_abc123_step2.png
    • Publishes completion with workflow context
  5. Step 3 Execution (ComfyUI Upscale):

    • EmProps API submits final step with image URL
    • Worker upscales → Uploads final output
    • Publishes completion: {workflow_id: wf_abc123, current_step: 3, total_steps: 3}
  6. Workflow Completion:

    • EmProps API receives final step → Marks workflow complete
    • Stores final result in PostgreSQL
    • Notifies user via platform notification

Workflow Failure Handling:

  • If step 2 fails permanently → EmProps API marks workflow as failed
  • Does NOT submit step 3 (cascading prevention)
  • User receives failure notification with error details

Observability and Telemetry

Distributed Tracing

Trace Propagation:

HTTP Request → API Server (Span 1)
    → Redis Job Storage
    → Worker Claim (Span 2, parent: Span 1)
        → Connector Execution (Span 3, parent: Span 2)
            → External Service Call (Span 4, parent: Span 3)

Trace Context Storage:

  • Job creation: API generates trace ID + span ID
  • Stored in Redis: job:job_123 {job_trace_id, job_span_id}
  • Worker retrieves trace context → Creates child span
  • Connector propagates trace ID to external services (HTTP headers)
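
A sketch of how a worker could rebuild the parent context from those stored fields, assuming the default W3C trace-context propagator is registered:

typescript
import { context, propagation, trace } from '@opentelemetry/api';
import Redis from 'ioredis';

async function startChildSpan(redis: Redis, jobId: string) {
  const [traceId, spanId] = await redis.hmget(`job:${jobId}`, 'job_trace_id', 'job_span_id');
  const tracer = trace.getTracer('emp-worker');
  if (!traceId || !spanId) return tracer.startSpan('worker.claim'); // no parent recorded

  // Rebuild a W3C traceparent carrier: version-traceId-spanId-flags
  const carrier = { traceparent: `00-${traceId}-${spanId}-01` };
  const parentCtx = propagation.extract(context.active(), carrier);

  return tracer.startSpan('worker.claim', undefined, parentCtx);
}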

Example Span Attributes:

typescript
{
  'job.id': 'job_123',
  'job.service_required': 'comfyui',
  'job.priority': 50,
  'workflow.id': 'wf_abc123',
  'workflow.current_step': 2,
  'workflow.total_steps': 3,
  'worker.id': 'worker_1',
  'machine.id': 'salad-001'
}

Logging Strategy

Log Levels:

  • ERROR: Unrecoverable failures (Redis connection lost, job processing crash)
  • WARN: Recoverable issues (transient network errors, retry attempts)
  • INFO: Lifecycle events (job submitted, worker registered, machine startup)
  • DEBUG: Detailed tracing (Redis calls, connector state changes) - only in development

Log Aggregation:

  • Winston daily rotate files: /workspace/logs/combined-{date}.log
  • PM2 service logs: /workspace/logs/{service}-out.log, /workspace/logs/{service}-error.log
  • Telemetry Collector streams logs to PostgreSQL with trace correlation
  • Monitor UI provides real-time log viewer (WebSocket streaming)

Key Metrics

| Metric | Description | Target |
|---|---|---|
| Job claim latency | Time from submission to worker claim | < 2s (p95) |
| Job processing time | Execution duration by service type | Varies by service |
| Worker utilization | % time spent processing vs. idle | > 80% |
| Job success rate | Completed / (Completed + Failed) | > 99% |
| Asset upload time | Time to save outputs to Azure | < 5s (p95) |
| WebSocket message latency | Progress event publish to client receive | < 100ms |

Evolution Path: Current → North Star

Phase 0: Foundation (Current State)

Achievements:

  • ✅ Redis-based job broker with atomic matching
  • ✅ Pull-based workers with capability registration
  • ✅ PM2-managed machine services
  • ✅ WebSocket monitoring with real-time updates
  • ✅ ComfyUI with 64 custom nodes (parallel installation)
  • ✅ Asset saving to Azure Blob Storage
  • ✅ OpenTelemetry distributed tracing

Limitations:

  • All machines are uniform (no specialization)
  • Models downloaded at runtime (first-user wait times)
  • No intelligent routing (basic FIFO within priority)

Phase 1: Pool Separation (Months 1-2)

Goal: Eliminate performance heterogeneity by routing jobs to specialized machine pools.

Key Changes:

  1. Pool-Specific Containers:

    • Fast Lane: Baked with CPU models (Ollama, small vision models)
    • Standard: Baked with common Stable Diffusion models
    • Heavy: Baked with video models (AnimateDiff, SVD)
  2. Enhanced Redis Function:

    lua
    function findMatchingJob(capabilities, maxScan)
      -- Determine worker pool type
      local poolType = getPoolType(capabilities)
    
      -- Filter jobs by pool compatibility
      local pending = redis.call('ZREVRANGE', 'jobs:pending', 0, maxScan)
      for _, jobId in ipairs(pending) do
        local job = redis.call('HGETALL', 'job:' .. jobId)
        local estimatedDuration = estimateDuration(job)
        if (poolType == 'fast' and estimatedDuration < 10) or
           (poolType == 'standard' and estimatedDuration < 300) or
           (poolType == 'heavy') then
          -- Claim job
        end
      end
    end
  3. Pool-Aware Routing:

    • API Server tags jobs with estimated duration
    • Workers register pool type in capabilities
    • Cross-pool fallback for busy pools

Deliverable: 95% of jobs route to optimal pools; 50% reduction in queue wait times.

Phase 2: Model Intelligence (Months 3-4)

Goal: Eliminate first-user model download wait times through predictive placement.

Key Changes:

  1. TypeScript Model Manager (replaces Python SQLite):

    • Tracks model usage patterns
    • Predicts which models to pre-download on which pools
    • Manages cache eviction (LRU with usage decay)
  2. Model Affinity Routing:

    • Redis Function prefers workers with required models already cached
    • Falls back to workers that can download quickly
  3. Baked Model Strategy:

    • Top 10 models (80% of jobs) baked into pool images
    • Long-tail models downloaded on-demand
    • Persistent cache layer for popular models

Deliverable: 80% reduction in model download wait times; sub-10s wait for 95% of jobs.

Phase 3: Advanced Optimization (Months 5-6)

Goal: Resource optimization and specialization for 10x job volume support.

Key Changes:

  1. ML-Based Demand Prediction:

    • Predict job arrival patterns (time-of-day, service type)
    • Pre-scale machine pools before demand spikes
    • Auto-scale down during low utilization
  2. Specialty Routing:

    • Customer-specific machines for enterprise isolation
    • GPU architecture-specific pools (A100 vs. H100)
    • Geographic routing for latency-sensitive jobs
  3. Intelligent Load Balancing:

    • Job packing optimization (co-locate small jobs)
    • Bin packing for GPU memory utilization
    • Workflow-aware scheduling (reserve capacity for multi-step)

Deliverable: 99.9% job completion rate; 10x job volume vs. current capacity; 95% optimal routing.


Design Principles

1. Explicit Failures Over Silent Fallbacks

Philosophy: Errors should be loud and actionable, not hidden by default values.

Examples:

  • ❌ Missing AUTH_TOKEN → Defaults to dev-token (hides misconfiguration)
  • ✅ Missing AUTH_TOKEN → Throws fatal error with setup instructions

Why This Matters:

  • Prevents "works on my machine" scenarios
  • Guides users to correct solutions immediately
  • Reduces debugging time (no wild goose chases)

2. Pull-Based Over Push-Based

Philosophy: Workers self-govern their capacity; no central scheduler.

Benefits:

  • Workers know when they're ready (no external capacity tracking)
  • Natural backpressure (workers don't poll when busy)
  • Resilient to worker churn (no stale worker registries)

3. Redis as Single Source of Truth

Philosophy: All coordination happens through Redis; no split-brain scenarios.

Implications:

  • Job state lives exclusively in Redis
  • Worker status is Redis-authoritative (heartbeats)
  • No dual writes to multiple data stores

4. Ephemeral Everything

Philosophy: Design for machine churn; assume nothing persists.

Constraints:

  • Workers are stateless (no local job storage)
  • Machines have no persistent volumes
  • Job results expire after 24 hours (move to long-term storage if needed)

5. Telemetry-First Design

Philosophy: Observability is not an afterthought; it's foundational.

Practices:

  • Every job creates a trace span
  • All errors include structured context
  • Metrics emitted for all critical paths

Success Metrics

Current System Performance

| Metric | Current | Target (North Star) |
|---|---|---|
| Job routing optimality | ~60% | 95% |
| Average wait time (p95) | ~15s | <10s |
| Job completion rate | ~95% | 99.9% |
| Machine utilization | ~65% | >80% |
| Model download wait time | ~120s (first user) | <10s (95%) |

Production Readiness Criteria

  • [x] Atomic job claiming (no double-processing)
  • [x] Worker heartbeat detection (stale worker cleanup)
  • [x] Job retry logic (handle transient failures)
  • [x] Asset persistence (outputs survive machine churn)
  • [x] Real-time progress streaming (WebSocket)
  • [ ] Pool-based routing (Phase 1)
  • [ ] Predictive model placement (Phase 2)
  • [ ] Auto-scaling based on demand (Phase 3)

Appendix: File Paths Reference

API Server

  • Entry point: /apps/api/src/index.ts
  • Main server: /apps/api/src/lightweight-api-server.ts
  • Hybrid client: /apps/api/src/hybrid-client.ts

Worker

  • Entry point: /apps/worker/src/redis-direct-worker.ts
  • Worker client: /apps/worker/src/redis-direct-worker-client.ts
  • Connectors: /apps/worker/src/connectors/
  • Base connector: /apps/worker/src/connectors/base-connector.ts
  • Asset saver: /apps/worker/src/connectors/asset-saver.ts

Machine

  • Entry point: /apps/machine/src/index-pm2.js
  • ComfyUI installer: /apps/machine/src/services/comfyui-installer.js
  • Status aggregator: /apps/machine/src/services/machine-status-aggregator.js
  • Service mapping: /apps/machine/src/config/service-mapping.json

Redis Functions

  • Lua script: /packages/core/src/redis-functions/functions/findMatchingJob.lua
  • Installer: /packages/core/src/redis-functions/installer.ts

Core Packages

  • Types: /packages/core/src/types/
  • Redis service: /packages/core/src/services/redis-service.ts
  • Event broadcaster: /packages/core/src/services/event-broadcaster.ts
  • Telemetry: /packages/telemetry/src/

Monitor

  • Dashboard: /apps/monitor/src/app/page.tsx
  • WebSocket client: /apps/monitor/src/lib/websocket-client.ts
  • Machine card: /apps/monitor/src/components/machine-card.tsx

Webhook Service

  • Entry point: /apps/webhook-service/src/index.ts
  • Delivery logic: /apps/webhook-service/src/webhook-delivery.ts

Further Reading

For deeper dives into specific subsystems:

  • Job Lifecycle: See /apps/docs/src/02-how-it-works/job-lifecycle.md
  • Worker Selection: See /apps/docs/src/02-how-it-works/worker-selection.md
  • Redis Functions: See /packages/core/src/redis-functions/README.md
  • Connector Development: See /apps/worker/src/connectors/README.md
  • Telemetry Setup: See /apps/docs/src/09-observability/telemetry-setup-guide.md
  • North Star Architecture: See /apps/docs/src/06-future-vision/planned-features.md
