SPARQL query service (#754)

SPARQL 1.1 query service wrapping pub/sub triples interface

Add a backend-agnostic SPARQL query service that parses SPARQL
queries using rdflib, decomposes them into triple pattern lookups
via the existing TriplesClient pub/sub interface, and performs
in-memory joins, filters, and projections.
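
For example, a query such as

    SELECT ?name WHERE {
        ?person a <http://example.org/Person> .
        ?person <http://example.org/name> ?name .
    }

decomposes into two triple pattern lookups; bindings for ?person from
the first pattern are substituted into the second before the results
are joined.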

Includes:
- SPARQL parser, algebra evaluator, expression evaluator, solution
  sequence operations (BGP, JOIN, OPTIONAL, UNION, FILTER, BIND,
  VALUES, GROUP BY, ORDER BY, LIMIT/OFFSET, DISTINCT, aggregates)
- FlowProcessor service with TriplesClientSpec
- Gateway dispatcher, request/response translators, API spec
- Python SDK method (FlowInstance.sparql_query)
- CLI command (tg-invoke-sparql-query)
- Tech spec (docs/tech-specs/sparql-query.md)
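
A rough usage sketch (the argument and invocation shapes are assumed,
not authoritative; see the SDK and CLI sources in this diff for the
real signatures):

    # Python SDK
    response = flow.sparql_query("SELECT ?s ?p ?o WHERE { ?s ?p ?o }")

    # CLI (invocation shape illustrative)
    tg-invoke-sparql-query 'SELECT ?s ?p ?o WHERE { ?s ?p ?o }'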

New unit tests for SPARQL query
cybermaggedon 2026-04-02 17:21:39 +01:00 committed by GitHub
parent 62c30a3a50
commit d9dc4cbab5
23 changed files with 3498 additions and 3 deletions

@@ -101,6 +101,7 @@ pdf-ocr-mistral = "trustgraph.decoding.mistral_ocr:run"
prompt-template = "trustgraph.prompt.template:run"
rev-gateway = "trustgraph.rev_gateway:run"
run-processing = "trustgraph.processing:run"
sparql-query = "trustgraph.query.sparql:run"
structured-query = "trustgraph.retrieval.structured_query:run"
structured-diag = "trustgraph.retrieval.structured_diag:run"
text-completion-azure = "trustgraph.model.text_completion.azure:run"

@@ -22,6 +22,7 @@ from . document_rag import DocumentRagRequestor
from . triples_query import TriplesQueryRequestor
from . rows_query import RowsQueryRequestor
from . nlp_query import NLPQueryRequestor
from . sparql_query import SparqlQueryRequestor
from . structured_query import StructuredQueryRequestor
from . structured_diag import StructuredDiagRequestor
from . embeddings import EmbeddingsRequestor
@@ -65,6 +66,7 @@ request_response_dispatchers = {
"structured-query": StructuredQueryRequestor,
"structured-diag": StructuredDiagRequestor,
"row-embeddings": RowEmbeddingsQueryRequestor,
"sparql": SparqlQueryRequestor,
}
global_dispatchers = {

@@ -0,0 +1,30 @@
from ... schema import SparqlQueryRequest, SparqlQueryResponse
from ... messaging import TranslatorRegistry
from . requestor import ServiceRequestor
class SparqlQueryRequestor(ServiceRequestor):
def __init__(
self, backend, request_queue, response_queue, timeout,
consumer, subscriber,
):
super(SparqlQueryRequestor, self).__init__(
backend=backend,
request_queue=request_queue,
response_queue=response_queue,
request_schema=SparqlQueryRequest,
response_schema=SparqlQueryResponse,
subscription=subscriber,
consumer_name=consumer,
timeout=timeout,
)
self.request_translator = TranslatorRegistry.get_request_translator("sparql-query")
self.response_translator = TranslatorRegistry.get_response_translator("sparql-query")
def to_request(self, body):
return self.request_translator.decode(body)
def from_response(self, message):
return self.response_translator.encode_with_completion(message)

@@ -0,0 +1 @@
from . service import *

@@ -0,0 +1,6 @@
#!/usr/bin/env python3
from . service import run
if __name__ == '__main__':
run()

@@ -0,0 +1,541 @@
"""
SPARQL algebra evaluator.
Recursively evaluates an rdflib SPARQL algebra tree by issuing triple
pattern queries via TriplesClient (streaming) and performing in-memory
joins, filters, and projections.
"""
import logging
from collections import defaultdict
from rdflib.term import Variable, URIRef, Literal, BNode
from rdflib.plugins.sparql.parserutils import CompValue
from ... schema import Term, Triple, IRI, LITERAL, BLANK
from ... knowledge import Uri
from ... knowledge import Literal as KgLiteral
from . parser import rdflib_term_to_term
from . solutions import (
hash_join, left_join, union, project, distinct,
order_by, slice_solutions, _term_key,
)
from . expressions import evaluate_expression, _effective_boolean
logger = logging.getLogger(__name__)
class EvaluationError(Exception):
"""Raised when SPARQL evaluation fails."""
pass
async def evaluate(node, triples_client, user, collection, limit=10000):
"""
Evaluate a SPARQL algebra node.
Args:
node: rdflib CompValue algebra node
triples_client: TriplesClient instance for triple pattern queries
user: user/keyspace identifier
collection: collection identifier
limit: safety limit on results
Returns:
list of solutions (dicts mapping variable names to Term values)
"""
if not isinstance(node, CompValue):
logger.warning(f"Expected CompValue, got {type(node)}: {node}")
return [{}]
name = node.name
handler = _HANDLERS.get(name)
if handler is None:
logger.warning(f"Unsupported algebra node: {name}")
return [{}]
return await handler(node, triples_client, user, collection, limit)
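# Illustrative only: how the service layer drives this evaluator
# (this mirrors the calls made in the service module in this commit):
#
#     parsed = parse_sparql("SELECT ?s WHERE { ?s ?p ?o }")
#     solutions = await evaluate(
#         parsed.algebra, triples_client,
#         user="trustgraph", collection="default",
#     )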
# --- Node handlers ---
async def _eval_select_query(node, tc, user, collection, limit):
"""Evaluate a SelectQuery node."""
return await evaluate(node.p, tc, user, collection, limit)
async def _eval_project(node, tc, user, collection, limit):
"""Evaluate a Project node (SELECT variable projection)."""
solutions = await evaluate(node.p, tc, user, collection, limit)
variables = [str(v) for v in node.PV]
return project(solutions, variables)
async def _eval_bgp(node, tc, user, collection, limit):
"""
Evaluate a Basic Graph Pattern.
Issues streaming triple pattern queries and joins results. Patterns
are ordered by selectivity (more bound terms first) and evaluated
sequentially with bound-variable substitution.
"""
triples = node.triples
if not triples:
return [{}]
# Sort patterns by selectivity: more bound terms = more selective
def selectivity(pattern):
return sum(1 for t in pattern if not isinstance(t, Variable))
sorted_patterns = sorted(
enumerate(triples), key=lambda x: -selectivity(x[1])
)
solutions = [{}]
for _, pattern in sorted_patterns:
s_tmpl, p_tmpl, o_tmpl = pattern
new_solutions = []
for sol in solutions:
# Substitute known bindings into the pattern
s_val = _resolve_term(s_tmpl, sol)
p_val = _resolve_term(p_tmpl, sol)
o_val = _resolve_term(o_tmpl, sol)
# Query the triples store
results = await _query_pattern(
tc, s_val, p_val, o_val, user, collection, limit
)
# Map results back to variable bindings,
# converting Uri/Literal to Term objects
for triple in results:
binding = dict(sol)
if isinstance(s_tmpl, Variable):
binding[str(s_tmpl)] = _to_term(triple.s)
if isinstance(p_tmpl, Variable):
binding[str(p_tmpl)] = _to_term(triple.p)
if isinstance(o_tmpl, Variable):
binding[str(o_tmpl)] = _to_term(triple.o)
new_solutions.append(binding)
solutions = new_solutions
if not solutions:
break
return solutions[:limit]
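# Illustrative walk-through: in { ?s a ex:Person . ?s ex:name ?n },
# the first pattern binds two concrete terms and is evaluated first;
# each resulting ?s binding is then substituted into the second
# pattern, so that lookup runs with two bound terms per candidate
# solution.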
async def _eval_join(node, tc, user, collection, limit):
"""Evaluate a Join node."""
left = await evaluate(node.p1, tc, user, collection, limit)
right = await evaluate(node.p2, tc, user, collection, limit)
return hash_join(left, right)[:limit]
async def _eval_left_join(node, tc, user, collection, limit):
"""Evaluate a LeftJoin node (OPTIONAL)."""
left_sols = await evaluate(node.p1, tc, user, collection, limit)
right_sols = await evaluate(node.p2, tc, user, collection, limit)
filter_fn = None
if hasattr(node, "expr") and node.expr is not None:
expr = node.expr
if not (isinstance(expr, CompValue) and expr.name == "TrueFilter"):
filter_fn = lambda sol: _effective_boolean(
evaluate_expression(expr, sol)
)
return left_join(left_sols, right_sols, filter_fn)[:limit]
async def _eval_union(node, tc, user, collection, limit):
"""Evaluate a Union node."""
left = await evaluate(node.p1, tc, user, collection, limit)
right = await evaluate(node.p2, tc, user, collection, limit)
return union(left, right)[:limit]
async def _eval_filter(node, tc, user, collection, limit):
"""Evaluate a Filter node."""
solutions = await evaluate(node.p, tc, user, collection, limit)
expr = node.expr
return [
sol for sol in solutions
if _effective_boolean(evaluate_expression(expr, sol))
]
async def _eval_distinct(node, tc, user, collection, limit):
"""Evaluate a Distinct node."""
solutions = await evaluate(node.p, tc, user, collection, limit)
return distinct(solutions)
async def _eval_reduced(node, tc, user, collection, limit):
    """
    Evaluate a Reduced node. REDUCED permits, but does not require,
    duplicate elimination, so treating it the same as DISTINCT is a
    conforming implementation.
    """
    solutions = await evaluate(node.p, tc, user, collection, limit)
    return distinct(solutions)
async def _eval_order_by(node, tc, user, collection, limit):
"""Evaluate an OrderBy node."""
solutions = await evaluate(node.p, tc, user, collection, limit)
key_fns = []
for cond in node.expr:
if isinstance(cond, CompValue) and cond.name == "OrderCondition":
ascending = cond.order != "DESC"
expr = cond.expr
key_fns.append((
lambda sol, e=expr: evaluate_expression(e, sol),
ascending,
))
else:
# Simple variable or expression
key_fns.append((
lambda sol, e=cond: evaluate_expression(e, sol),
True,
))
return order_by(solutions, key_fns)
async def _eval_slice(node, tc, user, collection, limit):
"""Evaluate a Slice node (LIMIT/OFFSET)."""
# Pass tighter limit downstream if possible
inner_limit = limit
if node.length is not None:
offset = node.start or 0
inner_limit = min(limit, offset + node.length)
solutions = await evaluate(node.p, tc, user, collection, inner_limit)
return slice_solutions(solutions, node.start or 0, node.length)
async def _eval_extend(node, tc, user, collection, limit):
    """Evaluate an Extend node (BIND)."""
    solutions = await evaluate(node.p, tc, user, collection, limit)
    var_name = str(node.var)
    expr = node.expr
    result = []
    for sol in solutions:
        val = evaluate_expression(expr, sol)
        new_sol = dict(sol)
        if isinstance(val, Term):
            new_sol[var_name] = val
        elif isinstance(val, bool):
            # bool must be tested before int/float: bool is an int subclass,
            # so the numeric branch would otherwise swallow booleans
            new_sol[var_name] = Term(
                type=LITERAL, value=str(val).lower(),
                datatype="http://www.w3.org/2001/XMLSchema#boolean"
            )
        elif isinstance(val, (int, float)):
            new_sol[var_name] = Term(type=LITERAL, value=str(val))
        elif isinstance(val, str):
            new_sol[var_name] = Term(type=LITERAL, value=val)
        elif val is not None:
            new_sol[var_name] = Term(type=LITERAL, value=str(val))
        result.append(new_sol)
    return result
async def _eval_group(node, tc, user, collection, limit):
    """Evaluate a Group node (GROUP BY with aggregation)."""
    solutions = await evaluate(node.p, tc, user, collection, limit)
    # Extract grouping expressions
    group_exprs = []
    if hasattr(node, "expr") and node.expr:
        for expr in node.expr:
            if isinstance(expr, CompValue) and expr.name == "GroupAs":
                var_name = (
                    str(expr.var)
                    if hasattr(expr, "var") and expr.var
                    else None
                )
                group_exprs.append((expr.expr, var_name))
            elif isinstance(expr, Variable):
                group_exprs.append((expr, str(expr)))
            else:
                group_exprs.append((expr, None))
    # Group solutions
    groups = defaultdict(list)
    if group_exprs:
        for sol in solutions:
            key_parts = []
            for expr, _ in group_exprs:
                val = evaluate_expression(expr, sol)
                key_parts.append(_term_key(val) if isinstance(val, Term) else val)
            groups[tuple(key_parts)].append(sol)
    else:
        # No GROUP BY - the entire result forms a single group. (Only
        # one of the two paths may populate groups[()], otherwise every
        # solution is counted twice.)
        groups[()].extend(solutions)
    # Build grouped solutions (one per group)
    result = []
    for key, group_sols in groups.items():
        sol = {}
        # Bind group key variables from a representative solution
        if group_sols:
            for expr, var_name in group_exprs:
                if var_name:
                    sol[var_name] = evaluate_expression(expr, group_sols[0])
        sol["__group__"] = group_sols
        result.append(sol)
    return result
async def _eval_aggregate_join(node, tc, user, collection, limit):
"""Evaluate an AggregateJoin (aggregation functions after GROUP BY)."""
solutions = await evaluate(node.p, tc, user, collection, limit)
result = []
for sol in solutions:
group = sol.get("__group__", [sol])
new_sol = {k: v for k, v in sol.items() if k != "__group__"}
# Apply aggregate functions
if hasattr(node, "A") and node.A:
for agg in node.A:
var_name = str(agg.res)
agg_val = _compute_aggregate(agg, group)
new_sol[var_name] = agg_val
result.append(new_sol)
return result
async def _eval_graph(node, tc, user, collection, limit):
"""Evaluate a Graph node (GRAPH clause)."""
term = node.term
if isinstance(term, URIRef):
# GRAPH <uri> { ... } — fixed graph
# We'd need to pass graph to triples queries
# For now, evaluate inner pattern normally
logger.info(f"GRAPH <{term}> clause - graph filtering not yet wired")
return await evaluate(node.p, tc, user, collection, limit)
elif isinstance(term, Variable):
# GRAPH ?g { ... } — variable graph
logger.info(f"GRAPH ?{term} clause - variable graph not yet wired")
return await evaluate(node.p, tc, user, collection, limit)
else:
return await evaluate(node.p, tc, user, collection, limit)
async def _eval_values(node, tc, user, collection, limit):
"""Evaluate a VALUES clause (inline data)."""
variables = [str(v) for v in node.var]
solutions = []
for row in node.value:
sol = {}
for var_name, val in zip(variables, row):
if val is not None and str(val) != "UNDEF":
sol[var_name] = rdflib_term_to_term(val)
solutions.append(sol)
return solutions
async def _eval_to_multiset(node, tc, user, collection, limit):
"""Evaluate a ToMultiSet node (subquery)."""
return await evaluate(node.p, tc, user, collection, limit)
# --- Aggregate computation ---
def _compute_aggregate(agg, group):
"""Compute a single aggregate function over a group of solutions."""
agg_name = agg.name if hasattr(agg, "name") else ""
# Get the expression to aggregate
expr = agg.vars if hasattr(agg, "vars") else None
if agg_name == "Aggregate_Count":
if hasattr(agg, "distinct") and agg.distinct:
vals = set()
for sol in group:
if expr:
val = evaluate_expression(expr, sol)
if val is not None:
vals.add(_term_key(val) if isinstance(val, Term) else val)
else:
vals.add(id(sol))
return Term(type=LITERAL, value=str(len(vals)),
datatype="http://www.w3.org/2001/XMLSchema#integer")
return Term(type=LITERAL, value=str(len(group)),
datatype="http://www.w3.org/2001/XMLSchema#integer")
if agg_name == "Aggregate_Sum":
total = 0
for sol in group:
val = evaluate_expression(expr, sol) if expr else None
num = _try_numeric(val)
if num is not None:
total += num
return Term(type=LITERAL, value=str(total),
datatype="http://www.w3.org/2001/XMLSchema#decimal")
if agg_name == "Aggregate_Avg":
total = 0
count = 0
for sol in group:
val = evaluate_expression(expr, sol) if expr else None
num = _try_numeric(val)
if num is not None:
total += num
count += 1
avg = total / count if count > 0 else 0
return Term(type=LITERAL, value=str(avg),
datatype="http://www.w3.org/2001/XMLSchema#decimal")
if agg_name == "Aggregate_Min":
min_val = None
for sol in group:
val = evaluate_expression(expr, sol) if expr else None
if val is not None:
cmp = _term_key(val) if isinstance(val, Term) else val
if min_val is None or cmp < min_val[0]:
min_val = (cmp, val)
if min_val:
val = min_val[1]
if isinstance(val, Term):
return val
return Term(type=LITERAL, value=str(val))
return None
if agg_name == "Aggregate_Max":
max_val = None
for sol in group:
val = evaluate_expression(expr, sol) if expr else None
if val is not None:
cmp = _term_key(val) if isinstance(val, Term) else val
if max_val is None or cmp > max_val[0]:
max_val = (cmp, val)
if max_val:
val = max_val[1]
if isinstance(val, Term):
return val
return Term(type=LITERAL, value=str(val))
return None
if agg_name == "Aggregate_GroupConcat":
separator = agg.separator if hasattr(agg, "separator") else " "
vals = []
for sol in group:
val = evaluate_expression(expr, sol) if expr else None
if val is not None:
if isinstance(val, Term):
vals.append(val.value if val.type == LITERAL else val.iri)
else:
vals.append(str(val))
return Term(type=LITERAL, value=separator.join(vals))
if agg_name == "Aggregate_Sample":
if group:
val = evaluate_expression(expr, group[0]) if expr else None
if isinstance(val, Term):
return val
if val is not None:
return Term(type=LITERAL, value=str(val))
return None
logger.warning(f"Unsupported aggregate: {agg_name}")
return None
# --- Helper functions ---
def _to_term(val):
"""
Convert a value to a schema Term. Handles Uri and Literal from the
knowledge module (returned by TriplesClient) as well as plain strings.
"""
if val is None:
return None
if isinstance(val, Term):
return val
if isinstance(val, Uri):
return Term(type=IRI, iri=str(val))
if isinstance(val, KgLiteral):
return Term(type=LITERAL, value=str(val))
if isinstance(val, str):
if val.startswith("http://") or val.startswith("https://") or val.startswith("urn:"):
return Term(type=IRI, iri=val)
return Term(type=LITERAL, value=val)
return Term(type=LITERAL, value=str(val))
def _resolve_term(tmpl, solution):
"""
Resolve a triple pattern term. If it's a variable and bound in the
solution, return the bound Term. Otherwise return None (wildcard)
for variables, or convert concrete terms.
"""
if isinstance(tmpl, Variable):
name = str(tmpl)
if name in solution:
return solution[name]
return None
else:
return rdflib_term_to_term(tmpl)
async def _query_pattern(tc, s, p, o, user, collection, limit):
"""
Issue a streaming triple pattern query via TriplesClient.
Returns a list of Triple-like objects with s, p, o attributes.
"""
results = await tc.query(
s=s, p=p, o=o,
limit=limit,
user=user,
collection=collection,
)
return results
def _try_numeric(val):
"""Try to convert a value to a number, return None on failure."""
if val is None:
return None
if isinstance(val, (int, float)):
return val
if isinstance(val, Term) and val.type == LITERAL:
try:
if "." in val.value:
return float(val.value)
return int(val.value)
except (ValueError, TypeError):
return None
return None
# --- Handler registry ---
_HANDLERS = {
"SelectQuery": _eval_select_query,
"Project": _eval_project,
"BGP": _eval_bgp,
"Join": _eval_join,
"LeftJoin": _eval_left_join,
"Union": _eval_union,
"Filter": _eval_filter,
"Distinct": _eval_distinct,
"Reduced": _eval_reduced,
"OrderBy": _eval_order_by,
"Slice": _eval_slice,
"Extend": _eval_extend,
"Group": _eval_group,
"AggregateJoin": _eval_aggregate_join,
"Graph": _eval_graph,
"values": _eval_values,
"ToMultiSet": _eval_to_multiset,
}

@@ -0,0 +1,481 @@
"""
SPARQL FILTER expression evaluator.
Evaluates rdflib algebra expression nodes against a solution (variable
binding) to produce a value or boolean result.
"""
import re
import logging
import operator
from rdflib.term import Variable, URIRef, Literal, BNode
from rdflib.plugins.sparql.parserutils import CompValue
from ... schema import Term, IRI, LITERAL, BLANK
from . parser import rdflib_term_to_term
logger = logging.getLogger(__name__)
class ExpressionError(Exception):
"""Raised when a SPARQL expression cannot be evaluated."""
pass
def evaluate_expression(expr, solution):
"""
Evaluate a SPARQL expression against a solution binding.
Args:
expr: rdflib algebra expression node
solution: dict mapping variable names to Term values
Returns:
The result value (Term, bool, number, string, or None)
"""
if expr is None:
return True
# rdflib Variable
if isinstance(expr, Variable):
name = str(expr)
return solution.get(name)
# rdflib concrete terms
if isinstance(expr, URIRef):
return Term(type=IRI, iri=str(expr))
if isinstance(expr, Literal):
return rdflib_term_to_term(expr)
if isinstance(expr, BNode):
return Term(type=BLANK, id=str(expr))
# Boolean constants
if isinstance(expr, bool):
return expr
# Numeric constants
if isinstance(expr, (int, float)):
return expr
# String constants
if isinstance(expr, str):
return expr
# CompValue nodes from rdflib algebra
if isinstance(expr, CompValue):
return _evaluate_comp_value(expr, solution)
# List/tuple (e.g. function arguments)
if isinstance(expr, (list, tuple)):
return [evaluate_expression(e, solution) for e in expr]
logger.warning(f"Unknown expression type: {type(expr)}: {expr}")
return None
def _evaluate_comp_value(node, solution):
"""Evaluate a CompValue expression node."""
name = node.name
# Relational expressions: =, !=, <, >, <=, >=
if name == "RelationalExpression":
return _eval_relational(node, solution)
# Conditional AND / OR
if name == "ConditionalAndExpression":
return _eval_conditional_and(node, solution)
if name == "ConditionalOrExpression":
return _eval_conditional_or(node, solution)
# Unary NOT
if name == "UnaryNot":
val = evaluate_expression(node.expr, solution)
return not _effective_boolean(val)
# Unary plus/minus
if name == "UnaryPlus":
return _to_numeric(evaluate_expression(node.expr, solution))
if name == "UnaryMinus":
val = _to_numeric(evaluate_expression(node.expr, solution))
return -val if val is not None else None
# Arithmetic
if name == "AdditiveExpression":
return _eval_additive(node, solution)
if name == "MultiplicativeExpression":
return _eval_multiplicative(node, solution)
# SPARQL built-in functions
if name.startswith("Builtin_"):
return _eval_builtin(name, node, solution)
# Function call
if name == "Function":
return _eval_function(node, solution)
# Exists / NotExists
if name == "Builtin_EXISTS":
# EXISTS requires graph pattern evaluation - not handled here
logger.warning("EXISTS not supported in filter expressions")
return True
if name == "Builtin_NOTEXISTS":
logger.warning("NOT EXISTS not supported in filter expressions")
return True
# TrueFilter (used with OPTIONAL)
if name == "TrueFilter":
return True
# IN / NOT IN
if name == "Builtin_IN":
return _eval_in(node, solution)
if name == "Builtin_NOTIN":
return not _eval_in(node, solution)
logger.warning(f"Unknown CompValue expression: {name}")
return None
def _eval_relational(node, solution):
"""Evaluate a relational expression (=, !=, <, >, <=, >=)."""
left = evaluate_expression(node.expr, solution)
right = evaluate_expression(node.other, solution)
op = node.op
if left is None or right is None:
return False
left_cmp = _comparable_value(left)
right_cmp = _comparable_value(right)
ops = {
"=": operator.eq, "==": operator.eq,
"!=": operator.ne,
"<": operator.lt,
">": operator.gt,
"<=": operator.le,
">=": operator.ge,
}
op_fn = ops.get(str(op))
if op_fn is None:
logger.warning(f"Unknown relational operator: {op}")
return False
try:
return op_fn(left_cmp, right_cmp)
except TypeError:
return False
def _eval_conditional_and(node, solution):
"""Evaluate AND expression."""
result = _effective_boolean(evaluate_expression(node.expr, solution))
if not result:
return False
for other in node.other:
result = _effective_boolean(evaluate_expression(other, solution))
if not result:
return False
return True
def _eval_conditional_or(node, solution):
"""Evaluate OR expression."""
result = _effective_boolean(evaluate_expression(node.expr, solution))
if result:
return True
for other in node.other:
result = _effective_boolean(evaluate_expression(other, solution))
if result:
return True
return False
def _eval_additive(node, solution):
"""Evaluate additive expression (a + b - c ...)."""
result = _to_numeric(evaluate_expression(node.expr, solution))
if result is None:
return None
for op, operand in zip(node.op, node.other):
val = _to_numeric(evaluate_expression(operand, solution))
if val is None:
return None
if str(op) == "+":
result = result + val
elif str(op) == "-":
result = result - val
return result
def _eval_multiplicative(node, solution):
"""Evaluate multiplicative expression (a * b / c ...)."""
result = _to_numeric(evaluate_expression(node.expr, solution))
if result is None:
return None
for op, operand in zip(node.op, node.other):
val = _to_numeric(evaluate_expression(operand, solution))
if val is None:
return None
if str(op) == "*":
result = result * val
elif str(op) == "/":
if val == 0:
return None
result = result / val
return result
def _eval_builtin(name, node, solution):
"""Evaluate SPARQL built-in functions."""
builtin = name[len("Builtin_"):]
if builtin == "BOUND":
var_name = str(node.arg)
return var_name in solution and solution[var_name] is not None
if builtin == "isIRI" or builtin == "isURI":
val = evaluate_expression(node.arg, solution)
return isinstance(val, Term) and val.type == IRI
if builtin == "isLITERAL":
val = evaluate_expression(node.arg, solution)
return isinstance(val, Term) and val.type == LITERAL
if builtin == "isBLANK":
val = evaluate_expression(node.arg, solution)
return isinstance(val, Term) and val.type == BLANK
if builtin == "STR":
val = evaluate_expression(node.arg, solution)
return Term(type=LITERAL, value=_to_string(val))
if builtin == "LANG":
val = evaluate_expression(node.arg, solution)
if isinstance(val, Term) and val.type == LITERAL:
return Term(type=LITERAL, value=val.language or "")
return Term(type=LITERAL, value="")
if builtin == "DATATYPE":
val = evaluate_expression(node.arg, solution)
if isinstance(val, Term) and val.type == LITERAL and val.datatype:
return Term(type=IRI, iri=val.datatype)
return Term(type=IRI, iri="http://www.w3.org/2001/XMLSchema#string")
if builtin == "REGEX":
text = _to_string(evaluate_expression(node.text, solution))
pattern = _to_string(evaluate_expression(node.pattern, solution))
flags_str = ""
if hasattr(node, "flags") and node.flags is not None:
flags_str = _to_string(evaluate_expression(node.flags, solution))
re_flags = 0
if "i" in flags_str:
re_flags |= re.IGNORECASE
if "m" in flags_str:
re_flags |= re.MULTILINE
if "s" in flags_str:
re_flags |= re.DOTALL
try:
return bool(re.search(pattern, text, re_flags))
except re.error:
return False
if builtin == "STRLEN":
val = _to_string(evaluate_expression(node.arg, solution))
return len(val)
if builtin == "UCASE":
val = _to_string(evaluate_expression(node.arg, solution))
return Term(type=LITERAL, value=val.upper())
if builtin == "LCASE":
val = _to_string(evaluate_expression(node.arg, solution))
return Term(type=LITERAL, value=val.lower())
if builtin == "CONTAINS":
string = _to_string(evaluate_expression(node.arg1, solution))
pattern = _to_string(evaluate_expression(node.arg2, solution))
return pattern in string
if builtin == "STRSTARTS":
string = _to_string(evaluate_expression(node.arg1, solution))
prefix = _to_string(evaluate_expression(node.arg2, solution))
return string.startswith(prefix)
if builtin == "STRENDS":
string = _to_string(evaluate_expression(node.arg1, solution))
suffix = _to_string(evaluate_expression(node.arg2, solution))
return string.endswith(suffix)
if builtin == "CONCAT":
args = [_to_string(evaluate_expression(a, solution)) for a in node.arg]
return Term(type=LITERAL, value="".join(args))
if builtin == "IF":
cond = _effective_boolean(evaluate_expression(node.arg1, solution))
if cond:
return evaluate_expression(node.arg2, solution)
else:
return evaluate_expression(node.arg3, solution)
if builtin == "COALESCE":
for arg in node.arg:
val = evaluate_expression(arg, solution)
if val is not None:
return val
return None
if builtin == "sameTerm":
left = evaluate_expression(node.arg1, solution)
right = evaluate_expression(node.arg2, solution)
if not isinstance(left, Term) or not isinstance(right, Term):
return False
from . solutions import _term_key
return _term_key(left) == _term_key(right)
logger.warning(f"Unsupported built-in function: {builtin}")
return None
def _eval_function(node, solution):
"""Evaluate a SPARQL function call."""
# Cast functions (xsd:integer, xsd:string, etc.)
iri = str(node.iri) if hasattr(node, "iri") else ""
args = [evaluate_expression(a, solution) for a in node.expr]
xsd = "http://www.w3.org/2001/XMLSchema#"
if iri == xsd + "integer":
try:
return int(_to_numeric(args[0]))
except (TypeError, ValueError):
return None
elif iri == xsd + "decimal" or iri == xsd + "double" or iri == xsd + "float":
try:
return float(_to_numeric(args[0]))
except (TypeError, ValueError):
return None
elif iri == xsd + "string":
return Term(type=LITERAL, value=_to_string(args[0]))
elif iri == xsd + "boolean":
return _effective_boolean(args[0])
logger.warning(f"Unsupported function: {iri}")
return None
def _eval_in(node, solution):
"""Evaluate IN expression."""
val = evaluate_expression(node.expr, solution)
for item in node.other:
other = evaluate_expression(item, solution)
if _comparable_value(val) == _comparable_value(other):
return True
return False
# --- Value conversion helpers ---
def _effective_boolean(val):
"""Convert a value to its effective boolean value (EBV)."""
if isinstance(val, bool):
return val
if val is None:
return False
if isinstance(val, (int, float)):
return val != 0
if isinstance(val, str):
return len(val) > 0
if isinstance(val, Term):
if val.type == LITERAL:
v = val.value
if val.datatype == "http://www.w3.org/2001/XMLSchema#boolean":
return v.lower() in ("true", "1")
if val.datatype in (
"http://www.w3.org/2001/XMLSchema#integer",
"http://www.w3.org/2001/XMLSchema#decimal",
"http://www.w3.org/2001/XMLSchema#double",
"http://www.w3.org/2001/XMLSchema#float",
):
try:
return float(v) != 0
except ValueError:
return False
return len(v) > 0
return True
return bool(val)
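# Examples: _effective_boolean(0) -> False; _effective_boolean("") -> False;
# a LITERAL Term with value "false" and the xsd:boolean datatype -> False;
# any IRI or BLANK Term -> True.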
def _to_string(val):
"""Convert a value to a string."""
if val is None:
return ""
if isinstance(val, str):
return val
if isinstance(val, Term):
if val.type == IRI:
return val.iri
elif val.type == LITERAL:
return val.value
elif val.type == BLANK:
return val.id
return str(val)
def _to_numeric(val):
"""Convert a value to a number."""
if val is None:
return None
if isinstance(val, (int, float)):
return val
if isinstance(val, Term) and val.type == LITERAL:
try:
if "." in val.value:
return float(val.value)
return int(val.value)
except (ValueError, TypeError):
return None
if isinstance(val, str):
try:
if "." in val:
return float(val)
return int(val)
except (ValueError, TypeError):
return None
return None
def _comparable_value(val):
"""
Convert a value to a form suitable for comparison.
Returns a tuple (type, value) for consistent ordering.
"""
if val is None:
return (0, "")
if isinstance(val, bool):
return (1, val)
if isinstance(val, (int, float)):
return (2, val)
if isinstance(val, str):
return (3, val)
if isinstance(val, Term):
if val.type == IRI:
return (4, val.iri)
elif val.type == LITERAL:
# Try numeric comparison for numeric types
num = _to_numeric(val)
if num is not None:
return (2, num)
return (3, val.value)
elif val.type == BLANK:
return (5, val.id)
return (6, str(val))
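# A consequence of the (type, value) scheme: None < booleans < numbers
# < strings and plain literals < IRIs < blank nodes, and a numeric
# literal such as "42"^^xsd:integer compares equal to the plain number
# 42, since both normalise to (2, 42).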

@@ -0,0 +1,139 @@
"""
SPARQL parser wrapping rdflib's SPARQL 1.1 parser and algebra compiler.
Parses a SPARQL query string into an algebra tree for evaluation.
"""
import logging
from rdflib.plugins.sparql import prepareQuery
from rdflib.plugins.sparql.parserutils import CompValue
from rdflib.term import Variable, URIRef, Literal, BNode
from ... schema import Term, Triple, IRI, LITERAL, BLANK
logger = logging.getLogger(__name__)
class ParseError(Exception):
"""Raised when a SPARQL query cannot be parsed."""
pass
class ParsedQuery:
"""Result of parsing a SPARQL query string."""
def __init__(self, algebra, query_type, variables=None):
self.algebra = algebra
self.query_type = query_type # "select", "ask", "construct", "describe"
self.variables = variables or [] # projected variable names (SELECT)
def rdflib_term_to_term(t):
"""Convert an rdflib term (URIRef, Literal, BNode) to a schema Term."""
if isinstance(t, URIRef):
return Term(type=IRI, iri=str(t))
elif isinstance(t, Literal):
term = Term(type=LITERAL, value=str(t))
if t.datatype:
term.datatype = str(t.datatype)
if t.language:
term.language = t.language
return term
elif isinstance(t, BNode):
return Term(type=BLANK, id=str(t))
else:
return Term(type=LITERAL, value=str(t))
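# Illustrative round trip (XSD from rdflib.namespace):
#     rdflib_term_to_term(Literal("42", datatype=XSD.integer))
#     -> Term(type=LITERAL, value="42",
#             datatype="http://www.w3.org/2001/XMLSchema#integer")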
def term_to_rdflib(t):
"""Convert a schema Term to an rdflib term."""
if t.type == IRI:
return URIRef(t.iri)
elif t.type == LITERAL:
kwargs = {}
if t.datatype:
kwargs["datatype"] = URIRef(t.datatype)
if t.language:
kwargs["lang"] = t.language
return Literal(t.value, **kwargs)
elif t.type == BLANK:
return BNode(t.id)
else:
return Literal(t.value)
def parse_sparql(query_string):
"""
Parse a SPARQL query string into a ParsedQuery.
Args:
query_string: SPARQL 1.1 query string
Returns:
ParsedQuery with algebra tree, query type, and projected variables
Raises:
ParseError: if the query cannot be parsed
"""
try:
prepared = prepareQuery(query_string)
except Exception as e:
raise ParseError(f"SPARQL parse error: {e}") from e
algebra = prepared.algebra
# Determine query type and extract variables
query_type = _detect_query_type(algebra)
variables = _extract_variables(algebra, query_type)
return ParsedQuery(
algebra=algebra,
query_type=query_type,
variables=variables,
)
def _detect_query_type(algebra):
"""Detect the SPARQL query type from the algebra root."""
name = algebra.name
if name == "SelectQuery":
return "select"
elif name == "AskQuery":
return "ask"
elif name == "ConstructQuery":
return "construct"
elif name == "DescribeQuery":
return "describe"
# The top-level algebra node may be a modifier (Project, Slice, etc.)
# wrapping the actual query. Check for common patterns.
if name in ("Project", "Distinct", "Reduced", "OrderBy", "Slice"):
return "select"
logger.warning(f"Unknown algebra root type: {name}, assuming select")
return "select"
def _extract_variables(algebra, query_type):
"""Extract projected variable names from the algebra."""
if query_type != "select":
return []
# For SELECT queries, the Project node has PV (projected variables)
if hasattr(algebra, "PV"):
return [str(v) for v in algebra.PV]
# Walk down through modifiers to find Project
node = algebra
while hasattr(node, "p"):
node = node.p
if hasattr(node, "PV"):
return [str(v) for v in node.PV]
# Fallback: collect all variables from the algebra
if hasattr(algebra, "_vars"):
return [str(v) for v in algebra._vars]
return []

@@ -0,0 +1,230 @@
"""
SPARQL query service. Accepts SPARQL queries, decomposes them into triple
pattern lookups via the triples query pub/sub interface, performs in-memory
joins/filters/projections, and returns SPARQL result bindings.
"""
import logging
from ... schema import SparqlQueryRequest, SparqlQueryResponse
from ... schema import SparqlBinding, Error, Term, Triple
from ... base import FlowProcessor, ConsumerSpec, ProducerSpec
from ... base import TriplesClientSpec
from . parser import parse_sparql, ParseError
from . algebra import evaluate, EvaluationError
logger = logging.getLogger(__name__)
default_ident = "sparql-query"
default_concurrency = 10
class Processor(FlowProcessor):
def __init__(self, **params):
id = params.get("id", default_ident)
concurrency = params.get("concurrency", default_concurrency)
super(Processor, self).__init__(
**params | {
"id": id,
"concurrency": concurrency,
}
)
self.register_specification(
ConsumerSpec(
name="request",
schema=SparqlQueryRequest,
handler=self.on_message,
concurrency=concurrency,
)
)
self.register_specification(
ProducerSpec(
name="response",
schema=SparqlQueryResponse,
)
)
self.register_specification(
TriplesClientSpec(
request_name="triples-request",
response_name="triples-response",
)
)
async def on_message(self, msg, consumer, flow):
    # Default the correlation id so the error path below never hits an
    # unbound name if message decoding or property extraction fails
    id = None
    try:
        request = msg.value()
        id = msg.properties()["id"]
        logger.debug(f"Handling SPARQL query request {id}...")
        response = await self.execute_sparql(request, flow)
        await flow("response").send(response, properties={"id": id})
        logger.debug("SPARQL query request completed")
    except Exception as e:
        logger.error(
            f"Exception in SPARQL query service: {e}", exc_info=True
        )
        r = SparqlQueryResponse(
            error=Error(
                type="sparql-query-error",
                message=str(e),
            ),
        )
        await flow("response").send(r, properties={"id": id})
async def execute_sparql(self, request, flow):
"""Parse and evaluate a SPARQL query."""
# Parse the SPARQL query
try:
parsed = parse_sparql(request.query)
except ParseError as e:
return SparqlQueryResponse(
error=Error(
type="sparql-parse-error",
message=str(e),
),
)
# Get the triples client from the flow
triples_client = flow("triples-request")
# Evaluate the algebra
try:
solutions = await evaluate(
parsed.algebra,
triples_client,
user=request.user or "trustgraph",
collection=request.collection or "default",
limit=request.limit or 10000,
)
except EvaluationError as e:
return SparqlQueryResponse(
error=Error(
type="sparql-evaluation-error",
message=str(e),
),
)
# Build response based on query type
if parsed.query_type == "select":
return self._build_select_response(parsed, solutions)
elif parsed.query_type == "ask":
return self._build_ask_response(solutions)
elif parsed.query_type == "construct":
return self._build_construct_response(parsed, solutions)
elif parsed.query_type == "describe":
return self._build_describe_response(parsed, solutions)
else:
return SparqlQueryResponse(
error=Error(
type="sparql-unsupported",
message=f"Unsupported query type: {parsed.query_type}",
),
)
def _build_select_response(self, parsed, solutions):
"""Build response for SELECT queries."""
variables = parsed.variables
bindings = []
for sol in solutions:
values = [sol.get(v) for v in variables]
bindings.append(SparqlBinding(values=values))
return SparqlQueryResponse(
query_type="select",
variables=variables,
bindings=bindings,
)
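    # Illustrative shape: a SELECT ?name query with one solution binding
    # ?name to "Alice" yields variables=["name"] and
    # bindings=[SparqlBinding(values=[Term(type=LITERAL, value="Alice")])]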
def _build_ask_response(self, solutions):
"""Build response for ASK queries."""
return SparqlQueryResponse(
query_type="ask",
ask_result=len(solutions) > 0,
)
def _build_construct_response(self, parsed, solutions):
"""Build response for CONSTRUCT queries."""
# CONSTRUCT template is in the algebra
template = []
if hasattr(parsed.algebra, "template"):
template = parsed.algebra.template
triples = []
seen = set()
for sol in solutions:
    for s_tmpl, p_tmpl, o_tmpl in template:
        s = self._resolve_construct_term(s_tmpl, sol)
        p = self._resolve_construct_term(p_tmpl, sol)
        o = self._resolve_construct_term(o_tmpl, sol)
        if s is not None and p is not None and o is not None:
            # Dedupe on term identity, including blank node ids
            # (blank terms have neither iri nor value)
            key = (
                s.type, s.iri or s.value or s.id,
                p.type, p.iri or p.value,
                o.type, o.iri or o.value or o.id,
            )
            if key not in seen:
                seen.add(key)
                triples.append(Triple(s=s, p=p, o=o))
return SparqlQueryResponse(
query_type="construct",
triples=triples,
)
def _build_describe_response(self, parsed, solutions):
"""Build response for DESCRIBE queries."""
# DESCRIBE returns all triples about the described resources
# For now, return empty - would need additional triples queries
return SparqlQueryResponse(
query_type="describe",
triples=[],
)
def _resolve_construct_term(self, tmpl, solution):
"""Resolve a CONSTRUCT template term."""
from rdflib.term import Variable
from . parser import rdflib_term_to_term
if isinstance(tmpl, Variable):
return solution.get(str(tmpl))
else:
return rdflib_term_to_term(tmpl)
@staticmethod
def add_args(parser):
FlowProcessor.add_args(parser)
parser.add_argument(
'-c', '--concurrency',
type=int,
default=default_concurrency,
help=f'Number of concurrent requests '
f'(default: {default_concurrency})'
)
def run():
Processor.launch(default_ident, __doc__)

@@ -0,0 +1,248 @@
"""
Solution sequence operations for SPARQL evaluation.
A solution is a dict mapping variable names (str) to Term values.
A solution sequence is a list of solutions.
"""
import logging
from collections import defaultdict
from ... schema import Term, IRI, LITERAL, BLANK
logger = logging.getLogger(__name__)
def _term_key(term):
"""Create a hashable key from a Term for join/distinct operations."""
if term is None:
return None
if term.type == IRI:
return ("i", term.iri)
elif term.type == LITERAL:
return ("l", term.value, term.datatype, term.language)
elif term.type == BLANK:
return ("b", term.id)
else:
return ("?", str(term))
def _solution_key(solution, variables):
"""Create a hashable key from a solution for the given variables."""
return tuple(_term_key(solution.get(v)) for v in variables)
def _terms_equal(a, b):
"""Check if two Terms are equal."""
if a is None and b is None:
return True
if a is None or b is None:
return False
return _term_key(a) == _term_key(b)
def _compatible(sol_a, sol_b):
"""Check if two solutions are compatible (agree on shared variables)."""
shared = set(sol_a.keys()) & set(sol_b.keys())
return all(_terms_equal(sol_a[v], sol_b[v]) for v in shared)
def _merge(sol_a, sol_b):
"""Merge two compatible solutions into one."""
result = dict(sol_a)
result.update(sol_b)
return result
def hash_join(left, right):
"""
Inner join two solution sequences on shared variables.
Uses hash join for efficiency.
"""
if not left or not right:
return []
left_vars = set()
for sol in left:
left_vars.update(sol.keys())
right_vars = set()
for sol in right:
right_vars.update(sol.keys())
shared = sorted(left_vars & right_vars)
if not shared:
# Cross product
return [_merge(l, r) for l in left for r in right]
# Build hash table on the smaller side
if len(left) <= len(right):
index = defaultdict(list)
for sol in left:
key = _solution_key(sol, shared)
index[key].append(sol)
results = []
for sol_r in right:
key = _solution_key(sol_r, shared)
for sol_l in index.get(key, []):
results.append(_merge(sol_l, sol_r))
return results
else:
index = defaultdict(list)
for sol in right:
key = _solution_key(sol, shared)
index[key].append(sol)
results = []
for sol_l in left:
key = _solution_key(sol_l, shared)
for sol_r in index.get(key, []):
results.append(_merge(sol_l, sol_r))
return results
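# Illustrative example, with A/B/C standing for Term values:
#     hash_join([{"x": A, "y": B}], [{"x": A, "z": C}])
# joins on the shared variable "x" and yields [{"x": A, "y": B, "z": C}];
# with no shared variables the result is the full cross product.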
def left_join(left, right, filter_fn=None):
"""
Left outer join (OPTIONAL semantics).
Every left solution is preserved. If it joins with right solutions
(and passes the optional filter), the merged solutions are included.
Otherwise the original left solution is kept.
"""
if not left:
return []
if not right:
return list(left)
right_vars = set()
for sol in right:
right_vars.update(sol.keys())
left_vars = set()
for sol in left:
left_vars.update(sol.keys())
shared = sorted(left_vars & right_vars)
# Build hash table on right side
index = defaultdict(list)
for sol in right:
key = _solution_key(sol, shared) if shared else ()
index[key].append(sol)
results = []
for sol_l in left:
key = _solution_key(sol_l, shared) if shared else ()
matches = index.get(key, [])
matched = False
for sol_r in matches:
merged = _merge(sol_l, sol_r)
if filter_fn is None or filter_fn(merged):
results.append(merged)
matched = True
if not matched:
results.append(dict(sol_l))
return results
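# Illustrative example (OPTIONAL semantics), A/B/C standing for Terms:
#     left_join([{"x": A}, {"x": B}], [{"x": A, "y": C}])
#     -> [{"x": A, "y": C}, {"x": B}]
# The unmatched left solution {"x": B} is preserved unextended.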
def union(left, right):
"""Union two solution sequences (concatenation)."""
return list(left) + list(right)
def project(solutions, variables):
"""Keep only the specified variables in each solution."""
return [
{v: sol[v] for v in variables if v in sol}
for sol in solutions
]
def distinct(solutions):
"""Remove duplicate solutions."""
seen = set()
results = []
for sol in solutions:
key = tuple(sorted(
(k, _term_key(v)) for k, v in sol.items()
))
if key not in seen:
seen.add(key)
results.append(sol)
return results
def order_by(solutions, key_fns):
"""
Sort solutions by the given key functions.
key_fns is a list of (fn, ascending) tuples where fn extracts
a comparable value from a solution.
"""
if not key_fns:
return solutions
def sort_key(sol):
keys = []
for fn, ascending in key_fns:
val = fn(sol)
# Convert to comparable form
if val is None:
comparable = ("", "")
elif isinstance(val, Term):
comparable = _term_key(val)
else:
comparable = ("v", str(val))
keys.append(comparable)
return keys
    # Fast paths: all-ascending sorts directly; all-descending sorts
    # with reverse=True (both stable). Mixed directions need a pairwise
    # comparator, handled by _mixed_sort.
    if all(asc for _, asc in key_fns):
        return sorted(solutions, key=sort_key)
    if all(not asc for _, asc in key_fns):
        return sorted(solutions, key=sort_key, reverse=True)
    return _mixed_sort(solutions, key_fns)
def _mixed_sort(solutions, key_fns):
    """Sort with mixed ascending/descending keys."""
    import functools
    def _key(val):
        # Normalise a value to the same comparable form used by order_by
        if val is None:
            return ("", "")
        if isinstance(val, Term):
            return _term_key(val)
        return ("v", str(val))
    def compare(a, b):
        for fn, ascending in key_fns:
            ka = _key(fn(a))
            kb = _key(fn(b))
            if ka < kb:
                return -1 if ascending else 1
            if ka > kb:
                return 1 if ascending else -1
        return 0
    return sorted(solutions, key=functools.cmp_to_key(compare))
def slice_solutions(solutions, offset=0, limit=None):
"""Apply OFFSET and LIMIT to a solution sequence."""
if offset:
solutions = solutions[offset:]
if limit is not None:
solutions = solutions[:limit]
return solutions