Internal APIs Reference
Internal communication channels and integration points within the EmProps Job Queue system.
Redis Pub/Sub Channels
Job Progress Channel
Pattern: job:progress:{jobId}
Purpose: Real-time job progress updates from workers to API/monitors.
Publisher: Workers (RedisDirectWorkerClient)
Subscribers: API server, monitors
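Here is a minimal sketch of typed helpers for this channel, assuming the message format listed below. The names progressChannel, jobIdFromProgressChannel and isProgressMessage are hypothetical and do not come from the codebase. They show one way a subscriber could recover the job ID from a channel name and reject malformed payloads before broadcasting them.

```typescript
// Hypothetical helpers for the job:progress:{jobId} channel (illustrative names).
const PROGRESS_PREFIX = 'job:progress:';

interface ProgressMessage {
  progress: number; // 0-100
  message?: string;
  timestamp: number;
  worker_id: string;
  machine_id: string;
}

function progressChannel(jobId: string): string {
  return `${PROGRESS_PREFIX}${jobId}`;
}

// Returns the job ID encoded in a progress channel name, or null for any
// other channel. Slicing off the prefix keeps IDs that contain ':' intact.
function jobIdFromProgressChannel(channel: string): string | null {
  if (!channel.startsWith(PROGRESS_PREFIX)) return null;
  const jobId = channel.slice(PROGRESS_PREFIX.length);
  return jobId.length > 0 ? jobId : null;
}

// Runtime check that a parsed payload matches the message format.
function isProgressMessage(value: unknown): value is ProgressMessage {
  if (typeof value !== 'object' || value === null) return false;
  const m = value as Partial<Record<keyof ProgressMessage, unknown>>;
  const progress = m.progress;
  return (
    typeof progress === 'number' &&
    progress >= 0 &&
    progress <= 100 &&
    typeof m.timestamp === 'number' &&
    typeof m.worker_id === 'string' &&
    typeof m.machine_id === 'string' &&
    (m.message === undefined || typeof m.message === 'string')
  );
}
```

Parsing by prefix instead of split(':')[2] means an ID such as wf:1 round-trips unchanged.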
Message Format:
{
progress: number // 0-100
message?: string
timestamp: number
worker_id: string
machine_id: string
}
Usage:
// Worker publishes progress
await redis.publish(
`job:progress:${jobId}`,
JSON.stringify({
progress: 75,
message: "Processing layer 3/4",
timestamp: Date.now(),
worker_id: this.workerId,
machine_id: this.machineId
})
);
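// API server subscribes. The channel name contains a glob, so this needs a
// pattern subscription (PSUBSCRIBE); ioredis then emits 'pmessage' events.
progressSubscriber.psubscribe('job:progress:*');
progressSubscriber.on('pmessage', (_pattern, channel, message) => {
  // Everything after the prefix is the job ID (safe even if the ID contains ':')
  const jobId = channel.slice('job:progress:'.length);
  const data = JSON.parse(message);
  eventBroadcaster.broadcast({
    type: 'update_job_progress',
    job_id: jobId,
    ...data
  });
});
Machine Status Channel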
Pattern: machine:{machineId}:status
Purpose: Periodic machine status broadcasts (every 15 seconds).
Publisher: Machines (via status reporter)
Subscribers: API server, monitors
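A timer like the one below could drive the 15-second cadence. This sketch rests on assumptions. startMachineStatusReporter, the snapshot callback and the injected publish function are hypothetical. Publishing once at startup, before the first interval elapses, is a choice made for this sketch and is not documented behavior.

```typescript
// Hypothetical reporter; `publish` stands in for a Redis client's publish().
type WorkerState = 'idle' | 'busy' | 'offline';

interface MachineStatus {
  machine_id: string;
  timestamp: number;
  workers: {
    worker_id: string;
    connector: string;
    connector_capabilities: string[];
    status: WorkerState;
    current_job_id?: string;
    last_seen: number;
  }[];
  services: {
    service_id: string;
    service_type: string;
    host: string;
    port: number;
    status: 'online' | 'offline';
    last_health_check: number;
  }[];
  cumulative_stats: {
    total_jobs_completed: number;
    total_jobs_failed: number;
    uptime_ms: number;
  };
}

type Publish = (channel: string, message: string) => unknown;
type Snapshot = () => Omit<MachineStatus, 'machine_id' | 'timestamp'>;

const STATUS_INTERVAL_MS = 15_000;

// Publishes one status immediately, then every intervalMs; returns a stop function.
function startMachineStatusReporter(
  machineId: string,
  snapshot: Snapshot,
  publish: Publish,
  intervalMs: number = STATUS_INTERVAL_MS,
): () => void {
  const channel = `machine:${machineId}:status`;
  const tick = (): void => {
    const status: MachineStatus = {
      machine_id: machineId,
      timestamp: Date.now(),
      ...snapshot(),
    };
    // Promise.resolve() covers both sync and promise-returning publish functions.
    Promise.resolve(publish(channel, JSON.stringify(status))).catch((err: unknown) => {
      console.warn('machine status publish failed', err);
    });
  };
  tick();
  const timer = setInterval(tick, intervalMs);
  return () => clearInterval(timer);
}
```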
Message Format:
{
machine_id: string
timestamp: number
workers: {
worker_id: string
connector: string
connector_capabilities: string[]
status: "idle" | "busy" | "offline"
current_job_id?: string
last_seen: number
}[]
services: {
service_id: string
service_type: string
host: string
port: number
status: "online" | "offline"
last_health_check: number
}[]
cumulative_stats: {
total_jobs_completed: number
total_jobs_failed: number
uptime_ms: number
}
}
Worker Status Channel
Pattern: worker:status:{workerId}
Purpose: Individual worker status changes.
Publisher: Workers
Subscribers: API server, monitors
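This channel carries status changes, so a worker can skip repeat reports of the same state. Here is a minimal sketch. It assumes a "change" means a different status or a different current_job_id. WorkerStatusPublisher and its injected publish callback are hypothetical.

```typescript
// Hypothetical change-only publisher for worker:status:{workerId}.
type WorkerState = 'idle' | 'busy' | 'offline';

interface WorkerStatusMessage {
  worker_id: string;
  machine_id: string;
  status: WorkerState;
  current_job_id?: string;
  timestamp: number;
}

type Publish = (channel: string, message: string) => unknown;

class WorkerStatusPublisher {
  private readonly workerId: string;
  private readonly machineId: string;
  private readonly publish: Publish;
  // Last reported state; undefined until the first update.
  private last?: { status: WorkerState; jobId?: string };

  constructor(workerId: string, machineId: string, publish: Publish) {
    this.workerId = workerId;
    this.machineId = machineId;
    this.publish = publish;
  }

  // Publishes only when status or current job differs from the last report.
  // Returns true if a message was published.
  update(status: WorkerState, currentJobId?: string, now: number = Date.now()): boolean {
    if (this.last && this.last.status === status && this.last.jobId === currentJobId) {
      return false;
    }
    this.last = { status, jobId: currentJobId };
    const msg: WorkerStatusMessage = {
      worker_id: this.workerId,
      machine_id: this.machineId,
      status,
      timestamp: now,
    };
    if (currentJobId !== undefined) msg.current_job_id = currentJobId;
    this.publish(`worker:status:${this.workerId}`, JSON.stringify(msg));
    return true;
  }
}
```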
Message Format:
{
worker_id: string
machine_id: string
status: "idle" | "busy" | "offline"
current_job_id?: string
timestamp: number
}
Service Connection Channel
Pattern: service:connection:{serviceId}
Purpose: Service (ComfyUI/Ollama) health status updates.
Message Format:
{
service_id: string
service_type: "comfyui" | "ollama"
host: string
port: number
status: "online" | "offline"
last_health_check: number
machine_id: string
}
Webhook Notification System
Implementation: /packages/core/src/services/webhook-notification-service.ts
Webhook Configuration
Storage: Redis (WebhookRedisStorage)
Endpoint Structure:
{
id: string
url: string
secret?: string // Shared secret used to sign payloads (HMAC-SHA256)
events: WebhookEventType[]
filters?: {
job_types?: string[]
job_priorities?: number[]
machine_ids?: string[]
worker_ids?: string[]
}
headers?: Record<string, string>
retry_config?: {
max_attempts: number // default: 3
initial_delay_ms: number // default: 1000
backoff_multiplier: number // default: 2
max_delay_ms: number // default: 30000
}
active: boolean
disconnected?: boolean // Auto-disconnect after 10 failures
static?: boolean // Never auto-disconnect
consecutive_failures?: number
last_failure_at?: number
last_success_at?: number
}
Webhook Event Types
Job Events:
- job_submitted - Job submitted to queue
- update_job_progress - Progress updates (throttled to 1/sec)
- complete_job - Job completed successfully
- job_failed - Job failed permanently
- cancel_job - Job cancelled
Workflow Events (Generated):
- workflow_submitted - First job in workflow submitted
- workflow_completed - All workflow steps completed
- workflow_failed - Workflow failed (partial/complete)
System Events:
- worker_status - Worker status changes
- machine_status - Machine status updates
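The event names above fit a TypeScript union, which also makes the endpoint filters easy to check. The sketch below covers the "Filter Check" step and rests on several assumptions. CandidateEvent and shouldNotify are hypothetical. An absent or empty filter list matches every event. A non-empty list rejects events that lack the filtered field.

```typescript
type WebhookEventType =
  | 'job_submitted'
  | 'update_job_progress'
  | 'complete_job'
  | 'job_failed'
  | 'cancel_job'
  | 'workflow_submitted'
  | 'workflow_completed'
  | 'workflow_failed'
  | 'worker_status'
  | 'machine_status';

interface WebhookFilters {
  job_types?: string[];
  job_priorities?: number[];
  machine_ids?: string[];
  worker_ids?: string[];
}

interface WebhookEndpointLike {
  events: WebhookEventType[];
  filters?: WebhookFilters;
  active: boolean;
}

// Hypothetical event shape used only by this sketch.
interface CandidateEvent {
  type: WebhookEventType;
  job_type?: string;
  job_priority?: number;
  machine_id?: string;
  worker_id?: string;
}

// Assumption: an absent or empty list matches everything; a non-empty list
// requires the event to carry one of the listed values.
function passes<T>(allowed: T[] | undefined, value: T | undefined): boolean {
  if (!allowed || allowed.length === 0) return true;
  return value !== undefined && allowed.includes(value);
}

function shouldNotify(endpoint: WebhookEndpointLike, event: CandidateEvent): boolean {
  if (!endpoint.active) return false;
  if (!endpoint.events.includes(event.type)) return false;
  const f: WebhookFilters = endpoint.filters ?? {};
  return (
    passes(f.job_types, event.job_type) &&
    passes(f.job_priorities, event.job_priority) &&
    passes(f.machine_ids, event.machine_id) &&
    passes(f.worker_ids, event.worker_id)
  );
}
```

This sketch does not skip disconnected endpoints. The delivery lifecycle reconnects an endpoint on its next successful delivery, which suggests deliveries to disconnected endpoints are still attempted.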
Webhook Payload Format
{
event_type: string
event_id: string // Unique event identifier
timestamp: number
webhook_id: string
data: {
job_id?: string
job_type?: string
job_status?: string
worker_id?: string
machine_id?: string
progress?: number
result?: any
error?: string
// Workflow tracking
workflow_id?: string
workflow_priority?: number
workflow_datetime?: number
total_steps?: number
current_step?: number
}
metadata?: {
retry_attempt?: number
original_timestamp?: number
}
parent_trace_context?: {
trace_id?: string
span_id?: string
}
}
Delivery Lifecycle
- Event Occurs (job completes, worker connects, etc.)
- Webhook Service Receives (via EventBroadcaster)
- Filter Check (matches filters?)
- Throttle Check (progress events: 1/sec max)
- Queue Delivery (add to delivery queue)
- HTTP POST (to webhook URL with signature)
- Retry Logic (exponential backoff on failure)
- Auto-Disconnect (after 10 consecutive failures, unless static: true)
- Auto-Reconnect (on next successful delivery)
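The Throttle Check step can be sketched as a timestamp lookup per webhook and job. This is a sketch under assumptions and not the service's implementation. ProgressThrottle and forgetJob are hypothetical names. The 1/sec limit is assumed to apply per (webhook, job) pair. Events that arrive inside the window are dropped rather than merged.

```typescript
// Hypothetical throttle for update_job_progress deliveries: at most one
// delivery per (webhook, job) pair per interval; events inside the window
// are dropped in this sketch.
class ProgressThrottle {
  private readonly intervalMs: number;
  // jobId -> (webhookId -> time of the last delivered progress event)
  private readonly lastSent = new Map<string, Map<string, number>>();

  constructor(intervalMs: number = 1000) {
    this.intervalMs = intervalMs;
  }

  shouldDeliver(webhookId: string, jobId: string, now: number = Date.now()): boolean {
    let perJob = this.lastSent.get(jobId);
    if (!perJob) {
      perJob = new Map<string, number>();
      this.lastSent.set(jobId, perJob);
    }
    const last = perJob.get(webhookId);
    if (last !== undefined && now - last < this.intervalMs) return false;
    perJob.set(webhookId, now);
    return true;
  }

  // Drop state for a finished job so the map does not grow without bound.
  forgetJob(jobId: string): void {
    this.lastSent.delete(jobId);
  }
}
```

Dropping instead of coalescing is the simplest option, but it can swallow the last update before completion. A trailing-edge flush would fix that at the cost of one timer per job.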
Workflow Completion Detection
Independent Workflow Tracker:
private workflowTracker = new Map<
string, // workflow_id
{
steps: Map<number, string> // current_step → job_id
completedSteps: Set<number>
failedSteps: Set<number>
stepErrors: Map<number, string>
totalSteps?: number
startTime: number
lastUpdate: number
currentStep?: number
}
>();
Detection Logic:
- Track all steps by workflow_id
- On job_submitted: record total_steps and current_step
- On complete_job: mark the step complete
- On job_failed: mark the step failed
- When completedSteps.size === totalSteps: emit workflow_completed
- When failedSteps.size > 0 && completedSteps.size + failedSteps.size === totalSteps: emit workflow_failed
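The detection rules above, applied to the tracker structure, might look like the following sketch. WorkflowTracker and its method names are hypothetical. This sketch also removes a workflow once it emits a terminal event, so each event fires only once. That removal is an assumption, not documented behavior.

```typescript
type WorkflowEvent = 'workflow_completed' | 'workflow_failed';

interface WorkflowState {
  steps: Map<number, string>; // current_step -> job_id
  completedSteps: Set<number>;
  failedSteps: Set<number>;
  stepErrors: Map<number, string>;
  totalSteps?: number;
  startTime: number;
  lastUpdate: number;
  currentStep?: number;
}

// Hypothetical tracker applying the detection rules; each handler returns
// the workflow event to emit, or null if the workflow is still in progress.
class WorkflowTracker {
  private readonly workflows = new Map<string, WorkflowState>();

  onSubmitted(workflowId: string, jobId: string, currentStep: number, totalSteps: number, now: number = Date.now()): void {
    const state = this.getOrCreate(workflowId, now);
    state.steps.set(currentStep, jobId);
    state.totalSteps = totalSteps;
    state.currentStep = currentStep;
  }

  onCompleted(workflowId: string, step: number, now: number = Date.now()): WorkflowEvent | null {
    const state = this.getOrCreate(workflowId, now);
    state.completedSteps.add(step);
    return this.evaluate(workflowId, state);
  }

  onFailed(workflowId: string, step: number, error: string, now: number = Date.now()): WorkflowEvent | null {
    const state = this.getOrCreate(workflowId, now);
    state.failedSteps.add(step);
    state.stepErrors.set(step, error);
    return this.evaluate(workflowId, state);
  }

  private getOrCreate(workflowId: string, now: number): WorkflowState {
    let state = this.workflows.get(workflowId);
    if (!state) {
      state = {
        steps: new Map<number, string>(),
        completedSteps: new Set<number>(),
        failedSteps: new Set<number>(),
        stepErrors: new Map<number, string>(),
        startTime: now,
        lastUpdate: now,
      };
      this.workflows.set(workflowId, state);
    }
    state.lastUpdate = now;
    return state;
  }

  private evaluate(workflowId: string, s: WorkflowState): WorkflowEvent | null {
    if (s.totalSteps === undefined) return null; // total unknown until job_submitted
    const finished = s.completedSteps.size + s.failedSteps.size;
    let event: WorkflowEvent | null = null;
    if (s.completedSteps.size === s.totalSteps) {
      event = 'workflow_completed';
    } else if (s.failedSteps.size > 0 && finished === s.totalSteps) {
      event = 'workflow_failed';
    }
    // Assumption: forget finished workflows so each terminal event fires once.
    if (event !== null) this.workflows.delete(workflowId);
    return event;
  }
}
```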
Signature Verification
HMAC-SHA256 Signature:
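import * as crypto from 'node:crypto';
// Sign the exact serialized string that is sent as the POST body
const body = JSON.stringify(payload);
const signature = crypto
  .createHmac('sha256', webhook.secret)
  .update(body)
  .digest('hex');
headers['X-Webhook-Signature'] = signature;
Recipient Verification:
import * as crypto from 'node:crypto';
// request.body must be the raw body (string or Buffer), not a parsed object;
// re-serializing parsed JSON may not reproduce the signed bytes
const receivedSig = String(request.headers['x-webhook-signature'] ?? '');
const computedSig = crypto
  .createHmac('sha256', secret)
  .update(request.body)
  .digest('hex');
// Constant-time comparison avoids leaking signature bytes through timing
const received = Buffer.from(receivedSig, 'hex');
const computed = Buffer.from(computedSig, 'hex');
if (received.length !== computed.length || !crypto.timingSafeEqual(received, computed)) {
  throw new Error('Invalid signature');
}
Retry Configuration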
Default Retry Strategy:
{
max_attempts: 3,
initial_delay_ms: 1000, // 1 second
backoff_multiplier: 2, // exponential
max_delay_ms: 30000 // 30 seconds cap
}
Backoff Calculation:
const delay = Math.min(
initial_delay_ms * Math.pow(backoff_multiplier, attempt - 1),
max_delay_ms
);
// Attempt 1: 1s
// Attempt 2: 2s
// Attempt 3: 4s
Auto-Disconnect Mechanism
Failure Tracking:
// On delivery failure
webhook.consecutive_failures = (webhook.consecutive_failures ?? 0) + 1;
webhook.last_failure_at = Date.now();
if (webhook.consecutive_failures >= 10 && !webhook.static) {
  webhook.disconnected = true;
  logger.warn('Webhook auto-disconnected', { url: webhook.url, failures: webhook.consecutive_failures });
}
// On delivery success
webhook.consecutive_failures = 0;
webhook.last_success_at = Date.now();
if (webhook.disconnected) {
  webhook.disconnected = false;
  logger.info('Webhook reconnected', { url: webhook.url });
}
EmProps API Integration
Primary Webhook Endpoint:
// Environment configuration
EMPROPS_API_URL=https://api.emprops.com
EMPROPS_API_KEY=<service-key>
// Webhook registration (automatic)
{
id: 'emprops-api-primary',
url: `${EMPROPS_API_URL}/api/jobs/{job_id}/webhook`,
events: ['complete_job', 'job_failed', 'update_job_progress'],
static: true, // Never auto-disconnect
headers: {
'X-API-Key': EMPROPS_API_KEY
}
}
Failsafe Mechanism: When webhook delivery fails, workers can notify the EmProps API directly via REST:
// POST /api/jobs/:id/complete
{
outputs: any,
metadata: any,
workflow_output: string // Required
}
Redis Functions (Job Matching)
Location: /packages/core/src/redis-functions/
findMatchingJob Function
Purpose: Atomic job matching and claiming with capability filtering.
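For illustration, the requirement filter (step 2 of the Lua logic below) can be written as a plain TypeScript predicate. It runs outside Redis, so it is not atomic. It only mirrors the filtering step, and the actual claim still has to happen inside the Redis function. canRun and pickFirstMatch are hypothetical names. The connector match is assumed to be exact string equality. connector_capabilities is not checked, because the documented requirements shape does not include it.

```typescript
// Hypothetical TypeScript mirror of the Lua filter step (not atomic).
interface JobRequirements {
  models: string[];
  custom_nodes: string[];
  connector: string;
}

interface WorkerCapabilities {
  connector: string;
  connector_capabilities: string[];
  models: string[];
  custom_nodes: string[];
}

// Assumption: connector must match exactly, and every required model and
// custom node must be present on the worker.
function canRun(req: JobRequirements, worker: WorkerCapabilities): boolean {
  if (req.connector !== worker.connector) return false;
  const models = new Set(worker.models);
  const nodes = new Set(worker.custom_nodes);
  return req.models.every((m) => models.has(m)) && req.custom_nodes.every((n) => nodes.has(n));
}

// Returns the first job, in the given queue order, that the worker can run.
function pickFirstMatch<T extends { requirements: JobRequirements }>(
  pending: T[],
  worker: WorkerCapabilities,
): T | undefined {
  return pending.find((job) => canRun(job.requirements, worker));
}
```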
Lua Script Logic:
function findMatchingJob(workerCapabilities)
-- 1. Get pending jobs from queue (ZRANGE)
-- 2. Filter by requirements:
-- - Connector type match
-- - Model availability
-- - Custom node availability
-- 3. Atomically claim first match (ZADD to claimed set)
-- 4. Return job data
-- Future: Pool-aware routing
-- - Fast Lane / Standard / Heavy pool selection
-- - Model affinity routing
-- - Cross-pool fallback
end
Invocation:
const job = await redis.fcall(
'findMatchingJob',
0, // numkeys
JSON.stringify({
connector: 'comfyui_remote',
connector_capabilities: ['text_to_image', 'upscale'],
models: ['sd_xl_base_1.0.safetensors'],
custom_nodes: ['ComfyUI_essentials']
})
);
Response:
{
id: string
type: string
payload: any
requirements: {
models: string[]
custom_nodes: string[]
connector: string
}
workflow_context?: {
workflow_id: string
workflow_priority: number
total_steps: number
current_step: number
}
}
Redis Key Patterns
Job Storage
- job:{jobId} (hash) - Job data
- job:pending (sorted set) - Pending jobs (score: priority)
- job:claimed (sorted set) - Claimed jobs (score: claim timestamp)
- job:processing (sorted set) - Processing jobs
- job:completed (sorted set) - Completed jobs (TTL: 1 hour)
- job:failed (sorted set) - Failed jobs (TTL: 24 hours)
Worker Registry
- worker:{workerId} (hash) - Worker metadata
- worker:{workerId}:capabilities (hash) - Worker capabilities
- worker:active (set) - Active worker IDs
- worker:heartbeat:{workerId} (string, TTL: 60s) - Heartbeat tracking
Machine Registry
- machine:{machineId} (hash) - Machine metadata
- machine:{machineId}:workers (set) - Worker IDs on machine
- machine:{machineId}:services (hash) - Service connection details
- machine:active (set) - Active machine IDs
Webhook Storage
- webhook:{webhookId} (hash) - Webhook configuration
- webhook:active (set) - Active webhook IDs
- webhook:delivery:{eventId} (hash) - Delivery attempt metadata
- webhook:retry:{eventId} (list) - Retry queue
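Keeping these patterns in one module stops hand-built key strings from drifting apart. This sketch uses hypothetical names (keys, safeId). The reserved-ID guard is an addition of this sketch. Without it, a job ID of pending would produce job:pending, the same key as the pending-jobs sorted set. Likewise, a worker ID of heartbeat would make worker:heartbeat:capabilities ambiguous.

```typescript
// Hypothetical key builders for the patterns above.
const RESERVED_IDS = new Set(['pending', 'claimed', 'processing', 'completed', 'failed', 'active', 'heartbeat']);

// Rejects empty IDs and IDs equal to a fixed key segment (job:pending,
// worker:active, worker:heartbeat:...). IDs containing ':' are not checked.
function safeId(id: string): string {
  if (id.length === 0 || RESERVED_IDS.has(id)) {
    throw new Error(`Unsafe Redis key segment: "${id}"`);
  }
  return id;
}

const keys = {
  // Job storage
  job: (jobId: string) => `job:${safeId(jobId)}`,
  jobsPending: 'job:pending',
  jobsClaimed: 'job:claimed',
  jobsProcessing: 'job:processing',
  jobsCompleted: 'job:completed',
  jobsFailed: 'job:failed',
  // Worker registry
  worker: (workerId: string) => `worker:${safeId(workerId)}`,
  workerCapabilities: (workerId: string) => `worker:${safeId(workerId)}:capabilities`,
  workersActive: 'worker:active',
  workerHeartbeat: (workerId: string) => `worker:heartbeat:${safeId(workerId)}`,
  // Machine registry
  machine: (machineId: string) => `machine:${safeId(machineId)}`,
  machineWorkers: (machineId: string) => `machine:${safeId(machineId)}:workers`,
  machineServices: (machineId: string) => `machine:${safeId(machineId)}:services`,
  machinesActive: 'machine:active',
  // Webhook storage
  webhook: (webhookId: string) => `webhook:${safeId(webhookId)}`,
  webhooksActive: 'webhook:active',
  webhookDelivery: (eventId: string) => `webhook:delivery:${safeId(eventId)}`,
  webhookRetry: (eventId: string) => `webhook:retry:${safeId(eventId)}`,
} as const;
```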
Observability Integration
OpenTelemetry Trace Context
Webhook Trace Hierarchy:
// Parent: Job execution span
{
trace_id: "abc123",
span_id: "span-job-001"
}
// Child: Webhook delivery span
{
trace_id: "abc123", // Same trace
parent_span_id: "span-job-001",
span_id: "span-webhook-001"
}
Propagation via Webhook Payload:
{
parent_trace_context: {
trace_id: "abc123",
span_id: "span-job-001"
}
}
Redis Event Telemetry
Location: /apps/api/src/index.ts
// Publish with trace context
await redis.publish(
`job:progress:${jobId}`,
JSON.stringify({
progress: 50,
trace_context: {
trace_id: currentTraceId,
span_id: currentSpanId
}
})
);
Error Handling
Redis Connection Failures
// Auto-reconnect with retry logic
redis.on('error', (error) => {
logger.warn('Redis error (auto-reconnecting)', { error });
});
redis.on('reconnecting', (ms) => {
logger.info('Redis reconnecting', { delay_ms: ms });
});
Webhook Delivery Failures
Example record of a failed delivery attempt:
{
id: "attempt-123",
webhook_id: "hook-abc",
event_id: "event-xyz",
attempt_number: 2,
success: false,
response_status: 500,
error_message: "Connection timeout",
next_retry_at: 1234567890
}
Pub/Sub Message Validation
try {
const message = JSON.parse(rawMessage);
if (!message.timestamp || !message.job_id) {
throw new Error('Invalid message format');
}
processMessage(message);
} catch (error) {
logger.error('Invalid pub/sub message', { error, rawMessage });
}
File Locations
- Webhook Service: /packages/core/src/services/webhook-notification-service.ts
- Webhook Storage: /packages/core/src/services/webhook-redis-storage.ts
- Event Broadcaster: /packages/core/src/services/event-broadcaster.ts
- Redis Functions: /packages/core/src/redis-functions/
- Progress Streaming: /apps/api/src/lightweight-api-server.ts
See Also
- REST API Reference - HTTP endpoints
- WebSocket API Reference - Real-time events
- Architecture Overview - System design
- Observability Reference - Trace propagation
