Source code for herosdevices.hardware.ids.peak_camera

"""Driver for cameras based on the ids peak library."""

import os
import threading
from typing import TYPE_CHECKING, cast

if TYPE_CHECKING:
    from types import ModuleType

from heros.helper import log

from herosdevices.core.templates.camera import CameraTemplate

try:
    from ids_peak import (
        ids_peak,  # type: ignore
        ids_peak_ipl_extension,  # type: ignore
    )
except ModuleNotFoundError:
    ids_peak = cast("ModuleType", None)
    ids_peak_ipl_extension = cast("ModuleType", None)
    log.exception("Could not import the 'ids_peak' or 'ids_peak_ipl' python module, required for using ids cameras")

DEFAULT_CONFIG = {
    "timeout": -1,
}


[docs] class PeakCompatibleCamera(CameraTemplate): """ A class to interface with IDS Peak cameras. The class provides functionality to control and capture images from IDS Peak cameras. It manages camera configuration, acquisition, and data streaming. Note: To access the camera as non-root user, you need to add the following udev rule to :code:`/etc/udev/rules.d/99-ids.rules`:: ATTRS{idVendor}=="1409", MODE="666" The vendor library must be obtained from the [official website](https://en.ids-imaging.com/download-peak.html). Download the `IDS peak archive file`, unpack it at move the content of `idspeak/ids/cti/` to a place where the user running the driver can access it. This path then needs to be specified via the `lib_path` argument (see example below) Note: The :code:`node_map` attribute provides access to the camera node map. If you need to set some special nodes you can use that. You can find the available nodes in the official API manuals: - https://en.ids-imaging.com/manuals/ids-peak/ids-peak-api-documentation/2.16.0/en/index.html - https://en.ids-imaging.com/manuals/ids-peak/ids-peak-user-manual/1.3.0/en/preface.html Example: The class can be started with BOSS with the following example JSON dict:: { "_id": "my_camera", "classname": "herosdevices.hardware.ids.PeakCompatibleCamera", "arguments": { "cam_id": "1410d4e7c3b5", "lib_path": "/opt/idspeak/ids/cti/", "default_config": "default", "config_dict": { "default": { "ExposureTime": 1000, "TriggerSelector": "ExposureStart", "TriggerMode": "On", "TriggerSource": "Software", "AcquisitionMode": "MultiFrame", "AcquisitionFrameCount": 5, } } } The keys in the config dictionary starting with a capital letter are nodes in the camera node map. """ _special_config_keys = ["timeout"] # non peak node map config keys _datastream: ids_peak.DataStream node_map: ids_peak.NodeMap | None = None def __init__( self, cam_id: str, config_dict: dict, default_config: str | None = None, lib_path: str | None = None, reset_to_continuous: bool = False, ) -> None: """Create a class to interface with IDS Peak cameras. Args: cam_id: Serial number of the cam. Can be obtained for example from the ids-peak GUI. Note, that the id is only the first part of the value shown in the GUI, the part including the device type is not unique and may not be added to :code:`cam_id`. lib_path: Path to vendor library. config_dict: Dict of configuration values like shown in the json example above. default_config: Default key in :code:`config_dict` to use. reset_to_continuous: If True, the camera will be set to continuous acquisition mode on teardown. """ self.cam_id = cam_id self.default_config_dict = DEFAULT_CONFIG self.reset_to_continuous = reset_to_continuous if lib_path is not None: os.environ["GENICAM_GENTL32_PATH"] = lib_path os.environ["GENICAM_GENTL64_PATH"] = lib_path ids_peak.Library.Initialize() super().__init__(config_dict, default_config)
[docs] def get_config_nodes(self, only_implemented: bool = True) -> dict: """Get all nodes from the camera in form of a dict including information about if they can be set/read. Args: only_implemented: Shows only nodes that are implemented on the attached camera. If False, returns all nodes the driver library knows """ node_dict = {} with self.get_camera(): for n in self.node_map.Nodes(): # type: ignore if not isinstance(n, ids_peak.CategoryNode): node_props = {} node_props["Display Name"] = n.DisplayName() node_props["ToolTip"] = n.ToolTip() node_props["IsReadable"] = n.IsReadable() node_props["IsWriteable"] = n.IsWriteable() if not only_implemented or n.IsImplemented(): node_dict[n.Name()] = node_props return node_dict
def _open(self) -> ids_peak.Device: """Open the connection to the device. Don't call directly. :meta private: """ device_manager = ids_peak.DeviceManager.Instance() device_manager.Update() for i, device in enumerate(device_manager.Devices()): if self.cam_id in device.ID(): camera = device_manager.Devices()[i].OpenDevice(ids_peak.DeviceAccessType_Control) break else: msg = f"Camera {self.cam_id} not found." raise RuntimeError(msg) self.node_map = camera.RemoteDevice().NodeMaps()[0] return camera def _set_config(self, config: dict) -> bool: with self.get_camera(): for key, value in config.items(): if key not in self._special_config_keys: node = self.node_map.FindNode(key) # type: ignore if type(node) is ids_peak.IntegerNode or type(node) is ids_peak.FloatNode: node.SetValue(value) elif type(node) is ids_peak.EnumerationNode: node.SetCurrentEntry(value) else: msg = f"The node {key} has an unrecognised node type: {type(node)}" raise ValueError(msg) return True def _get_expected_frames(self) -> int: current_config = self.get_configuration() if current_config["AcquisitionMode"] == "Continuous": return -1 if current_config["AcquisitionMode"] == "SingleFrame": return 1 # MultiFrame return current_config["AcquisitionFrameCount"] def _acquisition_loop(self) -> None: meta_data = {} frame_id = 0 config = self.get_configuration() expected_frames = self._get_expected_frames() if config["timeout"] == -1: timeout = ids_peak.Timeout.INFINITE_TIMEOUT else: timeout = ids_peak.Timeout(int(config["timeout"] * 1e3)) while not self._stop_acquisition_event.is_set() and (frame_id < expected_frames - 1 or expected_frames < 0): try: buffer = self._datastream.WaitForFinishedBuffer(timeout) if buffer.DeliveredDataSize() > 0: frame_id = buffer.FrameID() - 1 # FrameID starts at 1 meta_data["frame"] = frame_id meta_data["acquisition_time"] = buffer.Timestamp_ns() ipl_image = ids_peak_ipl_extension.BufferToImage(buffer) self._datastream.QueueBuffer(buffer) img_array = ipl_image.get_numpy_2D().copy() self.acquisition_data(img_array, meta_data) except (ids_peak.AbortedException, ids_peak.TimeoutException): log.exception("Error during acquisition.") self._stop_acquisition_event.set() if expected_frames > 0: if expected_frames > 0 and frame_id != expected_frames - 1: log.error("Incorrect number of received frames: %s instead of %s!", frame_id + 1, expected_frames) self.stop() def _arm(self) -> bool: with self.get_camera() as camera: self.node_map.FindNode("TLParamsLocked").SetValue(1) # type: ignore try: self._datastream = camera.DataStreams()[0].OpenDataStream() except ids_peak.BadAccessException: self._datastream = camera.DataStreams()[0].OpenedDataStream() self._datastream.Flush(ids_peak.DataStreamFlushMode_DiscardAll) for buffer in self._datastream.AnnouncedBuffers(): self._datastream.RevokeBuffer(buffer) payload_size = self.node_map.FindNode("PayloadSize").Value() # type: ignore max_buffer = self._datastream.NumBuffersAnnouncedMinRequired() * 5 for _ in range(max_buffer): buffer = self._datastream.AllocAndAnnounceBuffer(payload_size) self._datastream.QueueBuffer(buffer) self._datastream.StartAcquisition() self.node_map.FindNode("AcquisitionStart").Execute() # type: ignore self.node_map.FindNode("AcquisitionStart").WaitUntilDone() # type: ignore self._start_acquisition_thread() return True def _start_acquisition_thread(self) -> None: """Start the acquisition thread.""" log.debug("Starting acquisition thread") self._stop_acquisition_event.clear() self._acquisition_thread = threading.Thread(target=self._acquisition_loop) self._acquisition_thread.start() def _start(self) -> bool: self.node_map.FindNode("TriggerSoftware").Execute() # type: ignore return True def _stop(self) -> bool: if self.acquisition_running is False or self._acquisition_thread is None: self.acquisition_running = False return True try: self.node_map.FindNode("AcquisitionStop").Execute() # type: ignore if threading.current_thread().ident != self._acquisition_thread.ident: # Kill the datastream to exit out of pending `WaitForFinishedBuffer` calls self._datastream.KillWait() self._acquisition_thread.join() self.acquisition_stopped() else: # only set stop flag if closed from acquisition thread itself self._datastream.StopAcquisition(ids_peak.AcquisitionStopMode_Default) # Unlock parameters self.node_map.FindNode("TLParamsLocked").SetValue(0) # type: ignore self._acquisition_thread = None except Exception: # noqa: BLE001 log.exception("Exception during stop acquisition.") return False else: return True def _get_status(self) -> dict: return { "acquisition_running": self.acquisition_running, } def _teardown(self) -> None: if self.reset_to_continuous: self._set_config({"AcquisitionMode": "Continuous"}) log.debug(f"closing down connection to {self.cam_id}") self.stop() self.node_map = None self._device = None def __del__(self) -> None: """Call teardown method and close ids_peak library on delete.""" self.teardown() ids_peak.Library.Close()