ComfyUI Logging Architecture - Three-Tier Implementation Plan
Overview
This document describes a three-tier logging architecture for ComfyUI that provides comprehensive observability while keeping log noise low and requiring minimal changes to the ComfyUI core.
Architecture Tiers
- OpenTelemetry (Dash0): ALL logs → Comprehensive monitoring and historical analysis
- Redis Pub/Sub: Progress patterns → Real-time UI updates (ephemeral)
- Redis Streams: Failure patterns → Worker detection and action (persistent)
ComfyUI Logs
↓
[LogInterceptor] ← Existing ComfyUI logger
↓
[LogStreamingWrapper] ← Our transparent wrapper
↓
├─→ [OTEL] → ALL logs → Dash0
├─→ [Pub/Sub] → Progress patterns → machine:{id}:progress
└─→ [Stream] → Failure patterns → job:events:{id}
Design Principles
- Minimal Invasiveness: No changes to logger.py - all routing logic lives in separate modules
- Pattern-Based Routing: Regex patterns classify logs and route them appropriately
- Transparent Wrapping: LogStreamingWrapper intercepts writes without modifying existing behavior (see the sketch below)
- Consistent Architecture: Python OTEL integration mirrors the Node.js @emp/telemetry package
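The transparent-wrapping principle can be illustrated in a few lines: a wrapper object receives every write(), hands the text to a routing callback, and then forwards the call unchanged so the wrapped stream never notices. The sketch below is illustrative only (PassThroughWrapper is a made-up name; the real implementation is LogStreamingWrapper in Phase 2):
python
import sys

class PassThroughWrapper:
    """Illustration of transparent wrapping: inspect each write, then forward it."""
    def __init__(self, wrapped, on_line):
        self._wrapped = wrapped   # original stream (or LogInterceptor)
        self._on_line = on_line   # side-channel callback, e.g. pattern routing
    def write(self, data):
        if data and not data.isspace():
            self._on_line(data)               # route a copy of the text
        return self._wrapped.write(data)      # pass through unchanged
    def __getattr__(self, name):
        # flush(), isatty(), etc. are delegated to the wrapped object
        return getattr(self._wrapped, name)

seen = []
sys.stdout = PassThroughWrapper(sys.stdout, seen.append)
print("Processing 15/100")            # still reaches the terminal as before
sys.stdout = sys.stdout._wrapped      # unwrap again
print("captured:", seen)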
Phase 1: OpenTelemetry Integration
Goal
Send ALL logs to Dash0/OpenTelemetry for comprehensive monitoring and historical analysis.
Implementation
File: packages/comfyui/app/otel_integration.py (NEW)
python
"""
OpenTelemetry integration for ComfyUI
Mirrors Node.js @emp/telemetry architecture for consistent observability
"""
import os
import logging
from typing import Optional
try:
from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler
from opentelemetry.sdk._logs.export import BatchLogRecordProcessor
from opentelemetry.exporter.otlp.proto.grpc._log_exporter import OTLPLogExporter
from opentelemetry._logs import set_logger_provider
from opentelemetry.sdk.resources import Resource
from opentelemetry.instrumentation.logging import LoggingInstrumentor
OTEL_AVAILABLE = True
except ImportError:
OTEL_AVAILABLE = False
logging.warning("OpenTelemetry not available - OTEL logging disabled")
def init_otel_logging() -> bool:
"""
Initialize OpenTelemetry logging for ComfyUI
Mirrors @emp/telemetry configuration:
- Uses same endpoints (DASH0_ENDPOINT or OTEL_COLLECTOR_ENDPOINT)
- Uses same authentication (DASH0_AUTH_TOKEN)
- Uses same resource attributes (service.name, machine.id, job.id)
Environment Variables:
TELEMETRY_TARGET: 'dash0', 'collector', 'remote-collector', or 'disabled'
DASH0_ENDPOINT: Dash0 OTLP endpoint (required if TELEMETRY_TARGET=dash0)
DASH0_AUTH_TOKEN: Dash0 authentication token (required if TELEMETRY_TARGET=dash0)
DASH0_DATASET: Dash0 dataset name (required if TELEMETRY_TARGET=dash0)
OTEL_COLLECTOR_ENDPOINT: Collector endpoint (required if TELEMETRY_TARGET=collector)
MACHINE_ID: Machine identifier (optional, added to resource attributes)
COMFYUI_JOB_ID: Job identifier (optional, added to resource attributes)
Returns:
bool: True if initialization succeeded, False otherwise
"""
if not OTEL_AVAILABLE:
logging.warning("[OTEL] OpenTelemetry packages not installed - skipping")
return False
# Check if enabled
telemetry_target = os.getenv('TELEMETRY_TARGET')
if not telemetry_target or telemetry_target == 'disabled':
logging.info("[OTEL] Telemetry disabled (TELEMETRY_TARGET not set or 'disabled')")
return False
try:
# Determine endpoint based on TELEMETRY_TARGET (mirrors Node.js logic)
if telemetry_target == 'dash0':
endpoint = os.getenv('DASH0_ENDPOINT')
auth_token = os.getenv('DASH0_AUTH_TOKEN')
dataset = os.getenv('DASH0_DATASET')
if not endpoint or not auth_token or not dataset:
raise ValueError("TELEMETRY_TARGET=dash0 requires DASH0_ENDPOINT, DASH0_AUTH_TOKEN, and DASH0_DATASET")
headers = {
'Authorization': f'Bearer {auth_token}',
'dash0-dataset': dataset
}
elif telemetry_target in ['collector', 'remote-collector']:
endpoint = os.getenv('OTEL_COLLECTOR_ENDPOINT')
if not endpoint:
raise ValueError(f"TELEMETRY_TARGET={telemetry_target} requires OTEL_COLLECTOR_ENDPOINT")
headers = {}
else:
raise ValueError(f"Unknown TELEMETRY_TARGET: {telemetry_target}")
# Create resource (mirrors @emp/telemetry resource attributes)
resource_attrs = {
'service.name': 'comfyui',
'service.namespace': os.getenv('OTEL_SERVICE_NAMESPACE', 'emerge'),
'service.version': '1.0.0',
'deployment.environment': os.getenv('NODE_ENV', 'development'),
}
# Add machine.id and job.id if available
machine_id = os.getenv('MACHINE_ID')
if machine_id:
resource_attrs['machine.id'] = machine_id
job_id = os.getenv('COMFYUI_JOB_ID')
if job_id:
resource_attrs['job.id'] = job_id
resource = Resource.create(resource_attrs)
# Create OTLP exporter
exporter = OTLPLogExporter(
endpoint=endpoint,
headers=headers,
)
# Create logger provider with batch processor
logger_provider = LoggerProvider(resource=resource)
logger_provider.add_log_record_processor(
BatchLogRecordProcessor(
exporter,
max_queue_size=2048,
max_export_batch_size=512,
schedule_delay_millis=5000,
)
)
# Set as global logger provider
set_logger_provider(logger_provider)
# Attach a LoggingHandler so records from the standard logging module are exported
handler = LoggingHandler(level=logging.NOTSET, logger_provider=logger_provider)
logging.getLogger().addHandler(handler)
# Instrument Python logging to inject trace context (like WinstonInstrumentation for Node.js)
LoggingInstrumentor().instrument(set_logging_format=False)
logging.info(f"[OTEL] ✅ OpenTelemetry logging initialized (endpoint: {endpoint})")
return True
except Exception as e:
logging.error(f"[OTEL] Failed to initialize: {e}")
return False
Integration: Update packages/comfyui/main.py
python
# Add after imports, before other initialization
# Optional OpenTelemetry logging (Phase 1 - sends ALL logs to Dash0)
if os.getenv('ENABLE_OTEL_LOGGING') == 'true':
try:
from app.otel_integration import init_otel_logging
otel_success = init_otel_logging()
if otel_success:
logging.info("✅ OpenTelemetry logging initialized")
except Exception as e:
logging.warning(f"Failed to initialize OTEL logging: {e}")
Environment Variables (Add to PM2 config)
javascript
// apps/machine/src/config/enhanced-pm2-ecosystem-generator.js
// Add to comfyui-gpu* services
ENABLE_OTEL_LOGGING: 'true',
TELEMETRY_TARGET: 'dash0', // or 'collector', 'disabled'
// DASH0_* variables already present in .env.secrets.local
Testing Phase 1
bash
# 1. Start machine with OTEL enabled
ENABLE_OTEL_LOGGING=true pnpm machines:basic:local:up:build
# 2. Check ComfyUI logs for initialization
docker logs basic-machine-local | grep OTEL
# Expected output:
# [OTEL] ✅ OpenTelemetry logging initialized (endpoint: https://...)
# 3. Verify logs appear in Dash0
# - Navigate to Dash0 dashboard
# - Filter by service.name: comfyui
# - Should see ALL ComfyUI logs with machine.id and job.id attributes
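If logs are not arriving, a quick way to isolate the Python side is a small smoke test run inside the ComfyUI container or virtualenv. This is a hypothetical check (the logger name is arbitrary); it initializes the exporter defined above and emits a single record that should reach Dash0 within a few seconds:
python
# Hypothetical smoke test - run from the ComfyUI app directory so app.otel_integration imports
import logging
from app.otel_integration import init_otel_logging

logging.basicConfig(level=logging.INFO)
if init_otel_logging():
    logging.getLogger("comfyui.smoke-test").error("OTEL smoke test - safe to ignore")
else:
    print("OTEL init failed - check TELEMETRY_TARGET and DASH0_* variables")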
Phase 2: Pattern-Based Redis Routing
Goal
Route specific log patterns to Redis for real-time monitoring and worker action (the two write paths are sketched after this list):
- Progress patterns → Pub/Sub → Real-time UI updates
- Failure patterns → Streams → Worker detection and remediation
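The two destinations behave differently on purpose: a Pub/Sub message is delivered only to subscribers connected at that instant and is then gone, while a stream entry persists until trimmed, so a worker can pick it up later. A minimal redis-py sketch of the two write paths (localhost Redis and placeholder machine/job ids, matching the key names used below):
python
import json
import redis

r = redis.Redis(host="localhost", port=6379, decode_responses=True)

# Ephemeral: only subscribers connected right now receive this
r.publish("machine:local-machine:progress",
          json.dumps({"message": "Processing 15/100", "progress": {"current": 15, "total": 100}}))

# Persistent: appended to the stream, readable later, trimmed to ~1000 entries
r.xadd("job:events:job-123",
       {"event_type": "log", "level": "ERROR", "message": "CUDA error: out of memory"},
       maxlen=1000)

# Anything still in the stream can be replayed by a consumer
for entry_id, fields in r.xrange("job:events:job-123"):
    print(entry_id, fields)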
Implementation
File: packages/comfyui/app/redis_log_stream.py (REFACTOR)
python
"""
Pattern-based Redis log streaming for ComfyUI
Routes logs to Pub/Sub (progress) and Streams (failures) based on regex patterns
"""
import os
import re
import json
import logging
import sys
from typing import Optional, Dict, Any, List
from datetime import datetime
try:
import redis
REDIS_AVAILABLE = True
except ImportError:
REDIS_AVAILABLE = False
logging.warning("Redis not available - log streaming disabled")
class LogPatternMatcher:
"""
Classifies log messages using regex patterns
Progress patterns: ComfyUI describing its own progress
Failure patterns: Errors, exceptions, critical failures
"""
PROGRESS_PATTERNS = [
r'Processing.*?(\d+)/(\d+)', # "Processing 15/100"
r'Loading.*?model', # "Loading model..."
r'Step\s+\d+', # "Step 5"
r'(\d+)%', # "45%"
r'FETCH.*?(\d+)/(\d+)', # "FETCH ComfyRegistry Data: 75/104"
r'Downloading.*?model', # "Downloading model..."
r'Generating.*?(\d+)/(\d+)', # "Generating frame 5/100"
]
FAILURE_PATTERNS = [
r'(?i)error[:\s]', # "Error:" or "ERROR:"
r'(?i)exception', # "Exception"
r'(?i)traceback', # Traceback
r'(?i)cuda\s+error', # "CUDA error"
r'(?i)out\s+of\s+memory', # "Out of memory"
r'(?i)fatal', # "Fatal"
r'(?i)failed\s+to', # "Failed to..."
r'(?i)cannot\s+', # "Cannot..."
r'(?i)unable\s+to', # "Unable to..."
]
def __init__(self):
self.progress_compiled = [re.compile(p) for p in self.PROGRESS_PATTERNS]
self.failure_compiled = [re.compile(p) for p in self.FAILURE_PATTERNS]
def is_progress(self, message: str) -> bool:
"""Check if message matches progress patterns"""
return any(pattern.search(message) for pattern in self.progress_compiled)
def is_failure(self, message: str) -> bool:
"""Check if message matches failure patterns"""
return any(pattern.search(message) for pattern in self.failure_compiled)
def extract_progress(self, message: str) -> Optional[Dict[str, Any]]:
"""
Extract structured progress data from message
Returns: {"current": 15, "total": 100, "message": "..."} or None
"""
for pattern in self.progress_compiled:
match = pattern.search(message)
if match and len(match.groups()) >= 2:
try:
return {
"current": int(match.group(1)),
"total": int(match.group(2)),
"message": message.strip()
}
except (ValueError, IndexError):
continue
# If no numerical progress found, just return message
if self.is_progress(message):
return {"message": message.strip()}
return None
class RedisLogStreamer:
"""
Routes logs to Redis based on patterns:
- Progress → Pub/Sub (machine:{machine_id}:progress)
- Failures → Stream (job:events:{job_id} with event_type: 'log')
"""
def __init__(self, redis_host: str, redis_port: int, machine_id: str, job_id: Optional[str] = None):
self.redis_client = redis.Redis(host=redis_host, port=redis_port, decode_responses=True)
self.machine_id = machine_id
self.job_id = job_id
self.pattern_matcher = LogPatternMatcher()
logging.info(f"[RedisLogStreamer] Initialized (machine={machine_id}, job={job_id})")
def handle_log_line(self, message: str):
"""
Process a single log line and route to appropriate destination(s)
Routing logic:
- Progress patterns → Pub/Sub
- Failure patterns → Stream (only if job_id is set)
"""
if not message or message.isspace():
return
# Check for progress patterns → Pub/Sub
if self.pattern_matcher.is_progress(message):
progress_data = self.pattern_matcher.extract_progress(message)
self._publish_progress(message, progress_data)
# Check for failure patterns → Stream (if job_id set)
if self.pattern_matcher.is_failure(message):
if self.job_id:
self._publish_failure(message)
else:
logging.debug(f"[RedisLogStreamer] Failure pattern detected but no job_id set: {message[:100]}")
def _publish_progress(self, message: str, progress_data: Optional[Dict[str, Any]]):
"""
Publish progress to Redis Pub/Sub
Channel: machine:{machine_id}:progress
"""
try:
channel = f"machine:{self.machine_id}:progress"
payload = {
"timestamp": datetime.now().isoformat(),
"machine_id": self.machine_id,
"message": message.strip(),
"progress": progress_data
}
self.redis_client.publish(channel, json.dumps(payload))
logging.debug(f"[RedisLogStreamer] Published progress to {channel}")
except Exception as e:
logging.error(f"[RedisLogStreamer] Failed to publish progress: {e}")
def _publish_failure(self, message: str):
"""
Publish failure to Redis Stream
Stream: job:events:{job_id}
Event type: 'log'
"""
try:
stream_key = f"job:events:{self.job_id}"
event_data = {
"event_type": "log",
"timestamp": datetime.now().isoformat(),
"machine_id": self.machine_id,
"job_id": self.job_id,
"level": "ERROR",
"message": message.strip()
}
self.redis_client.xadd(stream_key, event_data, maxlen=1000)
logging.warning(f"[RedisLogStreamer] Published failure to stream {stream_key}: {message[:100]}")
except Exception as e:
logging.error(f"[RedisLogStreamer] Failed to publish failure: {e}")
class LogStreamingWrapper:
"""
Wraps LogInterceptor to intercept writes and route to Redis
This is transparent to logger.py - no modifications needed.
All write() calls pass through after being processed.
"""
def __init__(self, original_interceptor, streamer: RedisLogStreamer):
self.original = original_interceptor
self.streamer = streamer
logging.info("[LogStreamingWrapper] Initialized - intercepting log writes")
def write(self, data):
"""Intercept write, route to Redis, then pass through"""
if data and not data.isspace():
self.streamer.handle_log_line(data)
# Pass through to original LogInterceptor
return self.original.write(data)
def __getattr__(self, name):
"""Forward all other methods/attributes to original"""
return getattr(self.original, name)
def init_redis_log_stream() -> bool:
"""
Initialize Redis log streaming with pattern-based routing
Environment Variables:
REDIS_HOST: Redis server host
REDIS_PORT: Redis server port
MACHINE_ID: Machine identifier (required)
COMFYUI_JOB_ID: Job identifier (optional, enables failure streaming)
Returns:
bool: True if initialization succeeded, False otherwise
"""
if not REDIS_AVAILABLE:
logging.warning("[RedisLogStream] Redis not available - skipping")
return False
try:
redis_host = os.getenv('REDIS_HOST', 'localhost')
redis_port = int(os.getenv('REDIS_PORT', 6379))
machine_id = os.getenv('MACHINE_ID')
job_id = os.getenv('COMFYUI_JOB_ID')
if not machine_id:
raise ValueError("MACHINE_ID environment variable required for Redis log streaming")
# Create streamer
streamer = RedisLogStreamer(redis_host, redis_port, machine_id, job_id)
# Wrap existing stdout/stderr interceptors
from app.logger import stdout_interceptor, stderr_interceptor
if stdout_interceptor:
sys.stdout = LogStreamingWrapper(stdout_interceptor, streamer)
logging.info("[RedisLogStream] ✅ Wrapped stdout interceptor")
if stderr_interceptor:
sys.stderr = LogStreamingWrapper(stderr_interceptor, streamer)
logging.info("[RedisLogStream] ✅ Wrapped stderr interceptor")
logging.info(f"[RedisLogStream] ✅ Initialized (progress → Pub/Sub, failures → Stream)")
return True
except Exception as e:
logging.error(f"[RedisLogStream] Failed to initialize: {e}")
return False
Integration: Update packages/comfyui/main.py
python
# Add after OTEL initialization
# Optional Redis log streaming (Phase 2 - patterns to Pub/Sub and Streams)
if os.getenv('ENABLE_REDIS_LOG_STREAM') == 'true':
try:
from app.redis_log_stream import init_redis_log_stream
redis_success = init_redis_log_stream()
if redis_success:
logging.info("✅ Redis log streaming initialized")
except Exception as e:
logging.warning(f"Failed to initialize Redis log streaming: {e}")
Testing Phase 2
bash
# Terminal 1: Start machine
ENABLE_REDIS_LOG_STREAM=true pnpm machines:basic:local:up:build
# Terminal 2: Monitor progress pub/sub
pnpm monitor:progress
# Terminal 3: Monitor unified stream (for failures)
pnpm monitor:streams
# Terminal 4: Trigger a job with both progress and errors
# - Should see progress updates in Terminal 2
# - Should see failure logs in Terminal 3 (if job has errors)
Phase 3: Monitoring Scripts
Goal
Provide developer-friendly tools for monitoring the progress Pub/Sub channels and the unified job event streams (a Python sketch of the streams side follows the progress script below).
Implementation
File: scripts/monitor-pubsub-progress.sh (NEW)
bash
#!/bin/bash
# Monitor Redis Pub/Sub for ComfyUI Progress Events
# Shows real-time progress from all machines or specific machines
set -e
# Colors
GREEN='\033[0;32m'
BLUE='\033[0;34m'
YELLOW='\033[1;33m'
CYAN='\033[0;36m'
NC='\033[0m'
REDIS_HOST="${REDIS_HOST:-localhost}"
REDIS_PORT="${REDIS_PORT:-6379}"
# Parse arguments
PATTERN="${1:-machine:*:progress}" # Default: all machines
MACHINE_ID="${2:-}"
if [ -n "$MACHINE_ID" ]; then
PATTERN="machine:${MACHINE_ID}:progress"
fi
echo -e "${BLUE}📊 Redis Pub/Sub Monitor - ComfyUI Progress${NC}"
echo -e "${BLUE}━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━${NC}"
echo ""
echo -e "${YELLOW}Redis: ${REDIS_HOST}:${REDIS_PORT}${NC}"
echo -e "${YELLOW}Pattern: ${PATTERN}${NC}"
echo ""
echo -e "${GREEN}Listening for real-time progress events...${NC}"
echo -e "${GREEN}Press Ctrl+C to stop${NC}"
echo ""
# Subscribe to pub/sub channel
redis-cli -h $REDIS_HOST -p $REDIS_PORT --csv PSUBSCRIBE "$PATTERN" | while IFS=',' read -r type pattern channel message; do
# Skip subscription confirmation messages
if [ "$type" = '"psubscribe"' ] || [ "$type" = '"subscribe"' ]; then
continue
fi
# Parse message (it's JSON)
if [ "$type" = '"pmessage"' ] || [ "$type" = '"message"' ]; then
# Remove quotes and parse channel
channel_clean=$(echo "$channel" | tr -d '"')
message_clean=$(echo "$message" | tr -d '"' | sed 's/\\"/"/g')
# Colorize progress events
echo -e "${CYAN}📊 PROGRESS${NC} [${YELLOW}${channel_clean}${NC}] ${message_clean}"
fi
done
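The companion monitor:streams command wired up in package.json below points at scripts/monitor-redis-streams.sh, which is not reproduced here; its core is a blocking read loop over the job:events:* streams. A rough Python equivalent of that idea, assuming the stream layout from Phase 2:
python
import time
import redis

r = redis.Redis(host="localhost", port=6379, decode_responses=True)
cursors = {}  # last-seen entry id per stream, so each entry is printed once

while True:
    # Discover job event streams; SCAN is fine at this scale
    for key in r.scan_iter("job:events:*"):
        cursors.setdefault(key, "0")
    if not cursors:
        time.sleep(2)
        continue
    # Block up to 5s waiting for new entries on any known stream
    for stream, entries in r.xread(cursors, block=5000) or []:
        for entry_id, fields in entries:
            cursors[stream] = entry_id
            print(f"[{stream}] {fields.get('event_type')} {fields.get('level', '')}: {fields.get('message', '')}")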
Update: package.json (Add monitoring scripts)
json
{
"scripts": {
"monitor:progress": "bash scripts/monitor-pubsub-progress.sh",
"monitor:streams": "bash scripts/monitor-redis-streams.sh",
"monitor:all": "concurrently -n progress,streams \"pnpm monitor:progress\" \"pnpm monitor:streams\""
}
}
Testing Phase 3
bash
# Monitor progress only
pnpm monitor:progress
# Monitor streams only (failures + completions)
pnpm monitor:streams
# Monitor both simultaneously
pnpm monitor:all
Complete Environment Variables
Development (.env.local)
bash
# Phase 1: OpenTelemetry
ENABLE_OTEL_LOGGING=true
TELEMETRY_TARGET=dash0 # or 'collector', 'disabled'
# DASH0_* variables in .env.secrets.local
# Phase 2: Redis Pattern Routing
ENABLE_REDIS_LOG_STREAM=true
REDIS_HOST=localhost
REDIS_PORT=6379
MACHINE_ID=local-machine # Set dynamically per machine
# COMFYUI_JOB_ID set dynamically when job starts
Production (PM2 Config)
javascript
// apps/machine/src/config/enhanced-pm2-ecosystem-generator.js
{
name: 'comfyui-gpu0',
env: {
// Phase 1: OTEL
ENABLE_OTEL_LOGGING: 'true',
TELEMETRY_TARGET: process.env.TELEMETRY_TARGET || 'dash0',
// Phase 2: Redis
ENABLE_REDIS_LOG_STREAM: 'true',
REDIS_HOST: process.env.REDIS_HOST,
REDIS_PORT: process.env.REDIS_PORT,
MACHINE_ID: process.env.MACHINE_ID,
// COMFYUI_JOB_ID: Set dynamically when starting job
}
}
Testing the Complete Architecture
1. Verify All Three Tiers
bash
# 1. Start machine with all features enabled
ENABLE_OTEL_LOGGING=true ENABLE_REDIS_LOG_STREAM=true pnpm machines:basic:local:up:build
# 2. Terminal 1: Monitor progress pub/sub
pnpm monitor:progress
# 3. Terminal 2: Monitor failure streams
pnpm monitor:streams
# 4. Terminal 3: Trigger a test job
# (submit job via API or Studio)
# 5. Verify in Dash0
# - Navigate to Dash0 dashboard
# - Filter: service.name=comfyui, machine.id=local-machine
# - Should see ALL logs (progress, failures, everything)
# 6. Verify Pub/Sub (Terminal 1)
# - Should see progress events like:
# 📊 PROGRESS [machine:local-machine:progress] {"message": "Loading model...", "progress": {...}}
# 7. Verify Streams (Terminal 2)
# - Should see failure logs if job errors:
# 📝 LOG [job:events:123] {"event_type": "log", "level": "ERROR", "message": "..."}
2. Verify Pattern Routing
python
# Test script to verify pattern matching
from app.redis_log_stream import LogPatternMatcher
matcher = LogPatternMatcher()
# Should match progress
assert matcher.is_progress("Processing 15/100")
assert matcher.is_progress("Loading model checkpoint...")
assert matcher.is_progress("Step 5 of generation")
# Should match failures
assert matcher.is_failure("Error: CUDA out of memory")
assert matcher.is_failure("Exception in model loading")
assert matcher.is_failure("Fatal error occurred")
# Should not match
assert not matcher.is_progress("Just a normal log message")
assert not matcher.is_failure("Info: Model loaded successfully")
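extract_progress can be spot-checked the same way; for patterns that capture two numbers it returns a structured payload, and the expected values below follow directly from the implementation above:
python
from app.redis_log_stream import LogPatternMatcher

matcher = LogPatternMatcher()
progress = matcher.extract_progress("Processing 15/100")
assert progress == {"current": 15, "total": 100, "message": "Processing 15/100"}
# Progress-like lines without counters still return the raw message
assert matcher.extract_progress("Loading model checkpoint...") == {"message": "Loading model checkpoint..."}
# Non-progress lines return None
assert matcher.extract_progress("Just a normal log message") is None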
3. Performance Validation
bash
# Ensure logging doesn't impact ComfyUI performance
# Before: Baseline job duration
time curl -X POST http://localhost:8188/api/prompt ...
# After: restart the machine with all logging enabled, then time the same request
# ENABLE_OTEL_LOGGING=true ENABLE_REDIS_LOG_STREAM=true pnpm machines:basic:local:up:build
time curl -X POST http://localhost:8188/api/prompt ...
# Difference should be negligible (<5%)
Migration Path
Rollout Strategy
Phase 1 Only (Week 1):
- Deploy OTEL integration
- Verify logs in Dash0
- No Redis changes yet
Phase 2 Partial (Week 2):
- Deploy pattern-based routing
- Enable progress pub/sub only (comment out failure stream)
- Verify UI updates work
Phase 2 Complete (Week 3):
- Enable failure stream routing
- Verify worker detection works
- Remove old pub/sub implementation
Phase 3 (Week 4):
- Deploy monitoring scripts
- Update documentation
- Train team on new monitoring tools
Rollback Plan
All changes are opt-in via environment variables:
bash
# Disable OTEL
ENABLE_OTEL_LOGGING=false
# Disable Redis pattern routing
ENABLE_REDIS_LOG_STREAM=false
# System falls back to original behavior
Success Metrics
Phase 1 (OTEL)
- ✅ ALL ComfyUI logs visible in Dash0
- ✅ Logs tagged with machine.id and job.id
- ✅ Historical log analysis possible
- ✅ No performance degradation
Phase 2 (Pattern Routing)
- ✅ Progress events trigger real-time UI updates
- ✅ Failure logs trigger worker detection
- ✅ No false positives/negatives in pattern matching
- ✅ Pub/Sub latency <100ms
Phase 3 (Monitoring)
- ✅ Developers can monitor progress in real-time
- ✅ Developers can debug failures via streams
- ✅ Clear separation of concerns (progress vs failures)
Troubleshooting
OTEL Logs Not Appearing in Dash0
bash
# Check initialization
docker logs basic-machine-local | grep OTEL
# Verify environment variables
docker exec basic-machine-local env | grep -E 'TELEMETRY|DASH0'
# Test OTLP endpoint connectivity
docker exec basic-machine-local curl -v https://YOUR_DASH0_ENDPOINT
Progress Not Appearing in Pub/Sub
bash
# Check Redis streaming initialization
docker logs basic-machine-local | grep RedisLogStream
# Verify pattern matching
# Add debug logging to LogPatternMatcher.is_progress() (see the sketch below)
# Monitor Redis directly
redis-cli MONITOR | grep progress
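The "add debug logging" suggestion above can be done with a temporary monkey-patch rather than editing the class; the variant below is hypothetical and only intended for a troubleshooting session:
python
# Hypothetical debug variant of LogPatternMatcher.is_progress for troubleshooting
import logging
from app.redis_log_stream import LogPatternMatcher

def is_progress_debug(self, message: str) -> bool:
    for pattern in self.progress_compiled:
        if pattern.search(message):
            logging.debug(f"[PatternMatcher] progress match {pattern.pattern!r}: {message[:80]}")
            return True
    logging.debug(f"[PatternMatcher] no progress match: {message[:80]}")
    return False

LogPatternMatcher.is_progress = is_progress_debug  # remove again after debugging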
Failures Not Appearing in Streams
bash
# Check if job_id is set
docker logs basic-machine-local | grep COMFYUI_JOB_ID
# Verify stream exists
redis-cli XLEN job:events:YOUR_JOB_ID
# Check pattern matching
# Add debug logging to LogPatternMatcher.is_failure() (same approach as the is_progress sketch above)
Future Enhancements
- Dynamic Pattern Updates: Allow pattern updates without restarting
- ML-Based Classification: Use ML to classify logs instead of regex
- Structured Logging: Parse ComfyUI logs into structured fields
- Performance Metrics: Track log processing latency and throughput
- Alert Rules: Configure alerts based on failure patterns in Dash0
References
- Node.js @emp/telemetry: packages/telemetry/src/index.ts
- OpenTelemetry Python Docs: https://opentelemetry.io/docs/languages/python/
- Redis Pub/Sub vs Streams: https://redis.io/docs/latest/develop/interact/pubsub/
- Dash0 OTLP Integration: https://docs.dash0.com/
