
gRPC Transition: emprops-api ↔ job-api Client WebSocket Replacement

Status: Proposed
Date: 2025-12-01
Author: System Architecture
Scope: Replacing the Client WebSocket connection between emprops-api and job-api with gRPC


Context

The emprops-api service currently communicates with job-api via a WebSocket connection (the "Client WebSocket"). This is used for:

  • Submitting jobs to the queue
  • Receiving job progress updates
  • Receiving job completion/failure notifications
  • Health check ping/pong

Current Architecture

┌─────────────────────────────────────────────────────────────────┐
│                    emprops-api (Next.js)                        │
│  ┌───────────────────────────────────────────────────────────┐  │
│  │  RedisServerClient (WebSocket)                            │  │
│  │  - ws://job-api/ws/client/{clientId}                      │  │
│  │  - submit_job, complete_job, update_job_progress          │  │
│  │  - ping/pong health checks                                │  │
│  └───────────────────────────────────────────────────────────┘  │
└─────────────────────────────────────────────────────────────────┘
                         ↓ ↑
                    WebSocket (ws://)
                         ↓ ↑
┌─────────────────────────────────────────────────────────────────┐
│           job-api (emp-job-queue)                               │
│  ┌───────────────────────────────────────────────────────────┐  │
│  │  WebSocket Server (/ws/client/*)                          │  │
│  │  - handleClientConnection()                               │  │
│  │  - handleClientMessage()                                  │  │
│  │  - EventBroadcaster for job updates                       │  │
│  └───────────────────────────────────────────────────────────┘  │
└─────────────────────────────────────────────────────────────────┘
                              ↓ ↑
                         Redis (Queue + Events)
                              ↓ ↑
┌─────────────────────────────────────────────────────────────────┐
│                    Workers (distributed)                        │
│  ┌───────────────────────────────────────────────────────────┐  │
│  │  Redis Client (ALL communication via Redis)               │  │
│  │  - BRPOPLPUSH job claims                                  │  │
│  │  - PUBLISH job progress/completion                        │  │
│  │  - worker:commands:* subscription                         │  │
│  └───────────────────────────────────────────────────────────┘  │
└─────────────────────────────────────────────────────────────────┘

Important: Workers communicate exclusively via Redis. This ADR only covers the emprops-api ↔ job-api connection.

What We're Replacing

File: apps/emprops-api/src/clients/redis-server-client.ts

The RedisServerClient class:

  • Singleton WebSocket client connecting to ws://[host]/ws/client/{clientId}?token={token}
  • Methods: runComfyPrompt(), runAuto1111Prompt(), submitJob(), getJobStatusAttestation()
  • Handles reconnection with exponential backoff (1s-30s, max 10 attempts)
  • Health checks via ping/pong (15s interval)
  • Job timeouts (2 minute default)
  • Telemetry integration for metrics
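The reconnection behavior listed above amounts to a capped exponential backoff. Here is a minimal sketch of that schedule. It assumes the delay doubles on each attempt with no jitter; the doubling factor and the function name reconnectDelayMs are assumptions, not taken from redis-server-client.ts:

```typescript
// Sketch of the current RedisServerClient reconnection schedule.
// Assumed: delay doubles per attempt, starting at 1s, capped at 30s, no jitter.
const BASE_DELAY_MS = 1_000;
const MAX_DELAY_MS = 30_000;
const MAX_ATTEMPTS = 10;

/** Delay before reconnect attempt `attempt` (0-based), or null once attempts are exhausted. */
function reconnectDelayMs(attempt: number): number | null {
  if (attempt >= MAX_ATTEMPTS) return null; // give up after 10 attempts
  return Math.min(BASE_DELAY_MS * 2 ** attempt, MAX_DELAY_MS);
}

const schedule = Array.from({ length: MAX_ATTEMPTS }, (_, i) => reconnectDelayMs(i));
console.log(schedule.join(", "));
// 1000, 2000, 4000, 8000, 16000, 30000, 30000, 30000, 30000, 30000
```

This is the kind of hand-maintained logic that Phase 5 deletes along with redis-server-client.ts.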

Current Problems

1. WebSocket Complexity

  • Custom reconnection logic with exponential backoff
  • Manual ping/pong health checks
  • Connection state management across the singleton
  • Difficult to test and debug

2. No Type Safety

  • JSON messages with no compile-time validation
  • Easy to break contract between services
  • Runtime errors when message shapes change
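The following is a minimal illustration of the contract-drift problem. The field names are hypothetical, but the mechanism is general. JSON.parse returns any, so a rename on the sending side still compiles on the receiving side and only surfaces at runtime as undefined:

```typescript
// Hypothetical: job-api renamed `job_id` to `jobId` in its completion message.
const raw = JSON.stringify({ type: "complete_job", jobId: "step-123", status: "completed" });

// emprops-api still reads the old field name. JSON.parse returns `any`,
// so the compiler cannot flag the mismatch.
const msg = JSON.parse(raw);
const jobId: string = msg.job_id; // compiles, but holds undefined at runtime

console.log(jobId); // undefined
```

With generated protobuf types, renaming the field changes the generated property name. Every stale consumer then fails to compile instead.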

3. Single Point of Failure

  • One WebSocket connection for all job operations
  • Connection loss affects all in-flight jobs
  • No built-in load balancing

Decision

Replace the Client WebSocket with gRPC using the Connect framework for TypeScript.

Why gRPC over WebSocket?

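| Aspect | WebSocket (Current) | gRPC (Proposed) |
|---|---|---|
| Type Safety | None (JSON) | Full (Protobuf) |
| Reconnection | Custom code | Built-in |
| Health Checks | Custom ping/pong | Native (HTTP/2 keepalive) |
| Streaming | Manual | Native server streaming |
| Code Gen | None | Automatic |
| Debugging | Hard (custom JSON message protocol, no standard tooling) | grpcurl, Postman |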

What's NOT Changing

  • ❌ Monitor WebSocket (browser clients): stays WebSocket
  • ❌ Worker communication: stays Redis only
  • ❌ Redis job queue: unchanged
  • ❌ Redis pub/sub for events: unchanged

Browser/Frontend Boundary

gRPC is for internal service-to-service communication only. Browser clients (monitor app, emprops-studio) continue using WebSocket and REST.

Why Not gRPC for Browsers?

Native gRPC relies on HTTP/2 trailers and low-level framing that browser networking APIs (fetch, XHR) do not expose to JavaScript. Workarounds exist, but they add complexity:

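| Solution | Limitation |
|---|---|
| gRPC-Web | Requires a translating proxy (e.g., Envoy) in front of a standard gRPC server; no client or bidirectional streaming |
| Connect protocol | Works without a proxy, but browsers still get only unary and server-streaming calls, so it cannot replace a native WebSocket |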

Our Approach

┌─────────────────────────────────────────────────────────────────┐
│  Browser (monitor, emprops-studio)                              │
│    ├── WebSocket → Real-time subscriptions (job progress)       │
│    └── REST/HTTP → Commands (start job, restart machine)        │
└─────────────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────────────┐
│  emprops-api / job-api (Edge)                                   │
│    ├── WebSocket Server → Browser clients                       │
│    └── gRPC Client → Internal services                          │
└─────────────────────────────────────────────────────────────────┘

                         gRPC (internal)

┌─────────────────────────────────────────────────────────────────┐
│  Internal Services (job-api, future microservices)              │
│    └── gRPC Server → Service-to-service                         │
└─────────────────────────────────────────────────────────────────┘

Protocol Selection Guide

| Communication Path | Protocol | Reason |
|---|---|---|
| Browser → API | WebSocket | True bidirectional, real-time updates |
| Browser → API (commands) | REST/HTTP | Simple request/response |
| emprops-api → job-api | gRPC | Type safety, streaming, performance |
| Service → Service | gRPC | Type safety, streaming, performance |
| Worker → Redis | Redis protocol | Job queue, pub/sub (unchanged) |

The monitor app's WebSocket connection is not a candidate for gRPC replacement: it already provides true bidirectional streaming, which gRPC-Web cannot match.


Proposed Architecture

┌─────────────────────────────────────────────────────────────────┐
│                    emprops-api (Next.js)                        │
│  ┌───────────────────────────────────────────────────────────┐  │
│  │  GrpcJobClient (replaces RedisServerClient)               │  │
│  │  - SubmitJob()                                            │  │
│  │  - GetJobStatus()                                         │  │
│  │  - CancelJob()                                            │  │
│  │  - StreamJobUpdates() for progress                        │  │
│  └───────────────────────────────────────────────────────────┘  │
└─────────────────────────────────────────────────────────────────┘
                         ↓ ↑
                    gRPC (HTTP/2)
                         ↓ ↑
┌─────────────────────────────────────────────────────────────────┐
│           job-api (emp-job-queue)                               │
│  ┌───────────────────────────────────────────────────────────┐  │
│  │  gRPC Server (JobClientService)                           │  │
│  │  - Runs alongside existing WebSocket server               │  │
│  │  - Reuses existing job submission logic                   │  │
│  │  - Subscribes to Redis for streaming updates              │  │
│  └───────────────────────────────────────────────────────────┘  │
└─────────────────────────────────────────────────────────────────┘
                              ↓ ↑
                         Redis (Queue + Events)
                              ↓ ↑
┌─────────────────────────────────────────────────────────────────┐
│                    Workers (distributed)                        │
│  ┌───────────────────────────────────────────────────────────┐  │
│  │  Redis Client (unchanged)                                 │  │
│  └───────────────────────────────────────────────────────────┘  │
└─────────────────────────────────────────────────────────────────┘

Protocol Buffer Definition

File: packages/proto/src/job_client.proto

protobuf
syntax = "proto3";

package emp.job_client.v1;

import "google/protobuf/timestamp.proto";
import "google/protobuf/struct.proto";

// Service for emprops-api → job-api communication
// Replaces the Client WebSocket connection
service JobClientService {
  // Submit a job to the queue (replaces submit_job WebSocket message)
  rpc SubmitJob(SubmitJobRequest) returns (SubmitJobResponse);

  // Get job status (replaces get_job_status WebSocket message)
  rpc GetJobStatus(GetJobStatusRequest) returns (GetJobStatusResponse);

  // Cancel a job (replaces cancel_job WebSocket message)
  rpc CancelJob(CancelJobRequest) returns (CancelJobResponse);

  // Stream job updates (replaces complete_job + update_job_progress WebSocket messages)
  rpc StreamJobUpdates(StreamJobUpdatesRequest) returns (stream JobUpdate);
}

// ============================================================================
// Submit Job
// ============================================================================

message SubmitJobRequest {
  string message_id = 1;           // Client-provided job ID (e.g., "step-{uuid}")
  string job_type = 2;             // "comfyui", "a1111", etc.
  string service_required = 3;     // Worker capability required
  google.protobuf.Struct payload = 4;
  int32 priority = 5;
  google.protobuf.Struct ctx = 6;  // Storage context for workers

  // Workflow tracking
  optional string workflow_id = 7;
  optional int64 workflow_datetime = 8;
  optional int32 current_step = 9;
  optional int32 total_steps = 10;
  optional int32 workflow_priority = 11;
  optional int32 retry_attempt = 12;

  // Distributed tracing
  optional string traceparent = 13;

  // Additional fields
  map<string, string> metadata = 14;
  optional string mime_type = 15;
  optional string output_field = 16;
}

message SubmitJobResponse {
  string job_id = 1;
  JobStatus status = 2;
  google.protobuf.Timestamp queued_at = 3;
}

// ============================================================================
// Get Job Status (Attestation)
// ============================================================================

message GetJobStatusRequest {
  string job_id = 1;
}

message GetJobStatusResponse {
  string job_id = 1;
  bool exists = 2;
  JobStatus status = 3;
  optional google.protobuf.Timestamp timestamp = 4;
  optional google.protobuf.Struct result = 5;
}

// ============================================================================
// Cancel Job
// ============================================================================

message CancelJobRequest {
  string job_id = 1;
  optional string reason = 2;
}

message CancelJobResponse {
  bool success = 1;
  string message = 2;
}

// ============================================================================
// Stream Job Updates (Server Streaming)
// ============================================================================

message StreamJobUpdatesRequest {
  string job_id = 1;
}

message JobUpdate {
  string job_id = 1;
  JobUpdateType type = 2;
  google.protobuf.Timestamp timestamp = 3;

  // Progress update fields
  optional double progress = 4;
  optional string worker_id = 5;

  // Completion fields
  optional JobResult result = 6;
}

message JobResult {
  JobResultStatus status = 1;
  optional google.protobuf.Struct data = 2;
  optional string error = 3;
}

// ============================================================================
// Enums
// ============================================================================

enum JobStatus {
  JOB_STATUS_UNSPECIFIED = 0;
  JOB_STATUS_PENDING = 1;
  JOB_STATUS_QUEUED = 2;
  JOB_STATUS_PROCESSING = 3;
  JOB_STATUS_COMPLETED = 4;
  JOB_STATUS_FAILED = 5;
  JOB_STATUS_CANCELLED = 6;
}

enum JobUpdateType {
  JOB_UPDATE_TYPE_UNSPECIFIED = 0;
  JOB_UPDATE_TYPE_PROGRESS = 1;      // Maps to update_job_progress
  JOB_UPDATE_TYPE_ASSIGNED = 2;      // Maps to job_assigned
  JOB_UPDATE_TYPE_COMPLETED = 3;     // Maps to complete_job (success)
  JOB_UPDATE_TYPE_FAILED = 4;        // Maps to complete_job (failed)
}

enum JobResultStatus {
  JOB_RESULT_STATUS_UNSPECIFIED = 0;
  JOB_RESULT_STATUS_SUCCESS = 1;
  JOB_RESULT_STATUS_FAILED = 2;
}
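Several callers need to know when a job can no longer change state, for example to decide whether a status stream can close. The sketch below assumes a hand-written mirror of JobStatus with the same numeric values. In practice, the generated enum from @emp/proto would be imported instead. isTerminalStatus is a hypothetical helper:

```typescript
// Hand-written mirror of `enum JobStatus` from job_client.proto (same numeric values).
const JobStatus = {
  UNSPECIFIED: 0,
  PENDING: 1,
  QUEUED: 2,
  PROCESSING: 3,
  COMPLETED: 4,
  FAILED: 5,
  CANCELLED: 6,
} as const;
type JobStatus = (typeof JobStatus)[keyof typeof JobStatus];

// Hypothetical helper: statuses after which a job no longer changes.
const TERMINAL_STATUSES: ReadonlySet<JobStatus> = new Set<JobStatus>([
  JobStatus.COMPLETED,
  JobStatus.FAILED,
  JobStatus.CANCELLED,
]);

function isTerminalStatus(status: JobStatus): boolean {
  return TERMINAL_STATUSES.has(status);
}

console.log(isTerminalStatus(JobStatus.PROCESSING)); // false
console.log(isTerminalStatus(JobStatus.CANCELLED)); // true
```

JOB_STATUS_UNSPECIFIED (0) is what proto3 yields when the field is unset, so it is deliberately not treated as terminal.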

Implementation Plan

Phase 1: Proto Package (Week 1)

Create packages/proto with:

  • The job_client.proto definition above
  • Buf configuration (buf.yaml, buf.gen.yaml) for code generation
  • Generated TypeScript types

bash
mkdir -p packages/proto/src
cd packages/proto
pnpm init

package.json:

json
{
  "name": "@emp/proto",
  "version": "0.0.1",
  "private": true,
  "scripts": {
    "generate": "buf generate",
    "build": "pnpm generate && tsc"
  },
  "dependencies": {
    "@connectrpc/connect": "^1.4.0",
    "@bufbuild/protobuf": "^1.10.0"
  },
  "devDependencies": {
    "@bufbuild/buf": "^1.28.0",
    "@connectrpc/protoc-gen-connect-es": "^1.4.0",
    "@bufbuild/protoc-gen-es": "^1.10.0",
    "typescript": "^5.4.0"
  }
}

Phase 2: gRPC Server in job-api (Week 2)

Add a gRPC server to apps/api:

File: apps/api/src/grpc/server.ts

typescript
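import { fastify } from 'fastify';
import { fastifyConnectPlugin } from '@connectrpc/connect-fastify';
import { JobClientService } from '@emp/proto'; // generated service descriptor; export path assumed
import { jobClientServiceImpl } from './services/job-client-service';

// Separate HTTP/2 (h2c) listener on its own port (e.g., 50051), alongside the existing WebSocket server
const grpcServer = fastify({ http2: true });
await grpcServer.register(fastifyConnectPlugin, { routes: (router) => router.service(JobClientService, jobClientServiceImpl) });
await grpcServer.listen({ host: '0.0.0.0', port: 50051 });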

File: apps/api/src/grpc/services/job-client-service.ts

typescript
// Implement JobClientService
// - SubmitJob: Reuse existing submitJob() logic
// - GetJobStatus: Query Redis for job state
// - CancelJob: Publish to job:cancel:{jobId}
// - StreamJobUpdates: Subscribe to Redis job:events:{jobId}
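Connect lets server-streaming handlers be written as async generators. StreamJobUpdates can therefore forward events for a single job and return once it sees a terminal update. The sketch below is self-contained under two stated assumptions: a hypothetical InMemoryBus stands in for the Redis subscription to job:events:{jobId}, and JobUpdate is a hand-written subset of the proto message.

```typescript
// Hand-written subset of the proto JobUpdate message and JobUpdateType enum.
const JobUpdateType = { UNSPECIFIED: 0, PROGRESS: 1, ASSIGNED: 2, COMPLETED: 3, FAILED: 4 } as const;
type JobUpdateType = (typeof JobUpdateType)[keyof typeof JobUpdateType];

interface JobUpdate {
  jobId: string;
  type: JobUpdateType;
  progress?: number;
}

type Listener = (update: JobUpdate) => void;

// Hypothetical in-memory stand-in for Redis pub/sub channels such as job:events:{jobId}.
class InMemoryBus {
  private channels = new Map<string, Set<Listener>>();

  subscribe(channel: string, listener: Listener): () => void {
    const listeners = this.channels.get(channel) ?? new Set<Listener>();
    listeners.add(listener);
    this.channels.set(channel, listeners);
    return () => {
      listeners.delete(listener);
    };
  }

  publish(channel: string, update: JobUpdate): void {
    this.channels.get(channel)?.forEach((listener) => listener(update));
  }
}

const isTerminal = (update: JobUpdate): boolean =>
  update.type === JobUpdateType.COMPLETED || update.type === JobUpdateType.FAILED;

// Server-streaming handler sketch: yields each update for one job and
// returns after the first COMPLETED or FAILED update.
async function* streamJobUpdates(bus: InMemoryBus, jobId: string): AsyncGenerator<JobUpdate> {
  const pending: JobUpdate[] = [];
  let wake: (() => void) | undefined;
  const unsubscribe = bus.subscribe(`job:events:${jobId}`, (update) => {
    pending.push(update);
    wake?.(); // resume the generator if it is waiting for the next event
  });
  try {
    while (true) {
      const next = pending.shift();
      if (next === undefined) {
        await new Promise<void>((resolve) => {
          wake = () => resolve();
        });
        continue;
      }
      yield next;
      if (isTerminal(next)) return;
    }
  } finally {
    unsubscribe(); // runs on normal completion and when the consumer stops early
  }
}
```

One ordering detail carries over to the Redis-backed version. The handler must subscribe to job:events:{jobId} before reading the job's current state; otherwise, a completion published between the two steps is never delivered.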

Phase 3: gRPC Client in emprops-api (Week 3)

Create GrpcJobClient to replace RedisServerClient:

File: apps/emprops-api/src/clients/grpc-job-client.ts

typescript
// Same API surface as RedisServerClient:
// - runComfyPrompt()
// - runAuto1111Prompt()
// - submitJob()
// - getJobStatusAttestation()
// - isConnected()
// - getConnectionStats()
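On the emprops-api side, methods such as runComfyPrompt() still resolve one promise per job. The sketch below shows how GrpcJobClient could turn the StreamJobUpdates stream into that promise while keeping the current 2-minute default timeout. The stream is modeled as a plain AsyncIterable<JobUpdate> so the example runs without generated code. With Connect, it would be the iterable returned by the promise client's streamJobUpdates() method. waitForJobResult is a hypothetical name.

```typescript
// Hand-written subset of the proto JobUpdate / JobResult messages.
const JobUpdateType = { UNSPECIFIED: 0, PROGRESS: 1, ASSIGNED: 2, COMPLETED: 3, FAILED: 4 } as const;
type JobUpdateType = (typeof JobUpdateType)[keyof typeof JobUpdateType];

interface JobResult {
  data?: Record<string, unknown>;
  error?: string;
}

interface JobUpdate {
  jobId: string;
  type: JobUpdateType;
  progress?: number;
  result?: JobResult;
}

const DEFAULT_JOB_TIMEOUT_MS = 2 * 60 * 1000; // the current client's 2-minute default

// Reads the stream until the job reaches a terminal state.
async function consumeUntilTerminal(
  updates: AsyncIterable<JobUpdate>,
  onProgress?: (progress: number) => void,
): Promise<JobResult> {
  for await (const update of updates) {
    if (update.type === JobUpdateType.PROGRESS) {
      onProgress?.(update.progress ?? 0);
    } else if (update.type === JobUpdateType.COMPLETED) {
      return update.result ?? {};
    } else if (update.type === JobUpdateType.FAILED) {
      throw new Error(update.result?.error ?? `job ${update.jobId} failed`);
    }
    // ASSIGNED and UNSPECIFIED updates are ignored here.
  }
  throw new Error("stream ended before the job finished");
}

// Hypothetical helper: resolves with the job result, or rejects on failure or timeout.
async function waitForJobResult(
  updates: AsyncIterable<JobUpdate>,
  options: { timeoutMs?: number; onProgress?: (progress: number) => void } = {},
): Promise<JobResult> {
  const timeoutMs = options.timeoutMs ?? DEFAULT_JOB_TIMEOUT_MS;
  let timer: ReturnType<typeof setTimeout> | undefined;
  const timeout = new Promise<never>((_, reject) => {
    timer = setTimeout(() => reject(new Error(`job timed out after ${timeoutMs} ms`)), timeoutMs);
  });
  const result = consumeUntilTerminal(updates, options.onProgress);
  result.catch(() => {}); // if the timeout wins, a later stream error is deliberately ignored
  try {
    return await Promise.race([result, timeout]);
  } finally {
    clearTimeout(timer);
  }
}
```

The sketch only stops waiting on timeout. A real client should also cancel the RPC, for example by passing an AbortSignal through the call options and aborting it, so job-api can drop its Redis subscription.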

Phase 4: Feature Flag & Switchover (Week 4)

File: apps/emprops-api/src/clients/job-client-factory.ts

typescript
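import { GrpcJobClient } from './grpc-job-client';
import { RedisServerClient } from './redis-server-client';

// Both clients expose the same API surface, so call sites do not change when the flag flips.
export function getJobClient() {
  if (process.env.FEATURE_GRPC_CLIENT === 'true') {
    return GrpcJobClient.getInstance();
  }
  return RedisServerClient.getInstance();
}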
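Because the flag can be flipped in either direction at deploy time, call sites are safest when they depend only on the surface both clients share. The sketch below makes three assumptions: a hypothetical JobClient interface trimmed to two methods with simplified signatures, stub classes standing in for the real singletons, and the environment passed in as a parameter so the selection rule is testable:

```typescript
// Hypothetical shared surface, trimmed to two methods with simplified signatures.
interface JobClient {
  isConnected(): boolean;
  submitJob(job: Record<string, unknown>): Promise<string>;
}

// Illustrative stand-ins for GrpcJobClient and RedisServerClient.
class StubGrpcJobClient implements JobClient {
  isConnected(): boolean {
    return true;
  }
  async submitJob(): Promise<string> {
    return "submitted-via-grpc";
  }
}

class StubWebSocketJobClient implements JobClient {
  isConnected(): boolean {
    return true;
  }
  async submitJob(): Promise<string> {
    return "submitted-via-websocket";
  }
}

type Env = Record<string, string | undefined>;

// Same rule as getJobClient(): only the exact string "true" selects gRPC.
function selectJobClient(env: Env, grpc: () => JobClient, websocket: () => JobClient): JobClient {
  return env["FEATURE_GRPC_CLIENT"] === "true" ? grpc() : websocket();
}

const makeGrpc = () => new StubGrpcJobClient();
const makeWs = () => new StubWebSocketJobClient();
console.log(selectJobClient({ FEATURE_GRPC_CLIENT: "true" }, makeGrpc, makeWs) instanceof StubGrpcJobClient); // true
console.log(selectJobClient({ FEATURE_GRPC_CLIENT: "TRUE" }, makeGrpc, makeWs) instanceof StubGrpcJobClient); // false
```

Returning the interface, rather than the union of the two concrete classes, means Phase 5 can delete RedisServerClient without touching callers. The rule is case-sensitive: TRUE or 1 keeps the WebSocket client.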

Rollout:

  • Set FEATURE_GRPC_CLIENT=true to enable
  • Set FEATURE_GRPC_CLIENT=false to rollback
  • No staged rollout: flip the flag and go

Phase 5: Cleanup (Week 5)

After validation:

  • Remove redis-server-client.ts
  • Remove feature flag, make gRPC the default
  • Update documentation

Message Mapping

| WebSocket Message | gRPC Method/Stream |
|---|---|
| submit_job (outbound) | SubmitJob() |
| get_job_status (outbound) | GetJobStatus() |
| cancel_job (outbound) | CancelJob() |
| complete_job (inbound) | StreamJobUpdates (COMPLETED/FAILED) |
| update_job_progress (inbound) | StreamJobUpdates (PROGRESS) |
| job_assigned (inbound) | StreamJobUpdates (ASSIGNED) |
| ping/pong | Native gRPC keepalive |
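If the gRPC server reuses the events that currently drive the inbound WebSocket messages, the inbound rows of this table reduce to a small translation function. The legacy message shapes in the sketch below (type, job_id, progress, worker_id, status, error) are assumptions for illustration, not the actual WebSocket schema:

```typescript
// Hand-written mirrors of the proto enums (same numeric values).
const JobUpdateType = { UNSPECIFIED: 0, PROGRESS: 1, ASSIGNED: 2, COMPLETED: 3, FAILED: 4 } as const;
type JobUpdateType = (typeof JobUpdateType)[keyof typeof JobUpdateType];
const JobResultStatus = { UNSPECIFIED: 0, SUCCESS: 1, FAILED: 2 } as const;
type JobResultStatus = (typeof JobResultStatus)[keyof typeof JobResultStatus];

// Assumed shapes of the legacy inbound WebSocket messages (illustrative only).
type LegacyMessage =
  | { type: "update_job_progress"; job_id: string; progress: number; worker_id?: string }
  | { type: "job_assigned"; job_id: string; worker_id: string }
  | { type: "complete_job"; job_id: string; status: "completed" | "failed"; error?: string };

// Subset of the proto JobUpdate message (timestamp and result data omitted).
interface JobUpdate {
  jobId: string;
  type: JobUpdateType;
  progress?: number;
  workerId?: string;
  result?: { status: JobResultStatus; error?: string };
}

function toJobUpdate(msg: LegacyMessage): JobUpdate {
  switch (msg.type) {
    case "update_job_progress":
      return { jobId: msg.job_id, type: JobUpdateType.PROGRESS, progress: msg.progress, workerId: msg.worker_id };
    case "job_assigned":
      return { jobId: msg.job_id, type: JobUpdateType.ASSIGNED, workerId: msg.worker_id };
    case "complete_job": {
      // One legacy message type splits into two update types based on its status.
      const ok = msg.status === "completed";
      return {
        jobId: msg.job_id,
        type: ok ? JobUpdateType.COMPLETED : JobUpdateType.FAILED,
        result: { status: ok ? JobResultStatus.SUCCESS : JobResultStatus.FAILED, error: msg.error },
      };
    }
    default: {
      const unmapped: never = msg; // compile error if a new message type is left unmapped
      throw new Error(`unmapped message: ${JSON.stringify(unmapped)}`);
    }
  }
}
```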

Consequences

Benefits

  1. Type Safety: Compile-time validation of all messages
  2. Simpler Code: No custom reconnection or health check logic
  3. Better Debugging: grpcurl, Postman support
  4. Native Streaming: Built-in server streaming for job updates
  5. Performance: HTTP/2 multiplexing, binary protocol

Risks

  1. HTTP/2 Requirement: Railway supports HTTP/2, but end-to-end HTTP/2 between emprops-api and job-api must be verified before the switchover
  2. Learning Curve: Team needs to learn protobuf/Connect
  3. Migration Risk: Mitigated by feature flag rollback

Dependencies

  • @connectrpc/connect: gRPC client/server
  • @connectrpc/connect-node: Node.js transport
  • @connectrpc/connect-fastify: Fastify plugin
  • @bufbuild/buf: Code generation
  • @bufbuild/protobuf: Runtime library

End of ADR

Released under the MIT License.