WebSocket Removal: Redis Events Architecture - ADR
Status: Proposed
Date: 2025-11-07
Author: System Architecture
Supersedes: None
Related: LOGGING_ARCHITECTURE.md, monitor-events.ts
Context
The emp-job-queue system currently uses WebSocket connections between emprops-api (user-facing Next.js application) and the lightweight-api-server for bidirectional communication. This creates several architectural concerns:
Current Problems
1. Tight Coupling
- emprops-api maintains persistent WebSocket connections to lightweight-api-server
- Connection management overhead (reconnection logic, heartbeats, state sync)
- Both applications must be running and network-connected for real-time updates
2. Complexity
- WebSocket connection state management in Next.js application
- Reconnection logic for network interruptions
- Message queuing during disconnections
- Duplicate event handling infrastructure
3. Scalability Concerns
- WebSocket connections don't scale horizontally well without sticky sessions
- Each emprops-api instance maintains separate connections
- No connection pooling or multiplexing benefits
4. Architectural Misalignment
- Redis is the source of truth for all job state and events
- WebSocket creates an intermediary layer that duplicates Redis pub/sub
- emprops-api could consume events directly from Redis (it's already a Node.js application)
5. Inconsistent Event Delivery
- Events can be missed during WebSocket disconnections
- Requires complex resync logic to catch up on missed events
- No guaranteed delivery semantics
Current Architecture
┌────────────────────────────────────────────────────────────┐
│ emprops-api (Next.js) │
│ ┌──────────────────────────────────────────────────────┐ │
│ │ WebSocket Client │ │
│ │ - Maintains persistent connection │ │
│ │ - Handles reconnection logic │ │
│ │ - Buffers events during disconnection │ │
│ └──────────────────────────────────────────────────────┘ │
└────────────────────────────────────────────────────────────┘
↓ ↑
WebSocket Connection
(Bidirectional, Stateful)
↓ ↑
┌────────────────────────────────────────────────────────────┐
│ lightweight-api-server (emp-job-queue) │
│ ┌──────────────────────────────────────────────────────┐ │
│ │ WebSocket Server │ │
│ │ - Broadcasts events to connected clients │ │
│ │ - Manages connection state │ │
│ │ - Filters events per client subscription │ │
│ └──────────────────────────────────────────────────────┘ │
│ ↓ ↑ │
│ ┌──────────────────────────────────────────────────────┐ │
│ │ Redis Subscriber │ │
│ │ - Subscribes to machine:*:logs, job:*, etc. │ │
│ │ - Receives events from Redis pub/sub │ │
│ │ - Forwards to WebSocket clients │ │
│ └──────────────────────────────────────────────────────┘ │
└────────────────────────────────────────────────────────────┘
↓ ↑
┌────────────────────────────────────────────────────────────┐
│ Redis (Source of Truth) │
│ - Pub/Sub: machine:*:logs, job:events:* │
│ - Streams: job:events:{job_id} │
│ - Job state, machine state, worker state │
└────────────────────────────────────────────────────────────┘
Key Issue: emprops-api connects to a WebSocket server that in turn subscribes to Redis, creating an unnecessary intermediary layer.
Decision
Remove WebSocket connections between emprops-api and emp-job-queue API. Replace with direct Redis event consumption and HTTP commands.
Architectural Shift
Inbound Commands (emprops-api → emp-job-queue):
- Use HTTP POST requests for job submission and control operations
- Leverage existing REST API endpoints
- Stateless, scalable, standard HTTP semantics
Outbound Events (emp-job-queue → emprops-api):
- Use Redis Streams and pub/sub for real-time event delivery
- emprops-api subscribes directly to Redis (using ioredis in Next.js API routes or server components)
- Eliminates WebSocket intermediary layer
Proposed Architecture
Component Overview
┌────────────────────────────────────────────────────────────┐
│ emprops-api (Next.js) │
│ │
│ ┌──────────────────────────────────────────────────────┐ │
│ │ HTTP Client (Outbound Commands) │ │
│ │ - POST /api/jobs (submit job) │ │
│ │ - POST /api/jobs/:id/cancel │ │
│ │ - GET /api/jobs/:id (query state) │ │
│ └──────────────────────────────────────────────────────┘ │
│ ↓ │
│ ┌──────────────────────────────────────────────────────┐ │
│ │ Redis Subscriber (Inbound Events) │ │
│ │ - Direct ioredis connection │ │
│ │ - Subscribe to job:events:{job_id} │ │
│ │ - Subscribe to machine:*:logs (if needed) │ │
│ │ - Consumer group for Streams │ │
│ └──────────────────────────────────────────────────────┘ │
└────────────────────────────────────────────────────────────┘
↓ ↑
HTTP Requests Redis Events
↓ ↑
┌────────────────────────────────────────────────────────────┐
│ lightweight-api-server (emp-job-queue) │
│ │
│ ┌──────────────────────────────────────────────────────┐ │
│ │ HTTP REST API │ │
│ │ - POST /api/jobs (create job in Redis) │ │
│ │ - Returns job_id immediately │ │
│ │ - No stateful connection required │ │
│ └──────────────────────────────────────────────────────┘ │
│ ↓ ↑ │
│ ┌──────────────────────────────────────────────────────┐ │
│ │ Redis Publisher │ │
│ │ - Publishes job:events:{job_id} to Streams │ │
│ │ - Publishes machine:*:logs to pub/sub │ │
│ │ - No WebSocket broadcasting needed │ │
│ └──────────────────────────────────────────────────────┘ │
└────────────────────────────────────────────────────────────┘
↓ ↑
┌────────────────────────────────────────────────────────────┐
│ Redis (Source of Truth) │
│ - Streams: job:events:{job_id} (ordered event log) │
│ - Pub/Sub: machine:*:logs (broadcast) │
│ - Job state, machine state, worker state │
└────────────────────────────────────────────────────────────┘
Event Flow Examples
Job Submission Flow
Before (WebSocket):
emprops-api → WS: {type: 'submit_job', payload: {...}}
↓
lightweight-api-server → Redis: RPUSH job_queue {...}
↓
lightweight-api-server → WS: {type: 'job_submitted', job_id: 'xxx'}
↓
emprops-api ← WS: Receives job_submitted event
After (HTTP + Redis):
emprops-api → HTTP POST /api/jobs {payload: {...}}
↓
lightweight-api-server → Redis: RPUSH job_queue {...}
↓
lightweight-api-server → HTTP 201 Created: {job_id: 'xxx'}
↓
emprops-api ← HTTP Response: {job_id: 'xxx'}
↓
emprops-api subscribes to: job:events:xxx (Redis Stream)
Job Progress Update Flow
Before (WebSocket):
Worker → Redis: XADD job:events:{job_id} {progress: 50}
↓
lightweight-api-server (Redis sub) → receives event
↓
lightweight-api-server → WS broadcast: {type: 'progress', progress: 50}
↓
emprops-api ← WS: Receives progress event
After (Direct Redis):
Worker → Redis: XADD job:events:{job_id} {progress: 50}
↓
emprops-api (stream reader) → receives event directly
↓
emprops-api → SSE: pushes event to the browser, which updates the UI
Key Benefit: Eliminates the intermediate hop through lightweight-api-server.
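A stream reader receives each `XADD` entry as an entry ID plus a flat field/value list, for example `['event_type', 'progress', 'progress', '50']`. Redis returns every value as a string. The following is a minimal sketch of turning one entry into a typed event. It assumes the field names from the appendix example (`event_type`, `progress`, `timestamp`). `ParsedJobEvent` and `parseStreamEntry` are hypothetical and are not the `JobEvent` type from `@emp/core/types`.

```typescript
// Hypothetical event shape for illustration; the real JobEvent in @emp/core/types may differ.
interface ParsedJobEvent {
  id: string; // stream entry ID, e.g. "1704672000000-0"
  event_type: string;
  fields: Record<string, string>; // raw fields: Redis stores every value as a string
  progress?: number;
  timestamp?: number;
}

// Turn one stream entry (ID plus flat [field, value, field, value, ...] list) into an event.
function parseStreamEntry(id: string, flat: string[]): ParsedJobEvent {
  if (flat.length % 2 !== 0) {
    throw new Error(`Odd number of field/value items in entry ${id}`);
  }
  const fields: Record<string, string> = {};
  for (let i = 0; i < flat.length; i += 2) {
    fields[flat[i]] = flat[i + 1];
  }
  const event: ParsedJobEvent = { id, event_type: fields['event_type'] ?? 'unknown', fields };
  // Numeric fields come back as strings and must be converted explicitly
  if (fields['progress'] !== undefined) event.progress = Number(fields['progress']);
  if (fields['timestamp'] !== undefined) event.timestamp = Number(fields['timestamp']);
  return event;
}
```

Keeping the raw `fields` map alongside the typed properties lets new event fields pass through without changing the parser.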
Implementation Plan
Phase 1: Add Redis Subscription to emprops-api (2-3 Days)
Objective: Enable emprops-api to consume events directly from Redis.
1.1 Create Redis Client Singleton
File: apps/emprops-studio/src/lib/redis-client.ts (NEW)
import { Redis, type RedisOptions } from 'ioredis';

// Connection settings shared by both clients
function redisOptions(): RedisOptions {
  return {
    host: process.env.REDIS_HOST || 'localhost',
    port: parseInt(process.env.REDIS_PORT || '6379', 10),
    password: process.env.REDIS_PASSWORD,
    db: parseInt(process.env.REDIS_DB || '0', 10),
    // Linear backoff capped at 2 s (the same formula ioredis uses by default)
    retryStrategy: (times: number) => Math.min(times * 50, 2000),
  };
}

let redisClient: Redis | null = null;
let redisSubscriber: Redis | null = null;

// Shared client for regular commands (GET, XADD, XRANGE, ...)
export function getRedisClient(): Redis {
  if (!redisClient) {
    redisClient = new Redis(redisOptions());
  }
  return redisClient;
}

// Dedicated connection for (P)SUBSCRIBE: once a connection enters subscriber mode it
// can only run subscriber commands, so it must never be shared with getRedisClient().
export function getRedisSubscriber(): Redis {
  if (!redisSubscriber) {
    redisSubscriber = new Redis(redisOptions());
  }
  return redisSubscriber;
}
1.2 Create Job Event Consumer
File: apps/emprops-studio/src/lib/job-event-consumer.ts (NEW)
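import { getRedisSubscriber } from './redis-client';
import type { JobEvent } from '@emp/core/types';

type JobEventHandler = (event: JobEvent) => void;

const CHANNEL_PREFIX = 'job:events:';

// Fans out job events to in-process handlers (typically one per open SSE connection).
// This consumes Redis pub/sub, so it only sees events that are PUBLISHed on
// job:events:{job_id}. Entries appended with XADD to the stream of the same name are
// not delivered via pub/sub, and pub/sub keeps no history for missed messages.
class JobEventConsumer {
  private handlers = new Map<string, Set<JobEventHandler>>();
  private subscriber = getRedisSubscriber();
  private initPromise: Promise<void> | null = null;

  constructor() {
    // Register the listener once, independent of how often initialize() is called
    this.subscriber.on('pmessage', (_pattern: string, channel: string, message: string) => {
      // Strip the prefix instead of split(':') so job IDs containing ':' stay intact
      const jobId = channel.slice(CHANNEL_PREFIX.length);
      const handlers = this.handlers.get(jobId);
      if (!handlers || handlers.size === 0) return;

      let event: JobEvent;
      try {
        event = JSON.parse(message) as JobEvent;
      } catch (error) {
        console.error('[JobEventConsumer] Failed to parse event:', error);
        return;
      }
      // A throwing handler must not prevent delivery to the others
      handlers.forEach((handler) => {
        try {
          handler(event);
        } catch (error) {
          console.error('[JobEventConsumer] Handler failed:', error);
        }
      });
    });
  }

  initialize(): Promise<void> {
    // Share one PSUBSCRIBE between concurrent callers; allow a retry if it fails
    if (!this.initPromise) {
      this.initPromise = this.subscriber
        .psubscribe(`${CHANNEL_PREFIX}*`)
        .then(() => undefined)
        .catch((error) => {
          this.initPromise = null;
          throw error;
        });
    }
    return this.initPromise;
  }

  subscribeToJob(jobId: string, handler: JobEventHandler): () => void {
    if (!this.handlers.has(jobId)) {
      this.handlers.set(jobId, new Set());
    }
    this.handlers.get(jobId)!.add(handler);
    // Return unsubscribe function
    return () => {
      const handlers = this.handlers.get(jobId);
      if (handlers) {
        handlers.delete(handler);
        if (handlers.size === 0) {
          this.handlers.delete(jobId);
        }
      }
    };
  }
}

export const jobEventConsumer = new JobEventConsumer();
1.3 Create Next.js API Route for Job Events (Server-Sent Events)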
File: apps/emprops-studio/src/app/api/jobs/[jobId]/events/route.ts (NEW)
import { NextRequest } from 'next/server';
import { jobEventConsumer } from '@/lib/job-event-consumer';

// ioredis needs Node.js TCP sockets, so this route cannot run on the Edge runtime
export const runtime = 'nodejs';
// Always run per request; never cache or pre-render this route
export const dynamic = 'force-dynamic';

export async function GET(
  request: NextRequest,
  // Next.js 14 signature; in Next.js 15+ params is a Promise and must be awaited
  { params }: { params: { jobId: string } }
) {
  const { jobId } = params;

  // Initialize consumer if not already done
  await jobEventConsumer.initialize();

  // Create Server-Sent Events stream
  const stream = new TransformStream<Uint8Array, Uint8Array>();
  const writer = stream.writable.getWriter();
  const encoder = new TextEncoder();
  let closed = false;

  // Subscribe to job events
  const unsubscribe = jobEventConsumer.subscribeToJob(jobId, (event) => {
    if (closed) return;
    const message = `data: ${JSON.stringify(event)}\n\n`;
    // Writes reject after the client disconnects; ignore them instead of leaking unhandled rejections
    writer.write(encoder.encode(message)).catch(() => {});
  });

  // Cleanup on client disconnect
  request.signal.addEventListener('abort', () => {
    closed = true;
    unsubscribe();
    writer.close().catch(() => {});
  });

  return new Response(stream.readable, {
    headers: {
      'Content-Type': 'text/event-stream',
      'Cache-Control': 'no-cache',
      'Connection': 'keep-alive',
    },
  });
}
1.4 Create Client-Side Hook for Job Events
File: apps/emprops-studio/src/hooks/useJobEvents.ts (NEW)
import { useEffect, useState } from 'react';
import type { JobEvent } from '@emp/core/types';

export function useJobEvents(jobId: string | null) {
  const [events, setEvents] = useState<JobEvent[]>([]);
  const [loading, setLoading] = useState(true);
  const [error, setError] = useState<string | null>(null);

  useEffect(() => {
    // Reset state whenever the job changes
    setEvents([]);
    setError(null);
    if (!jobId) {
      setLoading(false);
      return;
    }
    setLoading(true);
    const eventSource = new EventSource(`/api/jobs/${encodeURIComponent(jobId)}/events`);

    eventSource.onopen = () => {
      setLoading(false);
      setError(null);
    };
    eventSource.onmessage = (e) => {
      try {
        const event = JSON.parse(e.data) as JobEvent;
        setEvents((prev) => [...prev, event]);
      } catch (err) {
        console.error('Failed to parse job event:', err);
      }
    };
    eventSource.onerror = () => {
      // Do not call close() here: EventSource reconnects automatically unless readyState is CLOSED
      setError(
        eventSource.readyState === EventSource.CLOSED
          ? 'Connection closed.'
          : 'Connection lost. Attempting to reconnect...'
      );
    };

    return () => {
      eventSource.close();
    };
  }, [jobId]);

  return { events, loading, error };
}
Impact: emprops-api can now receive real-time job events without a WebSocket connection.
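The route above writes bare `data:` lines. If each frame also carries an `id:` line (for example, the Redis stream entry ID), the browser's `EventSource` sends that value back in the `Last-Event-ID` request header when it reconnects automatically. The route can then resume instead of dropping events. Lines starting with `:` are comments that `EventSource` ignores, which makes them a cheap heartbeat. The sketch below shows this framing. The helper names are hypothetical, and resuming assumes the route reads a Redis Stream rather than pub/sub.

```typescript
// Hypothetical helpers for richer SSE framing; not part of any library.

// One SSE frame: optional "id:" and "event:" lines, one "data:" line, then a blank line.
// JSON.stringify (without an indent argument) never emits raw newlines, so one data line is enough.
function formatSseFrame(data: unknown, id?: string, event?: string): string {
  let frame = '';
  if (id !== undefined) frame += `id: ${id}\n`;
  if (event !== undefined) frame += `event: ${event}\n`;
  return `${frame}data: ${JSON.stringify(data)}\n\n`;
}

// Lines starting with ":" are comments that EventSource ignores: a cheap keep-alive
// for proxies that close idle connections.
const SSE_HEARTBEAT = ': keep-alive\n\n';

// Pick the stream position to resume from. Browsers resend the last "id:" they saw in
// the Last-Event-ID request header; "0" replays the job's stream from the beginning.
function resumeCursor(lastEventId: string | null): string {
  return lastEventId !== null && /^\d+-\d+$/.test(lastEventId) ? lastEventId : '0';
}
```

In the route, `resumeCursor(request.headers.get('last-event-id'))` would supply the starting ID for the stream read. A `setInterval` that writes `SSE_HEARTBEAT` every 15 to 30 seconds would need to be cleared in the abort handler.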
Phase 2: Convert Commands to HTTP (1-2 Days)
Objective: Replace WebSocket commands with HTTP REST API calls.
2.1 Create HTTP Client for Job Commands
File: apps/emprops-studio/src/lib/job-api-client.ts (NEW)
import type { JobData } from '@emp/core/types';

// Base URL of lightweight-api-server. NEXT_PUBLIC_* values are inlined into the browser
// bundle; calling from the browser also requires the server to allow CORS.
const API_BASE_URL = process.env.NEXT_PUBLIC_EMP_JOB_QUEUE_API || 'http://localhost:3000';

// Include the numeric status: statusText is empty for HTTP/2 responses
function assertOk(response: Response, action: string): void {
  if (!response.ok) {
    throw new Error(`Failed to ${action}: HTTP ${response.status} ${response.statusText}`.trim());
  }
}

export class JobAPIClient {
  /**
   * Submit a new job to the queue
   */
  static async submitJob(jobData: Partial<JobData>): Promise<{ job_id: string }> {
    const response = await fetch(`${API_BASE_URL}/api/jobs`, {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify(jobData),
    });
    assertOk(response, 'submit job');
    return response.json();
  }

  /**
   * Cancel a running job
   */
  static async cancelJob(jobId: string): Promise<void> {
    const response = await fetch(`${API_BASE_URL}/api/jobs/${encodeURIComponent(jobId)}/cancel`, {
      method: 'POST',
    });
    assertOk(response, 'cancel job');
  }

  /**
   * Get current job state
   */
  static async getJobState(jobId: string): Promise<JobData> {
    const response = await fetch(`${API_BASE_URL}/api/jobs/${encodeURIComponent(jobId)}`);
    assertOk(response, 'get job state');
    return response.json();
  }
}
2.2 Update Job Submission Flow in emprops-studio
Before (WebSocket):
// Old WebSocket-based job submission
websocketClient.send({
type: 'submit_job',
payload: jobData,
});
// Listen for response
websocketClient.on('job_submitted', (event) => {
console.log('Job submitted:', event.job_id);
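});
After (HTTP + Redis Events):
// Hooks must be called unconditionally at the top level of a component,
// so keep the job ID in state and pass it to useJobEvents.
function SubmitJobButton({ jobData }: { jobData: Partial<JobData> }) {
  const [jobId, setJobId] = useState<string | null>(null);
  // Subscribe to job events via SSE (backed by Redis); inactive while jobId is null
  const { events } = useJobEvents(jobId);
  // React to events
  useEffect(() => {
    const latestEvent = events[events.length - 1];
    if (latestEvent?.event_type === 'completion') {
      console.log('Job completed:', latestEvent);
    }
  }, [events]);
  // Submit job via HTTP
  const submit = async () => {
    const { job_id } = await JobAPIClient.submitJob(jobData);
    setJobId(job_id);
  };
  return <button onClick={submit}>Submit job</button>;
}
Impact: The command flow is now stateless HTTP, and events arrive in real time via Redis.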
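Once commands are plain HTTP, callers must decide which failures are safe to retry. The following sketch shows an error type that keeps the status and method, plus a retry policy. It assumes the server uses conventional status codes, and `JobApiError`, `isRetryable` and `shouldRetry` are hypothetical names. The policy retries idempotent requests (such as `GET /api/jobs/:id`) on transient statuses. It never retries `POST /api/jobs`, because a retried submission whose first attempt actually succeeded would enqueue the job twice. The ADR does not specify an idempotency key that would make that safe.

```typescript
// Hypothetical error type that keeps the HTTP status and method for callers.
class JobApiError extends Error {
  readonly status: number;
  readonly method: string;
  constructor(method: string, url: string, status: number) {
    super(`${method} ${url} failed with HTTP ${status}`);
    this.name = 'JobApiError';
    this.status = status;
    this.method = method;
    // Keep instanceof working even when compiled to older JS targets
    Object.setPrototypeOf(this, JobApiError.prototype);
  }
}

// Retry only idempotent methods on transient statuses (rate limiting, gateway/availability errors).
function isRetryable(method: string, status: number): boolean {
  const idempotent = ['GET', 'HEAD', 'PUT', 'DELETE'].indexOf(method.toUpperCase()) !== -1;
  const transient = status === 429 || status === 502 || status === 503 || status === 504;
  return idempotent && transient;
}

function shouldRetry(error: unknown): boolean {
  return error instanceof JobApiError && isRetryable(error.method, error.status);
}
```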
Phase 3: Remove WebSocket Infrastructure (1 Day)
Objective: Clean up WebSocket code from emprops-api and optionally from lightweight-api-server.
3.1 Remove WebSocket Client from emprops-api
Files to Remove:
- apps/emprops-studio/src/services/websocket.ts
- apps/emprops-studio/src/hooks/useWebSocket.ts (if it exists)
Files to Update:
- Remove WebSocket imports from components
- Update state management to use the useJobEvents hook instead
3.2 (Optional) Remove WebSocket Server from lightweight-api-server
Decision Point: The monitor application still uses WebSocket for real-time updates. Options:
Option A: Keep WebSocket for Monitor Only
- lightweight-api-server continues broadcasting events via WebSocket
- Only monitor application connects via WebSocket
- emprops-api uses Redis directly
Option B: Migrate Monitor to Redis Events Too
- Remove WebSocket entirely from lightweight-api-server
- Monitor uses same Redis event consumption pattern as emprops-api
- Fully unified event architecture
Recommendation: Option A for now (incremental migration), Option B for future work.
Impact: Simplified emprops-api codebase, reduced connection management overhead.
Phase 4: Testing & Validation (2 Days)
4.1 Integration Tests
Test Suite: apps/emprops-studio/src/__tests__/job-events-integration.test.ts (NEW)
import { describe, it, expect, beforeAll, afterAll, vi } from 'vitest';
import type { JobEvent } from '@emp/core/types';
import { JobAPIClient } from '@/lib/job-api-client';
import { jobEventConsumer } from '@/lib/job-event-consumer';
import { getRedisSubscriber } from '@/lib/redis-client';

describe('Job Events Integration', () => {
  beforeAll(async () => {
    await jobEventConsumer.initialize();
  });

  afterAll(async () => {
    // Close the subscriber connection so the test process can exit
    await getRedisSubscriber().quit();
  });

  it('should receive job events after submission', async () => {
    // Submit job via HTTP
    const { job_id } = await JobAPIClient.submitJob({
      job_type: 'test',
      payload: { test: true },
    });

    // Subscribe to job events. Pub/sub keeps no history, so anything published before
    // this line is missed; assert only that events arrive, not which one comes first.
    const receivedEvents: JobEvent[] = [];
    const unsubscribe = jobEventConsumer.subscribeToJob(job_id, (event) => {
      receivedEvents.push(event);
    });

    try {
      // Poll until an event arrives instead of sleeping for a fixed 2 s
      await vi.waitFor(() => expect(receivedEvents.length).toBeGreaterThan(0), {
        timeout: 4000,
        interval: 100,
      });
    } finally {
      unsubscribe();
    }
  });

  it('should handle job cancellation via HTTP', async () => {
    const { job_id } = await JobAPIClient.submitJob({
      job_type: 'test',
      payload: { test: true },
    });
    await JobAPIClient.cancelJob(job_id);
    // Cancellation may be applied asynchronously, so poll the job state
    await vi.waitFor(async () => {
      const state = await JobAPIClient.getJobState(job_id);
      expect(state.status).toBe('cancelled');
    }, { timeout: 4000, interval: 200 });
  });
});
4.2 Load Testing
Verify:
- 100+ concurrent job submissions via HTTP (expected to scale better than WebSocket)
- Event delivery latency (target < 100 ms from Redis publish to handler, per Success Metrics)
- No message loss during high load
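Checking the latency target requires a per-event delivery time. The following is a minimal sketch. It assumes events are stream entries with auto-generated IDs of the form `<unix ms>-<seq>`, which Redis stamps from its own clock at `XADD`. Latency is then the consumer's receive time minus that millisecond part, summarized with nearest-rank percentiles. Clock skew between the Redis host and the consumer shows up directly in these numbers. The function names are hypothetical.

```typescript
// Auto-generated stream IDs are "<unix ms>-<seq>", stamped with the Redis server clock at XADD.
function streamIdToMillis(id: string): number {
  const match = /^(\d+)-\d+$/.exec(id);
  if (!match) throw new Error(`Not a stream ID: ${id}`);
  return Number(match[1]);
}

// Nearest-rank percentile: the smallest sample with at least p% of samples at or below it.
function percentile(samples: number[], p: number): number {
  if (samples.length === 0) throw new Error('no samples');
  if (!(p > 0 && p <= 100)) throw new RangeError('p must be in (0, 100]');
  const sorted = samples.slice().sort((a, b) => a - b);
  const rank = Math.ceil((p * sorted.length) / 100); // 1-based rank
  return sorted[rank - 1];
}

// Summarize delivery latencies (receive time minus publish time, in ms).
function summarizeLatency(latenciesMs: number[]): { p50: number; p95: number; p99: number; max: number } {
  return {
    p50: percentile(latenciesMs, 50),
    p95: percentile(latenciesMs, 95),
    p99: percentile(latenciesMs, 99),
    max: Math.max(...latenciesMs),
  };
}
```

In a load test, each handler would record `Date.now() - streamIdToMillis(entryId)` and report `summarizeLatency` at the end. Tail percentiles (p95, p99) are more informative than the mean for a "< 100 ms" target.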
4.3 Failure Scenarios
Test:
- Redis connection loss → emprops-api should reconnect automatically
- lightweight-api-server restart → emprops-api events should resume
- Network interruption → Server-Sent Events auto-reconnect
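After a Redis reconnect or an SSE resume, a reader can see entries it already delivered. Stream IDs increase monotonically within a stream, so tracking the last delivered ID per job and dropping anything not strictly newer is enough to deduplicate. The IDs must be compared numerically on both parts: as strings, `"1-10" < "1-9"`. The following is a sketch with hypothetical helper names.

```typescript
// Compare two non-negative decimal integer strings without precision loss
// (Redis prints IDs without leading zeros, so a longer string is a larger number).
function compareDecimal(a: string, b: string): number {
  if (a.length !== b.length) return a.length < b.length ? -1 : 1;
  return a < b ? -1 : a > b ? 1 : 0;
}

// Compare Redis stream IDs "<ms>-<seq>" numerically. Plain string comparison is wrong:
// "1-10" < "1-9" as strings, although 1-10 is the later entry.
function compareStreamIds(a: string, b: string): number {
  const pa = /^(\d+)-(\d+)$/.exec(a);
  const pb = /^(\d+)-(\d+)$/.exec(b);
  if (!pa || !pb) throw new Error(`Invalid stream ID: ${!pa ? a : b}`);
  return compareDecimal(pa[1], pb[1]) || compareDecimal(pa[2], pb[2]);
}

// Tracks the last delivered entry per job and drops replays after a reconnect or resume.
class StreamCursor {
  private lastIds = new Map<string, string>();

  // Returns true (and records the ID) only if the entry is newer than anything seen for this job.
  accept(jobId: string, entryId: string): boolean {
    const last = this.lastIds.get(jobId);
    if (last !== undefined && compareStreamIds(entryId, last) <= 0) return false;
    this.lastIds.set(jobId, entryId);
    return true;
  }

  // Position to pass to XREAD when resuming; "0" means "from the beginning".
  position(jobId: string): string {
    return this.lastIds.get(jobId) ?? '0';
  }
}
```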
Consequences
Benefits
1. Architectural Simplicity ✅
- Direct event consumption from Redis (source of truth)
- Eliminates WebSocket layer (fewer moving parts)
- Standard HTTP semantics for commands (stateless and scalable; GET state queries are cacheable)
2. Better Scalability ✅
- Horizontal scaling of emprops-api without WebSocket sticky sessions
- No long-lived connections for commands (HTTP requests are short-lived; only event streams stay open)
- Redis pub/sub scales to thousands of subscribers
3. Improved Reliability ✅
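- No hand-written WebSocket state management: EventSource reconnects SSE streams automatically, and ioredis reconnects (and re-subscribes) to Redis
- Replayable event delivery via Redis Streams (persistent, ordered log; a reader can resume from the last entry ID it processed), provided consumers read the stream rather than pub/sub
- Consumer groups for work distribution; note that a group splits entries among its consumers, so for SSE fan-out each emprops-api instance should read the streams its own clients watch with plain XREAD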
4. Reduced Latency ✅
- Eliminates hop through lightweight-api-server WebSocket broadcast
- Events arrive directly from Redis to emprops-api
- Faster command responses (HTTP request/response vs. WebSocket round-trip)
5. Better Observability ✅
- HTTP access logs for all commands (standard logging infrastructure)
- Redis monitoring for event throughput and lag
- No WebSocket connection state to track
6. Alignment with Redis-First Architecture ✅
- Redis is already the source of truth for all state
- Direct consumption of Redis events matches monitor architecture goal
- Unified event system across all applications
Drawbacks
1. Server-Sent Events Limitations
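Issue: Over HTTP/1.1, browsers cap concurrent connections per host (6 in most browsers), and every open Server-Sent Events (SSE) stream holds one of them. HTTP/2 multiplexes streams over a single connection, so the practical limit is much higher there.
Mitigation:
- Use a single SSE connection per job (not per event type)
- Close SSE connections when the job completes
- For multiple concurrent jobs, multiplex events through a single connection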
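One way to multiplex several jobs over one connection is to keep a single `EventSource` in the browser and route messages locally. This assumes a hypothetical endpoint whose frames carry data shaped like `{"jobId": "...", "event": {...}}`. The router below is independent of `EventSource` so it can be tested. In the browser, `source.onmessage = (e) => router.dispatch(e.data)` would feed it. `JobEventRouter` and the payload shape are assumptions, not part of the current API.

```typescript
type JobHandler = (event: unknown) => void;

// Routes multiplexed SSE payloads of the (hypothetical) form {"jobId": "...", "event": {...}}
// from one shared EventSource to per-job handlers.
class JobEventRouter {
  private handlers = new Map<string, Set<JobHandler>>();

  subscribe(jobId: string, handler: JobHandler): () => void {
    const set = this.handlers.get(jobId) ?? new Set<JobHandler>();
    this.handlers.set(jobId, set);
    set.add(handler);
    return () => {
      set.delete(handler);
      // Only drop the map entry if it still refers to this set
      if (set.size === 0 && this.handlers.get(jobId) === set) this.handlers.delete(jobId);
    };
  }

  // Job IDs the shared connection should request, e.g. as a query parameter on reconnect.
  activeJobIds(): string[] {
    return Array.from(this.handlers.keys());
  }

  // Deliver one SSE data payload; returns how many handlers received it.
  dispatch(data: string): number {
    let parsed: unknown;
    try {
      parsed = JSON.parse(data);
    } catch {
      return 0;
    }
    if (parsed === null || typeof parsed !== 'object') return 0;
    const { jobId, event } = parsed as { jobId?: unknown; event?: unknown };
    if (typeof jobId !== 'string') return 0;
    const set = this.handlers.get(jobId);
    if (!set) return 0;
    set.forEach((handler) => handler(event));
    return set.size;
  }
}
```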
2. Next.js API Route for Event Streaming
Issue: Next.js API routes must handle long-lived connections for SSE.
Mitigation:
- Use the Node.js runtime (not the Edge runtime) for SSE routes; ioredis needs TCP sockets, which the Edge runtime does not provide
- Implement proper cleanup on client disconnect
- Monitor memory usage for long-running SSE connections
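Proper cleanup covers more than unsubscribing. A heartbeat timer and the stream writer must also be released exactly once, whether the client disconnects first or the job finishes first. The following sketch ties once-only cleanup to the request's `AbortSignal`. `onceCleanup` is a hypothetical helper.

```typescript
// Run cleanup steps exactly once: when the request's signal aborts (client disconnected)
// or when the returned function is called first (e.g. the job reached a final state).
function onceCleanup(signal: AbortSignal, ...steps: Array<() => void>): () => void {
  let done = false;
  const run = (): void => {
    if (done) return;
    done = true;
    signal.removeEventListener('abort', run);
    for (const step of steps) {
      // Keep going even if one step throws, so later resources are still released
      try {
        step();
      } catch (err) {
        console.error('[sse] cleanup step failed:', err);
      }
    }
  };
  if (signal.aborted) run();
  else signal.addEventListener('abort', run);
  return run;
}
```

In the SSE route this could be `onceCleanup(request.signal, unsubscribe, () => clearInterval(heartbeat), () => { writer.close().catch(() => {}); })`. The returned function would also be called when a completion event is forwarded.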
3. Migration Complexity
Issue: emprops-api must be updated to use new event consumption pattern.
Mitigation:
- Incremental rollout (Phase 1 → Phase 2 → Phase 3)
- Run both systems in parallel during migration
- Feature flags to switch between WebSocket and Redis events
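The feature flag for parallel operation could look like the following sketch. It assumes a hypothetical `JOB_EVENTS_SOURCE` environment variable with the values `websocket`, `redis` or `both`. In `both` mode, WebSocket events still drive the UI while Redis events are consumed only for comparison. Unknown values fall back to WebSocket, so a typo cannot silently switch the event source.

```typescript
// Hypothetical flag values for the migration; the env var name is an assumption.
type EventSourceMode = 'websocket' | 'redis' | 'both';

interface EventSourcePlan {
  primary: 'websocket' | 'redis'; // source that drives the UI
  shadow: 'redis' | null; // source consumed only for comparison (latency, missed events)
}

const PLANS: Record<EventSourceMode, EventSourcePlan> = {
  websocket: { primary: 'websocket', shadow: null },
  redis: { primary: 'redis', shadow: null },
  both: { primary: 'websocket', shadow: 'redis' },
};

// Unknown or missing values fall back to WebSocket so a typo cannot silently switch sources.
function resolveEventSourceMode(raw: string | undefined): EventSourceMode {
  const value = (raw ?? '').trim().toLowerCase();
  if (value === 'websocket' || value === 'redis' || value === 'both') return value;
  return 'websocket';
}

function planFor(raw: string | undefined): EventSourcePlan {
  return PLANS[resolveEventSourceMode(raw)];
}
```

Usage would be `planFor(process.env.JOB_EVENTS_SOURCE)` at startup.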
4. Redis Dependency
Issue: emprops-api now directly depends on Redis availability.
Mitigation:
- Redis is already a critical dependency (source of truth)
- Implement retry logic with exponential backoff
- Monitor Redis health and alert on connection issues
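The `retryStrategy` in step 1.1 grows linearly (`times * 50`, capped at 2 s). An exponential backoff with "full jitter" spreads reconnects out, so many emprops-api instances do not hit Redis in lockstep after a restart. ioredis calls `retryStrategy(times)` with the reconnection attempt number and waits the returned number of milliseconds; returning a non-number stops reconnecting. The following is a sketch. `backoffDelay` is a hypothetical helper, and the base and cap values are assumptions.

```typescript
// Exponential backoff with "full jitter": the delay is drawn uniformly from
// [0, min(capMs, baseMs * 2^(attempt - 1))). attempt starts at 1, like ioredis's `times`.
function backoffDelay(
  attempt: number,
  baseMs = 100,
  capMs = 10_000,
  random: () => number = Math.random,
): number {
  const ceiling = Math.min(capMs, baseMs * Math.pow(2, Math.max(0, attempt - 1)));
  return Math.floor(random() * ceiling);
}
```

Wiring it in would be `new Redis({ ...options, retryStrategy: (times) => backoffDelay(times) })`. Injecting `random` keeps the function deterministic in tests.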
Migration Strategy
Week 1: Redis Event Infrastructure
- Implement Redis client singleton in emprops-api
- Create JobEventConsumer for job event subscriptions
- Add Next.js API route for Server-Sent Events
- Create useJobEvents React hook
- Test event delivery from Redis → emprops-api
Week 2: HTTP Command Layer
- Implement JobAPIClient for HTTP commands
- Update job submission flow to use HTTP POST
- Update job cancellation to use HTTP POST
- Add error handling for HTTP failures
- Test command execution via HTTP
Week 3: Parallel Operation
- Run both systems in parallel (WebSocket + Redis events)
- Feature flag to switch between event sources
- Compare event delivery reliability and latency
- Identify and fix any issues with Redis event consumption
Week 4: WebSocket Removal
- Switch emprops-api to Redis events only
- Remove WebSocket client code from emprops-api
- Monitor production metrics (latency, error rates, event delivery)
- (Optional) Remove WebSocket server from lightweight-api-server if monitor also migrated
Success Metrics
Quantitative
Event Delivery Latency
- Target: < 100ms from Redis publish to emprops-api handler
- Baseline: Current WebSocket latency ~150-200ms (includes intermediary hop)
Command Response Time
- Target: HTTP POST /api/jobs < 50ms (vs. WebSocket round-trip ~100ms)
- Baseline: Current WebSocket command latency
Scalability
- Target: Support 10+ concurrent emprops-api instances (horizontal scaling)
- Baseline: WebSocket requires sticky sessions
Reliability
- Target: Zero event loss during normal operation
- Target: < 1 second event delivery lag during Redis reconnection
- Baseline: Current WebSocket reconnection lag ~2-5 seconds
Qualitative
Codebase Simplicity
- Reduced lines of code (WebSocket client/server removal)
- Fewer dependencies (no WebSocket client library in emprops-api)
- Standard HTTP/Redis patterns (easier onboarding)
Developer Experience
- Easier debugging (HTTP access logs, Redis monitoring)
- Standard tooling (curl for commands, redis-cli for events)
- Clear separation of concerns (commands vs. events)
Alternative Approaches Considered
❌ Alternative 1: Keep WebSocket for Everything
Why Rejected:
- Tight coupling between emprops-api and lightweight-api-server
- Scalability issues with WebSocket sticky sessions
- Duplicate event infrastructure (Redis pub/sub + WebSocket broadcast)
- Connection management overhead
❌ Alternative 2: Use WebSocket for Events, HTTP for Commands
Why Rejected:
- Still requires WebSocket connection state management
- Doesn't simplify architecture (two communication channels)
- WebSocket doesn't provide benefits over Redis pub/sub for events
❌ Alternative 3: Polling for Events
Why Rejected:
- Inefficient (constant HTTP requests)
- Higher latency (poll interval delay)
- Increased server load (unnecessary requests)
✅ Alternative 4: HTTP Commands + Redis Events (THIS ADR)
Why Chosen:
- Eliminates WebSocket entirely (simpler architecture)
- Direct Redis consumption (aligns with source of truth)
- Standard HTTP semantics (scalable, cacheable, well-understood)
- Better reliability (Redis Streams allow replay from the last processed entry)
- Incremental migration (can run both systems in parallel)
Related ADRs
- LOGGING_ARCHITECTURE.md - Redis log streaming (Phase 2 implemented)
- monitor-events.ts - Event type definitions for real-time monitoring
Dependencies
Infrastructure
- Redis 6.0+ (for Streams and pub/sub)
- Next.js 14+ (for App Router API routes with streaming support)
- ioredis (Node.js Redis client)
Code Changes
- apps/emprops-studio/src/lib/redis-client.ts (NEW)
- apps/emprops-studio/src/lib/job-event-consumer.ts (NEW)
- apps/emprops-studio/src/lib/job-api-client.ts (NEW)
- apps/emprops-studio/src/hooks/useJobEvents.ts (NEW)
- apps/emprops-studio/src/app/api/jobs/[jobId]/events/route.ts (NEW)
- apps/emprops-studio/src/services/websocket.ts (REMOVED)
Appendix: Redis Event Patterns
Job Events (Redis Streams)
Pattern: job:events:{job_id}
Why Streams:
- Ordered event log (chronological history)
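- Persistent (events survive a Redis restart when RDB or AOF persistence is enabled)
- Consumer groups (multiple consumers can share load)
- Acknowledgement via XACK (at-least-once delivery: unacknowledged entries stay in the pending list and can be re-delivered)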
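The following is a minimal sketch of reading one job's stream with plain `XREAD` (no consumer group). It starts at `0` to replay the existing history, then blocks for new entries and advances the cursor to the last ID seen. The `XRead` function type mirrors the reply shape ioredis returns for a blocking `XREAD`: per stream, a list of `[id, flatFields]`, or `null` on timeout. A blocking `XREAD` occupies its connection, so it needs its own connection, separate from both the shared client and the pub/sub subscriber. `tailJobStream` and `applyXReadReply` are hypothetical.

```typescript
// Reply shape returned by ioredis for XREAD: per stream, a list of [id, flat field list].
type StreamEntry = [id: string, fields: string[]];
type XReadReply = Array<[key: string, entries: StreamEntry[]]> | null;
// Injected reader so the loop is independent of a concrete Redis client.
type XRead = (blockMs: number, key: string, afterId: string) => Promise<XReadReply>;
type EntryHandler = (id: string, fields: Record<string, string>) => void;

// Deliver every entry in one XREAD reply and return the new cursor (last ID seen).
function applyXReadReply(reply: XReadReply, cursor: string, onEntry: EntryHandler): string {
  if (reply === null) return cursor; // BLOCK timed out without new entries
  let next = cursor;
  for (const [, entries] of reply) {
    for (const [id, flat] of entries) {
      const fields: Record<string, string> = {};
      for (let i = 0; i + 1 < flat.length; i += 2) fields[flat[i]] = flat[i + 1];
      onEntry(id, fields);
      next = id;
    }
  }
  return next;
}

// Tail job:events:{jobId} until the signal aborts. Starting at "0" replays the existing
// history first, so entries written before the reader connected are still delivered.
async function tailJobStream(
  xread: XRead,
  jobId: string,
  onEntry: EntryHandler,
  signal: AbortSignal,
  fromId = '0',
): Promise<string> {
  const key = `job:events:${jobId}`;
  let cursor = fromId;
  while (!signal.aborted) {
    // Blocks up to 5 s, so an abort is noticed within one timeout period
    const reply = await xread(5000, key, cursor);
    cursor = applyXReadReply(reply, cursor, onEntry);
  }
  return cursor;
}
```

With ioredis, `xread` could be roughly `(ms, key, id) => reader.xread('BLOCK', ms, 'STREAMS', key, id)` on a dedicated connection. A single `XREAD` call can also cover many streams at once, so a production version might batch all watched jobs into one blocking call per instance instead of running one loop per job.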
Example:
# Worker publishes job progress
XADD job:events:abc123 * event_type progress progress 50 timestamp 1704672000
# emprops-api consumes events (ID 0 replays existing entries; $ would return only entries added after the call)
XREAD BLOCK 0 STREAMS job:events:abc123 0
Machine Logs (Redis Pub/Sub)
Pattern: machine:*:logs
Why Pub/Sub:
- Broadcast to all interested subscribers
- Low latency (no persistence overhead)
- Pattern-based subscription (machine:gpu-1:logs, machine:cpu-2:logs)
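With `PSUBSCRIBE`, the ioredis `pmessage` listener receives the pattern, the concrete channel and the message. The following sketch extracts the machine ID from the channel and parses the payload defensively. It assumes the `{"level": ..., "message": ...}` shape used in the `PUBLISH` example in this section. `MachineLog`, `machineIdFromChannel` and `parseMachineLog` are hypothetical names.

```typescript
// Hypothetical parsed shape for machine log messages.
interface MachineLog {
  machineId: string;
  level: string;
  message: string;
}

const PREFIX = 'machine:';
const SUFFIX = ':logs';

// "machine:gpu-1:logs" -> "gpu-1"; returns null for channels outside the pattern.
function machineIdFromChannel(channel: string): string | null {
  if (!channel.startsWith(PREFIX) || !channel.endsWith(SUFFIX)) return null;
  const id = channel.slice(PREFIX.length, channel.length - SUFFIX.length);
  return id.length > 0 ? id : null;
}

// Parse a pmessage payload; malformed messages are dropped rather than thrown.
function parseMachineLog(channel: string, message: string): MachineLog | null {
  const machineId = machineIdFromChannel(channel);
  if (!machineId) return null;
  let body: unknown;
  try {
    body = JSON.parse(message);
  } catch {
    return null;
  }
  if (body === null || typeof body !== 'object') return null;
  const { level, message: text } = body as { level?: unknown; message?: unknown };
  if (typeof level !== 'string' || typeof text !== 'string') return null;
  return { machineId, level, message: text };
}
```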
Example:
# Machine publishes log
PUBLISH machine:gpu-1:logs '{"level":"info","message":"ComfyUI started"}'
# emprops-api subscribes (if needed)
PSUBSCRIBE machine:*:logs
End of ADR
