from grpclib.server import Stream
from viam.proto.common import (
DoCommandRequest,
DoCommandResponse,
GetGeometriesRequest,
GetGeometriesResponse,
GetStatusRequest,
GetStatusResponse,
)
from viam.proto.component.encoder import (
EncoderServiceBase,
GetPositionRequest,
GetPositionResponse,
GetPropertiesRequest,
GetPropertiesResponse,
ResetPositionRequest,
ResetPositionResponse,
)
from viam.resource.rpc_service_base import ResourceRPCServiceBase
from viam.utils import dict_to_struct, struct_to_dict
from .encoder import Encoder
[docs]
class EncoderRPCService(EncoderServiceBase, ResourceRPCServiceBase[Encoder]):
"""
gRPC Service for an Encoder
"""
RESOURCE_TYPE = Encoder
[docs]
async def ResetPosition(self, stream: Stream[ResetPositionRequest, ResetPositionResponse]) -> None:
request = await stream.recv_message()
assert request is not None
name = request.name
encoder = self.get_resource(name)
timeout = stream.deadline.time_remaining() if stream.deadline else None
await encoder.reset_position(extra=struct_to_dict(request.extra), timeout=timeout, metadata=stream.metadata)
await stream.send_message(ResetPositionResponse())
[docs]
async def GetPosition(self, stream: Stream[GetPositionRequest, GetPositionResponse]) -> None:
request = await stream.recv_message()
assert request is not None
name = request.name
encoder = self.get_resource(name)
timeout = stream.deadline.time_remaining() if stream.deadline else None
position, pos_type = await encoder.get_position(
position_type=request.position_type, extra=struct_to_dict(request.extra), timeout=timeout, metadata=stream.metadata
)
await stream.send_message(GetPositionResponse(value=position, position_type=pos_type))
[docs]
async def GetProperties(self, stream: Stream[GetPropertiesRequest, GetPropertiesResponse]) -> None:
request = await stream.recv_message()
assert request is not None
name = request.name
encoder = self.get_resource(name)
timeout = stream.deadline.time_remaining() if stream.deadline else None
properties = await encoder.get_properties(extra=struct_to_dict(request.extra), timeout=timeout, metadata=stream.metadata)
response = GetPropertiesResponse(**properties.__dict__)
await stream.send_message(response)
[docs]
async def DoCommand(self, stream: Stream[DoCommandRequest, DoCommandResponse]) -> None:
request = await stream.recv_message()
assert request is not None
encoder = self.get_resource(request.name)
timeout = stream.deadline.time_remaining() if stream.deadline else None
result = await encoder.do_command(command=struct_to_dict(request.command), timeout=timeout, metadata=stream.metadata)
response = DoCommandResponse(result=dict_to_struct(result))
await stream.send_message(response)
[docs]
async def GetStatus(self, stream: Stream[GetStatusRequest, GetStatusResponse]) -> None:
request = await stream.recv_message()
assert request is not None
encoder = self.get_resource(request.name)
timeout = stream.deadline.time_remaining() if stream.deadline else None
result = await encoder.get_status(timeout=timeout, metadata=stream.metadata)
response = GetStatusResponse(result=dict_to_struct(result))
await stream.send_message(response)
[docs]
async def GetGeometries(self, stream: Stream[GetGeometriesRequest, GetGeometriesResponse]) -> None:
request = await stream.recv_message()
assert request is not None
arm = self.get_resource(request.name)
timeout = stream.deadline.time_remaining() if stream.deadline else None
geometries = await arm.get_geometries(extra=struct_to_dict(request.extra), timeout=timeout)
response = GetGeometriesResponse(geometries=geometries)
await stream.send_message(response)