viam.services.worldstatestore

Submodules

Classes

WorldStateStoreClient

gRPC client for the WorldStateStore service.

WorldStateStoreService

Helper class that provides a standard way to create an ABC using inheritance.

WorldStateStore

WorldStateStore is a Viam service that manages world state transforms.

StreamTransformChangesResponse

Abstract base class for protocol messages.

TransformChangeType

Package Contents

class viam.services.worldstatestore.WorldStateStoreClient(name: str, channel: grpclib.client.Channel)

Bases: viam.services.worldstatestore.worldstatestore.WorldStateStore, viam.resource.rpc_client_base.ReconfigurableResourceRPCClientBase

gRPC client for the WorldStateStore service.

client: viam.proto.service.worldstatestore.WorldStateStoreServiceStub
channel
async list_uuids(*, extra: Mapping[str, Any] | None = None, timeout: float | None = None, **kwargs) -> List[bytes]

List all world state transform UUIDs.

worldstatestore = WorldStateStoreClient.from_robot(robot=machine, name="builtin")

uuids = await worldstatestore.list_uuids()
Returns:

A list of transform UUIDs

Return type:

List[bytes]

async get_transform(uuid: bytes, *, extra: Mapping[str, Any] | None = None, timeout: float | None = None, **kwargs) -> viam.proto.common.Transform

Get a world state transform by UUID.

worldstatestore = WorldStateStoreClient.from_robot(robot=machine, name="builtin")

transform = await worldstatestore.get_transform(uuid=b"some-uuid")
Parameters:

uuid (bytes) – The UUID of the transform to retrieve

Returns:

The requested transform

Return type:

Transform

async stream_transform_changes(*, extra: Mapping[str, Any] | None = None, timeout: float | None = None, **kwargs) -> AsyncGenerator[viam.proto.service.worldstatestore.StreamTransformChangesResponse, None]

Stream changes to world state transforms.

worldstatestore = WorldStateStoreClient.from_robot(robot=machine, name="builtin")

async for change in worldstatestore.stream_transform_changes():
    print(f"Transform {change.transform.uuid} {change.change_type}")
Returns:

A stream of transform changes

Return type:

AsyncIterator[StreamTransformChangesResponse]
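
A consumer of this stream will often want to fold the changes into a local view of the world state. The sketch below shows one way to do that without a machine connection. FakeTransform, FakeChange, ChangeType and fake_stream are hypothetical stand-ins, not SDK types; the enum member names are assumptions, and the code assumes each response carries the complete transform.

```python
import asyncio
from dataclasses import dataclass
from enum import Enum
from typing import AsyncIterator, Dict, List


class ChangeType(Enum):
    # Hypothetical stand-in for TransformChangeType; member names are assumptions.
    ADDED = 1
    REMOVED = 2
    UPDATED = 3


@dataclass
class FakeTransform:
    # Stand-in for viam.proto.common.Transform, reduced to two fields.
    uuid: bytes
    reference_frame: str


@dataclass
class FakeChange:
    # Stand-in for StreamTransformChangesResponse.
    change_type: ChangeType
    transform: FakeTransform


async def fake_stream(changes: List[FakeChange]) -> AsyncIterator[FakeChange]:
    """Yield canned changes in the way stream_transform_changes() yields responses."""
    for change in changes:
        await asyncio.sleep(0)  # hand control back to the event loop between items
        yield change


async def mirror_transforms(stream: AsyncIterator[FakeChange]) -> Dict[bytes, FakeTransform]:
    """Fold a stream of changes into a dict keyed by transform UUID."""
    cache: Dict[bytes, FakeTransform] = {}
    async for change in stream:
        if change.change_type is ChangeType.REMOVED:
            cache.pop(change.transform.uuid, None)
        else:
            # ADDED and UPDATED both store the most recent transform.
            cache[change.transform.uuid] = change.transform
    return cache
```

With a real client, the same mirror_transforms loop could be pointed at worldstatestore.stream_transform_changes(), provided the comparisons are adapted to the actual TransformChangeType values.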

async do_command(command: Mapping[str, viam.utils.ValueTypes], *, timeout: float | None = None, **kwargs) -> Mapping[str, viam.utils.ValueTypes]

Send/receive arbitrary commands.

service = SERVICE.from_robot(robot=machine, name="builtin")  # replace SERVICE with the appropriate class

my_command = {
  "cmnd": "dosomething",
  "someparameter": 52
}

# Can be used with any resource, using the motion service as an example
await service.do_command(command=my_command)
Parameters:

command (Dict[str, ValueTypes]) – The command to execute

Returns:

Result of the executed command

Return type:

Dict[str, ValueTypes]
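
On the implementing side, do_command is typically a small dispatcher over the incoming mapping. The following is a minimal sketch under stated assumptions: DoublerService is a hypothetical class, the "cmnd" key convention is borrowed from the example above rather than enforced by the SDK, and the "doubled" result key is invented for illustration.

```python
import asyncio
from typing import Any, Dict, Mapping, Optional


class DoublerService:
    """Hypothetical resource; only the do_command signature follows the documented API."""

    async def do_command(
        self,
        command: Mapping[str, Any],
        *,
        timeout: Optional[float] = None,
        **kwargs: Any,
    ) -> Dict[str, Any]:
        # The "cmnd" key selects the action. This convention is an assumption
        # taken from the usage example, not a requirement of the SDK.
        name = command.get("cmnd")
        if name == "dosomething":
            value = command.get("someparameter", 0)
            return {"doubled": value * 2}
        # Reject anything unrecognized instead of silently ignoring it.
        raise ValueError(f"unsupported command: {name!r}")
```

Raising on unknown commands is a design choice of this sketch; an implementation could equally return an error entry in the result mapping.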

classmethod from_robot(robot: viam.robot.client.RobotClient, name: str) -> typing_extensions.Self

Get the service named name from the provided robot.

import asyncio

from viam.robot.client import RobotClient
from viam.services.motion import MotionClient

async def connect() -> RobotClient:
    # Replace "<API-KEY>" (including brackets) with your API key and "<API-KEY-ID>" with your API key ID
    options = RobotClient.Options.with_api_key("<API-KEY>", "<API-KEY-ID>")
    # Replace "<MACHINE-URL>" (including brackets) with your machine's connection URL or FQDN
    return await RobotClient.at_address("<MACHINE-URL>", options)

async def main():
    machine = await connect()

    # Can be used with any resource, using the motion service as an example
    motion = MotionClient.from_robot(robot=machine, name="builtin")

    # close() is a coroutine, so it must be awaited
    await machine.close()
Parameters:
  • robot (RobotClient) – The robot

  • name (str) – The name of the service

Returns:

The service, if it exists on the robot

Return type:

Self

classmethod get_resource_name(name: str) -> viam.proto.common.ResourceName

Get the ResourceName for this Resource with the given name

# Can be used with any resource, using an arm as an example
my_arm_name = Arm.get_resource_name("my_arm")
Parameters:

name (str) – The name of the Resource

Returns:

The ResourceName of this Resource

Return type:

ResourceName

get_operation(kwargs: Mapping[str, Any]) -> viam.operations.Operation

Get the Operation associated with the currently running function.

When writing custom resources, you should get the Operation by calling this function and check whether it has been cancelled. If the Operation is cancelled, you can then perform any necessary cleanup (terminating long-running tasks, closing connections, etc.).

Parameters:

kwargs (Mapping[str, Any]) – The kwargs object containing the operation

Returns:

The operation associated with this function

Return type:

viam.operations.Operation
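
The cancellation check described above usually sits inside the loop of a long-running method. The sketch below illustrates the pattern with stand-ins: FakeOperation and SlowService are hypothetical, the "operation" kwargs key is invented for this example, and treating is_cancelled() as a coroutine is an assumption about the real Operation class.

```python
import asyncio
from typing import Any, List, Mapping


class FakeOperation:
    """Stand-in for viam.operations.Operation (assumed to expose an awaitable is_cancelled())."""

    def __init__(self) -> None:
        self.cancelled = False

    async def is_cancelled(self) -> bool:
        return self.cancelled


class SlowService:
    OPERATION_KEY = "operation"  # hypothetical kwargs key used only in this sketch

    def get_operation(self, kwargs: Mapping[str, Any]) -> FakeOperation:
        return kwargs[self.OPERATION_KEY]

    async def process(self, items: List[int], **kwargs: Any) -> List[int]:
        operation = self.get_operation(kwargs)
        done: List[int] = []
        for item in items:
            # Check before each unit of work so a cancel takes effect promptly.
            if await operation.is_cancelled():
                break  # a real resource would release connections or stop tasks here
            done.append(item * 2)
            await asyncio.sleep(0)  # yield so other tasks, including a canceller, can run
        return done
```

Checking once per iteration keeps the cost low while bounding how long a cancelled call keeps running.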

async close()

Safely shut down the resource and prevent further use.

Close must be idempotent. Later configuration may allow a resource to be “open” again. If a resource does not want or need a close function, it is assumed that the resource does not need to return errors when future non-Close methods are called.

await component.close()
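
The idempotency requirement can be met with a simple guard flag. The sketch below is one possible shape, not the SDK's implementation: PollingResource, its start() method, and the cleanup_runs counter are all hypothetical and exist only to make the behavior observable.

```python
import asyncio
from typing import Optional


class PollingResource:
    """Hypothetical resource that owns a background task and closes idempotently."""

    def __init__(self) -> None:
        self._task: Optional[asyncio.Task] = None
        self._closed = False
        self.cleanup_runs = 0  # counts how often the cleanup path actually executed

    async def start(self) -> None:
        self._task = asyncio.create_task(self._poll())

    async def _poll(self) -> None:
        while True:
            await asyncio.sleep(3600)

    async def close(self) -> None:
        if self._closed:
            return  # second and later calls are no-ops
        self._closed = True
        if self._task is not None:
            self._task.cancel()
            try:
                await self._task  # wait until the task has really stopped
            except asyncio.CancelledError:
                pass
            self._task = None
        self.cleanup_runs += 1
```

Setting the flag before awaiting anything means a second close() issued while the first is still waiting also returns immediately.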
class viam.services.worldstatestore.WorldStateStoreService(manager: viam.resource.manager.ResourceManager)

Bases: viam.proto.service.worldstatestore.UnimplementedWorldStateStoreServiceBase, viam.resource.rpc_service_base.ResourceRPCServiceBase[viam.services.worldstatestore.worldstatestore.WorldStateStore]

Helper class that provides a standard way to create an ABC using inheritance.

RESOURCE_TYPE
async ListUUIDs(stream: grpclib.server.Stream[viam.proto.service.worldstatestore.ListUUIDsRequest, viam.proto.service.worldstatestore.ListUUIDsResponse]) -> None
async GetTransform(stream: grpclib.server.Stream[viam.proto.service.worldstatestore.GetTransformRequest, viam.proto.service.worldstatestore.GetTransformResponse]) -> None
async StreamTransformChanges(stream: grpclib.server.Stream[viam.proto.service.worldstatestore.StreamTransformChangesRequest, viam.proto.service.worldstatestore.StreamTransformChangesResponse]) -> None
async DoCommand(stream: grpclib.server.Stream[viam.proto.common.DoCommandRequest, viam.proto.common.DoCommandResponse]) -> None
class viam.services.worldstatestore.WorldStateStore(name: str, *, logger: logging.Logger | None = None)

Bases: viam.services.service_base.ServiceBase

WorldStateStore is a Viam service that manages world state transforms.

The WorldStateStore service provides functionality to store, retrieve, and stream changes to world state transforms, which represent the pose of objects in different reference frames. This functionality can be used to create custom visualizations of the world state.

For more information, see WorldStateStore service.
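
To make the store, retrieve, and stream responsibilities concrete, here is a minimal in-memory sketch of a backend with the same three method names. It does not subclass the real WorldStateStore; InMemoryStore, put(), remove(), the string payloads, and the ("added" | "updated" | "removed", uuid) event tuples are hypothetical simplifications.

```python
import asyncio
from typing import AsyncGenerator, Dict, List, Tuple

Event = Tuple[str, bytes]


class InMemoryStore:
    """Hypothetical store; only the three query method names follow the documented interface."""

    def __init__(self) -> None:
        self._transforms: Dict[bytes, str] = {}
        self._subscribers: List[asyncio.Queue] = []

    def put(self, uuid: bytes, frame: str) -> None:
        kind = "updated" if uuid in self._transforms else "added"
        self._transforms[uuid] = frame
        self._publish((kind, uuid))

    def remove(self, uuid: bytes) -> None:
        del self._transforms[uuid]
        self._publish(("removed", uuid))

    def _publish(self, event: Event) -> None:
        # Fan the event out to every active stream.
        for queue in self._subscribers:
            queue.put_nowait(event)

    async def list_uuids(self) -> List[bytes]:
        return list(self._transforms)

    async def get_transform(self, uuid: bytes) -> str:
        return self._transforms[uuid]

    def stream_transform_changes(self) -> AsyncGenerator[Event, None]:
        # Register the subscriber immediately so no event is missed
        # between calling this method and starting to iterate.
        queue: asyncio.Queue = asyncio.Queue()
        self._subscribers.append(queue)

        async def _events() -> AsyncGenerator[Event, None]:
            try:
                while True:
                    yield await queue.get()
            finally:
                self._subscribers.remove(queue)  # unsubscribe when the stream is closed

        return _events()
```

One queue per subscriber keeps slow consumers from blocking each other, at the cost of unbounded buffering in this simplified form.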

API: Final

The API of the Resource

abstract async list_uuids(*, extra: Mapping[str, Any] | None = None, timeout: float | None = None) -> List[bytes]

List all world state transform UUIDs.

worldstatestore = WorldStateStoreClient.from_robot(robot=machine, name="builtin")

uuids = await worldstatestore.list_uuids()
Returns:

A list of transform UUIDs

Return type:

List[bytes]

abstract async get_transform(uuid: bytes, *, extra: Mapping[str, Any] | None = None, timeout: float | None = None) -> viam.proto.common.Transform

Get a world state transform by UUID.

worldstatestore = WorldStateStoreClient.from_robot(robot=machine, name="builtin")

transform = await worldstatestore.get_transform(uuid=b"some-uuid")
Parameters:

uuid (bytes) – The UUID of the transform to retrieve

Returns:

The requested transform

Return type:

Transform

abstract async stream_transform_changes(*, extra: Mapping[str, Any] | None = None, timeout: float | None = None) -> AsyncGenerator[viam.proto.service.worldstatestore.StreamTransformChangesResponse, None]

Stream changes to world state transforms.

worldstatestore = WorldStateStoreClient.from_robot(robot=machine, name="builtin")

async for change in worldstatestore.stream_transform_changes():
    print(f"Transform {change.transform.uuid} {change.change_type}")
Returns:

A stream of transform changes

Return type:

AsyncIterator[StreamTransformChangesResponse]

classmethod from_robot(robot: viam.robot.client.RobotClient, name: str) -> typing_extensions.Self

Get the service named name from the provided robot.

import asyncio

from viam.robot.client import RobotClient
from viam.services.motion import MotionClient

async def connect() -> RobotClient:
    # Replace "<API-KEY>" (including brackets) with your API key and "<API-KEY-ID>" with your API key ID
    options = RobotClient.Options.with_api_key("<API-KEY>", "<API-KEY-ID>")
    # Replace "<MACHINE-URL>" (including brackets) with your machine's connection URL or FQDN
    return await RobotClient.at_address("<MACHINE-URL>", options)

async def main():
    machine = await connect()

    # Can be used with any resource, using the motion service as an example
    motion = MotionClient.from_robot(robot=machine, name="builtin")

    # close() is a coroutine, so it must be awaited
    await machine.close()
Parameters:
  • robot (RobotClient) – The robot

  • name (str) – The name of the service

Returns:

The service, if it exists on the robot

Return type:

Self

abstract async do_command(command: Mapping[str, viam.utils.ValueTypes], *, timeout: float | None = None, **kwargs) -> Mapping[str, viam.utils.ValueTypes]

Send/receive arbitrary commands.

service = SERVICE.from_robot(robot=machine, name="builtin")  # replace SERVICE with the appropriate class

my_command = {
  "cmnd": "dosomething",
  "someparameter": 52
}

# Can be used with any resource, using the motion service as an example
await service.do_command(command=my_command)
Parameters:

command (Dict[str, ValueTypes]) – The command to execute

Returns:

Result of the executed command

Return type:

Dict[str, ValueTypes]

classmethod get_resource_name(name: str) -> viam.proto.common.ResourceName

Get the ResourceName for this Resource with the given name

# Can be used with any resource, using an arm as an example
my_arm_name = Arm.get_resource_name("my_arm")
Parameters:

name (str) – The name of the Resource

Returns:

The ResourceName of this Resource

Return type:

ResourceName

get_operation(kwargs: Mapping[str, Any]) -> viam.operations.Operation

Get the Operation associated with the currently running function.

When writing custom resources, you should get the Operation by calling this function and check whether it has been cancelled. If the Operation is cancelled, you can then perform any necessary cleanup (terminating long-running tasks, closing connections, etc.).

Parameters:

kwargs (Mapping[str, Any]) – The kwargs object containing the operation

Returns:

The operation associated with this function

Return type:

viam.operations.Operation

async close()

Safely shut down the resource and prevent further use.

Close must be idempotent. Later configuration may allow a resource to be “open” again. If a resource does not want or need a close function, it is assumed that the resource does not need to return errors when future non-Close methods are called.

await component.close()
class viam.services.worldstatestore.StreamTransformChangesResponse(*, change_type: global___TransformChangeType = ..., transform: viam.gen.common.v1.common_pb2.Transform | None = ..., updated_fields: google.protobuf.field_mask_pb2.FieldMask | None = ...)

Bases: google.protobuf.message.Message

Abstract base class for protocol messages.

Protocol message classes are almost always generated by the protocol compiler. These generated types subclass Message and implement the methods shown below.

change_type: global___TransformChangeType
property transform: viam.gen.common.v1.common_pb2.Transform
property updated_fields: google.protobuf.field_mask_pb2.FieldMask

The field mask of the transform that has changed, if any. For added transforms, this will be empty. For updated transforms, this will be the fields that have changed. For removed transforms, this will be the transform’s UUID path.
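
For updated transforms, the mask lets a consumer copy only the fields that changed. The sketch below shows that idea on plain dictionaries; apply_update is a hypothetical helper, the mask is represented as a list of path strings, and only top-level paths are handled (nested paths such as "a.b" are out of scope here).

```python
from typing import Any, Dict, List


def apply_update(cached: Dict[str, Any], incoming: Dict[str, Any], paths: List[str]) -> Dict[str, Any]:
    """Return a copy of `cached` with only the masked top-level fields taken from `incoming`."""
    merged = dict(cached)  # leave the caller's cached entry untouched
    for path in paths:
        merged[path] = incoming[path]
    return merged
```

An empty path list leaves the cached entry unchanged, which matches the documented behavior that added transforms arrive with an empty mask and need to be stored whole instead.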

HasField(field_name: Literal['transform', b'transform', 'updated_fields', b'updated_fields']) -> bool

Checks if a certain field is set for the message.

For a oneof group, checks if any field inside is set. Note that if the field_name is not defined in the message descriptor, ValueError will be raised.

Parameters:

field_name (str) – The name of the field to check for presence.

Returns:

Whether a value has been set for the named field.

Return type:

bool

Raises:

ValueError – if the field_name is not a member of this message.

class viam.services.worldstatestore.TransformChangeType

Bases: _TransformChangeType