"""Dual UART + J-Link debugger control panel for AmebaPro3 boards.""" from __future__ import annotations import datetime import os import re import shutil import subprocess import sys import tempfile import threading import time import tkinter as tk from collections import deque from tkinter import filedialog, messagebox, ttk from typing import Callable, Iterable, List, Optional import serial # type: ignore import serial.tools.list_ports # type: ignore try: from version_info import display_version as _DISPLAY_VERSION_VALUE, version as _CANONICAL_VERSION_VALUE except Exception: # pragma: no cover - optional metadata _DISPLAY_VERSION_VALUE = "" _CANONICAL_VERSION_VALUE = "" if getattr(sys, "frozen", False): APP_ROOT = os.path.dirname(sys.executable) else: APP_ROOT = os.path.dirname(os.path.abspath(__file__)) PYLINK_DLL_PATH = os.path.join(APP_ROOT, "pylink", "JLinkARM.dll") if os.path.exists(PYLINK_DLL_PATH): os.environ.setdefault("JLINKARM_DLL_PATH", PYLINK_DLL_PATH) try: import pylink # type: ignore except Exception: # pragma: no cover - pylink is optional pylink = None FLASH_PROFILE = "devices/Profiles/AmebaPro3_FreeRTOS_NOR.rdev" FLASH_IMAGE_DIR = "fw" FLASH_BAUDRATE = "1500000" FLASH_MEMORY_TYPE = "nor" FIRMWARE_COPY_MAP = [ ("fw_en.bin", "amebapro3_app.bin"), ("np_boot.bin", "amebapro3_boot.bin"), ] PORT_REFRESH_MS = 1000 LOG_HISTORY_LIMIT = 2000 HISTORY_LIMIT = 200 JLINK_SCRIPT_EXTS = {".jlink", ".jscr", ".jsf", ".txt"} GDB_SCRIPT_EXTS = {".gdb", ".gdbinit"} def _folder_version_guess() -> str: base_name = os.path.basename(APP_ROOT) match = re.search(r"_v([0-9A-Za-z._-]+)$", base_name) return match.group(1) if match else "" def _format_version_candidate(tag: str, trim_trailing_zero: bool = False) -> str: text = str(tag or "").strip() if not text: return "" if trim_trailing_zero: parts = [segment for segment in text.replace("_", ".").split(".") if segment] if not parts: return "" while len(parts) > 1 and parts[-1] == "0": parts.pop() return ".".join(parts) return text def _resolve_app_version() -> str: for candidate, trim in ( (_DISPLAY_VERSION_VALUE, False), (_CANONICAL_VERSION_VALUE, True), (_folder_version_guess(), False), ): formatted = _format_version_candidate(candidate, trim_trailing_zero=trim) if formatted: return formatted return "unknown" APP_VERSION = _resolve_app_version() def timestamp() -> str: now = datetime.datetime.now() return now.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] def safe_int(value) -> Optional[int]: try: return int(value) except Exception: return None class DevicePanel: """Single UART console with optional macro buttons and flashing control.""" def __init__( self, parent: tk.Widget, title: str, mode: str, default_baud: str, line_ending: str, console_height: int = 15, flash_handler: Optional[Callable[[], None]] = None, flash_button_label: str = "Flash Firmware", on_connect: Optional[Callable[[str], None]] = None, on_disconnect: Optional[Callable[[str], None]] = None, exclude_ports_provider: Optional[Callable[[], Iterable[str]]] = None, before_button_callback: Optional[Callable[[], None]] = None, ) -> None: self.mode = mode self.line_ending = line_ending or "" self.flash_handler = flash_handler self.flash_button_label = flash_button_label self.on_connect = on_connect self.on_disconnect = on_disconnect self.exclude_ports_provider = exclude_ports_provider self.before_button_callback = before_button_callback self.serial_port: Optional[serial.Serial] = None self.connected_port: Optional[str] = None self.read_thread: Optional[threading.Thread] = None self.read_thread_running = False self.msg_queue: deque[str] = deque() self.msg_lock = threading.Lock() self.log_line_count = 0 self.port_map: dict[str, str] = {} self.raw_history: List[str] = [] self.raw_history_index = 0 self.arduino_history: List[str] = [] self.arduino_history_index = 0 self.frame = ttk.LabelFrame(parent, text=title) self._build_widgets(console_height, default_baud) self.enable_controls(False) self.refresh_ports() self._schedule_port_refresh() # ------------------------------------------------------------------ # UI construction # ------------------------------------------------------------------ def _build_widgets(self, console_height: int, default_baud: str) -> None: conn_frame = ttk.Frame(self.frame) conn_frame.pack(fill="x", padx=5, pady=5) conn_frame.columnconfigure(1, weight=1) ttk.Label(conn_frame, text="Port:").grid(row=0, column=0, sticky="w") self.port_var = tk.StringVar() self.port_combo = ttk.Combobox(conn_frame, textvariable=self.port_var, state="readonly", width=80) self.port_combo.grid(row=0, column=1, columnspan=2, sticky="ew", padx=3) ttk.Label(conn_frame, text="Baud:").grid(row=1, column=0, sticky="w", pady=(8, 0)) self.baud_var = tk.StringVar(value=str(default_baud)) ttk.Entry(conn_frame, textvariable=self.baud_var, width=12).grid(row=1, column=1, sticky="w", padx=3) self.connect_button = ttk.Button(conn_frame, text="Connect", width=12, command=self.connect_serial) self.connect_button.grid(row=0, column=3, rowspan=2, padx=(8, 0)) self.status_var = tk.StringVar(value="Disconnected") self.status_label = ttk.Label(conn_frame, textvariable=self.status_var, foreground="red") self.status_label.grid(row=2, column=0, columnspan=4, sticky="w", pady=(6, 0)) console_frame = ttk.Frame(self.frame) console_frame.pack(fill="both", expand=True, padx=5, pady=5) console_frame.columnconfigure(0, weight=1) console_frame.rowconfigure(0, weight=1) self.log_text = tk.Text(console_frame, height=console_height, wrap="none") self.log_text.grid(row=0, column=0, sticky="nsew") self.log_text.configure(state="disabled") vscroll = ttk.Scrollbar(console_frame, orient="vertical", command=self.log_text.yview) vscroll.grid(row=0, column=1, sticky="ns") self.log_text.configure(yscrollcommand=vscroll.set) hscroll = ttk.Scrollbar(console_frame, orient="horizontal", command=self.log_text.xview) hscroll.grid(row=1, column=0, sticky="ew") self.log_text.configure(xscrollcommand=hscroll.set) button_row = ttk.Frame(console_frame) button_row.grid(row=2, column=0, columnspan=2, sticky="ew", pady=(5, 0)) button_row.columnconfigure(0, weight=1) clear_row = ttk.Frame(button_row) clear_row.grid(row=0, column=0, sticky="w") ttk.Button(clear_row, text="Clear Console", width=14, command=self.clear_console).grid(row=0, column=0, padx=(0, 5)) ttk.Button(clear_row, text="Save Log", width=14, command=self.save_console).grid(row=0, column=1) if self.flash_handler: self.flash_button = ttk.Button(button_row, text=self.flash_button_label, width=18, command=self.flash_handler) self.flash_button.grid(row=0, column=1, sticky="e") else: self.flash_button = None if self.mode == "raw": self._build_raw_controls(console_frame) else: self._build_arduino_controls(console_frame) def _build_raw_controls(self, console_frame: tk.Widget) -> None: history_frame = ttk.Frame(console_frame) history_frame.grid(row=3, column=0, columnspan=2, sticky="nsew", pady=(5, 0)) history_frame.rowconfigure(1, weight=1) history_frame.columnconfigure(0, weight=1) ttk.Label(history_frame, text="Input history:").grid(row=0, column=0, sticky="w") self.history_listbox = tk.Listbox(history_frame, height=8) self.history_listbox.grid(row=1, column=0, sticky="nsew") self.history_listbox.bind("", self.history_select) history_scroll = ttk.Scrollbar(history_frame, orient="vertical", command=self.history_listbox.yview) history_scroll.grid(row=1, column=1, sticky="ns") self.history_listbox.configure(yscrollcommand=history_scroll.set) input_frame = ttk.Frame(console_frame) input_frame.grid(row=4, column=0, sticky="ew", pady=(5, 0)) input_frame.columnconfigure(0, weight=1) self.raw_entry_var = tk.StringVar() self.raw_entry = ttk.Entry(input_frame, textvariable=self.raw_entry_var) self.raw_entry.grid(row=0, column=0, sticky="ew", padx=(0, 5)) self.raw_entry.bind("", lambda _evt: self.send_input_line()) self.raw_entry.bind("", self._raw_history_prev) self.raw_entry.bind("", self._raw_history_next) self.raw_send_button = ttk.Button(input_frame, text="Send", command=self.send_input_line) self.raw_send_button.grid(row=0, column=1) self.arduino_entry = None self.arduino_entry_var = None self.arduino_send_button = None def _build_arduino_controls(self, console_frame: tk.Widget) -> None: command_frame = ttk.Frame(console_frame) command_frame.grid(row=3, column=0, columnspan=2, sticky="ew", pady=(5, 0)) command_frame.columnconfigure((0, 1, 2), weight=1) commands = [ ("Download Mode", ("pg 1", "reset")), ("Normal Mode", ("pg 0", "reset")), ("RESET", ("reset",)), ] for idx, (label, payload) in enumerate(commands): ttk.Button( command_frame, text=label, command=lambda seq=payload: self.send_macro(seq), ).grid(row=0, column=idx, padx=5, sticky="ew") input_frame = ttk.Frame(console_frame) input_frame.grid(row=4, column=0, sticky="ew", pady=(5, 0)) input_frame.columnconfigure(0, weight=1) self.arduino_entry_var = tk.StringVar() self.arduino_entry = ttk.Entry(input_frame, textvariable=self.arduino_entry_var) self.arduino_entry.grid(row=0, column=0, sticky="ew", padx=(0, 5)) self.arduino_entry.bind("", lambda _evt: self.send_arduino_input()) self.arduino_entry.bind("", self._arduino_history_prev) self.arduino_entry.bind("", self._arduino_history_next) self.arduino_send_button = ttk.Button(input_frame, text="Send", command=self.send_arduino_input) self.arduino_send_button.grid(row=0, column=1) self.history_listbox = None self.raw_entry = None self.raw_entry_var = None self.raw_send_button = None # ------------------------------------------------------------------ # Console helpers # ------------------------------------------------------------------ def enable_controls(self, enabled: bool) -> None: state = "normal" if enabled else "disabled" targets = [self.raw_entry, self.raw_send_button, self.arduino_entry, self.arduino_send_button] for widget in targets: if widget is not None: widget.configure(state=state) def clear_console(self) -> None: with self.msg_lock: self.msg_queue.clear() self.log_text.configure(state="normal") self.log_text.delete("1.0", "end") self.log_text.configure(state="disabled") self.log_line_count = 0 def save_console(self) -> None: try: content = self.log_text.get("1.0", "end-1c") except tk.TclError: content = "" if not content.strip(): messagebox.showinfo("Save Console Log", "There is no console output to save.") return path = filedialog.asksaveasfilename( title="Save Console Log", defaultextension=".txt", filetypes=(("Text Files", "*.txt"), ("All Files", "*.*")), ) if not path: return try: with open(path, "w", encoding="utf-8") as handle: handle.write(content) except Exception as exc: messagebox.showerror("Save Failed", f"Unable to save log: {exc}") def queue_message(self, text: str) -> None: if not text: return with self.msg_lock: self.msg_queue.append(str(text)) def poll_queue(self) -> None: with self.msg_lock: if not self.msg_queue: return entries = list(self.msg_queue) self.msg_queue.clear() self._log_lines(entries) def _log_lines(self, entries: Iterable[str]) -> None: prepared = [] for entry in entries: trimmed = str(entry).strip() if not trimmed: continue if trimmed.strip(".") == "": continue prepared.append(f"[{timestamp()}] {trimmed}\n") if not prepared: return self.log_line_count += len(prepared) block = "".join(prepared) self.log_text.configure(state="normal") self.log_text.insert("end", block) if LOG_HISTORY_LIMIT and self.log_line_count > LOG_HISTORY_LIMIT: excess = self.log_line_count - LOG_HISTORY_LIMIT self.log_text.delete("1.0", f"{excess + 1}.0") self.log_line_count = LOG_HISTORY_LIMIT self.log_text.see("end") self.log_text.configure(state="disabled") # ------------------------------------------------------------------ # Serial control # ------------------------------------------------------------------ def refresh_ports(self) -> None: selected = self._resolve_port(self.port_var.get()) excluded = set(self.exclude_ports_provider() or []) if self.exclude_ports_provider else set() mapping: dict[str, str] = {} display: List[str] = [] for port in serial.tools.list_ports.comports(): desc = (port.description or port.name or "").strip() label = port.device if not desc else f"{port.device}: {desc}" keep = self.connected_port == port.device if port.device in excluded and not keep: continue mapping[label] = port.device display.append(label) self.port_map = mapping self.port_combo["values"] = display if not display: self.port_var.set("") return if selected: for label, device in mapping.items(): if device == selected: self.port_var.set(label) return preferred = next((label for label, device in mapping.items() if device not in excluded), None) self.port_var.set(preferred or display[0]) def _schedule_port_refresh(self) -> None: try: self.frame.after(PORT_REFRESH_MS, self._auto_refresh_ports) except tk.TclError: pass def _auto_refresh_ports(self) -> None: if not self.frame.winfo_exists(): return self.refresh_ports() self._schedule_port_refresh() def connect_serial(self) -> None: if self.serial_port and self.serial_port.is_open: self.disconnect_serial() return port = self._resolve_port(self.port_var.get()) if not port: messagebox.showwarning("Select Port", "Please select a COM port") return try: baud = int((self.baud_var.get() or "0").strip()) except ValueError: messagebox.showwarning("Invalid Baud", "Enter a numeric baud rate") return try: self.serial_port = serial.Serial(port, baudrate=baud, timeout=0.1) except Exception as exc: messagebox.showerror("Serial Error", str(exc)) return self.connected_port = port self.status_var.set(f"Connected to {port} @ {baud}") self._set_status_color("green") self.connect_button.configure(text="Disconnect") self.enable_controls(True) if self.on_connect: try: self.on_connect(port) except Exception: pass self.read_thread_running = True self.read_thread = threading.Thread(target=self._read_loop, daemon=True) self.read_thread.start() def disconnect_serial(self) -> None: self.read_thread_running = False if self.serial_port: try: self.serial_port.close() except Exception: pass if self.read_thread and self.read_thread.is_alive(): self.read_thread.join(timeout=0.2) self.serial_port = None last_port = self.connected_port self.connected_port = None self.status_var.set("Disconnected") self._set_status_color("red") self.connect_button.configure(text="Connect") self.enable_controls(False) if last_port and self.on_disconnect: try: self.on_disconnect(last_port) except Exception: pass def _set_status_color(self, color: str) -> None: try: self.status_label.configure(foreground=color) except tk.TclError: pass def _resolve_port(self, value: str) -> str: if value in self.port_map: return self.port_map[value] base = (value or "").split(":", 1)[0].strip() return base def is_connected(self) -> bool: return bool(self.serial_port and self.serial_port.is_open) def get_selected_port(self) -> str: return self._resolve_port(self.port_var.get()) def send_macro(self, commands: Iterable[str]) -> None: if self.before_button_callback: try: self.before_button_callback() except Exception: pass for cmd in commands: self.send_uart(cmd) time.sleep(0.05) def send_input_line(self) -> None: if not self.raw_entry_var: return payload = self.raw_entry_var.get() if not payload: return self.send_uart(payload) self.raw_history.append(payload) self.raw_history_index = len(self.raw_history) if self.history_listbox: self.history_listbox.insert("end", payload) if self.history_listbox.size() > HISTORY_LIMIT: self.history_listbox.delete(0) self.raw_entry_var.set("") def send_arduino_input(self) -> None: if not self.arduino_entry_var: return payload = self.arduino_entry_var.get() if not payload: return self.send_uart(payload) self.arduino_history.append(payload) self.arduino_history_index = len(self.arduino_history) self.arduino_entry_var.set("") def history_select(self, _event) -> None: if not self.history_listbox: return sel = self.history_listbox.curselection() if not sel: return cmd = self.history_listbox.get(sel[0]) self.raw_entry_var.set(cmd) if self.raw_entry: try: self.raw_entry.icursor("end") except tk.TclError: pass self.send_uart(cmd) def _raw_history_prev(self, _event) -> str: self._navigate_history(self.raw_history, "raw_history_index", self.raw_entry_var, self.raw_entry, -1) return "break" def _raw_history_next(self, _event) -> str: self._navigate_history(self.raw_history, "raw_history_index", self.raw_entry_var, self.raw_entry, 1) return "break" def _arduino_history_prev(self, _event) -> str: self._navigate_history(self.arduino_history, "arduino_history_index", self.arduino_entry_var, self.arduino_entry, -1) return "break" def _arduino_history_next(self, _event) -> str: self._navigate_history(self.arduino_history, "arduino_history_index", self.arduino_entry_var, self.arduino_entry, 1) return "break" def _navigate_history(self, history: List[str], attr: str, var: Optional[tk.StringVar], widget, delta: int) -> None: if not history or var is None: return index = getattr(self, attr, len(history)) index = max(0, min(len(history), index + delta)) setattr(self, attr, index) var.set("" if index == len(history) else history[index]) if widget: try: widget.icursor("end") except tk.TclError: pass def send_uart(self, text: str) -> None: if not (self.serial_port and self.serial_port.is_open): return try: payload = text + self.line_ending self.serial_port.write(payload.encode("utf-8")) except Exception as exc: messagebox.showerror("UART Error", str(exc)) def _read_loop(self) -> None: buffer = bytearray() while self.read_thread_running: port = self.serial_port if not port or not port.is_open: time.sleep(0.05) continue try: waiting = port.in_waiting except Exception: waiting = 0 if waiting: try: chunk = port.read(waiting) except Exception: chunk = b"" if chunk: buffer.extend(chunk) while True: idx = buffer.find(b"\n") if idx == -1: break raw_line = buffer[:idx] del buffer[: idx + 1] raw_line = raw_line.rstrip(b"\r") if not raw_line: continue try: text = raw_line.decode("utf-8", errors="replace") except Exception: text = str(raw_line) with self.msg_lock: self.msg_queue.append(text) else: time.sleep(0.002) if buffer: try: text = buffer.rstrip(b"\r\n").decode("utf-8", errors="replace") except Exception: text = str(buffer) with self.msg_lock: self.msg_queue.append(text) class JLinkGDBWrapper: SIZE_MAP = {"b": 1, "h": 2, "w": 4, "g": 8} def __init__(self, link, session=None) -> None: self.link = link self.session = session def execute(self, command: str): if not command: raise ValueError("Empty command") stripped = command.strip() op = stripped.split(None, 1)[0].lower() if op.startswith("x"): return True, self._handle_mem_read(stripped) if op == "set": return True, self._handle_mem_write(stripped) if op in ("halt", "h"): return True, self._handle_halt() if op in ("go", "g", "resume", "run"): return True, self._handle_go() if op in ("reset", "r"): return True, self._handle_reset(False) if op in ("reset_halt", "rh"): return True, self._handle_reset(True) if op in ("b", "break"): return True, self._handle_break_set(stripped) if op in ("bc", "breakclear", "clearbreak", "deletebreak", "bd"): return True, self._handle_break_clear(stripped) if op in ("s", "si", "step", "stepi"): return True, self._handle_step_instruction() if op in ("next", "n", "ni"): return True, self._handle_step_over() if op in ("finish", "fin", "so"): return True, self._handle_step_out() if stripped.lower() in ("i r", "ir", "info r", "info reg", "info regs", "info registers"): return True, self._handle_info_registers() return False, self.link.exec_command(stripped) def _handle_mem_read(self, command: str): token = command[1:].strip() fmt = "" expr = token if token.startswith("/"): token = token[1:] spec = [] while token and not token[0].isspace(): spec.append(token[0]) token = token[1:] fmt = "".join(spec).lower() expr = token.strip() if not expr: raise ValueError("Address is required for 'x' command") count = 1 size_char = "w" if fmt: match = re.match(r"(?P\d+)?(?P[bhwg]?)$", fmt) if not match: raise ValueError("Invalid x/ specifier") if match.group("count"): count = max(1, int(match.group("count"))) if match.group("size"): size_char = match.group("size") count = min(count, 256) width = self.SIZE_MAP.get(size_char, 4) address = self._parse_int(expr) return self._read_memory(address, width, count) def _handle_mem_write(self, command: str): match = re.match(r"set\s+\*(?P[^=]+)=(?P.+)$", command, re.IGNORECASE) if not match: raise ValueError("Use 'set *ADDR=VALUE' for writes") address = self._parse_int(match.group("addr")) value = self._parse_int(match.group("value")) self._write_memory(address, value, 4) return f"Wrote 0x{value & 0xFFFFFFFF:08x} to {address:#010x}" def _handle_halt(self): func = getattr(self.link, "halt", None) if not func: raise RuntimeError("halt() is not supported by this J-Link binding") func() return self._pc_report_line("Core halted.") def _handle_go(self): attempts = [ getattr(self.link, "go", None), getattr(self.link, "run", None), getattr(self.link, "restart", None), ] last_error = None for func in attempts: if not callable(func): continue try: func() return "Execution resumed." except Exception as exc: last_error = exc raw = getattr(self.link, "exec_command", None) if raw: for cmd in ("g", "go", "Run"): try: result = raw(cmd) if result in (None, 0, "0", "OK"): return "Execution resumed." return str(result) except Exception as exc: last_error = exc raise RuntimeError(f"Resume is not supported ({last_error})") def _handle_reset(self, halt: bool): func = getattr(self.link, "reset", None) if not func: raise RuntimeError("reset() is not supported by this J-Link binding") try: if halt: func(True) else: func(False) except TypeError: try: if halt: func(halt=True) else: func() except TypeError: func() return "Reset issued." if not halt else "Reset and halted." def _handle_break_set(self, command: str): parts = command.split(None, 1) if len(parts) < 2: raise ValueError("Use 'b
' to set a breakpoint.") addr = self._parse_int(parts[1]) return self.install_breakpoint(addr) def _handle_break_clear(self, command: str): parts = command.split(None, 1) if len(parts) < 2: raise ValueError("Use 'bc
' to clear a breakpoint.") addr = self._parse_int(parts[1]) return self.remove_breakpoint(addr) def _handle_step_instruction(self): self._single_step_no_report() return self._pc_report_line("Single step executed.") def _handle_step_over(self): over = getattr(self.link, "step_over", None) if over: over() return self._pc_report_line("Step over executed.") raw = getattr(self.link, "exec_command", None) if raw: for cmd in ("StepOver", "so", "n", "Next"): try: raw(cmd) return self._pc_report_line("Step over executed.") except Exception: continue return self._software_step_over() def _handle_step_out(self): fin = getattr(self.link, "step_out", None) if fin: fin() return self._pc_report_line("Step out executed.") raw = getattr(self.link, "exec_command", None) if raw: for cmd in ("StepOut", "finish", "Finish"): try: raw(cmd) return self._pc_report_line("Step out executed.") except Exception: continue return self._software_step_out() def install_breakpoint(self, addr: int): handle = self._set_breakpoint_api(addr) if handle is None: handle = self._set_breakpoint_exec(addr) if handle is None: raise RuntimeError("Breakpoint could not be set on this target.") self._record_breakpoint(addr, handle) return f"Breakpoint set at {addr:#010x}" def remove_breakpoint(self, addr: int): normalized = self._normalize_addr(addr) handle = self._resolve_breakpoint_handle(normalized) cleared = False if handle is not None: cleared = self._clear_breakpoint_api(handle) if not cleared: cleared = self._clear_breakpoint_exec(normalized) if not cleared: raise RuntimeError("Unable to clear breakpoint; no matching breakpoint found.") self._forget_breakpoint(normalized) return f"Breakpoint cleared at {normalized:#010x}" def reinstall_breakpoints(self): if not self.session or not self.session.breakpoints: return addresses = list(self.session.breakpoints.keys()) self.session.breakpoints.clear() for addr in addresses: try: self.install_breakpoint(addr) except Exception as exc: self.session.queue_message(f"[bp] Unable to restore breakpoint at {addr:#010x}: {exc}") def _set_breakpoint_api(self, addr: int): last_error = None for name in ("breakpoint_set", "software_breakpoint_set", "hardware_breakpoint_set"): setter = getattr(self.link, name, None) if not callable(setter): continue try: handle = setter(addr) if handle: return handle except Exception as exc: last_error = exc return None def _set_breakpoint_exec(self, addr: int): exec_cmd = getattr(self.link, "exec_command", None) if not exec_cmd: return None commands = [ f"SetBP {addr:#x},0x1", f"SetBP {addr:#x}", f"SetBPAddr = {addr:#x}", f"SetBPAddr {addr:#x}", f"SetBPAddr,{addr:#x}", f"BreakSet {addr:#x}", f"BP {addr:#x}", f"Break.Set {addr:#x}", ] last_error = None for cmd in commands: try: result = exec_cmd(cmd) if result not in (None, 0, "0", "OK"): last_error = result continue handle = self._find_breakpoint_handle(addr) return handle if handle else None except Exception as exc: last_error = exc if last_error: raise RuntimeError(f"Breakpoint command failed: {last_error}") return None def _clear_breakpoint_api(self, handle) -> bool: clearer = getattr(self.link, "breakpoint_clear", None) if not callable(clearer): return False try: return bool(clearer(handle)) except Exception: return False def _clear_breakpoint_exec(self, addr: int) -> bool: exec_cmd = getattr(self.link, "exec_command", None) if not exec_cmd: return False commands = [ f"ClrBP {addr:#x}", f"ClrBP {addr:#x},0x1", f"ClearBP {addr:#x}", f"ClrBPAddr = {addr:#x}", f"ClrBPAddr {addr:#x}", f"BreakClr {addr:#x}", f"Break.Clr {addr:#x}", ] last_error = None for cmd in commands: try: result = exec_cmd(cmd) if result in (None, 0, "0", "OK"): return True last_error = result except Exception as exc: last_error = exc if last_error: raise RuntimeError(f"Breakpoint clear failed: {last_error}") return False def _suspend_breakpoints(self, addresses): if not self.session: return [] suspended = [] seen = set() for addr in addresses: if addr is None: continue normalized = self._normalize_addr(addr) if normalized in seen: continue seen.add(normalized) if not self.session.has_breakpoint(normalized): continue handle = self.session.get_breakpoint_handle(normalized) cleared = False if handle is not None: cleared = self._clear_breakpoint_api(handle) if not cleared: cleared = self._clear_breakpoint_exec(normalized) if cleared: self.session.unregister_breakpoint(normalized) suspended.append(normalized) return suspended def _restore_breakpoints(self, addresses): if not self.session: return for addr in {self._normalize_addr(a) for a in addresses if a is not None}: if addr is None: continue try: handle = self._set_breakpoint_api(addr) if handle is None: handle = self._set_breakpoint_exec(addr) if handle is None: raise RuntimeError("Unable to restore") self.session.register_breakpoint(addr, handle) except Exception as exc: self.session.queue_message(f"[bp] Unable to restore breakpoint at {addr:#010x}: {exc}") def _record_breakpoint(self, addr: int, handle): if not self.session: return self.session.register_breakpoint(addr, handle) def _forget_breakpoint(self, addr: int): if not self.session: return self.session.unregister_breakpoint(addr) def _resolve_breakpoint_handle(self, addr: int): if self.session: cached = self.session.get_breakpoint_handle(addr) if cached is not None: return cached finder = getattr(self.link, "breakpoint_find", None) if not callable(finder): return None try: handle = finder(addr) return handle if handle else None except Exception: return None def _find_breakpoint_handle(self, addr: int): finder = getattr(self.link, "breakpoint_find", None) if not callable(finder): return None try: return finder(addr) except Exception: return None def _normalize_addr(self, addr: int) -> int: return int(addr) & 0xFFFFFFFF def _pc_report_line(self, prefix: str): value = self._read_pc() if value is None: return [prefix] if self.session: self.session.record_pc_report() return [f"{prefix} — PC = 0x{value:08x}"] def _read_pc(self): getter = getattr(self.link, "register_read", None) if not callable(getter): return None for key in ("PC", "pc", 15): try: value = getter(key) if value is not None: return int(value) & 0xFFFFFFFF except Exception: continue return None def _read_register(self, name): getter = getattr(self.link, "register_read", None) if not callable(getter): return None for candidate in self._register_candidates(name): try: value = getter(candidate) except Exception: continue if value is None: continue try: return int(value) & 0xFFFFFFFF except Exception: continue return None def _register_candidates(self, name): mapping = { "pc": ["PC", "pc", "R15", 15], "lr": ["LR", "lr", "R14", 14], "sp": ["SP", "sp", "R13", 13], "fp": ["FP", "fp", "R11", 11], } if isinstance(name, str): key = name.lower() if key in mapping: return mapping[key] variants = [name, name.lower(), name.upper()] if name.upper().startswith("R"): try: idx = int(name[1:]) variants.append(idx) except Exception: pass return variants return [name] def _software_step_over(self): current_pc = self._read_pc() if current_pc is None: raise RuntimeError("Unable to read PC.") next_addr = self._next_instruction_address(current_pc) suspended = self._suspend_breakpoints([current_pc, next_addr]) handle = None via_api = False bp_installed = False lr_before = self._read_register("LR") try: stepped_pc = self._single_step_no_report() if stepped_pc is not None and self._pc_matches_target(stepped_pc, next_addr): return self._pc_report_line("Step over executed.") lr_after = self._read_register("LR") if not self._stepped_into_subroutine(lr_before, lr_after, next_addr): return self._pc_report_line("Step over executed.") handle, via_api = self._set_temporary_breakpoint(next_addr) bp_installed = True self._resume_cpu() if not self._wait_for_halt(expected_pc=next_addr): raise RuntimeError("Step over timed out; target did not halt.") return self._pc_report_line("Step over executed.") finally: if bp_installed: self._clear_temporary_breakpoint(handle, next_addr, via_api) self._restore_breakpoints(suspended) def _software_step_out(self): lr = self._read_register("LR") if lr is None: sp = self._read_register("SP") lr = self._read_stack_word(sp) if sp is not None else None if lr is None: raise RuntimeError("Unable to determine LR for step out.") target = lr & ~1 if target == 0: raise RuntimeError("LR is zero; cannot determine return address.") current_pc = self._read_pc() suspended = self._suspend_breakpoints([current_pc, target]) handle, via_api = self._set_temporary_breakpoint(target) try: self._resume_cpu() if not self._wait_for_halt(expected_pc=target): raise RuntimeError("Step out timed out; target did not halt.") return self._pc_report_line("Step out executed.") finally: self._clear_temporary_breakpoint(handle, target, via_api) self._restore_breakpoints(suspended) def _next_instruction_address(self, pc=None): if pc is None: pc = self._read_pc() if pc is None: raise RuntimeError("Unable to read PC.") current = pc & ~1 size = self._estimate_thumb_instruction_size(current) return (current + size) & 0xFFFFFFFF def _estimate_thumb_instruction_size(self, addr: int) -> int: opcode = self._read_halfword(addr) if opcode is None: return 2 prefix = (opcode >> 11) & 0x1F if prefix in (0b11101, 0b11110, 0b11111): return 4 return 2 def _read_halfword(self, addr: int): reader = getattr(self.link, "memory_read16", None) if not callable(reader): return None base = addr & ~1 try: values = reader(base, 1) if values: return values[0] & 0xFFFF except Exception: return None return None def _read_stack_word(self, sp): reader = getattr(self.link, "memory_read32", None) if not callable(reader) or sp is None: return None base = sp & ~3 try: values = reader(base, 1) if values: return values[0] & 0xFFFFFFFF except Exception: return None return None def _set_temporary_breakpoint(self, addr: int): last_error = None try: handle = self._set_breakpoint_api(addr) if handle is not None: return handle, True except Exception as exc: last_error = exc try: handle = self._set_breakpoint_exec(addr) if handle is not None: return handle, False except Exception as exc: last_error = exc raise RuntimeError(f"Unable to place temporary breakpoint: {last_error}") def _clear_temporary_breakpoint(self, handle, addr: int, via_api: bool): try: if via_api and handle: self._clear_breakpoint_api(handle) else: self._clear_breakpoint_exec(addr) except Exception: pass def _resume_cpu(self): attempts = [ getattr(self.link, "go", None), getattr(self.link, "run", None), getattr(self.link, "restart", None), ] last_error = None for func in attempts: if not callable(func): continue try: func() return except Exception as exc: last_error = exc raw = getattr(self.link, "exec_command", None) if raw: for cmd in ("g", "go", "Run"): try: raw(cmd) return except Exception as exc: last_error = exc raise RuntimeError(f"Resume is not supported ({last_error})") def _wait_for_halt(self, timeout: float = 5.0, expected_pc: Optional[int] = None) -> bool: deadline = time.time() + timeout target = self._normalize_addr(expected_pc) if expected_pc is not None else None while time.time() < deadline: try: if self.link.halted(): return True except Exception: pass if target is not None: pc = self._read_pc() if pc is not None and self._pc_matches_target(pc, target): # Some J-Link bindings don't report a halted state even when # a breakpoint is hit, so fall back to checking the PC. return True time.sleep(0.01) if target is not None: pc = self._read_pc() if pc is not None and self._pc_matches_target(pc, target): return True return False def _pc_matches_target(self, pc: int, target: int) -> bool: normalized_pc = self._normalize_addr(pc) normalized_target = self._normalize_addr(target) if normalized_pc == normalized_target: return True return (normalized_pc & ~1) == (normalized_target & ~1) def _stepped_into_subroutine(self, lr_before, lr_after, next_addr: int) -> bool: if lr_after is None: return True if not self._pc_matches_target(lr_after, next_addr): return False if lr_before is None: return True return not self._pc_matches_target(lr_before, next_addr) def _single_step_no_report(self): step = getattr(self.link, "step", None) if step: step() return self._read_pc() raw = getattr(self.link, "exec_command", None) if raw: raw("Step") return self._read_pc() raise RuntimeError("Single-step is not supported by this J-Link binding.") def _handle_info_registers(self): entries = self._gather_registers() if not entries: raise RuntimeError("Unable to read registers.") width = max(len(name) for name, _ in entries) lines = ["Registers:"] for name, value in entries: lines.append(f"{name:>{width}}: 0x{value:08x}") if self.session: self.session.record_pc_report() return lines def _gather_registers(self): getter = getattr(self.link, "register_read", None) if not callable(getter): return [] register_entries = [] reg_list = [] list_func = getattr(self.link, "register_list", None) name_func = getattr(self.link, "register_name", None) if callable(list_func) and callable(name_func): try: indices = list_func() for idx in indices: try: name = name_func(idx) except Exception: continue reg_list.append((name, idx)) except Exception: reg_list = [] if not reg_list: ordered = [ "PC", "LR", "SP", "FP", "R12", "R11", "R10", "R9", "R8", "R7", "R6", "R5", "R4", "R3", "R2", "R1", "R0", "xPSR", ] reg_list = [(name, name) for name in ordered] for name, key in reg_list: try: value = getter(key) except Exception: continue if value is None: continue text = name.decode("utf-8", errors="ignore") if isinstance(name, bytes) else str(name) register_entries.append((text.strip(), int(value) & 0xFFFFFFFF)) unique = [] seen = set() for name, value in register_entries: if name in seen: continue seen.add(name) unique.append((name, value)) return unique def _parse_int(self, text: str) -> int: try: return int(text.strip(), 0) except Exception: raise ValueError(f"Unable to parse integer value '{text}'") def _read_memory(self, address: int, width: int, count: int): bits = width * 8 func = getattr(self.link, f"memory_read{bits}", None) if not func: raise RuntimeError(f"memory_read{bits} not supported") values = func(address, count) or [] mask = (1 << bits) - 1 lines = [] for idx, raw in enumerate(values): value = raw & mask addr = address + idx * width lines.append(f"{addr:#010x}: 0x{value:0{width * 2}x}") if not lines: lines.append(f"No data returned when reading {count} value(s) at {address:#010x}.") return lines def _write_memory(self, address: int, value: int, width: int) -> None: bits = width * 8 func = getattr(self.link, f"memory_write{bits}", None) if not func: raise RuntimeError(f"memory_write{bits} not supported") mask = (1 << bits) - 1 func(address, [value & mask]) class DebuggerSession: def __init__(self, notebook, panel, ap_info): self.panel = panel self.ap_info = dict(ap_info) self.label = self.ap_info.get("label", "AP") self.core = self.ap_info.get("core", "Unknown") self.frame = ttk.Frame(notebook) self.status_var = tk.StringVar(value="Disconnected") self.script_path_var = tk.StringVar() self.log_text = None self.history_listbox = None self.command_entry = None self.command_entry_var = None self.send_button = None self.breakpoint_button = None self.connect_button = None self.status_label = None self.msg_queue = deque() self.msg_lock = threading.Lock() self.log_line_count = 0 self.command_history = [] self.history_index = 0 self.command_wrapper = None self.connected = False self._script_path = None self.breakpoints: dict[int, Optional[int]] = {} self._last_pc_report = 0.0 self._build_ui() self.enable_controls(False) def _build_ui(self): header = ttk.Frame(self.frame) header.pack(fill="x", padx=5, pady=5) ttk.Label(header, text=f"{self.label} ({self.core})").grid(row=0, column=0, sticky="w") self.connect_button = ttk.Button(header, text="Connect", width=12, command=self.toggle_connection) self.connect_button.grid(row=0, column=1, sticky="e", padx=(8, 0)) self.status_label = ttk.Label(header, textvariable=self.status_var, foreground="red") self.status_label.grid(row=1, column=0, columnspan=2, sticky="w", pady=(4, 0)) script_frame = ttk.Frame(self.frame) script_frame.pack(fill="x", padx=5, pady=(0, 5)) ttk.Label(script_frame, text="Script:").grid(row=0, column=0, sticky="w") self.script_entry = ttk.Entry(script_frame, textvariable=self.script_path_var) self.script_entry.grid(row=0, column=1, sticky="ew", padx=3) ttk.Button(script_frame, text="Browse...", command=self.browse_script_file).grid(row=0, column=2, padx=(0, 5)) ttk.Button(script_frame, text="Run Script", command=self.apply_script_file).grid(row=0, column=3) script_frame.columnconfigure(1, weight=1) console_frame = ttk.Frame(self.frame) console_frame.pack(fill="both", expand=True, padx=5, pady=5) console_frame.columnconfigure(0, weight=1) console_frame.rowconfigure(0, weight=1) self.log_text = tk.Text(console_frame, height=10, wrap="none") self.log_text.grid(row=0, column=0, sticky="nsew") self.log_text.configure(state="disabled") vscroll = ttk.Scrollbar(console_frame, orient="vertical", command=self.log_text.yview) vscroll.grid(row=0, column=1, sticky="ns") self.log_text.configure(yscrollcommand=vscroll.set) hscroll = ttk.Scrollbar(console_frame, orient="horizontal", command=self.log_text.xview) hscroll.grid(row=1, column=0, sticky="ew") self.log_text.configure(xscrollcommand=hscroll.set) button_row = ttk.Frame(console_frame) button_row.grid(row=2, column=0, columnspan=2, sticky="ew", pady=(5, 0)) button_row.columnconfigure(0, weight=1) buttons = ttk.Frame(button_row) buttons.grid(row=0, column=0, sticky="w") ttk.Button(buttons, text="Clear Console", command=self.clear_console, width=14).grid(row=0, column=0, padx=(0, 5)) ttk.Button(buttons, text="Save Log", command=self.save_console, width=14).grid(row=0, column=1) self.breakpoint_button = ttk.Button(buttons, text="Breakpoints", command=self.show_breakpoints, width=14) self.breakpoint_button.grid(row=0, column=2, padx=(5, 0)) history_frame = ttk.Frame(console_frame) history_frame.grid(row=3, column=0, sticky="nsew", pady=(5, 0)) history_frame.columnconfigure(0, weight=1) history_frame.rowconfigure(1, weight=1) ttk.Label(history_frame, text="Command history:").grid(row=0, column=0, sticky="w") self.history_listbox = tk.Listbox(history_frame, height=6) self.history_listbox.grid(row=1, column=0, sticky="nsew") self.history_listbox.bind("", self.history_select) history_scroll = ttk.Scrollbar(history_frame, orient="vertical", command=self.history_listbox.yview) history_scroll.grid(row=1, column=1, sticky="ns") self.history_listbox.configure(yscrollcommand=history_scroll.set) input_frame = ttk.Frame(console_frame) input_frame.grid(row=4, column=0, sticky="ew", pady=(5, 0)) input_frame.columnconfigure(0, weight=1) self.command_entry_var = tk.StringVar() self.command_entry = ttk.Entry(input_frame, textvariable=self.command_entry_var) self.command_entry.grid(row=0, column=0, sticky="ew", padx=(0, 5)) self.command_entry.bind("", lambda _evt: self.send_command()) self.command_entry.bind("", self._history_prev) self.command_entry.bind("", self._history_next) self.send_button = ttk.Button(input_frame, text="Send", command=self.send_command) self.send_button.grid(row=0, column=1) def set_unavailable(self, reason: str): self._set_status(reason, "red") if self.connect_button: self.connect_button.configure(state="disabled") self.enable_controls(False) def ensure_script_file(self) -> str: if self._script_path: return self._script_path script = self.ap_info.get("script") if not script: return "" fd, path = tempfile.mkstemp(prefix=f"jlink_{self.label.lower()}_", suffix=".jlink") with os.fdopen(fd, "w", encoding="utf-8") as handle: handle.write(script) self._script_path = path return path def cleanup_script(self): if not self._script_path: return try: os.unlink(self._script_path) except Exception: pass self._script_path = None def enable_controls(self, enabled: bool): state = "normal" if enabled else "disabled" if self.command_entry: self.command_entry.configure(state=state) if self.send_button: self.send_button.configure(state=state) if self.breakpoint_button: self.breakpoint_button.configure(state=state) def queue_message(self, text: str): if not text: return with self.msg_lock: self.msg_queue.append(str(text)) def poll_queue(self): with self.msg_lock: if not self.msg_queue: return entries = list(self.msg_queue) self.msg_queue.clear() self._log_entries(entries) def _log_entries(self, entries): if not self.log_text: return prepared = [] for entry in entries: trimmed = str(entry).strip() if not trimmed or trimmed.strip(".") == "": continue prepared.append(f"[{timestamp()}] {trimmed}\n") if not prepared: return self.log_line_count += len(prepared) block = "".join(prepared) self.log_text.configure(state="normal") self.log_text.insert("end", block) if LOG_HISTORY_LIMIT and self.log_line_count > LOG_HISTORY_LIMIT: excess = self.log_line_count - LOG_HISTORY_LIMIT self.log_text.delete("1.0", f"{excess + 1}.0") self.log_line_count = LOG_HISTORY_LIMIT self.log_text.see("end") self.log_text.configure(state="disabled") def clear_console(self): if not self.log_text: return with self.msg_lock: self.msg_queue.clear() self.log_text.configure(state="normal") self.log_text.delete("1.0", "end") self.log_text.configure(state="disabled") self.log_line_count = 0 def save_console(self): if not self.log_text: return try: contents = self.log_text.get("1.0", "end-1c") except tk.TclError: contents = "" if not contents.strip(): messagebox.showinfo("Save Console Log", "There is no console output to save.") return path = filedialog.asksaveasfilename( title="Save Console Log", defaultextension=".txt", filetypes=(("Text Files", "*.txt"), ("All Files", "*.*")), ) if not path: return try: with open(path, "w", encoding="utf-8") as handle: handle.write(contents) except Exception as exc: messagebox.showerror("Save Failed", f"Unable to save log: {exc}") def show_breakpoints(self): if not self.breakpoints: self.queue_message("[bp] No breakpoints set.") return entries = sorted(self.breakpoints.items()) lines = ["[bp] Active breakpoints:"] for addr, handle in entries: if handle is not None: lines.append(f"[bp] {addr:#010x} (handle {handle})") else: lines.append(f"[bp] {addr:#010x}") for line in lines: self.queue_message(line) def toggle_connection(self): if self.connected: self.panel.detach_session(self) return self.panel.attach_session(self) def mark_connected(self, serial_no: int): self.connected = True self._set_status(f"Connected — SN {serial_no}", "green") if self.connect_button: self.connect_button.configure(text="Disconnect", state="normal") self.enable_controls(True) def mark_disconnected(self): self.connected = False self.command_wrapper = None self._set_status("Disconnected", "red") if self.connect_button: self.connect_button.configure(text="Connect", state="normal") self.enable_controls(False) def browse_script_file(self): path = filedialog.askopenfilename( title="Select Script", filetypes=(("Scripts", "*.jlink *.jscr *.jsf *.gdb *.gdbinit *.txt"), ("All Files", "*.*")), ) if not path: return self.script_path_var.set(path) self._process_script_file(path, run_now=False, show_dialog=False) def apply_script_file(self): path = (self.script_path_var.get() or "").strip() if not path: messagebox.showinfo("Script File", "Enter or select a script file first.") return self._process_script_file(path, run_now=True, show_dialog=False) def _process_script_file(self, path, run_now, show_dialog): normalized = os.path.abspath(path) if not os.path.isfile(normalized): msg = f"Script file not found: {normalized}" self.queue_message(f"[script] {msg}") if show_dialog: messagebox.showerror("Script File Not Found", msg) return False self.script_path_var.set(normalized) script_type = self._detect_script_type(normalized) filename = os.path.basename(normalized) if script_type == "gdb": if not self.connected or not self.command_wrapper: info = f"GDB script '{filename}' stored. Connect and run it manually." self.queue_message(f"[script] {info}") if show_dialog: messagebox.showinfo("GDB Script", info) return False if not run_now: info = f"GDB script '{filename}' ready. Click Run Script to execute." self.queue_message(f"[script] {info}") return True ok, message = self.panel.run_gdb_script(self, normalized) if not ok: messagebox.showerror("GDB Script", message) return ok if not self.connected: info = f"Stored J-Link script '{filename}'. It will load after connecting." self.queue_message(f"[script] {info}") if show_dialog: messagebox.showinfo("J-Link Script", info) return True ok, message = self.panel.set_shared_script(normalized, session=self) if not ok and show_dialog: messagebox.showerror("J-Link Script", message) return ok def _detect_script_type(self, path): ext = os.path.splitext(path)[1].lower() if ext in GDB_SCRIPT_EXTS: return "gdb" return "jlink" def send_command(self): if not self.command_entry_var: return command = self.command_entry_var.get().strip() self._dispatch_command(command, add_to_history=True) def _dispatch_command(self, command, add_to_history: bool): if not command: return if not self.connected: messagebox.showwarning("Not Connected", "Connect to the debugger before sending commands.") return if add_to_history and self.history_listbox: self.history_listbox.insert("end", command) if self.history_listbox.size() > HISTORY_LIMIT: self.history_listbox.delete(0) if add_to_history: self.command_history.append(command) self.history_index = len(self.command_history) self.command_entry_var.set("") self.queue_message(f"> {command}") threading.Thread(target=self.panel.run_session_command, args=(self, command), daemon=True).start() def history_select(self, _event): if not self.history_listbox or not self.command_entry_var: return sel = self.history_listbox.curselection() if not sel: return command = self.history_listbox.get(sel[0]) self.command_entry_var.set(command) if self.command_entry: try: self.command_entry.icursor("end") except tk.TclError: pass self._dispatch_command(command, add_to_history=False) def _history_prev(self, _event): self._navigate_history(-1) return "break" def _history_next(self, _event): self._navigate_history(1) return "break" def _navigate_history(self, delta: int): if not self.command_history or not self.command_entry_var: return new_index = max(0, min(len(self.command_history), self.history_index + delta)) self.history_index = new_index value = "" if new_index == len(self.command_history) else self.command_history[new_index] self.command_entry_var.set(value) if self.command_entry: try: self.command_entry.icursor("end") except tk.TclError: pass def _set_status(self, text: str, color: str): self.status_var.set(text) if self.status_label: try: self.status_label.configure(foreground=color) except tk.TclError: pass def shutdown(self): if self.connected: try: self.panel.detach_session(self) except Exception: pass self.cleanup_script() def _emit_command_result(self, result): if result is None: return if isinstance(result, tuple) and len(result) == 2: status, response = result self.queue_message(f"[pylink] Result code: {status}") if response: self._emit_response_lines(response) return self._emit_response_lines(result) def _emit_response_lines(self, response): text = str(response) for line in text.splitlines(): trimmed = line.strip() if trimmed: self.queue_message(trimmed) def _emit_wrapper_result(self, result): if result is None: return if isinstance(result, (list, tuple)): for entry in result: trimmed = str(entry).strip() if trimmed: self.queue_message(trimmed) else: trimmed = str(result).strip() if trimmed: self.queue_message(trimmed) def register_breakpoint(self, addr: int, handle): normalized = self._normalize_breakpoint_addr(addr) self.breakpoints[normalized] = handle def unregister_breakpoint(self, addr: int): normalized = self._normalize_breakpoint_addr(addr) self.breakpoints.pop(normalized, None) def get_breakpoint_handle(self, addr: int): return self.breakpoints.get(self._normalize_breakpoint_addr(addr)) def list_breakpoints(self): return list(self.breakpoints.keys()) def clear_breakpoints(self): self.breakpoints.clear() def _normalize_breakpoint_addr(self, value: int) -> int: return int(value) & 0xFFFFFFFF def record_pc_report(self): self._last_pc_report = time.monotonic() def should_suppress_pc_report(self) -> bool: if self._last_pc_report <= 0: return False return (time.monotonic() - self._last_pc_report) < 0.3 def has_breakpoint(self, addr: int) -> bool: return self._normalize_breakpoint_addr(addr) in self.breakpoints class SharedDebuggerController: def __init__(self, panel): self.panel = panel self.link = None self.serial = None self.speed = None self._link_lock = threading.Lock() self._command_lock = threading.Lock() self._sessions = set() self._current_session = None self._monitor_thread = None self._monitor_stop = threading.Event() self._monitor_last_halted = False def attach(self, session, serial_no, speed): with self._link_lock: if self.link is None: ok, msg = self._open_link(serial_no, speed) if not ok: return False, msg elif self.serial != serial_no: return False, f"Debugger already connected to emulator {self.serial}." self._sessions.add(session) session.command_wrapper = JLinkGDBWrapper(self.link, session) ok, msg = self.ensure_session_ready(session) if not ok: self.detach(session) return False, msg return True, None def detach(self, session): with self._link_lock: self._sessions.discard(session) if self._current_session is session: self._current_session = None if not self._sessions: self._close_link() def _open_link(self, serial_no, speed): if not self.panel.is_pylink_available(): return False, "pylink not available." try: link = pylink.JLink() link.open(serial_no) interface = self.panel._resolve_swd_interface() if interface is not None: link.set_tif(interface) link.set_speed(speed) except Exception as exc: return False, str(exc) self.link = link self.serial = serial_no self.speed = speed self._start_monitor() return True, None def _close_link(self): if self.link: try: self.link.close() except Exception: pass self.link = None self.serial = None self.speed = None self._current_session = None self._stop_monitor() def ensure_session_ready(self, session): with self._command_lock: ok, msg = self._apply_context(session) return ok, msg def execute(self, session, command): if not self.link: session.queue_message("[pylink] Shared link is not available.") return with self._command_lock: ok, msg = self._apply_context(session) if not ok: if msg: session.queue_message(f"[pylink] {msg}") return try: handled = False result = None if session.command_wrapper: handled, result = session.command_wrapper.execute(command) else: result = self.link.exec_command(command) if handled: session._emit_wrapper_result(result) else: session._emit_command_result(result) except Exception as exc: session.queue_message(f"[pylink] Command failed: {exc}") def run_gdb_script(self, session, path): if not self.link: return False, "Debugger not connected." try: with open(path, "r", encoding="utf-8") as handle: lines = handle.readlines() except Exception as exc: msg = f"Unable to read script '{path}': {exc}" session.queue_message(f"[script] {msg}") return False, msg commands = [] for line in lines: stripped = line.strip() if not stripped or stripped.startswith("#"): continue if stripped.lower().startswith("monitor "): stripped = stripped[8:].strip() commands.append(stripped) if not commands: msg = f"No commands found in '{os.path.basename(path)}'." session.queue_message(f"[script] {msg}") return True, msg with self._command_lock: ok, ctx_msg = self._apply_context(session) if not ok: return False, ctx_msg or "Unable to configure debugger context." for command in commands: session.queue_message(f"[script] gdb> {command}") try: if session.command_wrapper: handled, result = session.command_wrapper.execute(command) else: handled = False result = self.link.exec_command(command) except Exception as exc: msg = f"Command '{command}' failed: {exc}" session.queue_message(f"[script] {msg}") return False, msg if handled: session._emit_wrapper_result(result) else: session._emit_command_result(result) msg = f"Executed {len(commands)} command(s) from '{os.path.basename(path)}'." session.queue_message(f"[script] {msg}") return True, msg def set_script(self, path): if not self.link: return False, "Debugger not connected." try: self.link.set_script_file(path) except Exception as exc: msg = f"Unable to load J-Link script '{path}': {exc}" return False, msg self._current_session = None return True, f"Loaded J-Link script '{os.path.basename(path)}'." def _apply_context(self, session): if not self.link: return False, "J-Link is not connected." if self._current_session is session: return True, None reinitialize = self._current_session is not None ok, msg = self._configure_link(session, reinitialize) if not ok: return False, msg self._current_session = session if session.command_wrapper: session.command_wrapper.reinstall_breakpoints() return True, None def _configure_link(self, session, reinitialize: bool): if reinitialize: serial = self.serial speed = self.speed if serial is None or speed is None: return False, "Debugger is not initialized." try: if self.link: self.link.close() except Exception: pass self.link = None ok, msg = self._open_link(serial, speed) if not ok: return False, msg or "Unable to reopen J-Link." self._refresh_wrappers() else: try: self.link.disconnect() except Exception: pass script_path = session.ensure_script_file() if script_path: try: self.link.set_script_file(script_path) except Exception as exc: return False, f"Unable to apply script: {exc}" targets = session.ap_info.get("targets") or [session.core] target = targets[0] try: try: self.link.connect(target, speed=self.speed) except TypeError: self.link.connect(chip_name=target, speed=self.speed) except Exception as exc: return False, f"Unable to connect to {target}: {exc}" return True, None def _refresh_wrappers(self): if not self.link: return for sess in list(self._sessions): if sess.connected: sess.command_wrapper = JLinkGDBWrapper(self.link, sess) def _start_monitor(self): if self._monitor_thread and self._monitor_thread.is_alive(): return self._monitor_stop.clear() self._monitor_thread = threading.Thread(target=self._monitor_loop, daemon=True) self._monitor_thread.start() def _stop_monitor(self): self._monitor_stop.set() thread = self._monitor_thread if thread and thread.is_alive(): thread.join(timeout=0.5) self._monitor_thread = None self._monitor_last_halted = False def _monitor_loop(self): while not self._monitor_stop.is_set(): if not self.link: if self._monitor_stop.wait(0.2): break continue with self._command_lock: halted = False try: halted = bool(self.link.halted()) except Exception: halted = False if halted and not self._monitor_last_halted: self._report_monitor_pc() self._monitor_last_halted = halted if self._monitor_stop.wait(0.2): break def _report_monitor_pc(self): session = self._current_session if not session or session.should_suppress_pc_report(): return pc = self._read_pc_locked() if pc is None: session.queue_message("[pc] CPU halted.") else: session.queue_message(f"[pc] Halted at 0x{pc:08x}") session.record_pc_report() def _read_pc_locked(self): if not self.link: return None getter = getattr(self.link, "register_read", None) if not callable(getter): return None for key in ("PC", "pc", 15): try: value = getter(key) if value is not None: return int(value) & 0xFFFFFFFF except Exception: continue return None class PylinkDebuggerPanel: def __init__(self, parent, dll_path=None): self.parent = parent self.dll_path = dll_path if dll_path and os.path.exists(dll_path) else None self.frame = ttk.LabelFrame(parent, text="Pylink Debugger") self.device_var = tk.StringVar() self.speed_var = tk.StringVar(value="4000") self.status_var = tk.StringVar(value="") self.device_combo = None self.status_label = None self.notebook = None self.sessions: List[DebuggerSession] = [] self._device_display = {} self._device_refresh_after = None self._device_error_reported = False self._pylink_available = pylink is not None self.controller = SharedDebuggerController(self) self.ap_definitions = self._build_ap_definitions() self._build_widgets() self.refresh_devices() self._schedule_device_refresh() if not self._pylink_available: self._set_status("pylink not installed", "red") self.queue_message("pylink module is missing; install 'pylink-square' to use the debugger.") for session in self.sessions: session.set_unavailable("pylink not installed") elif not self.is_dll_available(): self._set_status("J-Link DLL missing", "orange") self.queue_message("pylink/JLinkARM.dll not found; copy the DLL into the 'pylink' folder.") for session in self.sessions: session.set_unavailable("J-Link DLL missing") def _build_widgets(self): conn_frame = ttk.Frame(self.frame) conn_frame.pack(fill="x", padx=5, pady=5) conn_frame.columnconfigure(1, weight=1) ttk.Label(conn_frame, text="J-Link:").grid(row=0, column=0, sticky="w") self.device_combo = ttk.Combobox(conn_frame, textvariable=self.device_var, width=45, state="readonly") self.device_combo.grid(row=0, column=1, sticky="ew", padx=3) ttk.Button(conn_frame, text="Help", command=self.show_help).grid(row=0, column=2, padx=(3, 0)) ttk.Label(conn_frame, text="Speed (kHz):").grid(row=1, column=0, sticky="w", pady=(6, 0)) ttk.Entry(conn_frame, textvariable=self.speed_var, width=10).grid(row=1, column=1, sticky="w", padx=3, pady=(6, 0)) self.status_label = ttk.Label(conn_frame, textvariable=self.status_var, foreground="red") self.status_label.grid(row=2, column=0, columnspan=2, sticky="w", pady=(6, 0)) self.notebook = ttk.Notebook(self.frame) self.notebook.pack(fill="both", expand=True, padx=5, pady=(0, 5)) for ap in self.ap_definitions: session = DebuggerSession(self.notebook, self, ap) self.sessions.append(session) self.notebook.add(session.frame, text=self._format_session_title(ap)) def _format_session_title(self, ap_def: dict) -> str: label = ap_def.get("label", "AP") core = ap_def.get("core", "Unknown") return f"{label} - {core}" def _build_ap_definitions(self) -> List[dict]: entries = [ {"label": "NP", "core": "Cortex-M33", "ap_index": 0, "targets": ["Cortex-M33"], "extras": []}, {"label": "MP", "core": "Cortex-M33", "ap_index": 1, "targets": ["Cortex-M33"], "extras": []}, {"label": "FP", "core": "Cortex-M23", "ap_index": 2, "targets": ["Cortex-M23"], "extras": []}, { "label": "CA32", "core": "Cortex-A32", "ap_index": 3, "targets": ["Cortex-A32"], "extras": [ " CORESIGHT_CoreBaseAddr = 0x80210000;", ' JLINK_ExecCommand("CORESIGHT_SetCSCTICoreBaseAddr = 0x80218000");', ], }, ] results = [] for ap in entries: entry = dict(ap) entry["script"] = self._build_ap_script(entry) results.append(entry) return results def _build_ap_script(self, ap_info: dict) -> str: cpu = self._cpu_constant(ap_info.get("core", "Cortex-M33")) ap_index = ap_info.get("ap_index", 0) extras = ap_info.get("extras") or [] lines = [ "int InitTarget(void) {", " JLINK_CORESIGHT_AddAP(0, CORESIGHT_AHB_AP);", " JLINK_CORESIGHT_AddAP(1, CORESIGHT_AHB_AP);", " JLINK_CORESIGHT_AddAP(2, CORESIGHT_AHB_AP);", " JLINK_CORESIGHT_AddAP(3, CORESIGHT_APB_AP);", " JLINK_CORESIGHT_AddAP(4, CORESIGHT_AHB_AP);", f" CPU = {cpu};", f" CORESIGHT_IndexAHBAPToUse = {ap_index};", ] lines.extend(extras) lines.append(" return 0;") lines.append("}") return "\n".join(lines) def _cpu_constant(self, core_name: str) -> str: mapping = { "Cortex-M33": "CORTEX_M33", "Cortex-M23": "CORTEX_M23", "Cortex-A32": "CORTEX_A32", } return mapping.get(core_name, "CORTEX_M33") def _resolve_swd_interface(self): if pylink is None: return None try: return pylink.enums.JLinkInterfaces.SWD except Exception: try: return pylink.JLinkInterfaces.SWD except Exception: return None def attach_session(self, session: DebuggerSession): if not self.is_pylink_available(): session.set_unavailable("pylink not installed") return if not self.is_dll_available(): messagebox.showerror("J-Link DLL Missing", "Copy pylink/JLinkARM.dll before connecting.") session.set_unavailable("J-Link DLL missing") return device = self.get_selected_device() if not device: messagebox.showwarning("J-Link Not Found", "Select a connected J-Link before connecting.") return serial = device.get("serial") speed = self.validate_speed() if speed is None or serial is None: return ok, msg = self.controller.attach(session, serial, speed) if not ok: messagebox.showerror("Pylink Connection Failed", msg) session.queue_message(f"[error] {msg}") return session.mark_connected(serial) session.queue_message(f"Attached to emulator {serial} at {speed} kHz.") def detach_session(self, session: DebuggerSession): session.mark_disconnected() self.controller.detach(session) def run_session_command(self, session: DebuggerSession, command: str): self.controller.execute(session, command) def run_gdb_script(self, session: DebuggerSession, path: str): return self.controller.run_gdb_script(session, path) def set_shared_script(self, path: str, session: Optional[DebuggerSession] = None): ok, msg = self.controller.set_script(path) target_session = session or None if target_session: target_session.queue_message(f"[script] {msg}") else: self.queue_message(f"[script] {msg}") return ok, msg def _set_status(self, text: str, color: str): self.status_var.set(text) if self.status_label: try: self.status_label.configure(foreground=color) except tk.TclError: pass def is_pylink_available(self) -> bool: return self._pylink_available def is_dll_available(self) -> bool: return bool(self.dll_path and os.path.exists(self.dll_path)) def get_selected_device(self): label = (self.device_var.get() or "").strip() return self._device_display.get(label) def validate_speed(self) -> Optional[int]: try: speed = int((self.speed_var.get() or "").strip()) if speed <= 0: raise ValueError return speed except ValueError: messagebox.showwarning("Invalid Speed", "Enter a positive integer speed in kHz.") return None def queue_message(self, text: str): if not text: return for session in self.sessions: session.queue_message(text) def show_help(self): sections = [ ( "Memory Access", [ ("x ", "Read a 32-bit word at . Use x/COUNT{b|h|w|g} for other sizes/counts."), ("set *=", "Write a 32-bit value to ."), ], ), ( "Execution Control", [ ("halt / h", "Stop the current core."), ("go / g / resume / run", "Resume execution from the current PC."), ("reset / r", "Reset and continue."), ("reset_halt / rh", "Reset and halt immediately."), ("step / s / si", "Single-step one instruction."), ("next / n / ni", "Step over calls (uses software breakpoint when needed)."), ("finish / fin / so", "Step out of the current function."), ("i r", "Show current PC and general-purpose registers."), ], ), ( "Breakpoints", [ ("b ", "Set breakpoint at ."), ("bc ", "Clear the breakpoint at ."), ], ), ( "J-Link Exec", [ ("Any other text", "Forwarded directly to J-Link 'exec' (ex: regs, mem32, log)."), ], ), ( "Scripts", [ ("Run Script", "Load .jlink or .gdb files for the selected tab."), ("monitor ", "In .gdb files, these lines send directly to J-Link."), ], ), ( "Sessions", [ ("Per-tab isolation", "Each core tab owns its connection, history, and log."), ], ), ] lines = ["Debugger command reference", ""] for title, entries in sections: lines.append(f"{title}:") for cmd, desc in entries: lines.append(f" {cmd}") lines.append(f" {desc}") lines.append("") text = "\n".join(lines).rstrip() try: messagebox.showinfo("Debugger Help", text) except tk.TclError: pass def poll_queue(self): for session in self.sessions: session.poll_queue() def shutdown(self): self._cancel_device_refresh() for session in list(self.sessions): session.shutdown() def disconnect(self): for session in list(self.sessions): if session.connected: self.detach_session(session) def has_active_connections(self) -> bool: return any(session.connected for session in self.sessions) def refresh_devices(self): entries = self._enumerate_emulators() display = [] mapping = {} for entry in entries: serial, detail = self._extract_emulator_metadata(entry) if serial is None: continue label = self._format_device_label(serial, detail) display.append(label) mapping[label] = {"serial": serial, "detail": detail} self._device_display = mapping if self.device_combo: self.device_combo["values"] = display current = self.device_var.get() if display and current not in mapping: self.device_var.set(display[0]) elif not display: self.device_var.set("") def _schedule_device_refresh(self): if self._device_refresh_after is not None: return try: self._device_refresh_after = self.frame.after(1000, self._auto_refresh_devices) except tk.TclError: self._device_refresh_after = None def _auto_refresh_devices(self): self._device_refresh_after = None try: if not self.frame.winfo_exists(): return except tk.TclError: return self.refresh_devices() self._schedule_device_refresh() def _cancel_device_refresh(self): if self._device_refresh_after is None: return try: self.frame.after_cancel(self._device_refresh_after) except tk.TclError: pass self._device_refresh_after = None def _enumerate_emulators(self): if not self._pylink_available: return [] for provider in self._emulator_providers(): try: entries = provider() except Exception as exc: if not self._device_error_reported: self.queue_message(f"[pylink] Unable to enumerate J-Link: {exc}") self._device_error_reported = True continue if entries: self._device_error_reported = False return entries return [] def _emulator_providers(self): providers = [] if hasattr(pylink, "JLink"): def by_instance(): link = pylink.JLink() try: if hasattr(link, "list_emulators"): return link.list_emulators() return link.connected_emulators() finally: try: link.close() except Exception: pass providers.append(by_instance) dll_module = getattr(pylink, "JLinkARMDll", None) dll_class = getattr(dll_module, "JLinkARMDll", None) if dll_module else None if dll_class: def by_dll(): dll = dll_class() try: if hasattr(dll, "list_emulators"): return dll.list_emulators() finally: try: dll.close() except Exception: pass providers.append(by_dll) return providers def _extract_emulator_metadata(self, entry): serial = self._extract_serial(entry) if serial is None: return None, "" product = self._extract_attr(entry, ("product", "Product", "model")) connection = self._extract_attr(entry, ("connection", "Connection", "interface")) if hasattr(connection, "name"): connection = connection.name parts = [] if product: parts.append(str(product).strip()) if connection: parts.append(str(connection).strip()) return serial, " / ".join(parts) def _extract_serial(self, entry): candidates = [] if isinstance(entry, dict): candidates.extend([ entry.get("serial_number"), entry.get("SerialNumber"), entry.get("serial"), entry.get("Serial"), ]) else: candidates.extend([ getattr(entry, "serial_number", None), getattr(entry, "SerialNumber", None), getattr(entry, "serial", None), getattr(entry, "Serial", None), ]) for value in candidates: serial = safe_int(value) if serial is not None: return serial return None def _extract_attr(self, entry, keys): for key in keys: if isinstance(entry, dict) and key in entry: return entry.get(key) if hasattr(entry, key): return getattr(entry, key) return None def _format_device_label(self, serial, detail): serial_text = str(serial).strip() info_parts = [] normalized = self._normalize_device_detail(detail or "") if normalized: info_parts.append(normalized) ap_overview = " / ".join(f"{ap['label']} ({ap['core']})" for ap in self.ap_definitions) if ap_overview: info_parts.append(ap_overview) if info_parts: return f"{serial_text} - {' | '.join(info_parts)}" return serial_text def _normalize_device_detail(self, detail: str) -> str: text = (detail or "").strip() if text.endswith("- 1"): text = text[:-3].rstrip() return text class DualUARTApp: def __init__(self, root: tk.Tk) -> None: self.root = root version_suffix = f"v{APP_VERSION}" if APP_VERSION not in ("", "unknown") else "version unknown" self.root.title(f"AmebaPro3 Control Panel — Powered by yiekheng, {version_suffix}") self._closing = False self._poll_after_id = None self.flash_thread = None self._icon_image = self._create_robot_icon() if self._icon_image: self.root.iconphoto(False, self._icon_image) paned = ttk.Panedwindow(root, orient="horizontal") paned.pack(fill="both", expand=True, padx=10, pady=10) left_container = ttk.Frame(paned) right_container = ttk.Frame(paned) paned.add(left_container, weight=2) paned.add(right_container, weight=3) self.port_assignments: dict[str, str] = {} self.dev1 = DevicePanel( left_container, title="Control Device", mode="arduino", default_baud="9600", line_ending="\n", console_height=10, on_connect=lambda port, key="dev1": self._panel_connected(key, port), on_disconnect=lambda port, key="dev1": self._panel_disconnected(key, port), exclude_ports_provider=lambda key="dev1": self._get_excluded_ports(key), before_button_callback=self._before_control_button, ) self.dev1.frame.pack(fill="both", expand=True, padx=(0, 5)) self.debugger = PylinkDebuggerPanel(left_container, PYLINK_DLL_PATH) self.debugger.frame.pack(fill="both", expand=True, padx=(0, 5), pady=(5, 0)) self.dev2 = DevicePanel( right_container, title="AmebaPro3", mode="raw", default_baud="1500000", line_ending="\r\n", flash_handler=self.flash_ameba, flash_button_label="Flash Image", on_connect=lambda port, key="dev2": self._panel_connected(key, port), on_disconnect=lambda port, key="dev2": self._panel_disconnected(key, port), exclude_ports_provider=lambda key="dev2": self._get_excluded_ports(key), ) self.dev2.frame.pack(fill="both", expand=True, padx=(5, 0)) self.poll_loop() self.root.protocol("WM_DELETE_WINDOW", self.close) # ------------------------------------------------------------------ def _create_robot_icon(self): try: icon = tk.PhotoImage(width=32, height=32) except tk.TclError: return None icon.put("#1e1f2a", to=(0, 0, 32, 32)) icon.put("#9ca5c9", to=(6, 6, 26, 18)) icon.put("#3c3f58", to=(10, 18, 22, 28)) icon.put("#f5f7ff", to=(10, 10, 14, 14)) icon.put("#f5f7ff", to=(18, 10, 22, 14)) icon.put("#1e1f2a", to=(12, 12, 13, 13)) icon.put("#1e1f2a", to=(20, 12, 21, 13)) icon.put("#9ca5c9", to=(14, 3, 18, 5)) icon.put("#9ca5c9", to=(15, 5, 17, 6)) icon.put("#6b708f", to=(6, 14, 8, 22)) icon.put("#6b708f", to=(24, 14, 26, 22)) return icon # ------------------------------------------------------------------ def poll_loop(self) -> None: if self._closing: return self.dev1.poll_queue() self.dev2.poll_queue() if self.debugger: self.debugger.poll_queue() try: self._poll_after_id = self.root.after(10, self.poll_loop) except tk.TclError: self._closing = True def close(self) -> None: if self._closing: return self._closing = True if self._poll_after_id is not None: try: self.root.after_cancel(self._poll_after_id) except tk.TclError: pass self._poll_after_id = None self.dev1.disconnect_serial() self.dev2.disconnect_serial() if self.debugger: self.debugger.shutdown() self.debugger.disconnect() self.root.destroy() # ------------------------------------------------------------------ def _panel_connected(self, key: str, port: str) -> None: self.port_assignments[key] = port self._refresh_other_panels(key) def _panel_disconnected(self, key: str, _port: str) -> None: self.port_assignments.pop(key, None) self._refresh_other_panels(key) def _get_excluded_ports(self, key: str) -> List[str]: return [port for panel_key, port in self.port_assignments.items() if panel_key != key] def _refresh_other_panels(self, key: str) -> None: for panel_key, panel in ("dev1", self.dev1), ("dev2", self.dev2): if panel_key != key: panel.refresh_ports() def _before_control_button(self) -> None: self._disconnect_debugger_for_reset("Arduino control command") def _disconnect_debugger_for_reset(self, reason: str) -> None: if not self.debugger or not self.debugger.has_active_connections(): return note = f"Disconnecting debugger before {reason}." for panel in (self.dev1, self.dev2): panel.queue_message(f"[debugger] {note}") try: self.debugger.queue_message(f"[info] {note}") except Exception: pass try: self.debugger.disconnect() except Exception: pass def _log_flash_step(self, text: str) -> None: message = f"[flash] {text}" self.dev2.queue_message(message) self.dev1.queue_message(message) def _prepare_firmware_images(self) -> bool: image_dir = os.path.join(APP_ROOT, FLASH_IMAGE_DIR) try: os.makedirs(image_dir, exist_ok=True) except Exception as exc: msg = f"Unable to create firmware directory '{image_dir}': {exc}" self._log_flash_step(msg) messagebox.showerror("Firmware Directory Error", msg) return False missing = [src for src, _ in FIRMWARE_COPY_MAP if not os.path.exists(os.path.join(image_dir, src))] if missing: msg = f"Missing firmware file(s): {', '.join(missing)} in {image_dir}" self._log_flash_step(msg) messagebox.showerror("Missing Firmware", msg) return False for src, dest in FIRMWARE_COPY_MAP: src_path = os.path.join(image_dir, src) dest_path = os.path.join(image_dir, dest) try: shutil.copyfile(src_path, dest_path) self._log_flash_step(f"Prepared '{dest}' from '{src}'.") except Exception as exc: err = f"Failed to copy '{src}' to '{dest}': {exc}" self._log_flash_step(err) messagebox.showerror("Firmware Copy Failed", err) return False return True def _send_arduino_pg_reset(self, value: int) -> bool: self._disconnect_debugger_for_reset("Arduino PG/reset sequence") if not self.dev1.is_connected(): self._log_flash_step("Arduino is not connected; cannot toggle PG.") return False pg_cmd = f"pg {value}" self._log_flash_step(f"Arduino command → '{pg_cmd}'") self.dev1.send_uart(pg_cmd) self._log_flash_step("Arduino command → 'reset'") self.dev1.send_uart("reset") return True def _set_flash_controls_state(self, enabled: bool) -> None: state = "normal" if enabled else "disabled" for widget in (self.dev2.flash_button, self.dev2.connect_button, self.dev1.connect_button): if widget is not None: widget.configure(state=state) def flash_ameba(self) -> None: if self.flash_thread and self.flash_thread.is_alive(): messagebox.showinfo("Flash in Progress", "Please wait for the current flash.py run to finish.") return self._disconnect_debugger_for_reset("flash.py run") if not self.dev1.is_connected(): messagebox.showwarning("Arduino Not Connected", "Connect Device 1 (Arduino) before flashing.") return if not self.dev2.is_connected(): messagebox.showwarning("Not Connected", "Connect Device 2 (AmebaPro3) before flashing.") return port = self.dev2.get_selected_port() if not port: messagebox.showwarning("No Port", "Select a COM port for AmebaPro3 before flashing.") return if not self._prepare_firmware_images(): return self._log_flash_step(f"Starting flash.py on {port}.") self._set_flash_controls_state(False) if not self._send_arduino_pg_reset(1): self._set_flash_controls_state(True) messagebox.showerror("Arduino Error", "Unable to set PG=1 and reset through Arduino.") return self._log_flash_step("Disconnecting AmebaPro3 UART before flashing.") self.dev2.disconnect_serial() self.flash_thread = threading.Thread(target=self._flash_worker, args=(port,), daemon=True) self.flash_thread.start() def _flash_worker(self, port: str) -> None: command = self._build_flash_command(port) self.dev2.queue_message(f"[flash] Executing: {' '.join(command)}") try: process = subprocess.Popen( command, cwd=APP_ROOT, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, bufsize=1, ) except Exception as exc: self._flash_complete(False, f"Failed to start flash.py: {exc}") return if process.stdout: timestamp_pattern = re.compile(r"\[\d{4}-\d{2}-\d{2} [^\]]+\]") for line in process.stdout: clean = line.rstrip() if not clean: continue sanitized = timestamp_pattern.sub("", clean).strip() if not sanitized: continue self.dev2.queue_message(f"[flash.py] {sanitized}") ret = process.wait() if process.stdout: process.stdout.close() success = ret == 0 message = "flash.py completed successfully." if success else f"flash.py exited with code {ret}." self._flash_complete(success, message) def _flash_complete(self, success: bool, message: str) -> None: def finalize(): self.dev2.queue_message(f"[flash] {message}") self._set_flash_controls_state(True) self.flash_thread = None if not self._closing: self._log_flash_step("Reconnecting AmebaPro3 UART...") try: self.dev2.connect_serial() except Exception: pass if self.dev2.is_connected(): self._log_flash_step("AmebaPro3 reconnected. Lowering PG and resetting.") self._send_arduino_pg_reset(0) else: self._log_flash_step("Failed to reconnect AmebaPro3; PG reset skipped.") if not success and not self._closing: messagebox.showerror("Flash Failed", message) try: self.root.after(0, finalize) except tk.TclError: pass def _build_flash_command(self, port: str) -> List[str]: args = [ "-d", "--profile", FLASH_PROFILE, "--image-dir", FLASH_IMAGE_DIR, "--baudrate", FLASH_BAUDRATE, "--memory-type", FLASH_MEMORY_TYPE, "--port", port, ] exe = self._locate_flash_executable() if exe: return [exe] + args script = os.path.join(APP_ROOT, "flash.py") return [sys.executable, script] + args def _locate_flash_executable(self) -> Optional[str]: candidates = [] if getattr(sys, "frozen", False): candidates.append(os.path.dirname(sys.executable)) candidates.append(APP_ROOT) for directory in candidates: exe = os.path.join(directory, "flash.exe") if os.path.exists(exe): return exe return None if __name__ == "__main__": root = tk.Tk() DualUARTApp(root) root.mainloop()