From cd6c9107d799e35c8b471c796365a2ebcc915b85 Mon Sep 17 00:00:00 2001 From: elpresidank Date: Thu, 11 Jun 2026 08:25:13 -0500 Subject: [PATCH] refactor(ts): make maintenance scripts effect native --- ts/scripts/create-test-pdf.ts | 27 +- ts/scripts/effect-laws.allowlist.json | 2 +- ts/scripts/inventory-native-classes.ts | 121 +++--- ts/scripts/seed-config.ts | 146 +++++-- ts/scripts/seed-demo.ts | 341 ++++++++++------ ts/scripts/seed-flows.ts | 159 ++++++-- ts/scripts/test-pipeline.ts | 524 ++++++++++++++----------- 7 files changed, 845 insertions(+), 475 deletions(-) diff --git a/ts/scripts/create-test-pdf.ts b/ts/scripts/create-test-pdf.ts index 1e5be842..749d2640 100644 --- a/ts/scripts/create-test-pdf.ts +++ b/ts/scripts/create-test-pdf.ts @@ -5,8 +5,11 @@ * extractor can identify. Writes to data/test.pdf. */ +import { BunRuntime } from "@effect/platform-bun"; +import * as BunFileSystem from "@effect/platform-bun/BunFileSystem"; +import { Effect } from "effect"; +import * as FileSystem from "effect/FileSystem"; import { PDFDocument, StandardFonts } from "pdf-lib"; -import { writeFileSync, mkdirSync } from "node:fs"; const PAGE_1 = `Acme Corporation: Company Overview @@ -24,10 +27,11 @@ CloudSync was officially launched in January 2024. The platform competes with es Acme Corporation is headquartered in San Francisco, California. The company employs approximately 200 people across engineering, sales, and operations departments.`; -async function main(): Promise { - const pdf = await PDFDocument.create(); - const font = await pdf.embedFont(StandardFonts.Helvetica); - const boldFont = await pdf.embedFont(StandardFonts.HelveticaBold); +const main = Effect.fn("createTestPdf.main")(function*() { + const fs = yield* FileSystem.FileSystem; + const pdf = yield* Effect.promise(() => PDFDocument.create()); + const font = yield* Effect.promise(() => pdf.embedFont(StandardFonts.Helvetica)); + const boldFont = yield* Effect.promise(() => pdf.embedFont(StandardFonts.HelveticaBold)); for (const [i, text] of [PAGE_1, PAGE_2].entries()) { const page = pdf.addPage([612, 792]); // US Letter @@ -54,14 +58,11 @@ async function main(): Promise { } } - const pdfBytes = await pdf.save(); + const pdfBytes = yield* Effect.promise(() => pdf.save()); - mkdirSync("data", { recursive: true }); - writeFileSync("data/test.pdf", pdfBytes); + yield* fs.makeDirectory("data", { recursive: true }); + yield* fs.writeFile("data/test.pdf", pdfBytes); console.log(`Created data/test.pdf (${pdfBytes.length} bytes, 2 pages)`); -} - -main().catch((err) => { - console.error("Failed to create test PDF:", err); - process.exit(1); }); + +BunRuntime.runMain(main().pipe(Effect.provide(BunFileSystem.layer))); diff --git a/ts/scripts/effect-laws.allowlist.json b/ts/scripts/effect-laws.allowlist.json index fa293e2a..60a64de1 100644 --- a/ts/scripts/effect-laws.allowlist.json +++ b/ts/scripts/effect-laws.allowlist.json @@ -1 +1 @@ -{"exemptions":[],"baseline":[{"rule":"no-error-throw","path":"packages/workbench/src/main.tsx","count":1},{"rule":"no-error-throw","path":"scripts/seed-config.ts","count":1},{"rule":"no-error-throw","path":"scripts/seed-demo.ts","count":4},{"rule":"no-error-throw","path":"scripts/seed-flows.ts","count":2},{"rule":"no-error-throw","path":"scripts/test-pipeline.ts","count":2},{"rule":"no-native-fetch","path":"scripts/seed-config.ts","count":1},{"rule":"no-native-fetch","path":"scripts/seed-demo.ts","count":11},{"rule":"no-native-fetch","path":"scripts/seed-flows.ts","count":1},{"rule":"no-native-fetch","path":"scripts/test-pipeline.ts","count":5},{"rule":"no-native-json","path":"scripts/seed-config.ts","count":6},{"rule":"no-native-json","path":"scripts/seed-demo.ts","count":6},{"rule":"no-native-json","path":"scripts/seed-flows.ts","count":3},{"rule":"no-native-json","path":"scripts/test-pipeline.ts","count":6},{"rule":"no-native-sort","path":"packages/client/src/socket/trustgraph-socket.ts","count":2},{"rule":"no-native-sort","path":"packages/flow/src/config/service.ts","count":3},{"rule":"no-native-sort","path":"packages/flow/src/cores/service.ts","count":1},{"rule":"no-native-sort","path":"packages/flow/src/flow-manager/service.ts","count":1},{"rule":"no-native-sort","path":"packages/flow/src/librarian/service.ts","count":1},{"rule":"no-native-sort","path":"packages/flow/src/retrieval/graph-rag.ts","count":1},{"rule":"no-native-sort","path":"packages/mcp/src/server-effect.ts","count":1},{"rule":"no-native-sort","path":"packages/workbench/src/atoms/workbench.ts","count":2},{"rule":"no-native-sort","path":"packages/workbench/src/components/chat/explain-graph.tsx","count":1},{"rule":"no-native-sort","path":"packages/workbench/src/pages/graph.tsx","count":1},{"rule":"no-native-sort","path":"packages/workbench/src/qa/mock-api.ts","count":1},{"rule":"no-native-sort","path":"scripts/inventory-native-classes.ts","count":1},{"rule":"no-native-sort","path":"scripts/seed-demo.ts","count":1},{"rule":"no-native-sort","path":"scripts/seed-flows.ts","count":1},{"rule":"no-native-timers","path":"scripts/test-pipeline.ts","count":2},{"rule":"no-node-fs-path","path":"scripts/create-test-pdf.ts","count":1},{"rule":"no-node-fs-path","path":"scripts/inventory-native-classes.ts","count":2},{"rule":"no-process-env","path":"scripts/seed-config.ts","count":2},{"rule":"no-process-env","path":"scripts/seed-demo.ts","count":5},{"rule":"no-process-env","path":"scripts/seed-flows.ts","count":1},{"rule":"no-process-env","path":"scripts/test-pipeline.ts","count":11},{"rule":"no-schema-suffix","path":"packages/base/src/schema/primitives.ts","count":1}]} +{"exemptions":[],"baseline":[{"rule":"no-error-throw","path":"packages/workbench/src/main.tsx","count":1},{"rule":"no-native-sort","path":"packages/client/src/socket/trustgraph-socket.ts","count":2},{"rule":"no-native-sort","path":"packages/flow/src/config/service.ts","count":3},{"rule":"no-native-sort","path":"packages/flow/src/cores/service.ts","count":1},{"rule":"no-native-sort","path":"packages/flow/src/flow-manager/service.ts","count":1},{"rule":"no-native-sort","path":"packages/flow/src/librarian/service.ts","count":1},{"rule":"no-native-sort","path":"packages/flow/src/retrieval/graph-rag.ts","count":1},{"rule":"no-native-sort","path":"packages/mcp/src/server-effect.ts","count":1},{"rule":"no-native-sort","path":"packages/workbench/src/atoms/workbench.ts","count":2},{"rule":"no-native-sort","path":"packages/workbench/src/components/chat/explain-graph.tsx","count":1},{"rule":"no-native-sort","path":"packages/workbench/src/pages/graph.tsx","count":1},{"rule":"no-native-sort","path":"packages/workbench/src/qa/mock-api.ts","count":1},{"rule":"no-schema-suffix","path":"packages/base/src/schema/primitives.ts","count":1}]} diff --git a/ts/scripts/inventory-native-classes.ts b/ts/scripts/inventory-native-classes.ts index fe6b50ea..09e5f7e2 100644 --- a/ts/scripts/inventory-native-classes.ts +++ b/ts/scripts/inventory-native-classes.ts @@ -1,5 +1,8 @@ -import { existsSync, readdirSync, readFileSync, statSync } from "node:fs"; -import { join, relative, sep } from "node:path"; +import { BunRuntime } from "@effect/platform-bun"; +import * as BunFileSystem from "@effect/platform-bun/BunFileSystem"; +import { Array as A, Effect, Layer, Order, Path, Schema as S } from "effect"; +import * as FileSystem from "effect/FileSystem"; +import type { PlatformError } from "effect/PlatformError"; import ts from "typescript"; type Scope = "production" | "non-production"; @@ -17,10 +20,8 @@ interface ClassFinding { } const root = process.cwd(); -const packagesSrc = join(root, "packages"); -const scriptsDir = join(root, "scripts"); - const sourceExtensions = new Set([".ts", ".tsx", ".mts", ".cts"]); +const ignoredDirectories = new Set(["dist", "node_modules", ".turbo"]); const effectClassPatterns = [ /\bS\.(Class|TaggedClass|TaggedErrorClass|ErrorClass)\b/, /\bSchema\.(Class|TaggedClass|TaggedErrorClass|ErrorClass)\b/, @@ -33,6 +34,11 @@ const effectClassPatterns = [ /\bEffect\.Service\b/, ]; +class InventoryFailed extends S.TaggedErrorClass()( + "InventoryFailed", + { message: S.String }, +) {} + function extensionOf(path: string): string { const match = path.match(/\.[cm]?tsx?$/); return match?.[0] ?? ""; @@ -42,30 +48,32 @@ function isSourceFile(path: string): boolean { return sourceExtensions.has(extensionOf(path)); } -function walk(dir: string): string[] { - if (!existsSync(dir)) { - return []; - } +const walk = Effect.fn("inventory.walk")(function*( + dir: string, +): Effect.Effect { + const fs = yield* FileSystem.FileSystem; + const platformPath = yield* Path.Path; + const exists = yield* fs.exists(dir); + if (!exists) return []; - const files: string[] = []; - for (const entry of readdirSync(dir)) { - if (entry === "dist" || entry === "node_modules" || entry === ".turbo") { - continue; - } - - const path = join(dir, entry); - const stat = statSync(path); - if (stat.isDirectory()) { - files.push(...walk(path)); - } else if (stat.isFile() && isSourceFile(path)) { - files.push(path); - } - } - return files; -} + const entries = yield* fs.readDirectory(dir); + const chunks = yield* Effect.all( + entries.map((entry) => + Effect.gen(function*() { + if (ignoredDirectories.has(entry)) return []; + const fullPath = platformPath.join(dir, entry); + const stat = yield* fs.stat(fullPath); + if (stat.type === "Directory") return yield* walk(fullPath); + return stat.type === "File" && isSourceFile(fullPath) ? [fullPath] : []; + }) + ), + { concurrency: 16 }, + ); + return chunks.flat(); +}); function isProductionPackageSource(path: string): boolean { - const rel = relative(root, path).split(sep).join("/"); + const rel = path; return ( rel.startsWith("packages/") && rel.includes("/src/") && @@ -107,8 +115,12 @@ function classify(scope: Scope, extendsText?: string): Pick { + const fs = yield* FileSystem.FileSystem; + const sourceText = yield* fs.readFileString(path); const source = ts.createSourceFile( path, sourceText, @@ -116,7 +128,7 @@ function inspectFile(path: string): ClassFinding[] { true, path.endsWith(".tsx") ? ts.ScriptKind.TSX : ts.ScriptKind.TS, ); - const scope: Scope = isProductionPackageSource(path) ? "production" : "non-production"; + const scope: Scope = isProductionPackageSource(relativePath) ? "production" : "non-production"; const findings: ClassFinding[] = []; function visit(node: ts.Node): void { @@ -125,7 +137,7 @@ function inspectFile(path: string): ClassFinding[] { const extendsText = getExtendsText(node, source); const { classification, reason } = classify(scope, extendsText); findings.push({ - file: relative(root, path).split(sep).join("/"), + file: relativePath, line: position.line + 1, column: position.character + 1, name: getClassName(node), @@ -141,14 +153,7 @@ function inspectFile(path: string): ClassFinding[] { visit(source); return findings; -} - -const files = [...walk(packagesSrc), ...walk(scriptsDir)].sort(); -const findings = files.flatMap(inspectFile); -const productionFindings = findings.filter((finding) => finding.scope === "production"); -const blocking = productionFindings.filter((finding) => finding.classification === "blocking"); -const candidates = productionFindings.filter((finding) => finding.classification === "candidate-effect-exemption"); -const nonProduction = findings.filter((finding) => finding.scope === "non-production"); +}); function printGroup(title: string, group: ClassFinding[]): void { console.log(`${title}: ${group.length}`); @@ -160,13 +165,37 @@ function printGroup(title: string, group: ClassFinding[]): void { } } -printGroup("Blocking production native classes", blocking); -printGroup("Candidate Effect class-shaped exemptions", candidates); -printGroup("Non-production class declarations", nonProduction); +const program = Effect.fn("inventory.main")(function*() { + const platformPath = yield* Path.Path; + const packageFiles = yield* walk(platformPath.join(root, "packages")); + const scriptFiles = yield* walk(platformPath.join(root, "scripts")); + const files = A.sort([...packageFiles, ...scriptFiles], Order.String); + const findings = (yield* Effect.all( + files.map((file) => + inspectFile(file, platformPath.relative(root, file).split(platformPath.sep).join("/")) + ), + { concurrency: 16 }, + )).flat(); + const productionFindings = findings.filter((finding) => finding.scope === "production"); + const blocking = productionFindings.filter((finding) => finding.classification === "blocking"); + const candidates = productionFindings.filter((finding) => finding.classification === "candidate-effect-exemption"); + const nonProduction = findings.filter((finding) => finding.scope === "non-production"); -if (blocking.length > 0) { - console.error(`\nFound ${blocking.length} blocking production native class declarations.`); - process.exit(1); -} + printGroup("Blocking production native classes", blocking); + printGroup("Candidate Effect class-shaped exemptions", candidates); + printGroup("Non-production class declarations", nonProduction); -console.log("\nNo blocking production native class declarations found."); + if (blocking.length > 0) { + const message = `Found ${blocking.length} blocking production native class declarations.`; + console.error(`\n${message}`); + return yield* InventoryFailed.make({ message }); + } + + console.log("\nNo blocking production native class declarations found."); +}); + +BunRuntime.runMain( + program().pipe( + Effect.provide(Layer.merge(BunFileSystem.layer, Path.layer)), + ), +); diff --git a/ts/scripts/seed-config.ts b/ts/scripts/seed-config.ts index 04c483af..ba352032 100644 --- a/ts/scripts/seed-config.ts +++ b/ts/scripts/seed-config.ts @@ -6,25 +6,117 @@ * Requires: gateway + config service running */ -const GATEWAY_URL = process.env.GATEWAY_URL ?? "http://localhost:8088"; +import { BunRuntime } from "@effect/platform-bun"; +import * as BunHttpClient from "@effect/platform-bun/BunHttpClient"; +import { Config, Effect, Option as O, Schema as S } from "effect"; +import { HttpClient, HttpClientRequest, HttpClientResponse } from "effect/unstable/http"; -async function pushConfig(keys: string[], values: Record): Promise { - const res = await fetch(`${GATEWAY_URL}/api/v1/config`, { - method: "POST", - headers: { "Content-Type": "application/json" }, - body: JSON.stringify({ operation: "put", keys, values }), - }); - const data = await res.json(); - if (data.error) throw new Error(`Config push failed: ${data.error.message}`); - console.log(` Pushed config [${keys.join("/")}] → version ${data.version}`); -} +const DEFAULT_GATEWAY_URL = "http://localhost:8088"; + +class SeedConfigError extends S.TaggedErrorClass()( + "SeedConfigError", + { + operation: S.String, + message: S.String, + }, +) {} + +const GatewayErrorBody = S.Struct({ + message: S.optionalKey(S.String), +}); + +const ConfigPushResponse = S.Struct({ + version: S.optionalKey(S.Number), + error: S.optionalKey(GatewayErrorBody), +}); + +const stringifyJson = (operation: string, value: unknown) => + S.encodeUnknownEffect(S.UnknownFromJsonString)(value).pipe( + Effect.mapError((cause) => + SeedConfigError.make({ + operation, + message: String(cause), + }) + ), + ); + +const decodeConfigResponse = (operation: string, value: unknown) => + S.decodeUnknownEffect(ConfigPushResponse)(value).pipe( + Effect.mapError((cause) => + SeedConfigError.make({ + operation, + message: String(cause), + }) + ), + ); + +const postJson = Effect.fn("seed-config.postJson")(function* ( + gatewayUrl: string, + path: string, + body: unknown, +) { + const bodyText = yield* stringifyJson("encode-request", body); + const request = HttpClientRequest.post(`${gatewayUrl}${path}`, { acceptJson: true }).pipe( + HttpClientRequest.bodyText(bodyText, "application/json"), + ); + const response = yield* HttpClient.execute(request).pipe( + Effect.flatMap(HttpClientResponse.filterStatusOk), + Effect.mapError((cause) => + SeedConfigError.make({ + operation: "http-request", + message: String(cause), + }) + ), + ); + const responseText = yield* response.text.pipe( + Effect.mapError((cause) => + SeedConfigError.make({ + operation: "read-response", + message: String(cause), + }) + ), + ); + return yield* S.decodeUnknownEffect(S.UnknownFromJsonString)(responseText).pipe( + Effect.mapError((cause) => + SeedConfigError.make({ + operation: "decode-response-json", + message: String(cause), + }) + ), + ); +}); + +const pushConfig = Effect.fn("seed-config.pushConfig")(function* ( + gatewayUrl: string, + keys: ReadonlyArray, + values: Record, +) { + const data = yield* postJson(gatewayUrl, "/api/v1/config", { + operation: "put", + keys, + values, + }).pipe(Effect.flatMap((response) => decodeConfigResponse("decode-config-response", response))); + + if (data.error !== undefined) { + return yield* SeedConfigError.make({ + operation: "config-push", + message: data.error.message ?? "unknown gateway error", + }); + } + + console.log(` Pushed config [${keys.join("/")}] → version ${data.version ?? "unknown"}`); +}); + +const main = Effect.fn("seed-config.main")(function* () { + const gatewayUrl = yield* Config.string("GATEWAY_URL").pipe(Config.withDefault(DEFAULT_GATEWAY_URL)); + const braveApiKey = yield* Config.redacted("BRAVE_API_KEY").pipe(Config.option); + const hasBraveApiKey = O.isSome(braveApiKey); -async function main(): Promise { console.log("Seeding TrustGraph configuration...\n"); // 1. Prompt templates console.log("── Prompt Templates ──"); - await pushConfig(["prompt"], { + yield* pushConfig(gatewayUrl, ["prompt"], { "extract-relationships": { system: "You are a helpful assistant that extracts structured knowledge from text.", prompt: [ @@ -142,7 +234,7 @@ async function main(): Promise { // 2. Flow definitions (default flow with all topic mappings) console.log("\n── Flow Definitions ──"); - await pushConfig(["flows"], { + yield* pushConfig(gatewayUrl, ["flows"], { default: { topics: { // Document processing pipeline @@ -197,10 +289,9 @@ async function main(): Promise { // 3. MCP server configuration (external tool providers) console.log("\n── MCP Configuration ──"); - const braveApiKey = process.env.BRAVE_API_KEY; - if (braveApiKey) { - await pushConfig(["mcp"], { - "brave-search": JSON.stringify({ + if (hasBraveApiKey) { + yield* pushConfig(gatewayUrl, ["mcp"], { + "brave-search": yield* stringifyJson("encode-brave-search-mcp", { url: "http://localhost:8383/mcp", "remote-name": "brave_web_search", }), @@ -213,19 +304,19 @@ async function main(): Promise { // 4. Agent tool configuration (maps tools to implementations) console.log("\n── Tool Configuration ──"); const toolConfig: Record = { - "knowledge-query": JSON.stringify({ + "knowledge-query": yield* stringifyJson("encode-knowledge-query-tool", { type: "knowledge-query", name: "KnowledgeQuery", description: "Query the knowledge graph for information about entities and their relationships.", group: ["default"], }), - "document-query": JSON.stringify({ + "document-query": yield* stringifyJson("encode-document-query-tool", { type: "document-query", name: "DocumentQuery", description: "Search the document library for relevant information using semantic search.", group: ["default"], }), - "triples-query": JSON.stringify({ + "triples-query": yield* stringifyJson("encode-triples-query-tool", { type: "triples-query", name: "TriplesQuery", description: "Query for specific triples (subject-predicate-object relationships) in the knowledge graph.", @@ -234,8 +325,8 @@ async function main(): Promise { }; // Add Brave Search tool if API key is available - if (braveApiKey) { - toolConfig["brave-search"] = JSON.stringify({ + if (hasBraveApiKey) { + toolConfig["brave-search"] = yield* stringifyJson("encode-brave-search-tool", { type: "mcp-tool", name: "brave-search", description: "Search the web using Brave Search. Returns web search results including titles, URLs, and descriptions.", @@ -248,12 +339,9 @@ async function main(): Promise { console.log(" Brave Search tool added"); } - await pushConfig(["tool"], toolConfig); + yield* pushConfig(gatewayUrl, ["tool"], toolConfig); console.log("\nConfiguration seeded successfully."); -} - -main().catch((err) => { - console.error("Seed failed:", err); - process.exit(1); }); + +BunRuntime.runMain(main().pipe(Effect.provide(BunHttpClient.layer))); diff --git a/ts/scripts/seed-demo.ts b/ts/scripts/seed-demo.ts index 08810981..5df1ff23 100644 --- a/ts/scripts/seed-demo.ts +++ b/ts/scripts/seed-demo.ts @@ -16,17 +16,21 @@ * Also seeds config via the gateway if it's running. */ +import { BunRuntime } from "@effect/platform-bun"; +import * as BunHttpClient from "@effect/platform-bun/BunHttpClient"; +import { Array as A, Config, Effect, Order, Schema as S } from "effect"; +import { HttpClient, HttpClientRequest, HttpClientResponse } from "effect/unstable/http"; import { createClient, Graph } from "falkordb"; // --------------------------------------------------------------------------- // Config // --------------------------------------------------------------------------- -const FALKORDB_URL = process.env.FALKORDB_URL ?? "redis://localhost:6380"; -const QDRANT_URL = process.env.QDRANT_URL ?? "http://localhost:6333"; -const OLLAMA_URL = process.env.OLLAMA_URL ?? "http://localhost:11434"; -const GATEWAY_URL = process.env.GATEWAY_URL ?? "http://localhost:8088"; -const EMBED_MODEL = process.env.EMBED_MODEL ?? "mxbai-embed-large"; +const DEFAULT_FALKORDB_URL = "redis://localhost:6380"; +const DEFAULT_QDRANT_URL = "http://localhost:6333"; +const DEFAULT_OLLAMA_URL = "http://localhost:11434"; +const DEFAULT_GATEWAY_URL = "http://localhost:8088"; +const DEFAULT_EMBED_MODEL = "mxbai-embed-large"; const USER = "default"; const COLLECTION = "default"; @@ -44,6 +48,125 @@ interface RawTriple { oIsEntity: boolean; } +interface DemoConfig { + readonly falkorDbUrl: string; + readonly qdrantUrl: string; + readonly ollamaUrl: string; + readonly gatewayUrl: string; + readonly embedModel: string; +} + +class SeedDemoError extends S.TaggedErrorClass()( + "SeedDemoError", + { + operation: S.String, + message: S.String, + }, +) {} + +const GatewayErrorBody = S.Struct({ + message: S.optionalKey(S.String), +}); + +const ConfigPushResponse = S.Struct({ + version: S.optionalKey(S.Number), + error: S.optionalKey(GatewayErrorBody), +}); + +const OllamaEmbedResponse = S.Struct({ + embeddings: S.Array(S.Array(S.Number)), +}); + +const loadConfig = Effect.fn("seed-demo.loadConfig")(function* () { + return { + falkorDbUrl: yield* Config.string("FALKORDB_URL").pipe(Config.withDefault(DEFAULT_FALKORDB_URL)), + qdrantUrl: yield* Config.string("QDRANT_URL").pipe(Config.withDefault(DEFAULT_QDRANT_URL)), + ollamaUrl: yield* Config.string("OLLAMA_URL").pipe(Config.withDefault(DEFAULT_OLLAMA_URL)), + gatewayUrl: yield* Config.string("GATEWAY_URL").pipe(Config.withDefault(DEFAULT_GATEWAY_URL)), + embedModel: yield* Config.string("EMBED_MODEL").pipe(Config.withDefault(DEFAULT_EMBED_MODEL)), + } satisfies DemoConfig; +}); + +const scriptError = (operation: string, cause: unknown) => + SeedDemoError.make({ + operation, + message: String(cause), + }); + +const stringifyJson = (operation: string, value: unknown) => + S.encodeUnknownEffect(S.UnknownFromJsonString)(value).pipe( + Effect.mapError((cause) => scriptError(operation, cause)), + ); + +const decodeJsonText = (operation: string, value: string) => + S.decodeUnknownEffect(S.UnknownFromJsonString)(value).pipe( + Effect.mapError((cause) => scriptError(operation, cause)), + ); + +const decodeWith = (operation: string, schema: S.Codec) => (value: unknown) => + S.decodeUnknownEffect(schema)(value).pipe( + Effect.mapError((cause) => scriptError(operation, cause)), + ); + +const readResponseText = Effect.fn("seed-demo.readResponseText")(function* ( + operation: string, + response: HttpClientResponse.HttpClientResponse, +) { + return yield* response.text.pipe( + Effect.mapError((cause) => scriptError(`${operation}.read-response`, cause)), + ); +}); + +const executeOkText = Effect.fn("seed-demo.executeOkText")(function* ( + operation: string, + request: HttpClientRequest.HttpClientRequest, +) { + const response = yield* HttpClient.execute(request).pipe( + Effect.flatMap(HttpClientResponse.filterStatusOk), + Effect.mapError((cause) => scriptError(`${operation}.http`, cause)), + ); + return yield* readResponseText(operation, response); +}); + +const postJsonText = Effect.fn("seed-demo.postJsonText")(function* ( + operation: string, + url: string, + body: unknown, +) { + const bodyText = yield* stringifyJson(`${operation}.encode-request`, body); + const request = HttpClientRequest.post(url, { acceptJson: true }).pipe( + HttpClientRequest.bodyText(bodyText, "application/json"), + ); + return yield* executeOkText(operation, request); +}); + +const putJsonText = Effect.fn("seed-demo.putJsonText")(function* ( + operation: string, + url: string, + body: unknown, +) { + const bodyText = yield* stringifyJson(`${operation}.encode-request`, body); + const request = HttpClientRequest.put(url, { acceptJson: true }).pipe( + HttpClientRequest.bodyText(bodyText, "application/json"), + ); + return yield* executeOkText(operation, request); +}); + +const httpAvailable = Effect.fn("seed-demo.httpAvailable")(function* (url: string) { + return yield* HttpClient.get(url).pipe( + Effect.timeout("3 seconds"), + Effect.map((response) => response.status >= 200 && response.status < 300), + Effect.catch(() => Effect.succeed(false)), + ); +}); + +const resourceExists = Effect.fn("seed-demo.resourceExists")(function* (url: string) { + return yield* HttpClient.get(url).pipe( + Effect.map((response) => response.status >= 200 && response.status < 300), + Effect.catch(() => Effect.succeed(false)), + ); +}); + // --------------------------------------------------------------------------- // Demo Knowledge Graph — AI Industry // --------------------------------------------------------------------------- @@ -473,16 +596,16 @@ function collectEntities(triples: RawTriple[]): string[] { entities.add(t.o); } } - return [...entities].sort(); + return A.sort(Array.from(entities), Order.String); } // --------------------------------------------------------------------------- // Connectivity checks // --------------------------------------------------------------------------- -async function checkFalkorDB(): Promise { +async function checkFalkorDB(config: DemoConfig): Promise { try { - const client = createClient({ url: FALKORDB_URL }); + const client = createClient({ url: config.falkorDbUrl }); await client.connect(); await client.ping(); await client.disconnect(); @@ -492,39 +615,18 @@ async function checkFalkorDB(): Promise { } } -async function checkQdrant(): Promise { - try { - const res = await fetch(`${QDRANT_URL}/collections`, { signal: AbortSignal.timeout(3000) }); - return res.ok; - } catch { - return false; - } -} +const checkQdrant = (config: DemoConfig) => httpAvailable(`${config.qdrantUrl}/collections`); -async function checkOllama(): Promise { - try { - const res = await fetch(`${OLLAMA_URL}/api/tags`, { signal: AbortSignal.timeout(3000) }); - return res.ok; - } catch { - return false; - } -} +const checkOllama = (config: DemoConfig) => httpAvailable(`${config.ollamaUrl}/api/tags`); -async function checkGateway(): Promise { - try { - const res = await fetch(`${GATEWAY_URL}/api/v1/metrics`, { signal: AbortSignal.timeout(3000) }); - return res.ok; - } catch { - return false; - } -} +const checkGateway = (config: DemoConfig) => httpAvailable(`${config.gatewayUrl}/api/v1/metrics`); // --------------------------------------------------------------------------- // FalkorDB seeding // --------------------------------------------------------------------------- -async function seedFalkorDB(triples: RawTriple[]): Promise { - const client = createClient({ url: FALKORDB_URL }); +async function seedFalkorDB(config: DemoConfig, triples: RawTriple[]): Promise { + const client = createClient({ url: config.falkorDbUrl }); await client.connect(); const graph = new Graph(client, DATABASE); @@ -581,19 +683,16 @@ async function seedFalkorDB(triples: RawTriple[]): Promise { // Ollama embeddings // --------------------------------------------------------------------------- -async function embed(texts: string[]): Promise { - const res = await fetch(`${OLLAMA_URL}/api/embed`, { - method: "POST", - headers: { "Content-Type": "application/json" }, - body: JSON.stringify({ model: EMBED_MODEL, input: texts }), +const embed = Effect.fn("seed-demo.embed")(function* (config: DemoConfig, texts: string[]) { + const responseText = yield* postJsonText("ollama.embed", `${config.ollamaUrl}/api/embed`, { + model: config.embedModel, + input: texts, }); - if (!res.ok) { - const body = await res.text(); - throw new Error(`Ollama embed failed (${res.status}): ${body}`); - } - const data = (await res.json()) as { embeddings: number[][] }; + const data = yield* decodeJsonText("ollama.embed.decode-json", responseText).pipe( + Effect.flatMap(decodeWith("ollama.embed.decode-response", OllamaEmbedResponse)), + ); return data.embeddings; -} +}); // --------------------------------------------------------------------------- // Document chunks for Doc RAG @@ -705,7 +804,7 @@ const DOCUMENT_CHUNKS: Array<{ id: string; content: string }> = [ // Qdrant seeding (document embeddings) // --------------------------------------------------------------------------- -async function seedDocumentChunks(): Promise { +const seedDocumentChunks = Effect.fn("seed-demo.seedDocumentChunks")(function* (config: DemoConfig) { // Embed all chunk content const BATCH_SIZE = 32; const allVectors: number[][] = []; @@ -713,7 +812,7 @@ async function seedDocumentChunks(): Promise { for (let i = 0; i < texts.length; i += BATCH_SIZE) { const batch = texts.slice(i, i + BATCH_SIZE); - const vecs = await embed(batch); + const vecs = yield* embed(config, batch); allVectors.push(...vecs); process.stdout.write( `\r Embedding doc chunks: ${Math.min(i + BATCH_SIZE, texts.length)}/${texts.length}`, @@ -725,14 +824,10 @@ async function seedDocumentChunks(): Promise { const collectionName = `d_${USER}_${COLLECTION}_${dim}`; // Create collection if needed - const existsRes = await fetch(`${QDRANT_URL}/collections/${collectionName}`); - if (!existsRes.ok) { - await fetch(`${QDRANT_URL}/collections/${collectionName}`, { - method: "PUT", - headers: { "Content-Type": "application/json" }, - body: JSON.stringify({ - vectors: { size: dim, distance: "Cosine" }, - }), + const exists = yield* resourceExists(`${config.qdrantUrl}/collections/${collectionName}`); + if (!exists) { + yield* putJsonText("qdrant.create-doc-collection", `${config.qdrantUrl}/collections/${collectionName}`, { + vectors: { size: dim, distance: "Cosine" }, }); console.log(` Created Qdrant collection: ${collectionName} (dim=${dim})`); } else { @@ -749,32 +844,28 @@ async function seedDocumentChunks(): Promise { }, })); - const res = await fetch(`${QDRANT_URL}/collections/${collectionName}/points`, { - method: "PUT", - headers: { "Content-Type": "application/json" }, - body: JSON.stringify({ points }), + yield* putJsonText("qdrant.upsert-doc-points", `${config.qdrantUrl}/collections/${collectionName}/points`, { + points, }); - if (!res.ok) { - const body = await res.text(); - throw new Error(`Qdrant doc upsert failed: ${body}`); - } - console.log(` Qdrant: ${points.length} document chunk embeddings stored in ${collectionName}`); -} +}); // --------------------------------------------------------------------------- // Qdrant seeding (graph embeddings) // --------------------------------------------------------------------------- -async function seedQdrant(entities: string[]): Promise { +const seedQdrant = Effect.fn("seed-demo.seedQdrant")(function* ( + config: DemoConfig, + entities: string[], +) { // Batch embed in groups of 32 const BATCH_SIZE = 32; const allVectors: number[][] = []; for (let i = 0; i < entities.length; i += BATCH_SIZE) { const batch = entities.slice(i, i + BATCH_SIZE); - const vecs = await embed(batch); + const vecs = yield* embed(config, batch); allVectors.push(...vecs); process.stdout.write( `\r Embedding entities: ${Math.min(i + BATCH_SIZE, entities.length)}/${entities.length}`, @@ -786,14 +877,10 @@ async function seedQdrant(entities: string[]): Promise { const collectionName = `t_${USER}_${COLLECTION}_${dim}`; // Create collection if needed - const existsRes = await fetch(`${QDRANT_URL}/collections/${collectionName}`); - if (!existsRes.ok) { - await fetch(`${QDRANT_URL}/collections/${collectionName}`, { - method: "PUT", - headers: { "Content-Type": "application/json" }, - body: JSON.stringify({ - vectors: { size: dim, distance: "Cosine" }, - }), + const exists = yield* resourceExists(`${config.qdrantUrl}/collections/${collectionName}`); + if (!exists) { + yield* putJsonText("qdrant.create-entity-collection", `${config.qdrantUrl}/collections/${collectionName}`, { + vectors: { size: dim, distance: "Cosine" }, }); console.log(` Created Qdrant collection: ${collectionName} (dim=${dim})`); } else { @@ -811,41 +898,44 @@ async function seedQdrant(entities: string[]): Promise { payload: { entity }, })); - const res = await fetch(`${QDRANT_URL}/collections/${collectionName}/points`, { - method: "PUT", - headers: { "Content-Type": "application/json" }, - body: JSON.stringify({ points }), + yield* putJsonText("qdrant.upsert-entity-points", `${config.qdrantUrl}/collections/${collectionName}/points`, { + points, }); - if (!res.ok) { - const body = await res.text(); - throw new Error(`Qdrant upsert failed: ${body}`); - } - upserted += points.length; process.stdout.write(`\r Upserting to Qdrant: ${upserted}/${entities.length}`); } console.log(); console.log(` Qdrant: ${upserted} entity embeddings stored`); -} +}); // --------------------------------------------------------------------------- // Config seeding (via gateway) // --------------------------------------------------------------------------- -async function seedConfig(): Promise { - async function pushConfig(keys: string[], values: Record): Promise { - const res = await fetch(`${GATEWAY_URL}/api/v1/config`, { - method: "POST", - headers: { "Content-Type": "application/json" }, - body: JSON.stringify({ operation: "put", keys, values }), +const seedConfig = Effect.fn("seed-demo.seedConfig")(function* (config: DemoConfig) { + const pushConfig = Effect.fn("seed-demo.seedConfig.pushConfig")(function* ( + keys: ReadonlyArray, + values: Record, + ) { + const responseText = yield* postJsonText("config.put", `${config.gatewayUrl}/api/v1/config`, { + operation: "put", + keys, + values, }); - const data = (await res.json()) as { error?: { message: string }; version?: number }; - if (data.error) throw new Error(`Config push failed: ${data.error.message}`); - console.log(` Config [${keys.join("/")}] → version ${data.version}`); - } + const data = yield* decodeJsonText("config.put.decode-json", responseText).pipe( + Effect.flatMap(decodeWith("config.put.decode-response", ConfigPushResponse)), + ); + if (data.error !== undefined) { + return yield* SeedDemoError.make({ + operation: "config.put", + message: data.error.message ?? "unknown gateway error", + }); + } + console.log(` Config [${keys.join("/")}] → version ${data.version ?? "unknown"}`); + }); - await pushConfig(["prompt"], { + yield* pushConfig(["prompt"], { "extract-relationships": { system: "You are a helpful assistant that extracts structured knowledge from text.", prompt: [ @@ -914,7 +1004,7 @@ async function seedConfig(): Promise { }, }); - await pushConfig(["flows"], { + yield* pushConfig(["flows"], { default: { topics: { "decode-input": "tg.flow.document", @@ -951,36 +1041,49 @@ async function seedConfig(): Promise { }, }, }); -} +}); // --------------------------------------------------------------------------- // Main // --------------------------------------------------------------------------- -async function main(): Promise { +const main = Effect.fn("seed-demo.main")(function* () { + const config = yield* loadConfig(); + console.log("╔══════════════════════════════════════════════════════════╗"); console.log("║ TrustGraph Demo Seeder — AI Industry KG ║"); console.log("╚══════════════════════════════════════════════════════════╝\n"); // Check services - const [hasFalkor, hasQdrant, hasOllama, hasGateway] = await Promise.all([ - checkFalkorDB(), - checkQdrant(), - checkOllama(), - checkGateway(), - ]); + const availability = yield* Effect.all({ + falkor: Effect.tryPromise({ + try: () => checkFalkorDB(config), + catch: (cause) => scriptError("check-falkordb", cause), + }).pipe(Effect.catch(() => Effect.succeed(false))), + qdrant: checkQdrant(config), + ollama: checkOllama(config), + gateway: checkGateway(config), + }, { concurrency: "unbounded" }); + + const hasFalkor = availability.falkor; + const hasQdrant = availability.qdrant; + const hasOllama = availability.ollama; + const hasGateway = availability.gateway; console.log("Service availability:"); - console.log(` FalkorDB (${FALKORDB_URL}): ${hasFalkor ? "✓" : "✗"}`); - console.log(` Qdrant (${QDRANT_URL}): ${hasQdrant ? "✓" : "✗"}`); - console.log(` Ollama (${OLLAMA_URL}): ${hasOllama ? "✓" : "✗"}`); - console.log(` Gateway (${GATEWAY_URL}): ${hasGateway ? "✓" : "✗"}`); + console.log(` FalkorDB (${config.falkorDbUrl}): ${hasFalkor ? "✓" : "✗"}`); + console.log(` Qdrant (${config.qdrantUrl}): ${hasQdrant ? "✓" : "✗"}`); + console.log(` Ollama (${config.ollamaUrl}): ${hasOllama ? "✓" : "✗"}`); + console.log(` Gateway (${config.gatewayUrl}): ${hasGateway ? "✓" : "✗"}`); console.log(); if (!hasFalkor && !hasQdrant && !hasGateway) { console.error("No services available. Start the TrustGraph stack first:"); console.error(" cd ts/deploy && docker compose up -d falkordb qdrant ollama nats"); - process.exit(1); + return yield* SeedDemoError.make({ + operation: "service-check", + message: "no seed targets available", + }); } const triples = buildTriples(); @@ -991,7 +1094,10 @@ async function main(): Promise { // Seed FalkorDB if (hasFalkor) { console.log("── Seeding FalkorDB ──"); - await seedFalkorDB(triples); + yield* Effect.tryPromise({ + try: () => seedFalkorDB(config, triples), + catch: (cause) => scriptError("seed-falkordb", cause), + }); console.log(); } else { console.log("⚠ Skipping FalkorDB (not available)\n"); @@ -1000,11 +1106,11 @@ async function main(): Promise { // Seed Qdrant (requires Ollama for embeddings) if (hasQdrant && hasOllama) { console.log("── Seeding Qdrant (entity embeddings) ──"); - await seedQdrant(entities); + yield* seedQdrant(config, entities); console.log(); console.log("── Seeding Qdrant (document chunk embeddings) ──"); - await seedDocumentChunks(); + yield* seedDocumentChunks(config); console.log(); } else if (hasQdrant) { console.log("⚠ Skipping Qdrant embeddings (Ollama not available for embedding generation)\n"); @@ -1015,7 +1121,7 @@ async function main(): Promise { // Seed config via gateway if (hasGateway) { console.log("── Seeding Config (prompt templates + flows) ──"); - await seedConfig(); + yield* seedConfig(config); console.log(); } else { console.log("⚠ Skipping config (gateway not available — run `pnpm seed` separately)\n"); @@ -1036,9 +1142,6 @@ async function main(): Promise { console.log(" • What is the Transformer architecture?"); console.log(" • Tell me about Demis Hassabis and his achievements"); console.log("═══════════════════════════════════════════════════════════"); -} - -main().catch((err) => { - console.error("\nSeed failed:", err); - process.exit(1); }); + +BunRuntime.runMain(main().pipe(Effect.provide(BunHttpClient.layer))); diff --git a/ts/scripts/seed-flows.ts b/ts/scripts/seed-flows.ts index 743d4680..67ca7c80 100644 --- a/ts/scripts/seed-flows.ts +++ b/ts/scripts/seed-flows.ts @@ -5,7 +5,64 @@ * Requires: gateway + flow-manager + config service running */ -const GATEWAY_URL = process.env.GATEWAY_URL ?? "http://localhost:8088"; +import { BunRuntime } from "@effect/platform-bun"; +import * as BunHttpClient from "@effect/platform-bun/BunHttpClient"; +import { Array as A, Config, Effect, Order, Schema as S } from "effect"; +import { HttpClient, HttpClientRequest, HttpClientResponse } from "effect/unstable/http"; + +const DEFAULT_GATEWAY_URL = "http://localhost:8088"; + +class SeedFlowsError extends S.TaggedErrorClass()( + "SeedFlowsError", + { + operation: S.String, + message: S.String, + }, +) {} + +const GatewayErrorBody = S.Struct({ + message: S.optionalKey(S.String), +}); + +const FlowListResponse = S.Struct({ + "flow-ids": S.optionalKey(S.Array(S.String)), + error: S.optionalKey(GatewayErrorBody), +}); + +const GatewayResponse = S.Struct({ + version: S.optionalKey(S.Number), + error: S.optionalKey(GatewayErrorBody), +}); + +const stringifyJson = (operation: string, value: unknown) => + S.encodeUnknownEffect(S.UnknownFromJsonString)(value).pipe( + Effect.mapError((cause) => + SeedFlowsError.make({ + operation, + message: String(cause), + }) + ), + ); + +const decodeJsonText = (operation: string, value: string) => + S.decodeUnknownEffect(S.UnknownFromJsonString)(value).pipe( + Effect.mapError((cause) => + SeedFlowsError.make({ + operation, + message: String(cause), + }) + ), + ); + +const decodeWith = (operation: string, schema: S.Codec) => (value: unknown) => + S.decodeUnknownEffect(schema)(value).pipe( + Effect.mapError((cause) => + SeedFlowsError.make({ + operation, + message: String(cause), + }) + ), + ); const FLOW_TOPICS = { // Document processing pipeline @@ -79,37 +136,66 @@ const SEEDED_FLOWS = [ }, ] as const; -async function postJson(path: string, body: Record): Promise { - const res = await fetch(`${GATEWAY_URL}${path}`, { - method: "POST", - headers: { "Content-Type": "application/json" }, - body: JSON.stringify(body), - }); - const data = await res.json() as T & { error?: { message?: string } }; - if (!res.ok) { - throw new Error(`HTTP ${res.status}: ${JSON.stringify(data)}`); - } +const postJson = Effect.fn("seed-flows.postJson")(function* ( + gatewayUrl: string, + path: string, + body: Record, +) { + const bodyText = yield* stringifyJson("encode-request", body); + const request = HttpClientRequest.post(`${gatewayUrl}${path}`, { acceptJson: true }).pipe( + HttpClientRequest.bodyText(bodyText, "application/json"), + ); + const response = yield* HttpClient.execute(request).pipe( + Effect.flatMap(HttpClientResponse.filterStatusOk), + Effect.mapError((cause) => + SeedFlowsError.make({ + operation: "http-request", + message: String(cause), + }) + ), + ); + const responseText = yield* response.text.pipe( + Effect.mapError((cause) => + SeedFlowsError.make({ + operation: "read-response", + message: String(cause), + }) + ), + ); + return yield* decodeJsonText("decode-response-json", responseText); +}); + +const failOnGatewayError = Effect.fn("seed-flows.failOnGatewayError")(function* ( + operation: string, + data: { readonly error?: { readonly message?: string } }, +) { if (data.error !== undefined) { - throw new Error(data.error.message ?? JSON.stringify(data.error)); + return yield* SeedFlowsError.make({ + operation, + message: data.error.message ?? "unknown gateway error", + }); } - return data; -} +}); -async function listFlows(): Promise> { - const response = await postJson<{ "flow-ids"?: string[] }>("/api/v1/flow", { +const listFlows = Effect.fn("seed-flows.listFlows")(function* (gatewayUrl: string) { + const response = yield* postJson(gatewayUrl, "/api/v1/flow", { operation: "list-flows", - }); + }).pipe(Effect.flatMap(decodeWith("decode-flow-list", FlowListResponse))); + yield* failOnGatewayError("list-flows", response); return new Set(response["flow-ids"] ?? []); -} +}); -async function startMissingFlows(existing: Set): Promise { +const startMissingFlows = Effect.fn("seed-flows.startMissingFlows")(function* ( + gatewayUrl: string, + existing: Set, +) { for (const flow of SEEDED_FLOWS) { if (existing.has(flow.id)) { console.log(` Flow ${flow.id}: already running`); continue; } - await postJson("/api/v1/flow", { + const response = yield* postJson(gatewayUrl, "/api/v1/flow", { operation: "start-flow", "flow-id": flow.id, "blueprint-name": "default", @@ -118,40 +204,41 @@ async function startMissingFlows(existing: Set): Promise { user: "default", collection: "default", }, - }); + }).pipe(Effect.flatMap(decodeWith("decode-start-flow", GatewayResponse))); + yield* failOnGatewayError("start-flow", response); existing.add(flow.id); console.log(` Flow ${flow.id}: started`); } -} +}); -async function ensureFlowConfig(): Promise { +const ensureFlowConfig = Effect.fn("seed-flows.ensureFlowConfig")(function* (gatewayUrl: string) { const values = Object.fromEntries( SEEDED_FLOWS.map((flow) => [flow.id, { topics: FLOW_TOPICS }]), ); - const response = await postJson<{ version?: number }>("/api/v1/config", { + const response = yield* postJson(gatewayUrl, "/api/v1/config", { operation: "put", keys: ["flows"], values, - }); + }).pipe(Effect.flatMap(decodeWith("decode-flow-config", GatewayResponse))); + yield* failOnGatewayError("flow-config", response); console.log(` Flow config topics pushed -> version ${response.version ?? "unknown"}`); -} +}); + +const main = Effect.fn("seed-flows.main")(function* () { + const gatewayUrl = yield* Config.string("GATEWAY_URL").pipe(Config.withDefault(DEFAULT_GATEWAY_URL)); -async function main(): Promise { console.log("Seeding TrustGraph flows...\n"); - const existing = await listFlows(); - await startMissingFlows(existing); + const existing = yield* listFlows(gatewayUrl); + yield* startMissingFlows(gatewayUrl, existing); console.log("\nAligning flow config topics..."); - await ensureFlowConfig(); + yield* ensureFlowConfig(gatewayUrl); - const finalFlows = [...(await listFlows())].sort(); + const finalFlows = A.sort(Array.from(yield* listFlows(gatewayUrl)), Order.String); console.log(`\nActive flows: ${finalFlows.join(", ")}`); console.log("\nFlow seeding complete."); -} - -main().catch((err) => { - console.error("Seed flows failed:", err); - process.exit(1); }); + +BunRuntime.runMain(main().pipe(Effect.provide(BunHttpClient.layer))); diff --git a/ts/scripts/test-pipeline.ts b/ts/scripts/test-pipeline.ts index cb8a9fc9..91be6441 100644 --- a/ts/scripts/test-pipeline.ts +++ b/ts/scripts/test-pipeline.ts @@ -9,7 +9,15 @@ * Usage: pnpm tsx scripts/test-pipeline.ts */ -const GATEWAY_URL = process.env.GATEWAY_URL ?? "http://localhost:8088"; +import { BunRuntime } from "@effect/platform-bun"; +import * as BunHttpClient from "@effect/platform-bun/BunHttpClient"; +import { Config, Effect, Option as O, Schema as S } from "effect"; +import { HttpClient, HttpClientRequest } from "effect/unstable/http"; + +const DEFAULT_GATEWAY_URL = "http://localhost:8088"; +const DEFAULT_LLM_MODEL = "qwen2.5:0.5b"; +const DEFAULT_FALKORDB_URL = "redis://localhost:6380"; +const DEFAULT_PIPELINE_WAIT = 20; interface RpcSocket { close: () => void; @@ -22,24 +30,111 @@ interface RpcSocket { ) => Promise; } -// ─── Helpers ────────────────────────────────────────────────────────── - -async function post(path: string, body: unknown): Promise { - const res = await fetch(`${GATEWAY_URL}${path}`, { - method: "POST", - headers: { "Content-Type": "application/json" }, - body: JSON.stringify(body), - }); - const text = await res.text(); - try { - return JSON.parse(text); - } catch { - return { status: res.status, body: text }; - } +interface PipelineConfig { + readonly gatewayUrl: string; + readonly gatewaySecret: string | undefined; + readonly llmModel: string; + readonly pipelineWaitSeconds: number; + readonly falkorDbUrl: string; + readonly skipPipeline: boolean; + readonly skipLlm: boolean; + readonly skipLibrarian: boolean; + readonly skipAgent: boolean; } +class PipelineTestError extends S.TaggedErrorClass()( + "PipelineTestError", + { + operation: S.String, + message: S.String, + }, +) {} + +const QdrantCollectionsResponse = S.Struct({ + result: S.optionalKey(S.Struct({ + collections: S.optionalKey(S.Array(S.Struct({ name: S.String }))), + })), +}); + +const pipelineError = (operation: string, cause: unknown) => + PipelineTestError.make({ + operation, + message: String(cause), + }); + +const skipFlag = (name: string) => + Config.string(name).pipe( + Config.withDefault("0"), + Config.map((value) => value === "1"), + ); + +const loadConfig = Effect.fn("test-pipeline.loadConfig")(function* () { + const gatewaySecret = yield* Config.string("GATEWAY_SECRET").pipe(Config.option); + return { + gatewayUrl: yield* Config.string("GATEWAY_URL").pipe(Config.withDefault(DEFAULT_GATEWAY_URL)), + gatewaySecret: O.getOrUndefined(gatewaySecret), + llmModel: yield* Config.string("LLM_MODEL").pipe(Config.withDefault(DEFAULT_LLM_MODEL)), + pipelineWaitSeconds: yield* Config.number("PIPELINE_WAIT").pipe(Config.withDefault(DEFAULT_PIPELINE_WAIT)), + falkorDbUrl: yield* Config.string("FALKORDB_URL").pipe(Config.withDefault(DEFAULT_FALKORDB_URL)), + skipPipeline: yield* skipFlag("SKIP_PIPELINE"), + skipLlm: yield* skipFlag("SKIP_LLM"), + skipLibrarian: yield* skipFlag("SKIP_LIBRARIAN"), + skipAgent: yield* skipFlag("SKIP_AGENT"), + } satisfies PipelineConfig; +}); + +// ─── Helpers ────────────────────────────────────────────────────────── + +const stringifyJson = (operation: string, value: unknown) => + S.encodeUnknownEffect(S.UnknownFromJsonString)(value).pipe( + Effect.mapError((cause) => pipelineError(operation, cause)), + ); + +const decodeJsonText = (operation: string, value: string) => + S.decodeUnknownEffect(S.UnknownFromJsonString)(value).pipe( + Effect.mapError((cause) => pipelineError(operation, cause)), + ); + +const post = Effect.fn("test-pipeline.post")(function* ( + config: PipelineConfig, + path: string, + body: unknown, +) { + const bodyText = yield* stringifyJson("post.encode-request", body); + const request = HttpClientRequest.post(`${config.gatewayUrl}${path}`, { acceptJson: true }).pipe( + HttpClientRequest.bodyText(bodyText, "application/json"), + ); + const response = yield* HttpClient.execute(request).pipe( + Effect.mapError((cause) => pipelineError("post.http", cause)), + ); + const text = yield* response.text.pipe( + Effect.mapError((cause) => pipelineError("post.read-response", cause)), + ); + return yield* decodeJsonText("post.decode-response", text).pipe( + Effect.catch(() => Effect.succeed({ status: response.status, body: text })), + ); +}); + +const getJson = Effect.fn("test-pipeline.getJson")(function* (url: string) { + const response = yield* HttpClient.get(url, { acceptJson: true }).pipe( + Effect.mapError((cause) => pipelineError("get.http", cause)), + ); + const text = yield* response.text.pipe( + Effect.mapError((cause) => pipelineError("get.read-response", cause)), + ); + return yield* decodeJsonText("get.decode-response", text); +}); + +const gatewayReachable = Effect.fn("test-pipeline.gatewayReachable")(function* (config: PipelineConfig) { + return yield* HttpClient.get(`${config.gatewayUrl}/api/v1/metrics`).pipe( + Effect.map((response) => response.status >= 200 && response.status < 300), + Effect.catch(() => Effect.succeed(false)), + ); +}); + function log(label: string, data: unknown): void { - console.log(`\n[${label}]`, JSON.stringify(data, null, 2)); + console.log(`\n[${label}]`); + console.dir(data, { depth: null }); } function pass(test: string): void { @@ -50,11 +145,18 @@ function fail(test: string, err: unknown): void { console.error(` ✗ ${test}:`, err); } +const catchTest = (name: string, effect: Effect.Effect) => + effect.pipe( + Effect.catch((err) => { + fail(name, err); + return Effect.succeed(false); + }), + ); + // ─── Tests ──────────────────────────────────────────────────────────── -async function testConfigList(): Promise { - try { - const res = await post("/api/v1/config", { operation: "list", keys: [] }); +const testConfigList = (config: PipelineConfig) => catchTest("Config list", Effect.gen(function* () { + const res = yield* post(config, "/api/v1/config", { operation: "list", keys: [] }); log("config/list", res); if (typeof res === "object" && res !== null && "version" in res) { pass("Config list returns version"); @@ -62,15 +164,10 @@ async function testConfigList(): Promise { } fail("Config list", "unexpected response"); return false; - } catch (err) { - fail("Config list", err); - return false; - } -} +})); -async function testConfigPut(): Promise { - try { - const res = await post("/api/v1/config", { +const testConfigPut = (config: PipelineConfig) => catchTest("Config put", Effect.gen(function* () { + const res = yield* post(config, "/api/v1/config", { operation: "put", keys: ["test"], values: { greeting: "hello from trustgraph-ts!" }, @@ -82,15 +179,10 @@ async function testConfigPut(): Promise { } fail("Config put", "unexpected response"); return false; - } catch (err) { - fail("Config put", err); - return false; - } -} +})); -async function testConfigGet(): Promise { - try { - const res = await post("/api/v1/config", { +const testConfigGet = (config: PipelineConfig) => catchTest("Config get", Effect.gen(function* () { + const res = yield* post(config, "/api/v1/config", { operation: "get", keys: ["test"], }); @@ -103,22 +195,17 @@ async function testConfigGet(): Promise { } fail("Config get", "value mismatch"); return false; - } catch (err) { - fail("Config get", err); - return false; - } -} +})); -async function testConfigDelete(): Promise { - try { - const res = await post("/api/v1/config", { +const testConfigDelete = (config: PipelineConfig) => catchTest("Config delete", Effect.gen(function* () { + const res = yield* post(config, "/api/v1/config", { operation: "delete", keys: ["test"], }); log("config/delete", res); // Verify it's gone - const check = await post("/api/v1/config", { + const check = yield* post(config, "/api/v1/config", { operation: "get", keys: ["test"], }) as Record; @@ -130,16 +217,11 @@ async function testConfigDelete(): Promise { } fail("Config delete", "value still present"); return false; - } catch (err) { - fail("Config delete", err); - return false; - } -} +})); -async function testPushFlowConfig(): Promise { - try { +const testPushFlowConfig = (config: PipelineConfig) => catchTest("Flow config push", Effect.gen(function* () { // Push a full flow definition with all service topic mappings - const res = await post("/api/v1/config", { + const res = yield* post(config, "/api/v1/config", { operation: "put", keys: ["flows"], values: { @@ -193,21 +275,14 @@ async function testPushFlowConfig(): Promise { } fail("Flow config push", "unexpected response"); return false; - } catch (err) { - fail("Flow config push", err); - return false; - } -} +})); -async function testTextCompletion(): Promise { - try { +const testTextCompletion = (config: PipelineConfig) => catchTest("Text completion", Effect.gen(function* () { console.log("\n Sending text-completion request (may take a few seconds)..."); - // Use model from env or default to qwen2.5:0.5b (Ollama-compatible) - const model = process.env.LLM_MODEL ?? "qwen2.5:0.5b"; - const res = await post("/api/v1/flow/default/service/text-completion", { + const res = yield* post(config, "/api/v1/flow/default/service/text-completion", { system: "You are a helpful assistant. Reply in one sentence.", prompt: "What is 2+2?", - model, + model: config.llmModel, }); log("text-completion", res); const r = res as Record; @@ -221,55 +296,45 @@ async function testTextCompletion(): Promise { } fail("Text completion", "unexpected response"); return false; - } catch (err) { - fail("Text completion", err); - return false; - } -} +})); -async function testWebSocket(): Promise { +const testWebSocket = (config: PipelineConfig) => catchTest("Effect RPC WebSocket", Effect.gen(function* () { let socket: RpcSocket | undefined; - try { - const { createTrustGraphSocket } = await import( - "../packages/client/src/socket/trustgraph-socket.js" - ); + return yield* Effect.gen(function* () { + const { createTrustGraphSocket } = yield* Effect.tryPromise({ + try: () => import("../packages/client/src/socket/trustgraph-socket.js"), + catch: (cause) => pipelineError("websocket.import", cause), + }); - const gatewayWsUrl = GATEWAY_URL.replace(/^http/, "ws").replace(/\/$/, ""); + const gatewayWsUrl = config.gatewayUrl.replace(/^http/, "ws").replace(/\/$/, ""); socket = createTrustGraphSocket( "pipeline", - process.env.GATEWAY_SECRET, + config.gatewaySecret, `${gatewayWsUrl}/api/v1/rpc`, ); - const response = await Promise.race([ - socket.makeRequest, Record>( + const response = yield* Effect.tryPromise({ + try: () => + socket?.makeRequest, Record>( "config", { operation: "list", keys: [] }, 5000, - ), - new Promise((_, reject) => - setTimeout(() => reject(new Error("connection timeout")), 5000) - ), - ]); + ) ?? Promise.resolve({}), + catch: (cause) => pipelineError("websocket.request", cause), + }).pipe(Effect.timeout("5 seconds")); log("websocket/rpc-response", response); pass("Effect RPC WebSocket round-trip works"); return true; - } catch (err) { - fail("Effect RPC WebSocket", err); - return false; - } finally { - socket?.close(); - } -} + }).pipe(Effect.ensuring(Effect.sync(() => socket?.close()))); +})); // ─── Librarian Tests ────────────────────────────────────────────────── let testDocId = ""; -async function testLibrarianAdd(): Promise { - try { +const testLibrarianAdd = (config: PipelineConfig) => catchTest("Librarian add-document", Effect.gen(function* () { const content = Buffer.from("Hello from TrustGraph TypeScript!").toString("base64"); - const res = await post("/api/v1/librarian", { + const res = yield* post(config, "/api/v1/librarian", { operation: "add-document", user: "test-user", collection: "test-collection", @@ -299,15 +364,10 @@ async function testLibrarianAdd(): Promise { } fail("Librarian add-document", "no documentMetadata.id in response"); return false; - } catch (err) { - fail("Librarian add-document", err); - return false; - } -} +})); -async function testLibrarianList(): Promise { - try { - const res = await post("/api/v1/librarian", { +const testLibrarianList = (config: PipelineConfig) => catchTest("Librarian list-documents", Effect.gen(function* () { + const res = yield* post(config, "/api/v1/librarian", { operation: "list-documents", user: "test-user", }); @@ -320,19 +380,14 @@ async function testLibrarianList(): Promise { } fail("Librarian list-documents", "empty or missing documents array"); return false; - } catch (err) { - fail("Librarian list-documents", err); - return false; - } -} +})); -async function testLibrarianGetContent(): Promise { +const testLibrarianGetContent = (config: PipelineConfig) => catchTest("Librarian get-content", Effect.gen(function* () { if (!testDocId) { fail("Librarian get-content", "no document ID from add test"); return false; } - try { - const res = await post("/api/v1/librarian", { + const res = yield* post(config, "/api/v1/librarian", { operation: "get-document-content", documentId: testDocId, user: "test-user", @@ -350,19 +405,14 @@ async function testLibrarianGetContent(): Promise { } fail("Librarian get-content", "no content in response"); return false; - } catch (err) { - fail("Librarian get-content", err); - return false; - } -} +})); -async function testLibrarianDelete(): Promise { +const testLibrarianDelete = (config: PipelineConfig) => catchTest("Librarian delete", Effect.gen(function* () { if (!testDocId) { fail("Librarian delete", "no document ID from add test"); return false; } - try { - const res = await post("/api/v1/librarian", { + const res = yield* post(config, "/api/v1/librarian", { operation: "remove-document", documentId: testDocId, user: "test-user", @@ -370,7 +420,7 @@ async function testLibrarianDelete(): Promise { log("librarian/delete", res); // Verify it's gone - const listRes = await post("/api/v1/librarian", { + const listRes = yield* post(config, "/api/v1/librarian", { operation: "list-documents", user: "test-user", }) as Record; @@ -381,19 +431,14 @@ async function testLibrarianDelete(): Promise { } fail("Librarian remove-document", "document still present after delete"); return false; - } catch (err) { - fail("Librarian delete", err); - return false; - } -} +})); // ─── Document Load Test ────────────────────────────────────────────── -async function testDocumentLoad(): Promise { - try { +const testDocumentLoad = (config: PipelineConfig) => catchTest("Document load", Effect.gen(function* () { // First upload a test document via librarian const content = Buffer.from("Test document for pipeline processing.").toString("base64"); - const addRes = await post("/api/v1/librarian", { + const addRes = yield* post(config, "/api/v1/librarian", { operation: "add-document", user: "test-user", collection: "test-collection", @@ -418,23 +463,18 @@ async function testDocumentLoad(): Promise { const docId = meta.id as string; // Trigger document processing via the load endpoint - const res = await fetch(`${GATEWAY_URL}/api/v1/flow/default/load`, { - method: "POST", - headers: { "Content-Type": "application/json" }, - body: JSON.stringify({ - documentId: docId, - user: "test-user", - collection: "test-collection", - }), + const data = yield* post(config, "/api/v1/flow/default/load", { + documentId: docId, + user: "test-user", + collection: "test-collection", }); - const data = await res.json() as Record; log("document-load", data); if (data.status === "processing") { pass(`Document load triggered for ${docId.slice(0, 8)}...`); // Clean up the test document - await post("/api/v1/librarian", { + yield* post(config, "/api/v1/librarian", { operation: "remove-document", documentId: docId, user: "test-user", @@ -444,21 +484,25 @@ async function testDocumentLoad(): Promise { } fail("Document load", "unexpected response"); return false; - } catch (err) { - fail("Document load", err); - return false; - } -} +})); // ─── Full Pipeline Test (real PDF) ─────────────────────────────────── -async function testFullPipeline(): Promise { - try { +const testFullPipeline = (config: PipelineConfig) => catchTest("Full pipeline", Effect.gen(function* () { // 1. Generate a test PDF in memory using pdf-lib - const { PDFDocument, StandardFonts } = await import("pdf-lib"); + const { PDFDocument, StandardFonts } = yield* Effect.tryPromise({ + try: () => import("pdf-lib"), + catch: (cause) => pipelineError("full-pipeline.import-pdf-lib", cause), + }); - const pdfDoc = await PDFDocument.create(); - const font = await pdfDoc.embedFont(StandardFonts.Helvetica); + const pdfDoc = yield* Effect.tryPromise({ + try: () => PDFDocument.create(), + catch: (cause) => pipelineError("full-pipeline.create-pdf", cause), + }); + const font = yield* Effect.tryPromise({ + try: () => pdfDoc.embedFont(StandardFonts.Helvetica), + catch: (cause) => pipelineError("full-pipeline.embed-font", cause), + }); const texts = [ "Alice Johnson is a senior engineer at Acme Corporation. Acme develops CloudSync, a cloud storage platform. CloudSync uses Amazon Web Services for hosting.", @@ -470,13 +514,16 @@ async function testFullPipeline(): Promise { page.drawText(text, { x: 50, y: 700, size: 11, font, maxWidth: 500 }); } - const pdfBytes = await pdfDoc.save(); + const pdfBytes = yield* Effect.tryPromise({ + try: () => pdfDoc.save(), + catch: (cause) => pipelineError("full-pipeline.save-pdf", cause), + }); const content = Buffer.from(pdfBytes).toString("base64"); console.log(` Generated test PDF: ${pdfBytes.length} bytes, 2 pages`); // 2. Upload to librarian as application/pdf - const addRes = await post("/api/v1/librarian", { + const addRes = yield* post(config, "/api/v1/librarian", { operation: "add-document", user: "test", collection: "test", @@ -502,61 +549,76 @@ async function testFullPipeline(): Promise { console.log(` Uploaded PDF as document ${docId.slice(0, 8)}...`); // 3. Trigger pipeline processing - const loadRes = await fetch(`${GATEWAY_URL}/api/v1/flow/default/load`, { - method: "POST", - headers: { "Content-Type": "application/json" }, - body: JSON.stringify({ documentId: docId, user: "test", collection: "test" }), + const loadData = yield* post(config, "/api/v1/flow/default/load", { + documentId: docId, + user: "test", + collection: "test", }); - const loadData = await loadRes.json() as Record; - if (loadData.status !== "processing") { - fail("Full pipeline", `load returned: ${JSON.stringify(loadData)}`); + const loadRecord = loadData as Record; + if (loadRecord.status !== "processing") { + fail("Full pipeline", `load returned: ${String(loadData)}`); return false; } console.log(" Pipeline triggered, waiting for processing..."); // 4. Wait for pipeline to complete (PDF decode + chunking + extraction + storage) // This involves multiple LLM calls so give it time - const waitSecs = Number.parseInt(process.env.PIPELINE_WAIT ?? "20", 10); - for (let i = waitSecs; i > 0; i--) { + for (let i = config.pipelineWaitSeconds; i > 0; i--) { process.stdout.write(`\r Waiting... ${i}s remaining `); - await new Promise((r) => setTimeout(r, 1000)); + yield* Effect.sleep("1 second"); } console.log("\r Processing wait complete. "); // 5. Verify triples in FalkorDB let triplesFound = false; - try { - const { createClient } = await import("falkordb"); - const client = createClient({ - url: process.env.FALKORDB_URL ?? "redis://localhost:6380", - }); - await client.connect(); - const graph = client.graph("falkordb"); - const result = await graph.query("MATCH (n:Node) RETURN count(n) as cnt"); - const count = result.data?.[0]?.[0] ?? 0; - await client.disconnect(); + const falkorCount = yield* Effect.tryPromise({ + try: async () => { + const { createClient } = await import("falkordb"); + const client = createClient({ + url: config.falkorDbUrl, + }); + await client.connect(); + const graph = client.graph("falkordb"); + const result = await graph.query("MATCH (n:Node) RETURN count(n) as cnt"); + const count = result.data?.[0]?.[0] ?? 0; + await client.disconnect(); + return count; + }, + catch: (cause) => pipelineError("full-pipeline.falkordb", cause), + }).pipe( + Effect.catch((err) => { + const errStr = String(err); + if (errStr.includes("Cannot find package") || errStr.includes("MODULE_NOT_FOUND")) { + console.log(" FalkorDB check skipped: falkordb package not available at workspace root"); + } else { + console.log(` FalkorDB check failed: ${err}`); + } + return Effect.succeed(undefined); + }), + ); - if (typeof count === "number" && count > 0) { - console.log(` FalkorDB: ${count} nodes found`); + if (typeof falkorCount === "number" && falkorCount > 0) { + console.log(` FalkorDB: ${falkorCount} nodes found`); triplesFound = true; } else { - console.log(` FalkorDB: no nodes found (count=${count})`); + console.log(` FalkorDB: no nodes found (count=${falkorCount})`); } - } catch (err) { - const errStr = String(err); - if (errStr.includes("Cannot find package") || errStr.includes("MODULE_NOT_FOUND")) { - console.log(" FalkorDB check skipped: falkordb package not available at workspace root"); - } else { - console.log(` FalkorDB check failed: ${err}`); - } - } // 6. Verify embeddings in Qdrant let embeddingsFound = false; - try { - const qdrantRes = await fetch("http://localhost:6333/collections"); - const qdrantData = await qdrantRes.json() as { result?: { collections?: Array<{ name: string }> } }; + const qdrantData = yield* getJson("http://localhost:6333/collections").pipe( + Effect.flatMap((value) => + S.decodeUnknownEffect(QdrantCollectionsResponse)(value).pipe( + Effect.mapError((cause) => pipelineError("full-pipeline.qdrant.decode", cause)), + ) + ), + Effect.catch((err) => { + console.log(` Qdrant check failed: ${err}`); + return Effect.succeed(undefined); + }), + ); + if (qdrantData !== undefined) { const collections = qdrantData.result?.collections ?? []; const testCollections = collections.filter((c) => c.name.startsWith("t_test_test_")); @@ -566,8 +628,6 @@ async function testFullPipeline(): Promise { } else { console.log(` Qdrant: no test collections found (total: ${collections.length} collections)`); } - } catch (err) { - console.log(` Qdrant check failed: ${err}`); } // 7. Report results @@ -584,21 +644,15 @@ async function testFullPipeline(): Promise { // Pipeline triggered but stores not populated yet — partial success pass("Full pipeline: triggered successfully (stores may need more time)"); return true; - } catch (err) { - fail("Full pipeline", err); - return false; - } -} +})); // ─── Agent Test ─────────────────────────────────────────────────────── -async function testAgentQuery(): Promise { - try { +const testAgentQuery = (config: PipelineConfig) => catchTest("Agent", Effect.gen(function* () { console.log("\n Sending agent request (may take a few seconds)..."); - const model = process.env.LLM_MODEL ?? "qwen2.5:0.5b"; - const res = await post("/api/v1/flow/default/service/agent", { + const res = yield* post(config, "/api/v1/flow/default/service/agent", { question: "What is the capital of France?", - model, + model: config.llmModel, }); log("agent", res); const r = res as Record; @@ -615,91 +669,94 @@ async function testAgentQuery(): Promise { } fail("Agent", "unexpected response format"); return false; - } catch (err) { - fail("Agent", err); - return false; - } -} +})); // ─── Main ───────────────────────────────────────────────────────────── -async function main(): Promise { +const main = Effect.fn("test-pipeline.main")(function* () { + const config = yield* loadConfig(); + console.log("╔══════════════════════════════════════════════════╗"); console.log("║ TrustGraph TypeScript — Integration Test ║"); console.log("╚══════════════════════════════════════════════════╝"); - console.log(`\nGateway: ${GATEWAY_URL}`); + console.log(`\nGateway: ${config.gatewayUrl}`); // Check gateway is reachable - try { - const res = await fetch(`${GATEWAY_URL}/api/v1/metrics`); - if (!res.ok) throw new Error(`HTTP ${res.status}`); + const isReachable = yield* gatewayReachable(config); + if (isReachable) { pass("Gateway reachable"); - } catch (err) { - fail("Gateway reachable", err); + } else { + fail("Gateway reachable", "metrics endpoint unavailable"); console.error("\n⚠ Gateway not running. Start it first:"); console.error(" pnpm tsx scripts/run-gateway.ts"); - process.exit(1); + return yield* PipelineTestError.make({ + operation: "gateway-reachable", + message: "gateway metrics endpoint unavailable", + }); } let passed = 0; let failed = 0; - const run = async (name: string, fn: () => Promise) => { + const run = Effect.fn("test-pipeline.run")(function* ( + name: string, + test: Effect.Effect, + ) { console.log(`\n── ${name} ──`); - if (await fn()) passed++; + if (yield* test) passed++; else failed++; - }; + }); // Config CRUD tests - await run("Config List", testConfigList); - await run("Config Put", testConfigPut); - await run("Config Get", testConfigGet); - await run("Config Delete", testConfigDelete); + yield* run("Config List", testConfigList(config)); + yield* run("Config Put", testConfigPut(config)); + yield* run("Config Get", testConfigGet(config)); + yield* run("Config Delete", testConfigDelete(config)); // WebSocket test - await run("WebSocket Round-Trip", testWebSocket); + yield* run("WebSocket Round-Trip", testWebSocket(config)); // Flow config push - await run("Push Flow Config", testPushFlowConfig); + yield* run("Push Flow Config", testPushFlowConfig(config)); // Document pipeline load test (requires librarian + gateway) - if (process.env.SKIP_PIPELINE !== "1" && process.env.SKIP_LIBRARIAN !== "1") { + if (!config.skipPipeline && !config.skipLibrarian) { console.log("\n (Testing document load — set SKIP_PIPELINE=1 to skip)"); - await run("Document Load", testDocumentLoad); + yield* run("Document Load", testDocumentLoad(config)); } else { console.log("\n (Skipping document pipeline load test)"); } // LLM test (only if a running LLM service is available) - if (process.env.SKIP_LLM !== "1") { + if (!config.skipLlm) { console.log("\n (Testing text-completion — set SKIP_LLM=1 to skip)"); - await run("Text Completion", testTextCompletion); + yield* run("Text Completion", testTextCompletion(config)); } else { console.log("\n (SKIP_LLM=1 — skipping LLM test)"); } // Librarian tests (only if librarian service is running) - if (process.env.SKIP_LIBRARIAN !== "1") { + if (!config.skipLibrarian) { console.log("\n (Testing librarian — set SKIP_LIBRARIAN=1 to skip)"); - await run("Librarian Add", testLibrarianAdd); - await run("Librarian List", testLibrarianList); - await run("Librarian Get Content", testLibrarianGetContent); - await run("Librarian Delete", testLibrarianDelete); + yield* run("Librarian Add", testLibrarianAdd(config)); + yield* run("Librarian List", testLibrarianList(config)); + yield* run("Librarian Get Content", testLibrarianGetContent(config)); + yield* run("Librarian Delete", testLibrarianDelete(config)); } else { console.log("\n (SKIP_LIBRARIAN=1 — skipping librarian tests)"); } // Full pipeline test (real PDF → decode → chunk → extract → store) - if (process.env.SKIP_PIPELINE !== "1" && process.env.SKIP_LLM !== "1") { + if (!config.skipPipeline && !config.skipLlm) { console.log("\n (Testing full pipeline with real PDF — set SKIP_PIPELINE=1 to skip)"); - await run("Full Pipeline", testFullPipeline); + yield* run("Full Pipeline", testFullPipeline(config)); } else { console.log("\n (Skipping full pipeline test)"); } // Agent test (only if agent + LLM services are running) - if (process.env.SKIP_AGENT !== "1" && process.env.SKIP_LLM !== "1") { + if (!config.skipAgent && !config.skipLlm) { console.log("\n (Testing agent — set SKIP_AGENT=1 to skip)"); - await run("Agent Query", testAgentQuery); + yield* run("Agent Query", testAgentQuery(config)); } else { console.log("\n (Skipping agent test)"); } @@ -708,7 +765,12 @@ async function main(): Promise { console.log(` Results: ${passed} passed, ${failed} failed`); console.log("══════════════════════════════════════════════════\n"); - process.exit(failed > 0 ? 1 : 0); -} + if (failed > 0) { + return yield* PipelineTestError.make({ + operation: "results", + message: `${failed} integration test(s) failed`, + }); + } +}); -main(); +BunRuntime.runMain(main().pipe(Effect.provide(BunHttpClient.layer)));