from typing import Any, List, Mapping, Optional
from grpclib.client import Channel
from viam.errors import ViamError
from viam.media.video import CameraMimeType, ViamImage
from viam.proto.common import DoCommandRequest, DoCommandResponse, PointCloudObject
from viam.proto.service.vision import (
CaptureAllFromCameraRequest,
CaptureAllFromCameraResponse,
Classification,
Detection,
GetClassificationsFromCameraRequest,
GetClassificationsFromCameraResponse,
GetClassificationsRequest,
GetClassificationsResponse,
GetDetectionsFromCameraRequest,
GetDetectionsFromCameraResponse,
GetDetectionsRequest,
GetDetectionsResponse,
GetObjectPointCloudsRequest,
GetObjectPointCloudsResponse,
GetPropertiesRequest,
GetPropertiesResponse,
VisionServiceStub,
)
from viam.resource.rpc_client_base import ReconfigurableResourceRPCClientBase
from viam.utils import ValueTypes, dict_to_struct, struct_to_dict
from .vision import CaptureAllResult, Vision
[docs]class VisionClient(Vision, ReconfigurableResourceRPCClientBase):
"""
Connect to the Vision service, which allows you to access various computer vision algorithms
(like detection, segmentation, tracking, etc) that usually only require a camera or image input.
"""
client: VisionServiceStub
def __init__(self, name: str, channel: Channel):
super().__init__(name)
self.channel = channel
self.client = VisionServiceStub(channel)
[docs] async def capture_all_from_camera(
self,
camera_name: str,
return_image: bool = False,
return_classifications: bool = False,
return_detections: bool = False,
return_object_point_clouds: bool = False,
*,
extra: Optional[Mapping[str, Any]] = None,
timeout: Optional[float] = None,
**kwargs,
) -> CaptureAllResult:
md = kwargs.get("metadata", self.Metadata()).proto
request = CaptureAllFromCameraRequest(
name=self.name,
camera_name=camera_name,
return_image=return_image,
return_classifications=return_classifications,
return_detections=return_detections,
return_object_point_clouds=return_object_point_clouds,
extra=dict_to_struct(extra),
)
response: CaptureAllFromCameraResponse = await self.client.CaptureAllFromCamera(request, timeout=timeout, metadata=md)
result = CaptureAllResult()
result.extra = struct_to_dict(response.extra)
if return_image:
mime_type = CameraMimeType.from_proto(response.image.format)
img = ViamImage(response.image.image, mime_type)
result.image = img
if return_classifications:
result.classifications = list(response.classifications)
if return_detections:
result.detections = list(response.detections)
if return_object_point_clouds:
result.objects = list(response.objects)
return result
[docs] async def get_detections_from_camera(
self,
camera_name: str,
*,
extra: Optional[Mapping[str, Any]] = None,
timeout: Optional[float] = None,
**kwargs,
) -> List[Detection]:
md = kwargs.get("metadata", self.Metadata()).proto
request = GetDetectionsFromCameraRequest(name=self.name, camera_name=camera_name, extra=dict_to_struct(extra))
response: GetDetectionsFromCameraResponse = await self.client.GetDetectionsFromCamera(request, timeout=timeout, metadata=md)
return list(response.detections)
[docs] async def get_detections(
self,
image: ViamImage,
*,
extra: Optional[Mapping[str, Any]] = None,
timeout: Optional[float] = None,
**kwargs,
) -> List[Detection]:
md = kwargs.get("metadata", self.Metadata()).proto
mime_type = CameraMimeType.JPEG
if image.width is None or image.height is None:
raise ViamError(f"image {image} needs to have a specified width and height")
else:
request = GetDetectionsRequest(
name=self.name,
image=image.data,
width=image.width,
height=image.height,
mime_type=mime_type,
extra=dict_to_struct(extra),
)
response: GetDetectionsResponse = await self.client.GetDetections(request, timeout=timeout, metadata=md)
return list(response.detections)
[docs] async def get_classifications_from_camera(
self,
camera_name: str,
count: int,
*,
extra: Optional[Mapping[str, Any]] = None,
timeout: Optional[float] = None,
**kwargs,
) -> List[Classification]:
md = kwargs.get("metadata", self.Metadata()).proto
request = GetClassificationsFromCameraRequest(name=self.name, camera_name=camera_name, n=count, extra=dict_to_struct(extra))
response: GetClassificationsFromCameraResponse = await self.client.GetClassificationsFromCamera(
request, timeout=timeout, metadata=md
)
return list(response.classifications)
[docs] async def get_classifications(
self,
image: ViamImage,
count: int,
*,
extra: Optional[Mapping[str, Any]] = None,
timeout: Optional[float] = None,
**kwargs,
) -> List[Classification]:
md = kwargs.get("metadata", self.Metadata()).proto
mime_type = CameraMimeType.JPEG
if image.width is None or image.height is None:
raise ViamError(f"image {image} needs to have a specified width and height")
request = GetClassificationsRequest(
name=self.name,
image=image.data,
width=image.width,
height=image.height,
mime_type=mime_type,
n=count,
extra=dict_to_struct(extra),
)
response: GetClassificationsResponse = await self.client.GetClassifications(request, timeout=timeout, metadata=md)
return list(response.classifications)
[docs] async def get_object_point_clouds(
self,
camera_name: str,
*,
extra: Optional[Mapping[str, Any]] = None,
timeout: Optional[float] = None,
**kwargs,
) -> List[PointCloudObject]:
md = kwargs.get("metadata", self.Metadata()).proto
request = GetObjectPointCloudsRequest(
name=self.name,
camera_name=camera_name,
mime_type=CameraMimeType.PCD,
extra=dict_to_struct(extra),
)
response: GetObjectPointCloudsResponse = await self.client.GetObjectPointClouds(request, timeout=timeout, metadata=md)
return list(response.objects)
[docs] async def get_properties(
self,
*,
extra: Optional[Mapping[str, Any]] = None,
timeout: Optional[float] = None,
**kwargs,
) -> Vision.Properties:
md = kwargs.get("metadata", self.Metadata()).proto
request = GetPropertiesRequest(
name=self.name,
extra=dict_to_struct(extra),
)
response: GetPropertiesResponse = await self.client.GetProperties(request, timeout=timeout, metadata=md)
return response
[docs] async def do_command(
self,
command: Mapping[str, ValueTypes],
*,
timeout: Optional[float] = None,
**kwargs,
) -> Mapping[str, ValueTypes]:
md = kwargs.get("metadata", self.Metadata()).proto
request = DoCommandRequest(name=self.name, command=dict_to_struct(command))
response: DoCommandResponse = await self.client.DoCommand(request, timeout=timeout, metadata=md)
return struct_to_dict(response.result)