viam.services.motion

Submodules

Package Contents

Classes

MotionConfiguration

Abstract base class for protocol messages.

MotionClient

Motion is a Viam service that coordinates motion planning across all of the components in a given robot.

class viam.services.motion.MotionConfiguration(*, obstacle_detectors: collections.abc.Iterable[global___ObstacleDetector] | None = ..., position_polling_frequency_hz: float | None = ..., obstacle_polling_frequency_hz: float | None = ..., plan_deviation_m: float | None = ..., linear_m_per_sec: float | None = ..., angular_degs_per_sec: float | None = ...)

Bases: google.protobuf.message.Message

Abstract base class for protocol messages.

Protocol message classes are almost always generated by the protocol compiler. These generated types subclass Message and implement the methods shown below.

property obstacle_detectors: google.protobuf.internal.containers.RepeatedCompositeFieldContainer[global___ObstacleDetector]

The ObstacleDetectors that will be used for transient obstacle avoidance

position_polling_frequency_hz: float

Sets the frequency to poll for the position of the robot

obstacle_polling_frequency_hz: float

Sets the frequency to poll the vision service(s) for new obstacles

plan_deviation_m: float

Sets the distance in meters that a robot is allowed to deviate from the motion plan

linear_m_per_sec: float

Optional linear velocity to target when moving

angular_degs_per_sec: float

Optional angular velocity to target when turning

HasField(field_name: Literal[_angular_degs_per_sec, b'_angular_degs_per_sec', _linear_m_per_sec, b'_linear_m_per_sec', _obstacle_polling_frequency_hz, b'_obstacle_polling_frequency_hz', _plan_deviation_m, b'_plan_deviation_m', _position_polling_frequency_hz, b'_position_polling_frequency_hz', angular_degs_per_sec, b'angular_degs_per_sec', linear_m_per_sec, b'linear_m_per_sec', obstacle_polling_frequency_hz, b'obstacle_polling_frequency_hz', plan_deviation_m, b'plan_deviation_m', position_polling_frequency_hz, b'position_polling_frequency_hz']) bool

Checks if a certain field is set for the message.

For a oneof group, checks if any field inside is set. Note that if the field_name is not defined in the message descriptor, ValueError will be raised.

Parameters:

field_name (str) – The name of the field to check for presence.

Returns:

Whether a value has been set for the named field.

Return type:

bool

Raises:

ValueError – if the field_name is not a member of this message.

WhichOneof(oneof_group: Literal[_angular_degs_per_sec, b'_angular_degs_per_sec']) Literal[angular_degs_per_sec] | None
WhichOneof(oneof_group: Literal[_linear_m_per_sec, b'_linear_m_per_sec']) Literal[linear_m_per_sec] | None
WhichOneof(oneof_group: Literal[_obstacle_polling_frequency_hz, b'_obstacle_polling_frequency_hz']) Literal[obstacle_polling_frequency_hz] | None
WhichOneof(oneof_group: Literal[_plan_deviation_m, b'_plan_deviation_m']) Literal[plan_deviation_m] | None
WhichOneof(oneof_group: Literal[_position_polling_frequency_hz, b'_position_polling_frequency_hz']) Literal[position_polling_frequency_hz] | None

Returns the name of the field that is set inside a oneof group.

If no field is set, returns None.

Parameters:

oneof_group (str) – the name of the oneof group to check.

Returns:

The name of the group that is set, or None.

Return type:

str or None

Raises:

ValueError – no group with the given name exists

class viam.services.motion.MotionClient(name: str, channel: grpclib.client.Channel)[source]

Bases: viam.services.service_client_base.ServiceClientBase, viam.resource.rpc_client_base.ReconfigurableResourceRPCClientBase

Motion is a Viam service that coordinates motion planning across all of the components in a given robot.

The motion planning service calculates a valid path that avoids self collision by default. If additional constraints are supplied in the world_state message, the motion planning service will also account for those.

SUBTYPE: Final
client: viam.proto.service.motion.MotionServiceStub
async move(component_name: viam.proto.common.ResourceName, destination: viam.proto.common.PoseInFrame, world_state: viam.proto.common.WorldState | None = None, constraints: viam.proto.service.motion.Constraints | None = None, *, extra: Mapping[str, Any] | None = None, timeout: float | None = None) bool[source]

Plan and execute a movement to move the component specified to its goal destination.

Note: Frames designated with respect to components can also be used as the component_name when calling for a move. This technique allows for planning and moving the frame itself to the destination. To do so, simply create a resource name with originating ReferenceFrame’s name. Then pass in the resource name into component_name. Ex:

resource_name = Arm.get_resource_name("externalFrame")
success = await MotionServiceClient.move(resource_name, ...)
motion = MotionClient.from_robot(robot=robot, name="builtin")

# Assumes a gripper configured with name "my_gripper" on the machine
gripper_name = Gripper.get_resource_name("my_gripper")
my_frame = "my_gripper_offset"

goal_pose = Pose(x=0, y=0, z=300, o_x=0, o_y=0, o_z=1, theta=0)

# Move the gripper
moved = await motion.move(component_name=gripper_name,
                      destination=PoseInFrame(reference_frame="myFrame",
                                              pose=goal_pose),
                      world_state=worldState,
                      constraints={},
                      extra={})
Parameters:
Returns:

Whether the move was successful

Return type:

bool

async move_on_globe(component_name: viam.proto.common.ResourceName, destination: viam.proto.common.GeoPoint, movement_sensor_name: viam.proto.common.ResourceName, obstacles: Sequence[viam.proto.common.GeoObstacle] | None = None, heading: float | None = None, configuration: viam.proto.service.motion.MotionConfiguration | None = None, *, extra: Mapping[str, viam.utils.ValueTypes] | None = None, timeout: float | None = None) str[source]

Move a component to a specific latitude and longitude, using a MovementSensor to check the location.

move_on_globe() is non blocking, meaning the motion service will move the component to the destination GPS point after move_on_globe() returns.

Each successful move_on_globe() call retuns a unique ExectionID which you can use to identify all plans generated durring the move_on_globe() call.

You can monitor the progress of the move_on_globe() call by querying get_plan() and list_plan_statuses().

motion = MotionClient.from_robot(robot=robot, name="builtin")

# Get the ResourceNames of the base and movement sensor
my_base_resource_name = Base.get_resource_name("my_base")
mvmnt_sensor_resource_name = MovementSensor.get_resource_name(
    "my_movement_sensor")
#  Define a destination GeoPoint at the GPS coordinates [0, 0]
my_destination = movement_sensor.GeoPoint(latitude=0, longitude=0)

# Move the base component to the designated geographic location, as reported by the movement sensor
execution_id = await motion.move_on_globe(
    component_name=my_base_resource_name,
    destination=my_destination,
    movement_sensor_name=mvmnt_sensor_resource_name)
Parameters:
  • component_name (ResourceName) – The ResourceName of the base to move.

  • destination (GeoPoint) – The location of the component’s destination, represented in geographic notation as a

  • GeoPoint (lat, lng) –

  • movement_sensor_name (ResourceName) – The ResourceName of the movement sensor that you want to use to check

  • location. (the machine’s) –

  • obstacles (Optional[Sequence[GeoObstacle]]) – Obstacles to consider when planning the motion of the component,

  • GeoObstacle. (with each represented as a) –

    • Default: None

  • heading (Optional[float]) – The compass heading, in degrees, that the machine’s movement sensor should report

  • point. (at the destination) – Range: [0-360) 0: North, 90: East, 180: South, 270: West Default: None

  • configuration (Optional[MotionConfiguration]) – The configuration you want to set across this machine for this

  • optional. (motion service. This parameter and each of its fields are) – obstacle_detectors (Iterable[ObstacleDetector]): The names of each vision service and camera resource pair you want to use for transient obstacle avoidance. position_polling_frequency_hz (float): The frequency in hz to poll the position of the machine. obstacle_polling_frequency_hz (float): The frequency in hz to poll the vision service for new obstacles. plan_deviation_m (float): The distance in meters that the machine can deviate from the motion plan. linear_m_per_sec (float): Linear velocity this machine should target when moving. angular_degs_per_sec (float): Angular velocity this machine should target when turning.

  • extra (Optional[Dict[str, Any]]) – Extra options to pass to the underlying RPC call.

  • timeout (Optional[float]) – An option to set how long to wait (in seconds) before calling a time-out and closing

  • call. (the underlying RPC) –

Returns:

ExecutionID of the move_on_globe() call, which can be used to track execution progress.

Return type:

str

async move_on_map(component_name: viam.proto.common.ResourceName, destination: viam.proto.common.Pose, slam_service_name: viam.proto.common.ResourceName, configuration: viam.proto.service.motion.MotionConfiguration | None = None, obstacles: Iterable[viam.proto.common.Geometry] | None = None, *, extra: Mapping[str, viam.utils.ValueTypes] | None = None, timeout: float | None = None) str[source]

Move a component to a specific pose, using a SlamService for the SLAM map, using a SLAM Service to check the location.

move_on_map() is non blocking, meaning the motion service will move the component to the destination Pose point after move_on_map() returns.

Each successful move_on_map() call retuns a unique ExectionID which you can use to identify all plans generated durring the move_on_map() call.

You can monitor the progress of the move_on_map() call by querying get_plan() and list_plan_statuses().

motion = MotionClient.from_robot(robot=robot, name="builtin")

# Get the ResourceNames of the base component and SLAM service
my_base_resource_name = Base.get_resource_name("my_base")
my_slam_service_name = SLAMClient.get_resource_name("my_slam_service")

# Define a destination pose with respect to the origin of the map from the SLAM service "my_slam_service"
my_pose = Pose(y=10)

# Move the base component to the destination pose of Y=10, a location of
# (0, 10, 0) in respect to the origin of the map
execution_id = await motion.move_on_map(component_name=my_base_resource_name,
                                        destination=my_pose,
                                        slam_service_name=my_slam_service_name)
Parameters:
  • component_name (ResourceName) – The ResourceName of the base to move.

  • destination (Pose) – The destination, which can be any Pose with respect to the SLAM map’s origin.

  • slam_service_name (ResourceName) – The ResourceName of the SLAM service from which the SLAM map is requested.

  • configuration (Optional[MotionConfiguration]) – The configuration you want to set across this machine for this motion service.

  • optional. (This parameter and each of its fields are) – obstacle_detectors (Iterable[ObstacleDetector]): The names of each vision service and camera resource pair you want to use for transient obstacle avoidance. position_polling_frequency_hz (float): The frequency in hz to poll the position of the machine. obstacle_polling_frequency_hz (float): The frequency in hz to poll the vision service for new obstacles. plan_deviation_m (float): The distance in meters that the machine can deviate from the motion plan. linear_m_per_sec (float): Linear velocity this machine should target when moving. angular_degs_per_sec (float): Angular velocity this machine should target when turning.

  • obstacles (Optional[Iterable[Geometry]]) – Obstacles to be considered for motion planning.

  • extra (Optional[Dict[str, Any]]) – Extra options to pass to the underlying RPC call.

  • timeout (Optional[float]) – An option to set how long to wait (in seconds) before calling a time-out and closing the underlying

  • call. (RPC) –

Returns:

ExecutionID of the move_on_map() call, which can be used to track execution progress.

Return type:

str

async stop_plan(component_name: viam.proto.common.ResourceName, *, extra: Mapping[str, viam.utils.ValueTypes] | None = None, timeout: float | None = None)[source]

Stop a component being moved by an in progress move_on_globe() or move_on_map() call.

# Assuming a `move_on_globe()` started the execution
# Stop the base component which was instructed to move by `move_on_globe()`
# or `move_on_map()`
my_base_resource_name = Base.get_resource_name("my_base")
await motion.stop_plan(component_name=mvmnt_sensor)
Parameters:

component_name (ResourceName) – The component to stop

Returns:

None

async get_plan(component_name: viam.proto.common.ResourceName, last_plan_only: bool = False, execution_id: str | None = None, *, extra: Mapping[str, viam.utils.ValueTypes] | None = None, timeout: float | None = None) viam.proto.service.motion.GetPlanResponse[source]

By default: returns the plan history of the most recent move_on_globe() or move_on_map() call to move a component.

The plan history for executions before the most recent can be requested by providing an ExecutionID in the request.

Returns a result if both of the following conditions are met:

  • the execution (call to move_on_globe() or move_on_map()) is still executing or changed state within the last 24 hours

  • the robot has not reinitialized

Plans never change.

Replans always create new plans.

Replans share the ExecutionID of the previously executing plan.

All repeated fields are in time ascending order.

motion = MotionClient.from_robot(robot=robot, name="builtin")
my_base_resource_name = Base.get_resource_name("my_base")
# Get the plan(s) of the base component which was instructed to move by `MoveOnGlobe()` or `MoveOnMap()`
resp = await motion.get_plan(component_name=my_base_resource_name)
Parameters:
  • component_name (ResourceName) – The component to stop

  • last_plan_only (Optional[bool]) – If supplied, the response will only return the last plan for the component / execution

  • execution_id (Optional[str]) – If supplied, the response will only return plans with the provided execution_id

Returns:

The current PlanWithStatus & replan history which matches the request

Return type:

GetPlanResponse (GetPlanResponse)

async list_plan_statuses(only_active_plans: bool = False, *, extra: Mapping[str, viam.utils.ValueTypes] | None = None, timeout: float | None = None) viam.proto.service.motion.ListPlanStatusesResponse[source]

Returns the statuses of plans created by move_on_globe() or move_on_map() calls that meet at least one of the following conditions since the motion service initialized:

  • the plan’s status is in progress

  • the plan’s status changed state within the last 24 hours

All repeated fields are in chronological order.

motion = MotionClient.from_robot(robot=robot, name="builtin")
# List the plan statuses of the motion service within the TTL
resp = await motion.list_plan_statuses()
Parameters:

only_active_plans (Optional[bool]) – If supplied, the response will filter out any plans that are not executing

Returns:

List of last known statuses with the associated IDs of all plans within the TTL ordered by timestamp in ascending order

Return type:

ListPlanStatusesResponse (ListPlanStatusesResponse)

async get_pose(component_name: viam.proto.common.ResourceName, destination_frame: str, supplemental_transforms: List[viam.proto.common.Transform] | None = None, *, extra: Mapping[str, Any] | None = None, timeout: float | None = None) viam.proto.common.PoseInFrame[source]

Get the Pose and observer frame for any given component on a robot. A component_name can be created like this:

component_name = Arm.get_resource_name("arm")

Note that the example uses the Arm class, but any component class that inherits from ComponentBase will work (Base, Gripper, etc).

from viam.components.gripper import Gripper
from viam.services.motion import MotionClient

# Assume that the connect function is written and will return a valid machine.
robot = await connect()

motion = MotionClient.from_robot(robot=robot, name="builtin")
gripperName = Gripper.get_resource_name("my_gripper")
gripperPoseInWorld = await motion.get_pose(component_name=gripperName,
                                        destination_frame="world")
Parameters:
  • component_name (viam.proto.common.ResourceName) – Name of a component on a robot.

  • destination_frame (str) – Name of the desired reference frame.

  • supplemental_transforms (Optional[List[viam.proto.common.Transform]]) – Transforms used to augment the robot’s frame while calculating pose.

Returns:

Pose of the given component and the frame in which it was observed.

Return type:

Pose (PoseInFrame)

async do_command(command: Mapping[str, viam.utils.ValueTypes], *, timeout: float | None = None, **__) Mapping[str, viam.utils.ValueTypes][source]

Send/receive arbitrary commands

# Access the motion service
motion = MotionClient.from_robot(robot=robot, name="builtin")

my_command = {
  "command": "dosomething",
  "someparameter": 52
}

await motion.do_command(my_command)
Parameters:

command (Dict[str, ValueTypes]) – The command to execute

Returns:

Result of the executed command

Return type:

Dict[str, ValueTypes]

classmethod from_robot(robot: viam.robot.client.RobotClient, name: str = 'builtin') typing_extensions.Self

Get the service client named name from the provided robot.

Parameters:
  • robot (RobotClient) – The robot

  • name (str) – The name of the service client

Returns:

The service client, if it exists on the robot

Return type:

Self

classmethod get_resource_name(name: str) viam.proto.common.ResourceName

Get the ResourceName for this Resource with the given name

# Can be used with any resource, using an arm as an example
my_arm_name = my_arm.get_resource_name("my_arm")
Parameters:

name (str) – The name of the Resource

get_operation(kwargs: Mapping[str, Any]) viam.operations.Operation

Get the Operation associated with the currently running function.

When writing custom resources, you should get the Operation by calling this function and check to see if it’s cancelled. If the Operation is cancelled, then you can perform any necessary (terminating long running tasks, cleaning up connections, etc. ).

Parameters:

kwargs (Mapping[str, Any]) – The kwargs object containing the operation

Returns:

The operation associated with this function

Return type:

viam.operations.Operation

async close()

Safely shut down the resource and prevent further use.

Close must be idempotent. Later configuration may allow a resource to be “open” again. If a resource does not want or need a close function, it is assumed that the resource does not need to return errors when future non-Close methods are called.

await component.close()