Migrate knowledge core service to ref-backed Effect state

This commit is contained in:
elpresidank 2026-06-02 01:07:12 -05:00
parent 5979d38b99
commit 0da0df81c4
5 changed files with 976 additions and 642 deletions

View file

@ -12,22 +12,22 @@ Verified source roots:
- Effect v4 subtree: `/home/elpresidank/YeeBois/projects/beep-effect2/.repos/effect-v4`
- Installed Effect beta used by this workspace: `ts/node_modules/effect`
Current signal counts from `ts/packages` after the 2026-06-02 RAG and agent
requestor bridge slice:
Current signal counts from `ts/packages` after the 2026-06-02 KnowledgeCore
ref-backed state slice:
| Signal | Count |
| --- | ---: |
| `Effect.runPromise` | 198 |
| `Map<` | 65 |
| `Effect.runPromise` | 200 |
| `Map<` | 72 |
| `WebSocket` | 51 |
| `new Map` | 47 |
| `new Map` | 53 |
| `toPromiseRequestor` | 0 |
| `makeAsyncProcessor` | 19 |
| `receive(` | 18 |
| `while (` | 12 |
| `while (` | 11 |
| `new Error` | 14 |
| `new Promise` | 10 |
| `JSON.parse` | 8 |
| `JSON.parse` | 7 |
| `localStorage` | 9 |
| `JSON.stringify` | 6 |
| `setTimeout` | 4 |
@ -135,7 +135,7 @@ Notes:
### 2026-06-02: RAG And Agent Requestor Bridge Slice
- Status: migrated, root-verified, and ready to commit.
- Status: migrated, root-verified, committed, and pushed.
- Completed:
- `ts/packages/flow/src/retrieval/graph-rag.ts` and
`ts/packages/flow/src/retrieval/document-rag.ts` now accept
@ -163,6 +163,39 @@ Notes:
- `cd ts && bun run build`
- `cd ts && bun run test`
### 2026-06-02: KnowledgeCore Ref-Backed State Slice
- Status: migrated and root-verified.
- Completed:
- `ts/packages/flow/src/cores/service.ts` now exposes a typed
`KnowledgeCoreService` instead of `AsyncProcessorRuntime & Record<string,
any>`.
- Runtime state now lives in
`SynchronizedRef<KnowledgeCoreServiceState>` with `kgCores`, `deCores`,
the request consumer, and response producer.
- Knowledge operations now have Effect-returning handlers with Promise
facades only on exported compatibility methods.
- Persistence now decodes legacy and current snapshot shapes with Effect
Schema and encodes JSON through Schema rather than raw
`JSON.parse`/`JSON.stringify` plus assertions.
- The consume loop now uses `Effect.whileLoop`; the remaining
`consumer.receive(2000)` call is a pubsub boundary for this service.
- The service exposes `runMain()` through `NodeRuntime.runMain`; legacy
`run()` uses `ManagedRuntime`, and `ts/scripts/run-knowledge.ts` delegates
to `runMain()`.
- `ts/packages/base/src/schema/messages.ts` now models legacy hyphenated
knowledge request/response aliases so the service can preserve the wire
shape without response type assertions.
- New knowledge-core tests cover ref-backed mutation, graph embedding alias
responses, concurrent state updates, and legacy persistence loading.
- Verification:
- `bun run --cwd ts/packages/base build`
- `bun run --cwd ts/packages/flow build`
- `bun run --cwd ts/packages/flow test`
- `cd ts && bun run check`
- `cd ts && bun run build`
- `cd ts && bun run test`
## Subagent Findings To Preserve
- MCP/workbench:
@ -172,8 +205,9 @@ Notes:
the client API is less Promise-first.
- MCP env is now Config-backed; continue that policy for future MCP settings.
- Flow stateful services:
- Config service ref-backed state is complete. Librarian, cores, and
flow-manager still have mutable poller service objects. These remain good
- Config service and KnowledgeCore service ref-backed state are complete.
Librarian and flow-manager still have mutable poller service objects.
These remain good
candidates for `Context` services,
scoped layers, `Ref`/`SynchronizedRef`, `Schedule`, and managed
persistence.
@ -206,7 +240,6 @@ Notes:
- TrustGraph evidence:
- `ts/packages/flow/src/librarian/service.ts`
- `ts/packages/flow/src/cores/service.ts`
- `ts/packages/flow/src/flow-manager/service.ts`
- Effect primitives:
- `Context`, `Layer.scoped`, `Ref`, `SynchronizedRef`, `Schedule`,
@ -314,7 +347,7 @@ Notes:
## Recommended PR Order
1. Librarian, cores, or flow-manager scoped state migration.
1. Librarian or flow-manager scoped state migration.
2. Client RPC managed runtime/scoped layer cleanup.
3. Base processor registry and constructor shim redesign.
4. Gateway RPC callback and client streaming completion cleanup.

View file

@ -381,7 +381,9 @@ export const KnowledgeRequest = S.Struct({
collection: S.optionalKey(S.String),
triples: OptionalMutableArray(Triple),
graphEmbeddings: OptionalMutableArray(GraphEmbedding),
"graph-embeddings": OptionalMutableArray(GraphEmbedding),
documentEmbeddings: S.optionalKey(DocumentEmbeddingsCore),
"document-embeddings": S.optionalKey(DocumentEmbeddingsCore),
});
export type KnowledgeRequest = typeof KnowledgeRequest.Type;
@ -391,7 +393,9 @@ export const KnowledgeResponse = S.Struct({
eos: S.optionalKey(S.Boolean),
triples: OptionalMutableArray(Triple),
graphEmbeddings: OptionalMutableArray(GraphEmbedding),
"graph-embeddings": OptionalMutableArray(GraphEmbedding),
documentEmbeddings: S.optionalKey(DocumentEmbeddingsCore),
"document-embeddings": S.optionalKey(DocumentEmbeddingsCore),
});
export type KnowledgeResponse = typeof KnowledgeResponse.Type;

View file

@ -0,0 +1,192 @@
import {mkdtemp, rm} from "node:fs/promises";
import {tmpdir} from "node:os";
import {join} from "node:path";
import {Effect, SynchronizedRef} from "effect";
import {describe, expect, it} from "vitest";
import {
topics,
type BackendConsumer,
type BackendProducer,
type CreateConsumerOptions,
type CreateProducerOptions,
type KnowledgeRequest,
type KnowledgeResponse,
type Message,
type PubSubBackend,
type Triple,
} from "@trustgraph/base";
import {makeKnowledgeCoreService} from "../cores/service.js";
class NoopPubSub implements PubSubBackend {
readonly sentByTopic = new Map<string, Array<unknown>>();
async createProducer<T>(options: CreateProducerOptions<T>): Promise<BackendProducer<T>> {
return {
send: async (message) => {
const sent = this.sentByTopic.get(options.topic) ?? [];
sent.push(message);
this.sentByTopic.set(options.topic, sent);
},
flush: async () => undefined,
close: async () => undefined,
};
}
async createConsumer<T>(_options: CreateConsumerOptions): Promise<BackendConsumer<T>> {
return {
receive: async () => null,
acknowledge: async (_message: Message<T>) => undefined,
negativeAcknowledge: async (_message: Message<T>) => undefined,
unsubscribe: async () => undefined,
close: async () => undefined,
};
}
async close(): Promise<void> {}
}
const sampleTriple: Triple = {
s: {type: "IRI", iri: "https://example.test/a"},
p: {type: "IRI", iri: "https://example.test/related"},
o: {type: "LITERAL", value: "alpha"},
};
const makeService = (dataDir: string, backend: PubSubBackend = new NoopPubSub()) =>
makeKnowledgeCoreService({
id: "knowledge-test",
manageProcessSignals: false,
pubsub: backend,
dataDir,
});
const seedResponseProducer = async (
backend: NoopPubSub,
service: ReturnType<typeof makeKnowledgeCoreService>,
) => {
const responseProducer = await backend.createProducer<KnowledgeResponse>({
topic: topics.knowledgeResponse,
});
await Effect.runPromise(
SynchronizedRef.update(service.state, (state) => ({
...state,
responseProducer,
})),
);
};
describe("KnowledgeCoreService operations", () => {
it("stores knowledge cores through ref-backed state and preserves graph embedding aliases", async () => {
const dir = await mkdtemp(join(tmpdir(), "trustgraph-knowledge-service-"));
const backend = new NoopPubSub();
const service = makeService(dir, backend);
await seedResponseProducer(backend, service);
const request: KnowledgeRequest = {
operation: "put-kg-core",
user: "alice",
id: "core-a",
triples: [sampleTriple],
"graph-embeddings": [
{
entity: {type: "IRI", iri: "https://example.test/a"},
vectors: [[1, 2, 3]],
},
],
};
await service.putKgCore(request, "put-1");
const state = await Effect.runPromise(SynchronizedRef.get(service.state));
const core = state.kgCores.get("alice:core-a");
await service.getKgCore({
operation: "get-kg-core",
user: "alice",
id: "core-a",
}, "get-1");
await rm(dir, {recursive: true, force: true});
expect(core?.triples).toEqual([sampleTriple]);
expect(core?.graphEmbeddings).toEqual([
{
entity: {type: "IRI", iri: "https://example.test/a"},
vectors: [[1, 2, 3]],
},
]);
expect(backend.sentByTopic.get(topics.knowledgeResponse)).toEqual([
{},
{
triples: [sampleTriple],
eos: false,
},
{
graphEmbeddings: [
{
entity: {type: "IRI", iri: "https://example.test/a"},
vectors: [[1, 2, 3]],
},
],
"graph-embeddings": [
{
entity: {type: "IRI", iri: "https://example.test/a"},
vectors: [[1, 2, 3]],
},
],
eos: true,
},
]);
});
it("serializes concurrent mutations through ref-backed maps", async () => {
const dir = await mkdtemp(join(tmpdir(), "trustgraph-knowledge-service-"));
const backend = new NoopPubSub();
const service = makeService(dir, backend);
await seedResponseProducer(backend, service);
await Promise.all([
service.putKgCore({
operation: "put-kg-core",
user: "alice",
id: "core-b",
triples: [sampleTriple],
}, "put-a"),
service.putKgCore({
operation: "put-kg-core",
user: "alice",
id: "core-b",
triples: [
{
s: {type: "IRI", iri: "https://example.test/b"},
p: {type: "IRI", iri: "https://example.test/related"},
o: {type: "LITERAL", value: "beta"},
},
],
}, "put-b"),
]);
const state = await Effect.runPromise(SynchronizedRef.get(service.state));
await rm(dir, {recursive: true, force: true});
expect(state.kgCores.get("alice:core-b")?.triples).toHaveLength(2);
});
it("loads the legacy persisted knowledge shape with schema decoding", async () => {
const dir = await mkdtemp(join(tmpdir(), "trustgraph-knowledge-service-"));
const persistPath = join(dir, "knowledge-state.json");
await Bun.write(
persistPath,
JSON.stringify({
"alice:legacy": {
triples: [sampleTriple],
graphEmbeddings: [],
},
}),
);
const service = makeService(dir);
await service.loadFromDisk();
const state = await Effect.runPromise(SynchronizedRef.get(service.state));
await rm(dir, {recursive: true, force: true});
expect(state.kgCores.get("alice:legacy")?.triples).toEqual([sampleTriple]);
});
});

File diff suppressed because it is too large Load diff

View file

@ -7,9 +7,6 @@
* NATS_URL (default: nats://localhost:4222)
* KNOWLEDGE_DATA_DIR (optional, e.g., ./data/knowledge)
*/
import { run } from "../packages/flow/src/cores/service.js";
import {runMain} from "../packages/flow/src/cores/service.js";
run().catch((err) => {
console.error("Knowledge core service failed:", err);
process.exit(1);
});
runMain();