Source code for herosdevices.hardware.rohde_schwarz.ngx

"""Device driver for Rohde Schwarz NGX Power Supplies."""

from typing import Any

from herosdevices.core import DeviceCommandQuantity
from herosdevices.core.templates import VisaDeviceTemplate
from herosdevices.helper import add_class_descriptor, get_or_create_dynamic_subclass, mark_driver


[docs] @mark_driver( info="Rohde Schwarz NGX Power Supply", product_page="https://www.rohde-schwarz.com/de/produkte/messtechnik/dc-netzgeraete_64067.html", state="beta", ) class NGX(VisaDeviceTemplate): """Driver for Rohde Schwarz NGX Power Supplies using the VISA interface. The desired channels have to configured using the channel index (starting from 1) and a user-friendly channel name. For each channel, the HERO gains properties `channel_name_` `current`, `voltage`, `power` and `status`. In addition, current and voltage can be set using the property. Querying can also occur for the configured number of channels simultaneously by using the class-level properties. All properties are provided as observable data. Args: resource: VISA resource identifier e.g. `TCPIP::my_device_address::INSTR`. channels: List of tuples of channel index and channel name. keep_alive: Keep VISA connection open. """ _default_observables: tuple[tuple[str, str], ...] = ( ("voltage", "V"), ("current", "A"), ("power", "W"), ("status", ""), ) channels: tuple[tuple[int, str], ...] = ((1, "ch1"),) def __init__( self, resource: str, channels: tuple[tuple[int, str], ...] = ((1, "ch1"),), keep_alive: bool = True, **kwargs ) -> None: VisaDeviceTemplate.__init__(self, resource=resource, keep_alive=keep_alive, **kwargs) self.channels = channels def __new__(cls, channels: tuple[tuple[int, str], ...] = ((1, "ch1"),), *_args, **_kwargs) -> Any: """Create a new NGX instance.""" # get new or cashed subclass new_cls = get_or_create_dynamic_subclass(cls, channels=channels) # global commands (affecting all channels) channel_query_suffix = f"(@{','.join(str(ch_idx) for ch_idx, ch_name in channels)})" # voltage add_class_descriptor( new_cls, "voltage", DeviceCommandQuantity( command_get=f"MEAS:VOLT? {channel_query_suffix}", format_fun=lambda x: x.split(","), dtype=float, unit="V", ), ) # current add_class_descriptor( new_cls, "current", DeviceCommandQuantity( command_get=f"MEAS:CURR? {channel_query_suffix}", format_fun=lambda x: x.split(","), dtype=float, unit="A", ), ) # power add_class_descriptor( new_cls, "power", DeviceCommandQuantity( command_get=f"MEAS:POW? {channel_query_suffix}", format_fun=lambda x: x.split(","), dtype=float, unit="W", ), ) # status add_class_descriptor( new_cls, "status", DeviceCommandQuantity( command_get=f"OUTP? {channel_query_suffix}", format_fun=lambda x: x.split(","), dtype=bool, unit="", ), ) # per channel commands for ch_idx, ch_name in channels: add_class_descriptor( new_cls, f"{ch_name}_voltage", DeviceCommandQuantity( command_get=f"MEAS:VOLT? (@{ch_idx!s})", command_set="SOUR:VOLT {}, " + f"(@{ch_idx!s}); *OPC?", dtype=float, unit="V", return_check="1", ), ) add_class_descriptor( new_cls, f"{ch_name}_current", DeviceCommandQuantity( command_get=f"MEAS:CURR? (@{ch_idx!s})", command_set="SOUR:CURR {}, " + f"(@{ch_idx!s}); *OPC?", dtype=float, unit="A", return_check="1", ), ) add_class_descriptor( new_cls, f"{ch_name}_power", DeviceCommandQuantity( command_get=f"MEAS:POW? (@{ch_idx!s})", dtype=float, unit="W", return_check="1", ), ) add_class_descriptor( new_cls, f"{ch_name}_status", DeviceCommandQuantity( command_get=f"OUTP? (@{ch_idx!s})", dtype=bool, unit="", return_check="1", ), ) return object.__new__(new_cls) def _observable_data(self) -> dict: data = {} for query, unit in self._default_observables: for (_ch_idx, ch_name), result in zip(self.channels, getattr(self, query), strict=True): obs_name = f"{ch_name}_{query}" data[obs_name] = (result, unit) return data