from __future__ import annotations import os import runpy import subprocess import sys import threading from pathlib import Path from typing import List, Optional from PySide6.QtCore import QObject, QThread, Signal class FlashRunner(QThread): output = Signal(str) finished = Signal(int) def __init__(self, args: List[str], flash_script: Path, parent: Optional[QObject] = None) -> None: super().__init__(parent) self._args = args self._flash_script = flash_script def run(self) -> None: # Prefer packaged FlashHelper.exe when frozen; fallback to inline or python. helper_exe = self._helper_executable() if helper_exe and helper_exe.exists(): cmd = [str(helper_exe)] + self._args elif getattr(sys, "frozen", False): exit_code = self._run_inline() self.finished.emit(exit_code) return else: cmd = [sys.executable, str(self._flash_script)] + self._args try: with subprocess.Popen( cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, bufsize=1, cwd=str(self._flash_script.parent), ) as proc: if proc.stdout: for line in proc.stdout: stripped = line.rstrip("\n") cleaned = self._strip_embedded_timestamp(stripped) if cleaned.strip(): self.output.emit(cleaned) if self.isInterruptionRequested(): proc.terminate() break if proc.poll() is None: proc.wait() self.finished.emit(proc.returncode or 0) except FileNotFoundError: self.output.emit("flash helper not found") self.finished.emit(1) def _run_inline(self) -> int: # Basic inline execution for packaged builds. writer = _SignalWriter(self.output.emit) argv_backup = list(sys.argv) sys.argv = [str(self._flash_script)] + self._args cwd_backup = Path.cwd() stdout_backup = sys.stdout stderr_backup = sys.stderr os_exit_backup = os._exit def _soft_exit(code=0): raise SystemExit(code) os.chdir(self._flash_script.parent) sys.stdout = writer # type: ignore[assignment] sys.stderr = writer # type: ignore[assignment] os._exit = _soft_exit # type: ignore[assignment] try: runpy.run_path(str(self._flash_script), run_name="__main__") writer.flush() return 0 except SystemExit as exc: # flash script might call sys.exit writer.flush() return int(exc.code or 0) finally: sys.argv = argv_backup os.chdir(cwd_backup) sys.stdout = stdout_backup sys.stderr = stderr_backup os._exit = os_exit_backup writer.close() def _helper_executable(self) -> Optional[Path]: if not getattr(sys, "frozen", False): return None base_dir = Path(sys.executable).resolve().parent candidates = [ base_dir / "FlashHelper.exe", base_dir / "FlashHelper", ] for c in candidates: if c.exists(): return c return None @staticmethod def _strip_embedded_timestamp(line: str) -> str: # Example: "[2026-02-05 10:39:05.391][I] [COM29] [main]Flash Version..." if not (line.startswith("[") and "]" in line): return line # Remove up to two leading bracketed segments (timestamp, level) trimmed = line for _ in range(2): if trimmed.startswith("[") and "]" in trimmed: trimmed = trimmed.split("]", 1)[1].lstrip() # Strip optional "[COMxx]" or "[main]" prefixes that follow while trimmed.startswith("[") and "]" in trimmed: prefix = trimmed.split("]", 1)[0] if prefix.startswith("[COM") or prefix.startswith("[main"): trimmed = trimmed.split("]", 1)[1].lstrip() continue break return trimmed class _SignalWriter: """File-like writer that forwards lines to a Qt signal.""" def __init__(self, emit_line) -> None: self._emit_line = emit_line self._buffer = "" self._lock = threading.Lock() self._closed = False def write(self, data: str) -> int: # type: ignore[override] if self._closed: return len(data) with self._lock: self._buffer += data while "\n" in self._buffer: line, self._buffer = self._buffer.split("\n", 1) cleaned = line.rstrip("\r") if cleaned.strip(): try: self._emit_line(cleaned) except RuntimeError: # Signal source deleted (thread exiting); drop quietly. self._closed = True break return len(data) def flush(self) -> None: # type: ignore[override] with self._lock: if self._buffer.strip() and not self._closed: try: self._emit_line(self._buffer.rstrip("\r\n")) except RuntimeError: self._closed = True self._buffer = "" def close(self) -> None: with self._lock: self._closed = True self._buffer = ""