from typing import Any, BinaryIO, Dict, Optional import aioboto3 from botocore.exceptions import ClientError from .base import BaseFileSystem class S3FileSystem(BaseFileSystem): """S3 implementation of the filesystem interface.""" def __init__(self, bucket_name: str, region_name: str = "us-east-1"): """Initialize S3 filesystem. Args: bucket_name: Name of the S3 bucket region_name: AWS region name """ self.bucket_name = bucket_name self.region_name = region_name self.session = aioboto3.Session() async def acreate_file(self, file_path: str, content: BinaryIO) -> bool: try: async with self.session.client( "s3", region_name=self.region_name ) as s3_client: await s3_client.put_object( Bucket=self.bucket_name, Key=file_path, Body=await content.read() ) return True except ClientError: return False async def aupload_file(self, local_path: str, destination_path: str) -> bool: try: async with self.session.client( "s3", region_name=self.region_name ) as s3_client: await s3_client.upload_file( local_path, self.bucket_name, destination_path ) return True except ClientError: return False async def aget_signed_url( self, file_path: str, expiration: int = 3600, force_inline: bool = False, use_internal_endpoint: bool = False, ) -> Optional[str]: """Generate a presigned GET url for the given object. For transcript text files we force the response headers so that the browser renders the content **inline** instead of triggering a file download. We do this by asking S3 to override the content type & disposition on the response. """ try: async with self.session.client( "s3", region_name=self.region_name ) as s3_client: params = {"Bucket": self.bucket_name, "Key": file_path} # Make artifacts viewable inline in the browser when requested if force_inline: if file_path.endswith(".txt"): params.update( { "ResponseContentType": "text/plain", "ResponseContentDisposition": "inline", } ) elif file_path.endswith(".wav"): params.update( { "ResponseContentType": "audio/wav", "ResponseContentDisposition": "inline", } ) elif file_path.endswith(".mp3"): params.update( { "ResponseContentType": "audio/mpeg", "ResponseContentDisposition": "inline", } ) url = await s3_client.generate_presigned_url( "get_object", Params=params, ExpiresIn=expiration, ) return url except ClientError: return None async def aget_file_metadata(self, file_path: str) -> Optional[Dict[str, Any]]: """Get S3 object metadata.""" try: async with self.session.client( "s3", region_name=self.region_name ) as s3_client: response = await s3_client.head_object( Bucket=self.bucket_name, Key=file_path ) return { "size": response.get("ContentLength"), "created_at": response.get("LastModified"), "modified_at": response.get("LastModified"), "etag": response.get("ETag", "").strip('"'), "content_type": response.get("ContentType"), "storage_class": response.get("StorageClass"), } except ClientError: return None async def aget_presigned_put_url( self, file_path: str, expiration: int = 900, content_type: str = "text/csv", max_size: int = 10_485_760, ) -> Optional[str]: """Generate a presigned PUT URL for direct file upload.""" try: async with self.session.client( "s3", region_name=self.region_name ) as s3_client: url = await s3_client.generate_presigned_url( "put_object", Params={ "Bucket": self.bucket_name, "Key": file_path, "ContentType": content_type, }, ExpiresIn=expiration, ) return url except ClientError: return None async def adownload_file(self, source_path: str, local_path: str) -> bool: """Download a file from S3 to local path.""" try: async with self.session.client( "s3", region_name=self.region_name ) as s3_client: await s3_client.download_file(self.bucket_name, source_path, local_path) return True except ClientError: return False async def acopy_file(self, source_path: str, destination_path: str) -> bool: """Copy a file within S3 (server-side copy).""" try: async with self.session.client( "s3", region_name=self.region_name ) as s3_client: await s3_client.copy_object( Bucket=self.bucket_name, Key=destination_path, CopySource={"Bucket": self.bucket_name, "Key": source_path}, ) return True except ClientError: return False