Source code for viam.components.base.client

from typing import Any, Dict, List, Mapping, Optional

from grpclib.client import Channel

from viam.proto.common import DoCommandRequest, DoCommandResponse, Geometry
from viam.proto.component.base import (
    BaseServiceStub,
    GetPropertiesRequest,
    GetPropertiesResponse,
    IsMovingRequest,
    IsMovingResponse,
    MoveStraightRequest,
    SetPowerRequest,
    SetVelocityRequest,
    SpinRequest,
    StopRequest,
)
from viam.resource.rpc_client_base import ReconfigurableResourceRPCClientBase
from viam.utils import ValueTypes, dict_to_struct, get_geometries, struct_to_dict

from . import Base, Vector3


[docs]class BaseClient(Base, ReconfigurableResourceRPCClientBase): """ gRPC client for the Base component. """ def __init__(self, name: str, channel: Channel): self.channel = channel self.client = BaseServiceStub(channel) super().__init__(name)
[docs] async def move_straight( self, distance: int, velocity: float, *, extra: Optional[Dict[str, Any]] = None, timeout: Optional[float] = None, **kwargs, ): md = kwargs.get("metadata", self.Metadata()).proto request = MoveStraightRequest( name=self.name, distance_mm=distance, mm_per_sec=velocity, extra=dict_to_struct(extra), ) await self.client.MoveStraight(request, timeout=timeout, metadata=md)
[docs] async def spin( self, angle: float, velocity: float, *, extra: Optional[Dict[str, Any]] = None, timeout: Optional[float] = None, **kwargs, ): md = kwargs.get("metadata", self.Metadata()).proto request = SpinRequest( name=self.name, angle_deg=angle, degs_per_sec=velocity, extra=dict_to_struct(extra), ) await self.client.Spin(request, timeout=timeout, metadata=md)
[docs] async def set_power( self, linear: Vector3, angular: Vector3, *, extra: Optional[Dict[str, Any]] = None, timeout: Optional[float] = None, **kwargs, ): md = kwargs.get("metadata", self.Metadata()).proto request = SetPowerRequest( name=self.name, linear=linear, angular=angular, extra=dict_to_struct(extra), ) await self.client.SetPower(request, timeout=timeout, metadata=md)
[docs] async def set_velocity( self, linear: Vector3, angular: Vector3, *, extra: Optional[Dict[str, Any]] = None, timeout: Optional[float] = None, **kwargs, ): md = kwargs.get("metadata", self.Metadata()).proto request = SetVelocityRequest(name=self.name, linear=linear, angular=angular, extra=dict_to_struct(extra)) await self.client.SetVelocity(request, timeout=timeout, metadata=md)
[docs] async def stop( self, *, extra: Optional[Dict[str, Any]] = None, timeout: Optional[float] = None, **kwargs, ): md = kwargs.get("metadata", self.Metadata()).proto request = StopRequest(name=self.name, extra=dict_to_struct(extra)) await self.client.Stop(request, timeout=timeout, metadata=md)
[docs] async def is_moving( self, *, timeout: Optional[float] = None, **kwargs, ) -> bool: md = kwargs.get("metadata", self.Metadata()).proto request = IsMovingRequest(name=self.name) response: IsMovingResponse = await self.client.IsMoving(request, timeout=timeout, metadata=md) return response.is_moving
[docs] async def get_properties( self, *, extra: Optional[Dict[str, Any]] = None, timeout: Optional[float] = None, **kwargs, ) -> Base.Properties: md = kwargs.get("metadata", self.Metadata()).proto request = GetPropertiesRequest(name=self.name, extra=dict_to_struct(extra)) response: GetPropertiesResponse = await self.client.GetProperties(request, timeout=timeout, metadata=md) return Base.Properties( width_meters=response.width_meters, turning_radius_meters=response.turning_radius_meters, wheel_circumference_meters=response.wheel_circumference_meters, )
[docs] async def do_command( self, command: Mapping[str, ValueTypes], *, timeout: Optional[float] = None, **kwargs, ) -> Mapping[str, ValueTypes]: md = kwargs.get("metadata", self.Metadata()).proto request = DoCommandRequest(name=self.name, command=dict_to_struct(command)) response: DoCommandResponse = await self.client.DoCommand(request, timeout=timeout, metadata=md) return struct_to_dict(response.result)
[docs] async def get_geometries(self, *, extra: Optional[Dict[str, Any]] = None, timeout: Optional[float] = None, **kwargs) -> List[Geometry]: md = kwargs.get("metadata", self.Metadata()) return await get_geometries(self.client, self.name, extra, timeout, md)