Source code for herosdevices.hardware.rigol

"""Rigol Devices."""

import threading
from typing import Any

import numpy as np
import pyvisa
from pyvisa.resources.resource import Resource

from herosdevices.core.templates.oscilloscope import OscilloscopeTemplate
from herosdevices.helper import log


class RigolDSA(OscilloscopeTemplate):
    """
    A class to interface Rigol digital spectrum analyzers.

    Args:
        address: Address or IP of the instrument.
        **kwargs: Additional keyword arguments are passed to
            :py:class:`herosdevices.core.templates.oscilloscope.OscilloscopeTemplate`

    Note:
        Currently, this driver only implements retrieving the current trace and getting the marker positions.
    """

    MAX_MARKERS: int = 4

    def __init__(
        self,
        address: str,
        **kwargs,
    ) -> None:
        self.address: str = address
        self.default_config_dict = {}
        super().__init__(config_dict={"default": {}}, **kwargs)

        # initialize pyvisa resource manager
        self._pyvisa_rm: pyvisa.ResourceManager | None = None
        self.open()

    def _query(self, cmd: str) -> Any:
        """
        Do device read query.

        Args:
            cmd: Query command

        Returns:
            Read result

        Raises:
            OSError: If the VISA layer reports an I/O error. The device is reset before raising.
        """
        with self.get_scope() as sa:
            try:
                return sa.query(cmd)
            except pyvisa.errors.VisaIOError as ex:
                log.exception("Error during query!")
                self.reset()
                msg = f"Query {cmd!r} failed"
                raise OSError(msg) from ex

    def _write(self, cmd: str) -> None:
        """
        Do device write query.

        Args:
            cmd: Query command

        Raises:
            OSError: If the VISA layer reports an I/O error. The device is reset before raising.
        """
        with self.get_scope() as sa:
            try:
                sa.write(cmd)
            except pyvisa.errors.VisaIOError as ex:
                log.exception("Error during write!")
                self.reset()
                msg = f"Write {cmd!r} failed"
                raise OSError(msg) from ex

    def _start(self) -> bool:
        """
        Manual triggering is not supported.

        Returns:
            Always ``False``.
        """
        log.error("Rigol DSA do not support manual trigger")
        return False

    def _stop(self) -> bool:
        """
        Manual stopping is not supported.

        Returns:
            Always ``False``.
        """
        log.error("Rigol DSA do not support manual stopping (yet)")
        return False

    def _set_config(self, config: dict) -> bool:  # noqa: ARG002
        """
        Set a config dictionary on the device.

        Note:
            Currently, this driver does not implement configuring the device.
        """
        return False

    def _get_status(self) -> dict:
        """
        Collect the current device status.

        Returns:
            Dictionary with the keys ``acquisition_running`` and ``idn``.
        """
        return {
            "acquisition_running": self.acquisition_running,
            "idn": self.get_identification(),
        }

    def _open(self) -> Resource:
        """
        Create a resource manager with the ``@py`` backend and open the instrument.

        Returns:
            The opened pyvisa resource for ``TCPIP0::<address>::INSTR``.
        """
        log.info(f"Connecting to {self.address}")
        self._pyvisa_rm = pyvisa.ResourceManager("@py")
        return self._pyvisa_rm.open_resource(f"TCPIP0::{self.address}::INSTR")

    def _teardown(self) -> None:
        """
        Close the instrument connection and the pyvisa resource manager.

        Both steps are attempted independently; pyvisa errors are logged and not raised.
        """
        # The handles may be missing or None if opening never succeeded, so guard both
        # and make sure a missing device does not prevent closing the resource manager.
        device = getattr(self, "_device", None)
        if device is not None:
            try:
                device.close()
            except pyvisa.errors.Error:
                log.exception("Error while closing DSA.")

        resource_manager = getattr(self, "_pyvisa_rm", None)
        if resource_manager is not None:
            try:
                resource_manager.close()
            except pyvisa.errors.Error:
                log.exception("Error while closing pyvisa resource manager.")
            self._pyvisa_rm = None

    @staticmethod
    def _parse_trace(raw: str) -> np.ndarray:
        """
        Convert the ASCII reply of a trace query into a numpy array.

        Args:
            raw: Comma separated trace values, optionally prefixed by a SCPI block header
                of the form ``#<n><n digits>``.

        Returns:
            One dimensional array with the trace values.
        """
        # remove any leading/trailing whitespace or newlines
        payload = raw.strip()
        if payload.startswith("#"):
            # skip SCPI-style length header: '#' + digit-count + digits
            payload = payload[2 + int(payload[1]) :]
        return np.fromstring(payload, sep=",")

    def _acquisition_loop(self) -> None:
        """
        Read trace 1 and the frequency span once and emit the result.

        Emits ``acquisition_stopped`` followed by ``acquisition_data``. If the device
        communication fails, the error is logged and only ``acquisition_stopped`` is emitted.
        """
        try:
            self._write(":FORM:DATA ASCII")  # set ASCII mode
            trace = self._parse_trace(self._query(":TRAC:DATA? TRACE1"))
            start_freq = float(self._query(":FREQ:STAR?"))
            stop_freq = float(self._query(":FREQ:STOP?"))
        except OSError:
            # Without this the thread would die silently and no stop event would ever be sent.
            # NOTE(review): assumes a failed acquisition should be reported as stopped - confirm
            # against the template.
            log.exception("Could not retrieve trace from Rigol DSA.")
            self.acquisition_stopped()
            return

        metadata = {
            "start_freq": start_freq,
            "stop_freq": stop_freq,
            "frame": 0,
        }
        self.acquisition_stopped()
        self.acquisition_data(trace, metadata)

    def _arm(self) -> bool:
        """
        Start a thread that performs a single trace acquisition.

        Returns:
            Always ``True``.
        """
        log.debug("Starting acquisition thread")
        self._stop_acquisition_event.clear()
        self._acquisition_thread = threading.Thread(target=self._acquisition_loop)
        self._acquisition_thread.start()
        return True

    def get_identification(self) -> str:
        """
        Query the device identification.

        Returns:
            Device identification string.
        """
        return self._query("*IDN?")

    def get_markers(self) -> dict:
        """
        Query all markers from the Rigol spectrum analyzer.

        Returns:
            Dictionary with :code:`{marker_number: (freq_Hz, amp_dBm)}`.

        Raises:
            OSError: If already the first marker can not be queried or reading an enabled marker fails.
        """
        markers = {}
        for i in range(1, self.MAX_MARKERS + 1):
            try:
                state = int(self._query(f":CALC:MARK{i}:STAT?").strip())
            except pyvisa.errors.Error:
                # in case the instrument doesn't support more markers, break
                break
            except OSError:
                # `_query` converts VISA I/O errors into OSError, so this is where an
                # unsupported marker index actually shows up. A failure on the very first
                # marker is treated as a communication problem and passed on to the caller.
                if i == 1:
                    raise
                break
            if state == 1:
                freq = float(self._query(f":CALC:MARK{i}:X?"))
                amp = float(self._query(f":CALC:MARK{i}:Y?"))
                markers[i] = (freq, amp)
        return markers

    def _observable_data(self) -> dict:
        """
        Implement method :meth:`_observable_data` s.t. this class can be used as a :code:`PolledLocalDatasourceHERO`.

        Returns:
            Dictionary with marker positions and amplitudes.
        """
        out = {}
        try:
            for i, (freq, amp) in self.get_markers().items():
                out[f"marker_{i}_frequency"] = (freq, "Hz")
                out[f"marker_{i}_amplitude"] = (amp, "dBm")
        except OSError:
            log.error("Could not query observable_data!")
        return out