diff --git a/ts/EFFECT_NATIVE_REWRITE_AUDIT.md b/ts/EFFECT_NATIVE_REWRITE_AUDIT.md index bb1a8aee..bb18e659 100644 --- a/ts/EFFECT_NATIVE_REWRITE_AUDIT.md +++ b/ts/EFFECT_NATIVE_REWRITE_AUDIT.md @@ -1920,6 +1920,28 @@ Notes: - `cd ts && bun run lint` - `git diff --check` +### 2026-06-04: KnowledgeCore Effect.fn Helper Slice + +- Status: migrated and package-verified. +- Completed: + - `ts/packages/flow/src/cores/service.ts` now defines its reusable + generator helpers with `Effect.fn` or `Effect.fnUntraced` instead of + arrow functions returning `Effect.gen`. + - `sendResponse` and `consumeOnceEffect` use `Effect.fnUntraced` because + they are small hot-path helper functions. + - `readPersistedKnowledgeEffect`, `persistStateEffect`, + `closeKnowledgeResourcesEffect`, and `runKnowledgeCoreServiceEffect` use + named `Effect.fn` wrappers while preserving their existing catch/logging + behavior. + - The focused scan for `=> Effect.gen` in `cores/service.ts` is clean. +- Verification: + - `cd ts/packages/flow && bunx --bun vitest run src/__tests__/knowledge-core-service.test.ts` + - `cd ts && bun run check:tsgo` + - `cd ts && bun run build` + - `cd ts && bun run test` + - `cd ts && bun run lint` + - `git diff --check` + ## Subagent Findings To Preserve - MCP/workbench: @@ -2045,9 +2067,9 @@ Notes: - Gateway dispatcher static service registries, streaming membership, and scoped requestor cache now use Effect `HashMap`/`HashSet`; gateway term-bearing service membership sets now use Effect `HashSet` too. - - FlowManager `() => Effect.gen(...)` factories are normalized to - `Effect.fn` / `Effect.fnUntraced`. Sibling service factories still need a - focused scan before treating them as valid migration targets. + - FlowManager and KnowledgeCore `() => Effect.gen(...)` factories are + normalized to `Effect.fn` / `Effect.fnUntraced`. Config and Librarian + helper factories still need focused follow-up slices. - ConfigService and KnowledgeCore operation dispatch now use `effect/Match` with `Match.exhaustive`; FlowManager and Librarian operation dispatch now use `effect/Match` with runtime-preserving `Match.orElse` fallbacks. diff --git a/ts/packages/flow/src/cores/service.ts b/ts/packages/flow/src/cores/service.ts index 37c656e5..292d5fb8 100644 --- a/ts/packages/flow/src/cores/service.ts +++ b/ts/packages/flow/src/cores/service.ts @@ -236,28 +236,22 @@ const closeResource = ( ), ); -const sendResponse = ( +const sendResponse = Effect.fnUntraced(function* ( stateRef: SynchronizedRef.SynchronizedRef, response: KnowledgeResponse, requestId: string, operation = "respond", -): Effect.Effect => - Effect.gen(function* () { - const responseProducer = (yield* SynchronizedRef.get(stateRef)).responseProducer; - if (responseProducer === null) { - return yield* knowledgeCoreServiceError(operation, "Knowledge response producer not started"); - } +) { + const responseProducer = (yield* SynchronizedRef.get(stateRef)).responseProducer; + if (responseProducer === null) { + return yield* knowledgeCoreServiceError(operation, "Knowledge response producer not started"); + } - yield* tryPromise(operation, () => responseProducer.send(response, {id: requestId})); - }); + yield* tryPromise(operation, () => responseProducer.send(response, {id: requestId})); +}); -const readPersistedKnowledgeEffect = ( - persistPath: string, -): Effect.Effect<{ - readonly kgCores: KnowledgeCoreStore; - readonly deCores: DocumentCoreStore; -} | null, never> => - Effect.gen(function* () { +const readPersistedKnowledgeEffect = Effect.fn("KnowledgeCoreService.readPersistedKnowledge")( + function* (persistPath: string) { const raw = yield* tryPromise("load-read", () => readTextFile(persistPath)); const current = S.decodeUnknownOption(PersistedKnowledgeSnapshotJsonSchema)(raw); if (O.isSome(current)) { @@ -276,36 +270,39 @@ const readPersistedKnowledgeEffect = ( } return yield* knowledgeCoreServiceError("load-decode", "Persisted knowledge state did not match any known shape"); - }).pipe( - Effect.catch(() => - Effect.log("[KnowledgeCoreService] No persisted state found, starting fresh").pipe( - Effect.flatMap(() => - Effect.succeed<{ - readonly kgCores: KnowledgeCoreStore; - readonly deCores: DocumentCoreStore; - } | null>(null) - ), - ) + }, + (effect) => + effect.pipe( + Effect.catch(() => + Effect.log("[KnowledgeCoreService] No persisted state found, starting fresh").pipe( + Effect.flatMap(() => + Effect.succeed<{ + readonly kgCores: KnowledgeCoreStore; + readonly deCores: DocumentCoreStore; + } | null>(null) + ), + ) + ), ), ); -const persistStateEffect = ( - persistPath: string, - state: KnowledgeCoreServiceState, -): Effect.Effect => - Effect.gen(function* () { +const persistStateEffect = Effect.fn("KnowledgeCoreService.persistState")( + function* (persistPath: string, state: KnowledgeCoreServiceState) { const snapshot = toPersistedSnapshot(state); const json = yield* S.encodeUnknownEffect(S.UnknownFromJsonString)(snapshot).pipe( Effect.mapError((cause) => knowledgeCoreServiceError("persist-encode", cause)), ); yield* tryPromise("persist-write", () => writeTextFile(persistPath, json)); - }).pipe( - Effect.catch((error) => - Effect.logError("[KnowledgeCoreService] Failed to persist state", { - error: error.message, - }), + }, + (effect) => + effect.pipe( + Effect.catch((error) => + Effect.logError("[KnowledgeCoreService] Failed to persist state", { + error: error.message, + }), + ), ), - ); +); const listIds = ( store: ReadonlyMap, @@ -323,86 +320,83 @@ const listIds = ( return ids; }; -const closeKnowledgeResourcesEffect = ( +const closeKnowledgeResourcesEffect = Effect.fn("KnowledgeCoreService.closeResources")(function* ( stateRef: SynchronizedRef.SynchronizedRef, -): Effect.Effect => - Effect.gen(function* () { - const state = yield* SynchronizedRef.get(stateRef); +) { + const state = yield* SynchronizedRef.get(stateRef); - const consumer = state.consumer; - if (consumer !== null) { - yield* tryPromise("close-consumer", () => consumer.close()); - } + const consumer = state.consumer; + if (consumer !== null) { + yield* tryPromise("close-consumer", () => consumer.close()); + } - const responseProducer = state.responseProducer; - if (responseProducer !== null) { - yield* tryPromise("close-response-producer", () => responseProducer.close()); - } + const responseProducer = state.responseProducer; + if (responseProducer !== null) { + yield* tryPromise("close-response-producer", () => responseProducer.close()); + } - yield* updateHandles(stateRef, { - consumer: null, - responseProducer: null, - }); + yield* updateHandles(stateRef, { + consumer: null, + responseProducer: null, }); +}); -const consumeOnceEffect = ( +const consumeOnceEffect = Effect.fnUntraced(function* ( service: KnowledgeCoreService, -): Effect.Effect => - Effect.gen(function* () { - const consumer = (yield* SynchronizedRef.get(service.state)).consumer; - if (consumer === null) { - return yield* knowledgeCoreServiceError("consume", "Knowledge request consumer not started"); - } +) { + const consumer = (yield* SynchronizedRef.get(service.state)).consumer; + if (consumer === null) { + return yield* knowledgeCoreServiceError("consume", "Knowledge request consumer not started"); + } - const msg = yield* tryPromise("consume-receive", () => consumer.receive(2000)); - if (msg === null) return; + const msg = yield* tryPromise("consume-receive", () => consumer.receive(2000)); + if (msg === null) return; - yield* service.handleMessageEffect(msg); - yield* tryPromise("consume-acknowledge", () => consumer.acknowledge(msg)); - }); + yield* service.handleMessageEffect(msg); + yield* tryPromise("consume-acknowledge", () => consumer.acknowledge(msg)); +}); -const runKnowledgeCoreServiceEffect = ( +const runKnowledgeCoreServiceEffect = Effect.fn("KnowledgeCoreService.run")(function* ( service: KnowledgeCoreService, -): Effect.Effect => - Effect.gen(function* () { - yield* tryPromise("ensure-directory", () => ensureDirectory(service.dataDir)); - yield* service.loadFromDiskEffect; +) { + yield* tryPromise("ensure-directory", () => ensureDirectory(service.dataDir)); + yield* service.loadFromDiskEffect; - const responseProducer = yield* tryPromise("response-producer", () => - service.pubsub.createProducer({ - topic: topics.knowledgeResponse, - schema: KnowledgeResponseSchema, - }), - ); - yield* updateHandles(service.state, {responseProducer}); + const responseProducer = yield* tryPromise("response-producer", () => + service.pubsub.createProducer({ + topic: topics.knowledgeResponse, + schema: KnowledgeResponseSchema, + }), + ); + yield* updateHandles(service.state, {responseProducer}); - const consumer = yield* tryPromise("consumer", () => - service.pubsub.createConsumer({ - topic: topics.knowledgeRequest, - subscription: `${service.config.id}-knowledge-request`, - schema: KnowledgeRequestSchema, - }), - ); - yield* updateHandles(service.state, {consumer}); + const consumer = yield* tryPromise("consumer", () => + service.pubsub.createConsumer({ + topic: topics.knowledgeRequest, + subscription: `${service.config.id}-knowledge-request`, + schema: KnowledgeRequestSchema, + }), + ); + yield* updateHandles(service.state, {consumer}); - yield* Effect.log(`[KnowledgeCoreService] Listening on ${topics.knowledgeRequest}`); + yield* Effect.log(`[KnowledgeCoreService] Listening on ${topics.knowledgeRequest}`); - yield* Effect.whileLoop({ - while: () => service.running, - body: () => - consumeOnceEffect(service).pipe( - Effect.catch((error) => { - if (!service.running) return Effect.void; - return Effect.logError("[KnowledgeCoreService] Error in consume loop", { - error: error.message, - }).pipe( - Effect.flatMap(() => Effect.sleep(Duration.millis(1000))), - ); - }), - ), - step: () => undefined, - }); + yield* Effect.whileLoop({ + while: () => service.running, + body: () => + consumeOnceEffect(service).pipe( + Effect.catch((error) => { + if (!service.running) return Effect.void; + return Effect.logError("[KnowledgeCoreService] Error in consume loop", { + error: error.message, + }).pipe( + Effect.flatMap(() => Effect.sleep(Duration.millis(1000))), + ); + }), + ), + step: () => undefined, }); +}); const listKgCoresEffect = ( stateRef: SynchronizedRef.SynchronizedRef,