mirror of
https://github.com/rowboatlabs/rowboat.git
synced 2026-04-25 00:16:29 +02:00
serialize knowledge file writes behind a per-path mutex
Concurrent track runs on the same note were corrupting the file. In a fresh workspace, four tracks fired on cron at 05:09:17Z (all failed on AI_LoadAPIKeyError, but each still wrote lastRunAt/lastRunId before the agent ran) and three more fired at 05:09:32Z. The resulting Today.md ended with stray fragments "\n>\nes-->\n-->" — tail pieces of <!--/track-target:priorities--> that a mis-aimed splice had truncated — and the priorities YAML lost its lastRunId entirely. Two compounding issues in knowledge/track/fileops.ts: 1. updateTrackBlock read the file twice: once via fetch() to resolve fenceStart/fenceEnd, and again via fs.readFile to get the bytes to splice. If another writer landed between the reads, the line indices from read #1 pointed into unrelated content in read #2, so the splice replaced the wrong range and left tag fragments behind. 2. None of the mutators (updateContent, updateTrackBlock, replaceTrackBlockYaml, deleteTrackBlock) held any lock, so concurrent read-modify-writes clobbered each other's updates. The missing lastRunId was exactly that: set by one run, overwritten by another run's stale snapshot. The fix: introduce withFileLock(absPath, fn) in knowledge/file-lock.ts, a per-path Promise-chain mutex modeled on the commitLock pattern in knowledge/version_history.ts. Callers append onto that file's chain and await — wait-queue semantics, FIFO, no timeout. The map self-cleans when a file's chain goes idle so it stays bounded across a long-running process. Wrap all four fileops mutators in it, and also wrap workspace.writeFile (which can touch the same files from the agent's tool surface and previously raced with fileops). Both callers key on the resolved absolute path so they share the same lock for the same file. Reads (fetchAll, fetch, fetchYaml) stay lock-free — fs.writeFile on files this size is atomic enough that readers see either pre- or post-state, never corruption, and stale reads are not a correctness issue for the callers that use them (scheduler, event dispatcher). The debounced version-history commit in workspace.writeFile stays outside the lock; it's deferred work that shouldn't hold up the write. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
fbbaeea1df
commit
ae296c7723
3 changed files with 144 additions and 112 deletions
18
apps/x/packages/core/src/knowledge/file-lock.ts
Normal file
18
apps/x/packages/core/src/knowledge/file-lock.ts
Normal file
|
|
@ -0,0 +1,18 @@
|
||||||
|
const locks = new Map<string, Promise<void>>();
|
||||||
|
|
||||||
|
export async function withFileLock<T>(absPath: string, fn: () => Promise<T>): Promise<T> {
|
||||||
|
const prev = locks.get(absPath) ?? Promise.resolve();
|
||||||
|
let release!: () => void;
|
||||||
|
const gate = new Promise<void>((r) => { release = r; });
|
||||||
|
const myTail = prev.then(() => gate);
|
||||||
|
locks.set(absPath, myTail);
|
||||||
|
try {
|
||||||
|
await prev;
|
||||||
|
return await fn();
|
||||||
|
} finally {
|
||||||
|
release();
|
||||||
|
if (locks.get(absPath) === myTail) {
|
||||||
|
locks.delete(absPath);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -5,6 +5,7 @@ import { parse as parseYaml, stringify as stringifyYaml } from 'yaml';
|
||||||
import { WorkDir } from '../../config/config.js';
|
import { WorkDir } from '../../config/config.js';
|
||||||
import { TrackBlockSchema } from '@x/shared/dist/track-block.js';
|
import { TrackBlockSchema } from '@x/shared/dist/track-block.js';
|
||||||
import { TrackStateSchema } from './types.js';
|
import { TrackStateSchema } from './types.js';
|
||||||
|
import { withFileLock } from '../file-lock.js';
|
||||||
|
|
||||||
const KNOWLEDGE_DIR = path.join(WorkDir, 'knowledge');
|
const KNOWLEDGE_DIR = path.join(WorkDir, 'knowledge');
|
||||||
|
|
||||||
|
|
@ -81,42 +82,46 @@ export async function fetchYaml(filePath: string, trackId: string): Promise<stri
|
||||||
}
|
}
|
||||||
|
|
||||||
export async function updateContent(filePath: string, trackId: string, newContent: string): Promise<void> {
|
export async function updateContent(filePath: string, trackId: string, newContent: string): Promise<void> {
|
||||||
let content = await fs.readFile(absPath(filePath), 'utf-8');
|
return withFileLock(absPath(filePath), async () => {
|
||||||
const openTag = `<!--track-target:${trackId}-->`;
|
let content = await fs.readFile(absPath(filePath), 'utf-8');
|
||||||
const closeTag = `<!--/track-target:${trackId}-->`;
|
const openTag = `<!--track-target:${trackId}-->`;
|
||||||
const openIdx = content.indexOf(openTag);
|
const closeTag = `<!--/track-target:${trackId}-->`;
|
||||||
const closeIdx = content.indexOf(closeTag);
|
const openIdx = content.indexOf(openTag);
|
||||||
if (openIdx !== -1 && closeIdx !== -1 && closeIdx > openIdx) {
|
const closeIdx = content.indexOf(closeTag);
|
||||||
content = content.slice(0, openIdx + openTag.length) + '\n' + newContent + '\n' + content.slice(closeIdx);
|
if (openIdx !== -1 && closeIdx !== -1 && closeIdx > openIdx) {
|
||||||
} else {
|
content = content.slice(0, openIdx + openTag.length) + '\n' + newContent + '\n' + content.slice(closeIdx);
|
||||||
const block = await fetch(filePath, trackId);
|
} else {
|
||||||
if (!block) {
|
const block = await fetch(filePath, trackId);
|
||||||
throw new Error(`Track ${trackId} not found in ${filePath}`);
|
if (!block) {
|
||||||
|
throw new Error(`Track ${trackId} not found in ${filePath}`);
|
||||||
|
}
|
||||||
|
const lines = content.split('\n');
|
||||||
|
const insertAt = Math.min(block.fenceEnd + 1, lines.length);
|
||||||
|
const contentFence = [openTag, newContent, closeTag];
|
||||||
|
lines.splice(insertAt, 0, ...contentFence);
|
||||||
|
content = lines.join('\n');
|
||||||
}
|
}
|
||||||
const lines = content.split('\n');
|
await fs.writeFile(absPath(filePath), content, 'utf-8');
|
||||||
const insertAt = Math.min(block.fenceEnd + 1, lines.length);
|
});
|
||||||
const contentFence = [openTag, newContent, closeTag];
|
|
||||||
lines.splice(insertAt, 0, ...contentFence);
|
|
||||||
content = lines.join('\n');
|
|
||||||
}
|
|
||||||
await fs.writeFile(absPath(filePath), content, 'utf-8');
|
|
||||||
}
|
}
|
||||||
|
|
||||||
export async function updateTrackBlock(filepath: string, trackId: string, updates: Partial<z.infer<typeof TrackBlockSchema>>): Promise<void> {
|
export async function updateTrackBlock(filepath: string, trackId: string, updates: Partial<z.infer<typeof TrackBlockSchema>>): Promise<void> {
|
||||||
const block = await fetch(filepath, trackId);
|
return withFileLock(absPath(filepath), async () => {
|
||||||
if (!block) {
|
const block = await fetch(filepath, trackId);
|
||||||
throw new Error(`Track ${trackId} not found in ${filepath}`);
|
if (!block) {
|
||||||
}
|
throw new Error(`Track ${trackId} not found in ${filepath}`);
|
||||||
block.track = { ...block.track, ...updates };
|
}
|
||||||
|
block.track = { ...block.track, ...updates };
|
||||||
|
|
||||||
// read file contents
|
// read file contents
|
||||||
let content = await fs.readFile(absPath(filepath), 'utf-8');
|
let content = await fs.readFile(absPath(filepath), 'utf-8');
|
||||||
const lines = content.split('\n');
|
const lines = content.split('\n');
|
||||||
const yaml = stringifyYaml(block.track).trimEnd();
|
const yaml = stringifyYaml(block.track).trimEnd();
|
||||||
const yamlLines = yaml ? yaml.split('\n') : [];
|
const yamlLines = yaml ? yaml.split('\n') : [];
|
||||||
lines.splice(block.fenceStart, block.fenceEnd - block.fenceStart + 1, '```track', ...yamlLines, '```');
|
lines.splice(block.fenceStart, block.fenceEnd - block.fenceStart + 1, '```track', ...yamlLines, '```');
|
||||||
content = lines.join('\n');
|
content = lines.join('\n');
|
||||||
await fs.writeFile(absPath(filepath), content, 'utf-8');
|
await fs.writeFile(absPath(filepath), content, 'utf-8');
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -127,64 +132,68 @@ export async function updateTrackBlock(filepath: string, trackId: string, update
|
||||||
* otherwise the write is rejected.
|
* otherwise the write is rejected.
|
||||||
*/
|
*/
|
||||||
export async function replaceTrackBlockYaml(filePath: string, trackId: string, newYaml: string): Promise<void> {
|
export async function replaceTrackBlockYaml(filePath: string, trackId: string, newYaml: string): Promise<void> {
|
||||||
const block = await fetch(filePath, trackId);
|
return withFileLock(absPath(filePath), async () => {
|
||||||
if (!block) {
|
const block = await fetch(filePath, trackId);
|
||||||
throw new Error(`Track ${trackId} not found in ${filePath}`);
|
if (!block) {
|
||||||
}
|
throw new Error(`Track ${trackId} not found in ${filePath}`);
|
||||||
const parsed = TrackBlockSchema.safeParse(parseYaml(newYaml));
|
}
|
||||||
if (!parsed.success) {
|
const parsed = TrackBlockSchema.safeParse(parseYaml(newYaml));
|
||||||
throw new Error(`Invalid track YAML: ${parsed.error.message}`);
|
if (!parsed.success) {
|
||||||
}
|
throw new Error(`Invalid track YAML: ${parsed.error.message}`);
|
||||||
if (parsed.data.trackId !== trackId) {
|
}
|
||||||
throw new Error(`trackId cannot be changed (was "${trackId}", got "${parsed.data.trackId}")`);
|
if (parsed.data.trackId !== trackId) {
|
||||||
}
|
throw new Error(`trackId cannot be changed (was "${trackId}", got "${parsed.data.trackId}")`);
|
||||||
|
}
|
||||||
|
|
||||||
const content = await fs.readFile(absPath(filePath), 'utf-8');
|
const content = await fs.readFile(absPath(filePath), 'utf-8');
|
||||||
const lines = content.split('\n');
|
const lines = content.split('\n');
|
||||||
const yamlLines = newYaml.trimEnd().split('\n');
|
const yamlLines = newYaml.trimEnd().split('\n');
|
||||||
lines.splice(block.fenceStart, block.fenceEnd - block.fenceStart + 1, '```track', ...yamlLines, '```');
|
lines.splice(block.fenceStart, block.fenceEnd - block.fenceStart + 1, '```track', ...yamlLines, '```');
|
||||||
await fs.writeFile(absPath(filePath), lines.join('\n'), 'utf-8');
|
await fs.writeFile(absPath(filePath), lines.join('\n'), 'utf-8');
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Remove a track block and its sibling target region from the file.
|
* Remove a track block and its sibling target region from the file.
|
||||||
*/
|
*/
|
||||||
export async function deleteTrackBlock(filePath: string, trackId: string): Promise<void> {
|
export async function deleteTrackBlock(filePath: string, trackId: string): Promise<void> {
|
||||||
const block = await fetch(filePath, trackId);
|
return withFileLock(absPath(filePath), async () => {
|
||||||
if (!block) {
|
const block = await fetch(filePath, trackId);
|
||||||
// Already gone — treat as success.
|
if (!block) {
|
||||||
return;
|
// Already gone — treat as success.
|
||||||
}
|
return;
|
||||||
|
|
||||||
const content = await fs.readFile(absPath(filePath), 'utf-8');
|
|
||||||
const lines = content.split('\n');
|
|
||||||
const openTag = `<!--track-target:${trackId}-->`;
|
|
||||||
const closeTag = `<!--/track-target:${trackId}-->`;
|
|
||||||
|
|
||||||
// Find target region (may not exist)
|
|
||||||
let targetStart = -1;
|
|
||||||
let targetEnd = -1;
|
|
||||||
for (let i = 0; i < lines.length; i++) {
|
|
||||||
if (lines[i].includes(openTag)) { targetStart = i; }
|
|
||||||
if (targetStart !== -1 && lines[i].includes(closeTag)) { targetEnd = i; break; }
|
|
||||||
}
|
|
||||||
|
|
||||||
// Build a list of [start, end] ranges to remove, sorted descending so
|
|
||||||
// indices stay valid as we splice.
|
|
||||||
const ranges: Array<[number, number]> = [];
|
|
||||||
ranges.push([block.fenceStart, block.fenceEnd]);
|
|
||||||
if (targetStart !== -1 && targetEnd !== -1 && targetEnd >= targetStart) {
|
|
||||||
ranges.push([targetStart, targetEnd]);
|
|
||||||
}
|
|
||||||
ranges.sort((a, b) => b[0] - a[0]);
|
|
||||||
|
|
||||||
for (const [start, end] of ranges) {
|
|
||||||
lines.splice(start, end - start + 1);
|
|
||||||
// Also drop a trailing blank line if the removal left two in a row.
|
|
||||||
if (start < lines.length && lines[start].trim() === '' && start > 0 && lines[start - 1].trim() === '') {
|
|
||||||
lines.splice(start, 1);
|
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
await fs.writeFile(absPath(filePath), lines.join('\n'), 'utf-8');
|
const content = await fs.readFile(absPath(filePath), 'utf-8');
|
||||||
|
const lines = content.split('\n');
|
||||||
|
const openTag = `<!--track-target:${trackId}-->`;
|
||||||
|
const closeTag = `<!--/track-target:${trackId}-->`;
|
||||||
|
|
||||||
|
// Find target region (may not exist)
|
||||||
|
let targetStart = -1;
|
||||||
|
let targetEnd = -1;
|
||||||
|
for (let i = 0; i < lines.length; i++) {
|
||||||
|
if (lines[i].includes(openTag)) { targetStart = i; }
|
||||||
|
if (targetStart !== -1 && lines[i].includes(closeTag)) { targetEnd = i; break; }
|
||||||
|
}
|
||||||
|
|
||||||
|
// Build a list of [start, end] ranges to remove, sorted descending so
|
||||||
|
// indices stay valid as we splice.
|
||||||
|
const ranges: Array<[number, number]> = [];
|
||||||
|
ranges.push([block.fenceStart, block.fenceEnd]);
|
||||||
|
if (targetStart !== -1 && targetEnd !== -1 && targetEnd >= targetStart) {
|
||||||
|
ranges.push([targetStart, targetEnd]);
|
||||||
|
}
|
||||||
|
ranges.sort((a, b) => b[0] - a[0]);
|
||||||
|
|
||||||
|
for (const [start, end] of ranges) {
|
||||||
|
lines.splice(start, end - start + 1);
|
||||||
|
// Also drop a trailing blank line if the removal left two in a row.
|
||||||
|
if (start < lines.length && lines[start].trim() === '' && start > 0 && lines[start - 1].trim() === '') {
|
||||||
|
lines.splice(start, 1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
await fs.writeFile(absPath(filePath), lines.join('\n'), 'utf-8');
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
@ -7,6 +7,7 @@ import { RemoveOptions, WriteFileOptions, WriteFileResult } from 'packages/share
|
||||||
import { WorkDir } from '../config/config.js';
|
import { WorkDir } from '../config/config.js';
|
||||||
import { rewriteWikiLinksForRenamedKnowledgeFile } from './wiki-link-rewrite.js';
|
import { rewriteWikiLinksForRenamedKnowledgeFile } from './wiki-link-rewrite.js';
|
||||||
import { commitAll } from '../knowledge/version_history.js';
|
import { commitAll } from '../knowledge/version_history.js';
|
||||||
|
import { withFileLock } from '../knowledge/file-lock.js';
|
||||||
|
|
||||||
// ============================================================================
|
// ============================================================================
|
||||||
// Path Utilities
|
// Path Utilities
|
||||||
|
|
@ -249,38 +250,42 @@ export async function writeFile(
|
||||||
await fs.mkdir(path.dirname(filePath), { recursive: true });
|
await fs.mkdir(path.dirname(filePath), { recursive: true });
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check expectedEtag if provided (conflict detection)
|
const result = await withFileLock(filePath, async () => {
|
||||||
if (opts?.expectedEtag) {
|
// Check expectedEtag if provided (conflict detection)
|
||||||
const existingStats = await fs.lstat(filePath);
|
if (opts?.expectedEtag) {
|
||||||
const existingEtag = computeEtag(existingStats.size, existingStats.mtimeMs);
|
const existingStats = await fs.lstat(filePath);
|
||||||
if (existingEtag !== opts.expectedEtag) {
|
const existingEtag = computeEtag(existingStats.size, existingStats.mtimeMs);
|
||||||
throw new Error('File was modified (ETag mismatch)');
|
if (existingEtag !== opts.expectedEtag) {
|
||||||
|
throw new Error('File was modified (ETag mismatch)');
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
// Convert data to buffer based on encoding
|
// Convert data to buffer based on encoding
|
||||||
let buffer: Buffer;
|
let buffer: Buffer;
|
||||||
if (encoding === 'utf8') {
|
if (encoding === 'utf8') {
|
||||||
buffer = Buffer.from(data, 'utf8');
|
buffer = Buffer.from(data, 'utf8');
|
||||||
} else if (encoding === 'base64') {
|
} else if (encoding === 'base64') {
|
||||||
buffer = Buffer.from(data, 'base64');
|
buffer = Buffer.from(data, 'base64');
|
||||||
} else {
|
} else {
|
||||||
// binary: assume data is base64-encoded
|
// binary: assume data is base64-encoded
|
||||||
buffer = Buffer.from(data, 'base64');
|
buffer = Buffer.from(data, 'base64');
|
||||||
}
|
}
|
||||||
|
|
||||||
if (atomic) {
|
if (atomic) {
|
||||||
// Atomic write: write to temp file, then rename
|
// Atomic write: write to temp file, then rename
|
||||||
const tempPath = filePath + '.tmp.' + Date.now() + Math.random().toString(36).slice(2);
|
const tempPath = filePath + '.tmp.' + Date.now() + Math.random().toString(36).slice(2);
|
||||||
await fs.writeFile(tempPath, buffer);
|
await fs.writeFile(tempPath, buffer);
|
||||||
await fs.rename(tempPath, filePath);
|
await fs.rename(tempPath, filePath);
|
||||||
} else {
|
} else {
|
||||||
await fs.writeFile(filePath, buffer);
|
await fs.writeFile(filePath, buffer);
|
||||||
}
|
}
|
||||||
|
|
||||||
const stats = await fs.lstat(filePath);
|
const stats = await fs.lstat(filePath);
|
||||||
const stat = statToSchema(stats, 'file');
|
const stat = statToSchema(stats, 'file');
|
||||||
const etag = computeEtag(stats.size, stats.mtimeMs);
|
const etag = computeEtag(stats.size, stats.mtimeMs);
|
||||||
|
|
||||||
|
return { stat, etag };
|
||||||
|
});
|
||||||
|
|
||||||
// Schedule a debounced version history commit for knowledge files
|
// Schedule a debounced version history commit for knowledge files
|
||||||
if (relPath.startsWith('knowledge/') && relPath.endsWith('.md')) {
|
if (relPath.startsWith('knowledge/') && relPath.endsWith('.md')) {
|
||||||
|
|
@ -289,8 +294,8 @@ export async function writeFile(
|
||||||
|
|
||||||
return {
|
return {
|
||||||
path: relPath,
|
path: relPath,
|
||||||
stat,
|
stat: result.stat,
|
||||||
etag,
|
etag: result.etag,
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue