viam.robot.client
Attributes
Classes
RobotClient – gRPC client for a machine. This class should be used for all interactions with a machine.
Module Contents
- viam.robot.client.LOGGER
- class viam.robot.client.RobotClient
gRPC client for a machine. This class should be used for all interactions with a machine.
There are two ways to instantiate a robot client:
    RobotClient.at_address(...)
    RobotClient.with_channel(...)
You can use the client standalone or within a context:
    machine = await RobotClient.at_address(...)
    async with await RobotClient.with_channel(...) as machine: ...
You must close() the machine to release resources.
Note: Machines used within a context are closed automatically UNLESS they were created with a channel. Machines created using with_channel are not closed automatically.
Establish a Connection:
    import asyncio

    from viam.rpc.dial import DialOptions, Credentials
    from viam.robot.client import RobotClient

    async def connect():
        opts = RobotClient.Options.with_api_key(
            # Replace "<API-KEY>" (including brackets) with your machine's API key
            api_key='<API-KEY>',
            # Replace "<API-KEY-ID>" (including brackets) with your machine's API key ID
            api_key_id='<API-KEY-ID>'
        )
        return await RobotClient.at_address('<ADDRESS-FROM-THE-VIAM-APP>', opts)

    async def main():
        # Make a RobotClient
        machine = await connect()
        print('Resources:')
        print(machine.resource_names)
        await machine.close()

    if __name__ == '__main__':
        asyncio.run(main())
For more information, see Machine Management API.
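The requirement to call close() can be met with a try/finally wrapper. The following is a minimal sketch, not part of the SDK: run_with_machine and FakeMachine are hypothetical names, and FakeMachine stands in for a real RobotClient so that the snippet runs without a machine. In real code, the connect argument would be a coroutine function such as the connect() function in the example above.

```python
import asyncio


class FakeMachine:
    """Hypothetical stand-in for RobotClient; it only tracks close()."""

    def __init__(self):
        self.closed = False
        self.resource_names = ["arm-1", "camera-1"]

    async def close(self):
        self.closed = True


async def run_with_machine(connect, work):
    """Connect, run work(machine), and always close the machine afterwards."""
    machine = await connect()
    try:
        return await work(machine)
    finally:
        # Runs on success and on error, so resources are always released.
        await machine.close()


async def demo():
    created = []

    async def connect():
        machine = FakeMachine()
        created.append(machine)
        return machine

    async def list_resources(machine):
        return list(machine.resource_names)

    names = await run_with_machine(connect, list_resources)
    return names, created[0].closed


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Because the close() call sits in a finally block, it also runs when work raises, which is the case that is easiest to miss when closing the machine by hand.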
- class Options
- refresh_interval: int = 0
How often to refresh the status of the parts of the machine in seconds. If not set, the machine will not be refreshed automatically
- dial_options: viam.rpc.dial.DialOptions | None = None
Options used to connect clients to gRPC servers
- log_level: int
The log level to output
- check_connection_interval: int = 10
The frequency (in seconds) at which to check if the machine is still connected. 0 (zero) signifies no connection checks
- attempt_reconnect_interval: int = 1
The frequency (in seconds) at which to attempt to reconnect a disconnected machine. 0 (zero) signifies no reconnection attempts
- disable_sessions: bool = False
Whether sessions are disabled
- classmethod with_api_key(api_key: str, api_key_id: str, **kwargs) -> typing_extensions.Self
Create RobotClient.Options with an API key for credentials and default values for other arguments.
    # Replace "<API-KEY>" (including brackets) with your machine's API key
    api_key = '<API-KEY>'
    # Replace "<API-KEY-ID>" (including brackets) with your machine's API key ID
    api_key_id = '<API-KEY-ID>'
    opts = RobotClient.Options.with_api_key(api_key, api_key_id)
    machine = await RobotClient.at_address('<ADDRESS-FROM-THE-VIAM-APP>', opts)
- Parameters:
api_key (str) – your API key
api_key_id (str) – your API key ID. Must be a valid UUID
- Raises:
ValueError – Raised if the api_key_id is not a valid UUID
- Returns:
the RobotClient.Options
- Return type:
Self
For more information, see Establish a connection.
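Since with_api_key raises ValueError for an API key ID that is not a valid UUID, it can help to check the value before connecting, for example when it comes from user input. This is a minimal sketch using only the standard library; is_valid_api_key_id is a hypothetical helper, and the check the SDK performs internally may be stricter or more lenient than uuid.UUID.

```python
import uuid


def is_valid_api_key_id(api_key_id: str) -> bool:
    """Return True if api_key_id parses as a UUID (hypothetical helper)."""
    try:
        uuid.UUID(api_key_id)
    except ValueError:
        # uuid.UUID raises ValueError for malformed strings.
        return False
    return True


if __name__ == "__main__":
    # An unreplaced placeholder is a common cause of the ValueError.
    print(is_valid_api_key_id("<API-KEY-ID>"))
    print(is_valid_api_key_id("123e4567-e89b-12d3-a456-426614174000"))
```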
- async classmethod at_address(address: str, options: Options) -> typing_extensions.Self
Create a robot client that is connected to the machine at the provided address.
    async def connect():
        opts = RobotClient.Options.with_api_key(
            # Replace "<API-KEY>" (including brackets) with your machine's API key
            api_key='<API-KEY>',
            # Replace "<API-KEY-ID>" (including brackets) with your machine's API key ID
            api_key_id='<API-KEY-ID>'
        )
        return await RobotClient.at_address('ADDRESS FROM THE VIAM APP', opts)

    async def main():
        # Make a RobotClient
        machine = await connect()
- Parameters:
address (str) – Address of the machine (IP address, URL, etc.)
options (Options) – Options for connecting and refreshing
- Returns:
the RobotClient
- Return type:
Self
For more information, see Establish a connection.
- async classmethod with_channel(channel: grpclib.client.Channel | viam.rpc.dial.ViamChannel, options: Options) -> typing_extensions.Self
Create a robot client that is connected to a machine over the given channel.
Any machines created using this method will NOT automatically close the channel upon exit.
    from viam.robot.client import RobotClient
    from viam.rpc.dial import DialOptions, dial

    async def connect_with_channel() -> RobotClient:
        async with await dial('ADDRESS', DialOptions()) as channel:
            return await RobotClient.with_channel(channel, RobotClient.Options())

    machine = await connect_with_channel()
- Parameters:
channel (ViamChannel) – The channel that is connected to a machine, obtained by viam.rpc.dial
options (Options) – Options for refreshing. Any connection options will be ignored.
- Returns:
the RobotClient
- Return type:
Self
For more information, see Establish a connection.
- async refresh()
Manually refresh the underlying parts of this machine.
await machine.refresh()
For more information, see Machine Management API.
- get_component(name: viam.proto.common.ResourceName) -> viam.components.component_base.ComponentBase
Get a component using its ResourceName.
This function should not be called directly except in specific cases. The method Component.from_robot(...) is the preferred method for obtaining components.
    arm = Arm.from_robot(robot=machine, name="my_arm")
Because this function returns a generic ComponentBase rather than the specific component type, it will be necessary to cast the returned component to the desired component. This can be done using a few different methods:
Assertion:
    arm = machine.get_component(Arm.get_resource_name("my_arm"))
    assert isinstance(arm, Arm)
    end_pos = await arm.get_end_position()
Explicit cast:
    from typing import cast
    arm = machine.get_component(Arm.get_resource_name("my_arm"))
    arm = cast(Arm, arm)
    end_pos = await arm.get_end_position()
Declare type on variable assignment.
Note: If using an IDE, a type error may be shown which can be ignored.
    arm: Arm = machine.get_component(Arm.get_resource_name("my_arm"))  # type: ignore
    end_pos = await arm.get_end_position()
- Parameters:
name (viam.proto.common.ResourceName) – The component’s ResourceName
- Raises:
ValueError – Raised if the requested resource is not a component
ComponentNotFoundError – Error if component with the given type and name does not exist in the registry
- Returns:
The component
- Return type:
viam.components.component_base.ComponentBase
For more information, see Machine Management API.
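The assertion approach above can be wrapped in a small reusable helper that looks up a component and checks its type at runtime. This is a sketch rather than an SDK function: get_typed_component, FakeArm, FakeCamera, and FakeComponentMachine are hypothetical names, and the fake classes only imitate the get_resource_name and get_component calls so that the snippet runs without a machine.

```python
from typing import Type, TypeVar

T = TypeVar("T")


def get_typed_component(machine, component_type: Type[T], name: str) -> T:
    """Fetch a component by name and verify that it has the expected type."""
    resource_name = component_type.get_resource_name(name)
    component = machine.get_component(resource_name)
    if not isinstance(component, component_type):
        raise TypeError(
            f"{name!r} is a {type(component).__name__}, "
            f"not a {component_type.__name__}"
        )
    return component


class FakeArm:
    """Hypothetical stand-in for an Arm component class."""

    @classmethod
    def get_resource_name(cls, name):
        return ("arm", name)


class FakeCamera:
    """Hypothetical stand-in for a different component class."""


class FakeComponentMachine:
    """Hypothetical stand-in for RobotClient.get_component()."""

    def __init__(self, components):
        self._components = components

    def get_component(self, resource_name):
        return self._components[resource_name]


if __name__ == "__main__":
    machine = FakeComponentMachine({("arm", "my_arm"): FakeArm()})
    arm = get_typed_component(machine, FakeArm, "my_arm")
    print(type(arm).__name__)
```

Unlike a bare assert, the explicit isinstance check is not removed when Python runs with the -O flag. With the real SDK, the call would look like get_typed_component(machine, Arm, "my_arm").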
- get_service(name: viam.proto.common.ResourceName) -> viam.services.service_base.ServiceBase
Get a service using its ResourceName
This function should not be called directly except in specific cases. The method Service.from_robot(...) is the preferred method for obtaining services.
    service = MyService.from_robot(robot=machine, name="my_service")
Because this function returns a generic ServiceBase rather than a specific service type, it will be necessary to cast the returned service to the desired service. This can be done using a few methods:
Assertion:
    service = machine.get_service(MyService.get_resource_name("my_service"))
    assert isinstance(service, MyService)
Explicit cast:
    from typing import cast
    service = machine.get_service(MyService.get_resource_name("my_service"))
    service = cast(MyService, service)
Declare type on variable assignment.
Note: If using an IDE, a type error may be shown which can be ignored.
    service: MyService = machine.get_service(MyService.get_resource_name("my_service"))  # type: ignore
- Parameters:
name (viam.proto.common.ResourceName) – The service’s ResourceName
- Raises:
ValueError – Raised if the requested resource is not a service
ComponentNotFoundError – Error if service with the given type and name does not exist in the registry
- Returns:
The service
- Return type:
viam.services.service_base.ServiceBase
For more information, see Machine Management API.
- property resource_names: List[viam.proto.common.ResourceName]
Get a list of all resource names
resource_names = machine.resource_names
- Returns:
The list of resource names
- Return type:
List[viam.proto.common.ResourceName]
For more information, see Machine Management API.
- async close()
Cleanly close the underlying connections and stop any periodic tasks.
await machine.close()
For more information, see Machine Management API.
- async get_operations() -> List[viam.proto.robot.Operation]
Get the list of operations currently running on the machine.
operations = await machine.get_operations()
- Returns:
The list of operations currently running on a given machine.
- Return type:
List[viam.proto.robot.Operation]
For more information, see Machine Management API.
- async cancel_operation(id: str)
Cancels the specified operation on the machine.
await machine.cancel_operation("INSERT OPERATION ID")
- Parameters:
id (str) – ID of operation to cancel.
For more information, see Machine Management API.
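get_operations() and cancel_operation() can be combined to cancel several operations at once. The sketch below is illustrative only: cancel_operations and FakeOperationsMachine are hypothetical names, the method strings are made-up example values, and the code assumes that each Operation exposes id and method fields.

```python
import asyncio
from types import SimpleNamespace


class FakeOperationsMachine:
    """Hypothetical stand-in for RobotClient with two running operations."""

    def __init__(self):
        # Assumption: real Operation messages expose `id` and `method`.
        # The method strings here are made-up example values.
        self.operations = [
            SimpleNamespace(id="op-1", method="ArmService/MoveToPosition"),
            SimpleNamespace(id="op-2", method="BaseService/MoveStraight"),
        ]
        self.cancelled = []

    async def get_operations(self):
        return list(self.operations)

    async def cancel_operation(self, id):
        self.cancelled.append(id)


async def cancel_operations(machine, method_substring=""):
    """Cancel running operations whose method contains method_substring.

    An empty substring matches every operation. Returns the cancelled IDs.
    """
    cancelled = []
    for op in await machine.get_operations():
        if method_substring in op.method:
            await machine.cancel_operation(op.id)
            cancelled.append(op.id)
    return cancelled


if __name__ == "__main__":
    machine = FakeOperationsMachine()
    print(asyncio.run(cancel_operations(machine, "ArmService")))
```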
- async block_for_operation(id: str)
Blocks on the specified operation on the machine. This function will only return when the specific operation has finished or has been cancelled.
await machine.block_for_operation("INSERT OPERATION ID")
- Parameters:
id (str) – ID of operation to block on.
For more information, see Machine Management API.
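Because block_for_operation() only returns once the operation has finished or been cancelled, callers may want an upper bound on the wait. One possible pattern, sketched here with the standard asyncio.wait_for, is to cancel the operation when the timeout expires. wait_or_cancel and FakeBlockingMachine are hypothetical names; the fake class simulates operation durations so that the snippet runs without a machine.

```python
import asyncio


class FakeBlockingMachine:
    """Hypothetical stand-in for RobotClient with simulated operations."""

    def __init__(self, durations):
        self.durations = durations  # operation ID -> seconds until finished
        self.cancelled = []

    async def block_for_operation(self, id):
        await asyncio.sleep(self.durations[id])

    async def cancel_operation(self, id):
        self.cancelled.append(id)


async def wait_or_cancel(machine, operation_id, timeout_s):
    """Wait up to timeout_s seconds for an operation; cancel it on timeout.

    Returns True if the operation finished in time, False if it was
    cancelled by this function.
    """
    try:
        await asyncio.wait_for(
            machine.block_for_operation(operation_id), timeout_s
        )
    except asyncio.TimeoutError:
        await machine.cancel_operation(operation_id)
        return False
    return True


if __name__ == "__main__":
    machine = FakeBlockingMachine({"fast": 0.01, "slow": 5.0})
    print(asyncio.run(wait_or_cancel(machine, "fast", 1.0)))
    print(asyncio.run(wait_or_cancel(machine, "slow", 0.05)))
```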
- async get_frame_system_config(additional_transforms: List[viam.proto.common.Transform] | None = None) -> List[viam.proto.robot.FrameSystemConfig]
Get the configuration of the frame system of a given machine.
    # Get a list of each of the reference frames configured on the machine.
    frame_system = await machine.get_frame_system_config()
    print(f"frame system configuration: {frame_system}")
- Returns:
The configuration of a given machine’s frame system.
- Return type:
List[viam.proto.robot.FrameSystemConfig]
For more information, see Machine Management API.
- async transform_pose(query: viam.proto.common.PoseInFrame, destination: str, additional_transforms: List[viam.proto.common.Transform] | None = None) -> viam.proto.common.PoseInFrame
Transform a given source pose from its current reference frame to a specified destination reference frame.
    from viam.proto.common import Pose, PoseInFrame

    pose = Pose(
        x=1.0,     # X coordinate in mm
        y=2.0,     # Y coordinate in mm
        z=3.0,     # Z coordinate in mm
        o_x=0.0,   # X component of orientation vector
        o_y=0.0,   # Y component of orientation vector
        o_z=0.0,   # Z component of orientation vector
        theta=0.0  # Orientation angle in degrees
    )
    pose_in_frame = PoseInFrame(
        reference_frame="world",
        pose=pose
    )
    transformed_pose = await machine.transform_pose(pose_in_frame, "world")
- Parameters:
query (viam.proto.common.PoseInFrame) – The pose that should be transformed.
destination (str) – The name of the reference frame to transform the given pose to.
- Returns:
The pose and the reference frame for the new destination.
- Return type:
viam.proto.common.PoseInFrame
For more information, see Machine Management API.
- async discover_components(queries: List[viam.proto.robot.DiscoveryQuery]) -> List[viam.proto.robot.Discovery]
Get a list of discovered potential component configurations, for example listing different supported resolutions. Currently only works for some cameras. Returns module names for modules.
    from viam.proto.robot import DiscoveryQuery

    # Define a new discovery query.
    q = DiscoveryQuery(subtype="camera", model="webcam")
    # Define a list of discovery queries.
    qs = [q]
    # Get component configurations with these queries.
    component_configs = await machine.discover_components(qs)
- Parameters:
queries (List[viam.proto.robot.DiscoveryQuery]) – The list of component models to lookup potential configurations for.
- Returns:
A list of discovered potential component configurations.
- Return type:
List[Discovery]
For more information, see Machine Management API.
- async stop_all(extra: Dict[viam.proto.common.ResourceName, Dict[str, Any]] = {})
Cancel all current and outstanding operations for the machine and stop all actuators and movement.
    # Cancel all current and outstanding operations for the machine and stop all actuators and movement.
    await machine.stop_all()
- Parameters:
extra (Dict[viam.proto.common.ResourceName, Dict[str, Any]]) – Any extra parameters to pass to the resources’ stop methods, keyed on the resource’s ResourceName.
For more information, see Machine Management API.
- async log(name: str, level: str, time: datetime.datetime, message: str, stack: str)
Send log from Python module over gRPC.
Create a LogEntry object from the log to send to RDK.
- Parameters:
name (str) – The logger’s name.
level (str) – The level of the log.
time (datetime) – The log creation time.
message (str) – The log message.
stack (str) – The stack information of the log.
For more information, see Machine Management API.
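The five parameters of log() correspond closely to fields of a standard logging.LogRecord. The sketch below shows one possible mapping; it is not the SDK's own implementation. log_arguments is a hypothetical helper, and passing record.levelname unchanged is an assumption, since this reference does not state which level strings the machine accepts.

```python
import logging
from datetime import datetime


def log_arguments(record: logging.LogRecord) -> dict:
    """Map a LogRecord to keyword arguments for log() (hypothetical helper)."""
    return {
        "name": record.name,
        # Assumption: the level name is passed through unchanged.
        "level": record.levelname,
        "time": datetime.fromtimestamp(record.created),
        "message": record.getMessage(),
        # stack_info is None unless the log call requested stack information.
        "stack": record.stack_info or "",
    }


if __name__ == "__main__":
    record = logging.LogRecord(
        "my_module", logging.WARNING, "example.py", 1,
        "battery at %d%%", (12,), None,
    )
    print(log_arguments(record))
```

With a connected client, the result could be forwarded with await machine.log(**log_arguments(record)).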
- async get_cloud_metadata() -> viam.proto.robot.GetCloudMetadataResponse
Get app-related information about the machine.
    metadata = await machine.get_cloud_metadata()
    print(metadata.machine_id)
    print(metadata.machine_part_id)
    print(metadata.primary_org_id)
    print(metadata.location_id)
- Returns:
App-related metadata.
- Return type:
viam.proto.robot.GetCloudMetadataResponse
For more information, see Machine Management API.
- async shutdown()
Shut down the machine.
await machine.shutdown()
- Raises:
GRPCError – Raised with DeadlineExceeded status if shutdown request times out, or if the machine server shuts down before having a chance to send a response. Raised with status Unavailable if server is unavailable, or if machine server is in the process of shutting down when response is ready.
For more information, see Machine Management API.
- async get_version() -> viam.proto.robot.GetVersionResponse
Get version information about the machine.
    result = await machine.get_version()
    print(result.platform)
    print(result.version)
    print(result.api_version)
- Returns:
Machine version related information.
- Return type:
viam.proto.robot.GetVersionResponse
For more information, see Machine Management API.
- async get_machine_status() -> viam.proto.robot.GetMachineStatusResponse
Get status information about the machine’s resources and configuration.
    machine_status = await machine.get_machine_status()
    resource_statuses = machine_status.resources
    config_status = machine_status.config
- Returns:
The current status of the resources (List[ResourceStatus]) and config of the machine.
- Return type:
viam.proto.robot.GetMachineStatusResponse
For more information, see Machine Management API.