
WebSocket Removal: Redis Events Architecture - ADR

Status: Proposed
Date: 2025-11-07
Author: System Architecture
Supersedes: None
Related: LOGGING_ARCHITECTURE.md, monitor-events.ts


Context

The emp-job-queue system currently uses WebSocket connections between emprops-api (user-facing Next.js application) and the lightweight-api-server for bidirectional communication. This creates several architectural concerns:

Current Problems

1. Tight Coupling

  • emprops-api maintains persistent WebSocket connections to lightweight-api-server
  • Connection management overhead (reconnection logic, heartbeats, state sync)
  • Both applications must be running and network-connected for real-time updates

2. Complexity

  • WebSocket connection state management in Next.js application
  • Reconnection logic for network interruptions
  • Message queuing during disconnections
  • Duplicate event handling infrastructure

3. Scalability Concerns

  • WebSocket connections don't scale well horizontally without sticky sessions
  • Each emprops-api instance maintains separate connections
  • No connection pooling or multiplexing benefits

4. Architectural Misalignment

  • Redis is the source of truth for all job state and events
  • WebSocket creates an intermediary layer that duplicates Redis pub/sub
  • emprops-api could consume events directly from Redis (it's already a Node.js application)

5. Inconsistent Event Delivery

  • Events can be missed during WebSocket disconnections
  • Requires complex resync logic to catch up on missed events
  • No guaranteed delivery semantics

Current Architecture

┌────────────────────────────────────────────────────────────┐
│                    emprops-api (Next.js)                   │
│  ┌──────────────────────────────────────────────────────┐ │
│  │  WebSocket Client                                     │ │
│  │  - Maintains persistent connection                   │ │
│  │  - Handles reconnection logic                        │ │
│  │  - Buffers events during disconnection               │ │
│  └──────────────────────────────────────────────────────┘ │
└────────────────────────────────────────────────────────────┘
                           ↓ ↑
                   WebSocket Connection
                   (Bidirectional, Stateful)
                           ↓ ↑
┌────────────────────────────────────────────────────────────┐
│           lightweight-api-server (emp-job-queue)           │
│  ┌──────────────────────────────────────────────────────┐ │
│  │  WebSocket Server                                     │ │
│  │  - Broadcasts events to connected clients            │ │
│  │  - Manages connection state                          │ │
│  │  - Filters events per client subscription            │ │
│  └──────────────────────────────────────────────────────┘ │
│                           ↓ ↑                              │
│  ┌──────────────────────────────────────────────────────┐ │
│  │  Redis Subscriber                                     │ │
│  │  - Subscribes to machine:*:logs, job:*, etc.        │ │
│  │  - Receives events from Redis pub/sub                │ │
│  │  - Forwards to WebSocket clients                     │ │
│  └──────────────────────────────────────────────────────┘ │
└────────────────────────────────────────────────────────────┘
                           ↓ ↑
┌────────────────────────────────────────────────────────────┐
│                    Redis (Source of Truth)                 │
│  - Pub/Sub: machine:*:logs, job:events:*                 │
│  - Streams: job:events:{job_id}                           │
│  - Job state, machine state, worker state                 │
└────────────────────────────────────────────────────────────┘

Key Issue: emprops-api connects to a WebSocket server that in turn subscribes to Redis, creating an unnecessary intermediary layer between emprops-api and the source of truth.


Decision

Remove WebSocket connections between emprops-api and emp-job-queue API. Replace with direct Redis event consumption and HTTP commands.

Architectural Shift

Commands (emprops-api → emp-job-queue):

  • Use HTTP POST requests for job submission and control operations
  • Leverage existing REST API endpoints
  • Stateless, scalable, standard HTTP semantics

Events (emp-job-queue → emprops-api):

  • Use Redis Streams and pub/sub for real-time event delivery
  • emprops-api subscribes directly to Redis (using ioredis in Next.js API routes or server components)
  • Eliminates WebSocket intermediary layer

Proposed Architecture

Component Overview

┌────────────────────────────────────────────────────────────┐
│                    emprops-api (Next.js)                   │
│                                                              │
│  ┌──────────────────────────────────────────────────────┐ │
│  │  HTTP Client (Outbound Commands)                      │ │
│  │  - POST /api/jobs (submit job)                       │ │
│  │  - POST /api/jobs/:id/cancel                         │ │
│  │  - GET /api/jobs/:id (query state)                   │ │
│  └──────────────────────────────────────────────────────┘ │
│                           ↓                                 │
│  ┌──────────────────────────────────────────────────────┐ │
│  │  Redis Subscriber (Inbound Events)                    │ │
│  │  - Direct ioredis connection                         │ │
│  │  - Subscribe to job:events:{job_id}                  │ │
│  │  - Subscribe to machine:*:logs (if needed)           │ │
│  │  - Consumer group for Streams                        │ │
│  └──────────────────────────────────────────────────────┘ │
└────────────────────────────────────────────────────────────┘
                    ↓                            ↑
              HTTP Requests                Redis Events
                    ↓                            ↑
┌────────────────────────────────────────────────────────────┐
│           lightweight-api-server (emp-job-queue)           │
│                                                              │
│  ┌──────────────────────────────────────────────────────┐ │
│  │  HTTP REST API                                        │ │
│  │  - POST /api/jobs (create job in Redis)             │ │
│  │  - Returns job_id immediately                        │ │
│  │  - No stateful connection required                   │ │
│  └──────────────────────────────────────────────────────┘ │
│                           ↓ ↑                              │
│  ┌──────────────────────────────────────────────────────┐ │
│  │  Redis Publisher                                      │ │
│  │  - Publishes job:events:{job_id} to Streams         │ │
│  │  - Publishes machine:*:logs to pub/sub              │ │
│  │  - No WebSocket broadcasting needed                  │ │
│  └──────────────────────────────────────────────────────┘ │
└────────────────────────────────────────────────────────────┘
                           ↓ ↑
┌────────────────────────────────────────────────────────────┐
│                    Redis (Source of Truth)                 │
│  - Streams: job:events:{job_id} (ordered event log)      │
│  - Pub/Sub: machine:*:logs (broadcast)                    │
│  - Job state, machine state, worker state                 │
└────────────────────────────────────────────────────────────┘

Event Flow Examples

Job Submission Flow

Before (WebSocket):

emprops-api → WS: {type: 'submit_job', payload: {...}}

lightweight-api-server → Redis: RPUSH job_queue {...}

lightweight-api-server → WS: {type: 'job_submitted', job_id: 'xxx'}

emprops-api ← WS: Receives job_submitted event

After (HTTP + Redis):

emprops-api → HTTP POST /api/jobs {payload: {...}}

lightweight-api-server → Redis: RPUSH job_queue {...}

lightweight-api-server → HTTP 201: {job_id: 'xxx'}

emprops-api ← HTTP Response: {job_id: 'xxx'}

emprops-api subscribes to: job:events:xxx (Redis Stream)

Job Progress Update Flow

Before (WebSocket):

Worker → Redis: XADD job:events:{job_id} {progress: 50}

lightweight-api-server (Redis sub) → receives event

lightweight-api-server → WS broadcast: {type: 'progress', progress: 50}

emprops-api ← WS: Receives progress event

After (Direct Redis):

Worker → Redis: XADD job:events:{job_id} {progress: 50}

emprops-api (Redis sub) → receives event directly

emprops-api: Updates UI immediately

Key Benefit: Eliminates intermediate hop through lightweight-api-server.


Implementation Plan

Phase 1: Add Redis Subscription to emprops-api (2-3 Days)

Objective: Enable emprops-api to consume events directly from Redis.

1.1 Create Redis Client Singleton

File: apps/emprops-studio/src/lib/redis-client.ts (NEW)

typescript
import { Redis } from 'ioredis';
import type { RedisOptions } from 'ioredis';

// Connection settings shared by both clients
const redisOptions: RedisOptions = {
  host: process.env.REDIS_HOST || 'localhost',
  port: parseInt(process.env.REDIS_PORT || '6379', 10),
  password: process.env.REDIS_PASSWORD,
  db: parseInt(process.env.REDIS_DB || '0', 10),
  // Linear backoff capped at 2 seconds: 50 ms, 100 ms, 150 ms, ...
  retryStrategy: (times) => Math.min(times * 50, 2000),
};

let redisClient: Redis | null = null;
let redisSubscriber: Redis | null = null;

// General-purpose connection for regular commands
export function getRedisClient(): Redis {
  if (!redisClient) {
    redisClient = new Redis(redisOptions);
  }
  return redisClient;
}

// Dedicated connection for subscriptions. A connection in subscriber mode
// cannot issue regular commands, so it must not be shared with getRedisClient().
export function getRedisSubscriber(): Redis {
  if (!redisSubscriber) {
    redisSubscriber = new Redis(redisOptions);
  }
  return redisSubscriber;
}

1.2 Create Job Event Consumer

File: apps/emprops-studio/src/lib/job-event-consumer.ts (NEW)

typescript
import { getRedisSubscriber } from './redis-client';
import type { JobEvent } from '@emp/core/types';

type JobEventHandler = (event: JobEvent) => void;

class JobEventConsumer {
  private handlers = new Map<string, Set<JobEventHandler>>();
  private subscriber = getRedisSubscriber();
  private initialized = false;

  async initialize() {
    if (this.initialized) return;

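    // Subscribe to job event pattern.
    // Note: PSUBSCRIBE only receives messages sent with PUBLISH. Entries
    // written with XADD are not delivered to pub/sub subscribers, so this
    // assumes each event is also published on the job:events:{job_id} channel.
    await this.subscriber.psubscribe('job:events:*');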

    // Handle incoming events
    this.subscriber.on('pmessage', (pattern, channel, message) => {
      const jobId = channel.split(':')[2]; // Extract job_id from job:events:{job_id}
      const handlers = this.handlers.get(jobId);

      if (handlers && handlers.size > 0) {
        try {
          const event = JSON.parse(message) as JobEvent;
          handlers.forEach(handler => handler(event));
        } catch (error) {
          console.error('[JobEventConsumer] Failed to parse event:', error);
        }
      }
    });

    this.initialized = true;
  }

  subscribeToJob(jobId: string, handler: JobEventHandler): () => void {
    if (!this.handlers.has(jobId)) {
      this.handlers.set(jobId, new Set());
    }

    this.handlers.get(jobId)!.add(handler);

    // Return unsubscribe function
    return () => {
      const handlers = this.handlers.get(jobId);
      if (handlers) {
        handlers.delete(handler);
        if (handlers.size === 0) {
          this.handlers.delete(jobId);
        }
      }
    };
  }
}

export const jobEventConsumer = new JobEventConsumer();
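
The consumer extracts the job ID with channel.split(':')[2], which truncates any ID that itself contains a colon. A minimal sketch of a prefix-based alternative follows; the helper name jobIdFromChannel and the constant are hypothetical and not part of the existing codebase.

```typescript
// Hypothetical helper: derive the job ID from a pub/sub channel name
// of the form job:events:{job_id}.
const JOB_EVENTS_PREFIX = 'job:events:';

export function jobIdFromChannel(channel: string): string | null {
  // Reject channels that belong to other patterns (for example machine logs)
  if (!channel.startsWith(JOB_EVENTS_PREFIX)) return null;

  // Keep everything after the prefix, so IDs containing ':' survive intact
  const jobId = channel.slice(JOB_EVENTS_PREFIX.length);
  return jobId.length > 0 ? jobId : null;
}
```

If job IDs are guaranteed never to contain a colon, the original split-based extraction behaves the same for valid channels.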

1.3 Create Next.js API Route for Job Events (Server-Sent Events)

File: apps/emprops-studio/src/app/api/jobs/[jobId]/events/route.ts (NEW)

typescript
import { NextRequest } from 'next/server';
import { jobEventConsumer } from '@/lib/job-event-consumer';

export const runtime = 'nodejs';

export async function GET(
  request: NextRequest,
  { params }: { params: { jobId: string } }
) {
  const { jobId } = params;

  // Initialize consumer if not already done
  await jobEventConsumer.initialize();

  // Create Server-Sent Events stream
  const stream = new TransformStream();
  const writer = stream.writable.getWriter();
  const encoder = new TextEncoder();

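  // Subscribe to job events
  const unsubscribe = jobEventConsumer.subscribeToJob(jobId, (event) => {
    const message = `data: ${JSON.stringify(event)}\n\n`;
    // Ignore write failures: the client may already have closed the stream
    writer.write(encoder.encode(message)).catch(() => {});
  });

  // Cleanup on client disconnect
  request.signal.addEventListener('abort', () => {
    unsubscribe();
    // close() rejects if the stream has already errored; nothing more to do then
    writer.close().catch(() => {});
  });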

  return new Response(stream.readable, {
    headers: {
      'Content-Type': 'text/event-stream',
      'Cache-Control': 'no-cache',
      'Connection': 'keep-alive',
    },
  });
}
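
The route above writes bare data: lines. The SSE wire format also allows an id: field, which the browser sends back as the Last-Event-ID header when it reconnects, and comment lines (starting with a colon) that can serve as heartbeats. The following is a sketch of a frame formatter under those rules; formatSseFrame and formatSseHeartbeat are hypothetical names, and using a Redis Stream entry ID as the SSE id is an assumption, not something the current route does.

```typescript
// Hypothetical helpers for building Server-Sent Events frames.
interface SseFrame {
  data: unknown; // payload, serialized as JSON
  id?: string;   // optional event ID (assumed here: a Redis Stream entry ID)
}

export function formatSseFrame(frame: SseFrame): string {
  const lines: string[] = [];
  if (frame.id !== undefined) lines.push(`id: ${frame.id}`);
  // JSON.stringify without indentation emits no raw newlines,
  // so a single data line is enough
  lines.push(`data: ${JSON.stringify(frame.data)}`);
  // A blank line terminates the frame
  return lines.join('\n') + '\n\n';
}

export function formatSseHeartbeat(): string {
  // Lines starting with ':' are comments and are ignored by EventSource
  return ': keep-alive\n\n';
}
```

A periodic heartbeat can keep idle proxies from closing the connection while a job is producing no events.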

1.4 Create Client-Side Hook for Job Events

File: apps/emprops-studio/src/hooks/useJobEvents.ts (NEW)

typescript
import { useEffect, useState } from 'react';
import type { JobEvent } from '@emp/core/types';

export function useJobEvents(jobId: string | null) {
  const [events, setEvents] = useState<JobEvent[]>([]);
  const [loading, setLoading] = useState(true);
  const [error, setError] = useState<string | null>(null);

  useEffect(() => {
    if (!jobId) {
      setLoading(false);
      return;
    }

    setLoading(true);
    const eventSource = new EventSource(`/api/jobs/${jobId}/events`);

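    eventSource.onmessage = (e) => {
      try {
        const event = JSON.parse(e.data) as JobEvent;
        setEvents(prev => [...prev, event]);
        setLoading(false);
        setError(null); // connection is healthy again
      } catch (err) {
        console.error('Failed to parse job event:', err);
      }
    };

    eventSource.onerror = (err) => {
      console.error('EventSource error:', err);
      if (eventSource.readyState === EventSource.CLOSED) {
        // The browser gave up (for example, on a non-200 response)
        setError('Connection closed.');
      } else {
        // Do not call close() here: it would cancel the browser's
        // built-in reconnection
        setError('Connection lost. Attempting to reconnect...');
      }
    };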

    return () => {
      eventSource.close();
    };
  }, [jobId]);

  return { events, loading, error };
}

Impact: emprops-api can now receive real-time job events without a WebSocket connection.


Phase 2: Convert Commands to HTTP (1-2 Days)

Objective: Replace WebSocket commands with HTTP REST API calls.

2.1 Create HTTP Client for Job Commands

File: apps/emprops-studio/src/lib/job-api-client.ts (NEW)

typescript
import type { JobData } from '@emp/core/types';

const API_BASE_URL = process.env.NEXT_PUBLIC_EMP_JOB_QUEUE_API || 'http://localhost:3000';

export class JobAPIClient {
  /**
   * Submit a new job to the queue
   */
  static async submitJob(jobData: Partial<JobData>): Promise<{ job_id: string }> {
    const response = await fetch(`${API_BASE_URL}/api/jobs`, {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify(jobData),
    });

    if (!response.ok) {
      throw new Error(`Failed to submit job: ${response.statusText}`);
    }

    return response.json();
  }

  /**
   * Cancel a running job
   */
  static async cancelJob(jobId: string): Promise<void> {
    const response = await fetch(`${API_BASE_URL}/api/jobs/${jobId}/cancel`, {
      method: 'POST',
    });

    if (!response.ok) {
      throw new Error(`Failed to cancel job: ${response.statusText}`);
    }
  }

  /**
   * Get current job state
   */
  static async getJobState(jobId: string): Promise<JobData> {
    const response = await fetch(`${API_BASE_URL}/api/jobs/${jobId}`);

    if (!response.ok) {
      throw new Error(`Failed to get job state: ${response.statusText}`);
    }

    return response.json();
  }
}

2.2 Update Job Submission Flow in emprops-studio

Before (WebSocket):

typescript
// Old WebSocket-based job submission
websocketClient.send({
  type: 'submit_job',
  payload: jobData,
});

// Listen for response
websocketClient.on('job_submitted', (event) => {
  console.log('Job submitted:', event.job_id);
});

After (HTTP + Redis Events):

typescript
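// Inside a React component. Hooks must run at the top level on every render,
// so the job ID lives in state instead of being read after an await.
const [jobId, setJobId] = useState<string | null>(null);

// Subscribe to job events via Redis (Server-Sent Events); idle while jobId is null
const { events } = useJobEvents(jobId);

// Submit job via HTTP, for example from a click handler
async function handleSubmit() {
  const { job_id } = await JobAPIClient.submitJob(jobData);
  setJobId(job_id);
}

// React to events
useEffect(() => {
  const latestEvent = events[events.length - 1];
  if (latestEvent?.event_type === 'completion') {
    console.log('Job completed:', latestEvent);
  }
}, [events]);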

Impact: Command flow is now stateless HTTP, events are real-time via Redis.
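
Checking only the latest event can miss a completion that is followed by another event. One possible refinement is to fold the whole event list into a small summary. The sketch below assumes a minimal event shape; the real JobEvent type lives in @emp/core/types and may differ. Only 'progress' and 'completion' appear in this document, so the other terminal event names are assumptions.

```typescript
// Assumed minimal event shape, for illustration only
interface JobEventLike {
  event_type: string;
  progress?: number;
}

interface JobSummary {
  progress: number; // highest progress value seen so far
  done: boolean;    // true once any terminal event has arrived
}

// 'completion' is used in this ADR; 'failed' and 'cancelled' are assumed names
const TERMINAL_EVENT_TYPES = new Set(['completion', 'failed', 'cancelled']);

export function summarizeJobEvents(events: JobEventLike[]): JobSummary {
  let progress = 0;
  let done = false;
  for (const e of events) {
    if (e.event_type === 'progress' && typeof e.progress === 'number') {
      // Take the maximum so a late or duplicated event cannot move progress backwards
      progress = Math.max(progress, e.progress);
    }
    if (TERMINAL_EVENT_TYPES.has(e.event_type)) done = true;
  }
  return { progress, done };
}
```

A component could call summarizeJobEvents(events) during render and close the event stream once done is true.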


Phase 3: Remove WebSocket Infrastructure (1 Day)

Objective: Clean up WebSocket code from emprops-api and optionally from lightweight-api-server.

3.1 Remove WebSocket Client from emprops-api

Files to Remove:

  • apps/emprops-studio/src/services/websocket.ts
  • apps/emprops-studio/src/hooks/useWebSocket.ts (if exists)

Files to Update:

  • Remove WebSocket imports from components
  • Update state management to use useJobEvents hook instead

3.2 (Optional) Remove WebSocket Server from lightweight-api-server

Decision Point: The monitor application still uses WebSocket for real-time updates. Options:

Option A: Keep WebSocket for Monitor Only

  • lightweight-api-server continues broadcasting events via WebSocket
  • Only monitor application connects via WebSocket
  • emprops-api uses Redis directly

Option B: Migrate Monitor to Redis Events Too

  • Remove WebSocket entirely from lightweight-api-server
  • Monitor uses same Redis event consumption pattern as emprops-api
  • Fully unified event architecture

Recommendation: Option A for now (incremental migration), Option B for future work.

Impact: Simplified emprops-api codebase, reduced connection management overhead.


Phase 4: Testing & Validation (2 Days)

4.1 Integration Tests

Test Suite: apps/emprops-studio/src/__tests__/job-events-integration.test.ts (NEW)

typescript
import { describe, it, expect, beforeAll } from 'vitest';
import { JobAPIClient } from '@/lib/job-api-client';
import { jobEventConsumer } from '@/lib/job-event-consumer';

describe('Job Events Integration', () => {
  beforeAll(async () => {
    await jobEventConsumer.initialize();
  });

  it('should receive job events after submission', async () => {
    // Submit job via HTTP
    const { job_id } = await JobAPIClient.submitJob({
      job_type: 'test',
      payload: { test: true },
    });

    // Subscribe to job events
    const receivedEvents: any[] = [];
    const unsubscribe = jobEventConsumer.subscribeToJob(job_id, (event) => {
      receivedEvents.push(event);
    });

    // Wait for events
    await new Promise(resolve => setTimeout(resolve, 2000));

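    // Verify events received. Pub/sub does not replay, so the first event seen
    // may not be job_submitted if it was published before subscribeToJob ran.
    expect(receivedEvents.length).toBeGreaterThan(0);
    expect(typeof receivedEvents[0].event_type).toBe('string');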

    unsubscribe();
  });

  it('should handle job cancellation via HTTP', async () => {
    const { job_id } = await JobAPIClient.submitJob({
      job_type: 'test',
      payload: { test: true },
    });

    await JobAPIClient.cancelJob(job_id);

    const state = await JobAPIClient.getJobState(job_id);
    expect(state.status).toBe('cancelled');
  });
});

4.2 Load Testing

Verify:

  • 100+ concurrent job submissions via HTTP (should scale better than WebSocket)
  • Event delivery latency (should be comparable or better than WebSocket)
  • No message loss during high load

4.3 Failure Scenarios

Test:

  • Redis connection loss → emprops-api should reconnect automatically
  • lightweight-api-server restart → emprops-api events should resume
  • Network interruption → Server-Sent Events auto-reconnect
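
When events come from a Redis Stream, a consumer that reconnects can resume from the last entry ID it handled and drop anything it has already seen. Stream entry IDs have the form {milliseconds}-{sequence}, so they can be ordered without contacting Redis. The sketch below is one way to do that; compareStreamIds and ResumeCursor are hypothetical names.

```typescript
// Compare two non-negative decimal strings without converting to numbers,
// so 64-bit sequence values cannot lose precision.
function compareDecimal(a: string, b: string): number {
  if (a.length !== b.length) return a.length < b.length ? -1 : 1;
  return a < b ? -1 : a > b ? 1 : 0;
}

// Order Redis Stream entry IDs of the form "<ms>-<seq>"
export function compareStreamIds(a: string, b: string): number {
  const [aMs = '0', aSeq = '0'] = a.split('-');
  const [bMs = '0', bSeq = '0'] = b.split('-');
  const byMs = compareDecimal(aMs, bMs);
  return byMs !== 0 ? byMs : compareDecimal(aSeq, bSeq);
}

// Tracks the newest handled ID and filters out replays after a reconnect
export class ResumeCursor {
  private lastId: string | null = null;

  /** Returns true if the entry is newer than anything handled so far. */
  accept(id: string): boolean {
    if (this.lastId !== null && compareStreamIds(id, this.lastId) <= 0) {
      return false;
    }
    this.lastId = id;
    return true;
  }
}
```

This only helps on the Streams path; events delivered through pub/sub carry no ID and cannot be replayed.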

Consequences

Benefits

1. Architectural Simplicity ✅

  • Direct event consumption from Redis (source of truth)
  • Eliminates WebSocket layer (fewer moving parts)
  • Standard HTTP semantics for commands (stateless, cacheable, scalable)

2. Better Scalability ✅

  • Horizontal scaling of emprops-api without WebSocket sticky sessions
  • No connection pooling limits (HTTP connections are short-lived)
  • Redis pub/sub scales to thousands of subscribers

3. Improved Reliability ✅

  • No WebSocket state management (no reconnection logic, heartbeats, sync)
  • Replayable event delivery via Redis Streams (persistent, ordered log that a consumer can re-read after a disconnect)
  • Consumer groups for fault tolerance (multiple emprops-api instances can share load)

4. Reduced Latency ✅

  • Eliminates hop through lightweight-api-server WebSocket broadcast
  • Events arrive directly from Redis to emprops-api
  • Faster command responses (HTTP request/response vs. WebSocket round-trip)

5. Better Observability ✅

  • HTTP access logs for all commands (standard logging infrastructure)
  • Redis monitoring for event throughput and lag
  • No WebSocket connection state to track

6. Alignment with Redis-First Architecture ✅

  • Redis is already the source of truth for all state
  • Direct consumption of Redis events matches monitor architecture goal
  • Unified event system across all applications

Drawbacks

1. Server-Sent Events Limitations

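Issue: Server-Sent Events (SSE) are subject to browser connection limits: over HTTP/1.1, most browsers allow only 6 connections per domain. Over HTTP/2 the limit on concurrent streams is negotiated and is much higher.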

Mitigation:

  • Use single SSE connection per job (not per event type)
  • Close SSE connections when job completes
  • For multiple concurrent jobs, multiplex events through single connection
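
Multiplexing several jobs over one SSE connection could work by wrapping each event in an envelope that names its job, then routing on the client. The following sketch shows only the client-side routing; the envelope shape { jobId, event } and the class name JobEventDemux are assumptions, and a server route that streams events for several jobs is not shown.

```typescript
// Assumed wire format for a multiplexed SSE message
interface Envelope<T> {
  jobId: string;
  event: T;
}

type Listener<T> = (event: T) => void;

export class JobEventDemux<T> {
  private listeners = new Map<string, Set<Listener<T>>>();

  /** Register a listener for one job; returns an unsubscribe function. */
  on(jobId: string, listener: Listener<T>): () => void {
    const existing = this.listeners.get(jobId);
    const set = existing ?? new Set<Listener<T>>();
    if (!existing) this.listeners.set(jobId, set);
    set.add(listener);

    return () => {
      set.delete(listener);
      if (set.size === 0) this.listeners.delete(jobId);
    };
  }

  /** Feed the raw data string of one SSE message. */
  dispatch(raw: string): void {
    const { jobId, event } = JSON.parse(raw) as Envelope<T>;
    this.listeners.get(jobId)?.forEach((listener) => listener(event));
  }
}
```

With this in place, a single EventSource could pass each e.data to dispatch, and each job view would register through on.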

2. Next.js API Route for Event Streaming

Issue: Next.js API routes must handle long-lived connections for SSE.

Mitigation:

  • Use Node.js runtime (not Edge runtime) for SSE routes
  • Implement proper cleanup on client disconnect
  • Monitor memory usage for long-running SSE connections

3. Migration Complexity

Issue: emprops-api must be updated to use new event consumption pattern.

Mitigation:

  • Incremental rollout (Phase 1 → Phase 2 → Phase 3)
  • Run both systems in parallel during migration
  • Feature flags to switch between WebSocket and Redis events
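
A feature flag for the parallel-run period can be as small as one parsed environment value. In this sketch the variable name JOB_EVENT_SOURCE and the function name are hypothetical; the default deliberately stays on WebSocket so an unset or misspelled flag keeps current behavior.

```typescript
type EventSourceMode = 'websocket' | 'redis';

// Hypothetical flag parser. Intended use:
//   resolveEventSourceMode(process.env.JOB_EVENT_SOURCE)
export function resolveEventSourceMode(raw: string | undefined): EventSourceMode {
  const value = (raw ?? '').trim().toLowerCase();
  // Anything other than an explicit "redis" falls back to the legacy path
  return value === 'redis' ? 'redis' : 'websocket';
}
```

Defaulting to the legacy path means a rollback only requires removing the variable.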

4. Redis Dependency

Issue: emprops-api now directly depends on Redis availability.

Mitigation:

  • Redis is already a critical dependency (source of truth)
  • Implement retry logic with exponential backoff
  • Monitor Redis health and alert on connection issues
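
The retryStrategy in section 1.1 grows linearly (50 ms per attempt, capped at 2 seconds), not exponentially. If exponential backoff is wanted, a strategy along these lines could be used. The base and cap values are illustrative, and the sketch assumes the attempt counter starts at 1, as it does for ioredis's retryStrategy.

```typescript
// Linear schedule, as written in section 1.1
export function linearDelayMs(times: number): number {
  return Math.min(times * 50, 2000);
}

// Exponential alternative: doubles on each attempt until it reaches maxMs.
// baseMs and maxMs are illustrative defaults, not tuned values.
export function exponentialDelayMs(times: number, baseMs = 50, maxMs = 2000): number {
  return Math.min(baseMs * 2 ** (times - 1), maxMs);
}
```

It would be wired in as retryStrategy: (times) => exponentialDelayMs(times). With these defaults the exponential schedule reaches the 2-second cap on the 7th attempt, while the linear one reaches it on the 40th.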

Migration Strategy

Week 1: Redis Event Infrastructure

  • Implement Redis client singleton in emprops-api
  • Create JobEventConsumer for job event subscriptions
  • Add Next.js API route for Server-Sent Events
  • Create useJobEvents React hook
  • Test event delivery from Redis → emprops-api

Week 2: HTTP Command Layer

  • Implement JobAPIClient for HTTP commands
  • Update job submission flow to use HTTP POST
  • Update job cancellation to use HTTP POST
  • Add error handling for HTTP failures
  • Test command execution via HTTP

Week 3: Parallel Operation

  • Run both systems in parallel (WebSocket + Redis events)
  • Feature flag to switch between event sources
  • Compare event delivery reliability and latency
  • Identify and fix any issues with Redis event consumption

Week 4: WebSocket Removal

  • Switch emprops-api to Redis events only
  • Remove WebSocket client code from emprops-api
  • Monitor production metrics (latency, error rates, event delivery)
  • (Optional) Remove WebSocket server from lightweight-api-server if monitor also migrated

Success Metrics

Quantitative

  1. Event Delivery Latency

    • Target: < 100ms from Redis publish to emprops-api handler
    • Baseline: Current WebSocket latency ~150-200ms (includes intermediary hop)
  2. Command Response Time

    • Target: HTTP POST /api/jobs < 50ms (vs. WebSocket round-trip ~100ms)
    • Baseline: Current WebSocket command latency
  3. Scalability

    • Target: Support 10+ concurrent emprops-api instances (horizontal scaling)
    • Baseline: WebSocket requires sticky sessions
  4. Reliability

    • Target: Zero event loss during normal operation
    • Target: < 1 second event delivery lag during Redis reconnection
    • Baseline: Current WebSocket reconnection lag ~2-5 seconds
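
Latency targets like these are easier to verify against a percentile than an average, since a few slow deliveries can hide behind a good mean. The sketch below computes a nearest-rank percentile over latency samples; the function names are hypothetical, and choosing the 95th percentile as the check is an assumption, since the targets above do not name a percentile.

```typescript
// Nearest-rank percentile: the smallest sample such that at least p percent
// of all samples are less than or equal to it.
export function percentile(samplesMs: number[], p: number): number {
  if (samplesMs.length === 0) throw new Error('no samples');
  const sorted = samplesMs.slice().sort((a, b) => a - b);
  const rank = Math.ceil((p / 100) * sorted.length);
  return sorted[Math.max(rank - 1, 0)];
}

// Assumed check: the 95th percentile must be under the target
export function meetsLatencyTarget(samplesMs: number[], targetMs: number): boolean {
  return percentile(samplesMs, 95) < targetMs;
}
```

Each sample would be the handler's receive time minus the publish timestamp carried in the event, which requires the publisher and consumer clocks to be reasonably synchronized.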

Qualitative

  1. Codebase Simplicity

    • Reduced lines of code (WebSocket client/server removal)
    • Fewer dependencies (no WebSocket libraries)
    • Standard HTTP/Redis patterns (easier onboarding)
  2. Developer Experience

    • Easier debugging (HTTP access logs, Redis monitoring)
    • Standard tooling (curl for commands, redis-cli for events)
    • Clear separation of concerns (commands vs. events)

Alternative Approaches Considered

❌ Alternative 1: Keep WebSocket for Everything

Why Rejected:

  • Tight coupling between emprops-api and lightweight-api-server
  • Scalability issues with WebSocket sticky sessions
  • Duplicate event infrastructure (Redis pub/sub + WebSocket broadcast)
  • Connection management overhead

❌ Alternative 2: Use WebSocket for Events, HTTP for Commands

Why Rejected:

  • Still requires WebSocket connection state management
  • Doesn't simplify architecture (two communication channels)
  • WebSocket doesn't provide benefits over Redis pub/sub for events

❌ Alternative 3: Polling for Events

Why Rejected:

  • Inefficient (constant HTTP requests)
  • Higher latency (poll interval delay)
  • Increased server load (unnecessary requests)

✅ Alternative 4: HTTP Commands + Redis Events (THIS ADR)

Why Chosen:

  • Eliminates WebSocket entirely (simpler architecture)
  • Direct Redis consumption (aligns with source of truth)
  • Standard HTTP semantics (scalable, cacheable, well-understood)
  • Better reliability (Redis Streams for replayable, at-least-once delivery)
  • Incremental migration (can run both systems in parallel)

Related Documentation

  • LOGGING_ARCHITECTURE.md - Redis log streaming (Phase 2 implemented)
  • monitor-events.ts - Event type definitions for real-time monitoring

Dependencies

Infrastructure

  • Redis 6.0+ (for Streams and pub/sub)
  • Next.js 14+ (for App Router API routes with streaming support)
  • ioredis (Node.js Redis client)

Code Changes

  • apps/emprops-studio/src/lib/redis-client.ts (NEW)
  • apps/emprops-studio/src/lib/job-event-consumer.ts (NEW)
  • apps/emprops-studio/src/lib/job-api-client.ts (NEW)
  • apps/emprops-studio/src/hooks/useJobEvents.ts (NEW)
  • apps/emprops-studio/src/app/api/jobs/[jobId]/events/route.ts (NEW)
  • apps/emprops-studio/src/services/websocket.ts (REMOVED)

Appendix: Redis Event Patterns

Job Events (Redis Streams)

Pattern: job:events:{job_id}

Why Streams:

  • Ordered event log (chronological history)
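  • Persistent (events survive a Redis restart when RDB or AOF persistence is enabled)
  • Consumer groups (multiple consumers can share load)
  • Acknowledgement (at-least-once delivery: unacknowledged entries stay pending and can be re-delivered)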

Example:

bash
# Worker publishes job progress
XADD job:events:abc123 * event_type progress progress 50 timestamp 1704672000

# emprops-api consumes events
XREAD BLOCK 0 STREAMS job:events:abc123 $
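
In application code, the XREAD reply arrives as nested arrays: one [stream, entries] pair per stream, where each entry is [id, [field, value, field, value, ...]]. This is the shape ioredis returns, with null on a timeout. A sketch of flattening that reply into objects follows; the type and function names are hypothetical.

```typescript
// Reply shape of XREAD as nested arrays (null when a blocking read times out)
type StreamEntry = [string, string[]];   // [entry ID, flat field/value list]
type XReadReply = [string, StreamEntry[]][] | null;

interface ParsedEntry {
  stream: string;
  id: string;
  fields: Record<string, string>;
}

export function parseXReadReply(reply: XReadReply): ParsedEntry[] {
  const parsed: ParsedEntry[] = [];
  for (const [stream, entries] of reply ?? []) {
    for (const [id, flat] of entries) {
      const fields: Record<string, string> = {};
      // Fields alternate name, value, name, value, ...
      for (let i = 0; i + 1 < flat.length; i += 2) {
        fields[flat[i]] = flat[i + 1];
      }
      parsed.push({ stream, id, fields });
    }
  }
  return parsed;
}
```

All values come back as strings, so a numeric field such as progress needs an explicit conversion. Note also that $ only delivers entries added after the call; to resume without gaps, the next read should pass the last ID received instead.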

Machine Logs (Redis Pub/Sub)

Pattern: machine:*:logs

Why Pub/Sub:

  • Broadcast to all interested subscribers
  • Low latency (no persistence overhead)
  • Pattern-based subscription (machine:gpu-1:logs, machine:cpu-2:logs)

Example:

bash
# Machine publishes log
PUBLISH machine:gpu-1:logs '{"level":"info","message":"ComfyUI started"}'

# emprops-api subscribes (if needed)
PSUBSCRIBE machine:*:logs

End of ADR
