Observability Reference
The EMP Job Queue implements observability in three layers: OpenTelemetry tracing, structured logging, and real-time monitoring.
Observability Stack
OpenTelemetry (OTEL)
- Traces: Distributed request tracing across services
- Spans: Individual operation timing and context
- Context Propagation: Trace IDs flow through WebSocket, Redis, HTTP
Collector: Fluent Bit → Fluentd → Backend (Grafana/Jaeger)
Structured Logging
- Format: JSON with correlation IDs
- Levels: debug, info, warn, error
- Outputs: stdout (container logs), file (local dev)
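Each log entry is one JSON object carrying correlation IDs, such as the workflow's `trace_id` and a `job_id`. Entries from different services can then be joined to the same trace. The sketch below shows a minimal formatter of that kind. It assumes the field names `timestamp`, `level` and `message` plus arbitrary context. `formatLogLine` and `log` are hypothetical helpers, not the API of `/packages/core/src/utils/logger.ts`.

```typescript
// Hypothetical structured-log helper (not the project's logger API).
type LogLevel = 'debug' | 'info' | 'warn' | 'error';

// Numeric ranks let entries below the configured minimum level be dropped.
const LEVEL_RANK: Record<LogLevel, number> = { debug: 10, info: 20, warn: 30, error: 40 };

interface LogContext {
  trace_id?: string; // correlation ID shared by every entry in one workflow
  job_id?: string;
  [key: string]: unknown;
}

// Builds a single JSON line, or returns null if the entry is filtered out.
// Core fields are written last so context keys cannot overwrite them.
function formatLogLine(
  minLevel: LogLevel,
  level: LogLevel,
  message: string,
  context: LogContext = {},
  now: Date = new Date(),
): string | null {
  if (LEVEL_RANK[level] < LEVEL_RANK[minLevel]) return null;
  return JSON.stringify({ ...context, timestamp: now.toISOString(), level, message });
}

// Writes to stdout so the container log pipeline can collect the entry.
function log(level: LogLevel, message: string, context?: LogContext): void {
  const line = formatLogLine('info', level, message, context);
  if (line !== null) console.log(line);
}
```

Emitting one self-contained JSON object per line keeps each entry parseable by a log collector without multi-line handling.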
Real-Time Monitoring
- WebSocket Events: Live system state
- Monitor UI: Visual dashboard (/apps/monitor)
- Metrics: Job throughput, worker utilization, queue depth
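The three metrics listed above can be derived from a snapshot of queue and worker state. The sketch below is illustrative only. The `QueueSnapshot` shape, the 60-second sliding window and `computeMetrics` are assumptions, not the Monitor UI's actual data model or event payloads.

```typescript
// Hypothetical snapshot shapes; real monitor events may be structured differently.
interface WorkerSnapshot {
  workerId: string;
  busy: boolean;
}

interface QueueSnapshot {
  pendingJobs: number;           // jobs waiting in the queue
  completedTimestamps: number[]; // job completion times, ms since epoch
  workers: WorkerSnapshot[];
}

interface QueueMetrics {
  queueDepth: number;
  throughputPerMinute: number;
  workerUtilization: number; // fraction of workers busy, 0..1
}

function computeMetrics(s: QueueSnapshot, nowMs: number, windowMs = 60000): QueueMetrics {
  // Count only completions inside the window (nowMs - windowMs, nowMs].
  const recent = s.completedTimestamps.filter((t) => t > nowMs - windowMs && t <= nowMs).length;
  const busy = s.workers.filter((w) => w.busy).length;
  return {
    queueDepth: s.pendingJobs,
    // Scale to a per-minute rate so the window size can change independently.
    throughputPerMinute: (recent * 60000) / windowMs,
    // Avoid dividing by zero when no workers are connected.
    workerUtilization: s.workers.length === 0 ? 0 : busy / s.workers.length,
  };
}
```

A sliding window keeps throughput responsive to recent load instead of averaging over the whole uptime.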
Telemetry Architecture
Location: /apps/api/src/index.ts, /packages/core/src/telemetry/
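The workflow client's `executeStepWithTelemetry` call, used in the example that follows, presumably wraps each step in a span. A common OpenTelemetry pattern for that is the following sequence:

- Open a span.
- Let the caller add attributes.
- Set the status.
- Always end the span.

The dependency-free sketch below assumes nothing about the actual code in `/packages/core/src/telemetry/`. `SimpleSpan`, `runInSpan` and the `onEnd` hook are hypothetical stand-ins for OpenTelemetry's span and span-processor types.

```typescript
// Minimal stand-ins for OpenTelemetry types so this sketch runs without dependencies.
// The numeric values mirror SpanStatusCode in @opentelemetry/api (UNSET=0, OK=1, ERROR=2).
const StatusCode = { UNSET: 0, OK: 1, ERROR: 2 } as const;
type StatusCodeValue = (typeof StatusCode)[keyof typeof StatusCode];

interface SpanStatus {
  code: StatusCodeValue;
  message?: string;
}

// Hypothetical in-memory span: records attributes, status, and whether it was ended.
class SimpleSpan {
  readonly name: string;
  readonly attributes: Record<string, string | number> = {};
  status: SpanStatus = { code: StatusCode.UNSET };
  ended = false;

  constructor(name: string) {
    this.name = name;
  }

  setAttribute(key: string, value: string | number): void {
    this.attributes[key] = value;
  }

  setStatus(status: SpanStatus): void {
    this.status = status;
  }

  end(): void {
    this.ended = true;
  }
}

// Runs fn inside a span. On success the status defaults to OK. On failure it
// records ERROR and rethrows. The span is always ended and passed to onEnd (an exporter hook).
async function runInSpan<T>(
  name: string,
  fn: (span: SimpleSpan) => Promise<T>,
  onEnd: (span: SimpleSpan) => void = () => {},
): Promise<T> {
  const span = new SimpleSpan(name);
  try {
    const result = await fn(span);
    // Keep a status the callback set explicitly; otherwise mark success.
    if (span.status.code === StatusCode.UNSET) {
      span.setStatus({ code: StatusCode.OK });
    }
    return result;
  } catch (err) {
    span.setStatus({
      code: StatusCode.ERROR,
      message: err instanceof Error ? err.message : String(err),
    });
    throw err;
  } finally {
    span.end();
    onEnd(span);
  }
}
```

Ending the span in `finally` guarantees that failed steps still produce a timed span, so errors stay visible in the trace.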
Workflow Telemetry Client
Usage:
```typescript
import { SpanStatusCode } from '@opentelemetry/api';
import { WorkflowTelemetryClient } from '@emp/core';

// workflowId, totalSteps, stepNumber, jobId, job and processStep
// are supplied by the surrounding workflow code.

// Initialize once per workflow
const telemetry = new WorkflowTelemetryClient(
  workflowId, // Used as trace_id
  totalSteps
);

// Track step execution inside a span
await telemetry.executeStepWithTelemetry(
  stepNumber,
  `step-${stepNumber}-execution`,
  async (span) => {
    span.setAttribute('job_id', jobId);
    span.setAttribute('job_type', 'comfyui');
    // Execute step
    const result = await processStep(job);
    span.setStatus({ code: SpanStatusCode.OK });
    return result;
  }
);

// Finalize workflow with summary counts
await telemetry.finalizeWorkflow({
  totalJobsSubmitted: 3,
  completedJobs: 3,
  failedJobs: 0
});
```
Trace Context Propagation
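Trace context is propagated by carrying a `trace_context` object inside each message. The receiving service can then attach its own spans to the same trace. The sketch below assumes the field names used by the WebSocket message format in this section (`trace_id`, `span_id`, `trace_flags`). The helpers `injectTraceContext`, `extractTraceContext` and `childContext` are hypothetical and are not exports of `@emp/core`.

```typescript
// Field names follow the WebSocket message format documented in this section.
interface TraceContext {
  trace_id: string;
  span_id: string;
  trace_flags: number; // 1 = sampled
}

// Sender side: attach the current context before the message is serialized.
function injectTraceContext<M extends { type: string }>(
  message: M,
  ctx: TraceContext,
): M & { trace_context: TraceContext } {
  return { ...message, trace_context: { ...ctx } };
}

// Receiver side: read the context back. Returns undefined when it is absent or
// malformed, in which case the receiver would start a new trace.
function extractTraceContext(raw: string): TraceContext | undefined {
  const parsed: unknown = JSON.parse(raw);
  if (typeof parsed !== 'object' || parsed === null) return undefined;
  const ctx = (parsed as { trace_context?: unknown }).trace_context;
  if (typeof ctx !== 'object' || ctx === null) return undefined;
  const { trace_id, span_id, trace_flags } = ctx as Record<string, unknown>;
  if (typeof trace_id !== 'string' || typeof span_id !== 'string' || typeof trace_flags !== 'number') {
    return undefined;
  }
  return { trace_id, span_id, trace_flags };
}

// A downstream hop keeps the trace_id and flags but records its own span_id.
function childContext(parent: TraceContext, spanId: string): TraceContext {
  return { ...parent, span_id: spanId };
}
```

Keeping the same `trace_id` across WebSocket, Redis and HTTP hops lets the backend stitch the spans into one workflow trace.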
WebSocket Messages:
```typescript
{
  type: "job_submitted",
  job_id: "step-1",
  trace_context: {
    trace_id: "workflow-abc", // workflow ID reused as the trace ID
    span_id: "span-submit-1",
    trace_flags: 1 // 1 = sampled
  }
}
```
File Locations
- Telemetry Client: /packages/core/src/telemetry/workflow-telemetry-client.ts
- OTEL Setup: /apps/api/src/index.ts
- Logger: /packages/core/src/utils/logger.ts
- Fluent Bit Config: /apps/api/conf/fluent-bit.conf
- Entrypoint Scripts: /apps/api/telemetry-entrypoint-functions.sh
See Also
- Architecture Overview - System design
- WebSocket API - Real-time events with trace context
- Internal APIs - Redis and webhook telemetry
