Maint/knowledge load collections (#132)

* Add user/collection support to knowledge loaders

* Fix timeouts
This commit is contained in:
cybermaggedon 2024-10-31 16:28:03 +00:00 committed by GitHub
parent bc1b38c998
commit 9ebfe0d94a
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 71 additions and 10 deletions

View file

@ -38,7 +38,7 @@ class DocumentRagClient(BaseClient):
output_schema=DocumentRagResponse, output_schema=DocumentRagResponse,
) )
def request(self, query, timeout=500): def request(self, query, timeout=300):
return self.call( return self.call(
query=query, timeout=timeout query=query, timeout=timeout

View file

@ -52,7 +52,7 @@ class TriplesQueryClient(BaseClient):
self, self,
s, p, o, s, p, o,
user="trustgraph", collection="default", user="trustgraph", collection="default",
limit=10, timeout=60, limit=10, timeout=120,
): ):
return self.call( return self.call(
s=self.create_value(s), s=self.create_value(s),

View file

@ -6,7 +6,7 @@ Loads Graph embeddings into TrustGraph processing.
import pulsar import pulsar
from pulsar.schema import JsonSchema from pulsar.schema import JsonSchema
from trustgraph.schema import GraphEmbeddings, Value from trustgraph.schema import GraphEmbeddings, Value, Metadata
from trustgraph.schema import graph_embeddings_store_queue from trustgraph.schema import graph_embeddings_store_queue
import argparse import argparse
import os import os
@ -24,6 +24,8 @@ class Loader:
output_queue, output_queue,
log_level, log_level,
file, file,
user,
collection,
): ):
self.client = pulsar.Client( self.client = pulsar.Client(
@ -38,6 +40,8 @@ class Loader:
) )
self.file = file self.file = file
self.user = user
self.collection = collection
def run(self): def run(self):
@ -66,11 +70,16 @@ class Loader:
n = ent.as_py() n = ent.as_py()
r = GraphEmbeddings( r = GraphEmbeddings(
metadata=Metadata(
metadata=[],
user=self.user,
collection=self.collection,
),
vectors=b, vectors=b,
entity=Value( entity=Value(
value=n, value=n,
is_uri=n.startswith("https:") is_uri=n.startswith("https:")
) ),
) )
self.producer.send(r) self.producer.send(r)
@ -90,6 +99,8 @@ def main():
default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://localhost:6650') default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://localhost:6650')
default_output_queue = graph_embeddings_store_queue default_output_queue = graph_embeddings_store_queue
default_user = 'trustgraph'
default_collection = 'default'
parser.add_argument( parser.add_argument(
'-p', '--pulsar-host', '-p', '--pulsar-host',
@ -103,6 +114,18 @@ def main():
help=f'Output queue (default: {default_output_queue})' help=f'Output queue (default: {default_output_queue})'
) )
parser.add_argument(
'-u', '--user',
default=default_user,
help=f'User ID (default: {default_user})'
)
parser.add_argument(
'-c', '--collection',
default=default_collection,
help=f'Collection ID (default: {default_collection})'
)
parser.add_argument( parser.add_argument(
'-l', '--log-level', '-l', '--log-level',
type=LogLevel, type=LogLevel,
@ -127,6 +150,8 @@ def main():
output_queue=args.output_queue, output_queue=args.output_queue,
log_level=args.log_level, log_level=args.log_level,
file=args.file, file=args.file,
user=args.user,
collection=args.collection,
) )
p.run() p.run()

View file

@ -6,7 +6,7 @@ Loads Graph embeddings into TrustGraph processing.
import pulsar import pulsar
from pulsar.schema import JsonSchema from pulsar.schema import JsonSchema
from trustgraph.schema import Triple, Value from trustgraph.schema import Triples, Triple, Value, Metadata
from trustgraph.schema import triples_store_queue from trustgraph.schema import triples_store_queue
import argparse import argparse
import os import os
@ -24,6 +24,8 @@ class Loader:
output_queue, output_queue,
log_level, log_level,
file, file,
user,
collection,
): ):
self.client = pulsar.Client( self.client = pulsar.Client(
@ -33,11 +35,13 @@ class Loader:
self.producer = self.client.create_producer( self.producer = self.client.create_producer(
topic=output_queue, topic=output_queue,
schema=JsonSchema(Triple), schema=JsonSchema(Triples),
chunking_enabled=True, chunking_enabled=True,
) )
self.file = file self.file = file
self.user = user
self.collection = collection
def run(self): def run(self):
@ -66,10 +70,26 @@ class Loader:
for s, p, o in zip(sc, pc, oc): for s, p, o in zip(sc, pc, oc):
r = Triple( r = Triples(
s=Value(value=s.as_py(), is_uri=True), metadata=Metadata(
p=Value(value=p.as_py(), is_uri=True), metadata=[],
o=Value(value=o.as_py(), is_uri=o.as_py().startswith("https:")) user=self.user,
collection=self.collection,
),
triples=[
Triple(
s=Value(
value=s.as_py(), is_uri=True
),
p=Value(
value=p.as_py(), is_uri=True
),
o=Value(
value=o.as_py(),
is_uri=o.as_py().startswith("https:")
)
)
]
) )
self.producer.send(r) self.producer.send(r)
@ -89,6 +109,8 @@ def main():
default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://localhost:6650') default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://localhost:6650')
default_output_queue = triples_store_queue default_output_queue = triples_store_queue
default_user = 'trustgraph'
default_collection = 'default'
parser.add_argument( parser.add_argument(
'-p', '--pulsar-host', '-p', '--pulsar-host',
@ -102,6 +124,18 @@ def main():
help=f'Output queue (default: {default_output_queue})' help=f'Output queue (default: {default_output_queue})'
) )
parser.add_argument(
'-u', '--user',
default=default_user,
help=f'User ID (default: {default_user})'
)
parser.add_argument(
'-c', '--collection',
default=default_collection,
help=f'Collection ID (default: {default_collection})'
)
parser.add_argument( parser.add_argument(
'-l', '--log-level', '-l', '--log-level',
type=LogLevel, type=LogLevel,
@ -126,6 +160,8 @@ def main():
output_queue=args.output_queue, output_queue=args.output_queue,
log_level=args.log_level, log_level=args.log_level,
file=args.file, file=args.file,
user=args.user,
collection=args.collection,
) )
p.run() p.run()