Documentation source
Workflow Engine
DAG-based workflow orchestration for entity field extraction and agent tasks, with dependency resolution, atomic node claiming, output contracts, and human-in-the-loop support.
<Callout type="danger" title="ARCHIVED & DEPRECATED">
**This documentation is obsolete.** As of PR 620, the legacy execution engine has been eradicated. This functionality has been completely replaced by **the unified Sessions engine** and the Unified Sessions primitive. Do not build new features using these patterns.
Please refer to [Tasks](/docs/features/tasks) and [Response System](/docs/features/response-system) for the new execution architecture.
</Callout>
## Overview
The workflow engine orchestrates multi-step work on entities by compiling field extraction configs into a directed acyclic graph (DAG). Each node represents a discrete unit of work: collect human input for a field, ask an agent to produce a field response, or execute a free-form agent task. Nodes declare dependencies on other nodes, and the engine resolves execution order automatically.
The engine supports both synchronous execution (a caller advances the workflow run directly) and asynchronous resumption (agents claim and execute nodes assigned to them during heartbeat runs). Human-in-the-loop workflows pause at `waiting_human` nodes until an entity update satisfies the node's requirements.
Workflow runs and their node executions are fully tracked in the `workflow_runs` and `workflow_node_runs` database tables for auditability and debugging.
## Key Concepts
### Node Types
```typescript
const WORKFLOW_NODE_TYPES = [
"wait_human",
"agent_task",
] as const;
```
- **wait_human** -- Waits for a human to fill in or approve a field value. The node starts in `waiting_human` status once its dependencies are satisfied and completes when the entity content contains the required value.
- **agent_task** -- An agent-executed node. Field extraction nodes and free-form workflow tasks both run through this path. For field nodes, the agent is expected to use `submitResponse`, not write raw content directly.
Legacy rows may still carry `extract_field` or `human_input`; the runtime treats them as backwards-compatible aliases for `agent_task` and `wait_human`.
### Node Lifecycle
```typescript
const WORKFLOW_NODE_STATUSES = [
"pending", // Ready for execution, dependencies met
"blocked", // Waiting on upstream dependencies
"running", // Currently being executed (claimed)
"waiting_human", // Paused, waiting for human input
"completed", // Successfully finished
"failed", // Execution failed
] as const;
```
The status transitions follow this flow:
```
blocked -> pending -> running -> completed
\-> waiting_human -> completed
\-> running -> failed
```
When a workflow starts, each node's initial status is computed from its dependencies, whether a field already has a value, and whether the field is locked. Nodes with all dependencies completed start as `pending`; others start as `blocked`. As upstream nodes complete, blocked nodes are re-evaluated and promoted to `pending` or `waiting_human`.
### Workflow Run Statuses
```typescript
const WORKFLOW_RUN_STATUSES = [
"running", // Active execution
"completed", // All nodes completed
"failed", // All incomplete nodes failed or blocked
"partial", // Some completed, some failed
"waiting_human", // Paused on human input nodes
] as const;
```
The run status is derived from the aggregate of its node statuses. A run is `completed` only when every node is `completed`. If any nodes are still `pending` or `running`, the run is `running`. If all remaining work is blocked on human input, the run is `waiting_human`.
### Trigger Types
```typescript
const WORKFLOW_TRIGGER_TYPES = [
"manual", // User clicked "Extract" or triggered via API
"entity_created", // Auto-run on new entity creation
"entity_updated", // Entity update resumed a waiting_human node
"document_ready", // Document finished processing
"heartbeat", // Agent heartbeat picked up assigned nodes
"feedback_rejected", // Extraction result rejected, rerun triggered
] as const;
```
### Output Contracts
Output contracts declare what a node is expected to produce, enabling validation after agent task execution:
```typescript
interface OutputContract {
type: WorkflowOutputType; // field, fields, entity, entities, relation-entity, document, status, none
fieldNames?: string[]; // For field/fields: which fields should be populated
entityTypeSlug?: string; // For entity types: what type to create
metadataKey?: string; // For status: which metadata key to set
metadataValue?: unknown; // For status: expected value
}
```
The eight output types and their validation logic:
| Type | Validation |
|------|-----------|
| `field` | Named field in entity content is non-empty, **or** a matching `entity_responses` record for this node contains the field value. |
| `fields` | All named fields in entity content are non-empty, **or** a matching response covers the missing fields. |
| `entity` | At least one new entity relation was created since node started. |
| `entities` | Same as `entity` (at least one relation). |
| `relation-entity` | A new entity of the specified type was created since node started. |
| `document` | A new document was linked to the entity since node started. |
| `status` | Named metadata key was set (optionally matching expected value). |
| `none` | Always passes. No validation performed. |
For `field` and `fields` types, validation is a two-step check: first the entity content is inspected, then `entity_responses` records tagged with the current `workflow_node_run_id` are checked. This prevents spurious failures when an agent correctly called `submitResponse` but the promotion pipeline had not yet written the value back to `entities.content`.
### Field Overrides
`WorkflowFieldOverride` allows per-field reviewer guidance and response metadata to be injected at workflow start time:
```typescript
interface WorkflowFieldOverride {
instructions?: string; // Extra reviewer guidance appended to the field prompt
parentResponseId?: string; // Links the new response to a parent for lineage tracking
responseSource?: "extraction" | "feedback"; // Tags the response source; defaults to trigger type
}
```
Pass field overrides via `RunEntityWorkflowOptions.fieldOverrides`:
```typescript
await startEntityWorkflow(entityId, {
fieldOverrides: {
market_size: {
instructions: "Focus on the EMEA region for this analysis.",
responseSource: "feedback",
},
},
});
```
When a `feedback_rejected` trigger fires a rerun, `responseSource` automatically defaults to `"feedback"` so the new response is correctly distinguished from the original extraction. Fields with `requiresApproval: true` in their extraction config are not auto-promoted — they require human review before their value is committed to the entity.
### Node Claiming
To prevent duplicate execution in concurrent environments (multiple heartbeat agents, manual triggers), nodes use atomic claiming:
```typescript
interface WorkflowNodeRunRecord {
// ... other fields
claimed_by: string | null; // Who claimed this node
claimed_at: string | null; // When it was claimed
attempt_count: number; // How many times execution was attempted
}
```
The `claim_workflow_node_run` SQL function performs an atomic compare-and-swap. It can claim fresh `pending` work and retryable `failed` work, while still preventing duplicate execution by requiring the row to be unclaimed at the moment of update.
Stale claims (from crashed workers or timed-out executions) are expired via `expireStaleWorkflowNodeClaims()`, which resets nodes that have been claimed longer than the timeout (default: 15 minutes) back to `failed` status with an "eligible for retry" message.
### Atomic Entity Merge Writes
Entity extraction and recovery paths now use Postgres RPCs to merge JSONB in the database instead of fetching `entities.content` or `entities.metadata`, merging in application code, and writing back a full object. The two supported RPCs are:
- `merge_entity_content(entityId, tenantId, newContent)`
- `merge_entity_metadata(entityId, tenantId, newMetadata)`
They perform a single `UPDATE ... SET column = COALESCE(column, '{}'::jsonb) || new_value` statement scoped by both entity ID and tenant ID. This removes the TOCTOU gap that existed in the old read-merge-write pattern and keeps concurrent writes from overwriting sibling fields.
## How It Works
### Workflow Compilation
The `compileEntityWorkflowDefinition()` function compiles field config into a `WorkflowDefinition`:
1. **Field configs** -- From `EntityTypeConfig.fields`. Each field with `extraction` becomes an `agent_task` node keyed as `field:{fieldName}`. Each field with `humanInput` becomes a `wait_human` node.
2. **Schema properties** -- From `entity_types.json_schema.properties`. Used for labels and field-specific prompt context.
3. **Dependency normalization** -- Dependencies can reference field names or node keys. The compiler resolves them to canonical node keys and discards references to missing nodes.
The compiler also carries field-level requirements like `required`, `validation`, and `requiresApproval` into node metadata so the runtime can pause on review-gated work.
If a field or scoring node does not have an explicit `agentSlug`, the compiler uses the resolved workflow default agent. Resolution order is: entity-type workflow default, tenant workflow default, then the first enabled agent whose role can both submit responses and promote approved field values.
### Workflow Execution
The `startEntityWorkflow()` function initiates a new workflow run:
1. **Load definition** -- `loadEntityWorkflowDefinition()` fetches the entity, its type config, field configs, schema properties, and entity metadata. It compiles these into a workflow definition.
2. **Filter target nodes** -- If specific fields were requested (partial extraction), only nodes for those fields and their transitive dependencies are included. Locked fields in `entity.metadata.lockedFields` are removed from the run up front.
3. **Ensure criteria set** -- Before any field work is seeded, the runtime ensures a default criteria set exists for the entity type so `submitResponse` is always available.
4. **Compute initial statuses** -- Each node's initial status is determined by checking whether its dependencies are already met and whether the target field already has a value (for non-forced runs).
5. **Create run record** -- A `workflow_runs` row is created with the trigger type and metadata.
6. **Seed node runs** -- `workflow_node_runs` rows are created for each node with their computed initial statuses.
7. **Advance the run** -- `advanceWorkflowRun()` takes over and executes all claimable nodes.
### Advancing a Workflow Run
The `advanceWorkflowRun()` function is the core execution loop:
1. **Expire stale claims** -- Reset any nodes that were claimed too long ago.
2. **Re-evaluate statuses** -- Check each blocked/waiting node against current entity content and dependency completion. Promote nodes whose dependencies are now met.
3. **Find ready nodes** -- Nodes in `pending` status that match the caller's agent slug filter (if any).
4. **Claim and execute** -- For each ready node, attempt an atomic claim. If successful, execute the node based on its type:
- `agent_task` field node: Resolves the assigned agent, injects field-specific prompt context, requires `submitResponse`, and validates the output contract.
- `agent_task` free-form node: Resolves the assigned agent, executes the task, and validates the output contract.
- `wait_human`: These nodes are never in `pending`; they go directly to `waiting_human`.
5. **Loop** -- After executing a batch, refresh entity content (fields may have changed) and re-evaluate. Continue until no more claimable nodes exist.
6. **Finalize** -- Compute the run's final status from the aggregate of node statuses.
If a field has `requiresApproval: true`, the node does not mark itself complete when the agent finishes. Instead it records `awaitingApproval`, transitions to `waiting_human`, and only unblocks dependents after a promoted response writes the approved value into `entities.content`.
Workflow node metadata now records both the configured assignee slug and the resolved runtime agent (`resolvedAgentSlug`, `resolvedAgentName`). This is the value the UI should display when fallback resolution picks a different agent than the one originally configured on the node.
### Human-in-the-Loop Resumption
When an entity is updated (via UI inline editing or agent tools), the `entity/updated` Inngest event triggers the `entity-update-resume` function. This function finds active workflow runs with `waiting_human` nodes for the updated entity and re-evaluates them. If the human input node's target field now has a value, the node transitions to `completed`, potentially unblocking downstream nodes.
### Agent Heartbeat Resumption
Agents configured with heartbeat scheduling periodically check for workflow nodes assigned to them. The `resumeWorkflowRunsForAgent()` function:
1. Expires stale claims for this agent.
2. Queries `workflow_node_runs` for claimable nodes with `assignee_agent_slug` matching this agent, including retryable failures.
3. Groups claimable nodes by workflow run.
4. Advances each run, restricting execution to nodes assigned to this agent.
The resumption loop now processes grouped runs with bounded parallelism rather than serially walking one run at a time. The implementation uses `Promise.allSettled()` so one failing run does not block the rest of the agent's claimable work.
This enables distributed workflow execution where different agents handle different steps of the same workflow.
### Automation Step Execution
`executeAutomationStep()` in `features/workflows/execute-automation-step.ts` runs a single named step of an entity automation:
1. **Injects tenant context** -- The entire call is wrapped in `withToolContext({ tenantId }, fn)` so every AI tool the agent calls during the step can resolve the correct tenant via `getTenantIdForTool()`. Without this wrapping, tools that call `getActiveTenantId()` fail silently in background job contexts (Inngest, heartbeat) because there is no HTTP request to read cookies from.
2. **Creates a chat record** -- A `chats` row is inserted so the run is visible in the admin UI and the transcript is linkable from the run history panel.
3. **Resolves and runs the agent** -- Resolves the step's `agentSlug`, builds tools and system prompt, and executes the agent.
4. **Persists messages** -- The user prompt and all assistant messages (including intermediate tool calls and results) are written to `messages` in a single batch under the created chat.
The `chatId` and token usage are stored in `workflow_node_runs.metadata` so the run history UI can surface per-step transcript links and aggregate token counts.
### Automation Run History
`getAutomationRuns()` in `features/automations/server/actions.ts` returns paginated run history for any automation type:
- **Heartbeat automations** (`id` prefixed `heartbeat:`) — queries `agent_heartbeat_runs` directly.
- **Entity automations** (bare UUID) — queries `workflow_runs` joined with `workflow_node_runs`. Token usage is aggregated from node metadata. The first `chatId` found across a run's nodes is surfaced as the run-level transcript link.
The `RunHistoryPanel` component in `features/automations/components/run-history-panel.tsx` renders this data on the automation entity detail page, showing per-step status, duration, error messages, and links to chat transcripts.
## API Reference
### Workflow Actions (`features/workflows/server/actions.ts`)
| Function | Signature | Description |
|----------|-----------|-------------|
| `createWorkflowRun` | `(input: { entityId, tenantId, triggerType, metadata? }) => Promise<WorkflowRunRecord>` | Creates a new workflow run record. |
| `seedWorkflowNodeRuns` | `(input: { workflowRunId, entityId, tenantId, nodes }) => Promise<WorkflowNodeRunRecord[]>` | Inserts node run records for all nodes in a workflow. |
| `updateWorkflowNodeRun` | `(nodeRunId, patch) => Promise<void>` | Updates a node run's status, metadata, error message, claim info, or timing. |
| `claimWorkflowNodeRun` | `(nodeRunId, claimedBy, claimedAt?) => Promise<WorkflowNodeRunRecord \| null>` | Atomically claims a node via the `claim_workflow_node_run` SQL function. Returns null if claim fails. |
| `expireStaleWorkflowNodeClaims` | `(input: { olderThanMs, tenantId?, runId?, entityId?, agentSlug? }) => Promise<WorkflowNodeRunRecord[]>` | Resets nodes claimed longer than the threshold to `failed`. |
| `completeWorkflowRun` | `(runId, status, durationMs?, metadata?) => Promise<void>` | Finalizes a workflow run with its status and timing. |
| `getLatestWorkflowRun` | `(entityId, tenantId) => Promise<WorkflowRunRecord \| null>` | Returns the most recent workflow run for an entity. |
| `getWorkflowRunById` | `(runId) => Promise<WorkflowRunRecord \| null>` | Fetches a single workflow run by ID. |
| `getWorkflowRunHistory` | `(entityId, limit?, tenantId?) => Promise<WorkflowRunRecord[]>` | Paginated run history for an entity. |
| `getActiveWorkflowRunsForEntity` | `(entityId) => Promise<WorkflowRunRecord[]>` | Returns runs in running, waiting_human, or partial status. |
| `getWorkflowNodesForRun` | `(runId) => Promise<WorkflowNodeRunRecord[]>` | All node runs for a workflow run, ordered by creation time. |
| `getWorkflowNodesForRuns` | `(runIds) => Promise<Record<string, WorkflowNodeRunRecord[]>>` | Batch fetch, grouped by run ID. |
| `getClaimableWorkflowNodeRunsForAgent` | `(tenantId, agentSlug, limit?) => Promise<WorkflowNodeRunRecord[]>` | Claimable nodes assigned to a specific agent, including retryable failures. |
### Workflow Execution (`features/workflows/run-workflow.ts`)
| Function | Signature | Description |
|----------|-----------|-------------|
| `startEntityWorkflow` | `(entityId, options?) => Promise<RunEntityWorkflowResult>` | Compiles definition, creates run, seeds nodes, and advances. |
| `advanceWorkflowRun` | `(runId, options) => Promise<RunEntityWorkflowResult>` | Claims and executes ready nodes in a loop until no more work. |
| `resumeWorkflowRunsForAgent` | `(options) => Promise<ResumeWorkflowRunsForAgentResult>` | Finds and advances all runs with claimable nodes for an agent. |
| `runEntityWorkflow` | `(entityId, options?) => Promise<RunEntityWorkflowResult>` | Alias for `startEntityWorkflow`. |
### Compilation (`features/workflows/compile.ts`)
| Function | Signature | Description |
|----------|-----------|-------------|
| `compileEntityWorkflowDefinition` | `(params: CompileParams) => WorkflowDefinition` | Compiles field configs + detail blocks into a DAG of workflow nodes. |
### Output Validation (`features/workflows/validate-output.ts`)
| Function | Signature | Description |
|----------|-----------|-------------|
| `validateOutputContract` | `(params: { entityId, node, admin }) => Promise<OutputContractResult>` | Checks entity state after agent task execution against the node's declared output type. For `field`/`fields` types, falls back to checking `entity_responses` tagged with the node run ID before declaring failure. |
### Workflow Types (`features/workflows/types.ts`)
| Export | Description |
|--------|-------------|
| `WorkflowFieldOverride` | Per-field override with `instructions`, `parentResponseId`, and `responseSource`. Passed via `RunEntityWorkflowOptions.fieldOverrides`. |
## For Agents
Agents interact with workflows through three dedicated tools available in chat:
- **`getWorkflowStatus`** -- Read-only tool to inspect the current workflow state for an entity. Returns run status and node details.
- **`triggerWorkflow`** -- Starts a new workflow run for an entity, optionally targeting specific fields.
- **`retryNode`** -- Retries a failed workflow node.
These are Tier 1 safe tools -- they do not modify entity data directly (though triggering a workflow causes agents to execute, which may modify data).
Agents also participate in workflows as executors. When a node has `assigneeAgentSlug` set, the engine resolves that agent and invokes it to complete the task. During heartbeat runs, agents automatically claim and resume workflow nodes assigned to them via `resumeWorkflowRunsForAgent()`.
## UI Components
### WorkflowProgress (`features/workflows/components/workflow-progress.tsx`)
The primary entity-level workflow UI, rendered in the entity detail sidebar. Shows:
- **Progress bar** with step completion count
- **Pulsing indicator** when a workflow is actively running
- **Counts for next / waiting / blocked work**
- **Toast notifications** on workflow status transitions
- **Auto-refresh** when a run finishes so newly promoted values appear immediately
### WorkflowTimeline (`features/workflows/components/workflow-timeline.tsx`)
Entity-level board view for the latest run. Nodes are grouped into four always-visible lanes:
- **In Flight** -- currently claimed and executing
- **Next Up** -- ready to run as soon as an agent claims them
- **Waiting on You** -- human-input and approval-gated steps
- **Blocked** -- downstream work waiting on dependencies or stuck behind failures
Completed work is shown in a collapsible **Done** lane.
### WorkflowBacklogBoard (`features/workflows/components/workflow-backlog-board.tsx`)
Workspace-level kanban board for active runs. It flattens node runs across entities into the same four lanes so operators can see current work, the next backlog, user-blocked steps, and blocked runs moving through the system without opening each record individually.
### Live Polling (`features/workflows/hooks/use-entity-workflow.ts`)
The `useEntityWorkflow` hook polls every 5 seconds when the latest run has an active status (`running` or `waiting_human`). Polling stops automatically when the run completes. Uses React Query's `refetchInterval` callback.
### Status Utilities (`features/workflows/lib/workflow-ui-utils.ts`)
Shared constants and functions for workflow UI:
- `NODE_STATUS_CONFIG` -- icon, label, and CSS class for each node status
- `isActiveRunStatus()` -- whether a run status indicates active processing
- `isActiveNodeStatus()` -- whether a node needs attention (running, waiting, failed)
- `runStatusMessage()` -- human-readable toast messages for status transitions
## Design Decisions
**Compilation over configuration.** Rather than storing workflow definitions as static config, the engine compiles them on-the-fly from field configs. This means adding a field extraction instruction automatically creates a workflow node with no extra workflow document to keep in sync.
**Atomic claiming via SQL function.** The `claim_workflow_node_run` function uses a PostgreSQL atomic update over claimable rows. This eliminates race conditions without application-level locking or distributed coordination.
**Content refresh after each node.** After executing a node, the engine re-fetches entity content from the database. This ensures that field values written by one node are visible when evaluating downstream dependencies. Without this refresh, dependency-based unblocking could see stale data.
**Run status derived, not stored.** The workflow run status is computed from the aggregate of node statuses via `finalizeRunStatus()` rather than being independently tracked. This prevents status drift between the run and its nodes.
**15-minute claim timeout.** Stale claims are expired after 15 minutes. This balances between allowing long-running extractions to complete and recovering from worker crashes. The timeout is configurable via the `WORKFLOW_NODE_CLAIM_TIMEOUT_MS` constant.
**Two-step output contract validation.** `validateOutputContract()` checks entity content first, then falls back to checking `entity_responses` records tagged with the current node run ID. This prevents spurious failures when the agent correctly called `submitResponse` but the async promotion pipeline had not yet written the value back to `entities.content`. The fallback is cheap: it queries at most 10 responses, filtered by a unique node run ID.
**Approval gates block downstream work.** A field response that requires approval is not treated as complete merely because the agent finished. The run pauses in `waiting_human` until a reviewer promotion writes the value into entity content, which keeps the DAG honest and makes user blockers explicit in the UI.
**Feedback reruns default to `responseSource: "feedback"`.** When a `feedback_rejected` trigger starts a rerun, the response tool is configured with `responseSource: "feedback"` automatically. This ensures the new response is correctly tagged for lineage tracking, distinguishing corrections from initial extractions without requiring callers to remember to set the source.
**Automation steps wrap execution in `withToolContext`.** Background agent execution (Inngest, heartbeat) has no HTTP request context. Next.js's `getActiveTenantId()` reads from cookies, which are unavailable in background jobs. `executeAutomationStep` wraps the entire agent call in `withToolContext({ tenantId }, fn)` so all 19+ AI tool `execute` functions can call `getTenantIdForTool()` and receive the correct tenant ID from `AsyncLocalStorage`. Any new background execution path that invokes agent tools must apply the same wrapper.
## Related Modules
- **Extraction** (`features/entities/extraction/`) -- Field extraction config compiles directly into workflow `agent_task` nodes.
- **Agent System** (`features/agents/`) -- Agent resolution, tool set building, and heartbeat scheduling drive workflow execution.
- **Inngest Functions** (`features/inngest/functions/`) -- `entity-extraction.ts` triggers workflows on entity creation; `entity-update-resume` handles human input completion.
- **Responses** (`features/responses/`) -- Workflow field nodes use `submitResponse`, approval, and promotion to turn agent output into versioned record values.