dograh/api/services/filesystem/minio.py
2025-09-20 14:07:00 +05:30

130 lines
4.6 KiB
Python

import asyncio
import json
from typing import Any, BinaryIO, Dict, Optional
from loguru import logger
from minio import Minio
from minio.error import S3Error
from .base import BaseFileSystem
class MinioFileSystem(BaseFileSystem):
"""MinIO implementation of the filesystem interface for OSS users.
Handles both internal (container-to-container) and external (browser) access:
- endpoint: Used for API operations (uploads, downloads from code)
- public_endpoint: Used for generating browser-accessible presigned URLs
Auto-detection logic:
1. If MINIO_PUBLIC_ENDPOINT env var is set, use it (for production/custom domains)
2. If endpoint is "minio:9000" (Docker internal), auto-use "localhost:9000" for browser
3. Otherwise, endpoint works for both (e.g., "localhost:9000" in local non-Docker setup)
"""
def __init__(
self,
endpoint: str = "localhost:9000",
access_key: str = "minioadmin",
secret_key: str = "minioadmin",
bucket_name: str = "voice-audio",
secure: bool = False,
public_endpoint: Optional[str] = None,
):
self.bucket_name = bucket_name
self.endpoint = endpoint
self.public_endpoint = public_endpoint or endpoint
self.secure = secure
self.access_key = access_key
self.secret_key = secret_key
# Client for internal operations (uploads, etc.)
self.client = Minio(
endpoint, access_key=access_key, secret_key=secret_key, secure=secure
)
# Ensure bucket exists and configure anonymous access (using internal client)
try:
if not self.client.bucket_exists(self.bucket_name):
self.client.make_bucket(self.bucket_name)
# Set anonymous download policy for local development
# This allows unsigned URLs to work
policy = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {"AWS": "*"},
"Action": ["s3:GetObject"],
"Resource": [f"arn:aws:s3:::{self.bucket_name}/*"],
}
],
}
self.client.set_bucket_policy(self.bucket_name, json.dumps(policy))
except Exception as e:
# Bucket might already exist or we might be in a restricted environment
logger.debug(f"Bucket setup note: {e}")
pass
async def acreate_file(self, file_path: str, content: BinaryIO) -> bool:
try:
data = await content.read()
def _put():
self.client.put_object(
self.bucket_name,
file_path,
data=bytes(data),
length=len(data),
)
await asyncio.to_thread(_put)
return True
except S3Error:
return False
async def aupload_file(self, local_path: str, destination_path: str) -> bool:
try:
def _fput():
self.client.fput_object(self.bucket_name, destination_path, local_path)
await asyncio.to_thread(_fput)
return True
except S3Error:
return False
async def aget_signed_url(
self, file_path: str, expiration: int = 3600, force_inline: bool = False
) -> Optional[str]:
try:
# For MinIO in local development, return unsigned URLs
# This avoids signature mismatch issues when endpoint differs
# MinIO must be configured to allow anonymous read access
protocol = "https" if self.secure else "http"
url = f"{protocol}://{self.public_endpoint}/{self.bucket_name}/{file_path}"
return url
except Exception as e:
logger.error(f"Error generating MinIO URL: {e}")
return None
async def aget_file_metadata(self, file_path: str) -> Optional[Dict[str, Any]]:
"""Get MinIO object metadata."""
try:
def _stat():
return self.client.stat_object(self.bucket_name, file_path)
stat = await asyncio.to_thread(_stat)
return {
"size": stat.size,
"created_at": stat.last_modified,
"modified_at": stat.last_modified,
"etag": stat.etag.strip('"') if stat.etag else None,
"content_type": stat.content_type,
"storage_class": None, # MinIO doesn't have storage classes like S3
}
except S3Error:
return None