from typing import Any, Dict, Mapping, Optional
from grpclib.client import Channel
from viam.proto.common import DoCommandRequest, DoCommandResponse
from viam.proto.component.servo import (
GetPositionRequest,
GetPositionResponse,
IsMovingRequest,
IsMovingResponse,
MoveRequest,
ServoServiceStub,
StopRequest,
)
from viam.resource.rpc_client_base import ReconfigurableResourceRPCClientBase
from viam.utils import ValueTypes, dict_to_struct, struct_to_dict
from .servo import Servo
[docs]class ServoClient(Servo, ReconfigurableResourceRPCClientBase):
"""
gRPC client for the Servo component.
"""
def __init__(self, name: str, channel: Channel):
self.channel = channel
self.client = ServoServiceStub(channel)
super().__init__(name)
[docs] async def get_position(self, *, extra: Optional[Mapping[str, Any]] = None, timeout: Optional[float] = None) -> int:
if extra is None:
extra = {}
request = GetPositionRequest(name=self.name, extra=dict_to_struct(extra))
response: GetPositionResponse = await self.client.GetPosition(request, timeout=timeout)
return response.position_deg
[docs] async def move(self, angle: int, *, extra: Optional[Mapping[str, Any]] = None, timeout: Optional[float] = None):
if extra is None:
extra = {}
request = MoveRequest(name=self.name, angle_deg=angle, extra=dict_to_struct(extra))
await self.client.Move(request, timeout=timeout)
[docs] async def stop(self, *, extra: Optional[Mapping[str, Any]] = None, timeout: Optional[float] = None):
if extra is None:
extra = {}
request = StopRequest(name=self.name, extra=dict_to_struct(extra))
await self.client.Stop(request, timeout=timeout)
[docs] async def is_moving(self, *, extra: Optional[Dict[str, Any]] = None, timeout: Optional[float] = None) -> bool:
if extra is None:
extra = {}
request = IsMovingRequest(name=self.name)
response: IsMovingResponse = await self.client.IsMoving(request, timeout=timeout)
return response.is_moving
[docs] async def do_command(self, command: Mapping[str, ValueTypes], *, timeout: Optional[float] = None) -> Mapping[str, ValueTypes]:
request = DoCommandRequest(name=self.name, command=dict_to_struct(command))
response: DoCommandResponse = await self.client.DoCommand(request, timeout=timeout)
return struct_to_dict(response.result)