diff --git a/ts/EFFECT_NATIVE_REWRITE_AUDIT.md b/ts/EFFECT_NATIVE_REWRITE_AUDIT.md index 4dc8c889..d261ac5f 100644 --- a/ts/EFFECT_NATIVE_REWRITE_AUDIT.md +++ b/ts/EFFECT_NATIVE_REWRITE_AUDIT.md @@ -12,8 +12,8 @@ Verified source roots: - Effect v4 subtree: `/home/elpresidank/YeeBois/projects/beep-effect2/.repos/effect-v4` - Installed Effect beta used by this workspace: `ts/node_modules/effect` -Current signal counts from `ts/packages` after the 2026-06-02 text completion -provider stream helper slice: +Current signal counts from `ts/packages` after the 2026-06-02 request-response +queue stream slice: | Signal | Count | | --- | ---: | @@ -27,7 +27,7 @@ provider stream helper slice: | `toPromiseRequestor` | 0 | | `makeAsyncProcessor` | 19 | | `receive(` | 17 | -| `while (` | 3 | +| `while (` | 2 | | `new Error` | 8 | | `new Promise` | 10 | | `JSON.parse` | 4 | @@ -129,6 +129,11 @@ Notes: text-completion `iterator.next` match is the `toAsyncGenerator` compatibility adapter that exposes Effect streams through the public `AsyncGenerator` provider contract. +- The request-response queue stream slice replaced the Effectful + `waitForResponse` generator loop with `Stream.fromQueue`, + `Stream.filterMapEffect`, `Result`, and `Stream.runHead`, dropping the + remaining `while (` count from 3 to 2. The two remaining production `while` + hits are synchronous parsing/CLI traversal loops, not async polling loops. - `Record` and `throwLibrarianServiceError` are now clean in `ts/packages`. @@ -931,6 +936,32 @@ Notes: - `cd ts && bun run test` - `git diff --check` +### 2026-06-02: Request-Response Queue Stream Slice + +- Status: migrated and root-verified. +- Completed: + - `ts/packages/base/src/messaging/runtime.ts` now waits for accepted + request-response replies by converting the response `Queue` to a + `Stream.fromQueue`. + - Recipient filtering now uses `Stream.filterMapEffect` with `Result` to skip + partial responses until the recipient returns `true`. + - `Stream.runHead` replaces the prior `while (true)`/`Queue.take` loop and + preserves the existing timeout behavior around request-response calls. + - `ts/packages/base/src/__tests__/messaging-runtime.test.ts` now covers + recipient filtering across partial and final responses. +- Remaining: + - The two remaining production `while (` matches are `agent/react/parser.ts` + line-buffer parsing and `cli/src/commands/util.ts` Commander parent + traversal; neither is async polling or resource ownership. +- Verification: + - `bunx --bun vitest run src/__tests__/messaging-runtime.test.ts` + - `bun run --cwd ts/packages/base build` + - `bun run --cwd ts/packages/base test` + - `cd ts && bun run check` + - `cd ts && bun run build` + - `cd ts && bun run test` + - `git diff --check` + ## Subagent Findings To Preserve - MCP/workbench: @@ -1073,6 +1104,9 @@ Do not flag these as rewrite blockers without additional proof: `HttpApi.make` when they are required or idiomatic for the Effect API. - Plain `Map` usage for local pure transformations, such as graph utility construction, unless the state is long-lived mutable service state. +- Plain synchronous loops for parsing or tree traversal are not Effect migration + blockers unless they hide async polling, resource ownership, or callback + scheduling. - JSON stringification in tests or wire-contract fixtures. Production JSON encode/decode should prefer schema codecs when the encoded form can be preserved. diff --git a/ts/packages/base/src/__tests__/messaging-runtime.test.ts b/ts/packages/base/src/__tests__/messaging-runtime.test.ts index baf02f85..2868e8f3 100644 --- a/ts/packages/base/src/__tests__/messaging-runtime.test.ts +++ b/ts/packages/base/src/__tests__/messaging-runtime.test.ts @@ -215,6 +215,53 @@ describe("Effect-native messaging runtime", () => { }), ); + it.effect( + "waits until the request recipient accepts a response", + Effect.fnUntraced(function* () { + const responseConsumer = new ScriptedConsumer(); + const backend = new RuntimeBackend( + responseConsumer, + (_message, properties) => { + const id = properties?.id ?? ""; + responseConsumer.push(createMessage("partial", { id })); + responseConsumer.push(createMessage("final", { id })); + }, + ); + const seen: Array = []; + + const response = yield* Effect.scoped( + Effect.gen(function* () { + const requestor = yield* makeEffectRequestResponseFromPubSub( + PubSub.fromBackend(backend), + { + ...defaultMessagingRuntimeConfig, + consumerReceiveTimeoutMs: 1, + }, + { + requestTopic: "tg.test.request", + responseTopic: "tg.test.response", + subscription: "sub", + }, + ); + const fiber = yield* requestor.request("request", { + timeoutMs: 250, + recipient: (candidate) => + Effect.sync(() => { + seen.push(candidate); + return candidate === "final"; + }), + }).pipe(Effect.forkChild); + yield* TestClock.adjust(Duration.millis(5)); + return yield* Fiber.join(fiber); + }), + ); + + expect(response).toBe("final"); + expect(seen).toEqual(["partial", "final"]); + expect(responseConsumer.acknowledged.length).toBe(2); + }), + ); + it.effect( "fails request-response calls with a typed timeout", Effect.fnUntraced(function* () { diff --git a/ts/packages/base/src/messaging/runtime.ts b/ts/packages/base/src/messaging/runtime.ts index ea2bb090..5ffbee5d 100644 --- a/ts/packages/base/src/messaging/runtime.ts +++ b/ts/packages/base/src/messaging/runtime.ts @@ -3,7 +3,7 @@ */ import { randomUUID } from "node:crypto"; -import { Context, Duration, Effect, Fiber, Layer, Queue, Scope } from "effect"; +import { Context, Duration, Effect, Fiber, Layer, Queue, Result, Scope, Stream } from "effect"; import * as O from "effect/Option"; import * as S from "effect/Schema"; import type { @@ -403,17 +403,27 @@ const waitForResponse = Effect.fn("waitForResponse")(function* ( queue: Queue.Queue, options: EffectRequestOptions | undefined, ) { - while (true) { - const response = yield* Queue.take(queue); - if (options?.recipient === undefined) { - return response; - } + const response = yield* Stream.fromQueue(queue).pipe( + Stream.filterMapEffect((candidate) => { + if (options?.recipient === undefined) { + return Effect.succeed(Result.succeed(candidate)); + } - const complete = yield* options.recipient(response); - if (complete) { - return response; - } - } + return options.recipient(candidate).pipe( + Effect.map((complete) => + complete + ? Result.succeed(candidate) + : Result.fail(undefined) + ), + ); + }), + Stream.runHead, + ); + + return yield* O.match(response, { + onNone: () => Effect.never, + onSome: Effect.succeed, + }); }); export const makeEffectRequestResponseFromPubSub = Effect.fn("makeEffectRequestResponseFromPubSub")(function* <