"""UART bridge using pyserial.

Relays text between the standard streams and a serial port:

* every non-empty line read from stdin is sent to the port as UTF-8,
  terminated with CRLF;
* every line received from the port is decoded as UTF-8 (invalid bytes
  replaced) and printed to stdout.

Reading the line ``__exit__`` on stdin, or reaching end of input, stops the
bridge.
"""

import argparse
import sys
import threading
import time

# Stdin line that asks the bridge to shut down.
_EXIT_COMMAND = "__exit__"

# Upper bound, in seconds, on how long shutdown waits for the reader thread.
# The port is opened with a 0.1 s read timeout, so the reader normally notices
# the stop event well within this limit.
_READER_JOIN_TIMEOUT = 1.0


def _load_serial():
    """Import and return the ``serial`` module (pyserial).

    The import is deferred so a missing dependency yields a short hint on
    stderr and exit status 2 instead of a traceback.
    """
    try:
        import serial  # type: ignore
    except ImportError:
        sys.stderr.write("pyserial is not installed. Run: pip install pyserial\n")
        sys.stderr.flush()
        sys.exit(2)
    return serial


def _emit(text):
    """Write ``text`` plus a newline to stdout and flush immediately."""
    sys.stdout.write(text + "\n")
    sys.stdout.flush()


def _pop_complete_lines(buffer):
    """Remove every complete line from ``buffer`` and return them decoded.

    Args:
        buffer: ``bytearray`` of received bytes. It is modified in place so
            that only the bytes after the last ``\\n`` remain.

    Returns:
        The complete lines as ``str``, in order, without the ``\\n``
        terminator and with trailing ``\\r`` characters stripped. Invalid
        UTF-8 is replaced rather than raising.
    """
    # One split handles any number of buffered lines; the final element is
    # whatever follows the last newline (possibly empty), i.e. the remainder.
    pieces = buffer.split(b"\n")
    buffer[:] = pieces.pop()
    return [
        piece.decode("utf-8", errors="replace").rstrip("\r") for piece in pieces
    ]


def _reader_loop(ser, stop_event):
    """Print lines received from ``ser`` until ``stop_event`` is set.

    Args:
        ser: Open serial port object providing ``read()`` and ``in_waiting``.
        stop_event: ``threading.Event`` that requests the loop to finish.

    A read failure is reported on stderr and ends the loop. Any bytes still
    buffered when the loop ends are printed as a final line.
    """
    buffer = bytearray()
    while not stop_event.is_set():
        try:
            # Take everything already queued, or wait for at least one byte.
            data = ser.read(ser.in_waiting or 1)
        except Exception as exc:
            sys.stderr.write(f"serial read error: {exc}\n")
            sys.stderr.flush()
            break
        if not data:
            time.sleep(0.01)
            continue
        buffer.extend(data)
        for text in _pop_complete_lines(buffer):
            _emit(text)

    if buffer:
        # Flush an unterminated trailing line instead of dropping it.
        _emit(buffer.decode("utf-8", errors="replace").rstrip("\r"))


def main():
    """Run the bridge from the command line.

    Parses ``--port`` and ``--baud``, opens the port, starts the reader
    thread and forwards stdin lines to the port.

    Returns:
        0 after a normal shutdown, 1 if the port could not be opened or a
        write to it failed.
    """
    parser = argparse.ArgumentParser(description="UART bridge using pyserial.")
    parser.add_argument("--port", required=True, help="COM port name, e.g. COM3")
    parser.add_argument("--baud", type=int, required=True, help="Baud rate")
    args = parser.parse_args()

    serial = _load_serial()

    try:
        ser = serial.Serial(args.port, args.baud, timeout=0.1, write_timeout=1)
    except Exception as exc:
        sys.stderr.write(f"failed to open {args.port} @ {args.baud}: {exc}\n")
        sys.stderr.flush()
        return 1

    sys.stdout.write(f"[UART] OPEN {args.port} @ {args.baud}\n")
    sys.stdout.flush()

    stop_event = threading.Event()
    reader = threading.Thread(
        target=_reader_loop, args=(ser, stop_event), daemon=True
    )
    reader.start()

    exit_code = 0
    try:
        for line in sys.stdin:
            # Strip CR as well as LF so CRLF-terminated input neither hides
            # the exit command nor puts a doubled carriage return on the wire.
            command = line.rstrip("\r\n")
            if command == _EXIT_COMMAND:
                break
            if not command:
                continue
            try:
                ser.write(command.encode("utf-8") + b"\r\n")
                ser.flush()
            except Exception as exc:
                sys.stderr.write(f"serial write error: {exc}\n")
                sys.stderr.flush()
                exit_code = 1
                break
    finally:
        stop_event.set()
        # Let the reader finish while the port is still open: closing first
        # would make its pending read fail, and since the thread is a daemon
        # it could be killed before printing a buffered partial line.
        reader.join(timeout=_READER_JOIN_TIMEOUT)
        try:
            ser.close()
        except Exception:
            # Best-effort cleanup: the process is exiting and there is
            # nothing useful to do if closing the port fails.
            pass

    return exit_code


if __name__ == "__main__":
    raise SystemExit(main())