From 1630346bd9ba01e2227996513ffefdf1dd5f4ef0 Mon Sep 17 00:00:00 2001 From: Cyber MacGeddon Date: Sun, 12 Apr 2026 15:55:13 +0100 Subject: [PATCH] Changed API gateway and SDK --- trustgraph-base/trustgraph/api/__init__.py | 2 ++ trustgraph-base/trustgraph/api/async_flow.py | 20 ++++++++----- .../trustgraph/api/async_socket_client.py | 26 ++++++++++++----- trustgraph-base/trustgraph/api/flow.py | 20 +++++++++---- .../trustgraph/api/socket_client.py | 28 +++++++++++++------ trustgraph-base/trustgraph/api/types.py | 26 +++++++++++++++++ trustgraph-cli/trustgraph/cli/invoke_llm.py | 4 +-- 7 files changed, 96 insertions(+), 30 deletions(-) diff --git a/trustgraph-base/trustgraph/api/__init__.py b/trustgraph-base/trustgraph/api/__init__.py index 8b703dc7..2f44aad0 100644 --- a/trustgraph-base/trustgraph/api/__init__.py +++ b/trustgraph-base/trustgraph/api/__init__.py @@ -107,6 +107,7 @@ from .types import ( AgentObservation, AgentAnswer, RAGChunk, + TextCompletionResult, ProvenanceEvent, ) @@ -185,6 +186,7 @@ __all__ = [ "AgentObservation", "AgentAnswer", "RAGChunk", + "TextCompletionResult", "ProvenanceEvent", # Exceptions diff --git a/trustgraph-base/trustgraph/api/async_flow.py b/trustgraph-base/trustgraph/api/async_flow.py index 2ff37307..9dd110c6 100644 --- a/trustgraph-base/trustgraph/api/async_flow.py +++ b/trustgraph-base/trustgraph/api/async_flow.py @@ -14,6 +14,8 @@ import aiohttp import json from typing import Optional, Dict, Any, List +from . types import TextCompletionResult + from . exceptions import ProtocolException, ApplicationException @@ -434,12 +436,11 @@ class AsyncFlowInstance: return await self.request("agent", request_data) - async def text_completion(self, system: str, prompt: str, **kwargs: Any) -> str: + async def text_completion(self, system: str, prompt: str, **kwargs: Any) -> TextCompletionResult: """ Generate text completion (non-streaming). Generates a text response from an LLM given a system prompt and user prompt. 
- Returns the complete response text. Note: This method does not support streaming. For streaming text generation, use AsyncSocketFlowInstance.text_completion() instead. @@ -450,19 +451,19 @@ class AsyncFlowInstance: **kwargs: Additional service-specific parameters Returns: - str: Complete generated text response + TextCompletionResult: Result with text, in_token, out_token, model Example: ```python async_flow = await api.async_flow() flow = async_flow.id("default") - # Generate text - response = await flow.text_completion( + result = await flow.text_completion( system="You are a helpful assistant.", prompt="Explain quantum computing in simple terms." ) - print(response) + print(result.text) + print(f"Tokens: {result.in_token} in, {result.out_token} out") ``` """ request_data = { @@ -473,7 +474,12 @@ class AsyncFlowInstance: request_data.update(kwargs) result = await self.request("text-completion", request_data) - return result.get("response", "") + return TextCompletionResult( + text=result.get("response", ""), + in_token=result.get("in_token", 0) or 0, + out_token=result.get("out_token", 0) or 0, + model=result.get("model", "") or "", + ) async def graph_rag(self, query: str, user: str, collection: str, max_subgraph_size: int = 1000, max_subgraph_count: int = 5, diff --git a/trustgraph-base/trustgraph/api/async_socket_client.py b/trustgraph-base/trustgraph/api/async_socket_client.py index 7a239b07..9c9c30d5 100644 --- a/trustgraph-base/trustgraph/api/async_socket_client.py +++ b/trustgraph-base/trustgraph/api/async_socket_client.py @@ -4,7 +4,7 @@ import asyncio import websockets from typing import Optional, Dict, Any, AsyncIterator, Union -from . types import AgentThought, AgentObservation, AgentAnswer, RAGChunk +from . types import AgentThought, AgentObservation, AgentAnswer, RAGChunk, TextCompletionResult from . 
exceptions import ProtocolException, ApplicationException @@ -211,7 +211,10 @@ class AsyncSocketClient: return RAGChunk( content=content, end_of_stream=resp.get("end_of_stream", False), - error=None + error=None, + in_token=resp.get("in_token", 0) or 0, + out_token=resp.get("out_token", 0) or 0, + model=resp.get("model", "") or "", ) async def aclose(self): @@ -269,7 +272,11 @@ class AsyncSocketFlowInstance: return await self.client._send_request("agent", self.flow_id, request) async def text_completion(self, system: str, prompt: str, streaming: bool = False, **kwargs): - """Text completion with optional streaming""" + """Text completion with optional streaming. + + Non-streaming: returns a TextCompletionResult with text and token counts. + Streaming: returns an async iterator of RAGChunk (with token counts on the final chunk). + """ request = { "system": system, "prompt": prompt, @@ -281,13 +288,18 @@ class AsyncSocketFlowInstance: return self._text_completion_streaming(request) else: result = await self.client._send_request("text-completion", self.flow_id, request) - return result.get("response", "") + return TextCompletionResult( + text=result.get("response", ""), + in_token=result.get("in_token", 0) or 0, + out_token=result.get("out_token", 0) or 0, + model=result.get("model", "") or "", + ) async def _text_completion_streaming(self, request): - """Helper for streaming text completion""" + """Helper for streaming text completion. 
Yields RAGChunk objects.""" async for chunk in self.client._send_request_streaming("text-completion", self.flow_id, request): - if hasattr(chunk, 'content'): - yield chunk.content + if isinstance(chunk, RAGChunk): + yield chunk async def graph_rag(self, query: str, user: str, collection: str, max_subgraph_size: int = 1000, max_subgraph_count: int = 5, diff --git a/trustgraph-base/trustgraph/api/flow.py b/trustgraph-base/trustgraph/api/flow.py index 0aa55347..b39f3db9 100644 --- a/trustgraph-base/trustgraph/api/flow.py +++ b/trustgraph-base/trustgraph/api/flow.py @@ -11,7 +11,7 @@ import base64 from .. knowledge import hash, Uri, Literal, QuotedTriple from .. schema import IRI, LITERAL, TRIPLE -from . types import Triple +from . types import Triple, TextCompletionResult from . exceptions import ProtocolException @@ -360,16 +360,17 @@ class FlowInstance: prompt: User prompt/question Returns: - str: Generated response text + TextCompletionResult: Result with text, in_token, out_token, model Example: ```python flow = api.flow().id("default") - response = flow.text_completion( + result = flow.text_completion( system="You are a helpful assistant", prompt="What is quantum computing?" 
) - print(response) + print(result.text) + print(f"Tokens: {result.in_token} in, {result.out_token} out") ``` """ @@ -379,10 +380,17 @@ class FlowInstance: "prompt": prompt } - return self.request( + result = self.request( "service/text-completion", input - )["response"] + ) + + return TextCompletionResult( + text=result.get("response", ""), + in_token=result.get("in_token", 0) or 0, + out_token=result.get("out_token", 0) or 0, + model=result.get("model", "") or "", + ) def agent(self, question, user="trustgraph", state=None, group=None, history=None): """ diff --git a/trustgraph-base/trustgraph/api/socket_client.py b/trustgraph-base/trustgraph/api/socket_client.py index b6ceba00..68db0ef7 100644 --- a/trustgraph-base/trustgraph/api/socket_client.py +++ b/trustgraph-base/trustgraph/api/socket_client.py @@ -14,7 +14,7 @@ import websockets from typing import Optional, Dict, Any, Iterator, Union, List from threading import Lock -from . types import AgentThought, AgentObservation, AgentAnswer, RAGChunk, StreamingChunk, ProvenanceEvent +from . types import AgentThought, AgentObservation, AgentAnswer, RAGChunk, StreamingChunk, ProvenanceEvent, TextCompletionResult from . 
exceptions import ProtocolException, raise_from_error_dict @@ -404,7 +404,10 @@ class SocketClient: return RAGChunk( content=content, end_of_stream=resp.get("end_of_stream", False), - error=None + error=None, + in_token=resp.get("in_token", 0) or 0, + out_token=resp.get("out_token", 0) or 0, + model=resp.get("model", "") or "", ) def _build_provenance_event(self, resp: Dict[str, Any]) -> ProvenanceEvent: @@ -543,8 +546,12 @@ class SocketFlowInstance: streaming=True, include_provenance=True ) - def text_completion(self, system: str, prompt: str, streaming: bool = False, **kwargs) -> Union[str, Iterator[str]]: - """Execute text completion with optional streaming.""" + def text_completion(self, system: str, prompt: str, streaming: bool = False, **kwargs) -> Union[TextCompletionResult, Iterator[RAGChunk]]: + """Execute text completion with optional streaming. + + Non-streaming: returns a TextCompletionResult with text and token counts. + Streaming: returns an iterator of RAGChunk (with token counts on the final chunk). 
+ """ request = { "system": system, "prompt": prompt, @@ -557,12 +564,17 @@ class SocketFlowInstance: if streaming: return self._text_completion_generator(result) else: - return result.get("response", "") + return TextCompletionResult( + text=result.get("response", ""), + in_token=result.get("in_token", 0) or 0, + out_token=result.get("out_token", 0) or 0, + model=result.get("model", "") or "", + ) - def _text_completion_generator(self, result: Iterator[StreamingChunk]) -> Iterator[str]: + def _text_completion_generator(self, result: Iterator[StreamingChunk]) -> Iterator[RAGChunk]: for chunk in result: - if hasattr(chunk, 'content'): - yield chunk.content + if isinstance(chunk, RAGChunk): + yield chunk def graph_rag( self, diff --git a/trustgraph-base/trustgraph/api/types.py b/trustgraph-base/trustgraph/api/types.py index 55635584..da05e76d 100644 --- a/trustgraph-base/trustgraph/api/types.py +++ b/trustgraph-base/trustgraph/api/types.py @@ -202,11 +202,37 @@ class RAGChunk(StreamingChunk): content: Generated text content end_of_stream: True if this is the final chunk of the stream error: Optional error information if an error occurred + in_token: Input token count (populated on the final chunk, 0 otherwise) + out_token: Output token count (populated on the final chunk, 0 otherwise) + model: Model identifier (populated on the final chunk, empty otherwise) chunk_type: Always "rag" """ chunk_type: str = "rag" end_of_stream: bool = False error: Optional[Dict[str, str]] = None + in_token: int = 0 + out_token: int = 0 + model: str = "" + +@dataclasses.dataclass +class TextCompletionResult: + """ + Result from a text completion request. + + Returned by text_completion() in non-streaming mode, where text contains + the complete response. In streaming mode text_completion() returns an + iterator of RAGChunk instead, with token counts on the final chunk. 
+ + Attributes: + text: Complete response text + in_token: Input token count + out_token: Output token count + model: Model identifier + """ + text: Optional[str] + in_token: int = 0 + out_token: int = 0 + model: str = "" @dataclasses.dataclass class ProvenanceEvent: diff --git a/trustgraph-cli/trustgraph/cli/invoke_llm.py b/trustgraph-cli/trustgraph/cli/invoke_llm.py index a1611625..47603442 100644 --- a/trustgraph-cli/trustgraph/cli/invoke_llm.py +++ b/trustgraph-cli/trustgraph/cli/invoke_llm.py @@ -28,12 +28,12 @@ def query(url, flow_id, system, prompt, streaming=True, token=None): if streaming: # Stream output to stdout without newline for chunk in response: - print(chunk, end="", flush=True) + print(chunk.content, end="", flush=True) # Add final newline after streaming print() else: # Non-streaming: print complete response - print(response) + print(response.text) finally: # Clean up socket connection