
Observability Reference

The EMP Job Queue is instrumented with OpenTelemetry tracing, structured JSON logging, and real-time monitoring over WebSocket events.

Observability Stack

OpenTelemetry (OTEL)

  • Traces: Distributed request tracing across services
  • Spans: Individual operation timing and context
  • Context Propagation: Trace IDs flow through WebSocket, Redis, HTTP

Collector: Fluent Bit → Fluentd → Backend (Grafana/Jaeger)

Structured Logging

  • Format: JSON with correlation IDs
  • Levels: debug, info, warn, error
  • Outputs: stdout (container logs), file (local dev)
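The format and levels listed above can be illustrated with a minimal sketch. This is not the logger in /packages/core/src/utils/logger.ts; `createLogger`, the `correlation_id` field name, and the pluggable `sink` are assumptions made for illustration only.

```typescript
// Hypothetical sketch of a JSON logger with correlation IDs and level filtering.
type LogLevel = 'debug' | 'info' | 'warn' | 'error';

// Numeric ranks let a minimum level filter out noisier entries.
const LEVEL_RANK: Record<LogLevel, number> = { debug: 10, info: 20, warn: 30, error: 40 };

type Fields = Record<string, unknown>;
type Sink = (line: string) => void;

function createLogger(
  correlationId: string,
  minLevel: LogLevel = 'info',
  sink: Sink = (line) => console.log(line), // stdout by default
) {
  const log = (level: LogLevel, message: string, fields: Fields = {}): void => {
    if (LEVEL_RANK[level] < LEVEL_RANK[minLevel]) return;
    // Caller fields go first so they cannot overwrite the core fields below.
    const entry = {
      ...fields,
      timestamp: new Date().toISOString(),
      level,
      message,
      correlation_id: correlationId,
    };
    sink(JSON.stringify(entry)); // one JSON object per line
  };

  return {
    debug: (message: string, fields?: Fields) => log('debug', message, fields),
    info: (message: string, fields?: Fields) => log('info', message, fields),
    warn: (message: string, fields?: Fields) => log('warn', message, fields),
    error: (message: string, fields?: Fields) => log('error', message, fields),
  };
}
```

A call such as `createLogger('workflow-abc').info('job accepted', { job_id: 'step-1' })` would emit a single JSON line carrying the correlation ID, so log lines can be joined with the trace for the same workflow.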

Real-Time Monitoring

  • WebSocket Events: Live system state
  • Monitor UI: Visual dashboard (/apps/monitor)
  • Metrics: Job throughput, worker utilization, queue depth
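The page does not define how the three metrics above are computed. The sketch below shows one plausible derivation from a point-in-time snapshot; the `QueueSnapshot` shape, the `computeMetrics` function, and the 60-second throughput window are all assumptions, not the Monitor UI's actual implementation.

```typescript
// Hypothetical snapshot shape; the real monitor events may differ.
interface WorkerState {
  id: string;
  busy: boolean;
}

interface QueueSnapshot {
  pendingJobs: number;            // jobs waiting to be claimed
  workers: WorkerState[];         // currently connected workers
  completedAtMs: number[];        // completion timestamps (epoch milliseconds)
}

interface QueueMetrics {
  queueDepth: number;             // number of pending jobs
  workerUtilization: number;      // busy workers / total workers, in [0, 1]
  throughputPerMinute: number;    // completions in the window, scaled to one minute
}

function computeMetrics(
  snapshot: QueueSnapshot,
  nowMs: number,
  windowMs: number = 60_000,
): QueueMetrics {
  const busy = snapshot.workers.filter((w) => w.busy).length;
  const total = snapshot.workers.length;

  // Count only completions that fall inside the trailing window.
  const recent = snapshot.completedAtMs.filter(
    (t) => t <= nowMs && nowMs - t < windowMs,
  ).length;

  return {
    queueDepth: snapshot.pendingJobs,
    workerUtilization: total === 0 ? 0 : busy / total,
    throughputPerMinute: recent / (windowMs / 60_000),
  };
}
```

Reporting utilization as 0 when no workers are connected avoids a division by zero; a dashboard might prefer to show that case as "no workers" instead.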

Telemetry Architecture

Location: /apps/api/src/index.ts, /packages/core/src/telemetry/

Workflow Telemetry Client

Usage:

typescript
import { SpanStatusCode } from '@opentelemetry/api';
import { WorkflowTelemetryClient } from '@emp/core';

// Initialize for workflow
const telemetry = new WorkflowTelemetryClient(
  workflowId,  // Used as trace_id
  totalSteps
);

// Track step execution
await telemetry.executeStepWithTelemetry(
  stepNumber,
  `step-${stepNumber}-execution`,
  async (span) => {
    span.setAttribute('job_id', jobId);
    span.setAttribute('job_type', 'comfyui');
    
    // Execute step
    const result = await processStep(job);
    
    span.setStatus({ code: SpanStatusCode.OK });
    return result;
  }
);

// Finalize workflow
await telemetry.finalizeWorkflow({
  totalJobsSubmitted: 3,
  completedJobs: 3,
  failedJobs: 0
});

Trace Context Propagation

WebSocket Messages:

typescript
{
  type: "job_submitted",
  job_id: "step-1",
  trace_context: {
    trace_id: "workflow-abc",
    span_id: "span-submit-1",
    trace_flags: 1
  }
}
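To make the propagation step concrete, the sketch below attaches a `trace_context` to an outgoing message, validates it on receipt, and derives a child context that keeps the same `trace_id`. The field names follow the message shown above; the helper names (`withTraceContext`, `parseMessage`, `childContext`) are hypothetical and do not come from the EMP codebase.

```typescript
interface TraceContext {
  trace_id: string;
  span_id: string;
  trace_flags: number;
}

interface QueueMessage {
  type: string;
  job_id: string;
  trace_context?: TraceContext;
}

// Sender side: copy the current trace context onto the outgoing message.
function withTraceContext(message: QueueMessage, ctx: TraceContext): QueueMessage {
  return { ...message, trace_context: { ...ctx } };
}

function isTraceContext(value: unknown): value is TraceContext {
  if (typeof value !== 'object' || value === null) return false;
  const { trace_id, span_id, trace_flags } = value as Record<string, unknown>;
  return (
    typeof trace_id === 'string' &&
    typeof span_id === 'string' &&
    typeof trace_flags === 'number'
  );
}

// Receiver side: parse the raw frame and reject malformed messages.
function parseMessage(raw: string): QueueMessage {
  const data: unknown = JSON.parse(raw);
  if (typeof data !== 'object' || data === null) {
    throw new Error('message must be a JSON object');
  }
  const { type, job_id, trace_context } = data as Record<string, unknown>;
  if (typeof type !== 'string' || typeof job_id !== 'string') {
    throw new Error('message is missing type or job_id');
  }
  if (trace_context === undefined) return { type, job_id };
  if (!isTraceContext(trace_context)) throw new Error('malformed trace_context');
  return { type, job_id, trace_context };
}

// Receiver side: a new span stays in the same trace but gets its own span_id.
function childContext(message: QueueMessage, newSpanId: string): TraceContext | undefined {
  const parent = message.trace_context;
  if (!parent) return undefined;
  return { trace_id: parent.trace_id, span_id: newSpanId, trace_flags: parent.trace_flags };
}
```

The same pattern would apply to Redis payloads and HTTP requests: the context travels with the message, and each hop reuses the `trace_id` while creating its own span.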

File Locations

  • Telemetry Client: /packages/core/src/telemetry/workflow-telemetry-client.ts
  • OTEL Setup: /apps/api/src/index.ts
  • Logger: /packages/core/src/utils/logger.ts
  • Fluent Bit Config: /apps/api/conf/fluent-bit.conf
  • Entrypoint Scripts: /apps/api/telemetry-entrypoint-functions.sh

Released under the MIT License.