feat(connector): add Amazon Athena connector via Glue Data Catalog (#309)

* feat(connector): add Amazon Athena connector via Glue Data Catalog

* fix(athena): address reviewer feedback

* fix(athena): wire scope discovery, fix normalizeDriver, tighten types and tests

* fix(athena): honor databases scope, wire sql-analysis dialect, harden config resolution

- introspect() limits to the configured `databases` scope instead of scanning
  every Glue database in the account (docs promised this; connector ignored it)
- add athena -> athena to sql-analysis SQLGLOT_DIALECTS so `ktx sql` and MCP
  read-only validation parse Athena SQL under the Trino grammar, not postgres
- stringConfigValue coerces a resolved-empty `env:` reference to undefined so
  optional fields fall back to their defaults (workgroup 'primary', catalog
  'AwsDataCatalog') instead of ''
- drop trailing whitespace in dialect.test.ts

* fix(athena): integrate with main's SQL/non-SQL dialect split and add dialect notes

Rebase onto main, which introduced the KtxDialect (core) vs KtxSqlDialect
(SQL-only) split for MongoDB:
- KtxAthenaDialect implements KtxSqlDialect; the connector resolves it via
  getSqlDialectForDriver so SQL-generation methods stay in scope
- add authored athena.md SQL notes for the sql_dialect_notes MCP tool, required
  now that athena resolves to the athena sqlglot dialect (dialect-notes coverage
  is derived from the warehouse-driver registry)

---------

Co-authored-by: Andrey Avtomonov <andreybavt@gmail.com>
This commit is contained in:
Patel Dhrit 2026-07-02 06:00:26 -07:00 committed by GitHub
parent 6d01030745
commit fe7e6bd1fa
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
24 changed files with 2047 additions and 6 deletions

View file

@ -51,6 +51,8 @@
"@ai-sdk/devtools": "0.0.18",
"@ai-sdk/google-vertex": "^4.0.134",
"@anthropic-ai/claude-agent-sdk": "0.3.146",
"@aws-sdk/client-athena": "^3.1068.0",
"@aws-sdk/client-glue": "^3.1068.0",
"@clack/core": "1.3.1",
"@clack/prompts": "1.4.0",
"@clickhouse/client": "^1.18.5",

View file

@ -10,6 +10,7 @@ export const KTX_DATABASE_DRIVER_IDS = [
'sqlserver',
'bigquery',
'snowflake',
'athena',
] as const;
// mongodb is a database driver but has no SQL dialect, so it sits outside the

View file

@ -0,0 +1,555 @@
import { AthenaClient, StartQueryExecutionCommand, GetQueryExecutionCommand, GetQueryResultsCommand } from '@aws-sdk/client-athena';
import { GlueClient, GetDatabasesCommand, GetTablesCommand } from '@aws-sdk/client-glue';
import { getSqlDialectForDriver } from '../../context/connections/dialects.js';
import { assertReadOnlySql, limitSqlForExecution } from '../../context/connections/read-only-sql.js';
import {
connectorTestFailure,
createKtxConnectorCapabilities,
type KtxConnectorTestResult,
type KtxColumnSampleInput,
type KtxColumnSampleResult,
type KtxColumnStatsInput,
type KtxColumnStatsResult,
type KtxQueryResult,
type KtxReadOnlyQueryInput,
type KtxScanConnector,
type KtxScanContext,
type KtxScanInput,
type KtxSchemaColumn,
type KtxSchemaSnapshot,
type KtxSchemaTable,
type KtxTableListEntry,
type KtxTableRef,
type KtxTableSampleInput,
type KtxTableSampleResult,
} from '../../context/scan/types.js';
import { scopedTableNames } from '../../context/scan/table-ref.js';
import { resolveStringReference } from '../shared/string-reference.js';
export interface KtxAthenaConnectionConfig {
driver?: string;
region?: string;
s3_staging_dir?: string;
workgroup?: string;
catalog?: string;
database?: string;
databases?: string[];
[key: string]: unknown;
}
export interface KtxAthenaResolvedConnectionConfig {
region: string;
s3StagingDir: string;
workgroup: string;
catalog: string;
database: string | undefined;
databases: string[];
}
interface KtxAthenaQueryExecutionStatus {
State?: string;
StateChangeReason?: string;
}
interface KtxAthenaQueryExecution {
Status?: KtxAthenaQueryExecutionStatus;
}
interface KtxAthenaColumnInfo {
Name?: string;
Type?: string;
}
interface KtxAthenaDatum {
VarCharValue?: string;
}
interface KtxAthenaRow {
Data?: KtxAthenaDatum[];
}
interface KtxAthenaResultSet {
Rows?: KtxAthenaRow[];
ResultSetMetadata?: { ColumnInfo?: KtxAthenaColumnInfo[] };
}
/** @internal */
export interface KtxAthenaClient {
startQueryExecution(input: {
QueryString: string;
ResultConfiguration: { OutputLocation: string };
WorkGroup: string;
QueryExecutionContext?: { Database?: string; Catalog?: string };
}): Promise<{ QueryExecutionId?: string }>;
getQueryExecution(input: { QueryExecutionId: string }): Promise<{ QueryExecution?: KtxAthenaQueryExecution }>;
getQueryResults(input: { QueryExecutionId: string; NextToken?: string }): Promise<{
ResultSet?: KtxAthenaResultSet;
NextToken?: string;
}>;
}
interface KtxGlueColumnDef {
Name?: string;
Type?: string;
Comment?: string;
}
interface KtxGlueStorageDescriptor {
Columns?: KtxGlueColumnDef[];
}
/** @internal */
export interface KtxGlueTable {
Name?: string;
TableType?: string;
StorageDescriptor?: KtxGlueStorageDescriptor;
PartitionKeys?: KtxGlueColumnDef[];
Description?: string;
Parameters?: Record<string, string>;
}
/** @internal */
export interface KtxGlueClient {
getDatabases(input: { CatalogId?: string; NextToken?: string }): Promise<{
DatabaseList?: Array<{ Name?: string }>;
NextToken?: string;
}>;
getTables(input: { DatabaseName: string; CatalogId?: string; NextToken?: string }): Promise<{
TableList?: KtxGlueTable[];
NextToken?: string;
}>;
}
export interface KtxAthenaClientFactory {
createAthenaClient(region: string): KtxAthenaClient;
createGlueClient(region: string): KtxGlueClient;
}
class DefaultAthenaClientFactory implements KtxAthenaClientFactory {
createAthenaClient(region: string): KtxAthenaClient {
const client = new AthenaClient({ region });
return {
startQueryExecution: async (input) => {
const result = await client.send(
new StartQueryExecutionCommand({
QueryString: input.QueryString,
ResultConfiguration: { OutputLocation: input.ResultConfiguration.OutputLocation },
WorkGroup: input.WorkGroup,
QueryExecutionContext: input.QueryExecutionContext,
}),
);
return { QueryExecutionId: result.QueryExecutionId };
},
getQueryExecution: async (input) => {
const result = await client.send(new GetQueryExecutionCommand({ QueryExecutionId: input.QueryExecutionId }));
return {
QueryExecution: result.QueryExecution
? {
Status: {
State: result.QueryExecution.Status?.State,
StateChangeReason: result.QueryExecution.Status?.StateChangeReason,
},
}
: undefined,
};
},
getQueryResults: async (input) => {
const result = await client.send(
new GetQueryResultsCommand({ QueryExecutionId: input.QueryExecutionId, NextToken: input.NextToken }),
);
return {
ResultSet: result.ResultSet as KtxAthenaResultSet | undefined,
NextToken: result.NextToken,
};
},
};
}
createGlueClient(region: string): KtxGlueClient {
const client = new GlueClient({ region });
return {
getDatabases: async (input) => {
const result = await client.send(new GetDatabasesCommand({ CatalogId: input.CatalogId, NextToken: input.NextToken }));
return {
DatabaseList: result.DatabaseList?.map((db) => ({ Name: db.Name })),
NextToken: result.NextToken,
};
},
getTables: async (input) => {
const result = await client.send(
new GetTablesCommand({ DatabaseName: input.DatabaseName, CatalogId: input.CatalogId, NextToken: input.NextToken }),
);
return {
TableList: result.TableList as KtxGlueTable[] | undefined,
NextToken: result.NextToken,
};
},
};
}
}
function stringConfigValue(
connection: KtxAthenaConnectionConfig | undefined,
key: keyof KtxAthenaConnectionConfig,
env: NodeJS.ProcessEnv,
): string | undefined {
const value = connection?.[key];
if (typeof value !== 'string' || value.trim().length === 0) return undefined;
// Resolve before checking emptiness: an unset `env:` reference resolves to '',
// which must become undefined so `?? default` applies instead of keeping ''.
const resolved = resolveStringReference(value.trim(), env).trim();
return resolved.length > 0 ? resolved : undefined;
}
function configuredAthenaDatabases(connection: KtxAthenaConnectionConfig): string[] {
if (!Array.isArray(connection.databases)) return [];
const selected = connection.databases
.filter((database): database is string => typeof database === 'string' && database.trim().length > 0)
.map((database) => database.trim());
return [...new Set(selected)];
}
export function isKtxAthenaConnectionConfig(
connection: unknown,
): connection is KtxAthenaConnectionConfig {
return (
typeof connection === 'object' &&
connection !== null &&
String((connection as { driver?: unknown }).driver ?? '').toLowerCase() === 'athena'
);
}
/** @internal */
export function athenaConnectionConfigFromConfig(input: {
connectionId: string;
connection: KtxAthenaConnectionConfig | undefined;
env?: NodeJS.ProcessEnv;
}): KtxAthenaResolvedConnectionConfig {
const inputDriver = input.connection?.driver ?? 'unknown';
if (!isKtxAthenaConnectionConfig(input.connection)) {
throw new Error(`Native Athena connector cannot run driver "${String(inputDriver)}"`);
}
const env = input.env ?? process.env;
const region = stringConfigValue(input.connection, 'region', env);
if (!region) {
throw new Error(`Native Athena connector requires connections.${input.connectionId}.region`);
}
const s3StagingDir = stringConfigValue(input.connection, 's3_staging_dir', env);
if (!s3StagingDir) {
throw new Error(`Native Athena connector requires connections.${input.connectionId}.s3_staging_dir`);
}
return {
region,
s3StagingDir,
workgroup: stringConfigValue(input.connection, 'workgroup', env) ?? 'primary',
catalog: stringConfigValue(input.connection, 'catalog', env) ?? 'AwsDataCatalog',
database: stringConfigValue(input.connection, 'database', env),
databases: configuredAthenaDatabases(input.connection),
};
}
function glueTableKind(tableType: string | undefined): 'table' | 'view' {
const t = String(tableType ?? '').toUpperCase();
if (t === 'VIRTUAL_VIEW') return 'view';
return 'table';
}
const POLL_INTERVAL_MS = 250;
const QUERY_TIMEOUT_MS = 5 * 60 * 1000;
export interface KtxAthenaScanConnectorOptions {
connectionId: string;
connection: KtxAthenaConnectionConfig | undefined;
clientFactory?: KtxAthenaClientFactory;
env?: NodeJS.ProcessEnv;
now?: () => Date;
}
export class KtxAthenaScanConnector implements KtxScanConnector {
readonly id: string;
readonly driver = 'athena' as const;
readonly capabilities = createKtxConnectorCapabilities({
tableSampling: true,
columnSampling: true,
columnStats: false,
readOnlySql: true,
nestedAnalysis: false,
formalForeignKeys: false,
estimatedRowCounts: false,
});
private readonly connectionId: string;
private readonly resolved: KtxAthenaResolvedConnectionConfig;
private readonly clientFactory: KtxAthenaClientFactory;
private readonly now: () => Date;
private readonly dialect = getSqlDialectForDriver('athena');
private athenaClient: KtxAthenaClient | null = null;
private glueClient: KtxGlueClient | null = null;
constructor(options: KtxAthenaScanConnectorOptions) {
this.connectionId = options.connectionId;
this.resolved = athenaConnectionConfigFromConfig({
connectionId: options.connectionId,
connection: options.connection,
env: options.env,
});
this.clientFactory = options.clientFactory ?? new DefaultAthenaClientFactory();
this.now = options.now ?? (() => new Date());
this.id = `athena:${options.connectionId}`;
}
async testConnection(): Promise<KtxConnectorTestResult> {
try {
await this.listDatabasesPaginated({ maxResults: 1 });
return { success: true };
} catch (error) {
return connectorTestFailure(error);
}
}
async introspect(input: KtxScanInput, _ctx: KtxScanContext): Promise<KtxSchemaSnapshot> {
this.assertConnection(input.connectionId);
// Honor the configured `databases` scope (written by `ktx setup`); fall back
// to every Glue database only when the scope is unset.
const databases =
this.resolved.databases.length > 0 ? this.resolved.databases : await this.listDatabasesPaginated({});
const tables: KtxSchemaTable[] = [];
for (const database of databases) {
const scopedNames = input.tableScope
? scopedTableNames(input.tableScope, { catalog: this.resolved.catalog, db: database })
: null;
tables.push(...(await this.introspectDatabase(database, scopedNames)));
}
return {
connectionId: this.connectionId,
driver: 'athena',
extractedAt: this.now().toISOString(),
scope: { catalogs: [this.resolved.catalog], datasets: databases },
metadata: {
catalog: this.resolved.catalog,
databases,
table_count: tables.length,
total_columns: tables.reduce((sum, t) => sum + t.columns.length, 0),
},
tables,
warnings: [],
};
}
async sampleTable(input: KtxTableSampleInput, _ctx: KtxScanContext): Promise<KtxTableSampleResult & { headerTypes?: string[] }> {
this.assertConnection(input.connectionId);
const result = await this.query(this.dialect.generateSampleQuery(this.qTableName(input.table), input.limit, input.columns));
return { headers: result.headers, headerTypes: result.headerTypes, rows: result.rows, totalRows: result.totalRows };
}
async sampleColumn(input: KtxColumnSampleInput, _ctx: KtxScanContext): Promise<KtxColumnSampleResult> {
this.assertConnection(input.connectionId);
const result = await this.query(
this.dialect.generateColumnSampleQuery(this.qTableName(input.table), input.column, input.limit),
);
return {
values: result.rows.filter((row) => row.length > 0 && row[0] !== null).map((row) => row[0]),
nullCount: null,
distinctCount: null,
};
}
async columnStats(_input: KtxColumnStatsInput, _ctx: KtxScanContext): Promise<KtxColumnStatsResult | null> {
return null;
}
async executeReadOnly(input: KtxReadOnlyQueryInput, _ctx: KtxScanContext): Promise<KtxQueryResult> {
this.assertConnection(input.connectionId);
const limitedSql = limitSqlForExecution(assertReadOnlySql(input.sql), input.maxRows);
const result = await this.query(limitedSql);
return { ...result, rowCount: result.rows.length };
}
async listSchemas(): Promise<string[]> {
return this.listDatabasesPaginated({});
}
async listTables(databases?: string[]): Promise<KtxTableListEntry[]> {
const targetDatabases = databases && databases.length > 0 ? databases : await this.listDatabasesPaginated({});
const entries: KtxTableListEntry[] = [];
for (const database of targetDatabases) {
const glueTables = await this.listGlueTablesPaginated(database);
for (const t of glueTables) {
if (!t.Name) continue;
entries.push({
catalog: this.resolved.catalog,
schema: database,
name: t.Name,
kind: glueTableKind(t.TableType),
});
}
}
return entries;
}
async cleanup(): Promise<void> {
this.athenaClient = null;
this.glueClient = null;
}
qTableName(table: Pick<KtxTableRef, 'name'> & Partial<Pick<KtxTableRef, 'catalog' | 'db'>>): string {
return this.dialect.formatTableName(table);
}
private getAthenaClient(): KtxAthenaClient {
if (!this.athenaClient) {
this.athenaClient = this.clientFactory.createAthenaClient(this.resolved.region);
}
return this.athenaClient;
}
private getGlueClient(): KtxGlueClient {
if (!this.glueClient) {
this.glueClient = this.clientFactory.createGlueClient(this.resolved.region);
}
return this.glueClient;
}
private async listDatabasesPaginated(opts: { maxResults?: number }): Promise<string[]> {
const names: string[] = [];
let nextToken: string | undefined;
do {
const result = await this.getGlueClient().getDatabases({ NextToken: nextToken });
for (const db of result.DatabaseList ?? []) {
if (db.Name) names.push(db.Name);
if (opts.maxResults && names.length >= opts.maxResults) return names;
}
nextToken = result.NextToken;
} while (nextToken);
return names;
}
private async listGlueTablesPaginated(database: string): Promise<KtxGlueTable[]> {
const tables: KtxGlueTable[] = [];
let nextToken: string | undefined;
do {
const result = await this.getGlueClient().getTables({ DatabaseName: database, NextToken: nextToken });
tables.push(...(result.TableList ?? []));
nextToken = result.NextToken;
} while (nextToken);
return tables;
}
private async introspectDatabase(database: string, scopedNames: readonly string[] | null): Promise<KtxSchemaTable[]> {
if (scopedNames && scopedNames.length === 0) return [];
const glueTables = await this.listGlueTablesPaginated(database);
const scopeSet = scopedNames ? new Set(scopedNames) : null;
return glueTables
.filter((t): t is KtxGlueTable & { Name: string } => Boolean(t.Name) && (!scopeSet || scopeSet.has(t.Name!)))
.map((t) => ({
catalog: this.resolved.catalog,
db: database,
name: t.Name,
kind: glueTableKind(t.TableType),
comment: t.Description ?? null,
estimatedRows: null,
columns: this.toSchemaColumns(t),
foreignKeys: [],
}));
}
private toSchemaColumns(table: KtxGlueTable): KtxSchemaColumn[] {
const columns = [...(table.StorageDescriptor?.Columns ?? []), ...(table.PartitionKeys ?? [])];
return columns
.filter((col): col is KtxGlueColumnDef & { Name: string } => Boolean(col.Name))
.map((col) => {
const nativeType = String(col.Type ?? 'string').toLowerCase();
return {
name: col.Name,
nativeType,
normalizedType: this.dialect.mapDataType(nativeType),
dimensionType: this.dialect.mapToDimensionType(nativeType),
nullable: true,
primaryKey: false,
comment: col.Comment ?? null,
};
});
}
private async query(sql: string): Promise<KtxQueryResult> {
const athena = this.getAthenaClient();
const { QueryExecutionId } = await athena.startQueryExecution({
QueryString: sql,
ResultConfiguration: { OutputLocation: this.resolved.s3StagingDir },
WorkGroup: this.resolved.workgroup,
...(this.resolved.database || this.resolved.catalog
? {
QueryExecutionContext: {
...(this.resolved.database ? { Database: this.resolved.database } : {}),
...(this.resolved.catalog ? { Catalog: this.resolved.catalog } : {}),
},
}
: {}),
});
if (!QueryExecutionId) {
throw new Error('Athena did not return a QueryExecutionId');
}
await this.waitForQueryCompletion(athena, QueryExecutionId);
const rows: unknown[][] = [];
let headers: string[] = [];
let headerTypes: string[] = [];
let nextToken: string | undefined;
let firstPage = true;
do {
const result = await athena.getQueryResults({ QueryExecutionId, NextToken: nextToken });
const resultSet = result.ResultSet;
if (firstPage) {
const columnInfo = resultSet?.ResultSetMetadata?.ColumnInfo ?? [];
headers = columnInfo.map((col) => col.Name ?? '');
headerTypes = columnInfo.map((col) => String(col.Type ?? 'varchar').toUpperCase());
firstPage = false;
}
const pageRows = resultSet?.Rows ?? [];
// Athena includes the header row as the first row of the first page — skip it.
const dataRows = nextToken === undefined ? pageRows.slice(1) : pageRows;
for (const row of dataRows) {
rows.push((row.Data ?? []).map((d) => d.VarCharValue ?? null));
}
nextToken = result.NextToken;
} while (nextToken);
return {
headers,
headerTypes: headerTypes.length > 0 ? headerTypes : undefined,
rows,
totalRows: rows.length,
rowCount: rows.length,
};
}
private async waitForQueryCompletion(athena: KtxAthenaClient, queryExecutionId: string): Promise<void> {
const terminalStates = new Set(['SUCCEEDED', 'FAILED', 'CANCELLED']);
const deadline = this.now().getTime() + QUERY_TIMEOUT_MS;
for (;;) {
const { QueryExecution } = await athena.getQueryExecution({ QueryExecutionId: queryExecutionId });
const state = QueryExecution?.Status?.State ?? '';
if (state === 'SUCCEEDED') return;
if (terminalStates.has(state)) {
const reason = QueryExecution?.Status?.StateChangeReason ?? state;
throw new Error(`Athena query ${state}: ${reason}`);
}
if (this.now().getTime() >= deadline) {
throw new Error(`Athena query ${queryExecutionId} timed out after ${QUERY_TIMEOUT_MS / 1000}s`);
}
await new Promise((resolve) => setTimeout(resolve, POLL_INTERVAL_MS));
}
}
private assertConnection(connectionId: string): void {
if (connectionId !== this.connectionId) {
throw new Error(`Athena connector ${this.connectionId} cannot scan connection ${connectionId}`);
}
}
}

View file

@ -0,0 +1,175 @@
import type { KtxSqlDialect } from '../../context/connections/dialects.js';
import {
columnDisplayPartCount,
formatDialectDisplayRef,
formatDialectTableName,
parseDialectDisplayRef,
} from '../../context/connections/dialect-helpers.js';
import type { KtxSchemaDimensionType, KtxTableRef } from '../../context/scan/types.js';
type AthenaTableNameRef = Pick<KtxTableRef, 'name'> & Partial<Pick<KtxTableRef, 'catalog' | 'db'>>;
/** @internal */
export class KtxAthenaDialect implements KtxSqlDialect {
readonly type = 'athena' as const;
private readonly dimensionTypeMappings: Record<string, KtxSchemaDimensionType> = {
timestamp: 'time',
date: 'time',
bigint: 'number',
int: 'number',
integer: 'number',
tinyint: 'number',
smallint: 'number',
double: 'number',
float: 'number',
real: 'number',
boolean: 'boolean',
};
quoteIdentifier(identifier: string): string {
return `"${identifier.replace(/"/g, '""')}"`;
}
formatTableName(table: AthenaTableNameRef): string {
return formatDialectTableName(table, this.quoteIdentifier.bind(this), 'ansi');
}
formatDisplayRef(table: AthenaTableNameRef): string {
return formatDialectDisplayRef(table, 'ansi');
}
parseDisplayRef(display: string): KtxTableRef | null {
return parseDialectDisplayRef(display, 'ansi');
}
columnDisplayTablePartCount(): 1 | 2 | 3 {
return columnDisplayPartCount('ansi');
}
mapDataType(nativeType: string): string {
const base = nativeType.toLowerCase().trim().split('<')[0]!.split('(')[0]!.trim();
const typeMap: Record<string, string> = {
string: 'VARCHAR',
varchar: 'VARCHAR',
char: 'CHAR',
binary: 'VARBINARY',
bigint: 'BIGINT',
int: 'INTEGER',
integer: 'INTEGER',
tinyint: 'TINYINT',
smallint: 'SMALLINT',
double: 'DOUBLE',
float: 'FLOAT',
real: 'REAL',
decimal: 'DECIMAL',
boolean: 'BOOLEAN',
timestamp: 'TIMESTAMP',
date: 'DATE',
array: 'ARRAY',
map: 'MAP',
struct: 'STRUCT',
uniontype: 'UNION',
};
return typeMap[base] ?? nativeType.toUpperCase();
}
mapToDimensionType(nativeType: string): KtxSchemaDimensionType {
const base = nativeType.toLowerCase().trim().split('<')[0]!.split('(')[0]!.trim();
const mapped = this.dimensionTypeMappings[base];
if (mapped) return mapped;
if (base.includes('timestamp') || base.includes('date')) return 'time';
if (base.includes('int') || base.includes('float') || base.includes('double') || base.includes('decimal') || base.includes('real')) return 'number';
if (base.includes('bool')) return 'boolean';
return 'string';
}
generateSampleQuery(tableName: string, limit: number, columns?: string[]): string {
const columnList =
columns && columns.length > 0 ? columns.map((c) => this.quoteIdentifier(c)).join(', ') : '*';
return `SELECT ${columnList} FROM ${tableName} LIMIT ${limit}`;
}
generateColumnSampleQuery(tableName: string, columnName: string, limit: number): string {
const quoted = this.quoteIdentifier(columnName);
return `SELECT ${quoted} FROM ${tableName} WHERE ${quoted} IS NOT NULL LIMIT ${limit}`;
}
generateCardinalitySampleQuery(tableName: string, columnName: string, sampleSize: number): string {
return `
SELECT approx_distinct(${columnName}) AS cardinality
FROM (
SELECT ${columnName}
FROM ${tableName}
WHERE ${columnName} IS NOT NULL
LIMIT ${sampleSize}
)
`;
}
generateRandomizedCardinalitySampleQuery(tableName: string, columnName: string, sampleSize: number): string {
return `
SELECT approx_distinct(${columnName}) AS cardinality
FROM (
SELECT ${columnName}
FROM ${tableName}
WHERE ${columnName} IS NOT NULL
ORDER BY rand()
LIMIT ${sampleSize}
)
`;
}
generateDistinctValuesQuery(tableName: string, columnName: string, limit: number): string {
return `
SELECT DISTINCT CAST(${columnName} AS VARCHAR) AS val
FROM ${tableName}
WHERE ${columnName} IS NOT NULL
ORDER BY val
LIMIT ${limit}
`;
}
generateColumnStatisticsQuery(_schemaName: string, _tableName: string): string | null {
return null;
}
getNullCountExpression(column: string): string {
return `COUNT_IF(${column} IS NULL)`;
}
getDistinctCountExpression(column: string): string {
return `approx_distinct(${column})`;
}
textLengthExpression(columnSql: string): string {
return `LENGTH(CAST(${columnSql} AS VARCHAR))`;
}
castToText(columnSql: string): string {
return `CAST(${columnSql} AS VARCHAR)`;
}
getSampleValueAggregation(innerSql: string): string {
return `(SELECT array_join(array_agg(CAST(value AS VARCHAR)), '\u001f') FROM (${innerSql}) AS relationship_profile_values)`;
}
getLimitOffsetClause(limit: number, offset?: number): string {
const safeLimit = Math.max(1, Math.floor(limit));
const safeOffset = offset !== undefined ? Math.floor(offset) : 0;
return safeOffset > 0 ? `OFFSET ${safeOffset} LIMIT ${safeLimit}` : `LIMIT ${safeLimit}`;
}
getTopClause(_limit: number): string {
return '';
}
getTableSampleClause(_samplePct: number): string {
return '';
}
getRandomSampleFilter(samplePct: number): string {
if (samplePct <= 0 || samplePct >= 1) return '';
return `rand() < ${samplePct}`;
}
}

View file

@ -0,0 +1,44 @@
import type {
LiveDatabaseIntrospectionOptions,
LiveDatabaseIntrospectionPort,
} from '../../context/ingest/adapters/live-database/types.js';
import type { KtxProjectConnectionConfig } from '../../context/project/config.js';
import {
KtxAthenaScanConnector,
type KtxAthenaClientFactory,
type KtxAthenaConnectionConfig,
} from './connector.js';
interface CreateAthenaLiveDatabaseIntrospectionOptions {
connections: Record<string, KtxProjectConnectionConfig>;
clientFactory?: KtxAthenaClientFactory;
now?: () => Date;
}
export function createAthenaLiveDatabaseIntrospection(
options: CreateAthenaLiveDatabaseIntrospectionOptions,
): LiveDatabaseIntrospectionPort {
return {
async extractSchema(connectionId: string, introspectionOptions?: LiveDatabaseIntrospectionOptions) {
const connection = options.connections[connectionId] as KtxAthenaConnectionConfig | undefined;
const connector = new KtxAthenaScanConnector({
connectionId,
connection,
clientFactory: options.clientFactory,
now: options.now,
});
try {
return await connector.introspect(
{
connectionId,
driver: 'athena',
...(introspectionOptions?.tableScope ? { tableScope: introspectionOptions.tableScope } : {}),
},
{ runId: `athena-${connectionId}` },
);
} finally {
await connector.cleanup();
}
},
};
}

View file

@ -1,3 +1,4 @@
import { KtxAthenaDialect } from '../../connectors/athena/dialect.js';
import { KtxBigQueryDialect } from '../../connectors/bigquery/dialect.js';
import { KtxClickHouseDialect } from '../../connectors/clickhouse/dialect.js';
import { KtxDuckDbDialect } from '../../connectors/duckdb/dialect.js';
@ -54,6 +55,7 @@ export interface KtxSqlDialect extends KtxDialect {
type KtxSqlDriver = Exclude<KtxConnectionDriver, 'mongodb'>;
const sqlDialectFactories: Record<KtxSqlDriver, () => KtxSqlDialect> = {
athena: () => new KtxAthenaDialect(),
bigquery: () => new KtxBigQueryDialect(),
clickhouse: () => new KtxClickHouseDialect(),
duckdb: () => new KtxDuckDbDialect(),

View file

@ -26,6 +26,23 @@ function invalidConnectionConfig(driver: KtxConnectionDriver): Error {
/** @internal */
export const driverRegistrations: Record<KtxConnectionDriver, KtxDriverRegistration> = {
athena: {
driver: 'athena',
scopeConfigKey: 'databases',
hasHistoricSqlReader: false,
load: async () => {
const m = await import('../../connectors/athena/connector.js');
return {
isConnectionConfig: (connection) => m.isKtxAthenaConnectionConfig(connection),
createScanConnector: ({ connectionId, connection }) => {
if (!m.isKtxAthenaConnectionConfig(connection)) {
throw invalidConnectionConfig('athena');
}
return new m.KtxAthenaScanConnector({ connectionId, connection });
},
};
},
},
bigquery: {
driver: 'bigquery',
scopeConfigKey: 'dataset_ids',

View file

@ -14,6 +14,7 @@ const warehouseDrivers = [
'duckdb',
'clickhouse',
'sqlserver',
'athena',
] as const;
type WarehouseDriver = (typeof warehouseDrivers)[number];
@ -56,6 +57,7 @@ const warehouseConnectionSchemas = [
warehouseConnectionSchema('duckdb'),
warehouseConnectionSchema('clickhouse'),
warehouseConnectionSchema('sqlserver'),
warehouseConnectionSchema('athena'),
] as const;
const mongodbConnectionSchema = z

View file

@ -147,12 +147,13 @@ function normalizeDriver(driver: string | undefined): KtxConnectionDriver {
normalized === 'sqlserver' ||
normalized === 'bigquery' ||
normalized === 'snowflake' ||
normalized === 'athena' ||
normalized === 'mongodb'
) {
return normalized;
}
throw new Error(
`Standalone ktx scan supports postgres/sqlite/duckdb/mysql/clickhouse/sqlserver/bigquery/snowflake/mongodb in this phase, received "${driver ?? 'unknown'}"`,
`Standalone ktx scan supports postgres/sqlite/duckdb/mysql/clickhouse/sqlserver/bigquery/snowflake/athena/mongodb in this phase, received "${driver ?? 'unknown'}"`,
);
}

View file

@ -9,6 +9,7 @@ export type KtxConnectionDriver =
| 'snowflake'
| 'mysql'
| 'clickhouse'
| 'athena'
| 'mongodb';
/** Canonical scan-mode registry. Runtime validation derives its allowlist here. */

View file

@ -18,6 +18,7 @@ export const DIALECTS_WITH_NOTES = [
'duckdb',
'clickhouse',
'tsql',
'athena',
] as const;
type DialectWithNotes = (typeof DIALECTS_WITH_NOTES)[number];

View file

@ -16,6 +16,7 @@ const SQLGLOT_DIALECTS: Record<string, SqlAnalysisDialect> = {
duckdb: 'duckdb',
clickhouse: 'clickhouse',
databricks: 'databricks',
athena: 'athena',
};
export function sqlAnalysisDialectForDriver(driver: string | undefined): SqlAnalysisDialect {

View file

@ -0,0 +1,12 @@
**athena** SQL conventions (Trino engine over the Glue Data Catalog):
- **FQTN:** `database.table` (e.g. `analytics.orders`); a bare `table` resolves against the query's default database. Cross-catalog is `catalog.database.table` (e.g. `awsdatacatalog.analytics.orders`).
- **Identifiers:** case-insensitive and folded to lowercase; double-quote (`"Name"`) to keep case, spaces, or a reserved word. String literals use single quotes only.
- **Date/time:** native `DATE`/`TIMESTAMP`. Bucket with `date_trunc('month', ts)`, pull parts with `EXTRACT(YEAR FROM ts)`, shift with `date_add('day', -30, current_date)`, difference with `date_diff('day', a, b)`, and format with `date_format(ts, '%Y-%m')`. Parse text with `date_parse(str, '%Y-%m-%d')` or `from_iso8601_timestamp(str)`; `current_date` / `now()` are available.
- **Top-N / windows:** Athena has no `QUALIFY` — wrap the window in a subquery and filter it: `SELECT * FROM (SELECT ..., ROW_NUMBER() OVER (PARTITION BY key ORDER BY x DESC) AS rn FROM t) WHERE rn = 1` returns one row per key. Use `ORDER BY ... LIMIT n` for a global top-N, and paginate with `OFFSET m LIMIT n` (offset first — `LIMIT n OFFSET m` is a syntax error).
- **Series:** `CROSS JOIN UNNEST(sequence(DATE '2023-01-01', DATE '2023-12-01', INTERVAL '1' MONTH)) AS s(d)` expands a generated array into a date spine (use `sequence(1, 12)` for integers), then `LEFT JOIN` the aggregated facts onto it so empty periods still appear.
- **Rolling window over time:** a native `RANGE` frame spans real dates and tolerates gaps — `AVG(amount) OVER (ORDER BY day RANGE BETWEEN INTERVAL '29' DAY PRECEDING AND CURRENT ROW)` is a trailing 30-day average without a spine; guard minimum periods with `COUNT(*) OVER (<same frame>)`.
- **Approximate aggregates:** `approx_distinct(x)` for cardinality and `approx_percentile(x, 0.5)` for quantiles are far cheaper than exact `COUNT(DISTINCT ...)` on large scans.
- **Arrays & maps:** explode with `CROSS JOIN UNNEST(arr) AS t(x)` (add `WITH ORDINALITY` for an index); build with `array_agg(x)`, join with `array_join(arr, ',')`, index 1-based (`arr[1]`), and read a map with `element_at(m, key)`.
- **Safe cast:** `TRY_CAST(x AS DOUBLE)` yields `NULL` for a value that does not parse instead of raising, so counting residual `NULL`s catches an encoding the sample missed; `TRY(expr)` swallows other runtime errors.
- **Integer division:** `/` between integers truncates (`5 / 2``2`); cast an operand (`x / CAST(y AS DOUBLE)`) to keep the fraction, and round only in the final projection.
- **JSON:** `json_extract_scalar(col, '$.a.b')` returns varchar, `json_extract(col, '$.a')` returns json; cast a JSON string with `CAST(json_parse(col) AS ...)`.

View file

@ -1,3 +1,5 @@
import { createAthenaLiveDatabaseIntrospection } from './connectors/athena/live-database-introspection.js';
import { isKtxAthenaConnectionConfig } from './connectors/athena/connector.js';
import { createBigQueryLiveDatabaseIntrospection } from './connectors/bigquery/live-database-introspection.js';
import { isKtxBigQueryConnectionConfig, KtxBigQueryScanConnector, type KtxBigQueryConnectionConfig } from './connectors/bigquery/connector.js';
import { createClickHouseLiveDatabaseIntrospection } from './connectors/clickhouse/live-database-introspection.js';
@ -125,6 +127,9 @@ function createKtxCliLiveDatabaseIntrospection(
const bigquery = createBigQueryLiveDatabaseIntrospection({
connections: project.config.connections,
});
const athena = createAthenaLiveDatabaseIntrospection({
connections: project.config.connections,
});
return {
async extractSchema(connectionId: string, options?: LiveDatabaseIntrospectionOptions) {
const connection = project.config.connections[connectionId];
@ -160,6 +165,9 @@ function createKtxCliLiveDatabaseIntrospection(
if (isKtxBigQueryConnectionConfig(connection)) {
return bigquery.extractSchema(connectionId, options);
}
if (isKtxAthenaConnectionConfig(connection)) {
return athena.extractSchema(connectionId, options);
}
if (hasSnowflakeDriver(connection)) {
const { createSnowflakeLiveDatabaseIntrospection } = await import('./connectors/snowflake/live-database-introspection.js');
const { isKtxSnowflakeConnectionConfig } = await import('./connectors/snowflake/connector.js');;

View file

@ -71,6 +71,7 @@ export type KtxSetupDatabaseDriver =
| 'sqlserver'
| 'bigquery'
| 'snowflake'
| 'athena'
| 'mongodb';
export interface KtxSetupDatabasesArgs {
@ -158,6 +159,7 @@ const DRIVER_OPTIONS: Array<{ value: KtxSetupDatabaseDriver; label: string }> =
{ value: 'mysql', label: 'MySQL' },
{ value: 'clickhouse', label: 'ClickHouse' },
{ value: 'sqlserver', label: 'SQL Server' },
{ value: 'athena', label: 'Amazon Athena' },
{ value: 'mongodb', label: 'MongoDB' },
{ value: 'sqlite', label: 'SQLite' },
{ value: 'duckdb', label: 'DuckDB' },
@ -183,6 +185,7 @@ const DEFAULT_CONNECTION_IDS: Record<KtxSetupDatabaseDriver, string> = {
sqlserver: 'sqlserver-warehouse',
bigquery: 'bigquery-warehouse',
snowflake: 'snowflake-warehouse',
athena: 'athena-warehouse',
mongodb: 'mongodb-source',
};
@ -268,6 +271,13 @@ const SCOPE_DISCOVERY_SPECS: Partial<Record<KtxSetupDatabaseDriver, ScopeDiscove
configSingleField: 'schema_name',
suggest: defaultSuggest,
},
athena: {
noun: 'database',
nounPlural: 'databases',
promptLabel: 'Glue databases',
configArrayField: 'databases',
suggest: defaultSuggest,
},
};
type UrlDriverType = Extract<KtxSetupDatabaseDriver, 'postgres' | 'mysql' | 'clickhouse' | 'sqlserver'>;
@ -968,6 +978,47 @@ async function buildConnectionConfig(input: {
...(role ? { role } : {}),
};
}
if (driver === 'athena') {
if (args.inputMode === 'disabled' && !args.databaseUrl) return null;
const region = await promptText(
prompts,
'AWS region\nFor example us-east-1.',
stringConfigField(input.existingConnection, 'region'),
);
if (region === undefined) return 'back';
if (!region) return null;
const s3StagingDir = await promptText(
prompts,
'S3 staging directory\nAthena writes query results here. For example s3://my-bucket/athena-results/.',
stringConfigField(input.existingConnection, 's3_staging_dir'),
);
if (s3StagingDir === undefined) return 'back';
if (!s3StagingDir) return null;
const workgroup = await promptText(
prompts,
'Athena workgroup (optional)\nPress Enter to use the default workgroup "primary".',
stringConfigField(input.existingConnection, 'workgroup'),
);
if (workgroup === undefined) return 'back';
const catalog = await promptText(
prompts,
'Glue Data Catalog name (optional)\nPress Enter to use the default "AwsDataCatalog".',
stringConfigField(input.existingConnection, 'catalog'),
);
if (catalog === undefined) return 'back';
return {
driver: 'athena',
region,
s3_staging_dir: s3StagingDir,
...(workgroup ? { workgroup } : {}),
...(catalog ? { catalog } : {}),
...scriptedScopeConfigForDriver('athena', args.databaseSchemas),
};
}
throw new Error(`Unsupported database driver: ${driver}`);
}

View file

@ -0,0 +1,630 @@
import { describe, expect, it, vi } from 'vitest';
import {
athenaConnectionConfigFromConfig,
isKtxAthenaConnectionConfig,
KtxAthenaScanConnector,
type KtxAthenaClientFactory,
type KtxAthenaClient,
type KtxGlueClient,
} from '../../../src/connectors/athena/connector.js';
import { createAthenaLiveDatabaseIntrospection } from '../../../src/connectors/athena/live-database-introspection.js';
import { tableRefSet } from '../../../src/context/scan/table-ref.js';
function fakeClientFactory(options: { queryState?: string; queryError?: string } = {}): KtxAthenaClientFactory {
const state = options.queryState ?? 'SUCCEEDED';
const queries = new Map<string, string>();
let execCounter = 0;
const fakeAthenaClient: KtxAthenaClient = {
startQueryExecution: vi.fn(async (input) => {
const id = `exec-${++execCounter}`;
queries.set(id, input.QueryString);
return { QueryExecutionId: id };
}),
getQueryExecution: vi.fn(async () => ({
QueryExecution: {
Status: {
State: state,
StateChangeReason: options.queryError,
},
},
})),
getQueryResults: vi.fn(async (input) => {
const sql = queries.get(input.QueryExecutionId) ?? '';
// Column sample query: single-column result for the queried column only.
if (sql.includes('IS NOT NULL')) {
return {
ResultSet: {
ResultSetMetadata: { ColumnInfo: [{ Name: 'status', Type: 'string' }] },
Rows: [
{ Data: [{ VarCharValue: 'status' }] }, // header row
{ Data: [{ VarCharValue: 'paid' }] },
],
},
NextToken: undefined,
};
}
return {
ResultSet: {
ResultSetMetadata: {
ColumnInfo: [
{ Name: 'id', Type: 'bigint' },
{ Name: 'status', Type: 'string' },
],
},
Rows: [
// Header row (Athena always includes it on first page)
{ Data: [{ VarCharValue: 'id' }, { VarCharValue: 'status' }] },
// Data row
{ Data: [{ VarCharValue: '1' }, { VarCharValue: 'paid' }] },
],
},
NextToken: undefined,
};
}),
};
const fakeGlueClient: KtxGlueClient = {
getDatabases: vi.fn(async () => ({
DatabaseList: [{ Name: 'analytics' }],
NextToken: undefined,
})),
getTables: vi.fn(async () => ({
TableList: [
{
Name: 'orders',
TableType: 'EXTERNAL_TABLE',
Description: 'Orders table',
StorageDescriptor: {
Columns: [
{ Name: 'id', Type: 'bigint', Comment: 'Order id' },
{ Name: 'status', Type: 'string' },
],
},
PartitionKeys: [{ Name: 'dt', Type: 'date', Comment: 'Partition date' }],
},
],
NextToken: undefined,
})),
};
return {
createAthenaClient: vi.fn(() => fakeAthenaClient),
createGlueClient: vi.fn(() => fakeGlueClient),
};
}
const connection = {
driver: 'athena',
region: 'us-east-1',
s3_staging_dir: 's3://my-bucket/athena-results/',
workgroup: 'analytics',
catalog: 'AwsDataCatalog',
database: 'analytics',
} as const;
describe('KtxAthenaScanConnector', () => {
it('identifies athena connection configs correctly', () => {
expect(isKtxAthenaConnectionConfig(connection)).toBe(true);
expect(isKtxAthenaConnectionConfig({ driver: 'bigquery' })).toBe(false);
expect(isKtxAthenaConnectionConfig(null)).toBe(false);
expect(isKtxAthenaConnectionConfig(undefined)).toBe(false);
});
it('resolves configuration and throws on missing required fields', () => {
expect(athenaConnectionConfigFromConfig({ connectionId: 'dw', connection })).toMatchObject({
region: 'us-east-1',
s3StagingDir: 's3://my-bucket/athena-results/',
workgroup: 'analytics',
catalog: 'AwsDataCatalog',
database: 'analytics',
});
expect(() =>
athenaConnectionConfigFromConfig({ connectionId: 'dw', connection: { driver: 'athena' } }),
).toThrow('connections.dw.region');
expect(() =>
athenaConnectionConfigFromConfig({
connectionId: 'dw',
connection: { driver: 'athena', region: 'us-east-1' },
}),
).toThrow('connections.dw.s3_staging_dir');
});
it('applies defaults for optional config fields', () => {
const resolved = athenaConnectionConfigFromConfig({
connectionId: 'dw',
connection: { driver: 'athena', region: 'us-east-1', s3_staging_dir: 's3://bucket/' },
});
expect(resolved.workgroup).toBe('primary');
expect(resolved.catalog).toBe('AwsDataCatalog');
expect(resolved.database).toBeUndefined();
});
it('introspects databases, tables, and columns from Glue', async () => {
const connector = new KtxAthenaScanConnector({
connectionId: 'dw',
connection,
clientFactory: fakeClientFactory(),
now: () => new Date('2026-06-21T10:00:00.000Z'),
});
const snapshot = await connector.introspect(
{ connectionId: 'dw', driver: 'athena' },
{ runId: 'scan-1' },
);
expect(snapshot).toMatchObject({
connectionId: 'dw',
driver: 'athena',
extractedAt: '2026-06-21T10:00:00.000Z',
scope: { catalogs: ['AwsDataCatalog'], datasets: ['analytics'] },
metadata: {
catalog: 'AwsDataCatalog',
databases: ['analytics'],
table_count: 1,
total_columns: 3,
},
});
expect(snapshot.tables[0]).toMatchObject({
catalog: 'AwsDataCatalog',
db: 'analytics',
name: 'orders',
kind: 'table',
comment: 'Orders table',
estimatedRows: null,
foreignKeys: [],
});
expect(snapshot.tables[0]?.columns).toEqual([
{
name: 'id',
nativeType: 'bigint',
normalizedType: 'BIGINT',
dimensionType: 'number',
nullable: true,
primaryKey: false,
comment: 'Order id',
},
{
name: 'status',
nativeType: 'string',
normalizedType: 'VARCHAR',
dimensionType: 'string',
nullable: true,
primaryKey: false,
comment: null,
},
{
name: 'dt',
nativeType: 'date',
normalizedType: 'DATE',
dimensionType: 'time',
nullable: true,
primaryKey: false,
comment: 'Partition date',
},
]);
});
it('respects tableScope and excludes tables not in scope', async () => {
const connector = new KtxAthenaScanConnector({
connectionId: 'dw',
connection,
clientFactory: fakeClientFactory(),
now: () => new Date('2026-06-21T10:00:00.000Z'),
});
const scopedSnapshot = await connector.introspect(
{
connectionId: 'dw',
driver: 'athena',
tableScope: tableRefSet([{ catalog: 'AwsDataCatalog', db: 'analytics', name: 'nonexistent' }]),
},
{ runId: 'scan-1' },
);
expect(scopedSnapshot.tables).toHaveLength(0);
const matchingSnapshot = await connector.introspect(
{
connectionId: 'dw',
driver: 'athena',
tableScope: tableRefSet([{ catalog: 'AwsDataCatalog', db: 'analytics', name: 'orders' }]),
},
{ runId: 'scan-1' },
);
expect(matchingSnapshot.tables).toHaveLength(1);
expect(matchingSnapshot.tables[0]?.name).toBe('orders');
});
it('limits introspection to the configured databases scope', async () => {
const requestedDatabases: string[] = [];
const getDatabases = vi.fn(async () => ({
DatabaseList: [{ Name: 'analytics' }, { Name: 'raw' }, { Name: 'staging' }],
NextToken: undefined,
}));
const glueClient: KtxGlueClient = {
getDatabases,
getTables: vi.fn(async (input) => {
requestedDatabases.push(input.DatabaseName);
return {
TableList: [
{
Name: `${input.DatabaseName}_orders`,
TableType: 'EXTERNAL_TABLE',
StorageDescriptor: { Columns: [{ Name: 'id', Type: 'bigint' }] },
},
],
NextToken: undefined,
};
}),
};
const clientFactory: KtxAthenaClientFactory = {
createAthenaClient: vi.fn(() => fakeClientFactory().createAthenaClient('us-east-1')),
createGlueClient: vi.fn(() => glueClient),
};
const connector = new KtxAthenaScanConnector({
connectionId: 'dw',
connection: { ...connection, databases: ['analytics', 'raw'] },
clientFactory,
now: () => new Date('2026-06-21T10:00:00.000Z'),
});
const snapshot = await connector.introspect({ connectionId: 'dw', driver: 'athena' }, { runId: 'scan-1' });
// Scope is taken from config, so the account-wide database list is never enumerated.
expect(getDatabases).not.toHaveBeenCalled();
expect(requestedDatabases).toEqual(['analytics', 'raw']);
expect(snapshot.scope).toMatchObject({ datasets: ['analytics', 'raw'] });
expect(snapshot.tables.map((t) => t.db)).toEqual(['analytics', 'raw']);
});
it('resolves optional env-referenced config to defaults when the variable is unset', () => {
const resolved = athenaConnectionConfigFromConfig({
connectionId: 'dw',
connection: {
driver: 'athena',
region: 'us-east-1',
s3_staging_dir: 's3://bucket/',
workgroup: 'env:ATHENA_WORKGROUP_UNSET',
catalog: 'env:GLUE_CATALOG_UNSET',
},
env: {},
});
expect(resolved.workgroup).toBe('primary');
expect(resolved.catalog).toBe('AwsDataCatalog');
});
it('samples a table via Athena query execution', async () => {
const connector = new KtxAthenaScanConnector({
connectionId: 'dw',
connection,
clientFactory: fakeClientFactory(),
});
const result = await connector.sampleTable(
{
connectionId: 'dw',
table: { catalog: 'AwsDataCatalog', db: 'analytics', name: 'orders' },
columns: ['id', 'status'],
limit: 10,
},
{ runId: 'scan-1' },
);
expect(result).toMatchObject({
headers: ['id', 'status'],
rows: [['1', 'paid']],
totalRows: 1,
});
});
it('samples a column via Athena query execution', async () => {
const connector = new KtxAthenaScanConnector({
connectionId: 'dw',
connection,
clientFactory: fakeClientFactory(),
});
const result = await connector.sampleColumn(
{
connectionId: 'dw',
table: { catalog: 'AwsDataCatalog', db: 'analytics', name: 'orders' },
column: 'status',
limit: 10,
},
{ runId: 'scan-1' },
);
expect(result).toMatchObject({
values: ['paid'],
nullCount: null,
distinctCount: null,
});
});
it('executes read-only SQL and rejects write statements', async () => {
const connector = new KtxAthenaScanConnector({
connectionId: 'dw',
connection,
clientFactory: fakeClientFactory(),
});
await expect(
connector.executeReadOnly(
{ connectionId: 'dw', sql: 'SELECT id, status FROM "analytics"."orders"', maxRows: 100 },
{ runId: 'scan-1' },
),
).resolves.toMatchObject({
headers: ['id', 'status'],
rows: [['1', 'paid']],
rowCount: 1,
});
await expect(
connector.executeReadOnly({ connectionId: 'dw', sql: 'DELETE FROM orders' }, { runId: 'scan-1' }),
).rejects.toThrow('Only read-only SELECT/WITH queries can be executed locally');
});
it('lists schemas (databases) from Glue', async () => {
const connector = new KtxAthenaScanConnector({
connectionId: 'dw',
connection,
clientFactory: fakeClientFactory(),
});
await expect(connector.listSchemas()).resolves.toEqual(['analytics']);
});
it('lists tables from Glue', async () => {
const connector = new KtxAthenaScanConnector({
connectionId: 'dw',
connection,
clientFactory: fakeClientFactory(),
});
await expect(connector.listTables(['analytics'])).resolves.toEqual([
{
catalog: 'AwsDataCatalog',
schema: 'analytics',
name: 'orders',
kind: 'table',
},
]);
});
it('returns null for columnStats', async () => {
const connector = new KtxAthenaScanConnector({
connectionId: 'dw',
connection,
clientFactory: fakeClientFactory(),
});
await expect(
connector.columnStats(
{ connectionId: 'dw', table: { catalog: 'AwsDataCatalog', db: 'analytics', name: 'orders' }, column: 'status' },
{ runId: 'scan-1' },
),
).resolves.toBeNull();
});
it('tests connection successfully', async () => {
const connector = new KtxAthenaScanConnector({
connectionId: 'dw',
connection,
clientFactory: fakeClientFactory(),
});
await expect(connector.testConnection()).resolves.toMatchObject({ success: true });
});
it('returns failure result when testConnection throws', async () => {
const factory = fakeClientFactory();
const glueClient = factory.createGlueClient('us-east-1');
vi.mocked(glueClient.getDatabases).mockRejectedValue(new Error('Access denied'));
const brokenFactory: KtxAthenaClientFactory = {
createAthenaClient: factory.createAthenaClient,
createGlueClient: vi.fn(() => glueClient),
};
const connector = new KtxAthenaScanConnector({
connectionId: 'dw',
connection,
clientFactory: brokenFactory,
});
await expect(connector.testConnection()).resolves.toMatchObject({
success: false,
error: 'Access denied',
});
});
it('cleans up without throwing', async () => {
const connector = new KtxAthenaScanConnector({
connectionId: 'dw',
connection,
clientFactory: fakeClientFactory(),
});
await connector.listSchemas();
await expect(connector.cleanup()).resolves.toBeUndefined();
});
it('throws when query execution fails', async () => {
const connector = new KtxAthenaScanConnector({
connectionId: 'dw',
connection,
clientFactory: fakeClientFactory({ queryState: 'FAILED', queryError: 'Syntax error in SQL' }),
});
await expect(
connector.executeReadOnly({ connectionId: 'dw', sql: 'SELECT 1' }, { runId: 'scan-1' }),
).rejects.toThrow('Athena query FAILED: Syntax error in SQL');
});
it('throws when query execution times out', async () => {
let callCount = 0;
// First now() call sets the deadline; second call simulates time past it.
const now = () => (++callCount === 1 ? new Date(0) : new Date(5 * 60 * 1000 + 1));
const connector = new KtxAthenaScanConnector({
connectionId: 'dw',
connection,
clientFactory: fakeClientFactory({ queryState: 'RUNNING' }),
now,
});
await expect(
connector.executeReadOnly({ connectionId: 'dw', sql: 'SELECT 1' }, { runId: 'scan-1' }),
).rejects.toThrow('timed out after 300s');
});
it('passes the exact column list to Athena when sampling specific columns', async () => {
const factory = fakeClientFactory();
const athenaClient = factory.createAthenaClient('us-east-1');
const connector = new KtxAthenaScanConnector({
connectionId: 'dw',
connection,
clientFactory: { createAthenaClient: vi.fn(() => athenaClient), createGlueClient: factory.createGlueClient },
});
await connector.sampleTable(
{
connectionId: 'dw',
table: { catalog: 'AwsDataCatalog', db: 'analytics', name: 'orders' },
columns: ['id', 'status'],
limit: 5,
},
{ runId: 'scan-1' },
);
expect(vi.mocked(athenaClient.startQueryExecution).mock.calls[0]?.[0].QueryString).toBe(
'SELECT "id", "status" FROM "AwsDataCatalog"."analytics"."orders" LIMIT 5',
);
});
it('paginates Glue databases and tables across multiple pages', async () => {
const glueClient: KtxGlueClient = {
getDatabases: vi.fn()
.mockResolvedValueOnce({ DatabaseList: [{ Name: 'db1' }], NextToken: 'page2' })
.mockResolvedValueOnce({ DatabaseList: [{ Name: 'db2' }], NextToken: undefined }),
getTables: vi.fn().mockImplementation(async ({ DatabaseName }: { DatabaseName: string }) => {
if (DatabaseName === 'db1') {
return {
TableList: [
{
Name: 'table_a',
TableType: 'EXTERNAL_TABLE',
StorageDescriptor: { Columns: [{ Name: 'id', Type: 'bigint' }] },
},
],
NextToken: undefined,
};
}
return {
TableList: [
{
Name: 'table_b',
TableType: 'EXTERNAL_TABLE',
StorageDescriptor: { Columns: [{ Name: 'id', Type: 'bigint' }] },
},
],
NextToken: undefined,
};
}),
};
const connector = new KtxAthenaScanConnector({
connectionId: 'dw',
connection,
clientFactory: {
createAthenaClient: vi.fn(() => fakeClientFactory().createAthenaClient('us-east-1')),
createGlueClient: vi.fn(() => glueClient),
},
now: () => new Date('2026-06-21T10:00:00.000Z'),
});
const snapshot = await connector.introspect({ connectionId: 'dw', driver: 'athena' }, { runId: 'scan-1' });
expect(vi.mocked(glueClient.getDatabases)).toHaveBeenCalledTimes(2);
expect(snapshot.metadata).toMatchObject({ databases: ['db1', 'db2'], table_count: 2 });
expect(snapshot.tables.map((t) => t.name)).toEqual(['table_a', 'table_b']);
});
it('paginates Athena query results across multiple pages', async () => {
const factory = fakeClientFactory();
const athenaClient = factory.createAthenaClient('us-east-1');
vi.mocked(athenaClient.getQueryResults)
.mockResolvedValueOnce({
ResultSet: {
ResultSetMetadata: {
ColumnInfo: [
{ Name: 'id', Type: 'bigint' },
{ Name: 'status', Type: 'string' },
],
},
Rows: [
// Header row — only present on the first page
{ Data: [{ VarCharValue: 'id' }, { VarCharValue: 'status' }] },
{ Data: [{ VarCharValue: '1' }, { VarCharValue: 'paid' }] },
{ Data: [{ VarCharValue: '2' }, { VarCharValue: 'shipped' }] },
],
},
NextToken: 'page-2',
})
.mockResolvedValueOnce({
ResultSet: {
ResultSetMetadata: { ColumnInfo: [] },
// No header row on subsequent pages
Rows: [{ Data: [{ VarCharValue: '3' }, { VarCharValue: 'pending' }] }],
},
NextToken: undefined,
});
const connector = new KtxAthenaScanConnector({
connectionId: 'dw',
connection,
clientFactory: { createAthenaClient: vi.fn(() => athenaClient), createGlueClient: factory.createGlueClient },
});
const result = await connector.executeReadOnly(
{ connectionId: 'dw', sql: 'SELECT id, status FROM "analytics"."orders"', maxRows: 100 },
{ runId: 'scan-1' },
);
expect(result.headers).toEqual(['id', 'status']);
expect(result.rows).toEqual([
['1', 'paid'],
['2', 'shipped'],
['3', 'pending'],
]);
expect(result.rowCount).toBe(3);
expect(vi.mocked(athenaClient.getQueryResults)).toHaveBeenCalledTimes(2);
expect(vi.mocked(athenaClient.getQueryResults).mock.calls[1]?.[0].NextToken).toBe('page-2');
});
it('adapts to the live-database introspection port via factory', async () => {
const introspection = createAthenaLiveDatabaseIntrospection({
connections: { dw: connection },
clientFactory: fakeClientFactory(),
now: () => new Date('2026-06-21T10:00:00.000Z'),
});
await expect(introspection.extractSchema('dw')).resolves.toMatchObject({
connectionId: 'dw',
driver: 'athena',
metadata: { catalog: 'AwsDataCatalog' },
tables: expect.arrayContaining([
expect.objectContaining({
db: 'analytics',
name: 'orders',
columns: expect.arrayContaining([
expect.objectContaining({ name: 'id', dimensionType: 'number' }),
]),
}),
]),
});
});
});

View file

@ -0,0 +1,72 @@
import { describe, expect, it } from 'vitest';
import { KtxAthenaDialect } from '../../../src/connectors/athena/dialect.js';
describe('KtxAthenaDialect', () => {
const dialect = new KtxAthenaDialect();
it('quotes identifiers and formats catalog.database.table names', () => {
expect(dialect.quoteIdentifier('my"col')).toBe('"my""col"');
expect(dialect.formatTableName({ catalog: 'AwsDataCatalog', db: 'analytics', name: 'orders' })).toBe(
'"AwsDataCatalog"."analytics"."orders"',
);
expect(dialect.formatTableName({ db: 'analytics', name: 'orders' })).toBe('"analytics"."orders"');
expect(dialect.formatTableName({ name: 'orders' })).toBe('"orders"');
});
it('maps native Athena/Glue types to normalized types and dimension types', () => {
expect(dialect.mapDataType('bigint')).toBe('BIGINT');
expect(dialect.mapDataType('string')).toBe('VARCHAR');
expect(dialect.mapDataType('array<string>')).toBe('ARRAY');
expect(dialect.mapDataType('map<string,bigint>')).toBe('MAP');
expect(dialect.mapDataType('struct<id:bigint>')).toBe('STRUCT');
expect(dialect.mapDataType('decimal(18,2)')).toBe('DECIMAL');
expect(dialect.mapDataType('UNKNOWN_TYPE')).toBe('UNKNOWN_TYPE');
expect(dialect.mapToDimensionType('timestamp')).toBe('time');
expect(dialect.mapToDimensionType('date')).toBe('time');
expect(dialect.mapToDimensionType('bigint')).toBe('number');
expect(dialect.mapToDimensionType('double')).toBe('number');
expect(dialect.mapToDimensionType('decimal(10,2)')).toBe('number');
expect(dialect.mapToDimensionType('boolean')).toBe('boolean');
expect(dialect.mapToDimensionType('string')).toBe('string');
expect(dialect.mapToDimensionType('varchar')).toBe('string');
});
it('generates correct sample and column-sample SQL', () => {
expect(dialect.generateSampleQuery('"analytics"."orders"', 10, ['id', 'status'])).toBe(
'SELECT "id", "status" FROM "analytics"."orders" LIMIT 10',
);
expect(dialect.generateSampleQuery('"analytics"."orders"', 5)).toBe(
'SELECT * FROM "analytics"."orders" LIMIT 5',
);
expect(dialect.generateColumnSampleQuery('"analytics"."orders"', 'status', 20)).toBe(
'SELECT "status" FROM "analytics"."orders" WHERE "status" IS NOT NULL LIMIT 20',
);
});
it('generates Presto-style cardinality and distinct-values SQL', () => {
expect(dialect.generateCardinalitySampleQuery('"t"', '"col"', 1000)).toContain('approx_distinct');
expect(dialect.generateRandomizedCardinalitySampleQuery('"t"', '"col"', 500)).toContain('rand()');
expect(dialect.generateDistinctValuesQuery('"t"', '"col"', 50)).toContain(
'SELECT DISTINCT CAST("col" AS VARCHAR) AS val',
);
});
it('returns null for column statistics (unsupported)', () => {
expect(dialect.generateColumnStatisticsQuery('analytics', 'orders')).toBeNull();
});
it('produces Trino-correct OFFSET-before-LIMIT ordering', () => {
expect(dialect.getLimitOffsetClause(10)).toBe('LIMIT 10');
expect(dialect.getLimitOffsetClause(10, 0)).toBe('LIMIT 10');
expect(dialect.getLimitOffsetClause(10, 20)).toBe('OFFSET 20 LIMIT 10');
});
it('uses unit-separator (U+001F) as the array_join delimiter', () => {
const sql = dialect.getSampleValueAggregation('SELECT value FROM t');
const separatorIndex =
sql.indexOf("array_join(array_agg(CAST(value AS VARCHAR)), '") +
"array_join(array_agg(CAST(value AS VARCHAR)), '".length;
expect(sql.charCodeAt(separatorIndex)).toBe(0x1f);
});
});

View file

@ -305,7 +305,7 @@ describe('getDialectForDriver', () => {
it('throws with a supported-driver list for unknown drivers', () => {
expect(() => getDialectForDriver('oracle')).toThrow(
'Unsupported driver "oracle". Supported drivers: bigquery, clickhouse, duckdb, mongodb, mysql, postgres, snowflake, sqlite, sqlserver',
'Unsupported driver "oracle". Supported drivers: athena, bigquery, clickhouse, duckdb, mongodb, mysql, postgres, snowflake, sqlite, sqlserver',
);
});

View file

@ -70,6 +70,11 @@ const connectionFixtures: Record<KtxConnectionDriver, FixtureFactory> = {
database: 'ANALYTICS',
schema: 'PUBLIC',
}),
athena: () => ({
driver: 'athena',
region: 'us-east-1',
s3_staging_dir: 's3://my-bucket/athena-results/',
}),
};
const allowedScopeKeys = new Set(['dataset_ids', 'databases', 'schemas', 'schema_names']);
@ -100,6 +105,7 @@ describe('driverRegistrations', () => {
const registryDrivers = Object.keys(driverRegistrations).sort();
expect(listSupportedDrivers()).toEqual(registryDrivers);
expect(listSupportedDrivers()).toEqual([
'athena',
'bigquery',
'clickhouse',
'duckdb',

View file

@ -2175,6 +2175,40 @@ describe('local scan', () => {
};
expect(manifest.tables.orders?.joins?.some((join) => join.to === 'accounts')).toBe(true);
});
it('accepts athena as a native standalone scan driver when the host supplies a live-database adapter', async () => {
await writeFile(
join(project.projectDir, 'ktx.yaml'),
[
'connections:',
' warehouse:',
' driver: athena',
' region: us-east-1',
' s3_staging_dir: s3://my-bucket/athena-results/',
' databases:',
' - analytics',
'ingest:',
' adapters:',
' - live-database',
'',
].join('\n'),
'utf-8',
);
project = await loadKtxProject({ projectDir: project.projectDir });
const result = await runLocalScan({
project,
adapters: [fetchOnlyAdapter()],
connectionId: 'warehouse',
jobId: 'scan-run-athena',
now: () => new Date('2026-04-29T17:00:00.000Z'),
});
expect(result.report.driver).toBe('athena');
expect(result.report.artifactPaths.reportPath).toBe(
'raw-sources/warehouse/live-database/2026-04-29-170000-scan-run-athena/scan-report.json',
);
});
});
describe('resolveEnabledTables', () => {

View file

@ -12,6 +12,7 @@ describe('sqlAnalysisDialectForDriver', () => {
expect(sqlAnalysisDialectForDriver('duckdb')).toBe('duckdb');
expect(sqlAnalysisDialectForDriver('clickhouse')).toBe('clickhouse');
expect(sqlAnalysisDialectForDriver('databricks')).toBe('databricks');
expect(sqlAnalysisDialectForDriver('athena')).toBe('athena');
});
it('maps local connection-type spellings to sqlglot dialects', () => {

View file

@ -243,6 +243,7 @@ describe('setup databases step', () => {
{ value: 'mysql', label: 'MySQL' },
{ value: 'clickhouse', label: 'ClickHouse' },
{ value: 'sqlserver', label: 'SQL Server' },
{ value: 'athena', label: 'Amazon Athena' },
{ value: 'mongodb', label: 'MongoDB' },
{ value: 'sqlite', label: 'SQLite' },
{ value: 'duckdb', label: 'DuckDB' },
@ -618,6 +619,29 @@ describe('setup databases step', () => {
},
],
},
{
driver: 'athena',
textValues: ['', 'us-east-1', 's3://my-bucket/athena-results/', '', ''],
expectedTextPrompts: [
{
message: connectionNamePrompt('Amazon Athena'),
placeholder: 'athena-warehouse',
initialValue: 'athena-warehouse',
},
{
message: 'AWS region\nFor example us-east-1.',
},
{
message: 'S3 staging directory\nAthena writes query results here. For example s3://my-bucket/athena-results/.',
},
{
message: 'Athena workgroup (optional)\nPress Enter to use the default workgroup "primary".',
},
{
message: 'Glue Data Catalog name (optional)\nPress Enter to use the default "AwsDataCatalog".',
},
],
},
];
for (const testCase of cases) {
@ -1967,6 +1991,40 @@ describe('setup databases step', () => {
expect(project.config.connections['clickhouse-warehouse']).not.toHaveProperty('schemas');
});
it('maps Athena scripted database schema input to databases field', async () => {
await writeFile(
join(tempDir, 'ktx.yaml'),
[
'connections:',
' athena-warehouse:',
' driver: athena',
' region: us-east-1',
' s3_staging_dir: s3://my-bucket/athena-results/',
'',
].join('\n'),
'utf-8',
);
await runKtxSetupDatabasesStep(
{
projectDir: tempDir,
inputMode: 'disabled',
skipDatabases: false,
databaseConnectionIds: ['athena-warehouse'],
databaseSchemas: ['analytics', 'raw'],
},
makeIo().io,
{ testConnection: vi.fn(async () => 0), scanConnection: vi.fn(async () => 0) },
);
const project = await loadKtxProject({ projectDir: tempDir });
expect(project.config.connections['athena-warehouse']).toMatchObject({
driver: 'athena',
databases: ['analytics', 'raw'],
});
expect(project.config.connections['athena-warehouse']).not.toHaveProperty('schemas');
});
it('does not prompt for a bootstrap BigQuery dataset before scope discovery', async () => {
const prompts = makePromptAdapter({
multiselectValues: [['bigquery']],