Python API implements streaming interfaces (#577)

* Tech spec

* Python CLI utilities updated to use the API including streaming features

* Added type safety to Python API

* Completed missing auth token support in CLI
This commit is contained in:
cybermaggedon 2025-12-04 17:38:57 +00:00 committed by GitHub
parent b957004db9
commit 01aeede78b
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
53 changed files with 4489 additions and 715 deletions

View file

@ -1,3 +1,80 @@
from . api import *
# Core API
from .api import Api
# Flow clients
from .flow import Flow, FlowInstance
from .async_flow import AsyncFlow, AsyncFlowInstance
# WebSocket clients
from .socket_client import SocketClient, SocketFlowInstance
from .async_socket_client import AsyncSocketClient, AsyncSocketFlowInstance
# Bulk operation clients
from .bulk_client import BulkClient
from .async_bulk_client import AsyncBulkClient
# Metrics clients
from .metrics import Metrics
from .async_metrics import AsyncMetrics
# Types
from .types import (
Triple,
ConfigKey,
ConfigValue,
DocumentMetadata,
ProcessingMetadata,
CollectionMetadata,
StreamingChunk,
AgentThought,
AgentObservation,
AgentAnswer,
RAGChunk,
)
# Exceptions
from .exceptions import ProtocolException, ApplicationException
# Explicit public surface of the package: every name listed here is imported
# above, so ``from <package> import *`` exposes exactly these names.
__all__ = [
    # Core API
    "Api",
    # Flow clients
    "Flow",
    "FlowInstance",
    "AsyncFlow",
    "AsyncFlowInstance",
    # WebSocket clients
    "SocketClient",
    "SocketFlowInstance",
    "AsyncSocketClient",
    "AsyncSocketFlowInstance",
    # Bulk operation clients
    "BulkClient",
    "AsyncBulkClient",
    # Metrics clients
    "Metrics",
    "AsyncMetrics",
    # Types
    "Triple",
    "ConfigKey",
    "ConfigValue",
    "DocumentMetadata",
    "ProcessingMetadata",
    "CollectionMetadata",
    "StreamingChunk",
    "AgentThought",
    "AgentObservation",
    "AgentAnswer",
    "RAGChunk",
    # Exceptions
    "ProtocolException",
    "ApplicationException",
]

View file

@ -3,6 +3,7 @@ import requests
import json
import base64
import time
from typing import Optional
from . library import Library
from . flow import Flow
@ -26,7 +27,7 @@ def check_error(response):
class Api:
def __init__(self, url="http://localhost:8088/", timeout=60):
def __init__(self, url="http://localhost:8088/", timeout=60, token: Optional[str] = None):
self.url = url
@ -36,6 +37,16 @@ class Api:
self.url += "api/v1/"
self.timeout = timeout
self.token = token
# Lazy initialization for new clients
self._socket_client = None
self._bulk_client = None
self._async_flow = None
self._async_socket_client = None
self._async_bulk_client = None
self._metrics = None
self._async_metrics = None
def flow(self):
    """Return a new REST ``Flow`` client bound to this Api instance.

    A fresh object is built on every call; nothing is cached.
    """
    flow_client = Flow(api=self)
    return flow_client
@ -50,8 +61,12 @@ class Api:
url = f"{self.url}{path}"
headers = {}
if self.token:
headers["Authorization"] = f"Bearer {self.token}"
# Invoke the API, input is passed as JSON
resp = requests.post(url, json=request, timeout=self.timeout)
resp = requests.post(url, json=request, timeout=self.timeout, headers=headers)
# Should be a 200 status code
if resp.status_code != 200:
@ -72,3 +87,96 @@ class Api:
def collection(self):
    """Return a new ``Collection`` client constructed from this Api instance.

    A fresh object is built on every call; nothing is cached.
    """
    collection_client = Collection(self)
    return collection_client
# New synchronous methods
def socket(self):
    """Return the synchronous WebSocket client used for streaming operations.

    The client is created on first use and the same object is returned on
    every later call.
    """
    if self._socket_client is not None:
        return self._socket_client

    # Imported lazily, at first use, rather than at module import time.
    from . socket_client import SocketClient

    # The client wants the gateway root, so drop the "api/v1/" suffix and any
    # trailing slash from the REST URL.
    gateway_root = self.url.rsplit("api/v1/", 1)[0].rstrip("/")
    self._socket_client = SocketClient(gateway_root, self.timeout, self.token)
    return self._socket_client
def bulk(self):
    """Return the synchronous bulk import/export client.

    The client is created on first use and the same object is returned on
    every later call.
    """
    if self._bulk_client is not None:
        return self._bulk_client

    # Imported lazily, at first use, rather than at module import time.
    from . bulk_client import BulkClient

    # The client wants the gateway root, so drop the "api/v1/" suffix and any
    # trailing slash from the REST URL.
    gateway_root = self.url.rsplit("api/v1/", 1)[0].rstrip("/")
    self._bulk_client = BulkClient(gateway_root, self.timeout, self.token)
    return self._bulk_client
def metrics(self):
    """Return the synchronous metrics client.

    The client is created on first use and the same object is returned on
    every later call.
    """
    if self._metrics is not None:
        return self._metrics

    # Imported lazily, at first use, rather than at module import time.
    from . metrics import Metrics

    # The client wants the gateway root, so drop the "api/v1/" suffix and any
    # trailing slash from the REST URL.
    gateway_root = self.url.rsplit("api/v1/", 1)[0].rstrip("/")
    self._metrics = Metrics(gateway_root, self.timeout, self.token)
    return self._metrics
# New asynchronous methods
def async_flow(self):
    """Return the asynchronous REST flow client.

    The client is created on first use and the same object is returned on
    every later call.  Unlike the WebSocket clients it receives the full REST
    URL (``self.url``) unchanged.
    """
    if self._async_flow is not None:
        return self._async_flow

    # Imported lazily, at first use, rather than at module import time.
    from . async_flow import AsyncFlow

    self._async_flow = AsyncFlow(self.url, self.timeout, self.token)
    return self._async_flow
def async_socket(self):
    """Return the asynchronous WebSocket client used for streaming operations.

    The client is created on first use and the same object is returned on
    every later call.
    """
    if self._async_socket_client is not None:
        return self._async_socket_client

    # Imported lazily, at first use, rather than at module import time.
    from . async_socket_client import AsyncSocketClient

    # The client wants the gateway root, so drop the "api/v1/" suffix and any
    # trailing slash from the REST URL.
    gateway_root = self.url.rsplit("api/v1/", 1)[0].rstrip("/")
    self._async_socket_client = AsyncSocketClient(gateway_root, self.timeout, self.token)
    return self._async_socket_client
def async_bulk(self):
    """Return the asynchronous bulk import/export client.

    The client is created on first use and the same object is returned on
    every later call.
    """
    if self._async_bulk_client is not None:
        return self._async_bulk_client

    # Imported lazily, at first use, rather than at module import time.
    from . async_bulk_client import AsyncBulkClient

    # The client wants the gateway root, so drop the "api/v1/" suffix and any
    # trailing slash from the REST URL.
    gateway_root = self.url.rsplit("api/v1/", 1)[0].rstrip("/")
    self._async_bulk_client = AsyncBulkClient(gateway_root, self.timeout, self.token)
    return self._async_bulk_client
def async_metrics(self):
    """Return the asynchronous metrics client.

    The client is created on first use and the same object is returned on
    every later call.
    """
    if self._async_metrics is not None:
        return self._async_metrics

    # Imported lazily, at first use, rather than at module import time.
    from . async_metrics import AsyncMetrics

    # The client wants the gateway root, so drop the "api/v1/" suffix and any
    # trailing slash from the REST URL.
    gateway_root = self.url.rsplit("api/v1/", 1)[0].rstrip("/")
    self._async_metrics = AsyncMetrics(gateway_root, self.timeout, self.token)
    return self._async_metrics
# Resource management
def close(self):
    """Close the synchronous socket and bulk clients, if they were created.

    The socket client is closed first, then the bulk client.  Clients that
    were never requested are skipped.
    """
    for client in (self._socket_client, self._bulk_client):
        if client:
            client.close()
async def aclose(self):
    """Close every asynchronous client that has been created.

    Clients are closed in the order socket, bulk, flow, metrics.  Clients
    that were never requested are skipped.  An exception raised by one
    client's ``aclose()`` propagates and the remaining clients are not closed.
    """
    for client in (
        self._async_socket_client,
        self._async_bulk_client,
        self._async_flow,
        # The metrics client was previously left out even though it exposes
        # aclose() like the other async clients.
        self._async_metrics,
    ):
        if client:
            await client.aclose()
# Context manager support
def __enter__(self):
    """Enter a ``with`` block; the Api instance itself is the managed object."""
    return self
def __exit__(self, *args):
    """Leave a ``with`` block by calling ``close()``.

    Returns ``None``, so an exception raised inside the block is not
    suppressed.
    """
    self.close()
async def __aenter__(self):
    """Enter an ``async with`` block; the Api instance itself is the managed object."""
    return self
async def __aexit__(self, *args):
    """Leave an ``async with`` block by awaiting ``aclose()``.

    Returns ``None``, so an exception raised inside the block is not
    suppressed.
    """
    await self.aclose()

View file

@ -0,0 +1,131 @@
import json
import websockets
from typing import Optional, AsyncIterator, Dict, Any, Iterator
from . types import Triple
class AsyncBulkClient:
    """Asynchronous bulk import/export client that streams JSON over WebSockets.

    Each call opens its own connection to
    ``{url}/api/v1/flow/{flow}/{import|export}/{kind}``; the connection is
    released by its ``async with`` block, so nothing outlives a call.
    ``**kwargs`` on the public methods is accepted but not used.
    """

    def __init__(self, url: str, timeout: int, token: Optional[str]) -> None:
        """Store the gateway address and connection settings.

        Args:
            url: Gateway base URL; ``http``/``https`` are mapped to ``ws``/``wss``.
            timeout: Forwarded to ``websockets.connect`` as ``ping_timeout``.
            token: Optional token sent as the ``token`` query parameter.
        """
        self.url: str = self._convert_to_ws_url(url)
        self.timeout: int = timeout
        self.token: Optional[str] = token

    def _convert_to_ws_url(self, url: str) -> str:
        """Convert HTTP URL to WebSocket URL; scheme-less input gets ``ws://``."""
        if url.startswith("http://"):
            return url.replace("http://", "ws://", 1)
        if url.startswith("https://"):
            return url.replace("https://", "wss://", 1)
        if url.startswith(("ws://", "wss://")):
            return url
        return f"ws://{url}"

    def _endpoint(self, flow: str, direction: str, kind: str) -> str:
        """Build the WebSocket URL for one bulk operation.

        Args:
            flow: Flow identifier placed in the URL path.
            direction: ``"import"`` or ``"export"``.
            kind: Payload family, e.g. ``"triples"``.

        The token is percent-encoded so that characters such as ``&``, ``#``
        or spaces cannot corrupt the query string.
        """
        # Local import keeps this class self-contained.
        from urllib.parse import quote

        endpoint = f"{self.url}/api/v1/flow/{flow}/{direction}/{kind}"
        if self.token:
            encoded_token = quote(str(self.token), safe="")
            endpoint = f"{endpoint}?token={encoded_token}"
        return endpoint

    def _connect(self, flow: str, direction: str, kind: str):
        """Return the ``websockets.connect`` context manager for one operation."""
        return websockets.connect(
            self._endpoint(flow, direction, kind),
            ping_interval=20,
            ping_timeout=self.timeout,
        )

    async def _send_json_stream(self, flow: str, kind: str, items: AsyncIterator[Dict[str, Any]]) -> None:
        """Send every item of ``items`` as one JSON message on an import socket."""
        async with self._connect(flow, "import", kind) as websocket:
            async for item in items:
                await websocket.send(json.dumps(item))

    async def import_triples(self, flow: str, triples: AsyncIterator["Triple"], **kwargs: Any) -> None:
        """Bulk import triples via WebSocket.

        Each triple is sent as a JSON object with keys ``s``, ``p`` and ``o``.
        """
        async with self._connect(flow, "import", "triples") as websocket:
            async for triple in triples:
                message = {
                    "s": triple.s,
                    "p": triple.p,
                    "o": triple.o
                }
                await websocket.send(json.dumps(message))

    async def export_triples(self, flow: str, **kwargs: Any) -> AsyncIterator["Triple"]:
        """Bulk export triples via WebSocket.

        Yields one ``Triple`` per received message; missing ``s``/``p``/``o``
        members default to the empty string.
        """
        async with self._connect(flow, "export", "triples") as websocket:
            async for raw_message in websocket:
                data = json.loads(raw_message)
                yield Triple(
                    s=data.get("s", ""),
                    p=data.get("p", ""),
                    o=data.get("o", "")
                )

    async def import_graph_embeddings(self, flow: str, embeddings: AsyncIterator[Dict[str, Any]], **kwargs: Any) -> None:
        """Bulk import graph embeddings via WebSocket (one JSON message per item)."""
        await self._send_json_stream(flow, "graph-embeddings", embeddings)

    async def export_graph_embeddings(self, flow: str, **kwargs: Any) -> AsyncIterator[Dict[str, Any]]:
        """Bulk export graph embeddings via WebSocket, yielding decoded JSON messages."""
        async with self._connect(flow, "export", "graph-embeddings") as websocket:
            async for raw_message in websocket:
                yield json.loads(raw_message)

    async def import_document_embeddings(self, flow: str, embeddings: AsyncIterator[Dict[str, Any]], **kwargs: Any) -> None:
        """Bulk import document embeddings via WebSocket (one JSON message per item)."""
        await self._send_json_stream(flow, "document-embeddings", embeddings)

    async def export_document_embeddings(self, flow: str, **kwargs: Any) -> AsyncIterator[Dict[str, Any]]:
        """Bulk export document embeddings via WebSocket, yielding decoded JSON messages."""
        async with self._connect(flow, "export", "document-embeddings") as websocket:
            async for raw_message in websocket:
                yield json.loads(raw_message)

    async def import_entity_contexts(self, flow: str, contexts: AsyncIterator[Dict[str, Any]], **kwargs: Any) -> None:
        """Bulk import entity contexts via WebSocket (one JSON message per item)."""
        await self._send_json_stream(flow, "entity-contexts", contexts)

    async def export_entity_contexts(self, flow: str, **kwargs: Any) -> AsyncIterator[Dict[str, Any]]:
        """Bulk export entity contexts via WebSocket, yielding decoded JSON messages."""
        async with self._connect(flow, "export", "entity-contexts") as websocket:
            async for raw_message in websocket:
                yield json.loads(raw_message)

    async def import_objects(self, flow: str, objects: AsyncIterator[Dict[str, Any]], **kwargs: Any) -> None:
        """Bulk import objects via WebSocket (one JSON message per item)."""
        await self._send_json_stream(flow, "objects", objects)

    async def aclose(self) -> None:
        """Close connections (no-op: each call's context manager already closed its socket)."""
        pass

View file

@ -0,0 +1,245 @@
import aiohttp
import json
from typing import Optional, Dict, Any, List
from . exceptions import ProtocolException, ApplicationException
def check_error(response):
    """Raise ``ApplicationException`` if ``response`` carries an ``error`` member.

    When the error value has both ``message`` and ``type`` entries the
    exception text is ``"<type>: <message>"``; otherwise the raw error value
    is passed to the exception unchanged.  Returns ``None`` when there is no
    error.
    """
    if "error" not in response:
        return

    error = response["error"]
    try:
        msg = error["message"]
        tp = error["type"]
    except (KeyError, TypeError):
        # KeyError: a mapping without the expected entries.
        # TypeError: a value that cannot be indexed by string key.
        # Anything else (e.g. KeyboardInterrupt) is no longer swallowed here.
        raise ApplicationException(error)

    raise ApplicationException(f"{tp}: {msg}")
class AsyncFlow:
    """Asynchronous REST-based flow interface.

    Every call issues a single HTTP POST through a short-lived aiohttp
    session, so the object holds no open connection between calls.
    """

    def __init__(self, url: str, timeout: int, token: Optional[str]) -> None:
        """Store connection settings.

        Args:
            url: URL prefix to which request paths are appended verbatim.
            timeout: Total request timeout, passed to ``aiohttp.ClientTimeout``.
            token: Optional bearer token for the ``Authorization`` header.
        """
        self.url: str = url
        self.timeout: int = timeout
        self.token: Optional[str] = token

    async def request(self, path: str, request_data: Dict[str, Any]) -> Dict[str, Any]:
        """POST ``request_data`` as JSON to ``{self.url}{path}`` and return the decoded reply.

        Raises:
            ProtocolException: the status is not 200, or the body cannot be
                decoded as JSON.
            ApplicationException: the decoded reply contains an ``error``
                member (raised by ``check_error``).
        """
        url = f"{self.url}{path}"

        headers = {"Content-Type": "application/json"}
        if self.token:
            headers["Authorization"] = f"Bearer {self.token}"

        timeout = aiohttp.ClientTimeout(total=self.timeout)

        async with aiohttp.ClientSession(timeout=timeout) as session:
            async with session.post(url, json=request_data, headers=headers) as resp:
                if resp.status != 200:
                    raise ProtocolException(f"Status code {resp.status}")

                try:
                    obj = await resp.json()
                except Exception as exc:
                    # ``except Exception`` rather than a bare ``except`` so that
                    # task cancellation (a BaseException) is not mistaken for a
                    # malformed reply.
                    raise ProtocolException("Expected JSON response") from exc

                check_error(obj)
                return obj

    async def list(self) -> List[str]:
        """List all flows (the ``flow-ids`` member of the reply, or ``[]``)."""
        result = await self.request("flow", {"operation": "list-flows"})
        return result.get("flow-ids", [])

    async def get(self, id: str) -> Dict[str, Any]:
        """Get a flow definition, decoded from the JSON string in ``flow``."""
        result = await self.request("flow", {
            "operation": "get-flow",
            "flow-id": id
        })
        return json.loads(result.get("flow", "{}"))

    async def start(self, class_name: str, id: str, description: str, parameters: Optional[Dict] = None):
        """Start a flow.

        ``parameters`` is JSON-encoded into the request only when non-empty.
        """
        request_data = {
            "operation": "start-flow",
            "flow-id": id,
            "class-name": class_name,
            "description": description
        }
        if parameters:
            request_data["parameters"] = json.dumps(parameters)

        await self.request("flow", request_data)

    async def stop(self, id: str):
        """Stop a flow."""
        await self.request("flow", {
            "operation": "stop-flow",
            "flow-id": id
        })

    async def list_classes(self) -> List[str]:
        """List flow classes (the ``class-names`` member of the reply, or ``[]``)."""
        result = await self.request("flow", {"operation": "list-classes"})
        return result.get("class-names", [])

    async def get_class(self, class_name: str) -> Dict[str, Any]:
        """Get a flow class definition, decoded from the JSON string in ``class-definition``."""
        result = await self.request("flow", {
            "operation": "get-class",
            "class-name": class_name
        })
        return json.loads(result.get("class-definition", "{}"))

    async def put_class(self, class_name: str, definition: Dict[str, Any]):
        """Create/update a flow class; ``definition`` is sent JSON-encoded."""
        await self.request("flow", {
            "operation": "put-class",
            "class-name": class_name,
            "class-definition": json.dumps(definition)
        })

    async def delete_class(self, class_name: str):
        """Delete a flow class."""
        await self.request("flow", {
            "operation": "delete-class",
            "class-name": class_name
        })

    def id(self, flow_id: str):
        """Get an ``AsyncFlowInstance`` scoped to ``flow_id``."""
        return AsyncFlowInstance(self, flow_id)

    async def aclose(self) -> None:
        """Close connection (no-op: each request closes its own aiohttp session)."""
        pass
class AsyncFlowInstance:
    """REST client scoped to a single flow.

    Every method builds a JSON payload and posts it to
    ``flow/<flow_id>/service/<service>`` through the owning ``AsyncFlow``.
    Extra keyword arguments are merged into the payload last, so they can
    override the fields built from the named parameters.
    """

    def __init__(self, flow: AsyncFlow, flow_id: str):
        self.flow_id = flow_id
        self.flow = flow

    async def request(self, service: str, request_data: Dict[str, Any]) -> Dict[str, Any]:
        """Post ``request_data`` to the named service of this flow."""
        service_path = f"flow/{self.flow_id}/service/{service}"
        return await self.flow.request(service_path, request_data)

    async def agent(self, question: str, user: str, state: Optional[Dict] = None,
                    group: Optional[str] = None, history: Optional[List] = None, **kwargs: Any) -> Dict[str, Any]:
        """Run the agent service without streaming and return the raw reply.

        ``state``, ``group`` and ``history`` are only sent when not ``None``.
        """
        payload = {
            "question": question,
            "user": user,
            "streaming": False  # REST doesn't support streaming
        }
        optional_fields = (("state", state), ("group", group), ("history", history))
        for key, value in optional_fields:
            if value is not None:
                payload[key] = value
        payload.update(kwargs)

        return await self.request("agent", payload)

    async def text_completion(self, system: str, prompt: str, **kwargs: Any) -> str:
        """Run a non-streaming text completion; returns ``response`` or ``""``."""
        payload = {"system": system, "prompt": prompt, "streaming": False, **kwargs}
        reply = await self.request("text-completion", payload)
        return reply.get("response", "")

    async def graph_rag(self, question: str, user: str, collection: str,
                        max_subgraph_size: int = 1000, max_subgraph_count: int = 5,
                        max_entity_distance: int = 3, **kwargs: Any) -> str:
        """Run a non-streaming Graph RAG query; returns ``response`` or ``""``."""
        payload = {
            "question": question,
            "user": user,
            "collection": collection,
            "max-subgraph-size": max_subgraph_size,
            "max-subgraph-count": max_subgraph_count,
            "max-entity-distance": max_entity_distance,
            "streaming": False,
            **kwargs,
        }
        reply = await self.request("graph-rag", payload)
        return reply.get("response", "")

    async def document_rag(self, question: str, user: str, collection: str,
                           doc_limit: int = 10, **kwargs: Any) -> str:
        """Run a non-streaming Document RAG query; returns ``response`` or ``""``."""
        payload = {
            "question": question,
            "user": user,
            "collection": collection,
            "doc-limit": doc_limit,
            "streaming": False,
            **kwargs,
        }
        reply = await self.request("document-rag", payload)
        return reply.get("response", "")

    async def graph_embeddings_query(self, text: str, user: str, collection: str, limit: int = 10, **kwargs: Any):
        """Query the graph-embeddings service and return the raw reply."""
        payload = {
            "text": text,
            "user": user,
            "collection": collection,
            "limit": limit,
            **kwargs,
        }
        return await self.request("graph-embeddings", payload)

    async def embeddings(self, text: str, **kwargs: Any):
        """Request embeddings for ``text`` and return the raw reply."""
        payload = {"text": text, **kwargs}
        return await self.request("embeddings", payload)

    async def triples_query(self, s=None, p=None, o=None, user=None, collection=None, limit=100, **kwargs: Any):
        """Query triples by pattern and return the raw reply.

        ``s``, ``p`` and ``o`` are sent as ``str(...)`` when given; ``user``
        and ``collection`` are sent unchanged when given.
        """
        payload = {"limit": limit}
        for key, term in (("s", s), ("p", p), ("o", o)):
            if term is not None:
                payload[key] = str(term)
        for key, value in (("user", user), ("collection", collection)):
            if value is not None:
                payload[key] = value
        payload.update(kwargs)

        return await self.request("triples", payload)

    async def objects_query(self, query: str, user: str, collection: str, variables: Optional[Dict] = None,
                            operation_name: Optional[str] = None, **kwargs: Any):
        """Send a GraphQL query to the objects service and return the raw reply.

        ``variables`` and ``operation_name`` are only sent when truthy.
        """
        payload = {"query": query, "user": user, "collection": collection}
        if variables:
            payload["variables"] = variables
        if operation_name:
            payload["operationName"] = operation_name
        payload.update(kwargs)

        return await self.request("objects", payload)

View file

@ -0,0 +1,33 @@
import aiohttp
from typing import Optional, Dict
class AsyncMetrics:
    """Async client that fetches the gateway's metrics text over HTTP."""

    def __init__(self, url: str, timeout: int, token: Optional[str]) -> None:
        self.url = url
        self.timeout = timeout
        self.token = token

    async def get(self) -> str:
        """Fetch ``{url}/api/metrics`` and return the response body as text.

        A bearer ``Authorization`` header is sent when a token is configured.
        Raises ``Exception`` when the status code is not 200.
        """
        endpoint: str = f"{self.url}/api/metrics"

        auth_headers: Dict[str, str] = (
            {"Authorization": f"Bearer {self.token}"} if self.token else {}
        )

        limits = aiohttp.ClientTimeout(total=self.timeout)

        async with aiohttp.ClientSession(timeout=limits) as session:
            async with session.get(endpoint, headers=auth_headers) as reply:
                if reply.status == 200:
                    return await reply.text()
                raise Exception(f"Status code {reply.status}")

    async def aclose(self) -> None:
        """No-op; ``get`` opens and closes its own session on each call."""
        return None

View file

@ -0,0 +1,335 @@
import json
import websockets
from typing import Optional, Dict, Any, AsyncIterator, Union
from . types import AgentThought, AgentObservation, AgentAnswer, RAGChunk
from . exceptions import ProtocolException, ApplicationException
class AsyncSocketClient:
    """Asynchronous WebSocket client.

    Every request opens a fresh connection to ``{url}/api/v1/socket``, sends
    one JSON message tagged with a per-client request id, and reads the reply
    (or stream of replies) carrying the same id.
    """

    def __init__(self, url: str, timeout: int, token: Optional[str]):
        """Store connection settings.

        Args:
            url: Gateway base URL; ``http``/``https`` are mapped to ``ws``/``wss``.
            timeout: Forwarded to ``websockets.connect`` as ``ping_timeout``.
            token: Optional token sent as the ``token`` query parameter.
        """
        self.url = self._convert_to_ws_url(url)
        self.timeout = timeout
        self.token = token
        self._request_counter = 0

    def _convert_to_ws_url(self, url: str) -> str:
        """Convert HTTP URL to WebSocket URL; scheme-less input gets ``ws://``."""
        if url.startswith("http://"):
            return url.replace("http://", "ws://", 1)
        if url.startswith("https://"):
            return url.replace("https://", "wss://", 1)
        if url.startswith(("ws://", "wss://")):
            return url
        # Assume ws://
        return f"ws://{url}"

    def flow(self, flow_id: str):
        """Get async flow instance for WebSocket operations"""
        return AsyncSocketFlowInstance(self, flow_id)

    def _socket_url(self) -> str:
        """Build the socket endpoint URL, appending the token when configured.

        The token is percent-encoded so that characters such as ``&``, ``#``
        or spaces cannot corrupt the query string.
        """
        # Local import keeps this class self-contained.
        from urllib.parse import quote

        ws_url = f"{self.url}/api/v1/socket"
        if self.token:
            encoded_token = quote(str(self.token), safe="")
            ws_url = f"{ws_url}?token={encoded_token}"
        return ws_url

    def _build_message(self, service: str, flow: Optional[str], request: Dict[str, Any]):
        """Allocate the next request id and build the outgoing message.

        Returns:
            ``(request_id, message)``; ``message`` carries a ``flow`` member
            only when ``flow`` is truthy.
        """
        self._request_counter += 1
        request_id = f"req-{self._request_counter}"

        message = {
            "id": request_id,
            "service": service,
            "request": request
        }
        if flow:
            message["flow"] = flow
        return request_id, message

    async def _send_request(self, service: str, flow: Optional[str], request: Dict[str, Any]):
        """Async WebSocket request implementation (non-streaming).

        Sends one message and reads exactly one reply.

        Raises:
            ProtocolException: the reply id differs from the request id, or
                the reply has no ``response`` member.
            ApplicationException: the reply has an ``error`` member.
        """
        request_id, message = self._build_message(service, flow, request)

        async with websockets.connect(self._socket_url(), ping_interval=20, ping_timeout=self.timeout) as websocket:
            await websocket.send(json.dumps(message))

            # Wait for single response
            raw_message = await websocket.recv()
            response = json.loads(raw_message)

            if response.get("id") != request_id:
                raise ProtocolException("Response ID mismatch")

            if "error" in response:
                raise ApplicationException(response["error"])

            if "response" not in response:
                raise ProtocolException("Missing response in message")

            return response["response"]

    async def _send_request_streaming(self, service: str, flow: Optional[str], request: Dict[str, Any]):
        """Async WebSocket request implementation (streaming).

        Yields one parsed chunk per reply carrying this request's id, and
        stops after a chunk flagged ``end_of_stream`` / ``end_of_dialog`` or
        a reply flagged ``complete``.  Replies with other ids are ignored.

        Raises:
            ApplicationException: a reply has an ``error`` member.
        """
        request_id, message = self._build_message(service, flow, request)

        async with websockets.connect(self._socket_url(), ping_interval=20, ping_timeout=self.timeout) as websocket:
            await websocket.send(json.dumps(message))

            # Yield chunks as they arrive
            async for raw_message in websocket:
                response = json.loads(raw_message)

                if response.get("id") != request_id:
                    continue  # Ignore messages for other requests

                if "error" in response:
                    raise ApplicationException(response["error"])

                if "response" in response:
                    resp = response["response"]

                    yield self._parse_chunk(resp)

                    # Check if this is the final chunk
                    if resp.get("end_of_stream") or resp.get("end_of_dialog") or response.get("complete"):
                        break

    def _parse_chunk(self, resp: Dict[str, Any]):
        """Parse response chunk into appropriate type.

        ``chunk_type`` selects ``AgentThought``, ``AgentObservation`` or
        ``AgentAnswer``; any other value produces a ``RAGChunk`` built from
        the ``chunk`` member.
        """
        chunk_type = resp.get("chunk_type")

        if chunk_type == "thought":
            return AgentThought(
                content=resp.get("content", ""),
                end_of_message=resp.get("end_of_message", False)
            )
        if chunk_type == "observation":
            return AgentObservation(
                content=resp.get("content", ""),
                end_of_message=resp.get("end_of_message", False)
            )
        if chunk_type == "final-answer":
            return AgentAnswer(
                content=resp.get("content", ""),
                end_of_message=resp.get("end_of_message", False),
                end_of_dialog=resp.get("end_of_dialog", False)
            )
        # RAG-style chunk (or generic chunk)
        return RAGChunk(
            content=resp.get("chunk", ""),
            end_of_stream=resp.get("end_of_stream", False),
            error=resp.get("error")
        )

    async def aclose(self):
        """Close WebSocket connection (no-op: each request closes its own socket)."""
        pass
class AsyncSocketFlowInstance:
    """Flow-scoped front end to an ``AsyncSocketClient``.

    Each method builds a request payload and hands it to the client's
    ``_send_request`` (single reply) or ``_send_request_streaming`` (async
    iterator of chunks) together with this instance's flow id.  Extra keyword
    arguments are merged into the payload last, so they can override the
    fields built from the named parameters.
    """

    def __init__(self, client: AsyncSocketClient, flow_id: str):
        self.flow_id = flow_id
        self.client = client

    async def agent(self, question: str, user: str, state: Optional[Dict[str, Any]] = None,
                    group: Optional[str] = None, history: Optional[list] = None,
                    streaming: bool = False, **kwargs) -> Union[Dict[str, Any], AsyncIterator]:
        """Run the agent service.

        Returns the reply when ``streaming`` is false, otherwise an async
        iterator of parsed chunks.  ``state``, ``group`` and ``history`` are
        only sent when not ``None``.
        """
        payload = {"question": question, "user": user, "streaming": streaming}
        for key, value in (("state", state), ("group", group), ("history", history)):
            if value is not None:
                payload[key] = value
        payload.update(kwargs)

        if not streaming:
            return await self.client._send_request("agent", self.flow_id, payload)
        return self.client._send_request_streaming("agent", self.flow_id, payload)

    async def text_completion(self, system: str, prompt: str, streaming: bool = False, **kwargs):
        """Run a text completion; returns text, or an async iterator of text when streaming."""
        payload = {"system": system, "prompt": prompt, "streaming": streaming, **kwargs}

        if streaming:
            return self._text_completion_streaming(payload)
        reply = await self.client._send_request("text-completion", self.flow_id, payload)
        return reply.get("response", "")

    async def _text_completion_streaming(self, request):
        """Yield the ``content`` of each streamed text-completion chunk that has one."""
        stream = self.client._send_request_streaming("text-completion", self.flow_id, request)
        async for piece in stream:
            if not hasattr(piece, 'content'):
                continue
            yield piece.content

    async def graph_rag(self, question: str, user: str, collection: str,
                        max_subgraph_size: int = 1000, max_subgraph_count: int = 5,
                        max_entity_distance: int = 3, streaming: bool = False, **kwargs):
        """Run a Graph RAG query; returns text, or an async iterator of text when streaming."""
        payload = {
            "question": question,
            "user": user,
            "collection": collection,
            "max-subgraph-size": max_subgraph_size,
            "max-subgraph-count": max_subgraph_count,
            "max-entity-distance": max_entity_distance,
            "streaming": streaming,
            **kwargs,
        }

        if streaming:
            return self._graph_rag_streaming(payload)
        reply = await self.client._send_request("graph-rag", self.flow_id, payload)
        return reply.get("response", "")

    async def _graph_rag_streaming(self, request):
        """Yield the ``content`` of each streamed graph-rag chunk that has one."""
        stream = self.client._send_request_streaming("graph-rag", self.flow_id, request)
        async for piece in stream:
            if not hasattr(piece, 'content'):
                continue
            yield piece.content

    async def document_rag(self, question: str, user: str, collection: str,
                           doc_limit: int = 10, streaming: bool = False, **kwargs):
        """Run a Document RAG query; returns text, or an async iterator of text when streaming."""
        payload = {
            "question": question,
            "user": user,
            "collection": collection,
            "doc-limit": doc_limit,
            "streaming": streaming,
            **kwargs,
        }

        if streaming:
            return self._document_rag_streaming(payload)
        reply = await self.client._send_request("document-rag", self.flow_id, payload)
        return reply.get("response", "")

    async def _document_rag_streaming(self, request):
        """Yield the ``content`` of each streamed document-rag chunk that has one."""
        stream = self.client._send_request_streaming("document-rag", self.flow_id, request)
        async for piece in stream:
            if not hasattr(piece, 'content'):
                continue
            yield piece.content

    async def prompt(self, id: str, variables: Dict[str, str], streaming: bool = False, **kwargs):
        """Run a stored prompt; returns text, or an async iterator of text when streaming."""
        payload = {"id": id, "variables": variables, "streaming": streaming, **kwargs}

        if streaming:
            return self._prompt_streaming(payload)
        reply = await self.client._send_request("prompt", self.flow_id, payload)
        return reply.get("response", "")

    async def _prompt_streaming(self, request):
        """Yield the ``content`` of each streamed prompt chunk that has one."""
        stream = self.client._send_request_streaming("prompt", self.flow_id, request)
        async for piece in stream:
            if not hasattr(piece, 'content'):
                continue
            yield piece.content

    async def graph_embeddings_query(self, text: str, user: str, collection: str, limit: int = 10, **kwargs):
        """Query the graph-embeddings service and return the raw reply."""
        payload = {
            "text": text,
            "user": user,
            "collection": collection,
            "limit": limit,
            **kwargs,
        }
        return await self.client._send_request("graph-embeddings", self.flow_id, payload)

    async def embeddings(self, text: str, **kwargs):
        """Request embeddings for ``text`` and return the raw reply."""
        payload = {"text": text, **kwargs}
        return await self.client._send_request("embeddings", self.flow_id, payload)

    async def triples_query(self, s=None, p=None, o=None, user=None, collection=None, limit=100, **kwargs):
        """Query triples by pattern and return the raw reply.

        ``s``, ``p`` and ``o`` are sent as ``str(...)`` when given; ``user``
        and ``collection`` are sent unchanged when given.
        """
        payload = {"limit": limit}
        for key, term in (("s", s), ("p", p), ("o", o)):
            if term is not None:
                payload[key] = str(term)
        for key, value in (("user", user), ("collection", collection)):
            if value is not None:
                payload[key] = value
        payload.update(kwargs)

        return await self.client._send_request("triples", self.flow_id, payload)

    async def objects_query(self, query: str, user: str, collection: str, variables: Optional[Dict] = None,
                            operation_name: Optional[str] = None, **kwargs):
        """Send a GraphQL query to the objects service and return the raw reply.

        ``variables`` and ``operation_name`` are only sent when truthy.
        """
        payload = {"query": query, "user": user, "collection": collection}
        if variables:
            payload["variables"] = variables
        if operation_name:
            payload["operationName"] = operation_name
        payload.update(kwargs)

        return await self.client._send_request("objects", self.flow_id, payload)

    async def mcp_tool(self, name: str, parameters: Dict[str, Any], **kwargs):
        """Invoke the mcp-tool service with a tool name and its parameters."""
        payload = {"name": name, "parameters": parameters, **kwargs}
        return await self.client._send_request("mcp-tool", self.flow_id, payload)

View file

@ -0,0 +1,270 @@
import json
import asyncio
import websockets
from typing import Optional, Iterator, Dict, Any, Coroutine
from . types import Triple
from . exceptions import ProtocolException
class BulkClient:
"""Synchronous bulk operations client"""
def __init__(self, url: str, timeout: int, token: Optional[str]) -> None:
    """Store connection settings; ``url`` is first converted to a WebSocket URL."""
    websocket_base = self._convert_to_ws_url(url)
    self.url = websocket_base
    self.timeout = timeout
    self.token = token
def _convert_to_ws_url(self, url: str) -> str:
    """Map an HTTP(S) URL onto its WS(S) equivalent.

    ``ws://`` and ``wss://`` URLs pass through unchanged; anything else is
    treated as a bare address and prefixed with ``ws://``.
    """
    scheme_map = (("http://", "ws://"), ("https://", "wss://"))
    for http_scheme, ws_scheme in scheme_map:
        if url.startswith(http_scheme):
            return ws_scheme + url[len(http_scheme):]
    if url.startswith(("ws://", "wss://")):
        return url
    return f"ws://{url}"
def _run_async(self, coro: Coroutine[Any, Any, Any]) -> Any:
    """Run ``coro`` to completion from synchronous code and return its result.

    With no event loop running in the calling thread the coroutine is driven
    by ``asyncio.run``, which creates a fresh loop and closes it afterwards.
    When a loop is already running in this thread (for example when called
    from inside a coroutine) a nested ``run_until_complete`` is not allowed,
    so the coroutine is run on a short-lived worker thread instead and this
    call blocks until it finishes.

    Exceptions raised by the coroutine propagate to the caller.
    """
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop is running here, so it is safe to drive one in this thread.
        return asyncio.run(coro)

    # Local import: only needed on this less common path.
    from concurrent.futures import ThreadPoolExecutor

    with ThreadPoolExecutor(max_workers=1) as worker:
        return worker.submit(asyncio.run, coro).result()
def import_triples(self, flow: str, triples: Iterator[Triple], **kwargs: Any) -> None:
    """Send every triple in ``triples`` to the flow's triple import endpoint.

    Blocks until the asynchronous implementation has finished.  ``kwargs``
    is accepted but not used.
    """
    pending = self._import_triples_async(flow, triples)
    self._run_async(pending)
async def _import_triples_async(self, flow: str, triples: Iterator[Triple]) -> None:
    """Async implementation of triple import.

    Opens one WebSocket to the flow's ``import/triples`` endpoint and sends
    each triple as a JSON object with keys ``s``, ``p`` and ``o``.
    """
    # Local import keeps this method self-contained.
    from urllib.parse import quote

    ws_url = f"{self.url}/api/v1/flow/{flow}/import/triples"
    if self.token:
        # Percent-encode the token so characters such as "&", "#" or spaces
        # cannot corrupt the query string.
        encoded_token = quote(str(self.token), safe="")
        ws_url = f"{ws_url}?token={encoded_token}"

    async with websockets.connect(ws_url, ping_interval=20, ping_timeout=self.timeout) as websocket:
        for triple in triples:
            message = {
                "s": triple.s,
                "p": triple.p,
                "o": triple.o
            }
            await websocket.send(json.dumps(message))
def export_triples(self, flow: str, **kwargs: Any) -> Iterator["Triple"]:
    """Bulk export triples via WebSocket, as a synchronous generator.

    Drives the async generator from ``_export_triples_async`` one item at a
    time on a private event loop.  The loop is created here and closed when
    the generator finishes or is closed early, so the calling thread's
    current event loop is neither reused nor replaced.  ``kwargs`` is
    accepted but not used.

    NOTE(review): ``run_until_complete`` cannot be nested, so this should
    not be iterated from a thread that is already running an event loop.
    """
    async_gen = self._export_triples_async(flow)
    loop = asyncio.new_event_loop()

    try:
        while True:
            try:
                triple = loop.run_until_complete(async_gen.__anext__())
            except StopAsyncIteration:
                break
            yield triple
    finally:
        try:
            # Let the async generator run its cleanup (closing the socket).
            loop.run_until_complete(async_gen.aclose())
            loop.run_until_complete(loop.shutdown_asyncgens())
        except Exception:
            # Best-effort cleanup: a failure here must not mask the outcome
            # of the iteration itself.
            pass
        finally:
            loop.close()
async def _export_triples_async(self, flow: str) -> Iterator[Triple]:
    """Async implementation of triple export.

    Opens one WebSocket to the flow's ``export/triples`` endpoint and yields
    a ``Triple`` per received message; missing ``s``/``p``/``o`` members
    default to the empty string.

    NOTE(review): annotated as ``Iterator`` but this is an async generator.
    """
    # Local import keeps this method self-contained.
    from urllib.parse import quote

    ws_url = f"{self.url}/api/v1/flow/{flow}/export/triples"
    if self.token:
        # Percent-encode the token so characters such as "&", "#" or spaces
        # cannot corrupt the query string.
        encoded_token = quote(str(self.token), safe="")
        ws_url = f"{ws_url}?token={encoded_token}"

    async with websockets.connect(ws_url, ping_interval=20, ping_timeout=self.timeout) as websocket:
        async for raw_message in websocket:
            data = json.loads(raw_message)
            yield Triple(
                s=data.get("s", ""),
                p=data.get("p", ""),
                o=data.get("o", "")
            )
def import_graph_embeddings(self, flow: str, embeddings: Iterator[Dict[str, Any]], **kwargs: Any) -> None:
    """Send every item in ``embeddings`` to the flow's graph-embeddings import endpoint.

    Blocks until the asynchronous implementation has finished.  ``kwargs``
    is accepted but not used.
    """
    pending = self._import_graph_embeddings_async(flow, embeddings)
    self._run_async(pending)
async def _import_graph_embeddings_async(self, flow: str, embeddings: Iterator[Dict[str, Any]]) -> None:
    """Async implementation of graph embeddings import.

    Opens one WebSocket to the flow's ``import/graph-embeddings`` endpoint
    and sends each item as one JSON message.
    """
    # Local import keeps this method self-contained.
    from urllib.parse import quote

    ws_url = f"{self.url}/api/v1/flow/{flow}/import/graph-embeddings"
    if self.token:
        # Percent-encode the token so characters such as "&", "#" or spaces
        # cannot corrupt the query string.
        encoded_token = quote(str(self.token), safe="")
        ws_url = f"{ws_url}?token={encoded_token}"

    async with websockets.connect(ws_url, ping_interval=20, ping_timeout=self.timeout) as websocket:
        for embedding in embeddings:
            await websocket.send(json.dumps(embedding))
def export_graph_embeddings(self, flow: str, **kwargs: Any) -> Iterator[Dict[str, Any]]:
    """Bulk export graph embeddings via WebSocket, as a synchronous generator.

    Drives the async generator from ``_export_graph_embeddings_async`` one
    item at a time on a private event loop.  The loop is created here and
    closed when the generator finishes or is closed early, so the calling
    thread's current event loop is neither reused nor replaced.  ``kwargs``
    is accepted but not used.

    NOTE(review): ``run_until_complete`` cannot be nested, so this should
    not be iterated from a thread that is already running an event loop.
    """
    async_gen = self._export_graph_embeddings_async(flow)
    loop = asyncio.new_event_loop()

    try:
        while True:
            try:
                embedding = loop.run_until_complete(async_gen.__anext__())
            except StopAsyncIteration:
                break
            yield embedding
    finally:
        try:
            # Let the async generator run its cleanup (closing the socket).
            loop.run_until_complete(async_gen.aclose())
            loop.run_until_complete(loop.shutdown_asyncgens())
        except Exception:
            # Best-effort cleanup: a failure here must not mask the outcome
            # of the iteration itself.
            pass
        finally:
            loop.close()
async def _export_graph_embeddings_async(self, flow: str) -> Iterator[Dict[str, Any]]:
    """Stream graph-embedding records from the export WebSocket.

    Connects to ``<url>/api/v1/flow/<flow>/export/graph-embeddings``
    (adding ``?token=<token>`` when a token is configured).

    Args:
        flow: Flow identifier interpolated into the endpoint path.

    Yields:
        The JSON-decoded value of every message received.
    """
    endpoint = f"{self.url}/api/v1/flow/{flow}/export/graph-embeddings"
    target = f"{endpoint}?token={self.token}" if self.token else endpoint

    connection = websockets.connect(
        target, ping_interval=20, ping_timeout=self.timeout
    )
    async with connection as socket:
        async for frame in socket:
            decoded = json.loads(frame)
            yield decoded
def import_document_embeddings(self, flow: str, embeddings: Iterator[Dict[str, Any]], **kwargs: Any) -> None:
    """Bulk import document embeddings via WebSocket.

    Builds the async import coroutine and hands it to ``self._run_async``.

    Args:
        flow: Flow identifier forwarded to the async implementation.
        embeddings: Iterable of embedding records forwarded unchanged.
        **kwargs: Accepted but not used by this method.
    """
    upload = self._import_document_embeddings_async(flow, embeddings)
    self._run_async(upload)
async def _import_document_embeddings_async(self, flow: str, embeddings: Iterator[Dict[str, Any]]) -> None:
    """Send every document-embedding record over the import WebSocket.

    Connects to ``<url>/api/v1/flow/<flow>/import/document-embeddings``
    (adding ``?token=<token>`` when a token is configured) and sends each
    record as one JSON text message, in iteration order.

    Args:
        flow: Flow identifier interpolated into the endpoint path.
        embeddings: Iterable of JSON-serialisable records.
    """
    endpoint = f"{self.url}/api/v1/flow/{flow}/import/document-embeddings"
    target = f"{endpoint}?token={self.token}" if self.token else endpoint

    connection = websockets.connect(
        target, ping_interval=20, ping_timeout=self.timeout
    )
    async with connection as socket:
        for record in embeddings:
            payload = json.dumps(record)
            await socket.send(payload)
def export_document_embeddings(self, flow: str, **kwargs: Any) -> Iterator[Dict[str, Any]]:
    """Bulk export document embeddings via WebSocket.

    Synchronous generator that drives the async generator returned by
    ``_export_document_embeddings_async`` one item at a time with
    ``loop.run_until_complete``.

    Args:
        flow: Flow identifier forwarded to the async implementation.
        **kwargs: Accepted but not used by this method.

    Yields:
        Each item produced by the async export generator.
    """
    async_gen = self._export_document_embeddings_async(flow)

    # Reuse this thread's event loop when it is idle; otherwise (loop already
    # running, or none available) install a fresh loop for this thread.
    try:
        loop = asyncio.get_event_loop()
        if loop.is_running():
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
    except RuntimeError:
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)

    try:
        while True:
            # Only the pull from the async generator can signal exhaustion.
            try:
                embedding = loop.run_until_complete(async_gen.__anext__())
            except StopAsyncIteration:
                break
            yield embedding
    finally:
        # Best-effort cleanup: close the async generator so its WebSocket
        # context manager exits. Ordinary errors are ignored on purpose, but
        # KeyboardInterrupt/SystemExit are no longer swallowed.
        try:
            loop.run_until_complete(async_gen.aclose())
        except Exception:
            pass
async def _export_document_embeddings_async(self, flow: str) -> Iterator[Dict[str, Any]]:
    """Stream document-embedding records from the export WebSocket.

    Connects to ``<url>/api/v1/flow/<flow>/export/document-embeddings``
    (adding ``?token=<token>`` when a token is configured).

    Args:
        flow: Flow identifier interpolated into the endpoint path.

    Yields:
        The JSON-decoded value of every message received.
    """
    endpoint = f"{self.url}/api/v1/flow/{flow}/export/document-embeddings"
    target = f"{endpoint}?token={self.token}" if self.token else endpoint

    connection = websockets.connect(
        target, ping_interval=20, ping_timeout=self.timeout
    )
    async with connection as socket:
        async for frame in socket:
            decoded = json.loads(frame)
            yield decoded
def import_entity_contexts(self, flow: str, contexts: Iterator[Dict[str, Any]], **kwargs: Any) -> None:
    """Bulk import entity contexts via WebSocket.

    Builds the async import coroutine and hands it to ``self._run_async``.

    Args:
        flow: Flow identifier forwarded to the async implementation.
        contexts: Iterable of entity-context records forwarded unchanged.
        **kwargs: Accepted but not used by this method.
    """
    upload = self._import_entity_contexts_async(flow, contexts)
    self._run_async(upload)
async def _import_entity_contexts_async(self, flow: str, contexts: Iterator[Dict[str, Any]]) -> None:
    """Send every entity-context record over the import WebSocket.

    Connects to ``<url>/api/v1/flow/<flow>/import/entity-contexts``
    (adding ``?token=<token>`` when a token is configured) and sends each
    record as one JSON text message, in iteration order.

    Args:
        flow: Flow identifier interpolated into the endpoint path.
        contexts: Iterable of JSON-serialisable records.
    """
    endpoint = f"{self.url}/api/v1/flow/{flow}/import/entity-contexts"
    target = f"{endpoint}?token={self.token}" if self.token else endpoint

    connection = websockets.connect(
        target, ping_interval=20, ping_timeout=self.timeout
    )
    async with connection as socket:
        for record in contexts:
            payload = json.dumps(record)
            await socket.send(payload)
def export_entity_contexts(self, flow: str, **kwargs: Any) -> Iterator[Dict[str, Any]]:
    """Bulk export entity contexts via WebSocket.

    Synchronous generator that drives the async generator returned by
    ``_export_entity_contexts_async`` one item at a time with
    ``loop.run_until_complete``.

    Args:
        flow: Flow identifier forwarded to the async implementation.
        **kwargs: Accepted but not used by this method.

    Yields:
        Each item produced by the async export generator.
    """
    async_gen = self._export_entity_contexts_async(flow)

    # Reuse this thread's event loop when it is idle; otherwise (loop already
    # running, or none available) install a fresh loop for this thread.
    try:
        loop = asyncio.get_event_loop()
        if loop.is_running():
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
    except RuntimeError:
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)

    try:
        while True:
            # Only the pull from the async generator can signal exhaustion.
            try:
                context = loop.run_until_complete(async_gen.__anext__())
            except StopAsyncIteration:
                break
            yield context
    finally:
        # Best-effort cleanup: close the async generator so its WebSocket
        # context manager exits. Ordinary errors are ignored on purpose, but
        # KeyboardInterrupt/SystemExit are no longer swallowed.
        try:
            loop.run_until_complete(async_gen.aclose())
        except Exception:
            pass
async def _export_entity_contexts_async(self, flow: str) -> Iterator[Dict[str, Any]]:
    """Stream entity-context records from the export WebSocket.

    Connects to ``<url>/api/v1/flow/<flow>/export/entity-contexts``
    (adding ``?token=<token>`` when a token is configured).

    Args:
        flow: Flow identifier interpolated into the endpoint path.

    Yields:
        The JSON-decoded value of every message received.
    """
    endpoint = f"{self.url}/api/v1/flow/{flow}/export/entity-contexts"
    target = f"{endpoint}?token={self.token}" if self.token else endpoint

    connection = websockets.connect(
        target, ping_interval=20, ping_timeout=self.timeout
    )
    async with connection as socket:
        async for frame in socket:
            decoded = json.loads(frame)
            yield decoded
def import_objects(self, flow: str, objects: Iterator[Dict[str, Any]], **kwargs: Any) -> None:
    """Bulk import objects via WebSocket.

    Builds the async import coroutine and hands it to ``self._run_async``.

    Args:
        flow: Flow identifier forwarded to the async implementation.
        objects: Iterable of object records forwarded unchanged.
        **kwargs: Accepted but not used by this method.
    """
    upload = self._import_objects_async(flow, objects)
    self._run_async(upload)
async def _import_objects_async(self, flow: str, objects: Iterator[Dict[str, Any]]) -> None:
    """Send every object record over the import WebSocket.

    Connects to ``<url>/api/v1/flow/<flow>/import/objects`` (adding
    ``?token=<token>`` when a token is configured) and sends each record
    as one JSON text message, in iteration order.

    Args:
        flow: Flow identifier interpolated into the endpoint path.
        objects: Iterable of JSON-serialisable records.
    """
    endpoint = f"{self.url}/api/v1/flow/{flow}/import/objects"
    target = f"{endpoint}?token={self.token}" if self.token else endpoint

    connection = websockets.connect(
        target, ping_interval=20, ping_timeout=self.timeout
    )
    async with connection as socket:
        for record in objects:
            payload = json.dumps(record)
            await socket.send(payload)
def close(self) -> None:
    """Close connections.

    Intentionally a no-op: this method performs no work itself; per the
    original note, cleanup is handled by context managers.
    """
    return None

View file

@ -211,6 +211,21 @@ class FlowInstance:
input
)["vectors"]
def graph_embeddings_query(self, text, user, collection, limit=10):
    """Query graph embeddings for semantic search.

    Sends ``text``, ``user``, ``collection`` and ``limit`` to the
    ``service/graph-embeddings`` endpoint through ``self.request`` and
    returns whatever that call returns.
    """
    payload = dict(text=text, user=user, collection=collection, limit=limit)
    return self.request("service/graph-embeddings", payload)
def prompt(self, id, variables):
input = {

View file

@ -0,0 +1,27 @@
import requests
from typing import Optional, Dict
class Metrics:
    """Synchronous metrics client."""

    def __init__(self, url: str, timeout: int, token: Optional[str]) -> None:
        """Remember the base URL, request timeout and optional bearer token."""
        self.url: str = url
        self.timeout: int = timeout
        self.token: Optional[str] = token

    def get(self) -> str:
        """Get Prometheus metrics as text.

        Issues a GET to ``<url>/api/metrics``; an ``Authorization: Bearer``
        header is attached only when a token is set.

        Returns:
            The response body text.

        Raises:
            Exception: If the response status code is anything other than 200.
        """
        endpoint: str = f"{self.url}/api/metrics"
        auth_headers: Dict[str, str] = (
            {"Authorization": f"Bearer {self.token}"} if self.token else {}
        )

        reply = requests.get(endpoint, timeout=self.timeout, headers=auth_headers)

        if reply.status_code == 200:
            return reply.text
        raise Exception(f"Status code {reply.status_code}")

View file

@ -0,0 +1,445 @@
import json
import asyncio
import websockets
from typing import Optional, Dict, Any, Iterator, Union, List
from threading import Lock
from . types import AgentThought, AgentObservation, AgentAnswer, RAGChunk, StreamingChunk
from . exceptions import ProtocolException, ApplicationException
class SocketClient:
    """Synchronous WebSocket client (wraps async websockets library).

    Every request opens its own connection to ``<url>/api/v1/socket``, sends
    one JSON message and reads either a single reply or a stream of replies.
    Blocking callers are bridged to the async code with
    ``loop.run_until_complete``.
    """

    def __init__(self, url: str, timeout: int, token: Optional[str]) -> None:
        """Store connection settings.

        Args:
            url: Base URL; converted to a ``ws://`` / ``wss://`` URL.
            timeout: Passed to ``websockets.connect`` as ``ping_timeout``.
            token: Optional token appended to the socket URL as ``?token=``.
        """
        self.url: str = self._convert_to_ws_url(url)
        self.timeout: int = timeout
        self.token: Optional[str] = token
        # _connection and _loop are initialised here but not used by any
        # method of this class; they are kept so the attribute set is stable.
        self._connection: Optional[Any] = None
        self._request_counter: int = 0
        self._lock: Lock = Lock()  # guards _request_counter
        self._loop: Optional[asyncio.AbstractEventLoop] = None

    def _convert_to_ws_url(self, url: str) -> str:
        """Convert HTTP URL to WebSocket URL.

        ``http://`` becomes ``ws://``, ``https://`` becomes ``wss://``,
        existing ``ws://`` / ``wss://`` URLs pass through, and anything else
        is prefixed with ``ws://``.
        """
        if url.startswith("http://"):
            return url.replace("http://", "ws://", 1)
        if url.startswith("https://"):
            return url.replace("https://", "wss://", 1)
        if url.startswith(("ws://", "wss://")):
            return url
        # No recognised scheme: assume ws://
        return f"ws://{url}"

    def flow(self, flow_id: str) -> "SocketFlowInstance":
        """Get flow instance for WebSocket operations."""
        return SocketFlowInstance(self, flow_id)

    def _send_request_sync(
        self,
        service: str,
        flow: Optional[str],
        request: Dict[str, Any],
        streaming: bool = False
    ) -> "Union[Dict[str, Any], Iterator[StreamingChunk]]":
        """Synchronous wrapper around async WebSocket communication.

        Args:
            service: Service name placed in the request envelope.
            flow: Optional flow id; added to the envelope when truthy.
            request: Request payload.
            streaming: When true, return a generator of parsed chunks instead
                of a single response payload.

        Returns:
            The ``response`` payload of the single reply, or (streaming) a
            generator yielding parsed chunks.
        """
        # Reuse this thread's event loop when it is idle; otherwise (loop
        # already running, or none available) install a fresh loop.
        # NOTE(review): when another loop is running in this thread,
        # run_until_complete on the new loop may still be rejected by asyncio;
        # confirm the Jupyter use case actually works.
        try:
            loop = asyncio.get_event_loop()
            if loop.is_running():
                loop = asyncio.new_event_loop()
                asyncio.set_event_loop(loop)
        except RuntimeError:
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)

        if streaming:
            return self._streaming_generator(service, flow, request, loop)
        return loop.run_until_complete(
            self._send_request_async(service, flow, request)
        )

    def _streaming_generator(
        self,
        service: str,
        flow: Optional[str],
        request: Dict[str, Any],
        loop: asyncio.AbstractEventLoop
    ) -> "Iterator[StreamingChunk]":
        """Generator that yields streaming chunks.

        Pulls items one at a time from the async streaming generator by
        running each ``__anext__`` call to completion on ``loop``.
        """
        async_gen = self._send_request_async_streaming(service, flow, request)

        try:
            while True:
                # Only the pull itself can signal exhaustion.
                try:
                    chunk = loop.run_until_complete(async_gen.__anext__())
                except StopAsyncIteration:
                    break
                yield chunk
        finally:
            # Best-effort cleanup of the async generator (closes the socket's
            # context manager). Ordinary errors are ignored on purpose, but
            # KeyboardInterrupt/SystemExit are not swallowed.
            try:
                loop.run_until_complete(async_gen.aclose())
            except Exception:
                pass

    def _prepare_request(
        self,
        service: str,
        flow: Optional[str],
        request: Dict[str, Any]
    ) -> tuple:
        """Build the per-request id, socket URL and JSON envelope.

        Returns:
            ``(request_id, ws_url, message)`` where ``request_id`` is
            ``req-<n>`` from a lock-protected counter, ``ws_url`` is
            ``<url>/api/v1/socket`` plus ``?token=<token>`` when a token is
            set, and ``message`` holds ``id``, ``service``, ``request`` and,
            when ``flow`` is truthy, ``flow``.
        """
        with self._lock:
            self._request_counter += 1
            request_id = f"req-{self._request_counter}"

        ws_url = f"{self.url}/api/v1/socket"
        if self.token:
            ws_url = f"{ws_url}?token={self.token}"

        message = {
            "id": request_id,
            "service": service,
            "request": request
        }
        if flow:
            message["flow"] = flow

        return request_id, ws_url, message

    async def _send_request_async(
        self,
        service: str,
        flow: Optional[str],
        request: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Async implementation of WebSocket request (non-streaming).

        Raises:
            ProtocolException: If the reply's id differs from the request id
                or the reply has no ``response`` field.
            ApplicationException: If the reply contains an ``error`` field.
        """
        request_id, ws_url, message = self._prepare_request(service, flow, request)

        async with websockets.connect(ws_url, ping_interval=20, ping_timeout=self.timeout) as websocket:
            await websocket.send(json.dumps(message))

            # Exactly one reply is read for a non-streaming request.
            raw_message = await websocket.recv()
            response = json.loads(raw_message)

            if response.get("id") != request_id:
                raise ProtocolException("Response ID mismatch")
            if "error" in response:
                raise ApplicationException(response["error"])
            if "response" not in response:
                raise ProtocolException("Missing response in message")

            return response["response"]

    async def _send_request_async_streaming(
        self,
        service: str,
        flow: Optional[str],
        request: Dict[str, Any]
    ) -> "Iterator[StreamingChunk]":
        """Async implementation of WebSocket request (streaming).

        Async generator: yields one parsed chunk per reply carrying a
        ``response`` field, and stops after a reply flagged
        ``end_of_stream`` / ``end_of_dialog`` (in the payload) or
        ``complete`` (on the envelope).

        Raises:
            ApplicationException: If a matching reply contains ``error``.
        """
        request_id, ws_url, message = self._prepare_request(service, flow, request)

        async with websockets.connect(ws_url, ping_interval=20, ping_timeout=self.timeout) as websocket:
            await websocket.send(json.dumps(message))

            async for raw_message in websocket:
                response = json.loads(raw_message)

                if response.get("id") != request_id:
                    continue  # Ignore messages for other requests
                if "error" in response:
                    raise ApplicationException(response["error"])
                if "response" not in response:
                    continue

                resp = response["response"]
                yield self._parse_chunk(resp)

                # Stop once the final chunk has been delivered.
                if resp.get("end_of_stream") or resp.get("end_of_dialog") or response.get("complete"):
                    break

    def _parse_chunk(self, resp: Dict[str, Any]) -> "StreamingChunk":
        """Parse response chunk into appropriate type.

        ``chunk_type`` selects ``AgentThought`` (``thought``),
        ``AgentObservation`` (``observation``) or ``AgentAnswer``
        (``final-answer``); any other value yields a ``RAGChunk`` whose
        content is read from the ``chunk`` key.
        """
        chunk_type = resp.get("chunk_type")

        if chunk_type == "thought":
            return AgentThought(
                content=resp.get("content", ""),
                end_of_message=resp.get("end_of_message", False)
            )
        if chunk_type == "observation":
            return AgentObservation(
                content=resp.get("content", ""),
                end_of_message=resp.get("end_of_message", False)
            )
        if chunk_type == "final-answer":
            return AgentAnswer(
                content=resp.get("content", ""),
                end_of_message=resp.get("end_of_message", False),
                end_of_dialog=resp.get("end_of_dialog", False)
            )
        # RAG-style chunk (or generic chunk)
        return RAGChunk(
            content=resp.get("chunk", ""),
            end_of_stream=resp.get("end_of_stream", False),
            error=resp.get("error")
        )

    def close(self) -> None:
        """Close WebSocket connection.

        No-op: each request's connection is closed by the ``async with``
        block that opened it.
        """
        # Cleanup handled by context manager in async code
        pass
class SocketFlowInstance:
    """Synchronous WebSocket flow instance with same interface as REST FlowInstance.

    Each method builds a request dict, merges any extra keyword arguments
    into it (extras override the named fields on key collision) and sends it
    through ``client._send_request_sync`` for this instance's flow.
    """

    def __init__(self, client: "SocketClient", flow_id: str) -> None:
        """Bind this instance to a socket client and a flow id."""
        self.client: SocketClient = client
        self.flow_id: str = flow_id

    def _text_request(
        self,
        service: str,
        request: Dict[str, Any],
        streaming: bool
    ) -> Union[str, Iterator[str]]:
        """Send a text-producing request and shape its result.

        This is a plain function, not a generator, so the non-streaming path
        returns the response string itself. Keeping the ``yield`` in
        ``_content_stream`` is what makes that possible.

        Returns:
            Non-streaming: the ``response`` field of the reply (empty string
            when absent). Streaming: an iterator over chunk contents.
        """
        result = self.client._send_request_sync(service, self.flow_id, request, streaming)
        if streaming:
            return self._content_stream(result)
        return result.get("response", "")

    @staticmethod
    def _content_stream(chunks: Any) -> Iterator[str]:
        """Yield ``chunk.content`` for every chunk that has that attribute."""
        for chunk in chunks:
            if hasattr(chunk, "content"):
                yield chunk.content

    def agent(
        self,
        question: str,
        user: str,
        state: Optional[Dict[str, Any]] = None,
        group: Optional[str] = None,
        history: Optional[List[Dict[str, Any]]] = None,
        streaming: bool = False,
        **kwargs: Any
    ) -> "Union[Dict[str, Any], Iterator[StreamingChunk]]":
        """Agent with optional streaming.

        ``state``, ``group`` and ``history`` are included only when not None.

        Returns:
            Whatever ``_send_request_sync`` returns for the ``agent`` service:
            a response dict, or an iterator of chunks when ``streaming``.
        """
        request = {
            "question": question,
            "user": user,
            "streaming": streaming
        }
        if state is not None:
            request["state"] = state
        if group is not None:
            request["group"] = group
        if history is not None:
            request["history"] = history
        request.update(kwargs)

        return self.client._send_request_sync("agent", self.flow_id, request, streaming)

    def text_completion(self, system: str, prompt: str, streaming: bool = False, **kwargs) -> Union[str, Iterator[str]]:
        """Text completion with optional streaming.

        Returns:
            The response string, or an iterator of content strings when
            ``streaming`` is true.
        """
        request = {
            "system": system,
            "prompt": prompt,
            "streaming": streaming
        }
        request.update(kwargs)

        return self._text_request("text-completion", request, streaming)

    def graph_rag(
        self,
        question: str,
        user: str,
        collection: str,
        max_subgraph_size: int = 1000,
        max_subgraph_count: int = 5,
        max_entity_distance: int = 3,
        streaming: bool = False,
        **kwargs: Any
    ) -> Union[str, Iterator[str]]:
        """Graph RAG with optional streaming.

        The ``max_*`` arguments are sent under the hyphenated keys
        ``max-subgraph-size``, ``max-subgraph-count`` and
        ``max-entity-distance``.

        Returns:
            The response string, or an iterator of content strings when
            ``streaming`` is true.
        """
        request = {
            "question": question,
            "user": user,
            "collection": collection,
            "max-subgraph-size": max_subgraph_size,
            "max-subgraph-count": max_subgraph_count,
            "max-entity-distance": max_entity_distance,
            "streaming": streaming
        }
        request.update(kwargs)

        return self._text_request("graph-rag", request, streaming)

    def document_rag(
        self,
        question: str,
        user: str,
        collection: str,
        doc_limit: int = 10,
        streaming: bool = False,
        **kwargs: Any
    ) -> Union[str, Iterator[str]]:
        """Document RAG with optional streaming.

        ``doc_limit`` is sent under the key ``doc-limit``.

        Returns:
            The response string, or an iterator of content strings when
            ``streaming`` is true.
        """
        request = {
            "question": question,
            "user": user,
            "collection": collection,
            "doc-limit": doc_limit,
            "streaming": streaming
        }
        request.update(kwargs)

        return self._text_request("document-rag", request, streaming)

    def prompt(
        self,
        id: str,
        variables: Dict[str, str],
        streaming: bool = False,
        **kwargs: Any
    ) -> Union[str, Iterator[str]]:
        """Execute prompt with optional streaming.

        Returns:
            The response string, or an iterator of content strings when
            ``streaming`` is true.
        """
        request = {
            "id": id,
            "variables": variables,
            "streaming": streaming
        }
        request.update(kwargs)

        return self._text_request("prompt", request, streaming)

    def graph_embeddings_query(
        self,
        text: str,
        user: str,
        collection: str,
        limit: int = 10,
        **kwargs: Any
    ) -> Dict[str, Any]:
        """Query graph embeddings for semantic search."""
        request = {
            "text": text,
            "user": user,
            "collection": collection,
            "limit": limit
        }
        request.update(kwargs)

        return self.client._send_request_sync("graph-embeddings", self.flow_id, request, False)

    def embeddings(self, text: str, **kwargs: Any) -> Dict[str, Any]:
        """Generate text embeddings."""
        request = {"text": text}
        request.update(kwargs)

        return self.client._send_request_sync("embeddings", self.flow_id, request, False)

    def triples_query(
        self,
        s: Optional[str] = None,
        p: Optional[str] = None,
        o: Optional[str] = None,
        user: Optional[str] = None,
        collection: Optional[str] = None,
        limit: int = 100,
        **kwargs: Any
    ) -> Dict[str, Any]:
        """Triple pattern query.

        ``s``, ``p`` and ``o`` are converted with ``str()`` and, like
        ``user`` and ``collection``, included only when not None.
        """
        request = {"limit": limit}
        if s is not None:
            request["s"] = str(s)
        if p is not None:
            request["p"] = str(p)
        if o is not None:
            request["o"] = str(o)
        if user is not None:
            request["user"] = user
        if collection is not None:
            request["collection"] = collection
        request.update(kwargs)

        return self.client._send_request_sync("triples", self.flow_id, request, False)

    def objects_query(
        self,
        query: str,
        user: str,
        collection: str,
        variables: Optional[Dict[str, Any]] = None,
        operation_name: Optional[str] = None,
        **kwargs: Any
    ) -> Dict[str, Any]:
        """GraphQL query.

        ``variables`` and ``operation_name`` (sent as ``operationName``) are
        included only when truthy.
        """
        request = {
            "query": query,
            "user": user,
            "collection": collection
        }
        if variables:
            request["variables"] = variables
        if operation_name:
            request["operationName"] = operation_name
        request.update(kwargs)

        return self.client._send_request_sync("objects", self.flow_id, request, False)

    def mcp_tool(
        self,
        name: str,
        parameters: Dict[str, Any],
        **kwargs: Any
    ) -> Dict[str, Any]:
        """Execute MCP tool."""
        request = {
            "name": name,
            "parameters": parameters
        }
        request.update(kwargs)

        return self.client._send_request_sync("mcp-tool", self.flow_id, request, False)

View file

@ -1,7 +1,7 @@
import dataclasses
import datetime
from typing import List
from typing import List, Optional, Dict, Any
from .. knowledge import hash, Uri, Literal
@dataclasses.dataclass
@ -51,3 +51,33 @@ class CollectionMetadata:
tags : List[str]
created_at : str
updated_at : str
# Streaming chunk types
@dataclasses.dataclass
class StreamingChunk:
    """Base class for streaming chunks.

    Attributes:
        content: Text carried by this chunk (required).
        end_of_message: Boolean flag, False by default. Presumably marks the
            last chunk of a message; confirm against the producer.
    """

    content: str
    end_of_message: bool = False
@dataclasses.dataclass
class AgentThought(StreamingChunk):
    """Agent reasoning chunk.

    Extends ``StreamingChunk`` with a ``chunk_type`` field that defaults to
    ``"thought"``.
    """

    chunk_type: str = "thought"
@dataclasses.dataclass
class AgentObservation(StreamingChunk):
    """Agent tool observation chunk.

    Extends ``StreamingChunk`` with a ``chunk_type`` field that defaults to
    ``"observation"``.
    """

    chunk_type: str = "observation"
@dataclasses.dataclass
class AgentAnswer(StreamingChunk):
    """Agent final answer chunk.

    Extends ``StreamingChunk`` with a ``chunk_type`` field that defaults to
    ``"final-answer"`` and an ``end_of_dialog`` flag that defaults to False.
    """

    chunk_type: str = "final-answer"
    end_of_dialog: bool = False
@dataclasses.dataclass
class RAGChunk(StreamingChunk):
    """RAG streaming chunk.

    Extends ``StreamingChunk`` with an ``end_of_stream`` flag (False by
    default) and an optional ``error`` mapping (None by default).
    """

    end_of_stream: bool = False
    error: Optional[Dict[str, str]] = None