ADR: Glif API Connector Integration
Status: Accepted
Date: 2025-10-22
Deciders: Engineering Team
Context: Adding Glif.app workflow execution capabilities to the EMP job queue system as a direct API connector.
Problem Statement
The EMP job queue currently supports multiple AI service providers (OpenAI, Gemini, ComfyUI, etc.) but lacks integration with Glif.app, a platform for creating and executing AI workflows (called "glifs"). Users want to:
- Execute Glif workflows via the job queue system
- Use Glif's pre-built workflows without managing local infrastructure
- Leverage Glif's credit-based system for workflow execution
- Integrate Glif outputs (images, text, JSON) into existing job pipelines
Requirements:
- Support Glif Simple API (synchronous execution)
- Handle both positional and named input parameters
- Save Glif outputs (images, files) to cloud storage
- Map Glif workflow IDs to job types
- Follow existing connector patterns (OpenAI, Gemini)
Decision
Implement a GlifConnector following the direct API connector pattern (similar to OpenAI and Gemini connectors):
Architecture Pattern
GlifConnector extends RestSyncConnector
↓ Uses HTTP POST to simple-api.glif.app
↓ Bearer token authentication
↓ Synchronous execution (no polling required)
↓ Asset saving for image/file outputs
↓ Minimal health checking (API-only service)Core Design Decisions
Extends RestSyncConnector
- Glif API is synchronous (returns complete results in one response)
- Follows same pattern as GeminiConnector
- Inherits retry logic, timeout handling, progress reporting
Service Type:
glif- Single connector handles all glif workflows
- Workflow ID specified in job payload
- Flexible input format (array or object)
Authentication: Bearer Token
GLIF_API_TOKENenvironment variable (required)- Token obtained from https://glif.app/settings/api-tokens
- Header:
Authorization: Bearer ${token}
Request Format
typescript{ id: "workflow_id", // Glif workflow ID inputs: ["val1", "val2"] // Positional array // OR inputs: { param1: "val1", param2: "val2" } // Named object }Response Handling
- Extract
output(primary result) andoutputFull(detailed metadata) - Save images/files to cloud storage via AssetSaver
- Return structured JobResult with saved URLs
- Include
nodesoutput for debugging - Parse
pricefor telemetry
- Extract
Error Handling
- Glif API returns 200 OK even on errors (check
errorfield) - Classify errors as transient vs. permanent
- Include workflow ID in error messages
- Glif API returns 200 OK even on errors (check
Health Check: MINIMAL class
- No persistent service to monitor (API-only)
- Health check validates API connectivity and token
- Periodically tests with a lightweight workflow or API endpoint
- Cannot query job status after submission
Error Handling Strategy
- Glif API returns 200 OK even on errors → check
errorfield in response - Classify errors using ConnectorError and FailureClassifier
- Distinguish transient errors (rate limits, timeouts) from permanent (invalid workflow ID, auth failure)
- Include workflow ID and input context in error messages
- Preserve forensic data (API response, request payload)
- Glif API returns 200 OK even on errors → check
Implementation Plan
Phase 1: Core Connector (1-2 hours)
File: apps/worker/src/connectors/glif-connector.ts
export class GlifConnector extends RestSyncConnector {
version = '1.0.0';
constructor(connectorId: string) {
const apiToken = process.env.GLIF_API_TOKEN;
if (!apiToken) throw new Error('GLIF_API_TOKEN required');
const config: RestSyncConnectorConfig = {
connector_id: connectorId,
service_type: 'glif',
base_url: 'https://simple-api.glif.app',
timeout_seconds: 60, // Workflows vary in duration
retry_attempts: 3,
retry_delay_seconds: 2,
health_check_interval_seconds: 120,
max_concurrent_jobs: 5,
auth: {
type: 'bearer',
token: apiToken,
},
settings: {
method: 'POST',
response_format: 'json',
},
};
super(connectorId, config);
}
async processJob(jobData: JobData, progressCallback: ProgressCallback): Promise<JobResult> {
// 1. Build Glif request from job payload
// 2. Call super.processJob() to execute HTTP request
// 3. Process Glif response (extract output, save assets)
// 4. Return structured JobResult
}
async checkHealth(): Promise<boolean> {
// Validate API connectivity
// Option 1: Simple token validation (if endpoint exists)
// Option 2: Execute test workflow with known workflow_id
// Return true if API is accessible, false otherwise
}
getHealthCheckCapabilities(): HealthCheckCapabilities {
return {
supportsBasicHealthCheck: true,
supportsJobStatusQuery: false, // Glif doesn't provide job query API
supportsJobCancellation: false, // No cancellation endpoint
supportsServiceRestart: false, // External API
supportsQueueIntrospection: false,
};
}
protected getRequiredHealthCheckClass() {
return HealthCheckClass.MINIMAL; // API-only service
}
private buildGlifRequest(payload: any): any {
// Extract workflow ID and inputs from payload
// Support both array and object input formats
// Validate required fields (workflow_id, inputs)
// Throw ConnectorError if validation fails
}
private async processGlifResponse(response: any, jobData: JobData): Promise<any> {
// CRITICAL: Check for errors even with 200 status
if (response.error) {
throw new ConnectorError(
response.error,
this.service_type,
this.classifyGlifError(response.error),
FailureReason.SERVICE_ERROR,
{ workflow_id: jobData.payload.workflow_id, response }
);
}
// Extract output and outputFull
// Save images/files to cloud storage
// Build result object with saved URLs
}
private classifyGlifError(errorMessage: string): FailureType {
// Classify error types:
// - "Invalid workflow ID" → CONFIGURATION_ERROR (permanent)
// - "Insufficient credits" → RESOURCE_EXHAUSTED (permanent)
// - "Rate limit exceeded" → RATE_LIMITED (transient)
// - "Timeout" → TIMEOUT (transient)
// - Default → SERVICE_ERROR (transient)
}
}Phase 2: Service Configuration (30 minutes)
Update: apps/machine/src/config/service-mapping.json
{
"workers": {
"glif": {
"services": ["glif"],
"is_gpu_bound": false,
"job_service_required_map": [
{
"job_service_required": "glif",
"worker_service": "glif"
}
]
}
},
"connectors": {
"GlifConnector": {
"path": "./redis-direct-worker.js",
"description": "Glif.app workflow execution API connector"
}
},
"services": {
"glif": {
"connector": "GlifConnector",
"type": "external_api",
"installer": null,
"installer_filename": null,
"is_gpu_bound": false,
"build_stage": "base",
"required_env": ["GLIF_API_TOKEN", "CLOUD_STORAGE_PROVIDER"],
"description": "Glif.app workflow execution API",
"api_config": {
"base_url": "${GLIF_BASE_URL:-https://simple-api.glif.app}",
"timeout": "${GLIF_TIMEOUT_SECONDS:-60}",
"max_retries": "${GLIF_RETRY_ATTEMPTS:-3}",
"max_concurrent_jobs": "${GLIF_MAX_CONCURRENT_JOBS:-5}"
},
"telemetry_logs": [
{
"path": "/workspace/logs/glif-*.log",
"name": "glif-main",
"description": "Glif API connector logs with requests, responses, and errors"
}
]
}
}
}Update: apps/machine/src/config/service-env-mapping.json
{
"glif": {
"GLIF_API_TOKEN": "${GLIF_API_TOKEN:-}",
"GLIF_BASE_URL": "${GLIF_BASE_URL:-https://simple-api.glif.app}",
"GLIF_TIMEOUT_SECONDS": "${GLIF_TIMEOUT_SECONDS:-60}",
"GLIF_RETRY_ATTEMPTS": "${GLIF_RETRY_ATTEMPTS:-3}",
"GLIF_MAX_CONCURRENT_JOBS": "${GLIF_MAX_CONCURRENT_JOBS:-5}"
}
}Phase 3: Connector Registration (15 minutes)
Update: apps/worker/src/connector-manager.ts
Add GlifConnector import and registration in createConnector() method.
Phase 4: Health Check & Error Handling (1 hour)
Health Check Implementation:
async checkHealth(): Promise<boolean> {
try {
// Option 1: Lightweight API validation
// If Glif provides a /health or /status endpoint, use that
// Option 2: Test with a known simple workflow
// Use a fast, low-cost workflow for health checking
const testWorkflowId = process.env.GLIF_HEALTH_CHECK_WORKFLOW_ID || 'default-test-workflow';
const response = await fetch('https://simple-api.glif.app', {
method: 'POST',
headers: {
'Authorization': `Bearer ${this.apiToken}`,
'Content-Type': 'application/json',
},
body: JSON.stringify({
id: testWorkflowId,
inputs: ['health-check'],
}),
signal: AbortSignal.timeout(10000), // 10 second timeout
});
// API is healthy if we get a response (even an error response)
return response.status === 200;
} catch (error) {
logger.warn('Glif health check failed:', error);
return false;
}
}Error Classification Matrix:
| Error Pattern | FailureType | FailureReason | Retryable |
|---|---|---|---|
| "Invalid workflow ID" | CONFIGURATION_ERROR | INVALID_INPUT | No |
| "Insufficient credits" | RESOURCE_EXHAUSTED | QUOTA_EXCEEDED | No |
| "Rate limit exceeded" | RATE_LIMITED | RATE_LIMITED | Yes (with backoff) |
| "Authentication failed" | AUTHENTICATION_ERROR | INVALID_CREDENTIALS | No |
| "Timeout" / "Timed out" | TIMEOUT | EXECUTION_TIMEOUT | Yes |
| Network errors | NETWORK_ERROR | CONNECTION_FAILED | Yes |
| 500+ status codes | SERVICE_ERROR | SERVICE_UNAVAILABLE | Yes |
| Unknown errors | SERVICE_ERROR | UNKNOWN | Yes (limited) |
Error Handling Implementation:
private classifyGlifError(errorMessage: string): FailureType {
const msg = errorMessage.toLowerCase();
// Permanent errors (do not retry)
if (msg.includes('invalid workflow') || msg.includes('workflow not found')) {
return FailureType.CONFIGURATION_ERROR;
}
if (msg.includes('insufficient credits') || msg.includes('quota exceeded')) {
return FailureType.RESOURCE_EXHAUSTED;
}
if (msg.includes('authentication') || msg.includes('unauthorized') || msg.includes('invalid token')) {
return FailureType.AUTHENTICATION_ERROR;
}
// Transient errors (retry with backoff)
if (msg.includes('rate limit') || msg.includes('too many requests')) {
return FailureType.RATE_LIMITED;
}
if (msg.includes('timeout') || msg.includes('timed out')) {
return FailureType.TIMEOUT;
}
if (msg.includes('network') || msg.includes('connection')) {
return FailureType.NETWORK_ERROR;
}
// Default to service error (transient)
return FailureType.SERVICE_ERROR;
}
private getFailureReason(errorMessage: string, failureType: FailureType): FailureReason {
switch (failureType) {
case FailureType.CONFIGURATION_ERROR:
return FailureReason.INVALID_INPUT;
case FailureType.RESOURCE_EXHAUSTED:
return FailureReason.QUOTA_EXCEEDED;
case FailureType.RATE_LIMITED:
return FailureReason.RATE_LIMITED;
case FailureType.AUTHENTICATION_ERROR:
return FailureReason.INVALID_CREDENTIALS;
case FailureType.TIMEOUT:
return FailureReason.EXECUTION_TIMEOUT;
case FailureType.NETWORK_ERROR:
return FailureReason.CONNECTION_FAILED;
case FailureType.SERVICE_ERROR:
return FailureReason.SERVICE_UNAVAILABLE;
default:
return FailureReason.UNKNOWN;
}
}Forensic Data Preservation:
private async processGlifResponse(response: any, jobData: JobData): Promise<any> {
// Check for API errors (200 status but error in response)
if (response.error) {
const failureType = this.classifyGlifError(response.error);
const failureReason = this.getFailureReason(response.error, failureType);
throw new ConnectorError(
`Glif workflow execution failed: ${response.error}`,
this.service_type,
failureType,
failureReason,
{
workflow_id: jobData.payload.workflow_id,
inputs: jobData.payload.inputs,
glif_error: response.error,
raw_response: response,
timestamp: new Date().toISOString(),
}
);
}
// Log successful execution with telemetry
logger.info('Glif workflow executed successfully', {
workflow_id: jobData.payload.workflow_id,
price: response.price,
output_type: response.outputFull?.type,
});
// Continue with output processing...
}Phase 5: Testing (1 hour)
Unit Tests:
apps/worker/src/connectors/__tests__/glif-connector.test.ts- Test request building (array vs object inputs)
- Test response processing (with/without images)
- Test error handling (API errors with 200 status)
- Test error classification (all failure types)
- Test health check (success and failure scenarios)
- Test asset saving
- Test forensic data preservation
Integration Test: Submit test job via API
- Use a simple public Glif workflow
- Verify image output saved to cloud storage
- Verify job result structure
Phase 5: Documentation (30 minutes)
- Connector README: Document Glif-specific configuration
- Example Job Payloads: Show array and object input formats
- Workflow ID Discovery: Link to Glif API docs for finding IDs
Technical Specifications
Environment Variables
# Required
GLIF_API_TOKEN=<your-glif-api-token>
# Optional (with defaults)
GLIF_BASE_URL=https://simple-api.glif.app
GLIF_TIMEOUT_SECONDS=60
GLIF_RETRY_ATTEMPTS=3
GLIF_MAX_CONCURRENT_JOBS=5
CLOUD_STORAGE_PROVIDER=<r2|s3|etc> # For saving outputsJob Payload Format
{
"id": "unique-job-id",
"type": "glif",
"payload": {
"workflow_id": "clv6pfjka00018xrl05qlr2it",
"inputs": ["a happy horse", "living on a farm"]
}
}OR (named inputs):
{
"id": "unique-job-id",
"type": "glif",
"payload": {
"workflow_id": "clv6pfjka00018xrl05qlr2it",
"inputs": {
"prompt": "a happy horse",
"context": "living on a farm"
}
}
}Expected Response
{
"success": true,
"data": {
"content_type": "image",
"image_url": "https://cdn.example.com/glif-output-12345.png",
"mime_type": "image/png",
"raw_response": {
"output": "https://glif-run-outputs.s3.amazonaws.com/...",
"outputFull": { "type": "image", ... },
"price": 0.05,
"nodes": { ... }
}
},
"processing_time_ms": 3500,
"metadata": {
"service": "glif",
"connector": "glif",
"workflow_id": "clv6pfjka00018xrl05qlr2it",
"price": 0.05
}
}Consequences
Positive ✅
- Low Infrastructure Cost: No GPU/server management, leverages Glif's infrastructure
- Fast Integration: Follows proven RestSyncConnector pattern
- Workflow Library Access: Users can leverage existing Glif workflows
- Consistent API: Matches OpenAI/Gemini connector patterns
- Cloud Storage Integration: Automatic asset saving for outputs
Negative ⚠️
- External Dependency: Relies on Glif.app service availability
- API Instability: Glif API is in beta and "subject to change"
- Credit Costs: Users need Glif accounts with sufficient credits
- Limited Error Info: API returns 200 OK even on errors
- No Async Polling: Cannot query job status after submission
- String-Only Inputs: API currently only supports string values
Neutral 🔵
- Attribution Required: Must display "powered by glif.app" in UI
- Rate Limiting: Subject to Glif's credit/rate limits
- CORS Support: Only Simple API supports CORS (fine for server-side)
Risks and Mitigations
| Risk | Impact | Mitigation |
|---|---|---|
| API changes without notice | High | Version the connector, monitor Glif changelog |
| Service downtime | Medium | Implement graceful error messages, retry logic |
| Credit exhaustion | Medium | Monitor job costs, set job budget limits |
| Output format changes | Low | Use outputFull for type detection |
| Rate limiting | Low | Configure max_concurrent_jobs appropriately |
Alternative Approaches Considered
1. Build Custom Workflow Engine
Rejected: Reinventing Glif's workflow builder would be massive effort
2. Use ComfyUI Instead
Rejected: Glif provides pre-built workflows; ComfyUI requires custom workflow creation
3. Implement Async Polling Connector
Rejected: Glif Simple API is synchronous; no polling endpoint available
Success Criteria
- ✅ GlifConnector successfully extends RestSyncConnector
- ✅ Can execute Glif workflows with array and object inputs
- ✅ Image outputs automatically saved to cloud storage
- ✅ Job results match OpenAI/Gemini result structure
- ✅ Error handling properly classifies transient vs. permanent failures
- ✅ Integration tests pass with real Glif API
- ✅ Documentation includes example job payloads
- ✅ Service mapping includes Glif worker configuration
References
- Glif API Documentation
- Glif API Token Settings
- GeminiConnector Implementation (similar pattern)
- OpenAIConnector Implementation (similar pattern)
- RestSyncConnector Base Class
