Major refactor of Ameba Control Panel v3.1.0:
- Three-column layout: icon sidebar, config+history, log view
- Dracula PRO theme with light/dark toggle
- DTR/RTS GPIO control (replaces ASCII commands)
- Multi-CDC firmware support for AmebaSmart control device
- Dynamic DUT tabs with +/- management
- NN Model flash image support
- Settings dialog (Font, Serial, Flash, Command tabs)
- Background port scanning, debounced session store
- Adaptive log flush rate, format cache optimization
- Smooth sidebar animation, deferred startup
- pytest test framework with session/log/settings tests
- Thread safety fixes: _alive guards, parented timers, safe baud parsing
- Find highlight: needle-only highlighting with focused match color
- Partial line buffering for table output
- PyInstaller packaging with version stamp and module exclusions

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
175 lines
6.6 KiB
Python
175 lines
6.6 KiB
Python
from __future__ import annotations
|
|
|
|
from collections import deque
|
|
from pathlib import Path
|
|
from typing import Callable, Deque, List, Optional, Tuple, TYPE_CHECKING
|
|
|
|
import logging
|
|
|
|
from PySide6.QtCore import QObject, QTimer, Slot
|
|
from PySide6.QtWidgets import QFileDialog
|
|
|
|
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
|
|
|
|
from ameba_control_panel import config
|
|
from ameba_control_panel.config import Direction
|
|
from ameba_control_panel.services.log_buffer import LogBuffer, LogLine
|
|
from ameba_control_panel.services.search_service import SearchWorker
|
|
|
|
if TYPE_CHECKING:
|
|
from ameba_control_panel.views.device_tab_view import DeviceTabView
|
|
|
|
# Upper bound on the number of "[COM...]" / "[main...]" tags that
# LogManager._strip_info_prefixes removes from a single line.
_MAX_PREFIX_STRIP_ITERATIONS = 10
# INFO lines containing this text are dropped by LogManager.enqueue_line.
_SUPPRESSED_LOG_MSG = "Flash helper completed with code 0"
|
|
|
|
|
|
class LogManager(QObject):
    """Handles log buffering, flushing to view, searching, and save.

    Lines passed to :meth:`enqueue_line` are sanitised and queued. A ``QTimer``
    parented to this object periodically moves a bounded batch from the queue
    into ``self.buffer`` and appends the non-TX lines to ``view.log_view``.
    """

    # Backlog sizes (lines still queued after a flush) above which the flush
    # timer is slowed down, and the interval applied in each case.
    _HEAVY_BACKLOG = 500
    _HEAVY_INTERVAL_MS = 200
    _MODERATE_BACKLOG = 100
    _MODERATE_INTERVAL_MS = 100

    def __init__(self, view: DeviceTabView, alive: Callable[[], bool], parent: QObject | None = None) -> None:
        """Create the manager and start the periodic flush timer.

        Args:
            view: Tab view whose ``log_view``, ``find_input`` and
                ``case_checkbox`` widgets this manager drives.
            alive: Callable returning ``False`` once the owner is gone; checked
                before lines are queued, flushed, or search results applied.
            parent: Optional Qt parent.
        """
        super().__init__(parent)
        self.view = view
        self._alive = alive
        self.buffer = LogBuffer()
        # Sanitised lines waiting to be moved into ``buffer`` by the flush timer.
        self._pending: Deque[Tuple[str, Direction]] = deque()
        self._search_worker: Optional[SearchWorker] = None
        # Row numbers of the current find results and the focused position in them.
        self._matches: List[int] = []
        self._match_index = -1

        self._flush_timer = QTimer(self)
        self._flush_timer.setInterval(config.LOG_FLUSH_INTERVAL_MS)
        self._flush_timer.timeout.connect(self._flush_pending)
        self._flush_timer.start()

    def enqueue_line(self, text: str, direction: Direction | str) -> None:
        """Sanitise *text* and queue it for the next flush.

        The line is dropped when the owner is no longer alive, when only
        whitespace remains after removing non-printable characters, or when it
        is an INFO line containing the suppressed flash-helper message.

        Args:
            text: Raw line.
            direction: A ``Direction`` member, or a value accepted by the
                ``Direction(...)`` constructor; whatever that call raises for
                an unknown value propagates to the caller.
        """
        if not self._alive():
            return
        # Fast path: skip the per-character filter when nothing needs removing.
        # NOTE: str.isprintable() is False for tabs as well, so tabs are removed.
        if text.isprintable():
            cleaned = text
        else:
            cleaned = "".join(ch for ch in text if ch.isprintable())
        if not cleaned.strip():
            return
        # Coerce before comparing so a plain-string direction receives the same
        # INFO handling as an enum member.
        kind = Direction(direction)
        if kind == Direction.INFO:
            if cleaned.startswith("[") and "]" in cleaned:
                cleaned = self._strip_info_prefixes(cleaned)
            if _SUPPRESSED_LOG_MSG in cleaned:
                return
        self._pending.append((cleaned, kind))

    @staticmethod
    def _strip_info_prefixes(text: str) -> str:
        """Return *text* with leading ``[...]`` tags removed.

        Up to two leading bracketed tags are removed regardless of content;
        after that, further tags are removed only while they start with
        ``[COM`` or ``[main``, at most ``_MAX_PREFIX_STRIP_ITERATIONS`` times.
        """

        def has_leading_tag(candidate: str) -> bool:
            return candidate.startswith("[") and "]" in candidate

        for _ in range(2):
            if not has_leading_tag(text):
                break
            text = text.partition("]")[2].lstrip()

        for _ in range(_MAX_PREFIX_STRIP_ITERATIONS):
            if not has_leading_tag(text):
                break
            tag, _sep, remainder = text.partition("]")
            if not tag.startswith(("[COM", "[main")):
                break
            text = remainder.lstrip()
        return text

    def _set_flush_interval(self, interval_ms: int) -> None:
        """Apply *interval_ms* to the flush timer only when it differs.

        Calling ``setInterval`` on a running ``QTimer`` restarts it, so an
        unchanged value is skipped.
        """
        if self._flush_timer.interval() != interval_ms:
            self._flush_timer.setInterval(interval_ms)

    def _flush_pending(self) -> None:
        """Move one batch of queued lines into the buffer and the log view.

        At most ``config.LOG_FLUSH_BATCH_LIMIT`` non-blank lines are taken per
        call. Every taken line is stored in ``self.buffer``; lines whose
        direction is ``Direction.TX`` are not sent to the view. The timer
        interval is then adapted to the remaining backlog.
        """
        if not self._alive() or not self._pending:
            # Reset to normal interval when idle
            self._set_flush_interval(config.LOG_FLUSH_INTERVAL_MS)
            return

        flushed: List[LogLine] = []
        while self._pending and len(flushed) < config.LOG_FLUSH_BATCH_LIMIT:
            text, direction = self._pending.popleft()
            if not text.strip():
                continue
            flushed.append(self.buffer.append(text, direction))

        visible = [line for line in flushed if line.direction != Direction.TX]
        if visible:
            self.view.log_view.append_lines(visible)

        # Adaptive: slow down flush when queue is heavy to reduce UI stalls
        backlog = len(self._pending)
        if backlog > self._HEAVY_BACKLOG:
            interval_ms = self._HEAVY_INTERVAL_MS
        elif backlog > self._MODERATE_BACKLOG:
            interval_ms = self._MODERATE_INTERVAL_MS
        else:
            interval_ms = config.LOG_FLUSH_INTERVAL_MS
        self._set_flush_interval(interval_ms)

    def clear(self) -> None:
        """Empty the buffer and the log view and reset the find state."""
        self.buffer.clear()
        self.view.log_view.clear_log()
        self._matches.clear()
        self._match_index = -1
        self.view.log_view.set_matches([], -1)

    def save(self) -> None:
        """Ask for a file name and write the full buffer text to it as UTF-8.

        Does nothing if the dialog is cancelled. On success an INFO line naming
        the file is queued; on ``OSError`` the failure is logged and shown in a
        warning box.
        """
        path, _ = QFileDialog.getSaveFileName(self.view, "Save Log", str(Path.home() / "ameba_log.txt"))
        if not path:
            return
        try:
            Path(path).write_text(self.buffer.as_text(full=True), encoding="utf-8")
            self.enqueue_line(f"Saved log to {path}", Direction.INFO)
        except OSError as exc:
            logger.error("Failed to save log: %s", exc)
            from PySide6.QtWidgets import QMessageBox

            QMessageBox.warning(self.view, "Save Log", f"Failed to save: {exc}")

    # Search -------------------------------------------------------------------
    def run_find(self) -> None:
        """Start a background search for the text in the find box.

        Does nothing while a previous search is still running. An empty needle
        clears the current matches instead of starting a search.
        """
        if self._search_worker and self._search_worker.isRunning():
            return
        needle = self.view.find_input.text()
        case_sensitive = self.view.case_checkbox.isChecked()
        self.view.log_view.set_needle(needle, case_sensitive)
        if not needle:
            self.view.log_view.set_matches([], -1)
            self._matches = []
            self._match_index = -1
            return
        lines = [entry.as_display() for entry in self.view.log_view.displayed_lines()]
        self._search_worker = SearchWorker(lines, needle, case_sensitive)
        self._search_worker.finished.connect(self._on_search_finished)
        self._search_worker.start()

    @Slot(list)
    def _on_search_finished(self, rows: List[int]) -> None:
        """Store the search results, focus the first match, and drop the worker.

        Ignored entirely when the owner is no longer alive.
        """
        if not self._alive():
            return
        self._matches = rows
        self._match_index = 0 if rows else -1
        self.view.log_view.set_matches(rows, self._match_index)
        self._scroll_to_match()
        self._search_worker = None

    def find_next(self) -> None:
        """Focus the next match, wrapping around at the end."""
        self._advance_match(1)

    def find_prev(self) -> None:
        """Focus the previous match, wrapping around at the start."""
        self._advance_match(-1)

    def _advance_match(self, direction: int) -> None:
        """Move the focused match by *direction* steps, wrapping around.

        With no current matches this starts a new search instead.
        """
        if not self._matches:
            self.run_find()
            return
        # ``_matches`` is non-empty here, so the modulo is always well-defined.
        self._match_index = (self._match_index + direction) % len(self._matches)
        self.view.log_view.set_matches(self._matches, self._match_index)
        self._scroll_to_match()

    def _scroll_to_match(self) -> None:
        """Place the text cursor on the focused match's block and centre it."""
        if self._match_index < 0 or not self._matches:
            return
        row = self._matches[self._match_index]
        block = self.view.log_view.document().findBlockByNumber(row)
        if not block.isValid():
            return
        cursor = self.view.log_view.textCursor()
        cursor.setPosition(block.position())
        self.view.log_view.setTextCursor(cursor)
        self.view.log_view.centerCursor()

    def shutdown(self) -> None:
        """Stop the flush timer and wait up to one second for a running search."""
        self._flush_timer.stop()
        if self._search_worker and self._search_worker.isRunning():
            self._search_worker.wait(1000)
|