Source code for viam.resource.manager

from threading import RLock
from typing import Dict, List, Type, TypeVar

from viam.logging import getLogger
from viam.proto.common import ResourceName
from viam.resource.base import ResourceBase
from viam.resource.registry import Registry

from ..components.component_base import ComponentBase
from ..errors import DuplicateResourceError, ResourceNotFoundError
from ..services.service_base import ServiceBase

LOGGER = getLogger(__name__)
ResourceType = TypeVar("ResourceType", bound=ResourceBase)


[docs]class ResourceManager: """ Registry containing all components registered to this server. """ resources: Dict[ResourceName, ResourceBase] _short_to_long_name: Dict[str, List[ResourceName]] _lock: RLock def __init__(self, components: List[ResourceBase] = []) -> None: self._lock = RLock() self.resources = {} self._short_to_long_name = {} for component in components: self.register(component)
[docs] def register(self, resource: ResourceBase): """ Register a new resource with the registry. Resources may not have the same name. If a resource is remote and the short name is unique, save a short name version. Raises: DuplicateResourceError: Error if attempting to register resource with the name of an existing resource ResourceNotFoundError: Raised if the subtype of the resource is not registered Args: resource (ResourceBase): The resource to register """ Registry.lookup_subtype(resource.SUBTYPE) # confirm the subtype is registered in Registry _BaseClasses = (ResourceBase, ComponentBase, ServiceBase) rnames: Dict[ResourceName, ResourceBase] = {} for subtype in resource.__class__.mro(): if subtype in _BaseClasses: continue if hasattr(subtype, "get_resource_name"): rn = subtype.get_resource_name(resource.name) # type: ignore rnames[rn] = resource for rn in rnames: if ":" in rn.name: short_name = rn.name.split(":")[-1] if short_name in self._short_to_long_name and rn not in self._short_to_long_name[short_name]: self._short_to_long_name[short_name].append(rn) elif short_name not in self._short_to_long_name: self._short_to_long_name[short_name] = [rn] if rnames.keys() & self.resources.keys(): raise DuplicateResourceError(resource.name) with self._lock: self.resources.update(rnames)
[docs] def get_resource(self, of_type: Type[ResourceType], name: ResourceName) -> ResourceType: """ Return a resource from the registry. If a unique short name version is given, return a remote resource with the name. Args: of_type (Type[ResourceType]): The type of the resource name (viam.proto.common.ResourceName): The name of the resource Raises: ResourceNotFoundError: Error if resource with the given type and name does not exist in the registry Returns: ResourceType: The resource """ with self._lock: resource = self.resources.get(name, None) if resource and isinstance(resource, of_type): return resource if name.name in self._short_to_long_name and len(self._short_to_long_name[name.name]) == 1: return self.get_resource(of_type, self._short_to_long_name[name.name][0]) raise ResourceNotFoundError(name.subtype, name.name)
[docs] async def remove_resource(self, name: ResourceName): """Remove the resource with the specified ```ResourceName```. Args: name (viam.proto.common.ResourceName): The ResourceName of the resource """ with self._lock: try: resource = self.resources[name] await resource.close() except Exception as e: raise e finally: del self.resources[name]
[docs] async def close(self): """Close the resourcce manager by removing all resources. Please note that any errors will not raise an exception. Errors will still be logged.""" rns = [key for key in self.resources.keys()] with self._lock: for rn in rns: try: await self.remove_resource(rn) except Exception as e: LOGGER.error(f"Error while closing {rn.name}:", e)
def _resource_by_name_only(self, name: str) -> ResourceBase: for rname, resource in self.resources.items(): if rname.name == name: return resource raise ResourceNotFoundError("resource", name)