Global Error Enhancement Service - ADR
Status: Proposed
Date: 2025-12-03
Author: System Architecture
Supersedes: None
Related: 2025-11-07-error-handling-modernization.md, 2025-01-09-redis-driven-error-classification.md
Context
The emp-job-queue system processes jobs across multiple connectors (ComfyUI, Ollama, OpenAI, etc.). When errors occur, they are propagated to users via the workflow.failed webhook.
Current State:
- ComfyUI has ComfyUIErrorEnhancer in @emp/core for TypeScript-side error enrichment
- Error patterns are stored in Redis (error_patterns:global, error_patterns:{connector})
- The Monitor UI at /error-patterns allows CRUD on patterns
- Pattern matching is connector-specific (only ComfyUI uses it)
Bridge State (as of 2025-12-03)
A temporary bridge implementation exists to flow error data from workers to webhooks. This is not the target architecture but allows error data to flow while the ErrorEnhancementService is being built.
Bridge Implementation - How Error Data Currently Flows:
Worker: ConnectorError.toJSON() → publishes event.error as object (not string)
↓
apps/api/src/grpc/services/job-client-service.ts
- mapChannelEventToJobUpdate() detects ConnectorError object via isConnectorErrorObject()
- Extracts ConnectorError as error_details, message as error string
- Puts error_details in update.result.data.error_details
↓
apps/emprops-api/src/clients/grpc-job-client.ts
- Receives update.result with data.error_details
- Attaches as error.jobResult on thrown Error
↓
apps/emprops-api/src/lib/workflows.ts
- Extracts from error.jobResult.data.error_details
- Returns in errorResponse({ details: errorDetails })
↓
apps/emprops-api/src/modules/art-gen/nodes-v2/nodes/workflow.ts
- Extracts from result.details (which IS the ConnectorError.toJSON())
- Creates StepFailedError with fields mapped from ConnectorError structure:
- failureType, failureReason from errorDetails
- humanReadableMessage, callToAction from errorDetails.context.suggestion
- patternMatched from errorDetails.context.patternMatched
- exceptionType from errorDetails.context.rawError.exception_type
- nodeId from errorDetails.context.node.id
- connector from errorDetails.context.serviceType
↓
apps/emprops-api/src/routes/generator/v2.ts
- Builds StructuredError from StepFailedError fields
- Sends to Svix webhook
Bridge Files Modified:
| File | Changes |
|---|---|
| apps/api/src/grpc/services/job-client-service.ts | Added isConnectorErrorObject() type guard, extracts ConnectorError as error_details |
| apps/emprops-api/src/clients/grpc-job-client.ts | Attaches jobResult to thrown error |
| apps/emprops-api/src/lib/workflows.ts | Extracts from jobResult.data.error_details |
| apps/emprops-api/src/modules/art-gen/nodes-v2/nodes/workflow.ts | Maps ConnectorError fields to StepFailedError |
| apps/emprops-api/src/lib/errors.ts | Added rich error fields to StepFailedError |
| apps/emprops-api/src/routes/generator/v2.ts | Builds StructuredError, maps failureType to category |
| apps/emprops-api/src/lib/miniapp-webhook.ts | Defined StructuredError interface |
Why This Is A Bridge (Not Target):
- Ad-hoc extraction at each layer - Every file has its own extraction logic
- Field names differ - ConnectorError and StepFailedError use failureType, while StructuredError uses category
- No single source of truth - Normalization happens across 5+ files
- Fragile - Adding a new field requires changes in multiple places
- No error saving - Errors are not persisted for pattern creation
Target State (This ADR):
- A single ErrorEnhancementService.processError() in the worker
- Returns NormalizedError with a guaranteed structure
- All downstream layers pass it through unchanged
- Errors are saved to Redis for the pattern creation workflow
Problems:
- No centralized error saving: Raw errors are not persisted, making it impossible to create patterns from production errors
- Connector-specific enhancement: Only ComfyUI has an enhancer; other connectors get generic errors
- Pattern creation is manual: Must manually transcribe error messages to create patterns
- No feedback loop: Cannot track which errors need enhancement vs. which are already good
User Request:
"Any error from any worker will pass through this before being sent to job-api / emprops. We should save every error here then we can take the saved errors and create enhancements for the next time it shows up."
Key Requirements:
- Normalized Error Format: Every error must conform to a consistent structure across the entire application
- Pattern Override: If an error matches a pattern with an enhancer, always apply the enhancement (even if the error already has a human_readable_message, since it might be wrong)
- Save Everything: Persist all errors for pattern creation workflow
- Fatal vs Non-Fatal: Errors must indicate whether they are fatal (no retry) or non-fatal (retry if attempts remain)
Understanding Failure Types
This service handles step-level failures: EmProps sends a job (one step in a workflow), and that job fails on the worker side:
EmProps Workflow
├── Step 1 (Job A) ✓
├── Step 2 (Job B) ← FAILS HERE (this is what we capture)
└── Step 3 (Job C) (never runs if Step 2 is fatal)
Fatal vs Non-Fatal:
| Type | fatal | Behavior |
|---|---|---|
| Fatal | true | Don't retry. Error is unrecoverable (e.g., invalid input, missing model, auth failure) |
| Non-Fatal | false | Retry if retry attempts remain. Error may be transient (e.g., timeout, rate limit, temporary unavailability) |
Important: The actual retry logic is handled at the EmProps API level, not here. This service just provides the fatal boolean so EmProps can make the decision.
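Because EmProps owns retries, its decision reduces to one check on fatal plus its own attempt budget. The following is a hypothetical sketch. shouldRetryStep, attempt (1-based), and maxAttempts are assumed names and do not come from existing EmProps code.

```typescript
/** The only NormalizedError field the retry decision needs. */
interface FatalSignal {
  fatal: boolean;
}

/**
 * Hypothetical EmProps-side check (names are illustrative).
 * `attempt` is 1-based: attempt 1 is the first execution.
 */
export function shouldRetryStep(
  error: FatalSignal,
  attempt: number,
  maxAttempts: number
): boolean {
  // Fatal: invalid input, missing model, auth failure. Retrying cannot help.
  if (error.fatal) return false;
  // Non-fatal: timeout, rate limit. Retry only while the attempt budget remains.
  return attempt < maxAttempts;
}
```

The worker never sees maxAttempts. Keeping that value on the EmProps side means the same NormalizedError can drive different retry budgets per workflow.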
Decision
Implement a Global Error Enhancement Service in @emp/core that:
- Normalizes all errors into a consistent NormalizedError format
- Enhances errors by matching against Redis patterns (a pattern match always overrides any existing human_readable_message)
- Saves ALL errors to Redis for the pattern creation workflow
- Is inherited by all workers through BaseConnector and used automatically
- Provides a UI workflow to create patterns from saved errors
Normalized Error Format
Every error flowing through the system MUST conform to this structure:
/**
* The canonical error format used across the entire emp-job-queue system.
* All errors are normalized to this format before being sent to job-api/webhooks.
*/
export interface NormalizedError {
/** Error classification code (e.g., 'COMFYUI_NODE_ERROR', 'MODEL_NOT_FOUND') */
code: string;
/** Original raw error message from the source */
message: string;
/** Human-friendly explanation of what went wrong */
human_readable_message: string;
/** Actionable guidance for the user to resolve the issue */
call_to_action?: string;
/**
* Whether this error is fatal (unrecoverable).
* - true: Don't retry - error is permanent (invalid input, missing model, auth failure)
* - false: Retry if attempts remain - error may be transient (timeout, rate limit)
*
* Note: Actual retry logic is handled by EmProps API, not here.
*/
fatal: boolean;
/** Source connector that produced the error */
connector: string;
/** Error category for grouping/filtering */
category: 'validation' | 'resource' | 'timeout' | 'authentication' | 'rate_limit' | 'internal' | 'unknown';
/** Optional structured details for debugging */
details?: {
/** Exception type if available (e.g., 'ValueError', 'FileNotFoundError') */
exception_type?: string;
/** Node that failed (ComfyUI-specific) */
node_type?: string;
/** Model that was missing/failed */
model_name?: string;
/** Stack trace if available */
stack?: string;
/** Any additional context */
[key: string]: unknown;
};
/** Timestamp when error occurred */
timestamp: string;
/** Pattern ID that matched (if enhanced via pattern) */
matched_pattern_id?: string;
}
Error Code Convention:
- Format: {CONNECTOR}_{CATEGORY}_{SPECIFIC} (e.g., COMFYUI_NODE_MISSING_MODEL)
- All uppercase with underscores
- Prefix with the connector name for connector-specific errors
- Use the GLOBAL_ prefix for cross-connector errors
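Here is a minimal sketch of a helper that enforces this convention for hand-written codes. The service's generateErrorCode applies similar normalization to generated codes. buildErrorCode is a hypothetical name. The sketch assumes connector names may contain hyphens and that passing null selects the GLOBAL_ prefix.

```typescript
/** Upper-case a code segment and collapse any non-alphanumeric run to one underscore. */
function toCodeSegment(part: string): string {
  return part
    .toUpperCase()
    .replace(/[^A-Z0-9]+/g, '_')
    .replace(/^_+|_+$/g, '');
}

/**
 * Hypothetical helper for the {CONNECTOR}_{CATEGORY}_{SPECIFIC} convention.
 * Pass `null` as the connector for cross-connector errors (GLOBAL_ prefix).
 */
export function buildErrorCode(
  connector: string | null,
  category: string,
  specific: string
): string {
  const prefix = connector ? toCodeSegment(connector) : 'GLOBAL';
  // An empty specific part falls back to ERROR, matching generateErrorCode's default.
  const parts = [prefix, toCodeSegment(category), toCodeSegment(specific) || 'ERROR'];
  return parts.filter((p) => p.length > 0).join('_');
}
```

For example, buildErrorCode('comfyui', 'node', 'missing model') yields COMFYUI_NODE_MISSING_MODEL.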
Category Definitions:
| Category | Description | Example |
|---|---|---|
| validation | Input/parameter validation failures | Invalid prompt, bad dimensions |
| resource | Missing or unavailable resources | Model not found, file missing |
| timeout | Operation exceeded time limits | ComfyUI execution timeout |
| authentication | Auth/permission failures | Invalid API key |
| rate_limit | Rate limiting triggered | Too many requests |
| internal | Internal system errors | Worker crash, unexpected exception |
| unknown | Unclassified errors | Fallback category |
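Downstream layers are meant to pass NormalizedError through unchanged. A runtime guard at process boundaries (job-api, the webhook consumer) can therefore catch structural drift early, including a category outside this table. The following is a sketch. isNormalizedError is a hypothetical helper, and it checks the required fields plus the shape of the optional ones.

```typescript
const ERROR_CATEGORIES = [
  'validation',
  'resource',
  'timeout',
  'authentication',
  'rate_limit',
  'internal',
  'unknown',
] as const;

type ErrorCategory = (typeof ERROR_CATEGORIES)[number];

/** NormalizedError fields as seen after JSON transport. */
interface NormalizedErrorCore {
  code: string;
  message: string;
  human_readable_message: string;
  call_to_action?: string;
  fatal: boolean;
  connector: string;
  category: ErrorCategory;
  details?: Record<string, unknown>;
  timestamp: string;
  matched_pattern_id?: string;
}

function isErrorCategory(value: unknown): value is ErrorCategory {
  return typeof value === 'string' && (ERROR_CATEGORIES as readonly string[]).includes(value);
}

const isOptionalString = (v: unknown): boolean => v === undefined || typeof v === 'string';

/** Hypothetical guard for payloads crossing a process boundary (e.g. parsed JSON). */
export function isNormalizedError(value: unknown): value is NormalizedErrorCore {
  if (typeof value !== 'object' || value === null) return false;
  const v = value as Record<string, unknown>;
  return (
    typeof v.code === 'string' &&
    typeof v.message === 'string' &&
    typeof v.human_readable_message === 'string' &&
    typeof v.fatal === 'boolean' &&
    typeof v.connector === 'string' &&
    isErrorCategory(v.category) &&
    typeof v.timestamp === 'string' &&
    isOptionalString(v.call_to_action) &&
    isOptionalString(v.matched_pattern_id) &&
    (v.details === undefined || (typeof v.details === 'object' && v.details !== null))
  );
}
```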
Example NormalizedError (pattern-enhanced):
{
"code": "MISSING_REQUIRED_FIELD",
"message": "Missing required field 'url' in node emprops_image_loader",
"human_readable_message": "Your workflow is missing a required image URL. The 'emprops_image_loader' node needs an image to process.",
"call_to_action": "Add the 'url' parameter to your workflow configuration.",
"fatal": true,
"connector": "comfyui",
"category": "validation",
"details": {
"exception_type": "KeyError",
"node_type": "emprops_image_loader",
"node_id": "66"
},
"timestamp": "2025-12-03T15:30:45.123Z",
"matched_pattern_id": "KeyError.MissingField"
}
Architecture Overview
┌─────────────────────────────────────────────────────────────────────────┐
│ Worker Connectors │
│ ComfyUI │ Ollama │ OpenAI │ Gemini │ Glif │ Custom... │
│ │
│ Each connector catches errors (any format: Error, ConnectorError, etc) │
└────────────────────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────────────────┐
│ ErrorEnhancementService (@emp/core) │
│ ┌────────────────────────────────────────────────────────────────────┐ │
│ │ 1. NORMALIZE: Convert any error to NormalizedError format │ │
│ │ → Extract message, exception_type, context from source error │ │
│ │ → Infer category from error type/message if possible │ │
│ │ → Generate error code from connector + category + exception type │ │
│ │ → Set defaults: category='unknown', fatal inferred from category │ │
│ │ │ │
│ │ 2. ENHANCE: Match against Redis patterns (ALWAYS runs) │ │
│ │ → Load patterns from error_patterns:global + error_patterns:{connector} │ │
│ │ → Match by priority order (contains/regex/exact) │ │
│ │ → If match found: OVERRIDE human_readable_message, call_to_action, │ │
│ │ fatal, category (pattern is source of truth) │ │
│ │ │ │
│ │ 3. SAVE: Persist NormalizedError to Redis for pattern creation │ │
│ │ → Key: saved_errors:{id} │ │
│ │ → TTL: 7 days (configurable) │ │
│ │ → Index by connector and timestamp for efficient querying │ │
│ │ │ │
│ │ 4. RETURN: NormalizedError (consistent structure guaranteed) │ │
│ └────────────────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────────────────┐
│ Worker → job-api → Svix │
│ NormalizedError flows through pipeline with guaranteed structure │
│ workflow.failed webhook delivers consistent error format to EmProps │
└─────────────────────────────────────────────────────────────────────────┘
Key Design Decision: Pattern Match Always Overrides
If an error matches a pattern, the pattern's enhancement is ALWAYS applied, even if the error already had a human_readable_message. This is intentional:
- Source of Truth: Patterns are curated by operators who understand the user experience
- Override Bad Messages: Connector-level messages may be technically accurate but user-unfriendly
- Consistency: Same error always produces same user-facing message
- Iterative Improvement: Operators can fix bad messages by adding/updating patterns
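The override rule can be stated as a pure function, which makes the "pattern wins" semantics easy to unit-test. This is a sketch. It assumes patterns carry optional fatal and category fields, as processError in the Detailed Design expects. applyPatternOverride is a hypothetical helper and not part of the service API.

```typescript
type Category =
  | 'validation' | 'resource' | 'timeout' | 'authentication'
  | 'rate_limit' | 'internal' | 'unknown';

/** Fields of NormalizedError that a pattern may override. */
interface OverridableFields {
  human_readable_message: string;
  call_to_action?: string;
  fatal: boolean;
  category: Category;
  matched_pattern_id?: string;
}

/** Enhancement fields a pattern can carry (hypothetical shape). */
interface PatternEnhancement {
  id: string;
  human_readable_message: string;
  call_to_action?: string;
  fatal?: boolean;
  category?: Category;
}

/**
 * Returns a new object with the pattern applied; the input is not mutated.
 * The message and pattern id ALWAYS win. Optional fields win only when the
 * pattern sets them; `fatal` is checked against undefined so a pattern can
 * deliberately mark an error as non-fatal (false).
 */
export function applyPatternOverride(
  error: OverridableFields,
  pattern: PatternEnhancement
): OverridableFields {
  return {
    ...error,
    human_readable_message: pattern.human_readable_message,
    matched_pattern_id: pattern.id,
    call_to_action: pattern.call_to_action || error.call_to_action,
    fatal: pattern.fatal !== undefined ? pattern.fatal : error.fatal,
    category: pattern.category || error.category,
  };
}
```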
Pattern Creation Workflow (Monitor UI)
┌─────────────────────────────────────────────────────────────────────────┐
│ Monitor UI: /error-patterns │
│ │
│ ┌──────────────────────────────────────────────────────────────────┐ │
│ │ Tab 1: Error Patterns (existing) │ │
│ │ - View/edit/delete existing patterns │ │
│ │ - Create new pattern manually │ │
│ └──────────────────────────────────────────────────────────────────┘ │
│ │
│ ┌──────────────────────────────────────────────────────────────────┐ │
│ │ Tab 2: Saved Errors (NEW) │ │
│ │ - View all saved errors from last 7 days │ │
│ │ - Filter by: connector, category, enhanced/not enhanced │ │
│ │ - Click error → "Create Pattern from this Error" │ │
│ │ - Auto-populates pattern form with: │ │
│ │ - Pattern text extracted from error │ │
│ │ - Suggested match_type (contains/regex) │ │
│ │ - Connector scope │ │
│ │ - Example message (the saved error) │ │
│ └──────────────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────────┘
Detailed Design
1. ErrorEnhancementService Class
Location: packages/core/src/errors/error-enhancement-service.ts
import { Redis } from 'ioredis';
import { ErrorPattern, CompiledPattern, ERROR_PATTERN_KEYS } from '../types/error-patterns.js';
/**
* Input error - can be any error format from connectors
*/
export interface ErrorInput {
/** Error message */
message: string;
/** Exception type if available */
exception_type?: string;
/** Any existing context */
context?: Record<string, any>;
/** Stack trace if available */
stack?: string;
}
/**
* The canonical error format - ALWAYS returned by the service
*/
export interface NormalizedError {
/** Error classification code */
code: string;
/** Original raw error message */
message: string;
/** Human-friendly explanation */
human_readable_message: string;
/** Actionable guidance */
call_to_action?: string;
/** Whether error is fatal (unrecoverable) - true means don't retry */
fatal: boolean;
/** Source connector */
connector: string;
/** Error category */
category: 'validation' | 'resource' | 'timeout' | 'authentication' | 'rate_limit' | 'internal' | 'unknown';
/** Structured details */
details?: {
exception_type?: string;
node_type?: string;
model_name?: string;
stack?: string;
[key: string]: unknown;
};
/** Timestamp */
timestamp: string;
/** Pattern ID if enhanced */
matched_pattern_id?: string;
}
/**
* Saved error record for pattern creation workflow
*/
export interface SavedError extends NormalizedError {
/** Unique ID */
id: string;
/** Job ID for context */
job_id?: string;
/** Workflow ID for context */
workflow_id?: string;
/** Whether a pattern match was applied */
pattern_enhanced: boolean;
/** Created timestamp */
created_at: string;
}
export interface EnhancementResult {
/** Normalized error (ALWAYS in canonical format) */
error: NormalizedError;
/** Whether a pattern was matched and applied */
patternMatched: boolean;
/** Pattern that matched (if any) */
matchedPattern?: CompiledPattern;
/** Saved error ID for reference */
savedErrorId: string;
}
export class ErrorEnhancementService {
private redis: Redis;
private patternCache: Map<string, CompiledPattern[]> = new Map();
private patternVersion: string = '';
private readonly SAVED_ERROR_TTL = 7 * 24 * 60 * 60; // 7 days
constructor(redis: Redis) {
this.redis = redis;
}
/**
* Normalize, enhance, and save an error.
*
* ALWAYS returns a NormalizedError - guaranteed consistent structure.
* If a pattern matches, it OVERRIDES any existing human_readable_message.
*
* @param error - The error to process (any format)
* @param connector - Connector type (comfyui, ollama, etc.)
* @param context - Additional context (job_id, workflow_id)
*/
async processError(
error: ErrorInput,
connector: string,
context?: { job_id?: string; workflow_id?: string }
): Promise<EnhancementResult> {
// Step 1: NORMALIZE - Convert to canonical format with defaults
const normalized = this.normalize(error, connector);
// Step 2: ENHANCE - Always try to match patterns (pattern overrides existing values)
let patternMatched = false;
let matchedPattern: CompiledPattern | undefined;
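// If the pattern lookup fails (e.g. Redis unavailable), continue with the
// un-enhanced error rather than losing it.
const pattern = await this.matchPattern(error.message, connector).catch(() => undefined);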
if (pattern) {
matchedPattern = pattern;
patternMatched = true;
// Pattern OVERRIDES - this is intentional
normalized.human_readable_message = pattern.human_readable_message;
normalized.matched_pattern_id = pattern.id;
if (pattern.call_to_action) {
normalized.call_to_action = pattern.call_to_action;
}
if (pattern.fatal !== undefined) {
normalized.fatal = pattern.fatal;
}
if (pattern.category) {
normalized.category = pattern.category;
}
}
// Step 3: SAVE - Persist for pattern creation workflow
const savedErrorId = await this.saveError(normalized, context, patternMatched);
return {
error: normalized,
patternMatched,
matchedPattern,
savedErrorId,
};
}
/**
* Normalize any error input to canonical NormalizedError format.
* Even without pattern matching, returns a valid structure with defaults.
*/
private normalize(error: ErrorInput, connector: string): NormalizedError {
// Infer category from error characteristics
const category = this.inferCategory(error);
// Generate error code
const code = this.generateErrorCode(connector, category, error);
// Infer fatal based on category (can be overridden by pattern)
const fatal = this.inferFatal(category);
// Default human_readable_message: one the connector already supplied in context,
// else a truncated raw message. A matching pattern overrides either, so we ALWAYS have something.
const human_readable_message = error.context?.human_readable_message
|| `An error occurred: ${this.truncateMessage(error.message, 200)}`;
return {
code,
message: error.message,
human_readable_message,
call_to_action: error.context?.call_to_action,
fatal,
connector,
category,
details: {
exception_type: error.exception_type,
stack: error.stack,
...this.extractDetails(error),
},
timestamp: new Date().toISOString(),
};
}
/**
* Infer whether error is fatal based on category.
* Fatal errors should NOT be retried.
*/
private inferFatal(category: NormalizedError['category']): boolean {
switch (category) {
// Fatal - these won't succeed on retry
case 'validation': // Invalid input won't become valid
case 'resource': // Missing model/file won't appear
case 'authentication': // Bad credentials won't become good
return true;
// Non-fatal - these may succeed on retry
case 'timeout': // May complete if given more time
case 'rate_limit': // Will succeed after cooldown
return false;
// Conservative default - treat unknown/internal as fatal
case 'internal':
case 'unknown':
default:
return true;
}
}
/**
* Infer error category from error characteristics.
*/
private inferCategory(error: ErrorInput): NormalizedError['category'] {
const msg = error.message.toLowerCase();
const exc = error.exception_type?.toLowerCase() || '';
// Timeout indicators
if (msg.includes('timeout') || msg.includes('timed out') || exc.includes('timeout')) {
return 'timeout';
}
// Authentication indicators
if (msg.includes('unauthorized') || msg.includes('forbidden') ||
msg.includes('api key') || msg.includes('authentication') ||
exc.includes('auth')) {
return 'authentication';
}
// Rate limit indicators
if (msg.includes('rate limit') || msg.includes('too many requests') ||
msg.includes('429')) {
return 'rate_limit';
}
// Resource indicators
if (msg.includes('not found') || msg.includes('missing') ||
msg.includes('does not exist') || msg.includes('no such file') ||
exc.includes('filenotfound') || exc.includes('notfound')) {
return 'resource';
}
// Validation indicators
if (msg.includes('invalid') || msg.includes('validation') ||
msg.includes('required') || msg.includes('must be') ||
exc.includes('validation') || exc.includes('valueerror')) {
return 'validation';
}
return 'unknown';
}
/**
* Generate error code from connector, category, and error info.
*/
private generateErrorCode(
connector: string,
category: NormalizedError['category'],
error: ErrorInput
): string {
const connectorPrefix = connector.toUpperCase().replace(/-/g, '_');
const categoryPart = category.toUpperCase();
// Try to extract a more specific suffix from exception type
let suffix = 'ERROR';
if (error.exception_type) {
// A bare 'Error' or 'Exception' strips to '', so keep the 'ERROR' default
suffix = error.exception_type
.replace(/Error$/, '')
.replace(/Exception$/, '')
.toUpperCase()
.replace(/[^A-Z0-9]/g, '_') || 'ERROR';
}
return `${connectorPrefix}_${categoryPart}_${suffix}`;
}
/**
* Extract structured details from error context.
*/
private extractDetails(error: ErrorInput): Record<string, unknown> {
const details: Record<string, unknown> = {};
if (error.context) {
// Extract known fields
if (error.context.node_type) details.node_type = error.context.node_type;
if (error.context.model_name) details.model_name = error.context.model_name;
if (error.context.node_id) details.node_id = error.context.node_id;
// Include any other context that might be useful
for (const [key, value] of Object.entries(error.context)) {
if (!['human_readable_message', 'call_to_action'].includes(key)) {
details[key] = value;
}
}
}
return details;
}
/**
* Truncate message for default human_readable.
*/
private truncateMessage(message: string, maxLength: number): string {
if (message.length <= maxLength) return message;
return message.slice(0, maxLength - 3) + '...';
}
/**
* Match error message against patterns for a connector.
*/
private async matchPattern(
message: string,
connector: string
): Promise<CompiledPattern | undefined> {
await this.refreshPatternsIfNeeded();
// Connector-specific patterns are concatenated first; the sort below is stable
// (ES2019+), so they win ties against global patterns of equal priority.
const connectorPatterns = this.patternCache.get(connector) || [];
const globalPatterns = this.patternCache.get('global') || [];
// Sort by priority (highest first)
const allPatterns = [...connectorPatterns, ...globalPatterns]
.sort((a, b) => b.priority - a.priority);
for (const pattern of allPatterns) {
if (this.matchesPattern(message, pattern)) {
return pattern;
}
}
return undefined;
}
/**
* Check if message matches a pattern.
*/
private matchesPattern(message: string, pattern: CompiledPattern): boolean {
const normalizedMessage = pattern.caseSensitive
? message
: message.toLowerCase();
const normalizedPattern = pattern.caseSensitive
? pattern.pattern
: pattern.pattern.toLowerCase();
switch (pattern.matchType) {
case 'exact':
return normalizedMessage === normalizedPattern;
case 'contains':
return normalizedMessage.includes(normalizedPattern);
case 'regex':
return pattern.regex?.test(message) ?? false;
default:
return false;
}
}
/**
* Refresh pattern cache if version changed.
*/
private async refreshPatternsIfNeeded(): Promise<void> {
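const version = (await this.redis.get(ERROR_PATTERN_KEYS.version)) ?? '';
// Reload only on first use or when the version changes; a missing version
// key (null) compared with '' would otherwise force a reload on every call.
if (this.patternCache.size > 0 && version === this.patternVersion) return;
this.patternVersion = version;
this.patternCache.clear();
// Load global patterns
await this.loadPatternsForScope('global');
// Discover connector-specific pattern hashes with SCAN; KEYS would block
// Redis while it walks the whole keyspace.
const stream = this.redis.scanStream({ match: 'error_patterns:*', count: 100 });
for await (const keys of stream) {
for (const key of keys as string[]) {
if (key.includes('version') || key.includes('analytics')) continue;
const scope = key.replace('error_patterns:', '');
if (scope !== 'global') {
await this.loadPatternsForScope(scope);
}
}
}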
}
/**
* Load and compile patterns for a scope.
*/
private async loadPatternsForScope(scope: string): Promise<void> {
const key = scope === 'global'
? ERROR_PATTERN_KEYS.global
: ERROR_PATTERN_KEYS.connector(scope);
const data = await this.redis.hgetall(key);
const patterns: CompiledPattern[] = [];
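// The hash field is the pattern_id (see Redis Key Structure)
for (const [id, json] of Object.entries(data)) {
try {
const pattern = JSON.parse(json) as ErrorPattern;
if (!pattern.active) continue;
patterns.push({
// processError reads id, fatal and category when applying an override,
// so they must be carried into the compiled form (assumes ErrorPattern and
// CompiledPattern gain the optional fatal/category fields this ADR introduces)
id,
pattern: pattern.pattern,
regex: pattern.match_type === 'regex'
? new RegExp(pattern.pattern, pattern.case_sensitive ? '' : 'i')
: null,
matchType: pattern.match_type,
caseSensitive: pattern.case_sensitive,
priority: pattern.priority,
human_readable_message: pattern.human_readable_message,
call_to_action: pattern.call_to_action,
fatal: pattern.fatal,
category: pattern.category,
retry: pattern.retry,
is_log_filter_only: pattern.is_log_filter_only,
});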
} catch {
// Skip invalid patterns
}
}
this.patternCache.set(scope, patterns);
}
/**
* Save NormalizedError to Redis for pattern creation workflow.
*/
private async saveError(
error: NormalizedError,
context?: { job_id?: string; workflow_id?: string },
patternEnhanced: boolean = false
): Promise<string> {
const now = Date.now();
const id = `${error.connector}_${now}_${Math.random().toString(36).slice(2, 8)}`;
const savedError: SavedError = {
...error,
id,
job_id: context?.job_id,
workflow_id: context?.workflow_id,
pattern_enhanced: patternEnhanced,
created_at: new Date(now).toISOString(),
};
const connectorIndex = `saved_errors:index:${error.connector}`;
// Index entries older than the record TTL point at expired records
const expiredBefore = now - this.SAVED_ERROR_TTL * 1000;
try {
// One MULTI/EXEC round trip: record, both indexes, then trimming
// (drop expired entries, keep at most the latest 10000 per index)
await this.redis
.multi()
.set(`saved_errors:${id}`, JSON.stringify(savedError), 'EX', this.SAVED_ERROR_TTL)
.zadd(connectorIndex, now, id)
.zadd('saved_errors:index:all', now, id)
.zremrangebyscore(connectorIndex, '-inf', expiredBefore)
.zremrangebyscore('saved_errors:index:all', '-inf', expiredBefore)
.zremrangebyrank(connectorIndex, 0, -10001)
.zremrangebyrank('saved_errors:index:all', 0, -10001)
.exec();
} catch (err) {
// Best-effort: failing to save must never replace the job's real error
console.warn(`[ErrorEnhancementService] failed to save error ${id}`, err);
}
return id;
}
/**
* List saved errors for UI.
*/
async listSavedErrors(options?: {
connector?: string;
category?: NormalizedError['category'];
limit?: number;
offset?: number;
pattern_enhanced_only?: boolean;
needs_pattern_only?: boolean;
}): Promise<{ errors: SavedError[]; total: number }> {
const limit = options?.limit ?? 50;
const offset = options?.offset ?? 0;
const indexKey = options?.connector
? `saved_errors:index:${options.connector}`
: 'saved_errors:index:all';
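// total is the unfiltered index size; the filters below apply to this page only
const total = await this.redis.zcard(indexKey);
const ids = await this.redis.zrevrange(indexKey, offset, offset + limit - 1);
if (ids.length === 0) return { errors: [], total };
// Fetch the whole page in one round trip (MGET with zero keys is an error)
const rows = await this.redis.mget(...ids.map((id) => `saved_errors:${id}`));
const errors: SavedError[] = [];
for (const data of rows) {
// Record expired but its index entry has not been trimmed yet
if (!data) continue;
const error = JSON.parse(data) as SavedError;
// Apply filters
if (options?.pattern_enhanced_only && !error.pattern_enhanced) continue;
if (options?.needs_pattern_only && error.pattern_enhanced) continue;
if (options?.category && error.category !== options.category) continue;
errors.push(error);
}
return { errors, total };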
}
/**
* Get a specific saved error by ID.
*/
async getSavedError(id: string): Promise<SavedError | null> {
const data = await this.redis.get(`saved_errors:${id}`);
return data ? JSON.parse(data) : null;
}
}
2. Integration with Worker Base
Modify: apps/worker/src/connectors/base-connector.ts
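import type { Redis } from 'ioredis';
import { ErrorEnhancementService, NormalizedError, ErrorInput } from '@emp/core';
// JobData: the worker's existing job type (import unchanged)
export abstract class BaseConnector {
/** Connector id used for pattern scope and error codes (e.g. 'comfyui'); each connector sets it. */
protected abstract readonly connectorType: string;
protected errorEnhancementService: ErrorEnhancementService;
constructor(redis: Redis) {
this.errorEnhancementService = new ErrorEnhancementService(redis);
}
/**
* Process any error through the enhancement service.
* ALWAYS returns a NormalizedError with guaranteed structure.
*/
protected async processError(
error: unknown, // Error, ConnectorError, or any other thrown value
jobData: JobData
): Promise<NormalizedError> {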
// Convert any error type to ErrorInput
const errorInput: ErrorInput = this.toErrorInput(error);
const result = await this.errorEnhancementService.processError(
errorInput,
this.connectorType,
{
job_id: jobData.id,
workflow_id: jobData.workflow_id,
}
);
return result.error;
}
/**
* Convert any error type to ErrorInput format.
*/
private toErrorInput(error: unknown): ErrorInput {
if (error instanceof Error) {
return {
message: error.message,
stack: error.stack,
exception_type: error.constructor.name,
context: (error as any).context,
};
}
if (typeof error === 'object' && error !== null) {
const obj = error as Record<string, any>;
return {
message: obj.message || String(error),
exception_type: obj.exception_type || obj.type,
context: obj.context || obj,
stack: obj.stack,
};
}
return {
message: String(error),
};
}
}
Example Connector Usage:
class ComfyUIConnector extends BaseConnector {
async executeJob(jobData: JobData): Promise<JobResult> {
try {
// ... execute job
return result;
} catch (error) {
// processError ALWAYS returns NormalizedError
const normalizedError = await this.processError(error, jobData);
// Throw with normalized structure - downstream can rely on this format
throw new JobError(normalizedError);
}
}
}
3. Monitor API Endpoints
New file: apps/monitor/src/app/api/saved-errors/route.ts
import { NextRequest, NextResponse } from 'next/server';
import { ErrorEnhancementService, NormalizedError } from '@emp/core';
// getMonitorRedisConnection: the monitor's existing Redis helper (import unchanged)
// GET /api/saved-errors
// List saved errors with filtering
export async function GET(request: NextRequest) {
const { redis } = await getMonitorRedisConnection();
const enhancementService = new ErrorEnhancementService(redis);
const params = request.nextUrl.searchParams;
// Guard against NaN/negative paging values; 200 is an assumed page-size cap
const limit = Math.min(Math.max(parseInt(params.get('limit') || '50', 10) || 50, 1), 200);
const offset = Math.max(parseInt(params.get('offset') || '0', 10) || 0, 0);
const result = await enhancementService.listSavedErrors({
connector: params.get('connector') || undefined,
category: (params.get('category') || undefined) as NormalizedError['category'] | undefined,
limit,
offset,
pattern_enhanced_only: params.get('pattern_enhanced_only') === 'true',
needs_pattern_only: params.get('needs_pattern_only') === 'true',
});
return NextResponse.json({ success: true, ...result });
}
New file: apps/monitor/src/app/api/saved-errors/[id]/route.ts
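import { NextRequest, NextResponse } from 'next/server';
import { ErrorEnhancementService } from '@emp/core';
// getMonitorRedisConnection: the monitor's existing Redis helper (import unchanged)
// GET /api/saved-errors/:id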
// Get single saved error for pattern creation
export async function GET(
request: NextRequest,
{ params }: { params: { id: string } }
) {
const { redis } = await getMonitorRedisConnection();
const enhancementService = new ErrorEnhancementService(redis);
const error = await enhancementService.getSavedError(params.id);
if (!error) {
return NextResponse.json(
{ success: false, error: 'Not found' },
{ status: 404 }
);
}
return NextResponse.json({ success: true, error });
}
4. Redis Key Structure
# Existing pattern keys (unchanged)
error_patterns:global → Hash of pattern_id → ErrorPattern JSON
error_patterns:{connector} → Hash of pattern_id → ErrorPattern JSON
error_patterns:version → Version timestamp for cache invalidation
error_patterns:updated → Pub/Sub channel for pattern updates
# New saved error keys
saved_errors:{id} → SavedError JSON (TTL: 7 days)
saved_errors:index:all → Sorted Set (score=timestamp, member=id)
saved_errors:index:{connector} → Sorted Set (score=timestamp, member=id)
Implementation Phases
Phase 1: Core Service
- Create NormalizedError interface in @emp/core/types
- Create ErrorEnhancementService class in @emp/core
- Implement normalize() method with category inference and code generation
- Add pattern matching logic (reuse existing pattern compilation)
- Add error saving logic with Redis indexes
- Add listSavedErrors and getSavedError methods
- Export from @emp/core
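Phase 1 keeps matchPattern's selection rule, which is easy to break during refactoring. The highest priority wins. On a priority tie, a connector-specific pattern beats a global one only because it is concatenated first and Array.prototype.sort is stable (guaranteed since ES2019). Here is a sketch of that rule in isolation. RankedPattern and orderPatterns are hypothetical names.

```typescript
interface RankedPattern {
  id: string;
  priority: number;
}

/**
 * Connector patterns first, then global, then a stable descending sort by
 * priority: ties keep concatenation order, so connector patterns win them.
 */
export function orderPatterns(
  connectorPatterns: readonly RankedPattern[],
  globalPatterns: readonly RankedPattern[]
): RankedPattern[] {
  return [...connectorPatterns, ...globalPatterns].sort((a, b) => b.priority - a.priority);
}
```

The ordered list depends only on the cached patterns, so it could be computed once per cache refresh instead of on every error.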
Phase 2: Worker Integration
- Add errorEnhancementService to BaseConnector
- Create processError() method that ALWAYS returns NormalizedError
- Create toErrorInput() helper to convert any error type
- Update ComfyUI connector to use service (merge with existing enhancer logic)
- Update other connectors to call processError() in catch blocks
- Update job failure reporting to use NormalizedError structure
Phase 3: Monitor API
- Create /api/saved-errors endpoint with category filtering
- Create /api/saved-errors/[id] endpoint
- Add authentication checks
Phase 4: Monitor UI
- Add "Saved Errors" tab to /error-patterns page
- Create saved error list with filtering by connector, category, pattern status
- Add "Create Pattern from Error" button
- Pre-populate pattern form from saved error (including suggested category)
- Add link from new pattern back to saved error
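For the Phase 4 pre-fill, one simple heuristic keeps the saved message literal but generalizes quoted values and bare numbers. The suggested pattern then also catches the next occurrence with a different field name or node id. This is a hypothetical sketch. suggestPatternFromError and the PatternDraft field names are assumptions, not the existing pattern form schema, and an operator would still review the draft before saving it.

```typescript
/** Minimal SavedError fields needed to pre-fill the pattern form. */
interface SavedErrorLike {
  id: string;
  message: string;
  connector: string;
  category: string;
}

/** Hypothetical draft shape; the real pattern form's field names may differ. */
interface PatternDraft {
  pattern: string;
  match_type: 'contains' | 'regex';
  connector_scope: string;
  suggested_category: string;
  example_message: string;
  source_saved_error_id: string;
}

// Fragments likely to vary between occurrences: quoted values and bare numbers.
const VARIABLE_FRAGMENT = /'[^']*'|"[^"]*"|\b\d+\b/g;

function escapeRegex(text: string): string {
  return text.replace(/[.*+?^${}()|[\]\\]/g, '\\$&');
}

/** Replace one variable fragment with a regex that accepts any value of the same kind. */
function generalize(fragment: string): string {
  if (fragment.startsWith("'")) return "'[^']*'";
  if (fragment.startsWith('"')) return '"[^"]*"';
  return '\\d+';
}

export function suggestPatternFromError(saved: SavedErrorLike): PatternDraft {
  let regex = '';
  let cursor = 0;
  let generalized = 0;
  for (const match of saved.message.matchAll(VARIABLE_FRAGMENT)) {
    const start = match.index ?? 0;
    regex += escapeRegex(saved.message.slice(cursor, start)) + generalize(match[0]);
    cursor = start + match[0].length;
    generalized++;
  }
  regex += escapeRegex(saved.message.slice(cursor));

  return {
    // Nothing variable found: a plain 'contains' match on the message is enough.
    pattern: generalized > 0 ? regex : saved.message,
    match_type: generalized > 0 ? 'regex' : 'contains',
    connector_scope: saved.connector,
    suggested_category: saved.category,
    example_message: saved.message,
    source_saved_error_id: saved.id,
  };
}
```

Take the example NormalizedError from earlier in this ADR. For that message, the draft pattern becomes `Missing required field '[^']*' in node emprops_image_loader` with match_type regex.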
Consequences
Benefits
- Guaranteed Structure: Every error has the same NormalizedError format - downstream code can rely on consistent fields
- Centralized Enhancement: All connectors benefit from patterns, not just ComfyUI
- Pattern Override: Operators can fix bad error messages by adding patterns - no code changes needed
- Error Visibility: Every error is saved for analysis with full context
- Pattern Creation Workflow: Easy to create patterns from real production errors
- Category Intelligence: Automatic category inference helps with error triage and analytics
- No Duplication: Single service replaces connector-specific enhancers
- Feedback Loop: Can track which errors need patterns vs. which are already good
Drawbacks
- Redis Storage: Saved errors use Redis memory (mitigated by TTL and index limits)
- Migration: Need to update all connectors to use service
- Async Overhead: Enhancement adds Redis calls to error path
Mitigations
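- Storage: A 7-day TTL bounds the saved_errors:{id} records, and each sorted-set index is capped at 10,000 entries (records trimmed from an index stay in Redis until their TTL expires)
- Migration: Can be done incrementally (connectors without the service still work)
- Performance: The pattern cache reloads patterns only when error_patterns:version changes (one GET per error); saving should be best-effort so a Redis failure never blocks or masks the original error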
Success Metrics
- 100% error capture: All errors from all connectors saved to Redis
- Pattern coverage: Track % of errors that match patterns over time
- Pattern creation rate: Track patterns created from saved errors vs. manual
- User experience: Reduced "unknown error" messages in production
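Each SavedError carries pattern_enhanced, so the pattern-coverage metric can be computed directly from the saved records. The following is a sketch, and patternCoverage is a hypothetical helper. TTL and index trimming drop old records, so the result measures coverage over the retention window, not all time.

```typescript
interface CoverageInput {
  connector: string;
  pattern_enhanced: boolean;
}

export interface CoverageStats {
  total: number;
  enhanced: number;
  ratio: number; // enhanced / total, or 0 when there are no errors
}

function tally(list: readonly CoverageInput[]): CoverageStats {
  const enhanced = list.filter((e) => e.pattern_enhanced).length;
  return { total: list.length, enhanced, ratio: list.length === 0 ? 0 : enhanced / list.length };
}

/** Share of saved errors that matched a pattern, overall and per connector. */
export function patternCoverage(errors: readonly CoverageInput[]): {
  overall: CoverageStats;
  byConnector: Map<string, CoverageStats>;
} {
  const groups = new Map<string, CoverageInput[]>();
  for (const e of errors) {
    const group = groups.get(e.connector) ?? [];
    group.push(e);
    groups.set(e.connector, group);
  }
  const byConnector = new Map<string, CoverageStats>();
  for (const [connector, list] of groups) byConnector.set(connector, tally(list));
  return { overall: tally(errors), byConnector };
}
```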
Alternatives Considered
Alternative 1: Per-Connector Enhancers
Keep current approach with ComfyUIErrorEnhancer, create OllamaErrorEnhancer, etc.
Rejected because:
- Duplicated pattern matching logic
- No centralized error saving
- Harder to manage patterns across connectors
Alternative 2: Database Storage for Errors
Store saved errors in PostgreSQL instead of Redis.
Rejected because:
- Adds database dependency to workers
- Redis is already available everywhere
- 7-day TTL is sufficient for pattern creation workflow
Alternative 3: Pub/Sub for Error Events
Publish errors to a channel, separate service saves them.
Rejected because:
- Adds architectural complexity
- Fire-and-forget in same service is simpler
- No need for real-time error streaming
Related ADRs
- 2025-11-07-error-handling-modernization.md - Connector-agnostic error handling design
- 2025-01-09-redis-driven-error-classification.md - Original Redis pattern classification
- 2025-11-08-database-driven-error-classification.md - Pattern storage evolution
End of ADR
