
Internal APIs Reference

Internal communication channels and integration points within the EmProps Job Queue system.

Redis Pub/Sub Channels

Job Progress Channel

Pattern: job:progress:{jobId}

Purpose: Real-time job progress updates from workers to API/monitors.

Publisher: Workers (RedisDirectWorkerClient)

Subscribers: API server, monitors

Message Format:

typescript
{
  progress: number  // 0-100
  message?: string
  timestamp: number
  worker_id: string
  machine_id: string
}

Usage:

typescript
// Worker publishes progress
await redis.publish(
  `job:progress:${jobId}`,
  JSON.stringify({
    progress: 75,
    message: "Processing layer 3/4",
    timestamp: Date.now(),
    worker_id: this.workerId,
    machine_id: this.machineId
  })
);

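// API server subscribes. The wildcard requires PSUBSCRIBE: plain SUBSCRIBE
// treats '*' literally. Use a dedicated connection, since a connection in
// subscriber mode cannot also publish.
progressSubscriber.psubscribe('job:progress:*');
progressSubscriber.on('pmessage', (_pattern, channel, message) => {
  // Channel is job:progress:{jobId}; slicing the prefix keeps IDs containing ':'
  const jobId = channel.slice('job:progress:'.length);
  const data = JSON.parse(message);
  eventBroadcaster.broadcast({
    type: 'update_job_progress',
    job_id: jobId,
    ...data
  });
});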

Machine Status Channel

Pattern: machine:{machineId}:status

Purpose: Periodic machine status broadcasts (every 15 seconds).

Publisher: Machines (via status reporter)

Subscribers: API server, monitors

Message Format:

typescript
{
  machine_id: string
  timestamp: number
  workers: {
    worker_id: string
    connector: string
    connector_capabilities: string[]
    status: "idle" | "busy" | "offline"
    current_job_id?: string
    last_seen: number
  }[]
  services: {
    service_id: string
    service_type: string
    host: string
    port: number
    status: "online" | "offline"
    last_health_check: number
  }[]
  cumulative_stats: {
    total_jobs_completed: number
    total_jobs_failed: number
    uptime_ms: number
  }
}

Worker Status Channel

Pattern: worker:status:{workerId}

Purpose: Individual worker status changes.

Publisher: Workers

Subscribers: API server, monitors

Message Format:

typescript
{
  worker_id: string
  machine_id: string
  status: "idle" | "busy" | "offline"
  current_job_id?: string
  timestamp: number
}

Service Connection Channel

Pattern: service:connection:{serviceId}

Purpose: Service (ComfyUI/Ollama) health status updates.

Message Format:

typescript
{
  service_id: string
  service_type: "comfyui" | "ollama"
  host: string
  port: number
  status: "online" | "offline"
  last_health_check: number
  machine_id: string
}

Webhook Notification System

Implementation: /packages/core/src/services/webhook-notification-service.ts

Webhook Configuration

Storage: Redis (WebhookRedisStorage)

Endpoint Structure:

typescript
{
  id: string
  url: string
  secret?: string  // Shared secret used to sign payloads (HMAC-SHA256)
  events: WebhookEventType[]
  filters?: {
    job_types?: string[]
    job_priorities?: number[]
    machine_ids?: string[]
    worker_ids?: string[]
  }
  headers?: Record<string, string>
  retry_config?: {
    max_attempts: number  // default: 3
    initial_delay_ms: number  // default: 1000
    backoff_multiplier: number  // default: 2
    max_delay_ms: number  // default: 30000
  }
  active: boolean
  disconnected?: boolean  // Set after 10 consecutive failures
  static?: boolean  // Never auto-disconnect
  consecutive_failures?: number
  last_failure_at?: number
  last_success_at?: number
}

Webhook Event Types

Job Events:

  • job_submitted - Job submitted to queue
  • update_job_progress - Progress updates (throttled 1/sec)
  • complete_job - Job completed successfully
  • job_failed - Job failed permanently
  • cancel_job - Job cancelled

Workflow Events (derived from job events by the workflow tracker):

  • workflow_submitted - First job in workflow submitted
  • workflow_completed - All workflow steps completed
  • workflow_failed - At least one step failed; emitted once every step has completed or failed

System Events:

  • worker_status - Worker status changes
  • machine_status - Machine status updates

Webhook Payload Format

typescript
{
  event_type: string
  event_id: string  // Unique event identifier
  timestamp: number
  webhook_id: string
  data: {
    job_id?: string
    job_type?: string
    job_status?: string
    worker_id?: string
    machine_id?: string
    progress?: number
    result?: any
    error?: string
    // Workflow tracking
    workflow_id?: string
    workflow_priority?: number
    workflow_datetime?: number
    total_steps?: number
    current_step?: number
  }
  metadata?: {
    retry_attempt?: number
    original_timestamp?: number
  }
  parent_trace_context?: {
    trace_id?: string
    span_id?: string
  }
}

Delivery Lifecycle

  1. Event Occurs (job completes, worker connects, etc.)
  2. Webhook Service Receives (via EventBroadcaster)
  3. Filter Check (matches filters?)
  4. Throttle Check (progress events: 1/sec max)
  5. Queue Delivery (add to delivery queue)
  6. HTTP POST (to webhook URL with signature)
  7. Retry Logic (exponential backoff on failure)
  8. Auto-Disconnect (after 10 consecutive failures, unless static: true)
  9. Auto-Reconnect (on next successful delivery)
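Steps 3 and 4 are the two checks that decide whether an event is delivered at all. The following is a minimal sketch built on several assumptions: a flattened `WebhookEvent` with a hypothetical `priority` field, an absent or empty filter list that matches everything, and progress throttled per webhook and per job. The names `matchesFilters` and `ProgressThrottle` are illustrative and are not the service's API.

```typescript
// Hypothetical flattened event; field names follow the payload format above,
// except `priority`, which is assumed here so that job_priorities can be checked
interface WebhookEvent {
  event_type: string;
  timestamp: number;
  job_id?: string;
  job_type?: string;
  priority?: number;
  machine_id?: string;
  worker_id?: string;
}

interface WebhookFilters {
  job_types?: string[];
  job_priorities?: number[];
  machine_ids?: string[];
  worker_ids?: string[];
}

// An absent or empty list places no restriction (assumed semantics)
function allows<T>(allowed: T[] | undefined, value: T | undefined): boolean {
  if (!allowed || allowed.length === 0) return true;
  return value !== undefined && allowed.includes(value);
}

// Step 3: every configured filter must match
function matchesFilters(filters: WebhookFilters | undefined, event: WebhookEvent): boolean {
  if (!filters) return true;
  return (
    allows(filters.job_types, event.job_type) &&
    allows(filters.job_priorities, event.priority) &&
    allows(filters.machine_ids, event.machine_id) &&
    allows(filters.worker_ids, event.worker_id)
  );
}

// Step 4: at most one update_job_progress per webhook and job per interval
class ProgressThrottle {
  private readonly intervalMs: number;
  private readonly lastSent = new Map<string, number>();

  constructor(intervalMs = 1000) {
    this.intervalMs = intervalMs;
  }

  shouldSend(webhookId: string, event: WebhookEvent): boolean {
    if (event.event_type !== 'update_job_progress') return true; // other events pass through
    const key = `${webhookId}:${event.job_id ?? ''}`;
    const last = this.lastSent.get(key);
    if (last !== undefined && event.timestamp - last < this.intervalMs) return false;
    this.lastSent.set(key, event.timestamp);
    return true;
  }
}
```

This throttle drops intermediate updates. If the final progress value must always arrive, you would need a trailing-edge variant that delivers the latest suppressed update once the interval has passed.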

Workflow Completion Detection

Independent Workflow Tracker:

typescript
private workflowTracker = new Map<
  string,  // workflow_id
  {
    steps: Map<number, string>  // current_step → job_id
    completedSteps: Set<number>
    failedSteps: Set<number>
    stepErrors: Map<number, string>
    totalSteps?: number
    startTime: number
    lastUpdate: number
    currentStep?: number
  }
>();

Detection Logic:

  1. Track all steps by workflow_id
  2. On job_submitted: record total_steps, current_step
  3. On complete_job: mark step complete
  4. On job_failed: mark step failed
  5. When completedSteps.size === totalSteps: emit workflow_completed
  6. When failedSteps.size > 0 && (completed + failed === total): emit workflow_failed
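The detection rules can be sketched as a small class. This illustration rests on several assumptions. It keeps only a subset of the tracker fields shown above and takes `total_steps` from the most recent `job_submitted`. It also deletes a workflow's state once that workflow emits an outcome, so each workflow event fires only once. `WorkflowTracker` and its method names are hypothetical.

```typescript
// Subset of the per-workflow state shown above
interface WorkflowState {
  steps: Map<number, string>; // current_step → job_id
  completedSteps: Set<number>;
  failedSteps: Set<number>;
  stepErrors: Map<number, string>;
  totalSteps?: number;
}

type WorkflowOutcome = 'workflow_completed' | 'workflow_failed' | null;

class WorkflowTracker {
  private readonly workflows = new Map<string, WorkflowState>();

  private get(workflowId: string): WorkflowState {
    let state = this.workflows.get(workflowId);
    if (!state) {
      state = {
        steps: new Map<number, string>(),
        completedSteps: new Set<number>(),
        failedSteps: new Set<number>(),
        stepErrors: new Map<number, string>(),
      };
      this.workflows.set(workflowId, state);
    }
    return state;
  }

  // Rule 2: record total_steps and the step -> job mapping
  onJobSubmitted(workflowId: string, jobId: string, currentStep: number, totalSteps: number): void {
    const state = this.get(workflowId);
    state.steps.set(currentStep, jobId);
    state.totalSteps = totalSteps;
  }

  // Rule 3
  onJobCompleted(workflowId: string, step: number): WorkflowOutcome {
    this.get(workflowId).completedSteps.add(step);
    return this.evaluate(workflowId);
  }

  // Rule 4
  onJobFailed(workflowId: string, step: number, error: string): WorkflowOutcome {
    const state = this.get(workflowId);
    state.failedSteps.add(step);
    state.stepErrors.set(step, error);
    return this.evaluate(workflowId);
  }

  // Rules 5 and 6; returns the workflow event to emit, if any
  private evaluate(workflowId: string): WorkflowOutcome {
    const s = this.get(workflowId);
    if (s.totalSteps === undefined) return null; // total not yet known
    let outcome: WorkflowOutcome = null;
    if (s.completedSteps.size === s.totalSteps) {
      outcome = 'workflow_completed';
    } else if (s.failedSteps.size > 0 && s.completedSteps.size + s.failedSteps.size === s.totalSteps) {
      outcome = 'workflow_failed';
    }
    if (outcome) this.workflows.delete(workflowId); // emit once and free memory (assumption)
    return outcome;
  }
}
```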

Signature Verification

HMAC-SHA256 Signature:

typescript
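import * as crypto from 'node:crypto';

// Sign the exact string sent as the request body; secret is optional, so skip when unset
const body = JSON.stringify(payload);
if (webhook.secret) {
  headers['X-Webhook-Signature'] = crypto
    .createHmac('sha256', webhook.secret)
    .update(body)
    .digest('hex');
}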

Recipient Verification:

typescript
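import * as crypto from 'node:crypto';

// rawBody: the unparsed request body exactly as received. Hashing a re-serialized
// JSON object can produce different bytes and reject valid signatures.
const receivedSig = String(request.headers['x-webhook-signature'] ?? '');
const computedSig = crypto
  .createHmac('sha256', secret)
  .update(rawBody)
  .digest('hex');

// Compare in constant time; timingSafeEqual throws if lengths differ
const a = Buffer.from(receivedSig, 'hex');
const b = Buffer.from(computedSig, 'hex');
if (a.length !== b.length || !crypto.timingSafeEqual(a, b)) {
  throw new Error('Invalid signature');
}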
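Both sides fit in a round-trip sketch that uses Node's built-in `crypto` module. The helper names `signBody` and `verifySignature` are illustrative. The algorithm (HMAC-SHA256 with a hex digest) follows the snippets above. The receiver must hash the raw body bytes, because re-serializing parsed JSON can reorder keys or change whitespace and break verification.

```typescript
import { createHmac, timingSafeEqual } from 'node:crypto';

// Sender side: sign the exact string that will be sent as the request body
function signBody(secret: string, body: string): string {
  return createHmac('sha256', secret).update(body).digest('hex');
}

// Receiver side: recompute over the raw body and compare in constant time
function verifySignature(secret: string, rawBody: string, receivedSig: string): boolean {
  const expected = Buffer.from(signBody(secret, rawBody), 'hex');
  const received = Buffer.from(receivedSig, 'hex');
  // timingSafeEqual throws on a length mismatch, so check lengths first
  return expected.length === received.length && timingSafeEqual(expected, received);
}
```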

Retry Configuration

Default Retry Strategy:

typescript
{
  max_attempts: 3,
  initial_delay_ms: 1000,  // 1 second
  backoff_multiplier: 2,   // exponential
  max_delay_ms: 30000      // 30 seconds cap
}

Backoff Calculation:

typescript
const delay = Math.min(
  initial_delay_ms * Math.pow(backoff_multiplier, attempt - 1),
  max_delay_ms
);

// Attempt 1: 1s
// Attempt 2: 2s
// Attempt 3: 4s
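The next block turns the backoff formula into a small, testable function. `RetryConfig`, `retryDelay`, and `retrySchedule` are hypothetical names. The field names and defaults are the ones listed above, and the attempt number is 1-based to match the comments.

```typescript
interface RetryConfig {
  max_attempts: number;
  initial_delay_ms: number;
  backoff_multiplier: number;
  max_delay_ms: number;
}

const DEFAULT_RETRY: RetryConfig = {
  max_attempts: 3,
  initial_delay_ms: 1000,
  backoff_multiplier: 2,
  max_delay_ms: 30000,
};

// Exponential delay for a 1-based attempt number, capped at max_delay_ms
function retryDelay(attempt: number, cfg: RetryConfig = DEFAULT_RETRY): number {
  return Math.min(
    cfg.initial_delay_ms * Math.pow(cfg.backoff_multiplier, attempt - 1),
    cfg.max_delay_ms
  );
}

// One delay per attempt, up to max_attempts
function retrySchedule(cfg: RetryConfig = DEFAULT_RETRY): number[] {
  return Array.from({ length: cfg.max_attempts }, (_, i) => retryDelay(i + 1, cfg));
}
```

With the defaults, the cap never applies. It only takes effect once `max_attempts` is raised: a seven-attempt schedule, for example, is 1s, 2s, 4s, 8s, 16s, 30s, 30s.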

Auto-Disconnect Mechanism

Failure Tracking:

typescript
// On delivery failure
webhook.consecutive_failures = (webhook.consecutive_failures ?? 0) + 1;
webhook.last_failure_at = Date.now();

if (webhook.consecutive_failures >= 10 && !webhook.static) {
  webhook.disconnected = true;
  logger.warn('Webhook auto-disconnected', {
    url: webhook.url,
    failures: webhook.consecutive_failures
  });
}

// On delivery success
webhook.consecutive_failures = 0;
webhook.last_success_at = Date.now();

if (webhook.disconnected) {
  webhook.disconnected = false;
  logger.info('Webhook reconnected', { url: webhook.url });
}

EmProps API Integration

Primary Webhook Endpoint:

bash
# Environment configuration
EMPROPS_API_URL=https://api.emprops.com
EMPROPS_API_KEY=<service-key>

typescript
// Webhook registration (automatic)
{
  id: 'emprops-api-primary',
  url: `${EMPROPS_API_URL}/api/jobs/{job_id}/webhook`,
  events: ['complete_job', 'job_failed', 'update_job_progress'],
  static: true,  // Never auto-disconnect
  headers: {
    'X-API-Key': EMPROPS_API_KEY
  }
}

Failsafe Mechanism: When webhook delivery fails, workers can notify the EmProps API directly via REST:

typescript
// POST /api/jobs/:id/complete
{
  outputs: any,
  metadata: any,
  workflow_output: string  // Required
}
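A worker might build the failsafe request as sketched below. The endpoint path and body fields come from above. Sending the `X-API-Key` header (borrowed from the webhook registration) and the helper name `buildCompleteJobRequest` are assumptions. The function returns the URL and options instead of calling `fetch` directly, which keeps it easy to test.

```typescript
// Body accepted by POST /api/jobs/:id/complete, per the shape above
interface CompleteJobBody {
  outputs: unknown;
  metadata: unknown;
  workflow_output: string; // required
}

interface FailsafeRequest {
  url: string;
  init: { method: 'POST'; headers: Record<string, string>; body: string };
}

// Builds the direct REST call; the X-API-Key header is an assumption
function buildCompleteJobRequest(
  apiUrl: string,
  apiKey: string,
  jobId: string,
  body: CompleteJobBody
): FailsafeRequest {
  if (typeof body.workflow_output !== 'string') {
    throw new Error('workflow_output is required');
  }
  return {
    url: `${apiUrl.replace(/\/+$/, '')}/api/jobs/${encodeURIComponent(jobId)}/complete`,
    init: {
      method: 'POST',
      headers: { 'Content-Type': 'application/json', 'X-API-Key': apiKey },
      body: JSON.stringify(body),
    },
  };
}

// Usage (Node 18+ global fetch):
// const { url, init } = buildCompleteJobRequest(apiUrl, apiKey, jobId, body);
// const res = await fetch(url, init);
```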

Redis Functions (Job Matching)

Location: /packages/core/src/redis-functions/

findMatchingJob Function

Purpose: Atomic job matching and claiming with capability filtering.

Lua Script Logic:

lua
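local function findMatchingJob(keys, args)
  -- Redis Functions receive (keys, args); args[1] is the JSON-encoded
  -- worker capabilities passed to FCALL with numkeys = 0
  local workerCapabilities = cjson.decode(args[1])
  -- 1. Get pending jobs from queue (ZRANGE)
  -- 2. Filter by requirements:
  --    - Connector type match
  --    - Model availability
  --    - Custom node availability
  -- 3. Atomically claim first match (ZADD to claimed set)
  -- 4. Return job data

  -- Future: Pool-aware routing
  --   - Fast Lane / Standard / Heavy pool selection
  --   - Model affinity routing
  --   - Cross-pool fallback
end

redis.register_function('findMatchingJob', findMatchingJob)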

Invocation:

typescript
const job = await redis.fcall(
  'findMatchingJob',
  0,  // numkeys
  JSON.stringify({
    connector: 'comfyui_remote',
    connector_capabilities: ['text_to_image', 'upscale'],
    models: ['sd_xl_base_1.0.safetensors'],
    custom_nodes: ['ComfyUI_essentials']
  })
);

Response:

typescript
{
  id: string
  type: string
  payload: any
  requirements: {
    models: string[]
    custom_nodes: string[]
    connector: string
  }
  workflow_context?: {
    workflow_id: string
    workflow_priority: number
    total_steps: number
    current_step: number
  }
}
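The filtering in step 2 of the Lua logic can be illustrated in TypeScript. This is an in-memory sketch only. The real function runs inside Redis so that the match and the claim happen atomically, and this version does not attempt that. The sketch also assumes `pending` is already in queue order. It ignores `connector_capabilities` because this page does not describe how those feed into matching. `canRun` and `findFirstMatch` are hypothetical names.

```typescript
interface WorkerCapabilities {
  connector: string;
  connector_capabilities: string[];
  models: string[];
  custom_nodes: string[];
}

interface PendingJob {
  id: string;
  type: string;
  requirements: { models: string[]; custom_nodes: string[]; connector: string };
}

// True when the worker satisfies every requirement listed in step 2
function canRun(worker: WorkerCapabilities, job: PendingJob): boolean {
  const req = job.requirements;
  return (
    req.connector === worker.connector &&
    req.models.every((m) => worker.models.includes(m)) &&
    req.custom_nodes.every((n) => worker.custom_nodes.includes(n))
  );
}

// First runnable job in queue order; no claiming happens here
function findFirstMatch(worker: WorkerCapabilities, pending: PendingJob[]): PendingJob | undefined {
  return pending.find((job) => canRun(worker, job));
}
```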

Redis Key Patterns

Job Storage

  • job:{jobId} (hash) - Job data
  • job:pending (sorted set) - Pending jobs (score: priority)
  • job:claimed (sorted set) - Claimed jobs (score: claim timestamp)
  • job:processing (sorted set) - Processing jobs
  • job:completed (sorted set) - Completed jobs (TTL: 1 hour)
  • job:failed (sorted set) - Failed jobs (TTL: 24 hours)

Worker Registry

  • worker:{workerId} (hash) - Worker metadata
  • worker:{workerId}:capabilities (hash) - Worker capabilities
  • worker:active (set) - Active worker IDs
  • worker:heartbeat:{workerId} (string, TTL: 60s) - Heartbeat tracking

Machine Registry

  • machine:{machineId} (hash) - Machine metadata
  • machine:{machineId}:workers (set) - Worker IDs on machine
  • machine:{machineId}:services (hash) - Service connection details
  • machine:active (set) - Active machine IDs

Webhook Storage

  • webhook:{webhookId} (hash) - Webhook configuration
  • webhook:active (set) - Active webhook IDs
  • webhook:delivery:{eventId} (hash) - Delivery attempt metadata
  • webhook:retry:{eventId} (list) - Retry queue
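Putting these patterns in one helper keeps key names consistent across workers, the API, and the webhook service. The block below is a minimal sketch: the `keys` object is a hypothetical name, and it encodes only the patterns listed above.

```typescript
// Builders for the Redis key patterns listed above
const keys = {
  job: (jobId: string) => `job:${jobId}`,
  jobPending: 'job:pending',
  jobClaimed: 'job:claimed',
  jobProcessing: 'job:processing',
  jobCompleted: 'job:completed',
  jobFailed: 'job:failed',
  worker: (workerId: string) => `worker:${workerId}`,
  workerCapabilities: (workerId: string) => `worker:${workerId}:capabilities`,
  workerActive: 'worker:active',
  workerHeartbeat: (workerId: string) => `worker:heartbeat:${workerId}`,
  machine: (machineId: string) => `machine:${machineId}`,
  machineWorkers: (machineId: string) => `machine:${machineId}:workers`,
  machineServices: (machineId: string) => `machine:${machineId}:services`,
  machineActive: 'machine:active',
  webhook: (webhookId: string) => `webhook:${webhookId}`,
  webhookActive: 'webhook:active',
  webhookDelivery: (eventId: string) => `webhook:delivery:${eventId}`,
  webhookRetry: (eventId: string) => `webhook:retry:${eventId}`,
} as const;
```

Listing the patterns side by side also exposes a collision risk: `job:{jobId}` shares its prefix with `job:pending` and the other status sets, so a job ID must never equal one of those status names. The 60-second heartbeat TTL maps to a single `SET` with an expiry, e.g. `redis.set(keys.workerHeartbeat(id), Date.now(), 'EX', 60)` in ioredis.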

Observability Integration

OpenTelemetry Trace Context

Webhook Trace Hierarchy:

typescript
// Parent: Job execution span
{
  trace_id: "abc123",
  span_id: "span-job-001"
}

// Child: Webhook delivery span
{
  trace_id: "abc123",  // Same trace
  parent_span_id: "span-job-001",
  span_id: "span-webhook-001"
}

Propagation via Webhook Payload:

typescript
{
  parent_trace_context: {
    trace_id: "abc123",
    span_id: "span-job-001"
  }
}

Redis Event Telemetry

Location: /apps/api/src/index.ts

typescript
// Publish with trace context
await redis.publish(
  `job:progress:${jobId}`,
  JSON.stringify({
    progress: 50,
    trace_context: {
      trace_id: currentTraceId,
      span_id: currentSpanId
    }
  })
);

Error Handling

Redis Connection Failures

typescript
// Auto-reconnect with retry logic
redis.on('error', (error) => {
  logger.warn('Redis error (auto-reconnecting)', { error });
});

redis.on('reconnecting', (ms) => {
  logger.info('Redis reconnecting', { delay_ms: ms });
});

Webhook Delivery Failures

typescript
{
  id: "attempt-123",
  webhook_id: "hook-abc",
  event_id: "event-xyz",
  attempt_number: 2,
  success: false,
  response_status: 500,
  error_message: "Connection timeout",
  next_retry_at: 1234567890
}

Pub/Sub Message Validation

typescript
try {
  const message = JSON.parse(rawMessage);
  if (!message.timestamp || !message.job_id) {
    throw new Error('Invalid message format');
  }
  processMessage(message);
} catch (error) {
  logger.error('Invalid pub/sub message', { error, rawMessage });
}
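A stricter, typed check can be written for the `job:progress:{jobId}` channel, based on the message format at the top of this page. Progress payloads carry no `job_id`. The job ID travels in the channel name instead, so the sketch extracts it from there, and slicing off the prefix also tolerates IDs that contain colons. `parseProgressMessage` and `jobIdFromChannel` are hypothetical helpers. Treating a `progress` value outside 0-100 as invalid is an assumption drawn from the `// 0-100` comment.

```typescript
// Shape of job:progress:{jobId} messages, from the message format above
interface JobProgressMessage {
  progress: number; // 0-100
  message?: string;
  timestamp: number;
  worker_id: string;
  machine_id: string;
}

// Runtime check for untrusted pub/sub payloads; returns null instead of throwing
function parseProgressMessage(raw: string): JobProgressMessage | null {
  let value: unknown;
  try {
    value = JSON.parse(raw);
  } catch {
    return null;
  }
  if (typeof value !== 'object' || value === null) return null;
  const m = value as Record<string, unknown>;
  const ok =
    typeof m.progress === 'number' && m.progress >= 0 && m.progress <= 100 &&
    typeof m.timestamp === 'number' &&
    typeof m.worker_id === 'string' &&
    typeof m.machine_id === 'string' &&
    (m.message === undefined || typeof m.message === 'string');
  return ok ? (m as unknown as JobProgressMessage) : null;
}

// The job ID lives in the channel name, not the payload
function jobIdFromChannel(channel: string): string | null {
  const prefix = 'job:progress:';
  return channel.startsWith(prefix) ? channel.slice(prefix.length) : null;
}
```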

File Locations

  • Webhook Service: /packages/core/src/services/webhook-notification-service.ts
  • Webhook Storage: /packages/core/src/services/webhook-redis-storage.ts
  • Event Broadcaster: /packages/core/src/services/event-broadcaster.ts
  • Redis Functions: /packages/core/src/redis-functions/
  • Progress Streaming: /apps/api/src/lightweight-api-server.ts


Released under the MIT License.