viam.services.motion.motion

Classes

Motion

Motion is a Viam service that coordinates motion planning across all of the components in a given robot.

Module Contents

class viam.services.motion.motion.Motion(name: str, *, logger: logging.Logger | None = None)[source]

Bases: viam.services.service_base.ServiceBase

Motion is a Viam service that coordinates motion planning across all of the components in a given robot.

The motion planning service calculates a valid path that avoids self collision by default. If additional constraints are supplied in the world_state message, the motion planning service will also account for those.

For more information, see Motion service.

Plan: TypeAlias = GetPlanResponse
SUBTYPE: Final

The Subtype of the Resource

abstract move(component_name: viam.proto.common.ResourceName, destination: viam.proto.common.PoseInFrame, world_state: viam.proto.common.WorldState | None = None, constraints: viam.proto.service.motion.Constraints | None = None, *, extra: Mapping[str, Any] | None = None, timeout: float | None = None) bool[source]
Async:

Plan and execute a movement to move the component specified to its goal destination.

Note: Frames designated with respect to components can also be used as the component_name when calling for a move. This technique allows for planning and moving the frame itself to the destination. To do so, create a resource name from the originating ReferenceFrame's name, then pass that resource name as component_name. For example:

# `motion` is a MotionClient, obtained as in the example that follows
resource_name = Arm.get_resource_name("externalFrame")
success = await motion.move(resource_name, ...)

motion = MotionClient.from_robot(robot=machine, name="builtin")

# Assumes a gripper configured with name "my_gripper" on the machine
gripper_name = Gripper.get_resource_name("my_gripper")
# Name of the reference frame in which the goal pose is expressed
my_frame = "my_gripper_offset"

goal_pose = Pose(x=0, y=0, z=300, o_x=0, o_y=0, o_z=1, theta=0)

# Move the gripper. world_state and constraints are optional and default
# to None, so they are omitted here.
moved = await motion.move(component_name=gripper_name,
                          destination=PoseInFrame(reference_frame=my_frame,
                                                  pose=goal_pose))
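Parameters:
  • component_name (ResourceName) – The ResourceName of the component (or frame) to move.

  • destination (PoseInFrame) – The goal pose, together with the reference frame in which it is expressed.

  • world_state (Optional[WorldState]) – Additional constraints for the motion planning service to account for when planning. Default: None

  • constraints (Optional[Constraints]) – Constraints that the planned motion must satisfy. Default: None

  • extra (Optional[Dict[str, Any]]) – Extra options to pass to the underlying RPC call.

  • timeout (Optional[float]) – An option to set how long to wait (in seconds) before calling a time-out and closing the underlying RPC call.
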
Returns:

Whether the move was successful.

Return type:

bool

For more information, see Motion service.

abstract move_on_globe(component_name: viam.proto.common.ResourceName, destination: viam.proto.common.GeoPoint, movement_sensor_name: viam.proto.common.ResourceName, obstacles: Sequence[viam.proto.common.GeoGeometry] | None = None, heading: float | None = None, configuration: viam.proto.service.motion.MotionConfiguration | None = None, *, bounding_regions: Sequence[viam.proto.common.GeoGeometry] | None = None, extra: Mapping[str, viam.utils.ValueTypes] | None = None, timeout: float | None = None) str[source]
Async:

Move a component to a specific latitude and longitude, using a MovementSensor to check the location.

move_on_globe() is non-blocking: the call returns without waiting for the motion to finish, and the motion service moves the component to the destination GPS point after move_on_globe() returns.

Each successful move_on_globe() call returns a unique ExecutionID which you can use to identify all plans generated during the move_on_globe() call.

You can monitor the progress of the move_on_globe() call by querying get_plan() and list_plan_statuses().

motion = MotionClient.from_robot(robot=machine, name="builtin")

# Get the ResourceNames of the base and movement sensor
my_base_resource_name = Base.get_resource_name("my_base")
mvmnt_sensor_resource_name = MovementSensor.get_resource_name(
    "my_movement_sensor")
# Define a destination GeoPoint at the GPS coordinates [0, 0]
# (GeoPoint is the viam.proto.common type named in the signature above)
my_destination = GeoPoint(latitude=0, longitude=0)

# Move the base component to the designated geographic location, as reported by the movement sensor
execution_id = await motion.move_on_globe(
    component_name=my_base_resource_name,
    destination=my_destination,
    movement_sensor_name=mvmnt_sensor_resource_name)
Parameters:
  • component_name (ResourceName) – The ResourceName of the base to move.

  • destination (GeoPoint) – The location of the component’s destination, represented in geographic notation as a GeoPoint (lat, lng).

  • movement_sensor_name (ResourceName) – The ResourceName of the movement sensor that you want to use to check the machine’s location.

  • obstacles (Optional[Sequence[GeoGeometry]]) – Obstacles to consider when planning the motion of the component, with each represented as a GeoGeometry. Default: None

  • heading (Optional[float]) – The compass heading, in degrees, that the machine’s movement sensor should report at the destination point. Range: [0-360) 0: North, 90: East, 180: South, 270: West. Default: None

  • configuration (Optional[MotionConfiguration]) –

    The configuration you want to set across this machine for this motion service. This parameter and each of its fields are optional.

    • obstacle_detectors (Sequence[ObstacleDetector]): The names of each vision service and camera resource pair you want to use for transient obstacle avoidance.

    • position_polling_frequency_hz (float): The frequency in Hz to poll the position of the machine.

    • obstacle_polling_frequency_hz (float): The frequency in Hz to poll the vision service for new obstacles.

    • plan_deviation_m (float): The distance in meters that the machine can deviate from the motion plan.

    • linear_m_per_sec (float): Linear velocity this machine should target when moving.

    • angular_degs_per_sec (float): Angular velocity this machine should target when turning.

  • bounding_regions (Optional[Sequence[GeoGeometry]]) – Set of regions within which the robot must remain while navigating. Default: None

  • extra (Optional[Dict[str, Any]]) – Extra options to pass to the underlying RPC call.

  • timeout (Optional[float]) – An option to set how long to wait (in seconds) before calling a time-out and closing the underlying RPC call.
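
The heading parameter above is documented with the half-open range [0-360). A minimal sketch of wrapping an arbitrary angle into that range before passing it as heading follows; the helper name normalize_heading is hypothetical and not part of the SDK.

```python
def normalize_heading(degrees: float) -> float:
    """Wrap any angle in degrees into the half-open range [0, 360)."""
    wrapped = degrees % 360.0
    # Floating-point rounding can turn a tiny negative input into exactly
    # 360.0, which lies outside the half-open range, so map it back to 0.0.
    return 0.0 if wrapped == 360.0 else wrapped


# -90 degrees is the same compass direction as 270 degrees (West).
west = normalize_heading(-90.0)
```

The result can then be supplied as heading=west in a move_on_globe() call.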

Returns:

ExecutionID of the move_on_globe() call, which can be used to track execution progress.

Return type:

str

For more information, see Motion service.
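
Because move_on_globe() returns before the motion finishes, callers typically poll for progress. The sketch below shows one way to structure such a polling loop with a timeout. It is deliberately independent of the SDK: fetch_state is a hypothetical caller-supplied coroutine (for example, one that calls get_plan() with the returned ExecutionID and extracts the current plan state), and wait_for_terminal_state is an illustrative name, not an SDK function.

```python
import asyncio
import time
from typing import Awaitable, Callable, Collection, TypeVar

State = TypeVar("State")


async def wait_for_terminal_state(
    fetch_state: Callable[[], Awaitable[State]],
    terminal_states: Collection[State],
    poll_interval_s: float = 0.5,
    timeout_s: float = 60.0,
) -> State:
    """Poll fetch_state() until it returns one of terminal_states.

    Raises TimeoutError if no terminal state is seen within timeout_s.
    """
    deadline = time.monotonic() + timeout_s
    while True:
        state = await fetch_state()
        if state in terminal_states:
            return state
        if time.monotonic() >= deadline:
            raise TimeoutError("execution did not reach a terminal state in time")
        # Yield to the event loop between polls.
        await asyncio.sleep(poll_interval_s)
```

How the state is read from the get_plan() response, and which states count as terminal, should be checked against the GetPlanResponse definition rather than taken from this sketch.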

abstract move_on_map(component_name: viam.proto.common.ResourceName, destination: viam.proto.common.Pose, slam_service_name: viam.proto.common.ResourceName, configuration: viam.proto.service.motion.MotionConfiguration | None = None, obstacles: Sequence[viam.proto.common.Geometry] | None = None, *, extra: Mapping[str, viam.utils.ValueTypes] | None = None, timeout: float | None = None) str[source]
Async:

Move a component to a specific pose on a SLAM map, using a SLAM service to check the location.

move_on_map() is non-blocking: the call returns without waiting for the motion to finish, and the motion service moves the component to the destination Pose after move_on_map() returns.

Each successful move_on_map() call returns a unique ExecutionID which you can use to identify all plans generated during the move_on_map() call.

You can monitor the progress of the move_on_map() call by querying get_plan() and list_plan_statuses().

motion = MotionClient.from_robot(robot=machine, name="builtin")

# Get the ResourceNames of the base component and SLAM service
my_base_resource_name = Base.get_resource_name("my_base")
my_slam_service_name = SLAMClient.get_resource_name("my_slam_service")

# Define a destination pose with respect to the origin of the map from the SLAM service "my_slam_service"
my_pose = Pose(y=10)

# Move the base component to the destination pose of Y=10, a location of
# (0, 10, 0) with respect to the origin of the map
execution_id = await motion.move_on_map(component_name=my_base_resource_name,
                                        destination=my_pose,
                                        slam_service_name=my_slam_service_name)
Parameters:
  • component_name (ResourceName) – The ResourceName of the base to move.

  • destination (Pose) – The destination, which can be any Pose with respect to the SLAM map’s origin.

  • slam_service_name (ResourceName) – The ResourceName of the SLAM service from which the SLAM map is requested.

  • configuration (Optional[MotionConfiguration]) –

    The configuration you want to set across this machine for this motion service. This parameter and each of its fields are optional.

    • obstacle_detectors (Sequence[ObstacleDetector]): The names of each vision service and camera resource pair you want to use for transient obstacle avoidance.

    • position_polling_frequency_hz (float): The frequency in Hz to poll the position of the machine.

    • obstacle_polling_frequency_hz (float): The frequency in Hz to poll the vision service for new obstacles.

    • plan_deviation_m (float): The distance in meters that the machine can deviate from the motion plan.

    • linear_m_per_sec (float): Linear velocity this machine should target when moving.

    • angular_degs_per_sec (float): Angular velocity this machine should target when turning.

  • obstacles (Optional[Sequence[Geometry]]) – Obstacles to be considered for motion planning.

  • extra (Optional[Dict[str, Any]]) – Extra options to pass to the underlying RPC call.

  • timeout (Optional[float]) – An option to set how long to wait (in seconds) before calling a time-out and closing the underlying RPC call.

Returns:

ExecutionID of the move_on_map() call, which can be used to track execution progress.

Return type:

str

For more information, see Motion service.

abstract stop_plan(component_name: viam.proto.common.ResourceName, *, extra: Mapping[str, viam.utils.ValueTypes] | None = None, timeout: float | None = None)[source]
Async:

Stop a component being moved by an in-progress move_on_globe() or move_on_map() call.

motion = MotionClient.from_robot(robot=machine, name="builtin")

# Assuming a `move_on_globe()` started the execution
# Stop the base component which was instructed to move by `move_on_globe()`
# or `move_on_map()`
my_base_resource_name = Base.get_resource_name("my_base")
await motion.stop_plan(component_name=my_base_resource_name)
Parameters:

component_name (ResourceName) – The component to stop

For more information, see Motion service.

abstract get_plan(component_name: viam.proto.common.ResourceName, last_plan_only: bool = False, execution_id: str | None = None, *, extra: Mapping[str, viam.utils.ValueTypes] | None = None, timeout: float | None = None) Plan[source]
Async:

By default: returns the plan history of the most recent move_on_globe() or move_on_map() call to move a component.

The plan history for executions before the most recent can be requested by providing an ExecutionID in the request.

Returns a result if both of the following conditions are met:

  • the execution (call to move_on_globe() or move_on_map()) is still executing or changed state within the last 24 hours

  • the robot has not reinitialized

Plans never change.

Replans always create new plans.

Replans share the ExecutionID of the previously executing plan.

All repeated fields are in time ascending order.

motion = MotionClient.from_robot(robot=machine, name="builtin")
my_base_resource_name = Base.get_resource_name("my_base")
# Get the plan(s) of the base component which was instructed to move by `move_on_globe()` or `move_on_map()`
resp = await motion.get_plan(component_name=my_base_resource_name)
Parameters:
  • component_name (ResourceName) – The component whose plan(s) to retrieve

  • last_plan_only (Optional[bool]) – If True, the response will only return the last plan for the component / execution. Default: False

  • execution_id (Optional[str]) – If supplied, the response will only return plans with the provided execution_id.

Returns:

The current PlanWithStatus & replan history which matches the request

Return type:

GetPlanResponse (GetPlanResponse)

For more information, see Motion service.

abstract list_plan_statuses(only_active_plans: bool = False, *, extra: Mapping[str, viam.utils.ValueTypes] | None = None, timeout: float | None = None) Sequence[viam.proto.service.motion.PlanStatusWithID][source]
Async:

Returns the statuses of plans created by move_on_globe() or move_on_map() calls that meet at least one of the following conditions since the motion service initialized:

  • the plan’s status is in progress

  • the plan’s status changed state within the last 24 hours

All repeated fields are in chronological order.

motion = MotionClient.from_robot(robot=machine, name="builtin")
# List the plan statuses of the motion service within the TTL
resp = await motion.list_plan_statuses()
Parameters:

only_active_plans (Optional[bool]) – If True, the response will filter out any plans that are not executing. Default: False

Returns:

List of last known statuses with the associated IDs of all plans within the TTL ordered by timestamp in ascending order.

Return type:

Sequence[PlanStatusWithID]

For more information, see Motion service.
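
As noted under get_plan(), replans share the ExecutionID of the plan they replace, so the statuses returned by list_plan_statuses() can be grouped per execution. The sketch below illustrates that grouping on a hypothetical stand-in record type, StatusRecord; the real PlanStatusWithID message may expose its fields differently, so treat the attribute names as assumptions.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Dict, Iterable, List


@dataclass(frozen=True)
class StatusRecord:
    """Hypothetical stand-in for one entry returned by list_plan_statuses()."""
    plan_id: str
    execution_id: str
    state: str


def group_by_execution(records: Iterable[StatusRecord]) -> Dict[str, List[StatusRecord]]:
    """Group records by execution ID, preserving their input order."""
    grouped: Dict[str, List[StatusRecord]] = defaultdict(list)
    for record in records:
        grouped[record.execution_id].append(record)
    return dict(grouped)


# Two plans belong to execution "exec-1" (an original plan and its replan).
records = [
    StatusRecord("plan-a", "exec-1", "failed"),
    StatusRecord("plan-b", "exec-1", "in_progress"),
    StatusRecord("plan-c", "exec-2", "succeeded"),
]
by_execution = group_by_execution(records)
```

Since the input order is preserved, the last record in each group is the most recent one whenever the input is in chronological order.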

abstract get_pose(component_name: viam.proto.common.ResourceName, destination_frame: str, supplemental_transforms: Sequence[viam.proto.common.Transform] | None = None, *, extra: Mapping[str, Any] | None = None, timeout: float | None = None) viam.proto.common.PoseInFrame[source]
Async:

Get the Pose and observer frame for any given component on a robot.

# Note that the example uses the ``Arm`` class, but any component class that inherits from ``ComponentBase`` will work
# (``Base``, ``Gripper``, etc).

# Create a `component_name`:
component_name = Arm.get_resource_name("arm")

from viam.components.gripper import Gripper
from viam.services.motion import MotionClient

# Assume that the connect function is written and will return a valid machine.
machine = await connect()

motion = MotionClient.from_robot(robot=machine, name="builtin")
gripper_name = Gripper.get_resource_name("my_gripper")
gripper_pose_in_world = await motion.get_pose(component_name=gripper_name,
                                              destination_frame="world")
Parameters:
  • component_name (viam.proto.common.ResourceName) – Name of a component on a robot.

  • destination_frame (str) – Name of the desired reference frame.

  • supplemental_transforms (Optional[List[viam.proto.common.Transform]]) – Transforms used to augment the robot’s frame while calculating pose.

Returns:

Pose of the given component and the frame in which it was observed.

Return type:

Pose (PoseInFrame)

For more information, see Motion service.

classmethod from_robot(robot: viam.robot.client.RobotClient, name: str) typing_extensions.Self

Get the service named name from the provided robot.

from viam.robot.client import RobotClient
from viam.services.motion import MotionClient

async def connect() -> RobotClient:
    # Replace "<API-KEY>" (including brackets) with your API key and "<API-KEY-ID>" with your API key ID
    options = RobotClient.Options.with_api_key("<API-KEY>", "<API-KEY-ID>")
    # Replace "<MACHINE-URL>" (including brackets) with your machine's connection URL or FQDN
    return await RobotClient.at_address("<MACHINE-URL>", options)

async def main():
    machine = await connect()

    # Can be used with any resource, using the motion service as an example
    motion = MotionClient.from_robot(robot=machine, name="builtin")

    # Close the connection when done
    await machine.close()
Parameters:
  • robot (RobotClient) – The robot

  • name (str) – The name of the service

Returns:

The service, if it exists on the robot

Return type:

Self

abstract do_command(command: Mapping[str, viam.utils.ValueTypes], *, timeout: float | None = None, **kwargs) Mapping[str, viam.utils.ValueTypes]
Async:

Send/receive arbitrary commands.

service = SERVICE.from_robot(robot=machine, name="builtin")  # replace SERVICE with the appropriate class

my_command = {
  "cmnd": "dosomething",
  "someparameter": 52
}

# Can be used with any resource, using the motion service as an example
await service.do_command(command=my_command)
Parameters:

command (Dict[str, ValueTypes]) – The command to execute

Returns:

Result of the executed command

Return type:

Dict[str, ValueTypes]
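
On the implementing side, do_command() is where a custom resource decides how to interpret the command mapping. The following is a minimal sketch of one possible dispatch pattern, reusing the "cmnd", "dosomething", and "someparameter" keys from the example above. ExampleService is a hypothetical stand-in that does not inherit from the SDK base class, and the response shape is an assumption made for illustration.

```python
import asyncio
from typing import Any, Mapping, Optional


class ExampleService:
    """Hypothetical stand-in for a custom resource (not an SDK class)."""

    async def do_command(
        self,
        command: Mapping[str, Any],
        *,
        timeout: Optional[float] = None,
        **kwargs: Any,
    ) -> Mapping[str, Any]:
        # Dispatch on the "cmnd" key used in the example above.
        name = command.get("cmnd")
        if name == "dosomething":
            return {"ok": True, "someparameter": command.get("someparameter")}
        raise ValueError(f"unknown command: {name!r}")


async def demo() -> Mapping[str, Any]:
    service = ExampleService()
    return await service.do_command({"cmnd": "dosomething", "someparameter": 52})


result = asyncio.run(demo())
```

Raising on unknown commands, rather than returning an empty mapping, is a design choice of this sketch; it makes typos in command names visible to the caller.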

classmethod get_resource_name(name: str) viam.proto.common.ResourceName

Get the ResourceName for this Resource with the given name

# Can be used with any resource, using an arm as an example
my_arm_name = Arm.get_resource_name("my_arm")
Parameters:

name (str) – The name of the Resource

Returns:

The ResourceName of this Resource

Return type:

ResourceName

get_operation(kwargs: Mapping[str, Any]) viam.operations.Operation

Get the Operation associated with the currently running function.

When writing custom resources, you should get the Operation by calling this function and check to see if it’s cancelled. If the Operation is cancelled, then you can perform any necessary cleanup (terminating long-running tasks, cleaning up connections, etc.).

Parameters:

kwargs (Mapping[str, Any]) – The kwargs object containing the operation

Returns:

The operation associated with this function

Return type:

viam.operations.Operation
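
The cancellation check described above usually sits inside a long-running loop. The sketch below shows that pattern with FakeOperation, a hypothetical stand-in that is assumed to mirror an awaitable is_cancelled() check on viam.operations.Operation. In a real custom resource the object would come from self.get_operation(kwargs) instead.

```python
import asyncio


class FakeOperation:
    """Hypothetical stand-in for viam.operations.Operation.

    Reports cancellation once it has been checked more than
    `cancel_after_checks` times.
    """

    def __init__(self, cancel_after_checks: int) -> None:
        self._remaining = cancel_after_checks

    async def is_cancelled(self) -> bool:
        self._remaining -= 1
        return self._remaining < 0


async def run_steps(operation: FakeOperation, steps: int) -> int:
    """Run up to `steps` units of work, stopping early on cancellation."""
    completed = 0
    for _ in range(steps):
        if await operation.is_cancelled():
            # Cleanup (stopping tasks, closing connections) would go here.
            break
        await asyncio.sleep(0)  # placeholder for one unit of real work
        completed += 1
    return completed


completed = asyncio.run(run_steps(FakeOperation(cancel_after_checks=3), steps=10))
```

Checking once per iteration keeps the delay between a cancel request and the loop exiting bounded by the duration of a single unit of work.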

async close()

Safely shut down the resource and prevent further use.

Close must be idempotent. Later configuration may allow a resource to be “open” again. If a resource does not want or need a close function, it is assumed that the resource does not need to return errors when future non-Close methods are called.

await component.close()
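
Since close() must be idempotent, an implementation needs to tolerate being called more than once. A minimal sketch of one way to do this with a guard flag follows; ClosableResource and its release_count attribute are hypothetical and exist only to make the behavior observable.

```python
import asyncio


class ClosableResource:
    """Hypothetical resource whose close() is safe to call repeatedly."""

    def __init__(self) -> None:
        self._closed = False
        self.release_count = 0  # how many times cleanup actually ran

    async def close(self) -> None:
        # A second call is a no-op instead of an error or a double release.
        if self._closed:
            return
        self._closed = True
        self.release_count += 1  # stand-in for releasing real resources


async def demo() -> int:
    resource = ClosableResource()
    await resource.close()
    await resource.close()  # must not raise or release twice
    return resource.release_count


release_count = asyncio.run(demo())
```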