viam.services.navigation.client
Classes
Connect to the NavigationService, which allows the robot to navigate to specified locations. |
Module Contents
- class viam.services.navigation.client.NavigationClient(name: str, channel: grpclib.client.Channel)[source]
Bases:
viam.services.navigation.navigation.Navigation
,viam.resource.rpc_client_base.ReconfigurableResourceRPCClientBase
Connect to the NavigationService, which allows the robot to navigate to specified locations.
- channel
- async get_paths(*, timeout: float | None = None, **kwargs) → List[viam.proto.service.navigation.Path][source]
Get each path, the series of geo points the robot plans to travel through to get to a destination waypoint, in the machine’s motion planning.
my_nav = NavigationClient.from_robot(robot=machine, name="my_nav_service") # Get a list containing each path stored by the navigation service paths = await my_nav.get_paths()
- Parameters:
timeout (Optional[float]) – An option to set how long to wait (in seconds) before calling a time-out and closing the underlying RPC call.
- Returns:
An array comprised of Paths, where each path is either a user-provided destination or a Waypoint, along with the corresponding set of geopoints. This outlines the route the machine is expected to take to reach the specified destination or Waypoint.
- Return type:
List[navigation.Path]
For more information, see Navigation service.
- async get_location(*, timeout: float | None = None, **kwargs) → viam.services.navigation.GeoPoint[source]
Get the current location of the robot in the navigation service.
my_nav = NavigationClient.from_robot(robot=machine, name="my_nav_service") # Get the current location of the robot in the navigation service location = await my_nav.get_location()
- Parameters:
timeout (Optional[float]) – An option to set how long to wait (in seconds) before calling a time-out and closing the underlying RPC call.
- Returns:
The current location of the robot in the navigation service, represented in a GeoPoint with latitude and longitude values.
- Return type:
For more information, see Navigation service.
- async get_obstacles(*, timeout: float | None = None, **kwargs) → List[viam.services.navigation.GeoGeometry][source]
Get an array or list of the obstacles currently in the service’s data storage. These are objects designated for the robot to avoid when navigating. These include all transient obstacles which are discovered by the vision services configured for the navigation service, in addition to the obstacles that are configured as a part of the service.
my_nav = NavigationClient.from_robot(robot=machine, name="my_nav_service") # Get a list containing each obstacle stored by the navigation service obstacles = await my_nav.get_obstacles()
- Parameters:
timeout (Optional[float]) – An option to set how long to wait (in seconds) before calling a time-out and closing the underlying RPC call.
- Returns:
A list comprised of each GeoGeometry in the service’s data storage. These are objects designated for the robot to avoid when navigating.
- Return type:
List[navigation.GeoGeometry]
For more information, see Navigation service.
- async get_waypoints(*, timeout: float | None = None, **kwargs) → List[viam.services.navigation.Waypoint][source]
Get an array of waypoints currently in the service’s data storage. These are locations designated within a path for the robot to navigate to.
my_nav = NavigationClient.from_robot(robot=machine, name="my_nav_service") # Get a list containing each waypoint stored by the navigation service waypoints = await my_nav.get_waypoints()
- Parameters:
timeout (Optional[float]) – An option to set how long to wait (in seconds) before calling a time-out and closing the underlying RPC call.
- Returns:
An array comprised of each Waypoint in the service’s data storage. These are locations designated within a path for the robot to navigate to.
- Return type:
List[navigation.Waypoint]
For more information, see Navigation service.
- async add_waypoint(point: viam.services.navigation.GeoPoint, *, timeout: float | None = None, **kwargs)[source]
Add a waypoint to the service’s data storage.
my_nav = NavigationClient.from_robot(robot=machine, name="my_nav_service") # Create a new waypoint with latitude and longitude values of 0 degrees location = GeoPoint(latitude=0, longitude=0) # Add your waypoint to the service's data storage await my_nav.add_waypoint(point=location)
- Parameters:
point (navigation.GeoPoint) – The current location of the robot in the navigation service, represented in a GeoPoint with latitude and longitude values.
timeout (Optional[float]) – An option to set how long to wait (in seconds) before calling a time-out and closing the underlying RPC call.
For more information, see Navigation service.
- async remove_waypoint(id: str, *, timeout: float | None = None, **kwargs)[source]
Remove a waypoint from the service’s data storage. If the robot is currently navigating to this waypoint, the motion will be canceled, and the robot will proceed to the next waypoint.
my_nav = NavigationClient.from_robot(robot=machine, name="my_nav_service") # Remove the waypoint matching that ObjectID from the service's data storage await my_nav.remove_waypoint(waypoint_id)
- Parameters:
id (str) – The MongoDB ObjectID of the Waypoint to remove from the service’s data storage.
timeout (Optional[float]) – An option to set how long to wait (in seconds) before calling a time-out and closing the underlying RPC call.
For more information, see Navigation service.
- async get_mode(*, timeout: float | None = None, **kwargs) → viam.services.navigation.Mode.ValueType[source]
Get the Mode the service is operating in.
There are two options for modes: MODE_MANUAL or MODE_WAYPOINT.
MODE_WAYPOINT: Start to look for added waypoints and begin autonomous navigation. MODE_MANUAL: Stop autonomous navigation between waypoints and allow the base to be controlled manually.
my_nav = NavigationClient.from_robot(robot=machine, name="my_nav_service") # Get the Mode the service is operating in await my_nav.get_mode()
- Parameters:
timeout (Optional[float]) – An option to set how long to wait (in seconds) before calling a time-out and closing the underlying RPC call.
- Returns:
The Mode the service is operating in.
- Return type:
navigation.Mode.ValueType
For more information, see Navigation service.
- async set_mode(mode: viam.services.navigation.Mode.ValueType, *, timeout: float | None = None, **kwargs)[source]
Set the Mode the service is operating in.
There are two options for modes: MODE_MANUAL or MODE_WAYPOINT.
MODE_WAYPOINT: Start to look for added waypoints and begin autonomous navigation. MODE_MANUAL: Stop autonomous navigation between waypoints and allow the base to be controlled manually.
my_nav = NavigationClient.from_robot(robot=machine, name="my_nav_service") # Set the Mode the service is operating in to MODE_WAYPOINT and begin navigation await my_nav.set_mode(Mode.ValueType.MODE_WAYPOINT)
- Parameters:
timeout (Optional[float]) – An option to set how long to wait (in seconds) before calling a time-out and closing the underlying RPC call.
mode (navigation.Mode.ValueType) – The Mode for the service to operate in.
For more information, see Navigation service.
- async get_properties(*, timeout: float | None = None, **kwargs) → viam.services.navigation.MapType.ValueType[source]
Get information about the navigation service.
my_nav = NavigationClient.from_robot(robot=machine, name="my_nav_service") # Get the properties of the current navigation service. nav_properties = await my_nav.get_properties()
- Parameters:
timeout (Optional[float]) – An option to set how long to wait (in seconds) before calling a time-out and closing the underlying RPC call.
- Returns:
Information about the type of map the service is using.
- Return type:
MapType.ValueType
For more information, see Navigation service.
- async do_command(command: Mapping[str, viam.utils.ValueTypes], *, timeout: float | None = None, **kwargs) → Mapping[str, viam.utils.ValueTypes][source]
Send/receive arbitrary commands.
service = SERVICE.from_robot(robot=machine, "builtin") # replace SERVICE with the appropriate class my_command = { "cmnd": "dosomething", "someparameter": 52 } # Can be used with any resource, using the motion service as an example await service.do_command(command=my_command)
- Parameters:
command (Dict[str, ValueTypes]) – The command to execute
- Returns:
Result of the executed command
- Return type:
Dict[str, ValueTypes]
- classmethod from_robot(robot: viam.robot.client.RobotClient, name: str) → typing_extensions.Self
Get the service named
name
from the provided robot.async def connect() -> RobotClient: # Replace "<API-KEY>" (including brackets) with your API key and "<API-KEY-ID>" with your API key ID options = RobotClient.Options.with_api_key("<API-KEY>", "<API-KEY-ID>") # Replace "<MACHINE-URL>" (included brackets) with your machine's connection URL or FQDN return await RobotClient.at_address("<MACHINE-URL>", options) async def main(): robot = await connect() # Can be used with any resource, using the motion service as an example motion = MotionClient.from_robot(robot=machine, name="builtin") robot.close()
- Parameters:
robot (RobotClient) – The robot
name (str) – The name of the service
- Returns:
The service, if it exists on the robot
- Return type:
Self
- classmethod get_resource_name(name: str) → viam.proto.common.ResourceName
Get the ResourceName for this Resource with the given name
# Can be used with any resource, using an arm as an example my_arm_name = Arm.get_resource_name("my_arm")
- Parameters:
name (str) – The name of the Resource
- Returns:
The ResourceName of this Resource
- Return type:
- get_operation(kwargs: Mapping[str, Any]) → viam.operations.Operation
Get the
Operation
associated with the currently running function.When writing custom resources, you should get the
Operation
by calling this function and check to see if it’s cancelled. If theOperation
is cancelled, then you can perform any necessary (terminating long running tasks, cleaning up connections, etc. ).- Parameters:
kwargs (Mapping[str, Any]) – The kwargs object containing the operation
- Returns:
The operation associated with this function
- Return type:
- async close()
Safely shut down the resource and prevent further use.
Close must be idempotent. Later configuration may allow a resource to be “open” again. If a resource does not want or need a close function, it is assumed that the resource does not need to return errors when future non-Close methods are called.
await component.close()