Source code for viam.services.worldstatestore.worldstatestore
import abc
from typing import Any, AsyncGenerator, Final, List, Mapping, Optional
from viam.proto.common import Transform
from viam.proto.service.worldstatestore import (
StreamTransformChangesResponse,
)
from viam.resource.types import API, RESOURCE_NAMESPACE_RDK, RESOURCE_TYPE_SERVICE
from ..service_base import ServiceBase
[docs]class WorldStateStore(ServiceBase):
"""WorldStateStore is a Viam service that manages world state transforms.
The WorldStateStore service provides functionality to store, retrieve, and stream
changes to world state transforms, which represent the pose of objects in different
reference frames. This functionality can be used to create custom visualizations of the world state.
For more information, see `WorldStateStore service <https://docs.viam.com/dev/reference/apis/services/worldstatestore/>`_.
"""
API: Final = API( # pyright: ignore [reportIncompatibleVariableOverride]
RESOURCE_NAMESPACE_RDK, RESOURCE_TYPE_SERVICE, "world_state_store"
)
[docs] @abc.abstractmethod
async def list_uuids(
self,
*,
extra: Optional[Mapping[str, Any]] = None,
timeout: Optional[float] = None,
) -> List[bytes]:
"""List all world state transform UUIDs.
::
worldstatestore = WorldStateStoreClient.from_robot(robot=machine, name="builtin")
uuids = await worldstatestore.list_uuids()
Returns:
List[bytes]: A list of transform UUIDs
"""
...
[docs] @abc.abstractmethod
async def get_transform(
self,
uuid: bytes,
*,
extra: Optional[Mapping[str, Any]] = None,
timeout: Optional[float] = None,
) -> Transform:
"""Get a world state transform by UUID.
::
worldstatestore = WorldStateStoreClient.from_robot(robot=machine, name="builtin")
transform = await worldstatestore.get_transform(uuid=b"some-uuid")
Args:
uuid (bytes): The UUID of the transform to retrieve
Returns:
Transform: The requested transform
"""
...
[docs] @abc.abstractmethod
async def stream_transform_changes(
self,
*,
extra: Optional[Mapping[str, Any]] = None,
timeout: Optional[float] = None,
) -> AsyncGenerator[StreamTransformChangesResponse, None]:
"""Stream changes to world state transforms.
::
worldstatestore = WorldStateStoreClient.from_robot(robot=machine, name="builtin")
async for change in worldstatestore.stream_transform_changes():
print(f"Transform {change.transform.uuid} {change.change_type}")
Returns:
AsyncIterator[StreamTransformChangesResponse]: A stream of transform changes
"""
...