Source code for viam.components.gantry.service

from grpclib.server import Stream

from viam.proto.common import DoCommandRequest, DoCommandResponse, GetGeometriesRequest, GetGeometriesResponse
from viam.proto.component.gantry import (
    GantryServiceBase,
    GetLengthsRequest,
    GetLengthsResponse,
    GetPositionRequest,
    GetPositionResponse,
    HomeRequest,
    HomeResponse,
    IsMovingRequest,
    IsMovingResponse,
    MoveToPositionRequest,
    MoveToPositionResponse,
    StopRequest,
    StopResponse,
)
from viam.resource.rpc_service_base import ResourceRPCServiceBase
from viam.utils import dict_to_struct, struct_to_dict

from .gantry import Gantry


[docs]class GantryRPCService(GantryServiceBase, ResourceRPCServiceBase[Gantry]): """ gRPC Service for a Gantry """ RESOURCE_TYPE = Gantry
[docs] async def GetPosition(self, stream: Stream[GetPositionRequest, GetPositionResponse]) -> None: request = await stream.recv_message() assert request is not None name = request.name gantry = self.get_resource(name) timeout = stream.deadline.time_remaining() if stream.deadline else None position = await gantry.get_position(extra=struct_to_dict(request.extra), timeout=timeout, metadata=stream.metadata) response = GetPositionResponse(positions_mm=position) await stream.send_message(response)
[docs] async def MoveToPosition(self, stream: Stream[MoveToPositionRequest, MoveToPositionResponse]) -> None: request = await stream.recv_message() assert request is not None name = request.name gantry = self.get_resource(name) timeout = stream.deadline.time_remaining() if stream.deadline else None await gantry.move_to_position( list(request.positions_mm), list(request.speeds_mm_per_sec), extra=struct_to_dict(request.extra), timeout=timeout, metadata=stream.metadata, ) response = MoveToPositionResponse() await stream.send_message(response)
[docs] async def Home(self, stream: Stream[HomeRequest, HomeResponse]) -> None: request = await stream.recv_message() assert request is not None name = request.name gantry = self.get_resource(name) timeout = stream.deadline.time_remaining() if stream.deadline else None homed = await gantry.home(extra=struct_to_dict(request.extra), timeout=timeout, metadata=stream.metadata) response = HomeResponse(homed=homed) await stream.send_message(response)
[docs] async def GetLengths(self, stream: Stream[GetLengthsRequest, GetLengthsResponse]) -> None: request = await stream.recv_message() assert request is not None name = request.name gantry = self.get_resource(name) timeout = stream.deadline.time_remaining() if stream.deadline else None lengths = await gantry.get_lengths(extra=struct_to_dict(request.extra), timeout=timeout, metadata=stream.metadata) response = GetLengthsResponse(lengths_mm=lengths) await stream.send_message(response)
[docs] async def Stop(self, stream: Stream[StopRequest, StopResponse]) -> None: request = await stream.recv_message() assert request is not None name = request.name gantry = self.get_resource(name) timeout = stream.deadline.time_remaining() if stream.deadline else None await gantry.stop(extra=struct_to_dict(request.extra), timeout=timeout, metadata=stream.metadata) response = StopResponse() await stream.send_message(response)
[docs] async def IsMoving(self, stream: Stream[IsMovingRequest, IsMovingResponse]) -> None: request = await stream.recv_message() assert request is not None name = request.name gantry = self.get_resource(name) is_moving = await gantry.is_moving() response = IsMovingResponse(is_moving=is_moving) await stream.send_message(response)
[docs] async def DoCommand(self, stream: Stream[DoCommandRequest, DoCommandResponse]) -> None: request = await stream.recv_message() assert request is not None gantry = self.get_resource(request.name) timeout = stream.deadline.time_remaining() if stream.deadline else None result = await gantry.do_command(command=struct_to_dict(request.command), timeout=timeout, metadata=stream.metadata) response = DoCommandResponse(result=dict_to_struct(result)) await stream.send_message(response)
[docs] async def GetGeometries(self, stream: Stream[GetGeometriesRequest, GetGeometriesResponse]) -> None: request = await stream.recv_message() assert request is not None arm = self.get_resource(request.name) timeout = stream.deadline.time_remaining() if stream.deadline else None geometries = await arm.get_geometries(extra=struct_to_dict(request.extra), timeout=timeout) response = GetGeometriesResponse(geometries=geometries) await stream.send_message(response)