Inngest
Background job orchestration with Inngest -- event model, function catalog, step patterns, concurrency controls, and local development.
Inngest
Inngest handles all background job orchestration for the Sprinter Platform. It provides event-driven, durable step functions with automatic retries, concurrency controls, and observability. The Inngest client is registered as sprinter-platform.
Event model
All events are defined with typed payloads in features/inngest/client.ts:
| Event | Payload | Triggered by |
|---|---|---|
entity/created | entityId, tenantId, entityTypeSlug | Entity creation (server actions) |
entity/updated | entityId, tenantId, entityTypeSlug | Entity updates (server actions) |
entity/enrich | entityId, tenantId, sourceUrl | Entity enrichment trigger |
document/uploaded | documentId, tenantId, entityId | Document upload |
agent/heartbeat | agentId, tenantId | Cron dispatcher, manual trigger |
webhook/fire | tenantId, event, payload | Field actions, custom integrations |
extraction/result-rejected | responseId, entityId, tenantId, fieldName, rejectionReason? | Human rejection of an entity_responses value |
task/triggered | taskId, entityId?, tenantId, triggeredBy | Action/task trigger helpers |
session/execute | parentSessionId, tenantId, triggeredBy | triggerTask() and action dispatch |
session/resume | sessionId, tenantId, decision, denialReason?, modifiedArgs? | Human-in-the-loop tool approval |
session/completed | sessionId, parentSessionId, tenantId, taskSlug | Session executor completion |
response.promoted | tenant_id, entity_id, entity_type_slug, criteria_set_slug, response_id | Response promotion |
claim.aggregate.recompute | tenant_id, claim_id, supports_relation_type, evidence_quality_criteria_slug | Response-driven aggregate recompute |
capture.url.routed | tenantId, url, agentSlug, captureBody | URL capture routing |
email/send, email/received | Email payloads | Email delivery and inbound processing |
notification/digest | tenantId, frequency | Scheduled digest |
Events are sent via inngest.send() from server actions and API routes. The typed event definitions ensure payload correctness at compile time.
Function catalog
All functions live in features/inngest/functions/. Each uses the inngest.createFunction() API with named steps for observability and durability.
action-dispatch
ID: action-dispatch
Triggers: entity/created, entity/updated
Concurrency: tenant/entity scoped
Finds active actions whose trigger and scope match the entity event, calls
triggerTask(), and dispatches session/execute for the resulting parent
session. This is the default field-population entrypoint for entity changes.
action-cron
ID: action-cron
Triggers: cron
Finds scheduled active actions and hands eligible work to triggerTask().
session-executor
ID: session-executor
Triggers: session/execute
Executes parent/child sessions, writes ordered session_events, runs tools,
handles structured response submission, and emits follow-up events such as
session/completed.
session-resume
ID: session-resume
Triggers: session/resume
Resumes sessions after human-in-the-loop tool approval or denial.
agent-heartbeat
ID: agent-heartbeat
Triggers: agent/heartbeat
Concurrency: 5 global
Executes a scheduled agent run. The function:
- Fetches the agent record and creates a heartbeat run entry.
- Builds an attention snapshot from claimable actions and context.
- Attempts to claim and trigger eligible assigned work via
triggerTask(). - If no work and mode is "light" with no attention items, completes as a no-op.
- Otherwise, builds a prompt with attention context, creates a chat, executes the agent with its own role permissions, and saves the results.
- Records token usage and metadata on the run.
feedback-rerun
ID: extraction-feedback-rerun
Triggers: extraction/result-rejected
Concurrency: 3 global, 1 per entity
Auto-reruns field population when a response is rejected with a reason. Loads the rejected response value, builds feedback instructions ("Previous attempt returned: X. Rejected because: Y. Please try again addressing this feedback."), and triggers the matching field action with feedback injected as additional instructions.
document-processing
ID: document-processing
Triggers: document/uploaded
Concurrency: 3 global, 2 per tenant
Retries: 3
Multi-step document processing pipeline:
- Fetch -- loads document metadata, marks status as
processing - Parse -- downloads file from storage, runs through format-specific parser (PDF, DOCX, etc.), stores pages in
document_pagestable - Chunk -- reads pages back, creates overlapping chunks (1000 chars, 200 overlap) with page tracking, stores in
document_chunks - Embed -- generates embeddings in batches of 100, upserts vectors into
document_chunks - Finalize -- marks document as
ready, updates linked document entity - Trigger field population -- finds entities linked to the document (via
entity_idorentity_relations) and triggers matching document-ready actions through the actions/sessions runtime.
entity-enrichment
ID: entity-enrichment
Triggers: entity/enrich
Concurrency: 5 global
Retries: 2
Async OG-metadata enrichment for entities with a source_url. Loads the
current entity (tenant-scoped), bails out if the URL changed since the event
was fired, then calls enrichEntity() to fetch OpenGraph metadata, store the
preview image in Supabase storage, and patch the entity with the enriched
fields.
webhook-delivery
ID: webhook-delivery
Triggers: webhook/fire
Retries: 3
Delivers webhook events to registered endpoints. Finds all enabled webhook_endpoints that subscribe to the event type, then delivers the payload to each with HMAC signature verification (X-Webhook-Signature header). Tracks delivery success and failure counts per endpoint.
recompute-claim-aggregates-on-response
ID: recompute-claim-aggregates-on-response
Triggers: response.promoted
Concurrency: 5 global, 1 per source-item entity_id
Retries: 2
Slug-gates on (entity_type, criteria_set) matching the tenant's configured source-item type + evidence-quality criteria set, then fans out one claim.aggregate.recompute event per claim that the promoted source-item supports. The fan-out is durable — wrapped in step.run("fanout", () => inngest.send([...])) so listener retries don't re-emit partial batches. The expensive per-claim derivation runs in the downstream handler, so this listener stays narrow.
recompute-claim-aggregate-for-claim
ID: recompute-claim-aggregate-for-claim
Triggers: claim.aggregate.recompute
Concurrency: 10 global, 1 per (tenant_id, claim_id) pair
Retries: 2
Consumes fanout events from the listener and atomically writes source_count + source_quality_avg to entities.content via the public.apply_claim_aggregate RPC. The per-claim limit-1 concurrency key is the correctness contract — two source-items on the same claim landing inside one Inngest window are serialized end-to-end (read → derive → write), eliminating the stale-aggregate race that bulk-import traffic previously made visible. Deleted claims land as a graceful no-op (RPC returns zero rows); Inngest does not retry. Emits logger.info("claim-aggregate recompute", { tenant_id, claim_id, updated }) for queue-backlog visibility in the Inngest UI.
Step function patterns
Inngest functions use named steps for durability and observability:
export const myFunction = inngest.createFunction(
{
id: "my-function",
name: "Human-Readable Name",
concurrency: [{ limit: 5 }],
retries: 2,
},
{ event: "my/event" },
async ({ event, step }) => {
// Each step is independently retryable
const data = await step.run("fetch-data", async () => {
// This step retries on failure without re-running previous steps
return fetchSomething(event.data.id);
});
const result = await step.run("process", async () => {
return processData(data);
});
return { success: true, result };
}
);Concurrency controls
Functions declare concurrency limits to prevent overloading external services:
- Global limit --
{ limit: 5 }caps total concurrent executions - Per-key limit --
{ limit: 2, key: "event.data.tenantId" }caps per tenant - Per-entity limit --
{ limit: 1, key: "event.data.entityId" }prevents concurrent processing of the same entity
Event filtering
Functions can filter on event data to only trigger for specific conditions:
{
event: "entity/updated",
if: `event.data.entityTypeSlug == 'agent-task'`,
}Registering functions
All Inngest functions must be registered with the Inngest serve handler. The serve endpoint is an API route that Inngest calls to discover and invoke functions.
Local development
Run the Inngest Dev Server alongside your Next.js dev server:
npx inngest-cli@latest devThe Dev Server provides:
- A dashboard at
http://localhost:8288for viewing function runs, events, and step output - Automatic function discovery from your Next.js API route
- Event replay for debugging failed runs
- Step-by-step execution tracing
In development, events are processed immediately (no queue delay). Failed steps show full error traces in the dashboard.
Environment variables
| Variable | Purpose |
|---|---|
INNGEST_EVENT_KEY | Event key for sending events (production) |
INNGEST_SIGNING_KEY | Signing key for webhook verification (production) |
In local development, these are not required -- the Dev Server handles authentication automatically.
Monitoring
Each function run is visible in the Inngest dashboard (cloud or local dev) with:
- Event payload that triggered the run
- Individual step execution times and outputs
- Retry history for failed steps
- Concurrency queue status
For production monitoring, the Inngest Cloud dashboard provides alerting on function failures and latency spikes.