diff --git a/ts/EFFECT_NATIVE_REWRITE_AUDIT.md b/ts/EFFECT_NATIVE_REWRITE_AUDIT.md index 6fcd7a73..a9ac57d8 100644 --- a/ts/EFFECT_NATIVE_REWRITE_AUDIT.md +++ b/ts/EFFECT_NATIVE_REWRITE_AUDIT.md @@ -12,22 +12,22 @@ Verified source roots: - Effect v4 subtree: `/home/elpresidank/YeeBois/projects/beep-effect2/.repos/effect-v4` - Installed Effect beta used by this workspace: `ts/node_modules/effect` -Current signal counts from `ts/packages` after the 2026-06-02 RAG and agent -requestor bridge slice: +Current signal counts from `ts/packages` after the 2026-06-02 KnowledgeCore +ref-backed state slice: | Signal | Count | | --- | ---: | -| `Effect.runPromise` | 198 | -| `Map<` | 65 | +| `Effect.runPromise` | 200 | +| `Map<` | 72 | | `WebSocket` | 51 | -| `new Map` | 47 | +| `new Map` | 53 | | `toPromiseRequestor` | 0 | | `makeAsyncProcessor` | 19 | | `receive(` | 18 | -| `while (` | 12 | +| `while (` | 11 | | `new Error` | 14 | | `new Promise` | 10 | -| `JSON.parse` | 8 | +| `JSON.parse` | 7 | | `localStorage` | 9 | | `JSON.stringify` | 6 | | `setTimeout` | 4 | @@ -135,7 +135,7 @@ Notes: ### 2026-06-02: RAG And Agent Requestor Bridge Slice -- Status: migrated, root-verified, and ready to commit. +- Status: migrated, root-verified, committed, and pushed. - Completed: - `ts/packages/flow/src/retrieval/graph-rag.ts` and `ts/packages/flow/src/retrieval/document-rag.ts` now accept @@ -163,6 +163,39 @@ Notes: - `cd ts && bun run build` - `cd ts && bun run test` +### 2026-06-02: KnowledgeCore Ref-Backed State Slice + +- Status: migrated and root-verified. +- Completed: + - `ts/packages/flow/src/cores/service.ts` now exposes a typed + `KnowledgeCoreService` instead of `AsyncProcessorRuntime & Record`. + - Runtime state now lives in + `SynchronizedRef` with `kgCores`, `deCores`, + the request consumer, and response producer. + - Knowledge operations now have Effect-returning handlers with Promise + facades only on exported compatibility methods. + - Persistence now decodes legacy and current snapshot shapes with Effect + Schema and encodes JSON through Schema rather than raw + `JSON.parse`/`JSON.stringify` plus assertions. + - The consume loop now uses `Effect.whileLoop`; the remaining + `consumer.receive(2000)` call is a pubsub boundary for this service. + - The service exposes `runMain()` through `NodeRuntime.runMain`; legacy + `run()` uses `ManagedRuntime`, and `ts/scripts/run-knowledge.ts` delegates + to `runMain()`. + - `ts/packages/base/src/schema/messages.ts` now models legacy hyphenated + knowledge request/response aliases so the service can preserve the wire + shape without response type assertions. + - New knowledge-core tests cover ref-backed mutation, graph embedding alias + responses, concurrent state updates, and legacy persistence loading. +- Verification: + - `bun run --cwd ts/packages/base build` + - `bun run --cwd ts/packages/flow build` + - `bun run --cwd ts/packages/flow test` + - `cd ts && bun run check` + - `cd ts && bun run build` + - `cd ts && bun run test` + ## Subagent Findings To Preserve - MCP/workbench: @@ -172,8 +205,9 @@ Notes: the client API is less Promise-first. - MCP env is now Config-backed; continue that policy for future MCP settings. - Flow stateful services: - - Config service ref-backed state is complete. Librarian, cores, and - flow-manager still have mutable poller service objects. These remain good + - Config service and KnowledgeCore service ref-backed state are complete. + Librarian and flow-manager still have mutable poller service objects. + These remain good candidates for `Context` services, scoped layers, `Ref`/`SynchronizedRef`, `Schedule`, and managed persistence. @@ -206,7 +240,6 @@ Notes: - TrustGraph evidence: - `ts/packages/flow/src/librarian/service.ts` - - `ts/packages/flow/src/cores/service.ts` - `ts/packages/flow/src/flow-manager/service.ts` - Effect primitives: - `Context`, `Layer.scoped`, `Ref`, `SynchronizedRef`, `Schedule`, @@ -314,7 +347,7 @@ Notes: ## Recommended PR Order -1. Librarian, cores, or flow-manager scoped state migration. +1. Librarian or flow-manager scoped state migration. 2. Client RPC managed runtime/scoped layer cleanup. 3. Base processor registry and constructor shim redesign. 4. Gateway RPC callback and client streaming completion cleanup. diff --git a/ts/packages/base/src/schema/messages.ts b/ts/packages/base/src/schema/messages.ts index 355123c5..355e5dc1 100644 --- a/ts/packages/base/src/schema/messages.ts +++ b/ts/packages/base/src/schema/messages.ts @@ -381,7 +381,9 @@ export const KnowledgeRequest = S.Struct({ collection: S.optionalKey(S.String), triples: OptionalMutableArray(Triple), graphEmbeddings: OptionalMutableArray(GraphEmbedding), + "graph-embeddings": OptionalMutableArray(GraphEmbedding), documentEmbeddings: S.optionalKey(DocumentEmbeddingsCore), + "document-embeddings": S.optionalKey(DocumentEmbeddingsCore), }); export type KnowledgeRequest = typeof KnowledgeRequest.Type; @@ -391,7 +393,9 @@ export const KnowledgeResponse = S.Struct({ eos: S.optionalKey(S.Boolean), triples: OptionalMutableArray(Triple), graphEmbeddings: OptionalMutableArray(GraphEmbedding), + "graph-embeddings": OptionalMutableArray(GraphEmbedding), documentEmbeddings: S.optionalKey(DocumentEmbeddingsCore), + "document-embeddings": S.optionalKey(DocumentEmbeddingsCore), }); export type KnowledgeResponse = typeof KnowledgeResponse.Type; diff --git a/ts/packages/flow/src/__tests__/knowledge-core-service.test.ts b/ts/packages/flow/src/__tests__/knowledge-core-service.test.ts new file mode 100644 index 00000000..052f92c9 --- /dev/null +++ b/ts/packages/flow/src/__tests__/knowledge-core-service.test.ts @@ -0,0 +1,192 @@ +import {mkdtemp, rm} from "node:fs/promises"; +import {tmpdir} from "node:os"; +import {join} from "node:path"; +import {Effect, SynchronizedRef} from "effect"; +import {describe, expect, it} from "vitest"; +import { + topics, + type BackendConsumer, + type BackendProducer, + type CreateConsumerOptions, + type CreateProducerOptions, + type KnowledgeRequest, + type KnowledgeResponse, + type Message, + type PubSubBackend, + type Triple, +} from "@trustgraph/base"; +import {makeKnowledgeCoreService} from "../cores/service.js"; + +class NoopPubSub implements PubSubBackend { + readonly sentByTopic = new Map>(); + + async createProducer(options: CreateProducerOptions): Promise> { + return { + send: async (message) => { + const sent = this.sentByTopic.get(options.topic) ?? []; + sent.push(message); + this.sentByTopic.set(options.topic, sent); + }, + flush: async () => undefined, + close: async () => undefined, + }; + } + + async createConsumer(_options: CreateConsumerOptions): Promise> { + return { + receive: async () => null, + acknowledge: async (_message: Message) => undefined, + negativeAcknowledge: async (_message: Message) => undefined, + unsubscribe: async () => undefined, + close: async () => undefined, + }; + } + + async close(): Promise {} +} + +const sampleTriple: Triple = { + s: {type: "IRI", iri: "https://example.test/a"}, + p: {type: "IRI", iri: "https://example.test/related"}, + o: {type: "LITERAL", value: "alpha"}, +}; + +const makeService = (dataDir: string, backend: PubSubBackend = new NoopPubSub()) => + makeKnowledgeCoreService({ + id: "knowledge-test", + manageProcessSignals: false, + pubsub: backend, + dataDir, + }); + +const seedResponseProducer = async ( + backend: NoopPubSub, + service: ReturnType, +) => { + const responseProducer = await backend.createProducer({ + topic: topics.knowledgeResponse, + }); + await Effect.runPromise( + SynchronizedRef.update(service.state, (state) => ({ + ...state, + responseProducer, + })), + ); +}; + +describe("KnowledgeCoreService operations", () => { + it("stores knowledge cores through ref-backed state and preserves graph embedding aliases", async () => { + const dir = await mkdtemp(join(tmpdir(), "trustgraph-knowledge-service-")); + const backend = new NoopPubSub(); + const service = makeService(dir, backend); + await seedResponseProducer(backend, service); + + const request: KnowledgeRequest = { + operation: "put-kg-core", + user: "alice", + id: "core-a", + triples: [sampleTriple], + "graph-embeddings": [ + { + entity: {type: "IRI", iri: "https://example.test/a"}, + vectors: [[1, 2, 3]], + }, + ], + }; + + await service.putKgCore(request, "put-1"); + const state = await Effect.runPromise(SynchronizedRef.get(service.state)); + const core = state.kgCores.get("alice:core-a"); + + await service.getKgCore({ + operation: "get-kg-core", + user: "alice", + id: "core-a", + }, "get-1"); + await rm(dir, {recursive: true, force: true}); + + expect(core?.triples).toEqual([sampleTriple]); + expect(core?.graphEmbeddings).toEqual([ + { + entity: {type: "IRI", iri: "https://example.test/a"}, + vectors: [[1, 2, 3]], + }, + ]); + expect(backend.sentByTopic.get(topics.knowledgeResponse)).toEqual([ + {}, + { + triples: [sampleTriple], + eos: false, + }, + { + graphEmbeddings: [ + { + entity: {type: "IRI", iri: "https://example.test/a"}, + vectors: [[1, 2, 3]], + }, + ], + "graph-embeddings": [ + { + entity: {type: "IRI", iri: "https://example.test/a"}, + vectors: [[1, 2, 3]], + }, + ], + eos: true, + }, + ]); + }); + + it("serializes concurrent mutations through ref-backed maps", async () => { + const dir = await mkdtemp(join(tmpdir(), "trustgraph-knowledge-service-")); + const backend = new NoopPubSub(); + const service = makeService(dir, backend); + await seedResponseProducer(backend, service); + + await Promise.all([ + service.putKgCore({ + operation: "put-kg-core", + user: "alice", + id: "core-b", + triples: [sampleTriple], + }, "put-a"), + service.putKgCore({ + operation: "put-kg-core", + user: "alice", + id: "core-b", + triples: [ + { + s: {type: "IRI", iri: "https://example.test/b"}, + p: {type: "IRI", iri: "https://example.test/related"}, + o: {type: "LITERAL", value: "beta"}, + }, + ], + }, "put-b"), + ]); + + const state = await Effect.runPromise(SynchronizedRef.get(service.state)); + await rm(dir, {recursive: true, force: true}); + + expect(state.kgCores.get("alice:core-b")?.triples).toHaveLength(2); + }); + + it("loads the legacy persisted knowledge shape with schema decoding", async () => { + const dir = await mkdtemp(join(tmpdir(), "trustgraph-knowledge-service-")); + const persistPath = join(dir, "knowledge-state.json"); + await Bun.write( + persistPath, + JSON.stringify({ + "alice:legacy": { + triples: [sampleTriple], + graphEmbeddings: [], + }, + }), + ); + const service = makeService(dir); + + await service.loadFromDisk(); + const state = await Effect.runPromise(SynchronizedRef.get(service.state)); + await rm(dir, {recursive: true, force: true}); + + expect(state.kgCores.get("alice:legacy")?.triples).toEqual([sampleTriple]); + }); +}); diff --git a/ts/packages/flow/src/cores/service.ts b/ts/packages/flow/src/cores/service.ts index cf725ab9..7b4fbcf7 100644 --- a/ts/packages/flow/src/cores/service.ts +++ b/ts/packages/flow/src/cores/service.ts @@ -1,48 +1,71 @@ /** - * Knowledge core service — manages stored knowledge graph cores (triples + embeddings). - * - * An AsyncProcessor (NOT FlowProcessor) that: - * 1. Listens on knowledge-request topic - * 2. Handles CRUD operations for knowledge graph cores - * 3. Each core stores triples and graph embeddings keyed by user:id - * 4. Persists state to JSON + * Knowledge core service — manages stored knowledge graph cores. * * Python reference: trustgraph-flow/trustgraph/knowledge/service/service.py */ +import {NodeRuntime} from "@effect/platform-node"; import { + KnowledgeRequest as KnowledgeRequestSchema, + KnowledgeResponse as KnowledgeResponseSchema, + Term as TermSchema, + Triple as TripleSchema, + errorMessage, + loadProcessorRuntimeConfig, makeAsyncProcessor, makeProcessorProgram, - type ProcessorConfig, - type AsyncProcessorRuntime, + optionalStringConfig, topics, + type AsyncProcessorRuntime, + type BackendConsumer, + type BackendProducer, + type KnowledgeOperation, type KnowledgeRequest, type KnowledgeResponse, - type Triple, - type Term, - errorMessage, + type Message, + type ProcessorConfig, } from "@trustgraph/base"; -import type { Message } from "@trustgraph/base"; -import { Config, Context, Duration, Effect } from "effect"; +import {Duration, Effect, Layer, ManagedRuntime, SynchronizedRef} from "effect"; +import * as O from "effect/Option"; import * as S from "effect/Schema"; -import { ensureDirectory, joinPath, readTextFile, writeTextFile } from "../runtime/effect-files.js"; +import {ensureDirectory, joinPath, readTextFile, writeTextFile} from "../runtime/effect-files.js"; export interface KnowledgeCoreServiceConfig extends ProcessorConfig { - dataDir?: string; + readonly dataDir?: string; } -interface KnowledgeCore { - triples: Triple[]; - graphEmbeddings: { entity: Term; vectors: number[][] }[]; -} +const NumberArray = S.Array(S.Number).pipe(S.mutable); +const NumberArrays = S.Array(NumberArray).pipe(S.mutable); -interface DocumentEmbeddingsCore { - metadata?: Record; - chunks?: unknown[]; - [key: string]: unknown; -} +const GraphEmbeddingSchema = S.Struct({ + entity: TermSchema, + vectors: NumberArrays, +}); +type GraphEmbedding = typeof GraphEmbeddingSchema.Type; -export type KnowledgeCoreService = AsyncProcessorRuntime & Record; +const DocumentEmbeddingsCoreSchema = S.StructWithRest( + S.Struct({ + metadata: S.optionalKey(S.Record(S.String, S.Unknown)), + chunks: S.optionalKey(S.Unknown.pipe(S.Array, S.mutable)), + }), + [S.Record(S.String, S.Unknown)], +); +type DocumentEmbeddingsCore = typeof DocumentEmbeddingsCoreSchema.Type; + +const KnowledgeCoreSchema = S.Struct({ + triples: S.Array(TripleSchema).pipe(S.mutable), + graphEmbeddings: S.Array(GraphEmbeddingSchema).pipe(S.mutable), +}); +type KnowledgeCore = typeof KnowledgeCoreSchema.Type; + +const PersistedKnowledgeSnapshotSchema = S.Struct({ + kg: S.Record(S.String, KnowledgeCoreSchema), + de: S.optionalKey(S.Record(S.String, S.Array(DocumentEmbeddingsCoreSchema).pipe(S.mutable))), +}); +const PersistedKnowledgeSnapshotJsonSchema = PersistedKnowledgeSnapshotSchema.pipe(S.fromJsonString); +const LegacyKnowledgeSnapshotJsonSchema = S.Record(S.String, KnowledgeCoreSchema).pipe(S.fromJsonString); +type PersistedKnowledgeSnapshot = typeof PersistedKnowledgeSnapshotSchema.Type; +type LegacyKnowledgeSnapshot = typeof LegacyKnowledgeSnapshotJsonSchema.Type; export class KnowledgeCoreServiceError extends S.TaggedErrorClass()( "KnowledgeCoreServiceError", @@ -52,21 +75,145 @@ export class KnowledgeCoreServiceError extends S.TaggedErrorClass; - close(): Promise; -} - -interface CloseableResource { - close(): Promise; -} - const knowledgeCoreServiceError = (operation: string, cause: unknown): KnowledgeCoreServiceError => KnowledgeCoreServiceError.make({ operation, message: errorMessage(cause), }); +type KnowledgeCoreStore = Map; +type DocumentCoreStore = Map>; + +interface KnowledgeCoreServiceState { + readonly kgCores: KnowledgeCoreStore; + readonly deCores: DocumentCoreStore; + readonly consumer: BackendConsumer | null; + readonly responseProducer: BackendProducer | null; +} + +export interface KnowledgeCoreService extends AsyncProcessorRuntime { + readonly state: SynchronizedRef.SynchronizedRef; + readonly dataDir: string; + readonly persistPath: string; + readonly coreKey: (user: string, id: string) => string; + readonly graphEmbeddings: (request: KnowledgeRequest) => ReadonlyArray; + readonly documentEmbeddings: (request: KnowledgeRequest) => DocumentEmbeddingsCore | undefined; + readonly handleMessage: (msg: Message) => Promise; + readonly handleMessageEffect: (msg: Message) => Effect.Effect; + readonly handleOperation: (request: KnowledgeRequest, requestId: string) => Promise; + readonly handleOperationEffect: (request: KnowledgeRequest, requestId: string) => Effect.Effect; + readonly listKgCores: (request: KnowledgeRequest, requestId: string) => Promise; + readonly listKgCoresEffect: (request: KnowledgeRequest, requestId: string) => Effect.Effect; + readonly getKgCore: (request: KnowledgeRequest, requestId: string) => Promise; + readonly getKgCoreEffect: (request: KnowledgeRequest, requestId: string) => Effect.Effect; + readonly deleteKgCore: (request: KnowledgeRequest, requestId: string) => Promise; + readonly deleteKgCoreEffect: (request: KnowledgeRequest, requestId: string) => Effect.Effect; + readonly putKgCore: (request: KnowledgeRequest, requestId: string) => Promise; + readonly putKgCoreEffect: (request: KnowledgeRequest, requestId: string) => Effect.Effect; + readonly loadKgCore: (request: KnowledgeRequest, requestId: string) => Promise; + readonly loadKgCoreEffect: (request: KnowledgeRequest, requestId: string) => Effect.Effect; + readonly unloadKgCore: (request: KnowledgeRequest, requestId: string) => Promise; + readonly unloadKgCoreEffect: (request: KnowledgeRequest, requestId: string) => Effect.Effect; + readonly listDeCores: (request: KnowledgeRequest, requestId: string) => Promise; + readonly listDeCoresEffect: (request: KnowledgeRequest, requestId: string) => Effect.Effect; + readonly getDeCore: (request: KnowledgeRequest, requestId: string) => Promise; + readonly getDeCoreEffect: (request: KnowledgeRequest, requestId: string) => Effect.Effect; + readonly deleteDeCore: (request: KnowledgeRequest, requestId: string) => Promise; + readonly deleteDeCoreEffect: (request: KnowledgeRequest, requestId: string) => Effect.Effect; + readonly putDeCore: (request: KnowledgeRequest, requestId: string) => Promise; + readonly putDeCoreEffect: (request: KnowledgeRequest, requestId: string) => Effect.Effect; + readonly loadDeCore: (request: KnowledgeRequest, requestId: string) => Promise; + readonly loadDeCoreEffect: (request: KnowledgeRequest, requestId: string) => Effect.Effect; + readonly persist: () => Promise; + readonly persistEffect: Effect.Effect; + readonly loadFromDisk: () => Promise; + readonly loadFromDiskEffect: Effect.Effect; +} + +const initialState = (): KnowledgeCoreServiceState => ({ + kgCores: new Map(), + deCores: new Map>(), + consumer: null, + responseProducer: null, +}); + +const cloneKnowledgeCore = (core: KnowledgeCore): KnowledgeCore => ({ + triples: Array.from(core.triples), + graphEmbeddings: core.graphEmbeddings.map((entry) => ({ + entity: entry.entity, + vectors: entry.vectors.map((vector) => Array.from(vector)), + })), +}); + +const cloneKgStore = (store: KnowledgeCoreStore): KnowledgeCoreStore => { + const next = new Map(); + for (const [key, core] of store) { + next.set(key, cloneKnowledgeCore(core)); + } + return next; +}; + +const cloneDeStore = (store: DocumentCoreStore): DocumentCoreStore => { + const next = new Map>(); + for (const [key, cores] of store) { + next.set(key, Array.from(cores)); + } + return next; +}; + +const toPersistedSnapshot = (state: KnowledgeCoreServiceState): PersistedKnowledgeSnapshot => { + const kg: Record = {}; + const de: Record> = {}; + + for (const [key, core] of state.kgCores) { + kg[key] = cloneKnowledgeCore(core); + } + for (const [key, core] of state.deCores) { + de[key] = Array.from(core); + } + + return {kg, de}; +}; + +const kgStoreFromRecord = (record: LegacyKnowledgeSnapshot): KnowledgeCoreStore => { + const store = new Map(); + for (const [key, core] of Object.entries(record)) { + store.set(key, cloneKnowledgeCore(core)); + } + return store; +}; + +const deStoreFromRecord = ( + record: Record> | undefined, +): DocumentCoreStore => { + const store = new Map>(); + for (const [key, core] of Object.entries(record ?? {})) { + store.set(key, Array.from(core)); + } + return store; +}; + +const coreKey = (user: string, id: string): string => `${user}:${id}`; + +const graphEmbeddingsFor = (request: KnowledgeRequest): ReadonlyArray => + request.graphEmbeddings ?? request["graph-embeddings"] ?? []; + +const documentEmbeddingsFor = (request: KnowledgeRequest): DocumentEmbeddingsCore | undefined => + request.documentEmbeddings ?? request["document-embeddings"]; + +const updateHandles = ( + stateRef: SynchronizedRef.SynchronizedRef, + handles: { + readonly consumer?: BackendConsumer | null; + readonly responseProducer?: BackendProducer | null; + }, +) => + SynchronizedRef.updateAndGet(stateRef, (state) => ({ + ...state, + consumer: handles.consumer === undefined ? state.consumer : handles.consumer, + responseProducer: handles.responseProducer === undefined ? state.responseProducer : handles.responseProducer, + })); + const tryPromise = ( operation: string, evaluate: () => Promise, @@ -76,35 +223,8 @@ const tryPromise = ( catch: (cause) => knowledgeCoreServiceError(operation, cause), }); -const trySync = ( - operation: string, - evaluate: () => A, -): Effect.Effect => - Effect.try({ - try: evaluate, - catch: (cause) => knowledgeCoreServiceError(operation, cause), - }); - -const failPromise = (operation: string, cause: unknown): Promise => - Effect.runPromise(Effect.fail(knowledgeCoreServiceError(operation, cause))); - -const sendResponse = ( - service: KnowledgeCoreService, - response: KnowledgeResponse, - requestId: string, - operation = "respond", -): Effect.Effect => - Effect.gen(function* () { - const responseProducer = service.responseProducer as KnowledgeResponseProducer | null | undefined; - if (responseProducer === null || responseProducer === undefined) { - return yield* knowledgeCoreServiceError(operation, "Knowledge response producer not started"); - } - - yield* tryPromise(operation, () => responseProducer.send(response, { id: requestId })); - }); - const closeResource = ( - resource: CloseableResource, + resource: {readonly close: () => Promise}, operation: string, ): Effect.Effect => tryPromise(operation, () => resource.close()).pipe( @@ -116,578 +236,566 @@ const closeResource = ( ), ); -export function makeKnowledgeCoreService(config: KnowledgeCoreServiceConfig): KnowledgeCoreService { - const service = makeAsyncProcessor(config, { - run: () => service.run(Context.empty()), - }) as KnowledgeCoreService; - const baseStop = service.stop; - service.cores = new Map(); - service.deCores = new Map(); - service.consumer = null; - service.responseProducer = null; - const dataDir = config.dataDir ?? "./data/knowledge"; - service.dataDir = dataDir; - service.persistPath = joinPath(dataDir, "knowledge-state.json"); - Object.assign(service, { - - - coreKey: function(this: KnowledgeCoreService, user: string, id: string): string { - return `${user}:${id}`; - - }, - - - - run: function(this: KnowledgeCoreService): Promise { - const service = this; - return Effect.runPromise( - Effect.gen(function* () { - if (config.dataDir === undefined) { - const configuredDataDir = yield* Config.string("KNOWLEDGE_DATA_DIR").pipe( - Config.withDefault("./data/knowledge"), - ); - service.dataDir = configuredDataDir; - service.persistPath = joinPath(configuredDataDir, "knowledge-state.json"); - } - - yield* tryPromise("ensure-directory", () => ensureDirectory(service.dataDir)); - // Load persisted state - yield* tryPromise("load", () => service.loadFromDisk()); - - // Create producer - service.responseProducer = yield* tryPromise("response-producer", () => - service.pubsub.createProducer({ - topic: topics.knowledgeResponse, - }), - ); - - // Create consumer - service.consumer = yield* tryPromise("consumer", () => - service.pubsub.createConsumer({ - topic: topics.knowledgeRequest, - subscription: `${service.config.id}-knowledge-request`, - }), - ); - - yield* Effect.log(`[KnowledgeCoreService] Listening on ${topics.knowledgeRequest}`); - - // Main consume loop - while (service.running) { - const shouldContinue = yield* Effect.gen(function* () { - const consumer = service.consumer; - if (consumer === null || consumer === undefined) { - return yield* knowledgeCoreServiceError("consume", "Knowledge request consumer not started"); - } - - const msg = yield* tryPromise("consume-receive", () => consumer.receive(2000)); - if (msg === null) return true; - - yield* tryPromise("consume-handle", () => service.handleMessage(msg)); - yield* tryPromise("consume-acknowledge", () => consumer.acknowledge(msg)); - - return true; - }).pipe( - Effect.catch((error) => { - if (!service.running) return Effect.succeed(false); - return Effect.logError("[KnowledgeCoreService] Error in consume loop", { - error: error.message, - }).pipe( - Effect.flatMap(() => Effect.sleep(Duration.millis(1000))), - Effect.as(true), - ); - }), - ); - - if (!shouldContinue) break; - } - }), - ); - - }, - - - - handleMessage: function(this: KnowledgeCoreService, msg: Message): Promise { - const service = this; - return Effect.runPromise( - Effect.gen(function* () { - const request = msg.value(); - const props = msg.properties(); - const requestId = props.id; - - if (requestId === undefined || requestId.length === 0) { - yield* Effect.logWarning("[KnowledgeCoreService] Received request without id, ignoring"); - return; - } - - yield* tryPromise("operation", () => service.handleOperation(request, requestId)).pipe( - Effect.catch((error) => - sendResponse( - service, - { error: { type: "knowledge-error", message: error.message } }, - requestId, - "respond-error", - ), - ), - ); - }), - ); - - }, - - - - handleOperation: function(this: KnowledgeCoreService, request: KnowledgeRequest, requestId: string): Promise { - switch (request.operation) { - case "list-kg-cores": - return this.listKgCores(request, requestId); - case "get-kg-core": - return this.getKgCore(request, requestId); - case "delete-kg-core": - return this.deleteKgCore(request, requestId); - case "put-kg-core": - return this.putKgCore(request, requestId); - case "load-kg-core": - return this.loadKgCore(request, requestId); - case "unload-kg-core": - return this.unloadKgCore(request, requestId); - case "list-de-cores": - return this.listDeCores(request, requestId); - case "get-de-core": - return this.getDeCore(request, requestId); - case "delete-de-core": - return this.deleteDeCore(request, requestId); - case "put-de-core": - return this.putDeCore(request, requestId); - case "load-de-core": - return this.loadDeCore(request, requestId); - default: - return failPromise("operation", `Unknown knowledge operation: ${request.operation as string}`); - } - - }, - - - - requestRecord: function(this: KnowledgeCoreService, request: KnowledgeRequest): Record { - return request as Record; - - }, - - - - graphEmbeddings: function(this: KnowledgeCoreService, request: KnowledgeRequest): { entity: Term; vectors: number[][] }[] { - const req = this.requestRecord(request); - const value = request.graphEmbeddings ?? req["graph-embeddings"]; - return Array.isArray(value) ? value as { entity: Term; vectors: number[][] }[] : []; - - }, - - - - documentEmbeddings: function(this: KnowledgeCoreService, request: KnowledgeRequest): DocumentEmbeddingsCore | undefined { - const req = this.requestRecord(request); - const value = request.documentEmbeddings ?? req["document-embeddings"]; - if (typeof value !== "object" || value === null || Array.isArray(value)) return undefined; - return value as DocumentEmbeddingsCore; - - }, - - - - listKgCores: function(this: KnowledgeCoreService, request: KnowledgeRequest, requestId: string): Promise { - const service = this; - return Effect.runPromise( - Effect.gen(function* () { - const user = request.user ?? ""; - const prefix = user.length > 0 ? `${user}:` : ""; - - const ids: string[] = []; - for (const key of (service.cores as Map).keys()) { - if (prefix.length === 0 || key.startsWith(prefix)) { - // Extract the ID portion after the user prefix - const id = key.slice(prefix.length); - ids.push(id); - } - } - - yield* sendResponse(service, { ids }, requestId); - }), - ); - - }, - - - - getKgCore: function(this: KnowledgeCoreService, request: KnowledgeRequest, requestId: string): Promise { - const service = this; - return Effect.runPromise( - Effect.gen(function* () { - const user = request.user ?? ""; - const coreId = request.id ?? ""; - const key = service.coreKey(user, coreId); - - const core = service.cores.get(key); - if (core === undefined) { - return yield* knowledgeCoreServiceError("get-kg-core", `Knowledge core not found: ${key}`); - } - - // Send triples and embeddings in batches - const BATCH_SIZE = 100; - - // Send triples in batches - for (let i = 0; i < core.triples.length; i += BATCH_SIZE) { - const batch = core.triples.slice(i, i + BATCH_SIZE); - const isLast = i + BATCH_SIZE >= core.triples.length && core.graphEmbeddings.length === 0; - - yield* sendResponse( - service, - { triples: batch, eos: isLast }, - requestId, - "respond-kg-triples", - ); - } - - // Send graph embeddings in batches - for (let i = 0; i < core.graphEmbeddings.length; i += BATCH_SIZE) { - const batch = core.graphEmbeddings.slice(i, i + BATCH_SIZE); - const isLast = i + BATCH_SIZE >= core.graphEmbeddings.length; - - yield* sendResponse( - service, - { graphEmbeddings: batch, "graph-embeddings": batch, eos: isLast } as KnowledgeResponse, - requestId, - "respond-kg-embeddings", - ); - } - - // If core was empty, send a final eos - if (core.triples.length === 0 && core.graphEmbeddings.length === 0) { - yield* sendResponse(service, { eos: true }, requestId, "respond-kg-empty"); - } - }), - ); - - }, - - - - deleteKgCore: function(this: KnowledgeCoreService, request: KnowledgeRequest, requestId: string): Promise { - const service = this; - return Effect.runPromise( - Effect.gen(function* () { - const user = request.user ?? ""; - const coreId = request.id ?? ""; - const key = service.coreKey(user, coreId); - - service.cores.delete(key); - yield* tryPromise("persist-delete-kg-core", () => service.persist()); - - yield* Effect.log(`[KnowledgeCoreService] Deleted core: ${key}`); - yield* sendResponse(service, {}, requestId); - }), - ); - - }, - - - - putKgCore: function(this: KnowledgeCoreService, request: KnowledgeRequest, requestId: string): Promise { - const service = this; - return Effect.runPromise( - Effect.gen(function* () { - const user = request.user ?? ""; - const coreId = request.id ?? ""; - const key = service.coreKey(user, coreId); - - let core = service.cores.get(key); - if (core === undefined) { - core = { triples: [], graphEmbeddings: [] }; - service.cores.set(key, core); - } - - // Append triples if provided - if (request.triples !== undefined && request.triples.length > 0) { - core.triples.push(...request.triples); - } - - // Append graph embeddings if provided - const graphEmbeddings = service.graphEmbeddings(request); - if (graphEmbeddings.length > 0) { - core.graphEmbeddings.push(...graphEmbeddings); - } - - yield* tryPromise("persist-put-kg-core", () => service.persist()); - - yield* Effect.log( - `[KnowledgeCoreService] Updated core ${key}: triples=${core.triples.length}, embeddings=${core.graphEmbeddings.length}`, - ); - yield* sendResponse(service, {}, requestId); - }), - ); - - }, - - - - loadKgCore: function(this: KnowledgeCoreService, request: KnowledgeRequest, requestId: string): Promise { - const service = this; - return Effect.runPromise( - Effect.gen(function* () { - const user = request.user ?? ""; - const coreId = request.id ?? ""; - const key = service.coreKey(user, coreId); - - const core = service.cores.get(key); - if (core === undefined) { - return yield* knowledgeCoreServiceError("load-kg-core", `Knowledge core not found: ${key}`); - } - - if (core.triples.length > 0) { - yield* Effect.acquireUseRelease( - tryPromise("triples-producer", () => - service.pubsub.createProducer({ topic: "tg.flow.triples" }), - ), - (producer) => - tryPromise("send-triples", () => - producer.send({ - metadata: { - id: coreId, - root: coreId, - user, - collection: request.collection ?? "default", - }, - triples: core.triples, - }), - ), - (producer) => closeResource(producer, "close-triples-producer"), - ); - } - - yield* Effect.log( - `[KnowledgeCoreService] Loaded core ${key} (triples=${core.triples.length}, embeddings=${core.graphEmbeddings.length})`, - ); - yield* sendResponse(service, {}, requestId); - }), - ); - - }, - - - - unloadKgCore: function(this: KnowledgeCoreService, _request: KnowledgeRequest, requestId: string): Promise { - return Effect.runPromise(sendResponse(this, {}, requestId)); - - }, - - - - listDeCores: function(this: KnowledgeCoreService, request: KnowledgeRequest, requestId: string): Promise { - const service = this; - return Effect.runPromise( - Effect.gen(function* () { - const user = request.user ?? ""; - const prefix = user.length > 0 ? `${user}:` : ""; - const ids = [...service.deCores.keys()] - .filter((key) => prefix.length === 0 || key.startsWith(prefix)) - .map((key) => key.slice(prefix.length)); - yield* sendResponse(service, { ids }, requestId); - }), - ); - - }, - - - - getDeCore: function(this: KnowledgeCoreService, request: KnowledgeRequest, requestId: string): Promise { - const service = this; - return Effect.runPromise( - Effect.gen(function* () { - const user = request.user ?? ""; - const coreId = request.id ?? ""; - const key = service.coreKey(user, coreId); - const core = service.deCores.get(key); - if (core === undefined) { - return yield* knowledgeCoreServiceError("get-de-core", `Document embeddings core not found: ${key}`); - } - - for (let i = 0; i < core.length; i++) { - const isLast = i === core.length - 1; - yield* sendResponse( - service, - { - documentEmbeddings: core[i], - "document-embeddings": core[i], - eos: isLast, - } as KnowledgeResponse, - requestId, - "respond-de-core", - ); - } - if (core.length === 0) { - yield* sendResponse(service, { eos: true }, requestId, "respond-de-empty"); - } - }), - ); - - }, - - - - deleteDeCore: function(this: KnowledgeCoreService, request: KnowledgeRequest, requestId: string): Promise { - const service = this; - return Effect.runPromise( - Effect.gen(function* () { - const user = request.user ?? ""; - const coreId = request.id ?? ""; - service.deCores.delete(service.coreKey(user, coreId)); - yield* tryPromise("persist-delete-de-core", () => service.persist()); - yield* sendResponse(service, {}, requestId); - }), - ); - - }, - - - - putDeCore: function(this: KnowledgeCoreService, request: KnowledgeRequest, requestId: string): Promise { - const service = this; - return Effect.runPromise( - Effect.gen(function* () { - const user = request.user ?? ""; - const coreId = request.id ?? ""; - const key = service.coreKey(user, coreId); - const item = service.documentEmbeddings(request); - if (item === undefined) { - return yield* knowledgeCoreServiceError("put-de-core", "put-de-core requires document-embeddings"); - } - const core = service.deCores.get(key) ?? []; - core.push(item); - service.deCores.set(key, core); - yield* tryPromise("persist-put-de-core", () => service.persist()); - yield* sendResponse(service, {}, requestId); - }), - ); - - }, - - - - loadDeCore: function(this: KnowledgeCoreService, request: KnowledgeRequest, requestId: string): Promise { - const service = this; - return Effect.runPromise( - Effect.gen(function* () { - const user = request.user ?? ""; - const coreId = request.id ?? ""; - const key = service.coreKey(user, coreId); - if (!(service.deCores as Map).has(key)) { - return yield* knowledgeCoreServiceError("load-de-core", `Document embeddings core not found: ${key}`); - } - yield* sendResponse(service, {}, requestId); - }), - ); - - }, - - - - // ---------- Persistence ---------- - - persist: function(this: KnowledgeCoreService): Promise { - const service = this; - return Effect.runPromise( - Effect.gen(function* () { - // Serialize Map to object - const data: { - kg: Record; - de: Record; - } = { kg: {}, de: {} }; - for (const [key, core] of service.cores) { - data.kg[key] = core; - } - for (const [key, core] of service.deCores) { - data.de[key] = core; - } - - const json = yield* trySync("persist-serialize", () => JSON.stringify(data, null, 2)); - yield* tryPromise("persist-write", () => writeTextFile(service.persistPath, json)); - }).pipe( - Effect.catch((error) => - Effect.logError("[KnowledgeCoreService] Failed to persist state", { - error: error.message, - }), - ), - ), - ); - - }, - - - - loadFromDisk: function(this: KnowledgeCoreService): Promise { - const service = this; - return Effect.runPromise( - Effect.gen(function* () { - const raw = yield* tryPromise("load-read", () => readTextFile(service.persistPath)); - const parsed = yield* trySync("load-parse", () => - JSON.parse(raw) as Record | { - kg?: Record; - de?: Record; - }, - ); - - service.cores.clear(); - service.deCores.clear(); - const kg = "kg" in parsed && parsed.kg !== undefined ? parsed.kg : parsed as Record; - for (const [key, core] of Object.entries(kg)) { - service.cores.set(key, core); - } - if ("de" in parsed && parsed.de !== undefined) { - for (const [key, core] of Object.entries(parsed.de)) { - service.deCores.set(key, core); - } - } - - yield* Effect.log(`[KnowledgeCoreService] Loaded persisted state (kg=${service.cores.size}, de=${service.deCores.size})`); - }).pipe( - Effect.catch(() => - Effect.log("[KnowledgeCoreService] No persisted state found, starting fresh"), - ), - ), - ); - - }, - - - - stop: function(this: KnowledgeCoreService): Promise { - const service = this; - return Effect.runPromise( - Effect.gen(function* () { - if (service.consumer !== null) { - yield* tryPromise("close-consumer", () => service.consumer.close()); - service.consumer = null; - } - if (service.responseProducer !== null) { - yield* tryPromise("close-response-producer", () => service.responseProducer.close()); - service.responseProducer = null; - } - yield* tryPromise("base-stop", () => baseStop()); - }), - ); - - } +const sendResponse = ( + stateRef: SynchronizedRef.SynchronizedRef, + response: KnowledgeResponse, + requestId: string, + operation = "respond", +): Effect.Effect => + Effect.gen(function* () { + const responseProducer = (yield* SynchronizedRef.get(stateRef)).responseProducer; + if (responseProducer === null) { + return yield* knowledgeCoreServiceError(operation, "Knowledge response producer not started"); + } + + yield* tryPromise(operation, () => responseProducer.send(response, {id: requestId})); }); + +const readPersistedKnowledgeEffect = ( + persistPath: string, +): Effect.Effect<{ + readonly kgCores: KnowledgeCoreStore; + readonly deCores: DocumentCoreStore; +} | null, never> => + Effect.gen(function* () { + const raw = yield* tryPromise("load-read", () => readTextFile(persistPath)); + const current = S.decodeUnknownOption(PersistedKnowledgeSnapshotJsonSchema)(raw); + if (O.isSome(current)) { + return { + kgCores: kgStoreFromRecord(current.value.kg), + deCores: deStoreFromRecord(current.value.de), + }; + } + + const legacy = S.decodeUnknownOption(LegacyKnowledgeSnapshotJsonSchema)(raw); + if (O.isSome(legacy)) { + return { + kgCores: kgStoreFromRecord(legacy.value), + deCores: new Map>(), + }; + } + + return yield* knowledgeCoreServiceError("load-decode", "Persisted knowledge state did not match any known shape"); + }).pipe( + Effect.catch(() => + Effect.log("[KnowledgeCoreService] No persisted state found, starting fresh").pipe( + Effect.flatMap(() => + Effect.succeed<{ + readonly kgCores: KnowledgeCoreStore; + readonly deCores: DocumentCoreStore; + } | null>(null) + ), + ) + ), + ); + +const persistStateEffect = ( + persistPath: string, + state: KnowledgeCoreServiceState, +): Effect.Effect => + Effect.gen(function* () { + const snapshot = toPersistedSnapshot(state); + const json = yield* S.encodeUnknownEffect(S.UnknownFromJsonString)(snapshot).pipe( + Effect.mapError((cause) => knowledgeCoreServiceError("persist-encode", cause)), + ); + yield* tryPromise("persist-write", () => writeTextFile(persistPath, json)); + }).pipe( + Effect.catch((error) => + Effect.logError("[KnowledgeCoreService] Failed to persist state", { + error: error.message, + }), + ), + ); + +const listIds = ( + store: ReadonlyMap, + user: string, +): Array => { + const prefix = user.length > 0 ? `${user}:` : ""; + const ids: Array = []; + + for (const key of store.keys()) { + if (prefix.length === 0 || key.startsWith(prefix)) { + ids.push(key.slice(prefix.length)); + } + } + + return ids; +}; + +const closeKnowledgeResourcesEffect = ( + stateRef: SynchronizedRef.SynchronizedRef, +): Effect.Effect => + Effect.gen(function* () { + const state = yield* SynchronizedRef.get(stateRef); + + const consumer = state.consumer; + if (consumer !== null) { + yield* tryPromise("close-consumer", () => consumer.close()); + } + + const responseProducer = state.responseProducer; + if (responseProducer !== null) { + yield* tryPromise("close-response-producer", () => responseProducer.close()); + } + + yield* updateHandles(stateRef, { + consumer: null, + responseProducer: null, + }); + }); + +const consumeOnceEffect = ( + service: KnowledgeCoreService, +): Effect.Effect => + Effect.gen(function* () { + const consumer = (yield* SynchronizedRef.get(service.state)).consumer; + if (consumer === null) { + return yield* knowledgeCoreServiceError("consume", "Knowledge request consumer not started"); + } + + const msg = yield* tryPromise("consume-receive", () => consumer.receive(2000)); + if (msg === null) return; + + yield* service.handleMessageEffect(msg); + yield* tryPromise("consume-acknowledge", () => consumer.acknowledge(msg)); + }); + +const runKnowledgeCoreServiceEffect = ( + service: KnowledgeCoreService, +): Effect.Effect => + Effect.gen(function* () { + yield* tryPromise("ensure-directory", () => ensureDirectory(service.dataDir)); + yield* service.loadFromDiskEffect; + + const responseProducer = yield* tryPromise("response-producer", () => + service.pubsub.createProducer({ + topic: topics.knowledgeResponse, + schema: KnowledgeResponseSchema, + }), + ); + yield* updateHandles(service.state, {responseProducer}); + + const consumer = yield* tryPromise("consumer", () => + service.pubsub.createConsumer({ + topic: topics.knowledgeRequest, + subscription: `${service.config.id}-knowledge-request`, + schema: KnowledgeRequestSchema, + }), + ); + yield* updateHandles(service.state, {consumer}); + + yield* Effect.log(`[KnowledgeCoreService] Listening on ${topics.knowledgeRequest}`); + + yield* Effect.whileLoop({ + while: () => service.running, + body: () => + consumeOnceEffect(service).pipe( + Effect.catch((error) => { + if (!service.running) return Effect.void; + return Effect.logError("[KnowledgeCoreService] Error in consume loop", { + error: error.message, + }).pipe( + Effect.flatMap(() => Effect.sleep(Duration.millis(1000))), + ); + }), + ), + step: () => undefined, + }); + }); + +const listKgCoresEffect = ( + stateRef: SynchronizedRef.SynchronizedRef, + request: KnowledgeRequest, + requestId: string, +) => + SynchronizedRef.get(stateRef).pipe( + Effect.flatMap((state) => sendResponse(stateRef, {ids: listIds(state.kgCores, request.user ?? "")}, requestId)), + ); + +const getKgCoreEffect = Effect.fn("getKgCoreEffect")(function* ( + stateRef: SynchronizedRef.SynchronizedRef, + request: KnowledgeRequest, + requestId: string, +) { + const key = coreKey(request.user ?? "", request.id ?? ""); + const core = (yield* SynchronizedRef.get(stateRef)).kgCores.get(key); + if (core === undefined) { + return yield* knowledgeCoreServiceError("get-kg-core", `Knowledge core not found: ${key}`); + } + + const batchSize = 100; + for (let i = 0; i < core.triples.length; i += batchSize) { + const batch = core.triples.slice(i, i + batchSize); + const isLast = i + batchSize >= core.triples.length && core.graphEmbeddings.length === 0; + yield* sendResponse(stateRef, {triples: batch, eos: isLast}, requestId, "respond-kg-triples"); + } + + for (let i = 0; i < core.graphEmbeddings.length; i += batchSize) { + const batch = core.graphEmbeddings.slice(i, i + batchSize); + const isLast = i + batchSize >= core.graphEmbeddings.length; + yield* sendResponse( + stateRef, + { + graphEmbeddings: batch, + "graph-embeddings": batch, + eos: isLast, + }, + requestId, + "respond-kg-embeddings", + ); + } + + if (core.triples.length === 0 && core.graphEmbeddings.length === 0) { + yield* sendResponse(stateRef, {eos: true}, requestId, "respond-kg-empty"); + } +}); + +const deleteKgCoreEffect = Effect.fn("deleteKgCoreEffect")(function* ( + stateRef: SynchronizedRef.SynchronizedRef, + persistPath: string, + request: KnowledgeRequest, + requestId: string, +) { + const key = coreKey(request.user ?? "", request.id ?? ""); + const next = yield* SynchronizedRef.updateAndGet(stateRef, (state) => { + const kgCores = cloneKgStore(state.kgCores); + kgCores.delete(key); + return {...state, kgCores}; + }); + + yield* persistStateEffect(persistPath, next); + yield* Effect.log(`[KnowledgeCoreService] Deleted core: ${key}`); + yield* sendResponse(stateRef, {}, requestId); +}); + +const putKgCoreEffect = Effect.fn("putKgCoreEffect")(function* ( + stateRef: SynchronizedRef.SynchronizedRef, + persistPath: string, + request: KnowledgeRequest, + requestId: string, +) { + const key = coreKey(request.user ?? "", request.id ?? ""); + const next = yield* SynchronizedRef.updateAndGet(stateRef, (state) => { + const kgCores = cloneKgStore(state.kgCores); + const existing = kgCores.get(key) ?? {triples: [], graphEmbeddings: []}; + const core: KnowledgeCore = { + triples: [ + ...existing.triples, + ...Array.from(request.triples ?? []), + ], + graphEmbeddings: [ + ...existing.graphEmbeddings, + ...graphEmbeddingsFor(request).map((entry) => ({ + entity: entry.entity, + vectors: entry.vectors.map((vector) => Array.from(vector)), + })), + ], + }; + kgCores.set(key, core); + return {...state, kgCores}; + }); + + const core = next.kgCores.get(key); + yield* persistStateEffect(persistPath, next); + yield* Effect.log( + `[KnowledgeCoreService] Updated core ${key}: triples=${core?.triples.length ?? 0}, embeddings=${core?.graphEmbeddings.length ?? 0}`, + ); + yield* sendResponse(stateRef, {}, requestId); +}); + +const loadKgCoreEffect = Effect.fn("loadKgCoreEffect")(function* ( + stateRef: SynchronizedRef.SynchronizedRef, + service: KnowledgeCoreService, + request: KnowledgeRequest, + requestId: string, +) { + const user = request.user ?? ""; + const coreId = request.id ?? ""; + const key = coreKey(user, coreId); + const core = (yield* SynchronizedRef.get(stateRef)).kgCores.get(key); + if (core === undefined) { + return yield* knowledgeCoreServiceError("load-kg-core", `Knowledge core not found: ${key}`); + } + + if (core.triples.length > 0) { + yield* Effect.acquireUseRelease( + tryPromise("triples-producer", () => + service.pubsub.createProducer({topic: "tg.flow.triples"}), + ), + (producer) => + tryPromise("send-triples", () => + producer.send({ + metadata: { + id: coreId, + root: coreId, + user, + collection: request.collection ?? "default", + }, + triples: core.triples, + }), + ), + (producer) => closeResource(producer, "close-triples-producer"), + ); + } + + yield* Effect.log( + `[KnowledgeCoreService] Loaded core ${key} (triples=${core.triples.length}, embeddings=${core.graphEmbeddings.length})`, + ); + yield* sendResponse(stateRef, {}, requestId); +}); + +const listDeCoresEffect = ( + stateRef: SynchronizedRef.SynchronizedRef, + request: KnowledgeRequest, + requestId: string, +) => + SynchronizedRef.get(stateRef).pipe( + Effect.flatMap((state) => sendResponse(stateRef, {ids: listIds(state.deCores, request.user ?? "")}, requestId)), + ); + +const getDeCoreEffect = Effect.fn("getDeCoreEffect")(function* ( + stateRef: SynchronizedRef.SynchronizedRef, + request: KnowledgeRequest, + requestId: string, +) { + const key = coreKey(request.user ?? "", request.id ?? ""); + const core = (yield* SynchronizedRef.get(stateRef)).deCores.get(key); + if (core === undefined) { + return yield* knowledgeCoreServiceError("get-de-core", `Document embeddings core not found: ${key}`); + } + + for (const [index, item] of core.entries()) { + yield* sendResponse( + stateRef, + { + documentEmbeddings: item, + "document-embeddings": item, + eos: index === core.length - 1, + }, + requestId, + "respond-de-core", + ); + } + + if (core.length === 0) { + yield* sendResponse(stateRef, {eos: true}, requestId, "respond-de-empty"); + } +}); + +const deleteDeCoreEffect = Effect.fn("deleteDeCoreEffect")(function* ( + stateRef: SynchronizedRef.SynchronizedRef, + persistPath: string, + request: KnowledgeRequest, + requestId: string, +) { + const key = coreKey(request.user ?? "", request.id ?? ""); + const next = yield* SynchronizedRef.updateAndGet(stateRef, (state) => { + const deCores = cloneDeStore(state.deCores); + deCores.delete(key); + return {...state, deCores}; + }); + + yield* persistStateEffect(persistPath, next); + yield* sendResponse(stateRef, {}, requestId); +}); + +const putDeCoreEffect = Effect.fn("putDeCoreEffect")(function* ( + stateRef: SynchronizedRef.SynchronizedRef, + persistPath: string, + request: KnowledgeRequest, + requestId: string, +) { + const item = documentEmbeddingsFor(request); + if (item === undefined) { + return yield* knowledgeCoreServiceError("put-de-core", "put-de-core requires document-embeddings"); + } + + const key = coreKey(request.user ?? "", request.id ?? ""); + const next = yield* SynchronizedRef.updateAndGet(stateRef, (state) => { + const deCores = cloneDeStore(state.deCores); + deCores.set(key, [...(deCores.get(key) ?? []), item]); + return {...state, deCores}; + }); + + yield* persistStateEffect(persistPath, next); + yield* sendResponse(stateRef, {}, requestId); +}); + +const loadDeCoreEffect = Effect.fn("loadDeCoreEffect")(function* ( + stateRef: SynchronizedRef.SynchronizedRef, + request: KnowledgeRequest, + requestId: string, +) { + const key = coreKey(request.user ?? "", request.id ?? ""); + const exists = (yield* SynchronizedRef.get(stateRef)).deCores.has(key); + if (!exists) { + return yield* knowledgeCoreServiceError("load-de-core", `Document embeddings core not found: ${key}`); + } + yield* sendResponse(stateRef, {}, requestId); +}); + +export function makeKnowledgeCoreService(config: KnowledgeCoreServiceConfig): KnowledgeCoreService { + const dataDir = config.dataDir ?? "./data/knowledge"; + const persistPath = joinPath(dataDir, "knowledge-state.json"); + const state = SynchronizedRef.makeUnsafe(initialState()); + let service: KnowledgeCoreService | undefined; + + const getService = Effect.sync(() => service).pipe( + Effect.flatMap((current) => + current === undefined + ? Effect.fail(knowledgeCoreServiceError("service", "Knowledge core service not initialized")) + : Effect.succeed(current) + ), + ); + + const base = makeAsyncProcessor(config, { + runEffect: () => getService.pipe(Effect.flatMap(runKnowledgeCoreServiceEffect)), + }); + const baseStop = base.stop; + + const handleOperationEffect = (request: KnowledgeRequest, requestId: string) => { + const operation: KnowledgeOperation = request.operation; + + switch (operation) { + case "list-kg-cores": + return listKgCoresEffect(state, request, requestId); + case "get-kg-core": + return getKgCoreEffect(state, request, requestId); + case "delete-kg-core": + return deleteKgCoreEffect(state, persistPath, request, requestId); + case "put-kg-core": + return putKgCoreEffect(state, persistPath, request, requestId); + case "load-kg-core": + return getService.pipe(Effect.flatMap((current) => loadKgCoreEffect(state, current, request, requestId))); + case "unload-kg-core": + return sendResponse(state, {}, requestId); + case "list-de-cores": + return listDeCoresEffect(state, request, requestId); + case "get-de-core": + return getDeCoreEffect(state, request, requestId); + case "delete-de-core": + return deleteDeCoreEffect(state, persistPath, request, requestId); + case "put-de-core": + return putDeCoreEffect(state, persistPath, request, requestId); + case "load-de-core": + return loadDeCoreEffect(state, request, requestId); + } + }; + + const handleMessageEffect = Effect.fn("KnowledgeCoreService.handleMessage")(function* (msg: Message) { + const request = yield* S.decodeUnknownEffect(KnowledgeRequestSchema)(msg.value()).pipe( + Effect.mapError((cause) => knowledgeCoreServiceError("decode", cause)), + ); + const requestId = msg.properties().id; + + if (requestId === undefined || requestId.length === 0) { + yield* Effect.logWarning("[KnowledgeCoreService] Received request without id, ignoring"); + return; + } + + yield* handleOperationEffect(request, requestId).pipe( + Effect.catch((error) => + sendResponse( + state, + {error: {type: "knowledge-error", message: error.message}}, + requestId, + "respond-error", + ) + ), + ); + }); + + const loadFromDiskEffect = Effect.fn("KnowledgeCoreService.loadFromDisk")(function* () { + const loaded = yield* readPersistedKnowledgeEffect(persistPath); + if (loaded === null) return; + + const next = yield* SynchronizedRef.updateAndGet(state, (current) => ({ + ...current, + kgCores: loaded.kgCores, + deCores: loaded.deCores, + })); + + yield* Effect.log(`[KnowledgeCoreService] Loaded persisted state (kg=${next.kgCores.size}, de=${next.deCores.size})`); + }); + + service = Object.assign(base, { + state, + dataDir, + persistPath, + coreKey, + graphEmbeddings: graphEmbeddingsFor, + documentEmbeddings: documentEmbeddingsFor, + handleMessage: (msg: Message) => Effect.runPromise(handleMessageEffect(msg)), + handleMessageEffect, + handleOperation: (request: KnowledgeRequest, requestId: string) => Effect.runPromise(handleOperationEffect(request, requestId)), + handleOperationEffect, + listKgCores: (request: KnowledgeRequest, requestId: string) => Effect.runPromise(listKgCoresEffect(state, request, requestId)), + listKgCoresEffect: (request: KnowledgeRequest, requestId: string) => listKgCoresEffect(state, request, requestId), + getKgCore: (request: KnowledgeRequest, requestId: string) => Effect.runPromise(getKgCoreEffect(state, request, requestId)), + getKgCoreEffect: (request: KnowledgeRequest, requestId: string) => getKgCoreEffect(state, request, requestId), + deleteKgCore: (request: KnowledgeRequest, requestId: string) => Effect.runPromise(deleteKgCoreEffect(state, persistPath, request, requestId)), + deleteKgCoreEffect: (request: KnowledgeRequest, requestId: string) => deleteKgCoreEffect(state, persistPath, request, requestId), + putKgCore: (request: KnowledgeRequest, requestId: string) => Effect.runPromise(putKgCoreEffect(state, persistPath, request, requestId)), + putKgCoreEffect: (request: KnowledgeRequest, requestId: string) => putKgCoreEffect(state, persistPath, request, requestId), + loadKgCore: (request: KnowledgeRequest, requestId: string) => + Effect.runPromise(getService.pipe(Effect.flatMap((current) => loadKgCoreEffect(state, current, request, requestId)))), + loadKgCoreEffect: (request: KnowledgeRequest, requestId: string) => + getService.pipe(Effect.flatMap((current) => loadKgCoreEffect(state, current, request, requestId))), + unloadKgCore: (_request: KnowledgeRequest, requestId: string) => Effect.runPromise(sendResponse(state, {}, requestId)), + unloadKgCoreEffect: (_request: KnowledgeRequest, requestId: string) => sendResponse(state, {}, requestId), + listDeCores: (request: KnowledgeRequest, requestId: string) => Effect.runPromise(listDeCoresEffect(state, request, requestId)), + listDeCoresEffect: (request: KnowledgeRequest, requestId: string) => listDeCoresEffect(state, request, requestId), + getDeCore: (request: KnowledgeRequest, requestId: string) => Effect.runPromise(getDeCoreEffect(state, request, requestId)), + getDeCoreEffect: (request: KnowledgeRequest, requestId: string) => getDeCoreEffect(state, request, requestId), + deleteDeCore: (request: KnowledgeRequest, requestId: string) => Effect.runPromise(deleteDeCoreEffect(state, persistPath, request, requestId)), + deleteDeCoreEffect: (request: KnowledgeRequest, requestId: string) => deleteDeCoreEffect(state, persistPath, request, requestId), + putDeCore: (request: KnowledgeRequest, requestId: string) => Effect.runPromise(putDeCoreEffect(state, persistPath, request, requestId)), + putDeCoreEffect: (request: KnowledgeRequest, requestId: string) => putDeCoreEffect(state, persistPath, request, requestId), + loadDeCore: (request: KnowledgeRequest, requestId: string) => Effect.runPromise(loadDeCoreEffect(state, request, requestId)), + loadDeCoreEffect: (request: KnowledgeRequest, requestId: string) => loadDeCoreEffect(state, request, requestId), + persist: () => Effect.runPromise(SynchronizedRef.get(state).pipe(Effect.flatMap((current) => persistStateEffect(persistPath, current)))), + persistEffect: SynchronizedRef.get(state).pipe(Effect.flatMap((current) => persistStateEffect(persistPath, current))), + loadFromDisk: () => Effect.runPromise(loadFromDiskEffect()), + loadFromDiskEffect: loadFromDiskEffect(), + stop: () => + Effect.runPromise( + closeKnowledgeResourcesEffect(state).pipe( + Effect.flatMap(() => + tryPromise("base-stop", () => baseStop()) + ), + ), + ), + }); + return service; } export const KnowledgeCoreService = makeKnowledgeCoreService; +export const loadKnowledgeCoreServiceRuntimeConfig = Effect.fn("loadKnowledgeCoreServiceRuntimeConfig")(function* () { + const processorConfig = yield* loadProcessorRuntimeConfig("knowledge-svc", { + manageProcessSignals: false, + }); + const dataDir = yield* optionalStringConfig("KNOWLEDGE_DATA_DIR"); + return { + ...processorConfig, + ...(dataDir !== undefined ? {dataDir} : {}), + } satisfies KnowledgeCoreServiceConfig; +}); + export const program = makeProcessorProgram({ id: "knowledge-svc", + loadConfig: loadKnowledgeCoreServiceRuntimeConfig(), make: (config) => makeKnowledgeCoreService(config), }); +const knowledgeCoreRuntime = ManagedRuntime.make(Layer.empty); + export function run(): Promise { - return Effect.runPromise(program); + return knowledgeCoreRuntime.runPromise(program); +} + +export function runMain(): void { + NodeRuntime.runMain(program); } diff --git a/ts/scripts/run-knowledge.ts b/ts/scripts/run-knowledge.ts index f96b6128..8ee0ec12 100644 --- a/ts/scripts/run-knowledge.ts +++ b/ts/scripts/run-knowledge.ts @@ -7,9 +7,6 @@ * NATS_URL (default: nats://localhost:4222) * KNOWLEDGE_DATA_DIR (optional, e.g., ./data/knowledge) */ -import { run } from "../packages/flow/src/cores/service.js"; +import {runMain} from "../packages/flow/src/cores/service.js"; -run().catch((err) => { - console.error("Knowledge core service failed:", err); - process.exit(1); -}); +runMain();