# TrustGraph Python API Reference ## Installation ```bash pip install trustgraph ``` ## Quick Start All classes and types are imported from the `trustgraph.api` package: ```python from trustgraph.api import Api, Triple, ConfigKey # Create API client api = Api(url="http://localhost:8088/") # Get a flow instance flow = api.flow().id("default") # Execute a graph RAG query response = flow.graph_rag( query="What are the main topics?", user="trustgraph", collection="default" ) ``` ## Table of Contents ### Core - [Api](#api) ### Flow Clients - [Flow](#flow) - [FlowInstance](#flowinstance) - [AsyncFlow](#asyncflow) - [AsyncFlowInstance](#asyncflowinstance) ### WebSocket Clients - [SocketClient](#socketclient) - [SocketFlowInstance](#socketflowinstance) - [AsyncSocketClient](#asyncsocketclient) - [AsyncSocketFlowInstance](#asyncsocketflowinstance) ### Bulk Operations - [BulkClient](#bulkclient) - [AsyncBulkClient](#asyncbulkclient) ### Metrics - [Metrics](#metrics) - [AsyncMetrics](#asyncmetrics) ### Data Types - [Triple](#triple) - [ConfigKey](#configkey) - [ConfigValue](#configvalue) - [DocumentMetadata](#documentmetadata) - [ProcessingMetadata](#processingmetadata) - [CollectionMetadata](#collectionmetadata) - [StreamingChunk](#streamingchunk) - [AgentThought](#agentthought) - [AgentObservation](#agentobservation) - [AgentAnswer](#agentanswer) - [RAGChunk](#ragchunk) ### Exceptions - [ProtocolException](#protocolexception) - [TrustGraphException](#trustgraphexception) - [AgentError](#agenterror) - [ConfigError](#configerror) - [DocumentRagError](#documentragerror) - [FlowError](#flowerror) - [GatewayError](#gatewayerror) - [GraphRagError](#graphragerror) - [LLMError](#llmerror) - [LoadError](#loaderror) - [LookupError](#lookuperror) - [NLPQueryError](#nlpqueryerror) - [RowsQueryError](#rowsqueryerror) - [RequestError](#requesterror) - [StructuredQueryError](#structuredqueryerror) - [UnexpectedError](#unexpectederror) - [ApplicationException](#applicationexception) --- ## `Api` ```python from trustgraph.api import Api ``` Main TrustGraph API client for synchronous and asynchronous operations. This class provides access to all TrustGraph services including flow management, knowledge graph operations, document processing, RAG queries, and more. It supports both REST-based and WebSocket-based communication patterns. The client can be used as a context manager for automatic resource cleanup: ```python with Api(url="http://localhost:8088/") as api: result = api.flow().id("default").graph_rag(query="test") ``` ### Methods ### `__aenter__(self)` Enter asynchronous context manager. ### `__aexit__(self, *args)` Exit asynchronous context manager and close connections. ### `__enter__(self)` Enter synchronous context manager. ### `__exit__(self, *args)` Exit synchronous context manager and close connections. ### `__init__(self, url='http://localhost:8088/', timeout=60, token: str | None = None)` Initialize the TrustGraph API client. **Arguments:** - `url`: Base URL for TrustGraph API (default: "http://localhost:8088/") - `timeout`: Request timeout in seconds (default: 60) - `token`: Optional bearer token for authentication **Example:** ```python # Local development api = Api() # Production with authentication api = Api( url="https://trustgraph.example.com/", timeout=120, token="your-api-token" ) ``` ### `aclose(self)` Close all asynchronous client connections. This method closes async WebSocket, bulk operation, and flow connections. It is automatically called when exiting an async context manager. **Example:** ```python api = Api() async_socket = api.async_socket() # ... use async_socket await api.aclose() # Clean up connections # Or use async context manager (automatic cleanup) async with Api() as api: async_socket = api.async_socket() # ... use async_socket # Automatically closed ``` ### `async_bulk(self)` Get an asynchronous bulk operations client. Provides async/await style bulk import/export operations via WebSocket for efficient handling of large datasets. **Returns:** AsyncBulkClient: Asynchronous bulk operations client **Example:** ```python async_bulk = api.async_bulk() # Export triples asynchronously async for triple in async_bulk.export_triples(flow="default"): print(f"{triple.s} {triple.p} {triple.o}") # Import with async generator async def triple_gen(): yield Triple(s="subj", p="pred", o="obj") # ... more triples await async_bulk.import_triples( flow="default", triples=triple_gen() ) ``` ### `async_flow(self)` Get an asynchronous REST-based flow client. Provides async/await style access to flow operations. This is preferred for async Python applications and frameworks (FastAPI, aiohttp, etc.). **Returns:** AsyncFlow: Asynchronous flow client **Example:** ```python async_flow = api.async_flow() # List flows flow_ids = await async_flow.list() # Execute operations instance = async_flow.id("default") result = await instance.text_completion( system="You are helpful", prompt="Hello" ) ``` ### `async_metrics(self)` Get an asynchronous metrics client. Provides async/await style access to Prometheus metrics. **Returns:** AsyncMetrics: Asynchronous metrics client **Example:** ```python async_metrics = api.async_metrics() prometheus_text = await async_metrics.get() print(prometheus_text) ``` ### `async_socket(self)` Get an asynchronous WebSocket client for streaming operations. Provides async/await style WebSocket access with streaming support. This is the preferred method for async streaming in Python. **Returns:** AsyncSocketClient: Asynchronous WebSocket client **Example:** ```python async_socket = api.async_socket() flow = async_socket.flow("default") # Stream agent responses async for chunk in flow.agent( question="Explain quantum computing", user="trustgraph", streaming=True ): if hasattr(chunk, 'content'): print(chunk.content, end='', flush=True) ``` ### `bulk(self)` Get a synchronous bulk operations client for import/export. Bulk operations allow efficient transfer of large datasets via WebSocket connections, including triples, embeddings, entity contexts, and objects. **Returns:** BulkClient: Synchronous bulk operations client **Example:** ```python bulk = api.bulk() # Export triples for triple in bulk.export_triples(flow="default"): print(f"{triple.s} {triple.p} {triple.o}") # Import triples def triple_generator(): yield Triple(s="subj", p="pred", o="obj") # ... more triples bulk.import_triples(flow="default", triples=triple_generator()) ``` ### `close(self)` Close all synchronous client connections. This method closes WebSocket and bulk operation connections. It is automatically called when exiting a context manager. **Example:** ```python api = Api() socket = api.socket() # ... use socket api.close() # Clean up connections # Or use context manager (automatic cleanup) with Api() as api: socket = api.socket() # ... use socket # Automatically closed ``` ### `collection(self)` Get a Collection client for managing data collections. Collections organize documents and knowledge graph data into logical groupings for isolation and access control. **Returns:** Collection: Collection management client **Example:** ```python collection = api.collection() # List collections colls = collection.list_collections(user="trustgraph") # Update collection metadata collection.update_collection( user="trustgraph", collection="default", name="Default Collection", description="Main data collection" ) ``` ### `config(self)` Get a Config client for managing configuration settings. **Returns:** Config: Configuration management client **Example:** ```python config = api.config() # Get configuration values values = config.get([ConfigKey(type="llm", key="model")]) # Set configuration config.put([ConfigValue(type="llm", key="model", value="gpt-4")]) ``` ### `flow(self)` Get a Flow client for managing and interacting with flows. Flows are the primary execution units in TrustGraph, providing access to services like agents, RAG queries, embeddings, and document processing. **Returns:** Flow: Flow management client **Example:** ```python flow_client = api.flow() # List available blueprints blueprints = flow_client.list_blueprints() # Get a specific flow instance flow_instance = flow_client.id("default") response = flow_instance.text_completion( system="You are helpful", prompt="Hello" ) ``` ### `knowledge(self)` Get a Knowledge client for managing knowledge graph cores. **Returns:** Knowledge: Knowledge graph management client **Example:** ```python knowledge = api.knowledge() # List available KG cores cores = knowledge.list_kg_cores(user="trustgraph") # Load a KG core knowledge.load_kg_core(id="core-123", user="trustgraph") ``` ### `library(self)` Get a Library client for document management. The library provides document storage, metadata management, and processing workflow coordination. **Returns:** Library: Document library management client **Example:** ```python library = api.library() # Add a document library.add_document( document=b"Document content", id="doc-123", metadata=[], user="trustgraph", title="My Document", comments="Test document" ) # List documents docs = library.get_documents(user="trustgraph") ``` ### `metrics(self)` Get a synchronous metrics client for monitoring. Retrieves Prometheus-formatted metrics from the TrustGraph service for monitoring and observability. **Returns:** Metrics: Synchronous metrics client **Example:** ```python metrics = api.metrics() prometheus_text = metrics.get() print(prometheus_text) ``` ### `request(self, path, request)` Make a low-level REST API request. This method is primarily for internal use but can be used for direct API access when needed. **Arguments:** - `path`: API endpoint path (relative to base URL) - `request`: Request payload as a dictionary **Returns:** dict: Response object **Raises:** - `ProtocolException`: If the response status is not 200 or response is not JSON - `ApplicationException`: If the response contains an error **Example:** ```python response = api.request("flow", { "operation": "list-flows" }) ``` ### `socket(self)` Get a synchronous WebSocket client for streaming operations. WebSocket connections provide streaming support for real-time responses from agents, RAG queries, and text completions. This method returns a synchronous wrapper around the WebSocket protocol. **Returns:** SocketClient: Synchronous WebSocket client **Example:** ```python socket = api.socket() flow = socket.flow("default") # Stream agent responses for chunk in flow.agent( question="Explain quantum computing", user="trustgraph", streaming=True ): if hasattr(chunk, 'content'): print(chunk.content, end='', flush=True) ``` --- ## `Flow` ```python from trustgraph.api import Flow ``` Flow management client for blueprint and flow instance operations. This class provides methods for managing flow blueprints (templates) and flow instances (running flows). Blueprints define the structure and parameters of flows, while instances represent active flows that can execute services. ### Methods ### `__init__(self, api)` Initialize Flow client. **Arguments:** - `api`: Parent Api instance for making requests ### `delete_blueprint(self, blueprint_name)` Delete a flow blueprint. **Arguments:** - `blueprint_name`: Name of the blueprint to delete **Example:** ```python api.flow().delete_blueprint("old-blueprint") ``` ### `get(self, id)` Get the definition of a running flow instance. **Arguments:** - `id`: Flow instance ID **Returns:** dict: Flow instance definition **Example:** ```python flow_def = api.flow().get("default") print(flow_def) ``` ### `get_blueprint(self, blueprint_name)` Get a flow blueprint definition by name. **Arguments:** - `blueprint_name`: Name of the blueprint to retrieve **Returns:** dict: Blueprint definition as a dictionary **Example:** ```python blueprint = api.flow().get_blueprint("default") print(blueprint) # Blueprint configuration ``` ### `id(self, id='default')` Get a FlowInstance for executing operations on a specific flow. **Arguments:** - `id`: Flow identifier (default: "default") **Returns:** FlowInstance: Flow instance for service operations **Example:** ```python flow = api.flow().id("my-flow") response = flow.text_completion( system="You are helpful", prompt="Hello" ) ``` ### `list(self)` List all active flow instances. **Returns:** list[str]: List of flow instance IDs **Example:** ```python flows = api.flow().list() print(flows) # ['default', 'flow-1', 'flow-2', ...] ``` ### `list_blueprints(self)` List all available flow blueprints. **Returns:** list[str]: List of blueprint names **Example:** ```python blueprints = api.flow().list_blueprints() print(blueprints) # ['default', 'custom-flow', ...] ``` ### `put_blueprint(self, blueprint_name, definition)` Create or update a flow blueprint. **Arguments:** - `blueprint_name`: Name for the blueprint - `definition`: Blueprint definition dictionary **Example:** ```python definition = { "services": ["text-completion", "graph-rag"], "parameters": {"model": "gpt-4"} } api.flow().put_blueprint("my-blueprint", definition) ``` ### `request(self, path=None, request=None)` Make a flow-scoped API request. **Arguments:** - `path`: Optional path suffix for flow endpoints - `request`: Request payload dictionary **Returns:** dict: Response object **Raises:** - `RuntimeError`: If request parameter is not specified ### `start(self, blueprint_name, id, description, parameters=None)` Start a new flow instance from a blueprint. **Arguments:** - `blueprint_name`: Name of the blueprint to instantiate - `id`: Unique identifier for the flow instance - `description`: Human-readable description - `parameters`: Optional parameters dictionary **Example:** ```python api.flow().start( blueprint_name="default", id="my-flow", description="My custom flow", parameters={"model": "gpt-4"} ) ``` ### `stop(self, id)` Stop a running flow instance. **Arguments:** - `id`: Flow instance ID to stop **Example:** ```python api.flow().stop("my-flow") ``` --- ## `FlowInstance` ```python from trustgraph.api import FlowInstance ``` Flow instance client for executing services on a specific flow. This class provides access to all TrustGraph services including: - Text completion and embeddings - Agent operations with state management - Graph and document RAG queries - Knowledge graph operations (triples, objects) - Document loading and processing - Natural language to GraphQL query conversion - Structured data analysis and schema detection - MCP tool execution - Prompt templating Services are accessed through a running flow instance identified by ID. ### Methods ### `__init__(self, api, id)` Initialize FlowInstance. **Arguments:** - `api`: Parent Flow client - `id`: Flow instance identifier ### `agent(self, question, user='trustgraph', state=None, group=None, history=None)` Execute an agent operation with reasoning and tool use capabilities. Agents can perform multi-step reasoning, use tools, and maintain conversation state across interactions. This is a synchronous non-streaming version. **Arguments:** - `question`: User question or instruction - `user`: User identifier (default: "trustgraph") - `state`: Optional state dictionary for stateful conversations - `group`: Optional group identifier for multi-user contexts - `history`: Optional conversation history as list of message dicts **Returns:** str: Agent's final answer **Example:** ```python flow = api.flow().id("default") # Simple question answer = flow.agent( question="What is the capital of France?", user="trustgraph" ) # With conversation history history = [ {"role": "user", "content": "Hello"}, {"role": "assistant", "content": "Hi! How can I help?"} ] answer = flow.agent( question="Tell me about Paris", user="trustgraph", history=history ) ``` ### `detect_type(self, sample)` Detect the data type of a structured data sample. **Arguments:** - `sample`: Data sample to analyze (string content) **Returns:** dict with detected_type, confidence, and optional metadata ### `diagnose_data(self, sample, schema_name=None, options=None)` Perform combined data diagnosis: detect type and generate descriptor. **Arguments:** - `sample`: Data sample to analyze (string content) - `schema_name`: Optional target schema name for descriptor generation - `options`: Optional parameters (e.g., delimiter for CSV) **Returns:** dict with detected_type, confidence, descriptor, and metadata ### `document_embeddings_query(self, text, user, collection, limit=10)` Query document chunks using semantic similarity. Finds document chunks whose content is semantically similar to the input text, using vector embeddings. **Arguments:** - `text`: Query text for semantic search - `user`: User/keyspace identifier - `collection`: Collection identifier - `limit`: Maximum number of results (default: 10) **Returns:** dict: Query results with chunks containing chunk_id and score **Example:** ```python flow = api.flow().id("default") results = flow.document_embeddings_query( text="machine learning algorithms", user="trustgraph", collection="research-papers", limit=5 ) # results contains {"chunks": [{"chunk_id": "doc1/p0/c0", "score": 0.95}, ...]} ``` ### `document_rag(self, query, user='trustgraph', collection='default', doc_limit=10)` Execute document-based Retrieval-Augmented Generation (RAG) query. Document RAG uses vector embeddings to find relevant document chunks, then generates a response using an LLM with those chunks as context. **Arguments:** - `query`: Natural language query - `user`: User/keyspace identifier (default: "trustgraph") - `collection`: Collection identifier (default: "default") - `doc_limit`: Maximum document chunks to retrieve (default: 10) **Returns:** str: Generated response incorporating document context **Example:** ```python flow = api.flow().id("default") response = flow.document_rag( query="Summarize the key findings", user="trustgraph", collection="research-papers", doc_limit=5 ) print(response) ``` ### `embeddings(self, texts)` Generate vector embeddings for one or more texts. Converts texts into dense vector representations suitable for semantic search and similarity comparison. **Arguments:** - `texts`: List of input texts to embed **Returns:** list[list[list[float]]]: Vector embeddings, one set per input text **Example:** ```python flow = api.flow().id("default") vectors = flow.embeddings(["quantum computing"]) print(f"Embedding dimension: {len(vectors[0][0])}") ``` ### `generate_descriptor(self, sample, data_type, schema_name, options=None)` Generate a descriptor for structured data mapping to a specific schema. **Arguments:** - `sample`: Data sample to analyze (string content) - `data_type`: Data type (csv, json, xml) - `schema_name`: Target schema name for descriptor generation - `options`: Optional parameters (e.g., delimiter for CSV) **Returns:** dict with descriptor and metadata ### `graph_embeddings_query(self, text, user, collection, limit=10)` Query knowledge graph entities using semantic similarity. Finds entities in the knowledge graph whose descriptions are semantically similar to the input text, using vector embeddings. **Arguments:** - `text`: Query text for semantic search - `user`: User/keyspace identifier - `collection`: Collection identifier - `limit`: Maximum number of results (default: 10) **Returns:** dict: Query results with similar entities **Example:** ```python flow = api.flow().id("default") results = flow.graph_embeddings_query( text="physicist who discovered radioactivity", user="trustgraph", collection="scientists", limit=5 ) # results contains {"entities": [{"entity": {...}, "score": 0.95}, ...]} ``` ### `graph_rag(self, query, user='trustgraph', collection='default', entity_limit=50, triple_limit=30, max_subgraph_size=150, max_path_length=2, edge_score_limit=30, edge_limit=25)` Execute graph-based Retrieval-Augmented Generation (RAG) query. Graph RAG uses knowledge graph structure to find relevant context by traversing entity relationships, then generates a response using an LLM. **Arguments:** - `query`: Natural language query - `user`: User/keyspace identifier (default: "trustgraph") - `collection`: Collection identifier (default: "default") - `entity_limit`: Maximum entities to retrieve (default: 50) - `triple_limit`: Maximum triples per entity (default: 30) - `max_subgraph_size`: Maximum total triples in subgraph (default: 150) - `max_path_length`: Maximum traversal depth (default: 2) - `edge_score_limit`: Max edges for semantic pre-filter (default: 50) - `edge_limit`: Max edges after LLM scoring (default: 25) **Returns:** str: Generated response incorporating graph context **Example:** ```python flow = api.flow().id("default") response = flow.graph_rag( query="Tell me about Marie Curie's discoveries", user="trustgraph", collection="scientists", entity_limit=20, max_path_length=3 ) print(response) ``` ### `load_document(self, document, id=None, metadata=None, user=None, collection=None)` Load a binary document for processing. Uploads a document (PDF, DOCX, images, etc.) for extraction and processing through the flow's document pipeline. **Arguments:** - `document`: Document content as bytes - `id`: Optional document identifier (auto-generated if None) - `metadata`: Optional metadata (list of Triples or object with emit method) - `user`: User/keyspace identifier (optional) - `collection`: Collection identifier (optional) **Returns:** dict: Processing response **Raises:** - `RuntimeError`: If metadata is provided without id **Example:** ```python flow = api.flow().id("default") # Load a PDF document with open("research.pdf", "rb") as f: result = flow.load_document( document=f.read(), id="research-001", user="trustgraph", collection="papers" ) ``` ### `load_text(self, text, id=None, metadata=None, charset='utf-8', user=None, collection=None)` Load text content for processing. Uploads text content for extraction and processing through the flow's text pipeline. **Arguments:** - `text`: Text content as bytes - `id`: Optional document identifier (auto-generated if None) - `metadata`: Optional metadata (list of Triples or object with emit method) - `charset`: Character encoding (default: "utf-8") - `user`: User/keyspace identifier (optional) - `collection`: Collection identifier (optional) **Returns:** dict: Processing response **Raises:** - `RuntimeError`: If metadata is provided without id **Example:** ```python flow = api.flow().id("default") # Load text content text_content = b"This is the document content..." result = flow.load_text( text=text_content, id="text-001", charset="utf-8", user="trustgraph", collection="documents" ) ``` ### `mcp_tool(self, name, parameters={})` Execute a Model Context Protocol (MCP) tool. MCP tools provide extensible functionality for agents and workflows, allowing integration with external systems and services. **Arguments:** - `name`: Tool name/identifier - `parameters`: Tool parameters dictionary (default: {}) **Returns:** str or dict: Tool execution result **Raises:** - `ProtocolException`: If response format is invalid **Example:** ```python flow = api.flow().id("default") # Execute a tool result = flow.mcp_tool( name="search-web", parameters={"query": "latest AI news", "limit": 5} ) ``` ### `nlp_query(self, question, max_results=100)` Convert a natural language question to a GraphQL query. **Arguments:** - `question`: Natural language question - `max_results`: Maximum number of results to return (default: 100) **Returns:** dict with graphql_query, variables, detected_schemas, confidence ### `prompt(self, id, variables)` Execute a prompt template with variable substitution. Prompt templates allow reusable prompt patterns with dynamic variable substitution, useful for consistent prompt engineering. **Arguments:** - `id`: Prompt template identifier - `variables`: Dictionary of variable name to value mappings **Returns:** str or dict: Rendered prompt result (text or structured object) **Raises:** - `ProtocolException`: If response format is invalid **Example:** ```python flow = api.flow().id("default") # Text template result = flow.prompt( id="summarize-template", variables={"topic": "quantum computing", "length": "brief"} ) # Structured template result = flow.prompt( id="extract-entities", variables={"text": "Marie Curie won Nobel Prizes"} ) ``` ### `request(self, path, request)` Make a service request on this flow instance. **Arguments:** - `path`: Service path (e.g., "service/text-completion") - `request`: Request payload dictionary **Returns:** dict: Service response ### `row_embeddings_query(self, text, schema_name, user='trustgraph', collection='default', index_name=None, limit=10)` Query row data using semantic similarity on indexed fields. Finds rows whose indexed field values are semantically similar to the input text, using vector embeddings. This enables fuzzy/semantic matching on structured data. **Arguments:** - `text`: Query text for semantic search - `schema_name`: Schema name to search within - `user`: User/keyspace identifier (default: "trustgraph") - `collection`: Collection identifier (default: "default") - `index_name`: Optional index name to filter search to specific index - `limit`: Maximum number of results (default: 10) **Returns:** dict: Query results with matches containing index_name, index_value, text, and score **Example:** ```python flow = api.flow().id("default") # Search for customers by name similarity results = flow.row_embeddings_query( text="John Smith", schema_name="customers", user="trustgraph", collection="sales", limit=5 ) # Filter to specific index results = flow.row_embeddings_query( text="machine learning engineer", schema_name="employees", index_name="job_title", limit=10 ) ``` ### `rows_query(self, query, user='trustgraph', collection='default', variables=None, operation_name=None)` Execute a GraphQL query against structured rows in the knowledge graph. Queries structured data using GraphQL syntax, allowing complex queries with filtering, aggregation, and relationship traversal. **Arguments:** - `query`: GraphQL query string - `user`: User/keyspace identifier (default: "trustgraph") - `collection`: Collection identifier (default: "default") - `variables`: Optional query variables dictionary - `operation_name`: Optional operation name for multi-operation documents **Returns:** dict: GraphQL response with 'data', 'errors', and/or 'extensions' fields **Raises:** - `ProtocolException`: If system-level error occurs **Example:** ```python flow = api.flow().id("default") # Simple query query = ''' { scientists(limit: 10) { name field discoveries } } ''' result = flow.rows_query( query=query, user="trustgraph", collection="scientists" ) # Query with variables query = ''' query GetScientist($name: String!) { scientists(name: $name) { name nobelPrizes } } ''' result = flow.rows_query( query=query, variables={"name": "Marie Curie"} ) ``` ### `schema_selection(self, sample, options=None)` Select matching schemas for a data sample using prompt analysis. **Arguments:** - `sample`: Data sample to analyze (string content) - `options`: Optional parameters **Returns:** dict with schema_matches array and metadata ### `sparql_query(self, query, user='trustgraph', collection='default', limit=10000)` Execute a SPARQL query against the knowledge graph. **Arguments:** - `query`: SPARQL 1.1 query string - `user`: User/keyspace identifier (default: "trustgraph") - `collection`: Collection identifier (default: "default") - `limit`: Safety limit on results (default: 10000) **Returns:** dict with query results. Structure depends on query type: - SELECT: {"query-type": "select", "variables": [...], "bindings": [...]} - ASK: {"query-type": "ask", "ask-result": bool} - CONSTRUCT/DESCRIBE: {"query-type": "construct", "triples": [...]} **Raises:** - `ProtocolException`: If an error occurs ### `structured_query(self, question, user='trustgraph', collection='default')` Execute a natural language question against structured data. Combines NLP query conversion and GraphQL execution. **Arguments:** - `question`: Natural language question - `user`: Cassandra keyspace identifier (default: "trustgraph") - `collection`: Data collection identifier (default: "default") **Returns:** dict with data and optional errors ### `text_completion(self, system, prompt)` Execute text completion using the flow's LLM. **Arguments:** - `system`: System prompt defining the assistant's behavior - `prompt`: User prompt/question **Returns:** str: Generated response text **Example:** ```python flow = api.flow().id("default") response = flow.text_completion( system="You are a helpful assistant", prompt="What is quantum computing?" ) print(response) ``` ### `triples_query(self, s=None, p=None, o=None, user=None, collection=None, limit=10000)` Query knowledge graph triples using pattern matching. Searches for RDF triples matching the given subject, predicate, and/or object patterns. Unspecified parameters act as wildcards. **Arguments:** - `s`: Subject URI (optional, use None for wildcard) - `p`: Predicate URI (optional, use None for wildcard) - `o`: Object URI or Literal (optional, use None for wildcard) - `user`: User/keyspace identifier (optional) - `collection`: Collection identifier (optional) - `limit`: Maximum results to return (default: 10000) **Returns:** list[Triple]: List of matching Triple objects **Raises:** - `RuntimeError`: If s or p is not a Uri, or o is not Uri/Literal **Example:** ```python from trustgraph.knowledge import Uri, Literal flow = api.flow().id("default") # Find all triples about a specific subject triples = flow.triples_query( s=Uri("http://example.org/person/marie-curie"), user="trustgraph", collection="scientists" ) # Find all instances of a specific relationship triples = flow.triples_query( p=Uri("http://example.org/ontology/discovered"), limit=100 ) ``` --- ## `AsyncFlow` ```python from trustgraph.api import AsyncFlow ``` Asynchronous flow management client using REST API. Provides async/await based flow management operations including listing, starting, stopping flows, and managing flow class definitions. Also provides access to flow-scoped services like agents, RAG, and queries via non-streaming REST endpoints. Note: For streaming support, use AsyncSocketClient instead. ### Methods ### `__init__(self, url: str, timeout: int, token: str | None) -> None` Initialize async flow client. **Arguments:** - `url`: Base URL for TrustGraph API - `timeout`: Request timeout in seconds - `token`: Optional bearer token for authentication ### `aclose(self) -> None` Close async client and cleanup resources. Note: Cleanup is handled automatically by aiohttp session context managers. This method is provided for consistency with other async clients. ### `delete_class(self, class_name: str)` Delete a flow class definition. Removes a flow class blueprint from the system. Does not affect running flow instances. **Arguments:** - `class_name`: Flow class name to delete **Example:** ```python async_flow = await api.async_flow() # Delete a flow class await async_flow.delete_class("old-flow-class") ``` ### `get(self, id: str) -> Dict[str, Any]` Get flow definition. Retrieves the complete flow configuration including its class name, description, and parameters. **Arguments:** - `id`: Flow identifier **Returns:** dict: Flow definition object **Example:** ```python async_flow = await api.async_flow() # Get flow definition flow_def = await async_flow.get("default") print(f"Flow class: {flow_def.get('class-name')}") print(f"Description: {flow_def.get('description')}") ``` ### `get_class(self, class_name: str) -> Dict[str, Any]` Get flow class definition. Retrieves the blueprint definition for a flow class, including its configuration schema and service bindings. **Arguments:** - `class_name`: Flow class name **Returns:** dict: Flow class definition object **Example:** ```python async_flow = await api.async_flow() # Get flow class definition class_def = await async_flow.get_class("default") print(f"Services: {class_def.get('services')}") ``` ### `id(self, flow_id: str)` Get an async flow instance client. Returns a client for interacting with a specific flow's services (agent, RAG, queries, embeddings, etc.). **Arguments:** - `flow_id`: Flow identifier **Returns:** AsyncFlowInstance: Client for flow-specific operations **Example:** ```python async_flow = await api.async_flow() # Get flow instance flow = async_flow.id("default") # Use flow services result = await flow.graph_rag( query="What is TrustGraph?", user="trustgraph", collection="default" ) ``` ### `list(self) -> List[str]` List all flow identifiers. Retrieves IDs of all flows currently deployed in the system. **Returns:** list[str]: List of flow identifiers **Example:** ```python async_flow = await api.async_flow() # List all flows flows = await async_flow.list() print(f"Available flows: {flows}") ``` ### `list_classes(self) -> List[str]` List all flow class names. Retrieves names of all flow classes (blueprints) available in the system. **Returns:** list[str]: List of flow class names **Example:** ```python async_flow = await api.async_flow() # List available flow classes classes = await async_flow.list_classes() print(f"Available flow classes: {classes}") ``` ### `put_class(self, class_name: str, definition: Dict[str, Any])` Create or update a flow class definition. Stores a flow class blueprint that can be used to instantiate flows. **Arguments:** - `class_name`: Flow class name - `definition`: Flow class definition object **Example:** ```python async_flow = await api.async_flow() # Create a custom flow class class_def = { "services": { "agent": {"module": "agent", "config": {...}}, "graph-rag": {"module": "graph-rag", "config": {...}} } } await async_flow.put_class("custom-flow", class_def) ``` ### `request(self, path: str, request_data: Dict[str, Any]) -> Dict[str, Any]` Make async HTTP POST request to Gateway API. Internal method for making authenticated requests to the TrustGraph API. **Arguments:** - `path`: API endpoint path (relative to base URL) - `request_data`: Request payload dictionary **Returns:** dict: Response object from API **Raises:** - `ProtocolException`: If HTTP status is not 200 or response is not valid JSON - `ApplicationException`: If API returns an error response ### `start(self, class_name: str, id: str, description: str, parameters: Dict | None = None)` Start a new flow instance. Creates and starts a flow from a flow class definition with the specified parameters. **Arguments:** - `class_name`: Flow class name to instantiate - `id`: Identifier for the new flow instance - `description`: Human-readable description of the flow - `parameters`: Optional configuration parameters for the flow **Example:** ```python async_flow = await api.async_flow() # Start a flow from a class await async_flow.start( class_name="default", id="my-flow", description="Custom flow instance", parameters={"model": "claude-3-opus"} ) ``` ### `stop(self, id: str)` Stop a running flow. Stops and removes a flow instance, freeing its resources. **Arguments:** - `id`: Flow identifier to stop **Example:** ```python async_flow = await api.async_flow() # Stop a flow await async_flow.stop("my-flow") ``` --- ## `AsyncFlowInstance` ```python from trustgraph.api import AsyncFlowInstance ``` Asynchronous flow instance client. Provides async/await access to flow-scoped services including agents, RAG queries, embeddings, and graph queries. All operations return complete responses (non-streaming). Note: For streaming support, use AsyncSocketFlowInstance instead. ### Methods ### `__init__(self, flow: trustgraph.api.async_flow.AsyncFlow, flow_id: str)` Initialize async flow instance. **Arguments:** - `flow`: Parent AsyncFlow client - `flow_id`: Flow identifier ### `agent(self, question: str, user: str, state: Dict | None = None, group: str | None = None, history: List | None = None, **kwargs: Any) -> Dict[str, Any]` Execute an agent operation (non-streaming). Runs an agent to answer a question, with optional conversation state and history. Returns the complete response after the agent has finished processing. Note: This method does not support streaming. For real-time agent thoughts and observations, use AsyncSocketFlowInstance.agent() instead. **Arguments:** - `question`: User question or instruction - `user`: User identifier - `state`: Optional state dictionary for conversation context - `group`: Optional group identifier for session management - `history`: Optional conversation history list - `**kwargs`: Additional service-specific parameters **Returns:** dict: Complete agent response including answer and metadata **Example:** ```python async_flow = await api.async_flow() flow = async_flow.id("default") # Execute agent result = await flow.agent( question="What is the capital of France?", user="trustgraph" ) print(f"Answer: {result.get('response')}") ``` ### `document_rag(self, query: str, user: str, collection: str, doc_limit: int = 10, **kwargs: Any) -> str` Execute document-based RAG query (non-streaming). Performs Retrieval-Augmented Generation using document embeddings. Retrieves relevant document chunks via semantic search, then generates a response grounded in the retrieved documents. Returns complete response. Note: This method does not support streaming. For streaming RAG responses, use AsyncSocketFlowInstance.document_rag() instead. **Arguments:** - `query`: User query text - `user`: User identifier - `collection`: Collection identifier containing documents - `doc_limit`: Maximum number of document chunks to retrieve (default: 10) - `**kwargs`: Additional service-specific parameters **Returns:** str: Complete generated response grounded in document data **Example:** ```python async_flow = await api.async_flow() flow = async_flow.id("default") # Query documents response = await flow.document_rag( query="What does the documentation say about authentication?", user="trustgraph", collection="docs", doc_limit=5 ) print(response) ``` ### `embeddings(self, texts: list, **kwargs: Any)` Generate embeddings for input texts. Converts texts into numerical vector representations using the flow's configured embedding model. Useful for semantic search and similarity comparisons. **Arguments:** - `texts`: List of input texts to embed - `**kwargs`: Additional service-specific parameters **Returns:** dict: Response containing embedding vectors **Example:** ```python async_flow = await api.async_flow() flow = async_flow.id("default") # Generate embeddings result = await flow.embeddings(texts=["Sample text to embed"]) vectors = result.get("vectors") print(f"Embedding dimension: {len(vectors[0][0])}") ``` ### `graph_embeddings_query(self, text: str, user: str, collection: str, limit: int = 10, **kwargs: Any)` Query graph embeddings for semantic entity search. Performs semantic search over graph entity embeddings to find entities most relevant to the input text. Returns entities ranked by similarity. **Arguments:** - `text`: Query text for semantic search - `user`: User identifier - `collection`: Collection identifier containing graph embeddings - `limit`: Maximum number of results to return (default: 10) - `**kwargs`: Additional service-specific parameters **Returns:** dict: Response containing ranked entity matches with similarity scores **Example:** ```python async_flow = await api.async_flow() flow = async_flow.id("default") # Find related entities results = await flow.graph_embeddings_query( text="machine learning algorithms", user="trustgraph", collection="tech-kb", limit=5 ) for entity in results.get("entities", []): print(f"{entity['name']}: {entity['score']}") ``` ### `graph_rag(self, query: str, user: str, collection: str, max_subgraph_size: int = 1000, max_subgraph_count: int = 5, max_entity_distance: int = 3, **kwargs: Any) -> str` Execute graph-based RAG query (non-streaming). Performs Retrieval-Augmented Generation using knowledge graph data. Identifies relevant entities and their relationships, then generates a response grounded in the graph structure. Returns complete response. Note: This method does not support streaming. For streaming RAG responses, use AsyncSocketFlowInstance.graph_rag() instead. **Arguments:** - `query`: User query text - `user`: User identifier - `collection`: Collection identifier containing the knowledge graph - `max_subgraph_size`: Maximum number of triples per subgraph (default: 1000) - `max_subgraph_count`: Maximum number of subgraphs to retrieve (default: 5) - `max_entity_distance`: Maximum graph distance for entity expansion (default: 3) - `**kwargs`: Additional service-specific parameters **Returns:** str: Complete generated response grounded in graph data **Example:** ```python async_flow = await api.async_flow() flow = async_flow.id("default") # Query knowledge graph response = await flow.graph_rag( query="What are the relationships between these entities?", user="trustgraph", collection="medical-kb", max_subgraph_count=3 ) print(response) ``` ### `request(self, service: str, request_data: Dict[str, Any]) -> Dict[str, Any]` Make request to a flow-scoped service. Internal method for calling services within this flow instance. **Arguments:** - `service`: Service name (e.g., "agent", "graph-rag", "triples") - `request_data`: Service request payload **Returns:** dict: Service response object **Raises:** - `ProtocolException`: If request fails or response is invalid - `ApplicationException`: If service returns an error ### `row_embeddings_query(self, text: str, schema_name: str, user: str = 'trustgraph', collection: str = 'default', index_name: str | None = None, limit: int = 10, **kwargs: Any)` Query row embeddings for semantic search on structured data. Performs semantic search over row index embeddings to find rows whose indexed field values are most similar to the input text. Enables fuzzy/semantic matching on structured data. **Arguments:** - `text`: Query text for semantic search - `schema_name`: Schema name to search within - `user`: User identifier (default: "trustgraph") - `collection`: Collection identifier (default: "default") - `index_name`: Optional index name to filter search to specific index - `limit`: Maximum number of results to return (default: 10) - `**kwargs`: Additional service-specific parameters **Returns:** dict: Response containing matches with index_name, index_value, text, and score **Example:** ```python async_flow = await api.async_flow() flow = async_flow.id("default") # Search for customers by name similarity results = await flow.row_embeddings_query( text="John Smith", schema_name="customers", user="trustgraph", collection="sales", limit=5 ) for match in results.get("matches", []): print(f"{match['index_name']}: {match['index_value']} (score: {match['score']})") ``` ### `rows_query(self, query: str, user: str, collection: str, variables: Dict | None = None, operation_name: str | None = None, **kwargs: Any)` Execute a GraphQL query on stored rows. Queries structured data rows using GraphQL syntax. Supports complex queries with variables and named operations. **Arguments:** - `query`: GraphQL query string - `user`: User identifier - `collection`: Collection identifier containing rows - `variables`: Optional GraphQL query variables - `operation_name`: Optional operation name for multi-operation queries - `**kwargs`: Additional service-specific parameters **Returns:** dict: GraphQL response with data and/or errors **Example:** ```python async_flow = await api.async_flow() flow = async_flow.id("default") # Execute GraphQL query query = ''' query GetUsers($status: String!) { users(status: $status) { id name email } } ''' result = await flow.rows_query( query=query, user="trustgraph", collection="users", variables={"status": "active"} ) for user in result.get("data", {}).get("users", []): print(f"{user['name']}: {user['email']}") ``` ### `text_completion(self, system: str, prompt: str, **kwargs: Any) -> str` Generate text completion (non-streaming). Generates a text response from an LLM given a system prompt and user prompt. Returns the complete response text. Note: This method does not support streaming. For streaming text generation, use AsyncSocketFlowInstance.text_completion() instead. **Arguments:** - `system`: System prompt defining the LLM's behavior - `prompt`: User prompt or question - `**kwargs`: Additional service-specific parameters **Returns:** str: Complete generated text response **Example:** ```python async_flow = await api.async_flow() flow = async_flow.id("default") # Generate text response = await flow.text_completion( system="You are a helpful assistant.", prompt="Explain quantum computing in simple terms." ) print(response) ``` ### `triples_query(self, s=None, p=None, o=None, user=None, collection=None, limit=100, **kwargs: Any)` Query RDF triples using pattern matching. Searches for triples matching the specified subject, predicate, and/or object patterns. Patterns use None as a wildcard to match any value. **Arguments:** - `s`: Subject pattern (None for wildcard) - `p`: Predicate pattern (None for wildcard) - `o`: Object pattern (None for wildcard) - `user`: User identifier (None for all users) - `collection`: Collection identifier (None for all collections) - `limit`: Maximum number of triples to return (default: 100) - `**kwargs`: Additional service-specific parameters **Returns:** dict: Response containing matching triples **Example:** ```python async_flow = await api.async_flow() flow = async_flow.id("default") # Find all triples with a specific predicate results = await flow.triples_query( p="knows", user="trustgraph", collection="social", limit=50 ) for triple in results.get("triples", []): print(f"{triple['s']} knows {triple['o']}") ``` --- ## `SocketClient` ```python from trustgraph.api import SocketClient ``` Synchronous WebSocket client with persistent connection. Maintains a single websocket connection and multiplexes requests by ID via a background reader task. Provides synchronous generators for streaming responses. ### Methods ### `__init__(self, url: str, timeout: int, token: str | None) -> None` Initialize self. See help(type(self)) for accurate signature. ### `close(self) -> None` Close the persistent WebSocket connection. ### `flow(self, flow_id: str) -> 'SocketFlowInstance'` --- ## `SocketFlowInstance` ```python from trustgraph.api import SocketFlowInstance ``` Synchronous WebSocket flow instance for streaming operations. Provides the same interface as REST FlowInstance but with WebSocket-based streaming support for real-time responses. ### Methods ### `__init__(self, client: trustgraph.api.socket_client.SocketClient, flow_id: str) -> None` Initialize self. See help(type(self)) for accurate signature. ### `agent(self, question: str, user: str, state: Dict[str, Any] | None = None, group: str | None = None, history: List[Dict[str, Any]] | None = None, streaming: bool = False, **kwargs: Any) -> Dict[str, Any] | Iterator[trustgraph.api.types.StreamingChunk]` Execute an agent operation with streaming support. ### `agent_explain(self, question: str, user: str, collection: str, state: Dict[str, Any] | None = None, group: str | None = None, history: List[Dict[str, Any]] | None = None, **kwargs: Any) -> Iterator[trustgraph.api.types.StreamingChunk | trustgraph.api.types.ProvenanceEvent]` Execute an agent operation with explainability support. ### `document_embeddings_query(self, text: str, user: str, collection: str, limit: int = 10, **kwargs: Any) -> Dict[str, Any]` Query document chunks using semantic similarity. ### `document_rag(self, query: str, user: str, collection: str, doc_limit: int = 10, streaming: bool = False, **kwargs: Any) -> str | Iterator[str]` Execute document-based RAG query with optional streaming. ### `document_rag_explain(self, query: str, user: str, collection: str, doc_limit: int = 10, **kwargs: Any) -> Iterator[trustgraph.api.types.RAGChunk | trustgraph.api.types.ProvenanceEvent]` Execute document-based RAG query with explainability support. ### `embeddings(self, texts: list, **kwargs: Any) -> Dict[str, Any]` Generate vector embeddings for one or more texts. ### `graph_embeddings_query(self, text: str, user: str, collection: str, limit: int = 10, **kwargs: Any) -> Dict[str, Any]` Query knowledge graph entities using semantic similarity. ### `graph_rag(self, query: str, user: str, collection: str, entity_limit: int = 50, triple_limit: int = 30, max_subgraph_size: int = 1000, max_path_length: int = 2, edge_score_limit: int = 30, edge_limit: int = 25, streaming: bool = False, **kwargs: Any) -> str | Iterator[str]` Execute graph-based RAG query with optional streaming. ### `graph_rag_explain(self, query: str, user: str, collection: str, entity_limit: int = 50, triple_limit: int = 30, max_subgraph_size: int = 1000, max_path_length: int = 2, edge_score_limit: int = 30, edge_limit: int = 25, **kwargs: Any) -> Iterator[trustgraph.api.types.RAGChunk | trustgraph.api.types.ProvenanceEvent]` Execute graph-based RAG query with explainability support. ### `mcp_tool(self, name: str, parameters: Dict[str, Any], **kwargs: Any) -> Dict[str, Any]` Execute a Model Context Protocol (MCP) tool. ### `prompt(self, id: str, variables: Dict[str, str], streaming: bool = False, **kwargs: Any) -> str | Iterator[str]` Execute a prompt template with optional streaming. ### `row_embeddings_query(self, text: str, schema_name: str, user: str = 'trustgraph', collection: str = 'default', index_name: str | None = None, limit: int = 10, **kwargs: Any) -> Dict[str, Any]` Query row data using semantic similarity on indexed fields. ### `rows_query(self, query: str, user: str, collection: str, variables: Dict[str, Any] | None = None, operation_name: str | None = None, **kwargs: Any) -> Dict[str, Any]` Execute a GraphQL query against structured rows. ### `sparql_query_stream(self, query: str, user: str = 'trustgraph', collection: str = 'default', limit: int = 10000, batch_size: int = 20, **kwargs: Any) -> Iterator[Dict[str, Any]]` Execute a SPARQL query with streaming batches. ### `text_completion(self, system: str, prompt: str, streaming: bool = False, **kwargs) -> str | Iterator[str]` Execute text completion with optional streaming. ### `triples_query(self, s: str | Dict[str, Any] | None = None, p: str | Dict[str, Any] | None = None, o: str | Dict[str, Any] | None = None, g: str | None = None, user: str | None = None, collection: str | None = None, limit: int = 100, **kwargs: Any) -> List[Dict[str, Any]]` Query knowledge graph triples using pattern matching. ### `triples_query_stream(self, s: str | Dict[str, Any] | None = None, p: str | Dict[str, Any] | None = None, o: str | Dict[str, Any] | None = None, g: str | None = None, user: str | None = None, collection: str | None = None, limit: int = 100, batch_size: int = 20, **kwargs: Any) -> Iterator[List[Dict[str, Any]]]` Query knowledge graph triples with streaming batches. --- ## `AsyncSocketClient` ```python from trustgraph.api import AsyncSocketClient ``` Asynchronous WebSocket client with persistent connection. Maintains a single websocket connection and multiplexes requests by ID, routing responses via a background reader task. Use as an async context manager for proper lifecycle management: async with AsyncSocketClient(url, timeout, token) as client: result = await client._send_request(...) Or call connect()/aclose() manually. ### Methods ### `__aenter__(self)` ### `__aexit__(self, exc_type, exc_val, exc_tb)` ### `__init__(self, url: str, timeout: int, token: str | None)` Initialize self. See help(type(self)) for accurate signature. ### `aclose(self)` Close the persistent WebSocket connection cleanly. ### `connect(self)` Establish the persistent websocket connection. ### `flow(self, flow_id: str)` Get async flow instance for WebSocket operations --- ## `AsyncSocketFlowInstance` ```python from trustgraph.api import AsyncSocketFlowInstance ``` Asynchronous WebSocket flow instance ### Methods ### `__init__(self, client: trustgraph.api.async_socket_client.AsyncSocketClient, flow_id: str)` Initialize self. See help(type(self)) for accurate signature. ### `agent(self, question: str, user: str, state: Dict[str, Any] | None = None, group: str | None = None, history: list | None = None, streaming: bool = False, **kwargs) -> Dict[str, Any] | AsyncIterator` Agent with optional streaming ### `document_rag(self, query: str, user: str, collection: str, doc_limit: int = 10, streaming: bool = False, **kwargs)` Document RAG with optional streaming ### `embeddings(self, texts: list, **kwargs)` Generate text embeddings ### `graph_embeddings_query(self, text: str, user: str, collection: str, limit: int = 10, **kwargs)` Query graph embeddings for semantic search ### `graph_rag(self, query: str, user: str, collection: str, max_subgraph_size: int = 1000, max_subgraph_count: int = 5, max_entity_distance: int = 3, streaming: bool = False, **kwargs)` Graph RAG with optional streaming ### `mcp_tool(self, name: str, parameters: Dict[str, Any], **kwargs)` Execute MCP tool ### `prompt(self, id: str, variables: Dict[str, str], streaming: bool = False, **kwargs)` Execute prompt with optional streaming ### `row_embeddings_query(self, text: str, schema_name: str, user: str = 'trustgraph', collection: str = 'default', index_name: str | None = None, limit: int = 10, **kwargs)` Query row embeddings for semantic search on structured data ### `rows_query(self, query: str, user: str, collection: str, variables: Dict | None = None, operation_name: str | None = None, **kwargs)` GraphQL query against structured rows ### `text_completion(self, system: str, prompt: str, streaming: bool = False, **kwargs)` Text completion with optional streaming ### `triples_query(self, s=None, p=None, o=None, user=None, collection=None, limit=100, **kwargs)` Triple pattern query --- ### `build_term(value: Any, term_type: str | None = None, datatype: str | None = None, language: str | None = None) -> Dict[str, Any] | None` Build wire-format Term dict from a value. Auto-detection rules (when term_type is None): - Already a dict with 't' key -> return as-is (already a Term) - Starts with http://, https://, urn: -> IRI - Wrapped in <> (e.g., ) -> IRI (angle brackets stripped) - Anything else -> literal **Arguments:** - `value`: The term value (string, dict, or None) - `term_type`: One of 'iri', 'literal', or None for auto-detect - `datatype`: Datatype for literal objects (e.g., xsd:integer) - `language`: Language tag for literal objects (e.g., en) **Returns:** dict: Wire-format Term dict, or None if value is None --- ## `BulkClient` ```python from trustgraph.api import BulkClient ``` Synchronous bulk operations client for import/export. Provides efficient bulk data transfer via WebSocket for large datasets. Wraps async WebSocket operations with synchronous generators for ease of use. Note: For true async support, use AsyncBulkClient instead. ### Methods ### `__init__(self, url: str, timeout: int, token: str | None) -> None` Initialize synchronous bulk client. **Arguments:** - `url`: Base URL for TrustGraph API (HTTP/HTTPS will be converted to WS/WSS) - `timeout`: WebSocket timeout in seconds - `token`: Optional bearer token for authentication ### `close(self) -> None` Close connections ### `export_document_embeddings(self, flow: str, **kwargs: Any) -> Iterator[Dict[str, Any]]` Bulk export document embeddings from a flow. Efficiently downloads all document chunk embeddings via WebSocket streaming. **Arguments:** - `flow`: Flow identifier - `**kwargs`: Additional parameters (reserved for future use) **Returns:** Iterator[Dict[str, Any]]: Stream of embedding dictionaries **Example:** ```python bulk = api.bulk() # Export and process document embeddings for embedding in bulk.export_document_embeddings(flow="default"): chunk_id = embedding.get("chunk_id") vector = embedding.get("embedding") print(f"{chunk_id}: {len(vector)} dimensions") ``` ### `export_entity_contexts(self, flow: str, **kwargs: Any) -> Iterator[Dict[str, Any]]` Bulk export entity contexts from a flow. Efficiently downloads all entity context information via WebSocket streaming. **Arguments:** - `flow`: Flow identifier - `**kwargs`: Additional parameters (reserved for future use) **Returns:** Iterator[Dict[str, Any]]: Stream of context dictionaries **Example:** ```python bulk = api.bulk() # Export and process entity contexts for context in bulk.export_entity_contexts(flow="default"): entity = context.get("entity") text = context.get("context") print(f"{entity}: {text[:100]}...") ``` ### `export_graph_embeddings(self, flow: str, **kwargs: Any) -> Iterator[Dict[str, Any]]` Bulk export graph embeddings from a flow. Efficiently downloads all graph entity embeddings via WebSocket streaming. **Arguments:** - `flow`: Flow identifier - `**kwargs`: Additional parameters (reserved for future use) **Returns:** Iterator[Dict[str, Any]]: Stream of embedding dictionaries **Example:** ```python bulk = api.bulk() # Export and process embeddings for embedding in bulk.export_graph_embeddings(flow="default"): entity = embedding.get("entity") vector = embedding.get("embedding") print(f"{entity}: {len(vector)} dimensions") ``` ### `export_triples(self, flow: str, **kwargs: Any) -> Iterator[trustgraph.api.types.Triple]` Bulk export RDF triples from a flow. Efficiently downloads all triples via WebSocket streaming. **Arguments:** - `flow`: Flow identifier - `**kwargs`: Additional parameters (reserved for future use) **Returns:** Iterator[Triple]: Stream of Triple objects **Example:** ```python bulk = api.bulk() # Export and process triples for triple in bulk.export_triples(flow="default"): print(f"{triple.s} -> {triple.p} -> {triple.o}") ``` ### `import_document_embeddings(self, flow: str, embeddings: Iterator[Dict[str, Any]], **kwargs: Any) -> None` Bulk import document embeddings into a flow. Efficiently uploads document chunk embeddings via WebSocket streaming for use in document RAG queries. **Arguments:** - `flow`: Flow identifier - `embeddings`: Iterator yielding embedding dictionaries - `**kwargs`: Additional parameters (reserved for future use) **Example:** ```python bulk = api.bulk() # Generate document embeddings to import def doc_embedding_generator(): yield {"chunk_id": "doc1/p0/c0", "embedding": [0.1, 0.2, ...]} yield {"chunk_id": "doc1/p0/c1", "embedding": [0.3, 0.4, ...]} # ... more embeddings bulk.import_document_embeddings( flow="default", embeddings=doc_embedding_generator() ) ``` ### `import_entity_contexts(self, flow: str, contexts: Iterator[Dict[str, Any]], metadata: Dict[str, Any] | None = None, batch_size: int = 100, **kwargs: Any) -> None` Bulk import entity contexts into a flow. Efficiently uploads entity context information via WebSocket streaming. Entity contexts provide additional textual context about graph entities for improved RAG performance. **Arguments:** - `flow`: Flow identifier - `contexts`: Iterator yielding context dictionaries - `metadata`: Metadata dict with id, metadata, user, collection - `batch_size`: Number of contexts per batch (default 100) - `**kwargs`: Additional parameters (reserved for future use) **Example:** ```python bulk = api.bulk() # Generate entity contexts to import def context_generator(): yield {"entity": {"v": "entity1", "e": True}, "context": "Description..."} yield {"entity": {"v": "entity2", "e": True}, "context": "Description..."} # ... more contexts bulk.import_entity_contexts( flow="default", contexts=context_generator(), metadata={"id": "doc1", "metadata": [], "user": "user1", "collection": "default"} ) ``` ### `import_graph_embeddings(self, flow: str, embeddings: Iterator[Dict[str, Any]], **kwargs: Any) -> None` Bulk import graph embeddings into a flow. Efficiently uploads graph entity embeddings via WebSocket streaming. **Arguments:** - `flow`: Flow identifier - `embeddings`: Iterator yielding embedding dictionaries - `**kwargs`: Additional parameters (reserved for future use) **Example:** ```python bulk = api.bulk() # Generate embeddings to import def embedding_generator(): yield {"entity": "entity1", "embedding": [0.1, 0.2, ...]} yield {"entity": "entity2", "embedding": [0.3, 0.4, ...]} # ... more embeddings bulk.import_graph_embeddings( flow="default", embeddings=embedding_generator() ) ``` ### `import_rows(self, flow: str, rows: Iterator[Dict[str, Any]], **kwargs: Any) -> None` Bulk import structured rows into a flow. Efficiently uploads structured data rows via WebSocket streaming for use in GraphQL queries. **Arguments:** - `flow`: Flow identifier - `rows`: Iterator yielding row dictionaries - `**kwargs`: Additional parameters (reserved for future use) **Example:** ```python bulk = api.bulk() # Generate rows to import def row_generator(): yield {"id": "row1", "name": "Row 1", "value": 100} yield {"id": "row2", "name": "Row 2", "value": 200} # ... more rows bulk.import_rows( flow="default", rows=row_generator() ) ``` ### `import_triples(self, flow: str, triples: Iterator[trustgraph.api.types.Triple], metadata: Dict[str, Any] | None = None, batch_size: int = 100, **kwargs: Any) -> None` Bulk import RDF triples into a flow. Efficiently uploads large numbers of triples via WebSocket streaming. **Arguments:** - `flow`: Flow identifier - `triples`: Iterator yielding Triple objects - `metadata`: Metadata dict with id, metadata, user, collection - `batch_size`: Number of triples per batch (default 100) - `**kwargs`: Additional parameters (reserved for future use) **Example:** ```python from trustgraph.api import Triple bulk = api.bulk() # Generate triples to import def triple_generator(): yield Triple(s="subj1", p="pred", o="obj1") yield Triple(s="subj2", p="pred", o="obj2") # ... more triples # Import triples bulk.import_triples( flow="default", triples=triple_generator(), metadata={"id": "doc1", "metadata": [], "user": "user1", "collection": "default"} ) ``` --- ## `AsyncBulkClient` ```python from trustgraph.api import AsyncBulkClient ``` Asynchronous bulk operations client ### Methods ### `__init__(self, url: str, timeout: int, token: str | None) -> None` Initialize self. See help(type(self)) for accurate signature. ### `aclose(self) -> None` Close connections ### `export_document_embeddings(self, flow: str, **kwargs: Any) -> AsyncIterator[Dict[str, Any]]` Bulk export document embeddings via WebSocket ### `export_entity_contexts(self, flow: str, **kwargs: Any) -> AsyncIterator[Dict[str, Any]]` Bulk export entity contexts via WebSocket ### `export_graph_embeddings(self, flow: str, **kwargs: Any) -> AsyncIterator[Dict[str, Any]]` Bulk export graph embeddings via WebSocket ### `export_triples(self, flow: str, **kwargs: Any) -> AsyncIterator[trustgraph.api.types.Triple]` Bulk export triples via WebSocket ### `import_document_embeddings(self, flow: str, embeddings: AsyncIterator[Dict[str, Any]], **kwargs: Any) -> None` Bulk import document embeddings via WebSocket ### `import_entity_contexts(self, flow: str, contexts: AsyncIterator[Dict[str, Any]], **kwargs: Any) -> None` Bulk import entity contexts via WebSocket ### `import_graph_embeddings(self, flow: str, embeddings: AsyncIterator[Dict[str, Any]], **kwargs: Any) -> None` Bulk import graph embeddings via WebSocket ### `import_rows(self, flow: str, rows: AsyncIterator[Dict[str, Any]], **kwargs: Any) -> None` Bulk import rows via WebSocket ### `import_triples(self, flow: str, triples: AsyncIterator[trustgraph.api.types.Triple], **kwargs: Any) -> None` Bulk import triples via WebSocket --- ## `Metrics` ```python from trustgraph.api import Metrics ``` Synchronous metrics client ### Methods ### `__init__(self, url: str, timeout: int, token: str | None) -> None` Initialize self. See help(type(self)) for accurate signature. ### `get(self) -> str` Get Prometheus metrics as text --- ## `AsyncMetrics` ```python from trustgraph.api import AsyncMetrics ``` Asynchronous metrics client ### Methods ### `__init__(self, url: str, timeout: int, token: str | None) -> None` Initialize self. See help(type(self)) for accurate signature. ### `aclose(self) -> None` Close connections ### `get(self) -> str` Get Prometheus metrics as text --- ## `ExplainabilityClient` ```python from trustgraph.api import ExplainabilityClient ``` Client for fetching explainability entities with eventual consistency handling. Uses quiescence detection: fetch, wait, fetch again, compare. If results are the same, data is stable. ### Methods ### `__init__(self, flow_instance, retry_delay: float = 0.2, max_retries: int = 10)` Initialize explainability client. **Arguments:** - `flow_instance`: A SocketFlowInstance for querying triples - `retry_delay`: Delay between retries in seconds (default: 0.2) - `max_retries`: Maximum retry attempts (default: 10) ### `detect_session_type(self, session_uri: str, graph: str | None = None, user: str | None = None, collection: str | None = None) -> str` Detect whether a session is GraphRAG or Agent type. **Arguments:** - `session_uri`: The session/question URI - `graph`: Named graph - `user`: User/keyspace identifier - `collection`: Collection identifier **Returns:** "graphrag" or "agent" ### `fetch_agent_trace(self, session_uri: str, graph: str | None = None, user: str | None = None, collection: str | None = None, api: Any = None, max_content: int = 10000) -> Dict[str, Any]` Fetch the complete Agent trace starting from a session URI. Follows the provenance chain for all patterns: - ReAct: Question -> Analysis(s) -> Conclusion - Supervisor: Question -> Decomposition -> Finding(s) -> Synthesis - Plan-then-Execute: Question -> Plan -> StepResult(s) -> Synthesis **Arguments:** - `session_uri`: The agent session/question URI - `graph`: Named graph (default: urn:graph:retrieval) - `user`: User/keyspace identifier - `collection`: Collection identifier - `api`: TrustGraph Api instance for librarian access (optional) - `max_content`: Maximum content length for conclusion **Returns:** Dict with question, steps (mixed entity list), conclusion/synthesis ### `fetch_docrag_trace(self, question_uri: str, graph: str | None = None, user: str | None = None, collection: str | None = None, api: Any = None, max_content: int = 10000) -> Dict[str, Any]` Fetch the complete DocumentRAG trace starting from a question URI. Follows the provenance chain: Question -> Grounding -> Exploration -> Synthesis **Arguments:** - `question_uri`: The question entity URI - `graph`: Named graph (default: urn:graph:retrieval) - `user`: User/keyspace identifier - `collection`: Collection identifier - `api`: TrustGraph Api instance for librarian access (optional) - `max_content`: Maximum content length for synthesis **Returns:** Dict with question, grounding, exploration, synthesis entities ### `fetch_document_content(self, document_uri: str, api: Any, user: str | None = None, max_content: int = 10000) -> str` Fetch content from the librarian by document URI. **Arguments:** - `document_uri`: The document URI in the librarian - `api`: TrustGraph Api instance for librarian access - `user`: User identifier for librarian - `max_content`: Maximum content length to return **Returns:** The document content as a string ### `fetch_edge_selection(self, uri: str, graph: str | None = None, user: str | None = None, collection: str | None = None) -> trustgraph.api.explainability.EdgeSelection | None` Fetch an edge selection entity (used by Focus). **Arguments:** - `uri`: The edge selection URI - `graph`: Named graph to query - `user`: User/keyspace identifier - `collection`: Collection identifier **Returns:** EdgeSelection or None if not found ### `fetch_entity(self, uri: str, graph: str | None = None, user: str | None = None, collection: str | None = None) -> trustgraph.api.explainability.ExplainEntity | None` Fetch an explainability entity by URI with eventual consistency handling. Uses quiescence detection: 1. Fetch triples for URI 2. If zero results, retry 3. If non-zero results, wait and fetch again 4. If same results, data is stable - parse and return 5. If different results, data still being written - retry **Arguments:** - `uri`: The entity URI to fetch - `graph`: Named graph to query (e.g., "urn:graph:retrieval") - `user`: User/keyspace identifier - `collection`: Collection identifier **Returns:** ExplainEntity subclass or None if not found ### `fetch_focus_with_edges(self, uri: str, graph: str | None = None, user: str | None = None, collection: str | None = None) -> trustgraph.api.explainability.Focus | None` Fetch a Focus entity and all its edge selections. **Arguments:** - `uri`: The Focus entity URI - `graph`: Named graph to query - `user`: User/keyspace identifier - `collection`: Collection identifier **Returns:** Focus with populated edge_selections, or None ### `fetch_graphrag_trace(self, question_uri: str, graph: str | None = None, user: str | None = None, collection: str | None = None, api: Any = None, max_content: int = 10000) -> Dict[str, Any]` Fetch the complete GraphRAG trace starting from a question URI. Follows the provenance chain: Question -> Grounding -> Exploration -> Focus -> Synthesis **Arguments:** - `question_uri`: The question entity URI - `graph`: Named graph (default: urn:graph:retrieval) - `user`: User/keyspace identifier - `collection`: Collection identifier - `api`: TrustGraph Api instance for librarian access (optional) - `max_content`: Maximum content length for synthesis **Returns:** Dict with question, grounding, exploration, focus, synthesis entities ### `list_sessions(self, graph: str | None = None, user: str | None = None, collection: str | None = None, limit: int = 50) -> List[trustgraph.api.explainability.Question]` List all explainability sessions (questions) in a collection. **Arguments:** - `graph`: Named graph (default: urn:graph:retrieval) - `user`: User/keyspace identifier - `collection`: Collection identifier - `limit`: Maximum number of sessions to return **Returns:** List of Question entities sorted by timestamp (newest first) ### `resolve_edge_labels(self, edge: Dict[str, str], user: str | None = None, collection: str | None = None) -> Tuple[str, str, str]` Resolve labels for all components of an edge triple. **Arguments:** - `edge`: Dict with "s", "p", "o" keys - `user`: User/keyspace identifier - `collection`: Collection identifier **Returns:** Tuple of (s_label, p_label, o_label) ### `resolve_label(self, uri: str, user: str | None = None, collection: str | None = None) -> str` Resolve rdfs:label for a URI, with caching. **Arguments:** - `uri`: The URI to get label for - `user`: User/keyspace identifier - `collection`: Collection identifier **Returns:** The label if found, otherwise the URI itself --- ## `ExplainEntity` ```python from trustgraph.api import ExplainEntity ``` Base class for explainability entities. **Fields:** - `uri`: - `entity_type`: ### Methods ### `__init__(self, uri: str, entity_type: str = '') -> None` Initialize self. See help(type(self)) for accurate signature. --- ## `Question` ```python from trustgraph.api import Question ``` Question entity - the user's query that started the session. **Fields:** - `uri`: - `entity_type`: - `query`: - `timestamp`: - `question_type`: ### Methods ### `__init__(self, uri: str, entity_type: str = '', query: str = '', timestamp: str = '', question_type: str = '') -> None` Initialize self. See help(type(self)) for accurate signature. --- ## `Exploration` ```python from trustgraph.api import Exploration ``` Exploration entity - edges/chunks retrieved from the knowledge store. **Fields:** - `uri`: - `entity_type`: - `edge_count`: - `chunk_count`: - `entities`: typing.List[str] ### Methods ### `__init__(self, uri: str, entity_type: str = '', edge_count: int = 0, chunk_count: int = 0, entities: List[str] = ) -> None` Initialize self. See help(type(self)) for accurate signature. --- ## `Focus` ```python from trustgraph.api import Focus ``` Focus entity - selected edges with LLM reasoning (GraphRAG only). **Fields:** - `uri`: - `entity_type`: - `selected_edge_uris`: typing.List[str] - `edge_selections`: typing.List[trustgraph.api.explainability.EdgeSelection] ### Methods ### `__init__(self, uri: str, entity_type: str = '', selected_edge_uris: List[str] = , edge_selections: List[trustgraph.api.explainability.EdgeSelection] = ) -> None` Initialize self. See help(type(self)) for accurate signature. --- ## `Synthesis` ```python from trustgraph.api import Synthesis ``` Synthesis entity - the final answer. **Fields:** - `uri`: - `entity_type`: - `document`: ### Methods ### `__init__(self, uri: str, entity_type: str = '', document: str = '') -> None` Initialize self. See help(type(self)) for accurate signature. --- ## `Analysis` ```python from trustgraph.api import Analysis ``` Analysis+ToolUse entity - decision + tool call (Agent only). **Fields:** - `uri`: - `entity_type`: - `action`: - `arguments`: - `thought`: ### Methods ### `__init__(self, uri: str, entity_type: str = '', action: str = '', arguments: str = '', thought: str = '') -> None` Initialize self. See help(type(self)) for accurate signature. --- ## `Observation` ```python from trustgraph.api import Observation ``` Observation entity - standalone tool result (Agent only). **Fields:** - `uri`: - `entity_type`: - `document`: ### Methods ### `__init__(self, uri: str, entity_type: str = '', document: str = '') -> None` Initialize self. See help(type(self)) for accurate signature. --- ## `Conclusion` ```python from trustgraph.api import Conclusion ``` Conclusion entity - final answer (Agent only). **Fields:** - `uri`: - `entity_type`: - `document`: ### Methods ### `__init__(self, uri: str, entity_type: str = '', document: str = '') -> None` Initialize self. See help(type(self)) for accurate signature. --- ## `EdgeSelection` ```python from trustgraph.api import EdgeSelection ``` A selected edge with reasoning from GraphRAG Focus step. **Fields:** - `uri`: - `edge`: typing.Dict[str, str] | None - `reasoning`: ### Methods ### `__init__(self, uri: str, edge: Dict[str, str] | None = None, reasoning: str = '') -> None` Initialize self. See help(type(self)) for accurate signature. --- ### `wire_triples_to_tuples(wire_triples: List[Dict[str, Any]]) -> List[Tuple[str, str, Any]]` Convert wire-format triples to (s, p, o) tuples. --- ### `extract_term_value(term: Dict[str, Any]) -> Any` Extract value from a wire-format Term dict. --- ## `Triple` ```python from trustgraph.api import Triple ``` RDF triple representing a knowledge graph statement. **Fields:** - `s`: - `p`: - `o`: ### Methods ### `__init__(self, s: str, p: str, o: str) -> None` Initialize self. See help(type(self)) for accurate signature. --- ## `Uri` ```python from trustgraph.api import Uri ``` str(object='') -> str str(bytes_or_buffer[, encoding[, errors]]) -> str Create a new string object from the given object. If encoding or errors is specified, then the object must expose a data buffer that will be decoded using the given encoding and error handler. Otherwise, returns the result of object.__str__() (if defined) or repr(object). encoding defaults to 'utf-8'. errors defaults to 'strict'. ### Methods ### `is_literal(self)` ### `is_triple(self)` ### `is_uri(self)` --- ## `Literal` ```python from trustgraph.api import Literal ``` str(object='') -> str str(bytes_or_buffer[, encoding[, errors]]) -> str Create a new string object from the given object. If encoding or errors is specified, then the object must expose a data buffer that will be decoded using the given encoding and error handler. Otherwise, returns the result of object.__str__() (if defined) or repr(object). encoding defaults to 'utf-8'. errors defaults to 'strict'. ### Methods ### `is_literal(self)` ### `is_triple(self)` ### `is_uri(self)` --- ## `ConfigKey` ```python from trustgraph.api import ConfigKey ``` Configuration key identifier. **Fields:** - `type`: - `key`: ### Methods ### `__init__(self, type: str, key: str) -> None` Initialize self. See help(type(self)) for accurate signature. --- ## `ConfigValue` ```python from trustgraph.api import ConfigValue ``` Configuration key-value pair. **Fields:** - `type`: - `key`: - `value`: ### Methods ### `__init__(self, type: str, key: str, value: str) -> None` Initialize self. See help(type(self)) for accurate signature. --- ## `DocumentMetadata` ```python from trustgraph.api import DocumentMetadata ``` Metadata for a document in the library. **Attributes:** - `parent_id: Parent document ID for child documents (empty for top`: level docs) **Fields:** - `id`: - `time`: - `kind`: - `title`: - `comments`: - `metadata`: typing.List[trustgraph.api.types.Triple] - `user`: - `tags`: typing.List[str] - `parent_id`: - `document_type`: ### Methods ### `__init__(self, id: str, time: datetime.datetime, kind: str, title: str, comments: str, metadata: List[trustgraph.api.types.Triple], user: str, tags: List[str], parent_id: str = '', document_type: str = 'source') -> None` Initialize self. See help(type(self)) for accurate signature. --- ## `ProcessingMetadata` ```python from trustgraph.api import ProcessingMetadata ``` Metadata for an active document processing job. **Fields:** - `id`: - `document_id`: - `time`: - `flow`: - `user`: - `collection`: - `tags`: typing.List[str] ### Methods ### `__init__(self, id: str, document_id: str, time: datetime.datetime, flow: str, user: str, collection: str, tags: List[str]) -> None` Initialize self. See help(type(self)) for accurate signature. --- ## `CollectionMetadata` ```python from trustgraph.api import CollectionMetadata ``` Metadata for a data collection. Collections provide logical grouping and isolation for documents and knowledge graph data. **Attributes:** - `name: Human`: readable collection name **Fields:** - `user`: - `collection`: - `name`: - `description`: - `tags`: typing.List[str] ### Methods ### `__init__(self, user: str, collection: str, name: str, description: str, tags: List[str]) -> None` Initialize self. See help(type(self)) for accurate signature. --- ## `StreamingChunk` ```python from trustgraph.api import StreamingChunk ``` Base class for streaming response chunks. Used for WebSocket-based streaming operations where responses are delivered incrementally as they are generated. **Fields:** - `content`: - `end_of_message`: ### Methods ### `__init__(self, content: str, end_of_message: bool = False) -> None` Initialize self. See help(type(self)) for accurate signature. --- ## `AgentThought` ```python from trustgraph.api import AgentThought ``` Agent reasoning/thought process chunk. Represents the agent's internal reasoning or planning steps during execution. These chunks show how the agent is thinking about the problem. **Fields:** - `content`: - `end_of_message`: - `chunk_type`: - `message_id`: ### Methods ### `__init__(self, content: str, end_of_message: bool = False, chunk_type: str = 'thought', message_id: str = '') -> None` Initialize self. See help(type(self)) for accurate signature. --- ## `AgentObservation` ```python from trustgraph.api import AgentObservation ``` Agent tool execution observation chunk. Represents the result or observation from executing a tool or action. These chunks show what the agent learned from using tools. **Fields:** - `content`: - `end_of_message`: - `chunk_type`: - `message_id`: ### Methods ### `__init__(self, content: str, end_of_message: bool = False, chunk_type: str = 'observation', message_id: str = '') -> None` Initialize self. See help(type(self)) for accurate signature. --- ## `AgentAnswer` ```python from trustgraph.api import AgentAnswer ``` Agent final answer chunk. Represents the agent's final response to the user's query after completing its reasoning and tool use. **Attributes:** - `chunk_type: Always "final`: answer" **Fields:** - `content`: - `end_of_message`: - `chunk_type`: - `end_of_dialog`: - `message_id`: ### Methods ### `__init__(self, content: str, end_of_message: bool = False, chunk_type: str = 'final-answer', end_of_dialog: bool = False, message_id: str = '') -> None` Initialize self. See help(type(self)) for accurate signature. --- ## `RAGChunk` ```python from trustgraph.api import RAGChunk ``` RAG (Retrieval-Augmented Generation) streaming chunk. Used for streaming responses from graph RAG, document RAG, text completion, and other generative services. **Fields:** - `content`: - `end_of_message`: - `chunk_type`: - `end_of_stream`: - `error`: typing.Dict[str, str] | None ### Methods ### `__init__(self, content: str, end_of_message: bool = False, chunk_type: str = 'rag', end_of_stream: bool = False, error: Dict[str, str] | None = None) -> None` Initialize self. See help(type(self)) for accurate signature. --- ## `ProvenanceEvent` ```python from trustgraph.api import ProvenanceEvent ``` Provenance event for explainability. Emitted during retrieval queries when explainable mode is enabled. Each event represents a provenance node created during query processing. **Fields:** - `explain_id`: - `explain_graph`: - `event_type`: - `entity`: - `triples`: ### Methods ### `__init__(self, explain_id: str, explain_graph: str = '', event_type: str = '', entity: object = None, triples: list = ) -> None` Initialize self. See help(type(self)) for accurate signature. --- ## `ProtocolException` ```python from trustgraph.api import ProtocolException ``` Raised when WebSocket protocol errors occur --- ## `TrustGraphException` ```python from trustgraph.api import TrustGraphException ``` Base class for all TrustGraph service errors --- ## `AgentError` ```python from trustgraph.api import AgentError ``` Agent service error --- ## `ConfigError` ```python from trustgraph.api import ConfigError ``` Configuration service error --- ## `DocumentRagError` ```python from trustgraph.api import DocumentRagError ``` Document RAG retrieval error --- ## `FlowError` ```python from trustgraph.api import FlowError ``` Flow management error --- ## `GatewayError` ```python from trustgraph.api import GatewayError ``` API Gateway error --- ## `GraphRagError` ```python from trustgraph.api import GraphRagError ``` Graph RAG retrieval error --- ## `LLMError` ```python from trustgraph.api import LLMError ``` LLM service error --- ## `LoadError` ```python from trustgraph.api import LoadError ``` Data loading error --- ## `LookupError` ```python from trustgraph.api import LookupError ``` Lookup/search error --- ## `NLPQueryError` ```python from trustgraph.api import NLPQueryError ``` NLP query service error --- ## `RowsQueryError` ```python from trustgraph.api import RowsQueryError ``` Rows query service error --- ## `RequestError` ```python from trustgraph.api import RequestError ``` Request processing error --- ## `StructuredQueryError` ```python from trustgraph.api import StructuredQueryError ``` Structured query service error --- ## `UnexpectedError` ```python from trustgraph.api import UnexpectedError ``` Unexpected/unknown error --- ## `ApplicationException` ```python from trustgraph.api import ApplicationException ``` Base class for all TrustGraph service errors ---