:py:mod:`viam.services.motion` ============================== .. py:module:: viam.services.motion Submodules ---------- .. toctree:: :titlesonly: :maxdepth: 1 client/index.rst Package Contents ---------------- Classes ~~~~~~~ .. autoapisummary:: viam.services.motion.MotionConfiguration viam.services.motion.MotionClient .. py:class:: MotionConfiguration(*, obstacle_detectors: collections.abc.Iterable[global___ObstacleDetector] | None = ..., position_polling_frequency_hz: float | None = ..., obstacle_polling_frequency_hz: float | None = ..., plan_deviation_m: float | None = ..., linear_m_per_sec: float | None = ..., angular_degs_per_sec: float | None = ...) Bases: :py:obj:`google.protobuf.message.Message` Abstract base class for protocol messages. Protocol message classes are almost always generated by the protocol compiler. These generated types subclass Message and implement the methods shown below. .. py:property:: obstacle_detectors :type: google.protobuf.internal.containers.RepeatedCompositeFieldContainer[global___ObstacleDetector] The ObstacleDetectors that will be used for transient obstacle avoidance .. py:attribute:: position_polling_frequency_hz :type: float Sets the frequency to poll for the position of the robot .. py:attribute:: obstacle_polling_frequency_hz :type: float Sets the frequency to poll the vision service(s) for new obstacles .. py:attribute:: plan_deviation_m :type: float Sets the distance in meters that a robot is allowed to deviate from the motion plan .. py:attribute:: linear_m_per_sec :type: float Optional linear velocity to target when moving .. py:attribute:: angular_degs_per_sec :type: float Optional angular velocity to target when turning .. py:method:: HasField(field_name: Literal[_angular_degs_per_sec, b'_angular_degs_per_sec', _linear_m_per_sec, b'_linear_m_per_sec', _obstacle_polling_frequency_hz, b'_obstacle_polling_frequency_hz', _plan_deviation_m, b'_plan_deviation_m', _position_polling_frequency_hz, b'_position_polling_frequency_hz', angular_degs_per_sec, b'angular_degs_per_sec', linear_m_per_sec, b'linear_m_per_sec', obstacle_polling_frequency_hz, b'obstacle_polling_frequency_hz', plan_deviation_m, b'plan_deviation_m', position_polling_frequency_hz, b'position_polling_frequency_hz']) -> bool Checks if a certain field is set for the message. For a oneof group, checks if any field inside is set. Note that if the field_name is not defined in the message descriptor, :exc:`ValueError` will be raised. :param field_name: The name of the field to check for presence. :type field_name: str :returns: Whether a value has been set for the named field. :rtype: bool :raises ValueError: if the `field_name` is not a member of this message. .. py:method:: WhichOneof(oneof_group: Literal[_angular_degs_per_sec, b'_angular_degs_per_sec']) -> Literal[angular_degs_per_sec] | None WhichOneof(oneof_group: Literal[_linear_m_per_sec, b'_linear_m_per_sec']) -> Literal[linear_m_per_sec] | None WhichOneof(oneof_group: Literal[_obstacle_polling_frequency_hz, b'_obstacle_polling_frequency_hz']) -> Literal[obstacle_polling_frequency_hz] | None WhichOneof(oneof_group: Literal[_plan_deviation_m, b'_plan_deviation_m']) -> Literal[plan_deviation_m] | None WhichOneof(oneof_group: Literal[_position_polling_frequency_hz, b'_position_polling_frequency_hz']) -> Literal[position_polling_frequency_hz] | None Returns the name of the field that is set inside a oneof group. If no field is set, returns None. :param oneof_group: the name of the oneof group to check. :type oneof_group: str :returns: The name of the group that is set, or None. :rtype: str or None :raises ValueError: no group with the given name exists .. py:class:: MotionClient(name: str, channel: grpclib.client.Channel) Bases: :py:obj:`viam.services.service_client_base.ServiceClientBase`, :py:obj:`viam.resource.rpc_client_base.ReconfigurableResourceRPCClientBase` Motion is a Viam service that coordinates motion planning across all of the components in a given robot. The motion planning service calculates a valid path that avoids self collision by default. If additional constraints are supplied in the ``world_state`` message, the motion planning service will also account for those. .. py:attribute:: SUBTYPE :type: Final .. py:attribute:: client :type: viam.proto.service.motion.MotionServiceStub .. py:method:: move(component_name: viam.proto.common.ResourceName, destination: viam.proto.common.PoseInFrame, world_state: Optional[viam.proto.common.WorldState] = None, constraints: Optional[viam.proto.service.motion.Constraints] = None, *, extra: Optional[Mapping[str, Any]] = None, timeout: Optional[float] = None) -> bool :async: Plan and execute a movement to move the component specified to its goal destination. Note: Frames designated with respect to components can also be used as the ``component_name`` when calling for a move. This technique allows for planning and moving the frame itself to the ``destination``. To do so, simply create a resource name with originating ReferenceFrame's name. Then pass in the resource name into ``component_name``. Ex:: resource_name = Arm.get_resource_name("externalFrame") success = await MotionServiceClient.move(resource_name, ...) :: motion = MotionClient.from_robot(robot=robot, name="builtin") # Assumes a gripper configured with name "my_gripper" on the machine gripper_name = Gripper.get_resource_name("my_gripper") my_frame = "my_gripper_offset" goal_pose = Pose(x=0, y=0, z=300, o_x=0, o_y=0, o_z=1, theta=0) # Move the gripper moved = await motion.move(component_name=gripper_name, destination=PoseInFrame(reference_frame="myFrame", pose=goal_pose), world_state=worldState, constraints={}, extra={}) :param component_name: Name of a component on a given robot. :type component_name: viam.proto.common.ResourceName :param destination: The destination to move to, expressed as a ``Pose`` and the frame in which it was observed. :type destination: viam.proto.common.PoseInFrame :param world_state: When supplied, the motion service will create a plan that obeys any constraints expressed in the WorldState message. :type world_state: viam.proto.common.WorldState :param constraints: When supplied, the motion service will create a plan that obeys any specified constraints :type constraints: viam.proto.service.motion.Constraints :returns: Whether the move was successful :rtype: bool .. py:method:: move_on_globe(component_name: viam.proto.common.ResourceName, destination: viam.proto.common.GeoPoint, movement_sensor_name: viam.proto.common.ResourceName, obstacles: Optional[Sequence[viam.proto.common.GeoObstacle]] = None, heading: Optional[float] = None, configuration: Optional[viam.proto.service.motion.MotionConfiguration] = None, *, extra: Optional[Mapping[str, viam.utils.ValueTypes]] = None, timeout: Optional[float] = None) -> str :async: Move a component to a specific latitude and longitude, using a ``MovementSensor`` to check the location. ``move_on_globe()`` is non blocking, meaning the motion service will move the component to the destination GPS point after ``move_on_globe()`` returns. Each successful ``move_on_globe()`` call returns a unique ExecutionID which you can use to identify all plans generated during the ``move_on_globe()`` call. You can monitor the progress of the ``move_on_globe()`` call by querying ``get_plan()`` and ``list_plan_statuses()``. :: motion = MotionClient.from_robot(robot=robot, name="builtin") # Get the ResourceNames of the base and movement sensor my_base_resource_name = Base.get_resource_name("my_base") mvmnt_sensor_resource_name = MovementSensor.get_resource_name( "my_movement_sensor") # Define a destination GeoPoint at the GPS coordinates [0, 0] my_destination = movement_sensor.GeoPoint(latitude=0, longitude=0) # Move the base component to the designated geographic location, as reported by the movement sensor execution_id = await motion.move_on_globe( component_name=my_base_resource_name, destination=my_destination, movement_sensor_name=mvmnt_sensor_resource_name) :param component_name: The ResourceName of the base to move. :type component_name: ResourceName :param destination: The location of the component’s destination, represented in geographic notation as a GeoPoint (lat, lng). :type destination: GeoPoint :param movement_sensor_name: The ResourceName of the movement sensor that you want to use to check the machine’s location. :type movement_sensor_name: ResourceName :param obstacles: Obstacles to consider when planning the motion of the component, with each represented as a GeoObstacle. Default: None :type obstacles: Optional[Sequence[GeoObstacle]] :param heading: The compass heading, in degrees, that the machine’s movement sensor should report at the destination point. Range: [0-360) 0: North, 90: East, 180: South, 270: West. Default: None :type heading: Optional[float] :param configuration: The configuration you want to set across this machine for this motion service. This parameter and each of its fields are optional. - obstacle_detectors (Iterable[ObstacleDetector]): The names of each vision service and camera resource pair you want to use for transient obstacle avoidance. - position_polling_frequency_hz (float): The frequency in hz to poll the position of the machine. - obstacle_polling_frequency_hz (float): The frequency in hz to poll the vision service for new obstacles. - plan_deviation_m (float): The distance in meters that the machine can deviate from the motion plan. - linear_m_per_sec (float): Linear velocity this machine should target when moving. - angular_degs_per_sec (float): Angular velocity this machine should target when turning. :type configuration: Optional[MotionConfiguration] :param extra: Extra options to pass to the underlying RPC call. :type extra: Optional[Dict[str, Any]] :param timeout: An option to set how long to wait (in seconds) before calling a time-out and closing the underlying RPC call. :type timeout: Optional[float] :returns: ExecutionID of the ``move_on_globe()`` call, which can be used to track execution progress. :rtype: str .. py:method:: move_on_map(component_name: viam.proto.common.ResourceName, destination: viam.proto.common.Pose, slam_service_name: viam.proto.common.ResourceName, configuration: Optional[viam.proto.service.motion.MotionConfiguration] = None, obstacles: Optional[Iterable[viam.proto.common.Geometry]] = None, *, extra: Optional[Mapping[str, viam.utils.ValueTypes]] = None, timeout: Optional[float] = None) -> str :async: Move a component to a specific pose, using a ``SlamService`` for the SLAM map, using a ``SLAM Service`` to check the location. ``move_on_map()`` is non blocking, meaning the motion service will move the component to the destination Pose point after ``move_on_map()`` returns. Each successful ``move_on_map()`` call returns a unique ExecutionID which you can use to identify all plans generated during the ``move_on_map()`` call. You can monitor the progress of the ``move_on_map()`` call by querying ``get_plan()`` and ``list_plan_statuses()``. :: motion = MotionClient.from_robot(robot=robot, name="builtin") # Get the ResourceNames of the base component and SLAM service my_base_resource_name = Base.get_resource_name("my_base") my_slam_service_name = SLAMClient.get_resource_name("my_slam_service") # Define a destination pose with respect to the origin of the map from the SLAM service "my_slam_service" my_pose = Pose(y=10) # Move the base component to the destination pose of Y=10, a location of # (0, 10, 0) in respect to the origin of the map execution_id = await motion.move_on_map(component_name=my_base_resource_name, destination=my_pose, slam_service_name=my_slam_service_name) :param component_name: The ResourceName of the base to move. :type component_name: ResourceName :param destination: The destination, which can be any Pose with respect to the SLAM map’s origin. :type destination: Pose :param slam_service_name: The ResourceName of the SLAM service from which the SLAM map is requested. :type slam_service_name: ResourceName :param configuration: The configuration you want to set across this machine for this motion service. This parameter and each of its fields are optional. - obstacle_detectors (Iterable[ObstacleDetector]): The names of each vision service and camera resource pair you want to use for transient obstacle avoidance. - position_polling_frequency_hz (float): The frequency in hz to poll the position of the machine. - obstacle_polling_frequency_hz (float): The frequency in hz to poll the vision service for new obstacles. - plan_deviation_m (float): The distance in meters that the machine can deviate from the motion plan. - linear_m_per_sec (float): Linear velocity this machine should target when moving. - angular_degs_per_sec (float): Angular velocity this machine should target when turning. :type configuration: Optional[MotionConfiguration] :param obstacles: Obstacles to be considered for motion planning. :type obstacles: Optional[Iterable[Geometry]] :param extra: Extra options to pass to the underlying RPC call. :type extra: Optional[Dict[str, Any]] :param timeout: An option to set how long to wait (in seconds) before calling a time-out and closing the underlying RPC call. :type timeout: Optional[float] :returns: ExecutionID of the ``move_on_map()`` call, which can be used to track execution progress. :rtype: str .. py:method:: stop_plan(component_name: viam.proto.common.ResourceName, *, extra: Optional[Mapping[str, viam.utils.ValueTypes]] = None, timeout: Optional[float] = None) :async: Stop a component being moved by an in progress ``move_on_globe()`` or ``move_on_map()`` call. :: # Assuming a `move_on_globe()` started the execution # Stop the base component which was instructed to move by `move_on_globe()` # or `move_on_map()` my_base_resource_name = Base.get_resource_name("my_base") await motion.stop_plan(component_name=mvmnt_sensor) :param component_name: The component to stop :type component_name: ResourceName .. py:method:: get_plan(component_name: viam.proto.common.ResourceName, last_plan_only: bool = False, execution_id: Optional[str] = None, *, extra: Optional[Mapping[str, viam.utils.ValueTypes]] = None, timeout: Optional[float] = None) -> viam.proto.service.motion.GetPlanResponse :async: By default: returns the plan history of the most recent ``move_on_globe()`` or ``move_on_map()`` call to move a component. The plan history for executions before the most recent can be requested by providing an ExecutionID in the request. Returns a result if both of the following conditions are met: - the execution (call to ``move_on_globe()`` or ``move_on_map()``) is still executing **or** changed state within the last 24 hours - the robot has not reinitialized Plans never change. Replans always create new plans. Replans share the ExecutionID of the previously executing plan. All repeated fields are in time ascending order. :: motion = MotionClient.from_robot(robot=robot, name="builtin") my_base_resource_name = Base.get_resource_name("my_base") # Get the plan(s) of the base component which was instructed to move by `MoveOnGlobe()` or `MoveOnMap()` resp = await motion.get_plan(component_name=my_base_resource_name) :param component_name: The component to stop :type component_name: ResourceName :param last_plan_only: If supplied, the response will only return the last plan for the component / execution :type last_plan_only: Optional[bool] :param execution_id: If supplied, the response will only return plans with the provided execution_id :type execution_id: Optional[str] :returns: The current PlanWithStatus & replan history which matches the request :rtype: ``GetPlanResponse`` (GetPlanResponse) .. py:method:: list_plan_statuses(only_active_plans: bool = False, *, extra: Optional[Mapping[str, viam.utils.ValueTypes]] = None, timeout: Optional[float] = None) -> viam.proto.service.motion.ListPlanStatusesResponse :async: Returns the statuses of plans created by `move_on_globe()` or ``move_on_map()`` calls that meet at least one of the following conditions since the motion service initialized: - the plan's status is in progress - the plan's status changed state within the last 24 hours All repeated fields are in chronological order. :: motion = MotionClient.from_robot(robot=robot, name="builtin") # List the plan statuses of the motion service within the TTL resp = await motion.list_plan_statuses() :param only_active_plans: If supplied, the response will filter out any plans that are not executing :type only_active_plans: Optional[bool] :returns: List of last known statuses with the associated IDs of all plans within the TTL ordered by timestamp in ascending order :rtype: ``ListPlanStatusesResponse`` (ListPlanStatusesResponse) .. py:method:: get_pose(component_name: viam.proto.common.ResourceName, destination_frame: str, supplemental_transforms: Optional[List[viam.proto.common.Transform]] = None, *, extra: Optional[Mapping[str, Any]] = None, timeout: Optional[float] = None) -> viam.proto.common.PoseInFrame :async: Get the Pose and observer frame for any given component on a robot. A ``component_name`` can be created like this:: component_name = Arm.get_resource_name("arm") Note that the example uses the ``Arm`` class, but any component class that inherits from ``ComponentBase`` will work (``Base``, ``Gripper``, etc). :: from viam.components.gripper import Gripper from viam.services.motion import MotionClient # Assume that the connect function is written and will return a valid machine. robot = await connect() motion = MotionClient.from_robot(robot=robot, name="builtin") gripperName = Gripper.get_resource_name("my_gripper") gripperPoseInWorld = await motion.get_pose(component_name=gripperName, destination_frame="world") :param component_name: Name of a component on a robot. :type component_name: viam.proto.common.ResourceName :param destination_frame: Name of the desired reference frame. :type destination_frame: str :param supplemental_transforms: Transforms used to augment the robot's frame while calculating pose. :type supplemental_transforms: Optional[List[viam.proto.common.Transform]] :returns: Pose of the given component and the frame in which it was observed. :rtype: ``Pose`` (PoseInFrame) .. py:method:: do_command(command: Mapping[str, viam.utils.ValueTypes], *, timeout: Optional[float] = None, **__) -> Mapping[str, viam.utils.ValueTypes] :async: Send/receive arbitrary commands :: # Access the motion service motion = MotionClient.from_robot(robot=robot, name="builtin") my_command = { "command": "dosomething", "someparameter": 52 } await motion.do_command(my_command) :param command: The command to execute :type command: Dict[str, ValueTypes] :returns: Result of the executed command :rtype: Dict[str, ValueTypes] .. py:method:: from_robot(robot: viam.robot.client.RobotClient, name: str = 'builtin') -> typing_extensions.Self :classmethod: Get the service client named ``name`` from the provided robot. :param robot: The robot :type robot: RobotClient :param name: The name of the service client :type name: str :returns: The service client, if it exists on the robot :rtype: Self .. py:method:: get_resource_name(name: str) -> viam.proto.common.ResourceName :classmethod: Get the ResourceName for this Resource with the given name :: # Can be used with any resource, using an arm as an example my_arm_name = my_arm.get_resource_name("my_arm") :param name: The name of the Resource :type name: str :returns: The ResourceName of this Resource :rtype: ResourceName .. py:method:: get_operation(kwargs: Mapping[str, Any]) -> viam.operations.Operation Get the ``Operation`` associated with the currently running function. When writing custom resources, you should get the ``Operation`` by calling this function and check to see if it's cancelled. If the ``Operation`` is cancelled, then you can perform any necessary (terminating long running tasks, cleaning up connections, etc. ). :param kwargs: The kwargs object containing the operation :type kwargs: Mapping[str, Any] :returns: The operation associated with this function :rtype: viam.operations.Operation .. py:method:: close() :async: Safely shut down the resource and prevent further use. Close must be idempotent. Later configuration may allow a resource to be "open" again. If a resource does not want or need a close function, it is assumed that the resource does not need to return errors when future non-Close methods are called. :: await component.close()