Example usage

The Viam SDK can be used in three ways:

  1. As a client to connect to a (remote or local) robot

  2. Integrate custom resources to a robot

  3. As a client to connect to app.viam.com to upload and retrieve data

Connect as a client

To connect to a robot as a client, you should instantiate an instance of a RobotClient

from viam import logging
from viam.robot.client import RobotClient
from viam.rpc.dial import DialOptions


async def connect() -> RobotClient:
    options = RobotClient.Options(dial_options=DialOptions(insecure=True, disable_webrtc=True), log_level=logging.FATAL)
    return await RobotClient.at_address("localhost:9091", options)

You can also create a RobotClient by providing an existing connection

from viam import logging
from viam.robot.client import RobotClient
from viam.rpc.dial import DialOptions, dial


async def connect_with_channel() -> RobotClient:
    async with await dial("localhost:9091", DialOptions(insecure=True, disable_webrtc=True)) as channel:
        return await RobotClient.with_channel(channel, RobotClient.Options(refresh_interval=10, log_level=logging.FATAL))


robot = await connect_with_channel()
print(robot.resource_names)
await robot.close()
Hide code cell output
[<viam.proto.common.ResourceName rdk:component:pose_tracker/pose_tracker0 at 0x7efe25b70ef0>, <viam.proto.common.ResourceName rdk:component:audio_input/audio_input0 at 0x7efe25b70f90>, <viam.proto.common.ResourceName rdk:component:base/base0 at 0x7efe25b70e50>, <viam.proto.common.ResourceName rdk:component:servo/servo0 at 0x7efe25b70fe0>, <viam.proto.common.ResourceName rdk:component:encoder/encoder0 at 0x7efe25b71030>, <viam.proto.common.ResourceName rdk:service:slam/slam0 at 0x7efe25b71080>, <viam.proto.common.ResourceName rdk:component:gantry/gantry0 at 0x7efe25b710d0>, <viam.proto.common.ResourceName rdk:component:motor/motor0 at 0x7efe25b71120>, <viam.proto.common.ResourceName rdk:component:gripper/gripper0 at 0x7efe25b71170>, <viam.proto.common.ResourceName rdk:component:movement_sensor/movement_sensor0 at 0x7efe25b711c0>, <viam.proto.common.ResourceName rdk:service:mlmodel/mlmodel0 at 0x7efe25b71210>, <viam.proto.common.ResourceName rdk:component:camera/camera0 at 0x7efe25b71260>, <viam.proto.common.ResourceName rdk:component:sensor/sensor0 at 0x7efe25b712b0>, <viam.proto.common.ResourceName rdk:component:board/board at 0x7efe25b71300>, <viam.proto.common.ResourceName rdk:component:arm/arm0 at 0x7efe25b71350>]

Once you have a connected RobotClient, you can then obtain the robot’s components by their name

from viam.components.camera import Camera
from viam.media.video import CameraMimeType
from viam.media.utils.pil import viam_to_pil_image

robot = await connect_with_channel()
camera = Camera.from_robot(robot, "camera0")
image = await camera.get_image(CameraMimeType.JPEG)
pil = viam_to_pil_image(image)
pil.save("foo.png")

# Don't forget to close the robot when you're done!
await robot.close()
Hide code cell outputs
<viam.media.video.ViamImage at 0x7efe242d5f10>

You can also use the RobotClient to make service calls to the connected robot.

from viam.services.vision import VisionClient


async def vision():
    robot = await connect_with_channel()
    vision = VisionClient.from_robot(robot)
    detections = await vision.get_detections_from_camera("camera_1", "detector_1")

At the end, don’t forget to close the connection

async def cleanup():
    await robot.close()

Create custom modules

Make a modular resource from an existing resource API. In this document, we will be going over subclassing existing resource APIs. To learn more about creating a new resource API, see the Viam docs, and complex module example.

For a fully fleshed-out example of a Python module that uses Github CI to upload to the Viam Registry, take a look at python-example-module. For an example that uses Docker to manage dependencies, take a look at python-container-module. For a list of example modules in different Viam SDKs, take a look here.

The code below resembles the simple module example, so look there for the final, completed modular resource.

The steps required in creating a modular resource and connecting it to a robot are:

  1. Subclass a resource and implement desired functions. This will be your custom resource model.

    • For functions that you do not wish to implement, you must at least define them by putting pass or raise NotImplementedError() in the function.

  2. Register a new model. This will register the new modular resource model into the Registry.

  3. Start the module. Create and start the new module.

  4. Make the module executable. This allows viam-server to access and execute the module.

  5. Configure a modular resource. Use the new module and instantiate a new resource to a robot!

Knowing this, let’s implement a custom resource.

1. Subclass a resource

The SDK provides a wide array of resource APIs to customize. You can browse through the API Reference to see the available resources. Subclass a resource and implement the required functions. You may leave the other methods unimplemented by putting pass or raise NotImplementedError() in the function.

This example uses a Sensor as an example.

Create a sensor that returns the Wi-Fi signal of a Pi. Start by creating a directory on the Pi named wifi-sensor, then create a directory inside the wifi-sensor directory named src. Then, inside the src directory, create a new file named wifi_sensor_module.py. Copy the code below into your wifi_sensor_module.py file:

# wifi-sensor/src/wifi_sensor_module.py
import asyncio
from typing import Any, Dict, Mapping, Optional

from viam.components.sensor import Sensor
from viam.utils import SensorReading


class MySensor(Sensor):
    # Subclass the Viam Sensor component and implement the required functions
    async def get_readings(self, extra: Optional[Dict[str, Any]] = None, **kwargs) -> Mapping[str, SensorReading]:
        with open("/proc/net/wireless") as wifi_stats:
            content = wifi_stats.readlines()
        wifi_signal = [x for x in content[2].split(" ") if x != ""]
        return {"link": wifi_signal[2], "level": wifi_signal[3], "noise": wifi_signal[4]}
    
    async def get_geometries(self, *, extra: Optional[Dict[str, Any]] = None, timeout: Optional[float] = None) -> List[Geometry]:
        raise NotImplementedError

    async def close(self):
        # This is a completely optional function to include. This will be called when the resource is removed from the config or the module
        # is shutting down.
        self.logger.info(f"{self.name} is closed.")

# Anything below this line is optional and will be replaced later, but may come in handy for debugging and testing.
# To use, call `python wifi_sensor_module.py` in the command line while in the `src` directory.
async def main():
    wifi=MySensor(name="wifi")
    signal = await wifi.get_readings()
    print(signal)

if __name__ == '__main__':
    asyncio.run(main())

You can view more component implementations in the examples.

2. Register the new modular resource

Now that we’ve created the modular resource model, we must register it to the registry.

Define a Model name and a creator function in the resource. A creator function is a function that can create a resource given a config and map of dependencies.

To define a creator function and Model in wifi_sensor_module.py, import ComponentConfig, ResourceName, ResourceBase, Model, and ModelFamily from the relative viam packages. Please note that the import packages also have to be changed to include Mapping and ClassVar from typing and Self from typing_extensions. The changes can be seen in the example below.

In your wifi_sensor_module.py, define a new MODEL variable in your MySensor class, shown below. Then add a new() function that will act as our creator function. We get the name from the config and return the resource. We can also assign class variables based on what the config has.

# wifi-sensor/src/wifi_sensor_module.py
import asyncio
from typing import Any, ClassVar, Dict, Mapping, Optional
from typing_extensions import Self

from viam.proto.app.robot import ComponentConfig
from viam.proto.common import ResourceName
from viam.resource.base import ResourceBase
from viam.resource.types import Model, ModelFamily

class MySensor(Sensor):
    # Subclass the Viam Sensor component and implement the required functions
    MODEL: ClassVar[Model] = Model(ModelFamily("viam","sensor"), "linux-wifi")

    @classmethod
    def new(cls, config: ComponentConfig, dependencies: Mapping[ResourceName, ResourceBase]) -> Self:
        sensor = cls(config.name)
        return sensor

After the resource model has a defined model and creator function, the resource model must be registered to the Registry. This can be done using register_resource_creator() and passing a ResourceCreatorRegistration object with the creator function as a parameter.

In the main function of wifi_sensor_module.py, call Registry.register_resource_creator() with the subtype of the resource that was subclassed, the model name, and a ResourceCreatorRegistration object with the creator function defined in the modular resource. In a more complex module, it makes sense to call the function in an __init__.py file in the same folder as the new resource model. See here for an example.

# wifi-sensor/src/wifi_sensor_module.py
from viam.resource.registry import Registry, ResourceCreatorRegistration

async def main():
    Registry.register_resource_creator(Sensor.SUBTYPE, MySensor.MODEL, ResourceCreatorRegistration(MySensor.new))

Click here to see the file with its new MODEL variable, creator function, and main() function.

Optional Validator and Reconfiguration Functions

Modules also have support for validator and reconfiguration functions.

Custom validation can be done by specifying a validate function. Validate functions can raise errors that will be returned to the parent through gRPC. Validate functions return a sequence of strings representing the implicit dependencies of the resource. If there are none, return an empty sequence [].

For example, let’s say MySensor had a multiplier argument that is used to multiply the returned readings in get_readings(). The validation function can check if a multiplier attribute was provided and prevent erroneous resource start ups.

Reconfiguration specifies what values can change based on a new configuration. To allow this, add a reconfigure() function to the MySensor class. A good pattern is to call reconfigure() within new(), since initialization and reconfiguration are usually very similar.

# ADD `Sequence` FROM `typing`.
from typing import Sequence

from viam.utils import SensorReading

class MySensor(Sensor):
    # ADD A VALIDATOR FUNCTION 
    @classmethod
    def validate_config(cls, config: ComponentConfig) -> Sequence[str]:
        if "multiplier" in config.attributes.fields:
            if not config.attributes.fields["multiplier"].HasField("number_value"):
                raise Exception("Multiplier must be a float.")
            multiplier = config.attributes.fields["multiplier"].number_value
            if multiplier == 0:
                raise Exception("Multiplier cannot be 0.")
        # RETURN AN EMPTY SEQUENCE
        return []

    @classmethod
    def new(cls, config: ComponentConfig, dependencies: Mapping[ResourceName, ResourceBase]) -> Self:
        sensor = cls(config.name)
        # CALL RECONFIGURE TO INITIALIZE THE RESOURCE
        sensor.reconfigure(config, dependencies)        
        return sensor

    def reconfigure(self, config: ComponentConfig, dependencies: Mapping[ResourceName, ResourceBase]):
        if "multiplier" in config.attributes.fields:
            multiplier = config.attributes.fields["multiplier"].number_value
        else:
            multiplier = 1.0
        self.multiplier = multiplier

    async def get_readings(self, extra: Optional[Dict[str, Any]] = None, **kwargs) -> Mapping[str, SensorReading]:
        with open("/proc/net/wireless") as wifi_stats:
            content = wifi_stats.readlines()
        result = [x for x in content[2].split(" ") if x != ""]
        
        # MULTIPLY THE READINGS WITH MULTIPLIER
        wifi_signal = []
        for i in range(2:5):
            wifi_signal.append(int(result[i]) * self.multiplier)
        return {"link": wifi_signal[0], "level": wifi_signal[1], "noise": wifi_signal[2]}

If a validator function is added, it must also be called when registering the resource. The ResourceCreatorRegistration should now also have the validator function passed in as the second parameter.

        async def main():
            Registry.register_resource_creator(Sensor.SUBTYPE, MySensor.MODEL, ResourceCreatorRegistration(MySensor.new, MySensor.validate_config))

Click here to see the new wifi_sensor_module.py with the optional validator and reconfiguration functions.

3. Start the module

The module now has to be created and started. In the wifi_sensor_module.py file, update the main function to the following. If there is a validator function in the class, don’t forget to add the function to the ResourceCreatorRegistration object. In a more complex module, it makes sense to create an entirely new entrypoint file. See here for an example.

# wifi-sensor/src/wifi_sensor_module.py
from viam.module.module import Module


async def main():
    """
    This function creates and starts a new module, after adding all desired resource model.
    Resource creators must be registered to the resource registry before the module adds the resource model.
    """
    Registry.register_resource_creator(Sensor.SUBTYPE, MySensor.MODEL, ResourceCreatorRegistration(MySensor.new))

    module = Module.from_args()
    module.add_model_from_registry(Sensor.SUBTYPE, MySensor.MODEL)
    await module.start()


if __name__ == "__main__":
    asyncio.run(main())

To see wifi_sensor_module.py with all of the changes, click here.

4. Make the module executable

To add the module to your robot, you must provide it as an executable file that viam-server can access.

Dependencies for the module (including Viam SDK) have to be installed. In the wifi-sensor directory, create a new file called requirements.txt that has all your dependencies. For this example, add viam-sdk in requirements.txt.

# add a version if viam should be pinned to a specific version
viam-sdk

One option with the Python SDK is to create a new shell script (.sh) that runs your module, which can also be used to install the dependencies. For example, in the wifi-sensor directory, add an run.sh file:

#!/bin/sh
cd `dirname $0`

# Create a virtual environment to run our code
VENV_NAME="venv"
PYTHON="$VENV_NAME/bin/python"
ENV_ERROR="This module requires Python >=3.8, pip, and virtualenv to be installed."

if ! python3 -m venv $VENV_NAME >/dev/null 2>&1; then
    echo "Failed to create virtualenv."
    if command -v apt-get >/dev/null; then
        echo "Detected Debian/Ubuntu, attempting to install python3-venv automatically."
        SUDO="sudo"
        if ! command -v $SUDO >/dev/null; then
            SUDO=""
        fi
		if ! apt info python3-venv >/dev/null 2>&1; then
			echo "Package info not found, trying apt update"
			$SUDO apt -qq update >/dev/null
		fi
        $SUDO apt install -qqy python3-venv >/dev/null 2>&1
        if ! python3 -m venv $VENV_NAME >/dev/null 2>&1; then
            echo $ENV_ERROR >&2
            exit 1
        fi
    else
        echo $ENV_ERROR >&2
        exit 1
    fi
fi

# remove -U if viam-sdk should not be upgraded whenever possible
# -qq suppresses extraneous output from pip
echo "Virtualenv found/created. Installing/upgrading Python packages..."
if ! $PYTHON -m pip install -r requirements.txt -Uqq; then
    exit 1
fi

# Be sure to use `exec` so that termination signals reach the python process,
# or handle forwarding termination signals manually
echo "Starting module..."
exec $PYTHON src/wifi-sensor.py $@

Please note that a more complex module should be run as a Python module, meaning that the file extension .py has to be omitted. See here for an example.

To make this shell script executable, run this in the Terminal:

sudo chmod +x <FILEPATH>/<FILENAME>

<FILENAME> would be run.sh.

5. Configure a modular resource

NOTE: If you are adding your module to the registry, follow these instructions instead. Otherwise, continue with these instructions, which will show you how to configure the module locally.

Configure your new module on your robot by navigating to the Config tab of the robot’s page on the Viam app, then click on the Modules subtab. Add the name of your module and the executable path. For our example, the path would be <path-on-your-filesystem>/linux-wifi/run.sh.

Once you have configured a module as part of your robot configuration, configure your modular resource made available by that module by adding new components or services configured with your modular resources’ new type or model. To instantiate a new resource from your module, specify the type, model, and name of your modular resource. The aforementioned type, model, and name should be the same as those in the MODEL class attribute defined in your component class. This is a JSON example:

{
  "components": [
    {
      "model": "viam:sensor:linux-wifi",
      "attributes": {},
      "depends_on": [],
      "name": "my-sensor",
      "type": "sensor"
    }
  ],
  "modules": [
    {
      "executable_path": "<path-on-your-filesystem>/linux-wifi/run.sh",
      "name": "wifi_sensor"
    }
  ]
}

Note the nomenclature of the above model field, viam:sensor:linux-wifi. Models are uniquely namespaced as colon-delimited-triplets in the form namespace:family:name, and are named according to the Viam API that your model implements. A model with the viam namespace is always Viam-provided. Read more about making custom namespaces here.

Viam also provides many built-in models that implement API capabilities, each using rdk as the namespace, and builtin as the family:

  • The rdk:builtin:gpio model of the rdk:component:motor API provides RDK support for GPIO-controlled DC motors.

  • The rdk:builtin:DMC4000 model of the same rdk:component:motor API provides RDK support for the DMC4000 motor.

Custom Modular Arm Example

The following is an example of how to implement a custom modular arm. For further instructions, read the detailed Sensor example above. Our custom Arm will be extremely simple. Please note that the minimum set of endpoints you need to implement for an arm are GetKinematics, GetJointPositions, and MoveToJointPositions.

This arm example contains a minimal kinematics model. For a full model, take a look here.

Subclassing the Arm component and implementing the required functions:

# modular-arm/src/my_modular_arm.py
import asyncio
import os
from typing import Any, ClassVar, Dict, Mapping, Optional, Tuple
from typing_extensions import Self

from viam.components.arm import Arm, JointPositions, KinematicsFileFormat, Pose
from viam.module.module import Module
from viam.operations import run_with_operation
from viam.proto.app.robot import ComponentConfig
from viam.proto.common import ResourceName
from viam.resource.base import ResourceBase
from viam.resource.registry import Registry, ResourceCreatorRegistration
from viam.resource.types import Model, ModelFamily


class MyModularArm(Arm):
    # Subclass the Viam Arm component and implement the required functions
    MODEL: ClassVar[Model] = Model(ModelFamily("viam", "arm"), "my-arm")

    def __init__(self, name: str):
        # Starting joint positions
        self.joint_positions = JointPositions(values=[0, 0, 0, 0, 0, 0])


        # Minimal working kinematics
        self.kinematics = json.dumps(
            {
                "name": "MyArm",
                "links": [{"id": "base", "parent": "world", "translation": {"x": 0, "y": 0, "z": 0}}],
                "joints": [
                    {"id": "waist", "type": "revolute", "parent": "base", "axis": {"x": 0, "y": 0, "z": 1}, "max": 359, "min": -359}
                ],
            }
        ).encode("utf-8")
        super().__init__(name)

    @classmethod
    def new(cls, config: ComponentConfig, dependencies: Mapping[ResourceName, ResourceBase]) -> Self:
        arm = cls(config.name)
        return arm

    async def get_end_position(self, extra: Optional[Dict[str, Any]] = None, **kwargs) -> Pose:
        raise NotImplementedError()

    async def move_to_position(self, pose: Pose, extra: Optional[Dict[str, Any]] = None, **kwargs):
        raise NotImplementedError()

    async def get_joint_positions(self, extra: Optional[Dict[str, Any]] = None, **kwargs) -> JointPositions:
        return self.joint_positions

    @run_with_operation
    async def move_to_joint_positions(self, positions: JointPositions, extra: Optional[Dict[str, Any]] = None, **kwargs):
        operation = self.get_operation(kwargs)

        self.is_stopped = False

        # Simulate the length of time it takes for the arm to move to its new joint position
        for x in range(10):
            await asyncio.sleep(1)

            # Check if the operation is cancelled and, if it is, stop the arm's motion
            if await operation.is_cancelled():
                await self.stop()
                break
        
        self.joint_positions = positions
        self.is_stopped = True

    async def stop(self, extra: Optional[Dict[str, Any]] = None, **kwargs):
        self.is_stopped = True

    async def is_moving(self) -> bool:
        return not self.is_stopped

    async def get_kinematics(self, extra: Optional[Dict[str, Any]] = None, **kwargs) -> Tuple[KinematicsFileFormat.ValueType, bytes]:
        return KinematicsFileFormat.KINEMATICS_FILE_FORMAT_SVA, self.kinematics

async def main():
    """This function creates and starts a new module, after adding all desired resource model.
    Resource creators must be registered to the resource registry before the module adds the resource model.
    """
    Registry.register_resource_creator(Arm.SUBTYPE, MyModularArm.MODEL, ResourceCreatorRegistration(MyModularArm.new))
    module = Module.from_args()
    module.add_model_from_registry(Arm.SUBTYPE, MyModularArm.MODEL)
    await module.start()


if __name__ == "__main__":
    asyncio.run(main())

Lastly, make the module executable and configure the module on your robot.

Create custom remotes

CAUTION Modular resources are the preferred method of creating custom resource implementations for the Python SDK unless you are hosting viam-server on a non-Linux system or have another issue with compilation.

While the main RDK is written in golang, you can create custom components in python and connect them to a robot as a remote component. This allows you to extend the functionality of a robot, or even create an entire robot exclusively in python.

To create a custom component and connect it to the RDK:

  1. Subclass a component and implement desired functions. This will be your custom component.

    • For functions you do not wish to implement, you must at least define them by putting pass or raiseNotImplementedError() in the function.

  2. Create an rpc.server.Server instance and register the custom component. This is the RPC server that enables communication with your custom component.

  3. Start the Server and register the running server as a remote. Registering this RPC server as a remote allows the RDK to communicate with the server.

1. Subclass a component

The SDK provides a wide array of components to customize. You can browse through the API Reference to see all of them, but for now we’ll use an Arm as an example. Our custom Arm will be extremely simple – it will only save and return the positions provided to it.

Let’s start by creating a directory called my-python-robot. Inside of that directory, create a file called my_cool_arm.py. The contents of my_cool_arm.py should be as follows:

# my-python-robot/my_cool_arm.py

import asyncio
import json
from typing import Any, Dict, List, Optional, Tuple

from viam.components.arm import Arm, JointPositions, KinematicsFileFormat, Pose
from viam.operations import run_with_operation
from viam.proto.common import Capsule, Geometry, Sphere


class MyCoolArm(Arm):
    # Subclass the Viam Arm component and implement the required functions

    def __init__(self, name: str):
        # Starting position
        self.position = Pose(
            x=0,
            y=0,
            z=0,
            o_x=0,
            o_y=0,
            o_z=0,
            theta=0,
        )

        # Starting joint positions
        self.joint_positions = JointPositions(values=[0, 0, 0, 0, 0, 0])
        self.is_stopped = True
        self.geometries = [
            Geometry(center=Pose(x=1, y=2, z=3, o_x=2, o_y=3, o_z=4, theta=20), sphere=Sphere(radius_mm=2)),
            Geometry(center=Pose(x=1, y=2, z=3, o_x=2, o_y=3, o_z=4, theta=20), capsule=Capsule(radius_mm=3, length_mm=8)),
        ]

        # Minimal working kinematics model
        self.kinematics = json.dumps(
            {
                "name": "MyArm",
                "links": [{"id": "base", "parent": "world", "translation": {"x": 0, "y": 0, "z": 0}}],
                "joints": [
                    {"id": "waist", "type": "revolute", "parent": "base", "axis": {"x": 0, "y": 0, "z": 1}, "max": 359, "min": -359}
                ],
            }
        ).encode("utf-8")
        super().__init__(name)

    async def get_end_position(self, extra: Optional[Dict[str, Any]] = None, **kwargs) -> Pose:
        return self.position

    @run_with_operation
    async def move_to_position(
        self,
        pose: Pose,
        extra: Optional[Dict[str, Any]] = None,
        **kwargs,
    ):
        operation = self.get_operation(kwargs)

        self.is_stopped = False
        self.position = pose

        # Simulate the length of time it takes for the arm to move to its new position
        for x in range(10):
            await asyncio.sleep(1)

            # Check if the operation is cancelled and, if it is, stop the arm's motion
            if await operation.is_cancelled():
                await self.stop()
                break

        self.is_stopped = True

    async def get_joint_positions(self, extra: Optional[Dict[str, Any]] = None, **kwargs) -> JointPositions:
        return self.joint_positions

    @run_with_operation
    async def move_to_joint_positions(self, positions: JointPositions, extra: Optional[Dict[str, Any]] = None, **kwargs):
        operation = self.get_operation(kwargs)

        self.is_stopped = False
        self.joint_positions = positions

        # Simulate the length of time it takes for the arm to move to its new joint position
        for x in range(10):
            await asyncio.sleep(1)

            # Check if the operation is cancelled and, if it is, stop the arm's motion
            if await operation.is_cancelled():
                await self.stop()
                break

        self.is_stopped = True

    async def stop(self, extra: Optional[Dict[str, Any]] = None, **kwargs):
        self.is_stopped = True

    async def is_moving(self) -> bool:
        return not self.is_stopped

    async def get_geometries(self, *, extra: Optional[Dict[str, Any]] = None, timeout: Optional[float] = None) -> List[Geometry]:
        return self.geometries

    async def get_kinematics(self, extra: Optional[Dict[str, Any]] = None, **kwargs) -> Tuple[KinematicsFileFormat.ValueType, bytes]:
        return KinematicsFileFormat.KINEMATICS_FILE_FORMAT_SVA, self.kinematics

    async def close(self):
        # This is a completely optional function to include. This will be called when the resource is removed from the config or the module
        # is shutting down.
        self.logger.info(f"{self.name} is closed.")

You can view more component implementations in the examples.

This arm example contains a minimal kinematics model. For a full model, take a look here.

2. Register the custom component

Now that we’ve created the custom component, we must register it so that it will be visible to any robots connecting to it. This is done by creating an RPC server and adding MyCoolArm as a connected component. This RPC server will receive gRPC requests from the Viam Server or any other connections and forward those requests to your custom component.

In the same my-python-robot directory, create a new file called main.py.

# my-python-robot/main.py

import asyncio
from viam.rpc.server import Server

from my_cool_arm import MyCoolArm


async def main():
    srv = Server([MyCoolArm("my-arm")])
    await srv.serve()


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except:
        pass

3. Start the Server and add it as a Remote

Now that we have a server that knows about our custom Arm component, we need to start the server so it can receive gRPC calls. Once it’s started, we can then register this server as a remote.

# my-python-robot/main.py

async def main():
  ...
  await srv.serve()

NB: When you call srv.serve(), the default host and port is localhost:9090. This can be changed by passing in a host and/or port parameter to the serve function.

To use this custom server as part of a larger robot, you’ll want to add it as a remote in the config for your main part.

[
    {
      "name": "my-cool-python-components", // The name of the remote, can be anything
      "insecure": true,                    // Whether this connection should use SSL
      "address": "localhost:9090"          // The location of the remote robot
    }
  ]

And to ensure that the python server starts up with the rest of the robot, you can add it as a process.

[
  {
    "id": "my-python-server-process",
    "log": true,
    "name": "python",
    "args": [
      "/home/pi/my-python-robot/main.py"
    ]
  }
]

NB: The viam-server starts as a root process, so you may need to switch users to run the python SDK server.

[
  {
    "id": "0",
    "log": true,
    "name": "sudo",
    "args": [
      "-u",
      "pi",
      "python",
      "/home/pi/my-python-robot/main.py"
    ]
  }
]

Convert a custom remote to a custom module

Modular resources are the preferred method of creating custom resource implementations for the Python SDK unless you are hosting viam-server on a non-Linux system or have another issue with compilation. Already existing custom remote resources can be converted to custom module resources.

This section will go over the steps it takes to convert a custom remote to a custom module. MySensor is currently a custom remote component, written here:

import asyncio
from typing import Any, Dict, List, Mapping, Optional

from viam.components.sensor import Geometry, Sensor
from viam.rpc.server import Server
from viam.utils import SensorReading


class MySensor(Sensor):
    async def get_readings(self, extra: Optional[Dict[str, Any]] = None, **kwargs) -> Mapping[str, SensorReading]:
        with open("/proc/net/wireless") as wifi_stats:
            content = wifi_stats.readlines()
        wifi_signal = [x for x in content[2].split(" ") if x != ""]
        return {"link": wifi_signal[2], "level": wifi_signal[3], "noise": wifi_signal[4]}

    async def get_geometries(self) -> List[Geometry]:
        raise NotImplementedError


async def main():
    srv = Server([MySensor("my-sensor")])
    await srv.serve()


if __name__ == "__main__":
    asyncio.run(main())

The two steps that are required to convert a custom remote resource to a custom module resource are as follows:

  1. Register the resource. This will register a new modular resource model into the Registry.

  2. Make the module executable. This allows viam-server to access and execute the module.

Add a Model name and a creator function in the resource by defining a new MODEL variable in the class. Then add a function that will act as the creator function. For more details on how to add these two, follow the pertaining steps in Register the new modular resource.

# ADD THE NECESSARY IMPORTS TO DEFINE A MODEL NAME AND CREATOR FUNCTION.
from typing import ClassVar, Mapping
from typing_extensions import Self

from viam.proto.app.robot import ComponentConfig
from viam.proto.common import ResourceName
from viam.resource.base import ResourceBase
from viam.resource.types import Model, ModelFamily

class MySensor(Sensor):
    # ADD A MODEL NAME IN THE EXISTING RESOURCE CLASS.
    MODEL: ClassVar[Model] = Model(ModelFamily("viam-labs","sensor"), "test")

    # ADD A CREATOR FUNCTION IN THE EXISTING RESOURCE CLASS.
    @classmethod
    def new(cls, config: ComponentConfig, dependencies: Mapping[ResourceName, ResourceBase]) -> Self:
        resource = cls(config.name)
        return resource

In the resource’s main function or main.py file, replace the content of main() with the content below:

# ADD THE NECESSARY IMPORTS TO REGISTER THE NEW MODEL
from viam.module.module import Module
from viam.resource.registry import Registry, ResourceCreatorRegistration

# REPLACE THE MAIN FUNCTION/FILE SO THAT IT CALLS `register_resource_creator()` AND STARTS THE MODULE.
async def main():
    Registry.register_resource_creator(Sensor.SUBTYPE, MySensor.MODEL, ResourceCreatorRegistration(MySensor.new))

    module = Module.from_args()
    module.add_model_from_registry(Sensor.SUBTYPE, MySensor.MODEL)
    await module.start()

Follow the steps outlined in the section Make the module executable in the directory of the component.

The custom remote component is now a custom module component. To use the component on the Viam app, follow the steps to configure the new modular resource.

For easy visualization, the differences in the configuration JSONs and code are below.

Custom remote

Custom module

{
“remotes”: [
{
“name”: “my-sensor”,
“insecure”: true,
“address”: “localhost:9090”
}
],
“processes”: [
{
“id”: “my-python-server-process”,
“log”: true,
“name”: “python”,
“args”: [
“/home/my_sensor/my_sensor.py”
]
}
]
}

{
“components”: [
{
“depends_on”: [],
“model”: “viam-labs:sensor:test”,
“name”: “my-sensor”,
“type”: “sensor”,
“attributes”: {}
}
],
“modules”: [
{
“executable_path”: “/home/my_sensor/run.sh”,
“name”: “testsensor”,
“type”: “local”
}
]
}

Hide code cell source
from IPython.core.display import display, Image

display(Image(filename="./codediff.png"))
../_images/3217c6da0cbe6fc51d6f40e845b3f2c3ab886988041ea6513264ba959f9e7465.png

Operations

Operations are used by the RDK to keep track of running tasks, and provide a means to query their status and even cancel them. In order for a component to respond appropriately to any Operation requests, we must obtain and listen to the Operation in our method calls.

In order to take advantage of operations, you should wrap the component method with the run_with_operation decorator from the viam.operations package. Each component has a function, get_operation(kwargs: Mapping[str, Any]) -> Operation, that will return an Operation that will tell us if the operation is ever cancelled, allowing us to clean up any long running tasks, close connections, etc.

It is extremely important that we check the Operation status, as this not only prevents any unnecessary resource usage, but also allows us to respond to urgent cancellation requests and stop components’ motion.

Connect as a client to app

To connect to app as a client and make calls to the data API, you should create an instance of a ViamClient and retrieve its DataClient member.

from viam.rpc.dial import DialOptions, Credentials
from viam.app.viam_client import ViamClient


async def connect() -> ViamClient:
    dial_options = DialOptions.with_api_key(api_key="<API_KEY>", api_key_id="<API_KEY_ID>")
    return await ViamClient.create_from_dial_options(dial_options)

Once you have a connected ViamClient, you can then obtain a DataClient as a property.

data_client = viam_client.data_client

This DataClient can be used to make method calls that retrieve data from app.

from datetime import datetime
from viam.utils import create_filter

left_motor_filter = create_filter(
    component_name="left_motor", start_time=datetime(2023, 6, 5, 11), end_time=datetime(2023, 6, 5, 13, 30), tags=["speed_test_run"]
)

data = await data_client.tabular_data_by_filter(filter=left_motor_filter)
for tab in data:
    print(tab)
[DataClient.TabularData(data={'IsPowered': False, 'PowerPct': 0.0}, metadata=method_name: "IsPowered"
, time_requested=datetime.datetime(2022, 1, 1, 1, 1, 1), time_received=datetime.datetime(2022, 12, 31, 23, 59, 59)), DataClient.TabularData(data={'IsPowered': False, 'PowerPct': 0.0}, metadata=location_id: "loc-id"
, time_requested=datetime.datetime(2023, 1, 2, 0, 0), time_received=datetime.datetime(2023, 3, 4, 0, 0)), DataClient.TabularData(data={'Position': 0.0}, metadata=, time_requested=datetime.datetime(2023, 5, 6, 0, 0), time_received=datetime.datetime(2023, 7, 8, 0, 0))]
0

You can also use your DataClient to upload data to app.

time_requested_1 = datetime(2023, 6, 5, 11)
time_received_1 = datetime(2023, 6, 5, 11, 0, 3)

await data_client.tabular_data_capture_upload(
    part_id="<ID>",  # Unique ID of the relevant robot part.
    component_type="rdk:component:motor",
    component_name="left_motor",
    method_name="IsPowered",
    method_parameters=None,
    tags=["tag_1", "tag_2"],
    data_request_times=[(time_requested_1, time_received_1)],
    tabular_data=[{"PowerPCT": 0, "IsPowered": False}],
)
''

ViamClient objects also house an AppClient, which can be used to retrieve information about your organization in App or make certain changes to it. The AppClient can be obtained in a manner similar to the DataClient.

app_client = viam_client.app_client

With this AppClient, you can access information about entities within your organization. For example, you can list information about all robots under your organization.

my_organizations = await app_client.list_organizations()
MY_ORG_ID = my_organizations[0].id
my_locations = await app_client.list_locations(org_id=MY_ORG_ID)
robots = []

for location in my_locations:
    more_robots = await app_client.list_robots(location_id=location.id)
    robots += more_robots

for robot in robots:
    print(robot.name)
robot-0
robot-1
robot-2
robot-3

You can also create new robots/robot parts or make updates to robots/robot parts that already already exist, or even delete them!

# Create a new robot named "new_rover".
NEW_ROBOT_ID = await app_client.new_robot(name="new_rover", location_id=my_locations[0].id)

# Change the new robot's name to "rover" and assign it a new parent location.
updated_robot = await app_client.update_robot(robot_id=NEW_ROBOT_ID, name="rover", location_id="<LOCATION_ID")

# Delete the new robot.
await app_client.delete_robot(robot_id=NEW_ROBOT_ID)

Certain types returned by AppClient methods (namely, RobotPart, LogEntry, Fragment, and RobotPartHistoryEntry objects) are wrapped in mirror classes that convert proto data into a friendlier, more readable format. Each of these mirrors still allows access to the initial proto type returned by the response as a property.

logs = await app_client.get_robot_part_logs(robot_part_id="<ID>", num_log_entries=1)
assert logs[0].caller is not None
for item in logs[0].caller.items():
    print(f"{item[0]}: {item[1]}")

print(f"\n*****PROTO*****\n")
print(logs[0].proto.caller)
Function: go.viam.com/rdk/robot/impl.(*resourceManager).updateResources
Line: 922.0
Defined: True
File: /__w/rdk/rdk/robot/impl/resource_manager.go

*****PROTO*****

fields {
  key: "Line"
  value {
    number_value: 922
  }
}
fields {
  key: "Function"
  value {
    string_value: "go.viam.com/rdk/robot/impl.(*resourceManager).updateResources"
  }
}
fields {
  key: "File"
  value {
    string_value: "/__w/rdk/rdk/robot/impl/resource_manager.go"
  }
}
fields {
  key: "Defined"
  value {
    bool_value: true
  }
}

At the end, you may close the connection.

viam_client.close()