diff --git a/trustgraph-base/trustgraph/api/async_flow.py b/trustgraph-base/trustgraph/api/async_flow.py index 6b28886b..38560b19 100644 --- a/trustgraph-base/trustgraph/api/async_flow.py +++ b/trustgraph-base/trustgraph/api/async_flow.py @@ -612,8 +612,12 @@ class AsyncFlowInstance: print(f"{entity['name']}: {entity['score']}") ``` """ + # First convert text to embeddings vectors + emb_result = await self.embeddings(text=text) + vectors = emb_result.get("vectors", []) + request_data = { - "text": text, + "vectors": vectors, "user": user, "collection": collection, "limit": limit diff --git a/trustgraph-base/trustgraph/api/async_socket_client.py b/trustgraph-base/trustgraph/api/async_socket_client.py index cb6c8605..53727ef6 100644 --- a/trustgraph-base/trustgraph/api/async_socket_client.py +++ b/trustgraph-base/trustgraph/api/async_socket_client.py @@ -282,8 +282,12 @@ class AsyncSocketFlowInstance: async def graph_embeddings_query(self, text: str, user: str, collection: str, limit: int = 10, **kwargs): """Query graph embeddings for semantic search""" + # First convert text to embeddings vectors + emb_result = await self.embeddings(text=text) + vectors = emb_result.get("vectors", []) + request = { - "text": text, + "vectors": vectors, "user": user, "collection": collection, "limit": limit diff --git a/trustgraph-base/trustgraph/api/bulk_client.py b/trustgraph-base/trustgraph/api/bulk_client.py index a2796332..91369ef4 100644 --- a/trustgraph-base/trustgraph/api/bulk_client.py +++ b/trustgraph-base/trustgraph/api/bulk_client.py @@ -15,6 +15,15 @@ from . types import Triple from . 
exceptions import ProtocolException +def _string_to_term(value: str) -> Dict[str, Any]: + """Convert a string value to Term format for the gateway.""" + # Treat URIs as IRI type, otherwise as literal + if value.startswith("http://") or value.startswith("https://") or "://" in value: + return {"t": "i", "i": value} + else: + return {"t": "l", "v": value} + + class BulkClient: """ Synchronous bulk operations client for import/export. @@ -62,7 +71,12 @@ class BulkClient: return loop.run_until_complete(coro) - def import_triples(self, flow: str, triples: Iterator[Triple], **kwargs: Any) -> None: + def import_triples( + self, flow: str, triples: Iterator[Triple], + metadata: Optional[Dict[str, Any]] = None, + batch_size: int = 100, + **kwargs: Any + ) -> None: """ Bulk import RDF triples into a flow. @@ -71,6 +85,8 @@ class BulkClient: Args: flow: Flow identifier triples: Iterator yielding Triple objects + metadata: Metadata dict with id, metadata, user, collection + batch_size: Number of triples per batch (default 100) **kwargs: Additional parameters (reserved for future use) Example: @@ -86,23 +102,47 @@ class BulkClient: # ... 
async def _import_triples_async(
    self, flow: str, triples: Iterator[Triple],
    metadata: Optional[Dict[str, Any]], batch_size: int
) -> None:
    """Send triples to the flow's triple-import websocket in batches.

    Each websocket message is a JSON object with two keys: ``"metadata"``
    (the same dict for every message) and ``"triples"`` (a list of
    ``{"s", "p", "o"}`` entries, each value produced by
    ``_string_to_term``).

    Args:
        flow: Flow identifier, interpolated into the endpoint path.
        triples: Iterator yielding objects with ``s``, ``p`` and ``o``
            attributes.
        metadata: Dict sent with every batch; when ``None`` a default with
            an empty id, user ``"trustgraph"`` and collection ``"default"``
            is used.
        batch_size: A batch is sent as soon as it holds at least this many
            triples; any remainder is sent after the iterator is exhausted.
    """
    endpoint = f"{self.url}/api/v1/flow/{flow}/import/triples"
    if self.token:
        endpoint = f"{endpoint}?token={self.token}"

    envelope_metadata = metadata
    if envelope_metadata is None:
        envelope_metadata = {"id": "", "metadata": [], "user": "trustgraph", "collection": "default"}

    async with websockets.connect(endpoint, ping_interval=20, ping_timeout=self.timeout) as websocket:

        async def flush(pending_triples):
            # One message per batch: shared metadata plus the batch payload.
            payload = {
                "metadata": envelope_metadata,
                "triples": pending_triples
            }
            await websocket.send(json.dumps(payload))

        pending = []
        for triple in triples:
            pending.append({
                "s": _string_to_term(triple.s),
                "p": _string_to_term(triple.p),
                "o": _string_to_term(triple.o)
            })
            if len(pending) >= batch_size:
                await flush(pending)
                pending = []

        # Send whatever is left over after the last full batch.
        if pending:
            await flush(pending)
@@ -373,6 +418,8 @@ class BulkClient: Args: flow: Flow identifier contexts: Iterator yielding context dictionaries + metadata: Metadata dict with id, metadata, user, collection + batch_size: Number of contexts per batch (default 100) **kwargs: Additional parameters (reserved for future use) Example: @@ -381,27 +428,49 @@ class BulkClient: # Generate entity contexts to import def context_generator(): - yield {"entity": "entity1", "context": "Description of entity1..."} - yield {"entity": "entity2", "context": "Description of entity2..."} + yield {"entity": {"v": "entity1", "e": True}, "context": "Description..."} + yield {"entity": {"v": "entity2", "e": True}, "context": "Description..."} # ... more contexts bulk.import_entity_contexts( flow="default", - contexts=context_generator() + contexts=context_generator(), + metadata={"id": "doc1", "metadata": [], "user": "user1", "collection": "default"} ) ``` """ - self._run_async(self._import_entity_contexts_async(flow, contexts)) + self._run_async(self._import_entity_contexts_async(flow, contexts, metadata, batch_size)) - async def _import_entity_contexts_async(self, flow: str, contexts: Iterator[Dict[str, Any]]) -> None: + async def _import_entity_contexts_async( + self, flow: str, contexts: Iterator[Dict[str, Any]], + metadata: Optional[Dict[str, Any]], batch_size: int + ) -> None: """Async implementation of entity contexts import""" ws_url = f"{self.url}/api/v1/flow/{flow}/import/entity-contexts" if self.token: ws_url = f"{ws_url}?token={self.token}" + if metadata is None: + metadata = {"id": "", "metadata": [], "user": "trustgraph", "collection": "default"} + async with websockets.connect(ws_url, ping_interval=20, ping_timeout=self.timeout) as websocket: + batch = [] for context in contexts: - await websocket.send(json.dumps(context)) + batch.append(context) + if len(batch) >= batch_size: + message = { + "metadata": metadata, + "entities": batch + } + await websocket.send(json.dumps(message)) + batch = [] + # Send 
def document_embeddings_query(self, text, user, collection, limit=10):
    """
    Query document chunks using semantic similarity.

    The query text is first converted to vectors with ``self.embeddings``;
    those vectors are then sent to the ``service/document-embeddings``
    endpoint through ``self.request``.

    Args:
        text: Query text for semantic search
        user: User/keyspace identifier
        collection: Collection identifier
        limit: Maximum number of results (default: 10)

    Returns:
        Whatever ``self.request`` returns for the document-embeddings
        service call.

    Example:
        ```python
        flow = api.flow().id("default")
        results = flow.document_embeddings_query(
            text="machine learning algorithms",
            user="trustgraph",
            collection="research-papers",
            limit=5
        )
        ```
    """

    # First convert text to embeddings vectors; a response without a
    # "vectors" key yields an empty list.
    emb_result = self.embeddings(text=text)
    vectors = emb_result.get("vectors", [])

    # Named request_body rather than `input` so the builtin is not shadowed.
    request_body = {
        "vectors": vectors,
        "user": user,
        "collection": collection,
        "limit": limit
    }

    return self.request(
        "service/document-embeddings",
        request_body
    )
def document_embeddings_query(
    self,
    text: str,
    user: str,
    collection: str,
    limit: int = 10,
    **kwargs: Any
) -> Dict[str, Any]:
    """
    Find document chunks semantically similar to a piece of text.

    The text is turned into vectors via ``self.embeddings``; the vectors
    are then sent as a ``"document-embeddings"`` request for this flow.

    Args:
        text: Query text for semantic search
        user: User/keyspace identifier
        collection: Collection identifier
        limit: Maximum number of results (default: 10)
        **kwargs: Extra fields merged into the request; a key that matches
            one of the standard fields replaces it

    Returns:
        dict: The value returned by the client's synchronous request call

    Example:
        ```python
        socket = api.socket()
        flow = socket.flow("default")

        results = flow.document_embeddings_query(
            text="machine learning algorithms",
            user="trustgraph",
            collection="research-papers",
            limit=5
        )
        ```
    """
    # Embed the query text; fall back to no vectors if the key is absent.
    query_vectors = self.embeddings(text=text).get("vectors", [])

    payload = {
        "vectors": query_vectors,
        "user": user,
        "collection": collection,
        "limit": limit,
        **kwargs,
    }

    return self.client._send_request_sync("document-embeddings", self.flow_id, payload, False)
def truncate_chunk(chunk, max_length):
    """Truncate a chunk to max_length characters, adding ellipsis if needed.

    Args:
        chunk: Text to shorten.
        max_length: Maximum number of characters kept from ``chunk``.
            Negative values are treated as 0.

    Returns:
        ``chunk`` unchanged when it already fits; otherwise its first
        ``max_length`` characters followed by ``"..."`` (so a truncated
        result is up to three characters longer than ``max_length``).
    """
    # A negative limit would make the slice below count from the end of the
    # string and keep the wrong part, so clamp it to zero.
    limit = max(max_length, 0)
    if len(chunk) <= limit:
        return chunk
    return chunk[:limit] + "..."
def query(url, flow_id, query_text, user, collection, limit, max_chunk_length, token=None):
    """Run a document-embeddings query and print the matching chunks.

    Prints one numbered line per entry of the result's ``"chunks"`` list,
    each passed through ``truncate_chunk`` with ``max_chunk_length``.

    Args:
        url: API URL passed to ``Api``.
        flow_id: Identifier of the flow to query.
        query_text: Text to search for.
        user: User/keyspace identifier.
        collection: Collection identifier.
        limit: Maximum number of results requested.
        max_chunk_length: Truncation length applied to each printed chunk.
        token: Optional authentication token passed to ``Api``.
    """

    # Create API client
    api = Api(url=url, token=token)
    socket = api.socket()

    try:
        # Obtain the flow inside the try block so the socket is closed even
        # if this step fails.
        flow = socket.flow(flow_id)

        # Call document embeddings query service
        result = flow.document_embeddings_query(
            text=query_text,
            user=user,
            collection=collection,
            limit=limit
        )

        chunks = result.get("chunks", [])
        for i, chunk in enumerate(chunks, 1):
            truncated = truncate_chunk(chunk, max_chunk_length)
            print(f"{i}. {truncated}")

    finally:
        # Clean up socket connection
        socket.close()
def query(url, flow_id, text, token=None):
    """Call the embeddings service for ``text`` and print the vectors.

    Prints the value stored under ``"vectors"`` in the service result, or
    an empty list when that key is absent.

    Args:
        url: API URL passed to ``Api``.
        flow_id: Identifier of the flow to use.
        text: Text to convert to an embedding.
        token: Optional authentication token passed to ``Api``.
    """

    # Create API client
    api = Api(url=url, token=token)
    socket = api.socket()

    try:
        # Obtain the flow inside the try block so the socket is closed even
        # if this step fails.
        flow = socket.flow(flow_id)

        # Call embeddings service
        result = flow.embeddings(text=text)
        vectors = result.get("vectors", [])
        print(vectors)

    finally:
        # Clean up socket connection
        socket.close()
def query(url, flow_id, query_text, user, collection, limit, token=None):
    """Run a graph-embeddings query and print each matching entity.

    Prints every entry of the result's ``"entities"`` list on its own
    line; nothing is printed when the key is absent.

    Args:
        url: API URL passed to ``Api``.
        flow_id: Identifier of the flow to query.
        query_text: Text to search for.
        user: User/keyspace identifier.
        collection: Collection identifier.
        limit: Maximum number of results requested.
        token: Optional authentication token passed to ``Api``.
    """

    # Create API client
    api = Api(url=url, token=token)
    socket = api.socket()

    try:
        # Obtain the flow inside the try block so the socket is closed even
        # if this step fails.
        flow = socket.flow(flow_id)

        # Call graph embeddings query service
        result = flow.graph_embeddings_query(
            text=query_text,
            user=user,
            collection=collection,
            limit=limit
        )

        entities = result.get("entities", [])
        for entity in entities:
            print(entity)

    finally:
        # Clean up socket connection
        socket.close()
limit=args.limit, + token=args.token, + ) + + except Exception as e: + + print("Exception:", e, flush=True) + +if __name__ == "__main__": + main() diff --git a/trustgraph-cli/trustgraph/cli/load_knowledge.py b/trustgraph-cli/trustgraph/cli/load_knowledge.py index ff6ca980..5e96850f 100644 --- a/trustgraph-cli/trustgraph/cli/load_knowledge.py +++ b/trustgraph-cli/trustgraph/cli/load_knowledge.py @@ -87,13 +87,20 @@ class KnowledgeLoader: # Load triples from all files print("Loading triples...") + total_triples = 0 for file in self.files: print(f" Processing {file}...") - triples = self.load_triples_from_file(file) + count = 0 + + def counting_triples(): + nonlocal count + for triple in self.load_triples_from_file(file): + count += 1 + yield triple bulk.import_triples( flow=self.flow, - triples=triples, + triples=counting_triples(), metadata={ "id": self.document_id, "metadata": [], @@ -101,25 +108,33 @@ class KnowledgeLoader: "collection": self.collection } ) + print(f" Loaded {count} triples") + total_triples += count - print("Triples loaded.") + print(f"Triples loaded. 
Total: {total_triples}") # Load entity contexts from all files print("Loading entity contexts...") + total_contexts = 0 for file in self.files: print(f" Processing {file}...") + count = 0 # Convert tuples to the format expected by import_entity_contexts + # Entity must be in Term format: {"t": "i", "i": uri} for IRI def entity_context_generator(): + nonlocal count for entity, context in self.load_entity_contexts_from_file(file): + count += 1 + # Entities from RDF are URIs, use IRI term format yield { - "entity": {"v": entity, "e": True}, + "entity": {"t": "i", "i": entity}, "context": context } bulk.import_entity_contexts( flow=self.flow, - entities=entity_context_generator(), + contexts=entity_context_generator(), metadata={ "id": self.document_id, "metadata": [], @@ -127,8 +142,10 @@ class KnowledgeLoader: "collection": self.collection } ) + print(f" Loaded {count} entity contexts") + total_contexts += count - print("Entity contexts loaded.") + print(f"Entity contexts loaded. Total: {total_contexts}") except Exception as e: print(f"Error: {e}", flush=True) diff --git a/trustgraph-flow/trustgraph/gateway/dispatch/document_embeddings_query.py b/trustgraph-flow/trustgraph/gateway/dispatch/document_embeddings_query.py new file mode 100644 index 00000000..650d4f40 --- /dev/null +++ b/trustgraph-flow/trustgraph/gateway/dispatch/document_embeddings_query.py @@ -0,0 +1,31 @@ + +from ... schema import DocumentEmbeddingsRequest, DocumentEmbeddingsResponse +from ... messaging import TranslatorRegistry + +from . 
class DocumentEmbeddingsQueryRequestor(ServiceRequestor):
    """Service requestor for document-embeddings queries.

    Configures the base ``ServiceRequestor`` with the
    ``DocumentEmbeddingsRequest`` / ``DocumentEmbeddingsResponse`` schemas
    and converts message bodies with the translators registered under
    ``"document-embeddings-query"``.
    """

    def __init__(
            self, backend, request_queue, response_queue, timeout,
            consumer, subscriber,
    ):
        """Set up the base requestor and look up both translators.

        ``subscriber`` is handed to the base class as ``subscription`` and
        ``consumer`` as ``consumer_name``; the remaining arguments are
        forwarded under their own names.
        """
        super().__init__(
            backend=backend,
            request_queue=request_queue,
            response_queue=response_queue,
            request_schema=DocumentEmbeddingsRequest,
            response_schema=DocumentEmbeddingsResponse,
            subscription=subscriber,
            consumer_name=consumer,
            timeout=timeout,
        )

        # Request and response translators share one registry key.
        translator_key = "document-embeddings-query"
        self.request_translator = TranslatorRegistry.get_request_translator(translator_key)
        self.response_translator = TranslatorRegistry.get_response_translator(translator_key)

    def to_request(self, body):
        """Translate an incoming request body with the request translator."""
        return self.request_translator.to_pulsar(body)

    def from_response(self, message):
        """Translate a service response with the response translator."""
        return self.response_translator.from_response_with_completion(message)