# Workflow Reliability and Data Fetching Modernization
Atomic entity writes, React Query migration, parallel workflow resumption
## Problem
Three issues found during the PR #391 review affect data integrity, performance, and code consistency.
### 1. TOCTOU Content Writes — Silent Data Loss
Six locations read `entity.content`, merge a field in JavaScript, then write the whole object back. Between read and write, concurrent operations (extraction, workflows, user edits, API syncs) can write to the same entity. The last writer wins, silently discarding the other's fields.
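The lost update can be reproduced without a database. The sketch below models one row as an in-memory object and replays the interleaving described above, in which both writers read before either one writes. The `Row` class, both function names, and the sample fields are hypothetical and exist only for illustration. The read-merge-write version drops writer A's field, while the merge-in-place version keeps both.

```typescript
type Content = Record<string, unknown>;

// Hypothetical in-memory stand-in for one `entities` row (not a database client).
class Row {
  private content: Content;

  constructor(content: Content) {
    this.content = { ...content };
  }

  // SELECT content: returns a snapshot copy.
  select(): Content {
    return { ...this.content };
  }

  // UPDATE ... SET content = <whole object supplied by the caller>.
  update(next: Content): void {
    this.content = next;
  }

  // UPDATE ... SET content = content || patch: merges into the value stored right now.
  mergeUpdate(patch: Content): void {
    this.content = { ...this.content, ...patch };
  }
}

// TOCTOU pattern: both writers SELECT before either one UPDATEs.
function readMergeWriteInterleaved(initial: Content, patchA: Content, patchB: Content): Content {
  const row = new Row(initial);
  const seenByA = row.select();
  const seenByB = row.select(); // B's snapshot predates A's write
  row.update({ ...seenByA, ...patchA });
  row.update({ ...seenByB, ...patchB }); // replaces A's write wholesale
  return row.select();
}

// Atomic pattern: no separate read, so there is no stale snapshot to write back.
function atomicMergeInterleaved(initial: Content, patchA: Content, patchB: Content): Content {
  const row = new Row(initial);
  row.mergeUpdate(patchA);
  row.mergeUpdate(patchB);
  return row.select();
}

const racy = readMergeWriteInterleaved({ name: "Acme" }, { status: "extracted" }, { title: "Edited" });
// racy has name and title only: the extraction's `status` was silently dropped.
const atomic = atomicMergeInterleaved({ name: "Acme" }, { status: "extracted" }, { title: "Edited" });
// atomic has name, status, and title.
```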
**Affected locations (6):**
| Location | Client | Severity |
|---|---|---|
| `run-workflow.ts` — `attemptFieldRecoveryFromText` (L467-482) | admin | Critical |
| `submissions.ts` — `submitExternalExtractionResult` fallback (L182-190) | admin | Critical |
| `bulk-actions.ts` — `bulkUpdateField` (L160-190) | authenticated | High |
| `api-actions-keyed.ts` — `updateEntityKeyed` (L274-307) | admin | High |
| `api-actions-keyed.ts` — upsert conflict path (L454-493) | admin | High |
| `actions.ts` — `updateEntity` (L168-225) | authenticated | Medium |
### 2. Raw Fetch in 30 Client Components
Thirty client components fetch data manually with `useState` + `useEffect` + `fetch`, each with hand-rolled cancellation, loading state, and error handling. None of them get caching, request deduplication, or stale-time control. The rest of the codebase has standardized on React Query, with established hook patterns in `features/*/hooks/`.
### 3. Serial Workflow Resumption
`resumeWorkflowRunsForAgent` processes independent workflow runs serially in a `for...of` loop. Each run involves LLM calls taking 2-10 seconds. Runs are fully independent (different entities, atomic claiming via RPC, no shared state).
---
## Solution
### Workstream A: Atomic Entity Content Writes
**Approach:** Two Postgres `SECURITY INVOKER` RPC functions that use the `||` jsonb merge operator for atomic field-level merges. No TOCTOU window.
#### Migration: Two functions
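```sql
-- Atomically merges p_new_content into entities.content (top-level keys, new values win).
-- SECURITY INVOKER: runs as the calling role, so RLS applies to authenticated callers.
-- STRICT: a NULL argument returns NULL without running the UPDATE
-- (otherwise `content || NULL` would set the column to NULL).
-- Returns NULL when no row matches (wrong id, wrong tenant, or hidden by RLS).
CREATE FUNCTION merge_entity_content(
  p_entity_id uuid,
  p_tenant_id uuid,
  p_new_content jsonb
) RETURNS jsonb
LANGUAGE sql
STRICT
SECURITY INVOKER
AS $$
  UPDATE entities
  SET content = COALESCE(content, '{}'::jsonb) || p_new_content
  WHERE id = p_entity_id AND tenant_id = p_tenant_id
  RETURNING content;
$$;

-- Same contract as merge_entity_content, applied to entities.metadata.
CREATE FUNCTION merge_entity_metadata(
  p_entity_id uuid,
  p_tenant_id uuid,
  p_new_metadata jsonb
) RETURNS jsonb
LANGUAGE sql
STRICT
SECURITY INVOKER
AS $$
  UPDATE entities
  SET metadata = COALESCE(metadata, '{}'::jsonb) || p_new_metadata
  WHERE id = p_entity_id AND tenant_id = p_tenant_id
  RETURNING metadata;
$$;

GRANT EXECUTE ON FUNCTION merge_entity_content TO authenticated, service_role;
GRANT EXECUTE ON FUNCTION merge_entity_metadata TO authenticated, service_role;
```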
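For two jsonb objects, `||` returns the union of their top-level keys and takes the right operand's value when a key appears in both. It does not merge recursively, so a nested object in `p_new_content` replaces the stored nested object entirely, and callers that patch a nested field still need to send the complete nested value. The sketch below mirrors the object-with-object case in TypeScript; `concatJsonbObjects` and the sample data are hypothetical.

```typescript
type JsonObject = { [key: string]: unknown };

// Mirrors jsonb `object || object`: top-level keys are unioned and the right side wins.
// Nested objects are NOT merged; the right-hand value replaces the left-hand one.
function concatJsonbObjects(left: JsonObject, right: JsonObject): JsonObject {
  return { ...left, ...right };
}

const stored = { status: "draft", address: { city: "Oslo", zip: "0150" } };
const patch = { address: { city: "Bergen" } };
const merged = concatJsonbObjects(stored, patch);
// merged.status is still "draft", but merged.address is { city: "Bergen" }: zip is gone.
```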
**Why `SECURITY INVOKER` (default):** Two of six callers use `createClient()` (authenticated) where RLS enforces tenant access. Four use `createAdminClient()` (service_role) which bypasses RLS. By using the Postgres default (INVOKER), RLS behavior follows the calling client — preserving the existing security model exactly.
**Why not SECURITY DEFINER:** Would bypass RLS for all callers, removing tenant-scoping protection from `updateEntity()` and `bulkUpdateField()`.
#### Caller migration pattern
```typescript
// Before: read → JS merge → write (TOCTOU vulnerable)
const { data: entity } = await client.from("entities").select("content").eq("id", id).single();
await client.from("entities").update({ content: { ...entity.content, [field]: value } }).eq("id", id);

// After: single atomic call
const { data: merged, error } = await client.rpc("merge_entity_content", {
  p_entity_id: id,
  p_tenant_id: tenantId,
  p_new_content: { [field]: value },
});
if (error) throw new Error(`merge_entity_content failed: ${error.message}`);
```
Each caller keeps its existing client (`createClient` or `createAdminClient`). Activity logging, Inngest event emission (`emitEntityUpdatedEvent`), and embedding regeneration remain caller responsibilities — unchanged from today.
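A thin typed wrapper can keep the error handling in one place. In this sketch, `mergeEntityContent` and the `RpcClient` interface are hypothetical. `RpcClient` is a deliberately minimal structural type covering only the `rpc()` call, so a real Supabase client with generated database types may need an adapter or a cast. The wrapper treats a `null` result as "no row matched" (wrong id, wrong tenant, or filtered out by RLS), because an `UPDATE ... RETURNING` that affects no rows makes the SQL function return NULL.

```typescript
type Content = Record<string, unknown>;

// Hypothetical minimal shape of the client's rpc() result and method.
type RpcResult<T> = { data: T | null; error: { message: string } | null };
interface RpcClient {
  rpc(fn: string, args: Record<string, unknown>): PromiseLike<RpcResult<unknown>>;
}

// Hypothetical helper: calls merge_entity_content and turns both failure modes into errors.
async function mergeEntityContent(
  client: RpcClient,
  entityId: string,
  tenantId: string,
  patch: Content,
): Promise<Content> {
  const { data, error } = await client.rpc("merge_entity_content", {
    p_entity_id: entityId,
    p_tenant_id: tenantId,
    p_new_content: patch,
  });
  if (error) throw new Error(`merge_entity_content failed: ${error.message}`);
  // NULL means the UPDATE matched no row: wrong id, wrong tenant, or hidden by RLS.
  if (data === null) throw new Error(`Entity ${entityId} not found for tenant ${tenantId}`);
  return data as Content;
}
```

Because the client is passed in, each call site keeps using whichever client it has today (`createClient()` or `createAdminClient()`).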
#### What existing triggers handle automatically
- **Audit log:** AFTER UPDATE trigger captures full old/new JSONB
- **`updated_at`:** BEFORE UPDATE trigger auto-bumps timestamp
- **Realtime:** Supabase Realtime (Postgres Changes) subscriptions fire on any UPDATE
- **Embeddings (caller-driven, not a trigger):** `emitEntityUpdatedEvent()` → Inngest `entity/updated` → `entityEmbedding` function regenerates the `entities.embedding` vector. This chain already runs at the caller level, so it keeps working as long as callers keep emitting the event.
#### What callers must continue to handle
- `logActivity()` after the RPC (where it exists today)
- `emitEntityUpdatedEvent()` after the RPC (triggers embedding regen + downstream workflows)
- Locked field filtering before calling the RPC (extraction paths already do this)
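Put together, a caller's write path keeps today's order with the read step removed: filter locked fields, merge atomically, then log and emit. In the sketch below, `EntityWriteDeps`, `writeEntityFields`, and `withoutLockedFields` are hypothetical stand-ins for the real `merge_entity_content` call, `logActivity()`, and `emitEntityUpdatedEvent()`, whose actual signatures may differ. It also assumes locked fields are known by name before the write, and returning `null` when every field is locked is a choice made only for the sketch.

```typescript
type Content = Record<string, unknown>;

// Hypothetical dependency bundle; the real functions live elsewhere and may have other signatures.
interface EntityWriteDeps {
  mergeContent(entityId: string, patch: Content): Promise<Content>;
  logActivity(entityId: string, changedFields: string[]): Promise<void>;
  emitEntityUpdated(entityId: string): Promise<void>;
}

// Drop locked fields before the write so the RPC never touches them.
function withoutLockedFields(patch: Content, lockedFields: ReadonlySet<string>): Content {
  return Object.fromEntries(Object.entries(patch).filter(([key]) => !lockedFields.has(key)));
}

async function writeEntityFields(
  deps: EntityWriteDeps,
  entityId: string,
  patch: Content,
  lockedFields: ReadonlySet<string>,
): Promise<Content | null> {
  const allowed = withoutLockedFields(patch, lockedFields);
  const changedFields = Object.keys(allowed);
  if (changedFields.length === 0) return null; // nothing writable: skip the RPC and side effects

  const merged = await deps.mergeContent(entityId, allowed); // single atomic merge, no prior read
  await deps.logActivity(entityId, changedFields); // caller responsibility, as today
  await deps.emitEntityUpdated(entityId); // drives embedding regeneration downstream
  return merged;
}
```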
#### Special cases
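- **`bulk-actions.ts`:** Biggest win: the read step disappears entirely. The current code reads every selected entity and then writes each one back; with the RPC, the update becomes N parallel `merge_entity_content` calls and no reads.
- **`api-actions-keyed.ts`:** Both content and metadata need merging, so two RPC calls replace the read-merge-write. Each call is atomic on its own, but the pair does not run in one transaction: if the second call fails, the content merge has already been committed.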
- **`actions.ts`:** Merges both content and metadata. Conditionally calls each RPC only when the respective field is in the input.
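For `bulk-actions.ts`, the N merges can be issued together while still reporting failures per entity. In the sketch below, `MergeFn` and `bulkUpdateFieldAtomic` are hypothetical; `MergeFn` stands in for a wrapper around `merge_entity_content`. Each call catches its own error, so one failing entity does not reject the whole batch. All calls start at once here; for large selections a concurrency cap may be needed, which this sketch omits.

```typescript
type Content = Record<string, unknown>;

// Hypothetical: one atomic merge for one entity, e.g. a thin wrapper around merge_entity_content.
type MergeFn = (entityId: string, patch: Content) => Promise<unknown>;

interface BulkUpdateResult {
  updated: string[];
  failed: { id: string; reason: string }[];
}

async function bulkUpdateFieldAtomic(
  merge: MergeFn,
  entityIds: readonly string[],
  field: string,
  value: unknown,
): Promise<BulkUpdateResult> {
  const patch: Content = { [field]: value };

  // No read step. Each call catches its own failure, so one bad entity cannot reject the batch.
  const outcomes = await Promise.all(
    entityIds.map(async (id) => {
      try {
        await merge(id, patch);
        return { id, ok: true as const };
      } catch (err) {
        return { id, ok: false as const, reason: err instanceof Error ? err.message : String(err) };
      }
    }),
  );

  // Promise.all preserves input order, so results line up with entityIds.
  const result: BulkUpdateResult = { updated: [], failed: [] };
  for (const outcome of outcomes) {
    if (outcome.ok) result.updated.push(outcome.id);
    else result.failed.push({ id: outcome.id, reason: outcome.reason });
  }
  return result;
}
```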
---
### Workstream B: React Query Migration
**Approach:** Batch migration by feature module. For each module, create `hooks/use-*.ts` files with `useQuery`/`useMutation` hooks, then update components to consume them.
#### Standard hook pattern (follows existing codebase conventions)
```typescript
"use client";
import { useQuery } from "@tanstack/react-query";
export function myDataQueryKey(id: string) {
return ["my-data", id] as const;
}
export function useMyData(id: string) {
return useQuery({
queryKey: myDataQueryKey(id),
queryFn: async () => {
const res = await fetch(`/api/my-data/${id}`);
if (!res.ok) throw new Error("Failed to fetch");
return res.json() as Promise<MyDataType>;
},
staleTime: 60_000,
});
}
```
#### Stale time conventions (from existing codebase)
| Data volatility | staleTime | Examples |
|---|---|---|
| High (real-time) | 10-15s | Chat messages, workflow activity |
| Medium (interactive) | 30-60s | Entity data, relations, history |
| Low (config/admin) | 5min | Entity types, criteria sets, stats |
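One way to keep these values consistent across new hooks is a shared constant. The names below are hypothetical, and each tier uses the upper bound of its range from the table; adjust them if the existing hooks settle on different numbers.

```typescript
// Hypothetical shared constants encoding the staleTime conventions above (milliseconds).
const STALE_TIME = {
  high: 15_000, // chat messages, workflow activity (table: 10-15s)
  medium: 60_000, // entity data, relations, history (table: 30-60s)
  low: 5 * 60_000, // entity types, criteria sets, stats (table: 5min)
} as const;

type Volatility = keyof typeof STALE_TIME;

function staleTimeFor(volatility: Volatility): number {
  return STALE_TIME[volatility];
}

// Usage inside a hook: useQuery({ queryKey, queryFn, staleTime: STALE_TIME.medium })
```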
#### Migration batches (30 components, 6 batches)
**Batch 1 — Admin CRUD (simple GET/POST/PATCH/DELETE patterns):**
- `webhooks/webhook-admin.tsx`
- `api-keys/api-key-admin.tsx`
- `templates/templates-admin.tsx`
- `roles/roles-admin.tsx`
- `ai-models/models-admin.tsx`
**Batch 2 — Admin stats & monitoring:**
- `entities/components/admin/field-extraction-stats.tsx`
- `cost/cost-summary.tsx`
- `agents/jobs-dashboard.tsx`
- `audit/audit-log-viewer.tsx`
**Batch 3 — Documents:**
- `documents/document-library.tsx`
- `documents/document-detail.tsx`
- `documents/document-chunks-viewer.tsx`
- `documents/document-picker-dialog.tsx`
- `documents/document-link-dialog.tsx`
- `documents/document-linked-list.tsx`
**Batch 4 — Agents & connections:**
- `agents/version-history.tsx`
- `agents/connection-dialog.tsx`
- `agents/connections-tab.tsx`
- `source-sync/source-editor-dialog.tsx`
- `source-sync/source-monitor-target-picker.tsx`
**Batch 5 — Memory, skills, blocks:**
- `memory/memory-list.tsx`
- `skills/skill-list.tsx`
- `skills/skill-editor.tsx`
- `blocks/tool-block.tsx`
- `blocks/data-table-block.tsx`
**Batch 6 — Chat (evaluate individually):**
- `chat/chat-page-client.tsx`
- `chat/components/message-input.tsx`
- `chat/components/message-thread.tsx`
- `chat/components/model-selector.tsx`
- `chat/dock/components/dock-window.tsx`
- `chat/chat-panel.tsx`
Chat components may have streaming, WebSocket, or AI SDK transport patterns that don't fit `useQuery`. Each should be evaluated — migrate where it makes sense, skip where the fetch is tightly coupled to streaming/transport logic.
#### Mutation pattern (for CRUD components)
```typescript
import { useMutation, useQueryClient } from "@tanstack/react-query";
export function useDeleteWebhook() {
const queryClient = useQueryClient();
return useMutation({
mutationFn: async (id: string) => {
const res = await fetch(`/api/webhooks/${id}`, { method: "DELETE" });
if (!res.ok) throw new Error("Failed to delete");
},
onSuccess: () => {
queryClient.invalidateQueries({ queryKey: ["webhooks"] });
},
});
}
```
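`invalidateQueries({ queryKey: ["webhooks"] })` matches every cached query whose key starts with `["webhooks"]`, because TanStack Query matches query keys as prefixes unless `exact: true` is passed. A small key factory keeps list and detail keys under one root so a single invalidation covers both. `webhookKeys` is a hypothetical name, and `startsWithKey` is a simplified illustration of prefix matching for keys made of primitives, not TanStack's actual matcher (which also matches object entries partially).

```typescript
// Hypothetical key factory for the webhooks feature: every key shares the same root.
const WEBHOOKS_ROOT = "webhooks" as const;

const webhookKeys = {
  all: [WEBHOOKS_ROOT] as const,
  list: () => [WEBHOOKS_ROOT, "list"] as const,
  detail: (id: string) => [WEBHOOKS_ROOT, "detail", id] as const,
};

// Simplified prefix check for keys made of primitives (illustration only).
function startsWithKey(queryKey: readonly unknown[], filterKey: readonly unknown[]): boolean {
  return (
    filterKey.length <= queryKey.length &&
    filterKey.every((part, i) => Object.is(part, queryKey[i]))
  );
}

// In useDeleteWebhook: queryClient.invalidateQueries({ queryKey: webhookKeys.all })
// invalidates both webhookKeys.list() and every webhookKeys.detail(id).
```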
---
### Workstream C: Parallel Workflow Resumption
**Approach:** Replace the serial `for...of` loop with batches of at most 3 runs, each batch awaited with `Promise.allSettled`.
#### Current code (serial)
```typescript
for (const runId of runIds) {
const result = await advanceWorkflowRun(runId, options);
results.push(result);
}
```
#### New code (batched parallel)
```typescript
const CONCURRENCY_LIMIT = 3;
for (let i = 0; i < runIds.length; i += CONCURRENCY_LIMIT) {
const batch = runIds.slice(i, i + CONCURRENCY_LIMIT);
const settled = await Promise.allSettled(
batch.map((runId) => advanceWorkflowRun(runId, options)),
);
for (const result of settled) {
if (result.status === "fulfilled") {
claimedNodeCount += result.value.claimedNodeCount ?? 0;
results.push(result.value);
} else {
results.push({
status: "failed",
fieldsExtracted: [],
errors: { _run: String(result.reason) },
});
}
}
}
```
**Why batched, not sliding-window:** Simple, no new dependencies, covers the common case (5-15 runs per heartbeat). Can upgrade to sliding-window later if monitoring shows batch-boundary waste.
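If monitoring does show batch-boundary waste, a sliding window also needs no new dependency: a fixed number of workers pull items from a shared cursor, so a new run starts as soon as any run finishes. `runWithConcurrency` is a hypothetical helper. It keeps the `Promise.allSettled` contract (results in input order, rejections captured rather than thrown), so the existing result-handling loop would not need to change.

```typescript
// Hypothetical sliding-window runner: at most `limit` tasks in flight at any time.
async function runWithConcurrency<T, R>(
  items: readonly T[],
  limit: number,
  task: (item: T) => Promise<R>,
): Promise<PromiseSettledResult<Awaited<R>>[]> {
  const results: PromiseSettledResult<Awaited<R>>[] = new Array(items.length);
  let next = 0; // shared cursor; safe because only one callback runs at a time

  async function worker(): Promise<void> {
    while (next < items.length) {
      const index = next++;
      try {
        results[index] = { status: "fulfilled", value: await task(items[index]) };
      } catch (reason) {
        results[index] = { status: "rejected", reason };
      }
    }
  }

  const workerCount = Math.max(1, Math.min(limit, items.length));
  await Promise.all(Array.from({ length: workerCount }, () => worker()));
  return results;
}

// Drop-in use: const settled = await runWithConcurrency(runIds, CONCURRENCY_LIMIT,
//   (runId) => advanceWorkflowRun(runId, options));
```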
**Why `Promise.allSettled`:** Maintains current error isolation — one failed run doesn't block others.
**Why concurrency limit of 3:**
- Prevents DB connection pool exhaustion
- Limits concurrent AI provider calls (rate limit protection)
- Up to ~3x throughput in the common case; because each batch waits for its slowest run, uneven run times lower the gain
- Tunable constant — can increase after monitoring
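The throughput estimate can be checked with a simple model: serial time is the sum of run durations, a batch lasts as long as its slowest run, and a sliding window starts each run on whichever slot frees up first. The functions below are a hypothetical estimator, not part of the codebase. They ignore DB and AI-provider contention, so real speedups should be expected to come in at or below what they predict.

```typescript
// Serial: each run waits for the previous one.
function serialTime(durations: readonly number[]): number {
  return durations.reduce((sum, d) => sum + d, 0);
}

// Batched Promise.allSettled: each batch lasts as long as its slowest run.
function batchedTime(durations: readonly number[], limit: number): number {
  if (limit < 1) throw new RangeError("limit must be >= 1");
  let total = 0;
  for (let i = 0; i < durations.length; i += limit) {
    total += Math.max(...durations.slice(i, i + limit));
  }
  return total;
}

// Sliding window: the next run starts on the slot that frees up earliest.
function slidingWindowTime(durations: readonly number[], limit: number): number {
  if (limit < 1) throw new RangeError("limit must be >= 1");
  const slotFreeAt = new Array<number>(Math.min(limit, durations.length)).fill(0);
  for (const d of durations) {
    const earliest = slotFreeAt.indexOf(Math.min(...slotFreeAt));
    slotFreeAt[earliest] += d;
  }
  return Math.max(0, ...slotFreeAt);
}

const uniform = [5, 5, 5, 5, 5, 5]; // six runs, 5s each
// serialTime(uniform) → 30, batchedTime(uniform, 3) → 10: the full 3x
const skewed = [10, 2, 2, 2, 2, 2]; // one slow run
// serialTime(skewed) → 20, batchedTime(skewed, 3) → 12, slidingWindowTime(skewed, 3) → 10
```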
---
## Acceptance Criteria
### Workstream A
- [ ] Migration creates `merge_entity_content` and `merge_entity_metadata` RPC functions
- [ ] All 6 TOCTOU locations migrated to use RPC
- [ ] `pnpm db:types` regenerated after migration
- [ ] Existing tests pass (callers produce same observable behavior)
- [ ] New test: concurrent content writes to same entity preserve both fields
### Workstream B
- [ ] Each batch creates hook files in the appropriate `features/*/hooks/` directory
- [ ] Components use `useQuery` for reads, `useMutation` for writes
- [ ] Query keys exported for cache invalidation
- [ ] StaleTime set per data volatility conventions
- [ ] Chat components individually evaluated — skip where streaming/transport doesn't fit
- [ ] No raw `useEffect` + `fetch` patterns remain (except justified skips)
### Workstream C
- [ ] `resumeWorkflowRunsForAgent` uses batched `Promise.allSettled` with limit 3
- [ ] Failed runs produce error results (not thrown exceptions)
- [ ] Existing workflow tests pass
- [ ] New test: multiple runs process concurrently (timing assertion)
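Wall-clock assertions can be flaky on shared CI machines. A deterministic complement is to count how many runs are in flight at once. In the sketch below, `resumeInBatches` is a hypothetical generic copy of the batched loop, the fake `advance` stands in for `advanceWorkflowRun`, and the plain `throw` checks would become the test framework's assertions.

```typescript
// Hypothetical generic copy of the batched loop from Workstream C, parameterised for testing.
async function resumeInBatches<R>(
  runIds: readonly string[],
  advance: (runId: string) => Promise<R>,
  limit: number,
): Promise<PromiseSettledResult<Awaited<R>>[]> {
  if (limit < 1) throw new RangeError("limit must be >= 1");
  const all: PromiseSettledResult<Awaited<R>>[] = [];
  for (let i = 0; i < runIds.length; i += limit) {
    const batch = runIds.slice(i, i + limit);
    all.push(...(await Promise.allSettled(batch.map((runId) => advance(runId)))));
  }
  return all;
}

// Records the peak number of concurrent `advance` calls for `runCount` fake runs.
async function peakConcurrency(runCount: number, limit: number): Promise<number> {
  let inFlight = 0;
  let peak = 0;
  const advance = async (runId: string): Promise<string> => {
    inFlight++;
    peak = Math.max(peak, inFlight);
    await Promise.resolve(); // yield so the other runs in the batch can start
    inFlight--;
    return runId;
  };
  const ids = Array.from({ length: runCount }, (_, i) => `run-${i}`);
  await resumeInBatches(ids, advance, limit);
  return peak; // a serial loop would always report 1
}
```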
---
## Files Touched
### Workstream A (migration + 6 callers)
- `supabase/migrations/YYYYMMDD_NNN_atomic_entity_content_merge.sql`
- `features/workflows/run-workflow.ts`
- `features/entities/extraction/server/submissions.ts`
- `features/entities/server/bulk-actions.ts`
- `features/entities/server/api-actions-keyed.ts`
- `features/entities/server/actions.ts`
- `lib/supabase/database.types.ts` (regenerated)
### Workstream B (~30 components + ~15 new hook files)
- `features/webhooks/hooks/use-webhooks.ts` (new)
- `features/api-keys/hooks/use-api-keys.ts` (new)
- `features/templates/hooks/use-templates.ts` (new)
- `features/roles/hooks/use-roles.ts` (new)
- `features/ai-models/hooks/use-models.ts` (new)
- `features/entities/hooks/use-field-stats.ts` (new)
- `features/cost/hooks/use-cost-summary.ts` (new)
- `features/agents/hooks/use-agent-jobs.ts` (new)
- `features/audit/hooks/use-audit-log.ts` (new)
- `features/documents/hooks/use-documents.ts` (new)
- `features/agents/hooks/use-agent-connections.ts` (new)
- `features/memory/hooks/use-memories.ts` (new)
- `features/skills/hooks/use-skills.ts` (new)
- `features/blocks/hooks/use-tool-block.ts` (new)
- Plus all 30 component files updated
### Workstream C (1 file)
- `features/workflows/run-workflow.ts`
---
## Execution Order
1. **Workstream A** — highest severity (data integrity), smallest scope
2. **Workstream C** — single file, immediate throughput win
3. **Workstream B** — largest scope, lowest risk, incremental batches
Workstreams A and C can be done in a single PR. Workstream B should be separate PRs per batch (6 PRs).
---
## Trade-offs
- **RPC functions add a DB migration** but eliminate an entire class of data corruption bugs
- **React Query migration touches 30 files** but brings caching, deduplication, and stale-time management to all of them — plus removes ~600 lines of manual fetch boilerplate
- **Concurrency limit of 3 is conservative** — can be tuned up after monitoring shows it's safe
- **Chat components may not all migrate**: streaming and AI SDK transport patterns may not fit `useQuery`. That is acceptable; the goal is to migrate where it makes sense, not to force 100% coverage.