From 6ba4cf3b326eadeab1e314a8182fd5ea479ce2b7 Mon Sep 17 00:00:00 2001 From: elpresidank Date: Thu, 4 Jun 2026 06:31:28 -0500 Subject: [PATCH] Use Effect fn for config service helpers --- ts/EFFECT_NATIVE_REWRITE_AUDIT.md | 27 +- ts/packages/flow/src/config/service.ts | 343 ++++++++++++------------- 2 files changed, 193 insertions(+), 177 deletions(-) diff --git a/ts/EFFECT_NATIVE_REWRITE_AUDIT.md b/ts/EFFECT_NATIVE_REWRITE_AUDIT.md index bb18e659..cbb6ebb8 100644 --- a/ts/EFFECT_NATIVE_REWRITE_AUDIT.md +++ b/ts/EFFECT_NATIVE_REWRITE_AUDIT.md @@ -1942,6 +1942,27 @@ Notes: - `cd ts && bun run lint` - `git diff --check` +### 2026-06-04: Config Service Effect.fn Helper Slice + +- Status: migrated and package-verified. +- Completed: + - `ts/packages/flow/src/config/service.ts` now defines its reusable + generator helpers with `Effect.fn` or `Effect.fnUntraced` instead of + arrow functions returning `Effect.gen`. + - `consumeOnceEffect` uses `Effect.fnUntraced`; persistence, push, read, + put/delete, close, and run helpers use named `Effect.fn` wrappers. + - Existing persistence read/write catch/logging behavior is preserved through + `Effect.fn` post-processing functions. + - The focused scan for config-service helper `=> Effect.gen` patterns is + clean. +- Verification: + - `cd ts/packages/flow && bunx --bun vitest run src/__tests__/config-service.test.ts` + - `cd ts && bun run check:tsgo` + - `cd ts && bun run build` + - `cd ts && bun run test` + - `cd ts && bun run lint` + - `git diff --check` + ## Subagent Findings To Preserve - MCP/workbench: @@ -2067,9 +2088,9 @@ Notes: - Gateway dispatcher static service registries, streaming membership, and scoped requestor cache now use Effect `HashMap`/`HashSet`; gateway term-bearing service membership sets now use Effect `HashSet` too. - - FlowManager and KnowledgeCore `() => Effect.gen(...)` factories are - normalized to `Effect.fn` / `Effect.fnUntraced`. Config and Librarian - helper factories still need focused follow-up slices. + - FlowManager, KnowledgeCore, and ConfigService `() => Effect.gen(...)` + factories are normalized to `Effect.fn` / `Effect.fnUntraced`. Librarian + helper factories still need a focused follow-up slice. - ConfigService and KnowledgeCore operation dispatch now use `effect/Match` with `Match.exhaustive`; FlowManager and Librarian operation dispatch now use `effect/Match` with runtime-preserving `Match.orElse` fallbacks. diff --git a/ts/packages/flow/src/config/service.ts b/ts/packages/flow/src/config/service.ts index 1ce38948..46b7bb97 100644 --- a/ts/packages/flow/src/config/service.ts +++ b/ts/packages/flow/src/config/service.ts @@ -313,11 +313,8 @@ const updateHandles = ( pushProducer: handles.pushProducer === undefined ? state.pushProducer : handles.pushProducer, })); -const persistStateEffect = ( - persistPath: string | null, - state: ConfigServiceState, -): Effect.Effect => - Effect.gen(function* () { +const persistStateEffect = Effect.fn("ConfigService.persistState")( + function* (persistPath: string | null, state: ConfigServiceState) { if (persistPath === null) return; const payload = { version: state.version, @@ -332,35 +329,35 @@ const persistStateEffect = ( try: () => writeTextFile(persistPath, json), catch: (cause) => configServiceError("persist-write", cause), }); - }).pipe( - Effect.catch((err) => - Effect.logError("[ConfigService] Failed to persist config", {error: err.message}), + }, + (effect) => + effect.pipe( + Effect.catch((err) => + Effect.logError("[ConfigService] Failed to persist config", {error: err.message}), + ), ), - ); +); -const pushConfigWithStateEffect = ( +const pushConfigWithStateEffect = Effect.fn("ConfigService.pushConfigWithState")(function* ( state: ConfigServiceState, -): Effect.Effect => - Effect.gen(function* () { - const pushProducer = state.pushProducer; - if (pushProducer === null) return; +) { + const pushProducer = state.pushProducer; + if (pushProducer === null) return; - yield* Effect.tryPromise({ - try: () => - pushProducer.send({ - version: state.version, - config: configDumpForState(state), - }), - catch: (cause) => configServiceError("push-config", cause), - }); - - yield* Effect.log(`[ConfigService] Pushed configuration version ${state.version}`); + yield* Effect.tryPromise({ + try: () => + pushProducer.send({ + version: state.version, + config: configDumpForState(state), + }), + catch: (cause) => configServiceError("push-config", cause), }); -const readPersistedConfigEffect = ( - persistPath: string, -): Effect.Effect => - Effect.gen(function* () { + yield* Effect.log(`[ConfigService] Pushed configuration version ${state.version}`); +}); + +const readPersistedConfigEffect = Effect.fn("ConfigService.readPersistedConfig")( + function* (persistPath: string) { const raw = yield* Effect.tryPromise({ try: () => readTextFile(persistPath), catch: (cause) => configServiceError("persist-read", cause), @@ -368,11 +365,14 @@ const readPersistedConfigEffect = ( return yield* S.decodeUnknownEffect(PersistedConfigJsonSchema)(raw).pipe( Effect.mapError((cause) => configServiceError("persist-decode", cause)), ); - }).pipe( - Effect.catch(() => - Effect.log("[ConfigService] No persisted config found, starting fresh").pipe( - Effect.flatMap(() => Effect.succeed(null)), - ) + }, + (effect) => + effect.pipe( + Effect.catch(() => + Effect.log("[ConfigService] No persisted config found, starting fresh").pipe( + Effect.flatMap(() => Effect.succeed(null)), + ) + ), ), ); @@ -515,59 +515,57 @@ const applyDeleteStringKeys = ( }; }; -const handlePutEffect = ( +const handlePutEffect = Effect.fn("ConfigService.handlePut")(function* ( stateRef: SynchronizedRef.SynchronizedRef, persistPath: string | null, request: ConfigRequest, -): Effect.Effect => - Effect.gen(function* () { - const values = configValues(request); - if (values.length === 0) return yield* configServiceError("put", "Put requires config values"); +) { + const values = configValues(request); + if (values.length === 0) return yield* configServiceError("put", "Put requires config values"); - const next = yield* SynchronizedRef.updateAndGet(stateRef, (state) => applyPut(state, values)); - yield* persistStateEffect(persistPath, next); - yield* pushConfigWithStateEffect(next); + const next = yield* SynchronizedRef.updateAndGet(stateRef, (state) => applyPut(state, values)); + yield* persistStateEffect(persistPath, next); + yield* pushConfigWithStateEffect(next); - return {version: next.version}; - }); + return {version: next.version}; +}); -const handleDeleteEffect = ( +const handleDeleteEffect = Effect.fn("ConfigService.handleDelete")(function* ( stateRef: SynchronizedRef.SynchronizedRef, persistPath: string | null, request: ConfigRequest, -): Effect.Effect => - Effect.gen(function* () { - const workspace = workspaceFor(request); - const keysByObject = objectKeys(request); - - if (keysByObject.length > 0) { - const next = yield* SynchronizedRef.updateAndGet( - stateRef, - (state) => applyDeleteObjectKeys(state, workspace, keysByObject), - ); - yield* persistStateEffect(persistPath, next); - yield* pushConfigWithStateEffect(next); - return {version: next.version}; - } - - const keys = stringKeys(request); - if (keys.length === 0) { - return yield* configServiceError("delete", "Delete requires at least one key"); - } - - const previous = yield* SynchronizedRef.get(stateRef); - if (getWorkspaceStore(previous, workspace) === undefined) { - return {version: previous.version}; - } +) { + const workspace = workspaceFor(request); + const keysByObject = objectKeys(request); + if (keysByObject.length > 0) { const next = yield* SynchronizedRef.updateAndGet( stateRef, - (state) => applyDeleteStringKeys(state, workspace, keys), + (state) => applyDeleteObjectKeys(state, workspace, keysByObject), ); yield* persistStateEffect(persistPath, next); yield* pushConfigWithStateEffect(next); return {version: next.version}; - }); + } + + const keys = stringKeys(request); + if (keys.length === 0) { + return yield* configServiceError("delete", "Delete requires at least one key"); + } + + const previous = yield* SynchronizedRef.get(stateRef); + if (getWorkspaceStore(previous, workspace) === undefined) { + return {version: previous.version}; + } + + const next = yield* SynchronizedRef.updateAndGet( + stateRef, + (state) => applyDeleteStringKeys(state, workspace, keys), + ); + yield* persistStateEffect(persistPath, next); + yield* pushConfigWithStateEffect(next); + return {version: next.version}; +}); const handleListWithState = ( state: ConfigServiceState, @@ -639,118 +637,115 @@ const handleConfigDumpWithState = ( config: configDumpForState(state, workspaceFor(request)), }); -const closeConfigResourcesEffect = ( +const closeConfigResourcesEffect = Effect.fn("ConfigService.closeResources")(function* ( stateRef: SynchronizedRef.SynchronizedRef, -): Effect.Effect => - Effect.gen(function* () { - const state = yield* SynchronizedRef.get(stateRef); +) { + const state = yield* SynchronizedRef.get(stateRef); - const consumer = state.consumer; - if (consumer !== null) { - yield* Effect.tryPromise({ - try: () => consumer.close(), - catch: (cause) => configServiceError("close-consumer", cause), - }); - } - const responseProducer = state.responseProducer; - if (responseProducer !== null) { - yield* Effect.tryPromise({ - try: () => responseProducer.close(), - catch: (cause) => configServiceError("close-response-producer", cause), - }); - } - const pushProducer = state.pushProducer; - if (pushProducer !== null) { - yield* Effect.tryPromise({ - try: () => pushProducer.close(), - catch: (cause) => configServiceError("close-push-producer", cause), - }); - } - - yield* updateHandles(stateRef, { - consumer: null, - responseProducer: null, - pushProducer: null, - }); - }); - -const consumeOnceEffect = ( - service: ConfigService, -): Effect.Effect => - Effect.gen(function* () { - const state = yield* SynchronizedRef.get(service.state); - const consumer = state.consumer; - if (consumer === null) { - return yield* configServiceError("consume", "Config consumer not started"); - } - - const msg = yield* Effect.tryPromise({ - try: () => consumer.receive(2000), - catch: (cause) => configServiceError("consume-receive", cause), - }); - if (msg === null) return; - - yield* service.handleMessageEffect(msg); + const consumer = state.consumer; + if (consumer !== null) { yield* Effect.tryPromise({ - try: () => consumer.acknowledge(msg), - catch: (cause) => configServiceError("consume-acknowledge", cause), + try: () => consumer.close(), + catch: (cause) => configServiceError("close-consumer", cause), }); - }); + } + const responseProducer = state.responseProducer; + if (responseProducer !== null) { + yield* Effect.tryPromise({ + try: () => responseProducer.close(), + catch: (cause) => configServiceError("close-response-producer", cause), + }); + } + const pushProducer = state.pushProducer; + if (pushProducer !== null) { + yield* Effect.tryPromise({ + try: () => pushProducer.close(), + catch: (cause) => configServiceError("close-push-producer", cause), + }); + } -const runConfigServiceEffect = ( + yield* updateHandles(stateRef, { + consumer: null, + responseProducer: null, + pushProducer: null, + }); +}); + +const consumeOnceEffect = Effect.fnUntraced(function* ( service: ConfigService, -): Effect.Effect => - Effect.gen(function* () { - yield* service.loadFromDiskEffect; +) { + const state = yield* SynchronizedRef.get(service.state); + const consumer = state.consumer; + if (consumer === null) { + return yield* configServiceError("consume", "Config consumer not started"); + } - const responseProducer = yield* Effect.tryPromise({ - try: () => - service.pubsub.createProducer({ - topic: topics.configResponse, - schema: ConfigResponseSchema, - }), - catch: (cause) => configServiceError("response-producer", cause), - }); - yield* updateHandles(service.state, {responseProducer}); - - const pushProducer = yield* Effect.tryPromise({ - try: () => - service.pubsub.createProducer({ - topic: topics.configPush, - schema: ConfigPushSchema, - }), - catch: (cause) => configServiceError("push-producer", cause), - }); - yield* updateHandles(service.state, {pushProducer}); - - const consumer = yield* Effect.tryPromise({ - try: () => - service.pubsub.createConsumer({ - topic: topics.configRequest, - subscription: `${service.config.id}-config-request`, - schema: ConfigRequestSchema, - }), - catch: (cause) => configServiceError("consumer", cause), - }); - const state = yield* updateHandles(service.state, {consumer}); - - yield* pushConfigWithStateEffect(state); - yield* Effect.log(`[ConfigService] Listening on ${topics.configRequest}`); - - yield* Effect.whileLoop({ - while: () => service.running, - body: () => - consumeOnceEffect(service).pipe( - Effect.catch((err) => { - if (!service.running) return Effect.void; - return Effect.logError("[ConfigService] Error in consume loop", {error: err.message}).pipe( - Effect.flatMap(() => Effect.sleep(Duration.millis(1000))), - ); - }), - ), - step: () => undefined, - }); + const msg = yield* Effect.tryPromise({ + try: () => consumer.receive(2000), + catch: (cause) => configServiceError("consume-receive", cause), }); + if (msg === null) return; + + yield* service.handleMessageEffect(msg); + yield* Effect.tryPromise({ + try: () => consumer.acknowledge(msg), + catch: (cause) => configServiceError("consume-acknowledge", cause), + }); +}); + +const runConfigServiceEffect = Effect.fn("ConfigService.run")(function* ( + service: ConfigService, +) { + yield* service.loadFromDiskEffect; + + const responseProducer = yield* Effect.tryPromise({ + try: () => + service.pubsub.createProducer({ + topic: topics.configResponse, + schema: ConfigResponseSchema, + }), + catch: (cause) => configServiceError("response-producer", cause), + }); + yield* updateHandles(service.state, {responseProducer}); + + const pushProducer = yield* Effect.tryPromise({ + try: () => + service.pubsub.createProducer({ + topic: topics.configPush, + schema: ConfigPushSchema, + }), + catch: (cause) => configServiceError("push-producer", cause), + }); + yield* updateHandles(service.state, {pushProducer}); + + const consumer = yield* Effect.tryPromise({ + try: () => + service.pubsub.createConsumer({ + topic: topics.configRequest, + subscription: `${service.config.id}-config-request`, + schema: ConfigRequestSchema, + }), + catch: (cause) => configServiceError("consumer", cause), + }); + const state = yield* updateHandles(service.state, {consumer}); + + yield* pushConfigWithStateEffect(state); + yield* Effect.log(`[ConfigService] Listening on ${topics.configRequest}`); + + yield* Effect.whileLoop({ + while: () => service.running, + body: () => + consumeOnceEffect(service).pipe( + Effect.catch((err) => { + if (!service.running) return Effect.void; + return Effect.logError("[ConfigService] Error in consume loop", {error: err.message}).pipe( + Effect.flatMap(() => Effect.sleep(Duration.millis(1000))), + ); + }), + ), + step: () => undefined, + }); +}); export function makeConfigService(config: ConfigServiceConfig): ConfigService { const state = SynchronizedRef.makeUnsafe(initialState());