ADR: Standardized Connector Error Handling
Status: Accepted
Date: 2025-10-16
Deciders: Engineering Team
Context: Production timeout issues, inconsistent error messages, and lack of structured error classification across 19+ worker connectors.
Problem Statement
Worker connectors currently handle errors inconsistently, leading to:
- Client Timeouts: Errors logged but not propagated, causing 3-minute client timeouts
- Poor Debugging: Error messages vary by connector, making production debugging difficult
- No Classification: Errors lack structure (type, reason, retryability)
- Lost Context: HTTP status codes, request IDs, and forensic data not captured
- Component Error Wrapping: flexible_prompt and other components wrap errors, obscuring the root cause
- Code Duplication: Every connector reimplements HTTP/WebSocket error handling
Impact:
- MTTR (Mean Time To Resolution): 4+ hours for production errors
- Client experience: Unclear error messages, unnecessary timeouts
- Developer experience: Difficult to add new connectors consistently
Decision
Implement a three-layer error handling inheritance model where ALL connectors benefit from standardized error handling:
Layer 1: BaseConnector
↓ Catches ALL errors
↓ Auto-classifies with FailureClassifier
↓ Unwraps component errors
↓ Creates structured JobResult
Layer 2: Protocol Connectors (HTTPConnector, WebSocketConnector)
↓ Protocol-specific error handling
↓ Maps status codes/connection errors
↓ Handles retries and rate limits
Layer 3: Service Connectors (OpenAITextConnector, ComfyUIConnector, etc.)
↓ ONLY business logic
↓ Throws ConnectorError for failures
↓ Inherits all error handling
Core Components
ConnectorError Class (packages/core/src/types/connector-errors.ts)
- Structured error type with classification
- Auto-unwraps component errors
- Preserves forensic context
- Determines retryability
FailureClassifier (Enhanced)
- Two-tier taxonomy: FailureType + FailureReason
- 50+ common error patterns
- Component error unwrapping
- Auto-classification from error messages
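The classification and unwrapping behaviour listed above can be illustrated with a minimal sketch. The pattern table, the classify() and unwrapComponentError() helpers, and the UNKNOWN fallback values are assumptions made for illustration. Only the RATE_LIMIT / REQUESTS_PER_MINUTE and GENERATION_REFUSAL / SAFETY_FILTER pairs and the "Error in component '<name>': <message>" wrapper format are taken from the examples in this ADR; the real FailureClassifier may work differently.

```typescript
// Hypothetical, trimmed-down taxonomy; the real enums belong to FailureClassifier.
enum FailureType {
  RATE_LIMIT = 'RATE_LIMIT',
  GENERATION_REFUSAL = 'GENERATION_REFUSAL',
  UNKNOWN = 'UNKNOWN', // assumed fallback, not named in this ADR
}

enum FailureReason {
  REQUESTS_PER_MINUTE = 'REQUESTS_PER_MINUTE',
  SAFETY_FILTER = 'SAFETY_FILTER',
  UNKNOWN = 'UNKNOWN', // assumed fallback, not named in this ADR
}

interface Classification {
  failureType: FailureType;
  failureReason: FailureReason;
  retryable: boolean;
  message: string;
  componentName?: string;
}

// Illustrative pattern table: the first matching entry wins.
const PATTERNS: Array<{
  test: RegExp;
  type: FailureType;
  reason: FailureReason;
  retryable: boolean;
}> = [
  { test: /rate limit/i, type: FailureType.RATE_LIMIT, reason: FailureReason.REQUESTS_PER_MINUTE, retryable: true },
  { test: /safety filter/i, type: FailureType.GENERATION_REFUSAL, reason: FailureReason.SAFETY_FILTER, retryable: false },
];

// Strips a wrapper of the form "Error in component '<name>': <message>".
function unwrapComponentError(raw: string): { message: string; componentName?: string } {
  const match = /^Error in component '([^']+)':\s*([\s\S]*)$/.exec(raw);
  if (!match) return { message: raw };
  return { message: match[2], componentName: match[1] };
}

// Unwraps first, then classifies the inner message against the pattern table.
function classify(raw: string): Classification {
  const unwrapped = unwrapComponentError(raw);
  const hit = PATTERNS.find((p) => p.test.test(unwrapped.message));
  return {
    failureType: hit?.type ?? FailureType.UNKNOWN,
    failureReason: hit?.reason ?? FailureReason.UNKNOWN,
    retryable: hit?.retryable ?? true, // mirrors ConnectorError's default of retryable = true
    ...unwrapped,
  };
}
```

Classifying after unwrapping is the point of the design: the pattern table only ever sees the root-cause message, so a wrapped safety-filter refusal and a bare one land in the same bucket.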
BaseConnector (Updated)
- Mandatory processJob() error handling
- Automatic error classification
- Structured JobResult creation
- Status reporting
Protocol Layers (New)
- HTTPConnector: HTTP status codes, rate limits, auth
- WebSocketConnector: Connection, reconnection, timeouts
Consequences
Positive
- ✅ Immediate Error Propagation: No more client timeouts
- ✅ Consistent Error Messages: Same structure across all 19+ connectors
- ✅ Better Debugging: Structured classification + forensic data
- ✅ Code Reduction: 70-80% less code per connector
- ✅ Easy to Add Connectors: Inherit from protocol layer, just implement business logic
- ✅ Component Error Handling: Automatic unwrapping of flexible_prompt and other component errors
- ✅ Centralized Bug Fixes: Fix once in base class, all connectors benefit
Negative
- ⚠️ Migration Effort: Existing connectors need updates (2-3 days per protocol layer)
- ⚠️ Breaking Changes: Existing error formats may change (mitigated by backwards compatibility)
- ⚠️ Learning Curve: Developers need to understand ConnectorError structure
Neutral
- 🔵 Testing Requirements: Comprehensive tests for error handling
- 🔵 Documentation: Need clear examples of error handling
- 🔵 Monitoring: New error metrics and dashboards
Detailed Design
1. ConnectorError Class
Location: packages/core/src/types/connector-errors.ts
Interface:
export class ConnectorError extends Error {
constructor(
public readonly failureType: FailureType,
public readonly failureReason: FailureReason,
message: string,
public readonly retryable: boolean = true,
public readonly context?: {
httpStatus?: number;
serviceJobId?: string;
rawRequest?: unknown;
rawResponse?: unknown;
requestedMimeType?: string;
actualMimeType?: string;
componentName?: string; // e.g. "flexible_prompt1"
componentError?: string; // Original wrapped error
retryAfterSeconds?: number; // For rate limits
}
) {
super(message);
this.name = 'ConnectorError';
}
// Auto-classify and unwrap component errors
static fromError(error: Error | string, serviceType?: string): ConnectorError;
// Get user-friendly description
getDescription(): string;
// Check if error is component-wrapped
get isComponentWrapped(): boolean;
}
Key Methods:
// Example: Auto-classification
const rateLimitError = ConnectorError.fromError('Rate limit exceeded', 'openai');
// → failureType: RATE_LIMIT
// → failureReason: REQUESTS_PER_MINUTE
// → retryable: true
// Example: Component unwrapping
const wrapped = "Error in component 'flexible_prompt1': Safety filter blocked request";
const unwrappedError = ConnectorError.fromError(wrapped, 'openai');
// → message: "Safety filter blocked request" (unwrapped!)
// → failureType: GENERATION_REFUSAL
// → failureReason: SAFETY_FILTER
// → context.componentName: "flexible_prompt1"
// → retryable: false
2. BaseConnector Error Handling
Location: apps/worker/src/connectors/base-connector.ts
Mandatory Implementation:
export abstract class BaseConnector {
// ⚠️ FINAL by convention - TypeScript has no `final` keyword, so subclasses must not override this
async processJob(jobData: JobData, progressCallback: ProgressCallback): Promise<JobResult> {
const startTime = Date.now();
try {
await this.updateConnectorStatus('active');
// Call subclass implementation
const result = await this.processJobInternal(jobData, progressCallback);
await this.updateConnectorStatus('idle');
return result;
} catch (error) {
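// The caught value is `unknown` under strict settings and may not be an Error instance
const errorMessage = error instanceof Error ? error.message : String(error);
await this.updateConnectorStatus('error', errorMessage);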
// ✅ AUTOMATIC ERROR HANDLING - NO CONNECTOR NEEDS TO IMPLEMENT THIS
return this.handleConnectorError(error, jobData, startTime);
}
}
// ⚠️ PRIVATE - Connectors cannot override
private handleConnectorError(
error: any,
jobData: JobData,
startTime: number
): JobResult {
// Convert to ConnectorError (auto-classifies)
const connectorError = error instanceof ConnectorError
? error
: ConnectorError.fromError(error, this.service_type);
// Log with structured data
this.connectorLogger.logError(connectorError, jobData);
// Return structured failure
return {
success: false,
error: connectorError.message,
processing_time_ms: Date.now() - startTime,
failure_type: connectorError.failureType,
failure_reason: connectorError.failureReason,
failure_description: connectorError.getDescription(),
raw_service_output: connectorError.context?.rawResponse,
raw_service_request: connectorError.context?.rawRequest,
requested_mime_type: connectorError.context?.requestedMimeType,
actual_mime_type: connectorError.context?.actualMimeType,
component_name: connectorError.context?.componentName,
retryable: connectorError.retryable
};
}
// ✅ MUST BE IMPLEMENTED by all connectors
protected abstract processJobInternal(
jobData: JobData,
progressCallback: ProgressCallback
): Promise<JobResult>;
}
Contract for Subclasses:
- ✅ MUST implement processJobInternal()
- ✅ MUST throw ConnectorError for failures (or any Error; it will be auto-converted)
- ✅ MUST return JobResult with success: true on success
- ❌ MUST NOT override processJob() (it is treated as final)
- ❌ MUST NOT catch errors without re-throwing (prevents base class handling)
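The last rule is the easiest one to break, so a small contrast may help. The sketch below uses hypothetical stand-ins (MiniBaseConnector, MiniJobResult, callService, and the placeholder value 'CLASSIFIED_BY_BASE'), not the real classes, and only imitates the shape of processJob() to show what is lost when a connector swallows an error.

```typescript
// Minimal stand-ins for the ADR's types; every "Mini" name is hypothetical.
interface MiniJobResult {
  success: boolean;
  error?: string;
  failure_type?: string;
}

abstract class MiniBaseConnector {
  // Imitates BaseConnector.processJob(): one try/catch shared by every connector.
  async processJob(payload: unknown): Promise<MiniJobResult> {
    try {
      return await this.processJobInternal(payload);
    } catch (error) {
      const message = error instanceof Error ? error.message : String(error);
      // The real base class would call ConnectorError.fromError() at this point.
      return { success: false, error: message, failure_type: 'CLASSIFIED_BY_BASE' };
    }
  }

  protected abstract processJobInternal(payload: unknown): Promise<MiniJobResult>;
}

// Stands in for a failing call to the backing service.
async function callService(): Promise<never> {
  throw new Error('Service unavailable');
}

// ❌ Violates the contract: the error is caught and never reaches the base class.
class SwallowingConnector extends MiniBaseConnector {
  protected async processJobInternal(): Promise<MiniJobResult> {
    try {
      return await callService();
    } catch {
      // Logging and returning here hides the failure from central classification.
      return { success: false, error: 'request failed' };
    }
  }
}

// ✅ Follows the contract: the failure propagates and is classified once, centrally.
class PropagatingConnector extends MiniBaseConnector {
  protected async processJobInternal(): Promise<MiniJobResult> {
    return await callService();
  }
}
```

Under these assumptions, SwallowingConnector yields a result with no failure_type and a generic message, while PropagatingConnector yields the original message plus the classification added by the base class.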
3. HTTPConnector Protocol Layer
Location: apps/worker/src/connectors/http-connector.ts
Purpose: Handle HTTP-specific errors once for all HTTP-based connectors
Implementation:
export abstract class HTTPConnector extends BaseConnector {
// Shared HTTP execution with error handling
protected async executeHTTPRequest(config: AxiosRequestConfig): Promise<AxiosResponse> {
try {
return await axios.request(config);
} catch (error) {
throw this.mapHTTPError(error);
}
}
// Maps HTTP errors to ConnectorError
private mapHTTPError(error: any): ConnectorError {
if (error.response) {
const status = error.response.status;
const retryAfter = error.response.headers['retry-after'];
// Authentication errors (401, 403)
if (status === 401 || status === 403) {
return new ConnectorError(
FailureType.AUTH_ERROR,
FailureReason.INVALID_API_KEY,
`Authentication failed: ${error.message}`,
false,
{ httpStatus: status, rawResponse: error.response.data }
);
}
// Rate limiting (429)
if (status === 429) {
return new ConnectorError(
FailureType.RATE_LIMIT,
FailureReason.REQUESTS_PER_MINUTE,
'Rate limit exceeded',
true,
{
httpStatus: status,
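// Retry-After may also carry an HTTP date; fall back to 60s unless it is a plain integer
retryAfterSeconds: /^\d+$/.test(retryAfter ?? '') ? parseInt(retryAfter, 10) : 60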
}
);
}
// Service errors (5xx)
if (status >= 500) {
return new ConnectorError(
FailureType.SERVICE_ERROR,
FailureReason.SERVICE_UNAVAILABLE,
`Service error: ${error.message}`,
true,
{ httpStatus: status }
);
}
}
// Network errors
// axios reports request timeouts as ECONNABORTED by default
if (error.code === 'ETIMEDOUT' || error.code === 'ECONNABORTED' || error.code === 'ECONNREFUSED') {
return new ConnectorError(
FailureType.NETWORK_ERROR,
FailureReason.CONNECTION_FAILED,
`Network error: ${error.message}`,
true
);
}
// Unknown - let FailureClassifier handle it
return ConnectorError.fromError(error, this.service_type);
}
// Subclasses implement these
protected abstract buildRequestConfig(jobData: JobData): AxiosRequestConfig;
protected abstract parseResponse(response: AxiosResponse): JobResult;
// Default implementation calls buildRequestConfig → executeHTTPRequest → parseResponse
protected async processJobInternal(
jobData: JobData,
progressCallback: ProgressCallback
): Promise<JobResult> {
const config = this.buildRequestConfig(jobData);
const response = await this.executeHTTPRequest(config);
return this.parseResponse(response);
}
}
Contract for HTTP-based Connectors:
- ✅ MUST extend HTTPConnector
- ✅ MUST implement buildRequestConfig() - construct HTTP request
- ✅ MUST implement parseResponse() - parse HTTP response
- ✅ MAY override processJobInternal() for custom logic
- ✅ MAY throw ConnectorError in parseResponse() for validation errors
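One detail of the rate-limit mapping deserves a closer look: the Retry-After header can hold either a number of seconds or an HTTP date. A minimal sketch of a helper that handles both forms follows. The name parseRetryAfterSeconds and its parameters are hypothetical; the 60-second fallback matches the default used in mapHTTPError() above.

```typescript
// Hypothetical helper: converts a Retry-After header value into whole seconds.
// Returns `fallbackSeconds` when the header is absent or cannot be parsed.
function parseRetryAfterSeconds(
  header: string | undefined,
  nowMs: number = Date.now(),
  fallbackSeconds = 60,
): number {
  if (header === undefined) return fallbackSeconds;
  const trimmed = header.trim();

  // Form 1: delay-seconds, a non-negative integer such as "120".
  if (/^\d+$/.test(trimmed)) return parseInt(trimmed, 10);

  // Form 2: an HTTP date such as "Wed, 21 Oct 2015 07:28:00 GMT".
  const dateMs = Date.parse(trimmed);
  if (!Number.isNaN(dateMs)) {
    // A date in the past means the caller may retry immediately.
    return Math.max(0, Math.ceil((dateMs - nowMs) / 1000));
  }

  return fallbackSeconds;
}
```

Taking nowMs as a parameter keeps the helper deterministic in unit tests; production callers would normally rely on the default.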
4. WebSocketConnector Protocol Layer
Location: apps/worker/src/connectors/websocket-connector.ts
Purpose: Handle WebSocket-specific errors once for all WebSocket-based connectors
Implementation:
export abstract class WebSocketConnector extends BaseConnector {
protected ws?: WebSocket;
protected reconnectAttempts = 0;
protected maxReconnectAttempts = 3;
// Shared WebSocket connection with error handling
protected async connect(url: string): Promise<void> {
return new Promise((resolve, reject) => {
try {
this.ws = new WebSocket(url);
this.ws.on('open', () => resolve());
this.ws.on('error', (error) => {
reject(new ConnectorError(
FailureType.NETWORK_ERROR,
FailureReason.CONNECTION_FAILED,
`WebSocket connection failed: ${error.message}`,
this.reconnectAttempts < this.maxReconnectAttempts
));
});
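this.ws.on('close', () => {
if (this.reconnectAttempts < this.maxReconnectAttempts) {
// reconnect() is async: route its outcome into this promise so a failed retry
// is never left as an unhandled rejection (both calls are no-ops once settled)
this.reconnect(url).then(resolve, reject);
}
});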
} catch (error) {
reject(ConnectorError.fromError(error, this.service_type));
}
});
}
// Auto-reconnection logic
private async reconnect(url: string): Promise<void> {
this.reconnectAttempts++;
await new Promise(resolve => setTimeout(resolve, 1000 * this.reconnectAttempts));
await this.connect(url);
}
// Subclasses implement these
protected abstract sendMessage(message: any): Promise<void>;
protected abstract waitForResponse(timeout: number): Promise<any>;
}
Contract for WebSocket-based Connectors:
- ✅ MUST extend WebSocketConnector
- ✅ MUST implement sendMessage() - send data via WebSocket
- ✅ MUST implement waitForResponse() - wait for WebSocket response
- ✅ MUST call connect() before sending messages
- ✅ MAY override reconnection logic
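For connectors that do override the reconnection logic, it can help to keep the policy in small pure functions that are easy to test. The sketch below is one possible shape, not the ADR's implementation. The helper names are hypothetical; the linear delay of 1000 ms per attempt and the limit of 3 attempts come from the class above, and skipping reconnection after close code 1000 (normal closure) is an added assumption.

```typescript
// Linear backoff matching `1000 * this.reconnectAttempts` in WebSocketConnector.
function reconnectDelayMs(attempt: number, baseMs = 1000): number {
  return baseMs * attempt;
}

// Hypothetical policy: do not reconnect after a normal closure (code 1000)
// or once the attempt budget has been spent.
function shouldReconnect(closeCode: number, attemptsSoFar: number, maxAttempts = 3): boolean {
  const NORMAL_CLOSURE = 1000;
  return closeCode !== NORMAL_CLOSURE && attemptsSoFar < maxAttempts;
}

// Full delay schedule for a given attempt budget, useful for logging or tests.
function reconnectSchedule(maxAttempts = 3, baseMs = 1000): number[] {
  return Array.from({ length: maxAttempts }, (_, i) => reconnectDelayMs(i + 1, baseMs));
}
```

With the defaults, the schedule is 1000, 2000, and 3000 ms, so a connector gives up roughly six seconds after the first drop.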
Enforcement Mechanisms
1. TypeScript Compilation Enforcement
Abstract Methods MUST Be Implemented:
// ❌ This will NOT compile without processJobInternal()
class IncompleteConnector extends BaseConnector {
  // ERROR: Non-abstract class must implement processJobInternal()
}
// ✅ This WILL compile
class MyNewConnector extends BaseConnector {
  protected async processJobInternal(
    jobData: JobData,
    progressCallback: ProgressCallback
  ): Promise<JobResult> {
    // Implementation required; placeholder result shown here
    return { success: true, data: null, processing_time_ms: 0 };
  }
}
2. Runtime Validation
Add to BaseConnector constructor:
constructor(connectorId: string, config?: Partial<ConnectorConfig>) {
  // No super() call: BaseConnector as shown above does not extend another class.
  // TypeScript already enforces the abstract method at compile time; this check
  // guards plain-JavaScript or loosely typed subclasses.
  if (typeof this.processJobInternal !== 'function') {
    throw new Error(
      `Connector ${this.constructor.name} must implement processJobInternal(). ` +
      'See ADR: connector-error-handling-standard.md for requirements.'
    );
  }
}
3. Linting Rules
ESLint Rule (.eslintrc.js):
rules: {
// Enforce ConnectorError usage in connectors
'no-restricted-syntax': [
'error',
{
selector: 'ThrowStatement > NewExpression[callee.name="Error"]',
message: 'Use ConnectorError instead of Error in connector classes. See ADR: connector-error-handling-standard.md'
}
]
}
4. Code Review Checklist
For New Connectors:
- [ ] Extends appropriate base class (HTTPConnector or WebSocketConnector)
- [ ] Implements required abstract methods
- [ ] Throws ConnectorError (not generic Error)
- [ ] Includes context in ConnectorError (rawRequest, rawResponse, etc.)
- [ ] No try/catch blocks that swallow errors
- [ ] No error logging without re-throwing
- [ ] Includes unit tests for error scenarios
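The last checklist item can be met with a table-driven test. The sketch below is one possible shape. classifyHttpStatus() is a hypothetical pure helper that mirrors the status mapping in mapHTTPError(); it uses plain string labels so that the example stays self-contained, and runStatusCases() returns a list of failure messages instead of relying on a particular test framework.

```typescript
interface StatusClassification {
  failureType: string;
  retryable: boolean;
}

// Hypothetical pure helper mirroring the status branches of mapHTTPError().
function classifyHttpStatus(status: number): StatusClassification | undefined {
  if (status === 401 || status === 403) return { failureType: 'AUTH_ERROR', retryable: false };
  if (status === 429) return { failureType: 'RATE_LIMIT', retryable: true };
  if (status >= 500) return { failureType: 'SERVICE_ERROR', retryable: true };
  return undefined; // anything else is left to FailureClassifier
}

// One row per error scenario that the connector is expected to handle.
const STATUS_CASES: Array<{ status: number; expected: StatusClassification | undefined }> = [
  { status: 401, expected: { failureType: 'AUTH_ERROR', retryable: false } },
  { status: 403, expected: { failureType: 'AUTH_ERROR', retryable: false } },
  { status: 429, expected: { failureType: 'RATE_LIMIT', retryable: true } },
  { status: 503, expected: { failureType: 'SERVICE_ERROR', retryable: true } },
  { status: 404, expected: undefined },
];

// Runs every case and collects a readable message for each mismatch.
function runStatusCases(): string[] {
  const failures: string[] = [];
  for (const { status, expected } of STATUS_CASES) {
    const actual = classifyHttpStatus(status);
    if (JSON.stringify(actual) !== JSON.stringify(expected)) {
      failures.push(
        `status ${status}: expected ${JSON.stringify(expected)}, got ${JSON.stringify(actual)}`,
      );
    }
  }
  return failures;
}
```

Adding a scenario then means adding one row to the table, which keeps error-path coverage cheap to extend during review.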
5. Connector Template
Create: apps/worker/src/connectors/TEMPLATE.ts
/**
* Template for HTTP-based connectors
*
* REQUIREMENTS (see ADR: connector-error-handling-standard.md):
* 1. Extend HTTPConnector (for HTTP) or WebSocketConnector (for WebSocket)
* 2. Implement buildRequestConfig() and parseResponse()
* 3. Throw ConnectorError for failures (with context)
* 4. Return JobResult with success: true on success
* 5. DO NOT override processJob(); override processJobInternal() only when custom logic is needed
* 6. DO NOT catch errors without re-throwing
*/
import { HTTPConnector } from './http-connector.js';
import { JobData, JobResult, ProgressCallback } from '@emp/core';
import { ConnectorError, FailureType, FailureReason } from '@emp/core';
import { AxiosRequestConfig, AxiosResponse } from 'axios';
export class TemplateConnector extends HTTPConnector {
service_type = 'template';
version = '1.0.0';
constructor(connectorId: string, config?: any) {
super(connectorId, config);
// Connector-specific initialization
}
// ✅ REQUIRED: Build HTTP request
protected buildRequestConfig(jobData: JobData): AxiosRequestConfig {
return {
url: `${this.baseURL}/endpoint`,
method: 'POST',
headers: {
'Authorization': `Bearer ${this.apiKey}`,
'Content-Type': 'application/json'
},
data: {
// Map jobData.payload to service-specific format
}
};
}
// ✅ REQUIRED: Parse HTTP response
protected parseResponse(response: AxiosResponse): JobResult {
// Validate response
if (!response.data || !response.data.result) {
throw new ConnectorError(
FailureType.RESPONSE_ERROR,
FailureReason.MISSING_EXPECTED_DATA,
'Service did not return expected data',
false, // Not retryable
{
rawResponse: response.data,
requestedMimeType: 'application/json',
actualMimeType: response.headers['content-type']
}
);
}
// Return successful result
return {
success: true,
data: response.data.result,
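// NOTE: assumes the base class exposes startTime; processJob() above tracks it in a local variable
processing_time_ms: Date.now() - this.startTime
};
}
}
Migration Strategy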
Phase 1: Foundation (Days 1-3)
- Create ConnectorError class
- Update FailureClassifier for component unwrapping
- Update BaseConnector.processJob() with error handling
- All existing connectors automatically get error handling
Phase 2: Protocol Layers (Days 4-5)
- Create HTTPConnector base class
- Create WebSocketConnector base class
- Migrate ONE connector to prove concept
- Document migration guide
Phase 3: Connector Migration (Days 6-8)
- Migrate HTTP-based connectors (OpenAI, Ollama, Gemini)
- Migrate WebSocket-based connectors (ComfyUI, A1111)
- Update tests for each migrated connector
Phase 4: Enforcement (Days 9-10)
- Add linting rules
- Create connector template
- Update documentation
- Add code review checklist
Monitoring & Metrics
Error Classification Metrics
Track in monitoring dashboard:
- Error rate by failure_type
- Error rate by failure_reason
- Retryable vs non-retryable ratio
- Component error wrapping frequency
- Top error sources (by connector)
- Error resolution time
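The first three metrics can be derived directly from the failure fields that handleConnectorError() puts on each JobResult. The sketch below shows one way to aggregate them. FailureRecord, FailureSummary, and summarizeFailures() are hypothetical names, and how the records reach the dashboard is left open.

```typescript
// Subset of the JobResult failure fields produced by handleConnectorError().
interface FailureRecord {
  failure_type: string;
  failure_reason: string;
  retryable: boolean;
}

interface FailureSummary {
  byType: Record<string, number>;
  byReason: Record<string, number>;
  retryableRatio: number; // retryable failures / all failures; 0 when there are none
}

// Counts failures per type and per reason and computes the retryable share.
function summarizeFailures(records: FailureRecord[]): FailureSummary {
  const byType: Record<string, number> = {};
  const byReason: Record<string, number> = {};
  let retryableCount = 0;

  for (const record of records) {
    byType[record.failure_type] = (byType[record.failure_type] ?? 0) + 1;
    byReason[record.failure_reason] = (byReason[record.failure_reason] ?? 0) + 1;
    if (record.retryable) retryableCount += 1;
  }

  return {
    byType,
    byReason,
    retryableRatio: records.length === 0 ? 0 : retryableCount / records.length,
  };
}
```

Because every failure passes through the same base-class handler, a single aggregation like this covers all connectors without per-connector instrumentation.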
Success Criteria
Week 1:
- ✅ 100% of errors have failure_type and failure_reason
- ✅ 0% client timeouts from error propagation issues
- ✅ 5+ connectors migrated to protocol layers
Month 1:
- ✅ 95%+ error classification accuracy (manual audit)
- ✅ 80%+ code reduction in migrated connectors
- ✅ 50% reduction in MTTR for production errors
- ✅ All new connectors use protocol layer inheritance
Documentation Requirements
Required Documentation
Migration Guide (docs/connector-migration-guide.md)
- How to migrate existing connector
- Before/after code examples
- Common pitfalls
Connector Development Guide (docs/connector-development-guide.md)
- How to create new connector
- Using the template
- Error handling best practices
Error Handling Reference (docs/error-handling-reference.md)
- Complete list of FailureTypes and FailureReasons
- User-friendly descriptions
- When to use each type
Testing Guide (docs/connector-testing-guide.md)
- How to test error scenarios
- Mock error responses
- Integration test examples
Rollback Plan
If critical issues arise:
- Revert BaseConnector changes - connectors continue working with old error handling
- Keep ConnectorError class - no harm, just unused
- Pause migrations - migrated connectors can be reverted individually
Low Risk: Changes are additive and backwards compatible. Connectors don't need to change to benefit from BaseConnector improvements.
Alternatives Considered
Alternative 1: Per-Connector Error Handling
Rejected because:
- Code duplication across 19+ connectors
- Inconsistent error messages
- No standardization
- Hard to add new connectors
Alternative 2: Middleware/Decorator Pattern
Rejected because:
- More complex than inheritance
- Harder to enforce
- TypeScript support challenging
- Steeper learning curve
Alternative 3: Global Error Handler
Rejected because:
- Loses connector-specific context
- Can't map HTTP status codes properly
- No type safety
- Hard to test
References
- Error Handling Analysis
- Implementation Plan
- Real-world Failure Classification Tests
- Failure Classification Types
Approval
Approved by: [Engineering Team]
Date: 2025-01-16
Next Review: 2025-02-16 (after 1 month of production usage)
