Update setup and ingest flows

This commit is contained in:
Luca Martial 2026-05-10 23:13:17 -07:00
parent b3dcb577d9
commit c82989119b
29 changed files with 1253 additions and 66 deletions

View file

@ -256,6 +256,31 @@ describe('GitService', () => {
await service.removeWorktree(wtDir).catch(() => undefined);
await rm(wtDir, { recursive: true, force: true }).catch(() => undefined);
});
it('serializes concurrent commits from scoped services targeting the same worktree', async () => {
const { commitHash } = await writeAndCommit('seed.md', 'seed');
const parent = await realpath(join(tempDir, '..'));
const wtDir = join(parent, `wt-${Date.now()}-fw-concurrent`);
await service.addWorktree(wtDir, 'session/concurrent', commitHash);
const first = service.forWorktree(wtDir);
const second = service.forWorktree(wtDir);
await writeFile(join(wtDir, 'a.md'), 'a\n', 'utf-8');
await writeFile(join(wtDir, 'b.md'), 'b\n', 'utf-8');
const [a, b] = await Promise.all([
first.commitFile('a.md', 'add a', 'System User', 'system@example.com'),
second.commitFile('b.md', 'add b', 'System User', 'system@example.com'),
]);
expect(a.commitHash).toMatch(/^[0-9a-f]{40}$/);
expect(b.commitHash).toMatch(/^[0-9a-f]{40}$/);
await expect(first.getFileAtCommit('a.md', a.commitHash)).resolves.toBe('a\n');
await expect(second.getFileAtCommit('b.md', b.commitHash)).resolves.toBe('b\n');
await service.removeWorktree(wtDir).catch(() => undefined);
await rm(wtDir, { recursive: true, force: true }).catch(() => undefined);
});
});
describe('squashMergeIntoMain', () => {

View file

@ -32,6 +32,8 @@ export type SquashMergeResult =
| { ok: false; conflict: true; conflictPaths: string[] };
export class GitService {
private static readonly mutationQueues = new Map<string, Promise<void>>();
private readonly logger: KtxLogger;
private git!: SimpleGit;
private configDir: string;
@ -92,6 +94,15 @@ export class GitService {
commitMessage: string,
author: string,
authorEmail: string,
): Promise<GitCommitInfo> {
return this.withMutationQueue(() => this.commitFileUnlocked(filePath, commitMessage, author, authorEmail));
}
private async commitFileUnlocked(
filePath: string,
commitMessage: string,
author: string,
authorEmail: string,
): Promise<GitCommitInfo> {
try {
// Stage the file
@ -166,6 +177,15 @@ export class GitService {
commitMessage: string,
author: string,
authorEmail: string,
): Promise<GitCommitInfo> {
return this.withMutationQueue(() => this.commitFilesUnlocked(filePaths, commitMessage, author, authorEmail));
}
private async commitFilesUnlocked(
filePaths: string[],
commitMessage: string,
author: string,
authorEmail: string,
): Promise<GitCommitInfo> {
try {
for (const filePath of filePaths) {
@ -231,6 +251,10 @@ export class GitService {
if (filePaths.length === 0) {
return;
}
return this.withMutationQueue(() => this.checkoutFilesUnlocked(filePaths));
}
private async checkoutFilesUnlocked(filePaths: string[]): Promise<void> {
try {
await this.git.checkout(['--', ...filePaths]);
} catch (error) {
@ -292,6 +316,10 @@ export class GitService {
if (!trimmed) {
return;
}
return this.withMutationQueue(() => this.addNoteUnlocked(commitHash, trimmed));
}
private async addNoteUnlocked(commitHash: string, trimmed: string): Promise<void> {
try {
await this.git.raw(['notes', 'add', '-f', '-m', trimmed, commitHash]);
} catch (error) {
@ -343,6 +371,15 @@ export class GitService {
commitMessage: string,
author: string,
authorEmail: string,
): Promise<GitCommitInfo> {
return this.withMutationQueue(() => this.deleteFileUnlocked(filePath, commitMessage, author, authorEmail));
}
private async deleteFileUnlocked(
filePath: string,
commitMessage: string,
author: string,
authorEmail: string,
): Promise<GitCommitInfo> {
try {
// Remove the file from git
@ -485,6 +522,13 @@ export class GitService {
async squashTo(
preHead: string,
options: { message: string; author: string; authorEmail: string; expectedAuthor?: string },
): Promise<{ squashed: boolean; commitHash: string | null; reason?: string; squashedCount?: number }> {
return this.withMutationQueue(() => this.squashToUnlocked(preHead, options));
}
private async squashToUnlocked(
preHead: string,
options: { message: string; author: string; authorEmail: string; expectedAuthor?: string },
): Promise<{ squashed: boolean; commitHash: string | null; reason?: string; squashedCount?: number }> {
const { message, author, authorEmail } = options;
const expectedAuthor = options.expectedAuthor ?? author;
@ -560,6 +604,15 @@ export class GitService {
author: string,
authorEmail: string,
commitMessage: string,
): Promise<SquashMergeResult> {
return this.withMutationQueue(() => this.squashMergeIntoMainUnlocked(branch, author, authorEmail, commitMessage));
}
private async squashMergeIntoMainUnlocked(
branch: string,
author: string,
authorEmail: string,
commitMessage: string,
): Promise<SquashMergeResult> {
// Diff of HEAD..branch (two dots) lists commits/files reachable from `branch` that
// aren't on HEAD — i.e. exactly what the squash would apply. Three dots (HEAD...branch)
@ -615,7 +668,7 @@ export class GitService {
* range, which can pause the sequencer on conflicts.
*/
async resetHardTo(targetSha: string): Promise<void> {
await this.git.raw(['reset', '--hard', targetSha]);
await this.withMutationQueue(() => this.git.raw(['reset', '--hard', targetSha]));
}
/**
@ -667,6 +720,10 @@ export class GitService {
* Used by the memory agent to isolate per-session writes from interactive saves on main.
*/
async addWorktree(path: string, branch: string, startSha: string): Promise<void> {
await this.withMutationQueue(() => this.addWorktreeUnlocked(path, branch, startSha));
}
private async addWorktreeUnlocked(path: string, branch: string, startSha: string): Promise<void> {
try {
await this.git.raw(['worktree', 'add', '-b', branch, path, startSha]);
} catch (error) {
@ -679,6 +736,10 @@ export class GitService {
* worktrees are ktx-internal a clean working tree is not required.
*/
async removeWorktree(path: string): Promise<void> {
await this.withMutationQueue(() => this.removeWorktreeUnlocked(path));
}
private async removeWorktreeUnlocked(path: string): Promise<void> {
try {
await this.git.raw(['worktree', 'remove', '--force', path]);
} catch (error) {
@ -724,7 +785,7 @@ export class GitService {
}
async deleteBranch(branch: string, force = false): Promise<void> {
await this.git.raw(['branch', force ? '-D' : '-d', branch]);
await this.withMutationQueue(() => this.git.raw(['branch', force ? '-D' : '-d', branch]));
}
/**
@ -745,6 +806,15 @@ export class GitService {
commitMessage: string,
author: string,
authorEmail: string,
): Promise<GitCommitInfo> {
return this.withMutationQueue(() => this.deleteDirectoryUnlocked(directoryPath, commitMessage, author, authorEmail));
}
private async deleteDirectoryUnlocked(
directoryPath: string,
commitMessage: string,
author: string,
authorEmail: string,
): Promise<GitCommitInfo> {
try {
// Remove the directory recursively from git
@ -795,6 +865,17 @@ export class GitService {
commitMessage: string,
author: string,
authorEmail: string,
): Promise<GitCommitInfo> {
return this.withMutationQueue(() =>
this.deleteDirectoriesUnlocked(directoryPaths, commitMessage, author, authorEmail),
);
}
private async deleteDirectoriesUnlocked(
directoryPaths: string[],
commitMessage: string,
author: string,
authorEmail: string,
): Promise<GitCommitInfo> {
if (directoryPaths.length === 0) {
return {
@ -852,4 +933,27 @@ export class GitService {
created: true,
};
}
private async withMutationQueue<T>(operation: () => Promise<T>): Promise<T> {
const key = this.configDir;
const previous = GitService.mutationQueues.get(key) ?? Promise.resolve();
let release: () => void = () => {};
const current = previous.catch(() => undefined).then(
() =>
new Promise<void>((resolve) => {
release = resolve;
}),
);
GitService.mutationQueues.set(key, current);
await previous.catch(() => undefined);
try {
return await operation();
} finally {
release();
if (GitService.mutationQueues.get(key) === current) {
GitService.mutationQueues.delete(key);
}
}
}
}

View file

@ -284,6 +284,18 @@ describe('chunkMetabaseStagedDir — syncMode enum coverage', () => {
expect(allRawFiles).not.toContain('cards/200.json');
});
it('ONLY with no selections includes every matching card for old generated configs', async () => {
await writeInline(dir, 'sync-config.json', {
...BASE_SYNC,
syncMode: 'ONLY',
selections: [],
});
const result = await chunkMetabaseStagedDir(dir);
const allRawFiles = result.workUnits.flatMap((wu) => wu.rawFiles);
expect(allRawFiles).toContain('cards/100.json');
expect(allRawFiles).toContain('cards/200.json');
});
it('EXCEPT excludes cards in selected collections; includes the rest', async () => {
await writeInline(dir, 'sync-config.json', {
...BASE_SYNC,

View file

@ -66,7 +66,7 @@ function cardMatchesSyncConfig(card: StagedCardFile, config: StagedSyncConfig):
if (card.archived) {
return false;
}
if (config.syncMode === 'ALL') {
if (config.syncMode === 'ALL' || (config.syncMode === 'ONLY' && config.selections.length === 0)) {
return true;
}
const selectedCollections = new Set(

View file

@ -327,6 +327,40 @@ describe('MetabaseClient.getResolvedSql', () => {
expect(result?.resolvedSql).toBe('SELECT * FROM (SELECT a, b FROM base) t ');
});
it('inlines native-query snippets before checking for remaining variables', async () => {
const requestSpy = vi.fn().mockResolvedValue([
{
id: 1,
name: 'account_join',
content: 'LEFT JOIN accounts a ON a.account_id = mart.account_id',
},
]);
const requestWithCustomRetrySpy = vi.fn();
const client = makeClient((client) => {
Reflect.set(client, 'request', requestSpy);
Reflect.set(client, 'requestWithCustomRetry', requestWithCustomRetrySpy);
});
const card = nativeCard('SELECT a.account_name FROM mart {{snippet: account_join}}', {
'snippet: account_join': {
id: 'snippet-tag',
name: 'snippet: account_join',
type: 'snippet',
'snippet-name': 'account_join',
'snippet-id': 1,
},
});
const result = await client.getResolvedSql(card);
expect(requestSpy).toHaveBeenCalledWith('GET', '/api/native-query-snippet');
expect(requestWithCustomRetrySpy).not.toHaveBeenCalled();
expect(result?.resolutionStatus).toBe('resolved');
expect(result?.resolvedSql).toBe(
'SELECT a.account_name FROM mart LEFT JOIN accounts a ON a.account_id = mart.account_id',
);
expect(result?.resolvedSql).not.toContain('{{snippet:');
});
it('uses /api/dataset/native for naked variables and prepends a warning comment', async () => {
const requestSpy = vi.fn().mockResolvedValue({ query: "SELECT * WHERE id = 'placeholder' AND n = 1" });
const client = makeClient((client) => {

View file

@ -39,6 +39,13 @@ interface TemplateTagInfo {
dummyValue: string | null;
}
interface NativeQuerySnippet {
id: number;
name: string;
content: string;
archived?: boolean | null;
}
interface CreateCardParams {
name: string;
databaseId: number;
@ -100,6 +107,43 @@ function collectRemainingPlaceholderNames(sql: string): Set<string> {
return names;
}
function collectRemainingSnippetNames(sql: string): Set<string> {
const names = new Set<string>();
for (const match of sql.matchAll(/\{\{\s*snippet:\s*([^}]+?)\s*\}\}/gi)) {
names.add(match[1].trim());
}
return names;
}
function normalizeSnippetName(name: string | null | undefined): string {
return (name ?? '').replace(/^snippet:\s*/i, '').trim().toLowerCase();
}
function parseNativeQuerySnippets(value: unknown): NativeQuerySnippet[] {
const rawItems = Array.isArray(value)
? value
: typeof value === 'object' && value !== null && Array.isArray((value as { data?: unknown }).data)
? (value as { data: unknown[] }).data
: [];
const snippets: NativeQuerySnippet[] = [];
for (const item of rawItems) {
if (typeof item !== 'object' || item === null || Array.isArray(item)) {
continue;
}
const rec = item as Record<string, unknown>;
if (typeof rec.id !== 'number' || typeof rec.name !== 'string' || typeof rec.content !== 'string') {
continue;
}
snippets.push({
id: rec.id,
name: rec.name,
content: rec.content,
...(typeof rec.archived === 'boolean' ? { archived: rec.archived } : {}),
});
}
return snippets;
}
function injectNativeSql(datasetQuery: MetabaseDatasetQuery, sql: string): MetabaseDatasetQuery {
if (datasetQuery?.stages?.[0]?.native !== undefined) {
const stages = [...(datasetQuery.stages ?? [])];
@ -148,6 +192,7 @@ export class MetabaseClient implements MetabaseRuntimeClient {
private readonly logger: MetabaseClientLogger;
private readonly baseUrl: string;
private readonly config: MetabaseClientConfig;
private snippetCache: Promise<NativeQuerySnippet[]> | null = null;
constructor(
runtime: MetabaseClientRuntimeConfig,
@ -261,6 +306,63 @@ export class MetabaseClient implements MetabaseRuntimeClient {
return this.request<MetabaseCardSummary[]>('GET', '/api/card/?f=all');
}
private getNativeQuerySnippets(): Promise<NativeQuerySnippet[]> {
this.snippetCache ??= this.request<unknown>('GET', '/api/native-query-snippet').then(parseNativeQuerySnippets);
return this.snippetCache;
}
private async inlineNativeQuerySnippets(
sql: string,
templateTags: MetabaseTemplateTag[],
cardId: number,
): Promise<{ sql: string; unresolved: string[] }> {
const names = collectRemainingSnippetNames(sql);
if (names.size === 0) {
return { sql, unresolved: [] };
}
let snippets: NativeQuerySnippet[];
try {
snippets = await this.getNativeQuerySnippets();
} catch (error) {
this.logger.warn(
`[metabase] failed to load native query snippets for card ${cardId}; leaving snippet placeholders unresolved: ${error instanceof Error ? error.message : String(error)}`,
);
return { sql, unresolved: [...names] };
}
const snippetsById = new Map<number, NativeQuerySnippet>();
const snippetsByName = new Map<string, NativeQuerySnippet>();
for (const snippet of snippets) {
if (snippet.archived === true) {
continue;
}
snippetsById.set(snippet.id, snippet);
snippetsByName.set(normalizeSnippetName(snippet.name), snippet);
}
const snippetTags = templateTags.filter((tag) => tag.type === 'snippet');
const unresolved = new Set<string>();
const inlinedSql = sql.replace(/\{\{\s*snippet:\s*([^}]+?)\s*\}\}/gi, (match, rawName: string) => {
const normalizedName = normalizeSnippetName(rawName);
const tag = snippetTags.find(
(candidate) =>
normalizeSnippetName(candidate['snippet-name']) === normalizedName ||
normalizeSnippetName(candidate.name) === normalizedName,
);
const snippet =
(typeof tag?.['snippet-id'] === 'number' ? snippetsById.get(tag['snippet-id']) : undefined) ??
snippetsByName.get(normalizedName);
if (!snippet) {
unresolved.add(rawName.trim());
return match;
}
return snippet.content;
});
return { sql: inlinedSql, unresolved: [...unresolved] };
}
async convertMbqlToNative(datasetQuery: MetabaseDatasetQuery): Promise<MetabaseNativeQueryResult> {
return this.request<MetabaseNativeQueryResult>('POST', '/api/dataset/native', {
...datasetQuery,
@ -351,7 +453,18 @@ export class MetabaseClient implements MetabaseRuntimeClient {
// silently filter rows out — see incident with auction_seller_bidder_pair_suspicion).
let processedSql = stripOptionalClauses(nativeQuery);
// Step 2: inline {{#CARD_ID}} card references locally. Recursively strip optional
// Step 2: inline native-query snippets. Metabase's substitution endpoint does not
// always expand {{snippet: name}} for fetched card SQL, but the snippets API does.
const snippetResult = await this.inlineNativeQuerySnippets(processedSql, templateTagEntries, card.id);
processedSql = snippetResult.sql;
if (snippetResult.unresolved.length > 0) {
this.logger.warn(
`[metabase] card ${card.id} has unresolved SQL snippets: ${snippetResult.unresolved.join(', ')}`,
);
return { resolvedSql: processedSql, templateTags, resolutionStatus: 'fallback' };
}
// Step 3: inline {{#CARD_ID}} card references locally. Recursively strip optional
// clauses in referenced cards too — the same reasoning applies all the way down.
try {
processedSql = await expandCardReferences(processedSql, {
@ -361,7 +474,17 @@ export class MetabaseClient implements MetabaseRuntimeClient {
if (!referencedNative) {
throw new Error(`referenced card ${id} has no native query`);
}
return { native_query: stripOptionalClauses(referencedNative) };
const referencedSnippetResult = await this.inlineNativeQuerySnippets(
stripOptionalClauses(referencedNative),
Object.values(this.getTemplateTags(referenced)),
referenced.id,
);
if (referencedSnippetResult.unresolved.length > 0) {
throw new Error(
`referenced card ${id} has unresolved SQL snippets: ${referencedSnippetResult.unresolved.join(', ')}`,
);
}
return { native_query: referencedSnippetResult.sql };
},
});
} catch (err) {
@ -372,7 +495,7 @@ export class MetabaseClient implements MetabaseRuntimeClient {
throw err;
}
// Step 3: collect template tags that still appear in the SQL after strip + inline.
// Step 4: collect template tags that still appear in the SQL after strip + inline.
// Anything bracketed-only is gone now; anything card-referenced is inlined.
const remainingNames = collectRemainingPlaceholderNames(processedSql);
const remainingTags = templateTagEntries.filter((tag) => tag.type !== 'snippet' && remainingNames.has(tag.name));
@ -381,7 +504,7 @@ export class MetabaseClient implements MetabaseRuntimeClient {
return { resolvedSql: processedSql, templateTags, resolutionStatus: 'resolved' };
}
// Step 4: dummy-substitute the remaining naked {{ var }} placeholders via Metabase's
// Step 5: dummy-substitute the remaining naked {{ var }} placeholders via Metabase's
// substitution endpoint. Only required because we can't translate dimension-tag
// bindings to warehouse columns ourselves. Prepend a SQL comment listing every
// dummy substitution so downstream consumers (the metabase_ingest LLM) know which

View file

@ -57,13 +57,9 @@ describe('computeFetchScope', () => {
});
});
it('returns empty explicit scope for ONLY with no selections', () => {
it('treats generated ONLY with no selections as all', () => {
const scope = computeFetchScope({ ...BASE_CONFIG, syncMode: 'ONLY', selections: [] });
expect(scope).toEqual({
kind: 'explicit',
includeCardIds: new Set(),
includeCollectionIds: new Set(),
});
expect(scope).toEqual({ kind: 'all' });
});
});

View file

@ -11,7 +11,7 @@ export type FetchScope =
* union the fetcher switches on. Pure function; no I/O, no side effects.
*/
export function computeFetchScope(syncConfig: StagedSyncConfig): FetchScope {
if (syncConfig.syncMode === 'ALL') {
if (syncConfig.syncMode === 'ALL' || (syncConfig.syncMode === 'ONLY' && syncConfig.selections.length === 0)) {
return { kind: 'all' };
}
const cardIds = new Set<number>();

View file

@ -79,6 +79,21 @@ function countMemoryFlowActions(actions: MemoryAction[], target: MemoryAction['t
return actions.filter((action) => action.target === target).length;
}
function isStructuredToolFailure(output: unknown): boolean {
if (!output || typeof output !== 'object') {
return false;
}
const structured = (output as { structured?: unknown }).structured;
return !!structured && typeof structured === 'object' && (structured as { success?: unknown }).success === false;
}
function isFailedToolCall(entry: ToolCallLogEntry): boolean {
if (entry.error) {
return true;
}
return (entry.toolName === 'sl_write_source' || entry.toolName === 'wiki_write') && isStructuredToolFailure(entry.output);
}
function reportIdFromCreateResult(result: unknown): string | undefined {
if (!result || typeof result !== 'object' || !('id' in result)) {
return undefined;
@ -344,7 +359,7 @@ export class IngestBundleRunner {
toolNames: new Set<string>(),
} satisfies MutableToolTranscriptSummary);
current.toolCallCount += 1;
current.errorCount += entry.error ? 1 : 0;
current.errorCount += isFailedToolCall(entry) ? 1 : 0;
current.toolNames.add(entry.toolName);
transcriptSummaries.set(entry.wuKey, current);
};
@ -712,6 +727,7 @@ export class IngestBundleRunner {
sourceKey: job.sourceKey,
connectionId: job.connectionId,
jobId: job.jobId,
toolFailureCount: (unitKey) => transcriptSummaries.get(unitKey)?.errorCount ?? 0,
onStepFinish: ({ stepIndex, stepBudget }) => {
memoryFlow?.emit({ type: 'work_unit_step', unitKey: wu.unitKey, stepIndex, stepBudget });
},

View file

@ -1,6 +1,7 @@
import { mkdir, mkdtemp, readFile, rm, writeFile } from 'node:fs/promises';
import { tmpdir } from 'node:os';
import { join } from 'node:path';
import Database from 'better-sqlite3';
import { AgentRunnerService } from '../agent/index.js';
import { initKtxProject, type KtxLocalProject, loadKtxProject } from '../project/index.js';
import { makeLocalGitRepo } from '../test/make-local-git-repo.js';
@ -57,6 +58,34 @@ class LookerSlWritingAgentRunner extends AgentRunnerService {
}
}
class WikiWritingAgentRunner extends AgentRunnerService {
override runLoop = vi.fn(async (params: any) => {
if (params.telemetryTags?.operationName === 'ingest-bundle-wu') {
const wikiWrite = params.toolSet.wiki_write;
if (!wikiWrite?.execute) {
throw new Error('wiki_write tool was not available to the WorkUnit');
}
const result = await wikiWrite.execute(
{
key: 'orders_context',
summary: 'Orders source context',
content: 'Orders are purchase records used for revenue analysis.',
tags: ['orders'],
},
{ toolCallId: 'wiki-write' },
);
if (!result.structured.success) {
throw new Error(result.markdown);
}
}
return { stopReason: 'natural' as const };
});
constructor() {
super({ llmProvider: { getModel: () => ({}) as never } as never });
}
}
function makeLookerRuntimeClient() {
const lookerModels = {
models: [{ name: 'ecommerce', label: 'Ecommerce', explores: [{ name: 'orders', label: 'Orders' }] }],
@ -252,6 +281,33 @@ describe('canonical local ingest', () => {
});
});
it('indexes wiki pages written by local ingest into the SQLite knowledge tables', async () => {
const sourceDir = join(tempDir, 'source');
await mkdir(join(sourceDir, 'orders'), { recursive: true });
await writeFile(join(sourceDir, 'orders', 'orders.json'), '{"name":"orders"}\n', 'utf-8');
const agentRunner = new WikiWritingAgentRunner();
const result = await runLocalIngest({
project,
adapters: [new FakeSourceAdapter()],
adapter: 'fake',
connectionId: 'warehouse',
sourceDir,
jobId: 'wiki-local-1',
agentRunner,
});
expect(result.result.failedWorkUnits).toEqual([]);
const db = new Database(join(project.projectDir, '.ktx', 'db.sqlite'), { readonly: true });
try {
expect(db.prepare('SELECT key, summary FROM knowledge_pages ORDER BY key').all()).toEqual([
{ key: 'orders_context', summary: 'Orders source context' },
]);
} finally {
db.close();
}
});
it('rejects direct Metabase scheduled pulls before requiring a local ingest LLM provider', async () => {
const projectDir = join(tempDir, 'metabase-project');
await initKtxProject({ projectDir, projectName: 'warehouse' });

View file

@ -56,6 +56,8 @@ import {
type KnowledgeIndexPort,
KnowledgeWikiService,
searchLocalKnowledgePages,
SqliteKnowledgeIndex,
type SqliteKnowledgeIndexPage,
WikiListTagsTool,
WikiReadTool,
WikiRemoveTool,
@ -257,6 +259,17 @@ function parseWiki(raw: string): { summary: string; content: string } {
};
}
function parseWikiTags(raw: string): string[] {
const match = raw.match(/^---\n([\s\S]*?)\n---\n?/);
if (!match) {
return [];
}
const frontmatter = (YAML.parse(match[1]) ?? {}) as Record<string, unknown>;
return Array.isArray(frontmatter.tags)
? frontmatter.tags.filter((tag): tag is string => typeof tag === 'string')
: [];
}
function scoreText(text: string, query: string): number {
const normalized = query.toLowerCase().trim();
if (!normalized) {
@ -271,21 +284,49 @@ function scoreText(text: string, query: string): number {
}
class LocalKnowledgeIndex implements KnowledgeIndexPort {
constructor(private readonly project: KtxLocalProject) {}
private readonly sqlite: SqliteKnowledgeIndex;
async upsertPage(): Promise<void> {}
async applyDiffTransactional(): Promise<void> {}
async getExistingSearchTexts(): Promise<Map<string, { searchText: string; hasEmbedding: boolean }>> {
return new Map();
constructor(private readonly project: KtxLocalProject) {
this.sqlite = new SqliteKnowledgeIndex({ dbPath: ktxLocalStateDbPath(project) });
}
async deleteStale(): Promise<void> {}
async upsertPage(): Promise<void> {
await this.syncAllPagesFromDisk();
}
async deleteByScope(): Promise<void> {}
async applyDiffTransactional(): Promise<void> {
await this.syncAllPagesFromDisk();
}
async deleteByKey(): Promise<void> {}
async getExistingSearchTexts(
scope: string,
scopeId: string | null,
): Promise<Map<string, { searchText: string; hasEmbedding: boolean }>> {
const prefix = scope === 'GLOBAL' ? 'knowledge/global/' : `knowledge/user/${scopeId}/`;
const result = new Map<string, { searchText: string; hasEmbedding: boolean }>();
for (const [path, page] of this.sqlite.getExistingPages()) {
if (!path.startsWith(prefix)) {
continue;
}
result.set(path.slice(prefix.length).replace(/\.md$/, ''), {
searchText: page.searchText,
hasEmbedding: page.embedding !== null,
});
}
return result;
}
async deleteStale(): Promise<void> {
await this.syncAllPagesFromDisk();
}
async deleteByScope(): Promise<void> {
await this.syncAllPagesFromDisk();
}
async deleteByKey(): Promise<void> {
await this.syncAllPagesFromDisk();
}
async findPageByKey(scope: string, scopeId: string | null, pageKey: string) {
const path = scope === 'GLOBAL' ? `knowledge/global/${pageKey}.md` : `knowledge/user/${scopeId}/${pageKey}.md`;
@ -344,6 +385,41 @@ class LocalKnowledgeIndex implements KnowledgeIndexPort {
.sort((left, right) => right.rrfScore - left.rrfScore || left.pageKey.localeCompare(right.pageKey))
.slice(0, limit);
}
private async syncAllPagesFromDisk(): Promise<void> {
const listed = await this.project.fileStore.listFiles('knowledge', true);
const pages: SqliteKnowledgeIndexPage[] = [];
for (const file of listed.files.filter((entry) => entry.endsWith('.md'))) {
const parsedPath = parseKnowledgeIndexPath(file);
if (!parsedPath) {
continue;
}
const path = `knowledge/${file}`;
const raw = await this.project.fileStore.readFile(path);
const parsed = parseWiki(raw.content);
pages.push({
path,
key: parsedPath.pageKey,
scope: parsedPath.scope,
summary: parsed.summary,
content: parsed.content,
tags: parseWikiTags(raw.content),
embedding: null,
});
}
this.sqlite.sync(pages);
}
}
function parseKnowledgeIndexPath(file: string): { scope: 'GLOBAL' | 'USER'; pageKey: string } | null {
const segments = file.split('/');
if (segments.length === 2 && segments[0] === 'global') {
return { scope: 'GLOBAL', pageKey: segments[1].replace(/\.md$/, '') };
}
if (segments.length === 3 && segments[0] === 'user') {
return { scope: 'USER', pageKey: segments[2].replace(/\.md$/, '') };
}
return null;
}
class NoopKnowledgeEventPort implements KnowledgeEventPort {

View file

@ -106,6 +106,21 @@ describe('Stage 3 — executeWorkUnit', () => {
expect(deps.resetHardTo).toHaveBeenCalledWith('pre');
});
it('tool failures reset to the pre-WU SHA and mark WU failed even when the loop ends naturally', async () => {
const deps = makeDeps();
deps.sessionWorktreeGit.revParseHead = vi.fn().mockResolvedValueOnce('pre').mockResolvedValueOnce('post');
deps.agentRunner.runLoop = vi.fn().mockResolvedValue({ stopReason: 'natural' });
deps.toolFailureCount = vi.fn().mockReturnValue(2);
const outcome = await executeWorkUnit(deps, makeWu());
expect(outcome.status).toBe('failed');
expect(outcome.reason).toContain('2 tool call(s) failed');
expect(outcome.actions).toEqual([]);
expect(outcome.touchedSlSources).toEqual([]);
expect(deps.resetHardTo).toHaveBeenCalledWith('pre');
});
it('runner loop thrown exception resets to the pre-WU SHA and marks WU failed', async () => {
const deps = makeDeps();
deps.sessionWorktreeGit.revParseHead = vi.fn().mockResolvedValueOnce('pre').mockResolvedValueOnce('post');

View file

@ -28,6 +28,7 @@ export interface WorkUnitExecutionDeps {
connectionId: string;
jobId: string;
onStepFinish?: (info: { stepIndex: number; stepBudget: number }) => void;
toolFailureCount?: (unitKey: string) => number;
}
export interface WorkUnitOutcome {
@ -128,6 +129,11 @@ export async function executeWorkUnit(deps: WorkUnitExecutionDeps, wu: WorkUnit)
return failWithReset(runResult.error?.message ?? 'agent loop errored');
}
const toolFailureCount = deps.toolFailureCount?.(wu.unitKey) ?? 0;
if (toolFailureCount > 0) {
return failWithReset(`${toolFailureCount} tool call(s) failed during WorkUnit ${wu.unitKey}`);
}
const touched = listTouchedSlSources(deps.captureSession.touchedSlSources);
if (touched.length > 0) {
const validation = await deps.validateTouchedSources(touched);