viam.services.motion
Submodules
Package Contents
Classes
Abstract base class for protocol messages. |
|
Motion is a Viam service that coordinates motion planning across all of the components in a given robot. |
- class viam.services.motion.MotionConfiguration(*, obstacle_detectors: collections.abc.Iterable[global___ObstacleDetector] | None = ..., position_polling_frequency_hz: float | None = ..., obstacle_polling_frequency_hz: float | None = ..., plan_deviation_m: float | None = ..., linear_m_per_sec: float | None = ..., angular_degs_per_sec: float | None = ...)
Bases:
google.protobuf.message.Message
Abstract base class for protocol messages.
Protocol message classes are almost always generated by the protocol compiler. These generated types subclass Message and implement the methods shown below.
- property obstacle_detectors: google.protobuf.internal.containers.RepeatedCompositeFieldContainer[global___ObstacleDetector]
The ObstacleDetectors that will be used for transient obstacle avoidance
- position_polling_frequency_hz: float
Sets the frequency to poll for the position of the robot
- obstacle_polling_frequency_hz: float
Sets the frequency to poll the vision service(s) for new obstacles
- plan_deviation_m: float
Sets the distance in meters that a robot is allowed to deviate from the motion plan
- linear_m_per_sec: float
Optional linear velocity to target when moving
- angular_degs_per_sec: float
Optional angular velocity to target when turning
- HasField(field_name: Literal[_angular_degs_per_sec, b'_angular_degs_per_sec', _linear_m_per_sec, b'_linear_m_per_sec', _obstacle_polling_frequency_hz, b'_obstacle_polling_frequency_hz', _plan_deviation_m, b'_plan_deviation_m', _position_polling_frequency_hz, b'_position_polling_frequency_hz', angular_degs_per_sec, b'angular_degs_per_sec', linear_m_per_sec, b'linear_m_per_sec', obstacle_polling_frequency_hz, b'obstacle_polling_frequency_hz', plan_deviation_m, b'plan_deviation_m', position_polling_frequency_hz, b'position_polling_frequency_hz']) bool
Checks if a certain field is set for the message.
For a oneof group, checks if any field inside is set. Note that if the field_name is not defined in the message descriptor,
ValueError
will be raised.- Parameters:
field_name (str) – The name of the field to check for presence.
- Returns:
Whether a value has been set for the named field.
- Return type:
bool
- Raises:
ValueError – if the field_name is not a member of this message.
- WhichOneof(oneof_group: Literal[_angular_degs_per_sec, b'_angular_degs_per_sec']) Literal[angular_degs_per_sec] | None
- WhichOneof(oneof_group: Literal[_linear_m_per_sec, b'_linear_m_per_sec']) Literal[linear_m_per_sec] | None
- WhichOneof(oneof_group: Literal[_obstacle_polling_frequency_hz, b'_obstacle_polling_frequency_hz']) Literal[obstacle_polling_frequency_hz] | None
- WhichOneof(oneof_group: Literal[_plan_deviation_m, b'_plan_deviation_m']) Literal[plan_deviation_m] | None
- WhichOneof(oneof_group: Literal[_position_polling_frequency_hz, b'_position_polling_frequency_hz']) Literal[position_polling_frequency_hz] | None
Returns the name of the field that is set inside a oneof group.
If no field is set, returns None.
- Parameters:
oneof_group (str) – the name of the oneof group to check.
- Returns:
The name of the group that is set, or None.
- Return type:
str or None
- Raises:
ValueError – no group with the given name exists
- class viam.services.motion.MotionClient(name: str, channel: grpclib.client.Channel)[source]
Bases:
viam.services.service_client_base.ServiceClientBase
,viam.resource.rpc_client_base.ReconfigurableResourceRPCClientBase
Motion is a Viam service that coordinates motion planning across all of the components in a given robot.
The motion planning service calculates a valid path that avoids self collision by default. If additional constraints are supplied in the
world_state
message, the motion planning service will also account for those.- SUBTYPE: Final
- async move(component_name: viam.proto.common.ResourceName, destination: viam.proto.common.PoseInFrame, world_state: viam.proto.common.WorldState | None = None, constraints: viam.proto.service.motion.Constraints | None = None, *, extra: Mapping[str, Any] | None = None, timeout: float | None = None) bool [source]
Plan and execute a movement to move the component specified to its goal destination.
Note: Frames designated with respect to components can also be used as the
component_name
when calling for a move. This technique allows for planning and moving the frame itself to thedestination
. To do so, simply create a resource name with originating ReferenceFrame’s name. Then pass in the resource name intocomponent_name
. Ex:resource_name = Arm.get_resource_name("externalFrame") success = await MotionServiceClient.move(resource_name, ...)
motion = MotionClient.from_robot(robot=robot, name="builtin") # Assumes a gripper configured with name "my_gripper" on the machine gripper_name = Gripper.get_resource_name("my_gripper") my_frame = "my_gripper_offset" goal_pose = Pose(x=0, y=0, z=300, o_x=0, o_y=0, o_z=1, theta=0) # Move the gripper moved = await motion.move(component_name=gripper_name, destination=PoseInFrame(reference_frame="myFrame", pose=goal_pose), world_state=worldState, constraints={}, extra={})
- Parameters:
component_name (viam.proto.common.ResourceName) – Name of a component on a given robot.
destination (viam.proto.common.PoseInFrame) – The destination to move to, expressed as a
Pose
and the frame in which it was observed.world_state (viam.proto.common.WorldState) – When supplied, the motion service will create a plan that obeys any contraints expressed in the WorldState message.
constraints (viam.proto.service.motion.Constraints) – When supplied, the motion service will create a plan that obeys any specified constraints
- Returns:
Whether the move was successful
- Return type:
bool
- async move_on_globe(component_name: viam.proto.common.ResourceName, destination: viam.proto.common.GeoPoint, movement_sensor_name: viam.proto.common.ResourceName, obstacles: Sequence[viam.proto.common.GeoObstacle] | None = None, heading: float | None = None, configuration: viam.proto.service.motion.MotionConfiguration | None = None, *, extra: Mapping[str, viam.utils.ValueTypes] | None = None, timeout: float | None = None) str [source]
Move a component to a specific latitude and longitude, using a
MovementSensor
to check the location.move_on_globe()
is non blocking, meaning the motion service will move the component to the destination GPS point aftermove_on_globe()
returns.Each successful
move_on_globe()
call retuns a unique ExectionID which you can use to identify all plans generated durring themove_on_globe()
call.You can monitor the progress of the
move_on_globe()
call by queryingget_plan()
andlist_plan_statuses()
.motion = MotionClient.from_robot(robot=robot, name="builtin") # Get the ResourceNames of the base and movement sensor my_base_resource_name = Base.get_resource_name("my_base") mvmnt_sensor_resource_name = MovementSensor.get_resource_name( "my_movement_sensor") # Define a destination GeoPoint at the GPS coordinates [0, 0] my_destination = movement_sensor.GeoPoint(latitude=0, longitude=0) # Move the base component to the designated geographic location, as reported by the movement sensor execution_id = await motion.move_on_globe( component_name=my_base_resource_name, destination=my_destination, movement_sensor_name=mvmnt_sensor_resource_name)
- Parameters:
component_name (ResourceName) – The ResourceName of the base to move.
destination (GeoPoint) – The location of the component’s destination, represented in geographic notation as a
GeoPoint (lat, lng) –
movement_sensor_name (ResourceName) – The ResourceName of the movement sensor that you want to use to check
location. (the machine’s) –
obstacles (Optional[Sequence[GeoObstacle]]) – Obstacles to consider when planning the motion of the component,
GeoObstacle. (with each represented as a) –
Default: None
heading (Optional[float]) – The compass heading, in degrees, that the machine’s movement sensor should report
point. (at the destination) – Range: [0-360) 0: North, 90: East, 180: South, 270: West Default: None
configuration (Optional[MotionConfiguration]) – The configuration you want to set across this machine for this
optional. (motion service. This parameter and each of its fields are) – obstacle_detectors (Iterable[ObstacleDetector]): The names of each vision service and camera resource pair you want to use for transient obstacle avoidance. position_polling_frequency_hz (float): The frequency in hz to poll the position of the machine. obstacle_polling_frequency_hz (float): The frequency in hz to poll the vision service for new obstacles. plan_deviation_m (float): The distance in meters that the machine can deviate from the motion plan. linear_m_per_sec (float): Linear velocity this machine should target when moving. angular_degs_per_sec (float): Angular velocity this machine should target when turning.
extra (Optional[Dict[str, Any]]) – Extra options to pass to the underlying RPC call.
timeout (Optional[float]) – An option to set how long to wait (in seconds) before calling a time-out and closing
call. (the underlying RPC) –
- Returns:
ExecutionID of the
move_on_globe()
call, which can be used to track execution progress.- Return type:
str
- async move_on_map(component_name: viam.proto.common.ResourceName, destination: viam.proto.common.Pose, slam_service_name: viam.proto.common.ResourceName, configuration: viam.proto.service.motion.MotionConfiguration | None = None, obstacles: Iterable[viam.proto.common.Geometry] | None = None, *, extra: Mapping[str, viam.utils.ValueTypes] | None = None, timeout: float | None = None) str [source]
Move a component to a specific pose, using a
SlamService
for the SLAM map, using aSLAM Service
to check the location.move_on_map()
is non blocking, meaning the motion service will move the component to the destination Pose point aftermove_on_map()
returns.Each successful
move_on_map()
call retuns a unique ExectionID which you can use to identify all plans generated durring themove_on_map()
call.You can monitor the progress of the
move_on_map()
call by queryingget_plan()
andlist_plan_statuses()
.motion = MotionClient.from_robot(robot=robot, name="builtin") # Get the ResourceNames of the base component and SLAM service my_base_resource_name = Base.get_resource_name("my_base") my_slam_service_name = SLAMClient.get_resource_name("my_slam_service") # Define a destination pose with respect to the origin of the map from the SLAM service "my_slam_service" my_pose = Pose(y=10) # Move the base component to the destination pose of Y=10, a location of # (0, 10, 0) in respect to the origin of the map execution_id = await motion.move_on_map(component_name=my_base_resource_name, destination=my_pose, slam_service_name=my_slam_service_name)
- Parameters:
component_name (ResourceName) – The ResourceName of the base to move.
destination (Pose) – The destination, which can be any Pose with respect to the SLAM map’s origin.
slam_service_name (ResourceName) – The ResourceName of the SLAM service from which the SLAM map is requested.
configuration (Optional[MotionConfiguration]) – The configuration you want to set across this machine for this motion service.
optional. (This parameter and each of its fields are) – obstacle_detectors (Iterable[ObstacleDetector]): The names of each vision service and camera resource pair you want to use for transient obstacle avoidance. position_polling_frequency_hz (float): The frequency in hz to poll the position of the machine. obstacle_polling_frequency_hz (float): The frequency in hz to poll the vision service for new obstacles. plan_deviation_m (float): The distance in meters that the machine can deviate from the motion plan. linear_m_per_sec (float): Linear velocity this machine should target when moving. angular_degs_per_sec (float): Angular velocity this machine should target when turning.
obstacles (Optional[Iterable[Geometry]]) – Obstacles to be considered for motion planning.
extra (Optional[Dict[str, Any]]) – Extra options to pass to the underlying RPC call.
timeout (Optional[float]) – An option to set how long to wait (in seconds) before calling a time-out and closing the underlying
call. (RPC) –
- Returns:
ExecutionID of the
move_on_map()
call, which can be used to track execution progress.- Return type:
str
- async stop_plan(component_name: viam.proto.common.ResourceName, *, extra: Mapping[str, viam.utils.ValueTypes] | None = None, timeout: float | None = None)[source]
Stop a component being moved by an in progress
move_on_globe()
ormove_on_map()
call.# Assuming a `move_on_globe()` started the execution # Stop the base component which was instructed to move by `move_on_globe()` # or `move_on_map()` my_base_resource_name = Base.get_resource_name("my_base") await motion.stop_plan(component_name=mvmnt_sensor)
- Parameters:
component_name (ResourceName) – The component to stop
- Returns:
None
- async get_plan(component_name: viam.proto.common.ResourceName, last_plan_only: bool = False, execution_id: str | None = None, *, extra: Mapping[str, viam.utils.ValueTypes] | None = None, timeout: float | None = None) viam.proto.service.motion.GetPlanResponse [source]
By default: returns the plan history of the most recent
move_on_globe()
ormove_on_map()
call to move a component.The plan history for executions before the most recent can be requested by providing an ExecutionID in the request.
Returns a result if both of the following conditions are met:
the execution (call to
move_on_globe()
ormove_on_map()
) is still executing or changed state within the last 24 hoursthe robot has not reinitialized
Plans never change.
Replans always create new plans.
Replans share the ExecutionID of the previously executing plan.
All repeated fields are in time ascending order.
motion = MotionClient.from_robot(robot=robot, name="builtin") my_base_resource_name = Base.get_resource_name("my_base") # Get the plan(s) of the base component which was instructed to move by `MoveOnGlobe()` or `MoveOnMap()` resp = await motion.get_plan(component_name=my_base_resource_name)
- Parameters:
component_name (ResourceName) – The component to stop
last_plan_only (Optional[bool]) – If supplied, the response will only return the last plan for the component / execution
execution_id (Optional[str]) – If supplied, the response will only return plans with the provided execution_id
- Returns:
The current PlanWithStatus & replan history which matches the request
- Return type:
GetPlanResponse
(GetPlanResponse)
- async list_plan_statuses(only_active_plans: bool = False, *, extra: Mapping[str, viam.utils.ValueTypes] | None = None, timeout: float | None = None) viam.proto.service.motion.ListPlanStatusesResponse [source]
Returns the statuses of plans created by move_on_globe() or
move_on_map()
calls that meet at least one of the following conditions since the motion service initialized:the plan’s status is in progress
the plan’s status changed state within the last 24 hours
All repeated fields are in chronological order.
motion = MotionClient.from_robot(robot=robot, name="builtin") # List the plan statuses of the motion service within the TTL resp = await motion.list_plan_statuses()
- Parameters:
only_active_plans (Optional[bool]) – If supplied, the response will filter out any plans that are not executing
- Returns:
List of last known statuses with the associated IDs of all plans within the TTL ordered by timestamp in ascending order
- Return type:
ListPlanStatusesResponse
(ListPlanStatusesResponse)
- async get_pose(component_name: viam.proto.common.ResourceName, destination_frame: str, supplemental_transforms: List[viam.proto.common.Transform] | None = None, *, extra: Mapping[str, Any] | None = None, timeout: float | None = None) viam.proto.common.PoseInFrame [source]
Get the Pose and observer frame for any given component on a robot. A
component_name
can be created like this:component_name = Arm.get_resource_name("arm")
Note that the example uses the
Arm
class, but any component class that inherits fromComponentBase
will work (Base
,Gripper
, etc).from viam.components.gripper import Gripper from viam.services.motion import MotionClient # Assume that the connect function is written and will return a valid machine. robot = await connect() motion = MotionClient.from_robot(robot=robot, name="builtin") gripperName = Gripper.get_resource_name("my_gripper") gripperPoseInWorld = await motion.get_pose(component_name=gripperName, destination_frame="world")
- Parameters:
component_name (viam.proto.common.ResourceName) – Name of a component on a robot.
destination_frame (str) – Name of the desired reference frame.
supplemental_transforms (Optional[List[viam.proto.common.Transform]]) – Transforms used to augment the robot’s frame while calculating pose.
- Returns:
Pose of the given component and the frame in which it was observed.
- Return type:
Pose
(PoseInFrame)
- async do_command(command: Mapping[str, viam.utils.ValueTypes], *, timeout: float | None = None, **__) Mapping[str, viam.utils.ValueTypes] [source]
Send/receive arbitrary commands
# Access the motion service motion = MotionClient.from_robot(robot=robot, name="builtin") my_command = { "command": "dosomething", "someparameter": 52 } await motion.do_command(my_command)
- Parameters:
command (Dict[str, ValueTypes]) – The command to execute
- Returns:
Result of the executed command
- Return type:
Dict[str, ValueTypes]
- classmethod from_robot(robot: viam.robot.client.RobotClient, name: str = 'builtin') typing_extensions.Self
Get the service client named
name
from the provided robot.- Parameters:
robot (RobotClient) – The robot
name (str) – The name of the service client
- Returns:
The service client, if it exists on the robot
- Return type:
Self
- classmethod get_resource_name(name: str) viam.proto.common.ResourceName
Get the ResourceName for this Resource with the given name
# Can be used with any resource, using an arm as an example my_arm_name = my_arm.get_resource_name("my_arm")
- Parameters:
name (str) – The name of the Resource
- get_operation(kwargs: Mapping[str, Any]) viam.operations.Operation
Get the
Operation
associated with the currently running function.When writing custom resources, you should get the
Operation
by calling this function and check to see if it’s cancelled. If theOperation
is cancelled, then you can perform any necessary (terminating long running tasks, cleaning up connections, etc. ).- Parameters:
kwargs (Mapping[str, Any]) – The kwargs object containing the operation
- Returns:
The operation associated with this function
- Return type:
- async close()
Safely shut down the resource and prevent further use.
Close must be idempotent. Later configuration may allow a resource to be “open” again. If a resource does not want or need a close function, it is assumed that the resource does not need to return errors when future non-Close methods are called.
await component.close()