Source code for herosdevices.hardware.santec.slm200

"""HERO driver and functions to control a Santec SLM."""

import logging
import struct
import sys
import time
from typing import Any

import numpy as np
import numpy.typing as npt

from herosdevices.core.templates import DisplayDeviceTemplate
from herosdevices.helper import log, mark_driver

try:
    import ftd3xx

    if sys.platform == "win32":
        import ftd3xx._ftd3xx_win32 as _ft
    elif sys.platform.startswith("linux"):
        import ftd3xx._ftd3xx_linux as _ft
except (ImportError, OSError) as ex:
    msg = "\n".join(
        [
            "Missing ftd3xx python module.",
            "Get it from\n\t>>>https://www.ftdichip.com/Support/SoftwareExamples/FT60X.htm\nand install it.",
            "Make sure to also install D3XX driver for your platform. Find it under:",
            "\t>>>https://www.ftdichip.com/Drivers/D3XX.htm",
            "Import Exception:",
            f"\t{ex}",
        ]
    )
    log.error(msg)


FPGA_STATUS = {
    "OK": "Command successful",
    "BS": "Boot in progress",
    "NG": "Error: Could not execute command",
    "NO RESPONSE": "No answer received from the FPGA (yet)",
}

VIDEO_MODE = {"USB": 0, "DVI": 1}  # DVI Mode is not supported by the driver yet!

# FPGA request types
CMD_CONTROL = {"id": 1, "magic_length": 0xFF}
CMD_STATUSREQUEST = {"id": 2, "magic_length": 0}
CMD_STATUSRESPONSE = {"id": 3, "magic_length": 0xFF}
CMD_IMAGEDATA = {"id": 4, "magic_length": 0x03BF}


def _checksum_image(image: npt.NDArray[int]) -> int:
    """Calculate checksum by interpreting an image as as sequence of UNSIGNED SHORTS (2 Byte) and summing over those.

    Note: for large images, this checksum can easily overrun the range of an int! But this is intended on the SLM200.

    Args:
        image: The image.

    Returns:
        The checksum.
    """
    return int(image.sum())


[docs] @mark_driver(name="SLM-200", info="120Hz Full-HD LCOS SLM", state="beta", requires={"ftd3xx": "ftd3xx"}) class SLM200(DisplayDeviceTemplate): """ Driver for a Santec SLM200 spatial light modulator. The images can be transferred to the SLM200 via USB or DVI. This driver, however, only supports communication via USB. Since image upload via USB takes roughly 150ms, this driver is limited to around 6Hz of refresh rate. Precise and externally synchronized timing is only possible in USB mode. In USB mode (also called memory mode) images can be pushed via USB and stored in one of 128 memory slots. The image slot displayed on the SLM can be set randomly by the driver or an advance from one slot to the next can be triggered by a software trigger, and internal timer, or a logic signal (Trigger IN SMB jack). The SLM contains an FPGA that the computer communicates with by writing to an output FIFO/pipe and reading from an input FIFO/pipe. These two FIFOs are transparently accessed through the FTDI driver (D3xx) which provides methods to read and write these FIFOs/pipes. This driver is reverse engineered by looking at the data the vendor software sends via USB. This was achieved by looking at the API calls to the functions FT_WritePipe and FT_ReadPipe in the D3XX.dll which handles the communication with the FTDI USB3.0 chip (FT601). """ MIN_SLOT: int = 1 MAX_SLOT: int = 128 IMAGE_SIZE: tuple[int] = (1200, 1920) default_config_dict: dict = { "video_mode": VIDEO_MODE["USB"], "trigger_mode": "none", "streaming_mode": False, "request_sleep": 0.015, } firmware_versions = ["2018021001", "2018021101", "2018020001", "2017080002", "2015010001"] trigger_functions = { "none": {"start": None, "stop": None}, "manual": { "start": lambda x: SLM200._trigger_software(x, True), "stop": lambda x: SLM200._trigger_software(x, False), }, "external": { "start": lambda x: SLM200._trigger_external(x, True), "stop": lambda x: SLM200._trigger_external(x, False), }, "auto": {"start": lambda x: SLM200._trigger_auto(x, True), "stop": lambda x: SLM200._trigger_auto(x, False)}, } def __init__( self, serial_number: str, config_dict: dict, default_config: str | None = None, keep_device_open: bool = True, channel: int = 0, ) -> None: self.serial_number = serial_number self.channel = channel if sys.platform.startswith("linux"): self.endpoints = {"out": channel, "in": channel} else: self.endpoints = {"out": 2 + channel, "in": 130 + channel} self.firmware: str | None = None self.streaming_mode: bool = False self.request_sleep: float = 0.015 super().__init__(config_dict, default_config, keep_device_open) @staticmethod def _handle_return_status(status: str) -> bool: """ Check if the return status is OK, else report the status. Args: status: The return status Returns: True if return status is OK, else False """ if status in FPGA_STATUS: if status == "OK": log.spam(FPGA_STATUS[status]) return True log.error(FPGA_STATUS[status]) return False return False @staticmethod def _command_assemble(cmd_dict: dict, payload: bytes, seq_id: int = 0) -> bytes: if cmd_dict["magic_length"] == 0xFF: # noqa:PLR2004 padded_payload = bytearray(1024) padded_payload[: len(payload)] = payload payload = padded_payload return struct.pack(">4sHxBIxxxx", b"SEND", cmd_dict["magic_length"], cmd_dict["id"], seq_id) + payload @staticmethod def _command_disassemble(buffer: bytes) -> dict: _, magic_length, idx = struct.unpack(">4sHxBxxxxxxxx", buffer[0:16]) return {"magic_length": magic_length, "id": idx, "payload": buffer[16:]}
[docs] def write(self, buffer: bytes) -> None: """Stream buffered data to the SLM.""" # enable streaming mode with self.get_device() as d3xx: if self.streaming_mode and sys.platform.startswith("linux"): d3xx.setStreamPipe(self.endpoints["out"], len(buffer)) d3xx.setStreamPipe(self.endpoints["in"], len(buffer)) bytes_written = d3xx.writePipe(self.endpoints["out"], buffer, len(buffer)) assert bytes_written == len(buffer) log.debug(f"OUT({self.endpoints['out']}) {buffer}") # disable streaming mode if self.streaming_mode and sys.platform.startswith("linux"): d3xx.clearStreamPipe(self.endpoints["out"]) d3xx.clearStreamPipe(self.endpoints["in"])
[docs] def write_command(self, cmd_dict: dict, payload: bytes, seq_id: int = 0) -> None: """Send a command to the SLM.""" # TODO: what are arguments doing? self.write(self._command_assemble(cmd_dict, payload, seq_id))
[docs] def read(self, length: int) -> bytes: """Read `length` bytes from the device.""" bytes_read = 0 buffer_read = b"" while bytes_read < length: with self.get_device() as d3xx: output = d3xx.readPipeEx(self.endpoints["in"], length - bytes_read, raw=True) bytes_read += output["bytesTransferred"] buffer_read += output["bytes"].decode("latin1") log.debug("IN(%i) %s", self.endpoints["in"], buffer_read) return buffer_read
[docs] def read_command(self, length: int) -> dict: """ Read a command response from the SLM. Args: length: Length of the response to read in byte """ return self._command_disassemble(self.read(length))
[docs] def request( self, cmd_dict: dict, payload: bytes, # TODO: make this also optional buffer: bytes | None = None, trials: int = 100, request_sleep: float | None = None, ) -> str: """Send a request to the SLM and wait for a response. Args: cmd_dict: Dictionary containing command metadata payload: Payload to send, unused when buffer is used buffer: Pre-assembled buffer to send trials: Number of trials to attempt request_sleep: Sleep time between retries Returns: Status """ if request_sleep is None: request_sleep = self.request_sleep if buffer is None: self.write_command(cmd_dict, payload) else: self.write(buffer) for _ in range(trials): status = self.status() if status != "NO RESPONSE": break time.sleep(request_sleep) # check if system is booting. If it is, wait 1s and try again if status in ("BS", "TO"): log.warning("The SLM is booting. Waiting for 1s and try again!") time.sleep(1) status = self.request(cmd_dict, payload, buffer, trials, request_sleep) return status
def _controlcommand(self, command: bytes, param_list: list[int] | None = None, longrunning: bool = False) -> str: """ Call a get or set function on the SLM depending on whether parameter param is given. Args: command: Two character command identifier for control commands. param_list: Parameters to set. empty, if a read command should be issued. longrunning: Flag if the command runs for a long(er) time. """ payload = command if param_list is not None: for param in param_list: payload += b" %i" % (param) payload += b"\x0d" return self.request(CMD_CONTROL, payload, request_sleep=1 if longrunning else self.request_sleep) def _open(self) -> Any: devices = ftd3xx.listDevices(_ft.FT_OPEN_BY_SERIAL_NUMBER) try: idx = [dev.decode("utf-8") for dev in devices].index(self.serial_number) except Exception as e: msg = f"Device with serial number {self.serial_number} not found" raise RuntimeError(msg) from e d3xx = ftd3xx.create(devices[idx], _ft.FT_OPEN_BY_SERIAL_NUMBER) # set input pipe timeout to 1 second if sys.platform == "win32": d3xx.setPipeTimeout(self.endpoints["in"], 1000) # clear read pipe. To check no residual response is in the input pipe, we issue a SN command # and repeat until we receive a valid serial number. d3xx.flushPipe(self.endpoints["in"]) for _ in range(10): firmware = self.firmware_serialnumber() if firmware not in self.firmware_versions: self.status() else: self.firmware = firmware return d3xx msg = f"Could not open device {self.serial_number}" raise RuntimeError(msg) def _teardown(self) -> None: try: self._device.close() except RuntimeError: log.exception("Error during closing device!") def _get_status(self) -> dict: """ Get the current status of the SLM. Returns: Status string. """ self.write_command(CMD_STATUSREQUEST, b"") res = self.read_command(16 + 1024) if res["id"] != 3 and res["magic_length"] != 255: # noqa: PLR2004 msg = f"Invalid status response: {res}" raise RuntimeError(msg) status_msg = res["payload"].decode("utf-8").split("\x00")[0].strip() return { "firmware_serialnumber": self.firmware_serialnumber(), "firmware": self.firmware, "status_msg": status_msg, } def _set_config(self, config: dict) -> bool: """Set the configuration from a dict.""" config_methods = ["video_mode", "contrast_level", "trigger_output", "trigger"] config_attrs = ["streaming_mode", "request_sleep"] for co, val in config.items(): if co in config_methods: getattr(self, co)(val) if co in config_attrs: setattr(self, co, val) log.debug(f"Setting {co}: {val}.") return True def _set_upload_slot(self, slot: int) -> bool: """ Set the memory slot to upload the next image to. Args: slot: Slot number. The lowest slot number is 1. Returns: Status of the operation. """ return self._handle_return_status(self._controlcommand(b"MI", [slot])) def _push_image(self, slot: int, image: npt.NDArray[np.uint16]) -> bool: """ Upload an image into a specified memory slot. Args: slot: Slot number (slot numbers range from 1 to 128) image: The image. Dimension must be (1200, 1920). Dtype must be 'u2' (`unit16`), otherwise is it casted to 'u2' which might have unpredictable results. Returns: Status of the operation """ assert self.MIN_SLOT <= slot <= self.MAX_SLOT, f"Slot {slot} is out of range!" if self._set_upload_slot(slot) != "OK": pass # raise Exception("Could not select image slot %i"%(slot)) #noqa: ERA001 TODO: is this needed? if image.shape != self.IMAGE_SIZE: msg = f"Image size {image.shape} is not correct. Must be {self.IMAGE_SIZE}" raise ValueError(msg) image = image.astype("u2") buffer = bytearray() for i, line in enumerate(image): linedata = bytearray(4096) linedata[: 2 * len(line)] = line.tobytes() buffer += self._command_assemble(CMD_IMAGEDATA, linedata, i) # set last four bytes of the buffer as uint32 checksum buffer[-4:] = struct.pack("I", _checksum_image(image) % 2**32) return self._handle_return_status(self.request(None, None, buffer=bytes(buffer))) def _display_slot(self, slot: int = 1) -> bool: """Set the memory slot to display on the SLM. Args: slot: Slot number. The lowest slot number is 1. Returns: Status of the operation. """ assert self.MIN_SLOT <= slot <= self.MAX_SLOT, f"Slot {slot} is out of range!" assert self._handle_return_status(self.contrast_level(0)), "Could not set contrast level to 0!" return self._handle_return_status(self._controlcommand(b"DS", [slot]))
[docs] def firmware_serialnumber(self) -> str: """Get the serial number of the santec firmware running on the SLM. This not the same as the serial number used to identify the FTDI chip. Returns: Serial number. """ return self._controlcommand(b"SN")
[docs] def video_mode(self, mode: int | str = "USB") -> bool: """ Set the video source the SLM draws the images from. Args: mode: Video source. 0 = USB/Memory, 1 = DVI. Returns: Status of the operation. """ if isinstance(mode, str): mode = VIDEO_MODE[mode.upper()] return self._handle_return_status(self._controlcommand(b"VI", [mode]))
[docs] def contrast_level(self, value: int) -> bool: """ Set the contrast/gamma level of the LCOS. Args: value: Value between 0 and 1023. Returns: Status of the operation. """ return self._handle_return_status(self._controlcommand(b"GS", [int(value)]))
[docs] def trigger_output(self, on: bool | None = None) -> bool: """Activate the trigger output of the SLM. This is especially useful in DVI mode orwhen software/automatic triggers are used. Args: on: Determines whether the trigger output should be activated If not set the current status is returned Returns: Status of the operation. """ if on is None: return self._controlcommand(b"TM") return self._handle_return_status(self._controlcommand(b"TM", [1 if on else 0]))
[docs] def do_phase_calibration(self, wavelength: int, max_phase: int = 2) -> bool: """ Calibrate the change of the lights phase as function of the bit value of each pixel. This can be used to adapt for different wavelength of the light and to change the maximum phase change for the maximum pixel value of 1023 (10bit). .. attention:: This command takes some minutes to finish! Args: wavelength: Wavelength of the indicent light in nm max_phase: Maximum phase change in units of pi Returns: Status of the operation. """ return self._handle_return_status(self._controlcommand(b"WL", [wavelength, max_phase], longrunning=True))
def _trigger_external(self, on: bool = True) -> bool: """ Set SLM to react on external trigger. Args: on: Determines whether the hardware trigger input should be used. Returns: Status of the operation. """ return self._handle_return_status(self._controlcommand(b"TI", [1 if on else 0])) def _trigger_auto(self, on: bool = True, period: float = 2) -> None: """ Set SLM to trigger periodically. Args: on: Determines whether the auto trigger should be used period: Trigger period time in s. Can range from 1/60 to 2 Returns: Status of the operation. """ if on: self._controlcommand(b"MW", [int(period * 60)]) self._controlcommand(b"DR", [1]) else: self._controlcommand(b"DB") self._controlcommand(b"MP", [1]) self._controlcommand(b"MW", [0]) def _trigger_software(self, on: bool) -> None: """ Set SLM to react on software trigger. Args: on: Determines whether the software trigger should be used Returns: Status of the operation """ self._controlcommand(b"TC", [1]) if on: self.trigger_software_fire()
[docs] def trigger_software_fire(self) -> bool: """ Fire a software trigger. Only works if the trigger mode was set to manual before. Returns: Status of the operation """ if self.trigger_mode == "manual": return self._handle_return_status(self._controlcommand(b"TS")) return False
[docs] def trigger(self, mode: str | None = None) -> str: """ Set or query the trigger mode. .. hint:: The SLM can be triggered from four different sources: * none : The image selected by :func:`display_slot` is displayed continuously \ and no trigger changes this. * manual : The image in the next slot is displayed when the software trigger \ :func:`trigger_software_fire` is called. * auto : The change to the next image is periodically triggered by an internal timer. * external : The change to the next image happens when a logic pulse on the trigger in \ SMB-connector is received. Args: mode: Name of the trigger mode to set. If no argument is given, the current trigger mode is returned Returns: The trigger mode """ if mode is None: return self.trigger_mode if mode in self.trigger_functions: if self.trigger_mode != "none": self.trigger_functions[self.trigger_mode]["stop"](self) if mode != "none": self.trigger_functions[mode]["start"](self) self.trigger_mode = mode return self.trigger_mode msg = f"Unknown trigger mode: {mode}" raise ValueError(msg)
if __name__ == "__main__": import numpy as np logger = logging.getLogger(__name__) logger.setLevel(logging.DEBUG) serialnumber = "000000000001" slm = SLM200(serialnumber) # define test images testimage_offset = np.ones(slm.IMAGE_SIZE, dtype=np.dtype("u2")) * 1023 testimage_box = np.zeros(slm.IMAGE_SIZE, dtype=np.dtype("u2")) testimage_box[300:900, 300:900] = 1023 testimage_sine = np.zeros(slm.IMAGE_SIZE, dtype=np.dtype("u2")) testimage_sine[:, :] = ( 1023 * np.sin(2 * np.pi * np.repeat([slm.IMAGE_SIZE[1]], slm.IMAGE_SIZE[0], axis=0) / 10) ** 2 ) # Get serial number logger.info("Serial number: %s", slm.firmware_serial_number()) logger.info("Change video mode to USB: %s", slm.video_mode(VIDEO_MODE["USB"])) logger.info("Set gamma to 0: %s", slm.contrast_level(0)) start_time = time.time() logger.info("Upload image to slot 1: %s", slm.push_image(1, testimage_box)) logger.info("Upload image to slot 2: %s", slm.push_image(2, testimage_box[:, ::-1])) logger.info("Uploading took %.1f ms", (time.time() - start_time) * 1e3) # Display image uploaded to slot 1 logger.info("Display slot 1: %s", slm.display_slot(1)) # test software trigger slm.trigger("manual") for _ in range(10): time.sleep(1) slm.trigger_software_fire() # test automatic trigger (timer based) slm.trigger("auto") time.sleep(10) # set trigger to external an leave it like that slm.trigger("external") # query trigger mode logger.info("Trigger mode: %s", slm.trigger())