Adopting Functional Flow Patterns in Emerge
Status: Proposed
Date: 2024-12-10
Author: Sandy
Deciders: Sandy, Sayeed
Context
Emerge is a content platform built on Node/TypeScript. As the codebase grows, we're experiencing friction:
- Hard to trace data flow — Business logic scattered across files, callbacks, and middleware
- Error handling is inconsistent — Mix of try/catch, .catch(), and unhandled rejections
- Testing is painful — Tight coupling makes unit tests require extensive mocking
- Debugging async flows — When something fails in a pipeline, finding the source is tedious
The founder (Sandy) naturally thinks in terms of data flowing through connected systems. The current codebase doesn't match this mental model, making it harder to reason about and extend.
We considered switching to Elixir/Phoenix but need to preserve ecosystem compatibility (Farcaster tools, existing integrations, team knowledge).
Decision
We will incrementally adopt functional flow patterns in the existing TypeScript codebase using:
- Effect — For typed, composable pipelines with built-in error handling
- Feature-based folder structure — Organize by data flow, not file type
- Railway-oriented error handling — Errors as data, not exceptions
- Explicit dependency injection — Via Effect's service pattern
This is not a rewrite. We will:
- Apply patterns to new features first
- Refactor existing code opportunistically
- Maintain interop with current Promise-based code
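As a rough sketch of what interop with Promise-based code could look like, existing async functions can call into an Effect pipeline through Effect.runPromise, which returns an ordinary Promise. The names lookupTitle, shoutTitle, and legacyHandler are hypothetical and exist only for this illustration; they are not part of the Emerge codebase.

```typescript
import { Effect, pipe } from "effect"

// Hypothetical Effect-based step: succeeds for one known id, fails otherwise.
const lookupTitle = (id: string): Effect.Effect<string, Error> =>
  id === "abc123"
    ? Effect.succeed("Hello Emerge")
    : Effect.fail(new Error(`Content not found: ${id}`))

// New-style pipeline built on top of that step.
const shoutTitle = (id: string) =>
  pipe(
    lookupTitle(id),
    Effect.map((title) => title.toUpperCase())
  )

// Existing async code stays Promise-based and awaits the Effect at the boundary.
// Effect.runPromise rejects when the Effect fails, so try/catch still works here.
async function legacyHandler(id: string): Promise<string> {
  try {
    return await Effect.runPromise(shoutTitle(id))
  } catch {
    return "not found"
  }
}
```

Under this assumption, callers of legacyHandler do not need to know that Effect is used internally, which is what lets the migration proceed one function at a time.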
The Patterns
Pattern 1: Pipe-Based Composition
Before: Nested, hard to trace
async function processContent(contentId: string) {
  try {
    const content = await db.content.findUnique({ where: { id: contentId } })
    if (!content) throw new Error('Content not found')
    const validated = validateContent(content)
    if (!validated.success) throw new Error(validated.error)
    const enriched = await enrichWithMetadata(validated.data)
    const transformed = await transformForFeed(enriched)
    await publishToFeed(transformed)
    await notifyFollowers(content.authorId, transformed)
    return { success: true, contentId: transformed.id }
  } catch (error) {
    console.error('Failed to process content:', error)
    throw error
  }
}
After: Linear flow, each step visible
import { Effect, pipe } from 'effect'
import * as Content from '@/features/content-pipeline'
const processContent = (contentId: string) =>
  pipe(
    Content.fetch(contentId),
    Effect.flatMap(Content.validate),
    Effect.flatMap(Content.enrichWithMetadata),
    Effect.flatMap(Content.transformForFeed),
    Effect.tap(Content.publishToFeed),
    Effect.tap((content) => Content.notifyFollowers(content.authorId, content)),
    Effect.map((content) => ({ success: true, contentId: content.id }))
  )
// Run it
Effect.runPromise(processContent("abc123"))
Why this is better:
- Read top-to-bottom, matches data flow
- Each step is a pure function, testable in isolation
- Effect.tap for side effects makes them explicit
- Errors propagate automatically
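To make the "read top-to-bottom" point concrete, here is a minimal sketch of what a pipe helper does to plain values. The helper pipe3 is a hypothetical, hand-rolled stand-in written for this illustration; Effect's own pipe accepts more steps and is the one the patterns in this document would use.

```typescript
// Hypothetical three-step helper, for illustration only.
const pipe3 = <A, B, C, D>(
  value: A,
  f: (a: A) => B,
  g: (b: B) => C,
  h: (c: C) => D
): D => h(g(f(value)))

const trim = (s: string): string => s.trim()
const words = (s: string): string[] => s.split(/\s+/)
const count = (ws: string[]): number => ws.length

// Nested calls read inside-out: the first step is written last.
const nested = count(words(trim("  data flows through steps  ")))

// The piped version lists the same steps in execution order.
const piped = pipe3("  data flows through steps  ", trim, words, count)
```

Both expressions compute the same value; only the reading order differs, which is the property the pipeline style relies on.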
Pattern 2: Railway-Oriented Error Handling
Before: Stringly-typed errors, unclear failure modes
async function mintNFT(collectionId: string, userId: string) {
  const collection = await getCollection(collectionId)
  if (!collection) throw new Error('Collection not found')
  const user = await getUser(userId)
  if (!user) throw new Error('User not found')
  if (!user.hasCredits) throw new Error('Insufficient credits')
  const content = await generateContent(collection)
  if (!content) throw new Error('Generation failed')
  const nft = await mint(content)
  return nft
}
// Caller has no idea what errors to expect
try {
  await mintNFT(id, userId)
} catch (e) {
  // Is this a user error? System error? Retryable?
  console.error(e.message)
}
After: Typed errors, explicit handling
import { Data, Effect, pipe } from 'effect'
// Define error types
class CollectionNotFound extends Data.TaggedError("CollectionNotFound")<{
  collectionId: string
}> {}
class UserNotFound extends Data.TaggedError("UserNotFound")<{
  userId: string
}> {}
class InsufficientCredits extends Data.TaggedError("InsufficientCredits")<{
  userId: string
  required: number
  available: number
}> {}
class GenerationFailed extends Data.TaggedError("GenerationFailed")<{
  reason: string
}> {}
// Function signature shows what can fail
const mintNFT = (
  collectionId: string,
  userId: string
): Effect.Effect<
  NFT,
  CollectionNotFound | UserNotFound | InsufficientCredits | GenerationFailed,
  Database | ContentGenerator
> =>
  pipe(
    // Fetch both inputs; either lookup can fail with its own typed error
    Effect.all({
      collection: getCollection(collectionId),
      user: getUser(userId)
    }),
    // Switch to the failure track if the user cannot pay
    Effect.flatMap(({ collection, user }) =>
      user.credits < 1
        ? Effect.fail(new InsufficientCredits({ userId, required: 1, available: user.credits }))
        : Effect.succeed({ collection, user })
    ),
    Effect.flatMap(({ collection }) => generateContent(collection)),
    Effect.flatMap(mint)
  )
// Caller can handle specific errors
// (Effect.succeed(null) is a placeholder for the real recovery logic)
pipe(
  mintNFT(collectionId, userId),
  Effect.catchTags({
    CollectionNotFound: (e) => Effect.succeed(null), // handle missing collection
    InsufficientCredits: (e) => Effect.succeed(null), // prompt to buy credits, e.required is typed
    UserNotFound: (e) => Effect.succeed(null), // redirect to login
    GenerationFailed: (e) => Effect.succeed(null) // retry or show error
  })
)
Why this is better:
- TypeScript shows you every error that can occur
- Each error carries relevant data
- Callers must handle errors (or explicitly propagate)
- No stringly-typed error messages to pattern match
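The idea underneath this pattern, errors as data rather than exceptions, does not depend on Effect itself. The following is a minimal hand-rolled sketch in plain TypeScript: Result, success, failure, checkCredits, and the in-memory balances are all hypothetical names and data invented for this illustration, and Effect provides a richer version of the same mechanism.

```typescript
// Errors are plain data, discriminated by a `_tag` field.
type MintError =
  | { readonly _tag: "UserNotFound"; readonly userId: string }
  | { readonly _tag: "InsufficientCredits"; readonly required: number; readonly available: number }

// A result sits on either the success track or the failure track.
type Result<A, E> =
  | { readonly ok: true; readonly value: A }
  | { readonly ok: false; readonly error: E }

const success = <A>(value: A): Result<A, never> => ({ ok: true, value })
const failure = <E>(error: E): Result<never, E> => ({ ok: false, error })

// Hypothetical in-memory credit balances.
const balances = new Map<string, number>([["alice", 3], ["bob", 0]])

const checkCredits = (userId: string, required: number): Result<number, MintError> => {
  const available = balances.get(userId)
  if (available === undefined) return failure<MintError>({ _tag: "UserNotFound", userId })
  if (available < required) {
    return failure<MintError>({ _tag: "InsufficientCredits", required, available })
  }
  return success(available - required)
}

// The switch is exhaustive: adding a MintError variant without a matching case
// turns the `never` assignment into a compile error.
const describeError = (error: MintError): string => {
  switch (error._tag) {
    case "UserNotFound":
      return `unknown user ${error.userId}`
    case "InsufficientCredits":
      return `need ${error.required}, have ${error.available}`
    default: {
      const unreachable: never = error
      return unreachable
    }
  }
}

const report = (r: Result<number, MintError>): string =>
  r.ok ? `remaining credits: ${r.value}` : describeError(r.error)
```

For example, report(checkCredits("bob", 1)) takes the failure track and produces a message built from typed fields rather than a parsed error string.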
Pattern 3: Dependency Injection via Services
Before: Hidden dependencies, hard to test
import { db } from '@/lib/db'
import { redis } from '@/lib/redis'
import { openai } from '@/lib/openai'
async function generateContent(prompt: string) {
  const cached = await redis.get(`prompt:${prompt}`)
  if (cached) return JSON.parse(cached)
  const result = await openai.chat.completions.create({ ... })
  await db.generations.create({ data: { prompt, result } })
  await redis.set(`prompt:${prompt}`, JSON.stringify(result))
  return result
}
// Testing requires mocking modules 😱
jest.mock('@/lib/db')
jest.mock('@/lib/redis')
jest.mock('@/lib/openai')
After: Explicit dependencies, easy testing
import { Effect, Option, pipe } from 'effect'
// Define service interfaces.
// Effect.Tag (rather than Context.Tag) generates the static accessors
// used below, such as Cache.get(...) and Database.saveGeneration(...).
class Database extends Effect.Tag("Database")<
  Database,
  {
    readonly saveGeneration: (prompt: string, result: any) => Effect.Effect<void>
  }
>() {}
class Cache extends Effect.Tag("Cache")<
  Cache,
  {
    readonly get: (key: string) => Effect.Effect<Option.Option<string>>
    readonly set: (key: string, value: string) => Effect.Effect<void>
  }
>() {}
class ContentGenerator extends Effect.Tag("ContentGenerator")<
  ContentGenerator,
  {
    readonly generate: (prompt: string) => Effect.Effect<GeneratedContent>
  }
>() {}
// Function declares what it needs
const generateContent = (prompt: string) =>
  pipe(
    Cache.get(`prompt:${prompt}`),
    Effect.flatMap(
      Option.match({
        // Cache hit: return the stored result
        onSome: (cached) => Effect.succeed(JSON.parse(cached)),
        // Cache miss: generate, then persist and cache as side effects
        onNone: () =>
          pipe(
            ContentGenerator.generate(prompt),
            Effect.tap((result) => Database.saveGeneration(prompt, result)),
            Effect.tap((result) => Cache.set(`prompt:${prompt}`, JSON.stringify(result)))
          )
      })
    )
  )
// Provide real implementations in production
// (DatabaseLive, CacheLive, and ContentGeneratorLive are Layers defined elsewhere)
const program = pipe(
  generateContent("a sunset over mountains"),
  Effect.provide(DatabaseLive),
  Effect.provide(CacheLive),
  Effect.provide(ContentGeneratorLive)
)
// Provide test implementations in tests — no mocking!
const testProgram = pipe(
  generateContent("a sunset"),
  Effect.provide(DatabaseTest), // In-memory
  Effect.provide(CacheTest), // Simple Map
  Effect.provide(ContentGeneratorTest) // Returns fixture
)
Why this is better:
- Dependencies are visible in the type signature
- No global imports to mock
- Test implementations are just different providers
- Swap implementations without changing business logic
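The test providers mentioned in this pattern are not defined in this document. As one possible shape, the sketch below builds an in-memory cache layer with Layer.succeed and provides it to a small program. The names makeCacheTest and remember are hypothetical, the Cache service is redeclared here only so the block is self-contained, and the service is accessed with Effect.flatMap on the tag so the example does not depend on generated accessors.

```typescript
import { Context, Effect, Layer, Option, pipe } from "effect"

// Service definition, mirroring the Cache shape used in this pattern.
class Cache extends Context.Tag("Cache")<
  Cache,
  {
    readonly get: (key: string) => Effect.Effect<Option.Option<string>>
    readonly set: (key: string, value: string) => Effect.Effect<void>
  }
>() {}

// Hypothetical test implementation backed by a plain Map.
// A fresh layer per call keeps tests isolated from each other.
const makeCacheTest = (): Layer.Layer<Cache> => {
  const store = new Map<string, string>()
  return Layer.succeed(Cache, {
    get: (key) => Effect.sync(() => Option.fromNullable(store.get(key))),
    set: (key, value) =>
      Effect.sync(() => {
        store.set(key, value)
      })
  })
}

// Business logic asks for the service; it never imports a concrete client.
const remember = (key: string, value: string) =>
  Effect.flatMap(Cache, (cache) =>
    pipe(
      cache.set(key, value),
      Effect.flatMap(() => cache.get(key))
    )
  )

// In a test, provide the in-memory layer instead of a Redis-backed one.
const testRun = Effect.runPromise(
  pipe(remember("prompt:a sunset", "fixture"), Effect.provide(makeCacheTest()))
)
```

Because the store lives inside makeCacheTest, no module mocking is needed: swapping the provider is the only difference between a test run and a production run.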
Pattern 4: Feature-Based Folder Structure
Before: Organized by file type
src/
  components/
    ContentCard.tsx
    FeedItem.tsx
    GenerateButton.tsx
  hooks/
    useContent.ts
    useFeed.ts
    useGenerate.ts
  services/
    contentService.ts
    feedService.ts
    generateService.ts
  types/
    content.ts
    feed.ts
    generate.ts
  utils/
    contentUtils.ts
    feedUtils.ts
To understand "generate content" you touch 6+ files across 5 folders.
After: Organized by feature/flow
src/
  features/
    content-pipeline/
      index.ts            ← Public API (what other features import)
      fetch.ts            ← Step 1: Get content
      validate.ts         ← Step 2: Validate
      enrich.ts           ← Step 3: Add metadata
      transform.ts        ← Step 4: Transform for feed
      publish.ts          ← Step 5: Publish
      notify.ts           ← Step 6: Notify followers
      errors.ts           ← All errors for this flow
      types.ts            ← Types for this flow
      __tests__/
        pipeline.test.ts  ← Test the whole flow
    nft-minting/
      index.ts
      generate.ts
      mint.ts
      treasury.ts
      errors.ts
      types.ts
    token-allocation/
      index.ts
      calculate.ts
      merkle.ts
      claim.ts
      errors.ts
  shared/
    services/             ← Shared service definitions
      database.ts
      cache.ts
      blockchain.ts
    lib/                  ← Pure utilities
      math.ts
      format.ts
Why this is better:
- Open one folder to see the entire flow
- index.ts shows the pipeline at a glance
- Related code stays together
- Adding a step = adding a file in the right folder
Emerge-Specific Examples
Example 1: Content Generation Pipeline
// src/features/content-pipeline/index.ts
import { Effect, pipe } from 'effect'
import * as Fetch from './fetch'
import * as Validate from './validate'
import * as Generate from './generate'
import * as Mint from './mint'
import * as Distribute from './distribute'
/**
 * Complete content generation flow:
 * Request → Validate → Generate → Mint → Distribute Revenue
 */
export const processGenerationRequest = (request: GenerationRequest) =>
  pipe(
    // Validate the request
    Validate.request(request),
    // Check user has credits
    Effect.flatMap(Validate.userCredits(request.userId)),
    // Fetch collection parameters
    Effect.flatMap(() => Fetch.collection(request.collectionId)),
    // Generate content
    Effect.flatMap((collection) =>
      Generate.content(collection, request.parameters)
    ),
    // Mint as NFT
    Effect.flatMap((content) =>
      Mint.nft(content, request.userId)
    ),
    // Distribute revenue to token holders
    Effect.tap((nft) =>
      Distribute.revenueToHolders(nft.collectionId, nft.mintPrice)
    ),
    // Return result
    Effect.map((nft) => ({
      success: true,
      nftId: nft.id,
      transactionHash: nft.txHash
    }))
  )
// Error handling at the edge
export const handleGenerationRequest = (request: GenerationRequest) =>
  pipe(
    processGenerationRequest(request),
    Effect.catchTags({
      InvalidRequest: (e) =>
        Effect.succeed({ success: false, error: 'invalid_request', details: e.reason }),
      InsufficientCredits: (e) =>
        Effect.succeed({ success: false, error: 'insufficient_credits', required: e.required }),
      CollectionNotFound: () =>
        Effect.succeed({ success: false, error: 'collection_not_found' }),
      GenerationFailed: (e) =>
        Effect.succeed({ success: false, error: 'generation_failed', retryable: e.retryable }),
      MintFailed: (e) =>
        Effect.succeed({ success: false, error: 'mint_failed', txHash: e.attemptedTxHash })
    }),
    // Provide all dependencies
    Effect.provide(LiveServices)
  )
Example 2: Farcaster Share Tracking
// src/features/viral-tracking/index.ts
import { Effect, pipe } from 'effect'
// Imports for the Parse, Extract, Metrics, and Rewards modules are omitted here
/**
 * Track shares across Farcaster, update viral metrics
 *
 * Flow:
 * Webhook → Parse Cast → Extract Embeds → Match Content → Update Metrics → Trigger Rewards
 */
export const processFarcasterWebhook = (payload: FarcasterWebhookPayload) =>
  pipe(
    // Parse and validate the cast
    Parse.cast(payload),
    // Extract any Emerge embeds
    Effect.flatMap(Extract.emergeEmbeds),
    // Skip if no Emerge content
    Effect.flatMap((embeds) =>
      embeds.length === 0
        ? Effect.succeed({ tracked: false })
        : processEmbeds(embeds, payload.cast)
    )
  )
const processEmbeds = (embeds: EmergeEmbed[], cast: Cast) =>
  pipe(
    // Process each embed in parallel
    Effect.forEach(embeds, (embed) =>
      pipe(
        // Record the share
        Metrics.recordShare({
          contentId: embed.contentId,
          sharedBy: cast.author.fid,
          castHash: cast.hash,
          timestamp: cast.timestamp
        }),
        // Update viral score
        Effect.flatMap(() =>
          Metrics.updateViralScore(embed.contentId)
        ),
        // Check if this triggers any rewards
        Effect.flatMap((score) =>
          score.crossedThreshold
            ? Rewards.triggerViralReward(embed.contentId, score.newTier)
            : Effect.succeed(null)
        )
      ),
      { concurrency: 'unbounded' }
    ),
    Effect.map((results) => ({
      tracked: true,
      embedsProcessed: results.length
    }))
  )
Example 3: Token Revenue Distribution
// src/features/revenue-distribution/index.ts
import { Effect, pipe } from 'effect'
/**
 * When NFT sells, distribute revenue to token holders
 *
 * Flow:
 * Sale Event → Calculate Shares → Buy Token → Burn → Record Distribution
 */
export const distributeRevenue = (saleEvent: NFTSaleEvent) =>
  pipe(
    // Get collection's token contract
    Fetch.tokenContract(saleEvent.collectionId),
    // Calculate revenue after platform fee
    Effect.map((contract) => ({
      contract,
      netRevenue: calculateNetRevenue(saleEvent.price)
    })),
    // Execute buy & burn
    Effect.flatMap(({ contract, netRevenue }) =>
      pipe(
        // Swap ETH for tokens on AMM
        AMM.buyTokens(contract.ammPool, netRevenue),
        // Burn the purchased tokens
        Effect.flatMap((purchasedTokens) =>
          Token.burn(contract.address, purchasedTokens)
        ),
        // Record the distribution
        Effect.tap((burnTx) =>
          Database.recordDistribution({
            collectionId: saleEvent.collectionId,
            saleId: saleEvent.id,
            amountBurned: burnTx.amount,
            txHash: burnTx.hash
          })
        )
      )
    ),
    // Log success
    Effect.tap((result) =>
      Effect.log(`Burned ${result.amount} tokens for collection ${saleEvent.collectionId}`)
    )
  )
Implementation Plan
Phase 1: Setup (1-2 days)
- Install Effect: npm install effect
- Add folder structure for first feature
- Create shared service definitions (Database, Cache, etc.)
- Document patterns in team wiki
Phase 2: New Feature Pilot (1 week)
Apply patterns to one new feature end-to-end:
- Suggested: Conversation Coin chat room (greenfield)
- Or: New webhook handler for Farcaster tracking
This validates the patterns before broader adoption.
Phase 3: Extract Shared Services (1 week)
Create Layer implementations for:
- Database (Prisma wrapper)
- Cache (Redis wrapper)
- Blockchain (Viem wrapper)
- ContentGenerator (OpenAI/Replicate wrapper)
Phase 4: Incremental Migration (Ongoing)
When touching existing code:
- Refactor to new patterns if change is significant
- Leave working code alone if just bug fixes
- New features always use new patterns
Consequences
Positive
- Traceable flows — Read code top-to-bottom, matches mental model
- Typed errors — TypeScript catches unhandled error cases
- Testable — No mocking, just provide test implementations
- Composable — Build complex flows from simple pieces
- Consistent — One way to handle async, errors, dependencies
Negative
- Learning curve — Effect has concepts to learn (3-5 days to be productive)
- Bundle size — Effect adds ~50KB (acceptable for our use case)
- Hiring — Fewer devs know Effect (but patterns transfer from other FP)
- Interop friction — Wrapping Promise-based libraries takes effort
Mitigations
- Create internal "Effect crash course" doc
- Build helper functions for common Emerge patterns
- Pair programming during initial adoption
- Keep escape hatch: Effect.promise() wraps any Promise
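As a sketch of that escape hatch, the block below wraps a Promise-based call in two ways. Effect.promise is the quick option and treats a rejection as an unexpected defect, while Effect.tryPromise maps a rejection into a typed error that callers can recover from. The function fetchProfile and the error class ProfileLookupFailed are hypothetical stand-ins for an existing Promise-based library call, not real Emerge or Farcaster APIs.

```typescript
import { Data, Effect, pipe } from "effect"

// Hypothetical Promise-based call standing in for an existing library.
const fetchProfile = async (fid: number): Promise<{ fid: number; name: string }> => {
  if (fid <= 0) throw new Error("invalid fid")
  return { fid, name: `user-${fid}` }
}

// Hypothetical typed error for this wrapper.
class ProfileLookupFailed extends Data.TaggedError("ProfileLookupFailed")<{
  readonly fid: number
  readonly cause: unknown
}> {}

// Quick wrap: assumes the Promise does not reject; a rejection becomes a defect.
const profileUnchecked = (fid: number) => Effect.promise(() => fetchProfile(fid))

// Checked wrap: a rejection lands on the failure track as a typed error.
const profile = (fid: number) =>
  Effect.tryPromise({
    try: () => fetchProfile(fid),
    catch: (cause) => new ProfileLookupFailed({ fid, cause })
  })

// Callers can then recover by tag, as in Pattern 2.
const profileName = (fid: number) =>
  pipe(
    profile(fid),
    Effect.map((p) => p.name),
    Effect.catchTag("ProfileLookupFailed", () => Effect.succeed("anonymous"))
  )
```

A reasonable default under these assumptions is to start with the quick wrap while piloting, then move calls whose failures matter to the checked form so those failures show up in the type signature.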
Decision
Adopt functional flow patterns using Effect, applied incrementally starting with new features.
