Source code for viam.services.mlmodel.utils

from typing import Dict

import numpy as np
from numpy.typing import NDArray
from packaging.version import Version

from viam.proto.service.mlmodel import (
    FlatTensor,
    FlatTensorDataDouble,
    FlatTensorDataFloat,
    FlatTensorDataInt8,
    FlatTensorDataInt16,
    FlatTensorDataInt32,
    FlatTensorDataInt64,
    FlatTensorDataUInt8,
    FlatTensorDataUInt16,
    FlatTensorDataUInt32,
    FlatTensorDataUInt64,
    FlatTensors,
)


[docs]def flat_tensors_to_ndarrays(flat_tensors: FlatTensors) -> Dict[str, NDArray]: property_name_to_dtype = { "float_tensor": np.float32, "double_tensor": np.float64, "int8_tensor": np.int8, "int16_tensor": np.int16, "int32_tensor": np.int32, "int64_tensor": np.int64, "uint8_tensor": np.uint8, "uint16_tensor": np.uint16, "uint32_tensor": np.uint32, "uint64_tensor": np.uint64, } def make_ndarray(flat_data, dtype, shape): """Takes flat data (protobuf RepeatedScalarFieldContainer | bytes) to output an ndarray of appropriate dtype and shape""" make_array = np.frombuffer if dtype == np.int8 or dtype == np.uint8 else np.array # As per proto, int16 and uint16 are stored as uint32. As of numpy v2, this creates # some strange interactions with negative values for int16. Specifically, we end up # trying to create an np.Int16 value with an out of bounds int due to rollover. # Creating our array as a uint32 array initially and then casting to int16 solves this. if Version(np.__version__) >= Version("2") and dtype == np.int16: arr = np.astype(make_array(flat_data, np.uint32), np.int16) # pyright: ignore [reportAttributeAccessIssue] else: arr = make_array(flat_data, dtype) return arr.reshape(shape) ndarrays: Dict[str, NDArray] = dict() for name, flat_tensor in flat_tensors.tensors.items(): property_name = flat_tensor.WhichOneof("tensor") or flat_tensor.WhichOneof(b"tensor") if property_name: tensor_data = getattr(flat_tensor, property_name) flat_data, dtype, shape = tensor_data.data, property_name_to_dtype[property_name], flat_tensor.shape ndarrays[name] = make_ndarray(flat_data, dtype, shape) return ndarrays
[docs]def ndarrays_to_flat_tensors(ndarrays: Dict[str, NDArray]) -> FlatTensors: dtype_name_to_tensor_data_class = { "float32": FlatTensorDataFloat, "float64": FlatTensorDataDouble, "int8": FlatTensorDataInt8, "int16": FlatTensorDataInt16, "int32": FlatTensorDataInt32, "int64": FlatTensorDataInt64, "uint8": FlatTensorDataUInt8, "uint16": FlatTensorDataUInt16, "uint32": FlatTensorDataUInt32, "uint64": FlatTensorDataUInt64, } def get_tensor_data(ndarray: NDArray): """Takes an ndarray and returns the corresponding tensor data class instance for example FlatTensorDataInt8, FlatTensorDataUInt8 etc.""" tensor_data_class = dtype_name_to_tensor_data_class[ndarray.dtype.name] data = ndarray.flatten() if tensor_data_class == FlatTensorDataInt8 or tensor_data_class == FlatTensorDataUInt8: data = data.tobytes() # as per the proto, int8 and uint8 are stored as bytes elif tensor_data_class == FlatTensorDataInt16 or tensor_data_class == FlatTensorDataUInt16: data = data.astype(np.uint32) # as per the proto, int16 and uint16 are stored as uint32 tensor_data = tensor_data_class(data=data) return tensor_data def get_tensor_data_type(ndarray: NDArray): """Takes ndarray and returns a FlatTensor datatype property to be set for example "float_tensor", "uint32_tensor" etc.""" if ndarray.dtype == np.float32: return "float_tensor" elif ndarray.dtype == np.float64: return "double_tensor" return f"{ndarray.dtype.name}_tensor" tensors_mapping: Dict[str, FlatTensor] = dict() for name, ndarray in ndarrays.items(): prop_name, prop_value = get_tensor_data_type(ndarray), get_tensor_data(ndarray) tensors_mapping[name] = FlatTensor(shape=ndarray.shape, **{prop_name: prop_value}) return FlatTensors(tensors=tensors_mapping)