import warnings
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Mapping, Optional, Sequence, Tuple
from google.protobuf.struct_pb2 import Struct
from grpclib.client import Channel, Stream
from viam import logging
from viam.proto.app.data import (
AddBinaryDataToDatasetByIDsRequest,
AddBoundingBoxToImageByIDRequest,
AddBoundingBoxToImageByIDResponse,
AddTagsToBinaryDataByFilterRequest,
AddTagsToBinaryDataByIDsRequest,
BinaryDataByFilterRequest,
BinaryDataByFilterResponse,
BinaryDataByIDsRequest,
BinaryDataByIDsResponse,
BinaryID,
BinaryMetadata,
BoundingBoxLabelsByFilterRequest,
BoundingBoxLabelsByFilterResponse,
CaptureMetadata,
ConfigureDatabaseUserRequest,
DataRequest,
DataServiceStub,
DeleteBinaryDataByFilterRequest,
DeleteBinaryDataByFilterResponse,
DeleteBinaryDataByIDsRequest,
DeleteBinaryDataByIDsResponse,
DeleteTabularDataRequest,
DeleteTabularDataResponse,
Filter,
GetDatabaseConnectionRequest,
GetDatabaseConnectionResponse,
Order,
RemoveBinaryDataFromDatasetByIDsRequest,
RemoveBoundingBoxFromImageByIDRequest,
RemoveTagsFromBinaryDataByFilterRequest,
RemoveTagsFromBinaryDataByFilterResponse,
RemoveTagsFromBinaryDataByIDsRequest,
RemoveTagsFromBinaryDataByIDsResponse,
TabularDataByFilterRequest,
TabularDataByFilterResponse,
TabularDataByMQLRequest,
TabularDataByMQLResponse,
TabularDataBySQLRequest,
TabularDataBySQLResponse,
TagsByFilterRequest,
TagsByFilterResponse,
)
from viam.proto.app.dataset import (
CreateDatasetRequest,
CreateDatasetResponse,
Dataset,
DatasetServiceStub,
DeleteDatasetRequest,
ListDatasetsByIDsRequest,
ListDatasetsByIDsResponse,
ListDatasetsByOrganizationIDRequest,
ListDatasetsByOrganizationIDResponse,
RenameDatasetRequest,
)
from viam.proto.app.datasync import (
DataCaptureUploadMetadata,
DataCaptureUploadRequest,
DataCaptureUploadResponse,
DataSyncServiceStub,
DataType,
FileData,
FileUploadRequest,
FileUploadResponse,
SensorData,
SensorMetadata,
StreamingDataCaptureUploadRequest,
StreamingDataCaptureUploadResponse,
UploadMetadata,
)
from viam.utils import ValueTypes, create_filter, datetime_to_timestamp, struct_to_dict
LOGGER = logging.getLogger(__name__)
[docs]class DataClient:
"""gRPC client for uploading and retrieving data from app.
Constructor is used by `ViamClient` to instantiate relevant service stubs. Calls to `DataClient` methods should be made through
`ViamClient`.
Establish a Connection::
import asyncio
from viam.rpc.dial import DialOptions, Credentials
from viam.app.viam_client import ViamClient
async def connect() -> ViamClient:
# Replace "<API-KEY>" (including brackets) with your API key and "<API-KEY-ID>" with your API key ID
dial_options = DialOptions.with_api_key("<API-KEY>", "<API-KEY-ID>")
return await ViamClient.create_from_dial_options(dial_options)
async def main():
# Make a ViamClient
viam_client = await connect()
# Instantiate a DataClient to run data client API methods on
data_client = viam_client.data_client
viam_client.close()
if __name__ == '__main__':
asyncio.run(main())
"""
[docs] @dataclass
class TabularData:
"""Class representing a piece of tabular data and associated metadata."""
data: Mapping[str, Any]
"""The requested data"""
metadata: CaptureMetadata
"""The metadata associated with the data"""
time_requested: datetime
"""The time the data were requested"""
time_received: datetime
"""The time the data were received"""
[docs] def __str__(self) -> str:
return f"{self.data}\n{self.metadata}Time requested: {self.time_requested}\nTime received: {self.time_received}\n"
[docs] def __eq__(self, other: object) -> bool:
if isinstance(other, DataClient.TabularData):
return str(self) == str(other)
return False
# TODO (RSDK-6684): Revisit if this shadow type is necessary
[docs] @dataclass
class BinaryData:
"""Class representing a piece of binary data and associated metadata."""
data: bytes
"""The request data"""
metadata: BinaryMetadata
"""The metadata associated with the data"""
[docs] def __str__(self) -> str:
return f"{self.data}\n{self.metadata}"
[docs] def __eq__(self, other: object) -> bool:
if isinstance(other, DataClient.BinaryData):
return str(self) == str(other)
return False
def __init__(self, channel: Channel, metadata: Mapping[str, str]):
"""Create a `DataClient` that maintains a connection to app.
Args:
channel (grpclib.client.Channel): Connection to app.
metadata (Mapping[str, str]): Required authorization token to send requests to app.
"""
self._metadata = metadata
self._data_client = DataServiceStub(channel)
self._data_sync_client = DataSyncServiceStub(channel)
self._dataset_client = DatasetServiceStub(channel)
self._channel = channel
_data_client: DataServiceStub
_data_sync_client: DataSyncServiceStub
_dataset_client: DatasetServiceStub
_metadata: Mapping[str, str]
_channel: Channel
[docs] async def tabular_data_by_filter(
self,
filter: Optional[Filter] = None,
limit: Optional[int] = None,
sort_order: Optional[Order.ValueType] = None,
last: Optional[str] = None,
count_only: bool = False,
include_internal_data: bool = False,
dest: Optional[str] = None,
) -> Tuple[List[TabularData], int, str]:
"""Filter and download tabular data. The data will be paginated into pages of `limit` items, and the pagination ID will be included
in the returned tuple. If a destination is provided, the data will be saved to that file.
If the file is not empty, it will be overwritten.
::
from viam.proto.app.data import Filter
my_data = []
last = None
my_filter = Filter(component_name="left_motor")
while True:
tabular_data, count, last = await data_client.tabular_data_by_filter(my_filter, last)
if not tabular_data:
break
my_data.extend(tabular_data)
Args:
filter (viam.proto.app.data.Filter): Optional `Filter` specifying tabular data to retrieve. No `Filter` implies all tabular
data.
limit (int): The maximum number of entries to include in a page. Defaults to 50 if unspecified.
sort_order (viam.proto.app.data.Order): The desired sort order of the data.
last (str): Optional string indicating the ID of the last-returned data.
If provided, the server will return the next data entries after the `last` ID.
count_only (bool): Whether to return only the total count of entries.
include_internal_data (bool): Whether to return the internal data. Internal data is used for Viam-specific data ingestion,
like cloud SLAM. Defaults to `False`
dest (str): Optional filepath for writing retrieved data.
Returns:
List[TabularData]: The tabular data.
int: The count (number of entries)
str: The last-returned page ID.
"""
filter = filter if filter else Filter()
data_request = DataRequest(filter=filter)
if limit:
data_request.limit = limit
if sort_order:
data_request.sort_order = sort_order
if last:
data_request.last = last
request = TabularDataByFilterRequest(data_request=data_request, count_only=count_only, include_internal_data=include_internal_data)
response: TabularDataByFilterResponse = await self._data_client.TabularDataByFilter(request, metadata=self._metadata)
data = [
DataClient.TabularData(
struct_to_dict(struct.data),
response.metadata[struct.metadata_index],
struct.time_requested.ToDatetime(),
struct.time_received.ToDatetime(),
)
for struct in response.data
]
if dest:
try:
file = open(dest, "w")
file.write(f"{[str(d) for d in data]}")
file.flush()
except Exception as e:
LOGGER.error(f"Failed to write tabular data to file {dest}", exc_info=e)
return data, response.count, response.last
[docs] async def tabular_data_by_sql(self, organization_id: str, sql_query: str) -> List[Dict[str, ValueTypes]]:
"""Obtain unified tabular data and metadata, queried with SQL.
::
data = await data_client.tabular_data_by_sql(org_id="<your-org-id>", sql_query="<sql-query>")
Args:
organization_id (str): The ID of the organization that owns the data.
sql_query (str): The SQL query to run.
Returns:
List[Dict[str, ValueTypes]]: An array of data objects.
"""
request = TabularDataBySQLRequest(organization_id=organization_id, sql_query=sql_query)
response: TabularDataBySQLResponse = await self._data_client.TabularDataBySQL(request, metadata=self._metadata)
return [struct_to_dict(struct) for struct in response.data]
[docs] async def tabular_data_by_mql(self, organization_id: str, mql_binary: List[bytes]) -> List[Dict[str, ValueTypes]]:
"""Obtain unified tabular data and metadata, queried with MQL.
::
data = await data_client.tabular_data_by_mql(org_id="<your-org-id>", mql_binary=[<mql-bytes-1>, <mql-bytes-2>])
Args:
organization_id (str): The ID of the organization that owns the data.
mql_binary (List[bytes]):The MQL query to run as a list of BSON documents.
Returns:
List[Dict[str, ValueTypes]]: An array of data objects.
"""
request = TabularDataByMQLRequest(organization_id=organization_id, mql_binary=mql_binary)
response: TabularDataByMQLResponse = await self._data_client.TabularDataByMQL(request, metadata=self._metadata)
return [struct_to_dict(struct) for struct in response.data]
[docs] async def binary_data_by_filter(
self,
filter: Optional[Filter] = None,
limit: Optional[int] = None,
sort_order: Optional[Order.ValueType] = None,
last: Optional[str] = None,
include_binary_data: bool = True,
count_only: bool = False,
include_internal_data: bool = False,
dest: Optional[str] = None,
) -> Tuple[List[BinaryData], int, str]:
"""Filter and download binary data. The data will be paginated into pages of `limit` items, and the pagination ID will be included
in the returned tuple. If a destination is provided, the data will be saved to that file.
If the file is not empty, it will be overwritten.
::
from viam.proto.app.data import Filter
my_data = []
last = None
my_filter = Filter(component_name="camera")
while True:
data, count, last = await data_client.binary_data_by_filter(my_filter, last)
if not data:
break
my_data.extend(data)
Args:
filter (viam.proto.app.data.Filter): Optional `Filter` specifying tabular data to retrieve. No `Filter` implies all binary
data.
limit (int): The maximum number of entries to include in a page. Defaults to 50 if unspecified.
sort_order (viam.proto.app.data.Order): The desired sort order of the data.
last (str): Optional string indicating the ID of the last-returned data.
If provided, the server will return the next data entries after the `last` ID.
include_binary_data (bool): Boolean specifying whether to actually include the binary file data with each retrieved file.
Defaults to true (i.e., both the files' data and metadata are returned).
count_only (bool): Whether to return only the total count of entries.
include_internal_data (bool): Whether to return the internal data. Internal data is used for Viam-specific data ingestion,
like cloud SLAM. Defaults to `False`
dest (str): Optional filepath for writing retrieved data.
Returns:
List[BinaryData]: The binary data.
int: The count (number of entries)
str: The last-returned page ID.
"""
data_request = DataRequest(filter=filter)
if limit:
data_request.limit = limit
if sort_order:
data_request.sort_order = sort_order
if last:
data_request.last = last
request = BinaryDataByFilterRequest(
data_request=data_request,
include_binary=include_binary_data,
count_only=count_only,
include_internal_data=include_internal_data,
)
response: BinaryDataByFilterResponse = await self._data_client.BinaryDataByFilter(request, metadata=self._metadata)
data = [DataClient.BinaryData(data.binary, data.metadata) for data in response.data]
if dest:
try:
file = open(dest, "w")
file.write(f"{[str(d) for d in data]}")
file.flush()
except Exception as e:
LOGGER.error(f"Failed to write binary data to file {dest}", exc_info=e)
return data, response.count, response.last
[docs] async def binary_data_by_ids(
self,
binary_ids: List[BinaryID],
dest: Optional[str] = None,
) -> List[BinaryData]:
"""Filter and download binary data.
::
from viam.proto.app.data import BinaryID
binary_metadata = await data_client.binary_data_by_filter(
include_file_data=False
)
my_ids = []
for obj in binary_metadata:
my_ids.append(
BinaryID(
file_id=obj.metadata.id,
organization_id=obj.metadata.capture_metadata.organization_id,
location_id=obj.metadata.capture_metadata.location_id
)
)
binary_data = await data_client.binary_data_by_ids(my_ids)
Args:
binary_ids (List[viam.proto.app.data.BinaryID]): `BinaryID` objects specifying the desired data. Must be non-empty.
dest (str): Optional filepath for writing retrieved data.
Raises:
GRPCError: If no `BinaryID` objects are provided.
Returns:
List[BinaryData]: The binary data.
"""
request = BinaryDataByIDsRequest(binary_ids=binary_ids, include_binary=True)
response: BinaryDataByIDsResponse = await self._data_client.BinaryDataByIDs(request, metadata=self._metadata)
if dest:
try:
file = open(dest, "w")
file.write(f"{response.data}")
file.flush()
except Exception as e:
LOGGER.error(f"Failed to write binary data to file {dest}", exc_info=e)
return [DataClient.BinaryData(data.binary, data.metadata) for data in response.data]
[docs] async def delete_tabular_data(self, organization_id: str, delete_older_than_days: int) -> int:
"""Delete tabular data older than a specified number of days.
::
from viam.proto.app.data import Filter
my_filter = Filter(component_name="left_motor")
days_of_data_to_delete = 10
tabular_data = await data_client.delete_tabular_data(
org_id="a12b3c4e-1234-1abc-ab1c-ab1c2d345abc", days_of_data_to_delete)
Args:
organization_id (str): ID of organization to delete data from.
delete_older_than_days (int): Delete data that was captured up to this many days ago. For example if `delete_older_than_days`
is 10, this deletes any data that was captured up to 10 days ago. If it is 0, all existing data is deleted.
Returns:
int: The number of items deleted.
"""
request = DeleteTabularDataRequest(organization_id=organization_id, delete_older_than_days=delete_older_than_days)
response: DeleteTabularDataResponse = await self._data_client.DeleteTabularData(request, metadata=self._metadata)
return response.deleted_count
[docs] async def delete_tabular_data_by_filter(self, filter: Optional[Filter]) -> int:
"""Deprecated: use delete_tabular_data instead."""
raise NotImplementedError()
[docs] async def delete_binary_data_by_filter(self, filter: Optional[Filter]) -> int:
"""Filter and delete binary data.
::
from viam.proto.app.data import Filter
my_filter = Filter(component_name="left_motor")
res = await data_client.delete_binary_data_by_filter(my_filter)
Args:
filter (viam.proto.app.data.Filter): Optional `Filter` specifying binary data to delete. Passing an empty `Filter` will lead to
all data being deleted. Exercise caution when using this option.
Returns:
int: The number of items deleted.
"""
filter = filter if filter else Filter()
request = DeleteBinaryDataByFilterRequest(filter=filter)
response: DeleteBinaryDataByFilterResponse = await self._data_client.DeleteBinaryDataByFilter(request, metadata=self._metadata)
return response.deleted_count
[docs] async def delete_binary_data_by_ids(self, binary_ids: List[BinaryID]) -> int:
"""Filter and delete binary data.
::
from viam.proto.app.data import BinaryID
binary_metadata = await data_client.binary_data_by_filter(
include_file_data=False
)
my_ids = []
for obj in binary_metadata:
my_ids.append(
BinaryID(
file_id=obj.metadata.id,
organization_id=obj.metadata.capture_metadata.organization_id,
location_id=obj.metadata.capture_metadata.location_id
)
)
binary_data = await data_client.delete_binary_data_by_ids(my_ids)
Args:
binary_ids (List[viam.proto.app.data.BinaryID]): `BinaryID` objects specifying the data to be deleted. Must be non-empty.
Raises:
GRPCError: If no `BinaryID` objects are provided.
Returns:
int: The number of items deleted.
"""
request = DeleteBinaryDataByIDsRequest(binary_ids=binary_ids)
response: DeleteBinaryDataByIDsResponse = await self._data_client.DeleteBinaryDataByIDs(request, metadata=self._metadata)
return response.deleted_count
[docs] async def add_bounding_box_to_image_by_id(
self,
binary_id: BinaryID,
label: str,
x_min_normalized: float,
y_min_normalized: float,
x_max_normalized: float,
y_max_normalized: float,
) -> str:
"""Add a bounding box to an image.
::
from viam.proto.app.data import BinaryID
MY_BINARY_ID = BinaryID(
file_id=your-file_id,
organization_id=your-org-id,
location_id=your-location-id
)
bbox_label = await data_client.add_bounding_box_to_image_by_id(
binary_id=MY_BINARY_ID,
label="label",
x_min_normalized=0,
y_min_normalized=.1,
x_max_normalized=.2,
y_max_normalized=.3
)
print(bbox_label)
Args:
binary_id (viam.proto.app.data.BinaryID): The ID of the image to add the bounding box to.
label (str): A label for the bounding box.
x_min_normalized (float): Min X value of the bounding box normalized from 0 to 1.
y_min_normalized (float): Min Y value of the bounding box normalized from 0 to 1.
x_max_normalized (float): Max X value of the bounding box normalized from 0 to 1.
y_max_normalized (float): Max Y value of the bounding box normalized from 0 to 1.
Raises:
GRPCError: If the X or Y values are outside of the [0, 1] range.
Returns:
str: The bounding box ID
"""
request = AddBoundingBoxToImageByIDRequest(
label=label,
binary_id=binary_id,
x_max_normalized=x_max_normalized,
x_min_normalized=x_min_normalized,
y_max_normalized=y_max_normalized,
y_min_normalized=y_min_normalized,
)
response: AddBoundingBoxToImageByIDResponse = await self._data_client.AddBoundingBoxToImageByID(request, metadata=self._metadata)
return response.bbox_id
[docs] async def remove_bounding_box_from_image_by_id(self, bbox_id: str, binary_id: BinaryID) -> None:
"""Removes a bounding box from an image.
::
from viam.proto.app.data import BinaryID
MY_BINARY_ID = BinaryID(
file_id=your-file_id,
organization_id=your-org-id,
location_id=your-location-id
)
await data_client.remove_bounding_box_from_image_by_id(
binary_id=MY_BINARY_ID,
bbox_id="your-bounding-box-id-to-delete"
)
Args:
bbox_id (str): The ID of the bounding box to remove.
Binary_id (viam.proto.arr.data.BinaryID): Binary ID of the image to to remove the bounding box from
"""
request = RemoveBoundingBoxFromImageByIDRequest(bbox_id=bbox_id, binary_id=binary_id)
await self._data_client.RemoveBoundingBoxFromImageByID(request, metadata=self._metadata)
[docs] async def bounding_box_labels_by_filter(self, filter: Optional[Filter] = None) -> List[str]:
"""Get a list of bounding box labels using a `Filter`.
::
from viam.proto.app.data import Filter
my_filter = Filter(component_name="my_camera")
bounding_box_labels = await data_client.bounding_box_labels_by_filter(
my_filter)
Args:
filter (viam.proto.app.data.Filter): `Filter` specifying data to retrieve from. If no `Filter` is provided, all labels will
return.
Returns:
List[str]: The list of bounding box labels.
"""
filter = filter if filter else Filter()
request = BoundingBoxLabelsByFilterRequest(filter=filter)
response: BoundingBoxLabelsByFilterResponse = await self._data_client.BoundingBoxLabelsByFilter(request, metadata=self._metadata)
return list(response.labels)
[docs] async def get_database_connection(self, organization_id: str) -> str:
"""Get a connection to access a MongoDB Atlas Data federation instance.
::
data_client.get_database_connection(org_id="a12b3c4e-1234-1abc-ab1c-ab1c2d345abc")
Args:
organization_id (str): Organization to retrieve the connection for.
Returns:
str: The hostname of the federated database.
"""
request = GetDatabaseConnectionRequest(organization_id=organization_id)
response: GetDatabaseConnectionResponse = await self._data_client.GetDatabaseConnection(request, metadata=self._metadata)
return response.hostname
[docs] async def create_dataset(self, name: str, organization_id: str) -> str:
"""Create a new dataset.
::
name = await data_client.create_dataset(
name="<dataset-name>",
organization_id="<your-org-id>"
)
print(name)
Args:
name (str): The name of the dataset being created.
organization_id (str): The ID of the organization where the dataset is being created.
Returns:
str: The dataset ID of the created dataset.
"""
request = CreateDatasetRequest(name=name, organization_id=organization_id)
response: CreateDatasetResponse = await self._dataset_client.CreateDataset(request, metadata=self._metadata)
return response.id
[docs] async def list_dataset_by_ids(self, ids: List[str]) -> Sequence[Dataset]:
"""Get a list of datasets using their IDs.
::
datasets = await data_client.list_dataset_by_ids(
ids=["abcd-1234xyz-8765z-123abc"]
)
print(datasets)
Args:
ids (List[str]): The IDs of the datasets being called for.
Returns:
Sequence[Dataset]: The list of datasets.
"""
request = ListDatasetsByIDsRequest(ids=ids)
response: ListDatasetsByIDsResponse = await self._dataset_client.ListDatasetsByIDs(request, metadata=self._metadata)
return response.datasets
[docs] async def list_datasets_by_organization_id(self, organization_id: str) -> Sequence[Dataset]:
"""Get the datasets in an organization.
::
datasets = await data_client.list_dataset_by_ids(
ids=["abcd-1234xyz-8765z-123abc"]
)
print(datasets)
Args:
organization_id (str): The ID of the organization.
Returns:
Sequence[Dataset]: The list of datasets in the organization.
"""
request = ListDatasetsByOrganizationIDRequest(organization_id=organization_id)
response: ListDatasetsByOrganizationIDResponse = await self._dataset_client.ListDatasetsByOrganizationID(
request, metadata=self._metadata
)
return response.datasets
[docs] async def rename_dataset(self, id: str, name: str) -> None:
"""Rename a dataset specified by the dataset ID.
::
await data_client.rename_dataset(
id="abcd-1234xyz-8765z-123abc",
name="<dataset-name>"
)
Args:
id (str): The ID of the dataset.
name (str): The new name of the dataset.
"""
request = RenameDatasetRequest(id=id, name=name)
await self._dataset_client.RenameDataset(request, metadata=self._metadata)
[docs] async def delete_dataset(self, id: str) -> None:
"""Delete a dataset.
::
await data_client.delete_dataset(
id="abcd-1234xyz-8765z-123abc"
)
Args:
id (str): The ID of the dataset.
"""
request = DeleteDatasetRequest(id=id)
await self._dataset_client.DeleteDataset(request, metadata=self._metadata)
[docs] async def add_binary_data_to_dataset_by_ids(self, binary_ids: List[BinaryID], dataset_id: str) -> None:
"""Add the BinaryData to the provided dataset.
This BinaryData will be tagged with the VIAM_DATASET_{id} label.
::
from viam.proto.app.data import BinaryID
binary_metadata = await data_client.binary_data_by_filter(
include_file_data=False
)
my_binary_ids = []
for obj in binary_metadata:
my_binary_ids.append(
BinaryID(
file_id=obj.metadata.id,
organization_id=obj.metadata.capture_metadata.organization_id,
location_id=obj.metadata.capture_metadata.location_id
)
)
await data_client.add_binary_data_to_dataset_by_ids(
binary_ids=my_binary_ids,
dataset_id="abcd-1234xyz-8765z-123abc"
)
Args:
binary_ids (List[BinaryID]): The IDs of binary data to add to dataset.
dataset_id (str): The ID of the dataset to be added to.
"""
request = AddBinaryDataToDatasetByIDsRequest(binary_ids=binary_ids, dataset_id=dataset_id)
await self._data_client.AddBinaryDataToDatasetByIDs(request, metadata=self._metadata)
[docs] async def remove_binary_data_from_dataset_by_ids(self, binary_ids: List[BinaryID], dataset_id: str) -> None:
"""Remove the BinaryData from the provided dataset.
This BinaryData will lose the VIAM_DATASET_{id} tag.
::
from viam.proto.app.data import BinaryID
binary_metadata = await data_client.binary_data_by_filter(
include_file_data=False
)
my_binary_ids = []
for obj in binary_metadata:
my_binary_ids.append(
BinaryID(
file_id=obj.metadata.id,
organization_id=obj.metadata.capture_metadata.organization_id,
location_id=obj.metadata.capture_metadata.location_id
)
)
await data_client.remove_binary_data_from_dataset_by_ids(
binary_ids=my_binary_ids,
dataset_id="abcd-1234xyz-8765z-123abc"
)
Args:
binary_ids (List[BinaryID]): The IDs of binary data to remove from dataset.
dataset_id (str): The ID of the dataset to be removed from.
"""
request = RemoveBinaryDataFromDatasetByIDsRequest(binary_ids=binary_ids, dataset_id=dataset_id)
await self._data_client.RemoveBinaryDataFromDatasetByIDs(request, metadata=self._metadata)
[docs] async def binary_data_capture_upload(
self,
binary_data: bytes,
part_id: str,
component_type: str,
component_name: str,
method_name: str,
file_extension: str,
method_parameters: Optional[Mapping[str, Any]] = None,
tags: Optional[List[str]] = None,
data_request_times: Optional[Tuple[datetime, datetime]] = None,
) -> str:
"""Upload binary sensor data.
Upload binary data collected on a robot through a specific component (e.g., a motor) along with the relevant metadata to
app.viam.com. Binary data can be found under the "Files" subtab of the Data tab on app.viam.com.
::
time_requested = datetime(2023, 6, 5, 11)
time_received = datetime(2023, 6, 5, 11, 0, 3)
file_id = await data_client.binary_data_capture_upload(
part_id="INSERT YOUR PART ID",
component_type='camera',
component_name='my_camera',
method_name='GetImages',
method_parameters=None,
tags=["tag_1", "tag_2"],
data_request_times=[time_requested, time_received],
file_extension=".jpg",
binary_data=b"Encoded image bytes"
)
Args:
binary_data (bytes): The data to be uploaded, represented in bytes.
part_id (str): Part ID of the component used to capture the data.
component_type (str): Type of the component used to capture the data (e.g., "movement_sensor").
component_name (str): Name of the component used to capture the data.
method_name (str): Name of the method used to capture the data.
file_extension (str): The file extension of binary data including the period, e.g. .jpg, .png, .pcd.
The backend will route the binary to its corresponding mime type based on this extension. Files with a .jpeg, .jpg,
or .png extension will be saved to the images tab.
method_parameters (Optional[Mapping[str, Any]]): Optional dictionary of method parameters. No longer in active use.
tags (Optional[List[str]]): Optional list of tags to allow for tag-based data filtering when retrieving data.
data_request_times (Optional[Tuple[datetime.datetime, datetime.datetime]]): Optional tuple containing `datetime`s objects
denoting the times this data was requested[0] by the robot and received[1] from the appropriate sensor.
Raises:
GRPCError: If an invalid part ID is passed.
Returns:
str: the file_id of the uploaded data.
"""
sensor_contents = SensorData(
metadata=(
SensorMetadata(
time_requested=datetime_to_timestamp(data_request_times[0]) if data_request_times else None,
time_received=datetime_to_timestamp(data_request_times[1]) if data_request_times else None,
)
if data_request_times
else None
),
struct=None, # Used for tabular data.
binary=binary_data,
)
metadata = UploadMetadata(
part_id=part_id,
component_type=component_type,
component_name=component_name,
method_name=method_name,
type=DataType.DATA_TYPE_BINARY_SENSOR,
method_parameters=method_parameters,
tags=tags,
)
if file_extension:
metadata.file_extension = file_extension if file_extension[0] == "." else f".{file_extension}"
response = await self._data_capture_upload(metadata=metadata, sensor_contents=[sensor_contents])
return response.file_id
[docs] async def tabular_data_capture_upload(
self,
tabular_data: List[Mapping[str, Any]],
part_id: str,
component_type: str,
component_name: str,
method_name: str,
method_parameters: Optional[Mapping[str, Any]] = None,
tags: Optional[List[str]] = None,
data_request_times: Optional[List[Tuple[datetime, datetime]]] = None,
) -> str:
"""Upload tabular sensor data.
Upload tabular data collected on a robot through a specific component (e.g., a motor) along with the relevant metadata to
app.viam.com. Tabular data can be found under the "Sensors" subtab of the Data tab on app.viam.com.
::
time_requested = datetime(2023, 6, 5, 11)
time_received = datetime(2023, 6, 5, 11, 0, 3)
file_id = await data_client.tabular_data_capture_upload(
part_id="INSERT YOUR PART ID",
component_type='motor',
component_name='left_motor',
method_name='IsPowered',
tags=["tag_1", "tag_2"],
data_request_times=[(time_requested, time_received)],
tabular_data=[{'PowerPCT': 0, 'IsPowered': False}]
)
Args:
tabular_data (List[Mapping[str, Any]]): List of the data to be uploaded, represented tabularly as a collection of dictionaries.
part_id (str): Part ID of the component used to capture the data.
component_type (str): Type of the component used to capture the data (e.g., "movement_sensor").
component_name (str): Name of the component used to capture the data.
method_name (str): Name of the method used to capture the data.
method_parameters (Optional[Mapping[str, Any]]): Optional dictionary of method parameters. No longer in active use.
tags (Optional[List[str]]): Optional list of tags to allow for tag-based data filtering when retrieving data.
data_request_times (Optional[List[Tuple[datetime.datetime, datetime.datetime]]]): Optional list of tuples, each containing
`datetime` objects denoting the times this data was requested[0] by the robot and received[1] from the appropriate sensor.
Passing a list of tabular data and Timestamps with length n > 1 will result in n datapoints being uploaded, all tied to the
same metadata.
Raises:
GRPCError: If an invalid part ID is passed.
ValueError: If a list of `Timestamp` objects is provided and its length does not match the length of the list of tabular
data.
Returns:
str: the file_id of the uploaded data.
"""
sensor_contents = []
if data_request_times:
if len(data_request_times) != len(tabular_data):
raise ValueError("data_request_times and tabular_data lengths must be equal.")
for idx, tab in enumerate(tabular_data):
s = Struct()
s.update(tab)
sensor_contents.append(
SensorData(
metadata=(
SensorMetadata(
time_requested=datetime_to_timestamp(data_request_times[idx][0]) if data_request_times else None,
time_received=datetime_to_timestamp(data_request_times[idx][1]) if data_request_times else None,
)
if data_request_times[idx]
else None
)
if data_request_times
else None,
struct=s,
)
)
metadata = UploadMetadata(
part_id=part_id,
component_type=component_type,
component_name=component_name,
method_name=method_name,
type=DataType.DATA_TYPE_TABULAR_SENSOR,
method_parameters=method_parameters,
tags=tags,
)
response = await self._data_capture_upload(metadata=metadata, sensor_contents=sensor_contents)
return response.file_id
async def _data_capture_upload(self, metadata: UploadMetadata, sensor_contents: List[SensorData]) -> DataCaptureUploadResponse:
request = DataCaptureUploadRequest(metadata=metadata, sensor_contents=sensor_contents)
response: DataCaptureUploadResponse = await self._data_sync_client.DataCaptureUpload(request, metadata=self._metadata)
return response
[docs] async def streaming_data_capture_upload(
self,
data: bytes,
part_id: str,
file_ext: str,
component_type: Optional[str] = None,
component_name: Optional[str] = None,
method_name: Optional[str] = None,
method_parameters: Optional[Mapping[str, Any]] = None,
data_request_times: Optional[Tuple[datetime, datetime]] = None,
tags: Optional[List[str]] = None,
) -> str:
"""Uploads the metadata and contents of streaming binary data.
::
time_requested = datetime(2023, 6, 5, 11)
time_received = datetime(2023, 6, 5, 11, 0, 3)
file_id = await data_client.streaming_data_capture_upload(
data="byte-data-to-upload",
part_id="INSERT YOUR PART ID",
file_ext="png",
component_type='motor',
component_name='left_motor',
method_name='IsPowered',
data_request_times=[(time_requested, time_received)],
tags=["tag_1", "tag_2"]
)
Args:
data (bytes): the data to be uploaded.
part_id (str): Part ID of the resource associated with the file.
file_ext (str): file extension type for the data. required for determining MIME type.
component_type (Optional[str]): Optional type of the component associated with the file (e.g., "movement_sensor").
component_name (Optional[str]): Optional name of the component associated with the file.
method_name (Optional[str]): Optional name of the method associated with the file.
method_parameters (Optional[str]): Optional dictionary of the method parameters. No longer in active use.
data_request_times (Optional[Tuple[datetime.datetime, datetime.datetime]]): Optional tuple containing `datetime`s objects
denoting the times this data was requested[0] by the robot and received[1] from the appropriate sensor.
tags (Optional[List[str]]): Optional list of tags to allow for tag-based filtering when retrieving data.
Raises:
GRPCError: If an invalid part ID is passed.
Returns:
str: the file_id of the uploaded data.
"""
upload_metadata = UploadMetadata(
part_id=part_id,
component_type=component_type if component_type else "",
component_name=component_name if component_name else "",
method_name=method_name if method_name else "",
method_parameters=method_parameters,
type=DataType.DATA_TYPE_BINARY_SENSOR,
file_extension=file_ext if file_ext[0] == "." else f".{file_ext}",
tags=tags,
)
sensor_metadata = SensorMetadata(
time_requested=datetime_to_timestamp(data_request_times[0]) if data_request_times else None,
time_received=datetime_to_timestamp(data_request_times[1]) if data_request_times else None,
)
metadata = DataCaptureUploadMetadata(upload_metadata=upload_metadata, sensor_metadata=sensor_metadata)
request_metadata = StreamingDataCaptureUploadRequest(metadata=metadata)
stream: Stream[StreamingDataCaptureUploadRequest, StreamingDataCaptureUploadResponse]
async with self._data_sync_client.StreamingDataCaptureUpload.open(metadata=self._metadata) as stream:
await stream.send_message(request_metadata)
await stream.send_message(StreamingDataCaptureUploadRequest(data=data), end=True)
response = await stream.recv_message()
if not response:
await stream.recv_trailing_metadata() # causes us to throw appropriate gRPC error
raise TypeError("Response cannot be empty")
return response.file_id
[docs] async def file_upload(
self,
part_id: str,
data: bytes,
component_type: Optional[str] = None,
component_name: Optional[str] = None,
method_name: Optional[str] = None,
file_name: Optional[str] = None,
method_parameters: Optional[Mapping[str, Any]] = None,
file_extension: Optional[str] = None,
tags: Optional[List[str]] = None,
) -> str:
"""Upload arbitrary file data.
Upload file data that may be stored on a robot along with the relevant metadata to app.viam.com. File data can be found under the
"Files" subtab of the Data tab on app.viam.com.
::
file_id = await data_client.file_upload(
data=b"Encoded image bytes",
part_id="INSERT YOUR PART ID",
tags=["tag_1", "tag_2"],
file_name="your-file",
file_extension=".txt"
)
Args:
part_id (str): Part ID of the resource associated with the file.
data (bytes): Bytes representing file data to upload.
component_type (Optional[str]): Optional type of the component associated with the file (e.g., "movement_sensor").
component_name (Optional[str]): Optional name of the component associated with the file.
method_name (Optional[str]): Optional name of the method associated with the file.
file_name (Optional[str]): Optional name of the file. The empty string "" will be assigned as the file name if one isn't
provided.
method_parameters (Optional[str]): Optional dictionary of the method parameters. No longer in active use.
file_extension (Optional[str]): Optional file extension. The empty string "" will be assigned as the file extension if one isn't
provided. Files with a .jpeg, .jpg, or .png extension will be saved to the images tab.
tags (Optional[List[str]]): Optional list of tags to allow for tag-based filtering when retrieving data.
Raises:
GRPCError: If an invalid part ID is passed.
Returns:
str: ID of the new file.
"""
metadata = UploadMetadata(
part_id=part_id,
component_type=component_type if component_type else "",
component_name=component_name if component_name else "",
method_name=method_name if method_name else "",
type=DataType.DATA_TYPE_FILE,
file_name=file_name if file_name else "",
method_parameters=method_parameters,
file_extension=file_extension if file_extension else "",
tags=tags,
)
response: FileUploadResponse = await self._file_upload(metadata=metadata, file_contents=FileData(data=data))
return response.file_id
[docs] async def file_upload_from_path(
self,
filepath: str,
part_id: str,
component_type: Optional[str] = None,
component_name: Optional[str] = None,
method_name: Optional[str] = None,
method_parameters: Optional[Mapping[str, Any]] = None,
tags: Optional[List[str]] = None,
) -> str:
"""Upload arbitrary file data.
Upload file data that may be stored on a robot along with the relevant metadata to app.viam.com. File data can be found under the
"Files" subtab of the Data tab on app.viam.com.
::
file_id = await data_client.file_upload_from_path(
part_id="INSERT YOUR PART ID",
tags=["tag_1", "tag_2"],
filepath="/Users/<your-username>/<your-directory>/<your-file.txt>"
)
Args:
filepath (str): Absolute filepath of file to be uploaded.
part_id (str): Part ID of the component associated with the file.
component_type (Optional[str]): Optional type of the component associated with the file (e.g., "movement_sensor").
component_name (Optional[str]): Optional name of the component associated with the file.
method_name (Optional[str]): Optional name of the method associated with the file.
method_parameters (Optional[str]): Optional dictionary of the method parameters. No longer in active use.
tags (Optional[List[str]]): Optional list of tags to allow for tag-based filtering when retrieving data.
Raises:
GRPCError: If an invalid part ID is passed.
FileNotFoundError: If the provided filepath is not found.
Returns:
str: ID of the new file.
"""
path = Path(filepath)
file_name = path.stem
file_extension = path.suffix if path.suffix != "" else None
f = open(filepath, "rb")
data = f.read()
f.close()
metadata = UploadMetadata(
part_id=part_id,
component_type=component_type if component_type else "",
component_name=component_name if component_name else "",
method_name=method_name if method_name else "",
type=DataType.DATA_TYPE_FILE,
file_name=file_name,
method_parameters=method_parameters,
file_extension=file_extension if file_extension else "",
tags=tags,
)
response: FileUploadResponse = await self._file_upload(metadata=metadata, file_contents=FileData(data=data if data else bytes()))
return response.file_id
async def _file_upload(self, metadata: UploadMetadata, file_contents: FileData) -> FileUploadResponse:
request_metadata = FileUploadRequest(metadata=metadata)
request_file_contents = FileUploadRequest(file_contents=file_contents)
stream: Stream[FileUploadRequest, FileUploadResponse]
async with self._data_sync_client.FileUpload.open(metadata=self._metadata) as stream:
await stream.send_message(request_metadata)
await stream.send_message(request_file_contents, end=True)
response = await stream.recv_message()
if not response:
await stream.recv_trailing_metadata() # causes us to throw appropriate gRPC error.
raise TypeError("Response cannot be empty")
return response
[docs] @staticmethod
def create_filter(
component_name: Optional[str] = None,
component_type: Optional[str] = None,
method: Optional[str] = None,
robot_name: Optional[str] = None,
robot_id: Optional[str] = None,
part_name: Optional[str] = None,
part_id: Optional[str] = None,
location_ids: Optional[List[str]] = None,
organization_ids: Optional[List[str]] = None,
mime_type: Optional[List[str]] = None,
start_time: Optional[datetime] = None,
end_time: Optional[datetime] = None,
tags: Optional[List[str]] = None,
bbox_labels: Optional[List[str]] = None,
dataset_id: Optional[str] = None,
) -> Filter:
warnings.warn("DataClient.create_filter is deprecated. Use utils.create_filter instead.", DeprecationWarning, stacklevel=2)
return create_filter(
component_name,
component_type,
method,
robot_name,
robot_id,
part_name,
part_id,
location_ids,
organization_ids,
mime_type,
start_time,
end_time,
tags,
bbox_labels,
dataset_id,
)