From ae296c77235df3766ddc60314fca906d5d807c45 Mon Sep 17 00:00:00 2001 From: Ramnique Singh <30795890+ramnique@users.noreply.github.com> Date: Tue, 21 Apr 2026 11:11:33 +0530 Subject: [PATCH] serialize knowledge file writes behind a per-path mutex MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Concurrent track runs on the same note were corrupting the file. In a fresh workspace, four tracks fired on cron at 05:09:17Z (all failed on AI_LoadAPIKeyError, but each still wrote lastRunAt/lastRunId before the agent ran) and three more fired at 05:09:32Z. The resulting Today.md ended with stray fragments "\n>\nes-->\n-->" — tail pieces of tags that a mis-aimed splice had truncated — and the priorities YAML lost its lastRunId entirely. Two compounding issues in knowledge/track/fileops.ts: 1. updateTrackBlock read the file twice: once via fetch() to resolve fenceStart/fenceEnd, and again via fs.readFile to get the bytes to splice. If another writer landed between the reads, the line indices from read #1 pointed into unrelated content in read #2, so the splice replaced the wrong range and left tag fragments behind. 2. None of the mutators (updateContent, updateTrackBlock, replaceTrackBlockYaml, deleteTrackBlock) held any lock, so concurrent read-modify-writes clobbered each other's updates. The missing lastRunId was exactly that: set by one run, overwritten by another run's stale snapshot. The fix: introduce withFileLock(absPath, fn) in knowledge/file-lock.ts, a per-path Promise-chain mutex modeled on the commitLock pattern in knowledge/version_history.ts. Callers append onto that file's chain and await — wait-queue semantics, FIFO, no timeout. The map self-cleans when a file's chain goes idle so it stays bounded across a long-running process. Wrap all four fileops mutators in it, and also wrap workspace.writeFile (which can touch the same files from the agent's tool surface and previously raced with fileops). 
Both callers key on the resolved absolute path so they share the same lock for the same file. Reads (fetchAll, fetch, fetchYaml) stay lock-free — fs.writeFile on files this size is atomic enough that readers see either pre- or post-state, never corruption, and stale reads are not a correctness issue for the callers that use them (scheduler, event dispatcher). The debounced version-history commit in workspace.writeFile stays outside the lock; it's deferred work that shouldn't hold up the write. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../packages/core/src/knowledge/file-lock.ts | 18 ++ .../core/src/knowledge/track/fileops.ts | 173 +++++++++--------- .../packages/core/src/workspace/workspace.ts | 65 ++++--- 3 files changed, 144 insertions(+), 112 deletions(-) create mode 100644 apps/x/packages/core/src/knowledge/file-lock.ts diff --git a/apps/x/packages/core/src/knowledge/file-lock.ts b/apps/x/packages/core/src/knowledge/file-lock.ts new file mode 100644 index 00000000..157188cb --- /dev/null +++ b/apps/x/packages/core/src/knowledge/file-lock.ts @@ -0,0 +1,18 @@ +const locks = new Map<string, Promise<void>>(); + +export async function withFileLock<T>(absPath: string, fn: () => Promise<T>): Promise<T> { + const prev = locks.get(absPath) ?? 
Promise.resolve(); + let release!: () => void; + const gate = new Promise((r) => { release = r; }); + const myTail = prev.then(() => gate); + locks.set(absPath, myTail); + try { + await prev; + return await fn(); + } finally { + release(); + if (locks.get(absPath) === myTail) { + locks.delete(absPath); + } + } +} diff --git a/apps/x/packages/core/src/knowledge/track/fileops.ts b/apps/x/packages/core/src/knowledge/track/fileops.ts index bc741936..bd731823 100644 --- a/apps/x/packages/core/src/knowledge/track/fileops.ts +++ b/apps/x/packages/core/src/knowledge/track/fileops.ts @@ -5,6 +5,7 @@ import { parse as parseYaml, stringify as stringifyYaml } from 'yaml'; import { WorkDir } from '../../config/config.js'; import { TrackBlockSchema } from '@x/shared/dist/track-block.js'; import { TrackStateSchema } from './types.js'; +import { withFileLock } from '../file-lock.js'; const KNOWLEDGE_DIR = path.join(WorkDir, 'knowledge'); @@ -81,42 +82,46 @@ export async function fetchYaml(filePath: string, trackId: string): Promise { - let content = await fs.readFile(absPath(filePath), 'utf-8'); - const openTag = ``; - const closeTag = ``; - const openIdx = content.indexOf(openTag); - const closeIdx = content.indexOf(closeTag); - if (openIdx !== -1 && closeIdx !== -1 && closeIdx > openIdx) { - content = content.slice(0, openIdx + openTag.length) + '\n' + newContent + '\n' + content.slice(closeIdx); - } else { - const block = await fetch(filePath, trackId); - if (!block) { - throw new Error(`Track ${trackId} not found in ${filePath}`); + return withFileLock(absPath(filePath), async () => { + let content = await fs.readFile(absPath(filePath), 'utf-8'); + const openTag = ``; + const closeTag = ``; + const openIdx = content.indexOf(openTag); + const closeIdx = content.indexOf(closeTag); + if (openIdx !== -1 && closeIdx !== -1 && closeIdx > openIdx) { + content = content.slice(0, openIdx + openTag.length) + '\n' + newContent + '\n' + content.slice(closeIdx); + } else { + const block = 
await fetch(filePath, trackId); + if (!block) { + throw new Error(`Track ${trackId} not found in ${filePath}`); + } + const lines = content.split('\n'); + const insertAt = Math.min(block.fenceEnd + 1, lines.length); + const contentFence = [openTag, newContent, closeTag]; + lines.splice(insertAt, 0, ...contentFence); + content = lines.join('\n'); } - const lines = content.split('\n'); - const insertAt = Math.min(block.fenceEnd + 1, lines.length); - const contentFence = [openTag, newContent, closeTag]; - lines.splice(insertAt, 0, ...contentFence); - content = lines.join('\n'); - } - await fs.writeFile(absPath(filePath), content, 'utf-8'); + await fs.writeFile(absPath(filePath), content, 'utf-8'); + }); } export async function updateTrackBlock(filepath: string, trackId: string, updates: Partial>): Promise { - const block = await fetch(filepath, trackId); - if (!block) { - throw new Error(`Track ${trackId} not found in ${filepath}`); - } - block.track = { ...block.track, ...updates }; + return withFileLock(absPath(filepath), async () => { + const block = await fetch(filepath, trackId); + if (!block) { + throw new Error(`Track ${trackId} not found in ${filepath}`); + } + block.track = { ...block.track, ...updates }; - // read file contents - let content = await fs.readFile(absPath(filepath), 'utf-8'); - const lines = content.split('\n'); - const yaml = stringifyYaml(block.track).trimEnd(); - const yamlLines = yaml ? yaml.split('\n') : []; - lines.splice(block.fenceStart, block.fenceEnd - block.fenceStart + 1, '```track', ...yamlLines, '```'); - content = lines.join('\n'); - await fs.writeFile(absPath(filepath), content, 'utf-8'); + // read file contents + let content = await fs.readFile(absPath(filepath), 'utf-8'); + const lines = content.split('\n'); + const yaml = stringifyYaml(block.track).trimEnd(); + const yamlLines = yaml ? 
yaml.split('\n') : []; + lines.splice(block.fenceStart, block.fenceEnd - block.fenceStart + 1, '```track', ...yamlLines, '```'); + content = lines.join('\n'); + await fs.writeFile(absPath(filepath), content, 'utf-8'); + }); } /** @@ -127,64 +132,68 @@ export async function updateTrackBlock(filepath: string, trackId: string, update * otherwise the write is rejected. */ export async function replaceTrackBlockYaml(filePath: string, trackId: string, newYaml: string): Promise { - const block = await fetch(filePath, trackId); - if (!block) { - throw new Error(`Track ${trackId} not found in ${filePath}`); - } - const parsed = TrackBlockSchema.safeParse(parseYaml(newYaml)); - if (!parsed.success) { - throw new Error(`Invalid track YAML: ${parsed.error.message}`); - } - if (parsed.data.trackId !== trackId) { - throw new Error(`trackId cannot be changed (was "${trackId}", got "${parsed.data.trackId}")`); - } + return withFileLock(absPath(filePath), async () => { + const block = await fetch(filePath, trackId); + if (!block) { + throw new Error(`Track ${trackId} not found in ${filePath}`); + } + const parsed = TrackBlockSchema.safeParse(parseYaml(newYaml)); + if (!parsed.success) { + throw new Error(`Invalid track YAML: ${parsed.error.message}`); + } + if (parsed.data.trackId !== trackId) { + throw new Error(`trackId cannot be changed (was "${trackId}", got "${parsed.data.trackId}")`); + } - const content = await fs.readFile(absPath(filePath), 'utf-8'); - const lines = content.split('\n'); - const yamlLines = newYaml.trimEnd().split('\n'); - lines.splice(block.fenceStart, block.fenceEnd - block.fenceStart + 1, '```track', ...yamlLines, '```'); - await fs.writeFile(absPath(filePath), lines.join('\n'), 'utf-8'); + const content = await fs.readFile(absPath(filePath), 'utf-8'); + const lines = content.split('\n'); + const yamlLines = newYaml.trimEnd().split('\n'); + lines.splice(block.fenceStart, block.fenceEnd - block.fenceStart + 1, '```track', ...yamlLines, '```'); + await 
fs.writeFile(absPath(filePath), lines.join('\n'), 'utf-8'); + }); } /** * Remove a track block and its sibling target region from the file. */ export async function deleteTrackBlock(filePath: string, trackId: string): Promise { - const block = await fetch(filePath, trackId); - if (!block) { - // Already gone — treat as success. - return; - } - - const content = await fs.readFile(absPath(filePath), 'utf-8'); - const lines = content.split('\n'); - const openTag = ``; - const closeTag = ``; - - // Find target region (may not exist) - let targetStart = -1; - let targetEnd = -1; - for (let i = 0; i < lines.length; i++) { - if (lines[i].includes(openTag)) { targetStart = i; } - if (targetStart !== -1 && lines[i].includes(closeTag)) { targetEnd = i; break; } - } - - // Build a list of [start, end] ranges to remove, sorted descending so - // indices stay valid as we splice. - const ranges: Array<[number, number]> = []; - ranges.push([block.fenceStart, block.fenceEnd]); - if (targetStart !== -1 && targetEnd !== -1 && targetEnd >= targetStart) { - ranges.push([targetStart, targetEnd]); - } - ranges.sort((a, b) => b[0] - a[0]); - - for (const [start, end] of ranges) { - lines.splice(start, end - start + 1); - // Also drop a trailing blank line if the removal left two in a row. - if (start < lines.length && lines[start].trim() === '' && start > 0 && lines[start - 1].trim() === '') { - lines.splice(start, 1); + return withFileLock(absPath(filePath), async () => { + const block = await fetch(filePath, trackId); + if (!block) { + // Already gone — treat as success. 
+ return; } - } - await fs.writeFile(absPath(filePath), lines.join('\n'), 'utf-8'); + const content = await fs.readFile(absPath(filePath), 'utf-8'); + const lines = content.split('\n'); + const openTag = ``; + const closeTag = ``; + + // Find target region (may not exist) + let targetStart = -1; + let targetEnd = -1; + for (let i = 0; i < lines.length; i++) { + if (lines[i].includes(openTag)) { targetStart = i; } + if (targetStart !== -1 && lines[i].includes(closeTag)) { targetEnd = i; break; } + } + + // Build a list of [start, end] ranges to remove, sorted descending so + // indices stay valid as we splice. + const ranges: Array<[number, number]> = []; + ranges.push([block.fenceStart, block.fenceEnd]); + if (targetStart !== -1 && targetEnd !== -1 && targetEnd >= targetStart) { + ranges.push([targetStart, targetEnd]); + } + ranges.sort((a, b) => b[0] - a[0]); + + for (const [start, end] of ranges) { + lines.splice(start, end - start + 1); + // Also drop a trailing blank line if the removal left two in a row. 
+ if (start < lines.length && lines[start].trim() === '' && start > 0 && lines[start - 1].trim() === '') { + lines.splice(start, 1); + } + } + + await fs.writeFile(absPath(filePath), lines.join('\n'), 'utf-8'); + }); } \ No newline at end of file diff --git a/apps/x/packages/core/src/workspace/workspace.ts b/apps/x/packages/core/src/workspace/workspace.ts index de1fe212..c991ce16 100644 --- a/apps/x/packages/core/src/workspace/workspace.ts +++ b/apps/x/packages/core/src/workspace/workspace.ts @@ -7,6 +7,7 @@ import { RemoveOptions, WriteFileOptions, WriteFileResult } from 'packages/share import { WorkDir } from '../config/config.js'; import { rewriteWikiLinksForRenamedKnowledgeFile } from './wiki-link-rewrite.js'; import { commitAll } from '../knowledge/version_history.js'; +import { withFileLock } from '../knowledge/file-lock.js'; // ============================================================================ // Path Utilities @@ -249,38 +250,42 @@ export async function writeFile( await fs.mkdir(path.dirname(filePath), { recursive: true }); } - // Check expectedEtag if provided (conflict detection) - if (opts?.expectedEtag) { - const existingStats = await fs.lstat(filePath); - const existingEtag = computeEtag(existingStats.size, existingStats.mtimeMs); - if (existingEtag !== opts.expectedEtag) { - throw new Error('File was modified (ETag mismatch)'); + const result = await withFileLock(filePath, async () => { + // Check expectedEtag if provided (conflict detection) + if (opts?.expectedEtag) { + const existingStats = await fs.lstat(filePath); + const existingEtag = computeEtag(existingStats.size, existingStats.mtimeMs); + if (existingEtag !== opts.expectedEtag) { + throw new Error('File was modified (ETag mismatch)'); + } } - } - // Convert data to buffer based on encoding - let buffer: Buffer; - if (encoding === 'utf8') { - buffer = Buffer.from(data, 'utf8'); - } else if (encoding === 'base64') { - buffer = Buffer.from(data, 'base64'); - } else { - // binary: 
assume data is base64-encoded - buffer = Buffer.from(data, 'base64'); - } + // Convert data to buffer based on encoding + let buffer: Buffer; + if (encoding === 'utf8') { + buffer = Buffer.from(data, 'utf8'); + } else if (encoding === 'base64') { + buffer = Buffer.from(data, 'base64'); + } else { + // binary: assume data is base64-encoded + buffer = Buffer.from(data, 'base64'); + } - if (atomic) { - // Atomic write: write to temp file, then rename - const tempPath = filePath + '.tmp.' + Date.now() + Math.random().toString(36).slice(2); - await fs.writeFile(tempPath, buffer); - await fs.rename(tempPath, filePath); - } else { - await fs.writeFile(filePath, buffer); - } + if (atomic) { + // Atomic write: write to temp file, then rename + const tempPath = filePath + '.tmp.' + Date.now() + Math.random().toString(36).slice(2); + await fs.writeFile(tempPath, buffer); + await fs.rename(tempPath, filePath); + } else { + await fs.writeFile(filePath, buffer); + } - const stats = await fs.lstat(filePath); - const stat = statToSchema(stats, 'file'); - const etag = computeEtag(stats.size, stats.mtimeMs); + const stats = await fs.lstat(filePath); + const stat = statToSchema(stats, 'file'); + const etag = computeEtag(stats.size, stats.mtimeMs); + + return { stat, etag }; + }); // Schedule a debounced version history commit for knowledge files if (relPath.startsWith('knowledge/') && relPath.endsWith('.md')) { @@ -289,8 +294,8 @@ export async function writeFile( return { path: relPath, - stat, - etag, + stat: result.stat, + etag: result.etag, }; }