ADR: Connector Hierarchy Consolidation

Status: ✅ Accepted & Implemented

Date: 2025-10-29

Implementation Completed: 2025-09 (approximately)

Deciders: Engineering Team

Supersedes: Partially implements and completes connector-error-handling-standard.md (2025-01-16)


Problem Statement

The connector error handling ADR from January 2025 was accepted but never fully implemented, leading to critical production issues:

Current State (Broken)

BaseConnector (with ConnectorError handling)

  HTTPConnector ✅ Has error handling

RestSyncConnector ❌ Directly implements ConnectorInterface
  ↓                    (bypasses BaseConnector entirely!)
  GeminiImageConnector
  GeminiImageEditConnector (Nano Banana)

    Returns {success: false, error: "..."} instead of throwing
    Worker calls completeJob() instead of failJob()
    Emits complete_job (GREEN) instead of fail_job (RED)
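
The failure mode above can be reproduced in a few lines. The sketch below is a hypothetical, heavily simplified worker loop: `runJob`, the `Connector` type, and the returned event names are stand-ins for illustration, not the real worker code. Because the worker only treats a thrown error as a failure, a connector that resolves with {success: false} is reported as complete_job.

```typescript
// Hypothetical, simplified shapes; the real JobResult and worker are richer.
type JobResult = { success: boolean; data?: unknown; error?: string };
type Connector = { processJob(): Promise<JobResult> };

// A worker that only treats thrown errors as failures.
async function runJob(connector: Connector): Promise<"complete_job" | "fail_job"> {
  try {
    await connector.processJob();
    return "complete_job"; // reached even when the result says success: false
  } catch {
    return "fail_job";
  }
}

// Reports the failure in the return value: the worker never notices.
const returnsFailure: Connector = {
  processJob: async () => ({ success: false, error: "quota exceeded" }),
};

// Throws on failure: the worker takes the failure path.
const throwsFailure: Connector = {
  processJob: async () => {
    throw new Error("quota exceeded");
  },
};
```

With these two connectors, `runJob(returnsFailure)` resolves to "complete_job" and `runJob(throwsFailure)` resolves to "fail_job", which is the green-versus-red difference described above.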

Impact of Incomplete Implementation

Production Incident (2025-10-27):

  • Gemini connectors return {success: false} on errors
  • Jobs marked as "completed successfully" (green) despite failing
  • No fail_job events emitted
  • Monitoring reports failed jobs as successes (false positives)
  • Users see successful job completion with error messages in result

Architectural Chaos:

  • 4 duplicate HTTP connector implementations:
    • HTTPConnector
    • RestSyncConnector
    • AsyncRESTConnector
    • RestAsyncConnector
  • No single source of truth for HTTP error handling
  • Each implementation handles errors differently
  • New connectors extend whichever base class the author happens to pick

Decision

Complete the January 2025 ADR by consolidating ALL HTTP connectors into a single hierarchy:

BaseConnector (ConnectorError enforcement)

  HTTPConnector (HTTP error handling + native fetch)

    ├─ SyncHTTPConnector (for synchronous APIs)
    ├─ AsyncHTTPConnector (for async/polling APIs)
    └─ StreamingHTTPConnector (for SSE/streaming)

Service Connectors:
  - GeminiTextConnector extends HTTPConnector
  - GeminiImageConnector extends AsyncHTTPConnector
  - GeminiImageEditConnector extends HTTPConnector
  - OpenAITextConnector extends HTTPConnector
  - etc.

Key Changes

  1. DELETE redundant connector bases:

    • RestSyncConnector → Migrate to HTTPConnector
    • AsyncRESTConnector → Migrate to AsyncHTTPConnector
    • RestAsyncConnector → Migrate to AsyncHTTPConnector
  2. Enforce error handling at BaseConnector level:

    • Make processJob() FINAL (cannot override)
    • Force use of processJobInternal()
    • Automatic ConnectorError conversion
    • No {success: false} returns possible
  3. Migrate to native fetch:

    • Remove axios dependency
    • Use Node.js native fetch() API
    • Simpler, more maintainable
    • Better TypeScript support
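
One behavioral difference matters for this migration: axios rejects on non-2xx responses by default and has a built-in timeout option, while fetch() resolves for any HTTP status and needs an AbortSignal for timeouts. Below is a minimal sketch of a fetch wrapper that restores both behaviors. The helper name `requestJson` and the injectable `fetchImpl` parameter are assumptions made for this example (the parameter exists only so the sketch can run without a network); neither is part of the connector codebase.

```typescript
type FetchLike = (url: string, init?: RequestInit) => Promise<Response>;

// Hypothetical helper, shown only to illustrate fetch semantics.
async function requestJson(
  url: string,
  init: RequestInit,
  timeoutMs: number,
  fetchImpl: FetchLike = (u, i) => fetch(u, i),
): Promise<unknown> {
  // fetch() has no timeout option; abort the request through a signal instead.
  const controller = new AbortController();
  const timer = setTimeout(() => controller.abort(), timeoutMs);
  try {
    const response = await fetchImpl(url, { ...init, signal: controller.signal });
    // fetch() does not reject on 4xx/5xx, so the status must be checked explicitly.
    if (!response.ok) {
      throw new Error(`HTTP ${response.status}`);
    }
    return await response.json();
  } finally {
    clearTimeout(timer); // release the timer on success and on failure
  }
}
```

Any connector code that relied on axios throwing for error statuses needs an equivalent explicit check after the move to fetch.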

Consequences

Positive

  • ✅ Error handling actually works - No more {success: false} bypassing
  • ✅ Single HTTP implementation - One place to fix bugs, add features
  • ✅ Consistent behavior - All HTTP connectors work the same way
  • ✅ Simpler codebase - Delete ~2000 lines of duplicate code
  • ✅ Easier to add connectors - Clear pattern to follow
  • ✅ Better monitoring - Failures actually show as failures

Negative

  • ⚠️ Migration work - 10+ connectors need updates
  • ⚠️ Breaking change - Removes old base classes
  • ⚠️ Testing required - Comprehensive tests for each migrated connector

Neutral

  • 🔵 Native fetch - Different API than axios, team needs to learn
  • 🔵 Enforcement - Stricter rules, less flexibility (this is good!)


Technical Design

1. Make BaseConnector.processJob() Final

Location: apps/worker/src/connectors/base-connector.ts

typescript
export abstract class BaseConnector {
  /**
   * 🔒 FINAL by convention - subclasses must not override this method
   * (TypeScript has no `final` modifier, so the compiler does not enforce it).
   * This ensures ALL errors flow through handleConnectorError()
   */
  async processJob(jobData: JobData, progressCallback: ProgressCallback): Promise<JobResult> {
    const startTime = Date.now();

    try {
      // Status reporting
      await this.updateConnectorStatus('active');

      // Call subclass implementation
      const result = await this.processJobImpl(jobData, progressCallback);

      // Validate result
      if (!result.success) {
        throw new Error('processJobImpl() must throw errors, not return {success: false}');
      }

      await this.updateConnectorStatus('idle');
      return result;

    } catch (error) {
      await this.updateConnectorStatus('error');

      // Convert to ConnectorError and throw
      // Worker will catch this and call failJob()
      throw error instanceof ConnectorError
        ? error
        : ConnectorError.fromError(error, this.service_type);
    }
  }

  /**
   * ✅ REQUIRED: Subclasses implement this instead of processJob()
   * MUST throw ConnectorError on failure
   * MUST return {success: true, data: ...} on success
   */
  protected abstract processJobImpl(
    jobData: JobData,
    progressCallback: ProgressCallback
  ): Promise<JobResult>;
}
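
ConnectorError itself is defined elsewhere and is not shown in this ADR. To make the conversion step concrete, here is a hypothetical minimal shape inferred only from the constructor calls in this document (failure type, failure reason, message, retryable flag, context). The field names, the string-valued type and reason, and the fallback classification in `fromError` are all assumptions.

```typescript
// Hypothetical sketch; the real ConnectorError may differ in every detail.
class ConnectorError extends Error {
  readonly failureType: string;
  readonly failureReason: string;
  readonly retryable: boolean;
  readonly context: Record<string, unknown>;

  constructor(
    failureType: string,
    failureReason: string,
    message: string,
    retryable: boolean,
    context: Record<string, unknown> = {},
  ) {
    super(message);
    // Keeps instanceof working when the code is compiled down to ES5.
    Object.setPrototypeOf(this, new.target.prototype);
    this.name = "ConnectorError";
    this.failureType = failureType;
    this.failureReason = failureReason;
    this.retryable = retryable;
    this.context = context;
  }

  // Wrap any thrown value; existing ConnectorErrors pass through unchanged.
  static fromError(error: unknown, serviceType: string): ConnectorError {
    if (error instanceof ConnectorError) return error;
    const message = error instanceof Error ? error.message : String(error);
    // Assumed fallback classification: unknown system error, retryable.
    return new ConnectorError("system_error", "unknown_error", message, true, { serviceType });
  }
}
```

Because `fromError` returns an existing ConnectorError untouched, classification done close to the failure (for example, in HTTP status mapping) is not overwritten by the generic fallback.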

Key enforcements:

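  • processJob() is NOT marked as protected abstract - it is implemented in BaseConnector and treated as final
  • TypeScript has no final modifier, so the compiler will not reject an override; subclasses MUST NOT override processJob() and implement processJobImpl() instead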
  • Throws if result has success: false (catches the pattern)
  • Always converts errors to ConnectorError
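
TypeScript has no final modifier, so the compiler alone cannot stop a subclass from overriding processJob(). If stronger enforcement is wanted, one option is a constructor-time check that the method has not been shadowed. This is an assumption for illustration, not something this ADR states was implemented, and `SealedBase` and the two subclasses are hypothetical names.

```typescript
abstract class SealedBase {
  constructor() {
    // Runtime guard: reject subclasses that shadow processJob().
    if (this.processJob !== SealedBase.prototype.processJob) {
      throw new TypeError(`${new.target.name} must not override processJob()`);
    }
  }

  // The wrapper every job goes through; subclasses supply processJobImpl().
  async processJob(): Promise<string> {
    return this.processJobImpl();
  }

  protected abstract processJobImpl(): Promise<string>;
}

// Follows the rule: only processJobImpl() is implemented.
class GoodConnector extends SealedBase {
  protected async processJobImpl(): Promise<string> {
    return "ok";
  }
}

// Breaks the rule: overrides processJob() and would bypass the wrapper.
class RogueConnector extends SealedBase {
  async processJob(): Promise<string> {
    return "bypassed";
  }
  protected async processJobImpl(): Promise<string> {
    return "unused";
  }
}
```

Constructing `GoodConnector` succeeds, while `new RogueConnector()` throws a TypeError at startup instead of silently bypassing error handling at job time. A lint rule or a unit test that walks all connector classes would be alternative ways to get a similar guarantee.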

2. Update HTTPConnector with Native Fetch

Location: apps/worker/src/connectors/protocol/http-connector.ts

typescript
export abstract class HTTPConnector extends BaseConnector {

  /**
   * Execute HTTP request with native fetch
   */
  protected async executeRequest(
    url: string,
    options: RequestInit
  ): Promise<Response> {
    try {
      const response = await fetch(url, options);

      // Handle HTTP errors
      if (!response.ok) {
        throw await this.handleHTTPError(response);
      }

      return response;

    } catch (error) {
      if (error instanceof ConnectorError) {
        throw error;
      }

      // Network/timeout errors
      throw ConnectorError.fromError(error, this.service_type);
    }
  }

  /**
   * Map HTTP status codes to ConnectorError
   */
  private async handleHTTPError(response: Response): Promise<ConnectorError> {
    const status = response.status;
    const body = await response.text();

    // 400: Bad Request (usually invalid payload)
    if (status === 400) {
      return new ConnectorError(
        FailureType.VALIDATION_ERROR,
        FailureReason.INVALID_REQUEST,
        `Bad request: ${body}`,
        false, // Not retryable
        { httpStatus: status, rawResponse: body }
      );
    }

    // 401/403: Authentication
    if (status === 401 || status === 403) {
      return new ConnectorError(
        FailureType.AUTH_ERROR,
        FailureReason.INVALID_API_KEY,
        'Authentication failed',
        false,
        { httpStatus: status }
      );
    }

    // 429: Rate limit
    if (status === 429) {
      // Retry-After may be delta-seconds or an HTTP-date; non-numeric values fall back to 60s
      const retryAfter = Number.parseInt(response.headers.get('retry-after') ?? '', 10);
      return new ConnectorError(
        FailureType.RATE_LIMIT,
        FailureReason.REQUESTS_PER_MINUTE,
        'Rate limit exceeded',
        true,
        {
          httpStatus: status,
          retryAfterSeconds: Number.isNaN(retryAfter) ? 60 : retryAfter
        }
      );
    }

    // 5xx: Service errors
    if (status >= 500) {
      return new ConnectorError(
        FailureType.SERVICE_ERROR,
        FailureReason.SERVICE_UNAVAILABLE,
        `Service error: ${status}`,
        true,
        { httpStatus: status, rawResponse: body }
      );
    }

    // Unknown
    return new ConnectorError(
      FailureType.SYSTEM_ERROR,
      FailureReason.UNKNOWN_ERROR,
      `HTTP ${status}: ${body}`,
      true,
      { httpStatus: status, rawResponse: body }
    );
  }

  /**
   * Default implementation - subclasses can override for custom logic
   */
  protected async processJobImpl(
    jobData: JobData,
    progressCallback: ProgressCallback
  ): Promise<JobResult> {
    const config = await this.buildRequestConfig(jobData);
    const response = await this.executeRequest(config.url, config.options);
    return await this.parseResponse(response, jobData);
  }

  /**
   * ✅ REQUIRED: Subclasses implement these
   */
  protected abstract buildRequestConfig(jobData: JobData): Promise<{
    url: string;
    options: RequestInit;
  }>;

  protected abstract parseResponse(
    response: Response,
    jobData: JobData
  ): Promise<JobResult>;
}
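
To show what a service connector looks like under this design, here is a self-contained sketch. `MiniHTTPConnector` is a reduced stand-in for the HTTPConnector above (no ConnectorError mapping, a `run()` method in place of processJob()), and `ExampleTextConnector`, its URL, and the request and response shapes are all hypothetical. The fetch implementation is injectable only so the example can run without a network.

```typescript
type JobData = { id: string; payload: { prompt: string } };
type JobResult = { success: true; data: unknown };
type FetchLike = (url: string, init?: RequestInit) => Promise<Response>;

// Stand-in for HTTPConnector, reduced to the template-method core.
abstract class MiniHTTPConnector {
  private readonly fetchImpl: FetchLike;

  constructor(fetchImpl: FetchLike = (u, i) => fetch(u, i)) {
    this.fetchImpl = fetchImpl;
  }

  async run(jobData: JobData): Promise<JobResult> {
    const { url, options } = await this.buildRequestConfig(jobData);
    const response = await this.fetchImpl(url, options);
    if (!response.ok) throw new Error(`HTTP ${response.status}`);
    return this.parseResponse(response, jobData);
  }

  protected abstract buildRequestConfig(
    jobData: JobData,
  ): Promise<{ url: string; options: RequestInit }>;
  protected abstract parseResponse(response: Response, jobData: JobData): Promise<JobResult>;
}

// Hypothetical service connector: only request building and response parsing live here.
class ExampleTextConnector extends MiniHTTPConnector {
  protected async buildRequestConfig(jobData: JobData) {
    return {
      url: "https://api.example.invalid/v1/generate", // placeholder URL
      options: {
        method: "POST",
        headers: { "Content-Type": "application/json" },
        body: JSON.stringify({ prompt: jobData.payload.prompt }),
      },
    };
  }

  protected async parseResponse(response: Response): Promise<JobResult> {
    const body = (await response.json()) as { text?: string };
    // Malformed responses are thrown, never returned as a failed result.
    if (typeof body.text !== "string") throw new Error("Response is missing 'text'");
    return { success: true, data: { text: body.text } };
  }
}
```

The subclass contains no try/catch and no status handling; everything that can go wrong either throws in the base class or throws from `parseResponse`.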

3. Create AsyncHTTPConnector for Polling

Location: apps/worker/src/connectors/protocol/async-http-connector.ts

typescript
export abstract class AsyncHTTPConnector extends HTTPConnector {

  /**
   * Override processJobImpl for async/polling pattern
   */
  protected async processJobImpl(
    jobData: JobData,
    progressCallback: ProgressCallback
  ): Promise<JobResult> {
    // Submit job
    const config = await this.buildRequestConfig(jobData);
    const submitResponse = await this.executeRequest(config.url, config.options);
    const serviceJobId = await this.extractJobId(submitResponse);

    // Poll for completion
    const pollConfig = await this.buildPollConfig(serviceJobId);
    const maxAttempts = this.getMaxPollAttempts();
    const pollInterval = this.getPollInterval();

    for (let attempt = 0; attempt < maxAttempts; attempt++) {
      await new Promise(resolve => setTimeout(resolve, pollInterval));

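      const pollResponse = await this.executeRequest(pollConfig.url, pollConfig.options);
      // A Response body can be read only once: inspect a clone so pollResponse
      // stays readable for text() and parseResponse() below
      const status = await this.checkJobStatus(pollResponse.clone());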

      if (status.completed) {
        if (status.error) {
          throw new ConnectorError(
            FailureType.SERVICE_ERROR,
            FailureReason.JOB_FAILED,
            status.error,
            false,
            { serviceJobId, rawResponse: await pollResponse.text() }
          );
        }
        return await this.parseResponse(pollResponse, jobData);
      }

      // Update progress
      if (progressCallback) {
        await progressCallback({
          job_id: jobData.id,
          progress: (attempt / maxAttempts) * 100,
          message: 'Polling for completion...',
          current_step: 'polling'
        });
      }
    }

    // Timeout
    throw new ConnectorError(
      FailureType.TIMEOUT,
      FailureReason.SERVICE_TIMEOUT,
      'Polling timeout exceeded',
      true,
      { serviceJobId, maxAttempts, pollInterval }
    );
  }

  /**
   * ✅ REQUIRED: Async-specific methods
   */
  protected abstract extractJobId(response: Response): Promise<string>;
  protected abstract buildPollConfig(serviceJobId: string): Promise<{url: string, options: RequestInit}>;
  protected abstract checkJobStatus(response: Response): Promise<{completed: boolean, error?: string}>;

  /**
   * 🔧 OPTIONAL: Override polling behavior
   */
  protected getMaxPollAttempts(): number { return 60; }
  protected getPollInterval(): number { return 1000; }
}
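
With the defaults above, a job is polled at most 60 times at 1000 ms intervals, so the connector waits roughly 60 seconds (plus request time) before raising the timeout error. The loop itself can be sketched in isolation as follows. `pollUntilDone` and its injectable `sleep` parameter are hypothetical names used so the example runs instantly, and it throws plain Error where the real code throws ConnectorError.

```typescript
type PollStatus = { completed: boolean; error?: string };

// Polls until the check reports completion; resolves with the attempt number used.
async function pollUntilDone(
  check: () => Promise<PollStatus>,
  maxAttempts: number,
  intervalMs: number,
  sleep: (ms: number) => Promise<void> = (ms) =>
    new Promise<void>((resolve) => setTimeout(resolve, ms)),
): Promise<number> {
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    await sleep(intervalMs); // wait first, as the connector does
    const status = await check();
    if (status.completed) {
      // A completed job can still have failed on the service side.
      if (status.error) throw new Error(status.error);
      return attempt;
    }
  }
  throw new Error(`Polling timeout after ${maxAttempts} attempts`);
}
```

A subclass that talks to a slow service would raise the budget by overriding getMaxPollAttempts() or getPollInterval() rather than by changing the loop.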

Migration Plan

Phase 1: Fix Gemini Connectors (URGENT - Day 1)

Band-aid fix to unblock production:

  1. Replace return {success: false} with a thrown error in:
    • GeminiImageConnector
    • GeminiImageEditConnector
  2. Test and deploy
  3. Status: ✅ DONE (2025-10-27)

Phase 2: Enforce at BaseConnector (Day 2)

  1. Make processJob() final
  2. Add validation that catches {success: false} returns
  3. Force all connectors to use processJobImpl()
  4. All existing connectors still work (backwards compatible)

Phase 3: Consolidate HTTP Connectors (Days 3-5)

  1. Create new HTTPConnector with native fetch
  2. Create AsyncHTTPConnector for polling
  3. Migrate ONE connector to prove pattern (GeminiTextConnector)
  4. Delete old RestSyncConnector, AsyncRESTConnector, RestAsyncConnector

Phase 4: Migrate All Connectors (Days 6-10)

Priority order:

  1. Gemini connectors (already started)
  2. OpenAI connectors
  3. Ollama connector
  4. Other HTTP connectors
  5. WebSocket connectors (separate effort)

Per connector:

  • Update to extend HTTPConnector or AsyncHTTPConnector
  • Implement buildRequestConfig() and parseResponse()
  • Remove all try/catch blocks
  • Update tests
  • Deploy and verify

Testing Strategy

Unit Tests

typescript
describe('HTTPConnector Error Handling', () => {
  it('throws ConnectorError on 400 Bad Request', async () => {
    const connector = new TestHTTPConnector();
    mockFetch.mockResolvedValue(new Response('Invalid payload', { status: 400 }));

    await expect(connector.processJob(jobData, callback))
      .rejects
      .toThrow(ConnectorError);
  });

  it('cannot return {success: false}', async () => {
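    class BadConnector extends HTTPConnector {
      // Stubs for the abstract methods; never reached because processJobImpl() is overridden
      protected async buildRequestConfig() {
        return { url: 'http://localhost', options: {} };
      }
      protected async parseResponse(): Promise<JobResult> {
        return { success: true, data: null };
      }
      protected async processJobImpl(): Promise<JobResult> {
        return { success: false, error: 'test' };
      }
    }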

    const connector = new BadConnector();
    await expect(connector.processJob(jobData, callback))
      .rejects
      .toThrow('must throw errors, not return {success: false}');
  });
});
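
`mockFetch` in the tests above is not defined in this ADR. One dependency-free way to provide equivalent behavior, sketched under the assumption that the connector calls the global fetch(), is to swap `globalThis.fetch` for a stub and restore it afterwards. `stubFetch` is a hypothetical helper name.

```typescript
// Replace the global fetch with a stub that always answers with the given
// status and body. Returns a function that restores the original fetch.
function stubFetch(status: number, body: string): () => void {
  const original = globalThis.fetch;
  globalThis.fetch = async () => new Response(body, { status });
  return () => {
    globalThis.fetch = original;
  };
}
```

Calling the returned function in the test framework's teardown hook keeps one test's stub from leaking into the next.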

Integration Tests

typescript
describe('Gemini Connector Production Scenario', () => {
  it('emits fail_job on 400 error', async () => {
    // Submit job that will fail
    const jobId = await submitGeminiJob({ /* invalid payload */ });

    // Wait for webhook
    const webhook = await waitForWebhook(jobId);

    // Assert fail_job event (not complete_job!)
    expect(webhook.event_type).toBe('fail_job');
    expect(webhook.data.error).toContain('Bad request');
  });
});

Success Criteria

Phase 1: Band-aid Fix

  • [x] Band-aid fix deployed (throw errors in Gemini connectors)

Phase 2: BaseConnector Enforcement

  • [x] BaseConnector.processJob() validates results (lines 513-543)
  • [x] No connectors can return {success: false} (validation throws error)
  • [x] All errors must be thrown as ConnectorError

Phase 3: HTTP Hierarchy Consolidation

  • [x] HTTPConnector implements processJobImpl() (not overriding processJob)
  • [x] AsyncHTTPConnector extends HTTPConnector with polling
  • [x] WebSocketConnector for ComfyUI/A1111
  • [~] HTTPConnector uses axios (native fetch deferred - optional future work)

Phase 4: Connector Migration

  • [x] GeminiImageConnector → HTTPConnector
  • [x] GeminiImageEditConnector → HTTPConnector
  • [x] GlifConnector → HTTPConnector
  • [x] OpenAIResponsesConnector → AsyncHTTPConnector
  • [x] All 7+ production connectors migrated
  • [x] Old base classes deleted (RestSyncConnector)
  • [x] 100% of errors show as fail_job (validation enforces this)

Rollback Plan

If issues arise:

  1. Phase 1 (Band-aid): Revert Gemini connectors to return {success: false}
  2. Phase 2 (Enforcement): Remove validation check in BaseConnector
  3. Phase 3/4 (Migration): Keep old base classes, revert individual connectors

Risk Level: LOW - Changes are incremental and backwards compatible




Implementation Summary

Status: ✅ COMPLETE (Phases 1-4)

Completed: ~September 2025

Outcome:

  • All production connectors migrated to unified HTTP hierarchy
  • BaseConnector enforcement prevents {success: false} pattern
  • RestSyncConnector and RestAsyncConnector deleted
  • Zero connectors bypass error handling
  • 100% of job failures show as fail_job events

Remaining Work:

  • [ ] Optional: Migrate HTTPConnector from axios to native fetch (2 hours)
  • [x] Documentation updated (this ADR)
  • [x] Dead code removed (rest-async-connector.ts deleted 2025-10-29)

Verification:

bash
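# No connectors return {success: false}
# Note: the *.ts glob only matches files directly under connectors/;
# files in subdirectories such as protocol/ are not covered by this command
grep -r "return.*success.*false" apps/worker/src/connectors/*.ts
# Result: No matches found ✅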

# RestSyncConnector deleted
ls apps/worker/src/connectors/rest-sync-connector.ts
# Result: No such file or directory ✅

# All production connectors use proper bases:
#   GeminiImageConnector extends HTTPConnector
#   GeminiImageEditConnector extends HTTPConnector
#   GlifConnector extends HTTPConnector
#   OpenAIResponsesConnector extends AsyncHTTPConnector
