diff --git a/packages/cli/src/context-build-view.test.ts b/packages/cli/src/context-build-view.test.ts index 8e48a6f7..c6ef0eb1 100644 --- a/packages/cli/src/context-build-view.test.ts +++ b/packages/cli/src/context-build-view.test.ts @@ -99,11 +99,11 @@ describe('parseScanSummary', () => { describe('parseIngestSummary', () => { it('extracts work units and saved memory', () => { - expect(parseIngestSummary('Work units: 5\nSaved memory: 3 wiki, 2 SL')).toBe('5 items indexed · 3 wiki, 2 SL'); + expect(parseIngestSummary('Work units: 5\nSaved memory: 3 wiki, 2 SL')).toBe('3 wiki, 2 SL'); }); it('extracts work units alone when no saved memory', () => { - expect(parseIngestSummary('Work units: 5\nStatus: done')).toBe('5 items indexed'); + expect(parseIngestSummary('Work units: 5\nStatus: done')).toBe('5 work units'); }); it('extracts saved memory alone when no work units', () => { @@ -467,6 +467,41 @@ describe('runContextBuild', () => { { connectionId: 'dbt_main', status: 'done' }, ]); }); + + it('returns report IDs and artifact paths parsed from target output', async () => { + const io = makeIo(); + const project = projectWithConnections({ + warehouse: { driver: 'postgres' }, + dbt_main: { driver: 'dbt' }, + }); + const executeTarget = vi.fn(async (target, _args, targetIo) => { + if (target.operation === 'scan') { + targetIo.stdout.write('Report: raw-sources/warehouse/live-database/sync-1/scan-report.json\n'); + targetIo.stdout.write('Raw sources: raw-sources/warehouse/live-database/sync-1\n'); + } else { + targetIo.stdout.write('Report: report-dbt-1\n'); + targetIo.stdout.write('Saved memory: 2 wiki, 3 SL\n'); + } + return successResult(target.connectionId, target.driver, target.operation); + }); + + const result = await runContextBuild( + project, + { projectDir: '/tmp/project', inputMode: 'disabled' }, + io.io, + { executeTarget, now: () => 1000 }, + ); + + expect(result).toMatchObject({ + exitCode: 0, + detached: false, + reportIds: ['report-dbt-1'], + artifactPaths: [ + 'raw-sources/warehouse/live-database/sync-1/scan-report.json', + 'raw-sources/warehouse/live-database/sync-1', + ], + }); + }); }); describe('viewStateFromSourceProgress', () => { diff --git a/packages/cli/src/context-build-view.ts b/packages/cli/src/context-build-view.ts index 4c57784d..bb661655 100644 --- a/packages/cli/src/context-build-view.ts +++ b/packages/cli/src/context-build-view.ts @@ -44,6 +44,8 @@ export interface ContextBuildArgs { export interface ContextBuildResult { exitCode: number; detached: boolean; + reportIds?: string[]; + artifactPaths?: string[]; } export interface ContextBuildSourceProgressUpdate { @@ -237,12 +239,41 @@ export function parseScanSummary(output: string): string | null { } export function parseIngestSummary(output: string): string | null { - const parts: string[] = []; - const workUnits = output.match(/Work units: (\d+)/); - if (workUnits) parts.push(`${workUnits[1]} items indexed`); const savedMemory = output.match(/Saved memory: (.+)/); - if (savedMemory) parts.push(savedMemory[1]); - return parts.length > 0 ? parts.join(' · ') : null; + if (savedMemory) return savedMemory[1]; + const workUnits = output.match(/Work units: (\d+)/); + if (workUnits) return `${workUnits[1]} work units`; + return null; +} + +function collectOutputMetadata( + output: string, + operation: KtxPublicIngestPlanTarget['operation'], +): { reportIds: string[]; artifactPaths: string[] } { + const reportIds = new Set(); + const artifactPaths = new Set(); + for (const line of output.split(/\r?\n/)) { + const trimmed = line.trim(); + const reportLine = trimmed.match(/^Report:\s*(.+)$/); + if (reportLine) { + const value = reportLine[1].trim(); + if (value && value !== 'none') { + if (operation === 'scan') artifactPaths.add(value); + else reportIds.add(value); + } + } + const rawSourcesLine = trimmed.match(/^Raw sources:\s*(.+)$/); + if (rawSourcesLine) { + const value = rawSourcesLine[1].trim(); + if (value && value !== 'none') artifactPaths.add(value); + } + if (operation === 'source-ingest') { + for (const match of trimmed.matchAll(/\breport=([^\s]+)/g)) { + reportIds.add(match[1]); + } + } + } + return { reportIds: [...reportIds], artifactPaths: [...artifactPaths] }; } interface CapturedIo { @@ -428,6 +459,8 @@ export async function runContextBuild( const orderedTargets = [...state.primarySources, ...state.contextSources]; const execTarget = deps.executeTarget ?? executePublicIngestTarget; + const reportIds = new Set(); + const artifactPaths = new Set(); let detached = false; let cleanupKeystroke: (() => void) | null = null; @@ -492,10 +525,14 @@ export async function runContextBuild( targetState.status = failed ? 'failed' : 'done'; targetState.detailLine = null; if (!failed) { + const capturedOutput = capture.captured(); + const metadata = collectOutputMetadata(capturedOutput, targetState.target.operation); + for (const reportId of metadata.reportIds) reportIds.add(reportId); + for (const artifactPath of metadata.artifactPaths) artifactPaths.add(artifactPath); targetState.summaryText = targetState.target.operation === 'scan' - ? parseScanSummary(capture.captured()) - : parseIngestSummary(capture.captured()); + ? parseScanSummary(capturedOutput) + : parseIngestSummary(capturedOutput); } if (failed) hasFailure = true; @@ -521,5 +558,10 @@ export async function runContextBuild( paint(false); } - return { exitCode: hasFailure ? 1 : 0, detached: false }; + return { + exitCode: hasFailure ? 1 : 0, + detached: false, + ...(reportIds.size > 0 ? { reportIds: [...reportIds] } : {}), + ...(artifactPaths.size > 0 ? { artifactPaths: [...artifactPaths] } : {}), + }; } diff --git a/packages/cli/src/ingest.test.ts b/packages/cli/src/ingest.test.ts index 5c536f0f..5a18938b 100644 --- a/packages/cli/src/ingest.test.ts +++ b/packages/cli/src/ingest.test.ts @@ -222,6 +222,39 @@ function completedLocalBundleRun(input: RunLocalIngestOptions, jobId: string): L }; } +function failedLocalBundleRun(input: RunLocalIngestOptions, jobId: string): LocalIngestResult { + const failedWorkUnit = { + ...bundleReportSnapshot().body.workUnits[0], + status: 'failed' as const, + reason: 'writer tool failed', + actions: [], + touchedSlSources: [], + }; + const nextReport = localFakeBundleReport(jobId, { + id: 'report-failed-1', + runId: 'run-failed-1', + connectionId: input.connectionId, + sourceKey: input.adapter, + body: { + workUnits: [failedWorkUnit], + failedWorkUnits: [failedWorkUnit.unitKey], + }, + }); + return { + result: { + jobId, + runId: nextReport.runId, + syncId: nextReport.body.syncId, + diffSummary: nextReport.body.diffSummary, + workUnitCount: nextReport.body.workUnits.length, + failedWorkUnits: nextReport.body.failedWorkUnits, + artifactsWritten: nextReport.body.provenanceRows.length, + commitSha: nextReport.body.commitSha, + }, + report: nextReport, + }; +} + class CliLookerSlWritingAgentRunner extends AgentRunnerService { override runLoop = vi.fn(async (params: RunLoopParams) => { if ( @@ -621,7 +654,10 @@ function makeCliLookerParser() { }; } -function localFakeBundleReport(jobId: string, overrides: Partial = {}): IngestReportSnapshot { +function localFakeBundleReport( + jobId: string, + overrides: Partial> & { body?: Partial } = {}, +): IngestReportSnapshot { const report = bundleReportSnapshot(); return { ...report, @@ -826,6 +862,77 @@ describe('runKtxIngest', () => { expect(io.stderr()).toBe(''); }); + it('returns a non-zero code when Metabase fan-out has failed children', async () => { + const projectDir = join(tempDir, 'project'); + await initKtxProject({ projectDir, projectName: 'warehouse' }); + await writeMetabaseConfig(projectDir); + const io = makeIo(); + const report = localFakeBundleReport('metabase-child-1', { + id: 'report-metabase-child-1', + runId: 'run-a', + jobId: 'metabase-child-1', + connectionId: 'warehouse_a', + sourceKey: 'metabase', + body: { + failedWorkUnits: ['metabase-db-1'], + workUnits: [ + { + unitKey: 'metabase-db-1', + rawFiles: ['cards/1.json'], + status: 'failed', + reason: 'tool write failed', + actions: [], + touchedSlSources: [], + }, + ], + }, + }); + + await expect( + runKtxIngest( + { + command: 'run', + projectDir, + connectionId: 'prod-metabase', + adapter: 'metabase', + outputMode: 'plain', + }, + io.io, + { + runLocalMetabaseIngest: async () => ({ + metabaseConnectionId: 'prod-metabase', + status: 'partial_failure', + totals: { workUnits: 1, failedWorkUnits: 1 }, + children: [ + { + jobId: 'metabase-child-1', + metabaseConnectionId: 'prod-metabase', + metabaseDatabaseId: 1, + targetConnectionId: 'warehouse_a', + result: { + jobId: 'metabase-child-1', + runId: 'run-a', + syncId: 'sync-a', + diffSummary: { added: 0, modified: 0, deleted: 0, unchanged: 0 }, + workUnitCount: 1, + failedWorkUnits: ['metabase-db-1'], + artifactsWritten: 0, + commitSha: null, + }, + report, + }, + ], + }), + }, + ), + ).resolves.toBe(1); + + expect(io.stdout()).toContain('Metabase fan-out: partial_failure'); + expect(io.stdout()).toContain('Failed work units: 1'); + expect(io.stdout()).toContain('status=error'); + expect(io.stderr()).toBe(''); + }); + it('prints Metabase fan-out progress before the final summary', async () => { const projectDir = join(tempDir, 'project'); await initKtxProject({ projectDir, projectName: 'warehouse' }); @@ -1143,6 +1250,38 @@ describe('runKtxIngest', () => { expect(io.stdout()).toContain('Diff: +2/~0/-0/=0\n'); }); + it('returns a non-zero code when local ingest reports failed work units', async () => { + const projectDir = join(tempDir, 'project'); + await initKtxProject({ projectDir, projectName: 'warehouse' }); + await writeWarehouseConfig(projectDir); + const sourceDir = join(tempDir, 'source'); + await mkdir(join(sourceDir, 'orders'), { recursive: true }); + await writeFile(join(sourceDir, 'orders', 'orders.json'), '{"name":"orders"}\n', 'utf-8'); + const runLocal = vi.fn(async (input: RunLocalIngestOptions) => failedLocalBundleRun(input, 'local-job-failed')); + + const io = makeIo(); + await expect( + runKtxIngest( + { + command: 'run', + projectDir, + connectionId: 'warehouse', + adapter: 'fake', + sourceDir, + outputMode: 'plain', + }, + io.io, + { + runLocalIngest: runLocal, + jobIdFactory: () => 'local-job-failed', + }, + ), + ).resolves.toBe(1); + + expect(io.stderr()).toBe(''); + expect(io.stdout()).toContain('Status: error\n'); + }); + it('passes the debug LLM request file to local ingest runs', async () => { const projectDir = join(tempDir, 'project'); await initKtxProject({ projectDir, projectName: 'warehouse' }); diff --git a/packages/cli/src/ingest.ts b/packages/cli/src/ingest.ts index d6748991..2e33372c 100644 --- a/packages/cli/src/ingest.ts +++ b/packages/cli/src/ingest.ts @@ -111,6 +111,16 @@ function writeReportStatus(report: IngestReportSnapshot, io: KtxIngestIo): void } function writeMetabaseFanoutStatus(result: LocalMetabaseFanoutResult, io: KtxIngestIo): void { + const counts = result.children.reduce( + (acc, child) => { + const childCounts = reportActionCounts(child.report); + return { + wikiCount: acc.wikiCount + childCounts.wikiCount, + slCount: acc.slCount + childCounts.slCount, + }; + }, + { wikiCount: 0, slCount: 0 }, + ); io.stdout.write(`Metabase fan-out: ${result.status}\n`); io.stdout.write(`Source: ${result.metabaseConnectionId}\n`); io.stdout.write(`Children: ${result.children.length}\n`); @@ -118,10 +128,11 @@ function writeMetabaseFanoutStatus(result: LocalMetabaseFanoutResult, io: KtxIng io.stdout.write(`Work units: ${result.totals.workUnits}\n`); io.stdout.write(`Failed work units: ${result.totals.failedWorkUnits}\n`); } + io.stdout.write(`Saved memory: ${counts.wikiCount} wiki, ${counts.slCount} SL\n`); for (const child of result.children) { const status = reportStatus(child.report); io.stdout.write( - `- target=${child.targetConnectionId} database=${child.metabaseDatabaseId} status=${status} job=${child.jobId}\n`, + `- target=${child.targetConnectionId} database=${child.metabaseDatabaseId} status=${status} job=${child.jobId} report=${child.report.id}\n`, ); } } @@ -326,7 +337,7 @@ export async function runKtxIngest( } else { writeMetabaseFanoutStatus(result, io); } - return 0; + return result.status === 'all_succeeded' ? 0 : 1; } const jobId = deps.jobIdFactory?.(); @@ -377,14 +388,14 @@ export async function runKtxIngest( liveTui?.close(); liveTui = null; io.stdout.write(formatMemoryFlowFinalSummary(latestMemoryFlowSnapshot)); - return 0; + return reportStatus(result.report) === 'done' ? 0 : 1; } await writeReportRecord(result.report, runOutputMode, io, { interactive: (args.inputMode ?? 'auto') === 'auto', renderStoredMemoryFlow: deps.renderStoredMemoryFlow, env, }); - return 0; + return reportStatus(result.report) === 'done' ? 0 : 1; } finally { liveTui?.close(); } diff --git a/packages/cli/src/setup-agents.test.ts b/packages/cli/src/setup-agents.test.ts index 739c9912..e41fc8a5 100644 --- a/packages/cli/src/setup-agents.test.ts +++ b/packages/cli/src/setup-agents.test.ts @@ -3,6 +3,7 @@ import { tmpdir } from 'node:os'; import { join } from 'node:path'; import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; import { + formatInstallSummary, plannedKtxAgentFiles, readKtxAgentInstallManifest, removeKtxAgentInstall, @@ -37,11 +38,13 @@ describe('setup agents', () => { it('plans project-scoped CLI and MCP files for every target', () => { expect(plannedKtxAgentFiles({ projectDir: tempDir, target: 'claude-code', scope: 'project', mode: 'both' })).toEqual([ - { kind: 'file', path: join(tempDir, '.claude/skills/ktx/SKILL.md') }, + { kind: 'file', path: join(tempDir, '.claude/skills/ktx/SKILL.md'), role: 'skill' }, + { kind: 'file', path: join(tempDir, '.claude/rules/ktx.md'), role: 'rule' }, { kind: 'json-key', path: join(tempDir, '.mcp.json'), jsonPath: ['mcpServers', 'ktx'] }, ]); expect(plannedKtxAgentFiles({ projectDir: tempDir, target: 'codex', scope: 'project', mode: 'cli' })).toEqual([ - { kind: 'file', path: join(tempDir, '.agents/skills/ktx/SKILL.md') }, + { kind: 'file', path: join(tempDir, '.agents/skills/ktx/SKILL.md'), role: 'skill' }, + { kind: 'file', path: join(tempDir, '.codex/instructions/ktx.md'), role: 'rule' }, ]); expect(plannedKtxAgentFiles({ projectDir: tempDir, target: 'cursor', scope: 'project', mode: 'mcp' })).toEqual([ { kind: 'json-key', path: join(tempDir, '.cursor/mcp.json'), jsonPath: ['mcpServers', 'ktx'] }, @@ -113,6 +116,7 @@ describe('setup agents', () => { await expect(removeKtxAgentInstall(tempDir, io.io)).resolves.toBe(0); await expect(stat(join(tempDir, '.claude/skills/ktx/SKILL.md'))).rejects.toThrow(); + await expect(stat(join(tempDir, '.claude/rules/ktx.md'))).rejects.toThrow(); await expect(stat(join(tempDir, '.claude/skills/ktx/keep.txt'))).resolves.toBeDefined(); await expect(readKtxAgentInstallManifest(tempDir)).resolves.toEqual(null); }); @@ -173,4 +177,71 @@ describe('setup agents', () => { }), ); }); + + it('prints per-agent install summary after successful installation', async () => { + const io = makeIo(); + + await runKtxSetupAgentsStep( + { + projectDir: tempDir, + inputMode: 'disabled', + yes: true, + agents: true, + target: 'claude-code', + scope: 'project', + mode: 'both', + skipAgents: false, + }, + io.io, + ); + + const output = io.stdout(); + expect(output).toContain('Agent integration complete'); + expect(output).toContain('Claude Code'); + expect(output).toContain('+ Skill installed'); + expect(output).toContain('.claude/skills/ktx/SKILL.md'); + expect(output).toContain('+ Rule installed'); + expect(output).toContain('.claude/rules/ktx.md'); + expect(output).toContain('+ MCP config added'); + expect(output).toContain('.mcp.json'); + }); + + it('formats summary with relative paths for project scope', () => { + const summary = formatInstallSummary( + [{ target: 'cursor', scope: 'project', mode: 'both' }], + [ + { kind: 'file', path: join(tempDir, '.cursor/rules/ktx.mdc') }, + { kind: 'json-key', path: join(tempDir, '.cursor/mcp.json'), jsonPath: ['mcpServers', 'ktx'] }, + ], + tempDir, + ); + + expect(summary).toContain('Cursor'); + expect(summary).toContain('+ Rule installed'); + expect(summary).toContain('.cursor/rules/ktx.mdc'); + expect(summary).toContain('+ MCP config added'); + expect(summary).toContain('.cursor/mcp.json'); + expect(summary).not.toContain(tempDir); + }); + + it('formats summary with multiple agent targets', () => { + const summary = formatInstallSummary( + [ + { target: 'claude-code', scope: 'project', mode: 'cli' }, + { target: 'codex', scope: 'project', mode: 'mcp' }, + ], + [ + { kind: 'file', path: join(tempDir, '.claude/skills/ktx/SKILL.md'), role: 'skill' }, + { kind: 'file', path: join(tempDir, '.claude/rules/ktx.md'), role: 'rule' }, + { kind: 'json-key', path: join(tempDir, '.agents/mcp/ktx.json'), jsonPath: ['mcpServers', 'ktx'] }, + ], + tempDir, + ); + + expect(summary).toContain('Claude Code'); + expect(summary).toContain('+ Skill installed'); + expect(summary).toContain('+ Rule installed'); + expect(summary).toContain('Codex'); + expect(summary).toContain('+ MCP config added'); + }); }); diff --git a/packages/cli/src/setup-agents.ts b/packages/cli/src/setup-agents.ts index 303f5844..55bb9a76 100644 --- a/packages/cli/src/setup-agents.ts +++ b/packages/cli/src/setup-agents.ts @@ -1,5 +1,5 @@ import { mkdir, readFile, rm, writeFile } from 'node:fs/promises'; -import { dirname, join, resolve } from 'node:path'; +import { dirname, join, relative, resolve } from 'node:path'; import { cancel, isCancel, multiselect, select } from '@clack/prompts'; import { loadKtxProject, markKtxSetupStepComplete, serializeKtxProjectConfig } from '@ktx/context/project'; import type { KtxCliIo } from './cli-runtime.js'; @@ -37,7 +37,10 @@ export interface KtxAgentInstallManifest { projectDir: string; installedAt: string; installs: Array<{ target: KtxAgentTarget; scope: KtxAgentScope; mode: KtxAgentInstallMode }>; - entries: Array<{ kind: 'file'; path: string } | { kind: 'json-key'; path: string; jsonPath: string[] }>; + entries: Array< + | { kind: 'file'; path: string; role?: 'skill' | 'rule' } + | { kind: 'json-key'; path: string; jsonPath: string[] } + >; } type InstallEntry = KtxAgentInstallManifest['entries'][number]; @@ -54,11 +57,17 @@ export function plannedKtxAgentFiles(input: { }): InstallEntry[] { if (input.scope === 'global') { if (input.target === 'claude-code') { - return [{ kind: 'file', path: join(process.env.HOME ?? '', '.claude/skills/ktx/SKILL.md') }]; + const home = process.env.HOME ?? ''; + return [ + { kind: 'file', path: join(home, '.claude/skills/ktx/SKILL.md'), role: 'skill' as const }, + { kind: 'file', path: join(home, '.claude/rules/ktx.md'), role: 'rule' as const }, + ]; } if (input.target === 'codex') { + const codexHome = process.env.CODEX_HOME ?? join(process.env.HOME ?? '', '.codex'); return [ - { kind: 'file', path: join(process.env.CODEX_HOME ?? join(process.env.HOME ?? '', '.codex'), 'skills/ktx/SKILL.md') }, + { kind: 'file', path: join(codexHome, 'skills/ktx/SKILL.md'), role: 'skill' as const }, + { kind: 'file', path: join(codexHome, 'instructions/ktx.md'), role: 'rule' as const }, ]; } throw new Error(`Global ${input.target} installation is not supported; use --project.`); @@ -66,12 +75,16 @@ export function plannedKtxAgentFiles(input: { const root = resolve(input.projectDir); const cliEntries: Partial> = { - 'claude-code': { kind: 'file', path: join(root, '.claude/skills/ktx/SKILL.md') }, - codex: { kind: 'file', path: join(root, '.agents/skills/ktx/SKILL.md') }, + 'claude-code': { kind: 'file', path: join(root, '.claude/skills/ktx/SKILL.md'), role: 'skill' }, + codex: { kind: 'file', path: join(root, '.agents/skills/ktx/SKILL.md'), role: 'skill' }, cursor: { kind: 'file', path: join(root, '.cursor/rules/ktx.mdc') }, opencode: { kind: 'file', path: join(root, '.opencode/commands/ktx.md') }, universal: { kind: 'file', path: join(root, '.agents/skills/ktx/SKILL.md') }, }; + const ruleEntries: Partial> = { + 'claude-code': { kind: 'file', path: join(root, '.claude/rules/ktx.md'), role: 'rule' }, + codex: { kind: 'file', path: join(root, '.codex/instructions/ktx.md'), role: 'rule' }, + }; const mcpEntries: Record = { 'claude-code': { kind: 'json-key', path: join(root, '.mcp.json'), jsonPath: ['mcpServers', 'ktx'] }, codex: { kind: 'json-key', path: join(root, '.agents/mcp/ktx.json'), jsonPath: ['mcpServers', 'ktx'] }, @@ -80,7 +93,7 @@ export function plannedKtxAgentFiles(input: { universal: { kind: 'json-key', path: join(root, '.agents/mcp/ktx.json'), jsonPath: ['mcpServers', 'ktx'] }, }; return [ - ...(input.mode === 'cli' || input.mode === 'both' ? [cliEntries[input.target]] : []), + ...(input.mode === 'cli' || input.mode === 'both' ? [cliEntries[input.target], ruleEntries[input.target]] : []), ...(input.mode === 'mcp' || input.mode === 'both' ? [mcpEntries[input.target]] : []), ].filter((entry): entry is InstallEntry => entry !== undefined); } @@ -113,6 +126,17 @@ function cliInstructionContent(input: { projectDir: string; target: KtxAgentTarg ].join('\n'); } +function ruleInstructionContent(input: { projectDir: string }): string { + return [ + `Use the \`ktx\` CLI to query local semantic context, wiki knowledge, and execute safe SQL for this project (\`--project-dir ${input.projectDir}\`).`, + '', + 'Use when the user asks about data schemas, metrics, dimensions, database structure, or wants to run SQL queries.', + '', + 'Do not use for general programming, code review, or tasks unrelated to data and analytics.', + '', + ].join('\n'); +} + function mcpConfig(projectDir: string): Record { return { command: 'ktx', @@ -245,6 +269,55 @@ function createPromptAdapter(): KtxSetupAgentsPromptAdapter { }; } +const targetDisplayNames: Record = { + 'claude-code': 'Claude Code', + codex: 'Codex', + cursor: 'Cursor', + opencode: 'OpenCode', + universal: 'Universal .agents', +}; + +const fileEntryLabels: Record = { + 'claude-code': 'Skill installed', + codex: 'Skill installed', + cursor: 'Rule installed', + opencode: 'Command installed', + universal: 'Skill installed', +}; + +export function formatInstallSummary( + installs: Array<{ target: KtxAgentTarget; scope: KtxAgentScope; mode: KtxAgentInstallMode }>, + entries: InstallEntry[], + projectDir: string, +): string { + const entriesByTarget = new Map(); + let idx = 0; + for (const install of installs) { + const planned = plannedKtxAgentFiles({ projectDir, ...install }); + entriesByTarget.set(install.target, entries.slice(idx, idx + planned.length)); + idx += planned.length; + } + + const lines: string[] = []; + for (const install of installs) { + const targetEntries = entriesByTarget.get(install.target) ?? []; + lines.push(` ${targetDisplayNames[install.target]}`); + for (const entry of targetEntries) { + const displayPath = + install.scope === 'global' ? entry.path : relative(projectDir, entry.path); + if (entry.kind === 'file') { + const label = entry.role === 'rule' ? 'Rule installed' : fileEntryLabels[install.target]; + lines.push(` + ${label}`); + lines.push(` ${displayPath}`); + } else { + lines.push(` + MCP config added`); + lines.push(` ${displayPath}`); + } + } + } + return lines.join('\n'); +} + async function installTarget(input: { projectDir: string; target: KtxAgentTarget; @@ -254,8 +327,12 @@ async function installTarget(input: { const entries = plannedKtxAgentFiles(input); for (const entry of entries) { if (entry.kind === 'file') { + const content = + entry.role === 'rule' + ? ruleInstructionContent({ projectDir: input.projectDir }) + : cliInstructionContent({ projectDir: input.projectDir, target: input.target }); await mkdir(dirname(entry.path), { recursive: true }); - await writeFile(entry.path, cliInstructionContent({ projectDir: input.projectDir, target: input.target }), 'utf-8'); + await writeFile(entry.path, content, 'utf-8'); } else { await writeJsonKey(entry.path, entry.jsonPath, mcpConfig(input.projectDir)); } @@ -311,7 +388,6 @@ export async function runKtxSetupAgentsStep( { value: 'cursor', label: 'Cursor' }, { value: 'opencode', label: 'OpenCode' }, { value: 'universal', label: 'Universal .agents' }, - { value: 'back', label: 'Back' }, ], required: true, })) as KtxAgentTarget[]); @@ -327,7 +403,7 @@ export async function runKtxSetupAgentsStep( for (const install of installs) entries.push(...(await installTarget({ projectDir: args.projectDir, ...install }))); await writeManifest(args.projectDir, mergeManifest(args.projectDir, await readKtxAgentInstallManifest(args.projectDir), installs, entries)); await markAgentsComplete(args.projectDir); - io.stdout.write(`Agent integration installed for ${installs.map((install) => install.target).join(', ')}.\n`); + io.stdout.write(`\nAgent integration complete\n\n${formatInstallSummary(installs, entries, args.projectDir)}\n`); return { status: 'ready', projectDir: args.projectDir, installs }; } catch (error) { io.stderr.write(`${error instanceof Error ? error.message : String(error)}\n`); diff --git a/packages/cli/src/setup-context.test.ts b/packages/cli/src/setup-context.test.ts index d19be04c..0d803b7b 100644 --- a/packages/cli/src/setup-context.test.ts +++ b/packages/cli/src/setup-context.test.ts @@ -166,7 +166,12 @@ describe('setup context build state', () => { it('runs setup context build, verifies readiness, and marks context complete', async () => { await writeReadyProject(tempDir); const io = makeIo(); - const runContextBuildMock = vi.fn(async () => ({ exitCode: 0, detached: false })); + const runContextBuildMock = vi.fn(async () => ({ + exitCode: 0, + detached: false, + reportIds: ['report-docs-1'], + artifactPaths: ['raw-sources/warehouse/live-database/sync-1/scan-report.json'], + })); const verifyContextReady = vi.fn(async () => ({ ready: true, agentContextReady: true, @@ -204,6 +209,8 @@ describe('setup context build state', () => { runId: 'setup-context-local-abc123', status: 'completed', completedAt: '2026-05-09T10:00:00.000Z', + reportIds: ['report-docs-1'], + artifactPaths: ['raw-sources/warehouse/live-database/sync-1/scan-report.json'], }); expect(io.stdout()).toContain('KTX context is ready for agents.'); }); diff --git a/packages/cli/src/setup-context.ts b/packages/cli/src/setup-context.ts index 79f6cdd7..f88635f4 100644 --- a/packages/cli/src/setup-context.ts +++ b/packages/cli/src/setup-context.ts @@ -592,12 +592,16 @@ async function runBuild( }, }, ); + const completedReportIds = buildResult.reportIds ?? []; + const completedArtifactPaths = buildResult.artifactPaths ?? []; if (buildResult.detached) { const updatedAt = now().toISOString(); await writeKtxSetupContextState(args.projectDir, { ...runningState, status: 'detached', updatedAt, + reportIds: completedReportIds, + artifactPaths: completedArtifactPaths, ...(lastSourceProgress ? { sourceProgress: lastSourceProgress } : {}), }); return { status: 'detached', projectDir: args.projectDir, runId }; @@ -608,6 +612,8 @@ async function runBuild( ...runningState, status: 'failed', updatedAt, + reportIds: completedReportIds, + artifactPaths: completedArtifactPaths, retryableFailedTargets: [...targets.primarySourceConnectionIds, ...targets.contextSourceConnectionIds], failureReason: 'Context build failed.', ...(lastSourceProgress ? { sourceProgress: lastSourceProgress } : {}), @@ -622,6 +628,8 @@ async function runBuild( ...runningState, status: 'failed', updatedAt, + reportIds: completedReportIds, + artifactPaths: completedArtifactPaths, retryableFailedTargets: readiness.failedTargets ?? [], failureReason: readiness.details.join(' '), ...(lastSourceProgress ? { sourceProgress: lastSourceProgress } : {}), @@ -640,6 +648,8 @@ async function runBuild( status: 'completed', updatedAt: completedAt, completedAt, + reportIds: completedReportIds, + artifactPaths: completedArtifactPaths, retryableFailedTargets: [], ...(lastSourceProgress ? { sourceProgress: lastSourceProgress } : {}), }); diff --git a/packages/cli/src/setup-databases.test.ts b/packages/cli/src/setup-databases.test.ts index 41b12f95..3f268ce8 100644 --- a/packages/cli/src/setup-databases.test.ts +++ b/packages/cli/src/setup-databases.test.ts @@ -962,10 +962,95 @@ describe('setup databases step', () => { }); }); + it('prompts for discovered Postgres schemas before the first scan', async () => { + const io = makeIo(); + const prompts = makePromptAdapter({ + selectValues: ['url'], + textValues: ['', 'env:DATABASE_URL'], + multiselectValues: [['orbit_analytics', 'orbit_raw']], + }); + const testConnection = vi.fn(async () => 0); + const scanConnection = vi.fn(async asyncScanProjectDir => { + const config = parseKtxProjectConfig(await readFile(join(asyncScanProjectDir, 'ktx.yaml'), 'utf-8')); + expect(config.connections['postgres-warehouse']).toMatchObject({ + schemas: ['orbit_analytics', 'orbit_raw'], + }); + return 0; + }); + const listSchemas = vi.fn(async () => ['orbit_analytics', 'orbit_raw', 'public']); + + const result = await runKtxSetupDatabasesStep( + { + projectDir: tempDir, + inputMode: 'auto', + databaseDrivers: ['postgres'], + databaseSchemas: [], + skipDatabases: false, + }, + io.io, + { prompts, testConnection, scanConnection, listSchemas }, + ); + + expect(result.status).toBe('ready'); + expect(listSchemas).toHaveBeenCalledWith(tempDir, 'postgres-warehouse'); + expect(prompts.multiselect).toHaveBeenCalledWith({ + message: expect.stringContaining('PostgreSQL schemas to scan'), + options: [ + { value: 'orbit_analytics', label: 'orbit_analytics' }, + { value: 'orbit_raw', label: 'orbit_raw' }, + { value: 'public', label: 'public' }, + ], + initialValues: ['orbit_analytics', 'orbit_raw'], + required: true, + }); + const config = parseKtxProjectConfig(await readFile(join(tempDir, 'ktx.yaml'), 'utf-8')); + expect(config.connections['postgres-warehouse']).toMatchObject({ + schemas: ['orbit_analytics', 'orbit_raw'], + }); + expect(io.stdout()).toContain('Schemas: orbit_analytics, orbit_raw'); + }); + + it('auto-selects all discovered Postgres schemas in non-interactive setup', async () => { + const io = makeIo(); + const prompts = makePromptAdapter({}); + const testConnection = vi.fn(async () => 0); + const scanConnection = vi.fn(async asyncScanProjectDir => { + const config = parseKtxProjectConfig(await readFile(join(asyncScanProjectDir, 'ktx.yaml'), 'utf-8')); + expect(config.connections.warehouse).toMatchObject({ + schemas: ['orbit_analytics', 'orbit_raw', 'public'], + }); + return 0; + }); + const listSchemas = vi.fn(async () => ['orbit_analytics', 'orbit_raw', 'public']); + + const result = await runKtxSetupDatabasesStep( + { + projectDir: tempDir, + inputMode: 'disabled', + databaseDrivers: ['postgres'], + databaseConnectionId: 'warehouse', + databaseUrl: 'env:DATABASE_URL', + databaseSchemas: [], + skipDatabases: false, + }, + io.io, + { prompts, testConnection, scanConnection, listSchemas }, + ); + + expect(result.status).toBe('ready'); + expect(prompts.multiselect).not.toHaveBeenCalled(); + const config = parseKtxProjectConfig(await readFile(join(tempDir, 'ktx.yaml'), 'utf-8')); + expect(config.connections.warehouse).toMatchObject({ + schemas: ['orbit_analytics', 'orbit_raw', 'public'], + }); + expect(io.stdout()).toContain('Schemas: orbit_analytics, orbit_raw, public'); + }); + it('adds one non-interactive Postgres URL connection, tests it, scans it, and marks databases complete', async () => { const io = makeIo(); const testConnection = vi.fn(async () => 0); const scanConnection = vi.fn(async () => 0); + const listSchemas = vi.fn(async () => ['orbit_analytics', 'orbit_raw', 'public']); const result = await runKtxSetupDatabasesStep( { @@ -978,10 +1063,11 @@ describe('setup databases step', () => { skipDatabases: false, }, io.io, - { testConnection, scanConnection }, + { testConnection, scanConnection, listSchemas }, ); expect(result.status).toBe('ready'); + expect(listSchemas).not.toHaveBeenCalled(); expect(testConnection).toHaveBeenCalledWith(tempDir, 'warehouse', expect.anything()); expect(scanConnection).toHaveBeenCalledWith(tempDir, 'warehouse', expect.anything()); const config = parseKtxProjectConfig(await readFile(join(tempDir, 'ktx.yaml'), 'utf-8')); diff --git a/packages/cli/src/setup-databases.ts b/packages/cli/src/setup-databases.ts index e22f4741..1838725d 100644 --- a/packages/cli/src/setup-databases.ts +++ b/packages/cli/src/setup-databases.ts @@ -52,6 +52,7 @@ export interface KtxSetupDatabasesPromptAdapter { message: string; options: Array<{ value: string; label: string }>; required?: boolean; + initialValues?: string[]; }): Promise; select(options: { message: string; options: Array<{ value: string; label: string }> }): Promise; text(options: { message: string; placeholder?: string; initialValue?: string }): Promise; @@ -76,6 +77,7 @@ export interface KtxSetupDatabasesDeps { prompts?: KtxSetupDatabasesPromptAdapter; testConnection?: (projectDir: string, connectionId: string, io: KtxCliIo) => Promise; scanConnection?: (projectDir: string, connectionId: string, io: KtxCliIo) => Promise; + listSchemas?: (projectDir: string, connectionId: string) => Promise; historicSqlProbe?: KtxSetupHistoricSqlProbe; } @@ -255,6 +257,21 @@ async function defaultHistoricSqlProbe(input: KtxSetupHistoricSqlProbeInput): Pr } } +async function defaultListSchemas(projectDir: string, connectionId: string): Promise { + const project = await loadKtxProject({ projectDir }); + const connection = project.config.connections[connectionId]; + const { KtxPostgresScanConnector, isKtxPostgresConnectionConfig } = await import('@ktx/connector-postgres'); + if (!isKtxPostgresConnectionConfig(connection)) { + return []; + } + const connector = new KtxPostgresScanConnector({ connectionId, connection }); + try { + return await connector.listSchemas(); + } finally { + await connector.cleanup(); + } +} + function existingConnectionIdsByDriver( connections: Record, driver: KtxSetupDatabaseDriver, @@ -814,6 +831,113 @@ async function writeConnectionConfig(input: { } } +function configuredSchemas(connection: KtxProjectConnectionConfig | undefined): string[] { + if (!connection) return []; + if (Array.isArray(connection.schemas)) { + return connection.schemas + .filter((schema): schema is string => typeof schema === 'string' && schema.trim().length > 0) + .map((schema) => schema.trim()); + } + return typeof connection.schema === 'string' && connection.schema.trim().length > 0 ? [connection.schema.trim()] : []; +} + +function defaultSchemaSelection(schemas: string[]): string[] { + const nonPublic = schemas.filter((schema) => schema !== 'public'); + return nonPublic.length > 0 ? nonPublic : schemas; +} + +async function writeConnectionSchemas(input: { + projectDir: string; + connectionId: string; + schemas: string[]; +}): Promise { + const project = await loadKtxProject({ projectDir: input.projectDir }); + const connection = project.config.connections[input.connectionId]; + if (!connection) return; + const { schema: _schema, ...connectionWithoutLegacySchema } = connection; + await writeConnectionConfig({ + projectDir: input.projectDir, + connectionId: input.connectionId, + connection: { + ...connectionWithoutLegacySchema, + schemas: unique(input.schemas), + }, + }); +} + +async function maybeConfigurePostgresSchemas(input: { + projectDir: string; + connectionId: string; + args: KtxSetupDatabasesArgs; + prompts: KtxSetupDatabasesPromptAdapter; + deps: KtxSetupDatabasesDeps; + io: KtxCliIo; +}): Promise { + const project = await loadKtxProject({ projectDir: input.projectDir }); + const connection = project.config.connections[input.connectionId]; + if (normalizeDriver(connection?.driver) !== 'postgres') { + return true; + } + + if (configuredSchemas(connection).length > 0) { + return true; + } + + if (input.args.databaseSchemas.length > 0) { + await writeConnectionSchemas({ + projectDir: input.projectDir, + connectionId: input.connectionId, + schemas: input.args.databaseSchemas, + }); + return true; + } + + let discoveredSchemas: string[]; + try { + discoveredSchemas = unique( + await (input.deps.listSchemas ?? defaultListSchemas)(input.projectDir, input.connectionId), + ); + } catch (error) { + input.io.stderr.write( + `Could not discover PostgreSQL schemas for ${input.connectionId}; continuing with existing schema scope. ` + + `Pass --database-schema to set it explicitly. ${error instanceof Error ? error.message : String(error)}\n`, + ); + return true; + } + if (discoveredSchemas.length === 0) { + return true; + } + + let selectedSchemas: string[]; + if (input.args.inputMode === 'disabled' || discoveredSchemas.length === 1) { + selectedSchemas = discoveredSchemas; + } else { + const initialValues = defaultSchemaSelection(discoveredSchemas); + const choices = await input.prompts.multiselect({ + message: withMultiselectNavigation( + 'PostgreSQL schemas to scan\nKTX found multiple non-system schemas. Select every schema agents should use.', + ), + options: discoveredSchemas.map((schema) => ({ value: schema, label: schema })), + initialValues, + required: true, + }); + if (choices.includes('back')) { + return false; + } + selectedSchemas = choices.length > 0 ? choices : initialValues; + } + + await writeConnectionSchemas({ + projectDir: input.projectDir, + connectionId: input.connectionId, + schemas: selectedSchemas, + }); + writeSetupSection(input.io, `Selecting schemas for ${input.connectionId}`, [ + `Schemas: ${selectedSchemas.join(', ')}`, + ]); + return true; +} + async function ensureHistoricSqlAdapterEnabled(projectDir: string): Promise { const project = await loadKtxProject({ projectDir }); if (project.config.ingest.adapters.includes('historic-sql')) { @@ -902,6 +1026,8 @@ async function validateAndScanConnection(input: { connectionId: string; io: KtxCliIo; deps: KtxSetupDatabasesDeps; + args: KtxSetupDatabasesArgs; + prompts: KtxSetupDatabasesPromptAdapter; }): Promise { const testConnection = input.deps.testConnection ?? defaultTestConnection; const scanConnection = input.deps.scanConnection ?? defaultScanConnection; @@ -923,6 +1049,10 @@ async function validateAndScanConnection(input: { testLines.push(`Driver: ${driverDisplay}${Number.isFinite(tableCount) ? ` · Tables: ${tableCount}` : ''}`); writeSetupSection(input.io, `Testing ${input.connectionId}`, testLines); + if (!(await maybeConfigurePostgresSchemas(input))) { + return false; + } + await maybeRunHistoricSqlSetupProbe({ projectDir: input.projectDir, connectionId: input.connectionId, @@ -1069,7 +1199,7 @@ export async function runKtxSetupDatabasesStep( prompts, }); if (historicSqlResult === 'back') return { status: 'back', projectDir: args.projectDir }; - if (!(await validateAndScanConnection({ projectDir: args.projectDir, connectionId, io, deps }))) { + if (!(await validateAndScanConnection({ projectDir: args.projectDir, connectionId, io, deps, args, prompts }))) { return { status: 'failed', projectDir: args.projectDir }; } selectedConnectionIds.push(connectionId); @@ -1209,6 +1339,8 @@ export async function runKtxSetupDatabasesStep( connectionId: connectionChoice.connectionId, io, deps, + args, + prompts, })) ) { if (args.inputMode === 'disabled') return { status: 'failed', projectDir: args.projectDir }; diff --git a/packages/cli/src/setup-ready-menu.test.ts b/packages/cli/src/setup-ready-menu.test.ts index 1e64488e..643d8b3d 100644 --- a/packages/cli/src/setup-ready-menu.test.ts +++ b/packages/cli/src/setup-ready-menu.test.ts @@ -1,5 +1,5 @@ import { describe, expect, it, vi } from 'vitest'; -import { isKtxSetupReady, runKtxSetupReadyChangeMenu } from './setup-ready-menu.js'; +import { isKtxPreAgentSetupReady, isKtxSetupReady, runKtxSetupReadyChangeMenu } from './setup-ready-menu.js'; import type { KtxSetupStatus } from './setup.js'; const readyStatus: KtxSetupStatus = { @@ -20,6 +20,13 @@ describe('setup ready menu', () => { expect(isKtxSetupReady({ ...readyStatus, agents: [] })).toBe(false); }); + it('recognizes pre-agent readiness without requiring agents', () => { + expect(isKtxPreAgentSetupReady(readyStatus)).toBe(true); + expect(isKtxPreAgentSetupReady({ ...readyStatus, agents: [] })).toBe(true); + expect(isKtxPreAgentSetupReady({ ...readyStatus, embeddings: { ready: false } })).toBe(false); + expect(isKtxPreAgentSetupReady({ ...readyStatus, context: { ready: false, status: 'not_started' } })).toBe(false); + }); + it('maps ready-project menu choices to setup sections', async () => { const prompts = { select: vi.fn(async () => 'agents'), cancel: vi.fn() }; diff --git a/packages/cli/src/setup-ready-menu.ts b/packages/cli/src/setup-ready-menu.ts index 675655f2..a101e45a 100644 --- a/packages/cli/src/setup-ready-menu.ts +++ b/packages/cli/src/setup-ready-menu.ts @@ -14,18 +14,21 @@ export interface KtxSetupReadyMenuDeps { prompts?: KtxSetupReadyMenuPromptAdapter; } -export function isKtxSetupReady(status: KtxSetupStatus): boolean { +export function isKtxPreAgentSetupReady(status: KtxSetupStatus): boolean { return ( status.project.ready && status.llm.ready && status.embeddings.ready && status.databases.every((database) => database.ready) && status.sources.every((source) => source.ready) && - status.context.ready && - status.agents.some((agent) => agent.ready) + status.context.ready ); } +export function isKtxSetupReady(status: KtxSetupStatus): boolean { + return isKtxPreAgentSetupReady(status) && status.agents.some((agent) => agent.ready); +} + function createPromptAdapter(): KtxSetupReadyMenuPromptAdapter { return { async select(options) { diff --git a/packages/cli/src/setup-sources.test.ts b/packages/cli/src/setup-sources.test.ts index b79e8e66..1a281261 100644 --- a/packages/cli/src/setup-sources.test.ts +++ b/packages/cli/src/setup-sources.test.ts @@ -205,7 +205,7 @@ describe('setup sources step', () => { mappings: { databaseMappings: { '1': 'warehouse' }, syncEnabled: { '1': true }, - syncMode: 'ONLY', + syncMode: 'ALL', }, }); expect(runMapping).toHaveBeenCalledWith(projectDir, 'prod_metabase', io.io); @@ -707,7 +707,7 @@ describe('setup sources step', () => { mappings: { databaseMappings: { '1': 'warehouse' }, syncEnabled: { '1': true }, - syncMode: 'ONLY', + syncMode: 'ALL', }, }, deps: { diff --git a/packages/cli/src/setup-sources.ts b/packages/cli/src/setup-sources.ts index 73d191dc..e6e7f41b 100644 --- a/packages/cli/src/setup-sources.ts +++ b/packages/cli/src/setup-sources.ts @@ -463,7 +463,7 @@ function buildMetabaseConnection(args: KtxSetupSourcesArgs): KtxProjectConnectio mappings: { databaseMappings: { [String(args.metabaseDatabaseId)]: args.sourceWarehouseConnectionId }, syncEnabled: { [String(args.metabaseDatabaseId)]: true }, - syncMode: 'ONLY', + syncMode: 'ALL', }, }; } diff --git a/packages/cli/src/setup.test.ts b/packages/cli/src/setup.test.ts index cf9d22a8..20f12e6e 100644 --- a/packages/cli/src/setup.test.ts +++ b/packages/cli/src/setup.test.ts @@ -1550,6 +1550,102 @@ describe('setup status', () => { expect(calls).toEqual(['agents']); }); + it('skips to agent setup when context is ready but agents are not configured', async () => { + const calls: string[] = []; + const io = makeIo(); + await writeFile( + join(tempDir, 'ktx.yaml'), + [ + 'project: revenue', + 'setup:', + ' completed_steps:', + ' - project', + ' - llm', + ' - embeddings', + ' - sources', + ' - context', + ' database_connection_ids: []', + 'connections: {}', + 'llm:', + ' provider:', + ' backend: anthropic', + ' models:', + ' default: claude-sonnet-4-6', + 'ingest:', + ' embeddings:', + ' backend: openai', + ' model: text-embedding-3-small', + ' dimensions: 1536', + '', + ].join('\n'), + 'utf-8', + ); + await writeKtxSetupContextState(tempDir, { + runId: 'setup-context-local-ready', + status: 'completed', + startedAt: '2026-05-09T10:00:00.000Z', + updatedAt: '2026-05-09T10:02:00.000Z', + completedAt: '2026-05-09T10:02:00.000Z', + primarySourceConnectionIds: [], + contextSourceConnectionIds: [], + reportIds: [], + artifactPaths: [], + retryableFailedTargets: [], + commands: contextBuildCommands(tempDir, 'setup-context-local-ready'), + }); + + const readyMenuSelect = vi.fn(); + await expect( + runKtxSetup( + { + command: 'run', + projectDir: tempDir, + mode: 'existing', + agents: false, + inputMode: 'auto', + yes: false, + skipLlm: false, + skipEmbeddings: false, + skipDatabases: false, + skipSources: false, + skipAgents: false, + databaseSchemas: [], + }, + io.io, + { + readyMenuDeps: { prompts: { select: readyMenuSelect, cancel: vi.fn() } }, + model: async (args) => { + expect(args.skipLlm).toBe(true); + return { status: 'skipped', projectDir: tempDir }; + }, + embeddings: async (args) => { + expect(args.skipEmbeddings).toBe(true); + return { status: 'skipped', projectDir: tempDir }; + }, + databases: async (args) => { + expect(args.skipDatabases).toBe(true); + return { status: 'skipped', projectDir: tempDir }; + }, + sources: async (args) => { + expect(args.skipSources).toBe(true); + return { status: 'skipped', projectDir: tempDir }; + }, + agents: async () => { + calls.push('agents'); + return { + status: 'ready', + projectDir: tempDir, + installs: [{ target: 'codex', scope: 'project', mode: 'cli' }], + }; + }, + }, + ), + ).resolves.toBe(0); + + expect(readyMenuSelect).not.toHaveBeenCalled(); + expect(calls).toEqual(['agents']); + }); + it('runs only project resolution, context gate, and agent setup in --agents mode', async () => { const io = makeIo(); const context = vi.fn(async () => ({ status: 'ready' as const, projectDir: tempDir, runId: 'setup-context-local-test' })); diff --git a/packages/cli/src/setup.ts b/packages/cli/src/setup.ts index 2aae882e..b9b0b412 100644 --- a/packages/cli/src/setup.ts +++ b/packages/cli/src/setup.ts @@ -24,7 +24,12 @@ import { import { type KtxSetupEmbeddingsDeps, runKtxSetupEmbeddingsStep } from './setup-embeddings.js'; import { type KtxSetupModelDeps, runKtxSetupAnthropicModelStep } from './setup-models.js'; import { type KtxSetupProjectDeps, runKtxSetupProjectStep } from './setup-project.js'; -import { isKtxSetupReady, type KtxSetupReadyMenuDeps, runKtxSetupReadyChangeMenu } from './setup-ready-menu.js'; +import { + isKtxPreAgentSetupReady, + isKtxSetupReady, + type KtxSetupReadyMenuDeps, + runKtxSetupReadyChangeMenu, +} from './setup-ready-menu.js'; import { type KtxSetupSourcesDeps, type KtxSetupSourceType, runKtxSetupSourcesStep } from './setup-sources.js'; import { withMenuOptionsSpacing } from './prompt-navigation.js'; import { @@ -531,9 +536,13 @@ async function runKtxSetupInner(args: KtxSetupArgs, io: KtxCliIo, deps: KtxSetup } } - if (args.inputMode !== 'disabled' && !agentsRequested && isKtxSetupReady(currentStatus)) { - readyAction = (await runKtxSetupReadyChangeMenu(currentStatus, deps.readyMenuDeps)).action; - if (readyAction === 'exit') return 0; + if (args.inputMode !== 'disabled' && !agentsRequested) { + if (isKtxSetupReady(currentStatus)) { + readyAction = (await runKtxSetupReadyChangeMenu(currentStatus, deps.readyMenuDeps)).action; + if (readyAction === 'exit') return 0; + } else if (isKtxPreAgentSetupReady(currentStatus)) { + readyAction = 'agents'; + } } const runOnly = readyAction; diff --git a/packages/context/src/core/git.service.test.ts b/packages/context/src/core/git.service.test.ts index 308bbd4d..14e93495 100644 --- a/packages/context/src/core/git.service.test.ts +++ b/packages/context/src/core/git.service.test.ts @@ -256,6 +256,31 @@ describe('GitService', () => { await service.removeWorktree(wtDir).catch(() => undefined); await rm(wtDir, { recursive: true, force: true }).catch(() => undefined); }); + + it('serializes concurrent commits from scoped services targeting the same worktree', async () => { + const { commitHash } = await writeAndCommit('seed.md', 'seed'); + const parent = await realpath(join(tempDir, '..')); + const wtDir = join(parent, `wt-${Date.now()}-fw-concurrent`); + await service.addWorktree(wtDir, 'session/concurrent', commitHash); + + const first = service.forWorktree(wtDir); + const second = service.forWorktree(wtDir); + await writeFile(join(wtDir, 'a.md'), 'a\n', 'utf-8'); + await writeFile(join(wtDir, 'b.md'), 'b\n', 'utf-8'); + + const [a, b] = await Promise.all([ + first.commitFile('a.md', 'add a', 'System User', 'system@example.com'), + second.commitFile('b.md', 'add b', 'System User', 'system@example.com'), + ]); + + expect(a.commitHash).toMatch(/^[0-9a-f]{40}$/); + expect(b.commitHash).toMatch(/^[0-9a-f]{40}$/); + await expect(first.getFileAtCommit('a.md', a.commitHash)).resolves.toBe('a\n'); + await expect(second.getFileAtCommit('b.md', b.commitHash)).resolves.toBe('b\n'); + + await service.removeWorktree(wtDir).catch(() => undefined); + await rm(wtDir, { recursive: true, force: true }).catch(() => undefined); + }); }); describe('squashMergeIntoMain', () => { diff --git a/packages/context/src/core/git.service.ts b/packages/context/src/core/git.service.ts index 5da67e59..6539f9fd 100644 --- a/packages/context/src/core/git.service.ts +++ b/packages/context/src/core/git.service.ts @@ -32,6 +32,8 @@ export type SquashMergeResult = | { ok: false; conflict: true; conflictPaths: string[] }; export class GitService { + private static readonly mutationQueues = new Map>(); + private readonly logger: KtxLogger; private git!: SimpleGit; private configDir: string; @@ -92,6 +94,15 @@ export class GitService { commitMessage: string, author: string, authorEmail: string, + ): Promise { + return this.withMutationQueue(() => this.commitFileUnlocked(filePath, commitMessage, author, authorEmail)); + } + + private async commitFileUnlocked( + filePath: string, + commitMessage: string, + author: string, + authorEmail: string, ): Promise { try { // Stage the file @@ -166,6 +177,15 @@ export class GitService { commitMessage: string, author: string, authorEmail: string, + ): Promise { + return this.withMutationQueue(() => this.commitFilesUnlocked(filePaths, commitMessage, author, authorEmail)); + } + + private async commitFilesUnlocked( + filePaths: string[], + commitMessage: string, + author: string, + authorEmail: string, ): Promise { try { for (const filePath of filePaths) { @@ -231,6 +251,10 @@ export class GitService { if (filePaths.length === 0) { return; } + return this.withMutationQueue(() => this.checkoutFilesUnlocked(filePaths)); + } + + private async checkoutFilesUnlocked(filePaths: string[]): Promise { try { await this.git.checkout(['--', ...filePaths]); } catch (error) { @@ -292,6 +316,10 @@ export class GitService { if (!trimmed) { return; } + return this.withMutationQueue(() => this.addNoteUnlocked(commitHash, trimmed)); + } + + private async addNoteUnlocked(commitHash: string, trimmed: string): Promise { try { await this.git.raw(['notes', 'add', '-f', '-m', trimmed, commitHash]); } catch (error) { @@ -343,6 +371,15 @@ export class GitService { commitMessage: string, author: string, authorEmail: string, + ): Promise { + return this.withMutationQueue(() => this.deleteFileUnlocked(filePath, commitMessage, author, authorEmail)); + } + + private async deleteFileUnlocked( + filePath: string, + commitMessage: string, + author: string, + authorEmail: string, ): Promise { try { // Remove the file from git @@ -485,6 +522,13 @@ export class GitService { async squashTo( preHead: string, options: { message: string; author: string; authorEmail: string; expectedAuthor?: string }, + ): Promise<{ squashed: boolean; commitHash: string | null; reason?: string; squashedCount?: number }> { + return this.withMutationQueue(() => this.squashToUnlocked(preHead, options)); + } + + private async squashToUnlocked( + preHead: string, + options: { message: string; author: string; authorEmail: string; expectedAuthor?: string }, ): Promise<{ squashed: boolean; commitHash: string | null; reason?: string; squashedCount?: number }> { const { message, author, authorEmail } = options; const expectedAuthor = options.expectedAuthor ?? author; @@ -560,6 +604,15 @@ export class GitService { author: string, authorEmail: string, commitMessage: string, + ): Promise { + return this.withMutationQueue(() => this.squashMergeIntoMainUnlocked(branch, author, authorEmail, commitMessage)); + } + + private async squashMergeIntoMainUnlocked( + branch: string, + author: string, + authorEmail: string, + commitMessage: string, ): Promise { // Diff of HEAD..branch (two dots) lists commits/files reachable from `branch` that // aren't on HEAD — i.e. exactly what the squash would apply. Three dots (HEAD...branch) @@ -615,7 +668,7 @@ export class GitService { * range, which can pause the sequencer on conflicts. */ async resetHardTo(targetSha: string): Promise { - await this.git.raw(['reset', '--hard', targetSha]); + await this.withMutationQueue(() => this.git.raw(['reset', '--hard', targetSha])); } /** @@ -667,6 +720,10 @@ export class GitService { * Used by the memory agent to isolate per-session writes from interactive saves on main. */ async addWorktree(path: string, branch: string, startSha: string): Promise { + await this.withMutationQueue(() => this.addWorktreeUnlocked(path, branch, startSha)); + } + + private async addWorktreeUnlocked(path: string, branch: string, startSha: string): Promise { try { await this.git.raw(['worktree', 'add', '-b', branch, path, startSha]); } catch (error) { @@ -679,6 +736,10 @@ export class GitService { * worktrees are ktx-internal — a clean working tree is not required. */ async removeWorktree(path: string): Promise { + await this.withMutationQueue(() => this.removeWorktreeUnlocked(path)); + } + + private async removeWorktreeUnlocked(path: string): Promise { try { await this.git.raw(['worktree', 'remove', '--force', path]); } catch (error) { @@ -724,7 +785,7 @@ export class GitService { } async deleteBranch(branch: string, force = false): Promise { - await this.git.raw(['branch', force ? '-D' : '-d', branch]); + await this.withMutationQueue(() => this.git.raw(['branch', force ? '-D' : '-d', branch])); } /** @@ -745,6 +806,15 @@ export class GitService { commitMessage: string, author: string, authorEmail: string, + ): Promise { + return this.withMutationQueue(() => this.deleteDirectoryUnlocked(directoryPath, commitMessage, author, authorEmail)); + } + + private async deleteDirectoryUnlocked( + directoryPath: string, + commitMessage: string, + author: string, + authorEmail: string, ): Promise { try { // Remove the directory recursively from git @@ -795,6 +865,17 @@ export class GitService { commitMessage: string, author: string, authorEmail: string, + ): Promise { + return this.withMutationQueue(() => + this.deleteDirectoriesUnlocked(directoryPaths, commitMessage, author, authorEmail), + ); + } + + private async deleteDirectoriesUnlocked( + directoryPaths: string[], + commitMessage: string, + author: string, + authorEmail: string, ): Promise { if (directoryPaths.length === 0) { return { @@ -852,4 +933,27 @@ export class GitService { created: true, }; } + + private async withMutationQueue(operation: () => Promise): Promise { + const key = this.configDir; + const previous = GitService.mutationQueues.get(key) ?? Promise.resolve(); + let release: () => void = () => {}; + const current = previous.catch(() => undefined).then( + () => + new Promise((resolve) => { + release = resolve; + }), + ); + GitService.mutationQueues.set(key, current); + + await previous.catch(() => undefined); + try { + return await operation(); + } finally { + release(); + if (GitService.mutationQueues.get(key) === current) { + GitService.mutationQueues.delete(key); + } + } + } } diff --git a/packages/context/src/ingest/adapters/metabase/chunk.test.ts b/packages/context/src/ingest/adapters/metabase/chunk.test.ts index 46a3ce97..1991e147 100644 --- a/packages/context/src/ingest/adapters/metabase/chunk.test.ts +++ b/packages/context/src/ingest/adapters/metabase/chunk.test.ts @@ -284,6 +284,18 @@ describe('chunkMetabaseStagedDir — syncMode enum coverage', () => { expect(allRawFiles).not.toContain('cards/200.json'); }); + it('ONLY with no selections includes every matching card for old generated configs', async () => { + await writeInline(dir, 'sync-config.json', { + ...BASE_SYNC, + syncMode: 'ONLY', + selections: [], + }); + const result = await chunkMetabaseStagedDir(dir); + const allRawFiles = result.workUnits.flatMap((wu) => wu.rawFiles); + expect(allRawFiles).toContain('cards/100.json'); + expect(allRawFiles).toContain('cards/200.json'); + }); + it('EXCEPT excludes cards in selected collections; includes the rest', async () => { await writeInline(dir, 'sync-config.json', { ...BASE_SYNC, diff --git a/packages/context/src/ingest/adapters/metabase/chunk.ts b/packages/context/src/ingest/adapters/metabase/chunk.ts index 2fe719c5..ab2b1d78 100644 --- a/packages/context/src/ingest/adapters/metabase/chunk.ts +++ b/packages/context/src/ingest/adapters/metabase/chunk.ts @@ -66,7 +66,7 @@ function cardMatchesSyncConfig(card: StagedCardFile, config: StagedSyncConfig): if (card.archived) { return false; } - if (config.syncMode === 'ALL') { + if (config.syncMode === 'ALL' || (config.syncMode === 'ONLY' && config.selections.length === 0)) { return true; } const selectedCollections = new Set( diff --git a/packages/context/src/ingest/adapters/metabase/client.test.ts b/packages/context/src/ingest/adapters/metabase/client.test.ts index d6d7a4d9..f81939c6 100644 --- a/packages/context/src/ingest/adapters/metabase/client.test.ts +++ b/packages/context/src/ingest/adapters/metabase/client.test.ts @@ -327,6 +327,40 @@ describe('MetabaseClient.getResolvedSql', () => { expect(result?.resolvedSql).toBe('SELECT * FROM (SELECT a, b FROM base) t '); }); + it('inlines native-query snippets before checking for remaining variables', async () => { + const requestSpy = vi.fn().mockResolvedValue([ + { + id: 1, + name: 'account_join', + content: 'LEFT JOIN accounts a ON a.account_id = mart.account_id', + }, + ]); + const requestWithCustomRetrySpy = vi.fn(); + const client = makeClient((client) => { + Reflect.set(client, 'request', requestSpy); + Reflect.set(client, 'requestWithCustomRetry', requestWithCustomRetrySpy); + }); + const card = nativeCard('SELECT a.account_name FROM mart {{snippet: account_join}}', { + 'snippet: account_join': { + id: 'snippet-tag', + name: 'snippet: account_join', + type: 'snippet', + 'snippet-name': 'account_join', + 'snippet-id': 1, + }, + }); + + const result = await client.getResolvedSql(card); + + expect(requestSpy).toHaveBeenCalledWith('GET', '/api/native-query-snippet'); + expect(requestWithCustomRetrySpy).not.toHaveBeenCalled(); + expect(result?.resolutionStatus).toBe('resolved'); + expect(result?.resolvedSql).toBe( + 'SELECT a.account_name FROM mart LEFT JOIN accounts a ON a.account_id = mart.account_id', + ); + expect(result?.resolvedSql).not.toContain('{{snippet:'); + }); + it('uses /api/dataset/native for naked variables and prepends a warning comment', async () => { const requestSpy = vi.fn().mockResolvedValue({ query: "SELECT * WHERE id = 'placeholder' AND n = 1" }); const client = makeClient((client) => { diff --git a/packages/context/src/ingest/adapters/metabase/client.ts b/packages/context/src/ingest/adapters/metabase/client.ts index 2ddd970a..70e70964 100644 --- a/packages/context/src/ingest/adapters/metabase/client.ts +++ b/packages/context/src/ingest/adapters/metabase/client.ts @@ -39,6 +39,13 @@ interface TemplateTagInfo { dummyValue: string | null; } +interface NativeQuerySnippet { + id: number; + name: string; + content: string; + archived?: boolean | null; +} + interface CreateCardParams { name: string; databaseId: number; @@ -100,6 +107,43 @@ function collectRemainingPlaceholderNames(sql: string): Set { return names; } +function collectRemainingSnippetNames(sql: string): Set { + const names = new Set(); + for (const match of sql.matchAll(/\{\{\s*snippet:\s*([^}]+?)\s*\}\}/gi)) { + names.add(match[1].trim()); + } + return names; +} + +function normalizeSnippetName(name: string | null | undefined): string { + return (name ?? '').replace(/^snippet:\s*/i, '').trim().toLowerCase(); +} + +function parseNativeQuerySnippets(value: unknown): NativeQuerySnippet[] { + const rawItems = Array.isArray(value) + ? value + : typeof value === 'object' && value !== null && Array.isArray((value as { data?: unknown }).data) + ? (value as { data: unknown[] }).data + : []; + const snippets: NativeQuerySnippet[] = []; + for (const item of rawItems) { + if (typeof item !== 'object' || item === null || Array.isArray(item)) { + continue; + } + const rec = item as Record; + if (typeof rec.id !== 'number' || typeof rec.name !== 'string' || typeof rec.content !== 'string') { + continue; + } + snippets.push({ + id: rec.id, + name: rec.name, + content: rec.content, + ...(typeof rec.archived === 'boolean' ? { archived: rec.archived } : {}), + }); + } + return snippets; +} + function injectNativeSql(datasetQuery: MetabaseDatasetQuery, sql: string): MetabaseDatasetQuery { if (datasetQuery?.stages?.[0]?.native !== undefined) { const stages = [...(datasetQuery.stages ?? [])]; @@ -148,6 +192,7 @@ export class MetabaseClient implements MetabaseRuntimeClient { private readonly logger: MetabaseClientLogger; private readonly baseUrl: string; private readonly config: MetabaseClientConfig; + private snippetCache: Promise | null = null; constructor( runtime: MetabaseClientRuntimeConfig, @@ -261,6 +306,63 @@ export class MetabaseClient implements MetabaseRuntimeClient { return this.request('GET', '/api/card/?f=all'); } + private getNativeQuerySnippets(): Promise { + this.snippetCache ??= this.request('GET', '/api/native-query-snippet').then(parseNativeQuerySnippets); + return this.snippetCache; + } + + private async inlineNativeQuerySnippets( + sql: string, + templateTags: MetabaseTemplateTag[], + cardId: number, + ): Promise<{ sql: string; unresolved: string[] }> { + const names = collectRemainingSnippetNames(sql); + if (names.size === 0) { + return { sql, unresolved: [] }; + } + + let snippets: NativeQuerySnippet[]; + try { + snippets = await this.getNativeQuerySnippets(); + } catch (error) { + this.logger.warn( + `[metabase] failed to load native query snippets for card ${cardId}; leaving snippet placeholders unresolved: ${error instanceof Error ? error.message : String(error)}`, + ); + return { sql, unresolved: [...names] }; + } + + const snippetsById = new Map(); + const snippetsByName = new Map(); + for (const snippet of snippets) { + if (snippet.archived === true) { + continue; + } + snippetsById.set(snippet.id, snippet); + snippetsByName.set(normalizeSnippetName(snippet.name), snippet); + } + + const snippetTags = templateTags.filter((tag) => tag.type === 'snippet'); + const unresolved = new Set(); + const inlinedSql = sql.replace(/\{\{\s*snippet:\s*([^}]+?)\s*\}\}/gi, (match, rawName: string) => { + const normalizedName = normalizeSnippetName(rawName); + const tag = snippetTags.find( + (candidate) => + normalizeSnippetName(candidate['snippet-name']) === normalizedName || + normalizeSnippetName(candidate.name) === normalizedName, + ); + const snippet = + (typeof tag?.['snippet-id'] === 'number' ? snippetsById.get(tag['snippet-id']) : undefined) ?? + snippetsByName.get(normalizedName); + if (!snippet) { + unresolved.add(rawName.trim()); + return match; + } + return snippet.content; + }); + + return { sql: inlinedSql, unresolved: [...unresolved] }; + } + async convertMbqlToNative(datasetQuery: MetabaseDatasetQuery): Promise { return this.request('POST', '/api/dataset/native', { ...datasetQuery, @@ -351,7 +453,18 @@ export class MetabaseClient implements MetabaseRuntimeClient { // silently filter rows out — see incident with auction_seller_bidder_pair_suspicion). let processedSql = stripOptionalClauses(nativeQuery); - // Step 2: inline {{#CARD_ID}} card references locally. Recursively strip optional + // Step 2: inline native-query snippets. Metabase's substitution endpoint does not + // always expand {{snippet: name}} for fetched card SQL, but the snippets API does. + const snippetResult = await this.inlineNativeQuerySnippets(processedSql, templateTagEntries, card.id); + processedSql = snippetResult.sql; + if (snippetResult.unresolved.length > 0) { + this.logger.warn( + `[metabase] card ${card.id} has unresolved SQL snippets: ${snippetResult.unresolved.join(', ')}`, + ); + return { resolvedSql: processedSql, templateTags, resolutionStatus: 'fallback' }; + } + + // Step 3: inline {{#CARD_ID}} card references locally. Recursively strip optional // clauses in referenced cards too — the same reasoning applies all the way down. try { processedSql = await expandCardReferences(processedSql, { @@ -361,7 +474,17 @@ export class MetabaseClient implements MetabaseRuntimeClient { if (!referencedNative) { throw new Error(`referenced card ${id} has no native query`); } - return { native_query: stripOptionalClauses(referencedNative) }; + const referencedSnippetResult = await this.inlineNativeQuerySnippets( + stripOptionalClauses(referencedNative), + Object.values(this.getTemplateTags(referenced)), + referenced.id, + ); + if (referencedSnippetResult.unresolved.length > 0) { + throw new Error( + `referenced card ${id} has unresolved SQL snippets: ${referencedSnippetResult.unresolved.join(', ')}`, + ); + } + return { native_query: referencedSnippetResult.sql }; }, }); } catch (err) { @@ -372,7 +495,7 @@ export class MetabaseClient implements MetabaseRuntimeClient { throw err; } - // Step 3: collect template tags that still appear in the SQL after strip + inline. + // Step 4: collect template tags that still appear in the SQL after strip + inline. // Anything bracketed-only is gone now; anything card-referenced is inlined. const remainingNames = collectRemainingPlaceholderNames(processedSql); const remainingTags = templateTagEntries.filter((tag) => tag.type !== 'snippet' && remainingNames.has(tag.name)); @@ -381,7 +504,7 @@ export class MetabaseClient implements MetabaseRuntimeClient { return { resolvedSql: processedSql, templateTags, resolutionStatus: 'resolved' }; } - // Step 4: dummy-substitute the remaining naked {{ var }} placeholders via Metabase's + // Step 5: dummy-substitute the remaining naked {{ var }} placeholders via Metabase's // substitution endpoint. Only required because we can't translate dimension-tag // bindings to warehouse columns ourselves. Prepend a SQL comment listing every // dummy substitution so downstream consumers (the metabase_ingest LLM) know which diff --git a/packages/context/src/ingest/adapters/metabase/fetch-scope.test.ts b/packages/context/src/ingest/adapters/metabase/fetch-scope.test.ts index 1d8d2478..9768c0c9 100644 --- a/packages/context/src/ingest/adapters/metabase/fetch-scope.test.ts +++ b/packages/context/src/ingest/adapters/metabase/fetch-scope.test.ts @@ -57,13 +57,9 @@ describe('computeFetchScope', () => { }); }); - it('returns empty explicit scope for ONLY with no selections', () => { + it('treats generated ONLY with no selections as all', () => { const scope = computeFetchScope({ ...BASE_CONFIG, syncMode: 'ONLY', selections: [] }); - expect(scope).toEqual({ - kind: 'explicit', - includeCardIds: new Set(), - includeCollectionIds: new Set(), - }); + expect(scope).toEqual({ kind: 'all' }); }); }); diff --git a/packages/context/src/ingest/adapters/metabase/fetch-scope.ts b/packages/context/src/ingest/adapters/metabase/fetch-scope.ts index bee97ec8..e09ef7c3 100644 --- a/packages/context/src/ingest/adapters/metabase/fetch-scope.ts +++ b/packages/context/src/ingest/adapters/metabase/fetch-scope.ts @@ -11,7 +11,7 @@ export type FetchScope = * union the fetcher switches on. Pure function; no I/O, no side effects. */ export function computeFetchScope(syncConfig: StagedSyncConfig): FetchScope { - if (syncConfig.syncMode === 'ALL') { + if (syncConfig.syncMode === 'ALL' || (syncConfig.syncMode === 'ONLY' && syncConfig.selections.length === 0)) { return { kind: 'all' }; } const cardIds = new Set(); diff --git a/packages/context/src/ingest/ingest-bundle.runner.ts b/packages/context/src/ingest/ingest-bundle.runner.ts index 6ba778e5..0515842a 100644 --- a/packages/context/src/ingest/ingest-bundle.runner.ts +++ b/packages/context/src/ingest/ingest-bundle.runner.ts @@ -79,6 +79,21 @@ function countMemoryFlowActions(actions: MemoryAction[], target: MemoryAction['t return actions.filter((action) => action.target === target).length; } +function isStructuredToolFailure(output: unknown): boolean { + if (!output || typeof output !== 'object') { + return false; + } + const structured = (output as { structured?: unknown }).structured; + return !!structured && typeof structured === 'object' && (structured as { success?: unknown }).success === false; +} + +function isFailedToolCall(entry: ToolCallLogEntry): boolean { + if (entry.error) { + return true; + } + return (entry.toolName === 'sl_write_source' || entry.toolName === 'wiki_write') && isStructuredToolFailure(entry.output); +} + function reportIdFromCreateResult(result: unknown): string | undefined { if (!result || typeof result !== 'object' || !('id' in result)) { return undefined; @@ -344,7 +359,7 @@ export class IngestBundleRunner { toolNames: new Set(), } satisfies MutableToolTranscriptSummary); current.toolCallCount += 1; - current.errorCount += entry.error ? 1 : 0; + current.errorCount += isFailedToolCall(entry) ? 1 : 0; current.toolNames.add(entry.toolName); transcriptSummaries.set(entry.wuKey, current); }; @@ -712,6 +727,7 @@ export class IngestBundleRunner { sourceKey: job.sourceKey, connectionId: job.connectionId, jobId: job.jobId, + toolFailureCount: (unitKey) => transcriptSummaries.get(unitKey)?.errorCount ?? 0, onStepFinish: ({ stepIndex, stepBudget }) => { memoryFlow?.emit({ type: 'work_unit_step', unitKey: wu.unitKey, stepIndex, stepBudget }); }, diff --git a/packages/context/src/ingest/local-bundle-ingest.test.ts b/packages/context/src/ingest/local-bundle-ingest.test.ts index aa423d9e..6e9aa4aa 100644 --- a/packages/context/src/ingest/local-bundle-ingest.test.ts +++ b/packages/context/src/ingest/local-bundle-ingest.test.ts @@ -1,6 +1,7 @@ import { mkdir, mkdtemp, readFile, rm, writeFile } from 'node:fs/promises'; import { tmpdir } from 'node:os'; import { join } from 'node:path'; +import Database from 'better-sqlite3'; import { AgentRunnerService } from '../agent/index.js'; import { initKtxProject, type KtxLocalProject, loadKtxProject } from '../project/index.js'; import { makeLocalGitRepo } from '../test/make-local-git-repo.js'; @@ -57,6 +58,34 @@ class LookerSlWritingAgentRunner extends AgentRunnerService { } } +class WikiWritingAgentRunner extends AgentRunnerService { + override runLoop = vi.fn(async (params: any) => { + if (params.telemetryTags?.operationName === 'ingest-bundle-wu') { + const wikiWrite = params.toolSet.wiki_write; + if (!wikiWrite?.execute) { + throw new Error('wiki_write tool was not available to the WorkUnit'); + } + const result = await wikiWrite.execute( + { + key: 'orders_context', + summary: 'Orders source context', + content: 'Orders are purchase records used for revenue analysis.', + tags: ['orders'], + }, + { toolCallId: 'wiki-write' }, + ); + if (!result.structured.success) { + throw new Error(result.markdown); + } + } + return { stopReason: 'natural' as const }; + }); + + constructor() { + super({ llmProvider: { getModel: () => ({}) as never } as never }); + } +} + function makeLookerRuntimeClient() { const lookerModels = { models: [{ name: 'ecommerce', label: 'Ecommerce', explores: [{ name: 'orders', label: 'Orders' }] }], @@ -252,6 +281,33 @@ describe('canonical local ingest', () => { }); }); + it('indexes wiki pages written by local ingest into the SQLite knowledge tables', async () => { + const sourceDir = join(tempDir, 'source'); + await mkdir(join(sourceDir, 'orders'), { recursive: true }); + await writeFile(join(sourceDir, 'orders', 'orders.json'), '{"name":"orders"}\n', 'utf-8'); + const agentRunner = new WikiWritingAgentRunner(); + + const result = await runLocalIngest({ + project, + adapters: [new FakeSourceAdapter()], + adapter: 'fake', + connectionId: 'warehouse', + sourceDir, + jobId: 'wiki-local-1', + agentRunner, + }); + + expect(result.result.failedWorkUnits).toEqual([]); + const db = new Database(join(project.projectDir, '.ktx', 'db.sqlite'), { readonly: true }); + try { + expect(db.prepare('SELECT key, summary FROM knowledge_pages ORDER BY key').all()).toEqual([ + { key: 'orders_context', summary: 'Orders source context' }, + ]); + } finally { + db.close(); + } + }); + it('rejects direct Metabase scheduled pulls before requiring a local ingest LLM provider', async () => { const projectDir = join(tempDir, 'metabase-project'); await initKtxProject({ projectDir, projectName: 'warehouse' }); diff --git a/packages/context/src/ingest/local-bundle-runtime.ts b/packages/context/src/ingest/local-bundle-runtime.ts index 6665682b..f7c8be80 100644 --- a/packages/context/src/ingest/local-bundle-runtime.ts +++ b/packages/context/src/ingest/local-bundle-runtime.ts @@ -56,6 +56,8 @@ import { type KnowledgeIndexPort, KnowledgeWikiService, searchLocalKnowledgePages, + SqliteKnowledgeIndex, + type SqliteKnowledgeIndexPage, WikiListTagsTool, WikiReadTool, WikiRemoveTool, @@ -257,6 +259,17 @@ function parseWiki(raw: string): { summary: string; content: string } { }; } +function parseWikiTags(raw: string): string[] { + const match = raw.match(/^---\n([\s\S]*?)\n---\n?/); + if (!match) { + return []; + } + const frontmatter = (YAML.parse(match[1]) ?? {}) as Record; + return Array.isArray(frontmatter.tags) + ? frontmatter.tags.filter((tag): tag is string => typeof tag === 'string') + : []; +} + function scoreText(text: string, query: string): number { const normalized = query.toLowerCase().trim(); if (!normalized) { @@ -271,21 +284,49 @@ function scoreText(text: string, query: string): number { } class LocalKnowledgeIndex implements KnowledgeIndexPort { - constructor(private readonly project: KtxLocalProject) {} + private readonly sqlite: SqliteKnowledgeIndex; - async upsertPage(): Promise {} - - async applyDiffTransactional(): Promise {} - - async getExistingSearchTexts(): Promise> { - return new Map(); + constructor(private readonly project: KtxLocalProject) { + this.sqlite = new SqliteKnowledgeIndex({ dbPath: ktxLocalStateDbPath(project) }); } - async deleteStale(): Promise {} + async upsertPage(): Promise { + await this.syncAllPagesFromDisk(); + } - async deleteByScope(): Promise {} + async applyDiffTransactional(): Promise { + await this.syncAllPagesFromDisk(); + } - async deleteByKey(): Promise {} + async getExistingSearchTexts( + scope: string, + scopeId: string | null, + ): Promise> { + const prefix = scope === 'GLOBAL' ? 'knowledge/global/' : `knowledge/user/${scopeId}/`; + const result = new Map(); + for (const [path, page] of this.sqlite.getExistingPages()) { + if (!path.startsWith(prefix)) { + continue; + } + result.set(path.slice(prefix.length).replace(/\.md$/, ''), { + searchText: page.searchText, + hasEmbedding: page.embedding !== null, + }); + } + return result; + } + + async deleteStale(): Promise { + await this.syncAllPagesFromDisk(); + } + + async deleteByScope(): Promise { + await this.syncAllPagesFromDisk(); + } + + async deleteByKey(): Promise { + await this.syncAllPagesFromDisk(); + } async findPageByKey(scope: string, scopeId: string | null, pageKey: string) { const path = scope === 'GLOBAL' ? `knowledge/global/${pageKey}.md` : `knowledge/user/${scopeId}/${pageKey}.md`; @@ -344,6 +385,41 @@ class LocalKnowledgeIndex implements KnowledgeIndexPort { .sort((left, right) => right.rrfScore - left.rrfScore || left.pageKey.localeCompare(right.pageKey)) .slice(0, limit); } + + private async syncAllPagesFromDisk(): Promise { + const listed = await this.project.fileStore.listFiles('knowledge', true); + const pages: SqliteKnowledgeIndexPage[] = []; + for (const file of listed.files.filter((entry) => entry.endsWith('.md'))) { + const parsedPath = parseKnowledgeIndexPath(file); + if (!parsedPath) { + continue; + } + const path = `knowledge/${file}`; + const raw = await this.project.fileStore.readFile(path); + const parsed = parseWiki(raw.content); + pages.push({ + path, + key: parsedPath.pageKey, + scope: parsedPath.scope, + summary: parsed.summary, + content: parsed.content, + tags: parseWikiTags(raw.content), + embedding: null, + }); + } + this.sqlite.sync(pages); + } +} + +function parseKnowledgeIndexPath(file: string): { scope: 'GLOBAL' | 'USER'; pageKey: string } | null { + const segments = file.split('/'); + if (segments.length === 2 && segments[0] === 'global') { + return { scope: 'GLOBAL', pageKey: segments[1].replace(/\.md$/, '') }; + } + if (segments.length === 3 && segments[0] === 'user') { + return { scope: 'USER', pageKey: segments[2].replace(/\.md$/, '') }; + } + return null; } class NoopKnowledgeEventPort implements KnowledgeEventPort { diff --git a/packages/context/src/ingest/stages/stage-3-work-units.test.ts b/packages/context/src/ingest/stages/stage-3-work-units.test.ts index ba01d60d..23ec3fa8 100644 --- a/packages/context/src/ingest/stages/stage-3-work-units.test.ts +++ b/packages/context/src/ingest/stages/stage-3-work-units.test.ts @@ -106,6 +106,21 @@ describe('Stage 3 — executeWorkUnit', () => { expect(deps.resetHardTo).toHaveBeenCalledWith('pre'); }); + it('tool failures reset to the pre-WU SHA and mark WU failed even when the loop ends naturally', async () => { + const deps = makeDeps(); + deps.sessionWorktreeGit.revParseHead = vi.fn().mockResolvedValueOnce('pre').mockResolvedValueOnce('post'); + deps.agentRunner.runLoop = vi.fn().mockResolvedValue({ stopReason: 'natural' }); + deps.toolFailureCount = vi.fn().mockReturnValue(2); + + const outcome = await executeWorkUnit(deps, makeWu()); + + expect(outcome.status).toBe('failed'); + expect(outcome.reason).toContain('2 tool call(s) failed'); + expect(outcome.actions).toEqual([]); + expect(outcome.touchedSlSources).toEqual([]); + expect(deps.resetHardTo).toHaveBeenCalledWith('pre'); + }); + it('runner loop thrown exception resets to the pre-WU SHA and marks WU failed', async () => { const deps = makeDeps(); deps.sessionWorktreeGit.revParseHead = vi.fn().mockResolvedValueOnce('pre').mockResolvedValueOnce('post'); diff --git a/packages/context/src/ingest/stages/stage-3-work-units.ts b/packages/context/src/ingest/stages/stage-3-work-units.ts index bbf23079..b6e64f86 100644 --- a/packages/context/src/ingest/stages/stage-3-work-units.ts +++ b/packages/context/src/ingest/stages/stage-3-work-units.ts @@ -28,6 +28,7 @@ export interface WorkUnitExecutionDeps { connectionId: string; jobId: string; onStepFinish?: (info: { stepIndex: number; stepBudget: number }) => void; + toolFailureCount?: (unitKey: string) => number; } export interface WorkUnitOutcome { @@ -128,6 +129,11 @@ export async function executeWorkUnit(deps: WorkUnitExecutionDeps, wu: WorkUnit) return failWithReset(runResult.error?.message ?? 'agent loop errored'); } + const toolFailureCount = deps.toolFailureCount?.(wu.unitKey) ?? 0; + if (toolFailureCount > 0) { + return failWithReset(`${toolFailureCount} tool call(s) failed during WorkUnit ${wu.unitKey}`); + } + const touched = listTouchedSlSources(deps.captureSession.touchedSlSources); if (touched.length > 0) { const validation = await deps.validateTouchedSources(touched);