Example usage

The Viam SDK can be used in two ways:

  1. As a client to connect to a (remote or local) robot

  2. To create custom components and provide additional functionality to a robot

Connect as a client

To connect to a robot as a client, you should instantiate an instance of a RobotClient

from viam import logging
from viam.robot.client import RobotClient
from viam.rpc.dial import DialOptions

async def connect() -> RobotClient:
    options = RobotClient.Options(
        dial_options=DialOptions(insecure=True, disable_webrtc=True),
        log_level=logging.FATAL
    )
    return await RobotClient.at_address('localhost:9091', options)

You can also create a RobotClient by providing an existing connection

from viam import logging
from viam.robot.client import RobotClient
from viam.rpc.dial import DialOptions, dial

async def connect_with_channel() -> RobotClient:
    async with await dial('localhost:9091', DialOptions(insecure=True, disable_webrtc=True)) as channel:
        return await RobotClient.with_channel(channel, RobotClient.Options(refresh_interval=10, log_level=logging.FATAL))

robot = await connect_with_channel()
print(robot.resource_names)
await robot.close()
Hide code cell output
[namespace: "rdk"
type: "component"
subtype: "arm"
name: "arm0"
, namespace: "rdk"
type: "component"
subtype: "audio_input"
name: "audio_input0"
, namespace: "rdk"
type: "component"
subtype: "base"
name: "base0"
, namespace: "rdk"
type: "component"
subtype: "board"
name: "board"
, namespace: "rdk"
type: "component"
subtype: "camera"
name: "camera0"
, namespace: "rdk"
type: "component"
subtype: "encoder"
name: "encoder0"
, namespace: "rdk"
type: "component"
subtype: "gantry"
name: "gantry0"
, namespace: "rdk"
type: "component"
subtype: "gripper"
name: "gripper0"
, namespace: "rdk"
type: "service"
subtype: "mlmodel"
name: "mlmodel0"
, namespace: "rdk"
type: "component"
subtype: "motor"
name: "motor0"
, namespace: "rdk"
type: "component"
subtype: "movement_sensor"
name: "movement_sensor0"
, namespace: "rdk"
type: "component"
subtype: "sensor"
name: "movement_sensor0"
, namespace: "rdk"
type: "component"
subtype: "movement_sensor"
name: "movement_sensor0"
, namespace: "rdk"
type: "component"
subtype: "sensor"
name: "movement_sensor0"
, namespace: "rdk"
type: "component"
subtype: "pose_tracker"
name: "pose_tracker0"
, namespace: "rdk"
type: "component"
subtype: "sensor"
name: "sensor0"
, namespace: "rdk"
type: "component"
subtype: "servo"
name: "servo0"
, namespace: "rdk"
type: "service"
subtype: "slam"
name: "slam0"
]

Once you have a connected RobotClient, you can then obtain the robot’s components by their name

from viam.components.camera import Camera
from viam.media.video import CameraMimeType

robot = await connect_with_channel()
camera = Camera.from_robot(robot, "camera0")
image = await camera.get_image(CameraMimeType.JPEG)
image.save("foo.png")

# Don't forget to close the robot when you're done!
await robot.close()
Hide code cell outputs
../_images/b4d9e23f9d6298e1c49a021d9ead93131077094f8ead772f83d5b09fa5eebfe5.png

You can also use the RobotClient to make service calls to the connected robot.

from viam.services.vision import VisionClient

async def vision():
    robot = await connect_with_channel()
    vision = VisionClient.from_robot(robot)
    detections = await vision.get_detections_from_camera("camera_1", "detector_1")

At the end, don’t forget to close the connection

async def cleanup():
    await robot.close()

Create custom components

While the main RDK is written in golang, you can create custom components in python and connect them to a robot as a remote component. This allows you to extend the functionality of a robot, or even create an entire robot exclusively in python.

The steps required in creating a custom component and connecting it to the RDK are

  1. Subclass a component and implement desired functions. This will be your custom component.

    • For functions you do not wish to implement, you must at least define them by putting pass or raiseNotImplementedError() in the function.

  2. Create an rpc.server.Server instance and register the custom component. This is the RPC server that enables communication with your custom component.

  3. Start the Server and register the running server as a remote. Registering this RPC server as a remote allows the RDK to communicate with the server.

But before we start, we need to cover one additional topic: Operations.

Operations

Operations are used by the RDK to keep track of running tasks, and provide a means to query their status and even cancel them. In order for our custom component to respond appropriately to any Operation requests, we must obtain and listen to the Operation in our method calls.

In order to take advantage of operations, you should wrap the component method with the run_with_operation decorator from the viam.operations package. Each component has a function, get_operation(kwargs: Mapping[str, Any]) -> Operation, that will return an Operation that will tell us if the operation is ever cancelled, allowing us to clean up any long running tasks, close connections, etc.

It is extremely important that we check the Operation status, as this not only prevents any unnecessary resource usage, but also allows us to respond to urgent cancellation requests and stop components’ motion.

Knowing this, let’s implement a custom component where Operations might come in handy.

1. Subclass a component

The SDK provides a wide array of components to customize. You can browse through the API Reference to see all of them, but for now we’ll use an Arm as an example. Our custom Arm will be extremely simple – it will only save and return the positions provided to it.

Let’s start by creating a directory called my-python-robot. Inside of that directory, create a file called my_cool_arm.py. The contents of my_cool_arm.py should be as follows:

# my-python-robot/my_cool_arm.py

import asyncio
from typing import Any, Dict, Optional
from viam.components.arm import Arm, JointPositions, Pose
from viam.operations import run_with_operation


class MyCoolArm(Arm):
    # Subclass the Viam Arm component and implement the required functions

    def __init__(self, name: str):
        # Starting position
        self.position = Pose(
            x=0,
            y=0,
            z=0,
            o_x=0,
            o_y=0,
            o_z=0,
            theta=0,
        )

        # Starting joint positions
        self.joint_positions = JointPositions(values=[0, 0, 0, 0, 0, 0])
        self.is_stopped = True
        super().__init__(name)

    async def get_end_position(self, extra: Optional[Dict[str, Any]] = None, **kwargs) -> Pose:
        return self.position

    @run_with_operation
    async def move_to_position(
        self,
        pose: Pose,
        extra: Optional[Dict[str, Any]] = None,
        **kwargs,
    ):
        operation = self.get_operation(kwargs)

        self.is_stopped = False
        self.position = pose

        # Simulate the length of time it takes for the arm to move to its new position
        for x in range(10):
            await asyncio.sleep(1)

            # Check if the operation is cancelled and, if it is, stop the arm's motion
            if await operation.is_cancelled():
                await self.stop()
                break

        self.is_stopped = True

    async def get_joint_positions(self, extra: Optional[Dict[str, Any]] = None, **kwargs) -> JointPositions:
        return self.joint_positions

    @run_with_operation
    async def move_to_joint_positions(self, positions: JointPositions, extra: Optional[Dict[str, Any]] = None, **kwargs):
        operation = self.get_operation(kwargs)

        self.is_stopped = False
        self.joint_positions = positions

        # Simulate the length of time it takes for the arm to move to its new joint position
        for x in range(10):
            await asyncio.sleep(1)

            # Check if the operation is cancelled and, if it is, stop the arm's motion
            if await operation.is_cancelled():
                await self.stop()
                break

        self.is_stopped = True

    async def stop(self, extra: Optional[Dict[str, Any]] = None, **kwargs):
        self.is_stopped = True

    async def is_moving(self) -> bool:
        return not self.is_stopped

You can view more component implementations in the examples.

2. Register the custom component

Now that we’ve created the custom component, we must register it so that it will be visible to any robots connecting to it. This is done by creating an RPC server and adding MyCoolArm as a connected component. This RPC server will receive gRPC requests from the Viam Server or any other connections and forward those requests to your custom component.

In the same my-python-robot directory, create a new file called main.py.

# my-python-robot/main.py

import asyncio
from viam.rpc.server import Server

from my_cool_arm import MyCoolArm

async def main():
    srv = Server([MyCoolArm('my-arm')])
    await srv.serve()

if __name__ == '__main__':
    try:
        asyncio.run(main())
    except:
        pass

3. Start the Server and add it as a Remote

Now that we have a server that knows about our custom Arm component, we need to start the server so it can receive gRPC calls. Once it’s started, we can then register this server as a remote.

# my-python-robot/main.py

async def main():
  ...
  await srv.serve()

NB: When you call srv.serve(), the default host and port is localhost:9090. This can be changed by passing in a host and/or port parameter to the serve function.

To use this custom server as part of a larger robot, you’ll want to add it as a remote in the config for your main part.

[
    {
      "name": "my-cool-python-components", // The name of the remote, can be anything
      "insecure": true,                    // Whether this connection should use SSL
      "address": "localhost:9090"          // The location of the remote robot
    }
  ]

And to ensure that the python server starts up with the rest of the robot, you can add it as a process.

[
  {
    "id": "my-python-server-process",
    "log": true,
    "name": "python",
    "args": [
      "/home/pi/my-python-robot/main.py"
    ]
  }
]

NB: The viam-server starts as a root process, so you may need to switch users to run the python SDK server.

[
  {
    "id": "0",
    "log": true,
    "name": "sudo",
    "args": [
      "-u",
      "pi",
      "python",
      "/home/pi/my-python-robot/main.py"
    ]
  }
]