diff --git a/docs/README.api-docs.md b/docs/README.api-docs.md new file mode 100644 index 00000000..8f7fecb8 --- /dev/null +++ b/docs/README.api-docs.md @@ -0,0 +1,48 @@ + +# Auto-generating docs + +## REST and WebSocket API Documentation + +- `specs/build-docs.sh` - Builds the REST and websocket documentation from the + OpenAPI and AsyncAPI specs. + +## Python API Documentation + +The Python API documentation is generated from docstrings using a custom Python script that introspects the `trustgraph.api` package. + +### Prerequisites + +The trustgraph package must be importable. If you're working in a development environment: + +```bash +cd trustgraph-base +pip install -e . +``` + +### Generating Documentation + +From the docs directory: + +```bash +cd docs +python3 generate-api-docs.py > python-api.md +``` + +This generates a single markdown file with complete API documentation showing: +- Installation and quick start guide +- Import statements for each class/type +- Full docstrings with examples +- Table of contents organized by category + +### Documentation Style + +All docstrings follow Google-style format: +- Brief one-line summary +- Detailed description +- Args section with parameter descriptions +- Returns section +- Raises section (when applicable) +- Example code blocks with proper syntax highlighting + +The generated documentation shows the public API exactly as users import it from `trustgraph.api`, without exposing internal module structure. + diff --git a/docs/generate-api-docs.py b/docs/generate-api-docs.py new file mode 100644 index 00000000..c8d54fda --- /dev/null +++ b/docs/generate-api-docs.py @@ -0,0 +1,351 @@ +#!/usr/bin/env python3 +""" +Generate clean markdown documentation for trustgraph.api + +This script introspects the trustgraph.api package and generates markdown +documentation showing the API as users actually import it. +""" + +import sys +import inspect +import importlib +from dataclasses import is_dataclass, fields +from typing import get_type_hints + +# Add parent directory to path +sys.path.insert(0, '../trustgraph-base') + +def parse_docstring(docstring): + """Parse Google-style docstring into sections""" + if not docstring: + return {"description": "", "args": [], "returns": "", "raises": [], "examples": []} + + lines = docstring.split('\n') + result = { + "description": [], + "args": [], + "returns": "", + "raises": [], + "examples": [], + "attributes": [] + } + + current_section = "description" + current_item = None + + for line in lines: + stripped = line.strip() + + # Check for section headers + if stripped in ["Args:", "Arguments:"]: + current_section = "args" + current_item = None + continue + elif stripped in ["Returns:", "Return:"]: + current_section = "returns" + current_item = None + continue + elif stripped in ["Raises:"]: + current_section = "raises" + current_item = None + continue + elif stripped in ["Example:", "Examples:"]: + current_section = "examples" + current_item = None + continue + elif stripped in ["Attributes:"]: + current_section = "attributes" + current_item = None + continue + elif stripped.startswith("Note:"): + current_section = "description" + result["description"].append(line) + continue + + # Process content based on section + if current_section == "description": + result["description"].append(line) + elif current_section == "args": + # Check if this is a new argument (starts with word followed by colon) + if stripped and not line.startswith(' ' * 8) and ':' in stripped: + parts = stripped.split(':', 1) + arg_name = parts[0].strip() + arg_desc = parts[1].strip() if len(parts) > 1 else "" + current_item = {"name": arg_name, "description": arg_desc} + result["args"].append(current_item) + elif current_item and stripped: + # Continuation of previous arg description + current_item["description"] += " " + stripped + elif current_section == "returns": + if stripped: + result["returns"] += stripped + " " + elif current_section == "raises": + if stripped and ':' in stripped: + parts = stripped.split(':', 1) + exc_name = parts[0].strip() + exc_desc = parts[1].strip() if len(parts) > 1 else "" + current_item = {"name": exc_name, "description": exc_desc} + result["raises"].append(current_item) + elif current_item and stripped: + current_item["description"] += " " + stripped + elif current_section == "examples": + result["examples"].append(line) + elif current_section == "attributes": + if stripped and '-' in stripped: + parts = stripped.split('-', 1) + if len(parts) == 2: + attr_name = parts[0].strip().strip('`') + attr_desc = parts[1].strip() + result["attributes"].append({"name": attr_name, "description": attr_desc}) + + # Clean up description + result["description"] = '\n'.join(result["description"]).strip() + result["returns"] = result["returns"].strip() + + return result + +def format_signature(name, obj): + """Format function/method signature""" + try: + sig = inspect.signature(obj) + return f"{name}{sig}" + except: + return f"{name}(...)" + +def document_function(name, func, indent=0): + """Generate markdown for a function""" + ind = " " * indent + md = [] + + # Function heading and signature + md.append(f"{ind}### `{format_signature(name, func)}`\n") + + # Parse docstring + doc = inspect.getdoc(func) + if doc: + parsed = parse_docstring(doc) + + # Description + if parsed["description"]: + md.append(f"{ind}{parsed['description']}\n") + + # Arguments + if parsed["args"]: + md.append(f"{ind}**Arguments:**\n") + for arg in parsed["args"]: + md.append(f"{ind}- `{arg['name']}`: {arg['description']}") + md.append("") + + # Returns + if parsed["returns"]: + md.append(f"{ind}**Returns:** {parsed['returns']}\n") + + # Raises + if parsed["raises"]: + md.append(f"{ind}**Raises:**\n") + for exc in parsed["raises"]: + md.append(f"{ind}- `{exc['name']}`: {exc['description']}") + md.append("") + + # Examples + if parsed["examples"]: + md.append(f"{ind}**Example:**\n") + + # Strip common leading whitespace from examples + import textwrap + example_text = '\n'.join(parsed["examples"]) + dedented = textwrap.dedent(example_text) + example_lines = dedented.split('\n') + + # Check if examples already contain code fences + if '```' in dedented: + # Already has code fences, don't wrap + for line in example_lines: + md.append(line.rstrip()) + md.append("") + else: + # No code fences, wrap in python block + md.append("```python") + for line in example_lines: + md.append(line.rstrip()) + md.append("```\n") + + return '\n'.join(md) + +def document_class(name, cls): + """Generate markdown for a class""" + md = [] + + # Class heading + md.append(f"## `{name}`\n") + + # Import statement + md.append(f"```python") + md.append(f"from trustgraph.api import {name}") + md.append(f"```\n") + + # Parse class docstring + doc = inspect.getdoc(cls) + if doc: + parsed = parse_docstring(doc) + + # Description + if parsed["description"]: + md.append(f"{parsed['description']}\n") + + # Attributes (for class-level attributes) + if parsed["attributes"]: + md.append(f"**Attributes:**\n") + for attr in parsed["attributes"]: + md.append(f"- `{attr['name']}`: {attr['description']}") + md.append("") + + # For dataclasses, show fields + if is_dataclass(cls): + md.append("**Fields:**\n") + for field in fields(cls): + field_doc = "" + if cls.__doc__: + # Try to extract field description from docstring + pass + md.append(f"- `{field.name}`: {field.type}") + md.append("") + + # Document methods + methods = [] + for method_name, method in inspect.getmembers(cls, predicate=inspect.isfunction): + # Skip private methods except special ones + if method_name.startswith('_') and method_name not in ['__init__', '__enter__', '__exit__', '__aenter__', '__aexit__']: + continue + methods.append((method_name, method)) + + if methods: + md.append("### Methods\n") + for method_name, method in methods: + md.append(document_function(method_name, method, indent=0)) + + return '\n'.join(md) + +def document_exception(name, exc): + """Generate markdown for an exception""" + md = [] + + md.append(f"## `{name}`\n") + + # Import statement + md.append(f"```python") + md.append(f"from trustgraph.api import {name}") + md.append(f"```\n") + + doc = inspect.getdoc(exc) + if doc: + md.append(f"{doc}\n") + else: + md.append(f"Exception class for {name.replace('Exception', '').replace('Error', '')} errors.\n") + + return '\n'.join(md) + +def generate_toc(items): + """Generate table of contents""" + md = [] + md.append("# TrustGraph Python API Reference\n") + + # Add introduction + md.append("## Installation\n") + md.append("```bash") + md.append("pip install trustgraph") + md.append("```\n") + + md.append("## Quick Start\n") + md.append("All classes and types are imported from the `trustgraph.api` package:\n") + md.append("```python") + md.append("from trustgraph.api import Api, Triple, ConfigKey") + md.append("") + md.append("# Create API client") + md.append("api = Api(url=\"http://localhost:8088/\")") + md.append("") + md.append("# Get a flow instance") + md.append("flow = api.flow().id(\"default\")") + md.append("") + md.append("# Execute a graph RAG query") + md.append("response = flow.graph_rag(") + md.append(" query=\"What are the main topics?\",") + md.append(" user=\"trustgraph\",") + md.append(" collection=\"default\"") + md.append(")") + md.append("```\n") + + md.append("## Table of Contents\n") + + # Group by category + categories = { + "Core": ["Api"], + "Flow Clients": ["Flow", "FlowInstance", "AsyncFlow", "AsyncFlowInstance"], + "WebSocket Clients": ["SocketClient", "SocketFlowInstance", "AsyncSocketClient", "AsyncSocketFlowInstance"], + "Bulk Operations": ["BulkClient", "AsyncBulkClient"], + "Metrics": ["Metrics", "AsyncMetrics"], + "Data Types": ["Triple", "ConfigKey", "ConfigValue", "DocumentMetadata", "ProcessingMetadata", + "CollectionMetadata", "StreamingChunk", "AgentThought", "AgentObservation", + "AgentAnswer", "RAGChunk"], + "Exceptions": [] + } + + # Find exceptions + for item in items: + if "Exception" in item or "Error" in item: + categories["Exceptions"].append(item) + + for category, names in categories.items(): + if not names: + continue + md.append(f"### {category}\n") + for name in names: + if name in items: + md.append(f"- [{name}](#{name.lower()})") + md.append("") + + return '\n'.join(md) + +def main(): + """Generate API documentation""" + + # Import the package + try: + api_module = importlib.import_module('trustgraph.api') + except ImportError as e: + print(f"Error importing trustgraph.api: {e}", file=sys.stderr) + sys.exit(1) + + # Get exported names + if not hasattr(api_module, '__all__'): + print("Error: trustgraph.api has no __all__", file=sys.stderr) + sys.exit(1) + + all_names = api_module.__all__ + + # Generate TOC + print(generate_toc(all_names)) + print("---\n") + + # Document each exported item + for name in all_names: + try: + obj = getattr(api_module, name) + + # Determine what kind of object it is + if inspect.isclass(obj): + if issubclass(obj, Exception): + print(document_exception(name, obj)) + else: + print(document_class(name, obj)) + elif inspect.isfunction(obj): + print(document_function(name, obj)) + + print("\n---\n") + + except Exception as e: + print(f"Error documenting {name}: {e}", file=sys.stderr) + continue + +if __name__ == "__main__": + main() diff --git a/docs/python-api.md b/docs/python-api.md new file mode 100644 index 00000000..47e5843e --- /dev/null +++ b/docs/python-api.md @@ -0,0 +1,2143 @@ +# TrustGraph Python API Reference + +## Installation + +```bash +pip install trustgraph +``` + +## Quick Start + +All classes and types are imported from the `trustgraph.api` package: + +```python +from trustgraph.api import Api, Triple, ConfigKey + +# Create API client +api = Api(url="http://localhost:8088/") + +# Get a flow instance +flow = api.flow().id("default") + +# Execute a graph RAG query +response = flow.graph_rag( + query="What are the main topics?", + user="trustgraph", + collection="default" +) +``` + +## Table of Contents + +### Core + +- [Api](#api) + +### Flow Clients + +- [Flow](#flow) +- [FlowInstance](#flowinstance) +- [AsyncFlow](#asyncflow) +- [AsyncFlowInstance](#asyncflowinstance) + +### WebSocket Clients + +- [SocketClient](#socketclient) +- [SocketFlowInstance](#socketflowinstance) +- [AsyncSocketClient](#asyncsocketclient) +- [AsyncSocketFlowInstance](#asyncsocketflowinstance) + +### Bulk Operations + +- [BulkClient](#bulkclient) +- [AsyncBulkClient](#asyncbulkclient) + +### Metrics + +- [Metrics](#metrics) +- [AsyncMetrics](#asyncmetrics) + +### Data Types + +- [Triple](#triple) +- [ConfigKey](#configkey) +- [ConfigValue](#configvalue) +- [DocumentMetadata](#documentmetadata) +- [ProcessingMetadata](#processingmetadata) +- [CollectionMetadata](#collectionmetadata) +- [StreamingChunk](#streamingchunk) +- [AgentThought](#agentthought) +- [AgentObservation](#agentobservation) +- [AgentAnswer](#agentanswer) +- [RAGChunk](#ragchunk) + +### Exceptions + +- [ProtocolException](#protocolexception) +- [TrustGraphException](#trustgraphexception) +- [AgentError](#agenterror) +- [ConfigError](#configerror) +- [DocumentRagError](#documentragerror) +- [FlowError](#flowerror) +- [GatewayError](#gatewayerror) +- [GraphRagError](#graphragerror) +- [LLMError](#llmerror) +- [LoadError](#loaderror) +- [LookupError](#lookuperror) +- [NLPQueryError](#nlpqueryerror) +- [ObjectsQueryError](#objectsqueryerror) +- [RequestError](#requesterror) +- [StructuredQueryError](#structuredqueryerror) +- [UnexpectedError](#unexpectederror) +- [ApplicationException](#applicationexception) + +--- + +## `Api` + +```python +from trustgraph.api import Api +``` + +Main TrustGraph API client for synchronous and asynchronous operations. + +This class provides access to all TrustGraph services including flow management, +knowledge graph operations, document processing, RAG queries, and more. It supports +both REST-based and WebSocket-based communication patterns. + +The client can be used as a context manager for automatic resource cleanup: + ```python + with Api(url="http://localhost:8088/") as api: + result = api.flow().id("default").graph_rag(query="test") + ``` + +### Methods + +### `__aenter__(self)` + +Enter asynchronous context manager. + +### `__aexit__(self, *args)` + +Exit asynchronous context manager and close connections. + +### `__enter__(self)` + +Enter synchronous context manager. + +### `__exit__(self, *args)` + +Exit synchronous context manager and close connections. + +### `__init__(self, url='http://localhost:8088/', timeout=60, token: Optional[str] = None)` + +Initialize the TrustGraph API client. + +**Arguments:** + +- `url`: Base URL for TrustGraph API (default: "http://localhost:8088/") +- `timeout`: Request timeout in seconds (default: 60) +- `token`: Optional bearer token for authentication + +**Example:** + +```python +# Local development +api = Api() + +# Production with authentication +api = Api( + url="https://trustgraph.example.com/", + timeout=120, + token="your-api-token" +) +``` + +### `aclose(self)` + +Close all asynchronous client connections. + +This method closes async WebSocket, bulk operation, and flow connections. +It is automatically called when exiting an async context manager. + +**Example:** + +```python +api = Api() +async_socket = api.async_socket() +# ... use async_socket +await api.aclose() # Clean up connections + +# Or use async context manager (automatic cleanup) +async with Api() as api: + async_socket = api.async_socket() + # ... use async_socket +# Automatically closed +``` + +### `async_bulk(self)` + +Get an asynchronous bulk operations client. + +Provides async/await style bulk import/export operations via WebSocket +for efficient handling of large datasets. + +**Returns:** AsyncBulkClient: Asynchronous bulk operations client + +**Example:** + +```python +async_bulk = api.async_bulk() + +# Export triples asynchronously +async for triple in async_bulk.export_triples(flow="default"): + print(f"{triple.s} {triple.p} {triple.o}") + +# Import with async generator +async def triple_gen(): + yield Triple(s="subj", p="pred", o="obj") + # ... more triples + +await async_bulk.import_triples( + flow="default", + triples=triple_gen() +) +``` + +### `async_flow(self)` + +Get an asynchronous REST-based flow client. + +Provides async/await style access to flow operations. This is preferred +for async Python applications and frameworks (FastAPI, aiohttp, etc.). + +**Returns:** AsyncFlow: Asynchronous flow client + +**Example:** + +```python +async_flow = api.async_flow() + +# List flows +flow_ids = await async_flow.list() + +# Execute operations +instance = async_flow.id("default") +result = await instance.text_completion( + system="You are helpful", + prompt="Hello" +) +``` + +### `async_metrics(self)` + +Get an asynchronous metrics client. + +Provides async/await style access to Prometheus metrics. + +**Returns:** AsyncMetrics: Asynchronous metrics client + +**Example:** + +```python +async_metrics = api.async_metrics() +prometheus_text = await async_metrics.get() +print(prometheus_text) +``` + +### `async_socket(self)` + +Get an asynchronous WebSocket client for streaming operations. + +Provides async/await style WebSocket access with streaming support. +This is the preferred method for async streaming in Python. + +**Returns:** AsyncSocketClient: Asynchronous WebSocket client + +**Example:** + +```python +async_socket = api.async_socket() +flow = async_socket.flow("default") + +# Stream agent responses +async for chunk in flow.agent( + question="Explain quantum computing", + user="trustgraph", + streaming=True +): + if hasattr(chunk, 'content'): + print(chunk.content, end='', flush=True) +``` + +### `bulk(self)` + +Get a synchronous bulk operations client for import/export. + +Bulk operations allow efficient transfer of large datasets via WebSocket +connections, including triples, embeddings, entity contexts, and objects. + +**Returns:** BulkClient: Synchronous bulk operations client + +**Example:** + +```python +bulk = api.bulk() + +# Export triples +for triple in bulk.export_triples(flow="default"): + print(f"{triple.s} {triple.p} {triple.o}") + +# Import triples +def triple_generator(): + yield Triple(s="subj", p="pred", o="obj") + # ... more triples + +bulk.import_triples(flow="default", triples=triple_generator()) +``` + +### `close(self)` + +Close all synchronous client connections. + +This method closes WebSocket and bulk operation connections. +It is automatically called when exiting a context manager. + +**Example:** + +```python +api = Api() +socket = api.socket() +# ... use socket +api.close() # Clean up connections + +# Or use context manager (automatic cleanup) +with Api() as api: + socket = api.socket() + # ... use socket +# Automatically closed +``` + +### `collection(self)` + +Get a Collection client for managing data collections. + +Collections organize documents and knowledge graph data into +logical groupings for isolation and access control. + +**Returns:** Collection: Collection management client + +**Example:** + +```python +collection = api.collection() + +# List collections +colls = collection.list_collections(user="trustgraph") + +# Update collection metadata +collection.update_collection( + user="trustgraph", + collection="default", + name="Default Collection", + description="Main data collection" +) +``` + +### `config(self)` + +Get a Config client for managing configuration settings. + +**Returns:** Config: Configuration management client + +**Example:** + +```python +config = api.config() + +# Get configuration values +values = config.get([ConfigKey(type="llm", key="model")]) + +# Set configuration +config.put([ConfigValue(type="llm", key="model", value="gpt-4")]) +``` + +### `flow(self)` + +Get a Flow client for managing and interacting with flows. + +Flows are the primary execution units in TrustGraph, providing access to +services like agents, RAG queries, embeddings, and document processing. + +**Returns:** Flow: Flow management client + +**Example:** + +```python +flow_client = api.flow() + +# List available blueprints +blueprints = flow_client.list_blueprints() + +# Get a specific flow instance +flow_instance = flow_client.id("default") +response = flow_instance.text_completion( + system="You are helpful", + prompt="Hello" +) +``` + +### `knowledge(self)` + +Get a Knowledge client for managing knowledge graph cores. + +**Returns:** Knowledge: Knowledge graph management client + +**Example:** + +```python +knowledge = api.knowledge() + +# List available KG cores +cores = knowledge.list_kg_cores(user="trustgraph") + +# Load a KG core +knowledge.load_kg_core(id="core-123", user="trustgraph") +``` + +### `library(self)` + +Get a Library client for document management. + +The library provides document storage, metadata management, and +processing workflow coordination. + +**Returns:** Library: Document library management client + +**Example:** + +```python +library = api.library() + +# Add a document +library.add_document( + document=b"Document content", + id="doc-123", + metadata=[], + user="trustgraph", + title="My Document", + comments="Test document" +) + +# List documents +docs = library.get_documents(user="trustgraph") +``` + +### `metrics(self)` + +Get a synchronous metrics client for monitoring. + +Retrieves Prometheus-formatted metrics from the TrustGraph service +for monitoring and observability. + +**Returns:** Metrics: Synchronous metrics client + +**Example:** + +```python +metrics = api.metrics() +prometheus_text = metrics.get() +print(prometheus_text) +``` + +### `request(self, path, request)` + +Make a low-level REST API request. + +This method is primarily for internal use but can be used for direct +API access when needed. + +**Arguments:** + +- `path`: API endpoint path (relative to base URL) +- `request`: Request payload as a dictionary + +**Returns:** dict: Response object + +**Raises:** + +- `ProtocolException`: If the response status is not 200 or response is not JSON +- `ApplicationException`: If the response contains an error + +**Example:** + +```python +response = api.request("flow", { + "operation": "list-flows" +}) +``` + +### `socket(self)` + +Get a synchronous WebSocket client for streaming operations. + +WebSocket connections provide streaming support for real-time responses +from agents, RAG queries, and text completions. This method returns a +synchronous wrapper around the WebSocket protocol. + +**Returns:** SocketClient: Synchronous WebSocket client + +**Example:** + +```python +socket = api.socket() +flow = socket.flow("default") + +# Stream agent responses +for chunk in flow.agent( + question="Explain quantum computing", + user="trustgraph", + streaming=True +): + if hasattr(chunk, 'content'): + print(chunk.content, end='', flush=True) +``` + + +--- + +## `Flow` + +```python +from trustgraph.api import Flow +``` + +Flow management client for blueprint and flow instance operations. + +This class provides methods for managing flow blueprints (templates) and +flow instances (running flows). Blueprints define the structure and +parameters of flows, while instances represent active flows that can +execute services. + +### Methods + +### `__init__(self, api)` + +Initialize Flow client. + +**Arguments:** + +- `api`: Parent Api instance for making requests + +### `delete_blueprint(self, blueprint_name)` + +Delete a flow blueprint. + +**Arguments:** + +- `blueprint_name`: Name of the blueprint to delete + +**Example:** + +```python +api.flow().delete_blueprint("old-blueprint") +``` + +### `get(self, id)` + +Get the definition of a running flow instance. + +**Arguments:** + +- `id`: Flow instance ID + +**Returns:** dict: Flow instance definition + +**Example:** + +```python +flow_def = api.flow().get("default") +print(flow_def) +``` + +### `get_blueprint(self, blueprint_name)` + +Get a flow blueprint definition by name. + +**Arguments:** + +- `blueprint_name`: Name of the blueprint to retrieve + +**Returns:** dict: Blueprint definition as a dictionary + +**Example:** + +```python +blueprint = api.flow().get_blueprint("default") +print(blueprint) # Blueprint configuration +``` + +### `id(self, id='default')` + +Get a FlowInstance for executing operations on a specific flow. + +**Arguments:** + +- `id`: Flow identifier (default: "default") + +**Returns:** FlowInstance: Flow instance for service operations + +**Example:** + +```python +flow = api.flow().id("my-flow") +response = flow.text_completion( + system="You are helpful", + prompt="Hello" +) +``` + +### `list(self)` + +List all active flow instances. + +**Returns:** list[str]: List of flow instance IDs + +**Example:** + +```python +flows = api.flow().list() +print(flows) # ['default', 'flow-1', 'flow-2', ...] +``` + +### `list_blueprints(self)` + +List all available flow blueprints. + +**Returns:** list[str]: List of blueprint names + +**Example:** + +```python +blueprints = api.flow().list_blueprints() +print(blueprints) # ['default', 'custom-flow', ...] +``` + +### `put_blueprint(self, blueprint_name, definition)` + +Create or update a flow blueprint. + +**Arguments:** + +- `blueprint_name`: Name for the blueprint +- `definition`: Blueprint definition dictionary + +**Example:** + +```python +definition = { + "services": ["text-completion", "graph-rag"], + "parameters": {"model": "gpt-4"} +} +api.flow().put_blueprint("my-blueprint", definition) +``` + +### `request(self, path=None, request=None)` + +Make a flow-scoped API request. + +**Arguments:** + +- `path`: Optional path suffix for flow endpoints +- `request`: Request payload dictionary + +**Returns:** dict: Response object + +**Raises:** + +- `RuntimeError`: If request parameter is not specified + +### `start(self, blueprint_name, id, description, parameters=None)` + +Start a new flow instance from a blueprint. + +**Arguments:** + +- `blueprint_name`: Name of the blueprint to instantiate +- `id`: Unique identifier for the flow instance +- `description`: Human-readable description +- `parameters`: Optional parameters dictionary + +**Example:** + +```python +api.flow().start( + blueprint_name="default", + id="my-flow", + description="My custom flow", + parameters={"model": "gpt-4"} +) +``` + +### `stop(self, id)` + +Stop a running flow instance. + +**Arguments:** + +- `id`: Flow instance ID to stop + +**Example:** + +```python +api.flow().stop("my-flow") +``` + + +--- + +## `FlowInstance` + +```python +from trustgraph.api import FlowInstance +``` + +Flow instance client for executing services on a specific flow. + +This class provides access to all TrustGraph services including: +- Text completion and embeddings +- Agent operations with state management +- Graph and document RAG queries +- Knowledge graph operations (triples, objects) +- Document loading and processing +- Natural language to GraphQL query conversion +- Structured data analysis and schema detection +- MCP tool execution +- Prompt templating + +Services are accessed through a running flow instance identified by ID. + +### Methods + +### `__init__(self, api, id)` + +Initialize FlowInstance. + +**Arguments:** + +- `api`: Parent Flow client +- `id`: Flow instance identifier + +### `agent(self, question, user='trustgraph', state=None, group=None, history=None)` + +Execute an agent operation with reasoning and tool use capabilities. + +Agents can perform multi-step reasoning, use tools, and maintain conversation +state across interactions. This is a synchronous non-streaming version. + +**Arguments:** + +- `question`: User question or instruction +- `user`: User identifier (default: "trustgraph") +- `state`: Optional state dictionary for stateful conversations +- `group`: Optional group identifier for multi-user contexts +- `history`: Optional conversation history as list of message dicts + +**Returns:** str: Agent's final answer + +**Example:** + +```python +flow = api.flow().id("default") + +# Simple question +answer = flow.agent( + question="What is the capital of France?", + user="trustgraph" +) + +# With conversation history +history = [ + {"role": "user", "content": "Hello"}, + {"role": "assistant", "content": "Hi! How can I help?"} +] +answer = flow.agent( + question="Tell me about Paris", + user="trustgraph", + history=history +) +``` + +### `detect_type(self, sample)` + +Detect the data type of a structured data sample. + +**Arguments:** + +- `sample`: Data sample to analyze (string content) + +**Returns:** dict with detected_type, confidence, and optional metadata + +### `diagnose_data(self, sample, schema_name=None, options=None)` + +Perform combined data diagnosis: detect type and generate descriptor. + +**Arguments:** + +- `sample`: Data sample to analyze (string content) +- `schema_name`: Optional target schema name for descriptor generation +- `options`: Optional parameters (e.g., delimiter for CSV) + +**Returns:** dict with detected_type, confidence, descriptor, and metadata + +### `document_rag(self, query, user='trustgraph', collection='default', doc_limit=10)` + +Execute document-based Retrieval-Augmented Generation (RAG) query. + +Document RAG uses vector embeddings to find relevant document chunks, +then generates a response using an LLM with those chunks as context. + +**Arguments:** + +- `query`: Natural language query +- `user`: User/keyspace identifier (default: "trustgraph") +- `collection`: Collection identifier (default: "default") +- `doc_limit`: Maximum document chunks to retrieve (default: 10) + +**Returns:** str: Generated response incorporating document context + +**Example:** + +```python +flow = api.flow().id("default") +response = flow.document_rag( + query="Summarize the key findings", + user="trustgraph", + collection="research-papers", + doc_limit=5 +) +print(response) +``` + +### `embeddings(self, text)` + +Generate vector embeddings for text. + +Converts text into dense vector representations suitable for semantic +search and similarity comparison. + +**Arguments:** + +- `text`: Input text to embed + +**Returns:** list[float]: Vector embedding + +**Example:** + +```python +flow = api.flow().id("default") +vectors = flow.embeddings("quantum computing") +print(f"Embedding dimension: {len(vectors)}") +``` + +### `generate_descriptor(self, sample, data_type, schema_name, options=None)` + +Generate a descriptor for structured data mapping to a specific schema. + +**Arguments:** + +- `sample`: Data sample to analyze (string content) +- `data_type`: Data type (csv, json, xml) +- `schema_name`: Target schema name for descriptor generation +- `options`: Optional parameters (e.g., delimiter for CSV) + +**Returns:** dict with descriptor and metadata + +### `graph_embeddings_query(self, text, user, collection, limit=10)` + +Query knowledge graph entities using semantic similarity. + +Finds entities in the knowledge graph whose descriptions are semantically +similar to the input text, using vector embeddings. + +**Arguments:** + +- `text`: Query text for semantic search +- `user`: User/keyspace identifier +- `collection`: Collection identifier +- `limit`: Maximum number of results (default: 10) + +**Returns:** dict: Query results with similar entities + +**Example:** + +```python +flow = api.flow().id("default") +results = flow.graph_embeddings_query( + text="physicist who discovered radioactivity", + user="trustgraph", + collection="scientists", + limit=5 +) +``` + +### `graph_rag(self, query, user='trustgraph', collection='default', entity_limit=50, triple_limit=30, max_subgraph_size=150, max_path_length=2)` + +Execute graph-based Retrieval-Augmented Generation (RAG) query. + +Graph RAG uses knowledge graph structure to find relevant context by +traversing entity relationships, then generates a response using an LLM. + +**Arguments:** + +- `query`: Natural language query +- `user`: User/keyspace identifier (default: "trustgraph") +- `collection`: Collection identifier (default: "default") +- `entity_limit`: Maximum entities to retrieve (default: 50) +- `triple_limit`: Maximum triples per entity (default: 30) +- `max_subgraph_size`: Maximum total triples in subgraph (default: 150) +- `max_path_length`: Maximum traversal depth (default: 2) + +**Returns:** str: Generated response incorporating graph context + +**Example:** + +```python +flow = api.flow().id("default") +response = flow.graph_rag( + query="Tell me about Marie Curie's discoveries", + user="trustgraph", + collection="scientists", + entity_limit=20, + max_path_length=3 +) +print(response) +``` + +### `load_document(self, document, id=None, metadata=None, user=None, collection=None)` + +Load a binary document for processing. + +Uploads a document (PDF, DOCX, images, etc.) for extraction and +processing through the flow's document pipeline. + +**Arguments:** + +- `document`: Document content as bytes +- `id`: Optional document identifier (auto-generated if None) +- `metadata`: Optional metadata (list of Triples or object with emit method) +- `user`: User/keyspace identifier (optional) +- `collection`: Collection identifier (optional) + +**Returns:** dict: Processing response + +**Raises:** + +- `RuntimeError`: If metadata is provided without id + +**Example:** + +```python +flow = api.flow().id("default") + +# Load a PDF document +with open("research.pdf", "rb") as f: + result = flow.load_document( + document=f.read(), + id="research-001", + user="trustgraph", + collection="papers" + ) +``` + +### `load_text(self, text, id=None, metadata=None, charset='utf-8', user=None, collection=None)` + +Load text content for processing. + +Uploads text content for extraction and processing through the flow's +text pipeline. + +**Arguments:** + +- `text`: Text content as bytes +- `id`: Optional document identifier (auto-generated if None) +- `metadata`: Optional metadata (list of Triples or object with emit method) +- `charset`: Character encoding (default: "utf-8") +- `user`: User/keyspace identifier (optional) +- `collection`: Collection identifier (optional) + +**Returns:** dict: Processing response + +**Raises:** + +- `RuntimeError`: If metadata is provided without id + +**Example:** + +```python +flow = api.flow().id("default") + +# Load text content +text_content = b"This is the document content..." +result = flow.load_text( + text=text_content, + id="text-001", + charset="utf-8", + user="trustgraph", + collection="documents" +) +``` + +### `mcp_tool(self, name, parameters={})` + +Execute a Model Context Protocol (MCP) tool. + +MCP tools provide extensible functionality for agents and workflows, +allowing integration with external systems and services. + +**Arguments:** + +- `name`: Tool name/identifier +- `parameters`: Tool parameters dictionary (default: {}) + +**Returns:** str or dict: Tool execution result + +**Raises:** + +- `ProtocolException`: If response format is invalid + +**Example:** + +```python +flow = api.flow().id("default") + +# Execute a tool +result = flow.mcp_tool( + name="search-web", + parameters={"query": "latest AI news", "limit": 5} +) +``` + +### `nlp_query(self, question, max_results=100)` + +Convert a natural language question to a GraphQL query. + +**Arguments:** + +- `question`: Natural language question +- `max_results`: Maximum number of results to return (default: 100) + +**Returns:** dict with graphql_query, variables, detected_schemas, confidence + +### `objects_query(self, query, user='trustgraph', collection='default', variables=None, operation_name=None)` + +Execute a GraphQL query against structured objects in the knowledge graph. + +Queries structured data using GraphQL syntax, allowing complex queries +with filtering, aggregation, and relationship traversal. + +**Arguments:** + +- `query`: GraphQL query string +- `user`: User/keyspace identifier (default: "trustgraph") +- `collection`: Collection identifier (default: "default") +- `variables`: Optional query variables dictionary +- `operation_name`: Optional operation name for multi-operation documents + +**Returns:** dict: GraphQL response with 'data', 'errors', and/or 'extensions' fields + +**Raises:** + +- `ProtocolException`: If system-level error occurs + +**Example:** + +```python +flow = api.flow().id("default") + +# Simple query +query = ''' +{ + scientists(limit: 10) { + name + field + discoveries + } +} +''' +result = flow.objects_query( + query=query, + user="trustgraph", + collection="scientists" +) + +# Query with variables +query = ''' +query GetScientist($name: String!) { + scientists(name: $name) { + name + nobelPrizes + } +} +''' +result = flow.objects_query( + query=query, + variables={"name": "Marie Curie"} +) +``` + +### `prompt(self, id, variables)` + +Execute a prompt template with variable substitution. + +Prompt templates allow reusable prompt patterns with dynamic variable +substitution, useful for consistent prompt engineering. + +**Arguments:** + +- `id`: Prompt template identifier +- `variables`: Dictionary of variable name to value mappings + +**Returns:** str or dict: Rendered prompt result (text or structured object) + +**Raises:** + +- `ProtocolException`: If response format is invalid + +**Example:** + +```python +flow = api.flow().id("default") + +# Text template +result = flow.prompt( + id="summarize-template", + variables={"topic": "quantum computing", "length": "brief"} +) + +# Structured template +result = flow.prompt( + id="extract-entities", + variables={"text": "Marie Curie won Nobel Prizes"} +) +``` + +### `request(self, path, request)` + +Make a service request on this flow instance. + +**Arguments:** + +- `path`: Service path (e.g., "service/text-completion") +- `request`: Request payload dictionary + +**Returns:** dict: Service response + +### `schema_selection(self, sample, options=None)` + +Select matching schemas for a data sample using prompt analysis. + +**Arguments:** + +- `sample`: Data sample to analyze (string content) +- `options`: Optional parameters + +**Returns:** dict with schema_matches array and metadata + +### `structured_query(self, question, user='trustgraph', collection='default')` + +Execute a natural language question against structured data. +Combines NLP query conversion and GraphQL execution. + +**Arguments:** + +- `question`: Natural language question +- `user`: Cassandra keyspace identifier (default: "trustgraph") +- `collection`: Data collection identifier (default: "default") + +**Returns:** dict with data and optional errors + +### `text_completion(self, system, prompt)` + +Execute text completion using the flow's LLM. + +**Arguments:** + +- `system`: System prompt defining the assistant's behavior +- `prompt`: User prompt/question + +**Returns:** str: Generated response text + +**Example:** + +```python +flow = api.flow().id("default") +response = flow.text_completion( + system="You are a helpful assistant", + prompt="What is quantum computing?" +) +print(response) +``` + +### `triples_query(self, s=None, p=None, o=None, user=None, collection=None, limit=10000)` + +Query knowledge graph triples using pattern matching. + +Searches for RDF triples matching the given subject, predicate, and/or +object patterns. Unspecified parameters act as wildcards. + +**Arguments:** + +- `s`: Subject URI (optional, use None for wildcard) +- `p`: Predicate URI (optional, use None for wildcard) +- `o`: Object URI or Literal (optional, use None for wildcard) +- `user`: User/keyspace identifier (optional) +- `collection`: Collection identifier (optional) +- `limit`: Maximum results to return (default: 10000) + +**Returns:** list[Triple]: List of matching Triple objects + +**Raises:** + +- `RuntimeError`: If s or p is not a Uri, or o is not Uri/Literal + +**Example:** + +```python +from trustgraph.knowledge import Uri, Literal + +flow = api.flow().id("default") + +# Find all triples about a specific subject +triples = flow.triples_query( + s=Uri("http://example.org/person/marie-curie"), + user="trustgraph", + collection="scientists" +) + +# Find all instances of a specific relationship +triples = flow.triples_query( + p=Uri("http://example.org/ontology/discovered"), + limit=100 +) +``` + + +--- + +## `AsyncFlow` + +```python +from trustgraph.api import AsyncFlow +``` + +Asynchronous REST-based flow interface + +### Methods + +### `__init__(self, url: str, timeout: int, token: Optional[str]) -> None` + +Initialize self. See help(type(self)) for accurate signature. + +### `aclose(self) -> None` + +Close connection (cleanup handled by aiohttp session) + +### `delete_class(self, class_name: str)` + +Delete flow class + +### `get(self, id: str) -> Dict[str, Any]` + +Get flow definition + +### `get_class(self, class_name: str) -> Dict[str, Any]` + +Get flow class definition + +### `id(self, flow_id: str)` + +Get async flow instance + +### `list(self) -> List[str]` + +List all flows + +### `list_classes(self) -> List[str]` + +List flow classes + +### `put_class(self, class_name: str, definition: Dict[str, Any])` + +Create/update flow class + +### `request(self, path: str, request_data: Dict[str, Any]) -> Dict[str, Any]` + +Make async HTTP request to Gateway API + +### `start(self, class_name: str, id: str, description: str, parameters: Optional[Dict] = None)` + +Start a flow + +### `stop(self, id: str)` + +Stop a flow + + +--- + +## `AsyncFlowInstance` + +```python +from trustgraph.api import AsyncFlowInstance +``` + +Asynchronous REST flow instance + +### Methods + +### `__init__(self, flow: trustgraph.api.async_flow.AsyncFlow, flow_id: str)` + +Initialize self. See help(type(self)) for accurate signature. + +### `agent(self, question: str, user: str, state: Optional[Dict] = None, group: Optional[str] = None, history: Optional[List] = None, **kwargs: Any) -> Dict[str, Any]` + +Execute agent (non-streaming, use async_socket for streaming) + +### `document_rag(self, query: str, user: str, collection: str, doc_limit: int = 10, **kwargs: Any) -> str` + +Document RAG (non-streaming, use async_socket for streaming) + +### `embeddings(self, text: str, **kwargs: Any)` + +Generate text embeddings + +### `graph_embeddings_query(self, text: str, user: str, collection: str, limit: int = 10, **kwargs: Any)` + +Query graph embeddings for semantic search + +### `graph_rag(self, query: str, user: str, collection: str, max_subgraph_size: int = 1000, max_subgraph_count: int = 5, max_entity_distance: int = 3, **kwargs: Any) -> str` + +Graph RAG (non-streaming, use async_socket for streaming) + +### `objects_query(self, query: str, user: str, collection: str, variables: Optional[Dict] = None, operation_name: Optional[str] = None, **kwargs: Any)` + +GraphQL query + +### `request(self, service: str, request_data: Dict[str, Any]) -> Dict[str, Any]` + +Make request to flow-scoped service + +### `text_completion(self, system: str, prompt: str, **kwargs: Any) -> str` + +Text completion (non-streaming, use async_socket for streaming) + +### `triples_query(self, s=None, p=None, o=None, user=None, collection=None, limit=100, **kwargs: Any)` + +Triple pattern query + + +--- + +## `SocketClient` + +```python +from trustgraph.api import SocketClient +``` + +Synchronous WebSocket client (wraps async websockets library) + +### Methods + +### `__init__(self, url: str, timeout: int, token: Optional[str]) -> None` + +Initialize self. See help(type(self)) for accurate signature. + +### `close(self) -> None` + +Close WebSocket connection + +### `flow(self, flow_id: str) -> 'SocketFlowInstance'` + +Get flow instance for WebSocket operations + + +--- + +## `SocketFlowInstance` + +```python +from trustgraph.api import SocketFlowInstance +``` + +Synchronous WebSocket flow instance with same interface as REST FlowInstance + +### Methods + +### `__init__(self, client: trustgraph.api.socket_client.SocketClient, flow_id: str) -> None` + +Initialize self. See help(type(self)) for accurate signature. + +### `agent(self, question: str, user: str, state: Optional[Dict[str, Any]] = None, group: Optional[str] = None, history: Optional[List[Dict[str, Any]]] = None, streaming: bool = False, **kwargs: Any) -> Union[Dict[str, Any], Iterator[trustgraph.api.types.StreamingChunk]]` + +Agent with optional streaming + +### `document_rag(self, query: str, user: str, collection: str, doc_limit: int = 10, streaming: bool = False, **kwargs: Any) -> Union[str, Iterator[str]]` + +Document RAG with optional streaming + +### `embeddings(self, text: str, **kwargs: Any) -> Dict[str, Any]` + +Generate text embeddings + +### `graph_embeddings_query(self, text: str, user: str, collection: str, limit: int = 10, **kwargs: Any) -> Dict[str, Any]` + +Query graph embeddings for semantic search + +### `graph_rag(self, query: str, user: str, collection: str, max_subgraph_size: int = 1000, max_subgraph_count: int = 5, max_entity_distance: int = 3, streaming: bool = False, **kwargs: Any) -> Union[str, Iterator[str]]` + +Graph RAG with optional streaming + +### `mcp_tool(self, name: str, parameters: Dict[str, Any], **kwargs: Any) -> Dict[str, Any]` + +Execute MCP tool + +### `objects_query(self, query: str, user: str, collection: str, variables: Optional[Dict[str, Any]] = None, operation_name: Optional[str] = None, **kwargs: Any) -> Dict[str, Any]` + +GraphQL query + +### `prompt(self, id: str, variables: Dict[str, str], streaming: bool = False, **kwargs: Any) -> Union[str, Iterator[str]]` + +Execute prompt with optional streaming + +### `text_completion(self, system: str, prompt: str, streaming: bool = False, **kwargs) -> Union[str, Iterator[str]]` + +Text completion with optional streaming + +### `triples_query(self, s: Optional[str] = None, p: Optional[str] = None, o: Optional[str] = None, user: Optional[str] = None, collection: Optional[str] = None, limit: int = 100, **kwargs: Any) -> Dict[str, Any]` + +Triple pattern query + + +--- + +## `AsyncSocketClient` + +```python +from trustgraph.api import AsyncSocketClient +``` + +Asynchronous WebSocket client + +### Methods + +### `__init__(self, url: str, timeout: int, token: Optional[str])` + +Initialize self. See help(type(self)) for accurate signature. + +### `aclose(self)` + +Close WebSocket connection + +### `flow(self, flow_id: str)` + +Get async flow instance for WebSocket operations + + +--- + +## `AsyncSocketFlowInstance` + +```python +from trustgraph.api import AsyncSocketFlowInstance +``` + +Asynchronous WebSocket flow instance + +### Methods + +### `__init__(self, client: trustgraph.api.async_socket_client.AsyncSocketClient, flow_id: str)` + +Initialize self. See help(type(self)) for accurate signature. + +### `agent(self, question: str, user: str, state: Optional[Dict[str, Any]] = None, group: Optional[str] = None, history: Optional[list] = None, streaming: bool = False, **kwargs) -> Union[Dict[str, Any], AsyncIterator]` + +Agent with optional streaming + +### `document_rag(self, query: str, user: str, collection: str, doc_limit: int = 10, streaming: bool = False, **kwargs)` + +Document RAG with optional streaming + +### `embeddings(self, text: str, **kwargs)` + +Generate text embeddings + +### `graph_embeddings_query(self, text: str, user: str, collection: str, limit: int = 10, **kwargs)` + +Query graph embeddings for semantic search + +### `graph_rag(self, query: str, user: str, collection: str, max_subgraph_size: int = 1000, max_subgraph_count: int = 5, max_entity_distance: int = 3, streaming: bool = False, **kwargs)` + +Graph RAG with optional streaming + +### `mcp_tool(self, name: str, parameters: Dict[str, Any], **kwargs)` + +Execute MCP tool + +### `objects_query(self, query: str, user: str, collection: str, variables: Optional[Dict] = None, operation_name: Optional[str] = None, **kwargs)` + +GraphQL query + +### `prompt(self, id: str, variables: Dict[str, str], streaming: bool = False, **kwargs)` + +Execute prompt with optional streaming + +### `text_completion(self, system: str, prompt: str, streaming: bool = False, **kwargs)` + +Text completion with optional streaming + +### `triples_query(self, s=None, p=None, o=None, user=None, collection=None, limit=100, **kwargs)` + +Triple pattern query + + +--- + +## `BulkClient` + +```python +from trustgraph.api import BulkClient +``` + +Synchronous bulk operations client + +### Methods + +### `__init__(self, url: str, timeout: int, token: Optional[str]) -> None` + +Initialize self. See help(type(self)) for accurate signature. + +### `close(self) -> None` + +Close connections + +### `export_document_embeddings(self, flow: str, **kwargs: Any) -> Iterator[Dict[str, Any]]` + +Bulk export document embeddings via WebSocket + +### `export_entity_contexts(self, flow: str, **kwargs: Any) -> Iterator[Dict[str, Any]]` + +Bulk export entity contexts via WebSocket + +### `export_graph_embeddings(self, flow: str, **kwargs: Any) -> Iterator[Dict[str, Any]]` + +Bulk export graph embeddings via WebSocket + +### `export_triples(self, flow: str, **kwargs: Any) -> Iterator[trustgraph.api.types.Triple]` + +Bulk export triples via WebSocket + +### `import_document_embeddings(self, flow: str, embeddings: Iterator[Dict[str, Any]], **kwargs: Any) -> None` + +Bulk import document embeddings via WebSocket + +### `import_entity_contexts(self, flow: str, contexts: Iterator[Dict[str, Any]], **kwargs: Any) -> None` + +Bulk import entity contexts via WebSocket + +### `import_graph_embeddings(self, flow: str, embeddings: Iterator[Dict[str, Any]], **kwargs: Any) -> None` + +Bulk import graph embeddings via WebSocket + +### `import_objects(self, flow: str, objects: Iterator[Dict[str, Any]], **kwargs: Any) -> None` + +Bulk import objects via WebSocket + +### `import_triples(self, flow: str, triples: Iterator[trustgraph.api.types.Triple], **kwargs: Any) -> None` + +Bulk import triples via WebSocket + + +--- + +## `AsyncBulkClient` + +```python +from trustgraph.api import AsyncBulkClient +``` + +Asynchronous bulk operations client + +### Methods + +### `__init__(self, url: str, timeout: int, token: Optional[str]) -> None` + +Initialize self. See help(type(self)) for accurate signature. + +### `aclose(self) -> None` + +Close connections + +### `export_document_embeddings(self, flow: str, **kwargs: Any) -> AsyncIterator[Dict[str, Any]]` + +Bulk export document embeddings via WebSocket + +### `export_entity_contexts(self, flow: str, **kwargs: Any) -> AsyncIterator[Dict[str, Any]]` + +Bulk export entity contexts via WebSocket + +### `export_graph_embeddings(self, flow: str, **kwargs: Any) -> AsyncIterator[Dict[str, Any]]` + +Bulk export graph embeddings via WebSocket + +### `export_triples(self, flow: str, **kwargs: Any) -> AsyncIterator[trustgraph.api.types.Triple]` + +Bulk export triples via WebSocket + +### `import_document_embeddings(self, flow: str, embeddings: AsyncIterator[Dict[str, Any]], **kwargs: Any) -> None` + +Bulk import document embeddings via WebSocket + +### `import_entity_contexts(self, flow: str, contexts: AsyncIterator[Dict[str, Any]], **kwargs: Any) -> None` + +Bulk import entity contexts via WebSocket + +### `import_graph_embeddings(self, flow: str, embeddings: AsyncIterator[Dict[str, Any]], **kwargs: Any) -> None` + +Bulk import graph embeddings via WebSocket + +### `import_objects(self, flow: str, objects: AsyncIterator[Dict[str, Any]], **kwargs: Any) -> None` + +Bulk import objects via WebSocket + +### `import_triples(self, flow: str, triples: AsyncIterator[trustgraph.api.types.Triple], **kwargs: Any) -> None` + +Bulk import triples via WebSocket + + +--- + +## `Metrics` + +```python +from trustgraph.api import Metrics +``` + +Synchronous metrics client + +### Methods + +### `__init__(self, url: str, timeout: int, token: Optional[str]) -> None` + +Initialize self. See help(type(self)) for accurate signature. + +### `get(self) -> str` + +Get Prometheus metrics as text + + +--- + +## `AsyncMetrics` + +```python +from trustgraph.api import AsyncMetrics +``` + +Asynchronous metrics client + +### Methods + +### `__init__(self, url: str, timeout: int, token: Optional[str]) -> None` + +Initialize self. See help(type(self)) for accurate signature. + +### `aclose(self) -> None` + +Close connections + +### `get(self) -> str` + +Get Prometheus metrics as text + + +--- + +## `Triple` + +```python +from trustgraph.api import Triple +``` + +RDF triple representing a knowledge graph statement. + +**Fields:** + +- `s`: +- `p`: +- `o`: + +### Methods + +### `__init__(self, s: str, p: str, o: str) -> None` + +Initialize self. See help(type(self)) for accurate signature. + + +--- + +## `ConfigKey` + +```python +from trustgraph.api import ConfigKey +``` + +Configuration key identifier. + +**Fields:** + +- `type`: +- `key`: + +### Methods + +### `__init__(self, type: str, key: str) -> None` + +Initialize self. See help(type(self)) for accurate signature. + + +--- + +## `ConfigValue` + +```python +from trustgraph.api import ConfigValue +``` + +Configuration key-value pair. + +**Fields:** + +- `type`: +- `key`: +- `value`: + +### Methods + +### `__init__(self, type: str, key: str, value: str) -> None` + +Initialize self. See help(type(self)) for accurate signature. + + +--- + +## `DocumentMetadata` + +```python +from trustgraph.api import DocumentMetadata +``` + +Metadata for a document in the library. + +**Fields:** + +- `id`: +- `time`: +- `kind`: +- `title`: +- `comments`: +- `metadata`: typing.List[trustgraph.api.types.Triple] +- `user`: +- `tags`: typing.List[str] + +### Methods + +### `__init__(self, id: str, time: datetime.datetime, kind: str, title: str, comments: str, metadata: List[trustgraph.api.types.Triple], user: str, tags: List[str]) -> None` + +Initialize self. See help(type(self)) for accurate signature. + + +--- + +## `ProcessingMetadata` + +```python +from trustgraph.api import ProcessingMetadata +``` + +Metadata for an active document processing job. + +**Fields:** + +- `id`: +- `document_id`: +- `time`: +- `flow`: +- `user`: +- `collection`: +- `tags`: typing.List[str] + +### Methods + +### `__init__(self, id: str, document_id: str, time: datetime.datetime, flow: str, user: str, collection: str, tags: List[str]) -> None` + +Initialize self. See help(type(self)) for accurate signature. + + +--- + +## `CollectionMetadata` + +```python +from trustgraph.api import CollectionMetadata +``` + +Metadata for a data collection. + +Collections provide logical grouping and isolation for documents and +knowledge graph data. + +**Attributes:** + +- `name: Human`: readable collection name + +**Fields:** + +- `user`: +- `collection`: +- `name`: +- `description`: +- `tags`: typing.List[str] + +### Methods + +### `__init__(self, user: str, collection: str, name: str, description: str, tags: List[str]) -> None` + +Initialize self. See help(type(self)) for accurate signature. + + +--- + +## `StreamingChunk` + +```python +from trustgraph.api import StreamingChunk +``` + +Base class for streaming response chunks. + +Used for WebSocket-based streaming operations where responses are delivered +incrementally as they are generated. + +**Fields:** + +- `content`: +- `end_of_message`: + +### Methods + +### `__init__(self, content: str, end_of_message: bool = False) -> None` + +Initialize self. See help(type(self)) for accurate signature. + + +--- + +## `AgentThought` + +```python +from trustgraph.api import AgentThought +``` + +Agent reasoning/thought process chunk. + +Represents the agent's internal reasoning or planning steps during execution. +These chunks show how the agent is thinking about the problem. + +**Fields:** + +- `content`: +- `end_of_message`: +- `chunk_type`: + +### Methods + +### `__init__(self, content: str, end_of_message: bool = False, chunk_type: str = 'thought') -> None` + +Initialize self. See help(type(self)) for accurate signature. + + +--- + +## `AgentObservation` + +```python +from trustgraph.api import AgentObservation +``` + +Agent tool execution observation chunk. + +Represents the result or observation from executing a tool or action. +These chunks show what the agent learned from using tools. + +**Fields:** + +- `content`: +- `end_of_message`: +- `chunk_type`: + +### Methods + +### `__init__(self, content: str, end_of_message: bool = False, chunk_type: str = 'observation') -> None` + +Initialize self. See help(type(self)) for accurate signature. + + +--- + +## `AgentAnswer` + +```python +from trustgraph.api import AgentAnswer +``` + +Agent final answer chunk. + +Represents the agent's final response to the user's query after completing +its reasoning and tool use. + +**Attributes:** + +- `chunk_type: Always "final`: answer" + +**Fields:** + +- `content`: +- `end_of_message`: +- `chunk_type`: +- `end_of_dialog`: + +### Methods + +### `__init__(self, content: str, end_of_message: bool = False, chunk_type: str = 'final-answer', end_of_dialog: bool = False) -> None` + +Initialize self. See help(type(self)) for accurate signature. + + +--- + +## `RAGChunk` + +```python +from trustgraph.api import RAGChunk +``` + +RAG (Retrieval-Augmented Generation) streaming chunk. + +Used for streaming responses from graph RAG, document RAG, text completion, +and other generative services. + +**Fields:** + +- `content`: +- `end_of_message`: +- `chunk_type`: +- `end_of_stream`: +- `error`: typing.Optional[typing.Dict[str, str]] + +### Methods + +### `__init__(self, content: str, end_of_message: bool = False, chunk_type: str = 'rag', end_of_stream: bool = False, error: Optional[Dict[str, str]] = None) -> None` + +Initialize self. See help(type(self)) for accurate signature. + + +--- + +## `ProtocolException` + +```python +from trustgraph.api import ProtocolException +``` + +Raised when WebSocket protocol errors occur + + +--- + +## `TrustGraphException` + +```python +from trustgraph.api import TrustGraphException +``` + +Base class for all TrustGraph service errors + + +--- + +## `AgentError` + +```python +from trustgraph.api import AgentError +``` + +Agent service error + + +--- + +## `ConfigError` + +```python +from trustgraph.api import ConfigError +``` + +Configuration service error + + +--- + +## `DocumentRagError` + +```python +from trustgraph.api import DocumentRagError +``` + +Document RAG retrieval error + + +--- + +## `FlowError` + +```python +from trustgraph.api import FlowError +``` + +Flow management error + + +--- + +## `GatewayError` + +```python +from trustgraph.api import GatewayError +``` + +API Gateway error + + +--- + +## `GraphRagError` + +```python +from trustgraph.api import GraphRagError +``` + +Graph RAG retrieval error + + +--- + +## `LLMError` + +```python +from trustgraph.api import LLMError +``` + +LLM service error + + +--- + +## `LoadError` + +```python +from trustgraph.api import LoadError +``` + +Data loading error + + +--- + +## `LookupError` + +```python +from trustgraph.api import LookupError +``` + +Lookup/search error + + +--- + +## `NLPQueryError` + +```python +from trustgraph.api import NLPQueryError +``` + +NLP query service error + + +--- + +## `ObjectsQueryError` + +```python +from trustgraph.api import ObjectsQueryError +``` + +Objects query service error + + +--- + +## `RequestError` + +```python +from trustgraph.api import RequestError +``` + +Request processing error + + +--- + +## `StructuredQueryError` + +```python +from trustgraph.api import StructuredQueryError +``` + +Structured query service error + + +--- + +## `UnexpectedError` + +```python +from trustgraph.api import UnexpectedError +``` + +Unexpected/unknown error + + +--- + +## `ApplicationException` + +```python +from trustgraph.api import ApplicationException +``` + +Base class for all TrustGraph service errors + + +--- + diff --git a/trustgraph-base/trustgraph/api/__init__.py b/trustgraph-base/trustgraph/api/__init__.py index 0ecb760e..93466cd2 100644 --- a/trustgraph-base/trustgraph/api/__init__.py +++ b/trustgraph-base/trustgraph/api/__init__.py @@ -1,3 +1,55 @@ +""" +TrustGraph API Client Library + +This package provides Python client interfaces for interacting with TrustGraph services. +TrustGraph is a knowledge graph and RAG (Retrieval-Augmented Generation) platform that +combines graph databases, vector embeddings, and LLM capabilities. + +The library offers both synchronous and asynchronous APIs for: +- Flow management and execution +- Knowledge graph operations (triples, entities, embeddings) +- RAG queries (graph-based and document-based) +- Agent interactions with streaming support +- WebSocket-based real-time communication +- Bulk import/export operations +- Configuration and collection management + +Quick Start: + ```python + from trustgraph.api import Api + + # Create API client + api = Api(url="http://localhost:8088/") + + # Get a flow instance + flow = api.flow().id("default") + + # Execute a graph RAG query + response = flow.graph_rag( + query="What are the main topics?", + user="trustgraph", + collection="default" + ) + ``` + +For streaming and async operations: + ```python + # WebSocket streaming + socket = api.socket() + flow = socket.flow("default") + + for chunk in flow.agent(question="Hello", user="trustgraph"): + print(chunk.content) + + # Async operations + async with Api(url="http://localhost:8088/") as api: + async_flow = api.async_flow() + result = await async_flow.id("default").text_completion( + system="You are helpful", + prompt="Hello" + ) + ``` +""" # Core API from .api import Api diff --git a/trustgraph-base/trustgraph/api/api.py b/trustgraph-base/trustgraph/api/api.py index d1f07513..dbdce0a8 100644 --- a/trustgraph-base/trustgraph/api/api.py +++ b/trustgraph-base/trustgraph/api/api.py @@ -1,3 +1,8 @@ +""" +TrustGraph API Client + +Core API client for interacting with TrustGraph services via REST and WebSocket protocols. +""" import requests import json @@ -26,8 +31,47 @@ def check_error(response): raise ApplicationException(f"{tp}: {msg}") class Api: + """ + Main TrustGraph API client for synchronous and asynchronous operations. + + This class provides access to all TrustGraph services including flow management, + knowledge graph operations, document processing, RAG queries, and more. It supports + both REST-based and WebSocket-based communication patterns. + + The client can be used as a context manager for automatic resource cleanup: + ```python + with Api(url="http://localhost:8088/") as api: + result = api.flow().id("default").graph_rag(query="test") + ``` + + Attributes: + url: Base URL for the TrustGraph API endpoint + timeout: Request timeout in seconds + token: Optional bearer token for authentication + """ def __init__(self, url="http://localhost:8088/", timeout=60, token: Optional[str] = None): + """ + Initialize the TrustGraph API client. + + Args: + url: Base URL for TrustGraph API (default: "http://localhost:8088/") + timeout: Request timeout in seconds (default: 60) + token: Optional bearer token for authentication + + Example: + ```python + # Local development + api = Api() + + # Production with authentication + api = Api( + url="https://trustgraph.example.com/", + timeout=120, + token="your-api-token" + ) + ``` + """ self.url = url @@ -49,15 +93,97 @@ class Api: self._async_metrics = None def flow(self): + """ + Get a Flow client for managing and interacting with flows. + + Flows are the primary execution units in TrustGraph, providing access to + services like agents, RAG queries, embeddings, and document processing. + + Returns: + Flow: Flow management client + + Example: + ```python + flow_client = api.flow() + + # List available blueprints + blueprints = flow_client.list_blueprints() + + # Get a specific flow instance + flow_instance = flow_client.id("default") + response = flow_instance.text_completion( + system="You are helpful", + prompt="Hello" + ) + ``` + """ return Flow(api=self) def config(self): + """ + Get a Config client for managing configuration settings. + + Returns: + Config: Configuration management client + + Example: + ```python + config = api.config() + + # Get configuration values + values = config.get([ConfigKey(type="llm", key="model")]) + + # Set configuration + config.put([ConfigValue(type="llm", key="model", value="gpt-4")]) + ``` + """ return Config(api=self) def knowledge(self): + """ + Get a Knowledge client for managing knowledge graph cores. + + Returns: + Knowledge: Knowledge graph management client + + Example: + ```python + knowledge = api.knowledge() + + # List available KG cores + cores = knowledge.list_kg_cores(user="trustgraph") + + # Load a KG core + knowledge.load_kg_core(id="core-123", user="trustgraph") + ``` + """ return Knowledge(api=self) def request(self, path, request): + """ + Make a low-level REST API request. + + This method is primarily for internal use but can be used for direct + API access when needed. + + Args: + path: API endpoint path (relative to base URL) + request: Request payload as a dictionary + + Returns: + dict: Response object + + Raises: + ProtocolException: If the response status is not 200 or response is not JSON + ApplicationException: If the response contains an error + + Example: + ```python + response = api.request("flow", { + "operation": "list-flows" + }) + ``` + """ url = f"{self.url}{path}" @@ -83,14 +209,90 @@ class Api: return object def library(self): + """ + Get a Library client for document management. + + The library provides document storage, metadata management, and + processing workflow coordination. + + Returns: + Library: Document library management client + + Example: + ```python + library = api.library() + + # Add a document + library.add_document( + document=b"Document content", + id="doc-123", + metadata=[], + user="trustgraph", + title="My Document", + comments="Test document" + ) + + # List documents + docs = library.get_documents(user="trustgraph") + ``` + """ return Library(self) def collection(self): + """ + Get a Collection client for managing data collections. + + Collections organize documents and knowledge graph data into + logical groupings for isolation and access control. + + Returns: + Collection: Collection management client + + Example: + ```python + collection = api.collection() + + # List collections + colls = collection.list_collections(user="trustgraph") + + # Update collection metadata + collection.update_collection( + user="trustgraph", + collection="default", + name="Default Collection", + description="Main data collection" + ) + ``` + """ return Collection(self) # New synchronous methods def socket(self): - """Synchronous WebSocket-based interface for streaming operations""" + """ + Get a synchronous WebSocket client for streaming operations. + + WebSocket connections provide streaming support for real-time responses + from agents, RAG queries, and text completions. This method returns a + synchronous wrapper around the WebSocket protocol. + + Returns: + SocketClient: Synchronous WebSocket client + + Example: + ```python + socket = api.socket() + flow = socket.flow("default") + + # Stream agent responses + for chunk in flow.agent( + question="Explain quantum computing", + user="trustgraph", + streaming=True + ): + if hasattr(chunk, 'content'): + print(chunk.content, end='', flush=True) + ``` + """ if self._socket_client is None: from . socket_client import SocketClient # Extract base URL (remove api/v1/ suffix) @@ -99,7 +301,31 @@ class Api: return self._socket_client def bulk(self): - """Synchronous bulk operations interface for import/export""" + """ + Get a synchronous bulk operations client for import/export. + + Bulk operations allow efficient transfer of large datasets via WebSocket + connections, including triples, embeddings, entity contexts, and objects. + + Returns: + BulkClient: Synchronous bulk operations client + + Example: + ```python + bulk = api.bulk() + + # Export triples + for triple in bulk.export_triples(flow="default"): + print(f"{triple.s} {triple.p} {triple.o}") + + # Import triples + def triple_generator(): + yield Triple(s="subj", p="pred", o="obj") + # ... more triples + + bulk.import_triples(flow="default", triples=triple_generator()) + ``` + """ if self._bulk_client is None: from . bulk_client import BulkClient # Extract base URL (remove api/v1/ suffix) @@ -108,7 +334,22 @@ class Api: return self._bulk_client def metrics(self): - """Synchronous metrics interface""" + """ + Get a synchronous metrics client for monitoring. + + Retrieves Prometheus-formatted metrics from the TrustGraph service + for monitoring and observability. + + Returns: + Metrics: Synchronous metrics client + + Example: + ```python + metrics = api.metrics() + prometheus_text = metrics.get() + print(prometheus_text) + ``` + """ if self._metrics is None: from . metrics import Metrics # Extract base URL (remove api/v1/ suffix) @@ -118,14 +359,60 @@ class Api: # New asynchronous methods def async_flow(self): - """Asynchronous REST-based flow interface""" + """ + Get an asynchronous REST-based flow client. + + Provides async/await style access to flow operations. This is preferred + for async Python applications and frameworks (FastAPI, aiohttp, etc.). + + Returns: + AsyncFlow: Asynchronous flow client + + Example: + ```python + async_flow = api.async_flow() + + # List flows + flow_ids = await async_flow.list() + + # Execute operations + instance = async_flow.id("default") + result = await instance.text_completion( + system="You are helpful", + prompt="Hello" + ) + ``` + """ if self._async_flow is None: from . async_flow import AsyncFlow self._async_flow = AsyncFlow(self.url, self.timeout, self.token) return self._async_flow def async_socket(self): - """Asynchronous WebSocket-based interface for streaming operations""" + """ + Get an asynchronous WebSocket client for streaming operations. + + Provides async/await style WebSocket access with streaming support. + This is the preferred method for async streaming in Python. + + Returns: + AsyncSocketClient: Asynchronous WebSocket client + + Example: + ```python + async_socket = api.async_socket() + flow = async_socket.flow("default") + + # Stream agent responses + async for chunk in flow.agent( + question="Explain quantum computing", + user="trustgraph", + streaming=True + ): + if hasattr(chunk, 'content'): + print(chunk.content, end='', flush=True) + ``` + """ if self._async_socket_client is None: from . async_socket_client import AsyncSocketClient # Extract base URL (remove api/v1/ suffix) @@ -134,7 +421,34 @@ class Api: return self._async_socket_client def async_bulk(self): - """Asynchronous bulk operations interface for import/export""" + """ + Get an asynchronous bulk operations client. + + Provides async/await style bulk import/export operations via WebSocket + for efficient handling of large datasets. + + Returns: + AsyncBulkClient: Asynchronous bulk operations client + + Example: + ```python + async_bulk = api.async_bulk() + + # Export triples asynchronously + async for triple in async_bulk.export_triples(flow="default"): + print(f"{triple.s} {triple.p} {triple.o}") + + # Import with async generator + async def triple_gen(): + yield Triple(s="subj", p="pred", o="obj") + # ... more triples + + await async_bulk.import_triples( + flow="default", + triples=triple_gen() + ) + ``` + """ if self._async_bulk_client is None: from . async_bulk_client import AsyncBulkClient # Extract base URL (remove api/v1/ suffix) @@ -143,7 +457,21 @@ class Api: return self._async_bulk_client def async_metrics(self): - """Asynchronous metrics interface""" + """ + Get an asynchronous metrics client. + + Provides async/await style access to Prometheus metrics. + + Returns: + AsyncMetrics: Asynchronous metrics client + + Example: + ```python + async_metrics = api.async_metrics() + prometheus_text = await async_metrics.get() + print(prometheus_text) + ``` + """ if self._async_metrics is None: from . async_metrics import AsyncMetrics # Extract base URL (remove api/v1/ suffix) @@ -153,14 +481,52 @@ class Api: # Resource management def close(self): - """Close all synchronous connections""" + """ + Close all synchronous client connections. + + This method closes WebSocket and bulk operation connections. + It is automatically called when exiting a context manager. + + Example: + ```python + api = Api() + socket = api.socket() + # ... use socket + api.close() # Clean up connections + + # Or use context manager (automatic cleanup) + with Api() as api: + socket = api.socket() + # ... use socket + # Automatically closed + ``` + """ if self._socket_client: self._socket_client.close() if self._bulk_client: self._bulk_client.close() async def aclose(self): - """Close all asynchronous connections""" + """ + Close all asynchronous client connections. + + This method closes async WebSocket, bulk operation, and flow connections. + It is automatically called when exiting an async context manager. + + Example: + ```python + api = Api() + async_socket = api.async_socket() + # ... use async_socket + await api.aclose() # Clean up connections + + # Or use async context manager (automatic cleanup) + async with Api() as api: + async_socket = api.async_socket() + # ... use async_socket + # Automatically closed + ``` + """ if self._async_socket_client: await self._async_socket_client.aclose() if self._async_bulk_client: @@ -170,13 +536,17 @@ class Api: # Context manager support def __enter__(self): + """Enter synchronous context manager.""" return self def __exit__(self, *args): + """Exit synchronous context manager and close connections.""" self.close() async def __aenter__(self): + """Enter asynchronous context manager.""" return self async def __aexit__(self, *args): + """Exit asynchronous context manager and close connections.""" await self.aclose() diff --git a/trustgraph-base/trustgraph/api/async_flow.py b/trustgraph-base/trustgraph/api/async_flow.py index 5d3cd486..6b28886b 100644 --- a/trustgraph-base/trustgraph/api/async_flow.py +++ b/trustgraph-base/trustgraph/api/async_flow.py @@ -1,3 +1,14 @@ +""" +TrustGraph Asynchronous Flow Management + +This module provides async/await based interfaces for managing and interacting +with TrustGraph flows using REST API calls. Unlike async_socket_client which +provides streaming support, this module is focused on non-streaming operations +that return complete responses. + +For streaming support (e.g., real-time agent responses, streaming RAG), use +AsyncSocketClient instead. +""" import aiohttp import json @@ -18,15 +29,47 @@ def check_error(response): class AsyncFlow: - """Asynchronous REST-based flow interface""" + """ + Asynchronous flow management client using REST API. + + Provides async/await based flow management operations including listing, + starting, stopping flows, and managing flow class definitions. Also provides + access to flow-scoped services like agents, RAG, and queries via non-streaming + REST endpoints. + + Note: For streaming support, use AsyncSocketClient instead. + """ def __init__(self, url: str, timeout: int, token: Optional[str]) -> None: + """ + Initialize async flow client. + + Args: + url: Base URL for TrustGraph API + timeout: Request timeout in seconds + token: Optional bearer token for authentication + """ self.url: str = url self.timeout: int = timeout self.token: Optional[str] = token async def request(self, path: str, request_data: Dict[str, Any]) -> Dict[str, Any]: - """Make async HTTP request to Gateway API""" + """ + Make async HTTP POST request to Gateway API. + + Internal method for making authenticated requests to the TrustGraph API. + + Args: + path: API endpoint path (relative to base URL) + request_data: Request payload dictionary + + Returns: + dict: Response object from API + + Raises: + ProtocolException: If HTTP status is not 200 or response is not valid JSON + ApplicationException: If API returns an error response + """ url = f"{self.url}{path}" headers = {"Content-Type": "application/json"} @@ -49,12 +92,49 @@ class AsyncFlow: return obj async def list(self) -> List[str]: - """List all flows""" + """ + List all flow identifiers. + + Retrieves IDs of all flows currently deployed in the system. + + Returns: + list[str]: List of flow identifiers + + Example: + ```python + async_flow = await api.async_flow() + + # List all flows + flows = await async_flow.list() + print(f"Available flows: {flows}") + ``` + """ result = await self.request("flow", {"operation": "list-flows"}) return result.get("flow-ids", []) async def get(self, id: str) -> Dict[str, Any]: - """Get flow definition""" + """ + Get flow definition. + + Retrieves the complete flow configuration including its class name, + description, and parameters. + + Args: + id: Flow identifier + + Returns: + dict: Flow definition object + + Example: + ```python + async_flow = await api.async_flow() + + # Get flow definition + flow_def = await async_flow.get("default") + print(f"Flow class: {flow_def.get('class-name')}") + print(f"Description: {flow_def.get('description')}") + ``` + """ result = await self.request("flow", { "operation": "get-flow", "flow-id": id @@ -62,7 +142,31 @@ class AsyncFlow: return json.loads(result.get("flow", "{}")) async def start(self, class_name: str, id: str, description: str, parameters: Optional[Dict] = None): - """Start a flow""" + """ + Start a new flow instance. + + Creates and starts a flow from a flow class definition with the specified + parameters. + + Args: + class_name: Flow class name to instantiate + id: Identifier for the new flow instance + description: Human-readable description of the flow + parameters: Optional configuration parameters for the flow + + Example: + ```python + async_flow = await api.async_flow() + + # Start a flow from a class + await async_flow.start( + class_name="default", + id="my-flow", + description="Custom flow instance", + parameters={"model": "claude-3-opus"} + ) + ``` + """ request_data = { "operation": "start-flow", "flow-id": id, @@ -75,19 +179,70 @@ class AsyncFlow: await self.request("flow", request_data) async def stop(self, id: str): - """Stop a flow""" + """ + Stop a running flow. + + Stops and removes a flow instance, freeing its resources. + + Args: + id: Flow identifier to stop + + Example: + ```python + async_flow = await api.async_flow() + + # Stop a flow + await async_flow.stop("my-flow") + ``` + """ await self.request("flow", { "operation": "stop-flow", "flow-id": id }) async def list_classes(self) -> List[str]: - """List flow classes""" + """ + List all flow class names. + + Retrieves names of all flow classes (blueprints) available in the system. + + Returns: + list[str]: List of flow class names + + Example: + ```python + async_flow = await api.async_flow() + + # List available flow classes + classes = await async_flow.list_classes() + print(f"Available flow classes: {classes}") + ``` + """ result = await self.request("flow", {"operation": "list-classes"}) return result.get("class-names", []) async def get_class(self, class_name: str) -> Dict[str, Any]: - """Get flow class definition""" + """ + Get flow class definition. + + Retrieves the blueprint definition for a flow class, including its + configuration schema and service bindings. + + Args: + class_name: Flow class name + + Returns: + dict: Flow class definition object + + Example: + ```python + async_flow = await api.async_flow() + + # Get flow class definition + class_def = await async_flow.get_class("default") + print(f"Services: {class_def.get('services')}") + ``` + """ result = await self.request("flow", { "operation": "get-class", "class-name": class_name @@ -95,7 +250,29 @@ class AsyncFlow: return json.loads(result.get("class-definition", "{}")) async def put_class(self, class_name: str, definition: Dict[str, Any]): - """Create/update flow class""" + """ + Create or update a flow class definition. + + Stores a flow class blueprint that can be used to instantiate flows. + + Args: + class_name: Flow class name + definition: Flow class definition object + + Example: + ```python + async_flow = await api.async_flow() + + # Create a custom flow class + class_def = { + "services": { + "agent": {"module": "agent", "config": {...}}, + "graph-rag": {"module": "graph-rag", "config": {...}} + } + } + await async_flow.put_class("custom-flow", class_def) + ``` + """ await self.request("flow", { "operation": "put-class", "class-name": class_name, @@ -103,35 +280,145 @@ class AsyncFlow: }) async def delete_class(self, class_name: str): - """Delete flow class""" + """ + Delete a flow class definition. + + Removes a flow class blueprint from the system. Does not affect + running flow instances. + + Args: + class_name: Flow class name to delete + + Example: + ```python + async_flow = await api.async_flow() + + # Delete a flow class + await async_flow.delete_class("old-flow-class") + ``` + """ await self.request("flow", { "operation": "delete-class", "class-name": class_name }) def id(self, flow_id: str): - """Get async flow instance""" + """ + Get an async flow instance client. + + Returns a client for interacting with a specific flow's services + (agent, RAG, queries, embeddings, etc.). + + Args: + flow_id: Flow identifier + + Returns: + AsyncFlowInstance: Client for flow-specific operations + + Example: + ```python + async_flow = await api.async_flow() + + # Get flow instance + flow = async_flow.id("default") + + # Use flow services + result = await flow.graph_rag( + query="What is TrustGraph?", + user="trustgraph", + collection="default" + ) + ``` + """ return AsyncFlowInstance(self, flow_id) async def aclose(self) -> None: - """Close connection (cleanup handled by aiohttp session)""" + """ + Close async client and cleanup resources. + + Note: Cleanup is handled automatically by aiohttp session context managers. + This method is provided for consistency with other async clients. + """ pass class AsyncFlowInstance: - """Asynchronous REST flow instance""" + """ + Asynchronous flow instance client. + + Provides async/await access to flow-scoped services including agents, + RAG queries, embeddings, and graph queries. All operations return complete + responses (non-streaming). + + Note: For streaming support, use AsyncSocketFlowInstance instead. + """ def __init__(self, flow: AsyncFlow, flow_id: str): + """ + Initialize async flow instance. + + Args: + flow: Parent AsyncFlow client + flow_id: Flow identifier + """ self.flow = flow self.flow_id = flow_id async def request(self, service: str, request_data: Dict[str, Any]) -> Dict[str, Any]: - """Make request to flow-scoped service""" + """ + Make request to a flow-scoped service. + + Internal method for calling services within this flow instance. + + Args: + service: Service name (e.g., "agent", "graph-rag", "triples") + request_data: Service request payload + + Returns: + dict: Service response object + + Raises: + ProtocolException: If request fails or response is invalid + ApplicationException: If service returns an error + """ return await self.flow.request(f"flow/{self.flow_id}/service/{service}", request_data) async def agent(self, question: str, user: str, state: Optional[Dict] = None, group: Optional[str] = None, history: Optional[List] = None, **kwargs: Any) -> Dict[str, Any]: - """Execute agent (non-streaming, use async_socket for streaming)""" + """ + Execute an agent operation (non-streaming). + + Runs an agent to answer a question, with optional conversation state and + history. Returns the complete response after the agent has finished + processing. + + Note: This method does not support streaming. For real-time agent thoughts + and observations, use AsyncSocketFlowInstance.agent() instead. + + Args: + question: User question or instruction + user: User identifier + state: Optional state dictionary for conversation context + group: Optional group identifier for session management + history: Optional conversation history list + **kwargs: Additional service-specific parameters + + Returns: + dict: Complete agent response including answer and metadata + + Example: + ```python + async_flow = await api.async_flow() + flow = async_flow.id("default") + + # Execute agent + result = await flow.agent( + question="What is the capital of France?", + user="trustgraph" + ) + print(f"Answer: {result.get('response')}") + ``` + """ request_data = { "question": question, "user": user, @@ -148,7 +435,36 @@ class AsyncFlowInstance: return await self.request("agent", request_data) async def text_completion(self, system: str, prompt: str, **kwargs: Any) -> str: - """Text completion (non-streaming, use async_socket for streaming)""" + """ + Generate text completion (non-streaming). + + Generates a text response from an LLM given a system prompt and user prompt. + Returns the complete response text. + + Note: This method does not support streaming. For streaming text generation, + use AsyncSocketFlowInstance.text_completion() instead. + + Args: + system: System prompt defining the LLM's behavior + prompt: User prompt or question + **kwargs: Additional service-specific parameters + + Returns: + str: Complete generated text response + + Example: + ```python + async_flow = await api.async_flow() + flow = async_flow.id("default") + + # Generate text + response = await flow.text_completion( + system="You are a helpful assistant.", + prompt="Explain quantum computing in simple terms." + ) + print(response) + ``` + """ request_data = { "system": system, "prompt": prompt, @@ -162,7 +478,43 @@ class AsyncFlowInstance: async def graph_rag(self, query: str, user: str, collection: str, max_subgraph_size: int = 1000, max_subgraph_count: int = 5, max_entity_distance: int = 3, **kwargs: Any) -> str: - """Graph RAG (non-streaming, use async_socket for streaming)""" + """ + Execute graph-based RAG query (non-streaming). + + Performs Retrieval-Augmented Generation using knowledge graph data. + Identifies relevant entities and their relationships, then generates a + response grounded in the graph structure. Returns complete response. + + Note: This method does not support streaming. For streaming RAG responses, + use AsyncSocketFlowInstance.graph_rag() instead. + + Args: + query: User query text + user: User identifier + collection: Collection identifier containing the knowledge graph + max_subgraph_size: Maximum number of triples per subgraph (default: 1000) + max_subgraph_count: Maximum number of subgraphs to retrieve (default: 5) + max_entity_distance: Maximum graph distance for entity expansion (default: 3) + **kwargs: Additional service-specific parameters + + Returns: + str: Complete generated response grounded in graph data + + Example: + ```python + async_flow = await api.async_flow() + flow = async_flow.id("default") + + # Query knowledge graph + response = await flow.graph_rag( + query="What are the relationships between these entities?", + user="trustgraph", + collection="medical-kb", + max_subgraph_count=3 + ) + print(response) + ``` + """ request_data = { "query": query, "user": user, @@ -179,7 +531,41 @@ class AsyncFlowInstance: async def document_rag(self, query: str, user: str, collection: str, doc_limit: int = 10, **kwargs: Any) -> str: - """Document RAG (non-streaming, use async_socket for streaming)""" + """ + Execute document-based RAG query (non-streaming). + + Performs Retrieval-Augmented Generation using document embeddings. + Retrieves relevant document chunks via semantic search, then generates + a response grounded in the retrieved documents. Returns complete response. + + Note: This method does not support streaming. For streaming RAG responses, + use AsyncSocketFlowInstance.document_rag() instead. + + Args: + query: User query text + user: User identifier + collection: Collection identifier containing documents + doc_limit: Maximum number of document chunks to retrieve (default: 10) + **kwargs: Additional service-specific parameters + + Returns: + str: Complete generated response grounded in document data + + Example: + ```python + async_flow = await api.async_flow() + flow = async_flow.id("default") + + # Query documents + response = await flow.document_rag( + query="What does the documentation say about authentication?", + user="trustgraph", + collection="docs", + doc_limit=5 + ) + print(response) + ``` + """ request_data = { "query": query, "user": user, @@ -193,7 +579,39 @@ class AsyncFlowInstance: return result.get("response", "") async def graph_embeddings_query(self, text: str, user: str, collection: str, limit: int = 10, **kwargs: Any): - """Query graph embeddings for semantic search""" + """ + Query graph embeddings for semantic entity search. + + Performs semantic search over graph entity embeddings to find entities + most relevant to the input text. Returns entities ranked by similarity. + + Args: + text: Query text for semantic search + user: User identifier + collection: Collection identifier containing graph embeddings + limit: Maximum number of results to return (default: 10) + **kwargs: Additional service-specific parameters + + Returns: + dict: Response containing ranked entity matches with similarity scores + + Example: + ```python + async_flow = await api.async_flow() + flow = async_flow.id("default") + + # Find related entities + results = await flow.graph_embeddings_query( + text="machine learning algorithms", + user="trustgraph", + collection="tech-kb", + limit=5 + ) + + for entity in results.get("entities", []): + print(f"{entity['name']}: {entity['score']}") + ``` + """ request_data = { "text": text, "user": user, @@ -205,14 +623,72 @@ class AsyncFlowInstance: return await self.request("graph-embeddings", request_data) async def embeddings(self, text: str, **kwargs: Any): - """Generate text embeddings""" + """ + Generate embeddings for input text. + + Converts text into a numerical vector representation using the flow's + configured embedding model. Useful for semantic search and similarity + comparisons. + + Args: + text: Input text to embed + **kwargs: Additional service-specific parameters + + Returns: + dict: Response containing embedding vector and metadata + + Example: + ```python + async_flow = await api.async_flow() + flow = async_flow.id("default") + + # Generate embeddings + result = await flow.embeddings(text="Sample text to embed") + vector = result.get("embedding") + print(f"Embedding dimension: {len(vector)}") + ``` + """ request_data = {"text": text} request_data.update(kwargs) return await self.request("embeddings", request_data) async def triples_query(self, s=None, p=None, o=None, user=None, collection=None, limit=100, **kwargs: Any): - """Triple pattern query""" + """ + Query RDF triples using pattern matching. + + Searches for triples matching the specified subject, predicate, and/or + object patterns. Patterns use None as a wildcard to match any value. + + Args: + s: Subject pattern (None for wildcard) + p: Predicate pattern (None for wildcard) + o: Object pattern (None for wildcard) + user: User identifier (None for all users) + collection: Collection identifier (None for all collections) + limit: Maximum number of triples to return (default: 100) + **kwargs: Additional service-specific parameters + + Returns: + dict: Response containing matching triples + + Example: + ```python + async_flow = await api.async_flow() + flow = async_flow.id("default") + + # Find all triples with a specific predicate + results = await flow.triples_query( + p="knows", + user="trustgraph", + collection="social", + limit=50 + ) + + for triple in results.get("triples", []): + print(f"{triple['s']} knows {triple['o']}") + ``` + """ request_data = {"limit": limit} if s is not None: request_data["s"] = str(s) @@ -230,7 +706,50 @@ class AsyncFlowInstance: async def objects_query(self, query: str, user: str, collection: str, variables: Optional[Dict] = None, operation_name: Optional[str] = None, **kwargs: Any): - """GraphQL query""" + """ + Execute a GraphQL query on stored objects. + + Queries structured data objects using GraphQL syntax. Supports complex + queries with variables and named operations. + + Args: + query: GraphQL query string + user: User identifier + collection: Collection identifier containing objects + variables: Optional GraphQL query variables + operation_name: Optional operation name for multi-operation queries + **kwargs: Additional service-specific parameters + + Returns: + dict: GraphQL response with data and/or errors + + Example: + ```python + async_flow = await api.async_flow() + flow = async_flow.id("default") + + # Execute GraphQL query + query = ''' + query GetUsers($status: String!) { + users(status: $status) { + id + name + email + } + } + ''' + + result = await flow.objects_query( + query=query, + user="trustgraph", + collection="users", + variables={"status": "active"} + ) + + for user in result.get("data", {}).get("users", []): + print(f"{user['name']}: {user['email']}") + ``` + """ request_data = { "query": query, "user": user, diff --git a/trustgraph-base/trustgraph/api/bulk_client.py b/trustgraph-base/trustgraph/api/bulk_client.py index a119668d..a2796332 100644 --- a/trustgraph-base/trustgraph/api/bulk_client.py +++ b/trustgraph-base/trustgraph/api/bulk_client.py @@ -1,3 +1,10 @@ +""" +TrustGraph Synchronous Bulk Operations Client + +This module provides synchronous bulk import/export operations via WebSocket +for efficient transfer of large datasets including triples, embeddings, +entity contexts, and objects. +""" import json import asyncio @@ -9,9 +16,24 @@ from . exceptions import ProtocolException class BulkClient: - """Synchronous bulk operations client""" + """ + Synchronous bulk operations client for import/export. + + Provides efficient bulk data transfer via WebSocket for large datasets. + Wraps async WebSocket operations with synchronous generators for ease of use. + + Note: For true async support, use AsyncBulkClient instead. + """ def __init__(self, url: str, timeout: int, token: Optional[str]) -> None: + """ + Initialize synchronous bulk client. + + Args: + url: Base URL for TrustGraph API (HTTP/HTTPS will be converted to WS/WSS) + timeout: WebSocket timeout in seconds + token: Optional bearer token for authentication + """ self.url: str = self._convert_to_ws_url(url) self.timeout: int = timeout self.token: Optional[str] = token @@ -41,7 +63,32 @@ class BulkClient: return loop.run_until_complete(coro) def import_triples(self, flow: str, triples: Iterator[Triple], **kwargs: Any) -> None: - """Bulk import triples via WebSocket""" + """ + Bulk import RDF triples into a flow. + + Efficiently uploads large numbers of triples via WebSocket streaming. + + Args: + flow: Flow identifier + triples: Iterator yielding Triple objects + **kwargs: Additional parameters (reserved for future use) + + Example: + ```python + from trustgraph.api import Triple + + bulk = api.bulk() + + # Generate triples to import + def triple_generator(): + yield Triple(s="subj1", p="pred", o="obj1") + yield Triple(s="subj2", p="pred", o="obj2") + # ... more triples + + # Import triples + bulk.import_triples(flow="default", triples=triple_generator()) + ``` + """ self._run_async(self._import_triples_async(flow, triples)) async def _import_triples_async(self, flow: str, triples: Iterator[Triple]) -> None: @@ -60,7 +107,27 @@ class BulkClient: await websocket.send(json.dumps(message)) def export_triples(self, flow: str, **kwargs: Any) -> Iterator[Triple]: - """Bulk export triples via WebSocket""" + """ + Bulk export RDF triples from a flow. + + Efficiently downloads all triples via WebSocket streaming. + + Args: + flow: Flow identifier + **kwargs: Additional parameters (reserved for future use) + + Returns: + Iterator[Triple]: Stream of Triple objects + + Example: + ```python + bulk = api.bulk() + + # Export and process triples + for triple in bulk.export_triples(flow="default"): + print(f"{triple.s} -> {triple.p} -> {triple.o}") + ``` + """ async_gen = self._export_triples_async(flow) try: @@ -101,7 +168,32 @@ class BulkClient: ) def import_graph_embeddings(self, flow: str, embeddings: Iterator[Dict[str, Any]], **kwargs: Any) -> None: - """Bulk import graph embeddings via WebSocket""" + """ + Bulk import graph embeddings into a flow. + + Efficiently uploads graph entity embeddings via WebSocket streaming. + + Args: + flow: Flow identifier + embeddings: Iterator yielding embedding dictionaries + **kwargs: Additional parameters (reserved for future use) + + Example: + ```python + bulk = api.bulk() + + # Generate embeddings to import + def embedding_generator(): + yield {"entity": "entity1", "embedding": [0.1, 0.2, ...]} + yield {"entity": "entity2", "embedding": [0.3, 0.4, ...]} + # ... more embeddings + + bulk.import_graph_embeddings( + flow="default", + embeddings=embedding_generator() + ) + ``` + """ self._run_async(self._import_graph_embeddings_async(flow, embeddings)) async def _import_graph_embeddings_async(self, flow: str, embeddings: Iterator[Dict[str, Any]]) -> None: @@ -115,7 +207,29 @@ class BulkClient: await websocket.send(json.dumps(embedding)) def export_graph_embeddings(self, flow: str, **kwargs: Any) -> Iterator[Dict[str, Any]]: - """Bulk export graph embeddings via WebSocket""" + """ + Bulk export graph embeddings from a flow. + + Efficiently downloads all graph entity embeddings via WebSocket streaming. + + Args: + flow: Flow identifier + **kwargs: Additional parameters (reserved for future use) + + Returns: + Iterator[Dict[str, Any]]: Stream of embedding dictionaries + + Example: + ```python + bulk = api.bulk() + + # Export and process embeddings + for embedding in bulk.export_graph_embeddings(flow="default"): + entity = embedding.get("entity") + vector = embedding.get("embedding") + print(f"{entity}: {len(vector)} dimensions") + ``` + """ async_gen = self._export_graph_embeddings_async(flow) try: @@ -151,7 +265,33 @@ class BulkClient: yield json.loads(raw_message) def import_document_embeddings(self, flow: str, embeddings: Iterator[Dict[str, Any]], **kwargs: Any) -> None: - """Bulk import document embeddings via WebSocket""" + """ + Bulk import document embeddings into a flow. + + Efficiently uploads document chunk embeddings via WebSocket streaming + for use in document RAG queries. + + Args: + flow: Flow identifier + embeddings: Iterator yielding embedding dictionaries + **kwargs: Additional parameters (reserved for future use) + + Example: + ```python + bulk = api.bulk() + + # Generate document embeddings to import + def doc_embedding_generator(): + yield {"id": "doc1-chunk1", "embedding": [0.1, 0.2, ...]} + yield {"id": "doc1-chunk2", "embedding": [0.3, 0.4, ...]} + # ... more embeddings + + bulk.import_document_embeddings( + flow="default", + embeddings=doc_embedding_generator() + ) + ``` + """ self._run_async(self._import_document_embeddings_async(flow, embeddings)) async def _import_document_embeddings_async(self, flow: str, embeddings: Iterator[Dict[str, Any]]) -> None: @@ -165,7 +305,29 @@ class BulkClient: await websocket.send(json.dumps(embedding)) def export_document_embeddings(self, flow: str, **kwargs: Any) -> Iterator[Dict[str, Any]]: - """Bulk export document embeddings via WebSocket""" + """ + Bulk export document embeddings from a flow. + + Efficiently downloads all document chunk embeddings via WebSocket streaming. + + Args: + flow: Flow identifier + **kwargs: Additional parameters (reserved for future use) + + Returns: + Iterator[Dict[str, Any]]: Stream of embedding dictionaries + + Example: + ```python + bulk = api.bulk() + + # Export and process document embeddings + for embedding in bulk.export_document_embeddings(flow="default"): + doc_id = embedding.get("id") + vector = embedding.get("embedding") + print(f"{doc_id}: {len(vector)} dimensions") + ``` + """ async_gen = self._export_document_embeddings_async(flow) try: @@ -201,7 +363,34 @@ class BulkClient: yield json.loads(raw_message) def import_entity_contexts(self, flow: str, contexts: Iterator[Dict[str, Any]], **kwargs: Any) -> None: - """Bulk import entity contexts via WebSocket""" + """ + Bulk import entity contexts into a flow. + + Efficiently uploads entity context information via WebSocket streaming. + Entity contexts provide additional textual context about graph entities + for improved RAG performance. + + Args: + flow: Flow identifier + contexts: Iterator yielding context dictionaries + **kwargs: Additional parameters (reserved for future use) + + Example: + ```python + bulk = api.bulk() + + # Generate entity contexts to import + def context_generator(): + yield {"entity": "entity1", "context": "Description of entity1..."} + yield {"entity": "entity2", "context": "Description of entity2..."} + # ... more contexts + + bulk.import_entity_contexts( + flow="default", + contexts=context_generator() + ) + ``` + """ self._run_async(self._import_entity_contexts_async(flow, contexts)) async def _import_entity_contexts_async(self, flow: str, contexts: Iterator[Dict[str, Any]]) -> None: @@ -215,7 +404,29 @@ class BulkClient: await websocket.send(json.dumps(context)) def export_entity_contexts(self, flow: str, **kwargs: Any) -> Iterator[Dict[str, Any]]: - """Bulk export entity contexts via WebSocket""" + """ + Bulk export entity contexts from a flow. + + Efficiently downloads all entity context information via WebSocket streaming. + + Args: + flow: Flow identifier + **kwargs: Additional parameters (reserved for future use) + + Returns: + Iterator[Dict[str, Any]]: Stream of context dictionaries + + Example: + ```python + bulk = api.bulk() + + # Export and process entity contexts + for context in bulk.export_entity_contexts(flow="default"): + entity = context.get("entity") + text = context.get("context") + print(f"{entity}: {text[:100]}...") + ``` + """ async_gen = self._export_entity_contexts_async(flow) try: @@ -251,7 +462,33 @@ class BulkClient: yield json.loads(raw_message) def import_objects(self, flow: str, objects: Iterator[Dict[str, Any]], **kwargs: Any) -> None: - """Bulk import objects via WebSocket""" + """ + Bulk import structured objects into a flow. + + Efficiently uploads structured data objects via WebSocket streaming + for use in GraphQL queries. + + Args: + flow: Flow identifier + objects: Iterator yielding object dictionaries + **kwargs: Additional parameters (reserved for future use) + + Example: + ```python + bulk = api.bulk() + + # Generate objects to import + def object_generator(): + yield {"id": "obj1", "name": "Object 1", "value": 100} + yield {"id": "obj2", "name": "Object 2", "value": 200} + # ... more objects + + bulk.import_objects( + flow="default", + objects=object_generator() + ) + ``` + """ self._run_async(self._import_objects_async(flow, objects)) async def _import_objects_async(self, flow: str, objects: Iterator[Dict[str, Any]]) -> None: diff --git a/trustgraph-base/trustgraph/api/collection.py b/trustgraph-base/trustgraph/api/collection.py index 5a1f0850..414d07db 100644 --- a/trustgraph-base/trustgraph/api/collection.py +++ b/trustgraph-base/trustgraph/api/collection.py @@ -1,3 +1,11 @@ +""" +TrustGraph Collection Management + +This module provides interfaces for managing data collections in TrustGraph. +Collections provide logical grouping and isolation for documents and knowledge +graph data. +""" + import datetime import logging @@ -7,14 +15,71 @@ from . exceptions import * logger = logging.getLogger(__name__) class Collection: + """ + Collection management client. + + Provides methods for managing data collections, including listing, + updating metadata, and deleting collections. Collections organize + documents and knowledge graph data into logical groupings for + isolation and access control. + """ def __init__(self, api): + """ + Initialize Collection client. + + Args: + api: Parent Api instance for making requests + """ self.api = api def request(self, request): + """ + Make a collection-scoped API request. + + Args: + request: Request payload dictionary + + Returns: + dict: Response object + """ return self.api.request(f"collection-management", request) def list_collections(self, user, tag_filter=None): + """ + List all collections for a user. + + Retrieves metadata for all collections owned by the specified user, + with optional filtering by tags. + + Args: + user: User identifier + tag_filter: Optional list of tags to filter collections (default: None) + + Returns: + list[CollectionMetadata]: List of collection metadata objects + + Raises: + ProtocolException: If response format is invalid + + Example: + ```python + collection = api.collection() + + # List all collections + all_colls = collection.list_collections(user="trustgraph") + for coll in all_colls: + print(f"{coll.collection}: {coll.name}") + print(f" Description: {coll.description}") + print(f" Tags: {', '.join(coll.tags)}") + + # List collections with specific tags + research_colls = collection.list_collections( + user="trustgraph", + tag_filter=["research", "published"] + ) + ``` + """ input = { "operation": "list-collections", @@ -50,6 +115,46 @@ class Collection: raise ProtocolException(f"Response not formatted correctly") def update_collection(self, user, collection, name=None, description=None, tags=None): + """ + Update collection metadata. + + Updates the name, description, and/or tags for an existing collection. + Only provided fields are updated; others remain unchanged. + + Args: + user: User identifier + collection: Collection identifier + name: New collection name (optional) + description: New collection description (optional) + tags: New list of tags (optional) + + Returns: + CollectionMetadata: Updated collection metadata, or None if not found + + Raises: + ProtocolException: If response format is invalid + + Example: + ```python + collection_api = api.collection() + + # Update collection metadata + updated = collection_api.update_collection( + user="trustgraph", + collection="default", + name="Default Collection", + description="Main data collection for general use", + tags=["default", "production"] + ) + + # Update only specific fields + updated = collection_api.update_collection( + user="trustgraph", + collection="research", + description="Updated description" + ) + ``` + """ input = { "operation": "update-collection", @@ -82,6 +187,29 @@ class Collection: raise ProtocolException(f"Response not formatted correctly") def delete_collection(self, user, collection): + """ + Delete a collection. + + Removes a collection and all its associated data from the system. + + Args: + user: User identifier + collection: Collection identifier to delete + + Returns: + dict: Empty response object + + Example: + ```python + collection_api = api.collection() + + # Delete a collection + collection_api.delete_collection( + user="trustgraph", + collection="old-collection" + ) + ``` + """ input = { "operation": "delete-collection", diff --git a/trustgraph-base/trustgraph/api/config.py b/trustgraph-base/trustgraph/api/config.py index cd50ca6c..c8c8d5bb 100644 --- a/trustgraph-base/trustgraph/api/config.py +++ b/trustgraph-base/trustgraph/api/config.py @@ -1,3 +1,9 @@ +""" +TrustGraph Configuration Management + +This module provides interfaces for managing TrustGraph configuration settings, +including retrieving, updating, and deleting configuration values. +""" import logging @@ -7,14 +13,67 @@ from . types import ConfigValue logger = logging.getLogger(__name__) class Config: + """ + Configuration management client. + + Provides methods for managing TrustGraph configuration settings across + different types (llm, embedding, etc.), with support for get, put, delete, + and list operations. + """ def __init__(self, api): + """ + Initialize Config client. + + Args: + api: Parent Api instance for making requests + """ self.api = api def request(self, request): + """ + Make a configuration-scoped API request. + + Args: + request: Request payload dictionary + + Returns: + dict: Response object + """ return self.api.request("config", request) def get(self, keys): + """ + Get configuration values for specified keys. + + Retrieves the configuration values for one or more configuration keys. + + Args: + keys: List of ConfigKey objects specifying which values to retrieve + + Returns: + list[ConfigValue]: List of configuration values + + Raises: + ProtocolException: If response format is invalid + + Example: + ```python + from trustgraph.api import ConfigKey + + config = api.config() + + # Get specific configuration values + values = config.get([ + ConfigKey(type="llm", key="model"), + ConfigKey(type="llm", key="temperature"), + ConfigKey(type="embedding", key="model") + ]) + + for val in values: + print(f"{val.type}.{val.key} = {val.value}") + ``` + """ # The input consists of system and prompt strings input = { @@ -41,6 +100,28 @@ class Config: raise ProtocolException("Response not formatted correctly") def put(self, values): + """ + Set configuration values. + + Updates or creates configuration values for the specified keys. + + Args: + values: List of ConfigValue objects with type, key, and value + + Example: + ```python + from trustgraph.api import ConfigValue + + config = api.config() + + # Set configuration values + config.put([ + ConfigValue(type="llm", key="model", value="gpt-4"), + ConfigValue(type="llm", key="temperature", value="0.7"), + ConfigValue(type="embedding", key="model", value="text-embedding-3-small") + ]) + ``` + """ # The input consists of system and prompt strings input = { @@ -54,6 +135,27 @@ class Config: self.request(input) def delete(self, keys): + """ + Delete configuration values. + + Removes configuration values for the specified keys. + + Args: + keys: List of ConfigKey objects specifying which values to delete + + Example: + ```python + from trustgraph.api import ConfigKey + + config = api.config() + + # Delete configuration values + config.delete([ + ConfigKey(type="llm", key="old-setting"), + ConfigKey(type="embedding", key="deprecated") + ]) + ``` + """ # The input consists of system and prompt strings input = { @@ -67,6 +169,31 @@ class Config: self.request(input) def list(self, type): + """ + List all configuration keys for a given type. + + Retrieves a list of all configuration key names within a specific + configuration type. + + Args: + type: Configuration type (e.g., "llm", "embedding", "storage") + + Returns: + list[str]: List of configuration key names + + Example: + ```python + config = api.config() + + # List all LLM configuration keys + llm_keys = config.list(type="llm") + print(f"LLM configuration keys: {llm_keys}") + + # List all embedding configuration keys + embedding_keys = config.list(type="embedding") + print(f"Embedding configuration keys: {embedding_keys}") + ``` + """ # The input consists of system and prompt strings input = { @@ -77,6 +204,36 @@ class Config: return self.request(input)["directory"] def get_values(self, type): + """ + Get all configuration values for a given type. + + Retrieves all configuration key-value pairs within a specific + configuration type. + + Args: + type: Configuration type (e.g., "llm", "embedding", "storage") + + Returns: + list[ConfigValue]: List of all configuration values for the type + + Raises: + ProtocolException: If response format is invalid + + Example: + ```python + config = api.config() + + # Get all LLM configuration + llm_config = config.get_values(type="llm") + for val in llm_config: + print(f"{val.key} = {val.value}") + + # Get all embedding configuration + embedding_config = config.get_values(type="embedding") + for val in embedding_config: + print(f"{val.key} = {val.value}") + ``` + """ # The input consists of system and prompt strings input = { @@ -99,6 +256,28 @@ class Config: raise ProtocolException(f"Response not formatted correctly") def all(self): + """ + Get complete configuration and version. + + Retrieves the entire configuration object along with its version number. + + Returns: + tuple: (config_dict, version_string) - Complete configuration and version + + Raises: + ProtocolException: If response format is invalid + + Example: + ```python + config = api.config() + + # Get complete configuration + config_data, version = config.all() + + print(f"Configuration version: {version}") + print(f"Configuration: {config_data}") + ``` + """ # The input consists of system and prompt strings input = { diff --git a/trustgraph-base/trustgraph/api/flow.py b/trustgraph-base/trustgraph/api/flow.py index 142a699b..d06a6327 100644 --- a/trustgraph-base/trustgraph/api/flow.py +++ b/trustgraph-base/trustgraph/api/flow.py @@ -1,3 +1,10 @@ +""" +TrustGraph Flow Management + +This module provides interfaces for managing and executing TrustGraph flows. +Flows are the primary execution units that provide access to various services +including LLM operations, RAG queries, knowledge graph management, and more. +""" import json import base64 @@ -11,11 +18,38 @@ def to_value(x): return Literal(x["v"]) class Flow: + """ + Flow management client for blueprint and flow instance operations. + + This class provides methods for managing flow blueprints (templates) and + flow instances (running flows). Blueprints define the structure and + parameters of flows, while instances represent active flows that can + execute services. + """ def __init__(self, api): + """ + Initialize Flow client. + + Args: + api: Parent Api instance for making requests + """ self.api = api def request(self, path=None, request=None): + """ + Make a flow-scoped API request. + + Args: + path: Optional path suffix for flow endpoints + request: Request payload dictionary + + Returns: + dict: Response object + + Raises: + RuntimeError: If request parameter is not specified + """ if request is None: raise RuntimeError("request must be specified") @@ -26,9 +60,39 @@ class Flow: return self.api.request(f"flow", request) def id(self, id="default"): + """ + Get a FlowInstance for executing operations on a specific flow. + + Args: + id: Flow identifier (default: "default") + + Returns: + FlowInstance: Flow instance for service operations + + Example: + ```python + flow = api.flow().id("my-flow") + response = flow.text_completion( + system="You are helpful", + prompt="Hello" + ) + ``` + """ return FlowInstance(api=self, id=id) def list_blueprints(self): + """ + List all available flow blueprints. + + Returns: + list[str]: List of blueprint names + + Example: + ```python + blueprints = api.flow().list_blueprints() + print(blueprints) # ['default', 'custom-flow', ...] + ``` + """ # The input consists of system and prompt strings input = { @@ -38,6 +102,21 @@ class Flow: return self.request(request = input)["blueprint-names"] def get_blueprint(self, blueprint_name): + """ + Get a flow blueprint definition by name. + + Args: + blueprint_name: Name of the blueprint to retrieve + + Returns: + dict: Blueprint definition as a dictionary + + Example: + ```python + blueprint = api.flow().get_blueprint("default") + print(blueprint) # Blueprint configuration + ``` + """ # The input consists of system and prompt strings input = { @@ -48,6 +127,22 @@ class Flow: return json.loads(self.request(request = input)["blueprint-definition"]) def put_blueprint(self, blueprint_name, definition): + """ + Create or update a flow blueprint. + + Args: + blueprint_name: Name for the blueprint + definition: Blueprint definition dictionary + + Example: + ```python + definition = { + "services": ["text-completion", "graph-rag"], + "parameters": {"model": "gpt-4"} + } + api.flow().put_blueprint("my-blueprint", definition) + ``` + """ # The input consists of system and prompt strings input = { @@ -59,6 +154,17 @@ class Flow: self.request(request = input) def delete_blueprint(self, blueprint_name): + """ + Delete a flow blueprint. + + Args: + blueprint_name: Name of the blueprint to delete + + Example: + ```python + api.flow().delete_blueprint("old-blueprint") + ``` + """ # The input consists of system and prompt strings input = { @@ -69,6 +175,18 @@ class Flow: self.request(request = input) def list(self): + """ + List all active flow instances. + + Returns: + list[str]: List of flow instance IDs + + Example: + ```python + flows = api.flow().list() + print(flows) # ['default', 'flow-1', 'flow-2', ...] + ``` + """ # The input consists of system and prompt strings input = { @@ -78,6 +196,21 @@ class Flow: return self.request(request = input)["flow-ids"] def get(self, id): + """ + Get the definition of a running flow instance. + + Args: + id: Flow instance ID + + Returns: + dict: Flow instance definition + + Example: + ```python + flow_def = api.flow().get("default") + print(flow_def) + ``` + """ # The input consists of system and prompt strings input = { @@ -88,6 +221,25 @@ class Flow: return json.loads(self.request(request = input)["flow"]) def start(self, blueprint_name, id, description, parameters=None): + """ + Start a new flow instance from a blueprint. + + Args: + blueprint_name: Name of the blueprint to instantiate + id: Unique identifier for the flow instance + description: Human-readable description + parameters: Optional parameters dictionary + + Example: + ```python + api.flow().start( + blueprint_name="default", + id="my-flow", + description="My custom flow", + parameters={"model": "gpt-4"} + ) + ``` + """ # The input consists of system and prompt strings input = { @@ -103,6 +255,17 @@ class Flow: self.request(request = input) def stop(self, id): + """ + Stop a running flow instance. + + Args: + id: Flow instance ID to stop + + Example: + ```python + api.flow().stop("my-flow") + ``` + """ # The input consists of system and prompt strings input = { @@ -111,18 +274,70 @@ class Flow: } self.request(request = input) - + class FlowInstance: + """ + Flow instance client for executing services on a specific flow. + + This class provides access to all TrustGraph services including: + - Text completion and embeddings + - Agent operations with state management + - Graph and document RAG queries + - Knowledge graph operations (triples, objects) + - Document loading and processing + - Natural language to GraphQL query conversion + - Structured data analysis and schema detection + - MCP tool execution + - Prompt templating + + Services are accessed through a running flow instance identified by ID. + """ def __init__(self, api, id): + """ + Initialize FlowInstance. + + Args: + api: Parent Flow client + id: Flow instance identifier + """ self.api = api self.id = id def request(self, path, request): + """ + Make a service request on this flow instance. + Args: + path: Service path (e.g., "service/text-completion") + request: Request payload dictionary + + Returns: + dict: Service response + """ return self.api.request(path = f"{self.id}/{path}", request = request) def text_completion(self, system, prompt): + """ + Execute text completion using the flow's LLM. + + Args: + system: System prompt defining the assistant's behavior + prompt: User prompt/question + + Returns: + str: Generated response text + + Example: + ```python + flow = api.flow().id("default") + response = flow.text_completion( + system="You are a helpful assistant", + prompt="What is quantum computing?" + ) + print(response) + ``` + """ # The input consists of system and prompt strings input = { @@ -136,6 +351,44 @@ class FlowInstance: )["response"] def agent(self, question, user="trustgraph", state=None, group=None, history=None): + """ + Execute an agent operation with reasoning and tool use capabilities. + + Agents can perform multi-step reasoning, use tools, and maintain conversation + state across interactions. This is a synchronous non-streaming version. + + Args: + question: User question or instruction + user: User identifier (default: "trustgraph") + state: Optional state dictionary for stateful conversations + group: Optional group identifier for multi-user contexts + history: Optional conversation history as list of message dicts + + Returns: + str: Agent's final answer + + Example: + ```python + flow = api.flow().id("default") + + # Simple question + answer = flow.agent( + question="What is the capital of France?", + user="trustgraph" + ) + + # With conversation history + history = [ + {"role": "user", "content": "Hello"}, + {"role": "assistant", "content": "Hi! How can I help?"} + ] + answer = flow.agent( + question="Tell me about Paris", + user="trustgraph", + history=history + ) + ``` + """ # The input consists of a question and optional context input = { @@ -164,6 +417,37 @@ class FlowInstance: entity_limit=50, triple_limit=30, max_subgraph_size=150, max_path_length=2, ): + """ + Execute graph-based Retrieval-Augmented Generation (RAG) query. + + Graph RAG uses knowledge graph structure to find relevant context by + traversing entity relationships, then generates a response using an LLM. + + Args: + query: Natural language query + user: User/keyspace identifier (default: "trustgraph") + collection: Collection identifier (default: "default") + entity_limit: Maximum entities to retrieve (default: 50) + triple_limit: Maximum triples per entity (default: 30) + max_subgraph_size: Maximum total triples in subgraph (default: 150) + max_path_length: Maximum traversal depth (default: 2) + + Returns: + str: Generated response incorporating graph context + + Example: + ```python + flow = api.flow().id("default") + response = flow.graph_rag( + query="Tell me about Marie Curie's discoveries", + user="trustgraph", + collection="scientists", + entity_limit=20, + max_path_length=3 + ) + print(response) + ``` + """ # The input consists of a question input = { @@ -185,6 +469,33 @@ class FlowInstance: self, query, user="trustgraph", collection="default", doc_limit=10, ): + """ + Execute document-based Retrieval-Augmented Generation (RAG) query. + + Document RAG uses vector embeddings to find relevant document chunks, + then generates a response using an LLM with those chunks as context. + + Args: + query: Natural language query + user: User/keyspace identifier (default: "trustgraph") + collection: Collection identifier (default: "default") + doc_limit: Maximum document chunks to retrieve (default: 10) + + Returns: + str: Generated response incorporating document context + + Example: + ```python + flow = api.flow().id("default") + response = flow.document_rag( + query="Summarize the key findings", + user="trustgraph", + collection="research-papers", + doc_limit=5 + ) + print(response) + ``` + """ # The input consists of a question input = { @@ -200,6 +511,25 @@ class FlowInstance: )["response"] def embeddings(self, text): + """ + Generate vector embeddings for text. + + Converts text into dense vector representations suitable for semantic + search and similarity comparison. + + Args: + text: Input text to embed + + Returns: + list[float]: Vector embedding + + Example: + ```python + flow = api.flow().id("default") + vectors = flow.embeddings("quantum computing") + print(f"Embedding dimension: {len(vectors)}") + ``` + """ # The input consists of a text block input = { @@ -212,6 +542,32 @@ class FlowInstance: )["vectors"] def graph_embeddings_query(self, text, user, collection, limit=10): + """ + Query knowledge graph entities using semantic similarity. + + Finds entities in the knowledge graph whose descriptions are semantically + similar to the input text, using vector embeddings. + + Args: + text: Query text for semantic search + user: User/keyspace identifier + collection: Collection identifier + limit: Maximum number of results (default: 10) + + Returns: + dict: Query results with similar entities + + Example: + ```python + flow = api.flow().id("default") + results = flow.graph_embeddings_query( + text="physicist who discovered radioactivity", + user="trustgraph", + collection="scientists", + limit=5 + ) + ``` + """ # Query graph embeddings for semantic search input = { @@ -227,6 +583,39 @@ class FlowInstance: ) def prompt(self, id, variables): + """ + Execute a prompt template with variable substitution. + + Prompt templates allow reusable prompt patterns with dynamic variable + substitution, useful for consistent prompt engineering. + + Args: + id: Prompt template identifier + variables: Dictionary of variable name to value mappings + + Returns: + str or dict: Rendered prompt result (text or structured object) + + Raises: + ProtocolException: If response format is invalid + + Example: + ```python + flow = api.flow().id("default") + + # Text template + result = flow.prompt( + id="summarize-template", + variables={"topic": "quantum computing", "length": "brief"} + ) + + # Structured template + result = flow.prompt( + id="extract-entities", + variables={"text": "Marie Curie won Nobel Prizes"} + ) + ``` + """ input = { "id": id, @@ -252,6 +641,33 @@ class FlowInstance: raise ProtocolException("Response not formatted correctly") def mcp_tool(self, name, parameters={}): + """ + Execute a Model Context Protocol (MCP) tool. + + MCP tools provide extensible functionality for agents and workflows, + allowing integration with external systems and services. + + Args: + name: Tool name/identifier + parameters: Tool parameters dictionary (default: {}) + + Returns: + str or dict: Tool execution result + + Raises: + ProtocolException: If response format is invalid + + Example: + ```python + flow = api.flow().id("default") + + # Execute a tool + result = flow.mcp_tool( + name="search-web", + parameters={"query": "latest AI news", "limit": 5} + ) + ``` + """ # The input consists of name and parameters input = { @@ -281,6 +697,46 @@ class FlowInstance: self, s=None, p=None, o=None, user=None, collection=None, limit=10000 ): + """ + Query knowledge graph triples using pattern matching. + + Searches for RDF triples matching the given subject, predicate, and/or + object patterns. Unspecified parameters act as wildcards. + + Args: + s: Subject URI (optional, use None for wildcard) + p: Predicate URI (optional, use None for wildcard) + o: Object URI or Literal (optional, use None for wildcard) + user: User/keyspace identifier (optional) + collection: Collection identifier (optional) + limit: Maximum results to return (default: 10000) + + Returns: + list[Triple]: List of matching Triple objects + + Raises: + RuntimeError: If s or p is not a Uri, or o is not Uri/Literal + + Example: + ```python + from trustgraph.knowledge import Uri, Literal + + flow = api.flow().id("default") + + # Find all triples about a specific subject + triples = flow.triples_query( + s=Uri("http://example.org/person/marie-curie"), + user="trustgraph", + collection="scientists" + ) + + # Find all instances of a specific relationship + triples = flow.triples_query( + p=Uri("http://example.org/ontology/discovered"), + limit=100 + ) + ``` + """ input = { "limit": limit @@ -325,6 +781,39 @@ class FlowInstance: self, document, id=None, metadata=None, user=None, collection=None, ): + """ + Load a binary document for processing. + + Uploads a document (PDF, DOCX, images, etc.) for extraction and + processing through the flow's document pipeline. + + Args: + document: Document content as bytes + id: Optional document identifier (auto-generated if None) + metadata: Optional metadata (list of Triples or object with emit method) + user: User/keyspace identifier (optional) + collection: Collection identifier (optional) + + Returns: + dict: Processing response + + Raises: + RuntimeError: If metadata is provided without id + + Example: + ```python + flow = api.flow().id("default") + + # Load a PDF document + with open("research.pdf", "rb") as f: + result = flow.load_document( + document=f.read(), + id="research-001", + user="trustgraph", + collection="papers" + ) + ``` + """ if id is None: @@ -372,6 +861,41 @@ class FlowInstance: self, text, id=None, metadata=None, charset="utf-8", user=None, collection=None, ): + """ + Load text content for processing. + + Uploads text content for extraction and processing through the flow's + text pipeline. + + Args: + text: Text content as bytes + id: Optional document identifier (auto-generated if None) + metadata: Optional metadata (list of Triples or object with emit method) + charset: Character encoding (default: "utf-8") + user: User/keyspace identifier (optional) + collection: Collection identifier (optional) + + Returns: + dict: Processing response + + Raises: + RuntimeError: If metadata is provided without id + + Example: + ```python + flow = api.flow().id("default") + + # Load text content + text_content = b"This is the document content..." + result = flow.load_text( + text=text_content, + id="text-001", + charset="utf-8", + user="trustgraph", + collection="documents" + ) + ``` + """ if id is None: @@ -417,6 +941,60 @@ class FlowInstance: self, query, user="trustgraph", collection="default", variables=None, operation_name=None ): + """ + Execute a GraphQL query against structured objects in the knowledge graph. + + Queries structured data using GraphQL syntax, allowing complex queries + with filtering, aggregation, and relationship traversal. + + Args: + query: GraphQL query string + user: User/keyspace identifier (default: "trustgraph") + collection: Collection identifier (default: "default") + variables: Optional query variables dictionary + operation_name: Optional operation name for multi-operation documents + + Returns: + dict: GraphQL response with 'data', 'errors', and/or 'extensions' fields + + Raises: + ProtocolException: If system-level error occurs + + Example: + ```python + flow = api.flow().id("default") + + # Simple query + query = ''' + { + scientists(limit: 10) { + name + field + discoveries + } + } + ''' + result = flow.objects_query( + query=query, + user="trustgraph", + collection="scientists" + ) + + # Query with variables + query = ''' + query GetScientist($name: String!) { + scientists(name: $name) { + name + nobelPrizes + } + } + ''' + result = flow.objects_query( + query=query, + variables={"name": "Marie Curie"} + ) + ``` + """ # The input consists of a GraphQL query and optional variables input = { diff --git a/trustgraph-base/trustgraph/api/knowledge.py b/trustgraph-base/trustgraph/api/knowledge.py index 3c625057..23f6c9f2 100644 --- a/trustgraph-base/trustgraph/api/knowledge.py +++ b/trustgraph-base/trustgraph/api/knowledge.py @@ -1,3 +1,10 @@ +""" +TrustGraph Knowledge Graph Core Management + +This module provides interfaces for managing knowledge graph cores in TrustGraph. +KG cores are pre-built knowledge graph datasets that can be loaded and unloaded +into flows for use in queries and RAG operations. +""" import json import base64 @@ -10,15 +17,56 @@ def to_value(x): return Literal(x["v"]) class Knowledge: + """ + Knowledge graph core management client. + + Provides methods for managing knowledge graph cores, including listing + available cores, loading them into flows, and unloading them. KG cores + are pre-built knowledge graph datasets that enhance RAG capabilities. + """ def __init__(self, api): + """ + Initialize Knowledge client. + + Args: + api: Parent Api instance for making requests + """ self.api = api def request(self, request): + """ + Make a knowledge-scoped API request. + Args: + request: Request payload dictionary + + Returns: + dict: Response object + """ return self.api.request(f"knowledge", request) def list_kg_cores(self, user="trustgraph"): + """ + List all available knowledge graph cores. + + Retrieves the IDs of all KG cores available for the specified user. + + Args: + user: User identifier (default: "trustgraph") + + Returns: + list[str]: List of KG core identifiers + + Example: + ```python + knowledge = api.knowledge() + + # List available KG cores + cores = knowledge.list_kg_cores(user="trustgraph") + print(f"Available KG cores: {cores}") + ``` + """ # The input consists of system and prompt strings input = { @@ -29,6 +77,24 @@ class Knowledge: return self.request(request = input)["ids"] def delete_kg_core(self, id, user="trustgraph"): + """ + Delete a knowledge graph core. + + Removes a KG core from storage. This does not affect currently loaded + cores in flows. + + Args: + id: KG core identifier to delete + user: User identifier (default: "trustgraph") + + Example: + ```python + knowledge = api.knowledge() + + # Delete a KG core + knowledge.delete_kg_core(id="medical-kb-v1", user="trustgraph") + ``` + """ # The input consists of system and prompt strings input = { @@ -41,6 +107,39 @@ class Knowledge: def load_kg_core(self, id, user="trustgraph", flow="default", collection="default"): + """ + Load a knowledge graph core into a flow. + + Makes a KG core available for use in queries and RAG operations within + the specified flow and collection. + + Args: + id: KG core identifier to load + user: User identifier (default: "trustgraph") + flow: Flow instance to load into (default: "default") + collection: Collection to associate with (default: "default") + + Example: + ```python + knowledge = api.knowledge() + + # Load a medical knowledge base into the default flow + knowledge.load_kg_core( + id="medical-kb-v1", + user="trustgraph", + flow="default", + collection="medical" + ) + + # Now the flow can use this KG core for RAG queries + flow = api.flow().id("default") + response = flow.graph_rag( + query="What are the symptoms of diabetes?", + user="trustgraph", + collection="medical" + ) + ``` + """ # The input consists of system and prompt strings input = { @@ -54,6 +153,29 @@ class Knowledge: self.request(request = input) def unload_kg_core(self, id, user="trustgraph", flow="default"): + """ + Unload a knowledge graph core from a flow. + + Removes a KG core from active use in the specified flow, freeing + resources while keeping the core available in storage. + + Args: + id: KG core identifier to unload + user: User identifier (default: "trustgraph") + flow: Flow instance to unload from (default: "default") + + Example: + ```python + knowledge = api.knowledge() + + # Unload a KG core when no longer needed + knowledge.unload_kg_core( + id="medical-kb-v1", + user="trustgraph", + flow="default" + ) + ``` + """ # The input consists of system and prompt strings input = { diff --git a/trustgraph-base/trustgraph/api/library.py b/trustgraph-base/trustgraph/api/library.py index a08a9546..b068f627 100644 --- a/trustgraph-base/trustgraph/api/library.py +++ b/trustgraph-base/trustgraph/api/library.py @@ -1,3 +1,9 @@ +""" +TrustGraph Document Library Management + +This module provides interfaces for managing documents in the TrustGraph library, +including document storage, metadata management, and processing workflow coordination. +""" import datetime import time @@ -15,17 +21,79 @@ def to_value(x): return Literal(x["v"]) class Library: + """ + Document library management client. + + Provides methods for managing documents in the TrustGraph library, including + adding, retrieving, updating, and removing documents, as well as managing + document processing workflows. + """ def __init__(self, api): + """ + Initialize Library client. + + Args: + api: Parent Api instance for making requests + """ self.api = api def request(self, request): + """ + Make a library-scoped API request. + + Args: + request: Request payload dictionary + + Returns: + dict: Response object + """ return self.api.request(f"librarian", request) def add_document( self, document, id, metadata, user, title, comments, - kind="text/plain", tags=[], + kind="text/plain", tags=[], ): + """ + Add a document to the library. + + Stores a document with associated metadata in the library for + retrieval and processing. + + Args: + document: Document content as bytes + id: Document identifier (auto-generated if None) + metadata: Document metadata as list of Triple objects or object with emit method + user: User/owner identifier + title: Document title + comments: Document description or comments + kind: MIME type of the document (default: "text/plain") + tags: List of tags for categorization (default: []) + + Returns: + dict: Response from the add operation + + Raises: + RuntimeError: If metadata is provided without an id + + Example: + ```python + library = api.library() + + # Add a PDF document + with open("research.pdf", "rb") as f: + library.add_document( + document=f.read(), + id="research-001", + metadata=[], + user="trustgraph", + title="Research Paper", + comments="Key findings in quantum computing", + kind="application/pdf", + tags=["research", "physics"] + ) + ``` + """ if id is None: @@ -85,6 +153,31 @@ class Library: return self.request(input) def get_documents(self, user): + """ + List all documents for a user. + + Retrieves metadata for all documents owned by the specified user. + + Args: + user: User identifier + + Returns: + list[DocumentMetadata]: List of document metadata objects + + Raises: + ProtocolException: If response format is invalid + + Example: + ```python + library = api.library() + docs = library.get_documents(user="trustgraph") + + for doc in docs: + print(f"{doc.id}: {doc.title} ({doc.kind})") + print(f" Uploaded: {doc.time}") + print(f" Tags: {', '.join(doc.tags)}") + ``` + """ input = { "operation": "list-documents", @@ -119,6 +212,29 @@ class Library: raise ProtocolException(f"Response not formatted correctly") def get_document(self, user, id): + """ + Get metadata for a specific document. + + Retrieves the metadata for a single document by ID. + + Args: + user: User identifier + id: Document identifier + + Returns: + DocumentMetadata: Document metadata object + + Raises: + ProtocolException: If response format is invalid + + Example: + ```python + library = api.library() + doc = library.get_document(user="trustgraph", id="doc-123") + print(f"Title: {doc.title}") + print(f"Comments: {doc.comments}") + ``` + """ input = { "operation": "get-document", @@ -152,6 +268,42 @@ class Library: raise ProtocolException(f"Response not formatted correctly") def update_document(self, user, id, metadata): + """ + Update document metadata. + + Updates the metadata for an existing document in the library. + + Args: + user: User identifier + id: Document identifier + metadata: Updated DocumentMetadata object + + Returns: + DocumentMetadata: Updated document metadata + + Raises: + ProtocolException: If response format is invalid + + Example: + ```python + library = api.library() + + # Get existing document + doc = library.get_document(user="trustgraph", id="doc-123") + + # Update metadata + doc.title = "Updated Title" + doc.comments = "Updated description" + doc.tags.append("reviewed") + + # Save changes + updated_doc = library.update_document( + user="trustgraph", + id="doc-123", + metadata=doc + ) + ``` + """ input = { "operation": "update-document", @@ -199,6 +351,24 @@ class Library: raise ProtocolException(f"Response not formatted correctly") def remove_document(self, user, id): + """ + Remove a document from the library. + + Deletes a document and its metadata from the library. + + Args: + user: User identifier + id: Document identifier to remove + + Returns: + dict: Empty response object + + Example: + ```python + library = api.library() + library.remove_document(user="trustgraph", id="doc-123") + ``` + """ input = { "operation": "remove-document", @@ -214,6 +384,38 @@ class Library: self, id, document_id, flow="default", user="trustgraph", collection="default", tags=[], ): + """ + Start a document processing workflow. + + Initiates processing of a document through a specified flow, tracking + the processing job with metadata. + + Args: + id: Unique processing job identifier + document_id: ID of the document to process + flow: Flow instance to use for processing (default: "default") + user: User identifier (default: "trustgraph") + collection: Target collection for processed data (default: "default") + tags: List of tags for the processing job (default: []) + + Returns: + dict: Empty response object + + Example: + ```python + library = api.library() + + # Start processing a document + library.start_processing( + id="proc-001", + document_id="doc-123", + flow="default", + user="trustgraph", + collection="research", + tags=["automated", "extract"] + ) + ``` + """ input = { "operation": "add-processing", @@ -233,8 +435,26 @@ class Library: return {} def stop_processing( - self, id, user="trustgraph", + self, id, user="trustgraph", ): + """ + Stop a running document processing job. + + Terminates an active document processing workflow and removes its metadata. + + Args: + id: Processing job identifier to stop + user: User identifier (default: "trustgraph") + + Returns: + dict: Empty response object + + Example: + ```python + library = api.library() + library.stop_processing(id="proc-001", user="trustgraph") + ``` + """ input = { "operation": "remove-processing", @@ -247,6 +467,34 @@ class Library: return {} def get_processings(self, user="trustgraph"): + """ + List all active document processing jobs. + + Retrieves metadata for all currently running document processing workflows + for the specified user. + + Args: + user: User identifier (default: "trustgraph") + + Returns: + list[ProcessingMetadata]: List of processing job metadata objects + + Raises: + ProtocolException: If response format is invalid + + Example: + ```python + library = api.library() + jobs = library.get_processings(user="trustgraph") + + for job in jobs: + print(f"Job {job.id}:") + print(f" Document: {job.document_id}") + print(f" Flow: {job.flow}") + print(f" Collection: {job.collection}") + print(f" Started: {job.time}") + ``` + """ input = { "operation": "list-processing", diff --git a/trustgraph-base/trustgraph/api/socket_client.py b/trustgraph-base/trustgraph/api/socket_client.py index 23e3dbc0..c712f808 100644 --- a/trustgraph-base/trustgraph/api/socket_client.py +++ b/trustgraph-base/trustgraph/api/socket_client.py @@ -1,3 +1,9 @@ +""" +TrustGraph Synchronous WebSocket Client + +This module provides synchronous WebSocket-based access to TrustGraph services with +streaming support for real-time responses from agents, RAG queries, and text completions. +""" import json import asyncio @@ -10,9 +16,26 @@ from . exceptions import ProtocolException, raise_from_error_dict class SocketClient: - """Synchronous WebSocket client (wraps async websockets library)""" + """ + Synchronous WebSocket client for streaming operations. + + Provides a synchronous interface to WebSocket-based TrustGraph services, + wrapping async websockets library with synchronous generators for ease of use. + Supports streaming responses from agents, RAG queries, and text completions. + + Note: This is a synchronous wrapper around async WebSocket operations. For + true async support, use AsyncSocketClient instead. + """ def __init__(self, url: str, timeout: int, token: Optional[str]) -> None: + """ + Initialize synchronous WebSocket client. + + Args: + url: Base URL for TrustGraph API (HTTP/HTTPS will be converted to WS/WSS) + timeout: WebSocket timeout in seconds + token: Optional bearer token for authentication + """ self.url: str = self._convert_to_ws_url(url) self.timeout: int = timeout self.token: Optional[str] = token @@ -22,7 +45,15 @@ class SocketClient: self._loop: Optional[asyncio.AbstractEventLoop] = None def _convert_to_ws_url(self, url: str) -> str: - """Convert HTTP URL to WebSocket URL""" + """ + Convert HTTP URL to WebSocket URL. + + Args: + url: HTTP/HTTPS or WS/WSS URL + + Returns: + str: WebSocket URL (ws:// or wss://) + """ if url.startswith("http://"): return url.replace("http://", "ws://", 1) elif url.startswith("https://"): @@ -34,7 +65,25 @@ class SocketClient: return f"ws://{url}" def flow(self, flow_id: str) -> "SocketFlowInstance": - """Get flow instance for WebSocket operations""" + """ + Get a flow instance for WebSocket streaming operations. + + Args: + flow_id: Flow identifier + + Returns: + SocketFlowInstance: Flow instance with streaming methods + + Example: + ```python + socket = api.socket() + flow = socket.flow("default") + + # Stream agent responses + for chunk in flow.agent(question="Hello", user="trustgraph", streaming=True): + print(chunk.content, end='', flush=True) + ``` + """ return SocketFlowInstance(self, flow_id) def _send_request_sync( @@ -242,15 +291,32 @@ class SocketClient: ) def close(self) -> None: - """Close WebSocket connection""" + """ + Close WebSocket connections. + + Note: Cleanup is handled automatically by context managers in async code. + """ # Cleanup handled by context manager in async code pass class SocketFlowInstance: - """Synchronous WebSocket flow instance with same interface as REST FlowInstance""" + """ + Synchronous WebSocket flow instance for streaming operations. + + Provides the same interface as REST FlowInstance but with WebSocket-based + streaming support for real-time responses. All methods support an optional + `streaming` parameter to enable incremental result delivery. + """ def __init__(self, client: SocketClient, flow_id: str) -> None: + """ + Initialize socket flow instance. + + Args: + client: Parent SocketClient + flow_id: Flow identifier + """ self.client: SocketClient = client self.flow_id: str = flow_id @@ -264,7 +330,44 @@ class SocketFlowInstance: streaming: bool = False, **kwargs: Any ) -> Union[Dict[str, Any], Iterator[StreamingChunk]]: - """Agent with optional streaming""" + """ + Execute an agent operation with streaming support. + + Agents can perform multi-step reasoning with tool use. This method always + returns streaming chunks (thoughts, observations, answers) even when + streaming=False, to show the agent's reasoning process. + + Args: + question: User question or instruction + user: User identifier + state: Optional state dictionary for stateful conversations + group: Optional group identifier for multi-user contexts + history: Optional conversation history as list of message dicts + streaming: Enable streaming mode (default: False) + **kwargs: Additional parameters passed to the agent service + + Returns: + Iterator[StreamingChunk]: Stream of agent thoughts, observations, and answers + + Example: + ```python + socket = api.socket() + flow = socket.flow("default") + + # Stream agent reasoning + for chunk in flow.agent( + question="What is quantum computing?", + user="trustgraph", + streaming=True + ): + if isinstance(chunk, AgentThought): + print(f"[Thinking] {chunk.content}") + elif isinstance(chunk, AgentObservation): + print(f"[Observation] {chunk.content}") + elif isinstance(chunk, AgentAnswer): + print(f"[Answer] {chunk.content}") + ``` + """ request = { "question": question, "user": user, @@ -283,7 +386,40 @@ class SocketFlowInstance: return self.client._send_request_sync("agent", self.flow_id, request, streaming=True) def text_completion(self, system: str, prompt: str, streaming: bool = False, **kwargs) -> Union[str, Iterator[str]]: - """Text completion with optional streaming""" + """ + Execute text completion with optional streaming. + + Args: + system: System prompt defining the assistant's behavior + prompt: User prompt/question + streaming: Enable streaming mode (default: False) + **kwargs: Additional parameters passed to the service + + Returns: + Union[str, Iterator[str]]: Complete response or stream of text chunks + + Example: + ```python + socket = api.socket() + flow = socket.flow("default") + + # Non-streaming + response = flow.text_completion( + system="You are helpful", + prompt="Explain quantum computing", + streaming=False + ) + print(response) + + # Streaming + for chunk in flow.text_completion( + system="You are helpful", + prompt="Explain quantum computing", + streaming=True + ): + print(chunk, end='', flush=True) + ``` + """ request = { "system": system, "prompt": prompt, @@ -316,7 +452,40 @@ class SocketFlowInstance: streaming: bool = False, **kwargs: Any ) -> Union[str, Iterator[str]]: - """Graph RAG with optional streaming""" + """ + Execute graph-based RAG query with optional streaming. + + Uses knowledge graph structure to find relevant context, then generates + a response using an LLM. Streaming mode delivers results incrementally. + + Args: + query: Natural language query + user: User/keyspace identifier + collection: Collection identifier + max_subgraph_size: Maximum total triples in subgraph (default: 1000) + max_subgraph_count: Maximum number of subgraphs (default: 5) + max_entity_distance: Maximum traversal depth (default: 3) + streaming: Enable streaming mode (default: False) + **kwargs: Additional parameters passed to the service + + Returns: + Union[str, Iterator[str]]: Complete response or stream of text chunks + + Example: + ```python + socket = api.socket() + flow = socket.flow("default") + + # Streaming graph RAG + for chunk in flow.graph_rag( + query="Tell me about Marie Curie", + user="trustgraph", + collection="scientists", + streaming=True + ): + print(chunk, end='', flush=True) + ``` + """ request = { "query": query, "user": user, @@ -344,7 +513,39 @@ class SocketFlowInstance: streaming: bool = False, **kwargs: Any ) -> Union[str, Iterator[str]]: - """Document RAG with optional streaming""" + """ + Execute document-based RAG query with optional streaming. + + Uses vector embeddings to find relevant document chunks, then generates + a response using an LLM. Streaming mode delivers results incrementally. + + Args: + query: Natural language query + user: User/keyspace identifier + collection: Collection identifier + doc_limit: Maximum document chunks to retrieve (default: 10) + streaming: Enable streaming mode (default: False) + **kwargs: Additional parameters passed to the service + + Returns: + Union[str, Iterator[str]]: Complete response or stream of text chunks + + Example: + ```python + socket = api.socket() + flow = socket.flow("default") + + # Streaming document RAG + for chunk in flow.document_rag( + query="Summarize the key findings", + user="trustgraph", + collection="research-papers", + doc_limit=5, + streaming=True + ): + print(chunk, end='', flush=True) + ``` + """ request = { "query": query, "user": user, @@ -374,7 +575,32 @@ class SocketFlowInstance: streaming: bool = False, **kwargs: Any ) -> Union[str, Iterator[str]]: - """Execute prompt with optional streaming""" + """ + Execute a prompt template with optional streaming. + + Args: + id: Prompt template identifier + variables: Dictionary of variable name to value mappings + streaming: Enable streaming mode (default: False) + **kwargs: Additional parameters passed to the service + + Returns: + Union[str, Iterator[str]]: Complete response or stream of text chunks + + Example: + ```python + socket = api.socket() + flow = socket.flow("default") + + # Streaming prompt execution + for chunk in flow.prompt( + id="summarize-template", + variables={"topic": "quantum computing", "length": "brief"}, + streaming=True + ): + print(chunk, end='', flush=True) + ``` + """ request = { "id": id, "variables": variables, @@ -397,7 +623,32 @@ class SocketFlowInstance: limit: int = 10, **kwargs: Any ) -> Dict[str, Any]: - """Query graph embeddings for semantic search""" + """ + Query knowledge graph entities using semantic similarity. + + Args: + text: Query text for semantic search + user: User/keyspace identifier + collection: Collection identifier + limit: Maximum number of results (default: 10) + **kwargs: Additional parameters passed to the service + + Returns: + dict: Query results with similar entities + + Example: + ```python + socket = api.socket() + flow = socket.flow("default") + + results = flow.graph_embeddings_query( + text="physicist who discovered radioactivity", + user="trustgraph", + collection="scientists", + limit=5 + ) + ``` + """ request = { "text": text, "user": user, @@ -409,7 +660,25 @@ class SocketFlowInstance: return self.client._send_request_sync("graph-embeddings", self.flow_id, request, False) def embeddings(self, text: str, **kwargs: Any) -> Dict[str, Any]: - """Generate text embeddings""" + """ + Generate vector embeddings for text. + + Args: + text: Input text to embed + **kwargs: Additional parameters passed to the service + + Returns: + dict: Response containing vectors + + Example: + ```python + socket = api.socket() + flow = socket.flow("default") + + result = flow.embeddings("quantum computing") + vectors = result.get("vectors", []) + ``` + """ request = {"text": text} request.update(kwargs) @@ -425,7 +694,34 @@ class SocketFlowInstance: limit: int = 100, **kwargs: Any ) -> Dict[str, Any]: - """Triple pattern query""" + """ + Query knowledge graph triples using pattern matching. + + Args: + s: Subject URI (optional, use None for wildcard) + p: Predicate URI (optional, use None for wildcard) + o: Object URI or Literal (optional, use None for wildcard) + user: User/keyspace identifier (optional) + collection: Collection identifier (optional) + limit: Maximum results to return (default: 100) + **kwargs: Additional parameters passed to the service + + Returns: + dict: Query results with matching triples + + Example: + ```python + socket = api.socket() + flow = socket.flow("default") + + # Find all triples about a specific subject + result = flow.triples_query( + s="http://example.org/person/marie-curie", + user="trustgraph", + collection="scientists" + ) + ``` + """ request = {"limit": limit} if s is not None: request["s"] = str(s) @@ -450,7 +746,41 @@ class SocketFlowInstance: operation_name: Optional[str] = None, **kwargs: Any ) -> Dict[str, Any]: - """GraphQL query""" + """ + Execute a GraphQL query against structured objects. + + Args: + query: GraphQL query string + user: User/keyspace identifier + collection: Collection identifier + variables: Optional query variables dictionary + operation_name: Optional operation name for multi-operation documents + **kwargs: Additional parameters passed to the service + + Returns: + dict: GraphQL response with data, errors, and/or extensions + + Example: + ```python + socket = api.socket() + flow = socket.flow("default") + + query = ''' + { + scientists(limit: 10) { + name + field + discoveries + } + } + ''' + result = flow.objects_query( + query=query, + user="trustgraph", + collection="scientists" + ) + ``` + """ request = { "query": query, "user": user, @@ -470,7 +800,28 @@ class SocketFlowInstance: parameters: Dict[str, Any], **kwargs: Any ) -> Dict[str, Any]: - """Execute MCP tool""" + """ + Execute a Model Context Protocol (MCP) tool. + + Args: + name: Tool name/identifier + parameters: Tool parameters dictionary + **kwargs: Additional parameters passed to the service + + Returns: + dict: Tool execution result + + Example: + ```python + socket = api.socket() + flow = socket.flow("default") + + result = flow.mcp_tool( + name="search-web", + parameters={"query": "latest AI news", "limit": 5} + ) + ``` + """ request = { "name": name, "parameters": parameters diff --git a/trustgraph-base/trustgraph/api/types.py b/trustgraph-base/trustgraph/api/types.py index a8608853..3b4e476e 100644 --- a/trustgraph-base/trustgraph/api/types.py +++ b/trustgraph-base/trustgraph/api/types.py @@ -1,3 +1,9 @@ +""" +TrustGraph API Type Definitions + +Data classes and type definitions for TrustGraph API objects including knowledge +graph elements, metadata structures, and streaming response chunks. +""" import dataclasses import datetime @@ -6,23 +12,59 @@ from .. knowledge import hash, Uri, Literal @dataclasses.dataclass class Triple: + """ + RDF triple representing a knowledge graph statement. + + Attributes: + s: Subject (entity URI or value) + p: Predicate (relationship URI) + o: Object (entity URI, literal value, or typed value) + """ s : str p : str o : str @dataclasses.dataclass class ConfigKey: + """ + Configuration key identifier. + + Attributes: + type: Configuration type/category (e.g., "llm", "embedding") + key: Specific configuration key within the type + """ type : str key : str @dataclasses.dataclass class ConfigValue: + """ + Configuration key-value pair. + + Attributes: + type: Configuration type/category + key: Specific configuration key + value: Configuration value as string + """ type : str key : str value : str @dataclasses.dataclass class DocumentMetadata: + """ + Metadata for a document in the library. + + Attributes: + id: Unique document identifier + time: Document creation/upload timestamp + kind: Document MIME type (e.g., "application/pdf", "text/plain") + title: Document title + comments: Additional comments or description + metadata: List of RDF triples providing structured metadata + user: User/owner identifier + tags: List of tags for categorization + """ id : str time : datetime.datetime kind : str @@ -34,6 +76,18 @@ class DocumentMetadata: @dataclasses.dataclass class ProcessingMetadata: + """ + Metadata for an active document processing job. + + Attributes: + id: Unique processing job identifier + document_id: ID of the document being processed + time: Processing start timestamp + flow: Flow instance handling the processing + user: User identifier + collection: Target collection for processed data + tags: List of tags for categorization + """ id : str document_id : str time : datetime.datetime @@ -44,6 +98,19 @@ class ProcessingMetadata: @dataclasses.dataclass class CollectionMetadata: + """ + Metadata for a data collection. + + Collections provide logical grouping and isolation for documents and + knowledge graph data. + + Attributes: + user: User/owner identifier + collection: Collection identifier + name: Human-readable collection name + description: Collection description + tags: List of tags for categorization + """ user : str collection : str name : str @@ -54,29 +121,80 @@ class CollectionMetadata: @dataclasses.dataclass class StreamingChunk: - """Base class for streaming chunks""" + """ + Base class for streaming response chunks. + + Used for WebSocket-based streaming operations where responses are delivered + incrementally as they are generated. + + Attributes: + content: The text content of this chunk + end_of_message: True if this is the final chunk of a message segment + """ content: str end_of_message: bool = False @dataclasses.dataclass class AgentThought(StreamingChunk): - """Agent reasoning chunk""" + """ + Agent reasoning/thought process chunk. + + Represents the agent's internal reasoning or planning steps during execution. + These chunks show how the agent is thinking about the problem. + + Attributes: + content: Agent's thought text + end_of_message: True if this completes the current thought + chunk_type: Always "thought" + """ chunk_type: str = "thought" @dataclasses.dataclass class AgentObservation(StreamingChunk): - """Agent tool observation chunk""" + """ + Agent tool execution observation chunk. + + Represents the result or observation from executing a tool or action. + These chunks show what the agent learned from using tools. + + Attributes: + content: Observation text describing tool results + end_of_message: True if this completes the current observation + chunk_type: Always "observation" + """ chunk_type: str = "observation" @dataclasses.dataclass class AgentAnswer(StreamingChunk): - """Agent final answer chunk""" + """ + Agent final answer chunk. + + Represents the agent's final response to the user's query after completing + its reasoning and tool use. + + Attributes: + content: Answer text + end_of_message: True if this completes the current answer segment + end_of_dialog: True if this completes the entire agent interaction + chunk_type: Always "final-answer" + """ chunk_type: str = "final-answer" end_of_dialog: bool = False @dataclasses.dataclass class RAGChunk(StreamingChunk): - """RAG streaming chunk""" + """ + RAG (Retrieval-Augmented Generation) streaming chunk. + + Used for streaming responses from graph RAG, document RAG, text completion, + and other generative services. + + Attributes: + content: Generated text content + end_of_stream: True if this is the final chunk of the stream + error: Optional error information if an error occurred + chunk_type: Always "rag" + """ chunk_type: str = "rag" end_of_stream: bool = False error: Optional[Dict[str, str]] = None