Source code for viam.gen.robot.v1.robot_grpc

import abc
import typing
import grpclib.const
import grpclib.client
import grpclib.exceptions
if typing.TYPE_CHECKING:
    import grpclib.server
from ... import common
import google.api.annotations_pb2
import google.protobuf.duration_pb2
import google.protobuf.struct_pb2
import google.protobuf.timestamp_pb2
from ... import robot

[docs]class RobotServiceBase(abc.ABC):
[docs] @abc.abstractmethod async def GetOperations(self, stream: 'grpclib.server.Stream[robot.v1.robot_pb2.GetOperationsRequest, robot.v1.robot_pb2.GetOperationsResponse]') -> None: pass
[docs] @abc.abstractmethod async def GetSessions(self, stream: 'grpclib.server.Stream[robot.v1.robot_pb2.GetSessionsRequest, robot.v1.robot_pb2.GetSessionsResponse]') -> None: pass
[docs] @abc.abstractmethod async def ResourceNames(self, stream: 'grpclib.server.Stream[robot.v1.robot_pb2.ResourceNamesRequest, robot.v1.robot_pb2.ResourceNamesResponse]') -> None: pass
[docs] @abc.abstractmethod async def ResourceRPCSubtypes(self, stream: 'grpclib.server.Stream[robot.v1.robot_pb2.ResourceRPCSubtypesRequest, robot.v1.robot_pb2.ResourceRPCSubtypesResponse]') -> None: pass
[docs] @abc.abstractmethod async def CancelOperation(self, stream: 'grpclib.server.Stream[robot.v1.robot_pb2.CancelOperationRequest, robot.v1.robot_pb2.CancelOperationResponse]') -> None: pass
[docs] @abc.abstractmethod async def BlockForOperation(self, stream: 'grpclib.server.Stream[robot.v1.robot_pb2.BlockForOperationRequest, robot.v1.robot_pb2.BlockForOperationResponse]') -> None: pass
[docs] @abc.abstractmethod async def DiscoverComponents(self, stream: 'grpclib.server.Stream[robot.v1.robot_pb2.DiscoverComponentsRequest, robot.v1.robot_pb2.DiscoverComponentsResponse]') -> None: pass
[docs] @abc.abstractmethod async def FrameSystemConfig(self, stream: 'grpclib.server.Stream[robot.v1.robot_pb2.FrameSystemConfigRequest, robot.v1.robot_pb2.FrameSystemConfigResponse]') -> None: pass
[docs] @abc.abstractmethod async def TransformPose(self, stream: 'grpclib.server.Stream[robot.v1.robot_pb2.TransformPoseRequest, robot.v1.robot_pb2.TransformPoseResponse]') -> None: pass
[docs] @abc.abstractmethod async def TransformPCD(self, stream: 'grpclib.server.Stream[robot.v1.robot_pb2.TransformPCDRequest, robot.v1.robot_pb2.TransformPCDResponse]') -> None: pass
[docs] @abc.abstractmethod async def GetStatus(self, stream: 'grpclib.server.Stream[robot.v1.robot_pb2.GetStatusRequest, robot.v1.robot_pb2.GetStatusResponse]') -> None: pass
[docs] @abc.abstractmethod async def StreamStatus(self, stream: 'grpclib.server.Stream[robot.v1.robot_pb2.StreamStatusRequest, robot.v1.robot_pb2.StreamStatusResponse]') -> None: pass
[docs] @abc.abstractmethod async def StopAll(self, stream: 'grpclib.server.Stream[robot.v1.robot_pb2.StopAllRequest, robot.v1.robot_pb2.StopAllResponse]') -> None: pass
[docs] @abc.abstractmethod async def StartSession(self, stream: 'grpclib.server.Stream[robot.v1.robot_pb2.StartSessionRequest, robot.v1.robot_pb2.StartSessionResponse]') -> None: pass
[docs] @abc.abstractmethod async def SendSessionHeartbeat(self, stream: 'grpclib.server.Stream[robot.v1.robot_pb2.SendSessionHeartbeatRequest, robot.v1.robot_pb2.SendSessionHeartbeatResponse]') -> None: pass
[docs] @abc.abstractmethod async def Log(self, stream: 'grpclib.server.Stream[robot.v1.robot_pb2.LogRequest, robot.v1.robot_pb2.LogResponse]') -> None: pass
[docs] @abc.abstractmethod async def GetCloudMetadata(self, stream: 'grpclib.server.Stream[robot.v1.robot_pb2.GetCloudMetadataRequest, robot.v1.robot_pb2.GetCloudMetadataResponse]') -> None: pass
[docs] @abc.abstractmethod async def RestartModule(self, stream: 'grpclib.server.Stream[robot.v1.robot_pb2.RestartModuleRequest, robot.v1.robot_pb2.RestartModuleResponse]') -> None: pass
[docs] @abc.abstractmethod async def Shutdown(self, stream: 'grpclib.server.Stream[robot.v1.robot_pb2.ShutdownRequest, robot.v1.robot_pb2.ShutdownResponse]') -> None: pass
[docs] @abc.abstractmethod async def GetMachineStatus(self, stream: 'grpclib.server.Stream[robot.v1.robot_pb2.GetMachineStatusRequest, robot.v1.robot_pb2.GetMachineStatusResponse]') -> None: pass
[docs] def __mapping__(self) -> typing.Dict[str, grpclib.const.Handler]: return {'/viam.robot.v1.RobotService/GetOperations': grpclib.const.Handler(self.GetOperations, grpclib.const.Cardinality.UNARY_UNARY, robot.v1.robot_pb2.GetOperationsRequest, robot.v1.robot_pb2.GetOperationsResponse), '/viam.robot.v1.RobotService/GetSessions': grpclib.const.Handler(self.GetSessions, grpclib.const.Cardinality.UNARY_UNARY, robot.v1.robot_pb2.GetSessionsRequest, robot.v1.robot_pb2.GetSessionsResponse), '/viam.robot.v1.RobotService/ResourceNames': grpclib.const.Handler(self.ResourceNames, grpclib.const.Cardinality.UNARY_UNARY, robot.v1.robot_pb2.ResourceNamesRequest, robot.v1.robot_pb2.ResourceNamesResponse), '/viam.robot.v1.RobotService/ResourceRPCSubtypes': grpclib.const.Handler(self.ResourceRPCSubtypes, grpclib.const.Cardinality.UNARY_UNARY, robot.v1.robot_pb2.ResourceRPCSubtypesRequest, robot.v1.robot_pb2.ResourceRPCSubtypesResponse), '/viam.robot.v1.RobotService/CancelOperation': grpclib.const.Handler(self.CancelOperation, grpclib.const.Cardinality.UNARY_UNARY, robot.v1.robot_pb2.CancelOperationRequest, robot.v1.robot_pb2.CancelOperationResponse), '/viam.robot.v1.RobotService/BlockForOperation': grpclib.const.Handler(self.BlockForOperation, grpclib.const.Cardinality.UNARY_UNARY, robot.v1.robot_pb2.BlockForOperationRequest, robot.v1.robot_pb2.BlockForOperationResponse), '/viam.robot.v1.RobotService/DiscoverComponents': grpclib.const.Handler(self.DiscoverComponents, grpclib.const.Cardinality.UNARY_UNARY, robot.v1.robot_pb2.DiscoverComponentsRequest, robot.v1.robot_pb2.DiscoverComponentsResponse), '/viam.robot.v1.RobotService/FrameSystemConfig': grpclib.const.Handler(self.FrameSystemConfig, grpclib.const.Cardinality.UNARY_UNARY, robot.v1.robot_pb2.FrameSystemConfigRequest, robot.v1.robot_pb2.FrameSystemConfigResponse), '/viam.robot.v1.RobotService/TransformPose': grpclib.const.Handler(self.TransformPose, grpclib.const.Cardinality.UNARY_UNARY, robot.v1.robot_pb2.TransformPoseRequest, robot.v1.robot_pb2.TransformPoseResponse), '/viam.robot.v1.RobotService/TransformPCD': grpclib.const.Handler(self.TransformPCD, grpclib.const.Cardinality.UNARY_UNARY, robot.v1.robot_pb2.TransformPCDRequest, robot.v1.robot_pb2.TransformPCDResponse), '/viam.robot.v1.RobotService/GetStatus': grpclib.const.Handler(self.GetStatus, grpclib.const.Cardinality.UNARY_UNARY, robot.v1.robot_pb2.GetStatusRequest, robot.v1.robot_pb2.GetStatusResponse), '/viam.robot.v1.RobotService/StreamStatus': grpclib.const.Handler(self.StreamStatus, grpclib.const.Cardinality.UNARY_STREAM, robot.v1.robot_pb2.StreamStatusRequest, robot.v1.robot_pb2.StreamStatusResponse), '/viam.robot.v1.RobotService/StopAll': grpclib.const.Handler(self.StopAll, grpclib.const.Cardinality.UNARY_UNARY, robot.v1.robot_pb2.StopAllRequest, robot.v1.robot_pb2.StopAllResponse), '/viam.robot.v1.RobotService/StartSession': grpclib.const.Handler(self.StartSession, grpclib.const.Cardinality.UNARY_UNARY, robot.v1.robot_pb2.StartSessionRequest, robot.v1.robot_pb2.StartSessionResponse), '/viam.robot.v1.RobotService/SendSessionHeartbeat': grpclib.const.Handler(self.SendSessionHeartbeat, grpclib.const.Cardinality.UNARY_UNARY, robot.v1.robot_pb2.SendSessionHeartbeatRequest, robot.v1.robot_pb2.SendSessionHeartbeatResponse), '/viam.robot.v1.RobotService/Log': grpclib.const.Handler(self.Log, grpclib.const.Cardinality.UNARY_UNARY, robot.v1.robot_pb2.LogRequest, robot.v1.robot_pb2.LogResponse), '/viam.robot.v1.RobotService/GetCloudMetadata': grpclib.const.Handler(self.GetCloudMetadata, grpclib.const.Cardinality.UNARY_UNARY, robot.v1.robot_pb2.GetCloudMetadataRequest, robot.v1.robot_pb2.GetCloudMetadataResponse), '/viam.robot.v1.RobotService/RestartModule': grpclib.const.Handler(self.RestartModule, grpclib.const.Cardinality.UNARY_UNARY, robot.v1.robot_pb2.RestartModuleRequest, robot.v1.robot_pb2.RestartModuleResponse), '/viam.robot.v1.RobotService/Shutdown': grpclib.const.Handler(self.Shutdown, grpclib.const.Cardinality.UNARY_UNARY, robot.v1.robot_pb2.ShutdownRequest, robot.v1.robot_pb2.ShutdownResponse), '/viam.robot.v1.RobotService/GetMachineStatus': grpclib.const.Handler(self.GetMachineStatus, grpclib.const.Cardinality.UNARY_UNARY, robot.v1.robot_pb2.GetMachineStatusRequest, robot.v1.robot_pb2.GetMachineStatusResponse)}
[docs]class UnimplementedRobotServiceBase(RobotServiceBase):
[docs] async def GetOperations(self, stream: 'grpclib.server.Stream[robot.v1.robot_pb2.GetOperationsRequest, robot.v1.robot_pb2.GetOperationsResponse]') -> None: raise grpclib.exceptions.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
[docs] async def GetSessions(self, stream: 'grpclib.server.Stream[robot.v1.robot_pb2.GetSessionsRequest, robot.v1.robot_pb2.GetSessionsResponse]') -> None: raise grpclib.exceptions.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
[docs] async def ResourceNames(self, stream: 'grpclib.server.Stream[robot.v1.robot_pb2.ResourceNamesRequest, robot.v1.robot_pb2.ResourceNamesResponse]') -> None: raise grpclib.exceptions.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
[docs] async def ResourceRPCSubtypes(self, stream: 'grpclib.server.Stream[robot.v1.robot_pb2.ResourceRPCSubtypesRequest, robot.v1.robot_pb2.ResourceRPCSubtypesResponse]') -> None: raise grpclib.exceptions.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
[docs] async def CancelOperation(self, stream: 'grpclib.server.Stream[robot.v1.robot_pb2.CancelOperationRequest, robot.v1.robot_pb2.CancelOperationResponse]') -> None: raise grpclib.exceptions.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
[docs] async def BlockForOperation(self, stream: 'grpclib.server.Stream[robot.v1.robot_pb2.BlockForOperationRequest, robot.v1.robot_pb2.BlockForOperationResponse]') -> None: raise grpclib.exceptions.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
[docs] async def DiscoverComponents(self, stream: 'grpclib.server.Stream[robot.v1.robot_pb2.DiscoverComponentsRequest, robot.v1.robot_pb2.DiscoverComponentsResponse]') -> None: raise grpclib.exceptions.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
[docs] async def FrameSystemConfig(self, stream: 'grpclib.server.Stream[robot.v1.robot_pb2.FrameSystemConfigRequest, robot.v1.robot_pb2.FrameSystemConfigResponse]') -> None: raise grpclib.exceptions.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
[docs] async def TransformPose(self, stream: 'grpclib.server.Stream[robot.v1.robot_pb2.TransformPoseRequest, robot.v1.robot_pb2.TransformPoseResponse]') -> None: raise grpclib.exceptions.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
[docs] async def TransformPCD(self, stream: 'grpclib.server.Stream[robot.v1.robot_pb2.TransformPCDRequest, robot.v1.robot_pb2.TransformPCDResponse]') -> None: raise grpclib.exceptions.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
[docs] async def GetStatus(self, stream: 'grpclib.server.Stream[robot.v1.robot_pb2.GetStatusRequest, robot.v1.robot_pb2.GetStatusResponse]') -> None: raise grpclib.exceptions.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
[docs] async def StreamStatus(self, stream: 'grpclib.server.Stream[robot.v1.robot_pb2.StreamStatusRequest, robot.v1.robot_pb2.StreamStatusResponse]') -> None: raise grpclib.exceptions.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
[docs] async def StopAll(self, stream: 'grpclib.server.Stream[robot.v1.robot_pb2.StopAllRequest, robot.v1.robot_pb2.StopAllResponse]') -> None: raise grpclib.exceptions.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
[docs] async def StartSession(self, stream: 'grpclib.server.Stream[robot.v1.robot_pb2.StartSessionRequest, robot.v1.robot_pb2.StartSessionResponse]') -> None: raise grpclib.exceptions.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
[docs] async def SendSessionHeartbeat(self, stream: 'grpclib.server.Stream[robot.v1.robot_pb2.SendSessionHeartbeatRequest, robot.v1.robot_pb2.SendSessionHeartbeatResponse]') -> None: raise grpclib.exceptions.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
[docs] async def Log(self, stream: 'grpclib.server.Stream[robot.v1.robot_pb2.LogRequest, robot.v1.robot_pb2.LogResponse]') -> None: raise grpclib.exceptions.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
[docs] async def GetCloudMetadata(self, stream: 'grpclib.server.Stream[robot.v1.robot_pb2.GetCloudMetadataRequest, robot.v1.robot_pb2.GetCloudMetadataResponse]') -> None: raise grpclib.exceptions.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
[docs] async def RestartModule(self, stream: 'grpclib.server.Stream[robot.v1.robot_pb2.RestartModuleRequest, robot.v1.robot_pb2.RestartModuleResponse]') -> None: raise grpclib.exceptions.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
[docs] async def Shutdown(self, stream: 'grpclib.server.Stream[robot.v1.robot_pb2.ShutdownRequest, robot.v1.robot_pb2.ShutdownResponse]') -> None: raise grpclib.exceptions.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
[docs] async def GetMachineStatus(self, stream: 'grpclib.server.Stream[robot.v1.robot_pb2.GetMachineStatusRequest, robot.v1.robot_pb2.GetMachineStatusResponse]') -> None: raise grpclib.exceptions.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
[docs]class RobotServiceStub: def __init__(self, channel: grpclib.client.Channel) -> None: self.GetOperations = grpclib.client.UnaryUnaryMethod(channel, '/viam.robot.v1.RobotService/GetOperations', robot.v1.robot_pb2.GetOperationsRequest, robot.v1.robot_pb2.GetOperationsResponse) self.GetSessions = grpclib.client.UnaryUnaryMethod(channel, '/viam.robot.v1.RobotService/GetSessions', robot.v1.robot_pb2.GetSessionsRequest, robot.v1.robot_pb2.GetSessionsResponse) self.ResourceNames = grpclib.client.UnaryUnaryMethod(channel, '/viam.robot.v1.RobotService/ResourceNames', robot.v1.robot_pb2.ResourceNamesRequest, robot.v1.robot_pb2.ResourceNamesResponse) self.ResourceRPCSubtypes = grpclib.client.UnaryUnaryMethod(channel, '/viam.robot.v1.RobotService/ResourceRPCSubtypes', robot.v1.robot_pb2.ResourceRPCSubtypesRequest, robot.v1.robot_pb2.ResourceRPCSubtypesResponse) self.CancelOperation = grpclib.client.UnaryUnaryMethod(channel, '/viam.robot.v1.RobotService/CancelOperation', robot.v1.robot_pb2.CancelOperationRequest, robot.v1.robot_pb2.CancelOperationResponse) self.BlockForOperation = grpclib.client.UnaryUnaryMethod(channel, '/viam.robot.v1.RobotService/BlockForOperation', robot.v1.robot_pb2.BlockForOperationRequest, robot.v1.robot_pb2.BlockForOperationResponse) self.DiscoverComponents = grpclib.client.UnaryUnaryMethod(channel, '/viam.robot.v1.RobotService/DiscoverComponents', robot.v1.robot_pb2.DiscoverComponentsRequest, robot.v1.robot_pb2.DiscoverComponentsResponse) self.FrameSystemConfig = grpclib.client.UnaryUnaryMethod(channel, '/viam.robot.v1.RobotService/FrameSystemConfig', robot.v1.robot_pb2.FrameSystemConfigRequest, robot.v1.robot_pb2.FrameSystemConfigResponse) self.TransformPose = grpclib.client.UnaryUnaryMethod(channel, '/viam.robot.v1.RobotService/TransformPose', robot.v1.robot_pb2.TransformPoseRequest, robot.v1.robot_pb2.TransformPoseResponse) self.TransformPCD = grpclib.client.UnaryUnaryMethod(channel, '/viam.robot.v1.RobotService/TransformPCD', robot.v1.robot_pb2.TransformPCDRequest, robot.v1.robot_pb2.TransformPCDResponse) self.GetStatus = grpclib.client.UnaryUnaryMethod(channel, '/viam.robot.v1.RobotService/GetStatus', robot.v1.robot_pb2.GetStatusRequest, robot.v1.robot_pb2.GetStatusResponse) self.StreamStatus = grpclib.client.UnaryStreamMethod(channel, '/viam.robot.v1.RobotService/StreamStatus', robot.v1.robot_pb2.StreamStatusRequest, robot.v1.robot_pb2.StreamStatusResponse) self.StopAll = grpclib.client.UnaryUnaryMethod(channel, '/viam.robot.v1.RobotService/StopAll', robot.v1.robot_pb2.StopAllRequest, robot.v1.robot_pb2.StopAllResponse) self.StartSession = grpclib.client.UnaryUnaryMethod(channel, '/viam.robot.v1.RobotService/StartSession', robot.v1.robot_pb2.StartSessionRequest, robot.v1.robot_pb2.StartSessionResponse) self.SendSessionHeartbeat = grpclib.client.UnaryUnaryMethod(channel, '/viam.robot.v1.RobotService/SendSessionHeartbeat', robot.v1.robot_pb2.SendSessionHeartbeatRequest, robot.v1.robot_pb2.SendSessionHeartbeatResponse) self.Log = grpclib.client.UnaryUnaryMethod(channel, '/viam.robot.v1.RobotService/Log', robot.v1.robot_pb2.LogRequest, robot.v1.robot_pb2.LogResponse) self.GetCloudMetadata = grpclib.client.UnaryUnaryMethod(channel, '/viam.robot.v1.RobotService/GetCloudMetadata', robot.v1.robot_pb2.GetCloudMetadataRequest, robot.v1.robot_pb2.GetCloudMetadataResponse) self.RestartModule = grpclib.client.UnaryUnaryMethod(channel, '/viam.robot.v1.RobotService/RestartModule', robot.v1.robot_pb2.RestartModuleRequest, robot.v1.robot_pb2.RestartModuleResponse) self.Shutdown = grpclib.client.UnaryUnaryMethod(channel, '/viam.robot.v1.RobotService/Shutdown', robot.v1.robot_pb2.ShutdownRequest, robot.v1.robot_pb2.ShutdownResponse) self.GetMachineStatus = grpclib.client.UnaryUnaryMethod(channel, '/viam.robot.v1.RobotService/GetMachineStatus', robot.v1.robot_pb2.GetMachineStatusRequest, robot.v1.robot_pb2.GetMachineStatusResponse)