From afb2ea67fe27c169ec21f8f16039084803e12b2a Mon Sep 17 00:00:00 2001 From: ducoterra Date: Mon, 10 Feb 2025 17:29:02 -0500 Subject: [PATCH] initial python serial repl --- .python-version | 1 + pyproject.toml | 9 ++++++ serial_repl.py | 82 +++++++++++++++++++++++++++++++++++++++++++++++++ uv.lock | 22 +++++++++++++ 4 files changed, 114 insertions(+) create mode 100644 .python-version create mode 100644 pyproject.toml create mode 100644 serial_repl.py create mode 100644 uv.lock diff --git a/.python-version b/.python-version new file mode 100644 index 0000000..e4fba21 --- /dev/null +++ b/.python-version @@ -0,0 +1 @@ +3.12 diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..b651cd7 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,9 @@ +[project] +name = "off-grid-lora" +version = "0.1.0" +description = "Add your description here" +readme = "README.md" +requires-python = ">=3.12" +dependencies = [ + "pyserial>=3.5", +] diff --git a/serial_repl.py b/serial_repl.py new file mode 100644 index 0000000..c7a22fa --- /dev/null +++ b/serial_repl.py @@ -0,0 +1,82 @@ +import serial +import threading +from time import sleep + +class SelfHealingSerial(): + """A wrapper class for pyserial's Serial class + + This reimplements the basic Serial functions but wraps them in _reestablish_and_retry. + This attempts a reconnect if the arduino is unplugged or otherwise unavailable. + + Also this has defaults that work with the Adafruit M0 LoRa feather. + """ + def __init__(self, path: str = '/dev/ttyACM0', baudrate: int = 115200, timeout: int = 2): + self._path = path + self._baudrate = baudrate + self._timeout = timeout + self._connection = None + self._waiting_response = False + + def _establish_connection(self): + """Connects to the feather + """ + self._connection = serial.Serial(self._path, self._baudrate, timeout=self._timeout) + + def _reestablish_and_retry(func): + """If disconnect, keep trying every second + + We know if there's a disconnect when the attempted function fails. + """ + def wrapper(self: "SelfHealingSerial", *args, **kwargs): + while True: + try: + if not self._connection: + self._establish_connection() + return func(self, *args, **kwargs) + except (IOError, serial.serialutil.SerialException): + print("Connection failed.") + sleep(1) + print("Reestablishing connection.") + self._connection = False + return wrapper + + @_reestablish_and_retry + def read_all(self): + """Read all data from the serial port + """ + return self._connection.read_all() + + @_reestablish_and_retry + def write(self, bytestring: bytes): + """Write to the serial port + """ + self._waiting_response = True + return self._connection.write(bytestring) + +def print_serial(serial_obj: SelfHealingSerial): + """Check for new serial output and print it to the terminal + """ + while True: + line = serial_obj.read_all() + if line: + serial_obj._waiting_response = False + print("\n" + line.decode() + "\n>>>", end="") + +def write_serial(serial_obj: SelfHealingSerial): + """If we're not waiting for a response, write data to the serial port, then wait for a response. + """ + while True: + if not serial_obj._waiting_response: + send_string = input("") + if send_string and send_string != '\n' and send_string != '\r\n': + serial_obj.write(send_string.encode('utf-8')) + else: + sleep(1) + +if __name__ == "__main__": + connection: SelfHealingSerial = SelfHealingSerial('/dev/ttyACM0', 115200, timeout=1) + listen_thread = threading.Thread(target=print_serial, args=(connection,)) + listen_thread.start() + print(">>> ", end="") + + write_serial(connection) \ No newline at end of file diff --git a/uv.lock b/uv.lock new file mode 100644 index 0000000..25ca698 --- /dev/null +++ b/uv.lock @@ -0,0 +1,22 @@ +version = 1 +requires-python = ">=3.12" + +[[package]] +name = "off-grid-lora" +version = "0.1.0" +source = { virtual = "." } +dependencies = [ + { name = "pyserial" }, +] + +[package.metadata] +requires-dist = [{ name = "pyserial", specifier = ">=3.5" }] + +[[package]] +name = "pyserial" +version = "3.5" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/1e/7d/ae3f0a63f41e4d2f6cb66a5b57197850f919f59e558159a4dd3a818f5082/pyserial-3.5.tar.gz", hash = "sha256:3c77e014170dfffbd816e6ffc205e9842efb10be9f58ec16d3e8675b4925cddb", size = 159125 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/07/bc/587a445451b253b285629263eb51c2d8e9bcea4fc97826266d186f96f558/pyserial-3.5-py2.py3-none-any.whl", hash = "sha256:c4451db6ba391ca6ca299fb3ec7bae67a5c55dde170964c7a14ceefec02f2cf0", size = 90585 }, +]