from typing import Any, Dict, List, Mapping, Optional, Tuple
from grpclib.client import Channel
from viam.proto.common import DoCommandRequest, DoCommandResponse, Geometry
from viam.proto.component.motor import (
GetPositionRequest,
GetPositionResponse,
GetPropertiesRequest,
GetPropertiesResponse,
GoForRequest,
GoToRequest,
IsMovingRequest,
IsMovingResponse,
IsPoweredRequest,
IsPoweredResponse,
MotorServiceStub,
ResetZeroPositionRequest,
SetPowerRequest,
SetRPMRequest,
StopRequest,
)
from viam.resource.rpc_client_base import ReconfigurableResourceRPCClientBase
from viam.utils import ValueTypes, dict_to_struct, get_geometries, struct_to_dict
from .motor import Motor
[docs]class MotorClient(Motor, ReconfigurableResourceRPCClientBase):
"""
gRPC client for the Motor component.
"""
def __init__(self, name: str, channel: Channel):
self.channel = channel
self.client = MotorServiceStub(channel)
super().__init__(name)
[docs] async def set_power(
self,
power: float,
*,
extra: Optional[Dict[str, Any]] = None,
timeout: Optional[float] = None,
**kwargs,
):
md = kwargs.get("metadata", self.Metadata()).proto
request = SetPowerRequest(name=self.name, power_pct=power, extra=dict_to_struct(extra))
await self.client.SetPower(request, timeout=timeout, metadata=md)
[docs] async def go_for(
self,
rpm: float,
revolutions: float,
*,
extra: Optional[Dict[str, Any]] = None,
timeout: Optional[float] = None,
**kwargs,
):
md = kwargs.get("metadata", self.Metadata()).proto
request = GoForRequest(name=self.name, rpm=rpm, revolutions=revolutions, extra=dict_to_struct(extra))
await self.client.GoFor(request, timeout=timeout, metadata=md)
[docs] async def go_to(
self,
rpm: float,
position_revolutions: float,
*,
extra: Optional[Dict[str, Any]] = None,
timeout: Optional[float] = None,
**kwargs,
):
md = kwargs.get("metadata", self.Metadata()).proto
request = GoToRequest(name=self.name, rpm=rpm, position_revolutions=position_revolutions, extra=dict_to_struct(extra))
await self.client.GoTo(request, timeout=timeout, metadata=md)
[docs] async def set_rpm(
self,
rpm: float,
*,
extra: Optional[Dict[str, Any]] = None,
timeout: Optional[float] = None,
**kwargs,
):
md = kwargs.get("metadata", self.Metadata()).proto
request = SetRPMRequest(name=self.name, rpm=rpm, extra=dict_to_struct(extra))
await self.client.SetRPM(request, timeout=timeout, metadata=md)
[docs] async def reset_zero_position(
self,
offset: float,
*,
extra: Optional[Dict[str, Any]] = None,
timeout: Optional[float] = None,
**kwargs,
):
md = kwargs.get("metadata", self.Metadata()).proto
request = ResetZeroPositionRequest(name=self.name, offset=offset, extra=dict_to_struct(extra))
await self.client.ResetZeroPosition(request, timeout=timeout, metadata=md)
[docs] async def get_position(
self,
*,
extra: Optional[Dict[str, Any]] = None,
timeout: Optional[float] = None,
**kwargs,
) -> float:
md = kwargs.get("metadata", self.Metadata()).proto
request = GetPositionRequest(name=self.name, extra=dict_to_struct(extra))
response: GetPositionResponse = await self.client.GetPosition(request, timeout=timeout, metadata=md)
return response.position
[docs] async def get_properties(
self,
*,
extra: Optional[Dict[str, Any]] = None,
timeout: Optional[float] = None,
**kwargs,
) -> Motor.Properties:
md = kwargs.get("metadata", self.Metadata()).proto
request = GetPropertiesRequest(name=self.name, extra=dict_to_struct(extra))
response: GetPropertiesResponse = await self.client.GetProperties(request, timeout=timeout, metadata=md)
return Motor.Properties(position_reporting=response.position_reporting)
[docs] async def stop(
self,
*,
extra: Optional[Dict[str, Any]] = None,
timeout: Optional[float] = None,
**kwargs,
):
md = kwargs.get("metadata", self.Metadata()).proto
request = StopRequest(name=self.name, extra=dict_to_struct(extra))
await self.client.Stop(request, timeout=timeout, metadata=md)
[docs] async def is_powered(
self,
*,
extra: Optional[Dict[str, Any]] = None,
timeout: Optional[float] = None,
**kwargs,
) -> Tuple[bool, float]:
md = kwargs.get("metadata", self.Metadata()).proto
request = IsPoweredRequest(name=self.name, extra=dict_to_struct(extra))
response: IsPoweredResponse = await self.client.IsPowered(request, timeout=timeout, metadata=md)
return response.is_on, response.power_pct
[docs] async def is_moving(self, *, timeout: Optional[float] = None, **kwargs) -> bool:
md = kwargs.get("metadata", self.Metadata()).proto
request = IsMovingRequest(name=self.name)
response: IsMovingResponse = await self.client.IsMoving(request, timeout=timeout, metadata=md)
return response.is_moving
[docs] async def do_command(
self,
command: Mapping[str, ValueTypes],
*,
timeout: Optional[float] = None,
**kwargs,
) -> Mapping[str, ValueTypes]:
md = kwargs.get("metadata", self.Metadata()).proto
request = DoCommandRequest(name=self.name, command=dict_to_struct(command))
response: DoCommandResponse = await self.client.DoCommand(request, timeout=timeout, metadata=md)
return struct_to_dict(response.result)
[docs] async def get_geometries(self, *, extra: Optional[Dict[str, Any]] = None, timeout: Optional[float] = None, **kwargs) -> List[Geometry]:
md = kwargs.get("metadata", self.Metadata())
return await get_geometries(self.client, self.name, extra, timeout, md)