diff --git a/Makefile b/Makefile index 4d79f554..197a6c63 100644 --- a/Makefile +++ b/Makefile @@ -77,8 +77,8 @@ some-containers: -t ${CONTAINER_BASE}/trustgraph-base:${VERSION} . ${DOCKER} build -f containers/Containerfile.flow \ -t ${CONTAINER_BASE}/trustgraph-flow:${VERSION} . - ${DOCKER} build -f containers/Containerfile.unstructured \ - -t ${CONTAINER_BASE}/trustgraph-unstructured:${VERSION} . +# ${DOCKER} build -f containers/Containerfile.unstructured \ +# -t ${CONTAINER_BASE}/trustgraph-unstructured:${VERSION} . # ${DOCKER} build -f containers/Containerfile.vertexai \ # -t ${CONTAINER_BASE}/trustgraph-vertexai:${VERSION} . # ${DOCKER} build -f containers/Containerfile.mcp \ diff --git a/dev-tools/library_client.py b/dev-tools/library_client.py new file mode 100644 index 00000000..ae9d6857 --- /dev/null +++ b/dev-tools/library_client.py @@ -0,0 +1,237 @@ +#!/usr/bin/env python3 + +""" +Client utility for browsing and loading documents from the TrustGraph +public document library. + +Usage: + python library_client.py list + python library_client.py search + python library_client.py load-all + python library_client.py load-doc + python library_client.py load-match +""" + +import json +import urllib.request +import sys +import os +import argparse + +from trustgraph.api import Api +from trustgraph.api.types import Uri, Literal, Triple + +BUCKET_URL = "https://storage.googleapis.com/trustgraph-library" +INDEX_URL = f"{BUCKET_URL}/index.json" + +default_url = os.getenv("TRUSTGRAPH_URL", "http://localhost:8088/") +default_user = "trustgraph" +default_token = os.getenv("TRUSTGRAPH_TOKEN", None) + + +def fetch_index(): + with urllib.request.urlopen(INDEX_URL) as resp: + return json.loads(resp.read()) + + +def fetch_document_metadata(doc_id): + url = f"{BUCKET_URL}/{doc_id}.json" + with urllib.request.urlopen(url) as resp: + return json.loads(resp.read()) + + +def fetch_document_content(doc_id): + url = f"{BUCKET_URL}/{doc_id}.epub" + with urllib.request.urlopen(url) 
as resp: + return resp.read() + + +def search_index(index, query): + query = query.lower() + results = [] + for doc in index: + title = doc.get("title", "").lower() + comments = doc.get("comments", "").lower() + tags = [t.lower() for t in doc.get("tags", [])] + if (query in title or query in comments or + any(query in t for t in tags)): + results.append(doc) + return results + + +def print_index(index): + if not index: + return + + # Calculate column widths + id_width = max(len(str(doc.get("id", ""))) for doc in index) + title_width = max(len(doc.get("title", "")) for doc in index) + + # Cap title width for readability + title_width = min(title_width, 60) + id_width = max(id_width, 2) + + try: + term_width = os.get_terminal_size().columns + except OSError: + term_width = 120 + + tags_width = max(term_width - id_width - title_width - 6, 20) + + header = f"{'ID':<{id_width}} {'Title':<{title_width}} {'Tags':<{tags_width}}" + print(header) + print("-" * len(header)) + + for doc in index: + eid = str(doc.get("id", "")) + title = doc.get("title", "") + if len(title) > title_width: + title = title[:title_width - 3] + "..." + tags = ", ".join(doc.get("tags", [])) + if len(tags) > tags_width: + tags = tags[:tags_width - 3] + "..." 
+ print(f"{eid:<{id_width}} {title:<{title_width}} {tags}") + + +def convert_value(v): + """Convert a JSON triple value to a Uri or Literal.""" + if v["type"] == "uri": + return Uri(v["value"]) + else: + return Literal(v["value"]) + + +def convert_metadata(metadata_json): + """Convert JSON metadata triples to Triple objects.""" + triples = [] + for t in metadata_json: + triples.append(Triple( + s=convert_value(t["s"]), + p=convert_value(t["p"]), + o=convert_value(t["o"]), + )) + return triples + + +def load_document(api, user, doc_entry): + """Fetch metadata and content for a document, then load into TrustGraph.""" + doc_id = doc_entry["id"] + title = doc_entry["title"] + + print(f" [{doc_id}] {title}") + + print(f" fetching metadata...") + doc_json = fetch_document_metadata(doc_id) + doc = doc_json[0] + + print(f" fetching content...") + content = fetch_document_content(doc_id) + + print(f" loading into TrustGraph ({len(content) // 1024}KB)...") + metadata = convert_metadata(doc["metadata"]) + + api.add_document( + id=doc["id"], + metadata=metadata, + user=user, + kind=doc["kind"], + title=doc["title"], + comments=doc["comments"], + tags=doc["tags"], + document=content, + ) + + print(f" done.") + + +def load_documents(api, user, docs): + """Load a list of documents.""" + print(f"Loading {len(docs)} document(s)...\n") + for doc in docs: + try: + load_document(api, user, doc) + except Exception as e: + print(f" FAILED: {e}", file=sys.stderr) + print() + print("Complete.") + + +def main(): + parser = argparse.ArgumentParser( + description="Browse and load documents from the TrustGraph public document library.", + ) + + parser.add_argument( + "-u", "--url", default=default_url, + help=f"TrustGraph API URL (default: {default_url})", + ) + parser.add_argument( + "-U", "--user", default=default_user, + help=f"User ID (default: {default_user})", + ) + parser.add_argument( + "-t", "--token", default=default_token, + help="Authentication token (default: $TRUSTGRAPH_TOKEN)", 
+ ) + + sub = parser.add_subparsers(dest="command") + + sub.add_parser("list", help="List all documents") + + search_parser = sub.add_parser("search", help="Search documents") + search_parser.add_argument("query", help="Text to search for") + + sub.add_parser("load-all", help="Load all documents into TrustGraph") + + load_doc_parser = sub.add_parser("load-doc", help="Load a document by ID") + load_doc_parser.add_argument("id", help="Document ID (ebook number)") + + load_match_parser = sub.add_parser( + "load-match", help="Load all documents matching a search term", + ) + load_match_parser.add_argument("query", help="Text to search for") + + args = parser.parse_args() + + if args.command is None: + parser.print_help() + sys.exit(1) + + index = fetch_index() + + if args.command in ("list", "search"): + if args.command == "list": + print_index(index) + else: + results = search_index(index, args.query) + if results: + print_index(results) + else: + print("No matches found.", file=sys.stderr) + sys.exit(1) + return + + # Load commands need the API + api = Api(args.url, token=args.token).library() + + if args.command == "load-all": + load_documents(api, args.user, index) + + elif args.command == "load-doc": + matches = [d for d in index if str(d.get("id")) == args.id] + if not matches: + print(f"No document with ID '{args.id}' found.", file=sys.stderr) + sys.exit(1) + load_documents(api, args.user, matches) + + elif args.command == "load-match": + results = search_index(index, args.query) + if results: + load_documents(api, args.user, results) + else: + print("No matches found.", file=sys.stderr) + sys.exit(1) + + +if __name__ == "__main__": + main() diff --git a/dev-tools/tests/agent_dag/analyse_trace.py b/dev-tools/tests/agent_dag/analyse_trace.py new file mode 100644 index 00000000..b71cdebe --- /dev/null +++ b/dev-tools/tests/agent_dag/analyse_trace.py @@ -0,0 +1,319 @@ +#!/usr/bin/env python3 +""" +Analyse a captured agent trace JSON file and check DAG integrity. 
+ +Usage: + python analyse_trace.py react.json + python analyse_trace.py -u http://localhost:8088/ react.json +""" + +import argparse +import asyncio +import json +import os +import sys +import websockets + +DEFAULT_URL = os.getenv("TRUSTGRAPH_URL", "http://localhost:8088/") +DEFAULT_USER = "trustgraph" +DEFAULT_COLLECTION = "default" +DEFAULT_FLOW = "default" +GRAPH = "urn:graph:retrieval" + +# Namespace prefixes +PROV = "http://www.w3.org/ns/prov#" +RDF = "http://www.w3.org/1999/02/22-rdf-syntax-ns#" +RDFS = "http://www.w3.org/2000/01/rdf-schema#" +TG = "https://trustgraph.ai/ns/" + +PROV_WAS_DERIVED_FROM = PROV + "wasDerivedFrom" +RDF_TYPE = RDF + "type" + +TG_ANALYSIS = TG + "Analysis" +TG_TOOL_USE = TG + "ToolUse" +TG_OBSERVATION_TYPE = TG + "Observation" +TG_CONCLUSION = TG + "Conclusion" +TG_SYNTHESIS = TG + "Synthesis" +TG_QUESTION = TG + "Question" + + +def shorten(uri): + """Shorten a URI for display.""" + for prefix, short in [ + (PROV, "prov:"), (RDF, "rdf:"), (RDFS, "rdfs:"), (TG, "tg:"), + ]: + if isinstance(uri, str) and uri.startswith(prefix): + return short + uri[len(prefix):] + return str(uri) + + +async def fetch_triples(ws, flow, subject, user, collection, request_counter): + """Query triples for a given subject URI.""" + request_counter[0] += 1 + req_id = f"q-{request_counter[0]}" + + msg = { + "id": req_id, + "service": "triples", + "flow": flow, + "request": { + "s": {"t": "i", "i": subject}, + "g": GRAPH, + "user": user, + "collection": collection, + "limit": 100, + }, + } + + await ws.send(json.dumps(msg)) + + while True: + raw = await ws.recv() + resp = json.loads(raw) + if resp.get("id") == req_id: + inner = resp.get("response", {}) + if isinstance(inner, dict): + return inner.get("response", []) + return inner + + +def extract_term(term): + """Extract value from wire-format term.""" + if not term: + return "" + t = term.get("t", "") + if t == "i": + return term.get("i", "") + elif t == "l": + return term.get("v", "") + elif t == "t": + 
tr = term.get("tr", {}) + return { + "s": extract_term(tr.get("s", {})), + "p": extract_term(tr.get("p", {})), + "o": extract_term(tr.get("o", {})), + } + return str(term) + + +def parse_triples(wire_triples): + """Convert wire triples to (s, p, o) tuples.""" + result = [] + for t in wire_triples: + s = extract_term(t.get("s", {})) + p = extract_term(t.get("p", {})) + o = extract_term(t.get("o", {})) + result.append((s, p, o)) + return result + + +def get_types(tuples): + """Get rdf:type values from parsed triples.""" + return {o for s, p, o in tuples if p == RDF_TYPE} + + +def get_derived_from(tuples): + """Get prov:wasDerivedFrom targets from parsed triples.""" + return [o for s, p, o in tuples if p == PROV_WAS_DERIVED_FROM] + + +async def analyse(path, url, flow, user, collection): + with open(path) as f: + messages = json.load(f) + + print(f"Total messages: {len(messages)}") + print() + + # ---- Pass 1: collect explain IDs and check streaming chunks ---- + + explain_ids = [] + errors = [] + + for i, msg in enumerate(messages): + resp = msg.get("response", {}) + chunk_type = resp.get("chunk_type", "?") + + if chunk_type == "explain": + explain_id = resp.get("explain_id", "") + explain_ids.append(explain_id) + print(f" {i:3d} {chunk_type} {explain_id}") + else: + print(f" {i:3d} {chunk_type}") + + # Rule 7: message_id on content chunks + if chunk_type in ("thought", "observation", "answer"): + mid = resp.get("message_id", "") + if not mid: + errors.append( + f"[msg {i}] {chunk_type} chunk missing message_id" + ) + + print() + print(f"Explain IDs ({len(explain_ids)}):") + for eid in explain_ids: + print(f" {eid}") + + # ---- Pass 2: fetch triples for each explain ID ---- + + ws_url = url.replace("http://", "ws://").replace("https://", "wss://") + ws_url = f"{ws_url.rstrip('/')}/api/v1/socket" + + request_counter = [0] + # entity_id -> parsed triples [(s, p, o), ...] 
+ entities = {} + + print() + print("Fetching triples...") + print() + + async with websockets.connect(ws_url, ping_interval=20, ping_timeout=60) as ws: + for eid in explain_ids: + wire = await fetch_triples( + ws, flow, eid, user, collection, request_counter, + ) + + tuples = parse_triples(wire) if isinstance(wire, list) else [] + entities[eid] = tuples + + print(f" {eid}") + for s, p, o in tuples: + o_short = str(o) + if len(o_short) > 80: + o_short = o_short[:77] + "..." + print(f" {shorten(p)} = {o_short}") + print() + + # ---- Pass 3: check rules ---- + + all_ids = set(entities.keys()) + + # Collect entity metadata + roots = [] # entities with no wasDerivedFrom + conclusions = [] # tg:Conclusion entities + analyses = [] # tg:Analysis entities + observations = [] # tg:Observation entities + + for eid, tuples in entities.items(): + types = get_types(tuples) + parents = get_derived_from(tuples) + + if not tuples: + errors.append(f"[{eid}] entity has no triples in store") + + if not parents: + roots.append(eid) + + if TG_CONCLUSION in types: + conclusions.append(eid) + if TG_ANALYSIS in types: + analyses.append(eid) + if TG_OBSERVATION_TYPE in types: + observations.append(eid) + + # Rule 4: every non-root entity has wasDerivedFrom + if parents: + for parent in parents: + # Rule 5: parent exists in known entities + if parent not in all_ids: + errors.append( + f"[{eid}] wasDerivedFrom target not in explain set: " + f"{parent}" + ) + + # Rule 6: Analysis entities must have ToolUse type + if TG_ANALYSIS in types and TG_TOOL_USE not in types: + errors.append( + f"[{eid}] Analysis entity missing tg:ToolUse type" + ) + + # Rule 1: exactly one root + if len(roots) == 0: + errors.append("No root entity found (all have wasDerivedFrom)") + elif len(roots) > 1: + errors.append( + f"Multiple roots ({len(roots)}) — expected exactly 1:" + ) + for r in roots: + types = get_types(entities[r]) + type_labels = ", ".join(shorten(t) for t in types) + errors.append(f" root: {r} 
[{type_labels}]") + + # Rule 2: exactly one terminal node (nothing derives from it) + # Build set of entities that are parents of something + has_children = set() + for eid, tuples in entities.items(): + for parent in get_derived_from(tuples): + has_children.add(parent) + + terminals = [eid for eid in all_ids if eid not in has_children] + if len(terminals) == 0: + errors.append("No terminal entity found (cycle?)") + elif len(terminals) > 1: + errors.append( + f"Multiple terminal entities ({len(terminals)}) — expected exactly 1:" + ) + for t in terminals: + types = get_types(entities[t]) + type_labels = ", ".join(shorten(ty) for ty in types) + errors.append(f" terminal: {t} [{type_labels}]") + + # Rule 8: Observation should not derive from Analysis if a sub-trace + # exists as a sibling. Check: if an Analysis has both a Question child + # and an Observation child, the Observation should derive from the + # sub-trace's Synthesis, not from the Analysis. + for obs_id in observations: + obs_parents = get_derived_from(entities[obs_id]) + for parent in obs_parents: + if parent in entities: + parent_types = get_types(entities[parent]) + if TG_ANALYSIS in parent_types: + # Check if this Analysis also has a Question child + # (i.e. a sub-trace exists) + has_subtrace = False + for other_id, other_tuples in entities.items(): + if other_id == obs_id: + continue + other_parents = get_derived_from(other_tuples) + other_types = get_types(other_tuples) + if (parent in other_parents + and TG_QUESTION in other_types): + has_subtrace = True + break + if has_subtrace: + errors.append( + f"[{obs_id}] Observation derives from Analysis " + f"{parent} which has a sub-trace — should derive " + f"from the sub-trace's Synthesis instead" + ) + + # ---- Report ---- + + print() + print("=" * 60) + if errors: + print(f"ERRORS ({len(errors)}):") + print() + for err in errors: + print(f" !! 
{err}") + else: + print("ALL CHECKS PASSED") + print("=" * 60) + + +def main(): + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument("input", help="JSON trace file") + parser.add_argument("-u", "--url", default=DEFAULT_URL) + parser.add_argument("-f", "--flow", default=DEFAULT_FLOW) + parser.add_argument("-U", "--user", default=DEFAULT_USER) + parser.add_argument("-C", "--collection", default=DEFAULT_COLLECTION) + args = parser.parse_args() + + asyncio.run(analyse( + args.input, args.url, args.flow, + args.user, args.collection, + )) + + +if __name__ == "__main__": + main() diff --git a/dev-tools/tests/agent_dag/ws_capture.py b/dev-tools/tests/agent_dag/ws_capture.py new file mode 100644 index 00000000..3002d563 --- /dev/null +++ b/dev-tools/tests/agent_dag/ws_capture.py @@ -0,0 +1,81 @@ +#!/usr/bin/env python3 +""" +Connect to TrustGraph websocket, run an agent query, capture all +response messages to a JSON file. + +Usage: + python ws_capture.py -q "What is the document about?" -o trace.json + python ws_capture.py -q "..." 
-u http://localhost:8088/ -o out.json +""" + +import argparse +import asyncio +import json +import os +import websockets + +DEFAULT_URL = os.getenv("TRUSTGRAPH_URL", "http://localhost:8088/") +DEFAULT_USER = "trustgraph" +DEFAULT_COLLECTION = "default" +DEFAULT_FLOW = "default" + + +async def capture(url, flow, question, user, collection, output): + + # Convert to ws URL + ws_url = url.replace("http://", "ws://").replace("https://", "wss://") + ws_url = f"{ws_url.rstrip('/')}/api/v1/socket" + + async with websockets.connect(ws_url, ping_interval=20, ping_timeout=120) as ws: + + request = { + "id": "capture", + "service": "agent", + "flow": flow, + "request": { + "question": question, + "user": user, + "collection": collection, + "streaming": True, + }, + } + + await ws.send(json.dumps(request)) + + messages = [] + + async for raw in ws: + msg = json.loads(raw) + + if msg.get("id") != "capture": + continue + + messages.append(msg) + + if msg.get("complete"): + break + + with open(output, "w") as f: + json.dump(messages, f, indent=2) + + print(f"Captured {len(messages)} messages to {output}") + + +def main(): + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument("-q", "--question", required=True) + parser.add_argument("-o", "--output", default="trace.json") + parser.add_argument("-u", "--url", default=DEFAULT_URL) + parser.add_argument("-f", "--flow", default=DEFAULT_FLOW) + parser.add_argument("-U", "--user", default=DEFAULT_USER) + parser.add_argument("-C", "--collection", default=DEFAULT_COLLECTION) + args = parser.parse_args() + + asyncio.run(capture( + args.url, args.flow, args.question, + args.user, args.collection, args.output, + )) + + +if __name__ == "__main__": + main() diff --git a/dev-tools/tests/librarian/simple_text_download.py b/dev-tools/tests/librarian/simple_text_download.py new file mode 100644 index 00000000..6af2a60d --- /dev/null +++ b/dev-tools/tests/librarian/simple_text_download.py @@ -0,0 +1,67 @@ +#!/usr/bin/env 
python3 +""" +Minimal example: download a text document in tiny chunks via websocket API +""" + +import asyncio +import json +import base64 +import websockets + +async def main(): + url = "ws://localhost:8088/api/v1/socket" + + document_id = "test-chunked-doc-001" + chunk_size = 10 # Tiny chunks! + + request_id = 0 + + async def send_request(ws, request): + nonlocal request_id + request_id += 1 + msg = { + "id": f"req-{request_id}", + "service": "librarian", + "request": request + } + await ws.send(json.dumps(msg)) + response = json.loads(await ws.recv()) + if "error" in response: + raise Exception(response["error"]) + return response.get("response", {}) + + async with websockets.connect(url) as ws: + + print(f"Fetching document: {document_id}") + print(f"Chunk size: {chunk_size} bytes") + print() + + chunk_index = 0 + all_content = b"" + + while True: + resp = await send_request(ws, { + "operation": "stream-document", + "user": "trustgraph", + "document-id": document_id, + "chunk-index": chunk_index, + "chunk-size": chunk_size, + }) + + chunk_data = base64.b64decode(resp["content"]) + total_chunks = resp["total-chunks"] + total_bytes = resp["total-bytes"] + + print(f"Chunk {chunk_index}: {chunk_data}") + + all_content += chunk_data + chunk_index += 1 + + if chunk_index >= total_chunks: + break + + print() + print(f"Complete: {all_content}") + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/dev-tools/tests/librarian/simple_text_upload.py b/dev-tools/tests/librarian/simple_text_upload.py new file mode 100644 index 00000000..e21bd185 --- /dev/null +++ b/dev-tools/tests/librarian/simple_text_upload.py @@ -0,0 +1,56 @@ +#!/usr/bin/env python3 +""" +Minimal example: upload a small text document via websocket API +""" + +import asyncio +import json +import base64 +import time +import websockets + +async def main(): + url = "ws://localhost:8088/api/v1/socket" + + # Small text content + content = b"AAAAAAAAAABBBBBBBBBBCCCCCCCCCC" + + request_id = 0 + + async 
def send_request(ws, request): + nonlocal request_id + request_id += 1 + msg = { + "id": f"req-{request_id}", + "service": "librarian", + "request": request + } + await ws.send(json.dumps(msg)) + response = json.loads(await ws.recv()) + if "error" in response: + raise Exception(response["error"]) + return response.get("response", {}) + + async with websockets.connect(url) as ws: + + print(f"Uploading {len(content)} bytes...") + + resp = await send_request(ws, { + "operation": "add-document", + "document-metadata": { + "id": "test-chunked-doc-001", + "time": int(time.time()), + "kind": "text/plain", + "title": "My Test Document", + "comments": "Small doc for chunk testing", + "user": "trustgraph", + "tags": ["test"], + "metadata": [], + }, + "content": base64.b64encode(content).decode("utf-8"), + }) + + print("Done!") + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/dev-tools/tests/relay/test_rev_gateway.py b/dev-tools/tests/relay/test_rev_gateway.py new file mode 100644 index 00000000..fe200e46 --- /dev/null +++ b/dev-tools/tests/relay/test_rev_gateway.py @@ -0,0 +1,237 @@ +#!/usr/bin/env python3 +""" +WebSocket Test Client + +A simple client to test the reverse gateway through the relay. +Connects to the relay's /in endpoint and allows sending test messages. 
+ +Usage: + python test_client.py [--uri URI] [--interactive] +""" + +import asyncio +import json +import logging +import argparse +import uuid +from aiohttp import ClientSession, WSMsgType + +# Configure logging +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' +) +logger = logging.getLogger("test_client") + +class TestClient: + """Simple WebSocket test client""" + + def __init__(self, uri: str): + self.uri = uri + self.session = None + self.ws = None + self.running = False + self.message_counter = 0 + self.client_id = str(uuid.uuid4())[:8] + + async def connect(self): + """Connect to the WebSocket""" + self.session = ClientSession() + logger.info(f"Connecting to {self.uri}") + self.ws = await self.session.ws_connect(self.uri) + logger.info("Connected successfully") + + async def disconnect(self): + """Disconnect from WebSocket""" + if self.ws and not self.ws.closed: + await self.ws.close() + if self.session and not self.session.closed: + await self.session.close() + logger.info("Disconnected") + + async def send_message(self, service: str, request_data: dict, flow: str = "default"): + """Send a properly formatted TrustGraph message""" + self.message_counter += 1 + message = { + "id": f"{self.client_id}-{self.message_counter}", + "service": service, + "request": request_data, + "flow": flow + } + + message_json = json.dumps(message, indent=2) + logger.info(f"Sending message:\n{message_json}") + await self.ws.send_str(json.dumps(message)) + + async def listen_for_responses(self): + """Listen for incoming messages""" + logger.info("Listening for responses...") + + async for msg in self.ws: + if msg.type == WSMsgType.TEXT: + try: + response = json.loads(msg.data) + logger.info(f"Received response:\n{json.dumps(response, indent=2)}") + except json.JSONDecodeError: + logger.info(f"Received text: {msg.data}") + elif msg.type == WSMsgType.BINARY: + logger.info(f"Received binary data: {len(msg.data)} bytes") + elif 
msg.type == WSMsgType.ERROR: + logger.error(f"WebSocket error: {self.ws.exception()}") + break + else: + logger.info(f"Connection closed: {msg.type}") + break + + async def interactive_mode(self): + """Interactive mode for manual testing""" + print("\n=== Interactive Test Client ===") + print("Available commands:") + print(" text-completion - Test text completion service") + print(" agent - Test agent service") + print(" embeddings - Test embeddings service") + print(" custom - Send custom message") + print(" quit - Exit") + print() + + # Start response listener + listen_task = asyncio.create_task(self.listen_for_responses()) + + try: + while True: + try: + command = input("Command> ").strip().lower() + + if command == "quit": + break + elif command == "text-completion": + await self.send_message("text-completion", { + "system": "You are a helpful assistant.", + "prompt": "What is 2+2?" + }) + elif command == "agent": + await self.send_message("agent", { + "question": "What is the capital of France?" 
+ }) + elif command == "embeddings": + await self.send_message("embeddings", { + "text": "Hello world" + }) + elif command == "custom": + service = input("Service name> ").strip() + request_json = input("Request JSON> ").strip() + try: + request_data = json.loads(request_json) + await self.send_message(service, request_data) + except json.JSONDecodeError as e: + print(f"Invalid JSON: {e}") + elif command == "": + continue + else: + print(f"Unknown command: {command}") + + except KeyboardInterrupt: + break + except EOFError: + break + except Exception as e: + logger.error(f"Error in interactive mode: {e}") + + finally: + listen_task.cancel() + try: + await listen_task + except asyncio.CancelledError: + pass + + async def run_predefined_tests(self): + """Run a series of predefined tests""" + print("\n=== Running Predefined Tests ===") + + # Start response listener + listen_task = asyncio.create_task(self.listen_for_responses()) + + try: + # Test 1: Text completion + print("\n1. Testing text-completion service...") + await self.send_message("text-completion", { + "system": "You are a helpful assistant.", + "prompt": "What is 2+2?" + }) + await asyncio.sleep(2) + + # Test 2: Agent + print("\n2. Testing agent service...") + await self.send_message("agent", { + "question": "What is the capital of France?" + }) + await asyncio.sleep(2) + + # Test 3: Embeddings + print("\n3. Testing embeddings service...") + await self.send_message("embeddings", { + "text": "Hello world" + }) + await asyncio.sleep(2) + + # Test 4: Invalid service + print("\n4. Testing invalid service...") + await self.send_message("nonexistent-service", { + "test": "data" + }) + await asyncio.sleep(2) + + print("\nTests completed. 
Waiting for any remaining responses...") + await asyncio.sleep(3) + + finally: + listen_task.cancel() + try: + await listen_task + except asyncio.CancelledError: + pass + +async def main(): + parser = argparse.ArgumentParser( + description="WebSocket Test Client for Reverse Gateway" + ) + parser.add_argument( + '--uri', + default='ws://localhost:8080/in', + help='WebSocket URI to connect to (default: ws://localhost:8080/in)' + ) + parser.add_argument( + '--interactive', '-i', + action='store_true', + help='Run in interactive mode' + ) + parser.add_argument( + '--verbose', '-v', + action='store_true', + help='Enable verbose logging' + ) + + args = parser.parse_args() + + if args.verbose: + logging.getLogger().setLevel(logging.DEBUG) + + client = TestClient(args.uri) + + try: + await client.connect() + + if args.interactive: + await client.interactive_mode() + else: + await client.run_predefined_tests() + + except KeyboardInterrupt: + print("\nShutdown requested by user") + except Exception as e: + logger.error(f"Client error: {e}") + finally: + await client.disconnect() + +if __name__ == "__main__": + asyncio.run(main()) \ No newline at end of file diff --git a/dev-tools/tests/relay/websocket_relay.py b/dev-tools/tests/relay/websocket_relay.py new file mode 100644 index 00000000..d537f7da --- /dev/null +++ b/dev-tools/tests/relay/websocket_relay.py @@ -0,0 +1,210 @@ +#!/usr/bin/env python3 +""" +WebSocket Relay Test Harness + +This script creates a relay server with two WebSocket endpoints: +- /in - for test clients to connect to +- /out - for reverse gateway to connect to + +Messages are bidirectionally relayed between the two connections. 

Usage:
    python websocket_relay.py [--port PORT] [--host HOST]
"""

import asyncio
import logging
import argparse
from aiohttp import web, WSMsgType
import weakref
from typing import Optional, Set

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger("websocket_relay")

class WebSocketRelay:
    """WebSocket relay that forwards messages between 'in' and 'out' connections.

    Every message received on an /in socket is fanned out to all /out
    sockets, and vice versa. Connection handlers never explicitly
    unregister sockets; cleanup relies on the WeakSets dropping a
    WebSocketResponse once nothing else references it.
    """

    def __init__(self):
        # WeakSets so closed, garbage-collected sockets vanish without
        # explicit bookkeeping in the handlers.
        self.in_connections: Set = weakref.WeakSet()
        self.out_connections: Set = weakref.WeakSet()

    async def handle_in_connection(self, request):
        """Handle incoming connections on /in endpoint"""
        ws = web.WebSocketResponse()
        await ws.prepare(request)

        self.in_connections.add(ws)
        logger.info(f"New 'in' connection. Total in: {len(self.in_connections)}, out: {len(self.out_connections)}")

        try:
            async for msg in ws:
                if msg.type == WSMsgType.TEXT:
                    data = msg.data
                    logger.info(f"IN → OUT: {data}")
                    await self._forward_to_out(data)
                elif msg.type == WSMsgType.BINARY:
                    data = msg.data
                    logger.info(f"IN → OUT: {len(data)} bytes (binary)")
                    await self._forward_to_out(data, binary=True)
                elif msg.type == WSMsgType.ERROR:
                    logger.error(f"WebSocket error on 'in' connection: {ws.exception()}")
                    break
                else:
                    # Any other frame type (close/closing/closed) ends the loop.
                    break
        except Exception as e:
            logger.error(f"Error in 'in' connection handler: {e}")
        finally:
            # NOTE(review): ws is still registered in the WeakSet at this
            # point, so the logged counts can include this closing socket.
            logger.info(f"'in' connection closed. Remaining in: {len(self.in_connections)}, out: {len(self.out_connections)}")

        return ws

    async def handle_out_connection(self, request):
        """Handle outgoing connections on /out endpoint"""
        ws = web.WebSocketResponse()
        await ws.prepare(request)

        self.out_connections.add(ws)
        logger.info(f"New 'out' connection. Total in: {len(self.in_connections)}, out: {len(self.out_connections)}")

        try:
            async for msg in ws:
                if msg.type == WSMsgType.TEXT:
                    data = msg.data
                    logger.info(f"OUT → IN: {data}")
                    await self._forward_to_in(data)
                elif msg.type == WSMsgType.BINARY:
                    data = msg.data
                    logger.info(f"OUT → IN: {len(data)} bytes (binary)")
                    await self._forward_to_in(data, binary=True)
                elif msg.type == WSMsgType.ERROR:
                    logger.error(f"WebSocket error on 'out' connection: {ws.exception()}")
                    break
                else:
                    # Any other frame type (close/closing/closed) ends the loop.
                    break
        except Exception as e:
            logger.error(f"Error in 'out' connection handler: {e}")
        finally:
            # NOTE(review): counts logged here may still include this socket
            # (see handle_in_connection).
            logger.info(f"'out' connection closed. Remaining in: {len(self.in_connections)}, out: {len(self.out_connections)}")

        return ws

    async def _forward_to_out(self, data, binary=False):
        """Forward message from 'in' to all 'out' connections.

        Iterates over a snapshot (list(...)) because sends can suspend and
        the WeakSet may mutate underneath us; sockets that are already
        closed or that fail mid-send are pruned afterwards.
        """
        if not self.out_connections:
            logger.warning("No 'out' connections available to forward message")
            return

        closed_connections = []
        for ws in list(self.out_connections):
            try:
                if ws.closed:
                    closed_connections.append(ws)
                    continue

                if binary:
                    await ws.send_bytes(data)
                else:
                    await ws.send_str(data)
            except Exception as e:
                logger.error(f"Error forwarding to 'out' connection: {e}")
                closed_connections.append(ws)

        # Clean up closed connections (discard never raises; the membership
        # test is belt-and-braces in case GC already removed the entry)
        for ws in closed_connections:
            if ws in self.out_connections:
                self.out_connections.discard(ws)

    async def _forward_to_in(self, data, binary=False):
        """Forward message from 'out' to all 'in' connections.

        Mirror of _forward_to_out, in the opposite direction.
        """
        if not self.in_connections:
            logger.warning("No 'in' connections available to forward message")
            return

        closed_connections = []
        for ws in list(self.in_connections):
            try:
                if ws.closed:
                    closed_connections.append(ws)
                    continue

                if binary:
                    await ws.send_bytes(data)
                else:
                    await ws.send_str(data)
            except Exception as e:
                logger.error(f"Error forwarding to 'in' connection: {e}")
                closed_connections.append(ws)

        # Clean up closed connections
        for ws in closed_connections:
            if ws in self.in_connections:
                self.in_connections.discard(ws)

async def create_app(relay):
    """Create the web application with routes.

    /in and /out carry the relayed traffic; / and /status report
    connection counts as JSON.
    """
    app = web.Application()

    # Add routes
    app.router.add_get('/in', relay.handle_in_connection)
    app.router.add_get('/out', relay.handle_out_connection)

    # Add a simple status endpoint
    async def status(request):
        status_info = {
            'in_connections': len(relay.in_connections),
            'out_connections': len(relay.out_connections),
            'status': 'running'
        }
        return web.json_response(status_info)

    app.router.add_get('/status', status)
    app.router.add_get('/', status)  # Root also shows status

    return app

def main():
    """Parse arguments, print the endpoint summary, and run the relay."""
    parser = argparse.ArgumentParser(
        description="WebSocket Relay Test Harness"
    )
    parser.add_argument(
        '--host',
        default='localhost',
        help='Host to bind to (default: localhost)'
    )
    parser.add_argument(
        '--port',
        type=int,
        default=8080,
        help='Port to bind to (default: 8080)'
    )
    parser.add_argument(
        '--verbose', '-v',
        action='store_true',
        help='Enable verbose logging'
    )

    args = parser.parse_args()

    if args.verbose:
        logging.getLogger().setLevel(logging.DEBUG)

    relay = WebSocketRelay()

    print(f"Starting WebSocket Relay on {args.host}:{args.port}")
    print(f"  'in' endpoint: ws://{args.host}:{args.port}/in")
    print(f"  'out' endpoint: ws://{args.host}:{args.port}/out")
    print(f"  Status: http://{args.host}:{args.port}/status")
    print()
    print("Usage:")
    print(f"  Test client connects to: ws://{args.host}:{args.port}/in")
    print(f"  Reverse gateway connects to: ws://{args.host}:{args.port}/out")

    # run_app accepts the create_app coroutine directly and blocks until
    # the server is stopped.
    web.run_app(create_app(relay), host=args.host, port=args.port)

if __name__ == "__main__":
    main()
\ No newline at end of file
diff --git a/dev-tools/tests/triples/load_test_triples.py b/dev-tools/tests/triples/load_test_triples.py
new file mode 100755
index 00000000..a147d041
--- /dev/null
+++
b/dev-tools/tests/triples/load_test_triples.py @@ -0,0 +1,227 @@ +#!/usr/bin/env python3 +""" +Load test triples into the triple store for testing tg-query-graph. + +Tests all graph features: +- SPO with IRI objects +- SPO with literal objects +- Literals with XML datatypes +- Literals with language tags +- Quoted triples (RDF-star) +- Named graphs +""" + +import asyncio +import json +import os +import websockets + +# Configuration +API_URL = os.getenv("TRUSTGRAPH_URL", "http://localhost:8088/") +TOKEN = os.getenv("TRUSTGRAPH_TOKEN", None) +FLOW = "default" +USER = "trustgraph" +COLLECTION = "default" +DOCUMENT_ID = "test-triples-001" + +# Namespaces +EX = "http://example.org/" +RDF = "http://www.w3.org/1999/02/22-rdf-syntax-ns#" +RDFS = "http://www.w3.org/2000/01/rdf-schema#" +XSD = "http://www.w3.org/2001/XMLSchema#" +TG = "https://trustgraph.ai/ns/" + + +def iri(value): + """Build IRI term.""" + return {"t": "i", "i": value} + + +def literal(value, datatype=None, language=None): + """Build literal term with optional datatype or language.""" + term = {"t": "l", "v": value} + if datatype: + term["dt"] = datatype + if language: + term["ln"] = language + return term + + +def quoted_triple(s, p, o): + """Build quoted triple term (RDF-star).""" + return { + "t": "t", + "tr": {"s": s, "p": p, "o": o} + } + + +def triple(s, p, o, g=None): + """Build a complete triple dict.""" + t = {"s": s, "p": p, "o": o} + if g: + t["g"] = g + return t + + +# Test triples covering all features +TEST_TRIPLES = [ + # 1. Basic SPO with IRI object + triple( + iri(f"{EX}marie-curie"), + iri(f"{RDF}type"), + iri(f"{EX}Scientist") + ), + + # 2. SPO with IRI object (relationship) + triple( + iri(f"{EX}marie-curie"), + iri(f"{EX}discovered"), + iri(f"{EX}radium") + ), + + # 3. Simple literal (no datatype/language) + triple( + iri(f"{EX}marie-curie"), + iri(f"{RDFS}label"), + literal("Marie Curie") + ), + + # 4. 
Literal with language tag (English) + triple( + iri(f"{EX}marie-curie"), + iri(f"{RDFS}label"), + literal("Marie Curie", language="en") + ), + + # 5. Literal with language tag (French) + triple( + iri(f"{EX}marie-curie"), + iri(f"{RDFS}label"), + literal("Marie Curie", language="fr") + ), + + # 6. Literal with language tag (Polish) + triple( + iri(f"{EX}marie-curie"), + iri(f"{RDFS}label"), + literal("Maria Sk\u0142odowska-Curie", language="pl") + ), + + # 7. Literal with xsd:integer datatype + triple( + iri(f"{EX}marie-curie"), + iri(f"{EX}birthYear"), + literal("1867", datatype=f"{XSD}integer") + ), + + # 8. Literal with xsd:date datatype + triple( + iri(f"{EX}marie-curie"), + iri(f"{EX}birthDate"), + literal("1867-11-07", datatype=f"{XSD}date") + ), + + # 9. Literal with xsd:boolean datatype + triple( + iri(f"{EX}marie-curie"), + iri(f"{EX}nobelLaureate"), + literal("true", datatype=f"{XSD}boolean") + ), + + # 10. Quoted triple in object position (RDF 1.2 style) + # "Wikipedia asserts that Marie Curie discovered radium" + triple( + iri(f"{EX}wikipedia"), + iri(f"{TG}asserts"), + quoted_triple( + iri(f"{EX}marie-curie"), + iri(f"{EX}discovered"), + iri(f"{EX}radium") + ) + ), + + # 11. Quoted triple with literal inside (object position) + # "NLP-v1.0 extracted that Marie Curie has label Marie Curie" + triple( + iri(f"{EX}nlp-v1"), + iri(f"{TG}extracted"), + quoted_triple( + iri(f"{EX}marie-curie"), + iri(f"{RDFS}label"), + literal("Marie Curie") + ) + ), + + # 12. Triple in a named graph (g is plain string, not Term) + triple( + iri(f"{EX}radium"), + iri(f"{RDF}type"), + iri(f"{EX}Element"), + g=f"{EX}chemistry-graph" + ), + + # 13. Another triple in the same named graph + triple( + iri(f"{EX}radium"), + iri(f"{EX}atomicNumber"), + literal("88", datatype=f"{XSD}integer"), + g=f"{EX}chemistry-graph" + ), + + # 14. 
Triple in a different named graph + triple( + iri(f"{EX}pierre-curie"), + iri(f"{EX}spouseOf"), + iri(f"{EX}marie-curie"), + g=f"{EX}biography-graph" + ), +] + + +async def load_triples(): + """Load test triples via WebSocket bulk import.""" + # Convert HTTP URL to WebSocket URL + ws_url = API_URL.replace("http://", "ws://").replace("https://", "wss://") + ws_url = f"{ws_url.rstrip('/')}/api/v1/flow/{FLOW}/import/triples" + if TOKEN: + ws_url = f"{ws_url}?token={TOKEN}" + + metadata = { + "id": DOCUMENT_ID, + "metadata": [], + "user": USER, + "collection": COLLECTION + } + + print(f"Connecting to {ws_url}...") + async with websockets.connect(ws_url, ping_interval=20, ping_timeout=60) as websocket: + message = { + "metadata": metadata, + "triples": TEST_TRIPLES + } + print(f"Sending {len(TEST_TRIPLES)} test triples...") + await websocket.send(json.dumps(message)) + print("Triples sent successfully!") + + print("\nTest triples loaded:") + print(" - 2 basic IRI triples (type, relationship)") + print(" - 4 literal triples (plain + 3 languages: en, fr, pl)") + print(" - 3 typed literal triples (xsd:integer, xsd:date, xsd:boolean)") + print(" - 2 quoted triples (RDF-star provenance)") + print(" - 3 triples in named graphs (chemistry-graph, biography-graph)") + print(f"\nTotal: {len(TEST_TRIPLES)} triples") + print(f"User: {USER}, Collection: {COLLECTION}") + + +def main(): + print("Loading test triples for tg-query-graph testing\n") + asyncio.run(load_triples()) + print("\nDone! 
Now test with:") + print(" tg-query-graph -s http://example.org/marie-curie") + print(" tg-query-graph -p http://www.w3.org/2000/01/rdf-schema#label") + print(" tg-query-graph -o 'Marie Curie' --object-language en") + print(" tg-query-graph --format json | jq .") + + +if __name__ == "__main__": + main() diff --git a/docs/api-gateway-changes-v1.8-to-v2.1.md b/docs/api-gateway-changes-v1.8-to-v2.1.md deleted file mode 100644 index 099dadb0..00000000 --- a/docs/api-gateway-changes-v1.8-to-v2.1.md +++ /dev/null @@ -1,108 +0,0 @@ -# API Gateway Changes: v1.8 to v2.1 - -## Summary - -The API gateway gained new WebSocket service dispatchers for embeddings -queries, a new REST streaming endpoint for document content, and underwent -a significant wire format change from `Value` to `Term`. The "objects" -service was renamed to "rows". - ---- - -## New WebSocket Service Dispatchers - -These are new request/response services available through the WebSocket -multiplexer at `/api/v1/socket` (flow-scoped): - -| Service Key | Description | -|-------------|-------------| -| `document-embeddings` | Queries document chunks by text similarity. Request/response uses `DocumentEmbeddingsRequest`/`DocumentEmbeddingsResponse` schemas. | -| `row-embeddings` | Queries structured data rows by text similarity on indexed fields. Request/response uses `RowEmbeddingsRequest`/`RowEmbeddingsResponse` schemas. | - -These join the existing `graph-embeddings` dispatcher (which was already -present in v1.8 but may have been updated). 
- -### Full list of WebSocket flow service dispatchers (v2.1) - -Request/response services (via `/api/v1/flow/{flow}/service/{kind}` or -WebSocket mux): - -- `agent`, `text-completion`, `prompt`, `mcp-tool` -- `graph-rag`, `document-rag` -- `embeddings`, `graph-embeddings`, `document-embeddings` -- `triples`, `rows`, `nlp-query`, `structured-query`, `structured-diag` -- `row-embeddings` - ---- - -## New REST Endpoint - -| Method | Path | Description | -|--------|------|-------------| -| `GET` | `/api/v1/document-stream` | Streams document content from the library as raw bytes. Query parameters: `user` (required), `document-id` (required), `chunk-size` (optional, default 1MB). Returns the document content in chunked transfer encoding, decoded from base64 internally. | - ---- - -## Renamed Service: "objects" to "rows" - -| v1.8 | v2.1 | Notes | -|------|------|-------| -| `objects_query.py` / `ObjectsQueryRequestor` | `rows_query.py` / `RowsQueryRequestor` | Schema changed from `ObjectsQueryRequest`/`ObjectsQueryResponse` to `RowsQueryRequest`/`RowsQueryResponse`. | -| `objects_import.py` / `ObjectsImport` | `rows_import.py` / `RowsImport` | Import dispatcher for structured data. | - -The WebSocket service key changed from `"objects"` to `"rows"`, and the -import dispatcher key similarly changed from `"objects"` to `"rows"`. - ---- - -## Wire Format Change: Value to Term - -The serialization layer (`serialize.py`) was rewritten to use the new `Term` -type instead of the old `Value` type. 
- -### Old format (v1.8 — `Value`) - -```json -{"v": "http://example.org/entity", "e": true} -``` - -- `v`: the value (string) -- `e`: boolean flag indicating whether the value is a URI - -### New format (v2.1 — `Term`) - -IRIs: -```json -{"t": "i", "i": "http://example.org/entity"} -``` - -Literals: -```json -{"t": "l", "v": "some text", "d": "datatype-uri", "l": "en"} -``` - -Quoted triples (RDF-star): -```json -{"t": "r", "r": {"s": {...}, "p": {...}, "o": {...}}} -``` - -- `t`: type discriminator — `"i"` (IRI), `"l"` (literal), `"r"` (quoted triple), `"b"` (blank node) -- Serialization now delegates to `TermTranslator` and `TripleTranslator` from `trustgraph.messaging.translators.primitives` - -### Other serialization changes - -| Field | v1.8 | v2.1 | -|-------|------|------| -| Metadata | `metadata.metadata` (subgraph) | `metadata.root` (simple value) | -| Graph embeddings entity | `entity.vectors` (plural) | `entity.vector` (singular) | -| Document embeddings chunk | `chunk.vectors` + `chunk.chunk` (text) | `chunk.vector` + `chunk.chunk_id` (ID reference) | - ---- - -## Breaking Changes - -- **`Value` to `Term` wire format**: All clients sending/receiving triples, embeddings, or entity contexts through the gateway must update to the new Term format. -- **`objects` to `rows` rename**: WebSocket service key and import key changed. -- **Metadata field change**: `metadata.metadata` (a serialized subgraph) replaced by `metadata.root` (a simple value). -- **Embeddings field changes**: `vectors` (plural) became `vector` (singular); document embeddings now reference `chunk_id` instead of inline `chunk` text. -- **New `/api/v1/document-stream` endpoint**: Additive, not breaking. diff --git a/docs/api.html b/docs/api.html index 7cbddd32..2a03a38b 100644 --- a/docs/api.html +++ b/docs/api.html @@ -422,7 +422,7 @@ data-styled.g138[id="sc-iJQrDi"]{content:"gtHWGb,"}/*!sc*/ -

TrustGraph API Gateway (2.1)

Download OpenAPI specification:

TrustGraph API Gateway (2.2)

Download OpenAPI specification:

REST API for TrustGraph - an AI-powered knowledge graph and RAG system.

Overview

  • AI services: agent, text-completion, prompt, RAG (document/graph)
  • Embeddings: embeddings, graph-embeddings, document-embeddings
  • -
  • Query: triples, rows, nlp-query, structured-query, row-embeddings
  • +
  • Query: triples, rows, nlp-query, structured-query, sparql-query, row-embeddings
  • Data loading: text-load, document-load
  • Utilities: mcp-tool, structured-diag
  • @@ -784,11 +784,26 @@ for processing and handled asynchronously.

    Stop ongoing library document processing.

    list-processing

    List current processing tasks and their status.

    -
    Authorizations:
    bearerAuth
    Request Body schema: application/json
    required
    operation
    required
    string
    Enum: "add-document" "remove-document" "list-documents" "start-processing" "stop-processing" "list-processing"
    Authorizations:
    bearerAuth
    Request Body schema: application/json
    required
    operation
    required
    string
    Enum: "add-document" "remove-document" "list-documents" "get-document-metadata" "get-document-content" "stream-document" "add-child-document" "list-children" "begin-upload" "upload-chunk" "complete-upload" "abort-upload" "get-upload-status" "list-uploads" "start-processing" "stop-processing" "list-processing"

    Error response

    Request samples

    Content type
    application/json
    Example
    {
    • "question": "What is the capital of France?",
    • "user": "alice"
    }

    Response samples

    Content type
    application/json
    Example
    {
    • "chunk-type": "thought",
    • "content": "I need to search for information about quantum computing",
    • "end-of-message": false,
    • "end-of-dialog": false
    }

    Document RAG - retrieve and generate from documents

    http://localhost:8088/api/v1/flow/{flow}/service/agent

    Request samples

    Content type
    application/json
    Example
    {
    • "question": "What is the capital of France?",
    • "user": "alice"
    }

    Response samples

    Content type
    application/json
    Example
    {
    • "chunk-type": "thought",
    • "content": "I need to search for information about quantum computing",
    • "end-of-message": false,
    • "end-of-dialog": false
    }

    Document RAG - retrieve and generate from documents

    Streaming

    Enable streaming: true to receive the answer as it's generated:

      -
    • Multiple messages with response content
    • +
    • Multiple chunk messages with response content
    • +
    • explain messages with inline provenance triples (explain_triples)
    • Final message with end-of-stream: true
    • +
    • Session ends with end_of_session: true
    +

    Explain events carry explain_id, explain_graph, and explain_triples +inline in the stream, so no follow-up knowledge graph query is needed.

    Without streaming, returns complete answer in single response.

    Parameters

      @@ -1216,7 +1256,7 @@ Each step has: thought, action, arguments, observation.

      " class="sc-iKGpAq sc-cCYyou sc-cjERFZ dXXcln fTBBlJ dkmSdy">

      Error response

    Request samples

    Content type
    application/json
    Example
    {
    • "query": "What are the key findings in the research papers?",
    • "user": "alice",
    • "collection": "research"
    }

    Response samples

    Content type
    application/json
    Example
    {
    • "response": "The research papers present three key findings:\n1. Quantum entanglement exhibits non-local correlations\n2. Bell's inequality is violated in experimental tests\n3. Applications in quantum cryptography are promising\n",
    • "end-of-stream": false
    }

    Graph RAG - retrieve and generate from knowledge graph

    http://localhost:8088/api/v1/flow/{flow}/service/document-rag

    Request samples

    Content type
    application/json
    Example
    {
    • "query": "What are the key findings in the research papers?",
    • "user": "alice",
    • "collection": "research"
    }

    Response samples

    Content type
    application/json
    Example
    {
    • "response": "The research papers present three key findings:\n1. Quantum entanglement exhibits non-local correlations\n2. Bell's inequality is violated in experimental tests\n3. Applications in quantum cryptography are promising\n",
    • "end-of-stream": false
    }

    Graph RAG - retrieve and generate from knowledge graph

    Streaming

    Enable streaming: true to receive the answer as it's generated:

      -
    • Multiple messages with response content
    • +
    • Multiple chunk messages with response content
    • +
    • explain messages with inline provenance triples (explain_triples)
    • Final message with end-of-stream: true
    • +
    • Session ends with end_of_session: true
    +

    Explain events carry explain_id, explain_graph, and explain_triples +inline in the stream, so no follow-up knowledge graph query is needed.

    Without streaming, returns complete answer in single response.

    Parameters

    Control retrieval scope with multiple knobs:

    @@ -1332,7 +1380,7 @@ Each step has: thought, action, arguments, observation.

    " class="sc-iKGpAq sc-cCYyou sc-cjERFZ dXXcln fTBBlJ dkmSdy">

    Error response

    Request samples

    Content type
    application/json
    Example
    {
    • "query": "What connections exist between quantum physics and computer science?",
    • "user": "alice",
    • "collection": "research"
    }

    Response samples

    Content type
    application/json
    Example
    {
    • "response": "Quantum physics and computer science intersect primarily through quantum computing.\nThe knowledge graph shows connections through:\n- Quantum algorithms (Shor's algorithm, Grover's algorithm)\n- Quantum information theory\n- Computational complexity theory\n",
    • "end-of-stream": false
    }

    Text completion - direct LLM generation

    http://localhost:8088/api/v1/flow/{flow}/service/graph-rag

    Request samples

    Content type
    application/json
    Example
    {
    • "query": "What connections exist between quantum physics and computer science?",
    • "user": "alice",
    • "collection": "research"
    }

    Response samples

    Content type
    application/json
    Example
    {
    • "response": "Quantum physics and computer science intersect primarily through quantum computing.\nThe knowledge graph shows connections through:\n- Quantum algorithms (Shor's algorithm, Grover's algorithm)\n- Quantum information theory\n- Computational complexity theory\n",
    • "end-of-stream": false
    }

    Text completion - direct LLM generation

    Text Load Overview

    Fire-and-forget document loading:

      -
    • Input: Text content (base64 encoded)
    • +
    • Input: Text content (raw UTF-8 or base64 encoded)
    • Process: Chunk, embed, store
    • Output: None (202 Accepted)
    @@ -2762,7 +2815,12 @@ encoded <span class="token operator">=</span> base64<sp

    Pipeline runs asynchronously after request returns.

    Text Format

    -

    Text must be base64 encoded:

    +

    Text may be sent as raw UTF-8 text:

    +
    {
    +  "text": "Cancer survival: 2.74× higher hazard ratio"
    +}
    +
    +

    Older clients may still send base64 encoded text:

    text_content = "This is the document..."
     encoded = base64.b64encode(text_content.encode('utf-8'))
     
    @@ -2792,8 +2850,8 @@ encoded = base64
    Authorizations:
    bearerAuth
    path Parameters
    flow
    required
    string
    Example: my-flow

    Flow instance ID

    -
    Request Body schema: application/json
    required
    text
    required
    string <byte>

    Text content (base64 encoded)

    +
    Request Body schema: application/json
    required
    text
    required
    string

    Text content, either raw text or base64 encoded for compatibility with older clients

    id
    string

    Document identifier

    user
    string
    Default: "trustgraph"
    = base64

    Error response

    Request samples

    Content type
    application/json
    Example
    {
    • "text": "VGhpcyBpcyB0aGUgZG9jdW1lbnQgdGV4dC4uLg==",
    • "id": "doc-123",
    • "user": "alice",
    • "collection": "research"
    }

    Response samples

    Content type
    application/json
    { }

    Document Load - load binary documents (PDF, etc.)

    http://localhost:8088/api/v1/flow/{flow}/service/text-load

    Request samples

    Content type
    application/json
    Example
    {
    • "text": "This is the document text...",
    • "id": "doc-123",
    • "user": "alice",
    • "collection": "research"
    }

    Response samples

    Content type
    application/json
    { }

    Document Load - load binary documents (PDF, etc.)


    Error response

    Request samples

    Content type
    application/json
    Example
    {
    • "data": "JVBERi0xLjQKJeLjz9MKMSAwIG9iago8PC9UeXBlL0NhdGFsb2cvUGFnZXMgMiAwIFI+PmVuZG9iagoyIDAgb2JqCjw8L1R5cGUvUGFnZXMvS2lkc1szIDAgUl0vQ291bnQgMT4+ZW5kb2JqCg==",
    • "id": "doc-789",
    • "user": "alice",
    • "collection": "research"
    }

    Response samples

    Content type
    application/json
    { }

    Import/Export

    http://localhost:8088/api/v1/flow/{flow}/service/document-load

    Request samples

    Content type
    application/json
    Example
    {
    • "data": "JVBERi0xLjQKJeLjz9MKMSAwIG9iago8PC9UeXBlL0NhdGFsb2cvUGFnZXMgMiAwIFI+PmVuZG9iagoyIDAgb2JqCjw8L1R5cGUvUGFnZXMvS2lkc1szIDAgUl0vQ291bnQgMT4+ZW5kb2JqCg==",
    • "id": "doc-789",
    • "user": "alice",
    • "collection": "research"
    }

    Response samples

    Content type
    application/json
    { }

    SPARQL query - execute SPARQL 1.1 queries against the knowledge graph

    Execute a SPARQL 1.1 query against the knowledge graph.

    Supported Query Types

    • SELECT: Returns variable bindings as a table of results
    • ASK: Returns true/false for existence checks
    • CONSTRUCT: Returns a set of triples built from a template
    • DESCRIBE: Returns triples describing matched resources

    SPARQL Features

    Supports standard SPARQL 1.1 features including:

    • Basic Graph Patterns (BGPs) with triple pattern matching
    • OPTIONAL, UNION, FILTER
    • BIND, VALUES
    • ORDER BY, LIMIT, OFFSET, DISTINCT
    • GROUP BY with aggregates (COUNT, SUM, AVG, MIN, MAX, GROUP_CONCAT)
    • Built-in functions (isIRI, STR, REGEX, CONTAINS, etc.)

    Query Examples

    Find all entities of a type:

    SELECT ?s ?label WHERE {
      ?s <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <http://example.com/Person> .
      ?s <http://www.w3.org/2000/01/rdf-schema#label> ?label .
    }
    LIMIT 10

    Check if an entity exists:

    ASK { <http://example.com/alice> ?p ?o }
    Authorizations:
    bearerAuth
    path Parameters
    flow
    required
    string
    Example: my-flow

    Flow instance ID

    Request Body schema: application/json
    required
    query
    required
    string

    SPARQL 1.1 query string

    user
    string
    Default: "trustgraph"

    User/keyspace identifier

    collection
    string
    Default: "default"

    Collection identifier

    limit
    integer
    Default: 10000

    Safety limit on number of results

    Responses

    Request samples

    Content type
    application/json
    Example
    {
    • "query": "SELECT ?s ?p ?o WHERE { ?s ?p ?o } LIMIT 10",
    • "user": "trustgraph",
    • "collection": "default"
    }

    Response samples

    Content type
    application/json
    Example
    {}

    Import/Export

    Bulk data import and export

    Stream document content from library

    Local development server

    http://localhost:8088/api/metrics/{path}

    Response samples

    Content type
    application/json
    {
    • "error": "Unauthorized"
    }