Skip to content

ADR: Glif API Connector Integration

Status: Accepted

Date: 2025-10-22

Deciders: Engineering Team

Context: Adding Glif.app workflow execution capabilities to the EMP job queue system as a direct API connector.


Problem Statement

The EMP job queue currently supports multiple AI service providers (OpenAI, Gemini, ComfyUI, etc.) but lacks integration with Glif.app, a platform for creating and executing AI workflows (called "glifs"). Users want to:

  1. Execute Glif workflows via the job queue system
  2. Use Glif's pre-built workflows without managing local infrastructure
  3. Leverage Glif's credit-based system for workflow execution
  4. Integrate Glif outputs (images, text, JSON) into existing job pipelines

Requirements:

  • Support Glif Simple API (synchronous execution)
  • Handle both positional and named input parameters
  • Save Glif outputs (images, files) to cloud storage
  • Map Glif workflow IDs to job types
  • Follow existing connector patterns (OpenAI, Gemini)

Decision

Implement a GlifConnector following the direct API connector pattern (similar to OpenAI and Gemini connectors):

Architecture Pattern

GlifConnector extends RestSyncConnector
  ↓ Uses HTTP POST to simple-api.glif.app
  ↓ Bearer token authentication
  ↓ Synchronous execution (no polling required)
  ↓ Asset saving for image/file outputs
  ↓ Minimal health checking (API-only service)

Core Design Decisions

  1. Extends RestSyncConnector

    • Glif API is synchronous (returns complete results in one response)
    • Follows same pattern as GeminiConnector
    • Inherits retry logic, timeout handling, progress reporting
  2. Service Type: glif

    • Single connector handles all glif workflows
    • Workflow ID specified in job payload
    • Flexible input format (array or object)
  3. Authentication: Bearer Token

  4. Request Format

    typescript
    {
      id: "workflow_id",        // Glif workflow ID
      inputs: ["val1", "val2"]  // Positional array
      // OR
      inputs: { param1: "val1", param2: "val2" }  // Named object
    }
  5. Response Handling

    • Extract output (primary result) and outputFull (detailed metadata)
    • Save images/files to cloud storage via AssetSaver
    • Return structured JobResult with saved URLs
    • Include nodes output for debugging
    • Parse price for telemetry
  6. Error Handling

    • Glif API returns 200 OK even on errors (check error field)
    • Classify errors as transient vs. permanent
    • Include workflow ID in error messages
  7. Health Check: MINIMAL class

    • No persistent service to monitor (API-only)
    • Health check validates API connectivity and token
    • Periodically tests with a lightweight workflow or API endpoint
    • Cannot query job status after submission
  8. Error Handling Strategy

    • Glif API returns 200 OK even on errors → check error field in response
    • Classify errors using ConnectorError and FailureClassifier
    • Distinguish transient errors (rate limits, timeouts) from permanent (invalid workflow ID, auth failure)
    • Include workflow ID and input context in error messages
    • Preserve forensic data (API response, request payload)

Implementation Plan

Phase 1: Core Connector (1-2 hours)

File: apps/worker/src/connectors/glif-connector.ts

typescript
export class GlifConnector extends RestSyncConnector {
  version = '1.0.0';

  constructor(connectorId: string) {
    const apiToken = process.env.GLIF_API_TOKEN;
    if (!apiToken) throw new Error('GLIF_API_TOKEN required');

    const config: RestSyncConnectorConfig = {
      connector_id: connectorId,
      service_type: 'glif',
      base_url: 'https://simple-api.glif.app',
      timeout_seconds: 60,  // Workflows vary in duration
      retry_attempts: 3,
      retry_delay_seconds: 2,
      health_check_interval_seconds: 120,
      max_concurrent_jobs: 5,
      auth: {
        type: 'bearer',
        token: apiToken,
      },
      settings: {
        method: 'POST',
        response_format: 'json',
      },
    };

    super(connectorId, config);
  }

  async processJob(jobData: JobData, progressCallback: ProgressCallback): Promise<JobResult> {
    // 1. Build Glif request from job payload
    // 2. Call super.processJob() to execute HTTP request
    // 3. Process Glif response (extract output, save assets)
    // 4. Return structured JobResult
  }

  async checkHealth(): Promise<boolean> {
    // Validate API connectivity
    // Option 1: Simple token validation (if endpoint exists)
    // Option 2: Execute test workflow with known workflow_id
    // Return true if API is accessible, false otherwise
  }

  getHealthCheckCapabilities(): HealthCheckCapabilities {
    return {
      supportsBasicHealthCheck: true,
      supportsJobStatusQuery: false,  // Glif doesn't provide job query API
      supportsJobCancellation: false,  // No cancellation endpoint
      supportsServiceRestart: false,   // External API
      supportsQueueIntrospection: false,
    };
  }

  protected getRequiredHealthCheckClass() {
    return HealthCheckClass.MINIMAL;  // API-only service
  }

  private buildGlifRequest(payload: any): any {
    // Extract workflow ID and inputs from payload
    // Support both array and object input formats
    // Validate required fields (workflow_id, inputs)
    // Throw ConnectorError if validation fails
  }

  private async processGlifResponse(response: any, jobData: JobData): Promise<any> {
    // CRITICAL: Check for errors even with 200 status
    if (response.error) {
      throw new ConnectorError(
        response.error,
        this.service_type,
        this.classifyGlifError(response.error),
        FailureReason.SERVICE_ERROR,
        { workflow_id: jobData.payload.workflow_id, response }
      );
    }

    // Extract output and outputFull
    // Save images/files to cloud storage
    // Build result object with saved URLs
  }

  private classifyGlifError(errorMessage: string): FailureType {
    // Classify error types:
    // - "Invalid workflow ID" → CONFIGURATION_ERROR (permanent)
    // - "Insufficient credits" → RESOURCE_EXHAUSTED (permanent)
    // - "Rate limit exceeded" → RATE_LIMITED (transient)
    // - "Timeout" → TIMEOUT (transient)
    // - Default → SERVICE_ERROR (transient)
  }
}

Phase 2: Service Configuration (30 minutes)

Update: apps/machine/src/config/service-mapping.json

json
{
  "workers": {
    "glif": {
      "services": ["glif"],
      "is_gpu_bound": false,
      "job_service_required_map": [
        {
          "job_service_required": "glif",
          "worker_service": "glif"
        }
      ]
    }
  },
  "connectors": {
    "GlifConnector": {
      "path": "./redis-direct-worker.js",
      "description": "Glif.app workflow execution API connector"
    }
  },
  "services": {
    "glif": {
      "connector": "GlifConnector",
      "type": "external_api",
      "installer": null,
      "installer_filename": null,
      "is_gpu_bound": false,
      "build_stage": "base",
      "required_env": ["GLIF_API_TOKEN", "CLOUD_STORAGE_PROVIDER"],
      "description": "Glif.app workflow execution API",
      "api_config": {
        "base_url": "${GLIF_BASE_URL:-https://simple-api.glif.app}",
        "timeout": "${GLIF_TIMEOUT_SECONDS:-60}",
        "max_retries": "${GLIF_RETRY_ATTEMPTS:-3}",
        "max_concurrent_jobs": "${GLIF_MAX_CONCURRENT_JOBS:-5}"
      },
      "telemetry_logs": [
        {
          "path": "/workspace/logs/glif-*.log",
          "name": "glif-main",
          "description": "Glif API connector logs with requests, responses, and errors"
        }
      ]
    }
  }
}

Update: apps/machine/src/config/service-env-mapping.json

json
{
  "glif": {
    "GLIF_API_TOKEN": "${GLIF_API_TOKEN:-}",
    "GLIF_BASE_URL": "${GLIF_BASE_URL:-https://simple-api.glif.app}",
    "GLIF_TIMEOUT_SECONDS": "${GLIF_TIMEOUT_SECONDS:-60}",
    "GLIF_RETRY_ATTEMPTS": "${GLIF_RETRY_ATTEMPTS:-3}",
    "GLIF_MAX_CONCURRENT_JOBS": "${GLIF_MAX_CONCURRENT_JOBS:-5}"
  }
}

Phase 3: Connector Registration (15 minutes)

Update: apps/worker/src/connector-manager.ts

Add GlifConnector import and registration in createConnector() method.

Phase 4: Health Check & Error Handling (1 hour)

Health Check Implementation:

typescript
async checkHealth(): Promise<boolean> {
  try {
    // Option 1: Lightweight API validation
    // If Glif provides a /health or /status endpoint, use that

    // Option 2: Test with a known simple workflow
    // Use a fast, low-cost workflow for health checking
    const testWorkflowId = process.env.GLIF_HEALTH_CHECK_WORKFLOW_ID || 'default-test-workflow';

    const response = await fetch('https://simple-api.glif.app', {
      method: 'POST',
      headers: {
        'Authorization': `Bearer ${this.apiToken}`,
        'Content-Type': 'application/json',
      },
      body: JSON.stringify({
        id: testWorkflowId,
        inputs: ['health-check'],
      }),
      signal: AbortSignal.timeout(10000), // 10 second timeout
    });

    // API is healthy if we get a response (even an error response)
    return response.status === 200;
  } catch (error) {
    logger.warn('Glif health check failed:', error);
    return false;
  }
}

Error Classification Matrix:

Error PatternFailureTypeFailureReasonRetryable
"Invalid workflow ID"CONFIGURATION_ERRORINVALID_INPUTNo
"Insufficient credits"RESOURCE_EXHAUSTEDQUOTA_EXCEEDEDNo
"Rate limit exceeded"RATE_LIMITEDRATE_LIMITEDYes (with backoff)
"Authentication failed"AUTHENTICATION_ERRORINVALID_CREDENTIALSNo
"Timeout" / "Timed out"TIMEOUTEXECUTION_TIMEOUTYes
Network errorsNETWORK_ERRORCONNECTION_FAILEDYes
500+ status codesSERVICE_ERRORSERVICE_UNAVAILABLEYes
Unknown errorsSERVICE_ERRORUNKNOWNYes (limited)

Error Handling Implementation:

typescript
private classifyGlifError(errorMessage: string): FailureType {
  const msg = errorMessage.toLowerCase();

  // Permanent errors (do not retry)
  if (msg.includes('invalid workflow') || msg.includes('workflow not found')) {
    return FailureType.CONFIGURATION_ERROR;
  }
  if (msg.includes('insufficient credits') || msg.includes('quota exceeded')) {
    return FailureType.RESOURCE_EXHAUSTED;
  }
  if (msg.includes('authentication') || msg.includes('unauthorized') || msg.includes('invalid token')) {
    return FailureType.AUTHENTICATION_ERROR;
  }

  // Transient errors (retry with backoff)
  if (msg.includes('rate limit') || msg.includes('too many requests')) {
    return FailureType.RATE_LIMITED;
  }
  if (msg.includes('timeout') || msg.includes('timed out')) {
    return FailureType.TIMEOUT;
  }
  if (msg.includes('network') || msg.includes('connection')) {
    return FailureType.NETWORK_ERROR;
  }

  // Default to service error (transient)
  return FailureType.SERVICE_ERROR;
}

private getFailureReason(errorMessage: string, failureType: FailureType): FailureReason {
  switch (failureType) {
    case FailureType.CONFIGURATION_ERROR:
      return FailureReason.INVALID_INPUT;
    case FailureType.RESOURCE_EXHAUSTED:
      return FailureReason.QUOTA_EXCEEDED;
    case FailureType.RATE_LIMITED:
      return FailureReason.RATE_LIMITED;
    case FailureType.AUTHENTICATION_ERROR:
      return FailureReason.INVALID_CREDENTIALS;
    case FailureType.TIMEOUT:
      return FailureReason.EXECUTION_TIMEOUT;
    case FailureType.NETWORK_ERROR:
      return FailureReason.CONNECTION_FAILED;
    case FailureType.SERVICE_ERROR:
      return FailureReason.SERVICE_UNAVAILABLE;
    default:
      return FailureReason.UNKNOWN;
  }
}

Forensic Data Preservation:

typescript
private async processGlifResponse(response: any, jobData: JobData): Promise<any> {
  // Check for API errors (200 status but error in response)
  if (response.error) {
    const failureType = this.classifyGlifError(response.error);
    const failureReason = this.getFailureReason(response.error, failureType);

    throw new ConnectorError(
      `Glif workflow execution failed: ${response.error}`,
      this.service_type,
      failureType,
      failureReason,
      {
        workflow_id: jobData.payload.workflow_id,
        inputs: jobData.payload.inputs,
        glif_error: response.error,
        raw_response: response,
        timestamp: new Date().toISOString(),
      }
    );
  }

  // Log successful execution with telemetry
  logger.info('Glif workflow executed successfully', {
    workflow_id: jobData.payload.workflow_id,
    price: response.price,
    output_type: response.outputFull?.type,
  });

  // Continue with output processing...
}

Phase 5: Testing (1 hour)

  1. Unit Tests: apps/worker/src/connectors/__tests__/glif-connector.test.ts

    • Test request building (array vs object inputs)
    • Test response processing (with/without images)
    • Test error handling (API errors with 200 status)
    • Test error classification (all failure types)
    • Test health check (success and failure scenarios)
    • Test asset saving
    • Test forensic data preservation
  2. Integration Test: Submit test job via API

    • Use a simple public Glif workflow
    • Verify image output saved to cloud storage
    • Verify job result structure

Phase 5: Documentation (30 minutes)

  1. Connector README: Document Glif-specific configuration
  2. Example Job Payloads: Show array and object input formats
  3. Workflow ID Discovery: Link to Glif API docs for finding IDs

Technical Specifications

Environment Variables

bash
# Required
GLIF_API_TOKEN=<your-glif-api-token>

# Optional (with defaults)
GLIF_BASE_URL=https://simple-api.glif.app
GLIF_TIMEOUT_SECONDS=60
GLIF_RETRY_ATTEMPTS=3
GLIF_MAX_CONCURRENT_JOBS=5
CLOUD_STORAGE_PROVIDER=<r2|s3|etc>  # For saving outputs

Job Payload Format

json
{
  "id": "unique-job-id",
  "type": "glif",
  "payload": {
    "workflow_id": "clv6pfjka00018xrl05qlr2it",
    "inputs": ["a happy horse", "living on a farm"]
  }
}

OR (named inputs):

json
{
  "id": "unique-job-id",
  "type": "glif",
  "payload": {
    "workflow_id": "clv6pfjka00018xrl05qlr2it",
    "inputs": {
      "prompt": "a happy horse",
      "context": "living on a farm"
    }
  }
}

Expected Response

json
{
  "success": true,
  "data": {
    "content_type": "image",
    "image_url": "https://cdn.example.com/glif-output-12345.png",
    "mime_type": "image/png",
    "raw_response": {
      "output": "https://glif-run-outputs.s3.amazonaws.com/...",
      "outputFull": { "type": "image", ... },
      "price": 0.05,
      "nodes": { ... }
    }
  },
  "processing_time_ms": 3500,
  "metadata": {
    "service": "glif",
    "connector": "glif",
    "workflow_id": "clv6pfjka00018xrl05qlr2it",
    "price": 0.05
  }
}

Consequences

Positive ✅

  1. Low Infrastructure Cost: No GPU/server management, leverages Glif's infrastructure
  2. Fast Integration: Follows proven RestSyncConnector pattern
  3. Workflow Library Access: Users can leverage existing Glif workflows
  4. Consistent API: Matches OpenAI/Gemini connector patterns
  5. Cloud Storage Integration: Automatic asset saving for outputs

Negative ⚠️

  1. External Dependency: Relies on Glif.app service availability
  2. API Instability: Glif API is in beta and "subject to change"
  3. Credit Costs: Users need Glif accounts with sufficient credits
  4. Limited Error Info: API returns 200 OK even on errors
  5. No Async Polling: Cannot query job status after submission
  6. String-Only Inputs: API currently only supports string values

Neutral 🔵

  1. Attribution Required: Must display "powered by glif.app" in UI
  2. Rate Limiting: Subject to Glif's credit/rate limits
  3. CORS Support: Only Simple API supports CORS (fine for server-side)

Risks and Mitigations

RiskImpactMitigation
API changes without noticeHighVersion the connector, monitor Glif changelog
Service downtimeMediumImplement graceful error messages, retry logic
Credit exhaustionMediumMonitor job costs, set job budget limits
Output format changesLowUse outputFull for type detection
Rate limitingLowConfigure max_concurrent_jobs appropriately

Alternative Approaches Considered

1. Build Custom Workflow Engine

Rejected: Reinventing Glif's workflow builder would be massive effort

2. Use ComfyUI Instead

Rejected: Glif provides pre-built workflows; ComfyUI requires custom workflow creation

3. Implement Async Polling Connector

Rejected: Glif Simple API is synchronous; no polling endpoint available


Success Criteria

  1. ✅ GlifConnector successfully extends RestSyncConnector
  2. ✅ Can execute Glif workflows with array and object inputs
  3. ✅ Image outputs automatically saved to cloud storage
  4. ✅ Job results match OpenAI/Gemini result structure
  5. ✅ Error handling properly classifies transient vs. permanent failures
  6. ✅ Integration tests pass with real Glif API
  7. ✅ Documentation includes example job payloads
  8. ✅ Service mapping includes Glif worker configuration

References

Released under the MIT License.