Loki logging (#586)

* Consolidate logging into a single module

* Added Loki logging

* Update tech spec

* Add processor label

* Fix recursive log entries logging Loki's internals
This commit is contained in:
cybermaggedon 2025-12-09 23:24:41 +00:00 committed by GitHub
parent 39f6a8b940
commit f12fcc2652
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
9 changed files with 415 additions and 80 deletions

View file

@ -2,17 +2,29 @@
## Overview
TrustGraph uses Python's built-in `logging` module for all logging operations, with centralized configuration and optional Loki integration for log aggregation. This provides a standardized, flexible approach to logging across all components of the system.
## Default Configuration
### Logging Level
- **Default Level**: `INFO`
- **Debug Mode**: `DEBUG` (enabled via command-line argument)
- **Production**: `WARNING` or `ERROR` as appropriate
- **Configurable via**: `--log-level` command-line argument
- **Choices**: `DEBUG`, `INFO`, `WARNING`, `ERROR`, `CRITICAL`
### Output Destinations
1. **Console (stdout)**: Always enabled - ensures compatibility with containerized environments
2. **Loki**: Optional centralized log aggregation (enabled by default, can be disabled)
## Centralized Logging Module
All logging configuration is managed by `trustgraph.base.logging` module, which provides:
- `add_logging_args(parser)` - Adds standard logging CLI arguments
- `setup_logging(args)` - Configures logging from parsed arguments
This module is used by all server-side components:
- AsyncProcessor-based services
- API Gateway
- MCP Server
## Implementation Guidelines
@ -26,39 +38,80 @@ import logging
logger = logging.getLogger(__name__)
```
The logger name is automatically used as a label in Loki for filtering and searching.
### 2. Service Initialization
All server-side services automatically get logging configuration through the centralized module:
```python
import logging
from trustgraph.base import add_logging_args, setup_logging
import argparse

def main():
    parser = argparse.ArgumentParser()

    # Add standard logging arguments (includes Loki configuration)
    add_logging_args(parser)

    # Add your service-specific arguments
    parser.add_argument('--port', type=int, default=8080)

    args = parser.parse_args()
    args = vars(args)

    # Setup logging early in startup
    setup_logging(args)

    # Rest of your service initialization
    logger = logging.getLogger(__name__)
    logger.info("Service starting...")
```
### 3. Command-Line Arguments
All services support these logging arguments:
**Log Level:**
```bash
--log-level {DEBUG,INFO,WARNING,ERROR,CRITICAL}
```
**Loki Configuration:**
```bash
--loki-enabled # Enable Loki (default)
--no-loki-enabled # Disable Loki
--loki-url URL # Loki push URL (default: http://loki:3100/loki/api/v1/push)
--loki-username USERNAME # Optional authentication
--loki-password PASSWORD # Optional authentication
```
**Examples:**
```bash
# Default - INFO level, Loki enabled
./my-service
# Debug mode, console only
./my-service --log-level DEBUG --no-loki-enabled
# Custom Loki server with auth
./my-service --loki-url http://loki.prod:3100/loki/api/v1/push \
--loki-username admin --loki-password secret
```
### 4. Environment Variables
Loki configuration supports environment variable fallbacks:
```bash
export LOKI_URL=http://loki.prod:3100/loki/api/v1/push
export LOKI_USERNAME=admin
export LOKI_PASSWORD=secret
```
Command-line arguments take precedence over environment variables.
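The precedence rule can be sketched with `argparse`: the environment variable fills the flag's default, so an explicit command-line value always wins. The `build_parser` helper and its `env` parameter below are illustrative, not part of TrustGraph:

```python
import argparse
import os

def build_parser(env=None):
    # Illustrative helper: the environment variable supplies the default,
    # so an explicit --loki-url flag always overrides LOKI_URL.
    env = os.environ if env is None else env
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--loki-url',
        default=env.get('LOKI_URL', 'http://loki:3100/loki/api/v1/push'),
    )
    return parser
```

With `LOKI_URL` set, `parse_args([])` yields the environment value; passing `--loki-url` on the command line overrides it.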
### 5. Logging Best Practices
#### Log Levels Usage
- **DEBUG**: Detailed information for diagnosing problems (variable values, function entry/exit)
@ -89,20 +142,25 @@ if logger.isEnabledFor(logging.DEBUG):
logger.debug(f"Debug data: {debug_data}")
```
### 6. Structured Logging with Loki
For complex data, use structured logging with extra tags for Loki:
```python
logger.info("Request processed", extra={
    'tags': {
        'request_id': request_id,
        'duration_ms': duration,
        'status_code': status_code,
        'user_id': user_id,
        'status': 'success'
    }
})
```
These tags become searchable labels in Loki, in addition to automatic labels:
- `severity` - Log level (DEBUG, INFO, WARNING, ERROR, CRITICAL)
- `logger` - Module name (from `__name__`)
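A quick way to see how `extra={'tags': ...}` travels with each record: the dict lands on the `LogRecord` as a `tags` attribute, which is where a Loki handler can read it as labels. The `CaptureTags` filter below is an illustrative stand-in for that handler, not TrustGraph code:

```python
import logging

captured = {}

class CaptureTags(logging.Filter):
    # Stand-in for a Loki handler: read the 'tags' attribute that
    # extra={'tags': {...}} attaches to each LogRecord.
    def filter(self, record):
        captured.update(getattr(record, 'tags', {}))
        return True

logger = logging.getLogger("tags-demo")
logger.setLevel(logging.INFO)
logger.addHandler(logging.NullHandler())
logger.addFilter(CaptureTags())

logger.info("Request processed", extra={'tags': {'status': 'success'}})
```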
### 7. Exception Logging
Always include stack traces for exceptions:
@ -114,9 +172,13 @@ except Exception as e:
raise
```
### 8. Async Logging Considerations
The logging system uses non-blocking queued handlers for Loki:
- Console output is synchronous (fast)
- Loki output is queued with 500-message buffer
- Background thread handles Loki transmission
- No blocking of main application code
@ -124,46 +186,165 @@
```python
import asyncio
import logging

async def async_operation():
    logger = logging.getLogger(__name__)
    # Logging is thread-safe and won't block async operations
    logger.info(f"Starting async operation in task: {asyncio.current_task().get_name()}")
```
## Loki Integration
### Architecture
The logging system uses Python's built-in `QueueHandler` and `QueueListener` for non-blocking Loki integration:
1. **QueueHandler**: Logs are placed in a 500-message queue (non-blocking)
2. **Background Thread**: QueueListener sends logs to Loki asynchronously
3. **Graceful Degradation**: If Loki is unavailable, console logging continues
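The three steps above can be sketched with the standard library alone; here a `StreamHandler` writing to an in-memory buffer stands in for the real `LokiHandler`:

```python
import io
import logging
import logging.handlers
from queue import Queue

buf = io.StringIO()

# 1. Application side: a bounded queue plus a non-blocking QueueHandler
log_queue = Queue(maxsize=500)
queue_handler = logging.handlers.QueueHandler(log_queue)

# 2. Background thread: QueueListener drains the queue into the target
target = logging.StreamHandler(buf)   # stands in for the real LokiHandler
listener = logging.handlers.QueueListener(
    log_queue, target, respect_handler_level=True
)
listener.start()

logger = logging.getLogger("queue-demo")
logger.setLevel(logging.INFO)
logger.addHandler(queue_handler)
logger.propagate = False
logger.info("queued, not blocking the caller")

listener.stop()   # joins the background thread, flushing pending records
```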
### Automatic Labels
Every log sent to Loki includes:
- `processor`: Processor identity (e.g., `config-svc`, `text-completion`, `embeddings`)
- `severity`: Log level (DEBUG, INFO, etc.)
- `logger`: Module name (e.g., `trustgraph.gateway.service`, `trustgraph.agent.react.service`)
### Custom Labels
Add custom labels via the `extra` parameter:
```python
logger.info("User action", extra={
    'tags': {
        'user_id': user_id,
        'action': 'document_upload',
        'collection': collection_name
    }
})
```
### Querying Logs in Loki
```logql
# All logs from a specific processor (recommended - matches Prometheus metrics)
{processor="config-svc"}
{processor="text-completion"}
{processor="embeddings"}
# Error logs from a specific processor
{processor="config-svc", severity="ERROR"}
# Error logs from all processors
{severity="ERROR"}
# Logs from a specific processor with text filter
{processor="text-completion"} |= "Processing"
# All logs from API gateway
{processor="api-gateway"}
# Logs from processors matching pattern
{processor=~".*-completion"}
# Logs with custom tags
{processor="api-gateway"} | json | user_id="12345"
```
### Graceful Degradation
If Loki is unavailable or `python-logging-loki` is not installed:
- Warning message printed to console
- Console logging continues normally
- Application continues running
- No retry logic for Loki connection (fail fast, degrade gracefully)
## Testing
During tests, consider using a different logging configuration:
```python
import logging

# Reduce noise during tests
logging.getLogger().setLevel(logging.WARNING)

# Or disable Loki for tests
setup_logging({'log_level': 'WARNING', 'loki_enabled': False})
```
## Monitoring Integration
### Standard Format
All logs use consistent format:
```
2025-01-09 10:30:45,123 - trustgraph.gateway.service - INFO - Request processed
```
Format components:
- Timestamp (ISO format with milliseconds)
- Logger name (module path)
- Log level
- Message
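The format above is a standard `logging.Formatter` pattern; this quick check against an in-memory stream shows the components in order (the logger name here is illustrative):

```python
import io
import logging

# The format string shared across TrustGraph services
fmt = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'

buf = io.StringIO()
handler = logging.StreamHandler(buf)
handler.setFormatter(logging.Formatter(fmt))

logger = logging.getLogger('trustgraph.gateway.service')
logger.setLevel(logging.INFO)
logger.addHandler(handler)
logger.propagate = False

logger.info("Request processed")
line = buf.getvalue().strip()
# line looks like:
# 2025-01-09 10:30:45,123 - trustgraph.gateway.service - INFO - Request processed
```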
### Loki Queries for Monitoring
Common monitoring queries:
```logql
# Error rate by processor
sum by (processor) (rate({severity="ERROR"}[5m]))
# Top error-producing processors
topk(5, sum by (processor) (count_over_time({severity="ERROR"}[1h])))
# Recent errors with processor name
{severity="ERROR"} | line_format "{{.processor}}: {{__line__}}"
# All agent processors
{processor=~".*agent.*"} |= "exception"
# Specific processor error count
count_over_time({processor="config-svc", severity="ERROR"}[1h])
```
## Security Considerations
- **Never log sensitive information** (passwords, API keys, personal data, tokens)
- **Sanitize user input** before logging
- **Use placeholders** for sensitive fields: `user_id=****1234`
- **Loki authentication**: Use `--loki-username` and `--loki-password` for secure deployments
- **Secure transport**: Use HTTPS for Loki URL in production: `https://loki.prod:3100/loki/api/v1/push`
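A masking helper along the lines of the `user_id=****1234` placeholder might look like this (the `mask` function is illustrative, not a TrustGraph API):

```python
def mask(value: str, visible: int = 4) -> str:
    # Illustrative: keep only the last `visible` characters and
    # mask the rest, so the full identifier never reaches the logs.
    if len(value) <= visible:
        return '*' * len(value)
    return '****' + value[-visible:]
```

Used at the call site: `logger.info(f"Login for user_id={mask(user_id)}")`.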
## Dependencies
The centralized logging module requires:
- `python-logging-loki` - For Loki integration (optional, graceful degradation if missing)
Already included in `trustgraph-base/pyproject.toml` and `requirements.txt`.
## Migration Path
For existing code:
1. **Services already using AsyncProcessor**: No changes needed, Loki support is automatic
2. **Services not using AsyncProcessor** (api-gateway, mcp-server): Already updated
3. **CLI tools**: Out of scope - continue using print() or simple logging
### From print() to logging:
```python
# Before
print(f"Processing document {doc_id}")
# After
logger = logging.getLogger(__name__)
logger.info(f"Processing document {doc_id}")
```
## Configuration Summary
| Argument | Default | Environment Variable | Description |
|----------|---------|---------------------|-------------|
| `--log-level` | `INFO` | - | Console and Loki log level |
| `--loki-enabled` | `True` | - | Enable Loki logging |
| `--loki-url` | `http://loki:3100/loki/api/v1/push` | `LOKI_URL` | Loki push endpoint |
| `--loki-username` | `None` | `LOKI_USERNAME` | Loki auth username |
| `--loki-password` | `None` | `LOKI_PASSWORD` | Loki auth password |
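The defaults in the table map onto plain `dict.get` fallbacks on the parsed-arguments dict; this illustrative `effective_config` helper (not part of TrustGraph) shows the fallbacks `setup_logging` is described as applying:

```python
def effective_config(args):
    # Illustrative: mirror the table's defaults for any missing keys.
    return {
        'log_level': args.get('log_level', 'INFO'),
        'loki_enabled': args.get('loki_enabled', True),
        'loki_url': args.get('loki_url', 'http://loki:3100/loki/api/v1/push'),
        'loki_username': args.get('loki_username'),
        'loki_password': args.get('loki_password'),
    }
```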

View file

@ -21,3 +21,4 @@ prometheus-client
pyarrow
boto3
ollama
python-logging-loki

View file

@ -13,6 +13,7 @@ dependencies = [
"pulsar-client",
"prometheus-client",
"requests",
"python-logging-loki",
]
classifiers = [
"Programming Language :: Python :: 3",

View file

@ -6,6 +6,7 @@ from . producer import Producer
from . publisher import Publisher
from . subscriber import Subscriber
from . metrics import ProcessorMetrics, ConsumerMetrics, ProducerMetrics
from . logging import add_logging_args, setup_logging
from . flow_processor import FlowProcessor
from . consumer_spec import ConsumerSpec
from . parameter_spec import ParameterSpec

View file

@ -19,6 +19,7 @@ from . pubsub import PulsarClient
from . producer import Producer
from . consumer import Consumer
from . metrics import ProcessorMetrics, ConsumerMetrics
from . logging import add_logging_args, setup_logging
default_config_queue = config_push_queue
@ -165,18 +166,9 @@ class AsyncProcessor:
raise e
    @classmethod
-   def setup_logging(cls, log_level='INFO'):
+   def setup_logging(cls, args):
        """Configure logging for the entire application"""
-       # Support environment variable override
-       env_log_level = os.environ.get('TRUSTGRAPH_LOG_LEVEL', log_level)
-       # Configure logging
-       logging.basicConfig(
-           level=getattr(logging, env_log_level.upper()),
-           format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
-           handlers=[logging.StreamHandler()]
-       )
-       logger.info(f"Logging configured with level: {env_log_level}")
+       setup_logging(args)
# Startup fabric. launch calls launch_async in async mode.
@classmethod
@ -203,7 +195,7 @@ class AsyncProcessor:
        args = vars(args)

        # Setup logging before anything else
-       cls.setup_logging(args.get('log_level', 'INFO').upper())
+       cls.setup_logging(args)

        # Debug
        logger.debug(f"Arguments: {args}")
@ -256,6 +248,7 @@ class AsyncProcessor:
def add_args(parser):
PulsarClient.add_args(parser)
add_logging_args(parser)
parser.add_argument(
'--config-push-queue',

View file

@ -0,0 +1,155 @@
"""
Centralized logging configuration for TrustGraph server-side components.
This module provides standardized logging setup across all TrustGraph services,
ensuring consistent log formats, levels, and command-line arguments.
Supports dual output to console and Loki for centralized log aggregation.
"""
import logging
import logging.handlers
from queue import Queue
import os
def add_logging_args(parser):
"""
Add standard logging arguments to an argument parser.
Args:
parser: argparse.ArgumentParser instance to add arguments to
"""
parser.add_argument(
'-l', '--log-level',
default='INFO',
choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'],
help='Log level (default: INFO)'
)
parser.add_argument(
'--loki-enabled',
action='store_true',
default=True,
help='Enable Loki logging (default: True)'
)
parser.add_argument(
'--no-loki-enabled',
dest='loki_enabled',
action='store_false',
help='Disable Loki logging'
)
parser.add_argument(
'--loki-url',
default=os.getenv('LOKI_URL', 'http://loki:3100/loki/api/v1/push'),
help='Loki push URL (default: http://loki:3100/loki/api/v1/push)'
)
parser.add_argument(
'--loki-username',
default=os.getenv('LOKI_USERNAME', None),
help='Loki username for authentication (optional)'
)
parser.add_argument(
'--loki-password',
default=os.getenv('LOKI_PASSWORD', None),
help='Loki password for authentication (optional)'
)
def setup_logging(args):
"""
Configure logging from parsed command-line arguments.
Sets up logging with a standardized format and output to stdout.
Optionally enables Loki integration for centralized log aggregation.
This should be called early in application startup, before any
logging calls are made.
Args:
args: Dictionary of parsed arguments (typically from vars(args))
Must contain 'log_level' key, optional Loki configuration
"""
log_level = args.get('log_level', 'INFO')
loki_enabled = args.get('loki_enabled', True)
# Build list of handlers starting with console
handlers = [logging.StreamHandler()]
# Add Loki handler if enabled
queue_listener = None
if loki_enabled:
loki_url = args.get('loki_url', 'http://loki:3100/loki/api/v1/push')
loki_username = args.get('loki_username')
loki_password = args.get('loki_password')
processor_id = args.get('id') # Processor identity (e.g., "config-svc", "text-completion")
try:
from logging_loki import LokiHandler
# Create Loki handler with optional authentication and processor label
loki_handler_kwargs = {
'url': loki_url,
'version': "1",
}
if loki_username and loki_password:
loki_handler_kwargs['auth'] = (loki_username, loki_password)
# Add processor label if available (for consistency with Prometheus metrics)
if processor_id:
loki_handler_kwargs['tags'] = {'processor': processor_id}
loki_handler = LokiHandler(**loki_handler_kwargs)
# Wrap in QueueHandler for non-blocking operation
log_queue = Queue(maxsize=500)
queue_handler = logging.handlers.QueueHandler(log_queue)
handlers.append(queue_handler)
# Start QueueListener in background thread
queue_listener = logging.handlers.QueueListener(
log_queue,
loki_handler,
respect_handler_level=True
)
queue_listener.start()
# Store listener reference for potential cleanup
# (attached to root logger for access if needed)
logging.getLogger().loki_queue_listener = queue_listener
except ImportError:
# Graceful degradation if python-logging-loki not installed
print("WARNING: python-logging-loki not installed, Loki logging disabled")
print("Install with: pip install python-logging-loki")
except Exception as e:
# Graceful degradation if Loki connection fails
print(f"WARNING: Failed to setup Loki logging: {e}")
print("Continuing with console-only logging")
# Configure logging with all handlers
logging.basicConfig(
level=getattr(logging, log_level.upper()),
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=handlers,
force=True # Force reconfiguration if already configured
)
# Prevent recursive logging from Loki's HTTP client
if loki_enabled and queue_listener:
# Disable urllib3 logging to prevent infinite loop
logging.getLogger('urllib3').setLevel(logging.WARNING)
logging.getLogger('urllib3.connectionpool').setLevel(logging.WARNING)
logger = logging.getLogger(__name__)
logger.info(f"Logging configured with level: {log_level}")
if loki_enabled and queue_listener:
logger.info(f"Loki logging enabled: {loki_url}")
elif loki_enabled:
logger.warning("Loki logging requested but not available")

View file

@ -71,10 +71,3 @@ class PulsarClient:
            '--pulsar-listener',
            help=f'Pulsar listener (default: none)',
        )
-       parser.add_argument(
-           '-l', '--log-level',
-           default='INFO',
-           choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'],
-           help=f'Log level (default: INFO)'
-       )

View file

@ -9,7 +9,7 @@ from aiohttp import web
import logging
import os
-from .. log_level import LogLevel
+from trustgraph.base.logging import setup_logging
from . auth import Authenticator
from . config.receiver import ConfigReceiver
@ -29,7 +29,6 @@ from .. schema import (
)
logger = logging.getLogger("api")
logger.setLevel(logging.INFO)
default_pulsar_host = os.getenv("PULSAR_HOST", "pulsar://pulsar:6650")
default_prometheus_url = os.getenv("PROMETHEUS_URL", "http://prometheus:9090")
@ -285,6 +284,9 @@ def run():
    args = parser.parse_args()
    args = vars(args)

    # Setup logging before creating API instance
    setup_logging(args)

    if args["metrics"]:
        start_http_server(args["metrics_port"])

View file

@ -16,6 +16,8 @@ from mcp.server.fastmcp import FastMCP, Context
from mcp.types import TextContent
from websockets.asyncio.client import connect
from trustgraph.base.logging import add_logging_args, setup_logging
from . tg_socket import WebSocketManager
@dataclass
@ -2041,8 +2043,14 @@ def main():
    parser.add_argument('--port', type=int, default=8000, help='Port to bind to (default: 8000)')
    parser.add_argument('--websocket-url', default='ws://api-gateway:8088/api/v1/socket', help='WebSocket URL to connect to (default: ws://api-gateway:8088/api/v1/socket)')

    # Add logging arguments
    add_logging_args(parser)

    args = parser.parse_args()

    # Setup logging before creating server
    setup_logging(vars(args))

    # Create and run the MCP server
    server = McpServer(host=args.host, port=args.port, websocket_url=args.websocket_url)
    server.run()