Source code for herosdevices.hardware.rohde_schwarz.ngx

"""Device driver for Rohde Schwarz NGX Power Supplies."""

from typing import Any

from herosdevices.core import DeviceCommandQuantity
from herosdevices.core.templates import VisaDeviceTemplate
from herosdevices.helper import add_class_descriptor, get_or_create_dynamic_subclass, mark_driver


@mark_driver(
    info="Rohde Schwarz NGX Power Supply",
    product_page="https://www.rohde-schwarz.com/de/produkte/messtechnik/dc-netzgeraete_64067.html",
    state="beta",
)
class NGX(VisaDeviceTemplate):
    """Rohde & Schwarz NGX power supply with one or more output channels.

    The attributes of this driver are generated per instance configuration:
    ``__new__`` obtains a dynamic subclass for the requested ``channels`` and
    attaches the following descriptors to it:

    * ``voltage``, ``current``, ``power``, ``state`` -- read-only queries that
      address all configured channels at once; the comma separated reply is
      split into one entry per channel.
    * ``<ch_name>_voltage`` and ``<ch_name>_current`` -- per-channel quantities
      that read the measured value and set the source value.

    Args:
        resource: Resource identifier forwarded to ``VisaDeviceTemplate``.
        channels: Tuple of ``(channel_index, channel_name)`` pairs. The index is
            used in the SCPI channel list ``(@...)``, the name is used as prefix
            for the generated attributes and observable names.
        keep_alive: Forwarded to ``VisaDeviceTemplate``.
        **kwargs: Further keyword arguments forwarded to ``VisaDeviceTemplate``.

    Note:
        Python passes the same arguments to ``__new__`` and ``__init__``, but
        their first positional parameters differ (``channels`` vs. ``resource``).
        Pass the arguments by keyword to avoid binding ``resource`` to
        ``channels`` during class creation.
    """

    # (attribute name, unit) of every all-channel quantity reported by
    # ``_observable_data``.
    _default_observables: tuple[tuple[str, str], ...] = (
        ("voltage", "V"),
        ("current", "A"),
        ("power", "W"),
        ("state", ""),
    )
    # (channel index, channel name) pairs; overwritten per instance in ``__init__``.
    channels: tuple[tuple[int, str], ...] = ((1, "ch1"),)

    def __init__(
        self,
        resource: str,
        channels: tuple[tuple[int, str], ...] = ((1, "ch1"),),
        keep_alive: bool = True,
        **kwargs
    ) -> None:
        """Open the connection via ``VisaDeviceTemplate`` and store the channel configuration."""
        VisaDeviceTemplate.__init__(self, resource=resource, keep_alive=keep_alive, **kwargs)
        self.channels = channels

    def __new__(cls, channels: tuple[tuple[int, str], ...] = ((1, "ch1"),), *_args, **_kwargs) -> Any:
        """Create a new NGX instance.

        The instance is created from a dynamic subclass of ``cls`` that carries
        the descriptors matching ``channels``.

        Args:
            channels: Tuple of ``(channel_index, channel_name)`` pairs.
            *_args: Ignored here; consumed by ``__init__``.
            **_kwargs: Ignored here; consumed by ``__init__``.

        Returns:
            An uninitialised instance of the dynamic subclass.
        """
        # NOTE(review): a positional first constructor argument lands in
        # ``channels`` here but in ``resource`` in ``__init__`` -- callers have
        # to pass ``channels`` (or ``resource``) by keyword.

        # get new or cached subclass
        new_cls = get_or_create_dynamic_subclass(cls, channels=channels)

        # global commands (affecting all channels): one query with a channel
        # list such as "(@1,2)", the reply holds one comma separated value per
        # channel
        channel_query_suffix = f"(@{','.join(str(ch_idx) for ch_idx, ch_name in channels)})"

        # (attribute name, SCPI query, dtype, unit); the units agree with
        # ``_default_observables``.
        # NOTE(review): ``state`` uses dtype=bool on the split string reply; if
        # the dtype is applied as ``bool("0")`` the result is always True --
        # confirm how DeviceCommandQuantity converts the values.
        all_channel_quantities = (
            ("voltage", "MEAS:VOLT?", float, "V"),
            ("current", "MEAS:CURR?", float, "A"),
            ("power", "MEAS:POW?", float, "W"),
            ("state", "OUTP?", bool, ""),
        )
        for attr_name, scpi_query, dtype, unit in all_channel_quantities:
            add_class_descriptor(
                new_cls,
                attr_name,
                DeviceCommandQuantity(
                    command_get=f"{scpi_query} {channel_query_suffix}",
                    format_fun=lambda x: x.split(","),
                    dtype=dtype,
                    unit=unit,
                ),
            )

        # per channel commands; the set command is followed by "*OPC?" and the
        # reply is compared against return_check (presumably to confirm that
        # the device finished the operation -- verify against
        # DeviceCommandQuantity)
        for ch_idx, ch_name in channels:
            attr_name = f"{ch_name}_voltage"
            add_class_descriptor(
                new_cls,
                attr_name,
                DeviceCommandQuantity(
                    command_get=f"MEAS:VOLT? (@{ch_idx!s})",
                    command_set="SOUR:VOLT {}, " + f"(@{ch_idx!s}); *OPC?",
                    dtype=float,
                    unit="V",
                    return_check="1",
                ),
            )
            attr_name = f"{ch_name}_current"
            add_class_descriptor(
                new_cls,
                attr_name,
                DeviceCommandQuantity(
                    command_get=f"MEAS:CURR? (@{ch_idx!s})",
                    command_set="SOUR:CURR {}, " + f"(@{ch_idx!s}); *OPC?",
                    dtype=float,
                    unit="A",
                    return_check="1",
                ),
            )
        return object.__new__(new_cls)

    def _observable_data(self) -> dict:
        """Collect all default observables of all configured channels.

        Returns:
            A dict mapping ``"<ch_name>_<quantity>"`` to a ``(value, unit)``
            tuple for every entry of ``_default_observables`` and every channel.

        Raises:
            ValueError: If a query returns a different number of values than
                there are configured channels (``zip(..., strict=True)``).
        """
        data = {}
        for query, unit in self._default_observables:
            # Each all-channel attribute yields one value per configured channel,
            # in the order of ``self.channels``.
            for (_ch_idx, ch_name), result in zip(self.channels, getattr(self, query), strict=True):
                obs_name = f"{ch_name}_{query}"
                data[obs_name] = (result, unit)
        return data