Source code for herosdevices.core.bus.onewire
"""This module provides a class for managing OneWire connections."""
from collections.abc import Callable
from pathlib import Path
from herosdevices.helper import log
[docs]
class OneWire:
"""A read-only Onewire driver that relies on the Linux kernel w1 driver.
Linux exposes onewire devices in sysfs.
This driver can read the sysfs files as they are specified in the :param sensors: list.
"""
# the _obervables describe which quantities can be extracted from the onewire device. The list entries have the
# form (name_of_observable, conversion_function, unit). The "name_of_observable" must be an endpoint in the sysfs
# directory of the onewire device.
_observables: list[tuple[str, Callable, str]] = []
def __init__(self, device_id: str, sysfs_path: str = "/sys/bus/w1/") -> None:
"""
Initialize a onewire connection to a device.
Args:
device_id: id of the onewire device as named by the w1 Linux kernel driver.
sysfs_path: sys path of the Linux w1 kernel driver
"""
self.sysfs_path = Path(sysfs_path) / "devices" / device_id
if not Path(self.sysfs_path).exists():
log.error(f"Can not access Linux w1 sysfs path {sysfs_path}")
def _observable_data(self) -> dict[str, tuple[float, str]]:
try:
return {
observable: (conversion(self._read_content(self.sysfs_path / observable)), unit)
for observable, conversion, unit in self._observables
}
except ValueError:
return {}
def _read_content(self, path: Path) -> str | None:
"""Read Contents of :param path: (str) filename."""
try:
with path.open("r") as file:
return "".join(file.readlines())
except (OSError, TypeError, ValueError):
log.warning(f"Could not read file {path}")
return None