Source code for viam.services.generic.client

from typing import Any, Mapping, Optional

from grpclib import GRPCError, Status
from grpclib.client import Channel

from viam.proto.common import DoCommandRequest, DoCommandResponse
from viam.proto.service.generic import GenericServiceStub
from viam.resource.rpc_client_base import ReconfigurableResourceRPCClientBase, ResourceRPCClientBase
from viam.utils import ValueTypes, dict_to_struct, struct_to_dict

from .generic import Generic


[docs]class GenericClient(Generic, ReconfigurableResourceRPCClientBase): """ gRPC client for the Generic service. """ def __init__(self, name: str, channel: Channel): self.channel = channel self.client = GenericServiceStub(channel) super().__init__(name)
[docs] async def do_command( self, command: Mapping[str, Any], *, timeout: Optional[float] = None, **kwargs, ) -> Mapping[str, Any]: md = kwargs.get("metadata", self.Metadata()).proto request = DoCommandRequest(name=self.name, command=dict_to_struct(command)) try: response: DoCommandResponse = await self.client.DoCommand(request, timeout=timeout, metadata=md) except GRPCError as e: if e.status == Status.UNIMPLEMENTED: raise NotImplementedError() raise e return struct_to_dict(response.result)
[docs]async def do_command( channel: Channel, name: str, command: Mapping[str, ValueTypes], *, timeout: Optional[float] = None, **kwargs ) -> Mapping[str, ValueTypes]: """Convenience method to allow service clients to execute ``do_command`` functions Args: channel (Channel): A gRPC channel name (str): The name of the component command (Dict[str, Any]): The command to execute Returns: Dict[str, Any]: The result of the executed command """ md = kwargs.get("metadata", ResourceRPCClientBase.Metadata()).proto client = GenericClient(name, channel) return await client.do_command(command, timeout=timeout, metadata=md)