mirror of
https://github.com/Kaelio/ktx.git
synced 2026-06-10 08:05:14 +02:00
feat(connectors): rename snowflake pool config
This commit is contained in:
parent
b741baf680
commit
a320b392c5
2 changed files with 47 additions and 17 deletions
|
|
@ -8,7 +8,7 @@ vi.mock('snowflake-sdk', () => ({
|
|||
}));
|
||||
|
||||
import { createSnowflakeLiveDatabaseIntrospection } from '../../connectors/snowflake/live-database-introspection.js';
|
||||
import { isKtxSnowflakeConnectionConfig, KtxSnowflakeScanConnector, snowflakeConnectionConfigFromConfig, type KtxSnowflakeDriver, type KtxSnowflakeDriverFactory } from '../../connectors/snowflake/connector.js';
|
||||
import { isKtxSnowflakeConnectionConfig, KtxSnowflakeScanConnector, snowflakeConnectionConfigFromConfig, type KtxSnowflakeConnectionConfig, type KtxSnowflakeDriver, type KtxSnowflakeDriverFactory } from '../../connectors/snowflake/connector.js';
|
||||
import { tableRefSet } from '../../context/scan/table-ref.js';
|
||||
|
||||
function fakeDriverFactory(): KtxSnowflakeDriverFactory {
|
||||
|
|
@ -140,8 +140,8 @@ describe('KtxSnowflakeScanConnector', () => {
|
|||
});
|
||||
});
|
||||
|
||||
it('defaults and validates Snowflake maxSessions', () => {
|
||||
const baseConnection = {
|
||||
it('defaults and validates Snowflake maxConnections', () => {
|
||||
const baseConnection: KtxSnowflakeConnectionConfig = {
|
||||
driver: 'snowflake',
|
||||
authMethod: 'password',
|
||||
account: 'acct',
|
||||
|
|
@ -150,32 +150,59 @@ describe('KtxSnowflakeScanConnector', () => {
|
|||
schema_name: 'PUBLIC',
|
||||
username: 'reader',
|
||||
password: 'fixture-pass', // pragma: allowlist secret
|
||||
} as const;
|
||||
};
|
||||
|
||||
expect(
|
||||
snowflakeConnectionConfigFromConfig({
|
||||
connectionId: 'warehouse',
|
||||
connection: baseConnection,
|
||||
}),
|
||||
).toMatchObject({ maxSessions: 4 });
|
||||
).toMatchObject({ maxConnections: 4 });
|
||||
|
||||
expect(
|
||||
snowflakeConnectionConfigFromConfig({
|
||||
connectionId: 'warehouse',
|
||||
connection: { ...baseConnection, maxSessions: 8 },
|
||||
connection: { ...baseConnection, maxConnections: 8 },
|
||||
}),
|
||||
).toMatchObject({ maxSessions: 8 });
|
||||
).toMatchObject({ maxConnections: 8 });
|
||||
|
||||
for (const maxSessions of [0, -1, 1.5, Number.NaN]) {
|
||||
expect(
|
||||
snowflakeConnectionConfigFromConfig({
|
||||
connectionId: 'warehouse',
|
||||
connection: { ...baseConnection, maxConnections: '12' as never },
|
||||
}),
|
||||
).toMatchObject({ maxConnections: 12 });
|
||||
|
||||
for (const maxConnections of [0, -1, 1.5, Number.NaN, 'abc' as never]) {
|
||||
expect(() =>
|
||||
snowflakeConnectionConfigFromConfig({
|
||||
connectionId: 'warehouse',
|
||||
connection: { ...baseConnection, maxSessions },
|
||||
connection: { ...baseConnection, maxConnections },
|
||||
}),
|
||||
).toThrow('connections.warehouse.maxSessions must be a positive integer');
|
||||
).toThrow('connections.warehouse.maxConnections must be a positive integer');
|
||||
}
|
||||
});
|
||||
|
||||
it('rejects stale Snowflake maxSessions config', () => {
|
||||
const baseConnection: KtxSnowflakeConnectionConfig = {
|
||||
driver: 'snowflake',
|
||||
authMethod: 'password',
|
||||
account: 'acct',
|
||||
warehouse: 'WH',
|
||||
database: 'ANALYTICS',
|
||||
schema_name: 'PUBLIC',
|
||||
username: 'reader',
|
||||
password: 'fixture-pass', // pragma: allowlist secret
|
||||
};
|
||||
|
||||
expect(() =>
|
||||
snowflakeConnectionConfigFromConfig({
|
||||
connectionId: 'warehouse',
|
||||
connection: { ...baseConnection, maxSessions: 8 },
|
||||
}),
|
||||
).toThrow('connections.warehouse.maxSessions has been renamed to maxConnections');
|
||||
});
|
||||
|
||||
it('uses one lazy Snowflake pool and drains it during cleanup', async () => {
|
||||
const { pool, executedSql } = installSnowflakePoolMock();
|
||||
const close = vi.fn(async () => undefined);
|
||||
|
|
@ -191,7 +218,7 @@ describe('KtxSnowflakeScanConnector', () => {
|
|||
username: 'reader',
|
||||
password: 'fixture-pass', // pragma: allowlist secret
|
||||
role: 'ANALYST',
|
||||
maxSessions: 3,
|
||||
maxConnections: 3,
|
||||
},
|
||||
sdkOptionsProvider: {
|
||||
resolve: vi.fn(async () => ({ sdkOptions: { application: 'ktx-test' }, close })),
|
||||
|
|
|
|||
|
|
@ -24,7 +24,7 @@ export interface KtxSnowflakeConnectionConfig {
|
|||
privateKey?: string;
|
||||
passphrase?: string;
|
||||
role?: string;
|
||||
maxSessions?: number;
|
||||
maxConnections?: number;
|
||||
[key: string]: unknown;
|
||||
}
|
||||
|
||||
|
|
@ -39,7 +39,7 @@ export interface KtxSnowflakeResolvedConnectionConfig {
|
|||
privateKey?: string;
|
||||
passphrase?: string;
|
||||
role?: string;
|
||||
maxSessions: number;
|
||||
maxConnections: number;
|
||||
}
|
||||
|
||||
export interface KtxSnowflakeRawColumnMetadata {
|
||||
|
|
@ -218,6 +218,9 @@ export function snowflakeConnectionConfigFromConfig(input: {
|
|||
if (!isKtxSnowflakeConnectionConfig(input.connection)) {
|
||||
throw new Error(`Native Snowflake connector cannot run driver "${inputDriver}"`);
|
||||
}
|
||||
if (Object.prototype.hasOwnProperty.call(input.connection, 'maxSessions')) {
|
||||
throw new Error(`connections.${input.connectionId}.maxSessions has been renamed to maxConnections`);
|
||||
}
|
||||
const env = input.env ?? process.env;
|
||||
const authMethod = input.connection?.authMethod ?? 'password';
|
||||
const account = stringConfigValue(input.connection, 'account', env);
|
||||
|
|
@ -249,9 +252,9 @@ export function snowflakeConnectionConfigFromConfig(input: {
|
|||
database,
|
||||
schemas: resolvedSchemas,
|
||||
username,
|
||||
maxSessions: positiveIntegerConfigValue({
|
||||
maxConnections: positiveIntegerConfigValue({
|
||||
connection: input.connection,
|
||||
key: 'maxSessions',
|
||||
key: 'maxConnections',
|
||||
connectionId: input.connectionId,
|
||||
defaultValue: 4,
|
||||
}),
|
||||
|
|
@ -322,7 +325,7 @@ class SnowflakeSdkDriver implements KtxSnowflakeDriver {
|
|||
const message = error instanceof Error ? error.message : String(error);
|
||||
if (/timeout/i.test(message) && /pool|acquire/i.test(message)) {
|
||||
throw new Error(
|
||||
"Snowflake session pool exhausted after 60s - consider lowering maxSessions or increasing your account's concurrent-statement limit.",
|
||||
"Snowflake session pool exhausted after 60s - consider lowering maxConnections or increasing your account's concurrent-statement limit.",
|
||||
);
|
||||
}
|
||||
throw error;
|
||||
|
|
@ -432,7 +435,7 @@ class SnowflakeSdkDriver implements KtxSnowflakeDriver {
|
|||
if (!this.pool) {
|
||||
this.pool = snowflake.createPool(await this.resolveConnectionOptions(), {
|
||||
min: 0,
|
||||
max: this.resolved.maxSessions,
|
||||
max: this.resolved.maxConnections,
|
||||
evictionRunIntervalMillis: 30_000,
|
||||
acquireTimeoutMillis: 60_000,
|
||||
});
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue