Merge branch 'main' into review-pasted-findings-v3

This commit is contained in:
Andrey Avtomonov 2026-05-30 17:54:33 +02:00 committed by GitHub
commit 90a7e3467b
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
20 changed files with 621 additions and 1729 deletions

View file

@ -6,6 +6,7 @@ import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
import { initKtxProject, type KtxLocalProject } from '../../../src/context/project/project.js';
import { LocalMetabaseDiscoveryCache } from '../../../src/context/ingest/adapters/metabase/local-source-state-store.js';
import { getLocalIngestStatus, runLocalMetabaseIngest } from '../../../src/context/ingest/local-ingest.js';
import { ingestReportOutcome } from '../../../src/context/ingest/reports.js';
import type { ChunkResult, FetchContext, SourceAdapter } from '../../../src/context/ingest/types.js';
class TestAgentRunner implements AgentRunnerPort {
@ -202,6 +203,24 @@ describe('runLocalMetabaseIngest', () => {
expect(result.children[1]?.report.body.failedWorkUnits).toEqual(['metabase-db-2']);
});
// Fan-out classification check: with two child job ids handed out in order, the
// overall result must be `partial_failure`, the first child's report must
// classify as `done` and the second child's report as `error`.
// NOTE(review): how the second child is made to fail is not visible in this
// test body — presumably via the seeded state / fake adapter; confirm there.
it('keeps a child that saved memory out of all_failed when another child fails', async () => {
  await seedMetabaseState();
  const agentRunner = new TestAgentRunner();

  // Job ids are consumed front-to-back; anything beyond the two expected
  // children falls back to a sentinel id.
  const pendingJobIds = ['metabase-child-1', 'metabase-child-2'];
  const nextJobId = () => pendingJobIds.shift() ?? 'metabase-child-extra';

  const fanout = await runLocalMetabaseIngest({
    project,
    adapters: [new FakeMetabaseSourceAdapter()],
    metabaseConnectionId: 'prod-metabase',
    agentRunner,
    jobIdFactory: nextJobId,
  });

  expect(fanout.status).toBe('partial_failure');

  const firstOutcome = ingestReportOutcome(fanout.children[0].report);
  expect(firstOutcome).toBe('done');
  const secondOutcome = ingestReportOutcome(fanout.children[1].report);
  expect(secondOutcome).toBe('error');
});
it('captures fetch-time child failures and continues later mappings', async () => {
await seedMetabaseState();
project.config.connections.warehouse_c = { driver: 'postgres', url: 'postgres://localhost/c' };

View file

@ -166,7 +166,7 @@ describe('memory-flow event mapping', () => {
runId: 'run-1',
connectionId: 'warehouse',
adapter: 'lookml',
status: 'error',
status: 'done',
sourceDir: null,
syncId: 'sync-2',
reportId: 'report-1',
@ -308,7 +308,7 @@ describe('memory-flow event mapping', () => {
sourceReportPath: 'report-1',
fallbackReason: null,
});
expect(replay.status).toBe('error');
expect(replay.status).toBe('done');
expect(replay.reportId).toBe('report-1');
expect(replay.reportPath).toBe('report-1');
expect(replay.events[0]).toMatchObject({ type: 'source_acquired', emittedAt: '2026-05-01T10:00:00.000Z' });

View file

@ -0,0 +1,71 @@
import { describe, expect, it } from 'vitest';
import { ingestReportOutcome } from '../../../src/context/ingest/reports.js';
import type { IngestReportSnapshot } from '../../../src/context/ingest/reports.js';
/**
 * Builds an ingest report snapshot fixture for the `ingestReportOutcome` specs.
 *
 * The envelope fields are fixed placeholders. The body starts from an
 * "empty run" baseline (zeroed diff summary, no work units, no failures) and
 * every field present in `body` replaces the baseline value of the same name.
 *
 * @param body - Body fields to layer on top of the baseline.
 * @returns A snapshot whose body is freshly allocated on every call.
 */
function report(body: Partial<IngestReportSnapshot['body']>): IngestReportSnapshot {
  // Declared inside the function so callers never share array/object instances.
  const baselineBody = {
    syncId: 'sync',
    diffSummary: { added: 0, modified: 0, deleted: 0, unchanged: 0 },
    commitSha: null,
    workUnits: [],
    failedWorkUnits: [],
    reconciliationSkipped: false,
    conflictsResolved: [],
    evictionsApplied: [],
    unmappedFallbacks: [],
    evictionInputs: [],
    unresolvedCards: [],
    supersededBy: null,
    overrideOf: null,
    provenanceRows: [],
    toolTranscripts: [],
  };

  // Later spread wins: caller-supplied fields override the baseline.
  const mergedBody = { ...baselineBody, ...body };

  return {
    id: 'r',
    runId: 'run',
    jobId: 'job',
    connectionId: 'warehouse',
    sourceKey: 'metabase',
    createdAt: '2026-05-29T00:00:00.000Z',
    body: mergedBody,
  };
}
// Work-unit fixture with status 'success' and a single recorded action (an
// 'updated' action against the 'sl' target). The specs below use it as the
// unit that stands for "saved memory".
const savingWorkUnit = {
unitKey: 'ok',
rawFiles: ['cards/1.json'],
// `as const` keeps the literal types instead of widening them to `string`.
status: 'success' as const,
actions: [{ target: 'sl' as const, type: 'updated' as const, key: 'warehouse.orders', detail: 'measure' }],
touchedSlSources: [],
};
// Work-unit fixture with status 'failed', a failure reason, and no recorded
// actions. Its unitKey ('bad') is what the specs list in `failedWorkUnits`.
const failedWorkUnit = {
unitKey: 'bad',
rawFiles: ['cards/2.json'],
// `as const` keeps the literal type instead of widening it to `string`.
status: 'failed' as const,
reason: 'tool write failed',
actions: [],
touchedSlSources: [],
};
// Pins the three outcomes of `ingestReportOutcome` ('done' | 'partial' | 'error')
// against report bodies assembled from the work-unit fixtures above.
describe('ingestReportOutcome', () => {
  it('returns done when there are no failed work units', () => {
    const snapshot = report({ workUnits: [savingWorkUnit] });

    expect(ingestReportOutcome(snapshot)).toBe('done');
  });

  it('returns partial when failed work units coexist with saved memory', () => {
    const snapshot = report({
      workUnits: [savingWorkUnit, failedWorkUnit],
      failedWorkUnits: ['bad'],
    });

    expect(ingestReportOutcome(snapshot)).toBe('partial');
  });

  it('returns error when failed work units produced no saved memory', () => {
    const snapshot = report({ workUnits: [failedWorkUnit], failedWorkUnits: ['bad'] });

    expect(ingestReportOutcome(snapshot)).toBe('error');
  });

  it('returns error for a stage-level failure even if artifacts were recorded', () => {
    // A body-level `status: 'failed'` is expected to win even though the only
    // work unit succeeded and nothing is listed in `failedWorkUnits`.
    const snapshot = report({ status: 'failed', workUnits: [savingWorkUnit], failedWorkUnits: [] });

    expect(ingestReportOutcome(snapshot)).toBe('error');
  });
});

View file

@ -65,7 +65,7 @@
},
"limit": {
"default": 10,
"description": "Maximum wiki pages to return. Defaults to 10.",
"description": "Maximum wiki pages to return.",
"type": "integer",
"minimum": 1,
"maximum": 50
@ -307,7 +307,7 @@
{
"name": "sl_query",
"title": "Semantic Layer Query",
"description": "Execute a semantic-layer query and return rows, headers, generated SQL, and plan details. Example: sl_query({ connectionId: \"warehouse\", measures: [\"orders.order_count\"], dimensions: [{ field: \"orders.created_at\", granularity: \"month\" }] }).",
"description": "Execute a semantic-layer query and return headers, rows, and total row count, plus correctness notes (e.g. compile-only or fan-out) when relevant. The generated SQL and full query plan are omitted by default; request them with include: [\"sql\"] and/or include: [\"plan\"]. Example: sl_query({ connectionId: \"warehouse\", measures: [\"orders.order_count\"], dimensions: [{ field: \"orders.created_at\", granularity: \"month\" }], include: [\"sql\"] }).",
"inputSchema": {
"type": "object",
"properties": {
@ -403,7 +403,7 @@
},
"direction": {
"default": "asc",
"description": "Sort direction: \"asc\" or \"desc\". Defaults to \"asc\".",
"description": "Sort direction for this field.",
"type": "string",
"enum": [
"asc",
@ -418,15 +418,27 @@
},
"limit": {
"default": 1000,
"description": "Maximum rows to return. Defaults to 1000.",
"description": "Maximum rows to return.",
"type": "integer",
"minimum": 0,
"maximum": 9007199254740991
},
"include_empty": {
"default": true,
"description": "Whether to include empty dimension groups. Defaults to true.",
"description": "Whether to include empty dimension groups.",
"type": "boolean"
},
"include": {
"default": [],
"description": "Extra detail to attach to the response: \"sql\" for the generated SQL, \"plan\" for the full query plan.",
"type": "array",
"items": {
"type": "string",
"enum": [
"plan",
"sql"
]
}
}
},
"required": [
@ -443,9 +455,6 @@
"dialect": {
"type": "string"
},
"sql": {
"type": "string"
},
"headers": {
"type": "array",
"items": {
@ -462,6 +471,15 @@
"totalRows": {
"type": "number"
},
"notes": {
"type": "array",
"items": {
"type": "string"
}
},
"sql": {
"type": "string"
},
"plan": {
"type": "object",
"propertyNames": {
@ -471,7 +489,6 @@
}
},
"required": [
"sql",
"headers",
"rows",
"totalRows"
@ -1241,8 +1258,8 @@
}
},
"limit": {
"description": "Maximum refs to return. Defaults to 15.",
"default": 15,
"description": "Maximum refs to return.",
"default": 10,
"type": "integer",
"minimum": 1,
"maximum": 50
@ -1396,7 +1413,7 @@
"description": "Parser-validated read-only SQL, e.g. \"select count(*) from public.orders\"."
},
"maxRows": {
"description": "Maximum rows to return. Defaults to 1000.",
"description": "Maximum rows to return.",
"default": 1000,
"type": "integer",
"minimum": 1,

View file

@ -347,16 +347,12 @@ describe('createKtxMcpServer', () => {
content: [
{
type: 'text',
text: JSON.stringify(
{
headers: ['status', 'count'],
headerTypes: ['text', 'bigint'],
rows: [['paid', 42]],
rowCount: 1,
},
null,
2,
),
text: JSON.stringify({
headers: ['status', 'count'],
headerTypes: ['text', 'bigint'],
rows: [['paid', 42]],
rowCount: 1,
}),
},
],
structuredContent: {
@ -638,6 +634,92 @@ describe('createKtxMcpServer', () => {
);
});
// Default `sl_query` call (no `include`): the structured response must carry the
// result columns plus `notes` built from the plan's compile-only reason and
// fan-out description, while `sql` and `plan` themselves stay absent.
it('sl_query default response omits plan and sql but keeps compile-only and fan-out notes', async () => {
  const fake = makeFakeServer();

  const compileOnlyReason = 'No execution adapter configured.';
  const fanOutDescription = 'orders fans out across line_items';

  // Port stub: the query resolves with a compile-only plan that also reports fan-out.
  const query = vi.fn<KtxSemanticLayerMcpPort['query']>().mockResolvedValue({
    connectionId: 'warehouse',
    dialect: 'postgres',
    sql: 'select count(*) from public.orders',
    headers: ['order_count'],
    rows: [],
    totalRows: 0,
    plan: {
      sources_used: ['orders'],
      has_fan_out: true,
      fan_out_description: fanOutDescription,
      execution: { mode: 'compile_only', reason: compileOnlyReason },
    },
  });
  const semanticLayer: KtxSemanticLayerMcpPort = { readSource: vi.fn(), query };

  createKtxMcpServer({
    server: fake.server,
    userContext: { userId: 'local-user' },
    contextTools: { semanticLayer },
  });

  const slQueryTool = getTool(fake.tools, 'sl_query');
  const result = await slQueryTool.handler({
    connectionId: 'warehouse',
    measures: ['orders.order_count'],
  });

  expect(result).toMatchObject({
    structuredContent: {
      connectionId: 'warehouse',
      dialect: 'postgres',
      headers: ['order_count'],
      rows: [],
      totalRows: 0,
      notes: [compileOnlyReason, fanOutDescription],
    },
  });

  // `toMatchObject` ignores extra keys, so absence is asserted separately.
  const { structuredContent } = result as { structuredContent: Record<string, unknown> };
  expect(structuredContent.sql).toBeUndefined();
  expect(structuredContent.plan).toBeUndefined();
});
// Opt-in `sl_query` call (`include: ['plan', 'sql']`): the structured response
// must echo the generated SQL and the plan object. The plan here has no
// compile-only reason and no fan-out fields, and `notes` must be absent.
it('sl_query attaches sql and plan only when include requests them', async () => {
  const fake = makeFakeServer();

  const generatedSql = 'select count(*) from public.orders';
  const plan = { sources_used: ['orders'], execution: { mode: 'executed' } };

  // Port stub: the query resolves with one executed row.
  const query = vi.fn<KtxSemanticLayerMcpPort['query']>().mockResolvedValue({
    connectionId: 'warehouse',
    dialect: 'postgres',
    sql: generatedSql,
    headers: ['order_count'],
    rows: [[3]],
    totalRows: 1,
    plan,
  });
  const semanticLayer: KtxSemanticLayerMcpPort = { readSource: vi.fn(), query };

  createKtxMcpServer({
    server: fake.server,
    userContext: { userId: 'local-user' },
    contextTools: { semanticLayer },
  });

  const slQueryTool = getTool(fake.tools, 'sl_query');
  const result = await slQueryTool.handler({
    connectionId: 'warehouse',
    measures: ['orders.order_count'],
    include: ['plan', 'sql'],
  });

  expect(result).toMatchObject({
    structuredContent: {
      sql: generatedSql,
      plan,
      rows: [[3]],
      totalRows: 1,
    },
  });

  // `toMatchObject` ignores extra keys, so absence is asserted separately.
  const { structuredContent } = result as { structuredContent: Record<string, unknown> };
  expect(structuredContent.notes).toBeUndefined();
});
it('entity_details rejects sql-style schema table ref aliases', async () => {
const fake = makeFakeServer();
const entityDetails = makeAllContextTools().entityDetails!;
@ -838,7 +920,7 @@ describe('createKtxMcpServer', () => {
connectionId: '00000000-0000-4000-8000-000000000001',
}),
).resolves.toEqual({
content: [{ type: 'text', text: JSON.stringify({ runId: 'run-1' }, null, 2) }],
content: [{ type: 'text', text: JSON.stringify({ runId: 'run-1' }) }],
structuredContent: { runId: 'run-1' },
});
expect(ingest.ingest).toHaveBeenCalledWith({
@ -865,21 +947,17 @@ describe('createKtxMcpServer', () => {
content: [
{
type: 'text',
text: JSON.stringify(
{
runId: 'run-1',
status: 'done',
stage: 'done',
done: true,
captured: { wiki: ['revenue'], sl: [], xrefs: [] },
error: null,
commitHash: 'abc123',
skillsLoaded: ['wiki_capture'],
signalDetected: true,
},
null,
2,
),
text: JSON.stringify({
runId: 'run-1',
status: 'done',
stage: 'done',
done: true,
captured: { wiki: ['revenue'], sl: [], xrefs: [] },
error: null,
commitHash: 'abc123',
skillsLoaded: ['wiki_capture'],
signalDetected: true,
}),
},
],
structuredContent: {
@ -1087,19 +1165,15 @@ describe('createKtxMcpServer', () => {
content: [
{
type: 'text',
text: JSON.stringify(
{
connections: [
{
id: '00000000-0000-4000-8000-000000000001',
name: 'Warehouse',
connectionType: 'POSTGRES',
},
],
},
null,
2,
),
text: JSON.stringify({
connections: [
{
id: '00000000-0000-4000-8000-000000000001',
name: 'Warehouse',
connectionType: 'POSTGRES',
},
],
}),
},
],
structuredContent: {

View file

@ -403,7 +403,7 @@ describe('runKtxIngest', () => {
expect(io.stderr()).toContain('Metabase ingest: prod-metabase');
});
it('returns a non-zero code when Metabase fanout has failed children', async () => {
it('returns a non-zero code when a Metabase fanout child fully fails', async () => {
const projectDir = join(tempDir, 'project');
await writeMetabaseConfig(projectDir);
const io = makeIo();
@ -441,7 +441,7 @@ describe('runKtxIngest', () => {
{
runLocalMetabaseIngest: async () => ({
metabaseConnectionId: 'prod-metabase',
status: 'partial_failure',
status: 'all_failed',
totals: { workUnits: 1, failedWorkUnits: 1 },
children: [
{
@ -467,9 +467,83 @@ describe('runKtxIngest', () => {
),
).resolves.toBe(1);
expect(io.stdout()).toContain('Metabase fanout: partial_failure');
expect(io.stdout()).toContain('Failed tasks: 1');
expect(io.stdout()).toContain('Metabase fanout: all_failed');
expect(io.stdout()).toContain('status=error');
});
// Metabase fan-out, partial case: the single child report holds one successful
// work unit (with an `sl` action) and one failed work unit. With the stubbed
// fan-out result reporting `partial_failure`, the command is expected to
// resolve to exit code 0 and print `status=partial` for the child.
it('exits 0 and reports status=partial when a Metabase child saved memory despite a failure', async () => {
const projectDir = join(tempDir, 'project');
await writeMetabaseConfig(projectDir);
const io = makeIo();
// Child report fixture: `failedWorkUnits` names the failed unit's key, while
// the first unit succeeded and recorded an action.
const report = localFakeBundleReport('metabase-child-1', {
id: 'report-metabase-child-1',
runId: 'run-a',
jobId: 'metabase-child-1',
connectionId: 'warehouse_a',
sourceKey: 'metabase',
body: {
failedWorkUnits: ['metabase-db-2'],
workUnits: [
{
unitKey: 'metabase-db-1',
rawFiles: ['cards/1.json'],
status: 'success',
actions: [{ target: 'sl', type: 'updated', key: 'warehouse.orders', detail: 'measure' }],
touchedSlSources: [],
},
{
unitKey: 'metabase-db-2',
rawFiles: ['cards/2.json'],
status: 'failed',
reason: 'bad SQL',
actions: [],
touchedSlSources: [],
},
],
},
});
await expect(
runKtxIngest(
{
command: 'run',
projectDir,
connectionId: 'prod-metabase',
adapter: 'metabase',
outputMode: 'plain',
},
io.io,
{
// Stubbed fan-out runner: returns a canned result instead of running an ingest.
runLocalMetabaseIngest: async () => ({
metabaseConnectionId: 'prod-metabase',
status: 'partial_failure',
totals: { workUnits: 2, failedWorkUnits: 1 },
children: [
{
jobId: 'metabase-child-1',
metabaseConnectionId: 'prod-metabase',
metabaseDatabaseId: 1,
targetConnectionId: 'warehouse_a',
result: {
jobId: 'metabase-child-1',
runId: 'run-a',
syncId: 'sync-a',
diffSummary: { added: 1, modified: 0, deleted: 0, unchanged: 0 },
workUnitCount: 2,
failedWorkUnits: ['metabase-db-2'],
artifactsWritten: 1,
commitSha: 'abc',
},
report,
},
],
}),
},
),
).resolves.toBe(0);
// The fan-out summary and per-child status go to stdout; the ingest header goes to stderr.
expect(io.stdout()).toContain('Metabase fanout: partial_failure');
expect(io.stdout()).toContain('status=partial');
expect(io.stderr()).toContain('Metabase ingest: prod-metabase');
});
@ -1140,6 +1214,63 @@ describe('runKtxIngest', () => {
expect(io.stdout()).toContain('Status: error\n');
});
// Single-source partial case: the stubbed ingest returns a report with one
// successful work unit (with a `wiki` action) and one failed work unit. The
// command is expected to resolve to exit code 0 and print `Status: partial`.
it('exits 0 and reports Status: partial when a single-source ingest saved memory despite a failure', async () => {
const projectDir = join(tempDir, 'project');
await writeWarehouseConfig(projectDir);
// A real source directory with one file is created on disk and passed as `sourceDir`.
const sourceDir = join(tempDir, 'source');
await mkdir(join(sourceDir, 'orders'), { recursive: true });
await writeFile(join(sourceDir, 'orders', 'orders.json'), '{"name":"orders"}\n', 'utf-8');
// Report fixture: `failedWorkUnits` names the failed unit's key, while the
// first unit succeeded and recorded an action.
const partialReport = localFakeBundleReport('local-job-partial', {
connectionId: 'warehouse',
sourceKey: 'fake',
body: {
failedWorkUnits: ['orders-bad'],
workUnits: [
{
unitKey: 'orders-ok',
rawFiles: ['orders/orders.json'],
status: 'success',
actions: [{ target: 'wiki', type: 'created', key: 'wiki/orders.md', detail: 'orders' }],
touchedSlSources: [],
},
{
unitKey: 'orders-bad',
rawFiles: ['orders/bad.json'],
status: 'failed',
reason: 'writer tool failed',
actions: [],
touchedSlSources: [],
},
],
},
});
// Stubbed runner: the result summary is derived from the report fixture so the two stay in sync.
const runLocal = vi.fn(async (_input: RunLocalIngestOptions) => ({
result: {
jobId: 'local-job-partial',
runId: partialReport.runId,
syncId: partialReport.body.syncId,
diffSummary: partialReport.body.diffSummary,
workUnitCount: partialReport.body.workUnits.length,
failedWorkUnits: partialReport.body.failedWorkUnits,
artifactsWritten: 1,
commitSha: partialReport.body.commitSha,
},
report: partialReport,
}));
const io = makeIo();
await expect(
runKtxIngest(
{ command: 'run', projectDir, connectionId: 'warehouse', adapter: 'fake', sourceDir, outputMode: 'plain' },
io.io,
{ runLocalIngest: runLocal, jobIdFactory: () => 'local-job-partial' },
),
).resolves.toBe(0);
expect(io.stdout()).toContain('Status: partial\n');
});
it('prints trace path and error status for stored failed ingest reports', async () => {
const projectDir = join(tempDir, 'project');
await writeWarehouseConfig(projectDir);

View file

@ -398,6 +398,59 @@ describe('setup status', () => {
expect(rendered).toContain('KTX context built: yes');
});
// Setup-status check: after persisting a report that mixes one successful work
// unit (with a `wiki` action) and one failed work unit for the `warehouse`
// connection, `readKtxSetupStatus` must report the context as ready/completed.
it('reports context ready after a partial ingest report saved memory', async () => {
  // The YAML nesting is significant: `driver`/`url` must be children of
  // `connections.warehouse`, and `backend`/`dimensions` children of
  // `ingest.embeddings`. With flat one-space indentation they would parse as
  // siblings and `warehouse` would be an empty (null) connection.
  const ktxYaml = [
    'setup:',
    '  database_connection_ids:',
    '    - warehouse',
    'connections:',
    '  warehouse:',
    '    driver: postgres',
    '    url: env:DATABASE_URL',
    'ingest:',
    '  embeddings:',
    '    backend: none',
    '    dimensions: 8',
    '',
  ].join('\n');
  await writeFile(join(tempDir, 'ktx.yaml'), ktxYaml, 'utf-8');

  await writeKtxSetupState(tempDir, { completed_steps: ['project', 'databases'] });

  // Persisted report fixture: `failedWorkUnits` names the failed unit's key,
  // while the first unit succeeded and recorded an action.
  await persistLocalBundleReport(
    tempDir,
    localFakeBundleReport('warehouse-job-partial', {
      connectionId: 'warehouse',
      sourceKey: 'fake',
      body: {
        failedWorkUnits: ['orders-bad'],
        workUnits: [
          {
            unitKey: 'orders-ok',
            rawFiles: ['orders/orders.json'],
            status: 'success',
            actions: [{ target: 'wiki', type: 'created', key: 'wiki/orders.md', detail: 'orders' }],
            touchedSlSources: [],
          },
          {
            unitKey: 'orders-bad',
            rawFiles: ['orders/bad.json'],
            status: 'failed',
            reason: 'writer tool failed',
            actions: [],
            touchedSlSources: [],
          },
        ],
      },
    }),
  );

  const status = await readKtxSetupStatus(tempDir);
  expect(status.context).toMatchObject({ ready: true, status: 'completed' });
});
it('formats plain and JSON setup status payloads', async () => {
const status = await readKtxSetupStatus(tempDir);
const rendered = formatKtxSetupStatus(status);