from grpclib.server import Stream
from viam.proto.common import DoCommandRequest, DoCommandResponse
from viam.proto.service.motion import (
GetPlanRequest,
GetPlanResponse,
GetPoseRequest,
GetPoseResponse,
ListPlanStatusesRequest,
ListPlanStatusesResponse,
MoveOnGlobeRequest,
MoveOnGlobeResponse,
MoveOnMapRequest,
MoveOnMapResponse,
MoveRequest,
MoveResponse,
StopPlanRequest,
StopPlanResponse,
UnimplementedMotionServiceBase,
)
from viam.resource.rpc_service_base import ResourceRPCServiceBase
from viam.utils import dict_to_struct, struct_to_dict
from .motion import Motion
[docs]class MotionRPCService(UnimplementedMotionServiceBase, ResourceRPCServiceBase[Motion]):
RESOURCE_TYPE = Motion
[docs] async def Move(self, stream: Stream[MoveRequest, MoveResponse]) -> None:
request = await stream.recv_message()
assert request is not None
service = self.get_resource(request.name)
timeout = stream.deadline.time_remaining() if stream.deadline else None
result = await service.move(
request.component_name,
request.destination,
request.world_state,
request.constraints,
extra=struct_to_dict(request.extra),
timeout=timeout,
)
response = MoveResponse(success=result)
await stream.send_message(response)
[docs] async def MoveOnMap(self, stream: Stream[MoveOnMapRequest, MoveOnMapResponse]) -> None:
request = await stream.recv_message()
assert request is not None
service = self.get_resource(request.name)
timeout = stream.deadline.time_remaining() if stream.deadline else None
result = await service.move_on_map(
request.component_name,
request.destination,
request.slam_service_name,
request.motion_configuration,
request.obstacles,
extra=struct_to_dict(request.extra),
timeout=timeout,
)
response = MoveOnMapResponse(execution_id=result)
await stream.send_message(response)
[docs] async def MoveOnGlobe(self, stream: Stream[MoveOnGlobeRequest, MoveOnGlobeResponse]) -> None:
request = await stream.recv_message()
assert request is not None
service = self.get_resource(request.name)
timeout = stream.deadline.time_remaining() if stream.deadline else None
result = await service.move_on_globe(
request.component_name,
request.destination,
request.movement_sensor_name,
request.obstacles,
request.heading,
request.motion_configuration,
bounding_regions=request.bounding_regions,
extra=struct_to_dict(request.extra),
timeout=timeout,
)
response = MoveOnGlobeResponse(execution_id=result)
await stream.send_message(response)
[docs] async def GetPose(self, stream: Stream[GetPoseRequest, GetPoseResponse]) -> None:
request = await stream.recv_message()
assert request is not None
service = self.get_resource(request.name)
timeout = stream.deadline.time_remaining() if stream.deadline else None
result = await service.get_pose(
request.component_name,
request.destination_frame,
request.supplemental_transforms,
extra=struct_to_dict(request.extra),
timeout=timeout,
)
response = GetPoseResponse(pose=result)
await stream.send_message(response)
[docs] async def StopPlan(self, stream: Stream[StopPlanRequest, StopPlanResponse]) -> None:
request = await stream.recv_message()
assert request is not None
service = self.get_resource(request.name)
timeout = stream.deadline.time_remaining() if stream.deadline else None
await service.stop_plan(request.component_name, extra=struct_to_dict(request.extra), timeout=timeout)
response = StopPlanResponse()
await stream.send_message(response)
[docs] async def ListPlanStatuses(self, stream: Stream[ListPlanStatusesRequest, ListPlanStatusesResponse]) -> None:
request = await stream.recv_message()
assert request is not None
service = self.get_resource(request.name)
timeout = stream.deadline.time_remaining() if stream.deadline else None
result = await service.list_plan_statuses(request.only_active_plans, extra=struct_to_dict(request.extra), timeout=timeout)
response = ListPlanStatusesResponse(plan_statuses_with_ids=result)
await stream.send_message(response)
[docs] async def GetPlan(self, stream: Stream[GetPlanRequest, GetPlanResponse]) -> None:
request = await stream.recv_message()
assert request is not None
service = self.get_resource(request.name)
timeout = stream.deadline.time_remaining() if stream.deadline else None
result = await service.get_plan(
request.component_name, request.last_plan_only, request.execution_id, extra=struct_to_dict(request.extra), timeout=timeout
)
await stream.send_message(result)
[docs] async def DoCommand(self, stream: Stream[DoCommandRequest, DoCommandResponse]) -> None:
request = await stream.recv_message()
assert request is not None
service = self.get_resource(request.name)
timeout = stream.deadline.time_remaining() if stream.deadline else None
result = await service.do_command(struct_to_dict(request.command), timeout=timeout, metadata=stream.metadata)
response = DoCommandResponse(result=dict_to_struct(result))
await stream.send_message(response)