"""Driver for Waveshare SBC/microcontroller extension boards."""
from statistics import mean
from herosdevices.hardware.texas_instruments.ads1256 import ADS1256, ADS1256_CHANNEL, ADS1256_DRATE, ADS1256_GAIN
from herosdevices.helper import mark_driver
from herosdevices.interfaces import atomiq
[docs]
@mark_driver(
name="ADS1256 Board",
info="High-Precision AD/DA Board",
product_page="https://www.waveshare.com/wiki/High-Precision_AD/DA_Board",
state="beta",
requires={"gpiod": "gpiod", "spidev": "spidev"},
)
class WaveshareADS1256(atomiq.ADC):
"""This class represents one channel of the Waveshare ADS1256 ADC board.
.. note::
Concurrent access to the SPI device is not possible. This means only one object with a given spi_bus
and spi_device can exist.
If you would like to access individual channels of this ADC consider creating objects of
:class:WaveshareADS1256Channel in addition to an object of this class.
Args:
spi_bus: number of the SPI bus
spi_device: number of the SPI device in bus.
gain: gain to set in the PGIA
drate: conversion rate of the ADS1256. See .ads1256.ADS1256_DRATE for valid values
vref: Reference voltage given to the ADC. Usually 3.3V or 5V
samples: number of samples to average when sending out measured values as heros datasource
channel_name_map: dict to map the internal names of the pin (AIN0...AIN7) to user-definable names. The names
are used if the operated as HEROS datasource. Example: {"AIN0": "my_voltage", "AIN2": "custom_sensor_3"}
"""
def __init__(
self,
spi_bus: int = 0,
spi_device: int = 0,
rst_pin: int = 18,
drdy_pin: int = 17,
gain: int = 1,
drate: str = "2000SPS",
vref: float = 5.0,
channel_name_map: dict[str, str] | None = None
) -> None:
self.gain = ADS1256_GAIN[f"GAIN_{gain}"]
self.drate = ADS1256_DRATE[f"DRATE_{drate}"]
self.vref = vref
self.channel_name_map = channel_name_map if channel_name_map is not None else {}
self.channel_name_map_reverse = {value: key for key, value in self.channel_name_map.items()}
self._adc = ADS1256(spi_bus,
spi_device,
drdy_pin=drdy_pin,
rst_pin=rst_pin,
default_gain=self.gain,
default_drate=self.drate
)
def _calculate_voltage(self, data: float) -> float:
return self.vref * float(data) / (1 << 23) / int(self.gain.name.split("_")[-1])
def _channel_from_name(self, channel_name: str) -> ADS1256_CHANNEL:
if channel_name in self.channel_name_map_reverse:
return ADS1256_CHANNEL[self.channel_name_map_reverse[channel_name]]
return ADS1256_CHANNEL[channel_name]
[docs]
def get_samples(self, samples: int = 1, channel: str = "", neg_pin: str = "AINCOM") -> list[float]:
"""
Measure the given number of samples from the given channel.
Args:
samples (int): Number of samples.
channel (str): Name of the pin where the positive end of the voltage is applied
neg_pin (str): Name of the pin where the negative end of the voltage is applied
Returns:
float: Measured voltage in volts.
"""
return list(map(self._calculate_voltage, self._adc.get_channel_value(self._channel_from_name(channel),
self._channel_from_name(neg_pin),
samples
)))
[docs]
def measure(self, samples: int = 1, channel: str = "", neg_pin: str = "AINCOM") -> float:
"""
Measure the voltage by averaging multiple ADC samples.
Args:
samples (int): Number of samples to average.
channel (str): Name of the pin where the positive end of the voltage is applied
neg_pin (str): Name of the pin where the negative end of the voltage is applied
Returns:
float: Measured voltage in volts.
"""
return mean(self.get_samples(samples, channel, neg_pin))
def _observable_data(self) -> dict[str, tuple[float, str]]:
data = {}
for channel in ADS1256_CHANNEL:
if channel == ADS1256_CHANNEL["AINCOM"]:
continue
name = self.channel_name_map.get(channel.name, channel.name)
data[name] = (self.measure(20, channel=name), "V")
return data
[docs]
@mark_driver(
name="Single Channel of a ADS1256 Board",
info="High-Precision AD/DA Board",
product_page="https://www.waveshare.com/wiki/High-Precision_AD/DA_Board",
state="beta",
requires={"gpiod": "gpiod", "spidev": "spidev"},
)
class WaveshareADS1256Channel(atomiq.ADCChannel):
"""A Single Channel of the Waveshare ADS1256 ADC board.
Args:
waveshare_ads1256: Object or HERO of class WaveshareADS1256
posPin: Name of the pin receiving the positive voltage. I.e. AIN0 ... AIN7 or AINCOM
negPin: Name of the pin receiving the negative voltage. I.e. AIN0 ... AIN7 or AINCOM. For non-differential
measurements this is usually AINCOM.
"""
def __init__(self,
waveshare_ads1256: WaveshareADS1256,
pos_pin: str,
neg_pin: str = "AINCOM",
samples: int = 1,
) -> None:
self.adc = waveshare_ads1256
self.pos_pin = pos_pin
self.neg_pin = neg_pin
self.samples = samples
[docs]
def measure(self, samples: int | None = None, cached: bool = False, channel: str = "") -> float: # noqa: ARG002
"""Measure voltage at the current channel.
Args:
samples: Number of samples to average.
cached: not used
channel: not used
"""
samples = self.samples if samples is None else samples
return self.adc.measure(samples=samples, channel=self.pos_pin, neg_pin=self.neg_pin)
def _observable_data(self) -> tuple[float, str]:
return (self.measure(), "V")