from typing import TYPE_CHECKING
from grpclib.server import Stream
from viam.proto.module import (
AddResourceRequest,
AddResourceResponse,
ModuleServiceBase,
ReadyRequest,
ReadyResponse,
ReconfigureResourceRequest,
ReconfigureResourceResponse,
RemoveResourceRequest,
RemoveResourceResponse,
ValidateConfigRequest,
ValidateConfigResponse,
)
if TYPE_CHECKING:
from .module import Module
[docs]class ModuleRPCService(ModuleServiceBase):
_module: "Module"
def __init__(self, module: "Module") -> None:
self._module = module
[docs] async def AddResource(self, stream: Stream[AddResourceRequest, AddResourceResponse]) -> None:
request = await stream.recv_message()
assert request is not None
await self._module.add_resource(request)
await stream.send_message(AddResourceResponse())
[docs] async def RemoveResource(self, stream: Stream[RemoveResourceRequest, RemoveResourceResponse]) -> None:
request = await stream.recv_message()
assert request is not None
await self._module.remove_resource(request)
await stream.send_message(RemoveResourceResponse())
[docs] async def Ready(self, stream: Stream[ReadyRequest, ReadyResponse]) -> None:
request = await stream.recv_message()
assert request is not None
response = await self._module.ready(request)
await stream.send_message(response)
[docs] async def ValidateConfig(self, stream: Stream[ValidateConfigRequest, ValidateConfigResponse]) -> None:
request = await stream.recv_message()
assert request is not None
response = await self._module.validate_config(request)
if response is not None:
await stream.send_message(response)