import abc
from dataclasses import dataclass
from datetime import timedelta
from multiprocessing import Queue
from typing import Any, Callable, Dict, Final, List, Optional
from viam.proto.common import BoardStatus
from viam.proto.component.board import PowerMode
from viam.resource.types import RESOURCE_NAMESPACE_RDK, RESOURCE_TYPE_COMPONENT, Subtype
from ..component_base import ComponentBase
PostProcessor = Callable[[int], int]
[docs]class Board(ComponentBase):
"""
Board represents a physical general purpose compute board that contains various
components such as analog readers, and digital interrupts.
This acts as an abstract base class for any drivers representing specific
board implementations. This cannot be used on its own. If the ``__init__()`` function is
overridden, it must call the ``super().__init__()`` function.
"""
SUBTYPE: Final = Subtype(RESOURCE_NAMESPACE_RDK, RESOURCE_TYPE_COMPONENT, "board")
[docs] @dataclass
class Attributes:
remote: bool
"""
Indicates whether this board is accessed over a remote connection, e.g. gRPC.
"""
[docs] class AnalogReader(ComponentBase):
"""
AnalogReader represents an analog pin reader that resides on a Board.
"""
[docs] @abc.abstractmethod
async def read(self, *, extra: Optional[Dict[str, Any]] = None, timeout: Optional[float] = None, **kwargs) -> int:
"""
Read the current value.
Returns:
int: The current value.
"""
...
[docs] class DigitalInterrupt(ComponentBase):
"""
DigitalInterrupt represents a configured interrupt on the Board that
when interrupted, calls the added callbacks. Post processors can
be added to modify what Value it ultimately returns.
"""
[docs] @abc.abstractmethod
async def value(self, *, extra: Optional[Dict[str, Any]] = None, timeout: Optional[float] = None, **kwargs) -> int:
"""
Get the current value of the interrupt,
which is based on the type of interrupt.
Returns:
int: The current value.
"""
...
[docs] @abc.abstractmethod
async def tick(self, high: bool, nanos: int):
"""
This method is to be called either manually if the interrupt
is a proxy to some real hardware interrupt or for tests.
Args:
high (bool): If the signal of the interrupt is high.
nanos (int): Nanoseconds from an arbitrary point in time,
but always increasing and always needs to be accurate.
Using ``time.time_ns()`` would be acceptable.
"""
...
[docs] @abc.abstractmethod
async def add_callback(self, queue: Queue):
"""
Add a callback to be sent the low/high value on ``tick()``.
Args:
queue (Queue): The receiving queue.
"""
...
[docs] @abc.abstractmethod
async def add_post_processor(self, processor: PostProcessor):
"""
Add a post processor that should be used to modify what
is returned by ``self.value()``
Args:
processor (PostProcessor): The post processor to add.
"""
...
[docs] class GPIOPin(ComponentBase):
"""
Abstract representation of an individual GPIO pin on a board
"""
[docs] @abc.abstractmethod
async def set(self, high: bool, *, extra: Optional[Dict[str, Any]] = None, timeout: Optional[float] = None, **kwargs):
"""
Set the pin to either low or high.
Args:
high (bool): When true, sets the pin to high. When false, sets the pin to low.
"""
...
[docs] @abc.abstractmethod
async def get(self, *, extra: Optional[Dict[str, Any]] = None, timeout: Optional[float] = None, **kwargs) -> bool:
"""
Get the high/low state of the pin.
Returns:
bool: Indicates if the state of the pin is high.
"""
...
[docs] @abc.abstractmethod
async def get_pwm(self, *, extra: Optional[Dict[str, Any]] = None, timeout: Optional[float] = None, **kwargs) -> float:
"""
Get the pin's given duty cycle.
Returns:
float: The duty cycle.
"""
...
[docs] @abc.abstractmethod
async def set_pwm(self, duty_cycle: float, *, extra: Optional[Dict[str, Any]] = None, timeout: Optional[float] = None, **kwargs):
"""
Set the pin to the given ``duty_cycle``.
Args:
duty_cycle (float): The duty cycle.
"""
...
[docs] @abc.abstractmethod
async def get_pwm_frequency(self, *, extra: Optional[Dict[str, Any]] = None, timeout: Optional[float] = None, **kwargs) -> int:
"""
Get the PWM frequency of the pin.
Returns:
int: The PWM frequency.
"""
...
[docs] @abc.abstractmethod
async def set_pwm_frequency(
self,
frequency: int,
*,
extra: Optional[Dict[str, Any]] = None,
timeout: Optional[float] = None,
**kwargs,
):
"""
Set the pin to the given PWM ``frequency`` (in Hz).
When ``frequency`` is 0, it will use the board's default PWM frequency.
Args:
frequency (int): The frequency, in Hz.
"""
...
[docs] @abc.abstractmethod
async def analog_reader_by_name(self, name: str) -> AnalogReader:
"""
Get an AnalogReader by ``name``.
Args:
name (str): Name of the analog reader to be retrieved.
Returns:
AnalogReader: The analog reader.
"""
...
[docs] @abc.abstractmethod
async def digital_interrupt_by_name(self, name: str) -> DigitalInterrupt:
"""
Get a DigitalInterrupt by ``name``.
Args:
name (str): Name of the digital interrupt.
Returns:
DigitalInterrupt: the digital interrupt.
"""
...
[docs] @abc.abstractmethod
async def gpio_pin_by_name(self, name: str) -> GPIOPin:
"""
Get a GPIO Pin by ``name``.
Args:
name (str): Name of the GPIO pin.
Returns:
GPIOPin: the pin.
"""
...
[docs] @abc.abstractmethod
async def analog_reader_names(self) -> List[str]:
"""
Get the names of all known analog readers.
Returns:
List[str]: The names of the analog readers..
"""
...
[docs] @abc.abstractmethod
async def digital_interrupt_names(self) -> List[str]:
"""
Get the names of all known digital interrupts.
Returns:
List[str]: The names of the digital interrupts.
"""
...
[docs] @abc.abstractmethod
async def status(self, *, extra: Optional[Dict[str, Any]] = None, timeout: Optional[float] = None, **kwargs) -> BoardStatus:
"""
Return the current status of the board.
Returns:
viam.proto.common.BoardStatus: the status.
"""
...
[docs] @abc.abstractmethod
async def set_power_mode(
self, mode: PowerMode.ValueType, duration: Optional[timedelta] = None, *, timeout: Optional[float] = None, **kwargs
):
"""
Set the board to the indicated power mode.
Args:
mode: the desired power mode
"""
...
[docs] @abc.abstractmethod
async def write_analog(self, pin: str, value: int, *, timeout: Optional[float] = None, **kwargs):
"""
Write an analog value to a pin on the board.
Args:
pin (str): name of the pin.
value (int): value to write.
"""
...