Example usage
The Viam SDK can be used in two ways:
As a client to connect to a (remote or local) robot
To create custom components and provide additional functionality to a robot
Connect as a client
To connect to a robot as a client, you should instantiate an instance of a RobotClient
from viam import logging
from viam.robot.client import RobotClient
from viam.rpc.dial import DialOptions
async def connect() -> RobotClient:
options = RobotClient.Options(
dial_options=DialOptions(insecure=True, disable_webrtc=True),
log_level=logging.FATAL
)
return await RobotClient.at_address('localhost:9091', options)
You can also create a RobotClient
by providing an existing connection
from viam import logging
from viam.robot.client import RobotClient
from viam.rpc.dial import DialOptions, dial
async def connect_with_channel() -> RobotClient:
async with await dial('localhost:9091', DialOptions(insecure=True, disable_webrtc=True)) as channel:
return await RobotClient.with_channel(channel, RobotClient.Options(refresh_interval=10, log_level=logging.FATAL))
robot = await connect_with_channel()
print(robot.resource_names)
await robot.close()
Show code cell output
[namespace: "rdk"
type: "component"
subtype: "arm"
name: "arm0"
, namespace: "rdk"
type: "component"
subtype: "audio_input"
name: "audio_input0"
, namespace: "rdk"
type: "component"
subtype: "base"
name: "base0"
, namespace: "rdk"
type: "component"
subtype: "board"
name: "board"
, namespace: "rdk"
type: "component"
subtype: "camera"
name: "camera0"
, namespace: "rdk"
type: "component"
subtype: "encoder"
name: "encoder0"
, namespace: "rdk"
type: "component"
subtype: "gantry"
name: "gantry0"
, namespace: "rdk"
type: "component"
subtype: "gripper"
name: "gripper0"
, namespace: "rdk"
type: "service"
subtype: "mlmodel"
name: "mlmodel0"
, namespace: "rdk"
type: "component"
subtype: "motor"
name: "motor0"
, namespace: "rdk"
type: "component"
subtype: "movement_sensor"
name: "movement_sensor0"
, namespace: "rdk"
type: "component"
subtype: "sensor"
name: "movement_sensor0"
, namespace: "rdk"
type: "component"
subtype: "movement_sensor"
name: "movement_sensor0"
, namespace: "rdk"
type: "component"
subtype: "sensor"
name: "movement_sensor0"
, namespace: "rdk"
type: "component"
subtype: "pose_tracker"
name: "pose_tracker0"
, namespace: "rdk"
type: "component"
subtype: "sensor"
name: "sensor0"
, namespace: "rdk"
type: "component"
subtype: "servo"
name: "servo0"
, namespace: "rdk"
type: "service"
subtype: "slam"
name: "slam0"
]
Once you have a connected RobotClient
, you can then obtain the robot’s components by their name
from viam.components.camera import Camera
from viam.media.video import CameraMimeType
robot = await connect_with_channel()
camera = Camera.from_robot(robot, "camera0")
image = await camera.get_image(CameraMimeType.JPEG)
image.save("foo.png")
# Don't forget to close the robot when you're done!
await robot.close()
Show code cell outputs

You can also use the RobotClient
to make service calls to the connected robot.
from viam.services.vision import VisionClient
async def vision():
robot = await connect_with_channel()
vision = VisionClient.from_robot(robot)
detections = await vision.get_detections_from_camera("camera_1", "detector_1")
At the end, don’t forget to close the connection
async def cleanup():
await robot.close()
Create custom components
While the main RDK is written in golang, you can create custom components in python and connect them to a robot as a remote
component. This allows you to extend the functionality of a robot, or even create an entire robot exclusively in python.
The steps required in creating a custom component and connecting it to the RDK are
Subclass a component and implement desired functions. This will be your custom component.
For functions you do not wish to implement, you must at least define them by putting
pass
orraiseNotImplementedError()
in the function.
Create an
rpc.server.Server
instance and register the custom component. This is theRPC
server that enables communication with your custom component.Start the
Server
and register the running server as a remote. Registering thisRPC
server as a remote allows the RDK to communicate with the server.
But before we start, we need to cover one additional topic: Operations.
Operations
Operations are used by the RDK to keep track of running tasks, and provide a means to query their status and even cancel them. In order for our custom component to respond appropriately to any Operation requests, we must obtain and listen to the Operation in our method calls.
In order to take advantage of operations, you should wrap the component method with the run_with_operation
decorator from the viam.operations
package. Each component has a function, get_operation(kwargs: Mapping[str, Any]) -> Operation
, that will return an Operation that will tell us if the operation is ever cancelled, allowing us to clean up any long running tasks, close connections, etc.
It is extremely important that we check the Operation
status, as this not only prevents any unnecessary resource usage, but also allows us to respond to urgent cancellation requests and stop components’ motion.
Knowing this, let’s implement a custom component where Operations might come in handy.
1. Subclass a component
The SDK provides a wide array of components to customize. You can browse through the API Reference to see all of them, but for now we’ll use an Arm
as an example. Our custom Arm will be extremely simple – it will only save and return the positions provided to it.
Let’s start by creating a directory called my-python-robot
. Inside of that directory, create a file called my_cool_arm.py
. The contents of my_cool_arm.py
should be as follows:
# my-python-robot/my_cool_arm.py
import asyncio
from typing import Any, Dict, Optional
from viam.components.arm import Arm, JointPositions, Pose
from viam.operations import run_with_operation
class MyCoolArm(Arm):
# Subclass the Viam Arm component and implement the required functions
def __init__(self, name: str):
# Starting position
self.position = Pose(
x=0,
y=0,
z=0,
o_x=0,
o_y=0,
o_z=0,
theta=0,
)
# Starting joint positions
self.joint_positions = JointPositions(values=[0, 0, 0, 0, 0, 0])
self.is_stopped = True
super().__init__(name)
async def get_end_position(self, extra: Optional[Dict[str, Any]] = None, **kwargs) -> Pose:
return self.position
@run_with_operation
async def move_to_position(
self,
pose: Pose,
extra: Optional[Dict[str, Any]] = None,
**kwargs,
):
operation = self.get_operation(kwargs)
self.is_stopped = False
self.position = pose
# Simulate the length of time it takes for the arm to move to its new position
for x in range(10):
await asyncio.sleep(1)
# Check if the operation is cancelled and, if it is, stop the arm's motion
if await operation.is_cancelled():
await self.stop()
break
self.is_stopped = True
async def get_joint_positions(self, extra: Optional[Dict[str, Any]] = None, **kwargs) -> JointPositions:
return self.joint_positions
@run_with_operation
async def move_to_joint_positions(self, positions: JointPositions, extra: Optional[Dict[str, Any]] = None, **kwargs):
operation = self.get_operation(kwargs)
self.is_stopped = False
self.joint_positions = positions
# Simulate the length of time it takes for the arm to move to its new joint position
for x in range(10):
await asyncio.sleep(1)
# Check if the operation is cancelled and, if it is, stop the arm's motion
if await operation.is_cancelled():
await self.stop()
break
self.is_stopped = True
async def stop(self, extra: Optional[Dict[str, Any]] = None, **kwargs):
self.is_stopped = True
async def is_moving(self) -> bool:
return not self.is_stopped
You can view more component implementations in the examples.
2. Register the custom component
Now that we’ve created the custom component, we must register it so that it will be visible to any robots connecting to it. This is done by creating an RPC
server and adding MyCoolArm
as a connected component. This RPC
server will receive gRPC requests from the Viam Server or any other connections and forward those requests to your custom component.
In the same my-python-robot
directory, create a new file called main.py
.
# my-python-robot/main.py
import asyncio
from viam.rpc.server import Server
from my_cool_arm import MyCoolArm
async def main():
srv = Server([MyCoolArm('my-arm')])
await srv.serve()
if __name__ == '__main__':
try:
asyncio.run(main())
except:
pass
3. Start the Server and add it as a Remote
Now that we have a server that knows about our custom Arm component, we need to start the server so it can receive gRPC calls. Once it’s started, we can then register this server as a remote.
# my-python-robot/main.py
async def main():
...
await srv.serve()
NB: When you call srv.serve()
, the default host and port is localhost:9090
. This can be changed by passing in a host
and/or port
parameter to the serve
function.
To use this custom server as part of a larger robot, you’ll want to add it as a remote
in the config for your main part.
[
{
"name": "my-cool-python-components", // The name of the remote, can be anything
"insecure": true, // Whether this connection should use SSL
"address": "localhost:9090" // The location of the remote robot
}
]
And to ensure that the python server starts up with the rest of the robot, you can add it as a process.
[
{
"id": "my-python-server-process",
"log": true,
"name": "python",
"args": [
"/home/pi/my-python-robot/main.py"
]
}
]
NB: The viam-server
starts as a root process, so you may need to switch users to run the python SDK server.
[
{
"id": "0",
"log": true,
"name": "sudo",
"args": [
"-u",
"pi",
"python",
"/home/pi/my-python-robot/main.py"
]
}
]