Source code for herosdevices.hardware.megatec

"""Drivers for Megatec-compatible UPS devices."""

from herosdevices.core import DeviceCommandQuantity, SubQuantity
from herosdevices.core.templates import SerialDeviceTemplate as SerialDevice
from herosdevices.helper import log, mark_driver

__vendor_name__ = "Megatec"

DEFAULT_OBSERVABLES = {
    "load_percentage": {"name": "load_percentage", "unit": "%"},
    "temperature": {"name": "temperature", "unit": "°C"},
    "battery_low": {"name": "battery_low", "unit": ""},
    "bypass_active": {"name": "bypass_active", "unit": ""},
}

SERIAL_UPS_STATUS_FORMAT = [
    "voltage_input",
    "voltage_input_fail",
    "voltage_output",
    "load_percentage",
    "input_frequency",
    "battery_voltage",
    "temperature",
    "utility_fail",
    "battery_low",
    "bypass_active",
    "ups_failed",
    "ups_standby",
    "test_in_progress",
    "shutdown_active",
    "beeper_on",
]


[docs] def split_status_reply(status_str: str) -> dict[str, float | int]: """ Parse Megatec status reply string into components. Format: (MMM.M NNN.N PPP.P QQQ RR.R S.SS TT.T b7b6b5b4b3b2b1b0<cr>) Returns: Flat list of values in order SERIAL_UPS_STATUS_FORMAT """ # Split the status string into components parts = status_str.strip("()\r\n").split() if len(parts) != 8: # noqa: PLR2004 msg = f"Expected 8 bit of status fields, got {len(parts)}" raise ValueError(msg) # Parse numeric fields result = {} for i in range(7): if i == 3: # noqa:PLR2004 load_percentage should be int result[SERIAL_UPS_STATUS_FORMAT[i]] = int(float(parts[i])) else: result[SERIAL_UPS_STATUS_FORMAT[i]] = float(parts[i]) # Parse status bits - convert binary string to individual integers status_bits_str = parts[7] if len(status_bits_str) != 8 or not all(c in "01" for c in status_bits_str): # noqa: PLR2004 msg = f"Invalid status bits format: {status_bits_str}" raise ValueError(msg) # Add each bit as a separate entry (from b7 to b0) result |= {SERIAL_UPS_STATUS_FORMAT[i + 7]: int(bit) for i, bit in enumerate(status_bits_str)} return result
[docs] @mark_driver( state="stable", name="UPS (Serial)", info="UPS with Megatec Serial Adapter", product_page="https://www.megatec.com.tw/allproducts/", ) class SerialUPS(SerialDevice): """Representation of Megatec based uninterruptible power supplies. Several UPS vendors use the Megatec based controller card. Among them are: - AdPoS - Green Cell Important: The devices are quite picky concerning the serial to USB chip. It is known to work with a ``Prolific PL2303`` chip. Others might work but it is not guaranteed. Important: Some devices seem to have an incorrectly grounded serial interface, generating massive noise on the output line. Check for noise on the output power sockets before installing permanently! A documentation of the full protocol can be found `here <https://networkupstools.org/protocols/megatec.html>`_. """ _status_dict: list = DeviceCommandQuantity(command_get="Q1", format_fun=split_status_reply) voltage_input: float = SubQuantity(parent="_status_dict", unit="V") voltage_input_fail: float = SubQuantity(parent="_status_dict", unit="V") voltage_output: float = SubQuantity(parent="_status_dict", unit="V") load_percentage: int = SubQuantity(parent="_status_dict", unit="%") input_frequency: float = SubQuantity(parent="_status_dict", unit="Hz") battery_voltage: float = SubQuantity(parent="_status_dict", unit="V") temperature: float = SubQuantity(parent="_status_dict", unit="°C") utility_fail: int = SubQuantity(parent="_status_dict") battery_low: int = SubQuantity(parent="_status_dict") bypass_active: int = SubQuantity(parent="_status_dict") ups_failed: int = SubQuantity(parent="_status_dict") ups_standby: int = SubQuantity(parent="_status_dict") test_in_progress: int = SubQuantity(parent="_status_dict") shutdown_active: int = SubQuantity(parent="_status_dict") beeper_on: int = SubQuantity(parent="_status_dict") observables: dict def __init__(self, address: str, timeout: float = 1.0, observables: dict | None = None) -> None: """Initialize device driver object. Args: address: Serial port address of the target device. Example:"/dev/ttyUSB0" timeout: Timeout for serial communication. observables: Dictionary of attributes which should be exposed at datasource :py:meth:`_observable_data` events. Of the form {"attribute_name": {"name": "display_name", "unit": "unit"}}. """ SerialDevice.__init__( self, address, baudrate=2400, timeout=timeout, delays={"read_echo": 0.05}, read_line_termination=b"\r", write_line_termination=b"\r", ) self.observables = observables if observables is not None else DEFAULT_OBSERVABLES
[docs] def run_test(self, test_length: str | int = "short") -> None: """Run a self test. Args: test_length: Length of the test to run. Can be given in minutes (integer, 1-99) or as one of ``"short"`` or ``"low"``. ``"short"`` runs a 10 second test, ``"low"` runs a test until the battery is low. """ if isinstance(test_length, int): if test_length < 1 or test_length > 99: # noqa: PLR2004 log.error("Invalid test length: %s. Must be between 1 and 99", test_length) else: self.connection.write(f"T{test_length:02d}") elif test_length == "short": self.connection.write("T") elif test_length == "low": self.connection.write("TL") else: log.error("Invalid test length: %s. 'short', 'low' or integer (minutes) expected", test_length)
def _observable_data(self) -> dict: data = {} for attr, description in self.observables.items(): data[description["name"]] = (getattr(self, attr), description["unit"]) return data