"""Includes core functionalities relevant to all/many hardware driver implementations."""
import time
from collections.abc import Callable
from typing import Any, Generic, TypeVar, overload
from heros.helper import log
FieldType = TypeVar("FieldType")
def any__new__(cls, *_args, **_kwargs) -> Any: # noqa: ANN001
"""Monkey patch for https://github.com/python/cpython/pull/117111.
Debian and Ubuntu (and possibly other distros with a slow release cycle) ship a version of python with the
aforementioned bug.
"""
if cls is Any:
raise TypeError("Any cannot be instantiated")
return object.__new__(cls)
Any.__new__ = any__new__
del any__new__
[docs]
class DeviceCommandQuantity(Any, Generic[FieldType]):
"""
Descriptor for attaching getting/setting configuration of hardware directly to class attributes exposed to HEROs.
This class provides functionality to define a class attribute of the host object based on certain set and get
commands of a device on a given interface. Defining an attribute this way makes it directly accessible to HEROS.
args:
command_set: Command to send to the remote device to set the quantity. Must include a single input
placeholder in f-string format for the value to be set.
command_get: Command to send to the remote device to get the quantity.
return_check: Return value to check for success
unit: Unit of the quantity.
dtype: Data type of the quantity values.
format_fun: Function to format the raw device return value to obtain the quantity in the correct unit.
For example if the device returns a complicated string, this function could use a regex to extract
the target value.
value_check_fun: Function to check if a set value is valid. Can be used in combination with for example
`:py:func:`herosdevices.core.utils.limits` to check if the value is within a certain range.
poll_interval_limit: When getting the value of this quantity, the value is only read from the device if the
last read operation was longer ago than the value of :code:`poll_interval_limit`. If it is shorter, the
cached value is returned.
read_line: If true, the value is read from the device as a single line until the line termination set in the
device connection occurs. If false, all waiting data is read, this is typically slower as one needs a
longer delay, however it must be used if the return value is multiline.
Info:
The host instance must provide :code:`read()->str` and
:code:`write(message: str, read_echo: bool) -> None | str` methods. For an example implementation see
:py:class:`herosdevices.core.templates.serial.SerialDeviceTemplate`.
Warning:
This mechanism stores values in the device object with the name :code:`instance._{attr_name}` and
:code:`_{attr_name}_last_poll`, where `attr_name` is the class attribute name (`frequency` in the example
below). This means you can not implement attributes with these names in you device driver class.
Example:
.. code-block:: python
class SomeRFSource(RFSource):
frequency = DeviceCommandQuantity(
command_set="f{:.3f}",
command_get="f?",
dtype=float,
unit="base",
value_check_fun=limits(12.5e6, 5.4e9),
transform_fun=transform_unit("base", "MHz"),
format_fun=lambda x: float(x)*1e-3,
) # Frequency in Hz
"""
def __init__(
self,
command_set: str | None = None,
command_get: str | None = None,
return_check: None | str = None,
unit: str = "",
dtype: type[FieldType] | None = None,
format_fun: Callable[[str], Any] = lambda x: x.rstrip(),
value_check_fun: Callable[[Any], bool | str] = lambda _: True,
poll_interval_limit: float = 1.0,
transform_fun: Callable[[Any, bool], Any] = lambda x, _=False: x,
read_line: bool = True,
) -> None:
self.command_set = command_set
self.command_get = command_get
self.return_check = return_check
self.unit = unit
self.dtype = dtype
self.format_fun = format_fun
self.value_check_fun = value_check_fun
self.poll_interval_limit = poll_interval_limit
self.transform_fun = transform_fun
self.read_line = read_line
def __set_name__(self, owner, name: str) -> None: # noqa: ANN001
"""Make the DeviceCommandQuantity instance aware of its parent class."""
self.name = name
self.owner = owner
def __set__(self, instance, value: FieldType) -> None: # noqa: ANN001
"""Set the value on the physical device. Involves sending and receiving data to/from the device."""
if (check_return := self.value_check_fun(value)) is not True:
msg = f"Value {value} {self.unit} is not valid for {self.name}: {check_return}"
raise ValueError(msg)
if self.command_set:
instance.connection.write(self.command_set.format(self.transform_fun(value, False)))
else:
log.error(
"Attribute %s on device %s is not settable",
self.name,
self.owner.__name__,
)
if self.return_check:
re = instance.connection.read()
if re != self.return_check:
log.error(
"Device %s returned error %s when setting %s",
self.owner.__name__,
re,
self.name,
)
else:
setattr(instance, f"_{self.name}_last_poll", time.time())
setattr(instance, f"_{self.name}", value)
@overload
def __get__(self, instance: None, owner: type) -> "DeviceCommandQuantity[FieldType]": ...
@overload
def __get__(self, instance: object, owner: type) -> FieldType: ...
def __get__(self, instance, owner):
"""Read the value from the physical device. Involves sending and receiving data to/from the device.
Values are cached and fast subsequent reads return the same value without contacting the device.
"""
if instance is None:
return self
# read from device
if self.command_get:
if hasattr(instance, f"_{self.name}_last_poll"):
if time.time() - getattr(instance, f"_{self.name}_last_poll") < self.poll_interval_limit:
return getattr(instance, f"_{self.name}")
restring = instance.connection.write(self.command_get, read_echo=True, read_line=self.read_line)
if type(restring) is not bool:
restring = self.format_fun(restring)
if self.dtype is not None:
try:
restring = self.dtype(restring)
except ValueError as e:
log.warning("%s while reading from %s", e, self.name)
restring = None
setattr(instance, f"_{self.name}", self.transform_fun(restring, True))
setattr(instance, f"_{self.name}_last_poll", time.time())
return getattr(instance, f"_{self.name}")
return None