import asyncio
from typing import Any, Dict, Iterable, List, Set
from grpclib.server import Stream
from viam import logging
from viam.components.movement_sensor import MovementSensor
from viam.components.sensor import Sensor
from viam.errors import MethodNotImplementedError, ViamGRPCError
from viam.proto.common import ResourceName
from viam.proto.robot import (
BlockForOperationRequest,
BlockForOperationResponse,
CancelOperationRequest,
CancelOperationResponse,
DiscoverComponentsRequest,
DiscoverComponentsResponse,
FrameSystemConfigRequest,
FrameSystemConfigResponse,
GetCloudMetadataRequest,
GetCloudMetadataResponse,
GetOperationsRequest,
GetOperationsResponse,
GetSessionsRequest,
GetSessionsResponse,
GetStatusRequest,
GetStatusResponse,
LogRequest,
LogResponse,
ResourceNamesRequest,
ResourceNamesResponse,
ResourceRPCSubtypesRequest,
ResourceRPCSubtypesResponse,
RestartModuleRequest,
RestartModuleResponse,
RobotServiceBase,
SendSessionHeartbeatRequest,
SendSessionHeartbeatResponse,
StartSessionRequest,
StartSessionResponse,
Status,
StopAllRequest,
StopAllResponse,
StreamStatusRequest,
StreamStatusResponse,
TransformPCDRequest,
TransformPCDResponse,
TransformPoseRequest,
TransformPoseResponse,
)
from viam.resource.registry import Registry
from viam.resource.rpc_service_base import ResourceRPCServiceBase
from viam.utils import resource_names_for_resource, struct_to_dict
# Module-level logger, obtained from viam's logging module under this module's name.
LOGGER = logging.getLogger(__name__)
class RobotService(RobotServiceBase, ResourceRPCServiceBase):
    """gRPC handlers for the robot service, backed by the resources in ``self.manager``.

    ``self.manager`` is not defined in this class; it is presumably provided by
    ``ResourceRPCServiceBase`` (confirm against that base class).

    RPCs that are not supported raise ``MethodNotImplementedError`` converted to
    its gRPC error.
    """

    def _generate_metadata(self) -> List[ResourceName]:
        """Return the de-duplicated resource names of all managed resources."""
        md: Set[ResourceName] = set()

        for resource in self.manager.resources.values():
            # If the resource is a MovementSensor, DO NOT include Sensor as well (it will get added via MovementSensor)
            if resource.SUBTYPE == Sensor.SUBTYPE and MovementSensor.get_resource_name(resource.name) in self.manager.resources:
                continue

            md.update(resource_names_for_resource(resource))

        return list(md)

    async def _generate_status(self, resource_names: Iterable[ResourceName]) -> List[Status]:
        """Collect one ``Status`` per distinct status name for the managed resources.

        Args:
            resource_names: Names to restrict the result to. A falsy (empty)
                value means "no restriction".

        Returns:
            The statuses created by every registered subtype whose
            ``resource_type`` matches a managed resource, without duplicates.

        Raises:
            The ``grpc_error`` of any ``ViamGRPCError`` raised while creating
            a status.
        """
        statuses: List[Status] = []
        seen_resource_names: Set[ResourceName] = set()

        for resource in self.manager.resources.values():
            for registration in Registry.REGISTERED_SUBTYPES().values():
                if not isinstance(resource, registration.resource_type):
                    continue
                if resource_names and resource.get_resource_name(resource.name) not in resource_names:
                    continue

                # Only the status creation is expected to raise ViamGRPCError,
                # so keep the try body limited to that call.
                try:
                    status = await registration.create_status(resource)
                except ViamGRPCError as e:
                    raise e.grpc_error

                # A resource may match several registrations; report each
                # status name only once.
                if status.name not in seen_resource_names:
                    seen_resource_names.add(status.name)
                    statuses.append(status)

        # The created status may carry a different name than the one checked
        # above, so filter on the status name as well.
        if resource_names:
            statuses = [s for s in statuses if s.name in resource_names]
        return statuses

    async def RestartModule(self, stream: Stream[RestartModuleRequest, RestartModuleResponse]) -> None:
        """No-op: returns without reading the request or sending a response."""
        return None

    async def ResourceNames(self, stream: Stream[ResourceNamesRequest, ResourceNamesResponse]) -> None:
        """Respond with the resource names of all managed resources."""
        request = await stream.recv_message()
        assert request is not None
        metadata = self._generate_metadata()
        response = ResourceNamesResponse(resources=metadata)
        await stream.send_message(response)

    async def GetStatus(self, stream: Stream[GetStatusRequest, GetStatusResponse]) -> None:
        """Respond once with the statuses for ``request.resource_names``."""
        request = await stream.recv_message()
        assert request is not None
        status = await self._generate_status(request.resource_names)
        response = GetStatusResponse(status=status)
        await stream.send_message(response)

    async def StreamStatus(self, stream: Stream[StreamStatusRequest, StreamStatusResponse]) -> None:
        """Send statuses for ``request.resource_names`` repeatedly.

        The pause between messages is ``request.every`` when that duration is
        positive, otherwise 1 second. The loop has no exit condition of its
        own; it runs until sending/sleeping raises or the task is cancelled.
        """
        request = await stream.recv_message()
        assert request is not None

        # Use the full-precision duration. ``Duration.ToSeconds()`` drops the
        # nanos part, which would turn a sub-second ``every`` (e.g. 500ms) into
        # 0 and silently fall back to the 1-second default.
        interval: float = 1
        every = request.every.ToNanoseconds() / 1e9
        if every > 0:
            interval = every

        while True:
            status = await self._generate_status(request.resource_names)
            response = StreamStatusResponse(status=status)
            await stream.send_message(response)
            await asyncio.sleep(interval)

    async def GetOperations(self, stream: Stream[GetOperationsRequest, GetOperationsResponse]) -> None:
        """Not implemented; always raises the corresponding gRPC error."""
        raise MethodNotImplementedError("GetOperations").grpc_error

    async def ResourceRPCSubtypes(self, stream: Stream[ResourceRPCSubtypesRequest, ResourceRPCSubtypesResponse]) -> None:
        """Not implemented; always raises the corresponding gRPC error."""
        raise MethodNotImplementedError("ResourceRPCSubtypes").grpc_error

    async def CancelOperation(self, stream: Stream[CancelOperationRequest, CancelOperationResponse]) -> None:
        """Not implemented; always raises the corresponding gRPC error."""
        raise MethodNotImplementedError("CancelOperation").grpc_error

    async def BlockForOperation(self, stream: Stream[BlockForOperationRequest, BlockForOperationResponse]) -> None:
        """Not implemented; always raises the corresponding gRPC error."""
        raise MethodNotImplementedError("BlockForOperation").grpc_error

    async def FrameSystemConfig(self, stream: Stream[FrameSystemConfigRequest, FrameSystemConfigResponse]) -> None:
        """Not implemented; always raises the corresponding gRPC error."""
        raise MethodNotImplementedError("FrameSystemConfig").grpc_error

    async def DiscoverComponents(self, stream: Stream[DiscoverComponentsRequest, DiscoverComponentsResponse]) -> None:
        """Not implemented; always raises the corresponding gRPC error."""
        raise MethodNotImplementedError("DiscoverComponents").grpc_error

    async def StopAll(self, stream: Stream[StopAllRequest, StopAllResponse]) -> None:
        """Call ``stop`` on every managed resource that has a callable ``stop``.

        Per-resource extra parameters are taken from ``request.extra`` and
        passed as ``extra=...``. If that call raises ``TypeError`` the stop is
        retried without arguments (presumably for ``stop`` implementations
        that do not accept ``extra``).

        Every resource is attempted even if earlier ones fail; failures are
        logged and collected.

        Raises:
            The ``grpc_error`` of a ``ViamGRPCError`` naming every resource
            that failed to stop.
        """
        request = await stream.recv_message()
        assert request is not None

        extra: Dict[ResourceName, Dict[str, Any]] = {}
        for ex in request.extra:
            extra[ex.name] = struct_to_dict(ex.params)

        errors: List[str] = []
        for component in self.manager.resources.values():
            if callable(getattr(component, "stop", None)):
                try:
                    rn = component.get_resource_name(component.name)
                    if rn in extra:
                        try:
                            await component.stop(extra=extra[rn])  # type: ignore
                        except TypeError:
                            await component.stop()  # type: ignore
                    else:
                        await component.stop()  # type: ignore
                except Exception:
                    # Best effort: record the failure and keep stopping the rest.
                    LOGGER.exception(f"Failed to stop component named {component.name}")
                    errors.append(component.name)

        if errors:
            # Raise the gRPC form, like every other handler in this class, so
            # the message reaches the client instead of a generic server error.
            raise ViamGRPCError(f'Failed to stop components named {", ".join(errors)}').grpc_error
        await stream.send_message(StopAllResponse())

    async def GetSessions(self, stream: Stream[GetSessionsRequest, GetSessionsResponse]) -> None:
        """Not implemented; always raises the corresponding gRPC error."""
        raise MethodNotImplementedError("GetSessions").grpc_error

    async def StartSession(self, stream: Stream[StartSessionRequest, StartSessionResponse]) -> None:
        """Not implemented; always raises the corresponding gRPC error."""
        raise MethodNotImplementedError("StartSession").grpc_error

    async def SendSessionHeartbeat(self, stream: Stream[SendSessionHeartbeatRequest, SendSessionHeartbeatResponse]) -> None:
        """Not implemented; always raises the corresponding gRPC error."""
        raise MethodNotImplementedError("SendSessionHeartbeat").grpc_error

    async def Log(self, stream: Stream[LogRequest, LogResponse]) -> None:
        """Not implemented; always raises the corresponding gRPC error."""
        raise MethodNotImplementedError("Log").grpc_error