
ADR: Attestation Payload Tracking for Workflow and Job Requests

Date: 2025-11-16
Status: Proposed
Decision Makers: System Architecture Team

Context

Currently, attestations are created by workers for job completion/failure and by the Job Queue API for workflow completion. However, we lack consistent visibility into:

  1. What triggered the workflow - The original request from the miniapp to EmProps API
  2. What was requested for each job - The job payload sent from EmProps API to Job Queue API

This makes debugging workflows difficult because we cannot quickly see:

  • What the user originally requested (variables, collection, etc.)
  • What payload was sent for each individual job step
  • The full request chain: Miniapp → EmProps API → Job Queue API → Worker

Decision

We will ensure that every attestation record carries the request context that produced it. Job attestations contain both fields below; workflow attestations contain miniapp-req only.

1. miniapp-req (Workflow-level)

Captured at: POST /collections/:id/generations (EmProps API)
Stored in: Workflow attestations, and job attestations via the job's ctx
Contains:

  • Request body (variables, workflow_id, workflow_priority)
  • User context (user_id, collection_id)
  • Request metadata (timestamp, headers if relevant)

2. emprops-req-payload (Job-level)

Captured at: POST /api/jobs (Job Queue API)
Stored in: Job attestations (created by workers)
Contains:

  • Job payload sent to Job Queue API
  • Already available as job.payload
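The two fields can be described with TypeScript types. The following is a minimal sketch inferred from the lists above; the names MiniappReq, EmpropsReqPayload, and TrackingFields, the string type of workflow_id, and the optionality of individual fields are assumptions for illustration, not definitions taken from the codebase.

```typescript
// Hypothetical type names; field names follow the lists above.
interface MiniappReq {
  request_body: {
    variables: Record<string, unknown>;
    workflow_id?: string; // assumed to be a string identifier
    workflow_priority?: number;
  };
  user_context: {
    user_id: string;
    collection_id: string;
  };
  metadata: {
    timestamp: number; // epoch milliseconds
    request_id: string;
    headers?: Record<string, string>; // only if relevant
  };
}

// The job payload is opaque at this level, so it is typed loosely.
type EmpropsReqPayload = Record<string, unknown>;

// Fields this ADR adds to attestation data.
interface TrackingFields {
  miniapp_req: MiniappReq | null;
  emprops_req_payload?: EmpropsReqPayload | null; // job attestations only
}

const example: TrackingFields = {
  miniapp_req: {
    request_body: { variables: { style: "realistic", prompt: "sunset" }, workflow_priority: 50 },
    user_context: { user_id: "user-123", collection_id: "collection-abc" },
    metadata: { timestamp: 1700000000000, request_id: "workflow-xyz" },
  },
  emprops_req_payload: { service_required: "comfyui" },
};
```

Typing miniapp_req as nullable keeps records valid for jobs that were submitted before the EmProps API started sending the field.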

Implementation Plan

Phase 1: Capture miniapp-req in EmProps API

File: apps/emprops-api/src/routes/generator/v2.ts
Function: runCollectionGeneration
Lines: ~98-150

Steps:

  1. Capture original request data immediately after validation:

    typescript
    const miniappReq = {
      request_body: {
        variables: bodyValidationResult.data.variables,
        workflow_id: bodyValidationResult.data.workflow_id,
        workflow_priority: bodyValidationResult.data.workflow_priority,
      },
      user_context: {
        user_id: userId,
        collection_id: collectionId,
      },
      metadata: {
        timestamp: Date.now(),
        request_id: jobId, // Same as workflow_id
      }
    };
  2. Pass miniapp-req to all job submissions via the ctx field:

    • Modify where ctx is constructed (in DirectJobNode.execute)
    • Add miniapp_req: miniappReq to the context
    • This gets passed through to Job Queue API
  3. Store in workflow job record:

    typescript
    await prisma.job.create({
      data: {
        // ... existing fields
        metadata: {
          miniapp_req: miniappReq,
          // ... other metadata
        }
      }
    });

Files to modify:

  • apps/emprops-api/src/routes/generator/v2.ts (capture)
  • apps/emprops-api/src/modules/art-gen/nodes-v2/nodes/direct-job.ts (pass to jobs)
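Step 2 above has no code yet. A minimal sketch of attaching miniapp_req to ctx might look like the following; withMiniappReq, buildJobSubmission, and the step_name key are hypothetical, and the real construction inside DirectJobNode.execute may differ.

```typescript
type Ctx = Record<string, unknown>;

// Returns a new ctx carrying miniapp_req; the caller's ctx is left untouched.
function withMiniappReq(ctx: Ctx | undefined, miniappReq: unknown): Ctx {
  return { ...(ctx ?? {}), miniapp_req: miniappReq };
}

// Hypothetical submission body for POST /api/jobs (only the fields relevant here).
function buildJobSubmission(payload: Ctx, ctx: Ctx | undefined, miniappReq: unknown) {
  return { payload, ctx: withMiniappReq(ctx, miniappReq) };
}

const existingCtx: Ctx = { step_name: "upscale" }; // illustrative key, not from the codebase
const submission = buildJobSubmission(
  { prompt: "beautiful sunset over mountains" },
  existingCtx,
  { user_context: { user_id: "user-123", collection_id: "collection-abc" } },
);
```

Copying ctx rather than mutating it avoids leaking miniapp_req into any other code that holds a reference to the same object.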

Phase 2: Ensure miniapp-req flows through Job Queue API

File: apps/api/src/lightweight-api-server.ts
Function: submitJob
Lines: ~3623-3900

Steps:

  1. Preserve miniapp-req from incoming job data:

    typescript
    const job: Job = {
      // ... existing fields
      ctx: jobData.ctx as Record<string, unknown> | undefined,
      // ctx should now contain miniapp_req
    };
  2. Store in Redis with job data:

    • Already happens via redis.hmset at line ~3839
    • ctx field is already stringified and stored
    • No code change needed here
  3. Verify ctx is passed to workers:

    • Workers already receive jobData.ctx
    • Workers parse it at line ~903 in redis-direct-worker-client.ts
    • No code change needed

Files to verify:

  • apps/api/src/lightweight-api-server.ts (verify ctx storage)
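The verification in this phase comes down to a serialization round trip: ctx is stringified when the job hash is written and parsed again when the worker reads it. The sketch below models that with plain objects instead of a Redis client; toHashFields and ctxFromHashFields are hypothetical helpers, not functions from the codebase.

```typescript
type Ctx = Record<string, unknown>;

// Redis hashes hold flat string values, so a nested ctx is serialized to JSON.
function toHashFields(job: { id: string; ctx?: Ctx }): Record<string, string> {
  const fields: Record<string, string> = { id: job.id };
  if (job.ctx !== undefined) fields.ctx = JSON.stringify(job.ctx);
  return fields;
}

// Reverses the serialization on the reading side.
function ctxFromHashFields(fields: Record<string, string>): Ctx | undefined {
  return fields.ctx === undefined ? undefined : (JSON.parse(fields.ctx) as Ctx);
}

const job = {
  id: "abc123",
  ctx: { miniapp_req: { metadata: { timestamp: 1700000000000, request_id: "workflow-xyz" } } },
};
const stored = toHashFields(job);
const restored = ctxFromHashFields(stored);
```

A unit test for Phase 2 can assert that restored equals the ctx that was submitted, which is the property the later phases rely on.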

Phase 3: Include Both Fields in Worker Attestations

File: apps/worker/src/redis-direct-worker-client.ts
Functions: completeJob, failJob
Lines: ~855-950 (completion), ~1118-1230 (failure)

Steps:

  1. Extract miniapp-req from job context:

    typescript
    // Around line 900-906 where ctx is parsed
    const miniappReq = parsedCtx?.miniapp_req || null;
  2. Add to worker completion attestation:

    typescript
    const workerCompletionRecord = {
      // ... existing fields
      miniapp_req: miniappReq, // 🆕 Original miniapp request
      emprops_req_payload: parsedPayload, // 🆕 Job payload from EmProps API
      // ... rest of fields
    };
  3. Add to worker failure attestation:

    typescript
    const workerFailureRecord = {
      // ... existing fields
      miniapp_req: miniappReq, // 🆕 Original miniapp request
      emprops_req_payload: parsedPayload, // 🆕 Job payload from EmProps API
      // ... rest of fields
    };
  4. Store in Redis attestation keys:

    • Already happens via redis.hmset at lines ~952 and ~1205
    • The new fields will automatically be included

Files to modify:

  • apps/worker/src/redis-direct-worker-client.ts (add fields to attestations)
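Because the completion and failure records receive the same two fields, the extraction could live in one shared helper. This is a sketch under assumptions: parseJsonObject and trackingFields are hypothetical names, and it assumes ctx and payload may arrive either as JSON strings or as already-parsed objects.

```typescript
type Json = Record<string, unknown>;

function isPlainObject(value: unknown): value is Json {
  return value !== null && typeof value === "object" && !Array.isArray(value);
}

// Accepts a JSON string or an already-parsed object; returns null for anything else.
function parseJsonObject(raw: unknown): Json | null {
  if (isPlainObject(raw)) return raw;
  if (typeof raw !== "string") return null;
  try {
    const parsed: unknown = JSON.parse(raw);
    return isPlainObject(parsed) ? parsed : null;
  } catch {
    return null; // malformed JSON must not break attestation creation
  }
}

// Shared by the completion and failure records.
function trackingFields(rawCtx: unknown, rawPayload: unknown) {
  const ctx = parseJsonObject(rawCtx);
  return {
    miniapp_req: ctx?.miniapp_req ?? null,
    emprops_req_payload: parseJsonObject(rawPayload),
  };
}

const jobData = {
  ctx: JSON.stringify({ miniapp_req: { metadata: { request_id: "workflow-xyz" } } }),
  payload: JSON.stringify({ width: 1024, height: 1024 }),
};
const completionRecord = { status: "completed", ...trackingFields(jobData.ctx, jobData.payload) };
```

Falling back to null rather than throwing means a job submitted without miniapp_req still produces an attestation.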

Phase 4: Include miniapp-req in Workflow Attestations

File: apps/api/src/lightweight-api-server.ts
Function: createWorkflowCompletionAttestation
Lines: ~4892-4960

Steps:

  1. Retrieve miniapp-req from first job's context:

    • Workflow completion attestation needs access to workflow data
    • Fetch first job of workflow to get miniapp-req from its ctx
  2. Add to workflow attestation:

    typescript
    const workflowAttestation = {
      // ... existing fields
      miniapp_req: miniappReq, // 🆕 Original request that triggered workflow
      // ... rest of fields
    };
  3. Store in workflow attestation record:

    • Follow existing pattern for workflow attestation storage

Files to modify:

  • apps/api/src/lightweight-api-server.ts (add miniapp-req to workflow attestation)
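Step 1 could be sketched as follows. StoredJob and miniappReqForWorkflow are hypothetical, and how the workflow's jobs are fetched is left out because it depends on the existing key layout. Note one deviation from the text above: instead of reading only the first job, the sketch scans jobs in order and returns the first miniapp_req it finds, which yields the same result when every job carries the field.

```typescript
interface StoredJob {
  id: string;
  ctx?: string; // JSON string, as stored with the job data
}

// Returns the miniapp_req of the first job whose ctx carries one, or null if none does.
function miniappReqForWorkflow(jobs: StoredJob[]): unknown {
  for (const job of jobs) {
    if (!job.ctx) continue;
    try {
      const ctx = JSON.parse(job.ctx) as Record<string, unknown> | null;
      if (ctx && ctx.miniapp_req !== undefined) return ctx.miniapp_req;
    } catch {
      // Ignore an unparsable ctx and try the next job.
    }
  }
  return null;
}

const workflowJobs: StoredJob[] = [
  { id: "step-1", ctx: JSON.stringify({ miniapp_req: { metadata: { request_id: "workflow-xyz" } } }) },
  { id: "step-2", ctx: JSON.stringify({ miniapp_req: { metadata: { request_id: "workflow-xyz" } } }) },
];
const workflowAttestationFields = { miniapp_req: miniappReqForWorkflow(workflowJobs) };
```

Scanning past a job with a missing or malformed ctx makes the workflow attestation tolerant of a single bad record during an incremental rollout.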

Phase 5: Database Schema Verification

File: packages/database-schema/prisma/schema.prisma
Model: attestations
Lines: ~818-852

Verify:

The attestations table already has:

prisma
attestation_data Json

This JSON field can hold both:

  • miniapp_req (for workflow and job attestations)
  • emprops_req_payload (for job attestations)

No schema changes needed - both fields will be stored in the attestation_data JSON column.


Data Flow

┌─────────────┐
│   Miniapp   │
└──────┬──────┘
       │ POST /collections/:id/generations
       │ { variables, workflow_id, ... }

┌─────────────────────┐
│   EmProps API       │
│  ┌──────────────┐   │
│  │ Capture:     │   │
│  │ miniapp-req  │◄──┼─── Store in ctx
│  └──────────────┘   │
└──────┬──────────────┘
       │ For each step:
       │ POST /api/jobs
       │ { payload, ctx: { miniapp_req, ... }, ... }

┌─────────────────────┐
│  Job Queue API      │
│  ┌──────────────┐   │
│  │ Store in     │   │
│  │ Redis        │◄──┼─── ctx includes miniapp_req
│  └──────────────┘   │
└──────┬──────────────┘
       │ Worker claims job
       │ jobData includes ctx

┌─────────────────────┐
│     Worker          │
│  ┌──────────────┐   │
│  │ Attestation: │   │
│  │ - miniapp_req│   │ ◄─── From ctx
│  │ - emprops_   │   │ ◄─── From payload
│  │   req_payload│   │
│  └──────────────┘   │
└─────────────────────┘

       │ On workflow completion

┌─────────────────────┐
│  Job Queue API      │
│  ┌──────────────┐   │
│  │ Workflow     │   │
│  │ Attestation: │   │
│  │ - miniapp_req│   │ ◄─── From first job's ctx
│  └──────────────┘   │
└─────────────────────┘

Expected Attestation Structure

Job Attestation (Worker-created)

json
{
  "attestation_key": "worker-completion:job-abc123",
  "attestation_type": "worker_completion",
  "entity_type": "job",
  "redis_step_id": "abc123",
  "redis_workflow_id": "workflow-xyz",
  "worker_id": "comfyui-gpu0",
  "status": "completed",
  "attestation_data": {
    "miniapp_req": {
      "request_body": {
        "variables": { "style": "realistic", "prompt": "sunset" },
        "workflow_priority": 50
      },
      "user_context": {
        "user_id": "user-123",
        "collection_id": "collection-abc"
      },
      "metadata": {
        "timestamp": 1700000000000,
        "request_id": "workflow-xyz"
      }
    },
    "emprops_req_payload": {
      "service_required": "comfyui",
      "payload": {
        "prompt": "beautiful sunset over mountains",
        "width": 1024,
        "height": 1024
      }
    },
    "result": "https://cdn.example.com/output.png",
    "completed_at": 1700000100000
  }
}

Workflow Attestation (API-created)

json
{
  "attestation_key": "workflow-completion:workflow-xyz",
  "attestation_type": "workflow_completion",
  "entity_type": "workflow",
  "redis_workflow_id": "workflow-xyz",
  "status": "completed",
  "attestation_data": {
    "miniapp_req": {
      "request_body": {
        "variables": { "style": "realistic", "prompt": "sunset" },
        "workflow_priority": 50
      },
      "user_context": {
        "user_id": "user-123",
        "collection_id": "collection-abc"
      },
      "metadata": {
        "timestamp": 1700000000000,
        "request_id": "workflow-xyz"
      }
    },
    "workflow_outputs": ["https://cdn.example.com/output.png"],
    "total_steps": 3,
    "completed_steps": 3,
    "completed_at": 1700000200000
  }
}

Benefits

  1. Complete Request Traceability

    • See exactly what the user requested from the miniapp
    • See exactly what was sent to the job queue for each step
    • Full audit trail from user action → final result
  2. Faster Debugging

    • No need to reconstruct what happened from logs
    • Attestations contain the full request context
    • Can quickly identify whether an issue originated in user input, job configuration, or processing
  3. Improved Monitoring

    • Monitor UI can display original user request alongside job results
    • Compare what was requested vs. what was delivered
    • Identify patterns in failures related to specific request types
  4. No Breaking Changes

    • Uses existing attestation_data JSON field
    • Workers already receive ctx - just adding fields
    • Backward compatible - old attestations without these fields still work
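Backward compatibility depends on readers treating both fields as optional. A minimal sketch of such a reader, for example in the Monitor UI, is shown below; AttestationData and describeTracking are hypothetical names.

```typescript
interface AttestationData {
  miniapp_req?: unknown;
  emprops_req_payload?: unknown;
  [key: string]: unknown; // existing fields such as result or completed_at
}

// Summarizes which tracking fields an attestation carries without assuming either exists.
function describeTracking(data: AttestationData): string {
  const miniapp = data.miniapp_req != null ? "present" : "not recorded";
  const payload = data.emprops_req_payload != null ? "present" : "not recorded";
  return `miniapp_req: ${miniapp}, emprops_req_payload: ${payload}`;
}

const oldAttestation: AttestationData = { result: "https://cdn.example.com/output.png" };
const newAttestation: AttestationData = {
  miniapp_req: { metadata: { request_id: "workflow-xyz" } },
  emprops_req_payload: { service_required: "comfyui" },
  result: "https://cdn.example.com/output.png",
};
```

The loose `!= null` check treats both a missing key and an explicit null as "not recorded", so attestations written before and after the rollout go through the same code path.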

Risks and Mitigation

Risk 1: Increased Attestation Size

Impact: Storing full request bodies in every attestation could increase storage usage

Mitigation:

  • Attestations already have 7-day TTL
  • miniapp-req is relatively small (variables + metadata)
  • emprops-req-payload already exists in job data
  • Only adds ~1-5KB per attestation
  • Redis can handle this easily
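The ~1-5KB estimate can be checked against real requests by measuring the serialized size. The sketch below uses the standard TextEncoder to count UTF-8 bytes; serializedSizeBytes and exceedsBudget are hypothetical helpers, and the 5 KB budget simply mirrors the upper bound quoted above.

```typescript
// UTF-8 byte length of the JSON form, which approximates what is stored.
function serializedSizeBytes(value: unknown): number {
  const json = JSON.stringify(value) ?? "";
  return new TextEncoder().encode(json).length;
}

const MAX_TRACKING_BYTES = 5 * 1024; // upper bound of the estimate above

function exceedsBudget(value: unknown, maxBytes: number = MAX_TRACKING_BYTES): boolean {
  return serializedSizeBytes(value) > maxBytes;
}

const sampleMiniappReq = {
  request_body: { variables: { style: "realistic", prompt: "sunset" }, workflow_priority: 50 },
  user_context: { user_id: "user-123", collection_id: "collection-abc" },
  metadata: { timestamp: 1700000000000, request_id: "workflow-xyz" },
};
const sampleSize = serializedSizeBytes(sampleMiniappReq);
```

Counting bytes rather than string length matters for prompts with non-ASCII characters, which occupy more than one byte each in UTF-8.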

Risk 2: Sensitive Data in Attestations

Impact: Request bodies might contain user data (prompts, images)

Mitigation:

  • Already sanitizing base64 data from attestations (lines 859-863 in the worker)
  • miniapp-req contains variables (small text values)
  • Image URLs already stored, not base64 data
  • Follow existing sanitization patterns
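The worker's existing sanitization code is not reproduced in this ADR, so the following only illustrates the general idea under assumptions: redactBase64, the placeholder text, and the data-URI pattern are hypothetical and may not match the existing implementation.

```typescript
// Matches strings that begin like a base64 data URI, e.g. "data:image/png;base64,...".
const BASE64_DATA_URI = /^data:[^;,]+;base64,/;

// Returns a deep copy of the value with base64 data URIs replaced by a short placeholder.
function redactBase64(value: unknown): unknown {
  if (typeof value === "string") {
    return BASE64_DATA_URI.test(value) ? "[base64 data removed]" : value;
  }
  if (Array.isArray(value)) return value.map((item) => redactBase64(item));
  if (value !== null && typeof value === "object") {
    const out: Record<string, unknown> = {};
    for (const [key, inner] of Object.entries(value as Record<string, unknown>)) {
      out[key] = redactBase64(inner);
    }
    return out;
  }
  return value; // numbers, booleans, null, undefined
}

const rawVariables = {
  prompt: "sunset",
  reference_image: "data:image/png;base64,iVBORw0KGgo=",
  layers: ["https://cdn.example.com/a.png", "data:image/jpeg;base64,AAAA"],
};
const safeVariables = redactBase64(rawVariables) as typeof rawVariables;
```

Applying a function like this to miniapp_req before it is placed in ctx would keep large or sensitive inline images out of every downstream attestation.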

Risk 3: Deployment Complexity

Impact: Changes span 3 services (emprops-api, job-queue-api, worker)

Mitigation:

  • Changes are additive - won't break existing flow
  • Can deploy incrementally:
    1. Deploy emprops-api (starts sending miniapp-req)
    2. Deploy worker (starts including in attestations)
    3. Deploy job-queue-api (starts including in workflow attestations)
  • Old attestations without new fields will still work

Testing Plan

Unit Tests

  1. EmProps API: Verify miniapp-req is captured and passed to jobs
  2. Job Queue API: Verify ctx with miniapp-req is stored in Redis
  3. Worker: Verify attestations include both miniapp-req and emprops-req-payload

Integration Tests

  1. Submit workflow via /collections/:id/generations
  2. Verify first job has miniapp-req in ctx
  3. Verify worker attestation includes both fields
  4. Verify workflow attestation includes miniapp-req
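Steps 3 and 4 could share one assertion helper that also checks that both attestations carry the same miniapp_req. This is a sketch: Attestation, deepEqual, and checkAttestations are hypothetical, and the comparison is structural because key order is not guaranteed to survive storage in a JSON column.

```typescript
interface Attestation {
  attestation_type: string;
  attestation_data: Record<string, unknown>;
}

// Structural equality that ignores object key order.
function deepEqual(a: unknown, b: unknown): boolean {
  if (a === b) return true;
  if (typeof a !== "object" || typeof b !== "object" || a === null || b === null) return false;
  if (Array.isArray(a) !== Array.isArray(b)) return false;
  const keysA = Object.keys(a);
  const keysB = Object.keys(b);
  if (keysA.length !== keysB.length) return false;
  return keysA.every(
    (key) =>
      Object.prototype.hasOwnProperty.call(b, key) &&
      deepEqual((a as Record<string, unknown>)[key], (b as Record<string, unknown>)[key]),
  );
}

// Returns a list of problems; an empty list means the pair passes.
function checkAttestations(job: Attestation, workflow: Attestation): string[] {
  const problems: string[] = [];
  const jobReq = job.attestation_data.miniapp_req;
  const workflowReq = workflow.attestation_data.miniapp_req;
  if (jobReq == null) problems.push("job attestation is missing miniapp_req");
  if (job.attestation_data.emprops_req_payload == null) {
    problems.push("job attestation is missing emprops_req_payload");
  }
  if (workflowReq == null) problems.push("workflow attestation is missing miniapp_req");
  if (jobReq != null && workflowReq != null && !deepEqual(jobReq, workflowReq)) {
    problems.push("miniapp_req differs between job and workflow attestations");
  }
  return problems;
}
```

An integration test can then fail with the returned messages, which name the missing field instead of reporting a bare boolean.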

Manual Testing

  1. Trigger workflow from miniapp
  2. Check Redis for job data - verify ctx.miniapp_req exists
  3. Check worker attestation - verify both fields present
  4. Check workflow completion attestation - verify miniapp-req present
  5. View in Monitor UI - verify fields display correctly

Success Criteria

  • [ ] Every job attestation includes emprops-req-payload
  • [ ] Every job attestation includes miniapp-req (from workflow context)
  • [ ] Every workflow attestation includes miniapp-req
  • [ ] Monitor UI can display both fields for debugging
  • [ ] No increase in job submission latency
  • [ ] No breaking changes to existing attestations

Timeline

  • Phase 1 (EmProps API): 2-4 hours
  • Phase 2 (Job Queue API verification): 1 hour
  • Phase 3 (Worker attestations): 2-3 hours
  • Phase 4 (Workflow attestations): 2 hours
  • Phase 5 (Schema verification) and testing: 2-3 hours

Total estimated time: 9-13 hours

References

  • Attestations schema: packages/database-schema/prisma/schema.prisma:818-852
  • Worker attestation creation: apps/worker/src/redis-direct-worker-client.ts:855-1230
  • Job submission: apps/api/src/lightweight-api-server.ts:3623-3900
  • Workflow generation: apps/emprops-api/src/routes/generator/v2.ts:91-300
