2025-12-23 14:12:21 +08:00

92 lines
2.5 KiB
Python

import argparse
import sys
import threading
import time
def _load_serial():
try:
import serial # type: ignore
except ImportError:
sys.stderr.write("pyserial is not installed. Run: pip install pyserial\n")
sys.stderr.flush()
sys.exit(2)
return serial
def _reader_loop(ser, stop_event):
buffer = bytearray()
while not stop_event.is_set():
try:
data = ser.read(ser.in_waiting or 1)
except Exception as exc:
sys.stderr.write(f"serial read error: {exc}\n")
sys.stderr.flush()
break
if data:
buffer.extend(data)
while b"\n" in buffer:
line, _, rest = buffer.partition(b"\n")
buffer = bytearray(rest)
text = line.decode("utf-8", errors="replace").rstrip("\r")
sys.stdout.write(text + "\n")
sys.stdout.flush()
else:
time.sleep(0.01)
if buffer:
text = buffer.decode("utf-8", errors="replace").rstrip("\r")
sys.stdout.write(text + "\n")
sys.stdout.flush()
def main():
    """Bridge stdin/stdout to a serial port.

    Parses ``--port`` and ``--baud``, opens the port through pyserial, and
    starts a background thread running ``_reader_loop`` that echoes received
    serial lines to stdout. Each non-empty line read from stdin is sent to
    the port followed by CRLF. The loop ends on end of input, on the literal
    command ``__exit__``, or on the first write error.

    Returns:
        int: 1 if the port could not be opened, otherwise 0.
    """
    parser = argparse.ArgumentParser(description="UART bridge using pyserial.")
    parser.add_argument("--port", required=True, help="COM port name, e.g. COM3")
    parser.add_argument("--baud", type=int, required=True, help="Baud rate")
    args = parser.parse_args()

    serial = _load_serial()
    try:
        ser = serial.Serial(args.port, args.baud, timeout=0.1, write_timeout=1)
    except Exception as exc:
        sys.stderr.write(f"failed to open {args.port} @ {args.baud}: {exc}\n")
        sys.stderr.flush()
        return 1

    sys.stdout.write(f"[UART] OPEN {args.port} @ {args.baud}\n")
    sys.stdout.flush()

    stop_event = threading.Event()
    reader = threading.Thread(target=_reader_loop, args=(ser, stop_event), daemon=True)
    reader.start()

    try:
        for line in sys.stdin:
            command = line.rstrip("\n")
            if command == "__exit__":
                break
            if not command:
                # Blank input lines are not forwarded to the device.
                continue
            try:
                ser.write(command.encode("utf-8") + b"\r\n")
                ser.flush()
            except Exception as exc:
                sys.stderr.write(f"serial write error: {exc}\n")
                sys.stderr.flush()
                break
    finally:
        stop_event.set()
        # Let the reader finish its current read and print any buffered
        # partial line before the port is closed underneath it. Reads use a
        # 0.1 s timeout, so the thread should notice the stop request
        # quickly; the join timeout only bounds shutdown if a read hangs.
        reader.join(timeout=1.0)
        try:
            ser.close()
        except Exception:
            # Best-effort cleanup: a failure to close must not mask the
            # reason the bridge is shutting down.
            pass
    return 0
# Script entry point: use main()'s return value as the process exit status.
if __name__ == "__main__":
    sys.exit(main())