diff --git a/tui.py b/tui.py index 336bad7..d9311b5 100644 --- a/tui.py +++ b/tui.py @@ -137,51 +137,83 @@ def snapshot_now(dataset: str, keep: int = 7) -> tuple[bool, str]: # ----------------------- Cron helpers ----------------------- +# All TUI-managed cron entries live in /etc/cron.d/pve-tui (single owner file). +# Root's crontab is never modified by this app, except by the one-shot +# migrate_legacy_crontab() which moves any pre-existing pve-tui entries into +# the new file. -CRON_MARKER_BEGIN = "# === PVE-TUI AUTO SNAPSHOT BEGIN ===" -CRON_MARKER_END = "# === PVE-TUI AUTO SNAPSHOT END ===" +CRON_FILE = Path("/etc/cron.d/pve-tui") +SNAPSHOT_SCRIPT = Path("/usr/local/bin/pve-tui-snapshot.sh") +CRON_HEADER = """# Auto-generated by pve-tui — do not edit manually. +# Manage entries via the TUI Schedules / Scrub tabs. +SHELL=/bin/bash +PATH=/usr/sbin:/usr/bin:/sbin:/bin +MAILTO=\"\" +""" -def read_crontab() -> str: - rc, out, _ = run(["crontab", "-l"]) - return out if rc == 0 else "" - - -def write_crontab(content: str) -> bool: +def _read_cron_lines() -> list[str]: + if not CRON_FILE.exists(): + return [] try: - p = subprocess.Popen(["crontab", "-"], stdin=subprocess.PIPE, - text=True) - p.communicate(content) - return p.returncode == 0 + return CRON_FILE.read_text().splitlines() + except Exception: + return [] + + +def _parse_cron_file() -> tuple[list[dict], list[dict]]: + """Parse CRON_FILE → (snapshot_entries, scrub_entries).""" + snapshots: list[dict] = [] + scrubs: list[dict] = [] + snap_re = re.compile( + r"^(\S+\s+\S+\s+\S+\s+\S+\s+\S+)\s+\S+\s+/usr/local/bin/pve-tui-snapshot\.sh\s+(\S+)\s+(\d+)" + ) + scrub_re = re.compile( + r"^(\S+\s+\S+\s+\S+\s+\S+\s+\S+)\s+\S+\s+/usr/sbin/zpool\s+scrub\s+(\S+)\s*$" + ) + for raw in _read_cron_lines(): + line = raw.strip() + if not line or line.startswith("#"): + continue + first = line.split()[0] + if "=" in first: + continue + m = snap_re.match(line) + if m: + snapshots.append( + {"cron": m.group(1), "dataset": m.group(2), "keep": m.group(3)} + ) + continue + m = scrub_re.match(line) + if m: + scrubs.append({"cron": m.group(1), "pool": m.group(2)}) + return snapshots, scrubs + + +def _write_cron_file(snapshots: list[dict], scrubs: list[dict]) -> bool: + parts = [CRON_HEADER.rstrip(), ""] + if snapshots: + parts.append("# Snapshot schedules (auto-prune keeps last N)") + for e in snapshots: + parts.append( + f"{e['cron']} root {SNAPSHOT_SCRIPT} {e['dataset']} {e['keep']}" + ) + parts.append("") + if scrubs: + parts.append("# Scrub schedules") + for e in scrubs: + parts.append(f"{e['cron']} root /usr/sbin/zpool scrub {e['pool']}") + parts.append("") + try: + CRON_FILE.parent.mkdir(parents=True, exist_ok=True) + CRON_FILE.write_text("\n".join(parts) + "\n") + CRON_FILE.chmod(0o644) + return True except Exception: return False -def get_schedules() -> list[dict]: - """Parse our managed cron entries.""" - cron = read_crontab() - schedules = [] - in_block = False - for line in cron.splitlines(): - if CRON_MARKER_BEGIN in line: - in_block = True - continue - if CRON_MARKER_END in line: - in_block = False - continue - if in_block and line.strip() and not line.startswith("#"): - # Format: /usr/local/bin/pve-tui-snapshot.sh - m = re.match(r"^(\S+\s+\S+\s+\S+\s+\S+\s+\S+)\s+(.*snapshot\.sh)\s+(\S+)\s+(\d+)", line) - if m: - schedules.append({ - "cron": m.group(1), - "dataset": m.group(3), - "keep": m.group(4), - }) - return schedules - - -def install_snapshot_script(): +def install_snapshot_script() -> None: script = """#!/bin/bash # Auto-generated by pve-tui. Creates snapshot and prunes old ones. DATASET="$1" @@ -196,7 +228,6 @@ fi TS=$(date +%Y%m%d-%H%M%S) /usr/sbin/zfs snapshot "${DATASET}@${PREFIX}-${TS}" -# Prune: keep only last N auto- snapshots SNAPS=$(/usr/sbin/zfs list -H -t snapshot -o name -s creation "$DATASET" | grep "@${PREFIX}-") COUNT=$(echo "$SNAPS" | wc -l) if [ "$COUNT" -gt "$KEEP" ]; then @@ -206,79 +237,83 @@ if [ "$COUNT" -gt "$KEEP" ]; then done fi """ - path = Path("/usr/local/bin/pve-tui-snapshot.sh") - path.write_text(script) - path.chmod(0o755) + SNAPSHOT_SCRIPT.write_text(script) + SNAPSHOT_SCRIPT.chmod(0o755) + + +def get_schedules() -> list[dict]: + return _parse_cron_file()[0] def set_schedule(dataset: str, cron_expr: str, keep: int) -> bool: - """Upsert a schedule in crontab.""" install_snapshot_script() - cron = read_crontab() - # Remove existing entry for this dataset in our block - lines = cron.splitlines() - new_lines = [] - in_block = False - block_lines = [] - block_start_idx = -1 - - for i, line in enumerate(lines): - if CRON_MARKER_BEGIN in line: - in_block = True - block_start_idx = len(new_lines) - new_lines.append(line) - continue - if CRON_MARKER_END in line: - # flush block_lines, filtered - for bl in block_lines: - if f" {dataset} " not in bl + " ": - new_lines.append(bl) - new_lines.append(line) - block_lines = [] - in_block = False - continue - if in_block: - block_lines.append(line) - else: - new_lines.append(line) - - # If block didn't exist, add it - if block_start_idx == -1: - new_lines.append(CRON_MARKER_BEGIN) - new_lines.append(CRON_MARKER_END) - - # Insert new entry before END marker - final = [] - inserted = False - for line in new_lines: - if CRON_MARKER_END in line and not inserted: - final.append( - f"{cron_expr} /usr/local/bin/pve-tui-snapshot.sh {dataset} {keep}" - ) - inserted = True - final.append(line) - - return write_crontab("\n".join(final) + "\n") + snaps, scrubs = _parse_cron_file() + snaps = [s for s in snaps if s["dataset"] != dataset] + snaps.append({"cron": cron_expr, "dataset": dataset, "keep": str(keep)}) + return _write_cron_file(snaps, scrubs) def remove_schedule(dataset: str) -> bool: - cron = read_crontab() - lines = cron.splitlines() - new_lines = [] - in_block = False - for line in lines: - if CRON_MARKER_BEGIN in line: - in_block = True - new_lines.append(line) + snaps, scrubs = _parse_cron_file() + snaps = [s for s in snaps if s["dataset"] != dataset] + return _write_cron_file(snaps, scrubs) + + +def migrate_legacy_crontab() -> bool: + """One-time: extract pve-tui blocks from root's crontab into CRON_FILE + and strip them from the crontab. Returns True if a migration ran.""" + if CRON_FILE.exists(): + return False + rc, out, _ = run(["crontab", "-l"]) + if rc != 0 or "PVE-TUI" not in out: + return False + + snaps: list[dict] = [] + scrubs: list[dict] = [] + new_crontab: list[str] = [] + in_snap = False + in_scrub = False + snap_re = re.compile( + r"^(\S+\s+\S+\s+\S+\s+\S+\s+\S+)\s+.*pve-tui-snapshot\.sh\s+(\S+)\s+(\d+)" + ) + scrub_re = re.compile( + r"^(\S+\s+\S+\s+\S+\s+\S+\s+\S+)\s+/usr/sbin/zpool\s+scrub\s+(\S+)" + ) + for line in out.splitlines(): + if "PVE-TUI AUTO SNAPSHOT BEGIN" in line: + in_snap = True continue - if CRON_MARKER_END in line: - in_block = False - new_lines.append(line) + if "PVE-TUI AUTO SNAPSHOT END" in line: + in_snap = False continue - if in_block and f" {dataset} " in line + " ": + if "PVE-TUI AUTO SCRUB BEGIN" in line: + in_scrub = True continue - new_lines.append(line) - return write_crontab("\n".join(new_lines) + "\n") + if "PVE-TUI AUTO SCRUB END" in line: + in_scrub = False + continue + if in_snap: + m = snap_re.match(line) + if m: + snaps.append( + {"cron": m.group(1), "dataset": m.group(2), "keep": m.group(3)} + ) + continue + if in_scrub: + m = scrub_re.match(line) + if m: + scrubs.append({"cron": m.group(1), "pool": m.group(2)}) + continue + new_crontab.append(line) + + install_snapshot_script() + _write_cron_file(snaps, scrubs) + try: + p = subprocess.Popen(["crontab", "-"], stdin=subprocess.PIPE, text=True) + p.communicate("\n".join(new_crontab) + "\n") + except Exception: + pass + return True # ----------------------- LXC helpers ----------------------- @@ -485,10 +520,6 @@ fi # ----------------------- Scrub helpers ----------------------- -CRON_SCRUB_BEGIN = "# === PVE-TUI AUTO SCRUB BEGIN ===" -CRON_SCRUB_END = "# === PVE-TUI AUTO SCRUB END ===" - - def scrub_status(pool: str) -> dict: rc, out, _ = run(["zpool", "status", pool]) info = {"pool": pool, "state": "?", "scan": "", "errors": ""} @@ -516,83 +547,21 @@ def scrub_stop(pool: str) -> tuple[bool, str]: def get_scrub_schedules() -> dict[str, str]: - """Return pool -> cron expression for managed scrub entries.""" - cron = read_crontab() - out: dict[str, str] = {} - in_block = False - for line in cron.splitlines(): - if CRON_SCRUB_BEGIN in line: - in_block = True - continue - if CRON_SCRUB_END in line: - in_block = False - continue - if in_block and line.strip() and not line.startswith("#"): - m = re.match( - r"^(\S+\s+\S+\s+\S+\s+\S+\s+\S+)\s+/usr/sbin/zpool\s+scrub\s+(\S+)\s*$", - line, - ) - if m: - out[m.group(2)] = m.group(1) - return out + """pool -> cron expression for TUI-managed scrub entries.""" + return {s["pool"]: s["cron"] for s in _parse_cron_file()[1]} def set_scrub_schedule(pool: str, cron_expr: str) -> bool: - cron = read_crontab() - lines = cron.splitlines() - new_lines: list[str] = [] - in_block = False - block_lines: list[str] = [] - block_seen = False - for line in lines: - if CRON_SCRUB_BEGIN in line: - in_block = True - block_seen = True - new_lines.append(line) - continue - if CRON_SCRUB_END in line: - for bl in block_lines: - if bl.strip().endswith(f"zpool scrub {pool}"): - continue - new_lines.append(bl) - new_lines.append(line) - block_lines = [] - in_block = False - continue - if in_block: - block_lines.append(line) - else: - new_lines.append(line) - if not block_seen: - new_lines.append(CRON_SCRUB_BEGIN) - new_lines.append(CRON_SCRUB_END) - final: list[str] = [] - inserted = False - for line in new_lines: - if CRON_SCRUB_END in line and not inserted: - final.append(f"{cron_expr} /usr/sbin/zpool scrub {pool}") - inserted = True - final.append(line) - return write_crontab("\n".join(final) + "\n") + snaps, scrubs = _parse_cron_file() + scrubs = [s for s in scrubs if s["pool"] != pool] + scrubs.append({"cron": cron_expr, "pool": pool}) + return _write_cron_file(snaps, scrubs) def remove_scrub_schedule(pool: str) -> bool: - cron = read_crontab() - new: list[str] = [] - in_block = False - for line in cron.splitlines(): - if CRON_SCRUB_BEGIN in line: - in_block = True - new.append(line) - continue - if CRON_SCRUB_END in line: - in_block = False - new.append(line) - continue - if in_block and line.strip().endswith(f"zpool scrub {pool}"): - continue - new.append(line) - return write_crontab("\n".join(new) + "\n") + snaps, scrubs = _parse_cron_file() + scrubs = [s for s in scrubs if s["pool"] != pool] + return _write_cron_file(snaps, scrubs) # ----------------------- Modals ----------------------- @@ -930,6 +899,9 @@ class PVETui(App): yield Footer() def on_mount(self) -> None: + if migrate_legacy_crontab(): + self.set_status("Migrated legacy crontab entries → /etc/cron.d/pve-tui") + # Snapshot table t = self.query_one("#snap-table", DataTable) t.add_columns("Snapshot", "Used", "Refer", "Created")