# Integration Substrate
Declarative connectors, one sync engine, provenance ledger, webhook ingress, HITL approvals, and MCP gateway — the one true path for all external data.
## Overview
The integration substrate is the shared platform layer for every external data connection. It is not a warehouse, connector marketplace, or product-specific sync queue. It is the layer that lets agents, humans, schedules, and webhooks use external data and side effects safely — with full provenance, replay from raw objects, and a single admin surface.
**After the 2026-06-12 unification epic**, the substrate gained declarative connectors (no-code, browser-authored), source-sync convergence (RSS/API/web feeds run through the same engine), webhook ingress (HMAC/Standard-Webhooks verified), a full admin workbench, and an MCP gateway that exposes every connector's synced data through a single endpoint.
The invariant contract:
1. `agent_connections` holds credentials and provider account identity.
2. `integration_connectors` stores declarative `ConnectorSpec` definitions; code-first `IntegrationDefinition`s live in the definition registry and execute identically.
3. `integration_runs` records every sync, remap, webhook trigger, and readback attempt.
4. `integration_objects` stores redacted provider-shaped object versions (provenance, replay, audit).
5. `integration_entity_links` maps object versions to the Amble fields they produced.
6. `integration_operations` stages provider writes with idempotency and review state.
7. `integration_operation_events` is the append-only write audit trail.
8. `external_data_sources` / `external_data_points` hold time-series metric data written by `metrics`-target connector resources.
9. `actions`, sessions, Inngest, tools, MCP, and HITL are execution and UX surfaces over this substrate — they reference rows here but never own the durable state.
## Key Concepts
### ConnectorSpec
A `ConnectorSpec` is a Zod-validated, versioned, tenant-scoped row in `integration_connectors`. It declares everything a connector needs without writing TypeScript:
```ts
type ConnectorSpec = {
  slug: string; // registry key, e.g. "stripe"
  label: string;
  version: number; // bump on edit; mapper version recorded on ledger rows
  connection: {
    presetId: string; // references an agent_connections preset
    credentialKey?: string;
    configKeys?: string[];
  };
  http?: {
    baseUrl?: string;
    rateLimit?: { rps?: number; burst?: number };
    retry?: { attempts: number };
  };
  resources: Array<{
    name: string; // "account", "transaction", "contact"
    request:
      | { kind: "rest"; path: string; method?: "GET" | "POST"; params?: Record<string, string> }
      | { kind: "mcp_tool"; tool: string; args?: Record<string, unknown> }
      | { kind: "feed"; feedType: "rss" }
      | { kind: "web"; strategy: "auto" | "http" | "firecrawl" | "browser" };
    pagination: {
      style: "none" | "cursor_token" | "sql_offset" | "time_window_offset";
      pageSize?: number;
      maxPages?: number;
      cursorPath?: string;
      hasMorePath?: string;
      itemsPath?: string;
    };
    externalId: string; // JMESPath expression; safe for agent-authored input
    target:
      | { kind: "entity"; entityType: string; titleTemplate: string }
      | {
          kind: "metrics";
          sourceKey: string;
          recordedAtPath?: string;
          metrics: Array<{ key: string; label?: string; valuePath: string }>;
        };
    fields: Record<
      string,
      { path: string; convert?: "moneyToCents" | "dateToIso" | "lower" | "str" | "refPath" | "negate" }
    >;
    dedupe?: { windowDays?: number };
  }>;
  schedule?: { cadence: "manual" | "hourly" | "daily" | "weekly"; cron?: string };
  webhook?: {
    verifier: "standard_webhooks" | "hmac_sha256" | "none";
    secretRef?: string;
    triggers?: Record<string, string>; // event type → resource name
  };
  mcpExposure?: { resources?: boolean; tools?: boolean };
};
```
`compileConnectorSpec(spec)` produces a standard `IntegrationDefinition` that the existing sync engine executes identically to code-first connectors (Plaid, QuickBooks, Acuity).
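To make the `cursor_token` pagination fields concrete, here is a minimal sketch of how `itemsPath`, `cursorPath`, `hasMorePath`, and `maxPages` might drive a pull loop. It is not the real puller: `getPath`, `readPage`, and `pullAll` are hypothetical names, the default page cap is an assumption, and a dotted-path lookup stands in for JMESPath so the example has no dependencies.

```typescript
// Stand-in for JMESPath (assumption): resolves a dotted path such as "meta.next".
function getPath(obj: unknown, path: string): unknown {
  return path.split(".").reduce<unknown>(
    (acc, key) =>
      acc !== null && typeof acc === "object"
        ? (acc as Record<string, unknown>)[key]
        : undefined,
    obj,
  );
}

type CursorPagination = {
  itemsPath: string;
  cursorPath: string;
  hasMorePath?: string;
  maxPages?: number;
};

// Extracts one page's items and decides whether another page should be fetched.
function readPage(
  body: unknown,
  p: CursorPagination,
): { items: unknown[]; nextCursor?: string } {
  const rawItems = getPath(body, p.itemsPath);
  const items = Array.isArray(rawItems) ? rawItems : [];
  const rawCursor = getPath(body, p.cursorPath);
  const cursor =
    typeof rawCursor === "string" && rawCursor.length > 0 ? rawCursor : undefined;
  // With hasMorePath the provider's flag decides; otherwise a cursor implies more pages.
  const hasMore = p.hasMorePath
    ? getPath(body, p.hasMorePath) === true
    : cursor !== undefined;
  return hasMore && cursor !== undefined ? { items, nextCursor: cursor } : { items };
}

// fetchPage is injected so the loop can be exercised without a network.
async function pullAll(
  fetchPage: (cursor?: string) => Promise<unknown>,
  p: CursorPagination,
): Promise<unknown[]> {
  const all: unknown[] = [];
  let cursor: string | undefined;
  const maxPages = p.maxPages ?? 50; // assumed default cap
  for (let page = 0; page < maxPages; page++) {
    const { items, nextCursor } = readPage(await fetchPage(cursor), p);
    all.push(...items);
    if (nextCursor === undefined) break;
    cursor = nextCursor;
  }
  return all;
}
```

Keeping the per-page decision in a pure function (`readPage`) makes the stop condition easy to test separately from transport and retry.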
### Request kinds
| Kind | What it pulls | Source |
| --- | --- | --- |
| `rest` | REST API endpoint with pagination | `http.baseUrl` + connection credentials |
| `mcp_tool` | Snapshot call to an MCP tool | `agent_connections` MCP connection |
| `feed` | RSS/Atom feed | Connection URL |
| `web` | Web page (HTTP, Firecrawl, or browser) | Connection URL |
### Resource targets
| Target kind | Where data lands | How blocks bind it |
| --- | --- | --- |
| `entity` | `entities` via `upsertEntityKeyed()` | `kind: "entity"` data source |
| `metrics` | `external_data_points` via `recordIntegrationMetrics()` | `kind: "metric"` data source (ADR-0052) |
### Webhook ingress
Inbound webhooks arrive at `/api/webhooks/integrations/[tenantSlug]/[slug]`. The route:
1. Looks up the connector spec by slug and tenant.
2. Verifies the payload against the spec's `webhook.verifier`:
   - `standard_webhooks`: the [Standard Webhooks](https://www.standardwebhooks.com/) spec (adopted by OpenAI, Anthropic, Supabase, Twilio).
   - `hmac_sha256`: raw HMAC-SHA256 over the body, keyed by the secret that `webhook.secretRef` names in the connection's credential store.
   - Provider JWT verifiers (e.g. Plaid) are code-registered escape hatches.
3. Fails closed — unverified payloads return 401, nothing queued.
4. Verified events fan into Inngest, which triggers a resource sync for the matching `webhook.triggers` entry.
`secretRef` is the key name inside the connection's credential store — the secret itself is never in the spec row.
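The `hmac_sha256` verifier and the fail-closed rule can be sketched as follows. This is an illustration under assumptions, not the route's actual code: the hex encoding of the signature, the `202` success status, and the function names are all hypothetical. Only the 401 on unverified payloads comes from the steps above.

```typescript
import { createHmac, timingSafeEqual } from "node:crypto";

// Assumption: the signature header carries a hex-encoded HMAC-SHA256 digest
// of the raw request body. The real route's encoding may differ.
function verifyHmacSha256(rawBody: string, signatureHex: string, secret: string): boolean {
  const expected = createHmac("sha256", secret).update(rawBody, "utf8").digest();
  const provided = Buffer.from(signatureHex, "hex");
  // timingSafeEqual throws on length mismatch, so compare lengths first.
  if (provided.length !== expected.length) return false;
  return timingSafeEqual(provided, expected);
}

// Fail closed: a missing signature, a missing secret, or a mismatch all map to 401.
// The 202 for accepted events is an assumed status code.
function ingressStatus(
  rawBody: string,
  signatureHex: string | undefined,
  secret: string | undefined,
): 202 | 401 {
  if (!signatureHex || !secret) return 401;
  return verifyHmacSha256(rawBody, signatureHex, secret) ? 202 : 401;
}
```

The digest must be computed over the raw bytes as received; verifying a re-serialized JSON body would reject valid payloads whose key order or whitespace differs.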
### Metric data source (ADR-0052)
The `kind: "metric"` data source is the canonical binding for external time-series data:
```ts
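// Shape of a `kind: "metric"` entry in a View's dataSources map.
// (`MetricDataSource` is an illustrative name for this shape.)
type MetricDataSource = {
  kind: "metric";
  sourceKey: string; // matches external_data_sources.source_key, e.g. "ga4-sessions"
  metricKeys: string[]; // e.g. ["sessions", "pageviews"]
  mode: "latest" | "history";
  // history mode:
  fromISO?: string;
  toISO?: string;
  limit?: number;
};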
```
The resolver reads `external_data_points` filtered by `(tenant_id, data_source_id, metric_key)` and returns `MetricRow[]` — consumed directly by chart and stat-card blocks. The `sourceKey` is auto-provisioned as a `source_type: 'integration'` row in `external_data_sources` when a connector with a `metrics` target first syncs.
The legacy `block.config.externalDataSourceId` bypass in chart/stat-card resolvers is **deprecated with a scheduled removal**: it will be deleted once the views-definition migrator has run in production and rewritten existing configs to named `kind: "metric"` data sources.
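The difference between `latest` and `history` mode can be illustrated with an in-memory sketch of the resolver's selection logic. The real resolver queries `external_data_points`; here `resolveMetricRows` and `MetricQuery` are hypothetical names, and treating `limit` as "the most recent N rows" is an assumption.

```typescript
type MetricRow = {
  key: string;
  label?: string;
  value: number;
  recordedAt: string; // ISO-8601 UTC timestamp
  metadata?: Record<string, unknown>;
};

type MetricQuery = {
  metricKeys: string[];
  mode: "latest" | "history";
  fromISO?: string;
  toISO?: string;
  limit?: number;
};

function resolveMetricRows(points: MetricRow[], query: MetricQuery): MetricRow[] {
  const wanted = new Set(query.metricKeys);
  // Same-format ISO-8601 UTC strings sort chronologically as plain strings.
  const sorted = points
    .filter((p) => wanted.has(p.key))
    .sort((a, b) => (a.recordedAt < b.recordedAt ? -1 : a.recordedAt > b.recordedAt ? 1 : 0));

  if (query.mode === "latest") {
    const latest = new Map<string, MetricRow>();
    for (const p of sorted) latest.set(p.key, p); // later rows overwrite earlier ones
    return query.metricKeys.flatMap((k) => {
      const row = latest.get(k);
      return row ? [row] : [];
    });
  }

  const inWindow = sorted.filter(
    (p) =>
      (query.fromISO === undefined || p.recordedAt >= query.fromISO) &&
      (query.toISO === undefined || p.recordedAt <= query.toISO),
  );
  // Assumption: limit keeps the most recent N rows.
  return query.limit === undefined
    ? inWindow
    : inWindow.slice(Math.max(inWindow.length - query.limit, 0));
}
```

In `latest` mode the result follows the order of `metricKeys`, which keeps stat-card layouts stable regardless of sync order.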
### Connection health states
`agent_connections` carries `status` and `status_code` fields:
| Status | Meaning | Common cause |
| --- | --- | --- |
| `active` | Credentials valid, last sync succeeded | — |
| `needs_reauth` | OAuth token expired or revoked | `ITEM_LOGIN_REQUIRED`, 401 |
| `erroring` | Repeated sync failures, not auth-related | Provider downtime, schema change |
| `disabled` | Manually disabled by operator | — |
The sync engine sets these automatically from error codes. The admin workbench surfaces re-auth affordances when status is `needs_reauth`.
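One way the engine might derive these states from sync outcomes is sketched below. The function name, the `SyncOutcome` shape, and the failure threshold of 3 are hypothetical; only the four statuses and the `ITEM_LOGIN_REQUIRED` / 401 triggers come from the table above.

```typescript
type ConnectionStatus = "active" | "needs_reauth" | "erroring" | "disabled";

type SyncOutcome =
  | { ok: true }
  | { ok: false; httpStatus?: number; providerCode?: string };

// Assumption: the set of provider codes treated as auth failures is illustrative.
const AUTH_PROVIDER_CODES = new Set(["ITEM_LOGIN_REQUIRED"]);

function nextConnectionStatus(
  current: ConnectionStatus,
  outcome: SyncOutcome,
  consecutiveFailures: number,
  failureThreshold = 3, // assumed threshold for "repeated" failures
): ConnectionStatus {
  // A manually disabled connection stays disabled until an operator re-enables it.
  if (current === "disabled") return "disabled";
  if (outcome.ok) return "active";
  const isAuthFailure =
    outcome.httpStatus === 401 ||
    (outcome.providerCode !== undefined && AUTH_PROVIDER_CODES.has(outcome.providerCode));
  if (isAuthFailure) return "needs_reauth";
  return consecutiveFailures >= failureThreshold ? "erroring" : current;
}
```

Auth failures are classified before the failure count is consulted, so a single 401 moves the connection to `needs_reauth` without waiting for repeated failures.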
## How It Works
### Read sync flow
```
agent_connections
-> resolveIntegrationDefinition(tenantId, slug) // LRU: compiles ConnectorSpec or returns code-first
-> generic puller (fetch + p-retry + 4 pagination strategies)
-> integration_runs (one row per sync)
-> integration_objects (provider-shaped versions, post-redaction)
-> mapper / field evaluator (JMESPath + converter registry)
-> upsertEntityKeyed() [target: entity]
OR recordIntegrationMetrics() [target: metrics]
-> integration_entity_links (provenance)
```
**Remap without re-fetch.** If a mapper changes, call the sync with `mode: "remap"` — the engine replays the stored `integration_objects` through the new mapper without hitting the provider again.
**Dry-run mode.** Every sync supports `mode: "dry_run"`, which runs the full pipeline, stages writes in the ledger, and shows the sample mapping, but never writes entities. The workbench requires a dry run before it allows `apply`.
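The mapper / field evaluator step in the flow above pairs a path expression with an optional named converter. The sketch below shows that shape under stated assumptions: a dotted-path lookup stands in for JMESPath, the converter semantics are guesses from their names (for example, `moneyToCents` is assumed to turn a decimal amount into integer cents), and `refPath` is omitted because its behavior is not described here.

```typescript
type Convert = "moneyToCents" | "dateToIso" | "lower" | "str" | "negate";
type FieldSpec = { path: string; convert?: Convert };

// Explicit converter registry: a closed set of named functions, no expression language.
// The semantics below are assumptions inferred from the converter names.
const converters: Record<Convert, (v: unknown) => unknown> = {
  moneyToCents: (v) => Math.round(Number(v) * 100),
  dateToIso: (v) => new Date(String(v)).toISOString(),
  lower: (v) => String(v).toLowerCase(),
  str: (v) => String(v),
  negate: (v) => -Number(v),
};

// Stand-in for JMESPath (assumption): resolves a dotted path such as "customer.email".
function getPath(obj: unknown, path: string): unknown {
  return path.split(".").reduce<unknown>(
    (acc, key) =>
      acc !== null && typeof acc === "object"
        ? (acc as Record<string, unknown>)[key]
        : undefined,
    obj,
  );
}

function mapFields(raw: unknown, fields: Record<string, FieldSpec>): Record<string, unknown> {
  const out: Record<string, unknown> = {};
  for (const [name, spec] of Object.entries(fields)) {
    const value = getPath(raw, spec.path);
    if (value === undefined || value === null) continue; // skip missing values
    out[name] = spec.convert ? converters[spec.convert](value) : value;
  }
  return out;
}
```

Because `mapFields` is a pure function of the stored object and the field map, a remap amounts to running it again over the stored `integration_objects` with the new field map.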
### Write flow (provider side effects)
```
proposal
-> integration_operations (status: awaiting_human | approved)
-> integration_operation_events (proposed, approval_requested)
-> operator review in /admin/integrations (Approvals tab)
-> executeIntegrationOperation()
-> provider_request / provider_response events
-> readback + succeeded | failed | readback_mismatch
```
Reviewer payload edits are first-class — a `reviewer_edited` event is written before approval.
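The staging and review steps of the write flow can be modeled as a small state machine over an append-only event list. This is a sketch, not the real `stageIntegrationOperation`: the function names are hypothetical, the `approved` event name is an assumption, and persistence and provider execution are left out.

```typescript
type OperationStatus =
  | "awaiting_human"
  | "approved"
  | "succeeded"
  | "failed"
  | "readback_mismatch";

type OperationEvent = { type: string; at: string };

type Operation = {
  id: string;
  status: OperationStatus;
  payload: unknown;
  events: readonly OperationEvent[];
};

// Append-only: returns a new operation whose event list extends the old one.
function withEvent(op: Operation, type: string, patch: Partial<Operation> = {}): Operation {
  return { ...op, ...patch, events: [...op.events, { type, at: new Date().toISOString() }] };
}

function stageOperation(id: string, payload: unknown, requiresApproval: boolean): Operation {
  const proposed = withEvent({ id, status: "approved", payload, events: [] }, "proposed");
  return requiresApproval
    ? withEvent(proposed, "approval_requested", { status: "awaiting_human" })
    : proposed;
}

function assertAwaitingHuman(op: Operation): void {
  if (op.status !== "awaiting_human") {
    throw new Error(`operation ${op.id} is ${op.status}, not awaiting_human`);
  }
}

// A reviewer edit records its own event before any approval.
function reviewerEdit(op: Operation, payload: unknown): Operation {
  assertAwaitingHuman(op);
  return withEvent(op, "reviewer_edited", { payload });
}

// "approved" as an event name is an assumption.
function approve(op: Operation): Operation {
  assertAwaitingHuman(op);
  return withEvent(op, "approved", { status: "approved" });
}
```

Each transition returns a new value rather than mutating its input, which mirrors the append-only character of `integration_operation_events`.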
### Definition registry tenancy
The global `IntegrationDefinition` registry is code-first only. Spec-compiled connectors resolve per-request via `resolveIntegrationDefinition(tenantId, slug)` with a `(tenantId, slug, version)` LRU cache. No tenant spec is globally registered — specs resolve per-request and are isolated by tenant.
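A cache keyed by `(tenantId, slug, version)` gives tenant isolation and invalidation on version bump without any explicit eviction call. The sketch below is a minimal Map-based LRU, not the substrate's implementation. `LruCache` and `resolveCached` are hypothetical names, and passing `version` in directly is a simplification; in practice it would presumably be read from the stored spec row.

```typescript
// Minimal LRU built on Map's insertion-order iteration.
class LruCache<V> {
  private readonly entries = new Map<string, V>();
  private readonly capacity: number;

  constructor(capacity: number) {
    this.capacity = capacity;
  }

  get(key: string): V | undefined {
    const value = this.entries.get(key);
    if (value === undefined) return undefined;
    // Re-insert to mark the entry as most recently used.
    this.entries.delete(key);
    this.entries.set(key, value);
    return value;
  }

  set(key: string, value: V): void {
    this.entries.delete(key);
    this.entries.set(key, value);
    if (this.entries.size > this.capacity) {
      const oldest = this.entries.keys().next().value;
      if (oldest !== undefined) this.entries.delete(oldest);
    }
  }
}

// Compiles at most once per (tenantId, slug, version) while the entry stays cached.
function resolveCached<V>(
  cache: LruCache<V>,
  tenantId: string,
  slug: string,
  version: number,
  compile: () => V,
): V {
  // JSON-encoding the tuple avoids key collisions between, e.g., ("a:b", "c") and ("a", "b:c").
  const key = JSON.stringify([tenantId, slug, version]);
  const hit = cache.get(key);
  if (hit !== undefined) return hit;
  const compiled = compile();
  cache.set(key, compiled);
  return compiled;
}
```

Including the tenant in the key means two tenants with the same slug never share a compiled definition, and a bumped `version` simply misses the cache while the stale entry ages out.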
### Source-sync convergence
RSS, API, and web feed sources previously managed by `features/source-sync` now compile to `ConnectorSpec` rows with `request.kind: "feed"` or `"web"`. The deprecated scheduling code (`features/source-sync/scheduling.ts`) and `SOURCE_MONITOR_ACTIONS` custom path have been deleted. A one-time cutover script (`scripts/migrate-sources-to-connectors.ts --apply`) migrates existing source-entity rows to `integration_connectors`.
## API Reference
### Core execution
```ts
// Resolve a connector definition (code-first or compiled spec)
resolveIntegrationDefinition(tenantId: string, slug: string): Promise<IntegrationDefinition>

// Run a sync (entity target)
runRawIntegrationSync(
  definition: IntegrationDefinition,
  connection: AgentConnection,
  options: { mode: "apply" | "dry_run" | "remap"; runType?: "backfill" | "incremental" }
): Promise<IntegrationRunResult>

// Write metric data points (server-only seam; never call from client actions)
recordIntegrationMetrics(
  tenantId: string,
  sourceKey: string,
  integrationSlug: string,
  resource: string,
  points: MetricPoint[]
): Promise<void>

// Stage a provider write for review
stageIntegrationOperation(
  definition: IntegrationDefinition,
  payload: unknown,
  options: { idempotencyKey?: string; requiresApproval?: boolean }
): Promise<IntegrationOperation>

// Execute an approved write
executeIntegrationOperation(operationId: string): Promise<void>
```
### Sync route
```
POST /api/admin/integrations/[slug]/sync
Body: { connectionId: string; mode?: "apply" | "dry_run" | "remap"; runType?: "backfill" | "incremental" }
```
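A client-side call to this route might be assembled as below. This is a sketch: `buildSyncRequest` is a hypothetical helper, the base URL is a placeholder, and authentication is omitted because the route's auth mechanism is not described here.

```typescript
type SyncBody = {
  connectionId: string;
  mode?: "apply" | "dry_run" | "remap";
  runType?: "backfill" | "incremental";
};

type SyncRequest = {
  url: string;
  init: { method: "POST"; headers: Record<string, string>; body: string };
};

// Builds the URL and fetch options without sending anything.
function buildSyncRequest(baseUrl: string, slug: string, body: SyncBody): SyncRequest {
  const url = new URL(
    `/api/admin/integrations/${encodeURIComponent(slug)}/sync`,
    baseUrl,
  ).toString();
  return {
    url,
    init: {
      method: "POST",
      headers: { "content-type": "application/json" },
      body: JSON.stringify(body),
    },
  };
}

// Usage (inside an async function):
//   const { url, init } = buildSyncRequest("https://amble.example", "stripe", {
//     connectionId: "conn_123",
//     mode: "dry_run",
//   });
//   const res = await fetch(url, init);
```

Starting with `mode: "dry_run"` and switching to `"apply"` only after reviewing the sample mapping matches the workbench's own sequence.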
### Webhook ingress
```
POST /api/webhooks/integrations/[tenantSlug]/[slug]
Headers: webhook-id, webhook-timestamp, webhook-signature (Standard-Webhooks)
OR x-signature (HMAC-SHA256)
```
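For the three Standard-Webhooks headers, verification follows the published spec: the signed content is `{webhook-id}.{webhook-timestamp}.{body}`, the key is the base64-decoded secret (conventionally prefixed `whsec_`), and `webhook-signature` holds one or more space-delimited `v1,<base64>` entries. The sketch below implements that scheme directly with `node:crypto` for illustration; the function names and the 5-minute tolerance are assumptions, and a production route would more likely use the official library.

```typescript
import { createHmac, timingSafeEqual } from "node:crypto";

type WebhookHeaders = {
  "webhook-id"?: string;
  "webhook-timestamp"?: string;
  "webhook-signature"?: string;
};

function signStandardWebhook(secret: string, id: string, timestamp: string, body: string): string {
  // The secret is base64; strip the conventional "whsec_" prefix if present.
  const key = Buffer.from(secret.startsWith("whsec_") ? secret.slice(6) : secret, "base64");
  const digest = createHmac("sha256", key)
    .update(`${id}.${timestamp}.${body}`, "utf8")
    .digest("base64");
  return `v1,${digest}`;
}

function verifyStandardWebhook(
  secret: string,
  headers: WebhookHeaders,
  body: string,
  nowSeconds: number = Math.floor(Date.now() / 1000),
  toleranceSeconds = 300, // assumed replay window
): boolean {
  const id = headers["webhook-id"];
  const timestamp = headers["webhook-timestamp"];
  const signatureHeader = headers["webhook-signature"];
  if (!id || !timestamp || !signatureHeader) return false;

  // Reject stale or future-dated messages to limit replay.
  const sentAt = Number(timestamp);
  if (!Number.isFinite(sentAt) || Math.abs(nowSeconds - sentAt) > toleranceSeconds) return false;

  const expected = Buffer.from(signStandardWebhook(secret, id, timestamp, body));
  // The header may carry several signatures (e.g. during secret rotation).
  return signatureHeader.split(" ").some((candidate) => {
    const provided = Buffer.from(candidate);
    return provided.length === expected.length && timingSafeEqual(provided, expected);
  });
}
```

Every failure path returns `false`, so the caller can map anything other than `true` to a 401 without inspecting the reason.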
### Scheduled action
The `integrations.scheduledSync` action slug triggers a sync for a given connector. Register it as a cron action row:
```ts
// actions table row
{
  slug: "integrations.scheduledSync",
  trigger: { type: "cron", cron: "0 2 * * *" },
  input: { connectorSlug: "stripe", connectionId: "...", runType: "incremental" }
}
```
## Admin Workbench
`/admin/integrations` is the operator console:
| Tab | What it shows |
| --- | --- |
| **Connectors** | List of all connectors (code-first and spec-defined); edit spec, run sync, view last run status |
| **Spec Editor** | Author or edit a `ConnectorSpec`; OpenAPI import (paste URL to auto-draft endpoints and field maps); dry-run preview against live data |
| **Synced Data** | Browse `integration_objects` by connector and resource; filter by field value; trigger remap |
| **Connections** | `agent_connections` list with health status badges and re-auth affordances |
| **Approvals** | `integration_operations` awaiting review; approve, reject, or edit the approved payload inline |
| **Runs** | Recent `integration_runs` with status, object counts, and error details |
Raw payloads and operation payloads are served through admin-gated server accessors. RLS may allow summary/provenance reads where safe, but raw object and provider-write inspection remain app-mediated.
## MCP Gateway
Amble's MCP server exposes the integrations surface to external clients at a single endpoint. Clients connect once and Amble handles every provider behind it.
**Catalog tool:** `listIntegrations` — returns the tenant's active connectors with slug, label, resource names, last sync time, and connection health.
**Per-connector resources:** Connectors with `mcpExposure.resources: true` expose their synced data as MCP resources named `integration/{slug}/{resource}`. The resource responds with the latest objects from `integration_objects`.
**Per-connector query tool:** Connectors with `mcpExposure.tools: true` get a scoped `query_{slug}_{resource}` MCP tool for filtered reads.
**`mcp_tool` request kind:** A `ConnectorSpec` resource with `request.kind: "mcp_tool"` syncs data by calling a tool on an external MCP server — useful for providers that expose an MCP server rather than a REST API.
MCP is a **serving layer, not a sync pipeline** — background sync runs on the puller/ledger schedule; MCP tools serve already-synced data on demand.
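The naming conventions above are mechanical enough to express as a helper. The sketch below derives the MCP resource and tool names a connector would expose from its `mcpExposure` flags; `exposedMcpNames` and the trimmed-down spec type are hypothetical and cover only the fields the naming needs.

```typescript
type ExposureSpec = {
  slug: string;
  resources: Array<{ name: string }>;
  mcpExposure?: { resources?: boolean; tools?: boolean };
};

const mcpResourceName = (slug: string, resource: string): string =>
  `integration/${slug}/${resource}`;

const mcpQueryToolName = (slug: string, resource: string): string =>
  `query_${slug}_${resource}`;

// Lists what a connector exposes; nothing is exposed unless a flag is explicitly true.
function exposedMcpNames(spec: ExposureSpec): { resources: string[]; tools: string[] } {
  const names = spec.resources.map((r) => r.name);
  return {
    resources: spec.mcpExposure?.resources === true
      ? names.map((n) => mcpResourceName(spec.slug, n))
      : [],
    tools: spec.mcpExposure?.tools === true
      ? names.map((n) => mcpQueryToolName(spec.slug, n))
      : [],
  };
}
```

For a `stripe` connector with an `invoice` resource and both flags set, this yields the resource `integration/stripe/invoice` and the tool `query_stripe_invoice`.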
## For Agents
### Creating a connector via `manageIntegration` tool
```
Use the manageIntegration tool to author a new connector spec. Example prompt:
"Create a Stripe connector that pulls invoices daily. The Stripe connection preset
is 'stripe'. Fetch from /v1/invoices with cursor pagination on the 'ending_before'
field. Map id → externalId, customer → fields.customer_id, amount_due → fields.amount
(convert: moneyToCents), status → fields.status. Target entity type 'invoice'."
The tool will compile and dry-run the spec before saving. Review the dry-run
sample output before confirming apply.
```
### Binding metrics to a dashboard block
1. Create a connector with a `metrics` resource target:
```json
{
  "target": {
    "kind": "metrics",
    "sourceKey": "ga4-sessions",
    "metrics": [
      { "key": "sessions", "label": "Sessions", "valuePath": "sessions" },
      { "key": "pageviews", "label": "Page Views", "valuePath": "screenPageViews" }
    ]
  }
}
```
2. After the connector syncs, bind a chart or stat-card block to the data:
```json
{
  "dataSources": {
    "traffic": {
      "kind": "metric",
      "sourceKey": "ga4-sessions",
      "metricKeys": ["sessions", "pageviews"],
      "mode": "history"
    }
  }
}
```
3. The block receives `MetricRow[]` with `key`, `label`, `value`, `recordedAt`, `metadata`.
### Triggering a sync
```
Use the integrations.scheduledSync action or POST to /api/admin/integrations/{slug}/sync
with { connectionId, mode: "apply", runType: "incremental" }.
```
### Approving a staged write
Staged `integration_operations` with `status: awaiting_human` appear in the Approvals tab of `/admin/integrations`. Agents can also read and approve them via the `integration_operations` entity tools if the agent has the appropriate permissions.
## Design Decisions
**JMESPath for field extraction, not JSONata or jsonpath-plus.** JMESPath is extraction-only with no eval surface — safe for agent-authored field-map expressions by design. JSONata is Turing-complete with a CVE history (CVE-2024-27307 class). jsonpath-plus is eval-based (CVE-2026-1615). Transformations beyond extraction go through the explicit converter registry (`moneyToCents`, `dateToIso`, `lower`, `str`, `refPath`, `negate`) — never an expression language.
**Spec executor produces `IntegrationDefinition`s — no parallel system.** The compiler outputs the same contract code-first connectors implement. One registry, one execution path. Code-first connectors (Plaid, QuickBooks, Acuity) are not migrated to specs unless a change forces a touch — the escape-hatch cost is zero.
**`source_key` on `external_data_sources`, not a new table.** The `kind: "metric"` data source resolves by `(tenant_id, source_key)` — a stable, agent-readable string key. Backfilling `id::text` as `source_key` for existing rows keeps old bypass configs resolving during the migration window.
**Webhook secrets in connection credentials, not in the spec row.** `webhook.secretRef` names a key in the connection's credential store. The spec row is safe to display in admin UI; the secret never leaks through it.
**MCP as serving layer, not sync pipeline.** MCP resources serve already-synced data from `integration_objects`. Background sync is decoupled — it runs on its own schedule/webhook trigger and writes to the ledger. `mcp_tool` resource requests are snapshot-style pulls, not streaming subscriptions.
**Standard Webhooks library (1KB) over Svix infrastructure.** The Standard Webhooks spec is adopted by OpenAI, Anthropic, Supabase, and Twilio — a stable interop target. The raw library requires no hosted infrastructure.
**Fail-closed webhook ingress.** Unverified payloads return 401 — nothing is queued, no partial state written. This is the only safe default for a public inbound endpoint.
## Open Follow-ups
- **SSRF guard consolidation.** `validateOutboundUrl` lives in the puller but now has non-puller consumers. Extract to `features/integrations/lib/outbound-url.ts`; add DNS-resolution pinning against DNS-rebinding. Tag guard rejections with `AbortError` so the retry budget isn't burned on deterministic failures. (P2)
- **Title-template brace parsing.** `renderTitleTemplate` mis-handles nested/unbalanced `{{field}}` braces. Fix in `features/integrations/declarative/evaluate.ts`. (P2)
- **Promote `integrations-no-deep-imports` from warn to error.** PR #2432 shipped the rule at warn to give the 7 known violators a grace window. Fix the violators (listed in `.dependency-cruiser.cjs` rule comment) and flip to error. (P2)
- **Metric-rows dedup helper.** The metric-row → stat-card mapping exists in both the legacy server resolver and the V2 module path. Consolidate into one pure helper once the `externalDataSourceId` bypass is deleted. (P2)
## Related
- [Data Model](/docs/data-model) — `integration_connectors`, `agent_connections`, `external_data_sources` schema
- [Block System](/docs/features/block-system) — how `kind: "metric"` data sources bind to chart and stat-card blocks
- [Agent System](/docs/features/agent-system) — `manageIntegration` tool, MCP connections
- [Actions](/docs/features/actions) — `integrations.scheduledSync` cron action
- [ADR-0039](/docs/architecture) — block engine unification (render-engine contract)
- `documents/adr/0052-metric-data-source-kind.md` — metric data-source kind design