Major refactor of Ameba Control Panel v3.1.0:

- Three-column layout: icon sidebar, config+history, log view
- Dracula PRO theme with light/dark toggle
- DTR/RTS GPIO control (replaces ASCII commands)
- Multi-CDC firmware support for AmebaSmart control device
- Dynamic DUT tabs with +/- management
- NN Model flash image support
- Settings dialog (Font, Serial, Flash, Command tabs)
- Background port scanning, debounced session store
- Adaptive log flush rate, format cache optimization
- Smooth sidebar animation, deferred startup
- pytest test framework with session/log/settings tests
- Thread safety fixes: _alive guards, parented timers, safe baud parsing
- Find highlight: needle-only highlighting with focused match color
- Partial line buffering for table output
- PyInstaller packaging with version stamp and module exclusions

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
252 lines
10 KiB
Python
252 lines
10 KiB
Python
from __future__ import annotations
|
|
|
|
import logging
|
|
from pathlib import Path
|
|
from typing import Callable, List, Optional, Tuple, TYPE_CHECKING
|
|
|
|
from PySide6.QtCore import QCoreApplication, QTimer
|
|
from PySide6.QtWidgets import QFileDialog, QLineEdit, QMessageBox
|
|
|
|
from ameba_control_panel import config
|
|
from ameba_control_panel.config import Direction, Mode
|
|
from ameba_control_panel.services.flash_runner import FlashRunner
|
|
|
|
if TYPE_CHECKING:
|
|
from ameba_control_panel.services.serial_service import SerialService
|
|
from ameba_control_panel.views.device_tab_view import DeviceTabView
|
|
|
|
logger = logging.getLogger(__name__)

# Built-in flash image slots. Each entry is a 5-tuple of
#   (label, checkbox attr, path edit attr, start-address edit attr,
#    end-address edit attr)
# where the label is used in warning dialogs and the remaining four items are
# attribute names looked up on the view with getattr() in
# FlashManager.run_flash.
FLASH_IMAGE_SLOTS = [
    ("Bootloader", "boot_flash_checkbox", "boot_path_edit", "boot_start_addr_edit", "boot_end_addr_edit"),
    ("Application", "app_flash_checkbox", "app_path_edit", "app_start_addr_edit", "app_end_addr_edit"),
    ("NN Model", "nn_flash_checkbox", "nn_bin_path_edit", "nn_start_addr_edit", "nn_end_addr_edit"),
]
|
|
|
|
|
|
class FlashManager:
    """Handles flash, mode switching, and DTR/RTS GPIO control.

    The manager reads its inputs from widgets on ``view``, drives the serial
    control lines through ``serial``, and launches the external flash helper
    script through a ``FlashRunner``.
    """

    def __init__(
        self,
        view: DeviceTabView,
        serial: SerialService,
        alive: Callable[[], bool],
        enqueue: Callable[[str, Direction], None],
        save_session: Callable[[], None],
    ) -> None:
        """Store collaborators and reset the flash/DTR state.

        Args:
            view: Tab view whose widgets supply ports, baud rate and images.
            serial: Serial service used for open/close and DTR/RTS control.
            alive: Callable checked before deferred work touches ``serial``
                or the log; deferred callbacks do nothing when it is falsy.
            enqueue: Callable that receives a log line and its direction.
            save_session: Callable invoked after user-visible settings change.
        """
        self.view = view
        self.serial = serial
        self._alive = alive
        self._enqueue = enqueue
        self._save_session = save_session
        self._flash_runner: Optional[FlashRunner] = None
        # Last port/baud reported via set_connected(); used to reopen the
        # UART after a flash that had to close it.
        self._connected_port: Optional[str] = None
        self._connected_baud: Optional[int] = None
        # Latch that rejects a new DTR/RTS sequence while one is in progress.
        self._dtr_busy = False

    @property
    def is_flashing(self) -> bool:
        """Return True while a flash runner exists and reports running."""
        return bool(self._flash_runner and self._flash_runner.isRunning())

    def set_connected(self, port: str, baud: int) -> None:
        """Remember the port and baud rate to use when reconnecting."""
        self._connected_port = port
        self._connected_baud = baud

    def browse_file(self, title: str, target: QLineEdit, file_filter: str = "Binary files (*);;All files (*)") -> None:
        """Show an open-file dialog and store the chosen path in ``target``.

        The session is saved only when the user actually picked a file.
        """
        path, _ = QFileDialog.getOpenFileName(self.view, title, "", file_filter)
        if path:
            target.setText(path)
            self._save_session()

    @staticmethod
    def normalized_addr(value: str) -> Optional[str]:
        """Return ``value`` as a hex string, or None when it is unusable.

        The text is parsed with ``int(text, 0)``, so prefixed literals such
        as ``0x10`` as well as plain decimal are accepted. Blank, unparsable
        and negative inputs all yield None.
        """
        text = value.strip()
        if not text:
            return None
        try:
            parsed = int(text, 0)
        except ValueError:
            return None
        return hex(parsed) if parsed >= 0 else None

    def build_flash_image(self, label: str, filepath: str, start_addr: str, end_addr: str) -> Optional[Tuple[str, str, str]]:
        """Validate one image entry and return ``(path, start, end)``.

        A warning dialog is shown and None is returned when the path is
        empty, does not refer to a regular file, either address is invalid,
        or the start address is not strictly below the end address.

        Args:
            label: Name used as the prefix of warning messages.
            filepath: Path text of the binary to flash.
            start_addr: Start address text (see ``normalized_addr``).
            end_addr: End address text (see ``normalized_addr``).
        """
        binary_text = filepath.strip()
        if not binary_text:
            QMessageBox.warning(self.view, "Flash", f"{label}: file path is empty.")
            return None
        binary = Path(binary_text)
        # is_file() rather than exists(): a directory is not a flashable image.
        if not binary.is_file():
            QMessageBox.warning(self.view, "Flash", f"{label}: file not found.")
            return None
        start = self.normalized_addr(start_addr)
        end = self.normalized_addr(end_addr)
        if not (start and end):
            QMessageBox.warning(self.view, "Flash", f"{label}: provide valid start/end addresses.")
            return None
        if int(start, 16) >= int(end, 16):
            QMessageBox.warning(self.view, "Flash", f"{label}: start must be less than end.")
            return None
        return str(binary), start, end

    def use_dtr_rts(self) -> bool:
        """Return True when no control port is selected in the view."""
        return not self.view.control_port_combo.currentData()

    def run_flash(self) -> None:
        """Collect the checked images and start the flash helper.

        Built-in slots come from ``FLASH_IMAGE_SLOTS``; additional entries
        come from ``view.get_custom_bins()``. The first invalid image aborts
        the whole operation (its warning has already been shown).
        """
        dut = self.view.dut_port_combo.currentData()
        ctrl = self.view.control_port_combo.currentData()
        baud = config.parse_baud(self.view.dut_baud_combo.currentText())
        if not dut:
            QMessageBox.warning(self.view, "Flash", "Please select a DUT COM port.")
            return

        images: List[Tuple[str, str, str]] = []
        for label, cb_attr, path_attr, start_attr, end_attr in FLASH_IMAGE_SLOTS:
            if not getattr(self.view, cb_attr).isChecked():
                continue
            image = self.build_flash_image(
                label,
                getattr(self.view, path_attr).text(),
                getattr(self.view, start_attr).text(),
                getattr(self.view, end_attr).text(),
            )
            if image is None:
                return
            images.append(image)

        # Dynamic custom binaries from Advanced section
        for entry in self.view.get_custom_bins():
            if not entry["checkbox"].isChecked():
                continue
            image = self.build_flash_image(
                "Custom",
                entry["path_edit"].text(),
                entry["start_edit"].text(),
                entry["end_edit"].text(),
            )
            if image is None:
                return
            images.append(image)

        if not images:
            QMessageBox.warning(self.view, "Flash", "Select at least one image checkbox to flash.")
            return

        args: List[str] = []
        for path, start, end in images:
            args.extend(["--multi-bin", path, start, end])
        args.extend(["-t", dut, "-B", str(baud)])
        rdev = self.view.rdev_path_edit.text().strip()
        if rdev:
            args.extend(["--profile", rdev])
        # With a control port the helper gets "-p <port>"; without one it is
        # told to use DTR/RTS instead.
        if ctrl:
            args.extend(["-p", ctrl])
        else:
            args.append("--dtr-rts")
        self._save_session()
        self._invoke_flash(args, close_uart=True, auto_normal=True)

    def run_mode(self, mode: Mode) -> None:
        """Switch the DUT to ``mode``.

        When no control port is selected and the serial link is open, the
        DTR/RTS sequence is driven directly. Otherwise the flash helper is
        invoked with the control port; if there is no control port either,
        a warning is shown.
        """
        if not self._alive():
            return
        dut = self.view.dut_port_combo.currentData()
        ctrl = self.view.control_port_combo.currentData()
        baud = config.parse_baud(self.view.dut_baud_combo.currentText())
        if not dut:
            QMessageBox.warning(self.view, "Mode", "Select a DUT COM port.")
            return
        if self.use_dtr_rts() and self.serial.is_connected():
            self._run_mode_dtr_rts(mode)
            return
        if not ctrl:
            QMessageBox.warning(self.view, "Mode", "No Control COM and DUT not connected for DTR/RTS.")
            return
        mode_args = {
            Mode.DOWNLOAD: ["--download-mode", "1"],
            Mode.NORMAL: ["--download-mode", "0"],
            Mode.RESET: ["--reset"],
        }
        args = mode_args[mode] + ["-t", dut, "-p", ctrl, "-B", str(baud)]
        self._invoke_flash(args, close_uart=False, auto_normal=False)

    def _run_mode_dtr_rts(self, mode: Mode) -> None:
        """Start the timed DTR/RTS sequence for ``mode``.

        Only the first step is performed here; later steps are scheduled
        with single-shot timers. A request arriving while a sequence is in
        progress is ignored.
        """
        if self._dtr_busy:
            return
        self._dtr_busy = True
        if mode == Mode.DOWNLOAD:
            self.serial.set_dtr(True)
            self.serial.set_rts(True)
            QTimer.singleShot(100, self._dtr_step2_download)
        elif mode == Mode.NORMAL:
            self.serial.set_dtr(False)
            self.serial.set_rts(True)
            QTimer.singleShot(200, self._dtr_pulse_reset)
        elif mode == Mode.RESET:
            self.serial.set_rts(True)
            QTimer.singleShot(50, self._dtr_pulse_reset)
        else:
            # No sequence was scheduled, so nothing would ever clear the
            # latch; release it so later requests are not blocked forever.
            self._dtr_busy = False

    def _dtr_step2_download(self) -> None:
        """Second step of the download sequence: set RTS False, then finish."""
        if not self._alive():
            # The sequence is abandoned; do not leave the latch set.
            self._dtr_busy = False
            return
        self.serial.set_rts(False)
        QTimer.singleShot(100, self._dtr_finish)
        self._enqueue("Download mode (DTR/RTS)", Direction.INFO)

    def _dtr_pulse_reset(self) -> None:
        """Second step of the normal/reset sequences: set RTS False, then finish."""
        if not self._alive():
            # The sequence is abandoned; do not leave the latch set.
            self._dtr_busy = False
            return
        self.serial.set_rts(False)
        QTimer.singleShot(100, self._dtr_finish)
        self._enqueue("Reset (DTR/RTS)", Direction.INFO)

    def _dtr_finish(self) -> None:
        """Final step: set RTS True (if still alive) and release the latch."""
        if self._alive():
            self.serial.set_rts(True)
        self._dtr_busy = False

    def _invoke_flash(self, args: List[str], close_uart: bool, auto_normal: bool) -> None:
        """Start the flash helper with ``args`` unless one is already running.

        Args:
            args: Command-line arguments passed to the helper script.
            close_uart: Close the serial link before starting the helper.
            auto_normal: Passed through to the finish handler, which then
                requests ``Mode.NORMAL`` after the helper completes.
        """
        if self.is_flashing:
            QMessageBox.information(self.view, "Flash", "Flash helper already running.")
            return
        # Validate the helper script before touching the UART: bailing out
        # after the port was closed would leave the user disconnected with
        # nothing scheduled to reconnect.
        flash_script = self._resolve_flash_script()
        if not flash_script.exists():
            QMessageBox.critical(self.view, "Flash", f"flash_amebapro3.py not found at {flash_script}")
            return
        # Captured before closing so the finish handler knows whether a
        # reconnect is needed.
        was_connected = self.serial.is_connected()
        if close_uart and was_connected:
            self.serial.close()
        self._flash_runner = FlashRunner(args, flash_script)
        self._flash_runner.output.connect(lambda line: self._enqueue(line, Direction.INFO))
        self._flash_runner.finished.connect(
            lambda code: self._on_flash_finished(code, close_uart, auto_normal, was_connected)
        )
        self._flash_runner.start()
        logger.info("Flash started: %s", " ".join(args))
        self._enqueue(f"Running flash helper with args: {' '.join(args)}", Direction.INFO)

    @staticmethod
    def _resolve_flash_script() -> Path:
        """Return the first existing helper-script candidate, resolved.

        Candidates are checked in order: ``Flash/`` under the application
        directory, the application directory itself, and ``Flash/`` two
        levels above this module. When none exists the first candidate is
        returned so the caller can report where the script was expected.
        """
        candidates = [
            Path(QCoreApplication.applicationDirPath()) / "Flash" / "flash_amebapro3.py",
            Path(QCoreApplication.applicationDirPath()) / "flash_amebapro3.py",
            Path(__file__).resolve().parents[2] / "Flash" / "flash_amebapro3.py",
        ]
        for path in candidates:
            if path.exists():
                return path.resolve()
        return candidates[0].resolve()

    def _on_flash_finished(self, code: int, close_uart: bool, auto_normal: bool, was_connected: bool) -> None:
        """Handle helper completion: log, drop the runner, reconnect, go normal.

        Nothing happens when the owner is no longer alive.
        """
        if not self._alive():
            return
        logger.info("Flash completed with code %d", code)
        self._enqueue(f"Flash helper completed with code {code}", Direction.INFO)
        if self._flash_runner:
            self._flash_runner.wait(100)
            self._flash_runner = None
        if close_uart and was_connected and self._connected_port and self._connected_baud:
            QTimer.singleShot(200, self._reconnect_after_flash)
        if auto_normal:
            # Scheduled later than the reconnect (500 ms vs 200 ms) so the
            # reopen is attempted first.
            QTimer.singleShot(500, lambda: self._alive() and self.run_mode(Mode.NORMAL))

    def _reconnect_after_flash(self) -> None:
        """Reopen the remembered port/baud if still alive and both are set."""
        if self._alive() and self._connected_port and self._connected_baud:
            self.serial.open(self._connected_port, self._connected_baud)

    def shutdown(self) -> None:
        """Request interruption of a running flash helper and wait up to 5 s."""
        if self._flash_runner and self._flash_runner.isRunning():
            self._flash_runner.requestInterruption()
            self._flash_runner.wait(5000)
|