git-subtree-dir: ai-context/trustgraph-client git-subtree-split: 908f18cf814470ec3b72cc336bb945fb792ffdec
24 KiB
Streaming Support for TrustGraph Client
Status: Draft for Review Author: Claude Date: 2025-11-27 Version: 1.0
Executive Summary
Extend the TrustGraph TypeScript client to support streaming responses for Graph RAG, Document RAG, Text Completion, and Prompt services. The client already has streaming infrastructure (ServiceCallMulti) used by Agent, but the other services only support single-response mode. This spec proposes minimal changes to enable streaming across all services while maintaining backward compatibility.
Background
Current State
The client has two request patterns:
-
Single-response (
makeRequest→ServiceCall)- Used by: text-completion, graph-rag, document-rag, prompt, and most other services
- Returns Promise that resolves with single response
- Example:
graphRag(text: string): Promise<string>
-
Multi-response (
makeRequestMulti→ServiceCallMulti)- Used by: agent (thoughts/observations/answer), knowledge.getKgCore (large graph streaming)
- Accepts
receiver: (resp: unknown) => booleancallback - Receiver returns
trueto signal end-of-stream - Example:
agent(question, think, observe, answer, error): void
Backend Streaming Protocol
Per STREAMING-IMPLEMENTATION-NOTES.txt, the backend supports streaming when streaming: true is added to requests:
Graph RAG / Document RAG:
- Chunks arrive with
chunkfield - Final chunk has
end_of_stream: true
Text Completion:
- Chunks arrive with
responsefield - Final chunk has
end_of_stream: true
Prompt:
- Chunks arrive with
textfield - Final chunk has
end_of_stream: true
Agent (already implemented):
- Multiple messages with
chunk_type(thought/observation/final-answer) - Final chunk has
end_of_dialog: true
Problem Statement
Primary Issue: Users who want streaming responses for Graph RAG, Document RAG, Text Completion, or Prompt services must:
- Drop down to
makeRequestMultiand handle raw responses - Manually parse
chunk/response/textfields - Check
end_of_streamflag - Handle errors mid-stream
Secondary Issue: The Agent API doesn't correctly implement the backend streaming protocol. The backend sends:
{chunk_type: "thought", content: "I need to", end_of_message: false, end_of_dialog: false}
{chunk_type: "thought", content: " search", end_of_message: false, end_of_dialog: false}
But the client expects:
{thought?: string, observation?: string, answer?: string}
The Agent implementation needs to be updated to handle incremental chunks with completion flags.
Goals
- Fix Agent API to correctly implement backend streaming protocol with chunk-level callbacks
- Add streaming variants for text-completion, graph-rag, document-rag, and prompt services
- Maintain backward compatibility - existing non-streaming APIs unchanged (except Agent which needs fixing)
- Policy-free implementation - no state management (accumulation, buffering, etc.) in client layer
- Minimal callback interface - single receiver callback with chunk and completion flag
- Minimal type changes - reuse existing request/response types where possible
Non-Goals
- Changing the existing non-streaming APIs
- Supporting streaming for services that don't stream (embeddings, triples, etc.)
- Implementing state management (accumulation, buffering) - that belongs in higher layers
- Changing the underlying
ServiceCallMultiimplementation
Design
1. Type Additions
Add streaming-specific response types to src/models/messages.ts:
// Agent streaming response (NEW - replaces old AgentResponse for streaming)
export interface AgentStreamingResponse {
chunk_type?: "thought" | "action" | "observation" | "final-answer" | "error";
content?: string;
end_of_message?: boolean; // Current chunk type is complete
end_of_dialog?: boolean; // Entire agent dialog is complete
// Legacy fields for backward compatibility with non-streaming
thought?: string;
observation?: string;
answer?: string;
error?: string;
}
// Generic streaming response wrapper for RAG/completion services
export interface StreamingChunk {
chunk?: string; // Graph RAG, Document RAG
response?: string; // Text Completion
text?: string; // Prompt
end_of_stream?: boolean;
error?: {
message: string;
type?: string;
};
}
// Request types get optional streaming flag
export interface AgentRequest {
question: string;
user?: string;
streaming?: boolean; // NEW - enable streaming mode
}
export interface GraphRagRequest {
query: string;
user?: string;
collection?: string;
"entity-limit"?: number;
"triple-limit"?: number;
"max-subgraph-size"?: number;
"max-path-length"?: number;
streaming?: boolean; // NEW
}
export interface DocumentRagRequest {
query: string;
user?: string;
collection?: string;
"doc-limit"?: number;
streaming?: boolean; // NEW
}
export interface TextCompletionRequest {
system: string;
prompt: string;
streaming?: boolean; // NEW
}
export interface PromptRequest {
id: string;
terms: Record<string, unknown>;
streaming?: boolean; // NEW
}
export interface PromptResponse {
text: string;
}
2. BaseApi Additions
No changes needed to BaseApi - makeRequestMulti already exists.
3. FlowApi Changes
3.1 Fix Agent Method
Update the existing agent() method to correctly handle the backend streaming protocol:
export class FlowApi {
/**
* Interacts with an AI agent that provides streaming responses
* BREAKING CHANGE: Callbacks now receive (chunk, complete) instead of full messages
*/
agent(
question: string,
think: (chunk: string, complete: boolean) => void,
observe: (chunk: string, complete: boolean) => void,
answer: (chunk: string, complete: boolean) => void,
error: (s: string) => void,
) {
const receiver = (response: unknown) => {
const resp = response as AgentStreamingResponse;
// Check for errors
if (resp.chunk_type === "error" || resp.error) {
const errorMessage = resp.content || resp.error || "Unknown agent error";
error(typeof errorMessage === "string" ? errorMessage : String(errorMessage));
return true; // End streaming on error
}
// Handle streaming chunks by chunk_type
const content = resp.content || "";
const messageComplete = !!resp.end_of_message;
const dialogComplete = !!resp.end_of_dialog;
switch (resp.chunk_type) {
case "thought":
think(content, messageComplete);
break;
case "observation":
observe(content, messageComplete);
break;
case "final-answer":
answer(content, messageComplete);
break;
case "action":
// Actions are typically not streamed incrementally, just logged
console.log("Agent action:", content);
break;
}
return dialogComplete; // End when backend signals end_of_dialog
};
return this.api
.makeRequestMulti<AgentRequest, AgentStreamingResponse>(
"agent",
{
question: question,
user: this.api.user,
streaming: true, // Always use streaming mode
},
receiver,
120000,
2,
this.flowId,
)
.catch((err) => {
const errorMessage =
err instanceof Error ? err.message : err?.toString() || "Unknown error";
error(`Agent request failed: ${errorMessage}`);
});
}
#### 3.2 Add New Streaming Methods
Add streaming variants for other services alongside existing methods in `src/socket/trustgraph-socket.ts`:
```typescript
// ... existing non-streaming methods unchanged ...
/**
* Performs Graph RAG query with streaming response
* @param text - Query text
* @param receiver - Called for each chunk with (chunk, complete) where complete=true on final chunk
* @param onError - Called on error
* @param options - Graph RAG options
* @param collection - Collection name
*/
graphRagStreaming(
text: string,
receiver: (chunk: string, complete: boolean) => void,
onError: (error: string) => void,
options?: GraphRagOptions,
collection?: string,
): void {
const recv = (response: unknown): boolean => {
const resp = response as StreamingChunk;
if (resp.error) {
onError(resp.error.message);
return true; // End streaming
}
const chunk = resp.chunk || "";
const complete = !!resp.end_of_stream;
receiver(chunk, complete);
return complete; // End when backend signals end_of_stream
};
this.api.makeRequestMulti<GraphRagRequest, StreamingChunk>(
"graph-rag",
{
query: text,
user: this.api.user,
collection: collection || "default",
"entity-limit": options?.entityLimit,
"triple-limit": options?.tripleLimit,
"max-subgraph-size": options?.maxSubgraphSize,
"max-path-length": options?.pathLength,
streaming: true,
},
recv,
60000,
undefined,
this.flowId,
);
}
/**
* Performs Document RAG query with streaming response
* @param text - Query text
* @param receiver - Called for each chunk with (chunk, complete) where complete=true on final chunk
* @param onError - Called on error
* @param docLimit - Maximum documents to retrieve
* @param collection - Collection name
*/
documentRagStreaming(
text: string,
receiver: (chunk: string, complete: boolean) => void,
onError: (error: string) => void,
docLimit?: number,
collection?: string,
): void {
const recv = (response: unknown): boolean => {
const resp = response as StreamingChunk;
if (resp.error) {
onError(resp.error.message);
return true;
}
const chunk = resp.chunk || "";
const complete = !!resp.end_of_stream;
receiver(chunk, complete);
return complete;
};
this.api.makeRequestMulti<DocumentRagRequest, StreamingChunk>(
"document-rag",
{
query: text,
user: this.api.user,
collection: collection || "default",
"doc-limit": docLimit,
streaming: true,
},
recv,
60000,
undefined,
this.flowId,
);
}
/**
* Performs text completion with streaming response
* @param system - System prompt
* @param text - User prompt
* @param receiver - Called for each chunk with (chunk, complete) where complete=true on final chunk
* @param onError - Called on error
*/
textCompletionStreaming(
system: string,
text: string,
receiver: (chunk: string, complete: boolean) => void,
onError: (error: string) => void,
): void {
const recv = (response: unknown): boolean => {
const resp = response as StreamingChunk;
if (resp.error) {
onError(resp.error.message);
return true;
}
// Text completion uses 'response' field, not 'chunk'
const chunk = resp.response || "";
const complete = !!resp.end_of_stream;
receiver(chunk, complete);
return complete;
};
this.api.makeRequestMulti<TextCompletionRequest, StreamingChunk>(
"text-completion",
{
system: system,
prompt: text,
streaming: true,
},
recv,
30000,
undefined,
this.flowId,
);
}
/**
* Executes a prompt template with streaming response
* @param id - Prompt template ID
* @param terms - Template variables
* @param receiver - Called for each chunk with (chunk, complete) where complete=true on final chunk
* @param onError - Called on error
*/
promptStreaming(
id: string,
terms: Record<string, unknown>,
receiver: (chunk: string, complete: boolean) => void,
onError: (error: string) => void,
): void {
const recv = (response: unknown): boolean => {
const resp = response as StreamingChunk;
if (resp.error) {
onError(resp.error.message);
return true;
}
// Prompt service uses 'text' field
const chunk = resp.text || "";
const complete = !!resp.end_of_stream;
receiver(chunk, complete);
return complete;
};
this.api.makeRequestMulti<PromptRequest, StreamingChunk>(
"prompt",
{
id: id,
terms: terms,
streaming: true,
},
recv,
30000,
undefined,
this.flowId,
);
}
}
4. BaseApi Convenience Methods (Optional)
For users who don't need flow routing, add streaming methods to BaseApi:
export class BaseApi {
// Existing methods...
/**
* Streaming text completion without flow routing
*/
textCompletionStreaming(
system: string,
prompt: string,
receiver: (chunk: string, complete: boolean) => void,
onError: (error: string) => void,
): void {
const flowApi = new FlowApi(this, undefined);
flowApi.textCompletionStreaming(system, prompt, receiver, onError);
}
// Similar for graphRagStreaming, documentRagStreaming, promptStreaming...
}
Recommendation: Add these for consistency with existing non-streaming methods on BaseApi.
Implementation Plan
Phase 1: Core Types (1 hour)
- Add
streaming?: booleanto request types - Add
StreamingChunkinterface - Add
PromptRequestandPromptResponsetypes (currently missing)
Phase 2: FlowApi Streaming Methods (2 hours)
- Implement
textCompletionStreaming - Implement
graphRagStreaming - Implement
documentRagStreaming - Implement
promptStreaming - Add JSDoc comments
Phase 3: BaseApi Convenience Methods (1 hour)
- Add streaming methods to BaseApi
- Update interface definitions
- Update README with streaming examples
Phase 4: Testing (2 hours)
- Add unit tests for streaming methods
- Add integration tests against mock WebSocket
- Test error handling mid-stream
- Test timeout behavior
- Test concurrent streaming requests
Phase 5: Documentation (1 hour)
- Update README with streaming examples
- Add streaming guide to docs/
- Update API reference
Total Estimated Time: 7 hours
Testing Strategy
Unit Tests
describe("FlowApi streaming", () => {
it("should stream graph-rag chunks", async () => {
const chunks: Array<{ chunk: string; complete: boolean }> = [];
flowApi.graphRagStreaming(
"test query",
(chunk, complete) => {
chunks.push({ chunk, complete });
},
(error) => fail(error),
);
// Simulate streaming chunks
mockWebSocket.simulateMessage({ chunk: "Hello", end_of_stream: false });
mockWebSocket.simulateMessage({ chunk: " world", end_of_stream: false });
mockWebSocket.simulateMessage({ chunk: "", end_of_stream: true });
expect(chunks).toEqual([
{ chunk: "Hello", complete: false },
{ chunk: " world", complete: false },
{ chunk: "", complete: true },
]);
});
it("should handle errors mid-stream", async () => {
let errorMsg = "";
const chunks: string[] = [];
flowApi.graphRagStreaming(
"test query",
(chunk, complete) => {
chunks.push(chunk);
},
(error) => {
errorMsg = error;
},
);
mockWebSocket.simulateMessage({ chunk: "Partial", end_of_stream: false });
mockWebSocket.simulateMessage({
error: { message: "LLM timeout" },
end_of_stream: true,
});
expect(errorMsg).toBe("LLM timeout");
expect(chunks).toEqual(["Partial"]); // Receiver gets chunks before error
});
});
Integration Tests
Test against actual TrustGraph backend (manual testing):
- Start TrustGraph backend with streaming enabled
- Test each streaming method with real queries
- Verify chunks arrive in order
- Verify end_of_stream handling
- Test error scenarios (invalid query, timeout)
Migration Guide
For Users
Graph RAG / Document RAG / Text Completion / Prompt
Before (non-streaming):
const response = await flowApi.graphRag("What is machine learning?");
console.log(response); // Full text after 10-30 seconds
After (streaming):
let accumulated = "";
flowApi.graphRagStreaming(
"What is machine learning?",
(chunk, complete) => {
accumulated += chunk;
updateDisplay(accumulated);
if (complete) {
console.log("Final:", accumulated);
}
},
(error) => {
console.error("Error:", error);
}
);
Agent (BREAKING CHANGE)
Before (old client - incorrect):
flowApi.agent(
"What is machine learning?",
(thought) => console.log("Thinking:", thought), // Full thought received
(observation) => console.log("Observing:", observation), // Full observation received
(answer) => console.log("Answer:", answer), // Full answer received
(error) => console.error(error),
);
After (updated to match backend):
let currentThought = "";
let currentObservation = "";
let currentAnswer = "";
flowApi.agent(
"What is machine learning?",
(chunk, complete) => {
currentThought += chunk;
updateThinkingDisplay(currentThought);
if (complete) {
console.log("Thought complete:", currentThought);
currentThought = ""; // Reset for next thought
}
},
(chunk, complete) => {
currentObservation += chunk;
updateObservationDisplay(currentObservation);
if (complete) {
console.log("Observation complete:", currentObservation);
currentObservation = "";
}
},
(chunk, complete) => {
currentAnswer += chunk;
updateAnswerDisplay(currentAnswer);
if (complete) {
console.log("Final answer:", currentAnswer);
}
},
(error) => console.error(error),
);
Gradual Adoption
For Graph RAG / Document RAG / Text Completion / Prompt:
- Continue using non-streaming APIs (no breaking changes)
- Add streaming variants for user-facing chat interfaces first
- Keep non-streaming for background tasks
- Optionally add feature flag to toggle streaming on/off
For Agent (BREAKING CHANGE):
- Existing Agent users MUST update their callbacks to handle (chunk, complete) signature
- Add accumulation logic in callback handlers
- Use
completeflag to detect when to reset accumulator or take final action
Risks and Mitigations
Risk 1: BREAKING CHANGE for Agent API
Concern: Existing Agent users must update their code when they upgrade.
Mitigation:
- Document the breaking change clearly in release notes
- Provide migration examples in this spec
- Consider: Add deprecation warning in previous version before breaking change
- Consider: Bump major version to signal breaking change
- The old API was incorrect anyway - this fixes a bug in the client
Risk 2: API Surface Growth
Concern: Adding 4 new methods per API class (FlowApi, BaseApi) increases maintenance burden.
Mitigation:
- Methods share identical structure (only field name differs: chunk/response/text)
- Could extract common streaming handler if needed
- Backend already implements streaming, so no protocol risk
Risk 3: TypeScript Type Safety
Concern: StreamingChunk union type may be confusing (chunk vs response vs text).
Mitigation:
- Each service method documents which field it uses
- Runtime code checks correct field
- Implementation is simple enough that field selection is obvious
Risk 4: State Management in User Code
Concern: Users must manually accumulate chunks if they need full text.
Mitigation:
- This is intentional - client stays policy-free
- Higher-level abstractions (React hooks, etc.) can provide accumulation
- For users who don't need streaming behavior, non-streaming APIs remain unchanged
Future Enhancements
1. Async Iterator API
Provide a modern streaming API using async iterators:
async *graphRagStream(text: string): AsyncGenerator<string, void, void> {
// Wraps graphRagStreaming in async iterator
}
// Usage:
for await (const chunk of flowApi.graphRagStream("query")) {
console.log(chunk);
}
2. Retry on Stream Interruption
Currently, retries only apply to initial request. Could add mid-stream retry:
- Detect connection drop mid-stream
- Resume from last chunk (if backend supports resumption)
3. Client-Side Buffering
For very fast chunk arrival, buffer multiple chunks before calling receiver:
- Reduces callback frequency
- Could be opt-in via options parameter
- Note: This would add policy to the client, may be better in higher layers
4. Stream Cancellation
Allow users to cancel in-flight streaming requests:
const cancel = flowApi.graphRagStreaming(...);
// Later:
cancel();
Alternatives Considered
Alternative 1: Separate Callbacks for Chunk and Complete
Use three callbacks: onChunk, onComplete, onError:
graphRagStreaming(
text: string,
onChunk: (chunk: string, accumulated: string) => void,
onComplete: (fullText: string) => void,
onError: (error: string) => void,
)
Rejected because:
- Adds state management (accumulation) to the client layer
- Harder for implementations that need both signals at once
- More verbose callback signature
Alternative 2: Unified Streaming Flag on Existing Methods
Modify existing methods to detect streaming callbacks:
graphRag(
text: string,
options?: GraphRagOptions,
collection?: string,
receiver?: (chunk: string, complete: boolean) => void,
): Promise<string> | void
Rejected because:
- Violates single responsibility principle
- Return type becomes conditional (Promise vs void)
- Hard to type correctly in TypeScript
- Confusing API (streaming vs non-streaming behavior implicit)
Alternative 3: Separate StreamingFlowApi Class
Create a parallel API class for streaming:
export class StreamingFlowApi {
graphRag(text: string, receiver: ..., onError: ...): void;
documentRag(text: string, receiver: ..., onError: ...): void;
}
Rejected because:
- Duplicates all configuration and state management
- Users must manage two API instances
- No clear benefit over method suffixes
Open Questions
-
Should we add streaming to Prompt service?
- Prompt service is not currently in client (no PromptRequest/Response types)
- Could add it alongside streaming support
- Decision: Yes, add it for completeness (mentioned in backend docs)
-
Should we add TypeScript overloads?
- Allow
graphRagStreaming(text, callbacks)vsgraphRagStreaming(text, options, callbacks) - Decision: Use optional parameters (simpler implementation)
- Allow
Conclusion
This proposal adds streaming support to the TrustGraph client and fixes the Agent API to correctly implement the backend protocol:
Changes:
- Fix Agent API (BREAKING): Update callbacks to receive
(chunk, complete)instead of full messages - Add
streaming?: booleanflag to all request types - Add
AgentStreamingResponseandStreamingChunkresponse types - Add
*Streamingmethod variants to FlowApi and BaseApi for RAG/completion services - Use consistent two-callback pattern:
receiver(chunk, complete)andonError(message)across all services
The implementation is straightforward (~7-10 hours including Agent fix), stays minimal and focused, and provides a clean foundation for higher-level abstractions to build upon.
Key Design Principles:
- Policy-free: No accumulation or buffering in client layer
- Minimal callbacks: Single receiver gets both chunk and completion signal
- Protocol-correct: Agent now properly implements backend's chunk_type/content/end_of_message protocol
- Consistent: Same pattern across all streaming services
- Backward compatible: Existing non-streaming APIs unchanged (except Agent which needs fixing)
Breaking Changes:
- Agent API callbacks change from
(fullMessage: string)to(chunk: string, complete: boolean) - Requires major version bump
Recommendation: Approve and implement in current sprint.