Source code for viam.media.video

from array import array
from enum import Enum
from io import BytesIO
from typing import List, NamedTuple, Optional, Tuple, Union

from PIL import Image, UnidentifiedImageError
from typing_extensions import Self

from viam.errors import NotSupportedError
from viam.proto.component.camera import Format

from .viam_rgba_plugin import RGBA_FORMAT_LABEL

LAZY_SUFFIX = "+lazy"

# Formats that are supported by PIL
LIBRARY_SUPPORTED_FORMATS = ["JPEG", "PNG", RGBA_FORMAT_LABEL]


[docs]class RawImage(NamedTuple): """**DEPRECATED** Use ``ViamImage`` instead A raw bytes representation of an image. A RawImage should be returned instead of a PIL Image instance under one of the following conditions 1) The requested mime type has the LAZY_SUFFIX string appended to it 2) The requested mime type is not supported for decoding/encoding by Viam's Python SDK """ data: bytes """The raw data of the image""" mime_type: str """The mimetype of the image"""
[docs] def close(self): """Close the image and release resources. For RawImage, this is a noop.""" return
[docs] def bytes_to_depth_array(self) -> List[List[int]]: """Decode the data of an image that has the custom depth MIME type ``image/vnd.viam.dep`` into a standard representation. Raises: NotSupportedError: Raised if given an image that is not of MIME type `image/vnd.viam.dep`. Returns: List[List[int]]: The standard representation of the image. """ if self.mime_type != CameraMimeType.VIAM_RAW_DEPTH.value: raise NotSupportedError("Type must be `image/vnd.viam.dep` to use bytes_to_depth_array()") width = int.from_bytes(self.data[8:16], "big") height = int.from_bytes(self.data[16:24], "big") depth_arr = array("H", self.data[24:]) depth_arr.byteswap() depth_arr_2d = [[depth_arr[row * width + col] for col in range(width)] for row in range(height)] return depth_arr_2d
[docs]class CameraMimeType(str, Enum): VIAM_RGBA = "image/vnd.viam.rgba" VIAM_RAW_DEPTH = "image/vnd.viam.dep" JPEG = "image/jpeg" PNG = "image/png" PCD = "pointcloud/pcd" UNSUPPORTED = "unsupported"
[docs] @classmethod def from_lazy(cls, value: str) -> Tuple[Self, bool]: is_lazy = False mime_type = value if value.endswith(LAZY_SUFFIX): mime_type = value[: (len(value) - len(LAZY_SUFFIX))] is_lazy = True if not cls.is_supported(value) and not is_lazy: mime_type = CameraMimeType.UNSUPPORTED return (cls(mime_type), is_lazy)
@property def with_lazy_suffix(self) -> str: return f"{self.value}{LAZY_SUFFIX}"
[docs] def encode_image(self, image: Union[Image.Image, RawImage]) -> bytes: if isinstance(image, RawImage): return image.data if self.name in LIBRARY_SUPPORTED_FORMATS: buf = BytesIO() if image.mode == "RGBA" and self.name == "JPEG": image = image.convert("RGB") image.save(buf, format=self.name) return buf.getvalue() else: raise ValueError(f"Cannot encode image to {self}")
@property def _should_be_raw(self) -> bool: return self in [CameraMimeType.UNSUPPORTED, CameraMimeType.PCD, CameraMimeType.VIAM_RAW_DEPTH] or not CameraMimeType.is_supported( self )
[docs] @classmethod def is_supported(cls, mime_type: str) -> bool: """Check if the provided mime_type is supported. Args: mime_type (str): The mime_type to check Returns: bool: Whether the mime_type is supported """ if mime_type == cls.UNSUPPORTED: return False return mime_type in set(item.value for item in cls)
[docs] @classmethod def from_proto(cls, format: Format.ValueType) -> "CameraMimeType": """Returns the mimetype from a proto enum. Args: format (Format.ValueType): The mimetype in a proto enum. Returns: Self: The mimetype. """ mimetypes = { Format.FORMAT_RAW_RGBA: CameraMimeType.VIAM_RGBA, Format.FORMAT_RAW_DEPTH: CameraMimeType.VIAM_RAW_DEPTH, Format.FORMAT_JPEG: CameraMimeType.JPEG, Format.FORMAT_PNG: CameraMimeType.PNG, Format.FORMAT_UNSPECIFIED: CameraMimeType.UNSUPPORTED, } return mimetypes.get(format, CameraMimeType.UNSUPPORTED)
[docs] def to_proto(self) -> Format.ValueType: """Returns the mimetype in a proto enum. Returns: Format.ValueType: The mimetype in a proto enum. """ formats = { self.VIAM_RGBA: Format.FORMAT_RAW_RGBA, self.VIAM_RAW_DEPTH: Format.FORMAT_RAW_DEPTH, self.JPEG: Format.FORMAT_JPEG, self.PNG: Format.FORMAT_PNG, self.UNSUPPORTED: Format.FORMAT_UNSPECIFIED, } return formats.get(self, Format.FORMAT_UNSPECIFIED)
[docs]class ViamImage: """A native implementation of an image. Provides the raw data and the mime type, as well as lazily loading and caching the PIL.Image representation. """ _data: bytes _mime_type: CameraMimeType _image: Optional[Image.Image] = None _image_decoded = False def __init__(self, data: bytes, mime_type: CameraMimeType) -> None: self._data = data self._mime_type = mime_type @property def data(self) -> bytes: """The raw bytes of the image""" return self._data @property def mime_type(self) -> CameraMimeType: """The mime type of the image""" return self._mime_type @mime_type.setter def mime_type(self, value: CameraMimeType): if value == self.mime_type: return self._mime_type = value self.close() self._image_decoded = False self._image = None @property def image(self) -> Optional[Image.Image]: """The PIL.Image representation of the image. If the mime type is not supported, this will be None.""" if not CameraMimeType.is_supported(self.mime_type): self._image = None self._image_decoded = True return self._image try: self._image = Image.open(BytesIO(self.data), formats=LIBRARY_SUPPORTED_FORMATS) except UnidentifiedImageError: self._image = None self._image_decoded = True return self._image
[docs] def close(self): """Close the image and release resources.""" if self._image is not None: self._image.close()
[docs] def bytes_to_depth_array(self) -> List[List[int]]: """Decode the data of an image that has the custom depth MIME type ``image/vnd.viam.dep`` into a standard representation. Raises: NotSupportedError: Raised if given an image that is not of MIME type `image/vnd.viam.dep`. Returns: List[List[int]]: The standard representation of the image. """ if self.mime_type != CameraMimeType.VIAM_RAW_DEPTH.value: raise NotSupportedError("Type must be `image/vnd.viam.dep` to use bytes_to_depth_array()") width = int.from_bytes(self.data[8:16], "big") height = int.from_bytes(self.data[16:24], "big") depth_arr = array("H", self.data[24:]) depth_arr.byteswap() depth_arr_2d = [[depth_arr[row * width + col] for col in range(width)] for row in range(height)] return depth_arr_2d
[docs]class NamedImage(ViamImage): """An implementation of ViamImage that contains a name attribute.""" name: str """The name of the image """ def __init__(self, name: str, data: bytes, mime_type: CameraMimeType) -> None: self.name = name super().__init__(data, mime_type)