Source code for viam.components.audio_input.service
import wave
from datetime import timedelta
from io import BytesIO
from google.api.httpbody_pb2 import HttpBody # type: ignore
from grpclib import GRPCError, Status
from grpclib.server import Stream
from viam.errors import NotSupportedError
from viam.proto.common import DoCommandRequest, DoCommandResponse, GetGeometriesRequest, GetGeometriesResponse
from viam.proto.component.audioinput import (
AudioInputServiceBase,
ChunksRequest,
ChunksResponse,
PropertiesRequest,
PropertiesResponse,
RecordRequest,
SampleFormat,
)
from viam.resource.rpc_service_base import ResourceRPCServiceBase
from viam.utils import dict_to_struct, struct_to_dict
from .audio_input import AudioInput
[docs]class AudioInputRPCService(AudioInputServiceBase, ResourceRPCServiceBase[AudioInput]):
"""
gRPC Service for a generic AudioInput
"""
RESOURCE_TYPE = AudioInput
[docs] async def Chunks(self, stream: Stream[ChunksRequest, ChunksResponse]) -> None:
request = await stream.recv_message()
assert request is not None
audio_input = self.get_resource(request.name)
timeout = stream.deadline.time_remaining() if stream.deadline else None
audio_stream = await audio_input.stream(timeout=timeout, metadata=stream.metadata)
first_chunk = await audio_stream.__anext__()
await stream.send_message(ChunksResponse(info=first_chunk.info))
await stream.send_message(ChunksResponse(chunk=first_chunk.chunk))
async for audio in audio_stream:
await stream.send_message(ChunksResponse(chunk=audio.chunk))
[docs] async def Properties(self, stream: Stream[PropertiesRequest, PropertiesResponse]) -> None:
request = await stream.recv_message()
assert request is not None
audio_input = self.get_resource(request.name)
timeout = stream.deadline.time_remaining() if stream.deadline else None
response = (await audio_input.get_properties(timeout=timeout, metadata=stream.metadata)).proto
await stream.send_message(response)
[docs] async def Record(self, stream: Stream[RecordRequest, HttpBody]) -> None: # pyright: ignore [reportInvalidTypeForm]
raise NotSupportedError("Recording audio input is not supported").grpc_error
# TODO: Eventually implement recording
request = await stream.recv_message()
assert request is not None
duration = request.duration.ToTimedelta()
if duration.total_seconds() == 0:
duration = timedelta(seconds=1)
if duration.total_seconds() > 5:
raise GRPCError(Status.INVALID_ARGUMENT, "Can only record up to 5 seconds")
audio_input = self.get_resource(request.name)
audio_stream = await audio_input.stream()
first_chunk = await audio_stream.__anext__()
num_chunks = int(duration.total_seconds() * float(first_chunk.info.sampling_rate / first_chunk.chunk.length))
sample_width: int
if first_chunk.info.sample_format == SampleFormat.SAMPLE_FORMAT_INT16_INTERLEAVED:
sample_width = 2
elif first_chunk.info.sample_format == SampleFormat.SAMPLE_FORMAT_FLOAT32_INTERLEAVED:
sample_width = 4
else:
raise GRPCError(Status.INVALID_ARGUMENT, "Unspecified type of audio buffer")
output = BytesIO()
wav_file = wave.open(output, "w")
wav_file.setnchannels(first_chunk.info.channels)
wav_file.setframerate(first_chunk.info.sampling_rate)
wav_file.setsampwidth(sample_width)
try:
wav_file.writeframes(first_chunk.chunk.data)
for _ in range(num_chunks - 1):
chunk = await audio_stream.__anext__()
wav_file.writeframes(chunk.chunk.data)
finally:
wav_file.close()
output.close()
output.seek(0)
response = HttpBody(data=output.read(), content_type="audio/wav")
await stream.send_message(response)
[docs] async def DoCommand(self, stream: Stream[DoCommandRequest, DoCommandResponse]) -> None:
request = await stream.recv_message()
assert request is not None
audio_input = self.get_resource(request.name)
timeout = stream.deadline.time_remaining() if stream.deadline else None
result = await audio_input.do_command(command=struct_to_dict(request.command), timeout=timeout, metadata=stream.metadata)
response = DoCommandResponse(result=dict_to_struct(result))
await stream.send_message(response)
[docs] async def GetGeometries(self, stream: Stream[GetGeometriesRequest, GetGeometriesResponse]) -> None:
request = await stream.recv_message()
assert request is not None
audio_input = self.get_resource(request.name)
timeout = stream.deadline.time_remaining() if stream.deadline else None
geometries = await audio_input.get_geometries(extra=struct_to_dict(request.extra), timeout=timeout)
response = GetGeometriesResponse(geometries=geometries)
await stream.send_message(response)