"""Drivers for the ADS1256 24-Bit, 30kSPS, 8-Channel ADC."""
# TODO: make gpiod and spidev optional dependencies
import time
from enum import Enum
import gpiod
import spidev # type: ignore
from herosdevices.helper import log
[docs]
class ADS1256_CHANNEL(Enum):
"""Multiplexer channel."""
AIN0 = 0
AIN1 = 1
AIN2 = 2
AIN3 = 3
AIN4 = 4
AIN5 = 5
AIN6 = 6
AIN7 = 7
AINCOM = 8
[docs]
class ADS1256_GAIN(Enum):
"""Gain of the programmable gain amplifier.
For voltage ranges for each gain settings refer to ADS1256 datasheet Table 8.
"""
GAIN_1 = 0 # GAIN 1
GAIN_2 = 1 # GAIN 2
GAIN_4 = 2 # GAIN 4
GAIN_8 = 3 # GAIN 8
GAIN_16 = 4 # GAIN 16
GAIN_32 = 5 # GAIN 32
GAIN_64 = 6 # GAIN 64
[docs]
class ADS1256_DRATE(Enum):
"""Possible A/D Datarates.
Refer to table: DRATE: A/D Data Rate (Address 03h) in the datasheet for more information.
"""
DRATE_30000SPS = 0xF0 # reset the default values
DRATE_15000SPS = 0xE0
DRATE_7500SPS = 0xD0
DRATE_3750SPS = 0xC0
DRATE_2000SPS = 0xB0
DRATE_1000SPS = 0xA1
DRATE_500SPS = 0x92
DRATE_100SPS = 0x82
DRATE_60SPS = 0x72
DRATE_50SPS = 0x63
DRATE_30SPS = 0x53
DRATE_25SPS = 0x43
DRATE_15SPS = 0x33
DRATE_10SPS = 0x20
DRATE_5SPS = 0x13
DRATE_2d5SPS = 0x03
[docs]
class ADS1256_REG(Enum):
"""Registers of the ADS1256."""
STATUS = 0 # x1H
MUX = 1 # 01H
ADCON = 2 # 20H
DRATE = 3 # F0H
IO = 4 # E0H
OFC0 = 5 # xxH
OFC1 = 6 # xxH
OFC2 = 7 # xxH
FSC0 = 8 # xxH
FSC1 = 9 # xxH
FSC2 = 10 # xxH
# command definition
[docs]
class ADS1256_CMD(Enum):
"""Command definitions.
Refer to table 24 in the datasheet for details.
"""
WAKEUP = 0x00 # Completes SYNC and Exits Standby Mode 0000 0000 (00h)
RDATA = 0x01 # Read Data 0000 0001 (01h)
RDATAC = 0x03 # Read Data Continuously 0000 0011 (03h)
SDATAC = 0x0F # Stop Read Data Continuously 0000 1111 (0Fh)
RREG = 0x10 # Read from REG rrr 0001 rrrr (1xh)
WREG = 0x50 # Write to REG rrr 0101 rrrr (5xh)
# Offset and Gain Self-Calibration 1111 0000 (F0h)
SELFCAL = 0xF0
SELFOCAL = 0xF1 # Offset Self-Calibration 1111 0001 (F1h)
SELFGCAL = 0xF2 # Gain Self-Calibration 1111 0010 (F2h)
SYSOCAL = 0xF3 # System Offset Calibration 1111 0011 (F3h)
SYSGCAL = 0xF4 # System Gain Calibration 1111 0100 (F4h)
# Synchronize the A/D Conversion 1111 1100 (FCh)
SYNC = 0xFC
STANDBY = 0xFD # Begin Standby Mode 1111 1101 (FDh)
RESET = 0xFE # Reset to Power-Up Values 1111 1110 (FEh)
[docs]
def delay_us(delay: int) -> None:
"""Delay execution by `delay` microseconds (Uses sleep function).
Args:
delay: delay in microseconds.
"""
time.sleep(delay / 1e6)
[docs]
class ADS1256:
"""Interfacing the ADS1256 SPI ADC (as e.g. used on the Waveshare high-precision AD-DA board)."""
_instances = []
_init_done = False
def __new__(cls, *args, **_kwargs) -> "ADS1256":
"""Avoid SPI conflict when multiple instances of same bus are requested.
Two instances working concurrently on the same bus might mess things up. We thus return the running instance
if a second instance with the same SPI parameters is requested
"""
running_inst = [
inst["object"] for inst in cls._instances if inst["bus"] == args[0] and inst["device"] == args[1]
]
if len(running_inst) == 0:
instance = object.__new__(cls)
cls._instances.append({"bus": args[0], "device": args[1], "object": instance})
else:
instance = running_inst[0]
log.warning(
"A second instance of ADS1256 on the same SPI bus was requested. Returned the existing instance"
)
return instance
def __init__(
self,
spi_bus: int = 0,
spi_device: int = 0,
cs_pin: int = 22,
rst_pin: int = 18,
drdy_pin: int = 17,
spi=None, # noqa: ANN001 TODO: no idea what the type is here, needs to be checked
gpio_device: str = "/dev/gpiochip0",
default_gain: ADS1256_GAIN = ADS1256_GAIN.GAIN_1,
default_drate: ADS1256_DRATE = ADS1256_DRATE.DRATE_30000SPS,
) -> None:
"""
Interfacing the ADS1256 SPI ADC (as e.g. used on the Waveshare high-precision AD-DA board).
Args:
spi_bus: Number of the SPI bus the ADS1256 is attached to
spi_device: Device number at the SPI bus the ADS1256 is attached to
cs_pin: Pin number of the chip select pin
rst_pin: pin number of the reset pin
drdy_pin: pin number of the data ready pin
spi: optional spidev device. If this is given, spi_bus and spi_device are ignored.
gpio_device: full path to kernel character device that holds the needed gpios. Typically "/dev/gpiochip0"
default_gain: default gain to set when initializing the device
default_drate: default data rate to set when initializing the device
"""
if not self._init_done:
# init SPI bus
self._spi = spi if spi is not None else spidev.SpiDev(spi_bus, spi_device)
self.scan_mode = 0
self._spi.max_speed_hz = 20000
self._spi.mode = 0b01
# init GPIOs
self.gpio_chip = gpiod.Chip(gpio_device)
self.gpios = self.gpio_chip.request_lines(
consumer="ADS1256",
config={
cs_pin: gpiod.LineSettings(direction=gpiod.line.Direction.OUTPUT),
rst_pin: gpiod.LineSettings(direction=gpiod.line.Direction.OUTPUT),
drdy_pin: gpiod.LineSettings(direction=gpiod.line.Direction.INPUT, bias=gpiod.line.Bias.PULL_UP),
},
)
self.cs_pin = cs_pin
self.rst_pin = rst_pin
self.drdy_pin = drdy_pin
self.reset()
if self._read_chip_id() == 3: # noqa: PLR2004 TODO: this seems specific. Isn't this different for each chip?
log.info("ID Read successful")
else:
log.warning("ID Read failed")
self.config_adc(default_gain, default_drate)
self._init_done = True
# Hardware reset
[docs]
def reset(self) -> None:
"""Reset the chip by pulsing the reset pin."""
self.gpios.set_value(self.rst_pin, gpiod.line.Value.ACTIVE)
time.sleep(200 / 1000)
self.gpios.set_value(self.rst_pin, gpiod.line.Value.INACTIVE)
time.sleep(200 / 1000)
self.gpios.set_value(self.rst_pin, gpiod.line.Value.ACTIVE)
def _write_cmd(self, cmd: ADS1256_CMD) -> None:
"""Run a command on the chip."""
self.gpios.set_value(self.cs_pin, gpiod.line.Value.INACTIVE) # cs 0
self._spi.writebytes([cmd.value])
self.gpios.set_value(self.cs_pin, gpiod.line.Value.ACTIVE) # cs 1
def _write_reg(self, reg: ADS1256_REG, data: int) -> None:
"""Write data to a register."""
self.gpios.set_value(self.cs_pin, gpiod.line.Value.INACTIVE) # cs 0
self._spi.writebytes([ADS1256_CMD.WREG.value | reg.value, 0x00, data])
self.gpios.set_value(self.cs_pin, gpiod.line.Value.ACTIVE) # cs 1
def _read_data(self, reg: ADS1256_REG) -> list:
"""Read data from a register."""
self.gpios.set_value(self.cs_pin, gpiod.line.Value.INACTIVE) # cs 0
self._spi.writebytes([ADS1256_CMD.RREG.value | reg.value, 0x00])
data = self._spi.readbytes(1)
self.gpios.set_value(self.cs_pin, gpiod.line.Value.ACTIVE) # cs 1
return data
def _wait_drdy(self) -> None:
"""Wait for the DRDY (Data Ready) pin to be low."""
for _ in range(0, 400000, 1): # TODO: With this, the timeout is execution speed dependent, needs fixing!
if self.gpios.get_value(self.drdy_pin) == gpiod.line.Value.INACTIVE:
break
else:
log.warning("Time out when waiting for data ready...")
# TODO: maybe include a return value. Calling functions can not know if data is ready or not.
def _read_chip_id(self) -> int:
self._wait_drdy()
status = self._read_data(ADS1256_REG.STATUS)
return status[0] >> 4
# The configuration parameters of ADC, gain and data rate
[docs]
def config_adc(self, gain: ADS1256_GAIN, drate: ADS1256_DRATE) -> None:
"""Configure the gain and datarate of the ADC."""
self._wait_drdy()
buf = [0, 0, 0, 0, 0, 0, 0, 0]
buf[0] = (0 << 3) | (1 << 2) | (0 << 1)
buf[1] = 0x08
buf[2] = (0 << 5) | (0 << 3) | (gain.value << 0)
buf[3] = drate.value
self.gpios.set_value(self.cs_pin, gpiod.line.Value.INACTIVE) # cs 0
self._spi.writebytes([ADS1256_CMD.WREG.value | 0, 0x03])
self._spi.writebytes(buf)
self.gpios.set_value(self.cs_pin, gpiod.line.Value.ACTIVE) # cs 1
time.sleep(1 / 1000)
[docs]
def self_calibration(self) -> None:
"""Run offset and gain self calibration."""
self._write_cmd(ADS1256_CMD.SELFCAL)
self._wait_drdy()
[docs]
def set_channel(self, pos_channel: ADS1256_CHANNEL, neg_channel: ADS1256_CHANNEL = ADS1256_CHANNEL.AINCOM) -> None:
"""Set channel of the input multiplexer.
The multiplexer performs a differential measurement between `pos_channel` and `neg_channel`. Details can be
found in the datasheet under "Input Multiplexer".
Args:
pos_channel: Positive channel.
neg_channel: Negative channel. Defaults to AINCOM (non-differential measurement).
"""
self._write_reg(ADS1256_REG.MUX, (pos_channel.value << 4) | neg_channel.value)
[docs]
def set_scan_mode(self, mode): # noqa: ANN201 ANN001 D102
# TODO: seems to do nothing...
self.scan_mode = mode
[docs]
def read_adc_data(self) -> int:
"""Read ADC value from the currently active channel in bits."""
self._wait_drdy()
self.gpios.set_value(self.cs_pin, gpiod.line.Value.INACTIVE) # cs 0
self._spi.writebytes([ADS1256_CMD.RDATA.value])
delay_us(10)
buf = self._spi.readbytes(3)
self.gpios.set_value(self.cs_pin, gpiod.line.Value.ACTIVE) # cs 1
return int.from_bytes(buf, "big", signed=True)
[docs]
def get_channel_value(
self, pos_channel: ADS1256_CHANNEL, neg_channel: ADS1256_CHANNEL = ADS1256_CHANNEL.AINCOM
) -> int:
"""Read ADC value from the given channel in bits either single-ended or differential."""
self.set_channel(pos_channel, neg_channel)
self._write_cmd(ADS1256_CMD.SYNC)
delay_us(10)
self._write_cmd(ADS1256_CMD.WAKEUP)
delay_us(10)
return self.read_adc_data()
[docs]
def get_all(self) -> list[int]:
"""Read ADC for all channels subsequently."""
return [self.get_channel_value(channel) for channel in ADS1256_CHANNEL]