
ADR: Standardized Connector Error Handling

Status: Accepted

Date: 2025-10-16

Deciders: Engineering Team

Context: Production timeout issues, inconsistent error messages, and lack of structured error classification across 19+ worker connectors.


Problem Statement

Worker connectors currently handle errors inconsistently, leading to:

  1. Client Timeouts: Errors logged but not propagated, causing 3-minute client timeouts
  2. Poor Debugging: Error messages vary by connector, making production debugging difficult
  3. No Classification: Errors lack structure (type, reason, retryability)
  4. Lost Context: HTTP status codes, request IDs, and forensic data not captured
  5. Component Error Wrapping: flexible_prompt and other components wrap errors, obscuring root cause
  6. Code Duplication: Every connector reimplements HTTP/WebSocket error handling

Impact:

  • MTTR (Mean Time To Resolution): 4+ hours for production errors
  • Client experience: Unclear error messages, unnecessary timeouts
  • Developer experience: Difficult to add new connectors consistently

Decision

Implement a three-layer error handling inheritance model where ALL connectors benefit from standardized error handling:

Layer 1: BaseConnector
  ↓ Catches ALL errors
  ↓ Auto-classifies with FailureClassifier
  ↓ Unwraps component errors
  ↓ Creates structured JobResult

Layer 2: Protocol Connectors (HTTPConnector, WebSocketConnector)
  ↓ Protocol-specific error handling
  ↓ Maps status codes/connection errors
  ↓ Handles retries and rate limits

Layer 3: Service Connectors (OpenAITextConnector, ComfyUIConnector, etc.)
  ↓ ONLY business logic
  ↓ Throws ConnectorError for failures
  ↓ Inherits all error handling
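The sketch below makes the division of responsibilities concrete. It has no dependencies and is not the real implementation: ConnectorError, BaseConnector, and JobResult are pared-down stand-ins, FakeHTTPConnector simulates a response status instead of making a network call, and EchoConnector is a hypothetical service connector.

```typescript
// Dependency-free sketch of the three-layer model (simplified stand-ins).
type FailureType = 'RATE_LIMIT' | 'SERVICE_ERROR' | 'UNKNOWN';

class ConnectorError extends Error {
  readonly failureType: FailureType;
  readonly retryable: boolean;

  constructor(failureType: FailureType, message: string, retryable: boolean) {
    super(message);
    this.name = 'ConnectorError';
    this.failureType = failureType;
    this.retryable = retryable;
  }
}

interface JobResult {
  success: boolean;
  data?: unknown;
  error?: string;
  failure_type?: FailureType;
  retryable?: boolean;
}

// Layer 1: the only place that catches; every failure becomes a structured result.
abstract class BaseConnector {
  async processJob(payload: string): Promise<JobResult> {
    try {
      // `return await` (not a bare `return`) so rejections are caught here
      return await this.processJobInternal(payload);
    } catch (error) {
      const connectorError =
        error instanceof ConnectorError
          ? error
          : new ConnectorError('UNKNOWN', error instanceof Error ? error.message : String(error), true);
      return {
        success: false,
        error: connectorError.message,
        failure_type: connectorError.failureType,
        retryable: connectorError.retryable,
      };
    }
  }

  protected abstract processJobInternal(payload: string): Promise<JobResult>;
}

// Layer 2: maps protocol failures (here a simulated HTTP status) to ConnectorError.
abstract class FakeHTTPConnector extends BaseConnector {
  protected async send(status: number, body: string): Promise<string> {
    if (status === 429) throw new ConnectorError('RATE_LIMIT', 'Rate limit exceeded', true);
    if (status >= 500) throw new ConnectorError('SERVICE_ERROR', `Service error: HTTP ${status}`, true);
    return body;
  }
}

// Layer 3: business logic only, no try/catch.
class EchoConnector extends FakeHTTPConnector {
  private readonly simulatedStatus: number;

  constructor(simulatedStatus: number) {
    super();
    this.simulatedStatus = simulatedStatus;
  }

  protected async processJobInternal(payload: string): Promise<JobResult> {
    const body = await this.send(this.simulatedStatus, payload.toUpperCase());
    return { success: true, data: body };
  }
}

// Usage: a 429 resolves to a structured failure instead of a rejected promise.
new EchoConnector(429).processJob('hello').then((result) => console.log(result));
```

Only BaseConnector contains a try/catch. The service layer stays free of error handling, which is the property the inheritance model is meant to guarantee.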

Core Components

  1. ConnectorError Class (packages/core/src/types/connector-errors.ts)

    • Structured error type with classification
    • Auto-unwraps component errors
    • Preserves forensic context
    • Determines retryability
  2. FailureClassifier (Enhanced)

    • Two-tier taxonomy: FailureType + FailureReason
    • 50+ common error patterns
    • Component error unwrapping
    • Auto-classification from error messages
  3. BaseConnector (Updated)

    • Mandatory processJob() error handling
    • Automatic error classification
    • Structured JobResult creation
    • Status reporting
  4. Protocol Layers (New)

    • HTTPConnector: HTTP status codes, rate limits, auth
    • WebSocketConnector: Connection, reconnection, timeouts
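Message-based auto-classification, as FailureClassifier performs it, can be sketched as an ordered pattern table. The type and reason names reuse values that appear elsewhere in this ADR. The regular expressions, the UNKNOWN fallback, and its retryable default are assumptions for illustration; they are not the real 50+ patterns.

```typescript
// Hypothetical message-based classifier in the spirit of FailureClassifier.
interface Classification {
  failureType: string;
  failureReason: string;
  retryable: boolean;
}

interface Pattern extends Classification {
  test: RegExp;
}

// Ordered: the first match wins, so specific patterns go before generic ones.
// No /g flag, so RegExp.test() keeps no lastIndex state between calls.
const PATTERNS: Pattern[] = [
  { test: /rate limit|too many requests/i, failureType: 'RATE_LIMIT', failureReason: 'REQUESTS_PER_MINUTE', retryable: true },
  { test: /safety filter|content policy/i, failureType: 'GENERATION_REFUSAL', failureReason: 'SAFETY_FILTER', retryable: false },
  { test: /invalid api key|unauthorized/i, failureType: 'AUTH_ERROR', failureReason: 'INVALID_API_KEY', retryable: false },
  { test: /ECONNREFUSED|ETIMEDOUT|socket hang up/i, failureType: 'NETWORK_ERROR', failureReason: 'CONNECTION_FAILED', retryable: true },
  { test: /service unavailable|bad gateway/i, failureType: 'SERVICE_ERROR', failureReason: 'SERVICE_UNAVAILABLE', retryable: true },
];

// Assumed fallback; retryable mirrors the ConnectorError constructor default.
const FALLBACK: Classification = { failureType: 'UNKNOWN', failureReason: 'UNKNOWN', retryable: true };

function classify(message: string): Classification {
  const match = PATTERNS.find((pattern) => pattern.test.test(message));
  if (match === undefined) return { ...FALLBACK };
  const { failureType, failureReason, retryable } = match;
  return { failureType, failureReason, retryable };
}
```

For example, classify('Safety filter blocked request') yields GENERATION_REFUSAL / SAFETY_FILTER with retryable set to false. Keeping the table ordered and data-only makes each new pattern a one-line change that can be unit tested in isolation.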

Consequences

Positive

  • ✅ Immediate Error Propagation: failures are returned to the client as soon as they occur instead of surfacing as 3-minute timeouts
  • ✅ Consistent Error Messages: Same structure across all 19+ connectors
  • ✅ Better Debugging: Structured classification + forensic data
  • ✅ Code Reduction: 70-80% less code per connector
  • ✅ Easy to Add Connectors: Inherit from a protocol layer and implement only the business logic
  • ✅ Component Error Handling: Automatic unwrapping of flexible_prompt and other component errors
  • ✅ Centralized Bug Fixes: Fix once in the base class and all connectors benefit

Negative

  • ⚠️ Migration Effort: Existing connectors need updates (2-3 days per protocol layer)
  • ⚠️ Breaking Changes: Existing error formats may change (mitigated by backwards compatibility)
  • ⚠️ Learning Curve: Developers need to understand the ConnectorError structure

Neutral

  • 🔵 Testing Requirements: Comprehensive tests for error handling
  • 🔵 Documentation: Need clear examples of error handling
  • 🔵 Monitoring: New error metrics and dashboards


Detailed Design

1. ConnectorError Class

Location: packages/core/src/types/connector-errors.ts

Interface:

```typescript
export class ConnectorError extends Error {
  constructor(
    public readonly failureType: FailureType,
    public readonly failureReason: FailureReason,
    message: string,
    public readonly retryable: boolean = true,
    public readonly context?: {
      httpStatus?: number;
      serviceJobId?: string;
      rawRequest?: unknown;
      rawResponse?: unknown;
      requestedMimeType?: string;
      actualMimeType?: string;
      componentName?: string;       // e.g. flexible_prompt1
      componentError?: string;      // Original wrapped error
      retryAfterSeconds?: number;   // For rate limits
    }
  ) {
    super(message);
    this.name = 'ConnectorError';
  }

  // Signatures only: method bodies are omitted from this ADR.

  // Auto-classify and unwrap component errors; accepts anything that was thrown
  static fromError(error: unknown, serviceType?: string): ConnectorError;

  // Get user-friendly description
  getDescription(): string;

  // Check if error is component-wrapped
  get isComponentWrapped(): boolean;
}
```
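The sketch below fills in two of the members declared above so the intended behavior is concrete. It is hypothetical. failureReason is omitted for brevity, the description table is invented for illustration, and fields are assigned explicitly instead of through constructor parameter properties. The Object.setPrototypeOf call matters because BaseConnector relies on an instanceof ConnectorError check. When TypeScript compiles subclasses of Error down to ES5, that check returns false unless the prototype is restored.

```typescript
// Hypothetical minimal implementation of getDescription() and isComponentWrapped.
type FailureType = 'RATE_LIMIT' | 'GENERATION_REFUSAL' | 'NETWORK_ERROR';

interface ConnectorErrorContext {
  httpStatus?: number;
  componentName?: string;
  componentError?: string;
  retryAfterSeconds?: number;
}

// Illustrative user-facing descriptions, one per failure type.
const DESCRIPTIONS: Record<FailureType, string> = {
  RATE_LIMIT: 'The service is rate limiting requests; the job can be retried later.',
  GENERATION_REFUSAL: 'The service refused to generate this content.',
  NETWORK_ERROR: 'The worker could not reach the service.',
};

class ConnectorError extends Error {
  readonly failureType: FailureType;
  readonly retryable: boolean;
  readonly context?: ConnectorErrorContext;

  constructor(failureType: FailureType, message: string, retryable = true, context?: ConnectorErrorContext) {
    super(message);
    this.name = 'ConnectorError';
    this.failureType = failureType;
    this.retryable = retryable;
    this.context = context;
    // Only needed for ES5 targets, where the downleveled super() call returns a
    // plain Error and `instanceof ConnectorError` would otherwise be false.
    Object.setPrototypeOf(this, new.target.prototype);
  }

  getDescription(): string {
    const base = DESCRIPTIONS[this.failureType];
    const component = this.context?.componentName;
    return component ? `${base} (component: ${component})` : base;
  }

  get isComponentWrapped(): boolean {
    return this.context?.componentName !== undefined;
  }
}
```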

Key Methods:

```typescript
// Example: Auto-classification
const rateLimitError = ConnectorError.fromError('Rate limit exceeded', 'openai');
// → failureType: RATE_LIMIT
// → failureReason: REQUESTS_PER_MINUTE
// → retryable: true

// Example: Component unwrapping
const wrapped = "Error in component 'flexible_prompt1': Safety filter blocked request";
const refusalError = ConnectorError.fromError(wrapped, 'openai');
// → message: "Safety filter blocked request" (unwrapped!)
// → failureType: GENERATION_REFUSAL
// → failureReason: SAFETY_FILTER
// → context.componentName: "flexible_prompt1"
// → retryable: false
```
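The unwrapping step can be sketched as a regular expression over the wrapper format in the example above. The sketch assumes every component wraps errors as Error in component '<name>': <message>; other wrapper formats would need their own patterns. Keeping the innermost component name when wrappers are nested is also an assumption, chosen because that is where the failure originated.

```typescript
// Hypothetical sketch of the unwrapping step inside ConnectorError.fromError().
// Assumed wrapper format: Error in component '<name>': <original message>
const COMPONENT_WRAPPER = /^Error in component '([^']+)':\s*([\s\S]*)$/;

interface Unwrapped {
  message: string;         // innermost message, used for classification
  componentName?: string;  // e.g. "flexible_prompt1"
  componentError?: string; // full original string, kept for forensics
}

function unwrapComponentError(raw: string): Unwrapped {
  let message = raw;
  let componentName: string | undefined;
  // Peel nested wrappers until the message no longer matches. Each pass strips
  // a non-empty prefix, so the loop always terminates.
  let match = COMPONENT_WRAPPER.exec(message);
  while (match !== null) {
    componentName = match[1]; // innermost name wins (assumption)
    message = match[2];
    match = COMPONENT_WRAPPER.exec(message);
  }
  return componentName === undefined ? { message } : { message, componentName, componentError: raw };
}

// Usage with the example from this ADR
const unwrapped = unwrapComponentError("Error in component 'flexible_prompt1': Safety filter blocked request");
console.log(unwrapped.message); // Safety filter blocked request
console.log(unwrapped.componentName); // flexible_prompt1
```

The unwrapped message is what gets classified, so "Safety filter blocked request" can match a SAFETY_FILTER pattern that the wrapped string would have obscured.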

2. BaseConnector Error Handling

Location: apps/worker/src/connectors/base-connector.ts

Mandatory Implementation:

```typescript
export abstract class BaseConnector {
  // ⚠️ Treat as FINAL: TypeScript has no `final` modifier, so subclasses are kept
  // from overriding this by convention and code review, not by the compiler
  async processJob(jobData: JobData, progressCallback: ProgressCallback): Promise<JobResult> {
    const startTime = Date.now();

    try {
      await this.updateConnectorStatus('active');

      // Call subclass implementation
      const result = await this.processJobInternal(jobData, progressCallback);

      await this.updateConnectorStatus('idle');
      return result;

    } catch (error) {
      // Caught values are `unknown` under strict mode and need not be Error instances
      const message = error instanceof Error ? error.message : String(error);

      // A failing status update must not stop the structured failure from being returned
      await this.updateConnectorStatus('error', message).catch(() => undefined);

      // ✅ AUTOMATIC ERROR HANDLING - NO CONNECTOR NEEDS TO IMPLEMENT THIS
      return this.handleConnectorError(error, jobData, startTime);
    }
  }

  // ⚠️ PRIVATE - Connectors cannot override
  private handleConnectorError(
    error: any,
    jobData: JobData,
    startTime: number
  ): JobResult {
    // Convert to ConnectorError (auto-classifies)
    const connectorError = error instanceof ConnectorError
      ? error
      : ConnectorError.fromError(error, this.service_type);

    // Log with structured data
    this.connectorLogger.logError(connectorError, jobData);

    // Return structured failure
    return {
      success: false,
      error: connectorError.message,
      processing_time_ms: Date.now() - startTime,
      failure_type: connectorError.failureType,
      failure_reason: connectorError.failureReason,
      failure_description: connectorError.getDescription(),
      raw_service_output: connectorError.context?.rawResponse,
      raw_service_request: connectorError.context?.rawRequest,
      requested_mime_type: connectorError.context?.requestedMimeType,
      actual_mime_type: connectorError.context?.actualMimeType,
      component_name: connectorError.context?.componentName,
      retryable: connectorError.retryable
    };
  }

  // ✅ MUST BE IMPLEMENTED by all connectors
  protected abstract processJobInternal(
    jobData: JobData,
    progressCallback: ProgressCallback
  ): Promise<JobResult>;
}
```

Contract for Subclasses:

  1. MUST implement processJobInternal()
  2. MUST throw on failure, preferably a ConnectorError; any other Error is auto-converted via ConnectorError.fromError()
  3. MUST return JobResult with success: true on success
  4. MUST NOT override processJob() (treated as final; TypeScript has no final modifier, so this relies on convention and code review)
  5. MUST NOT catch errors without re-throwing (prevents base class handling)
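Rule 5 does not forbid try/catch. It forbids catching without re-throwing. Here is a minimal sketch of the difference, using a pared-down stand-in for ConnectorError and JSON.parse as the operation that fails:

```typescript
// Pared-down stand-in for ConnectorError (illustration only).
class ConnectorError extends Error {
  readonly failureType: string;
  readonly retryable: boolean;
  readonly context: Record<string, unknown>;

  constructor(failureType: string, message: string, retryable: boolean, context: Record<string, unknown> = {}) {
    super(message);
    this.name = 'ConnectorError';
    this.failureType = failureType;
    this.retryable = retryable;
    this.context = context;
  }
}

// ❌ Swallows the failure: BaseConnector never sees it, and the caller carries on with undefined.
async function parseOutputSwallowing(raw: string): Promise<unknown> {
  try {
    return JSON.parse(raw);
  } catch (error) {
    console.error('parse failed', error);
    return undefined;
  }
}

// ✅ Catches only to attach context, then re-throws for BaseConnector to classify and report.
async function parseOutput(raw: string): Promise<unknown> {
  try {
    return JSON.parse(raw);
  } catch (error) {
    throw new ConnectorError(
      'RESPONSE_ERROR',
      `Service returned invalid JSON: ${error instanceof Error ? error.message : String(error)}`,
      false,
      { rawResponse: raw }
    );
  }
}
```

If the operation inside the try block returns a promise, write return await so that a rejection is caught by the handler rather than escaping it.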

3. HTTPConnector Protocol Layer

Location: apps/worker/src/connectors/http-connector.ts

Purpose: Handle HTTP-specific errors once for all HTTP-based connectors

Implementation:

```typescript
import axios, { AxiosRequestConfig, AxiosResponse } from 'axios';
import { BaseConnector } from './base-connector.js';
import {
  ConnectorError, FailureType, FailureReason, JobData, JobResult, ProgressCallback
} from '@emp/core';

export abstract class HTTPConnector extends BaseConnector {
  // Shared HTTP execution with error handling
  protected async executeHTTPRequest(config: AxiosRequestConfig): Promise<AxiosResponse> {
    try {
      return await axios.request(config);
    } catch (error) {
      throw this.mapHTTPError(error);
    }
  }

  // Maps HTTP errors to ConnectorError
  private mapHTTPError(error: any): ConnectorError {
    if (error.response) {
      const status = error.response.status;
      const retryAfter = error.response.headers['retry-after'];

      // Authentication errors (401, 403)
      if (status === 401 || status === 403) {
        return new ConnectorError(
          FailureType.AUTH_ERROR,
          FailureReason.INVALID_API_KEY,
          `Authentication failed: ${error.message}`,
          false,
          { httpStatus: status, rawResponse: error.response.data }
        );
      }

      // Rate limiting (429)
      if (status === 429) {
        // Retry-After can also be an HTTP-date; fall back to 60s when it is not a number
        const retryAfterSeconds = Number.parseInt(String(retryAfter ?? ''), 10);
        return new ConnectorError(
          FailureType.RATE_LIMIT,
          FailureReason.REQUESTS_PER_MINUTE,
          'Rate limit exceeded',
          true,
          {
            httpStatus: status,
            rawResponse: error.response.data,
            retryAfterSeconds: Number.isNaN(retryAfterSeconds) ? 60 : retryAfterSeconds
          }
        );
      }

      // Service errors (5xx)
      if (status >= 500) {
        return new ConnectorError(
          FailureType.SERVICE_ERROR,
          FailureReason.SERVICE_UNAVAILABLE,
          `Service error: ${error.message}`,
          true,
          { httpStatus: status, rawResponse: error.response.data }
        );
      }

      // Other 4xx responses fall through to FailureClassifier below
    }

    // Network errors (axios reports its own request timeouts as ECONNABORTED by default)
    if (['ETIMEDOUT', 'ECONNABORTED', 'ECONNREFUSED', 'ECONNRESET'].includes(error.code)) {
      return new ConnectorError(
        FailureType.NETWORK_ERROR,
        FailureReason.CONNECTION_FAILED,
        `Network error: ${error.message}`,
        true
      );
    }

    // Unknown - let FailureClassifier handle it
    return ConnectorError.fromError(error, this.service_type);
  }

  // Subclasses implement these
  protected abstract buildRequestConfig(jobData: JobData): AxiosRequestConfig;
  protected abstract parseResponse(response: AxiosResponse): JobResult;

  // Default implementation calls buildRequestConfig → executeHTTPRequest → parseResponse
  protected async processJobInternal(
    jobData: JobData,
    progressCallback: ProgressCallback
  ): Promise<JobResult> {
    const config = this.buildRequestConfig(jobData);
    const response = await this.executeHTTPRequest(config);
    return this.parseResponse(response);
  }
}
```

Contract for HTTP-based Connectors:

  1. MUST extend HTTPConnector
  2. MUST implement buildRequestConfig() - construct HTTP request
  3. MUST implement parseResponse() - parse HTTP response
  4. MAY override processJobInternal() for custom logic
  5. MAY throw ConnectorError in parseResponse() for validation errors
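The 429 branch above reads Retry-After as a number of seconds. Under RFC 9110, however, the header may also carry an HTTP-date. Here is a sketch of a parser that handles both forms. The function name is hypothetical, and the 60-second fallback mirrors the default used above.

```typescript
// Hypothetical helper: converts a Retry-After header value to whole seconds.
// RFC 9110 allows either delay-seconds ("120") or an HTTP-date.
function parseRetryAfterSeconds(
  header: string | undefined,
  now: number = Date.now(),
  fallbackSeconds = 60
): number {
  if (header === undefined) return fallbackSeconds;
  const value = header.trim();
  if (/^\d+$/.test(value)) return Number.parseInt(value, 10);

  const retryAt = Date.parse(value); // NaN if the value is not a parseable date
  if (Number.isNaN(retryAt)) return fallbackSeconds;
  // A date in the past means "retry now"; round up so we never retry early.
  return Math.max(0, Math.ceil((retryAt - now) / 1000));
}
```

Inside the 429 branch, the axios header value is not guaranteed to be a string, so a caller would pass String(retryAfter) when it is defined and undefined otherwise.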

4. WebSocketConnector Protocol Layer

Location: apps/worker/src/connectors/websocket-connector.ts

Purpose: Handle WebSocket-specific errors once for all WebSocket-based connectors

Implementation:

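```typescript
import WebSocket from 'ws';

export abstract class WebSocketConnector extends BaseConnector {
  protected ws?: WebSocket;
  protected reconnectAttempts = 0;
  protected maxReconnectAttempts = 3;
  protected closingIntentionally = false;

  // Shared WebSocket connection with error handling
  protected async connect(url: string): Promise<void> {
    this.closingIntentionally = false;

    return new Promise((resolve, reject) => {
      let opened = false;

      try {
        this.ws = new WebSocket(url);

        this.ws.on('open', () => {
          opened = true;
          this.reconnectAttempts = 0; // a successful connection restores the retry budget
          resolve();
        });

        // Before 'open' this rejects connect(); after 'open', reject() is a no-op
        this.ws.on('error', (error) => {
          reject(new ConnectorError(
            FailureType.NETWORK_ERROR,
            FailureReason.CONNECTION_FAILED,
            `WebSocket connection failed: ${error.message}`,
            this.reconnectAttempts < this.maxReconnectAttempts
          ));
        });

        // Reconnect only when an established connection drops unexpectedly; a failed
        // initial connect has already rejected and is handled by the caller
        this.ws.on('close', () => {
          if (opened && !this.closingIntentionally) {
            // Nothing awaits this promise: without .catch() a failed reconnect is an
            // unhandled rejection, which terminates Node.js by default. Pending
            // waitForResponse() calls should reject on 'close' themselves.
            this.reconnect(url).catch(() => undefined);
          }
        });

      } catch (error) {
        reject(ConnectorError.fromError(error, this.service_type));
      }
    });
  }

  // Close without triggering auto-reconnection
  protected disconnect(): void {
    this.closingIntentionally = true;
    this.ws?.close();
  }

  // Linear backoff until connected or the attempt budget is spent.
  // Protected so subclasses may override the reconnection strategy.
  protected async reconnect(url: string): Promise<void> {
    let lastError: unknown;
    while (this.reconnectAttempts < this.maxReconnectAttempts) {
      this.reconnectAttempts++;
      await new Promise(resolve => setTimeout(resolve, 1000 * this.reconnectAttempts));
      if (this.closingIntentionally) return; // disconnect() was called during the backoff
      try {
        await this.connect(url);
        return;
      } catch (error) {
        lastError = error;
      }
    }
    throw lastError;
  }

  // Subclasses implement these
  protected abstract sendMessage(message: any): Promise<void>;
  protected abstract waitForResponse(timeout: number): Promise<any>;
}
```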

Contract for WebSocket-based Connectors:

  1. MUST extend WebSocketConnector
  2. MUST implement sendMessage() - send data via WebSocket
  3. MUST implement waitForResponse() - wait for WebSocket response
  4. MUST call connect() before sending messages
  5. MAY override reconnection logic
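waitForResponse(timeout) is where a hung service would otherwise turn into a client timeout. A subclass could implement the timeout by racing the pending response against a timer. In this sketch, the helper name, the TIMEOUT failure type, and treating timeouts as retryable are all assumptions.

```typescript
// Stand-in for ConnectorError (illustration only).
class ConnectorError extends Error {
  readonly failureType: string;
  readonly retryable: boolean;

  constructor(failureType: string, message: string, retryable: boolean) {
    super(message);
    this.name = 'ConnectorError';
    this.failureType = failureType;
    this.retryable = retryable;
  }
}

// Settles with `pending` if it settles first; otherwise rejects with a structured
// TIMEOUT error. The timer is cleared either way, so a finished job does not keep
// the Node.js event loop alive.
function withTimeout<T>(pending: Promise<T>, timeoutMs: number, label: string): Promise<T> {
  return new Promise<T>((resolve, reject) => {
    const timer = setTimeout(() => {
      reject(new ConnectorError('TIMEOUT', `${label} timed out after ${timeoutMs} ms`, true));
    }, timeoutMs);

    pending.then(
      (value) => {
        clearTimeout(timer);
        resolve(value);
      },
      (error: unknown) => {
        clearTimeout(timer);
        reject(error);
      }
    );
  });
}
```

Once either side settles, later calls to resolve or reject are ignored. A response that arrives after the timeout therefore cannot overwrite the TIMEOUT failure.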

Enforcement Mechanisms

1. TypeScript Compilation Enforcement

Abstract Methods MUST Be Implemented:

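```typescript
// ❌ This will NOT compile without processJobInternal()
class BrokenConnector extends BaseConnector {
  // ERROR: Non-abstract class 'BrokenConnector' does not implement inherited
  // abstract member 'processJobInternal' from class 'BaseConnector'.
}

// ✅ This WILL compile
class MyNewConnector extends BaseConnector {
  protected async processJobInternal(
    jobData: JobData,
    progressCallback: ProgressCallback
  ): Promise<JobResult> {
    // Implementation required: the body must return a JobResult or throw
    return { success: true, data: jobData.payload, processing_time_ms: 0 };
  }
}
```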

2. Runtime Validation

Add to BaseConnector constructor:

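```typescript
constructor(connectorId: string, config?: Partial<ConnectorConfig>) {
  super(); // only valid if BaseConnector extends a parent class (e.g. EventEmitter)

  // Ensure subclass implements required methods. An abstract method has no body on
  // BaseConnector.prototype, so test for a function directly; this also catches
  // plain-JS or `as any` subclasses that the compiler cannot check.
  if (typeof this.processJobInternal !== 'function') {
    // eslint-disable-next-line no-restricted-syntax -- configuration error, not a job failure
    throw new Error(
      `Connector ${this.constructor.name} must implement processJobInternal(). ` +
      'See ADR: connector-error-handling-standard.md for requirements.'
    );
  }
}
```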
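The check above covers processJobInternal(). TypeScript has no final modifier, however, so nothing stops a subclass from overriding processJob(). A similar constructor check can enforce that rule at runtime. This is a hypothetical addition with simplified signatures. It cannot detect overrides declared as class fields (arrow functions), because those are assigned only after the base constructor returns.

```typescript
// Hypothetical runtime guard for "MUST NOT override processJob()" (simplified signatures).
abstract class BaseConnector {
  constructor() {
    // Prototype methods are already in place when the base constructor runs,
    // so a subclass override shows up here as a different function.
    if (this.processJob !== BaseConnector.prototype.processJob) {
      throw new Error(
        `Connector ${this.constructor.name} must not override processJob(); implement processJobInternal() instead.`
      );
    }
  }

  async processJob(input: string): Promise<string> {
    return this.processJobInternal(input);
  }

  protected abstract processJobInternal(input: string): Promise<string>;
}

class GoodConnector extends BaseConnector {
  protected async processJobInternal(input: string): Promise<string> {
    return input.toUpperCase();
  }
}

class RogueConnector extends BaseConnector {
  async processJob(input: string): Promise<string> {
    return input; // would bypass the base class error handling
  }

  protected async processJobInternal(input: string): Promise<string> {
    return input;
  }
}

new GoodConnector(); // constructs normally; new RogueConnector() throws
```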

3. Linting Rules

ESLint Rule (.eslintrc.js):

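```javascript
module.exports = {
  overrides: [
    {
      // Scope the rule to connector code only
      files: ['apps/worker/src/connectors/**/*.ts'],
      rules: {
        // Enforce ConnectorError usage in connectors
        'no-restricted-syntax': [
          'error',
          {
            // Matches both `throw new Error(...)` and `throw Error(...)`
            selector: 'ThrowStatement > :matches(NewExpression, CallExpression)[callee.name="Error"]',
            message: 'Use ConnectorError instead of Error in connector classes. See ADR: connector-error-handling-standard.md'
          }
        ]
      }
    }
  ]
};
```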

4. Code Review Checklist

For New Connectors:

  • [ ] Extends appropriate base class (HTTPConnector or WebSocketConnector)
  • [ ] Implements required abstract methods
  • [ ] Throws ConnectorError (not generic Error)
  • [ ] Includes context in ConnectorError (rawRequest, rawResponse, etc.)
  • [ ] No try/catch blocks that swallow errors
  • [ ] No error logging without re-throwing
  • [ ] Includes unit tests for error scenarios

5. Connector Template

Create: apps/worker/src/connectors/TEMPLATE.ts

```typescript
/**
 * Template for HTTP-based connectors
 *
 * REQUIREMENTS (see ADR: connector-error-handling-standard.md):
 * 1. Extend HTTPConnector (for HTTP) or WebSocketConnector (for WebSocket)
 * 2. Implement buildRequestConfig() and parseResponse()
 * 3. Throw ConnectorError for failures (with context)
 * 4. Return JobResult with success: true on success
 * 5. DO NOT override processJob(); override processJobInternal() only when the
 *    default buildRequestConfig → executeHTTPRequest → parseResponse flow does not fit
 * 6. DO NOT catch errors without re-throwing
 */

import { HTTPConnector } from './http-connector.js';
import { JobData, JobResult, ProgressCallback } from '@emp/core';
import { ConnectorError, FailureType, FailureReason } from '@emp/core';
import { AxiosRequestConfig, AxiosResponse } from 'axios';

export class TemplateConnector extends HTTPConnector {
  service_type = 'template';
  version = '1.0.0';

  // NOTE: this.baseURL, this.apiKey and this.startTime are assumed to be provided by
  // the base classes; they do not appear in the HTTPConnector shown above. If one
  // instance runs jobs concurrently, startTime must be tracked per job, not per instance.

  constructor(connectorId: string, config?: any) {
    super(connectorId, config);
    // Connector-specific initialization
  }

  // ✅ REQUIRED: Build HTTP request
  protected buildRequestConfig(jobData: JobData): AxiosRequestConfig {
    return {
      url: `${this.baseURL}/endpoint`,
      method: 'POST',
      headers: {
        'Authorization': `Bearer ${this.apiKey}`,
        'Content-Type': 'application/json'
      },
      data: {
        // Map jobData.payload to service-specific format
      }
    };
  }

  // ✅ REQUIRED: Parse HTTP response
  protected parseResponse(response: AxiosResponse): JobResult {
    // Validate response
    if (!response.data || !response.data.result) {
      throw new ConnectorError(
        FailureType.RESPONSE_ERROR,
        FailureReason.MISSING_EXPECTED_DATA,
        'Service did not return expected data',
        false, // Not retryable
        {
          rawResponse: response.data,
          requestedMimeType: 'application/json',
          actualMimeType: response.headers['content-type']
        }
      );
    }

    // Return successful result
    return {
      success: true,
      data: response.data.result,
      processing_time_ms: Date.now() - this.startTime
    };
  }
}
```

Migration Strategy

Phase 1: Foundation (Days 1-3)

  1. Create ConnectorError class
  2. Update FailureClassifier for component unwrapping
  3. Update BaseConnector.processJob() with error handling
  4. All existing connectors automatically get error handling

Phase 2: Protocol Layers (Days 4-5)

  1. Create HTTPConnector base class
  2. Create WebSocketConnector base class
  3. Migrate ONE connector to prove concept
  4. Document migration guide

Phase 3: Connector Migration (Days 6-8)

  1. Migrate HTTP-based connectors (OpenAI, Ollama, Gemini)
  2. Migrate WebSocket-based connectors (ComfyUI, A1111)
  3. Update tests for each migrated connector

Phase 4: Enforcement (Days 9-10)

  1. Add linting rules
  2. Create connector template
  3. Update documentation
  4. Add code review checklist

Monitoring & Metrics

Error Classification Metrics

Track in monitoring dashboard:

  • Error rate by failure_type
  • Error rate by failure_reason
  • Retryable vs non-retryable ratio
  • Component error wrapping frequency
  • Top error sources (by connector)
  • Error resolution time
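Most of these metrics are aggregations over the structured fields that handleConnectorError() puts on each failed JobResult. Here is a minimal sketch. It assumes failures are collected together with the name of the connector that produced them (the connector field is not part of the JobResult shown above) and that the total job count for the same window is known. Error resolution time needs incident data and is out of scope here.

```typescript
// Minimal sketch: aggregate failed JobResults into dashboard metrics.
interface FailedJob {
  connector: string; // assumed: recorded alongside the JobResult
  failure_type: string;
  failure_reason: string;
  retryable: boolean;
}

interface ErrorMetrics {
  errorRate: number; // failed jobs / all jobs in the window
  byType: Record<string, number>;
  byReason: Record<string, number>;
  byConnector: Record<string, number>;
  retryableRatio: number; // retryable failures / all failures
}

function increment(counts: Record<string, number>, key: string): void {
  counts[key] = (counts[key] ?? 0) + 1;
}

function summarize(failures: FailedJob[], totalJobs: number): ErrorMetrics {
  const metrics: ErrorMetrics = { errorRate: 0, byType: {}, byReason: {}, byConnector: {}, retryableRatio: 0 };
  let retryable = 0;
  for (const failure of failures) {
    increment(metrics.byType, failure.failure_type);
    increment(metrics.byReason, failure.failure_reason);
    increment(metrics.byConnector, failure.connector);
    if (failure.retryable) retryable += 1;
  }
  metrics.errorRate = totalJobs === 0 ? 0 : failures.length / totalJobs;
  metrics.retryableRatio = failures.length === 0 ? 0 : retryable / failures.length;
  return metrics;
}
```

Sorting the byConnector entries by count gives the "top error sources" view.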

Success Criteria

Week 1:

  • ✅ 100% of errors have failure_type and failure_reason
  • ✅ 0% client timeouts from error propagation issues
  • ✅ 5+ connectors migrated to protocol layers

Month 1:

  • ✅ 95%+ error classification accuracy (manual audit)
  • ✅ 80%+ code reduction in migrated connectors
  • ✅ 50% reduction in MTTR for production errors
  • ✅ All new connectors use protocol layer inheritance

Documentation Requirements

Required Documentation

  1. Migration Guide (docs/connector-migration-guide.md)

    • How to migrate existing connector
    • Before/after code examples
    • Common pitfalls
  2. Connector Development Guide (docs/connector-development-guide.md)

    • How to create new connector
    • Using the template
    • Error handling best practices
  3. Error Handling Reference (docs/error-handling-reference.md)

    • Complete list of FailureTypes and FailureReasons
    • User-friendly descriptions
    • When to use each type
  4. Testing Guide (docs/connector-testing-guide.md)

    • How to test error scenarios
    • Mock error responses
    • Integration test examples

Rollback Plan

If critical issues arise:

  1. Revert BaseConnector changes - connectors continue working with old error handling
  2. Keep ConnectorError class - no harm, just unused
  3. Pause migrations - migrated connectors can be reverted individually

Low Risk: Changes are additive and backwards compatible. Connectors don't need to change to benefit from BaseConnector improvements.


Alternatives Considered

Alternative 1: Per-Connector Error Handling

Rejected because:

  • Code duplication across 19+ connectors
  • Inconsistent error messages
  • No standardization
  • Hard to add new connectors

Alternative 2: Middleware/Decorator Pattern

Rejected because:

  • More complex than inheritance
  • Harder to enforce
  • TypeScript support challenging
  • Steeper learning curve

Alternative 3: Global Error Handler

Rejected because:

  • Loses connector-specific context
  • Can't map HTTP status codes properly
  • No type safety
  • Hard to test

Approval

Approved by: [Engineering Team]

Date: 2025-01-16

Next Review: 2025-02-16 (after 1 month of production usage)

Released under the MIT License.