
Adopting Functional Flow Patterns in Emerge

Status: Proposed
Date: 2024-12-10
Author: Sandy
Deciders: Sandy, Sayeed


Context

Emerge is a content platform built on Node/TypeScript. As the codebase grows, we're experiencing friction:

  1. Hard to trace data flow — Business logic scattered across files, callbacks, and middleware
  2. Error handling is inconsistent — Mix of try/catch, .catch(), and unhandled rejections
  3. Testing is painful — Tight coupling makes unit tests require extensive mocking
  4. Async flows are hard to debug — When something fails mid-pipeline, tracing the failure back to its source is tedious

The founder (Sandy) naturally thinks in terms of data flowing through connected systems. The current codebase doesn't match this mental model, making it harder to reason about and extend.

We considered switching to Elixir/Phoenix but need to preserve ecosystem compatibility (Farcaster tools, existing integrations, team knowledge).


Decision

We will incrementally adopt functional flow patterns in the existing TypeScript codebase using:

  1. Effect — For typed, composable pipelines with built-in error handling
  2. Feature-based folder structure — Organize by data flow, not file type
  3. Railway-oriented error handling — Errors as data, not exceptions
  4. Explicit dependency injection — Via Effect's service pattern

This is not a rewrite. We will:

  • Apply patterns to new features first
  • Refactor existing code opportunistically
  • Maintain interop with current Promise-based code (see the interop sketch below)
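
A minimal sketch of that interop, assuming a hypothetical Promise-based helper (legacyFetchContent) that we wrap once at the boundary with Effect.tryPromise:

typescript
import { Effect } from 'effect'

// Hypothetical existing Promise-based helper
declare function legacyFetchContent(id: string): Promise<{ id: string; title: string }>

// Wrap it once at the boundary; downstream code composes it like any other Effect.
// Effect.tryPromise turns a rejection into a typed failure instead of an unhandled rejection.
const fetchContentEffect = (id: string) =>
  Effect.tryPromise({
    try: () => legacyFetchContent(id),
    catch: (cause) => new Error(`legacyFetchContent failed: ${String(cause)}`)
  })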

The Patterns

Pattern 1: Pipe-Based Composition

Before: Nested, hard to trace

typescript
async function processContent(contentId: string) {
  try {
    const content = await db.content.findUnique({ where: { id: contentId } })
    if (!content) throw new Error('Content not found')
    
    const validated = validateContent(content)
    if (!validated.success) throw new Error(validated.error)
    
    const enriched = await enrichWithMetadata(validated.data)
    const transformed = await transformForFeed(enriched)
    
    await publishToFeed(transformed)
    await notifyFollowers(content.authorId, transformed)
    
    return { success: true, contentId: transformed.id }
  } catch (error) {
    console.error('Failed to process content:', error)
    throw error
  }
}

After: Linear flow, each step visible

typescript
import { Effect, pipe } from 'effect'
import * as Content from '@/features/content-pipeline'

const processContent = (contentId: string) =>
  pipe(
    Content.fetch(contentId),
    Effect.flatMap(Content.validate),
    Effect.flatMap(Content.enrichWithMetadata),
    Effect.flatMap(Content.transformForFeed),
    Effect.tap(Content.publishToFeed),
    Effect.tap((content) => Content.notifyFollowers(content.authorId, content)),
    Effect.map((content) => ({ success: true, contentId: content.id }))
  )

// Run it
Effect.runPromise(processContent("abc123"))

Why this is better:

  • Read top-to-bottom, matches data flow
  • Each step is a pure function, testable in isolation (see the test sketch after this list)
  • Effect.tap for side effects makes them explicit
  • Errors propagate automatically
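
On the "testable in isolation" point, a minimal test sketch, assuming Jest and a hypothetical content shape accepted by Content.validate:

typescript
import { Effect, Exit } from 'effect'
import * as Content from '@/features/content-pipeline'

// Exercise a single pipeline step on its own: no pipeline wiring, no module mocks.
test('Content.validate fails on empty content', async () => {
  const exit = await Effect.runPromiseExit(
    Content.validate({ id: 'abc123', body: '' })
  )
  expect(Exit.isFailure(exit)).toBe(true)
})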

Pattern 2: Railway-Oriented Error Handling

Before: Stringly-typed errors, unclear failure modes

typescript
async function mintNFT(collectionId: string, userId: string) {
  const collection = await getCollection(collectionId)
  if (!collection) throw new Error('Collection not found')
  
  const user = await getUser(userId)
  if (!user) throw new Error('User not found')
  
  if (!user.hasCredits) throw new Error('Insufficient credits')
  
  const content = await generateContent(collection)
  if (!content) throw new Error('Generation failed')
  
  const nft = await mint(content)
  return nft
}

// Caller has no idea what errors to expect
try {
  await mintNFT(id, userId)
} catch (e) {
  // Is this a user error? System error? Retryable?
  console.error(e.message)
}

After: Typed errors, explicit handling

typescript
import { Effect, Data, pipe } from 'effect'

// Define error types
class CollectionNotFound extends Data.TaggedError("CollectionNotFound")<{
  collectionId: string
}> {}

class UserNotFound extends Data.TaggedError("UserNotFound")<{
  userId: string
}> {}

class InsufficientCredits extends Data.TaggedError("InsufficientCredits")<{
  userId: string
  required: number
  available: number
}> {}

class GenerationFailed extends Data.TaggedError("GenerationFailed")<{
  reason: string
}> {}

// Function signature shows what can fail
const mintNFT = (
  collectionId: string,
  userId: string
): Effect.Effect<
  NFT,
  CollectionNotFound | UserNotFound | InsufficientCredits | GenerationFailed,
  Database | ContentGenerator
> =>
  pipe(
    Effect.all({
      collection: getCollection(collectionId),
      user: getUser(userId)
    }),
    Effect.flatMap(({ collection, user }) =>
      user.credits < 1
        ? Effect.fail(new InsufficientCredits({ userId, required: 1, available: user.credits }))
        : Effect.succeed({ collection, user })
    ),
    Effect.flatMap(({ collection }) => generateContent(collection)),
    Effect.flatMap(mint)
  )

// Caller can handle specific errors
pipe(
  mintNFT(collectionId, userId),
  Effect.catchTags({
    // handle missing collection
    CollectionNotFound: (e) => Effect.succeed({ error: 'collection_not_found' }),
    // prompt to buy credits; e.required and e.available are fully typed
    InsufficientCredits: (e) => Effect.succeed({ error: 'insufficient_credits', required: e.required }),
    // redirect to login
    UserNotFound: (e) => Effect.succeed({ error: 'user_not_found' }),
    // retry or show error
    GenerationFailed: (e) => Effect.succeed({ error: 'generation_failed', reason: e.reason })
  })
)

Why this is better:

  • TypeScript shows you every error that can occur
  • Each error carries relevant data
  • Callers must handle errors (or explicitly propagate)
  • No stringly-typed error messages to pattern match
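
Typed errors also make targeted recovery policies straightforward. A sketch (retry budget and backoff are illustrative) that retries only GenerationFailed using Effect.retry:

typescript
import { Effect, Schedule, pipe } from 'effect'

// Retry only GenerationFailed (matched by its _tag), up to 3 times with exponential backoff.
// The other typed errors still surface to the caller for explicit handling.
const mintWithRetry = (collectionId: string, userId: string) =>
  pipe(
    mintNFT(collectionId, userId),
    Effect.retry({
      schedule: Schedule.exponential('200 millis'),
      times: 3,
      while: (error) => error._tag === 'GenerationFailed'
    })
  )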

Pattern 3: Dependency Injection via Services

Before: Hidden dependencies, hard to test

typescript
import { db } from '@/lib/db'
import { redis } from '@/lib/redis'
import { openai } from '@/lib/openai'

async function generateContent(prompt: string) {
  const cached = await redis.get(`prompt:${prompt}`)
  if (cached) return JSON.parse(cached)
  
  const result = await openai.chat.completions.create({ ... })
  
  await db.generations.create({ data: { prompt, result } })
  await redis.set(`prompt:${prompt}`, JSON.stringify(result))
  
  return result
}

// Testing requires mocking modules 😱
jest.mock('@/lib/db')
jest.mock('@/lib/redis')
jest.mock('@/lib/openai')

After: Explicit dependencies, easy testing

typescript
import { Effect, Layer, Option, pipe } from 'effect'

// Define service interfaces.
// Effect.Tag also generates static accessors (Database.saveGeneration, Cache.get, ...)
// so the services can be called directly inside pipelines, as below.
class Database extends Effect.Tag("Database")<
  Database,
  {
    readonly saveGeneration: (prompt: string, result: any) => Effect.Effect<void>
  }
>() {}

class Cache extends Effect.Tag("Cache")<
  Cache,
  {
    readonly get: (key: string) => Effect.Effect<Option.Option<string>>
    readonly set: (key: string, value: string) => Effect.Effect<void>
  }
>() {}

class ContentGenerator extends Effect.Tag("ContentGenerator")<
  ContentGenerator,
  {
    readonly generate: (prompt: string) => Effect.Effect<GeneratedContent>
  }
>() {}

// Function declares what it needs
const generateContent = (prompt: string) =>
  pipe(
    Cache.get(`prompt:${prompt}`),
    Effect.flatMap(
      Option.match({
        onSome: (cached) => Effect.succeed(JSON.parse(cached)),
        onNone: () =>
          pipe(
            ContentGenerator.generate(prompt),
            Effect.tap((result) => Database.saveGeneration(prompt, result)),
            Effect.tap((result) => Cache.set(`prompt:${prompt}`, JSON.stringify(result)))
          )
      })
    )
  )

// Provide real implementations in production
const program = pipe(
  generateContent("a sunset over mountains"),
  Effect.provide(DatabaseLive),
  Effect.provide(CacheLive),
  Effect.provide(ContentGeneratorLive)
)

// Provide test implementations in tests — no mocking!
const testProgram = pipe(
  generateContent("a sunset"),
  Effect.provide(DatabaseTest),  // In-memory
  Effect.provide(CacheTest),     // Simple Map
  Effect.provide(ContentGeneratorTest) // Returns fixture
)

Why this is better:

  • Dependencies are visible in the type signature
  • No global imports to mock
  • Test implementations are just different providers (see the sketch after this list)
  • Swap implementations without changing business logic
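
For instance, the CacheTest provider used above can be a plain in-memory Map behind a Layer (a sketch; the real test layer may differ):

typescript
import { Effect, Layer, Option } from 'effect'

// Test provider for the Cache service: state lives in a local Map, nothing is mocked.
const CacheTest = Layer.sync(Cache, () => {
  const store = new Map<string, string>()
  return {
    get: (key: string) => Effect.sync(() => Option.fromNullable(store.get(key))),
    set: (key: string, value: string) =>
      Effect.sync(() => {
        store.set(key, value)
      })
  }
})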

Pattern 4: Feature-Based Folder Structure

Before: Organized by file type

src/
  components/
    ContentCard.tsx
    FeedItem.tsx
    GenerateButton.tsx
  hooks/
    useContent.ts
    useFeed.ts
    useGenerate.ts
  services/
    contentService.ts
    feedService.ts
    generateService.ts
  types/
    content.ts
    feed.ts
    generate.ts
  utils/
    contentUtils.ts
    feedUtils.ts

To understand "generate content" you touch 6+ files across 5 folders.

After: Organized by feature/flow

src/
  features/
    content-pipeline/
      index.ts           ← Public API (what other features import)
      fetch.ts           ← Step 1: Get content
      validate.ts        ← Step 2: Validate
      enrich.ts          ← Step 3: Add metadata
      transform.ts       ← Step 4: Transform for feed
      publish.ts         ← Step 5: Publish
      notify.ts          ← Step 6: Notify followers
      errors.ts          ← All errors for this flow
      types.ts           ← Types for this flow
      __tests__/
        pipeline.test.ts ← Test the whole flow
        
    nft-minting/
      index.ts
      generate.ts
      mint.ts
      treasury.ts
      errors.ts
      types.ts
      
    token-allocation/
      index.ts
      calculate.ts
      merkle.ts
      claim.ts
      errors.ts
      
  shared/
    services/            ← Shared service definitions
      database.ts
      cache.ts
      blockchain.ts
    lib/                 ← Pure utilities
      math.ts
      format.ts

Why this is better:

  • Open one folder to see the entire flow
  • index.ts shows the pipeline at a glance (sketched below)
  • Related code stays together
  • Adding a step = adding a file in the right folder
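
A sketch of what that index.ts could look like for the Pattern 1 flow (names follow the file list above): the public API re-exports each step, so the file reads as the pipeline's table of contents.

typescript
// src/features/content-pipeline/index.ts (sketch of the public API)
export { fetch } from './fetch'
export { validate } from './validate'
export { enrichWithMetadata } from './enrich'
export { transformForFeed } from './transform'
export { publishToFeed } from './publish'
export { notifyFollowers } from './notify'
export * from './errors'
export * from './types'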

Emerge-Specific Examples

Example 1: Content Generation Pipeline

typescript
// src/features/content-pipeline/index.ts

import { Effect, pipe } from 'effect'
import * as Fetch from './fetch'
import * as Validate from './validate'
import * as Generate from './generate'
import * as Mint from './mint'
import * as Distribute from './distribute'
import type { GenerationRequest } from './types'

/**
 * Complete content generation flow:
 * Request → Validate → Generate → Mint → Distribute Revenue
 */
export const processGenerationRequest = (request: GenerationRequest) =>
  pipe(
    // Validate the request
    Validate.request(request),
    
    // Check user has credits
    Effect.flatMap(() => Validate.userCredits(request.userId)),
    
    // Fetch collection parameters
    Effect.flatMap(() => Fetch.collection(request.collectionId)),
    
    // Generate content
    Effect.flatMap((collection) => 
      Generate.content(collection, request.parameters)
    ),
    
    // Mint as NFT
    Effect.flatMap((content) =>
      Mint.nft(content, request.userId)
    ),
    
    // Distribute revenue to token holders
    Effect.tap((nft) =>
      Distribute.revenueToHolders(nft.collectionId, nft.mintPrice)
    ),
    
    // Return result
    Effect.map((nft) => ({
      success: true,
      nftId: nft.id,
      transactionHash: nft.txHash
    }))
  )

// Error handling at the edge
export const handleGenerationRequest = (request: GenerationRequest) =>
  pipe(
    processGenerationRequest(request),
    Effect.catchTags({
      InvalidRequest: (e) => 
        Effect.succeed({ success: false, error: 'invalid_request', details: e.reason }),
      InsufficientCredits: (e) => 
        Effect.succeed({ success: false, error: 'insufficient_credits', required: e.required }),
      CollectionNotFound: () => 
        Effect.succeed({ success: false, error: 'collection_not_found' }),
      GenerationFailed: (e) => 
        Effect.succeed({ success: false, error: 'generation_failed', retryable: e.retryable }),
      MintFailed: (e) => 
        Effect.succeed({ success: false, error: 'mint_failed', txHash: e.attemptedTxHash })
    }),
    // Provide all dependencies
    Effect.provide(LiveServices)
  )
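
At the HTTP boundary the Effect world ends with a single runPromise call. A sketch assuming an Express-style route (framework and route path are illustrative, not part of this decision):

typescript
import express from 'express'
import { Effect } from 'effect'
import { handleGenerationRequest } from '@/features/content-pipeline'

const app = express()
app.use(express.json())

app.post('/api/generate', async (req, res) => {
  // handleGenerationRequest already catches every typed error and provides LiveServices,
  // so the remaining Effect cannot fail and runPromise is safe at this edge.
  const result = await Effect.runPromise(handleGenerationRequest(req.body))
  res.status(result.success ? 200 : 400).json(result)
})

app.listen(3000)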

Example 2: Farcaster Share Tracking

typescript
// src/features/viral-tracking/index.ts

import { Effect, pipe } from 'effect'
// Local step modules (paths follow the feature-folder convention)
import * as Parse from './parse'
import * as Extract from './extract'
import * as Metrics from './metrics'
import * as Rewards from './rewards'
import type { Cast, EmergeEmbed, FarcasterWebhookPayload } from './types'

/**
 * Track shares across Farcaster, update viral metrics
 * 
 * Flow:
 * Webhook → Parse Cast → Extract Embeds → Match Content → Update Metrics → Trigger Rewards
 */
export const processFarcasterWebhook = (payload: FarcasterWebhookPayload) =>
  pipe(
    // Parse and validate the cast
    Parse.cast(payload),
    
    // Extract any Emerge embeds
    Effect.flatMap(Extract.emergeEmbeds),
    
    // Skip if no Emerge content
    Effect.flatMap((embeds) =>
      embeds.length === 0
        ? Effect.succeed({ tracked: false })
        : processEmbeds(embeds, payload.cast)
    )
  )

const processEmbeds = (embeds: EmergeEmbed[], cast: Cast) =>
  pipe(
    // Process each embed in parallel
    Effect.forEach(embeds, (embed) =>
      pipe(
        // Record the share
        Metrics.recordShare({
          contentId: embed.contentId,
          sharedBy: cast.author.fid,
          castHash: cast.hash,
          timestamp: cast.timestamp
        }),
        
        // Update viral score
        Effect.flatMap(() => 
          Metrics.updateViralScore(embed.contentId)
        ),
        
        // Check if this triggers any rewards
        Effect.flatMap((score) =>
          score.crossedThreshold
            ? Rewards.triggerViralReward(embed.contentId, score.newTier)
            : Effect.succeed(null)
        )
      ),
      { concurrency: 'unbounded' }
    ),
    
    Effect.map((results) => ({
      tracked: true,
      embedsProcessed: results.length
    }))
  )

Example 3: Token Revenue Distribution

typescript
// src/features/revenue-distribution/index.ts

import { Effect, pipe } from 'effect'
// Local step modules (module paths are illustrative)
import * as Fetch from './fetch'
import * as AMM from './amm'
import * as Token from './token'
import * as Database from './database'
import { calculateNetRevenue } from './calculate'
import type { NFTSaleEvent } from './types'

/**
 * When NFT sells, distribute revenue to token holders
 * 
 * Flow:
 * Sale Event → Calculate Shares → Buy Token → Burn → Record Distribution
 */
export const distributeRevenue = (saleEvent: NFTSaleEvent) =>
  pipe(
    // Get collection's token contract
    Fetch.tokenContract(saleEvent.collectionId),
    
    // Calculate revenue after platform fee
    Effect.map((contract) => ({
      contract,
      netRevenue: calculateNetRevenue(saleEvent.price)
    })),
    
    // Execute buy & burn
    Effect.flatMap(({ contract, netRevenue }) =>
      pipe(
        // Swap ETH for tokens on AMM
        AMM.buyTokens(contract.ammPool, netRevenue),
        
        // Burn the purchased tokens
        Effect.flatMap((purchasedTokens) =>
          Token.burn(contract.address, purchasedTokens)
        ),
        
        // Record the distribution
        Effect.tap((burnTx) =>
          Database.recordDistribution({
            collectionId: saleEvent.collectionId,
            saleId: saleEvent.id,
            amountBurned: burnTx.amount,
            txHash: burnTx.hash
          })
        )
      )
    ),
    
    // Log success
    Effect.tap((result) =>
      Effect.log(`Burned ${result.amount} tokens for collection ${saleEvent.collectionId}`)
    )
  )
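
The calculateNetRevenue helper is plain arithmetic. A sketch assuming a flat platform fee expressed in basis points and prices in wei (the actual fee is not specified here):

typescript
// Hypothetical flat platform fee of 2.5% (250 basis points)
const PLATFORM_FEE_BPS = 250n

export const calculateNetRevenue = (salePriceWei: bigint): bigint =>
  salePriceWei - (salePriceWei * PLATFORM_FEE_BPS) / 10_000n

// Example: a 0.1 ETH sale (100_000_000_000_000_000 wei) nets 97_500_000_000_000_000 wei.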

Implementation Plan

Phase 1: Setup (1-2 days)

  1. Install Effect: npm install effect
  2. Add folder structure for first feature
  3. Create shared service definitions (Database, Cache, etc.)
  4. Document patterns in team wiki

Phase 2: New Feature Pilot (1 week)

Apply patterns to one new feature end-to-end:

  • Suggested: Conversation Coin chat room (greenfield)
  • Or: New webhook handler for Farcaster tracking

This validates the patterns before broader adoption.

Phase 3: Extract Shared Services (1 week)

Create Layer implementations for:

  • Database (Prisma wrapper; sketched below)
  • Cache (Redis wrapper)
  • Blockchain (Viem wrapper)
  • ContentGenerator (OpenAI/Replicate wrapper)
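
As a sketch of what a live layer looks like, here is the Database service from Pattern 3 wrapped around Prisma (the model and field names below are assumptions):

typescript
import { Effect, Layer } from 'effect'
import { PrismaClient } from '@prisma/client'

// Live implementation of the Database service defined in Pattern 3.
// Effect.promise treats a rejection as a defect; use Effect.tryPromise where a typed error is wanted.
const DatabaseLive = Layer.sync(Database, () => {
  const prisma = new PrismaClient()
  return {
    saveGeneration: (prompt: string, result: any) =>
      Effect.promise(async () => {
        await prisma.generations.create({ data: { prompt, result } })
      })
  }
})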

Phase 4: Incremental Migration (Ongoing)

When touching existing code:

  • Refactor to new patterns if change is significant
  • Leave working code alone if just bug fixes
  • New features always use new patterns

Consequences

Positive

  • Traceable flows — Read code top-to-bottom, matches mental model
  • Typed errors — TypeScript catches unhandled error cases
  • Testable — No mocking, just provide test implementations
  • Composable — Build complex flows from simple pieces
  • Consistent — One way to handle async, errors, dependencies

Negative

  • Learning curve — Effect has concepts to learn (3-5 days to be productive)
  • Bundle size — Effect adds ~50KB (acceptable for our use case)
  • Hiring — Fewer devs know Effect (but patterns transfer from other FP)
  • Interop friction — Wrapping Promise-based libraries takes effort

Mitigations

  • Create internal "Effect crash course" doc
  • Build helper functions for common Emerge patterns
  • Pair programming during initial adoption
  • Keep an escape hatch: Effect.promise() / Effect.tryPromise() wrap any Promise-returning function

Decision Summary

Adopt functional flow patterns using Effect, applied incrementally starting with new features.

