Source code for herosdevices.hardware.dummy.cameras
"""Dummy camera devices for use in testing. Do not use in production."""
import threading
import numpy as np
from numpy.typing import NDArray
from herosdevices.core.templates import CameraTemplate
from herosdevices.helper import log
[docs]
class ImageGeneratorDummy:
    """Act like a real camera handle by synthesizing frames in software.

    The object keeps a FIFO buffer of generated images. Frames are only
    produced while the device is armed; it disarms itself automatically once
    ``frame_count`` frames are sitting in the buffer.
    """

    def __init__(self) -> None:
        self.width = 800
        self.height = 600
        # Number of frames the buffer may hold before the device disarms itself.
        self.frame_count = 1
        self._is_armed = False
        # Oldest frame first; get_image() pops from the front.
        self._image_buffer: list[NDArray[np.uint16]] = []

    def set_shape(self, width: int = 800, height: int = 600) -> None:
        """Set the image shape.

        Args:
            width: Number of columns of subsequently generated images.
            height: Number of rows of subsequently generated images.
        """
        self.width = width
        self.height = height

    @staticmethod
    def generate_gaussian_image(
        w: int,
        h: int,
        amplitude: float = 65535,
        noise_level: float = 0.05,
        sigma: float = 0.3,
    ) -> NDArray[np.uint16]:
        """Generate a 2D Gaussian image with added random noise.

        The Gaussian is centered in the image, evaluated on a grid spanning
        ``[-1, 1]`` along both axes, and scaled to the specified amplitude.
        Additive Gaussian noise is applied and the result is clipped to the
        valid ``uint16`` range before the (truncating) integer conversion.

        Args:
            w: Width of the image.
            h: Height of the image.
            amplitude: Peak value of the Gaussian. Defaults to 65535.
            noise_level: Standard deviation of noise relative to
                the amplitude (e.g., 0.05 means ±5% noise). Defaults to 0.05.
            sigma: Standard deviation of the Gaussian in normalized grid
                units (the image spans -1..1). Defaults to 0.3.

        Returns:
            np.ndarray: A (h, w) image array of dtype `np.uint16`.

        Raises:
            ValueError: If ``sigma`` is not strictly positive.
        """
        if sigma <= 0:
            # A non-positive width would divide by zero / yield NaNs below.
            raise ValueError("sigma must be strictly positive")
        x = np.linspace(-1, 1, w)
        y = np.linspace(-1, 1, h)
        xv, yv = np.meshgrid(x, y)  # both grids have shape (h, w)
        gaussian = amplitude * np.exp(-(xv**2 + yv**2) / (2 * sigma**2))
        noise = np.random.default_rng().normal(loc=0, scale=noise_level * amplitude, size=(h, w))
        image = np.clip(gaussian + noise, 0, 65535)
        return image.astype(np.uint16)

    def arm(self) -> None:
        """Arm the dummy device so that triggers produce frames."""
        self._is_armed = True

    def trigger(self) -> None:
        """Append an image to the buffer if the device is armed.

        Triggers received while unarmed are ignored. After the frame that
        fills the buffer up to ``frame_count`` the device disarms itself.

        Raises:
            RuntimeError: If the device is armed but the buffer already holds
                ``frame_count`` frames.
        """
        if not self._is_armed:
            return
        if len(self._image_buffer) == self.frame_count:
            raise RuntimeError("Camera was armed and triggered while buffer was full!")
        self._image_buffer.append(self.generate_gaussian_image(self.width, self.height))
        # did we reach the max frame count?
        if len(self._image_buffer) == self.frame_count:
            self._is_armed = False

    def get_image(self) -> NDArray[np.uint16]:
        """Remove and return the oldest image from the buffer (FIFO order).

        Raises:
            RuntimeError: If the buffer is empty.
        """
        if self._image_buffer:
            return self._image_buffer.pop(0)
        raise RuntimeError("Image buffer empty!")

    def clear_buffer(self) -> None:
        """Clear the image buffer."""
        self._image_buffer = []

    def abort(self) -> None:
        """Abort the acquisition: disarm the device and drop buffered frames."""
        self._is_armed = False
        self.clear_buffer()
[docs]
class CameraDummy(CameraTemplate):
    """A dummy camera.

    Implements the device specific hooks of :class:`CameraTemplate` on top of
    :class:`ImageGeneratorDummy`. Frames are collected by a background thread
    which, when ``_auto_trigger`` is set, fires a software trigger for every
    frame itself.
    """

    _auto_trigger: bool = True
    default_config_dict: dict = {}

    def _open(self) -> ImageGeneratorDummy:
        """Device specific code to open the camera handler and return it."""
        return ImageGeneratorDummy()

    def _teardown(self) -> None:
        """Device specific code to release the camera handler and potentially de-initialize the API."""

    def _start(self) -> bool:
        """
        Device specific code to fire a software trigger on the dummy image generator.

        Returns:
            True if successful
        """
        with self.get_camera() as camera:
            camera.trigger()
        return True

    def _stop(self) -> bool:
        """
        Device specific code to abort the exposure and release queued buffers.

        The acquisition thread is stopped first, then the handler is aborted
        and its buffer cleared.

        Returns:
            True if successful
        """
        self._stop_acquisition_thread()
        with self.get_camera() as camera:
            camera.abort()
            camera.clear_buffer()
        return True

    def _get_status(self) -> dict:
        """
        Device specific code to get a dict with the current device status.

        Returns:
            A dict with the device status
        """
        return {"foo": "bar"}

    def _set_config(self, config: dict) -> bool:
        """
        Device specific code to configure camera features.

        Args:
            config: A valid configuration dict passed from :meth:`set_config`.
                The keys ``"width"`` and ``"height"`` are read.

        Returns:
            True if configuration is possible
        """
        with self.get_camera() as camera:
            # Keyword arguments guard against swapping the two dimensions;
            # ImageGeneratorDummy.set_shape takes (width, height) in that order.
            camera.set_shape(width=config["width"], height=config["height"])
        return True

    def _arm(self) -> bool:
        """
        Device specific code to arm the camera with the currently active configuration.

        Returns:
            True if arming was successful else False
        """
        try:
            with self.get_camera() as camera:
                camera.frame_count = self.get_configuration()["frame_count"]
                camera.arm()
            self._start_acquisition_thread()  # has to be implement for the specific device
        except Exception as e:  # noqa: BLE001
            log.error(e)
            return False
        return True

    def _start_acquisition_thread(self) -> None:
        """Start the acquisition thread."""
        log.debug("Starting acquisition thread")
        self._stop_acquisition_event.clear()
        self._acquisition_thread = threading.Thread(target=self._acquisition_loop)
        self._acquisition_thread.daemon = False  # daemon thread?
        self._acquisition_thread.start()
        self.acquisition_running = True
        self.acquisition_started(self.get_configuration())

    def _stop_acquisition_thread(self) -> None:
        """Stop the acquisition thread and wait for it to terminate."""
        # Work on a local reference: the acquisition loop resets the attribute
        # to None when it finishes, which may happen while we are in here.
        thread = self._acquisition_thread
        if thread is not None:
            # set the stop event and mark acquisition as not running
            self._stop_acquisition_event.set()
            self.acquisition_running = False
            # join the thread to wait for its termination
            thread.join(timeout=1)
            # check if the thread is still alive
            if thread.is_alive():
                log.warn("Acquisition thread did not terminate gracefully")
            else:
                log.debug("Acquisition thread stopped successfully")
        self._acquisition_thread = None
        self.acquisition_running = False

    def _acquisition_loop(self) -> None:
        """Grab all images from queued buffers and release buffers.

        Runs in the acquisition thread. Every frame is emitted through
        ``acquisition_data``; when all frames were collected the buffer is
        cleared and ``acquisition_stopped`` is emitted. If the stop event is
        set the loop returns right away without emitting ``acquisition_stopped``.
        """
        frame_count = self.get_configuration()["frame_count"]
        # Only the number of frames is needed afterwards, so count them instead
        # of keeping every image alive until the acquisition is over.
        received = 0
        with self.get_camera() as camera:
            for i in range(frame_count):
                # check if we need to stop (non-blocking check of the stop event)
                if self._stop_acquisition_event.is_set():
                    return
                log.debug(f"Waiting for frame {i} / {frame_count}")
                if self._auto_trigger:
                    camera.trigger()
                image = camera.get_image()
                received += 1
                # emit image via event
                self.acquisition_data(image, {"frame": i})
            log.debug("Stopping exposure")
            camera.clear_buffer()
        # cleanup
        self._acquisition_thread = None
        self.acquisition_running = False
        self.acquisition_stopped({"frames": received, "frame_count": frame_count})
        if received != frame_count:
            log.error(f"Incorrect number of received frames: {received} instead of {frame_count}!")
            self.reset()