Source code for viam.components.input.service
import asyncio
from multiprocessing import Pipe
from typing import Optional
from grpclib.server import Stream
from h2.exceptions import StreamClosedError
import viam
from viam.errors import NotSupportedError
from viam.proto.common import DoCommandRequest, DoCommandResponse, GetGeometriesRequest, GetGeometriesResponse
from viam.proto.component.inputcontroller import (
GetControlsRequest,
GetControlsResponse,
GetEventsRequest,
GetEventsResponse,
InputControllerServiceBase,
StreamEventsRequest,
StreamEventsResponse,
TriggerEventRequest,
TriggerEventResponse,
)
from viam.resource.rpc_service_base import ResourceRPCServiceBase
from viam.utils import dict_to_struct, struct_to_dict
from .input import Control, Controller, Event, EventType
# Module-level logger, named after this module; used below to report errors raised while forwarding events.
LOGGER = viam.logging.getLogger(__name__)
class InputControllerRPCService(InputControllerServiceBase, ResourceRPCServiceBase[Controller]):
    """
    gRPC Service for an input controller
    """

    RESOURCE_TYPE = Controller

    async def GetControls(self, stream: Stream[GetControlsRequest, GetControlsResponse]) -> None:
        """Reply with the values of the controls returned by the requested controller's ``get_controls``."""
        request = await stream.recv_message()
        assert request is not None
        name = request.controller
        controller = self.get_resource(name)
        timeout = stream.deadline.time_remaining() if stream.deadline else None
        controls = await controller.get_controls(extra=struct_to_dict(request.extra), timeout=timeout, metadata=stream.metadata)
        response = GetControlsResponse(controls=[c.value for c in controls])
        await stream.send_message(response)

    async def GetEvents(self, stream: Stream[GetEventsRequest, GetEventsResponse]) -> None:
        """Reply with the protobuf form of every event returned by the requested controller's ``get_events``."""
        request = await stream.recv_message()
        assert request is not None
        name = request.controller
        controller = self.get_resource(name)
        timeout = stream.deadline.time_remaining() if stream.deadline else None
        events = await controller.get_events(extra=struct_to_dict(request.extra), timeout=timeout, metadata=stream.metadata)
        pb_events = [e.proto for e in events.values()]
        response = GetEventsResponse(events=pb_events)
        await stream.send_message(response)

    async def StreamEvents(self, stream: Stream[StreamEventsRequest, StreamEventsResponse]) -> None:
        """Register callbacks for the requested controls and forward every resulting event over ``stream``.

        This method returns right after the callbacks are registered; events are sent later, from tasks scheduled
        on the running event loop, until sending fails. At that point the pipe is closed, the callbacks are
        unregistered and the stream is exited.
        """
        request = await stream.recv_message()
        assert request is not None
        name = request.controller
        controller = self.get_resource(name)
        loop = asyncio.get_running_loop()

        # Using Pipes to send event data back to this function so it can be streamed to clients
        # The write pipe is added to the callbacks for a control, so whenever that control sends a watched event,
        # that event is sent through the pipe, where it will be read (further down) and sent over the stream
        pipe_r, pipe_w = Pipe(duplex=False)

        # Set once ``cleanup`` has run. Several in-flight send tasks can fail for the same closed stream and each one
        # calls ``cleanup``; without this guard the second call would hit ``loop.remove_reader`` on a closed pipe
        # (which raises ``OSError``), unregister the callbacks again and schedule a second ``stream.__aexit__``.
        cleaned_up = False

        def unregister_pipe_callbacks():
            # Replace every callback registered for a watched trigger with ``None``.
            for event in request.events:
                triggers = [EventType(et) for et in event.events]
                if triggers:
                    controller.register_control_callback(
                        Control(event.control),
                        triggers,
                        None,
                        extra=struct_to_dict(request.extra),
                    )

        # Remove ctrl functions when this stream is closed
        def cleanup(exc: Optional[Exception] = None):
            nonlocal cleaned_up
            if cleaned_up:
                return
            cleaned_up = True
            loop.remove_reader(pipe_r)
            pipe_w.close()
            pipe_r.close()
            unregister_pipe_callbacks()
            asyncio.create_task(stream.__aexit__(None, exc, None))

        def ctrlFunc(event: Event):
            try:
                pipe_w.send(event)
            except Exception as e:
                LOGGER.error(e)
                cleanup(e)

        # Register the pipe callbacks
        for event in request.events:
            triggers = [EventType(et) for et in event.events]
            if triggers:
                controller.register_control_callback(
                    Control(event.control),
                    triggers,
                    ctrlFunc,
                    extra=struct_to_dict(request.extra),
                )
            cancelled_triggers = [EventType(et) for et in event.cancelled_events]
            if cancelled_triggers:
                controller.register_control_callback(
                    Control(event.control),
                    cancelled_triggers,
                    None,
                    extra=struct_to_dict(request.extra),
                )

        # Asynchronously wait for messages to come over the read pipe and run the READ function whenever the pipe is ready.
        def read():
            ev: Event = pipe_r.recv()
            pb_ev = ev.proto
            response = StreamEventsResponse(event=pb_ev)

            async def send_message():
                try:
                    stream._cancel_done = False  # Undo hack, see below
                    await stream.send_message(response)
                except StreamClosedError:
                    cleanup()
                except Exception as e:
                    cleanup(e)

            loop.create_task(send_message(), name=f"{viam._TASK_PREFIX}-input_send_event")

        loop.add_reader(pipe_r, read)

        # HACK: Keep the stream open when this function returns.
        # When the StreamEvents function returns, the Stream is closed. But we don't want the stream to close because we still need
        # to send events to any clients who have registered callbacks.
        # By setting ``stream._cancel_done`` to ``True``, this tricks grpclib into thinking it already closed the stream, so it doesn't
        # perform any cleanup (like removing the stream). We eventually do want to actually close this stream, so we undo this hack
        # every time we send a message. That way, the trailing metadata is sent when either the server closes or the client disconnects.
        stream._cancel_done = True

    async def TriggerEvent(self, stream: Stream[TriggerEventRequest, TriggerEventResponse]) -> None:
        """Convert the request's event from protobuf and pass it to the requested controller's ``trigger_event``.

        Raises:
            The ``grpc_error`` of a ``NotSupportedError`` raised while converting or triggering the event.
        """
        request = await stream.recv_message()
        assert request is not None
        name = request.controller
        timeout = stream.deadline.time_remaining() if stream.deadline else None
        controller = self.get_resource(name)
        try:
            pb_event = request.event
            event = Event.from_proto(pb_event)
            await controller.trigger_event(event, extra=struct_to_dict(request.extra), timeout=timeout, metadata=stream.metadata)
        except NotSupportedError as e:
            raise e.grpc_error
        response = TriggerEventResponse()
        await stream.send_message(response)

    async def DoCommand(self, stream: Stream[DoCommandRequest, DoCommandResponse]) -> None:
        """Pass the request's command to the requested controller's ``do_command`` and reply with its result."""
        request = await stream.recv_message()
        assert request is not None
        controller = self.get_resource(request.name)
        timeout = stream.deadline.time_remaining() if stream.deadline else None
        result = await controller.do_command(command=struct_to_dict(request.command), timeout=timeout, metadata=stream.metadata)
        response = DoCommandResponse(result=dict_to_struct(result))
        await stream.send_message(response)

    async def GetGeometries(self, stream: Stream[GetGeometriesRequest, GetGeometriesResponse]) -> None:
        """Reply with the geometries returned by the requested controller's ``get_geometries``."""
        request = await stream.recv_message()
        assert request is not None
        controller = self.get_resource(request.name)
        timeout = stream.deadline.time_remaining() if stream.deadline else None
        geometries = await controller.get_geometries(extra=struct_to_dict(request.extra), timeout=timeout)
        response = GetGeometriesResponse(geometries=geometries)
        await stream.send_message(response)