Effect Implementation Guide for Emerge

Status: Proposed
Date: 2024-12-10
Author: Sandy, Claude
Deciders: Sandy, Sayeed
Related: Adopting Functional Flow Patterns


Purpose

This is a companion to the Functional Flow Patterns ADR. That document describes the patterns we're adopting; this one shows exactly what changes in the emerge-turbo codebase: specific files, specific refactors, and specific migration steps.


Current State Assessment

Files That Need Migration (by priority)

| File | Size | Current Issues | Migration Priority |
| --- | --- | --- | --- |
| apps/api/src/lightweight-api-server.ts | 230KB | All routes in one file, catch-all errors | P0 - Split first |
| packages/core/src/redis-service.ts | 49KB | Monolith, silent failures | P0 - Core dependency |
| apps/worker/src/redis-direct-worker-client.ts | 78KB | Polling loops, mixed concerns | P1 |
| packages/core/src/job-broker.ts | 25KB | Imperative, state mutations | P1 |
| apps/worker/src/connector-manager.ts | 20KB | Two-phase init, duck typing | P2 |

Current Anti-Patterns Found

  1. String error throwing with expressive messages

    typescript
    // apps/worker/src/connector-manager.ts:88
    throw new Error(
      `SYSTEM IS FUCKED: service-mapping.json not found in any of these paths: ${possiblePaths.join(', ')}`
    );
  2. null as any type casting for deferred initialization

    typescript
    // apps/api/src/lightweight-api-server.ts
    this.redis = null as any;
    this.redisService = null as any;
  3. Catch-all 500 errors hiding root causes

    typescript
    // apps/api/src/lightweight-api-server.ts
    catch (error) {
      res.status(500).json({
        error: error instanceof Error ? error.message : 'Unknown error'
      });
    }
  4. State mutation in async flows

    typescript
    // packages/core/src/job-broker.ts
    let workflowPriority = request.priority || 50;
    let workflowDatetime = currentTime;
    if (metadata) {
      workflowPriority = metadata.priority;  // mutation
      workflowDatetime = metadata.submitted_at;  // mutation
    }
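
Anti-pattern 4 can be removed even before Effect is introduced, by computing both values in one expression so that nothing is reassigned. The following is a minimal sketch in plain TypeScript. `WorkflowMetadata`, `WorkflowContext`, and `resolveWorkflowContext` are hypothetical names, and the sketch covers only the two branches visible in the snippet above (metadata present or absent).

```typescript
// Hypothetical shapes, inferred from the job-broker.ts snippet above.
interface WorkflowMetadata {
  priority: number
  submitted_at: number
}

interface WorkflowContext {
  readonly workflowPriority: number
  readonly workflowDatetime: number
}

// Pure function: the same inputs always give the same output, and nothing is reassigned.
const resolveWorkflowContext = (
  requestPriority: number | undefined,
  metadata: WorkflowMetadata | undefined,
  currentTime: number
): WorkflowContext =>
  metadata
    ? { workflowPriority: metadata.priority, workflowDatetime: metadata.submitted_at }
    : { workflowPriority: requestPriority ?? 50, workflowDatetime: currentTime }
```

Note that `??` keeps an explicit priority of `0`, whereas the `||` in the current code would replace it with `50`. Whether that difference is wanted is a decision for the migration, not something this sketch settles.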

Target Architecture

New Folder Structure

packages/
  core/
    src/
      services/                    # Effect service definitions
        redis.ts                   # Redis service tag + interface
        database.ts                # Database service tag
        job-broker.ts              # Job broker service tag
        telemetry.ts               # Telemetry service tag

      services-live/               # Production implementations
        redis-live.ts              # Real Redis Layer
        database-live.ts           # Real Prisma/Postgres Layer
        job-broker-live.ts         # Real job broker Layer

      services-test/               # Test implementations
        redis-test.ts              # In-memory Map
        database-test.ts           # In-memory records
        job-broker-test.ts         # Immediate job processing

      errors/                      # Typed error definitions
        redis-errors.ts            # RedisConnectionFailed, RedisTimeout, etc.
        job-errors.ts              # JobNotFound, InvalidJobData, etc.
        workflow-errors.ts         # WorkflowNotFound, WorkflowExpired, etc.

apps/
  api/
    src/
      features/                    # Feature-based organization
        job-submission/
          index.ts                 # Public API: submitJob effect
          validate.ts              # Step 1: Validate request
          enrich.ts                # Step 2: Add workflow context
          submit.ts                # Step 3: Submit to Redis
          errors.ts                # JobSubmission-specific errors
          types.ts                 # Request/response types
          handler.ts               # Express route handler (edge)
          __tests__/
            submission.test.ts

        job-status/
          index.ts
          fetch.ts
          transform.ts
          errors.ts
          handler.ts

        machine-management/
          index.ts
          register.ts
          heartbeat.ts
          restart.ts
          errors.ts
          handler.ts

        workflow-tracking/
          index.ts
          create.ts
          update-status.ts
          get-progress.ts
          errors.ts
          handler.ts

      server.ts                    # Express app setup (thin)
      routes.ts                    # Route registration (thin)

  worker/
    src/
      features/
        job-processing/
          index.ts                 # Public API: processJob effect
          claim.ts                 # Atomic job claiming
          execute.ts               # Run connector
          complete.ts              # Mark done + cleanup
          fail.ts                  # Handle failures
          errors.ts
          types.ts

        heartbeat/
          index.ts
          send.ts
          check-health.ts
          errors.ts

        capability-reporting/
          index.ts
          gather.ts
          report.ts
          errors.ts

Specific Migrations

Migration 1: Job Submission Flow

Current Code (packages/core/src/job-broker.ts):

typescript
async submitJob(request: JobSubmissionRequest): Promise<string> {
  const jobId = uuidv4();
  const currentTime = Date.now();

  let workflowPriority = request.priority || 50;
  let workflowDatetime = currentTime;
  const workflowId = request.workflow_id;

  if (workflowId) {
    const metadata = await this.getWorkflowMetadata(workflowId);
    if (metadata) {
      workflowPriority = metadata.priority;
      workflowDatetime = metadata.submitted_at;
    } else {
      await this.createWorkflow(workflowPriority, request.customer_id, workflowId);
      workflowDatetime = currentTime;
    }
  } else if (request.workflow_priority !== undefined && ...) {
    workflowPriority = request.workflow_priority;
    workflowDatetime = request.workflow_datetime;
  }

  const job = { ... };
  return await this.submitJobToRedis(jobId, job, workflowPriority, workflowDatetime);
}

Target Code (apps/api/src/features/job-submission/index.ts):

typescript
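import { Effect, pipe } from 'effect'
import { Redis } from '@/services/redis'
import { JobBroker } from '@/services/job-broker'
import * as Validate from './validate'
import * as Enrich from './enrich'
import * as Submit from './submit'
import { InvalidJobData, WorkflowNotFound, RedisUnavailable } from './errors'
import { JobSubmissionRequest, JobSubmissionResult } from './types'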

/**
 * Job Submission Pipeline:
 * Request → Validate → Resolve Workflow → Enrich → Submit to Redis
 */
export const submitJob = (
  request: JobSubmissionRequest
): Effect.Effect<
  JobSubmissionResult,
  InvalidJobData | WorkflowNotFound | RedisUnavailable,
  Redis | JobBroker
> =>
  pipe(
    // Step 1: Validate the incoming request
    Validate.request(request),

    // Step 2: Resolve workflow context (priority, datetime)
    Effect.flatMap(Enrich.withWorkflowContext),

    // Step 3: Generate job ID and build job record
    Effect.flatMap(Enrich.buildJobRecord),

    // Step 4: Submit to Redis with priority scoring
    Effect.flatMap(Submit.toRedis),

    // Step 5: Return submission result
    Effect.map((job) => ({
      success: true,
      jobId: job.id,
      workflowId: job.workflowId,
      position: job.queuePosition
    }))
  )

Workflow Context Resolution (apps/api/src/features/job-submission/enrich.ts):

typescript
import { Effect, pipe, Option } from 'effect'
import { Redis } from '@/services/redis'
import { WorkflowNotFound } from './errors'
import { JobSubmissionRequest } from './types'

interface ValidatedRequest {
  request: JobSubmissionRequest
  validatedAt: number
}

interface EnrichedRequest extends ValidatedRequest {
  workflowPriority: number
  workflowDatetime: number
  workflowId: string | null
}

/**
 * Resolve workflow context - no mutations, pure transformation
 */
export const withWorkflowContext = (
  validated: ValidatedRequest
): Effect.Effect<EnrichedRequest, WorkflowNotFound, Redis> =>
  pipe(
    // Lift the optional workflow_id into an Option so Option.match can branch on it
    Effect.succeed(Option.fromNullable(validated.request.workflow_id)),
    Effect.flatMap(
      Option.match({
        // No workflow ID - use request values or defaults
        onNone: () => Effect.succeed({
          ...validated,
          workflowPriority: validated.request.priority ?? 50,
          workflowDatetime: validated.validatedAt,
          workflowId: null
        }),

        // Has workflow ID - fetch or create
        onSome: (workflowId) => resolveWorkflow(workflowId, validated)
      })
    )
  )

const resolveWorkflow = (
  workflowId: string,
  validated: ValidatedRequest
): Effect.Effect<EnrichedRequest, WorkflowNotFound, Redis> =>
  pipe(
    // Try to get existing workflow.
    // A Context.Tag has no static method accessors, so resolve the service first.
    // Assumes the Redis service also exposes getWorkflowMetadata and createWorkflow.
    Effect.flatMap(Redis, (redis) => redis.getWorkflowMetadata(workflowId)),
    Effect.flatMap(
      Option.match({
        // Workflow exists - use its values
        onSome: (metadata) => Effect.succeed({
          ...validated,
          workflowPriority: metadata.priority,
          workflowDatetime: metadata.submitted_at,
          workflowId
        }),

        // Workflow doesn't exist - create it
        onNone: () => pipe(
          Effect.flatMap(Redis, (redis) =>
            redis.createWorkflow({
              id: workflowId,
              priority: validated.request.priority ?? 50,
              customerId: validated.request.customer_id,
              createdAt: validated.validatedAt
            })
          ),
          Effect.map(() => ({
            ...validated,
            workflowPriority: validated.request.priority ?? 50,
            workflowDatetime: validated.validatedAt,
            workflowId
          }))
        )
      })
    )
  )

Migration 2: Error Types

Current Errors (scattered string throws):

typescript
throw new Error('Content not found')
throw new Error('User not found')
throw new Error('Insufficient credits')
throw new Error('Generation failed')
throw new Error('SYSTEM IS FUCKED: service-mapping.json not found')

Target Errors (packages/core/src/errors/):

typescript
// packages/core/src/errors/job-errors.ts
import { Data } from 'effect'

export class JobNotFound extends Data.TaggedError('JobNotFound')<{
  jobId: string
  searchedQueues: string[]
}> {}

export class InvalidJobData extends Data.TaggedError('InvalidJobData')<{
  field: string
  reason: string
  value: unknown
}> {}

export class JobAlreadyClaimed extends Data.TaggedError('JobAlreadyClaimed')<{
  jobId: string
  claimedBy: string
  claimedAt: number
}> {}

export class JobExecutionFailed extends Data.TaggedError('JobExecutionFailed')<{
  jobId: string
  connector: string
  phase: 'initialization' | 'execution' | 'completion'
  cause: unknown
  retryable: boolean
}> {}
typescript
// packages/core/src/errors/redis-errors.ts
import { Data } from 'effect'

export class RedisConnectionFailed extends Data.TaggedError('RedisConnectionFailed')<{
  url: string
  cause: unknown
  retryCount: number
}> {}

export class RedisTimeout extends Data.TaggedError('RedisTimeout')<{
  operation: string
  timeoutMs: number
  key?: string
}> {}

export class RedisScriptError extends Data.TaggedError('RedisScriptError')<{
  scriptName: string
  cause: unknown
}> {}
typescript
// packages/core/src/errors/config-errors.ts
import { Data } from 'effect'

export class ServiceMappingNotFound extends Data.TaggedError('ServiceMappingNotFound')<{
  searchedPaths: string[]
}> {}

export class MissingEnvironmentVariable extends Data.TaggedError('MissingEnvironmentVariable')<{
  variable: string
  description: string
}> {}

export class InvalidConfiguration extends Data.TaggedError('InvalidConfiguration')<{
  key: string
  expectedType: string
  actualValue: unknown
}> {}
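
What the `_tag` field buys us can be seen without Effect at all: once every error carries a literal tag, the compiler can check that each one is handled. The sketch below uses plain discriminated unions as stand-ins for three of the classes above. `AppError` and `toHttpStatus` are hypothetical names, and the status codes are illustrative choices rather than decisions made in this guide.

```typescript
// Plain-TypeScript stand-ins for three of the tagged errors above (no Effect import).
type AppError =
  | { readonly _tag: 'InvalidJobData'; field: string; reason: string; value: unknown }
  | { readonly _tag: 'JobNotFound'; jobId: string; searchedQueues: string[] }
  | { readonly _tag: 'RedisTimeout'; operation: string; timeoutMs: number; key?: string }

// Hypothetical mapping from error tag to HTTP status.
const toHttpStatus = (error: AppError): number => {
  switch (error._tag) {
    case 'InvalidJobData':
      return 400
    case 'JobNotFound':
      return 404
    case 'RedisTimeout':
      return 503
    default: {
      // Compile-time exhaustiveness check: adding a new variant to AppError
      // without a matching case makes this assignment a type error.
      const unreachable: never = error
      return unreachable
    }
  }
}
```

A string-based `throw new Error('Content not found')` offers no equivalent: the only way to branch on it is to compare message text.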

Migration 3: Redis Service

Current Code (packages/core/src/redis-service.ts - 49KB monolith):

typescript
class RedisService {
  private redis: Redis;
  private subscriber: Redis;
  private isConnectedFlag: boolean = false;

  constructor(redisUrlOrInstance: string | Redis, ...) {
    // Overloaded constructor
  }

  async connect(): Promise<void> { ... }
  async disconnect(): Promise<void> { ... }
  async submitJob(...): Promise<string> { ... }
  async claimJob(...): Promise<Job | null> { ... }
  async publishProgress(...): Promise<void> { ... }
  async subscribeToJob(...): Promise<void> { ... }
  // ... 40+ more methods
}

Target Code - Split into focused services:

typescript
// packages/core/src/services/redis.ts
import { Context, Effect, Option, Stream } from 'effect'
import { RedisConnectionFailed, RedisTimeout } from '../errors/redis-errors'

// Service interface - what it can do
export interface RedisService {
  readonly ping: () => Effect.Effect<'PONG', RedisConnectionFailed>
  readonly get: (key: string) => Effect.Effect<Option.Option<string>, RedisTimeout>
  readonly set: (key: string, value: string, ttlMs?: number) => Effect.Effect<void, RedisTimeout>
  readonly hget: (key: string, field: string) => Effect.Effect<Option.Option<string>, RedisTimeout>
  readonly hset: (key: string, field: string, value: string) => Effect.Effect<void, RedisTimeout>
  readonly publish: (channel: string, message: string) => Effect.Effect<number, RedisTimeout>
  readonly subscribe: (channel: string) => Effect.Effect<Stream.Stream<string>, RedisConnectionFailed>
}

// Service tag - for dependency injection
export class Redis extends Context.Tag('Redis')<Redis, RedisService>() {}
typescript
// packages/core/src/services/job-queue.ts
import { Context, Effect, Option } from 'effect'

export interface JobQueueService {
  readonly submit: (job: JobRecord) => Effect.Effect<string, RedisUnavailable>
  readonly claim: (workerId: string, capabilities: Capabilities) => Effect.Effect<Option.Option<Job>, RedisUnavailable>
  readonly complete: (jobId: string, result: JobResult) => Effect.Effect<void, JobNotFound>
  readonly fail: (jobId: string, error: JobError) => Effect.Effect<void, JobNotFound>
  readonly getStatus: (jobId: string) => Effect.Effect<JobStatus, JobNotFound>
}

export class JobQueue extends Context.Tag('JobQueue')<JobQueue, JobQueueService>() {}
typescript
// packages/core/src/services-live/redis-live.ts
import { Layer, Effect, Option } from 'effect'
import IORedis from 'ioredis'
import { Redis, RedisService } from '../services/redis'
import { RedisConnectionFailed, RedisTimeout } from '../errors/redis-errors'

export const RedisLive = (url: string): Layer.Layer<Redis, RedisConnectionFailed> =>
  Layer.scoped(
    Redis,
    Effect.gen(function* () {
      const client = new IORedis(url, {
        enableReadyCheck: true,
        maxRetriesPerRequest: 3,
        lazyConnect: true
      })

      // Connect with error handling
      yield* Effect.tryPromise({
        try: () => client.connect(),
        catch: (cause) => new RedisConnectionFailed({ url, cause, retryCount: 0 })
      })

      // Ensure cleanup on scope close
      yield* Effect.addFinalizer(() =>
        Effect.promise(() => client.quit())
      )

      // Return the service implementation
      return {
        ping: () => Effect.tryPromise({
          try: () => client.ping() as Promise<'PONG'>,
          catch: (cause) => new RedisConnectionFailed({ url, cause, retryCount: 0 })
        }),

        get: (key) => Effect.tryPromise({
          try: async () => {
            const result = await client.get(key)
            // fromNullable keeps a stored empty string as Some(''); a truthiness check would drop it
            return Option.fromNullable(result)
          },
          catch: () => new RedisTimeout({ operation: 'get', timeoutMs: 5000, key })
        }),

        set: (key, value, ttlMs) => Effect.tryPromise({
          try: () => ttlMs
            ? client.set(key, value, 'PX', ttlMs).then(() => {})
            : client.set(key, value).then(() => {}),
          catch: () => new RedisTimeout({ operation: 'set', timeoutMs: 5000, key })
        }),

        // ... other methods
      } satisfies RedisService
    })
  )
typescript
// packages/core/src/services-test/redis-test.ts
import { Layer, Effect, Option } from 'effect'
import { Redis, RedisService } from '../services/redis'

export const RedisTest = Layer.sync(
  Redis,
  () => {
    const store = new Map<string, string>()
    const hashStore = new Map<string, Map<string, string>>()

    return {
      ping: () => Effect.succeed('PONG' as const),

      get: (key) => Effect.sync(() => {
        // fromNullable keeps a stored empty string as Some('')
        return Option.fromNullable(store.get(key))
      }),

      set: (key, value) => Effect.sync(() => {
        store.set(key, value)
      }),

      // ... simple in-memory implementations
    } satisfies RedisService
  }
)
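
To show how little the in-memory variant needs, here is a sketch of the string and hash commands in plain TypeScript, without the Effect wrappers. `createMemoryStore` is a hypothetical helper, not part of the planned `redis-test.ts`, and it returns `string | undefined` where the real test layer would return an `Option`.

```typescript
// Hypothetical in-memory stand-in for Redis string and hash commands.
// Each call creates a fresh pair of maps, so stores never share state.
const createMemoryStore = () => {
  const strings = new Map<string, string>()
  const hashes = new Map<string, Map<string, string>>()

  return {
    get: (key: string): string | undefined => strings.get(key),
    set: (key: string, value: string): void => {
      strings.set(key, value)
    },
    hget: (key: string, field: string): string | undefined =>
      hashes.get(key)?.get(field),
    hset: (key: string, field: string, value: string): void => {
      // Create the hash on first write.
      const hash = hashes.get(key) ?? new Map<string, string>()
      hash.set(field, value)
      hashes.set(key, hash)
    }
  }
}
```

Returning `undefined` only for a genuinely missing key matters here: an empty string is a legitimate stored value and should come back as such.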

Migration 4: API Route Handlers

Current Code (apps/api/src/lightweight-api-server.ts):

typescript
this.app.post('/api/jobs', async (req: Request, res: Response) => {
  try {
    const jobId = await telemetryClient.withSpan('job.submit', async (span) => {
      const jobData = req.body;
      span.setAttribute('job.workflow_id', jobData.workflow_id || 'unknown');
      return await this.submitJob(jobDataWithTrace);
    });
    res.status(201).json({ success: true, job_id: jobId });
  } catch (error) {
    logger.error('Job submission failed:', error);
    res.status(500).json({
      success: false,
      error: error instanceof Error ? error.message : 'Unknown error'
    });
  }
});

Target Code (apps/api/src/features/job-submission/handler.ts):

typescript
import { Effect, pipe } from 'effect'
import { Request, Response } from 'express'
import { submitJob } from './index'
import { InvalidJobData, WorkflowNotFound, RedisUnavailable } from './errors'
import { LiveServices } from '@/services'

/**
 * Express handler - thin wrapper that:
 * 1. Extracts request data
 * 2. Runs the Effect
 * 3. Maps results/errors to HTTP responses
 */
export const handleJobSubmission = (req: Request, res: Response) => {
  const program = pipe(
    // Run the pure business logic
    submitJob(req.body),

    // Map typed errors to HTTP responses
    Effect.catchTags({
      InvalidJobData: (e) =>
        Effect.succeed({
          status: 400,
          body: {
            success: false,
            error: 'invalid_request',
            field: e.field,
            reason: e.reason
          }
        }),

      WorkflowNotFound: (e) =>
        Effect.succeed({
          status: 404,
          body: {
            success: false,
            error: 'workflow_not_found',
            workflowId: e.workflowId
          }
        }),

      RedisUnavailable: () =>
        Effect.succeed({
          status: 503,
          body: {
            success: false,
            error: 'service_unavailable',
            retryAfter: 5
          }
        })
    }),

    // Map success to HTTP response
    Effect.map((result) =>
      'status' in result
        ? result
        : { status: 201, body: result }
    ),

    // Provide all dependencies
    Effect.provide(LiveServices)
  )

  // Run and send response
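  Effect.runPromise(program)
    .then(({ status, body }) => {
      res.status(status).json(body)
    })
    // Defects (unexpected failures outside the typed error channel) still reject the Promise
    .catch(() => {
      res.status(500).json({ success: false, error: 'internal_error' })
    })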
}
typescript
// apps/api/src/routes.ts - thin registration
import { Router } from 'express'
import { handleJobSubmission } from './features/job-submission/handler'
import { handleJobStatus } from './features/job-status/handler'
import { handleMachineRegister } from './features/machine-management/handler'

export const createRouter = () => {
  const router = Router()

  // Job routes
  router.post('/api/jobs', handleJobSubmission)
  router.get('/api/jobs/:id', handleJobStatus)

  // Machine routes
  router.post('/api/machines/register', handleMachineRegister)

  return router
}

Migration 5: Worker Job Processing

Current Code (apps/worker/src/redis-direct-worker-client.ts - simplified):

typescript
private async processJob(job: Job): Promise<void> {
  try {
    this.currentJob = job;

    const connector = this.connectorManager.getConnector(job.connector_name);
    if (!connector) {
      throw new Error(`No connector found for ${job.connector_name}`);
    }

    const result = await connector.execute(job);
    await this.completeJob(job.id, result);

  } catch (error) {
    await this.failJob(job.id, error);
  } finally {
    this.currentJob = null;
  }
}

Target Code (apps/worker/src/features/job-processing/index.ts):

typescript
import { Effect, pipe } from 'effect'
import * as Claim from './claim'
import * as Execute from './execute'
import * as Complete from './complete'
import * as Fail from './fail'
import { JobProcessingResult, ProcessingError } from './types'

/**
 * Job Processing Pipeline:
 * Claim → Load Connector → Execute → Complete/Fail
 */
export const processNextJob = (
  workerId: string,
  capabilities: WorkerCapabilities
): Effect.Effect<
  JobProcessingResult,
  NoJobsAvailable | ConnectorNotFound,
  Redis | ConnectorRegistry | Telemetry
> =>
  pipe(
    // Atomically claim a job
    Claim.nextJob(workerId, capabilities),

    // Load the appropriate connector
    Effect.flatMap((job) =>
      pipe(
        Execute.loadConnector(job.connector_name),
        Effect.map((connector) => ({ job, connector }))
      )
    ),

    // Execute with automatic failure handling
    Effect.flatMap(({ job, connector }) =>
      pipe(
        Execute.run(connector, job),
        Effect.matchEffect({
          onSuccess: (result) => Complete.job(job.id, result),
          onFailure: (error) => Fail.job(job.id, error)
        })
      )
    ),

    // Add telemetry
    Effect.tap((result) =>
      Telemetry.recordJobProcessed(result)
    )
  )
typescript
// apps/worker/src/features/job-processing/execute.ts
import { Effect, Option, pipe } from 'effect'
import { ConnectorRegistry } from '@/services/connector-registry'
import { Telemetry } from '@/services/telemetry'
import { ConnectorNotFound, ExecutionFailed } from './errors'

export const loadConnector = (
  connectorName: string
): Effect.Effect<Connector, ConnectorNotFound, ConnectorRegistry> =>
  Effect.gen(function* () {
    const registry = yield* ConnectorRegistry
    const connector = yield* registry.get(connectorName)

    if (Option.isNone(connector)) {
      return yield* Effect.fail(new ConnectorNotFound({
        connectorName,
        availableConnectors: yield* registry.list()
      }))
    }

    return connector.value
  })

export const run = (
  connector: Connector,
  job: Job
): Effect.Effect<JobResult, ExecutionFailed, Telemetry> =>
  Effect.gen(function* () {
    const telemetry = yield* Telemetry

    return yield* pipe(
      // Wrap connector execution (which returns Promise)
      Effect.tryPromise({
        try: () => connector.execute(job),
        catch: (cause) => new ExecutionFailed({
          jobId: job.id,
          connector: connector.name,
          phase: 'execution',
          cause,
          retryable: isRetryable(cause)
        })
      }),

      // Add execution timing
      Effect.tap(() => telemetry.recordExecutionTime(job.id, connector.name))
    )
  })
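
The `run` function above calls `isRetryable`, which this guide does not define. One possible shape is sketched below in plain TypeScript. It is an assumption, not the planned implementation: it treats a small set of Node.js network error codes as transient and everything else as permanent.

```typescript
// Hypothetical helper: referenced in execute.ts but not defined in this guide.
// Assumption: transient network failures are worth retrying, everything else is not.
const RETRYABLE_CODES = new Set(['ECONNRESET', 'ECONNREFUSED', 'ETIMEDOUT', 'EPIPE'])

const isRetryable = (cause: unknown): boolean => {
  // Anything that is not an object (strings, numbers, null) is treated as permanent.
  if (typeof cause !== 'object' || cause === null) return false
  // Node.js system errors carry a string `code` property.
  const code = (cause as { code?: unknown }).code
  return typeof code === 'string' && RETRYABLE_CODES.has(code)
}
```

Because the parameter is `unknown`, the helper accepts whatever `Effect.tryPromise` hands to its `catch` callback without a cast at the call site.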

Testing Strategy

Before: Mock-Heavy Tests

typescript
// Current approach requires extensive mocking
jest.mock('@/lib/redis')
jest.mock('@/lib/db')

describe('submitJob', () => {
  it('should submit job', async () => {
    mockRedis.set.mockResolvedValue('OK')
    mockDb.jobs.create.mockResolvedValue({ id: 'job-1' })

    const result = await submitJob(data)
    expect(result.id).toBe('job-1')
  })
})

After: Dependency Injection Tests

typescript
// Effect approach - no mocking, just different implementations
import { Cause, Effect, Exit, Layer, Option, pipe } from 'effect'
import { submitJob } from './index'
import { RedisTest } from '@/services-test/redis-test'
import { JobBrokerTest } from '@/services-test/job-broker-test'

describe('submitJob', () => {
  const TestServices = Layer.merge(RedisTest, JobBrokerTest)

  it('should submit valid job', async () => {
    const program = pipe(
      submitJob({ workflow_id: 'wf-1', connector_name: 'test' }),
      Effect.provide(TestServices)
    )

    const result = await Effect.runPromise(program)
    expect(result.success).toBe(true)
    expect(result.jobId).toBeDefined()
  })

  it('should fail on invalid data', async () => {
    const program = pipe(
      submitJob({ /* missing required fields */ }),
      Effect.provide(TestServices)
    )

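    const result = await Effect.runPromiseExit(program)
    expect(Exit.isFailure(result)).toBe(true)
    // failureOption returns an Option, so unwrap it before reading the error's _tag
    if (Exit.isFailure(result)) {
      const failure = Option.getOrUndefined(Cause.failureOption(result.cause))
      expect(failure?._tag).toBe('InvalidJobData')
    }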
  })
})

Migration Order

Phase 1: Foundation (Week 1)

  1. Install Effect

    bash
    pnpm add effect
  2. Create service definitions (no implementation changes yet)

    • packages/core/src/services/redis.ts
    • packages/core/src/services/job-queue.ts
    • packages/core/src/errors/*.ts
  3. Create test layers

    • packages/core/src/services-test/redis-test.ts
    • Enables new code to be written with Effect immediately

Phase 2: First Feature Migration (Week 2)

  1. Migrate job submission (new feature or greenfield endpoint)

    • Create apps/api/src/features/job-submission/
    • Implement pure Effect pipeline
    • Wrap existing submitJob in Effect.tryPromise initially
    • Add typed errors
  2. Create live Redis layer

    • Wraps existing IORedis client
    • Existing code continues to work

Phase 3: Worker Migration (Week 3-4)

  1. Migrate job processing

    • Create apps/worker/src/features/job-processing/
    • Keep connector interface as Promise-based (wrap at boundary)
    • Add typed execution errors
  2. Migrate heartbeat/capability

    • Simpler flows, good practice

Phase 4: Gradual Cleanup (Ongoing)

  1. Refactor as touched
    • When fixing bugs in old code, migrate that module
    • Don't migrate working code that isn't being changed

Interop Patterns

Wrapping Existing Promise Code

typescript
// Existing connector returns Promise
const legacyConnector = {
  execute: async (job: Job): Promise<Result> => { ... }
}

// Wrap at the boundary
const executeConnector = (connector: LegacyConnector, job: Job) =>
  Effect.tryPromise({
    try: () => connector.execute(job),
    catch: (cause) => new ExecutionFailed({ jobId: job.id, cause })
  })

Exposing Effect as Promise (for existing callers)

typescript
// New Effect-based implementation
export const submitJob = (request: Request): Effect.Effect<Result, Error, Deps> => ...

// Export Promise wrapper for existing code
export const submitJobPromise = async (request: Request): Promise<Result> => {
  return Effect.runPromise(
    pipe(
      submitJob(request),
      Effect.provide(LiveServices)
    )
  )
}

Success Metrics

| Metric | Current | Target | How to Measure |
| --- | --- | --- | --- |
| Error type coverage | ~10% typed | 80% typed | Count TaggedError vs string throws |
| Test mock count | High | Zero | Grep for jest.mock |
| Files per feature | 5-8 across dirs | 1 folder | Count folders opened to understand a flow |
| Error response accuracy | Catch-all 500s | Specific status codes | API response audit |
| Time to understand flow | Minutes | Seconds | Dev survey |
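
The first metric could be approximated with a small text scan. The sketch below is one rough way to do it: `countErrorStyles` is a hypothetical helper, and its regular expressions are heuristics that compare `Data.TaggedError(` definitions against `throw new Error(` sites, not a parser-accurate count.

```typescript
// Hypothetical metric helper: counts typed vs. untyped error sites in a source string.
// The regular expressions are rough heuristics, not a parser.
const countErrorStyles = (source: string) => {
  const typed = (source.match(/Data\.TaggedError\(/g) ?? []).length
  const untyped = (source.match(/throw new Error\(/g) ?? []).length
  const total = typed + untyped
  // Report 0 rather than NaN when the source contains neither pattern.
  return { typed, untyped, typedRatio: total === 0 ? 0 : typed / total }
}
```

Run over the concatenated contents of a package, the ratio gives a trend line to track across phases. It should not be read as an exact coverage figure, since one tagged error class may replace many throw sites.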

Decision

Proceed with Effect migration following the phased approach above, starting with job submission as the pilot feature.


Appendix: Quick Reference

Effect Imports

typescript
import { Effect, pipe, Option, Either, Layer, Context, Data, Exit, Cause } from 'effect'

Creating a Typed Error

typescript
class MyError extends Data.TaggedError('MyError')<{
  field: string
  reason: string
}> {}

Creating a Service

typescript
class MyService extends Context.Tag('MyService')<MyService, {
  readonly doThing: (x: string) => Effect.Effect<Result, MyError>
}>() {}

Providing Dependencies

typescript
const program = pipe(
  myEffect,
  Effect.provide(ServiceLive),
  Effect.provide(OtherServiceLive)
)

Running an Effect

typescript
// As Promise
const result = await Effect.runPromise(program)

// With exit for error handling
const exit = await Effect.runPromiseExit(program)
if (Exit.isSuccess(exit)) {
  console.log(exit.value)
} else {
  console.log(Cause.pretty(exit.cause))
}
