feat: run ingest agents through llm runtime

This commit is contained in:
Andrey Avtomonov 2026-05-15 16:17:19 +02:00
parent bbcfffacb6
commit 418a8e17ae
34 changed files with 386 additions and 282 deletions

View file

@ -1,4 +1,5 @@
import type { KtxLlmProvider } from '@ktx/llm';
import type { KtxLogger } from '../core/index.js';
import { AiSdkKtxLlmRuntime, type AgentTelemetryPort } from '../llm/ai-sdk-runtime.js';
import type { KtxLlmDebugRequestRecorder } from '../llm/debug-request-recorder.js';
import type { AgentRunnerPort, RunLoopParams, RunLoopResult } from '../llm/runtime-port.js';

View file

@ -1,7 +1,6 @@
import type { KtxModelRole } from '@ktx/llm';
import type { ToolSet } from 'ai';
import type { AgentRunnerService } from '../../agent/index.js';
import { type KtxLogger, noopLogger } from '../../core/index.js';
import type { AgentRunnerPort, KtxRuntimeToolSet } from '../../llm/index.js';
import type { MemoryAction } from '../../memory/index.js';
import type { ContextCandidateForDedup, CuratorPaginationPort, CuratorPaginationReport } from '../ports.js';
import type {
@ -38,7 +37,7 @@ export interface CuratorPaginationInput {
modelRole: KtxModelRole;
buildSystemPrompt: () => string;
buildUserPrompt: (input: CuratorPaginationPromptInput) => string;
buildToolSet: (passNumber: number) => ToolSet;
buildToolSet: (passNumber: number) => KtxRuntimeToolSet;
getReconciliationActions: () => MemoryAction[];
onStepFinish?: (info: { passNumber: number; stepIndex: number; stepBudget: number }) => void;
}
@ -50,7 +49,7 @@ interface CuratorPaginationResult extends ReconciliationOutcome {
export interface CuratorPaginationServiceDeps {
store: ContextCandidateStorePort;
agentRunner: AgentRunnerService;
agentRunner: AgentRunnerPort;
settings: CuratorPaginationSettings;
logger?: KtxLogger;
}

View file

@ -200,7 +200,7 @@ const makeDeps = () => {
const slValidator = { validateSingleSource: vi.fn().mockResolvedValue({ errors: [], warnings: [] }) };
const toolsetFactory = {
createIngestWuToolset: vi.fn().mockReturnValue({
toAiSdkTools: vi.fn().mockReturnValue({}),
toRuntimeTools: vi.fn().mockReturnValue({}),
getAllTools: vi.fn().mockReturnValue([]),
getToolNames: vi.fn().mockReturnValue([]),
}),
@ -419,7 +419,7 @@ describe('IngestBundleRunner — Stages 1 → 7', () => {
deps.toolsetFactory.createIngestWuToolset.mockImplementation((toolSession: any) => {
sessions.push(toolSession);
return {
toAiSdkTools: vi.fn().mockReturnValue({}),
toRuntimeTools: vi.fn().mockReturnValue({}),
getAllTools: vi.fn().mockReturnValue([]),
getToolNames: vi.fn().mockReturnValue([]),
};
@ -591,7 +591,7 @@ describe('IngestBundleRunner — Stages 1 → 7', () => {
deps.toolsetFactory.createIngestWuToolset.mockImplementation((toolSession: any) => {
currentToolSession = toolSession;
return {
toAiSdkTools: vi.fn().mockReturnValue({}),
toRuntimeTools: vi.fn().mockReturnValue({}),
getAllTools: vi.fn().mockReturnValue([]),
getToolNames: vi.fn().mockReturnValue([]),
};
@ -663,7 +663,7 @@ describe('IngestBundleRunner — Stages 1 → 7', () => {
deps.toolsetFactory.createIngestWuToolset.mockImplementation((toolSession: any) => {
currentToolSession = toolSession;
return {
toAiSdkTools: vi.fn().mockReturnValue({}),
toRuntimeTools: vi.fn().mockReturnValue({}),
getAllTools: vi.fn().mockReturnValue([]),
getToolNames: vi.fn().mockReturnValue([]),
};
@ -834,7 +834,7 @@ describe('IngestBundleRunner — Stages 1 → 7', () => {
it('stores memory-flow provenance and transcript summaries in the ingest report body', async () => {
const deps = makeDeps();
deps.toolsetFactory.createIngestWuToolset.mockReturnValue({
toAiSdkTools: vi.fn().mockReturnValue({
toRuntimeTools: vi.fn().mockReturnValue({
read_raw_span: {
description: 'read a raw span',
inputSchema: {},
@ -1376,7 +1376,7 @@ describe('IngestBundleRunner — Stages 1 → 7', () => {
deps.toolsetFactory.createIngestWuToolset.mockImplementation((toolSession: any) => {
currentToolSession = toolSession;
return {
toAiSdkTools: vi.fn().mockReturnValue({}),
toRuntimeTools: vi.fn().mockReturnValue({}),
getAllTools: vi.fn().mockReturnValue([]),
getToolNames: vi.fn().mockReturnValue([]),
};
@ -1933,7 +1933,7 @@ describe('IngestBundleRunner — Stages 1 → 7', () => {
deps.toolsetFactory.createIngestWuToolset.mockImplementation((toolSession: any) => {
currentToolSession = toolSession;
return {
toAiSdkTools: vi.fn().mockReturnValue({}),
toRuntimeTools: vi.fn().mockReturnValue({}),
getAllTools: vi.fn().mockReturnValue([]),
getToolNames: vi.fn().mockReturnValue([]),
};

View file

@ -1,9 +1,9 @@
import { mkdir, readFile, rm, writeFile } from 'node:fs/promises';
import { dirname, join } from 'node:path';
import { type Tool, tool } from 'ai';
import pLimit from 'p-limit';
import { z } from 'zod';
import { type KtxLogger, noopLogger } from '../core/index.js';
import { createRuntimeToolDescriptorFromAiTool, type KtxRuntimeToolSet } from '../llm/index.js';
import type { CaptureSession, MemoryAction } from '../memory/index.js';
import type { SemanticLayerService, SemanticLayerSource, SlValidationDeps } from '../sl/index.js';
import { createTouchedSlSources, type ToolContext, type ToolSession } from '../tools/index.js';
@ -694,8 +694,9 @@ export class IngestBundleRunner {
};
const skillsLoadedPerWu: string[] = [];
const loadSkillTool: Record<string, Tool> = {
load_skill: tool({
const loadSkillTool: KtxRuntimeToolSet = {
load_skill: {
name: 'load_skill',
description:
'Load a skill to get specialized instructions. Call this when a skill listed in the system prompt matches the current task.',
inputSchema: z.object({ name: z.string() }),
@ -705,19 +706,23 @@ export class IngestBundleRunner {
const available =
(await this.deps.skillsRegistry.listSkills('memory_agent')).map((s) => s.name).join(', ') ||
'(none)';
return `Skill "${name}" not available. Available: ${available}`;
return { markdown: `Skill "${name}" not available. Available: ${available}` };
}
const body = await readFile(join(skill.path, 'SKILL.md'), 'utf-8');
if (!skillsLoadedPerWu.includes(skill.name)) {
skillsLoadedPerWu.push(skill.name);
}
return {
const structured = {
name: skill.name,
skillDirectory: skill.path,
content: this.deps.skillsRegistry.stripFrontmatter(body),
};
return {
markdown: `# ${structured.name}\n\n${structured.content}`,
structured,
};
},
}),
},
};
const priorProvenance = await this.deps.provenance.findLatestArtifactsForRawPaths(
@ -726,12 +731,15 @@ export class IngestBundleRunner {
wu.rawFiles,
);
const wuEmitUnmappedFallbackTool = {
emit_unmapped_fallback: createEmitUnmappedFallbackTool({
stageIndex,
allowedPaths: new Set(wu.rawFiles),
tableRefExists: (tableRef) =>
this.tableRefExistsInSemanticLayer(scopedSemanticLayerService, slConnectionIds, tableRef),
}),
emit_unmapped_fallback: createRuntimeToolDescriptorFromAiTool(
'emit_unmapped_fallback',
createEmitUnmappedFallbackTool({
stageIndex,
allowedPaths: new Set(wu.rawFiles),
tableRefExists: (tableRef) =>
this.tableRefExistsInSemanticLayer(scopedSemanticLayerService, slConnectionIds, tableRef),
}),
),
};
const systemPrompt = buildWuSystemPrompt({
@ -765,7 +773,7 @@ export class IngestBundleRunner {
wu: wuInner,
loadSkillTool,
emitUnmappedFallbackTool: wuEmitUnmappedFallbackTool,
toolsetTools: wuToolset.toAiSdkTools(wuToolContext),
toolsetTools: wuToolset.toRuntimeTools(wuToolContext),
}),
join(transcriptDir, `${wuInner.unitKey}.jsonl`),
wuInner.unitKey,
@ -921,53 +929,79 @@ export class IngestBundleRunner {
ingest: ingestToolMetadata,
session: rcToolSession,
};
const rcLoadSkill: Record<string, Tool> = {
load_skill: tool({
const rcLoadSkill: KtxRuntimeToolSet = {
load_skill: {
name: 'load_skill',
description: 'Load a skill.',
inputSchema: z.object({ name: z.string() }),
execute: async ({ name }) => {
const skill = await this.deps.skillsRegistry.getSkill(name, 'memory_agent');
if (!skill) {
return `Skill "${name}" not found`;
return { markdown: `Skill "${name}" not found` };
}
const body = await readFile(join(skill.path, 'SKILL.md'), 'utf-8');
return { name: skill.name, content: this.deps.skillsRegistry.stripFrontmatter(body) };
const structured = { name: skill.name, content: this.deps.skillsRegistry.stripFrontmatter(body) };
return { markdown: `# ${structured.name}\n\n${structured.content}`, structured };
},
}),
},
};
const allStagedPaths = new Set<string>([...currentHashes.keys()]);
const rcRawSpanTool = { read_raw_span: createReadRawSpanTool({ stagedDir, allowedPaths: allStagedPaths }) };
const rcStageListTool = { stage_list: createStageListTool({ stageIndex }) };
const rcStageDiffTool = { stage_diff: createStageDiffTool({ stageIndex }) };
const rcRawSpanTool = {
read_raw_span: createRuntimeToolDescriptorFromAiTool(
'read_raw_span',
createReadRawSpanTool({ stagedDir, allowedPaths: allStagedPaths }),
),
};
const rcStageListTool = {
stage_list: createRuntimeToolDescriptorFromAiTool('stage_list', createStageListTool({ stageIndex })),
};
const rcStageDiffTool = {
stage_diff: createRuntimeToolDescriptorFromAiTool('stage_diff', createStageDiffTool({ stageIndex })),
};
const rcEvictionListTool = {
eviction_list: createEvictionListTool({
provenance: this.deps.provenance,
connectionId: job.connectionId,
sourceKey: job.sourceKey,
deletedRawPaths: eviction?.deletedRawPaths ?? [],
}),
eviction_list: createRuntimeToolDescriptorFromAiTool(
'eviction_list',
createEvictionListTool({
provenance: this.deps.provenance,
connectionId: job.connectionId,
sourceKey: job.sourceKey,
deletedRawPaths: eviction?.deletedRawPaths ?? [],
}),
),
};
const rcEmitConflictResolutionTool = {
emit_conflict_resolution: createEmitConflictResolutionTool({ stageIndex }),
emit_conflict_resolution: createRuntimeToolDescriptorFromAiTool(
'emit_conflict_resolution',
createEmitConflictResolutionTool({ stageIndex }),
),
};
const rcEmitEvictionDecisionTool = {
emit_eviction_decision: createEmitEvictionDecisionTool({
stageIndex,
deletedRawPaths: eviction?.deletedRawPaths ?? [],
}),
emit_eviction_decision: createRuntimeToolDescriptorFromAiTool(
'emit_eviction_decision',
createEmitEvictionDecisionTool({
stageIndex,
deletedRawPaths: eviction?.deletedRawPaths ?? [],
}),
),
};
const rcEmitArtifactResolutionTool = {
emit_artifact_resolution: createEmitArtifactResolutionTool({
stageIndex,
allowedPaths: allStagedPaths,
}),
emit_artifact_resolution: createRuntimeToolDescriptorFromAiTool(
'emit_artifact_resolution',
createEmitArtifactResolutionTool({
stageIndex,
allowedPaths: allStagedPaths,
}),
),
};
const rcEmitUnmappedFallbackTool = {
emit_unmapped_fallback: createEmitUnmappedFallbackTool({
stageIndex,
allowedPaths: allStagedPaths,
tableRefExists: (tableRef) => this.tableRefExistsInSemanticLayer(rcScopedSl, slConnectionIds, tableRef),
}),
emit_unmapped_fallback: createRuntimeToolDescriptorFromAiTool(
'emit_unmapped_fallback',
createEmitUnmappedFallbackTool({
stageIndex,
allowedPaths: allStagedPaths,
tableRefExists: (tableRef) => this.tableRefExistsInSemanticLayer(rcScopedSl, slConnectionIds, tableRef),
}),
),
};
const reconcileBaseFraming = await this.deps.promptService.loadPrompt('memory_agent_bundle_ingest_reconcile');
@ -1026,7 +1060,7 @@ export class IngestBundleRunner {
emitArtifactResolutionTool: rcEmitArtifactResolutionTool,
emitUnmappedFallbackTool: rcEmitUnmappedFallbackTool,
readRawSpanTool: rcRawSpanTool,
toolsetTools: rcToolset.toAiSdkTools(rcToolContext),
toolsetTools: rcToolset.toRuntimeTools(rcToolContext),
}),
join(transcriptDir, 'reconcile.jsonl'),
'reconcile',
@ -1075,7 +1109,7 @@ export class IngestBundleRunner {
emitArtifactResolutionTool: rcEmitArtifactResolutionTool,
emitUnmappedFallbackTool: rcEmitUnmappedFallbackTool,
readRawSpanTool: rcRawSpanTool,
toolsetTools: rcToolset.toAiSdkTools(rcToolContext),
toolsetTools: rcToolset.toRuntimeTools(rcToolContext),
}),
join(transcriptDir, 'reconcile.jsonl'),
'reconcile',

View file

@ -55,13 +55,39 @@ describe('createLocalBundleIngestRuntime', () => {
}),
).toThrow(
[
'ktx ingest requires llm.provider.backend: anthropic, vertex, or gateway, or an injected agentRunner.',
'ktx ingest requires llm.provider.backend: anthropic, vertex, gateway, or claude-code, or an injected agentRunner.',
`Configure an Anthropic provider, then rerun ingest:`,
` ktx setup --project-dir ${project.projectDir} --anthropic-api-key-env ANTHROPIC_API_KEY --anthropic-model claude-sonnet-4-6 --no-input`,
].join('\n'),
);
});
it('uses a runtime-backed agent runner when claude-code is configured', () => {
const runtime = {
generateText: vi.fn(),
generateObject: vi.fn(),
runAgentLoop: vi.fn(async () => ({ stopReason: 'natural' as const })),
};
project.config.llm = {
provider: { backend: 'claude-code' },
models: { default: 'sonnet' },
promptCaching: { enabled: false },
};
const createLlmRuntime = vi.fn(() => runtime);
const created = createLocalBundleIngestRuntime({
project,
adapters: [new FakeSourceAdapter()],
createLlmRuntime,
});
expect(created).toBeDefined();
expect(createLlmRuntime).toHaveBeenCalledWith(
project.config.llm,
expect.objectContaining({ projectDir: project.projectDir }),
);
});
it('builds runner deps with local SQLite stores and context tools enabled', async () => {
const agentRunner = new AgentRunnerService({ llmProvider: { getModel: () => ({}) as never } as any });

View file

@ -1,20 +1,20 @@
import { mkdirSync } from 'node:fs';
import { join } from 'node:path';
import { fileURLToPath } from 'node:url';
import type { KtxLlmProvider } from '@ktx/llm';
import type { Tool } from 'ai';
import YAML from 'yaml';
import type { AgentRunnerService } from '../agent/index.js';
import { AgentRunnerService as DefaultAgentRunnerService } from '../agent/index.js';
import { localConnectionInfoFromConfig, type KtxSqlQueryExecutorPort } from '../connections/index.js';
import type { KtxEmbeddingPort, KtxLogger } from '../core/index.js';
import { noopLogger, SessionWorktreeService } from '../core/index.js';
import type { KtxSemanticLayerComputePort } from '../daemon/index.js';
import {
createJsonlKtxLlmDebugRequestRecorder,
createRuntimeToolDescriptorFromAiTool,
createLocalKtxEmbeddingProviderFromConfig,
createLocalKtxLlmProviderFromConfig,
createLocalKtxLlmRuntimeFromConfig,
KtxIngestEmbeddingPortAdapter,
RuntimeAgentRunner,
type AgentRunnerPort,
type KtxLlmRuntimePort,
type KtxRuntimeToolSet,
} from '../llm/index.js';
import type { KtxLocalProject } from '../project/index.js';
import { ktxLocalStateDbPath } from '../project/index.js';
@ -100,8 +100,9 @@ const LOCAL_SHAPE_WARNING = 'Local ingest validates semantic-layer YAML shape on
export interface CreateLocalBundleIngestRuntimeOptions {
project: KtxLocalProject;
adapters: SourceAdapter[];
agentRunner?: AgentRunnerService;
llmProvider?: KtxLlmProvider;
agentRunner?: AgentRunnerPort;
llmRuntime?: KtxLlmRuntimePort;
createLlmRuntime?: typeof createLocalKtxLlmRuntimeFromConfig;
llmDebugRequestFile?: string;
memoryModel?: string;
semanticLayerCompute?: KtxSemanticLayerComputePort;
@ -456,12 +457,12 @@ class NoopKnowledgeEventPort implements KnowledgeEventPort {
class LocalIngestToolSet implements IngestToolsetLike {
constructor(
private readonly tools: BaseTool[],
private readonly sourceTools: Record<string, Tool> = {},
private readonly sourceTools: KtxRuntimeToolSet = {},
) {}
toAiSdkTools(context: ToolContext) {
toRuntimeTools(context: ToolContext): KtxRuntimeToolSet {
return {
...Object.fromEntries(this.tools.map((tool) => [tool.name, tool.toAiSdkTool(context)])),
...Object.fromEntries(this.tools.map((tool) => [tool.name, tool.toRuntimeTool(context)])),
...this.sourceTools,
};
}
@ -541,13 +542,16 @@ class LocalIngestToolsetFactory implements IngestToolsetFactoryPort {
}
createIngestWuToolset(session: ToolSession, options?: { includeContextEvidenceTools?: boolean }): IngestToolsetLike {
const sourceTools: Record<string, Tool> =
const sourceTools: KtxRuntimeToolSet =
session.ingest?.sourceKey === 'historic-sql'
? {
emit_historic_sql_evidence: createEmitHistoricSqlEvidenceTool({
connectionId: session.connectionId,
session,
}),
emit_historic_sql_evidence: createRuntimeToolDescriptorFromAiTool(
'emit_historic_sql_evidence',
createEmitHistoricSqlEvidenceTool({
connectionId: session.connectionId,
session,
}),
),
}
: {};
return new LocalIngestToolSet(
@ -571,36 +575,35 @@ function nextLocalJobId(): string {
function localIngestLlmProviderGuardMessage(projectDir: string): string {
return [
'ktx ingest requires llm.provider.backend: anthropic, vertex, or gateway, or an injected agentRunner.',
'ktx ingest requires llm.provider.backend: anthropic, vertex, gateway, or claude-code, or an injected agentRunner.',
'Configure an Anthropic provider, then rerun ingest:',
` ktx setup --project-dir ${projectDir} --anthropic-api-key-env ANTHROPIC_API_KEY --anthropic-model claude-sonnet-4-6 --no-input`,
].join('\n');
}
function resolveAgentRunner(options: CreateLocalBundleIngestRuntimeOptions): {
agentRunner: AgentRunnerService;
llmProvider?: KtxLlmProvider;
agentRunner: AgentRunnerPort;
llmRuntime?: KtxLlmRuntimePort;
} {
const llmProvider =
options.llmProvider ?? createLocalKtxLlmProviderFromConfig(options.project.config.llm) ?? undefined;
const llmRuntime =
options.llmRuntime ??
(options.createLlmRuntime ?? createLocalKtxLlmRuntimeFromConfig)(options.project.config.llm, {
projectDir: options.project.projectDir,
env: process.env,
}) ??
undefined;
if (options.agentRunner) {
return { agentRunner: options.agentRunner, ...(llmProvider ? { llmProvider } : {}) };
return { agentRunner: options.agentRunner, ...(llmRuntime ? { llmRuntime } : {}) };
}
if (!llmProvider) {
if (!llmRuntime) {
throw new Error(localIngestLlmProviderGuardMessage(options.project.projectDir));
}
return {
agentRunner: new DefaultAgentRunnerService({
llmProvider,
logger: options.logger ?? noopLogger,
...(options.llmDebugRequestFile
? { debugRequestRecorder: createJsonlKtxLlmDebugRequestRecorder(options.llmDebugRequestFile) }
: {}),
}),
llmProvider,
agentRunner: new RuntimeAgentRunner(llmRuntime),
llmRuntime,
};
}
@ -627,7 +630,7 @@ export function createLocalBundleIngestRuntime(
const knowledgeIndex = new LocalKnowledgeIndex(options.project, embedding);
const knowledgeEvents = new NoopKnowledgeEventPort();
const wikiService = new KnowledgeWikiService(rootFileStore, embedding, knowledgeIndex, options.project.git, logger);
const { agentRunner, llmProvider } = resolveAgentRunner(options);
const { agentRunner, llmRuntime } = resolveAgentRunner(options);
const promptService = new PromptService({ promptsDir, partials: [], logger });
const storage = new LocalIngestStorage(options.project);
const registry = registerAdapters(options.adapters);
@ -681,10 +684,11 @@ export function createLocalBundleIngestRuntime(
commitMessages: new LocalCommitMessagePort(),
embedding,
contextEvidenceIndex: new ContextEvidenceIndexService({ store: contextStore, embeddings: embedding, logger }),
pageTriage: llmProvider
llmRuntime,
pageTriage: llmRuntime
? new PageTriageService({
store: contextStore,
llmProvider,
llmRuntime,
settings: {
enabled: true,
maxConcurrency: 2,

View file

@ -1,11 +1,10 @@
import { randomUUID } from 'node:crypto';
import { cp, mkdir, rm } from 'node:fs/promises';
import { isAbsolute, resolve } from 'node:path';
import type { KtxLlmProvider } from '@ktx/llm';
import type { AgentRunnerService } from '../agent/index.js';
import type { KtxSqlQueryExecutorPort } from '../connections/index.js';
import type { KtxLogger } from '../core/index.js';
import type { KtxSemanticLayerComputePort } from '../daemon/index.js';
import type { AgentRunnerPort, KtxLlmRuntimePort } from '../llm/index.js';
import type { KtxLocalProject } from '../project/index.js';
import { ktxLocalStateDbPath } from '../project/index.js';
import { planMetabaseFanoutChildren } from './adapters/metabase/fanout-planner.js';
@ -28,8 +27,8 @@ export interface RunLocalIngestOptions {
trigger?: IngestTrigger;
jobId?: string;
memoryFlow?: MemoryFlowEventSink;
agentRunner?: AgentRunnerService;
llmProvider?: KtxLlmProvider;
agentRunner?: AgentRunnerPort;
llmRuntime?: KtxLlmRuntimePort;
llmDebugRequestFile?: string;
memoryModel?: string;
semanticLayerCompute?: KtxSemanticLayerComputePort;
@ -41,7 +40,7 @@ export interface LocalIngestMcpOptions
extends Pick<
RunLocalIngestOptions,
| 'agentRunner'
| 'llmProvider'
| 'llmRuntime'
| 'memoryModel'
| 'semanticLayerCompute'
| 'queryExecutor'
@ -167,8 +166,8 @@ async function runScheduledPullJob(options: {
trigger?: IngestTrigger;
jobId?: string;
memoryFlow?: MemoryFlowEventSink;
agentRunner?: AgentRunnerService;
llmProvider?: KtxLlmProvider;
agentRunner?: AgentRunnerPort;
llmRuntime?: KtxLlmRuntimePort;
memoryModel?: string;
semanticLayerCompute?: KtxSemanticLayerComputePort;
queryExecutor?: KtxSqlQueryExecutorPort;
@ -221,7 +220,7 @@ export async function runLocalIngest(options: RunLocalIngestOptions): Promise<Lo
jobId,
memoryFlow: options.memoryFlow,
agentRunner: options.agentRunner,
llmProvider: options.llmProvider,
llmRuntime: options.llmRuntime,
memoryModel: options.memoryModel,
semanticLayerCompute: options.semanticLayerCompute,
queryExecutor: options.queryExecutor,
@ -406,7 +405,7 @@ export async function runLocalMetabaseIngest(
jobId: childJobId,
memoryFlow: options.memoryFlow,
agentRunner: options.agentRunner,
llmProvider: options.llmProvider,
llmRuntime: options.llmRuntime,
memoryModel: options.memoryModel,
semanticLayerCompute: options.semanticLayerCompute,
queryExecutor: options.queryExecutor,

View file

@ -1,8 +1,7 @@
import type { ToolSet } from 'ai';
import type { KtxModelRole } from '@ktx/llm';
import type { AgentRunnerService } from '../agent/index.js';
import type { KtxEmbeddingPort } from '../core/embedding.js';
import type { GitService, KtxFileStorePort, KtxLogger, SessionOutcome } from '../core/index.js';
import type { AgentRunnerPort, KtxLlmRuntimePort, KtxRuntimeToolSet } from '../llm/index.js';
import type { CaptureSession, MemoryAction, MemoryKnowledgeSlRefsPort } from '../memory/index.js';
import type { PromptService } from '../prompts/index.js';
import type { SkillsRegistryService } from '../skills/index.js';
@ -163,7 +162,7 @@ export interface IngestCommitMessagePort {
}
export interface IngestToolsetLike {
toAiSdkTools(context: ToolContext): ToolSet;
toRuntimeTools(context: ToolContext): KtxRuntimeToolSet;
}
export interface IngestToolsetFactoryPort {
@ -315,7 +314,7 @@ export interface CuratorPaginationPort {
items: ReconcileCandidateForPrompt[];
runState: ReconcilePromptRunState;
}) => string;
buildToolSet: (passNumber: number) => ToolSet;
buildToolSet: (passNumber: number) => KtxRuntimeToolSet;
getReconciliationActions: () => MemoryAction[];
onStepFinish?: (info: { passNumber: number; stepIndex: number; stepBudget: number }) => void;
}): Promise<ReconciliationOutcome & { report: CuratorPaginationReport; warnings: string[] }>;
@ -350,7 +349,8 @@ export interface IngestBundleRunnerDeps {
registry: SourceAdapterRegistryPort;
diffSetService: DiffSetComputerPort;
sessionWorktreeService: IngestSessionWorktreePort;
agentRunner: AgentRunnerService;
agentRunner: AgentRunnerPort;
llmRuntime?: KtxLlmRuntimePort;
gitService: GitService;
lockingService: IngestLockPort;
storage: IngestStoragePort;

View file

@ -141,26 +141,17 @@ describe('buildReconcileToolSet', () => {
toolsetTools: { sl_write_source: { description: 'sl write', inputSchema: {} as any, execute: slWrite } as any },
});
const correction = await toolSet.sl_write_source.execute?.(
{ connectionId: 'warehouse', sourceName: 'accounts' },
{ toolCallId: 't1' } as any,
);
const correction = await toolSet.sl_write_source.execute?.({ connectionId: 'warehouse', sourceName: 'accounts' });
expect(slWrite).not.toHaveBeenCalled();
expect(correction).toMatchObject({ structured: { success: false, reason: 'verification_ledger_required' } });
await toolSet.record_verification_ledger.execute?.(
{
summary: 'Verified warehouse.accounts with entity_details.',
verifiedIdentifiers: ['warehouse.accounts'],
unverifiedIdentifiers: [],
},
{ toolCallId: 't2' } as any,
);
const written = await toolSet.sl_write_source.execute?.(
{ connectionId: 'warehouse', sourceName: 'accounts' },
{ toolCallId: 't3' } as any,
);
await toolSet.record_verification_ledger.execute?.({
summary: 'Verified warehouse.accounts with entity_details.',
verifiedIdentifiers: ['warehouse.accounts'],
unverifiedIdentifiers: [],
});
const written = await toolSet.sl_write_source.execute?.({ connectionId: 'warehouse', sourceName: 'accounts' });
expect(slWrite).toHaveBeenCalledTimes(1);
expect(written).toMatchObject({ structured: { success: true } });

View file

@ -1,5 +1,5 @@
import type { Tool, ToolSet } from 'ai';
import { buildCanonicalPinsPromptBlock, type CanonicalPin } from '../canonical-pins.js';
import type { KtxRuntimeToolSet } from '../../llm/index.js';
import {
createVerificationLedgerState,
VERIFICATION_LEDGER_PROMPT,
@ -181,19 +181,19 @@ export function buildReconcileUserPrompt(
}
export interface ReconcileToolSetInput {
loadSkillTool: Record<string, Tool>;
stageListTool: Record<string, Tool>;
stageDiffTool: Record<string, Tool>;
evictionListTool: Record<string, Tool>;
emitConflictResolutionTool: Record<string, Tool>;
emitEvictionDecisionTool: Record<string, Tool>;
emitArtifactResolutionTool: Record<string, Tool>;
emitUnmappedFallbackTool: Record<string, Tool>;
readRawSpanTool: Record<string, Tool>;
toolsetTools: ToolSet;
loadSkillTool: KtxRuntimeToolSet;
stageListTool: KtxRuntimeToolSet;
stageDiffTool: KtxRuntimeToolSet;
evictionListTool: KtxRuntimeToolSet;
emitConflictResolutionTool: KtxRuntimeToolSet;
emitEvictionDecisionTool: KtxRuntimeToolSet;
emitArtifactResolutionTool: KtxRuntimeToolSet;
emitUnmappedFallbackTool: KtxRuntimeToolSet;
readRawSpanTool: KtxRuntimeToolSet;
toolsetTools: KtxRuntimeToolSet;
}
export function buildReconcileToolSet(input: ReconcileToolSetInput): ToolSet {
export function buildReconcileToolSet(input: ReconcileToolSetInput): KtxRuntimeToolSet {
const state = createVerificationLedgerState();
return withVerificationLedger(
{

View file

@ -87,21 +87,18 @@ describe('buildWuToolSet', () => {
toolsetTools: { wiki_write: { description: 'write', inputSchema: {} as any, execute: wikiWrite } as any },
});
const correction = await toolSet.wiki_write.execute?.({ key: 'customer-rules' }, { toolCallId: 't1' } as any);
const correction = await toolSet.wiki_write.execute?.({ key: 'customer-rules' });
expect(wikiWrite).not.toHaveBeenCalled();
expect(correction).toMatchObject({ structured: { success: false, reason: 'verification_ledger_required' } });
expect(String((correction as any).markdown)).toContain('record_verification_ledger');
await toolSet.record_verification_ledger.execute?.(
{
summary: 'No warehouse identifiers will be emitted in this wiki write.',
verifiedIdentifiers: [],
unverifiedIdentifiers: [],
},
{ toolCallId: 't2' } as any,
);
const written = await toolSet.wiki_write.execute?.({ key: 'customer-rules' }, { toolCallId: 't3' } as any);
await toolSet.record_verification_ledger.execute?.({
summary: 'No warehouse identifiers will be emitted in this wiki write.',
verifiedIdentifiers: [],
unverifiedIdentifiers: [],
});
const written = await toolSet.wiki_write.execute?.({ key: 'customer-rules' });
expect(wikiWrite).toHaveBeenCalledTimes(1);
expect(written).toMatchObject({ structured: { success: true } });

View file

@ -1,6 +1,6 @@
import type { Tool, ToolSet } from 'ai';
import { buildCanonicalPinsPromptBlock, type CanonicalPin } from '../canonical-pins.js';
import { createLookerQueryToSlTool } from '../adapters/looker/tools/looker-query-to-sl.tool.js';
import { createRuntimeToolDescriptorFromAiTool, type KtxRuntimeToolSet } from '../../llm/index.js';
import type { IngestProvenanceRow } from '../ports.js';
import { createReadRawFileTool } from '../tools/read-raw-file.tool.js';
import { createReadRawSpanTool } from '../tools/read-raw-span.tool.js';
@ -88,12 +88,12 @@ export interface BuildWuToolSetInput {
sourceKey?: string;
stagedDir: string;
wu: WorkUnit;
loadSkillTool: Record<string, Tool>;
emitUnmappedFallbackTool: Record<string, Tool>;
toolsetTools: ToolSet;
loadSkillTool: KtxRuntimeToolSet;
emitUnmappedFallbackTool: KtxRuntimeToolSet;
toolsetTools: KtxRuntimeToolSet;
}
function withoutWriteSlTools(toolset: ToolSet, wu: WorkUnit): ToolSet {
function withoutWriteSlTools(toolset: KtxRuntimeToolSet, wu: WorkUnit): KtxRuntimeToolSet {
if (!wu.slDisallowed) {
return toolset;
}
@ -103,9 +103,12 @@ function withoutWriteSlTools(toolset: ToolSet, wu: WorkUnit): ToolSet {
return next;
}
export function buildWuToolSet(input: BuildWuToolSetInput): ToolSet {
export function buildWuToolSet(input: BuildWuToolSetInput): KtxRuntimeToolSet {
const allowedPaths = new Set<string>([...input.wu.rawFiles, ...input.wu.dependencyPaths]);
const lookerTools: ToolSet = input.sourceKey === 'looker' ? { looker_query_to_sl: createLookerQueryToSlTool() } : {};
const lookerTools: KtxRuntimeToolSet =
input.sourceKey === 'looker'
? { looker_query_to_sl: createRuntimeToolDescriptorFromAiTool('looker_query_to_sl', createLookerQueryToSlTool()) }
: {};
const state = createVerificationLedgerState();
return withVerificationLedger(
withoutWriteSlTools(
@ -114,8 +117,14 @@ export function buildWuToolSet(input: BuildWuToolSetInput): ToolSet {
...lookerTools,
...input.loadSkillTool,
...input.emitUnmappedFallbackTool,
read_raw_file: createReadRawFileTool({ stagedDir: input.stagedDir, allowedPaths }),
read_raw_span: createReadRawSpanTool({ stagedDir: input.stagedDir, allowedPaths }),
read_raw_file: createRuntimeToolDescriptorFromAiTool(
'read_raw_file',
createReadRawFileTool({ stagedDir: input.stagedDir, allowedPaths }),
),
read_raw_span: createRuntimeToolDescriptorFromAiTool(
'read_raw_span',
createReadRawSpanTool({ stagedDir: input.stagedDir, allowedPaths }),
),
},
input.wu,
),

View file

@ -1,6 +1,5 @@
import type { AgentRunnerService } from '@ktx/context/agent';
import type { KtxModelRole } from '@ktx/llm';
import type { Tool } from 'ai';
import type { AgentRunnerPort, KtxRuntimeToolSet } from '@ktx/context';
import type { CaptureSession, MemoryAction } from '../../memory/index.js';
import { listTouchedSlSources, type TouchedSlSource } from '../../tools/index.js';
import type { WorkUnit } from '../types.js';
@ -14,12 +13,12 @@ export interface TouchedValidationResult {
export interface WorkUnitExecutionDeps {
sessionWorktreeGit: { revParseHead(): Promise<string | null> };
agentRunner: AgentRunnerService;
agentRunner: AgentRunnerPort;
validateTouchedSources: (touched: TouchedSlSource[]) => Promise<TouchedValidationResult>;
resetHardTo: (targetSha: string) => Promise<void>;
buildSystemPrompt: (wu: WorkUnit) => string;
buildUserPrompt: (wu: WorkUnit) => string;
buildToolSet: (wu: WorkUnit) => Record<string, Tool>;
buildToolSet: (wu: WorkUnit) => KtxRuntimeToolSet;
captureSession: CaptureSession;
sessionActions: MemoryAction[];
modelRole: KtxModelRole;

View file

@ -1,16 +1,15 @@
import type { AgentRunnerService } from '@ktx/context/agent';
import type { AgentRunnerPort, KtxRuntimeToolSet } from '@ktx/context';
import type { KtxModelRole } from '@ktx/llm';
import type { ToolSet } from 'ai';
import type { EvictionUnit } from '../types.js';
import type { StageIndex } from './stage-index.types.js';
export interface ReconciliationContext {
stageIndex: StageIndex;
evictionUnit: EvictionUnit | undefined;
agentRunner: AgentRunnerService;
agentRunner: AgentRunnerPort;
buildSystemPrompt: (idx: StageIndex, ev: EvictionUnit | undefined) => string;
buildUserPrompt: (idx: StageIndex, ev: EvictionUnit | undefined) => string;
buildToolSet: () => ToolSet;
buildToolSet: () => KtxRuntimeToolSet;
modelRole: KtxModelRole;
stepBudget: number;
sourceKey: string;

View file

@ -1,6 +1,6 @@
import { appendFile, mkdir } from 'node:fs/promises';
import { dirname } from 'node:path';
import type { ToolExecuteFunction, ToolExecutionOptions, ToolSet } from 'ai';
import type { KtxRuntimeToolSet } from '../../llm/index.js';
export interface ToolCallLogEntry {
ts: string;
@ -31,7 +31,7 @@ interface ToolCallLoggerOptions {
* sequential (`generateText` awaits each tool result), so per-WU files are
* effectively single-writer and lines land in call order.
*/
export function wrapToolsWithLogger<T extends ToolSet>(
export function wrapToolsWithLogger<T extends KtxRuntimeToolSet>(
tools: T,
logFilePath: string,
wuKey: string,
@ -44,17 +44,13 @@ export function wrapToolsWithLogger<T extends ToolSet>(
wrapped[name] = original;
continue;
}
const wrappedExecute: ToolExecuteFunction<unknown, unknown> = async (
input: unknown,
opts: ToolExecutionOptions,
) => {
const wrappedExecute = async (input: unknown) => {
const start = Date.now();
try {
const output = await (originalExecute as ToolExecuteFunction<unknown, unknown>)(input, opts);
const output = await originalExecute(input);
const entry: ToolCallLogEntry = {
ts: new Date().toISOString(),
wuKey,
toolCallId: opts.toolCallId,
toolName: name,
durationMs: Date.now() - start,
input,
@ -67,7 +63,6 @@ export function wrapToolsWithLogger<T extends ToolSet>(
const entry: ToolCallLogEntry = {
ts: new Date().toISOString(),
wuKey,
toolCallId: opts.toolCallId,
toolName: name,
durationMs: Date.now() - start,
input,

View file

@ -1,5 +1,5 @@
import { tool, type ToolExecuteFunction, type ToolExecutionOptions, type ToolSet } from 'ai';
import { z } from 'zod';
import type { KtxRuntimeToolDescriptor, KtxRuntimeToolSet } from '../../llm/index.js';
const verificationLedgerInputSchema = z.object({
summary: z.string().min(1).max(2000),
@ -37,22 +37,19 @@ export function createVerificationLedgerState(): VerificationLedgerState {
return { entries: [] };
}
export function withVerificationLedger(tools: ToolSet, state: VerificationLedgerState): ToolSet {
const wrapped: ToolSet = {};
export function withVerificationLedger(tools: KtxRuntimeToolSet, state: VerificationLedgerState): KtxRuntimeToolSet {
const wrapped: KtxRuntimeToolSet = {};
for (const [name, original] of Object.entries(tools)) {
if (!WRITE_TOOL_NAMES.has(name) || typeof original.execute !== 'function') {
wrapped[name] = original;
continue;
}
const originalExecute = original.execute;
const guardedExecute: ToolExecuteFunction<unknown, unknown> = async (
input: unknown,
opts: ToolExecutionOptions,
) => {
const guardedExecute = async (input: unknown) => {
if (state.entries.length === 0) {
return verificationRequiredOutput(name);
}
return (originalExecute as ToolExecuteFunction<unknown, unknown>)(input, opts);
return originalExecute(input);
};
wrapped[name] = { ...original, execute: guardedExecute };
}
@ -60,8 +57,9 @@ export function withVerificationLedger(tools: ToolSet, state: VerificationLedger
return wrapped;
}
function createRecordVerificationLedgerTool(state: VerificationLedgerState) {
return tool({
function createRecordVerificationLedgerTool(state: VerificationLedgerState): KtxRuntimeToolDescriptor {
return {
name: 'record_verification_ledger',
description:
'Record the pre-write verification ledger required by loaded ingest skills. Call this before wiki/SL/fallback writes to state what was verified, which tool calls support it, and what remains intentionally unverified.',
inputSchema: verificationLedgerInputSchema,
@ -78,7 +76,7 @@ function createRecordVerificationLedgerTool(state: VerificationLedgerState) {
structured: { success: true, entry },
};
},
});
};
}
function verificationRequiredOutput(toolName: string) {

View file

@ -1,5 +1,5 @@
import { KtxMessageBuilder, splitKtxSystemMessages, type KtxLlmProvider } from '@ktx/llm';
import { generateText, Output, stepCountIs, type FlexibleSchema, type TelemetrySettings } from 'ai';
import { generateText, Output, stepCountIs, type FlexibleSchema, type TelemetrySettings, type ToolSet } from 'ai';
import type { z } from 'zod';
import { noopLogger, type KtxLogger } from '../core/index.js';
import { summarizeKtxLlmDebugRequest, type KtxLlmDebugRequestRecorder } from './debug-request-recorder.js';
@ -52,7 +52,7 @@ export class AiSdkKtxLlmRuntime implements KtxLlmRuntimePort {
temperature: input.temperature ?? 0,
...(split.system ? { system: split.system } : {}),
messages: split.messages,
tools: built.tools,
tools: built.tools as ToolSet,
...(hasTools(tools)
? {
experimental_repairToolCall: this.deps.llmProvider.repairToolCallHandler({
@ -84,7 +84,7 @@ export class AiSdkKtxLlmRuntime implements KtxLlmRuntimePort {
temperature: input.temperature ?? 0,
...(split.system ? { system: split.system } : {}),
messages: split.messages,
tools: built.tools,
tools: built.tools as ToolSet,
...(hasTools(tools)
? {
experimental_repairToolCall: this.deps.llmProvider.repairToolCallHandler({
@ -137,7 +137,7 @@ export class AiSdkKtxLlmRuntime implements KtxLlmRuntimePort {
}),
...(promptMessages.system ? { system: promptMessages.system } : {}),
messages: promptMessages.messages,
tools: built.tools,
tools: built.tools as ToolSet,
onStepFinish: async () => {
stepIndex += 1;
if (!params.onStepFinish) {

View file

@ -16,7 +16,7 @@ function initMessage(overrides: Partial<Extract<SDKMessage, { type: 'system'; su
return {
type: 'system',
subtype: 'init',
apiKeySource: 'none',
apiKeySource: 'none' as never,
claude_code_version: '0.3.142',
cwd: '/tmp/project',
tools: [],
@ -27,7 +27,7 @@ function initMessage(overrides: Partial<Extract<SDKMessage, { type: 'system'; su
output_style: 'default',
skills: [],
plugins: [],
uuid: 'init-id',
uuid: '00000000-0000-4000-8000-000000000001',
session_id: 'session-id',
...overrides,
};
@ -51,7 +51,7 @@ function resultMessage(overrides: Partial<Extract<SDKMessage, { type: 'result' }
modelUsage: {},
permission_denials: [],
errors: [],
uuid: 'result-id',
uuid: '00000000-0000-4000-8000-000000000002',
session_id: 'session-id',
...overrides,
} as Extract<SDKMessage, { type: 'result' }>;
@ -59,7 +59,7 @@ function resultMessage(overrides: Partial<Extract<SDKMessage, { type: 'result' }
describe('ClaudeCodeKtxLlmRuntime', () => {
it('passes isolation options and scrubbed env to text generation', async () => {
const query = vi.fn(() => stream([initMessage(), resultMessage({ result: 'hello' })]));
const query = vi.fn((_input: any) => stream([initMessage(), resultMessage({ result: 'hello' })]));
const runtime = new ClaudeCodeKtxLlmRuntime({
projectDir: '/tmp/project',
modelSlots: { default: 'sonnet' },
@ -88,7 +88,7 @@ describe('ClaudeCodeKtxLlmRuntime', () => {
it('validates structured output with the caller schema', async () => {
const schema = z.object({ answer: z.string() });
const query = vi.fn(() => stream([initMessage(), resultMessage({ structured_output: { answer: 'yes' } })]));
const query = vi.fn((_input: any) => stream([initMessage(), resultMessage({ structured_output: { answer: 'yes' } })]));
const runtime = new ClaudeCodeKtxLlmRuntime({
projectDir: '/tmp/project',
modelSlots: { default: 'sonnet' },
@ -104,16 +104,16 @@ describe('ClaudeCodeKtxLlmRuntime', () => {
});
it('registers only exact KTX MCP tool ids and denies non-KTX tools', async () => {
const query = vi.fn(() =>
const query = vi.fn((_input: any) =>
stream([
initMessage({ tools: ['mcp__ktx__load_skill'], mcp_servers: [{ name: 'ktx', status: 'connected' }] }),
{
type: 'assistant',
message: { role: 'assistant', content: [] },
parent_tool_use_id: null,
uuid: 'assistant-1',
uuid: '00000000-0000-4000-8000-000000000003',
session_id: 'session-id',
} as SDKMessage,
} as unknown as SDKMessage,
resultMessage({ subtype: 'error_max_turns', is_error: true }),
]),
);
@ -164,7 +164,7 @@ describe('ClaudeCodeKtxLlmRuntime', () => {
});
it('auth probe uses isolation options and a scrubbed env', async () => {
const query = vi.fn(() => stream([initMessage(), resultMessage({ result: 'ok' })]));
const query = vi.fn((_input: any) => stream([initMessage(), resultMessage({ result: 'ok' })]));
await expect(
runClaudeCodeAuthProbe({ projectDir: '/tmp/project', model: 'sonnet', query, env: { ANTHROPIC_API_KEY: 'sk-ant-test' } }),

View file

@ -183,6 +183,9 @@ export class ClaudeCodeKtxLlmRuntime implements KtxLlmRuntimePort {
if (error) {
throw error;
}
if (result.subtype !== 'success') {
throw new Error(`Claude Code query failed (${result.subtype})`);
}
return result.result;
}
@ -209,6 +212,9 @@ export class ClaudeCodeKtxLlmRuntime implements KtxLlmRuntimePort {
if (error) {
throw error;
}
if (result.subtype !== 'success') {
throw new Error(`Claude Code query failed (${result.subtype})`);
}
return (input.schema as z.ZodType<TOutput>).parse(result.structured_output);
}

View file

@ -19,7 +19,13 @@ export type {
RunLoopStopReason,
} from './runtime-port.js';
export { RuntimeAgentRunner } from './runtime-port.js';
export { createAiSdkToolSet, createClaudeSdkTools, normalizeKtxRuntimeToolOutput } from './runtime-tools.js';
export {
createAiSdkToolSet,
createClaudeSdkTools,
createRuntimeToolDescriptorFromAiTool,
createRuntimeToolSetFromAiSdkTools,
normalizeKtxRuntimeToolOutput,
} from './runtime-tools.js';
export type {
KtxLlmDebugProviderOptionsEntry,
KtxLlmDebugRequest,

View file

@ -1,4 +1,4 @@
import { tool as aiTool, type ToolSet } from 'ai';
import { tool as aiTool, type Tool, type ToolSet } from 'ai';
import { tool as claudeTool, type SdkMcpToolDefinition } from '@anthropic-ai/claude-agent-sdk';
import type { CallToolResult } from '@modelcontextprotocol/sdk/types.js';
import { z } from 'zod';
@ -67,3 +67,25 @@ export function createClaudeSdkTools(tools: KtxRuntimeToolSet = {}): Array<SdkMc
export function mcpToolIds(tools: KtxRuntimeToolSet = {}): string[] {
return Object.keys(tools).map((name) => `mcp__ktx__${name}`);
}
export function createRuntimeToolDescriptorFromAiTool(name: string, aiSdkTool: Tool): KtxRuntimeToolDescriptor {
return {
name,
description: aiSdkTool.description ?? '',
inputSchema: aiSdkTool.inputSchema as KtxRuntimeToolDescriptor['inputSchema'],
execute: async (input) => {
if (typeof aiSdkTool.execute !== 'function') {
throw new Error(`KTX runtime tool "${name}" has no execute function`);
}
return normalizeKtxRuntimeToolOutput(
await aiSdkTool.execute(input as never, { toolCallId: `runtime-${name}` } as never),
);
},
};
}
export function createRuntimeToolSetFromAiSdkTools(tools: ToolSet = {}): KtxRuntimeToolSet {
return Object.fromEntries(
Object.entries(tools).map(([name, aiSdkTool]) => [name, createRuntimeToolDescriptorFromAiTool(name, aiSdkTool as Tool)]),
);
}

View file

@ -670,7 +670,7 @@ export function createLocalProjectMcpContextPorts(
jobIdFactory: options.localIngest?.jobIdFactory,
pullConfigOptions: options.localIngest?.pullConfigOptions,
agentRunner: options.localIngest?.agentRunner,
llmProvider: options.localIngest?.llmProvider,
llmRuntime: options.localIngest?.llmRuntime,
memoryModel: options.localIngest?.memoryModel,
semanticLayerCompute: options.localIngest?.semanticLayerCompute ?? options.semanticLayerCompute,
queryExecutor: options.localIngest?.queryExecutor ?? options.queryExecutor,
@ -704,7 +704,7 @@ export function createLocalProjectMcpContextPorts(
trigger: input.trigger,
jobId: options.localIngest?.jobIdFactory?.(),
agentRunner: options.localIngest?.agentRunner,
llmProvider: options.localIngest?.llmProvider,
llmRuntime: options.localIngest?.llmRuntime,
memoryModel: options.localIngest?.memoryModel,
semanticLayerCompute: options.localIngest?.semanticLayerCompute ?? options.semanticLayerCompute,
queryExecutor: options.localIngest?.queryExecutor ?? options.queryExecutor,

View file

@ -1,13 +1,17 @@
import { join } from 'node:path';
import { fileURLToPath } from 'node:url';
import type { KtxLlmProvider } from '@ktx/llm';
import YAML from 'yaml';
import { AgentRunnerService } from '../agent/index.js';
import { localConnectionInfoFromConfig } from '../connections/index.js';
import type { KtxEmbeddingPort, KtxFileStorePort, KtxFileWriteResult } from '../core/index.js';
import { type KtxLogger, noopLogger, SessionWorktreeService } from '../core/index.js';
import type { KtxSemanticLayerComputePort } from '../daemon/index.js';
import { createLocalKtxLlmProviderFromConfig } from '../llm/index.js';
import {
createLocalKtxLlmRuntimeFromConfig,
RuntimeAgentRunner,
type AgentRunnerPort,
type KtxLlmRuntimePort,
type KtxRuntimeToolSet,
} from '../llm/index.js';
import type { KtxLocalProject } from '../project/index.js';
import { PromptService } from '../prompts/index.js';
import { SkillsRegistryService } from '../skills/index.js';
@ -63,8 +67,8 @@ const LOCAL_AUTHOR = { name: 'KTX Local', email: 'local@ktx.local' };
const LOCAL_SHAPE_WARNING = 'Local memory capture validates semantic-layer YAML shape only.';
export interface CreateLocalProjectMemoryCaptureOptions {
llmProvider?: KtxLlmProvider;
agentRunner?: AgentRunnerService;
llmRuntime?: KtxLlmRuntimePort;
agentRunner?: AgentRunnerPort;
memoryModel?: string;
semanticLayerCompute?: KtxSemanticLayerComputePort;
queryExecutor?: { execute(input: { connectionId: string; sql: string; maxRows?: number }): Promise<KtxQueryResult> };
@ -89,7 +93,8 @@ export function createLocalProjectMemoryCapture(
const slSearchService = new SlSearchService(embedding, slSourcesRepository, logger);
const wikiService = new KnowledgeWikiService(rootFileStore, embedding, knowledgeIndex, project.git, logger);
const authorResolver = new LocalAuthorResolver();
const llmProvider = options.llmProvider ?? createLocalKtxLlmProviderFromConfig(project.config.llm);
const llmRuntime =
options.llmRuntime ?? createLocalKtxLlmRuntimeFromConfig(project.config.llm, { projectDir: project.projectDir });
const toolsetFactory = new LocalMemoryToolsetFactory({
project,
embedding,
@ -104,10 +109,7 @@ export function createLocalProjectMemoryCapture(
});
const agentRunner =
options.agentRunner ??
new AgentRunnerService({
llmProvider: requireLlmProvider(llmProvider),
logger,
});
new RuntimeAgentRunner(requireLlmRuntime(llmRuntime));
const memoryAgent = new MemoryAgentService({
settings: {
knowledge: { userScopedKnowledgeEnabled: false },
@ -143,11 +145,11 @@ export function createLocalProjectMemoryCapture(
});
}
function requireLlmProvider(provider: KtxLlmProvider | null | undefined): KtxLlmProvider {
if (!provider) {
function requireLlmRuntime(runtime: KtxLlmRuntimePort | null | undefined): KtxLlmRuntimePort {
if (!runtime) {
throw new Error('createLocalProjectMemoryCapture requires llm.provider.backend or an injected agentRunner');
}
return provider;
return runtime;
}
class LocalMemoryFileStore implements MemoryFileStorePort {
@ -386,8 +388,8 @@ class LocalShapeOnlySlValidator implements SlValidatorPort<SlValidationDeps> {
class LocalMemoryToolSet implements MemoryToolSetLike {
constructor(private readonly tools: BaseTool[]) {}
toAiSdkTools(context: ToolContext) {
return Object.fromEntries(this.tools.map((tool) => [tool.name, tool.toAiSdkTool(context)]));
toRuntimeTools(context: ToolContext): KtxRuntimeToolSet {
return Object.fromEntries(this.tools.map((tool) => [tool.name, tool.toRuntimeTool(context)]));
}
}

View file

@ -1,3 +1,6 @@
import { mkdir, mkdtemp, rm, writeFile } from 'node:fs/promises';
import { tmpdir } from 'node:os';
import { join } from 'node:path';
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
// Module-level mock for 'ai' so generateText is a stub. This file is separate from
@ -124,11 +127,11 @@ const buildMocks = (overrides: Partial<BuiltMocks> = {}): BuiltMocks => {
slValidator: { validateSingleSource: vi.fn().mockResolvedValue({ errors: [], warnings: [] }) },
toolsetFactory: {
createIngestWuToolset: vi.fn().mockReturnValue({
toAiSdkTools: vi.fn().mockReturnValue({}),
toRuntimeTools: vi.fn().mockReturnValue({}),
getAllTools: vi.fn().mockReturnValue([]),
}),
createToolset: vi.fn().mockReturnValue({
toAiSdkTools: vi.fn().mockReturnValue({}),
toRuntimeTools: vi.fn().mockReturnValue({}),
getAllTools: vi.fn().mockReturnValue([]),
}),
},
@ -241,6 +244,39 @@ describe('MemoryAgentService.ingest — session-branch orchestration', () => {
expect(result.commitHash).toBe('cafebabe');
});
it('normalizes load_skill output to markdown while preserving structured payload', async () => {
const tempDir = await mkdtemp(join(tmpdir(), 'ktx-memory-skill-'));
const skillDir = join(tempDir, 'memory_agent');
await mkdir(skillDir, { recursive: true });
await writeFile(join(skillDir, 'SKILL.md'), '---\nname: memory_agent\n---\nSkill body', 'utf-8');
try {
const agentRunner = {
runLoop: vi.fn(async (params: any) => {
const result = await params.toolSet.load_skill.execute({ name: 'memory_agent' });
expect(result.markdown).toContain('memory_agent');
expect(result.structured).toMatchObject({ name: 'memory_agent' });
return { stopReason: 'natural' as const };
}),
};
const mocks = buildMocks({
agentRunner,
skillsRegistry: {
listSkills: vi.fn().mockResolvedValue([{ name: 'memory_agent', path: skillDir }]),
buildSkillsPrompt: vi.fn().mockReturnValue(''),
getSkill: vi.fn().mockResolvedValue({ name: 'memory_agent', path: skillDir }),
stripFrontmatter: vi.fn().mockReturnValue('Skill body'),
},
});
const svc = buildService(mocks);
await svc.ingest(baseInput);
expect(agentRunner.runLoop).toHaveBeenCalled();
} finally {
await rm(tempDir, { recursive: true, force: true });
}
});
it('logs prompt debug output when KTX_MEMORY_AGENT_DEBUG_PROMPTS is enabled', async () => {
const previousDebugPrompts = process.env.KTX_MEMORY_AGENT_DEBUG_PROMPTS;
const mocks = buildMocks();

View file

@ -1,10 +1,10 @@
import { createHash } from 'node:crypto';
import { readFile } from 'node:fs/promises';
import { join } from 'node:path';
import { tool } from 'ai';
import * as YAML from 'yaml';
import { z } from 'zod';
import { type KtxLogger, noopLogger } from '../core/index.js';
import type { KtxRuntimeToolSet } from '../llm/index.js';
import {
revertSourceToPreHead,
type SemanticLayerSource,
@ -125,8 +125,9 @@ export class MemoryAgentService {
session: toolSession,
};
const loadSkillTool = {
load_skill: tool({
const loadSkillTool: KtxRuntimeToolSet = {
load_skill: {
name: 'load_skill',
description:
'Load a skill to get specialized instructions. Call this when a skill listed in the system prompt matches the current task.',
inputSchema: z.object({
@ -137,23 +138,27 @@ export class MemoryAgentService {
if (!skill) {
const available =
(await this.deps.skillsRegistry.listSkills('memory_agent')).map((s) => s.name).join(', ') || '(none)';
return `Skill "${name}" not available to the memory agent. Available: ${available}`;
return { markdown: `Skill "${name}" not available to the memory agent. Available: ${available}` };
}
try {
const body = await readFile(join(skill.path, 'SKILL.md'), 'utf-8');
if (!skillsLoaded.includes(skill.name)) {
skillsLoaded.push(skill.name);
}
return {
const structured = {
name: skill.name,
skillDirectory: skill.path,
content: this.deps.skillsRegistry.stripFrontmatter(body),
};
return {
markdown: `# ${structured.name}\n\n${structured.content}`,
structured,
};
} catch (e) {
return `Error loading skill "${name}": ${e instanceof Error ? e.message : String(e)}`;
return { markdown: `Error loading skill "${name}": ${e instanceof Error ? e.message : String(e)}` };
}
},
}),
},
};
const skillNames: string[] = [...DEFAULT_SKILL_NAMES];
@ -212,7 +217,7 @@ export class MemoryAgentService {
modelRole: 'candidateExtraction',
systemPrompt,
userPrompt: prompt,
toolSet: { ...toolset.toAiSdkTools(toolContext), ...loadSkillTool },
toolSet: { ...toolset.toRuntimeTools(toolContext), ...loadSkillTool },
stepBudget,
telemetryTags: {
operationName: 'memory-agent-ingest',

View file

@ -1,5 +1,4 @@
import type { Tool } from 'ai';
import type { AgentRunnerService } from '../agent/index.js';
import type { AgentRunnerPort, KtxRuntimeToolSet } from '../llm/index.js';
import type { GitService, KtxFileStorePort, KtxLogger, SessionWorktreeService } from '../core/index.js';
import type { PromptService } from '../prompts/index.js';
import type { SkillsRegistryService } from '../skills/index.js';
@ -118,7 +117,7 @@ export interface MemoryCommitMessagePort {
export interface MemoryFileStorePort extends KtxFileStorePort<MemoryFileStorePort>, MemoryCommitMessagePort {}
export interface MemoryToolSetLike {
toAiSdkTools(context: ToolContext): Record<string, Tool>;
toRuntimeTools(context: ToolContext): KtxRuntimeToolSet;
}
export interface MemoryToolsetFactoryPort {
@ -150,7 +149,7 @@ export interface MemoryAgentServiceDeps {
slSourcesRepository: SlSourcesIndexPort;
sessionWorktreeService: SessionWorktreeService<MemoryFileStorePort>;
semanticLayerSourceReconciler: MemorySlSourceReconcilerPort;
agentRunner: AgentRunnerService;
agentRunner: AgentRunnerPort;
slValidator: SlValidatorPort<SlValidationDeps>;
toolsetFactory: MemoryToolsetFactoryPort;
telemetry?: MemoryTelemetryPort;

View file

@ -264,7 +264,6 @@ export type {
} from './relationship-graph-resolver.js';
export { resolveKtxRelationshipGraph } from './relationship-graph-resolver.js';
export type {
KtxRelationshipLlmProposalGenerateText,
KtxRelationshipLlmProposalResult,
KtxRelationshipLlmProposalSettings,
ProposeKtxRelationshipCandidatesWithLlmInput,

View file

@ -356,7 +356,7 @@ describe('local scan enrichment', () => {
it('honors scan relationship config when LLM proposals are disabled', async () => {
const providers = createDeterministicLocalScanEnrichmentProviders({ embeddingDimensions: 3 });
const getModel = vi.fn(() => ({ modelId: 'provider/language-model', provider: 'gateway' }));
const generateObject = vi.fn();
const result = await runLocalScanEnrichment({
connectionId: 'warehouse',
mode: 'relationships',
@ -365,9 +365,9 @@ describe('local scan enrichment', () => {
context: { runId: 'scan-run-llm-disabled' },
providers: {
...providers,
llm: {
...providers.llm,
getModel: getModel as never,
llmRuntime: {
...providers.llmRuntime,
generateObject: generateObject as never,
},
},
relationshipSettings: {
@ -378,7 +378,7 @@ describe('local scan enrichment', () => {
});
expect(result.summary.llmRelationshipValidation).toBe('skipped');
expect(getModel).not.toHaveBeenCalledWith('candidateExtraction');
expect(generateObject).not.toHaveBeenCalled();
});
it('skips relationship detection when scan relationships are disabled', async () => {
@ -628,7 +628,7 @@ describe('local scan enrichment', () => {
connector: scanConnector,
context: { runId: 'scan-run-batched-embeddings' },
providers: {
llm: deterministicProviders.llm,
llmRuntime: deterministicProviders.llmRuntime,
embedding: {
dimensions: 3,
maxBatchSize: 2,
@ -658,7 +658,7 @@ describe('local scan enrichment', () => {
providerIdentity: { provider: 'deterministic', embeddingDimensions: 6 },
});
const getModel = vi.spyOn(providers.llm, 'getModel');
const generateText = vi.spyOn(providers.llmRuntime, 'generateText');
const embedBatch = vi.spyOn(providers.embedding, 'embedBatch');
const second = await runLocalScanEnrichment({
connectionId: 'warehouse',
@ -676,7 +676,7 @@ describe('local scan enrichment', () => {
expect(first.state.resumedStages).toEqual([]);
expect(second.state.resumedStages).toEqual(['descriptions', 'embeddings', 'relationships']);
expect(second.state.completedStages).toEqual(['descriptions', 'embeddings', 'relationships']);
expect(getModel).not.toHaveBeenCalled();
expect(generateText).not.toHaveBeenCalled();
expect(embedBatch).not.toHaveBeenCalled();
expect(second.descriptionUpdates).toEqual(first.descriptionUpdates);
expect(second.embeddingUpdates).toEqual(first.embeddingUpdates);
@ -711,7 +711,7 @@ describe('local scan enrichment', () => {
tables: [{ ...firstTable, name: 'customers' }],
})),
};
const getModel = vi.spyOn(providers.llm, 'getModel');
const generateText = vi.spyOn(providers.llmRuntime, 'generateText');
const result = await runLocalScanEnrichment({
connectionId: 'warehouse',
@ -727,7 +727,7 @@ describe('local scan enrichment', () => {
expect(result.state.resumedStages).toEqual([]);
expect(result.state.completedStages).toEqual(['descriptions', 'embeddings', 'relationships']);
expect(getModel).toHaveBeenCalled();
expect(generateText).toHaveBeenCalled();
});
it('runs providerless enriched scans as relationship-only discovery enrichment', async () => {

View file

@ -638,7 +638,7 @@ describe('local scan', () => {
enrichmentProviders: {
llmRuntime: {
...deterministicLlmRuntime(),
generateObject,
generateObject: generateObject as never,
},
embedding: {
dimensions: 8,

View file

@ -6,6 +6,7 @@ import { gunzipSync } from 'node:zlib';
import Database from 'better-sqlite3';
import YAML from 'yaml';
import { z } from 'zod';
import type { KtxLlmRuntimePort } from '../llm/index.js';
import type { KtxEnrichedRelationship, KtxEnrichedSchema, KtxRelationshipType } from './enrichment-types.js';
import { snapshotToKtxEnrichedSchema } from './local-enrichment.js';
import type { KtxRelationshipDiscoveryCandidate } from './relationship-candidates.js';
@ -13,7 +14,6 @@ import {
generateKtxRelationshipDiscoveryCandidates,
mergeKtxRelationshipDiscoveryCandidates,
} from './relationship-candidates.js';
import type { KtxLlmProvider } from '@ktx/llm';
import { proposeKtxRelationshipCandidatesWithLlm } from './relationship-llm-proposal.js';
import {
discoverKtxCompositeRelationships,
@ -527,7 +527,7 @@ export function isKtxRelationshipBenchmarkTuningEligible(input: {
}
export function ktxRelationshipBenchmarkDetectorWithLlm(
llmProvider: KtxLlmProvider,
llmRuntime: KtxLlmRuntimePort,
): KtxRelationshipBenchmarkDetector {
return {
async detect(input) {
@ -566,7 +566,7 @@ export function ktxRelationshipBenchmarkDetectorWithLlm(
connectionId: input.snapshot.connectionId,
schema: input.schema,
profile: profiles,
llmProvider,
llmRuntime,
});
const candidates = mergeKtxRelationshipDiscoveryCandidates([
...broadRelationshipCandidates,

View file

@ -1,6 +1,6 @@
import type { KtxLlmProvider } from '@ktx/llm';
import Database from 'better-sqlite3';
import { afterEach, describe, expect, it, vi } from 'vitest';
import type { KtxLlmRuntimePort } from '../llm/index.js';
import { buildDefaultKtxProjectConfig } from '../project/config.js';
import { snapshotToKtxEnrichedSchema } from './local-enrichment.js';
import {
@ -216,29 +216,11 @@ function connector(executor: InMemorySqliteExecutor | null): KtxScanConnector {
};
}
function llmProvider(): KtxLlmProvider {
const model = { modelId: 'claude-sonnet-4-6', provider: 'anthropic' };
function llmRuntime(output: unknown): KtxLlmRuntimePort {
return {
getModel: vi.fn(() => model as ReturnType<KtxLlmProvider['getModel']>),
getModelByName: vi.fn(() => model as ReturnType<KtxLlmProvider['getModelByName']>),
cacheMarker: vi.fn(),
repairToolCallHandler: vi.fn(),
thinkingProviderOptions: vi.fn(() => ({})),
telemetryConfig: vi.fn(() => undefined),
promptCachingConfig: vi.fn(
() =>
({
enabled: false,
systemTtl: '1h',
toolsTtl: '1h',
historyTtl: '5m',
cacheSystem: true,
cacheTools: true,
cacheHistory: true,
vertexFallbackTo5m: false,
}) as ReturnType<KtxLlmProvider['promptCachingConfig']>,
),
activeBackend: vi.fn(() => 'anthropic' as ReturnType<KtxLlmProvider['activeBackend']>),
generateText: vi.fn(),
generateObject: vi.fn(async () => output) as KtxLlmRuntimePort['generateObject'],
runAgentLoop: vi.fn(),
};
}
@ -505,21 +487,19 @@ describe('production relationship discovery', () => {
INSERT INTO customers (id) VALUES (1), (2);
INSERT INTO orders (id, buyer_ref) VALUES (10, 1), (11, 2);
`);
const generateText = vi.fn(async () => ({
output: {
pkCandidates: [{ table: 'customers', column: 'id', confidence: 0.91, rationale: 'Unique customer key.' }],
fkCandidates: [
{
fromTable: 'orders',
fromColumn: 'buyer_ref',
toTable: 'customers',
toColumn: 'id',
confidence: 0.89,
rationale: 'Buyer reference values align with customer identifiers.',
},
],
},
}));
const llmOutput = {
pkCandidates: [{ table: 'customers', column: 'id', confidence: 0.91, rationale: 'Unique customer key.' }],
fkCandidates: [
{
fromTable: 'orders',
fromColumn: 'buyer_ref',
toTable: 'customers',
toColumn: 'id',
confidence: 0.89,
rationale: 'Buyer reference values align with customer identifiers.',
},
],
};
const result = await discoverKtxRelationships({
connectionId: 'warehouse',
@ -528,8 +508,7 @@ describe('production relationship discovery', () => {
schema: snapshotToKtxEnrichedSchema(llmOnlyRelationshipSnapshot()),
context: { runId: 'llm-relationship-orchestrator' },
settings: relationshipSettings(),
llmProvider: llmProvider(),
generateText,
llmRuntime: llmRuntime(llmOutput),
});
expect(result.llmRelationshipValidation).toBe('completed');

View file

@ -7,7 +7,7 @@ import { proposeKtxRelationshipCandidatesWithLlm } from './relationship-llm-prop
function llmRuntime(output?: unknown): KtxLlmRuntimePort {
return {
generateText: vi.fn(),
generateObject: vi.fn(async () => output),
generateObject: vi.fn(async () => output) as KtxLlmRuntimePort['generateObject'],
runAgentLoop: vi.fn(),
};
}

View file

@ -249,7 +249,6 @@ export async function proposeKtxRelationshipCandidatesWithLlm(
system,
prompt,
schema: relationshipLlmProposalSchema,
generateText: input.generateText,
});
const output = relationshipLlmProposalSchema.parse(generated);
const mapped = mapValidProposals(input.schema, output, settings);

View file

@ -171,7 +171,7 @@ export abstract class BaseTool<TInput extends ZodType = ZodType> {
return {
name: toolName,
description: this.description,
inputSchema: this.inputSchema as KtxRuntimeToolDescriptor['inputSchema'],
inputSchema: this.inputSchema as unknown as KtxRuntimeToolDescriptor['inputSchema'],
execute: async (params) => {
const callContext = { ...context };
if (!callContext.userId) {