Use Effect fn for config service helpers

This commit is contained in:
elpresidank 2026-06-04 06:31:28 -05:00
parent 3890a598b5
commit 6ba4cf3b32
2 changed files with 193 additions and 177 deletions

View file

@ -1942,6 +1942,27 @@ Notes:
- `cd ts && bun run lint`
- `git diff --check`
### 2026-06-04: Config Service Effect.fn Helper Slice
- Status: migrated and package-verified.
- Completed:
- `ts/packages/flow/src/config/service.ts` now defines its reusable
generator helpers with `Effect.fn` or `Effect.fnUntraced` instead of
arrow functions returning `Effect.gen`.
- `consumeOnceEffect` uses `Effect.fnUntraced`; persistence, push, read,
put/delete, close, and run helpers use named `Effect.fn` wrappers.
- Existing persistence read/write catch/logging behavior is preserved through
`Effect.fn` post-processing functions.
- The focused scan for config-service helper `=> Effect.gen` patterns is
clean.
- Verification:
- `cd ts/packages/flow && bunx --bun vitest run src/__tests__/config-service.test.ts`
- `cd ts && bun run check:tsgo`
- `cd ts && bun run build`
- `cd ts && bun run test`
- `cd ts && bun run lint`
- `git diff --check`
## Subagent Findings To Preserve
- MCP/workbench:
@ -2067,9 +2088,9 @@ Notes:
- Gateway dispatcher static service registries, streaming membership, and
scoped requestor cache now use Effect `HashMap`/`HashSet`; gateway
term-bearing service membership sets now use Effect `HashSet` too.
- FlowManager and KnowledgeCore `() => Effect.gen(...)` factories are
normalized to `Effect.fn` / `Effect.fnUntraced`. Config and Librarian
helper factories still need focused follow-up slices.
- FlowManager, KnowledgeCore, and ConfigService `() => Effect.gen(...)`
factories are normalized to `Effect.fn` / `Effect.fnUntraced`. Librarian
helper factories still need a focused follow-up slice.
- ConfigService and KnowledgeCore operation dispatch now use `effect/Match`
with `Match.exhaustive`; FlowManager and Librarian operation dispatch now
use `effect/Match` with runtime-preserving `Match.orElse` fallbacks.

View file

@ -313,11 +313,8 @@ const updateHandles = (
pushProducer: handles.pushProducer === undefined ? state.pushProducer : handles.pushProducer,
}));
const persistStateEffect = (
persistPath: string | null,
state: ConfigServiceState,
): Effect.Effect<void> =>
Effect.gen(function* () {
const persistStateEffect = Effect.fn("ConfigService.persistState")(
function* (persistPath: string | null, state: ConfigServiceState) {
if (persistPath === null) return;
const payload = {
version: state.version,
@ -332,35 +329,35 @@ const persistStateEffect = (
try: () => writeTextFile(persistPath, json),
catch: (cause) => configServiceError("persist-write", cause),
});
}).pipe(
Effect.catch((err) =>
Effect.logError("[ConfigService] Failed to persist config", {error: err.message}),
},
(effect) =>
effect.pipe(
Effect.catch((err) =>
Effect.logError("[ConfigService] Failed to persist config", {error: err.message}),
),
),
);
);
const pushConfigWithStateEffect = (
const pushConfigWithStateEffect = Effect.fn("ConfigService.pushConfigWithState")(function* (
state: ConfigServiceState,
): Effect.Effect<void, ConfigServiceError> =>
Effect.gen(function* () {
const pushProducer = state.pushProducer;
if (pushProducer === null) return;
) {
const pushProducer = state.pushProducer;
if (pushProducer === null) return;
yield* Effect.tryPromise({
try: () =>
pushProducer.send({
version: state.version,
config: configDumpForState(state),
}),
catch: (cause) => configServiceError("push-config", cause),
});
yield* Effect.log(`[ConfigService] Pushed configuration version ${state.version}`);
yield* Effect.tryPromise({
try: () =>
pushProducer.send({
version: state.version,
config: configDumpForState(state),
}),
catch: (cause) => configServiceError("push-config", cause),
});
const readPersistedConfigEffect = (
persistPath: string,
): Effect.Effect<PersistedConfig | null> =>
Effect.gen(function* () {
yield* Effect.log(`[ConfigService] Pushed configuration version ${state.version}`);
});
const readPersistedConfigEffect = Effect.fn("ConfigService.readPersistedConfig")(
function* (persistPath: string) {
const raw = yield* Effect.tryPromise({
try: () => readTextFile(persistPath),
catch: (cause) => configServiceError("persist-read", cause),
@ -368,11 +365,14 @@ const readPersistedConfigEffect = (
return yield* S.decodeUnknownEffect(PersistedConfigJsonSchema)(raw).pipe(
Effect.mapError((cause) => configServiceError("persist-decode", cause)),
);
}).pipe(
Effect.catch(() =>
Effect.log("[ConfigService] No persisted config found, starting fresh").pipe(
Effect.flatMap(() => Effect.succeed<PersistedConfig | null>(null)),
)
},
(effect) =>
effect.pipe(
Effect.catch(() =>
Effect.log("[ConfigService] No persisted config found, starting fresh").pipe(
Effect.flatMap(() => Effect.succeed<PersistedConfig | null>(null)),
)
),
),
);
@ -515,59 +515,57 @@ const applyDeleteStringKeys = (
};
};
const handlePutEffect = (
const handlePutEffect = Effect.fn("ConfigService.handlePut")(function* (
stateRef: SynchronizedRef.SynchronizedRef<ConfigServiceState>,
persistPath: string | null,
request: ConfigRequest,
): Effect.Effect<ConfigResponse, ConfigServiceError> =>
Effect.gen(function* () {
const values = configValues(request);
if (values.length === 0) return yield* configServiceError("put", "Put requires config values");
) {
const values = configValues(request);
if (values.length === 0) return yield* configServiceError("put", "Put requires config values");
const next = yield* SynchronizedRef.updateAndGet(stateRef, (state) => applyPut(state, values));
yield* persistStateEffect(persistPath, next);
yield* pushConfigWithStateEffect(next);
const next = yield* SynchronizedRef.updateAndGet(stateRef, (state) => applyPut(state, values));
yield* persistStateEffect(persistPath, next);
yield* pushConfigWithStateEffect(next);
return {version: next.version};
});
return {version: next.version};
});
const handleDeleteEffect = (
const handleDeleteEffect = Effect.fn("ConfigService.handleDelete")(function* (
stateRef: SynchronizedRef.SynchronizedRef<ConfigServiceState>,
persistPath: string | null,
request: ConfigRequest,
): Effect.Effect<ConfigResponse, ConfigServiceError> =>
Effect.gen(function* () {
const workspace = workspaceFor(request);
const keysByObject = objectKeys(request);
if (keysByObject.length > 0) {
const next = yield* SynchronizedRef.updateAndGet(
stateRef,
(state) => applyDeleteObjectKeys(state, workspace, keysByObject),
);
yield* persistStateEffect(persistPath, next);
yield* pushConfigWithStateEffect(next);
return {version: next.version};
}
const keys = stringKeys(request);
if (keys.length === 0) {
return yield* configServiceError("delete", "Delete requires at least one key");
}
const previous = yield* SynchronizedRef.get(stateRef);
if (getWorkspaceStore(previous, workspace) === undefined) {
return {version: previous.version};
}
) {
const workspace = workspaceFor(request);
const keysByObject = objectKeys(request);
if (keysByObject.length > 0) {
const next = yield* SynchronizedRef.updateAndGet(
stateRef,
(state) => applyDeleteStringKeys(state, workspace, keys),
(state) => applyDeleteObjectKeys(state, workspace, keysByObject),
);
yield* persistStateEffect(persistPath, next);
yield* pushConfigWithStateEffect(next);
return {version: next.version};
});
}
const keys = stringKeys(request);
if (keys.length === 0) {
return yield* configServiceError("delete", "Delete requires at least one key");
}
const previous = yield* SynchronizedRef.get(stateRef);
if (getWorkspaceStore(previous, workspace) === undefined) {
return {version: previous.version};
}
const next = yield* SynchronizedRef.updateAndGet(
stateRef,
(state) => applyDeleteStringKeys(state, workspace, keys),
);
yield* persistStateEffect(persistPath, next);
yield* pushConfigWithStateEffect(next);
return {version: next.version};
});
const handleListWithState = (
state: ConfigServiceState,
@ -639,118 +637,115 @@ const handleConfigDumpWithState = (
config: configDumpForState(state, workspaceFor(request)),
});
const closeConfigResourcesEffect = (
const closeConfigResourcesEffect = Effect.fn("ConfigService.closeResources")(function* (
stateRef: SynchronizedRef.SynchronizedRef<ConfigServiceState>,
): Effect.Effect<void, ConfigServiceError> =>
Effect.gen(function* () {
const state = yield* SynchronizedRef.get(stateRef);
) {
const state = yield* SynchronizedRef.get(stateRef);
const consumer = state.consumer;
if (consumer !== null) {
yield* Effect.tryPromise({
try: () => consumer.close(),
catch: (cause) => configServiceError("close-consumer", cause),
});
}
const responseProducer = state.responseProducer;
if (responseProducer !== null) {
yield* Effect.tryPromise({
try: () => responseProducer.close(),
catch: (cause) => configServiceError("close-response-producer", cause),
});
}
const pushProducer = state.pushProducer;
if (pushProducer !== null) {
yield* Effect.tryPromise({
try: () => pushProducer.close(),
catch: (cause) => configServiceError("close-push-producer", cause),
});
}
yield* updateHandles(stateRef, {
consumer: null,
responseProducer: null,
pushProducer: null,
});
});
const consumeOnceEffect = (
service: ConfigService,
): Effect.Effect<void, ConfigServiceError> =>
Effect.gen(function* () {
const state = yield* SynchronizedRef.get(service.state);
const consumer = state.consumer;
if (consumer === null) {
return yield* configServiceError("consume", "Config consumer not started");
}
const msg = yield* Effect.tryPromise({
try: () => consumer.receive(2000),
catch: (cause) => configServiceError("consume-receive", cause),
});
if (msg === null) return;
yield* service.handleMessageEffect(msg);
const consumer = state.consumer;
if (consumer !== null) {
yield* Effect.tryPromise({
try: () => consumer.acknowledge(msg),
catch: (cause) => configServiceError("consume-acknowledge", cause),
try: () => consumer.close(),
catch: (cause) => configServiceError("close-consumer", cause),
});
});
}
const responseProducer = state.responseProducer;
if (responseProducer !== null) {
yield* Effect.tryPromise({
try: () => responseProducer.close(),
catch: (cause) => configServiceError("close-response-producer", cause),
});
}
const pushProducer = state.pushProducer;
if (pushProducer !== null) {
yield* Effect.tryPromise({
try: () => pushProducer.close(),
catch: (cause) => configServiceError("close-push-producer", cause),
});
}
const runConfigServiceEffect = (
yield* updateHandles(stateRef, {
consumer: null,
responseProducer: null,
pushProducer: null,
});
});
const consumeOnceEffect = Effect.fnUntraced(function* (
service: ConfigService,
): Effect.Effect<void, ConfigServiceError> =>
Effect.gen(function* () {
yield* service.loadFromDiskEffect;
) {
const state = yield* SynchronizedRef.get(service.state);
const consumer = state.consumer;
if (consumer === null) {
return yield* configServiceError("consume", "Config consumer not started");
}
const responseProducer = yield* Effect.tryPromise({
try: () =>
service.pubsub.createProducer<ConfigResponse>({
topic: topics.configResponse,
schema: ConfigResponseSchema,
}),
catch: (cause) => configServiceError("response-producer", cause),
});
yield* updateHandles(service.state, {responseProducer});
const pushProducer = yield* Effect.tryPromise({
try: () =>
service.pubsub.createProducer<ConfigPush>({
topic: topics.configPush,
schema: ConfigPushSchema,
}),
catch: (cause) => configServiceError("push-producer", cause),
});
yield* updateHandles(service.state, {pushProducer});
const consumer = yield* Effect.tryPromise({
try: () =>
service.pubsub.createConsumer<ConfigRequest>({
topic: topics.configRequest,
subscription: `${service.config.id}-config-request`,
schema: ConfigRequestSchema,
}),
catch: (cause) => configServiceError("consumer", cause),
});
const state = yield* updateHandles(service.state, {consumer});
yield* pushConfigWithStateEffect(state);
yield* Effect.log(`[ConfigService] Listening on ${topics.configRequest}`);
yield* Effect.whileLoop({
while: () => service.running,
body: () =>
consumeOnceEffect(service).pipe(
Effect.catch((err) => {
if (!service.running) return Effect.void;
return Effect.logError("[ConfigService] Error in consume loop", {error: err.message}).pipe(
Effect.flatMap(() => Effect.sleep(Duration.millis(1000))),
);
}),
),
step: () => undefined,
});
const msg = yield* Effect.tryPromise({
try: () => consumer.receive(2000),
catch: (cause) => configServiceError("consume-receive", cause),
});
if (msg === null) return;
yield* service.handleMessageEffect(msg);
yield* Effect.tryPromise({
try: () => consumer.acknowledge(msg),
catch: (cause) => configServiceError("consume-acknowledge", cause),
});
});
const runConfigServiceEffect = Effect.fn("ConfigService.run")(function* (
service: ConfigService,
) {
yield* service.loadFromDiskEffect;
const responseProducer = yield* Effect.tryPromise({
try: () =>
service.pubsub.createProducer<ConfigResponse>({
topic: topics.configResponse,
schema: ConfigResponseSchema,
}),
catch: (cause) => configServiceError("response-producer", cause),
});
yield* updateHandles(service.state, {responseProducer});
const pushProducer = yield* Effect.tryPromise({
try: () =>
service.pubsub.createProducer<ConfigPush>({
topic: topics.configPush,
schema: ConfigPushSchema,
}),
catch: (cause) => configServiceError("push-producer", cause),
});
yield* updateHandles(service.state, {pushProducer});
const consumer = yield* Effect.tryPromise({
try: () =>
service.pubsub.createConsumer<ConfigRequest>({
topic: topics.configRequest,
subscription: `${service.config.id}-config-request`,
schema: ConfigRequestSchema,
}),
catch: (cause) => configServiceError("consumer", cause),
});
const state = yield* updateHandles(service.state, {consumer});
yield* pushConfigWithStateEffect(state);
yield* Effect.log(`[ConfigService] Listening on ${topics.configRequest}`);
yield* Effect.whileLoop({
while: () => service.running,
body: () =>
consumeOnceEffect(service).pipe(
Effect.catch((err) => {
if (!service.running) return Effect.void;
return Effect.logError("[ConfigService] Error in consume loop", {error: err.message}).pipe(
Effect.flatMap(() => Effect.sleep(Duration.millis(1000))),
);
}),
),
step: () => undefined,
});
});
export function makeConfigService(config: ConfigServiceConfig): ConfigService {
const state = SynchronizedRef.makeUnsafe(initialState());