# 2026-02-06 09:52:23 +08:00
#
# 175 lines
# 5.7 KiB
# Python

from __future__ import annotations
import queue
import threading
import time
from dataclasses import dataclass
from typing import Optional, Tuple
import serial
from PySide6.QtCore import QObject, QThread, Signal, Slot
from ameba_control_panel import config
from ameba_control_panel.services.line_parser import decode_line
@dataclass
class SerialState:
    """Snapshot of a connection's status, carried by ``status_changed`` signals."""

    # Name of the port this state refers to.
    port: str
    # Baud rate associated with the connection.
    baudrate: int
    # True while the port is open, False once it is closed or failed to open.
    connected: bool
    # Human-readable failure description; None when there is no error.
    error: Optional[str] = None
class _SerialWorker(QThread):
    """Thread that owns one serial port and moves lines to and from it.

    ``run`` opens the port, then alternates between sending queued payloads
    and reading one line (50 ms read timeout) until ``stop`` is called or an
    I/O error occurs.  Other threads interact with the worker only through
    ``write_text`` / ``write_bytes`` / ``stop`` and the two signals.
    """

    line_received = Signal(str, str)  # text, direction ("tx" or "rx")
    status_changed = Signal(object)  # SerialState

    def __init__(self, port: str, baudrate: int, parent: Optional[QObject] = None) -> None:
        """Store the connection parameters; the port is opened later in ``run``."""
        super().__init__(parent)
        self._port = port
        self._baudrate = baudrate
        # Items are (payload, log_tx); log_tx asks for a "tx" echo after sending.
        self._write_queue: "queue.SimpleQueue[Tuple[bytes, bool]]" = queue.SimpleQueue()
        # A dedicated stop flag (instead of a "running" flag raised inside
        # run()) ensures a stop() issued before the thread has been scheduled
        # is still honoured rather than overwritten.
        self._stop_requested = threading.Event()
        self._serial: Optional[serial.Serial] = None

    def run(self) -> None:
        """Open the port, pump I/O until stopped or failed, then close it.

        Emits a connected ``SerialState`` after a successful open and always
        finishes with a disconnected one; if opening or any later I/O raised,
        that state's ``error`` holds the exception text.
        """
        try:
            self._serial = serial.Serial(self._port, self._baudrate, timeout=0.05)
        except Exception as exc:  # noqa: BLE001
            self.status_changed.emit(SerialState(self._port, self._baudrate, False, str(exc)))
            return

        port = self._serial
        failure: Optional[str] = None
        try:
            self.status_changed.emit(SerialState(self._port, self._baudrate, True))
            while not self._stop_requested.is_set():
                self._flush_writes(port)
                self._read_line(port)
        except Exception as exc:  # noqa: BLE001
            # E.g. the device was unplugged: report the reason to listeners
            # instead of letting the exception escape the thread.
            failure = str(exc)
        finally:
            try:
                port.close()
            except Exception:  # noqa: BLE001
                # Best-effort: the port may already be unusable.
                pass
            self.status_changed.emit(SerialState(self._port, self._baudrate, False, failure))

    def _flush_writes(self, port: serial.Serial) -> None:
        """Send every payload currently queued, echoing logged ones as "tx"."""
        while True:
            try:
                payload, log_tx = self._write_queue.get_nowait()
            except queue.Empty:
                return
            port.write(payload)
            if not log_tx:
                continue
            try:
                text = payload.decode(errors="ignore").rstrip("\r\n")
            except Exception:  # noqa: BLE001
                text = repr(payload)
            self.line_received.emit(text, "tx")

    def _read_line(self, port: serial.Serial) -> None:
        """Read at most one line and emit it as "rx"; do nothing on timeout."""
        raw = port.readline()
        if not raw:
            return
        try:
            text = decode_line(raw).strip("\r\n")
        except Exception:  # noqa: BLE001
            # Undecodable input is still shown, as its repr.
            text = repr(raw)
        self.line_received.emit(text, "rx")

    @Slot(str)
    def write_text(self, text: str) -> None:
        """Queue *text* for sending, appending CRLF if missing; echoed as "tx"."""
        if not text.endswith("\r\n"):
            text = text + "\r\n"
        self._write_queue.put((text.encode("utf-8", errors="ignore"), True))

    @Slot(bytes)
    def write_bytes(self, payload: bytes) -> None:
        """Queue raw *payload* for sending unchanged, without a "tx" echo."""
        self._write_queue.put((payload, False))

    @Slot()
    def stop(self) -> None:
        """Ask the I/O loop to exit; takes effect on its next iteration."""
        self._stop_requested.set()
class _SyntheticWorker(QThread):
    """Hardware-free worker that emits numbered "SYN" lines at a fixed rate.

    Exposes the same signals and slots as the serial worker so the service
    can use either one; writes are echoed back instead of being sent.
    """

    line_received = Signal(str, str)  # text, direction ("tx" or "rx")
    status_changed = Signal(object)  # SerialState

    def __init__(self, rate_hz: float = 50.0, parent: Optional[QObject] = None) -> None:
        """Create the generator.

        Raises:
            ValueError: if *rate_hz* is not positive (the emit period would
                be undefined or negative).
        """
        if rate_hz <= 0:
            raise ValueError(f"rate_hz must be positive, got {rate_hz!r}")
        super().__init__(parent)
        self._rate_hz = rate_hz
        # Set by stop().  Using a stop flag (rather than a "running" flag
        # raised inside run()) keeps an early stop() from being lost, and
        # lets the loop wait on it so stop() interrupts the pause at once.
        self._stop_requested = threading.Event()
        self._counter = 0

    def run(self) -> None:
        """Emit one "SYN nnnnnn" line per period until ``stop`` is called."""
        period = 1.0 / self._rate_hz
        self.status_changed.emit(SerialState("synthetic", config.DEFAULT_BAUD, True))
        try:
            while not self._stop_requested.is_set():
                self._counter += 1
                self.line_received.emit(f"SYN {self._counter:06d}", "rx")
                # Interruptible pause: returns early as soon as stop() is called.
                self._stop_requested.wait(period)
        finally:
            self.status_changed.emit(SerialState("synthetic", config.DEFAULT_BAUD, False))

    @Slot(str)
    def write_text(self, text: str) -> None:
        """Report *text* as "tx" and echo it back as "rx" to simulate loopback."""
        clean = text.rstrip("\r\n")
        self.line_received.emit(clean, "tx")
        self.line_received.emit(f"ECHO: {clean}", "rx")

    @Slot(bytes)
    def write_bytes(self, payload: bytes) -> None:
        """Decode *payload* leniently, then report and echo it like text."""
        try:
            clean = payload.decode(errors="ignore")
        except Exception:  # noqa: BLE001
            clean = repr(payload)
        self.line_received.emit(clean, "tx")
        self.line_received.emit(f"ECHO: {clean}", "rx")

    @Slot()
    def stop(self) -> None:
        """Ask the generator loop to exit and wake it if it is pausing."""
        self._stop_requested.set()
class SerialService(QObject):
    """Front end for opening, closing and writing to one connection.

    Holds at most one worker thread at a time and re-emits that worker's
    ``line_received`` / ``status_changed`` signals as its own, so listeners
    connect once and keep working across reconnects.
    """

    line_received = Signal(str, str)  # text, direction
    status_changed = Signal(object)  # SerialState

    def __init__(self, parent: Optional[QObject] = None) -> None:
        """Create the service with no active worker."""
        super().__init__(parent)
        self._worker: Optional[QThread] = None

    def open(self, port: str, baudrate: int) -> None:
        """Close any current connection and start a worker for *port*.

        The port name ``"synthetic"`` (case-insensitive) starts the synthetic
        generator, which is created without *baudrate*.  The call returns
        right after starting the thread; the outcome is reported through
        ``status_changed``.
        """
        self.close()
        if port.lower() == "synthetic":
            worker: QThread = _SyntheticWorker()
        else:
            worker = _SerialWorker(port, baudrate)
        worker.line_received.connect(self.line_received)
        worker.status_changed.connect(self.status_changed)
        self._worker = worker
        worker.start()

    def close(self) -> None:
        """Stop the current worker, waiting up to one second for it to exit.

        Does nothing when no worker is active.  Shutdown is best-effort:
        errors raised while stopping are ignored.
        """
        worker, self._worker = self._worker, None
        if worker is None:
            return
        try:
            worker.stop()  # type: ignore[attr-defined]
            finished = worker.wait(1000)
        except Exception:  # noqa: BLE001
            return
        if not finished:
            # Destroying a QThread object whose thread is still running is
            # fatal in Qt.  Instead of dropping the last reference, hand the
            # worker to Qt (parented to this service) and let it delete
            # itself once run() has returned.
            worker.setParent(self)
            worker.finished.connect(worker.deleteLater)

    def write(self, text: str) -> None:
        """Send *text* through the active worker; no-op when there is none."""
        if self._worker:
            self._worker.write_text(text)  # type: ignore[attr-defined]

    def write_raw(self, data: bytes | str) -> None:
        """Send *data* through the active worker's raw-bytes path.

        A ``str`` is UTF-8 encoded first (unencodable characters dropped).
        No-op when there is no worker.
        """
        if isinstance(data, str):
            data = data.encode("utf-8", errors="ignore")
        if self._worker:
            self._worker.write_bytes(data)  # type: ignore[attr-defined]

    def is_connected(self) -> bool:
        """Return True while a worker thread exists and is running.

        This reflects thread liveness, not the last ``SerialState`` emitted.
        """
        return bool(self._worker and self._worker.isRunning())