test: move slow suites to ci tier

This commit is contained in:
Andrey Avtomonov 2026-05-11 09:38:10 +02:00
parent eee46896f1
commit 04d628c16c
10 changed files with 1681 additions and 1538 deletions

View file

@ -39,6 +39,12 @@ jobs:
- name: Run TypeScript checks
run: pnpm run check
- name: Run slow TypeScript tests
run: pnpm run test:slow
- name: Run CLI smoke tests
run: pnpm run smoke
- name: Setup Python
uses: actions/setup-python@a309ff8b426b58ec0e2a45f0f869d46889d02405 # v6.2.0
with:

View file

@ -30,6 +30,7 @@
"relationships:verify-orbit": "node scripts/relationship-orbit-verification.mjs",
"smoke": "pnpm run build && pnpm --filter @ktx/cli run smoke",
"test": "node --test scripts/*.test.mjs && pnpm --filter './packages/*' run test",
"test:slow": "pnpm --filter @ktx/context run test:slow && pnpm --filter @ktx/cli run test:slow",
"type-check": "pnpm --filter './packages/*' run type-check"
},
"devDependencies": {

View file

@ -28,7 +28,8 @@
"assets:demo": "node scripts/build-demo-assets.mjs",
"build": "node -e \"fs.rmSync('dist', { recursive: true, force: true })\" && tsc -p tsconfig.json && node ../../scripts/prepare-cli-bin.mjs",
"smoke": "vitest run src/standalone-smoke.test.ts src/example-smoke.test.ts --testTimeout 30000",
"test": "vitest run",
"test": "vitest run --exclude src/standalone-smoke.test.ts --exclude src/example-smoke.test.ts --exclude src/setup-databases.test.ts --exclude src/scan.test.ts --exclude src/commands/connection-metabase-setup.test.ts --exclude src/setup-models.test.ts --exclude src/setup-sources.test.ts --exclude src/setup.test.ts --exclude src/connection.test.ts --exclude src/setup-embeddings.test.ts --exclude src/ingest.test.ts --exclude src/commands/connection-mapping.test.ts --exclude src/ingest-viz.test.ts --exclude src/demo.test.ts --exclude src/setup-project.test.ts --exclude src/sl.test.ts --exclude src/local-scan-connectors.test.ts --exclude src/commands/connection-notion.test.ts",
"test:slow": "vitest run src/setup-databases.test.ts src/scan.test.ts src/commands/connection-metabase-setup.test.ts src/setup-models.test.ts src/setup-sources.test.ts src/setup.test.ts src/connection.test.ts src/setup-embeddings.test.ts src/ingest.test.ts src/commands/connection-mapping.test.ts src/ingest-viz.test.ts src/demo.test.ts src/setup-project.test.ts src/sl.test.ts src/local-scan-connectors.test.ts src/commands/connection-notion.test.ts --testTimeout 30000",
"type-check": "tsc -p tsconfig.json --noEmit"
},
"dependencies": {

View file

@ -0,0 +1,863 @@
import { mkdir, mkdtemp, rm, writeFile } from 'node:fs/promises';
import { tmpdir } from 'node:os';
import { join } from 'node:path';
import {
type LocalIngestResult,
type MemoryFlowReplayInput,
type RunLocalIngestOptions,
} from '@ktx/context/ingest';
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
import { runKtxIngest } from './ingest.js';
import {
completedLocalBundleRun,
emitLiveLocalMemoryFlow,
localFakeBundleReport,
makeIo,
persistLocalBundleReport,
writeBundleReportFile,
writeWarehouseConfig,
} from './ingest.test-utils.js';
import { resetVizFallbackWarningsForTest } from './viz-fallback.js';
describe('runKtxIngest viz and replay', () => {
let tempDir: string;
let originalTerm: string | undefined;
beforeEach(async () => {
resetVizFallbackWarningsForTest();
originalTerm = process.env.TERM;
process.env.TERM = 'xterm-256color';
tempDir = await mkdtemp(join(tmpdir(), 'ktx-cli-ingest-'));
});
afterEach(async () => {
if (originalTerm === undefined) {
delete process.env.TERM;
} else {
process.env.TERM = originalTerm;
}
await rm(tempDir, { recursive: true, force: true });
});
it('renders live memory-flow frames for run --viz when stdout is interactive', async () => {
const projectDir = join(tempDir, 'project');
await writeWarehouseConfig(projectDir);
const sourceDir = join(tempDir, 'source');
await mkdir(join(sourceDir, 'orders'), { recursive: true });
await writeFile(join(sourceDir, 'orders', 'orders.json'), '{"name":"orders"}\n', 'utf-8');
const runLocal = vi.fn(async (input: RunLocalIngestOptions): Promise<LocalIngestResult> => {
input.memoryFlow?.emit({ type: 'source_acquired', adapter: 'fake', trigger: 'manual_resync', fileCount: 1 });
input.memoryFlow?.update({ syncId: 'sync-live-1' });
input.memoryFlow?.emit({ type: 'raw_snapshot_written', syncId: 'sync-live-1', rawFileCount: 1 });
input.memoryFlow?.emit({ type: 'diff_computed', added: 1, modified: 0, deleted: 0, unchanged: 0 });
input.memoryFlow?.update({
plannedWorkUnits: [
{
unitKey: 'fake-orders',
rawFiles: ['orders/orders.json'],
peerFileCount: 0,
dependencyCount: 0,
},
],
});
input.memoryFlow?.emit({ type: 'chunks_planned', chunkCount: 1, workUnitCount: 1, evictionCount: 0 });
input.memoryFlow?.emit({ type: 'report_created', runId: 'live-viz-run' });
input.memoryFlow?.finish('done');
return completedLocalBundleRun(input, 'live-viz-run');
});
const io = makeIo({ isTTY: true, stdinIsTTY: true, columns: 120 });
const startLiveMemoryFlow = vi.fn(async (_input: MemoryFlowReplayInput, _io: unknown) => null);
await expect(
runKtxIngest(
{
command: 'run',
projectDir,
connectionId: 'warehouse',
adapter: 'fake',
sourceDir,
outputMode: 'viz',
},
io.io,
{
runLocalIngest: runLocal,
startLiveMemoryFlow,
jobIdFactory: () => 'live-viz-run',
now: () => new Date('2026-04-30T14:00:00.000Z'),
},
),
).resolves.toBe(0);
expect(runLocal).toHaveBeenCalledWith(expect.objectContaining({ memoryFlow: expect.any(Object) }));
expect(io.stdout()).toContain('\u001b[2J\u001b[H');
expect((io.stdout().match(/KTX memory flow/g) ?? []).length).toBeGreaterThan(1);
expect(io.stdout()).toContain('KTX memory flow warehouse/fake done');
expect(io.stdout()).toContain('fake-orders');
expect(io.stderr()).toBe('');
});
it('uses the TUI live session for run --viz when stdin and stdout are interactive', async () => {
const projectDir = join(tempDir, 'project');
await writeWarehouseConfig(projectDir);
const sourceDir = join(tempDir, 'source');
await mkdir(join(sourceDir, 'orders'), { recursive: true });
await writeFile(join(sourceDir, 'orders', 'orders.json'), '{"name":"orders"}\n', 'utf-8');
const runLocal = vi.fn(async (input: RunLocalIngestOptions): Promise<LocalIngestResult> => {
emitLiveLocalMemoryFlow(input.memoryFlow);
return completedLocalBundleRun(input, 'live-viz-run');
});
const liveSession = {
update: vi.fn(),
close: vi.fn(),
isClosed: vi.fn(() => false),
};
const startLiveMemoryFlow = vi.fn(async (_input: MemoryFlowReplayInput, _io: unknown) => liveSession);
const io = makeIo({ isTTY: true, stdinIsTTY: true, columns: 120 });
await expect(
runKtxIngest(
{
command: 'run',
projectDir,
connectionId: 'warehouse',
adapter: 'fake',
sourceDir,
outputMode: 'viz',
},
io.io,
{
runLocalIngest: runLocal,
startLiveMemoryFlow,
jobIdFactory: () => 'live-viz-run',
now: () => new Date('2026-04-30T14:00:00.000Z'),
},
),
).resolves.toBe(0);
expect(startLiveMemoryFlow).toHaveBeenCalledTimes(1);
expect(startLiveMemoryFlow.mock.calls[0]?.[0]).toMatchObject({
runId: 'live-viz-run',
connectionId: 'warehouse',
adapter: 'fake',
status: 'running',
});
expect(liveSession.update).toHaveBeenCalled();
expect(liveSession.close).toHaveBeenCalledTimes(1);
expect(io.stdout()).not.toContain('\u001b[2J\u001b[H');
expect(io.stdout()).not.toContain('KTX memory flow');
expect(io.stderr()).toBe('');
});
it('prints a final plain summary after live viz completes', async () => {
const projectDir = join(tempDir, 'project');
await writeWarehouseConfig(projectDir);
const io = makeIo({ isTTY: true, stdinIsTTY: true, columns: 120 });
const liveSession = {
update: vi.fn(),
close: vi.fn(),
isClosed: vi.fn(() => false),
};
const startLiveMemoryFlow = vi.fn(async (_input: MemoryFlowReplayInput, _io: unknown) => liveSession);
const runLocal = vi.fn(async (input: RunLocalIngestOptions) => {
emitLiveLocalMemoryFlow(input.memoryFlow);
return completedLocalBundleRun(input, 'live-summary');
});
await expect(
runKtxIngest(
{
command: 'run',
projectDir,
connectionId: 'warehouse',
adapter: 'fake',
outputMode: 'viz',
},
io.io,
{ runLocalIngest: runLocal, startLiveMemoryFlow },
),
).resolves.toBe(0);
expect(liveSession.close).toHaveBeenCalledTimes(1);
expect(io.stdout()).toContain('Memory-flow summary: done');
expect(io.stdout()).toContain('Connection: warehouse');
});
it('falls back to text live rendering when the TUI live session is unavailable', async () => {
const projectDir = join(tempDir, 'project');
await writeWarehouseConfig(projectDir);
const sourceDir = join(tempDir, 'source');
await mkdir(join(sourceDir, 'orders'), { recursive: true });
await writeFile(join(sourceDir, 'orders', 'orders.json'), '{"name":"orders"}\n', 'utf-8');
const runLocal = vi.fn(async (input: RunLocalIngestOptions): Promise<LocalIngestResult> => {
emitLiveLocalMemoryFlow(input.memoryFlow);
return completedLocalBundleRun(input, 'live-viz-run');
});
const startLiveMemoryFlow = vi.fn(async (_input: MemoryFlowReplayInput, _io: unknown) => null);
const io = makeIo({ isTTY: true, stdinIsTTY: true, columns: 120 });
await expect(
runKtxIngest(
{
command: 'run',
projectDir,
connectionId: 'warehouse',
adapter: 'fake',
sourceDir,
outputMode: 'viz',
},
io.io,
{
runLocalIngest: runLocal,
startLiveMemoryFlow,
jobIdFactory: () => 'live-viz-run',
},
),
).resolves.toBe(0);
expect(startLiveMemoryFlow).toHaveBeenCalledTimes(1);
expect(io.stdout()).toContain('\u001b[2J\u001b[H');
expect(io.stdout()).toContain('KTX memory flow warehouse/fake done');
});
it('falls back to text live rendering when TUI startup fails with a redacted warning', async () => {
const projectDir = join(tempDir, 'project');
await writeWarehouseConfig(projectDir);
const sourceDir = join(tempDir, 'source');
await mkdir(join(sourceDir, 'orders'), { recursive: true });
await writeFile(join(sourceDir, 'orders', 'orders.json'), '{"name":"orders"}\n', 'utf-8');
const runLocal = vi.fn(async (input: RunLocalIngestOptions): Promise<LocalIngestResult> => {
emitLiveLocalMemoryFlow(input.memoryFlow);
return completedLocalBundleRun(input, 'live-viz-run');
});
const startLiveMemoryFlow = vi.fn(
async (_input: MemoryFlowReplayInput, ioArg: { stderr: { write(chunk: string): void } }) => {
ioArg.stderr.write('TUI visualization unavailable: Failed [redacted-url] [redacted]; using text renderer.\n');
return null;
},
);
const io = makeIo({ isTTY: true, stdinIsTTY: true, columns: 120 });
await expect(
runKtxIngest(
{
command: 'run',
projectDir,
connectionId: 'warehouse',
adapter: 'fake',
sourceDir,
outputMode: 'viz',
},
io.io,
{
runLocalIngest: runLocal,
startLiveMemoryFlow,
jobIdFactory: () => 'live-viz-run',
},
),
).resolves.toBe(0);
expect(io.stderr()).toContain('TUI visualization unavailable: Failed [redacted-url] [redacted]');
expect(io.stdout()).toContain('KTX memory flow warehouse/fake done');
expect(io.stdout()).toContain('\u001b[2J\u001b[H');
});
it('does not start live TUI when run --viz disables input', async () => {
const projectDir = join(tempDir, 'project');
await writeWarehouseConfig(projectDir);
const sourceDir = join(tempDir, 'source');
await mkdir(join(sourceDir, 'orders'), { recursive: true });
await writeFile(join(sourceDir, 'orders', 'orders.json'), '{"name":"orders"}\n', 'utf-8');
const runLocal = vi.fn(async (input: RunLocalIngestOptions): Promise<LocalIngestResult> => {
return completedLocalBundleRun(input, 'no-input-live-viz-run');
});
const startLiveMemoryFlow = vi.fn(async (_input: MemoryFlowReplayInput, _io: unknown) => ({
update: vi.fn(),
close: vi.fn(),
isClosed: vi.fn(() => false),
}));
const io = makeIo({ isTTY: true, stdinIsTTY: true, columns: 120 });
await expect(
runKtxIngest(
{
command: 'run',
projectDir,
connectionId: 'warehouse',
adapter: 'fake',
sourceDir,
outputMode: 'viz',
inputMode: 'disabled',
},
io.io,
{ runLocalIngest: runLocal, startLiveMemoryFlow },
),
).resolves.toBe(0);
expect(startLiveMemoryFlow).not.toHaveBeenCalled();
expect(runLocal).toHaveBeenCalledWith(expect.not.objectContaining({ memoryFlow: expect.anything() }));
expect(io.stdout()).toContain('KTX memory flow warehouse/fake done');
});
it('does not attach a live memory-flow sink for plain run output', async () => {
const projectDir = join(tempDir, 'project');
await writeWarehouseConfig(projectDir);
const sourceDir = join(tempDir, 'source');
await mkdir(join(sourceDir, 'orders'), { recursive: true });
await writeFile(join(sourceDir, 'orders', 'orders.json'), '{"name":"orders"}\n', 'utf-8');
const runLocal = vi.fn(async (input: RunLocalIngestOptions) => completedLocalBundleRun(input, 'plain-run'));
const io = makeIo({ isTTY: true });
await expect(
runKtxIngest(
{
command: 'run',
projectDir,
connectionId: 'warehouse',
adapter: 'fake',
sourceDir,
outputMode: 'plain',
},
io.io,
{ runLocalIngest: runLocal },
),
).resolves.toBe(0);
expect(runLocal).toHaveBeenCalledWith(expect.not.objectContaining({ memoryFlow: expect.anything() }));
expect(io.stdout()).toContain('Job: plain-run');
expect(io.stdout()).not.toContain('KTX memory flow');
});
it('falls back to plain run output for run --viz when stdout is not interactive', async () => {
const projectDir = join(tempDir, 'project');
await writeWarehouseConfig(projectDir);
const sourceDir = join(tempDir, 'source');
await mkdir(join(sourceDir, 'orders'), { recursive: true });
await writeFile(join(sourceDir, 'orders', 'orders.json'), '{"name":"orders"}\n', 'utf-8');
const io = makeIo({ isTTY: false });
const runLocal = vi.fn(async (input: RunLocalIngestOptions) => completedLocalBundleRun(input, 'non-tty-viz-run'));
await expect(
runKtxIngest(
{
command: 'run',
projectDir,
connectionId: 'warehouse',
adapter: 'fake',
sourceDir,
outputMode: 'viz',
},
io.io,
{
runLocalIngest: runLocal,
jobIdFactory: () => 'non-tty-viz-run',
},
),
).resolves.toBe(0);
expect(io.stdout()).toContain('Job: non-tty-viz-run');
expect(io.stdout()).not.toContain('KTX memory flow');
expect(io.stderr()).toContain(
'Visualization requested but stdout is not an interactive terminal; printing plain output.',
);
});
it('falls back to plain run output for run --viz when stdin raw mode is unavailable', async () => {
const projectDir = join(tempDir, 'project');
await writeWarehouseConfig(projectDir);
const sourceDir = join(tempDir, 'source');
await mkdir(join(sourceDir, 'orders'), { recursive: true });
await writeFile(join(sourceDir, 'orders', 'orders.json'), '{"name":"orders"}\n', 'utf-8');
const io = makeIo({ isTTY: true, stdinIsTTY: true, rawMode: false, columns: 120 });
const runLocal = vi.fn(async (input: RunLocalIngestOptions) => completedLocalBundleRun(input, 'raw-missing-viz-run'));
const startLiveMemoryFlow = vi.fn(async (_input: MemoryFlowReplayInput, _io: unknown) => ({
update: vi.fn(),
close: vi.fn(),
isClosed: vi.fn(() => false),
}));
await expect(
runKtxIngest(
{
command: 'run',
projectDir,
connectionId: 'warehouse',
adapter: 'fake',
sourceDir,
outputMode: 'viz',
},
io.io,
{
runLocalIngest: runLocal,
startLiveMemoryFlow,
jobIdFactory: () => 'raw-missing-viz-run',
},
),
).resolves.toBe(0);
expect(startLiveMemoryFlow).not.toHaveBeenCalled();
expect(runLocal).toHaveBeenCalledWith(expect.not.objectContaining({ memoryFlow: expect.anything() }));
expect(io.stdout()).toContain('Job: raw-missing-viz-run');
expect(io.stdout()).not.toContain('KTX memory flow');
expect(io.stderr()).toContain(
'Visualization requested but stdin raw mode is unavailable; printing plain output.',
);
});
it('returns an error code for missing status', async () => {
const projectDir = join(tempDir, 'project');
await writeWarehouseConfig(projectDir);
const io = makeIo();
await expect(
runKtxIngest({ command: 'status', projectDir, runId: 'missing-run', outputMode: 'plain' }, io.io),
).resolves.toBe(1);
expect(io.stderr()).toContain('Local ingest run or report "missing-run" was not found');
});
it('uses the latest local ingest report when status has no run id', async () => {
const projectDir = join(tempDir, 'project');
await writeWarehouseConfig(projectDir);
await persistLocalBundleReport(projectDir, localFakeBundleReport('older-run'));
await persistLocalBundleReport(projectDir, localFakeBundleReport('newer-run'));
const io = makeIo();
await expect(runKtxIngest({ command: 'status', projectDir, outputMode: 'plain' }, io.io)).resolves.toBe(0);
expect(io.stdout()).toContain('Run: run-newer-run');
expect(io.stdout()).toContain('Job: newer-run');
expect(io.stderr()).toBe('');
});
it('renders the latest local ingest report through watch when run id is omitted', async () => {
const projectDir = join(tempDir, 'project');
await writeWarehouseConfig(projectDir);
await persistLocalBundleReport(projectDir, localFakeBundleReport('watch-latest'));
const io = makeIo({ isTTY: true });
await expect(
runKtxIngest({ command: 'watch', projectDir, outputMode: 'viz', inputMode: 'disabled' }, io.io),
).resolves.toBe(0);
expect(io.stdout()).toContain('KTX memory flow warehouse/fake done');
expect(io.stdout()).toContain('Run: run-watch-latest');
expect(io.stderr()).toBe('');
});
it('renders report-file replay through the memory-flow TUI', async () => {
const projectDir = join(tempDir, 'project');
await writeWarehouseConfig(projectDir);
const reportFile = await writeBundleReportFile(tempDir);
const io = makeIo({ isTTY: true });
await expect(
runKtxIngest(
{
command: 'replay',
projectDir,
runId: 'job-1',
reportFile,
outputMode: 'viz',
inputMode: 'disabled',
},
io.io,
),
).resolves.toBe(0);
expect(io.stdout()).toContain('KTX memory flow warehouse/metabase done');
expect(io.stdout()).toContain('Saved 2 memories from 2 raw files');
expect(io.stdout()).toContain('Commit: abc12345 Run: run-1 Report: report-1');
expect(io.stdout()).toContain('SOURCE');
expect(io.stdout()).toContain('ACTIONS');
expect(io.stdout()).toContain('SAVED');
expect(io.stderr()).toBe('');
});
it('prints report-file JSON without looking up local ingest status', async () => {
const projectDir = join(tempDir, 'project');
await writeWarehouseConfig(projectDir);
const reportFile = await writeBundleReportFile(tempDir);
const io = makeIo();
await expect(
runKtxIngest({ command: 'status', projectDir, runId: 'report-1', reportFile, outputMode: 'json' }, io.io),
).resolves.toBe(0);
const parsed = JSON.parse(io.stdout());
expect(parsed).toMatchObject({
id: 'report-1',
runId: 'run-1',
jobId: 'job-1',
connectionId: 'warehouse',
sourceKey: 'metabase',
});
expect(io.stderr()).toBe('');
});
it('routes interactive report-file replay through the stored TUI renderer', async () => {
const projectDir = join(tempDir, 'project');
await writeWarehouseConfig(projectDir);
const reportFile = await writeBundleReportFile(tempDir);
const io = makeIo({ isTTY: true, stdinIsTTY: true, columns: 120 });
const renderStoredMemoryFlow = vi.fn(async (_input: MemoryFlowReplayInput, _io: unknown) => true);
await expect(
runKtxIngest(
{
command: 'replay',
projectDir,
runId: 'run-1',
reportFile,
outputMode: 'viz',
},
io.io,
{ renderStoredMemoryFlow },
),
).resolves.toBe(0);
expect(renderStoredMemoryFlow).toHaveBeenCalledTimes(1);
expect(renderStoredMemoryFlow.mock.calls[0]?.[0]).toMatchObject({
runId: 'run-1',
reportId: 'report-1',
connectionId: 'warehouse',
adapter: 'metabase',
});
expect(io.stdout()).toBe('');
expect(io.stderr()).toBe('');
});
it('rejects report-file replay when the requested id does not match the report', async () => {
const projectDir = join(tempDir, 'project');
await writeWarehouseConfig(projectDir);
const reportFile = await writeBundleReportFile(tempDir);
const io = makeIo();
await expect(
runKtxIngest({ command: 'replay', projectDir, runId: 'unrelated-id', reportFile, outputMode: 'plain' }, io.io),
).resolves.toBe(1);
expect(io.stderr()).toContain(
`Report file ${reportFile} does not match ingest replay id "unrelated-id"; expected one of report-1, run-1, job-1`,
);
expect(io.stdout()).toBe('');
});
it('renders memory-flow snapshot for status --viz when stdout is interactive', async () => {
const projectDir = join(tempDir, 'project');
await writeWarehouseConfig(projectDir);
const sourceDir = join(tempDir, 'source');
await mkdir(join(sourceDir, 'orders'), { recursive: true });
await writeFile(join(sourceDir, 'orders', 'orders.json'), '{"name":"orders"}\n', 'utf-8');
await persistLocalBundleReport(projectDir, localFakeBundleReport('viz-run-1'));
const io = makeIo({ isTTY: true });
await expect(
runKtxIngest(
{ command: 'status', projectDir, runId: 'viz-run-1', outputMode: 'viz', inputMode: 'disabled' },
io.io,
),
).resolves.toBe(0);
expect(io.stdout()).toContain('KTX memory flow warehouse/fake done');
expect(io.stdout()).toContain('SOURCE');
expect(io.stdout()).toContain('CHUNKS');
expect(io.stdout()).toContain('WORKUNITS');
expect(io.stdout()).toContain('Saved 2 memories from 2 raw files');
expect(io.stderr()).toBe('');
});
it('uses the TUI renderer for stored status --viz when stdin and stdout are interactive', async () => {
const projectDir = join(tempDir, 'project');
await writeWarehouseConfig(projectDir);
const sourceDir = join(tempDir, 'source');
await mkdir(join(sourceDir, 'orders'), { recursive: true });
await writeFile(join(sourceDir, 'orders', 'orders.json'), '{"name":"orders"}\n', 'utf-8');
await persistLocalBundleReport(projectDir, localFakeBundleReport('tui-viz-run'));
const io = makeIo({ isTTY: true, stdinIsTTY: true, columns: 120 });
const renderStoredMemoryFlow = vi.fn(async (_input: MemoryFlowReplayInput, _io: unknown) => true);
await expect(
runKtxIngest(
{
command: 'status',
projectDir,
runId: 'tui-viz-run',
outputMode: 'viz',
},
io.io,
{ renderStoredMemoryFlow },
),
).resolves.toBe(0);
expect(renderStoredMemoryFlow).toHaveBeenCalledTimes(1);
expect(renderStoredMemoryFlow.mock.calls[0]?.[0]).toMatchObject({
runId: 'run-tui-viz-run',
connectionId: 'warehouse',
adapter: 'fake',
});
expect(io.stdout()).toBe('');
expect(io.stderr()).toBe('');
});
it('falls back to the text renderer when TUI declines stored status --viz', async () => {
const projectDir = join(tempDir, 'project');
await writeWarehouseConfig(projectDir);
const sourceDir = join(tempDir, 'source');
await mkdir(join(sourceDir, 'orders'), { recursive: true });
await writeFile(join(sourceDir, 'orders', 'orders.json'), '{"name":"orders"}\n', 'utf-8');
await persistLocalBundleReport(projectDir, localFakeBundleReport('tui-fallback-run'));
const io = makeIo({ isTTY: true, stdinIsTTY: true, columns: 120, keypresses: [{ name: 'q' }] });
const renderStoredMemoryFlow = vi.fn(async (_input: MemoryFlowReplayInput, _io: unknown) => false);
await expect(
runKtxIngest(
{
command: 'status',
projectDir,
runId: 'tui-fallback-run',
outputMode: 'viz',
},
io.io,
{ renderStoredMemoryFlow },
),
).resolves.toBe(0);
expect(renderStoredMemoryFlow).toHaveBeenCalledTimes(1);
expect(io.stdout()).toContain('KTX memory flow warehouse/fake done');
});
it('does not use TUI for stored --viz when input is disabled', async () => {
const projectDir = join(tempDir, 'project');
await writeWarehouseConfig(projectDir);
const sourceDir = join(tempDir, 'source');
await mkdir(join(sourceDir, 'orders'), { recursive: true });
await writeFile(join(sourceDir, 'orders', 'orders.json'), '{"name":"orders"}\n', 'utf-8');
await persistLocalBundleReport(projectDir, localFakeBundleReport('tui-no-input-run'));
const io = makeIo({ isTTY: true, stdinIsTTY: true, columns: 120 });
const renderStoredMemoryFlow = vi.fn(async (_input: MemoryFlowReplayInput, _io: unknown) => true);
await expect(
runKtxIngest(
{
command: 'replay',
projectDir,
runId: 'tui-no-input-run',
outputMode: 'viz',
inputMode: 'disabled',
},
io.io,
{ renderStoredMemoryFlow },
),
).resolves.toBe(0);
expect(renderStoredMemoryFlow).not.toHaveBeenCalled();
expect(io.stdout()).toContain('KTX memory flow warehouse/fake done');
});
it('falls back to plain status for stored --viz when stdin raw mode is unavailable', async () => {
const projectDir = join(tempDir, 'project');
await writeWarehouseConfig(projectDir);
const sourceDir = join(tempDir, 'source');
await mkdir(join(sourceDir, 'orders'), { recursive: true });
await writeFile(join(sourceDir, 'orders', 'orders.json'), '{"name":"orders"}\n', 'utf-8');
await persistLocalBundleReport(projectDir, localFakeBundleReport('raw-missing-stored-viz-run'));
const io = makeIo({ isTTY: true, stdinIsTTY: true, rawMode: false, columns: 120 });
const renderStoredMemoryFlow = vi.fn(async (_input: MemoryFlowReplayInput, _io: unknown) => true);
await expect(
runKtxIngest(
{
command: 'replay',
projectDir,
runId: 'raw-missing-stored-viz-run',
outputMode: 'viz',
},
io.io,
{ renderStoredMemoryFlow },
),
).resolves.toBe(0);
expect(renderStoredMemoryFlow).not.toHaveBeenCalled();
expect(io.stdout()).toContain('Run: run-raw-missing-stored-viz-run');
expect(io.stdout()).toContain('Job: raw-missing-stored-viz-run');
expect(io.stdout()).not.toContain('KTX memory flow');
expect(io.stderr()).toContain(
'Visualization requested but stdin raw mode is unavailable; printing plain output.',
);
});
it('keeps stored --viz snapshot-only when input is disabled', async () => {
const projectDir = join(tempDir, 'project');
await writeWarehouseConfig(projectDir);
const sourceDir = join(tempDir, 'source');
await mkdir(join(sourceDir, 'orders'), { recursive: true });
await writeFile(join(sourceDir, 'orders', 'orders.json'), '{"name":"orders"}\n', 'utf-8');
await persistLocalBundleReport(projectDir, localFakeBundleReport('no-input-viz-run'));
const io = makeIo({ isTTY: true, columns: 120 });
await expect(
runKtxIngest(
{
command: 'replay',
projectDir,
runId: 'no-input-viz-run',
outputMode: 'viz',
inputMode: 'disabled',
},
io.io,
),
).resolves.toBe(0);
expect(io.stdout()).toContain('KTX memory flow warehouse/fake done');
expect(io.stdout()).not.toContain('\u001b[2J\u001b[H');
expect(io.stderr()).toBe('');
});
it('keeps disabled-input stored --viz snapshot output even when stdin raw mode is unavailable', async () => {
const projectDir = join(tempDir, 'project');
await writeWarehouseConfig(projectDir);
const sourceDir = join(tempDir, 'source');
await mkdir(join(sourceDir, 'orders'), { recursive: true });
await writeFile(join(sourceDir, 'orders', 'orders.json'), '{"name":"orders"}\n', 'utf-8');
await persistLocalBundleReport(projectDir, localFakeBundleReport('disabled-raw-missing-viz-run'));
const io = makeIo({ isTTY: true, stdinIsTTY: true, rawMode: false, columns: 120 });
await expect(
runKtxIngest(
{
command: 'replay',
projectDir,
runId: 'disabled-raw-missing-viz-run',
outputMode: 'viz',
inputMode: 'disabled',
},
io.io,
),
).resolves.toBe(0);
expect(io.stdout()).toContain('KTX memory flow warehouse/fake done');
expect(io.stdout()).not.toContain('\u001b[2J\u001b[H');
expect(io.stderr()).toBe('');
});
it('degrades stored --viz snapshots to plain status when stdout is redirected even when input is disabled', async () => {
const projectDir = join(tempDir, 'project');
await writeWarehouseConfig(projectDir);
const sourceDir = join(tempDir, 'source');
await mkdir(join(sourceDir, 'orders'), { recursive: true });
await writeFile(join(sourceDir, 'orders', 'orders.json'), '{"name":"orders"}\n', 'utf-8');
await persistLocalBundleReport(projectDir, localFakeBundleReport('redirected-no-input-viz-run'));
const io = makeIo({ isTTY: false });
await expect(
runKtxIngest(
{
command: 'replay',
projectDir,
runId: 'redirected-no-input-viz-run',
outputMode: 'viz',
inputMode: 'disabled',
},
io.io,
),
).resolves.toBe(0);
expect(io.stdout()).toContain('Run: run-redirected-no-input-viz-run');
expect(io.stdout()).toContain('Job: redirected-no-input-viz-run');
expect(io.stdout()).not.toContain('KTX memory flow');
expect(io.stderr()).toContain(
'Visualization requested but stdout is not an interactive terminal; printing plain output.',
);
});
it('degrades ingest replay --viz to plain status when TERM is dumb', async () => {
const projectDir = join(tempDir, 'project');
await writeWarehouseConfig(projectDir);
const sourceDir = join(tempDir, 'source');
await mkdir(join(sourceDir, 'orders'), { recursive: true });
await writeFile(join(sourceDir, 'orders', 'orders.json'), '{"name":"orders"}\n', 'utf-8');
await persistLocalBundleReport(projectDir, localFakeBundleReport('dumb-terminal-viz-run'));
const io = makeIo({ isTTY: true });
await expect(
runKtxIngest(
{ command: 'replay', projectDir, runId: 'dumb-terminal-viz-run', outputMode: 'viz' },
io.io,
{ env: { ...process.env, TERM: 'dumb' } },
),
).resolves.toBe(0);
expect(io.stdout()).toContain('Run: run-dumb-terminal-viz-run');
expect(io.stdout()).toContain('Job: dumb-terminal-viz-run');
expect(io.stdout()).not.toContain('KTX memory flow');
expect(io.stderr()).toContain(
'Visualization requested but TERM=dumb does not support the visual renderer; printing plain output.',
);
});
it('falls back to plain status for --viz when stdout is not interactive', async () => {
const projectDir = join(tempDir, 'project');
await writeWarehouseConfig(projectDir);
const sourceDir = join(tempDir, 'source');
await mkdir(join(sourceDir, 'orders'), { recursive: true });
await writeFile(join(sourceDir, 'orders', 'orders.json'), '{"name":"orders"}\n', 'utf-8');
await persistLocalBundleReport(projectDir, localFakeBundleReport('viz-run-2'));
const io = makeIo({ isTTY: false });
await expect(
runKtxIngest({ command: 'replay', projectDir, runId: 'viz-run-2', outputMode: 'viz' }, io.io),
).resolves.toBe(0);
expect(io.stdout()).toContain('Run: run-viz-run-2');
expect(io.stdout()).toContain('Job: viz-run-2');
expect(io.stdout()).not.toContain('KTX memory flow');
expect(io.stderr()).toContain(
'Visualization requested but stdout is not an interactive terminal; printing plain output.',
);
});
it('prints JSON for status --json', async () => {
const projectDir = join(tempDir, 'project');
await writeWarehouseConfig(projectDir);
const sourceDir = join(tempDir, 'source');
await mkdir(join(sourceDir, 'orders'), { recursive: true });
await writeFile(join(sourceDir, 'orders', 'orders.json'), '{"name":"orders"}\n', 'utf-8');
await persistLocalBundleReport(projectDir, localFakeBundleReport('json-run-1'));
const io = makeIo();
await expect(
runKtxIngest({ command: 'status', projectDir, runId: 'json-run-1', outputMode: 'json' }, io.io),
).resolves.toBe(0);
expect(JSON.parse(io.stdout())).toMatchObject({
runId: 'run-json-run-1',
jobId: 'json-run-1',
sourceKey: 'fake',
connectionId: 'warehouse',
});
expect(io.stderr()).toBe('');
});
});

View file

@ -0,0 +1,710 @@
import { EventEmitter } from 'node:events';
import { access, mkdir, mkdtemp, readFile, rm, writeFile } from 'node:fs/promises';
import { tmpdir } from 'node:os';
import { join } from 'node:path';
import { AgentRunnerService, type RunLoopParams } from '@ktx/context/agent';
import {
LocalLookerRuntimeStore,
LocalMetabaseSourceStateReader,
MetabaseSourceAdapter,
getLocalIngestStatus,
type ChunkResult,
type FetchContext,
type IngestReportSnapshot,
type LocalIngestResult,
type LocalMetabaseFanoutProgress,
type LookerMappingClient,
type LookerRuntimeClient,
type LookerTableIdentifierParser,
type MemoryFlowEventSink,
type MemoryFlowReplayInput,
type MetabaseCard,
type MetabaseCardSummary,
type MetabaseClientFactory,
type MetabaseRuntimeClient,
type RunLocalIngestOptions,
type SourceAdapter,
type SqliteBundleIngestStore,
} from '@ktx/context/ingest';
import { ktxLocalStateDbPath, loadKtxProject } from '@ktx/context/project';
import { expect, vi } from 'vitest';
import { type KtxIngestArgs, runKtxIngest } from './ingest.js';
export function makeIo(
options: {
isTTY?: boolean;
stdinIsTTY?: boolean;
columns?: number;
rawMode?: boolean;
keypresses?: { name?: string; ctrl?: boolean }[];
} = {},
) {
let stdout = '';
let stderr = '';
type TestKey = { name?: string; ctrl?: boolean };
class TestStdin extends EventEmitter {
isTTY = options.stdinIsTTY ?? false;
isRaw = false;
setRawMode =
options.rawMode === false
? undefined
: (value: boolean): void => {
this.isRaw = value;
};
resume(): void {
return undefined;
}
pause(): void {
return undefined;
}
override on(eventName: string | symbol, listener: (chunk: string, key: TestKey) => void): this {
const result = super.on(eventName, listener);
if (eventName === 'keypress') {
for (const key of options.keypresses ?? []) {
queueMicrotask(() => listener('', key));
}
}
return result;
}
override off(eventName: string | symbol, listener: (chunk: string, key: TestKey) => void): this {
return super.off(eventName, listener);
}
override removeListener(eventName: string | symbol, listener: (chunk: string, key: TestKey) => void): this {
return super.removeListener(eventName, listener);
}
}
const stdin = new TestStdin();
return {
io: {
stdin,
stdout: {
isTTY: options.isTTY,
columns: options.columns,
write: (chunk: string) => {
stdout += chunk;
},
},
stderr: {
write: (chunk: string) => {
stderr += chunk;
},
},
},
stdout: () => stdout,
stderr: () => stderr,
};
}
export async function writeWarehouseConfig(projectDir: string): Promise<void> {
await mkdir(projectDir, { recursive: true });
await writeFile(
join(projectDir, 'ktx.yaml'),
[
'project: warehouse',
'connections:',
' prod-metabase:',
' driver: metabase',
' warehouse_a:',
' driver: postgres',
'ingest:',
' adapters:',
' - fake',
'',
].join('\n'),
'utf-8',
);
}
export async function writeMetabaseConfig(projectDir: string): Promise<void> {
await mkdir(projectDir, { recursive: true });
await writeFile(
join(projectDir, 'ktx.yaml'),
[
'project: warehouse',
'connections:',
' warehouse:',
' driver: postgres',
'ingest:',
' adapters:',
' - metabase',
' embeddings:',
' backend: deterministic',
'',
].join('\n'),
'utf-8',
);
}
export function bundleReportSnapshot(): IngestReportSnapshot {
return {
id: 'report-1',
runId: 'run-1',
jobId: 'job-1',
connectionId: 'warehouse',
sourceKey: 'metabase',
createdAt: '2026-04-30T12:00:00.000Z',
body: {
syncId: 'sync-1',
diffSummary: { added: 2, modified: 0, deleted: 0, unchanged: 0 },
commitSha: 'abc12345',
workUnits: [
{
unitKey: 'cards',
rawFiles: ['cards/1.json', 'cards/2.json'],
status: 'success',
actions: [
{ target: 'wiki', type: 'created', key: 'knowledge/global/revenue.md', detail: 'Revenue overview' },
{ target: 'sl', type: 'updated', key: 'warehouse.orders', detail: 'Added order amount measure' },
],
touchedSlSources: [{ connectionId: 'warehouse', sourceName: 'warehouse.orders' }],
},
],
failedWorkUnits: [],
reconciliationSkipped: false,
conflictsResolved: [],
evictionsApplied: [],
unmappedFallbacks: [],
evictionInputs: [],
unresolvedCards: [],
supersededBy: null,
overrideOf: null,
provenanceRows: [
{
rawPath: 'cards/1.json',
artifactKind: 'wiki',
artifactKey: 'knowledge/global/revenue.md',
actionType: 'wiki_written',
},
{
rawPath: 'cards/2.json',
artifactKind: 'sl',
artifactKey: 'warehouse.orders',
actionType: 'measure_added',
},
],
toolTranscripts: [
{
unitKey: 'cards',
path: 'tool-transcripts/cards.jsonl',
toolCallCount: 4,
errorCount: 0,
toolNames: ['ingest_triage', 'knowledge_capture', 'sl_capture'],
},
],
},
};
}
export function completedLocalBundleRun(input: RunLocalIngestOptions, jobId: string): LocalIngestResult {
const nextReport = localFakeBundleReport(jobId, {
id: 'report-live-1',
runId: 'run-live-1',
connectionId: input.connectionId,
sourceKey: input.adapter,
});
return {
result: {
jobId,
runId: nextReport.runId,
syncId: nextReport.body.syncId,
diffSummary: nextReport.body.diffSummary,
workUnitCount: nextReport.body.workUnits.length,
failedWorkUnits: nextReport.body.failedWorkUnits,
artifactsWritten: nextReport.body.provenanceRows.length,
commitSha: nextReport.body.commitSha,
},
report: nextReport,
};
}
export class CliLookerSlWritingAgentRunner extends AgentRunnerService {
override runLoop = vi.fn(async (params: RunLoopParams) => {
if (
params.telemetryTags?.operationName === 'ingest-bundle-wu' &&
params.telemetryTags?.unitKey === 'looker-explore-ecommerce-orders'
) {
const slWrite = params.toolSet.sl_write_source;
if (!slWrite?.execute) {
throw new Error('sl_write_source tool was not available to the Looker WorkUnit');
}
const result = await slWrite.execute(
{
connectionId: 'prod-warehouse',
sourceName: 'looker__ecommerce__orders',
source: {
name: 'looker__ecommerce__orders',
table: 'public.orders',
grain: ['id'],
columns: [
{ name: 'id', type: 'number' },
{ name: 'revenue', type: 'number' },
],
measures: [{ name: 'total_revenue', expr: 'sum(revenue)' }],
},
},
{ toolCallId: 'cli-looker-sl-write', messages: [] },
);
if (!result.structured.success) {
throw new Error(result.markdown);
}
}
return { stopReason: 'natural' as const };
});
constructor() {
super({ llmProvider: { getModel: () => ({}) as never } as never });
}
}
export class CliMetabaseAgentRunner extends AgentRunnerService {
override runLoop = vi.fn(async () => ({ stopReason: 'natural' as const }));
constructor() {
super({ llmProvider: { getModel: () => ({}) as never } as never });
}
}
export class CliMetabaseSourceAdapter implements SourceAdapter {
readonly source = 'metabase';
readonly skillNames: string[] = [];
readonly fetchCalls: Array<{ metabaseConnectionId: string; metabaseDatabaseId: number; connectionId: string }> = [];
private readonly databaseByStagedDir = new Map<string, number>();
detect(): Promise<boolean> {
return Promise.resolve(true);
}
async fetch(pullConfig: unknown, stagedDir: string, ctx: FetchContext): Promise<void> {
const config = pullConfig as { metabaseConnectionId: string; metabaseDatabaseId: number };
this.fetchCalls.push({
metabaseConnectionId: config.metabaseConnectionId,
metabaseDatabaseId: config.metabaseDatabaseId,
connectionId: ctx.connectionId,
});
this.databaseByStagedDir.set(stagedDir, config.metabaseDatabaseId);
await mkdir(join(stagedDir, 'cards'), { recursive: true });
await mkdir(join(stagedDir, 'databases'), { recursive: true });
await writeFile(
join(stagedDir, 'cards', `${config.metabaseDatabaseId}.json`),
JSON.stringify({ connectionId: ctx.connectionId, databaseId: config.metabaseDatabaseId }),
'utf-8',
);
await writeFile(
join(stagedDir, 'databases', `${config.metabaseDatabaseId}.json`),
JSON.stringify({ metabaseConnectionId: config.metabaseConnectionId }),
'utf-8',
);
}
async chunk(stagedDir: string): Promise<ChunkResult> {
const databaseId = this.databaseByStagedDir.get(stagedDir);
if (!databaseId) {
throw new Error(`Missing Metabase database id for staged dir ${stagedDir}`);
}
return {
workUnits: [
{
unitKey: `metabase-db-${databaseId}`,
rawFiles: [`cards/${databaseId}.json`],
peerFileIndex: [],
dependencyPaths: [`databases/${databaseId}.json`],
},
],
};
}
}
const SYNC_MODE_METABASE_CARDS: MetabaseCard[] = [
{
id: 101,
name: 'Collection 12 Revenue',
description: null,
type: 'question',
query_type: 'native',
database_id: 1,
collection_id: 12,
archived: false,
result_metadata: [],
dataset_query: { type: 'native', database: 1, native: { query: 'select 101 as id' } },
parameters: [],
dashboard_count: 0,
},
{
id: 102,
name: 'Collection 12 Margin',
description: null,
type: 'question',
query_type: 'native',
database_id: 1,
collection_id: 12,
archived: false,
result_metadata: [],
dataset_query: { type: 'native', database: 1, native: { query: 'select 102 as id' } },
parameters: [],
dashboard_count: 0,
},
{
id: 103,
name: 'Collection 13 Pipeline',
description: null,
type: 'question',
query_type: 'native',
database_id: 1,
collection_id: 13,
archived: false,
result_metadata: [],
dataset_query: { type: 'native', database: 1, native: { query: 'select 103 as id' } },
parameters: [],
dashboard_count: 0,
},
];
function metabaseCardSummary(card: MetabaseCard): MetabaseCardSummary {
return {
id: card.id,
name: card.name,
archived: card.archived,
database_id: card.database_id,
collection_id: card.collection_id,
};
}
function createSyncModeMetabaseClient(): MetabaseRuntimeClient {
const cardsById = new Map(SYNC_MODE_METABASE_CARDS.map((card) => [card.id, card]));
return {
testConnection: async () => ({ success: true }),
getCurrentUser: async () => ({ id: 1, email: 'local@example.test' }),
getDatabases: async () => [{ id: 1, name: 'Warehouse A', engine: 'postgres' }],
getDatabase: async (id) => ({ id, name: 'Warehouse A', engine: 'postgres' }),
getCollectionTree: async () => [
{ id: 12, name: 'Selected Collection', parent_id: 'root', children: [] },
{ id: 13, name: 'Other Collection', parent_id: 'root', children: [] },
],
getCollection: async (id) => ({
id,
name: id === 12 ? 'Selected Collection' : 'Other Collection',
parent_id: 'root',
children: [],
}),
getCollectionItems: async (collectionId) =>
SYNC_MODE_METABASE_CARDS.filter((card) => card.collection_id === collectionId).map((card) => ({
id: card.id,
model: 'card',
name: card.name,
collection_id: card.collection_id,
database_id: card.database_id,
})),
getCard: async (id) => {
const card = cardsById.get(id);
if (!card) {
throw new Error(`unexpected card ${id}`);
}
return card;
},
getAllCards: async () => SYNC_MODE_METABASE_CARDS.map(metabaseCardSummary),
convertMbqlToNative: async () => ({ query: 'select 1' }),
getNativeSql: (card) => card.dataset_query?.native?.query ?? null,
getTemplateTags: () => ({}),
getCardSql: async (card) => card.dataset_query?.native?.query ?? null,
getResolvedSql: async (card) => ({
resolvedSql: card.dataset_query?.native?.query ?? `select ${card.id} as id`,
templateTags: [],
resolutionStatus: 'resolved',
}),
cleanup: async () => undefined,
};
}
export class StaticMetabaseClientFactory implements MetabaseClientFactory {
constructor(private readonly client: MetabaseRuntimeClient) {}
createClient(): MetabaseRuntimeClient {
return this.client;
}
}
type SyncModeCase = {
name: string;
syncMode: 'ALL' | 'ONLY' | 'EXCEPT';
selections: Array<{ selectionType: 'collection' | 'item'; metabaseObjectId: number }>;
expectedRawFiles: string[];
expectedWorkUnitKeys: string[];
};
export async function runPublicMetabaseSyncModeCase(tempDir: string, input: SyncModeCase): Promise<void> {
const projectDir = join(tempDir, `metabase-sync-mode-${input.name}`);
await mkdir(projectDir, { recursive: true });
await writeFile(
join(projectDir, 'ktx.yaml'),
[
`project: metabase-sync-mode-${input.name}`,
'connections:',
' prod-metabase:',
' driver: metabase',
' api_url: https://metabase.example.test',
' api_key: literal-test-key',
' warehouse_a:',
' driver: postgres',
' url: postgresql://readonly@db.example.test/warehouse_a',
'ingest:',
' adapters:',
' - metabase',
' embeddings:',
' backend: deterministic',
'',
].join('\n'),
'utf-8',
);
const project = await loadKtxProject({ projectDir });
const store = new LocalMetabaseSourceStateReader({ dbPath: ktxLocalStateDbPath(project) });
await store.replaceSourceState({
connectionId: 'prod-metabase',
syncMode: input.syncMode,
defaultTagNames: ['sync-mode-smoke'],
selections: input.selections,
mappings: [
{
metabaseDatabaseId: 1,
metabaseDatabaseName: 'Warehouse A',
metabaseEngine: 'postgres',
metabaseHost: 'db.example.test',
metabaseDbName: 'warehouse_a',
targetConnectionId: 'warehouse_a',
syncEnabled: true,
source: 'refresh',
},
],
});
const adapter = new MetabaseSourceAdapter({
clientFactory: new StaticMetabaseClientFactory(createSyncModeMetabaseClient()),
sourceStateReader: store,
});
const jobId = `metabase-sync-mode-${input.name}-child`;
const io = makeIo();
await expect(
runKtxIngest(
{
command: 'run',
projectDir,
connectionId: 'prod-metabase',
adapter: 'metabase',
outputMode: 'plain',
},
io.io,
{
createAdapters: vi.fn(() => [adapter]),
jobIdFactory: () => jobId,
localIngestOptions: {
agentRunner: new CliMetabaseAgentRunner(),
},
},
),
).resolves.toBe(0);
expect(io.stderr()).toBe('');
expect(io.stdout()).toContain('Metabase fan-out: all_succeeded');
expect(io.stdout()).toContain(`target=warehouse_a database=1 status=done job=${jobId}`);
const report = await getLocalIngestStatus(project, jobId);
expect(report).not.toBeNull();
expect(report?.body.workUnits.map((wu) => wu.unitKey).sort()).toEqual(input.expectedWorkUnitKeys);
expect(report?.body.workUnits.flatMap((wu) => wu.rawFiles).sort()).toEqual(input.expectedRawFiles);
}
type CliLookerRuntimeClient = LookerRuntimeClient &
Pick<LookerMappingClient, 'listLookerConnections'> & {
cleanup: ReturnType<typeof vi.fn<NonNullable<LookerRuntimeClient['cleanup']>>>;
};
export function makeCliLookerRuntimeClient(): CliLookerRuntimeClient {
const lookerModels = {
source: 'looker',
fetchedAt: '2026-05-05T00:00:00.000Z',
models: [{ name: 'ecommerce', label: 'Ecommerce', explores: [{ name: 'orders', label: 'Orders' }] }],
};
const lookerExplore = {
source: 'looker',
modelName: 'ecommerce',
exploreName: 'orders',
label: 'Orders',
description: null,
connectionName: 'analytics',
viewName: 'orders',
rawSqlTableName: 'public.orders',
fields: {
dimensions: [{ name: 'orders.id', label: null, type: null, sql: null, description: null }],
measures: [{ name: 'orders.revenue', label: null, type: null, sql: null, description: null }],
},
joins: [
{
name: 'users',
type: 'left_outer',
relationship: 'many_to_one',
rawSqlTableName: 'public.users',
sqlOn: '${orders.user_id} = ${users.id}',
from: null,
targetTable: null,
},
],
targetWarehouseConnectionId: null,
targetTable: null,
};
return {
listLookerConnections: vi.fn().mockResolvedValue([
{
name: 'analytics',
host: 'db.example.test',
database: 'analytics',
schema: null,
dialect: 'postgres',
},
]),
listDashboards: vi.fn().mockResolvedValue([{ id: '10', updatedAt: '2026-05-05T08:00:00.000Z' }]),
getDashboard: vi.fn().mockResolvedValue({
lookerId: '10',
title: 'Revenue Overview',
description: 'Revenue dashboard',
folderId: '7',
ownerId: '3',
updatedAt: '2026-05-05T08:00:00.000Z',
tiles: [{ id: '100', title: 'Revenue', lookId: null, query: { model: 'ecommerce', view: 'orders' } }],
}),
listLooks: vi.fn().mockResolvedValue([{ id: '20', updatedAt: '2026-05-05T08:10:00.000Z' }]),
getLook: vi.fn().mockResolvedValue({
lookerId: '20',
title: 'Revenue Look',
description: null,
folderId: '7',
ownerId: '3',
updatedAt: '2026-05-05T08:10:00.000Z',
query: { model: 'ecommerce', view: 'orders', fields: ['orders.revenue'] },
}),
listFolders: vi.fn().mockResolvedValue({ folders: [{ id: '7', name: 'Shared', parentId: null, path: ['Shared'] }] }),
listUsers: vi.fn().mockResolvedValue([{ id: '3', displayName: 'Ada Lovelace', email: 'ada@example.test' }]),
listGroups: vi.fn().mockResolvedValue([{ id: '4', name: 'Analysts' }]),
listLookmlModels: vi.fn().mockResolvedValue(lookerModels),
getExplore: vi.fn().mockResolvedValue(lookerExplore),
getSignals: vi.fn().mockResolvedValue({
dashboardUsage: [{ contentId: '10', queryCount30d: 12, uniqueUsers30d: 3, lastRunAt: null, topUsers: ['3'] }],
lookUsage: [{ contentId: '20', queryCount30d: 4, uniqueUsers30d: 2, lastRunAt: null, topUsers: ['3'] }],
scheduledPlans: [
{ contentId: '10', contentType: 'dashboard', isScheduled: true, scheduleCount: 1, recipientCount: 4 },
],
favorites: [{ contentId: '10', contentType: 'dashboard', favoriteCount: 2 }],
}),
cleanup: vi.fn<NonNullable<LookerRuntimeClient['cleanup']>>().mockResolvedValue(undefined),
};
}
interface TestLookerTableIdentifierParser extends LookerTableIdentifierParser {
parse: ReturnType<typeof vi.fn<LookerTableIdentifierParser['parse']>>;
}
export function makeCliLookerParser(): TestLookerTableIdentifierParser {
return {
parse: vi.fn<LookerTableIdentifierParser['parse']>().mockResolvedValue({
'ecommerce.orders': {
ok: true,
catalog: null,
schema: 'public',
name: 'orders',
canonical_table: 'public.orders',
},
'ecommerce.orders.users': {
ok: true,
catalog: null,
schema: 'public',
name: 'users',
canonical_table: 'public.users',
},
}),
};
}
export function localFakeBundleReport(jobId: string, overrides: Partial<IngestReportSnapshot> = {}): IngestReportSnapshot {
const report = bundleReportSnapshot();
return {
...report,
id: `report-${jobId}`,
runId: `run-${jobId}`,
jobId,
connectionId: 'warehouse',
sourceKey: 'fake',
...overrides,
body: {
...report.body,
syncId: 'sync-live-1',
...(overrides.body ?? {}),
},
};
}
export async function localBundleStore(projectDir: string, ids: [string, string]): Promise<SqliteBundleIngestStore> {
const { SqliteBundleIngestStore } = await import('@ktx/context/ingest');
const project = await loadKtxProject({ projectDir });
return new SqliteBundleIngestStore({
dbPath: ktxLocalStateDbPath(project),
idFactory: (() => {
let index = 0;
return () => ids[index++] ?? `generated-${index}`;
})(),
});
}
export async function persistLocalBundleReport(projectDir: string, report = bundleReportSnapshot()): Promise<void> {
const store = await localBundleStore(projectDir, [report.runId, report.id]);
const run = await store.create({
jobId: report.jobId,
connectionId: report.connectionId,
sourceKey: report.sourceKey,
syncId: report.body.syncId,
trigger: 'manual_resync',
});
await store.markCompleted(run.id, report.body.diffSummary);
await store.create({
runId: run.id,
jobId: report.jobId,
connectionId: report.connectionId,
sourceKey: report.sourceKey,
body: report.body,
});
}
export async function writeBundleReportFile(tempDir: string, report = bundleReportSnapshot()): Promise<string> {
const reportFile = join(tempDir, 'bundle-report.json');
await writeFile(reportFile, `${JSON.stringify(report, null, 2)}\n`, 'utf-8');
return reportFile;
}
export function emitLiveLocalMemoryFlow(memoryFlow: MemoryFlowEventSink | undefined): void {
memoryFlow?.emit({ type: 'source_acquired', adapter: 'fake', trigger: 'manual_resync', fileCount: 1 });
memoryFlow?.update({ syncId: 'sync-live-1' });
memoryFlow?.emit({ type: 'raw_snapshot_written', syncId: 'sync-live-1', rawFileCount: 1 });
memoryFlow?.emit({ type: 'diff_computed', added: 1, modified: 0, deleted: 0, unchanged: 0 });
memoryFlow?.update({
plannedWorkUnits: [
{
unitKey: 'fake-orders',
rawFiles: ['orders/orders.json'],
peerFileCount: 0,
dependencyCount: 0,
},
],
});
memoryFlow?.emit({ type: 'chunks_planned', chunkCount: 1, workUnitCount: 1, evictionCount: 0 });
memoryFlow?.emit({ type: 'report_created', runId: 'live-viz-run' });
memoryFlow?.finish('done');
}

File diff suppressed because it is too large Load diff

View file

@ -124,7 +124,8 @@
"search:pglite-spike": "node scripts/pglite-hybrid-search-spike.mjs",
"search:pglite-owner-prototype": "node scripts/pglite-owner-process-prototype.mjs",
"search:pglite-sl-prototype": "node scripts/pglite-sl-search-prototype.mjs",
"test": "vitest run",
"test": "vitest run --exclude src/scan/relationship-benchmarks.test.ts --exclude src/scan/local-scan.test.ts --exclude src/mcp/local-project-ports.test.ts --exclude src/ingest/local-stage-ingest.test.ts --exclude src/sl/pglite-sl-search-prototype.test.ts --exclude src/core/git.service.test.ts --exclude src/ingest/local-adapters.test.ts --exclude src/ingest/local-bundle-ingest.test.ts --exclude src/ingest/local-metabase-ingest.test.ts --exclude src/sl/local-sl.test.ts --exclude src/search/pglite-owner-process.test.ts --exclude src/scan/local-enrichment-artifacts.test.ts --exclude src/search/pglite-spike.test.ts --exclude src/wiki/local-knowledge.test.ts --exclude src/sl/local-query.test.ts --exclude src/scan/relationship-review-decisions.test.ts --exclude src/scan/relationship-profiling.test.ts",
"test:slow": "vitest run src/scan/local-scan.test.ts src/mcp/local-project-ports.test.ts src/ingest/local-stage-ingest.test.ts src/sl/pglite-sl-search-prototype.test.ts src/core/git.service.test.ts src/ingest/local-adapters.test.ts src/ingest/local-bundle-ingest.test.ts src/ingest/local-metabase-ingest.test.ts src/sl/local-sl.test.ts src/search/pglite-owner-process.test.ts src/scan/local-enrichment-artifacts.test.ts src/search/pglite-spike.test.ts src/wiki/local-knowledge.test.ts src/sl/local-query.test.ts src/scan/relationship-review-decisions.test.ts src/scan/relationship-profiling.test.ts --testTimeout 30000",
"type-check": "tsc -p tsconfig.json --noEmit"
},
"dependencies": {

View file

@ -143,6 +143,8 @@ describe('standalone example docs', () => {
assert.match(packageJson.scripts.smoke, /src\/standalone-smoke\.test\.ts/);
assert.match(packageJson.scripts.smoke, /src\/example-smoke\.test\.ts/);
assert.match(packageJson.scripts.test, /--exclude src\/standalone-smoke\.test\.ts/);
assert.match(packageJson.scripts.test, /--exclude src\/example-smoke\.test\.ts/);
});
it('documents daemon HTTP database, source generation, LookML, embedding, and code execution support', async () => {

View file

@ -27,6 +27,8 @@ describe('standalone KTX CI workflow', () => {
'cache-dependency-path: "pnpm-lock.yaml"',
'pnpm install --frozen-lockfile',
'pnpm run check',
'pnpm run test:slow',
'pnpm run smoke',
'actions/setup-python@a309ff8b426b58ec0e2a45f0f869d46889d02405',
'python-version: "3.13"',
'astral-sh/setup-uv@eac588ad8def6316056a12d4907a9d4d84ff7a3b',

View file

@ -0,0 +1,73 @@
import assert from 'node:assert/strict';
import { readFile } from 'node:fs/promises';
import { describe, it } from 'node:test';
async function readJson(path) {
return JSON.parse(await readFile(new URL(path, import.meta.url), 'utf8'));
}
function assertScriptContainsAll(script, expected) {
for (const item of expected) {
assert.match(script, new RegExp(item.replaceAll('/', '\\/').replaceAll('.', '\\.')));
}
}
describe('test tiering', () => {
const cliSlowTests = [
'src/setup-databases.test.ts',
'src/scan.test.ts',
'src/commands/connection-metabase-setup.test.ts',
'src/setup-models.test.ts',
'src/setup-sources.test.ts',
'src/setup.test.ts',
'src/connection.test.ts',
'src/setup-embeddings.test.ts',
'src/ingest.test.ts',
'src/commands/connection-mapping.test.ts',
'src/ingest-viz.test.ts',
'src/demo.test.ts',
'src/setup-project.test.ts',
'src/sl.test.ts',
'src/local-scan-connectors.test.ts',
'src/commands/connection-notion.test.ts',
];
const contextSlowTests = [
'src/scan/local-scan.test.ts',
'src/mcp/local-project-ports.test.ts',
'src/ingest/local-stage-ingest.test.ts',
'src/sl/pglite-sl-search-prototype.test.ts',
'src/core/git.service.test.ts',
'src/ingest/local-adapters.test.ts',
'src/ingest/local-bundle-ingest.test.ts',
'src/ingest/local-metabase-ingest.test.ts',
'src/sl/local-sl.test.ts',
'src/search/pglite-owner-process.test.ts',
'src/scan/local-enrichment-artifacts.test.ts',
'src/search/pglite-spike.test.ts',
'src/wiki/local-knowledge.test.ts',
'src/sl/local-query.test.ts',
'src/scan/relationship-review-decisions.test.ts',
'src/scan/relationship-profiling.test.ts',
];
it('keeps slow package tests out of default local package test scripts', async () => {
const cliPackage = await readJson('../packages/cli/package.json');
const contextPackage = await readJson('../packages/context/package.json');
assertScriptContainsAll(cliPackage.scripts.test, cliSlowTests.map((file) => `--exclude ${file}`));
assertScriptContainsAll(contextPackage.scripts.test, contextSlowTests.map((file) => `--exclude ${file}`));
assert.match(contextPackage.scripts.test, /--exclude src\/scan\/relationship-benchmarks\.test\.ts/);
});
it('provides explicit slow package test scripts for CI', async () => {
const rootPackage = await readJson('../package.json');
const cliPackage = await readJson('../packages/cli/package.json');
const contextPackage = await readJson('../packages/context/package.json');
assert.equal(rootPackage.scripts['test:slow'], 'pnpm --filter @ktx/context run test:slow && pnpm --filter @ktx/cli run test:slow');
assertScriptContainsAll(cliPackage.scripts['test:slow'], cliSlowTests);
assertScriptContainsAll(contextPackage.scripts['test:slow'], contextSlowTests);
assert.doesNotMatch(contextPackage.scripts['test:slow'], /relationship-benchmarks\.test\.ts/);
});
});