don't use S3 CreateBucket and clean up
Docker server image / build-and-push (push) Successful in 1m6s
Docker server image / build-and-push (push) Successful in 1m6s
This commit is contained in:
@@ -46,6 +46,7 @@ def create_app() -> ConversionServiceASGIApplication:
|
||||
_configure_application_logging()
|
||||
config = load_server_config()
|
||||
store = S3Store(
|
||||
bucket=config.s3_bucket,
|
||||
endpoint=config.s3_endpoint,
|
||||
access_key=config.s3_access_key,
|
||||
secret_key=config.s3_secret_key,
|
||||
@@ -58,10 +59,10 @@ def create_app() -> ConversionServiceASGIApplication:
|
||||
store.enable_http_trace(sys.stderr)
|
||||
logger.warning("OFFICECONVERT_S3_TRACE enabled: S3 HTTP dumps on stderr")
|
||||
try:
|
||||
store.ensure_bucket(config.s3_bucket)
|
||||
store.require_bucket()
|
||||
except S3Error as exc:
|
||||
log_s3_error(
|
||||
"ensure_bucket",
|
||||
"require_bucket",
|
||||
endpoint=config.s3_endpoint,
|
||||
secure=config.s3_secure,
|
||||
exc=exc,
|
||||
|
||||
@@ -128,7 +128,6 @@ class ConversionServiceImpl(conversion_connect.ConversionService):
|
||||
|
||||
try:
|
||||
upload_url = self._store.presigned_put_url(
|
||||
self._config.s3_bucket,
|
||||
upload_key,
|
||||
ttl_seconds=self._config.s3_session_ttl_seconds,
|
||||
)
|
||||
@@ -158,7 +157,6 @@ class ConversionServiceImpl(conversion_connect.ConversionService):
|
||||
|
||||
return conversion_pb2.CreateConversionResponse(
|
||||
conversion_id=conversion_id,
|
||||
upload_bucket=self._config.s3_bucket,
|
||||
upload_object_key=upload_key,
|
||||
upload_url=upload_url,
|
||||
expires_at=_to_timestamp(expires_at),
|
||||
@@ -257,7 +255,6 @@ class ConversionServiceImpl(conversion_connect.ConversionService):
|
||||
await self._cleanup_local_artifacts(session)
|
||||
await asyncio.to_thread(
|
||||
self._store.remove_prefix,
|
||||
self._config.s3_bucket,
|
||||
session.object_prefix,
|
||||
)
|
||||
return conversion_pb2.DeleteConversionResponse(
|
||||
@@ -289,7 +286,6 @@ class ConversionServiceImpl(conversion_connect.ConversionService):
|
||||
try:
|
||||
await asyncio.to_thread(
|
||||
self._store.fget_object,
|
||||
self._config.s3_bucket,
|
||||
session.upload_object_key,
|
||||
source_path,
|
||||
)
|
||||
@@ -433,9 +429,8 @@ class ConversionServiceImpl(conversion_connect.ConversionService):
|
||||
object_key = (
|
||||
f"{session.object_prefix}output/slide-{slide.index:04d}{slide.image_path.suffix}"
|
||||
)
|
||||
self._store.fput_object(self._config.s3_bucket, object_key, slide.image_path)
|
||||
self._store.fput_object(object_key, slide.image_path)
|
||||
image_url = self._store.presigned_get_url(
|
||||
self._config.s3_bucket,
|
||||
object_key,
|
||||
ttl_seconds=self._config.s3_session_ttl_seconds,
|
||||
)
|
||||
@@ -447,12 +442,10 @@ class ConversionServiceImpl(conversion_connect.ConversionService):
|
||||
f"{slide.thumbnail_path.suffix}"
|
||||
)
|
||||
self._store.fput_object(
|
||||
self._config.s3_bucket,
|
||||
thumbnail_object_key,
|
||||
slide.thumbnail_path,
|
||||
)
|
||||
thumbnail_image_url = self._store.presigned_get_url(
|
||||
self._config.s3_bucket,
|
||||
thumbnail_object_key,
|
||||
ttl_seconds=self._config.s3_session_ttl_seconds,
|
||||
)
|
||||
@@ -514,7 +507,6 @@ class ConversionServiceImpl(conversion_connect.ConversionService):
|
||||
await asyncio.sleep(self._config.conversion_cleanup_delay_seconds)
|
||||
await asyncio.to_thread(
|
||||
self._store.remove_prefix,
|
||||
self._config.s3_bucket,
|
||||
session.object_prefix,
|
||||
)
|
||||
except asyncio.CancelledError:
|
||||
|
||||
@@ -6,7 +6,6 @@ import logging
|
||||
from datetime import timedelta
|
||||
from pathlib import Path
|
||||
from typing import TextIO
|
||||
from urllib.parse import urlparse
|
||||
|
||||
from minio import Minio
|
||||
from minio.deleteobjects import DeleteObject
|
||||
@@ -54,6 +53,7 @@ class S3Store:
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
bucket: str,
|
||||
endpoint: str,
|
||||
access_key: str,
|
||||
secret_key: str,
|
||||
@@ -63,6 +63,7 @@ class S3Store:
|
||||
public_secure: bool,
|
||||
) -> None:
|
||||
"""Initialize S3 clients for internal and public URL generation."""
|
||||
self._bucket = bucket
|
||||
self._client = Minio(
|
||||
endpoint,
|
||||
access_key=access_key,
|
||||
@@ -83,55 +84,44 @@ class S3Store:
|
||||
self._client.trace_on(stream)
|
||||
self._public_client.trace_on(stream)
|
||||
|
||||
def ensure_bucket(self, bucket_name: str) -> None:
|
||||
"""Create a bucket if it does not already exist.
|
||||
def require_bucket(self) -> None:
|
||||
"""Verify the configured bucket exists before serving traffic."""
|
||||
if not self._client.bucket_exists(self._bucket):
|
||||
raise RuntimeError(
|
||||
f"S3 bucket {self._bucket!r} does not exist; create it before starting the server"
|
||||
)
|
||||
|
||||
Tries CreateBucket first (idempotent on SeaweedFS and when the caller
|
||||
owns the bucket). AWS production IAM often grants object access only on
|
||||
a pre-provisioned bucket; in that case CreateBucket returns
|
||||
AccessDenied even though HeadBucket succeeds.
|
||||
"""
|
||||
try:
|
||||
self._client.make_bucket(bucket_name)
|
||||
except S3Error as exc:
|
||||
if exc.code in ("BucketAlreadyOwnedByYou", "BucketAlreadyExists"):
|
||||
return
|
||||
if exc.code in ("AccessDenied", "Forbidden"):
|
||||
if self._client.bucket_exists(bucket_name):
|
||||
return
|
||||
raise
|
||||
|
||||
def presigned_put_url(self, bucket_name: str, object_key: str, *, ttl_seconds: int) -> str:
|
||||
def presigned_put_url(self, object_key: str, *, ttl_seconds: int) -> str:
|
||||
"""Generate a presigned PUT URL for a single object upload."""
|
||||
return self._public_client.presigned_put_object(
|
||||
bucket_name,
|
||||
self._bucket,
|
||||
object_key,
|
||||
expires=timedelta(seconds=ttl_seconds),
|
||||
)
|
||||
|
||||
def presigned_get_url(self, bucket_name: str, object_key: str, *, ttl_seconds: int) -> str:
|
||||
def presigned_get_url(self, object_key: str, *, ttl_seconds: int) -> str:
|
||||
"""Generate a presigned GET URL for downloading one object."""
|
||||
return self._public_client.presigned_get_object(
|
||||
bucket_name,
|
||||
self._bucket,
|
||||
object_key,
|
||||
expires=timedelta(seconds=ttl_seconds),
|
||||
)
|
||||
|
||||
def fget_object(self, bucket_name: str, object_key: str, output_path: Path) -> None:
|
||||
def fget_object(self, object_key: str, output_path: Path) -> None:
|
||||
"""Download one object from storage to a local filesystem path."""
|
||||
output_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
self._client.fget_object(bucket_name, object_key, str(output_path))
|
||||
self._client.fget_object(self._bucket, object_key, str(output_path))
|
||||
|
||||
def fput_object(self, bucket_name: str, object_key: str, source_path: Path) -> None:
|
||||
def fput_object(self, object_key: str, source_path: Path) -> None:
|
||||
"""Upload one local filesystem object to storage."""
|
||||
self._client.fput_object(bucket_name, object_key, str(source_path))
|
||||
self._client.fput_object(self._bucket, object_key, str(source_path))
|
||||
|
||||
def remove_prefix(self, bucket_name: str, prefix: str) -> None:
|
||||
"""Remove all objects under a key prefix within a bucket."""
|
||||
def remove_prefix(self, prefix: str) -> None:
|
||||
"""Remove all objects under a key prefix within the configured bucket."""
|
||||
normalized_prefix = prefix if prefix.endswith("/") else f"{prefix}/"
|
||||
objects = list(
|
||||
self._client.list_objects(
|
||||
bucket_name,
|
||||
self._bucket,
|
||||
prefix=normalized_prefix,
|
||||
recursive=True,
|
||||
)
|
||||
@@ -149,7 +139,7 @@ class S3Store:
|
||||
delete_requests.append(DeleteObject(object_name))
|
||||
|
||||
errors = self._client.remove_objects(
|
||||
bucket_name,
|
||||
self._bucket,
|
||||
delete_requests,
|
||||
)
|
||||
for err in errors:
|
||||
@@ -158,10 +148,3 @@ class S3Store:
|
||||
raise RuntimeError(
|
||||
f"failed to delete object {object_name}: {message}"
|
||||
)
|
||||
|
||||
|
||||
def object_key_from_presigned_url(url: str) -> str:
|
||||
"""Extract object key from a presigned URL path for diagnostics."""
|
||||
path = urlparse(url).path
|
||||
path_parts = [part for part in path.split("/") if part]
|
||||
return "/".join(path_parts[1:]) if len(path_parts) >= 2 else ""
|
||||
|
||||
Reference in New Issue
Block a user