// ktx/packages/cli/test/context/ingest/memory-flow/schema.test.ts
// (repository view metadata: 189 lines, 6.5 KiB, TypeScript; blame timestamp 2026-05-10 23:12:26 +02:00)
import { describe, expect, it } from 'vitest';
import {
memoryFlowReplayInputSchema,
memoryFlowStreamEventSchema,
parseMemoryFlowReplayInput,
// blame annotation: "test: split cli tests from source tree (#216)" (2026-05-26 08:49:05 +02:00)
} from '../../../../src/context/ingest/memory-flow/schema.js';
import type { MemoryFlowReplayInput } from '../../../../src/context/ingest/memory-flow/types.js';
// blame timestamp: 2026-05-10 23:12:26 +02:00
/**
 * Builds a replay-input fixture for the memory-flow schema tests.
 *
 * The baseline describes one running `metabase` run (`job-1`) with a complete
 * event sequence, a single planned work unit (`orders`) and matching
 * action / provenance / transcript details.
 *
 * @param overrides - Top-level fields that replace the baseline values. The
 *   merge is shallow: supplying `events` or `details` replaces the whole
 *   baseline value rather than merging into it.
 * @returns A newly constructed object on every call, so callers can compare
 *   or mutate results independently.
 */
function snapshot(overrides: Partial<MemoryFlowReplayInput> = {}): MemoryFlowReplayInput {
  return {
    runId: 'job-1',
    connectionId: 'connection-1',
    adapter: 'metabase',
    status: 'running',
    sourceDir: null,
    syncId: 'sync-1',
    errors: [],
    events: [
      { type: 'source_acquired', adapter: 'metabase', trigger: 'manual_resync', fileCount: 2 },
      { type: 'scope_detected', fingerprint: 'scope-1' },
      { type: 'raw_snapshot_written', syncId: 'sync-1', rawFileCount: 2 },
      { type: 'diff_computed', added: 1, modified: 1, deleted: 0, unchanged: 0 },
      { type: 'chunks_planned', chunkCount: 1, workUnitCount: 1, evictionCount: 0 },
      { type: 'stage_progress', stage: 'integration', percent: 80, message: 'Integrating 1/1 patches: orders' },
      { type: 'work_unit_started', unitKey: 'orders', skills: ['wiki_capture'] },
      { type: 'work_unit_step', unitKey: 'orders', toolCalls: 1 },
      { type: 'candidate_action', unitKey: 'orders', target: 'wiki', action: 'created', key: 'wiki/orders.md' },
      { type: 'work_unit_finished', unitKey: 'orders', status: 'success' },
      { type: 'reconciliation_finished', conflictCount: 0, fallbackCount: 0 },
      { type: 'saved', commitSha: 'abc12345', wikiCount: 1, slCount: 0 },
      { type: 'provenance_recorded', rowCount: 1 },
      { type: 'report_created', runId: 'run-1', reportPath: 'ingest-report.json' },
    ],
    plannedWorkUnits: [{ unitKey: 'orders', rawFiles: ['orders.md'], peerFileCount: 0, dependencyCount: 1 }],
    details: {
      actions: [
        {
          unitKey: 'orders',
          target: 'wiki',
          action: 'created',
          key: 'wiki/orders.md',
          summary: 'Created orders page',
          rawFiles: ['orders.md'],
          status: 'success',
        },
      ],
      provenance: [
        {
          rawPath: 'orders.md',
          artifactKind: 'wiki',
          artifactKey: 'wiki/orders.md',
          actionType: 'wiki_written',
        },
      ],
      transcripts: [
        {
          unitKey: 'orders',
          path: 'transcripts/orders.jsonl',
          toolCallCount: 2,
          errorCount: 0,
          toolNames: ['wiki_write'],
        },
      ],
    },
    // Spread last so caller-supplied fields win over the baseline.
    ...overrides,
  };
}
// Schema tests for memory-flow replay inputs and stream events. Every case
// starts from the `snapshot()` fixture and overrides only the fields it needs.
describe('memory-flow schemas', () => {
  // Round trip: parsing the baseline fixture must yield an equal object.
  it('parses a full replay input snapshot', () => {
    expect(parseMemoryFlowReplayInput(snapshot())).toEqual(snapshot());
  });

  // Replay `metadata` and a per-event `emittedAt` timestamp must be preserved.
  it('parses replay metadata and timestamped events', () => {
    const parsed = parseMemoryFlowReplayInput(
      snapshot({
        metadata: {
          schemaVersion: 1,
          mode: 'full',
          origin: 'captured',
          timing: 'captured',
          capturedAt: '2026-05-01T10:00:03.000Z',
          sourceReportId: 'report-1',
          sourceReportPath: 'reports/report-1.json',
          fallbackReason: null,
        },
        events: [
          {
            type: 'source_acquired',
            adapter: 'metabase',
            trigger: 'manual_resync',
            fileCount: 2,
            emittedAt: '2026-05-01T10:00:00.000Z',
          },
        ],
      }),
    );

    expect(parsed.metadata).toEqual({
      schemaVersion: 1,
      mode: 'full',
      origin: 'captured',
      timing: 'captured',
      capturedAt: '2026-05-01T10:00:03.000Z',
      sourceReportId: 'report-1',
      sourceReportPath: 'reports/report-1.json',
      fallbackReason: null,
    });
    expect(parsed.events).toEqual([
      {
        type: 'source_acquired',
        adapter: 'metabase',
        trigger: 'manual_resync',
        fileCount: 2,
        emittedAt: '2026-05-01T10:00:00.000Z',
      },
    ]);
  });

  // A run that emits `stage_skipped` events (and a null `commitSha`) must parse.
  it('parses skipped deterministic stages', () => {
    const parsed = parseMemoryFlowReplayInput(
      snapshot({
        status: 'done',
        events: [
          { type: 'source_acquired', adapter: 'live-database', trigger: 'demo_deterministic', fileCount: 7 },
          { type: 'scope_detected', fingerprint: 'sqlite' },
          { type: 'raw_snapshot_written', syncId: 'sync-demo', rawFileCount: 7 },
          { type: 'diff_computed', added: 7, modified: 0, deleted: 0, unchanged: 0 },
          { type: 'chunks_planned', chunkCount: 7, workUnitCount: 0, evictionCount: 0 },
          { type: 'stage_skipped', stage: 'workUnits', reason: 'deterministic mode' },
          { type: 'stage_skipped', stage: 'actions', reason: 'requires LLM' },
          { type: 'stage_skipped', stage: 'gates', reason: 'requires candidate actions' },
          { type: 'stage_skipped', stage: 'saved', reason: 'requires LLM memory synthesis' },
          { type: 'saved', commitSha: null, wikiCount: 0, slCount: 0 },
          { type: 'provenance_recorded', rowCount: 0 },
          {
            type: 'report_created',
            runId: 'scan-demo',
            reportPath: 'raw-sources/orbit_demo/live-database/sync-demo/scan-report.json',
          },
        ],
      }),
    );

    expect(parsed.events).toContainEqual({ type: 'stage_skipped', stage: 'workUnits', reason: 'deterministic mode' });
    expect(parsed.events).toContainEqual({ type: 'stage_skipped', stage: 'actions', reason: 'requires LLM' });
  });

  // `rate_limit_wait` events must pass through the replay schema unchanged.
  it('accepts rate-limit wait replay events', () => {
    expect(
      memoryFlowReplayInputSchema.parse({
        ...snapshot(),
        events: [
          {
            type: 'rate_limit_wait',
            provider: 'claude-subscription',
            rateLimitType: 'five_hour',
            resumeAtMs: 2_000,
            remainingMs: 1_000,
          },
        ],
      }).events[0],
    ).toEqual({
      type: 'rate_limit_wait',
      provider: 'claude-subscription',
      rateLimitType: 'five_hour',
      resumeAtMs: 2_000,
      remainingMs: 1_000,
    });
  });

  // The stream schema must accept both the `snapshot` and `closed` variants.
  it('parses snapshot and closed stream events', () => {
    expect(memoryFlowStreamEventSchema.parse({ type: 'snapshot', snapshot: snapshot({ status: 'done' }) })).toEqual({
      type: 'snapshot',
      snapshot: snapshot({ status: 'done' }),
    });
    expect(memoryFlowStreamEventSchema.parse({ type: 'closed', status: 'done', errors: [] })).toEqual({
      type: 'closed',
      status: 'done',
      errors: [],
    });
  });

  // 'complete' is expected to be rejected as a replay status.
  it('rejects invalid replay status values', () => {
    expect(() => memoryFlowReplayInputSchema.parse({ ...snapshot(), status: 'complete' })).toThrow();
  });
});