Source code for viam.resource.types

import re
import sys
from typing import TYPE_CHECKING, Callable, ClassVar, Mapping, Optional, Protocol, Sequence, runtime_checkable

if sys.version_info >= (3, 10):
    from typing import TypeAlias
else:
    from typing_extensions import TypeAlias

from typing_extensions import Self

from viam.proto.app.robot import ComponentConfig
from viam.proto.common import GetGeometriesRequest, GetGeometriesResponse, ResourceName

if TYPE_CHECKING:
    from .base import ResourceBase

RESOURCE_NAMESPACE_RDK = "rdk"
RESOURCE_TYPE_COMPONENT = "component"
RESOURCE_TYPE_SERVICE = "service"


[docs]class Subtype: """Represents a known component/service (resource) API""" namespace: str """The namespace of the resource""" resource_type: str """The type of the resource, for example `component` or `service`""" resource_subtype: str """The subtype of the resource for example `servo`, `arm`, `vision`""" def __init__(self, namespace: str, resource_type: str, resource_subtype: str): self.namespace = namespace self.resource_type = resource_type self.resource_subtype = resource_subtype
[docs] def __str__(self) -> str: return f"{self.namespace}:{self.resource_type}:{self.resource_subtype}"
[docs] def __repr__(self) -> str: return f"<viam.resource.types.Subtype {str(self)} at {hex(id(self))}>"
[docs] def __hash__(self) -> int: return hash(str(self))
[docs] def __eq__(self, other: object) -> bool: if isinstance(other, Subtype): return str(self) == str(other) return False
[docs] @classmethod def from_resource_name(cls, resource_name: ResourceName) -> Self: """Convert a ```ResourceName``` into a ```Subtype``` Args: resource_name (viam.proto.common.ResourceName): The ResourceName to convert Returns: Self: A new Subtype """ return cls(resource_name.namespace, resource_name.type, resource_name.subtype)
[docs] @classmethod def from_string(cls, string: str) -> Self: """Create a ```Subtype``` from its string representation (namespace:resource_type:resource_subtype) Args: string (str): The Subtype as a string Raises: ValueError: Raised if the string does not represent a valid Subtype Returns: Self: A new Subtype """ regex = re.compile(r"^([\w-]+):([\w-]+):([\w-]+)$") match = regex.match(string) if not match: raise ValueError(f"{string} is not a valid Subtype") return cls(match.group(1), match.group(2), match.group(3))
[docs]class ModelFamily: """Represents a family of related models""" namespace: str """The namespace of the model family""" family: str """The family name""" DEFAULT_FAMILY_NAME: ClassVar[str] = "builtin" DEFAULT: ClassVar["ModelFamily"] def __init__(self, namespace: str, family: str): self.namespace = namespace self.family = family
[docs] def __str__(self) -> str: return f"{self.namespace}:{self.family}"
[docs] def __repr__(self) -> str: return f"<viam.resource.types.ModelFamily {str(self)} at {hex(id(self))}>"
[docs] def __hash__(self) -> int: return hash(str(self))
[docs] def __eq__(self, other: object) -> bool: if isinstance(other, ModelFamily): return str(self) == str(other) return False
ModelFamily.DEFAULT = ModelFamily(RESOURCE_NAMESPACE_RDK, ModelFamily.DEFAULT_FAMILY_NAME)
[docs]class Model: """Represents a specific model within a family of models""" model_family: ModelFamily """The family to which this model belongs""" name: str """The name of the model""" def __init__(self, model_family: ModelFamily, name: str): self.model_family = model_family self.name = name
[docs] def __str__(self) -> str: return f"{self.model_family}:{self.name}"
[docs] def __repr__(self) -> str: return f"<viam.resource.types.Model {str(self)} at {hex(id(self))}>"
[docs] def __hash__(self) -> int: return hash(str(self))
[docs] def __eq__(self, other: object) -> bool: if isinstance(other, Model): return str(self) == str(other) return False
[docs] @classmethod def from_string(cls, model: str, *, ignore_errors=False) -> Self: """Create a ```Model``` from its string representation (namespace:family:name). Args: model (str): The Model as a string ignore_errors (bool, optional): If namespace or family are not found in the string, default to empty string rather than raise an exception. Defaults to False. Raises: ValueError: Raised if the provided string is not a valid Model Returns: Self: The Model """ regex = re.compile(r"^([\w-]+):([\w-]+):([\w-]+)$") match = regex.match(model) if match: namespace = match.group(1) family = match.group(2) name = match.group(3) model_family = ModelFamily(namespace, family) elif ignore_errors: model_family = ModelFamily("", "") name = model else: raise ValueError(f"{model} is not a valid Model") return cls(model_family, name)
[docs]def resource_name_from_string(string: str) -> ResourceName: """Create a ResourceName from its string representation (namespace:resource_type:resource_subtype/<optional_remote:>name) Args: string (str): The ResourceName as a string Raises: ValueError: Raised if the provided string is not a valid ResourceName Returns: viam.proto.common.ResourceName: The new ResourceName """ regex = re.compile(r"^([\w-]+:[\w-]+:(?:[\w-]+))\/?([\w-]+:(?:[\w-]+:)*)?(.+)?$") match = regex.match(string) if not match: raise ValueError(f"{string} is not a valid ResourceName") parts = match[1].split(":") if len(parts) != 3: raise ValueError(f"{string} is not a valid ResourceName") if match[2]: name = f"{match[2]}{match[3]}" else: name = match[3] return ResourceName(namespace=parts[0], type=parts[1], subtype=parts[2], name=name)
ResourceCreator: TypeAlias = Callable[[ComponentConfig, Mapping[ResourceName, "ResourceBase"]], "ResourceBase"] Validator: TypeAlias = Callable[[ComponentConfig], Sequence[str]]
[docs]@runtime_checkable class SupportsGetGeometries(Protocol): """The SupportsGetGeometries protocol defines the requirements for a resource to call get_geometries."""
[docs] async def GetGeometries(self, request: GetGeometriesRequest, *, timeout: Optional[float] = None, **kwargs) -> GetGeometriesResponse: ...