Source code for viam.components.audio_input.client

from typing import Any, AsyncIterator, Dict, List, Mapping, Optional, Union

from grpclib.client import Channel

from viam.media import MediaStream, MediaStreamWithIterator
from viam.media.audio import Audio
from viam.proto.common import DoCommandRequest, DoCommandResponse, Geometry
from viam.proto.component.audioinput import (
    AudioInputServiceStub,
    ChunksRequest,
    ChunksResponse,
    PropertiesRequest,
    PropertiesResponse,
    SampleFormat,
)
from viam.resource.rpc_client_base import ReconfigurableResourceRPCClientBase
from viam.utils import ValueTypes, dict_to_struct, get_geometries, struct_to_dict

from .audio_input import AudioInput


class AudioInputClient(AudioInput, ReconfigurableResourceRPCClientBase):
    """
    gRPC client for the AudioInput component.
    """

    def __init__(self, name: str, channel: Channel):
        """Create a client for the audio input called ``name`` on ``channel``.

        Args:
            name: Name of the audio input; sent as ``name`` in every request
                and forwarded to the parent initializer.
            channel: grpclib channel used to build the service stub.
        """
        # Kept alongside the stub so the channel the stub was built on stays reachable.
        self.channel = channel
        self.client = AudioInputServiceStub(channel)
        super().__init__(name)

    async def stream(self, *, timeout: Optional[float] = None, **__) -> MediaStream[Audio]:
        """Return a media stream of audio chunks read from the ``Chunks`` RPC.

        The RPC is driven by an async generator, so no request is sent by this
        call itself; the generator body only runs once it is iterated.

        The first message received must carry ``info``; every following
        message must carry ``chunk``. Each yielded ``Audio`` pairs that
        initial ``info`` with one ``chunk``. Audio is always requested as
        ``SAMPLE_FORMAT_FLOAT32_INTERLEAVED``.

        Args:
            timeout: Passed to ``Chunks.open`` as the RPC timeout.
            **__: Extra keyword arguments are accepted and ignored.

        Returns:
            ``MediaStreamWithIterator`` wrapping the async generator.

        Raises:
            TypeError: If the first message is empty and reading the trailing
                metadata did not itself raise.
            AssertionError: If the first message has no ``info`` or a later
                message has no ``chunk``.
        """

        async def read() -> AsyncIterator[Audio]:
            async with self.client.Chunks.open(timeout=timeout) as chunks_stream:
                # end=True: this is the only request message sent on the stream.
                await chunks_stream.send_message(
                    ChunksRequest(name=self.name, sample_format=SampleFormat.SAMPLE_FORMAT_FLOAT32_INTERLEAVED), end=True
                )

                response: Union[ChunksResponse, None] = await chunks_stream.recv_message()
                if not response:
                    await chunks_stream.recv_trailing_metadata()  # causes us to throw appropriate gRPC error.
                    raise TypeError("Response cannot be empty")  # we should never get here, but for typechecking

                # Explicit raise instead of a bare `assert` so the check is not
                # stripped when Python runs with -O; the exception type is unchanged.
                if not response.HasField("info"):
                    raise AssertionError("first Chunks response must contain 'info'")
                info = response.info

                while True:
                    response = await chunks_stream.recv_message()
                    if response is None:
                        # No more messages on the stream.
                        break
                    if not response.HasField("chunk"):
                        raise AssertionError("Chunks response must contain 'chunk'")
                    audio = Audio(info=info, chunk=response.chunk)
                    yield audio

        return MediaStreamWithIterator(read())

    async def get_properties(self, *, timeout: Optional[float] = None, **__) -> AudioInput.Properties:
        """Fetch the audio input's properties via the ``Properties`` RPC.

        Args:
            timeout: Passed to the RPC call as its timeout.
            **__: Extra keyword arguments are accepted and ignored.

        Returns:
            ``AudioInput.Properties`` built from the response with ``from_proto``.
        """
        request = PropertiesRequest(name=self.name)
        response: PropertiesResponse = await self.client.Properties(request, timeout=timeout)
        return AudioInput.Properties.from_proto(response)

    async def do_command(self, command: Mapping[str, ValueTypes], *, timeout: Optional[float] = None, **__) -> Mapping[str, ValueTypes]:
        """Send ``command`` through the ``DoCommand`` RPC and return its result.

        Args:
            command: Mapping converted with ``dict_to_struct`` and sent as the
                request's ``command``.
            timeout: Passed to the RPC call as its timeout.
            **__: Extra keyword arguments are accepted and ignored.

        Returns:
            The response's ``result`` converted back with ``struct_to_dict``.
        """
        request = DoCommandRequest(name=self.name, command=dict_to_struct(command))
        response: DoCommandResponse = await self.client.DoCommand(request, timeout=timeout)
        return struct_to_dict(response.result)

    async def get_geometries(self, *, extra: Optional[Dict[str, Any]] = None, timeout: Optional[float] = None) -> List[Geometry]:
        """Return the geometries reported for this component.

        Args:
            extra: Forwarded unchanged to the ``get_geometries`` helper.
            timeout: Forwarded unchanged to the ``get_geometries`` helper.

        Returns:
            Whatever the ``viam.utils.get_geometries`` helper returns for this
            client and component name.
        """
        # The bare name resolves to the module-level helper imported from
        # viam.utils, not to this method (methods are not in function scope).
        return await get_geometries(self.client, self.name, extra, timeout)