Files
end 06d4122e4e
Docker server image / build-and-push (push) Successful in 3m2s
add rich output support for slide notes
2026-05-07 10:35:37 -07:00

392 lines
20 KiB
Python

# -*- coding: utf-8 -*-
# Generated by https://github.com/connectrpc/connect-python. DO NOT EDIT!
# source: officeconvertapi/v1/conversion.proto
from collections.abc import AsyncGenerator, AsyncIterator, Iterable, Iterator, Mapping
from typing import Protocol
from connectrpc.client import ConnectClient, ConnectClientSync
from connectrpc.code import Code
from connectrpc.codec import Codec
from connectrpc.compression import Compression
from connectrpc.errors import ConnectError
from connectrpc.interceptor import Interceptor, InterceptorSync
from connectrpc.method import IdempotencyLevel, MethodInfo
from connectrpc.request import Headers, RequestContext
from connectrpc.server import ConnectASGIApplication, ConnectWSGIApplication, Endpoint, EndpointSync
import officeconvertapi.v1.conversion_pb2 as officeconvertapi_dot_v1_dot_conversion__pb2
class ConversionService(Protocol):
async def create_conversion(self, request: officeconvertapi_dot_v1_dot_conversion__pb2.CreateConversionRequest, ctx: RequestContext) -> officeconvertapi_dot_v1_dot_conversion__pb2.CreateConversionResponse:
raise ConnectError(Code.UNIMPLEMENTED, "Not implemented")
async def start_conversion(self, request: officeconvertapi_dot_v1_dot_conversion__pb2.StartConversionRequest, ctx: RequestContext) -> officeconvertapi_dot_v1_dot_conversion__pb2.StartConversionResponse:
raise ConnectError(Code.UNIMPLEMENTED, "Not implemented")
async def get_conversion_status(self, request: officeconvertapi_dot_v1_dot_conversion__pb2.GetConversionStatusRequest, ctx: RequestContext) -> officeconvertapi_dot_v1_dot_conversion__pb2.GetConversionStatusResponse:
raise ConnectError(Code.UNIMPLEMENTED, "Not implemented")
async def get_slide_deck(self, request: officeconvertapi_dot_v1_dot_conversion__pb2.GetSlideDeckRequest, ctx: RequestContext) -> officeconvertapi_dot_v1_dot_conversion__pb2.GetSlideDeckResponse:
raise ConnectError(Code.UNIMPLEMENTED, "Not implemented")
async def delete_conversion(self, request: officeconvertapi_dot_v1_dot_conversion__pb2.DeleteConversionRequest, ctx: RequestContext) -> officeconvertapi_dot_v1_dot_conversion__pb2.DeleteConversionResponse:
raise ConnectError(Code.UNIMPLEMENTED, "Not implemented")
class ConversionServiceASGIApplication(ConnectASGIApplication[ConversionService]):
def __init__(self, service: ConversionService | AsyncGenerator[ConversionService], *, interceptors: Iterable[Interceptor]=(), read_max_bytes: int | None = None, compressions: Iterable[Compression] | None = None, codecs: Iterable[Codec] | None = None) -> None:
super().__init__(
service=service,
endpoints=lambda svc: {
"/officeconvertapi.v1.ConversionService/CreateConversion": Endpoint.unary(
method=MethodInfo(
name="CreateConversion",
service_name="officeconvertapi.v1.ConversionService",
input=officeconvertapi_dot_v1_dot_conversion__pb2.CreateConversionRequest,
output=officeconvertapi_dot_v1_dot_conversion__pb2.CreateConversionResponse,
idempotency_level=IdempotencyLevel.UNKNOWN,
),
function=svc.create_conversion,
),
"/officeconvertapi.v1.ConversionService/StartConversion": Endpoint.unary(
method=MethodInfo(
name="StartConversion",
service_name="officeconvertapi.v1.ConversionService",
input=officeconvertapi_dot_v1_dot_conversion__pb2.StartConversionRequest,
output=officeconvertapi_dot_v1_dot_conversion__pb2.StartConversionResponse,
idempotency_level=IdempotencyLevel.UNKNOWN,
),
function=svc.start_conversion,
),
"/officeconvertapi.v1.ConversionService/GetConversionStatus": Endpoint.unary(
method=MethodInfo(
name="GetConversionStatus",
service_name="officeconvertapi.v1.ConversionService",
input=officeconvertapi_dot_v1_dot_conversion__pb2.GetConversionStatusRequest,
output=officeconvertapi_dot_v1_dot_conversion__pb2.GetConversionStatusResponse,
idempotency_level=IdempotencyLevel.UNKNOWN,
),
function=svc.get_conversion_status,
),
"/officeconvertapi.v1.ConversionService/GetSlideDeck": Endpoint.unary(
method=MethodInfo(
name="GetSlideDeck",
service_name="officeconvertapi.v1.ConversionService",
input=officeconvertapi_dot_v1_dot_conversion__pb2.GetSlideDeckRequest,
output=officeconvertapi_dot_v1_dot_conversion__pb2.GetSlideDeckResponse,
idempotency_level=IdempotencyLevel.UNKNOWN,
),
function=svc.get_slide_deck,
),
"/officeconvertapi.v1.ConversionService/DeleteConversion": Endpoint.unary(
method=MethodInfo(
name="DeleteConversion",
service_name="officeconvertapi.v1.ConversionService",
input=officeconvertapi_dot_v1_dot_conversion__pb2.DeleteConversionRequest,
output=officeconvertapi_dot_v1_dot_conversion__pb2.DeleteConversionResponse,
idempotency_level=IdempotencyLevel.UNKNOWN,
),
function=svc.delete_conversion,
),
},
interceptors=interceptors,
read_max_bytes=read_max_bytes,
compressions=compressions,
codecs=codecs,
)
@property
def path(self) -> str:
"""Returns the URL path to mount the application to when serving multiple applications."""
return "/officeconvertapi.v1.ConversionService"
class ConversionServiceClient(ConnectClient):
async def create_conversion(
self,
request: officeconvertapi_dot_v1_dot_conversion__pb2.CreateConversionRequest,
*,
headers: Headers | Mapping[str, str] | None = None,
timeout_ms: int | None = None,
) -> officeconvertapi_dot_v1_dot_conversion__pb2.CreateConversionResponse:
return await self.execute_unary(
request=request,
method=MethodInfo(
name="CreateConversion",
service_name="officeconvertapi.v1.ConversionService",
input=officeconvertapi_dot_v1_dot_conversion__pb2.CreateConversionRequest,
output=officeconvertapi_dot_v1_dot_conversion__pb2.CreateConversionResponse,
idempotency_level=IdempotencyLevel.UNKNOWN,
),
headers=headers,
timeout_ms=timeout_ms,
)
async def start_conversion(
self,
request: officeconvertapi_dot_v1_dot_conversion__pb2.StartConversionRequest,
*,
headers: Headers | Mapping[str, str] | None = None,
timeout_ms: int | None = None,
) -> officeconvertapi_dot_v1_dot_conversion__pb2.StartConversionResponse:
return await self.execute_unary(
request=request,
method=MethodInfo(
name="StartConversion",
service_name="officeconvertapi.v1.ConversionService",
input=officeconvertapi_dot_v1_dot_conversion__pb2.StartConversionRequest,
output=officeconvertapi_dot_v1_dot_conversion__pb2.StartConversionResponse,
idempotency_level=IdempotencyLevel.UNKNOWN,
),
headers=headers,
timeout_ms=timeout_ms,
)
async def get_conversion_status(
self,
request: officeconvertapi_dot_v1_dot_conversion__pb2.GetConversionStatusRequest,
*,
headers: Headers | Mapping[str, str] | None = None,
timeout_ms: int | None = None,
) -> officeconvertapi_dot_v1_dot_conversion__pb2.GetConversionStatusResponse:
return await self.execute_unary(
request=request,
method=MethodInfo(
name="GetConversionStatus",
service_name="officeconvertapi.v1.ConversionService",
input=officeconvertapi_dot_v1_dot_conversion__pb2.GetConversionStatusRequest,
output=officeconvertapi_dot_v1_dot_conversion__pb2.GetConversionStatusResponse,
idempotency_level=IdempotencyLevel.UNKNOWN,
),
headers=headers,
timeout_ms=timeout_ms,
)
async def get_slide_deck(
self,
request: officeconvertapi_dot_v1_dot_conversion__pb2.GetSlideDeckRequest,
*,
headers: Headers | Mapping[str, str] | None = None,
timeout_ms: int | None = None,
) -> officeconvertapi_dot_v1_dot_conversion__pb2.GetSlideDeckResponse:
return await self.execute_unary(
request=request,
method=MethodInfo(
name="GetSlideDeck",
service_name="officeconvertapi.v1.ConversionService",
input=officeconvertapi_dot_v1_dot_conversion__pb2.GetSlideDeckRequest,
output=officeconvertapi_dot_v1_dot_conversion__pb2.GetSlideDeckResponse,
idempotency_level=IdempotencyLevel.UNKNOWN,
),
headers=headers,
timeout_ms=timeout_ms,
)
async def delete_conversion(
self,
request: officeconvertapi_dot_v1_dot_conversion__pb2.DeleteConversionRequest,
*,
headers: Headers | Mapping[str, str] | None = None,
timeout_ms: int | None = None,
) -> officeconvertapi_dot_v1_dot_conversion__pb2.DeleteConversionResponse:
return await self.execute_unary(
request=request,
method=MethodInfo(
name="DeleteConversion",
service_name="officeconvertapi.v1.ConversionService",
input=officeconvertapi_dot_v1_dot_conversion__pb2.DeleteConversionRequest,
output=officeconvertapi_dot_v1_dot_conversion__pb2.DeleteConversionResponse,
idempotency_level=IdempotencyLevel.UNKNOWN,
),
headers=headers,
timeout_ms=timeout_ms,
)
class ConversionServiceSync(Protocol):
def create_conversion(self, request: officeconvertapi_dot_v1_dot_conversion__pb2.CreateConversionRequest, ctx: RequestContext) -> officeconvertapi_dot_v1_dot_conversion__pb2.CreateConversionResponse:
raise ConnectError(Code.UNIMPLEMENTED, "Not implemented")
def start_conversion(self, request: officeconvertapi_dot_v1_dot_conversion__pb2.StartConversionRequest, ctx: RequestContext) -> officeconvertapi_dot_v1_dot_conversion__pb2.StartConversionResponse:
raise ConnectError(Code.UNIMPLEMENTED, "Not implemented")
def get_conversion_status(self, request: officeconvertapi_dot_v1_dot_conversion__pb2.GetConversionStatusRequest, ctx: RequestContext) -> officeconvertapi_dot_v1_dot_conversion__pb2.GetConversionStatusResponse:
raise ConnectError(Code.UNIMPLEMENTED, "Not implemented")
def get_slide_deck(self, request: officeconvertapi_dot_v1_dot_conversion__pb2.GetSlideDeckRequest, ctx: RequestContext) -> officeconvertapi_dot_v1_dot_conversion__pb2.GetSlideDeckResponse:
raise ConnectError(Code.UNIMPLEMENTED, "Not implemented")
def delete_conversion(self, request: officeconvertapi_dot_v1_dot_conversion__pb2.DeleteConversionRequest, ctx: RequestContext) -> officeconvertapi_dot_v1_dot_conversion__pb2.DeleteConversionResponse:
raise ConnectError(Code.UNIMPLEMENTED, "Not implemented")
class ConversionServiceWSGIApplication(ConnectWSGIApplication):
def __init__(self, service: ConversionServiceSync, interceptors: Iterable[InterceptorSync]=(), read_max_bytes: int | None = None, compressions: Iterable[Compression] | None = None, codecs: Iterable[Codec] | None = None) -> None:
super().__init__(
endpoints={
"/officeconvertapi.v1.ConversionService/CreateConversion": EndpointSync.unary(
method=MethodInfo(
name="CreateConversion",
service_name="officeconvertapi.v1.ConversionService",
input=officeconvertapi_dot_v1_dot_conversion__pb2.CreateConversionRequest,
output=officeconvertapi_dot_v1_dot_conversion__pb2.CreateConversionResponse,
idempotency_level=IdempotencyLevel.UNKNOWN,
),
function=service.create_conversion,
),
"/officeconvertapi.v1.ConversionService/StartConversion": EndpointSync.unary(
method=MethodInfo(
name="StartConversion",
service_name="officeconvertapi.v1.ConversionService",
input=officeconvertapi_dot_v1_dot_conversion__pb2.StartConversionRequest,
output=officeconvertapi_dot_v1_dot_conversion__pb2.StartConversionResponse,
idempotency_level=IdempotencyLevel.UNKNOWN,
),
function=service.start_conversion,
),
"/officeconvertapi.v1.ConversionService/GetConversionStatus": EndpointSync.unary(
method=MethodInfo(
name="GetConversionStatus",
service_name="officeconvertapi.v1.ConversionService",
input=officeconvertapi_dot_v1_dot_conversion__pb2.GetConversionStatusRequest,
output=officeconvertapi_dot_v1_dot_conversion__pb2.GetConversionStatusResponse,
idempotency_level=IdempotencyLevel.UNKNOWN,
),
function=service.get_conversion_status,
),
"/officeconvertapi.v1.ConversionService/GetSlideDeck": EndpointSync.unary(
method=MethodInfo(
name="GetSlideDeck",
service_name="officeconvertapi.v1.ConversionService",
input=officeconvertapi_dot_v1_dot_conversion__pb2.GetSlideDeckRequest,
output=officeconvertapi_dot_v1_dot_conversion__pb2.GetSlideDeckResponse,
idempotency_level=IdempotencyLevel.UNKNOWN,
),
function=service.get_slide_deck,
),
"/officeconvertapi.v1.ConversionService/DeleteConversion": EndpointSync.unary(
method=MethodInfo(
name="DeleteConversion",
service_name="officeconvertapi.v1.ConversionService",
input=officeconvertapi_dot_v1_dot_conversion__pb2.DeleteConversionRequest,
output=officeconvertapi_dot_v1_dot_conversion__pb2.DeleteConversionResponse,
idempotency_level=IdempotencyLevel.UNKNOWN,
),
function=service.delete_conversion,
),
},
interceptors=interceptors,
read_max_bytes=read_max_bytes,
compressions=compressions,
codecs=codecs,
)
@property
def path(self) -> str:
"""Returns the URL path to mount the application to when serving multiple applications."""
return "/officeconvertapi.v1.ConversionService"
class ConversionServiceClientSync(ConnectClientSync):
def create_conversion(
self,
request: officeconvertapi_dot_v1_dot_conversion__pb2.CreateConversionRequest,
*,
headers: Headers | Mapping[str, str] | None = None,
timeout_ms: int | None = None,
) -> officeconvertapi_dot_v1_dot_conversion__pb2.CreateConversionResponse:
return self.execute_unary(
request=request,
method=MethodInfo(
name="CreateConversion",
service_name="officeconvertapi.v1.ConversionService",
input=officeconvertapi_dot_v1_dot_conversion__pb2.CreateConversionRequest,
output=officeconvertapi_dot_v1_dot_conversion__pb2.CreateConversionResponse,
idempotency_level=IdempotencyLevel.UNKNOWN,
),
headers=headers,
timeout_ms=timeout_ms,
)
def start_conversion(
self,
request: officeconvertapi_dot_v1_dot_conversion__pb2.StartConversionRequest,
*,
headers: Headers | Mapping[str, str] | None = None,
timeout_ms: int | None = None,
) -> officeconvertapi_dot_v1_dot_conversion__pb2.StartConversionResponse:
return self.execute_unary(
request=request,
method=MethodInfo(
name="StartConversion",
service_name="officeconvertapi.v1.ConversionService",
input=officeconvertapi_dot_v1_dot_conversion__pb2.StartConversionRequest,
output=officeconvertapi_dot_v1_dot_conversion__pb2.StartConversionResponse,
idempotency_level=IdempotencyLevel.UNKNOWN,
),
headers=headers,
timeout_ms=timeout_ms,
)
def get_conversion_status(
self,
request: officeconvertapi_dot_v1_dot_conversion__pb2.GetConversionStatusRequest,
*,
headers: Headers | Mapping[str, str] | None = None,
timeout_ms: int | None = None,
) -> officeconvertapi_dot_v1_dot_conversion__pb2.GetConversionStatusResponse:
return self.execute_unary(
request=request,
method=MethodInfo(
name="GetConversionStatus",
service_name="officeconvertapi.v1.ConversionService",
input=officeconvertapi_dot_v1_dot_conversion__pb2.GetConversionStatusRequest,
output=officeconvertapi_dot_v1_dot_conversion__pb2.GetConversionStatusResponse,
idempotency_level=IdempotencyLevel.UNKNOWN,
),
headers=headers,
timeout_ms=timeout_ms,
)
def get_slide_deck(
self,
request: officeconvertapi_dot_v1_dot_conversion__pb2.GetSlideDeckRequest,
*,
headers: Headers | Mapping[str, str] | None = None,
timeout_ms: int | None = None,
) -> officeconvertapi_dot_v1_dot_conversion__pb2.GetSlideDeckResponse:
return self.execute_unary(
request=request,
method=MethodInfo(
name="GetSlideDeck",
service_name="officeconvertapi.v1.ConversionService",
input=officeconvertapi_dot_v1_dot_conversion__pb2.GetSlideDeckRequest,
output=officeconvertapi_dot_v1_dot_conversion__pb2.GetSlideDeckResponse,
idempotency_level=IdempotencyLevel.UNKNOWN,
),
headers=headers,
timeout_ms=timeout_ms,
)
def delete_conversion(
self,
request: officeconvertapi_dot_v1_dot_conversion__pb2.DeleteConversionRequest,
*,
headers: Headers | Mapping[str, str] | None = None,
timeout_ms: int | None = None,
) -> officeconvertapi_dot_v1_dot_conversion__pb2.DeleteConversionResponse:
return self.execute_unary(
request=request,
method=MethodInfo(
name="DeleteConversion",
service_name="officeconvertapi.v1.ConversionService",
input=officeconvertapi_dot_v1_dot_conversion__pb2.DeleteConversionRequest,
output=officeconvertapi_dot_v1_dot_conversion__pb2.DeleteConversionResponse,
idempotency_level=IdempotencyLevel.UNKNOWN,
),
headers=headers,
timeout_ms=timeout_ms,
)