mirror of
https://github.com/trustgraph-ai/trustgraph.git
synced 2026-04-25 00:16:23 +02:00
Change MinIO integration options in librarian to be more generic - to support a Garage integration (#594)
* Tweak object store parameters to be more generic for other S3-type store integration
* Update librarian to have region & SSL params
* Update MinIO migration tech spec
parent
5304f96fe6
commit
25563bae3c
5 changed files with 333 additions and 44 deletions
@@ -1,8 +1,8 @@
 # TrustGraph Librarian API

-This API provides document library management for TrustGraph. It handles document storage,
-metadata management, and processing orchestration using hybrid storage (MinIO for content,
-Cassandra for metadata) with multi-user support.
+This API provides document library management for TrustGraph. It handles document storage,
+metadata management, and processing orchestration using hybrid storage (S3-compatible object
+storage for content, Cassandra for metadata) with multi-user support.

 ## Request/response

@@ -374,13 +374,14 @@ await client.add_processing(

 ## Features

-- **Hybrid Storage**: MinIO for content, Cassandra for metadata
+- **Hybrid Storage**: S3-compatible object storage (MinIO, Ceph RGW, AWS S3, etc.) for content, Cassandra for metadata
 - **Multi-user Support**: User-based document ownership and access control
 - **Rich Metadata**: RDF-style metadata triples and tagging system
 - **Processing Integration**: Automatic triggering of document processing workflows
 - **Content Types**: Support for multiple document formats (PDF, text, etc.)
 - **Collection Management**: Optional document grouping by collection
 - **Metadata Search**: Query documents by metadata criteria
+- **Flexible Storage Backend**: Works with any S3-compatible storage (MinIO, Ceph RADOS Gateway, AWS S3, Cloudflare R2, etc.)

 ## Use Cases

258
docs/tech-specs/minio-to-s3-migration.md
Normal file
@@ -0,0 +1,258 @@
# Tech Spec: S3-Compatible Storage Backend Support

## Overview

The Librarian service uses S3-compatible object storage for document blob storage. This spec documents the implementation that enables support for any S3-compatible backend including MinIO, Ceph RADOS Gateway (RGW), AWS S3, Cloudflare R2, DigitalOcean Spaces, and others.

## Architecture

### Storage Components
- **Blob Storage**: S3-compatible object storage via `minio` Python client library
- **Metadata Storage**: Cassandra (stores object_id mapping and document metadata)
- **Affected Component**: Librarian service only
- **Storage Pattern**: Hybrid storage with metadata in Cassandra, content in S3-compatible storage

### Implementation
- **Library**: `minio` Python client (supports any S3-compatible API)
- **Location**: `trustgraph-flow/trustgraph/librarian/blob_store.py`
- **Operations**:
  - `add()` - Store blob with UUID object_id
  - `get()` - Retrieve blob by object_id
  - `remove()` - Delete blob by object_id
  - `ensure_bucket()` - Create bucket if not exists
- **Bucket**: `library`
- **Object Path**: `doc/{object_id}`
- **Supported MIME Types**: `text/plain`, `application/pdf`
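
The object path convention above can be illustrated with a short sketch. The helper names `object_name` and `new_object_id` are hypothetical and not part of the Librarian code; only the `doc/` prefix and the use of UUID object IDs come from the spec.

```python
import uuid

# Prefix taken from the "Object Path" entry in the spec.
OBJECT_PREFIX = "doc/"


def object_name(object_id):
    """Map an object_id (UUID or string) to its key inside the bucket."""
    return OBJECT_PREFIX + str(object_id)


def new_object_id():
    """Hypothetical helper: generate a random UUID for a new blob."""
    return uuid.uuid4()


# A fixed UUID so the resulting key is predictable.
example_id = uuid.UUID("12345678-1234-5678-1234-567812345678")
example_key = object_name(example_id)
print(example_key)
```

Keeping the key derivation in one place means `add()`, `get()` and `remove()` cannot drift apart on the prefix.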

### Key Files
1. `trustgraph-flow/trustgraph/librarian/blob_store.py` - BlobStore implementation
2. `trustgraph-flow/trustgraph/librarian/librarian.py` - BlobStore initialization
3. `trustgraph-flow/trustgraph/librarian/service.py` - Service configuration
4. `trustgraph-flow/pyproject.toml` - Dependencies (`minio` package)
5. `docs/apis/api-librarian.md` - API documentation

## Supported Storage Backends

The implementation works with any S3-compatible object storage system:

### Tested/Supported
- **Ceph RADOS Gateway (RGW)** - Distributed storage system with S3 API (default configuration)
- **MinIO** - Lightweight self-hosted object storage
- **Garage** - Lightweight geo-distributed S3-compatible storage

### Should Work (S3-Compatible)
- **AWS S3** - Amazon's cloud object storage
- **Cloudflare R2** - Cloudflare's S3-compatible storage
- **DigitalOcean Spaces** - DigitalOcean's object storage
- **Wasabi** - S3-compatible cloud storage
- **Backblaze B2** - S3-compatible backup storage
- Any other service implementing the S3 REST API

## Configuration

### CLI Arguments

```bash
librarian \
  --object-store-endpoint <hostname:port> \
  --object-store-access-key <access_key> \
  --object-store-secret-key <secret_key> \
  [--object-store-use-ssl] \
  [--object-store-region <region>]
```

**Note:** Do not include `http://` or `https://` in the endpoint. Use `--object-store-use-ssl` to enable HTTPS.
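
If a deployment only has a full URL to hand, the split into a bare endpoint plus an SSL flag might look like the sketch below. `split_endpoint` is a hypothetical helper, not something the Librarian provides; it only shows how the two settings relate.

```python
from urllib.parse import urlsplit


def split_endpoint(url):
    """Return (endpoint, use_ssl) for input such as 'https://host:9000'.

    Hypothetical helper: a bare 'host:port' is accepted and treated as HTTP.
    """
    # Prefix '//' so urlsplit treats a bare 'host:port' as a network location.
    parts = urlsplit(url if "://" in url else "//" + url)
    if parts.scheme not in ("", "http", "https"):
        raise ValueError(f"unsupported scheme: {parts.scheme}")
    if not parts.netloc:
        raise ValueError(f"no host found in: {url}")
    if parts.path not in ("", "/"):
        raise ValueError("a path component is not part of the endpoint")
    return parts.netloc, parts.scheme == "https"


print(split_endpoint("https://s3.amazonaws.com"))
print(split_endpoint("minio:9000"))
```

The first element would go to `--object-store-endpoint`, and the second decides whether `--object-store-use-ssl` is passed.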

### Environment Variables (Alternative)

```bash
OBJECT_STORE_ENDPOINT=<hostname:port>
OBJECT_STORE_ACCESS_KEY=<access_key>
OBJECT_STORE_SECRET_KEY=<secret_key>
OBJECT_STORE_USE_SSL=true|false  # Optional, default: false
OBJECT_STORE_REGION=<region>     # Optional
```
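
How the service itself consumes these variables is not shown in this change, so the following is only a sketch of one way to read them. The function names are hypothetical; the fallback values mirror the defaults declared in `service.py` in this commit.

```python
import os


def env_bool(value, default=False):
    """Interpret common truthy spellings; anything else counts as False."""
    if value is None:
        return default
    return value.strip().lower() in ("1", "true", "yes", "on")


def object_store_config(env=os.environ):
    """Collect object store settings from a mapping of environment variables."""
    return {
        "endpoint": env.get("OBJECT_STORE_ENDPOINT", "ceph-rgw:7480"),
        "access_key": env.get("OBJECT_STORE_ACCESS_KEY", "object-user"),
        "secret_key": env.get("OBJECT_STORE_SECRET_KEY", "object-password"),
        "use_ssl": env_bool(env.get("OBJECT_STORE_USE_SSL")),
        # An unset or empty region is passed on as None.
        "region": env.get("OBJECT_STORE_REGION") or None,
    }


config = object_store_config(
    {"OBJECT_STORE_ENDPOINT": "minio:9000", "OBJECT_STORE_USE_SSL": "TRUE"}
)
print(config)
```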

### Examples

**Ceph RADOS Gateway (default):**
```bash
--object-store-endpoint ceph-rgw:7480 \
--object-store-access-key object-user \
--object-store-secret-key object-password
```

**MinIO:**
```bash
--object-store-endpoint minio:9000 \
--object-store-access-key minioadmin \
--object-store-secret-key minioadmin
```

**Garage (S3-compatible):**
```bash
--object-store-endpoint garage:3900 \
--object-store-access-key GK000000000000000000000001 \
--object-store-secret-key b171f00be9be4c32c734f4c05fe64c527a8ab5eb823b376cfa8c2531f70fc427
```

**AWS S3 with SSL:**
```bash
--object-store-endpoint s3.amazonaws.com \
--object-store-access-key AKIAIOSFODNN7EXAMPLE \
--object-store-secret-key wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY \
--object-store-use-ssl \
--object-store-region us-east-1
```

## Authentication

S3-compatible backends authenticate requests with AWS Signature Version 4 (some also accept the older Version 2), using two credentials:

- **Access Key** - Public identifier (like username)
- **Secret Key** - Private signing key (like password)

The MinIO Python client handles all signature calculation automatically.
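
For orientation, the key-derivation step of Signature Version 4 that the client performs internally can be sketched with the standard library alone. This is an illustration of the published algorithm, not code from the `minio` package, and the credentials and date shown are placeholders.

```python
import hashlib
import hmac


def _hmac_sha256(key, message):
    """HMAC-SHA256 of a text message under a bytes key."""
    return hmac.new(key, message.encode("utf-8"), hashlib.sha256).digest()


def sigv4_signing_key(secret_key, date, region, service="s3"):
    """Derive the SigV4 signing key for a date (YYYYMMDD), region and service."""
    k_date = _hmac_sha256(("AWS4" + secret_key).encode("utf-8"), date)
    k_region = _hmac_sha256(k_date, region)
    k_service = _hmac_sha256(k_region, service)
    return _hmac_sha256(k_service, "aws4_request")


def sign(signing_key, string_to_sign):
    """Hex signature that ends up in the Authorization header."""
    return hmac.new(
        signing_key, string_to_sign.encode("utf-8"), hashlib.sha256
    ).hexdigest()


# Placeholder credentials and date, for illustration only.
signing_key = sigv4_signing_key("object-password", "20260101", "us-east-1")
print(len(signing_key))
```

The secret key never travels over the wire; only signatures derived from it do, which is why the access key can be treated as a public identifier.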

### Creating Credentials

**For MinIO:**
```bash
# Use the default credentials (minioadmin / minioadmin)
# or create a user via the MinIO Console
```

**For Ceph RGW:**
```bash
radosgw-admin user create --uid="trustgraph" --display-name="TrustGraph Service"
# Returns access_key and secret_key
```

**For AWS S3:**
- Create IAM user with S3 permissions
- Generate access key in AWS Console

## Library Selection: MinIO Python Client

**Rationale:**
- Lightweight (~500KB vs boto3's ~50MB)
- S3-compatible - works with any S3 API endpoint
- Simpler API than boto3 for basic operations
- Already in use, no migration needed
- Battle-tested with MinIO and other S3 systems

## BlobStore Implementation

**Location:** `trustgraph-flow/trustgraph/librarian/blob_store.py`

```python
from minio import Minio
import io
import logging

logger = logging.getLogger(__name__)

class BlobStore:
    """
    S3-compatible blob storage for document content.
    Supports MinIO, Ceph RGW, AWS S3, and other S3-compatible backends.
    """

    def __init__(self, endpoint, access_key, secret_key, bucket_name,
                 use_ssl=False, region=None):
        """
        Initialize S3-compatible blob storage.

        Args:
            endpoint: S3 endpoint (e.g., "minio:9000", "ceph-rgw:7480")
            access_key: S3 access key
            secret_key: S3 secret key
            bucket_name: Bucket name for storage
            use_ssl: Use HTTPS instead of HTTP (default: False)
            region: S3 region (optional, e.g., "us-east-1")
        """
        self.client = Minio(
            endpoint=endpoint,
            access_key=access_key,
            secret_key=secret_key,
            secure=use_ssl,
            region=region,
        )

        self.bucket_name = bucket_name

        protocol = "https" if use_ssl else "http"
        logger.info(f"Connected to S3-compatible storage at {protocol}://{endpoint}")

        self.ensure_bucket()

    def ensure_bucket(self):
        """Create bucket if it doesn't exist"""
        found = self.client.bucket_exists(bucket_name=self.bucket_name)
        if not found:
            self.client.make_bucket(bucket_name=self.bucket_name)
            logger.info(f"Created bucket {self.bucket_name}")
        else:
            logger.debug(f"Bucket {self.bucket_name} already exists")

    async def add(self, object_id, blob, kind):
        """Store blob in S3-compatible storage"""
        self.client.put_object(
            bucket_name=self.bucket_name,
            object_name=f"doc/{object_id}",
            length=len(blob),
            data=io.BytesIO(blob),
            content_type=kind,
        )
        logger.debug("Add blob complete")

    async def remove(self, object_id):
        """Delete blob from S3-compatible storage"""
        self.client.remove_object(
            bucket_name=self.bucket_name,
            object_name=f"doc/{object_id}",
        )
        logger.debug("Remove blob complete")

    async def get(self, object_id):
        """Retrieve blob from S3-compatible storage"""
        resp = self.client.get_object(
            bucket_name=self.bucket_name,
            object_name=f"doc/{object_id}",
        )
        return resp.read()
```
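
The call pattern above can be exercised without a running object store by substituting a small in-memory stand-in for the client. `FakeObjectClient` below is hypothetical and implements only the five methods the listing uses, with the same keyword arguments; it is a sketch for unit tests, not a replacement for testing against a real backend.

```python
import io


class FakeObjectClient:
    """Hypothetical in-memory stand-in for the object store client."""

    def __init__(self):
        self.buckets = {}

    def bucket_exists(self, bucket_name):
        return bucket_name in self.buckets

    def make_bucket(self, bucket_name):
        self.buckets[bucket_name] = {}

    def put_object(self, bucket_name, object_name, data, length,
                   content_type=None):
        # Read exactly 'length' bytes from the stream, as a real upload would.
        self.buckets[bucket_name][object_name] = (data.read(length), content_type)

    def get_object(self, bucket_name, object_name):
        blob, _ = self.buckets[bucket_name][object_name]
        # BytesIO offers read(), which is all the get() path relies on.
        return io.BytesIO(blob)

    def remove_object(self, bucket_name, object_name):
        self.buckets[bucket_name].pop(object_name, None)


client = FakeObjectClient()
if not client.bucket_exists(bucket_name="library"):
    client.make_bucket(bucket_name="library")

payload = b"hello"
client.put_object(
    bucket_name="library",
    object_name="doc/example-id",
    length=len(payload),
    data=io.BytesIO(payload),
    content_type="text/plain",
)
roundtrip = client.get_object(
    bucket_name="library", object_name="doc/example-id"
).read()
client.remove_object(bucket_name="library", object_name="doc/example-id")
```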

## Key Benefits

1. **No Vendor Lock-in** - Works with any S3-compatible storage
2. **Lightweight** - MinIO client is only ~500KB
3. **Simple Configuration** - Just endpoint + credentials
4. **No Data Migration** - Drop-in replacement between backends
5. **Battle-Tested** - MinIO client works with all major S3 implementations

## Implementation Status

All code has been updated to use generic S3 parameter names:

- ✅ `blob_store.py` - Updated to accept `endpoint`, `access_key`, `secret_key`
- ✅ `librarian.py` - Updated parameter names
- ✅ `service.py` - Updated CLI arguments and configuration
- ✅ Documentation updated

## Future Enhancements

1. **SSL/TLS Support** - HTTPS via the `--object-store-use-ssl` flag (implemented in this change)
2. **Retry Logic** - Implement exponential backoff for transient failures
3. **Presigned URLs** - Generate temporary upload/download URLs
4. **Multi-region Support** - Replicate blobs across regions
5. **CDN Integration** - Serve blobs via CDN
6. **Storage Classes** - Use S3 storage classes for cost optimization
7. **Lifecycle Policies** - Automatic archival/deletion
8. **Versioning** - Store multiple versions of blobs
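
As an illustration of the retry item in the list above, exponential backoff with jitter could be sketched as follows. `with_retries` and its parameters are hypothetical and not part of the Librarian; the `sleep` argument is injectable so the behaviour can be checked without waiting.

```python
import random
import time


def with_retries(operation, attempts=5, base_delay=0.5, max_delay=8.0,
                 retry_on=(Exception,), sleep=time.sleep):
    """Call operation(), retrying on failure with exponential backoff."""
    for attempt in range(attempts):
        try:
            return operation()
        except retry_on:
            if attempt == attempts - 1:
                raise  # Out of attempts: surface the last error.
            # Double the delay each time, capped at max_delay.
            delay = min(max_delay, base_delay * 2 ** attempt)
            # Add up to 50% jitter so concurrent callers do not retry in step.
            sleep(delay + random.uniform(0, delay / 2))


# Demonstration with an operation that fails twice before succeeding.
calls = {"count": 0}


def flaky_put():
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("transient failure")
    return "stored"


delays = []
result = with_retries(flaky_put, sleep=delays.append)
print(result, len(delays))
```

In practice `retry_on` would be narrowed to the errors a given backend reports as transient, so that permanent failures such as bad credentials are not retried.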

## References

- MinIO Python Client: https://min.io/docs/minio/linux/developers/python/API.html
- Ceph RGW S3 API: https://docs.ceph.com/en/latest/radosgw/s3/
- S3 API Reference: https://docs.aws.amazon.com/AmazonS3/latest/API/Welcome.html

@@ -14,29 +14,32 @@ class BlobStore:

     def __init__(
             self,
-            minio_host, minio_access_key, minio_secret_key, bucket_name,
+            endpoint, access_key, secret_key, bucket_name,
+            use_ssl=False, region=None,
     ):


-        self.minio = Minio(
-            endpoint = minio_host,
-            access_key = minio_access_key,
-            secret_key = minio_secret_key,
-            secure = False,
+        self.client = Minio(
+            endpoint = endpoint,
+            access_key = access_key,
+            secret_key = secret_key,
+            secure = use_ssl,
+            region = region,
         )

         self.bucket_name = bucket_name

-        logger.info("Connected to MinIO")
+        protocol = "https" if use_ssl else "http"
+        logger.info(f"Connected to S3-compatible storage at {protocol}://{endpoint}")

         self.ensure_bucket()

     def ensure_bucket(self):

         # Make the bucket if it doesn't exist.
-        found = self.minio.bucket_exists(bucket_name=self.bucket_name)
+        found = self.client.bucket_exists(bucket_name=self.bucket_name)
         if not found:
-            self.minio.make_bucket(bucket_name=self.bucket_name)
+            self.client.make_bucket(bucket_name=self.bucket_name)
             logger.info(f"Created bucket {self.bucket_name}")
         else:
             logger.debug(f"Bucket {self.bucket_name} already exists")
@@ -44,7 +47,7 @@ class BlobStore:
     async def add(self, object_id, blob, kind):

         # FIXME: Loop retry
-        self.minio.put_object(
+        self.client.put_object(
             bucket_name = self.bucket_name,
             object_name = "doc/" + str(object_id),
             length = len(blob),
@@ -57,7 +60,7 @@ class BlobStore:
     async def remove(self, object_id):

         # FIXME: Loop retry
-        self.minio.remove_object(
+        self.client.remove_object(
             bucket_name = self.bucket_name,
             object_name = "doc/" + str(object_id),
         )
@@ -68,7 +71,7 @@ class BlobStore:
     async def get(self, object_id):

         # FIXME: Loop retry
-        resp = self.minio.get_object(
+        resp = self.client.get_object(
             bucket_name = self.bucket_name,
             object_name = "doc/" + str(object_id),
         )

@@ -17,12 +17,14 @@ class Librarian:
     def __init__(
             self,
             cassandra_host, cassandra_username, cassandra_password,
-            minio_host, minio_access_key, minio_secret_key,
+            object_store_endpoint, object_store_access_key, object_store_secret_key,
             bucket_name, keyspace, load_document,
+            object_store_use_ssl=False, object_store_region=None,
     ):

         self.blob_store = BlobStore(
-            minio_host, minio_access_key, minio_secret_key, bucket_name
+            object_store_endpoint, object_store_access_key, object_store_secret_key, bucket_name,
+            use_ssl=object_store_use_ssl, region=object_store_region,
         )

         self.table_store = LibraryTableStore(

@@ -41,9 +41,11 @@ default_collection_response_queue = collection_response_queue
 default_config_request_queue = config_request_queue
 default_config_response_queue = config_response_queue

-default_minio_host = "minio:9000"
-default_minio_access_key = "minioadmin"
-default_minio_secret_key = "minioadmin"
+default_object_store_endpoint = "ceph-rgw:7480"
+default_object_store_access_key = "object-user"
+default_object_store_secret_key = "object-password"
+default_object_store_use_ssl = False
+default_object_store_region = None
 default_cassandra_host = "cassandra"

 bucket_name = "library"
@@ -80,14 +82,22 @@ class Processor(AsyncProcessor):
             "config_response_queue", default_config_response_queue
         )

-        minio_host = params.get("minio_host", default_minio_host)
-        minio_access_key = params.get(
-            "minio_access_key",
-            default_minio_access_key
+        object_store_endpoint = params.get("object_store_endpoint", default_object_store_endpoint)
+        object_store_access_key = params.get(
+            "object_store_access_key",
+            default_object_store_access_key
         )
-        minio_secret_key = params.get(
-            "minio_secret_key",
-            default_minio_secret_key
+        object_store_secret_key = params.get(
+            "object_store_secret_key",
+            default_object_store_secret_key
         )
+        object_store_use_ssl = params.get(
+            "object_store_use_ssl",
+            default_object_store_use_ssl
+        )
+        object_store_region = params.get(
+            "object_store_region",
+            default_object_store_region
+        )

         cassandra_host = params.get("cassandra_host")
@@ -113,8 +123,8 @@ class Processor(AsyncProcessor):
                 "librarian_response_queue": librarian_response_queue,
                 "collection_request_queue": collection_request_queue,
                 "collection_response_queue": collection_response_queue,
-                "minio_host": minio_host,
-                "minio_access_key": minio_access_key,
+                "object_store_endpoint": object_store_endpoint,
+                "object_store_access_key": object_store_access_key,
                 "cassandra_host": self.cassandra_host,
                 "cassandra_username": self.cassandra_username,
                 "cassandra_password": self.cassandra_password,
@@ -208,12 +218,14 @@ class Processor(AsyncProcessor):
             cassandra_host = self.cassandra_host,
             cassandra_username = self.cassandra_username,
             cassandra_password = self.cassandra_password,
-            minio_host = minio_host,
-            minio_access_key = minio_access_key,
-            minio_secret_key = minio_secret_key,
+            object_store_endpoint = object_store_endpoint,
+            object_store_access_key = object_store_access_key,
+            object_store_secret_key = object_store_secret_key,
             bucket_name = bucket_name,
             keyspace = keyspace,
             load_document = self.load_document,
+            object_store_use_ssl = object_store_use_ssl,
+            object_store_region = object_store_region,
         )

         self.collection_manager = CollectionManager(
@@ -494,23 +506,36 @@ class Processor(AsyncProcessor):
         )

         parser.add_argument(
-            '--minio-host',
-            default=default_minio_host,
-            help=f'Minio hostname (default: {default_minio_host})',
+            '--object-store-endpoint',
+            default=default_object_store_endpoint,
+            help=f'Object storage endpoint (default: {default_object_store_endpoint})',
         )

         parser.add_argument(
-            '--minio-access-key',
-            default='minioadmin',
-            help='Minio access key / username '
-            f'(default: {default_minio_access_key})',
+            '--object-store-access-key',
+            default=default_object_store_access_key,
+            help='Object storage access key / username '
+            f'(default: {default_object_store_access_key})',
         )

         parser.add_argument(
-            '--minio-secret-key',
-            default='minioadmin',
-            help='Minio secret key / password '
-            f'(default: {default_minio_access_key})',
+            '--object-store-secret-key',
+            default=default_object_store_secret_key,
+            help='Object storage secret key / password '
+            f'(default: {default_object_store_secret_key})',
         )

+        parser.add_argument(
+            '--object-store-use-ssl',
+            action='store_true',
+            default=default_object_store_use_ssl,
+            help=f'Use SSL/TLS for object storage connection (default: {default_object_store_use_ssl})',
+        )
+
+        parser.add_argument(
+            '--object-store-region',
+            default=default_object_store_region,
+            help='Object storage region (optional)',
+        )
+
         add_cassandra_args(parser)