WebSocket API Reference

The Job Queue WebSocket API provides real-time event streaming for job monitoring, worker status, and system observability.

WebSocket URL: wss://job-queue-api.railway.app (production)

Local Development: ws://localhost:3000 (from .env.local-dev)

Implementation: /apps/api/src/lightweight-api-server.ts

Connection

Initial Handshake

typescript
// Client connects via WebSocket (Node.js `ws` package, which provides the .on() API)
import WebSocket from 'ws';

const ws = new WebSocket('wss://job-queue-api.railway.app');

// Server assigns client ID and tracks connection
ws.on('open', () => {
  console.log('Connected to Job Queue API');
});

Authentication

http
Authorization: Bearer {AUTH_TOKEN}

The token must match the server's AUTH_TOKEN environment variable. If it is missing or does not match, the server closes the connection with code 1008.

Client Types

Monitor Clients

Real-time monitoring dashboards (e.g., /apps/monitor).

Subscriptions: All system events by default

  • Workers: worker_connected, worker_disconnected, worker_status_changed
  • Jobs: job_submitted, job_status_changed, update_job_progress, complete_job, job_failed
  • Machines: machine_status_updated
  • System: full_state_snapshot, heartbeat_ack

EmProps Clients

EmProps API or UI clients tracking specific jobs.

Subscriptions: Job-specific events only

  • Must explicitly subscribe to job IDs
  • Receives events only for subscribed jobs

Message Format

Base Event Structure

typescript
{
  type: string
  timestamp: number
  [key: string]: any  // Event-specific data
}
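A client will usually want to validate each incoming frame against this base shape before dispatching on `type`. The following is a minimal sketch, assuming frames arrive as JSON text; `BaseEvent` and `parseEvent` are illustrative names, not exports of the API.

```typescript
// Illustrative names; the server does not export these.
interface BaseEvent {
  type: string;
  timestamp: number;
  [key: string]: unknown; // event-specific data
}

// Returns the parsed event, or null if the frame is not a valid base event.
function parseEvent(raw: string): BaseEvent | null {
  let value: unknown;
  try {
    value = JSON.parse(raw);
  } catch {
    return null; // not JSON
  }
  if (typeof value !== "object" || value === null) return null;
  const candidate = value as Record<string, unknown>;
  if (typeof candidate.type !== "string") return null;
  if (typeof candidate.timestamp !== "number") return null;
  return candidate as BaseEvent;
}
```

Returning `null` instead of throwing keeps a single malformed frame from tearing down the message handler.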

EmProps-Compatible Format

For EmProps clients, events are automatically adapted:

typescript
{
  type: string
  job_id?: string
  timestamp: number
  // Event-specific fields...
}

Event Types

Job Lifecycle Events

job_submitted

Fired when a job is submitted to the queue.

typescript
{
  type: "job_submitted"
  timestamp: number
  job_id: string
  job_type: string
  requirements: {
    connector?: string
    connector_capabilities?: string[]
    models?: string[]
    custom_nodes?: string[]
  }
  workflow_context?: {
    workflow_id: string
    workflow_priority: number
    workflow_datetime: number
    total_steps: number
    current_step: number
  }
}

job_assigned

Fired when a worker claims a job.

typescript
{
  type: "job_assigned"
  timestamp: number
  job_id: string
  worker_id: string
  machine_id: string
}

job_status_changed

Fired when a job's status changes.

typescript
{
  type: "job_status_changed"
  timestamp: number
  job_id: string
  old_status: "pending" | "claimed" | "processing"
  new_status: "claimed" | "processing" | "completed" | "failed"
  worker_id?: string
  machine_id?: string
}

update_job_progress

Fired during job execution, throttled to at most one event per second per job.

typescript
{
  type: "update_job_progress"
  timestamp: number
  job_id: string
  progress: number  // 0-100
  message?: string
  worker_id: string
  machine_id: string
}
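The per-job throttle can be pictured as a timestamp check keyed by job ID. This is a minimal sketch under the assumption that excess updates are dropped rather than queued; `ProgressThrottle` is a hypothetical helper, and the server's actual throttling code is not shown in this reference.

```typescript
// Hypothetical helper: allows at most one progress event per job per interval.
class ProgressThrottle {
  private lastSent = new Map<string, number>();
  private readonly intervalMs: number;

  constructor(intervalMs: number = 1000) {
    this.intervalMs = intervalMs;
  }

  // Returns true if an event for jobId may be emitted at time `now` (ms).
  shouldEmit(jobId: string, now: number): boolean {
    const last = this.lastSent.get(jobId);
    if (last !== undefined && now - last < this.intervalMs) {
      return false; // too soon since the previous event for this job
    }
    this.lastSent.set(jobId, now);
    return true;
  }

  // Forget a job once it completes or fails so the map does not grow forever.
  clear(jobId: string): void {
    this.lastSent.delete(jobId);
  }
}
```

Passing `now` in explicitly, rather than calling `Date.now()` inside, makes the behavior easy to test with fixed timestamps.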

complete_job

Fired when a job completes successfully.

typescript
{
  type: "complete_job"
  timestamp: number
  job_id: string
  result: any
  execution_time_ms: number
  worker_id: string
  machine_id: string
}

job_failed

Fired when a job fails permanently.

typescript
{
  type: "job_failed"
  timestamp: number
  job_id: string
  error: string
  error_details?: any
  worker_id?: string
  machine_id?: string
}
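Because every job lifecycle event carries a literal `type`, the shapes above fit naturally into a discriminated union. The sketch below models only a subset of the fields and events; `JobEvent` and `describeJobEvent` are illustrative names, not part of the published types.

```typescript
// Subset of the job lifecycle events above, modeled as a discriminated union.
type JobEvent =
  | { type: "job_submitted"; timestamp: number; job_id: string; job_type: string }
  | { type: "job_assigned"; timestamp: number; job_id: string; worker_id: string; machine_id: string }
  | { type: "update_job_progress"; timestamp: number; job_id: string; progress: number; message?: string }
  | { type: "complete_job"; timestamp: number; job_id: string; result: unknown; execution_time_ms: number }
  | { type: "job_failed"; timestamp: number; job_id: string; error: string };

// Hypothetical formatter: one human-readable line per event.
function describeJobEvent(event: JobEvent): string {
  switch (event.type) {
    case "job_submitted":
      return `${event.job_id}: submitted (${event.job_type})`;
    case "job_assigned":
      return `${event.job_id}: assigned to ${event.worker_id}`;
    case "update_job_progress":
      return `${event.job_id}: ${event.progress}%`;
    case "complete_job":
      return `${event.job_id}: completed in ${event.execution_time_ms} ms`;
    case "job_failed":
      return `${event.job_id}: failed (${event.error})`;
  }
}
```

The compiler narrows `event` inside each `case`, so accessing a field that does not belong to that event type is a compile-time error.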

Worker Events

worker_connected

Fired when a worker registers with the queue.

typescript
{
  type: "worker_connected"
  timestamp: number
  worker_id: string
  machine_id: string
  capabilities: {
    connector: string  // "comfyui_remote", "ollama_remote", etc.
    connector_capabilities: string[]
    models?: string[]
    custom_nodes?: string[]
  }
}

worker_disconnected

Fired when a worker disconnects.

typescript
{
  type: "worker_disconnected"
  timestamp: number
  worker_id: string
  machine_id: string
  reason?: string
}

worker_status_changed

Fired when a worker's status changes.

typescript
{
  type: "worker_status_changed"
  timestamp: number
  worker_id: string
  machine_id: string
  old_status: "idle" | "busy"
  new_status: "idle" | "busy" | "offline"
  current_job_id?: string
}
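A monitor can fold the three worker events into a single map of current worker state. The following is a sketch, not the monitor's actual code: `WorkerView` and `applyWorkerEvent` are hypothetical, and it assumes that a newly connected worker starts idle and that a disconnected worker is kept in the map and marked offline.

```typescript
type WorkerState = "idle" | "busy" | "offline";

type WorkerEvent =
  | { type: "worker_connected"; worker_id: string; machine_id: string }
  | { type: "worker_disconnected"; worker_id: string; machine_id: string; reason?: string }
  | { type: "worker_status_changed"; worker_id: string; machine_id: string; new_status: WorkerState; current_job_id?: string };

interface WorkerView {
  machine_id: string;
  status: WorkerState;
  current_job_id: string | null;
}

// Applies one event to the map in place and returns the same map.
function applyWorkerEvent(workers: Map<string, WorkerView>, event: WorkerEvent): Map<string, WorkerView> {
  switch (event.type) {
    case "worker_connected":
      // Assumption: a freshly registered worker starts out idle.
      workers.set(event.worker_id, { machine_id: event.machine_id, status: "idle", current_job_id: null });
      break;
    case "worker_disconnected":
      // Assumption: keep the entry and mark it offline rather than deleting it.
      workers.set(event.worker_id, { machine_id: event.machine_id, status: "offline", current_job_id: null });
      break;
    case "worker_status_changed":
      workers.set(event.worker_id, {
        machine_id: event.machine_id,
        status: event.new_status,
        current_job_id: event.current_job_id ?? null,
      });
      break;
  }
  return workers;
}
```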

Machine Events

machine_status_updated

Comprehensive machine status update, broadcast every 15 seconds.

typescript
{
  type: "machine_status_updated"
  timestamp: number
  machine_id: string
  status: {
    workers: {
      worker_id: string
      connector: string
      connector_capabilities: string[]
      status: "idle" | "busy" | "offline"
      current_job_id?: string
      last_seen: number
    }[]
    services: {
      service_id: string
      service_type: string  // "comfyui", "ollama", etc.
      host: string
      port: number
      status: "online" | "offline"
      last_health_check: number
    }[]
    last_seen: number
    cumulative_stats: {
      total_jobs_completed: number
      total_jobs_failed: number
      uptime_ms: number
    }
  }
}

Dual Status System:

  • 15-second periodic updates: Full comprehensive status
  • Immediate change events: Worker/service status changes

Persistence: Components remain visible once registered; their status reflects the current state.

System Events

full_state_snapshot

Complete system state (sent on initial connection and every 30 seconds).

typescript
{
  type: "full_state_snapshot"
  timestamp: number
  state: {
    workers: WorkerStatus[]
    jobs: {
      pending: number
      processing: number
      completed_last_hour: number
    }
    machines: MachineStatus[]
    queue_depth: number
  }
}

heartbeat_ack

Response to client heartbeat ping.

typescript
{
  type: "heartbeat_ack"
  timestamp: number
  client_id: string
}

Client Operations

Subscribe to Job Updates (EmProps Clients)

typescript
// Client-side
ws.send(JSON.stringify({
  type: 'subscribe_job',
  job_id: 'abc-123'
}));

// Server adds job to client's subscription set
// Client will now receive events for this job

Unsubscribe from Job

typescript
ws.send(JSON.stringify({
  type: 'unsubscribe_job',
  job_id: 'abc-123'
}));
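Since subscriptions are tied to a connection, a client may want to remember them so they can be replayed after a reconnect. The sketch below is one way to do that; `JobSubscriptions` is a hypothetical client-side helper, and it assumes the server does not restore subscriptions on its own.

```typescript
// Hypothetical helper that remembers subscriptions so they can be replayed.
class JobSubscriptions {
  private jobIds = new Set<string>();

  // Records the subscription and returns the JSON message to send.
  subscribe(jobId: string): string {
    this.jobIds.add(jobId);
    return JSON.stringify({ type: "subscribe_job", job_id: jobId });
  }

  // Drops the subscription and returns the JSON message to send.
  unsubscribe(jobId: string): string {
    this.jobIds.delete(jobId);
    return JSON.stringify({ type: "unsubscribe_job", job_id: jobId });
  }

  // Messages to resend once a new connection opens.
  replay(): string[] {
    const messages: string[] = [];
    this.jobIds.forEach((jobId) => {
      messages.push(JSON.stringify({ type: "subscribe_job", job_id: jobId }));
    });
    return messages;
  }
}
```

In an `open` handler, each string returned by `replay()` would be passed to `ws.send`.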

Heartbeat

typescript
// Client sends heartbeat every 30 seconds
ws.send(JSON.stringify({
  type: 'heartbeat',
  timestamp: Date.now()
}));

// Server responds with heartbeat_ack
// Clients that miss 3 pongs are disconnected (see Ping/Pong Protocol)

Connection Management

Ping/Pong Protocol

Monitors:

  • Server sends ping every 30 seconds
  • Expects pong response within timeout
  • Disconnects after 3 missed pongs

EmProps Clients:

  • Server sends ping every 30 seconds
  • Expects pong response within timeout
  • Disconnects after 3 missed pongs
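The missed-pong rule amounts to a small counter per connection. This is a sketch only: `PongTracker` is a hypothetical name, and it assumes the connection is closed as soon as the third consecutive pong is missed.

```typescript
// Hypothetical bookkeeping for one connection; not taken from the server code.
class PongTracker {
  private missed = 0;
  private readonly maxMissed: number;

  constructor(maxMissed: number = 3) {
    this.maxMissed = maxMissed;
  }

  // Call when a pong arrives: the connection is healthy again.
  onPong(): void {
    this.missed = 0;
  }

  // Call on each ping tick when the previous ping is still unanswered.
  // Returns true when the connection should be closed.
  onMissedPong(): boolean {
    this.missed += 1;
    return this.missed >= this.maxMissed;
  }
}
```

When `onMissedPong()` returns `true`, the server side would close the socket, presumably with the custom heartbeat-timeout code 4000.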

Reconnection Strategy

typescript
let reconnectAttempts = 0;
const maxReconnectDelay = 30000;  // 30 seconds

function connect() {
  const ws = new WebSocket(wsUrl);
  
  ws.on('close', (code, reason) => {
    const delay = Math.min(
      1000 * Math.pow(2, reconnectAttempts),
      maxReconnectDelay
    );
    
    setTimeout(() => {
      reconnectAttempts++;
      connect();
    }, delay);
  });
  
  ws.on('open', () => {
    reconnectAttempts = 0;
    // Resubscribe to jobs if needed
  });
}
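To make the backoff schedule concrete, the delay formula from the snippet above can be pulled into a pure function and evaluated for the first few attempts. `reconnectDelay` is an illustrative name; the constants mirror the values used above.

```typescript
const BASE_DELAY_MS = 1000;
const MAX_DELAY_MS = 30000;

// Delay before reconnect attempt number `attempt` (0-based), same formula as above.
function reconnectDelay(attempt: number): number {
  return Math.min(BASE_DELAY_MS * Math.pow(2, attempt), MAX_DELAY_MS);
}

// Delays for the first six attempts: [1000, 2000, 4000, 8000, 16000, 30000]
const firstDelays = [0, 1, 2, 3, 4, 5].map((attempt) => reconnectDelay(attempt));
```

The sixth attempt would be 32000 ms uncapped, so every attempt from that point on waits the 30-second maximum.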

Close Codes

  • 1000 - Normal closure
  • 1001 - Going away
  • 1006 - Abnormal closure (connection lost)
  • 1008 - Policy violation (auth failure)
  • 4000 - Custom: Heartbeat timeout

Event Flow Examples

Job Submission to Completion

  1. Submit Job (REST or internal)

    → POST /submit-job
  2. job_submitted (WebSocket)

    typescript
    {
      type: "job_submitted",
      job_id: "step-abc-123",
      job_type: "comfyui",
      requirements: {...}
    }
  3. job_assigned (WebSocket)

    typescript
    {
      type: "job_assigned",
      job_id: "step-abc-123",
      worker_id: "worker-xyz",
      machine_id: "machine-001"
    }
  4. job_status_changed (WebSocket)

    typescript
    {
      type: "job_status_changed",
      job_id: "step-abc-123",
      old_status: "claimed",
      new_status: "processing"
    }
  5. update_job_progress (WebSocket, multiple times)

    typescript
    {
      type: "update_job_progress",
      job_id: "step-abc-123",
      progress: 50
    }
  6. complete_job (WebSocket)

    typescript
    {
      type: "complete_job",
      job_id: "step-abc-123",
      result: {...},
      execution_time_ms: 5432
    }

Worker Registration

  1. Worker Connects (WebSocket)

    typescript
    // Worker sends register message
    {
      type: 'register_worker',
      worker_id: 'worker-xyz',
      machine_id: 'machine-001',
      capabilities: {...}
    }
  2. worker_connected (Broadcast to monitors)

    typescript
    {
      type: "worker_connected",
      worker_id: "worker-xyz",
      machine_id: "machine-001",
      capabilities: {...}
    }
  3. machine_status_updated (Periodic broadcast)

    typescript
    {
      type: "machine_status_updated",
      machine_id: "machine-001",
      status: {
        workers: [{worker_id: "worker-xyz", ...}],
        services: [...],
        last_seen: 1234567890
      }
    }

Advanced Features

Workflow Context Propagation

Jobs can include workflow context for multi-step operations:

typescript
{
  workflow_id: "job-abc",  // Parent job ID
  workflow_priority: 75,
  workflow_datetime: 1234567890,
  total_steps: 3,
  current_step: 2
}

This enables:

  • Grouped workflow visualization
  • Priority-based routing
  • Progress tracking across steps
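Progress tracking across steps can be derived from `total_steps`, `current_step`, and the current step's `progress`. The sketch below assumes `current_step` is 1-based and that all steps carry equal weight; neither is stated in this reference, and `workflowProgress` is a hypothetical helper.

```typescript
interface WorkflowContext {
  workflow_id: string;
  workflow_priority: number;
  workflow_datetime: number;
  total_steps: number;
  current_step: number;
}

// Overall workflow progress in percent, given the current step's progress (0-100).
// Assumption: current_step is 1-based, so step 2 of 3 means one step is already done.
function workflowProgress(ctx: WorkflowContext, stepProgress: number): number {
  if (ctx.total_steps <= 0) return 0;
  const completedSteps = Math.max(ctx.current_step - 1, 0);
  const fraction = (completedSteps + stepProgress / 100) / ctx.total_steps;
  return Math.min(Math.max(fraction * 100, 0), 100);
}
```

For the context shown above (step 2 of 3) with the current step at 50%, this gives (1 + 0.5) / 3 = 50% overall.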

Event Filtering (Client-Side)

Clients can filter events locally:

typescript
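ws.on('message', (data) => {
  // The Node `ws` library delivers a Buffer, so convert before parsing
  const event = JSON.parse(data.toString());

  // clientType is a hypothetical app-level flag: 'monitor' or 'emprops'
  if (clientType === 'monitor') {
    // Monitor: receives all events, so filter by machine locally
    if (event.machine_id === selectedMachineId) {
      handleEvent(event);
    }
  } else {
    // EmProps: already filtered server-side by subscription
    handleJobEvent(event);
  }
});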

CORS and Security

Allowed Origins

Configured via CORS_ORIGINS environment variable:

bash
CORS_ORIGINS=https://app.emprops.com,https://monitor.emprops.com
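A comma-separated value like this has to be split and matched against the request's `Origin` header. The following is a minimal sketch of that check, assuming exact string matching with no wildcard support; `parseAllowedOrigins` and `isOriginAllowed` are hypothetical helpers, not the server's functions.

```typescript
// Splits the comma-separated value into a set of trimmed, non-empty origins.
function parseAllowedOrigins(value: string): Set<string> {
  return new Set(
    value
      .split(",")
      .map((origin) => origin.trim())
      .filter((origin) => origin.length > 0)
  );
}

// Exact-match check; a missing Origin header is treated as not allowed.
function isOriginAllowed(origin: string | undefined, allowed: Set<string>): boolean {
  return origin !== undefined && allowed.has(origin);
}
```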

Token Validation

typescript
// Server validates on connection
const authHeader = request.headers['authorization'];
const token = authHeader?.replace('Bearer ', '');

if (token !== process.env.AUTH_TOKEN) {
  ws.close(1008, 'Unauthorized');
}
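One edge case worth noting in a comparison like the one above: if the request has no Authorization header and AUTH_TOKEN is unset, both sides are `undefined` and the inequality check does not fire. The sketch below shows one way to guard against that; `extractBearerToken` and `isAuthorized` are hypothetical helpers, not the server's actual functions.

```typescript
// Returns the token from a "Bearer <token>" header value, or null if absent or malformed.
function extractBearerToken(header: string | undefined): string | null {
  if (header === undefined) return null;
  const match = /^Bearer\s+(\S+)$/.exec(header.trim());
  if (match === null) return null;
  return match[1] ?? null;
}

// Rejects when no expected token is configured, so a missing header can never match.
function isAuthorized(header: string | undefined, expected: string | undefined): boolean {
  const token = extractBearerToken(header);
  return expected !== undefined && expected.length > 0 && token === expected;
}
```

A production server may also prefer a constant-time comparison for the final equality check.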

Implementation Details

EventBroadcaster Architecture

Location: /packages/core/src/services/event-broadcaster.ts

typescript
class EventBroadcaster {
  private monitors: Map<string, WebSocket>
  private clients: Map<string, ClientConnection>
  private subscriptions: Map<string, MonitorSubscription>
  
  broadcast(event: MonitorEvent) {
    // Send to monitors (all events)
    for (const [id, ws] of this.monitors) {
      if (this.shouldReceiveEvent(id, event)) {
        this.sendToMonitor(ws, event);
      }
    }
    
    // Send to clients (job-specific)
    this.broadcastToClients(event);
  }
}
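The routing rule in `broadcast` (monitors get everything, clients get only their subscribed jobs) can be expressed as a pure function over connection IDs. This is a simplified sketch: `recipientsFor` and `RoutedEvent` are hypothetical, and the per-monitor `shouldReceiveEvent` filter from the class above is left out.

```typescript
interface RoutedEvent {
  type: string;
  job_id?: string;
}

// Returns the IDs of connections that should receive the event.
// monitorIds: connected monitors; clientJobs: client ID -> subscribed job IDs.
function recipientsFor(
  event: RoutedEvent,
  monitorIds: string[],
  clientJobs: Map<string, Set<string>>
): string[] {
  const recipients = monitorIds.slice(); // monitors receive every event
  const jobId = event.job_id;
  if (jobId !== undefined) {
    clientJobs.forEach((jobs, clientId) => {
      if (jobs.has(jobId)) recipients.push(clientId); // job-specific delivery
    });
  }
  return recipients;
}
```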

Progress Streaming

Location: /apps/api/src/lightweight-api-server.ts

typescript
// Subscribe to Redis progress channels.
// A wildcard needs a pattern subscription (psubscribe), which delivers 'pmessage' events.
progressSubscriber.psubscribe('job:progress:*');

progressSubscriber.on('pmessage', (pattern, channel, message) => {
  const jobId = channel.split(':')[2];
  const progressData = JSON.parse(message);
  
  // Broadcast via WebSocket
  eventBroadcaster.broadcast({
    type: 'update_job_progress',
    timestamp: Date.now(),
    job_id: jobId,
    progress: progressData.progress
  });
});
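Taking the third `:`-separated segment works as long as job IDs never contain a colon. In case they can, a prefix-based extraction is a little more robust. The sketch below assumes the `job:progress:<job_id>` channel layout shown above; `jobIdFromChannel` is a hypothetical helper.

```typescript
const PROGRESS_PREFIX = "job:progress:";

// Extracts the job ID from a channel name such as "job:progress:step-abc-123".
// Slicing off the prefix, instead of splitting on ":", keeps IDs that contain colons intact.
function jobIdFromChannel(channel: string): string | null {
  if (!channel.startsWith(PROGRESS_PREFIX)) return null;
  const jobId = channel.slice(PROGRESS_PREFIX.length);
  return jobId.length > 0 ? jobId : null;
}
```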

File Locations

  • Server: /apps/api/src/lightweight-api-server.ts
  • Event Broadcaster: /packages/core/src/services/event-broadcaster.ts
  • Event Types: /packages/core/src/types/monitor-events.ts
  • Monitor Client: /apps/monitor/src/hooks/useMonitorWebSocket.ts

Released under the MIT License.