Source code for herosdevices.hardware.bristol.wlm

"""Includes classes for controlling Bristol Instruments wavemeters."""

from typing import Any, Literal

import numpy as np

from herosdevices.core import DeviceCommandQuantity, SubQuantity
from herosdevices.core.templates import TelnetDeviceTemplate
from herosdevices.helper import explicit, limits, mark_driver

__vendor_name__ = "Bristol Instruments"

# Observable descriptions used as the initial value of ``WLM871.observables``.
# Each entry maps an observable key to its display name and unit.
DEFAULT_OBSERVABLES = {
    key: {"name": key, "unit": unit}
    for key, unit in (
        ("wavelength", "nm"),
        ("power", "a.u."),
    )
}


@mark_driver(
    name="WLM871 (Ethernet)",
    info="Laser wavelength meter with ±0.2 ppm accuracy",
    product_page="https://www.bristol-inst.com/bristol-instruments-products/wavelength-meters-scientific/871-series-pulsed-laser-wavelength-meter/",
    state="beta",
)
class WLM871(TelnetDeviceTemplate):
    """Controller for the Bristol Instruments 871A Laser Wavelength Meter with **network** interface."""

    # Measurement queries.
    wavelength = DeviceCommandQuantity(
        command_get=":MEAS:WAV?",
        dtype=float,
        unit="nm",
    )
    frequency = DeviceCommandQuantity(
        command_get=":MEAS:FREQ?",
        dtype=float,
        unit="THz",
    )
    wavenumber = DeviceCommandQuantity(
        command_get=":MEAS:WNUM?",
        dtype=float,
        unit="cm^-1",
    )
    power = DeviceCommandQuantity(
        command_get=":MEAS:POW?",
        dtype=float,
        unit="a.u.",
    )
    # The environment reply is split at commas; from every non-empty field only the first
    # whitespace-separated token is converted to float (any trailing text is dropped).
    _environment_data = DeviceCommandQuantity(
        command_get=":MEAS:ENV?",
        dtype=None,
        unit="",
        format_fun=lambda x: tuple(float(val.strip().split()[0]) for val in x.split(",") if val.strip()),
    )
    # NOTE(review): presumably SubQuantity exposes element ``index`` of the parent tuple - confirm.
    temperature = SubQuantity(
        parent="_environment_data",
        index=0,
        unit="°C",
        dtype=float,
    )
    pressure = SubQuantity(
        parent="_environment_data",
        index=1,
        unit="mmHg",
        dtype=float,
    )
    # Detector and exposure configuration.
    light_type = DeviceCommandQuantity(
        command_get=":SENS:DET:FUNC?",
        command_set=":SENS:DET:FUNC {}",
        dtype=str,
        unit="",
        value_check_fun=explicit(["CW", "PULS"]),  # codespell:ignore
    )
    auto_exposure = DeviceCommandQuantity(
        command_get=":SENS:EXPO:AUTO?",
        command_set=":SENS:EXPO:AUTO {}",
        dtype=str,
        unit="",
        value_check_fun=explicit(["ON", "OFF"]),
    )

    observables: dict

    def __new__(cls, *_args, pid_module: bool = False, **_kwargs) -> "WLM871":  # noqa: C901
        """Return a new instance of the wlm class (with unique hash) and attach optional modules.

        Args:
            pid_module: If True, the instance is created from a freshly built subclass named
                ``<class name>_pid`` that additionally carries the PID quantities and helper methods.

        Returns:
            The new (not yet initialized) instance.
        """
        if pid_module:

            def _as_float(response: Any) -> float:
                """Convert a query reply to float; replies that are not strings yield 0.0."""
                return float(response.strip()) if isinstance(response, str) else 0.0

            def _query_all(self, commands: dict[str, str]) -> dict[str, float]:  # noqa: ANN001
                """Send every query in ``commands`` first, then convert the replies to floats."""
                responses = {
                    key: self.connection.write(command, read_echo=True, read_line=True)
                    for key, command in commands.items()
                }
                return {key: _as_float(response) for key, response in responses.items()}

            def _write_if_set(self, settings: tuple[tuple[str, float | None], ...]) -> None:  # noqa: ANN001
                """Write ``"<command> <value>"`` for every setting whose value is not None."""
                for command, value in settings:
                    if value is not None:
                        self.connection.write(f"{command} {value}")

            def get_pid_constants(self) -> dict[str, float]:  # noqa: ANN001
                """Get PID constants (proportional, integral, derivative).

                Returns:
                    Dictionary with 'proportional', 'integral', 'derivative' keys
                """
                return _query_all(
                    self,
                    {
                        "proportional": ":SENS:PID:LCON:PROP?",
                        "integral": ":SENS:PID:LCON:INT?",
                        "derivative": ":SENS:PID:LCON:DER?",
                    },
                )

            def set_pid_constants(
                self,  # noqa: ANN001
                proportional: float | None = None,
                integral: float | None = None,
                derivative: float | None = None,
            ) -> None:
                """Set PID constants. Arguments that are None are not changed.

                Args:
                    proportional: Proportional constant (0-50.0)
                    integral: Integral constant (0-50.0)
                    derivative: Derivative constant (0-50.0)
                """
                _write_if_set(
                    self,
                    (
                        (":SENS:PID:LCON:PROP", proportional),
                        (":SENS:PID:LCON:INT", integral),
                        (":SENS:PID:LCON:DER", derivative),
                    ),
                )

            def get_pid_voltage_settings(self) -> dict[str, float]:  # noqa: ANN001
                """Get PID voltage settings.

                Returns:
                    Dictionary with voltage settings
                """
                return _query_all(
                    self,
                    {
                        "default": ":SENS:PID:VOLT:DEF?",
                        "minimum": ":SENS:PID:VOLT:MIN?",
                        "maximum": ":SENS:PID:VOLT:MAX?",
                        "offset": ":SENS:PID:VOLT:OFFS?",
                        "scale": ":SENS:PID:VOLT:SCAL?",
                    },
                )

            def set_pid_voltage_settings(
                self,  # noqa: ANN001
                default: float | None = None,
                minimum: float | None = None,
                maximum: float | None = None,
                offset: float | None = None,
                scale: float | None = None,
            ) -> None:
                """Set PID voltage settings. Arguments that are None are not changed.

                Args:
                    default: Default voltage (-5.0 to 5.0)
                    minimum: Minimum voltage (-5.0 to 5.0)
                    maximum: Maximum voltage (-5.0 to 5.0)
                    offset: Voltage offset (-5.0 to 5.0)
                    scale: Voltage scale (-500.0 to 500.0)
                """
                _write_if_set(
                    self,
                    (
                        (":SENS:PID:VOLT:DEF", default),
                        (":SENS:PID:VOLT:MIN", minimum),
                        (":SENS:PID:VOLT:MAX", maximum),
                        (":SENS:PID:VOLT:OFFS", offset),
                        (":SENS:PID:VOLT:SCAL", scale),
                    ),
                )

            # Namespace of the dynamically created subclass: PID quantities plus the helpers above.
            pid_ns = {
                "pid_state": DeviceCommandQuantity(
                    command_get=":SENS:PID:STAT?",
                    command_set=":SENS:PID:STAT {}",
                    dtype=str,
                    value_check_fun=explicit(["ON", "OFF"]),
                ),
                "pid_setpoint": DeviceCommandQuantity(
                    command_get=":SENS:PID:SPO?",
                    command_set=":SENS:PID:SPO {}",
                    dtype=float,
                    unit="nm",
                    value_check_fun=limits(375, 14000),
                ),
                "pid_error": DeviceCommandQuantity(
                    command_get=":SENS:PID:ERR?",
                    dtype=float,
                    unit="nm",
                ),
                "pid_output": DeviceCommandQuantity(
                    command_get=":SENS:PID:OUT?",
                    dtype=float,
                    unit="V",
                ),
                "get_pid_voltage_settings": get_pid_voltage_settings,
                "set_pid_voltage_settings": set_pid_voltage_settings,
                "get_pid_constants": get_pid_constants,
                "set_pid_constants": set_pid_constants,
            }
            # A new subclass is built on every call, so the base class itself is never modified.
            return super().__new__(type(f"{cls.__name__}_pid", (cls,), pid_ns))
        return super().__new__(cls)

    def __init__(self, address: str, *args, **kwargs) -> None:
        """Initialize device driver object.

        Args:
            address: IP address of the target device.

        ``args`` and ``kwargs`` are passed to the
        :py:class:`herosdevices.core.templates.TelnetDeviceTemplate` constructor.
        """
        # ``pid_module`` is consumed by ``__new__`` and must not reach the template constructor.
        kwargs.pop("pid_module", None)
        super().__init__(address, *args, port=23, read_line_termination=b"\n", write_line_termination=b"\n", **kwargs)
        # Copy the defaults so that changing one instance's observables does not
        # leak into the module-level dict and thereby into every other instance.
        self.observables = {name: dict(spec) for name, spec in DEFAULT_OBSERVABLES.items()}
[docs] def set_averaging(self, state: Literal["ON", "OFF"] | bool, count: int | None = None) -> None: """Set averaging state and optionally count. Args: state: Averaging state ('ON', 'OFF', or boolean) count: Number of readings to average (2-128), if None, current set value is not changes """ state_str = "ON" if (isinstance(state, bool) and state) or state == "ON" else "OFF" self.connection.write(f":SENS:AVER:STAT {state_str}") if count is not None: self.connection.write(f":SENS:AVER:COUN {count}")
def get_averaging(self) -> dict[str, Any]:
    """Query the averaging configuration of the instrument.

    Returns:
        Dictionary with the keys ``"state"`` (str) and ``"count"`` (int). A reply that is
        not a string is reported as ``"UNKNOWN"`` or ``0`` respectively.
    """
    raw_state = self.connection.write(":SENS:AVER:STAT?", read_echo=True, read_line=True)
    raw_count = self.connection.write(":SENS:AVER:COUN?", read_echo=True, read_line=True)

    state = "UNKNOWN"
    if isinstance(raw_state, str):
        state = raw_state.strip()

    count = 0
    if isinstance(raw_count, str):
        count = int(raw_count.strip())

    return {"state": state, "count": count}
def get_averaged_data(self, data_type: Literal["POW", "FREQ", "WAV", "WNUM"] = "WAV") -> float:
    """Read the averaged value of one measurement type.

    Args:
        data_type: Which averaged quantity to query ('POW', 'FREQ', 'WAV', 'WNUM').

    Returns:
        The averaged value as float, or ``0.0`` if the reply is not a string.

    Raises:
        ValueError: If ``data_type`` is not one of the supported types.
    """
    valid_types = ["POW", "FREQ", "WAV", "WNUM"]
    if data_type not in valid_types:
        msg = f"data_type must be one of {valid_types}"
        raise ValueError(msg)

    reply = self.connection.write(f":SENS:AVER:DATA? {data_type}", read_echo=True, read_line=True)
    return float(reply.strip()) if isinstance(reply, str) else 0.0
def calibrate(self) -> None:
    """Send the calibration command to the instrument."""
    command = ":SENS:CALI"
    self.connection.write(command)
def get_calibration_settings(self) -> dict[str, Any]:
    """Query the calibration configuration of the instrument.

    Returns:
        Dictionary with the keys ``"method"`` (str), ``"temperature_change"`` (int) and
        ``"timer_minutes"`` (int). A reply that is not a string is reported as
        ``"UNKNOWN"`` for the method and ``0`` for the numeric settings.
    """
    # All three queries are sent before any reply is converted.
    raw_method = self.connection.write(":SENS:CALI:METH?", read_echo=True, read_line=True)
    raw_temperature = self.connection.write(":SENS:CALI:TEMP?", read_echo=True, read_line=True)
    raw_timer = self.connection.write(":SENS:CALI:TIM?", read_echo=True, read_line=True)

    settings: dict[str, Any] = {}
    settings["method"] = raw_method.strip() if isinstance(raw_method, str) else "UNKNOWN"
    settings["temperature_change"] = int(raw_temperature.strip()) if isinstance(raw_temperature, str) else 0
    settings["timer_minutes"] = int(raw_timer.strip()) if isinstance(raw_timer, str) else 0
    return settings
[docs] def set_calibration_settings( self, method: Literal["OFF", "TIME", "TEMP"] | None = None, temperature_change: int | None = None, timer_minutes: int | None = None, ) -> None: """Set calibration settings at once. Settings not specified or None are not changed. Args: method: Calibration method ('OFF', 'TIME', 'TEMP') temperature_change: Temperature change in 1/10th °C (1-50) timer_minutes: Time between calibrations in minutes (5-1440) """ if method is not None: valid_methods = ["OFF", "TIME", "TEMP"] if method not in valid_methods: msg = f"method must be one of {valid_methods}" raise ValueError(msg) self.connection.write(f":SENS:CALI:METH {method}") if temperature_change is not None: self.connection.write(f":SENS:CALI:TEMP {temperature_change}") if timer_minutes is not None: self.connection.write(f":SENS:CALI:TIM {timer_minutes}")
def memory_clear(self) -> None:
    """Discard every measurement stored in the memory buffer."""
    command = ":MMEM:INIT"
    self.connection.write(command)
def memory_start_record(self) -> None:
    """Begin recording measured data into the memory buffer."""
    command = ":MMEM:OPEN"
    self.connection.write(command)
def memory_stop_record(self) -> None:
    """End recording of measured data into the memory buffer."""
    command = ":MMEM:CLOS"  # codespell:ignore
    self.connection.write(command)
def memory_get_data(self) -> np.ndarray:
    """Fetch the buffered measurement data from the instrument memory.

    Returns:
        Buffered data as numpy array (format defined in manual). A reply that parses as
        comma-separated numbers becomes a float array; any other string reply is returned
        as the ``uint8`` array of its encoded bytes; a reply that is not a string yields
        an empty array.
    """
    raw = self.connection.write(":MMEM:DATA?", read_echo=True, read_line=False)
    if not isinstance(raw, str):
        return np.array([])

    # The exact parsing depends on the data format; comma-separated values are
    # tried first as a placeholder.
    try:
        values = list(map(float, raw.strip().split(",")))
    except ValueError:
        # Not purely numeric text: hand back the raw bytes instead.
        return np.frombuffer(raw.encode(), dtype=np.uint8)
    return np.array(values)
def get_identity(self) -> str:
    """Query the instrument identification.

    Returns:
        The whitespace-stripped reply to ``*IDN?``, or ``"UNKNOWN"`` if the reply
        is not a string.
    """
    reply = self.connection.write("*IDN?", read_echo=True, read_line=True)
    if isinstance(reply, str):
        return reply.strip()
    return "UNKNOWN"
def get_error(self) -> tuple[int, str]:
    """Read one entry from the instrument error queue.

    Returns:
        Tuple of (error_number, error_message). ``(0, "No error")`` is returned when the
        reply is not a string or does not contain a comma.
    """
    reply = self.connection.write(":SYST:ERR?", read_echo=True, read_line=True)
    if not isinstance(reply, str):
        return 0, "No error"

    # Only the first comma separates number and message; later commas belong to the message.
    number, separator, message = reply.strip().partition(",")
    if not separator:
        return 0, "No error"
    return int(number.strip()), message.strip()
def save_settings(self) -> None:
    """Store the current instrument settings."""
    command = "*SAV"  # codespell:ignore
    self.connection.write(command)
def recall_settings(self) -> None:
    """Restore the previously saved instrument settings."""
    command = "*RCL"
    self.connection.write(command)
def reset_instrument(self) -> None:
    """Put the instrument back into its default state."""
    command = "*RST"
    self.connection.write(command)