Source code for viam.gen.proto.rpc.webrtc.v1.signaling_grpc

import abc
import typing
import grpclib.const
import grpclib.client
if typing.TYPE_CHECKING:
    import grpclib.server
import google.api.annotations_pb2
import google.protobuf.timestamp_pb2
import google.rpc.status_pb2
from ..... import proto

[docs]class SignalingServiceBase(abc.ABC):
[docs] @abc.abstractmethod async def Call(self, stream: 'grpclib.server.Stream[proto.rpc.webrtc.v1.signaling_pb2.CallRequest, proto.rpc.webrtc.v1.signaling_pb2.CallResponse]') -> None: pass
[docs] @abc.abstractmethod async def CallUpdate(self, stream: 'grpclib.server.Stream[proto.rpc.webrtc.v1.signaling_pb2.CallUpdateRequest, proto.rpc.webrtc.v1.signaling_pb2.CallUpdateResponse]') -> None: pass
[docs] @abc.abstractmethod async def Answer(self, stream: 'grpclib.server.Stream[proto.rpc.webrtc.v1.signaling_pb2.AnswerResponse, proto.rpc.webrtc.v1.signaling_pb2.AnswerRequest]') -> None: pass
[docs] @abc.abstractmethod async def OptionalWebRTCConfig(self, stream: 'grpclib.server.Stream[proto.rpc.webrtc.v1.signaling_pb2.OptionalWebRTCConfigRequest, proto.rpc.webrtc.v1.signaling_pb2.OptionalWebRTCConfigResponse]') -> None: pass
[docs] def __mapping__(self) -> typing.Dict[str, grpclib.const.Handler]: return {'/proto.rpc.webrtc.v1.SignalingService/Call': grpclib.const.Handler(self.Call, grpclib.const.Cardinality.UNARY_STREAM, proto.rpc.webrtc.v1.signaling_pb2.CallRequest, proto.rpc.webrtc.v1.signaling_pb2.CallResponse), '/proto.rpc.webrtc.v1.SignalingService/CallUpdate': grpclib.const.Handler(self.CallUpdate, grpclib.const.Cardinality.UNARY_UNARY, proto.rpc.webrtc.v1.signaling_pb2.CallUpdateRequest, proto.rpc.webrtc.v1.signaling_pb2.CallUpdateResponse), '/proto.rpc.webrtc.v1.SignalingService/Answer': grpclib.const.Handler(self.Answer, grpclib.const.Cardinality.STREAM_STREAM, proto.rpc.webrtc.v1.signaling_pb2.AnswerResponse, proto.rpc.webrtc.v1.signaling_pb2.AnswerRequest), '/proto.rpc.webrtc.v1.SignalingService/OptionalWebRTCConfig': grpclib.const.Handler(self.OptionalWebRTCConfig, grpclib.const.Cardinality.UNARY_UNARY, proto.rpc.webrtc.v1.signaling_pb2.OptionalWebRTCConfigRequest, proto.rpc.webrtc.v1.signaling_pb2.OptionalWebRTCConfigResponse)}
[docs]class SignalingServiceStub: def __init__(self, channel: grpclib.client.Channel) -> None: self.Call = grpclib.client.UnaryStreamMethod(channel, '/proto.rpc.webrtc.v1.SignalingService/Call', proto.rpc.webrtc.v1.signaling_pb2.CallRequest, proto.rpc.webrtc.v1.signaling_pb2.CallResponse) self.CallUpdate = grpclib.client.UnaryUnaryMethod(channel, '/proto.rpc.webrtc.v1.SignalingService/CallUpdate', proto.rpc.webrtc.v1.signaling_pb2.CallUpdateRequest, proto.rpc.webrtc.v1.signaling_pb2.CallUpdateResponse) self.Answer = grpclib.client.StreamStreamMethod(channel, '/proto.rpc.webrtc.v1.SignalingService/Answer', proto.rpc.webrtc.v1.signaling_pb2.AnswerResponse, proto.rpc.webrtc.v1.signaling_pb2.AnswerRequest) self.OptionalWebRTCConfig = grpclib.client.UnaryUnaryMethod(channel, '/proto.rpc.webrtc.v1.SignalingService/OptionalWebRTCConfig', proto.rpc.webrtc.v1.signaling_pb2.OptionalWebRTCConfigRequest, proto.rpc.webrtc.v1.signaling_pb2.OptionalWebRTCConfigResponse)