import serial import threading from time import sleep class SelfHealingSerial(): """A wrapper class for pyserial's Serial class This reimplements the basic Serial functions but wraps them in _reestablish_and_retry. This attempts a reconnect if the arduino is unplugged or otherwise unavailable. Also this has defaults that work with the Adafruit M0 LoRa feather. """ def __init__(self, path: str = '/dev/ttyACM0', baudrate: int = 115200, timeout: int = 2): self._path = path self._baudrate = baudrate self._timeout = timeout self._connection = None self._waiting_response = False def _establish_connection(self): """Connects to the feather """ self._connection = serial.Serial(self._path, self._baudrate, timeout=self._timeout) def _reestablish_and_retry(func): """If disconnect, keep trying every second We know if there's a disconnect when the attempted function fails. """ def wrapper(self: "SelfHealingSerial", *args, **kwargs): while True: try: if not self._connection: self._establish_connection() return func(self, *args, **kwargs) except (IOError, serial.serialutil.SerialException): print("Connection failed.") sleep(1) print("Reestablishing connection.") self._connection = False return wrapper @_reestablish_and_retry def read_all(self): """Read all data from the serial port """ return self._connection.read_all() @_reestablish_and_retry def write(self, bytestring: bytes): """Write to the serial port """ self._waiting_response = True return self._connection.write(bytestring) def print_serial(serial_obj: SelfHealingSerial): """Check for new serial output and print it to the terminal """ while True: line = serial_obj.read_all() if line: serial_obj._waiting_response = False print("\n" + line.decode() + "\n>>>", end="") def write_serial(serial_obj: SelfHealingSerial): """If we're not waiting for a response, write data to the serial port, then wait for a response. """ while True: if not serial_obj._waiting_response: send_string = input("") if send_string and send_string != '\n' and send_string != '\r\n': serial_obj.write(send_string.encode('utf-8')) else: sleep(1) if __name__ == "__main__": connection: SelfHealingSerial = SelfHealingSerial('/dev/ttyACM0', 115200, timeout=1) listen_thread = threading.Thread(target=print_serial, args=(connection,)) listen_thread.start() print(">>> ", end="") write_serial(connection)