WebSocket API Reference
The Job Queue WebSocket API provides real-time event streaming for job monitoring, worker status, and system observability.
WebSocket URL: wss://job-queue-api.railway.app (production)
Local Development: ws://localhost:3000 (from .env.local-dev)
Implementation: /apps/api/src/lightweight-api-server.ts
Connection
Initial Handshake
// Client connects via WebSocket
const ws = new WebSocket('wss://job-queue-api.railway.app');
// Server assigns client ID and tracks connection
ws.on('open', () => {
console.log('Connected to Job Queue API');
});
Authentication
Authorization: Bearer {AUTH_TOKEN}
The AUTH_TOKEN must match the server's AUTH_TOKEN environment variable. If it does not match, the server rejects the connection.
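The handshake example does not show how the Authorization header is attached. Browsers cannot set custom headers on a WebSocket handshake, so the sketch below assumes a Node client using the `ws` package, which accepts a `headers` option. The helper name `buildConnectionOptions` is hypothetical and not part of the API.

```typescript
// Hypothetical helper: builds connection options carrying the bearer token.
// Assumes a Node client using the `ws` package, since browsers cannot
// attach custom headers to a WebSocket handshake.
interface ConnectionOptions {
  headers: { Authorization: string };
}

function buildConnectionOptions(authToken: string): ConnectionOptions {
  if (authToken.trim() === '') {
    throw new Error('AUTH_TOKEN must not be empty');
  }
  return { headers: { Authorization: `Bearer ${authToken}` } };
}

// Usage with the `ws` package (shown as a comment, not executed here):
//   import WebSocket from 'ws';
//   const ws = new WebSocket(
//     'wss://job-queue-api.railway.app',
//     buildConnectionOptions(process.env.AUTH_TOKEN ?? ''),
//   );
```

Failing fast on an empty token avoids a round trip that the server would reject anyway.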
Client Types
Monitor Clients
Real-time monitoring dashboards (e.g., /apps/monitor).
Subscriptions: All system events by default
- Workers: worker_connected, worker_disconnected, worker_status_changed
- Jobs: job_submitted, job_status_changed, update_job_progress, complete_job, job_failed
- Machines: machine_status_updated
- System: full_state_snapshot, heartbeat_ack
EmProps Clients
EmProps API or UI clients tracking specific jobs.
Subscriptions: Job-specific events only
- Must explicitly subscribe to job IDs
- Receives events only for subscribed jobs
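Because EmProps clients only receive events for jobs they have subscribed to, a client usually needs to remember its subscriptions. The following is a minimal sketch of that bookkeeping; the class and method names are hypothetical, and the message shapes follow the subscribe_job and unsubscribe_job operations described under Client Operations. It assumes the server does not remember subscriptions across connections, which this reference does not state.

```typescript
// Illustrative client-side bookkeeping; names are hypothetical.
type SubscriptionMessage =
  | { type: 'subscribe_job'; job_id: string }
  | { type: 'unsubscribe_job'; job_id: string };

class JobSubscriptions {
  private readonly jobIds = new Set<string>();
  private readonly send: (payload: string) => void;

  // `send` is whatever writes a text frame, e.g. (p) => ws.send(p)
  constructor(send: (payload: string) => void) {
    this.send = send;
  }

  subscribe(jobId: string): void {
    if (this.jobIds.has(jobId)) return; // already subscribed
    this.jobIds.add(jobId);
    this.emit({ type: 'subscribe_job', job_id: jobId });
  }

  unsubscribe(jobId: string): void {
    if (!this.jobIds.delete(jobId)) return; // was not subscribed
    this.emit({ type: 'unsubscribe_job', job_id: jobId });
  }

  // Call after a reconnect. Assumption: a new connection starts with
  // no subscriptions, so every tracked job is subscribed again.
  resubscribeAll(): void {
    this.jobIds.forEach((jobId) => {
      this.emit({ type: 'subscribe_job', job_id: jobId });
    });
  }

  private emit(message: SubscriptionMessage): void {
    this.send(JSON.stringify(message));
  }
}
```

Passing the send function in, rather than a socket, keeps the tracker usable across reconnects where the socket object is replaced.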
Message Format
Base Event Structure
{
type: string
timestamp: number
[key: string]: any // Event-specific data
}
EmProps-Compatible Format
For EmProps clients, events are automatically adapted:
{
type: string
job_id?: string
timestamp: number
// Event-specific fields...
}
Event Types
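Every event shares the base structure: a string type and a numeric timestamp. A client may want to check those two fields before dispatching on type. The function below is a minimal sketch of such a check; `parseQueueEvent` and `BaseQueueEvent` are illustrative names, not part of the API.

```typescript
// Minimal runtime check for the base event structure.
// Names are illustrative and not part of the documented API.
interface BaseQueueEvent {
  type: string;
  timestamp: number;
  [key: string]: unknown; // event-specific data
}

function parseQueueEvent(raw: string): BaseQueueEvent | null {
  let value: unknown;
  try {
    value = JSON.parse(raw);
  } catch {
    return null; // not valid JSON
  }
  if (typeof value !== 'object' || value === null) return null;
  const candidate = value as Record<string, unknown>;
  if (typeof candidate['type'] !== 'string') return null;
  if (typeof candidate['timestamp'] !== 'number') return null;
  return candidate as BaseQueueEvent;
}
```

Returning null instead of throwing lets a message handler skip malformed frames without a try/catch at every call site.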
Job Lifecycle Events
job_submitted
Fired when a job is submitted to the queue.
{
type: "job_submitted"
timestamp: number
job_id: string
job_type: string
requirements: {
connector?: string
connector_capabilities?: string[]
models?: string[]
custom_nodes?: string[]
}
workflow_context?: {
workflow_id: string
workflow_priority: number
workflow_datetime: number
total_steps: number
current_step: number
}
}
job_assigned
Fired when a worker claims a job.
{
type: "job_assigned"
timestamp: number
job_id: string
worker_id: string
machine_id: string
}
job_status_changed
Fired when job status transitions.
{
type: "job_status_changed"
timestamp: number
job_id: string
old_status: "pending" | "claimed" | "processing"
new_status: "claimed" | "processing" | "completed" | "failed"
worker_id?: string
machine_id?: string
}
update_job_progress
Fired during job execution (throttled to 1/second per job).
{
type: "update_job_progress"
timestamp: number
job_id: string
progress: number // 0-100
message?: string
worker_id: string
machine_id: string
}
complete_job
Fired when job completes successfully.
{
type: "complete_job"
timestamp: number
job_id: string
result: any
execution_time_ms: number
worker_id: string
machine_id: string
}
job_failed
Fired when job fails permanently.
{
type: "job_failed"
timestamp: number
job_id: string
error: string
error_details?: any
worker_id?: string
machine_id?: string
}
Worker Events
worker_connected
Fired when a worker registers with the queue.
{
type: "worker_connected"
timestamp: number
worker_id: string
machine_id: string
capabilities: {
connector: string // "comfyui_remote", "ollama_remote", etc.
connector_capabilities: string[]
models?: string[]
custom_nodes?: string[]
}
}
worker_disconnected
Fired when a worker disconnects.
{
type: "worker_disconnected"
timestamp: number
worker_id: string
machine_id: string
reason?: string
}
worker_status_changed
Fired when worker status changes.
{
type: "worker_status_changed"
timestamp: number
worker_id: string
machine_id: string
old_status: "idle" | "busy"
new_status: "idle" | "busy" | "offline"
current_job_id?: string
}
Machine Events
machine_status_updated
Comprehensive machine status update (every 15 seconds).
{
type: "machine_status_updated"
timestamp: number
machine_id: string
status: {
workers: {
worker_id: string
connector: string
connector_capabilities: string[]
status: "idle" | "busy" | "offline"
current_job_id?: string
last_seen: number
}[]
services: {
service_id: string
service_type: string // "comfyui", "ollama", etc.
host: string
port: number
status: "online" | "offline"
last_health_check: number
}[]
last_seen: number
cumulative_stats: {
total_jobs_completed: number
total_jobs_failed: number
uptime_ms: number
}
}
}
Dual Status System:
- 15-second periodic updates: Full comprehensive status
- Immediate change events: Worker/service status changes
Persistence: Components remain visible once registered; their status reflects the current state.
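On the client side, the 15-second cadence and the persistence rule suggest keeping every machine that has ever reported and flagging the ones whose updates have stopped. The sketch below illustrates this; the class name and the two-interval staleness tolerance are assumptions, not values defined by the API.

```typescript
const STATUS_INTERVAL_MS = 15000; // periodic update interval from this section
const STALE_AFTER_MS = 2 * STATUS_INTERVAL_MS; // assumed tolerance, not from the API

interface MachineSeen {
  machine_id: string;
  lastUpdateAt: number; // timestamp of the last machine_status_updated event
}

// Hypothetical client-side tracker.
class MachineTracker {
  private readonly machines = new Map<string, MachineSeen>();

  // Record a machine_status_updated event (only the fields used here).
  record(update: { machine_id: string; timestamp: number }): void {
    this.machines.set(update.machine_id, {
      machine_id: update.machine_id,
      lastUpdateAt: update.timestamp,
    });
  }

  // Machines are never removed, mirroring the persistence rule;
  // they are only flagged as stale when updates stop arriving.
  view(now: number): Array<MachineSeen & { stale: boolean }> {
    return Array.from(this.machines.values()).map((m) => ({
      ...m,
      stale: now - m.lastUpdateAt > STALE_AFTER_MS,
    }));
  }
}
```

A dashboard could call `view(Date.now())` on a timer and grey out stale machines instead of hiding them.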
System Events
full_state_snapshot
Complete system state (sent on initial connection + every 30 seconds).
{
type: "full_state_snapshot"
timestamp: number
state: {
workers: WorkerStatus[]
jobs: {
pending: number
processing: number
completed_last_hour: number
}
machines: MachineStatus[]
queue_depth: number
}
}
heartbeat_ack
Response to client heartbeat ping.
{
type: "heartbeat_ack"
timestamp: number
client_id: string
}
Client Operations
Subscribe to Job Updates (EmProps Clients)
// Client-side
ws.send(JSON.stringify({
type: 'subscribe_job',
job_id: 'abc-123'
}));
// Server adds job to client's subscription set
// Client will now receive events for this job
Unsubscribe from Job
ws.send(JSON.stringify({
type: 'unsubscribe_job',
job_id: 'abc-123'
}));
Heartbeat
// Client sends heartbeat every 30 seconds
ws.send(JSON.stringify({
type: 'heartbeat',
timestamp: Date.now()
}));
// Server responds with heartbeat_ack
// Clients that miss 3 pongs are disconnected (see Ping/Pong Protocol)
Connection Management
Ping/Pong Protocol
Monitors:
- Server sends ping every 30 seconds
- Expects pong response within timeout
- Disconnects after 3 missed pongs
EmProps Clients:
- Server sends ping every 30 seconds
- Expects pong response within timeout
- Disconnects after 3 missed pongs
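The missed-pong rule above can be sketched as a small counter that the server consults once per ping interval. This is an illustration only, not the server's implementation: the class name is hypothetical, and it assumes that "3 missed pongs" means three consecutive unanswered pings, with any pong resetting the count.

```typescript
const MAX_MISSED_PONGS = 3; // from the protocol description

// Hypothetical per-connection tracker.
class PongTracker {
  private missed = 0;
  private awaitingPong = false;

  // Call once per ping interval, before sending the next ping.
  // Returns true when the connection should be closed.
  tick(): boolean {
    if (this.awaitingPong) this.missed += 1; // previous ping went unanswered
    if (this.missed >= MAX_MISSED_PONGS) return true;
    this.awaitingPong = true; // a new ping is about to go out
    return false;
  }

  // Call when a pong arrives. Assumption: any pong resets the count.
  onPong(): void {
    this.awaitingPong = false;
    this.missed = 0;
  }
}
```

With a 30-second interval, a silent client is dropped on the fourth tick, roughly 90 seconds after its first unanswered ping was sent.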
Reconnection Strategy
let reconnectAttempts = 0;
const maxReconnectDelay = 30000; // 30 seconds
function connect() {
const ws = new WebSocket(wsUrl);
ws.on('close', (code, reason) => {
const delay = Math.min(
1000 * Math.pow(2, reconnectAttempts),
maxReconnectDelay
);
setTimeout(() => {
reconnectAttempts++;
connect();
}, delay);
});
ws.on('open', () => {
reconnectAttempts = 0;
// Resubscribe to jobs if needed
});
}
Close Codes
- 1000 - Normal closure
- 1001 - Going away
- 1006 - Abnormal closure (connection lost)
- 1008 - Policy violation (auth failure)
- 4000 - Custom: Heartbeat timeout
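A client can use the close code to decide whether reconnecting makes sense at all. The sketch below pairs the backoff formula from the Reconnection Strategy example with one possible policy. The policy is an assumption, not something the API specifies: it skips retries after a normal closure and after an auth failure, since retrying with the same token is unlikely to succeed.

```typescript
const MAX_RECONNECT_DELAY_MS = 30000; // 30 seconds, as in the reconnection example

// Same formula as the reconnection example: 1s, 2s, 4s, ... capped at 30s.
function reconnectDelay(attempt: number): number {
  return Math.min(1000 * Math.pow(2, attempt), MAX_RECONNECT_DELAY_MS);
}

// Assumed policy (not specified by the API).
function shouldReconnect(closeCode: number): boolean {
  switch (closeCode) {
    case 1000: // normal closure
    case 1008: // policy violation (auth failure)
      return false;
    default: // 1001, 1006, 4000 and any other code
      return true;
  }
}
```

In the close handler, a client would check `shouldReconnect(code)` first and only then schedule `connect()` after `reconnectDelay(reconnectAttempts)` milliseconds.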
Event Flow Examples
Job Submission to Completion
1. Submit Job (REST or internal)
   → POST /submit-job
2. job_submitted (WebSocket)
   { type: "job_submitted", job_id: "step-abc-123", job_type: "comfyui", requirements: {...} }
3. job_assigned (WebSocket)
   { type: "job_assigned", job_id: "step-abc-123", worker_id: "worker-xyz", machine_id: "machine-001" }
4. job_status_changed (WebSocket)
   { type: "job_status_changed", job_id: "step-abc-123", old_status: "claimed", new_status: "processing" }
5. update_job_progress (WebSocket, multiple times)
   { type: "update_job_progress", job_id: "step-abc-123", progress: 50 }
6. complete_job (WebSocket)
   { type: "complete_job", job_id: "step-abc-123", result: {...}, execution_time_ms: 5432 }
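A client that wants to display a job's current state can fold this event sequence into a single record. The reducer below is a minimal sketch using only the fields it needs; the type and function names are hypothetical, and it assumes that job_assigned corresponds to the "claimed" status, which the reference implies but does not state outright.

```typescript
type JobStatus = 'pending' | 'claimed' | 'processing' | 'completed' | 'failed';

interface TrackedJob {
  job_id: string;
  status: JobStatus;
  progress: number; // 0-100
  worker_id?: string;
}

// Only the fields this sketch needs from each event.
type JobFlowEvent =
  | { type: 'job_submitted'; job_id: string }
  | { type: 'job_assigned'; job_id: string; worker_id: string }
  | { type: 'job_status_changed'; job_id: string; new_status: JobStatus }
  | { type: 'update_job_progress'; job_id: string; progress: number }
  | { type: 'complete_job'; job_id: string }
  | { type: 'job_failed'; job_id: string };

// Hypothetical reducer: returns the job state after applying one event.
function applyJobEvent(job: TrackedJob | undefined, ev: JobFlowEvent): TrackedJob {
  const base: TrackedJob = job ?? { job_id: ev.job_id, status: 'pending', progress: 0 };
  switch (ev.type) {
    case 'job_submitted':
      return { ...base, status: 'pending', progress: 0 };
    case 'job_assigned':
      // Assumption: assignment corresponds to the "claimed" status.
      return { ...base, status: 'claimed', worker_id: ev.worker_id };
    case 'job_status_changed':
      return { ...base, status: ev.new_status };
    case 'update_job_progress':
      return { ...base, progress: ev.progress };
    case 'complete_job':
      return { ...base, status: 'completed', progress: 100 };
    case 'job_failed':
      return { ...base, status: 'failed' };
  }
}
```

Because the reducer never mutates its input, it fits state containers that compare object references to detect changes.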
Worker Registration
1. Worker Connects (WebSocket)
   // Worker sends register message
   { type: 'register_worker', worker_id: 'worker-xyz', machine_id: 'machine-001', capabilities: {...} }
2. worker_connected (Broadcast to monitors)
   { type: "worker_connected", worker_id: "worker-xyz", machine_id: "machine-001", capabilities: {...} }
3. machine_status_updated (Periodic broadcast)
   { type: "machine_status_updated", machine_id: "machine-001", status: { workers: [{worker_id: "worker-xyz", ...}], services: [...], last_seen: 1234567890 } }
Advanced Features
Workflow Context Propagation
Jobs can include workflow context for multi-step operations:
{
workflow_id: "job-abc", // Parent job ID
workflow_priority: 75,
workflow_datetime: 1234567890,
total_steps: 3,
current_step: 2
}
This enables:
- Grouped workflow visualization
- Priority-based routing
- Progress tracking across steps
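As an illustration of progress tracking across steps and grouped visualization, the sketch below derives an overall workflow percentage from the context fields and groups job IDs by workflow_id. Both function names are hypothetical. It assumes current_step is 1-based and that all steps carry equal weight; neither is stated in this reference.

```typescript
interface WorkflowContext {
  workflow_id: string;
  workflow_priority: number;
  workflow_datetime: number;
  total_steps: number;
  current_step: number;
}

// Overall workflow progress in percent, given the progress (0-100) of the
// step currently running. Assumes current_step is 1-based and that every
// step has equal weight; both are assumptions of this sketch.
function workflowProgress(ctx: WorkflowContext, stepProgress: number): number {
  if (ctx.total_steps <= 0) return 0;
  const clamped = Math.min(Math.max(stepProgress, 0), 100);
  const completedSteps = Math.max(ctx.current_step - 1, 0);
  const percent = ((completedSteps + clamped / 100) / ctx.total_steps) * 100;
  return Math.min(percent, 100);
}

// Group job IDs by workflow_id; jobs without a context are skipped.
function groupByWorkflow(
  jobs: Array<{ job_id: string; workflow_context?: WorkflowContext }>,
): Map<string, string[]> {
  const groups = new Map<string, string[]>();
  for (const j of jobs) {
    if (!j.workflow_context) continue;
    const key = j.workflow_context.workflow_id;
    const list = groups.get(key) ?? [];
    list.push(j.job_id);
    groups.set(key, list);
  }
  return groups;
}
```

For the context shown above (step 2 of 3), a step that is half done puts the workflow at the midpoint under these assumptions.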
Event Filtering (Client-Side)
Clients can filter events locally:
ws.on('message', (data) => {
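  // data may be a Buffer in Node clients, so convert it before parsing
  const event = JSON.parse(data.toString());
  if (isMonitor) {
    // Monitor: filter by machine (isMonitor is an illustrative flag set by the application)
    if (event.machine_id === selectedMachineId) {
      handleEvent(event);
    }
  } else {
    // EmProps: already filtered server-side by subscription
    handleJobEvent(event);
  }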
});
CORS and Security
Allowed Origins
Configured via CORS_ORIGINS environment variable:
CORS_ORIGINS=https://app.emprops.com,https://monitor.emprops.com
Token Validation
// Server validates on connection
const authHeader = request.headers['authorization'];
const token = authHeader?.replace('Bearer ', '');
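// Reject missing tokens as well: otherwise an unset AUTH_TOKEN would
// compare equal to an absent header (undefined === undefined)
if (!token || token !== process.env.AUTH_TOKEN) {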
ws.close(1008, 'Unauthorized');
}
Implementation Details
EventBroadcaster Architecture
Location: /packages/core/src/services/event-broadcaster.ts
class EventBroadcaster {
private monitors: Map<string, WebSocket>
private clients: Map<string, ClientConnection>
private subscriptions: Map<string, MonitorSubscription>
broadcast(event: MonitorEvent) {
// Send to monitors (all events)
for (const [id, ws] of this.monitors) {
if (this.shouldReceiveEvent(id, event)) {
this.sendToMonitor(ws, event);
}
}
// Send to clients (job-specific)
this.broadcastToClients(event);
}
}
Progress Streaming
Location: /apps/api/src/lightweight-api-server.ts
// Subscribe to Redis progress channel
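// A wildcard channel needs a pattern subscription (psubscribe/pmessage,
// as in ioredis; assumed here because the Redis client is not stated)
progressSubscriber.psubscribe('job:progress:*');
progressSubscriber.on('pmessage', (_pattern, channel, message) => {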
const jobId = channel.split(':')[2];
const progressData = JSON.parse(message);
// Broadcast via WebSocket
eventBroadcaster.broadcast({
type: 'update_job_progress',
timestamp: Date.now(),
job_id: jobId,
progress: progressData.progress
});
});
File Locations
- Server: /apps/api/src/lightweight-api-server.ts
- Event Broadcaster: /packages/core/src/services/event-broadcaster.ts
- Event Types: /packages/core/src/types/monitor-events.ts
- Monitor Client: /apps/monitor/src/hooks/useMonitorWebSocket.ts
See Also
- REST API Reference - HTTP endpoints
- Internal APIs Reference - Redis channels and webhooks
- Architecture Overview - System design
