from grpclib.server import Stream
from viam.proto.common import DoCommandRequest, DoCommandResponse, GetGeometriesRequest, GetGeometriesResponse
from viam.proto.component.base import (
BaseServiceBase,
GetPropertiesRequest,
GetPropertiesResponse,
IsMovingRequest,
IsMovingResponse,
MoveStraightRequest,
MoveStraightResponse,
SetPowerRequest,
SetPowerResponse,
SetVelocityRequest,
SetVelocityResponse,
SpinRequest,
SpinResponse,
StopRequest,
StopResponse,
)
from viam.resource.rpc_service_base import ResourceRPCServiceBase
from viam.utils import dict_to_struct, struct_to_dict
from .base import Base
[docs]class BaseRPCService(BaseServiceBase, ResourceRPCServiceBase[Base]):
"""
gRPC service for a robotic Base
"""
RESOURCE_TYPE = Base
[docs] async def MoveStraight(self, stream: Stream[MoveStraightRequest, MoveStraightResponse]) -> None:
request = await stream.recv_message()
assert request is not None
name = request.name
base = self.get_resource(name)
timeout = stream.deadline.time_remaining() if stream.deadline else None
await base.move_straight(
distance=request.distance_mm,
velocity=request.mm_per_sec,
extra=struct_to_dict(request.extra),
timeout=timeout,
metadata=stream.metadata,
)
response = MoveStraightResponse()
await stream.send_message(response)
[docs] async def Spin(self, stream: Stream[SpinRequest, SpinResponse]) -> None:
request = await stream.recv_message()
assert request is not None
name = request.name
base = self.get_resource(name)
timeout = stream.deadline.time_remaining() if stream.deadline else None
await base.spin(
angle=request.angle_deg,
velocity=request.degs_per_sec,
extra=struct_to_dict(request.extra),
timeout=timeout,
metadata=stream.metadata,
)
response = SpinResponse()
await stream.send_message(response)
[docs] async def SetPower(self, stream: Stream[SetPowerRequest, SetPowerResponse]) -> None:
request = await stream.recv_message()
assert request is not None
name = request.name
base = self.get_resource(name)
timeout = stream.deadline.time_remaining() if stream.deadline else None
await base.set_power(
request.linear, request.angular, extra=struct_to_dict(request.extra), timeout=timeout, metadata=stream.metadata
)
response = SetPowerResponse()
await stream.send_message(response)
[docs] async def SetVelocity(self, stream: Stream[SetVelocityRequest, SetVelocityResponse]) -> None:
request = await stream.recv_message()
assert request is not None
name = request.name
base = self.get_resource(name)
timeout = stream.deadline.time_remaining() if stream.deadline else None
await base.set_velocity(
request.linear, request.angular, extra=struct_to_dict(request.extra), timeout=timeout, metadata=stream.metadata
)
await stream.send_message(SetVelocityResponse())
[docs] async def Stop(self, stream: Stream[StopRequest, StopResponse]) -> None:
request = await stream.recv_message()
assert request is not None
name = request.name
base = self.get_resource(name)
timeout = stream.deadline.time_remaining() if stream.deadline else None
await base.stop(extra=struct_to_dict(request.extra), timeout=timeout, metadata=stream.metadata)
response = StopResponse()
await stream.send_message(response)
[docs] async def IsMoving(self, stream: Stream[IsMovingRequest, IsMovingResponse]) -> None:
request = await stream.recv_message()
assert request is not None
name = request.name
base = self.get_resource(name)
is_moving = await base.is_moving()
response = IsMovingResponse(is_moving=is_moving)
await stream.send_message(response)
[docs] async def GetProperties(self, stream: Stream[GetPropertiesRequest, GetPropertiesResponse]) -> None:
request = await stream.recv_message()
assert request is not None
name = request.name
base = self.get_resource(name)
timeout = stream.deadline.time_remaining() if stream.deadline else None
properties = await base.get_properties(timeout=timeout, metadata=stream.metadata)
response = GetPropertiesResponse(
width_meters=properties.width_meters,
turning_radius_meters=properties.turning_radius_meters,
wheel_circumference_meters=properties.wheel_circumference_meters,
)
await stream.send_message(response)
[docs] async def DoCommand(self, stream: Stream[DoCommandRequest, DoCommandResponse]) -> None:
request = await stream.recv_message()
assert request is not None
base = self.get_resource(request.name)
timeout = stream.deadline.time_remaining() if stream.deadline else None
result = await base.do_command(command=struct_to_dict(request.command), timeout=timeout, metadata=stream.metadata)
response = DoCommandResponse(result=dict_to_struct(result))
await stream.send_message(response)
[docs] async def GetGeometries(self, stream: Stream[GetGeometriesRequest, GetGeometriesResponse]) -> None:
request = await stream.recv_message()
assert request is not None
base = self.get_resource(request.name)
timeout = stream.deadline.time_remaining() if stream.deadline else None
geometries = await base.get_geometries(extra=struct_to_dict(request.extra), timeout=timeout)
response = GetGeometriesResponse(geometries=geometries)
await stream.send_message(response)