Use Effect fn for knowledge core helpers

This commit is contained in:
elpresidank 2026-06-04 06:28:27 -05:00
parent 475bc3cb6c
commit 3890a598b5
2 changed files with 120 additions and 104 deletions

View file

@ -1920,6 +1920,28 @@ Notes:
- `cd ts && bun run lint` - `cd ts && bun run lint`
- `git diff --check` - `git diff --check`
### 2026-06-04: KnowledgeCore Effect.fn Helper Slice
- Status: migrated and package-verified.
- Completed:
- `ts/packages/flow/src/cores/service.ts` now defines its reusable
generator helpers with `Effect.fn` or `Effect.fnUntraced` instead of
arrow functions returning `Effect.gen`.
- `sendResponse` and `consumeOnceEffect` use `Effect.fnUntraced` because
they are small hot-path helper functions.
- `readPersistedKnowledgeEffect`, `persistStateEffect`,
`closeKnowledgeResourcesEffect`, and `runKnowledgeCoreServiceEffect` use
named `Effect.fn` wrappers while preserving their existing catch/logging
behavior.
- The focused scan for `=> Effect.gen` in `cores/service.ts` is clean.
- Verification:
- `cd ts/packages/flow && bunx --bun vitest run src/__tests__/knowledge-core-service.test.ts`
- `cd ts && bun run check:tsgo`
- `cd ts && bun run build`
- `cd ts && bun run test`
- `cd ts && bun run lint`
- `git diff --check`
## Subagent Findings To Preserve ## Subagent Findings To Preserve
- MCP/workbench: - MCP/workbench:
@ -2045,9 +2067,9 @@ Notes:
- Gateway dispatcher static service registries, streaming membership, and - Gateway dispatcher static service registries, streaming membership, and
scoped requestor cache now use Effect `HashMap`/`HashSet`; gateway scoped requestor cache now use Effect `HashMap`/`HashSet`; gateway
term-bearing service membership sets now use Effect `HashSet` too. term-bearing service membership sets now use Effect `HashSet` too.
- FlowManager `() => Effect.gen(...)` factories are normalized to - FlowManager and KnowledgeCore `() => Effect.gen(...)` factories are
`Effect.fn` / `Effect.fnUntraced`. Sibling service factories still need a normalized to `Effect.fn` / `Effect.fnUntraced`. Config and Librarian
focused scan before treating them as valid migration targets. helper factories still need focused follow-up slices.
- ConfigService and KnowledgeCore operation dispatch now use `effect/Match` - ConfigService and KnowledgeCore operation dispatch now use `effect/Match`
with `Match.exhaustive`; FlowManager and Librarian operation dispatch now with `Match.exhaustive`; FlowManager and Librarian operation dispatch now
use `effect/Match` with runtime-preserving `Match.orElse` fallbacks. use `effect/Match` with runtime-preserving `Match.orElse` fallbacks.

View file

@ -236,28 +236,22 @@ const closeResource = (
), ),
); );
const sendResponse = ( const sendResponse = Effect.fnUntraced(function* (
stateRef: SynchronizedRef.SynchronizedRef<KnowledgeCoreServiceState>, stateRef: SynchronizedRef.SynchronizedRef<KnowledgeCoreServiceState>,
response: KnowledgeResponse, response: KnowledgeResponse,
requestId: string, requestId: string,
operation = "respond", operation = "respond",
): Effect.Effect<void, KnowledgeCoreServiceError> => ) {
Effect.gen(function* () { const responseProducer = (yield* SynchronizedRef.get(stateRef)).responseProducer;
const responseProducer = (yield* SynchronizedRef.get(stateRef)).responseProducer; if (responseProducer === null) {
if (responseProducer === null) { return yield* knowledgeCoreServiceError(operation, "Knowledge response producer not started");
return yield* knowledgeCoreServiceError(operation, "Knowledge response producer not started"); }
}
yield* tryPromise(operation, () => responseProducer.send(response, {id: requestId})); yield* tryPromise(operation, () => responseProducer.send(response, {id: requestId}));
}); });
const readPersistedKnowledgeEffect = ( const readPersistedKnowledgeEffect = Effect.fn("KnowledgeCoreService.readPersistedKnowledge")(
persistPath: string, function* (persistPath: string) {
): Effect.Effect<{
readonly kgCores: KnowledgeCoreStore;
readonly deCores: DocumentCoreStore;
} | null, never> =>
Effect.gen(function* () {
const raw = yield* tryPromise("load-read", () => readTextFile(persistPath)); const raw = yield* tryPromise("load-read", () => readTextFile(persistPath));
const current = S.decodeUnknownOption(PersistedKnowledgeSnapshotJsonSchema)(raw); const current = S.decodeUnknownOption(PersistedKnowledgeSnapshotJsonSchema)(raw);
if (O.isSome(current)) { if (O.isSome(current)) {
@ -276,36 +270,39 @@ const readPersistedKnowledgeEffect = (
} }
return yield* knowledgeCoreServiceError("load-decode", "Persisted knowledge state did not match any known shape"); return yield* knowledgeCoreServiceError("load-decode", "Persisted knowledge state did not match any known shape");
}).pipe( },
Effect.catch(() => (effect) =>
Effect.log("[KnowledgeCoreService] No persisted state found, starting fresh").pipe( effect.pipe(
Effect.flatMap(() => Effect.catch(() =>
Effect.succeed<{ Effect.log("[KnowledgeCoreService] No persisted state found, starting fresh").pipe(
readonly kgCores: KnowledgeCoreStore; Effect.flatMap(() =>
readonly deCores: DocumentCoreStore; Effect.succeed<{
} | null>(null) readonly kgCores: KnowledgeCoreStore;
), readonly deCores: DocumentCoreStore;
) } | null>(null)
),
)
),
), ),
); );
const persistStateEffect = ( const persistStateEffect = Effect.fn("KnowledgeCoreService.persistState")(
persistPath: string, function* (persistPath: string, state: KnowledgeCoreServiceState) {
state: KnowledgeCoreServiceState,
): Effect.Effect<void, never> =>
Effect.gen(function* () {
const snapshot = toPersistedSnapshot(state); const snapshot = toPersistedSnapshot(state);
const json = yield* S.encodeUnknownEffect(S.UnknownFromJsonString)(snapshot).pipe( const json = yield* S.encodeUnknownEffect(S.UnknownFromJsonString)(snapshot).pipe(
Effect.mapError((cause) => knowledgeCoreServiceError("persist-encode", cause)), Effect.mapError((cause) => knowledgeCoreServiceError("persist-encode", cause)),
); );
yield* tryPromise("persist-write", () => writeTextFile(persistPath, json)); yield* tryPromise("persist-write", () => writeTextFile(persistPath, json));
}).pipe( },
Effect.catch((error) => (effect) =>
Effect.logError("[KnowledgeCoreService] Failed to persist state", { effect.pipe(
error: error.message, Effect.catch((error) =>
}), Effect.logError("[KnowledgeCoreService] Failed to persist state", {
error: error.message,
}),
),
), ),
); );
const listIds = ( const listIds = (
store: ReadonlyMap<string, unknown>, store: ReadonlyMap<string, unknown>,
@ -323,86 +320,83 @@ const listIds = (
return ids; return ids;
}; };
const closeKnowledgeResourcesEffect = ( const closeKnowledgeResourcesEffect = Effect.fn("KnowledgeCoreService.closeResources")(function* (
stateRef: SynchronizedRef.SynchronizedRef<KnowledgeCoreServiceState>, stateRef: SynchronizedRef.SynchronizedRef<KnowledgeCoreServiceState>,
): Effect.Effect<void, KnowledgeCoreServiceError> => ) {
Effect.gen(function* () { const state = yield* SynchronizedRef.get(stateRef);
const state = yield* SynchronizedRef.get(stateRef);
const consumer = state.consumer; const consumer = state.consumer;
if (consumer !== null) { if (consumer !== null) {
yield* tryPromise("close-consumer", () => consumer.close()); yield* tryPromise("close-consumer", () => consumer.close());
} }
const responseProducer = state.responseProducer; const responseProducer = state.responseProducer;
if (responseProducer !== null) { if (responseProducer !== null) {
yield* tryPromise("close-response-producer", () => responseProducer.close()); yield* tryPromise("close-response-producer", () => responseProducer.close());
} }
yield* updateHandles(stateRef, { yield* updateHandles(stateRef, {
consumer: null, consumer: null,
responseProducer: null, responseProducer: null,
});
}); });
});
const consumeOnceEffect = ( const consumeOnceEffect = Effect.fnUntraced(function* (
service: KnowledgeCoreService, service: KnowledgeCoreService,
): Effect.Effect<void, KnowledgeCoreServiceError> => ) {
Effect.gen(function* () { const consumer = (yield* SynchronizedRef.get(service.state)).consumer;
const consumer = (yield* SynchronizedRef.get(service.state)).consumer; if (consumer === null) {
if (consumer === null) { return yield* knowledgeCoreServiceError("consume", "Knowledge request consumer not started");
return yield* knowledgeCoreServiceError("consume", "Knowledge request consumer not started"); }
}
const msg = yield* tryPromise("consume-receive", () => consumer.receive(2000)); const msg = yield* tryPromise("consume-receive", () => consumer.receive(2000));
if (msg === null) return; if (msg === null) return;
yield* service.handleMessageEffect(msg); yield* service.handleMessageEffect(msg);
yield* tryPromise("consume-acknowledge", () => consumer.acknowledge(msg)); yield* tryPromise("consume-acknowledge", () => consumer.acknowledge(msg));
}); });
const runKnowledgeCoreServiceEffect = ( const runKnowledgeCoreServiceEffect = Effect.fn("KnowledgeCoreService.run")(function* (
service: KnowledgeCoreService, service: KnowledgeCoreService,
): Effect.Effect<void, KnowledgeCoreServiceError> => ) {
Effect.gen(function* () { yield* tryPromise("ensure-directory", () => ensureDirectory(service.dataDir));
yield* tryPromise("ensure-directory", () => ensureDirectory(service.dataDir)); yield* service.loadFromDiskEffect;
yield* service.loadFromDiskEffect;
const responseProducer = yield* tryPromise("response-producer", () => const responseProducer = yield* tryPromise("response-producer", () =>
service.pubsub.createProducer<KnowledgeResponse>({ service.pubsub.createProducer<KnowledgeResponse>({
topic: topics.knowledgeResponse, topic: topics.knowledgeResponse,
schema: KnowledgeResponseSchema, schema: KnowledgeResponseSchema,
}), }),
); );
yield* updateHandles(service.state, {responseProducer}); yield* updateHandles(service.state, {responseProducer});
const consumer = yield* tryPromise("consumer", () => const consumer = yield* tryPromise("consumer", () =>
service.pubsub.createConsumer<KnowledgeRequest>({ service.pubsub.createConsumer<KnowledgeRequest>({
topic: topics.knowledgeRequest, topic: topics.knowledgeRequest,
subscription: `${service.config.id}-knowledge-request`, subscription: `${service.config.id}-knowledge-request`,
schema: KnowledgeRequestSchema, schema: KnowledgeRequestSchema,
}), }),
); );
yield* updateHandles(service.state, {consumer}); yield* updateHandles(service.state, {consumer});
yield* Effect.log(`[KnowledgeCoreService] Listening on ${topics.knowledgeRequest}`); yield* Effect.log(`[KnowledgeCoreService] Listening on ${topics.knowledgeRequest}`);
yield* Effect.whileLoop({ yield* Effect.whileLoop({
while: () => service.running, while: () => service.running,
body: () => body: () =>
consumeOnceEffect(service).pipe( consumeOnceEffect(service).pipe(
Effect.catch((error) => { Effect.catch((error) => {
if (!service.running) return Effect.void; if (!service.running) return Effect.void;
return Effect.logError("[KnowledgeCoreService] Error in consume loop", { return Effect.logError("[KnowledgeCoreService] Error in consume loop", {
error: error.message, error: error.message,
}).pipe( }).pipe(
Effect.flatMap(() => Effect.sleep(Duration.millis(1000))), Effect.flatMap(() => Effect.sleep(Duration.millis(1000))),
); );
}), }),
), ),
step: () => undefined, step: () => undefined,
});
}); });
});
const listKgCoresEffect = ( const listKgCoresEffect = (
stateRef: SynchronizedRef.SynchronizedRef<KnowledgeCoreServiceState>, stateRef: SynchronizedRef.SynchronizedRef<KnowledgeCoreServiceState>,