"""Connect service implementation for conversion request orchestration."""

from __future__ import annotations

import asyncio
from collections.abc import Callable
from datetime import datetime, timedelta, timezone
import logging
from pathlib import Path
import shutil
import tempfile
import time
import uuid

from connectrpc.code import Code
from connectrpc.errors import ConnectError
from connectrpc.request import RequestContext
from google.protobuf.timestamp_pb2 import Timestamp
from officeconvert import SlideArtifact, convert_pptx_to_slidedeck
from officeconvert.conversion import (
    ConversionTimeoutError,
    PHASE_EXTRACTING_NOTES,
    PHASE_PDF_TO_IMAGES,
    PHASE_PPTX_TO_PDF,
)
from officeconvertapi.v1 import conversion_connect, conversion_pb2

from officeconvert_server.config import ServerConfig
from officeconvert_server.models import ConversionSession, utc_now
from officeconvert_server.storage import S3Store

logger = logging.getLogger("uvicorn.error")


class ConversionServiceImpl(conversion_connect.ConversionService):
    """Implements the conversion API with in-memory state and S3 orchestration.

    Sessions live in ``self._sessions`` keyed by conversion id; the mapping is
    only read or modified while holding ``self._lock``.
    """

    def __init__(self, config: ServerConfig, store: S3Store) -> None:
        """Initialize service with runtime config and storage adapter."""
        self._config = config
        self._store = store
        self._sessions: dict[str, ConversionSession] = {}
        self._lock = asyncio.Lock()

    async def create_conversion(
        self,
        request: conversion_pb2.CreateConversionRequest,
        ctx: RequestContext,
    ) -> conversion_pb2.CreateConversionResponse:
        """Create a new conversion session and return upload credentials.

        Raises:
            ConnectError: ``INVALID_ARGUMENT`` when the filename is blank or
                does not end in ``.pptx`` (case-insensitive).
        """
        del ctx
        source_filename = request.source_filename.strip()
        if not source_filename:
            raise ConnectError(Code.INVALID_ARGUMENT, "source_filename is required")
        if not source_filename.lower().endswith(".pptx"):
            raise ConnectError(Code.INVALID_ARGUMENT, "only .pptx input is supported")

        conversion_id = str(uuid.uuid4())
        bucket_name = f"oc-{conversion_id}"
        upload_key = "input/source.pptx"
        expires_at = utc_now() + timedelta(seconds=self._config.s3_session_ttl_seconds)

        # Storage calls are synchronous; run them in a worker thread (as the
        # rest of this service does) so the event loop is not blocked.
        await asyncio.to_thread(self._store.ensure_bucket, bucket_name)
        upload_url = await asyncio.to_thread(
            self._store.presigned_put_url,
            bucket_name,
            upload_key,
            ttl_seconds=self._config.s3_session_ttl_seconds,
        )

        session = ConversionSession(
            conversion_id=conversion_id,
            source_filename=source_filename,
            bucket_name=bucket_name,
            upload_object_key=upload_key,
            status=conversion_pb2.CONVERSION_STATUS_PENDING,
        )
        async with self._lock:
            self._sessions[conversion_id] = session

        return conversion_pb2.CreateConversionResponse(
            conversion_id=conversion_id,
            upload_bucket=bucket_name,
            upload_object_key=upload_key,
            upload_url=upload_url,
            expires_at=_to_timestamp(expires_at),
        )

    async def start_conversion(
        self,
        request: conversion_pb2.StartConversionRequest,
        ctx: RequestContext,
    ) -> conversion_pb2.StartConversionResponse:
        """Start asynchronous conversion for an already-uploaded session payload.

        Calling this while the conversion is already running is a no-op that
        returns the current status.

        Raises:
            ConnectError: ``NOT_FOUND`` for an unknown id, or
                ``FAILED_PRECONDITION`` when the conversion already finished.
        """
        del ctx
        # Look up and transition under a single lock acquisition so a
        # concurrent delete cannot remove the session between the two steps.
        async with self._lock:
            session = self._require_session_locked(request.conversion_id)
            if session.status == conversion_pb2.CONVERSION_STATUS_RUNNING:
                return conversion_pb2.StartConversionResponse(
                    conversion_id=session.conversion_id,
                    status=session.status,
                )
            if session.status in (
                conversion_pb2.CONVERSION_STATUS_FAILED,
                conversion_pb2.CONVERSION_STATUS_SUCCEEDED,
            ):
                raise ConnectError(
                    Code.FAILED_PRECONDITION,
                    "conversion has already completed",
                )
            session.status = conversion_pb2.CONVERSION_STATUS_RUNNING
            session.phase = conversion_pb2.CONVERSION_PHASE_INACTIVE
            session.current_progress = 0
            session.max_progress = 0
            session.error_message = ""
            session.updated_at = utc_now()
            # The task handle is kept on the session so delete_conversion can
            # cancel it.
            session.conversion_task = asyncio.create_task(self._run_conversion(session))

        return conversion_pb2.StartConversionResponse(
            conversion_id=session.conversion_id,
            status=session.status,
        )

    async def get_conversion_status(
        self,
        request: conversion_pb2.GetConversionStatusRequest,
        ctx: RequestContext,
    ) -> conversion_pb2.GetConversionStatusResponse:
        """Return current conversion status and optional error details.

        Raises:
            ConnectError: ``NOT_FOUND`` for an unknown id.
        """
        del ctx
        session = await self._get_session(request.conversion_id)
        return conversion_pb2.GetConversionStatusResponse(
            conversion_id=session.conversion_id,
            status=session.status,
            error_message=session.error_message,
            updated_at=_to_timestamp(session.updated_at),
            phase=session.phase,
            current_progress=session.current_progress,
            max_progress=session.max_progress,
        )

    async def get_slide_deck(
        self,
        request: conversion_pb2.GetSlideDeckRequest,
        ctx: RequestContext,
    ) -> conversion_pb2.GetSlideDeckResponse:
        """Return the finished slide deck once conversion succeeds.

        Raises:
            ConnectError: ``NOT_FOUND`` for an unknown id,
                ``FAILED_PRECONDITION`` when the conversion failed or has not
                finished, ``INTERNAL`` when a succeeded session has no deck.
        """
        del ctx
        session = await self._get_session(request.conversion_id)
        if session.status == conversion_pb2.CONVERSION_STATUS_FAILED:
            raise ConnectError(Code.FAILED_PRECONDITION, session.error_message)
        if session.status != conversion_pb2.CONVERSION_STATUS_SUCCEEDED:
            raise ConnectError(Code.FAILED_PRECONDITION, "conversion is not finished yet")
        if session.slide_deck is None:
            raise ConnectError(Code.INTERNAL, "slide deck missing from successful session")
        return conversion_pb2.GetSlideDeckResponse(slide_deck=session.slide_deck)

    async def delete_conversion(
        self,
        request: conversion_pb2.DeleteConversionRequest,
        ctx: RequestContext,
    ) -> conversion_pb2.DeleteConversionResponse:
        """Delete a conversion session and associated object storage/local artifacts.

        Returns ``deleted=False`` (without raising) when the id is unknown.
        """
        del ctx
        async with self._lock:
            session = self._sessions.pop(request.conversion_id, None)
        if session is None:
            return conversion_pb2.DeleteConversionResponse(
                conversion_id=request.conversion_id,
                deleted=False,
            )

        # Stop background work for this session before removing its data.
        if session.cleanup_task is not None:
            session.cleanup_task.cancel()
        if session.conversion_task is not None and not session.conversion_task.done():
            session.conversion_task.cancel()

        await self._cleanup_local_artifacts(session)
        await asyncio.to_thread(self._store.remove_bucket_tree, session.bucket_name)
        return conversion_pb2.DeleteConversionResponse(
            conversion_id=session.conversion_id,
            deleted=True,
        )

    async def _run_conversion(self, session: ConversionSession) -> None:
        """Execute conversion flow and persist terminal state in memory.

        Downloads the uploaded source into a fresh temporary directory, runs
        the converter in a worker thread, uploads the resulting slides and
        records success or failure on ``session``. Cancellation is recorded as
        a failure and re-raised; every other exception is recorded and
        swallowed so the background task finishes cleanly.
        """
        started_at = time.monotonic()
        logger.info(
            "Starting conversion conversion_id=%s source_filename=%s dpi=%d "
            "timeout_caps_s[pptx_to_pdf_total=%d,pdf_to_images_total=%d]",
            session.conversion_id,
            session.source_filename,
            self._config.conversion_image_dpi,
            self._config.conversion_pptx_to_pdf_timeout_seconds,
            self._config.conversion_pdf_to_images_timeout_seconds,
        )
        work_dir = Path(
            tempfile.mkdtemp(prefix=f"officeconvert-{session.conversion_id}-")
        ).resolve()
        session.work_dir = work_dir
        source_path = work_dir / "input.pptx"

        try:
            await asyncio.to_thread(
                self._store.fget_object,
                session.bucket_name,
                session.upload_object_key,
                source_path,
            )
            result = await asyncio.to_thread(
                convert_pptx_to_slidedeck,
                source_path,
                work_dir,
                dpi=self._config.conversion_image_dpi,
                pptx_to_pdf_timeout_s=self._config.conversion_pptx_to_pdf_timeout_seconds,
                pdf_to_images_timeout_s=self._config.conversion_pdf_to_images_timeout_seconds,
                pptx_to_pdf_base_timeout_s=self._config.conversion_pptx_to_pdf_base_timeout_seconds,
                pptx_to_pdf_per_slide_timeout_s=self._config.conversion_pptx_to_pdf_per_slide_timeout_seconds,
                pdf_to_images_base_timeout_s=self._config.conversion_pdf_to_images_base_timeout_seconds,
                pdf_to_images_per_slide_timeout_s=self._config.conversion_pdf_to_images_per_slide_timeout_seconds,
                progress_callback=lambda phase_name, current, max_value: self._set_session_progress_from_name(
                    session,
                    phase_name=phase_name,
                    current_progress=current,
                    max_progress=max_value,
                ),
            )
            self._set_session_progress(
                session,
                phase=conversion_pb2.CONVERSION_PHASE_UPLOADING_RESULTS,
                current_progress=0,
                max_progress=len(result.slides),
            )
            session.slide_deck = await asyncio.to_thread(
                self._upload_and_build_slide_deck,
                session,
                result.slides,
                result.source_filename,
                lambda current, max_value: self._set_session_progress(
                    session,
                    phase=conversion_pb2.CONVERSION_PHASE_UPLOADING_RESULTS,
                    current_progress=current,
                    max_progress=max_value,
                ),
            )
            session.status = conversion_pb2.CONVERSION_STATUS_SUCCEEDED
            session.phase = conversion_pb2.CONVERSION_PHASE_INACTIVE
            session.updated_at = utc_now()
            elapsed_s = time.monotonic() - started_at
            logger.info(
                "Conversion succeeded conversion_id=%s source_filename=%s slides=%d elapsed_s=%.3f",
                session.conversion_id,
                session.source_filename,
                len(result.slides),
                elapsed_s,
            )
        except asyncio.CancelledError:
            session.status = conversion_pb2.CONVERSION_STATUS_FAILED
            session.phase = conversion_pb2.CONVERSION_PHASE_INACTIVE
            session.error_message = "conversion cancelled"
            session.updated_at = utc_now()
            elapsed_s = time.monotonic() - started_at
            logger.warning(
                "Conversion cancelled conversion_id=%s source_filename=%s elapsed_s=%.3f",
                session.conversion_id,
                session.source_filename,
                elapsed_s,
            )
            raise
        except ConversionTimeoutError as exc:
            session.status = conversion_pb2.CONVERSION_STATUS_FAILED
            session.phase = conversion_pb2.CONVERSION_PHASE_INACTIVE
            session.error_message = str(exc)
            session.updated_at = utc_now()
            elapsed_s = time.monotonic() - started_at
            # Timeouts are logged without a traceback, unlike other failures.
            logger.error(
                "Conversion timed out conversion_id=%s source_filename=%s elapsed_s=%.3f error=%s",
                session.conversion_id,
                session.source_filename,
                elapsed_s,
                exc,
            )
        except Exception as exc:
            session.status = conversion_pb2.CONVERSION_STATUS_FAILED
            session.phase = conversion_pb2.CONVERSION_PHASE_INACTIVE
            session.error_message = str(exc)
            session.updated_at = utc_now()
            elapsed_s = time.monotonic() - started_at
            logger.exception(
                "Conversion failed conversion_id=%s source_filename=%s elapsed_s=%.3f",
                session.conversion_id,
                session.source_filename,
                elapsed_s,
            )
        finally:
            await self._cleanup_local_artifacts(session)
            # Only schedule retention cleanup while the session is still
            # registered. If delete_conversion already removed it (and the
            # bucket), a late cleanup task would outlive the delete and try to
            # remove the bucket a second time.
            async with self._lock:
                if self._sessions.get(session.conversion_id) is session:
                    session.cleanup_task = asyncio.create_task(
                        self._delayed_cleanup(session)
                    )

    def _upload_and_build_slide_deck(
        self,
        session: ConversionSession,
        slides: list[SlideArtifact],
        source_filename: str,
        progress_callback: Callable[[int, int], None] | None = None,
    ) -> conversion_pb2.SlideDeck:
        """Upload generated slide images and construct API response payload.

        Synchronous; ``_run_conversion`` invokes it through
        ``asyncio.to_thread``. ``progress_callback`` is called after each
        upload with ``(uploaded_so_far, total)``.
        """
        response_slides: list[conversion_pb2.Slide] = []
        slide_total = len(slides)
        for slide_index, slide in enumerate(slides, start=1):
            object_key = f"output/slide-{slide.index:04d}{slide.image_path.suffix}"
            self._store.fput_object(session.bucket_name, object_key, slide.image_path)
            image_url = self._store.presigned_get_url(
                session.bucket_name,
                object_key,
                ttl_seconds=self._config.s3_session_ttl_seconds,
            )
            response_slides.append(
                conversion_pb2.Slide(
                    index=slide.index,
                    notes_plain=slide.notes_plain,
                    image_url=image_url,
                )
            )
            if progress_callback is not None:
                progress_callback(slide_index, slide_total)

        return conversion_pb2.SlideDeck(
            conversion_id=session.conversion_id,
            source_filename=source_filename,
            slides=response_slides,
            created_at=_to_timestamp(utc_now()),
        )

    async def _delayed_cleanup(self, session: ConversionSession) -> None:
        """Delete storage resources after the configured session retention period.

        The session is dropped from the in-memory registry whether the bucket
        removal succeeds, fails, or the task is cancelled.
        """
        try:
            await asyncio.sleep(self._config.conversion_cleanup_delay_seconds)
            await asyncio.to_thread(self._store.remove_bucket_tree, session.bucket_name)
        except asyncio.CancelledError:
            return
        except Exception:
            # Nothing awaits this background task, so log the failure here
            # instead of leaving it as an unretrieved task exception.
            logger.exception(
                "Delayed cleanup failed conversion_id=%s bucket=%s",
                session.conversion_id,
                session.bucket_name,
            )
        finally:
            async with self._lock:
                self._sessions.pop(session.conversion_id, None)

    async def _cleanup_local_artifacts(self, session: ConversionSession) -> None:
        """Delete temporary local files for a session if they still exist."""
        if session.work_dir is not None and session.work_dir.exists():
            # Third positional argument is ignore_errors=True: best-effort removal.
            await asyncio.to_thread(shutil.rmtree, session.work_dir, True)
        session.work_dir = None

    def _require_session_locked(self, conversion_id: str) -> ConversionSession:
        """Return an existing session or raise NOT_FOUND; caller must hold ``self._lock``."""
        session = self._sessions.get(conversion_id)
        if session is None:
            raise ConnectError(Code.NOT_FOUND, "conversion_id not found")
        return session

    async def _get_session(self, conversion_id: str) -> ConversionSession:
        """Return an existing session or raise a NOT_FOUND error."""
        async with self._lock:
            return self._require_session_locked(conversion_id)

    def _set_session_progress_from_name(
        self,
        session: ConversionSession,
        *,
        phase_name: str,
        current_progress: int,
        max_progress: int,
    ) -> None:
        """Map conversion-library phase names onto API enum phases.

        Unrecognized phase names are reported as ``CONVERSION_PHASE_INACTIVE``.
        """
        phase_map = {
            PHASE_EXTRACTING_NOTES: conversion_pb2.CONVERSION_PHASE_EXTRACTING_NOTES,
            PHASE_PPTX_TO_PDF: conversion_pb2.CONVERSION_PHASE_PPTX_TO_PDF,
            PHASE_PDF_TO_IMAGES: conversion_pb2.CONVERSION_PHASE_PDF_TO_IMAGES,
        }
        self._set_session_progress(
            session,
            phase=phase_map.get(phase_name, conversion_pb2.CONVERSION_PHASE_INACTIVE),
            current_progress=current_progress,
            max_progress=max_progress,
        )

    def _set_session_progress(
        self,
        session: ConversionSession,
        *,
        phase: conversion_pb2.ConversionPhase,
        current_progress: int,
        max_progress: int,
    ) -> None:
        """Set normalized phase/progress counters and touch update timestamp.

        Negative values are clamped to zero, and ``current_progress`` is capped
        at ``max_progress`` whenever the maximum is positive.
        """
        normalized_max = max(0, max_progress)
        normalized_current = max(0, current_progress)
        if normalized_max > 0:
            normalized_current = min(normalized_current, normalized_max)

        session.phase = phase
        session.current_progress = normalized_current
        session.max_progress = normalized_max
        session.updated_at = utc_now()


def _to_timestamp(value: datetime) -> Timestamp:
    """Convert a timezone-aware datetime to protobuf Timestamp."""
    normalized = value.astimezone(timezone.utc)
    proto = Timestamp()
    proto.FromDatetime(normalized)
    return proto