From df0a0c068eec41d3d4b55a6d883653aadd05bb15 Mon Sep 17 00:00:00 2001 From: elpresidank Date: Tue, 2 Jun 2026 03:27:15 -0500 Subject: [PATCH] Remove legacy subscriber fanout --- ts/EFFECT_NATIVE_REWRITE_AUDIT.md | 49 ++++- ts/packages/base/src/messaging/index.ts | 1 - ts/packages/base/src/messaging/subscriber.ts | 211 ------------------- 3 files changed, 43 insertions(+), 218 deletions(-) delete mode 100644 ts/packages/base/src/messaging/subscriber.ts diff --git a/ts/EFFECT_NATIVE_REWRITE_AUDIT.md b/ts/EFFECT_NATIVE_REWRITE_AUDIT.md index 83ce30e2..95da8916 100644 --- a/ts/EFFECT_NATIVE_REWRITE_AUDIT.md +++ b/ts/EFFECT_NATIVE_REWRITE_AUDIT.md @@ -12,18 +12,18 @@ Verified source roots: - Effect v4 subtree: `/home/elpresidank/YeeBois/projects/beep-effect2/.repos/effect-v4` - Installed Effect beta used by this workspace: `ts/node_modules/effect` -Current signal counts from `ts/packages` after the 2026-06-02 Base -producer/requestor spec accessor slice: +Current signal counts from `ts/packages` after the 2026-06-02 native PubSub +boundary slice: | Signal | Count | | --- | ---: | -| `Effect.runPromise` | 168 | -| `Map<` | 84 | +| `Effect.runPromise` | 165 | +| `Map<` | 82 | | `WebSocket` | 62 | -| `new Map` | 62 | +| `new Map` | 60 | | `toPromiseRequestor` | 0 | | `makeAsyncProcessor` | 19 | -| `receive(` | 18 | +| `receive(` | 17 | | `while (` | 9 | | `new Error` | 8 | | `new Promise` | 10 | @@ -88,6 +88,11 @@ Notes: migrated flow service producer/requestor lookups off caller-chosen generic string calls. Spec object handles are scoped per `Flow` through WeakMaps and finalizers delete only the handle they registered. +- The native PubSub boundary slice removed the unused legacy + `messaging/subscriber.ts` async queue/fanout implementation. Effect's native + `PubSub` is an in-process hub and does not replace the broker-backed + `PubSubBackend`/NATS boundary, but it should be preferred for future + in-process broadcast/fanout needs. - `Record` and `throwLibrarianServiceError` are now clean in `ts/packages`. @@ -688,6 +693,31 @@ Notes: - `cd ts && bun run test` - `git diff --check` +### 2026-06-02: Native PubSub Boundary Slice + +- Status: migrated and package-verified. +- Completed: + - Confirmed Effect's native `PubSub` module is an in-process asynchronous hub + with scoped subscriptions, not a NATS/Pulsar-compatible broker boundary. + - Kept TrustGraph's `PubSubBackend` and `PubSub` service as the broker + adapter layer because it owns topics, broker producers/consumers, + acknowledgement, schema codecs, and backend lifecycle. + - Removed the unused legacy `ts/packages/base/src/messaging/subscriber.ts` + implementation, which duplicated in-process async queue/fanout behavior. + - Removed the corresponding `makeAsyncQueue`, `makeSubscriber`, + `Subscriber`, and `AsyncQueue` barrel exports from + `ts/packages/base/src/messaging/index.ts`. +- Remaining: + - Future in-process fanout or request-streaming code should use + `effect/PubSub`, `Queue`, `Stream.fromPubSub`, or `Channel.fromPubSub` + rather than adding another local async queue implementation. + - Do not replace `PubSubBackend` with `effect/PubSub` unless the code path is + explicitly local-only and does not need broker semantics. +- Verification: + - `bun run --cwd ts/packages/base build` + - `bun run --cwd ts/packages/base test` + - `cd ts && bun run check` + ## Subagent Findings To Preserve - MCP/workbench: @@ -712,6 +742,9 @@ Notes: - Subscriber queues/maps and dynamic flow state should continue moving toward `Queue`, `Deferred`, `SynchronizedRef`, `Schedule`, and scoped layers. + - The legacy `messaging/subscriber.ts` async queue/fanout implementation is + removed. Use native `effect/PubSub` for future in-process fanout, while + keeping `PubSubBackend` for broker-backed messaging. - Existing constructor shims preserve callable-plus-newable public exports; removing them needs a public API split or real class redesign. - Typed string registries in `Flow` now have Schema-backed parameter specs @@ -813,6 +846,10 @@ Do not flag these as rewrite blockers without additional proof: - Base `AsyncProcessor`, `Flow`, and `FlowProcessor` callable-plus-newable export assertions are compatibility boundaries unless the public constructor API is intentionally redesigned. +- TrustGraph `PubSubBackend` / backend `PubSub` service is a broker adapter + boundary for NATS/Pulsar-style topics, acknowledgement, schema codecs, and + backend lifecycle. Effect's native `PubSub` can replace in-process fanout + helpers, but not the distributed broker abstraction by itself. ## Acceptance For Final Loop Completion diff --git a/ts/packages/base/src/messaging/index.ts b/ts/packages/base/src/messaging/index.ts index 8cbef12e..68ced507 100644 --- a/ts/packages/base/src/messaging/index.ts +++ b/ts/packages/base/src/messaging/index.ts @@ -1,6 +1,5 @@ export { makeProducer, type Producer } from "./producer.js"; export { makeConsumer, type Consumer, type MessageHandler, type FlowContext, type ConsumerOptions } from "./consumer.js"; -export { makeAsyncQueue, makeSubscriber, type Subscriber, type AsyncQueue } from "./subscriber.js"; export { makeRequestResponse, type RequestResponse, type RequestResponseOptions } from "./request-response.js"; export { ConsumerFactory, diff --git a/ts/packages/base/src/messaging/subscriber.ts b/ts/packages/base/src/messaging/subscriber.ts deleted file mode 100644 index 8287d0ff..00000000 --- a/ts/packages/base/src/messaging/subscriber.ts +++ /dev/null @@ -1,211 +0,0 @@ -/** - * Fan-out subscriber: routes responses to waiting callers by request ID. - * - * Python reference: trustgraph-base/trustgraph/base/subscriber.py - */ - -import type { PubSubBackend, BackendConsumer } from "../backend/types.js"; -import { Duration, Effect, Fiber } from "effect"; -import { messagingDeliveryError, messagingLifecycleError, messagingTimeoutError } from "../errors.js"; - -type Resolver = { - queue: AsyncQueue; -}; - -/** - * Simple async queue for inter-task communication (replaces asyncio.Queue). - */ -export interface AsyncQueue { - readonly push: (item: T) => void; - readonly pop: (timeoutMs?: number) => Promise; - readonly length: number; -} - -export function makeAsyncQueue(): AsyncQueue { - const buffer: T[] = []; - const waiters: Array<(value: T) => void> = []; - - return { - push: (item) => { - const waiter = waiters.shift(); - if (waiter !== undefined) { - waiter(item); - } else { - buffer.push(item); - } - }, - pop: (timeoutMs) => { - const buffered = buffer.shift(); - if (buffered !== undefined) return Promise.resolve(buffered); - - const take = Effect.callback((resume) => { - const waiter = (value: T) => { - resume(Effect.succeed(value)); - }; - - waiters.push(waiter); - - return Effect.sync(() => { - const idx = waiters.indexOf(waiter); - if (idx !== -1) waiters.splice(idx, 1); - }); - }); - - return Effect.runPromise( - timeoutMs === undefined - ? take - : take.pipe( - Effect.timeout(Duration.millis(timeoutMs)), - Effect.catchTag("TimeoutError", () => - Effect.fail(messagingTimeoutError("queue.pop", timeoutMs)), - ), - ), - ); - }, - get length() { - return buffer.length; - }, - }; -} - -export interface Subscriber { - readonly start: () => Promise; - readonly stop: () => Promise; - readonly subscribe: (id: string) => AsyncQueue; - readonly subscribeAll: (id: string) => AsyncQueue; - readonly unsubscribe: (id: string) => void; - readonly unsubscribeAll: (id: string) => void; -} - -export function makeSubscriber( - pubsub: PubSubBackend, - topic: string, - subscription: string, -): Subscriber { - let backend: BackendConsumer | null = null; - let running = false; - let fiber: Fiber.Fiber | null = null; - - // ID-specific subscriptions (request/response correlation) - const idSubscribers = new Map>(); - // Wildcard subscribers (receive all messages) - const allSubscribers = new Map>(); - - const dispatchLoop = Effect.fn("Subscriber.dispatchLoop")(function* () { - let consecutiveErrors = 0; - const dispatchOnce = Effect.fn("Subscriber.dispatchOnce")(function* () { - const currentBackend = backend; - if (currentBackend === null) { - return yield* messagingLifecycleError( - `${topic}:${subscription}`, - "dispatch", - "Subscriber backend not started", - ); - } - - const msg = yield* Effect.tryPromise({ - try: () => currentBackend.receive(2000), - catch: (error) => messagingDeliveryError(topic, "receive", error), - }); - if (msg === null) return; - - consecutiveErrors = 0; - - const props = msg.properties(); - const id = props.id; - const value = msg.value(); - - // Route to ID-specific subscriber - if (id !== undefined && id.length > 0) { - const sub = idSubscribers.get(id); - if (sub !== undefined) { - sub.queue.push(value); - } - } - - // Broadcast to all-subscribers - for (const sub of allSubscribers.values()) { - sub.queue.push(value); - } - - yield* Effect.tryPromise({ - try: () => currentBackend.acknowledge(msg), - catch: (error) => messagingDeliveryError(topic, "acknowledge", error), - }); - }); - - yield* Effect.whileLoop({ - while: () => running, - body: () => - dispatchOnce().pipe( - Effect.catch((error) => { - if (!running) return Effect.void; - consecutiveErrors++; - const logEffect = consecutiveErrors <= 3 - ? Effect.logError("[Subscriber] Error", { error }) - : consecutiveErrors === 4 - ? Effect.logError("[Subscriber] Suppressing further errors (will retry with backoff)", { error }) - : Effect.void; - const delay = Math.min(1000 * 2 ** (consecutiveErrors - 1), 10_000); - return logEffect.pipe(Effect.flatMap(() => Effect.sleep(Duration.millis(delay)))); - }), - ), - step: () => undefined, - }); - }); - - return { - start: () => - Effect.runPromise( - Effect.gen(function* () { - backend = yield* Effect.tryPromise({ - try: () => - pubsub.createConsumer({ - topic, - subscription, - }), - catch: (error) => - messagingLifecycleError(`${topic}:${subscription}`, "create-consumer", error), - }); - running = true; - fiber = yield* dispatchLoop().pipe(Effect.forkDetach); - }), - ), - stop: () => - Effect.runPromise( - Effect.gen(function* () { - running = false; - const activeFiber = fiber; - fiber = null; - if (activeFiber !== null) { - yield* Fiber.interrupt(activeFiber); - } - const currentBackend = backend; - if (currentBackend !== null) { - backend = null; - yield* Effect.tryPromise({ - try: () => currentBackend.close(), - catch: (error) => - messagingLifecycleError(`${topic}:${subscription}`, "close-consumer", error), - }); - } - }), - ), - subscribe: (id) => { - const queue = makeAsyncQueue(); - idSubscribers.set(id, { queue }); - return queue; - }, - subscribeAll: (id) => { - const queue = makeAsyncQueue(); - allSubscribers.set(id, { queue }); - return queue; - }, - unsubscribe: (id) => { - idSubscribers.delete(id); - }, - unsubscribeAll: (id) => { - allSubscribers.delete(id); - }, - }; -}