Source code for herosdevices.core.bus.visa
"""Primitive functions and classes representing visa connections."""
from collections.abc import Iterator
from contextlib import contextmanager
from typing import TYPE_CHECKING, cast
if TYPE_CHECKING:
from types import ModuleType
from heros.helper import log
try:
import pyvisa
from pyvisa.rname import ResourceName
except ModuleNotFoundError:
pyvisa = cast("ModuleType", None)
[docs]
class VisaConnection:
"""A class to manage VISA communication connections.
This class provides functionality to handle visa connections including opening/closing connections, reading
data, and writing data.
Args:
resource: The resource name of the visa instrument (e.g. ``TCPIP::my_device_hostname::INSTR``).
keep_alive: Flag indicating whether to keep the connection open between operations.
**kwargs: Keyword arguments passed to :code:`pyvisa.open_resource`
"""
resource: str
def __init__(
self,
resource: str,
keep_alive: bool = True,
**kwargs,
) -> None:
self.resource = resource
if pyvisa is None:
raise ModuleNotFoundError(
"Could not import the 'pyvisa' python module, visa devices will not be available available"
)
try:
ResourceName.from_string(self.resource)
except ValueError:
log.exception(f"Invalid resource: {self.resource}")
self._pyvisa_rm: pyvisa.ResourceManager | None = None
self.connection: pyvisa.resources.MessageBasedResource | None = None
self.keep_alive = keep_alive
self._resource_kwargs = kwargs
def _open(self) -> pyvisa.resources.MessageBasedResource:
log.info(f"Connecting to {self.resource}")
if self._pyvisa_rm is None:
self._pyvisa_rm = pyvisa.ResourceManager("@py")
assert self._pyvisa_rm is not None
resource = self._pyvisa_rm.open_resource(self.resource, **self._resource_kwargs)
assert isinstance(resource, pyvisa.resources.MessageBasedResource)
return resource
def _teardown(self) -> None:
try:
assert self.connection is not None
self.connection.close()
except pyvisa.errors.Error:
log.exception("Error while closing.")
finally:
self.connection = None
[docs]
@contextmanager
def operation(self) -> Iterator[None]:
"""Context manager for handling visa connection operations.
Ensures the visa connection is open before performing operations and closes it afterward
if :code:`self.keep_alive` is False.
Yields:
Yields control back to the caller for performing operations within the context.
"""
if self.connection is None:
self.connection = self._open()
try:
yield
finally:
if not self.keep_alive:
self._teardown()
[docs]
def read(self) -> str | None:
"""Do device read query.
Returns:
The decoded data as string, or None if an error occurs.
"""
with self.operation():
assert self.connection is not None
try:
return self.connection.read().rstrip("\n")
except pyvisa.errors.VisaIOError:
log.exception("Error during query!")
self._teardown()
return None
[docs]
def write(self, message: str, read_echo: bool = False, *args, **kwargs) -> str | None: # noqa: ARG002
"""Write to the visa connection.
Args:
message: The message to be written to the serial connection.
read_echo: If True, reads back the echo after writing. Defaults to False.
"""
with self.operation():
assert self.connection is not None
try:
self.connection.write(message)
if read_echo:
return self.connection.read()
except pyvisa.errors.VisaIOError:
log.exception("Error during query!")
self._teardown()
return None