Source code for viam.services.vision.client

from io import BytesIO
from typing import Any, List, Mapping, Optional, Union

from grpclib.client import Channel

from viam.media.viam_rgba_plugin import Image
from viam.media.video import CameraMimeType, RawImage
from viam.proto.common import DoCommandRequest, DoCommandResponse, PointCloudObject
from viam.proto.service.vision import (
    Classification,
    Detection,
    GetClassificationsFromCameraRequest,
    GetClassificationsFromCameraResponse,
    GetClassificationsRequest,
    GetClassificationsResponse,
    GetDetectionsFromCameraRequest,
    GetDetectionsFromCameraResponse,
    GetDetectionsRequest,
    GetDetectionsResponse,
    GetObjectPointCloudsRequest,
    GetObjectPointCloudsResponse,
    VisionServiceStub,
)
from viam.resource.rpc_client_base import ReconfigurableResourceRPCClientBase
from viam.utils import ValueTypes, dict_to_struct, struct_to_dict

from .vision import Vision


[docs]class VisionClient(Vision, ReconfigurableResourceRPCClientBase): """ Connect to the Vision service, which allows you to access various computer vision algorithms (like detection, segmentation, tracking, etc) that usually only require a camera or image input. """ client: VisionServiceStub def __init__(self, name: str, channel: Channel): super().__init__(name) self.channel = channel self.client = VisionServiceStub(channel)
[docs] async def get_detections_from_camera( self, camera_name: str, *, extra: Optional[Mapping[str, Any]] = None, timeout: Optional[float] = None, ) -> List[Detection]: if extra is None: extra = {} request = GetDetectionsFromCameraRequest(name=self.name, camera_name=camera_name, extra=dict_to_struct(extra)) response: GetDetectionsFromCameraResponse = await self.client.GetDetectionsFromCamera(request, timeout=timeout) return list(response.detections)
[docs] async def get_detections( self, image: Union[Image.Image, RawImage], *, extra: Optional[Mapping[str, Any]] = None, timeout: Optional[float] = None, ) -> List[Detection]: if extra is None: extra = {} mime_type = CameraMimeType.JPEG if isinstance(image, RawImage): image = Image.open(BytesIO(image.data), formats=[mime_type.name]) request = GetDetectionsRequest( name=self.name, image=mime_type.encode_image(image), width=image.width, height=image.height, mime_type=mime_type, extra=dict_to_struct(extra), ) response: GetDetectionsResponse = await self.client.GetDetections(request, timeout=timeout) return list(response.detections)
[docs] async def get_classifications_from_camera( self, camera_name: str, count: int, *, extra: Optional[Mapping[str, Any]] = None, timeout: Optional[float] = None, ) -> List[Classification]: if extra is None: extra = {} request = GetClassificationsFromCameraRequest(name=self.name, camera_name=camera_name, n=count, extra=dict_to_struct(extra)) response: GetClassificationsFromCameraResponse = await self.client.GetClassificationsFromCamera(request, timeout=timeout) return list(response.classifications)
[docs] async def get_classifications( self, image: Union[Image.Image, RawImage], count: int, *, extra: Optional[Mapping[str, Any]] = None, timeout: Optional[float] = None, ) -> List[Classification]: if extra is None: extra = {} mime_type = CameraMimeType.JPEG if isinstance(image, RawImage): image = Image.open(BytesIO(image.data), formats=[mime_type.name]) request = GetClassificationsRequest( name=self.name, image=mime_type.encode_image(image), width=image.width, height=image.height, mime_type=mime_type, n=count, extra=dict_to_struct(extra), ) response: GetClassificationsResponse = await self.client.GetClassifications(request, timeout=timeout) return list(response.classifications)
[docs] async def get_object_point_clouds( self, camera_name: str, *, extra: Optional[Mapping[str, Any]] = None, timeout: Optional[float] = None, ) -> List[PointCloudObject]: if extra is None: extra = {} request = GetObjectPointCloudsRequest( name=self.name, camera_name=camera_name, mime_type=CameraMimeType.PCD, extra=dict_to_struct(extra), ) response: GetObjectPointCloudsResponse = await self.client.GetObjectPointClouds(request, timeout=timeout) return list(response.objects)
[docs] async def do_command( self, command: Mapping[str, ValueTypes], *, timeout: Optional[float] = None, **__, ) -> Mapping[str, ValueTypes]: request = DoCommandRequest(name=self.name, command=dict_to_struct(command)) response: DoCommandResponse = await self.client.DoCommand(request, timeout=timeout) return struct_to_dict(response.result)