from __future__ import annotations import sys from collections import deque from pathlib import Path from typing import Callable, Deque, List, Optional, Tuple, TYPE_CHECKING import logging from PySide6.QtCore import QObject, QTimer, Slot from PySide6.QtWidgets import QFileDialog logger = logging.getLogger(__name__) from ameba_control_panel import config from ameba_control_panel.config import Direction from ameba_control_panel.services.log_buffer import LogBuffer, LogLine from ameba_control_panel.services.search_service import SearchWorker if TYPE_CHECKING: from ameba_control_panel.views.device_tab_view import DeviceTabView _MAX_PREFIX_STRIP_ITERATIONS = 10 _SUPPRESSED_LOG_MSG = "Flash helper completed with code 0" class LogManager(QObject): """Handles log buffering, flushing to view, searching, and save.""" def __init__(self, view: DeviceTabView, alive: Callable[[], bool], parent: QObject | None = None) -> None: super().__init__(parent) self.view = view self._alive = alive self.buffer = LogBuffer() self._pending: Deque[Tuple[str, Direction]] = deque() self._search_worker: Optional[SearchWorker] = None self._matches: List[int] = [] self._match_index = -1 self._flush_timer = QTimer(self) self._flush_timer.setInterval(config.LOG_FLUSH_INTERVAL_MS) self._flush_timer.timeout.connect(self._flush_pending) self._flush_timer.start() def enqueue_line(self, text: str, direction: Direction | str) -> None: if not self._alive(): return cleaned = "".join(ch for ch in text if ch.isprintable()) if not cleaned.strip(): return if direction == Direction.INFO and cleaned.startswith("[") and "]" in cleaned: cleaned = self._strip_info_prefixes(cleaned) if direction == Direction.INFO and _SUPPRESSED_LOG_MSG in cleaned: return self._pending.append((cleaned, Direction(direction))) @staticmethod def _strip_info_prefixes(text: str) -> str: for _ in range(2): if text.startswith("[") and "]" in text: text = text.split("]", 1)[1].lstrip() for _ in range(_MAX_PREFIX_STRIP_ITERATIONS): if not (text.startswith("[") and "]" in text): break prefix = text.split("]", 1)[0] if prefix.startswith("[COM") or prefix.startswith("[/dev/") or prefix.startswith("[main"): text = text.split("]", 1)[1].lstrip() else: break return text def _flush_pending(self) -> None: if not self._alive() or not self._pending: # Reset to normal interval when idle if self._flush_timer.interval() != config.LOG_FLUSH_INTERVAL_MS: self._flush_timer.setInterval(config.LOG_FLUSH_INTERVAL_MS) return to_flush: List[LogLine] = [] count = 0 while self._pending and count < config.LOG_FLUSH_BATCH_LIMIT: text, direction = self._pending.popleft() if not text.strip(): continue line = self.buffer.append(text, direction) to_flush.append(line) count += 1 visible = [line for line in to_flush if line.direction != Direction.TX] if visible: self.view.log_view.append_lines(visible) # Adaptive: slow down flush when queue is heavy to reduce UI stalls pending_count = len(self._pending) if pending_count > 500: self._flush_timer.setInterval(200) elif pending_count > 100: self._flush_timer.setInterval(100) else: self._flush_timer.setInterval(config.LOG_FLUSH_INTERVAL_MS) def clear(self) -> None: self.buffer.clear() self.view.log_view.clear_log() self._matches.clear() self._match_index = -1 self.view.log_view.set_matches([], -1) def save(self) -> None: dlg = QFileDialog(self.view, "Save Log", str(Path.home() / "ameba_log.txt")) dlg.setAcceptMode(QFileDialog.AcceptMode.AcceptSave) if sys.platform == "linux": dlg.setOption(QFileDialog.Option.DontUseNativeDialog, True) if dlg.exec() != QFileDialog.DialogCode.Accepted or not dlg.selectedFiles(): return path = dlg.selectedFiles()[0] if not path: return try: Path(path).write_text(self.buffer.as_text(full=True), encoding="utf-8") self.enqueue_line(f"Saved log to {path}", Direction.INFO) except OSError as exc: logger.error("Failed to save log: %s", exc) from PySide6.QtWidgets import QMessageBox QMessageBox.warning(self.view, "Save Log", f"Failed to save: {exc}") # Search ------------------------------------------------------------------- def run_find(self) -> None: if self._search_worker and self._search_worker.isRunning(): return needle = self.view.find_input.text() self.view.log_view.set_needle(needle, self.view.case_checkbox.isChecked()) if not needle: self.view.log_view.set_matches([], -1) self._matches = [] self._match_index = -1 return lines = [l.as_display() for l in self.view.log_view.displayed_lines()] self._search_worker = SearchWorker(lines, needle, self.view.case_checkbox.isChecked()) self._search_worker.finished.connect(self._on_search_finished) self._search_worker.start() @Slot(list) def _on_search_finished(self, rows: List[int]) -> None: if not self._alive(): return self._matches = rows self._match_index = 0 if rows else -1 self.view.log_view.set_matches(rows, self._match_index) self._scroll_to_match() self._search_worker = None def find_next(self) -> None: self._advance_match(1) def find_prev(self) -> None: self._advance_match(-1) def _advance_match(self, direction: int) -> None: if not self._matches: self.run_find() return n = len(self._matches) self._match_index = (self._match_index + direction) % n if n else 0 self.view.log_view.set_matches(self._matches, self._match_index) self._scroll_to_match() def _scroll_to_match(self) -> None: if self._match_index < 0 or not self._matches: return row = self._matches[self._match_index] doc = self.view.log_view.document() block = doc.findBlockByNumber(row) if not block.isValid(): return cursor = self.view.log_view.textCursor() cursor.setPosition(block.position()) self.view.log_view.setTextCursor(cursor) self.view.log_view.centerCursor() def shutdown(self) -> None: self._flush_timer.stop() if self._search_worker and self._search_worker.isRunning(): self._search_worker.wait(1000)