initial python serial repl

This commit is contained in:
2025-02-10 17:29:02 -05:00
parent d27bfc9734
commit afb2ea67fe
4 changed files with 114 additions and 0 deletions

1
.python-version Normal file
View File

@@ -0,0 +1 @@
3.12

9
pyproject.toml Normal file
View File

@@ -0,0 +1,9 @@
[project]
name = "off-grid-lora"
version = "0.1.0"
description = "Add your description here"
readme = "README.md"
requires-python = ">=3.12"
dependencies = [
"pyserial>=3.5",
]

82
serial_repl.py Normal file
View File

@@ -0,0 +1,82 @@
import serial
import threading
from time import sleep
class SelfHealingSerial():
"""A wrapper class for pyserial's Serial class
This reimplements the basic Serial functions but wraps them in _reestablish_and_retry.
This attempts a reconnect if the arduino is unplugged or otherwise unavailable.
Also this has defaults that work with the Adafruit M0 LoRa feather.
"""
def __init__(self, path: str = '/dev/ttyACM0', baudrate: int = 115200, timeout: int = 2):
self._path = path
self._baudrate = baudrate
self._timeout = timeout
self._connection = None
self._waiting_response = False
def _establish_connection(self):
"""Connects to the feather
"""
self._connection = serial.Serial(self._path, self._baudrate, timeout=self._timeout)
def _reestablish_and_retry(func):
"""If disconnect, keep trying every second
We know if there's a disconnect when the attempted function fails.
"""
def wrapper(self: "SelfHealingSerial", *args, **kwargs):
while True:
try:
if not self._connection:
self._establish_connection()
return func(self, *args, **kwargs)
except (IOError, serial.serialutil.SerialException):
print("Connection failed.")
sleep(1)
print("Reestablishing connection.")
self._connection = False
return wrapper
@_reestablish_and_retry
def read_all(self):
"""Read all data from the serial port
"""
return self._connection.read_all()
@_reestablish_and_retry
def write(self, bytestring: bytes):
"""Write to the serial port
"""
self._waiting_response = True
return self._connection.write(bytestring)
def print_serial(serial_obj: SelfHealingSerial):
"""Check for new serial output and print it to the terminal
"""
while True:
line = serial_obj.read_all()
if line:
serial_obj._waiting_response = False
print("\n" + line.decode() + "\n>>>", end="")
def write_serial(serial_obj: SelfHealingSerial):
"""If we're not waiting for a response, write data to the serial port, then wait for a response.
"""
while True:
if not serial_obj._waiting_response:
send_string = input("")
if send_string and send_string != '\n' and send_string != '\r\n':
serial_obj.write(send_string.encode('utf-8'))
else:
sleep(1)
if __name__ == "__main__":
connection: SelfHealingSerial = SelfHealingSerial('/dev/ttyACM0', 115200, timeout=1)
listen_thread = threading.Thread(target=print_serial, args=(connection,))
listen_thread.start()
print(">>> ", end="")
write_serial(connection)

22
uv.lock generated Normal file
View File

@@ -0,0 +1,22 @@
version = 1
requires-python = ">=3.12"
[[package]]
name = "off-grid-lora"
version = "0.1.0"
source = { virtual = "." }
dependencies = [
{ name = "pyserial" },
]
[package.metadata]
requires-dist = [{ name = "pyserial", specifier = ">=3.5" }]
[[package]]
name = "pyserial"
version = "3.5"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/1e/7d/ae3f0a63f41e4d2f6cb66a5b57197850f919f59e558159a4dd3a818f5082/pyserial-3.5.tar.gz", hash = "sha256:3c77e014170dfffbd816e6ffc205e9842efb10be9f58ec16d3e8675b4925cddb", size = 159125 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/07/bc/587a445451b253b285629263eb51c2d8e9bcea4fc97826266d186f96f558/pyserial-3.5-py2.py3-none-any.whl", hash = "sha256:c4451db6ba391ca6ca299fb3ec7bae67a5c55dde170964c7a14ceefec02f2cf0", size = 90585 },
]