Example usage
The Viam SDK can be used in three ways:
As a client to connect to a (remote or local) robot
Integrate custom resources to a robot
As a client to connect to app.viam.com to upload and retrieve data
Connect as a client
To connect to a robot as a client, you should instantiate an instance of a RobotClient
from viam import logging
from viam.robot.client import RobotClient
from viam.rpc.dial import DialOptions
async def connect() -> RobotClient:
options = RobotClient.Options(dial_options=DialOptions(insecure=True, disable_webrtc=True), log_level=logging.FATAL)
return await RobotClient.at_address("localhost:9091", options)
You can also create a RobotClient
by providing an existing connection
from viam import logging
from viam.robot.client import RobotClient
from viam.rpc.dial import DialOptions, dial
async def connect_with_channel() -> RobotClient:
async with await dial("localhost:9091", DialOptions(insecure=True, disable_webrtc=True)) as channel:
return await RobotClient.with_channel(channel, RobotClient.Options(refresh_interval=10, log_level=logging.FATAL))
robot = await connect_with_channel()
print(robot.resource_names)
await robot.close()
Show code cell output
[<viam.proto.common.ResourceName rdk:component:pose_tracker/pose_tracker0 at 0x7efe25b70ef0>, <viam.proto.common.ResourceName rdk:component:audio_input/audio_input0 at 0x7efe25b70f90>, <viam.proto.common.ResourceName rdk:component:base/base0 at 0x7efe25b70e50>, <viam.proto.common.ResourceName rdk:component:servo/servo0 at 0x7efe25b70fe0>, <viam.proto.common.ResourceName rdk:component:encoder/encoder0 at 0x7efe25b71030>, <viam.proto.common.ResourceName rdk:service:slam/slam0 at 0x7efe25b71080>, <viam.proto.common.ResourceName rdk:component:gantry/gantry0 at 0x7efe25b710d0>, <viam.proto.common.ResourceName rdk:component:motor/motor0 at 0x7efe25b71120>, <viam.proto.common.ResourceName rdk:component:gripper/gripper0 at 0x7efe25b71170>, <viam.proto.common.ResourceName rdk:component:movement_sensor/movement_sensor0 at 0x7efe25b711c0>, <viam.proto.common.ResourceName rdk:service:mlmodel/mlmodel0 at 0x7efe25b71210>, <viam.proto.common.ResourceName rdk:component:camera/camera0 at 0x7efe25b71260>, <viam.proto.common.ResourceName rdk:component:sensor/sensor0 at 0x7efe25b712b0>, <viam.proto.common.ResourceName rdk:component:board/board at 0x7efe25b71300>, <viam.proto.common.ResourceName rdk:component:arm/arm0 at 0x7efe25b71350>]
Once you have a connected RobotClient
, you can then obtain the robot’s components by their name
from viam.components.camera import Camera
from viam.media.video import CameraMimeType
from viam.media.utils.pil import viam_to_pil_image
robot = await connect_with_channel()
camera = Camera.from_robot(robot, "camera0")
image = await camera.get_image(CameraMimeType.JPEG)
pil = viam_to_pil_image(image)
pil.save("foo.png")
# Don't forget to close the robot when you're done!
await robot.close()
Show code cell outputs
<viam.media.video.ViamImage at 0x7efe242d5f10>
You can also use the RobotClient
to make service calls to the connected robot.
from viam.services.vision import VisionClient
async def vision():
robot = await connect_with_channel()
vision = VisionClient.from_robot(robot)
detections = await vision.get_detections_from_camera("camera_1", "detector_1")
At the end, don’t forget to close the connection
async def cleanup():
await robot.close()
Create custom modules
Make a modular resource from an existing resource API. In this document, we will be going over subclassing existing resource APIs. To learn more about creating a new resource API, see the Viam docs, and complex module example.
For a fully fleshed-out example of a Python module that uses Github CI to upload to the Viam Registry, take a look at python-example-module. For an example that uses Docker to manage dependencies, take a look at python-container-module. For a list of example modules in different Viam SDKs, take a look here.
The code below resembles the simple module example, so look there for the final, completed modular resource.
The steps required in creating a modular resource and connecting it to a robot are:
Subclass a resource and implement desired functions. This will be your custom resource model.
For functions that you do not wish to implement, you must at least define them by putting
pass
orraise NotImplementedError()
in the function.
Register a new model. This will register the new modular resource model into the
Registry
.Start the module. Create and start the new module.
Make the module executable. This allows
viam-server
to access and execute the module.Configure a modular resource. Use the new module and instantiate a new resource to a robot!
Knowing this, let’s implement a custom resource.
1. Subclass a resource
The SDK provides a wide array of resource APIs to customize. You can browse through the API Reference to see the available resources. Subclass a resource and implement the required functions. You may leave the other methods unimplemented by putting pass
or raise NotImplementedError()
in the function.
This example uses a Sensor
as an example.
Create a sensor that returns the Wi-Fi signal of a Pi. Start by creating a directory on the Pi named wifi-sensor
, then create a directory inside the wifi-sensor
directory named src
. Then, inside the src
directory, create a new file named wifi_sensor_module.py
.
Copy the code below into your wifi_sensor_module.py
file:
# wifi-sensor/src/wifi_sensor_module.py
import asyncio
from typing import Any, Dict, Mapping, Optional
from viam.components.sensor import Sensor
from viam.utils import SensorReading
class MySensor(Sensor):
# Subclass the Viam Sensor component and implement the required functions
async def get_readings(self, extra: Optional[Dict[str, Any]] = None, **kwargs) -> Mapping[str, SensorReading]:
with open("/proc/net/wireless") as wifi_stats:
content = wifi_stats.readlines()
wifi_signal = [x for x in content[2].split(" ") if x != ""]
return {"link": wifi_signal[2], "level": wifi_signal[3], "noise": wifi_signal[4]}
async def get_geometries(self, *, extra: Optional[Dict[str, Any]] = None, timeout: Optional[float] = None) -> List[Geometry]:
raise NotImplementedError
async def close(self):
# This is a completely optional function to include. This will be called when the resource is removed from the config or the module
# is shutting down.
self.logger.info(f"{self.name} is closed.")
# Anything below this line is optional and will be replaced later, but may come in handy for debugging and testing.
# To use, call `python wifi_sensor_module.py` in the command line while in the `src` directory.
async def main():
wifi=MySensor(name="wifi")
signal = await wifi.get_readings()
print(signal)
if __name__ == '__main__':
asyncio.run(main())
You can view more component implementations in the examples.
2. Register the new modular resource
Now that we’ve created the modular resource model, we must register it to the registry.
Define a Model
name and a creator function in the resource. A creator function is a function that can create a resource given a config and map of dependencies.
To define a creator function and Model
in wifi_sensor_module.py
, import ComponentConfig
, ResourceName
, ResourceBase
, Model
, and ModelFamily
from the relative viam packages. Please note that the import packages also have to be changed to include Mapping
and ClassVar
from typing
and Self
from typing_extensions
. The changes can be seen in the example below.
In your wifi_sensor_module.py
, define a new MODEL
variable in your MySensor
class, shown below. Then add a new()
function that will act as our creator function. We get the name from the config and return the resource. We can also assign class variables based on what the config has.
# wifi-sensor/src/wifi_sensor_module.py
import asyncio
from typing import Any, ClassVar, Dict, Mapping, Optional
from typing_extensions import Self
from viam.proto.app.robot import ComponentConfig
from viam.proto.common import ResourceName
from viam.resource.base import ResourceBase
from viam.resource.types import Model, ModelFamily
class MySensor(Sensor):
# Subclass the Viam Sensor component and implement the required functions
MODEL: ClassVar[Model] = Model(ModelFamily("viam","sensor"), "linux-wifi")
@classmethod
def new(cls, config: ComponentConfig, dependencies: Mapping[ResourceName, ResourceBase]) -> Self:
sensor = cls(config.name)
return sensor
After the resource model has a defined model and creator function, the resource model must be registered to the Registry
. This can be done using register_resource_creator()
and passing a ResourceCreatorRegistration
object with the creator function as a parameter.
In the main function of wifi_sensor_module.py
, call Registry.register_resource_creator()
with the subtype of the resource that was subclassed, the model name, and a ResourceCreatorRegistration
object with the creator function defined in the modular resource. In a more complex module, it makes sense to call the function in an __init__.py
file in the same folder as the new resource model. See here for an example.
# wifi-sensor/src/wifi_sensor_module.py
from viam.resource.registry import Registry, ResourceCreatorRegistration
async def main():
Registry.register_resource_creator(Sensor.SUBTYPE, MySensor.MODEL, ResourceCreatorRegistration(MySensor.new))
Click here to see the file with its new MODEL
variable, creator function, and main()
function.
Optional Validator and Reconfiguration Functions
Modules also have support for validator and reconfiguration functions.
Custom validation can be done by specifying a validate function. Validate functions can raise errors that will be returned to the parent through gRPC. Validate functions return a sequence of strings representing the implicit dependencies of the resource. If there are none, return an empty sequence []
.
For example, let’s say MySensor
had a multiplier
argument that is used to multiply the returned readings in get_readings()
. The validation function can check if a multiplier attribute was provided and prevent erroneous resource start ups.
Reconfiguration specifies what values can change based on a new configuration. To allow this, add a reconfigure()
function to the MySensor
class. A good pattern is to call reconfigure()
within new()
, since initialization and reconfiguration are usually very similar.
# ADD `Sequence` FROM `typing`.
from typing import Sequence
from viam.utils import SensorReading
class MySensor(Sensor):
# ADD A VALIDATOR FUNCTION
@classmethod
def validate_config(cls, config: ComponentConfig) -> Sequence[str]:
if "multiplier" in config.attributes.fields:
if not config.attributes.fields["multiplier"].HasField("number_value"):
raise Exception("Multiplier must be a float.")
multiplier = config.attributes.fields["multiplier"].number_value
if multiplier == 0:
raise Exception("Multiplier cannot be 0.")
# RETURN AN EMPTY SEQUENCE
return []
@classmethod
def new(cls, config: ComponentConfig, dependencies: Mapping[ResourceName, ResourceBase]) -> Self:
sensor = cls(config.name)
# CALL RECONFIGURE TO INITIALIZE THE RESOURCE
sensor.reconfigure(config, dependencies)
return sensor
def reconfigure(self, config: ComponentConfig, dependencies: Mapping[ResourceName, ResourceBase]):
if "multiplier" in config.attributes.fields:
multiplier = config.attributes.fields["multiplier"].number_value
else:
multiplier = 1.0
self.multiplier = multiplier
async def get_readings(self, extra: Optional[Dict[str, Any]] = None, **kwargs) -> Mapping[str, SensorReading]:
with open("/proc/net/wireless") as wifi_stats:
content = wifi_stats.readlines()
result = [x for x in content[2].split(" ") if x != ""]
# MULTIPLY THE READINGS WITH MULTIPLIER
wifi_signal = []
for i in range(2:5):
wifi_signal.append(int(result[i]) * self.multiplier)
return {"link": wifi_signal[0], "level": wifi_signal[1], "noise": wifi_signal[2]}
If a validator function is added, it must also be called when registering the resource. The ResourceCreatorRegistration
should now also have the validator function passed in as the second parameter.
async def main():
Registry.register_resource_creator(Sensor.SUBTYPE, MySensor.MODEL, ResourceCreatorRegistration(MySensor.new, MySensor.validate_config))
Click here to see the new wifi_sensor_module.py
with the optional validator and reconfiguration functions.
3. Start the module
The module now has to be created and started. In the wifi_sensor_module.py
file, update the main function to the following. If there is a validator function in the class, don’t forget to add the function to the ResourceCreatorRegistration
object. In a more complex module, it makes sense to create an entirely new entrypoint file. See here for an example.
# wifi-sensor/src/wifi_sensor_module.py
from viam.module.module import Module
async def main():
"""
This function creates and starts a new module, after adding all desired resource model.
Resource creators must be registered to the resource registry before the module adds the resource model.
"""
Registry.register_resource_creator(Sensor.SUBTYPE, MySensor.MODEL, ResourceCreatorRegistration(MySensor.new))
module = Module.from_args()
module.add_model_from_registry(Sensor.SUBTYPE, MySensor.MODEL)
await module.start()
if __name__ == "__main__":
asyncio.run(main())
To see wifi_sensor_module.py
with all of the changes, click here.
4. Make the module executable
To add the module to your robot, you must provide it as an executable file that viam-server
can access.
Dependencies for the module (including Viam SDK) have to be installed. In the wifi-sensor
directory, create a new file called requirements.txt
that has all your dependencies. For this example, add viam-sdk
in requirements.txt
.
# add a version if viam should be pinned to a specific version
viam-sdk
One option with the Python SDK is to create a new shell script (.sh
) that runs your module, which can also be used to install the dependencies. For example, in the wifi-sensor
directory, add an run.sh
file:
#!/bin/sh
cd `dirname $0`
# Create a virtual environment to run our code
VENV_NAME="venv"
PYTHON="$VENV_NAME/bin/python"
ENV_ERROR="This module requires Python >=3.8, pip, and virtualenv to be installed."
if ! python3 -m venv $VENV_NAME >/dev/null 2>&1; then
echo "Failed to create virtualenv."
if command -v apt-get >/dev/null; then
echo "Detected Debian/Ubuntu, attempting to install python3-venv automatically."
SUDO="sudo"
if ! command -v $SUDO >/dev/null; then
SUDO=""
fi
if ! apt info python3-venv >/dev/null 2>&1; then
echo "Package info not found, trying apt update"
$SUDO apt -qq update >/dev/null
fi
$SUDO apt install -qqy python3-venv >/dev/null 2>&1
if ! python3 -m venv $VENV_NAME >/dev/null 2>&1; then
echo $ENV_ERROR >&2
exit 1
fi
else
echo $ENV_ERROR >&2
exit 1
fi
fi
# remove -U if viam-sdk should not be upgraded whenever possible
# -qq suppresses extraneous output from pip
echo "Virtualenv found/created. Installing/upgrading Python packages..."
if ! $PYTHON -m pip install -r requirements.txt -Uqq; then
exit 1
fi
# Be sure to use `exec` so that termination signals reach the python process,
# or handle forwarding termination signals manually
echo "Starting module..."
exec $PYTHON src/wifi-sensor.py $@
Please note that a more complex module should be run as a Python module, meaning that the file extension .py
has to be omitted. See here for an example.
To make this shell script executable, run this in the Terminal:
sudo chmod +x <FILEPATH>/<FILENAME>
<FILENAME>
would be run.sh
.
5. Configure a modular resource
NOTE: If you are adding your module to the registry, follow these instructions instead. Otherwise, continue with these instructions, which will show you how to configure the module locally.
Configure your new module on your robot by navigating to the Config tab of the robot’s page on the Viam app, then click on the Modules subtab. Add the name of your module and the executable path. For our example, the path would be <path-on-your-filesystem>/linux-wifi/run.sh
.
Once you have configured a module as part of your robot configuration, configure your modular resource made available by that module by adding new components or services configured with your modular resources’ new type or model. To instantiate a new resource from your module, specify the type
, model
, and name
of your modular resource. The aforementioned type
, model
, and name
should be the same as those in the MODEL
class attribute defined in your component class. This is a JSON example:
{
"components": [
{
"model": "viam:sensor:linux-wifi",
"attributes": {},
"depends_on": [],
"name": "my-sensor",
"type": "sensor"
}
],
"modules": [
{
"executable_path": "<path-on-your-filesystem>/linux-wifi/run.sh",
"name": "wifi_sensor"
}
]
}
Note the nomenclature of the above model
field, viam:sensor:linux-wifi
. Models are uniquely namespaced as colon-delimited-triplets in the form namespace:family:name
, and are named according to the Viam API that your model implements. A model with the viam
namespace is always Viam-provided. Read more about making custom namespaces here.
Viam also provides many built-in models that implement API capabilities, each using rdk
as the namespace
, and builtin
as the family
:
The
rdk:builtin:gpio
model of therdk:component:motor
API provides RDK support for GPIO-controlled DC motors.The
rdk:builtin:DMC4000
model of the samerdk:component:motor
API provides RDK support for the DMC4000 motor.
Custom Modular Arm Example
The following is an example of how to implement a custom modular arm. For further instructions, read the detailed Sensor
example above. Our custom Arm will be extremely simple. Please note that the minimum set of endpoints you need to implement for an arm are GetKinematics
, GetJointPositions
, and MoveToJointPositions
.
This arm example contains a minimal kinematics model. For a full model, take a look here.
Subclassing the Arm
component and implementing the required functions:
# modular-arm/src/my_modular_arm.py
import asyncio
import os
from typing import Any, ClassVar, Dict, Mapping, Optional, Tuple
from typing_extensions import Self
from viam.components.arm import Arm, JointPositions, KinematicsFileFormat, Pose
from viam.module.module import Module
from viam.operations import run_with_operation
from viam.proto.app.robot import ComponentConfig
from viam.proto.common import ResourceName
from viam.resource.base import ResourceBase
from viam.resource.registry import Registry, ResourceCreatorRegistration
from viam.resource.types import Model, ModelFamily
class MyModularArm(Arm):
# Subclass the Viam Arm component and implement the required functions
MODEL: ClassVar[Model] = Model(ModelFamily("viam", "arm"), "my-arm")
def __init__(self, name: str):
# Starting joint positions
self.joint_positions = JointPositions(values=[0, 0, 0, 0, 0, 0])
# Minimal working kinematics
self.kinematics = json.dumps(
{
"name": "MyArm",
"links": [{"id": "base", "parent": "world", "translation": {"x": 0, "y": 0, "z": 0}}],
"joints": [
{"id": "waist", "type": "revolute", "parent": "base", "axis": {"x": 0, "y": 0, "z": 1}, "max": 359, "min": -359}
],
}
).encode("utf-8")
super().__init__(name)
@classmethod
def new(cls, config: ComponentConfig, dependencies: Mapping[ResourceName, ResourceBase]) -> Self:
arm = cls(config.name)
return arm
async def get_end_position(self, extra: Optional[Dict[str, Any]] = None, **kwargs) -> Pose:
raise NotImplementedError()
async def move_to_position(self, pose: Pose, extra: Optional[Dict[str, Any]] = None, **kwargs):
raise NotImplementedError()
async def get_joint_positions(self, extra: Optional[Dict[str, Any]] = None, **kwargs) -> JointPositions:
return self.joint_positions
@run_with_operation
async def move_to_joint_positions(self, positions: JointPositions, extra: Optional[Dict[str, Any]] = None, **kwargs):
operation = self.get_operation(kwargs)
self.is_stopped = False
# Simulate the length of time it takes for the arm to move to its new joint position
for x in range(10):
await asyncio.sleep(1)
# Check if the operation is cancelled and, if it is, stop the arm's motion
if await operation.is_cancelled():
await self.stop()
break
self.joint_positions = positions
self.is_stopped = True
async def stop(self, extra: Optional[Dict[str, Any]] = None, **kwargs):
self.is_stopped = True
async def is_moving(self) -> bool:
return not self.is_stopped
async def get_kinematics(self, extra: Optional[Dict[str, Any]] = None, **kwargs) -> Tuple[KinematicsFileFormat.ValueType, bytes]:
return KinematicsFileFormat.KINEMATICS_FILE_FORMAT_SVA, self.kinematics
async def main():
"""This function creates and starts a new module, after adding all desired resource model.
Resource creators must be registered to the resource registry before the module adds the resource model.
"""
Registry.register_resource_creator(Arm.SUBTYPE, MyModularArm.MODEL, ResourceCreatorRegistration(MyModularArm.new))
module = Module.from_args()
module.add_model_from_registry(Arm.SUBTYPE, MyModularArm.MODEL)
await module.start()
if __name__ == "__main__":
asyncio.run(main())
Lastly, make the module executable and configure the module on your robot.
Create custom remotes
CAUTION Modular resources are the preferred method of creating custom resource implementations for the Python SDK unless you are hosting viam-server on a non-Linux system or have another issue with compilation.
While the main RDK is written in golang, you can create custom components in python and connect them to a robot as a remote
component. This allows you to extend the functionality of a robot, or even create an entire robot exclusively in python.
To create a custom component and connect it to the RDK:
Subclass a component and implement desired functions. This will be your custom component.
For functions you do not wish to implement, you must at least define them by putting
pass
orraiseNotImplementedError()
in the function.
Create an
rpc.server.Server
instance and register the custom component. This is theRPC
server that enables communication with your custom component.Start the
Server
and register the running server as a remote. Registering thisRPC
server as a remote allows the RDK to communicate with the server.
1. Subclass a component
The SDK provides a wide array of components to customize. You can browse through the API Reference to see all of them, but for now we’ll use an Arm
as an example. Our custom Arm will be extremely simple – it will only save and return the positions provided to it.
Let’s start by creating a directory called my-python-robot
. Inside of that directory, create a file called my_cool_arm.py
. The contents of my_cool_arm.py
should be as follows:
# my-python-robot/my_cool_arm.py
import asyncio
import json
from typing import Any, Dict, List, Optional, Tuple
from viam.components.arm import Arm, JointPositions, KinematicsFileFormat, Pose
from viam.operations import run_with_operation
from viam.proto.common import Capsule, Geometry, Sphere
class MyCoolArm(Arm):
# Subclass the Viam Arm component and implement the required functions
def __init__(self, name: str):
# Starting position
self.position = Pose(
x=0,
y=0,
z=0,
o_x=0,
o_y=0,
o_z=0,
theta=0,
)
# Starting joint positions
self.joint_positions = JointPositions(values=[0, 0, 0, 0, 0, 0])
self.is_stopped = True
self.geometries = [
Geometry(center=Pose(x=1, y=2, z=3, o_x=2, o_y=3, o_z=4, theta=20), sphere=Sphere(radius_mm=2)),
Geometry(center=Pose(x=1, y=2, z=3, o_x=2, o_y=3, o_z=4, theta=20), capsule=Capsule(radius_mm=3, length_mm=8)),
]
# Minimal working kinematics model
self.kinematics = json.dumps(
{
"name": "MyArm",
"links": [{"id": "base", "parent": "world", "translation": {"x": 0, "y": 0, "z": 0}}],
"joints": [
{"id": "waist", "type": "revolute", "parent": "base", "axis": {"x": 0, "y": 0, "z": 1}, "max": 359, "min": -359}
],
}
).encode("utf-8")
super().__init__(name)
async def get_end_position(self, extra: Optional[Dict[str, Any]] = None, **kwargs) -> Pose:
return self.position
@run_with_operation
async def move_to_position(
self,
pose: Pose,
extra: Optional[Dict[str, Any]] = None,
**kwargs,
):
operation = self.get_operation(kwargs)
self.is_stopped = False
self.position = pose
# Simulate the length of time it takes for the arm to move to its new position
for x in range(10):
await asyncio.sleep(1)
# Check if the operation is cancelled and, if it is, stop the arm's motion
if await operation.is_cancelled():
await self.stop()
break
self.is_stopped = True
async def get_joint_positions(self, extra: Optional[Dict[str, Any]] = None, **kwargs) -> JointPositions:
return self.joint_positions
@run_with_operation
async def move_to_joint_positions(self, positions: JointPositions, extra: Optional[Dict[str, Any]] = None, **kwargs):
operation = self.get_operation(kwargs)
self.is_stopped = False
self.joint_positions = positions
# Simulate the length of time it takes for the arm to move to its new joint position
for x in range(10):
await asyncio.sleep(1)
# Check if the operation is cancelled and, if it is, stop the arm's motion
if await operation.is_cancelled():
await self.stop()
break
self.is_stopped = True
async def stop(self, extra: Optional[Dict[str, Any]] = None, **kwargs):
self.is_stopped = True
async def is_moving(self) -> bool:
return not self.is_stopped
async def get_geometries(self, *, extra: Optional[Dict[str, Any]] = None, timeout: Optional[float] = None) -> List[Geometry]:
return self.geometries
async def get_kinematics(self, extra: Optional[Dict[str, Any]] = None, **kwargs) -> Tuple[KinematicsFileFormat.ValueType, bytes]:
return KinematicsFileFormat.KINEMATICS_FILE_FORMAT_SVA, self.kinematics
async def close(self):
# This is a completely optional function to include. This will be called when the resource is removed from the config or the module
# is shutting down.
self.logger.info(f"{self.name} is closed.")
You can view more component implementations in the examples.
This arm example contains a minimal kinematics model. For a full model, take a look here.
2. Register the custom component
Now that we’ve created the custom component, we must register it so that it will be visible to any robots connecting to it. This is done by creating an RPC
server and adding MyCoolArm
as a connected component. This RPC
server will receive gRPC requests from the Viam Server or any other connections and forward those requests to your custom component.
In the same my-python-robot
directory, create a new file called main.py
.
# my-python-robot/main.py
import asyncio
from viam.rpc.server import Server
from my_cool_arm import MyCoolArm
async def main():
srv = Server([MyCoolArm("my-arm")])
await srv.serve()
if __name__ == "__main__":
try:
asyncio.run(main())
except:
pass
3. Start the Server and add it as a Remote
Now that we have a server that knows about our custom Arm component, we need to start the server so it can receive gRPC calls. Once it’s started, we can then register this server as a remote.
# my-python-robot/main.py
async def main():
...
await srv.serve()
NB: When you call srv.serve()
, the default host and port is localhost:9090
. This can be changed by passing in a host
and/or port
parameter to the serve
function.
To use this custom server as part of a larger robot, you’ll want to add it as a remote
in the config for your main part.
[
{
"name": "my-cool-python-components", // The name of the remote, can be anything
"insecure": true, // Whether this connection should use SSL
"address": "localhost:9090" // The location of the remote robot
}
]
And to ensure that the python server starts up with the rest of the robot, you can add it as a process.
[
{
"id": "my-python-server-process",
"log": true,
"name": "python",
"args": [
"/home/pi/my-python-robot/main.py"
]
}
]
NB: The viam-server
starts as a root process, so you may need to switch users to run the python SDK server.
[
{
"id": "0",
"log": true,
"name": "sudo",
"args": [
"-u",
"pi",
"python",
"/home/pi/my-python-robot/main.py"
]
}
]
Convert a custom remote to a custom module
Modular resources are the preferred method of creating custom resource implementations for the Python SDK unless you are hosting viam-server on a non-Linux system or have another issue with compilation. Already existing custom remote resources can be converted to custom module resources.
This section will go over the steps it takes to convert a custom remote to a custom module.
MySensor
is currently a custom remote component, written here:
import asyncio
from typing import Any, Dict, List, Mapping, Optional
from viam.components.sensor import Geometry, Sensor
from viam.rpc.server import Server
from viam.utils import SensorReading
class MySensor(Sensor):
async def get_readings(self, extra: Optional[Dict[str, Any]] = None, **kwargs) -> Mapping[str, SensorReading]:
with open("/proc/net/wireless") as wifi_stats:
content = wifi_stats.readlines()
wifi_signal = [x for x in content[2].split(" ") if x != ""]
return {"link": wifi_signal[2], "level": wifi_signal[3], "noise": wifi_signal[4]}
async def get_geometries(self) -> List[Geometry]:
raise NotImplementedError
async def main():
srv = Server([MySensor("my-sensor")])
await srv.serve()
if __name__ == "__main__":
asyncio.run(main())
The two steps that are required to convert a custom remote resource to a custom module resource are as follows:
Register the resource. This will register a new modular resource model into the Registry.
Make the module executable. This allows viam-server to access and execute the module.
Add a Model
name and a creator function in the resource by defining a new MODEL
variable in the class. Then add a function that will act as the creator function. For more details on how to add these two, follow the pertaining steps in Register the new modular resource.
# ADD THE NECESSARY IMPORTS TO DEFINE A MODEL NAME AND CREATOR FUNCTION.
from typing import ClassVar, Mapping
from typing_extensions import Self
from viam.proto.app.robot import ComponentConfig
from viam.proto.common import ResourceName
from viam.resource.base import ResourceBase
from viam.resource.types import Model, ModelFamily
class MySensor(Sensor):
# ADD A MODEL NAME IN THE EXISTING RESOURCE CLASS.
MODEL: ClassVar[Model] = Model(ModelFamily("viam-labs","sensor"), "test")
# ADD A CREATOR FUNCTION IN THE EXISTING RESOURCE CLASS.
@classmethod
def new(cls, config: ComponentConfig, dependencies: Mapping[ResourceName, ResourceBase]) -> Self:
resource = cls(config.name)
return resource
In the resource’s main function or main.py
file, replace the content of main()
with the content below:
# ADD THE NECESSARY IMPORTS TO REGISTER THE NEW MODEL
from viam.module.module import Module
from viam.resource.registry import Registry, ResourceCreatorRegistration
# REPLACE THE MAIN FUNCTION/FILE SO THAT IT CALLS `register_resource_creator()` AND STARTS THE MODULE.
async def main():
Registry.register_resource_creator(Sensor.SUBTYPE, MySensor.MODEL, ResourceCreatorRegistration(MySensor.new))
module = Module.from_args()
module.add_model_from_registry(Sensor.SUBTYPE, MySensor.MODEL)
await module.start()
Follow the steps outlined in the section Make the module executable in the directory of the component.
The custom remote component is now a custom module component. To use the component on the Viam app, follow the steps to configure the new modular resource.
For easy visualization, the differences in the configuration JSONs and code are below.
Custom remote |
Custom module |
---|---|
{ |
{ |
Show code cell source
from IPython.core.display import display, Image
display(Image(filename="./codediff.png"))
Operations
Operations are used by the RDK to keep track of running tasks, and provide a means to query their status and even cancel them. In order for a component to respond appropriately to any Operation requests, we must obtain and listen to the Operation in our method calls.
In order to take advantage of operations, you should wrap the component method with the run_with_operation
decorator from the viam.operations
package. Each component has a function, get_operation(kwargs: Mapping[str, Any]) -> Operation
, that will return an Operation that will tell us if the operation is ever cancelled, allowing us to clean up any long running tasks, close connections, etc.
It is extremely important that we check the Operation
status, as this not only prevents any unnecessary resource usage, but also allows us to respond to urgent cancellation requests and stop components’ motion.
Connect as a client to app
To connect to app as a client and make calls to the data API, you should create an instance of a ViamClient
and retrieve its DataClient
member.
from viam.rpc.dial import DialOptions, Credentials
from viam.app.viam_client import ViamClient
async def connect() -> ViamClient:
dial_options = DialOptions.with_api_key(api_key="<API_KEY>", api_key_id="<API_KEY_ID>")
return await ViamClient.create_from_dial_options(dial_options)
Once you have a connected ViamClient
, you can then obtain a DataClient
as a property.
data_client = viam_client.data_client
This DataClient
can be used to make method calls that retrieve data from app.
from datetime import datetime
from viam.utils import create_filter
left_motor_filter = create_filter(
component_name="left_motor", start_time=datetime(2023, 6, 5, 11), end_time=datetime(2023, 6, 5, 13, 30), tags=["speed_test_run"]
)
data = await data_client.tabular_data_by_filter(filter=left_motor_filter)
for tab in data:
print(tab)
[DataClient.TabularData(data={'IsPowered': False, 'PowerPct': 0.0}, metadata=method_name: "IsPowered"
, time_requested=datetime.datetime(2022, 1, 1, 1, 1, 1), time_received=datetime.datetime(2022, 12, 31, 23, 59, 59)), DataClient.TabularData(data={'IsPowered': False, 'PowerPct': 0.0}, metadata=location_id: "loc-id"
, time_requested=datetime.datetime(2023, 1, 2, 0, 0), time_received=datetime.datetime(2023, 3, 4, 0, 0)), DataClient.TabularData(data={'Position': 0.0}, metadata=, time_requested=datetime.datetime(2023, 5, 6, 0, 0), time_received=datetime.datetime(2023, 7, 8, 0, 0))]
0
You can also use your DataClient
to upload data to app.
time_requested_1 = datetime(2023, 6, 5, 11)
time_received_1 = datetime(2023, 6, 5, 11, 0, 3)
await data_client.tabular_data_capture_upload(
part_id="<ID>", # Unique ID of the relevant robot part.
component_type="rdk:component:motor",
component_name="left_motor",
method_name="IsPowered",
method_parameters=None,
tags=["tag_1", "tag_2"],
data_request_times=[(time_requested_1, time_received_1)],
tabular_data=[{"PowerPCT": 0, "IsPowered": False}],
)
''
ViamClient
objects also house an AppClient
, which can be used to retrieve information about your organization in App or make certain changes to it. The AppClient
can be obtained in a manner similar to the DataClient
.
app_client = viam_client.app_client
With this AppClient
, you can access information about entities within your organization. For example, you can list information about all robots under your organization.
my_organizations = await app_client.list_organizations()
MY_ORG_ID = my_organizations[0].id
my_locations = await app_client.list_locations(org_id=MY_ORG_ID)
robots = []
for location in my_locations:
more_robots = await app_client.list_robots(location_id=location.id)
robots += more_robots
for robot in robots:
print(robot.name)
robot-0
robot-1
robot-2
robot-3
You can also create new robots/robot parts or make updates to robots/robot parts that already already exist, or even delete them!
# Create a new robot named "new_rover".
NEW_ROBOT_ID = await app_client.new_robot(name="new_rover", location_id=my_locations[0].id)
# Change the new robot's name to "rover" and assign it a new parent location.
updated_robot = await app_client.update_robot(robot_id=NEW_ROBOT_ID, name="rover", location_id="<LOCATION_ID")
# Delete the new robot.
await app_client.delete_robot(robot_id=NEW_ROBOT_ID)
Certain types returned by AppClient
methods (namely, RobotPart
, LogEntry
, Fragment
, and RobotPartHistoryEntry
objects) are wrapped in mirror classes that convert proto data into a friendlier, more readable format. Each of these mirrors still allows access to the initial proto type returned by the response as a property.
logs = await app_client.get_robot_part_logs(robot_part_id="<ID>", num_log_entries=1)
assert logs[0].caller is not None
for item in logs[0].caller.items():
print(f"{item[0]}: {item[1]}")
print(f"\n*****PROTO*****\n")
print(logs[0].proto.caller)
Function: go.viam.com/rdk/robot/impl.(*resourceManager).updateResources
Line: 922.0
Defined: True
File: /__w/rdk/rdk/robot/impl/resource_manager.go
*****PROTO*****
fields {
key: "Line"
value {
number_value: 922
}
}
fields {
key: "Function"
value {
string_value: "go.viam.com/rdk/robot/impl.(*resourceManager).updateResources"
}
}
fields {
key: "File"
value {
string_value: "/__w/rdk/rdk/robot/impl/resource_manager.go"
}
}
fields {
key: "Defined"
value {
bool_value: true
}
}
At the end, you may close the connection.
viam_client.close()