Source code for viam.robot.service

import asyncio
from typing import Any, Dict, Iterable, List, Set

from grpclib.server import Stream

from viam import logging
from viam.components.movement_sensor import MovementSensor
from viam.components.sensor import Sensor
from viam.errors import ViamGRPCError
from viam.proto.common import ResourceName
from viam.proto.robot import (
    GetStatusRequest,
    GetStatusResponse,
    ResourceNamesRequest,
    ResourceNamesResponse,
    Status,
    StopAllRequest,
    StopAllResponse,
    StreamStatusRequest,
    StreamStatusResponse,
    UnimplementedRobotServiceBase,
)
from viam.resource.registry import Registry
from viam.resource.rpc_service_base import ResourceRPCServiceBase
from viam.utils import resource_names_for_resource, struct_to_dict

LOGGER = logging.getLogger(__name__)


[docs]class RobotService(UnimplementedRobotServiceBase, ResourceRPCServiceBase): def _generate_metadata(self) -> List[ResourceName]: md: Set[ResourceName] = set() for resource in self.manager.resources.values(): # If the resource is a MovementSensor, DO NOT include Sensor as well (it will get added via MovementSensor) if resource.SUBTYPE == Sensor.SUBTYPE and MovementSensor.get_resource_name(resource.name) in self.manager.resources: continue md.update(resource_names_for_resource(resource)) return list(md) async def _generate_status(self, resource_names: Iterable[ResourceName]) -> List[Status]: statuses: List[Status] = [] seen_resource_names: Set[ResourceName] = set() for resource in self.manager.resources.values(): for registration in Registry.REGISTERED_SUBTYPES().values(): if isinstance(resource, registration.resource_type): if resource_names and resource.get_resource_name(resource.name) not in resource_names: continue try: status = await registration.create_status(resource) if status.name not in seen_resource_names: seen_resource_names.add(status.name) statuses.append(status) except ViamGRPCError as e: raise e.grpc_error if resource_names: statuses = [s for s in statuses if s.name in resource_names] return statuses
[docs] async def ResourceNames(self, stream: Stream[ResourceNamesRequest, ResourceNamesResponse]) -> None: request = await stream.recv_message() assert request is not None metadata = self._generate_metadata() response = ResourceNamesResponse(resources=metadata) await stream.send_message(response)
[docs] async def GetStatus(self, stream: Stream[GetStatusRequest, GetStatusResponse]) -> None: request = await stream.recv_message() assert request is not None status = await self._generate_status(request.resource_names) response = GetStatusResponse(status=status) await stream.send_message(response)
[docs] async def StreamStatus(self, stream: Stream[StreamStatusRequest, StreamStatusResponse]) -> None: request = await stream.recv_message() assert request is not None interval = 1 every = request.every.ToSeconds() if every > 0: interval = every while True: status = await self._generate_status(request.resource_names) response = StreamStatusResponse(status=status) await stream.send_message(response) await asyncio.sleep(interval)
[docs] async def StopAll(self, stream: Stream[StopAllRequest, StopAllResponse]) -> None: request = await stream.recv_message() assert request is not None extra: Dict[ResourceName, Dict[str, Any]] = {} for ex in request.extra: extra[ex.name] = struct_to_dict(ex.params) errors: List[str] = [] for component in self.manager.resources.values(): if callable(getattr(component, "stop", None)): try: rn = component.get_resource_name(component.name) if rn in extra: try: await component.stop(extra=extra[rn]) # type: ignore except TypeError: await component.stop() # type: ignore else: await component.stop() # type: ignore except Exception: LOGGER.exception(f"Failed to stop component named {component.name}") errors.append(component.name) if errors: raise ViamGRPCError(f'Failed to stop components named {", ".join(errors)}') await stream.send_message(StopAllResponse())