trustgraph/graph-dump

71 lines
1.5 KiB
Text
Raw Normal View History

2024-07-10 23:20:06 +01:00
#!/usr/bin/env python3
import pulsar
from pulsar.schema import JsonSchema, Bytes
from schema import Chunk, Triple
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_community.llms import Ollama
from trustgraphETL import scholar, callmixtral, build_graph_robust
import sys
import rdflib
import uuid
# Directory that receives the serialized batches, and the number of consumed
# messages that make up one batch file.
OUTPUT_DIR = "graph"
BATCH_LIMIT = 100


def _to_rdf_triple(v):
    """Convert a received Triple value into an rdflib (s, p, o) tuple.

    Subject and predicate are always emitted as URI references. The object
    is a URI reference when ``v.o.is_uri`` is truthy and a literal otherwise.
    """
    if v.o.is_uri:
        obj = rdflib.term.URIRef(v.o.value)
    else:
        obj = rdflib.term.Literal(v.o.value)
    return (
        rdflib.term.URIRef(v.s.value),
        rdflib.term.URIRef(v.p.value),
        obj,
    )


def _write_batch(graph):
    """Serialize *graph* to a uniquely named Turtle file under OUTPUT_DIR."""
    # Local import: the file's import header does not bring in `os`.
    import os

    # The output directory is not guaranteed to exist on a fresh checkout;
    # without this, every serialize attempt would fail.
    os.makedirs(OUTPUT_DIR, exist_ok=True)

    batch_id = str(uuid.uuid4())
    path = f"{OUTPUT_DIR}/{batch_id}.ttl"
    # State the format explicitly so the content always matches the .ttl
    # extension, whatever the installed rdflib's default format is.
    graph.serialize(destination=path, format="turtle")
    print(f"Written {path}")


def main():
    """Consume triples from the 'graph-load' topic and dump them in batches.

    Each message is added to an in-memory graph. Once BATCH_LIMIT messages
    have been buffered, the graph is written to a new file and reset.
    Messages are acknowledged after successful handling and negatively
    acknowledged when handling raises.
    """
    client = pulsar.Client("pulsar://localhost:6650")

    try:
        consumer = client.subscribe(
            'graph-load', 'graph-dump',
            schema=JsonSchema(Triple),
        )

        graph = rdflib.Graph()
        count = 0

        try:
            while True:
                msg = consumer.receive()

                try:
                    graph.add(_to_rdf_triple(msg.value()))
                    count += 1

                    # '>=' so that a batch holds exactly BATCH_LIMIT
                    # messages (the previous '>' test wrote one extra).
                    if count >= BATCH_LIMIT:
                        _write_batch(graph)
                        graph = rdflib.Graph()
                        count = 0

                    # Acknowledge successful processing of the message
                    consumer.acknowledge(msg)

                except Exception as e:
                    print(e)
                    # Message failed to be processed
                    consumer.negative_acknowledge(msg)

        except KeyboardInterrupt:
            # Treat Ctrl-C as a normal shutdown request.
            pass

        finally:
            # Buffered triples have already been acknowledged, so they must
            # be written out before exiting or they are lost for good.
            if len(graph) > 0:
                _write_batch(graph)

    finally:
        # Always release the client, including on unexpected errors from
        # subscribe()/receive() that escape the loop.
        client.close()


if __name__ == "__main__":
    main()