#! /usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Wrapper for flashing AmebaPro3 via a USB-CDC bridge (e.g., AmebaSmart).

It forces the flasher to treat the port as a plain UART so the ROM XMODEM
protocol uses 1024-byte STX frames instead of the 2048-byte USB variant.

The wrapper parses its own command line (see ``main``) and forwards the
resulting arguments to flash.py.
"""

import os
import sys
import argparse
import pathlib
import time
import json
import base64
from typing import Callable

import serial
from serial import SerialException

# Ensure local imports resolve to the sibling flash package files.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
if BASE_DIR not in sys.path:
    sys.path.insert(0, BASE_DIR)

# Patch Ameba hooks to force UART mode and to prefer floader handshake first.
import base.download_handler as download_handler  # type: ignore
from base.json_utils import JsonUtils  # type: ignore
from base.rt_settings import RtSettings  # type: ignore
from base.device_profile import RtkDeviceProfile  # type: ignore
from base.rtk_logging import create_logger  # type: ignore
from base.download_handler import ErrType  # type: ignore
from base.rtk_utils import RtkUtils  # type: ignore
from base import rom_handler as base_rom_handler  # type: ignore

# Force UART path (so STX = 1024B).
download_handler.Ameba.is_realtek_usb = lambda self: False


def _check_download_mode_floader_first(self):
    """Prefer floader handshake, then fall back to ROM + reset sequence.

    Installed as a replacement for ``download_handler.Ameba.check_download_mode``.
    Up to three attempts are made; each attempt:

    1. senses the flashloader at the current baudrate,
    2. tries the ROM handshake,
    3. (non-USB only) switches to the log baudrate, sends the escape /
       reset-into-download commands, switches to the handshake baudrate and
       retries the ROM handshake, restoring ``self.baudrate`` on failure.

    Any exception raised during an attempt is logged and the next attempt
    starts; it is never propagated to the caller.

    Returns:
        tuple: ``(ret, is_floader)`` where ``ret`` is the last ``ErrType``
        obtained (``ErrType.SYS_IO`` if nothing ever completed) and
        ``is_floader`` is True only when the floader sense succeeded.
    """
    cmd_esc = download_handler.CmdEsc
    cmd_set_backup_register = download_handler.CmdSetBackupRegister
    cmd_reset_into_download_mode = download_handler.CmdResetIntoDownloadMode

    ret = ErrType.SYS_IO
    is_floader = False

    if self.profile_info.support_usb_download:
        boot_delay = self.setting.usb_rom_boot_delay_in_second
    else:
        boot_delay = self.setting.rom_boot_delay_in_second

    self.logger.debug(f"Check download mode with baudrate {self.serial_port.baudrate}")

    for _attempt in range(3):
        try:
            # 1) See if flashloader is already running (common after stage-1).
            self.logger.debug(f"Check whether in floader with baudrate {self.baudrate}")
            ret, _status = self.floader_handler.sense(self.setting.sync_response_timeout_in_second)
            if ret == ErrType.OK:
                is_floader = True
                self.logger.debug("Floader handshake ok")
                break
            self.logger.debug(f"Floader handshake fail: {ret}")

            # 2) ROM handshake.
            self.logger.debug("Check whether in rom download mode")
            ret = self.rom_handler.handshake()
            if ret == ErrType.OK:
                self.logger.debug("Handshake ok, device in rom download mode")
                break

            # 3) Try to reset into ROM download mode (UART path only).
            if not self.is_usb:
                self.logger.debug(
                    f'Assume in application or ROM normal mode with baudrate {self.profile_info.log_baudrate}')
                self.switch_baudrate(self.profile_info.log_baudrate,
                                     self.setting.baudrate_switch_delay_in_second)

                self.logger.debug("Try to reset device...")

                self.serial_port.flushOutput()
                self.write_bytes(cmd_esc)
                time.sleep(0.1)

                if self.profile_info.is_amebad():
                    self.serial_port.flushOutput()
                    self.write_bytes(cmd_set_backup_register)
                    time.sleep(0.1)

                self.serial_port.flushOutput()
                self.write_bytes(cmd_reset_into_download_mode)

                self.switch_baudrate(self.profile_info.handshake_baudrate, boot_delay, True)

                self.logger.debug(
                    f'Check whether reset in ROM download mode with baudrate {self.profile_info.handshake_baudrate}')
                ret = self.rom_handler.handshake()
                if ret == ErrType.OK:
                    self.logger.debug("Handshake ok, device in ROM download mode")
                    break
                self.logger.debug("Handshake fail, cannot enter UART download mode")

                # Go back to the working baudrate before the next attempt.
                self.switch_baudrate(self.baudrate, self.setting.baudrate_switch_delay_in_second, True)
        except Exception as err:
            # Boundary handler: keep retrying; `ret` retains the last failure code.
            self.logger.error(f"Check download mode exception: {err}")

    return ret, is_floader


download_handler.Ameba.check_download_mode = _check_download_mode_floader_first

# ---------------------------------------------------------------------------
# Resource path patches (packaged exe support)
# ---------------------------------------------------------------------------
_orig_get_root = RtkUtils.get_executable_root_path


def _patched_get_root():
    """
    Ensure flash assets resolve in both source and PyInstaller builds.

    Priority:
      1) _MEIPASS/Flash (onefile extraction)
      2) dir of the running executable + /Flash
      3) fallback to original logic
    """
    # PyInstaller onefile: files are extracted to _MEIPASS
    mei = getattr(sys, "_MEIPASS", None)
    if mei:
        flash_dir = os.path.join(mei, "Flash")
        if os.path.isdir(flash_dir):
            return flash_dir

    # PyInstaller onedir: Flash sits next to the exe
    exe_dir = os.path.dirname(os.path.abspath(sys.executable))
    flash_dir = os.path.join(exe_dir, "Flash")
    if os.path.isdir(flash_dir):
        return flash_dir

    return _orig_get_root()


RtkUtils.get_executable_root_path = staticmethod(_patched_get_root)


def _patched_get_floader_path(self):
    """
    Wrapper around RomHandler.get_floader_path to retry common packaged
    locations without touching base code.

    Uses the patched root first, then falls back to the original
    implementation (e.g. for source runs) when no candidate file exists.
    """
    # Prefer patched root (covers _MEIPASS/Flash and exe_dir/Flash)
    root = RtkUtils.get_executable_root_path()
    if self.profile.floader:
        candidate = os.path.realpath(
            os.path.join(root, base_rom_handler.FloaderDictionary, self.profile.floader)
        )
        if os.path.exists(candidate):
            return candidate

    # Fallback: original behavior
    return _orig_rom_get_floader_path(self)


_orig_rom_get_floader_path = base_rom_handler.RomHandler.get_floader_path
base_rom_handler.RomHandler.get_floader_path = _patched_get_floader_path

# Reuse the original entry point and argument parsing.
import flash  # type: ignore

# Fixed flash ranges used when boot and app images are flashed together.
_BOOT_START, _BOOT_END = 0x08000000, 0x08040000
_APP_START, _APP_END = 0x08040000, 0x08440000


def resolve_profile_path(path: str) -> str:
    """Return an absolute profile path.

    Absolute paths are returned unchanged. A relative path is resolved
    against the current working directory if it exists there, otherwise
    against BASE_DIR. If neither exists the path is returned as given so
    that flash.py reports the error visibly.
    """
    p = pathlib.Path(path)
    if p.is_absolute():
        return str(p)
    if p.exists():
        return str(p.resolve())
    alt = pathlib.Path(BASE_DIR) / path
    if alt.exists():
        return str(alt.resolve())
    return str(p)  # let flash.py error out visibly


def _fail(message: str, code: int = 1) -> None:
    """Print *message* to stderr and terminate with exit status *code*."""
    print(message, file=sys.stderr)
    sys.exit(code)


def _build_parser() -> argparse.ArgumentParser:
    """Create the wrapper's command-line parser."""
    default_profile = str((pathlib.Path(BASE_DIR) / "Devices/Profiles/AmebaPro3_FreeRTOS_NOR.rdev").resolve())
    parser = argparse.ArgumentParser(
        description="AmebaPro3 flasher wrapper (UART mode via USB-CDC bridge).",
        add_help=True,
    )
    parser.add_argument("-p", "--port",
                        help="CDC device of AmebaSmart bridge for BOOT/RESET commands (e.g. /dev/ttyACM0 or COM29)")
    parser.add_argument("-P", "--bridge-port", dest="bridge_port",
                        help="Alias for --port (bridge CDC device), if both set, --bridge-port wins")
    parser.add_argument("-t", "--target", dest="target_port",
                        help="Serial/CDC device to the target SoC for flashing (passed to flash.py)")
    parser.add_argument("-B", "--baudrate", type=int, default=1500000, help="Baudrate (default 1500000)")
    parser.add_argument("-m", "--memory-type", default="nor", choices=["nor", "nand", "ram"], help="Memory type")
    parser.add_argument("--profile", default=default_profile, help="Device profile (.rdev)")
    parser.add_argument("-b", "--boot", help="Boot image path (e.g. amebapro3_boot.bin)")
    parser.add_argument("-a", "--app", help="App image path (e.g. amebapro3_app.bin)")
    parser.add_argument("-i", "--image-dir", dest="image_dir", help="Override image directory (skips -a/-b)")
    parser.add_argument("-bin", dest="single_image", help="Single image path (custom address mode)")
    parser.add_argument("-s", "--start", dest="start_addr", help="Start address (hex) for single image")
    parser.add_argument("-e", "--end", dest="end_addr", help="End address (hex) for single image")
    parser.add_argument("--debug", action="store_true", help="Set log-level to debug")
    parser.add_argument("--auto-dtr", action="store_true",
                        help="(Deprecated) legacy auto DTR/RTS toggle; ignored when using GPIO helper.")
    parser.add_argument("-r", "--reset", action="store_true",
                        help="Force reset to normal mode after flashing (post-process RESET). Default unless -dl 1.")
    parser.add_argument("-dl", "--download-mode", type=int, choices=[0, 1],
                        help="1: stay in download mode (no reset). 0: exit to normal mode (reset).")
    parser.add_argument("extra", nargs=argparse.REMAINDER, help="Additional args passed through to flash.py")
    return parser


def _select_images(args):
    """Decide between single-image, boot+app and image-directory flashing.

    Returns:
        tuple: ``(boot_path, app_path, image_dir, start_addr, end_addr)``.
        In single-image mode ``boot_path`` holds the custom image and the
        addresses are the user-supplied strings; otherwise the addresses
        are None. ``image_dir`` is only non-None when directory flashing
        is left to flash.py.

    Exits with status 1 (message on stderr) on missing files or arguments.
    """
    if args.single_image:
        image = pathlib.Path(args.single_image).expanduser()
        if not image.exists():
            _fail("Custom image not found")
        if not (args.start_addr and args.end_addr):
            _fail("Custom image requires --start and --end addresses (hex)")
        return image, None, None, args.start_addr, args.end_addr

    if args.boot or args.app:
        if not (args.boot and args.app):
            _fail("Both --boot and --app must be provided together")
        boot_path = pathlib.Path(args.boot).expanduser()
        app_path = pathlib.Path(args.app).expanduser()
        if not boot_path.exists() or not app_path.exists():
            _fail("Boot or app image not found")
        # per-image flashing, so don't pass image-dir
        return boot_path, app_path, None, None, None

    # If the user supplied an image directory (-i) but no explicit boot/app,
    # map the standard images inside that directory to the fixed boot/app
    # address ranges expected by AmebaPro3 (boot: 0x08000000-0x08040000,
    # app: 0x08040000-0x08440000). This avoids relying on the profile's
    # partition table size checks, which can reject larger binaries.
    image_dir = args.image_dir
    if image_dir:
        candidate_boot = pathlib.Path(image_dir) / "amebapro3_boot.bin"
        candidate_app = pathlib.Path(image_dir) / "amebapro3_app.bin"
        if candidate_boot.exists() and candidate_app.exists():
            return candidate_boot, candidate_app, None, None, None

    return None, None, image_dir, None, None


def _make_settings_loader(orig_load: Callable) -> Callable:
    """Return a JsonUtils.load_from_file replacement that overrides Settings.json.

    The replacement disables the built-in DTR/RTS automation and the
    post-process step; every other file is loaded by *orig_load* unchanged.
    """
    def load_with_overrides(path, need_decrypt=True):
        # Only inject for Settings.json; leave profiles untouched.
        if os.path.basename(path) == "Settings.json":
            # Settings.json in this project is plain JSON (not DES/base64),
            # so force no-decrypt to avoid parse errors.
            data = orig_load(path, False) or {}
            data["AutoSwitchToDownloadModeWithDtrRts"] = 0
            data["AutoResetDeviceWithDtrRts"] = 0
            data["PostProcess"] = "NONE"
            return data
        return orig_load(path, need_decrypt)

    return load_with_overrides


def _open_bridge(port: str, baud: int):
    """Open the bridge CDC port with hardware flow control disabled."""
    return serial.Serial(port=port, baudrate=baud, timeout=0.5, rtscts=False, dsrdtr=False)


def _send_boot_reset(port: str, baud: int, idx: int, boot: int | None = None, reset: int | None = None):
    """
    Issue ASCII commands to the AmebaSmart CDC bridge.

    Sends "BOOT <idx> <0|1>" and/or "RESET <idx> <0|1>" (each terminated by
    CR LF) for the values that are not None, then flushes once.
    """
    with _open_bridge(port, baud) as ser:
        if boot is not None:
            ser.write(f"BOOT {idx} {boot}\r\n".encode())
        if reset is not None:
            ser.write(f"RESET {idx} {reset}\r\n".encode())
        ser.flush()


def _boot_seq(port: str, baud: int, idx: int):
    """
    Enter download mode (dl=1): BOOT=1, then RESET 1 -> 0 -> 1.

    Sent within a single serial open to avoid CDC drop between steps.
    BOOT is left at 1.
    """
    with _open_bridge(port, baud) as ser:
        ser.write(f"BOOT {idx} 1\r\n".encode())
        ser.write(f"RESET {idx} 1\r\n".encode())
        ser.flush()
        time.sleep(0.1)
        ser.write(f"RESET {idx} 0\r\n".encode())
        ser.flush()
        time.sleep(0.1)
        ser.write(f"RESET {idx} 1\r\n".encode())
        ser.flush()
        time.sleep(0.2)  # allow CDC re-enumeration into download; BOOT stays asserted


def _reset_seq(port: str, baud: int, idx: int):
    """Pulse RESET only (BOOT unchanged): reset=1 -> reset=0 -> reset=1."""
    _send_boot_reset(port, baud, idx, reset=1)
    time.sleep(0.05)
    _send_boot_reset(port, baud, idx, reset=0)
    time.sleep(0.05)
    _send_boot_reset(port, baud, idx, reset=1)


def _reset_exit_seq(port: str, baud: int, idx: int):
    """Exit download mode (dl=0): BOOT=0, then RESET 1 -> 0 -> 1 (final 1)."""
    time.sleep(0.1)  # small guard before exit sequence
    with _open_bridge(port, baud) as ser:
        ser.write(f"BOOT {idx} 0\r\n".encode())
        ser.flush()
        time.sleep(0.2)
        ser.write(f"RESET {idx} 1\r\n".encode())
        ser.flush()
        time.sleep(0.2)
        ser.write(f"RESET {idx} 0\r\n".encode())
        ser.flush()
        time.sleep(0.2)
        ser.write(f"RESET {idx} 1\r\n".encode())
        ser.flush()
        time.sleep(0.5)


def _release_to_normal(port: str, baud: int, idx: int):
    """After flashing: run the exit sequence, then re-send BOOT=0 / RESET=1.

    A SerialException from the exit sequence propagates so the caller can
    retry; the trailing re-send is best-effort.
    """
    _reset_exit_seq(port, baud, idx)
    time.sleep(0.2)
    try:
        _send_boot_reset(port, baud, idx, boot=0)
        _send_boot_reset(port, baud, idx, reset=1)
    except SerialException:
        # Best-effort repeat only; the exit sequence above already succeeded.
        pass


def _build_partition_table(boot_path: pathlib.Path, app_path: pathlib.Path) -> str:
    """Return a base64-encoded JSON partition table covering boot and app."""
    # NOTE(review): MemoryType is fixed to 1 (NOR) regardless of --memory-type.
    entries = [
        {
            "ImageName": str(boot_path),
            "StartAddress": _BOOT_START,
            "EndAddress": _BOOT_END,
            "MemoryType": 1,  # NOR
            "FullErase": False,
            "Mandatory": True,
            "Description": boot_path.name,
        },
        {
            "ImageName": str(app_path),
            "StartAddress": _APP_START,
            "EndAddress": _APP_END,
            "MemoryType": 1,  # NOR
            "FullErase": False,
            "Mandatory": True,
            "Description": app_path.name,
        },
    ]
    return base64.b64encode(json.dumps(entries).encode()).decode()


def _run_flash(argv_tail: list[str]) -> int:
    """Invoke flash.main with a constructed argv list and return its exit code.

    ``sys.argv`` is replaced for the duration of the call and always
    restored. A ``SystemExit`` raised by flash.py is translated: ``None``
    means success (0), an int is returned as-is, anything else maps to 1.
    """
    original_argv = sys.argv
    sys.argv = ["flash.py"] + argv_tail
    try:
        flash.main(len(argv_tail), argv_tail)
        return 0
    except SystemExit as exc:
        if exc.code is None:  # bare sys.exit() signals success
            return 0
        return int(exc.code) if isinstance(exc.code, int) else 1
    finally:
        sys.argv = original_argv


def main():
    """Parse the wrapper arguments, drive BOOT/RESET via the bridge and flash.

    With ``-dl``/``-r`` (or when the executable name contains "reset") only
    the BOOT/RESET lines are toggled and the process exits with status 0.
    Otherwise the target is driven into download mode (when a bridge port
    is given), flash.py is run, and on success the target is released back
    to normal mode. A non-zero flash.py exit code becomes the exit status.
    """
    args = _build_parser().parse_args()

    bridge_port = args.bridge_port or args.port
    target_port = args.target_port

    # Decide image directory vs per-image flashing.
    boot_path, app_path, image_dir, start_addr, end_addr = _select_images(args)

    # If invoked via a basename containing "reset" (e.g., pro3_reset), default to reset-only.
    exe_stem = pathlib.Path(sys.argv[0]).stem.lower()
    if args.download_mode is None and not args.reset and "reset" in exe_stem:
        args.reset = True
        args.download_mode = 0

    # Control-only flags: -dl / -r are meant for toggling modes, not flashing.
    control_only = (args.download_mode is not None) or args.reset

    # If control-only flags are provided alongside any flash inputs, bail out early.
    if control_only and any([args.single_image, boot_path, app_path, image_dir]):
        _fail("Cannot combine -dl/-r with flashing arguments; use them alone to toggle mode/reset.")

    # Reset-only / mode-only handling: just toggle the lines and exit
    # without running flash.py.
    if control_only:
        if not bridge_port:
            _fail("Bridge port (--port) is required for GPIO control")
        idx = 0  # only UART index 0 drives BOOT/RESET; others are no-op per firmware
        if args.download_mode == 1:
            _boot_seq(bridge_port, args.baudrate, idx)
        elif args.download_mode == 0:
            _reset_exit_seq(bridge_port, args.baudrate, idx)
        elif args.reset:
            _reset_seq(bridge_port, args.baudrate, idx)  # retain BOOT state
        sys.exit(0)

    flash_port = target_port or bridge_port
    if not flash_port:
        _fail("Target port (--target) or bridge port (--port) required for flashing")

    common = [
        "-d",
        "--profile", resolve_profile_path(args.profile),
        "--baudrate", str(args.baudrate),
        "--memory-type", args.memory_type,
        "--log-level", "debug" if args.debug else "info",
        "--port", flash_port,
    ] + args.extra

    # Disable built-in DTR/RTS automation while flash.py runs; the
    # BOOT/RESET lines are driven through the bridge instead.
    orig_load: Callable = JsonUtils.load_from_file
    JsonUtils.load_from_file = _make_settings_loader(orig_load)  # type: ignore
    try:
        # Pre-drive into download mode via bridge if available.
        if bridge_port:
            _boot_seq(bridge_port, args.baudrate, 0)

        if args.single_image:
            rc = _run_flash(common + [
                "--image", str(boot_path),
                "--start-address", start_addr,
                "--end-address", end_addr,
            ])
        elif boot_path and app_path:
            # Always build a custom partition table (base64-encoded JSON) so flash.py
            # downloads both images in one session without intermediate reset.
            merged_argv = common + ["--partition-table", _build_partition_table(boot_path, app_path)]
            rc = _run_flash(merged_argv)
            if rc != 0:
                # Retry once: drive download mode again, then re-run the single pass.
                if bridge_port:
                    _boot_seq(bridge_port, args.baudrate, 0)
                rc = _run_flash(merged_argv)
                if rc != 0:
                    print("Single-pass flash failed (code {}). Aborting (no post-flash).".format(rc),
                          file=sys.stderr)
        else:
            dir_argv = list(common)
            if image_dir:
                dir_argv += ["--image-dir", image_dir]
            rc = _run_flash(dir_argv)

        # Any failed flash aborts here, skipping the post-flash release below.
        if rc != 0:
            sys.exit(rc)
    finally:
        JsonUtils.load_from_file = orig_load  # type: ignore

    # After successful flashing, release BOOT and reset to normal mode.
    ctrl_port = bridge_port or flash_port
    for _ in range(3):
        try:
            _release_to_normal(ctrl_port, args.baudrate, 0)
            break
        except SerialException:
            time.sleep(0.5)
    else:
        # All retries failed: best-effort final deassert to cover missed bytes.
        try:
            _send_boot_reset(ctrl_port, args.baudrate, 0, boot=0)
            _send_boot_reset(ctrl_port, args.baudrate, 0, reset=1)
        except SerialException as err:
            print(f"Warning: could not release BOOT/RESET via {ctrl_port}: {err}", file=sys.stderr)

    # (optional cleanup of timing files could go here)


if __name__ == "__main__":
    main()