refactor(ts): make maintenance scripts effect native

This commit is contained in:
elpresidank 2026-06-11 08:25:13 -05:00
parent a7bdbb9257
commit cd6c9107d7
7 changed files with 845 additions and 475 deletions

View file

@ -5,7 +5,64 @@
* Requires: gateway + flow-manager + config service running
*/
const GATEWAY_URL = process.env.GATEWAY_URL ?? "http://localhost:8088";
import { BunRuntime } from "@effect/platform-bun";
import * as BunHttpClient from "@effect/platform-bun/BunHttpClient";
import { Array as A, Config, Effect, Order, Schema as S } from "effect";
import { HttpClient, HttpClientRequest, HttpClientResponse } from "effect/unstable/http";
const DEFAULT_GATEWAY_URL = "http://localhost:8088";
class SeedFlowsError extends S.TaggedErrorClass<SeedFlowsError>()(
"SeedFlowsError",
{
operation: S.String,
message: S.String,
},
) {}
const GatewayErrorBody = S.Struct({
message: S.optionalKey(S.String),
});
const FlowListResponse = S.Struct({
"flow-ids": S.optionalKey(S.Array(S.String)),
error: S.optionalKey(GatewayErrorBody),
});
const GatewayResponse = S.Struct({
version: S.optionalKey(S.Number),
error: S.optionalKey(GatewayErrorBody),
});
const stringifyJson = (operation: string, value: unknown) =>
S.encodeUnknownEffect(S.UnknownFromJsonString)(value).pipe(
Effect.mapError((cause) =>
SeedFlowsError.make({
operation,
message: String(cause),
})
),
);
const decodeJsonText = (operation: string, value: string) =>
S.decodeUnknownEffect(S.UnknownFromJsonString)(value).pipe(
Effect.mapError((cause) =>
SeedFlowsError.make({
operation,
message: String(cause),
})
),
);
const decodeWith = <A, I, R>(operation: string, schema: S.Codec<A, I, R>) => (value: unknown) =>
S.decodeUnknownEffect(schema)(value).pipe(
Effect.mapError((cause) =>
SeedFlowsError.make({
operation,
message: String(cause),
})
),
);
const FLOW_TOPICS = {
// Document processing pipeline
@ -79,37 +136,66 @@ const SEEDED_FLOWS = [
},
] as const;
async function postJson<T>(path: string, body: Record<string, unknown>): Promise<T> {
const res = await fetch(`${GATEWAY_URL}${path}`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify(body),
});
const data = await res.json() as T & { error?: { message?: string } };
if (!res.ok) {
throw new Error(`HTTP ${res.status}: ${JSON.stringify(data)}`);
}
const postJson = Effect.fn("seed-flows.postJson")(function* (
gatewayUrl: string,
path: string,
body: Record<string, unknown>,
) {
const bodyText = yield* stringifyJson("encode-request", body);
const request = HttpClientRequest.post(`${gatewayUrl}${path}`, { acceptJson: true }).pipe(
HttpClientRequest.bodyText(bodyText, "application/json"),
);
const response = yield* HttpClient.execute(request).pipe(
Effect.flatMap(HttpClientResponse.filterStatusOk),
Effect.mapError((cause) =>
SeedFlowsError.make({
operation: "http-request",
message: String(cause),
})
),
);
const responseText = yield* response.text.pipe(
Effect.mapError((cause) =>
SeedFlowsError.make({
operation: "read-response",
message: String(cause),
})
),
);
return yield* decodeJsonText("decode-response-json", responseText);
});
const failOnGatewayError = Effect.fn("seed-flows.failOnGatewayError")(function* (
operation: string,
data: { readonly error?: { readonly message?: string } },
) {
if (data.error !== undefined) {
throw new Error(data.error.message ?? JSON.stringify(data.error));
return yield* SeedFlowsError.make({
operation,
message: data.error.message ?? "unknown gateway error",
});
}
return data;
}
});
async function listFlows(): Promise<Set<string>> {
const response = await postJson<{ "flow-ids"?: string[] }>("/api/v1/flow", {
const listFlows = Effect.fn("seed-flows.listFlows")(function* (gatewayUrl: string) {
const response = yield* postJson(gatewayUrl, "/api/v1/flow", {
operation: "list-flows",
});
}).pipe(Effect.flatMap(decodeWith("decode-flow-list", FlowListResponse)));
yield* failOnGatewayError("list-flows", response);
return new Set(response["flow-ids"] ?? []);
}
});
async function startMissingFlows(existing: Set<string>): Promise<void> {
const startMissingFlows = Effect.fn("seed-flows.startMissingFlows")(function* (
gatewayUrl: string,
existing: Set<string>,
) {
for (const flow of SEEDED_FLOWS) {
if (existing.has(flow.id)) {
console.log(` Flow ${flow.id}: already running`);
continue;
}
await postJson("/api/v1/flow", {
const response = yield* postJson(gatewayUrl, "/api/v1/flow", {
operation: "start-flow",
"flow-id": flow.id,
"blueprint-name": "default",
@ -118,40 +204,41 @@ async function startMissingFlows(existing: Set<string>): Promise<void> {
user: "default",
collection: "default",
},
});
}).pipe(Effect.flatMap(decodeWith("decode-start-flow", GatewayResponse)));
yield* failOnGatewayError("start-flow", response);
existing.add(flow.id);
console.log(` Flow ${flow.id}: started`);
}
}
});
async function ensureFlowConfig(): Promise<void> {
const ensureFlowConfig = Effect.fn("seed-flows.ensureFlowConfig")(function* (gatewayUrl: string) {
const values = Object.fromEntries(
SEEDED_FLOWS.map((flow) => [flow.id, { topics: FLOW_TOPICS }]),
);
const response = await postJson<{ version?: number }>("/api/v1/config", {
const response = yield* postJson(gatewayUrl, "/api/v1/config", {
operation: "put",
keys: ["flows"],
values,
});
}).pipe(Effect.flatMap(decodeWith("decode-flow-config", GatewayResponse)));
yield* failOnGatewayError("flow-config", response);
console.log(` Flow config topics pushed -> version ${response.version ?? "unknown"}`);
}
});
const main = Effect.fn("seed-flows.main")(function* () {
const gatewayUrl = yield* Config.string("GATEWAY_URL").pipe(Config.withDefault(DEFAULT_GATEWAY_URL));
async function main(): Promise<void> {
console.log("Seeding TrustGraph flows...\n");
const existing = await listFlows();
await startMissingFlows(existing);
const existing = yield* listFlows(gatewayUrl);
yield* startMissingFlows(gatewayUrl, existing);
console.log("\nAligning flow config topics...");
await ensureFlowConfig();
yield* ensureFlowConfig(gatewayUrl);
const finalFlows = [...(await listFlows())].sort();
const finalFlows = A.sort(Array.from(yield* listFlows(gatewayUrl)), Order.String);
console.log(`\nActive flows: ${finalFlows.join(", ")}`);
console.log("\nFlow seeding complete.");
}
main().catch((err) => {
console.error("Seed flows failed:", err);
process.exit(1);
});
BunRuntime.runMain(main().pipe(Effect.provide(BunHttpClient.layer)));