viam.services.vision

Submodules

Classes

Classification

the general form of the output from a classifier

Detection

Abstract base class for protocol messages.

VisionClient

Connect to the Vision service, which allows you to access various computer vision algorithms

CaptureAllResult

CaptureAllResult represents the collection of things that you have requested from the CaptureAllFromCamera method.

Vision

Vision represents a Vision service.

Package Contents

class viam.services.vision.Classification(*, class_name: str = ..., confidence: float = ...)

Bases: google.protobuf.message.Message

the general form of the output from a classifier

class_name: str

the class name

confidence: float

the confidence score of the classification
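As a minimal sketch of working with these two fields, the helper below keeps only the classifications at or above a confidence threshold and orders them from most to least confident. The `Label` tuple is a hypothetical stand-in that mirrors the `class_name` and `confidence` attributes so the snippet runs without a machine; `confident_labels` and the 0.5 default are illustrative and not part of the SDK.

```python
from typing import Iterable, List, NamedTuple


class Label(NamedTuple):
    """Hypothetical stand-in exposing the same attributes as Classification."""
    class_name: str
    confidence: float


def confident_labels(classifications: Iterable, min_confidence: float = 0.5) -> List[str]:
    """Return class names with confidence >= min_confidence, most confident first."""
    kept = [c for c in classifications if c.confidence >= min_confidence]
    kept.sort(key=lambda c: c.confidence, reverse=True)
    return [c.class_name for c in kept]


sample = [Label("cat", 0.62), Label("dog", 0.91), Label("bird", 0.08)]
names = confident_labels(sample)
```

Because the helper only reads `class_name` and `confidence`, it should also accept the list returned by get_classifications or get_classifications_from_camera.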

class viam.services.vision.Detection(*, x_min: int | None = ..., y_min: int | None = ..., x_max: int | None = ..., y_max: int | None = ..., confidence: float = ..., class_name: str = ...)

Bases: google.protobuf.message.Message

Abstract base class for protocol messages.

Protocol message classes are almost always generated by the protocol compiler. These generated types subclass Message and implement the methods shown below.

x_min: int

the four corners of the box

y_min: int
x_max: int
y_max: int
confidence: float

the confidence of the detection

class_name: str

label associated with the detected object

HasField(field_name: Literal['_x_max', b'_x_max', '_x_min', b'_x_min', '_y_max', b'_y_max', '_y_min', b'_y_min', 'x_max', b'x_max', 'x_min', b'x_min', 'y_max', b'y_max', 'y_min', b'y_min']) bool

Checks if a certain field is set for the message.

For a oneof group, checks if any field inside is set. Note that if the field_name is not defined in the message descriptor, ValueError will be raised.

Parameters:

field_name (str) – The name of the field to check for presence.

Returns:

Whether a value has been set for the named field.

Return type:

bool

Raises:

ValueError – if the field_name is not a member of this message.
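The four box coordinates are optional fields, and an unset optional integer in a protobuf message reads back as 0, so a value of 0 alone does not say whether the corner was provided. The sketch below shows one way to use HasField to tell the two cases apart before computing a box size. `FakeDetection` is a hypothetical stand-in that imitates only this behaviour so the snippet runs without the SDK, and `box_size` is an illustrative helper.

```python
from typing import Optional, Tuple

BOX_FIELDS = ("x_min", "y_min", "x_max", "y_max")


class FakeDetection:
    """Hypothetical stand-in mimicking Detection's optional box fields."""

    def __init__(self, x_min=None, y_min=None, x_max=None, y_max=None,
                 confidence=0.0, class_name=""):
        given = dict(zip(BOX_FIELDS, (x_min, y_min, x_max, y_max)))
        self._present = {k for k, v in given.items() if v is not None}
        # Unset optional integers read back as 0, so 0 alone is ambiguous.
        for k, v in given.items():
            setattr(self, k, 0 if v is None else v)
        self.confidence = confidence
        self.class_name = class_name

    def HasField(self, field_name: str) -> bool:
        if field_name not in BOX_FIELDS:
            raise ValueError(f"unknown field: {field_name}")
        return field_name in self._present


def box_size(det) -> Optional[Tuple[int, int]]:
    """Return (width, height), or None if any corner was never set."""
    if not all(det.HasField(name) for name in BOX_FIELDS):
        return None
    return det.x_max - det.x_min, det.y_max - det.y_min


full = FakeDetection(10, 20, 110, 70, confidence=0.8, class_name="cup")
partial = FakeDetection(x_min=10, class_name="cup")
sizes = (box_size(full), box_size(partial))
```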

WhichOneof(oneof_group: Literal['_x_max', b'_x_max']) Literal['x_max'] | None
WhichOneof(oneof_group: Literal['_x_min', b'_x_min']) Literal['x_min'] | None
WhichOneof(oneof_group: Literal['_y_max', b'_y_max']) Literal['y_max'] | None
WhichOneof(oneof_group: Literal['_y_min', b'_y_min']) Literal['y_min'] | None

Returns the name of the field that is set inside a oneof group.

If no field is set, returns None.

Parameters:

oneof_group (str) – the name of the oneof group to check.

Returns:

The name of the field that is set in the group, or None.

Return type:

str or None

Raises:

ValueError – no group with the given name exists

class viam.services.vision.VisionClient(name: str, channel: grpclib.client.Channel)[source]

Bases: viam.services.vision.vision.Vision, viam.resource.rpc_client_base.ReconfigurableResourceRPCClientBase

Connect to the Vision service, which allows you to access various computer vision algorithms (like detection, segmentation, tracking, etc) that usually only require a camera or image input.

client: viam.proto.service.vision.VisionServiceStub
channel
async capture_all_from_camera(camera_name: str, return_image: bool = False, return_classifications: bool = False, return_detections: bool = False, return_object_point_clouds: bool = False, *, extra: Mapping[str, Any] | None = None, timeout: float | None = None, **kwargs) viam.services.vision.vision.CaptureAllResult[source]

Get the next image, detections, classifications, and objects all together, given a camera name. Used for visualization.

my_detector = VisionClient.from_robot(machine, "my_detector")

# Get the captured data for a camera
result = await my_detector.capture_all_from_camera(
    "my_camera",
    return_image=True,
    return_detections=True,
)
image = result.image
detections = result.detections
Parameters:
  • camera_name (str) – The name of the camera to use for detection

  • return_image (bool) – Ask the vision service to return the camera’s latest image

  • return_classifications (bool) – Ask the vision service to return its latest classifications

  • return_detections (bool) – Ask the vision service to return its latest detections

  • return_object_point_clouds (bool) – Ask the vision service to return its latest 3D segmentations

Returns:

A class that stores all potential returns from the vision service. It can return the image from the camera along with its associated detections, classifications, and objects, as well as any extra info the model may provide.

Return type:

vision.CaptureAllResult

For more information, see Computer Vision service.

async get_detections_from_camera(camera_name: str, *, extra: Mapping[str, Any] | None = None, timeout: float | None = None, **kwargs) List[viam.proto.service.vision.Detection][source]

Get a list of detections in the next image given a camera and a detector

my_detector = VisionClient.from_robot(robot=machine, name="my_detector")

# Get detections for the next image from the specified camera
detections = await my_detector.get_detections_from_camera("my_camera")
Parameters:

camera_name (str) – The name of the camera to use for detection

Raises:

ViamError – Raised if given an image without a specified width and height

Returns:

A list of 2D bounding boxes, their labels, and the confidence score of the labels, around the found objects in the next 2D image from the given camera, with the given detector applied to it.

Return type:

List[viam.proto.service.vision.Detection]

For more information, see Computer Vision service.
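A common pattern built on this call is to poll until a particular label shows up. The following is a sketch under stated assumptions: `StubDetector` is a hypothetical stand-in that replays canned results so the loop can run offline, and `wait_for_label`, its parameters, and the default values are illustrative names, not SDK functions. With a real machine, the object returned by `VisionClient.from_robot` would take the stub's place.

```python
import asyncio
from types import SimpleNamespace


class StubDetector:
    """Hypothetical stand-in that replays canned detections, one list per call."""

    def __init__(self, frames):
        self._frames = list(frames)

    async def get_detections_from_camera(self, camera_name: str, **kwargs):
        return self._frames.pop(0) if self._frames else []


async def wait_for_label(detector, camera_name: str, label: str,
                         min_confidence: float = 0.5, attempts: int = 5,
                         delay_s: float = 0.0):
    """Poll the detector until `label` is seen with enough confidence.

    Returns the matching detection, or None after `attempts` tries.
    """
    for _ in range(attempts):
        detections = await detector.get_detections_from_camera(camera_name)
        for det in detections:
            if det.class_name == label and det.confidence >= min_confidence:
                return det
        await asyncio.sleep(delay_s)
    return None


frames = [
    [],
    [SimpleNamespace(class_name="person", confidence=0.3)],
    [SimpleNamespace(class_name="person", confidence=0.9)],
]
found = asyncio.run(wait_for_label(StubDetector(frames), "my_camera", "person"))
```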

async get_detections(image: viam.media.video.ViamImage, *, extra: Mapping[str, Any] | None = None, timeout: float | None = None, **kwargs) List[viam.proto.service.vision.Detection][source]

Get a list of detections in the given image using the specified detector

my_camera = Camera.from_robot(robot=machine, name="my_camera")
my_detector = VisionClient.from_robot(robot=machine, name="my_detector")

# Get an image from the camera
img = await my_camera.get_image()

# Get detections for that image
detections = await my_detector.get_detections(img)
Parameters:

image (ViamImage) – The image to get detections for

Raises:

ViamError – Raised if given an image without a specified width and height

Returns:

A list of 2D bounding boxes, their labels, and the confidence score of the labels, around the found objects in the given 2D image, with the given detector applied to it.

Return type:

List[viam.proto.service.vision.Detection]

For more information, see Computer Vision service.

async get_classifications_from_camera(camera_name: str, count: int, *, extra: Mapping[str, Any] | None = None, timeout: float | None = None, **kwargs) List[viam.proto.service.vision.Classification][source]

Get a list of classifications in the next image given a camera and a classifier

my_classifier = VisionClient.from_robot(robot=machine, name="my_classifier")

# Get the 2 classifications with the highest confidence scores for the next image from the camera
classifications = await my_classifier.get_classifications_from_camera(
    "my_camera", 2)
Parameters:
  • camera_name (str) – The name of the camera to use for classification

  • count (int) – The number of classifications desired

Returns:

The list of Classifications

Return type:

List[viam.proto.service.vision.Classification]

For more information, see Computer Vision service.

async get_classifications(image: viam.media.video.ViamImage, count: int, *, extra: Mapping[str, Any] | None = None, timeout: float | None = None, **kwargs) List[viam.proto.service.vision.Classification][source]

Get a list of classifications in the given image using the specified classifier

my_camera = Camera.from_robot(robot=machine, name="my_camera")
my_classifier = VisionClient.from_robot(robot=machine, name="my_classifier")

# Get an image from the camera
img = await my_camera.get_image()

# Get the 2 classifications with the highest confidence scores for the image
classifications = await my_classifier.get_classifications(img, 2)
Parameters:
  • image (ViamImage) – The image to get classifications for

  • count (int) – The number of classifications desired

Returns:

The list of Classifications

Return type:

List[viam.proto.service.vision.Classification]

For more information, see Computer Vision service.

async get_object_point_clouds(camera_name: str, *, extra: Mapping[str, Any] | None = None, timeout: float | None = None, **kwargs) List[viam.proto.common.PointCloudObject][source]

Returns a list of the 3D point cloud objects and associated metadata in the latest picture obtained from the specified 3D camera (using the specified segmenter).

To deserialize the returned information into a numpy array, use the Open3D library.

import numpy as np
import open3d as o3d

my_segmenter = VisionClient.from_robot(robot=machine, name="my_segmenter")
# Get the objects from the camera output
objects = await my_segmenter.get_object_point_clouds("my_camera")
# write the first object point cloud into a temporary file
with open("/tmp/pointcloud_data.pcd", "wb") as f:
    f.write(objects[0].point_cloud)
pcd = o3d.io.read_point_cloud("/tmp/pointcloud_data.pcd")
points = np.asarray(pcd.points)
Parameters:

camera_name (str) – The name of the camera

Returns:

The pointcloud objects with metadata

Return type:

List[viam.proto.common.PointCloudObject]

For more information, see Computer Vision service.
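If only the metadata of a returned cloud is needed (for example the point count), it may be enough to read the text header without Open3D or a temporary file. The sketch below assumes that the `point_cloud` bytes are PCD-encoded, as the `.pcd` file name in the example above suggests; `read_pcd_header` is an illustrative helper, not part of the SDK, and the sample bytes are made up for the demonstration.

```python
import io
from typing import Dict


def read_pcd_header(data: bytes) -> Dict[str, str]:
    """Parse the text header of PCD data held in memory.

    Maps each header key (e.g. "FIELDS", "POINTS", "DATA") to the rest of
    its line and stops after the DATA line, so the payload is never decoded.
    """
    header: Dict[str, str] = {}
    for raw in io.BytesIO(data):
        line = raw.decode("ascii", errors="replace").strip()
        if not line or line.startswith("#"):
            continue  # skip blank lines and comments
        key, _, value = line.partition(" ")
        header[key] = value
        if key == "DATA":
            break
    return header


# Hand-written sample standing in for objects[0].point_cloud
sample = (
    b"# .PCD v0.7 - Point Cloud Data file format\n"
    b"VERSION 0.7\nFIELDS x y z\nSIZE 4 4 4\nTYPE F F F\nCOUNT 1 1 1\n"
    b"WIDTH 2\nHEIGHT 1\nVIEWPOINT 0 0 0 1 0 0 0\nPOINTS 2\nDATA ascii\n"
    b"0.0 0.0 0.0\n1.0 2.0 3.0\n"
)
header = read_pcd_header(sample)
point_count = int(header["POINTS"])
```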

async get_properties(*, extra: Mapping[str, Any] | None = None, timeout: float | None = None, **kwargs) viam.services.vision.vision.Vision.Properties[source]

Get info about what vision methods the vision service provides. Currently returns boolean values that state whether the service implements the classification, detection, and/or 3D object segmentation methods.

my_detector = VisionClient.from_robot(robot=machine, name="my_detector")
properties = await my_detector.get_properties()
detections_supported = properties.detections_supported
classifications_supported = properties.classifications_supported
Returns:

The properties of the vision service

Return type:

Properties

For more information, see Computer Vision service.

async do_command(command: Mapping[str, viam.utils.ValueTypes], *, timeout: float | None = None, **kwargs) Mapping[str, viam.utils.ValueTypes][source]

Send/receive arbitrary commands.

service = SERVICE.from_robot(robot=machine, name="builtin")  # replace SERVICE with the appropriate class

my_command = {
  "cmnd": "dosomething",
  "someparameter": 52
}

# Can be used with any resource, using the motion service as an example
await service.do_command(command=my_command)
Parameters:

command (Dict[str, ValueTypes]) – The command to execute

Returns:

Result of the executed command

Return type:

Dict[str, ValueTypes]

classmethod from_robot(robot: viam.robot.client.RobotClient, name: str) typing_extensions.Self

Get the service named name from the provided robot.

from viam.robot.client import RobotClient
from viam.services.motion import MotionClient

async def connect() -> RobotClient:
    # Replace "<API-KEY>" (including brackets) with your API key and "<API-KEY-ID>" with your API key ID
    options = RobotClient.Options.with_api_key("<API-KEY>", "<API-KEY-ID>")
    # Replace "<MACHINE-URL>" (including brackets) with your machine's connection URL or FQDN
    return await RobotClient.at_address("<MACHINE-URL>", options)

async def main():
    machine = await connect()

    # Can be used with any resource, using the motion service as an example
    motion = MotionClient.from_robot(robot=machine, name="builtin")

    # Close the connection when finished
    await machine.close()
Parameters:
  • robot (RobotClient) – The robot

  • name (str) – The name of the service

Returns:

The service, if it exists on the robot

Return type:

Self

classmethod get_resource_name(name: str) viam.proto.common.ResourceName

Get the ResourceName for this Resource with the given name

# Can be used with any resource, using an arm as an example
my_arm_name = Arm.get_resource_name("my_arm")
Parameters:

name (str) – The name of the Resource

Returns:

The ResourceName of this Resource

Return type:

ResourceName

get_operation(kwargs: Mapping[str, Any]) viam.operations.Operation

Get the Operation associated with the currently running function.

When writing custom resources, you should get the Operation by calling this function and check to see if it’s cancelled. If the Operation is cancelled, then you can perform any necessary cleanup (terminating long-running tasks, cleaning up connections, etc.).

Parameters:

kwargs (Mapping[str, Any]) – The kwargs object containing the operation

Returns:

The operation associated with this function

Return type:

viam.operations.Operation

async close()

Safely shut down the resource and prevent further use.

Close must be idempotent. Later configuration may allow a resource to be “open” again. If a resource does not want or need a close function, it is assumed that the resource does not need to return errors when future non-Close methods are called.

await component.close()
class viam.services.vision.CaptureAllResult(image: viam.media.video.ViamImage | None = None, classifications: List[viam.proto.service.vision.Classification] | None = None, detections: List[viam.proto.service.vision.Detection] | None = None, objects: List[viam.proto.common.PointCloudObject] | None = None, extra: Mapping[str, viam.utils.ValueTypes] | None = None)[source]

CaptureAllResult represents the collection of things that you have requested from the CaptureAllFromCamera method. This is used most often for visualization purposes, since normally, returning the image on every call to a classifier/detector/etc would be costly and unnecessary. The default result for each field is None rather than the empty list to distinguish between “there was no request for the classifier/detector to return a result” vs. “the classifier/detector was requested, but there were no results”.

image
classifications
detections
objects
extra
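The distinction between None and an empty list can be handled explicitly when reading a result. This is a minimal sketch: `FakeResult` is a hypothetical stand-in carrying two of the fields listed above, and `describe` is an illustrative helper.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class FakeResult:
    """Hypothetical stand-in with two of CaptureAllResult's fields."""
    classifications: Optional[List] = None
    detections: Optional[List] = None


def describe(values: Optional[List], what: str) -> str:
    """Summarize one field, separating 'not requested' from 'no results'."""
    if values is None:
        return f"{what}: not requested"
    if not values:
        return f"{what}: requested, none found"
    return f"{what}: {len(values)} found"


# Detections were requested but nothing was detected; classifications were not requested.
result = FakeResult(detections=[])
lines = [
    describe(result.classifications, "classifications"),
    describe(result.detections, "detections"),
]
```

Testing with `if not result.detections:` alone would merge the two cases, which is why the sketch checks `is None` first.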
class viam.services.vision.Vision(name: str, *, logger: logging.Logger | None = None)[source]

Bases: viam.services.service_base.ServiceBase

Vision represents a Vision service.

This acts as an abstract base class for any drivers representing specific vision implementations. This cannot be used on its own. If the __init__() function is overridden, it must call the super().__init__() function.

For more information, see Computer Vision service.

SUBTYPE: Final

The Subtype of the Resource

Properties: TypeAlias = GetPropertiesResponse

Properties is a class that states what features are supported on the associated vision service. Currently, these are the following properties:
  • classifications_supported (bool) – GetClassifications and GetClassificationsFromCamera are implemented.

  • detections_supported (bool) – GetDetections and GetDetectionsFromCamera are implemented.

  • object_point_clouds_supported (bool) – GetObjectPointClouds is implemented.
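One use for these flags is to decide which calls are worth making against a given service. The sketch below maps the three documented booleans to the Python method names on this page; `supported_calls` is an illustrative helper, and `SimpleNamespace` stands in for the object that get_properties returns.

```python
from types import SimpleNamespace
from typing import List


def supported_calls(properties) -> List[str]:
    """List the Vision methods that the given properties report as implemented."""
    calls: List[str] = []
    if properties.classifications_supported:
        calls += ["get_classifications", "get_classifications_from_camera"]
    if properties.detections_supported:
        calls += ["get_detections", "get_detections_from_camera"]
    if properties.object_point_clouds_supported:
        calls.append("get_object_point_clouds")
    return calls


# Stand-in for a detector-only service
props = SimpleNamespace(
    classifications_supported=False,
    detections_supported=True,
    object_point_clouds_supported=False,
)
calls = supported_calls(props)
```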

abstract capture_all_from_camera(camera_name: str, return_image: bool = False, return_classifications: bool = False, return_detections: bool = False, return_object_point_clouds: bool = False, *, extra: Mapping[str, viam.utils.ValueTypes] | None = None, timeout: float | None = None) CaptureAllResult[source]
Async:

Get the next image, detections, classifications, and objects all together, given a camera name. Used for visualization.

my_detector = VisionClient.from_robot(machine, "my_detector")

# Get the captured data for a camera
result = await my_detector.capture_all_from_camera(
    "my_camera",
    return_image=True,
    return_detections=True,
)
image = result.image
detections = result.detections
Parameters:
  • camera_name (str) – The name of the camera to use for detection

  • return_image (bool) – Ask the vision service to return the camera’s latest image

  • return_classifications (bool) – Ask the vision service to return its latest classifications

  • return_detections (bool) – Ask the vision service to return its latest detections

  • return_object_point_clouds (bool) – Ask the vision service to return its latest 3D segmentations

Returns:

A class that stores all potential returns from the vision service. It can return the image from the camera along with its associated detections, classifications, and objects, as well as any extra info the model may provide.

Return type:

vision.CaptureAllResult

For more information, see Computer Vision service.

abstract get_detections_from_camera(camera_name: str, *, extra: Mapping[str, viam.utils.ValueTypes] | None = None, timeout: float | None = None) List[viam.proto.service.vision.Detection][source]
Async:

Get a list of detections in the next image given a camera and a detector

my_detector = VisionClient.from_robot(robot=machine, name="my_detector")

# Get detections for the next image from the specified camera
detections = await my_detector.get_detections_from_camera("my_camera")
Parameters:

camera_name (str) – The name of the camera to use for detection

Raises:

ViamError – Raised if given an image without a specified width and height

Returns:

A list of 2D bounding boxes, their labels, and the confidence score of the labels, around the found objects in the next 2D image from the given camera, with the given detector applied to it.

Return type:

List[viam.proto.service.vision.Detection]

For more information, see Computer Vision service.

abstract get_detections(image: viam.media.video.ViamImage, *, extra: Mapping[str, viam.utils.ValueTypes] | None = None, timeout: float | None = None) List[viam.proto.service.vision.Detection][source]
Async:

Get a list of detections in the given image using the specified detector

my_camera = Camera.from_robot(robot=machine, name="my_camera")
my_detector = VisionClient.from_robot(robot=machine, name="my_detector")

# Get an image from the camera
img = await my_camera.get_image()

# Get detections for that image
detections = await my_detector.get_detections(img)
Parameters:

image (ViamImage) – The image to get detections for

Raises:

ViamError – Raised if given an image without a specified width and height

Returns:

A list of 2D bounding boxes, their labels, and the confidence score of the labels, around the found objects in the given 2D image, with the given detector applied to it.

Return type:

List[viam.proto.service.vision.Detection]

For more information, see Computer Vision service.

abstract get_classifications_from_camera(camera_name: str, count: int, *, extra: Mapping[str, viam.utils.ValueTypes] | None = None, timeout: float | None = None) List[viam.proto.service.vision.Classification][source]
Async:

Get a list of classifications in the next image given a camera and a classifier

my_classifier = VisionClient.from_robot(robot=machine, name="my_classifier")

# Get the 2 classifications with the highest confidence scores for the next image from the camera
classifications = await my_classifier.get_classifications_from_camera(
    "my_camera", 2)
Parameters:
  • camera_name (str) – The name of the camera to use for classification

  • count (int) – The number of classifications desired

Returns:

The list of Classifications

Return type:

List[viam.proto.service.vision.Classification]

For more information, see Computer Vision service.

abstract get_classifications(image: viam.media.video.ViamImage, count: int, *, extra: Mapping[str, viam.utils.ValueTypes] | None = None, timeout: float | None = None) List[viam.proto.service.vision.Classification][source]
Async:

Get a list of classifications in the given image using the specified classifier

my_camera = Camera.from_robot(robot=machine, name="my_camera")
my_classifier = VisionClient.from_robot(robot=machine, name="my_classifier")

# Get an image from the camera
img = await my_camera.get_image()

# Get the 2 classifications with the highest confidence scores for the image
classifications = await my_classifier.get_classifications(img, 2)
Parameters:
  • image (ViamImage) – The image to get classifications for

  • count (int) – The number of classifications desired

Returns:

The list of Classifications

Return type:

List[viam.proto.service.vision.Classification]

For more information, see Computer Vision service.

abstract get_object_point_clouds(camera_name: str, *, extra: Mapping[str, viam.utils.ValueTypes] | None = None, timeout: float | None = None) List[viam.proto.common.PointCloudObject][source]
Async:

Returns a list of the 3D point cloud objects and associated metadata in the latest picture obtained from the specified 3D camera (using the specified segmenter).

To deserialize the returned information into a numpy array, use the Open3D library.

import numpy as np
import open3d as o3d

my_segmenter = VisionClient.from_robot(robot=machine, name="my_segmenter")
# Get the objects from the camera output
objects = await my_segmenter.get_object_point_clouds("my_camera")
# write the first object point cloud into a temporary file
with open("/tmp/pointcloud_data.pcd", "wb") as f:
    f.write(objects[0].point_cloud)
pcd = o3d.io.read_point_cloud("/tmp/pointcloud_data.pcd")
points = np.asarray(pcd.points)
Parameters:

camera_name (str) – The name of the camera

Returns:

The pointcloud objects with metadata

Return type:

List[viam.proto.common.PointCloudObject]

For more information, see Computer Vision service.

abstract get_properties(*, extra: Mapping[str, viam.utils.ValueTypes] | None = None, timeout: float | None = None) Properties[source]
Async:

Get info about what vision methods the vision service provides. Currently returns boolean values that state whether the service implements the classification, detection, and/or 3D object segmentation methods.

my_detector = VisionClient.from_robot(robot=machine, name="my_detector")
properties = await my_detector.get_properties()
detections_supported = properties.detections_supported
classifications_supported = properties.classifications_supported
Returns:

The properties of the vision service

Return type:

Properties

For more information, see Computer Vision service.

classmethod from_robot(robot: viam.robot.client.RobotClient, name: str) typing_extensions.Self

Get the service named name from the provided robot.

from viam.robot.client import RobotClient
from viam.services.motion import MotionClient

async def connect() -> RobotClient:
    # Replace "<API-KEY>" (including brackets) with your API key and "<API-KEY-ID>" with your API key ID
    options = RobotClient.Options.with_api_key("<API-KEY>", "<API-KEY-ID>")
    # Replace "<MACHINE-URL>" (including brackets) with your machine's connection URL or FQDN
    return await RobotClient.at_address("<MACHINE-URL>", options)

async def main():
    machine = await connect()

    # Can be used with any resource, using the motion service as an example
    motion = MotionClient.from_robot(robot=machine, name="builtin")

    # Close the connection when finished
    await machine.close()
Parameters:
  • robot (RobotClient) – The robot

  • name (str) – The name of the service

Returns:

The service, if it exists on the robot

Return type:

Self

abstract do_command(command: Mapping[str, viam.utils.ValueTypes], *, timeout: float | None = None, **kwargs) Mapping[str, viam.utils.ValueTypes]
Async:

Send/receive arbitrary commands.

service = SERVICE.from_robot(robot=machine, name="builtin")  # replace SERVICE with the appropriate class

my_command = {
  "cmnd": "dosomething",
  "someparameter": 52
}

# Can be used with any resource, using the motion service as an example
await service.do_command(command=my_command)
Parameters:

command (Dict[str, ValueTypes]) – The command to execute

Returns:

Result of the executed command

Return type:

Dict[str, ValueTypes]
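Since do_command is abstract here, a custom implementation has to decide how to interpret the command mapping. One possible shape is a small dispatcher keyed on a command name, sketched below. Everything in it is hypothetical: `ThresholdCommands` is a plain class rather than a real Vision subclass, and the `"set_min_confidence"` and `"get_min_confidence"` commands and the `"value"` key are invented for illustration; only the `"cmnd"` key is borrowed from the example above.

```python
import asyncio
from typing import Any, Dict, Mapping, Optional


class ThresholdCommands:
    """Hypothetical helper showing a do_command dispatcher; not an SDK class."""

    def __init__(self) -> None:
        self.min_confidence = 0.5

    async def do_command(self, command: Mapping[str, Any], *,
                         timeout: Optional[float] = None, **kwargs) -> Dict[str, Any]:
        name = command.get("cmnd")
        if name == "set_min_confidence":
            self.min_confidence = float(command["value"])
            return {"min_confidence": self.min_confidence}
        if name == "get_min_confidence":
            return {"min_confidence": self.min_confidence}
        # Reject anything the resource does not understand.
        raise ValueError(f"unsupported command: {name!r}")


svc = ThresholdCommands()
reply = asyncio.run(svc.do_command({"cmnd": "set_min_confidence", "value": 0.8}))
```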

classmethod get_resource_name(name: str) viam.proto.common.ResourceName

Get the ResourceName for this Resource with the given name

# Can be used with any resource, using an arm as an example
my_arm_name = Arm.get_resource_name("my_arm")
Parameters:

name (str) – The name of the Resource

Returns:

The ResourceName of this Resource

Return type:

ResourceName

get_operation(kwargs: Mapping[str, Any]) viam.operations.Operation

Get the Operation associated with the currently running function.

When writing custom resources, you should get the Operation by calling this function and check to see if it’s cancelled. If the Operation is cancelled, then you can perform any necessary cleanup (terminating long-running tasks, cleaning up connections, etc.).

Parameters:

kwargs (Mapping[str, Any]) – The kwargs object containing the operation

Returns:

The operation associated with this function

Return type:

viam.operations.Operation

async close()

Safely shut down the resource and prevent further use.

Close must be idempotent. Later configuration may allow a resource to be “open” again. If a resource does not want or need a close function, it is assumed that the resource does not need to return errors when future non-Close methods are called.

await component.close()
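The idempotency requirement means that calling close() a second time must be harmless. A minimal sketch of one way to meet it is shown below, using a guard flag; `ClosableResource`, its background task, and the `cleanups` counter are hypothetical and exist only to make the behaviour observable.

```python
import asyncio


class ClosableResource:
    """Hypothetical resource that owns one background task."""

    def __init__(self) -> None:
        self._closed = False
        self._task = None
        self.cleanups = 0  # counts how often cleanup actually ran

    async def start(self) -> None:
        # Stand-in for long-running work owned by the resource
        self._task = asyncio.create_task(asyncio.sleep(3600))

    async def close(self) -> None:
        if self._closed:
            return  # already closed: later calls are no-ops
        self._closed = True
        self.cleanups += 1
        if self._task is not None:
            self._task.cancel()
            try:
                await self._task
            except asyncio.CancelledError:
                pass  # expected: the task was cancelled on purpose


async def demo() -> int:
    res = ClosableResource()
    await res.start()
    await res.close()
    await res.close()  # second call does nothing
    return res.cleanups


cleanups = asyncio.run(demo())
```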