pro3_control_panel/pro3_uart.py
wongyiekheng fb52c81e52 + Add find bar
+ Add splitter
+ History deletable
2025-12-16 17:25:12 +08:00

3734 lines
138 KiB
Python

"""Dual UART + J-Link debugger control panel for AmebaPro3 boards."""
from __future__ import annotations
import datetime
import json
import os
import re
import shutil
import subprocess
import sys
import tempfile
import threading
import time
import tkinter as tk
from collections import deque
from tkinter import filedialog, messagebox, ttk
from typing import Callable, Iterable, List, Optional
import serial # type: ignore
import serial.tools.list_ports # type: ignore
try:
from version_info import display_version as _DISPLAY_VERSION_VALUE, version as _CANONICAL_VERSION_VALUE
except Exception: # pragma: no cover - optional metadata
_DISPLAY_VERSION_VALUE = ""
_CANONICAL_VERSION_VALUE = ""
if getattr(sys, "frozen", False):
APP_ROOT = os.path.dirname(sys.executable)
else:
APP_ROOT = os.path.dirname(os.path.abspath(__file__))
PYLINK_DLL_PATH = os.path.join(APP_ROOT, "pylink", "JLinkARM.dll")
if os.path.exists(PYLINK_DLL_PATH):
os.environ.setdefault("JLINKARM_DLL_PATH", PYLINK_DLL_PATH)
try:
import pylink # type: ignore
except Exception: # pragma: no cover - pylink is optional
pylink = None
FLASH_PROFILE = "devices/Profiles/AmebaPro3_FreeRTOS_NOR.rdev"
FLASH_IMAGE_DIR = "fw"
FLASH_BAUDRATE = "1500000"
FLASH_MEMORY_TYPE = "nor"
FIRMWARE_COPY_MAP = [
("fw_en.bin", "amebapro3_app.bin"),
("np_boot.bin", "amebapro3_boot.bin"),
]
ARDUINO_SKETCH_DIR = os.path.join(APP_ROOT, "arduino", "control")
ARDUINO_SKETCH_PATH = os.path.join(ARDUINO_SKETCH_DIR, "control.ino")
_DEFAULT_ARDUINO_CLI = os.path.join(APP_ROOT, "arduino", "arduino-cli.exe")
ARDUINO_CLI = os.environ.get("ARDUINO_CLI", _DEFAULT_ARDUINO_CLI if os.path.exists(_DEFAULT_ARDUINO_CLI) else "arduino-cli")
ARDUINO_FQBN = os.environ.get("ARDUINO_FQBN", "arduino:avr:uno")
PORT_REFRESH_MS = 1000
LOG_HISTORY_LIMIT = 2000
HISTORY_LIMIT = 200
JLINK_SCRIPT_EXTS = {".jlink", ".jscr", ".jsf", ".txt"}
GDB_SCRIPT_EXTS = {".gdb", ".gdbinit"}
COMMAND_HISTORY_FILE = os.path.join(APP_ROOT, "command_history.json")
_HISTORY_FILE_LOCK = threading.Lock()
def _folder_version_guess() -> str:
base_name = os.path.basename(APP_ROOT)
match = re.search(r"_v([0-9A-Za-z._-]+)$", base_name)
return match.group(1) if match else ""
def _format_version_candidate(tag: str, trim_trailing_zero: bool = False) -> str:
text = str(tag or "").strip()
if not text:
return ""
if trim_trailing_zero:
parts = [segment for segment in text.replace("_", ".").split(".") if segment]
if not parts:
return ""
while len(parts) > 1 and parts[-1] == "0":
parts.pop()
return ".".join(parts)
return text
def _resolve_app_version() -> str:
for candidate, trim in (
(_DISPLAY_VERSION_VALUE, False),
(_CANONICAL_VERSION_VALUE, True),
(_folder_version_guess(), False),
):
formatted = _format_version_candidate(candidate, trim_trailing_zero=trim)
if formatted:
return formatted
return "unknown"
APP_VERSION = _resolve_app_version()
def timestamp() -> str:
now = datetime.datetime.now()
return now.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
def safe_int(value) -> Optional[int]:
try:
return int(value)
except Exception:
return None
def _should_skip_history_entry(text: str) -> bool:
entry = (text or "").strip().lower()
if not entry:
return True
if "testmode" in entry:
return True
if entry.startswith("ready. commands:") and "testmode" in entry:
return True
return False
def _load_history_store() -> dict:
with _HISTORY_FILE_LOCK:
try:
with open(COMMAND_HISTORY_FILE, "r", encoding="utf-8") as handle:
data = json.load(handle)
return data if isinstance(data, dict) else {}
except FileNotFoundError:
return {}
except Exception:
return {}
def _persist_history_store(data: dict) -> None:
with _HISTORY_FILE_LOCK:
try:
tmp_path = COMMAND_HISTORY_FILE + ".tmp"
with open(tmp_path, "w", encoding="utf-8") as handle:
json.dump(data, handle, indent=2)
os.replace(tmp_path, COMMAND_HISTORY_FILE)
except Exception:
pass
_CONSOLE_GRIP_STYLE = "ConsoleSplitGrip.TFrame"
_CONSOLE_GRIP_STYLE_READY = False
def _ensure_console_grip_style() -> None:
global _CONSOLE_GRIP_STYLE_READY
if _CONSOLE_GRIP_STYLE_READY:
return
try:
style = ttk.Style()
style.configure(_CONSOLE_GRIP_STYLE, background="#9e9e9e", borderwidth=0, relief="flat")
except tk.TclError:
return
_CONSOLE_GRIP_STYLE_READY = True
class ConsoleFindBar:
"""Inline find controls embedded in a console frame."""
TAG = "find_highlight"
def __init__(self, parent: tk.Widget, text_widget: tk.Text) -> None:
self.text_widget = text_widget
self.frame = ttk.Frame(parent)
self.frame.columnconfigure(1, weight=1)
self.query_var = tk.StringVar()
self.match_case_var = tk.BooleanVar(value=False)
self.status_var = tk.StringVar(value="")
self._visible = True
self._last_query = ""
self._current_range: Optional[tuple[str, str]] = None
self._child_layouts: dict[tk.Widget, dict] = {}
self._preferred_height: Optional[int] = None
ttk.Label(self.frame, text="Find:").grid(row=0, column=0, sticky="w", padx=(0, 4))
entry = ttk.Entry(self.frame, textvariable=self.query_var)
entry.grid(row=0, column=1, sticky="ew")
entry.bind("<Return>", lambda _evt: self.find_next())
self.entry = entry
buttons = ttk.Frame(self.frame)
buttons.grid(row=0, column=2, padx=(6, 0))
ttk.Button(buttons, text="Next", width=8, command=self.find_next).grid(row=0, column=0, padx=(0, 3))
ttk.Button(buttons, text="Prev", width=8, command=self.find_prev).grid(row=0, column=1, padx=(0, 3))
ttk.Button(buttons, text="Find All", width=10, command=self.find_all).grid(row=0, column=2, padx=(0, 3))
ttk.Checkbutton(self.frame, text="Match case", variable=self.match_case_var).grid(
row=0, column=3, padx=(6, 0)
)
ttk.Label(self.frame, textvariable=self.status_var, foreground="gray").grid(
row=1, column=0, columnspan=4, sticky="w", pady=(4, 0)
)
self.text_widget.tag_configure(self.TAG, background="#fff59d")
self.query_var.trace_add("write", lambda *_: self._reset_progress())
def grid(self, **kwargs) -> None:
self.frame.grid(**kwargs)
if not self._child_layouts:
self._capture_child_layout()
self._ensure_frame_height()
self._restore_child_layout()
def show(self) -> None:
self.entry.focus_set()
def hide(self) -> None:
pass
def toggle(self) -> None:
self.entry.focus_set()
def find_next(self) -> None:
self._find(backwards=False)
def find_prev(self) -> None:
self._find(backwards=True)
def find_all(self) -> None:
query = self._current_query()
if not query:
return
self.clear_highlight()
start = "1.0"
count = 0
while True:
idx = self.text_widget.search(
query,
start,
stopindex="end",
nocase=not self.match_case_var.get(),
)
if not idx:
break
end = self.text_widget.index(f"{idx}+{len(query)}c")
self.text_widget.tag_add(self.TAG, idx, end)
start = end
count += 1
if count:
self.status_var.set(f"{count} matches highlighted.")
else:
self.status_var.set("No matches found.")
self._current_range = None
def clear_highlight(self) -> None:
try:
self.text_widget.tag_remove(self.TAG, "1.0", "end")
except tk.TclError:
pass
def _current_query(self) -> str:
text = (self.query_var.get() or "").strip()
if not text:
self.status_var.set("Enter text to find.")
self.clear_highlight()
self._current_range = None
return ""
return text
def _find(self, backwards: bool) -> None:
query = self._current_query()
if not query:
return
if query != self._last_query:
self._current_range = None
self._last_query = query
start = self._initial_anchor(backwards)
stop = "1.0" if backwards else "end"
idx = self.text_widget.search(
query,
start,
stopindex=stop,
nocase=not self.match_case_var.get(),
backwards=backwards,
)
wrapped = False
if not idx:
wrapped = True
start = "end" if backwards else "1.0"
idx = self.text_widget.search(
query,
start,
stopindex=stop,
nocase=not self.match_case_var.get(),
backwards=backwards,
)
if not idx:
self.status_var.set("No matches found.")
self.clear_highlight()
self._current_range = None
self.text_widget.bell()
return
end = self.text_widget.index(f"{idx}+{len(query)}c")
self._current_range = (idx, end)
self.clear_highlight()
self.text_widget.tag_add(self.TAG, idx, end)
try:
self.text_widget.see(idx)
self.text_widget.mark_set("insert", end)
except tk.TclError:
pass
self.status_var.set("Wrapped search." if wrapped else "")
def _initial_anchor(self, backwards: bool) -> str:
if self._current_range:
anchor = self._current_range[0] if backwards else self._current_range[1]
if backwards:
try:
return self.text_widget.index(f"{anchor} -1c")
except tk.TclError:
return "1.0"
return anchor
return "end" if backwards else "1.0"
def _reset_progress(self) -> None:
self._current_range = None
self._last_query = ""
self.status_var.set("")
def _capture_child_layout(self) -> None:
children = list(self.frame.grid_slaves())
for child in children:
info = child.grid_info()
# grid_info values are strings; convert ints for needed keys.
for key in ("row", "column", "columnspan", "rowspan", "ipadx", "ipady", "padx", "pady"):
if key in info and isinstance(info[key], str):
try:
info[key] = int(info[key])
except ValueError:
pass
self._child_layouts[child] = info
def _ensure_frame_height(self) -> None:
if self._preferred_height is None:
try:
self.frame.update_idletasks()
self._preferred_height = max(1, self.frame.winfo_reqheight())
except tk.TclError:
self._preferred_height = 1
if self._preferred_height:
try:
self.frame.configure(height=self._preferred_height)
self.frame.grid_propagate(False)
except tk.TclError:
pass
def _apply_hidden_state(self) -> None:
for child in self._child_layouts:
try:
child.grid_remove()
except tk.TclError:
pass
def _restore_child_layout(self) -> None:
for child, info in self._child_layouts.items():
try:
child.grid(**info)
except tk.TclError:
pass
class DevicePanel:
"""Single UART console with optional macro buttons and flashing control."""
def __init__(
self,
parent: tk.Widget,
title: str,
mode: str,
default_baud: str,
line_ending: str,
console_height: int = 15,
flash_handler: Optional[Callable[[], None]] = None,
flash_button_label: str = "Flash Firmware",
arduino_flash_handler: Optional[Callable[[], None]] = None,
on_connect: Optional[Callable[[str], None]] = None,
on_disconnect: Optional[Callable[[str], None]] = None,
exclude_ports_provider: Optional[Callable[[], Iterable[str]]] = None,
before_button_callback: Optional[Callable[[], None]] = None,
) -> None:
self.mode = mode
self.line_ending = line_ending or ""
self.flash_handler = flash_handler
self.flash_button_label = flash_button_label
self.arduino_flash_handler = arduino_flash_handler
self.on_connect = on_connect
self.on_disconnect = on_disconnect
self.exclude_ports_provider = exclude_ports_provider
self.before_button_callback = before_button_callback
self.serial_port: Optional[serial.Serial] = None
self.connected_port: Optional[str] = None
self.read_thread: Optional[threading.Thread] = None
self.read_thread_running = False
self.msg_queue: deque[str] = deque()
self.msg_lock = threading.Lock()
self.log_line_count = 0
self.port_map: dict[str, str] = {}
self.raw_history: List[str] = []
self.raw_history_index = 0
self.arduino_history: List[str] = []
self.arduino_history_index = 0
self.history_frame = None
self.history_grip: Optional[ttk.Frame] = None
self.find_bar: Optional[ConsoleFindBar] = None
self._sash_drag_start: Optional[int] = None
self._sash_drag_initial = 0
self.frame = ttk.LabelFrame(parent, text=title)
self._build_widgets(console_height, default_baud)
self._load_history_cache()
self.enable_controls(False)
self.refresh_ports()
self._schedule_port_refresh()
# ------------------------------------------------------------------
# UI construction
# ------------------------------------------------------------------
def _build_widgets(self, console_height: int, default_baud: str) -> None:
conn_frame = ttk.Frame(self.frame)
conn_frame.pack(fill="x", padx=5, pady=5)
conn_frame.columnconfigure(1, weight=1)
ttk.Label(conn_frame, text="Port:").grid(row=0, column=0, sticky="w")
self.port_var = tk.StringVar()
self.port_combo = ttk.Combobox(conn_frame, textvariable=self.port_var, state="readonly", width=80)
self.port_combo.grid(row=0, column=1, sticky="ew", padx=3)
ttk.Label(conn_frame, text="Baud:").grid(row=1, column=0, sticky="w", pady=(8, 0))
self.baud_var = tk.StringVar(value=str(default_baud))
ttk.Entry(conn_frame, textvariable=self.baud_var, width=12).grid(row=1, column=1, sticky="w", padx=3)
self.connect_button = ttk.Button(conn_frame, text="Connect", width=12, command=self.connect_serial)
self.connect_button.grid(row=0, column=2, sticky="e", padx=(8, 0))
self.status_var = tk.StringVar(value="Disconnected")
self.status_label = ttk.Label(conn_frame, textvariable=self.status_var, foreground="red")
self.status_label.grid(row=2, column=0, columnspan=4, sticky="w", pady=(6, 0))
console_frame = ttk.Frame(self.frame)
console_frame.pack(fill="both", expand=True, padx=5, pady=5)
console_frame.columnconfigure(0, weight=1)
console_frame.rowconfigure(0, weight=1)
self.console_split = ttk.Panedwindow(console_frame, orient="vertical")
self.console_split.grid(row=0, column=0, columnspan=2, sticky="nsew")
log_container = ttk.Frame(self.console_split)
log_container.columnconfigure(0, weight=1)
log_container.rowconfigure(0, weight=1)
self.log_text = tk.Text(log_container, height=console_height, wrap="none")
self.log_text.grid(row=0, column=0, sticky="nsew")
self.log_text.configure(state="disabled")
vscroll = ttk.Scrollbar(log_container, orient="vertical", command=self.log_text.yview)
vscroll.grid(row=0, column=1, sticky="ns")
self.log_text.configure(yscrollcommand=vscroll.set)
hscroll = ttk.Scrollbar(log_container, orient="horizontal", command=self.log_text.xview)
hscroll.grid(row=1, column=0, sticky="ew")
self.log_text.configure(xscrollcommand=hscroll.set)
self.find_bar = None
if self.mode == "raw":
self.find_bar = ConsoleFindBar(log_container, self.log_text)
self.find_bar.grid(row=2, column=0, columnspan=2, sticky="ew", pady=(4, 0))
self.find_bar.show()
self.console_split.add(log_container, weight=3)
button_row = ttk.Frame(console_frame)
button_row.grid(row=3, column=0, columnspan=2, sticky="ew", pady=(5, 0))
button_row.columnconfigure(0, weight=1)
clear_row = ttk.Frame(button_row)
clear_row.grid(row=0, column=0, sticky="w")
ttk.Button(clear_row, text="Clear Console", width=14, command=self.clear_console).grid(row=0, column=0, padx=(0, 5))
ttk.Button(clear_row, text="Save Log", width=14, command=self.save_console).grid(row=0, column=1)
next_col = 2
self.flash_button = None
if self.flash_handler:
self.flash_button = ttk.Button(clear_row, text=self.flash_button_label, width=18, command=self.flash_handler)
self.flash_button.grid(row=0, column=next_col, padx=(10, 0))
next_col += 1
self.arduino_flash_button = None
if self.mode == "arduino" and self.arduino_flash_handler:
self.arduino_flash_button = ttk.Button(
clear_row, text="Flash Arduino", width=16, command=self._handle_arduino_flash
)
self.arduino_flash_button.grid(row=0, column=next_col, padx=(5, 0))
next_col += 1
if self.mode == "raw":
self._build_raw_controls(console_frame)
else:
self._build_arduino_controls(console_frame)
def _build_raw_controls(self, console_frame: tk.Widget) -> None:
history_container = ttk.Frame(self.console_split)
history_container.columnconfigure(0, weight=1)
history_container.rowconfigure(2, weight=1)
self.history_frame = history_container
_ensure_console_grip_style()
grip = ttk.Frame(history_container, height=1, style=_CONSOLE_GRIP_STYLE, cursor="sb_v_double_arrow")
grip.grid(row=0, column=0, columnspan=2, sticky="ew")
history_container.rowconfigure(0, minsize=1)
self.history_grip = grip
grip.bind("<ButtonPress-1>", self._start_history_sash_drag)
grip.bind("<B1-Motion>", self._drag_history_sash)
grip.bind("<ButtonRelease-1>", self._end_history_sash_drag)
grip.bind("<Enter>", lambda _evt: self._set_history_grip_active(True))
grip.bind("<Leave>", lambda _evt: self._set_history_grip_active(False))
ttk.Label(history_container, text="Command history:").grid(row=1, column=0, sticky="w", pady=(5, 0))
self.history_listbox = tk.Listbox(history_container, height=8, activestyle="none")
self.history_listbox.grid(row=2, column=0, sticky="nsew")
self.history_listbox.bind("<Double-Button-1>", self.history_select)
self.history_listbox.bind("<Key>", self._history_keypress)
history_scroll = ttk.Scrollbar(history_container, orient="vertical", command=self.history_listbox.yview)
history_scroll.grid(row=2, column=1, sticky="ns")
self.history_listbox.configure(yscrollcommand=history_scroll.set)
input_frame = ttk.Frame(history_container)
input_frame.grid(row=3, column=0, columnspan=2, sticky="ew", pady=(5, 0))
input_frame.columnconfigure(0, weight=1)
self.raw_entry_var = tk.StringVar()
self.raw_entry = ttk.Entry(input_frame, textvariable=self.raw_entry_var)
self.raw_entry.grid(row=0, column=0, sticky="ew", padx=(0, 5))
self.raw_entry.bind("<Return>", lambda _evt: self.send_input_line())
self.raw_entry.bind("<Up>", self._raw_history_prev)
self.raw_entry.bind("<Down>", self._raw_history_next)
self.raw_send_button = ttk.Button(input_frame, text="Send", command=self.send_input_line)
self.raw_send_button.grid(row=0, column=1)
self.arduino_entry = None
self.arduino_entry_var = None
self.arduino_send_button = None
self.console_split.add(history_container, weight=1)
def _build_arduino_controls(self, console_frame: tk.Widget) -> None:
command_frame = ttk.Frame(console_frame)
command_frame.grid(row=3, column=0, columnspan=2, sticky="ew", pady=(5, 0))
command_frame.columnconfigure(0, weight=1)
command_frame.columnconfigure((0, 1, 2), weight=1)
commands = [
("Download Mode", ("pg 1", "reset")),
("Normal Mode", ("pg 0", "reset")),
("RESET", ("reset",)),
]
for idx, (label, payload) in enumerate(commands):
ttk.Button(
command_frame,
text=label,
command=lambda seq=payload: self.send_macro(seq),
).grid(row=0, column=idx, padx=5, sticky="ew")
input_frame = ttk.Frame(console_frame)
input_frame.grid(row=4, column=0, sticky="ew", pady=(5, 0))
input_frame.columnconfigure(0, weight=1)
self.arduino_entry_var = tk.StringVar()
self.arduino_entry = ttk.Entry(input_frame, textvariable=self.arduino_entry_var)
self.arduino_entry.grid(row=0, column=0, sticky="ew", padx=(0, 5))
self.arduino_entry.bind("<Return>", lambda _evt: self.send_arduino_input())
self.arduino_entry.bind("<Up>", self._arduino_history_prev)
self.arduino_entry.bind("<Down>", self._arduino_history_next)
self.arduino_send_button = ttk.Button(input_frame, text="Send", command=self.send_arduino_input)
self.arduino_send_button.grid(row=0, column=1)
self.history_listbox = None
self.raw_entry = None
self.raw_entry_var = None
self.raw_send_button = None
def _handle_arduino_flash(self) -> None:
if not self.arduino_flash_handler:
return
if self.before_button_callback:
try:
self.before_button_callback()
except Exception:
pass
try:
self.arduino_flash_handler()
except Exception as exc:
messagebox.showerror("Arduino Flash", f"Unable to start flash: {exc}")
# ------------------------------------------------------------------
# Console helpers
# ------------------------------------------------------------------
def enable_controls(self, enabled: bool) -> None:
state = "normal" if enabled else "disabled"
targets = [
self.raw_entry,
self.raw_send_button,
self.arduino_entry,
self.arduino_send_button,
self.arduino_flash_button,
]
for widget in targets:
if widget is not None:
widget.configure(state=state)
def clear_console(self) -> None:
with self.msg_lock:
self.msg_queue.clear()
self.log_text.configure(state="normal")
self.log_text.delete("1.0", "end")
self.log_text.configure(state="disabled")
self.log_line_count = 0
self._clear_find_highlight()
def save_console(self) -> None:
try:
content = self.log_text.get("1.0", "end-1c")
except tk.TclError:
content = ""
if not content.strip():
messagebox.showinfo("Save Console Log", "There is no console output to save.")
return
path = filedialog.asksaveasfilename(
title="Save Console Log",
defaultextension=".txt",
filetypes=(("Text Files", "*.txt"), ("All Files", "*.*")),
)
if not path:
return
try:
with open(path, "w", encoding="utf-8") as handle:
handle.write(content)
except Exception as exc:
messagebox.showerror("Save Failed", f"Unable to save log: {exc}")
def _clear_find_highlight(self) -> None:
if self.find_bar:
self.find_bar.clear_highlight()
def _start_history_sash_drag(self, event: tk.Event) -> None:
if not self.console_split:
return
try:
self._sash_drag_initial = self.console_split.sashpos(0)
except tk.TclError:
self._sash_drag_start = None
return
self._sash_drag_start = event.y_root
self._set_history_grip_active(True)
def _drag_history_sash(self, event: tk.Event) -> None:
if self._sash_drag_start is None or not self.console_split:
return
delta = event.y_root - self._sash_drag_start
new_pos = self._sash_drag_initial + delta
try:
self.console_split.sashpos(0, max(0, new_pos))
except tk.TclError:
pass
def _end_history_sash_drag(self, _event: tk.Event) -> None:
self._sash_drag_start = None
self._set_history_grip_active(False)
def _set_history_grip_active(self, active: bool) -> None:
if not self.history_frame or not self.history_grip:
return
size = 6 if active else 1
try:
self.history_frame.rowconfigure(0, minsize=size)
self.history_grip.configure(height=size)
except tk.TclError:
pass
def queue_message(self, text: str) -> None:
if not text:
return
with self.msg_lock:
self.msg_queue.append(str(text))
def poll_queue(self) -> None:
with self.msg_lock:
if not self.msg_queue:
return
entries = list(self.msg_queue)
self.msg_queue.clear()
self._log_lines(entries)
def _log_lines(self, entries: Iterable[str]) -> None:
prepared = []
for entry in entries:
trimmed = str(entry).strip()
if not trimmed:
continue
if trimmed.strip(".") == "":
continue
prepared.append(f"[{timestamp()}] {trimmed}\n")
if not prepared:
return
self.log_line_count += len(prepared)
block = "".join(prepared)
self.log_text.configure(state="normal")
self.log_text.insert("end", block)
if LOG_HISTORY_LIMIT and self.log_line_count > LOG_HISTORY_LIMIT:
excess = self.log_line_count - LOG_HISTORY_LIMIT
self.log_text.delete("1.0", f"{excess + 1}.0")
self.log_line_count = LOG_HISTORY_LIMIT
self.log_text.see("end")
self.log_text.configure(state="disabled")
# ------------------------------------------------------------------
# Serial control
# ------------------------------------------------------------------
def refresh_ports(self) -> None:
selected = self._resolve_port(self.port_var.get())
excluded = set(self.exclude_ports_provider() or []) if self.exclude_ports_provider else set()
mapping: dict[str, str] = {}
display: List[str] = []
for port in serial.tools.list_ports.comports():
desc = (port.description or port.name or "").strip()
label = port.device if not desc else f"{port.device}: {desc}"
keep = self.connected_port == port.device
if port.device in excluded and not keep:
continue
mapping[label] = port.device
display.append(label)
self.port_map = mapping
self.port_combo["values"] = display
if not display:
self.port_var.set("")
return
if selected:
for label, device in mapping.items():
if device == selected:
self.port_var.set(label)
return
preferred = next((label for label, device in mapping.items() if device not in excluded), None)
self.port_var.set(preferred or display[0])
def _schedule_port_refresh(self) -> None:
try:
self.frame.after(PORT_REFRESH_MS, self._auto_refresh_ports)
except tk.TclError:
pass
def _auto_refresh_ports(self) -> None:
if not self.frame.winfo_exists():
return
self.refresh_ports()
self._schedule_port_refresh()
def connect_serial(self) -> None:
if self.serial_port and self.serial_port.is_open:
self.disconnect_serial()
return
port = self._resolve_port(self.port_var.get())
if not port:
messagebox.showwarning("Select Port", "Please select a COM port")
return
try:
baud = int((self.baud_var.get() or "0").strip())
except ValueError:
messagebox.showwarning("Invalid Baud", "Enter a numeric baud rate")
return
try:
self.serial_port = serial.Serial(port, baudrate=baud, timeout=0.1)
except Exception as exc:
messagebox.showerror("Serial Error", str(exc))
return
self.connected_port = port
self.status_var.set(f"Connected to {port} @ {baud}")
self._set_status_color("green")
self.connect_button.configure(text="Disconnect")
self.enable_controls(True)
if self.on_connect:
try:
self.on_connect(port)
except Exception:
pass
self.read_thread_running = True
self.read_thread = threading.Thread(target=self._read_loop, daemon=True)
self.read_thread.start()
def disconnect_serial(self) -> None:
self.read_thread_running = False
if self.serial_port:
try:
self.serial_port.close()
except Exception:
pass
if self.read_thread and self.read_thread.is_alive():
self.read_thread.join(timeout=0.2)
self.serial_port = None
last_port = self.connected_port
self.connected_port = None
self.status_var.set("Disconnected")
self._set_status_color("red")
self.connect_button.configure(text="Connect")
self.enable_controls(False)
if last_port and self.on_disconnect:
try:
self.on_disconnect(last_port)
except Exception:
pass
def _set_status_color(self, color: str) -> None:
try:
self.status_label.configure(foreground=color)
except tk.TclError:
pass
def _resolve_port(self, value: str) -> str:
if value in self.port_map:
return self.port_map[value]
base = (value or "").split(":", 1)[0].strip()
return base
def is_connected(self) -> bool:
return bool(self.serial_port and self.serial_port.is_open)
def get_selected_port(self) -> str:
return self._resolve_port(self.port_var.get())
def send_macro(self, commands: Iterable[str]) -> None:
command_list = [str(cmd) for cmd in commands if str(cmd).strip()]
if not command_list:
return
if not (self.serial_port and self.serial_port.is_open):
messagebox.showwarning(
"Not Connected", f"Connect {(self.frame.cget('text') or 'this device')} before sending commands."
)
return
if self.before_button_callback:
try:
self.before_button_callback()
except Exception:
pass
threading.Thread(target=self._run_macro_commands, args=(command_list,), daemon=True).start()
def _run_macro_commands(self, commands: List[str]) -> None:
for cmd in commands:
self.queue_message(f"[macro] {cmd}")
self.send_uart(cmd)
time.sleep(0.05)
def send_input_line(self) -> None:
if not self.raw_entry_var:
return
payload = self.raw_entry_var.get()
if not payload:
return
self.send_uart(payload)
if not _should_skip_history_entry(payload):
self.raw_history.append(payload)
if len(self.raw_history) > HISTORY_LIMIT:
self.raw_history.pop(0)
if self.history_listbox and self.history_listbox.size():
self.history_listbox.delete(0)
self.raw_history_index = len(self.raw_history)
if self.history_listbox:
self.history_listbox.insert("end", payload)
self._persist_history_cache()
self.raw_entry_var.set("")
def send_arduino_input(self) -> None:
if not self.arduino_entry_var:
return
payload = self.arduino_entry_var.get()
if not payload:
return
self.send_uart(payload)
self.arduino_history.append(payload)
self.arduino_history_index = len(self.arduino_history)
self.arduino_entry_var.set("")
def history_select(self, _event) -> None:
if not self.history_listbox:
return
sel = self.history_listbox.curselection()
if not sel:
return
cmd = self.history_listbox.get(sel[0])
self.raw_entry_var.set(cmd)
if self.raw_entry:
try:
self.raw_entry.icursor("end")
except tk.TclError:
pass
self.send_uart(cmd)
def _history_keypress(self, event) -> None:
if not self.history_listbox:
return
if event.keysym.lower() == "delete":
self._delete_history_entry()
def _history_storage_key(self) -> str:
title = ""
if self.frame:
try:
title = str(self.frame.cget("text") or "").strip()
except tk.TclError:
title = ""
return f"{self.mode}:{title}"
def _load_history_cache(self) -> None:
if self.mode != "raw":
return
entries = _load_history_store().get(self._history_storage_key(), [])
if not entries:
return
filtered: List[str] = []
dirty = False
for cmd in entries[-HISTORY_LIMIT:]:
if not isinstance(cmd, str):
dirty = True
continue
if _should_skip_history_entry(cmd):
dirty = True
continue
filtered.append(cmd)
self.raw_history = []
if self.history_listbox:
self.history_listbox.delete(0, "end")
for cmd in filtered:
self.raw_history.append(cmd)
if self.history_listbox:
self.history_listbox.insert("end", cmd)
self.raw_history = self.raw_history[-HISTORY_LIMIT:]
self.raw_history_index = len(self.raw_history)
if dirty:
self._persist_history_cache()
def _persist_history_cache(self) -> None:
if self.mode != "raw":
return
data = _load_history_store()
cleaned = [cmd for cmd in self.raw_history[-HISTORY_LIMIT:] if isinstance(cmd, str) and not _should_skip_history_entry(cmd)]
data[self._history_storage_key()] = cleaned
_persist_history_store(data)
def _history_mouse_motion(self, event) -> None:
return
def _hide_history_delete_button(self) -> None:
return
def _delete_history_entry(self) -> None:
idx = None
if not self.history_listbox:
return
sel = self.history_listbox.curselection()
if sel:
idx = sel[0]
if idx is None:
return
listbox_size = self.history_listbox.size() if self.history_listbox else 0
if idx >= listbox_size:
return
self.history_listbox.delete(idx)
if 0 <= idx < len(self.raw_history):
del self.raw_history[idx]
if self.raw_history_index > idx:
self.raw_history_index -= 1
self.raw_history_index = min(self.raw_history_index, len(self.raw_history))
self._persist_history_cache()
def _raw_history_prev(self, _event) -> str:
self._navigate_history(self.raw_history, "raw_history_index", self.raw_entry_var, self.raw_entry, -1)
return "break"
def _raw_history_next(self, _event) -> str:
self._navigate_history(self.raw_history, "raw_history_index", self.raw_entry_var, self.raw_entry, 1)
return "break"
def _arduino_history_prev(self, _event) -> str:
self._navigate_history(self.arduino_history, "arduino_history_index", self.arduino_entry_var, self.arduino_entry, -1)
return "break"
def _arduino_history_next(self, _event) -> str:
self._navigate_history(self.arduino_history, "arduino_history_index", self.arduino_entry_var, self.arduino_entry, 1)
return "break"
def _navigate_history(self, history: List[str], attr: str, var: Optional[tk.StringVar], widget, delta: int) -> None:
if not history or var is None:
return
index = getattr(self, attr, len(history))
index = max(0, min(len(history), index + delta))
setattr(self, attr, index)
var.set("" if index == len(history) else history[index])
if widget:
try:
widget.icursor("end")
except tk.TclError:
pass
def send_uart(self, text: str) -> None:
if not (self.serial_port and self.serial_port.is_open):
return
try:
payload = text + self.line_ending
self.serial_port.write(payload.encode("utf-8"))
except Exception as exc:
messagebox.showerror("UART Error", str(exc))
def _read_loop(self) -> None:
buffer = bytearray()
while self.read_thread_running:
port = self.serial_port
if not port or not port.is_open:
time.sleep(0.05)
continue
try:
waiting = port.in_waiting
except Exception:
waiting = 0
if waiting:
try:
chunk = port.read(waiting)
except Exception:
chunk = b""
if chunk:
buffer.extend(chunk)
while True:
idx = buffer.find(b"\n")
if idx == -1:
break
raw_line = buffer[:idx]
del buffer[: idx + 1]
raw_line = raw_line.rstrip(b"\r")
if not raw_line:
continue
try:
text = raw_line.decode("utf-8", errors="replace")
except Exception:
text = str(raw_line)
with self.msg_lock:
self.msg_queue.append(text)
else:
time.sleep(0.002)
if buffer:
try:
text = buffer.rstrip(b"\r\n").decode("utf-8", errors="replace")
except Exception:
text = str(buffer)
with self.msg_lock:
self.msg_queue.append(text)
class JLinkGDBWrapper:
SIZE_MAP = {"b": 1, "h": 2, "w": 4, "g": 8}
_DELAY_UNIT_MAP = {
"ms": "ms",
"msec": "ms",
"msecs": "ms",
"millisecond": "ms",
"milliseconds": "ms",
"us": "us",
"usec": "us",
"usecs": "us",
"microsecond": "us",
"microseconds": "us",
"s": "s",
"sec": "s",
"secs": "s",
"second": "s",
"seconds": "s",
}
_DELAY_UNIT_LABEL = {"ms": "ms", "us": "us", "s": "s"}
def __init__(self, link, session=None) -> None:
self.link = link
self.session = session
def execute(self, command: str):
if not command:
raise ValueError("Empty command")
stripped = command.strip()
op = stripped.split(None, 1)[0].lower()
if op.startswith("x"):
return True, self._handle_mem_read(stripped)
if op == "set":
return True, self._handle_mem_write(stripped)
if op in ("sleep", "delay"):
return True, self._handle_delay_command(op, stripped)
if op in ("halt", "h"):
return True, self._handle_halt()
if op in ("go", "g", "resume", "run"):
return True, self._handle_go()
if op in ("reset", "r"):
return True, self._handle_reset(False)
if op in ("reset_halt", "rh"):
return True, self._handle_reset(True)
if op in ("b", "break"):
return True, self._handle_break_set(stripped)
if op in ("bc", "breakclear", "clearbreak", "deletebreak", "bd"):
return True, self._handle_break_clear(stripped)
if op in ("s", "si", "step", "stepi"):
return True, self._handle_step_instruction()
if op in ("next", "n", "ni"):
return True, self._handle_step_over()
if op in ("finish", "fin", "so"):
return True, self._handle_step_out()
if stripped.lower() in ("i r", "ir", "info r", "info reg", "info regs", "info registers"):
return True, self._handle_info_registers()
return False, self.link.exec_command(stripped)
def _handle_mem_read(self, command: str):
token = command[1:].strip()
fmt = ""
expr = token
if token.startswith("/"):
token = token[1:]
spec = []
while token and not token[0].isspace():
spec.append(token[0])
token = token[1:]
fmt = "".join(spec).lower()
expr = token.strip()
if not expr:
raise ValueError("Address is required for 'x' command")
count = 1
size_char = "w"
if fmt:
match = re.match(r"(?P<count>\d+)?(?P<size>[bhwg]?)$", fmt)
if not match:
raise ValueError("Invalid x/ specifier")
if match.group("count"):
count = max(1, int(match.group("count")))
if match.group("size"):
size_char = match.group("size")
count = min(count, 256)
width = self.SIZE_MAP.get(size_char, 4)
address = self._parse_int(expr)
return self._read_memory(address, width, count)
def _handle_mem_write(self, command: str):
match = re.match(r"set\s+\*(?P<addr>[^=]+)=(?P<value>.+)$", command, re.IGNORECASE)
if not match:
raise ValueError("Use 'set *ADDR=VALUE' for writes")
address = self._parse_int(match.group("addr"))
value = self._parse_int(match.group("value"))
self._write_memory(address, value, 4)
return f"Wrote 0x{value & 0xFFFFFFFF:08x} to {address:#010x}"
def _handle_delay_command(self, op: str, command: str):
parts = command.split(None, 1)
if len(parts) < 2 or not parts[1].strip():
raise ValueError(f"Use '{op} <milliseconds>' to insert a delay.")
payload = parts[1].strip()
tokens = payload.split()
unit = "ms"
if tokens:
unit_key = tokens[-1].lower()
if unit_key in self._DELAY_UNIT_MAP:
unit = self._DELAY_UNIT_MAP[unit_key]
tokens = tokens[:-1]
payload = " ".join(tokens).strip()
if not payload:
raise ValueError("Delay value is required before the unit.")
duration = self._parse_expression(payload)
if duration < 0:
raise ValueError("Delay must be non-negative.")
seconds = duration / 1000.0
if unit == "us":
seconds = duration / 1_000_000.0
elif unit == "s":
seconds = duration
if duration > 0 and seconds > 0:
time.sleep(seconds)
label = self._DELAY_UNIT_LABEL[unit]
return f"Delay {duration} {label}."
def _handle_halt(self):
func = getattr(self.link, "halt", None)
if not func:
raise RuntimeError("halt() is not supported by this J-Link binding")
func()
return self._pc_report_line("Core halted.")
def _handle_go(self):
attempts = [
getattr(self.link, "go", None),
getattr(self.link, "run", None),
getattr(self.link, "restart", None),
]
last_error = None
for func in attempts:
if not callable(func):
continue
try:
func()
return "Execution resumed."
except Exception as exc:
last_error = exc
raw = getattr(self.link, "exec_command", None)
if raw:
for cmd in ("g", "go", "Run"):
try:
result = raw(cmd)
if result in (None, 0, "0", "OK"):
return "Execution resumed."
return str(result)
except Exception as exc:
last_error = exc
raise RuntimeError(f"Resume is not supported ({last_error})")
def _handle_reset(self, halt: bool):
func = getattr(self.link, "reset", None)
if not func:
raise RuntimeError("reset() is not supported by this J-Link binding")
try:
if halt:
func(True)
else:
func(False)
except TypeError:
try:
if halt:
func(halt=True)
else:
func()
except TypeError:
func()
return "Reset issued." if not halt else "Reset and halted."
def _handle_break_set(self, command: str):
parts = command.split(None, 1)
if len(parts) < 2:
raise ValueError("Use 'b <address>' to set a breakpoint.")
addr = self._parse_int(parts[1])
return self.install_breakpoint(addr)
def _handle_break_clear(self, command: str):
parts = command.split(None, 1)
if len(parts) < 2:
raise ValueError("Use 'bc <address>' to clear a breakpoint.")
addr = self._parse_int(parts[1])
return self.remove_breakpoint(addr)
def _handle_step_instruction(self):
self._single_step_no_report()
return self._pc_report_line("Single step executed.")
def _handle_step_over(self):
over = getattr(self.link, "step_over", None)
if over:
over()
return self._pc_report_line("Step over executed.")
raw = getattr(self.link, "exec_command", None)
if raw:
for cmd in ("StepOver", "so", "n", "Next"):
try:
raw(cmd)
return self._pc_report_line("Step over executed.")
except Exception:
continue
return self._software_step_over()
def _handle_step_out(self):
fin = getattr(self.link, "step_out", None)
if fin:
fin()
return self._pc_report_line("Step out executed.")
raw = getattr(self.link, "exec_command", None)
if raw:
for cmd in ("StepOut", "finish", "Finish"):
try:
raw(cmd)
return self._pc_report_line("Step out executed.")
except Exception:
continue
return self._software_step_out()
def install_breakpoint(self, addr: int):
handle = self._set_breakpoint_api(addr)
if handle is None:
handle = self._set_breakpoint_exec(addr)
if handle is None:
raise RuntimeError("Breakpoint could not be set on this target.")
self._record_breakpoint(addr, handle)
return f"Breakpoint set at {addr:#010x}"
def remove_breakpoint(self, addr: int):
normalized = self._normalize_addr(addr)
handle = self._resolve_breakpoint_handle(normalized)
cleared = False
if handle is not None:
cleared = self._clear_breakpoint_api(handle)
if not cleared:
cleared = self._clear_breakpoint_exec(normalized)
if not cleared:
raise RuntimeError("Unable to clear breakpoint; no matching breakpoint found.")
self._forget_breakpoint(normalized)
return f"Breakpoint cleared at {normalized:#010x}"
def reinstall_breakpoints(self):
if not self.session or not self.session.breakpoints:
return
addresses = list(self.session.breakpoints.keys())
self.session.breakpoints.clear()
for addr in addresses:
try:
self.install_breakpoint(addr)
except Exception as exc:
self.session.queue_message(f"[bp] Unable to restore breakpoint at {addr:#010x}: {exc}")
def _set_breakpoint_api(self, addr: int):
last_error = None
for name in ("breakpoint_set", "software_breakpoint_set", "hardware_breakpoint_set"):
setter = getattr(self.link, name, None)
if not callable(setter):
continue
try:
handle = setter(addr)
if handle:
return handle
except Exception as exc:
last_error = exc
return None
def _set_breakpoint_exec(self, addr: int):
exec_cmd = getattr(self.link, "exec_command", None)
if not exec_cmd:
return None
commands = [
f"SetBP {addr:#x},0x1",
f"SetBP {addr:#x}",
f"SetBPAddr = {addr:#x}",
f"SetBPAddr {addr:#x}",
f"SetBPAddr,{addr:#x}",
f"BreakSet {addr:#x}",
f"BP {addr:#x}",
f"Break.Set {addr:#x}",
]
last_error = None
for cmd in commands:
try:
result = exec_cmd(cmd)
if result not in (None, 0, "0", "OK"):
last_error = result
continue
handle = self._find_breakpoint_handle(addr)
return handle if handle else None
except Exception as exc:
last_error = exc
if last_error:
raise RuntimeError(f"Breakpoint command failed: {last_error}")
return None
def _clear_breakpoint_api(self, handle) -> bool:
clearer = getattr(self.link, "breakpoint_clear", None)
if not callable(clearer):
return False
try:
return bool(clearer(handle))
except Exception:
return False
def _clear_breakpoint_exec(self, addr: int) -> bool:
exec_cmd = getattr(self.link, "exec_command", None)
if not exec_cmd:
return False
commands = [
f"ClrBP {addr:#x}",
f"ClrBP {addr:#x},0x1",
f"ClearBP {addr:#x}",
f"ClrBPAddr = {addr:#x}",
f"ClrBPAddr {addr:#x}",
f"BreakClr {addr:#x}",
f"Break.Clr {addr:#x}",
]
last_error = None
for cmd in commands:
try:
result = exec_cmd(cmd)
if result in (None, 0, "0", "OK"):
return True
last_error = result
except Exception as exc:
last_error = exc
if last_error:
raise RuntimeError(f"Breakpoint clear failed: {last_error}")
return False
def _suspend_breakpoints(self, addresses):
if not self.session:
return []
suspended = []
seen = set()
for addr in addresses:
if addr is None:
continue
normalized = self._normalize_addr(addr)
if normalized in seen:
continue
seen.add(normalized)
if not self.session.has_breakpoint(normalized):
continue
handle = self.session.get_breakpoint_handle(normalized)
cleared = False
if handle is not None:
cleared = self._clear_breakpoint_api(handle)
if not cleared:
cleared = self._clear_breakpoint_exec(normalized)
if cleared:
self.session.unregister_breakpoint(normalized)
suspended.append(normalized)
return suspended
def _restore_breakpoints(self, addresses):
if not self.session:
return
for addr in {self._normalize_addr(a) for a in addresses if a is not None}:
if addr is None:
continue
try:
handle = self._set_breakpoint_api(addr)
if handle is None:
handle = self._set_breakpoint_exec(addr)
if handle is None:
raise RuntimeError("Unable to restore")
self.session.register_breakpoint(addr, handle)
except Exception as exc:
self.session.queue_message(f"[bp] Unable to restore breakpoint at {addr:#010x}: {exc}")
def _record_breakpoint(self, addr: int, handle):
if not self.session:
return
self.session.register_breakpoint(addr, handle)
def _forget_breakpoint(self, addr: int):
if not self.session:
return
self.session.unregister_breakpoint(addr)
def _resolve_breakpoint_handle(self, addr: int):
if self.session:
cached = self.session.get_breakpoint_handle(addr)
if cached is not None:
return cached
finder = getattr(self.link, "breakpoint_find", None)
if not callable(finder):
return None
try:
handle = finder(addr)
return handle if handle else None
except Exception:
return None
def _find_breakpoint_handle(self, addr: int):
finder = getattr(self.link, "breakpoint_find", None)
if not callable(finder):
return None
try:
return finder(addr)
except Exception:
return None
def _normalize_addr(self, addr: int) -> int:
return int(addr) & 0xFFFFFFFF
def _pc_report_line(self, prefix: str):
value = self._read_pc()
if value is None:
return [prefix]
if self.session:
self.session.record_pc_report()
return [f"{prefix} — PC = 0x{value:08x}"]
def _read_pc(self):
getter = getattr(self.link, "register_read", None)
if not callable(getter):
return None
for key in ("PC", "pc", 15):
try:
value = getter(key)
if value is not None:
return int(value) & 0xFFFFFFFF
except Exception:
continue
return None
def _read_register(self, name):
getter = getattr(self.link, "register_read", None)
if not callable(getter):
return None
for candidate in self._register_candidates(name):
try:
value = getter(candidate)
except Exception:
continue
if value is None:
continue
try:
return int(value) & 0xFFFFFFFF
except Exception:
continue
return None
def _register_candidates(self, name):
mapping = {
"pc": ["PC", "pc", "R15", 15],
"lr": ["LR", "lr", "R14", 14],
"sp": ["SP", "sp", "R13", 13],
"fp": ["FP", "fp", "R11", 11],
}
if isinstance(name, str):
key = name.lower()
if key in mapping:
return mapping[key]
variants = [name, name.lower(), name.upper()]
if name.upper().startswith("R"):
try:
idx = int(name[1:])
variants.append(idx)
except Exception:
pass
return variants
return [name]
def _software_step_over(self):
current_pc = self._read_pc()
if current_pc is None:
raise RuntimeError("Unable to read PC.")
next_addr = self._next_instruction_address(current_pc)
suspended = self._suspend_breakpoints([current_pc, next_addr])
handle = None
via_api = False
bp_installed = False
lr_before = self._read_register("LR")
try:
stepped_pc = self._single_step_no_report()
if stepped_pc is not None and self._pc_matches_target(stepped_pc, next_addr):
return self._pc_report_line("Step over executed.")
lr_after = self._read_register("LR")
if not self._stepped_into_subroutine(lr_before, lr_after, next_addr):
return self._pc_report_line("Step over executed.")
handle, via_api = self._set_temporary_breakpoint(next_addr)
bp_installed = True
self._resume_cpu()
if not self._wait_for_halt(expected_pc=next_addr):
raise RuntimeError("Step over timed out; target did not halt.")
return self._pc_report_line("Step over executed.")
finally:
if bp_installed:
self._clear_temporary_breakpoint(handle, next_addr, via_api)
self._restore_breakpoints(suspended)
def _software_step_out(self):
lr = self._read_register("LR")
if lr is None:
sp = self._read_register("SP")
lr = self._read_stack_word(sp) if sp is not None else None
if lr is None:
raise RuntimeError("Unable to determine LR for step out.")
target = lr & ~1
if target == 0:
raise RuntimeError("LR is zero; cannot determine return address.")
current_pc = self._read_pc()
suspended = self._suspend_breakpoints([current_pc, target])
handle, via_api = self._set_temporary_breakpoint(target)
try:
self._resume_cpu()
if not self._wait_for_halt(expected_pc=target):
raise RuntimeError("Step out timed out; target did not halt.")
return self._pc_report_line("Step out executed.")
finally:
self._clear_temporary_breakpoint(handle, target, via_api)
self._restore_breakpoints(suspended)
def _next_instruction_address(self, pc=None):
if pc is None:
pc = self._read_pc()
if pc is None:
raise RuntimeError("Unable to read PC.")
current = pc & ~1
size = self._estimate_thumb_instruction_size(current)
return (current + size) & 0xFFFFFFFF
def _estimate_thumb_instruction_size(self, addr: int) -> int:
opcode = self._read_halfword(addr)
if opcode is None:
return 2
prefix = (opcode >> 11) & 0x1F
if prefix in (0b11101, 0b11110, 0b11111):
return 4
return 2
def _read_halfword(self, addr: int):
reader = getattr(self.link, "memory_read16", None)
if not callable(reader):
return None
base = addr & ~1
try:
values = reader(base, 1)
if values:
return values[0] & 0xFFFF
except Exception:
return None
return None
def _read_stack_word(self, sp):
reader = getattr(self.link, "memory_read32", None)
if not callable(reader) or sp is None:
return None
base = sp & ~3
try:
values = reader(base, 1)
if values:
return values[0] & 0xFFFFFFFF
except Exception:
return None
return None
def _set_temporary_breakpoint(self, addr: int):
last_error = None
try:
handle = self._set_breakpoint_api(addr)
if handle is not None:
return handle, True
except Exception as exc:
last_error = exc
try:
handle = self._set_breakpoint_exec(addr)
if handle is not None:
return handle, False
except Exception as exc:
last_error = exc
raise RuntimeError(f"Unable to place temporary breakpoint: {last_error}")
def _clear_temporary_breakpoint(self, handle, addr: int, via_api: bool):
try:
if via_api and handle:
self._clear_breakpoint_api(handle)
else:
self._clear_breakpoint_exec(addr)
except Exception:
pass
def _resume_cpu(self):
attempts = [
getattr(self.link, "go", None),
getattr(self.link, "run", None),
getattr(self.link, "restart", None),
]
last_error = None
for func in attempts:
if not callable(func):
continue
try:
func()
return
except Exception as exc:
last_error = exc
raw = getattr(self.link, "exec_command", None)
if raw:
for cmd in ("g", "go", "Run"):
try:
raw(cmd)
return
except Exception as exc:
last_error = exc
raise RuntimeError(f"Resume is not supported ({last_error})")
def _wait_for_halt(self, timeout: float = 5.0, expected_pc: Optional[int] = None) -> bool:
deadline = time.time() + timeout
target = self._normalize_addr(expected_pc) if expected_pc is not None else None
while time.time() < deadline:
try:
if self.link.halted():
return True
except Exception:
pass
if target is not None:
pc = self._read_pc()
if pc is not None and self._pc_matches_target(pc, target):
# Some J-Link bindings don't report a halted state even when
# a breakpoint is hit, so fall back to checking the PC.
return True
time.sleep(0.01)
if target is not None:
pc = self._read_pc()
if pc is not None and self._pc_matches_target(pc, target):
return True
return False
def _pc_matches_target(self, pc: int, target: int) -> bool:
normalized_pc = self._normalize_addr(pc)
normalized_target = self._normalize_addr(target)
if normalized_pc == normalized_target:
return True
return (normalized_pc & ~1) == (normalized_target & ~1)
def _stepped_into_subroutine(self, lr_before, lr_after, next_addr: int) -> bool:
if lr_after is None:
return True
if not self._pc_matches_target(lr_after, next_addr):
return False
if lr_before is None:
return True
return not self._pc_matches_target(lr_before, next_addr)
def _single_step_no_report(self):
step = getattr(self.link, "step", None)
if step:
step()
return self._read_pc()
raw = getattr(self.link, "exec_command", None)
if raw:
raw("Step")
return self._read_pc()
raise RuntimeError("Single-step is not supported by this J-Link binding.")
def _handle_info_registers(self):
entries = self._gather_registers()
if not entries:
raise RuntimeError("Unable to read registers.")
width = max(len(name) for name, _ in entries)
lines = ["Registers:"]
for name, value in entries:
lines.append(f"{name:>{width}}: 0x{value:08x}")
if self.session:
self.session.record_pc_report()
return lines
def _gather_registers(self):
getter = getattr(self.link, "register_read", None)
if not callable(getter):
return []
register_entries = []
reg_list = []
list_func = getattr(self.link, "register_list", None)
name_func = getattr(self.link, "register_name", None)
if callable(list_func) and callable(name_func):
try:
indices = list_func()
for idx in indices:
try:
name = name_func(idx)
except Exception:
continue
reg_list.append((name, idx))
except Exception:
reg_list = []
if not reg_list:
ordered = [
"PC",
"LR",
"SP",
"FP",
"R12",
"R11",
"R10",
"R9",
"R8",
"R7",
"R6",
"R5",
"R4",
"R3",
"R2",
"R1",
"R0",
"xPSR",
]
reg_list = [(name, name) for name in ordered]
for name, key in reg_list:
try:
value = getter(key)
except Exception:
continue
if value is None:
continue
text = name.decode("utf-8", errors="ignore") if isinstance(name, bytes) else str(name)
register_entries.append((text.strip(), int(value) & 0xFFFFFFFF))
unique = []
seen = set()
for name, value in register_entries:
if name in seen:
continue
seen.add(name)
unique.append((name, value))
return unique
class _ExpressionParser:
_NUMBER_RE = re.compile(
r"""
(?P<value>
0[xX][0-9a-fA-F_]+ |
0[bB][01_]+ |
0[oO][0-7_]+ |
0[dD][0-9_]+ |
\d[\d_]*
)
(?P<suffix>[uUlL]*)
""",
re.VERBOSE,
)
def __init__(self, text: str, deref_cb):
self.text = text or ""
self.length = len(self.text)
self.pos = 0
self._deref = deref_cb
def parse(self) -> int:
value = self._parse_or()
self._skip_ws()
if self.pos != self.length:
raise ValueError(f"Unexpected token at position {self.pos + 1}")
return value
def _skip_ws(self):
while self.pos < self.length and self.text[self.pos].isspace():
self.pos += 1
def _match(self, token: str) -> bool:
self._skip_ws()
if self.text.startswith(token, self.pos):
self.pos += len(token)
return True
return False
def _parse_or(self) -> int:
value = self._parse_xor()
while True:
if self._match("|"):
value |= self._parse_xor()
else:
break
return value
def _parse_xor(self) -> int:
value = self._parse_and()
while True:
if self._match("^"):
value ^= self._parse_and()
else:
break
return value
def _parse_and(self) -> int:
value = self._parse_shift()
while True:
if self._match("&"):
value &= self._parse_shift()
else:
break
return value
def _parse_shift(self) -> int:
value = self._parse_add()
while True:
if self._match("<<"):
rhs = self._parse_add()
value <<= rhs
elif self._match(">>"):
rhs = self._parse_add()
value >>= rhs
else:
break
return value
def _parse_add(self) -> int:
value = self._parse_mul()
while True:
if self._match("+"):
value += self._parse_mul()
elif self._match("-"):
value -= self._parse_mul()
else:
break
return value
def _parse_mul(self) -> int:
value = self._parse_unary()
while True:
if self._match("*"):
value *= self._parse_unary()
elif self._match("/"):
rhs = self._parse_unary()
if rhs == 0:
raise ValueError("Division by zero")
value //= rhs
elif self._match("%"):
rhs = self._parse_unary()
if rhs == 0:
raise ValueError("Modulo by zero")
value %= rhs
else:
break
return value
def _parse_unary(self) -> int:
if self._match("+"):
return self._parse_unary()
if self._match("-"):
return -self._parse_unary()
if self._match("~"):
return ~self._parse_unary()
if self._match("*"):
address = self._parse_unary()
return self._deref(address)
return self._parse_primary()
def _parse_primary(self) -> int:
if self._match("("):
value = self._parse_or()
if not self._match(")"):
raise ValueError("Expected ')'")
return value
number = self._consume_number()
if number is None:
raise ValueError(f"Expected number at position {self.pos + 1}")
return number
def _consume_number(self) -> Optional[int]:
self._skip_ws()
match = self._NUMBER_RE.match(self.text, self.pos)
if not match:
return None
self.pos = match.end()
token = match.group("value")
suffix = match.group("suffix") or ""
token = token.rstrip("uUlL") if not suffix else token
token_lower = token.lower()
if token_lower.startswith("0d"):
digits = token[2:]
if not digits:
raise ValueError("Invalid decimal literal")
return int(digits.replace("_", ""), 10)
if token_lower.startswith("0b"):
return int(token_lower, 2)
if token_lower.startswith("0o"):
return int(token_lower, 8)
return int(token, 0)
def _parse_expression(self, text: str) -> int:
parser = self._ExpressionParser(text, self._deref_u32)
try:
return parser.parse()
except ValueError as exc:
raise ValueError(f"Unable to parse integer value '{text}': {exc}") from exc
def _parse_int(self, text: str) -> int:
return self._parse_expression(text) & 0xFFFFFFFF
def _deref_u32(self, address: int) -> int:
reader = getattr(self.link, "memory_read32", None)
if not callable(reader):
raise ValueError("memory_read32 is not available for expressions using '*'")
addr = int(address) & 0xFFFFFFFF
try:
values = reader(addr, 1) or []
except Exception as exc:
raise ValueError(f"Unable to read 32-bit value at {addr:#010x}: {exc}") from exc
if not values:
raise ValueError(f"No data returned when reading 32-bit value at {addr:#010x}")
return int(values[0]) & 0xFFFFFFFF
def _read_memory(self, address: int, width: int, count: int):
bits = width * 8
func = getattr(self.link, f"memory_read{bits}", None)
if not func:
raise RuntimeError(f"memory_read{bits} not supported")
values = func(address, count) or []
mask = (1 << bits) - 1
lines = []
for idx, raw in enumerate(values):
value = raw & mask
addr = address + idx * width
lines.append(f"{addr:#010x}: 0x{value:0{width * 2}x}")
if not lines:
lines.append(f"No data returned when reading {count} value(s) at {address:#010x}.")
return lines
def _write_memory(self, address: int, value: int, width: int) -> None:
bits = width * 8
func = getattr(self.link, f"memory_write{bits}", None)
if not func:
raise RuntimeError(f"memory_write{bits} not supported")
mask = (1 << bits) - 1
func(address, [value & mask])
class DebuggerSession:
def __init__(self, notebook, panel, ap_info):
self.panel = panel
self.ap_info = dict(ap_info)
self.label = self.ap_info.get("label", "AP")
self.core = self.ap_info.get("core", "Unknown")
self.frame = ttk.Frame(notebook)
self.status_var = tk.StringVar(value="Disconnected")
self.script_path_var = tk.StringVar()
self.log_text = None
self.history_listbox = None
self.command_entry = None
self.command_entry_var = None
self.send_button = None
self.breakpoint_button = None
self.connect_button = None
self.status_label = None
self.find_bar: Optional[ConsoleFindBar] = None
self.msg_queue = deque()
self.msg_lock = threading.Lock()
self.log_line_count = 0
self.command_history = []
self.history_index = 0
self.command_wrapper = None
self.connected = False
self._script_path = None
self.breakpoints: dict[int, Optional[int]] = {}
self._last_pc_report = 0.0
self._build_ui()
self._load_history_cache()
self.enable_controls(False)
def _build_ui(self):
header = ttk.Frame(self.frame)
header.pack(fill="x", padx=5, pady=5)
ttk.Label(header, text=f"{self.label} ({self.core})").grid(row=0, column=0, sticky="w")
self.connect_button = ttk.Button(header, text="Connect", width=12, command=self.toggle_connection)
self.connect_button.grid(row=0, column=1, sticky="e", padx=(8, 0))
self.status_label = ttk.Label(header, textvariable=self.status_var, foreground="red")
self.status_label.grid(row=1, column=0, columnspan=2, sticky="w", pady=(4, 0))
script_frame = ttk.Frame(self.frame)
script_frame.pack(fill="x", padx=5, pady=(0, 5))
ttk.Label(script_frame, text="Script:").grid(row=0, column=0, sticky="w")
self.script_entry = ttk.Entry(script_frame, textvariable=self.script_path_var)
self.script_entry.grid(row=0, column=1, sticky="ew", padx=3)
ttk.Button(script_frame, text="Browse...", command=self.browse_script_file).grid(row=0, column=2, padx=(0, 5))
ttk.Button(script_frame, text="Run Script", command=self.apply_script_file).grid(row=0, column=3)
script_frame.columnconfigure(1, weight=1)
console_frame = ttk.Frame(self.frame)
console_frame.pack(fill="both", expand=True, padx=5, pady=5)
console_frame.columnconfigure(0, weight=1)
console_frame.rowconfigure(0, weight=1)
self.log_text = tk.Text(console_frame, height=10, wrap="none")
self.log_text.grid(row=0, column=0, sticky="nsew")
self.log_text.configure(state="disabled")
vscroll = ttk.Scrollbar(console_frame, orient="vertical", command=self.log_text.yview)
vscroll.grid(row=0, column=1, sticky="ns")
self.log_text.configure(yscrollcommand=vscroll.set)
hscroll = ttk.Scrollbar(console_frame, orient="horizontal", command=self.log_text.xview)
hscroll.grid(row=1, column=0, sticky="ew")
self.log_text.configure(xscrollcommand=hscroll.set)
self.find_bar = ConsoleFindBar(console_frame, self.log_text)
self.find_bar.grid(row=2, column=0, columnspan=2, sticky="ew", pady=(4, 0))
button_row = ttk.Frame(console_frame)
button_row.grid(row=3, column=0, columnspan=2, sticky="ew", pady=(5, 0))
button_row.columnconfigure(0, weight=1)
buttons = ttk.Frame(button_row)
buttons.grid(row=0, column=0, sticky="w")
ttk.Button(buttons, text="Clear Console", command=self.clear_console, width=14).grid(row=0, column=0, padx=(0, 5))
ttk.Button(buttons, text="Save Log", command=self.save_console, width=14).grid(row=0, column=1)
self.breakpoint_button = ttk.Button(buttons, text="Breakpoints", command=self.show_breakpoints, width=14)
self.breakpoint_button.grid(row=0, column=2, padx=(5, 0))
history_frame = ttk.Frame(console_frame)
history_frame.grid(row=4, column=0, sticky="nsew", pady=(5, 0))
history_frame.columnconfigure(0, weight=1)
history_frame.rowconfigure(1, weight=1)
ttk.Label(history_frame, text="Command history:").grid(row=0, column=0, sticky="w")
self.history_listbox = tk.Listbox(history_frame, height=6, activestyle="none")
self.history_listbox.grid(row=1, column=0, sticky="nsew")
self.history_listbox.bind("<Double-Button-1>", self.history_select)
self.history_listbox.bind("<Key>", self._history_keypress)
history_scroll = ttk.Scrollbar(history_frame, orient="vertical", command=self.history_listbox.yview)
history_scroll.grid(row=1, column=1, sticky="ns")
self.history_listbox.configure(yscrollcommand=history_scroll.set)
input_frame = ttk.Frame(console_frame)
input_frame.grid(row=5, column=0, sticky="ew", pady=(5, 0))
input_frame.columnconfigure(0, weight=1)
self.command_entry_var = tk.StringVar()
self.command_entry = ttk.Entry(input_frame, textvariable=self.command_entry_var)
self.command_entry.grid(row=0, column=0, sticky="ew", padx=(0, 5))
self.command_entry.bind("<Return>", lambda _evt: self.send_command())
self.command_entry.bind("<Up>", self._history_prev)
self.command_entry.bind("<Down>", self._history_next)
self.send_button = ttk.Button(input_frame, text="Send", command=self.send_command)
self.send_button.grid(row=0, column=1)
def set_unavailable(self, reason: str):
self._set_status(reason, "red")
if self.connect_button:
self.connect_button.configure(state="disabled")
self.enable_controls(False)
def ensure_script_file(self) -> str:
if self._script_path:
return self._script_path
script = self.ap_info.get("script")
if not script:
return ""
fd, path = tempfile.mkstemp(prefix=f"jlink_{self.label.lower()}_", suffix=".jlink")
with os.fdopen(fd, "w", encoding="utf-8") as handle:
handle.write(script)
self._script_path = path
return path
def cleanup_script(self):
if not self._script_path:
return
try:
os.unlink(self._script_path)
except Exception:
pass
self._script_path = None
def enable_controls(self, enabled: bool):
state = "normal" if enabled else "disabled"
if self.command_entry:
self.command_entry.configure(state=state)
if self.send_button:
self.send_button.configure(state=state)
if self.breakpoint_button:
self.breakpoint_button.configure(state=state)
def queue_message(self, text: str):
if not text:
return
with self.msg_lock:
self.msg_queue.append(str(text))
def poll_queue(self):
with self.msg_lock:
if not self.msg_queue:
return
entries = list(self.msg_queue)
self.msg_queue.clear()
self._log_entries(entries)
def _log_entries(self, entries):
if not self.log_text:
return
prepared = []
for entry in entries:
trimmed = str(entry).strip()
if not trimmed or trimmed.strip(".") == "":
continue
prepared.append(f"[{timestamp()}] {trimmed}\n")
if not prepared:
return
self.log_line_count += len(prepared)
block = "".join(prepared)
self.log_text.configure(state="normal")
self.log_text.insert("end", block)
if LOG_HISTORY_LIMIT and self.log_line_count > LOG_HISTORY_LIMIT:
excess = self.log_line_count - LOG_HISTORY_LIMIT
self.log_text.delete("1.0", f"{excess + 1}.0")
self.log_line_count = LOG_HISTORY_LIMIT
self.log_text.see("end")
self.log_text.configure(state="disabled")
def clear_console(self):
if not self.log_text:
return
with self.msg_lock:
self.msg_queue.clear()
self.log_text.configure(state="normal")
self.log_text.delete("1.0", "end")
self.log_text.configure(state="disabled")
self.log_line_count = 0
self._clear_find_highlight()
self._clear_find_highlight()
def save_console(self):
if not self.log_text:
return
try:
contents = self.log_text.get("1.0", "end-1c")
except tk.TclError:
contents = ""
if not contents.strip():
messagebox.showinfo("Save Console Log", "There is no console output to save.")
return
path = filedialog.asksaveasfilename(
title="Save Console Log",
defaultextension=".txt",
filetypes=(("Text Files", "*.txt"), ("All Files", "*.*")),
)
if not path:
return
try:
with open(path, "w", encoding="utf-8") as handle:
handle.write(contents)
except Exception as exc:
messagebox.showerror("Save Failed", f"Unable to save log: {exc}")
def _clear_find_highlight(self) -> None:
if self.find_bar:
self.find_bar.clear_highlight()
def show_breakpoints(self):
if not self.breakpoints:
self.queue_message("[bp] No breakpoints set.")
return
entries = sorted(self.breakpoints.items())
lines = ["[bp] Active breakpoints:"]
for addr, handle in entries:
if handle is not None:
lines.append(f"[bp] {addr:#010x} (handle {handle})")
else:
lines.append(f"[bp] {addr:#010x}")
for line in lines:
self.queue_message(line)
def toggle_connection(self):
if self.connected:
self.panel.detach_session(self)
return
self.panel.attach_session(self)
def mark_connected(self, serial_no: int):
self.connected = True
self._set_status(f"Connected — SN {serial_no}", "green")
if self.connect_button:
self.connect_button.configure(text="Disconnect", state="normal")
self.enable_controls(True)
def mark_disconnected(self):
self.connected = False
self.command_wrapper = None
self._set_status("Disconnected", "red")
if self.connect_button:
self.connect_button.configure(text="Connect", state="normal")
self.enable_controls(False)
def browse_script_file(self):
path = filedialog.askopenfilename(
title="Select Script",
filetypes=(("Scripts", "*.jlink *.jscr *.jsf *.gdb *.gdbinit *.txt"), ("All Files", "*.*")),
)
if not path:
return
self.script_path_var.set(path)
self._process_script_file(path, run_now=False, show_dialog=False)
def apply_script_file(self):
path = (self.script_path_var.get() or "").strip()
if not path:
messagebox.showinfo("Script File", "Enter or select a script file first.")
return
self._process_script_file(path, run_now=True, show_dialog=False)
def _process_script_file(self, path, run_now, show_dialog):
normalized = os.path.abspath(path)
if not os.path.isfile(normalized):
msg = f"Script file not found: {normalized}"
self.queue_message(f"[script] {msg}")
if show_dialog:
messagebox.showerror("Script File Not Found", msg)
return False
self.script_path_var.set(normalized)
script_type = self._detect_script_type(normalized)
filename = os.path.basename(normalized)
if script_type == "gdb":
if not self.connected or not self.command_wrapper:
info = f"GDB script '{filename}' stored. Connect and run it manually."
self.queue_message(f"[script] {info}")
if show_dialog:
messagebox.showinfo("GDB Script", info)
return False
if not run_now:
info = f"GDB script '{filename}' ready. Click Run Script to execute."
self.queue_message(f"[script] {info}")
return True
ok, message = self.panel.run_gdb_script(self, normalized)
if not ok:
messagebox.showerror("GDB Script", message)
return ok
if not self.connected:
info = f"Stored J-Link script '{filename}'. It will load after connecting."
self.queue_message(f"[script] {info}")
if show_dialog:
messagebox.showinfo("J-Link Script", info)
return True
ok, message = self.panel.set_shared_script(normalized, session=self)
if not ok and show_dialog:
messagebox.showerror("J-Link Script", message)
return ok
def _detect_script_type(self, path):
ext = os.path.splitext(path)[1].lower()
if ext in GDB_SCRIPT_EXTS:
return "gdb"
return "jlink"
def send_command(self):
if not self.command_entry_var:
return
command = self.command_entry_var.get().strip()
self._dispatch_command(command, add_to_history=True)
def _dispatch_command(self, command, add_to_history: bool):
if not command:
return
if not self.connected:
messagebox.showwarning("Not Connected", "Connect to the debugger before sending commands.")
return
if add_to_history and self.history_listbox:
self.history_listbox.insert("end", command)
if add_to_history:
self._append_history_entry(command)
self.command_entry_var.set("")
self.queue_message(f"> {command}")
threading.Thread(target=self.panel.run_session_command, args=(self, command), daemon=True).start()
def history_select(self, _event):
if not self.history_listbox or not self.command_entry_var:
return
sel = self.history_listbox.curselection()
if not sel:
return
command = self.history_listbox.get(sel[0])
self.command_entry_var.set(command)
if self.command_entry:
try:
self.command_entry.icursor("end")
except tk.TclError:
pass
self._dispatch_command(command, add_to_history=False)
def _history_prev(self, _event):
self._navigate_history(-1)
return "break"
def _history_next(self, _event):
self._navigate_history(1)
return "break"
def _navigate_history(self, delta: int):
if not self.command_history or not self.command_entry_var:
return
new_index = max(0, min(len(self.command_history), self.history_index + delta))
self.history_index = new_index
value = "" if new_index == len(self.command_history) else self.command_history[new_index]
self.command_entry_var.set(value)
if self.command_entry:
try:
self.command_entry.icursor("end")
except tk.TclError:
pass
def _append_history_entry(self, command: str) -> None:
self.command_history.append(command)
if len(self.command_history) > HISTORY_LIMIT:
self.command_history.pop(0)
if self.history_listbox and self.history_listbox.size() > HISTORY_LIMIT:
self.history_listbox.delete(0)
self.history_index = len(self.command_history)
self._persist_history_cache()
def _history_keypress(self, event) -> None:
if (event.keysym or "").lower() == "delete":
self._delete_history_entry()
return "break"
return None
def _delete_history_entry(self) -> None:
if not self.history_listbox:
return
sel = self.history_listbox.curselection()
if not sel:
return
idx = sel[0]
if idx >= self.history_listbox.size():
return
self.history_listbox.delete(idx)
if 0 <= idx < len(self.command_history):
del self.command_history[idx]
if self.history_index > idx:
self.history_index -= 1
self.history_index = min(self.history_index, len(self.command_history))
self._persist_history_cache()
def _history_storage_key(self) -> str:
label = self.label or self.ap_info.get("label", "AP")
core = self.ap_info.get("core", "core")
return f"debugger:{label}:{core}"
def _load_history_cache(self) -> None:
entries = _load_history_store().get(self._history_storage_key(), [])
cleaned: List[str] = []
for cmd in entries[-HISTORY_LIMIT:]:
if not isinstance(cmd, str):
continue
text = cmd.strip()
if not text:
continue
cleaned.append(cmd)
self.command_history = cleaned[-HISTORY_LIMIT:]
self.history_index = len(self.command_history)
if self.history_listbox:
self.history_listbox.delete(0, "end")
for cmd in self.command_history:
self.history_listbox.insert("end", cmd)
def _persist_history_cache(self) -> None:
data = _load_history_store()
data[self._history_storage_key()] = [cmd for cmd in self.command_history[-HISTORY_LIMIT:] if isinstance(cmd, str) and cmd.strip()]
_persist_history_store(data)
def _set_status(self, text: str, color: str):
self.status_var.set(text)
if self.status_label:
try:
self.status_label.configure(foreground=color)
except tk.TclError:
pass
def shutdown(self):
if self.connected:
try:
self.panel.detach_session(self)
except Exception:
pass
self.cleanup_script()
def _emit_command_result(self, result):
if result is None:
return
if isinstance(result, tuple) and len(result) == 2:
status, response = result
self.queue_message(f"[pylink] Result code: {status}")
if response:
self._emit_response_lines(response)
return
self._emit_response_lines(result)
def _emit_response_lines(self, response):
text = str(response)
for line in text.splitlines():
trimmed = line.strip()
if trimmed:
self.queue_message(trimmed)
def _emit_wrapper_result(self, result):
if result is None:
return
if isinstance(result, (list, tuple)):
for entry in result:
trimmed = str(entry).strip()
if trimmed:
self.queue_message(trimmed)
else:
trimmed = str(result).strip()
if trimmed:
self.queue_message(trimmed)
def register_breakpoint(self, addr: int, handle):
normalized = self._normalize_breakpoint_addr(addr)
self.breakpoints[normalized] = handle
def unregister_breakpoint(self, addr: int):
normalized = self._normalize_breakpoint_addr(addr)
self.breakpoints.pop(normalized, None)
def get_breakpoint_handle(self, addr: int):
return self.breakpoints.get(self._normalize_breakpoint_addr(addr))
def list_breakpoints(self):
return list(self.breakpoints.keys())
def clear_breakpoints(self):
self.breakpoints.clear()
def _normalize_breakpoint_addr(self, value: int) -> int:
return int(value) & 0xFFFFFFFF
def record_pc_report(self):
self._last_pc_report = time.monotonic()
def should_suppress_pc_report(self) -> bool:
if self._last_pc_report <= 0:
return False
return (time.monotonic() - self._last_pc_report) < 0.3
def has_breakpoint(self, addr: int) -> bool:
return self._normalize_breakpoint_addr(addr) in self.breakpoints
class SharedDebuggerController:
def __init__(self, panel):
self.panel = panel
self.link = None
self.serial = None
self.speed = None
self._link_lock = threading.Lock()
self._command_lock = threading.Lock()
self._sessions = set()
self._current_session = None
self._monitor_thread = None
self._monitor_stop = threading.Event()
self._monitor_last_halted = False
def attach(self, session, serial_no, speed):
with self._link_lock:
if self.link is None:
ok, msg = self._open_link(serial_no, speed)
if not ok:
return False, msg
elif self.serial != serial_no:
return False, f"Debugger already connected to emulator {self.serial}."
self._sessions.add(session)
session.command_wrapper = JLinkGDBWrapper(self.link, session)
ok, msg = self.ensure_session_ready(session)
if not ok:
self.detach(session)
return False, msg
return True, None
def detach(self, session):
with self._link_lock:
self._sessions.discard(session)
if self._current_session is session:
self._current_session = None
if not self._sessions:
self._close_link()
def _open_link(self, serial_no, speed):
if not self.panel.is_pylink_available():
return False, "pylink not available."
try:
link = pylink.JLink()
link.open(serial_no)
interface = self.panel._resolve_swd_interface()
if interface is not None:
link.set_tif(interface)
link.set_speed(speed)
except Exception as exc:
return False, str(exc)
self.link = link
self.serial = serial_no
self.speed = speed
self._start_monitor()
return True, None
def _close_link(self):
if self.link:
try:
self.link.close()
except Exception:
pass
self.link = None
self.serial = None
self.speed = None
self._current_session = None
self._stop_monitor()
def ensure_session_ready(self, session):
with self._command_lock:
ok, msg = self._apply_context(session)
return ok, msg
def execute(self, session, command):
if not self.link:
session.queue_message("[pylink] Shared link is not available.")
return
with self._command_lock:
ok, msg = self._apply_context(session)
if not ok:
if msg:
session.queue_message(f"[pylink] {msg}")
return
try:
handled = False
result = None
if session.command_wrapper:
handled, result = session.command_wrapper.execute(command)
else:
result = self.link.exec_command(command)
if handled:
session._emit_wrapper_result(result)
else:
session._emit_command_result(result)
except Exception as exc:
session.queue_message(f"[pylink] Command failed: {exc}")
def run_gdb_script(self, session, path):
if not self.link:
return False, "Debugger not connected."
try:
with open(path, "r", encoding="utf-8") as handle:
lines = handle.readlines()
except Exception as exc:
msg = f"Unable to read script '{path}': {exc}"
session.queue_message(f"[script] {msg}")
return False, msg
commands = []
for line in lines:
stripped = line.strip()
if not stripped or stripped.startswith("#"):
continue
if stripped.lower().startswith("monitor "):
stripped = stripped[8:].strip()
commands.append(stripped)
if not commands:
msg = f"No commands found in '{os.path.basename(path)}'."
session.queue_message(f"[script] {msg}")
return True, msg
with self._command_lock:
ok, ctx_msg = self._apply_context(session)
if not ok:
return False, ctx_msg or "Unable to configure debugger context."
for command in commands:
session.queue_message(f"[script] gdb> {command}")
try:
if session.command_wrapper:
handled, result = session.command_wrapper.execute(command)
else:
handled = False
result = self.link.exec_command(command)
except Exception as exc:
msg = f"Command '{command}' failed: {exc}"
session.queue_message(f"[script] {msg}")
return False, msg
if handled:
session._emit_wrapper_result(result)
else:
session._emit_command_result(result)
msg = f"Executed {len(commands)} command(s) from '{os.path.basename(path)}'."
session.queue_message(f"[script] {msg}")
return True, msg
def set_script(self, path):
if not self.link:
return False, "Debugger not connected."
try:
self.link.set_script_file(path)
except Exception as exc:
msg = f"Unable to load J-Link script '{path}': {exc}"
return False, msg
self._current_session = None
return True, f"Loaded J-Link script '{os.path.basename(path)}'."
def _apply_context(self, session):
if not self.link:
return False, "J-Link is not connected."
if self._current_session is session:
return True, None
reinitialize = self._current_session is not None
ok, msg = self._configure_link(session, reinitialize)
if not ok:
return False, msg
self._current_session = session
if session.command_wrapper:
session.command_wrapper.reinstall_breakpoints()
return True, None
def _configure_link(self, session, reinitialize: bool):
if reinitialize:
serial = self.serial
speed = self.speed
if serial is None or speed is None:
return False, "Debugger is not initialized."
try:
if self.link:
self.link.close()
except Exception:
pass
self.link = None
ok, msg = self._open_link(serial, speed)
if not ok:
return False, msg or "Unable to reopen J-Link."
self._refresh_wrappers()
else:
try:
self.link.disconnect()
except Exception:
pass
script_path = session.ensure_script_file()
if script_path:
try:
self.link.set_script_file(script_path)
except Exception as exc:
return False, f"Unable to apply script: {exc}"
targets = session.ap_info.get("targets") or [session.core]
target = targets[0]
try:
try:
self.link.connect(target, speed=self.speed)
except TypeError:
self.link.connect(chip_name=target, speed=self.speed)
except Exception as exc:
return False, f"Unable to connect to {target}: {exc}"
return True, None
def _refresh_wrappers(self):
if not self.link:
return
for sess in list(self._sessions):
if sess.connected:
sess.command_wrapper = JLinkGDBWrapper(self.link, sess)
def _start_monitor(self):
if self._monitor_thread and self._monitor_thread.is_alive():
return
self._monitor_stop.clear()
self._monitor_thread = threading.Thread(target=self._monitor_loop, daemon=True)
self._monitor_thread.start()
def _stop_monitor(self):
self._monitor_stop.set()
thread = self._monitor_thread
if thread and thread.is_alive():
thread.join(timeout=0.5)
self._monitor_thread = None
self._monitor_last_halted = False
def _monitor_loop(self):
while not self._monitor_stop.is_set():
if not self.link:
if self._monitor_stop.wait(0.2):
break
continue
with self._command_lock:
halted = False
try:
halted = bool(self.link.halted())
except Exception:
halted = False
if halted and not self._monitor_last_halted:
self._report_monitor_pc()
self._monitor_last_halted = halted
if self._monitor_stop.wait(0.2):
break
def _report_monitor_pc(self):
session = self._current_session
if not session or session.should_suppress_pc_report():
return
pc = self._read_pc_locked()
if pc is None:
session.queue_message("[pc] CPU halted.")
else:
session.queue_message(f"[pc] Halted at 0x{pc:08x}")
session.record_pc_report()
def _read_pc_locked(self):
if not self.link:
return None
getter = getattr(self.link, "register_read", None)
if not callable(getter):
return None
for key in ("PC", "pc", 15):
try:
value = getter(key)
if value is not None:
return int(value) & 0xFFFFFFFF
except Exception:
continue
return None
class PylinkDebuggerPanel:
def __init__(self, parent, dll_path=None):
self.parent = parent
self.dll_path = dll_path if dll_path and os.path.exists(dll_path) else None
self.frame = ttk.LabelFrame(parent, text="Pylink Debugger")
self.device_var = tk.StringVar()
self.speed_var = tk.StringVar(value="4000")
self.status_var = tk.StringVar(value="")
self.device_combo = None
self.status_label = None
self.notebook = None
self.sessions: List[DebuggerSession] = []
self._device_display = {}
self._device_refresh_after = None
self._device_error_reported = False
self._pylink_available = pylink is not None
self.controller = SharedDebuggerController(self)
self.ap_definitions = self._build_ap_definitions()
self._build_widgets()
self.refresh_devices()
self._schedule_device_refresh()
if not self._pylink_available:
self._set_status("pylink not installed", "red")
self.queue_message("pylink module is missing; install 'pylink-square' to use the debugger.")
for session in self.sessions:
session.set_unavailable("pylink not installed")
elif not self.is_dll_available():
self._set_status("J-Link DLL missing", "orange")
self.queue_message("pylink/JLinkARM.dll not found; copy the DLL into the 'pylink' folder.")
for session in self.sessions:
session.set_unavailable("J-Link DLL missing")
def _build_widgets(self):
conn_frame = ttk.Frame(self.frame)
conn_frame.pack(fill="x", padx=5, pady=5)
conn_frame.columnconfigure(1, weight=1)
ttk.Label(conn_frame, text="J-Link:").grid(row=0, column=0, sticky="w")
self.device_combo = ttk.Combobox(conn_frame, textvariable=self.device_var, width=45, state="readonly")
self.device_combo.grid(row=0, column=1, sticky="ew", padx=3)
ttk.Button(conn_frame, text="Help", command=self.show_help).grid(row=0, column=2, padx=(3, 0))
ttk.Label(conn_frame, text="Speed (kHz):").grid(row=1, column=0, sticky="w", pady=(6, 0))
ttk.Entry(conn_frame, textvariable=self.speed_var, width=10).grid(row=1, column=1, sticky="w", padx=3, pady=(6, 0))
self.status_label = ttk.Label(conn_frame, textvariable=self.status_var, foreground="red")
self.status_label.grid(row=2, column=0, columnspan=2, sticky="w", pady=(6, 0))
self.notebook = ttk.Notebook(self.frame)
self.notebook.pack(fill="both", expand=True, padx=5, pady=(0, 5))
for ap in self.ap_definitions:
session = DebuggerSession(self.notebook, self, ap)
self.sessions.append(session)
self.notebook.add(session.frame, text=self._format_session_title(ap))
def _format_session_title(self, ap_def: dict) -> str:
label = ap_def.get("label", "AP")
core = ap_def.get("core", "Unknown")
return f"{label} - {core}"
def _build_ap_definitions(self) -> List[dict]:
entries = [
{"label": "NP", "core": "Cortex-M33", "ap_index": 0, "targets": ["Cortex-M33"], "extras": []},
{"label": "MP", "core": "Cortex-M33", "ap_index": 1, "targets": ["Cortex-M33"], "extras": []},
{"label": "FP", "core": "Cortex-M23", "ap_index": 2, "targets": ["Cortex-M23"], "extras": []},
{
"label": "CA32",
"core": "Cortex-A32",
"ap_index": 3,
"targets": ["Cortex-A32"],
"extras": [
" CORESIGHT_CoreBaseAddr = 0x80210000;",
' JLINK_ExecCommand("CORESIGHT_SetCSCTICoreBaseAddr = 0x80218000");',
],
},
]
results = []
for ap in entries:
entry = dict(ap)
entry["script"] = self._build_ap_script(entry)
results.append(entry)
return results
def _build_ap_script(self, ap_info: dict) -> str:
cpu = self._cpu_constant(ap_info.get("core", "Cortex-M33"))
ap_index = ap_info.get("ap_index", 0)
extras = ap_info.get("extras") or []
lines = [
"int InitTarget(void) {",
" JLINK_CORESIGHT_AddAP(0, CORESIGHT_AHB_AP);",
" JLINK_CORESIGHT_AddAP(1, CORESIGHT_AHB_AP);",
" JLINK_CORESIGHT_AddAP(2, CORESIGHT_AHB_AP);",
" JLINK_CORESIGHT_AddAP(3, CORESIGHT_APB_AP);",
" JLINK_CORESIGHT_AddAP(4, CORESIGHT_AHB_AP);",
f" CPU = {cpu};",
f" CORESIGHT_IndexAHBAPToUse = {ap_index};",
]
lines.extend(extras)
lines.append(" return 0;")
lines.append("}")
return "\n".join(lines)
def _cpu_constant(self, core_name: str) -> str:
mapping = {
"Cortex-M33": "CORTEX_M33",
"Cortex-M23": "CORTEX_M23",
"Cortex-A32": "CORTEX_A32",
}
return mapping.get(core_name, "CORTEX_M33")
def _resolve_swd_interface(self):
if pylink is None:
return None
try:
return pylink.enums.JLinkInterfaces.SWD
except Exception:
try:
return pylink.JLinkInterfaces.SWD
except Exception:
return None
def attach_session(self, session: DebuggerSession):
if not self.is_pylink_available():
session.set_unavailable("pylink not installed")
return
if not self.is_dll_available():
messagebox.showerror("J-Link DLL Missing", "Copy pylink/JLinkARM.dll before connecting.")
session.set_unavailable("J-Link DLL missing")
return
device = self.get_selected_device()
if not device:
messagebox.showwarning("J-Link Not Found", "Select a connected J-Link before connecting.")
return
serial = device.get("serial")
speed = self.validate_speed()
if speed is None or serial is None:
return
ok, msg = self.controller.attach(session, serial, speed)
if not ok:
messagebox.showerror("Pylink Connection Failed", msg)
session.queue_message(f"[error] {msg}")
return
session.mark_connected(serial)
session.queue_message(f"Attached to emulator {serial} at {speed} kHz.")
def detach_session(self, session: DebuggerSession):
session.mark_disconnected()
self.controller.detach(session)
def run_session_command(self, session: DebuggerSession, command: str):
self.controller.execute(session, command)
def run_gdb_script(self, session: DebuggerSession, path: str):
return self.controller.run_gdb_script(session, path)
def set_shared_script(self, path: str, session: Optional[DebuggerSession] = None):
ok, msg = self.controller.set_script(path)
target_session = session or None
if target_session:
target_session.queue_message(f"[script] {msg}")
else:
self.queue_message(f"[script] {msg}")
return ok, msg
def _set_status(self, text: str, color: str):
self.status_var.set(text)
if self.status_label:
try:
self.status_label.configure(foreground=color)
except tk.TclError:
pass
def is_pylink_available(self) -> bool:
return self._pylink_available
def is_dll_available(self) -> bool:
return bool(self.dll_path and os.path.exists(self.dll_path))
def get_selected_device(self):
label = (self.device_var.get() or "").strip()
return self._device_display.get(label)
def validate_speed(self) -> Optional[int]:
try:
speed = int((self.speed_var.get() or "").strip())
if speed <= 0:
raise ValueError
return speed
except ValueError:
messagebox.showwarning("Invalid Speed", "Enter a positive integer speed in kHz.")
return None
def queue_message(self, text: str):
if not text:
return
for session in self.sessions:
session.queue_message(text)
def show_help(self):
sections = [
(
"Memory Access",
[
("x <addr>", "Read a 32-bit word at <addr>. Use x/COUNT{b|h|w|g} for other sizes/counts."),
("set *<addr>=<value>", "Write a 32-bit value to <addr>."),
],
),
(
"Execution Control",
[
("halt / h", "Stop the current core."),
("go / g / resume / run", "Resume execution from the current PC."),
("reset / r", "Reset and continue."),
("reset_halt / rh", "Reset and halt immediately."),
("step / s / si", "Single-step one instruction."),
("next / n / ni", "Step over calls (uses software breakpoint when needed)."),
("finish / fin / so", "Step out of the current function."),
("i r", "Show current PC and general-purpose registers."),
("sleep <ms> / delay <ms>", "Pause for <ms> milliseconds (accepts 's' or 'us' units)."),
],
),
(
"Breakpoints",
[
("b <addr>", "Set breakpoint at <addr>."),
("bc <addr>", "Clear the breakpoint at <addr>."),
],
),
(
"J-Link Exec",
[
("Any other text", "Forwarded directly to J-Link 'exec' (ex: regs, mem32, log)."),
],
),
(
"Scripts",
[
("Run Script", "Load .jlink or .gdb files for the selected tab."),
("monitor <cmd>", "In .gdb files, these lines send <cmd> directly to J-Link."),
],
),
(
"Sessions",
[
("Per-tab isolation", "Each core tab owns its connection, history, and log."),
],
),
]
lines = ["Debugger command reference", ""]
for title, entries in sections:
lines.append(f"{title}:")
for cmd, desc in entries:
lines.append(f" {cmd}")
lines.append(f" {desc}")
lines.append("")
text = "\n".join(lines).rstrip()
try:
messagebox.showinfo("Debugger Help", text)
except tk.TclError:
pass
def poll_queue(self):
for session in self.sessions:
session.poll_queue()
def shutdown(self):
self._cancel_device_refresh()
for session in list(self.sessions):
session.shutdown()
def disconnect(self):
for session in list(self.sessions):
if session.connected:
self.detach_session(session)
def has_active_connections(self) -> bool:
return any(session.connected for session in self.sessions)
def refresh_devices(self):
entries = self._enumerate_emulators()
display = []
mapping = {}
for entry in entries:
serial, detail = self._extract_emulator_metadata(entry)
if serial is None:
continue
label = self._format_device_label(serial, detail)
display.append(label)
mapping[label] = {"serial": serial, "detail": detail}
self._device_display = mapping
if self.device_combo:
self.device_combo["values"] = display
current = self.device_var.get()
if display and current not in mapping:
self.device_var.set(display[0])
elif not display:
self.device_var.set("")
def _schedule_device_refresh(self):
if self._device_refresh_after is not None:
return
try:
self._device_refresh_after = self.frame.after(1000, self._auto_refresh_devices)
except tk.TclError:
self._device_refresh_after = None
def _auto_refresh_devices(self):
self._device_refresh_after = None
try:
if not self.frame.winfo_exists():
return
except tk.TclError:
return
self.refresh_devices()
self._schedule_device_refresh()
def _cancel_device_refresh(self):
if self._device_refresh_after is None:
return
try:
self.frame.after_cancel(self._device_refresh_after)
except tk.TclError:
pass
self._device_refresh_after = None
def _enumerate_emulators(self):
if not self._pylink_available:
return []
for provider in self._emulator_providers():
try:
entries = provider()
except Exception as exc:
if not self._device_error_reported:
self.queue_message(f"[pylink] Unable to enumerate J-Link: {exc}")
self._device_error_reported = True
continue
if entries:
self._device_error_reported = False
return entries
return []
def _emulator_providers(self):
providers = []
if hasattr(pylink, "JLink"):
def by_instance():
link = pylink.JLink()
try:
if hasattr(link, "list_emulators"):
return link.list_emulators()
return link.connected_emulators()
finally:
try:
link.close()
except Exception:
pass
providers.append(by_instance)
dll_module = getattr(pylink, "JLinkARMDll", None)
dll_class = getattr(dll_module, "JLinkARMDll", None) if dll_module else None
if dll_class:
def by_dll():
dll = dll_class()
try:
if hasattr(dll, "list_emulators"):
return dll.list_emulators()
finally:
try:
dll.close()
except Exception:
pass
providers.append(by_dll)
return providers
def _extract_emulator_metadata(self, entry):
serial = self._extract_serial(entry)
if serial is None:
return None, ""
product = self._extract_attr(entry, ("product", "Product", "model"))
connection = self._extract_attr(entry, ("connection", "Connection", "interface"))
if hasattr(connection, "name"):
connection = connection.name
parts = []
if product:
parts.append(str(product).strip())
if connection:
parts.append(str(connection).strip())
return serial, " / ".join(parts)
def _extract_serial(self, entry):
candidates = []
if isinstance(entry, dict):
candidates.extend([
entry.get("serial_number"),
entry.get("SerialNumber"),
entry.get("serial"),
entry.get("Serial"),
])
else:
candidates.extend([
getattr(entry, "serial_number", None),
getattr(entry, "SerialNumber", None),
getattr(entry, "serial", None),
getattr(entry, "Serial", None),
])
for value in candidates:
serial = safe_int(value)
if serial is not None:
return serial
return None
def _extract_attr(self, entry, keys):
for key in keys:
if isinstance(entry, dict) and key in entry:
return entry.get(key)
if hasattr(entry, key):
return getattr(entry, key)
return None
def _format_device_label(self, serial, detail):
serial_text = str(serial).strip()
info_parts = []
normalized = self._normalize_device_detail(detail or "")
if normalized:
info_parts.append(normalized)
ap_overview = " / ".join(f"{ap['label']} ({ap['core']})" for ap in self.ap_definitions)
if ap_overview:
info_parts.append(ap_overview)
if info_parts:
return f"{serial_text} - {' | '.join(info_parts)}"
return serial_text
def _normalize_device_detail(self, detail: str) -> str:
text = (detail or "").strip()
if text.endswith("- 1"):
text = text[:-3].rstrip()
return text
class DualUARTApp:
def __init__(self, root: tk.Tk) -> None:
self.root = root
version_suffix = f"v{APP_VERSION}" if APP_VERSION not in ("", "unknown") else "version unknown"
self.root.title(f"AmebaPro3 Control Panel — Powered by yiekheng, {version_suffix}")
self._closing = False
self._poll_after_id = None
self.flash_thread = None
self.arduino_flash_thread = None
self._arduino_reconnect_after_flash = False
self._icon_image = self._create_robot_icon()
if self._icon_image:
self.root.iconphoto(False, self._icon_image)
try:
self.root.geometry("1400x900")
self.root.minsize(1200, 720)
except tk.TclError:
pass
self._main_splitter = None
self._main_split_indicator = None
self._main_split_drag_start: Optional[int] = None
self._main_split_initial = 0
self._main_split_hover = False
self._main_split_active = False
paned = ttk.Panedwindow(root, orient="horizontal")
paned.pack(fill="both", expand=True, padx=10, pady=10)
self._main_splitter = paned
left_container = ttk.Frame(paned)
right_container = ttk.Frame(paned)
paned.add(left_container, weight=2)
paned.add(right_container, weight=3)
self.port_assignments: dict[str, str] = {}
self.dev1 = DevicePanel(
left_container,
title="Control Device",
mode="arduino",
default_baud="115200",
line_ending="\n",
console_height=10,
arduino_flash_handler=self.flash_arduino_sketch,
on_connect=lambda port, key="dev1": self._panel_connected(key, port),
on_disconnect=lambda port, key="dev1": self._panel_disconnected(key, port),
exclude_ports_provider=lambda key="dev1": self._get_excluded_ports(key),
before_button_callback=self._before_control_button,
)
self.dev1.frame.pack(fill="both", expand=True, padx=(0, 5))
self.debugger = PylinkDebuggerPanel(left_container, PYLINK_DLL_PATH)
self.debugger.frame.pack(fill="both", expand=True, padx=(0, 5), pady=(5, 0))
self.dev2 = DevicePanel(
right_container,
title="AmebaPro3",
mode="raw",
default_baud="1500000",
line_ending="\r\n",
flash_handler=self.flash_ameba,
flash_button_label="Flash Image",
on_connect=lambda port, key="dev2": self._panel_connected(key, port),
on_disconnect=lambda port, key="dev2": self._panel_disconnected(key, port),
exclude_ports_provider=lambda key="dev2": self._get_excluded_ports(key),
)
self.dev2.frame.pack(fill="both", expand=True, padx=(5, 0))
self._init_main_split_indicator()
self.poll_loop()
self.root.protocol("WM_DELETE_WINDOW", self.close)
# ------------------------------------------------------------------
def _create_robot_icon(self):
try:
icon = tk.PhotoImage(width=32, height=32)
except tk.TclError:
return None
icon.put("#1e1f2a", to=(0, 0, 32, 32))
icon.put("#9ca5c9", to=(6, 6, 26, 18))
icon.put("#3c3f58", to=(10, 18, 22, 28))
icon.put("#f5f7ff", to=(10, 10, 14, 14))
icon.put("#f5f7ff", to=(18, 10, 22, 14))
icon.put("#1e1f2a", to=(12, 12, 13, 13))
icon.put("#1e1f2a", to=(20, 12, 21, 13))
icon.put("#9ca5c9", to=(14, 3, 18, 5))
icon.put("#9ca5c9", to=(15, 5, 17, 6))
icon.put("#6b708f", to=(6, 14, 8, 22))
icon.put("#6b708f", to=(24, 14, 26, 22))
return icon
def _init_main_split_indicator(self) -> None:
if not self._main_splitter:
return
_ensure_console_grip_style()
indicator = ttk.Frame(self.root, style=_CONSOLE_GRIP_STYLE, width=1, cursor="sb_h_double_arrow")
indicator.bind("<ButtonPress-1>", self._start_main_split_drag)
indicator.bind("<B1-Motion>", self._drag_main_split)
indicator.bind("<ButtonRelease-1>", self._end_main_split_drag)
indicator.bind("<Enter>", lambda _evt: self._set_main_split_hover(True))
indicator.bind("<Leave>", lambda _evt: self._set_main_split_hover(False))
self._main_split_indicator = indicator
self.root.bind("<Configure>", self._sync_main_split_indicator, add="+")
self._main_splitter.bind("<Configure>", self._sync_main_split_indicator, add="+")
self._main_splitter.bind("<B1-Motion>", self._sync_main_split_indicator, add="+")
self._main_splitter.bind("<ButtonRelease-1>", self._sync_main_split_indicator, add="+")
self.root.after(0, self._sync_main_split_indicator)
def _sync_main_split_indicator(self, _event: Optional[tk.Event] = None) -> None:
indicator = self._main_split_indicator
splitter = self._main_splitter
if not indicator or not splitter:
return
try:
sash = splitter.sashpos(0)
x = splitter.winfo_x() + sash
y = splitter.winfo_y()
height = splitter.winfo_height()
except tk.TclError:
return
if height <= 1:
indicator.place_forget()
return
width = 1
if self._main_split_hover or self._main_split_active:
width = 5
indicator.place(x=max(0, x - width // 2), y=y, width=width, height=height)
def _start_main_split_drag(self, event: tk.Event) -> None:
splitter = self._main_splitter
if not splitter:
self._main_split_drag_start = None
return
try:
self._main_split_initial = splitter.sashpos(0)
except tk.TclError:
self._main_split_drag_start = None
return
self._main_split_drag_start = event.x_root
self._main_split_active = True
self._sync_main_split_indicator()
def _drag_main_split(self, event: tk.Event) -> None:
if self._main_split_drag_start is None or not self._main_splitter:
return
delta = event.x_root - self._main_split_drag_start
new_pos = self._main_split_initial + delta
try:
self._main_splitter.sashpos(0, new_pos)
except tk.TclError:
return
self._sync_main_split_indicator()
def _end_main_split_drag(self, _event: tk.Event) -> None:
self._main_split_drag_start = None
self._main_split_active = False
self._sync_main_split_indicator()
def _set_main_split_hover(self, state: bool) -> None:
self._main_split_hover = state
self._sync_main_split_indicator()
# ------------------------------------------------------------------
def poll_loop(self) -> None:
if self._closing:
return
self.dev1.poll_queue()
self.dev2.poll_queue()
if self.debugger:
self.debugger.poll_queue()
try:
self._poll_after_id = self.root.after(10, self.poll_loop)
except tk.TclError:
self._closing = True
def close(self) -> None:
if self._closing:
return
self._closing = True
if self._poll_after_id is not None:
try:
self.root.after_cancel(self._poll_after_id)
except tk.TclError:
pass
self._poll_after_id = None
self.dev1.disconnect_serial()
self.dev2.disconnect_serial()
if self.debugger:
self.debugger.shutdown()
self.debugger.disconnect()
if self._main_split_indicator:
try:
self._main_split_indicator.destroy()
except tk.TclError:
pass
self.root.destroy()
# ------------------------------------------------------------------
def _panel_connected(self, key: str, port: str) -> None:
self.port_assignments[key] = port
self._refresh_other_panels(key)
def _panel_disconnected(self, key: str, _port: str) -> None:
self.port_assignments.pop(key, None)
self._refresh_other_panels(key)
def _get_excluded_ports(self, key: str) -> List[str]:
return [port for panel_key, port in self.port_assignments.items() if panel_key != key]
def _refresh_other_panels(self, key: str) -> None:
for panel_key, panel in ("dev1", self.dev1), ("dev2", self.dev2):
if panel_key != key:
panel.refresh_ports()
def _before_control_button(self) -> None:
self._disconnect_debugger_for_reset("Arduino control command")
def _disconnect_debugger_for_reset(self, reason: str) -> None:
if not self.debugger or not self.debugger.has_active_connections():
return
note = f"Disconnecting debugger before {reason}."
for panel in (self.dev1, self.dev2):
panel.queue_message(f"[debugger] {note}")
try:
self.debugger.queue_message(f"[info] {note}")
except Exception:
pass
try:
self.debugger.disconnect()
except Exception:
pass
def _log_flash_step(self, text: str) -> None:
message = f"[flash] {text}"
self.dev2.queue_message(message)
self.dev1.queue_message(message)
def flash_arduino_sketch(self) -> None:
if self.arduino_flash_thread and self.arduino_flash_thread.is_alive():
messagebox.showinfo("Arduino Flash", "Arduino flashing is already in progress.")
return
if not os.path.exists(ARDUINO_SKETCH_PATH):
messagebox.showerror("Arduino Flash", f"Sketch not found: {ARDUINO_SKETCH_PATH}")
return
port = self.dev1.get_selected_port()
if not port:
messagebox.showwarning("Arduino Flash", "Select an Arduino COM port before flashing.")
return
if not os.path.isdir(ARDUINO_SKETCH_DIR):
messagebox.showerror("Arduino Flash", f"Sketch directory not found: {ARDUINO_SKETCH_DIR}")
return
reconnect = self.dev1.is_connected()
if reconnect:
try:
self.dev1.disconnect_serial()
except Exception:
pass
time.sleep(0.1)
if self.dev1.arduino_flash_button:
self.dev1.arduino_flash_button.configure(state="disabled")
cli_cmd = ARDUINO_CLI or "arduino-cli"
fqbn = ARDUINO_FQBN or "arduino:avr:uno"
self.dev1.queue_message(
f"[arduino] Flashing '{os.path.basename(ARDUINO_SKETCH_PATH)}' via {cli_cmd} on {port}..."
)
self._arduino_reconnect_after_flash = reconnect
thread = threading.Thread(
target=self._arduino_flash_worker,
args=(cli_cmd, fqbn, port, ARDUINO_SKETCH_DIR),
daemon=True,
)
self.arduino_flash_thread = thread
thread.start()
def _arduino_flash_worker(self, cli_cmd: str, fqbn: str, port: str, sketch_dir: str) -> None:
steps = [
("compile", [cli_cmd, "compile", "--fqbn", fqbn, sketch_dir]),
("upload", [cli_cmd, "upload", "-p", port, "--fqbn", fqbn, sketch_dir]),
]
for label, cmd in steps:
try:
process = subprocess.Popen(
cmd,
cwd=APP_ROOT,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
bufsize=1,
)
except FileNotFoundError:
message = "arduino-cli executable was not found. Set ARDUINO_CLI env var to its path."
self.dev1.queue_message(f"[arduino] {message}")
self._complete_arduino_flash(False, message)
return
except Exception as exc:
message = f"Unable to start arduino-cli ({label}): {exc}"
self.dev1.queue_message(f"[arduino] {message}")
self._complete_arduino_flash(False, message)
return
if process.stdout:
for line in process.stdout:
clean = line.rstrip()
if clean:
self.dev1.queue_message(f"[arduino-cli:{label}] {clean}")
ret = process.wait()
if process.stdout:
process.stdout.close()
if ret != 0:
message = f"arduino-cli {label} exited with code {ret}."
self._complete_arduino_flash(False, message)
return
self._complete_arduino_flash(True, "Arduino sketch uploaded successfully.")
def _complete_arduino_flash(self, success: bool, message: str) -> None:
def finish():
if self.dev1.arduino_flash_button:
self.dev1.arduino_flash_button.configure(state="normal")
self.arduino_flash_thread = None
reconnect = getattr(self, "_arduino_reconnect_after_flash", False)
self._arduino_reconnect_after_flash = False
if success and reconnect and not self.dev1.is_connected():
try:
self.dev1.connect_serial()
except Exception:
pass
self.dev1.queue_message(f"[arduino] {message}")
if success:
try:
messagebox.showinfo("Arduino Flash", message)
except tk.TclError:
pass
else:
try:
messagebox.showerror("Arduino Flash", message)
except tk.TclError:
pass
try:
self.root.after(0, finish)
except tk.TclError:
finish()
def _prepare_firmware_images(self) -> bool:
image_dir = os.path.join(APP_ROOT, FLASH_IMAGE_DIR)
try:
os.makedirs(image_dir, exist_ok=True)
except Exception as exc:
msg = f"Unable to create firmware directory '{image_dir}': {exc}"
self._log_flash_step(msg)
messagebox.showerror("Firmware Directory Error", msg)
return False
missing = [src for src, _ in FIRMWARE_COPY_MAP if not os.path.exists(os.path.join(image_dir, src))]
if missing:
msg = f"Missing firmware file(s): {', '.join(missing)} in {image_dir}"
self._log_flash_step(msg)
messagebox.showerror("Missing Firmware", msg)
return False
for src, dest in FIRMWARE_COPY_MAP:
src_path = os.path.join(image_dir, src)
dest_path = os.path.join(image_dir, dest)
try:
shutil.copyfile(src_path, dest_path)
self._log_flash_step(f"Prepared '{dest}' from '{src}'.")
except Exception as exc:
err = f"Failed to copy '{src}' to '{dest}': {exc}"
self._log_flash_step(err)
messagebox.showerror("Firmware Copy Failed", err)
return False
return True
def _send_arduino_pg_reset(self, value: int) -> bool:
self._disconnect_debugger_for_reset("Arduino PG/reset sequence")
if not self.dev1.is_connected():
self._log_flash_step("Arduino is not connected; cannot toggle PG.")
return False
pg_cmd = f"pg {value}"
self._log_flash_step(f"Arduino command → '{pg_cmd}'")
self.dev1.send_uart(pg_cmd)
self._log_flash_step("Arduino command → 'reset'")
self.dev1.send_uart("reset")
return True
def _set_flash_controls_state(self, enabled: bool) -> None:
state = "normal" if enabled else "disabled"
for widget in (self.dev2.flash_button, self.dev2.connect_button, self.dev1.connect_button):
if widget is not None:
widget.configure(state=state)
def flash_ameba(self) -> None:
if self.flash_thread and self.flash_thread.is_alive():
messagebox.showinfo("Flash in Progress", "Please wait for the current flash.py run to finish.")
return
self._disconnect_debugger_for_reset("flash.py run")
if not self.dev1.is_connected():
messagebox.showwarning("Arduino Not Connected", "Connect Device 1 (Arduino) before flashing.")
return
if not self.dev2.is_connected():
messagebox.showwarning("Not Connected", "Connect Device 2 (AmebaPro3) before flashing.")
return
port = self.dev2.get_selected_port()
if not port:
messagebox.showwarning("No Port", "Select a COM port for AmebaPro3 before flashing.")
return
if not self._prepare_firmware_images():
return
self._log_flash_step(f"Starting flash.py on {port}.")
self._set_flash_controls_state(False)
if not self._send_arduino_pg_reset(1):
self._set_flash_controls_state(True)
messagebox.showerror("Arduino Error", "Unable to set PG=1 and reset through Arduino.")
return
self._log_flash_step("Disconnecting AmebaPro3 UART before flashing.")
self.dev2.disconnect_serial()
self.flash_thread = threading.Thread(target=self._flash_worker, args=(port,), daemon=True)
self.flash_thread.start()
def _flash_worker(self, port: str) -> None:
command = self._build_flash_command(port)
self.dev2.queue_message(f"[flash] Executing: {' '.join(command)}")
try:
process = subprocess.Popen(
command,
cwd=APP_ROOT,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
bufsize=1,
)
except Exception as exc:
self._flash_complete(False, f"Failed to start flash.py: {exc}")
return
if process.stdout:
timestamp_pattern = re.compile(r"\[\d{4}-\d{2}-\d{2} [^\]]+\]")
for line in process.stdout:
clean = line.rstrip()
if not clean:
continue
sanitized = timestamp_pattern.sub("", clean).strip()
if not sanitized:
continue
self.dev2.queue_message(f"[flash.py] {sanitized}")
ret = process.wait()
if process.stdout:
process.stdout.close()
success = ret == 0
message = "flash.py completed successfully." if success else f"flash.py exited with code {ret}."
self._flash_complete(success, message)
def _flash_complete(self, success: bool, message: str) -> None:
def finalize():
self.dev2.queue_message(f"[flash] {message}")
self._set_flash_controls_state(True)
self.flash_thread = None
if not self._closing:
self._log_flash_step("Reconnecting AmebaPro3 UART...")
try:
self.dev2.connect_serial()
except Exception:
pass
if self.dev2.is_connected():
self._log_flash_step("AmebaPro3 reconnected. Lowering PG and resetting.")
self._send_arduino_pg_reset(0)
else:
self._log_flash_step("Failed to reconnect AmebaPro3; PG reset skipped.")
if not success and not self._closing:
messagebox.showerror("Flash Failed", message)
try:
self.root.after(0, finalize)
except tk.TclError:
pass
def _build_flash_command(self, port: str) -> List[str]:
args = [
"-d",
"--profile", FLASH_PROFILE,
"--image-dir", FLASH_IMAGE_DIR,
"--baudrate", FLASH_BAUDRATE,
"--memory-type", FLASH_MEMORY_TYPE,
"--port", port,
]
exe = self._locate_flash_executable()
if exe:
return [exe] + args
script = os.path.join(APP_ROOT, "flash.py")
return [sys.executable, script] + args
def _locate_flash_executable(self) -> Optional[str]:
candidates = []
if getattr(sys, "frozen", False):
candidates.append(os.path.dirname(sys.executable))
candidates.append(APP_ROOT)
for directory in candidates:
exe = os.path.join(directory, "flash.exe")
if os.path.exists(exe):
return exe
return None
if __name__ == "__main__":
root = tk.Tk()
DualUARTApp(root)
root.mainloop()