mirror of
https://github.com/trustgraph-ai/trustgraph.git
synced 2026-04-30 10:56:23 +02:00
Fixing tests
This commit is contained in:
parent
d26f846ad0
commit
d1ada72ab4
1 changed files with 18 additions and 6 deletions
|
|
@ -147,7 +147,7 @@ async def test_import_graceful_shutdown_integration():
|
||||||
for i, (message, properties) in enumerate(sent_messages):
|
for i, (message, properties) in enumerate(sent_messages):
|
||||||
assert message.metadata.id == f"msg-{i}"
|
assert message.metadata.id == f"msg-{i}"
|
||||||
assert len(message.triples) == 1
|
assert len(message.triples) == 1
|
||||||
assert message.triples[0][0] == f"subject-{i}"
|
assert message.triples[0].s.value == f"subject-{i}"
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
|
|
@ -169,17 +169,29 @@ async def test_export_no_message_loss_integration():
|
||||||
},
|
},
|
||||||
"triples": [{"s": {"v": f"export-subject-{i}", "e": False}, "p": {"v": "predicate", "e": False}, "o": {"v": f"export-object-{i}", "e": False}}]
|
"triples": [{"s": {"v": f"export-subject-{i}", "e": False}, "p": {"v": "predicate", "e": False}, "o": {"v": f"export-object-{i}", "e": False}}]
|
||||||
}
|
}
|
||||||
test_messages.append(MockPulsarMessage(msg_data, f"export-msg-{i}"))
|
# Create Triples object instead of raw dict
|
||||||
|
from trustgraph.schema import Triples, Metadata
|
||||||
|
from trustgraph.gateway.dispatch.serialize import to_subgraph
|
||||||
|
triples_obj = Triples(
|
||||||
|
metadata=Metadata(
|
||||||
|
id=f"export-msg-{i}",
|
||||||
|
metadata=to_subgraph(msg_data["metadata"]["metadata"]),
|
||||||
|
user=msg_data["metadata"]["user"],
|
||||||
|
collection=msg_data["metadata"]["collection"],
|
||||||
|
),
|
||||||
|
triples=to_subgraph(msg_data["triples"]),
|
||||||
|
)
|
||||||
|
test_messages.append(MockPulsarMessage(triples_obj, f"export-msg-{i}"))
|
||||||
|
|
||||||
# Mock consumer to provide messages
|
# Mock consumer to provide messages
|
||||||
message_iter = iter(test_messages)
|
message_iter = iter(test_messages)
|
||||||
async def mock_receive():
|
def mock_receive(timeout_millis=None):
|
||||||
try:
|
try:
|
||||||
return next(message_iter)
|
return next(message_iter)
|
||||||
except StopIteration:
|
except StopIteration:
|
||||||
# Simulate no more messages
|
# Simulate timeout when no more messages
|
||||||
await asyncio.sleep(1)
|
from pulsar import TimeoutException
|
||||||
raise StopIteration
|
raise TimeoutException("No more messages")
|
||||||
|
|
||||||
mock_consumer.receive = mock_receive
|
mock_consumer.receive = mock_receive
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue