Source code for viam.components.audio_in.client

import uuid
from typing import Any, Dict, List, Mapping, Optional

from grpclib.client import Channel
from grpclib.client import Stream as ClientStream

from viam.proto.common import DoCommandRequest, DoCommandResponse, Geometry, GetPropertiesRequest
from viam.proto.component.audioin import AudioInServiceStub, GetAudioRequest, GetAudioResponse
from viam.resource.rpc_client_base import ReconfigurableResourceRPCClientBase
from viam.streams import StreamWithIterator
from viam.utils import ValueTypes, dict_to_struct, get_geometries, struct_to_dict

from .audio_in import AudioIn


[docs]class AudioInClient(AudioIn, ReconfigurableResourceRPCClientBase): def __init__(self, name: str, channel: Channel) -> None: self.channel = channel self.client = AudioInServiceStub(channel) super().__init__(name)
[docs] async def get_audio( self, codec: str, duration_seconds: float, previous_timestamp_ns: int, *, extra: Optional[Dict[str, Any]] = None, **kwargs, ): request = GetAudioRequest( name=self.name, codec=codec, duration_seconds=duration_seconds, previous_timestamp_nanoseconds=previous_timestamp_ns, request_id=str(uuid.uuid4()), extra=dict_to_struct(extra), ) async def read(): md = kwargs.get("metadata", self.Metadata()).proto audio_stream: ClientStream[GetAudioRequest, GetAudioResponse] async with self.client.GetAudio.open(metadata=md) as audio_stream: try: await audio_stream.send_message(request, end=True) async for response in audio_stream: yield response except Exception as e: raise (e) return StreamWithIterator(read())
[docs] async def get_properties( self, *, timeout: Optional[float] = None, **kwargs, ) -> AudioIn.Properties: md = kwargs.get("metadata", self.Metadata()).proto return await self.client.GetProperties(GetPropertiesRequest(name=self.name), timeout=timeout, metadata=md)
[docs] async def do_command( self, command: Mapping[str, ValueTypes], *, timeout: Optional[float] = None, **kwargs, ) -> Mapping[str, ValueTypes]: md = kwargs.get("metadata", self.Metadata()).proto request = DoCommandRequest(name=self.name, command=dict_to_struct(command)) response: DoCommandResponse = await self.client.DoCommand(request, timeout=timeout, metadata=md) return struct_to_dict(response.result)
[docs] async def get_geometries(self, *, extra: Optional[Dict[str, Any]] = None, timeout: Optional[float] = None, **kwargs) -> List[Geometry]: md = kwargs.get("metadata", self.Metadata()) return await get_geometries(self.client, self.name, extra, timeout, md)