Source code for herosdevices.core.bus.serial
"""Primitive functions and classes representing serial connections."""
import time
from collections.abc import Iterator
from contextlib import contextmanager
from typing import TYPE_CHECKING, cast
if TYPE_CHECKING:
from types import ModuleType
from heros.helper import log
# Default pauses applied around serial operations. The values are handed to ``time.sleep`` and are
# therefore in seconds. Keys: "write" (after writing a message), "read_echo" (before reading back
# all available data after a write) and "flush" (after flushing the connection).
SERIAL_DELAYS_DEFAULT: dict = {"write": 0.001, "read_echo": 0.001, "flush": 0.001}
try:
import serial
except ModuleNotFoundError:
serial = cast("ModuleType", None)
[docs]
class SerialConnection:
"""A class to manage serial communication connections.
This class provides functionality to handle serial connections including opening/closing connections, reading
data, and writing data. It inherits from the base class Connection.
Args:
address: The address of the serial socket, something like /dev/ttyUSB0.
baudrate: The baud rate for the serial communication.
line_termination: character that terminates a line in the communication.
keep_alive: Flag indicating whether to keep the connection open between operations.
delays: Dictionary containing delay times for in between serial operations.
Default serial delays for serial devices. Available keys are:
* "write": Time to wait after writing a command to the device.
* "read_echo": Time to wait before reading a response from the device.
* "flush": Time to wait after flushing the device.
:py:data:`herosdevices.core.bus.SERIAL_DELAYS_DEFAULT` sets the default delays.
**kwargs: Keyword arguments passed to :code:`serial.serial_for_url`
"""
def __init__(
self,
address: str,
baudrate: int = 115200,
line_termination: bytes = b"\n",
keep_alive: bool = True,
delays: dict | None = None,
**kwargs,
) -> None:
self.address = address
self.baudrate = baudrate
self.line_termination = line_termination
if serial is None:
raise ModuleNotFoundError(
"Could not import the 'pyserial' python module, serial devices are not be available"
)
self.connection = serial.serial_for_url(address, do_not_open=True, baudrate=baudrate, **kwargs)
self.keep_alive = keep_alive
self.delays = SERIAL_DELAYS_DEFAULT | delays if delays else SERIAL_DELAYS_DEFAULT
[docs]
@contextmanager
def operation(self) -> Iterator[None]:
"""Context manager for handling serial connection operations.
Ensures the serial connection is open before performing operations and closes it afterward
if :code:`self.keep_alive` is False.
Yields:
Yields control back to the caller for performing operations within the context.
"""
if not self.connection.isOpen():
self.connection.open()
try:
yield
finally:
if not self.keep_alive:
self.connection.close()
[docs]
def wait(self, operation: str) -> None:
"""Introduce a (synchronous) delay based on the specified operation type.
Args:
operation: The operation type. For possible types see :code:`SERIAL_DELAYS_DEFAULT`.
"""
time.sleep(self.delays[operation])
[docs]
def read(self) -> str | None:
"""Read all available data from the serial connection and decodes it into a string.
Returns:
The decoded data as string, or None if an error occurs.
"""
with self.operation():
try:
read = b""
while bytes_to_read := self.connection.in_waiting > 0:
read += self.connection.read(bytes_to_read)
return read.decode("ascii")
except Exception: # noqa: BLE001
log.exception(
"Error reading from serial connection at %s",
self.address,
)
return None
[docs]
def read_line(self) -> str | None:
"""Read a single line from the serial connection.
Returns:
The decoded line as string, or None if an error occurs.
"""
with self.operation():
try:
read = self.connection.read_until(self.line_termination)
return read.decode("ascii")
except Exception: # noqa: BLE001
log.exception(
"Error reading from serial connection at %s",
self.address,
)
return None
[docs]
def write(self, message: str, flush: bool = True, read_echo: bool = False, read_line: bool = False) -> None | str:
"""Write a message to the serial connection.
Args:
message: The message to be written to the serial connection.
flush: If True, flushes the written data immediately. Defaults to True.
read_echo: If True, reads back the echo after writing. Defaults to False.
read_line: If True, data is read until `self.line_termination` occurs in the data. Otherwise all available
data is read.
Returns:
If read_echo is True, returns the echo read from the connection as string; otherwise returns None.
"""
with self.operation():
self.connection.reset_input_buffer()
self.connection.write(message.encode("ascii"))
self.wait("write")
if flush:
self.connection.flush()
self.wait("flush")
if read_echo:
if read_line:
read = self.read_line()
else:
self.wait("read_echo")
read = self.read()
return read
return None