"""Connect service implementation for conversion request orchestration.""" from __future__ import annotations import asyncio from collections.abc import Callable from datetime import datetime, timedelta, timezone import logging from pathlib import Path import shutil import tempfile import time from connectrpc.code import Code from connectrpc.errors import ConnectError from connectrpc.request import RequestContext from google.protobuf.timestamp_pb2 import Timestamp from officeconvert import SlideArtifact, convert_pptx_to_slidedeck from officeconvert.conversion import ( ConversionTimeoutError, HtmlFormattingPolicy, NotesFormat, NotesOptions, PHASE_EXTRACTING_NOTES, PHASE_PDF_TO_IMAGES, PHASE_PPTX_TO_PDF, RESOLUTION_FHD, RESOLUTION_HD, RESOLUTION_QHD, RESOLUTION_SD, RESOLUTION_UHD, ) from officeconvertapi.v1 import conversion_connect, conversion_pb2 from ksuid import Ksuid from minio.error import S3Error from officeconvert_server.config import ServerConfig from officeconvert_server.models import ConversionSession, utc_now from officeconvert_server.storage import S3Store, log_s3_error logger = logging.getLogger("uvicorn.error") _RESOLUTION_PRESET_BY_PROTO = { conversion_pb2.CONVERSION_RESOLUTION_SD: RESOLUTION_SD, conversion_pb2.CONVERSION_RESOLUTION_HD: RESOLUTION_HD, conversion_pb2.CONVERSION_RESOLUTION_FHD: RESOLUTION_FHD, conversion_pb2.CONVERSION_RESOLUTION_QHD: RESOLUTION_QHD, conversion_pb2.CONVERSION_RESOLUTION_UHD: RESOLUTION_UHD, } _DEFAULT_FULL_RESOLUTION = conversion_pb2.CONVERSION_RESOLUTION_FHD _DEFAULT_THUMBNAIL_RESOLUTION = conversion_pb2.CONVERSION_RESOLUTION_SD _DEFAULT_FULL_JPEG_QUALITY = 85 _DEFAULT_THUMBNAIL_JPEG_QUALITY = 75 _DEFAULT_NOTES_FORMAT = conversion_pb2.NOTES_FORMAT_PLAIN _DEFAULT_HTML_USE_PARAGRAPH_TAGS = True def _to_library_notes_options( notes: conversion_pb2.NotesOptions | None, ) -> NotesOptions | None: if notes is None: return None fmt = notes.format or _DEFAULT_NOTES_FORMAT library_format = NotesFormat.HTML if fmt == conversion_pb2.NOTES_FORMAT_HTML else NotesFormat.PLAIN html_use_paragraph_tags = _DEFAULT_HTML_USE_PARAGRAPH_TAGS if notes.HasField("html_use_paragraph_tags"): html_use_paragraph_tags = bool(notes.html_use_paragraph_tags) policy_proto = notes.html_policy policy = HtmlFormattingPolicy( ignore_bold=bool(policy_proto.ignore_bold), ignore_italic=bool(policy_proto.ignore_italic), ignore_underline=bool(policy_proto.ignore_underline), ignore_strikethrough=bool(policy_proto.ignore_strikethrough), ignore_font_size=bool(policy_proto.ignore_font_size), ignore_color=bool(policy_proto.ignore_color), ) return NotesOptions( format=library_format, html_use_paragraph_tags=html_use_paragraph_tags, html_policy=policy, ) class ConversionServiceImpl(conversion_connect.ConversionService): """Implements the conversion API with in-memory state and S3 orchestration.""" def __init__(self, config: ServerConfig, store: S3Store) -> None: """Initialize service with runtime config and storage adapter.""" self._config = config self._store = store self._sessions: dict[str, ConversionSession] = {} self._lock = asyncio.Lock() async def create_conversion( self, request: conversion_pb2.CreateConversionRequest, ctx: RequestContext, ) -> conversion_pb2.CreateConversionResponse: """Create a new conversion session and return upload credentials.""" del ctx source_filename = request.source_filename.strip() if not source_filename: raise ConnectError(Code.INVALID_ARGUMENT, "source_filename is required") if not source_filename.lower().endswith(".pptx"): raise ConnectError(Code.INVALID_ARGUMENT, "only .pptx input is supported") full_resolution, full_jpeg_quality = self._resolve_raster_options( request.full, default_resolution=_DEFAULT_FULL_RESOLUTION, default_jpeg_quality=_DEFAULT_FULL_JPEG_QUALITY, field_name="full", ) thumbnail_resolution, thumbnail_jpeg_quality = self._resolve_raster_options( request.thumbnail, default_resolution=_DEFAULT_THUMBNAIL_RESOLUTION, default_jpeg_quality=_DEFAULT_THUMBNAIL_JPEG_QUALITY, field_name="thumbnail", ) ksuid = Ksuid() conversion_id = str(ksuid) object_prefix = f"{conversion_id}/" upload_key = f"{object_prefix}input/source.pptx" expires_at = utc_now() + timedelta(seconds=self._config.s3_session_ttl_seconds) try: upload_url = self._store.presigned_put_url( self._config.s3_bucket, upload_key, ttl_seconds=self._config.s3_session_ttl_seconds, ) except S3Error as exc: log_s3_error( "presigned_put_url", endpoint=self._config.s3_public_endpoint, secure=self._config.s3_public_secure, exc=exc, ) raise session = ConversionSession( conversion_id=conversion_id, source_filename=source_filename, full_resolution=full_resolution, thumbnail_resolution=thumbnail_resolution, full_jpeg_quality=full_jpeg_quality, thumbnail_jpeg_quality=thumbnail_jpeg_quality, notes=request.notes if request.HasField("notes") else None, object_prefix=object_prefix, upload_object_key=upload_key, status=conversion_pb2.CONVERSION_STATUS_PENDING, ) async with self._lock: self._sessions[conversion_id] = session return conversion_pb2.CreateConversionResponse( conversion_id=conversion_id, upload_bucket=self._config.s3_bucket, upload_object_key=upload_key, upload_url=upload_url, expires_at=_to_timestamp(expires_at), ) async def start_conversion( self, request: conversion_pb2.StartConversionRequest, ctx: RequestContext, ) -> conversion_pb2.StartConversionResponse: """Start asynchronous conversion for an already-uploaded session payload.""" del ctx session = await self._get_session(request.conversion_id) async with self._lock: if session.status == conversion_pb2.CONVERSION_STATUS_RUNNING: return conversion_pb2.StartConversionResponse( conversion_id=session.conversion_id, status=session.status, ) if session.status in ( conversion_pb2.CONVERSION_STATUS_FAILED, conversion_pb2.CONVERSION_STATUS_SUCCEEDED, ): raise ConnectError( Code.FAILED_PRECONDITION, "conversion has already completed", ) session.status = conversion_pb2.CONVERSION_STATUS_RUNNING session.phase = conversion_pb2.CONVERSION_PHASE_INACTIVE session.current_progress = 0 session.max_progress = 0 session.error_message = "" session.updated_at = utc_now() session.conversion_task = asyncio.create_task(self._run_conversion(session)) return conversion_pb2.StartConversionResponse( conversion_id=session.conversion_id, status=session.status, ) async def get_conversion_status( self, request: conversion_pb2.GetConversionStatusRequest, ctx: RequestContext, ) -> conversion_pb2.GetConversionStatusResponse: """Return current conversion status and optional error details.""" del ctx session = await self._get_session(request.conversion_id) return conversion_pb2.GetConversionStatusResponse( conversion_id=session.conversion_id, status=session.status, error_message=session.error_message, updated_at=_to_timestamp(session.updated_at), phase=session.phase, current_progress=session.current_progress, max_progress=session.max_progress, ) async def get_slide_deck( self, request: conversion_pb2.GetSlideDeckRequest, ctx: RequestContext, ) -> conversion_pb2.GetSlideDeckResponse: """Return the finished slide deck once conversion succeeds.""" del ctx session = await self._get_session(request.conversion_id) if session.status == conversion_pb2.CONVERSION_STATUS_FAILED: raise ConnectError(Code.FAILED_PRECONDITION, session.error_message) if session.status != conversion_pb2.CONVERSION_STATUS_SUCCEEDED: raise ConnectError(Code.FAILED_PRECONDITION, "conversion is not finished yet") if session.slide_deck is None: raise ConnectError(Code.INTERNAL, "slide deck missing from successful session") return conversion_pb2.GetSlideDeckResponse(slide_deck=session.slide_deck) async def delete_conversion( self, request: conversion_pb2.DeleteConversionRequest, ctx: RequestContext, ) -> conversion_pb2.DeleteConversionResponse: """Delete a conversion session and associated object storage/local artifacts.""" del ctx async with self._lock: session = self._sessions.pop(request.conversion_id, None) if session is None: return conversion_pb2.DeleteConversionResponse( conversion_id=request.conversion_id, deleted=False, ) if session.cleanup_task is not None: session.cleanup_task.cancel() if session.conversion_task is not None and not session.conversion_task.done(): session.conversion_task.cancel() await self._cleanup_local_artifacts(session) await asyncio.to_thread( self._store.remove_prefix, self._config.s3_bucket, session.object_prefix, ) return conversion_pb2.DeleteConversionResponse( conversion_id=session.conversion_id, deleted=True, ) async def _run_conversion(self, session: ConversionSession) -> None: """Execute conversion flow and persist terminal state in memory.""" started_at = time.monotonic() logger.info( "Starting conversion conversion_id=%s source_filename=%s " "full[resolution=%s,jpeg_quality=%d] thumbnail[resolution=%s,jpeg_quality=%d] " "timeout_caps_s[pptx_to_pdf_total=%d,pdf_to_images_total=%d]", session.conversion_id, session.source_filename, conversion_pb2.ConversionResolution.Name(session.full_resolution), session.full_jpeg_quality, conversion_pb2.ConversionResolution.Name(session.thumbnail_resolution), session.thumbnail_jpeg_quality, self._config.conversion_pptx_to_pdf_timeout_seconds, self._config.conversion_pdf_to_images_timeout_seconds, ) work_dir = Path( tempfile.mkdtemp(prefix=f"officeconvert-{session.conversion_id}-") ).resolve() session.work_dir = work_dir source_path = work_dir / "input.pptx" try: await asyncio.to_thread( self._store.fget_object, self._config.s3_bucket, session.upload_object_key, source_path, ) result = await asyncio.to_thread( convert_pptx_to_slidedeck, source_path, work_dir, full_resolution=_RESOLUTION_PRESET_BY_PROTO[session.full_resolution], thumbnail_resolution=_RESOLUTION_PRESET_BY_PROTO[ session.thumbnail_resolution ], full_jpeg_quality=session.full_jpeg_quality, thumbnail_jpeg_quality=session.thumbnail_jpeg_quality, pptx_to_pdf_timeout_s=self._config.conversion_pptx_to_pdf_timeout_seconds, pdf_to_images_timeout_s=self._config.conversion_pdf_to_images_timeout_seconds, pptx_to_pdf_base_timeout_s=self._config.conversion_pptx_to_pdf_base_timeout_seconds, pptx_to_pdf_per_slide_timeout_s=self._config.conversion_pptx_to_pdf_per_slide_timeout_seconds, pdf_to_images_base_timeout_s=self._config.conversion_pdf_to_images_base_timeout_seconds, pdf_to_images_per_slide_timeout_s=self._config.conversion_pdf_to_images_per_slide_timeout_seconds, notes_options=_to_library_notes_options(session.notes), progress_callback=lambda phase_name, current, max_value: self._set_session_progress_from_name( session, phase_name=phase_name, current_progress=current, max_progress=max_value, ), ) logger.info( "Resolved conversion plan conversion_id=%s source_filename=%s " "full[resolution=%s,size=%dx%d,jpeg_quality=%d] " "thumbnail[resolution=%s,size=%dx%d,jpeg_quality=%d] " "inferred_dpi=%d " "computed_timeouts_s[pptx_to_pdf_total=%d,pdf_to_images_total=%d,pdf_to_images_per_page=%d]", session.conversion_id, session.source_filename, conversion_pb2.ConversionResolution.Name(session.full_resolution), result.width, result.height, session.full_jpeg_quality, conversion_pb2.ConversionResolution.Name(session.thumbnail_resolution), result.thumbnail_width, result.thumbnail_height, session.thumbnail_jpeg_quality, result.inferred_dpi, result.pptx_to_pdf_timeout_s, result.pdf_to_images_timeout_s, result.pdf_to_images_page_timeout_s, ) self._set_session_progress( session, phase=conversion_pb2.CONVERSION_PHASE_UPLOADING_RESULTS, current_progress=0, max_progress=len(result.slides) * 2, ) session.slide_deck = await asyncio.to_thread( self._upload_and_build_slide_deck, session, result.slides, result.source_filename, result.width, result.height, result.thumbnail_width, result.thumbnail_height, lambda current, max_value: self._set_session_progress( session, phase=conversion_pb2.CONVERSION_PHASE_UPLOADING_RESULTS, current_progress=current, max_progress=max_value, ), ) session.status = conversion_pb2.CONVERSION_STATUS_SUCCEEDED session.phase = conversion_pb2.CONVERSION_PHASE_INACTIVE session.updated_at = utc_now() elapsed_s = time.monotonic() - started_at logger.info( "Conversion succeeded conversion_id=%s source_filename=%s slides=%d elapsed_s=%.3f", session.conversion_id, session.source_filename, len(result.slides), elapsed_s, ) except asyncio.CancelledError: session.status = conversion_pb2.CONVERSION_STATUS_FAILED session.phase = conversion_pb2.CONVERSION_PHASE_INACTIVE session.error_message = "conversion cancelled" session.updated_at = utc_now() elapsed_s = time.monotonic() - started_at logger.warning( "Conversion cancelled conversion_id=%s source_filename=%s elapsed_s=%.3f", session.conversion_id, session.source_filename, elapsed_s, ) raise except ConversionTimeoutError as exc: session.status = conversion_pb2.CONVERSION_STATUS_FAILED session.phase = conversion_pb2.CONVERSION_PHASE_INACTIVE session.error_message = str(exc) session.updated_at = utc_now() elapsed_s = time.monotonic() - started_at logger.error( "Conversion timed out conversion_id=%s source_filename=%s elapsed_s=%.3f error=%s", session.conversion_id, session.source_filename, elapsed_s, exc, ) except Exception as exc: session.status = conversion_pb2.CONVERSION_STATUS_FAILED session.phase = conversion_pb2.CONVERSION_PHASE_INACTIVE session.error_message = str(exc) session.updated_at = utc_now() elapsed_s = time.monotonic() - started_at logger.exception( "Conversion failed conversion_id=%s source_filename=%s elapsed_s=%.3f", session.conversion_id, session.source_filename, elapsed_s, ) finally: await self._cleanup_local_artifacts(session) session.cleanup_task = asyncio.create_task(self._delayed_cleanup(session)) def _upload_and_build_slide_deck( self, session: ConversionSession, slides: list[SlideArtifact], source_filename: str, width: int, height: int, thumbnail_width: int, thumbnail_height: int, progress_callback: Callable[[int, int], None] | None = None, ) -> conversion_pb2.SlideDeck: """Upload generated slide images and construct API response payload.""" response_slides: list[conversion_pb2.Slide] = [] slide_total = len(slides) upload_total = slide_total * 2 upload_index = 0 for slide in slides: object_key = ( f"{session.object_prefix}output/slide-{slide.index:04d}{slide.image_path.suffix}" ) self._store.fput_object(self._config.s3_bucket, object_key, slide.image_path) image_url = self._store.presigned_get_url( self._config.s3_bucket, object_key, ttl_seconds=self._config.s3_session_ttl_seconds, ) upload_index += 1 if progress_callback is not None: progress_callback(upload_index, upload_total) thumbnail_object_key = ( f"{session.object_prefix}output/thumb/slide-{slide.index:04d}" f"{slide.thumbnail_path.suffix}" ) self._store.fput_object( self._config.s3_bucket, thumbnail_object_key, slide.thumbnail_path, ) thumbnail_image_url = self._store.presigned_get_url( self._config.s3_bucket, thumbnail_object_key, ttl_seconds=self._config.s3_session_ttl_seconds, ) upload_index += 1 response_slides.append( conversion_pb2.Slide( index=slide.index, notes_plain=slide.notes_plain, notes_html=slide.notes_html, image_url=image_url, thumbnail_image_url=thumbnail_image_url, ) ) if progress_callback is not None: progress_callback(upload_index, upload_total) return conversion_pb2.SlideDeck( conversion_id=session.conversion_id, source_filename=source_filename, slides=response_slides, created_at=_to_timestamp(utc_now()), width=width, height=height, thumbnail_width=thumbnail_width, thumbnail_height=thumbnail_height, ) def _resolve_raster_options( self, options: conversion_pb2.SlideRasterOptions, *, default_resolution: conversion_pb2.ConversionResolution, default_jpeg_quality: int, field_name: str, ) -> tuple[conversion_pb2.ConversionResolution, int]: """Resolve per-tier raster options with defaults and validation.""" resolution = options.resolution if resolution == conversion_pb2.CONVERSION_RESOLUTION_UNSPECIFIED: resolution = default_resolution if resolution not in _RESOLUTION_PRESET_BY_PROTO: raise ConnectError(Code.INVALID_ARGUMENT, f"{field_name}.resolution is invalid") jpeg_quality = default_jpeg_quality if options.HasField("jpeg"): quality = options.jpeg.quality if quality == 0: jpeg_quality = default_jpeg_quality elif 1 <= quality <= 100: jpeg_quality = quality else: raise ConnectError( Code.INVALID_ARGUMENT, f"{field_name}.jpeg.quality must be 0 or between 1 and 100", ) return resolution, jpeg_quality async def _delayed_cleanup(self, session: ConversionSession) -> None: """Delete storage resources after the configured session retention period.""" try: await asyncio.sleep(self._config.conversion_cleanup_delay_seconds) await asyncio.to_thread( self._store.remove_prefix, self._config.s3_bucket, session.object_prefix, ) except asyncio.CancelledError: return finally: async with self._lock: self._sessions.pop(session.conversion_id, None) async def _cleanup_local_artifacts(self, session: ConversionSession) -> None: """Delete temporary local files for a session if they still exist.""" if session.work_dir is not None and session.work_dir.exists(): await asyncio.to_thread(shutil.rmtree, session.work_dir, True) session.work_dir = None async def _get_session(self, conversion_id: str) -> ConversionSession: """Return an existing session or raise a NOT_FOUND error.""" async with self._lock: session = self._sessions.get(conversion_id) if session is None: raise ConnectError(Code.NOT_FOUND, "conversion_id not found") return session def _set_session_progress_from_name( self, session: ConversionSession, *, phase_name: str, current_progress: int, max_progress: int, ) -> None: """Map conversion-library phase names onto API enum phases.""" phase_map = { PHASE_EXTRACTING_NOTES: conversion_pb2.CONVERSION_PHASE_EXTRACTING_NOTES, PHASE_PPTX_TO_PDF: conversion_pb2.CONVERSION_PHASE_PPTX_TO_PDF, PHASE_PDF_TO_IMAGES: conversion_pb2.CONVERSION_PHASE_PDF_TO_IMAGES, } self._set_session_progress( session, phase=phase_map.get(phase_name, conversion_pb2.CONVERSION_PHASE_INACTIVE), current_progress=current_progress, max_progress=max_progress, ) def _set_session_progress( self, session: ConversionSession, *, phase: conversion_pb2.ConversionPhase, current_progress: int, max_progress: int, ) -> None: """Set normalized phase/progress counters and touch update timestamp.""" normalized_max = max(0, max_progress) normalized_current = max(0, current_progress) if normalized_max > 0: normalized_current = min(normalized_current, normalized_max) session.phase = phase session.current_progress = normalized_current session.max_progress = normalized_max session.updated_at = utc_now() def _to_timestamp(value: datetime) -> Timestamp: """Convert a timezone-aware datetime to protobuf Timestamp.""" normalized = value.astimezone(timezone.utc) proto = Timestamp() proto.FromDatetime(normalized) return proto