"""Drivers for Megatec-compatible UPS devices."""
from herosdevices.core import DeviceCommandQuantity, SubQuantity
from herosdevices.core.templates import SerialDeviceTemplate as SerialDevice
from herosdevices.helper import log, mark_driver
__vendor_name__ = "Megatec"
DEFAULT_OBSERVABLES = {
"load_percentage": {"name": "load_percentage", "unit": "%"},
"temperature": {"name": "temperature", "unit": "°C"},
"battery_low": {"name": "battery_low", "unit": ""},
"bypass_active": {"name": "bypass_active", "unit": ""},
}
SERIAL_UPS_STATUS_FORMAT = [
"voltage_input",
"voltage_input_fail",
"voltage_output",
"load_percentage",
"input_frequency",
"battery_voltage",
"temperature",
"utility_fail",
"battery_low",
"bypass_active",
"ups_failed",
"ups_standby",
"test_in_progress",
"shutdown_active",
"beeper_on",
]
[docs]
def split_status_reply(status_str: str) -> dict[str, float | int]:
"""
Parse Megatec status reply string into components.
Format: (MMM.M NNN.N PPP.P QQQ RR.R S.SS TT.T b7b6b5b4b3b2b1b0<cr>)
Returns:
Flat list of values in order SERIAL_UPS_STATUS_FORMAT
"""
# Split the status string into components
parts = status_str.strip("()\r\n").split()
if len(parts) != 8: # noqa: PLR2004
msg = f"Expected 8 bit of status fields, got {len(parts)}"
raise ValueError(msg)
# Parse numeric fields
result = {}
for i in range(7):
if i == 3: # noqa:PLR2004 load_percentage should be int
result[SERIAL_UPS_STATUS_FORMAT[i]] = int(float(parts[i]))
else:
result[SERIAL_UPS_STATUS_FORMAT[i]] = float(parts[i])
# Parse status bits - convert binary string to individual integers
status_bits_str = parts[7]
if len(status_bits_str) != 8 or not all(c in "01" for c in status_bits_str): # noqa: PLR2004
msg = f"Invalid status bits format: {status_bits_str}"
raise ValueError(msg)
# Add each bit as a separate entry (from b7 to b0)
result |= {SERIAL_UPS_STATUS_FORMAT[i + 7]: int(bit) for i, bit in enumerate(status_bits_str)}
return result
[docs]
@mark_driver(
state="stable",
name="UPS (Serial)",
info="UPS with Megatec Serial Adapter",
product_page="https://www.megatec.com.tw/allproducts/",
)
class SerialUPS(SerialDevice):
"""Representation of Megatec based uninterruptible power supplies.
Several UPS vendors use the Megatec based controller card. Among them are:
- AdPoS
- Green Cell
Important:
The devices are quite picky concerning the serial to USB chip. It is known to work with a ``Prolific PL2303``
chip. Others might work but it is not guaranteed.
Important:
Some devices seem to have an incorrectly grounded serial interface, generating massive noise on the output
line. Check for noise on the output power sockets before installing permanently!
A documentation of the full protocol can be found `here <https://networkupstools.org/protocols/megatec.html>`_.
"""
_status_dict: list = DeviceCommandQuantity(command_get="Q1", format_fun=split_status_reply)
voltage_input: float = SubQuantity(parent="_status_dict", unit="V")
voltage_input_fail: float = SubQuantity(parent="_status_dict", unit="V")
voltage_output: float = SubQuantity(parent="_status_dict", unit="V")
load_percentage: int = SubQuantity(parent="_status_dict", unit="%")
input_frequency: float = SubQuantity(parent="_status_dict", unit="Hz")
battery_voltage: float = SubQuantity(parent="_status_dict", unit="V")
temperature: float = SubQuantity(parent="_status_dict", unit="°C")
utility_fail: int = SubQuantity(parent="_status_dict")
battery_low: int = SubQuantity(parent="_status_dict")
bypass_active: int = SubQuantity(parent="_status_dict")
ups_failed: int = SubQuantity(parent="_status_dict")
ups_standby: int = SubQuantity(parent="_status_dict")
test_in_progress: int = SubQuantity(parent="_status_dict")
shutdown_active: int = SubQuantity(parent="_status_dict")
beeper_on: int = SubQuantity(parent="_status_dict")
observables: dict
def __init__(self, address: str, timeout: float = 1.0, observables: dict | None = None) -> None:
"""Initialize device driver object.
Args:
address: Serial port address of the target device. Example:"/dev/ttyUSB0"
timeout: Timeout for serial communication.
observables: Dictionary of attributes which should be exposed at datasource :py:meth:`_observable_data`
events. Of the form {"attribute_name": {"name": "display_name", "unit": "unit"}}.
"""
SerialDevice.__init__(
self,
address,
baudrate=2400,
timeout=timeout,
delays={"read_echo": 0.05},
read_line_termination=b"\r",
write_line_termination=b"\r",
)
self.observables = observables if observables is not None else DEFAULT_OBSERVABLES
[docs]
def run_test(self, test_length: str | int = "short") -> None:
"""Run a self test.
Args:
test_length: Length of the test to run. Can be given in minutes (integer, 1-99) or as one of ``"short"`` or
``"low"``. ``"short"`` runs a 10 second test, ``"low"` runs a test until the battery is low.
"""
if isinstance(test_length, int):
if test_length < 1 or test_length > 99: # noqa: PLR2004
log.error("Invalid test length: %s. Must be between 1 and 99", test_length)
else:
self.connection.write(f"T{test_length:02d}")
elif test_length == "short":
self.connection.write("T")
elif test_length == "low":
self.connection.write("TL")
else:
log.error("Invalid test length: %s. 'short', 'low' or integer (minutes) expected", test_length)
def _observable_data(self) -> dict:
data = {}
for attr, description in self.observables.items():
data[description["name"]] = (getattr(self, attr), description["unit"])
return data