Source code for herosdevices.hardware.raspberrypi.waveshare

"""Driver for Waveshare SBC/microcontroller extension boards."""

from statistics import mean

from herosdevices.hardware.texas_instruments.ads1256 import ADS1256, ADS1256_CHANNEL, ADS1256_DRATE, ADS1256_GAIN
from herosdevices.helper import mark_driver
from herosdevices.interfaces import atomiq


@mark_driver(
    name="ADS1256 Board",
    info="High-Precision AD/DA Board",
    product_page="https://www.waveshare.com/wiki/High-Precision_AD/DA_Board",
    state="beta",
    requires={"gpiod": "gpiod", "spidev": "spidev"},
)
class WaveshareADS1256(atomiq.ADC):
    """Driver for the Waveshare ADS1256 ADC board.

    An object of this class owns the SPI connection to the board and can measure
    any of its input pins. When used as a HEROS datasource it reports the voltage
    of every input pin (except ``AINCOM``) measured against ``AINCOM``.

    .. note::

        Concurrent access to the SPI device is not possible. This means only one object
        with a given spi_bus and spi_device can exist. If you would like to access individual
        channels of this ADC consider creating objects of :class:`WaveshareADS1256Channel`
        in addition to an object of this class.

    Args:
        spi_bus: number of the SPI bus
        spi_device: number of the SPI device in bus.
        rst_pin: pin number forwarded to the underlying ADS1256 driver as its reset pin.
        drdy_pin: pin number forwarded to the underlying ADS1256 driver as its data-ready pin.
        gain: gain to set in the PGIA. Must match a ``GAIN_<gain>`` member of ``ADS1256_GAIN``.
        drate: conversion rate of the ADS1256. Must match a ``DRATE_<drate>`` member of
            ``ADS1256_DRATE``.
        vref: Reference voltage given to the ADC. Usually 3.3V or 5V
        channel_name_map: dict to map the internal names of the pin (AIN0...AIN7) to
            user-definable names. The names are used if the operated as HEROS datasource.
            Example: {"AIN0": "my_voltage", "AIN2": "custom_sensor_3"}
        samples: number of samples to average when sending out measured values as
            heros datasource.
    """

    def __init__(
        self,
        spi_bus: int = 0,
        spi_device: int = 0,
        rst_pin: int = 18,
        drdy_pin: int = 17,
        gain: int = 1,
        drate: str = "2000SPS",
        vref: float = 5.0,
        channel_name_map: dict[str, str] | None = None,
        samples: int = 20,
    ) -> None:
        self.gain = ADS1256_GAIN[f"GAIN_{gain}"]
        self.drate = ADS1256_DRATE[f"DRATE_{drate}"]
        self.vref = vref
        self.samples = samples
        self.channel_name_map = channel_name_map if channel_name_map is not None else {}
        # Reverse lookup so that user-defined names can be translated back to pin names.
        self.channel_name_map_reverse = {value: key for key, value in self.channel_name_map.items()}
        self._adc = ADS1256(
            spi_bus,
            spi_device,
            drdy_pin=drdy_pin,
            rst_pin=rst_pin,
            default_gain=self.gain,
            default_drate=self.drate,
        )

    def _calculate_voltage(self, data: float) -> float:
        """Convert a raw ADC reading into volts.

        The reading is scaled by ``vref / 2**23`` and divided by the configured gain,
        which is parsed from the name of the gain enum member (``GAIN_<n>``).
        """
        pga = int(self.gain.name.split("_")[-1])
        return self.vref * float(data) / (1 << 23) / pga

    def _channel_from_name(self, channel_name: str) -> ADS1256_CHANNEL:
        """Resolve a user-defined or internal pin name to its ``ADS1256_CHANNEL`` member.

        User-defined names from ``channel_name_map`` take precedence over internal names.
        """
        if channel_name in self.channel_name_map_reverse:
            return ADS1256_CHANNEL[self.channel_name_map_reverse[channel_name]]
        return ADS1256_CHANNEL[channel_name]

    def get_samples(self, samples: int = 1, channel: str = "", neg_pin: str = "AINCOM") -> list[float]:
        """Measure the given number of samples from the given channel.

        Args:
            samples: Number of samples.
            channel: Name of the pin where the positive end of the voltage is applied
            neg_pin: Name of the pin where the negative end of the voltage is applied

        Returns:
            The measured voltages in volts, one entry per value delivered by the ADC.
        """
        raw_values = self._adc.get_channel_value(
            self._channel_from_name(channel),
            self._channel_from_name(neg_pin),
            samples,
        )
        return [self._calculate_voltage(raw) for raw in raw_values]

    def measure(self, samples: int = 1, channel: str = "", neg_pin: str = "AINCOM") -> float:
        """Measure the voltage by averaging multiple ADC samples.

        Args:
            samples: Number of samples to average.
            channel: Name of the pin where the positive end of the voltage is applied
            neg_pin: Name of the pin where the negative end of the voltage is applied

        Returns:
            Measured voltage in volts.
        """
        return mean(self.get_samples(samples, channel, neg_pin))

    def _observable_data(self) -> dict[str, tuple[float, str]]:
        """Measure all input pins against AINCOM for use as a HEROS datasource.

        Returns:
            Mapping of the (user-defined, if available) pin name to a tuple of the
            voltage averaged over ``self.samples`` samples and the unit ``"V"``.
        """
        common = ADS1256_CHANNEL["AINCOM"]
        data = {}
        for channel in ADS1256_CHANNEL:
            # AINCOM is the reference pin of the measurement, not a signal input.
            if channel == common:
                continue
            name = self.channel_name_map.get(channel.name, channel.name)
            data[name] = (self.measure(self.samples, channel=name), "V")
        return data
@mark_driver(
    name="Single Channel of a ADS1256 Board",
    info="High-Precision AD/DA Board",
    product_page="https://www.waveshare.com/wiki/High-Precision_AD/DA_Board",
    state="beta",
    requires={"gpiod": "gpiod", "spidev": "spidev"},
)
class WaveshareADS1256Channel(atomiq.ADCChannel):
    """A single channel of the Waveshare ADS1256 ADC board.

    All measurements are delegated to the given board object, using the pin pair
    configured here.

    Args:
        waveshare_ads1256: Object or HERO of class WaveshareADS1256
        pos_pin: Name of the pin receiving the positive voltage. I.e. AIN0 ... AIN7 or AINCOM
        neg_pin: Name of the pin receiving the negative voltage. I.e. AIN0 ... AIN7 or AINCOM.
            For non-differential measurements this is usually AINCOM.
        samples: Default number of samples to average per measurement.
    """

    def __init__(
        self,
        waveshare_ads1256: WaveshareADS1256,
        pos_pin: str,
        neg_pin: str = "AINCOM",
        samples: int = 1,
    ) -> None:
        self.adc = waveshare_ads1256
        self.pos_pin, self.neg_pin = pos_pin, neg_pin
        self.samples = samples

    def measure(self, samples: int | None = None, cached: bool = False, channel: str = "") -> float:  # noqa: ARG002
        """Measure voltage at the current channel.

        Args:
            samples: Number of samples to average. Falls back to the value given at
                construction time when None.
            cached: not used
            channel: not used

        Returns:
            The value returned by the board object's ``measure`` for this pin pair.
        """
        if samples is None:
            samples = self.samples
        return self.adc.measure(samples=samples, channel=self.pos_pin, neg_pin=self.neg_pin)

    def _observable_data(self) -> tuple[float, str]:
        """Return the current measurement together with its unit ``"V"``."""
        voltage = self.measure()
        return (voltage, "V")