Source code for herosdevices.hardware.rigol

"""Rigol Devices."""

import threading
from typing import Any

import numpy as np
import pyvisa
from pyvisa.resources.resource import Resource

from herosdevices.core.templates.oscilloscope import OscilloscopeTemplate
from herosdevices.helper import log, mark_driver

__vendor_name__ = "Rigol Technologies"


@mark_driver(
    info="DSA series spectrum analyzer",
    product_page="https://www.rigol.com/intl/products/spectrum-analyzer.html",
    state="beta",
)
class RigolDSA(OscilloscopeTemplate):
    """
    A class to interface Rigol digital spectrum analyzers.

    Args:
        address: Address or IP of the instrument.
        **kwargs: Additional keyword arguments are passed to
            :py:class:`herosdevices.core.templates.oscilloscope.OscilloscopeTemplate`

    Note:
        Currently, this driver only implements retrieving the current trace and getting the marker positions.
    """

    # Highest marker index that :meth:`get_markers` will query.
    MAX_MARKERS: int = 4

    def __init__(
        self,
        address: str,
        **kwargs,
    ) -> None:
        """
        Store the instrument address, initialize the template and open the connection.

        Args:
            address: Address or IP of the instrument; inserted into the VISA
                resource string ``TCPIP0::<address>::INSTR`` by :meth:`_open`.
            **kwargs: Forwarded unchanged to the template constructor.
        """
        self.address: str = address
        self.default_config_dict = {}
        super().__init__(config_dict={"default": {}}, **kwargs)
        # The resource manager is created lazily in _open(); until then it is None.
        self._pyvisa_rm: pyvisa.ResourceManager | None = None
        self.open()

    def _query(self, cmd: str) -> Any:
        """
        Do device read query.

        Args:
            cmd: Query command

        Returns:
            Read result

        Raises:
            OSError: If the VISA layer reports an I/O error. The device is
                reset via :meth:`reset` before the error is re-raised.
        """
        with self.get_scope() as sa:
            try:
                return sa.query(cmd)
            except pyvisa.errors.VisaIOError as ex:
                log.exception("Error during query!")
                self.reset()
                raise OSError from ex

    def _write(self, cmd: str) -> None:
        """
        Do device write command.

        Args:
            cmd: Command to send

        Raises:
            OSError: If the VISA layer reports an I/O error. The device is
                reset via :meth:`reset` before the error is re-raised.
        """
        with self.get_scope() as sa:
            try:
                sa.write(cmd)
            except pyvisa.errors.VisaIOError as ex:
                log.exception("Error during write!")
                self.reset()
                raise OSError from ex

    def _start(self) -> bool:
        """
        Reject a manual trigger request.

        Returns:
            Always ``False``; an error is logged instead of triggering.
        """
        log.error("Rigol DSA do not support manual trigger")
        return False

    def _stop(self) -> bool:
        """
        Reject a manual stop request.

        Returns:
            Always ``False``; an error is logged instead of stopping.
        """
        log.error("Rigol DSA do not support manual stopping (yet)")
        return False

    def _set_config(self, config: dict) -> bool:  # noqa: ARG002
        """
        Set a config dictionary on the device.

        Note:
            Currently, this driver does not implement configuring the device.

        Args:
            config: Ignored.

        Returns:
            Always ``False``.
        """
        return False

    def _get_status(self) -> dict:
        """
        Collect the driver status.

        Returns:
            Dictionary with the keys ``acquisition_running`` and ``idn``
            (the latter is queried from the instrument on every call).
        """
        return {
            "acquisition_running": self.acquisition_running,
            "idn": self.get_identification(),
        }

    def _open(self) -> Resource:
        """
        Create a pyvisa-py resource manager and open the instrument over TCP/IP.

        Returns:
            The opened VISA resource.
        """
        log.info(f"Connecting to {self.address}")
        self._pyvisa_rm = pyvisa.ResourceManager("@py")
        return self._pyvisa_rm.open_resource(f"TCPIP0::{self.address}::INSTR")

    def _teardown(self) -> None:
        """
        Close the instrument session and the resource manager.

        Both steps are best effort: pyvisa errors are logged and not re-raised,
        so a failure in closing the device does not prevent closing the manager.
        """
        try:
            self._device.close()
        except pyvisa.errors.Error:
            log.exception("Error while closing DSA.")

        # The manager is None until _open() has run; calling close() on None
        # would raise an AttributeError that the pyvisa handler cannot catch.
        resource_manager = self._pyvisa_rm
        if resource_manager is None:
            return
        try:
            resource_manager.close()
        except pyvisa.errors.Error:
            log.exception("Error while closing pyvisa resource manager.")
        finally:
            # Drop the stale handle so a repeated teardown is a no-op.
            self._pyvisa_rm = None

    @staticmethod
    def _parse_ascii_trace(raw: str) -> np.ndarray:
        """
        Convert an ASCII trace response into a numeric array.

        Args:
            raw: Response string of comma separated values, optionally
                preceded by a ``#<n><length>`` block header.

        Returns:
            One-dimensional array of the parsed values.
        """
        payload = raw.strip()
        if payload.startswith("#"):
            # Header layout: '#', one digit n, then n digits holding the length.
            header_length = 2 + int(payload[1])
            payload = payload[header_length:]
        return np.fromstring(payload, sep=",")

    def _acquisition_loop(self) -> None:
        """
        Fetch trace 1 together with the frequency span and publish the result.

        Runs in the thread started by :meth:`_arm`. It reads a single trace,
        signals :meth:`acquisition_stopped` and then emits the data with its
        metadata through :meth:`acquisition_data`.
        """
        # NOTE(review): an OSError raised by _write/_query ends this thread
        # before acquisition_stopped() is called - confirm this is acceptable.
        self._write(":FORM:DATA ASCII")  # request the trace as ASCII CSV
        trace = self._parse_ascii_trace(self._query(":TRAC:DATA? TRACE1"))

        start_freq = float(self._query(":FREQ:STAR?"))
        stop_freq = float(self._query(":FREQ:STOP?"))
        metadata = {
            "start_freq": start_freq,
            "stop_freq": stop_freq,
            "frame": 0,
        }
        self.acquisition_stopped()
        self.acquisition_data(trace, metadata)

    def _arm(self) -> bool:
        """
        Start a background thread that performs one trace acquisition.

        Returns:
            Always ``True``.
        """
        log.debug("Starting acquisition thread")
        self._stop_acquisition_event.clear()
        self._acquisition_thread = threading.Thread(target=self._acquisition_loop)
        self._acquisition_thread.start()
        return True

    def get_identification(self) -> str:
        """
        Query the device identification.

        Returns:
            Device identification string.
        """
        return self._query("*IDN?")

    def get_markers(self) -> dict[int, tuple[float, float]]:
        """
        Query all markers from the Rigol spectrum analyzer.

        Markers ``1`` to :attr:`MAX_MARKERS` are checked in order. Scanning stops at
        the first marker whose state cannot be queried; markers read up to that
        point are still returned.

        Returns:
            Dictionary with :code:`{marker_number: (freq_Hz, amp_dBm)}`.

        Raises:
            OSError: If reading the position or amplitude of an enabled marker fails.
        """
        markers: dict[int, tuple[float, float]] = {}
        for i in range(1, self.MAX_MARKERS + 1):
            try:
                state = int(self._query(f":CALC:MARK{i}:STAT?").strip())
            except (OSError, pyvisa.errors.Error):
                # _query re-raises VISA I/O errors as OSError, so both types must
                # be caught here. In case the instrument doesn't support more
                # markers, break.
                break
            if state == 1:
                freq = float(self._query(f":CALC:MARK{i}:X?"))
                amp = float(self._query(f":CALC:MARK{i}:Y?"))
                markers[i] = (freq, amp)
        return markers

    def _observable_data(self) -> dict[str, tuple[float, str]]:
        """
        Implement method :meth:`_observable_data` s.t. this class can be used as a :code:`PolledLocalDatasourceHERO`.

        Returns:
            Dictionary with marker positions and amplitudes, each given as a
            ``(value, unit)`` tuple. Empty if the markers could not be queried.
        """
        out: dict[str, tuple[float, str]] = {}
        try:
            for i, (freq, amp) in self.get_markers().items():
                out[f"marker_{i}_frequency"] = (freq, "Hz")
                out[f"marker_{i}_amplitude"] = (amp, "dBm")
        except OSError:
            log.error("Could not query observable_data!")
        return out