from typing import Any, Dict, List, Mapping, Optional
import uuid
from grpclib.client import Channel
from viam.proto.component.audioin import GetAudioRequest, GetAudioResponse
from viam.proto.common import (
DoCommandRequest,
DoCommandResponse,
GetPropertiesRequest,
Geometry)
from grpclib.client import Stream as ClientStream
from viam.proto.component.audioin import AudioInServiceStub
from viam.resource.rpc_client_base import ReconfigurableResourceRPCClientBase
from viam.streams import StreamWithIterator
from .audio_in import AudioIn
from viam.utils import ValueTypes, dict_to_struct, get_geometries, struct_to_dict
class AudioInClient(AudioIn, ReconfigurableResourceRPCClientBase):
    """gRPC client for an ``AudioIn`` component.

    Every method builds the matching request message and sends it through an
    ``AudioInServiceStub`` bound to the channel given at construction time.
    Each call accepts an optional ``metadata`` keyword argument; when it is
    absent, ``self.Metadata()`` is used instead.
    """

    def __init__(self, name: str, channel: Channel) -> None:
        """Bind the client to ``channel`` and register the resource ``name``.

        Args:
            name: Resource name, forwarded to the parent initializer and sent
                as the ``name`` field of every request.
            channel: grpclib channel used to construct the service stub.
        """
        self.channel = channel
        self.client = AudioInServiceStub(channel)
        super().__init__(name)

    async def get_audio(
        self,
        codec: str,
        duration_seconds: float,
        previous_timestamp_ns: int,
        *,
        extra: Optional[Dict[str, Any]] = None,
        **kwargs,
    ):
        """Start a ``GetAudio`` streaming call and return its response stream.

        The request is built eagerly (including a fresh random ``request_id``),
        but the RPC itself is opened inside an async generator, so nothing is
        sent until that generator is first advanced.

        Args:
            codec: Sent as the request's ``codec`` field.
            duration_seconds: Sent as the request's ``duration_seconds`` field.
            previous_timestamp_ns: Sent as the request's
                ``previous_timestamp_nanoseconds`` field.
            extra: Optional extra options, converted with ``dict_to_struct``.
                ``None`` is treated as an empty mapping.
            **kwargs: May contain ``metadata``; defaults to ``self.Metadata()``.

        Returns:
            A ``StreamWithIterator`` wrapping an async generator that yields
            each ``GetAudioResponse`` received from the server.

        Raises:
            Any exception raised while sending the request or reading the
            stream propagates unchanged to whoever consumes the stream.
        """
        # NOTE(review): dict_to_struct is only ever given mappings elsewhere in
        # this file, so the ``None`` default is normalized before it reaches
        # the helper -- confirm the helper does not treat None specially.
        extra_fields = {} if extra is None else extra

        request = GetAudioRequest(
            name=self.name,
            codec=codec,
            duration_seconds=duration_seconds,
            previous_timestamp_nanoseconds=previous_timestamp_ns,
            request_id=str(uuid.uuid4()),
            extra=dict_to_struct(extra_fields),
        )

        async def read():
            # Metadata is resolved when the generator starts running, not when
            # get_audio() is called.
            md = kwargs.get("metadata", self.Metadata()).proto
            audio_stream: ClientStream[GetAudioRequest, GetAudioResponse]
            async with self.client.GetAudio.open(metadata=md) as audio_stream:
                # end=True: this is the only message the client sends.
                await audio_stream.send_message(request, end=True)
                async for response in audio_stream:
                    yield response

        return StreamWithIterator(read())

    async def get_properties(
        self,
        *,
        timeout: Optional[float] = None,
        **kwargs,
    ) -> AudioIn.Properties:
        """Send a ``GetProperties`` request and return the awaited response.

        Args:
            timeout: Optional timeout passed through to the RPC call.
            **kwargs: May contain ``metadata``; defaults to ``self.Metadata()``.

        Returns:
            The value returned by the stub's ``GetProperties`` call, unmodified.
        """
        md = kwargs.get("metadata", self.Metadata()).proto
        request = GetPropertiesRequest(name=self.name)
        return await self.client.GetProperties(request, timeout=timeout, metadata=md)

    async def do_command(
        self,
        command: Mapping[str, ValueTypes],
        *,
        timeout: Optional[float] = None,
        **kwargs,
    ) -> Mapping[str, ValueTypes]:
        """Send ``command`` through the ``DoCommand`` RPC.

        Args:
            command: Mapping converted to a struct and sent as the request's
                ``command`` field.
            timeout: Optional timeout passed through to the RPC call.
            **kwargs: May contain ``metadata``; defaults to ``self.Metadata()``.

        Returns:
            The response's ``result`` field converted back with
            ``struct_to_dict``.
        """
        md = kwargs.get("metadata", self.Metadata()).proto
        request = DoCommandRequest(name=self.name, command=dict_to_struct(command))
        response: DoCommandResponse = await self.client.DoCommand(request, timeout=timeout, metadata=md)
        return struct_to_dict(response.result)

    async def get_geometries(
        self,
        *,
        extra: Optional[Dict[str, Any]] = None,
        timeout: Optional[float] = None,
        **kwargs,
    ) -> List[Geometry]:
        """Fetch geometries by delegating to the shared ``get_geometries`` helper.

        Args:
            extra: Optional extra options, passed to the helper as-is.
            timeout: Optional timeout, passed to the helper as-is.
            **kwargs: May contain ``metadata``; defaults to ``self.Metadata()``.

        Returns:
            Whatever the helper returns (annotated as a list of ``Geometry``).
        """
        # NOTE(review): unlike the other methods, the metadata object is handed
        # over without ``.proto``; presumably the helper converts it -- confirm.
        md = kwargs.get("metadata", self.Metadata())
        return await get_geometries(self.client, self.name, extra, timeout, md)