from typing import Any, Final, Iterable, List, Mapping, Optional, Sequence
from grpclib.client import Channel
from viam.proto.common import (
DoCommandRequest,
DoCommandResponse,
Geometry,
GeoObstacle,
GeoPoint,
Pose,
PoseInFrame,
ResourceName,
Transform,
WorldState,
)
from viam.proto.service.motion import (
Constraints,
GetPlanRequest,
GetPlanResponse,
GetPoseRequest,
GetPoseResponse,
ListPlanStatusesRequest,
ListPlanStatusesResponse,
MotionConfiguration,
MotionServiceStub,
MoveOnGlobeRequest,
MoveOnGlobeResponse,
MoveOnMapRequest,
MoveOnMapResponse,
MoveRequest,
MoveResponse,
StopPlanRequest,
StopPlanResponse,
)
from viam.resource.rpc_client_base import ReconfigurableResourceRPCClientBase
from viam.resource.types import RESOURCE_NAMESPACE_RDK, RESOURCE_TYPE_SERVICE, Subtype
from viam.services.service_client_base import ServiceClientBase
from viam.utils import ValueTypes, dict_to_struct, struct_to_dict
[docs]class MotionClient(ServiceClientBase, ReconfigurableResourceRPCClientBase):
"""Motion is a Viam service that coordinates motion planning across all of the components in a given robot.
The motion planning service calculates a valid path that avoids self collision by default. If additional constraints are supplied in the
``world_state`` message, the motion planning service will also account for those.
"""
SUBTYPE: Final = Subtype( # pyright: ignore [reportIncompatibleVariableOverride]
RESOURCE_NAMESPACE_RDK, RESOURCE_TYPE_SERVICE, "motion"
)
client: MotionServiceStub
def __init__(self, name: str, channel: Channel):
super().__init__(name, channel)
self.client = MotionServiceStub(channel)
[docs] async def move(
self,
component_name: ResourceName,
destination: PoseInFrame,
world_state: Optional[WorldState] = None,
constraints: Optional[Constraints] = None,
*,
extra: Optional[Mapping[str, Any]] = None,
timeout: Optional[float] = None,
) -> bool:
"""Plan and execute a movement to move the component specified to its goal destination.
Note: Frames designated with respect to components can also be used as the ``component_name`` when calling for a move. This
technique allows for planning and moving the frame itself to the ``destination``. To do so, simply create a resource name with
originating ReferenceFrame's name. Then pass in the resource name into ``component_name``. Ex::
resource_name = Arm.get_resource_name("externalFrame")
success = await MotionServiceClient.move(resource_name, ...)
::
motion = MotionClient.from_robot(robot=robot, name="builtin")
# Assumes a gripper configured with name "my_gripper" on the machine
gripper_name = Gripper.get_resource_name("my_gripper")
my_frame = "my_gripper_offset"
goal_pose = Pose(x=0, y=0, z=300, o_x=0, o_y=0, o_z=1, theta=0)
# Move the gripper
moved = await motion.move(component_name=gripper_name,
destination=PoseInFrame(reference_frame="myFrame",
pose=goal_pose),
world_state=worldState,
constraints={},
extra={})
Args:
component_name (viam.proto.common.ResourceName): Name of a component on a given robot.
destination (viam.proto.common.PoseInFrame): The destination to move to, expressed as a ``Pose`` and the frame in which it was
observed.
world_state (viam.proto.common.WorldState): When supplied, the motion service will create a plan that obeys any constraints
expressed in the WorldState message.
constraints (viam.proto.service.motion.Constraints): When supplied, the motion service will create a plan that obeys any
specified constraints
Returns:
bool: Whether the move was successful
"""
if extra is None:
extra = {}
request = MoveRequest(
name=self.name,
destination=destination,
component_name=component_name,
world_state=world_state,
constraints=constraints,
extra=dict_to_struct(extra),
)
response: MoveResponse = await self.client.Move(request, timeout=timeout)
return response.success
[docs] async def move_on_globe(
self,
component_name: ResourceName,
destination: GeoPoint,
movement_sensor_name: ResourceName,
obstacles: Optional[Sequence[GeoObstacle]] = None,
heading: Optional[float] = None,
configuration: Optional[MotionConfiguration] = None,
*,
extra: Optional[Mapping[str, ValueTypes]] = None,
timeout: Optional[float] = None,
) -> str:
"""Move a component to a specific latitude and longitude, using a ``MovementSensor`` to check the location.
``move_on_globe()`` is non blocking, meaning the motion service will move the component to the destination
GPS point after ``move_on_globe()`` returns.
Each successful ``move_on_globe()`` call returns a unique ExecutionID which you can use to identify all plans
generated during the ``move_on_globe()`` call.
You can monitor the progress of the ``move_on_globe()`` call by querying ``get_plan()`` and ``list_plan_statuses()``.
::
motion = MotionClient.from_robot(robot=robot, name="builtin")
# Get the ResourceNames of the base and movement sensor
my_base_resource_name = Base.get_resource_name("my_base")
mvmnt_sensor_resource_name = MovementSensor.get_resource_name(
"my_movement_sensor")
# Define a destination GeoPoint at the GPS coordinates [0, 0]
my_destination = movement_sensor.GeoPoint(latitude=0, longitude=0)
# Move the base component to the designated geographic location, as reported by the movement sensor
execution_id = await motion.move_on_globe(
component_name=my_base_resource_name,
destination=my_destination,
movement_sensor_name=mvmnt_sensor_resource_name)
Args:
component_name (ResourceName): The ResourceName of the base to move.
destination (GeoPoint): The location of the component’s destination, represented in geographic notation as a
GeoPoint (lat, lng).
movement_sensor_name (ResourceName): The ResourceName of the movement sensor that you want to use to check
the machine’s location.
obstacles (Optional[Sequence[GeoObstacle]]): Obstacles to consider when planning the motion of the component,
with each represented as a GeoObstacle. Default: None
heading (Optional[float]): The compass heading, in degrees, that the machine’s movement sensor should report
at the destination point. Range: [0-360) 0: North, 90: East, 180: South, 270: West. Default: None
configuration (Optional[MotionConfiguration]): The configuration you want to set across this machine for this
motion service. This parameter and each of its fields are optional.
- obstacle_detectors (Iterable[ObstacleDetector]): The names of each vision service and camera resource pair
you want to use for transient obstacle avoidance.
- position_polling_frequency_hz (float): The frequency in hz to poll the position of the machine.
- obstacle_polling_frequency_hz (float): The frequency in hz to poll the vision service for new obstacles.
- plan_deviation_m (float): The distance in meters that the machine can deviate from the motion plan.
- linear_m_per_sec (float): Linear velocity this machine should target when moving.
- angular_degs_per_sec (float): Angular velocity this machine should target when turning.
extra (Optional[Dict[str, Any]]): Extra options to pass to the underlying RPC call.
timeout (Optional[float]): An option to set how long to wait (in seconds) before calling a time-out and closing
the underlying RPC call.
Returns:
str: ExecutionID of the ``move_on_globe()`` call, which can be used to track execution progress.
"""
if extra is None:
extra = {}
request = MoveOnGlobeRequest(
name=self.name,
component_name=component_name,
destination=destination,
movement_sensor_name=movement_sensor_name,
obstacles=obstacles,
heading=heading,
motion_configuration=configuration,
extra=dict_to_struct(extra),
)
response: MoveOnGlobeResponse = await self.client.MoveOnGlobe(request, timeout=timeout)
return response.execution_id
[docs] async def move_on_map(
self,
component_name: ResourceName,
destination: Pose,
slam_service_name: ResourceName,
configuration: Optional[MotionConfiguration] = None,
obstacles: Optional[Iterable[Geometry]] = None,
*,
extra: Optional[Mapping[str, ValueTypes]] = None,
timeout: Optional[float] = None,
) -> str:
"""
Move a component to a specific pose, using a ``SlamService`` for the SLAM map, using a ``SLAM Service`` to check the location.
``move_on_map()`` is non blocking, meaning the motion service will move the component to the destination
Pose point after ``move_on_map()`` returns.
Each successful ``move_on_map()`` call returns a unique ExecutionID which you can use to identify all plans
generated during the ``move_on_map()`` call.
You can monitor the progress of the ``move_on_map()`` call by querying ``get_plan()`` and ``list_plan_statuses()``.
::
motion = MotionClient.from_robot(robot=robot, name="builtin")
# Get the ResourceNames of the base component and SLAM service
my_base_resource_name = Base.get_resource_name("my_base")
my_slam_service_name = SLAMClient.get_resource_name("my_slam_service")
# Define a destination pose with respect to the origin of the map from the SLAM service "my_slam_service"
my_pose = Pose(y=10)
# Move the base component to the destination pose of Y=10, a location of
# (0, 10, 0) in respect to the origin of the map
execution_id = await motion.move_on_map(component_name=my_base_resource_name,
destination=my_pose,
slam_service_name=my_slam_service_name)
Args:
component_name (ResourceName): The ResourceName of the base to move.
destination (Pose): The destination, which can be any Pose with respect to the SLAM map’s origin.
slam_service_name (ResourceName): The ResourceName of the SLAM service from which the SLAM map is requested.
configuration (Optional[MotionConfiguration]): The configuration you want to set across this machine for this motion service.
This parameter and each of its fields are optional.
- obstacle_detectors (Iterable[ObstacleDetector]): The names of each vision service and camera resource pair you want to use
for transient obstacle avoidance.
- position_polling_frequency_hz (float): The frequency in hz to poll the position of the machine.
- obstacle_polling_frequency_hz (float): The frequency in hz to poll the vision service for new obstacles.
- plan_deviation_m (float): The distance in meters that the machine can deviate from the motion plan.
- linear_m_per_sec (float): Linear velocity this machine should target when moving.
- angular_degs_per_sec (float): Angular velocity this machine should target when turning.
obstacles (Optional[Iterable[Geometry]]): Obstacles to be considered for motion planning.
extra (Optional[Dict[str, Any]]): Extra options to pass to the underlying RPC call.
timeout (Optional[float]): An option to set how long to wait (in seconds) before calling a time-out and closing the underlying
RPC call.
Returns:
str: ExecutionID of the ``move_on_map()`` call, which can be used to track execution progress.
"""
if extra is None:
extra = {}
request = MoveOnMapRequest(
name=self.name,
destination=destination,
component_name=component_name,
slam_service_name=slam_service_name,
motion_configuration=configuration,
obstacles=obstacles,
extra=dict_to_struct(extra),
)
response: MoveOnMapResponse = await self.client.MoveOnMap(request, timeout=timeout)
return response.execution_id
[docs] async def stop_plan(
self,
component_name: ResourceName,
*,
extra: Optional[Mapping[str, ValueTypes]] = None,
timeout: Optional[float] = None,
):
"""Stop a component being moved by an in progress ``move_on_globe()`` or ``move_on_map()`` call.
::
# Assuming a `move_on_globe()` started the execution
# Stop the base component which was instructed to move by `move_on_globe()`
# or `move_on_map()`
my_base_resource_name = Base.get_resource_name("my_base")
await motion.stop_plan(component_name=mvmnt_sensor)
Args:
component_name (ResourceName): The component to stop
"""
if extra is None:
extra = {}
request = StopPlanRequest(
name=self.name,
component_name=component_name,
extra=dict_to_struct(extra),
)
_: StopPlanResponse = await self.client.StopPlan(request, timeout=timeout)
return
[docs] async def get_plan(
self,
component_name: ResourceName,
last_plan_only: bool = False,
execution_id: Optional[str] = None,
*,
extra: Optional[Mapping[str, ValueTypes]] = None,
timeout: Optional[float] = None,
) -> GetPlanResponse:
"""By default: returns the plan history of the most recent ``move_on_globe()`` or ``move_on_map()`` call to move a component.
The plan history for executions before the most recent can be requested by providing an ExecutionID in the request.
Returns a result if both of the following conditions are met:
- the execution (call to ``move_on_globe()`` or ``move_on_map()``) is still executing **or** changed state within the last 24 hours
- the robot has not reinitialized
Plans never change.
Replans always create new plans.
Replans share the ExecutionID of the previously executing plan.
All repeated fields are in time ascending order.
::
motion = MotionClient.from_robot(robot=robot, name="builtin")
my_base_resource_name = Base.get_resource_name("my_base")
# Get the plan(s) of the base component which was instructed to move by `MoveOnGlobe()` or `MoveOnMap()`
resp = await motion.get_plan(component_name=my_base_resource_name)
Args:
component_name (ResourceName): The component to stop
last_plan_only (Optional[bool]): If supplied, the response will only return the last plan for the component / execution
execution_id (Optional[str]): If supplied, the response will only return plans with the provided execution_id
Returns:
``GetPlanResponse`` (GetPlanResponse): The current PlanWithStatus & replan history which matches the request
"""
if extra is None:
extra = {}
request = GetPlanRequest(
name=self.name,
component_name=component_name,
last_plan_only=last_plan_only,
execution_id=execution_id,
extra=dict_to_struct(extra),
)
response: GetPlanResponse = await self.client.GetPlan(request, timeout=timeout)
return response
[docs] async def list_plan_statuses(
self,
only_active_plans: bool = False,
*,
extra: Optional[Mapping[str, ValueTypes]] = None,
timeout: Optional[float] = None,
) -> ListPlanStatusesResponse:
"""Returns the statuses of plans created by `move_on_globe()` or ``move_on_map()`` calls that meet at least one of the following
conditions since the motion service initialized:
- the plan's status is in progress
- the plan's status changed state within the last 24 hours
All repeated fields are in chronological order.
::
motion = MotionClient.from_robot(robot=robot, name="builtin")
# List the plan statuses of the motion service within the TTL
resp = await motion.list_plan_statuses()
Args:
only_active_plans (Optional[bool]): If supplied, the response will filter out any plans that are not executing
Returns:
``ListPlanStatusesResponse`` (ListPlanStatusesResponse): List of last known statuses with the
associated IDs of all plans within the TTL ordered by timestamp in ascending order
"""
if extra is None:
extra = {}
request = ListPlanStatusesRequest(
name=self.name,
only_active_plans=only_active_plans,
extra=dict_to_struct(extra),
)
response: ListPlanStatusesResponse = await self.client.ListPlanStatuses(request, timeout=timeout)
return response
[docs] async def get_pose(
self,
component_name: ResourceName,
destination_frame: str,
supplemental_transforms: Optional[List[Transform]] = None,
*,
extra: Optional[Mapping[str, Any]] = None,
timeout: Optional[float] = None,
) -> PoseInFrame:
"""
Get the Pose and observer frame for any given component on a robot. A ``component_name`` can be created like this::
component_name = Arm.get_resource_name("arm")
Note that the example uses the ``Arm`` class, but any component class that inherits from ``ComponentBase`` will work
(``Base``, ``Gripper``, etc).
::
from viam.components.gripper import Gripper
from viam.services.motion import MotionClient
# Assume that the connect function is written and will return a valid machine.
robot = await connect()
motion = MotionClient.from_robot(robot=robot, name="builtin")
gripperName = Gripper.get_resource_name("my_gripper")
gripperPoseInWorld = await motion.get_pose(component_name=gripperName,
destination_frame="world")
Args:
component_name (viam.proto.common.ResourceName): Name of a component on a robot.
destination_frame (str): Name of the desired reference frame.
supplemental_transforms (Optional[List[viam.proto.common.Transform]]): Transforms used to augment the robot's frame while
calculating pose.
Returns:
``Pose`` (PoseInFrame): Pose of the given component and the frame in which it was observed.
"""
if extra is None:
extra = {}
request = GetPoseRequest(
name=self.name,
component_name=component_name,
destination_frame=destination_frame,
supplemental_transforms=supplemental_transforms,
extra=dict_to_struct(extra),
)
response: GetPoseResponse = await self.client.GetPose(request, timeout=timeout)
return response.pose
[docs] async def do_command(self, command: Mapping[str, ValueTypes], *, timeout: Optional[float] = None, **__) -> Mapping[str, ValueTypes]:
"""Send/receive arbitrary commands
::
# Access the motion service
motion = MotionClient.from_robot(robot=robot, name="builtin")
my_command = {
"command": "dosomething",
"someparameter": 52
}
await motion.do_command(my_command)
Args:
command (Dict[str, ValueTypes]): The command to execute
Returns:
Dict[str, ValueTypes]: Result of the executed command
"""
request = DoCommandRequest(name=self.name, command=dict_to_struct(command))
response: DoCommandResponse = await self.client.DoCommand(request, timeout=timeout)
return struct_to_dict(response.result)