Changed API gateway and SDK

This commit is contained in:
Cyber MacGeddon 2026-04-12 15:55:13 +01:00
parent 4930bc4d2b
commit 1630346bd9
7 changed files with 96 additions and 30 deletions

View file

@ -107,6 +107,7 @@ from .types import (
AgentObservation,
AgentAnswer,
RAGChunk,
TextCompletionResult,
ProvenanceEvent,
)
@ -185,6 +186,7 @@ __all__ = [
"AgentObservation",
"AgentAnswer",
"RAGChunk",
"TextCompletionResult",
"ProvenanceEvent",
# Exceptions

View file

@ -14,6 +14,8 @@ import aiohttp
import json
from typing import Optional, Dict, Any, List
from . types import TextCompletionResult
from . exceptions import ProtocolException, ApplicationException
@ -434,12 +436,11 @@ class AsyncFlowInstance:
return await self.request("agent", request_data)
async def text_completion(self, system: str, prompt: str, **kwargs: Any) -> str:
async def text_completion(self, system: str, prompt: str, **kwargs: Any) -> TextCompletionResult:
"""
Generate text completion (non-streaming).
Generates a text response from an LLM given a system prompt and user prompt.
Returns the complete response text.
Note: This method does not support streaming. For streaming text generation,
use AsyncSocketFlowInstance.text_completion() instead.
@ -450,19 +451,19 @@ class AsyncFlowInstance:
**kwargs: Additional service-specific parameters
Returns:
str: Complete generated text response
TextCompletionResult: Result with text, in_token, out_token, model
Example:
```python
async_flow = await api.async_flow()
flow = async_flow.id("default")
# Generate text
response = await flow.text_completion(
result = await flow.text_completion(
system="You are a helpful assistant.",
prompt="Explain quantum computing in simple terms."
)
print(response)
print(result.text)
print(f"Tokens: {result.in_token} in, {result.out_token} out")
```
"""
request_data = {
@ -473,7 +474,12 @@ class AsyncFlowInstance:
request_data.update(kwargs)
result = await self.request("text-completion", request_data)
return result.get("response", "")
return TextCompletionResult(
text=result.get("response", ""),
in_token=result.get("in_token", 0) or 0,
out_token=result.get("out_token", 0) or 0,
model=result.get("model", "") or "",
)
async def graph_rag(self, query: str, user: str, collection: str,
max_subgraph_size: int = 1000, max_subgraph_count: int = 5,

View file

@ -4,7 +4,7 @@ import asyncio
import websockets
from typing import Optional, Dict, Any, AsyncIterator, Union
from . types import AgentThought, AgentObservation, AgentAnswer, RAGChunk
from . types import AgentThought, AgentObservation, AgentAnswer, RAGChunk, TextCompletionResult
from . exceptions import ProtocolException, ApplicationException
@ -211,7 +211,10 @@ class AsyncSocketClient:
return RAGChunk(
content=content,
end_of_stream=resp.get("end_of_stream", False),
error=None
error=None,
in_token=resp.get("in_token", 0) or 0,
out_token=resp.get("out_token", 0) or 0,
model=resp.get("model", "") or "",
)
async def aclose(self):
@ -269,7 +272,11 @@ class AsyncSocketFlowInstance:
return await self.client._send_request("agent", self.flow_id, request)
async def text_completion(self, system: str, prompt: str, streaming: bool = False, **kwargs):
"""Text completion with optional streaming"""
"""Text completion with optional streaming.
Non-streaming: returns a TextCompletionResult with text and token counts.
Streaming: returns an async iterator of RAGChunk (with token counts on the final chunk).
"""
request = {
"system": system,
"prompt": prompt,
@ -281,13 +288,18 @@ class AsyncSocketFlowInstance:
return self._text_completion_streaming(request)
else:
result = await self.client._send_request("text-completion", self.flow_id, request)
return result.get("response", "")
return TextCompletionResult(
text=result.get("response", ""),
in_token=result.get("in_token", 0) or 0,
out_token=result.get("out_token", 0) or 0,
model=result.get("model", "") or "",
)
async def _text_completion_streaming(self, request):
"""Helper for streaming text completion"""
"""Helper for streaming text completion. Yields RAGChunk objects."""
async for chunk in self.client._send_request_streaming("text-completion", self.flow_id, request):
if hasattr(chunk, 'content'):
yield chunk.content
if isinstance(chunk, RAGChunk):
yield chunk
async def graph_rag(self, query: str, user: str, collection: str,
max_subgraph_size: int = 1000, max_subgraph_count: int = 5,

View file

@ -11,7 +11,7 @@ import base64
from .. knowledge import hash, Uri, Literal, QuotedTriple
from .. schema import IRI, LITERAL, TRIPLE
from . types import Triple
from . types import Triple, TextCompletionResult
from . exceptions import ProtocolException
@ -360,16 +360,17 @@ class FlowInstance:
prompt: User prompt/question
Returns:
str: Generated response text
TextCompletionResult: Result with text, in_token, out_token, model
Example:
```python
flow = api.flow().id("default")
response = flow.text_completion(
result = flow.text_completion(
system="You are a helpful assistant",
prompt="What is quantum computing?"
)
print(response)
print(result.text)
print(f"Tokens: {result.in_token} in, {result.out_token} out")
```
"""
@ -379,10 +380,17 @@ class FlowInstance:
"prompt": prompt
}
return self.request(
result = self.request(
"service/text-completion",
input
)["response"]
)
return TextCompletionResult(
text=result.get("response", ""),
in_token=result.get("in_token", 0) or 0,
out_token=result.get("out_token", 0) or 0,
model=result.get("model", "") or "",
)
def agent(self, question, user="trustgraph", state=None, group=None, history=None):
"""

View file

@ -14,7 +14,7 @@ import websockets
from typing import Optional, Dict, Any, Iterator, Union, List
from threading import Lock
from . types import AgentThought, AgentObservation, AgentAnswer, RAGChunk, StreamingChunk, ProvenanceEvent
from . types import AgentThought, AgentObservation, AgentAnswer, RAGChunk, StreamingChunk, ProvenanceEvent, TextCompletionResult
from . exceptions import ProtocolException, raise_from_error_dict
@ -404,7 +404,10 @@ class SocketClient:
return RAGChunk(
content=content,
end_of_stream=resp.get("end_of_stream", False),
error=None
error=None,
in_token=resp.get("in_token", 0) or 0,
out_token=resp.get("out_token", 0) or 0,
model=resp.get("model", "") or "",
)
def _build_provenance_event(self, resp: Dict[str, Any]) -> ProvenanceEvent:
@ -543,8 +546,12 @@ class SocketFlowInstance:
streaming=True, include_provenance=True
)
def text_completion(self, system: str, prompt: str, streaming: bool = False, **kwargs) -> Union[str, Iterator[str]]:
"""Execute text completion with optional streaming."""
def text_completion(self, system: str, prompt: str, streaming: bool = False, **kwargs) -> Union[TextCompletionResult, Iterator[RAGChunk]]:
"""Execute text completion with optional streaming.
Non-streaming: returns a TextCompletionResult with text and token counts.
Streaming: returns an iterator of RAGChunk (with token counts on the final chunk).
"""
request = {
"system": system,
"prompt": prompt,
@ -557,12 +564,17 @@ class SocketFlowInstance:
if streaming:
return self._text_completion_generator(result)
else:
return result.get("response", "")
return TextCompletionResult(
text=result.get("response", ""),
in_token=result.get("in_token", 0) or 0,
out_token=result.get("out_token", 0) or 0,
model=result.get("model", "") or "",
)
def _text_completion_generator(self, result: Iterator[StreamingChunk]) -> Iterator[str]:
def _text_completion_generator(self, result: Iterator[StreamingChunk]) -> Iterator[RAGChunk]:
for chunk in result:
if hasattr(chunk, 'content'):
yield chunk.content
if isinstance(chunk, RAGChunk):
yield chunk
def graph_rag(
self,

View file

@ -202,11 +202,37 @@ class RAGChunk(StreamingChunk):
content: Generated text content
end_of_stream: True if this is the final chunk of the stream
error: Optional error information if an error occurred
in_token: Input token count (populated on the final chunk, 0 otherwise)
out_token: Output token count (populated on the final chunk, 0 otherwise)
model: Model identifier (populated on the final chunk, empty otherwise)
chunk_type: Always "rag"
"""
chunk_type: str = "rag"
end_of_stream: bool = False
error: Optional[Dict[str, str]] = None
in_token: int = 0
out_token: int = 0
model: str = ""
@dataclasses.dataclass
class TextCompletionResult:
"""
Result from a text completion request.
Returned by text_completion() in both streaming and non-streaming modes.
In streaming mode, text is None (chunks are delivered via the iterator).
In non-streaming mode, text contains the complete response.
Attributes:
text: Complete response text (None in streaming mode)
in_token: Input token count
out_token: Output token count
model: Model identifier
"""
text: Optional[str]
in_token: int = 0
out_token: int = 0
model: str = ""
@dataclasses.dataclass
class ProvenanceEvent:

View file

@ -28,12 +28,12 @@ def query(url, flow_id, system, prompt, streaming=True, token=None):
if streaming:
# Stream output to stdout without newline
for chunk in response:
print(chunk, end="", flush=True)
print(chunk.content, end="", flush=True)
# Add final newline after streaming
print()
else:
# Non-streaming: print complete response
print(response)
print(response.text)
finally:
# Clean up socket connection