Source code for viam.resource.registry

from dataclasses import dataclass
from threading import Lock
from typing import TYPE_CHECKING, Any, Callable, ClassVar, Coroutine, Dict, Generic, Mapping, Type, TypeVar

from google.protobuf.struct_pb2 import Struct
from grpclib.client import Channel

from viam.errors import DuplicateResourceError, ResourceNotFoundError, ValidationError
from viam.proto.robot import Status

from .base import ResourceBase

if TYPE_CHECKING:
    from .rpc_service_base import ResourceRPCServiceBase
    from .types import Model, ResourceCreator, Subtype, Validator

Resource = TypeVar("Resource", bound=ResourceBase)


async def default_create_status(resource: ResourceBase) -> Status:
    """Build the fallback ``Status`` for a resource that does not define a custom one.

    Args:
        resource (ResourceBase): The resource whose status should be reported

    Returns:
        Status: A status holding the resource's name (as produced by ``get_resource_name``) and an empty ``Struct``
    """
    resource_name = resource.get_resource_name(resource.name)
    empty_details = Struct()
    return Status(name=resource_name, status=empty_details)
@dataclass
class ResourceCreatorRegistration:
    """Registration data for a resource creator.

    To make a custom Resource creator available, wrap it in a ``ResourceCreatorRegistration``
    and register that object with the ``Registry``.
    """

    creator: "ResourceCreator"
    """A function that builds a resource from a mapping of dependencies (``ResourceName`` to ``ResourceBase``)
    """

    validator: "Validator" = lambda x: []
    """A function that validates a resource and returns its implicit dependencies.
    When no validator is supplied, the default is a function that returns an empty Sequence
    """
@dataclass
class ResourceRegistration(Generic[Resource]):
    """Registration data for a resource type.

    The registration is generic over the ``ResourceBase`` subclass it describes and bundles the pieces
    needed to work with that resource, such as how to create its RPC client and how to build its status.

    To make a custom Resource type available, create a ``ResourceRegistration`` for it and register
    that object with the ``Registry``.
    """

    resource_type: Type[Resource]
    """The type of the Resource being registered
    """

    rpc_service: Type["ResourceRPCServiceBase"]
    """The type of the resource's RPC service. This must be a ``ResourceRPCServiceBase`` type
    """

    create_rpc_client: Callable[[str, Channel], Resource]
    """A function that creates the RPC client for this resource from a name and a ``Channel``
    """

    create_status: Callable[[Resource], Coroutine[Any, Any, Status]] = default_create_status
    """A function that creates a ``Status`` object for this resource.
    Resources without a custom status type can rely on the default implementation.
    """
class Registry:
    """The global registry of robotic parts.

    **NB** The Registry should almost never be used directly

    The Registry keeps track of the types of Resources that are available on robots using this SDK.
    All the base resource types are pre-registered (for example Arm, Motor).

    If you create a new resource type that is not an extension of any of the existing base resource types,
    then you must register said resource using ``Registry.register_subtype(...)``.
    """

    # Registered subtypes, keyed by the ``SUBTYPE`` attribute of the registered resource type.
    _SUBTYPES: ClassVar[Dict["Subtype", ResourceRegistration]] = {}
    # Registered resource creators, keyed by the string "<subtype>/<model>".
    _RESOURCES: ClassVar[Dict[str, ResourceCreatorRegistration]] = {}
    # Guards every read and write of the two dictionaries above.
    _lock: ClassVar[Lock] = Lock()

    @classmethod
    def register_subtype(cls, registration: ResourceRegistration[Resource]):
        """Register a Subtype with the Registry

        Args:
            registration (ResourceRegistration): Object containing registration data for the subtype

        Raises:
            DuplicateResourceError: Raised if the Subtype to register is already in the registry
            ValidationError: Raised if registration is missing any necessary parameters
        """
        # The resource type must be checked before its SUBTYPE is read; otherwise a missing
        # resource type surfaces as an AttributeError instead of the documented ValidationError.
        if not registration.resource_type:
            raise ValidationError("Passed resource registration does not have correct parameters")

        subtype = registration.resource_type.SUBTYPE
        with cls._lock:
            if subtype in cls._SUBTYPES:
                raise DuplicateResourceError(str(subtype))
            if not (registration.rpc_service and registration.create_rpc_client):
                raise ValidationError("Passed resource registration does not have correct parameters")
            cls._SUBTYPES[subtype] = registration

    @classmethod
    def register_resource_creator(cls, subtype: "Subtype", model: "Model", registration: ResourceCreatorRegistration):
        """Register a specific ``Model`` and validator function for the specific resource ``Subtype`` with the Registry

        Args:
            subtype (Subtype): The Subtype of the resource
            model (Model): The Model of the resource
            registration (ResourceCreatorRegistration): The registration functions of the model

        Raises:
            DuplicateResourceError: Raised if the Subtype and Model pairing is already registered
            ValidationError: Raised if registration does not have creator
        """
        key = f"{subtype}/{model}"
        with cls._lock:
            if key in cls._RESOURCES:
                raise DuplicateResourceError(key)
            if not registration.creator:
                raise ValidationError("A creator function was not provided")
            cls._RESOURCES[key] = registration

    @classmethod
    def lookup_subtype(cls, subtype: "Subtype") -> ResourceRegistration:
        """Lookup and retrieve a registered Subtype by its name

        Args:
            subtype (Subtype): The subtype of the resource

        Raises:
            ResourceNotFoundError: Raised if the Subtype is not registered

        Returns:
            ResourceRegistration: The registration object of the resource
        """
        with cls._lock:
            try:
                return cls._SUBTYPES[subtype]
            except KeyError:
                raise ResourceNotFoundError(subtype.resource_type, subtype.resource_subtype)

    @classmethod
    def lookup_resource_creator(cls, subtype: "Subtype", model: "Model") -> "ResourceCreator":
        """Lookup and retrieve a registered resource creator by its subtype and model

        Args:
            subtype (Subtype): The Subtype of the resource
            model (Model): The Model of the resource

        Raises:
            ResourceNotFoundError: Raised if the Subtype Model pairing is not registered

        Returns:
            ResourceCreator: The function to create the resource
        """
        with cls._lock:
            try:
                return cls._RESOURCES[f"{subtype}/{model}"].creator
            except KeyError:
                raise ResourceNotFoundError(subtype.resource_type, subtype.resource_subtype)

    @classmethod
    def lookup_validator(cls, subtype: "Subtype", model: "Model") -> "Validator":
        """Lookup and retrieve a registered validator function by its subtype and model.
        If the stored registration has no ``validator`` attribute, a validator that returns an empty list is returned instead.

        Args:
            subtype (Subtype): The Subtype of the resource
            model (Model): The Model of the resource

        Raises:
            ResourceNotFoundError: Raised if the Subtype Model pairing is not registered

        Returns:
            Validator: The function to validate the resource
        """
        # Read the shared dictionary under the lock, like every other accessor of the Registry.
        with cls._lock:
            try:
                registration = cls._RESOURCES[f"{subtype}/{model}"]
            except KeyError:
                raise ResourceNotFoundError(subtype.resource_type, subtype.resource_subtype)

        try:
            return registration.validator
        except AttributeError:
            # Registration object without a validator: fall back to "no implicit dependencies".
            return lambda x: []

    @classmethod
    def REGISTERED_SUBTYPES(cls) -> Mapping["Subtype", ResourceRegistration]:
        """The dictionary of all registered resources
        - Key: Subtype of the resource
        - Value: The registration object for the resource

        Returns:
            Mapping[Subtype, ResourceRegistration]: A shallow copy of all registered resources
        """
        with cls._lock:
            return cls._SUBTYPES.copy()

    @classmethod
    def REGISTERED_RESOURCE_CREATORS(cls) -> Mapping[str, "ResourceCreatorRegistration"]:
        """The dictionary of all registered resources
        - Key: subtype/model
        - Value: The ResourceCreatorRegistration for the resource

        Returns:
            Mapping[str, ResourceCreatorRegistration]: A shallow copy of all registered resource creators
        """
        with cls._lock:
            return cls._RESOURCES.copy()