Migrate flow manager to ref-backed Effect state

This commit is contained in:
elpresidank 2026-06-02 01:26:40 -05:00
parent ba64fc5add
commit 3809a38c46
3 changed files with 997 additions and 732 deletions

View file

@ -13,18 +13,18 @@ Verified source roots:
- Installed Effect beta used by this workspace: `ts/node_modules/effect` - Installed Effect beta used by this workspace: `ts/node_modules/effect`
Current signal counts from `ts/packages` after the 2026-06-02 Current signal counts from `ts/packages` after the 2026-06-02
flow-manager/librarian runtime normalization slice: FlowManager ref-backed state slice:
| Signal | Count | | Signal | Count |
| --- | ---: | | --- | ---: |
| `Effect.runPromise` | 198 | | `Effect.runPromise` | 204 |
| `Map<` | 71 | | `Map<` | 75 |
| `WebSocket` | 47 | | `WebSocket` | 47 |
| `new Map` | 53 | | `new Map` | 56 |
| `toPromiseRequestor` | 0 | | `toPromiseRequestor` | 0 |
| `makeAsyncProcessor` | 19 | | `makeAsyncProcessor` | 19 |
| `receive(` | 18 | | `receive(` | 18 |
| `while (` | 11 | | `while (` | 10 |
| `new Error` | 14 | | `new Error` | 14 |
| `new Promise` | 10 | | `new Promise` | 10 |
| `JSON.parse` | 7 | | `JSON.parse` | 7 |
@ -42,6 +42,10 @@ Notes:
- `Effect.runPromise` is expected at external Promise compatibility - `Effect.runPromise` is expected at external Promise compatibility
boundaries, but each match should still be audited for avoidable internal boundaries, but each match should still be audited for avoidable internal
runtime ownership. runtime ownership.
- The `Effect.runPromise`, `Map<`, and `new Map` counts increased in this
snapshot because the FlowManager slice added focused service tests and
Promise compatibility facades while removing the service's internal mutable
object state.
## Loop Passes ## Loop Passes
@ -215,6 +219,37 @@ Notes:
- `cd ts && bun run test` - `cd ts && bun run test`
- `git diff --check` - `git diff --check`
### 2026-06-02: FlowManager Ref-Backed State Slice
- Status: migrated and root-verified.
- Completed:
- `ts/packages/flow/src/flow-manager/service.ts` now exposes a typed
`FlowManagerService` instead of `AsyncProcessorRuntime & Record<string,
any>`.
- Runtime state now lives in
`SynchronizedRef<FlowManagerServiceState>` with `flows`, `blueprints`, the
request consumer, response producer, and config request client.
- Flow operations now have Effect-returning handlers with Promise facades
only on exported compatibility methods.
- Blueprint config loading now narrows runtime values before constructing
`Blueprint` records, replacing the prior `parsed as Blueprint` shortcut.
- `start-flow` and `stop-flow` mutate the flow map through
`SynchronizedRef.modifyEffect`, making duplicate checks and map updates
atomic.
- The consume loop now uses `Effect.whileLoop`; the remaining
`consumer.receive(2000)` call is a pubsub boundary for this service.
- New flow-manager tests cover tagged errors, ref-backed flow mutation,
config push/delete requests, blueprint narrowing, duplicate concurrent
starts, and message-level flow-error responses.
- Verification:
- `bun run --cwd ts/packages/flow test -- src/__tests__/flow-manager-service.test.ts`
- `bun run --cwd ts/packages/flow build`
- `bun run --cwd ts/packages/flow test`
- `cd ts && bun run check`
- `cd ts && bun run build`
- `cd ts && bun run test`
- `git diff --check`
## Subagent Findings To Preserve ## Subagent Findings To Preserve
- MCP/workbench: - MCP/workbench:
@ -224,13 +259,12 @@ Notes:
the client API is less Promise-first. the client API is less Promise-first.
- MCP env is now Config-backed; continue that policy for future MCP settings. - MCP env is now Config-backed; continue that policy for future MCP settings.
- Flow stateful services: - Flow stateful services:
- Config service and KnowledgeCore service ref-backed state are complete. - Config service, KnowledgeCore service, and FlowManager ref-backed state
Librarian and flow-manager now have native Effect module startup are complete. Librarian now has native Effect module startup
(`NodeRuntime.runMain` with `ManagedRuntime` compatibility facades), but (`NodeRuntime.runMain` with a `ManagedRuntime` compatibility facade), but
they still have mutable poller service objects. These remain good it still has a mutable poller service object. It remains a good candidate
candidates for `Context` services, scoped layers, for `Context` services, scoped layers, `Ref`/`SynchronizedRef`,
`Ref`/`SynchronizedRef`, `Schedule`, and managed `Schedule`, and managed persistence.
persistence.
- Persistence IO should move toward `FileSystem` or `KeyValueStore` where - Persistence IO should move toward `FileSystem` or `KeyValueStore` where
the installed beta has the needed provider surface. the installed beta has the needed provider surface.
- Base messaging/processors: - Base messaging/processors:
@ -256,11 +290,10 @@ Notes:
## Ranked Findings ## Ranked Findings
### P0: Continue Stateful Flow Services To Scoped Effect Services ### P0: Migrate Librarian Stateful Service To Scoped Effect Service
- TrustGraph evidence: - TrustGraph evidence:
- `ts/packages/flow/src/librarian/service.ts` - `ts/packages/flow/src/librarian/service.ts`
- `ts/packages/flow/src/flow-manager/service.ts`
- Effect primitives: - Effect primitives:
- `Context`, `Layer.scoped`, `Ref`, `SynchronizedRef`, `Schedule`, - `Context`, `Layer.scoped`, `Ref`, `SynchronizedRef`, `Schedule`,
`Effect.addFinalizer`, `Config`, `Schema`, `FileSystem`, `Effect.addFinalizer`, `Config`, `Schema`, `FileSystem`,

View file

@ -0,0 +1,231 @@
import {Effect, SynchronizedRef} from "effect";
import {describe, expect, it} from "vitest";
import {
topics,
type BackendConsumer,
type BackendProducer,
type ConfigRequest,
type ConfigResponse,
type CreateConsumerOptions,
type CreateProducerOptions,
type FlowRequest,
type FlowResponse,
type Message,
type PubSubBackend,
type RequestResponse,
} from "@trustgraph/base";
import {FlowManagerError, makeFlowManagerService} from "../flow-manager/service.js";
class NoopPubSub implements PubSubBackend {
readonly sentByTopic = new Map<string, Array<unknown>>();
async createProducer<T>(options: CreateProducerOptions<T>): Promise<BackendProducer<T>> {
return {
send: async (message) => {
const sent = this.sentByTopic.get(options.topic) ?? [];
sent.push(message);
this.sentByTopic.set(options.topic, sent);
},
flush: async () => undefined,
close: async () => undefined,
};
}
async createConsumer<T>(_options: CreateConsumerOptions): Promise<BackendConsumer<T>> {
return {
receive: async () => null,
acknowledge: async () => undefined,
negativeAcknowledge: async () => undefined,
unsubscribe: async () => undefined,
close: async () => undefined,
};
}
async close(): Promise<void> {}
}
class RecordingConfigClient implements RequestResponse<ConfigRequest, ConfigResponse> {
readonly requests: Array<ConfigRequest> = [];
constructor(
private readonly blueprints: Array<{readonly key: string; readonly value: unknown}> = [],
private readonly flows: Array<{readonly key: string; readonly value: unknown}> = [],
private readonly legacyFlows: Array<{readonly key: string; readonly value: unknown}> = [],
) {}
async start(): Promise<void> {}
async stop(): Promise<void> {}
async request(request: ConfigRequest): Promise<ConfigResponse> {
this.requests.push(request);
if (request.operation !== "getvalues") return {};
if (request.type === "flow-blueprint") {
return {values: this.blueprints};
}
if (request.type === "flow") {
return {values: this.flows};
}
if (request.type === "flows") {
return {values: this.legacyFlows};
}
return {values: []};
}
}
const makeService = (backend: PubSubBackend = new NoopPubSub()) =>
makeFlowManagerService({
id: "flow-manager-test",
manageProcessSignals: false,
pubsub: backend,
});
const seedConfigClient = async (
service: ReturnType<typeof makeFlowManagerService>,
configClient: RecordingConfigClient,
) =>
Effect.runPromise(
SynchronizedRef.update(service.state, (state) => ({
...state,
configClient,
})),
);
const seedResponseProducer = async (
backend: NoopPubSub,
service: ReturnType<typeof makeFlowManagerService>,
) => {
const responseProducer = await backend.createProducer<FlowResponse>({
topic: topics.flowResponse,
});
await Effect.runPromise(
SynchronizedRef.update(service.state, (state) => ({
...state,
responseProducer,
})),
);
};
describe("FlowManagerService operations", () => {
it("uses tagged errors for invalid flow mutations", async () => {
const service = makeService();
const startError = await service.handleStartFlow({operation: "start-flow"})
.catch((caught: unknown) => caught);
const stopError = await service.handleStopFlow({operation: "stop-flow"})
.catch((caught: unknown) => caught);
expect(startError).toBeInstanceOf(FlowManagerError);
expect(startError).toMatchObject({_tag: "FlowManagerError", operation: "start-flow"});
expect(stopError).toBeInstanceOf(FlowManagerError);
expect(stopError).toMatchObject({_tag: "FlowManagerError", operation: "stop-flow"});
});
it("mutates flow state through the ref and pushes config updates", async () => {
const configClient = new RecordingConfigClient();
const service = makeService();
await seedConfigClient(service, configClient);
await service.handleStartFlow({
operation: "start-flow",
"flow-id": "flow-a",
description: "alpha",
parameters: {limit: 3},
});
let state = await Effect.runPromise(SynchronizedRef.get(service.state));
expect(state.flows.get("flow-a")).toMatchObject({
id: "flow-a",
blueprintName: "default",
description: "alpha",
parameters: {limit: 3},
status: "running",
});
await service.handleStopFlow({
operation: "stop-flow",
"flow-id": "flow-a",
});
state = await Effect.runPromise(SynchronizedRef.get(service.state));
expect(state.flows.has("flow-a")).toBe(false);
expect(configClient.requests.map((request) => ({
operation: request.operation,
keys: request.keys,
}))).toEqual([
{operation: "put", keys: ["flows"]},
{operation: "put", keys: ["flow"]},
{operation: "delete", keys: ["flows", "flow-a"]},
{operation: "delete", keys: ["flow", "flow-a"]},
{operation: "put", keys: ["flows"]},
{operation: "put", keys: ["flow"]},
]);
});
it("decodes valid blueprint config and skips invalid blueprint records", async () => {
const configClient = new RecordingConfigClient([
{
key: "custom",
value: "{\"description\":\"Custom\",\"topics\":{\"input\":\"topic.in\"},\"extra\":true}",
},
{
key: "broken",
value: "{\"description\":\"Missing topics\"}",
},
]);
const service = makeService();
await seedConfigClient(service, configClient);
await service.refreshBlueprintsFromConfig();
const state = await Effect.runPromise(SynchronizedRef.get(service.state));
expect(state.blueprints.get("custom")).toMatchObject({
description: "Custom",
topics: {input: "topic.in"},
extra: true,
});
expect(state.blueprints.has("broken")).toBe(false);
expect(state.blueprints.has("default")).toBe(true);
});
it("serializes duplicate starts through the ref-backed map", async () => {
const configClient = new RecordingConfigClient();
const service = makeService();
await seedConfigClient(service, configClient);
const results = await Promise.allSettled([
service.handleStartFlow({operation: "start-flow", "flow-id": "flow-a"}),
service.handleStartFlow({operation: "start-flow", "flow-id": "flow-a"}),
]);
const state = await Effect.runPromise(SynchronizedRef.get(service.state));
expect(state.flows.get("flow-a")).toMatchObject({id: "flow-a"});
expect(results.filter((result) => result.status === "fulfilled")).toHaveLength(1);
expect(results.filter((result) => result.status === "rejected")).toHaveLength(1);
});
it("sends flow-error responses from handleMessageEffect", async () => {
const backend = new NoopPubSub();
const configClient = new RecordingConfigClient();
const service = makeService(backend);
await seedConfigClient(service, configClient);
await seedResponseProducer(backend, service);
const message: Message<FlowRequest> = {
value: () => ({operation: "start-flow"}),
properties: () => ({id: "request-1"}),
};
await Effect.runPromise(service.handleMessageEffect(message));
expect(backend.sentByTopic.get(topics.flowResponse)).toEqual([
{
error: {
type: "flow-error",
message: "Missing flow-id",
},
},
]);
});
});

File diff suppressed because it is too large Load diff