# Source code for viam.services.motion.client

from typing import Any, Mapping, Optional, Sequence

from grpclib.client import Channel

from viam.proto.common import (
    DoCommandRequest,
    DoCommandResponse,
    GeoGeometry,
    Geometry,
    GeoPoint,
    Pose,
    PoseInFrame,
    ResourceName,
    Transform,
    WorldState,
)
from viam.proto.service.motion import (
    Constraints,
    GetPlanRequest,
    GetPlanResponse,
    GetPoseRequest,
    GetPoseResponse,
    ListPlanStatusesRequest,
    ListPlanStatusesResponse,
    MotionConfiguration,
    MotionServiceStub,
    MoveOnGlobeRequest,
    MoveOnGlobeResponse,
    MoveOnMapRequest,
    MoveOnMapResponse,
    MoveRequest,
    MoveResponse,
    PlanStatusWithID,
    StopPlanRequest,
    StopPlanResponse,
)
from viam.resource.rpc_client_base import ReconfigurableResourceRPCClientBase
from viam.utils import ValueTypes, dict_to_struct, struct_to_dict

from .motion import Motion


class MotionClient(Motion, ReconfigurableResourceRPCClientBase):
    """
    gRPC client for the Motion service.

    Each RPC method builds the matching request message, addresses it with
    ``self.name``, awaits the corresponding ``MotionServiceStub`` call and
    returns either the whole response or one field of it.

    Keyword arguments shared by the RPC methods:

    * ``extra``: optional mapping converted with ``dict_to_struct`` and sent
      in the request's ``extra`` field. ``None`` (the default) is replaced by
      an empty dict before conversion, so the converter is always handed a
      mapping.
    * ``timeout``: forwarded unchanged as the ``timeout`` of the stub call.
    * ``**kwargs``: only the ``"metadata"`` key is read. Its ``.proto``
      attribute is passed as the call's ``metadata``; when the key is absent
      ``self.Metadata()`` is used instead.
      NOTE(review): ``Metadata`` and ``name`` are not defined in this class;
      presumably they come from the base classes - confirm.
    """

    client: MotionServiceStub

    def __init__(self, name: str, channel: Channel):
        """
        Store ``channel``, create a ``MotionServiceStub`` on it and pass
        ``name`` on to the parent initializer.

        Args:
            name: Name handed to ``super().__init__``.
            channel: grpclib channel used to construct the service stub.
        """
        self.channel = channel
        self.client = MotionServiceStub(channel)
        super().__init__(name)

    async def move(
        self,
        component_name: ResourceName,
        destination: PoseInFrame,
        world_state: Optional[WorldState] = None,
        constraints: Optional[Constraints] = None,
        *,
        extra: Optional[Mapping[str, Any]] = None,
        timeout: Optional[float] = None,
        **kwargs,
    ) -> bool:
        """
        Send a ``MoveRequest`` and return the ``success`` field of the response.

        Args:
            component_name: Sent as the request's ``component_name``.
            destination: Sent as the request's ``destination``.
            world_state: Sent as the request's ``world_state``.
            constraints: Sent as the request's ``constraints``.
            extra: Optional extra parameters (see class docstring).
            timeout: Timeout forwarded to the stub call.

        Returns:
            The ``success`` value of the ``MoveResponse``.
        """
        # dict_to_struct is a mapping-to-Struct converter; never hand it None.
        if extra is None:
            extra = {}
        md = kwargs.get("metadata", self.Metadata()).proto
        request = MoveRequest(
            name=self.name,
            destination=destination,
            component_name=component_name,
            world_state=world_state,
            constraints=constraints,
            extra=dict_to_struct(extra),
        )
        response: MoveResponse = await self.client.Move(request, timeout=timeout, metadata=md)
        return response.success

    async def move_on_globe(
        self,
        component_name: ResourceName,
        destination: GeoPoint,
        movement_sensor_name: ResourceName,
        obstacles: Optional[Sequence[GeoGeometry]] = None,
        heading: Optional[float] = None,
        configuration: Optional[MotionConfiguration] = None,
        *,
        bounding_regions: Optional[Sequence[GeoGeometry]] = None,
        extra: Optional[Mapping[str, ValueTypes]] = None,
        timeout: Optional[float] = None,
        **kwargs,
    ) -> str:
        """
        Send a ``MoveOnGlobeRequest`` and return the response's ``execution_id``.

        Args:
            component_name: Sent as the request's ``component_name``.
            destination: Sent as the request's ``destination``.
            movement_sensor_name: Sent as the request's ``movement_sensor_name``.
            obstacles: Sent as the request's ``obstacles``.
            heading: Sent as the request's ``heading``.
            configuration: Sent as the request's ``motion_configuration``.
            bounding_regions: Sent as the request's ``bounding_regions``.
            extra: Optional extra parameters (see class docstring).
            timeout: Timeout forwarded to the stub call.

        Returns:
            The ``execution_id`` value of the ``MoveOnGlobeResponse``.
        """
        # Normalize the default so the converter always receives a mapping.
        if extra is None:
            extra = {}
        md = kwargs.get("metadata", self.Metadata()).proto
        request = MoveOnGlobeRequest(
            name=self.name,
            component_name=component_name,
            destination=destination,
            movement_sensor_name=movement_sensor_name,
            obstacles=obstacles,
            heading=heading,
            motion_configuration=configuration,
            bounding_regions=bounding_regions,
            extra=dict_to_struct(extra),
        )
        response: MoveOnGlobeResponse = await self.client.MoveOnGlobe(request, timeout=timeout, metadata=md)
        return response.execution_id

    async def move_on_map(
        self,
        component_name: ResourceName,
        destination: Pose,
        slam_service_name: ResourceName,
        configuration: Optional[MotionConfiguration] = None,
        obstacles: Optional[Sequence[Geometry]] = None,
        *,
        extra: Optional[Mapping[str, ValueTypes]] = None,
        timeout: Optional[float] = None,
        **kwargs,
    ) -> str:
        """
        Send a ``MoveOnMapRequest`` and return the response's ``execution_id``.

        Args:
            component_name: Sent as the request's ``component_name``.
            destination: Sent as the request's ``destination``.
            slam_service_name: Sent as the request's ``slam_service_name``.
            configuration: Sent as the request's ``motion_configuration``.
            obstacles: Sent as the request's ``obstacles``.
            extra: Optional extra parameters (see class docstring).
            timeout: Timeout forwarded to the stub call.

        Returns:
            The ``execution_id`` value of the ``MoveOnMapResponse``.
        """
        # Normalize the default so the converter always receives a mapping.
        if extra is None:
            extra = {}
        md = kwargs.get("metadata", self.Metadata()).proto
        request = MoveOnMapRequest(
            name=self.name,
            destination=destination,
            component_name=component_name,
            slam_service_name=slam_service_name,
            motion_configuration=configuration,
            obstacles=obstacles,
            extra=dict_to_struct(extra),
        )
        response: MoveOnMapResponse = await self.client.MoveOnMap(request, timeout=timeout, metadata=md)
        return response.execution_id

    async def stop_plan(
        self,
        component_name: ResourceName,
        *,
        extra: Optional[Mapping[str, ValueTypes]] = None,
        timeout: Optional[float] = None,
        **kwargs,
    ) -> None:
        """
        Send a ``StopPlanRequest``; the response is awaited and discarded.

        Args:
            component_name: Sent as the request's ``component_name``.
            extra: Optional extra parameters (see class docstring).
            timeout: Timeout forwarded to the stub call.
        """
        # Normalize the default so the converter always receives a mapping.
        if extra is None:
            extra = {}
        md = kwargs.get("metadata", self.Metadata()).proto
        request = StopPlanRequest(
            name=self.name,
            component_name=component_name,
            extra=dict_to_struct(extra),
        )
        # The response carries nothing this method returns, so it is not bound.
        await self.client.StopPlan(request, timeout=timeout, metadata=md)

    async def get_plan(
        self,
        component_name: ResourceName,
        last_plan_only: bool = False,
        execution_id: Optional[str] = None,
        *,
        extra: Optional[Mapping[str, ValueTypes]] = None,
        timeout: Optional[float] = None,
        **kwargs,
    ) -> GetPlanResponse:
        """
        Send a ``GetPlanRequest`` and return the full response message.

        Args:
            component_name: Sent as the request's ``component_name``.
            last_plan_only: Sent as the request's ``last_plan_only``.
            execution_id: Sent as the request's ``execution_id``.
            extra: Optional extra parameters (see class docstring).
            timeout: Timeout forwarded to the stub call.

        Returns:
            The ``GetPlanResponse`` exactly as received from the stub.
        """
        # Normalize the default so the converter always receives a mapping.
        if extra is None:
            extra = {}
        md = kwargs.get("metadata", self.Metadata()).proto
        request = GetPlanRequest(
            name=self.name,
            component_name=component_name,
            last_plan_only=last_plan_only,
            execution_id=execution_id,
            extra=dict_to_struct(extra),
        )
        response: GetPlanResponse = await self.client.GetPlan(request, timeout=timeout, metadata=md)
        return response

    async def list_plan_statuses(
        self,
        only_active_plans: bool = False,
        *,
        extra: Optional[Mapping[str, ValueTypes]] = None,
        timeout: Optional[float] = None,
        **kwargs,
    ) -> Sequence[PlanStatusWithID]:
        """
        Send a ``ListPlanStatusesRequest`` and return ``plan_statuses_with_ids``.

        Args:
            only_active_plans: Sent as the request's ``only_active_plans``.
            extra: Optional extra parameters (see class docstring).
            timeout: Timeout forwarded to the stub call.

        Returns:
            The ``plan_statuses_with_ids`` value of the response.
        """
        # Normalize the default so the converter always receives a mapping.
        if extra is None:
            extra = {}
        md = kwargs.get("metadata", self.Metadata()).proto
        request = ListPlanStatusesRequest(
            name=self.name,
            only_active_plans=only_active_plans,
            extra=dict_to_struct(extra),
        )
        response: ListPlanStatusesResponse = await self.client.ListPlanStatuses(request, timeout=timeout, metadata=md)
        return response.plan_statuses_with_ids

    async def get_pose(
        self,
        component_name: ResourceName,
        destination_frame: str,
        supplemental_transforms: Optional[Sequence[Transform]] = None,
        *,
        extra: Optional[Mapping[str, Any]] = None,
        timeout: Optional[float] = None,
        **kwargs,
    ) -> PoseInFrame:
        """
        Send a ``GetPoseRequest`` and return the ``pose`` field of the response.

        Args:
            component_name: Sent as the request's ``component_name``.
            destination_frame: Sent as the request's ``destination_frame``.
            supplemental_transforms: Sent as the request's
                ``supplemental_transforms``.
            extra: Optional extra parameters (see class docstring).
            timeout: Timeout forwarded to the stub call.

        Returns:
            The ``pose`` value of the ``GetPoseResponse``.
        """
        # Normalize the default so the converter always receives a mapping.
        if extra is None:
            extra = {}
        md = kwargs.get("metadata", self.Metadata()).proto
        request = GetPoseRequest(
            name=self.name,
            component_name=component_name,
            destination_frame=destination_frame,
            supplemental_transforms=supplemental_transforms,
            extra=dict_to_struct(extra),
        )
        response: GetPoseResponse = await self.client.GetPose(request, timeout=timeout, metadata=md)
        return response.pose

    async def do_command(
        self,
        command: Mapping[str, ValueTypes],
        *,
        timeout: Optional[float] = None,
        **kwargs,
    ) -> Mapping[str, ValueTypes]:
        """
        Send a ``DoCommandRequest`` and return the response's ``result`` as a dict.

        Args:
            command: Mapping converted with ``dict_to_struct`` and sent as the
                request's ``command``.
            timeout: Timeout forwarded to the stub call.

        Returns:
            ``struct_to_dict`` applied to the ``result`` field of the response.
        """
        md = kwargs.get("metadata", self.Metadata()).proto
        request = DoCommandRequest(name=self.name, command=dict_to_struct(command))
        response: DoCommandResponse = await self.client.DoCommand(request, timeout=timeout, metadata=md)
        return struct_to_dict(response.result)