Skip to content

ComfyUI Logging Architecture - Three-Tier Implementation Plan

Overview

This document describes the three-tier logging architecture for ComfyUI that provides comprehensive observability while minimizing noise and maintaining minimal invasiveness to the ComfyUI core.

Architecture Tiers

  1. OpenTelemetry (Dash0): ALL logs → Comprehensive monitoring and historical analysis
  2. Redis Pub/Sub: Progress patterns → Real-time UI updates (ephemeral)
  3. Redis Streams: Failure patterns → Worker detection and action (persistent)
ComfyUI Logs

[LogInterceptor] ← Existing ComfyUI logger

[LogStreamingWrapper] ← Our transparent wrapper

     ├─→ [OTEL] → ALL logs → Dash0
     ├─→ [Pub/Sub] → Progress patterns → machine:{id}:progress
     └─→ [Stream] → Failure patterns → job:events:{id}

Design Principles

  • Minimal Invasiveness: No changes to logger.py - all logic in separate modules
  • Pattern-Based Routing: Regex patterns classify logs and route appropriately
  • Transparent Wrapping: LogStreamingWrapper intercepts writes without modifying existing behavior
  • Consistent Architecture: Python OTEL integration mirrors Node.js @emp/telemetry package

Phase 1: OpenTelemetry Integration

Goal

Send ALL logs to Dash0/OpenTelemetry for comprehensive monitoring and historical analysis.

Implementation

File: packages/comfyui/app/otel_integration.py (NEW)

python
"""
OpenTelemetry integration for ComfyUI
Mirrors Node.js @emp/telemetry architecture for consistent observability
"""
import os
import logging
from typing import Optional

try:
    from opentelemetry.sdk._logs import LoggerProvider, BatchLogRecordProcessor
    from opentelemetry.exporter.otlp.proto.grpc._log_exporter import OTLPLogExporter
    from opentelemetry._logs import set_logger_provider
    from opentelemetry.sdk.resources import Resource
    from opentelemetry.instrumentation.logging import LoggingInstrumentation
    OTEL_AVAILABLE = True
except ImportError:
    OTEL_AVAILABLE = False
    logging.warning("OpenTelemetry not available - OTEL logging disabled")

def init_otel_logging() -> bool:
    """
    Initialize OpenTelemetry logging for ComfyUI

    Mirrors @emp/telemetry configuration:
    - Uses same endpoints (DASH0_ENDPOINT or OTEL_COLLECTOR_ENDPOINT)
    - Uses same authentication (DASH0_AUTH_TOKEN)
    - Uses same resource attributes (service.name, machine.id, job.id)

    Environment Variables:
        TELEMETRY_TARGET: 'dash0', 'collector', 'remote-collector', or 'disabled'
        DASH0_ENDPOINT: Dash0 OTLP endpoint (required if TELEMETRY_TARGET=dash0)
        DASH0_AUTH_TOKEN: Dash0 authentication token (required if TELEMETRY_TARGET=dash0)
        DASH0_DATASET: Dash0 dataset name (required if TELEMETRY_TARGET=dash0)
        OTEL_COLLECTOR_ENDPOINT: Collector endpoint (required if TELEMETRY_TARGET=collector)
        MACHINE_ID: Machine identifier (optional, added to resource attributes)
        COMFYUI_JOB_ID: Job identifier (optional, added to resource attributes)

    Returns:
        bool: True if initialization succeeded, False otherwise
    """
    if not OTEL_AVAILABLE:
        logging.warning("[OTEL] OpenTelemetry packages not installed - skipping")
        return False

    # Check if enabled
    telemetry_target = os.getenv('TELEMETRY_TARGET')
    if not telemetry_target or telemetry_target == 'disabled':
        logging.info("[OTEL] Telemetry disabled (TELEMETRY_TARGET not set or 'disabled')")
        return False

    try:
        # Determine endpoint based on TELEMETRY_TARGET (mirrors Node.js logic)
        if telemetry_target == 'dash0':
            endpoint = os.getenv('DASH0_ENDPOINT')
            auth_token = os.getenv('DASH0_AUTH_TOKEN')
            dataset = os.getenv('DASH0_DATASET')

            if not endpoint or not auth_token or not dataset:
                raise ValueError("TELEMETRY_TARGET=dash0 requires DASH0_ENDPOINT, DASH0_AUTH_TOKEN, and DASH0_DATASET")

            headers = {
                'Authorization': f'Bearer {auth_token}',
                'dash0-dataset': dataset
            }
        elif telemetry_target in ['collector', 'remote-collector']:
            endpoint = os.getenv('OTEL_COLLECTOR_ENDPOINT')
            if not endpoint:
                raise ValueError(f"TELEMETRY_TARGET={telemetry_target} requires OTEL_COLLECTOR_ENDPOINT")
            headers = {}
        else:
            raise ValueError(f"Unknown TELEMETRY_TARGET: {telemetry_target}")

        # Create resource (mirrors @emp/telemetry resource attributes)
        resource_attrs = {
            'service.name': 'comfyui',
            'service.namespace': os.getenv('OTEL_SERVICE_NAMESPACE', 'emerge'),
            'service.version': '1.0.0',
            'deployment.environment': os.getenv('NODE_ENV', 'development'),
        }

        # Add machine.id and job.id if available
        machine_id = os.getenv('MACHINE_ID')
        if machine_id:
            resource_attrs['machine.id'] = machine_id

        job_id = os.getenv('COMFYUI_JOB_ID')
        if job_id:
            resource_attrs['job.id'] = job_id

        resource = Resource.create(resource_attrs)

        # Create OTLP exporter
        exporter = OTLPLogExporter(
            endpoint=endpoint,
            headers=headers,
        )

        # Create logger provider with batch processor
        logger_provider = LoggerProvider(resource=resource)
        logger_provider.add_log_record_processor(
            BatchLogRecordProcessor(
                exporter,
                max_queue_size=2048,
                max_export_batch_size=512,
                schedule_delay_millis=5000,
            )
        )

        # Set as global logger provider
        set_logger_provider(logger_provider)

        # Instrument Python logging (like WinstonInstrumentation for Node.js)
        LoggingInstrumentation().instrument(set_logging_format=False)

        logging.info(f"[OTEL] ✅ OpenTelemetry logging initialized (endpoint: {endpoint})")
        return True

    except Exception as e:
        logging.error(f"[OTEL] Failed to initialize: {e}")
        return False

Integration: Update packages/comfyui/main.py

python
# Add after imports, before other initialization

# Optional OpenTelemetry logging (Phase 1 - sends ALL logs to Dash0)
if os.getenv('ENABLE_OTEL_LOGGING') == 'true':
    try:
        from app.otel_integration import init_otel_logging
        otel_success = init_otel_logging()
        if otel_success:
            logging.info("✅ OpenTelemetry logging initialized")
    except Exception as e:
        logging.warning(f"Failed to initialize OTEL logging: {e}")

Environment Variables (Add to PM2 config)

javascript
// apps/machine/src/config/enhanced-pm2-ecosystem-generator.js
// Add to comfyui-gpu* services

ENABLE_OTEL_LOGGING: 'true',
TELEMETRY_TARGET: 'dash0', // or 'collector', 'disabled'
// DASH0_* variables already present in .env.secrets.local

Testing Phase 1

bash
# 1. Start machine with OTEL enabled
ENABLE_OTEL_LOGGING=true pnpm machines:basic:local:up:build

# 2. Check ComfyUI logs for initialization
docker logs basic-machine-local | grep OTEL

# Expected output:
# [OTEL] ✅ OpenTelemetry logging initialized (endpoint: https://...)

# 3. Verify logs appear in Dash0
# - Navigate to Dash0 dashboard
# - Filter by service.name: comfyui
# - Should see ALL ComfyUI logs with machine.id and job.id attributes

Phase 2: Pattern-Based Redis Routing

Goal

Route specific log patterns to Redis for real-time monitoring and worker action:

  • Progress patterns → Pub/Sub → Real-time UI updates
  • Failure patterns → Streams → Worker detection and remediation

Implementation

File: packages/comfyui/app/redis_log_stream.py (REFACTOR)

python
"""
Pattern-based Redis log streaming for ComfyUI
Routes logs to Pub/Sub (progress) and Streams (failures) based on regex patterns
"""
import os
import re
import json
import logging
import sys
from typing import Optional, Dict, Any, List
from datetime import datetime

try:
    import redis
    REDIS_AVAILABLE = True
except ImportError:
    REDIS_AVAILABLE = False
    logging.warning("Redis not available - log streaming disabled")


class LogPatternMatcher:
    """
    Classifies log messages using regex patterns

    Progress patterns: ComfyUI describing its own progress
    Failure patterns: Errors, exceptions, critical failures
    """

    PROGRESS_PATTERNS = [
        r'Processing.*?(\d+)/(\d+)',         # "Processing 15/100"
        r'Loading.*?model',                  # "Loading model..."
        r'Step\s+\d+',                       # "Step 5"
        r'(\d+)%',                           # "45%"
        r'FETCH.*?(\d+)/(\d+)',              # "FETCH ComfyRegistry Data: 75/104"
        r'Downloading.*?model',              # "Downloading model..."
        r'Generating.*?(\d+)/(\d+)',         # "Generating frame 5/100"
    ]

    FAILURE_PATTERNS = [
        r'(?i)error[:\s]',                   # "Error:" or "ERROR:"
        r'(?i)exception',                    # "Exception"
        r'(?i)traceback',                    # Traceback
        r'(?i)cuda\s+error',                 # "CUDA error"
        r'(?i)out\s+of\s+memory',            # "Out of memory"
        r'(?i)fatal',                        # "Fatal"
        r'(?i)failed\s+to',                  # "Failed to..."
        r'(?i)cannot\s+',                    # "Cannot..."
        r'(?i)unable\s+to',                  # "Unable to..."
    ]

    def __init__(self):
        self.progress_compiled = [re.compile(p) for p in self.PROGRESS_PATTERNS]
        self.failure_compiled = [re.compile(p) for p in self.FAILURE_PATTERNS]

    def is_progress(self, message: str) -> bool:
        """Check if message matches progress patterns"""
        return any(pattern.search(message) for pattern in self.progress_compiled)

    def is_failure(self, message: str) -> bool:
        """Check if message matches failure patterns"""
        return any(pattern.search(message) for pattern in self.failure_compiled)

    def extract_progress(self, message: str) -> Optional[Dict[str, Any]]:
        """
        Extract structured progress data from message
        Returns: {"current": 15, "total": 100, "message": "..."} or None
        """
        for pattern in self.progress_compiled:
            match = pattern.search(message)
            if match and len(match.groups()) >= 2:
                try:
                    return {
                        "current": int(match.group(1)),
                        "total": int(match.group(2)),
                        "message": message.strip()
                    }
                except (ValueError, IndexError):
                    continue

        # If no numerical progress found, just return message
        if self.is_progress(message):
            return {"message": message.strip()}

        return None


class RedisLogStreamer:
    """
    Routes logs to Redis based on patterns:
    - Progress → Pub/Sub (machine:{machine_id}:progress)
    - Failures → Stream (job:events:{job_id} with event_type: 'log')
    """

    def __init__(self, redis_host: str, redis_port: int, machine_id: str, job_id: Optional[str] = None):
        self.redis_client = redis.Redis(host=redis_host, port=redis_port, decode_responses=True)
        self.machine_id = machine_id
        self.job_id = job_id
        self.pattern_matcher = LogPatternMatcher()

        logging.info(f"[RedisLogStreamer] Initialized (machine={machine_id}, job={job_id})")

    def handle_log_line(self, message: str):
        """
        Process a single log line and route to appropriate destination(s)

        Routing logic:
        - Progress patterns → Pub/Sub
        - Failure patterns → Stream (only if job_id is set)
        """
        if not message or message.isspace():
            return

        # Check for progress patterns → Pub/Sub
        if self.pattern_matcher.is_progress(message):
            progress_data = self.pattern_matcher.extract_progress(message)
            self._publish_progress(message, progress_data)

        # Check for failure patterns → Stream (if job_id set)
        if self.pattern_matcher.is_failure(message):
            if self.job_id:
                self._publish_failure(message)
            else:
                logging.debug(f"[RedisLogStreamer] Failure pattern detected but no job_id set: {message[:100]}")

    def _publish_progress(self, message: str, progress_data: Optional[Dict[str, Any]]):
        """
        Publish progress to Redis Pub/Sub
        Channel: machine:{machine_id}:progress
        """
        try:
            channel = f"machine:{self.machine_id}:progress"
            payload = {
                "timestamp": datetime.now().isoformat(),
                "machine_id": self.machine_id,
                "message": message.strip(),
                "progress": progress_data
            }

            self.redis_client.publish(channel, json.dumps(payload))
            logging.debug(f"[RedisLogStreamer] Published progress to {channel}")

        except Exception as e:
            logging.error(f"[RedisLogStreamer] Failed to publish progress: {e}")

    def _publish_failure(self, message: str):
        """
        Publish failure to Redis Stream
        Stream: job:events:{job_id}
        Event type: 'log'
        """
        try:
            stream_key = f"job:events:{self.job_id}"
            event_data = {
                "event_type": "log",
                "timestamp": datetime.now().isoformat(),
                "machine_id": self.machine_id,
                "job_id": self.job_id,
                "level": "ERROR",
                "message": message.strip()
            }

            self.redis_client.xadd(stream_key, event_data, maxlen=1000)
            logging.warning(f"[RedisLogStreamer] Published failure to stream {stream_key}: {message[:100]}")

        except Exception as e:
            logging.error(f"[RedisLogStreamer] Failed to publish failure: {e}")


class LogStreamingWrapper:
    """
    Wraps LogInterceptor to intercept writes and route to Redis

    This is transparent to logger.py - no modifications needed.
    All write() calls pass through after being processed.
    """

    def __init__(self, original_interceptor, streamer: RedisLogStreamer):
        self.original = original_interceptor
        self.streamer = streamer
        logging.info("[LogStreamingWrapper] Initialized - intercepting log writes")

    def write(self, data):
        """Intercept write, route to Redis, then pass through"""
        if data and not data.isspace():
            self.streamer.handle_log_line(data)

        # Pass through to original LogInterceptor
        return self.original.write(data)

    def __getattr__(self, name):
        """Forward all other methods/attributes to original"""
        return getattr(self.original, name)


def init_redis_log_stream() -> bool:
    """
    Initialize Redis log streaming with pattern-based routing

    Environment Variables:
        REDIS_HOST: Redis server host
        REDIS_PORT: Redis server port
        MACHINE_ID: Machine identifier (required)
        COMFYUI_JOB_ID: Job identifier (optional, enables failure streaming)

    Returns:
        bool: True if initialization succeeded, False otherwise
    """
    if not REDIS_AVAILABLE:
        logging.warning("[RedisLogStream] Redis not available - skipping")
        return False

    try:
        redis_host = os.getenv('REDIS_HOST', 'localhost')
        redis_port = int(os.getenv('REDIS_PORT', 6379))
        machine_id = os.getenv('MACHINE_ID')
        job_id = os.getenv('COMFYUI_JOB_ID')

        if not machine_id:
            raise ValueError("MACHINE_ID environment variable required for Redis log streaming")

        # Create streamer
        streamer = RedisLogStreamer(redis_host, redis_port, machine_id, job_id)

        # Wrap existing stdout/stderr interceptors
        from app.logger import stdout_interceptor, stderr_interceptor

        if stdout_interceptor:
            sys.stdout = LogStreamingWrapper(stdout_interceptor, streamer)
            logging.info("[RedisLogStream] ✅ Wrapped stdout interceptor")

        if stderr_interceptor:
            sys.stderr = LogStreamingWrapper(stderr_interceptor, streamer)
            logging.info("[RedisLogStream] ✅ Wrapped stderr interceptor")

        logging.info(f"[RedisLogStream] ✅ Initialized (progress → Pub/Sub, failures → Stream)")
        return True

    except Exception as e:
        logging.error(f"[RedisLogStream] Failed to initialize: {e}")
        return False

Integration: Update packages/comfyui/main.py

python
# Add after OTEL initialization

# Optional Redis log streaming (Phase 2 - patterns to Pub/Sub and Streams)
if os.getenv('ENABLE_REDIS_LOG_STREAM') == 'true':
    try:
        from app.redis_log_stream import init_redis_log_stream
        redis_success = init_redis_log_stream()
        if redis_success:
            logging.info("✅ Redis log streaming initialized")
    except Exception as e:
        logging.warning(f"Failed to initialize Redis log streaming: {e}")

Testing Phase 2

bash
# Terminal 1: Start machine
ENABLE_REDIS_LOG_STREAM=true pnpm machines:basic:local:up:build

# Terminal 2: Monitor progress pub/sub
pnpm monitor:progress

# Terminal 3: Monitor unified stream (for failures)
pnpm monitor:streams

# Terminal 4: Trigger a job with both progress and errors
# - Should see progress updates in Terminal 2
# - Should see failure logs in Terminal 3 (if job has errors)

Phase 3: Monitoring Scripts

Goal

Provide developer-friendly tools to monitor progress pub/sub and unified streams.

Implementation

File: scripts/monitor-pubsub-progress.sh (NEW)

bash
#!/bin/bash

# Monitor Redis Pub/Sub for ComfyUI Progress Events
# Shows real-time progress from all machines or specific machines

set -e

# Colors
GREEN='\033[0;32m'
BLUE='\033[0;34m'
YELLOW='\033[1;33m'
CYAN='\033[0;36m'
NC='\033[0m'

REDIS_HOST="${REDIS_HOST:-localhost}"
REDIS_PORT="${REDIS_PORT:-6379}"

# Parse arguments
PATTERN="${1:-machine:*:progress}"  # Default: all machines
MACHINE_ID="${2:-}"

if [ -n "$MACHINE_ID" ]; then
  PATTERN="machine:${MACHINE_ID}:progress"
fi

echo -e "${BLUE}📊 Redis Pub/Sub Monitor - ComfyUI Progress${NC}"
echo -e "${BLUE}━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━${NC}"
echo ""
echo -e "${YELLOW}Redis: ${REDIS_HOST}:${REDIS_PORT}${NC}"
echo -e "${YELLOW}Pattern: ${PATTERN}${NC}"
echo ""
echo -e "${GREEN}Listening for real-time progress events...${NC}"
echo -e "${GREEN}Press Ctrl+C to stop${NC}"
echo ""

# Subscribe to pub/sub channel
redis-cli -h $REDIS_HOST -p $REDIS_PORT --csv PSUBSCRIBE "$PATTERN" | while IFS=',' read -r type pattern channel message; do
  # Skip subscription confirmation messages
  if [ "$type" = '"psubscribe"' ] || [ "$type" = '"subscribe"' ]; then
    continue
  fi

  # Parse message (it's JSON)
  if [ "$type" = '"pmessage"' ] || [ "$type" = '"message"' ]; then
    # Remove quotes and parse channel
    channel_clean=$(echo "$channel" | tr -d '"')
    message_clean=$(echo "$message" | tr -d '"' | sed 's/\\"/"/g')

    # Colorize progress events
    echo -e "${CYAN}📊 PROGRESS${NC} [${YELLOW}${channel_clean}${NC}] ${message_clean}"
  fi
done

Update: package.json (Add monitoring scripts)

json
{
  "scripts": {
    "monitor:progress": "bash scripts/monitor-pubsub-progress.sh",
    "monitor:streams": "bash scripts/monitor-redis-streams.sh",
    "monitor:all": "concurrently -n progress,streams \"pnpm monitor:progress\" \"pnpm monitor:streams\""
  }
}

Testing Phase 3

bash
# Monitor progress only
pnpm monitor:progress

# Monitor streams only (failures + completions)
pnpm monitor:streams

# Monitor both simultaneously
pnpm monitor:all

Complete Environment Variables

Development (.env.local)

bash
# Phase 1: OpenTelemetry
ENABLE_OTEL_LOGGING=true
TELEMETRY_TARGET=dash0  # or 'collector', 'disabled'
# DASH0_* variables in .env.secrets.local

# Phase 2: Redis Pattern Routing
ENABLE_REDIS_LOG_STREAM=true
REDIS_HOST=localhost
REDIS_PORT=6379
MACHINE_ID=local-machine  # Set dynamically per machine
# COMFYUI_JOB_ID set dynamically when job starts

Production (PM2 Config)

javascript
// apps/machine/src/config/enhanced-pm2-ecosystem-generator.js

{
  name: 'comfyui-gpu0',
  env: {
    // Phase 1: OTEL
    ENABLE_OTEL_LOGGING: 'true',
    TELEMETRY_TARGET: process.env.TELEMETRY_TARGET || 'dash0',

    // Phase 2: Redis
    ENABLE_REDIS_LOG_STREAM: 'true',
    REDIS_HOST: process.env.REDIS_HOST,
    REDIS_PORT: process.env.REDIS_PORT,
    MACHINE_ID: process.env.MACHINE_ID,
    // COMFYUI_JOB_ID: Set dynamically when starting job
  }
}

Testing the Complete Architecture

1. Verify All Three Tiers

bash
# 1. Start machine with all features enabled
ENABLE_OTEL_LOGGING=true ENABLE_REDIS_LOG_STREAM=true pnpm machines:basic:local:up:build

# 2. Terminal 1: Monitor progress pub/sub
pnpm monitor:progress

# 3. Terminal 2: Monitor failure streams
pnpm monitor:streams

# 4. Terminal 3: Trigger a test job
# (submit job via API or Studio)

# 5. Verify in Dash0
# - Navigate to Dash0 dashboard
# - Filter: service.name=comfyui, machine.id=local-machine
# - Should see ALL logs (progress, failures, everything)

# 6. Verify Pub/Sub (Terminal 1)
# - Should see progress events like:
#   📊 PROGRESS [machine:local-machine:progress] {"message": "Loading model...", "progress": {...}}

# 7. Verify Streams (Terminal 2)
# - Should see failure logs if job errors:
#   📝 LOG [job:events:123] {"event_type": "log", "level": "ERROR", "message": "..."}

2. Verify Pattern Routing

python
# Test script to verify pattern matching

from app.redis_log_stream import LogPatternMatcher

matcher = LogPatternMatcher()

# Should match progress
assert matcher.is_progress("Processing 15/100")
assert matcher.is_progress("Loading model checkpoint...")
assert matcher.is_progress("Step 5 of generation")

# Should match failures
assert matcher.is_failure("Error: CUDA out of memory")
assert matcher.is_failure("Exception in model loading")
assert matcher.is_failure("Fatal error occurred")

# Should not match
assert not matcher.is_progress("Just a normal log message")
assert not matcher.is_failure("Info: Model loaded successfully")

3. Performance Validation

bash
# Ensure logging doesn't impact ComfyUI performance

# Before: Baseline job duration
time curl -X POST http://localhost:8188/api/prompt ...

# After: With all logging enabled
ENABLE_OTEL_LOGGING=true ENABLE_REDIS_LOG_STREAM=true
time curl -X POST http://localhost:8188/api/prompt ...

# Difference should be negligible (<5%)

Migration Path

Rollout Strategy

  1. Phase 1 Only (Week 1):

    • Deploy OTEL integration
    • Verify logs in Dash0
    • No Redis changes yet
  2. Phase 2 Partial (Week 2):

    • Deploy pattern-based routing
    • Enable progress pub/sub only (comment out failure stream)
    • Verify UI updates work
  3. Phase 2 Complete (Week 3):

    • Enable failure stream routing
    • Verify worker detection works
    • Remove old pub/sub implementation
  4. Phase 3 (Week 4):

    • Deploy monitoring scripts
    • Update documentation
    • Train team on new monitoring tools

Rollback Plan

All changes are opt-in via environment variables:

bash
# Disable OTEL
ENABLE_OTEL_LOGGING=false

# Disable Redis pattern routing
ENABLE_REDIS_LOG_STREAM=false

# System falls back to original behavior

Success Metrics

Phase 1 (OTEL)

  • ✅ ALL ComfyUI logs visible in Dash0
  • ✅ Logs tagged with machine.id and job.id
  • ✅ Historical log analysis possible
  • ✅ No performance degradation

Phase 2 (Pattern Routing)

  • ✅ Progress events trigger real-time UI updates
  • ✅ Failure logs trigger worker detection
  • ✅ No false positives/negatives in pattern matching
  • ✅ Pub/Sub latency <100ms

Phase 3 (Monitoring)

  • ✅ Developers can monitor progress in real-time
  • ✅ Developers can debug failures via streams
  • ✅ Clear separation of concerns (progress vs failures)

Troubleshooting

OTEL Logs Not Appearing in Dash0

bash
# Check initialization
docker logs basic-machine-local | grep OTEL

# Verify environment variables
docker exec basic-machine-local env | grep -E 'TELEMETRY|DASH0'

# Test OTLP endpoint connectivity
docker exec basic-machine-local curl -v https://YOUR_DASH0_ENDPOINT

Progress Not Appearing in Pub/Sub

bash
# Check Redis streaming initialization
docker logs basic-machine-local | grep RedisLogStream

# Verify pattern matching
# Add debug logging to LogPatternMatcher.is_progress()

# Monitor Redis directly
redis-cli MONITOR | grep progress

Failures Not Appearing in Streams

bash
# Check if job_id is set
docker logs basic-machine-local | grep COMFYUI_JOB_ID

# Verify stream exists
redis-cli XLEN job:events:YOUR_JOB_ID

# Check pattern matching
# Add debug logging to LogPatternMatcher.is_failure()

Future Enhancements

  1. Dynamic Pattern Updates: Allow pattern updates without restarting
  2. ML-Based Classification: Use ML to classify logs instead of regex
  3. Structured Logging: Parse ComfyUI logs into structured fields
  4. Performance Metrics: Track log processing latency and throughput
  5. Alert Rules: Configure alerts based on failure patterns in Dash0

References

Released under the MIT License.