mirror of
https://github.com/trustgraph-ai/trustgraph.git
synced 2026-06-16 10:25:13 +02:00
Additional user fixes and test fixes
This commit is contained in:
parent
db05427d0e
commit
7f0f79dd15
62 changed files with 1078 additions and 1315 deletions
|
|
@ -6,9 +6,9 @@ import re
|
|||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
def make_safe_collection_name(user, collection, prefix):
|
||||
def make_safe_collection_name(workspace, collection, prefix):
|
||||
"""
|
||||
Create a safe Milvus collection name from user/collection parameters.
|
||||
Create a safe Milvus collection name from workspace/collection parameters.
|
||||
Milvus only allows letters, numbers, and underscores.
|
||||
"""
|
||||
def sanitize(s):
|
||||
|
|
@ -23,10 +23,10 @@ def make_safe_collection_name(user, collection, prefix):
|
|||
safe = 'default'
|
||||
return safe
|
||||
|
||||
safe_user = sanitize(user)
|
||||
safe_workspace = sanitize(workspace)
|
||||
safe_collection = sanitize(collection)
|
||||
|
||||
return f"{prefix}_{safe_user}_{safe_collection}"
|
||||
return f"{prefix}_{safe_workspace}_{safe_collection}"
|
||||
|
||||
class DocVectors:
|
||||
|
||||
|
|
@ -49,26 +49,26 @@ class DocVectors:
|
|||
self.next_reload = time.time() + self.reload_time
|
||||
logger.debug(f"Reload at {self.next_reload}")
|
||||
|
||||
def collection_exists(self, user, collection):
|
||||
def collection_exists(self, workspace, collection):
|
||||
"""
|
||||
Check if any collection exists for this user/collection combination.
|
||||
Check if any collection exists for this workspace/collection combination.
|
||||
Since collections are dimension-specific, this checks if ANY dimension variant exists.
|
||||
"""
|
||||
base_name = make_safe_collection_name(user, collection, self.prefix)
|
||||
base_name = make_safe_collection_name(workspace, collection, self.prefix)
|
||||
prefix = f"{base_name}_"
|
||||
all_collections = self.client.list_collections()
|
||||
return any(coll.startswith(prefix) for coll in all_collections)
|
||||
|
||||
def create_collection(self, user, collection, dimension=384):
|
||||
def create_collection(self, workspace, collection, dimension=384):
|
||||
"""
|
||||
No-op for explicit collection creation.
|
||||
Collections are created lazily on first insert with actual dimension.
|
||||
"""
|
||||
logger.info(f"Collection creation requested for {user}/{collection} - will be created lazily on first insert")
|
||||
logger.info(f"Collection creation requested for {workspace}/{collection} - will be created lazily on first insert")
|
||||
|
||||
def init_collection(self, dimension, user, collection):
|
||||
def init_collection(self, dimension, workspace, collection):
|
||||
|
||||
base_name = make_safe_collection_name(user, collection, self.prefix)
|
||||
base_name = make_safe_collection_name(workspace, collection, self.prefix)
|
||||
collection_name = f"{base_name}_{dimension}"
|
||||
|
||||
pkey_field = FieldSchema(
|
||||
|
|
@ -116,15 +116,15 @@ class DocVectors:
|
|||
index_params=index_params
|
||||
)
|
||||
|
||||
self.collections[(dimension, user, collection)] = collection_name
|
||||
self.collections[(dimension, workspace, collection)] = collection_name
|
||||
logger.info(f"Created Milvus collection {collection_name} with dimension {dimension}")
|
||||
|
||||
def insert(self, embeds, chunk_id, user, collection):
|
||||
def insert(self, embeds, chunk_id, workspace, collection):
|
||||
|
||||
dim = len(embeds)
|
||||
|
||||
if (dim, user, collection) not in self.collections:
|
||||
self.init_collection(dim, user, collection)
|
||||
if (dim, workspace, collection) not in self.collections:
|
||||
self.init_collection(dim, workspace, collection)
|
||||
|
||||
data = [
|
||||
{
|
||||
|
|
@ -134,25 +134,25 @@ class DocVectors:
|
|||
]
|
||||
|
||||
self.client.insert(
|
||||
collection_name=self.collections[(dim, user, collection)],
|
||||
collection_name=self.collections[(dim, workspace, collection)],
|
||||
data=data
|
||||
)
|
||||
|
||||
def search(self, embeds, user, collection, fields=["chunk_id"], limit=10):
|
||||
def search(self, embeds, workspace, collection, fields=["chunk_id"], limit=10):
|
||||
|
||||
dim = len(embeds)
|
||||
|
||||
# Check if collection exists - return empty if not
|
||||
if (dim, user, collection) not in self.collections:
|
||||
base_name = make_safe_collection_name(user, collection, self.prefix)
|
||||
if (dim, workspace, collection) not in self.collections:
|
||||
base_name = make_safe_collection_name(workspace, collection, self.prefix)
|
||||
collection_name = f"{base_name}_{dim}"
|
||||
if not self.client.has_collection(collection_name):
|
||||
logger.info(f"Collection {collection_name} does not exist, returning empty results")
|
||||
return []
|
||||
# Collection exists but not in cache, add it
|
||||
self.collections[(dim, user, collection)] = collection_name
|
||||
self.collections[(dim, workspace, collection)] = collection_name
|
||||
|
||||
coll = self.collections[(dim, user, collection)]
|
||||
coll = self.collections[(dim, workspace, collection)]
|
||||
|
||||
logger.debug("Loading...")
|
||||
self.client.load_collection(
|
||||
|
|
@ -181,12 +181,12 @@ class DocVectors:
|
|||
|
||||
return res
|
||||
|
||||
def delete_collection(self, user, collection):
|
||||
def delete_collection(self, workspace, collection):
|
||||
"""
|
||||
Delete all dimension variants of the collection for the given user/collection.
|
||||
Delete all dimension variants of the collection for the given workspace/collection.
|
||||
Since collections are created with dimension suffixes, we need to find and delete all.
|
||||
"""
|
||||
base_name = make_safe_collection_name(user, collection, self.prefix)
|
||||
base_name = make_safe_collection_name(workspace, collection, self.prefix)
|
||||
prefix = f"{base_name}_"
|
||||
|
||||
# Get all collections and filter for matches
|
||||
|
|
@ -199,10 +199,10 @@ class DocVectors:
|
|||
for collection_name in matching_collections:
|
||||
self.client.drop_collection(collection_name)
|
||||
logger.info(f"Deleted Milvus collection: {collection_name}")
|
||||
logger.info(f"Deleted {len(matching_collections)} collection(s) for {user}/{collection}")
|
||||
logger.info(f"Deleted {len(matching_collections)} collection(s) for {workspace}/{collection}")
|
||||
|
||||
# Remove from our local cache
|
||||
keys_to_remove = [key for key in self.collections.keys() if key[1] == user and key[2] == collection]
|
||||
keys_to_remove = [key for key in self.collections.keys() if key[1] == workspace and key[2] == collection]
|
||||
for key in keys_to_remove:
|
||||
del self.collections[key]
|
||||
|
||||
|
|
|
|||
|
|
@ -6,9 +6,9 @@ import re
|
|||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
def make_safe_collection_name(user, collection, prefix):
|
||||
def make_safe_collection_name(workspace, collection, prefix):
|
||||
"""
|
||||
Create a safe Milvus collection name from user/collection parameters.
|
||||
Create a safe Milvus collection name from workspace/collection parameters.
|
||||
Milvus only allows letters, numbers, and underscores.
|
||||
"""
|
||||
def sanitize(s):
|
||||
|
|
@ -23,10 +23,10 @@ def make_safe_collection_name(user, collection, prefix):
|
|||
safe = 'default'
|
||||
return safe
|
||||
|
||||
safe_user = sanitize(user)
|
||||
safe_workspace = sanitize(workspace)
|
||||
safe_collection = sanitize(collection)
|
||||
|
||||
return f"{prefix}_{safe_user}_{safe_collection}"
|
||||
return f"{prefix}_{safe_workspace}_{safe_collection}"
|
||||
|
||||
class EntityVectors:
|
||||
|
||||
|
|
@ -49,26 +49,26 @@ class EntityVectors:
|
|||
self.next_reload = time.time() + self.reload_time
|
||||
logger.debug(f"Reload at {self.next_reload}")
|
||||
|
||||
def collection_exists(self, user, collection):
|
||||
def collection_exists(self, workspace, collection):
|
||||
"""
|
||||
Check if any collection exists for this user/collection combination.
|
||||
Check if any collection exists for this workspace/collection combination.
|
||||
Since collections are dimension-specific, this checks if ANY dimension variant exists.
|
||||
"""
|
||||
base_name = make_safe_collection_name(user, collection, self.prefix)
|
||||
base_name = make_safe_collection_name(workspace, collection, self.prefix)
|
||||
prefix = f"{base_name}_"
|
||||
all_collections = self.client.list_collections()
|
||||
return any(coll.startswith(prefix) for coll in all_collections)
|
||||
|
||||
def create_collection(self, user, collection, dimension=384):
|
||||
def create_collection(self, workspace, collection, dimension=384):
|
||||
"""
|
||||
No-op for explicit collection creation.
|
||||
Collections are created lazily on first insert with actual dimension.
|
||||
"""
|
||||
logger.info(f"Collection creation requested for {user}/{collection} - will be created lazily on first insert")
|
||||
logger.info(f"Collection creation requested for {workspace}/{collection} - will be created lazily on first insert")
|
||||
|
||||
def init_collection(self, dimension, user, collection):
|
||||
def init_collection(self, dimension, workspace, collection):
|
||||
|
||||
base_name = make_safe_collection_name(user, collection, self.prefix)
|
||||
base_name = make_safe_collection_name(workspace, collection, self.prefix)
|
||||
collection_name = f"{base_name}_{dimension}"
|
||||
|
||||
pkey_field = FieldSchema(
|
||||
|
|
@ -122,15 +122,15 @@ class EntityVectors:
|
|||
index_params=index_params
|
||||
)
|
||||
|
||||
self.collections[(dimension, user, collection)] = collection_name
|
||||
self.collections[(dimension, workspace, collection)] = collection_name
|
||||
logger.info(f"Created Milvus collection {collection_name} with dimension {dimension}")
|
||||
|
||||
def insert(self, embeds, entity, user, collection, chunk_id=""):
|
||||
def insert(self, embeds, entity, workspace, collection, chunk_id=""):
|
||||
|
||||
dim = len(embeds)
|
||||
|
||||
if (dim, user, collection) not in self.collections:
|
||||
self.init_collection(dim, user, collection)
|
||||
if (dim, workspace, collection) not in self.collections:
|
||||
self.init_collection(dim, workspace, collection)
|
||||
|
||||
data = [
|
||||
{
|
||||
|
|
@ -141,25 +141,25 @@ class EntityVectors:
|
|||
]
|
||||
|
||||
self.client.insert(
|
||||
collection_name=self.collections[(dim, user, collection)],
|
||||
collection_name=self.collections[(dim, workspace, collection)],
|
||||
data=data
|
||||
)
|
||||
|
||||
def search(self, embeds, user, collection, fields=["entity"], limit=10):
|
||||
def search(self, embeds, workspace, collection, fields=["entity"], limit=10):
|
||||
|
||||
dim = len(embeds)
|
||||
|
||||
# Check if collection exists - return empty if not
|
||||
if (dim, user, collection) not in self.collections:
|
||||
base_name = make_safe_collection_name(user, collection, self.prefix)
|
||||
if (dim, workspace, collection) not in self.collections:
|
||||
base_name = make_safe_collection_name(workspace, collection, self.prefix)
|
||||
collection_name = f"{base_name}_{dim}"
|
||||
if not self.client.has_collection(collection_name):
|
||||
logger.info(f"Collection {collection_name} does not exist, returning empty results")
|
||||
return []
|
||||
# Collection exists but not in cache, add it
|
||||
self.collections[(dim, user, collection)] = collection_name
|
||||
self.collections[(dim, workspace, collection)] = collection_name
|
||||
|
||||
coll = self.collections[(dim, user, collection)]
|
||||
coll = self.collections[(dim, workspace, collection)]
|
||||
|
||||
logger.debug("Loading...")
|
||||
self.client.load_collection(
|
||||
|
|
@ -188,12 +188,12 @@ class EntityVectors:
|
|||
|
||||
return res
|
||||
|
||||
def delete_collection(self, user, collection):
|
||||
def delete_collection(self, workspace, collection):
|
||||
"""
|
||||
Delete all dimension variants of the collection for the given user/collection.
|
||||
Delete all dimension variants of the collection for the given workspace/collection.
|
||||
Since collections are created with dimension suffixes, we need to find and delete all.
|
||||
"""
|
||||
base_name = make_safe_collection_name(user, collection, self.prefix)
|
||||
base_name = make_safe_collection_name(workspace, collection, self.prefix)
|
||||
prefix = f"{base_name}_"
|
||||
|
||||
# Get all collections and filter for matches
|
||||
|
|
@ -206,10 +206,10 @@ class EntityVectors:
|
|||
for collection_name in matching_collections:
|
||||
self.client.drop_collection(collection_name)
|
||||
logger.info(f"Deleted Milvus collection: {collection_name}")
|
||||
logger.info(f"Deleted {len(matching_collections)} collection(s) for {user}/{collection}")
|
||||
logger.info(f"Deleted {len(matching_collections)} collection(s) for {workspace}/{collection}")
|
||||
|
||||
# Remove from our local cache
|
||||
keys_to_remove = [key for key in self.collections.keys() if key[1] == user and key[2] == collection]
|
||||
keys_to_remove = [key for key in self.collections.keys() if key[1] == workspace and key[2] == collection]
|
||||
for key in keys_to_remove:
|
||||
del self.collections[key]
|
||||
|
||||
|
|
|
|||
|
|
@ -70,7 +70,7 @@ class GraphQLSchemaBuilder:
|
|||
Build the GraphQL schema with the provided query callback.
|
||||
|
||||
The query callback will be invoked when resolving queries, with:
|
||||
- user: str
|
||||
- workspace: str
|
||||
- collection: str
|
||||
- schema_name: str
|
||||
- row_schema: RowSchema
|
||||
|
|
@ -228,7 +228,7 @@ class GraphQLSchemaBuilder:
|
|||
limit: Optional[int] = 100
|
||||
) -> List[graphql_type]:
|
||||
# Get context values
|
||||
user = info.context["user"]
|
||||
workspace = info.context["workspace"]
|
||||
collection = info.context["collection"]
|
||||
|
||||
# Parse the where clause
|
||||
|
|
@ -236,7 +236,7 @@ class GraphQLSchemaBuilder:
|
|||
|
||||
# Call the query backend
|
||||
results = await query_callback(
|
||||
user, collection, schema_name, row_schema,
|
||||
workspace, collection, schema_name, row_schema,
|
||||
filters, limit, order_by, direction
|
||||
)
|
||||
|
||||
|
|
|
|||
|
|
@ -167,7 +167,7 @@ class QueryExplainer:
|
|||
question_components, query_results, processing_metadata
|
||||
)
|
||||
|
||||
# Generate user-friendly explanation
|
||||
# Generate user-friendly explanation
|
||||
user_friendly_explanation = self._generate_user_friendly_explanation(
|
||||
question, question_components, ontology_subsets, final_answer
|
||||
)
|
||||
|
|
@ -503,7 +503,7 @@ class QueryExplainer:
|
|||
question_components: QuestionComponents,
|
||||
ontology_subsets: List[QueryOntologySubset],
|
||||
final_answer: str) -> str:
|
||||
"""Generate user-friendly explanation of the process."""
|
||||
"""Generate user-friendly explanation of the process."""
|
||||
explanation_parts = []
|
||||
|
||||
# Introduction
|
||||
|
|
|
|||
|
|
@ -27,7 +27,7 @@ logger = logging.getLogger(__name__)
|
|||
|
||||
@dataclass
|
||||
class QueryRequest:
|
||||
"""Query request from user."""
|
||||
"""Query request from user."""
|
||||
question: str
|
||||
context: Optional[str] = None
|
||||
ontology_hint: Optional[str] = None
|
||||
|
|
|
|||
|
|
@ -1,6 +1,6 @@
|
|||
"""
|
||||
Question analyzer for ontology-sensitive query system.
|
||||
Decomposes user questions into semantic components.
|
||||
Decomposes user questions into semantic components.
|
||||
"""
|
||||
|
||||
import logging
|
||||
|
|
|
|||
|
|
@ -1,7 +1,7 @@
|
|||
"""
|
||||
Row embeddings query service for Qdrant.
|
||||
|
||||
Input is query vectors plus user/collection/schema context.
|
||||
Input is query vectors plus workspace/collection/schema context.
|
||||
Output is matching row index information (index_name, index_value) for
|
||||
use in subsequent Cassandra lookups.
|
||||
"""
|
||||
|
|
@ -70,10 +70,10 @@ class Processor(FlowProcessor):
|
|||
safe_name = 'r_' + safe_name
|
||||
return safe_name.lower()
|
||||
|
||||
def find_collection(self, user: str, collection: str, schema_name: str) -> Optional[str]:
|
||||
"""Find the Qdrant collection for a given user/collection/schema"""
|
||||
def find_collection(self, workspace: str, collection: str, schema_name: str) -> Optional[str]:
|
||||
"""Find the Qdrant collection for a given workspace/collection/schema"""
|
||||
prefix = (
|
||||
f"rows_{self.sanitize_name(user)}_"
|
||||
f"rows_{self.sanitize_name(workspace)}_"
|
||||
f"{self.sanitize_name(collection)}_{self.sanitize_name(schema_name)}_"
|
||||
)
|
||||
|
||||
|
|
@ -163,7 +163,7 @@ class Processor(FlowProcessor):
|
|||
|
||||
logger.debug(
|
||||
f"Handling row embeddings query for "
|
||||
f"{request.user}/{request.collection}/{request.schema_name}..."
|
||||
f"{flow.workspace}/{request.collection}/{request.schema_name}..."
|
||||
)
|
||||
|
||||
# Execute query
|
||||
|
|
|
|||
|
|
@ -238,7 +238,7 @@ class Processor(FlowProcessor):
|
|||
|
||||
async def query_cassandra(
|
||||
self,
|
||||
user: str,
|
||||
workspace: str,
|
||||
collection: str,
|
||||
schema_name: str,
|
||||
row_schema: RowSchema,
|
||||
|
|
@ -256,7 +256,7 @@ class Processor(FlowProcessor):
|
|||
# Connect if needed
|
||||
self.connect_cassandra()
|
||||
|
||||
safe_keyspace = self.sanitize_name(user)
|
||||
safe_keyspace = self.sanitize_name(workspace)
|
||||
|
||||
# Try to find an index that matches the filters
|
||||
index_match = self.find_matching_index(row_schema, filters)
|
||||
|
|
@ -409,7 +409,6 @@ class Processor(FlowProcessor):
|
|||
query: str,
|
||||
variables: Dict[str, Any],
|
||||
operation_name: Optional[str],
|
||||
user: str,
|
||||
collection: str
|
||||
) -> Dict[str, Any]:
|
||||
"""Execute a GraphQL query against the workspace's schema"""
|
||||
|
|
@ -424,7 +423,7 @@ class Processor(FlowProcessor):
|
|||
# Create context for the query
|
||||
context = {
|
||||
"processor": self,
|
||||
"user": user,
|
||||
"workspace": workspace,
|
||||
"collection": collection
|
||||
}
|
||||
|
||||
|
|
@ -479,7 +478,6 @@ class Processor(FlowProcessor):
|
|||
query=request.query,
|
||||
variables=dict(request.variables) if request.variables else {},
|
||||
operation_name=request.operation_name,
|
||||
user=request.user,
|
||||
collection=request.collection
|
||||
)
|
||||
|
||||
|
|
|
|||
|
|
@ -30,14 +30,14 @@ class EvaluationError(Exception):
|
|||
pass
|
||||
|
||||
|
||||
async def evaluate(node, triples_client, user, collection, limit=10000):
|
||||
async def evaluate(node, triples_client, workspace, collection, limit=10000):
|
||||
"""
|
||||
Evaluate a SPARQL algebra node.
|
||||
|
||||
Args:
|
||||
node: rdflib CompValue algebra node
|
||||
triples_client: TriplesClient instance for triple pattern queries
|
||||
user: user/keyspace identifier
|
||||
workspace: workspace/keyspace identifier
|
||||
collection: collection identifier
|
||||
limit: safety limit on results
|
||||
|
||||
|
|
@ -55,24 +55,24 @@ async def evaluate(node, triples_client, user, collection, limit=10000):
|
|||
logger.warning(f"Unsupported algebra node: {name}")
|
||||
return [{}]
|
||||
|
||||
return await handler(node, triples_client, user, collection, limit)
|
||||
return await handler(node, triples_client, workspace, collection, limit)
|
||||
|
||||
|
||||
# --- Node handlers ---
|
||||
|
||||
async def _eval_select_query(node, tc, user, collection, limit):
|
||||
async def _eval_select_query(node, tc, workspace, collection, limit):
|
||||
"""Evaluate a SelectQuery node."""
|
||||
return await evaluate(node.p, tc, user, collection, limit)
|
||||
return await evaluate(node.p, tc, workspace, collection, limit)
|
||||
|
||||
|
||||
async def _eval_project(node, tc, user, collection, limit):
|
||||
async def _eval_project(node, tc, workspace, collection, limit):
|
||||
"""Evaluate a Project node (SELECT variable projection)."""
|
||||
solutions = await evaluate(node.p, tc, user, collection, limit)
|
||||
solutions = await evaluate(node.p, tc, workspace, collection, limit)
|
||||
variables = [str(v) for v in node.PV]
|
||||
return project(solutions, variables)
|
||||
|
||||
|
||||
async def _eval_bgp(node, tc, user, collection, limit):
|
||||
async def _eval_bgp(node, tc, workspace, collection, limit):
|
||||
"""
|
||||
Evaluate a Basic Graph Pattern.
|
||||
|
||||
|
|
@ -107,7 +107,7 @@ async def _eval_bgp(node, tc, user, collection, limit):
|
|||
|
||||
# Query the triples store
|
||||
results = await _query_pattern(
|
||||
tc, s_val, p_val, o_val, user, collection, limit
|
||||
tc, s_val, p_val, o_val, workspace, collection, limit
|
||||
)
|
||||
|
||||
# Map results back to variable bindings,
|
||||
|
|
@ -130,17 +130,17 @@ async def _eval_bgp(node, tc, user, collection, limit):
|
|||
return solutions[:limit]
|
||||
|
||||
|
||||
async def _eval_join(node, tc, user, collection, limit):
|
||||
async def _eval_join(node, tc, workspace, collection, limit):
|
||||
"""Evaluate a Join node."""
|
||||
left = await evaluate(node.p1, tc, user, collection, limit)
|
||||
right = await evaluate(node.p2, tc, user, collection, limit)
|
||||
left = await evaluate(node.p1, tc, workspace, collection, limit)
|
||||
right = await evaluate(node.p2, tc, workspace, collection, limit)
|
||||
return hash_join(left, right)[:limit]
|
||||
|
||||
|
||||
async def _eval_left_join(node, tc, user, collection, limit):
|
||||
async def _eval_left_join(node, tc, workspace, collection, limit):
|
||||
"""Evaluate a LeftJoin node (OPTIONAL)."""
|
||||
left_sols = await evaluate(node.p1, tc, user, collection, limit)
|
||||
right_sols = await evaluate(node.p2, tc, user, collection, limit)
|
||||
left_sols = await evaluate(node.p1, tc, workspace, collection, limit)
|
||||
right_sols = await evaluate(node.p2, tc, workspace, collection, limit)
|
||||
|
||||
filter_fn = None
|
||||
if hasattr(node, "expr") and node.expr is not None:
|
||||
|
|
@ -153,16 +153,16 @@ async def _eval_left_join(node, tc, user, collection, limit):
|
|||
return left_join(left_sols, right_sols, filter_fn)[:limit]
|
||||
|
||||
|
||||
async def _eval_union(node, tc, user, collection, limit):
|
||||
async def _eval_union(node, tc, workspace, collection, limit):
|
||||
"""Evaluate a Union node."""
|
||||
left = await evaluate(node.p1, tc, user, collection, limit)
|
||||
right = await evaluate(node.p2, tc, user, collection, limit)
|
||||
left = await evaluate(node.p1, tc, workspace, collection, limit)
|
||||
right = await evaluate(node.p2, tc, workspace, collection, limit)
|
||||
return union(left, right)[:limit]
|
||||
|
||||
|
||||
async def _eval_filter(node, tc, user, collection, limit):
|
||||
async def _eval_filter(node, tc, workspace, collection, limit):
|
||||
"""Evaluate a Filter node."""
|
||||
solutions = await evaluate(node.p, tc, user, collection, limit)
|
||||
solutions = await evaluate(node.p, tc, workspace, collection, limit)
|
||||
expr = node.expr
|
||||
return [
|
||||
sol for sol in solutions
|
||||
|
|
@ -170,22 +170,22 @@ async def _eval_filter(node, tc, user, collection, limit):
|
|||
]
|
||||
|
||||
|
||||
async def _eval_distinct(node, tc, user, collection, limit):
|
||||
async def _eval_distinct(node, tc, workspace, collection, limit):
|
||||
"""Evaluate a Distinct node."""
|
||||
solutions = await evaluate(node.p, tc, user, collection, limit)
|
||||
solutions = await evaluate(node.p, tc, workspace, collection, limit)
|
||||
return distinct(solutions)
|
||||
|
||||
|
||||
async def _eval_reduced(node, tc, user, collection, limit):
|
||||
async def _eval_reduced(node, tc, workspace, collection, limit):
|
||||
"""Evaluate a Reduced node (like Distinct but implementation-defined)."""
|
||||
# Treat same as Distinct
|
||||
solutions = await evaluate(node.p, tc, user, collection, limit)
|
||||
solutions = await evaluate(node.p, tc, workspace, collection, limit)
|
||||
return distinct(solutions)
|
||||
|
||||
|
||||
async def _eval_order_by(node, tc, user, collection, limit):
|
||||
async def _eval_order_by(node, tc, workspace, collection, limit):
|
||||
"""Evaluate an OrderBy node."""
|
||||
solutions = await evaluate(node.p, tc, user, collection, limit)
|
||||
solutions = await evaluate(node.p, tc, workspace, collection, limit)
|
||||
|
||||
key_fns = []
|
||||
for cond in node.expr:
|
||||
|
|
@ -206,7 +206,7 @@ async def _eval_order_by(node, tc, user, collection, limit):
|
|||
return order_by(solutions, key_fns)
|
||||
|
||||
|
||||
async def _eval_slice(node, tc, user, collection, limit):
|
||||
async def _eval_slice(node, tc, workspace, collection, limit):
|
||||
"""Evaluate a Slice node (LIMIT/OFFSET)."""
|
||||
# Pass tighter limit downstream if possible
|
||||
inner_limit = limit
|
||||
|
|
@ -214,13 +214,13 @@ async def _eval_slice(node, tc, user, collection, limit):
|
|||
offset = node.start or 0
|
||||
inner_limit = min(limit, offset + node.length)
|
||||
|
||||
solutions = await evaluate(node.p, tc, user, collection, inner_limit)
|
||||
solutions = await evaluate(node.p, tc, workspace, collection, inner_limit)
|
||||
return slice_solutions(solutions, node.start or 0, node.length)
|
||||
|
||||
|
||||
async def _eval_extend(node, tc, user, collection, limit):
|
||||
async def _eval_extend(node, tc, workspace, collection, limit):
|
||||
"""Evaluate an Extend node (BIND)."""
|
||||
solutions = await evaluate(node.p, tc, user, collection, limit)
|
||||
solutions = await evaluate(node.p, tc, workspace, collection, limit)
|
||||
var_name = str(node.var)
|
||||
expr = node.expr
|
||||
|
||||
|
|
@ -246,9 +246,9 @@ async def _eval_extend(node, tc, user, collection, limit):
|
|||
return result
|
||||
|
||||
|
||||
async def _eval_group(node, tc, user, collection, limit):
|
||||
async def _eval_group(node, tc, workspace, collection, limit):
|
||||
"""Evaluate a Group node (GROUP BY with aggregation)."""
|
||||
solutions = await evaluate(node.p, tc, user, collection, limit)
|
||||
solutions = await evaluate(node.p, tc, workspace, collection, limit)
|
||||
|
||||
# Extract grouping expressions
|
||||
group_exprs = []
|
||||
|
|
@ -289,9 +289,9 @@ async def _eval_group(node, tc, user, collection, limit):
|
|||
return result
|
||||
|
||||
|
||||
async def _eval_aggregate_join(node, tc, user, collection, limit):
|
||||
async def _eval_aggregate_join(node, tc, workspace, collection, limit):
|
||||
"""Evaluate an AggregateJoin (aggregation functions after GROUP BY)."""
|
||||
solutions = await evaluate(node.p, tc, user, collection, limit)
|
||||
solutions = await evaluate(node.p, tc, workspace, collection, limit)
|
||||
|
||||
result = []
|
||||
for sol in solutions:
|
||||
|
|
@ -310,7 +310,7 @@ async def _eval_aggregate_join(node, tc, user, collection, limit):
|
|||
return result
|
||||
|
||||
|
||||
async def _eval_graph(node, tc, user, collection, limit):
|
||||
async def _eval_graph(node, tc, workspace, collection, limit):
|
||||
"""Evaluate a Graph node (GRAPH clause)."""
|
||||
term = node.term
|
||||
|
||||
|
|
@ -319,16 +319,16 @@ async def _eval_graph(node, tc, user, collection, limit):
|
|||
# We'd need to pass graph to triples queries
|
||||
# For now, evaluate inner pattern normally
|
||||
logger.info(f"GRAPH <{term}> clause - graph filtering not yet wired")
|
||||
return await evaluate(node.p, tc, user, collection, limit)
|
||||
return await evaluate(node.p, tc, workspace, collection, limit)
|
||||
elif isinstance(term, Variable):
|
||||
# GRAPH ?g { ... } — variable graph
|
||||
logger.info(f"GRAPH ?{term} clause - variable graph not yet wired")
|
||||
return await evaluate(node.p, tc, user, collection, limit)
|
||||
return await evaluate(node.p, tc, workspace, collection, limit)
|
||||
else:
|
||||
return await evaluate(node.p, tc, user, collection, limit)
|
||||
return await evaluate(node.p, tc, workspace, collection, limit)
|
||||
|
||||
|
||||
async def _eval_values(node, tc, user, collection, limit):
|
||||
async def _eval_values(node, tc, workspace, collection, limit):
|
||||
"""Evaluate a VALUES clause (inline data)."""
|
||||
variables = [str(v) for v in node.var]
|
||||
solutions = []
|
||||
|
|
@ -343,9 +343,9 @@ async def _eval_values(node, tc, user, collection, limit):
|
|||
return solutions
|
||||
|
||||
|
||||
async def _eval_to_multiset(node, tc, user, collection, limit):
|
||||
async def _eval_to_multiset(node, tc, workspace, collection, limit):
|
||||
"""Evaluate a ToMultiSet node (subquery)."""
|
||||
return await evaluate(node.p, tc, user, collection, limit)
|
||||
return await evaluate(node.p, tc, workspace, collection, limit)
|
||||
|
||||
|
||||
# --- Aggregate computation ---
|
||||
|
|
@ -487,7 +487,7 @@ def _resolve_term(tmpl, solution):
|
|||
return rdflib_term_to_term(tmpl)
|
||||
|
||||
|
||||
async def _query_pattern(tc, s, p, o, user, collection, limit):
|
||||
async def _query_pattern(tc, s, p, o, workspace, collection, limit):
|
||||
"""
|
||||
Issue a streaming triple pattern query via TriplesClient.
|
||||
|
||||
|
|
@ -496,7 +496,7 @@ async def _query_pattern(tc, s, p, o, user, collection, limit):
|
|||
results = await tc.query(
|
||||
s=s, p=p, o=o,
|
||||
limit=limit,
|
||||
user=user,
|
||||
workspace=workspace,
|
||||
collection=collection,
|
||||
)
|
||||
return results
|
||||
|
|
|
|||
|
|
@ -141,7 +141,7 @@ class Processor(FlowProcessor):
|
|||
solutions = await evaluate(
|
||||
parsed.algebra,
|
||||
triples_client,
|
||||
user=flow.workspace,
|
||||
workspace=flow.workspace,
|
||||
collection=request.collection or "default",
|
||||
limit=request.limit or 10000,
|
||||
)
|
||||
|
|
|
|||
|
|
@ -178,24 +178,24 @@ class Processor(TriplesQueryService):
|
|||
self.cassandra_password = password
|
||||
self.table = None
|
||||
|
||||
def ensure_connection(self, user):
|
||||
def ensure_connection(self, workspace):
|
||||
"""Ensure we have a connection to the correct keyspace."""
|
||||
if user != self.table:
|
||||
if workspace != self.table:
|
||||
KGClass = EntityCentricKnowledgeGraph
|
||||
|
||||
if self.cassandra_username and self.cassandra_password:
|
||||
self.tg = KGClass(
|
||||
hosts=self.cassandra_host,
|
||||
keyspace=user,
|
||||
keyspace=workspace,
|
||||
username=self.cassandra_username,
|
||||
password=self.cassandra_password
|
||||
)
|
||||
else:
|
||||
self.tg = KGClass(
|
||||
hosts=self.cassandra_host,
|
||||
keyspace=user,
|
||||
keyspace=workspace,
|
||||
)
|
||||
self.table = user
|
||||
self.table = workspace
|
||||
|
||||
async def query_triples(self, workspace, query):
|
||||
|
||||
|
|
|
|||
|
|
@ -67,7 +67,7 @@ class Processor(TriplesQueryService):
|
|||
|
||||
try:
|
||||
|
||||
user = workspace
|
||||
workspace = workspace
|
||||
collection = query.collection if query.collection else "default"
|
||||
|
||||
triples = []
|
||||
|
|
@ -79,13 +79,13 @@ class Processor(TriplesQueryService):
|
|||
# SPO
|
||||
|
||||
records, summary, keys = self.io.execute_query(
|
||||
"MATCH (src:Node {uri: $src, user: $user, collection: $collection})-"
|
||||
"[rel:Rel {uri: $rel, user: $user, collection: $collection}]->"
|
||||
"(dest:Literal {value: $value, user: $user, collection: $collection}) "
|
||||
"MATCH (src:Node {uri: $src, workspace: $workspace, collection: $collection})-"
|
||||
"[rel:Rel {uri: $rel, workspace: $workspace, collection: $collection}]->"
|
||||
"(dest:Literal {value: $value, workspace: $workspace, collection: $collection}) "
|
||||
"RETURN $src as src "
|
||||
"LIMIT " + str(query.limit),
|
||||
src=get_term_value(query.s), rel=get_term_value(query.p), value=get_term_value(query.o),
|
||||
user=user, collection=collection,
|
||||
workspace=workspace, collection=collection,
|
||||
database_=self.db,
|
||||
)
|
||||
|
||||
|
|
@ -93,13 +93,13 @@ class Processor(TriplesQueryService):
|
|||
triples.append((get_term_value(query.s), get_term_value(query.p), get_term_value(query.o)))
|
||||
|
||||
records, summary, keys = self.io.execute_query(
|
||||
"MATCH (src:Node {uri: $src, user: $user, collection: $collection})-"
|
||||
"[rel:Rel {uri: $rel, user: $user, collection: $collection}]->"
|
||||
"(dest:Node {uri: $uri, user: $user, collection: $collection}) "
|
||||
"MATCH (src:Node {uri: $src, workspace: $workspace, collection: $collection})-"
|
||||
"[rel:Rel {uri: $rel, workspace: $workspace, collection: $collection}]->"
|
||||
"(dest:Node {uri: $uri, workspace: $workspace, collection: $collection}) "
|
||||
"RETURN $src as src "
|
||||
"LIMIT " + str(query.limit),
|
||||
src=get_term_value(query.s), rel=get_term_value(query.p), uri=get_term_value(query.o),
|
||||
user=user, collection=collection,
|
||||
workspace=workspace, collection=collection,
|
||||
database_=self.db,
|
||||
)
|
||||
|
||||
|
|
@ -111,13 +111,13 @@ class Processor(TriplesQueryService):
|
|||
# SP
|
||||
|
||||
records, summary, keys = self.io.execute_query(
|
||||
"MATCH (src:Node {uri: $src, user: $user, collection: $collection})-"
|
||||
"[rel:Rel {uri: $rel, user: $user, collection: $collection}]->"
|
||||
"(dest:Literal {user: $user, collection: $collection}) "
|
||||
"MATCH (src:Node {uri: $src, workspace: $workspace, collection: $collection})-"
|
||||
"[rel:Rel {uri: $rel, workspace: $workspace, collection: $collection}]->"
|
||||
"(dest:Literal {workspace: $workspace, collection: $collection}) "
|
||||
"RETURN dest.value as dest "
|
||||
"LIMIT " + str(query.limit),
|
||||
src=get_term_value(query.s), rel=get_term_value(query.p),
|
||||
user=user, collection=collection,
|
||||
workspace=workspace, collection=collection,
|
||||
database_=self.db,
|
||||
)
|
||||
|
||||
|
|
@ -126,13 +126,13 @@ class Processor(TriplesQueryService):
|
|||
triples.append((get_term_value(query.s), get_term_value(query.p), data["dest"]))
|
||||
|
||||
records, summary, keys = self.io.execute_query(
|
||||
"MATCH (src:Node {uri: $src, user: $user, collection: $collection})-"
|
||||
"[rel:Rel {uri: $rel, user: $user, collection: $collection}]->"
|
||||
"(dest:Node {user: $user, collection: $collection}) "
|
||||
"MATCH (src:Node {uri: $src, workspace: $workspace, collection: $collection})-"
|
||||
"[rel:Rel {uri: $rel, workspace: $workspace, collection: $collection}]->"
|
||||
"(dest:Node {workspace: $workspace, collection: $collection}) "
|
||||
"RETURN dest.uri as dest "
|
||||
"LIMIT " + str(query.limit),
|
||||
src=get_term_value(query.s), rel=get_term_value(query.p),
|
||||
user=user, collection=collection,
|
||||
workspace=workspace, collection=collection,
|
||||
database_=self.db,
|
||||
)
|
||||
|
||||
|
|
@ -147,13 +147,13 @@ class Processor(TriplesQueryService):
|
|||
# SO
|
||||
|
||||
records, summary, keys = self.io.execute_query(
|
||||
"MATCH (src:Node {uri: $src, user: $user, collection: $collection})-"
|
||||
"[rel:Rel {user: $user, collection: $collection}]->"
|
||||
"(dest:Literal {value: $value, user: $user, collection: $collection}) "
|
||||
"MATCH (src:Node {uri: $src, workspace: $workspace, collection: $collection})-"
|
||||
"[rel:Rel {workspace: $workspace, collection: $collection}]->"
|
||||
"(dest:Literal {value: $value, workspace: $workspace, collection: $collection}) "
|
||||
"RETURN rel.uri as rel "
|
||||
"LIMIT " + str(query.limit),
|
||||
src=get_term_value(query.s), value=get_term_value(query.o),
|
||||
user=user, collection=collection,
|
||||
workspace=workspace, collection=collection,
|
||||
database_=self.db,
|
||||
)
|
||||
|
||||
|
|
@ -162,13 +162,13 @@ class Processor(TriplesQueryService):
|
|||
triples.append((get_term_value(query.s), data["rel"], get_term_value(query.o)))
|
||||
|
||||
records, summary, keys = self.io.execute_query(
|
||||
"MATCH (src:Node {uri: $src, user: $user, collection: $collection})-"
|
||||
"[rel:Rel {user: $user, collection: $collection}]->"
|
||||
"(dest:Node {uri: $uri, user: $user, collection: $collection}) "
|
||||
"MATCH (src:Node {uri: $src, workspace: $workspace, collection: $collection})-"
|
||||
"[rel:Rel {workspace: $workspace, collection: $collection}]->"
|
||||
"(dest:Node {uri: $uri, workspace: $workspace, collection: $collection}) "
|
||||
"RETURN rel.uri as rel "
|
||||
"LIMIT " + str(query.limit),
|
||||
src=get_term_value(query.s), uri=get_term_value(query.o),
|
||||
user=user, collection=collection,
|
||||
workspace=workspace, collection=collection,
|
||||
database_=self.db,
|
||||
)
|
||||
|
||||
|
|
@ -181,13 +181,13 @@ class Processor(TriplesQueryService):
|
|||
# S
|
||||
|
||||
records, summary, keys = self.io.execute_query(
|
||||
"MATCH (src:Node {uri: $src, user: $user, collection: $collection})-"
|
||||
"[rel:Rel {user: $user, collection: $collection}]->"
|
||||
"(dest:Literal {user: $user, collection: $collection}) "
|
||||
"MATCH (src:Node {uri: $src, workspace: $workspace, collection: $collection})-"
|
||||
"[rel:Rel {workspace: $workspace, collection: $collection}]->"
|
||||
"(dest:Literal {workspace: $workspace, collection: $collection}) "
|
||||
"RETURN rel.uri as rel, dest.value as dest "
|
||||
"LIMIT " + str(query.limit),
|
||||
src=get_term_value(query.s),
|
||||
user=user, collection=collection,
|
||||
workspace=workspace, collection=collection,
|
||||
database_=self.db,
|
||||
)
|
||||
|
||||
|
|
@ -196,13 +196,13 @@ class Processor(TriplesQueryService):
|
|||
triples.append((get_term_value(query.s), data["rel"], data["dest"]))
|
||||
|
||||
records, summary, keys = self.io.execute_query(
|
||||
"MATCH (src:Node {uri: $src, user: $user, collection: $collection})-"
|
||||
"[rel:Rel {user: $user, collection: $collection}]->"
|
||||
"(dest:Node {user: $user, collection: $collection}) "
|
||||
"MATCH (src:Node {uri: $src, workspace: $workspace, collection: $collection})-"
|
||||
"[rel:Rel {workspace: $workspace, collection: $collection}]->"
|
||||
"(dest:Node {workspace: $workspace, collection: $collection}) "
|
||||
"RETURN rel.uri as rel, dest.uri as dest "
|
||||
"LIMIT " + str(query.limit),
|
||||
src=get_term_value(query.s),
|
||||
user=user, collection=collection,
|
||||
workspace=workspace, collection=collection,
|
||||
database_=self.db,
|
||||
)
|
||||
|
||||
|
|
@ -220,13 +220,13 @@ class Processor(TriplesQueryService):
|
|||
# PO
|
||||
|
||||
records, summary, keys = self.io.execute_query(
|
||||
"MATCH (src:Node {user: $user, collection: $collection})-"
|
||||
"[rel:Rel {uri: $uri, user: $user, collection: $collection}]->"
|
||||
"(dest:Literal {value: $value, user: $user, collection: $collection}) "
|
||||
"MATCH (src:Node {workspace: $workspace, collection: $collection})-"
|
||||
"[rel:Rel {uri: $uri, workspace: $workspace, collection: $collection}]->"
|
||||
"(dest:Literal {value: $value, workspace: $workspace, collection: $collection}) "
|
||||
"RETURN src.uri as src "
|
||||
"LIMIT " + str(query.limit),
|
||||
uri=get_term_value(query.p), value=get_term_value(query.o),
|
||||
user=user, collection=collection,
|
||||
workspace=workspace, collection=collection,
|
||||
database_=self.db,
|
||||
)
|
||||
|
||||
|
|
@ -235,13 +235,13 @@ class Processor(TriplesQueryService):
|
|||
triples.append((data["src"], get_term_value(query.p), get_term_value(query.o)))
|
||||
|
||||
records, summary, keys = self.io.execute_query(
|
||||
"MATCH (src:Node {user: $user, collection: $collection})-"
|
||||
"[rel:Rel {uri: $uri, user: $user, collection: $collection}]->"
|
||||
"(dest:Node {uri: $dest, user: $user, collection: $collection}) "
|
||||
"MATCH (src:Node {workspace: $workspace, collection: $collection})-"
|
||||
"[rel:Rel {uri: $uri, workspace: $workspace, collection: $collection}]->"
|
||||
"(dest:Node {uri: $dest, workspace: $workspace, collection: $collection}) "
|
||||
"RETURN src.uri as src "
|
||||
"LIMIT " + str(query.limit),
|
||||
uri=get_term_value(query.p), dest=get_term_value(query.o),
|
||||
user=user, collection=collection,
|
||||
workspace=workspace, collection=collection,
|
||||
database_=self.db,
|
||||
)
|
||||
|
||||
|
|
@ -254,13 +254,13 @@ class Processor(TriplesQueryService):
|
|||
# P
|
||||
|
||||
records, summary, keys = self.io.execute_query(
|
||||
"MATCH (src:Node {user: $user, collection: $collection})-"
|
||||
"[rel:Rel {uri: $uri, user: $user, collection: $collection}]->"
|
||||
"(dest:Literal {user: $user, collection: $collection}) "
|
||||
"MATCH (src:Node {workspace: $workspace, collection: $collection})-"
|
||||
"[rel:Rel {uri: $uri, workspace: $workspace, collection: $collection}]->"
|
||||
"(dest:Literal {workspace: $workspace, collection: $collection}) "
|
||||
"RETURN src.uri as src, dest.value as dest "
|
||||
"LIMIT " + str(query.limit),
|
||||
uri=get_term_value(query.p),
|
||||
user=user, collection=collection,
|
||||
workspace=workspace, collection=collection,
|
||||
database_=self.db,
|
||||
)
|
||||
|
||||
|
|
@ -269,13 +269,13 @@ class Processor(TriplesQueryService):
|
|||
triples.append((data["src"], get_term_value(query.p), data["dest"]))
|
||||
|
||||
records, summary, keys = self.io.execute_query(
|
||||
"MATCH (src:Node {user: $user, collection: $collection})-"
|
||||
"[rel:Rel {uri: $uri, user: $user, collection: $collection}]->"
|
||||
"(dest:Node {user: $user, collection: $collection}) "
|
||||
"MATCH (src:Node {workspace: $workspace, collection: $collection})-"
|
||||
"[rel:Rel {uri: $uri, workspace: $workspace, collection: $collection}]->"
|
||||
"(dest:Node {workspace: $workspace, collection: $collection}) "
|
||||
"RETURN src.uri as src, dest.uri as dest "
|
||||
"LIMIT " + str(query.limit),
|
||||
uri=get_term_value(query.p),
|
||||
user=user, collection=collection,
|
||||
workspace=workspace, collection=collection,
|
||||
database_=self.db,
|
||||
)
|
||||
|
||||
|
|
@ -290,13 +290,13 @@ class Processor(TriplesQueryService):
|
|||
# O
|
||||
|
||||
records, summary, keys = self.io.execute_query(
|
||||
"MATCH (src:Node {user: $user, collection: $collection})-"
|
||||
"[rel:Rel {user: $user, collection: $collection}]->"
|
||||
"(dest:Literal {value: $value, user: $user, collection: $collection}) "
|
||||
"MATCH (src:Node {workspace: $workspace, collection: $collection})-"
|
||||
"[rel:Rel {workspace: $workspace, collection: $collection}]->"
|
||||
"(dest:Literal {value: $value, workspace: $workspace, collection: $collection}) "
|
||||
"RETURN src.uri as src, rel.uri as rel "
|
||||
"LIMIT " + str(query.limit),
|
||||
value=get_term_value(query.o),
|
||||
user=user, collection=collection,
|
||||
workspace=workspace, collection=collection,
|
||||
database_=self.db,
|
||||
)
|
||||
|
||||
|
|
@ -305,13 +305,13 @@ class Processor(TriplesQueryService):
|
|||
triples.append((data["src"], data["rel"], get_term_value(query.o)))
|
||||
|
||||
records, summary, keys = self.io.execute_query(
|
||||
"MATCH (src:Node {user: $user, collection: $collection})-"
|
||||
"[rel:Rel {user: $user, collection: $collection}]->"
|
||||
"(dest:Node {uri: $uri, user: $user, collection: $collection}) "
|
||||
"MATCH (src:Node {workspace: $workspace, collection: $collection})-"
|
||||
"[rel:Rel {workspace: $workspace, collection: $collection}]->"
|
||||
"(dest:Node {uri: $uri, workspace: $workspace, collection: $collection}) "
|
||||
"RETURN src.uri as src, rel.uri as rel "
|
||||
"LIMIT " + str(query.limit),
|
||||
uri=get_term_value(query.o),
|
||||
user=user, collection=collection,
|
||||
workspace=workspace, collection=collection,
|
||||
database_=self.db,
|
||||
)
|
||||
|
||||
|
|
@ -324,12 +324,12 @@ class Processor(TriplesQueryService):
|
|||
# *
|
||||
|
||||
records, summary, keys = self.io.execute_query(
|
||||
"MATCH (src:Node {user: $user, collection: $collection})-"
|
||||
"[rel:Rel {user: $user, collection: $collection}]->"
|
||||
"(dest:Literal {user: $user, collection: $collection}) "
|
||||
"MATCH (src:Node {workspace: $workspace, collection: $collection})-"
|
||||
"[rel:Rel {workspace: $workspace, collection: $collection}]->"
|
||||
"(dest:Literal {workspace: $workspace, collection: $collection}) "
|
||||
"RETURN src.uri as src, rel.uri as rel, dest.value as dest "
|
||||
"LIMIT " + str(query.limit),
|
||||
user=user, collection=collection,
|
||||
workspace=workspace, collection=collection,
|
||||
database_=self.db,
|
||||
)
|
||||
|
||||
|
|
@ -338,12 +338,12 @@ class Processor(TriplesQueryService):
|
|||
triples.append((data["src"], data["rel"], data["dest"]))
|
||||
|
||||
records, summary, keys = self.io.execute_query(
|
||||
"MATCH (src:Node {user: $user, collection: $collection})-"
|
||||
"[rel:Rel {user: $user, collection: $collection}]->"
|
||||
"(dest:Node {user: $user, collection: $collection}) "
|
||||
"MATCH (src:Node {workspace: $workspace, collection: $collection})-"
|
||||
"[rel:Rel {workspace: $workspace, collection: $collection}]->"
|
||||
"(dest:Node {workspace: $workspace, collection: $collection}) "
|
||||
"RETURN src.uri as src, rel.uri as rel, dest.uri as dest "
|
||||
"LIMIT " + str(query.limit),
|
||||
user=user, collection=collection,
|
||||
workspace=workspace, collection=collection,
|
||||
database_=self.db,
|
||||
)
|
||||
|
||||
|
|
|
|||
|
|
@ -67,9 +67,8 @@ class Processor(TriplesQueryService):
|
|||
|
||||
try:
|
||||
|
||||
user = workspace
|
||||
collection = query.collection if query.collection else "default"
|
||||
|
||||
|
||||
triples = []
|
||||
|
||||
if query.s is not None:
|
||||
|
|
@ -79,13 +78,13 @@ class Processor(TriplesQueryService):
|
|||
# SPO
|
||||
|
||||
records, summary, keys = self.io.execute_query(
|
||||
"MATCH (src:Node {uri: $src, user: $user, collection: $collection})-"
|
||||
"[rel:Rel {uri: $rel, user: $user, collection: $collection}]->"
|
||||
"(dest:Literal {value: $value, user: $user, collection: $collection}) "
|
||||
"MATCH (src:Node {uri: $src, workspace: $workspace, collection: $collection})-"
|
||||
"[rel:Rel {uri: $rel, workspace: $workspace, collection: $collection}]->"
|
||||
"(dest:Literal {value: $value, workspace: $workspace, collection: $collection}) "
|
||||
"RETURN $src as src "
|
||||
"LIMIT " + str(query.limit),
|
||||
src=get_term_value(query.s), rel=get_term_value(query.p), value=get_term_value(query.o),
|
||||
user=user, collection=collection,
|
||||
workspace=workspace, collection=collection,
|
||||
database_=self.db,
|
||||
)
|
||||
|
||||
|
|
@ -93,13 +92,13 @@ class Processor(TriplesQueryService):
|
|||
triples.append((get_term_value(query.s), get_term_value(query.p), get_term_value(query.o)))
|
||||
|
||||
records, summary, keys = self.io.execute_query(
|
||||
"MATCH (src:Node {uri: $src, user: $user, collection: $collection})-"
|
||||
"[rel:Rel {uri: $rel, user: $user, collection: $collection}]->"
|
||||
"(dest:Node {uri: $uri, user: $user, collection: $collection}) "
|
||||
"MATCH (src:Node {uri: $src, workspace: $workspace, collection: $collection})-"
|
||||
"[rel:Rel {uri: $rel, workspace: $workspace, collection: $collection}]->"
|
||||
"(dest:Node {uri: $uri, workspace: $workspace, collection: $collection}) "
|
||||
"RETURN $src as src "
|
||||
"LIMIT " + str(query.limit),
|
||||
src=get_term_value(query.s), rel=get_term_value(query.p), uri=get_term_value(query.o),
|
||||
user=user, collection=collection,
|
||||
workspace=workspace, collection=collection,
|
||||
database_=self.db,
|
||||
)
|
||||
|
||||
|
|
@ -111,13 +110,13 @@ class Processor(TriplesQueryService):
|
|||
# SP
|
||||
|
||||
records, summary, keys = self.io.execute_query(
|
||||
"MATCH (src:Node {uri: $src, user: $user, collection: $collection})-"
|
||||
"[rel:Rel {uri: $rel, user: $user, collection: $collection}]->"
|
||||
"(dest:Literal {user: $user, collection: $collection}) "
|
||||
"MATCH (src:Node {uri: $src, workspace: $workspace, collection: $collection})-"
|
||||
"[rel:Rel {uri: $rel, workspace: $workspace, collection: $collection}]->"
|
||||
"(dest:Literal {workspace: $workspace, collection: $collection}) "
|
||||
"RETURN dest.value as dest "
|
||||
"LIMIT " + str(query.limit),
|
||||
src=get_term_value(query.s), rel=get_term_value(query.p),
|
||||
user=user, collection=collection,
|
||||
workspace=workspace, collection=collection,
|
||||
database_=self.db,
|
||||
)
|
||||
|
||||
|
|
@ -126,13 +125,13 @@ class Processor(TriplesQueryService):
|
|||
triples.append((get_term_value(query.s), get_term_value(query.p), data["dest"]))
|
||||
|
||||
records, summary, keys = self.io.execute_query(
|
||||
"MATCH (src:Node {uri: $src, user: $user, collection: $collection})-"
|
||||
"[rel:Rel {uri: $rel, user: $user, collection: $collection}]->"
|
||||
"(dest:Node {user: $user, collection: $collection}) "
|
||||
"MATCH (src:Node {uri: $src, workspace: $workspace, collection: $collection})-"
|
||||
"[rel:Rel {uri: $rel, workspace: $workspace, collection: $collection}]->"
|
||||
"(dest:Node {workspace: $workspace, collection: $collection}) "
|
||||
"RETURN dest.uri as dest "
|
||||
"LIMIT " + str(query.limit),
|
||||
src=get_term_value(query.s), rel=get_term_value(query.p),
|
||||
user=user, collection=collection,
|
||||
workspace=workspace, collection=collection,
|
||||
database_=self.db,
|
||||
)
|
||||
|
||||
|
|
@ -147,13 +146,13 @@ class Processor(TriplesQueryService):
|
|||
# SO
|
||||
|
||||
records, summary, keys = self.io.execute_query(
|
||||
"MATCH (src:Node {uri: $src, user: $user, collection: $collection})-"
|
||||
"[rel:Rel {user: $user, collection: $collection}]->"
|
||||
"(dest:Literal {value: $value, user: $user, collection: $collection}) "
|
||||
"MATCH (src:Node {uri: $src, workspace: $workspace, collection: $collection})-"
|
||||
"[rel:Rel {workspace: $workspace, collection: $collection}]->"
|
||||
"(dest:Literal {value: $value, workspace: $workspace, collection: $collection}) "
|
||||
"RETURN rel.uri as rel "
|
||||
"LIMIT " + str(query.limit),
|
||||
src=get_term_value(query.s), value=get_term_value(query.o),
|
||||
user=user, collection=collection,
|
||||
workspace=workspace, collection=collection,
|
||||
database_=self.db,
|
||||
)
|
||||
|
||||
|
|
@ -162,13 +161,13 @@ class Processor(TriplesQueryService):
|
|||
triples.append((get_term_value(query.s), data["rel"], get_term_value(query.o)))
|
||||
|
||||
records, summary, keys = self.io.execute_query(
|
||||
"MATCH (src:Node {uri: $src, user: $user, collection: $collection})-"
|
||||
"[rel:Rel {user: $user, collection: $collection}]->"
|
||||
"(dest:Node {uri: $uri, user: $user, collection: $collection}) "
|
||||
"MATCH (src:Node {uri: $src, workspace: $workspace, collection: $collection})-"
|
||||
"[rel:Rel {workspace: $workspace, collection: $collection}]->"
|
||||
"(dest:Node {uri: $uri, workspace: $workspace, collection: $collection}) "
|
||||
"RETURN rel.uri as rel "
|
||||
"LIMIT " + str(query.limit),
|
||||
src=get_term_value(query.s), uri=get_term_value(query.o),
|
||||
user=user, collection=collection,
|
||||
workspace=workspace, collection=collection,
|
||||
database_=self.db,
|
||||
)
|
||||
|
||||
|
|
@ -181,13 +180,13 @@ class Processor(TriplesQueryService):
|
|||
# S
|
||||
|
||||
records, summary, keys = self.io.execute_query(
|
||||
"MATCH (src:Node {uri: $src, user: $user, collection: $collection})-"
|
||||
"[rel:Rel {user: $user, collection: $collection}]->"
|
||||
"(dest:Literal {user: $user, collection: $collection}) "
|
||||
"MATCH (src:Node {uri: $src, workspace: $workspace, collection: $collection})-"
|
||||
"[rel:Rel {workspace: $workspace, collection: $collection}]->"
|
||||
"(dest:Literal {workspace: $workspace, collection: $collection}) "
|
||||
"RETURN rel.uri as rel, dest.value as dest "
|
||||
"LIMIT " + str(query.limit),
|
||||
src=get_term_value(query.s),
|
||||
user=user, collection=collection,
|
||||
workspace=workspace, collection=collection,
|
||||
database_=self.db,
|
||||
)
|
||||
|
||||
|
|
@ -196,13 +195,13 @@ class Processor(TriplesQueryService):
|
|||
triples.append((get_term_value(query.s), data["rel"], data["dest"]))
|
||||
|
||||
records, summary, keys = self.io.execute_query(
|
||||
"MATCH (src:Node {uri: $src, user: $user, collection: $collection})-"
|
||||
"[rel:Rel {user: $user, collection: $collection}]->"
|
||||
"(dest:Node {user: $user, collection: $collection}) "
|
||||
"MATCH (src:Node {uri: $src, workspace: $workspace, collection: $collection})-"
|
||||
"[rel:Rel {workspace: $workspace, collection: $collection}]->"
|
||||
"(dest:Node {workspace: $workspace, collection: $collection}) "
|
||||
"RETURN rel.uri as rel, dest.uri as dest "
|
||||
"LIMIT " + str(query.limit),
|
||||
src=get_term_value(query.s),
|
||||
user=user, collection=collection,
|
||||
workspace=workspace, collection=collection,
|
||||
database_=self.db,
|
||||
)
|
||||
|
||||
|
|
@ -220,13 +219,13 @@ class Processor(TriplesQueryService):
|
|||
# PO
|
||||
|
||||
records, summary, keys = self.io.execute_query(
|
||||
"MATCH (src:Node {user: $user, collection: $collection})-"
|
||||
"[rel:Rel {uri: $uri, user: $user, collection: $collection}]->"
|
||||
"(dest:Literal {value: $value, user: $user, collection: $collection}) "
|
||||
"MATCH (src:Node {workspace: $workspace, collection: $collection})-"
|
||||
"[rel:Rel {uri: $uri, workspace: $workspace, collection: $collection}]->"
|
||||
"(dest:Literal {value: $value, workspace: $workspace, collection: $collection}) "
|
||||
"RETURN src.uri as src "
|
||||
"LIMIT " + str(query.limit),
|
||||
uri=get_term_value(query.p), value=get_term_value(query.o),
|
||||
user=user, collection=collection,
|
||||
workspace=workspace, collection=collection,
|
||||
database_=self.db,
|
||||
)
|
||||
|
||||
|
|
@ -235,13 +234,13 @@ class Processor(TriplesQueryService):
|
|||
triples.append((data["src"], get_term_value(query.p), get_term_value(query.o)))
|
||||
|
||||
records, summary, keys = self.io.execute_query(
|
||||
"MATCH (src:Node {user: $user, collection: $collection})-"
|
||||
"[rel:Rel {uri: $uri, user: $user, collection: $collection}]->"
|
||||
"(dest:Node {uri: $dest, user: $user, collection: $collection}) "
|
||||
"MATCH (src:Node {workspace: $workspace, collection: $collection})-"
|
||||
"[rel:Rel {uri: $uri, workspace: $workspace, collection: $collection}]->"
|
||||
"(dest:Node {uri: $dest, workspace: $workspace, collection: $collection}) "
|
||||
"RETURN src.uri as src "
|
||||
"LIMIT " + str(query.limit),
|
||||
uri=get_term_value(query.p), dest=get_term_value(query.o),
|
||||
user=user, collection=collection,
|
||||
workspace=workspace, collection=collection,
|
||||
database_=self.db,
|
||||
)
|
||||
|
||||
|
|
@ -254,13 +253,13 @@ class Processor(TriplesQueryService):
|
|||
# P
|
||||
|
||||
records, summary, keys = self.io.execute_query(
|
||||
"MATCH (src:Node {user: $user, collection: $collection})-"
|
||||
"[rel:Rel {uri: $uri, user: $user, collection: $collection}]->"
|
||||
"(dest:Literal {user: $user, collection: $collection}) "
|
||||
"MATCH (src:Node {workspace: $workspace, collection: $collection})-"
|
||||
"[rel:Rel {uri: $uri, workspace: $workspace, collection: $collection}]->"
|
||||
"(dest:Literal {workspace: $workspace, collection: $collection}) "
|
||||
"RETURN src.uri as src, dest.value as dest "
|
||||
"LIMIT " + str(query.limit),
|
||||
uri=get_term_value(query.p),
|
||||
user=user, collection=collection,
|
||||
workspace=workspace, collection=collection,
|
||||
database_=self.db,
|
||||
)
|
||||
|
||||
|
|
@ -269,13 +268,13 @@ class Processor(TriplesQueryService):
|
|||
triples.append((data["src"], get_term_value(query.p), data["dest"]))
|
||||
|
||||
records, summary, keys = self.io.execute_query(
|
||||
"MATCH (src:Node {user: $user, collection: $collection})-"
|
||||
"[rel:Rel {uri: $uri, user: $user, collection: $collection}]->"
|
||||
"(dest:Node {user: $user, collection: $collection}) "
|
||||
"MATCH (src:Node {workspace: $workspace, collection: $collection})-"
|
||||
"[rel:Rel {uri: $uri, workspace: $workspace, collection: $collection}]->"
|
||||
"(dest:Node {workspace: $workspace, collection: $collection}) "
|
||||
"RETURN src.uri as src, dest.uri as dest "
|
||||
"LIMIT " + str(query.limit),
|
||||
uri=get_term_value(query.p),
|
||||
user=user, collection=collection,
|
||||
workspace=workspace, collection=collection,
|
||||
database_=self.db,
|
||||
)
|
||||
|
||||
|
|
@ -290,13 +289,13 @@ class Processor(TriplesQueryService):
|
|||
# O
|
||||
|
||||
records, summary, keys = self.io.execute_query(
|
||||
"MATCH (src:Node {user: $user, collection: $collection})-"
|
||||
"[rel:Rel {user: $user, collection: $collection}]->"
|
||||
"(dest:Literal {value: $value, user: $user, collection: $collection}) "
|
||||
"MATCH (src:Node {workspace: $workspace, collection: $collection})-"
|
||||
"[rel:Rel {workspace: $workspace, collection: $collection}]->"
|
||||
"(dest:Literal {value: $value, workspace: $workspace, collection: $collection}) "
|
||||
"RETURN src.uri as src, rel.uri as rel "
|
||||
"LIMIT " + str(query.limit),
|
||||
value=get_term_value(query.o),
|
||||
user=user, collection=collection,
|
||||
workspace=workspace, collection=collection,
|
||||
database_=self.db,
|
||||
)
|
||||
|
||||
|
|
@ -305,13 +304,13 @@ class Processor(TriplesQueryService):
|
|||
triples.append((data["src"], data["rel"], get_term_value(query.o)))
|
||||
|
||||
records, summary, keys = self.io.execute_query(
|
||||
"MATCH (src:Node {user: $user, collection: $collection})-"
|
||||
"[rel:Rel {user: $user, collection: $collection}]->"
|
||||
"(dest:Node {uri: $uri, user: $user, collection: $collection}) "
|
||||
"MATCH (src:Node {workspace: $workspace, collection: $collection})-"
|
||||
"[rel:Rel {workspace: $workspace, collection: $collection}]->"
|
||||
"(dest:Node {uri: $uri, workspace: $workspace, collection: $collection}) "
|
||||
"RETURN src.uri as src, rel.uri as rel "
|
||||
"LIMIT " + str(query.limit),
|
||||
uri=get_term_value(query.o),
|
||||
user=user, collection=collection,
|
||||
workspace=workspace, collection=collection,
|
||||
database_=self.db,
|
||||
)
|
||||
|
||||
|
|
@ -324,12 +323,12 @@ class Processor(TriplesQueryService):
|
|||
# *
|
||||
|
||||
records, summary, keys = self.io.execute_query(
|
||||
"MATCH (src:Node {user: $user, collection: $collection})-"
|
||||
"[rel:Rel {user: $user, collection: $collection}]->"
|
||||
"(dest:Literal {user: $user, collection: $collection}) "
|
||||
"MATCH (src:Node {workspace: $workspace, collection: $collection})-"
|
||||
"[rel:Rel {workspace: $workspace, collection: $collection}]->"
|
||||
"(dest:Literal {workspace: $workspace, collection: $collection}) "
|
||||
"RETURN src.uri as src, rel.uri as rel, dest.value as dest "
|
||||
"LIMIT " + str(query.limit),
|
||||
user=user, collection=collection,
|
||||
workspace=workspace, collection=collection,
|
||||
database_=self.db,
|
||||
)
|
||||
|
||||
|
|
@ -338,12 +337,12 @@ class Processor(TriplesQueryService):
|
|||
triples.append((data["src"], data["rel"], data["dest"]))
|
||||
|
||||
records, summary, keys = self.io.execute_query(
|
||||
"MATCH (src:Node {user: $user, collection: $collection})-"
|
||||
"[rel:Rel {user: $user, collection: $collection}]->"
|
||||
"(dest:Node {user: $user, collection: $collection}) "
|
||||
"MATCH (src:Node {workspace: $workspace, collection: $collection})-"
|
||||
"[rel:Rel {workspace: $workspace, collection: $collection}]->"
|
||||
"(dest:Node {workspace: $workspace, collection: $collection}) "
|
||||
"RETURN src.uri as src, rel.uri as rel, dest.uri as dest "
|
||||
"LIMIT " + str(query.limit),
|
||||
user=user, collection=collection,
|
||||
workspace=workspace, collection=collection,
|
||||
database_=self.db,
|
||||
)
|
||||
|
||||
|
|
@ -366,7 +365,7 @@ class Processor(TriplesQueryService):
|
|||
|
||||
logger.error(f"Exception querying triples: {e}", exc_info=True)
|
||||
raise e
|
||||
|
||||
|
||||
@staticmethod
|
||||
def add_args(parser):
|
||||
|
||||
|
|
|
|||
|
|
@ -60,27 +60,27 @@ class Processor(CollectionConfigHandler, DocumentEmbeddingsStoreService):
|
|||
help=f'Milvus store URI (default: {default_store_uri})'
|
||||
)
|
||||
|
||||
async def create_collection(self, user: str, collection: str, metadata: dict):
|
||||
async def create_collection(self, workspace: str, collection: str, metadata: dict):
|
||||
"""
|
||||
Create collection via config push - collections are created lazily on first write
|
||||
with the correct dimension determined from the actual embeddings.
|
||||
"""
|
||||
try:
|
||||
logger.info(f"Collection create request for {user}/{collection} - will be created lazily on first write")
|
||||
self.vecstore.create_collection(user, collection)
|
||||
logger.info(f"Collection create request for {workspace}/{collection} - will be created lazily on first write")
|
||||
self.vecstore.create_collection(workspace, collection)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to create collection {user}/{collection}: {e}", exc_info=True)
|
||||
logger.error(f"Failed to create collection {workspace}/{collection}: {e}", exc_info=True)
|
||||
raise
|
||||
|
||||
async def delete_collection(self, user: str, collection: str):
|
||||
async def delete_collection(self, workspace: str, collection: str):
|
||||
"""Delete the collection for document embeddings via config push"""
|
||||
try:
|
||||
self.vecstore.delete_collection(user, collection)
|
||||
logger.info(f"Successfully deleted collection {user}/{collection}")
|
||||
self.vecstore.delete_collection(workspace, collection)
|
||||
logger.info(f"Successfully deleted collection {workspace}/{collection}")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to delete collection {user}/{collection}: {e}", exc_info=True)
|
||||
logger.error(f"Failed to delete collection {workspace}/{collection}: {e}", exc_info=True)
|
||||
raise
|
||||
|
||||
def run():
|
||||
|
|
|
|||
|
|
@ -165,22 +165,22 @@ class Processor(CollectionConfigHandler, DocumentEmbeddingsStoreService):
|
|||
help=f'Pinecone region, (default: {default_region}'
|
||||
)
|
||||
|
||||
async def create_collection(self, user: str, collection: str, metadata: dict):
|
||||
async def create_collection(self, workspace: str, collection: str, metadata: dict):
|
||||
"""
|
||||
Create collection via config push - indexes are created lazily on first write
|
||||
with the correct dimension determined from the actual embeddings.
|
||||
"""
|
||||
try:
|
||||
logger.info(f"Collection create request for {user}/{collection} - will be created lazily on first write")
|
||||
logger.info(f"Collection create request for {workspace}/{collection} - will be created lazily on first write")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to create collection {user}/{collection}: {e}", exc_info=True)
|
||||
logger.error(f"Failed to create collection {workspace}/{collection}: {e}", exc_info=True)
|
||||
raise
|
||||
|
||||
async def delete_collection(self, user: str, collection: str):
|
||||
async def delete_collection(self, workspace: str, collection: str):
|
||||
"""Delete the collection for document embeddings via config push"""
|
||||
try:
|
||||
prefix = f"d-{user}-{collection}-"
|
||||
prefix = f"d-{workspace}-{collection}-"
|
||||
|
||||
# Get all indexes and filter for matches
|
||||
all_indexes = self.pinecone.list_indexes()
|
||||
|
|
@ -195,10 +195,10 @@ class Processor(CollectionConfigHandler, DocumentEmbeddingsStoreService):
|
|||
for index_name in matching_indexes:
|
||||
self.pinecone.delete_index(index_name)
|
||||
logger.info(f"Deleted Pinecone index: {index_name}")
|
||||
logger.info(f"Deleted {len(matching_indexes)} index(es) for {user}/{collection}")
|
||||
logger.info(f"Deleted {len(matching_indexes)} index(es) for {workspace}/{collection}")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to delete collection {user}/{collection}: {e}", exc_info=True)
|
||||
logger.error(f"Failed to delete collection {workspace}/{collection}: {e}", exc_info=True)
|
||||
raise
|
||||
|
||||
def run():
|
||||
|
|
|
|||
|
|
@ -107,22 +107,22 @@ class Processor(CollectionConfigHandler, DocumentEmbeddingsStoreService):
|
|||
help=f'Qdrant API key (default: None)'
|
||||
)
|
||||
|
||||
async def create_collection(self, user: str, collection: str, metadata: dict):
|
||||
async def create_collection(self, workspace: str, collection: str, metadata: dict):
|
||||
"""
|
||||
Create collection via config push - collections are created lazily on first write
|
||||
with the correct dimension determined from the actual embeddings.
|
||||
"""
|
||||
try:
|
||||
logger.info(f"Collection create request for {user}/{collection} - will be created lazily on first write")
|
||||
logger.info(f"Collection create request for {workspace}/{collection} - will be created lazily on first write")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to create collection {user}/{collection}: {e}", exc_info=True)
|
||||
logger.error(f"Failed to create collection {workspace}/{collection}: {e}", exc_info=True)
|
||||
raise
|
||||
|
||||
async def delete_collection(self, user: str, collection: str):
|
||||
async def delete_collection(self, workspace: str, collection: str):
|
||||
"""Delete the collection for document embeddings via config push"""
|
||||
try:
|
||||
prefix = f"d_{user}_{collection}_"
|
||||
prefix = f"d_{workspace}_{collection}_"
|
||||
|
||||
# Get all collections and filter for matches
|
||||
all_collections = self.qdrant.get_collections().collections
|
||||
|
|
@ -137,10 +137,10 @@ class Processor(CollectionConfigHandler, DocumentEmbeddingsStoreService):
|
|||
for collection_name in matching_collections:
|
||||
self.qdrant.delete_collection(collection_name)
|
||||
logger.info(f"Deleted Qdrant collection: {collection_name}")
|
||||
logger.info(f"Deleted {len(matching_collections)} collection(s) for {user}/{collection}")
|
||||
logger.info(f"Deleted {len(matching_collections)} collection(s) for {workspace}/{collection}")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to delete collection {user}/{collection}: {e}", exc_info=True)
|
||||
logger.error(f"Failed to delete collection {workspace}/{collection}: {e}", exc_info=True)
|
||||
raise
|
||||
|
||||
def run():
|
||||
|
|
|
|||
|
|
@ -73,27 +73,27 @@ class Processor(CollectionConfigHandler, GraphEmbeddingsStoreService):
|
|||
help=f'Milvus store URI (default: {default_store_uri})'
|
||||
)
|
||||
|
||||
async def create_collection(self, user: str, collection: str, metadata: dict):
|
||||
async def create_collection(self, workspace: str, collection: str, metadata: dict):
|
||||
"""
|
||||
Create collection via config push - collections are created lazily on first write
|
||||
with the correct dimension determined from the actual embeddings.
|
||||
"""
|
||||
try:
|
||||
logger.info(f"Collection create request for {user}/{collection} - will be created lazily on first write")
|
||||
self.vecstore.create_collection(user, collection)
|
||||
logger.info(f"Collection create request for {workspace}/{collection} - will be created lazily on first write")
|
||||
self.vecstore.create_collection(workspace, collection)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to create collection {user}/{collection}: {e}", exc_info=True)
|
||||
logger.error(f"Failed to create collection {workspace}/{collection}: {e}", exc_info=True)
|
||||
raise
|
||||
|
||||
async def delete_collection(self, user: str, collection: str):
|
||||
async def delete_collection(self, workspace: str, collection: str):
|
||||
"""Delete the collection for graph embeddings via config push"""
|
||||
try:
|
||||
self.vecstore.delete_collection(user, collection)
|
||||
logger.info(f"Successfully deleted collection {user}/{collection}")
|
||||
self.vecstore.delete_collection(workspace, collection)
|
||||
logger.info(f"Successfully deleted collection {workspace}/{collection}")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to delete collection {user}/{collection}: {e}", exc_info=True)
|
||||
logger.error(f"Failed to delete collection {workspace}/{collection}: {e}", exc_info=True)
|
||||
raise
|
||||
|
||||
def run():
|
||||
|
|
|
|||
|
|
@ -183,22 +183,22 @@ class Processor(CollectionConfigHandler, GraphEmbeddingsStoreService):
|
|||
help=f'Pinecone region, (default: {default_region}'
|
||||
)
|
||||
|
||||
async def create_collection(self, user: str, collection: str, metadata: dict):
|
||||
async def create_collection(self, workspace: str, collection: str, metadata: dict):
|
||||
"""
|
||||
Create collection via config push - indexes are created lazily on first write
|
||||
with the correct dimension determined from the actual embeddings.
|
||||
"""
|
||||
try:
|
||||
logger.info(f"Collection create request for {user}/{collection} - will be created lazily on first write")
|
||||
logger.info(f"Collection create request for {workspace}/{collection} - will be created lazily on first write")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to create collection {user}/{collection}: {e}", exc_info=True)
|
||||
logger.error(f"Failed to create collection {workspace}/{collection}: {e}", exc_info=True)
|
||||
raise
|
||||
|
||||
async def delete_collection(self, user: str, collection: str):
|
||||
async def delete_collection(self, workspace: str, collection: str):
|
||||
"""Delete the collection for graph embeddings via config push"""
|
||||
try:
|
||||
prefix = f"t-{user}-{collection}-"
|
||||
prefix = f"t-{workspace}-{collection}-"
|
||||
|
||||
# Get all indexes and filter for matches
|
||||
all_indexes = self.pinecone.list_indexes()
|
||||
|
|
@ -213,10 +213,10 @@ class Processor(CollectionConfigHandler, GraphEmbeddingsStoreService):
|
|||
for index_name in matching_indexes:
|
||||
self.pinecone.delete_index(index_name)
|
||||
logger.info(f"Deleted Pinecone index: {index_name}")
|
||||
logger.info(f"Deleted {len(matching_indexes)} index(es) for {user}/{collection}")
|
||||
logger.info(f"Deleted {len(matching_indexes)} index(es) for {workspace}/{collection}")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to delete collection {user}/{collection}: {e}", exc_info=True)
|
||||
logger.error(f"Failed to delete collection {workspace}/{collection}: {e}", exc_info=True)
|
||||
raise
|
||||
|
||||
def run():
|
||||
|
|
|
|||
|
|
@ -126,22 +126,22 @@ class Processor(CollectionConfigHandler, GraphEmbeddingsStoreService):
|
|||
help=f'Qdrant API key'
|
||||
)
|
||||
|
||||
async def create_collection(self, user: str, collection: str, metadata: dict):
|
||||
async def create_collection(self, workspace: str, collection: str, metadata: dict):
|
||||
"""
|
||||
Create collection via config push - collections are created lazily on first write
|
||||
with the correct dimension determined from the actual embeddings.
|
||||
"""
|
||||
try:
|
||||
logger.info(f"Collection create request for {user}/{collection} - will be created lazily on first write")
|
||||
logger.info(f"Collection create request for {workspace}/{collection} - will be created lazily on first write")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to create collection {user}/{collection}: {e}", exc_info=True)
|
||||
logger.error(f"Failed to create collection {workspace}/{collection}: {e}", exc_info=True)
|
||||
raise
|
||||
|
||||
async def delete_collection(self, user: str, collection: str):
|
||||
async def delete_collection(self, workspace: str, collection: str):
|
||||
"""Delete the collection for graph embeddings via config push"""
|
||||
try:
|
||||
prefix = f"t_{user}_{collection}_"
|
||||
prefix = f"t_{workspace}_{collection}_"
|
||||
|
||||
# Get all collections and filter for matches
|
||||
all_collections = self.qdrant.get_collections().collections
|
||||
|
|
@ -156,10 +156,10 @@ class Processor(CollectionConfigHandler, GraphEmbeddingsStoreService):
|
|||
for collection_name in matching_collections:
|
||||
self.qdrant.delete_collection(collection_name)
|
||||
logger.info(f"Deleted Qdrant collection: {collection_name}")
|
||||
logger.info(f"Deleted {len(matching_collections)} collection(s) for {user}/{collection}")
|
||||
logger.info(f"Deleted {len(matching_collections)} collection(s) for {workspace}/{collection}")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to delete collection {user}/{collection}: {e}", exc_info=True)
|
||||
logger.error(f"Failed to delete collection {workspace}/{collection}: {e}", exc_info=True)
|
||||
raise
|
||||
|
||||
def run():
|
||||
|
|
|
|||
|
|
@ -2,13 +2,13 @@
|
|||
Row embeddings writer for Qdrant (Stage 2).
|
||||
|
||||
Consumes RowEmbeddings messages (which already contain computed vectors)
|
||||
and writes them to Qdrant. One Qdrant collection per (user, collection, schema_name) pair.
|
||||
and writes them to Qdrant. One Qdrant collection per (workspace, collection, schema_name) pair.
|
||||
|
||||
This follows the two-stage pattern used by graph-embeddings and document-embeddings:
|
||||
Stage 1 (row-embeddings): Compute embeddings
|
||||
Stage 2 (this processor): Store embeddings
|
||||
|
||||
Collection naming: rows_{user}_{collection}_{schema_name}_{dimension}
|
||||
Collection naming: rows_{workspace}_{collection}_{schema_name}_{dimension}
|
||||
|
||||
Payload structure:
|
||||
- index_name: The indexed field(s) this embedding represents
|
||||
|
|
@ -77,10 +77,10 @@ class Processor(CollectionConfigHandler, FlowProcessor):
|
|||
return safe_name.lower()
|
||||
|
||||
def get_collection_name(
|
||||
self, user: str, collection: str, schema_name: str, dimension: int
|
||||
self, workspace: str, collection: str, schema_name: str, dimension: int
|
||||
) -> str:
|
||||
"""Generate Qdrant collection name"""
|
||||
safe_user = self.sanitize_name(user)
|
||||
safe_user = self.sanitize_name(workspace)
|
||||
safe_collection = self.sanitize_name(collection)
|
||||
safe_schema = self.sanitize_name(schema_name)
|
||||
return f"rows_{safe_user}_{safe_collection}_{safe_schema}_{dimension}"
|
||||
|
|
@ -169,17 +169,17 @@ class Processor(CollectionConfigHandler, FlowProcessor):
|
|||
|
||||
logger.info(f"Wrote {embeddings_written} embeddings to Qdrant")
|
||||
|
||||
async def create_collection(self, user: str, collection: str, metadata: dict):
|
||||
async def create_collection(self, workspace: str, collection: str, metadata: dict):
|
||||
"""Collection creation via config push - collections created lazily on first write"""
|
||||
logger.info(
|
||||
f"Row embeddings collection create request for {user}/{collection} - "
|
||||
f"Row embeddings collection create request for {workspace}/{collection} - "
|
||||
f"will be created lazily on first write"
|
||||
)
|
||||
|
||||
async def delete_collection(self, user: str, collection: str):
|
||||
"""Delete all Qdrant collections for a given user/collection"""
|
||||
async def delete_collection(self, workspace: str, collection: str):
|
||||
"""Delete all Qdrant collections for a given workspace/collection"""
|
||||
try:
|
||||
prefix = f"rows_{self.sanitize_name(user)}_{self.sanitize_name(collection)}_"
|
||||
prefix = f"rows_{self.sanitize_name(workspace)}_{self.sanitize_name(collection)}_"
|
||||
|
||||
# Get all collections and filter for matches
|
||||
all_collections = self.qdrant.get_collections().collections
|
||||
|
|
@ -197,23 +197,23 @@ class Processor(CollectionConfigHandler, FlowProcessor):
|
|||
logger.info(f"Deleted Qdrant collection: {collection_name}")
|
||||
logger.info(
|
||||
f"Deleted {len(matching_collections)} collection(s) "
|
||||
f"for {user}/{collection}"
|
||||
f"for {workspace}/{collection}"
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
f"Failed to delete collection {user}/{collection}: {e}",
|
||||
f"Failed to delete collection {workspace}/{collection}: {e}",
|
||||
exc_info=True
|
||||
)
|
||||
raise
|
||||
|
||||
async def delete_collection_schema(
|
||||
self, user: str, collection: str, schema_name: str
|
||||
self, workspace: str, collection: str, schema_name: str
|
||||
):
|
||||
"""Delete Qdrant collection for a specific user/collection/schema"""
|
||||
"""Delete Qdrant collection for a specific workspace/collection/schema"""
|
||||
try:
|
||||
prefix = (
|
||||
f"rows_{self.sanitize_name(user)}_"
|
||||
f"rows_{self.sanitize_name(workspace)}_"
|
||||
f"{self.sanitize_name(collection)}_{self.sanitize_name(schema_name)}_"
|
||||
)
|
||||
|
||||
|
|
@ -234,7 +234,7 @@ class Processor(CollectionConfigHandler, FlowProcessor):
|
|||
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
f"Failed to delete collection {user}/{collection}/{schema_name}: {e}",
|
||||
f"Failed to delete collection {workspace}/{collection}/{schema_name}: {e}",
|
||||
exc_info=True
|
||||
)
|
||||
raise
|
||||
|
|
|
|||
|
|
@ -459,25 +459,25 @@ class Processor(CollectionConfigHandler, FlowProcessor):
|
|||
f"({len(index_names)} indexes per row)"
|
||||
)
|
||||
|
||||
async def create_collection(self, user: str, collection: str, metadata: dict):
|
||||
async def create_collection(self, workspace: str, collection: str, metadata: dict):
|
||||
"""Create/verify collection exists in Cassandra row store"""
|
||||
# Connect if not already connected (sync, push to thread)
|
||||
await asyncio.to_thread(self.connect_cassandra)
|
||||
|
||||
# Ensure tables exist (sync DDL, push to thread)
|
||||
await asyncio.to_thread(self.ensure_tables, user)
|
||||
await asyncio.to_thread(self.ensure_tables, workspace)
|
||||
|
||||
logger.info(f"Collection {collection} ready for user {user}")
|
||||
logger.info(f"Collection {collection} ready for workspace {workspace}")
|
||||
|
||||
async def delete_collection(self, user: str, collection: str):
|
||||
async def delete_collection(self, workspace: str, collection: str):
|
||||
"""Delete all data for a specific collection using partition tracking"""
|
||||
# Connect if not already connected
|
||||
await asyncio.to_thread(self.connect_cassandra)
|
||||
|
||||
safe_keyspace = self.sanitize_name(user)
|
||||
safe_keyspace = self.sanitize_name(workspace)
|
||||
|
||||
# Check if keyspace exists
|
||||
if user not in self.known_keyspaces:
|
||||
if workspace not in self.known_keyspaces:
|
||||
check_keyspace_cql = """
|
||||
SELECT keyspace_name FROM system_schema.keyspaces
|
||||
WHERE keyspace_name = %s
|
||||
|
|
@ -488,7 +488,7 @@ class Processor(CollectionConfigHandler, FlowProcessor):
|
|||
if not result:
|
||||
logger.info(f"Keyspace {safe_keyspace} does not exist, nothing to delete")
|
||||
return
|
||||
self.known_keyspaces.add(user)
|
||||
self.known_keyspaces.add(workspace)
|
||||
|
||||
# Discover all partitions for this collection
|
||||
select_partitions_cql = f"""
|
||||
|
|
@ -551,12 +551,12 @@ class Processor(CollectionConfigHandler, FlowProcessor):
|
|||
f"from keyspace {safe_keyspace}"
|
||||
)
|
||||
|
||||
async def delete_collection_schema(self, user: str, collection: str, schema_name: str):
|
||||
async def delete_collection_schema(self, workspace: str, collection: str, schema_name: str):
|
||||
"""Delete all data for a specific collection + schema combination"""
|
||||
# Connect if not already connected
|
||||
await asyncio.to_thread(self.connect_cassandra)
|
||||
|
||||
safe_keyspace = self.sanitize_name(user)
|
||||
safe_keyspace = self.sanitize_name(workspace)
|
||||
|
||||
# Discover partitions for this collection + schema
|
||||
select_partitions_cql = f"""
|
||||
|
|
|
|||
|
|
@ -210,12 +210,12 @@ class Processor(CollectionConfigHandler, TriplesStoreService):
|
|||
|
||||
await asyncio.to_thread(_do_store)
|
||||
|
||||
async def create_collection(self, user: str, collection: str, metadata: dict):
|
||||
async def create_collection(self, workspace: str, collection: str, metadata: dict):
|
||||
"""Create a collection in Cassandra triple store via config push"""
|
||||
|
||||
def _do_create():
|
||||
# Create or reuse connection for this user's keyspace
|
||||
if self.table is None or self.table != user:
|
||||
# Create or reuse connection for this workspace's keyspace
|
||||
if self.table is None or self.table != workspace:
|
||||
self.tg = None
|
||||
|
||||
# Use factory function to select implementation
|
||||
|
|
@ -225,23 +225,23 @@ class Processor(CollectionConfigHandler, TriplesStoreService):
|
|||
if self.cassandra_username and self.cassandra_password:
|
||||
self.tg = KGClass(
|
||||
hosts=self.cassandra_host,
|
||||
keyspace=user,
|
||||
keyspace=workspace,
|
||||
username=self.cassandra_username,
|
||||
password=self.cassandra_password,
|
||||
)
|
||||
else:
|
||||
self.tg = KGClass(
|
||||
hosts=self.cassandra_host,
|
||||
keyspace=user,
|
||||
keyspace=workspace,
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to connect to Cassandra for user {user}: {e}")
|
||||
logger.error(f"Failed to connect to Cassandra for workspace {workspace}: {e}")
|
||||
raise
|
||||
|
||||
self.table = user
|
||||
self.table = workspace
|
||||
|
||||
# Create collection using the built-in method
|
||||
logger.info(f"Creating collection {collection} for user {user}")
|
||||
logger.info(f"Creating collection {collection} for workspace {workspace}")
|
||||
|
||||
if self.tg.collection_exists(collection):
|
||||
logger.info(f"Collection {collection} already exists")
|
||||
|
|
@ -252,15 +252,15 @@ class Processor(CollectionConfigHandler, TriplesStoreService):
|
|||
try:
|
||||
await asyncio.to_thread(_do_create)
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to create collection {user}/{collection}: {e}", exc_info=True)
|
||||
logger.error(f"Failed to create collection {workspace}/{collection}: {e}", exc_info=True)
|
||||
raise
|
||||
|
||||
async def delete_collection(self, user: str, collection: str):
|
||||
async def delete_collection(self, workspace: str, collection: str):
|
||||
"""Delete all data for a specific collection from the unified triples table"""
|
||||
|
||||
def _do_delete():
|
||||
# Create or reuse connection for this user's keyspace
|
||||
if self.table is None or self.table != user:
|
||||
# Create or reuse connection for this workspace's keyspace
|
||||
if self.table is None or self.table != workspace:
|
||||
self.tg = None
|
||||
|
||||
# Use factory function to select implementation
|
||||
|
|
@ -270,29 +270,29 @@ class Processor(CollectionConfigHandler, TriplesStoreService):
|
|||
if self.cassandra_username and self.cassandra_password:
|
||||
self.tg = KGClass(
|
||||
hosts=self.cassandra_host,
|
||||
keyspace=user,
|
||||
keyspace=workspace,
|
||||
username=self.cassandra_username,
|
||||
password=self.cassandra_password,
|
||||
)
|
||||
else:
|
||||
self.tg = KGClass(
|
||||
hosts=self.cassandra_host,
|
||||
keyspace=user,
|
||||
keyspace=workspace,
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to connect to Cassandra for user {user}: {e}")
|
||||
logger.error(f"Failed to connect to Cassandra for workspace {workspace}: {e}")
|
||||
raise
|
||||
|
||||
self.table = user
|
||||
self.table = workspace
|
||||
|
||||
# Delete all triples for this collection using the built-in method
|
||||
self.tg.delete_collection(collection)
|
||||
logger.info(f"Deleted all triples for collection {collection} from keyspace {user}")
|
||||
logger.info(f"Deleted all triples for collection {collection} from keyspace {workspace}")
|
||||
|
||||
try:
|
||||
await asyncio.to_thread(_do_delete)
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to delete collection {user}/{collection}: {e}", exc_info=True)
|
||||
logger.error(f"Failed to delete collection {workspace}/{collection}: {e}", exc_info=True)
|
||||
raise
|
||||
|
||||
@staticmethod
|
||||
|
|
|
|||
|
|
@ -59,15 +59,15 @@ class Processor(CollectionConfigHandler, TriplesStoreService):
|
|||
# Register for config push notifications
|
||||
self.register_config_handler(self.on_collection_config, types=["collection"])
|
||||
|
||||
def create_node(self, uri, user, collection):
|
||||
def create_node(self, uri, workspace, collection):
|
||||
|
||||
logger.debug(f"Create node {uri} for user={user}, collection={collection}")
|
||||
logger.debug(f"Create node {uri} for workspace={workspace}, collection={collection}")
|
||||
|
||||
res = self.io.query(
|
||||
"MERGE (n:Node {uri: $uri, user: $user, collection: $collection})",
|
||||
"MERGE (n:Node {uri: $uri, workspace: $workspace, collection: $collection})",
|
||||
params={
|
||||
"uri": uri,
|
||||
"user": user,
|
||||
"workspace": workspace,
|
||||
"collection": collection,
|
||||
},
|
||||
)
|
||||
|
|
@ -77,15 +77,15 @@ class Processor(CollectionConfigHandler, TriplesStoreService):
|
|||
time=res.run_time_ms
|
||||
))
|
||||
|
||||
def create_literal(self, value, user, collection):
|
||||
def create_literal(self, value, workspace, collection):
|
||||
|
||||
logger.debug(f"Create literal {value} for user={user}, collection={collection}")
|
||||
logger.debug(f"Create literal {value} for workspace={workspace}, collection={collection}")
|
||||
|
||||
res = self.io.query(
|
||||
"MERGE (n:Literal {value: $value, user: $user, collection: $collection})",
|
||||
"MERGE (n:Literal {value: $value, workspace: $workspace, collection: $collection})",
|
||||
params={
|
||||
"value": value,
|
||||
"user": user,
|
||||
"workspace": workspace,
|
||||
"collection": collection,
|
||||
},
|
||||
)
|
||||
|
|
@ -95,19 +95,19 @@ class Processor(CollectionConfigHandler, TriplesStoreService):
|
|||
time=res.run_time_ms
|
||||
))
|
||||
|
||||
def relate_node(self, src, uri, dest, user, collection):
|
||||
def relate_node(self, src, uri, dest, workspace, collection):
|
||||
|
||||
logger.debug(f"Create node rel {src} {uri} {dest} for user={user}, collection={collection}")
|
||||
logger.debug(f"Create node rel {src} {uri} {dest} for workspace={workspace}, collection={collection}")
|
||||
|
||||
res = self.io.query(
|
||||
"MATCH (src:Node {uri: $src, user: $user, collection: $collection}) "
|
||||
"MATCH (dest:Node {uri: $dest, user: $user, collection: $collection}) "
|
||||
"MERGE (src)-[:Rel {uri: $uri, user: $user, collection: $collection}]->(dest)",
|
||||
"MATCH (src:Node {uri: $src, workspace: $workspace, collection: $collection}) "
|
||||
"MATCH (dest:Node {uri: $dest, workspace: $workspace, collection: $collection}) "
|
||||
"MERGE (src)-[:Rel {uri: $uri, workspace: $workspace, collection: $collection}]->(dest)",
|
||||
params={
|
||||
"src": src,
|
||||
"dest": dest,
|
||||
"uri": uri,
|
||||
"user": user,
|
||||
"workspace": workspace,
|
||||
"collection": collection,
|
||||
},
|
||||
)
|
||||
|
|
@ -117,19 +117,19 @@ class Processor(CollectionConfigHandler, TriplesStoreService):
|
|||
time=res.run_time_ms
|
||||
))
|
||||
|
||||
def relate_literal(self, src, uri, dest, user, collection):
|
||||
def relate_literal(self, src, uri, dest, workspace, collection):
|
||||
|
||||
logger.debug(f"Create literal rel {src} {uri} {dest} for user={user}, collection={collection}")
|
||||
logger.debug(f"Create literal rel {src} {uri} {dest} for workspace={workspace}, collection={collection}")
|
||||
|
||||
res = self.io.query(
|
||||
"MATCH (src:Node {uri: $src, user: $user, collection: $collection}) "
|
||||
"MATCH (dest:Literal {value: $dest, user: $user, collection: $collection}) "
|
||||
"MERGE (src)-[:Rel {uri: $uri, user: $user, collection: $collection}]->(dest)",
|
||||
"MATCH (src:Node {uri: $src, workspace: $workspace, collection: $collection}) "
|
||||
"MATCH (dest:Literal {value: $dest, workspace: $workspace, collection: $collection}) "
|
||||
"MERGE (src)-[:Rel {uri: $uri, workspace: $workspace, collection: $collection}]->(dest)",
|
||||
params={
|
||||
"src": src,
|
||||
"dest": dest,
|
||||
"uri": uri,
|
||||
"user": user,
|
||||
"workspace": workspace,
|
||||
"collection": collection,
|
||||
},
|
||||
)
|
||||
|
|
@ -139,28 +139,28 @@ class Processor(CollectionConfigHandler, TriplesStoreService):
|
|||
time=res.run_time_ms
|
||||
))
|
||||
|
||||
def collection_exists(self, user, collection):
|
||||
def collection_exists(self, workspace, collection):
|
||||
"""Check if collection metadata node exists"""
|
||||
result = self.io.query(
|
||||
"MATCH (c:CollectionMetadata {user: $user, collection: $collection}) "
|
||||
"MATCH (c:CollectionMetadata {workspace: $workspace, collection: $collection}) "
|
||||
"RETURN c LIMIT 1",
|
||||
params={"user": user, "collection": collection}
|
||||
params={"workspace": workspace, "collection": collection}
|
||||
)
|
||||
return result.result_set is not None and len(result.result_set) > 0
|
||||
|
||||
def create_collection(self, user, collection):
|
||||
def create_collection(self, workspace, collection):
|
||||
"""Create collection metadata node"""
|
||||
import datetime
|
||||
self.io.query(
|
||||
"MERGE (c:CollectionMetadata {user: $user, collection: $collection}) "
|
||||
"MERGE (c:CollectionMetadata {workspace: $workspace, collection: $collection}) "
|
||||
"SET c.created_at = $created_at",
|
||||
params={
|
||||
"user": user,
|
||||
"workspace": workspace,
|
||||
"collection": collection,
|
||||
"created_at": datetime.datetime.now().isoformat()
|
||||
}
|
||||
)
|
||||
logger.info(f"Created collection metadata node for {user}/{collection}")
|
||||
logger.info(f"Created collection metadata node for {workspace}/{collection}")
|
||||
|
||||
async def store_triples(self, workspace, message):
|
||||
collection = message.metadata.collection if message.metadata.collection else "default"
|
||||
|
|
@ -206,58 +206,58 @@ class Processor(CollectionConfigHandler, TriplesStoreService):
|
|||
help=f'FalkorDB database (default: {default_database})'
|
||||
)
|
||||
|
||||
async def create_collection(self, user: str, collection: str, metadata: dict):
|
||||
async def create_collection(self, workspace: str, collection: str, metadata: dict):
|
||||
"""Create collection metadata in FalkorDB via config push"""
|
||||
try:
|
||||
# Check if collection exists
|
||||
result = self.io.query(
|
||||
"MATCH (c:CollectionMetadata {user: $user, collection: $collection}) RETURN c LIMIT 1",
|
||||
params={"user": user, "collection": collection}
|
||||
"MATCH (c:CollectionMetadata {workspace: $workspace, collection: $collection}) RETURN c LIMIT 1",
|
||||
params={"workspace": workspace, "collection": collection}
|
||||
)
|
||||
if result.result_set:
|
||||
logger.info(f"Collection {user}/{collection} already exists")
|
||||
logger.info(f"Collection {workspace}/{collection} already exists")
|
||||
else:
|
||||
# Create collection metadata node
|
||||
import datetime
|
||||
self.io.query(
|
||||
"MERGE (c:CollectionMetadata {user: $user, collection: $collection}) "
|
||||
"MERGE (c:CollectionMetadata {workspace: $workspace, collection: $collection}) "
|
||||
"SET c.created_at = $created_at",
|
||||
params={
|
||||
"user": user,
|
||||
"workspace": workspace,
|
||||
"collection": collection,
|
||||
"created_at": datetime.datetime.now().isoformat()
|
||||
}
|
||||
)
|
||||
logger.info(f"Created collection {user}/{collection}")
|
||||
logger.info(f"Created collection {workspace}/{collection}")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to create collection {user}/{collection}: {e}", exc_info=True)
|
||||
logger.error(f"Failed to create collection {workspace}/{collection}: {e}", exc_info=True)
|
||||
raise
|
||||
|
||||
async def delete_collection(self, user: str, collection: str):
|
||||
async def delete_collection(self, workspace: str, collection: str):
|
||||
"""Delete the collection for FalkorDB triples via config push"""
|
||||
try:
|
||||
# Delete all nodes and literals for this user/collection
|
||||
# Delete all nodes and literals for this workspace/collection
|
||||
node_result = self.io.query(
|
||||
"MATCH (n:Node {user: $user, collection: $collection}) DETACH DELETE n",
|
||||
params={"user": user, "collection": collection}
|
||||
"MATCH (n:Node {workspace: $workspace, collection: $collection}) DETACH DELETE n",
|
||||
params={"workspace": workspace, "collection": collection}
|
||||
)
|
||||
|
||||
literal_result = self.io.query(
|
||||
"MATCH (n:Literal {user: $user, collection: $collection}) DETACH DELETE n",
|
||||
params={"user": user, "collection": collection}
|
||||
"MATCH (n:Literal {workspace: $workspace, collection: $collection}) DETACH DELETE n",
|
||||
params={"workspace": workspace, "collection": collection}
|
||||
)
|
||||
|
||||
# Delete collection metadata node
|
||||
metadata_result = self.io.query(
|
||||
"MATCH (c:CollectionMetadata {user: $user, collection: $collection}) DELETE c",
|
||||
params={"user": user, "collection": collection}
|
||||
"MATCH (c:CollectionMetadata {workspace: $workspace, collection: $collection}) DELETE c",
|
||||
params={"workspace": workspace, "collection": collection}
|
||||
)
|
||||
|
||||
logger.info(f"Deleted {node_result.nodes_deleted} nodes, {literal_result.nodes_deleted} literals, and {metadata_result.nodes_deleted} metadata nodes for collection {user}/{collection}")
|
||||
logger.info(f"Deleted {node_result.nodes_deleted} nodes, {literal_result.nodes_deleted} literals, and {metadata_result.nodes_deleted} metadata nodes for collection {workspace}/{collection}")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to delete collection {user}/{collection}: {e}", exc_info=True)
|
||||
logger.error(f"Failed to delete collection {workspace}/{collection}: {e}", exc_info=True)
|
||||
raise
|
||||
|
||||
def run():
|
||||
|
|
|
|||
|
|
@ -117,10 +117,10 @@ class Processor(CollectionConfigHandler, TriplesStoreService):
|
|||
# Maybe index already exists
|
||||
logger.warning("Index create failure ignored")
|
||||
|
||||
# New indexes for user/collection filtering
|
||||
# New indexes for workspace/collection filtering
|
||||
try:
|
||||
session.run(
|
||||
"CREATE INDEX ON :Node(user)"
|
||||
"CREATE INDEX ON :Node(workspace)"
|
||||
)
|
||||
except Exception as e:
|
||||
logger.warning(f"User index create failure: {e}")
|
||||
|
|
@ -136,7 +136,7 @@ class Processor(CollectionConfigHandler, TriplesStoreService):
|
|||
|
||||
try:
|
||||
session.run(
|
||||
"CREATE INDEX ON :Literal(user)"
|
||||
"CREATE INDEX ON :Literal(workspace)"
|
||||
)
|
||||
except Exception as e:
|
||||
logger.warning(f"User index create failure: {e}")
|
||||
|
|
@ -152,13 +152,13 @@ class Processor(CollectionConfigHandler, TriplesStoreService):
|
|||
|
||||
logger.info("Index creation done")
|
||||
|
||||
def create_node(self, uri, user, collection):
|
||||
def create_node(self, uri, workspace, collection):
|
||||
|
||||
logger.debug(f"Create node {uri} for user={user}, collection={collection}")
|
||||
logger.debug(f"Create node {uri} for workspace={workspace}, collection={collection}")
|
||||
|
||||
summary = self.io.execute_query(
|
||||
"MERGE (n:Node {uri: $uri, user: $user, collection: $collection})",
|
||||
uri=uri, user=user, collection=collection,
|
||||
"MERGE (n:Node {uri: $uri, workspace: $workspace, collection: $collection})",
|
||||
uri=uri, workspace=workspace, collection=collection,
|
||||
database_=self.db,
|
||||
).summary
|
||||
|
||||
|
|
@ -167,13 +167,13 @@ class Processor(CollectionConfigHandler, TriplesStoreService):
|
|||
time=summary.result_available_after
|
||||
))
|
||||
|
||||
def create_literal(self, value, user, collection):
|
||||
def create_literal(self, value, workspace, collection):
|
||||
|
||||
logger.debug(f"Create literal {value} for user={user}, collection={collection}")
|
||||
logger.debug(f"Create literal {value} for workspace={workspace}, collection={collection}")
|
||||
|
||||
summary = self.io.execute_query(
|
||||
"MERGE (n:Literal {value: $value, user: $user, collection: $collection})",
|
||||
value=value, user=user, collection=collection,
|
||||
"MERGE (n:Literal {value: $value, workspace: $workspace, collection: $collection})",
|
||||
value=value, workspace=workspace, collection=collection,
|
||||
database_=self.db,
|
||||
).summary
|
||||
|
||||
|
|
@ -182,15 +182,15 @@ class Processor(CollectionConfigHandler, TriplesStoreService):
|
|||
time=summary.result_available_after
|
||||
))
|
||||
|
||||
def relate_node(self, src, uri, dest, user, collection):
|
||||
def relate_node(self, src, uri, dest, workspace, collection):
|
||||
|
||||
logger.debug(f"Create node rel {src} {uri} {dest} for user={user}, collection={collection}")
|
||||
logger.debug(f"Create node rel {src} {uri} {dest} for workspace={workspace}, collection={collection}")
|
||||
|
||||
summary = self.io.execute_query(
|
||||
"MATCH (src:Node {uri: $src, user: $user, collection: $collection}) "
|
||||
"MATCH (dest:Node {uri: $dest, user: $user, collection: $collection}) "
|
||||
"MERGE (src)-[:Rel {uri: $uri, user: $user, collection: $collection}]->(dest)",
|
||||
src=src, dest=dest, uri=uri, user=user, collection=collection,
|
||||
"MATCH (src:Node {uri: $src, workspace: $workspace, collection: $collection}) "
|
||||
"MATCH (dest:Node {uri: $dest, workspace: $workspace, collection: $collection}) "
|
||||
"MERGE (src)-[:Rel {uri: $uri, workspace: $workspace, collection: $collection}]->(dest)",
|
||||
src=src, dest=dest, uri=uri, workspace=workspace, collection=collection,
|
||||
database_=self.db,
|
||||
).summary
|
||||
|
||||
|
|
@ -199,15 +199,15 @@ class Processor(CollectionConfigHandler, TriplesStoreService):
|
|||
time=summary.result_available_after
|
||||
))
|
||||
|
||||
def relate_literal(self, src, uri, dest, user, collection):
|
||||
def relate_literal(self, src, uri, dest, workspace, collection):
|
||||
|
||||
logger.debug(f"Create literal rel {src} {uri} {dest} for user={user}, collection={collection}")
|
||||
logger.debug(f"Create literal rel {src} {uri} {dest} for workspace={workspace}, collection={collection}")
|
||||
|
||||
summary = self.io.execute_query(
|
||||
"MATCH (src:Node {uri: $src, user: $user, collection: $collection}) "
|
||||
"MATCH (dest:Literal {value: $dest, user: $user, collection: $collection}) "
|
||||
"MERGE (src)-[:Rel {uri: $uri, user: $user, collection: $collection}]->(dest)",
|
||||
src=src, dest=dest, uri=uri, user=user, collection=collection,
|
||||
"MATCH (src:Node {uri: $src, workspace: $workspace, collection: $collection}) "
|
||||
"MATCH (dest:Literal {value: $dest, workspace: $workspace, collection: $collection}) "
|
||||
"MERGE (src)-[:Rel {uri: $uri, workspace: $workspace, collection: $collection}]->(dest)",
|
||||
src=src, dest=dest, uri=uri, workspace=workspace, collection=collection,
|
||||
database_=self.db,
|
||||
).summary
|
||||
|
||||
|
|
@ -216,7 +216,7 @@ class Processor(CollectionConfigHandler, TriplesStoreService):
|
|||
time=summary.result_available_after
|
||||
))
|
||||
|
||||
def create_triple(self, tx, t, user, collection):
|
||||
def create_triple(self, tx, t, workspace, collection):
|
||||
|
||||
s_val = get_term_value(t.s)
|
||||
p_val = get_term_value(t.p)
|
||||
|
|
@ -224,38 +224,38 @@ class Processor(CollectionConfigHandler, TriplesStoreService):
|
|||
|
||||
# Create new s node with given uri, if not exists
|
||||
result = tx.run(
|
||||
"MERGE (n:Node {uri: $uri, user: $user, collection: $collection})",
|
||||
uri=s_val, user=user, collection=collection
|
||||
"MERGE (n:Node {uri: $uri, workspace: $workspace, collection: $collection})",
|
||||
uri=s_val, workspace=workspace, collection=collection
|
||||
)
|
||||
|
||||
if t.o.type == IRI:
|
||||
|
||||
# Create new o node with given uri, if not exists
|
||||
result = tx.run(
|
||||
"MERGE (n:Node {uri: $uri, user: $user, collection: $collection})",
|
||||
uri=o_val, user=user, collection=collection
|
||||
"MERGE (n:Node {uri: $uri, workspace: $workspace, collection: $collection})",
|
||||
uri=o_val, workspace=workspace, collection=collection
|
||||
)
|
||||
|
||||
result = tx.run(
|
||||
"MATCH (src:Node {uri: $src, user: $user, collection: $collection}) "
|
||||
"MATCH (dest:Node {uri: $dest, user: $user, collection: $collection}) "
|
||||
"MERGE (src)-[:Rel {uri: $uri, user: $user, collection: $collection}]->(dest)",
|
||||
src=s_val, dest=o_val, uri=p_val, user=user, collection=collection,
|
||||
"MATCH (src:Node {uri: $src, workspace: $workspace, collection: $collection}) "
|
||||
"MATCH (dest:Node {uri: $dest, workspace: $workspace, collection: $collection}) "
|
||||
"MERGE (src)-[:Rel {uri: $uri, workspace: $workspace, collection: $collection}]->(dest)",
|
||||
src=s_val, dest=o_val, uri=p_val, workspace=workspace, collection=collection,
|
||||
)
|
||||
|
||||
else:
|
||||
|
||||
# Create new o literal with given uri, if not exists
|
||||
result = tx.run(
|
||||
"MERGE (n:Literal {value: $value, user: $user, collection: $collection})",
|
||||
value=o_val, user=user, collection=collection
|
||||
"MERGE (n:Literal {value: $value, workspace: $workspace, collection: $collection})",
|
||||
value=o_val, workspace=workspace, collection=collection
|
||||
)
|
||||
|
||||
result = tx.run(
|
||||
"MATCH (src:Node {uri: $src, user: $user, collection: $collection}) "
|
||||
"MATCH (dest:Literal {value: $dest, user: $user, collection: $collection}) "
|
||||
"MERGE (src)-[:Rel {uri: $uri, user: $user, collection: $collection}]->(dest)",
|
||||
src=s_val, dest=o_val, uri=p_val, user=user, collection=collection,
|
||||
"MATCH (src:Node {uri: $src, workspace: $workspace, collection: $collection}) "
|
||||
"MATCH (dest:Literal {value: $dest, workspace: $workspace, collection: $collection}) "
|
||||
"MERGE (src)-[:Rel {uri: $uri, workspace: $workspace, collection: $collection}]->(dest)",
|
||||
src=s_val, dest=o_val, uri=p_val, workspace=workspace, collection=collection,
|
||||
)
|
||||
|
||||
async def store_triples(self, workspace, message):
|
||||
|
|
@ -288,7 +288,7 @@ class Processor(CollectionConfigHandler, TriplesStoreService):
|
|||
|
||||
# Alternative implementation using transactions
|
||||
# with self.io.session(database=self.db) as session:
|
||||
# session.execute_write(self.create_triple, t, user, collection)
|
||||
# session.execute_write(self.create_triple, t, workspace, collection)
|
||||
|
||||
@staticmethod
|
||||
def add_args(parser):
|
||||
|
|
@ -319,72 +319,72 @@ class Processor(CollectionConfigHandler, TriplesStoreService):
|
|||
help=f'Memgraph database (default: {default_database})'
|
||||
)
|
||||
|
||||
def _collection_exists_in_db(self, user, collection):
|
||||
def _collection_exists_in_db(self, workspace, collection):
|
||||
"""Check if collection metadata node exists"""
|
||||
with self.io.session(database=self.db) as session:
|
||||
result = session.run(
|
||||
"MATCH (c:CollectionMetadata {user: $user, collection: $collection}) "
|
||||
"MATCH (c:CollectionMetadata {workspace: $workspace, collection: $collection}) "
|
||||
"RETURN c LIMIT 1",
|
||||
user=user, collection=collection
|
||||
workspace=workspace, collection=collection
|
||||
)
|
||||
return bool(list(result))
|
||||
|
||||
def _create_collection_in_db(self, user, collection):
|
||||
def _create_collection_in_db(self, workspace, collection):
|
||||
"""Create collection metadata node"""
|
||||
import datetime
|
||||
with self.io.session(database=self.db) as session:
|
||||
session.run(
|
||||
"MERGE (c:CollectionMetadata {user: $user, collection: $collection}) "
|
||||
"MERGE (c:CollectionMetadata {workspace: $workspace, collection: $collection}) "
|
||||
"SET c.created_at = $created_at",
|
||||
user=user, collection=collection,
|
||||
workspace=workspace, collection=collection,
|
||||
created_at=datetime.datetime.now().isoformat()
|
||||
)
|
||||
logger.info(f"Created collection metadata node for {user}/{collection}")
|
||||
logger.info(f"Created collection metadata node for {workspace}/{collection}")
|
||||
|
||||
async def create_collection(self, user: str, collection: str, metadata: dict):
|
||||
async def create_collection(self, workspace: str, collection: str, metadata: dict):
|
||||
"""Create collection metadata in Memgraph via config push"""
|
||||
try:
|
||||
if self._collection_exists_in_db(user, collection):
|
||||
logger.info(f"Collection {user}/{collection} already exists")
|
||||
if self._collection_exists_in_db(workspace, collection):
|
||||
logger.info(f"Collection {workspace}/{collection} already exists")
|
||||
else:
|
||||
self._create_collection_in_db(user, collection)
|
||||
logger.info(f"Created collection {user}/{collection}")
|
||||
self._create_collection_in_db(workspace, collection)
|
||||
logger.info(f"Created collection {workspace}/{collection}")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to create collection {user}/{collection}: {e}", exc_info=True)
|
||||
logger.error(f"Failed to create collection {workspace}/{collection}: {e}", exc_info=True)
|
||||
raise
|
||||
|
||||
async def delete_collection(self, user: str, collection: str):
|
||||
async def delete_collection(self, workspace: str, collection: str):
|
||||
"""Delete all data for a specific collection via config push"""
|
||||
try:
|
||||
with self.io.session(database=self.db) as session:
|
||||
# Delete all nodes for this user and collection
|
||||
# Delete all nodes for this workspace and collection
|
||||
node_result = session.run(
|
||||
"MATCH (n:Node {user: $user, collection: $collection}) "
|
||||
"MATCH (n:Node {workspace: $workspace, collection: $collection}) "
|
||||
"DETACH DELETE n",
|
||||
user=user, collection=collection
|
||||
workspace=workspace, collection=collection
|
||||
)
|
||||
nodes_deleted = node_result.consume().counters.nodes_deleted
|
||||
|
||||
# Delete all literals for this user and collection
|
||||
# Delete all literals for this workspace and collection
|
||||
literal_result = session.run(
|
||||
"MATCH (n:Literal {user: $user, collection: $collection}) "
|
||||
"MATCH (n:Literal {workspace: $workspace, collection: $collection}) "
|
||||
"DETACH DELETE n",
|
||||
user=user, collection=collection
|
||||
workspace=workspace, collection=collection
|
||||
)
|
||||
literals_deleted = literal_result.consume().counters.nodes_deleted
|
||||
|
||||
# Delete collection metadata node
|
||||
metadata_result = session.run(
|
||||
"MATCH (c:CollectionMetadata {user: $user, collection: $collection}) "
|
||||
"MATCH (c:CollectionMetadata {workspace: $workspace, collection: $collection}) "
|
||||
"DELETE c",
|
||||
user=user, collection=collection
|
||||
workspace=workspace, collection=collection
|
||||
)
|
||||
metadata_deleted = metadata_result.consume().counters.nodes_deleted
|
||||
|
||||
# Note: Relationships are automatically deleted with DETACH DELETE
|
||||
|
||||
logger.info(f"Deleted {nodes_deleted} nodes, {literals_deleted} literals, and {metadata_deleted} metadata nodes for {user}/{collection}")
|
||||
logger.info(f"Deleted {nodes_deleted} nodes, {literals_deleted} literals, and {metadata_deleted} metadata nodes for {workspace}/{collection}")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to delete collection: {e}")
|
||||
|
|
|
|||
|
|
@ -80,14 +80,12 @@ class Processor(CollectionConfigHandler, TriplesStoreService):
|
|||
|
||||
logger.info("Create indexes...")
|
||||
|
||||
# Legacy indexes for backwards compatibility
|
||||
try:
|
||||
session.run(
|
||||
"CREATE INDEX Node_uri FOR (n:Node) ON (n.uri)",
|
||||
)
|
||||
except Exception as e:
|
||||
logger.warning(f"Index create failure: {e}")
|
||||
# Maybe index already exists
|
||||
logger.warning("Index create failure ignored")
|
||||
|
||||
try:
|
||||
|
|
@ -96,7 +94,6 @@ class Processor(CollectionConfigHandler, TriplesStoreService):
|
|||
)
|
||||
except Exception as e:
|
||||
logger.warning(f"Index create failure: {e}")
|
||||
# Maybe index already exists
|
||||
logger.warning("Index create failure ignored")
|
||||
|
||||
try:
|
||||
|
|
@ -105,13 +102,11 @@ class Processor(CollectionConfigHandler, TriplesStoreService):
|
|||
)
|
||||
except Exception as e:
|
||||
logger.warning(f"Index create failure: {e}")
|
||||
# Maybe index already exists
|
||||
logger.warning("Index create failure ignored")
|
||||
|
||||
# New compound indexes for user/collection filtering
|
||||
try:
|
||||
session.run(
|
||||
"CREATE INDEX node_user_collection_uri FOR (n:Node) ON (n.user, n.collection, n.uri)",
|
||||
"CREATE INDEX node_workspace_collection_uri FOR (n:Node) ON (n.workspace, n.collection, n.uri)",
|
||||
)
|
||||
except Exception as e:
|
||||
logger.warning(f"Compound index create failure: {e}")
|
||||
|
|
@ -119,17 +114,16 @@ class Processor(CollectionConfigHandler, TriplesStoreService):
|
|||
|
||||
try:
|
||||
session.run(
|
||||
"CREATE INDEX literal_user_collection_value FOR (n:Literal) ON (n.user, n.collection, n.value)",
|
||||
"CREATE INDEX literal_workspace_collection_value FOR (n:Literal) ON (n.workspace, n.collection, n.value)",
|
||||
)
|
||||
except Exception as e:
|
||||
logger.warning(f"Compound index create failure: {e}")
|
||||
logger.warning("Index create failure ignored")
|
||||
|
||||
# Note: Neo4j doesn't support compound indexes on relationships in all versions
|
||||
# Try to create individual indexes on relationship properties
|
||||
# Neo4j doesn't support compound indexes on relationships in all versions
|
||||
try:
|
||||
session.run(
|
||||
"CREATE INDEX rel_user FOR ()-[r:Rel]-() ON (r.user)",
|
||||
"CREATE INDEX rel_workspace FOR ()-[r:Rel]-() ON (r.workspace)",
|
||||
)
|
||||
except Exception as e:
|
||||
logger.warning(f"Relationship index create failure: {e}")
|
||||
|
|
@ -145,13 +139,13 @@ class Processor(CollectionConfigHandler, TriplesStoreService):
|
|||
|
||||
logger.info("Index creation done")
|
||||
|
||||
def create_node(self, uri, user, collection):
|
||||
def create_node(self, uri, workspace, collection):
|
||||
|
||||
logger.debug(f"Create node {uri} for user={user}, collection={collection}")
|
||||
logger.debug(f"Create node {uri} for workspace={workspace}, collection={collection}")
|
||||
|
||||
summary = self.io.execute_query(
|
||||
"MERGE (n:Node {uri: $uri, user: $user, collection: $collection})",
|
||||
uri=uri, user=user, collection=collection,
|
||||
"MERGE (n:Node {uri: $uri, workspace: $workspace, collection: $collection})",
|
||||
uri=uri, workspace=workspace, collection=collection,
|
||||
database_=self.db,
|
||||
).summary
|
||||
|
||||
|
|
@ -160,13 +154,13 @@ class Processor(CollectionConfigHandler, TriplesStoreService):
|
|||
time=summary.result_available_after
|
||||
))
|
||||
|
||||
def create_literal(self, value, user, collection):
|
||||
def create_literal(self, value, workspace, collection):
|
||||
|
||||
logger.debug(f"Create literal {value} for user={user}, collection={collection}")
|
||||
logger.debug(f"Create literal {value} for workspace={workspace}, collection={collection}")
|
||||
|
||||
summary = self.io.execute_query(
|
||||
"MERGE (n:Literal {value: $value, user: $user, collection: $collection})",
|
||||
value=value, user=user, collection=collection,
|
||||
"MERGE (n:Literal {value: $value, workspace: $workspace, collection: $collection})",
|
||||
value=value, workspace=workspace, collection=collection,
|
||||
database_=self.db,
|
||||
).summary
|
||||
|
||||
|
|
@ -175,15 +169,15 @@ class Processor(CollectionConfigHandler, TriplesStoreService):
|
|||
time=summary.result_available_after
|
||||
))
|
||||
|
||||
def relate_node(self, src, uri, dest, user, collection):
|
||||
def relate_node(self, src, uri, dest, workspace, collection):
|
||||
|
||||
logger.debug(f"Create node rel {src} {uri} {dest} for user={user}, collection={collection}")
|
||||
logger.debug(f"Create node rel {src} {uri} {dest} for workspace={workspace}, collection={collection}")
|
||||
|
||||
summary = self.io.execute_query(
|
||||
"MATCH (src:Node {uri: $src, user: $user, collection: $collection}) "
|
||||
"MATCH (dest:Node {uri: $dest, user: $user, collection: $collection}) "
|
||||
"MERGE (src)-[:Rel {uri: $uri, user: $user, collection: $collection}]->(dest)",
|
||||
src=src, dest=dest, uri=uri, user=user, collection=collection,
|
||||
"MATCH (src:Node {uri: $src, workspace: $workspace, collection: $collection}) "
|
||||
"MATCH (dest:Node {uri: $dest, workspace: $workspace, collection: $collection}) "
|
||||
"MERGE (src)-[:Rel {uri: $uri, workspace: $workspace, collection: $collection}]->(dest)",
|
||||
src=src, dest=dest, uri=uri, workspace=workspace, collection=collection,
|
||||
database_=self.db,
|
||||
).summary
|
||||
|
||||
|
|
@ -192,15 +186,15 @@ class Processor(CollectionConfigHandler, TriplesStoreService):
|
|||
time=summary.result_available_after
|
||||
))
|
||||
|
||||
def relate_literal(self, src, uri, dest, user, collection):
|
||||
def relate_literal(self, src, uri, dest, workspace, collection):
|
||||
|
||||
logger.debug(f"Create literal rel {src} {uri} {dest} for user={user}, collection={collection}")
|
||||
logger.debug(f"Create literal rel {src} {uri} {dest} for workspace={workspace}, collection={collection}")
|
||||
|
||||
summary = self.io.execute_query(
|
||||
"MATCH (src:Node {uri: $src, user: $user, collection: $collection}) "
|
||||
"MATCH (dest:Literal {value: $dest, user: $user, collection: $collection}) "
|
||||
"MERGE (src)-[:Rel {uri: $uri, user: $user, collection: $collection}]->(dest)",
|
||||
src=src, dest=dest, uri=uri, user=user, collection=collection,
|
||||
"MATCH (src:Node {uri: $src, workspace: $workspace, collection: $collection}) "
|
||||
"MATCH (dest:Literal {value: $dest, workspace: $workspace, collection: $collection}) "
|
||||
"MERGE (src)-[:Rel {uri: $uri, workspace: $workspace, collection: $collection}]->(dest)",
|
||||
src=src, dest=dest, uri=uri, workspace=workspace, collection=collection,
|
||||
database_=self.db,
|
||||
).summary
|
||||
|
||||
|
|
@ -266,75 +260,70 @@ class Processor(CollectionConfigHandler, TriplesStoreService):
|
|||
help=f'Neo4j database (default: {default_database})'
|
||||
)
|
||||
|
||||
def _collection_exists_in_db(self, user, collection):
|
||||
def _collection_exists_in_db(self, workspace, collection):
|
||||
"""Check if collection metadata node exists"""
|
||||
with self.io.session(database=self.db) as session:
|
||||
result = session.run(
|
||||
"MATCH (c:CollectionMetadata {user: $user, collection: $collection}) "
|
||||
"MATCH (c:CollectionMetadata {workspace: $workspace, collection: $collection}) "
|
||||
"RETURN c LIMIT 1",
|
||||
user=user, collection=collection
|
||||
workspace=workspace, collection=collection
|
||||
)
|
||||
return bool(list(result))
|
||||
|
||||
def _create_collection_in_db(self, user, collection):
|
||||
def _create_collection_in_db(self, workspace, collection):
|
||||
"""Create collection metadata node"""
|
||||
import datetime
|
||||
with self.io.session(database=self.db) as session:
|
||||
session.run(
|
||||
"MERGE (c:CollectionMetadata {user: $user, collection: $collection}) "
|
||||
"MERGE (c:CollectionMetadata {workspace: $workspace, collection: $collection}) "
|
||||
"SET c.created_at = $created_at",
|
||||
user=user, collection=collection,
|
||||
workspace=workspace, collection=collection,
|
||||
created_at=datetime.datetime.now().isoformat()
|
||||
)
|
||||
logger.info(f"Created collection metadata node for {user}/{collection}")
|
||||
logger.info(f"Created collection metadata node for {workspace}/{collection}")
|
||||
|
||||
async def create_collection(self, user: str, collection: str, metadata: dict):
|
||||
async def create_collection(self, workspace: str, collection: str, metadata: dict):
|
||||
"""Create collection metadata in Neo4j via config push"""
|
||||
try:
|
||||
if self._collection_exists_in_db(user, collection):
|
||||
logger.info(f"Collection {user}/{collection} already exists")
|
||||
if self._collection_exists_in_db(workspace, collection):
|
||||
logger.info(f"Collection {workspace}/{collection} already exists")
|
||||
else:
|
||||
self._create_collection_in_db(user, collection)
|
||||
logger.info(f"Created collection {user}/{collection}")
|
||||
self._create_collection_in_db(workspace, collection)
|
||||
logger.info(f"Created collection {workspace}/{collection}")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to create collection {user}/{collection}: {e}", exc_info=True)
|
||||
logger.error(f"Failed to create collection {workspace}/{collection}: {e}", exc_info=True)
|
||||
raise
|
||||
|
||||
async def delete_collection(self, user: str, collection: str):
|
||||
async def delete_collection(self, workspace: str, collection: str):
|
||||
"""Delete all data for a specific collection via config push"""
|
||||
try:
|
||||
with self.io.session(database=self.db) as session:
|
||||
# Delete all nodes for this user and collection
|
||||
node_result = session.run(
|
||||
"MATCH (n:Node {user: $user, collection: $collection}) "
|
||||
"MATCH (n:Node {workspace: $workspace, collection: $collection}) "
|
||||
"DETACH DELETE n",
|
||||
user=user, collection=collection
|
||||
workspace=workspace, collection=collection
|
||||
)
|
||||
nodes_deleted = node_result.consume().counters.nodes_deleted
|
||||
|
||||
# Delete all literals for this user and collection
|
||||
literal_result = session.run(
|
||||
"MATCH (n:Literal {user: $user, collection: $collection}) "
|
||||
"MATCH (n:Literal {workspace: $workspace, collection: $collection}) "
|
||||
"DETACH DELETE n",
|
||||
user=user, collection=collection
|
||||
workspace=workspace, collection=collection
|
||||
)
|
||||
literals_deleted = literal_result.consume().counters.nodes_deleted
|
||||
|
||||
# Note: Relationships are automatically deleted with DETACH DELETE
|
||||
|
||||
# Delete collection metadata node
|
||||
metadata_result = session.run(
|
||||
"MATCH (c:CollectionMetadata {user: $user, collection: $collection}) "
|
||||
"MATCH (c:CollectionMetadata {workspace: $workspace, collection: $collection}) "
|
||||
"DELETE c",
|
||||
user=user, collection=collection
|
||||
workspace=workspace, collection=collection
|
||||
)
|
||||
metadata_deleted = metadata_result.consume().counters.nodes_deleted
|
||||
|
||||
logger.info(f"Deleted {nodes_deleted} nodes, {literals_deleted} literals, and {metadata_deleted} metadata nodes for {user}/{collection}")
|
||||
logger.info(f"Deleted {nodes_deleted} nodes, {literals_deleted} literals, and {metadata_deleted} metadata nodes for {workspace}/{collection}")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to delete collection {user}/{collection}: {e}", exc_info=True)
|
||||
logger.error(f"Failed to delete collection {workspace}/{collection}: {e}", exc_info=True)
|
||||
raise
|
||||
|
||||
def run():
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue