viam.robot.client

Module Contents

Classes

RobotClient

gRPC client for a Robot. This class should be used for all interactions with a robot.

Attributes

LOGGER

viam.robot.client.LOGGER
class viam.robot.client.RobotClient[source]

gRPC client for a Robot. This class should be used for all interactions with a robot.

There are 2 ways to instantiate a robot client:

RobotClient.at_address(...)
RobotClient.with_channel(...)

You can use the client standalone or within a context:

robot = await RobotClient.at_address(...)
async with await RobotClient.with_channel(...) as robot: ...

You must close() the robot to release resources.

Note: Robots used within a context are automatically closed UNLESS created with a channel. Robots created using with_channel are not automatically closed.

Establish a Connection:

import asyncio

from viam.rpc.dial import DialOptions, Credentials
from viam.robot.client import RobotClient


async def connect():
    opts = RobotClient.Options.with_api_key(
        # Replace "<API-KEY>" (including brackets) with your machine's API key
        api_key='<API-KEY>',
        # Replace "<API-KEY-ID>" (including brackets) with your machine's API key ID
        api_key_id='<API-KEY-ID>'
    )
    return await RobotClient.at_address('<ADDRESS-FROM-THE-VIAM-APP>', opts)


async def main():
    # Make a RobotClient
    robot = await connect()
    print('Resources:')
    print(robot.resource_names)
    await robot.close()

if __name__ == '__main__':
    asyncio.run(main())
class Options[source]
refresh_interval: int = 0

How often to refresh the status/parts of the robot in seconds. If not set, the robot will not be refreshed automatically

dial_options: viam.rpc.dial.DialOptions | None

Options used to connect clients to gRPC servers

log_level: int

The log level to output

check_connection_interval: int = 10

The frequency (in seconds) at which to check if the robot is still connected. 0 (zero) signifies no connection checks

attempt_reconnect_interval: int = 1

The frequency (in seconds) at which to attempt to reconnect a disconnected robot. 0 (zero) signifies no reconnection attempts

disable_sessions: bool = False

Whether sessions are disabled

classmethod with_api_key(api_key: str, api_key_id: str, **kwargs) typing_extensions.Self[source]

Create RobotClient.Options with an API key for credentials and default values for other arguments.

# Replace "<API-KEY>" (including brackets) with your machine's API key
api_key = '<API-KEY>'
# Replace "<API-KEY-ID>" (including brackets) with your machine's API key ID
api_key_id = '<API-KEY-ID>'

opts = RobotClient.Options.with_api_key(api_key, api_key_id)

robot = await RobotClient.at_address('<ADDRESS-FROM-THE-VIAM-APP>', opts)
Parameters:
  • api_key (str) – your API key

  • api_key_id (str) – your API key ID. Must be a valid UUID

Raises:

ValueError – Raised if the api_key_id is not a valid UUID

Returns:

the RobotClient.Options

Return type:

Self

property resource_names: List[viam.proto.common.ResourceName]

Get a list of all resource names

resource_names = robot.resource_names
Returns:

The list of resource names

Return type:

List[viam.proto.common.ResourceName]

async classmethod at_address(address: str, options: Options) typing_extensions.Self[source]

Create a robot client that is connected to the robot at the provided address.

async def connect():

    opts = RobotClient.Options.with_api_key(
        # Replace "<API-KEY>" (including brackets) with your machine's API key
        api_key='<API-KEY>',
        # Replace "<API-KEY-ID>" (including brackets) with your machine's API key ID
        api_key_id='<API-KEY-ID>'
    )
    return await RobotClient.at_address('ADDRESS FROM THE VIAM APP', opts)


async def main():
    # Make a RobotClient
    robot = await connect()
Parameters:
  • address (str) – Address of the robot (IP address, URL, etc.)

  • options (Options) – Options for connecting and refreshing

Returns:

the RobotClient

Return type:

Self

async classmethod with_channel(channel: grpclib.client.Channel | viam.rpc.dial.ViamChannel, options: Options) typing_extensions.Self[source]

Create a robot that is connected to a robot over the given channel.

Any robots created using this method will NOT automatically close the channel upon exit.

from viam.robot.client import RobotClient
from viam.rpc.dial import DialOptions, dial


async def connect_with_channel() -> RobotClient:
    async with await dial('ADDRESS', DialOptions()) as channel:
        return await RobotClient.with_channel(channel, RobotClient.Options())

robot = await connect_with_channel()
Parameters:
  • channel (ViamChannel) – The channel that is connected to a robot, obtained by viam.rpc.dial

  • options (Options) – Options for refreshing. Any connection options will be ignored.

Returns:

the RobotClient

Return type:

Self

async refresh()[source]

Manually refresh the underlying parts of this robot

await robot.refresh()
get_component(name: viam.proto.common.ResourceName) viam.components.component_base.ComponentBase[source]

Get a component using its ResourceName.

This function should not be called directly except in specific cases. The method Component.from_robot(...) is the preferred method for obtaining components.

arm = Arm.from_robot(robot=robot, name="my_arm")

Because this function returns a generic ComponentBase rather than the specific component type, it will be necessary to cast the returned component to the desired component. This can be done using a few different methods:

  • Assertion:

    arm = robot.get_component(Arm.get_resource_name("my_arm"))
    assert isinstance(arm, Arm)
    end_pos = await arm.get_end_position()
    
  • Explicit cast:

    from typing import cast
    arm = robot.get_component(Arm.get_resource_name("my_arm"))
    arm = cast(Arm, arm)
    end_pos = await arm.get_end_position()
    
  • Declare type on variable assignment.

    • Note: If using an IDE, a type error may be shown which can be ignored.

    arm: Arm = robot.get_component(Arm.get_resource_name("my_arm"))  # type: ignore
    end_pos = await arm.get_end_position()
    
Parameters:

name (viam.proto.common.ResourceName) – The component’s ResourceName

Raises:
  • ValueError – Raised if the requested resource is not a component

  • ComponentNotFoundError – Error if component with the given type and name does not exist in the registry

Returns:

The component

Return type:

ComponentBase

get_service(name: viam.proto.common.ResourceName) viam.services.service_base.ServiceBase[source]

Get a service using its ResourceName

This function should not be called directly except in specific cases. The method Service.from_robot(...) is the preferred method for obtaining services.

service = MyService.from_robot(robot=robot, name="my_service")

Because this function returns a generic ServiceBase rather than a specific service type, it will be necessary to cast the returned service to the desired service. This can be done using a few methods:

  • Assertion:

    service = robot.get_service(MyService.get_resource_name("my_service"))
    assert isinstance(service, MyService)
    
  • Explicit cast:

    from typing import cast
    service = robot.get_service(MyService.get_resource_name("my_service"))
    service = cast(MyService, my_service)
    
  • Declare type on variable assignment

    • Note: If using an IDE, a type error may be shown which can be ignored.

    service: MyService = robot.get_service(MyService.get_resource_name("my_service"))  # type: ignore
    
Parameters:

name (viam.proto.common.ResourceName) – The service’s ResourceName

Raises:
  • ValueError – Raised if the requested resource is not a component

  • ComponentNotFoundError – Error if component with the given type and name does not exist in the registry

Returns:

The service

Return type:

ServiceBase

async close()[source]

Cleanly close the underlying connections and stop any periodic tasks.

await robot.close()
async __aenter__()[source]
async __aexit__(exc_type, exc_value, traceback)[source]
async get_status(components: List[viam.proto.common.ResourceName] | None = None)[source]

Get the status of the robot’s components. You can optionally provide a list of ResourceName for which you want statuses.

# Get the status of the resources on the machine.
statuses = await robot.get_status()
Parameters:

components (Optional[List[viam.proto.common.ResourceName]]) – Optional list of ResourceName for components you want statuses.

async get_operations() List[viam.proto.robot.Operation][source]

Get the list of operations currently running on the robot.

operations = await robot.get_operations()
Returns:

The list of operations currently running on a given robot.

Return type:

List[viam.proto.robot.Operation]

async cancel_operation(id: str)[source]

Cancels the specified operation on the robot.

await robot.cancel_operation("INSERT OPERATION ID")
Parameters:

id (str) – ID of operation to cancel.

async block_for_operation(id: str)[source]

Blocks on the specified operation on the robot. This function will only return when the specific operation has finished or has been cancelled.

await robot.block_for_operation("INSERT OPERATION ID")
Parameters:

id (str) – ID of operation to block on.

async get_frame_system_config(additional_transforms: List[viam.proto.common.Transform] | None = None) List[viam.proto.robot.FrameSystemConfig][source]

Get the configuration of the frame system of a given robot.

# Get a list of each of the reference frames configured on the machine.
frame_system = await robot.get_frame_system_config()
print(f"frame system configuration: {frame_system}")
Returns:

The configuration of a given robot’s frame system.

Return type:

List[viam.proto.robot.FrameSystemConfig]

async transform_pose(query: viam.proto.common.PoseInFrame, destination: str, additional_transforms: List[viam.proto.common.Transform] | None = None) viam.proto.common.PoseInFrame[source]

Transform a given source Pose from the reference frame to a new specified destination which is a reference frame.

pose = await robot.transform_pose(PoseInFrame(), "origin")
Parameters:
  • query (viam.proto.common.PoseInFrame) – The pose that should be transformed.

  • destination (str) – The name of the reference frame to transform the given pose to.

Returns:

The pose and the reference frame for the new destination.

Return type:

PoseInFrame

abstract async transform_point_cloud()[source]
async discover_components(queries: List[viam.proto.robot.DiscoveryQuery]) List[viam.proto.robot.Discovery][source]

Get the list of discovered component configurations.

# Define a new discovery query.
q = robot.DiscoveryQuery(subtype=acme.API, model="some model")

# Define a list of discovery queries.
qs = [q]

# Get component configurations with these queries.
component_configs = await robot.discover_components(qs)
Parameters:

queries (List[viam.proto.robot.DiscoveryQuery]) – The list of component models to lookup configurations for.

Returns:

A list of discovered component configurations.

Return type:

List[Discovery]

async stop_all(extra: Dict[viam.proto.common.ResourceName, Dict[str, Any]] = {})[source]

Cancel all current and outstanding operations for the robot and stop all actuators and movement.

# Cancel all current and outstanding operations for the robot and stop all actuators and movement.
await robot.stop_all()
await robot.stop_all()
Parameters:

extra (Dict[viam.proto.common.ResourceName, Dict[str, Any]]) – Any extra parameters to pass to the resources’ stop methods, keyed on the resource’s ResourceName

async log(name: str, level: str, time: datetime.datetime, log: str, stack: str)[source]

Send log from Python module over gRPC.

Create a LogEntry object from the log to send to RDK.

Parameters:
  • name (str) – The logger’s name.

  • level (str) – The level of the log.

  • time (datetime) – The log creation time.

  • log (str) – The log message.

  • stack (str) – The stack information of the log.

async get_cloud_metadata() viam.proto.robot.GetCloudMetadataResponse[source]

Get app-related information about the robot.

Returns:

App-related metadata.

Return type:

viam.proto.robot.GetCloudMetadataResponse