523 lines
22 KiB
Python
523 lines
22 KiB
Python
#! /usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
"""
|
||
Wrapper for flashing AmebaPro3 via a USB‑CDC bridge (e.g., AmebaSmart).
|
||
|
||
It forces the flasher to treat the port as a plain UART so the ROM XMODEM
|
||
protocol uses 1024‑byte STX frames instead of the 2048‑byte USB variant.
|
||
Command line arguments are identical to flash.py.
|
||
"""
|
||
|
||
import os
|
||
import sys
|
||
import argparse
|
||
import pathlib
|
||
import time
|
||
import json
|
||
import base64
|
||
from typing import Callable
|
||
import serial
|
||
from serial import SerialException
|
||
|
||
# Ensure local imports resolve to the sibling flash package files.
|
||
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
|
||
if BASE_DIR not in sys.path:
|
||
sys.path.insert(0, BASE_DIR)
|
||
|
||
# Patch Ameba hooks to force UART mode and to prefer floader handshake first.
|
||
import base.download_handler as download_handler # type: ignore
|
||
from base.json_utils import JsonUtils # type: ignore
|
||
from base.rt_settings import RtSettings # type: ignore
|
||
from base.device_profile import RtkDeviceProfile # type: ignore
|
||
from base.rtk_logging import create_logger # type: ignore
|
||
from base.download_handler import ErrType # type: ignore
|
||
from base.rtk_utils import RtkUtils # type: ignore
|
||
from base import rom_handler as base_rom_handler # type: ignore
|
||
|
||
# Force the UART path (so STX = 1024B): every Ameba instance now reports that
# it is not a Realtek USB device, regardless of what the port really is.
download_handler.Ameba.is_realtek_usb = lambda self: False
|
||
|
||
|
||
def _check_download_mode_floader_first(self):
|
||
"""Prefer floader handshake, then fall back to ROM + reset sequence."""
|
||
ErrType = download_handler.ErrType
|
||
CmdEsc = download_handler.CmdEsc
|
||
CmdSetBackupRegister = download_handler.CmdSetBackupRegister
|
||
CmdResetIntoDownloadMode = download_handler.CmdResetIntoDownloadMode
|
||
|
||
ret = ErrType.SYS_IO
|
||
is_floader = False
|
||
boot_delay = self.setting.usb_rom_boot_delay_in_second if self.profile_info.support_usb_download else self.setting.rom_boot_delay_in_second
|
||
|
||
self.logger.debug(f"Check download mode with baudrate {self.serial_port.baudrate}")
|
||
retry = 0
|
||
while retry < 3:
|
||
retry += 1
|
||
try:
|
||
# 1) See if flashloader is already running (common after stage-1).
|
||
self.logger.debug(f"Check whether in floader with baudrate {self.baudrate}")
|
||
ret, status = self.floader_handler.sense(self.setting.sync_response_timeout_in_second)
|
||
if ret == ErrType.OK:
|
||
is_floader = True
|
||
self.logger.debug("Floader handshake ok")
|
||
break
|
||
else:
|
||
self.logger.debug(f"Floader handshake fail: {ret}")
|
||
|
||
# 2) ROM handshake.
|
||
self.logger.debug(f"Check whether in rom download mode")
|
||
ret = self.rom_handler.handshake()
|
||
if ret == ErrType.OK:
|
||
self.logger.debug(f"Handshake ok, device in rom download mode")
|
||
break
|
||
|
||
# 3) Try to reset into ROM download mode (UART path only).
|
||
if not self.is_usb:
|
||
self.logger.debug(
|
||
f'Assume in application or ROM normal mode with baudrate {self.profile_info.log_baudrate}')
|
||
self.switch_baudrate(self.profile_info.log_baudrate, self.setting.baudrate_switch_delay_in_second)
|
||
|
||
self.logger.debug("Try to reset device...")
|
||
|
||
self.serial_port.flushOutput()
|
||
self.write_bytes(CmdEsc)
|
||
time.sleep(0.1)
|
||
|
||
if self.profile_info.is_amebad():
|
||
self.serial_port.flushOutput()
|
||
self.write_bytes(CmdSetBackupRegister)
|
||
time.sleep(0.1)
|
||
|
||
self.serial_port.flushOutput()
|
||
self.write_bytes(CmdResetIntoDownloadMode)
|
||
|
||
self.switch_baudrate(self.profile_info.handshake_baudrate, boot_delay, True)
|
||
|
||
self.logger.debug(
|
||
f'Check whether reset in ROM download mode with baudrate {self.profile_info.handshake_baudrate}')
|
||
|
||
ret = self.rom_handler.handshake()
|
||
if ret == ErrType.OK:
|
||
self.logger.debug("Handshake ok, device in ROM download mode")
|
||
break
|
||
else:
|
||
self.logger.debug("Handshake fail, cannot enter UART download mode")
|
||
|
||
self.switch_baudrate(self.baudrate, self.setting.baudrate_switch_delay_in_second, True)
|
||
except Exception as err:
|
||
self.logger.error(f"Check download mode exception: {err}")
|
||
|
||
return ret, is_floader
|
||
|
||
|
||
# Install the floader-first probe in place of Ameba.check_download_mode.
download_handler.Ameba.check_download_mode = _check_download_mode_floader_first
|
||
|
||
# ---------------------------------------------------------------------------
# Resource path patches (packaged exe support)
# ---------------------------------------------------------------------------
# Keep a handle on the unpatched implementation; _patched_get_root() falls
# back to it when no packaged "Flash" directory is found.
_orig_get_root = RtkUtils.get_executable_root_path
|
||
|
||
|
||
def _patched_get_root():
|
||
"""
|
||
Ensure flash assets resolve in both source and PyInstaller builds.
|
||
Priority:
|
||
1) _MEIPASS/Flash (onefile extraction)
|
||
2) dir of the running executable + /Flash
|
||
3) fallback to original logic
|
||
"""
|
||
# PyInstaller onefile: files are extracted to _MEIPASS
|
||
mei = getattr(sys, "_MEIPASS", None)
|
||
if mei:
|
||
flash_dir = os.path.join(mei, "Flash")
|
||
if os.path.isdir(flash_dir):
|
||
return flash_dir
|
||
|
||
# PyInstaller onedir: Flash sits next to the exe
|
||
exe_dir = os.path.dirname(os.path.abspath(sys.executable))
|
||
flash_dir = os.path.join(exe_dir, "Flash")
|
||
if os.path.isdir(flash_dir):
|
||
return flash_dir
|
||
|
||
return _orig_get_root()
|
||
|
||
|
||
# Route every RtkUtils.get_executable_root_path() call through the patched
# lookup; staticmethod() keeps it callable without an instance.
RtkUtils.get_executable_root_path = staticmethod(_patched_get_root)
|
||
|
||
|
||
def _patched_get_floader_path(self):
|
||
"""
|
||
Wrapper around RomHandler.get_floader_path to retry common packaged locations
|
||
without touching base code. Uses patched root (above) first, then falls back
|
||
to the original implementation for source runs.
|
||
"""
|
||
# Prefer patched root (covers _MEIPASS/Flash and exe_dir/Flash)
|
||
root = RtkUtils.get_executable_root_path()
|
||
if self.profile.floader:
|
||
candidate = os.path.realpath(
|
||
os.path.join(root, base_rom_handler.FloaderDictionary, self.profile.floader)
|
||
)
|
||
if os.path.exists(candidate):
|
||
return candidate
|
||
|
||
# Fallback: original behavior
|
||
return _orig_rom_get_floader_path(self)
|
||
|
||
|
||
# Capture the stock lookup before replacing it; _patched_get_floader_path()
# only reads this name when it is called, so binding it here is early enough.
_orig_rom_get_floader_path = base_rom_handler.RomHandler.get_floader_path
base_rom_handler.RomHandler.get_floader_path = _patched_get_floader_path
|
||
|
||
# Reuse flash.py's entry point: main() below builds an argv list and calls flash.main() with it.
|
||
import flash # type: ignore
|
||
|
||
|
||
def resolve_profile_path(path: str) -> str:
    """Resolve a device profile path given on the command line.

    A leading ``~`` is expanded first, matching how the image paths are
    treated in ``main``. Then:

    * an absolute path is returned unchanged (existence is not checked);
    * a relative path that exists from the current directory is returned
      resolved;
    * otherwise the same relative path is tried under ``BASE_DIR``;
    * if nothing matches, the (expanded) input is returned as-is so that
      flash.py reports the missing file itself.

    Args:
        path: Profile path as typed by the user.

    Returns:
        The best candidate path as a string.
    """
    p = pathlib.Path(path)
    try:
        p = p.expanduser()
    except RuntimeError:
        # Home directory cannot be determined; keep the literal path.
        pass

    if p.is_absolute():
        return str(p)
    if p.exists():
        return str(p.resolve())

    alt = pathlib.Path(BASE_DIR) / p
    if alt.exists():
        return str(alt.resolve())
    return str(p)  # let flash.py error out visibly
|
||
|
||
|
||
def main():
|
||
default_profile = str((pathlib.Path(BASE_DIR) / "Devices/Profiles/AmebaPro3_FreeRTOS_NOR.rdev").resolve())
|
||
parser = argparse.ArgumentParser(
|
||
description="AmebaPro3 flasher wrapper (UART mode via USB-CDC bridge).",
|
||
add_help=True,
|
||
)
|
||
parser.add_argument("-p", "--port", help="CDC device of AmebaSmart bridge for BOOT/RESET commands (e.g. /dev/ttyACM0 or COM29)")
|
||
parser.add_argument("-P", "--bridge-port", dest="bridge_port", help="Alias for --port (bridge CDC device), if both set, --bridge-port wins")
|
||
parser.add_argument("-t", "--target", dest="target_port", help="Serial/CDC device to the target SoC for flashing (passed to flash.py)")
|
||
parser.add_argument("-B", "--baudrate", type=int, default=1500000, help="Baudrate (default 1500000)")
|
||
parser.add_argument("-m", "--memory-type", default="nor", choices=["nor", "nand", "ram"], help="Memory type")
|
||
parser.add_argument("--profile", default=default_profile, help="Device profile (.rdev)")
|
||
parser.add_argument("-b", "--boot", help="Boot image path (e.g. amebapro3_boot.bin)")
|
||
parser.add_argument("-a", "--app", help="App image path (e.g. amebapro3_app.bin)")
|
||
parser.add_argument("-i", "--image-dir", dest="image_dir", help="Override image directory (skips -a/-b)")
|
||
parser.add_argument("-bin", dest="single_image", help="Single image path (custom address mode)")
|
||
parser.add_argument("-s", "--start", dest="start_addr", help="Start address (hex) for single image")
|
||
parser.add_argument("-e", "--end", dest="end_addr", help="End address (hex) for single image")
|
||
parser.add_argument("--debug", action="store_true", help="Set log-level to debug")
|
||
parser.add_argument("--auto-dtr", action="store_true",
|
||
help="(Deprecated) legacy auto DTR/RTS toggle; ignored when using GPIO helper.")
|
||
parser.add_argument("-r", "--reset", action="store_true",
|
||
help="Force reset to normal mode after flashing (post-process RESET). Default unless -dl 1.")
|
||
parser.add_argument("-dl", "--download-mode", type=int, choices=[0, 1],
|
||
help="1: stay in download mode (no reset). 0: exit to normal mode (reset).")
|
||
parser.add_argument("extra", nargs=argparse.REMAINDER, help="Additional args passed through to flash.py")
|
||
|
||
args = parser.parse_args()
|
||
|
||
def uart_index_from_port(port: str) -> int | None:
|
||
# Extract trailing digits; COM29 -> 29, /dev/ttyACM0 -> 0, else None
|
||
import re
|
||
m = re.search(r'(\d+)$', port or "")
|
||
return int(m.group(1)) if m else None
|
||
|
||
bridge_port = args.bridge_port or args.port
|
||
target_port = args.target_port
|
||
|
||
# Decide image directory vs per-image flashing.
|
||
image_dir = args.image_dir
|
||
if args.single_image:
|
||
boot_path = pathlib.Path(args.single_image).expanduser()
|
||
if not boot_path.exists():
|
||
print("Custom image not found", file=sys.stderr)
|
||
sys.exit(1)
|
||
app_path = None
|
||
image_dir = None
|
||
if not (args.start_addr and args.end_addr):
|
||
print("Custom image requires --start and --end addresses (hex)", file=sys.stderr)
|
||
sys.exit(1)
|
||
start_addr = args.start_addr
|
||
end_addr = args.end_addr
|
||
elif args.boot or args.app:
|
||
if not (args.boot and args.app):
|
||
print("Both --boot and --app must be provided together", file=sys.stderr)
|
||
sys.exit(1)
|
||
boot_path = pathlib.Path(args.boot).expanduser()
|
||
app_path = pathlib.Path(args.app).expanduser()
|
||
if not boot_path.exists() or not app_path.exists():
|
||
print("Boot or app image not found", file=sys.stderr)
|
||
sys.exit(1)
|
||
# per-image flashing, so don't pass image-dir
|
||
image_dir = None
|
||
start_addr = None
|
||
end_addr = None
|
||
else:
|
||
boot_path = None
|
||
app_path = None
|
||
start_addr = None
|
||
end_addr = None
|
||
|
||
# If the user supplied an image directory (-i) but no explicit boot/app,
|
||
# map the standard images inside that directory to the fixed boot/app
|
||
# address ranges expected by AmebaPro3 (boot: 0x08000000-0x08040000,
|
||
# app: 0x08040000-0x08440000). This avoids relying on the profile's
|
||
# partition table size checks, which can reject larger binaries.
|
||
if image_dir and not (boot_path or app_path or args.single_image):
|
||
candidate_boot = pathlib.Path(image_dir) / "amebapro3_boot.bin"
|
||
candidate_app = pathlib.Path(image_dir) / "amebapro3_app.bin"
|
||
if candidate_boot.exists() and candidate_app.exists():
|
||
boot_path = candidate_boot
|
||
app_path = candidate_app
|
||
image_dir = None # switch to per-image flashing path below
|
||
start_addr = None
|
||
end_addr = None
|
||
|
||
# If invoked via a basename containing "reset" (e.g., pro3_reset), default to reset-only.
|
||
exe_stem = pathlib.Path(sys.argv[0]).stem.lower()
|
||
if (args.download_mode is None) and (not args.reset) and ("reset" in exe_stem):
|
||
args.reset = True
|
||
args.download_mode = 0
|
||
|
||
# Control-only flags: -dl / -r are meant for toggling modes, not flashing.
|
||
control_only = (args.download_mode is not None) or args.reset
|
||
|
||
# Optional: enable DTR/RTS driven auto boot/reset by patching settings load.
|
||
# We monkeypatch JsonUtils.load_from_file to inject the flags before flash.py reads Settings.json.
|
||
created_files = []
|
||
patched_load = False
|
||
|
||
# If control-only flags are provided alongside any flash inputs, bail out early.
|
||
if control_only and any([args.single_image, boot_path, app_path, image_dir]):
|
||
print("Cannot combine -dl/-r with flashing arguments; use them alone to toggle mode/reset.", file=sys.stderr)
|
||
sys.exit(1)
|
||
|
||
# Disable built-in DTR/RTS automation; we'll drive GPIO via pro3_gpio helper.
|
||
orig_load: Callable = JsonUtils.load_from_file
|
||
|
||
def patched_load(path, need_decrypt=True):
|
||
# Only inject for Settings.json; leave profiles untouched.
|
||
if os.path.basename(path) == "Settings.json":
|
||
# Settings.json in this project is plain JSON (not DES/base64),
|
||
# so force no-decrypt to avoid parse errors.
|
||
data = orig_load(path, False) or {}
|
||
data["AutoSwitchToDownloadModeWithDtrRts"] = 0
|
||
data["AutoResetDeviceWithDtrRts"] = 0
|
||
data["PostProcess"] = "NONE"
|
||
return data
|
||
return orig_load(path, need_decrypt)
|
||
|
||
JsonUtils.load_from_file = patched_load # type: ignore
|
||
patched_load = True
|
||
|
||
# Inline GPIO control using ASCII BOOT/RESET commands handled by AmebaSmart firmware.
|
||
def send_boot_reset(port: str, baud: int, idx: int, boot: int | None = None, reset: int | None = None):
|
||
"""
|
||
Issue ASCII commands understood by AmebaSmart CDC bridge.
|
||
Firmware uses "BOOT <idx> <0|1>" and "RESET <idx> <0|1>".
|
||
"""
|
||
with serial.Serial(port=port, baudrate=baud, timeout=0.5, rtscts=False, dsrdtr=False) as ser:
|
||
if boot is not None:
|
||
ser.write(f"BOOT {idx} {boot}\r\n".encode())
|
||
if reset is not None:
|
||
ser.write(f"RESET {idx} {reset}\r\n".encode())
|
||
ser.flush()
|
||
|
||
def boot_seq(port: str, baud: int, idx: int):
|
||
"""
|
||
Enter download mode (dl=1):
|
||
boot=1 held throughout flash, reset pulse: boot 0 1 -> reset 0 1 -> reset 0 0 -> reset 0 1
|
||
Send within a single serial open to avoid CDC drop between steps. BOOT left asserted.
|
||
"""
|
||
with serial.Serial(port=port, baudrate=baud, timeout=0.5, rtscts=False, dsrdtr=False) as ser:
|
||
ser.write(f"BOOT {idx} 1\r\n".encode()) # boot 0 1
|
||
ser.write(f"RESET {idx} 1\r\n".encode()) # reset 0 1
|
||
ser.flush()
|
||
time.sleep(0.1)
|
||
ser.write(f"RESET {idx} 0\r\n".encode()) # reset 0 0
|
||
ser.flush()
|
||
time.sleep(0.1)
|
||
ser.write(f"RESET {idx} 1\r\n".encode()) # reset 0 1
|
||
ser.flush()
|
||
time.sleep(0.2) # allow CDC re-enumeration into download; BOOT stays asserted
|
||
|
||
def reset_seq(port: str, baud: int, idx: int):
|
||
"""
|
||
Pulse RESET only (BOOT unchanged): reset=1 -> reset=0 -> reset=1
|
||
"""
|
||
send_boot_reset(port, baud, idx, reset=1)
|
||
time.sleep(0.05)
|
||
send_boot_reset(port, baud, idx, reset=0)
|
||
time.sleep(0.05)
|
||
send_boot_reset(port, baud, idx, reset=1)
|
||
|
||
def reset_exit_seq(port: str, baud: int, idx: int):
|
||
"""
|
||
Exit download mode (dl=0):
|
||
reset=1/boot=0 -> reset=0/boot=0 -> reset=1/boot=0 (final high).
|
||
"""
|
||
time.sleep(0.1) # small guard before exit sequence
|
||
with serial.Serial(port=port, baudrate=baud, timeout=0.5, rtscts=False, dsrdtr=False) as ser:
|
||
ser.write(f"BOOT {idx} 0\r\n".encode()) # boot 0 0
|
||
ser.flush()
|
||
time.sleep(0.2)
|
||
ser.write(f"RESET {idx} 1\r\n".encode()) # reset 0 1 (assert)
|
||
ser.flush()
|
||
time.sleep(0.2)
|
||
ser.write(f"RESET {idx} 0\r\n".encode()) # reset 0 0 (release)
|
||
ser.flush()
|
||
time.sleep(0.2)
|
||
ser.write(f"RESET {idx} 1\r\n".encode()) # final high
|
||
ser.flush()
|
||
time.sleep(0.5)
|
||
|
||
def release_to_normal(port: str, baud: int, idx: int):
|
||
"""After flashing: BOOT=0, RESET high, with retries for re-enum."""
|
||
reset_exit_seq(port, baud, idx)
|
||
time.sleep(0.2)
|
||
try:
|
||
send_boot_reset(port, baud, idx, boot=0)
|
||
send_boot_reset(port, baud, idx, reset=1)
|
||
except SerialException:
|
||
pass
|
||
|
||
# Reset-only / mode-only handling: if -dl/-r is provided without any images,
|
||
# just toggle the lines and exit without running flash.py.
|
||
if control_only:
|
||
ctrl_port = args.bridge_port or args.port
|
||
if not ctrl_port:
|
||
print("Bridge port (--port) is required for GPIO control", file=sys.stderr)
|
||
sys.exit(1)
|
||
idx = 0 # only UART index 0 drives BOOT/RESET; others are no-op per firmware
|
||
try:
|
||
if args.download_mode == 1:
|
||
boot_seq(ctrl_port, args.baudrate, idx)
|
||
elif args.download_mode == 0:
|
||
reset_exit_seq(ctrl_port, args.baudrate, idx)
|
||
elif args.reset:
|
||
reset_seq(ctrl_port, args.baudrate, idx) # retain BOOT state
|
||
sys.exit(0)
|
||
finally:
|
||
if patched_load:
|
||
JsonUtils.load_from_file = orig_load # type: ignore
|
||
|
||
def run_flash(argv_tail):
|
||
"""Invoke flash.main with a constructed argv list."""
|
||
original_argv = sys.argv
|
||
try:
|
||
sys.argv = ["flash.py"] + argv_tail
|
||
try:
|
||
flash.main(len(argv_tail), argv_tail)
|
||
return 0
|
||
except SystemExit as e:
|
||
return int(e.code) if isinstance(e.code, int) else 1
|
||
finally:
|
||
sys.argv = original_argv
|
||
|
||
def try_reset_port(port: str):
|
||
"""Best-effort reset using inline BOOT/RESET commands, then small delay."""
|
||
try:
|
||
reset_seq(port, args.baudrate)
|
||
time.sleep(0.5)
|
||
except SerialException:
|
||
pass
|
||
|
||
rc = 0
|
||
try:
|
||
flash_port = target_port or bridge_port
|
||
if not flash_port:
|
||
print("Target port (--target) or bridge port (--port) required for flashing", file=sys.stderr)
|
||
sys.exit(1)
|
||
|
||
common = [
|
||
"-d",
|
||
"--profile", resolve_profile_path(args.profile),
|
||
"--baudrate", str(args.baudrate),
|
||
"--memory-type", args.memory_type,
|
||
"--log-level", "debug" if args.debug else "info",
|
||
]
|
||
if flash_port:
|
||
common += ["--port", flash_port]
|
||
common += args.extra
|
||
|
||
# If flashing (not control_only), pre-drive into download mode via bridge if available.
|
||
if not control_only and bridge_port:
|
||
boot_seq(bridge_port, args.baudrate, 0)
|
||
|
||
if args.single_image:
|
||
single_argv = common + [
|
||
"--image", str(boot_path),
|
||
"--start-address", start_addr,
|
||
"--end-address", end_addr,
|
||
]
|
||
run_flash(single_argv)
|
||
elif boot_path and app_path:
|
||
# Always build a custom partition table (base64-encoded JSON) so flash.py
|
||
# downloads both images in one session without intermediate reset.
|
||
partition_entries = [
|
||
{
|
||
"ImageName": str(boot_path),
|
||
"StartAddress": int("0x08000000", 16),
|
||
"EndAddress": int("0x08040000", 16),
|
||
"MemoryType": 1, # NOR
|
||
"FullErase": False,
|
||
"Mandatory": True,
|
||
"Description": boot_path.name,
|
||
},
|
||
{
|
||
"ImageName": str(app_path),
|
||
"StartAddress": int("0x08040000", 16),
|
||
"EndAddress": int("0x08440000", 16),
|
||
"MemoryType": 1, # NOR
|
||
"FullErase": False,
|
||
"Mandatory": True,
|
||
"Description": app_path.name,
|
||
},
|
||
]
|
||
partition_json = json.dumps(partition_entries)
|
||
partition_b64 = base64.b64encode(partition_json.encode()).decode()
|
||
|
||
merged_argv = common + [
|
||
"--partition-table", partition_b64,
|
||
]
|
||
rc = run_flash(merged_argv)
|
||
if rc != 0:
|
||
# Retry once: drive download again, then re-run single pass; if still fail, fallback two-step.
|
||
if bridge_port:
|
||
boot_seq(bridge_port, args.baudrate, 0)
|
||
rc = run_flash(merged_argv)
|
||
if rc != 0:
|
||
print("Single-pass flash failed (code {}). Aborting (no post-flash).".format(rc), file=sys.stderr)
|
||
sys.exit(rc)
|
||
if rc != 0:
|
||
sys.exit(rc)
|
||
else:
|
||
dir_argv = common.copy()
|
||
if image_dir:
|
||
dir_argv += ["--image-dir", image_dir]
|
||
rc = run_flash(dir_argv)
|
||
if rc != 0:
|
||
sys.exit(rc)
|
||
finally:
|
||
if patched_load:
|
||
JsonUtils.load_from_file = orig_load # type: ignore
|
||
# After successful flashing, release BOOT and reset to normal mode via bridge if available.
|
||
if not control_only:
|
||
ctrl_port = bridge_port or flash_port
|
||
if ctrl_port:
|
||
for _ in range(3):
|
||
try:
|
||
release_to_normal(ctrl_port, args.baudrate, 0)
|
||
break
|
||
except SerialException:
|
||
time.sleep(0.5)
|
||
else:
|
||
# Best-effort final deassert to cover missed bytes.
|
||
try:
|
||
send_boot_reset(ctrl_port, args.baudrate, 0, boot=0)
|
||
send_boot_reset(ctrl_port, args.baudrate, 0, reset=1)
|
||
except SerialException:
|
||
pass
|
||
# (optional cleanup of timing files could go here)
|
||
|
||
|
||
# Run the wrapper only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|