Source code for viam.services.slam.service

from grpclib.server import Stream

from viam.proto.common import DoCommandRequest, DoCommandResponse
from viam.proto.service.slam import (
    GetInternalStateRequest,
    GetInternalStateResponse,
    GetPointCloudMapRequest,
    GetPointCloudMapResponse,
    GetPositionRequest,
    GetPositionResponse,
    GetPropertiesRequest,
    GetPropertiesResponse,
    SLAMServiceBase,
)
from viam.resource.rpc_service_base import ResourceRPCServiceBase
from viam.utils import dict_to_struct, struct_to_dict

from .slam import SLAM


[docs]class SLAMRPCService(SLAMServiceBase, ResourceRPCServiceBase): """ gRPC Service for a SLAM service """ RESOURCE_TYPE = SLAM
[docs] async def GetInternalState(self, stream: Stream[GetInternalStateRequest, GetInternalStateResponse]) -> None: request = await stream.recv_message() assert request is not None name = request.name slam = self.get_resource(name) timeout = stream.deadline.time_remaining() if stream.deadline else None chunks = await slam.get_internal_state(timeout=timeout) for chunk in chunks: response = GetInternalStateResponse(internal_state_chunk=chunk) await stream.send_message(response)
[docs] async def GetPointCloudMap(self, stream: Stream[GetPointCloudMapRequest, GetPointCloudMapResponse]) -> None: request = await stream.recv_message() assert request is not None name = request.name slam = self.get_resource(name) timeout = stream.deadline.time_remaining() if stream.deadline else None chunks = await slam.get_point_cloud_map(return_edited_map=request.return_edited_map, timeout=timeout) for chunk in chunks: response = GetPointCloudMapResponse(point_cloud_pcd_chunk=chunk) await stream.send_message(response)
[docs] async def GetPosition(self, stream: Stream[GetPositionRequest, GetPositionResponse]) -> None: request = await stream.recv_message() assert request is not None name = request.name slam = self.get_resource(name) timeout = stream.deadline.time_remaining() if stream.deadline else None position = await slam.get_position(timeout=timeout) response = GetPositionResponse(pose=position) await stream.send_message(response)
[docs] async def GetProperties(self, stream: Stream[GetPropertiesRequest, GetPropertiesResponse]) -> None: request = await stream.recv_message() assert request is not None slam = self.get_resource(request.name) timeout = stream.deadline.time_remaining() if stream.deadline else None properties = await slam.get_properties(timeout=timeout) await stream.send_message(properties)
[docs] async def DoCommand(self, stream: Stream[DoCommandRequest, DoCommandResponse]) -> None: request = await stream.recv_message() assert request is not None slam = self.get_resource(request.name) timeout = stream.deadline.time_remaining() if stream.deadline else None result = await slam.do_command(command=struct_to_dict(request.command), timeout=timeout, metadata=stream.metadata) response = DoCommandResponse(result=dict_to_struct(result)) await stream.send_message(response)