Source code for viam.services.slam.client

from typing import List, Mapping, Optional

from grpclib.client import Channel

from viam.proto.common import DoCommandRequest, DoCommandResponse
from viam.proto.service.slam import (
    GetInternalStateRequest,
    GetInternalStateResponse,
    GetPointCloudMapRequest,
    GetPointCloudMapResponse,
    GetPositionRequest,
    GetPositionResponse,
    GetPropertiesRequest,
    SLAMServiceStub,
)
from viam.resource.rpc_client_base import ReconfigurableResourceRPCClientBase
from viam.utils import ValueTypes, dict_to_struct, struct_to_dict

from . import Pose
from .slam import SLAM


class SLAMClient(SLAM, ReconfigurableResourceRPCClientBase):
    """
    Connect to the SLAMService, which allows the robot to create a map of its surroundings and find its location in that map.

    Each public coroutine builds the matching request message, sends it through ``self.client`` and
    returns (or unpacks) the response. The optional ``metadata`` keyword argument, when supplied, must
    expose a ``proto`` attribute; otherwise a fresh ``self.Metadata()`` is used.
    """

    # gRPC stub used for every request; assigned in ``__init__``.
    client: SLAMServiceStub

    def __init__(self, name: str, channel: Channel):
        """Keep ``channel``, create the service stub on it, then initialise the base classes with ``name``.

        Args:
            name: Name of the SLAM service; sent as the ``name`` field of every request.
            channel: gRPC channel the stub is created on.
        """
        self.channel = channel
        self.client = SLAMServiceStub(channel)
        super().__init__(name)

    async def get_position(self, *, timeout: Optional[float] = None, **kwargs) -> Pose:
        """Call the ``GetPosition`` RPC and return the ``pose`` field of its response.

        Args:
            timeout: Forwarded unchanged to the RPC call.
            **kwargs: Only the ``metadata`` key is read.

        Returns:
            The ``pose`` carried by the ``GetPositionResponse``.
        """
        call_metadata = kwargs.get("metadata", self.Metadata())
        reply: GetPositionResponse = await self.client.GetPosition(
            GetPositionRequest(name=self.name),
            timeout=timeout,
            metadata=call_metadata.proto,
        )
        return reply.pose

    async def get_point_cloud_map(self, return_edited_map: bool = False, *, timeout: Optional[float] = None, **kwargs) -> List[bytes]:
        """Call the ``GetPointCloudMap`` RPC and return the chunk carried by each response message.

        Args:
            return_edited_map: Forwarded as the request's ``return_edited_map`` field.
            timeout: Forwarded unchanged to the RPC call.
            **kwargs: Only the ``metadata`` key is read.

        Returns:
            The ``point_cloud_pcd_chunk`` of every response message, in the order received.
        """
        call_metadata = kwargs.get("metadata", self.Metadata())
        replies: List[GetPointCloudMapResponse] = await self.client.GetPointCloudMap(
            GetPointCloudMapRequest(name=self.name, return_edited_map=return_edited_map),
            timeout=timeout,
            metadata=call_metadata.proto,
        )
        return [reply.point_cloud_pcd_chunk for reply in replies]

    async def get_internal_state(self, *, timeout: Optional[float] = None, **kwargs) -> List[bytes]:
        """Call the ``GetInternalState`` RPC and return the chunk carried by each response message.

        Args:
            timeout: Forwarded unchanged to the RPC call.
            **kwargs: Only the ``metadata`` key is read.

        Returns:
            The ``internal_state_chunk`` of every response message, in the order received.
        """
        call_metadata = kwargs.get("metadata", self.Metadata())
        replies: List[GetInternalStateResponse] = await self.client.GetInternalState(
            GetInternalStateRequest(name=self.name),
            timeout=timeout,
            metadata=call_metadata.proto,
        )
        return [reply.internal_state_chunk for reply in replies]

    async def get_properties(self, *, timeout: Optional[float] = None, **kwargs) -> SLAM.Properties:
        """Call the ``GetProperties`` RPC and return its response object as-is.

        Args:
            timeout: Forwarded unchanged to the RPC call.
            **kwargs: Only the ``metadata`` key is read.

        Returns:
            The response of the ``GetProperties`` call, without any unpacking.
        """
        call_metadata = kwargs.get("metadata", self.Metadata())
        properties = await self.client.GetProperties(
            GetPropertiesRequest(name=self.name),
            timeout=timeout,
            metadata=call_metadata.proto,
        )
        return properties

    async def do_command(self, command: Mapping[str, ValueTypes], *, timeout: Optional[float] = None, **kwargs) -> Mapping[str, ValueTypes]:
        """Send an arbitrary command mapping through the ``DoCommand`` RPC.

        Args:
            command: Mapping converted with ``dict_to_struct`` and sent as the request's ``command`` field.
            timeout: Forwarded unchanged to the RPC call.
            **kwargs: Only the ``metadata`` key is read.

        Returns:
            The response's ``result`` field converted back with ``struct_to_dict``.
        """
        call_metadata = kwargs.get("metadata", self.Metadata())
        payload = dict_to_struct(command)
        reply: DoCommandResponse = await self.client.DoCommand(
            DoCommandRequest(name=self.name, command=payload),
            timeout=timeout,
            metadata=call_metadata.proto,
        )
        return struct_to_dict(reply.result)