Effect Implementation Guide for Emerge
Status: Proposed
Date: 2024-12-10
Author: Sandy, Claude
Deciders: Sandy, Sayeed
Related: Adopting Functional Flow Patterns
Purpose
This is a companion to the Functional Flow Patterns ADR. That document describes the patterns we're adopting; this one shows exactly what changes in the emerge-turbo codebase: specific files, specific refactors, specific migration steps.
Current State Assessment
Files That Need Migration (by priority)
| File | Size | Current Issues | Migration Priority |
|---|---|---|---|
| apps/api/src/lightweight-api-server.ts | 230KB | All routes in one file, catch-all errors | P0 - Split first |
| packages/core/src/redis-service.ts | 49KB | Monolith, silent failures | P0 - Core dependency |
| apps/worker/src/redis-direct-worker-client.ts | 78KB | Polling loops, mixed concerns | P1 |
| packages/core/src/job-broker.ts | 25KB | Imperative, state mutations | P1 |
| apps/worker/src/connector-manager.ts | 20KB | Two-phase init, duck typing | P2 |
Current Anti-Patterns Found
String error throwing with expressive messages
// apps/worker/src/connector-manager.ts:88
throw new Error(
  `SYSTEM IS FUCKED: service-mapping.json not found in any of these paths: ${possiblePaths.join(', ')}`
);
null as any type casting for deferred initialization
// apps/api/src/lightweight-api-server.ts
this.redis = null as any;
this.redisService = null as any;
Catch-all 500 errors hiding root causes
// apps/api/src/lightweight-api-server.ts
catch (error) {
  res.status(500).json({
    error: error instanceof Error ? error.message : 'Unknown error'
  });
}
State mutation in async flows
// packages/core/src/job-broker.ts
let workflowPriority = request.priority || 50;
let workflowDatetime = currentTime;
if (metadata) {
  workflowPriority = metadata.priority; // mutation
  workflowDatetime = metadata.submitted_at; // mutation
}
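The state-mutation anti-pattern can be avoided without any library support. As a minimal sketch (the `resolveWorkflowContext` name and the `WorkflowMetadata` and `WorkflowContext` shapes are illustrative assumptions, not code from the repository), the priority and datetime can be computed by a pure function that returns a new value instead of reassigning `let` bindings:

```typescript
// Hypothetical shapes for illustration; field names follow the job-broker snippet.
interface WorkflowMetadata {
  priority: number
  submitted_at: number
}

interface WorkflowContext {
  readonly workflowPriority: number
  readonly workflowDatetime: number
}

// Pure function: the same inputs always give the same output and nothing is reassigned.
const resolveWorkflowContext = (
  requestPriority: number | undefined,
  currentTime: number,
  metadata: WorkflowMetadata | undefined
): WorkflowContext =>
  metadata
    ? { workflowPriority: metadata.priority, workflowDatetime: metadata.submitted_at }
    : { workflowPriority: requestPriority ?? 50, workflowDatetime: currentTime }

// Existing workflow: its stored values win.
const fromMetadata = resolveWorkflowContext(10, 1_000, { priority: 80, submitted_at: 500 })
// No workflow: fall back to the default priority and the current time.
const fromDefaults = resolveWorkflowContext(undefined, 1_000, undefined)
```

Note that `??` keeps an explicit priority of 0, whereas the `||` in the current code replaces it with 50. Whether 0 is a valid priority is a decision for the team, not something this sketch settles.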
Target Architecture
New Folder Structure
packages/
  core/
    src/
      services/                  # Effect service definitions
        redis.ts                 # Redis service tag + interface
        database.ts              # Database service tag
        job-broker.ts            # Job broker service tag
        telemetry.ts             # Telemetry service tag
      services-live/             # Production implementations
        redis-live.ts            # Real Redis Layer
        database-live.ts         # Real Prisma/Postgres Layer
        job-broker-live.ts       # Real job broker Layer
      services-test/             # Test implementations
        redis-test.ts            # In-memory Map
        database-test.ts         # In-memory records
        job-broker-test.ts       # Immediate job processing
      errors/                    # Typed error definitions
        redis-errors.ts          # RedisConnectionFailed, RedisTimeout, etc.
        job-errors.ts            # JobNotFound, InvalidJobData, etc.
        workflow-errors.ts       # WorkflowNotFound, WorkflowExpired, etc.
apps/
  api/
    src/
      features/                  # Feature-based organization
        job-submission/
          index.ts               # Public API: submitJob effect
          validate.ts            # Step 1: Validate request
          enrich.ts              # Step 2: Add workflow context
          submit.ts              # Step 3: Submit to Redis
          errors.ts              # JobSubmission-specific errors
          types.ts               # Request/response types
          handler.ts             # Express route handler (edge)
          __tests__/
            submission.test.ts
        job-status/
          index.ts
          fetch.ts
          transform.ts
          errors.ts
          handler.ts
        machine-management/
          index.ts
          register.ts
          heartbeat.ts
          restart.ts
          errors.ts
          handler.ts
        workflow-tracking/
          index.ts
          create.ts
          update-status.ts
          get-progress.ts
          errors.ts
          handler.ts
      server.ts                  # Express app setup (thin)
      routes.ts                  # Route registration (thin)
  worker/
    src/
      features/
        job-processing/
          index.ts               # Public API: processJob effect
          claim.ts               # Atomic job claiming
          execute.ts             # Run connector
          complete.ts            # Mark done + cleanup
          fail.ts                # Handle failures
          errors.ts
          types.ts
        heartbeat/
          index.ts
          send.ts
          check-health.ts
          errors.ts
        capability-reporting/
          index.ts
          gather.ts
          report.ts
          errors.ts
Specific Migrations
Migration 1: Job Submission Flow
Current Code (packages/core/src/job-broker.ts):
async submitJob(request: JobSubmissionRequest): Promise<string> {
const jobId = uuidv4();
const currentTime = Date.now();
let workflowPriority = request.priority || 50;
let workflowDatetime = currentTime;
const workflowId = request.workflow_id;
if (workflowId) {
const metadata = await this.getWorkflowMetadata(workflowId);
if (metadata) {
workflowPriority = metadata.priority;
workflowDatetime = metadata.submitted_at;
} else {
await this.createWorkflow(workflowPriority, request.customer_id, workflowId);
workflowDatetime = currentTime;
}
} else if (request.workflow_priority !== undefined && ...) {
workflowPriority = request.workflow_priority;
workflowDatetime = request.workflow_datetime;
}
const job = { ... };
return await this.submitJobToRedis(jobId, job, workflowPriority, workflowDatetime);
}
Target Code (apps/api/src/features/job-submission/index.ts):
import { Effect, pipe } from 'effect'
import * as Validate from './validate'
import * as Enrich from './enrich'
import * as Submit from './submit'
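import { JobSubmissionRequest, JobSubmissionResult } from './types'
import { InvalidJobData, WorkflowNotFound, RedisUnavailable } from './errors'
import { Redis } from '@/services/redis'
import { JobBroker } from '@/services/job-broker' // path assumed from the folder structure above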
/**
* Job Submission Pipeline:
* Request → Validate → Resolve Workflow → Enrich → Submit to Redis
*/
export const submitJob = (
request: JobSubmissionRequest
): Effect.Effect<
JobSubmissionResult,
InvalidJobData | WorkflowNotFound | RedisUnavailable,
Redis | JobBroker
> =>
pipe(
// Step 1: Validate the incoming request
Validate.request(request),
// Step 2: Resolve workflow context (priority, datetime)
Effect.flatMap(Enrich.withWorkflowContext),
// Step 3: Generate job ID and build job record
Effect.flatMap(Enrich.buildJobRecord),
// Step 4: Submit to Redis with priority scoring
Effect.flatMap(Submit.toRedis),
// Step 5: Return submission result
Effect.map((job) => ({
success: true,
jobId: job.id,
workflowId: job.workflowId,
position: job.queuePosition
}))
)
Workflow Context Resolution (apps/api/src/features/job-submission/enrich.ts):
import { Effect, pipe, Option } from 'effect'
import { Redis } from '@/services/redis'
import { WorkflowNotFound } from './errors'
import { JobSubmissionRequest } from './types'
interface ValidatedRequest {
request: JobSubmissionRequest
validatedAt: number
}
interface EnrichedRequest extends ValidatedRequest {
workflowPriority: number
workflowDatetime: number
workflowId: string | null
}
/**
* Resolve workflow context - no mutations, pure transformation
*/
export const withWorkflowContext = (
validated: ValidatedRequest
): Effect.Effect<EnrichedRequest, WorkflowNotFound, Redis> =>
pipe(
// Check if request has explicit workflow; wrap the nullable ID so Option.match can branch on it
Effect.succeed(Option.fromNullable(validated.request.workflow_id)),
Effect.flatMap(
Option.match({
// No workflow ID - use request values or defaults
onNone: () => Effect.succeed({
...validated,
workflowPriority: validated.request.priority ?? 50,
workflowDatetime: validated.validatedAt,
workflowId: null
}),
// Has workflow ID - fetch or create
onSome: (workflowId) => resolveWorkflow(workflowId, validated)
})
)
)
const resolveWorkflow = (
workflowId: string,
validated: ValidatedRequest
): Effect.Effect<EnrichedRequest, WorkflowNotFound, Redis> =>
pipe(
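// Try to get existing workflow. The service is accessed through its tag;
// this assumes the Redis service interface exposes getWorkflowMetadata.
Effect.flatMap(Redis, (redis) => redis.getWorkflowMetadata(workflowId)),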
Effect.flatMap(
Option.match({
// Workflow exists - use its values
onSome: (metadata) => Effect.succeed({
...validated,
workflowPriority: metadata.priority,
workflowDatetime: metadata.submitted_at,
workflowId
}),
// Workflow doesn't exist - create it
onNone: () => pipe(
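// Access the service through its tag (assumes the interface exposes createWorkflow)
Effect.flatMap(Redis, (redis) =>
  redis.createWorkflow({
    id: workflowId,
    priority: validated.request.priority ?? 50,
    customerId: validated.request.customer_id,
    createdAt: validated.validatedAt
  })
),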
Effect.map(() => ({
...validated,
workflowPriority: validated.request.priority ?? 50,
workflowDatetime: validated.validatedAt,
workflowId
}))
)
})
)
)
Migration 2: Error Types
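What this migration buys can be sketched in a few lines. The example below uses two cut-down, hypothetical error classes and a hypothetical `findJob` lookup (none of them are the repository's definitions) to show that a tagged error can be recovered from by name while the remaining failures stay visible in the type:

```typescript
import { Data, Effect } from 'effect'

// Cut-down, hypothetical versions of the errors defined in this migration.
class JobNotFound extends Data.TaggedError('JobNotFound')<{
  jobId: string
}> {}

class RedisTimeout extends Data.TaggedError('RedisTimeout')<{
  operation: string
  timeoutMs: number
}> {}

// Hypothetical lookup that fails with a typed error instead of throwing a string.
const findJob = (jobId: string): Effect.Effect<string, JobNotFound | RedisTimeout> =>
  jobId === 'job-1'
    ? Effect.succeed('queued')
    : Effect.fail(new JobNotFound({ jobId }))

// Recover from one specific failure; RedisTimeout remains in the error channel.
const statusOrUnknown = (jobId: string): Effect.Effect<string, RedisTimeout> =>
  findJob(jobId).pipe(
    Effect.catchTag('JobNotFound', (e) => Effect.succeed(`unknown:${e.jobId}`))
  )

const known = Effect.runSync(statusOrUnknown('job-1'))
const recovered = Effect.runSync(statusOrUnknown('job-2'))
```

With string throws, the equivalent recovery would have to match on message text, and the compiler could not tell which failures are still unhandled.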
Current Errors (scattered string throws):
throw new Error('Content not found')
throw new Error('User not found')
throw new Error('Insufficient credits')
throw new Error('Generation failed')
throw new Error('SYSTEM IS FUCKED: service-mapping.json not found')
Target Errors (packages/core/src/errors/):
// packages/core/src/errors/job-errors.ts
import { Data } from 'effect'
export class JobNotFound extends Data.TaggedError('JobNotFound')<{
jobId: string
searchedQueues: string[]
}> {}
export class InvalidJobData extends Data.TaggedError('InvalidJobData')<{
field: string
reason: string
value: unknown
}> {}
export class JobAlreadyClaimed extends Data.TaggedError('JobAlreadyClaimed')<{
jobId: string
claimedBy: string
claimedAt: number
}> {}
export class JobExecutionFailed extends Data.TaggedError('JobExecutionFailed')<{
jobId: string
connector: string
phase: 'initialization' | 'execution' | 'completion'
cause: unknown
retryable: boolean
}> {}
// packages/core/src/errors/redis-errors.ts
import { Data } from 'effect'
export class RedisConnectionFailed extends Data.TaggedError('RedisConnectionFailed')<{
url: string
cause: unknown
retryCount: number
}> {}
export class RedisTimeout extends Data.TaggedError('RedisTimeout')<{
operation: string
timeoutMs: number
key?: string
}> {}
export class RedisScriptError extends Data.TaggedError('RedisScriptError')<{
scriptName: string
cause: unknown
}> {}
// packages/core/src/errors/config-errors.ts
import { Data } from 'effect'
export class ServiceMappingNotFound extends Data.TaggedError('ServiceMappingNotFound')<{
searchedPaths: string[]
}> {}
export class MissingEnvironmentVariable extends Data.TaggedError('MissingEnvironmentVariable')<{
variable: string
description: string
}> {}
export class InvalidConfiguration extends Data.TaggedError('InvalidConfiguration')<{
key: string
expectedType: string
actualValue: unknown
}> {}
Migration 3: Redis Service
Current Code (packages/core/src/redis-service.ts - 49KB monolith):
class RedisService {
private redis: Redis;
private subscriber: Redis;
private isConnectedFlag: boolean = false;
constructor(redisUrlOrInstance: string | Redis, ...) {
// Overloaded constructor
}
async connect(): Promise<void> { ... }
async disconnect(): Promise<void> { ... }
async submitJob(...): Promise<string> { ... }
async claimJob(...): Promise<Job | null> { ... }
async publishProgress(...): Promise<void> { ... }
async subscribeToJob(...): Promise<void> { ... }
// ... 40+ more methods
}
Target Code - Split into focused services:
// packages/core/src/services/redis.ts
import { Context, Effect, Option, Stream } from 'effect'
import { RedisConnectionFailed, RedisTimeout } from '../errors/redis-errors'
// Service interface - what it can do
export interface RedisService {
  readonly ping: () => Effect.Effect<'PONG', RedisConnectionFailed>
  readonly get: (key: string) => Effect.Effect<Option.Option<string>, RedisTimeout>
  readonly set: (key: string, value: string, ttlMs?: number) => Effect.Effect<void, RedisTimeout>
  readonly hget: (key: string, field: string) => Effect.Effect<Option.Option<string>, RedisTimeout>
  readonly hset: (key: string, field: string, value: string) => Effect.Effect<void, RedisTimeout>
  readonly publish: (channel: string, message: string) => Effect.Effect<number, RedisTimeout>
  readonly subscribe: (channel: string) => Effect.Effect<Stream.Stream<string>, RedisConnectionFailed>
}
// Service tag - for dependency injection
export class Redis extends Context.Tag('Redis')<Redis, RedisService>() {}
// packages/core/src/services/job-queue.ts
import { Context, Effect, Option } from 'effect'
export interface JobQueueService {
  readonly submit: (job: JobRecord) => Effect.Effect<string, RedisUnavailable>
  readonly claim: (workerId: string, capabilities: Capabilities) => Effect.Effect<Option.Option<Job>, RedisUnavailable>
  readonly complete: (jobId: string, result: JobResult) => Effect.Effect<void, JobNotFound>
  readonly fail: (jobId: string, error: JobError) => Effect.Effect<void, JobNotFound>
  readonly getStatus: (jobId: string) => Effect.Effect<JobStatus, JobNotFound>
}
export class JobQueue extends Context.Tag('JobQueue')<JobQueue, JobQueueService>() {}
// packages/core/src/services-live/redis-live.ts
import { Layer, Effect, Option } from 'effect'
import IORedis from 'ioredis'
import { Redis, RedisService } from '../services/redis'
import { RedisConnectionFailed, RedisTimeout } from '../errors/redis-errors'
export const RedisLive = (url: string): Layer.Layer<Redis, RedisConnectionFailed> =>
Layer.scoped(
Redis,
Effect.gen(function* () {
const client = new IORedis(url, {
enableReadyCheck: true,
maxRetriesPerRequest: 3,
lazyConnect: true
})
// Connect with error handling
yield* Effect.tryPromise({
try: () => client.connect(),
catch: (cause) => new RedisConnectionFailed({ url, cause, retryCount: 0 })
})
// Ensure cleanup on scope close
yield* Effect.addFinalizer(() =>
Effect.promise(() => client.quit())
)
// Return the service implementation
return {
ping: () => Effect.tryPromise({
try: () => client.ping() as Promise<'PONG'>,
catch: (cause) => new RedisConnectionFailed({ url, cause, retryCount: 0 })
}),
get: (key) => Effect.tryPromise({
try: async () => {
  const result = await client.get(key)
  // fromNullable keeps empty-string values; a truthiness check would report them as missing
  return Option.fromNullable(result)
},
catch: () => new RedisTimeout({ operation: 'get', timeoutMs: 5000, key })
}),
set: (key, value, ttlMs) => Effect.tryPromise({
try: () => ttlMs
? client.set(key, value, 'PX', ttlMs).then(() => {})
: client.set(key, value).then(() => {}),
catch: () => new RedisTimeout({ operation: 'set', timeoutMs: 5000, key })
}),
// ... other methods
} satisfies RedisService
})
)
// packages/core/src/services-test/redis-test.ts
import { Layer, Effect, Option } from 'effect'
import { Redis, RedisService } from '../services/redis'
export const RedisTest = Layer.sync(
Redis,
() => {
const store = new Map<string, string>()
const hashStore = new Map<string, Map<string, string>>()
return {
ping: () => Effect.succeed('PONG' as const),
get: (key) => Effect.sync(() =>
  // fromNullable keeps empty-string values; a truthiness check would drop them
  Option.fromNullable(store.get(key))
),
set: (key, value) => Effect.sync(() => {
store.set(key, value)
}),
// ... simple in-memory implementations
} satisfies RedisService
}
)
Migration 4: API Route Handlers
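The core idea of this migration is that each typed error maps to one specific HTTP response instead of a catch-all 500. A dependency-free sketch of that mapping follows; the `SubmissionError` union, the `HttpResponse` shape, and the `toHttpResponse` name are assumptions made for illustration, while the status codes are the ones used by the target handler:

```typescript
// Hypothetical error shapes mirroring the tagged errors used by the handler.
type SubmissionError =
  | { readonly _tag: 'InvalidJobData'; readonly field: string; readonly reason: string }
  | { readonly _tag: 'WorkflowNotFound'; readonly workflowId: string }
  | { readonly _tag: 'RedisUnavailable' }

interface HttpResponse {
  readonly status: number
  readonly body: Record<string, unknown>
}

const toHttpResponse = (error: SubmissionError): HttpResponse => {
  switch (error._tag) {
    case 'InvalidJobData':
      return {
        status: 400,
        body: { success: false, error: 'invalid_request', field: error.field, reason: error.reason }
      }
    case 'WorkflowNotFound':
      return {
        status: 404,
        body: { success: false, error: 'workflow_not_found', workflowId: error.workflowId }
      }
    case 'RedisUnavailable':
      return {
        status: 503,
        body: { success: false, error: 'service_unavailable', retryAfter: 5 }
      }
    default: {
      // Exhaustiveness check: adding a new tag without a case stops this from type-checking.
      const unreachable: never = error
      return unreachable
    }
  }
}

const badRequest = toHttpResponse({ _tag: 'InvalidJobData', field: 'connector_name', reason: 'required' })
const unavailable = toHttpResponse({ _tag: 'RedisUnavailable' })
```

`Effect.catchTags` gives the same per-tag dispatch inside a pipeline, with the added benefit that handled tags are removed from the effect's error type.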
Current Code (apps/api/src/lightweight-api-server.ts):
this.app.post('/api/jobs', async (req: Request, res: Response) => {
try {
const jobId = await telemetryClient.withSpan('job.submit', async (span) => {
const jobData = req.body;
span.setAttribute('job.workflow_id', jobData.workflow_id || 'unknown');
return await this.submitJob(jobDataWithTrace);
});
res.status(201).json({ success: true, job_id: jobId });
} catch (error) {
logger.error('Job submission failed:', error);
res.status(500).json({
success: false,
error: error instanceof Error ? error.message : 'Unknown error'
});
}
});
Target Code (apps/api/src/features/job-submission/handler.ts):
import { Effect, pipe } from 'effect'
import { Request, Response } from 'express'
import { submitJob } from './index'
import { InvalidJobData, WorkflowNotFound, RedisUnavailable } from './errors'
import { LiveServices } from '@/services'
/**
* Express handler - thin wrapper that:
* 1. Extracts request data
* 2. Runs the Effect
* 3. Maps results/errors to HTTP responses
*/
export const handleJobSubmission = (req: Request, res: Response) => {
const program = pipe(
// Run the pure business logic
submitJob(req.body),
// Map typed errors to HTTP responses
Effect.catchTags({
InvalidJobData: (e) =>
Effect.succeed({
status: 400,
body: {
success: false,
error: 'invalid_request',
field: e.field,
reason: e.reason
}
}),
WorkflowNotFound: (e) =>
Effect.succeed({
status: 404,
body: {
success: false,
error: 'workflow_not_found',
workflowId: e.workflowId
}
}),
RedisUnavailable: () =>
Effect.succeed({
status: 503,
body: {
success: false,
error: 'service_unavailable',
retryAfter: 5
}
})
}),
// Map success to HTTP response
Effect.map((result) =>
'status' in result
? result
: { status: 201, body: result }
),
// Provide all dependencies
Effect.provide(LiveServices)
)
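// Run and send response. Typed errors are already mapped above; the catch
// covers defects (unexpected failures) so the request never hangs.
Effect.runPromise(program)
  .then(({ status, body }) => {
    res.status(status).json(body)
  })
  .catch(() => {
    res.status(500).json({ success: false, error: 'internal_error' })
  })
}
// apps/api/src/routes.ts - thin registration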
import { Router } from 'express'
import { handleJobSubmission } from './features/job-submission/handler'
import { handleJobStatus } from './features/job-status/handler'
import { handleMachineRegister } from './features/machine-management/handler'
export const createRouter = () => {
const router = Router()
// Job routes
router.post('/api/jobs', handleJobSubmission)
router.get('/api/jobs/:id', handleJobStatus)
// Machine routes
router.post('/api/machines/register', handleMachineRegister)
return router
}
Migration 5: Worker Job Processing
Current Code (apps/worker/src/redis-direct-worker-client.ts - simplified):
private async processJob(job: Job): Promise<void> {
try {
this.currentJob = job;
const connector = this.connectorManager.getConnector(job.connector_name);
if (!connector) {
throw new Error(`No connector found for ${job.connector_name}`);
}
const result = await connector.execute(job);
await this.completeJob(job.id, result);
} catch (error) {
await this.failJob(job.id, error);
} finally {
this.currentJob = null;
}
}
Target Code (apps/worker/src/features/job-processing/index.ts):
import { Effect, pipe } from 'effect'
import * as Claim from './claim'
import * as Execute from './execute'
import * as Complete from './complete'
import * as Fail from './fail'
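import { JobProcessingResult, ProcessingError, WorkerCapabilities } from './types'
import { NoJobsAvailable, ConnectorNotFound } from './errors'
// Service import paths below are assumed from the folder structure above
import { Redis } from '@/services/redis'
import { ConnectorRegistry } from '@/services/connector-registry'
import { Telemetry } from '@/services/telemetry'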
/**
* Job Processing Pipeline:
* Claim → Load Connector → Execute → Complete/Fail
*/
export const processNextJob = (
workerId: string,
capabilities: WorkerCapabilities
): Effect.Effect<
JobProcessingResult,
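// loadConnector can fail with ConnectorNotFound and nothing below recovers from it
NoJobsAvailable | ConnectorNotFound,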
Redis | ConnectorRegistry | Telemetry
> =>
pipe(
// Atomically claim a job
Claim.nextJob(workerId, capabilities),
// Load the appropriate connector
Effect.flatMap((job) =>
pipe(
Execute.loadConnector(job.connector_name),
Effect.map((connector) => ({ job, connector }))
)
),
// Execute with automatic failure handling
Effect.flatMap(({ job, connector }) =>
pipe(
Execute.run(connector, job),
Effect.matchEffect({
onSuccess: (result) => Complete.job(job.id, result),
onFailure: (error) => Fail.job(job.id, error)
})
)
),
// Add telemetry (the service is accessed through its tag)
Effect.tap((result) =>
  Effect.flatMap(Telemetry, (telemetry) => telemetry.recordJobProcessed(result))
)
)
// apps/worker/src/features/job-processing/execute.ts
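import { Effect, Option, pipe } from 'effect'
import { ConnectorRegistry } from '@/services/connector-registry'
import { Telemetry } from '@/services/telemetry' // path assumed from the folder structure above
import { ConnectorNotFound, ExecutionFailed } from './errors'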
export const loadConnector = (
connectorName: string
): Effect.Effect<Connector, ConnectorNotFound, ConnectorRegistry> =>
Effect.gen(function* () {
const registry = yield* ConnectorRegistry
const connector = yield* registry.get(connectorName)
if (Option.isNone(connector)) {
return yield* Effect.fail(new ConnectorNotFound({
connectorName,
availableConnectors: yield* registry.list()
}))
}
return connector.value
})
export const run = (
connector: Connector,
job: Job
): Effect.Effect<JobResult, ExecutionFailed, Telemetry> =>
Effect.gen(function* () {
const telemetry = yield* Telemetry
return yield* pipe(
// Wrap connector execution (which returns Promise)
Effect.tryPromise({
try: () => connector.execute(job),
catch: (cause) => new ExecutionFailed({
jobId: job.id,
connector: connector.name,
phase: 'execution',
cause,
retryable: isRetryable(cause)
})
}),
// Add execution timing
Effect.tap(() => telemetry.recordExecutionTime(job.id, connector.name))
)
})
Testing Strategy
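The contrast this section draws can be sketched without any framework: logic that receives its dependency through an interface can be tested by handing it a different implementation, with no module mocking. The `KeyValueStore` interface, `makeInMemoryStore`, and `recordJob` below are hypothetical names, and the store is synchronous only to keep the sketch short:

```typescript
// Hypothetical minimal store interface; a real one would be asynchronous.
interface KeyValueStore {
  get(key: string): string | undefined
  set(key: string, value: string): void
}

// In-memory implementation for tests; a live one would wrap a Redis client.
const makeInMemoryStore = (): KeyValueStore => {
  const entries = new Map<string, string>()
  return {
    get: (key) => entries.get(key),
    set: (key, value) => {
      entries.set(key, value)
    }
  }
}

// Logic under test takes its dependency as an argument instead of importing it.
const recordJob = (store: KeyValueStore, jobId: string, payload: string): string => {
  const key = `job:${jobId}`
  store.set(key, payload)
  return key
}

const store = makeInMemoryStore()
const key = recordJob(store, 'job-1', '{"connector":"test"}')
const saved = store.get(key)
```

Effect layers apply the same idea at the type level: the required services appear in the effect's third type parameter, and a test supplies them with `Effect.provide`.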
Before: Mock-Heavy Tests
// Current approach requires extensive mocking
jest.mock('@/lib/redis')
jest.mock('@/lib/db')
describe('submitJob', () => {
it('should submit job', async () => {
mockRedis.set.mockResolvedValue('OK')
mockDb.jobs.create.mockResolvedValue({ id: 'job-1' })
const result = await submitJob(data)
expect(result.id).toBe('job-1')
})
})
After: Dependency Injection Tests
// Effect approach - no mocking, just different implementations
import { Cause, Effect, Exit, Layer, Option, pipe } from 'effect'
import { submitJob } from './index'
import { RedisTest } from '@/services-test/redis-test'
import { JobBrokerTest } from '@/services-test/job-broker-test'
describe('submitJob', () => {
  const TestServices = Layer.merge(RedisTest, JobBrokerTest)
  it('should submit valid job', async () => {
    const program = pipe(
      submitJob({ workflow_id: 'wf-1', connector_name: 'test' }),
      Effect.provide(TestServices)
    )
    const result = await Effect.runPromise(program)
    expect(result.success).toBe(true)
    expect(result.jobId).toBeDefined()
  })
  it('should fail on invalid data', async () => {
    const program = pipe(
      submitJob({ /* missing required fields */ }),
      Effect.provide(TestServices)
    )
    const result = await Effect.runPromiseExit(program)
    expect(Exit.isFailure(result)).toBe(true)
    if (Exit.isFailure(result)) {
      // failureOption returns an Option, so unwrap it before reading the error's _tag
      const failure = Cause.failureOption(result.cause)
      expect(Option.isSome(failure) && failure.value._tag).toBe('InvalidJobData')
    }
  })
})
Migration Order
Phase 1: Foundation (Week 1)
Install Effect: pnpm add effect
Create service definitions (no implementation changes yet)
- packages/core/src/services/redis.ts
- packages/core/src/services/job-queue.ts
- packages/core/src/errors/*.ts
Create test layers
- packages/core/src/services-test/redis-test.ts
- Enables new code to be written with Effect immediately
Phase 2: First Feature Migration (Week 2)
Migrate job submission (new feature or greenfield endpoint)
- Create apps/api/src/features/job-submission/
- Implement pure Effect pipeline
- Wrap existing submitJob in Effect.tryPromise initially
- Add typed errors
Create live Redis layer
- Wraps existing IORedis client
- Existing code continues to work
Phase 3: Worker Migration (Week 3-4)
Migrate job processing
- Create apps/worker/src/features/job-processing/
- Keep connector interface as Promise-based (wrap at boundary)
- Add typed execution errors
Migrate heartbeat/capability
- Simpler flows, good practice
Phase 4: Gradual Cleanup (Ongoing)
- Refactor as touched
- When fixing bugs in old code, migrate that module
- Don't migrate working code that isn't being changed
Interop Patterns
Wrapping Existing Promise Code
// Existing connector returns Promise
const legacyConnector = {
execute: async (job: Job): Promise<Result> => { ... }
}
// Wrap at the boundary
const executeConnector = (connector: LegacyConnector, job: Job) =>
Effect.tryPromise({
try: () => connector.execute(job),
catch: (cause) => new ExecutionFailed({ jobId: job.id, cause })
})
Exposing Effect as Promise (for existing callers)
// New Effect-based implementation
export const submitJob = (request: Request): Effect.Effect<Result, Error, Deps> => ...
// Export Promise wrapper for existing code
export const submitJobPromise = async (request: Request): Promise<Result> => {
return Effect.runPromise(
pipe(
submitJob(request),
Effect.provide(LiveServices)
)
)
}
Success Metrics
| Metric | Current | Target | How to Measure |
|---|---|---|---|
| Error type coverage | ~10% typed | 80% typed | Count TaggedError vs string throws |
| Test mock count | High | Zero | Grep for jest.mock |
| Files per feature | 5-8 across dirs | 1 folder | Count folders opened to understand a flow |
| Error response accuracy | Catch-all 500s | Specific status codes | API response audit |
| Time to understand flow | Minutes | Seconds | Dev survey |
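The first metric can be approximated with a small script. The sketch below is only a rough proxy, since it compares `Data.TaggedError` definitions against `throw new Error(` call sites in a piece of source text; the `countMatches` and `errorTypeCoverage` names and the sample input are made up for illustration:

```typescript
// Hypothetical helper: counts how often a global pattern occurs in source text.
const countMatches = (source: string, pattern: RegExp): number =>
  (source.match(pattern) ?? []).length

interface ErrorCoverage {
  readonly typed: number
  readonly untyped: number
  readonly typedRatio: number
}

// Rough proxy for "error type coverage": tagged definitions vs string throws.
const errorTypeCoverage = (source: string): ErrorCoverage => {
  const typed = countMatches(source, /Data\.TaggedError\(/g)
  const untyped = countMatches(source, /throw new Error\(/g)
  const total = typed + untyped
  return { typed, untyped, typedRatio: total === 0 ? 0 : typed / total }
}

// Made-up sample input; a real run would read the repository's .ts files.
const sample = [
  "class JobNotFound extends Data.TaggedError('JobNotFound')<{ jobId: string }> {}",
  "throw new Error('User not found')",
  "throw new Error('Insufficient credits')",
  "throw new Error('Generation failed')"
].join('\n')

const coverage = errorTypeCoverage(sample)
```

For the sample above, one of four error sites is typed, so `typedRatio` is 0.25. The same `countMatches` helper could count `jest.mock(` calls for the second metric.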
Decision
Proceed with Effect migration following the phased approach above, starting with job submission as the pilot feature.
Appendix: Quick Reference
Effect Imports
import { Effect, pipe, Option, Either, Layer, Context, Data, Exit, Cause } from 'effect'
Creating a Typed Error
class MyError extends Data.TaggedError('MyError')<{
field: string
reason: string
}> {}
Creating a Service
class MyService extends Context.Tag('MyService')<MyService, {
readonly doThing: (x: string) => Effect.Effect<Result, MyError>
}>() {}
Providing Dependencies
const program = pipe(
myEffect,
Effect.provide(ServiceLive),
Effect.provide(OtherServiceLive)
)
Running an Effect
// As Promise
const result = await Effect.runPromise(program)
// With exit for error handling
const exit = await Effect.runPromiseExit(program)
if (Exit.isSuccess(exit)) {
console.log(exit.value)
} else {
console.log(Cause.pretty(exit.cause))
}
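Putting the pieces together, the quick-reference snippets combine into one small program. This is a sketch with made-up names (`GreetingError`, `Greeter`, `GreeterLive`), not code from the repository: it defines a typed error, a service tag, a layer that supplies the service, and then runs the program to an `Exit`:

```typescript
import { Cause, Context, Data, Effect, Exit, Layer, Option } from 'effect'

// Hypothetical typed error, named only for this example.
class GreetingError extends Data.TaggedError('GreetingError')<{
  reason: string
}> {}

// Hypothetical service tag with its interface.
class Greeter extends Context.Tag('Greeter')<Greeter, {
  readonly greet: (name: string) => Effect.Effect<string, GreetingError>
}>() {}

// Layer that supplies an implementation of the service.
const GreeterLive = Layer.succeed(Greeter, {
  greet: (name: string) =>
    name.length === 0
      ? Effect.fail(new GreetingError({ reason: 'empty name' }))
      : Effect.succeed(`Hello, ${name}`)
})

// Business logic that only declares what it needs.
const program = (name: string) =>
  Effect.gen(function* () {
    const greeter = yield* Greeter
    return yield* greeter.greet(name)
  })

// Provide the dependency and run synchronously, keeping failures as data.
const run = (name: string) =>
  Effect.runSyncExit(program(name).pipe(Effect.provide(GreeterLive)))

const ok = run('Emerge')
const failed = run('')

// Read the typed failure's tag out of the exit, if there is one.
const failureTag = Exit.isFailure(failed)
  ? Option.getOrUndefined(Option.map(Cause.failureOption(failed.cause), (e) => e._tag))
  : undefined
```

Swapping `GreeterLive` for a test layer changes the behavior without touching `program`, which is the property the test layers in this guide rely on.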