gRPC Transition: emprops-api ↔ job-api Client WebSocket Replacement
Status: Proposed
Date: 2025-12-01
Author: System Architecture
Scope: Replacing the Client WebSocket connection between emprops-api and job-api with gRPC
Context
The emprops-api service currently communicates with job-api via a WebSocket connection (the "Client WebSocket"). This is used for:
- Submitting jobs to the queue
- Receiving job progress updates
- Receiving job completion/failure notifications
- Health check ping/pong
Current Architecture
┌─────────────────────────────────────────────────────────────────┐
│ emprops-api (Next.js) │
│ ┌───────────────────────────────────────────────────────────┐ │
│ │ RedisServerClient (WebSocket) │ │
│ │ - ws://job-api/ws/client/{clientId} │ │
│ │ - submit_job, complete_job, update_job_progress │ │
│ │ - ping/pong health checks │ │
│ └───────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
↓ ↑
WebSocket (ws://)
↓ ↑
┌─────────────────────────────────────────────────────────────────┐
│ job-api (emp-job-queue) │
│ ┌───────────────────────────────────────────────────────────┐ │
│ │ WebSocket Server (/ws/client/*) │ │
│ │ - handleClientConnection() │ │
│ │ - handleClientMessage() │ │
│ │ - EventBroadcaster for job updates │ │
│ └───────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
↓ ↑
Redis (Queue + Events)
↓ ↑
┌─────────────────────────────────────────────────────────────────┐
│ Workers (distributed) │
│ ┌───────────────────────────────────────────────────────────┐ │
│ │ Redis Client (ALL communication via Redis) │ │
│ │ - BRPOPLPUSH job claims │ │
│ │ - PUBLISH job progress/completion │ │
│ │ - worker:commands:* subscription │ │
│ └───────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
Important: Workers communicate exclusively via Redis. This ADR only covers the emprops-api ↔ job-api connection.
What We're Replacing
File: apps/emprops-api/src/clients/redis-server-client.ts
The RedisServerClient class:
- Singleton WebSocket client connecting to ws://[host]/ws/client/{clientId}?token={token}
- Methods: runComfyPrompt(), runAuto1111Prompt(), submitJob(), getJobStatusAttestation()
- Handles reconnection with exponential backoff (1s-30s, max 10 attempts)
- Health checks via ping/pong (15s interval)
- Job timeouts (2 minute default)
- Telemetry integration for metrics
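To make the reconnection behavior in the list above concrete, here is a minimal sketch of a capped exponential backoff schedule that fits the stated bounds (1s floor, 30s cap, 10 attempts). The doubling factor and the name `reconnectDelayMs` are assumptions for illustration; the actual RedisServerClient may grow the delay differently or add jitter.

```typescript
// Hypothetical sketch: not the actual RedisServerClient implementation.
const BASE_DELAY_MS = 1_000; // 1s floor (from the ADR)
const MAX_DELAY_MS = 30_000; // 30s cap (from the ADR)
const MAX_ATTEMPTS = 10; // give up after 10 attempts (from the ADR)

/**
 * Returns the delay before reconnect attempt `attempt` (0-based),
 * or null once the attempt budget is exhausted.
 * Doubling per attempt is an assumption; the source only states the bounds.
 */
function reconnectDelayMs(attempt: number): number | null {
  if (attempt < 0 || attempt >= MAX_ATTEMPTS) return null;
  return Math.min(BASE_DELAY_MS * 2 ** attempt, MAX_DELAY_MS);
}

// Full schedule for one outage under these assumptions.
const schedule = Array.from({ length: MAX_ATTEMPTS }, (_, i) => reconnectDelayMs(i));
```

Under these assumptions the delay reaches the 30s cap on the sixth attempt and stays there, which is the kind of hand-maintained state the migration aims to remove from application code.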
Current Problems
1. WebSocket Complexity
- Custom reconnection logic with exponential backoff
- Manual ping/pong health checks
- Connection state management across the singleton
- Difficult to test and debug
2. No Type Safety
- JSON messages with no compile-time validation
- Easy to break contract between services
- Runtime errors when message shapes change
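The failure mode described above can be illustrated with a small sketch. The message shape (`type`, `job_id`, `progress`) is an assumption for illustration, not the actual wire format used by job-api. Because `JSON.parse` yields an untyped value, a field renamed by the sender is not caught by the compiler on the receiver; a hand-written guard only catches it at runtime.

```typescript
// Hypothetical message shape; the real WebSocket payloads may differ.
interface ProgressMessage {
  type: "update_job_progress";
  job_id: string;
  progress: number;
}

// Runtime guard: the only protection available with untyped JSON.
function isProgressMessage(value: unknown): value is ProgressMessage {
  if (typeof value !== "object" || value === null) return false;
  const v = value as Record<string, unknown>;
  return (
    v["type"] === "update_job_progress" &&
    typeof v["job_id"] === "string" &&
    typeof v["progress"] === "number"
  );
}

// The sender renamed `job_id` to `jobId`; nothing fails until this code runs.
const renamed: unknown = JSON.parse(
  '{"type":"update_job_progress","jobId":"step-1","progress":0.5}'
);
const unchanged: unknown = JSON.parse(
  '{"type":"update_job_progress","job_id":"step-1","progress":0.5}'
);

const renamedIsValid = isProgressMessage(renamed); // rejected at runtime
const unchangedIsValid = isProgressMessage(unchanged); // accepted
```

With types generated from a shared schema, the same rename would surface as a compile error in both services rather than as a dropped message in production.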
3. Single Point of Failure
- One WebSocket connection for all job operations
- Connection loss affects all in-flight jobs
- No built-in load balancing
Decision
Replace the Client WebSocket with gRPC using the Connect framework for TypeScript.
Why gRPC over WebSocket?
| Aspect | WebSocket (Current) | gRPC (Proposed) |
|---|---|---|
| Type Safety | None (JSON) | Full (Protobuf) |
| Reconnection | Custom code | Built-in |
| Health Checks | Custom ping/pong | Native |
| Streaming | Manual | Native server streaming |
| Code Gen | None | Automatic |
| Debugging | Hard (custom JSON protocol, no standard tooling) | grpcurl, Postman |
What's NOT Changing
- ❌ Monitor WebSocket (browser clients) - stays WebSocket
- ❌ Worker communication - stays Redis only
- ❌ Redis job queue - unchanged
- ❌ Redis pub/sub for events - unchanged
Browser/Frontend Boundary
gRPC is for internal service-to-service communication only. Browser clients (monitor app, emprops-studio) continue using WebSocket and REST.
Why Not gRPC for Browsers?
Native gRPC relies on HTTP/2 trailers and low-level control over HTTP/2 framing that browser APIs do not expose. While solutions exist, they add complexity:
| Solution | Limitation |
|---|---|
| gRPC-Web | Requires proxy (Envoy), no bidirectional streaming |
| Connect Protocol | Works without a proxy, but browsers still cannot use bidirectional streaming over it |
Our Approach
┌─────────────────────────────────────────────────────────────────┐
│ Browser (monitor, emprops-studio) │
│ ├── WebSocket → Real-time subscriptions (job progress) │
│ └── REST/HTTP → Commands (start job, restart machine) │
└─────────────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────────┐
│ emprops-api / job-api (Edge) │
│ ├── WebSocket Server → Browser clients │
│ └── gRPC Client → Internal services │
└─────────────────────────────────────────────────────────────────┘
↓
gRPC (internal)
↓
┌─────────────────────────────────────────────────────────────────┐
│ Internal Services (job-api, future microservices) │
│ └── gRPC Server → Service-to-service │
└─────────────────────────────────────────────────────────────────┘
Protocol Selection Guide
| Communication Path | Protocol | Reason |
|---|---|---|
| Browser → API | WebSocket | True bidirectional, realtime updates |
| Browser → API (commands) | REST/HTTP | Simple request/response |
| emprops-api → job-api | gRPC | Type safety, streaming, performance |
| Service → Service | gRPC | Type safety, streaming, performance |
| Worker → Redis | Redis protocol | Job queue, pub/sub (unchanged) |
The monitor app's WebSocket connection is not a candidate for gRPC replacement - it already provides true bidirectional streaming that gRPC-Web cannot match.
Proposed Architecture
┌─────────────────────────────────────────────────────────────────┐
│ emprops-api (Next.js) │
│ ┌───────────────────────────────────────────────────────────┐ │
│ │ GrpcJobClient (replaces RedisServerClient) │ │
│ │ - SubmitJob() │ │
│ │ - GetJobStatus() │ │
│ │ - CancelJob() │ │
│ │ - StreamJobUpdates() for progress │ │
│ └───────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
↓ ↑
gRPC (HTTP/2)
↓ ↑
┌─────────────────────────────────────────────────────────────────┐
│ job-api (emp-job-queue) │
│ ┌───────────────────────────────────────────────────────────┐ │
│ │ gRPC Server (JobClientService) │ │
│ │ - Runs alongside existing WebSocket server │ │
│ │ - Reuses existing job submission logic │ │
│ │ - Subscribes to Redis for streaming updates │ │
│ └───────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
↓ ↑
Redis (Queue + Events)
↓ ↑
┌─────────────────────────────────────────────────────────────────┐
│ Workers (distributed) │
│ ┌───────────────────────────────────────────────────────────┐ │
│ │ Redis Client (unchanged) │ │
│ └───────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
Protocol Buffer Definition
File: packages/proto/src/job_client.proto
syntax = "proto3";
package emp.job_client.v1;
import "google/protobuf/timestamp.proto";
import "google/protobuf/struct.proto";
// Service for emprops-api → job-api communication
// Replaces the Client WebSocket connection
service JobClientService {
// Submit a job to the queue (replaces submit_job WebSocket message)
rpc SubmitJob(SubmitJobRequest) returns (SubmitJobResponse);
// Get job status (replaces get_job_status WebSocket message)
rpc GetJobStatus(GetJobStatusRequest) returns (GetJobStatusResponse);
// Cancel a job (replaces cancel_job WebSocket message)
rpc CancelJob(CancelJobRequest) returns (CancelJobResponse);
// Stream job updates (replaces complete_job + update_job_progress WebSocket messages)
rpc StreamJobUpdates(StreamJobUpdatesRequest) returns (stream JobUpdate);
}
// ============================================================================
// Submit Job
// ============================================================================
message SubmitJobRequest {
string message_id = 1; // Client-provided job ID (e.g., "step-{uuid}")
string job_type = 2; // "comfyui", "a1111", etc.
string service_required = 3; // Worker capability required
google.protobuf.Struct payload = 4;
int32 priority = 5;
google.protobuf.Struct ctx = 6; // Storage context for workers
// Workflow tracking
optional string workflow_id = 7;
optional int64 workflow_datetime = 8;
optional int32 current_step = 9;
optional int32 total_steps = 10;
optional int32 workflow_priority = 11;
optional int32 retry_attempt = 12;
// Distributed tracing
optional string traceparent = 13;
// Additional fields
map<string, string> metadata = 14;
optional string mime_type = 15;
optional string output_field = 16;
}
message SubmitJobResponse {
string job_id = 1;
JobStatus status = 2;
google.protobuf.Timestamp queued_at = 3;
}
// ============================================================================
// Get Job Status (Attestation)
// ============================================================================
message GetJobStatusRequest {
string job_id = 1;
}
message GetJobStatusResponse {
string job_id = 1;
bool exists = 2;
JobStatus status = 3;
optional google.protobuf.Timestamp timestamp = 4;
optional google.protobuf.Struct result = 5;
}
// ============================================================================
// Cancel Job
// ============================================================================
message CancelJobRequest {
string job_id = 1;
optional string reason = 2;
}
message CancelJobResponse {
bool success = 1;
string message = 2;
}
// ============================================================================
// Stream Job Updates (Server Streaming)
// ============================================================================
message StreamJobUpdatesRequest {
string job_id = 1;
}
message JobUpdate {
string job_id = 1;
JobUpdateType type = 2;
google.protobuf.Timestamp timestamp = 3;
// Progress update fields
optional double progress = 4;
optional string worker_id = 5;
// Completion fields
optional JobResult result = 6;
}
message JobResult {
JobResultStatus status = 1;
optional google.protobuf.Struct data = 2;
optional string error = 3;
}
// ============================================================================
// Enums
// ============================================================================
enum JobStatus {
JOB_STATUS_UNSPECIFIED = 0;
JOB_STATUS_PENDING = 1;
JOB_STATUS_QUEUED = 2;
JOB_STATUS_PROCESSING = 3;
JOB_STATUS_COMPLETED = 4;
JOB_STATUS_FAILED = 5;
JOB_STATUS_CANCELLED = 6;
}
enum JobUpdateType {
JOB_UPDATE_TYPE_UNSPECIFIED = 0;
JOB_UPDATE_TYPE_PROGRESS = 1; // Maps to update_job_progress
JOB_UPDATE_TYPE_ASSIGNED = 2; // Maps to job_assigned
JOB_UPDATE_TYPE_COMPLETED = 3; // Maps to complete_job (success)
JOB_UPDATE_TYPE_FAILED = 4; // Maps to complete_job (failed)
}
enum JobResultStatus {
JOB_RESULT_STATUS_UNSPECIFIED = 0;
JOB_RESULT_STATUS_SUCCESS = 1;
JOB_RESULT_STATUS_FAILED = 2;
}
Implementation Plan
Phase 1: Proto Package (Week 1)
Create packages/proto with:
- job_client.proto definition above
- Buf configuration for code generation
- Generated TypeScript types
mkdir -p packages/proto/src
cd packages/proto
pnpm init
package.json:
{
"name": "@emp/proto",
"version": "0.0.1",
"private": true,
"scripts": {
"generate": "buf generate",
"build": "pnpm generate && tsc"
},
"dependencies": {
"@connectrpc/connect": "^1.4.0",
"@bufbuild/protobuf": "^1.10.0"
},
"devDependencies": {
"@bufbuild/buf": "^1.28.0",
"@connectrpc/protoc-gen-connect-es": "^1.4.0",
"@bufbuild/protoc-gen-es": "^1.10.0"
}
}
Phase 2: gRPC Server in job-api (Week 2)
Add gRPC server to apps/api:
File: apps/api/src/grpc/server.ts
import { fastifyConnectPlugin } from '@connectrpc/connect-fastify';
import { jobClientServiceImpl } from './services/job-client-service';
// Run on separate port (e.g., 50051) alongside existing WebSocket server
File: apps/api/src/grpc/services/job-client-service.ts
// Implement JobClientService
// - SubmitJob: Reuse existing submitJob() logic
// - GetJobStatus: Query Redis for job state
// - CancelJob: Publish to job:cancel:{jobId}
// - StreamJobUpdates: Subscribe to Redis job:events:{jobId}
Phase 3: gRPC Client in emprops-api (Week 3)
Create GrpcJobClient to replace RedisServerClient:
File: apps/emprops-api/src/clients/grpc-job-client.ts
// Same API surface as RedisServerClient:
// - runComfyPrompt()
// - runAuto1111Prompt()
// - submitJob()
// - getJobStatusAttestation()
// - isConnected()
// - getConnectionStats()
Phase 4: Feature Flag & Switchover (Week 4)
File: apps/emprops-api/src/clients/job-client-factory.ts
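// Assumes both clients are named exports of sibling modules in src/clients/
import { GrpcJobClient } from './grpc-job-client';
import { RedisServerClient } from './redis-server-client';

// Returns the gRPC client only when the flag is exactly 'true'; any other value keeps WebSocket
export function getJobClient() {
  if (process.env.FEATURE_GRPC_CLIENT === 'true') {
    return GrpcJobClient.getInstance();
  }
  return RedisServerClient.getInstance();
}
Rollout: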
- Set FEATURE_GRPC_CLIENT=true to enable
- Set FEATURE_GRPC_CLIENT=false to roll back
- No staged rollout - flip the flag and go
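The flag-based switchover can be exercised without either real client. The sketch below is a minimal, self-contained illustration: the `JobClient` interface, the stub clients, and `selectJobClient` are hypothetical names, and the method signatures are assumed (only the method names `submitJob` and `isConnected` come from this ADR). It takes the environment as a parameter so the selection rule can be checked in isolation.

```typescript
// Hypothetical shared interface; method names follow the ADR, signatures are assumed.
interface JobClient {
  readonly transport: "grpc" | "websocket";
  submitJob(jobType: string, payload: Record<string, unknown>): string;
  isConnected(): boolean;
}

// Stand-ins for GrpcJobClient and RedisServerClient so the sketch runs on its own.
function makeStubClient(transport: JobClient["transport"]): JobClient {
  return {
    transport,
    submitJob: (jobType) => `${transport}:${jobType}`,
    isConnected: () => true,
  };
}

const grpcClient = makeStubClient("grpc");
const webSocketClient = makeStubClient("websocket");

type Env = Record<string, string | undefined>;

// Only the exact string "true" enables gRPC, matching the strict comparison
// in the factory; unset, "false", "TRUE", and "1" all fall back to WebSocket.
function selectJobClient(env: Env): JobClient {
  return env["FEATURE_GRPC_CLIENT"] === "true" ? grpcClient : webSocketClient;
}
```

In the application this would be called with `process.env`. One consequence of the strict comparison worth knowing during rollout: values such as `TRUE` or `1` silently keep the WebSocket client.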
Phase 5: Cleanup (Week 5)
After validation:
- Remove redis-server-client.ts
- Remove feature flag, make gRPC the default
- Update documentation
Message Mapping
| WebSocket Message | gRPC Method/Stream |
|---|---|
| submit_job (outbound) | SubmitJob() |
| get_job_status (outbound) | GetJobStatus() |
| cancel_job (outbound) | CancelJob() |
| complete_job (inbound) | StreamJobUpdates (COMPLETED/FAILED) |
| update_job_progress (inbound) | StreamJobUpdates (PROGRESS) |
| ping/pong | Native gRPC keepalive |
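The inbound half of this mapping could be expressed as a small translation function on the job-api side. The sketch below is illustrative only: `JobUpdateType` is a hand-written mirror of the enum in job_client.proto (not generated code), and the `LegacyEvent` shape, in particular the `failed` flag used to split complete_job into success and failure, is an assumption about the existing events.

```typescript
// Hand-written mirror of JobUpdateType from job_client.proto (not generated code).
const JobUpdateType = {
  UNSPECIFIED: 0,
  PROGRESS: 1,
  ASSIGNED: 2,
  COMPLETED: 3,
  FAILED: 4,
} as const;
type JobUpdateType = (typeof JobUpdateType)[keyof typeof JobUpdateType];

// Hypothetical shape of an inbound legacy event; the `failed` flag is an
// assumption about how complete_job signals failure.
interface LegacyEvent {
  type: string;
  failed?: boolean;
}

// Translates a legacy WebSocket message type into the stream's update type.
function toJobUpdateType(event: LegacyEvent): JobUpdateType {
  switch (event.type) {
    case "update_job_progress":
      return JobUpdateType.PROGRESS;
    case "job_assigned":
      return JobUpdateType.ASSIGNED;
    case "complete_job":
      return event.failed ? JobUpdateType.FAILED : JobUpdateType.COMPLETED;
    default:
      // Unknown message types map to UNSPECIFIED instead of throwing.
      return JobUpdateType.UNSPECIFIED;
  }
}

// Assumed terminal states: a client would typically stop reading after either.
function isTerminal(type: JobUpdateType): boolean {
  return type === JobUpdateType.COMPLETED || type === JobUpdateType.FAILED;
}
```

Mapping unknown types to UNSPECIFIED keeps the stream alive if job-api starts publishing an event the client does not yet understand; whether that or a hard error is preferable is a design choice this ADR leaves open.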
Consequences
Benefits
- Type Safety: Compile-time validation of all messages
- Simpler Code: No custom reconnection or health check logic
- Better Debugging: grpcurl, Postman support
- Native Streaming: Built-in server streaming for job updates
- Performance: HTTP/2 multiplexing, binary protocol
Risks
- HTTP/2 Requirement: Railway supports it, but this should be verified end to end on the actual emprops-api → job-api path before switchover
- Learning Curve: Team needs to learn protobuf/Connect
- Migration Risk: Mitigated by feature flag rollback
Dependencies
- @connectrpc/connect: gRPC client/server
- @connectrpc/connect-node: Node.js transport
- @connectrpc/connect-fastify: Fastify plugin
- @bufbuild/buf: Code generation
- @bufbuild/protobuf: Runtime library
End of ADR
