#!/usr/bin/env python3
"""Dump triples received from a Pulsar topic into Turtle files.

The script subscribes to the ``graph-load`` topic (subscription name
``graph-dump``), decodes each message with the project's ``Triple`` JSON
schema, and accumulates the triples in an in-memory ``rdflib.Graph``.  Once
more than ``LIMIT`` messages have been accumulated, the graph is serialized
to ``graph/<uuid4>.ttl`` and a fresh graph is started.
"""

import os
import sys
import uuid

import pulsar
import rdflib
from langchain_community.llms import Ollama
from langchain_huggingface import HuggingFaceEmbeddings
from pulsar.schema import JsonSchema, Bytes

from schema import Chunk, Triple
from trustgraphETL import scholar, callmixtral, build_graph_robust

# NOTE(review): several of the imports above (sys, Bytes, Chunk, Ollama,
# HuggingFaceEmbeddings, trustgraphETL.*) are not referenced in this script.
# They are retained in case they are needed for import-time side effects --
# TODO confirm and prune.

PULSAR_URL = "pulsar://localhost:6650"
TOPIC = "graph-load"
SUBSCRIPTION = "graph-dump"
OUTPUT_DIR = "graph"

# A batch is flushed when the message count *exceeds* this value, i.e. on
# the (LIMIT + 1)-th message since the previous flush.
LIMIT = 100


def to_rdf_triple(v):
    """Convert a decoded ``Triple`` message into an rdflib ``(s, p, o)`` tuple.

    The subject and predicate are always emitted as URI references.  The
    object is a URI reference when ``v.o.is_uri`` is truthy and a literal
    otherwise.
    """
    object_type = rdflib.term.URIRef if v.o.is_uri else rdflib.term.Literal
    return (
        rdflib.term.URIRef(v.s.value),
        rdflib.term.URIRef(v.p.value),
        object_type(v.o.value),
    )


def write_graph(graph):
    """Serialize *graph* as Turtle to a uniquely named file; return its path."""
    # The output directory is not guaranteed to exist; without this every
    # flush fails and every message ends up negatively acknowledged.
    os.makedirs(OUTPUT_DIR, exist_ok=True)
    file_id = str(uuid.uuid4())
    path = f"{OUTPUT_DIR}/{file_id}.ttl"
    # Name the format explicitly so the content matches the .ttl extension
    # regardless of the library's default serializer.
    graph.serialize(destination=path, format="turtle")
    print(f"Written {path}")
    return path


def main():
    """Consume triples forever, writing them out in batches."""
    client = pulsar.Client(PULSAR_URL)
    graph = rdflib.Graph()
    count = 0

    try:
        consumer = client.subscribe(
            TOPIC,
            SUBSCRIPTION,
            schema=JsonSchema(Triple),
        )

        while True:
            msg = consumer.receive()

            try:
                graph.add(to_rdf_triple(msg.value()))
                count += 1

                if count > LIMIT:
                    write_graph(graph)
                    graph = rdflib.Graph()
                    count = 0

                # Acknowledge successful processing of the message
                consumer.acknowledge(msg)

            except Exception as e:
                print(e)
                # Message failed to be processed
                consumer.negative_acknowledge(msg)

    finally:
        # The loop only ends via an exception (e.g. KeyboardInterrupt).
        # Flush triples from already-acknowledged messages so they are not
        # lost, and always release the client connection.
        try:
            if count > 0:
                write_graph(graph)
        finally:
            client.close()


if __name__ == "__main__":
    main()