diff --git a/ts/packages/flow/src/config/service.ts b/ts/packages/flow/src/config/service.ts index 5ac9174d..b673147e 100644 --- a/ts/packages/flow/src/config/service.ts +++ b/ts/packages/flow/src/config/service.ts @@ -706,6 +706,7 @@ const runConfigServiceEffect = Effect.fn("ConfigService.run")(function* ( topic: topics.configRequest, subscription: `${service.config.id}-config-request`, schema: ConfigRequestSchema, + initialPosition: "earliest", }).pipe( Effect.mapError((cause) => configServiceError("consumer", cause)), ); diff --git a/ts/packages/flow/src/cores/service.ts b/ts/packages/flow/src/cores/service.ts index b355a56c..44f722f8 100644 --- a/ts/packages/flow/src/cores/service.ts +++ b/ts/packages/flow/src/cores/service.ts @@ -364,6 +364,7 @@ const runKnowledgeCoreServiceEffect = Effect.fn("KnowledgeCoreService.run")(func topic: topics.knowledgeRequest, subscription: `${service.config.id}-knowledge-request`, schema: KnowledgeRequestSchema, + initialPosition: "earliest", }).pipe( Effect.mapError((cause) => knowledgeCoreServiceError("consumer", cause)), ); diff --git a/ts/packages/flow/src/flow-manager/service.ts b/ts/packages/flow/src/flow-manager/service.ts index 6187dd67..6ef5e5d7 100644 --- a/ts/packages/flow/src/flow-manager/service.ts +++ b/ts/packages/flow/src/flow-manager/service.ts @@ -703,6 +703,7 @@ const runFlowManagerServiceEffect = Effect.fn("FlowManager.runService")(function topic: topics.flowRequest, subscription: `${service.config.id}-flow-request`, schema: FlowRequestSchema, + initialPosition: "earliest", }).pipe( Effect.mapError((cause) => flowManagerError("consumer", cause)), ); diff --git a/ts/packages/flow/src/librarian/service.ts b/ts/packages/flow/src/librarian/service.ts index 27bcc2b7..8ff1a061 100644 --- a/ts/packages/flow/src/librarian/service.ts +++ b/ts/packages/flow/src/librarian/service.ts @@ -337,12 +337,14 @@ const runLibrarianServiceEffect = Effect.fn("LibrarianService.run")(function* ( const libConsumer = yield* service.pubsub.createConsumer({ topic: topics.librarianRequest, subscription: `${service.config.id}-librarian-request`, + initialPosition: "earliest", }).pipe( Effect.mapError((cause) => librarianServiceError("librarian-consumer", cause)), ); const colConsumer = yield* service.pubsub.createConsumer({ topic: topics.collectionManagementRequest, subscription: `${service.config.id}-collection-management-request`, + initialPosition: "earliest", }).pipe( Effect.mapError((cause) => librarianServiceError("collection-consumer", cause)), ); diff --git a/ts/scripts/test-pipeline.ts b/ts/scripts/test-pipeline.ts index 61256a2a..e61c471f 100644 --- a/ts/scripts/test-pipeline.ts +++ b/ts/scripts/test-pipeline.ts @@ -11,7 +11,7 @@ import { BunRuntime } from "@effect/platform-bun"; import * as BunHttpClient from "@effect/platform-bun/BunHttpClient"; -import { DispatchInput, makeEffectRpcClient } from "@trustgraph/client"; +import { DispatchInput, makeEffectRpcClient } from "../packages/client/src/index.js"; import { Config, Effect, Option as O, Schema as S } from "effect"; import { HttpClient, HttpClientRequest } from "effect/unstable/http";