:py:mod:`viam.services.vision`
==============================

.. py:module:: viam.services.vision


Submodules
----------
.. toctree::
   :titlesonly:
   :maxdepth: 1

   client/index.rst
   service/index.rst
   vision/index.rst


Package Contents
----------------

Classes
~~~~~~~

.. autoapisummary::

   viam.services.vision.Classification
   viam.services.vision.Detection
   viam.services.vision.VisionClient
   viam.services.vision.Vision




.. py:class:: Classification(*, class_name: str = ..., confidence: float = ...)


   Bases: :py:obj:`google.protobuf.message.Message`

   The general form of the output from a classifier.

   .. py:attribute:: class_name
      :type: str

      The class name.


   .. py:attribute:: confidence
      :type: float

      The confidence score of the classification.



.. py:class:: Detection(*, x_min: int | None = ..., y_min: int | None = ..., x_max: int | None = ..., y_max: int | None = ..., confidence: float = ..., class_name: str = ...)


   Bases: :py:obj:`google.protobuf.message.Message`

   The general form of the output from a detector: a 2D bounding box around a detected
   object, with an associated label and confidence score.

   .. py:attribute:: x_min
      :type: int

      The four corners of the box.


   .. py:attribute:: y_min
      :type: int


   .. py:attribute:: x_max
      :type: int


   .. py:attribute:: y_max
      :type: int


   .. py:attribute:: confidence
      :type: float

      The confidence of the detection.


   .. py:attribute:: class_name
      :type: str

      The label associated with the detected object.


   .. py:method:: HasField(field_name: Literal['_x_max', b'_x_max', '_x_min', b'_x_min', '_y_max', b'_y_max', '_y_min', b'_y_min', 'x_max', b'x_max', 'x_min', b'x_min', 'y_max', b'y_max', 'y_min', b'y_min']) -> bool

      Checks if a certain field is set for the message.

      For a oneof group, checks if any field inside is set. Note that if the
      ``field_name`` is not defined in the message descriptor, :exc:`ValueError`
      will be raised.

      :param field_name: The name of the field to check for presence.
      :type field_name: str

      :returns: Whether a value has been set for the named field.
      :rtype: bool

      :raises ValueError: if the ``field_name`` is not a member of this message.


   .. py:method:: WhichOneof(oneof_group: Literal['_x_max', b'_x_max']) -> Literal['x_max'] | None
                  WhichOneof(oneof_group: Literal['_x_min', b'_x_min']) -> Literal['x_min'] | None
                  WhichOneof(oneof_group: Literal['_y_max', b'_y_max']) -> Literal['y_max'] | None
                  WhichOneof(oneof_group: Literal['_y_min', b'_y_min']) -> Literal['y_min'] | None

      Returns the name of the field that is set inside a oneof group.

      If no field is set, returns None.

      :param oneof_group: the name of the oneof group to check.
      :type oneof_group: str

      :returns: The name of the group that is set, or None.
      :rtype: str or None

      :raises ValueError: no group with the given name exists



.. py:class:: VisionClient(name: str, channel: grpclib.client.Channel)


   Bases: :py:obj:`viam.services.vision.vision.Vision`, :py:obj:`viam.resource.rpc_client_base.ReconfigurableResourceRPCClientBase`

   Connect to the Vision service, which allows you to access various computer vision
   algorithms (such as detection, segmentation, and tracking) that usually require
   only a camera or image input.

   .. py:attribute:: client
      :type: viam.proto.service.vision.VisionServiceStub


   .. py:method:: get_detections_from_camera(camera_name: str, *, extra: Optional[Mapping[str, Any]] = None, timeout: Optional[float] = None) -> List[viam.proto.service.vision.Detection]
      :async:

      Get a list of detections from the next image from a given camera, using a configured detector.

      ::

         camera_name = "cam1"

         # Grab the detector you configured on your machine
         my_detector = VisionClient.from_robot(robot, "my_detector")

         # Get detections from the next image from the camera
         detections = await my_detector.get_detections_from_camera(camera_name)

      :param camera_name: The name of the camera to use for detection
      :type camera_name: str

      :returns: A list of 2D bounding boxes, their labels, and the confidence scores of
                the labels, around the found objects in the next 2D image from the given
                camera, with the given detector applied to it.
      :rtype: List[viam.proto.service.vision.Detection]


   .. py:method:: get_detections(image: Union[viam.media.viam_rgba_plugin.Image.Image, viam.media.video.RawImage], *, extra: Optional[Mapping[str, Any]] = None, timeout: Optional[float] = None) -> List[viam.proto.service.vision.Detection]
      :async:

      Get a list of detections in the given image using the specified detector.

      ::

         # Grab camera from the machine
         cam1 = Camera.from_robot(robot, "cam1")

         # Get the detector you configured on your machine
         my_detector = VisionClient.from_robot(robot, "my_detector")

         # Get an image from the camera
         img = await cam1.get_image()

         # Get detections from that image
         detections = await my_detector.get_detections(img)

      :param image: The image to get detections from
      :type image: Image

      :returns: A list of 2D bounding boxes, their labels, and the confidence scores of
                the labels, around the found objects in the given image, with the given
                detector applied to it.
      :rtype: List[viam.proto.service.vision.Detection]


   .. py:method:: get_classifications_from_camera(camera_name: str, count: int, *, extra: Optional[Mapping[str, Any]] = None, timeout: Optional[float] = None) -> List[viam.proto.service.vision.Classification]
      :async:

      Get a list of classifications from the next image from a given camera, using a configured classifier.

      ::

         camera_name = "cam1"

         # Grab the classifier you configured on your machine
         my_classifier = VisionClient.from_robot(robot, "my_classifier")

         # Get the 2 classifications with the highest confidence scores
         # from the next image from the camera
         classifications = await my_classifier.get_classifications_from_camera(
             camera_name, 2)

      :param camera_name: The name of the camera to use for classification
      :type camera_name: str
      :param count: The number of classifications desired
      :type count: int

      :returns: The list of Classifications
      :rtype: List[viam.proto.service.vision.Classification]


   .. py:method:: get_classifications(image: Union[viam.media.viam_rgba_plugin.Image.Image, viam.media.video.RawImage], count: int, *, extra: Optional[Mapping[str, Any]] = None, timeout: Optional[float] = None) -> List[viam.proto.service.vision.Classification]
      :async:

      Get a list of classifications in the given image using the specified classifier.

      ::

         # Grab camera from the machine
         cam1 = Camera.from_robot(robot, "cam1")

         # Get the classifier you configured on your machine
         my_classifier = VisionClient.from_robot(robot, "my_classifier")

         # Get an image from the camera
         img = await cam1.get_image()

         # Get the 2 classifications with the highest confidence scores
         classifications = await my_classifier.get_classifications(img, 2)

      :param image: The image to get classifications from
      :type image: Image
      :param count: The number of classifications desired
      :type count: int

      :returns: The list of Classifications
      :rtype: List[viam.proto.service.vision.Classification]


   .. py:method:: get_object_point_clouds(camera_name: str, *, extra: Optional[Mapping[str, Any]] = None, timeout: Optional[float] = None) -> List[viam.proto.common.PointCloudObject]
      :async:

      Returns a list of the 3D point cloud objects and associated metadata in the
      latest picture obtained from the specified 3D camera (using the specified
      segmenter).

      To deserialize the returned information into a numpy array, use the Open3D
      library.

      ::

         import numpy as np
         import open3d as o3d

         # Grab the object segmenter you configured on your machine
         my_segmenter = VisionClient.from_robot(robot, "my_segmenter")

         # Get the objects from the output of the 3D camera, passing the camera's name
         objects = await my_segmenter.get_object_point_clouds("cam1")

         # Write the first object point cloud into a temporary file
         with open("/tmp/pointcloud_data.pcd", "wb") as f:
             f.write(objects[0].point_cloud)

         pcd = o3d.io.read_point_cloud("/tmp/pointcloud_data.pcd")
         points = np.asarray(pcd.points)

      :param camera_name: The name of the camera
      :type camera_name: str

      :returns: The pointcloud objects with metadata
      :rtype: List[viam.proto.common.PointCloudObject]


   .. py:method:: do_command(command: Mapping[str, viam.utils.ValueTypes], *, timeout: Optional[float] = None, **__) -> Mapping[str, viam.utils.ValueTypes]
      :async:

      Send/receive arbitrary commands.

      ::

         motion = MotionClient.from_robot(robot, "builtin")

         my_command = {
             "cmnd": "dosomething",
             "someparameter": 52
         }

         # Can be used with any resource, using the motion service as an example
         await motion.do_command(command=my_command)

      :param command: The command to execute
      :type command: Dict[str, ValueTypes]

      :returns: Result of the executed command
      :rtype: Dict[str, ValueTypes]


   .. py:method:: from_robot(robot: viam.robot.client.RobotClient, name: str) -> typing_extensions.Self
      :classmethod:

      Get the service named ``name`` from the provided robot.
      ::

         async def connect() -> ViamClient:
             # Replace "" with your API key and "" with your API key ID
             dial_options = DialOptions.with_api_key("", "")
             return await ViamClient.create_from_dial_options(dial_options)

         async def main():
             robot = await connect()

             # Can be used with any resource, using the motion service as an example
             motion = MotionClient.from_robot(robot=robot, name="builtin")

             await robot.close()

      :param robot: The robot
      :type robot: RobotClient
      :param name: The name of the service
      :type name: str

      :returns: The service, if it exists on the robot
      :rtype: Self


   .. py:method:: get_resource_name(name: str) -> viam.proto.common.ResourceName
      :classmethod:

      Get the ResourceName for this Resource with the given name.

      ::

         # Can be used with any resource, using an arm as an example
         my_arm_name = my_arm.get_resource_name("my_arm")

      :param name: The name of the Resource
      :type name: str


   .. py:method:: get_operation(kwargs: Mapping[str, Any]) -> viam.operations.Operation

      Get the ``Operation`` associated with the currently running function.

      When writing custom resources, you should get the ``Operation`` by calling this
      function and check whether it has been cancelled. If the ``Operation`` is
      cancelled, then you can perform any necessary cleanup (terminating long-running
      tasks, cleaning up connections, etc.).

      :param kwargs: The kwargs object containing the operation
      :type kwargs: Mapping[str, Any]

      :returns: The operation associated with this function
      :rtype: viam.operations.Operation


   .. py:method:: close()
      :async:

      Safely shut down the resource and prevent further use.

      Close must be idempotent. Later configuration may allow a resource to be "open"
      again. If a resource does not want or need a close function, it is assumed that
      the resource does not need to return errors when future non-Close methods are
      called.

      ::

         await component.close()



.. py:class:: Vision(name: str)


   Bases: :py:obj:`viam.services.service_base.ServiceBase`

   Vision represents a Vision service.
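   Concrete vision services are written by subclassing this class and implementing its
   abstract methods. The following is a minimal, hypothetical sketch of the
   abstract-base-class pattern the service follows, using only the standard library;
   the class and method names here are invented for illustration and are not part of
   the Viam SDK.

   ```python
   # Sketch of the abstract-base-class pattern: a base class declares abstract
   # methods, and subclasses cannot be instantiated until they implement them.
   import abc
   from typing import Any, List


   class FakeVisionBase(abc.ABC):
       """Stand-in for an abstract service base class (hypothetical)."""

       def __init__(self, name: str):
           self.name = name

       @abc.abstractmethod
       async def get_detections_from_camera(self, camera_name: str) -> List[Any]:
           """Subclasses must implement this."""


   class MyDetector(FakeVisionBase):
       def __init__(self, name: str):
           # An overridden __init__ must call super().__init__()
           super().__init__(name)

       async def get_detections_from_camera(self, camera_name: str) -> List[Any]:
           # A real driver would run its detection model here
           return []
   ```

   Instantiating `FakeVisionBase` directly raises `TypeError`, while `MyDetector`
   can be constructed and used; the real `Vision` base behaves the same way.
   
   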
   This acts as an abstract base class for any drivers representing specific
   vision service implementations. This cannot be used on its own. If the
   ``__init__()`` function is overridden, it must call the ``super().__init__()``
   function.

   .. py:attribute:: SUBTYPE
      :type: Final


   .. py:method:: get_detections_from_camera(camera_name: str, *, extra: Optional[Mapping[str, Any]] = None, timeout: Optional[float] = None) -> List[viam.proto.service.vision.Detection]
      :abstractmethod:
      :async:

      Get a list of detections from the next image from a given camera, using a configured detector.

      ::

         camera_name = "cam1"

         # Grab the detector you configured on your machine
         my_detector = VisionClient.from_robot(robot, "my_detector")

         # Get detections from the next image from the camera
         detections = await my_detector.get_detections_from_camera(camera_name)

      :param camera_name: The name of the camera to use for detection
      :type camera_name: str

      :returns: A list of 2D bounding boxes, their labels, and the confidence scores of
                the labels, around the found objects in the next 2D image from the given
                camera, with the given detector applied to it.
      :rtype: List[viam.proto.service.vision.Detection]


   .. py:method:: get_detections(image: Union[PIL.Image.Image, viam.media.video.RawImage], *, extra: Optional[Mapping[str, Any]] = None, timeout: Optional[float] = None) -> List[viam.proto.service.vision.Detection]
      :abstractmethod:
      :async:

      Get a list of detections in the given image using the specified detector.

      ::

         # Grab camera from the machine
         cam1 = Camera.from_robot(robot, "cam1")

         # Get the detector you configured on your machine
         my_detector = VisionClient.from_robot(robot, "my_detector")

         # Get an image from the camera
         img = await cam1.get_image()

         # Get detections from that image
         detections = await my_detector.get_detections(img)

      :param image: The image to get detections from
      :type image: Image

      :returns: A list of 2D bounding boxes, their labels, and the confidence scores of
                the labels, around the found objects in the given image, with the given
                detector applied to it.
      :rtype: List[viam.proto.service.vision.Detection]


   .. py:method:: get_classifications_from_camera(camera_name: str, count: int, *, extra: Optional[Mapping[str, Any]] = None, timeout: Optional[float] = None) -> List[viam.proto.service.vision.Classification]
      :abstractmethod:
      :async:

      Get a list of classifications from the next image from a given camera, using a configured classifier.

      ::

         camera_name = "cam1"

         # Grab the classifier you configured on your machine
         my_classifier = VisionClient.from_robot(robot, "my_classifier")

         # Get the 2 classifications with the highest confidence scores
         # from the next image from the camera
         classifications = await my_classifier.get_classifications_from_camera(
             camera_name, 2)

      :param camera_name: The name of the camera to use for classification
      :type camera_name: str
      :param count: The number of classifications desired
      :type count: int

      :returns: The list of Classifications
      :rtype: List[viam.proto.service.vision.Classification]


   .. py:method:: get_classifications(image: Union[PIL.Image.Image, viam.media.video.RawImage], count: int, *, extra: Optional[Mapping[str, Any]] = None, timeout: Optional[float] = None) -> List[viam.proto.service.vision.Classification]
      :abstractmethod:
      :async:

      Get a list of classifications in the given image using the specified classifier.

      ::

         # Grab camera from the machine
         cam1 = Camera.from_robot(robot, "cam1")

         # Get the classifier you configured on your machine
         my_classifier = VisionClient.from_robot(robot, "my_classifier")

         # Get an image from the camera
         img = await cam1.get_image()

         # Get the 2 classifications with the highest confidence scores
         classifications = await my_classifier.get_classifications(img, 2)

      :param image: The image to get classifications from
      :type image: Image
      :param count: The number of classifications desired
      :type count: int

      :returns: The list of Classifications
      :rtype: List[viam.proto.service.vision.Classification]


   .. py:method:: get_object_point_clouds(camera_name: str, *, extra: Optional[Mapping[str, Any]] = None, timeout: Optional[float] = None) -> List[viam.proto.common.PointCloudObject]
      :abstractmethod:
      :async:

      Returns a list of the 3D point cloud objects and associated metadata in the
      latest picture obtained from the specified 3D camera (using the specified
      segmenter).

      To deserialize the returned information into a numpy array, use the Open3D
      library.
      ::

         import numpy as np
         import open3d as o3d

         # Grab the object segmenter you configured on your machine
         my_segmenter = VisionClient.from_robot(robot, "my_segmenter")

         # Get the objects from the output of the 3D camera, passing the camera's name
         objects = await my_segmenter.get_object_point_clouds("cam1")

         # Write the first object point cloud into a temporary file
         with open("/tmp/pointcloud_data.pcd", "wb") as f:
             f.write(objects[0].point_cloud)

         pcd = o3d.io.read_point_cloud("/tmp/pointcloud_data.pcd")
         points = np.asarray(pcd.points)

      :param camera_name: The name of the camera
      :type camera_name: str

      :returns: The pointcloud objects with metadata
      :rtype: List[viam.proto.common.PointCloudObject]


   .. py:method:: from_robot(robot: viam.robot.client.RobotClient, name: str) -> typing_extensions.Self
      :classmethod:

      Get the service named ``name`` from the provided robot.

      ::

         async def connect() -> ViamClient:
             # Replace "" with your API key and "" with your API key ID
             dial_options = DialOptions.with_api_key("", "")
             return await ViamClient.create_from_dial_options(dial_options)

         async def main():
             robot = await connect()

             # Can be used with any resource, using the motion service as an example
             motion = MotionClient.from_robot(robot=robot, name="builtin")

             await robot.close()

      :param robot: The robot
      :type robot: RobotClient
      :param name: The name of the service
      :type name: str

      :returns: The service, if it exists on the robot
      :rtype: Self


   .. py:method:: do_command(command: Mapping[str, viam.utils.ValueTypes], *, timeout: Optional[float] = None, **kwargs) -> Mapping[str, viam.utils.ValueTypes]
      :abstractmethod:
      :async:

      Send/receive arbitrary commands.
      ::

         motion = MotionClient.from_robot(robot, "builtin")

         my_command = {
             "cmnd": "dosomething",
             "someparameter": 52
         }

         # Can be used with any resource, using the motion service as an example
         await motion.do_command(command=my_command)

      :param command: The command to execute
      :type command: Dict[str, ValueTypes]

      :returns: Result of the executed command
      :rtype: Dict[str, ValueTypes]


   .. py:method:: get_resource_name(name: str) -> viam.proto.common.ResourceName
      :classmethod:

      Get the ResourceName for this Resource with the given name.

      ::

         # Can be used with any resource, using an arm as an example
         my_arm_name = my_arm.get_resource_name("my_arm")

      :param name: The name of the Resource
      :type name: str


   .. py:method:: get_operation(kwargs: Mapping[str, Any]) -> viam.operations.Operation

      Get the ``Operation`` associated with the currently running function.

      When writing custom resources, you should get the ``Operation`` by calling this
      function and check whether it has been cancelled. If the ``Operation`` is
      cancelled, then you can perform any necessary cleanup (terminating long-running
      tasks, cleaning up connections, etc.).

      :param kwargs: The kwargs object containing the operation
      :type kwargs: Mapping[str, Any]

      :returns: The operation associated with this function
      :rtype: viam.operations.Operation


   .. py:method:: close()
      :async:

      Safely shut down the resource and prevent further use.

      Close must be idempotent. Later configuration may allow a resource to be "open"
      again. If a resource does not want or need a close function, it is assumed that
      the resource does not need to return errors when future non-Close methods are
      called.

      ::

         await component.close()
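The ``Detection`` messages returned by ``get_detections`` and
``get_detections_from_camera`` carry box corners, a label, and a confidence score,
which client code typically post-processes. The following is a small, hypothetical
sketch of such post-processing; the ``Det`` dataclass is a stand-in that mirrors the
``Detection`` message's fields so the sketch is self-contained, while real code would
operate on ``viam.proto.service.vision.Detection`` instances directly.

```python
# Filter detections by confidence and compute bounding-box areas.
from dataclasses import dataclass
from typing import List


@dataclass
class Det:
    """Stand-in mirroring the Detection message's fields (hypothetical)."""
    x_min: int
    y_min: int
    x_max: int
    y_max: int
    confidence: float
    class_name: str


def filter_confident(dets: List[Det], threshold: float = 0.5) -> List[Det]:
    """Keep only detections at or above the confidence threshold."""
    return [d for d in dets if d.confidence >= threshold]


def box_area(d: Det) -> int:
    """Pixel area of a detection's bounding box."""
    return max(0, d.x_max - d.x_min) * max(0, d.y_max - d.y_min)
```

For example, a detection with corners (0, 0) and (10, 10) has area 100, and a
threshold of 0.5 drops any detection whose confidence is below it.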