Source code for herosdevices.core.bus.serial
"""Primitive functions and classes representing serial connections."""
import time
from collections.abc import Iterator
from contextlib import contextmanager
from typing import TYPE_CHECKING, cast
if TYPE_CHECKING:
from types import ModuleType
from heros.helper import log
SERIAL_DELAYS_DEFAULT: dict = {"write": 0.001, "read_echo": 0.001, "flush": 0.001}
try:
import serial
except ModuleNotFoundError:
serial = cast("ModuleType", None)
[docs]
class SerialConnection:
"""A class to manage serial communication connections.
This class provides functionality to handle serial connections including opening/closing connections, reading
data, and writing data.
Args:
address: The address of the serial socket, something like /dev/ttyUSB0.
baudrate: The baud rate for the serial communication.
read_line_termination: character that terminates a line when reading from the device.
write_line_termination: character that terminates a line when wrtiting to the device.
keep_alive: Flag indicating whether to keep the connection open between operations.
delays: Dictionary containing delay times for in between serial operations.
Default serial delays for serial devices. Available keys are:
* "write": Time to wait after writing a command to the device.
* "read_echo": Time to wait before reading a response from the device.
* "flush": Time to wait after flushing the device.
:py:data:`herosdevices.core.bus.SERIAL_DELAYS_DEFAULT` sets the default delays.
**kwargs: Keyword arguments passed to :code:`serial.serial_for_url`
"""
def __init__(
self,
address: str,
baudrate: int = 115200,
write_line_termination: bytes = b"",
read_line_termination: bytes = b"\n",
keep_alive: bool = True,
delays: dict | None = None,
timeout: float = 0.1,
**kwargs,
) -> None:
self.address = address
self.baudrate = baudrate
self.read_line_termination = read_line_termination
self.write_line_termination = write_line_termination
if serial is None:
raise ModuleNotFoundError(
"Could not import the 'pyserial' python module, serial devices will not be available"
)
self.connection = serial.serial_for_url(address,
do_not_open=True,
baudrate=baudrate,
timeout=timeout,
**kwargs
)
self.keep_alive = keep_alive
self.delays = SERIAL_DELAYS_DEFAULT | delays if delays else SERIAL_DELAYS_DEFAULT
[docs]
@contextmanager
def operation(self) -> Iterator[None]:
"""Context manager for handling serial connection operations.
Ensures the serial connection is open before performing operations and closes it afterward
if :code:`self.keep_alive` is False.
Yields:
Yields control back to the caller for performing operations within the context.
"""
if not self.connection.isOpen():
self.connection.open()
try:
yield
finally:
if not self.keep_alive:
self.connection.close()
[docs]
def wait(self, operation: str) -> None:
"""Introduce a (synchronous) delay based on the specified operation type.
Args:
operation: The operation type. For possible types see :code:`SERIAL_DELAYS_DEFAULT`.
"""
time.sleep(self.delays[operation])
[docs]
def read(self) -> str | None:
"""Read all available data from the serial connection and decodes it into a string.
Returns:
The decoded data as string, or None if an error occurs.
"""
with self.operation():
try:
read = b""
while bytes_to_read := self.connection.in_waiting > 0:
read += self.connection.read(bytes_to_read)
return read.decode("ascii")
except Exception: # noqa: BLE001
log.exception(
"Error reading from serial connection at %s",
self.address,
)
return None
[docs]
def read_line(self) -> str | None:
"""Read a single line from the serial connection.
Returns:
The decoded line as string, or None if an error occurs.
"""
with self.operation():
try:
read = self.connection.read_until(self.read_line_termination)
return read.decode("ascii")
except Exception: # noqa: BLE001
log.exception(
"Error reading from serial connection at %s",
self.address,
)
return None
[docs]
def write(self, message: str, flush: bool = True, read_echo: bool = False, read_line: bool = False) -> None | str:
"""Write a message to the serial connection.
The `self.write_line_termination` is automatically appended to the message if it is not already present.
Args:
message: The message to be written to the serial connection.
flush: If True, flushes the written data immediately. Defaults to True.
read_echo: If True, reads back the echo after writing. Defaults to False.
read_line: If True, data is read until `self.read_line_termination` occurs in the data. Otherwise all
available data is read.
Returns:
If read_echo is True, returns the echo read from the connection as string; otherwise returns None.
"""
with self.operation():
enc_msg = message.encode("ascii")
self.connection.reset_input_buffer()
if not enc_msg.endswith(self.write_line_termination):
enc_msg += self.write_line_termination
self.connection.write(enc_msg)
self.wait("write")
if flush:
self.connection.flush()
self.wait("flush")
if read_echo:
if read_line:
read = self.read_line()
else:
self.wait("read_echo")
read = self.read()
return read
return None