"""HEROS drivers for teledyne cameras using the PVCam API."""
import math
import os
import threading
import time
from typing import Any
from heros.helper import log
from herosdevices.core.templates.camera import CameraTemplate
# The vendor bindings are optional at import time: without them the module can still be imported (e.g. for
# building documentation), the failure is logged and the names are bound to ``None`` placeholders.
try:
    from pyvcam import constants as pvc_constants  # type: ignore
    from pyvcam import pvc  # type: ignore
    from pyvcam.camera import Camera as PvcCamera  # type: ignore
except ModuleNotFoundError:
    pvc = None
    PvcCamera = None
    pvc_constants = None
    # Name the module that was actually imported above, so the hint points to the right package.
    log.exception("Could not import the 'pyvcam' python module, required for using teledyne cameras")
# Fallback values for the driver-level configuration keys. A ``timeout`` that is not greater than zero
# means that waiting for a frame never times out.
DEFAULT_CONFIG: dict[str, int] = {"frame_count": 10, "timeout": -1, "trigger_delay": 0}
[docs]
class PvcamCamera(CameraTemplate):
    """
    A class to interface with Teledyne PVCam Cameras.

    The class provides functionality to control and capture images from cameras.
    It manages camera configuration, acquisition, and data streaming compatible with the atomiq camera template.

    Args:
        cam_id: Name of the camera as known to the PVCam library, for example :code:`pvcamUSB_0`. If the camera
            can not be opened, the raised error lists the names of all available cameras.
        lib_path: Path to vendor library.
        config_dict: Dict of configuration values like shown in the json example below.
        default_config: Default key in :code:`config_dict` to use.

    Note:
        The vendor library must be obtained from the
        `official website <https://www.teledynevisionsolutions.com/products/pvcam-sdk-amp-driver/>`_
        Make sure that after the installation either the environment variable :code:`PVCAM_SDK_PATH` is set to the
        correct path or pass it via the :code:`lib_path` argument.

    Example:
        The class can be started with BOSS with the following example JSON dict::

            {
                "_id": "my-camera",
                "classname": "herosdevices.hardware.teledyne.PvcamCamera",
                "arguments": {
                    "cam_id": "pvcamUSB_0",
                    "default_config": "default",
                    "config_dict": {
                        "default": {
                            "roi": [
                                1500,
                                800,
                                700,
                                700
                            ],
                            "exp_mode": "Edge Trigger"
                        }
                    }
                }
            }

    Other than the special config keys listed below, the keys are property names of the pyvcam
    :code:`Camera` object. You can find the full list of available properties sometimes in the pyvcam documentation
    or by crawling `this file <https://github.com/Photometrics/PyVCAM/blob/master/src/pyvcam/camera.py>`_.

    **Special Config Keys:**

    - ``exposure_time``: exposure time in seconds. Automatically sets the nearest resolution (`exp_res` camera
      parameter). If you want to set the `exp_res` attribute manually, use the camera key `exp_time`
      (in the corresponding units) instead of `exposure_time`.
    - ``trigger_delay``: Is added to the trigger delay that can be obtained by :py:attr:`PvcamCamera.trigger_delay`
      and can be used for a cable and camera dependent delay.
    - ``frame_count``: number of frame buffers to set up, `-1` means infinite number of frames. Note, that this is
      currently not supported though. If you need it implemented via a circular buffer, please open an issue or get
      in contact with us.
    - ``timeout``: time in seconds to wait for a single frame before it is reported as timed out. Values that are
      not greater than zero disable the timeout.
    - ``post_processing``: A dict of post processing parameters (only if camera supports it), of the form
      :code:`{'feature_name': ['param_name', 'param_value']}`. See the documentation of the camera for details.
    """

    trigger_delay: float | None = None
    # Config keys that are consumed by this driver and must not be written to the pyvcam camera object.
    # ``roi``, ``exposure_time`` and ``post_processing`` have dedicated branches in ``_set_config``.
    _special_config_keys = ["frame_count", "timeout", "trigger_delay"]
    # Values of the pyvcam ``exp_res`` property used by the ``exposure_time`` special key.
    _EXP_RES_MILLISEC = 0
    _EXP_RES_MICROSEC = 1

    def __init__(
        self, cam_id: str, config_dict: dict, default_config: str | None = None, lib_path: str | None = None
    ) -> None:
        """Initialise the PVCam library and pass the configuration on to the camera template.

        Raises:
            ModuleNotFoundError: If the pyvcam bindings could not be imported.
            RuntimeError: If the library initialisation fails for any reason other than being initialised already.
        """
        # Copy the defaults so that changes made on one instance never leak into the module-level dict.
        self.default_config_dict = dict(DEFAULT_CONFIG)
        self.cam_id = cam_id
        if lib_path is not None:
            os.environ["PVCAM_SDK_PATH"] = lib_path
        if pvc is None:
            # Fail with a clear message instead of an AttributeError on the ``None`` placeholder.
            msg = "The 'pyvcam' python module is required for using teledyne cameras but could not be imported"
            raise ModuleNotFoundError(msg)
        try:
            pvc.init_pvcam()
        except RuntimeError as e:
            # An already initialised library is not an error, everything else is.
            if "PL_ERR_LIBRARY_ALREADY_INITIALIZED" not in str(e):
                raise
        super().__init__(config_dict, default_config)

    def get_pyvcam_property(self, name: str) -> Any:
        """Read a property value from the underlying pyvcam camera class.

        This is for example useful to get possible
        configuration options for the camera. Possible properties can be found in the source code of the
        `pyvcam driver <https://github.com/Photometrics/PyVCAM/blob/master/src/pyvcam/camera.py>`_.

        Args:
            name: name of the property as defined in the driver

        Returns:
            The current value of the attribute :code:`name` of the pyvcam camera object.
        """
        with self.get_camera() as camera:
            return getattr(camera, name)

    def _open(self) -> PvcCamera:
        """Open camera object. Do not call manually.

        Raises:
            RuntimeError: If the camera can not be created or opened. The message lists the available cameras.

        :meta private:
        """
        try:
            camera = PvcCamera(self.cam_id)
            camera.open()
        except RuntimeError as e:
            avail_devices = PvcCamera.get_available_camera_names()
            msg = f"Camera {self.cam_id} not found. Available cameras are: {avail_devices}"
            raise RuntimeError(msg) from e
        return camera

    @classmethod
    def _exposure_to_pvcam(cls, exposure_time: float) -> tuple[int, int]:
        """Convert an exposure time in seconds into an ``(exp_res, exp_time)`` pair for pyvcam.

        The time is rounded to whole microseconds first and all further decisions are taken on that integer. This
        avoids float modulo artefacts (e.g. ``0.3 % 0.1`` is not ``0``) and truncation by ``int()``. Millisecond
        resolution is used when no precision is lost by it, microsecond resolution otherwise.
        """
        exposure_us = round(exposure_time * 1e6)
        if exposure_us % 1000 == 0:
            return cls._EXP_RES_MILLISEC, exposure_us // 1000
        return cls._EXP_RES_MICROSEC, exposure_us

    def _set_config(self, config: dict) -> bool:
        """Write the given configuration to the camera.

        Post processing is reset first. The keys ``roi``, ``exposure_time`` and ``post_processing`` are translated
        to the matching pyvcam calls, keys in ``_special_config_keys`` are skipped and every other key is set as
        attribute on the pyvcam camera object. A failing attribute is logged and does not abort the configuration.

        Returns:
            Always :code:`True`.
        """
        with self.get_camera() as camera:
            camera.reset_pp()
            for key, value in config.items():
                if key == "roi":
                    camera.set_roi(*value)
                elif key == "exposure_time":
                    exp_res, exp_time = self._exposure_to_pvcam(value)
                    # the resolution must be set before the time that is expressed in it
                    camera.exp_res = exp_res
                    camera.exp_time = exp_time
                elif key == "post_processing":
                    for pkey, pvalue in value.items():
                        camera.set_post_processing_param(pkey, *pvalue)
                elif key not in self._special_config_keys:
                    try:
                        setattr(camera, key, value)
                    except RuntimeError as e:
                        log.error("Failed to set PVCam camera attribute %s to value %s: %s", key, value, e)
        return True

    def _acquisition_loop(self) -> None:
        """Poll frames from the camera and forward them until done or stopped.

        Runs in the acquisition thread. For every frame the camera is polled until a frame arrives, the stop event
        is set or the configured ``timeout`` (if greater than zero) has passed. Received frames are handed to
        :code:`acquisition_data` together with their frame index. At the end the acquisition is stopped.
        """
        config = self.get_configuration()
        frame_count = config["frame_count"]
        timeout = config["timeout"]
        frame_id = -1
        frames_received = 0
        with self.get_camera() as camera:
            try:
                while not self._stop_acquisition_event.is_set() and (frame_id < frame_count - 1 or frame_count < 0):
                    frame_id += 1
                    time_start = time.time()
                    while not self._stop_acquisition_event.is_set():
                        try:
                            img_array, _, _ = camera.poll_frame(timeout_ms=1000, copyData=False)
                            # NOTE(review): int16 wraps pixel values above 32767 if the camera delivers unsigned
                            # 16 bit data - confirm that this cast is intended.
                            # A fresh metadata dict per frame, consumers may keep a reference to it.
                            self.acquisition_data(img_array["pixel_data"].astype("int16"), {"frame": frame_id})
                            frames_received += 1
                            break
                        except RuntimeError:
                            time.sleep(1)
                            if 0 < timeout < (time.time() - time_start):
                                log.error("Frame number %s timed out", frame_id)
                                break
            except RuntimeError as e:
                log.error("Exception occurred in acquisition thread: %s", e)
                self._stop_acquisition_event.set()
        # Report the number of frames that actually arrived, timed out frames do not count.
        if frame_count > 0 and frames_received != frame_count:
            log.error("Incorrect number of received frames: %s instead of %s!", frames_received, frame_count)
        self.stop()

    def _arm(self) -> bool:
        """Arm the camera by starting the acquisition thread. Always returns :code:`True`."""
        self._start_acquisition_thread()
        return True

    def _start_acquisition_thread(self) -> None:
        """Start the acquisition thread."""
        config = self.get_configuration()
        frame_count = config["frame_count"]
        with self.get_camera() as camera:
            try:
                # NOTE(review): the sequence is started, the trigger delay is calculated and the sequence is
                # started again. Presumably the first call is needed so that the camera reports timing values
                # that are valid for the current configuration - confirm on hardware before simplifying.
                camera.start_seq(num_frames=frame_count)
                self._calculate_trigger_delay()
                camera.start_seq(num_frames=frame_count)
            except Exception as e:  # noqa: BLE001
                log.error("Starting PVCam acquisition failed: %s", e)
        log.debug("Starting acquisition thread")
        self._stop_acquisition_event.clear()
        self._acquisition_thread = threading.Thread(target=self._acquisition_loop)
        self._acquisition_thread.start()

    def _start(self) -> bool:
        """Send a software trigger to the camera. Always returns :code:`True`."""
        with self.get_camera() as camera:
            camera.sw_trigger()
        return True

    def _stop(self) -> bool:
        """Stop a running acquisition.

        Sets the stop event and, when called from another thread, waits for the acquisition thread to end. When
        called from the acquisition thread itself, :code:`acquisition_stopped` is called instead. Afterwards the
        sequence on the camera is finished.

        Returns:
            :code:`True` if nothing was running or stopping succeeded, :code:`False` if an exception occurred.
        """
        # Keep a local reference, the attribute is reset to ``None`` when the acquisition thread stops itself.
        thread = self._acquisition_thread
        if self.acquisition_running is False or thread is None:
            self.acquisition_running = False
            return True
        try:
            self._stop_acquisition_event.set()
            if threading.current_thread().ident != thread.ident:
                # Called from outside: wait until the acquisition loop has noticed the stop event.
                thread.join()
            else:
                self.acquisition_stopped()
            with self.get_camera() as camera:
                camera.finish()
            self._acquisition_thread = None
        except Exception as e:  # noqa: BLE001
            log.error("Exception (stop acquisition): %s", str(e))
            return False
        else:
            return True

    def _calculate_trigger_delay(self) -> None:
        """Calculate the trigger delay of the current exposure.

        In this universal driver class here, it only uses the
        user set :code:`trigger_delay` from the configuration dict.
        To include line scan times and the like, it must be implemented on a model to model basis as it strongly
        deviates.
        """
        self.trigger_delay = self.get_configuration()["trigger_delay"]

    def _get_status(self) -> dict:
        """Return the status dict, currently only containing ``acquisition_running``."""
        return {
            "acquisition_running": self.acquisition_running,
        }

    def _teardown(self) -> None:
        """Stop the acquisition and close the camera device if one is open."""
        log.debug("closing down connection to %s", self.cam_id)
        self.stop()
        if self._device is not None:
            self._device.close()

    def __del__(self) -> None:
        """Call teardown method on deletion."""
        self.teardown()
[docs]
class Kinetix(PvcamCamera):
    """Driver class for the Kinetix camera.

    Compared to :py:class:`PvcamCamera` this class only changes how the trigger delay is determined:

    - The value stored in :py:attr:`Kinetix.trigger_delay` is the sum of the user specified trigger delay and the
      delay from the trigger input to the `All Rows` condition.

    For more information refer to :py:class:`PvcamCamera`

    Note:
        It is well possible that other PVCam cameras that support the fake global shutter concept (e.g. defined
        `All Rows` condition) can also be used with this driver.
    """

    def _calculate_trigger_delay(self) -> None:
        """
        Calculate the delay from trigger input to the "All Rows" condition.

        This is for example the point in time at which a light source may be switched on. The delay depends on
        the read out port and on the number of pixel lines used in the exposure, see the official Kinetix manual
        for details.

        The result is the scan time plus the user specified `trigger_delay` config value. It is stored in
        :py:attr:`Kinetix.trigger_delay` in units of seconds.
        """
        settings = self.get_configuration()
        with self.get_camera() as cam:
            # Number of exposed lines: the last roi entry if a roi is configured, the full sensor otherwise.
            line_count = settings["roi"][3] if "roi" in settings else cam.sensor_size[1]
            if cam.readout_port != 0:
                delay_base = 1
                # NOTE(review): rounding up to an even line count is applied for ports other than 0 only.
                # The nesting was not recoverable from the reviewed listing - confirm against the manual.
                line_count = 2 * math.ceil(line_count / 2)
            else:
                delay_base = 2
            try:
                line_time_ns = cam.scan_line_time * (delay_base + cam.scan_line_delay) / 2
            except AttributeError:
                # Not every KINETIX offers a scan line delay, fall back to the bare line time.
                line_time_ns = cam.scan_line_time * (delay_base) / 2
            # nanoseconds per line times number of lines, converted to seconds
            self.trigger_delay = settings["trigger_delay"] + 1e-9 * line_time_ns * line_count