mvp implementation
This commit is contained in:
@@ -0,0 +1,269 @@
|
||||
"""Connect service implementation for conversion request orchestration."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from pathlib import Path
|
||||
import shutil
|
||||
import tempfile
|
||||
import uuid
|
||||
|
||||
from connectrpc.code import Code
|
||||
from connectrpc.errors import ConnectError
|
||||
from connectrpc.request import RequestContext
|
||||
from google.protobuf.timestamp_pb2 import Timestamp
|
||||
from officeconvert import SlideArtifact, convert_pptx_to_slidedeck
|
||||
from officeconvertapi.v1 import conversion_connect, conversion_pb2
|
||||
|
||||
from officeconvert_server.config import ServerConfig
|
||||
from officeconvert_server.models import ConversionSession, utc_now
|
||||
from officeconvert_server.storage import MinIOStore
|
||||
|
||||
|
||||
class ConversionServiceImpl(conversion_connect.ConversionService):
    """Implements the conversion API with in-memory state and MinIO orchestration.

    Session records are kept in ``self._sessions`` (keyed by conversion id) and
    guarded by ``self._lock``. Storage and conversion calls are synchronous, so
    they are dispatched through ``asyncio.to_thread`` to keep the event loop
    responsive. All state is held in memory only.
    """

    def __init__(self, config: ServerConfig, store: MinIOStore) -> None:
        """Initialize service with runtime config and storage adapter."""
        self._config = config
        self._store = store
        self._sessions: dict[str, ConversionSession] = {}
        self._lock = asyncio.Lock()

    async def create_conversion(
        self,
        request: conversion_pb2.CreateConversionRequest,
        ctx: RequestContext,
    ) -> conversion_pb2.CreateConversionResponse:
        """Create a new conversion session and return upload credentials.

        Raises:
            ConnectError: ``INVALID_ARGUMENT`` when ``source_filename`` is blank
                or does not end in ``.pptx`` (case-insensitive).
        """
        del ctx
        source_filename = request.source_filename.strip()
        if not source_filename:
            raise ConnectError(Code.INVALID_ARGUMENT, "source_filename is required")
        if not source_filename.lower().endswith(".pptx"):
            raise ConnectError(Code.INVALID_ARGUMENT, "only .pptx input is supported")

        conversion_id = str(uuid.uuid4())
        bucket_name = f"oc-{conversion_id}"
        upload_key = "input/source.pptx"
        ttl_seconds = self._config.minio_session_ttl_seconds
        # Computed before the URL is signed, so the advertised expiry is never
        # later than the real one.
        expires_at = utc_now() + timedelta(seconds=ttl_seconds)

        # Store calls are synchronous; run them off the event loop like every
        # other storage access in this class.
        await asyncio.to_thread(self._store.ensure_bucket, bucket_name)
        upload_url = await asyncio.to_thread(
            self._store.presigned_put_url,
            bucket_name,
            upload_key,
            ttl_seconds=ttl_seconds,
        )

        session = ConversionSession(
            conversion_id=conversion_id,
            source_filename=source_filename,
            bucket_name=bucket_name,
            upload_object_key=upload_key,
            status=conversion_pb2.CONVERSION_STATUS_PENDING,
        )
        # NOTE(review): no cleanup is scheduled here; a session that is never
        # started stays registered until delete_conversion is called.
        async with self._lock:
            self._sessions[conversion_id] = session

        return conversion_pb2.CreateConversionResponse(
            conversion_id=conversion_id,
            upload_bucket=bucket_name,
            upload_object_key=upload_key,
            upload_url=upload_url,
            expires_at=_to_timestamp(expires_at),
        )

    async def start_conversion(
        self,
        request: conversion_pb2.StartConversionRequest,
        ctx: RequestContext,
    ) -> conversion_pb2.StartConversionResponse:
        """Start asynchronous conversion for an already-uploaded session payload.

        Calling this on a session that is already running is a no-op that
        returns the current status.

        Raises:
            ConnectError: ``NOT_FOUND`` for an unknown id, ``FAILED_PRECONDITION``
                when the session has already failed or succeeded.
        """
        del ctx
        # Lookup and state transition happen under one lock acquisition so a
        # concurrent delete cannot remove the session in between.
        async with self._lock:
            session = self._require_session(request.conversion_id)
            if session.status != conversion_pb2.CONVERSION_STATUS_RUNNING:
                if session.status in (
                    conversion_pb2.CONVERSION_STATUS_FAILED,
                    conversion_pb2.CONVERSION_STATUS_SUCCEEDED,
                ):
                    raise ConnectError(
                        Code.FAILED_PRECONDITION,
                        "conversion has already completed",
                    )
                session.status = conversion_pb2.CONVERSION_STATUS_RUNNING
                session.updated_at = utc_now()
                # The session keeps the task reference so it can be cancelled
                # by delete_conversion.
                session.conversion_task = asyncio.create_task(
                    self._run_conversion(session)
                )

        return conversion_pb2.StartConversionResponse(
            conversion_id=session.conversion_id,
            status=session.status,
        )

    async def get_conversion_status(
        self,
        request: conversion_pb2.GetConversionStatusRequest,
        ctx: RequestContext,
    ) -> conversion_pb2.GetConversionStatusResponse:
        """Return current conversion status and optional error details.

        Raises:
            ConnectError: ``NOT_FOUND`` for an unknown conversion id.
        """
        del ctx
        session = await self._get_session(request.conversion_id)
        return conversion_pb2.GetConversionStatusResponse(
            conversion_id=session.conversion_id,
            status=session.status,
            error_message=session.error_message,
            updated_at=_to_timestamp(session.updated_at),
        )

    async def get_slide_deck(
        self,
        request: conversion_pb2.GetSlideDeckRequest,
        ctx: RequestContext,
    ) -> conversion_pb2.GetSlideDeckResponse:
        """Return the finished slide deck once conversion succeeds.

        Raises:
            ConnectError: ``NOT_FOUND`` for an unknown id; ``FAILED_PRECONDITION``
                when the conversion failed (carrying its error message) or has
                not finished; ``INTERNAL`` when a succeeded session has no deck.
        """
        del ctx
        session = await self._get_session(request.conversion_id)
        if session.status == conversion_pb2.CONVERSION_STATUS_FAILED:
            raise ConnectError(Code.FAILED_PRECONDITION, session.error_message)
        if session.status != conversion_pb2.CONVERSION_STATUS_SUCCEEDED:
            raise ConnectError(Code.FAILED_PRECONDITION, "conversion is not finished yet")
        if session.slide_deck is None:
            raise ConnectError(Code.INTERNAL, "slide deck missing from successful session")

        return conversion_pb2.GetSlideDeckResponse(slide_deck=session.slide_deck)

    async def delete_conversion(
        self,
        request: conversion_pb2.DeleteConversionRequest,
        ctx: RequestContext,
    ) -> conversion_pb2.DeleteConversionResponse:
        """Delete a conversion session and associated MinIO/local artifacts.

        Returns ``deleted=False`` (without raising) when the id is unknown.
        """
        del ctx
        async with self._lock:
            session = self._sessions.pop(request.conversion_id, None)
        if session is None:
            return conversion_pb2.DeleteConversionResponse(
                conversion_id=request.conversion_id,
                deleted=False,
            )

        conversion_task = session.conversion_task
        if conversion_task is not None and not conversion_task.done():
            conversion_task.cancel()
            # Let the task finish unwinding before tearing down its resources.
            # return_exceptions=True turns the task's own cancellation into a
            # result instead of raising it here.
            # NOTE: a worker thread started via asyncio.to_thread cannot be
            # interrupted and may still be running after this returns.
            await asyncio.gather(conversion_task, return_exceptions=True)
        # Cancel after the conversion task has settled so a cleanup task it may
        # have scheduled while finishing is covered as well.
        if session.cleanup_task is not None:
            session.cleanup_task.cancel()

        await self._cleanup_local_artifacts(session)
        await asyncio.to_thread(self._store.remove_bucket_tree, session.bucket_name)
        return conversion_pb2.DeleteConversionResponse(
            conversion_id=session.conversion_id,
            deleted=True,
        )

    async def _run_conversion(self, session: ConversionSession) -> None:
        """Execute conversion flow and persist terminal state in memory.

        Downloads the uploaded source, converts it, uploads the slide images and
        records SUCCEEDED or FAILED on ``session``. The local work directory is
        always removed; delayed bucket cleanup is scheduled only while the
        session is still registered.
        """
        try:
            # Created inside the try so a failure here is recorded as FAILED
            # instead of leaving the session stuck in RUNNING.
            work_dir = Path(
                tempfile.mkdtemp(prefix=f"officeconvert-{session.conversion_id}-")
            ).resolve()
            session.work_dir = work_dir
            source_path = work_dir / "input.pptx"

            await asyncio.to_thread(
                self._store.fget_object,
                session.bucket_name,
                session.upload_object_key,
                source_path,
            )
            result = await asyncio.to_thread(
                convert_pptx_to_slidedeck,
                source_path,
                work_dir,
            )
            session.slide_deck = await asyncio.to_thread(
                self._upload_and_build_slide_deck,
                session,
                result.slides,
                result.source_filename,
            )
            session.status = conversion_pb2.CONVERSION_STATUS_SUCCEEDED
            session.updated_at = utc_now()
        except asyncio.CancelledError:
            session.status = conversion_pb2.CONVERSION_STATUS_FAILED
            session.error_message = "conversion cancelled"
            session.updated_at = utc_now()
            raise
        except Exception as exc:
            session.status = conversion_pb2.CONVERSION_STATUS_FAILED
            # Some exceptions stringify to ""; fall back to the type name so a
            # failed session always carries a non-empty message.
            session.error_message = str(exc) or type(exc).__name__
            session.updated_at = utc_now()
        finally:
            await self._cleanup_local_artifacts(session)
            # Skip scheduling when the session was deleted meanwhile; the
            # deleter removes the bucket itself. There is no await between this
            # check and create_task, so no lock is needed.
            if self._sessions.get(session.conversion_id) is session:
                session.cleanup_task = asyncio.create_task(
                    self._delayed_cleanup(session)
                )

    def _upload_and_build_slide_deck(
        self,
        session: ConversionSession,
        slides: list[SlideArtifact],
        source_filename: str,
    ) -> conversion_pb2.SlideDeck:
        """Upload generated slide images and construct API response payload.

        Synchronous; ``_run_conversion`` invokes it in a worker thread. Each
        slide image is stored as ``output/slide-NNNN<suffix>`` in the session
        bucket and exposed through a presigned GET URL.
        """
        ttl_seconds = self._config.minio_session_ttl_seconds
        response_slides: list[conversion_pb2.Slide] = []
        for slide in slides:
            object_key = f"output/slide-{slide.index:04d}{slide.image_path.suffix}"
            self._store.fput_object(session.bucket_name, object_key, slide.image_path)
            image_url = self._store.presigned_get_url(
                session.bucket_name,
                object_key,
                ttl_seconds=ttl_seconds,
            )
            response_slides.append(
                conversion_pb2.Slide(
                    index=slide.index,
                    notes_plain=slide.notes_plain,
                    image_url=image_url,
                )
            )

        return conversion_pb2.SlideDeck(
            conversion_id=session.conversion_id,
            source_filename=source_filename,
            slides=response_slides,
            created_at=_to_timestamp(utc_now()),
        )

    async def _delayed_cleanup(self, session: ConversionSession) -> None:
        """Delete storage resources after the configured session retention period.

        The session is dropped from the registry when this task ends, whether
        it completed, failed, or was cancelled.
        """
        try:
            await asyncio.sleep(self._config.conversion_cleanup_delay_seconds)
            await asyncio.to_thread(self._store.remove_bucket_tree, session.bucket_name)
        except asyncio.CancelledError:
            # Within this class the task is only cancelled by
            # delete_conversion, which removes the bucket itself.
            return
        finally:
            async with self._lock:
                self._sessions.pop(session.conversion_id, None)

    async def _cleanup_local_artifacts(self, session: ConversionSession) -> None:
        """Delete temporary local files for a session if they still exist."""
        work_dir = session.work_dir
        if work_dir is not None and work_dir.exists():
            # Best effort: removal errors are ignored.
            await asyncio.to_thread(shutil.rmtree, work_dir, ignore_errors=True)
        session.work_dir = None

    def _require_session(self, conversion_id: str) -> ConversionSession:
        """Look up a session without locking; callers must hold ``self._lock``.

        Raises:
            ConnectError: ``NOT_FOUND`` when no session has this id.
        """
        session = self._sessions.get(conversion_id)
        if session is None:
            raise ConnectError(Code.NOT_FOUND, "conversion_id not found")
        return session

    async def _get_session(self, conversion_id: str) -> ConversionSession:
        """Return an existing session or raise a NOT_FOUND error."""
        async with self._lock:
            return self._require_session(conversion_id)
|
||||
|
||||
|
||||
def _to_timestamp(value: datetime) -> Timestamp:
    """Build a protobuf ``Timestamp`` from *value* after converting it to UTC."""
    stamp = Timestamp()
    stamp.FromDatetime(value.astimezone(timezone.utc))
    return stamp
|
||||
Reference in New Issue
Block a user