trustgraph/docs/python-api.zh-cn.md
Alex Jenkins 8954fa3ad7 Feat: TrustGraph i18n & Documentation Translation Updates (#781)
Native CLI i18n: The TrustGraph CLI has built-in translation support
that dynamically loads language strings. You can test and use
different languages by simply passing the --lang flag (e.g., --lang
es for Spanish, --lang ru for Russian) or by configuring your
environment's LANG variable.

Automated Docs Translations: This PR introduces autonomously
translated Markdown documentation into several target languages,
including Spanish, Swahili, Portuguese, Turkish, Hindi, Hebrew,
Arabic, Simplified Chinese, and Russian.
2026-04-14 12:08:32 +01:00

95 KiB
Raw Permalink Blame History

layout title parent
default TrustGraph Python API 参考 Chinese (Beta)

TrustGraph Python API 参考

Beta Translation: This document was translated via Machine Learning and as such may not be 100% accurate. All non-English languages are currently classified as Beta.

安装

pip install trustgraph

快速开始

所有类和类型都从 trustgraph.api 包中导入:

from trustgraph.api import Api, Triple, ConfigKey

# Create API client
api = Api(url="http://localhost:8088/")

# Get a flow instance
flow = api.flow().id("default")

# Execute a graph RAG query
response = flow.graph_rag(
    query="What are the main topics?",
    user="trustgraph",
    collection="default"
)

目录

核心

Api

Flow 客户端

Flow FlowInstance AsyncFlow AsyncFlowInstance

WebSocket 客户端

SocketClient SocketFlowInstance AsyncSocketClient AsyncSocketFlowInstance

批量操作

BulkClient AsyncBulkClient

指标

Metrics AsyncMetrics

数据类型

Triple ConfigKey ConfigValue DocumentMetadata ProcessingMetadata CollectionMetadata StreamingChunk AgentThought AgentObservation AgentAnswer RAGChunk

异常

ProtocolException TrustGraphException AgentError ConfigError DocumentRagError FlowError GatewayError GraphRagError LLMError LoadError LookupError NLPQueryError RowsQueryError RequestError StructuredQueryError UnexpectedError ApplicationException

--

Api

from trustgraph.api import Api

主要的 TrustGraph API 客户端,用于同步和异步操作。

此类提供对所有 TrustGraph 服务的访问,包括流程管理、 知识图谱操作、文档处理、RAG 查询等等。它支持 基于 REST 和基于 WebSocket 的通信模式。

客户端可以作为上下文管理器使用,以进行自动资源清理: python with Api(url="http://localhost:8088/") as api: result = api.flow().id("default").graph_rag(query="test")

方法

__aenter__(self)

进入异步上下文管理器。

__aexit__(self, *args)

退出异步上下文管理器并关闭连接。

__enter__(self)

进入同步上下文管理器。

__exit__(self, *args)

退出同步上下文管理器并关闭连接。

__init__(self, url='http://localhost:8088/', timeout=60, token: str | None = None)

初始化 TrustGraph API 客户端。

参数:

url: TrustGraph API 的基本 URL (默认值: "http://localhost:8088/"") timeout: 请求超时时间,单位为秒 (默认值: 60) token: 可选的用于身份验证的 bearer token

示例:

# Local development
api = Api()

# Production with authentication
api = Api(
    url="https://trustgraph.example.com/",
    timeout=120,
    token="your-api-token"
)

aclose(self)

关闭所有异步客户端连接。

此方法关闭异步 WebSocket、批量操作和流连接。 当退出异步上下文管理器时,它会自动调用。

示例:

api = Api()
async_socket = api.async_socket()
# ... use async_socket
await api.aclose()  # Clean up connections

# Or use async context manager (automatic cleanup)
async with Api() as api:
    async_socket = api.async_socket()
    # ... use async_socket
# Automatically closed

async_bulk(self)

获取一个异步批量操作客户端。

提供通过 WebSocket 进行异步/等待风格的批量导入/导出操作,以便高效处理大型数据集。

返回值: AsyncBulkClient: 异步批量操作客户端

示例:

async_bulk = api.async_bulk()

# Export triples asynchronously
async for triple in async_bulk.export_triples(flow="default"):
    print(f"{triple.s} {triple.p} {triple.o}")

# Import with async generator
async def triple_gen():
    yield Triple(s="subj", p="pred", o="obj")
    # ... more triples

await async_bulk.import_triples(
    flow="default",
    triples=triple_gen()
)

async_flow(self)

获取一个基于异步 REST 的流程客户端。

提供对流程操作的 async/await 风格访问。这对于异步 Python 应用程序和框架FastAPI、aiohttp 等)是首选。

返回值: AsyncFlow: 异步流程客户端

示例:

async_flow = api.async_flow()

# List flows
flow_ids = await async_flow.list()

# Execute operations
instance = async_flow.id("default")
result = await instance.text_completion(
    system="You are helpful",
    prompt="Hello"
)

async_metrics(self)

获取一个异步指标客户端。

提供对 Prometheus 指标的 async/await 风格访问。

返回值: AsyncMetrics: 异步指标客户端

示例:

async_metrics = api.async_metrics()
prometheus_text = await async_metrics.get()
print(prometheus_text)

async_socket(self)

获取一个异步 WebSocket 客户端,用于流式操作。

提供异步/等待 (async/await) 风格的 WebSocket 访问,并支持流式传输。 这是在 Python 中进行异步流式传输的首选方法。

返回值: AsyncSocketClient: 异步 WebSocket 客户端

示例:

async_socket = api.async_socket()
flow = async_socket.flow("default")

# Stream agent responses
async for chunk in flow.agent(
    question="Explain quantum computing",
    user="trustgraph",
    streaming=True
):
    if hasattr(chunk, 'content'):
        print(chunk.content, end='', flush=True)

bulk(self)

获取用于导入/导出的同步批量操作客户端。

批量操作允许通过 WebSocket 连接高效地传输大型数据集,包括三元组、嵌入、实体上下文和对象。

返回值: BulkClient: 同步批量操作客户端

示例:

bulk = api.bulk()

# Export triples
for triple in bulk.export_triples(flow="default"):
    print(f"{triple.s} {triple.p} {triple.o}")

# Import triples
def triple_generator():
    yield Triple(s="subj", p="pred", o="obj")
    # ... more triples

bulk.import_triples(flow="default", triples=triple_generator())

close(self)

关闭所有同步客户端连接。

此方法关闭 WebSocket 和批量操作连接。 它在退出上下文管理器时会自动调用。

示例:

api = Api()
socket = api.socket()
# ... use socket
api.close()  # Clean up connections

# Or use context manager (automatic cleanup)
with Api() as api:
    socket = api.socket()
    # ... use socket
# Automatically closed

collection(self)

获取一个 Collection 客户端,用于管理数据集合。

集合将文档和知识图谱数据组织成 逻辑分组,用于隔离和访问控制。

返回值: Collection: 集合管理客户端

示例:

collection = api.collection()

# List collections
colls = collection.list_collections(user="trustgraph")

# Update collection metadata
collection.update_collection(
    user="trustgraph",
    collection="default",
    name="Default Collection",
    description="Main data collection"
)

config(self)

获取一个 Config 客户端,用于管理配置设置。

返回值: Config: 配置管理客户端

示例:

config = api.config()

# Get configuration values
values = config.get([ConfigKey(type="llm", key="model")])

# Set configuration
config.put([ConfigValue(type="llm", key="model", value="gpt-4")])

flow(self)

获取一个 Flow 客户端,用于管理和与流程进行交互。

Flows 是 TrustGraph 中的主要执行单元,提供对 诸如代理、RAG 查询、嵌入和文档处理等服务的访问。

返回值: Flow: Flow 管理客户端

示例:

flow_client = api.flow()

# List available blueprints
blueprints = flow_client.list_blueprints()

# Get a specific flow instance
flow_instance = flow_client.id("default")
response = flow_instance.text_completion(
    system="You are helpful",
    prompt="Hello"
)

knowledge(self)

获取一个 Knowledge 客户端,用于管理知识图谱核心。

返回值: Knowledge: 知识图谱管理客户端

示例:

knowledge = api.knowledge()

# List available KG cores
cores = knowledge.list_kg_cores(user="trustgraph")

# Load a KG core
knowledge.load_kg_core(id="core-123", user="trustgraph")

library(self)

获取一个用于文档管理的库客户端。

该库提供文档存储、元数据管理以及 处理工作流程协调功能。

返回值: Library: 文档库管理客户端

示例:

library = api.library()

# Add a document
library.add_document(
    document=b"Document content",
    id="doc-123",
    metadata=[],
    user="trustgraph",
    title="My Document",
    comments="Test document"
)

# List documents
docs = library.get_documents(user="trustgraph")

metrics(self)

获取一个同步指标客户端,用于监控。

从 TrustGraph 服务获取 Prometheus 格式的指标,用于监控和可观察性。

返回值: 指标:同步指标客户端

示例:

metrics = api.metrics()
prometheus_text = metrics.get()
print(prometheus_text)

request(self, path, request)

构造一个低级别的 REST API 请求。

此方法主要用于内部使用,但可以在需要时用于直接 API 访问。

参数:

path: API 端点路径(相对于基本 URL request: 请求负载,以字典形式表示

返回值: dict: 响应对象

抛出异常:

ProtocolException: 如果响应状态码不是 200 或响应不是 JSON 格式 ApplicationException: 如果响应包含错误

示例:

response = api.request("flow", {
    "operation": "list-flows"
})

socket(self)

获取一个同步的 WebSocket 客户端,用于流式操作。

WebSocket 连接提供流式支持,用于实时响应 来自代理、RAG 查询和文本补全。此方法返回一个 对 WebSocket 协议的同步包装器。

返回值: SocketClient: 同步 WebSocket 客户端

示例:

socket = api.socket()
flow = socket.flow("default")

# Stream agent responses
for chunk in flow.agent(
    question="Explain quantum computing",
    user="trustgraph",
    streaming=True
):
    if hasattr(chunk, 'content'):
        print(chunk.content, end='', flush=True)

--

Flow

from trustgraph.api import Flow

用于蓝图和流程实例操作的流程管理客户端。

此类提供用于管理流程蓝图(模板)和 流程实例(正在运行的流程)的方法。蓝图定义了流程的结构和 参数,而实例代表可以 执行服务的活动流程。

方法

__init__(self, api)

初始化流程客户端。

参数:

api: 用于发出请求的父级 API 实例。

delete_blueprint(self, blueprint_name)

删除一个流程蓝图。

参数:

blueprint_name: 要删除的蓝图的名称。

示例:

api.flow().delete_blueprint("old-blueprint")

get(self, id)

获取正在运行的流程实例的定义。

参数:

id: 流程实例 ID

返回值: dict: 流程实例定义

示例:

flow_def = api.flow().get("default")
print(flow_def)

get_blueprint(self, blueprint_name)

通过名称获取流程蓝图定义。

参数:

blueprint_name: 要检索的蓝图的名称

返回值: dict: 蓝图定义,以字典形式返回

示例:

blueprint = api.flow().get_blueprint("default")
print(blueprint)  # Blueprint configuration

id(self, id='default')

获取一个 FlowInstance用于在特定流程上执行操作。

参数:

id: 流程标识符 (默认值: "default")

返回值: FlowInstance: 用于服务操作的流程实例

示例:

flow = api.flow().id("my-flow")
response = flow.text_completion(
    system="You are helpful",
    prompt="Hello"
)

list(self)

列出所有活动流程实例。

返回值: list[str]: 流程实例 ID 列表

示例:

flows = api.flow().list()
print(flows)  # ['default', 'flow-1', 'flow-2', ...]

list_blueprints(self)

列出所有可用的流程蓝图。

返回值: list[str]: 蓝图名称列表

示例:

blueprints = api.flow().list_blueprints()
print(blueprints)  # ['default', 'custom-flow', ...]

put_blueprint(self, blueprint_name, definition)

创建或更新流程蓝图。

参数:

blueprint_name: 蓝图的名称 definition: 蓝图定义字典

示例:

definition = {
    "services": ["text-completion", "graph-rag"],
    "parameters": {"model": "gpt-4"}
}
api.flow().put_blueprint("my-blueprint", definition)

request(self, path=None, request=None)

发送一个流程范围内的 API 请求。

参数:

path: 可选的流程端点路径后缀 request: 请求负载字典

返回值: dict: 响应对象

异常:

RuntimeError: 如果未指定请求参数

start(self, blueprint_name, id, description, parameters=None)

从蓝图启动一个新的流程实例。

参数:

blueprint_name: 要实例化的蓝图名称 id: 流程实例的唯一标识符 description: 人类可读的描述 parameters: 可选的参数字典

示例:

api.flow().start(
    blueprint_name="default",
    id="my-flow",
    description="My custom flow",
    parameters={"model": "gpt-4"}
)

stop(self, id)

停止正在运行的流程实例。

参数:

id: 要停止的流程实例 ID

示例:

api.flow().stop("my-flow")

--

FlowInstance

from trustgraph.api import FlowInstance

用于在特定流程上执行服务的流程实例客户端。

此类提供对所有 TrustGraph 服务的访问,包括: 文本补全和嵌入 具有状态管理的代理操作 图表和文档 RAG 查询 知识图谱操作(三元组、对象) 文档加载和处理 自然语言到 GraphQL 查询的转换 结构化数据分析和模式检测 MCP 工具执行 提示模板

通过运行的流程实例访问服务,该实例由 ID 标识。

方法

__init__(self, api, id)

初始化 FlowInstance。

参数:

api: 父级 Flow 客户端 id: 流程实例标识符

agent(self, question, user='trustgraph', state=None, group=None, history=None)

执行具有推理和工具使用功能的代理操作。

代理可以执行多步骤推理、使用工具,并在交互过程中维护对话 状态。 这是一个同步的非流版本。

参数:

question: 用户问题或指令 user: 用户标识符(默认为:"trustgraph" state: 可选的状态字典,用于具有状态的对话 group: 可选的组标识符,用于多用户上下文 history: 可选的对话历史记录,以消息字典列表的形式

返回值: str: 代理的最终答案

示例:

flow = api.flow().id("default")

# Simple question
answer = flow.agent(
    question="What is the capital of France?",
    user="trustgraph"
)

# With conversation history
history = [
    {"role": "user", "content": "Hello"},
    {"role": "assistant", "content": "Hi! How can I help?"}
]
answer = flow.agent(
    question="Tell me about Paris",
    user="trustgraph",
    history=history
)

detect_type(self, sample)

检测结构化数据样本的数据类型。

参数:

sample: 要分析的数据样本(字符串内容)

返回值: 包含 detected_type、confidence 以及可选元数据的字典。

diagnose_data(self, sample, schema_name=None, options=None)

执行综合数据诊断:检测类型并生成描述符。

参数:

sample: 要分析的数据样本(字符串内容) schema_name: 可选的目标模式名称,用于生成描述符。 options: 可选参数例如CSV 的分隔符)。

返回值: 包含 detected_type、confidence、descriptor 和元数据的字典。

document_embeddings_query(self, text, user, collection, limit=10)

使用语义相似度查询文档块。

查找内容在语义上与输入文本相似的文档块,使用向量嵌入。

参数:

text: 用于语义搜索的查询文本。 user: 用户/键空间标识符。 collection: 集合标识符。 limit: 最大结果数量(默认为 10

返回值: 字典:包含 chunk_id 和 score 的查询结果。

示例:

flow = api.flow().id("default")
results = flow.document_embeddings_query(
    text="machine learning algorithms",
    user="trustgraph",
    collection="research-papers",
    limit=5
)
# results contains {"chunks": [{"chunk_id": "doc1/p0/c0", "score": 0.95}, ...]}

document_rag(self, query, user='trustgraph', collection='default', doc_limit=10)

执行基于文档的检索增强生成 (RAG) 查询。

文档 RAG 使用向量嵌入来查找相关的文档片段, 然后使用 LLM并将这些片段作为上下文来生成响应。

参数:

query: 自然语言查询 user: 用户/键空间标识符 (默认值: "trustgraph") collection: 集合标识符 (默认值: "default") doc_limit: 要检索的最大文档片段数 (默认值: 10)

返回值: str: 包含文档上下文的生成响应

示例:

flow = api.flow().id("default")
response = flow.document_rag(
    query="Summarize the key findings",
    user="trustgraph",
    collection="research-papers",
    doc_limit=5
)
print(response)

embeddings(self, texts)

为一个或多个文本生成向量嵌入。

将文本转换为适合语义 搜索和相似度比较的密集向量表示。

参数:

texts: 要嵌入的输入文本列表

返回值: list[list[list[float]]]: 向量嵌入,每个输入文本对应一个集合

示例:

flow = api.flow().id("default")
vectors = flow.embeddings(["quantum computing"])
print(f"Embedding dimension: {len(vectors[0][0])}")

generate_descriptor(self, sample, data_type, schema_name, options=None)

生成用于将结构化数据映射到特定模式的描述符。

参数:

sample: 用于分析的数据样本(字符串内容) data_type: 数据类型csv, json, xml schema_name: 用于生成描述符的目标模式名称 options: 可选参数例如CSV的分隔符

返回值: 包含描述符和元数据的字典

graph_embeddings_query(self, text, user, collection, limit=10)

使用语义相似性查询知识图谱实体。

查找知识图谱中描述与输入文本在语义上 相似的实体,使用向量嵌入。

参数:

text: 用于语义搜索的查询文本 user: 用户/键空间标识符 collection: 集合标识符 limit: 最大结果数量(默认为 10

返回值: 字典:包含相似实体的查询结果

示例:

flow = api.flow().id("default")
results = flow.graph_embeddings_query(
    text="physicist who discovered radioactivity",
    user="trustgraph",
    collection="scientists",
    limit=5
)
# results contains {"entities": [{"entity": {...}, "score": 0.95}, ...]}

graph_rag(self, query, user='trustgraph', collection='default', entity_limit=50, triple_limit=30, max_subgraph_size=150, max_path_length=2)

执行基于图的检索增强生成 (RAG) 查询。

图 RAG 使用知识图谱结构来通过 遍历实体关系来查找相关上下文,然后使用 LLM 生成响应。

参数:

query: 自然语言查询 user: 用户/键空间标识符 (默认为: "trustgraph") collection: 集合标识符 (默认为: "default") entity_limit: 检索到的最大实体数量 (默认为: 50) triple_limit: 每个实体的最大三元组数量 (默认为: 30) max_subgraph_size: 子图中最大总三元组数量 (默认为: 150) max_path_length: 最大遍历深度 (默认为: 2)

返回值: str: 包含图上下文的生成响应

示例:

flow = api.flow().id("default")
response = flow.graph_rag(
    query="Tell me about Marie Curie's discoveries",
    user="trustgraph",
    collection="scientists",
    entity_limit=20,
    max_path_length=3
)
print(response)

load_document(self, document, id=None, metadata=None, user=None, collection=None)

加载一个二进制文档以进行处理。

上传一个文档PDF、DOCX、图像等以便从流程的文档管道中提取和 处理。

参数:

document: 文档内容,以字节形式表示 id: 可选的文档标识符(如果为 None则自动生成 metadata: 可选的元数据(三元组列表或具有 emit 方法的对象) user: 用户/键空间标识符(可选) collection: 集合标识符(可选)

返回值: dict: 处理响应

引发:

RuntimeError: 如果提供了元数据,但没有提供 id

示例:

flow = api.flow().id("default")

# Load a PDF document
with open("research.pdf", "rb") as f:
    result = flow.load_document(
        document=f.read(),
        id="research-001",
        user="trustgraph",
        collection="papers"
    )

load_text(self, text, id=None, metadata=None, charset='utf-8', user=None, collection=None)

加载用于处理的文本内容。

上传文本内容,以便通过流程的 文本管道进行提取和处理。

参数:

text: 文本内容,以字节形式 id: 可选的文档标识符(如果为 None则自动生成 metadata: 可选的元数据(三元组列表或具有 emit 方法的对象) charset: 字符编码(默认值:"utf-8" user: 用户/键空间标识符(可选) collection: 集合标识符(可选)

返回值: dict: 处理响应

异常:

RuntimeError: 如果提供了元数据,但没有提供 id

示例:

flow = api.flow().id("default")

# Load text content
text_content = b"This is the document content..."
result = flow.load_text(
    text=text_content,
    id="text-001",
    charset="utf-8",
    user="trustgraph",
    collection="documents"
)

mcp_tool(self, name, parameters={})

执行一个模型上下文协议 (MCP) 工具。

MCP 工具提供可扩展的功能,用于代理和工作流程, 允许与外部系统和服务集成。

参数:

name: 工具名称/标识符 parameters: 工具参数字典 (默认: {})

返回值: str 或 dict: 工具执行结果

异常:

ProtocolException: 如果响应格式无效

示例:

flow = api.flow().id("default")

# Execute a tool
result = flow.mcp_tool(
    name="search-web",
    parameters={"query": "latest AI news", "limit": 5}
)

nlp_query(self, question, max_results=100)

将自然语言问题转换为 GraphQL 查询。

参数:

question: 自然语言问题 max_results: 返回的最大结果数量默认为100

返回值: 包含 graphql_query、variables、detected_schemas 和 confidence 的字典。

prompt(self, id, variables)

执行带有变量替换的提示模板。

提示模板允许使用可重用的提示模式,并带有动态变量。 这种方法对于一致的提示工程非常有用。

参数:

id: 提示模板标识符 variables: 变量名称到值的映射字典

返回值: str 或 dict: 渲染后的提示结果(文本或结构化对象)

引发:

ProtocolException: 如果响应格式无效

示例:

flow = api.flow().id("default")

# Text template
result = flow.prompt(
    id="summarize-template",
    variables={"topic": "quantum computing", "length": "brief"}
)

# Structured template
result = flow.prompt(
    id="extract-entities",
    variables={"text": "Marie Curie won Nobel Prizes"}
)

request(self, path, request)

在此流程实例中发起服务请求。

参数:

path: 服务路径(例如,"service/text-completion" request: 请求负载字典

返回值: dict: 服务响应

row_embeddings_query(self, text, schema_name, user='trustgraph', collection='default', index_name=None, limit=10)

使用语义相似性在索引字段上查询行数据。

查找索引字段值在语义上与输入文本相似的行,使用向量嵌入。这实现了对结构化数据的模糊/语义匹配。

参数:

text: 用于语义搜索的查询文本。 schema_name: 要搜索的模式名称。 user: 用户/键空间标识符(默认为:"trustgraph")。 collection: 集合标识符(默认为:"default")。 index_name: 可选的索引名称,用于将搜索限制到特定的索引。 limit: 最大结果数量默认为10

返回值: dict: 查询结果,包含 index_name、index_value、text 和 score。

示例:

flow = api.flow().id("default")

# Search for customers by name similarity
results = flow.row_embeddings_query(
    text="John Smith",
    schema_name="customers",
    user="trustgraph",
    collection="sales",
    limit=5
)

# Filter to specific index
results = flow.row_embeddings_query(
    text="machine learning engineer",
    schema_name="employees",
    index_name="job_title",
    limit=10
)

rows_query(self, query, user='trustgraph', collection='default', variables=None, operation_name=None)

对知识图谱中的结构化行执行 GraphQL 查询。

使用 GraphQL 语法查询结构化数据,允许进行复杂的查询, 包括过滤、聚合和关系遍历。

参数:

query: GraphQL 查询字符串 user: 用户/键空间标识符(默认为:"trustgraph" collection: 集合标识符(默认为:"default" variables: 可选的查询变量字典 operation_name: 多操作文档的可选操作名称

返回值: dict: 包含 'data'、'errors' 和/或 'extensions' 字段的 GraphQL 响应

引发:

ProtocolException: 如果发生系统级错误

示例:

flow = api.flow().id("default")

# Simple query
query = '''
{
  scientists(limit: 10) {
    name
    field
    discoveries
  }
}
'''
result = flow.rows_query(
    query=query,
    user="trustgraph",
    collection="scientists"
)

# Query with variables
query = '''
query GetScientist($name: String!) {
  scientists(name: $name) {
    name
    nobelPrizes
  }
}
'''
result = flow.rows_query(
    query=query,
    variables={"name": "Marie Curie"}
)

schema_selection(self, sample, options=None)

使用提示分析,为数据样本选择匹配的模式。

参数:

sample: 要分析的数据样本(字符串内容) options: 可选参数

返回值: 包含 schema_matches 数组和元数据的字典。

structured_query(self, question, user='trustgraph', collection='default')

对结构化数据执行自然语言问题。 结合 NLP 查询转换和 GraphQL 执行。

参数:

question: 自然语言问题 user: Cassandra keyspace 标识符(默认为:"trustgraph" collection: 数据集合标识符(默认为:"default"

返回值: 包含数据和可选错误的字典。

text_completion(self, system, prompt)

使用流程的 LLM 执行文本补全。

参数:

system: 定义助手行为的系统提示。 prompt: 用户提示/问题

返回值: str: 生成的响应文本

示例:

flow = api.flow().id("default")
response = flow.text_completion(
    system="You are a helpful assistant",
    prompt="What is quantum computing?"
)
print(response)

triples_query(self, s=None, p=None, o=None, user=None, collection=None, limit=10000)

使用模式匹配查询知识图谱三元组。

搜索与给定的主语、谓语和/或 对象模式匹配的 RDF 三元组。 未指定的参数用作通配符。

参数:

s: 主语 URI (可选,使用 None 表示通配符) p: 谓语 URI (可选,使用 None 表示通配符) o: 对象 URI 或字面量 (可选,使用 None 表示通配符) user: 用户/键空间标识符 (可选) collection: 集合标识符 (可选) limit: 要返回的最大结果数 (默认10000)

返回值: list[Triple]: 匹配的三元组对象的列表

引发:

RuntimeError: 如果 s 或 p 不是 Uri或者 o 不是 Uri/Literal

示例:

from trustgraph.knowledge import Uri, Literal

flow = api.flow().id("default")

# Find all triples about a specific subject
triples = flow.triples_query(
    s=Uri("http://example.org/person/marie-curie"),
    user="trustgraph",
    collection="scientists"
)

# Find all instances of a specific relationship
triples = flow.triples_query(
    p=Uri("http://example.org/ontology/discovered"),
    limit=100
)

--

AsyncFlow

from trustgraph.api import AsyncFlow

使用 REST API 的异步流程管理客户端。

提供基于 async/await 的流程管理操作,包括列出、 启动、停止流程,以及管理流程类定义。 此外还提供对流程范围内的服务如代理、RAG 和查询)的访问,这些服务通过非流式 REST 接口提供。

注意:对于流式支持,请使用 AsyncSocketClient。

方法

__init__(self, url: str, timeout: int, token: str | None) -> None

初始化异步流程客户端。

参数:

urlTrustGraph API 的基本 URL timeout:请求超时时间(以秒为单位) token:可选的用于身份验证的 bearer token

aclose(self) -> None

关闭异步客户端并清理资源。

注意:清理由 aiohttp 会话上下文管理器自动处理。 此方法是为了与其他异步客户端保持一致而提供的。

delete_class(self, class_name: str)

删除一个流程类定义。

从系统中移除一个流程类蓝图。 不会影响 正在运行的流程实例。

参数:

class_name:要删除的流程类名称

示例:

async_flow = await api.async_flow()

# Delete a flow class
await async_flow.delete_class("old-flow-class")

get(self, id: str) -> Dict[str, Any]

获取流程定义。

获取完整的流程配置,包括其类名、 描述和参数。

参数:

id: 流程标识符

返回值: dict: 流程定义对象

示例:

async_flow = await api.async_flow()

# Get flow definition
flow_def = await async_flow.get("default")
print(f"Flow class: {flow_def.get('class-name')}")
print(f"Description: {flow_def.get('description')}")

get_class(self, class_name: str) -> Dict[str, Any]

获取流程类定义。

获取流程类的蓝图定义,包括其配置模式和服务绑定。

参数:

class_name: 流程类名称

返回值: dict: 流程类定义对象

示例:

async_flow = await api.async_flow()

# Get flow class definition
class_def = await async_flow.get_class("default")
print(f"Services: {class_def.get('services')}")

id(self, flow_id: str)

获取一个异步流程实例客户端。

返回一个客户端用于与特定流程的服务进行交互代理、RAG、查询、嵌入等

参数:

flow_id: 流程标识符

返回值: AsyncFlowInstance: 用于特定流程操作的客户端

示例:

async_flow = await api.async_flow()

# Get flow instance
flow = async_flow.id("default")

# Use flow services
result = await flow.graph_rag(
    query="What is TrustGraph?",
    user="trustgraph",
    collection="default"
)

list(self) -> List[str]

列出所有流程标识符。

获取系统中当前部署的所有流程的ID。

返回值: list[str]: 流程标识符列表

示例:

async_flow = await api.async_flow()

# List all flows
flows = await async_flow.list()
print(f"Available flows: {flows}")

list_classes(self) -> List[str]

列出所有流程类名称。

获取系统中所有可用流程类(蓝图)的名称。

返回值: list[str]: 流程类名称列表

示例:

async_flow = await api.async_flow()

# List available flow classes
classes = await async_flow.list_classes()
print(f"Available flow classes: {classes}")

put_class(self, class_name: str, definition: Dict[str, Any])

创建或更新一个流程类定义。

存储一个流程类蓝图,该蓝图可用于实例化流程。

参数:

class_name: 流程类名称 definition: 流程类定义对象

示例:

async_flow = await api.async_flow()

# Create a custom flow class
class_def = {
    "services": {
        "agent": {"module": "agent", "config": {...}},
        "graph-rag": {"module": "graph-rag", "config": {...}}
    }
}
await async_flow.put_class("custom-flow", class_def)

request(self, path: str, request_data: Dict[str, Any]) -> Dict[str, Any]

异步向网关API发送HTTP POST请求。

用于向TrustGraph API发送经过身份验证的请求的内部方法。

参数:

path: API端点路径相对于基本URL request_data: 请求负载字典

返回值: dict: 来自API的响应对象

引发:

ProtocolException:如果 HTTP 状态码不是 200 或响应不是有效的 JSON ApplicationException:如果 API 返回错误响应

start(self, class_name: str, id: str, description: str, parameters: Dict | None = None)

启动一个新的流程实例。

从指定的流程类定义创建并启动一个流程。

参数:

class_name:用于创建实例的流程类名称 id:新流程实例的标识符 description:流程的易于理解的描述 parameters:流程的可选配置参数

示例:

async_flow = await api.async_flow()

# Start a flow from a class
await async_flow.start(
    class_name="default",
    id="my-flow",
    description="Custom flow instance",
    parameters={"model": "claude-3-opus"}
)

stop(self, id: str)

停止正在运行的流程。

停止并移除一个流程实例,释放其资源。

参数:

id: 要停止的流程标识符

示例:

async_flow = await api.async_flow()

# Stop a flow
await async_flow.stop("my-flow")

--

AsyncFlowInstance

from trustgraph.api import AsyncFlowInstance

异步流程实例客户端。

提供对流程范围内的服务的异步/等待访问,包括代理、 RAG 查询、嵌入和图查询。所有操作返回完整的 响应(非流式传输)。

注意:对于流式传输支持,请使用 AsyncSocketFlowInstance。

方法

__init__(self, flow: trustgraph.api.async_flow.AsyncFlow, flow_id: str)

初始化异步流程实例。

参数:

flow: 父级 AsyncFlow 客户端 flow_id: 流程标识符

agent(self, question: str, user: str, state: Dict | None = None, group: str | None = None, history: List | None = None, **kwargs: Any) -> Dict[str, Any]

执行代理操作(非流式传输)。

运行代理以回答问题,可以选择使用对话状态和 历史记录。在代理完成处理后,返回完整的响应。

注意:此方法不支持流式传输。对于代理的实时想法和 观察结果,请使用 AsyncSocketFlowInstance.agent()。

参数:

question: 用户问题或指令 user: 用户标识符 state: 可选的状态字典,用于对话上下文 group: 可选的组标识符,用于会话管理 history: 可选的对话历史记录列表 **kwargs: 额外的特定于服务的参数

返回值: dict: 完整的代理响应,包括答案和元数据

示例:

async_flow = await api.async_flow()
flow = async_flow.id("default")

# Execute agent
result = await flow.agent(
    question="What is the capital of France?",
    user="trustgraph"
)
print(f"Answer: {result.get('response')}")

document_rag(self, query: str, user: str, collection: str, doc_limit: int = 10, **kwargs: Any) -> str

执行基于文档的 RAG 查询(非流式)。

使用文档嵌入进行检索增强生成。 通过语义搜索检索相关的文档片段,然后根据检索到的文档生成 一个完整的响应。返回完整的响应。

注意:此方法不支持流式传输。对于流式 RAG 响应, 请使用 AsyncSocketFlowInstance.document_rag()。

参数:

query: 用户查询文本 user: 用户标识符 collection: 包含文档的集合标识符 doc_limit: 要检索的最大文档片段数量(默认为 10 **kwargs: 额外的特定于服务的参数

返回值: str: 基于文档数据的完整生成的响应

示例:

async_flow = await api.async_flow()
flow = async_flow.id("default")

# Query documents
response = await flow.document_rag(
    query="What does the documentation say about authentication?",
    user="trustgraph",
    collection="docs",
    doc_limit=5
)
print(response)

embeddings(self, texts: list, **kwargs: Any)

为输入文本生成嵌入向量。

将文本转换为数值向量表示,使用流程配置的嵌入模型。适用于语义搜索和相似度 比较。

参数:

texts: 要嵌入的输入文本列表 **kwargs: 额外的特定于服务的参数

返回值: dict: 包含嵌入向量的响应

示例:

async_flow = await api.async_flow()
flow = async_flow.id("default")

# Generate embeddings
result = await flow.embeddings(texts=["Sample text to embed"])
vectors = result.get("vectors")
print(f"Embedding dimension: {len(vectors[0][0])}")

graph_embeddings_query(self, text: str, user: str, collection: str, limit: int = 10, **kwargs: Any)

用于语义实体搜索的图嵌入查询。

对图实体嵌入执行语义搜索,以查找与输入文本最相关的实体。返回按相似度排序的实体。

参数:

text: 用于语义搜索的查询文本 user: 用户标识符 collection: 包含图嵌入的集合标识符 limit: 要返回的最大结果数默认为10 **kwargs: 额外的特定于服务的参数

返回值: dict: 包含按相似度排序的实体匹配结果,以及相似度分数。

示例:

async_flow = await api.async_flow()
flow = async_flow.id("default")

# Find related entities
results = await flow.graph_embeddings_query(
    text="machine learning algorithms",
    user="trustgraph",
    collection="tech-kb",
    limit=5
)

for entity in results.get("entities", []):
    print(f"{entity['name']}: {entity['score']}")

graph_rag(self, query: str, user: str, collection: str, max_subgraph_size: int = 1000, max_subgraph_count: int = 5, max_entity_distance: int = 3, **kwargs: Any) -> str

执行基于图的 RAG 查询(非流式)。

使用知识图谱数据执行检索增强生成。 识别相关的实体及其关系,然后生成一个 基于图结构的回应。返回完整的响应。

注意:此方法不支持流式传输。对于流式 RAG 响应, 请使用 AsyncSocketFlowInstance.graph_rag()。

参数:

query: 用户查询文本 user: 用户标识符 collection: 包含知识图谱的集合标识符 max_subgraph_size: 每个子图中最大三元组数默认1000 max_subgraph_count: 要检索的最大子图数量默认5 max_entity_distance: 实体扩展的最大图距离默认3 **kwargs: 额外的特定于服务的参数

返回值: str: 基于图数据生成的完整响应

示例:

async_flow = await api.async_flow()
flow = async_flow.id("default")

# Query knowledge graph
response = await flow.graph_rag(
    query="What are the relationships between these entities?",
    user="trustgraph",
    collection="medical-kb",
    max_subgraph_count=3
)
print(response)

request(self, service: str, request_data: Dict[str, Any]) -> Dict[str, Any]

向作用域为流程的服务发送请求。

用于在当前流程实例中调用服务的内部方法。

参数:

service: 服务名称(例如,"agent", "graph-rag", "triples" request_data: 服务请求负载

返回值: dict: 服务响应对象

抛出异常:

ProtocolException: 如果请求失败或响应无效 ApplicationException: 如果服务返回错误

row_embeddings_query(self, text: str, schema_name: str, user: str = 'trustgraph', collection: str = 'default', index_name: str | None = None, limit: int = 10, **kwargs: Any)

查询用于结构化数据语义搜索的行嵌入。

在行索引嵌入上执行语义搜索,以查找其索引字段值与输入文本最相似的行。 允许 对结构化数据进行模糊/语义匹配。

参数:

text: 用于语义搜索的查询文本。 schema_name: 要搜索的模式名称。 user: 用户标识符(默认值:"trustgraph")。 collection: 集合标识符(默认值:"default")。 index_name: 可选的索引名称,用于将搜索限制到特定的索引。 limit: 要返回的最大结果数量默认值10**kwargs: 额外的特定于服务的参数。

返回值: dict: 包含匹配项的响应,包含 index_name、index_value、text 和 score。

示例:

async_flow = await api.async_flow()
flow = async_flow.id("default")

# Search for customers by name similarity
results = await flow.row_embeddings_query(
    text="John Smith",
    schema_name="customers",
    user="trustgraph",
    collection="sales",
    limit=5
)

for match in results.get("matches", []):
    print(f"{match['index_name']}: {match['index_value']} (score: {match['score']})")

rows_query(self, query: str, user: str, collection: str, variables: Dict | None = None, operation_name: str | None = None, **kwargs: Any)

在存储的行上执行 GraphQL 查询。

使用 GraphQL 语法查询结构化数据行。支持使用变量和命名操作的复杂查询。

参数:

query: GraphQL 查询字符串 user: 用户标识符 collection: 包含行的集合标识符 variables: 可选的 GraphQL 查询变量 operation_name: 用于多操作查询的可选操作名称 **kwargs: 额外的特定于服务的参数

返回值: dict: 包含数据和/或错误的 GraphQL 响应

示例:

async_flow = await api.async_flow()
flow = async_flow.id("default")

# Execute GraphQL query
query = '''
    query GetUsers($status: String!) {
        users(status: $status) {
            id
            name
            email
        }
    }
'''

result = await flow.rows_query(
    query=query,
    user="trustgraph",
    collection="users",
    variables={"status": "active"}
)

for user in result.get("data", {}).get("users", []):
    print(f"{user['name']}: {user['email']}")

text_completion(self, system: str, prompt: str, **kwargs: Any) -> str

生成文本补全(非流式)。

根据系统提示和用户提示,从大型语言模型生成文本回复。 返回完整的回复文本。

注意:此方法不支持流式传输。对于流式文本生成, 请使用 AsyncSocketFlowInstance.text_completion()。

参数:

system: 定义大型语言模型行为的系统提示。 prompt: 用户提示或问题。 **kwargs: 额外的特定于服务的参数。

返回值: str: 完整的生成文本回复。

示例:

async_flow = await api.async_flow()
flow = async_flow.id("default")

# Generate text
response = await flow.text_completion(
    system="You are a helpful assistant.",
    prompt="Explain quantum computing in simple terms."
)
print(response)

triples_query(self, s=None, p=None, o=None, user=None, collection=None, limit=100, **kwargs: Any)

使用模式匹配查询 RDF 三元组。

搜索与指定的 subject主体、predicate谓词和/或 object对象模式匹配的三元组。 模式使用 None 作为通配符来匹配任何值。

参数:

s: Subject主体模式None 表示通配符) p: Predicate谓词模式None 表示通配符) o: Object对象模式None 表示通配符) user: 用户标识符None 表示所有用户) collection: Collection集合标识符None 表示所有集合) limit: 返回的三元组的最大数量默认100 **kwargs: 额外的特定于服务的参数

返回值: dict: 包含匹配的三元组的响应

示例:

async_flow = await api.async_flow()
flow = async_flow.id("default")

# Find all triples with a specific predicate
results = await flow.triples_query(
    p="knows",
    user="trustgraph",
    collection="social",
    limit=50
)

for triple in results.get("triples", []):
    print(f"{triple['s']} knows {triple['o']}")

--

SocketClient

from trustgraph.api import SocketClient

同步 WebSocket 客户端,用于流式操作。

提供了对基于 WebSocket 的 TrustGraph 服务的同步接口, 通过使用同步生成器包装异步 WebSocket 库,以提高易用性。 支持从代理接收流式响应、RAG 查询和文本补全。

注意:这是一个围绕异步 WebSocket 操作的同步包装器。对于 真正的异步支持,请使用 AsyncSocketClient。

方法

__init__(self, url: str, timeout: int, token: str | None) -> None

初始化同步 WebSocket 客户端。

参数:

urlTrustGraph API 的基本 URLHTTP/HTTPS 将转换为 WS/WSS timeoutWebSocket 超时时间(以秒为单位) token:可选的用于身份验证的 bearer token

close(self) -> None

关闭 WebSocket 连接。

注意:在异步代码中,清理由上下文管理器自动处理。

flow(self, flow_id: str) -> 'SocketFlowInstance'

获取用于 WebSocket 流式操作的 flow 实例。

参数:

flow_idFlow 标识符

返回值: SocketFlowInstance具有流式方法的 Flow 实例

示例:

socket = api.socket()
flow = socket.flow("default")

# Stream agent responses
for chunk in flow.agent(question="Hello", user="trustgraph", streaming=True):
    print(chunk.content, end='', flush=True)

--

SocketFlowInstance

from trustgraph.api import SocketFlowInstance

同步 WebSocket 流程实例,用于流式操作。

它提供与 REST FlowInstance 相同的接口,但具有基于 WebSocket 的 流式支持,用于实时响应。 所有方法都支持一个可选的 streaming 参数,以启用增量结果交付。

方法

__init__(self, client: trustgraph.api.socket_client.SocketClient, flow_id: str) -> None

初始化 socket 流程实例。

参数:

client: 父 SocketClient flow_id: 流程标识符

agent(self, question: str, user: str, state: Dict[str, Any] | None = None, group: str | None = None, history: List[Dict[str, Any]] | None = None, streaming: bool = False, **kwargs: Any) -> Dict[str, Any] | Iterator[trustgraph.api.types.StreamingChunk]

执行一个支持流式传输的代理操作。

代理可以执行涉及工具使用的多步骤推理。 即使当 streaming=False 时,此方法也会始终 返回流式传输的分块(想法、观察结果、答案),以显示代理的推理过程。

参数:

question: 用户问题或指令 user: 用户标识符 state: 可选的状态字典,用于具有状态的对话 group: 可选的组标识符,用于多用户环境 history: 可选的对话历史记录,以消息字典列表的形式 streaming: 启用流式传输模式默认False **kwargs: 传递给代理服务的其他参数

返回值: Iterator[StreamingChunk]: 代理的想法、观察结果和答案的流

示例:

socket = api.socket()
flow = socket.flow("default")

# Stream agent reasoning
for chunk in flow.agent(
    question="What is quantum computing?",
    user="trustgraph",
    streaming=True
):
    if isinstance(chunk, AgentThought):
        print(f"[Thinking] {chunk.content}")
    elif isinstance(chunk, AgentObservation):
        print(f"[Observation] {chunk.content}")
    elif isinstance(chunk, AgentAnswer):
        print(f"[Answer] {chunk.content}")

agent_explain(self, question: str, user: str, collection: str, state: Dict[str, Any] | None = None, group: str | None = None, history: List[Dict[str, Any]] | None = None, **kwargs: Any) -> Iterator[trustgraph.api.types.StreamingChunk | trustgraph.api.types.ProvenanceEvent]

执行具有可解释性支持的代理操作。

同时流式传输内容块AgentThought、AgentObservation、AgentAnswer 和溯源事件ProvenanceEvent。溯源事件包含 URI 可以使用 ExplainabilityClient 获取有关代理推理过程的详细信息。

代理跟踪信息包括: Session初始问题和会话元数据 Iterations每个思考/行动/观察周期 Conclusion最终答案

参数:

question:用户问题或指令 user:用户标识符 collection:用于存储溯源信息的集合标识符 state:可选的状态字典,用于有状态的对话 group:可选的组标识符,用于多用户上下文 history:可选的对话历史记录,以消息字典列表的形式 **kwargs:传递给代理服务的其他参数 Yields Union[StreamingChunk, ProvenanceEvent]:代理块和溯源事件

示例:

from trustgraph.api import Api, ExplainabilityClient, ProvenanceEvent
from trustgraph.api import AgentThought, AgentObservation, AgentAnswer

socket = api.socket()
flow = socket.flow("default")
explain_client = ExplainabilityClient(flow)

provenance_ids = []
for item in flow.agent_explain(
    question="What is the capital of France?",
    user="trustgraph",
    collection="default"
):
    if isinstance(item, AgentThought):
        print(f"[Thought] {item.content}")
    elif isinstance(item, AgentObservation):
        print(f"[Observation] {item.content}")
    elif isinstance(item, AgentAnswer):
        print(f"[Answer] {item.content}")
    elif isinstance(item, ProvenanceEvent):
        provenance_ids.append(item.explain_id)

# Fetch session trace after completion
if provenance_ids:
    trace = explain_client.fetch_agent_trace(
        provenance_ids[0],  # Session URI is first
        graph="urn:graph:retrieval",
        user="trustgraph",
        collection="default"
    )

document_embeddings_query(self, text: str, user: str, collection: str, limit: int = 10, **kwargs: Any) -> Dict[str, Any]

使用语义相似度查询文档块。

参数:

text: 用于语义搜索的查询文本 user: 用户/键空间标识符 collection: 集合标识符 limit: 最大结果数量默认为10 **kwargs: 传递给服务的其他参数

返回值: dict: 包含匹配文档块的 chunk_ids 的查询结果

示例:

socket = api.socket()
flow = socket.flow("default")

results = flow.document_embeddings_query(
    text="machine learning algorithms",
    user="trustgraph",
    collection="research-papers",
    limit=5
)
# results contains {"chunks": [{"chunk_id": "...", "score": 0.95}, ...]}

document_rag(self, query: str, user: str, collection: str, doc_limit: int = 10, streaming: bool = False, **kwargs: Any) -> str | Iterator[str]

执行基于文档的 RAG 查询,并可选择启用流式传输。

使用向量嵌入来查找相关的文档片段,然后使用 LLM 生成 响应。流式模式逐步提供结果。

参数:

query: 自然语言查询 user: 用户/键空间标识符 collection: 集合标识符 doc_limit: 要检索的最大文档片段数默认为10 streaming: 启用流式模式默认为False **kwargs: 传递给服务的其他参数

返回值: Union[str, Iterator[str]]: 完整的响应或文本片段的流

示例:

socket = api.socket()
flow = socket.flow("default")

# Streaming document RAG
for chunk in flow.document_rag(
    query="Summarize the key findings",
    user="trustgraph",
    collection="research-papers",
    doc_limit=5,
    streaming=True
):
    print(chunk, end='', flush=True)

document_rag_explain(self, query: str, user: str, collection: str, doc_limit: int = 10, **kwargs: Any) -> Iterator[trustgraph.api.types.RAGChunk | trustgraph.api.types.ProvenanceEvent]

执行支持可解释性的基于文档的 RAG 查询。

同时流式传输内容块RAGChunk和溯源事件ProvenanceEvent。 溯源事件包含 URI可以使用 ExplainabilityClient 获取详细信息,了解响应的生成方式。

文档 RAG 跟踪信息包括: 问题:用户的查询 探索从文档存储中检索到的块chunk_count 综合:生成的答案

参数:

query:自然语言查询 user:用户/键空间标识符 collection:集合标识符 doc_limit要检索的最大文档块数默认10 **kwargs:传递给服务的其他参数 Yields Union[RAGChunk, ProvenanceEvent]:内容块和溯源事件

示例:

from trustgraph.api import Api, ExplainabilityClient, RAGChunk, ProvenanceEvent

socket = api.socket()
flow = socket.flow("default")
explain_client = ExplainabilityClient(flow)

for item in flow.document_rag_explain(
    query="Summarize the key findings",
    user="trustgraph",
    collection="research-papers",
    doc_limit=5
):
    if isinstance(item, RAGChunk):
        print(item.content, end='', flush=True)
    elif isinstance(item, ProvenanceEvent):
        # Fetch entity details
        entity = explain_client.fetch_entity(
            item.explain_id,
            graph=item.explain_graph,
            user="trustgraph",
            collection="research-papers"
        )
        print(f"Event: {entity}", file=sys.stderr)

embeddings(self, texts: list, **kwargs: Any) -> Dict[str, Any]

为一个或多个文本生成向量嵌入。

参数:

texts: 要进行嵌入的输入文本列表。 **kwargs: 传递给服务的其他参数。

返回值: dict: 包含向量的响应(每个输入文本对应一个向量集合)。

示例:

socket = api.socket()
flow = socket.flow("default")

result = flow.embeddings(["quantum computing"])
vectors = result.get("vectors", [])

graph_embeddings_query(self, text: str, user: str, collection: str, limit: int = 10, **kwargs: Any) -> Dict[str, Any]

使用语义相似度查询知识图谱实体。

参数:

text: 用于语义搜索的查询文本 user: 用户/命名空间标识符 collection: 集合标识符 limit: 最大结果数量默认为10 **kwargs: 传递给服务的附加参数

返回值: dict: 包含相似实体的查询结果

示例:

socket = api.socket()
flow = socket.flow("default")

results = flow.graph_embeddings_query(
    text="physicist who discovered radioactivity",
    user="trustgraph",
    collection="scientists",
    limit=5
)

graph_rag(self, query: str, user: str, collection: str, max_subgraph_size: int = 1000, max_subgraph_count: int = 5, max_entity_distance: int = 3, streaming: bool = False, **kwargs: Any) -> str | Iterator[str]

执行基于图的关系检索增强生成RAG查询并可选择启用流式传输。

使用知识图谱结构来查找相关上下文然后使用大型语言模型LLM生成 响应。流式模式逐步提供结果。

参数:

query:自然语言查询 user:用户/键空间标识符 collection:集合标识符 max_subgraph_size子图中最大三元组数量默认为1000 max_subgraph_count最大子图数量默认为5 max_entity_distance最大遍历深度默认为3 streaming启用流式模式默认为False **kwargs:传递给服务的其他参数

返回值: Union[str, Iterator[str]]:完整的响应或文本块的流

示例:

socket = api.socket()
flow = socket.flow("default")

# Streaming graph RAG
for chunk in flow.graph_rag(
    query="Tell me about Marie Curie",
    user="trustgraph",
    collection="scientists",
    streaming=True
):
    print(chunk, end='', flush=True)

graph_rag_explain(self, query: str, user: str, collection: str, max_subgraph_size: int = 1000, max_subgraph_count: int = 5, max_entity_distance: int = 3, **kwargs: Any) -> Iterator[trustgraph.api.types.RAGChunk | trustgraph.api.types.ProvenanceEvent]

执行具有可解释性支持的基于图的 RAG 查询。

同时流式传输内容块RAGChunk和溯源事件ProvenanceEvent。 溯源事件包含 URI可以使用 ExplainabilityClient 获取详细信息,了解响应的生成方式。

参数:

query: 自然语言查询 user: 用户/键空间标识符 collection: 集合标识符 max_subgraph_size: 子图中最大三元组数量默认1000 max_subgraph_count: 最大子图数量默认5 max_entity_distance: 最大遍历深度默认3 **kwargs: 传递给服务的其他参数 Yields: Union[RAGChunk, ProvenanceEvent]: 内容块和溯源事件

示例:

from trustgraph.api import Api, ExplainabilityClient, RAGChunk, ProvenanceEvent

socket = api.socket()
flow = socket.flow("default")
explain_client = ExplainabilityClient(flow)

provenance_ids = []
response_text = ""

for item in flow.graph_rag_explain(
    query="Tell me about Marie Curie",
    user="trustgraph",
    collection="scientists"
):
    if isinstance(item, RAGChunk):
        response_text += item.content
        print(item.content, end='', flush=True)
    elif isinstance(item, ProvenanceEvent):
        provenance_ids.append(item.provenance_id)

# Fetch explainability details
for prov_id in provenance_ids:
    entity = explain_client.fetch_entity(
        prov_id,
        graph="urn:graph:retrieval",
        user="trustgraph",
        collection="scientists"
    )
    print(f"Entity: {entity}")

mcp_tool(self, name: str, parameters: Dict[str, Any], **kwargs: Any) -> Dict[str, Any]

执行模型上下文协议 (MCP) 工具。

参数:

name: 工具名称/标识符 parameters: 工具参数字典 **kwargs: 传递给服务的附加参数

返回值: dict: 工具执行结果

示例:

socket = api.socket()
flow = socket.flow("default")

result = flow.mcp_tool(
    name="search-web",
    parameters={"query": "latest AI news", "limit": 5}
)

prompt(self, id: str, variables: Dict[str, str], streaming: bool = False, **kwargs: Any) -> str | Iterator[str]

执行一个带有可选流式的提示模板。

参数:

id: 提示模板标识符 variables: 变量名到值的映射字典 streaming: 启用流式模式 (默认: False) **kwargs: 传递给服务的附加参数

返回值: Union[str, Iterator[str]]: 完整的响应或文本块的流

示例:

socket = api.socket()
flow = socket.flow("default")

# Streaming prompt execution
for chunk in flow.prompt(
    id="summarize-template",
    variables={"topic": "quantum computing", "length": "brief"},
    streaming=True
):
    print(chunk, end='', flush=True)

row_embeddings_query(self, text: str, schema_name: str, user: str = 'trustgraph', collection: str = 'default', index_name: str | None = None, limit: int = 10, **kwargs: Any) -> Dict[str, Any]

使用语义相似性在索引字段上查询行数据。

查找其索引字段值在语义上与 输入文本相似的行,使用向量嵌入。这实现了对结构化数据的模糊/语义匹配。

参数:

text: 用于语义搜索的查询文本 schema_name: 要搜索的模式名称 user: 用户/键空间标识符(默认为:"trustgraph" collection: 集合标识符(默认为:"default" index_name: 可选的索引名称,用于将搜索限制到特定索引 limit: 最大结果数量默认为10 **kwargs: 传递给服务的其他参数

返回值: dict: 查询结果,包含匹配项,其中包含 index_name、index_value、text 和 score

示例:

socket = api.socket()
flow = socket.flow("default")

# Search for customers by name similarity
results = flow.row_embeddings_query(
    text="John Smith",
    schema_name="customers",
    user="trustgraph",
    collection="sales",
    limit=5
)

# Filter to specific index
results = flow.row_embeddings_query(
    text="machine learning engineer",
    schema_name="employees",
    index_name="job_title",
    limit=10
)

rows_query(self, query: str, user: str, collection: str, variables: Dict[str, Any] | None = None, operation_name: str | None = None, **kwargs: Any) -> Dict[str, Any]

执行针对结构化行的 GraphQL 查询。

参数:

query: GraphQL 查询字符串 user: 用户/键空间标识符 collection: 集合标识符 variables: 可选的查询变量字典 operation_name: 用于多操作文档的可选操作名称 **kwargs: 传递给服务的附加参数

返回值: dict: 包含数据、错误和/或扩展的 GraphQL 响应

示例:

socket = api.socket()
flow = socket.flow("default")

query = '''
{
  scientists(limit: 10) {
    name
    field
    discoveries
  }
}
'''
result = flow.rows_query(
    query=query,
    user="trustgraph",
    collection="scientists"
)

text_completion(self, system: str, prompt: str, streaming: bool = False, **kwargs) -> str | Iterator[str]

执行带有可选流的文本补全。

参数:

system: 定义助手行为的系统提示。 prompt: 用户提示/问题。 streaming: 启用流模式默认False**kwargs: 传递给服务的附加参数。

返回值: Union[str, Iterator[str]]: 完整的响应或文本块的流。

示例:

socket = api.socket()
flow = socket.flow("default")

# Non-streaming
response = flow.text_completion(
    system="You are helpful",
    prompt="Explain quantum computing",
    streaming=False
)
print(response)

# Streaming
for chunk in flow.text_completion(
    system="You are helpful",
    prompt="Explain quantum computing",
    streaming=True
):
    print(chunk, end='', flush=True)

triples_query(self, s: str | Dict[str, Any] | None = None, p: str | Dict[str, Any] | None = None, o: str | Dict[str, Any] | None = None, g: str | None = None, user: str | None = None, collection: str | None = None, limit: int = 100, **kwargs: Any) -> List[Dict[str, Any]]

使用模式匹配查询知识图谱三元组。

参数:

s: 主体过滤器 - URI 字符串、Term 字典,或 None 表示通配符 p: 谓语过滤器 - URI 字符串、Term 字典,或 None 表示通配符 o: 对象过滤器 - URI/字面量字符串、Term 字典,或 None 表示通配符 g: 命名图过滤器 - URI 字符串或 None 表示所有图 user: 用户/键空间标识符(可选) collection: 集合标识符(可选) limit: 返回的最大结果数默认100 **kwargs: 传递给服务的其他参数

返回值: List[Dict]: 匹配的三元组列表,以原始格式返回

示例:

socket = api.socket()
flow = socket.flow("default")

# Find all triples about a specific subject
triples = flow.triples_query(
    s="http://example.org/person/marie-curie",
    user="trustgraph",
    collection="scientists"
)

# Query with named graph filter
triples = flow.triples_query(
    s="urn:trustgraph:session:abc123",
    g="urn:graph:retrieval",
    user="trustgraph",
    collection="default"
)

triples_query_stream(self, s: str | Dict[str, Any] | None = None, p: str | Dict[str, Any] | None = None, o: str | Dict[str, Any] | None = None, g: str | None = None, user: str | None = None, collection: str | None = None, limit: int = 100, batch_size: int = 20, **kwargs: Any) -> Iterator[List[Dict[str, Any]]]

使用流式批处理查询知识图谱三元组。

随着三元组的到达,它会产生一批批的三元组,从而减少获得第一个结果所需的时间,并降低大型结果集的内存开销。

参数:

s: 主体过滤器 - URI 字符串、Term 字典,或 None 表示通配符 p: 谓语过滤器 - URI 字符串、Term 字典,或 None 表示通配符 o: 对象过滤器 - URI/字面量字符串、Term 字典,或 None 表示通配符 g: 命名图过滤器 - URI 字符串或 None 表示所有图 user: 用户/键空间标识符(可选) collection: 集合标识符(可选) limit: 要返回的最大结果数默认100 batch_size: 每批三元组数默认20 **kwargs: 传递给服务的其他参数 Yields: List[Dict]: 以线格式的三元组批次

示例:

socket = api.socket()
flow = socket.flow("default")

for batch in flow.triples_query_stream(
    user="trustgraph",
    collection="default"
):
    for triple in batch:
        print(triple["s"], triple["p"], triple["o"])

--

AsyncSocketClient

from trustgraph.api import AsyncSocketClient

异步 WebSocket 客户端

方法

__init__(self, url: str, timeout: int, token: str | None)

初始化 self。请参考 help(type(self)) 以获取准确的签名。

aclose(self)

关闭 WebSocket 连接

flow(self, flow_id: str)

获取用于 WebSocket 操作的异步流程实例

--

AsyncSocketFlowInstance

from trustgraph.api import AsyncSocketFlowInstance

异步 WebSocket 流实例

方法

__init__(self, client: trustgraph.api.async_socket_client.AsyncSocketClient, flow_id: str)

初始化 self。请参阅 help(type(self)) 以获取准确的签名。

agent(self, question: str, user: str, state: Dict[str, Any] | None = None, group: str | None = None, history: list | None = None, streaming: bool = False, **kwargs) -> Dict[str, Any] | AsyncIterator

带有可选流的 Agent

document_rag(self, query: str, user: str, collection: str, doc_limit: int = 10, streaming: bool = False, **kwargs)

带有可选流的文档 RAG

embeddings(self, texts: list, **kwargs)

生成文本嵌入

graph_embeddings_query(self, text: str, user: str, collection: str, limit: int = 10, **kwargs)

用于语义搜索的图嵌入查询

graph_rag(self, query: str, user: str, collection: str, max_subgraph_size: int = 1000, max_subgraph_count: int = 5, max_entity_distance: int = 3, streaming: bool = False, **kwargs)

带有可选流的图 RAG

mcp_tool(self, name: str, parameters: Dict[str, Any], **kwargs)

执行 MCP 工具

prompt(self, id: str, variables: Dict[str, str], streaming: bool = False, **kwargs)

带有可选流的提示执行

row_embeddings_query(self, text: str, schema_name: str, user: str = 'trustgraph', collection: str = 'default', index_name: str | None = None, limit: int = 10, **kwargs)

用于结构化数据的语义搜索的行嵌入查询

rows_query(self, query: str, user: str, collection: str, variables: Dict | None = None, operation_name: str | None = None, **kwargs)

对结构化行的 GraphQL 查询

text_completion(self, system: str, prompt: str, streaming: bool = False, **kwargs)

带有可选流的文本补全

triples_query(self, s=None, p=None, o=None, user=None, collection=None, limit=100, **kwargs)

三元模式查询

--

build_term(value: Any, term_type: str | None = None, datatype: str | None = None, language: str | None = None) -> Dict[str, Any] | None

从值构建线性的 Term 字典。

自动检测规则(当 term_type 为 None 时): 已经是一个带有 't' 键的字典 -> 保持原样(已经是 Term 以 http://, https://, urn: 开头 -> IRI 包含在 <> 中(例如,http://...-> IRI去除尖角括号 其他任何内容 -> 文本

参数:

value: Term 值(字符串、字典或 None term_type: 'iri'、'literal' 或 None 中的一个,用于自动检测 datatype: 文本对象的类型例如xsd:integer language: 文本对象的语言标签例如en

返回值: dict: 线性的 Term 字典,如果值是 None 则返回 None

--

BulkClient

from trustgraph.api import BulkClient

用于导入/导出的同步批量操作客户端。

通过 WebSocket 提供高效的大数据集批量数据传输。 使用同步生成器包装异步 WebSocket 操作,以提高易用性。

注意:要获得真正的异步支持,请使用 AsyncBulkClient。

方法

__init__(self, url: str, timeout: int, token: str | None) -> None

初始化同步批量客户端。

参数:

urlTrustGraph API 的基本 URLHTTP/HTTPS 将转换为 WS/WSS timeoutWebSocket 超时时间(以秒为单位) token:可选的用于身份验证的 bearer token

close(self) -> None

关闭连接

export_document_embeddings(self, flow: str, **kwargs: Any) -> Iterator[Dict[str, Any]]

从流程中批量导出文档嵌入。

通过 WebSocket 流式传输高效下载所有文档块嵌入。

参数:

flow:流程标识符 **kwargs:附加参数(为未来使用保留)

返回值: Iterator[Dict[str, Any]]: 嵌入字典的流

示例:

bulk = api.bulk()

# Export and process document embeddings
for embedding in bulk.export_document_embeddings(flow="default"):
    chunk_id = embedding.get("chunk_id")
    vector = embedding.get("embedding")
    print(f"{chunk_id}: {len(vector)} dimensions")

export_entity_contexts(self, flow: str, **kwargs: Any) -> Iterator[Dict[str, Any]]

从流程中批量导出实体上下文。

通过 WebSocket 流式传输高效下载所有实体上下文信息。

参数:

flow: 流程标识符 **kwargs: 附加参数(预留给未来使用)

返回值: Iterator[Dict[str, Any]]: 上下文字典的流

示例:

bulk = api.bulk()

# Export and process entity contexts
for context in bulk.export_entity_contexts(flow="default"):
    entity = context.get("entity")
    text = context.get("context")
    print(f"{entity}: {text[:100]}...")

export_graph_embeddings(self, flow: str, **kwargs: Any) -> Iterator[Dict[str, Any]]

从流程中批量导出图嵌入。

能够高效地通过 WebSocket 流式传输下载所有图实体嵌入。

参数:

flow: 流程标识符 **kwargs: 附加参数(预留给未来使用)

返回值: Iterator[Dict[str, Any]]: 嵌入字典的流

示例:

bulk = api.bulk()

# Export and process embeddings
for embedding in bulk.export_graph_embeddings(flow="default"):
    entity = embedding.get("entity")
    vector = embedding.get("embedding")
    print(f"{entity}: {len(vector)} dimensions")

export_triples(self, flow: str, **kwargs: Any) -> Iterator[trustgraph.api.types.Triple]

从流程中批量导出 RDF 三元组。

能够高效地通过 WebSocket 流式传输下载所有三元组。

参数:

flow: 流程标识符 **kwargs: 附加参数(预留给未来使用)

返回值: Iterator[Triple]: 三元组对象的流

示例:

bulk = api.bulk()

# Export and process triples
for triple in bulk.export_triples(flow="default"):
    print(f"{triple.s} -> {triple.p} -> {triple.o}")

import_document_embeddings(self, flow: str, embeddings: Iterator[Dict[str, Any]], **kwargs: Any) -> None

将文档嵌入批量导入到流程中。

通过 WebSocket 流式传输高效地上传文档分块嵌入, 用于文档检索增强生成 (RAG) 查询。

参数:

flow: 流程标识符 embeddings: 产生嵌入字典的迭代器 **kwargs: 附加参数(预留给未来使用)

示例:

bulk = api.bulk()

# Generate document embeddings to import
def doc_embedding_generator():
    yield {"chunk_id": "doc1/p0/c0", "embedding": [0.1, 0.2, ...]}
    yield {"chunk_id": "doc1/p0/c1", "embedding": [0.3, 0.4, ...]}
    # ... more embeddings

bulk.import_document_embeddings(
    flow="default",
    embeddings=doc_embedding_generator()
)

import_entity_contexts(self, flow: str, contexts: Iterator[Dict[str, Any]], metadata: Dict[str, Any] | None = None, batch_size: int = 100, **kwargs: Any) -> None

将实体上下文批量导入到流程中。

通过 WebSocket 流式传输高效地上传实体上下文信息。 实体上下文为图实体提供额外的文本上下文, 以提高 RAG 的性能。

参数:

flow: 流程标识符 contexts: 产生上下文字典的迭代器 metadata: 包含 id、元数据、用户、集合的元数据字典 batch_size: 每个批次的上下文数量(默认为 100 **kwargs: 附加参数(为未来使用保留)

示例:

bulk = api.bulk()

# Generate entity contexts to import
def context_generator():
    yield {"entity": {"v": "entity1", "e": True}, "context": "Description..."}
    yield {"entity": {"v": "entity2", "e": True}, "context": "Description..."}
    # ... more contexts

bulk.import_entity_contexts(
    flow="default",
    contexts=context_generator(),
    metadata={"id": "doc1", "metadata": [], "user": "user1", "collection": "default"}
)

import_graph_embeddings(self, flow: str, embeddings: Iterator[Dict[str, Any]], **kwargs: Any) -> None

批量将图嵌入向量导入到流程中。

通过 WebSocket 流式传输高效地上传图实体嵌入向量。

参数:

flow: 流程标识符 embeddings: 产生嵌入字典的迭代器 **kwargs: 附加参数(预留给未来使用)

示例:

bulk = api.bulk()

# Generate embeddings to import
def embedding_generator():
    yield {"entity": "entity1", "embedding": [0.1, 0.2, ...]}
    yield {"entity": "entity2", "embedding": [0.3, 0.4, ...]}
    # ... more embeddings

bulk.import_graph_embeddings(
    flow="default",
    embeddings=embedding_generator()
)

import_rows(self, flow: str, rows: Iterator[Dict[str, Any]], **kwargs: Any) -> None

批量将结构化行导入到流程中。

通过 WebSocket 流式传输高效地上传结构化数据行, 用于 GraphQL 查询。

参数:

flow: 流程标识符 rows: 产生行字典的迭代器 **kwargs: 附加参数(为未来使用保留)

示例:

bulk = api.bulk()

# Generate rows to import
def row_generator():
    yield {"id": "row1", "name": "Row 1", "value": 100}
    yield {"id": "row2", "name": "Row 2", "value": 200}
    # ... more rows

bulk.import_rows(
    flow="default",
    rows=row_generator()
)

import_triples(self, flow: str, triples: Iterator[trustgraph.api.types.Triple], metadata: Dict[str, Any] | None = None, batch_size: int = 100, **kwargs: Any) -> None

批量将 RDF 三元组导入到流程中。

通过 WebSocket 流式传输高效地上传大量三元组。

参数:

flow: 流程标识符 triples: 产生 Triple 对象的迭代器 metadata: 包含 id、元数据、用户、集合的元数据字典 batch_size: 每个批次的元组数量(默认为 100 **kwargs: 其他参数(保留用于未来使用)

示例:

from trustgraph.api import Triple

bulk = api.bulk()

# Generate triples to import
def triple_generator():
    yield Triple(s="subj1", p="pred", o="obj1")
    yield Triple(s="subj2", p="pred", o="obj2")
    # ... more triples

# Import triples
bulk.import_triples(
    flow="default",
    triples=triple_generator(),
    metadata={"id": "doc1", "metadata": [], "user": "user1", "collection": "default"}
)

--

AsyncBulkClient

from trustgraph.api import AsyncBulkClient

异步批量操作客户端

方法

__init__(self, url: str, timeout: int, token: str | None) -> None

初始化 self。请参考 help(type(self)) 以获取准确的签名。

aclose(self) -> None

关闭连接

export_document_embeddings(self, flow: str, **kwargs: Any) -> AsyncIterator[Dict[str, Any]]

通过 WebSocket 批量导出文档嵌入

export_entity_contexts(self, flow: str, **kwargs: Any) -> AsyncIterator[Dict[str, Any]]

通过 WebSocket 批量导出实体上下文

export_graph_embeddings(self, flow: str, **kwargs: Any) -> AsyncIterator[Dict[str, Any]]

通过 WebSocket 批量导出图嵌入

export_triples(self, flow: str, **kwargs: Any) -> AsyncIterator[trustgraph.api.types.Triple]

通过 WebSocket 批量导出三元组

import_document_embeddings(self, flow: str, embeddings: AsyncIterator[Dict[str, Any]], **kwargs: Any) -> None

通过 WebSocket 批量导入文档嵌入

import_entity_contexts(self, flow: str, contexts: AsyncIterator[Dict[str, Any]], **kwargs: Any) -> None

通过 WebSocket 批量导入实体上下文

import_graph_embeddings(self, flow: str, embeddings: AsyncIterator[Dict[str, Any]], **kwargs: Any) -> None

通过 WebSocket 批量导入图嵌入

import_rows(self, flow: str, rows: AsyncIterator[Dict[str, Any]], **kwargs: Any) -> None

通过 WebSocket 批量导入行

import_triples(self, flow: str, triples: AsyncIterator[trustgraph.api.types.Triple], **kwargs: Any) -> None

通过 WebSocket 批量导入三元组

--

Metrics

from trustgraph.api import Metrics

同步指标客户端

方法

__init__(self, url: str, timeout: int, token: str | None) -> None

初始化 self。请参考 help(type(self)) 以获取准确的签名。

get(self) -> str

获取 Prometheus 指标,以文本形式

--

AsyncMetrics

from trustgraph.api import AsyncMetrics

异步指标客户端

方法

__init__(self, url: str, timeout: int, token: str | None) -> None

初始化 self。请参考 help(type(self)) 以获取准确的签名。

aclose(self) -> None

关闭连接

get(self) -> str

获取 Prometheus 指标(以文本形式)

--

ExplainabilityClient

from trustgraph.api import ExplainabilityClient

客户端,用于获取可解释性实体,并处理最终一致性。

使用静默检测:获取、等待、再次获取、比较。 如果结果相同,则数据稳定。

方法

__init__(self, flow_instance, retry_delay: float = 0.2, max_retries: int = 10)

初始化可解释性客户端。

参数:

flow_instance: 用于查询三元组的 SocketFlowInstance。 retry_delay: 重试之间的延迟以秒为单位默认值0.2)。 max_retries: 最大重试次数默认值10

detect_session_type(self, session_uri: str, graph: str | None = None, user: str | None = None, collection: str | None = None) -> str

检测会话是 GraphRAG 还是 Agent 类型。

参数:

session_uri: 会话/问题的 URI。 graph: 命名图。 user: 用户/键空间标识符。 collection: 集合标识符。

返回值: "graphrag" 或 "agent"。

fetch_agent_trace(self, session_uri: str, graph: str | None = None, user: str | None = None, collection: str | None = None, api: Any = None, max_content: int = 10000) -> Dict[str, Any]

从会话 URI 开始,获取完整的 Agent 跟踪。

遵循溯源链:问题 -> 分析(s) -> 结论。

参数:

session_uri: Agent 会话/问题的 URI。 graph: 命名图默认值urn:graph:retrievaluser: 用户/键空间标识符。 collection: 集合标识符。 api: 用于图书管理员访问的 TrustGraph Api 实例(可选)。 max_content: 结论的最大内容长度。

返回值: 包含问题、迭代(分析列表)和结论实体的字典。

fetch_docrag_trace(self, question_uri: str, graph: str | None = None, user: str | None = None, collection: str | None = None, api: Any = None, max_content: int = 10000) -> Dict[str, Any]

从问题 URI 开始,获取完整的 DocumentRAG 跟踪。

遵循溯源链: 问题 -> 关联 -> 探索 -> 综合。

参数:

question_uri: 问题实体 URI。 graph: 命名图默认值urn:graph:retrievaluser: 用户/键空间标识符。 collection: 集合标识符。 api: 用于图书管理员访问的 TrustGraph Api 实例(可选)。 max_content: 综合的最大内容长度。

返回值: 包含问题、关联、探索和综合实体的字典。

fetch_document_content(self, document_uri: str, api: Any, user: str | None = None, max_content: int = 10000) -> str

通过文档 URI 从图书管理员处获取内容。

参数:

document_uri: 图书管理员中的文档 URI。 api: 用于图书管理员访问的 TrustGraph Api 实例。 user: 用于图书管理员的用户标识符。 max_content: 要返回的最大内容长度。

返回值: 文档内容作为字符串。

fetch_edge_selection(self, uri: str, graph: str | None = None, user: str | None = None, collection: str | None = None) -> trustgraph.api.explainability.EdgeSelection | None

获取边缘选择实体(由 Focus 使用)。

参数:

uri: 边缘选择 URI。 graph: 要查询的命名图。 user: 用户/键空间标识符。 collection: 集合标识符。

返回值: EdgeSelection 或如果未找到则返回 None。

fetch_entity(self, uri: str, graph: str | None = None, user: str | None = None, collection: str | None = None) -> trustgraph.api.explainability.ExplainEntity | None

通过 URI 获取可解释性实体,并采用最终一致性处理。

使用静默检测:

  1. 获取 URI 对应的三元组
  2. 如果结果为零,则重试
  3. 如果结果不为零,则等待并再次获取
  4. 如果结果相同,则数据稳定 - 解析并返回
  5. 如果结果不同,则数据仍在写入 - 重试

参数:

uri: 要获取的实体 URI graph: 查询的命名图 (例如,"urn:graph:retrieval") user: 用户/密钥空间标识符 collection: 集合标识符

返回值: ExplainEntity 子类,如果未找到则返回 None

fetch_focus_with_edges(self, uri: str, graph: str | None = None, user: str | None = None, collection: str | None = None) -> trustgraph.api.explainability.Focus | None

获取 Focus 实体及其所有边选择。

参数:

uri: Focus 实体 URI graph: 查询的命名图 user: 用户/密钥空间标识符 collection: 集合标识符

返回值: 包含填充的 edge_selections 的 Focus 对象,如果未找到则返回 None

fetch_graphrag_trace(self, question_uri: str, graph: str | None = None, user: str | None = None, collection: str | None = None, api: Any = None, max_content: int = 10000) -> Dict[str, Any]

从问题 URI 开始,获取完整的 GraphRAG 跟踪。

遵循溯源链:问题 -> 关联 -> 探索 -> Focus -> 综合

参数:

question_uri: 问题实体 URI graph: 命名图 (默认: urn:graph:retrieval) user: 用户/密钥空间标识符 collection: 集合标识符 api: 用于 librarian 访问的 TrustGraph Api 实例 (可选) max_content: 综合的最大内容长度

返回值: 包含 question、grounding、exploration、focus、synthesis 实体的字典

list_sessions(self, graph: str | None = None, user: str | None = None, collection: str | None = None, limit: int = 50) -> List[trustgraph.api.explainability.Question]

列出集合中的所有可解释性会话 (问题)。

参数:

graph: 命名图 (默认: urn:graph:retrieval) user: 用户/密钥空间标识符 collection: 集合标识符 limit: 要返回的会话的最大数量

返回值: 按时间戳排序的 Question 实体列表 (最新在前)

resolve_edge_labels(self, edge: Dict[str, str], user: str | None = None, collection: str | None = None) -> Tuple[str, str, str]

解决边三元组的所有组件的标签。

参数:

edge: 包含 "s"、"p"、"o" 键的字典 user: 用户/密钥空间标识符 collection: 集合标识符

返回值: (s_label, p_label, o_label) 元组

resolve_label(self, uri: str, user: str | None = None, collection: str | None = None) -> str

使用缓存,为 URI 解析 rdfs:label。

参数:

uri: 要获取标签的 URI user: 用户/密钥空间标识符 collection: 集合标识符

返回值: 如果找到,则返回标签,否则返回 URI 本身

--

ExplainEntity

from trustgraph.api import ExplainEntity

可解释性实体的基类。

字段:

uri: <class 'str'> entity_type: <class 'str'>

方法

__init__(self, uri: str, entity_type: str = '') -> None

初始化 self。请参考 help(type(self)) 以获取准确的签名。

--

Question

from trustgraph.api import Question

问题实体 - 用户启动会话的查询。

字段:

uri: <class 'str'> entity_type: <class 'str'> query: <class 'str'> timestamp: <class 'str'> question_type: <class 'str'>

方法

__init__(self, uri: str, entity_type: str = '', query: str = '', timestamp: str = '', question_type: str = '') -> None

初始化 self。请参阅 help(type(self)) 以获取准确的签名。

--

Exploration

from trustgraph.api import Exploration

探索实体 - 从知识库中检索到的边/块。

字段:

uri: <class 'str'> entity_type: <class 'str'> edge_count: <class 'int'> chunk_count: <class 'int'> entities: typing.List[str]

方法

__init__(self, uri: str, entity_type: str = '', edge_count: int = 0, chunk_count: int = 0, entities: List[str] = <factory>) -> None

初始化 self。请参考 help(type(self)) 以获取准确的签名。

--

Focus

from trustgraph.api import Focus

关注实体 - 使用 LLM 推理选择的边(仅限 GraphRAG

字段:

uri: <class 'str'> entity_type: <class 'str'> selected_edge_uris: typing.List[str] edge_selections: typing.List[trustgraph.api.explainability.EdgeSelection]

方法

__init__(self, uri: str, entity_type: str = '', selected_edge_uris: List[str] = <factory>, edge_selections: List[trustgraph.api.explainability.EdgeSelection] = <factory>) -> None

初始化 self。请参阅 help(type(self)) 以获取准确的签名。

--

Synthesis

from trustgraph.api import Synthesis

综合实体 - 最终答案。

字段:

uri: <class 'str'> entity_type: <class 'str'> document: <class 'str'>

方法

__init__(self, uri: str, entity_type: str = '', document: str = '') -> None

初始化 self。请参考 help(type(self)) 以获取准确的签名。

--

Analysis

from trustgraph.api import Analysis

分析实体 - 一个思考/行动/观察循环仅限Agent

字段:

uri: <class 'str'> entity_type: <class 'str'> action: <class 'str'> arguments: <class 'str'> thought: <class 'str'> observation: <class 'str'>

方法

__init__(self, uri: str, entity_type: str = '', action: str = '', arguments: str = '', thought: str = '', observation: str = '') -> None

初始化 self。请参考 help(type(self)) 以获取准确的签名。

--

Conclusion

from trustgraph.api import Conclusion

结论实体 - 最终答案(仅限代理)。

字段:

uri: <class 'str'> entity_type: <class 'str'> document: <class 'str'>

方法

__init__(self, uri: str, entity_type: str = '', document: str = '') -> None

初始化 self。请参阅 help(type(self)) 以获取准确的签名。

--

EdgeSelection

from trustgraph.api import EdgeSelection

从 GraphRAG Focus 步骤中选择的边,并附带推理。

字段:

uri: <class 'str'> edge: typing.Dict[str, str] | None reasoning: <class 'str'>

方法

__init__(self, uri: str, edge: Dict[str, str] | None = None, reasoning: str = '') -> None

初始化 self。请参考 help(type(self)) 以获取准确的签名。

--

wire_triples_to_tuples(wire_triples: List[Dict[str, Any]]) -> List[Tuple[str, str, Any]]

将线格式的三元组转换为 (s, p, o) 元组。

--

extract_term_value(term: Dict[str, Any]) -> Any

从线格式的 Term 字典中提取值。

--

Triple

from trustgraph.api import Triple

RDF三元组表示知识图谱中的一条陈述。

字段:

s: <class 'str'> p: <class 'str'> o: <class 'str'>

方法

__init__(self, s: str, p: str, o: str) -> None

初始化 self。请参考 help(type(self)) 以获取准确的签名。

--

Uri

from trustgraph.api import Uri

str(object='') -> str str(bytes_or_buffer[, encoding[, errors]]) -> str

从给定的对象创建一个新的字符串对象。如果指定了 encoding 或 errors则该对象必须公开一个数据缓冲区该缓冲区将使用给定的编码和错误处理程序进行解码。 否则,返回 object.str() 的结果(如果已定义),或者 repr(object)。 encoding 默认为 'utf-8'。 errors 默认为 'strict'。

方法

⟦CODE_0⟧

is_literal(self)

is_triple(self)

is_uri(self)

--

Literal

from trustgraph.api import Literal

str(object='') -> str str(bytes_or_buffer[, encoding[, errors]]) -> str

从给定的对象创建一个新的字符串对象。如果指定了 encoding 或 errors则该对象必须公开一个数据缓冲区该缓冲区将使用给定的编码和错误处理程序进行解码。 否则,返回 object.str() 的结果(如果已定义),或者 repr(object)。 encoding 默认为 'utf-8'。 errors 默认为 'strict'。

方法

⟦CODE_0⟧

is_literal(self)

is_triple(self)

is_uri(self)

--

ConfigKey

from trustgraph.api import ConfigKey

配置键标识符。

字段:

type: <class 'str'> key: <class 'str'>

方法

__init__(self, type: str, key: str) -> None

初始化 self。请参考 help(type(self)) 以获取准确的签名。

--

ConfigValue

from trustgraph.api import ConfigValue

配置键值对。

字段:

type: <class 'str'> key: <class 'str'> value: <class 'str'>

方法

__init__(self, type: str, key: str, value: str) -> None

初始化 self。请参考 help(type(self)) 以获取准确的签名。

--

DocumentMetadata

from trustgraph.api import DocumentMetadata

文档库中一个文档的元数据。

属性:

parent_id: Parent document ID for child documents (empty for top: level docs)

字段:

id: <class 'str'> time: <class 'datetime.datetime'> kind: <class 'str'> title: <class 'str'> comments: <class 'str'> metadata: typing.List[trustgraph.api.types.Triple] user: <class 'str'> tags: typing.List[str] parent_id: <class 'str'> document_type: <class 'str'>

方法

__init__(self, id: str, time: datetime.datetime, kind: str, title: str, comments: str, metadata: List[trustgraph.api.types.Triple], user: str, tags: List[str], parent_id: str = '', document_type: str = 'source') -> None

初始化 self。请参考 help(type(self)) 以获取准确的签名。

--

ProcessingMetadata

from trustgraph.api import ProcessingMetadata

正在处理的文档的元数据。

字段:

id: <class 'str'> document_id: <class 'str'> time: <class 'datetime.datetime'> flow: <class 'str'> user: <class 'str'> collection: <class 'str'> tags: typing.List[str]

方法

__init__(self, id: str, document_id: str, time: datetime.datetime, flow: str, user: str, collection: str, tags: List[str]) -> None

初始化 self。有关准确签名的信息请参阅 help(type(self))。

--

CollectionMetadata

from trustgraph.api import CollectionMetadata

数据的元数据。

集合提供逻辑分组和隔离,用于文档和 知识图谱数据。

属性:

name: Human: 可读的集合名称

字段:

user: <class 'str'> collection: <class 'str'> name: <class 'str'> description: <class 'str'> tags: typing.List[str]

方法

__init__(self, user: str, collection: str, name: str, description: str, tags: List[str]) -> None

初始化 self。请参阅 help(type(self)) 以获取准确的签名。

--

StreamingChunk

from trustgraph.api import StreamingChunk

流式响应分块的基础类。

用于基于 WebSocket 的流式操作,其中响应以增量方式传递, 并在生成时交付。

字段:

content: <class 'str'> end_of_message: <class 'bool'>

方法

__init__(self, content: str, end_of_message: bool = False) -> None

初始化 self。请参阅 help(type(self)) 以获取准确的签名。

--

AgentThought

from trustgraph.api import AgentThought

代理的推理/思考过程片段。

代表代理在执行期间的内部推理或规划步骤。 这些片段展示了代理如何思考问题。

字段:

content: <class 'str'> end_of_message: <class 'bool'> chunk_type: <class 'str'>

方法

__init__(self, content: str, end_of_message: bool = False, chunk_type: str = 'thought') -> None

初始化 self。请参考 help(type(self)) 以获取准确的签名。

--

AgentObservation

from trustgraph.api import AgentObservation

代理工具执行观察块。

表示执行工具或操作的结果或观察结果。 这些块显示了代理通过使用工具学到的内容。

字段:

content: <class 'str'> end_of_message: <class 'bool'> chunk_type: <class 'str'>

方法

__init__(self, content: str, end_of_message: bool = False, chunk_type: str = 'observation') -> None

初始化 self。请参阅 help(type(self)) 以获取准确的签名。

--

AgentAnswer

from trustgraph.api import AgentAnswer

Agent final answer chunk.

Represents the agent's final response to the user's query after completing its reasoning and tool use.

Attributes:

chunk_type: Always "final: answer"

Fields:

content: <class 'str'> end_of_message: <class 'bool'> chunk_type: <class 'str'> end_of_dialog: <class 'bool'>

Methods

__init__(self, content: str, end_of_message: bool = False, chunk_type: str = 'final-answer', end_of_dialog: bool = False) -> None

Initialize self. See help(type(self)) for accurate signature.

--

RAGChunk

from trustgraph.api import RAGChunk

RAG检索增强生成流式传输块。

用于从图 RAG、文档 RAG、文本补全以及其他生成服务中流式传输响应。

字段:

content: <class 'str'> end_of_message: <class 'bool'> chunk_type: <class 'str'> end_of_stream: <class 'bool'> error: typing.Dict[str, str] | None

方法

__init__(self, content: str, end_of_message: bool = False, chunk_type: str = 'rag', end_of_stream: bool = False, error: Dict[str, str] | None = None) -> None

初始化 self。有关准确的签名请参阅 help(type(self))。

--

ProvenanceEvent

from trustgraph.api import ProvenanceEvent

可解释性的溯源事件。

在启用可解释模式时GraphRAG 查询期间会发出。 每个事件代表在查询处理过程中创建的溯源节点。

字段:

explain_id: <class 'str'> explain_graph: <class 'str'> event_type: <class 'str'>

方法

__init__(self, explain_id: str, explain_graph: str = '', event_type: str = '') -> None

初始化 self。请参阅 help(type(self)) 以获取准确的签名。

--

ProtocolException

from trustgraph.api import ProtocolException

当 WebSocket 协议出现错误时触发。

--

TrustGraphException

from trustgraph.api import TrustGraphException

TrustGraph 服务所有错误的基类。

--

AgentError

from trustgraph.api import AgentError

代理服务错误

--

ConfigError

from trustgraph.api import ConfigError

配置服务错误

--

DocumentRagError

from trustgraph.api import DocumentRagError

文档检索错误

--

FlowError

from trustgraph.api import FlowError

流程管理错误

--

GatewayError

from trustgraph.api import GatewayError

API 网关错误

--

GraphRagError

from trustgraph.api import GraphRagError

图表 RAG 检索错误

--

LLMError

from trustgraph.api import LLMError

LLM 服务错误

--

LoadError

from trustgraph.api import LoadError

数据加载错误

--

LookupError

from trustgraph.api import LookupError

查找/搜索错误

--

NLPQueryError

from trustgraph.api import NLPQueryError

自然语言处理查询服务错误

--

RowsQueryError

from trustgraph.api import RowsQueryError

行查询服务错误

--

RequestError

from trustgraph.api import RequestError

请求处理错误

--

StructuredQueryError

from trustgraph.api import StructuredQueryError

结构化查询服务错误

--

UnexpectedError

from trustgraph.api import UnexpectedError

意外/未知的错误

--

ApplicationException

from trustgraph.api import ApplicationException

TrustGraph 服务所有错误的基类。

--