mirror of
https://github.com/Kaelio/ktx.git
synced 2026-06-10 08:05:14 +02:00
343 lines
10 KiB
TypeScript
343 lines
10 KiB
TypeScript
import { once } from 'node:events';
|
|
import { createServer } from 'node:http';
|
|
import { describe, expect, it, vi } from 'vitest';
|
|
import { createHttpSemanticLayerComputePort, createPythonSemanticLayerComputePort } from '../../../src/context/daemon/semantic-layer-compute.js';
|
|
|
|
const source = {
|
|
name: 'orders',
|
|
table: 'public.orders',
|
|
grain: ['id'],
|
|
columns: [{ name: 'id', type: 'number' }],
|
|
joins: [],
|
|
measures: [{ name: 'order_count', expr: 'count(*)' }],
|
|
};
|
|
|
|
const sourceGenerationInput = {
|
|
tables: [
|
|
{
|
|
name: 'orders',
|
|
db: 'public',
|
|
comment: 'Orders table',
|
|
columns: [
|
|
{ name: 'id', type: 'integer', primaryKey: true, nullable: false, comment: 'Order ID' },
|
|
{ name: 'customer_id', type: 'integer' },
|
|
{ name: 'amount', type: 'decimal', comment: 'Order amount' },
|
|
],
|
|
},
|
|
{
|
|
name: 'customers',
|
|
db: 'public',
|
|
columns: [
|
|
{ name: 'id', type: 'integer', primaryKey: true },
|
|
{ name: 'email', type: 'varchar' },
|
|
],
|
|
},
|
|
],
|
|
links: [
|
|
{
|
|
fromTable: 'orders',
|
|
fromColumn: 'customer_id',
|
|
toTable: 'customers',
|
|
toColumn: 'id',
|
|
relationshipType: 'MANY_TO_ONE',
|
|
},
|
|
],
|
|
dialect: 'postgres',
|
|
};
|
|
|
|
const sourceGenerationDaemonPayload = {
|
|
tables: [
|
|
{
|
|
name: 'orders',
|
|
db: 'public',
|
|
comment: 'Orders table',
|
|
columns: [
|
|
{ name: 'id', type: 'integer', primary_key: true, nullable: false, comment: 'Order ID' },
|
|
{ name: 'customer_id', type: 'integer' },
|
|
{ name: 'amount', type: 'decimal', comment: 'Order amount' },
|
|
],
|
|
},
|
|
{
|
|
name: 'customers',
|
|
db: 'public',
|
|
columns: [
|
|
{ name: 'id', type: 'integer', primary_key: true },
|
|
{ name: 'email', type: 'varchar' },
|
|
],
|
|
},
|
|
],
|
|
links: [
|
|
{
|
|
from_table: 'orders',
|
|
from_column: 'customer_id',
|
|
to_table: 'customers',
|
|
to_column: 'id',
|
|
relationship_type: 'MANY_TO_ONE',
|
|
},
|
|
],
|
|
dialect: 'postgres',
|
|
};
|
|
|
|
const sourceGenerationDaemonResponse = {
|
|
source_count: 2,
|
|
sources: [
|
|
{
|
|
name: 'orders',
|
|
table: 'public.orders',
|
|
grain: ['id'],
|
|
columns: [{ name: 'id', type: 'number' }],
|
|
joins: [
|
|
{
|
|
to: 'customers',
|
|
on: 'customer_id = customers.id',
|
|
relationship: 'many_to_one',
|
|
},
|
|
],
|
|
measures: [{ name: 'record_count', expr: 'count(id)' }],
|
|
},
|
|
],
|
|
};
|
|
|
|
describe('createPythonSemanticLayerComputePort', () => {
|
|
it('calls the semantic-query stdio command', async () => {
|
|
const runJson = vi.fn(async () => ({
|
|
sql: 'select count(*) from public.orders',
|
|
dialect: 'postgres',
|
|
columns: [{ name: 'orders.order_count' }],
|
|
plan: { sources_used: ['orders'] },
|
|
}));
|
|
const port = createPythonSemanticLayerComputePort({
|
|
runJson,
|
|
projectId: 'hashed-project-id',
|
|
});
|
|
|
|
await expect(
|
|
port.query({
|
|
sources: [source],
|
|
dialect: 'postgres',
|
|
query: { measures: ['orders.order_count'], dimensions: [] },
|
|
}),
|
|
).resolves.toEqual({
|
|
sql: 'select count(*) from public.orders',
|
|
dialect: 'postgres',
|
|
columns: [{ name: 'orders.order_count' }],
|
|
plan: { sources_used: ['orders'] },
|
|
});
|
|
|
|
expect(runJson).toHaveBeenCalledWith('semantic-query', {
|
|
sources: [source],
|
|
dialect: 'postgres',
|
|
query: { measures: ['orders.order_count'], dimensions: [] },
|
|
projectId: 'hashed-project-id',
|
|
});
|
|
});
|
|
|
|
it('calls the semantic-validate stdio command', async () => {
|
|
const runJson = vi.fn(async () => ({
|
|
valid: true,
|
|
errors: [],
|
|
warnings: [],
|
|
per_source_warnings: {},
|
|
}));
|
|
const port = createPythonSemanticLayerComputePort({ runJson });
|
|
|
|
await expect(
|
|
port.validateSources({
|
|
sources: [source],
|
|
dialect: 'postgres',
|
|
recentlyTouched: ['orders'],
|
|
}),
|
|
).resolves.toEqual({
|
|
valid: true,
|
|
errors: [],
|
|
warnings: [],
|
|
perSourceWarnings: {},
|
|
});
|
|
|
|
expect(runJson).toHaveBeenCalledWith('semantic-validate', {
|
|
sources: [source],
|
|
dialect: 'postgres',
|
|
recently_touched: ['orders'],
|
|
});
|
|
});
|
|
|
|
it('calls the semantic-generate-sources stdio command', async () => {
|
|
const runJson = vi.fn(async () => sourceGenerationDaemonResponse);
|
|
const port = createPythonSemanticLayerComputePort({ runJson });
|
|
|
|
await expect(port.generateSources(sourceGenerationInput)).resolves.toEqual({
|
|
sourceCount: 2,
|
|
sources: sourceGenerationDaemonResponse.sources,
|
|
});
|
|
|
|
expect(runJson).toHaveBeenCalledWith('semantic-generate-sources', sourceGenerationDaemonPayload);
|
|
});
|
|
});
|
|
|
|
describe('createHttpSemanticLayerComputePort', () => {
|
|
it('calls semantic query and validate HTTP endpoints through an injected runner', async () => {
|
|
const requestJson = vi.fn(async (path: string) => {
|
|
if (path === '/semantic-layer/query') {
|
|
return {
|
|
sql: 'select count(*) from public.orders',
|
|
dialect: 'postgres',
|
|
columns: [{ name: 'orders.order_count' }],
|
|
plan: { sources_used: ['orders'] },
|
|
};
|
|
}
|
|
return {
|
|
valid: true,
|
|
errors: [],
|
|
warnings: [],
|
|
per_source_warnings: {},
|
|
};
|
|
});
|
|
const port = createHttpSemanticLayerComputePort({ baseUrl: 'http://127.0.0.1:8765/', requestJson });
|
|
|
|
await expect(
|
|
port.query({
|
|
sources: [source],
|
|
dialect: 'postgres',
|
|
query: { measures: ['orders.order_count'], dimensions: [] },
|
|
}),
|
|
).resolves.toEqual({
|
|
sql: 'select count(*) from public.orders',
|
|
dialect: 'postgres',
|
|
columns: [{ name: 'orders.order_count' }],
|
|
plan: { sources_used: ['orders'] },
|
|
});
|
|
|
|
await expect(
|
|
port.validateSources({
|
|
sources: [source],
|
|
dialect: 'postgres',
|
|
recentlyTouched: ['orders'],
|
|
}),
|
|
).resolves.toEqual({
|
|
valid: true,
|
|
errors: [],
|
|
warnings: [],
|
|
perSourceWarnings: {},
|
|
});
|
|
|
|
expect(requestJson).toHaveBeenNthCalledWith(1, '/semantic-layer/query', {
|
|
sources: [source],
|
|
dialect: 'postgres',
|
|
query: { measures: ['orders.order_count'], dimensions: [] },
|
|
});
|
|
expect(requestJson).toHaveBeenNthCalledWith(2, '/semantic-layer/validate', {
|
|
sources: [source],
|
|
dialect: 'postgres',
|
|
recently_touched: ['orders'],
|
|
});
|
|
});
|
|
|
|
it('calls the semantic source-generation HTTP endpoint through an injected runner', async () => {
|
|
const requestJson = vi.fn(async () => sourceGenerationDaemonResponse);
|
|
const port = createHttpSemanticLayerComputePort({ baseUrl: 'http://127.0.0.1:8765/', requestJson });
|
|
|
|
await expect(port.generateSources(sourceGenerationInput)).resolves.toEqual({
|
|
sourceCount: 2,
|
|
sources: sourceGenerationDaemonResponse.sources,
|
|
});
|
|
|
|
expect(requestJson).toHaveBeenCalledWith('/semantic-layer/generate-sources', sourceGenerationDaemonPayload);
|
|
});
|
|
|
|
it('posts JSON to a running HTTP daemon endpoint', async () => {
|
|
const requests: Array<{ url: string | undefined; body: unknown }> = [];
|
|
const server = createServer((request, response) => {
|
|
const chunks: Buffer[] = [];
|
|
request.on('data', (chunk: Buffer) => chunks.push(chunk));
|
|
request.on('end', () => {
|
|
requests.push({
|
|
url: request.url,
|
|
body: JSON.parse(Buffer.concat(chunks).toString('utf8')),
|
|
});
|
|
response.writeHead(200, { 'content-type': 'application/json' });
|
|
response.end(
|
|
JSON.stringify({
|
|
sql: 'select count(*) from public.orders',
|
|
dialect: 'postgres',
|
|
columns: [{ name: 'orders.order_count' }],
|
|
plan: { sources_used: ['orders'] },
|
|
}),
|
|
);
|
|
});
|
|
});
|
|
|
|
server.listen(0, '127.0.0.1');
|
|
await once(server, 'listening');
|
|
try {
|
|
const address = server.address();
|
|
if (!address || typeof address === 'string') {
|
|
throw new Error('expected TCP server address');
|
|
}
|
|
const port = createHttpSemanticLayerComputePort({ baseUrl: `http://127.0.0.1:${address.port}` });
|
|
|
|
await expect(
|
|
port.query({
|
|
sources: [source],
|
|
dialect: 'postgres',
|
|
query: { measures: ['orders.order_count'], dimensions: [] },
|
|
}),
|
|
).resolves.toMatchObject({
|
|
sql: 'select count(*) from public.orders',
|
|
dialect: 'postgres',
|
|
});
|
|
|
|
expect(requests).toEqual([
|
|
{
|
|
url: '/semantic-layer/query',
|
|
body: {
|
|
sources: [source],
|
|
dialect: 'postgres',
|
|
query: { measures: ['orders.order_count'], dimensions: [] },
|
|
},
|
|
},
|
|
]);
|
|
} finally {
|
|
server.close();
|
|
}
|
|
});
|
|
|
|
it('posts source-generation JSON to a running HTTP daemon endpoint', async () => {
|
|
const requests: Array<{ url: string | undefined; body: unknown }> = [];
|
|
const server = createServer((request, response) => {
|
|
const chunks: Buffer[] = [];
|
|
request.on('data', (chunk: Buffer) => chunks.push(chunk));
|
|
request.on('end', () => {
|
|
requests.push({
|
|
url: request.url,
|
|
body: JSON.parse(Buffer.concat(chunks).toString('utf8')),
|
|
});
|
|
response.writeHead(200, { 'content-type': 'application/json' });
|
|
response.end(JSON.stringify(sourceGenerationDaemonResponse));
|
|
});
|
|
});
|
|
|
|
server.listen(0, '127.0.0.1');
|
|
await once(server, 'listening');
|
|
try {
|
|
const address = server.address();
|
|
if (!address || typeof address === 'string') {
|
|
throw new Error('expected TCP server address');
|
|
}
|
|
const port = createHttpSemanticLayerComputePort({ baseUrl: `http://127.0.0.1:${address.port}` });
|
|
|
|
await expect(port.generateSources(sourceGenerationInput)).resolves.toEqual({
|
|
sourceCount: 2,
|
|
sources: sourceGenerationDaemonResponse.sources,
|
|
});
|
|
|
|
expect(requests).toEqual([
|
|
{
|
|
url: '/semantic-layer/generate-sources',
|
|
body: sourceGenerationDaemonPayload,
|
|
},
|
|
]);
|
|
} finally {
|
|
server.close();
|
|
}
|
|
});
|
|
});
|