
Machine-Worker System Architecture

Complete guide to the machine/worker system from configuration to job execution

Table of Contents

  1. System Overview
  2. Architecture Layers
  3. Service Configuration (service-mapping.json)
  4. Machine App Layer
  5. Worker App Layer
  6. Connector Architecture
  7. Redis Integration
  8. Complete Job Flow
  9. Deployment Process
  10. Component Reference

System Overview

The machine-worker system is a distributed job processing architecture that enables elastic scaling across ephemeral machines (vast.ai, etc.). It orchestrates AI workload execution through a layered architecture:

```
[Redis Job Queue]
        ↓
[Worker App]          ← reads jobs from Redis
        ↓
[Connector Manager]   ← dynamically loads connectors based on WORKERS env var
        ↓
[Specific Connector]  ← ComfyUIWebSocketConnector, OllamaConnector, etc.
        ↓
[End Service]         ← ComfyUI, Ollama, OpenAI API, etc.
        ↓
[Result]              → back through connector → Redis
```

Key Principles

  1. Configuration-Driven: Everything starts from service-mapping.json
  2. Dynamic Loading: Connectors are loaded based on worker type at runtime
  3. Service Isolation: Each service (ComfyUI, Ollama) runs in its own process
  4. GPU Awareness: Services can scale per-GPU with independent instances
  5. Fault Tolerance: Offline stubs for failed services, graceful error handling

Architecture Layers

Layer 1: Configuration Layer

File: apps/machine/src/config/service-mapping.json

This is the master configuration that defines:

  • Worker types and their capabilities
  • Service definitions and installers
  • Connector mappings
  • Job routing logic

Layer 2: Machine App Layer

Directory: apps/machine/

Responsibilities:

  • Install and configure services (ComfyUI, Ollama, etc.)
  • Generate PM2 ecosystem configuration
  • Manage Docker Compose setup
  • Handle service lifecycle (start, restart, stop)

Layer 3: Worker App Layer

Directory: apps/worker/

Responsibilities:

  • Connect to Redis job queue
  • Load appropriate connectors dynamically
  • Route jobs to correct connector
  • Report job status back to Redis

Layer 4: Connector Layer

Directory: apps/worker/src/connectors/

Responsibilities:

  • Interface with specific end services
  • Translate Redis job data to service-specific format
  • Handle service-specific communication (WebSocket, HTTP, API)
  • Report progress and results back to worker

Layer 5: End Service Layer

Examples: ComfyUI, Ollama, OpenAI API

The actual service that processes the job (image generation, text generation, etc.).


Service Configuration

Structure of service-mapping.json

```json
{
  "workers": {
    "worker-type-name": {
      "services": ["service1", "service2"],
      "is_gpu_bound": true,
      "job_service_required_map": [
        {
          "job_service_required": "comfyui",
          "worker_service": "comfyui"
        }
      ]
    }
  },
  "services": {
    "service-name": {
      "connector": "ConnectorClassName",
      "type": "pm2_service | daemon_service | external_api | external_service",
      "installer": "InstallerClassName",
      "installer_filename": "./path/to/installer.js",
      "installer_config": {...}
    }
  },
  "connectors": {
    "ConnectorClassName": {
      "path": "./redis-direct-worker.js",
      "description": "..."
    }
  }
}
```

Example: ComfyUI Service

```json
{
  "workers": {
    "comfyui": {
      "services": ["comfyui"],
      "is_gpu_bound": true,
      "job_service_required_map": [
        {
          "job_service_required": "comfyui",
          "worker_service": "comfyui"
        }
      ]
    }
  },
  "services": {
    "comfyui": {
      "connector": "ComfyUIWebSocketConnector",
      "type": "pm2_service",
      "installer": "ComfyUIManagementClient",
      "installer_filename": "./services/comfyui-management-client.js",
      "is_gpu_bound": true,
      "service_instances_per_gpu": "${COMFYUI_INSTANCES_PER_GPU:-1}",
      "ports": ["${COMFYUI_BASE_PORT:-8188}"],
      "port_increment": "${COMFYUI_PORT_INCREMENT:-1}",
      "installer_config": {
        "repo_url": "${COMFYUI_REPO_URL:-https://github.com/stakeordie/ComfyUI.git}",
        "branch": "${COMFYUI_BRANCH:-main}",
        "custom_nodes_enabled": "${COMFYUI_ENABLE_CUSTOM_NODES:-true}"
      }
    }
  },
  "connectors": {
    "ComfyUIWebSocketConnector": {
      "path": "./redis-direct-worker.js",
      "description": "Local ComfyUI WebSocket connector"
    }
  }
}
```
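
Values like `${COMFYUI_BASE_PORT:-8188}` use shell-style defaulting: take the environment variable if it is set, otherwise fall back to the value after `:-`. A minimal resolver sketch (a hypothetical helper, not the actual machine-app loader):

```typescript
// Hypothetical helper (not the actual loader): resolves "${VAR:-default}"
// placeholders against the environment, mirroring shell default expansion.
function resolvePlaceholder(value: string, env: NodeJS.ProcessEnv = process.env): string {
  return value.replace(
    /\$\{([A-Z0-9_]+)(?::-([^}]*))?\}/g,
    (_match, name: string, fallback?: string) => env[name] ?? fallback ?? '',
  );
}

// With COMFYUI_BASE_PORT unset:
// resolvePlaceholder('${COMFYUI_BASE_PORT:-8188}')  // -> '8188'
```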

Service Types

| Type | Description | Has Installer | Has Ports | Example |
| --- | --- | --- | --- | --- |
| `pm2_service` | PM2-managed processes | Yes | Yes | ComfyUI, A1111 |
| `daemon_service` | Binary daemon services | Yes | Yes | Ollama, Redis |
| `external_api` | External API services | No | No | OpenAI, Gemini |
| `external_service` | Remote services | No | No | Remote ComfyUI |

Machine App Layer

Components

1. Service Installer System

File: apps/machine/src/config/service-installer.js

Orchestrates service installation based on service-mapping.json:

```javascript
// Pseudo-code flow
class ServiceInstaller {
  async installServices() {
    // 1. Read service-mapping.json
    const serviceMapping = loadServiceMapping();

    // 2. Determine which services to install based on WORKERS env var
    const services = determineServicesForWorkers(process.env.WORKERS);

    // 3. For each service, instantiate its installer
    for (const service of services) {
      const InstallerClass = require(service.installer_filename);
      const installer = new InstallerClass(service.installer_config);

      // 4. Run installation (clone repo, install deps, configure)
      await installer.install();
    }
  }
}
```

Key Installers:

  • ComfyUIManagementClient - Installs ComfyUI fork, custom nodes, models
  • A1111ManagementClient - Installs Automatic1111 WebUI
  • OllamaDaemonInstaller - Installs Ollama daemon and downloads models

2. PM2 Ecosystem Generator

File: apps/machine/src/config/enhanced-pm2-ecosystem-generator.js

Generates PM2 configuration for services:

```javascript
// For GPU-bound services, creates multiple instances
{
  "apps": [
    {
      "name": "comfyui-gpu0",
      "script": "python main.py --port 8188 --listen 0.0.0.0",
      "cwd": "/workspace/ComfyUI",
      "env": {
        "GPU_INDEX": "0",
        "COMFYUI_PORT": "8188"
      }
    },
    {
      "name": "comfyui-gpu1",
      "script": "python main.py --port 8189 --listen 0.0.0.0",
      "cwd": "/workspace/ComfyUI",
      "env": {
        "GPU_INDEX": "1",
        "COMFYUI_PORT": "8189"
      }
    }
  ]
}
```

Port Assignment Logic:

  • Base port from service.ports[0]
  • For each GPU instance: port = base_port + (gpu_index * port_increment)
  • Example: GPU 0 → 8188, GPU 1 → 8189, GPU 2 → 8190
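
A minimal sketch of this port math, generalized to multiple instances per GPU as used in the Port Assignment reference later (the function name and signature are illustrative, not the generator's actual code):

```typescript
// Illustrative port assignment for GPU-bound services.
function assignPort(
  basePort: number,           // service.ports[0], e.g. 8188
  gpuIndex: number,           // 0 .. NUM_GPUS - 1
  instanceWithinGpu: number,  // 0 .. instancesPerGpu - 1
  instancesPerGpu = 1,
  portIncrement = 1,
): number {
  return basePort + (gpuIndex * instancesPerGpu + instanceWithinGpu) * portIncrement;
}

// assignPort(8188, 0, 0) === 8188;  assignPort(8188, 1, 0) === 8189
// With COMFYUI_INSTANCES_PER_GPU=2: assignPort(8188, 1, 1, 2) === 8191
```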

3. Docker Compose Integration

File: apps/machine/docker-compose.yml

Services are deployed in Docker containers with environment variables from .env:

```yaml
services:
  worker:
    build:
      context: .
      dockerfile: Dockerfile
    environment:
      - WORKERS=${WORKERS}  # e.g., "comfyui,ollama"
      - NUM_GPUS=${NUM_GPUS}
      - HUB_REDIS_URL=${HUB_REDIS_URL}
    deploy:
      resources:
        reservations:
          devices:
            - driver: nvidia
              count: all
              capabilities: [gpu]
```

Worker App Layer

Entry Point

File: apps/worker/src/redis-direct-worker.ts

The worker entry point:

```typescript
// 1. Load environment configuration
const WORKER_ID = process.env.WORKER_ID;  // e.g., "comfyui-gpu0"
const HUB_REDIS_URL = process.env.HUB_REDIS_URL;
const MACHINE_ID = process.env.MACHINE_ID || os.hostname();

// 2. Create connector manager
const connectorManager = new ConnectorManager();

// 3. Connect to Redis
const redis = new Redis(HUB_REDIS_URL);

// 4. Inject Redis connection into connector manager
connectorManager.setRedisConnection(redis, WORKER_ID, MACHINE_ID);

// 5. Load connectors based on WORKERS env var
await connectorManager.loadConnectors();

// 6. Start worker (telemetryClient is constructed earlier in the entry point)
const worker = new RedisDirectBaseWorker(
  WORKER_ID,
  redis,
  connectorManager,
  telemetryClient
);

await worker.start();
```

Connector Loading Flow

File: apps/worker/src/connector-manager.ts

```typescript
class ConnectorManager {
  async loadConnectors() {
    // 1. Parse WORKERS env var
    // Example: WORKERS="comfyui,ollama"
    const workerSpecs = (process.env.WORKERS ?? '').split(',').filter(Boolean);

    // 2. Load service-mapping.json
    const serviceMapping = JSON.parse(
      fs.readFileSync('/workspace/src/config/service-mapping.json', 'utf8')
    );

    // 3. For each worker type, find its connectors
    const connectorsToLoad = new Set<string>();
    for (const workerType of workerSpecs) {
      const workerConfig = serviceMapping.workers[workerType];

      // 4. Get services for this worker
      for (const mapping of workerConfig.job_service_required_map) {
        const serviceConfig = serviceMapping.services[mapping.worker_service];
        connectorsToLoad.add(serviceConfig.connector);
      }
    }

    // 5. Dynamically import and instantiate each connector
    for (const connectorName of connectorsToLoad) {
      // import() returns a module namespace; the class is on its exports
      const module = await import(`./connectors/${connectorName}.js`);
      const ConnectorClass = module.default ?? module[connectorName];
      const connectorId = connectorName;
      const connector = new ConnectorClass(connectorId, config); // config: per-connector settings resolved from service-mapping.json

      // 6. Inject Redis connection
      connector.setRedis(this.redis, this.workerId, this.machineId);

      // 7. Register connector
      this.connectors.set(connectorId, connector);
    }
  }
}
```

Worker Job Processing Loop

File: apps/worker/src/redis-direct-base-worker.ts

```typescript
class RedisDirectBaseWorker {
  async start() {
    while (this.running) {
      // 1. Check Redis for available jobs
      const job = await this.claimNextJob();

      if (job) {
        // 2. Route job to appropriate connector
        const connector = this.connectorManager.getConnector(job.service_required);

        // 3. Process job through connector
        const result = await connector.processJob(job);

        // 4. Update Redis with result
        await this.updateJobStatus(job.id, 'completed', result);
      } else {
        // 5. No jobs available, wait before retrying
        await sleep(this.pollInterval);
      }
    }
  }

  async claimNextJob() {
    // Invoke the findMatchingJob Lua script (loaded at startup, SHA cached)
    // to atomically claim a job; capabilities travel as a JSON argument
    return await this.redis.evalsha(
      this.findMatchingJobSha,
      0,  // no keys declared in this simplified form
      this.workerId,
      JSON.stringify(this.workerCapabilities)
    );
  }
}
```

Connector Architecture

Base Connector

File: apps/worker/src/connectors/base-connector.ts

All connectors extend BaseConnector:

```typescript
abstract class BaseConnector implements ConnectorInterface {
  // Required properties
  abstract service_type: string;
  abstract version: string;

  // Connection state
  protected redis?: Redis;
  protected workerId?: string;
  protected machineId?: string;

  // Required methods (abstract signatures cannot carry the async keyword;
  // implementations return Promises)
  abstract processJob(jobData: JobData, progressCallback?: ProgressCallback): Promise<JobResult>;
  abstract getServiceInfo(): Promise<ServiceInfo>;
  abstract checkHealth(): Promise<boolean>;
  abstract canProcessJob(jobData: JobData): Promise<boolean>;

  // Optional methods
  async connect(): Promise<void> { }
  async disconnect(): Promise<void> { }
  async cancelJob(jobId: string): Promise<void> { }
}
```
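
To make the contract concrete, here is a hypothetical minimal connector (an echo service that is not part of the codebase; the `ProgressCallback` payload shape is an assumption):

```typescript
// Hypothetical minimal connector, for illustration only (not in the codebase).
class EchoConnector extends BaseConnector {
  service_type = 'echo';
  version = '1.0.0';

  async processJob(jobData: JobData, progressCallback?: ProgressCallback): Promise<JobResult> {
    // Report one progress tick (callback payload shape is assumed),
    // then echo the payload back as the result.
    progressCallback?.({ job_id: jobData.id, progress: 50 });
    return { success: true, data: { echoed: jobData.payload } };
  }

  async getServiceInfo(): Promise<ServiceInfo> {
    return { service_name: 'echo', service_version: this.version } as ServiceInfo;
  }

  async checkHealth(): Promise<boolean> {
    return true;  // nothing external to probe
  }

  async canProcessJob(jobData: JobData): Promise<boolean> {
    return jobData.service_required === 'echo';
  }
}
```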

WebSocket Connector

File: apps/worker/src/connectors/protocol/websocket-connector.ts

For services with WebSocket APIs (ComfyUI):

```typescript
abstract class WebSocketConnector extends BaseConnector {
  // WebSocket connection management
  protected ws?: WebSocket;
  protected connectionState: WebSocketState;

  // Message classification
  protected abstract classifyMessage(data: any): MessageType;
  protected abstract extractJobId(data: any): string | undefined;
  protected abstract extractProgress(data: any): number;

  // Job message building
  protected abstract buildJobMessage(jobData: JobData): any;
  protected abstract parseJobResult(data: any, jobData: JobData): Promise<JobResult>;

  // Connection lifecycle
  async connect() {
    this.ws = new WebSocket(this.config.websocket_url);
    this.ws.on('message', (data) => this.handleMessage(data));
  }

  // Job processing through WebSocket
  async processJob(jobData: JobData): Promise<JobResult> {
    // 1. Build job message
    const message = this.buildJobMessage(jobData);

    // 2. Send via WebSocket
    this.ws?.send(JSON.stringify(message));

    // 3. Wait for completion; handleMessage resolves this promise when a
    //    JOB_COMPLETE (or JOB_ERROR) message arrives for this job
    return new Promise((resolve, reject) => {
      this.activeJobs.set(jobData.id, { resolve, reject });
    });
  }
}
```

Example: ComfyUI WebSocket Connector

File: apps/worker/src/connectors/comfyui-websocket-connector.ts

Our custom implementation:

```typescript
class ComfyUIWebSocketConnector extends WebSocketConnector {
  service_type = 'comfyui';

  // Message classification
  protected classifyMessage(data: any): MessageType {
    switch (data.type) {
      case 'client_id': return MessageType.CONNECTION;
      case 'prompt_queued': return MessageType.JOB_SUBMIT;
      case 'executing':
        // ComfyUI signals completion with an "executing" message whose node is null
        return data.data?.node === null
          ? MessageType.JOB_COMPLETE
          : MessageType.JOB_PROGRESS;
      case 'error': return MessageType.JOB_ERROR;
      default: return MessageType.UNKNOWN;
    }
  }

  // Build ComfyUI-specific job message
  protected buildJobMessage(jobData: JobData): any {
    return {
      type: 'prompt',
      data: {
        prompt: jobData.payload,  // ComfyUI workflow
        extra_data: {
          client_id: this.clientId
        }
      }
    };
  }

  // Parse ComfyUI result
  protected async parseJobResult(data: any, jobData: JobData): Promise<JobResult> {
    // 1. Get history from ComfyUI HTTP API
    const promptId = data.data.prompt_id;
    const historyUrl = `http://${host}:${port}/history/${promptId}`;
    const history = await fetch(historyUrl).then(r => r.json());

    // 2. Find output image ('node-id' stands in for the workflow's output node id)
    const outputs = history[promptId]?.outputs;
    const imageFilename = outputs?.['node-id']?.images?.[0]?.filename;

    // 3. Read from local filesystem
    const imageBuffer = await fs.readFile(`/workspace/outputs/${imageFilename}`);

    // 4. Upload to Azure via AssetSaver
    const { AssetSaver } = await import('./asset-saver.js');
    const savedAsset = await AssetSaver.saveAssetToCloud(
      imageBuffer.toString('base64'),
      jobData.id,
      jobData,
      'image/png',
      'png'
    );

    // 5. Delete local file
    await fs.unlink(`/workspace/outputs/${imageFilename}`);

    // 6. Return result
    return {
      success: true,
      data: {
        image_url: savedAsset.cdnUrl,
        prompt_id: promptId
      }
    };
  }
}
```

Redis Integration

Job Queue Structure

Redis stores jobs using these key patterns:

```
jobs:{job_id}                    # Job data hash
jobs:pending                     # Sorted set of pending jobs (by priority)
jobs:active                      # Set of active job IDs
jobs:completed                   # Set of completed job IDs
jobs:failed                      # Set of failed job IDs
worker:{worker_id}:heartbeat     # Worker heartbeat timestamp
worker:{worker_id}:capabilities  # Worker capabilities hash
machine:{machine_id}:status      # Machine status hash
```

Lua Functions

File: packages/core/src/redis-functions/findMatchingJob.lua

Atomic job claiming:

```lua
-- Find and claim a matching job for this worker
-- (simplified sketch; the production script is more involved)
local function findMatchingJob(workerId, capabilitiesJson)
  local capabilities = cjson.decode(capabilitiesJson)

  -- 1. Get the highest-priority pending jobs
  local pendingJobs = redis.call('ZRANGE', 'jobs:pending', 0, 100)

  -- 2. For each job, check if this worker can handle it
  for _, jobId in ipairs(pendingJobs) do
    -- HGETALL returns a flat array in Lua, so read the one field we need
    local serviceRequired = redis.call('HGET', 'jobs:' .. jobId, 'service_required')

    -- 3. Check if worker has matching service
    if capabilities.services[serviceRequired] then
      -- 4. Atomically claim job
      redis.call('ZREM', 'jobs:pending', jobId)
      redis.call('SADD', 'jobs:active', jobId)
      redis.call('HSET', 'jobs:' .. jobId, 'status', 'active', 'worker_id', workerId)

      return redis.call('HGETALL', 'jobs:' .. jobId)
    end
  end

  return nil  -- No matching job found
end
```
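
On the worker side, a script like this is typically loaded once with SCRIPT LOAD and then invoked by its SHA. A sketch with ioredis that matches the simplified signature above (paths and values are illustrative):

```typescript
import Redis from 'ioredis';
import { readFileSync } from 'node:fs';

// Illustrative only: load the claim script once, then invoke it by SHA.
const redis = new Redis(process.env.HUB_REDIS_URL!);
const luaSource = readFileSync(
  'packages/core/src/redis-functions/findMatchingJob.lua',
  'utf8',
);
const sha = (await redis.script('LOAD', luaSource)) as string;

// numkeys = 0: worker identity and capabilities travel as plain arguments,
// mirroring the simplified Lua above.
const workerId = process.env.WORKER_ID ?? 'comfyui-gpu0';
const capabilities = { services: { comfyui: {} } };
const claimed = await redis.evalsha(sha, 0, workerId, JSON.stringify(capabilities));
```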

Worker Capabilities

Workers register their capabilities in Redis:

```json
{
  "worker_id": "comfyui-gpu0",
  "machine_id": "salad-machine-123",
  "services": {
    "comfyui": {
      "connector": "ComfyUIWebSocketConnector",
      "version": "1.0.0",
      "max_concurrent_jobs": 1
    }
  },
  "resources": {
    "gpu_index": 0,
    "gpu_memory_mb": 8192,
    "gpu_model": "NVIDIA RTX 3080"
  }
}
```
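
A sketch of how a worker might publish this document alongside its heartbeat on startup, following the key patterns above (the exact field layout is an assumption):

```typescript
import Redis from 'ioredis';

// Illustrative registration; field layout of the capabilities hash is assumed.
async function registerWorker(
  redis: Redis,
  workerId: string,
  capabilities: Record<string, unknown>,
): Promise<void> {
  // Capabilities stored as a JSON blob that the matching script can decode.
  await redis.hset(`worker:${workerId}:capabilities`, {
    capabilities: JSON.stringify(capabilities),
  });
  // Heartbeat timestamp, refreshed periodically so stale workers can be pruned.
  await redis.set(`worker:${workerId}:heartbeat`, Date.now().toString());
}
```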

Complete Job Flow

1. Job Submission

```
[API Server]
  ↓ POST /jobs
  ↓ Validate job payload
  ↓ Determine service_required (e.g., "comfyui")

[Redis]
  ↓ ZADD jobs:pending {jobId} {priority}
  ↓ HSET jobs:{jobId} {job data}
```
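
In Redis terms, submission reduces to the two writes shown above. A sketch (field names follow the JobData schema in the appendix; treating a lower score as higher priority is an assumption):

```typescript
import Redis from 'ioredis';

// Illustrative submission: one hash for the job body, one sorted-set entry
// for the pending queue. JobData is the schema from the appendix.
async function submitJob(redis: Redis, job: JobData): Promise<void> {
  await redis.hset(`jobs:${job.id}`, {
    id: job.id,
    service_required: job.service_required,
    payload: JSON.stringify(job.payload),
    status: 'pending',
    created_at: job.created_at,
  });
  // Assumption: lower score = claimed first; the real ordering may differ.
  await redis.zadd('jobs:pending', job.priority ?? 0, job.id);
}
```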

2. Job Discovery

```
[Worker: comfyui-gpu0]
  ↓ Call Redis Lua function: findMatchingJob
  ↓ Pass worker capabilities

[Redis Lua Function]
  ↓ Iterate pending jobs
  ↓ Match job.service_required with worker capabilities
  ↓ Atomically claim matching job

[Worker]
  ↓ Receives claimed job
```

3. Job Routing

```
[Worker]
  ↓ Look at job.service_required = "comfyui"
  ↓ Route to ConnectorManager

[ConnectorManager]
  ↓ Find connector for "comfyui" service
  ↓ Returns ComfyUIWebSocketConnector instance

[Worker]
  ↓ Call connector.processJob(jobData)
```

4. Job Execution

```
[ComfyUIWebSocketConnector]
  ↓ Build WebSocket message from job payload
  ↓ Send via WebSocket: {type: "prompt", data: {...}}

[ComfyUI Service (PM2)]
  ↓ Receive prompt via WebSocket
  ↓ Validate workflow
  ↓ Queue for execution
  ↓ Send: {type: "prompt_queued", data: {prompt_id: "..."}}

[ComfyUIWebSocketConnector]
  ↓ Receive prompt_queued → classify as JOB_SUBMIT
  ↓ Store prompt_id correlation
  ↓ Wait for progress/completion messages

[ComfyUI Service]
  ↓ Execute workflow nodes
  ↓ Send progress: {type: "executing", data: {node: "..."}}

[ComfyUIWebSocketConnector]
  ↓ Receive progress → classify as JOB_PROGRESS
  ↓ Report to worker via progressCallback

[Worker]
  ↓ Update Redis: HSET jobs:{jobId} progress {percentage}

[ComfyUI Service]
  ↓ Save output to /workspace/outputs/
  ↓ Send completion: {type: "executing", data: {node: null}}

[ComfyUIWebSocketConnector]
  ↓ Receive completion → classify as JOB_COMPLETE
  ↓ Query ComfyUI HTTP API for output details
  ↓ Read output file from filesystem
  ↓ Upload to Azure via AssetSaver
  ↓ Delete local file
  ↓ Return JobResult to worker
```

5. Result Handling

```
[Worker]
  ↓ Receive JobResult from connector
  ↓ Update Redis:
      - ZREM jobs:pending {jobId}
      - SREM jobs:active {jobId}
      - SADD jobs:completed {jobId}
      - HSET jobs:{jobId} status "completed" result "{...}"

[Redis Pub/Sub]
  ↓ PUBLISH job:status:update {jobId}

[API Server / WebSocket Clients]
  ↓ Receive notification
  ↓ Fetch job result from Redis
  ↓ Return to user
```
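
Clients can follow completion without polling by subscribing to the status channel. A sketch (a subscribed ioredis connection cannot issue regular commands, so reads go through a second connection):

```typescript
import Redis from 'ioredis';

// Illustrative subscriber for the job status channel shown above.
const redis = new Redis(process.env.HUB_REDIS_URL!);  // for reads
const sub = new Redis(process.env.HUB_REDIS_URL!);    // for the subscription

await sub.subscribe('job:status:update');
sub.on('message', async (_channel, jobId) => {
  // On notification, fetch the authoritative state from the job hash.
  const status = await redis.hget(`jobs:${jobId}`, 'status');
  if (status === 'completed') {
    const result = await redis.hget(`jobs:${jobId}`, 'result');
    console.log(`job ${jobId} completed:`, result);
  }
});
```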

Deployment Process

1. Configuration

File: apps/machine/.env

```bash
# Worker configuration
WORKERS="comfyui,ollama"  # Which services to run
NUM_GPUS=2                # Number of GPUs available

# ComfyUI configuration
COMFYUI_INSTANCES_PER_GPU=1
COMFYUI_BASE_PORT=8188
COMFYUI_REPO_URL=https://github.com/stakeordie/ComfyUI.git
COMFYUI_BRANCH=main

# Ollama configuration
OLLAMA_DEFAULT_MODEL=llama3.2:1b
OLLAMA_BASE_PORT=11434

# Redis connection
HUB_REDIS_URL=redis://localhost:6379
```

2. Docker Compose Generation

```bash
# Generate docker-compose.yml with correct port mappings
pnpm compose:profile

# This script:
# 1. Reads WORKERS env var
# 2. Parses service-mapping.json
# 3. Calculates port requirements per GPU
# 4. Generates docker-compose.yml with all port mappings
```

Generated docker-compose.yml example:

```yaml
services:
  worker:
    ports:
      - "8188:8188"    # ComfyUI GPU 0
      - "8189:8189"    # ComfyUI GPU 1
      - "11434:11434"  # Ollama
    environment:
      - WORKERS=comfyui,ollama
      - NUM_GPUS=2
```

3. Service Installation

When container starts:

```bash
# 1. Service installer reads service-mapping.json
# 2. Installs services based on WORKERS env var

# For ComfyUI:
cd /workspace
git clone https://github.com/stakeordie/ComfyUI.git
cd ComfyUI
pip install -r requirements.txt
# Install custom nodes...

# For Ollama:
curl -fsSL https://ollama.ai/install.sh | sh
ollama pull llama3.2:1b
```

4. PM2 Ecosystem Generation

```bash
# Generate PM2 configuration
node generate-pm2-ecosystem-worker-driven.js

# Creates ecosystem.config.js with:
# - comfyui-gpu0 (port 8188)
# - comfyui-gpu1 (port 8189)
# - worker-comfyui-gpu0, worker-comfyui-gpu1 (Redis workers)
# - worker-ollama (Redis worker)
```

5. Service Startup

```bash
# Start PM2 services
pm2 start ecosystem.config.js

# PM2 starts:
# 1. comfyui-gpu0        → python main.py --port 8188 --listen 0.0.0.0
# 2. comfyui-gpu1        → python main.py --port 8189 --listen 0.0.0.0
# 3. worker-comfyui-gpu0 → WORKER_ID=comfyui-gpu0 node redis-direct-worker.js
# 4. worker-comfyui-gpu1 → WORKER_ID=comfyui-gpu1 node redis-direct-worker.js
# 5. worker-ollama       → WORKER_ID=ollama node redis-direct-worker.js
```

6. Worker Initialization

Each worker:

1. Load WORKERS env var → "comfyui,ollama"
2. Read service-mapping.json
3. Find connectors for each service
4. Dynamically load connectors
5. Connect to Redis
6. Register capabilities
7. Start job polling loop

Component Reference

Key Files

Configuration

| File | Purpose |
| --- | --- |
| `apps/machine/src/config/service-mapping.json` | Master service configuration |
| `apps/machine/.env` | Environment variables |
| `apps/machine/docker-compose.yml` | Docker service definition |

Machine App

| File | Purpose |
| --- | --- |
| `apps/machine/src/config/service-installer.js` | Orchestrates service installation |
| `apps/machine/src/config/enhanced-pm2-ecosystem-generator.js` | Generates PM2 configuration |
| `apps/machine/src/services/comfyui-management-client.js` | ComfyUI installer |
| `apps/machine/src/services/pm2-service.js` | PM2 service management |

Worker App

| File | Purpose |
| --- | --- |
| `apps/worker/src/redis-direct-worker.ts` | Worker entry point |
| `apps/worker/src/redis-direct-base-worker.ts` | Base worker implementation |
| `apps/worker/src/connector-manager.ts` | Dynamic connector loading |

Connectors

| File | Purpose |
| --- | --- |
| `apps/worker/src/connectors/base-connector.ts` | Base connector interface |
| `apps/worker/src/connectors/protocol/websocket-connector.ts` | WebSocket base class |
| `apps/worker/src/connectors/comfyui-websocket-connector.ts` | ComfyUI connector |
| `apps/worker/src/connectors/ollama-connector.ts` | Ollama connector |
| `apps/worker/src/connectors/openai-*.ts` | OpenAI API connectors |

Redis Integration

| File | Purpose |
| --- | --- |
| `packages/core/src/redis-functions/findMatchingJob.lua` | Atomic job claiming |
| `packages/core/src/redis-functions/` | Other Redis Lua functions |

Environment Variables

Core Variables

| Variable | Purpose | Example |
| --- | --- | --- |
| `WORKERS` | Services to run | `comfyui,ollama` |
| `WORKER_ID` | Unique worker identifier | `comfyui-gpu0` |
| `MACHINE_ID` | Machine identifier | `salad-machine-123` |
| `HUB_REDIS_URL` | Redis connection string | `redis://localhost:6379` |
| `NUM_GPUS` | Number of GPUs | `2` |

Service-Specific Variables

ComfyUI:

  • COMFYUI_INSTANCES_PER_GPU - Instances per GPU (default: 1)
  • COMFYUI_BASE_PORT - Starting port (default: 8188)
  • COMFYUI_PORT_INCREMENT - Port increment per instance (default: 1)
  • COMFYUI_REPO_URL - Git repository URL
  • COMFYUI_BRANCH - Git branch to use
  • COMFYUI_HOST - Host for connector (default: localhost)
  • COMFYUI_PORT - Port for connector (matches PM2 port)

Ollama:

  • OLLAMA_DEFAULT_MODEL - Model to download (default: llama3.2:1b)
  • OLLAMA_BASE_PORT - Port (default: 11434)
  • OLLAMA_HOST - Host (default: localhost)

Port Assignment

Formula for GPU-bound services:

port = base_port + (gpu_index * instances_per_gpu + instance_within_gpu) * port_increment

Examples:

Single instance per GPU:

  • GPU 0, Instance 0: 8188 + (0 * 1) = 8188
  • GPU 1, Instance 0: 8188 + (1 * 1) = 8189

Multiple instances per GPU (COMFYUI_INSTANCES_PER_GPU=2):

  • GPU 0, Instance 0: 8188 + (0 * 2) + 0 = 8188
  • GPU 0, Instance 1: 8188 + (0 * 2) + 1 = 8189
  • GPU 1, Instance 0: 8188 + (1 * 2) + 0 = 8190
  • GPU 1, Instance 1: 8188 + (1 * 2) + 1 = 8191

Troubleshooting

Common Issues

Worker can't find connectors

Symptom: Failed to load connector ComfyUIWebSocketConnector

Cause: service-mapping.json not found

Fix: Ensure service-mapping.json is in one of these paths:

  • /workspace/worker-bundled/src/config/service-mapping.json
  • /workspace/src/config/service-mapping.json

Service not starting

Symptom: PM2 shows service as "errored"

Cause: Port already in use, or service installation failed

Fix:

```bash
# Check PM2 logs
pm2 logs comfyui-gpu0

# Check port availability
netstat -tlnp | grep 8188

# Reinstall service
pm2 delete comfyui-gpu0
pm2 start ecosystem.config.js
```

Jobs not being claimed

Symptom: Jobs stay in pending state

Cause: Worker capabilities don't match job requirements

Fix:

```bash
# Check worker capabilities in Redis
redis-cli HGETALL worker:{worker_id}:capabilities

# Check job requirements
redis-cli HGET jobs:{job_id} service_required

# Verify they match
```
Future Enhancements

Planned Improvements

  1. Specialized Machine Pools

    • Fast Lane (CPU, light workloads)
    • Standard (GPU, typical workloads)
    • Heavy (High-end GPU, video processing)
  2. Predictive Model Placement

    • Analyze job patterns
    • Pre-download models to likely machines
    • Reduce first-user wait times
  3. Enhanced Job Routing

    • Model affinity routing
    • Pool-aware job matching
    • Cross-pool fallback strategies
  4. Telemetry Integration

    • Performance metrics per service
    • Resource utilization tracking
    • Cost optimization analytics

Appendix

Service-Mapping.json Schema

```typescript
interface ServiceMapping {
  workers: {
    [workerType: string]: {
      services: string[];
      is_gpu_bound: boolean;
      job_service_required_map: {
        job_service_required: string;
        worker_service: string;
      }[];
    };
  };
  services: {
    [serviceName: string]: {
      connector: string;
      type: 'pm2_service' | 'daemon_service' | 'external_api' | 'external_service';
      installer?: string;
      installer_filename?: string;
      is_gpu_bound: boolean;
      service_instances_per_gpu?: string;
      ports?: string[];
      port_increment?: string;
      installer_config?: Record<string, any>;
    };
  };
  connectors: {
    [connectorName: string]: {
      path: string;
      description: string;
    };
  };
}
```
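
As a usage example, resolving the connector class name for an incoming job might look like this (a hypothetical helper, not part of the codebase):

```typescript
// Hypothetical helper: map a job's service_required to a connector class name,
// given the worker type this process was started as.
function connectorFor(
  mapping: ServiceMapping,
  workerType: string,
  jobService: string,
): string | undefined {
  const worker = mapping.workers[workerType];
  const entry = worker?.job_service_required_map.find(
    (m) => m.job_service_required === jobService,
  );
  return entry ? mapping.services[entry.worker_service]?.connector : undefined;
}

// connectorFor(mapping, 'comfyui', 'comfyui')  // -> 'ComfyUIWebSocketConnector'
```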

Redis Job Schema

```typescript
interface JobData {
  id: string;
  type: string;
  service_required: string;  // e.g., "comfyui"
  payload: any;              // Service-specific data
  priority?: number;
  created_at: string;
  ctx?: {
    storage?: {
      bucket: string;
      accountName: string;
      accountKey: string;
    };
    prefix?: string;
    filename?: string;
  };
}
```
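
For reference, a job conforming to this schema might look like the following (all values are illustrative):

```typescript
const exampleJob: JobData = {
  id: 'job-123',
  type: 'image_generation',
  service_required: 'comfyui',
  payload: { /* ComfyUI workflow graph */ },
  priority: 10,
  created_at: new Date().toISOString(),
  ctx: {
    prefix: 'renders/',
    filename: 'output.png',
  },
};
```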

Document Version: 1.0.0 · Last Updated: 2025-10-14 · Author: Claude Code Architecture Analysis
