feat: implement retry logic and exponential backoff for S3 operations (#829)

* feat: implement retry logic and exponential backoff for S3 operations

* test: fix librarian mocks after BlobStore async conversion
This commit is contained in:
Het Patel 2026-04-18 16:35:19 +05:30 committed by Cyber MacGeddon
parent cce3acd84f
commit adea976203
4 changed files with 179 additions and 21 deletions

View file

@ -4,11 +4,12 @@ from .. exceptions import RequestError
from minio import Minio
from minio.datatypes import Part
import time
from minio.error import S3Error
import io
import logging
from typing import Iterator, List, Tuple
from uuid import UUID
import asyncio
# Module logger
logger = logging.getLogger(__name__)
@ -35,8 +36,36 @@ class BlobStore:
protocol = "https" if use_ssl else "http"
logger.info(f"Connected to S3-compatible storage at {protocol}://{endpoint}")
# Retry and Exponential delay configuration
self.max_retries = 8
self.base_delay = 0.25
self.ensure_bucket()
async def _with_retry(self, operation, *args, **kwargs):
"""Execute a minio operation with exponential backoff retry."""
last_exception = None
for attempt in range(self.max_retries):
try:
# Run the synchronous minio call in the default executor to avoid blocking
return await asyncio.get_event_loop().run_in_executor(
None, lambda: operation(*args, **kwargs)
)
except (S3Error, Exception) as e:
last_exception = e
if attempt < self.max_retries - 1:
delay = self.base_delay * (2 ** attempt)
logger.warning(
f"S3 operation failed: {e}. "
f"Retrying in {delay}s... (Attempt {attempt + 1}/{self.max_retries})"
)
await asyncio.sleep(delay)
else:
logger.error(f"S3 operation failed after {self.max_retries} attempts: {e}")
if last_exception:
raise last_exception
def ensure_bucket(self):
# Make the bucket if it doesn't exist.
@ -49,8 +78,8 @@ class BlobStore:
async def add(self, object_id, blob, kind):
# FIXME: Loop retry
self.client.put_object(
await self._with_retry(
self.client.put_object,
bucket_name = self.bucket_name,
object_name = "doc/" + str(object_id),
length = len(blob),
@ -62,8 +91,8 @@ class BlobStore:
async def remove(self, object_id):
# FIXME: Loop retry
self.client.remove_object(
await self._with_retry(
self.client.remove_object,
bucket_name = self.bucket_name,
object_name = "doc/" + str(object_id),
)
@ -73,8 +102,8 @@ class BlobStore:
async def get(self, object_id):
# FIXME: Loop retry
resp = self.client.get_object(
resp = await self._with_retry(
self.client.get_object,
bucket_name = self.bucket_name,
object_name = "doc/" + str(object_id),
)
@ -83,7 +112,8 @@ class BlobStore:
async def get_range(self, object_id, offset: int, length: int) -> bytes:
"""Fetch a specific byte range from an object."""
resp = self.client.get_object(
resp = await self._with_retry(
self.client.get_object,
bucket_name=self.bucket_name,
object_name="doc/" + str(object_id),
offset=offset,
@ -97,7 +127,8 @@ class BlobStore:
async def get_size(self, object_id) -> int:
"""Get the size of an object without downloading it."""
stat = self.client.stat_object(
stat = await self._with_retry(
self.client.stat_object,
bucket_name=self.bucket_name,
object_name="doc/" + str(object_id),
)
@ -134,7 +165,7 @@ class BlobStore:
logger.debug("Stream complete")
def create_multipart_upload(self, object_id: UUID, kind: str) -> str:
async def create_multipart_upload(self, object_id: UUID, kind: str) -> str:
"""
Initialize a multipart upload.
@ -148,7 +179,8 @@ class BlobStore:
object_name = "doc/" + str(object_id)
# Use minio's internal method to create multipart upload
upload_id = self.client._create_multipart_upload(
upload_id = await self._with_retry(
self.client._create_multipart_upload,
bucket_name=self.bucket_name,
object_name=object_name,
headers={"Content-Type": kind},
@ -157,7 +189,7 @@ class BlobStore:
logger.info(f"Created multipart upload {upload_id} for {object_id}")
return upload_id
def upload_part(
async def upload_part(
self,
object_id: UUID,
upload_id: str,
@ -178,7 +210,8 @@ class BlobStore:
"""
object_name = "doc/" + str(object_id)
etag = self.client._upload_part(
etag = await self._with_retry(
self.client._upload_part,
bucket_name=self.bucket_name,
object_name=object_name,
data=data,
@ -190,7 +223,7 @@ class BlobStore:
logger.debug(f"Uploaded part {part_number} for {object_id}, etag={etag}")
return etag
def complete_multipart_upload(
async def complete_multipart_upload(
self,
object_id: UUID,
upload_id: str,
@ -214,7 +247,8 @@ class BlobStore:
for part_number, etag in parts
]
self.client._complete_multipart_upload(
await self._with_retry(
self.client._complete_multipart_upload,
bucket_name=self.bucket_name,
object_name=object_name,
upload_id=upload_id,
@ -223,7 +257,7 @@ class BlobStore:
logger.info(f"Completed multipart upload for {object_id}")
def abort_multipart_upload(self, object_id: UUID, upload_id: str) -> None:
async def abort_multipart_upload(self, object_id: UUID, upload_id: str) -> None:
"""
Abort a multipart upload, cleaning up any uploaded parts.
@ -233,7 +267,8 @@ class BlobStore:
"""
object_name = "doc/" + str(object_id)
self.client._abort_multipart_upload(
await self._with_retry(
self.client._abort_multipart_upload,
bucket_name=self.bucket_name,
object_name=object_name,
upload_id=upload_id,

View file

@ -301,7 +301,7 @@ class Librarian:
object_id = uuid.uuid4()
# Create S3 multipart upload
s3_upload_id = self.blob_store.create_multipart_upload(
s3_upload_id = await self.blob_store.create_multipart_upload(
object_id, request.document_metadata.kind
)
@ -367,7 +367,7 @@ class Librarian:
# Upload to S3 (part numbers are 1-indexed in S3)
part_number = request.chunk_index + 1
etag = self.blob_store.upload_part(
etag = await self.blob_store.upload_part(
object_id=session["object_id"],
upload_id=session["s3_upload_id"],
part_number=part_number,
@ -440,7 +440,7 @@ class Librarian:
]
# Complete S3 multipart upload
self.blob_store.complete_multipart_upload(
await self.blob_store.complete_multipart_upload(
object_id=session["object_id"],
upload_id=session["s3_upload_id"],
parts=parts,
@ -492,7 +492,7 @@ class Librarian:
raise RequestError("Not authorized to abort this upload")
# Abort S3 multipart upload
self.blob_store.abort_multipart_upload(
await self.blob_store.abort_multipart_upload(
object_id=session["object_id"],
upload_id=session["s3_upload_id"],
)