ameba_control_panel_v3/Flash/flash_amebapro3.py
wongyiekheng c92fbe7548 Second Commit
Major refactor of Ameba Control Panel v3.1.0:
- Three-column layout: icon sidebar, config+history, log view
- Dracula PRO theme with light/dark toggle
- DTR/RTS GPIO control (replaces ASCII commands)
- Multi-CDC firmware support for AmebaSmart control device
- Dynamic DUT tabs with +/- management
- NN Model flash image support
- Settings dialog (Font, Serial, Flash, Command tabs)
- Background port scanning, debounced session store
- Adaptive log flush rate, format cache optimization
- Smooth sidebar animation, deferred startup
- pytest test framework with session/log/settings tests
- Thread safety fixes: _alive guards, parented timers, safe baud parsing
- Find highlight: needle-only highlighting with focused match color
- Partial line buffering for table output
- PyInstaller packaging with version stamp and module exclusions

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-29 13:01:12 +08:00

623 lines
25 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#! /usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Wrapper for flashing AmebaPro3 via a USBCDC bridge (e.g., AmebaSmart).
It forces the flasher to treat the port as a plain UART so the ROM XMODEM
protocol uses 1024byte STX frames instead of the 2048byte USB variant.
Command line arguments are identical to flash.py.
"""
import os
import sys
import argparse
import pathlib
import time
import json
import base64
from typing import Callable
import serial
from serial import SerialException
# Ensure local imports resolve to the sibling flash package files.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
if BASE_DIR not in sys.path:
sys.path.insert(0, BASE_DIR)
# Patch Ameba hooks to force UART mode and to prefer floader handshake first.
import base.download_handler as download_handler # type: ignore
from base.json_utils import JsonUtils # type: ignore
from base.rt_settings import RtSettings # type: ignore
from base.device_profile import RtkDeviceProfile # type: ignore
from base.rtk_logging import create_logger # type: ignore
from base.download_handler import ErrType # type: ignore
from base.rtk_utils import RtkUtils # type: ignore
from base import rom_handler as base_rom_handler # type: ignore
# Force UART path (so STX = 1024B).
download_handler.Ameba.is_realtek_usb = lambda self: False
def _check_download_mode_floader_first(self):
"""Prefer floader handshake, then fall back to ROM + reset sequence."""
ErrType = download_handler.ErrType
CmdEsc = download_handler.CmdEsc
CmdSetBackupRegister = download_handler.CmdSetBackupRegister
CmdResetIntoDownloadMode = download_handler.CmdResetIntoDownloadMode
ret = ErrType.SYS_IO
is_floader = False
boot_delay = self.setting.usb_rom_boot_delay_in_second if self.profile_info.support_usb_download else self.setting.rom_boot_delay_in_second
self.logger.debug(f"Check download mode with baudrate {self.serial_port.baudrate}")
retry = 0
while retry < 3:
retry += 1
try:
# 1) See if flashloader is already running (common after stage-1).
self.logger.debug(f"Check whether in floader with baudrate {self.baudrate}")
ret, status = self.floader_handler.sense(self.setting.sync_response_timeout_in_second)
if ret == ErrType.OK:
is_floader = True
self.logger.debug("Floader handshake ok")
break
else:
self.logger.debug(f"Floader handshake fail: {ret}")
# 2) ROM handshake.
self.logger.debug(f"Check whether in rom download mode")
ret = self.rom_handler.handshake()
if ret == ErrType.OK:
self.logger.debug(f"Handshake ok, device in rom download mode")
break
# 3) Try to reset into ROM download mode (UART path only).
if not self.is_usb:
self.logger.debug(
f'Assume in application or ROM normal mode with baudrate {self.profile_info.log_baudrate}')
self.switch_baudrate(self.profile_info.log_baudrate, self.setting.baudrate_switch_delay_in_second)
self.logger.debug("Try to reset device...")
self.serial_port.flushOutput()
self.write_bytes(CmdEsc)
time.sleep(0.1)
if self.profile_info.is_amebad():
self.serial_port.flushOutput()
self.write_bytes(CmdSetBackupRegister)
time.sleep(0.1)
self.serial_port.flushOutput()
self.write_bytes(CmdResetIntoDownloadMode)
self.switch_baudrate(self.profile_info.handshake_baudrate, boot_delay, True)
self.logger.debug(
f'Check whether reset in ROM download mode with baudrate {self.profile_info.handshake_baudrate}')
ret = self.rom_handler.handshake()
if ret == ErrType.OK:
self.logger.debug("Handshake ok, device in ROM download mode")
break
else:
self.logger.debug("Handshake fail, cannot enter UART download mode")
self.switch_baudrate(self.baudrate, self.setting.baudrate_switch_delay_in_second, True)
except Exception as err:
self.logger.error(f"Check download mode exception: {err}")
return ret, is_floader
download_handler.Ameba.check_download_mode = _check_download_mode_floader_first
# ---------------------------------------------------------------------------
# Resource path patches (packaged exe support)
# ---------------------------------------------------------------------------
_orig_get_root = RtkUtils.get_executable_root_path
def _patched_get_root():
"""
Ensure flash assets resolve in both source and PyInstaller builds.
Priority:
1) _MEIPASS/Flash (onefile extraction)
2) dir of the running executable + /Flash
3) fallback to original logic
"""
# PyInstaller onefile: files are extracted to _MEIPASS
mei = getattr(sys, "_MEIPASS", None)
if mei:
flash_dir = os.path.join(mei, "Flash")
if os.path.isdir(flash_dir):
return flash_dir
# PyInstaller onedir: Flash sits next to the exe
exe_dir = os.path.dirname(os.path.abspath(sys.executable))
flash_dir = os.path.join(exe_dir, "Flash")
if os.path.isdir(flash_dir):
return flash_dir
return _orig_get_root()
RtkUtils.get_executable_root_path = staticmethod(_patched_get_root)
def _patched_get_floader_path(self):
"""
Wrapper around RomHandler.get_floader_path to retry common packaged locations
without touching base code. Uses patched root (above) first, then falls back
to the original implementation for source runs.
"""
# Prefer patched root (covers _MEIPASS/Flash and exe_dir/Flash)
root = RtkUtils.get_executable_root_path()
if self.profile.floader:
candidate = os.path.realpath(
os.path.join(root, base_rom_handler.FloaderDictionary, self.profile.floader)
)
if os.path.exists(candidate):
return candidate
# Fallback: original behavior
return _orig_rom_get_floader_path(self)
_orig_rom_get_floader_path = base_rom_handler.RomHandler.get_floader_path
base_rom_handler.RomHandler.get_floader_path = _patched_get_floader_path
# Reuse the original entry point and argument parsing.
import flash # type: ignore
def resolve_profile_path(path: str) -> str:
"""Return an absolute profile path. If relative and not found from CWD, try relative to BASE_DIR."""
p = pathlib.Path(path)
if p.is_absolute():
return str(p)
if p.exists():
return str(p.resolve())
alt = pathlib.Path(BASE_DIR) / path
if alt.exists():
return str(alt.resolve())
return str(p) # let flash.py error out visibly
def main():
default_profile = str((pathlib.Path(BASE_DIR) / "Devices/Profiles/AmebaPro3_FreeRTOS_NOR.rdev").resolve())
parser = argparse.ArgumentParser(
description="AmebaPro3 flasher wrapper (UART mode via USB-CDC bridge).",
add_help=True,
)
parser.add_argument("-p", "--port", help="CDC device of AmebaSmart bridge for BOOT/RESET commands (e.g. /dev/ttyACM0 or COM29)")
parser.add_argument("-P", "--bridge-port", dest="bridge_port", help="Alias for --port (bridge CDC device), if both set, --bridge-port wins")
parser.add_argument("-t", "--target", dest="target_port", help="Serial/CDC device to the target SoC for flashing (passed to flash.py)")
parser.add_argument("-B", "--baudrate", type=int, default=1500000, help="Baudrate (default 1500000)")
parser.add_argument("-m", "--memory-type", default="nor", choices=["nor", "nand", "ram"], help="Memory type")
parser.add_argument("--profile", default=default_profile, help="Device profile (.rdev)")
parser.add_argument("-b", "--boot", help="Boot image path (e.g. amebapro3_boot.bin)")
parser.add_argument("-a", "--app", help="App image path (e.g. amebapro3_app.bin)")
parser.add_argument("-i", "--image-dir", dest="image_dir", help="Override image directory (skips -a/-b)")
parser.add_argument("-bin", dest="single_image", help="Single image path (custom address mode)")
parser.add_argument("-s", "--start", dest="start_addr", help="Start address (hex) for single image")
parser.add_argument("-e", "--end", dest="end_addr", help="End address (hex) for single image")
parser.add_argument(
"--multi-bin",
dest="multi_bin",
action="append",
nargs=3,
metavar=("IMAGE", "START", "END"),
help="Repeatable image triplet for one-shot flashing.",
)
parser.add_argument("--debug", action="store_true", help="Set log-level to debug")
parser.add_argument("--auto-dtr", action="store_true",
help="(Deprecated) legacy auto DTR/RTS toggle; ignored when using GPIO helper.")
parser.add_argument("--dtr-rts", action="store_true", dest="dtr_rts",
help="Use DTR/RTS on the target port for BOOT/RESET control instead of ASCII GPIO commands on a bridge port.")
parser.add_argument("-r", "--reset", action="store_true",
help="Force reset to normal mode after flashing (post-process RESET). Default unless -dl 1.")
parser.add_argument("-dl", "--download-mode", type=int, choices=[0, 1],
help="1: stay in download mode (no reset). 0: exit to normal mode (reset).")
parser.add_argument("extra", nargs=argparse.REMAINDER, help="Additional args passed through to flash.py")
args = parser.parse_args()
def uart_index_from_port(port: str) -> int | None:
# Extract trailing digits; COM29 -> 29, /dev/ttyACM0 -> 0, else None
import re
m = re.search(r'(\d+)$', port or "")
return int(m.group(1)) if m else None
bridge_port = args.bridge_port or args.port
target_port = args.target_port
def parse_addr(text: str) -> int | None:
try:
value = int(text, 0)
except Exception:
return None
return value if value >= 0 else None
multi_images: list[tuple[pathlib.Path, int, int]] = []
if args.multi_bin:
for image_path, start_text, end_text in args.multi_bin:
path = pathlib.Path(image_path).expanduser()
if not path.exists():
print(f"Multi image not found: {path}", file=sys.stderr)
sys.exit(1)
start_value = parse_addr(start_text)
end_value = parse_addr(end_text)
if start_value is None or end_value is None or start_value >= end_value:
print(f"Invalid multi image range: {image_path} {start_text} {end_text}", file=sys.stderr)
sys.exit(1)
multi_images.append((path, start_value, end_value))
if multi_images and any([args.single_image, args.boot, args.app, args.image_dir]):
print("Cannot combine --multi-bin with -bin/-s/-e, -a/-b, or -i.", file=sys.stderr)
sys.exit(1)
# Decide image directory vs per-image flashing.
image_dir = args.image_dir
if multi_images:
boot_path = None
app_path = None
image_dir = None
start_addr = None
end_addr = None
elif args.single_image:
boot_path = pathlib.Path(args.single_image).expanduser()
if not boot_path.exists():
print("Custom image not found", file=sys.stderr)
sys.exit(1)
app_path = None
image_dir = None
if not (args.start_addr and args.end_addr):
print("Custom image requires --start and --end addresses (hex)", file=sys.stderr)
sys.exit(1)
start_addr = args.start_addr
end_addr = args.end_addr
elif args.boot or args.app:
if not (args.boot and args.app):
print("Both --boot and --app must be provided together", file=sys.stderr)
sys.exit(1)
boot_path = pathlib.Path(args.boot).expanduser()
app_path = pathlib.Path(args.app).expanduser()
if not boot_path.exists() or not app_path.exists():
print("Boot or app image not found", file=sys.stderr)
sys.exit(1)
# per-image flashing, so don't pass image-dir
image_dir = None
start_addr = None
end_addr = None
else:
boot_path = None
app_path = None
start_addr = None
end_addr = None
# If the user supplied an image directory (-i) but no explicit boot/app,
# map the standard images inside that directory to the fixed boot/app
# address ranges expected by AmebaPro3 (boot: 0x08000000-0x08040000,
# app: 0x08040000-0x08440000). This avoids relying on the profile's
# partition table size checks, which can reject larger binaries.
if image_dir and not (boot_path or app_path or args.single_image or multi_images):
candidate_boot = pathlib.Path(image_dir) / "amebapro3_boot.bin"
candidate_app = pathlib.Path(image_dir) / "amebapro3_app.bin"
if candidate_boot.exists() and candidate_app.exists():
boot_path = candidate_boot
app_path = candidate_app
image_dir = None # switch to per-image flashing path below
start_addr = None
end_addr = None
# If invoked via a basename containing "reset" (e.g., pro3_reset), default to reset-only.
exe_stem = pathlib.Path(sys.argv[0]).stem.lower()
if (args.download_mode is None) and (not args.reset) and ("reset" in exe_stem):
args.reset = True
args.download_mode = 0
# Control-only flags: -dl / -r are meant for toggling modes, not flashing.
control_only = (args.download_mode is not None) or args.reset
# Optional: enable DTR/RTS driven auto boot/reset by patching settings load.
# We monkeypatch JsonUtils.load_from_file to inject the flags before flash.py reads Settings.json.
created_files = []
patched_load = False
# If control-only flags are provided alongside any flash inputs, bail out early.
if control_only and any([args.single_image, boot_path, app_path, image_dir, multi_images]):
print("Cannot combine -dl/-r with flashing arguments; use them alone to toggle mode/reset.", file=sys.stderr)
sys.exit(1)
# Disable built-in DTR/RTS automation; we'll drive GPIO via pro3_gpio helper.
orig_load: Callable = JsonUtils.load_from_file
def patched_load(path, need_decrypt=True):
# Only inject for Settings.json; leave profiles untouched.
if os.path.basename(path) == "Settings.json":
# Settings.json in this project is plain JSON (not DES/base64),
# so force no-decrypt to avoid parse errors.
data = orig_load(path, False) or {}
data["AutoSwitchToDownloadModeWithDtrRts"] = 0
data["AutoResetDeviceWithDtrRts"] = 0
data["PostProcess"] = "NONE"
return data
return orig_load(path, need_decrypt)
JsonUtils.load_from_file = patched_load # type: ignore
patched_load = True
# -- GPIO control: ASCII commands (legacy) --------------------------------
def send_boot_reset(port: str, baud: int, idx: int, boot: int | None = None, reset: int | None = None):
"""ASCII commands understood by AmebaSmart CDC bridge."""
with serial.Serial(port=port, baudrate=baud, timeout=0.5, rtscts=False, dsrdtr=False) as ser:
if boot is not None:
ser.write(f"BOOT {idx} {boot}\r\n".encode())
if reset is not None:
ser.write(f"RESET {idx} {reset}\r\n".encode())
ser.flush()
def boot_seq_ascii(port: str, baud: int, idx: int):
"""Enter download mode via ASCII GPIO commands on control port."""
with serial.Serial(port=port, baudrate=baud, timeout=0.5, rtscts=False, dsrdtr=False) as ser:
ser.write(f"BOOT {idx} 1\r\n".encode())
ser.write(f"RESET {idx} 1\r\n".encode())
ser.flush()
time.sleep(0.1)
ser.write(f"RESET {idx} 0\r\n".encode())
ser.flush()
time.sleep(0.1)
ser.write(f"RESET {idx} 1\r\n".encode())
ser.flush()
time.sleep(0.2)
def reset_seq_ascii(port: str, baud: int, idx: int):
"""Pulse RESET only via ASCII GPIO commands."""
send_boot_reset(port, baud, idx, reset=1)
time.sleep(0.05)
send_boot_reset(port, baud, idx, reset=0)
time.sleep(0.05)
send_boot_reset(port, baud, idx, reset=1)
def reset_exit_seq_ascii(port: str, baud: int, idx: int):
"""Exit download mode via ASCII GPIO commands."""
time.sleep(0.1)
with serial.Serial(port=port, baudrate=baud, timeout=0.5, rtscts=False, dsrdtr=False) as ser:
ser.write(f"BOOT {idx} 0\r\n".encode())
ser.flush()
time.sleep(0.2)
ser.write(f"RESET {idx} 1\r\n".encode())
ser.flush()
time.sleep(0.2)
ser.write(f"RESET {idx} 0\r\n".encode())
ser.flush()
time.sleep(0.2)
ser.write(f"RESET {idx} 1\r\n".encode())
ser.flush()
time.sleep(0.5)
def release_to_normal_ascii(port: str, baud: int, idx: int):
"""After flashing: BOOT=0, RESET high via ASCII GPIO."""
reset_exit_seq_ascii(port, baud, idx)
time.sleep(0.2)
try:
send_boot_reset(port, baud, idx, boot=0)
send_boot_reset(port, baud, idx, reset=1)
except SerialException:
pass
# -- GPIO control: DTR/RTS on DUT port (no control port needed) ----------
# Mapping: DTR → BOOT pin, RTS → CHIP_EN (reset) pin.
# True = asserted, False = released.
def boot_seq_dtr(port: str, baud: int):
"""Enter download mode via DTR/RTS on the DUT port."""
with serial.Serial(port=port, baudrate=baud, timeout=0.5, rtscts=False, dsrdtr=False) as ser:
ser.setDTR(True) # BOOT asserted
ser.setRTS(True) # RESET asserted (hold chip in reset)
time.sleep(0.1)
ser.setRTS(False) # RESET released → chip starts with BOOT=1 → download mode
time.sleep(0.1)
ser.setRTS(True) # RESET back to idle
time.sleep(0.2)
def reset_seq_dtr(port: str, baud: int):
"""Pulse RESET via RTS (BOOT unchanged)."""
with serial.Serial(port=port, baudrate=baud, timeout=0.5, rtscts=False, dsrdtr=False) as ser:
ser.setRTS(True)
time.sleep(0.05)
ser.setRTS(False)
time.sleep(0.05)
ser.setRTS(True)
def reset_exit_seq_dtr(port: str, baud: int):
"""Exit download mode via DTR/RTS: BOOT=0, pulse RESET."""
time.sleep(0.1)
with serial.Serial(port=port, baudrate=baud, timeout=0.5, rtscts=False, dsrdtr=False) as ser:
ser.setDTR(False) # BOOT released
time.sleep(0.2)
ser.setRTS(True) # RESET assert
time.sleep(0.2)
ser.setRTS(False) # RESET release → chip starts with BOOT=0 → normal mode
time.sleep(0.2)
ser.setRTS(True) # idle
time.sleep(0.5)
def release_to_normal_dtr(port: str, baud: int):
"""After flashing: BOOT=0, RESET idle via DTR/RTS."""
reset_exit_seq_dtr(port, baud)
time.sleep(0.2)
try:
with serial.Serial(port=port, baudrate=baud, timeout=0.5, rtscts=False, dsrdtr=False) as ser:
ser.setDTR(False)
ser.setRTS(True)
except SerialException:
pass
# -- Dispatch: pick ASCII or DTR/RTS based on --dtr-rts flag -------------
use_dtr_rts = args.dtr_rts
def boot_seq(port: str, baud: int, idx: int = 0):
if use_dtr_rts:
boot_seq_dtr(port, baud)
else:
boot_seq_ascii(port, baud, idx)
def reset_seq(port: str, baud: int, idx: int = 0):
if use_dtr_rts:
reset_seq_dtr(port, baud)
else:
reset_seq_ascii(port, baud, idx)
def reset_exit_seq(port: str, baud: int, idx: int = 0):
if use_dtr_rts:
reset_exit_seq_dtr(port, baud)
else:
reset_exit_seq_ascii(port, baud, idx)
def release_to_normal(port: str, baud: int, idx: int = 0):
if use_dtr_rts:
release_to_normal_dtr(port, baud)
else:
release_to_normal_ascii(port, baud, idx)
# Reset-only / mode-only handling: if -dl/-r is provided without any images,
# just toggle the lines and exit without running flash.py.
if control_only:
if use_dtr_rts:
ctrl_port = target_port or bridge_port
else:
ctrl_port = bridge_port or args.port
if not ctrl_port:
print("A port is required for GPIO control (bridge port or --dtr-rts with target port)", file=sys.stderr)
sys.exit(1)
idx = 0
try:
if args.download_mode == 1:
boot_seq(ctrl_port, args.baudrate, idx)
elif args.download_mode == 0:
reset_exit_seq(ctrl_port, args.baudrate, idx)
elif args.reset:
reset_seq(ctrl_port, args.baudrate, idx)
sys.exit(0)
finally:
if patched_load:
JsonUtils.load_from_file = orig_load # type: ignore
def build_partition_entry(image_path, start_addr: int, end_addr: int) -> dict:
p = pathlib.Path(image_path) if isinstance(image_path, str) else image_path
return {
"ImageName": str(p),
"StartAddress": start_addr,
"EndAddress": end_addr,
"MemoryType": 1, # NOR
"FullErase": False,
"Mandatory": True,
"Description": p.name,
}
def encode_partition_table(entries: list[dict]) -> str:
return base64.b64encode(json.dumps(entries).encode()).decode()
def run_flash(argv_tail):
"""Invoke flash.main with a constructed argv list."""
original_argv = sys.argv
try:
sys.argv = ["flash.py"] + argv_tail
try:
flash.main(len(argv_tail), argv_tail)
return 0
except SystemExit as e:
return int(e.code) if isinstance(e.code, int) else 1
finally:
sys.argv = original_argv
def flash_with_retry(argv, label="flash"):
"""Run flash, retry once with boot_seq if first attempt fails."""
rc = run_flash(argv)
if rc != 0 and gpio_port:
boot_seq(gpio_port, args.baudrate, 0)
rc = run_flash(argv)
if rc != 0:
print(f"{label} failed (code {rc}).", file=sys.stderr)
sys.exit(rc)
return rc
rc = 0
try:
flash_port = target_port or bridge_port
if not flash_port:
print("Target port (--target) or bridge port (--port) required for flashing", file=sys.stderr)
sys.exit(1)
common = [
"-d",
"--profile", resolve_profile_path(args.profile),
"--baudrate", str(args.baudrate),
"--memory-type", args.memory_type,
"--log-level", "debug" if args.debug else "info",
]
if flash_port:
common += ["--port", flash_port]
common += args.extra
# Pre-drive into download mode before flashing.
gpio_port = flash_port if use_dtr_rts else bridge_port
if not control_only and gpio_port:
boot_seq(gpio_port, args.baudrate, 0)
if multi_images:
partition_entries = [build_partition_entry(p, s, e) for p, s, e in multi_images]
merged_argv = common + ["--partition-table", encode_partition_table(partition_entries)]
rc = flash_with_retry(merged_argv, "Multi-image flash")
elif args.single_image:
single_argv = common + [
"--image", str(boot_path),
"--start-address", start_addr,
"--end-address", end_addr,
]
rc = run_flash(single_argv)
if rc != 0:
sys.exit(rc)
elif boot_path and app_path:
partition_entries = [
build_partition_entry(boot_path, 0x08000000, 0x08040000),
build_partition_entry(app_path, 0x08040000, 0x08440000),
]
merged_argv = common + ["--partition-table", encode_partition_table(partition_entries)]
rc = flash_with_retry(merged_argv, "Single-pass flash")
else:
dir_argv = common.copy()
if image_dir:
dir_argv += ["--image-dir", image_dir]
rc = run_flash(dir_argv)
if rc != 0:
sys.exit(rc)
finally:
if patched_load:
JsonUtils.load_from_file = orig_load # type: ignore
# After flashing, release BOOT and reset to normal mode.
if not control_only:
post_port = gpio_port or flash_port
if post_port:
for _ in range(3):
try:
release_to_normal(post_port, args.baudrate, 0)
break
except SerialException:
time.sleep(0.5)
else:
try:
if use_dtr_rts:
release_to_normal_dtr(post_port, args.baudrate)
else:
send_boot_reset(post_port, args.baudrate, 0, boot=0)
send_boot_reset(post_port, args.baudrate, 0, reset=1)
except SerialException:
pass
if __name__ == "__main__":
main()