Source code for viam.services.discovery.client

from typing import Any, List, Mapping, Optional

from grpclib.client import Channel

from viam.proto.app.robot import ComponentConfig
from viam.proto.common import DoCommandRequest, DoCommandResponse
from viam.proto.service.discovery import (
    DiscoverResourcesRequest,
    DiscoverResourcesResponse,
    DiscoveryServiceStub,
)
from viam.resource.rpc_client_base import ReconfigurableResourceRPCClientBase
from viam.utils import ValueTypes, dict_to_struct, struct_to_dict

from .discovery import Discovery


[docs]class DiscoveryClient(Discovery, ReconfigurableResourceRPCClientBase): """ Connect to the Discovery service, which allows you to discover resources on a machine. """ client: DiscoveryServiceStub def __init__(self, name: str, channel: Channel): super().__init__(name) self.channel = channel self.client = DiscoveryServiceStub(channel)
[docs] async def discover_resources( self, *, extra: Optional[Mapping[str, Any]] = None, timeout: Optional[float] = None, **kwargs, ) -> List[ComponentConfig]: md = kwargs.get("metadata", self.Metadata()).proto request = DiscoverResourcesRequest( name=self.name, extra=dict_to_struct(extra), ) response: DiscoverResourcesResponse = await self.client.DiscoverResources(request, timeout=timeout, metadata=md) return list(response.discoveries)
[docs] async def do_command( self, command: Mapping[str, ValueTypes], *, timeout: Optional[float] = None, **kwargs, ) -> Mapping[str, ValueTypes]: md = kwargs.get("metadata", self.Metadata()).proto request = DoCommandRequest(name=self.name, command=dict_to_struct(command)) response: DoCommandResponse = await self.client.DoCommand(request, timeout=timeout, metadata=md) return struct_to_dict(response.result)