2026-03-29 16:49:10 +07:00

256 lines
10 KiB
Python

from __future__ import annotations
import logging
import sys
from pathlib import Path
from typing import Callable, List, Optional, Tuple, TYPE_CHECKING
from PySide6.QtCore import QCoreApplication, QTimer
from PySide6.QtWidgets import QFileDialog, QLineEdit, QMessageBox
from ameba_control_panel import config
from ameba_control_panel.config import Direction, Mode
from ameba_control_panel.services.flash_runner import FlashRunner
if TYPE_CHECKING:
from ameba_control_panel.services.serial_service import SerialService
from ameba_control_panel.views.device_tab_view import DeviceTabView
logger = logging.getLogger(__name__)
FLASH_IMAGE_SLOTS = [
("Bootloader", "boot_flash_checkbox", "boot_path_edit", "boot_start_addr_edit", "boot_end_addr_edit"),
("Application", "app_flash_checkbox", "app_path_edit", "app_start_addr_edit", "app_end_addr_edit"),
("NN Model", "nn_flash_checkbox", "nn_bin_path_edit", "nn_start_addr_edit", "nn_end_addr_edit"),
]
class FlashManager:
"""Handles flash, mode switching, and DTR/RTS GPIO control."""
def __init__(
self,
view: DeviceTabView,
serial: SerialService,
alive: Callable[[], bool],
enqueue: Callable[[str, Direction], None],
save_session: Callable[[], None],
) -> None:
self.view = view
self.serial = serial
self._alive = alive
self._enqueue = enqueue
self._save_session = save_session
self._flash_runner: Optional[FlashRunner] = None
self._connected_port: Optional[str] = None
self._connected_baud: Optional[int] = None
self._dtr_busy = False
@property
def is_flashing(self) -> bool:
return bool(self._flash_runner and self._flash_runner.isRunning())
def set_connected(self, port: str, baud: int) -> None:
self._connected_port = port
self._connected_baud = baud
def browse_file(self, title: str, target: QLineEdit, file_filter: str = "Binary files (*);;All files (*)") -> None:
dlg = QFileDialog(self.view, title, "", file_filter)
dlg.setFileMode(QFileDialog.FileMode.ExistingFile)
if sys.platform == "linux":
dlg.setOption(QFileDialog.Option.DontUseNativeDialog, True)
if dlg.exec() == QFileDialog.DialogCode.Accepted and dlg.selectedFiles():
target.setText(dlg.selectedFiles()[0])
self._save_session()
@staticmethod
def normalized_addr(value: str) -> Optional[str]:
text = value.strip()
if not text:
return None
try:
parsed = int(text, 0)
except ValueError:
return None
return hex(parsed) if parsed >= 0 else None
def build_flash_image(self, label: str, filepath: str, start_addr: str, end_addr: str) -> Optional[Tuple[str, str, str]]:
binary_text = filepath.strip()
if not binary_text:
QMessageBox.warning(self.view, "Flash", f"{label}: file path is empty.")
return None
binary = Path(binary_text)
if not binary.exists():
QMessageBox.warning(self.view, "Flash", f"{label}: file not found.")
return None
start = self.normalized_addr(start_addr)
end = self.normalized_addr(end_addr)
if not (start and end):
QMessageBox.warning(self.view, "Flash", f"{label}: provide valid start/end addresses.")
return None
if int(start, 16) >= int(end, 16):
QMessageBox.warning(self.view, "Flash", f"{label}: start must be less than end.")
return None
return str(binary), start, end
def use_dtr_rts(self) -> bool:
return not self.view.control_port_combo.currentData()
def run_flash(self) -> None:
dut = self.view.dut_port_combo.currentData()
ctrl = self.view.control_port_combo.currentData()
baud = config.parse_baud(self.view.dut_baud_combo.currentText())
if not dut:
QMessageBox.warning(self.view, "Flash", "Please select a DUT COM port.")
return
images: List[Tuple[str, str, str]] = []
for label, cb_attr, path_attr, start_attr, end_attr in FLASH_IMAGE_SLOTS:
if not getattr(self.view, cb_attr).isChecked():
continue
image = self.build_flash_image(
label, getattr(self.view, path_attr).text(),
getattr(self.view, start_attr).text(), getattr(self.view, end_attr).text(),
)
if image is None:
return
images.append(image)
# Dynamic custom binaries from Advanced section
for entry in self.view.get_custom_bins():
if not entry["checkbox"].isChecked():
continue
image = self.build_flash_image(
"Custom", entry["path_edit"].text(),
entry["start_edit"].text(), entry["end_edit"].text(),
)
if image is None:
return
images.append(image)
if not images:
QMessageBox.warning(self.view, "Flash", "Select at least one image checkbox to flash.")
return
args: List[str] = []
for path, start, end in images:
args.extend(["--multi-bin", path, start, end])
args.extend(["-t", dut, "-B", str(baud)])
rdev = self.view.rdev_path_edit.text().strip()
if rdev:
args.extend(["--profile", rdev])
if ctrl:
args.extend(["-p", ctrl])
else:
args.append("--dtr-rts")
self._save_session()
self._invoke_flash(args, close_uart=True, auto_normal=True)
def run_mode(self, mode: Mode) -> None:
if not self._alive():
return
dut = self.view.dut_port_combo.currentData()
ctrl = self.view.control_port_combo.currentData()
baud = config.parse_baud(self.view.dut_baud_combo.currentText())
if not dut:
QMessageBox.warning(self.view, "Mode", "Select a DUT COM port.")
return
if self.use_dtr_rts() and self.serial.is_connected():
self._run_mode_dtr_rts(mode)
return
if not ctrl:
QMessageBox.warning(self.view, "Mode", "No Control COM and DUT not connected for DTR/RTS.")
return
mode_args = {
Mode.DOWNLOAD: ["--download-mode", "1"],
Mode.NORMAL: ["--download-mode", "0"],
Mode.RESET: ["--reset"],
}
args = mode_args[mode] + ["-t", dut, "-p", ctrl, "-B", str(baud)]
self._invoke_flash(args, close_uart=False, auto_normal=False)
def _run_mode_dtr_rts(self, mode: Mode) -> None:
if self._dtr_busy:
return
self._dtr_busy = True
if mode == Mode.DOWNLOAD:
self.serial.set_dtr(True)
self.serial.set_rts(True)
QTimer.singleShot(100, self._dtr_step2_download)
elif mode == Mode.NORMAL:
self.serial.set_dtr(False)
self.serial.set_rts(True)
QTimer.singleShot(200, self._dtr_pulse_reset)
elif mode == Mode.RESET:
self.serial.set_rts(True)
QTimer.singleShot(50, self._dtr_pulse_reset)
def _dtr_step2_download(self) -> None:
if not self._alive():
return
self.serial.set_rts(False)
QTimer.singleShot(100, self._dtr_finish)
self._enqueue("Download mode (DTR/RTS)", Direction.INFO)
def _dtr_pulse_reset(self) -> None:
if not self._alive():
return
self.serial.set_rts(False)
QTimer.singleShot(100, self._dtr_finish)
self._enqueue("Reset (DTR/RTS)", Direction.INFO)
def _dtr_finish(self) -> None:
if self._alive():
self.serial.set_rts(True)
self._dtr_busy = False
def _invoke_flash(self, args: List[str], close_uart: bool, auto_normal: bool) -> None:
if self.is_flashing:
QMessageBox.information(self.view, "Flash", "Flash helper already running.")
return
was_connected = self.serial.is_connected()
if close_uart and was_connected:
self.serial.close()
flash_script = self._resolve_flash_script()
if not flash_script.exists():
QMessageBox.critical(self.view, "Flash", f"flash_amebapro3.py not found at {flash_script}")
return
self._flash_runner = FlashRunner(args, flash_script)
self._flash_runner.output.connect(lambda line: self._enqueue(line, Direction.INFO))
self._flash_runner.finished.connect(
lambda code: self._on_flash_finished(code, close_uart, auto_normal, was_connected)
)
self._flash_runner.start()
logger.info("Flash started: %s", " ".join(args))
self._enqueue(f"Running flash helper with args: {' '.join(args)}", Direction.INFO)
@staticmethod
def _resolve_flash_script() -> Path:
candidates = [
Path(QCoreApplication.applicationDirPath()) / "Flash" / "flash_amebapro3.py",
Path(QCoreApplication.applicationDirPath()) / "flash_amebapro3.py",
Path(__file__).resolve().parents[2] / "Flash" / "flash_amebapro3.py",
]
for path in candidates:
if path.exists():
return path.resolve()
return candidates[0].resolve()
def _on_flash_finished(self, code: int, close_uart: bool, auto_normal: bool, was_connected: bool) -> None:
if not self._alive():
return
logger.info("Flash completed with code %d", code)
self._enqueue(f"Flash helper completed with code {code}", Direction.INFO)
if self._flash_runner:
self._flash_runner.wait(100)
self._flash_runner = None
if close_uart and was_connected and self._connected_port and self._connected_baud:
QTimer.singleShot(200, self._reconnect_after_flash)
if auto_normal:
QTimer.singleShot(500, lambda: self._alive() and self.run_mode(Mode.NORMAL))
def _reconnect_after_flash(self) -> None:
if self._alive() and self._connected_port and self._connected_baud:
self.serial.open(self._connected_port, self._connected_baud)
def shutdown(self) -> None:
if self._flash_runner and self._flash_runner.isRunning():
self._flash_runner.requestInterruption()
self._flash_runner.wait(5000)