Source code for viam.components.arm.client

from typing import Any, Dict, List, Mapping, Optional

from grpclib.client import Channel

from viam.components import KinematicsReturn
from viam.proto.common import DoCommandRequest, DoCommandResponse, Geometry, GetKinematicsRequest, GetKinematicsResponse
from viam.proto.component.arm import (
    ArmServiceStub,
    GetEndPositionRequest,
    GetEndPositionResponse,
    GetJointPositionsRequest,
    GetJointPositionsResponse,
    IsMovingRequest,
    IsMovingResponse,
    JointPositions,
    MoveToJointPositionsRequest,
    MoveToPositionRequest,
    StopRequest,
)
from viam.resource.rpc_client_base import ReconfigurableResourceRPCClientBase
from viam.utils import ValueTypes, dict_to_struct, get_geometries, struct_to_dict

from . import Arm, Pose


[docs]class ArmClient(Arm, ReconfigurableResourceRPCClientBase): """ gRPC client for an Arm component. Used to communicate with an existing ``Arm`` implementation over gRPC. """ def __init__(self, name: str, channel: Channel): self.channel = channel self.client = ArmServiceStub(channel) super().__init__(name)
[docs] async def get_end_position( self, *, extra: Optional[Dict[str, Any]] = None, timeout: Optional[float] = None, **kwargs, ) -> Pose: md = kwargs.get("metadata", self.Metadata()).proto request = GetEndPositionRequest(name=self.name, extra=dict_to_struct(extra)) response: GetEndPositionResponse = await self.client.GetEndPosition(request, timeout=timeout, metadata=md) return response.pose
[docs] async def move_to_position( self, pose: Pose, *, extra: Optional[Dict[str, Any]] = None, timeout: Optional[float] = None, **kwargs, ): md = kwargs.get("metadata", self.Metadata()).proto request = MoveToPositionRequest(name=self.name, to=pose, extra=dict_to_struct(extra)) await self.client.MoveToPosition(request, timeout=timeout, metadata=md)
[docs] async def get_joint_positions( self, *, extra: Optional[Dict[str, Any]] = None, timeout: Optional[float] = None, **kwargs, ) -> JointPositions: md = kwargs.get("metadata", self.Metadata()).proto request = GetJointPositionsRequest(name=self.name, extra=dict_to_struct(extra)) response: GetJointPositionsResponse = await self.client.GetJointPositions(request, timeout=timeout, metadata=md) return response.positions
[docs] async def move_to_joint_positions( self, positions: JointPositions, *, extra: Optional[Dict[str, Any]] = None, timeout: Optional[float] = None, **kwargs, ): md = kwargs.get("metadata", self.Metadata()).proto request = MoveToJointPositionsRequest(name=self.name, positions=positions, extra=dict_to_struct(extra)) await self.client.MoveToJointPositions(request, timeout=timeout, metadata=md)
[docs] async def stop( self, *, extra: Optional[Dict[str, Any]] = None, timeout: Optional[float] = None, **kwargs, ): md = kwargs.get("metadata", self.Metadata()).proto request = StopRequest(name=self.name, extra=dict_to_struct(extra)) await self.client.Stop(request, timeout=timeout, metadata=md)
[docs] async def is_moving(self, *, timeout: Optional[float] = None, **kwargs) -> bool: md = kwargs.get("metadata", self.Metadata()).proto request = IsMovingRequest(name=self.name) response: IsMovingResponse = await self.client.IsMoving(request, timeout=timeout, metadata=md) return response.is_moving
[docs] async def do_command( self, command: Mapping[str, Any], *, timeout: Optional[float] = None, **kwargs, ) -> Mapping[str, ValueTypes]: md = kwargs.get("metadata", self.Metadata()).proto request = DoCommandRequest(name=self.name, command=dict_to_struct(command)) response: DoCommandResponse = await self.client.DoCommand(request, timeout=timeout, metadata=md) return struct_to_dict(response.result)
[docs] async def get_kinematics( self, *, extra: Optional[Dict[str, Any]] = None, timeout: Optional[float] = None, **kwargs ) -> KinematicsReturn: md = kwargs.get("metadata", self.Metadata()).proto request = GetKinematicsRequest(name=self.name, extra=dict_to_struct(extra)) response: GetKinematicsResponse = await self.client.GetKinematics(request, timeout=timeout, metadata=md) return (response.format, response.kinematics_data, response.meshes_by_urdf_filepath)
[docs] async def get_geometries(self, *, extra: Optional[Dict[str, Any]] = None, timeout: Optional[float] = None, **kwargs) -> List[Geometry]: md = kwargs.get("metadata", self.Metadata()) return await get_geometries(self.client, self.name, extra, timeout, md)