Source code for herosdevices.hardware.rigol
"""Rigol Devices."""
import threading
from typing import Any
import numpy as np
import pyvisa
from pyvisa.resources.resource import Resource
from herosdevices.core.templates.oscilloscope import OscilloscopeTemplate
from herosdevices.helper import log, mark_driver
__vendor_name__ = "Rigol Technologies"
[docs]
@mark_driver(
info="DSA series spectrum analyzer",
product_page="https://www.rigol.com/intl/products/spectrum-analyzer.html",
state="beta",
)
class RigolDSA(OscilloscopeTemplate):
"""
A class to interface Rigol digital spectrum analyzers.
Args:
address: Address or IP of the instrument.
**kwargs: Additional keyword arguments are passed to
:py:class:`herosdevices.core.templates.oscilloscope.OscilloscopeTemplate`
Note:
Currently, this driver only implements retrieving the current trace and getting the marker positions.
"""
MAX_MARKERS: int = 4
def __init__(
self,
address: str,
**kwargs,
) -> None:
self.address: str = address
self.default_config_dict = {}
super().__init__(config_dict={"default": {}}, **kwargs)
# initialize pyvisa resource manager
self._pyvisa_rm: pyvisa.ResourceManager | None = None
self.open()
def _query(self, cmd: str) -> Any:
"""
Do device read query.
Args:
cmd: Query command
Returns:
Read result
"""
with self.get_scope() as sa:
try:
return sa.query(cmd)
except pyvisa.errors.VisaIOError as ex:
log.exception("Error during query!")
self.reset()
raise OSError from ex
def _write(self, cmd: str) -> None:
"""
Do device qrite query.
Args:
cmd: Query command
"""
with self.get_scope() as sa:
try:
sa.write(cmd)
except pyvisa.errors.VisaIOError as ex:
log.exception("Error during query!")
self.reset()
raise OSError from ex
def _start(self) -> bool:
log.error("Rigol DSA do not support manual trigger")
return False
def _stop(self) -> bool:
log.error("Rigol DSA do not support manual stopping (yet)")
return False
def _set_config(self, config: dict) -> bool: # noqa: ARG002
"""
Set a config dictionary on the device.
Note:
Currently, this driver does not implement configuring the device.
"""
return False
def _get_status(self) -> dict:
return {
"acquisition_running": self.acquisition_running,
"idn": self.get_identification(),
}
def _open(self) -> Resource:
log.info(f"Connecting to {self.address}")
self._pyvisa_rm = pyvisa.ResourceManager("@py")
return self._pyvisa_rm.open_resource(f"TCPIP0::{self.address}::INSTR")
def _teardown(self) -> None:
try:
self._device.close()
except pyvisa.errors.Error:
log.exception("Error while closind DSA.")
try:
self._pyvisa_rm.close()
except pyvisa.errors.Error:
log.exception("Error while closing pyvisa resource manager.")
def _acquisition_loop(self) -> None:
# get trace as ASCII CSV
self._write(":FORM:DATA ASCII") # set ASCII mode
raw = self._query(":TRAC:DATA? TRACE1")
# remove any leading/trailing whitespace or newlines
raw = raw.strip()
# if the device adds a leading '#' or count
if raw.startswith("#"):
# skip SCPI-style length header
raw = raw[2 + int(raw[1]) :] # '#' + digit-count + digits
# now parse
trace = np.fromstring(raw, sep=",")
start_freq = float(self._query(":FREQ:STAR?"))
stop_freq = float(self._query(":FREQ:STOP?"))
metadata = {
"start_freq": start_freq,
"stop_freq": stop_freq,
"frame": 0,
}
self.acquisition_stopped()
self.acquisition_data(trace, metadata)
def _arm(self) -> bool:
log.debug("Starting acquisition thread")
self._stop_acquisition_event.clear()
self._acquisition_thread = threading.Thread(target=self._acquisition_loop)
self._acquisition_thread.start()
return True
[docs]
def get_identification(self) -> str:
"""
Query the device identification.
Returns:
Device identification string.
"""
return self._query("*IDN?")
[docs]
def get_markers(self) -> dict:
"""
Query all markers from the Rigol spectrum analyzer.
Returns:
Dictionary with :code:`{marker_number: (freq_Hz, amp_dBm)}`.
"""
markers = {}
for i in range(1, self.MAX_MARKERS + 1):
try:
state = int(self._query(f":CALC:MARK{i}:STAT?").strip())
except pyvisa.errors.Error:
# in case the instrument doesn't support more markers, break
break
if state == 1:
freq = float(self._query(f":CALC:MARK{i}:X?"))
amp = float(self._query(f":CALC:MARK{i}:Y?"))
markers[i] = (freq, amp)
return markers
def _observable_data(self) -> dict:
"""
Implement method :meth:`_observable_data` s.t. this class can be used as a :code:`PolledLocalDatasourceHERO`.
Returns:
Dictionary with marker positions and amplitudes.
"""
out = {}
try:
for i, (freq, amp) in self.get_markers().items():
out[f"marker_{i}_frequency"] = (freq, "Hz")
out[f"marker_{i}_amplitude"] = (amp, "dBm")
except OSError:
log.error("Could not query observable_data!")
return out