Source code for viam.robot.service

from typing import Any, Dict, List, Set

from grpclib.server import Stream

from viam import logging
from viam.components.movement_sensor import MovementSensor
from viam.components.sensor import Sensor
from viam.errors import ViamGRPCError
from viam.proto.common import ResourceName
from viam.proto.robot import (
    ResourceNamesRequest,
    ResourceNamesResponse,
    StopAllRequest,
    StopAllResponse,
    UnimplementedRobotServiceBase,
)
from viam.resource.rpc_service_base import ResourceRPCServiceBase
from viam.utils import resource_names_for_resource, struct_to_dict

LOGGER = logging.getLogger(__name__)


[docs]class RobotService(UnimplementedRobotServiceBase, ResourceRPCServiceBase): def _generate_metadata(self) -> List[ResourceName]: md: Set[ResourceName] = set() for resource in self.manager.resources.values(): # If the resource is a MovementSensor, DO NOT include Sensor as well (it will get added via MovementSensor) if resource.API == Sensor.API and MovementSensor.get_resource_name(resource.name) in self.manager.resources: continue md.update(resource_names_for_resource(resource)) return list(md)
[docs] async def ResourceNames(self, stream: Stream[ResourceNamesRequest, ResourceNamesResponse]) -> None: request = await stream.recv_message() assert request is not None metadata = self._generate_metadata() response = ResourceNamesResponse(resources=metadata) await stream.send_message(response)
[docs] async def StopAll(self, stream: Stream[StopAllRequest, StopAllResponse]) -> None: request = await stream.recv_message() assert request is not None extra: Dict[ResourceName, Dict[str, Any]] = {} for ex in request.extra: extra[ex.name] = struct_to_dict(ex.params) errors: List[str] = [] for component in self.manager.resources.values(): if callable(getattr(component, "stop", None)): try: rn = component.get_resource_name(component.name) if rn in extra: try: await component.stop(extra=extra[rn]) # type: ignore except TypeError: await component.stop() # type: ignore else: await component.stop() # type: ignore except Exception: LOGGER.exception(f"Failed to stop component named {component.name}") errors.append(component.name) if errors: raise ViamGRPCError(f'Failed to stop components named {", ".join(errors)}') await stream.send_message(StopAllResponse())