ktx/packages/cli/src/local-adapters.test.ts

141 lines
3.9 KiB
TypeScript

import { mkdtemp, rm, writeFile } from 'node:fs/promises';
import { tmpdir } from 'node:os';
import { join } from 'node:path';
import { loadKtxProject } from '@ktx/context/project';
import { afterEach, beforeEach, describe, expect, it } from 'vitest';
import { createKtxCliLocalIngestAdapters } from './local-adapters.js';
function sqlAnalysisStub() {
return {
async analyzeForFingerprint(sql: string) {
return {
fingerprint: 'fp',
normalizedSql: sql,
tablesTouched: [],
literalSlots: [],
};
},
async analyzeBatch() {
return new Map();
},
};
}
async function writeProject(projectDir: string, body: string): Promise<void> {
await writeFile(join(projectDir, 'ktx.yaml'), body, 'utf-8');
}
describe('CLI local ingest adapters', () => {
let tempDir: string;
beforeEach(async () => {
tempDir = await mkdtemp(join(tmpdir(), 'ktx-cli-local-adapters-'));
});
afterEach(async () => {
await rm(tempDir, { recursive: true, force: true });
});
it('registers Postgres historic SQL from the requested connection', async () => {
await writeProject(
tempDir,
[
'project: warehouse',
'connections:',
' warehouse:',
' driver: postgres',
' url: env:WAREHOUSE_DATABASE_URL',
' readonly: true',
' historicSql:',
' enabled: true',
' dialect: postgres',
'ingest:',
' adapters:',
' - historic-sql',
'',
].join('\n'),
);
const project = await loadKtxProject({ projectDir: tempDir });
const adapters = createKtxCliLocalIngestAdapters(project, {
historicSqlConnectionId: 'warehouse',
sqlAnalysis: sqlAnalysisStub(),
});
expect(adapters.find((adapter) => adapter.source === 'historic-sql')?.skillNames).toEqual([
'historic_sql_table_digest',
'historic_sql_patterns',
]);
});
it('registers BigQuery historic SQL from the requested connection', async () => {
await writeProject(
tempDir,
[
'project: warehouse',
'connections:',
' bq:',
' driver: bigquery',
' readonly: true',
' dataset_id: analytics',
' location: us',
' credentials_json: \'{"project_id":"demo-project"}\'',
' historicSql:',
' enabled: true',
' dialect: bigquery',
'ingest:',
' adapters:',
' - historic-sql',
'',
].join('\n'),
);
const project = await loadKtxProject({ projectDir: tempDir });
const adapters = createKtxCliLocalIngestAdapters(project, {
historicSqlConnectionId: 'bq',
sqlAnalysis: sqlAnalysisStub(),
});
expect(adapters.find((adapter) => adapter.source === 'historic-sql')?.skillNames).toEqual([
'historic_sql_table_digest',
'historic_sql_patterns',
]);
});
it('registers Snowflake historic SQL from the requested connection', async () => {
await writeProject(
tempDir,
[
'project: warehouse',
'connections:',
' sf:',
' driver: snowflake',
' readonly: true',
' account: acct',
' warehouse: wh',
' database: ANALYTICS',
' schema_name: PUBLIC',
' username: reader',
' password: env:SNOWFLAKE_PASSWORD',
' historicSql:',
' enabled: true',
' dialect: snowflake',
'ingest:',
' adapters:',
' - historic-sql',
'',
].join('\n'),
);
const project = await loadKtxProject({ projectDir: tempDir });
const adapters = createKtxCliLocalIngestAdapters(project, {
historicSqlConnectionId: 'sf',
sqlAnalysis: sqlAnalysisStub(),
});
expect(adapters.find((adapter) => adapter.source === 'historic-sql')?.skillNames).toEqual([
'historic_sql_table_digest',
'historic_sql_patterns',
]);
});
});