From 88db18fbda0cc341285488b8025c9eca6288a342 Mon Sep 17 00:00:00 2001 From: elpresidank Date: Tue, 2 Jun 2026 00:40:44 -0500 Subject: [PATCH] Migrate config service to ref-backed Effect runtime --- ts/EFFECT_NATIVE_REWRITE_AUDIT.md | 67 +- .../flow/src/__tests__/config-service.test.ts | 84 +- ts/packages/flow/src/config/service.ts | 1504 +++++++++-------- ts/scripts/run-config.ts | 7 +- 4 files changed, 907 insertions(+), 755 deletions(-) diff --git a/ts/EFFECT_NATIVE_REWRITE_AUDIT.md b/ts/EFFECT_NATIVE_REWRITE_AUDIT.md index 21c36656..39941eb7 100644 --- a/ts/EFFECT_NATIVE_REWRITE_AUDIT.md +++ b/ts/EFFECT_NATIVE_REWRITE_AUDIT.md @@ -12,23 +12,23 @@ Verified source roots: - Effect v4 subtree: `/home/elpresidank/YeeBois/projects/beep-effect2/.repos/effect-v4` - Installed Effect beta used by this workspace: `ts/node_modules/effect` -Current signal counts from `ts/packages` after the 2026-06-02 strict tsgo -slice: +Current signal counts from `ts/packages` after the 2026-06-02 config service +runtime slice: | Signal | Count | | --- | ---: | -| `Effect.runPromise` | 208 | -| `Map<` | 58 | -| `WebSocket` | 45 | -| `new Map` | 45 | +| `Effect.runPromise` | 207 | +| `Map<` | 65 | +| `WebSocket` | 51 | +| `new Map` | 47 | | `toPromiseRequestor` | 19 | | `makeAsyncProcessor` | 19 | | `receive(` | 18 | -| `while (` | 13 | +| `while (` | 12 | | `new Error` | 14 | | `new Promise` | 10 | | `JSON.parse` | 8 | -| `localStorage` | 8 | +| `localStorage` | 9 | | `JSON.stringify` | 6 | | `setTimeout` | 4 | | `process.env` | 3 | @@ -78,7 +78,7 @@ Notes: ### 2026-06-02: Strict Base, CLI, MCP, And tsgo Slice -- Status: migrated, root-verified, ready to commit. +- Status: migrated, root-verified, committed, and pushed. - Completed: - Base messaging, NATS backend, producer, consumer, subscriber, request/response, runtime factories, processor programs, flow specs, and @@ -102,6 +102,37 @@ Notes: - `cd ts && bun run build` - `git diff --check` +### 2026-06-02: ConfigService Ref-Backed State Slice + +- Status: migrated and root-verified. +- Completed: + - `ts/packages/flow/src/config/service.ts` now models runtime state as a + `SynchronizedRef` instead of adding mutable + `store`, `version`, consumer, and producer fields onto the processor + object. + - Config operations have Effect-returning handlers with Promise facades only + on the exported compatibility methods. + - Request narrowing now uses `effect/Predicate` rather than request-record + type assertions. + - Persistence remains schema-backed and now reads/writes snapshots from the + ref-backed state. + - The consume loop now uses `Effect.whileLoop`; the remaining + `consumer.receive(2000)` call is a pubsub boundary for this service. + - Service startup now exposes `runMain()` through `NodeRuntime.runMain`. + The legacy `run()` Promise facade uses `ManagedRuntime`, and + `ts/scripts/run-config.ts` delegates directly to `runMain()` instead of + owning its own catch/process-exit wrapper. + - Config-service tests cover tagged invalid mutation errors, workspace + persistence, legacy load, concurrent ref-backed mutations, and push + publishing from the stored producer handle. +- Verification: + - `bun run --cwd ts/packages/flow build` + - `bun run --cwd ts/packages/flow test` + - `cd ts && bun run check` + - `cd ts && bun run build` + - `cd ts && bun run test` + - `git diff --check` + ## Subagent Findings To Preserve - MCP/workbench: @@ -111,8 +142,9 @@ Notes: the client API is less Promise-first. - MCP env is now Config-backed; continue that policy for future MCP settings. - Flow stateful services: - - Config, librarian, cores, and flow-manager still have mutable poller - service objects. These remain good candidates for `Context` services, + - Config service ref-backed state is complete. Librarian, cores, and + flow-manager still have mutable poller service objects. These remain good + candidates for `Context` services, scoped layers, `Ref`/`SynchronizedRef`, `Schedule`, and managed persistence. - Persistence IO should move toward `FileSystem` or `KeyValueStore` where @@ -140,10 +172,9 @@ Notes: ## Ranked Findings -### P0: Convert Stateful Flow Services To Scoped Effect Services +### P0: Continue Stateful Flow Services To Scoped Effect Services - TrustGraph evidence: - - `ts/packages/flow/src/config/service.ts` - `ts/packages/flow/src/librarian/service.ts` - `ts/packages/flow/src/cores/service.ts` - `ts/packages/flow/src/flow-manager/service.ts` @@ -152,8 +183,12 @@ Notes: `Effect.addFinalizer`, `Config`, `Schema`, `FileSystem`, `KeyValueStore`. - Rewrite shape: - - Model one service at a time as a `Context` service plus scoped layer. + - Model one remaining service at a time as a `Context` service plus scoped + layer or ref-backed state slice. - Store mutable service state in `Ref` or `SynchronizedRef`. + - Run service main programs with platform runtime entrypoints such as + `NodeRuntime.runMain`; keep `ManagedRuntime` only for compatibility + Promise facades. - Replace polling sleep loops with schedules where behavior allows. - Decode persisted payloads and config with schemas at boundaries. - Tests: @@ -267,8 +302,8 @@ Notes: ## Recommended PR Order -1. Config service scoped state migration. -2. RAG and agent requestor bridge removal. +1. RAG and agent requestor bridge removal. +2. Librarian, cores, or flow-manager scoped state migration. 3. Client RPC managed runtime/scoped layer cleanup. 4. Base processor registry and constructor shim redesign. 5. Gateway RPC callback and client streaming completion cleanup. diff --git a/ts/packages/flow/src/__tests__/config-service.test.ts b/ts/packages/flow/src/__tests__/config-service.test.ts index c74f6e62..5b5657ef 100644 --- a/ts/packages/flow/src/__tests__/config-service.test.ts +++ b/ts/packages/flow/src/__tests__/config-service.test.ts @@ -1,7 +1,9 @@ import { mkdtemp, rm } from "node:fs/promises"; import { tmpdir } from "node:os"; import { join } from "node:path"; +import { Effect, SynchronizedRef } from "effect"; import { describe, expect, it } from "vitest"; +import { topics } from "@trustgraph/base"; import { ConfigServiceError, makeConfigService, @@ -16,9 +18,15 @@ import type { } from "@trustgraph/base"; class NoopPubSub implements PubSubBackend { - async createProducer(_options: CreateProducerOptions): Promise> { + readonly sentByTopic = new Map>(); + + async createProducer(options: CreateProducerOptions): Promise> { return { - send: async () => undefined, + send: async (message) => { + const sent = this.sentByTopic.get(options.topic) ?? []; + sent.push(message); + this.sentByTopic.set(options.topic, sent); + }, flush: async () => undefined, close: async () => undefined, }; @@ -48,10 +56,12 @@ const makeService = (persistPath?: string) => describe("ConfigService operations", () => { it("uses tagged errors for invalid mutations", async () => { const service = makeService(); + const putRequest: ConfigRequest = { operation: "put" }; + const deleteRequest: ConfigRequest = { operation: "delete" }; - const putError = await service.handlePut({ operation: "put" } as ConfigRequest) + const putError = await service.handlePut(putRequest) .catch((caught: unknown) => caught); - const deleteError = await service.handleDelete({ operation: "delete" } as ConfigRequest) + const deleteError = await service.handleDelete(deleteRequest) .catch((caught: unknown) => caught); expect(putError).toBeInstanceOf(ConfigServiceError); @@ -64,13 +74,14 @@ describe("ConfigService operations", () => { const dir = await mkdtemp(join(tmpdir(), "trustgraph-config-service-")); const persistPath = join(dir, "config.json"); const service = makeService(persistPath); - - await service.handlePut({ + const putRequest: ConfigRequest = { operation: "put", values: [ { workspace: "alpha", type: "prompt", key: "system", value: "hello" }, ], - } as ConfigRequest); + }; + + await service.handlePut(putRequest); const persisted = await Bun.file(persistPath).json(); await rm(dir, { recursive: true, force: true }); @@ -97,10 +108,11 @@ describe("ConfigService operations", () => { const service = makeService(persistPath); await service.loadFromDisk(); - const response = service.handleGet({ + const getRequest: ConfigRequest = { operation: "get", keys: ["prompt", "system"], - } as ConfigRequest); + }; + const response = service.handleGet(getRequest); await rm(dir, { recursive: true, force: true }); expect(response).toEqual({ @@ -110,4 +122,58 @@ describe("ConfigService operations", () => { }, }); }); + + it("serializes concurrent mutations through ref-backed state", async () => { + const service = makeService(); + const requests: Array = [ + { operation: "put", values: [{ type: "prompt", key: "a", value: "one" }] }, + { operation: "put", values: [{ type: "prompt", key: "b", value: "two" }] }, + { operation: "put", values: [{ workspace: "beta", type: "prompt", key: "c", value: "three" }] }, + ]; + + await Promise.all(requests.map((request) => service.handlePut(request))); + + expect(service.handleGet({ operation: "get", keys: ["prompt"] })).toEqual({ + version: 3, + values: { + a: "one", + b: "two", + }, + }); + expect(service.handleGetValuesAllWorkspaces({ operation: "getvalues-all-ws", keys: ["prompt"] }).values).toEqual([ + { workspace: "default", type: "prompt", key: "a", value: "one" }, + { workspace: "default", type: "prompt", key: "b", value: "two" }, + { workspace: "beta", type: "prompt", key: "c", value: "three" }, + ]); + }); + + it("pushes config from the stored producer handle", async () => { + const backend = new NoopPubSub(); + const service = makeConfigService({ + id: "config-test", + manageProcessSignals: false, + pubsub: backend, + }); + const pushProducer = await backend.createProducer<{ + readonly version: number; + readonly config: Record; + }>({ topic: topics.configPush }); + + await Effect.runPromise( + SynchronizedRef.update(service.state, (state) => ({ + ...state, + pushProducer, + })), + ); + await service.pushConfig(); + await service.handlePut({ + operation: "put", + values: [{ type: "prompt", key: "system", value: "hello" }], + }); + + expect(backend.sentByTopic.get(topics.configPush)).toEqual([ + { version: 0, config: {} }, + { version: 1, config: { prompt: { system: "hello" } } }, + ]); + }); }); diff --git a/ts/packages/flow/src/config/service.ts b/ts/packages/flow/src/config/service.ts index 3c61ffd9..6e5c6f13 100644 --- a/ts/packages/flow/src/config/service.ts +++ b/ts/packages/flow/src/config/service.ts @@ -1,43 +1,40 @@ /** * Config service — manages system global configuration state. * - * An AsyncProcessor (NOT FlowProcessor) that: - * 1. Listens on config-request topic - * 2. Handles operations: get, put, delete, list, config (full dump) - * 3. Stores config in-memory with a nested Map structure - * 4. On any mutation: increments version, broadcasts ConfigPush on config-push topic - * 5. Optionally persists to a JSON file for restart durability - * * Python reference: trustgraph-flow/trustgraph/config/service/service.py */ -import { Context, Duration, Effect } from "effect"; +import {NodeRuntime} from "@effect/platform-node"; +import {Duration, Effect, Layer, ManagedRuntime, SynchronizedRef} from "effect"; +import * as Predicate from "effect/Predicate"; import * as S from "effect/Schema"; import { - makeAsyncProcessor, - type ProcessorConfig, - type AsyncProcessorRuntime, - topics, ConfigRequest as ConfigRequestSchema, ConfigResponse as ConfigResponseSchema, - type ConfigRequest, - type ConfigResponse, - type ConfigOperation, errorMessage, loadProcessorRuntimeConfig, + makeAsyncProcessor, makeProcessorProgram, optionalStringConfig, + topics, + type AsyncProcessorRuntime, + type BackendConsumer, + type BackendProducer, + type ConfigOperation, + type ConfigRequest, + type ConfigResponse, + type Message, + type ProcessorConfig, } from "@trustgraph/base"; -import type { Message } from "@trustgraph/base"; -import { readTextFile, writeTextFile } from "../runtime/effect-files.js"; +import {readTextFile, writeTextFile} from "../runtime/effect-files.js"; export interface ConfigServiceConfig extends ProcessorConfig { - persistPath?: string; + readonly persistPath?: string; } interface ConfigPush { - version: number; - config: Record; + readonly version: number; + readonly config: Record; } const ConfigPushSchema = S.Struct({ @@ -62,19 +59,28 @@ const configServiceError = (operation: string, cause: unknown): ConfigServiceErr const DEFAULT_WORKSPACE = "default"; interface ConfigKeyLike { - type: string; - key?: string; + readonly type: string; + readonly key?: string; } interface ConfigValueLike { - workspace?: string; - type: string; - key: string; - value: unknown; + readonly workspace?: string; + readonly type: string; + readonly key: string; + readonly value: unknown; } type NamespaceStore = Map; type WorkspaceStore = Map; +type WorkspaceSnapshot = Record>>; + +interface ConfigServiceState { + readonly version: number; + readonly store: Map; + readonly consumer: BackendConsumer | null; + readonly responseProducer: BackendProducer | null; + readonly pushProducer: BackendProducer | null; +} const PersistedConfigSchema = S.Struct({ version: S.optionalKey(S.Number), @@ -84,25 +90,73 @@ const PersistedConfigSchema = S.Struct({ const PersistedConfigJsonSchema = PersistedConfigSchema.pipe(S.fromJsonString); type PersistedConfig = typeof PersistedConfigSchema.Type; -function isRecord(value: unknown): value is Record { - return typeof value === "object" && value !== null && !Array.isArray(value); +export interface ConfigService extends AsyncProcessorRuntime { + readonly state: SynchronizedRef.SynchronizedRef; + readonly persistPath: string | null; + readonly handleMessage: (msg: Message) => Promise; + readonly handleMessageEffect: (msg: Message) => Effect.Effect; + readonly handleOperation: (request: ConfigRequest) => Promise; + readonly handleOperationEffect: (request: ConfigRequest) => Effect.Effect; + readonly handleGet: (request: ConfigRequest) => ConfigResponse; + readonly handlePut: (request: ConfigRequest) => Promise; + readonly handlePutEffect: (request: ConfigRequest) => Effect.Effect; + readonly handleDelete: (request: ConfigRequest) => Promise; + readonly handleDeleteEffect: (request: ConfigRequest) => Effect.Effect; + readonly handleList: (request: ConfigRequest) => ConfigResponse; + readonly handleGetValues: (request: ConfigRequest) => ConfigResponse; + readonly handleGetValuesAllWorkspaces: (request: ConfigRequest) => ConfigResponse; + readonly handleConfigDump: (request: ConfigRequest) => ConfigResponse; + readonly pushConfig: () => Promise; + readonly pushConfigEffect: Effect.Effect; + readonly persist: () => Promise; + readonly persistEffect: Effect.Effect; + readonly loadFromDisk: () => Promise; + readonly loadFromDiskEffect: Effect.Effect; } -function optionalString(value: unknown): string | undefined { - return typeof value === "string" && value.length > 0 ? value : undefined; -} +const initialState = (): ConfigServiceState => ({ + version: 0, + store: new Map(), + consumer: null, + responseProducer: null, + pushProducer: null, +}); -function toPersistedWorkspaces( +const cloneNamespaceStore = (source: NamespaceStore): NamespaceStore => { + const next = new Map(); + for (const [key, value] of source) { + next.set(key, value); + } + return next; +}; + +const cloneWorkspaceStore = (source: WorkspaceStore): WorkspaceStore => { + const next = new Map(); + for (const [namespace, subMap] of source) { + next.set(namespace, cloneNamespaceStore(subMap)); + } + return next; +}; + +const cloneConfigStore = (source: Map): Map => { + const next = new Map(); + for (const [workspace, ws] of source) { + next.set(workspace, cloneWorkspaceStore(ws)); + } + return next; +}; + +const toPersistedWorkspaces = ( store: Map, -): Record>> { - const workspaces: Record>> = {}; +): WorkspaceSnapshot => { + const workspaces: WorkspaceSnapshot = {}; for (const [workspace, ws] of store) { const workspaceData: Record> = {}; for (const [namespace, subMap] of ws) { const obj: Record = {}; - for (const [k, v] of subMap) { - obj[k] = v; + for (const [key, value] of subMap) { + obj[key] = value; } workspaceData[namespace] = obj; } @@ -110,729 +164,723 @@ function toPersistedWorkspaces( } return workspaces; -} +}; -function hydratePersistedConfig( - store: Map, - parsed: PersistedConfig, -): void { - store.clear(); +const storeFromPersistedConfig = (parsed: PersistedConfig): Map => { + const store = new Map(); if (parsed.workspaces !== undefined) { for (const [workspace, namespaces] of Object.entries(parsed.workspaces)) { const ws = new Map(); for (const [namespace, obj] of Object.entries(namespaces)) { const subMap = new Map(); - for (const [k, v] of Object.entries(obj)) { - subMap.set(k, v); + for (const [key, value] of Object.entries(obj)) { + subMap.set(key, value); } ws.set(namespace, subMap); } store.set(workspace, ws); } - return; + return store; } const ws = new Map(); for (const [namespace, obj] of Object.entries(parsed.data ?? {})) { const subMap = new Map(); - for (const [k, v] of Object.entries(obj)) { - subMap.set(k, v); + for (const [key, value] of Object.entries(obj)) { + subMap.set(key, value); } ws.set(namespace, subMap); } store.set(DEFAULT_WORKSPACE, ws); -} + return store; +}; -export type ConfigService = AsyncProcessorRuntime & Record; +const optionalString = (value: unknown): string | undefined => + Predicate.isString(value) && value.length > 0 ? value : undefined; + +const requestProperty = (request: ConfigRequest, property: string): unknown => + Predicate.hasProperty(request, property) ? request[property] : undefined; + +const rawKeys = (request: ConfigRequest): ReadonlyArray => { + const keys = requestProperty(request, "keys"); + return Array.isArray(keys) ? keys : []; +}; + +const stringKeys = (request: ConfigRequest): Array => + rawKeys(request).filter(Predicate.isString); + +const objectKeys = (request: ConfigRequest): Array => + rawKeys(request).flatMap((key) => { + if (!Predicate.isObject(key)) return []; + const type = optionalString(key.type); + if (type === undefined) return []; + const keyValue = optionalString(key.key); + return [ + keyValue === undefined + ? {type} + : {type, key: keyValue}, + ]; + }); + +const workspaceFor = (request: ConfigRequest): string => + optionalString(requestProperty(request, "workspace")) ?? DEFAULT_WORKSPACE; + +const requestType = (request: ConfigRequest): string | undefined => + optionalString(requestProperty(request, "type")) ?? stringKeys(request)[0]; + +const configValues = (request: ConfigRequest): Array => { + const rawValues = requestProperty(request, "values"); + const workspace = workspaceFor(request); + + if (Array.isArray(rawValues)) { + return rawValues.flatMap((value) => { + if (!Predicate.isObject(value)) return []; + const type = optionalString(value.type); + const key = optionalString(value.key); + if (type === undefined || key === undefined) return []; + return [{ + workspace: optionalString(value.workspace) ?? workspace, + type, + key, + value: value.value, + }]; + }); + } + + if (Predicate.isObject(rawValues)) { + const namespace = requestType(request); + if (namespace === undefined) return []; + return Object.entries(rawValues).map(([key, value]) => ({ + workspace, + type: namespace, + key, + value, + })); + } + + return []; +}; + +const getWorkspaceStore = ( + state: ConfigServiceState, + workspace: string, +): WorkspaceStore | undefined => + state.store.get(workspace); + +const getNamespaceStore = ( + state: ConfigServiceState, + workspace: string, + namespace: string, +): NamespaceStore | undefined => + getWorkspaceStore(state, workspace)?.get(namespace); + +const getOrCreateWorkspaceStore = ( + store: Map, + workspace: string, +): WorkspaceStore => { + const existing = store.get(workspace); + if (existing !== undefined) return existing; + const created = new Map(); + store.set(workspace, created); + return created; +}; + +const getOrCreateNamespaceStore = ( + store: Map, + workspace: string, + namespace: string, +): NamespaceStore => { + const ws = getOrCreateWorkspaceStore(store, workspace); + const existing = ws.get(namespace); + if (existing !== undefined) return existing; + const created = new Map(); + ws.set(namespace, created); + return created; +}; + +const configDumpForState = ( + state: ConfigServiceState, + workspace: string = DEFAULT_WORKSPACE, +): Record => { + const config: Record = {}; + const ws = getWorkspaceStore(state, workspace); + + if (ws === undefined) return config; + + for (const [namespace, subMap] of ws) { + const obj: Record = {}; + for (const [key, value] of subMap) { + obj[key] = value; + } + config[namespace] = obj; + } + + return config; +}; + +const stateSnapshot = (stateRef: SynchronizedRef.SynchronizedRef): ConfigServiceState => + SynchronizedRef.getUnsafe(stateRef); + +const updateHandles = ( + stateRef: SynchronizedRef.SynchronizedRef, + handles: { + readonly consumer?: BackendConsumer | null; + readonly responseProducer?: BackendProducer | null; + readonly pushProducer?: BackendProducer | null; + }, +) => + SynchronizedRef.updateAndGet(stateRef, (state) => ({ + ...state, + consumer: handles.consumer === undefined ? state.consumer : handles.consumer, + responseProducer: handles.responseProducer === undefined ? state.responseProducer : handles.responseProducer, + pushProducer: handles.pushProducer === undefined ? state.pushProducer : handles.pushProducer, + })); + +const persistStateEffect = ( + persistPath: string | null, + state: ConfigServiceState, +): Effect.Effect => + Effect.gen(function* () { + if (persistPath === null) return; + const payload = { + version: state.version, + workspaces: toPersistedWorkspaces(state.store), + }; + + const json = yield* S.encodeUnknownEffect(S.UnknownFromJsonString)(payload).pipe( + Effect.mapError((cause) => configServiceError("persist-encode", cause)), + ); + + yield* Effect.tryPromise({ + try: () => writeTextFile(persistPath, json), + catch: (cause) => configServiceError("persist-write", cause), + }); + }).pipe( + Effect.catch((err) => + Effect.logError("[ConfigService] Failed to persist config", {error: err.message}), + ), + ); + +const pushConfigWithStateEffect = ( + state: ConfigServiceState, +): Effect.Effect => + Effect.gen(function* () { + const pushProducer = state.pushProducer; + if (pushProducer === null) return; + + yield* Effect.tryPromise({ + try: () => + pushProducer.send({ + version: state.version, + config: configDumpForState(state), + }), + catch: (cause) => configServiceError("push-config", cause), + }); + + yield* Effect.log(`[ConfigService] Pushed configuration version ${state.version}`); + }); + +const readPersistedConfigEffect = ( + persistPath: string, +): Effect.Effect => + Effect.gen(function* () { + const raw = yield* Effect.tryPromise({ + try: () => readTextFile(persistPath), + catch: (cause) => configServiceError("persist-read", cause), + }); + return yield* S.decodeUnknownEffect(PersistedConfigJsonSchema)(raw).pipe( + Effect.mapError((cause) => configServiceError("persist-decode", cause)), + ); + }).pipe( + Effect.catch(() => + Effect.log("[ConfigService] No persisted config found, starting fresh").pipe( + Effect.flatMap(() => Effect.succeed(null)), + ) + ), + ); + +const handleGetWithState = ( + state: ConfigServiceState, + request: ConfigRequest, +): ConfigResponse => { + const workspace = workspaceFor(request); + const keysByObject = objectKeys(request); + + if (keysByObject.length > 0) { + const values = keysByObject.map((key) => ({ + type: key.type, + key: key.key ?? "", + value: key.key !== undefined + ? getNamespaceStore(state, workspace, key.type)?.get(key.key) + : undefined, + })); + return {version: state.version, values}; + } + + const keys = stringKeys(request); + if (keys.length === 0) { + return {version: state.version, values: {}}; + } + + const values: Record = {}; + const namespace = keys[0]; + const subMap = getNamespaceStore(state, workspace, namespace); + + if (subMap !== undefined) { + if (keys.length === 1) { + for (const [key, value] of subMap) { + values[key] = value; + } + } else { + for (const key of keys.slice(1)) { + if (subMap.has(key)) { + values[key] = subMap.get(key); + } + } + } + } + + return {version: state.version, values}; +}; + +const applyPut = ( + state: ConfigServiceState, + values: ReadonlyArray, +): ConfigServiceState => { + const store = cloneConfigStore(state.store); + + for (const item of values) { + getOrCreateNamespaceStore(store, item.workspace ?? DEFAULT_WORKSPACE, item.type) + .set(item.key, item.value); + } + + return { + ...state, + store, + version: state.version + 1, + }; +}; + +const applyDeleteObjectKeys = ( + state: ConfigServiceState, + workspace: string, + keys: ReadonlyArray, +): ConfigServiceState => { + const store = cloneConfigStore(state.store); + const ws = store.get(workspace); + + if (ws !== undefined) { + for (const key of keys) { + if (key.key === undefined) { + ws.delete(key.type); + } else { + const ns = ws.get(key.type); + ns?.delete(key.key); + if (ns !== undefined && ns.size === 0) { + ws.delete(key.type); + } + } + } + } + + return { + ...state, + store, + version: state.version + 1, + }; +}; + +const applyDeleteStringKeys = ( + state: ConfigServiceState, + workspace: string, + keys: ReadonlyArray, +): ConfigServiceState => { + const store = cloneConfigStore(state.store); + const namespace = keys[0]; + const ws = store.get(workspace); + + if (ws === undefined) return state; + + if (keys.length === 1) { + ws.delete(namespace); + } else { + const subMap = ws.get(namespace); + if (subMap !== undefined) { + for (const key of keys.slice(1)) { + subMap.delete(key); + } + if (subMap.size === 0) { + ws.delete(namespace); + } + } + } + + return { + ...state, + store, + version: state.version + 1, + }; +}; + +const handlePutEffect = ( + stateRef: SynchronizedRef.SynchronizedRef, + persistPath: string | null, + request: ConfigRequest, +): Effect.Effect => + Effect.gen(function* () { + const values = configValues(request); + if (values.length === 0) return yield* configServiceError("put", "Put requires config values"); + + const next = yield* SynchronizedRef.updateAndGet(stateRef, (state) => applyPut(state, values)); + yield* persistStateEffect(persistPath, next); + yield* pushConfigWithStateEffect(next); + + return {version: next.version}; + }); + +const handleDeleteEffect = ( + stateRef: SynchronizedRef.SynchronizedRef, + persistPath: string | null, + request: ConfigRequest, +): Effect.Effect => + Effect.gen(function* () { + const workspace = workspaceFor(request); + const keysByObject = objectKeys(request); + + if (keysByObject.length > 0) { + const next = yield* SynchronizedRef.updateAndGet( + stateRef, + (state) => applyDeleteObjectKeys(state, workspace, keysByObject), + ); + yield* persistStateEffect(persistPath, next); + yield* pushConfigWithStateEffect(next); + return {version: next.version}; + } + + const keys = stringKeys(request); + if (keys.length === 0) { + return yield* configServiceError("delete", "Delete requires at least one key"); + } + + const previous = yield* SynchronizedRef.get(stateRef); + if (getWorkspaceStore(previous, workspace) === undefined) { + return {version: previous.version}; + } + + const next = yield* SynchronizedRef.updateAndGet( + stateRef, + (state) => applyDeleteStringKeys(state, workspace, keys), + ); + yield* persistStateEffect(persistPath, next); + yield* pushConfigWithStateEffect(next); + return {version: next.version}; + }); + +const handleListWithState = ( + state: ConfigServiceState, + request: ConfigRequest, +): ConfigResponse => { + const workspace = workspaceFor(request); + const ws = getWorkspaceStore(state, workspace); + const namespace = requestType(request); + + if (namespace === undefined) { + return { + version: state.version, + directory: ws !== undefined ? [...ws.keys()] : [], + }; + } + + const subMap = ws?.get(namespace); + return { + version: state.version, + directory: subMap !== undefined ? [...subMap.keys()] : [], + }; +}; + +const handleGetValuesWithState = ( + state: ConfigServiceState, + request: ConfigRequest, +): ConfigResponse => { + const workspace = workspaceFor(request); + const type = requestType(request) ?? ""; + const ws = getWorkspaceStore(state, workspace); + const values: Array<{type: string; key: string; value: unknown}> = []; + + if (ws !== undefined) { + for (const [namespace, subMap] of ws) { + if (type.length > 0 && namespace !== type) continue; + for (const [key, value] of subMap) { + values.push({type: namespace, key, value}); + } + } + } + + return {version: state.version, values}; +}; + +const handleGetValuesAllWorkspacesWithState = ( + state: ConfigServiceState, + request: ConfigRequest, +): ConfigResponse => { + const type = requestType(request) ?? ""; + const values: Array<{workspace: string; type: string; key: string; value: unknown}> = []; + + for (const [workspace, ws] of state.store) { + for (const [namespace, subMap] of ws) { + if (type.length > 0 && namespace !== type) continue; + for (const [key, value] of subMap) { + values.push({workspace, type: namespace, key, value}); + } + } + } + + return {version: state.version, values}; +}; + +const handleConfigDumpWithState = ( + state: ConfigServiceState, + request: ConfigRequest, +): ConfigResponse => ({ + version: state.version, + config: configDumpForState(state, workspaceFor(request)), +}); + +const closeConfigResourcesEffect = ( + stateRef: SynchronizedRef.SynchronizedRef, +): Effect.Effect => + Effect.gen(function* () { + const state = yield* SynchronizedRef.get(stateRef); + + const consumer = state.consumer; + if (consumer !== null) { + yield* Effect.tryPromise({ + try: () => consumer.close(), + catch: (cause) => configServiceError("close-consumer", cause), + }); + } + const responseProducer = state.responseProducer; + if (responseProducer !== null) { + yield* Effect.tryPromise({ + try: () => responseProducer.close(), + catch: (cause) => configServiceError("close-response-producer", cause), + }); + } + const pushProducer = state.pushProducer; + if (pushProducer !== null) { + yield* Effect.tryPromise({ + try: () => pushProducer.close(), + catch: (cause) => configServiceError("close-push-producer", cause), + }); + } + + yield* updateHandles(stateRef, { + consumer: null, + responseProducer: null, + pushProducer: null, + }); + }); + +const consumeOnceEffect = ( + service: ConfigService, +): Effect.Effect => + Effect.gen(function* () { + const state = yield* SynchronizedRef.get(service.state); + const consumer = state.consumer; + if (consumer === null) { + return yield* configServiceError("consume", "Config consumer not started"); + } + + const msg = yield* Effect.tryPromise({ + try: () => consumer.receive(2000), + catch: (cause) => configServiceError("consume-receive", cause), + }); + if (msg === null) return; + + yield* service.handleMessageEffect(msg); + yield* Effect.tryPromise({ + try: () => consumer.acknowledge(msg), + catch: (cause) => configServiceError("consume-acknowledge", cause), + }); + }); + +const runConfigServiceEffect = ( + service: ConfigService, +): Effect.Effect => + Effect.gen(function* () { + yield* service.loadFromDiskEffect; + + const responseProducer = yield* Effect.tryPromise({ + try: () => + service.pubsub.createProducer({ + topic: topics.configResponse, + schema: ConfigResponseSchema, + }), + catch: (cause) => configServiceError("response-producer", cause), + }); + yield* updateHandles(service.state, {responseProducer}); + + const pushProducer = yield* Effect.tryPromise({ + try: () => + service.pubsub.createProducer({ + topic: topics.configPush, + schema: ConfigPushSchema, + }), + catch: (cause) => configServiceError("push-producer", cause), + }); + yield* updateHandles(service.state, {pushProducer}); + + const consumer = yield* Effect.tryPromise({ + try: () => + service.pubsub.createConsumer({ + topic: topics.configRequest, + subscription: `${service.config.id}-config-request`, + schema: ConfigRequestSchema, + }), + catch: (cause) => configServiceError("consumer", cause), + }); + const state = yield* updateHandles(service.state, {consumer}); + + yield* pushConfigWithStateEffect(state); + yield* Effect.log(`[ConfigService] Listening on ${topics.configRequest}`); + + yield* Effect.whileLoop({ + while: () => service.running, + body: () => + consumeOnceEffect(service).pipe( + Effect.catch((err) => { + if (!service.running) return Effect.void; + return Effect.logError("[ConfigService] Error in consume loop", {error: err.message}).pipe( + Effect.flatMap(() => Effect.sleep(Duration.millis(1000))), + ); + }), + ), + step: () => undefined, + }); + }); export function makeConfigService(config: ConfigServiceConfig): ConfigService { - const service = makeAsyncProcessor(config, { - run: () => service.run(Context.empty()), - }) as ConfigService; - const baseStop = service.stop; - service.store = new Map(); - service.version = 0; - service.consumer = null; - service.responseProducer = null; - service.pushProducer = null; - service.persistPath = config.persistPath ?? null; - Object.assign(service, { + const state = SynchronizedRef.makeUnsafe(initialState()); + let service: ConfigService | undefined; + const getService = Effect.sync(() => service).pipe( + Effect.flatMap((current) => + current === undefined + ? Effect.fail(configServiceError("service", "Config service not initialized")) + : Effect.succeed(current) + ), + ); - run: function(this: ConfigService): Promise { - const service = this; - return Effect.runPromise( - Effect.gen(function* () { - // Optionally load persisted state - if (service.persistPath !== null) { - yield* Effect.tryPromise({ - try: () => service.loadFromDisk(), - catch: (cause) => configServiceError("load", cause), - }); - } + const base = makeAsyncProcessor(config, { + runEffect: () => getService.pipe(Effect.flatMap(runConfigServiceEffect)), + }); + const baseStop = base.stop; + const persistPath = config.persistPath ?? null; - // Create producers - service.responseProducer = yield* Effect.tryPromise({ - try: () => - service.pubsub.createProducer({ - topic: topics.configResponse, - schema: ConfigResponseSchema, - }), - catch: (cause) => configServiceError("response-producer", cause), - }); - service.pushProducer = yield* Effect.tryPromise({ - try: () => - service.pubsub.createProducer({ - topic: topics.configPush, - schema: ConfigPushSchema, - }), - catch: (cause) => configServiceError("push-producer", cause), - }); + const handleOperationEffect = (request: ConfigRequest) => { + const op: ConfigOperation = request.operation; - // Create consumer for config requests - service.consumer = yield* Effect.tryPromise({ - try: () => - service.pubsub.createConsumer({ - topic: topics.configRequest, - subscription: `${service.config.id}-config-request`, - schema: ConfigRequestSchema, - }), - catch: (cause) => configServiceError("consumer", cause), - }); + switch (op) { + case "get": + return Effect.succeed(handleGetWithState(stateSnapshot(state), request)); + case "put": + return handlePutEffect(state, persistPath, request); + case "delete": + return handleDeleteEffect(state, persistPath, request); + case "list": + return Effect.succeed(handleListWithState(stateSnapshot(state), request)); + case "config": + return Effect.succeed(handleConfigDumpWithState(stateSnapshot(state), request)); + case "getvalues": + return Effect.succeed(handleGetValuesWithState(stateSnapshot(state), request)); + case "getvalues-all-ws": + return Effect.succeed(handleGetValuesAllWorkspacesWithState(stateSnapshot(state), request)); + } + }; - // Push initial config - yield* Effect.tryPromise({ - try: () => service.pushConfig(), - catch: (cause) => configServiceError("push-initial-config", cause), - }); + const handleMessageEffect = Effect.fn("handleMessageEffect")(function* (msg: Message) { + const request = yield* S.decodeUnknownEffect(ConfigRequestSchema)(msg.value()).pipe( + Effect.mapError((cause) => configServiceError("decode", cause)), + ); + const requestId = msg.properties().id; - yield* Effect.log(`[ConfigService] Listening on ${topics.configRequest}`); + if (requestId === undefined || requestId.length === 0) { + yield* Effect.logWarning("[ConfigService] Received request without id, ignoring"); + return; + } - // Main consume loop - while (service.running) { - const shouldContinue = yield* Effect.gen(function* () { - const consumer = service.consumer; - if (consumer === null) { - return yield* configServiceError("consume", "Config consumer not started"); - } - - const msg = yield* Effect.tryPromise({ - try: () => consumer.receive(2000), - catch: (cause) => configServiceError("consume-receive", cause), - }); - if (msg === null) return true; - - yield* Effect.tryPromise({ - try: () => service.handleMessage(msg), - catch: (cause) => configServiceError("consume-handle", cause), - }); - yield* Effect.tryPromise({ - try: () => consumer.acknowledge(msg), - catch: (cause) => configServiceError("consume-acknowledge", cause), - }); - - return true; - }).pipe( - Effect.catch((err) => { - if (!service.running) return Effect.succeed(false); - return Effect.logError("[ConfigService] Error in consume loop", { error: err.message }).pipe( - Effect.flatMap(() => Effect.sleep(Duration.millis(1000))), - Effect.as(true), - ); - }), - ); - if (!shouldContinue) break; - } - }), - ); - - }, - - - - handleMessage: function(this: ConfigService, msg: Message): Promise { - const service = this; - return Effect.runPromise( - Effect.gen(function* () { - const request = yield* S.decodeUnknownEffect(ConfigRequestSchema)(msg.value()).pipe( - Effect.mapError((cause) => configServiceError("decode", cause)), - ); - const props = msg.properties(); - const requestId = props.id; - - if (requestId === undefined || requestId.length === 0) { - yield* Effect.logWarning("[ConfigService] Received request without id, ignoring"); - return; - } - - const sendResponse = (response: ConfigResponse): Effect.Effect => - Effect.gen(function* () { - const responseProducer = service.responseProducer; - if (responseProducer === null) { - return yield* configServiceError("respond", "Config response producer not started"); - } - yield* Effect.tryPromise({ - try: () => responseProducer.send(response, { id: requestId }), - catch: (cause) => configServiceError("respond", cause), - }); - }); - - yield* Effect.gen(function* () { - const response = yield* Effect.tryPromise({ - try: () => service.handleOperation(request), - catch: (cause) => configServiceError("operation", cause), - }); - yield* sendResponse(response); - }).pipe( - Effect.catch((err) => - sendResponse({ - error: { type: "config-error", message: err.message }, - }), - ), - ); - }), - ); - - }, - - - - handleOperation: function(this: ConfigService, request: ConfigRequest): Promise { - const service = this; - return Effect.runPromise( - Effect.gen(function* () { - const op: ConfigOperation = request.operation; - - switch (op) { - case "get": - return service.handleGet(request); - - case "put": - return yield* Effect.tryPromise({ - try: () => service.handlePut(request), - catch: (cause) => configServiceError("put", cause), - }); - - case "delete": - return yield* Effect.tryPromise({ - try: () => service.handleDelete(request), - catch: (cause) => configServiceError("delete", cause), - }); - - case "list": - return service.handleList(request); - - case "config": - return service.handleConfigDump(request); - - case "getvalues": - return service.handleGetValues(request); - - case "getvalues-all-ws": - return service.handleGetValuesAllWorkspaces(request); - - default: - return yield* configServiceError("operation", `Unknown config operation: ${op as string}`); - } - }), - ); - - }, - - - - requestRecord: function(this: ConfigService, request: ConfigRequest): Record { - return request as Record; - - }, - - - - workspaceFor: function(this: ConfigService, request: ConfigRequest): string { - return optionalString(this.requestRecord(request).workspace) ?? DEFAULT_WORKSPACE; - - }, - - - - workspaceStore: function(this: ConfigService, workspace: string, create: boolean): WorkspaceStore | undefined { - let store = this.store.get(workspace); - if (store === undefined && create) { - store = new Map(); - this.store.set(workspace, store); - } - return store; - - }, - - - - namespaceStore: function(this: ConfigService, workspace: string, namespace: string, create: boolean): NamespaceStore | undefined { - const ws = this.workspaceStore(workspace, create); - if (ws === undefined) return undefined; - - let ns = ws.get(namespace); - if (ns === undefined && create) { - ns = new Map(); - ws.set(namespace, ns); - } - return ns; - - }, - - - - rawKeys: function(this: ConfigService, request: ConfigRequest): unknown[] { - const keys = this.requestRecord(request).keys; - return Array.isArray(keys) ? keys : []; - - }, - - - - stringKeys: function(this: ConfigService, request: ConfigRequest): string[] { - return (this.rawKeys(request) as unknown[]).filter((key: unknown): key is string => typeof key === "string"); - - }, - - - - objectKeys: function(this: ConfigService, request: ConfigRequest): ConfigKeyLike[] { - return (this.rawKeys(request) as unknown[]).flatMap((key: unknown) => { - if (!isRecord(key)) return []; - const type = optionalString(key.type); - if (type === undefined) return []; - const item: ConfigKeyLike = { type }; - const keyValue = optionalString(key.key); - if (keyValue !== undefined) item.key = keyValue; - return [item]; + const sendResponse = (response: ConfigResponse): Effect.Effect => + Effect.gen(function* () { + const responseProducer = (yield* SynchronizedRef.get(state)).responseProducer; + if (responseProducer === null) { + return yield* configServiceError("respond", "Config response producer not started"); + } + yield* Effect.tryPromise({ + try: () => responseProducer.send(response, {id: requestId}), + catch: (cause) => configServiceError("respond", cause), + }); }); - }, - - - - requestType: function(this: ConfigService, request: ConfigRequest): string | undefined { - return optionalString(this.requestRecord(request).type) ?? this.stringKeys(request)[0]; - - }, - - - - configValues: function(this: ConfigService, request: ConfigRequest): ConfigValueLike[] { - const req = this.requestRecord(request); - const rawValues = req.values; - const workspace = this.workspaceFor(request); - - if (Array.isArray(rawValues)) { - return rawValues.flatMap((value) => { - if (!isRecord(value)) return []; - const type = optionalString(value.type); - const key = optionalString(value.key); - if (type === undefined || key === undefined) return []; - return [{ - workspace: optionalString(value.workspace) ?? workspace, - type, - key, - value: value.value, - }]; - }); - } - - if (isRecord(rawValues)) { - const namespace = this.requestType(request); - if (namespace === undefined) return []; - return Object.entries(rawValues).map(([key, value]) => ({ - workspace, - type: namespace, - key, - value, - })); - } - - return []; - - }, - - - - handleGet: function(this: ConfigService, request: ConfigRequest): ConfigResponse { - const workspace = this.workspaceFor(request); - const objectKeys = this.objectKeys(request) as ConfigKeyLike[]; - - if (objectKeys.length > 0) { - const values = objectKeys.map((key) => ({ - type: key.type, - key: key.key ?? "", - value: key.key !== undefined - ? this.namespaceStore(workspace, key.type, false)?.get(key.key) - : undefined, - })); - return { version: this.version, values }; - } - - const keys = this.stringKeys(request) as string[]; - if (keys.length === 0) { - return { version: this.version, values: {} }; - } - - const values: Record = {}; - const namespace = keys[0]; - const subMap = this.namespaceStore(workspace, namespace, false) as NamespaceStore | undefined; - - if (subMap !== undefined) { - if (keys.length === 1) { - // Return entire namespace - for (const [k, v] of subMap) { - values[k] = v; - } - } else { - // Return specific keys within namespace - for (let i = 1; i < keys.length; i++) { - const key = keys[i]; - if (key !== undefined && subMap.has(key)) { - values[key] = subMap.get(key); - } - } - } - } - - return { version: this.version, values }; - - }, - - - - handlePut: function(this: ConfigService, request: ConfigRequest): Promise { - const service = this; - return Effect.runPromise( - Effect.gen(function* () { - const values = service.configValues(request); - if (values.length === 0) return yield* configServiceError("put", "Put requires config values"); - - for (const item of values) { - service.namespaceStore(item.workspace ?? DEFAULT_WORKSPACE, item.type, true)?.set(item.key, item.value); - } - - service.version++; - yield* Effect.tryPromise({ - try: () => service.persist(), - catch: (cause) => configServiceError("persist", cause), - }); - yield* Effect.tryPromise({ - try: () => service.pushConfig(), - catch: (cause) => configServiceError("push-config", cause), - }); - - return { version: service.version }; - }), - ); - - }, - - - - handleDelete: function(this: ConfigService, request: ConfigRequest): Promise { - const service = this; - return Effect.runPromise( - Effect.gen(function* () { - const workspace = service.workspaceFor(request); - const objectKeys = service.objectKeys(request); - if (objectKeys.length > 0) { - for (const key of objectKeys) { - const ws = service.workspaceStore(workspace, false); - if (ws === undefined) continue; - if (key.key === undefined) { - ws.delete(key.type); - } else { - const ns = ws.get(key.type); - ns?.delete(key.key); - if (ns !== undefined && ns.size === 0) ws.delete(key.type); - } - } - - service.version++; - yield* Effect.tryPromise({ - try: () => service.persist(), - catch: (cause) => configServiceError("persist", cause), - }); - yield* Effect.tryPromise({ - try: () => service.pushConfig(), - catch: (cause) => configServiceError("push-config", cause), - }); - return { version: service.version }; - } - - const keys = service.stringKeys(request); - if (keys.length === 0) { - return yield* configServiceError("delete", "Delete requires at least one key"); - } - - const namespace = keys[0]; - const ws = service.workspaceStore(workspace, false); - if (ws === undefined) return { version: service.version }; - - if (keys.length === 1) { - // Delete entire namespace - ws.delete(namespace); - } else { - // Delete specific keys within namespace - const subMap = ws.get(namespace); - if (subMap !== undefined) { - for (let i = 1; i < keys.length; i++) { - subMap.delete(keys[i]); - } - if (subMap.size === 0) { - ws.delete(namespace); - } - } - } - - service.version++; - yield* Effect.tryPromise({ - try: () => service.persist(), - catch: (cause) => configServiceError("persist", cause), - }); - yield* Effect.tryPromise({ - try: () => service.pushConfig(), - catch: (cause) => configServiceError("push-config", cause), - }); - - return { version: service.version }; - }), - ); - - }, - - - - handleList: function(this: ConfigService, request: ConfigRequest): ConfigResponse { - const workspace = this.workspaceFor(request); - const ws = this.workspaceStore(workspace, false); - const namespace = this.requestType(request); - - if (namespace === undefined) { - // List all namespaces - return { - version: this.version, - directory: ws !== undefined ? [...ws.keys()] : [], - }; - } - - const subMap = ws?.get(namespace); - - return { - version: this.version, - directory: subMap !== undefined ? [...subMap.keys()] : [], - }; - - }, - - - - handleGetValues: function(this: ConfigService, request: ConfigRequest): ConfigResponse { - const workspace = this.workspaceFor(request); - const type = this.requestType(request) ?? ""; - const ws = this.workspaceStore(workspace, false); - - const values: { type: string; key: string; value: unknown }[] = []; - - for (const [namespace, subMap] of ws ?? new Map()) { - if ( - type.length === 0 || - namespace === type - ) { - for (const [k, v] of subMap) { - values.push({ type: namespace, key: k, value: v }); - } - } - } - - return { version: this.version, values }; - - }, - - - - handleGetValuesAllWorkspaces: function(this: ConfigService, request: ConfigRequest): ConfigResponse { - const type = this.requestType(request) ?? ""; - const values: { workspace: string; type: string; key: string; value: unknown }[] = []; - - for (const [workspace, ws] of this.store) { - for (const [namespace, subMap] of ws) { - if (type.length > 0 && namespace !== type) continue; - for (const [key, value] of subMap) { - values.push({ workspace, type: namespace, key, value }); - } - } - } - - return { version: this.version, values }; - - }, - - - - handleConfigDump: function(this: ConfigService, request: ConfigRequest): ConfigResponse { - const workspace = this.workspaceFor(request); - const ws = this.workspaceStore(workspace, false); - const config: Record = {}; - - for (const [namespace, subMap] of ws ?? new Map()) { - const obj: Record = {}; - for (const [k, v] of subMap) { - obj[k] = v; - } - config[namespace] = obj; - } - - return { - version: this.version, - config, - }; - - }, - - - - pushConfig: function(this: ConfigService): Promise { - const service = this; - return Effect.runPromise( - Effect.gen(function* () { - const pushProducer = service.pushProducer; - if (pushProducer === null) return; - - const config: Record = {}; - const ws = service.workspaceStore(DEFAULT_WORKSPACE, false); - for (const [namespace, subMap] of ws ?? new Map()) { - const obj: Record = {}; - for (const [k, v] of subMap) { - obj[k] = v; - } - config[namespace] = obj; - } - - yield* Effect.tryPromise({ - try: () => - pushProducer.send({ - version: service.version, - config, - }), - catch: (cause) => configServiceError("push-config", cause), - }); - - yield* Effect.log(`[ConfigService] Pushed configuration version ${service.version}`); - }), - ); - - }, - - - - persist: function(this: ConfigService): Promise { - const service = this; - return Effect.runPromise( - Effect.gen(function* () { - const persistPath = service.persistPath; - if (persistPath === null) return; - const payload = { - version: service.version, - workspaces: toPersistedWorkspaces(service.store), - }; - - const json = yield* S.encodeUnknownEffect(S.UnknownFromJsonString)(payload).pipe( - Effect.mapError((cause) => configServiceError("persist-encode", cause)), - ); - - yield* Effect.tryPromise({ - try: () => writeTextFile(persistPath, json), - catch: (cause) => configServiceError("persist-write", cause), - }); - }).pipe( - Effect.catch((err) => - Effect.logError("[ConfigService] Failed to persist config", { error: err.message }), - ), - ), - ); - - }, - - - - loadFromDisk: function(this: ConfigService): Promise { - const service = this; - return Effect.runPromise( - Effect.gen(function* () { - const persistPath = service.persistPath; - if (persistPath === null) return; - - const parsed = yield* Effect.gen(function* () { - const raw = yield* Effect.tryPromise({ - try: () => readTextFile(persistPath), - catch: (cause) => configServiceError("persist-read", cause), - }); - return yield* S.decodeUnknownEffect(PersistedConfigJsonSchema)(raw).pipe( - Effect.mapError((cause) => configServiceError("persist-decode", cause)), - ); - }).pipe( - Effect.catch(() => - Effect.log("[ConfigService] No persisted config found, starting fresh").pipe( - Effect.as(null as PersistedConfig | null), - ), - ), - ); - - if (parsed === null) return; - - service.version = parsed.version ?? 0; - hydratePersistedConfig(service.store, parsed); - - yield* Effect.log(`[ConfigService] Loaded persisted config (version=${service.version}, workspaces=${service.store.size})`); - }), - ); - - }, - - - - stop: function(this: ConfigService): Promise { - const service = this; - return Effect.runPromise( - Effect.gen(function* () { - const consumer = service.consumer; - if (consumer !== null) { - yield* Effect.tryPromise({ - try: () => consumer.close(), - catch: (cause) => configServiceError("close-consumer", cause), - }); - service.consumer = null; - } - const responseProducer = service.responseProducer; - if (responseProducer !== null) { - yield* Effect.tryPromise({ - try: () => responseProducer.close(), - catch: (cause) => configServiceError("close-response-producer", cause), - }); - service.responseProducer = null; - } - const pushProducer = service.pushProducer; - if (pushProducer !== null) { - yield* Effect.tryPromise({ - try: () => pushProducer.close(), - catch: (cause) => configServiceError("close-push-producer", cause), - }); - service.pushProducer = null; - } - yield* Effect.tryPromise({ + yield* handleOperationEffect(request).pipe( + Effect.flatMap(sendResponse), + Effect.catch((err) => + sendResponse({ + error: {type: "config-error", message: err.message}, + }) + ), + ); + }); + + const loadFromDiskEffect = Effect.fn("loadFromDiskEffect")(function* () { + if (persistPath === null) return; + const parsed = yield* readPersistedConfigEffect(persistPath); + if (parsed === null) return; + + const next = yield* SynchronizedRef.updateAndGet(state, (current) => ({ + ...current, + version: parsed.version ?? 0, + store: storeFromPersistedConfig(parsed), + })); + + yield* Effect.log(`[ConfigService] Loaded persisted config (version=${next.version}, workspaces=${next.store.size})`); + }); + + service = Object.assign(base, { + state, + persistPath, + handleMessage: (msg: Message) => Effect.runPromise(handleMessageEffect(msg)), + handleMessageEffect, + handleOperation: (request: ConfigRequest) => Effect.runPromise(handleOperationEffect(request)), + handleOperationEffect, + handleGet: (request: ConfigRequest) => handleGetWithState(stateSnapshot(state), request), + handlePut: (request: ConfigRequest) => Effect.runPromise(handlePutEffect(state, persistPath, request)), + handlePutEffect: (request: ConfigRequest) => handlePutEffect(state, persistPath, request), + handleDelete: (request: ConfigRequest) => Effect.runPromise(handleDeleteEffect(state, persistPath, request)), + handleDeleteEffect: (request: ConfigRequest) => handleDeleteEffect(state, persistPath, request), + handleList: (request: ConfigRequest) => handleListWithState(stateSnapshot(state), request), + handleGetValues: (request: ConfigRequest) => handleGetValuesWithState(stateSnapshot(state), request), + handleGetValuesAllWorkspaces: (request: ConfigRequest) => handleGetValuesAllWorkspacesWithState(stateSnapshot(state), request), + handleConfigDump: (request: ConfigRequest) => handleConfigDumpWithState(stateSnapshot(state), request), + pushConfig: () => Effect.runPromise(SynchronizedRef.get(state).pipe(Effect.flatMap(pushConfigWithStateEffect))), + pushConfigEffect: SynchronizedRef.get(state).pipe(Effect.flatMap(pushConfigWithStateEffect)), + persist: () => Effect.runPromise(SynchronizedRef.get(state).pipe(Effect.flatMap((current) => persistStateEffect(persistPath, current)))), + persistEffect: SynchronizedRef.get(state).pipe(Effect.flatMap((current) => persistStateEffect(persistPath, current))), + loadFromDisk: () => Effect.runPromise(loadFromDiskEffect()), + loadFromDiskEffect: loadFromDiskEffect(), + stop: () => + Effect.runPromise( + closeConfigResourcesEffect(state).pipe( + Effect.flatMap(() => + Effect.tryPromise({ try: () => baseStop(), catch: (cause) => configServiceError("stop", cause), - }); - }), - ); - - } + }) + ), + ), + ), }); + return service; } @@ -845,7 +893,7 @@ export const loadConfigServiceRuntimeConfig = Effect.fn("loadConfigServiceRuntim const persistPath = yield* optionalStringConfig("CONFIG_PERSIST_PATH"); return { ...processorConfig, - ...(persistPath !== undefined ? { persistPath } : {}), + ...(persistPath !== undefined ? {persistPath} : {}), } satisfies ConfigServiceConfig; }); @@ -855,6 +903,12 @@ export const program = makeProcessorProgram({ make: (config) => makeConfigService(config), }); +const configServiceRuntime = ManagedRuntime.make(Layer.empty); + export function run(): Promise { - return Effect.runPromise(program); + return configServiceRuntime.runPromise(program); +} + +export function runMain(): void { + NodeRuntime.runMain(program); } diff --git a/ts/scripts/run-config.ts b/ts/scripts/run-config.ts index 73b1ca09..73b2881b 100644 --- a/ts/scripts/run-config.ts +++ b/ts/scripts/run-config.ts @@ -7,9 +7,6 @@ * NATS_URL (default: nats://localhost:4222) * CONFIG_PERSIST_PATH (optional, e.g., ./data/config.json) */ -import { run } from "../packages/flow/src/config/service.js"; +import {runMain} from "../packages/flow/src/config/service.js"; -run().catch((err) => { - console.error("Config service failed:", err); - process.exit(1); -}); +runMain();