feat(cli): add ktx admin reindex (#160)

* feat(cli): add admin reindex

* fix: keep lexical-only reindex incremental
This commit is contained in:
Andrey Avtomonov 2026-05-20 01:36:54 +02:00 committed by GitHub
parent 3db3e724cb
commit 6dbb0c8b3a
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
53 changed files with 1640 additions and 393 deletions

View file

@ -40,9 +40,9 @@ export interface SlSourcesIndexPort {
sources: Array<{ sourceName: string; searchText: string; embedding: number[] | null; contentHash?: string | null }>,
): Promise<void>;
getExistingSearchTexts(connectionId: string): Promise<Map<string, { searchText: string; hasEmbedding: boolean }>>;
deleteStale(connectionId: string, keepNames: string[]): Promise<void>;
deleteByConnection(connectionId: string): Promise<void>;
deleteByConnectionAndName(connectionId: string, sourceName: string): Promise<void>;
deleteStale(connectionId: string, keepNames: string[]): Promise<number>;
deleteByConnection(connectionId: string): Promise<number>;
deleteByConnectionAndName(connectionId: string, sourceName: string): Promise<number>;
search(
connectionId: string,
queryEmbedding: number[] | null,

View file

@ -223,4 +223,73 @@ describe('SlSearchService', () => {
},
]);
});
it('indexSources reports stats and supports lexical-only indexing', async () => {
const repository = {
upsertSources: vi.fn().mockResolvedValue(undefined),
getExistingSearchTexts: vi.fn().mockResolvedValue(
new Map([
['old_source', { searchText: 'old source', hasEmbedding: true }],
]),
),
deleteStale: vi.fn().mockResolvedValue(1),
deleteByConnection: vi.fn().mockResolvedValue(0),
deleteByConnectionAndName: vi.fn(),
search: vi.fn(),
};
const service = new SlSearchService(null, repository);
const source: SemanticLayerSource = {
name: 'orders',
table: 'public.orders',
grain: ['id'],
columns: [{ name: 'id', type: 'number' }],
joins: [],
measures: [],
};
await expect(service.indexSources('warehouse', [source])).resolves.toEqual({
scanned: 1,
updated: 1,
deleted: 1,
embeddingsRecomputed: 0,
embeddingsFailed: 0,
});
expect(repository.upsertSources).toHaveBeenCalledWith('warehouse', [
expect.objectContaining({ sourceName: 'orders', embedding: null }),
]);
});
it('does not update unchanged lexical-only SL rows on repeated sync', async () => {
const repository = {
upsertSources: vi.fn().mockResolvedValue(undefined),
getExistingSearchTexts: vi.fn().mockResolvedValue(
new Map([
['orders', { searchText: 'orders. table: public.orders. id (number)', hasEmbedding: false }],
]),
),
deleteStale: vi.fn().mockResolvedValue(0),
deleteByConnection: vi.fn().mockResolvedValue(0),
deleteByConnectionAndName: vi.fn(),
search: vi.fn(),
};
const service = new SlSearchService(null, repository);
const source: SemanticLayerSource = {
name: 'orders',
table: 'public.orders',
grain: ['id'],
columns: [{ name: 'id', type: 'number' }],
joins: [],
measures: [],
};
await expect(service.indexSources('warehouse', [source])).resolves.toEqual({
scanned: 1,
updated: 0,
deleted: 0,
embeddingsRecomputed: 0,
embeddingsFailed: 0,
});
expect(repository.upsertSources).toHaveBeenCalledWith('warehouse', []);
expect(repository.deleteStale).toHaveBeenCalledWith('warehouse', ['orders']);
});
});

View file

@ -1,5 +1,6 @@
import type { KtxEmbeddingPort, KtxLogger } from '../core/index.js';
import { noopLogger } from '../core/index.js';
import type { ReindexWorkResult } from '../index-sync/types.js';
import { DEFAULT_PRIORITY, resolveDescription } from './descriptions.js';
import { normalizeSemanticLayerDescriptions } from './description-normalization.js';
import type { SlSourcesIndexPort } from './ports.js';
@ -94,73 +95,71 @@ export function buildSemanticLayerSourceSearchText(
export class SlSearchService {
constructor(
private readonly embeddingService: KtxEmbeddingPort,
private readonly embeddingService: KtxEmbeddingPort | null,
private readonly slSourcesRepository: SlSourcesIndexPort,
private readonly logger: KtxLogger = noopLogger,
) {}
async indexSources(connectionId: string, sources: SemanticLayerSource[]): Promise<void> {
async indexSources(connectionId: string, sources: SemanticLayerSource[]): Promise<ReindexWorkResult> {
const existing = await this.slSourcesRepository.getExistingSearchTexts(connectionId);
if (sources.length === 0) {
await this.slSourcesRepository.deleteByConnection(connectionId);
return;
const deleted = await this.slSourcesRepository.deleteByConnection(connectionId);
return { scanned: 0, updated: 0, deleted, embeddingsRecomputed: 0, embeddingsFailed: 0 };
}
// Detect which sources actually changed by comparing search_text
const existing = await this.slSourcesRepository.getExistingSearchTexts(connectionId);
const searchTexts = sources.map((s) => this.buildSearchText(s));
const embeddingService = this.embeddingService;
const changedIndices: number[] = [];
for (let i = 0; i < sources.length; i++) {
const prev = existing.get(sources[i].name);
if (!prev || prev.searchText !== searchTexts[i] || !prev.hasEmbedding) {
for (let i = 0; i < sources.length; i += 1) {
const previous = existing.get(sources[i]!.name);
if (
!previous ||
previous.searchText !== searchTexts[i] ||
(embeddingService !== null && !previous.hasEmbedding)
) {
changedIndices.push(i);
}
}
if (changedIndices.length === 0) {
// Still clean up stale sources even if nothing changed
const keepNames = sources.map((s) => s.name);
await this.slSourcesRepository.deleteStale(connectionId, keepNames);
this.logger.log(`SL sources for connection ${connectionId}: all ${sources.length} up to date, 0 reindexed`);
return;
}
let changedEmbeddings: (number[] | null)[] = changedIndices.map(() => null);
let embeddingsRecomputed = 0;
let embeddingsFailed = 0;
// Compute embeddings only for changed sources
const changedTexts = changedIndices.map((i) => searchTexts[i]);
let changedEmbeddings: (number[] | null)[];
try {
const batchSize = this.embeddingService.maxBatchSize;
const allEmbeddings: number[][] = [];
for (let i = 0; i < changedTexts.length; i += batchSize) {
const batch = changedTexts.slice(i, i + batchSize);
const batchEmbeddings = await this.embeddingService.computeEmbeddingsBulk(batch);
allEmbeddings.push(...batchEmbeddings);
if (embeddingService && changedIndices.length > 0) {
try {
const changedTexts = changedIndices.map((index) => searchTexts[index]!);
const allEmbeddings: number[][] = [];
for (let i = 0; i < changedTexts.length; i += embeddingService.maxBatchSize) {
const batch = changedTexts.slice(i, i + embeddingService.maxBatchSize);
allEmbeddings.push(...(await embeddingService.computeEmbeddingsBulk(batch)));
}
changedEmbeddings = allEmbeddings;
embeddingsRecomputed = allEmbeddings.length;
} catch (error) {
this.logger.warn(
`Failed to compute SL source embeddings: ${error instanceof Error ? error.message : String(error)}`,
);
embeddingsFailed = changedIndices.length;
}
changedEmbeddings = allEmbeddings;
} catch (error) {
this.logger.warn(
`Failed to compute SL source embeddings: ${error instanceof Error ? error.message : String(error)}`,
);
changedEmbeddings = changedIndices.map(() => null);
}
const rows = changedIndices.map((srcIdx, i) => {
return {
sourceName: sources[srcIdx].name,
searchText: searchTexts[srcIdx],
embedding: changedEmbeddings[i],
};
});
const rows = changedIndices.map((sourceIndex, embeddingIndex) => ({
sourceName: sources[sourceIndex]!.name,
searchText: searchTexts[sourceIndex]!,
embedding: changedEmbeddings[embeddingIndex] ?? null,
}));
await this.slSourcesRepository.upsertSources(connectionId, rows);
// Remove sources that no longer exist in YAML
const keepNames = sources.map((s) => s.name);
await this.slSourcesRepository.deleteStale(connectionId, keepNames);
this.logger.log(
`SL sources for connection ${connectionId}: ${changedIndices.length}/${sources.length} reindexed, ${sources.length - changedIndices.length} unchanged`,
);
const keepNames = sources.map((source) => source.name);
const deleted = await this.slSourcesRepository.deleteStale(connectionId, keepNames);
return {
scanned: sources.length,
updated: changedIndices.length,
deleted,
embeddingsRecomputed,
embeddingsFailed,
};
}
async search(
@ -170,12 +169,14 @@ export class SlSearchService {
minRrfScore = 0,
): Promise<Array<{ sourceName: string; score: number; snippet?: string }>> {
let queryEmbedding: number[] | null = null;
try {
queryEmbedding = await this.embeddingService.computeEmbedding(query);
} catch (error) {
this.logger.warn(
`Failed to compute query embedding, falling back to FTS + trigram: ${error instanceof Error ? error.message : String(error)}`,
);
if (this.embeddingService) {
try {
queryEmbedding = await this.embeddingService.computeEmbedding(query);
} catch (error) {
this.logger.warn(
`Failed to compute query embedding, falling back to FTS + trigram: ${error instanceof Error ? error.message : String(error)}`,
);
}
}
const results = await this.slSourcesRepository.search(connectionId, queryEmbedding, query, limit, minRrfScore);

View file

@ -105,6 +105,33 @@ describe('SqliteSlSourcesIndex', () => {
expect(await index.search('finance', null, 'revenue', 10)).toEqual([]);
});
it('clear removes sources and dictionary rows for one connection only', async () => {
const index = new SqliteSlSourcesIndex({ dbPath });
await index.upsertSources('warehouse', [
{ sourceName: 'orders', searchText: 'orders revenue paid', embedding: null },
]);
await index.upsertSources('finance', [
{ sourceName: 'invoices', searchText: 'invoices revenue paid', embedding: null },
]);
await index.replaceDictionaryEntries('warehouse', [
{ connectionId: 'warehouse', sourceName: 'orders', columnName: 'status', value: 'paid', cardinality: 1 },
]);
await index.replaceDictionaryEntries('finance', [
{ connectionId: 'finance', sourceName: 'invoices', columnName: 'status', value: 'paid', cardinality: 1 },
]);
await expect(index.clear('warehouse')).resolves.toBe(1);
expect(await index.search('warehouse', null, 'revenue', 10)).toEqual([]);
expect(await index.search('finance', null, 'revenue', 10)).toEqual([
expect.objectContaining({ sourceName: 'invoices' }),
]);
await expect(index.searchDictionaryCandidates({ connectionIds: ['warehouse'], queryText: 'paid', limit: 10 }))
.resolves.toEqual([]);
await expect(index.searchDictionaryCandidates({ connectionIds: ['finance'], queryText: 'paid', limit: 10 }))
.resolves.toEqual([expect.objectContaining({ connectionId: 'finance', sourceName: 'invoices' })]);
});
it('returns lane candidates with stable connection-scoped IDs', async () => {
const index = new SqliteSlSourcesIndex({ dbPath });

View file

@ -221,10 +221,9 @@ export class SqliteSlSourcesIndex implements SlSourcesIndexPort {
);
}
async deleteStale(connectionId: string, keepNames: string[]): Promise<void> {
async deleteStale(connectionId: string, keepNames: string[]): Promise<number> {
if (keepNames.length === 0) {
await this.deleteByConnection(connectionId);
return;
return this.deleteByConnection(connectionId);
}
const placeholders = keepNames.map(() => '?').join(', ');
@ -257,18 +256,29 @@ export class SqliteSlSourcesIndex implements SlSourcesIndexPort {
});
remove(stale.map((row) => row.source_name));
return stale.length;
}
async deleteByConnection(connectionId: string): Promise<void> {
async deleteByConnection(connectionId: string): Promise<number> {
return this.clear(connectionId);
}
async clear(connectionId: string): Promise<number> {
const rows = this.db
.prepare('SELECT source_name FROM local_sl_sources WHERE connection_id = ?')
.all(connectionId) as Array<{ source_name: string }>;
const remove = this.db.transaction(() => {
this.db.prepare('DELETE FROM local_sl_sources_fts WHERE connection_id = ?').run(connectionId);
this.db.prepare('DELETE FROM local_sl_sources WHERE connection_id = ?').run(connectionId);
this.db.prepare('DELETE FROM local_sl_dictionary_values_fts WHERE connection_id = ?').run(connectionId);
this.db.prepare('DELETE FROM local_sl_dictionary_values WHERE connection_id = ?').run(connectionId);
});
remove();
return rows.length;
}
async deleteByConnectionAndName(connectionId: string, sourceName: string): Promise<void> {
this.deleteByConnectionAndNameSync(connectionId, sourceName);
async deleteByConnectionAndName(connectionId: string, sourceName: string): Promise<number> {
return this.deleteByConnectionAndNameSync(connectionId, sourceName);
}
async replaceDictionaryEntries(connectionId: string, entries: SlDictionaryEntry[]): Promise<void> {
@ -537,7 +547,7 @@ export class SqliteSlSourcesIndex implements SlSourcesIndexPort {
.filter((row) => row.rrfScore >= minRrfScore);
}
private deleteByConnectionAndNameSync(connectionId: string, sourceName: string): void {
private deleteByConnectionAndNameSync(connectionId: string, sourceName: string): number {
const remove = this.db.transaction(() => {
this.db
.prepare(
@ -548,7 +558,7 @@ export class SqliteSlSourcesIndex implements SlSourcesIndexPort {
`,
)
.run(connectionId, sourceName);
this.db
const result = this.db
.prepare(
`
DELETE FROM local_sl_sources
@ -557,7 +567,8 @@ export class SqliteSlSourcesIndex implements SlSourcesIndexPort {
`,
)
.run(connectionId, sourceName);
return Number(result.changes);
});
remove();
return remove();
}
}