diff --git a/ts/EFFECT_NATIVE_REWRITE_AUDIT.md b/ts/EFFECT_NATIVE_REWRITE_AUDIT.md index 7a93d62d..d0465403 100644 --- a/ts/EFFECT_NATIVE_REWRITE_AUDIT.md +++ b/ts/EFFECT_NATIVE_REWRITE_AUDIT.md @@ -12,18 +12,20 @@ Verified source roots: - Effect v4 subtree: `/home/elpresidank/YeeBois/projects/beep-effect2/.repos/effect-v4` - Installed Effect beta used by this workspace: `ts/node_modules/effect` -Current signal counts from `ts/packages` after the 2026-06-02 Effect AI -adapter and native request/response PubSub slices: +Current signal counts from `ts/packages` after the 2026-06-02 dispatcher +Effect collections slice: | Signal | Count | | --- | ---: | -| `Effect.runPromise` | 169 | +| `Effect.runPromise` | 175 | | `Effect.runPromiseWith` | 0 | | `Effect.cached` | 0 | -| `Layer.succeed` | 12 | -| `Map<` | 37 | +| `Layer.succeed` | 13 | +| `Map<` | 86 | | `WebSocket` | 72 | -| `new Map` | 59 | +| `new Map` | 56 | +| `new Set` | 15 | +| `Set<` | 9 | | `toPromiseRequestor` | 0 | | `makeAsyncProcessor` | 19 | | `receive(` | 17 | @@ -31,7 +33,7 @@ adapter and native request/response PubSub slices: | `new Error` | 7 | | `new Promise` | 9 | | `JSON.parse` | 4 | -| `localStorage` | 11 | +| `localStorage` | 9 | | `JSON.stringify` | 8 | | `setTimeout` | 3 | | `process.env` | 3 | @@ -45,9 +47,12 @@ Notes: - `Effect.runPromise` is expected at external Promise compatibility boundaries, but each match should still be audited for avoidable internal runtime ownership. -- The `Map<` and `new Map` counts increased in this snapshot because the - Librarian slice introduced explicit ref-backed state types and clone helpers - while removing the service object's direct mutable maps/handles. +- The dispatcher Effect collections slice removed native `Map`/`Set` from the + gateway service registries, streaming membership set, and scoped requestor + cache. Remaining broad `Map`/`Set` matches include tests/fakes, WeakMap + compatibility caches, short-lived pure traversal collections, and larger + ref-backed service state that still needs focused `HashMap`/`MutableHashMap` + cleanup. - The `Effect.runPromise` and `WebSocket` counts dropped in this snapshot because `EffectRpcClient` now owns its RPC/socket layer with `ManagedRuntime` and uses Effect's WebSocket constructor layer. @@ -260,6 +265,29 @@ Notes: - `bun run --cwd ts/packages/flow build` - `bun run --cwd ts check:tsgo` +### 2026-06-02: Gateway Dispatcher Effect Collections Slice + +- Status: migrated and package-verified. +- Completed: + - `ts/packages/flow/src/gateway/dispatch/manager.ts` now stores the + flow/global service registries in `effect/HashMap` instead of native + `ReadonlyMap`, while explicit entry arrays preserve the public service-name + ordering. + - Streaming service membership now uses `effect/HashSet` instead of native + `Set`. + - The scoped requestor cache now stores + `HashMap>` in the existing + `SynchronizedRef`, replacing `new Map` cloning with immutable + `HashMap.set`. + - Cache hits and service topic lookups now use `HashMap.get` plus + `effect/Option`, and ref update tuples use `effect/Tuple.make` instead of + `as const` assertions. + - Gateway dispatcher tests now cover concurrent same-key dispatches so the + cache still creates exactly one scoped producer/consumer pair. +- Verification: + - `bun run --cwd ts/packages/flow test -- src/__tests__/gateway-dispatcher.test.ts` + - `cd ts && bun run check:tsgo` + ### 2026-06-02: Strict Base, CLI, MCP, And tsgo Slice - Status: migrated, root-verified, committed, and pushed. @@ -1720,12 +1748,13 @@ Notes: broker receive/error payload boundaries remain numeric milliseconds. - Qdrant graph/doc known-collection caches now use `MutableHashSet`. Short-lived local traversal sets remain no-ops. + - Gateway dispatcher static service registries, streaming membership, and + scoped requestor cache now use Effect `HashMap`/`HashSet`. - FlowManager and sibling service `() => Effect.gen(...)` factories remain a broad mechanical `Effect.fn` / `Effect.fnUntraced` cleanup, best handled after Duration and small collection slices. - Long-lived `Map` / `Set` state in ref-backed services can move toward - Effect collections later; static lookup tables and local pure traversal - maps/sets remain no-ops. + Effect collections later; local pure traversal maps/sets remain no-ops. ## Ranked Findings diff --git a/ts/packages/flow/src/__tests__/gateway-dispatcher.test.ts b/ts/packages/flow/src/__tests__/gateway-dispatcher.test.ts index 454dfd63..7e326ed7 100644 --- a/ts/packages/flow/src/__tests__/gateway-dispatcher.test.ts +++ b/ts/packages/flow/src/__tests__/gateway-dispatcher.test.ts @@ -244,6 +244,27 @@ describe("gateway dispatcher manager", () => { expect(backend.closeCount).toBe(0); }); + it("serializes concurrent requestor creation for the same service", async () => { + const backend = new DispatchBackend(); + const manager = makeDispatcherManager({ + port: 0, + metricsPort: 0, + pubsub: backend, + }); + + await manager.start(); + const [first, second] = await Promise.all([ + manager.dispatchGlobalService("config", { operation: "get" }), + manager.dispatchGlobalService("config", { operation: "list" }), + ]); + await manager.stop(); + + expect(first).toEqual({ ok: true, echo: { operation: "get" } }); + expect(second).toEqual({ ok: true, echo: { operation: "list" } }); + expect(backend.producerOptions.filter((options) => options.topic === "tg.flow.config-request")).toHaveLength(1); + expect(backend.consumerOptions.filter((options) => options.topic === "tg.flow.config-response")).toHaveLength(1); + }); + it("does not start requestors when request serialization fails", async () => { const backend = new DispatchBackend(); const manager = makeDispatcherManager({ diff --git a/ts/packages/flow/src/gateway/dispatch/manager.ts b/ts/packages/flow/src/gateway/dispatch/manager.ts index e31339b0..9b6aee83 100644 --- a/ts/packages/flow/src/gateway/dispatch/manager.ts +++ b/ts/packages/flow/src/gateway/dispatch/manager.ts @@ -8,7 +8,7 @@ * Python reference: trustgraph-flow/trustgraph/gateway/dispatch/manager.py */ -import { Clock, Effect, Exit, Random, Scope, SynchronizedRef } from "effect"; +import { Clock, Effect, Exit, HashMap, HashSet, Option, Random, Scope, SynchronizedRef, Tuple } from "effect"; import { loadMessagingRuntimeConfig, makeNatsBackend, @@ -51,36 +51,45 @@ export type DispatcherStreamError = * These are resolved within a specific flow's interface definitions. * Topic pattern: tg.flow.-request / tg.flow.-response */ -const FLOW_SERVICES: ReadonlyMap = new Map([ - ["agent", { request: "agent-request", response: "agent-response" }], - ["text-completion", { request: "text-completion-request", response: "text-completion-response" }], - ["prompt", { request: "prompt-request", response: "prompt-response" }], - ["graph-rag", { request: "graph-rag-request", response: "graph-rag-response" }], - ["document-rag", { request: "document-rag-request", response: "document-rag-response" }], - ["embeddings", { request: "embeddings-request", response: "embeddings-response" }], - ["graph-embeddings", { request: "graph-embeddings-request", response: "graph-embeddings-response" }], - ["document-embeddings", { request: "doc-embeddings-request", response: "doc-embeddings-response" }], - ["triples", { request: "triples-request", response: "triples-response" }], - ["mcp-tool", { request: "mcp-tool-request", response: "mcp-tool-response" }], -]); +interface ServiceTopics { + readonly request: string; + readonly response: string; +} + +const FLOW_SERVICE_ENTRIES: ReadonlyArray = [ + ["agent", { request: "agent-request", response: "agent-response" }], + ["text-completion", { request: "text-completion-request", response: "text-completion-response" }], + ["prompt", { request: "prompt-request", response: "prompt-response" }], + ["graph-rag", { request: "graph-rag-request", response: "graph-rag-response" }], + ["document-rag", { request: "document-rag-request", response: "document-rag-response" }], + ["embeddings", { request: "embeddings-request", response: "embeddings-response" }], + ["graph-embeddings", { request: "graph-embeddings-request", response: "graph-embeddings-response" }], + ["document-embeddings", { request: "doc-embeddings-request", response: "doc-embeddings-response" }], + ["triples", { request: "triples-request", response: "triples-response" }], + ["mcp-tool", { request: "mcp-tool-request", response: "mcp-tool-response" }], +]; + +const FLOW_SERVICES: HashMap.HashMap = HashMap.fromIterable(FLOW_SERVICE_ENTRIES); /** * Global services (not flow-scoped). * These always use fixed topics regardless of which flow is active. */ -const GLOBAL_SERVICES: ReadonlyMap = new Map([ - ["config", { request: "config-request", response: "config-response" }], - ["flow", { request: "flow-request", response: "flow-response" }], - ["librarian", { request: "librarian-request", response: "librarian-response" }], - ["knowledge", { request: "knowledge-request", response: "knowledge-response" }], - ["collection-management", { request: "collection-management-request", response: "collection-management-response" }], -]); +const GLOBAL_SERVICE_ENTRIES: ReadonlyArray = [ + ["config", { request: "config-request", response: "config-response" }], + ["flow", { request: "flow-request", response: "flow-response" }], + ["librarian", { request: "librarian-request", response: "librarian-response" }], + ["knowledge", { request: "knowledge-request", response: "knowledge-response" }], + ["collection-management", { request: "collection-management-request", response: "collection-management-response" }], +]; + +const GLOBAL_SERVICES: HashMap.HashMap = HashMap.fromIterable(GLOBAL_SERVICE_ENTRIES); /** * Services that support streaming responses (multiple messages per request). * The completion flag is determined by checking for end-of-stream markers. */ -const STREAMING_SERVICES = new Set([ +const STREAMING_SERVICES = HashSet.make( "agent", "text-completion", "graph-rag", @@ -88,7 +97,7 @@ const STREAMING_SERVICES = new Set([ "triples", "knowledge", "librarian", -]); +); function topicName(name: string): string { return `tg.flow.${name}`; @@ -138,15 +147,15 @@ export interface DispatcherManager { } export const dispatcherManagerFlowServiceNames = (): readonly string[] => [ - ...FLOW_SERVICES.keys(), + ...FLOW_SERVICE_ENTRIES.map(([name]) => name), ]; export const dispatcherManagerGlobalServiceNames = (): readonly string[] => [ - ...GLOBAL_SERVICES.keys(), + ...GLOBAL_SERVICE_ENTRIES.map(([name]) => name), ]; export const dispatcherManagerIsStreamingService = (kind: string): boolean => - STREAMING_SERVICES.has(kind); + HashSet.has(STREAMING_SERVICES, kind); export const dispatcherManagerIsCompleteResponse = (response: unknown): boolean => { if (typeof response !== "object" || response === null) return true; @@ -164,7 +173,7 @@ export const dispatcherManagerIsCompleteResponse = (response: unknown): boolean ); }; -type RequestorMap = Map>; +type RequestorMap = HashMap.HashMap>; interface DispatcherRuntime { readonly scope: Scope.Closeable; @@ -191,7 +200,9 @@ export function makeDispatcherManager(config: GatewayConfig): DispatcherManager ) ), ); - const requestors = yield* SynchronizedRef.make(new Map()); + const requestors = yield* SynchronizedRef.make( + HashMap.empty>(), + ); return { scope, requestors, @@ -246,60 +257,49 @@ export function makeDispatcherManager(config: GatewayConfig): DispatcherManager ) { const current = yield* ensureRuntimeEffect(); - return yield* SynchronizedRef.modifyEffect(current.requestors, (requestors) => { - const cached = requestors.get(key); - if (cached !== undefined) { - return Effect.succeed([cached, requestors] as const); - } - - return current.factory.make({ - requestTopic, - responseTopic, - subscription: `gateway-${key}`, - }).pipe( - Scope.provide(current.scope), - Effect.map((requestor) => { - const next = new Map(requestors); - next.set(key, requestor); - return [requestor, next] as const; - }), - ); - }); + return yield* SynchronizedRef.modifyEffect(current.requestors, (requestors) => + Option.match(HashMap.get(requestors, key), { + onNone: () => + current.factory.make({ + requestTopic, + responseTopic, + subscription: `gateway-${key}`, + }).pipe( + Scope.provide(current.scope), + Effect.map((requestor) => Tuple.make(requestor, HashMap.set(requestors, key, requestor))), + ), + onSome: (cached) => Effect.succeed(Tuple.make(cached, requestors)), + }) + ); }); const resolveGlobalTopics = ( kind: string, - ): { requestTopic: string; responseTopic: string } => { - const entry = GLOBAL_SERVICES.get(kind); - if (entry !== undefined) { - return { + ): { requestTopic: string; responseTopic: string } => + Option.match(HashMap.get(GLOBAL_SERVICES, kind), { + onNone: () => ({ + requestTopic: topicName(`${kind}-request`), + responseTopic: topicName(`${kind}-response`), + }), + onSome: (entry) => ({ requestTopic: topicName(entry.request), responseTopic: topicName(entry.response), - }; - } - // Fallback: derive from kind name directly - return { - requestTopic: topicName(`${kind}-request`), - responseTopic: topicName(`${kind}-response`), - }; - }; + }), + }); const resolveFlowTopics = ( kind: string, - ): { requestTopic: string; responseTopic: string } => { - const entry = FLOW_SERVICES.get(kind); - if (entry !== undefined) { - return { + ): { requestTopic: string; responseTopic: string } => + Option.match(HashMap.get(FLOW_SERVICES, kind), { + onNone: () => ({ + requestTopic: topicName(`${kind}-request`), + responseTopic: topicName(`${kind}-response`), + }), + onSome: (entry) => ({ requestTopic: topicName(entry.request), responseTopic: topicName(entry.response), - }; - } - // Fallback: derive from kind name directly - return { - requestTopic: topicName(`${kind}-request`), - responseTopic: topicName(`${kind}-response`), - }; - }; + }), + }); // ---------- Global service dispatch ----------