Source code for viam.media.video

from array import array
from enum import Enum
from typing import List, Optional, Tuple

from typing_extensions import Self

from viam.errors import NotSupportedError
from viam.proto.component.camera import Format

from .viam_rgba import RGBA_HEADER_LENGTH, RGBA_MAGIC_NUMBER


class CameraMimeType(str, Enum):
    """Mime types that camera images can be encoded in.

    Each member's value is the mime type string itself, so members compare equal
    to (and can be used as) plain strings.
    """

    VIAM_RGBA = "image/vnd.viam.rgba"
    VIAM_RAW_DEPTH = "image/vnd.viam.dep"
    JPEG = "image/jpeg"
    PNG = "image/png"
    PCD = "pointcloud/pcd"

    @classmethod
    def from_string(cls, value: str) -> Self:
        """Look up the mime type that corresponds to a string.

        A trailing ``+lazy`` marker is dropped before the lookup.

        Args:
            value (str): The mime type as a string, optionally suffixed with ``+lazy``

        Returns:
            Self: The matching mime type

        Raises:
            ValueError: If the (stripped) string is not the value of any member.
        """
        lazy_suffix = "+lazy"
        # ViamImage lazy encodes by default
        if value.endswith(lazy_suffix):
            value = value[: -len(lazy_suffix)]
        return cls(value)

    @classmethod
    def from_proto(cls, format: Format.ValueType) -> "CameraMimeType":
        """Convert a proto ``Format`` enum value into a mime type.

        Args:
            format (Format.ValueType): The mime type as a proto enum.

        Returns:
            Self: The matching mime type. Proto values without a mapping fall back to JPEG.
        """
        proto_to_mime = {
            Format.FORMAT_RAW_RGBA: CameraMimeType.VIAM_RGBA,
            Format.FORMAT_RAW_DEPTH: CameraMimeType.VIAM_RAW_DEPTH,
            Format.FORMAT_JPEG: CameraMimeType.JPEG,
            Format.FORMAT_PNG: CameraMimeType.PNG,
        }
        if format in proto_to_mime:
            return proto_to_mime[format]
        return CameraMimeType.JPEG

    def to_proto(self) -> Format.ValueType:
        """Convert this mime type into its proto ``Format`` enum value.

        Returns:
            Format.ValueType: The matching proto enum, or ``FORMAT_UNSPECIFIED`` when this
            mime type has no proto counterpart.
        """
        mime_to_proto = {
            CameraMimeType.VIAM_RGBA: Format.FORMAT_RAW_RGBA,
            CameraMimeType.VIAM_RAW_DEPTH: Format.FORMAT_RAW_DEPTH,
            CameraMimeType.JPEG: Format.FORMAT_JPEG,
            CameraMimeType.PNG: Format.FORMAT_PNG,
        }
        if self in mime_to_proto:
            return mime_to_proto[self]
        return Format.FORMAT_UNSPECIFIED
class ViamImage:
    """A native implementation of an image.

    Provides the raw data and the mime type. The width and height are extracted from the
    data at construction time via ``_getDimensions`` and are ``None`` when they cannot be
    determined.
    """

    _data: bytes
    _mime_type: CameraMimeType
    _height: Optional[int] = None
    _width: Optional[int] = None

    def __init__(self, data: bytes, mime_type: CameraMimeType) -> None:
        """Store the image bytes and mime type and try to read the image dimensions.

        Args:
            data (bytes): The raw (encoded) bytes of the image.
            mime_type (CameraMimeType): The mime type describing how ``data`` is encoded.
        """
        self._data = data
        self._mime_type = mime_type
        self._width, self._height = _getDimensions(data, mime_type)

    @property
    def data(self) -> bytes:
        """The raw bytes of the image"""
        return self._data

    @property
    def mime_type(self) -> CameraMimeType:
        """The mime type of the image"""
        return self._mime_type

    @property
    def width(self) -> Optional[int]:
        """The width of the image"""
        return self._width

    @property
    def height(self) -> Optional[int]:
        """The height of the image"""
        return self._height

    def bytes_to_depth_array(self) -> List[List[int]]:
        """
        Decode the data of an image that has the custom depth MIME type ``image/vnd.viam.dep`` into
        a standard representation.

        The width and height are read from the header as big-endian integers (bytes 8-16 and
        16-24 respectively) and are also stored on the image, so ``width`` and ``height`` are
        populated after this call. The remaining bytes are decoded as big-endian unsigned
        16-bit depth values in row-major order.

        Raises:
            NotSupportedError: Raised if the image is not of MIME type `image/vnd.viam.dep`.

        Returns:
            List[List[int]]: The standard representation of the image.
        """
        import sys  # local import: only needed to detect the host byte order below

        if self.mime_type != CameraMimeType.VIAM_RAW_DEPTH:
            raise NotSupportedError("Type must be `image/vnd.viam.dep` to use bytes_to_depth_array()")

        width = int.from_bytes(self.data[8:16], "big")
        height = int.from_bytes(self.data[16:24], "big")
        self._width = width
        self._height = height

        # array("H") interprets the buffer in the host's native byte order, while the payload
        # is big-endian. Swap only on little-endian hosts; an unconditional swap would corrupt
        # every value on big-endian machines.
        depth_arr = array("H", self.data[24:])
        if sys.byteorder == "little":
            depth_arr.byteswap()

        depth_arr_2d = [[depth_arr[row * width + col] for col in range(width)] for row in range(height)]
        return depth_arr_2d
class NamedImage(ViamImage):
    """A ViamImage that additionally carries a name."""

    name: str
    """The name of the image
    """

    def __init__(self, name: str, data: bytes, mime_type: CameraMimeType) -> None:
        """Record the name, then initialise the underlying image.

        Args:
            name (str): The name of the image.
            data (bytes): The raw bytes of the image.
            mime_type (CameraMimeType): The mime type of the image.
        """
        self.name = name
        super().__init__(data=data, mime_type=mime_type)
def _getDimensions(image: bytes, mime_type: "CameraMimeType") -> Tuple[Optional[int], Optional[int]]:
    """Best-effort extraction of an image's dimensions from its encoded bytes.

    Args:
        image (bytes): The encoded image data.
        mime_type (CameraMimeType): The encoding of ``image``.

    Returns:
        Tuple[Optional[int], Optional[int]]: ``(width, height)``, or ``(None, None)`` when the
        mime type is not JPEG, PNG or Viam RGBA, or when the data fails validation.
    """
    try:
        if mime_type == CameraMimeType.JPEG:
            return _getDimensionsFromJPEG(image)
        if mime_type == CameraMimeType.PNG:
            return _getDimensionsFromPNG(image)
        if mime_type == CameraMimeType.VIAM_RGBA:
            return _getDimensionsFromRGBA(image)
    except ValueError:
        # Malformed data is not fatal here: the image is still usable, just without dimensions.
        return (None, None)
    return (None, None)


def _getDimensionsFromJPEG(image: bytes) -> Tuple[int, int]:
    """Read the dimensions from the first start-of-frame (SOF) segment of a JPEG.

    Args:
        image (bytes): The JPEG data.

    Returns:
        Tuple[int, int]: ``(width, height)`` as stored in the frame header.

    Raises:
        ValueError: If no complete SOF segment is found (including truncated or empty data).
    """
    # JPEG Specification: https://www.w3.org/Graphics/JPEG/itu-t81.pdf
    # Specification for markers: Table B.1
    size = len(image)
    offset = 0
    while offset < size:
        # Every marker is introduced by 0xFF, optionally preceded by 0xFF fill bytes.
        offset = image.find(0xFF, offset)
        if offset < 0:
            break
        while offset < size and image[offset] == 0xFF:
            offset += 1
        if offset >= size:  # data ended in the middle of a marker
            break
        marker = image[offset]
        offset += 1

        # 0x00 is a stuffed byte inside entropy-coded data, not a marker. TEM (0x01),
        # RST0-RST7 (0xD0-0xD7) and SOI (0xD8) are stand-alone markers without a payload.
        if marker in (0x00, 0x01) or 0xD0 <= marker <= 0xD8:
            continue
        if marker == 0xD9:  # End of image (EOI) marker
            break

        # Every other marker starts a segment whose first two bytes are its big-endian
        # length (the length includes those two bytes, but not the marker itself).
        if offset + 2 > size:
            break
        length = int.from_bytes(image[offset : offset + 2], byteorder="big")

        # SOF0-SOF15 all share the same frame header layout; 0xC4 (DHT), 0xC8 (JPG) and
        # 0xCC (DAC) live in the same numeric range but are not frame headers.
        if 0xC0 <= marker <= 0xCF and marker not in (0xC4, 0xC8, 0xCC):
            # Frame header: length (2 bytes), precision (1), height (2), width (2)
            if offset + 7 > size:
                break
            height = int.from_bytes(image[offset + 3 : offset + 5], byteorder="big")
            width = int.from_bytes(image[offset + 5 : offset + 7], byteorder="big")
            return (width, height)

        if length < 2:  # a segment can never be shorter than its own length field
            break
        offset += length
    raise ValueError("Invalid JPEG: Could not extract dimensions")


def _getDimensionsFromPNG(image: bytes) -> Tuple[int, int]:
    """Read the dimensions from the IHDR chunk of a PNG.

    Args:
        image (bytes): The PNG data.

    Returns:
        Tuple[int, int]: ``(width, height)`` as stored in the IHDR chunk.

    Raises:
        ValueError: If the signature is wrong, or the first chunk is missing, truncated or
            not IHDR.
    """
    # PNG Specification: https://www.w3.org/TR/png/
    # PNG will always start with this signature
    if image[:8] != b"\x89PNG\r\n\x1a\n":
        raise ValueError("Invalid PNG: Invalid signature")

    # Bytes 8-12 hold the first chunk's length; the chunk type and the first two IHDR
    # fields (width, height) follow.
    header = image[12:24]
    if len(header) < 12 or header[:4] != b"IHDR":
        raise ValueError("Invalid PNG: Invalid headers")
    width = int.from_bytes(header[4:8], byteorder="big")
    height = int.from_bytes(header[8:12], byteorder="big")
    return (width, height)


def _getDimensionsFromRGBA(image: bytes) -> Tuple[int, int]:
    """Read the dimensions from the header of a Viam RGBA image.

    Args:
        image (bytes): The Viam RGBA data.

    Returns:
        Tuple[int, int]: ``(width, height)`` as stored in the header.

    Raises:
        ValueError: If the header is truncated or does not start with the magic number.
    """
    # Viam RGBA header comes in 3 4-byte chunks:
    # * Magic Number/Signature
    # * Width
    # * Height
    header = image[:RGBA_HEADER_LENGTH]
    if len(header) < RGBA_HEADER_LENGTH or header[:4] != RGBA_MAGIC_NUMBER:
        raise ValueError("Invalid Viam RGBA: Invalid headers")
    width = int.from_bytes(header[4:8], byteorder="big")
    height = int.from_bytes(header[8:12], byteorder="big")
    return (width, height)