diff --git a/dev-tools/proc-group/README.md b/dev-tools/proc-group/README.md new file mode 100644 index 00000000..1874ea36 --- /dev/null +++ b/dev-tools/proc-group/README.md @@ -0,0 +1,117 @@ +# proc-group — run TrustGraph as a single process + +A dev-focused alternative to the per-container deployment. Instead of 30+ +containers each running a single processor, `processor-group` runs all the +processors as asyncio tasks inside one Python process, sharing the event +loop, Prometheus registry, and (importantly) resources on your laptop. + +This is **not** for production. Scale deployments should keep using +per-processor containers — one failure bringing down the whole process, +no horizontal scaling, and a single giant log are fine for dev and a +bad idea in prod. + +## What this directory contains + +- `group.yaml` — the group runner config. One entry per processor, each + with the dotted class path and a params dict. Defaults (pubsub backend, + rabbitmq host, log level) are pulled in per-entry with a YAML anchor. +- `README.md` — this file. + +## Prerequisites + +Install the TrustGraph packages into a venv: + +``` +pip install trustgraph-base trustgraph-flow trustgraph-unstructured +``` + +`trustgraph-base` provides the `processor-group` endpoint. The others +provide the processor classes that `group.yaml` imports at runtime. +`trustgraph-unstructured` is only needed if you want `document-decoder` +(the `universal-decoder` processor). + +## Running it + +Start infrastructure (cassandra, qdrant, rabbitmq, garage, observability +stack) with a working compose file. These aren't packable into the group - +they're third-party services. You may be able to run these as standalone +services. + +To get Cassandra to be accessible from the host, you need to +set a couple of environment variables: +``` + CASSANDRA_BROADCAST_ADDRESS: 127.0.0.1 + CASSANDRA_LISTEN_ADDRESS: 127.0.0.1 +``` +and also set `network: host`. 
Then start services: + +``` +podman-compose up -d cassandra qdrant rabbitmq +podman-compose up -d garage garage-init +podman-compose up -d loki prometheus grafana +podman-compose up -d init-trustgraph +``` + +`init-trustgraph` is a one-shot that seeds config and the default flow +into cassandra/rabbitmq. Don't leave too long a delay between starting +`init-trustgraph` and running the processor-group, because it needs to +talk to the config service. + +Run the api-gateway separately — it's an aiohttp HTTP server, not an +`AsyncProcessor`, so the group runner doesn't host it (the command is +shown below). + +Raise the file descriptor limit — 30+ processors sharing one process +open far more sockets than the default 1024 allows: + +``` +ulimit -n 65536 +``` + +Then start the group from a terminal: + +``` +processor-group -c group.yaml --no-loki-enabled +``` + +You'll see every processor's startup messages interleaved in one log. +Each processor has a supervisor that restarts it independently on +failure, so a transient crash (or a dependency that isn't ready yet) +only affects that one processor — siblings keep running and the failing +one self-heals on the next retry. + +Finally, when everything is running, you can start the API gateway from +its own terminal: + +``` +api-gateway \ + --pubsub-backend rabbitmq --rabbitmq-host localhost \ + --loki-url http://localhost:3100/loki/api/v1/push \ + --no-metrics +``` + + + +## When things go wrong + +- **"Too many open files"** — raise `ulimit -n` further. 65536 is + usually plenty but some workflows need more. +- **One processor failing repeatedly** — look for its id in the log. The + supervisor will log each failure before restarting. Fix the cause + (missing env var, unreachable dependency, bad params) and the + processor self-heals on the next 4-second retry without restarting + the whole group. +- **Ctrl-C leaves the process hung** — the pika and cassandra drivers + spawn non-cooperative threads that asyncio can't cancel. 
Use Ctrl-\ + (SIGQUIT) to force-kill. Not a bug in the group runner, just a + limitation of those libraries. + +## Environment variables + +Processors that talk to external LLMs or APIs read their credentials +from env vars, same as in the per-container deployment: + +- `OPENAI_TOKEN`, `OPENAI_BASE_URL` — for `text-completion` / + `text-completion-rag` + +Export whatever your particular `group.yaml` needs before running. + diff --git a/dev-tools/proc-group/group.yaml b/dev-tools/proc-group/group.yaml new file mode 100644 index 00000000..98ef5016 --- /dev/null +++ b/dev-tools/proc-group/group.yaml @@ -0,0 +1,257 @@ +# Multi-processor group config, derived from docker-compose.yaml. +# +# Covers every AsyncProcessor-based service from the compose file. +# Out of scope: +# - api-gateway (aiohttp, not AsyncProcessor) +# - init-trustgraph (one-shot init, not a processor) +# (note: document-decoder/universal-decoder IS configured below; it +# requires the separate trustgraph-unstructured package — see README) +# - mcp-server (trustgraph-mcp package, separate image) +# - ddg-mcp-server (third-party image) +# - infrastructure (cassandra, rabbitmq, qdrant, garage, grafana, +# prometheus, loki, workbench-ui) +# +# Run with: +# processor-group -c group.yaml + +_defaults: &defaults + pubsub_backend: rabbitmq + rabbitmq_host: localhost + log_level: INFO + +processors: + + - class: trustgraph.agent.orchestrator.Processor + params: + <<: *defaults + id: agent-manager + + - class: trustgraph.chunking.recursive.Processor + params: + <<: *defaults + id: chunker + chunk_size: 2000 + chunk_overlap: 50 + + - class: trustgraph.config.service.Processor + params: + <<: *defaults + id: config-svc + cassandra_host: localhost + + - class: trustgraph.decoding.universal.Processor + params: + <<: *defaults + id: document-decoder + + - class: trustgraph.embeddings.document_embeddings.Processor + params: + <<: *defaults + id: document-embeddings + + - class: trustgraph.retrieval.document_rag.Processor + 
params: + <<: *defaults + id: document-rag + doc_limit: 20 + + - class: trustgraph.embeddings.fastembed.Processor + params: + <<: *defaults + id: embeddings + concurrency: 1 + + - class: trustgraph.embeddings.graph_embeddings.Processor + params: + <<: *defaults + id: graph-embeddings + + - class: trustgraph.retrieval.graph_rag.Processor + params: + <<: *defaults + id: graph-rag + concurrency: 1 + entity_limit: 50 + triple_limit: 30 + edge_limit: 30 + edge_score_limit: 10 + max_subgraph_size: 100 + max_path_length: 2 + + - class: trustgraph.extract.kg.agent.Processor + params: + <<: *defaults + id: kg-extract-agent + concurrency: 1 + + - class: trustgraph.extract.kg.definitions.Processor + params: + <<: *defaults + id: kg-extract-definitions + concurrency: 1 + + - class: trustgraph.extract.kg.ontology.Processor + params: + <<: *defaults + id: kg-extract-ontology + concurrency: 1 + + - class: trustgraph.extract.kg.relationships.Processor + params: + <<: *defaults + id: kg-extract-relationships + concurrency: 1 + + - class: trustgraph.extract.kg.rows.Processor + params: + <<: *defaults + id: kg-extract-rows + concurrency: 1 + + - class: trustgraph.cores.service.Processor + params: + <<: *defaults + id: knowledge + cassandra_host: localhost + + - class: trustgraph.storage.knowledge.store.Processor + params: + <<: *defaults + id: kg-store + cassandra_host: localhost + + - class: trustgraph.librarian.Processor + params: + <<: *defaults + id: librarian + cassandra_host: localhost + object_store_endpoint: localhost:3900 + object_store_access_key: GK000000000000000000000001 + object_store_secret_key: b171f00be9be4c32c734f4c05fe64c527a8ab5eb823b376cfa8c2531f70fc427 + object_store_region: garage + + - class: trustgraph.agent.mcp_tool.Service + params: + <<: *defaults + id: mcp-tool + + - class: trustgraph.metering.Processor + params: + <<: *defaults + id: metering + + - class: trustgraph.metering.Processor + params: + <<: *defaults + id: metering-rag + + - class: 
trustgraph.retrieval.nlp_query.Processor + params: + <<: *defaults + id: nlp-query + + - class: trustgraph.prompt.template.Processor + params: + <<: *defaults + id: prompt + concurrency: 1 + + - class: trustgraph.prompt.template.Processor + params: + <<: *defaults + id: prompt-rag + concurrency: 1 + + - class: trustgraph.query.doc_embeddings.qdrant.Processor + params: + <<: *defaults + id: doc-embeddings-query + store_uri: http://localhost:6333 + + - class: trustgraph.query.graph_embeddings.qdrant.Processor + params: + <<: *defaults + id: graph-embeddings-query + store_uri: http://localhost:6333 + + - class: trustgraph.query.row_embeddings.qdrant.Processor + params: + <<: *defaults + id: row-embeddings-query + store_uri: http://localhost:6333 + + - class: trustgraph.query.rows.cassandra.Processor + params: + <<: *defaults + id: rows-query + cassandra_host: localhost + + - class: trustgraph.query.triples.cassandra.Processor + params: + <<: *defaults + id: triples-query + cassandra_host: localhost + + - class: trustgraph.embeddings.row_embeddings.Processor + params: + <<: *defaults + id: row-embeddings + + - class: trustgraph.query.sparql.Processor + params: + <<: *defaults + id: sparql-query + + - class: trustgraph.storage.doc_embeddings.qdrant.Processor + params: + <<: *defaults + id: doc-embeddings-write + store_uri: http://localhost:6333 + + - class: trustgraph.storage.graph_embeddings.qdrant.Processor + params: + <<: *defaults + id: graph-embeddings-write + store_uri: http://localhost:6333 + + - class: trustgraph.storage.row_embeddings.qdrant.Processor + params: + <<: *defaults + id: row-embeddings-write + store_uri: http://localhost:6333 + + - class: trustgraph.storage.rows.cassandra.Processor + params: + <<: *defaults + id: rows-write + cassandra_host: localhost + + - class: trustgraph.storage.triples.cassandra.Processor + params: + <<: *defaults + id: triples-write + cassandra_host: localhost + + - class: trustgraph.retrieval.structured_diag.Processor + 
params: + <<: *defaults + id: structured-diag + + - class: trustgraph.retrieval.structured_query.Processor + params: + <<: *defaults + id: structured-query + + - class: trustgraph.model.text_completion.openai.Processor + params: + <<: *defaults + id: text-completion + max_output: 8192 + temperature: 0.0 + + - class: trustgraph.model.text_completion.openai.Processor + params: + <<: *defaults + id: text-completion-rag + max_output: 8192 + temperature: 0.0 diff --git a/trustgraph-base/pyproject.toml b/trustgraph-base/pyproject.toml index 216ccbd6..e4a640bd 100644 --- a/trustgraph-base/pyproject.toml +++ b/trustgraph-base/pyproject.toml @@ -24,6 +24,9 @@ classifiers = [ [project.urls] Homepage = "https://github.com/trustgraph-ai/trustgraph" +[project.scripts] +processor-group = "trustgraph.base.processor_group:run" + [tool.setuptools.packages.find] include = ["trustgraph*"] diff --git a/trustgraph-base/trustgraph/base/processor_group.py b/trustgraph-base/trustgraph/base/processor_group.py new file mode 100644 index 00000000..2e370f17 --- /dev/null +++ b/trustgraph-base/trustgraph/base/processor_group.py @@ -0,0 +1,197 @@ + +# Multi-processor group runner. Runs multiple AsyncProcessor descendants +# as concurrent tasks inside a single process, sharing one event loop, +# one Prometheus HTTP server, and one pub/sub backend pool. +# +# Intended for dev and resource-constrained deployments. Scale deployments +# should continue to use per-processor endpoints. +# +# Group config is a YAML or JSON file with shape: +# +# processors: +# - class: trustgraph.extract.kg.definitions.extract.Processor +# params: +# id: kg-extract-definitions +# triples_batch_size: 1000 +# - class: trustgraph.chunking.recursive.Processor +# params: +# id: chunker-recursive +# +# Each entry's params are passed directly to the class constructor alongside +# the shared taskgroup. Defaults live inside each processor class. 
+ +import argparse +import asyncio +import importlib +import json +import logging +import time + +from prometheus_client import start_http_server + +from . logging import add_logging_args, setup_logging + +logger = logging.getLogger(__name__) + + +def _load_config(path): + with open(path) as f: + text = f.read() + if path.endswith((".yaml", ".yml")): + import yaml + return yaml.safe_load(text) + return json.loads(text) + + +def _resolve_class(dotted): + module_path, _, class_name = dotted.rpartition(".") + if not module_path: + raise ValueError( + f"Processor class must be a dotted path, got {dotted!r}" + ) + module = importlib.import_module(module_path) + return getattr(module, class_name) + + +RESTART_DELAY_SECONDS = 4 + + +async def _supervise(entry): + """Run one processor with its own nested TaskGroup, restarting on any + failure. Each processor is isolated from its siblings — a crash here + does not propagate to the outer group.""" + + pid = entry["params"]["id"] + class_path = entry["class"] + + while True: + + try: + + async with asyncio.TaskGroup() as inner_tg: + + cls = _resolve_class(class_path) + params = dict(entry.get("params", {})) + params["taskgroup"] = inner_tg + + logger.info(f"Starting {class_path} as {pid}") + + p = cls(**params) + await p.start() + inner_tg.create_task(p.run()) + + # Clean exit — processor's run() returned without raising. + # Treat as a transient shutdown and restart, matching the + # behaviour of per-container `restart: on-failure`. + logger.warning( + f"Processor {pid} exited cleanly, will restart" + ) + + except asyncio.CancelledError: + logger.info(f"Processor {pid} cancelled") + raise + + except BaseExceptionGroup as eg: + for e in eg.exceptions: + logger.error( + f"Processor {pid} failure: {type(e).__name__}: {e}", + exc_info=e, + ) + + except Exception as e: + logger.error( + f"Processor {pid} failure: {type(e).__name__}: {e}", + exc_info=True, + ) + + logger.info( + f"Restarting {pid} in {RESTART_DELAY_SECONDS}s..." 
+ ) + await asyncio.sleep(RESTART_DELAY_SECONDS) + + +async def run_group(config): + + entries = config.get("processors", []) + if not entries: + raise RuntimeError("Group config has no processors") + + seen_ids = set() + for entry in entries: + pid = entry.get("params", {}).get("id") + if pid is None: + raise RuntimeError( + f"Entry {entry.get('class')!r} missing params.id — " + f"required for metrics labelling" + ) + if pid in seen_ids: + raise RuntimeError(f"Duplicate processor id {pid!r} in group") + seen_ids.add(pid) + + async with asyncio.TaskGroup() as outer_tg: + for entry in entries: + outer_tg.create_task(_supervise(entry)) + + +def run(): + + parser = argparse.ArgumentParser( + prog="processor-group", + description="Run multiple processors as tasks in one process", + ) + + parser.add_argument( + "-c", "--config", + required=True, + help="Path to group config file (JSON or YAML)", + ) + + parser.add_argument( + "--metrics", + action=argparse.BooleanOptionalAction, + default=True, + help="Metrics enabled (default: true)", + ) + + parser.add_argument( + "-P", "--metrics-port", + type=int, + default=8000, + help="Prometheus metrics port (default: 8000)", + ) + + add_logging_args(parser) + + args = vars(parser.parse_args()) + + setup_logging(args) + + config = _load_config(args["config"]) + + if args["metrics"]: + start_http_server(args["metrics_port"]) + + while True: + + logger.info("Starting group...") + + try: + asyncio.run(run_group(config)) + + except KeyboardInterrupt: + logger.info("Keyboard interrupt.") + return + + except ExceptionGroup as e: + logger.error("Exception group:") + for se in e.exceptions: + logger.error(f" Type: {type(se)}") + logger.error(f" Exception: {se}", exc_info=se) + + except Exception as e: + logger.error(f"Type: {type(e)}") + logger.error(f"Exception: {e}", exc_info=True) + + logger.warning("Will retry...") + time.sleep(4) + logger.info("Retrying...") diff --git a/trustgraph-flow/trustgraph/gateway/service.py 
b/trustgraph-flow/trustgraph/gateway/service.py index 8d1aca9e..4e465bf7 100755 --- a/trustgraph-flow/trustgraph/gateway/service.py +++ b/trustgraph-flow/trustgraph/gateway/service.py @@ -9,7 +9,7 @@ from aiohttp import web import logging import os -from trustgraph.base.logging import setup_logging +from trustgraph.base.logging import setup_logging, add_logging_args from trustgraph.base.pubsub import get_pubsub, add_pubsub_args from . auth import Authenticator @@ -195,12 +195,7 @@ def run(): help=f'Secret API token (default: no auth)', ) - parser.add_argument( - '-l', '--log-level', - default='INFO', - choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'], - help=f'Log level (default: INFO)' - ) + add_logging_args(parser) parser.add_argument( '--metrics', diff --git a/trustgraph-flow/trustgraph/metering/counter.py b/trustgraph-flow/trustgraph/metering/counter.py index 3e0b610c..46460b1f 100644 --- a/trustgraph-flow/trustgraph/metering/counter.py +++ b/trustgraph-flow/trustgraph/metering/counter.py @@ -102,10 +102,10 @@ class Processor(FlowProcessor): __class__.cost_metric.labels(model=modelname, direction="input").inc(cost_in) __class__.cost_metric.labels(model=modelname, direction="output").inc(cost_out) - logger.info(f"Model: {modelname}") - logger.info(f"Input Tokens: {num_in}") - logger.info(f"Output Tokens: {num_out}") - logger.info(f"Cost for call: ${cost_per_call}") + logger.debug( + f"Model: {modelname}, in={num_in}, out={num_out}, " + f"cost=${cost_per_call}" + ) @staticmethod def add_args(parser):