From e214eb4e02ea2495a28d22b02ef67a1030ab8007 Mon Sep 17 00:00:00 2001 From: cybermaggedon Date: Mon, 26 Jan 2026 17:38:00 +0000 Subject: [PATCH] Feature/prompts jsonl (#619) * Tech spec * JSONL implementation complete * Updated prompt client users * Fix tests --- docs/tech-specs/jsonl-prompt-output.md | 455 ++++++++++++++++++ .../test_agent_kg_extraction_integration.py | 72 ++- .../test_agent_extraction.py | 297 ++++++------ .../test_agent_extraction_edge_cases.py | 394 +++++++-------- tests/unit/test_prompt_manager.py | 247 +++++++++- .../trustgraph/extract/kg/agent/extract.py | 77 ++- .../extract/kg/ontology/simplified_parser.py | 124 ++++- .../trustgraph/template/prompt_manager.py | 89 +++- 8 files changed, 1292 insertions(+), 463 deletions(-) create mode 100644 docs/tech-specs/jsonl-prompt-output.md diff --git a/docs/tech-specs/jsonl-prompt-output.md b/docs/tech-specs/jsonl-prompt-output.md new file mode 100644 index 00000000..d8872fd4 --- /dev/null +++ b/docs/tech-specs/jsonl-prompt-output.md @@ -0,0 +1,455 @@ +# JSONL Prompt Output Technical Specification + +## Overview + +This specification describes the implementation of JSONL (JSON Lines) output +format for prompt responses in TrustGraph. JSONL enables truncation-resilient +extraction of structured data from LLM responses, addressing critical issues +with JSON array outputs being corrupted when LLM responses hit output token +limits. + +This implementation supports the following use cases: + +1. **Truncation-Resilient Extraction**: Extract valid partial results even when + LLM output is truncated mid-response +2. **Large-Scale Extraction**: Handle extraction of many items without risk of + complete failure due to token limits +3. **Mixed-Type Extraction**: Support extraction of multiple entity types + (definitions, relationships, entities, attributes) in a single prompt +4. 
**Streaming-Compatible Output**: Enable future streaming/incremental + processing of extraction results + +## Goals + +- **Backward Compatibility**: Existing prompts using `response-type: "text"` and + `response-type: "json"` continue to work without modification +- **Truncation Resilience**: Partial LLM outputs yield partial valid results + rather than complete failure +- **Schema Validation**: Support JSON Schema validation for individual objects +- **Discriminated Unions**: Support mixed-type outputs using a `type` field + discriminator +- **Minimal API Changes**: Extend existing prompt configuration with new + response type and schema key + +## Background + +### Current Architecture + +The prompt service supports two response types: + +1. `response-type: "text"` - Raw text response returned as-is +2. `response-type: "json"` - JSON parsed from response, validated against + optional `schema` + +Current implementation in `trustgraph-flow/trustgraph/template/prompt_manager.py`: + +```python +class Prompt: + def __init__(self, template, response_type = "text", terms=None, schema=None): + self.template = template + self.response_type = response_type + self.terms = terms + self.schema = schema +``` + +### Current Limitations + +When extraction prompts request output as JSON arrays (`[{...}, {...}, ...]`): + +- **Truncation corruption**: If the LLM hits output token limits mid-array, the + entire response becomes invalid JSON and cannot be parsed +- **All-or-nothing parsing**: Must receive complete output before parsing +- **No partial results**: A truncated response yields zero usable data +- **Unreliable for large extractions**: More extracted items = higher failure risk + +This specification addresses these limitations by introducing JSONL format for +extraction prompts, where each extracted item is a complete JSON object on its +own line. 
+ +## Technical Design + +### Response Type Extension + +Add a new response type `"jsonl"` alongside existing `"text"` and `"json"` types. + +#### Configuration Changes + +**New response type value:** + +``` +"response-type": "jsonl" +``` + +**Schema interpretation:** + +The existing `"schema"` key is used for both `"json"` and `"jsonl"` response +types. The interpretation depends on the response type: + +- `"json"`: Schema describes the entire response (typically an array or object) +- `"jsonl"`: Schema describes each individual line/object + +```json +{ + "response-type": "jsonl", + "schema": { + "type": "object", + "properties": { + "entity": { "type": "string" }, + "definition": { "type": "string" } + }, + "required": ["entity", "definition"] + } +} +``` + +This avoids changes to prompt configuration tooling and editors. + +### JSONL Format Specification + +#### Simple Extraction + +For prompts extracting a single type of object (definitions, relationships, +topics, rows), the output is one JSON object per line with no wrapper: + +**Prompt output format:** +``` +{"entity": "photosynthesis", "definition": "Process by which plants convert sunlight"} +{"entity": "chlorophyll", "definition": "Green pigment in plants"} +{"entity": "mitochondria", "definition": "Powerhouse of the cell"} +``` + +**Contrast with previous JSON array format:** +```json +[ + {"entity": "photosynthesis", "definition": "Process by which plants convert sunlight"}, + {"entity": "chlorophyll", "definition": "Green pigment in plants"}, + {"entity": "mitochondria", "definition": "Powerhouse of the cell"} +] +``` + +If the LLM truncates after line 2, the JSON array format yields invalid JSON, +while JSONL yields two valid objects. 
+ +#### Mixed-Type Extraction (Discriminated Unions) + +For prompts extracting multiple types of objects (e.g., both definitions and +relationships, or entities, relationships, and attributes), use a `"type"` +field as discriminator: + +**Prompt output format:** +``` +{"type": "definition", "entity": "DNA", "definition": "Molecule carrying genetic instructions"} +{"type": "relationship", "subject": "DNA", "predicate": "located_in", "object": "cell nucleus", "object-entity": true} +{"type": "definition", "entity": "RNA", "definition": "Molecule that carries genetic information"} +{"type": "relationship", "subject": "RNA", "predicate": "transcribed_from", "object": "DNA", "object-entity": true} +``` + +**Schema for discriminated unions uses `oneOf`:** +```json +{ + "response-type": "jsonl", + "schema": { + "oneOf": [ + { + "type": "object", + "properties": { + "type": { "const": "definition" }, + "entity": { "type": "string" }, + "definition": { "type": "string" } + }, + "required": ["type", "entity", "definition"] + }, + { + "type": "object", + "properties": { + "type": { "const": "relationship" }, + "subject": { "type": "string" }, + "predicate": { "type": "string" }, + "object": { "type": "string" }, + "object-entity": { "type": "boolean" } + }, + "required": ["type", "subject", "predicate", "object", "object-entity"] + } + ] + } +} +``` + +#### Ontology Extraction + +For ontology-based extraction with entities, relationships, and attributes: + +**Prompt output format:** +``` +{"type": "entity", "entity": "Cornish pasty", "entity_type": "fo/Recipe"} +{"type": "entity", "entity": "beef", "entity_type": "fo/Food"} +{"type": "relationship", "subject": "Cornish pasty", "subject_type": "fo/Recipe", "relation": "fo/has_ingredient", "object": "beef", "object_type": "fo/Food"} +{"type": "attribute", "entity": "Cornish pasty", "entity_type": "fo/Recipe", "attribute": "fo/serves", "value": "4 people"} +``` + +### Implementation Details + +#### Prompt Class + +The existing 
`Prompt` class requires no changes. The `schema` field is reused +for JSONL, with its interpretation determined by `response_type`: + +```python +class Prompt: + def __init__(self, template, response_type="text", terms=None, schema=None): + self.template = template + self.response_type = response_type + self.terms = terms + self.schema = schema # Interpretation depends on response_type +``` + +#### PromptManager.load_config + +No changes required - existing configuration loading already handles the +`schema` key. + +#### JSONL Parsing + +Add a new parsing method for JSONL responses: + +```python +def parse_jsonl(self, text): + """ + Parse JSONL response, returning list of valid objects. + + Malformed JSON lines are skipped with a warning; blank and fence lines silently. + This provides truncation resilience - partial output yields partial results. + """ + results = [] + + for line_num, line in enumerate(text.strip().split('\n'), 1): + line = line.strip() + + # Skip empty lines + if not line: + continue + + # Skip markdown code fence markers if present + if line.startswith('```'): + continue + + try: + obj = json.loads(line) + results.append(obj) + except json.JSONDecodeError as e: + # Log warning but continue - this provides truncation resilience + logger.warning(f"JSONL parse error on line {line_num}: {e}") + + return results +``` + +#### PromptManager.invoke Changes + +Extend the invoke method to handle the new response type: + +```python +async def invoke(self, id, input, llm): + logger.debug("Invoking prompt template...") + + terms = self.terms | self.prompts[id].terms | input + resp_type = self.prompts[id].response_type + + prompt = { + "system": self.system_template.render(terms), + "prompt": self.render(id, input) + } + + resp = await llm(**prompt) + + if resp_type == "text": + return resp + + if resp_type == "json": + try: + obj = self.parse_json(resp) + except: + logger.error(f"JSON parse failed: {resp}") + raise RuntimeError("JSON parse fail") + + if 
self.prompts[id].schema: + try: + validate(instance=obj, schema=self.prompts[id].schema) + logger.debug("Schema validation successful") + except Exception as e: + raise RuntimeError(f"Schema validation fail: {e}") + + return obj + + if resp_type == "jsonl": + objects = self.parse_jsonl(resp) + + if not objects: + logger.warning("JSONL parse returned no valid objects") + return [] + + # Validate each object against schema if provided + if self.prompts[id].schema: + validated = [] + for i, obj in enumerate(objects): + try: + validate(instance=obj, schema=self.prompts[id].schema) + validated.append(obj) + except Exception as e: + logger.warning(f"Object {i} failed schema validation: {e}") + return validated + + return objects + + raise RuntimeError(f"Response type {resp_type} not known") +``` + +### Affected Prompts + +The following prompts should be migrated to JSONL format: + +| Prompt ID | Description | Type Field | +|-----------|-------------|------------| +| `extract-definitions` | Entity/definition extraction | No (single type) | +| `extract-relationships` | Relationship extraction | No (single type) | +| `extract-topics` | Topic/definition extraction | No (single type) | +| `extract-rows` | Structured row extraction | No (single type) | +| `agent-kg-extract` | Combined definition + relationship extraction | Yes: `"definition"`, `"relationship"` | +| `extract-with-ontologies` / `ontology-extract` | Ontology-based extraction | Yes: `"entity"`, `"relationship"`, `"attribute"` | + +### API Changes + +#### Client Perspective + +JSONL parsing is transparent to prompt service API callers. The parsing occurs +server-side in the prompt service, and the response is returned via the standard +`PromptResponse.object` field as a serialized JSON array. 
+ +When clients call the prompt service (via `PromptClient.prompt()` or similar): + +- **`response-type: "json"`** with array schema → client receives Python `list` +- **`response-type: "jsonl"`** → client receives Python `list` + +From the client's perspective, both return identical data structures. The +difference is entirely in how the LLM output is parsed server-side: + +- JSON array format: Single `json.loads()` call; fails completely if truncated +- JSONL format: Line-by-line parsing; yields partial results if truncated + +This means existing client code expecting a list from extraction prompts +requires no changes when migrating prompts from JSON to JSONL format. + +#### Server Return Value + +For `response-type: "jsonl"`, the `PromptManager.invoke()` method returns a +`list[dict]` containing all successfully parsed and validated objects. This +list is then serialized to JSON for the `PromptResponse.object` field. + +#### Error Handling + +- Empty results: Returns empty list `[]` with warning log +- Partial parse failure: Returns list of successfully parsed objects with + warning logs for failures +- Complete parse failure: Returns empty list `[]` with warning logs + +This differs from `response-type: "json"` which raises `RuntimeError` on +parse failure. The lenient behavior for JSONL is intentional to provide +truncation resilience. + +### Configuration Example + +Complete prompt configuration example: + +```json +{ + "prompt": "Extract all entities and their definitions from the following text. 
Output one JSON object per line.\n\nText:\n{{text}}\n\nOutput format per line:\n{\"entity\": \"\", \"definition\": \"\"}", + "response-type": "jsonl", + "schema": { + "type": "object", + "properties": { + "entity": { + "type": "string", + "description": "The entity name" + }, + "definition": { + "type": "string", + "description": "A clear definition of the entity" + } + }, + "required": ["entity", "definition"] + } +} +``` + +## Security Considerations + +- **Input Validation**: JSON parsing uses standard `json.loads()` which is safe + against injection attacks +- **Schema Validation**: Uses `jsonschema.validate()` for schema enforcement +- **No New Attack Surface**: JSONL parsing applies the same `json.loads()` call + to each line, so it adds no parser beyond the one already in use + +## Performance Considerations + +- **Memory**: The full response text is still held in memory; only the parsing + is done line by line +- **Latency**: Parsing performance is comparable to JSON array parsing +- **Validation**: Schema validation runs per-object, which adds overhead but + enables partial results on validation failure + +## Testing Strategy + +### Unit Tests + +- JSONL parsing with valid input +- JSONL parsing with empty lines +- JSONL parsing with markdown code fences +- JSONL parsing with truncated final line +- JSONL parsing with invalid JSON lines interspersed +- Schema validation with `oneOf` discriminated unions +- Backward compatibility: existing `"text"` and `"json"` prompts unchanged + +### Integration Tests + +- End-to-end extraction with JSONL prompts +- Extraction with simulated truncation (artificially limited response) +- Mixed-type extraction with type discriminator +- Ontology extraction with all three types + +### Extraction Quality Tests + +- Compare extraction results: JSONL vs JSON array format +- Verify truncation resilience: JSONL yields partial results where JSON fails + +## Migration Plan + +### Phase 1: Implementation + +1. Implement `parse_jsonl()` method in `PromptManager` +2. 
Extend `invoke()` to handle `response-type: "jsonl"` +3. Add unit tests + +### Phase 2: Prompt Migration + +1. Update `extract-definitions` prompt and configuration +2. Update `extract-relationships` prompt and configuration +3. Update `extract-topics` prompt and configuration +4. Update `extract-rows` prompt and configuration +5. Update `agent-kg-extract` prompt and configuration +6. Update `extract-with-ontologies` prompt and configuration + +### Phase 3: Downstream Updates + +1. Update any code consuming extraction results to handle list return type +2. Update code that categorizes mixed-type extractions by `type` field +3. Update tests that assert on extraction output format + +## Open Questions + +None at this time. + +## References + +- Current implementation: `trustgraph-flow/trustgraph/template/prompt_manager.py` +- JSON Lines specification: https://jsonlines.org/ +- JSON Schema `oneOf`: https://json-schema.org/understanding-json-schema/reference/combining.html#oneof +- Related specification: Streaming LLM Responses (`docs/tech-specs/streaming-llm-responses.md`) diff --git a/tests/integration/test_agent_kg_extraction_integration.py b/tests/integration/test_agent_kg_extraction_integration.py index 50aadf3b..01516d8b 100644 --- a/tests/integration/test_agent_kg_extraction_integration.py +++ b/tests/integration/test_agent_kg_extraction_integration.py @@ -30,38 +30,16 @@ class TestAgentKgExtractionIntegration: # Mock agent client agent_client = AsyncMock() - # Mock successful agent response + # Mock successful agent response in JSONL format def mock_agent_response(recipient, question): - # Simulate agent processing and return structured response + # Simulate agent processing and return structured JSONL response mock_response = MagicMock() mock_response.error = None mock_response.answer = '''```json -{ - "definitions": [ - { - "entity": "Machine Learning", - "definition": "A subset of artificial intelligence that enables computers to learn from data without 
explicit programming." - }, - { - "entity": "Neural Networks", - "definition": "Computing systems inspired by biological neural networks that process information." - } - ], - "relationships": [ - { - "subject": "Machine Learning", - "predicate": "is_subset_of", - "object": "Artificial Intelligence", - "object-entity": true - }, - { - "subject": "Neural Networks", - "predicate": "used_in", - "object": "Machine Learning", - "object-entity": true - } - ] -} +{"type": "definition", "entity": "Machine Learning", "definition": "A subset of artificial intelligence that enables computers to learn from data without explicit programming."} +{"type": "definition", "entity": "Neural Networks", "definition": "Computing systems inspired by biological neural networks that process information."} +{"type": "relationship", "subject": "Machine Learning", "predicate": "is_subset_of", "object": "Artificial Intelligence", "object-entity": true} +{"type": "relationship", "subject": "Neural Networks", "predicate": "used_in", "object": "Machine Learning", "object-entity": true} ```''' return mock_response.answer @@ -120,7 +98,7 @@ class TestAgentKgExtractionIntegration: # Copy the methods we want to test extractor.to_uri = real_extractor.to_uri - extractor.parse_json = real_extractor.parse_json + extractor.parse_jsonl = real_extractor.parse_jsonl extractor.process_extraction_data = real_extractor.process_extraction_data extractor.emit_triples = real_extractor.emit_triples extractor.emit_entity_contexts = real_extractor.emit_entity_contexts @@ -156,7 +134,7 @@ class TestAgentKgExtractionIntegration: agent_response = agent_client.invoke(recipient=lambda x: True, question=prompt) # Parse and process - extraction_data = extractor.parse_json(agent_response) + extraction_data = extractor.parse_jsonl(agent_response) triples, entity_contexts = extractor.process_extraction_data(extraction_data, v.metadata) # Add metadata triples @@ -248,22 +226,28 @@ class TestAgentKgExtractionIntegration: 
@pytest.mark.asyncio async def test_invalid_json_response_handling(self, configured_agent_extractor, sample_chunk, mock_flow_context): - """Test handling of invalid JSON responses from agent""" + """Test handling of invalid JSON responses from agent - JSONL is lenient and skips invalid lines""" # Arrange - mock invalid JSON response agent_client = mock_flow_context("agent-request") - + def mock_invalid_json_response(recipient, question): return "This is not valid JSON at all" - + agent_client.invoke = mock_invalid_json_response - + mock_message = MagicMock() mock_message.value.return_value = sample_chunk mock_consumer = MagicMock() - # Act & Assert - with pytest.raises((ValueError, json.JSONDecodeError)): - await configured_agent_extractor.on_message(mock_message, mock_consumer, mock_flow_context) + # Act - JSONL parsing is lenient, invalid lines are skipped + await configured_agent_extractor.on_message(mock_message, mock_consumer, mock_flow_context) + + # Assert - should emit triples (with just metadata) but no entity contexts + triples_publisher = mock_flow_context("triples") + triples_publisher.send.assert_called_once() + + entity_contexts_publisher = mock_flow_context("entity-contexts") + entity_contexts_publisher.send.assert_not_called() @pytest.mark.asyncio async def test_empty_extraction_results(self, configured_agent_extractor, sample_chunk, mock_flow_context): @@ -272,7 +256,8 @@ class TestAgentKgExtractionIntegration: agent_client = mock_flow_context("agent-request") def mock_empty_response(recipient, question): - return '{"definitions": [], "relationships": []}' + # Return empty JSONL (just empty/whitespace) + return '' agent_client.invoke = mock_empty_response @@ -303,7 +288,8 @@ class TestAgentKgExtractionIntegration: agent_client = mock_flow_context("agent-request") def mock_malformed_response(recipient, question): - return '''{"definitions": [{"entity": "Missing Definition"}], "relationships": [{"subject": "Missing Object"}]}''' + # JSONL with 
definition missing required field + return '{"type": "definition", "entity": "Missing Definition"}' agent_client.invoke = mock_malformed_response @@ -330,7 +316,7 @@ class TestAgentKgExtractionIntegration: def capture_prompt(recipient, question): # Verify the prompt contains the test text assert test_text in question - return '{"definitions": [], "relationships": []}' + return '' # Empty JSONL response agent_client.invoke = capture_prompt @@ -361,7 +347,7 @@ class TestAgentKgExtractionIntegration: responses = [] def mock_response(recipient, question): - response = f'{{"definitions": [{{"entity": "Entity {len(responses)}", "definition": "Definition {len(responses)}"}}], "relationships": []}}' + response = f'{{"type": "definition", "entity": "Entity {len(responses)}", "definition": "Definition {len(responses)}"}}' responses.append(response) return response @@ -398,7 +384,7 @@ class TestAgentKgExtractionIntegration: # Verify unicode text was properly decoded and included assert "学习机器" in question assert "人工知能" in question - return '''{"definitions": [{"entity": "機械学習", "definition": "人工知能の一分野"}], "relationships": []}''' + return '{"type": "definition", "entity": "機械学習", "definition": "人工知能の一分野"}' agent_client.invoke = mock_unicode_response @@ -433,7 +419,7 @@ class TestAgentKgExtractionIntegration: def mock_large_text_response(recipient, question): # Verify large text was included assert len(question) > 10000 - return '''{"definitions": [{"entity": "Machine Learning", "definition": "Important AI technique"}], "relationships": []}''' + return '{"type": "definition", "entity": "Machine Learning", "definition": "Important AI technique"}' agent_client.invoke = mock_large_text_response diff --git a/tests/unit/test_knowledge_graph/test_agent_extraction.py b/tests/unit/test_knowledge_graph/test_agent_extraction.py index be5553df..626eba42 100644 --- a/tests/unit/test_knowledge_graph/test_agent_extraction.py +++ b/tests/unit/test_knowledge_graph/test_agent_extraction.py @@ 
-33,7 +33,7 @@ class TestAgentKgExtractor: # Set up the methods we want to test extractor.to_uri = real_extractor.to_uri - extractor.parse_json = real_extractor.parse_json + extractor.parse_jsonl = real_extractor.parse_jsonl extractor.process_extraction_data = real_extractor.process_extraction_data extractor.emit_triples = real_extractor.emit_triples extractor.emit_entity_contexts = real_extractor.emit_entity_contexts @@ -62,39 +62,40 @@ class TestAgentKgExtractor: @pytest.fixture def sample_extraction_data(self): - """Sample extraction data in expected format""" - return { - "definitions": [ - { - "entity": "Machine Learning", - "definition": "A subset of artificial intelligence that enables computers to learn from data without explicit programming." - }, - { - "entity": "Neural Networks", - "definition": "Computing systems inspired by biological neural networks that process information." - } - ], - "relationships": [ - { - "subject": "Machine Learning", - "predicate": "is_subset_of", - "object": "Artificial Intelligence", - "object-entity": True - }, - { - "subject": "Neural Networks", - "predicate": "used_in", - "object": "Machine Learning", - "object-entity": True - }, - { - "subject": "Deep Learning", - "predicate": "accuracy", - "object": "95%", - "object-entity": False - } - ] - } + """Sample extraction data in JSONL format (list with type discriminators)""" + return [ + { + "type": "definition", + "entity": "Machine Learning", + "definition": "A subset of artificial intelligence that enables computers to learn from data without explicit programming." + }, + { + "type": "definition", + "entity": "Neural Networks", + "definition": "Computing systems inspired by biological neural networks that process information." 
+ }, + { + "type": "relationship", + "subject": "Machine Learning", + "predicate": "is_subset_of", + "object": "Artificial Intelligence", + "object-entity": True + }, + { + "type": "relationship", + "subject": "Neural Networks", + "predicate": "used_in", + "object": "Machine Learning", + "object-entity": True + }, + { + "type": "relationship", + "subject": "Deep Learning", + "predicate": "accuracy", + "object": "95%", + "object-entity": False + } + ] def test_to_uri_conversion(self, agent_extractor): """Test URI conversion for entities""" @@ -113,61 +114,67 @@ class TestAgentKgExtractor: expected = f"{TRUSTGRAPH_ENTITIES}" assert uri == expected - def test_parse_json_with_code_blocks(self, agent_extractor): - """Test JSON parsing from code blocks""" - # Test JSON in code blocks + def test_parse_jsonl_with_code_blocks(self, agent_extractor): + """Test JSONL parsing from code blocks""" + # Test JSONL in code blocks - note: JSON uses lowercase true/false response = '''```json - { - "definitions": [{"entity": "AI", "definition": "Artificial Intelligence"}], - "relationships": [] - } - ```''' - - result = agent_extractor.parse_json(response) - - assert result["definitions"][0]["entity"] == "AI" - assert result["definitions"][0]["definition"] == "Artificial Intelligence" - assert result["relationships"] == [] +{"type": "definition", "entity": "AI", "definition": "Artificial Intelligence"} +{"type": "relationship", "subject": "AI", "predicate": "is", "object": "technology", "object-entity": false} +```''' - def test_parse_json_without_code_blocks(self, agent_extractor): - """Test JSON parsing without code blocks""" - response = '''{"definitions": [{"entity": "ML", "definition": "Machine Learning"}], "relationships": []}''' - - result = agent_extractor.parse_json(response) - - assert result["definitions"][0]["entity"] == "ML" - assert result["definitions"][0]["definition"] == "Machine Learning" + result = agent_extractor.parse_jsonl(response) - def 
test_parse_json_invalid_format(self, agent_extractor): - """Test JSON parsing with invalid format""" - invalid_response = "This is not JSON at all" - - with pytest.raises(json.JSONDecodeError): - agent_extractor.parse_json(invalid_response) + assert len(result) == 2 + assert result[0]["entity"] == "AI" + assert result[0]["definition"] == "Artificial Intelligence" + assert result[1]["type"] == "relationship" - def test_parse_json_malformed_code_blocks(self, agent_extractor): - """Test JSON parsing with malformed code blocks""" - # Missing closing backticks - response = '''```json - {"definitions": [], "relationships": []} - ''' - - # Should still parse the JSON content - with pytest.raises(json.JSONDecodeError): - agent_extractor.parse_json(response) + def test_parse_jsonl_without_code_blocks(self, agent_extractor): + """Test JSONL parsing without code blocks""" + response = '''{"type": "definition", "entity": "ML", "definition": "Machine Learning"} +{"type": "definition", "entity": "AI", "definition": "Artificial Intelligence"}''' + + result = agent_extractor.parse_jsonl(response) + + assert len(result) == 2 + assert result[0]["entity"] == "ML" + assert result[1]["entity"] == "AI" + + def test_parse_jsonl_invalid_lines_skipped(self, agent_extractor): + """Test JSONL parsing skips invalid lines gracefully""" + response = '''{"type": "definition", "entity": "Valid", "definition": "Valid def"} +This is not JSON at all +{"type": "definition", "entity": "Also Valid", "definition": "Another def"}''' + + result = agent_extractor.parse_jsonl(response) + + # Should get 2 valid objects, skipping the invalid line + assert len(result) == 2 + assert result[0]["entity"] == "Valid" + assert result[1]["entity"] == "Also Valid" + + def test_parse_jsonl_truncation_resilience(self, agent_extractor): + """Test JSONL parsing handles truncated responses""" + # Simulates output cut off mid-line + response = '''{"type": "definition", "entity": "Complete", "definition": "Full def"} 
+{"type": "definition", "entity": "Trunca''' + + result = agent_extractor.parse_jsonl(response) + + # Should get 1 valid object, the truncated line is skipped + assert len(result) == 1 + assert result[0]["entity"] == "Complete" def test_process_extraction_data_definitions(self, agent_extractor, sample_metadata): """Test processing of definition data""" - data = { - "definitions": [ - { - "entity": "Machine Learning", - "definition": "A subset of AI that enables learning from data." - } - ], - "relationships": [] - } - + data = [ + { + "type": "definition", + "entity": "Machine Learning", + "definition": "A subset of AI that enables learning from data." + } + ] + triples, entity_contexts = agent_extractor.process_extraction_data(data, sample_metadata) # Check entity label triple @@ -196,18 +203,16 @@ class TestAgentKgExtractor: def test_process_extraction_data_relationships(self, agent_extractor, sample_metadata): """Test processing of relationship data""" - data = { - "definitions": [], - "relationships": [ - { - "subject": "Machine Learning", - "predicate": "is_subset_of", - "object": "Artificial Intelligence", - "object-entity": True - } - ] - } - + data = [ + { + "type": "relationship", + "subject": "Machine Learning", + "predicate": "is_subset_of", + "object": "Artificial Intelligence", + "object-entity": True + } + ] + triples, entity_contexts = agent_extractor.process_extraction_data(data, sample_metadata) # Check that subject, predicate, and object labels are created @@ -223,15 +228,12 @@ class TestAgentKgExtractor: assert predicate_label is not None assert predicate_label.o.value == "is_subset_of" - # Check main relationship triple - # NOTE: Current implementation has bugs: - # 1. Uses data.get("object-entity") instead of rel.get("object-entity") - # 2. 
Sets object_value to predicate_uri instead of actual object URI - # This test documents the current buggy behavior + # Check main relationship triple + object_uri = f"{TRUSTGRAPH_ENTITIES}Artificial%20Intelligence" rel_triple = next((t for t in triples if t.s.value == subject_uri and t.p.value == predicate_uri), None) assert rel_triple is not None - # Due to bug, object value is set to predicate_uri - assert rel_triple.o.value == predicate_uri + assert rel_triple.o.value == object_uri + assert rel_triple.o.is_uri == True # Check subject-of relationships subject_of_triples = [t for t in triples if t.p.value == SUBJECT_OF and t.o.value == "doc123"] @@ -239,20 +241,18 @@ class TestAgentKgExtractor: def test_process_extraction_data_literal_object(self, agent_extractor, sample_metadata): """Test processing of relationships with literal objects""" - data = { - "definitions": [], - "relationships": [ - { - "subject": "Deep Learning", - "predicate": "accuracy", - "object": "95%", - "object-entity": False - } - ] - } - + data = [ + { + "type": "relationship", + "subject": "Deep Learning", + "predicate": "accuracy", + "object": "95%", + "object-entity": False + } + ] + triples, entity_contexts = agent_extractor.process_extraction_data(data, sample_metadata) - + # Check that object labels are not created for literal objects object_labels = [t for t in triples if t.p.value == RDF_LABEL and t.o.value == "95%"] # Based on the code logic, it should not create object labels for non-entity objects @@ -275,63 +275,50 @@ class TestAgentKgExtractor: def test_process_extraction_data_no_metadata_id(self, agent_extractor): """Test processing when metadata has no ID""" metadata = Metadata(id=None, metadata=[]) - data = { - "definitions": [ - {"entity": "Test Entity", "definition": "Test definition"} - ], - "relationships": [] - } - + data = [ + {"type": "definition", "entity": "Test Entity", "definition": "Test definition"} + ] + triples, entity_contexts = 
agent_extractor.process_extraction_data(data, metadata) - + # Should not create subject-of relationships when no metadata ID subject_of_triples = [t for t in triples if t.p.value == SUBJECT_OF] assert len(subject_of_triples) == 0 - + # Should still create entity contexts assert len(entity_contexts) == 1 def test_process_extraction_data_empty_data(self, agent_extractor, sample_metadata): """Test processing of empty extraction data""" - data = {"definitions": [], "relationships": []} - - triples, entity_contexts = agent_extractor.process_extraction_data(data, sample_metadata) - - # Should only have metadata triples - assert len(entity_contexts) == 0 - # Triples should only contain metadata triples if any + data = [] - def test_process_extraction_data_missing_keys(self, agent_extractor, sample_metadata): - """Test processing data with missing keys""" - # Test missing definitions key - data = {"relationships": []} triples, entity_contexts = agent_extractor.process_extraction_data(data, sample_metadata) + + # Should have no entity contexts assert len(entity_contexts) == 0 - - # Test missing relationships key - data = {"definitions": []} + # Triples should be empty + assert len(triples) == 0 + + def test_process_extraction_data_unknown_types_ignored(self, agent_extractor, sample_metadata): + """Test processing data with unknown type values""" + data = [ + {"type": "definition", "entity": "Valid", "definition": "Valid def"}, + {"type": "unknown_type", "foo": "bar"}, # Unknown type - should be ignored + {"type": "relationship", "subject": "A", "predicate": "rel", "object": "B", "object-entity": True} + ] + triples, entity_contexts = agent_extractor.process_extraction_data(data, sample_metadata) - assert len(entity_contexts) == 0 - - # Test completely missing keys - data = {} - triples, entity_contexts = agent_extractor.process_extraction_data(data, sample_metadata) - assert len(entity_contexts) == 0 + + # Should process valid items and ignore unknown types + assert 
len(entity_contexts) == 1 # Only the definition creates entity context def test_process_extraction_data_malformed_entries(self, agent_extractor, sample_metadata): """Test processing data with malformed entries""" - # Test definition missing required fields - data = { - "definitions": [ - {"entity": "Test"}, # Missing definition - {"definition": "Test def"} # Missing entity - ], - "relationships": [ - {"subject": "A", "predicate": "rel"}, # Missing object - {"subject": "B", "object": "C"} # Missing predicate - ] - } - + # Test items missing required fields - should raise KeyError + data = [ + {"type": "definition", "entity": "Test"}, # Missing definition + ] + # Should handle gracefully or raise appropriate errors with pytest.raises(KeyError): agent_extractor.process_extraction_data(data, sample_metadata) diff --git a/tests/unit/test_knowledge_graph/test_agent_extraction_edge_cases.py b/tests/unit/test_knowledge_graph/test_agent_extraction_edge_cases.py index c69df8c4..a5190d4d 100644 --- a/tests/unit/test_knowledge_graph/test_agent_extraction_edge_cases.py +++ b/tests/unit/test_knowledge_graph/test_agent_extraction_edge_cases.py @@ -32,11 +32,11 @@ class TestAgentKgExtractionEdgeCases: # Set up the methods we want to test extractor.to_uri = real_extractor.to_uri - extractor.parse_json = real_extractor.parse_json + extractor.parse_jsonl = real_extractor.parse_jsonl extractor.process_extraction_data = real_extractor.process_extraction_data extractor.emit_triples = real_extractor.emit_triples extractor.emit_entity_contexts = real_extractor.emit_entity_contexts - + return extractor def test_to_uri_special_characters(self, agent_extractor): @@ -85,138 +85,108 @@ class TestAgentKgExtractionEdgeCases: # Verify the URI is properly encoded assert unicode_text not in uri # Original unicode should be encoded - def test_parse_json_whitespace_variations(self, agent_extractor): - """Test JSON parsing with various whitespace patterns""" - # Test JSON with different whitespace 
patterns + def test_parse_jsonl_whitespace_variations(self, agent_extractor): + """Test JSONL parsing with various whitespace patterns""" + # Test JSONL with different whitespace patterns test_cases = [ # Extra whitespace around code blocks - " ```json\n{\"test\": true}\n``` ", - # Tabs and mixed whitespace - "\t\t```json\n\t{\"test\": true}\n\t```\t", - # Multiple newlines - "\n\n\n```json\n\n{\"test\": true}\n\n```\n\n", - # JSON without code blocks but with whitespace - " {\"test\": true} ", - # Mixed line endings - "```json\r\n{\"test\": true}\r\n```", + ' ```json\n{"type": "definition", "entity": "test", "definition": "def"}\n``` ', + # Multiple newlines between lines + '{"type": "definition", "entity": "A", "definition": "def A"}\n\n\n{"type": "definition", "entity": "B", "definition": "def B"}', + # JSONL without code blocks but with whitespace + ' {"type": "definition", "entity": "test", "definition": "def"} ', ] - - for response in test_cases: - result = agent_extractor.parse_json(response) - assert result == {"test": True} - def test_parse_json_code_block_variations(self, agent_extractor): - """Test JSON parsing with different code block formats""" + for response in test_cases: + result = agent_extractor.parse_jsonl(response) + assert len(result) >= 1 + assert result[0].get("type") == "definition" + + def test_parse_jsonl_code_block_variations(self, agent_extractor): + """Test JSONL parsing with different code block formats""" test_cases = [ # Standard json code block - "```json\n{\"valid\": true}\n```", + '```json\n{"type": "definition", "entity": "A", "definition": "def"}\n```', + # jsonl code block + '```jsonl\n{"type": "definition", "entity": "A", "definition": "def"}\n```', # Code block without language - "```\n{\"valid\": true}\n```", - # Uppercase JSON - "```JSON\n{\"valid\": true}\n```", - # Mixed case - "```Json\n{\"valid\": true}\n```", - # Multiple code blocks (should take first one) - "```json\n{\"first\": true}\n```\n```json\n{\"second\": 
true}\n```", - # Code block with extra content - "Here's the result:\n```json\n{\"valid\": true}\n```\nDone!", + '```\n{"type": "definition", "entity": "A", "definition": "def"}\n```', + # Code block with extra content before/after + 'Here\'s the result:\n```json\n{"type": "definition", "entity": "A", "definition": "def"}\n```\nDone!', ] - + for i, response in enumerate(test_cases): - try: - result = agent_extractor.parse_json(response) - assert result.get("valid") == True or result.get("first") == True - except json.JSONDecodeError: - # Some cases may fail due to regex extraction issues - # This documents current behavior - the regex may not match all cases - print(f"Case {i} failed JSON parsing: {response[:50]}...") - pass + result = agent_extractor.parse_jsonl(response) + assert len(result) >= 1, f"Case {i} failed" + assert result[0].get("entity") == "A" - def test_parse_json_malformed_code_blocks(self, agent_extractor): - """Test JSON parsing with malformed code block formats""" - # These should still work by falling back to treating entire text as JSON - test_cases = [ - # Unclosed code block - "```json\n{\"test\": true}", - # No opening backticks - "{\"test\": true}\n```", - # Wrong number of backticks - "`json\n{\"test\": true}\n`", - # Nested backticks (should handle gracefully) - "```json\n{\"code\": \"```\", \"test\": true}\n```", - ] - - for response in test_cases: - try: - result = agent_extractor.parse_json(response) - assert "test" in result # Should successfully parse - except json.JSONDecodeError: - # This is also acceptable for malformed cases - pass + def test_parse_jsonl_truncation_resilience(self, agent_extractor): + """Test JSONL parsing with truncated responses""" + # Simulates LLM output being cut off mid-line + response = '''{"type": "definition", "entity": "Complete1", "definition": "Full definition"} +{"type": "definition", "entity": "Complete2", "definition": "Another full def"} +{"type": "definition", "entity": "Trunca''' - def 
test_parse_json_large_responses(self, agent_extractor): - """Test JSON parsing with very large responses""" - # Create a large JSON structure - large_data = { - "definitions": [ - { - "entity": f"Entity {i}", - "definition": f"Definition {i} " + "with more content " * 100 - } - for i in range(100) - ], - "relationships": [ - { - "subject": f"Subject {i}", - "predicate": f"predicate_{i}", - "object": f"Object {i}", - "object-entity": i % 2 == 0 - } - for i in range(50) - ] - } - - large_json_str = json.dumps(large_data) - response = f"```json\n{large_json_str}\n```" - - result = agent_extractor.parse_json(response) - - assert len(result["definitions"]) == 100 - assert len(result["relationships"]) == 50 - assert result["definitions"][0]["entity"] == "Entity 0" + result = agent_extractor.parse_jsonl(response) + + # Should get 2 valid objects, the truncated line is skipped + assert len(result) == 2 + assert result[0]["entity"] == "Complete1" + assert result[1]["entity"] == "Complete2" + + def test_parse_jsonl_large_responses(self, agent_extractor): + """Test JSONL parsing with very large responses""" + # Create a large JSONL response + lines = [] + for i in range(100): + lines.append(json.dumps({ + "type": "definition", + "entity": f"Entity {i}", + "definition": f"Definition {i} " + "with more content " * 100 + })) + for i in range(50): + lines.append(json.dumps({ + "type": "relationship", + "subject": f"Subject {i}", + "predicate": f"predicate_{i}", + "object": f"Object {i}", + "object-entity": i % 2 == 0 + })) + + response = f"```json\n{chr(10).join(lines)}\n```" + + result = agent_extractor.parse_jsonl(response) + + definitions = [r for r in result if r.get("type") == "definition"] + relationships = [r for r in result if r.get("type") == "relationship"] + + assert len(definitions) == 100 + assert len(relationships) == 50 + assert definitions[0]["entity"] == "Entity 0" def test_process_extraction_data_empty_metadata(self, agent_extractor): """Test processing with 
empty or minimal metadata""" # Test with None metadata - may not raise AttributeError depending on implementation try: - triples, contexts = agent_extractor.process_extraction_data( - {"definitions": [], "relationships": []}, - None - ) + triples, contexts = agent_extractor.process_extraction_data([], None) # If it doesn't raise, check the results assert len(triples) == 0 assert len(contexts) == 0 except (AttributeError, TypeError): # This is expected behavior when metadata is None pass - + # Test with metadata without ID metadata = Metadata(id=None, metadata=[]) - triples, contexts = agent_extractor.process_extraction_data( - {"definitions": [], "relationships": []}, - metadata - ) + triples, contexts = agent_extractor.process_extraction_data([], metadata) assert len(triples) == 0 assert len(contexts) == 0 - + # Test with metadata with empty string ID metadata = Metadata(id="", metadata=[]) - data = { - "definitions": [{"entity": "Test", "definition": "Test def"}], - "relationships": [] - } + data = [{"type": "definition", "entity": "Test", "definition": "Test def"}] triples, contexts = agent_extractor.process_extraction_data(data, metadata) - + # Should not create subject-of triples when ID is empty string subject_of_triples = [t for t in triples if t.p.value == SUBJECT_OF] assert len(subject_of_triples) == 0 @@ -224,7 +194,7 @@ class TestAgentKgExtractionEdgeCases: def test_process_extraction_data_special_entity_names(self, agent_extractor): """Test processing with special characters in entity names""" metadata = Metadata(id="doc123", metadata=[]) - + special_entities = [ "Entity with spaces", "Entity & Co.", @@ -237,20 +207,17 @@ class TestAgentKgExtractionEdgeCases: "Quotes: \"test\"", "Parentheses: (test)", ] - - data = { - "definitions": [ - {"entity": entity, "definition": f"Definition for {entity}"} - for entity in special_entities - ], - "relationships": [] - } - + + data = [ + {"type": "definition", "entity": entity, "definition": f"Definition for 
{entity}"} + for entity in special_entities + ] + triples, contexts = agent_extractor.process_extraction_data(data, metadata) - + # Verify all entities were processed assert len(contexts) == len(special_entities) - + # Verify URIs were properly encoded for i, entity in enumerate(special_entities): expected_uri = f"{TRUSTGRAPH_ENTITIES}{urllib.parse.quote(entity)}" @@ -259,23 +226,20 @@ class TestAgentKgExtractionEdgeCases: def test_process_extraction_data_very_long_definitions(self, agent_extractor): """Test processing with very long entity definitions""" metadata = Metadata(id="doc123", metadata=[]) - + # Create very long definition long_definition = "This is a very long definition. " * 1000 - - data = { - "definitions": [ - {"entity": "Test Entity", "definition": long_definition} - ], - "relationships": [] - } - + + data = [ + {"type": "definition", "entity": "Test Entity", "definition": long_definition} + ] + triples, contexts = agent_extractor.process_extraction_data(data, metadata) - + # Should handle long definitions without issues assert len(contexts) == 1 assert contexts[0].context == long_definition - + # Find definition triple def_triple = next((t for t in triples if t.p.value == DEFINITION), None) assert def_triple is not None @@ -284,22 +248,19 @@ class TestAgentKgExtractionEdgeCases: def test_process_extraction_data_duplicate_entities(self, agent_extractor): """Test processing with duplicate entity names""" metadata = Metadata(id="doc123", metadata=[]) - - data = { - "definitions": [ - {"entity": "Machine Learning", "definition": "First definition"}, - {"entity": "Machine Learning", "definition": "Second definition"}, # Duplicate - {"entity": "AI", "definition": "AI definition"}, - {"entity": "AI", "definition": "Another AI definition"}, # Duplicate - ], - "relationships": [] - } - + + data = [ + {"type": "definition", "entity": "Machine Learning", "definition": "First definition"}, + {"type": "definition", "entity": "Machine Learning", "definition": 
"Second definition"}, # Duplicate + {"type": "definition", "entity": "AI", "definition": "AI definition"}, + {"type": "definition", "entity": "AI", "definition": "Another AI definition"}, # Duplicate + ] + triples, contexts = agent_extractor.process_extraction_data(data, metadata) - + # Should process all entries (including duplicates) assert len(contexts) == 4 - + # Check that both definitions for "Machine Learning" are present ml_contexts = [ec for ec in contexts if "Machine%20Learning" in ec.entity.value] assert len(ml_contexts) == 2 @@ -309,25 +270,21 @@ class TestAgentKgExtractionEdgeCases: def test_process_extraction_data_empty_strings(self, agent_extractor): """Test processing with empty strings in data""" metadata = Metadata(id="doc123", metadata=[]) - - data = { - "definitions": [ - {"entity": "", "definition": "Definition for empty entity"}, - {"entity": "Valid Entity", "definition": ""}, - {"entity": " ", "definition": " "}, # Whitespace only - ], - "relationships": [ - {"subject": "", "predicate": "test", "object": "test", "object-entity": True}, - {"subject": "test", "predicate": "", "object": "test", "object-entity": True}, - {"subject": "test", "predicate": "test", "object": "", "object-entity": True}, - ] - } - + + data = [ + {"type": "definition", "entity": "", "definition": "Definition for empty entity"}, + {"type": "definition", "entity": "Valid Entity", "definition": ""}, + {"type": "definition", "entity": " ", "definition": " "}, # Whitespace only + {"type": "relationship", "subject": "", "predicate": "test", "object": "test", "object-entity": True}, + {"type": "relationship", "subject": "test", "predicate": "", "object": "test", "object-entity": True}, + {"type": "relationship", "subject": "test", "predicate": "test", "object": "", "object-entity": True}, + ] + triples, contexts = agent_extractor.process_extraction_data(data, metadata) - + # Should handle empty strings by creating URIs (even if empty) assert len(contexts) == 3 - + # Empty 
entity should create empty URI after encoding empty_entity_context = next((ec for ec in contexts if ec.entity.value == TRUSTGRAPH_ENTITIES), None) assert empty_entity_context is not None @@ -335,23 +292,22 @@ class TestAgentKgExtractionEdgeCases: def test_process_extraction_data_nested_json_in_strings(self, agent_extractor): """Test processing when definitions contain JSON-like strings""" metadata = Metadata(id="doc123", metadata=[]) - - data = { - "definitions": [ - { - "entity": "JSON Entity", - "definition": 'Definition with JSON: {"key": "value", "nested": {"inner": true}}' - }, - { - "entity": "Array Entity", - "definition": 'Contains array: [1, 2, 3, "string"]' - } - ], - "relationships": [] - } - + + data = [ + { + "type": "definition", + "entity": "JSON Entity", + "definition": 'Definition with JSON: {"key": "value", "nested": {"inner": true}}' + }, + { + "type": "definition", + "entity": "Array Entity", + "definition": 'Contains array: [1, 2, 3, "string"]' + } + ] + triples, contexts = agent_extractor.process_extraction_data(data, metadata) - + # Should handle JSON strings in definitions without parsing them assert len(contexts) == 2 assert '{"key": "value"' in contexts[0].context @@ -360,29 +316,26 @@ class TestAgentKgExtractionEdgeCases: def test_process_extraction_data_boolean_object_entity_variations(self, agent_extractor): """Test processing with various boolean values for object-entity""" metadata = Metadata(id="doc123", metadata=[]) - - data = { - "definitions": [], - "relationships": [ - # Explicit True - {"subject": "A", "predicate": "rel1", "object": "B", "object-entity": True}, - # Explicit False - {"subject": "A", "predicate": "rel2", "object": "literal", "object-entity": False}, - # Missing object-entity (should default to True based on code) - {"subject": "A", "predicate": "rel3", "object": "C"}, - # String "true" (should be treated as truthy) - {"subject": "A", "predicate": "rel4", "object": "D", "object-entity": "true"}, - # String "false" 
(should be treated as truthy in Python) - {"subject": "A", "predicate": "rel5", "object": "E", "object-entity": "false"}, - # Number 0 (falsy) - {"subject": "A", "predicate": "rel6", "object": "literal2", "object-entity": 0}, - # Number 1 (truthy) - {"subject": "A", "predicate": "rel7", "object": "F", "object-entity": 1}, - ] - } - + + data = [ + # Explicit True + {"type": "relationship", "subject": "A", "predicate": "rel1", "object": "B", "object-entity": True}, + # Explicit False + {"type": "relationship", "subject": "A", "predicate": "rel2", "object": "literal", "object-entity": False}, + # Missing object-entity (should default to True based on code) + {"type": "relationship", "subject": "A", "predicate": "rel3", "object": "C"}, + # String "true" (should be treated as truthy) + {"type": "relationship", "subject": "A", "predicate": "rel4", "object": "D", "object-entity": "true"}, + # String "false" (should be treated as truthy in Python) + {"type": "relationship", "subject": "A", "predicate": "rel5", "object": "E", "object-entity": "false"}, + # Number 0 (falsy) + {"type": "relationship", "subject": "A", "predicate": "rel6", "object": "literal2", "object-entity": 0}, + # Number 1 (truthy) + {"type": "relationship", "subject": "A", "predicate": "rel7", "object": "F", "object-entity": 1}, + ] + triples, contexts = agent_extractor.process_extraction_data(data, metadata) - + # Should process all relationships # Note: The current implementation has some logic issues that these tests document assert len([t for t in triples if t.p.value != RDF_LABEL and t.p.value != SUBJECT_OF]) >= 7 @@ -437,41 +390,40 @@ class TestAgentKgExtractionEdgeCases: def test_process_extraction_data_performance_large_dataset(self, agent_extractor): """Test performance with large extraction datasets""" metadata = Metadata(id="large-doc", metadata=[]) - - # Create large dataset + + # Create large dataset in JSONL format num_definitions = 1000 num_relationships = 2000 - - large_data = { - 
"definitions": [ - { - "entity": f"Entity_{i:04d}", - "definition": f"Definition for entity {i} with some detailed explanation." - } - for i in range(num_definitions) - ], - "relationships": [ - { - "subject": f"Entity_{i % num_definitions:04d}", - "predicate": f"predicate_{i % 10}", - "object": f"Entity_{(i + 1) % num_definitions:04d}", - "object-entity": True - } - for i in range(num_relationships) - ] - } - + + large_data = [ + { + "type": "definition", + "entity": f"Entity_{i:04d}", + "definition": f"Definition for entity {i} with some detailed explanation." + } + for i in range(num_definitions) + ] + [ + { + "type": "relationship", + "subject": f"Entity_{i % num_definitions:04d}", + "predicate": f"predicate_{i % 10}", + "object": f"Entity_{(i + 1) % num_definitions:04d}", + "object-entity": True + } + for i in range(num_relationships) + ] + import time start_time = time.time() - + triples, contexts = agent_extractor.process_extraction_data(large_data, metadata) - + end_time = time.time() processing_time = end_time - start_time - + # Should complete within reasonable time (adjust threshold as needed) assert processing_time < 10.0 # 10 seconds threshold - + # Verify results assert len(contexts) == num_definitions # Triples include labels, definitions, relationships, and subject-of relations diff --git a/tests/unit/test_prompt_manager.py b/tests/unit/test_prompt_manager.py index 026791d0..3e73ab9c 100644 --- a/tests/unit/test_prompt_manager.py +++ b/tests/unit/test_prompt_manager.py @@ -339,7 +339,250 @@ class TestPromptManager: """Test PromptManager with minimal configuration""" pm = PromptManager() pm.load_config({}) # Empty config - + assert pm.config.system_template == "Be helpful." 
# Default system assert pm.terms == {} # Default empty terms - assert len(pm.prompts) == 0 \ No newline at end of file + assert len(pm.prompts) == 0 + + +@pytest.mark.unit +class TestPromptManagerJsonl: + """Unit tests for PromptManager JSONL functionality""" + + @pytest.fixture + def jsonl_config(self): + """Configuration with JSONL response type prompts""" + return { + "system": json.dumps("You are an extraction assistant."), + "template-index": json.dumps(["extract_simple", "extract_with_schema", "extract_mixed"]), + "template.extract_simple": json.dumps({ + "prompt": "Extract entities from: {{ text }}", + "response-type": "jsonl" + }), + "template.extract_with_schema": json.dumps({ + "prompt": "Extract definitions from: {{ text }}", + "response-type": "jsonl", + "schema": { + "type": "object", + "properties": { + "entity": {"type": "string"}, + "definition": {"type": "string"} + }, + "required": ["entity", "definition"] + } + }), + "template.extract_mixed": json.dumps({ + "prompt": "Extract knowledge from: {{ text }}", + "response-type": "jsonl", + "schema": { + "oneOf": [ + { + "type": "object", + "properties": { + "type": {"const": "definition"}, + "entity": {"type": "string"}, + "definition": {"type": "string"} + }, + "required": ["type", "entity", "definition"] + }, + { + "type": "object", + "properties": { + "type": {"const": "relationship"}, + "subject": {"type": "string"}, + "predicate": {"type": "string"}, + "object": {"type": "string"} + }, + "required": ["type", "subject", "predicate", "object"] + } + ] + } + }) + } + + @pytest.fixture + def prompt_manager(self, jsonl_config): + """Create a PromptManager with JSONL configuration""" + pm = PromptManager() + pm.load_config(jsonl_config) + return pm + + def test_parse_jsonl_basic(self, prompt_manager): + """Test basic JSONL parsing""" + text = '{"entity": "cat", "definition": "A small furry animal"}\n{"entity": "dog", "definition": "A loyal pet"}' + + result = prompt_manager.parse_jsonl(text) + + assert 
len(result) == 2 + assert result[0]["entity"] == "cat" + assert result[1]["entity"] == "dog" + + def test_parse_jsonl_with_empty_lines(self, prompt_manager): + """Test JSONL parsing skips empty lines""" + text = '{"entity": "cat"}\n\n\n{"entity": "dog"}\n' + + result = prompt_manager.parse_jsonl(text) + + assert len(result) == 2 + + def test_parse_jsonl_with_markdown_fences(self, prompt_manager): + """Test JSONL parsing strips markdown code fences""" + text = '''```json +{"entity": "cat", "definition": "A furry animal"} +{"entity": "dog", "definition": "A loyal pet"} +```''' + + result = prompt_manager.parse_jsonl(text) + + assert len(result) == 2 + assert result[0]["entity"] == "cat" + assert result[1]["entity"] == "dog" + + def test_parse_jsonl_with_jsonl_fence(self, prompt_manager): + """Test JSONL parsing strips jsonl-marked code fences""" + text = '''```jsonl +{"entity": "cat"} +{"entity": "dog"} +```''' + + result = prompt_manager.parse_jsonl(text) + + assert len(result) == 2 + + def test_parse_jsonl_truncation_resilience(self, prompt_manager): + """Test JSONL parsing handles truncated final line""" + text = '{"entity": "cat", "definition": "Complete"}\n{"entity": "dog", "defi' + + result = prompt_manager.parse_jsonl(text) + + # Should get the first valid object, skip the truncated one + assert len(result) == 1 + assert result[0]["entity"] == "cat" + + def test_parse_jsonl_invalid_lines_skipped(self, prompt_manager): + """Test JSONL parsing skips invalid JSON lines""" + text = '''{"entity": "valid1"} +not json at all +{"entity": "valid2"} +{broken json +{"entity": "valid3"}''' + + result = prompt_manager.parse_jsonl(text) + + assert len(result) == 3 + assert result[0]["entity"] == "valid1" + assert result[1]["entity"] == "valid2" + assert result[2]["entity"] == "valid3" + + def test_parse_jsonl_empty_input(self, prompt_manager): + """Test JSONL parsing with empty input""" + result = prompt_manager.parse_jsonl("") + assert result == [] + + result = 
prompt_manager.parse_jsonl("\n\n\n") + assert result == [] + + @pytest.mark.asyncio + async def test_invoke_jsonl_response(self, prompt_manager): + """Test invoking a prompt with JSONL response""" + mock_llm = AsyncMock() + mock_llm.return_value = '{"entity": "photosynthesis", "definition": "Plant process"}\n{"entity": "mitosis", "definition": "Cell division"}' + + result = await prompt_manager.invoke( + "extract_simple", + {"text": "Biology text"}, + mock_llm + ) + + assert isinstance(result, list) + assert len(result) == 2 + assert result[0]["entity"] == "photosynthesis" + assert result[1]["entity"] == "mitosis" + + @pytest.mark.asyncio + async def test_invoke_jsonl_with_schema_validation(self, prompt_manager): + """Test JSONL response with schema validation""" + mock_llm = AsyncMock() + mock_llm.return_value = '{"entity": "cat", "definition": "A pet"}\n{"entity": "dog", "definition": "Another pet"}' + + result = await prompt_manager.invoke( + "extract_with_schema", + {"text": "Animal text"}, + mock_llm + ) + + assert len(result) == 2 + assert all("entity" in obj and "definition" in obj for obj in result) + + @pytest.mark.asyncio + async def test_invoke_jsonl_schema_filters_invalid(self, prompt_manager): + """Test JSONL schema validation filters out invalid objects""" + mock_llm = AsyncMock() + # Second object is missing required 'definition' field + mock_llm.return_value = '{"entity": "valid", "definition": "Has both fields"}\n{"entity": "invalid_missing_definition"}\n{"entity": "also_valid", "definition": "Complete"}' + + result = await prompt_manager.invoke( + "extract_with_schema", + {"text": "Test text"}, + mock_llm + ) + + # Only the two valid objects should be returned + assert len(result) == 2 + assert result[0]["entity"] == "valid" + assert result[1]["entity"] == "also_valid" + + @pytest.mark.asyncio + async def test_invoke_jsonl_mixed_types(self, prompt_manager): + """Test JSONL with discriminated union schema (oneOf)""" + mock_llm = AsyncMock() + 
mock_llm.return_value = '''{"type": "definition", "entity": "DNA", "definition": "Genetic material"} +{"type": "relationship", "subject": "DNA", "predicate": "found_in", "object": "nucleus"} +{"type": "definition", "entity": "RNA", "definition": "Messenger molecule"}''' + + result = await prompt_manager.invoke( + "extract_mixed", + {"text": "Biology text"}, + mock_llm + ) + + assert len(result) == 3 + + # Check definitions + definitions = [r for r in result if r.get("type") == "definition"] + assert len(definitions) == 2 + + # Check relationships + relationships = [r for r in result if r.get("type") == "relationship"] + assert len(relationships) == 1 + assert relationships[0]["subject"] == "DNA" + + @pytest.mark.asyncio + async def test_invoke_jsonl_empty_result(self, prompt_manager): + """Test JSONL response that yields no valid objects""" + mock_llm = AsyncMock() + mock_llm.return_value = "No JSON here at all" + + result = await prompt_manager.invoke( + "extract_simple", + {"text": "Test"}, + mock_llm + ) + + assert result == [] + + @pytest.mark.asyncio + async def test_invoke_jsonl_without_schema(self, prompt_manager): + """Test JSONL response without schema validation""" + mock_llm = AsyncMock() + mock_llm.return_value = '{"any": "structure"}\n{"completely": "different"}' + + result = await prompt_manager.invoke( + "extract_simple", + {"text": "Test"}, + mock_llm + ) + + assert len(result) == 2 + assert result[0] == {"any": "structure"} + assert result[1] == {"completely": "different"} \ No newline at end of file diff --git a/trustgraph-flow/trustgraph/extract/kg/agent/extract.py b/trustgraph-flow/trustgraph/extract/kg/agent/extract.py index b7ef9259..58230a41 100644 --- a/trustgraph-flow/trustgraph/extract/kg/agent/extract.py +++ b/trustgraph-flow/trustgraph/extract/kg/agent/extract.py @@ -126,16 +126,42 @@ class Processor(FlowProcessor): await pub.send(ecs) - def parse_json(self, text): - json_match = re.search(r'```(?:json)?(.*?)```', text, re.DOTALL) - - if 
json_match: - json_str = json_match.group(1).strip() - else: - # If no delimiters, assume the entire output is JSON - json_str = text.strip() + def parse_jsonl(self, text): + """ + Parse JSONL response, returning list of valid objects. - return json.loads(json_str) + Invalid lines (malformed JSON, empty lines) are skipped with warnings. + This provides truncation resilience - partial output yields partial results. + """ + results = [] + + # Strip markdown code fences if present + text = text.strip() + if text.startswith('```'): + # Remove opening fence (possibly with language hint) + text = re.sub(r'^```(?:json|jsonl)?\s*\n?', '', text) + if text.endswith('```'): + text = text[:-3] + + for line_num, line in enumerate(text.strip().split('\n'), 1): + line = line.strip() + + # Skip empty lines + if not line: + continue + + # Skip any remaining fence markers + if line.startswith('```'): + continue + + try: + obj = json.loads(line) + results.append(obj) + except json.JSONDecodeError as e: + # Log warning but continue - this provides truncation resilience + logger.warning(f"JSONL parse error on line {line_num}: {e}") + + return results async def on_message(self, msg, consumer, flow): @@ -178,11 +204,12 @@ class Processor(FlowProcessor): question = prompt ) - # Parse JSON response - try: - extraction_data = self.parse_json(agent_response) - except json.JSONDecodeError as e: - raise ValueError(f"Invalid JSON response from agent: {e}") + # Parse JSONL response + extraction_data = self.parse_jsonl(agent_response) + + if not extraction_data: + logger.warning("JSONL parse returned no valid objects") + return # Process extraction data triples, entity_contexts = self.process_extraction_data( @@ -209,12 +236,21 @@ class Processor(FlowProcessor): raise def process_extraction_data(self, data, metadata): - """Process combined extraction data to generate triples and entity contexts""" + """Process JSONL extraction data to generate triples and entity contexts. 
+ + Data is a flat list of objects with 'type' discriminator field: + - {"type": "definition", "entity": "...", "definition": "..."} + - {"type": "relationship", "subject": "...", "predicate": "...", "object": "...", "object-entity": bool} + """ triples = [] entity_contexts = [] + # Categorize items by type + definitions = [item for item in data if item.get("type") == "definition"] + relationships = [item for item in data if item.get("type") == "relationship"] + # Process definitions - for defn in data.get("definitions", []): + for defn in definitions: entity_uri = self.to_uri(defn["entity"]) @@ -247,17 +283,18 @@ class Processor(FlowProcessor): )) # Process relationships - for rel in data.get("relationships", []): + for rel in relationships: subject_uri = self.to_uri(rel["subject"]) predicate_uri = self.to_uri(rel["predicate"]) subject_value = Value(value=subject_uri, is_uri=True) predicate_value = Value(value=predicate_uri, is_uri=True) - if data.get("object-entity", False): - object_value = Value(value=predicate_uri, is_uri=True) + if rel.get("object-entity", True): + object_uri = self.to_uri(rel["object"]) + object_value = Value(value=object_uri, is_uri=True) else: - object_value = Value(value=predicate_uri, is_uri=False) + object_value = Value(value=rel["object"], is_uri=False) # Add subject and predicate labels triples.append(Triple( diff --git a/trustgraph-flow/trustgraph/extract/kg/ontology/simplified_parser.py b/trustgraph-flow/trustgraph/extract/kg/ontology/simplified_parser.py index 3131d977..1f54222d 100644 --- a/trustgraph-flow/trustgraph/extract/kg/ontology/simplified_parser.py +++ b/trustgraph-flow/trustgraph/extract/kg/ontology/simplified_parser.py @@ -49,8 +49,17 @@ class ExtractionResult: def parse_extraction_response(response: Any) -> Optional[ExtractionResult]: """Parse LLM extraction response into structured format. + Supports two formats: + 1. 
JSONL format (list): Flat list of objects with 'type' discriminator field + [{"type": "entity", ...}, {"type": "relationship", ...}, {"type": "attribute", ...}] + 2. Legacy format (dict): Nested structure with separate arrays + {"entities": [...], "relationships": [...], "attributes": [...]} + Args: - response: LLM response (string JSON or already parsed dict) + response: LLM response - can be: + - string (JSON to parse) + - dict (legacy nested format) + - list (JSONL format - flat list with type discriminators) Returns: ExtractionResult with parsed entities/relationships/attributes, @@ -64,17 +73,89 @@ def parse_extraction_response(response: Any) -> Optional[ExtractionResult]: logger.error(f"Failed to parse JSON response: {e}") logger.debug(f"Response was: {response[:500]}") return None - elif isinstance(response, dict): + elif isinstance(response, (dict, list)): data = response else: logger.error(f"Unexpected response type: {type(response)}") return None - # Validate structure - if not isinstance(data, dict): - logger.error(f"Expected dict, got {type(data)}") - return None + # Handle JSONL format (flat list with type discriminators) + if isinstance(data, list): + return parse_jsonl_format(data) + # Handle legacy format (nested dict) + if isinstance(data, dict): + return parse_legacy_format(data) + + logger.error(f"Expected dict or list, got {type(data)}") + return None + + +def parse_jsonl_format(data: List[Dict[str, Any]]) -> ExtractionResult: + """Parse JSONL format response (flat list with type discriminators). + + Each item has a 'type' field: 'entity', 'relationship', or 'attribute'. 
+ + Args: + data: List of dicts with type discriminator + + Returns: + ExtractionResult with categorized items + """ + entities = [] + relationships = [] + attributes = [] + + for item in data: + if not isinstance(item, dict): + logger.warning(f"Skipping non-dict item: {type(item)}") + continue + + item_type = item.get('type') + + if item_type == 'entity': + try: + entity = parse_entity_jsonl(item) + if entity: + entities.append(entity) + except Exception as e: + logger.warning(f"Failed to parse entity {item}: {e}") + + elif item_type == 'relationship': + try: + relationship = parse_relationship(item) + if relationship: + relationships.append(relationship) + except Exception as e: + logger.warning(f"Failed to parse relationship {item}: {e}") + + elif item_type == 'attribute': + try: + attribute = parse_attribute(item) + if attribute: + attributes.append(attribute) + except Exception as e: + logger.warning(f"Failed to parse attribute {item}: {e}") + + else: + logger.warning(f"Unknown item type '{item_type}': {item}") + + return ExtractionResult( + entities=entities, + relationships=relationships, + attributes=attributes + ) + + +def parse_legacy_format(data: Dict[str, Any]) -> ExtractionResult: + """Parse legacy format response (nested dict with arrays). + + Args: + data: Dict with 'entities', 'relationships', 'attributes' arrays + + Returns: + ExtractionResult with parsed items + """ # Parse entities entities = [] entities_data = data.get('entities', []) @@ -127,6 +208,37 @@ def parse_extraction_response(response: Any) -> Optional[ExtractionResult]: ) +def parse_entity_jsonl(data: Dict[str, Any]) -> Optional[Entity]: + """Parse entity from JSONL format dict. + + JSONL format uses 'entity_type' instead of 'type' for the entity's type + (since 'type' is the discriminator field). 
+ + Args: + data: Entity dict with 'entity' and 'entity_type' fields + + Returns: + Entity object or None if invalid + """ + if not isinstance(data, dict): + logger.warning(f"Entity data is not a dict: {type(data)}") + return None + + entity = data.get('entity') + # JSONL format uses 'entity_type' since 'type' is the discriminator + entity_type = data.get('entity_type') + + if not entity or not entity_type: + logger.warning(f"Missing required fields in entity: {data}") + return None + + if not isinstance(entity, str) or not isinstance(entity_type, str): + logger.warning(f"Entity fields must be strings: {data}") + return None + + return Entity(entity=entity, type=entity_type) + + def parse_entity(data: Dict[str, Any]) -> Optional[Entity]: """Parse entity from dict. diff --git a/trustgraph-flow/trustgraph/template/prompt_manager.py b/trustgraph-flow/trustgraph/template/prompt_manager.py index 9364cf21..546a7faf 100644 --- a/trustgraph-flow/trustgraph/template/prompt_manager.py +++ b/trustgraph-flow/trustgraph/template/prompt_manager.py @@ -83,7 +83,7 @@ class PromptManager: def parse_json(self, text): json_match = re.search(r'```(?:json)?(.*?)```', text, re.DOTALL) - + if json_match: json_str = json_match.group(1).strip() else: @@ -92,6 +92,43 @@ class PromptManager: return json.loads(json_str) + def parse_jsonl(self, text): + """ + Parse JSONL response, returning list of valid objects. + + Invalid lines (malformed JSON, empty lines) are skipped with warnings. + This provides truncation resilience - partial output yields partial results. 
+ """ + results = [] + + # Strip markdown code fences if present + text = text.strip() + if text.startswith('```'): + # Remove opening fence (possibly with language hint) + text = re.sub(r'^```(?:json|jsonl)?\s*\n?', '', text) + if text.endswith('```'): + text = text[:-3] + + for line_num, line in enumerate(text.strip().split('\n'), 1): + line = line.strip() + + # Skip empty lines + if not line: + continue + + # Skip any remaining fence markers + if line.startswith('```'): + continue + + try: + obj = json.loads(line) + results.append(obj) + except json.JSONDecodeError as e: + # Log warning but continue - this provides truncation resilience + logger.warning(f"JSONL parse error on line {line_num}: {e}") + + return results + def render(self, id, input): if id not in self.prompts: @@ -121,21 +158,41 @@ class PromptManager: if resp_type == "text": return resp - if resp_type != "json": - raise RuntimeError(f"Response type {resp_type} not known") - - try: - obj = self.parse_json(resp) - except: - logger.error(f"JSON parse failed: {resp}") - raise RuntimeError("JSON parse fail") - - if self.prompts[id].schema: + if resp_type == "json": try: - validate(instance=obj, schema=self.prompts[id].schema) - logger.debug("Schema validation successful") - except Exception as e: - raise RuntimeError(f"Schema validation fail: {e}") + obj = self.parse_json(resp) + except: + logger.error(f"JSON parse failed: {resp}") + raise RuntimeError("JSON parse fail") - return obj + if self.prompts[id].schema: + try: + validate(instance=obj, schema=self.prompts[id].schema) + logger.debug("Schema validation successful") + except Exception as e: + raise RuntimeError(f"Schema validation fail: {e}") + + return obj + + if resp_type == "jsonl": + objects = self.parse_jsonl(resp) + + if not objects: + logger.warning("JSONL parse returned no valid objects") + return [] + + # Validate each object against schema if provided + if self.prompts[id].schema: + validated = [] + for i, obj in enumerate(objects): + 
try: + validate(instance=obj, schema=self.prompts[id].schema) + validated.append(obj) + except Exception as e: + logger.warning(f"Object {i} failed schema validation: {e}") + return validated + + return objects + + raise RuntimeError(f"Response type {resp_type} not known")