diff --git a/trustgraph-parquet/trustgraph/dump/triples/parquet/processor.py b/trustgraph-parquet/trustgraph/dump/triples/parquet/processor.py index 553a62b8..dc15d8a9 100755 --- a/trustgraph-parquet/trustgraph/dump/triples/parquet/processor.py +++ b/trustgraph-parquet/trustgraph/dump/triples/parquet/processor.py @@ -9,7 +9,7 @@ import os import argparse import time -from .... schema import Triple +from .... schema import Triples from .... schema import triples_store_queue from .... base import Consumer @@ -38,7 +38,7 @@ class Processor(Consumer): **params | { "input_queue": input_queue, "subscriber": subscriber, - "input_schema": Triple, + "input_schema": Triples, } ) @@ -51,7 +51,9 @@ class Processor(Consumer): def handle(self, msg): v = msg.value() - self.writer.write(v.s.value, v.p.value, v.o.value) + + for t in v.triples: + self.writer.write(t.s.value, t.p.value, t.o.value) @staticmethod def add_args(parser):