commit 946caeff90885adc22347d4c2c72507807f86c66 Author: root Date: Mon Apr 27 22:56:14 2026 +0800 Initial commit: PVE Storage Manager TUI diff --git a/README.md b/README.md new file mode 100644 index 0000000..c7793bf --- /dev/null +++ b/README.md @@ -0,0 +1,85 @@ +# PVE Storage Manager TUI + +Python Textual-based TUI for managing ZFS snapshots and LXC mountpoints on Proxmox VE. + +## Features + +- **Snapshots tab** — List, create, delete, rollback ZFS snapshots across all pools +- **Schedules tab** — Set up auto-snapshot cron jobs per dataset with auto-pruning (keep last N) +- **Datasets tab** — View all ZFS datasets with usage info +- **LXC Mounts tab** — View, add, and remove mountpoints for LXC containers + +## Installation + +Run on **Proxmox host as root**. + +### 1. Install dependencies + +```bash +apt update +apt install -y python3-pip +pip3 install textual --break-system-packages +``` + +### 2. Copy the script + +```bash +# Copy pve_tui.py to the Proxmox host, then: +chmod +x /root/pve_tui.py +``` + +### 3. Run + +```bash +python3 /root/pve_tui.py +``` + +### Optional: make it a command + +```bash +cp /root/pve_tui.py /usr/local/bin/pve-tui +chmod +x /usr/local/bin/pve-tui +``` + +Then just run `pve-tui` from anywhere. + +## Keybindings + +| Key | Action | +|-----|--------| +| `q` | Quit | +| `r` | Refresh current view | +| `n` | New snapshot | +| `d` | Delete selected snapshot | +| `s` | Add schedule | +| `Tab` / arrow keys | Navigate tabs | +| `↑` / `↓` | Move selection in table | + +## How Scheduling Works + +When you create a schedule, the tool: + +1. Installs a helper script at `/usr/local/bin/pve-tui-snapshot.sh` +2. Adds a cron entry in root's crontab between markers: + ``` + # === PVE-TUI AUTO SNAPSHOT BEGIN === + 0 3 * * * /usr/local/bin/pve-tui-snapshot.sh RAID1_1TB 7 + # === PVE-TUI AUTO SNAPSHOT END === + ``` +3. On each run, creates `DATASET@auto-YYYYMMDD-HHMMSS` and prunes + oldest `auto-*` snapshots beyond the "keep last N" limit. + +Manual snapshots (any name not starting with `auto-`) are **never** auto-pruned. + +## Safety Notes + +- **Rollback** destroys all newer snapshots. Always double-check before confirming. +- **LXC mount changes** may require the container to be stopped (Proxmox will warn). +- Crontab is only modified inside the marker block — your other cron entries are untouched. +- The script uses `zfs`, `pct`, and `crontab` — no third-party ZFS tools required. + +## Troubleshooting + +- **"No datasets found"** → check `zfs list` works as root +- **Cron not running** → check `systemctl status cron` and `/var/log/syslog` +- **Textual rendering issues** → use a modern terminal (Alacritty, iTerm2, Windows Terminal) diff --git a/session b/session new file mode 100644 index 0000000..83ea14a --- /dev/null +++ b/session @@ -0,0 +1 @@ +claude --resume 67e13bd7-317d-4f76-a3cd-524f2eab26d8 diff --git a/tui.py b/tui.py new file mode 100644 index 0000000..336bad7 --- /dev/null +++ b/tui.py @@ -0,0 +1,1374 @@ +#!/usr/bin/env python3 +""" +PVE Storage Manager TUI +Manages ZFS snapshots, scheduling, and LXC container mountpoints +on RAID1_1TB and Samsung_860_EVO_256GB pools. + +Run on Proxmox host as root. +""" + +import subprocess +import re +import json +import shlex +from datetime import datetime +from pathlib import Path + +from textual import work +from textual.app import App, ComposeResult +from textual.containers import Container, Horizontal, Vertical +from textual.screen import ModalScreen +from textual.widgets import ( + Header, + Footer, + DataTable, + Button, + Static, + Input, + Label, + Select, + TabbedContent, + TabPane, +) +from textual.binding import Binding + + +# ----------------------- ZFS helpers ----------------------- + +def run(cmd: list[str]) -> tuple[int, str, str]: + """Run a shell command and return (rc, stdout, stderr).""" + try: + r = subprocess.run(cmd, capture_output=True, text=True, timeout=30) + return r.returncode, r.stdout.strip(), r.stderr.strip() + except Exception as e: + return 1, "", str(e) + + +def list_pools() -> list[str]: + rc, out, _ = run(["zpool", "list", "-H", "-o", "name"]) + if rc != 0: + return [] + return out.splitlines() + + +def list_datasets(pool: str | None = None) -> list[dict]: + args = ["zfs", "list", "-H", "-o", "name,used,avail,mountpoint"] + if pool: + args += ["-r", pool] + rc, out, _ = run(args) + if rc != 0: + return [] + rows = [] + for line in out.splitlines(): + parts = line.split("\t") + if len(parts) >= 4: + rows.append({ + "name": parts[0], + "used": parts[1], + "avail": parts[2], + "mountpoint": parts[3], + }) + return rows + + +def list_snapshots(pool: str | None = None) -> list[dict]: + args = ["zfs", "list", "-t", "snapshot", "-H", "-o", + "name,used,refer,creation"] + if pool: + args += ["-r", pool] + rc, out, _ = run(args) + if rc != 0: + return [] + rows = [] + for line in out.splitlines(): + parts = line.split("\t") + if len(parts) >= 4: + rows.append({ + "name": parts[0], + "used": parts[1], + "refer": parts[2], + "creation": parts[3], + }) + return rows + + +def create_snapshot(dataset: str, name: str) -> tuple[bool, str]: + full = f"{dataset}@{name}" + rc, out, err = run(["zfs", "snapshot", full]) + return rc == 0, err or out or f"Created {full}" + + +def destroy_snapshot(snap: str) -> tuple[bool, str]: + rc, out, err = run(["zfs", "destroy", snap]) + return rc == 0, err or out or f"Destroyed {snap}" + + +def rollback_snapshot(snap: str) -> tuple[bool, str]: + rc, out, err = run(["zfs", "rollback", "-r", snap]) + return rc == 0, err or out or f"Rolled back to {snap}" + + +def snapshot_now(dataset: str, keep: int = 7) -> tuple[bool, str]: + """Create an auto-* snapshot immediately and prune older ones beyond keep.""" + ts = datetime.now().strftime("%Y%m%d-%H%M%S") + full = f"{dataset}@auto-{ts}" + rc, out, err = run(["zfs", "snapshot", full]) + if rc != 0: + return False, err or out or f"snapshot failed for {full}" + + rc, out, _ = run([ + "zfs", "list", "-H", "-t", "snapshot", "-o", "name", + "-s", "creation", "-r", dataset, + ]) + pruned = 0 + if rc == 0: + auto_snaps = [ + s for s in out.splitlines() + if s.startswith(f"{dataset}@auto-") + ] + excess = len(auto_snaps) - keep + if excess > 0: + for old in auto_snaps[:excess]: + drc, _, _ = run(["zfs", "destroy", old]) + if drc == 0: + pruned += 1 + suffix = f" (pruned {pruned})" if pruned else "" + return True, f"Created {full}{suffix}" + + +# ----------------------- Cron helpers ----------------------- + +CRON_MARKER_BEGIN = "# === PVE-TUI AUTO SNAPSHOT BEGIN ===" +CRON_MARKER_END = "# === PVE-TUI AUTO SNAPSHOT END ===" + + +def read_crontab() -> str: + rc, out, _ = run(["crontab", "-l"]) + return out if rc == 0 else "" + + +def write_crontab(content: str) -> bool: + try: + p = subprocess.Popen(["crontab", "-"], stdin=subprocess.PIPE, + text=True) + p.communicate(content) + return p.returncode == 0 + except Exception: + return False + + +def get_schedules() -> list[dict]: + """Parse our managed cron entries.""" + cron = read_crontab() + schedules = [] + in_block = False + for line in cron.splitlines(): + if CRON_MARKER_BEGIN in line: + in_block = True + continue + if CRON_MARKER_END in line: + in_block = False + continue + if in_block and line.strip() and not line.startswith("#"): + # Format: /usr/local/bin/pve-tui-snapshot.sh + m = re.match(r"^(\S+\s+\S+\s+\S+\s+\S+\s+\S+)\s+(.*snapshot\.sh)\s+(\S+)\s+(\d+)", line) + if m: + schedules.append({ + "cron": m.group(1), + "dataset": m.group(3), + "keep": m.group(4), + }) + return schedules + + +def install_snapshot_script(): + script = """#!/bin/bash +# Auto-generated by pve-tui. Creates snapshot and prunes old ones. +DATASET="$1" +KEEP="${2:-7}" +PREFIX="auto" + +if [ -z "$DATASET" ]; then + echo "Usage: $0 [keep_count]" + exit 1 +fi + +TS=$(date +%Y%m%d-%H%M%S) +/usr/sbin/zfs snapshot "${DATASET}@${PREFIX}-${TS}" + +# Prune: keep only last N auto- snapshots +SNAPS=$(/usr/sbin/zfs list -H -t snapshot -o name -s creation "$DATASET" | grep "@${PREFIX}-") +COUNT=$(echo "$SNAPS" | wc -l) +if [ "$COUNT" -gt "$KEEP" ]; then + DEL=$((COUNT - KEEP)) + echo "$SNAPS" | head -n "$DEL" | while read S; do + /usr/sbin/zfs destroy "$S" + done +fi +""" + path = Path("/usr/local/bin/pve-tui-snapshot.sh") + path.write_text(script) + path.chmod(0o755) + + +def set_schedule(dataset: str, cron_expr: str, keep: int) -> bool: + """Upsert a schedule in crontab.""" + install_snapshot_script() + cron = read_crontab() + # Remove existing entry for this dataset in our block + lines = cron.splitlines() + new_lines = [] + in_block = False + block_lines = [] + block_start_idx = -1 + + for i, line in enumerate(lines): + if CRON_MARKER_BEGIN in line: + in_block = True + block_start_idx = len(new_lines) + new_lines.append(line) + continue + if CRON_MARKER_END in line: + # flush block_lines, filtered + for bl in block_lines: + if f" {dataset} " not in bl + " ": + new_lines.append(bl) + new_lines.append(line) + block_lines = [] + in_block = False + continue + if in_block: + block_lines.append(line) + else: + new_lines.append(line) + + # If block didn't exist, add it + if block_start_idx == -1: + new_lines.append(CRON_MARKER_BEGIN) + new_lines.append(CRON_MARKER_END) + + # Insert new entry before END marker + final = [] + inserted = False + for line in new_lines: + if CRON_MARKER_END in line and not inserted: + final.append( + f"{cron_expr} /usr/local/bin/pve-tui-snapshot.sh {dataset} {keep}" + ) + inserted = True + final.append(line) + + return write_crontab("\n".join(final) + "\n") + + +def remove_schedule(dataset: str) -> bool: + cron = read_crontab() + lines = cron.splitlines() + new_lines = [] + in_block = False + for line in lines: + if CRON_MARKER_BEGIN in line: + in_block = True + new_lines.append(line) + continue + if CRON_MARKER_END in line: + in_block = False + new_lines.append(line) + continue + if in_block and f" {dataset} " in line + " ": + continue + new_lines.append(line) + return write_crontab("\n".join(new_lines) + "\n") + + +# ----------------------- LXC helpers ----------------------- + +def list_lxc() -> list[dict]: + rc, out, _ = run(["pct", "list"]) + if rc != 0: + return [] + rows = [] + for line in out.splitlines()[1:]: # skip header + parts = line.split(maxsplit=3) + if len(parts) >= 3: + rows.append({ + "id": parts[0], + "status": parts[1], + "name": parts[-1] if len(parts) > 2 else "", + }) + return rows + + +def get_lxc_mountpoints(ctid: str) -> list[dict]: + rc, out, _ = run(["pct", "config", ctid]) + if rc != 0: + return [] + mps = [] + for line in out.splitlines(): + m = re.match(r"^(mp\d+):\s*(.+)$", line) + if m: + key = m.group(1) + val = m.group(2) + # val like: /host/path,mp=/ct/path,size=8G + parts = val.split(",") + host = parts[0] + target = "" + for p in parts[1:]: + if p.startswith("mp="): + target = p[3:] + mps.append({ + "key": key, + "host": host, + "target": target, + "raw": val, + }) + return mps + + +def add_lxc_mount(ctid: str, host_path: str, ct_path: str) -> tuple[bool, str]: + # Find next free mp slot + existing = get_lxc_mountpoints(ctid) + used = {int(m["key"][2:]) for m in existing} + slot = 0 + while slot in used: + slot += 1 + rc, out, err = run([ + "pct", "set", ctid, f"-mp{slot}", + f"{host_path},mp={ct_path}" + ]) + return rc == 0, err or out or f"Added mp{slot}" + + +def remove_lxc_mount(ctid: str, key: str) -> tuple[bool, str]: + rc, out, err = run(["pct", "set", ctid, f"-delete", key]) + return rc == 0, err or out or f"Removed {key}" + + +# ----------------------- Replication helpers ----------------------- + +REPLICATION_CONFIG = Path("/etc/pve-tui/replication.json") +REPLICATION_SCRIPT = Path("/usr/local/bin/pve-tui-replicate.sh") + + +def load_replication_targets() -> list[dict]: + if not REPLICATION_CONFIG.exists(): + return [] + try: + return json.loads(REPLICATION_CONFIG.read_text()) + except Exception: + return [] + + +def save_replication_targets(targets: list[dict]) -> bool: + try: + REPLICATION_CONFIG.parent.mkdir(parents=True, exist_ok=True) + REPLICATION_CONFIG.write_text(json.dumps(targets, indent=2)) + return True + except Exception: + return False + + +def list_dataset_snapshots(dataset: str) -> list[str]: + """Snapshot short-names (no `dataset@` prefix) for an exact dataset, oldest first.""" + rc, out, _ = run([ + "zfs", "list", "-H", "-t", "snapshot", "-o", "name", + "-s", "creation", dataset, + ]) + if rc != 0: + return [] + names = [] + for line in out.splitlines(): + if "@" in line: + ds, snap = line.split("@", 1) + if ds == dataset: + names.append(snap) + return names + + +def latest_snapshot(dataset: str) -> str | None: + snaps = list_dataset_snapshots(dataset) + return snaps[-1] if snaps else None + + +def latest_common_snapshot(source: str, target: str) -> str | None: + src = list_dataset_snapshots(source) + tgt = set(list_dataset_snapshots(target)) + for s in reversed(src): + if s in tgt: + return s + return None + + +def dataset_exists(dataset: str) -> bool: + rc, _, _ = run(["zfs", "list", "-H", dataset]) + return rc == 0 + + +def replicate_dataset(source: str, target: str) -> tuple[bool, str]: + """Full or incremental zfs send|recv, automatically picking the right mode.""" + src_snaps = list_dataset_snapshots(source) + if not src_snaps: + return False, f"No snapshots on {source}; create one first." + latest_src = src_snaps[-1] + + if dataset_exists(target): + common = latest_common_snapshot(source, target) + if not common: + return False, ( + f"{target} exists but shares no snapshot with {source}. " + f"Destroy {target} for a fresh full send, or align bases manually." + ) + if common == latest_src: + return True, f"Up-to-date: {source}@{common}" + cmd = ( + f"zfs send -R -I {shlex.quote(source + '@' + common)} " + f"{shlex.quote(source + '@' + latest_src)} | " + f"zfs receive -F {shlex.quote(target)}" + ) + mode = f"incremental {common} → {latest_src}" + else: + cmd = ( + f"zfs send -R {shlex.quote(source + '@' + latest_src)} | " + f"zfs receive {shlex.quote(target)}" + ) + mode = f"full → {latest_src}" + + proc = subprocess.run(["bash", "-c", cmd], capture_output=True, text=True) + if proc.returncode != 0: + return False, (proc.stderr.strip() or proc.stdout.strip() + or "replication failed").splitlines()[-1] + return True, f"Replicated {source} → {target} ({mode})" + + +def install_replication_script() -> None: + """Standalone replication helper for cron / manual use.""" + script = """#!/bin/bash +# Auto-generated by pve-tui. Incrementally replicates ZFS dataset. +# Usage: pve-tui-replicate.sh +set -e +SOURCE="$1" +TARGET="$2" +if [ -z "$SOURCE" ] || [ -z "$TARGET" ]; then + echo "Usage: $0 " + exit 1 +fi + +LATEST=$(/usr/sbin/zfs list -H -t snapshot -o name -s creation "$SOURCE" | tail -n1 | awk -F@ '{print $2}') +if [ -z "$LATEST" ]; then + echo "No snapshots on $SOURCE"; exit 1 +fi + +if /usr/sbin/zfs list -H "$TARGET" >/dev/null 2>&1; then + # Find latest snapshot name that exists on both source and target. + COMMON="" + for s in $(/usr/sbin/zfs list -H -t snapshot -o name -s creation "$SOURCE" | awk -F@ '{print $2}' | tac); do + if /usr/sbin/zfs list -H -t snapshot "${TARGET}@${s}" >/dev/null 2>&1; then + COMMON="$s"; break + fi + done + if [ -z "$COMMON" ]; then + echo "No common snapshot between $SOURCE and $TARGET; aborting." >&2; exit 2 + fi + if [ "$COMMON" = "$LATEST" ]; then + echo "Up-to-date: $SOURCE@$COMMON"; exit 0 + fi + /usr/sbin/zfs send -R -I "${SOURCE}@${COMMON}" "${SOURCE}@${LATEST}" | /usr/sbin/zfs receive -F "$TARGET" + echo "Incremental ${COMMON} → ${LATEST}" +else + /usr/sbin/zfs send -R "${SOURCE}@${LATEST}" | /usr/sbin/zfs receive "$TARGET" + echo "Full → ${LATEST}" +fi +""" + REPLICATION_SCRIPT.write_text(script) + REPLICATION_SCRIPT.chmod(0o755) + + +# ----------------------- Scrub helpers ----------------------- + +CRON_SCRUB_BEGIN = "# === PVE-TUI AUTO SCRUB BEGIN ===" +CRON_SCRUB_END = "# === PVE-TUI AUTO SCRUB END ===" + + +def scrub_status(pool: str) -> dict: + rc, out, _ = run(["zpool", "status", pool]) + info = {"pool": pool, "state": "?", "scan": "", "errors": ""} + if rc != 0: + return info + for line in out.splitlines(): + s = line.strip() + if s.startswith("state:"): + info["state"] = s.split(":", 1)[1].strip() + elif s.startswith("scan:"): + info["scan"] = s.split(":", 1)[1].strip() + elif s.startswith("errors:"): + info["errors"] = s.split(":", 1)[1].strip() + return info + + +def scrub_start(pool: str) -> tuple[bool, str]: + rc, out, err = run(["zpool", "scrub", pool]) + return rc == 0, err or out or f"Started scrub on {pool}" + + +def scrub_stop(pool: str) -> tuple[bool, str]: + rc, out, err = run(["zpool", "scrub", "-s", pool]) + return rc == 0, err or out or f"Stopped scrub on {pool}" + + +def get_scrub_schedules() -> dict[str, str]: + """Return pool -> cron expression for managed scrub entries.""" + cron = read_crontab() + out: dict[str, str] = {} + in_block = False + for line in cron.splitlines(): + if CRON_SCRUB_BEGIN in line: + in_block = True + continue + if CRON_SCRUB_END in line: + in_block = False + continue + if in_block and line.strip() and not line.startswith("#"): + m = re.match( + r"^(\S+\s+\S+\s+\S+\s+\S+\s+\S+)\s+/usr/sbin/zpool\s+scrub\s+(\S+)\s*$", + line, + ) + if m: + out[m.group(2)] = m.group(1) + return out + + +def set_scrub_schedule(pool: str, cron_expr: str) -> bool: + cron = read_crontab() + lines = cron.splitlines() + new_lines: list[str] = [] + in_block = False + block_lines: list[str] = [] + block_seen = False + for line in lines: + if CRON_SCRUB_BEGIN in line: + in_block = True + block_seen = True + new_lines.append(line) + continue + if CRON_SCRUB_END in line: + for bl in block_lines: + if bl.strip().endswith(f"zpool scrub {pool}"): + continue + new_lines.append(bl) + new_lines.append(line) + block_lines = [] + in_block = False + continue + if in_block: + block_lines.append(line) + else: + new_lines.append(line) + if not block_seen: + new_lines.append(CRON_SCRUB_BEGIN) + new_lines.append(CRON_SCRUB_END) + final: list[str] = [] + inserted = False + for line in new_lines: + if CRON_SCRUB_END in line and not inserted: + final.append(f"{cron_expr} /usr/sbin/zpool scrub {pool}") + inserted = True + final.append(line) + return write_crontab("\n".join(final) + "\n") + + +def remove_scrub_schedule(pool: str) -> bool: + cron = read_crontab() + new: list[str] = [] + in_block = False + for line in cron.splitlines(): + if CRON_SCRUB_BEGIN in line: + in_block = True + new.append(line) + continue + if CRON_SCRUB_END in line: + in_block = False + new.append(line) + continue + if in_block and line.strip().endswith(f"zpool scrub {pool}"): + continue + new.append(line) + return write_crontab("\n".join(new) + "\n") + + +# ----------------------- Modals ----------------------- + +class ConfirmModal(ModalScreen[bool]): + def __init__(self, message: str): + super().__init__() + self.message = message + + def compose(self) -> ComposeResult: + with Container(classes="modal-box"): + yield Label(self.message, classes="modal-title") + with Horizontal(classes="modal-buttons"): + yield Button("Yes", variant="error", id="yes") + yield Button("No", variant="primary", id="no") + + def on_button_pressed(self, event: Button.Pressed) -> None: + self.dismiss(event.button.id == "yes") + + +class CreateSnapshotModal(ModalScreen[tuple[str, str] | None]): + def __init__(self, datasets: list[str]): + super().__init__() + self.datasets = datasets + + def compose(self) -> ComposeResult: + with Container(classes="modal-box"): + yield Label("Create Snapshot", classes="modal-title") + yield Label("Dataset:") + yield Select( + [(d, d) for d in self.datasets], + id="dataset", + value=self.datasets[0] if self.datasets else None, + ) + yield Label("Snapshot name:") + default_name = f"manual-{datetime.now().strftime('%Y%m%d-%H%M%S')}" + yield Input(value=default_name, id="snap-name") + with Horizontal(classes="modal-buttons"): + yield Button("Create", variant="success", id="create") + yield Button("Cancel", id="cancel") + + def on_button_pressed(self, event: Button.Pressed) -> None: + if event.button.id == "create": + dataset = self.query_one("#dataset", Select).value + name = self.query_one("#snap-name", Input).value.strip() + if dataset and name: + self.dismiss((str(dataset), name)) + else: + self.dismiss(None) + else: + self.dismiss(None) + + +class ScheduleModal(ModalScreen[tuple[str, str, int] | None]): + PRESETS = [ + ("Hourly", "0 * * * *"), + ("Every 6 hours", "0 */6 * * *"), + ("Daily at midnight", "0 0 * * *"), + ("Daily at 3am", "0 3 * * *"), + ("Weekly (Sunday 2am)", "0 2 * * 0"), + ("Custom...", "custom"), + ] + + def __init__(self, datasets: list[str]): + super().__init__() + self.datasets = datasets + + def compose(self) -> ComposeResult: + with Container(classes="modal-box"): + yield Label("Schedule Auto-Snapshot", classes="modal-title") + yield Label("Dataset:") + yield Select( + [(d, d) for d in self.datasets], + id="dataset", + value=self.datasets[0] if self.datasets else None, + ) + yield Label("Frequency:") + yield Select( + [(name, val) for name, val in self.PRESETS], + id="preset", + value="0 3 * * *", + ) + yield Label("Custom cron (if selected above):") + yield Input(placeholder="e.g. 0 */4 * * *", id="custom-cron") + yield Label("Keep last N snapshots:") + yield Input(value="7", id="keep") + with Horizontal(classes="modal-buttons"): + yield Button("Save", variant="success", id="save") + yield Button("Cancel", id="cancel") + + def on_button_pressed(self, event: Button.Pressed) -> None: + if event.button.id == "save": + dataset = self.query_one("#dataset", Select).value + preset = self.query_one("#preset", Select).value + custom = self.query_one("#custom-cron", Input).value.strip() + keep_str = self.query_one("#keep", Input).value.strip() + + cron_expr = custom if preset == "custom" else preset + try: + keep = int(keep_str) + except ValueError: + keep = 7 + + if dataset and cron_expr: + self.dismiss((str(dataset), str(cron_expr), keep)) + else: + self.dismiss(None) + else: + self.dismiss(None) + + +class AddReplicationModal(ModalScreen[tuple[str, str] | None]): + def __init__(self, sources: list[str]): + super().__init__() + self.sources = sources + + def compose(self) -> ComposeResult: + with Container(classes="modal-box"): + yield Label("Add Replication Target", classes="modal-title") + yield Label("Source dataset:") + yield Select( + [(d, d) for d in self.sources], + id="src", + value=self.sources[0] if self.sources else None, + ) + yield Label("Target dataset (must be on imported pool):") + yield Input(placeholder="e.g. backup_ext/RAID1_1TB", id="tgt") + with Horizontal(classes="modal-buttons"): + yield Button("Add", variant="success", id="add") + yield Button("Cancel", id="cancel") + + def on_button_pressed(self, event: Button.Pressed) -> None: + if event.button.id == "add": + src = self.query_one("#src", Select).value + tgt = self.query_one("#tgt", Input).value.strip() + if src and tgt: + self.dismiss((str(src), tgt)) + return + self.dismiss(None) + + +class ScrubScheduleModal(ModalScreen[str | None]): + PRESETS = [ + ("Monthly (1st @ 3am)", "0 3 1 * *"), + ("Bi-weekly (1st & 15th @ 3am)", "0 3 1,15 * *"), + ("Weekly (Sunday 3am)", "0 3 * * 0"), + ("Custom...", "custom"), + ] + + def compose(self) -> ComposeResult: + with Container(classes="modal-box"): + yield Label("Schedule Scrub", classes="modal-title") + yield Label("Frequency:") + yield Select( + [(name, val) for name, val in self.PRESETS], + id="preset", + value="0 3 1 * *", + ) + yield Label("Custom cron (if selected above):") + yield Input(placeholder="e.g. 0 3 1 * *", id="custom-cron") + with Horizontal(classes="modal-buttons"): + yield Button("Save", variant="success", id="save") + yield Button("Cancel", id="cancel") + + def on_button_pressed(self, event: Button.Pressed) -> None: + if event.button.id == "save": + preset = self.query_one("#preset", Select).value + custom = self.query_one("#custom-cron", Input).value.strip() + cron_expr = custom if preset == "custom" else preset + if cron_expr: + self.dismiss(str(cron_expr)) + return + self.dismiss(None) + + +class AddMountModal(ModalScreen[tuple[str, str, str] | None]): + def __init__(self, ctids: list[str]): + super().__init__() + self.ctids = ctids + + def compose(self) -> ComposeResult: + with Container(classes="modal-box"): + yield Label("Add LXC Mountpoint", classes="modal-title") + yield Label("Container:") + yield Select( + [(c, c) for c in self.ctids], + id="ctid", + value=self.ctids[0] if self.ctids else None, + ) + yield Label("Host path (on Proxmox):") + yield Input(placeholder="/RAID1_1TB/MyData", id="host") + yield Label("Container path (inside LXC):") + yield Input(placeholder="/RAID1_1TB/MyData", id="target") + with Horizontal(classes="modal-buttons"): + yield Button("Add", variant="success", id="add") + yield Button("Cancel", id="cancel") + + def on_button_pressed(self, event: Button.Pressed) -> None: + if event.button.id == "add": + ctid = self.query_one("#ctid", Select).value + host = self.query_one("#host", Input).value.strip() + target = self.query_one("#target", Input).value.strip() + if ctid and host and target: + self.dismiss((str(ctid), host, target)) + else: + self.dismiss(None) + else: + self.dismiss(None) + + +# ----------------------- Main App ----------------------- + +class PVETui(App): + CSS = """ + Screen { + background: $surface; + } + + TabbedContent { + height: 1fr; + } + + DataTable { + height: 1fr; + } + + .toolbar { + height: auto; + padding: 1; + background: $panel; + } + + .toolbar Button { + margin-right: 1; + } + + .status { + dock: bottom; + height: 1; + background: $primary; + color: $text; + padding: 0 1; + } + + .modal-box { + align: center middle; + background: $panel; + border: thick $primary; + padding: 1 2; + width: 60; + height: auto; + } + + .modal-title { + text-align: center; + text-style: bold; + margin-bottom: 1; + } + + .modal-buttons { + align: center middle; + height: auto; + margin-top: 1; + } + + .modal-buttons Button { + margin: 0 1; + } + + Label { + margin-top: 1; + } + """ + + BINDINGS = [ + Binding("q", "quit", "Quit"), + Binding("r", "refresh", "Refresh"), + Binding("n", "new_snapshot", "New Snapshot"), + Binding("d", "delete_snapshot", "Delete"), + Binding("s", "schedule", "Schedule"), + ] + + TITLE = "PVE Storage Manager" + SUB_TITLE = "ZFS Snapshots + LXC Mountpoints" + + def compose(self) -> ComposeResult: + yield Header() + with TabbedContent(initial="snapshots"): + with TabPane("Snapshots", id="snapshots"): + with Horizontal(classes="toolbar"): + yield Button("New (n)", id="btn-new", variant="success") + yield Button("Delete (d)", id="btn-del", variant="error") + yield Button("Rollback", id="btn-rollback", variant="warning") + yield Button("Refresh (r)", id="btn-refresh") + yield DataTable(id="snap-table", cursor_type="row") + + with TabPane("Schedules", id="schedules"): + with Horizontal(classes="toolbar"): + yield Button("Add Schedule (s)", id="btn-sched-add", variant="success") + yield Button("Snapshot Now", id="btn-sched-now", variant="primary") + yield Button("Remove", id="btn-sched-del", variant="error") + yield Button("Refresh", id="btn-sched-refresh") + yield DataTable(id="sched-table", cursor_type="row") + + with TabPane("Datasets", id="datasets"): + with Horizontal(classes="toolbar"): + yield Button("Refresh", id="btn-ds-refresh") + yield DataTable(id="ds-table", cursor_type="row") + + with TabPane("LXC Mounts", id="mounts"): + with Horizontal(classes="toolbar"): + yield Button("Add Mount", id="btn-mp-add", variant="success") + yield Button("Remove Mount", id="btn-mp-del", variant="error") + yield Button("Refresh", id="btn-mp-refresh") + yield DataTable(id="mp-table", cursor_type="row") + + with TabPane("Replication", id="replication"): + with Horizontal(classes="toolbar"): + yield Button("Add Target", id="btn-rep-add", variant="success") + yield Button("Replicate Now", id="btn-rep-run", variant="primary") + yield Button("Remove", id="btn-rep-del", variant="error") + yield Button("Refresh", id="btn-rep-refresh") + yield DataTable(id="rep-table", cursor_type="row") + + with TabPane("Scrub", id="scrub"): + with Horizontal(classes="toolbar"): + yield Button("Start Scrub", id="btn-scrub-start", variant="success") + yield Button("Stop Scrub", id="btn-scrub-stop", variant="warning") + yield Button("Schedule", id="btn-scrub-sched") + yield Button("Unschedule", id="btn-scrub-unsched", variant="error") + yield Button("Refresh", id="btn-scrub-refresh") + yield DataTable(id="scrub-table", cursor_type="row") + + yield Static("Ready", id="status", classes="status") + yield Footer() + + def on_mount(self) -> None: + # Snapshot table + t = self.query_one("#snap-table", DataTable) + t.add_columns("Snapshot", "Used", "Refer", "Created") + + # Schedule table + t = self.query_one("#sched-table", DataTable) + t.add_columns("Dataset", "Cron", "Keep") + + # Dataset table + t = self.query_one("#ds-table", DataTable) + t.add_columns("Name", "Used", "Available", "Mountpoint") + + # Mountpoint table + t = self.query_one("#mp-table", DataTable) + t.add_columns("CTID", "Slot", "Host Path", "Container Path") + + # Replication table + t = self.query_one("#rep-table", DataTable) + t.add_columns("Source", "Target", "Common", "Latest Src", "Status") + + # Scrub table + t = self.query_one("#scrub-table", DataTable) + t.add_columns("Pool", "State", "Last/Current Scan", "Errors", "Schedule") + + self.refresh_all() + + def set_status(self, msg: str) -> None: + self.query_one("#status", Static).update(msg) + + def refresh_all(self) -> None: + self.refresh_snapshots() + self.refresh_schedules() + self.refresh_datasets() + self.refresh_mounts() + self.refresh_replication() + self.refresh_scrub() + self.set_status(f"Refreshed at {datetime.now().strftime('%H:%M:%S')}") + + def refresh_snapshots(self) -> None: + t = self.query_one("#snap-table", DataTable) + t.clear() + for snap in list_snapshots(): + t.add_row(snap["name"], snap["used"], snap["refer"], snap["creation"]) + + def refresh_schedules(self) -> None: + t = self.query_one("#sched-table", DataTable) + t.clear() + for s in get_schedules(): + t.add_row(s["dataset"], s["cron"], s["keep"]) + + def refresh_datasets(self) -> None: + t = self.query_one("#ds-table", DataTable) + t.clear() + for d in list_datasets(): + t.add_row(d["name"], d["used"], d["avail"], d["mountpoint"]) + + def refresh_mounts(self) -> None: + t = self.query_one("#mp-table", DataTable) + t.clear() + for lxc in list_lxc(): + for mp in get_lxc_mountpoints(lxc["id"]): + t.add_row(lxc["id"], mp["key"], mp["host"], mp["target"]) + + def refresh_replication(self) -> None: + t = self.query_one("#rep-table", DataTable) + t.clear() + for tgt in load_replication_targets(): + src = tgt["source"] + dst = tgt["target"] + common = latest_common_snapshot(src, dst) or "-" + latest = latest_snapshot(src) or "-" + if not dataset_exists(dst): + status = "target missing (full send needed)" + elif common == latest: + status = "up-to-date" + elif common == "-": + status = "no common snap (mismatch)" + else: + status = "incremental pending" + t.add_row(src, dst, common, latest, status) + + def refresh_scrub(self) -> None: + t = self.query_one("#scrub-table", DataTable) + t.clear() + sched = get_scrub_schedules() + for pool in list_pools(): + info = scrub_status(pool) + t.add_row( + pool, + info["state"], + info["scan"][:60] if info["scan"] else "-", + info["errors"] or "-", + sched.get(pool, "-"), + ) + + # ------------- Actions ------------- + + def action_refresh(self) -> None: + self.refresh_all() + + def action_new_snapshot(self) -> None: + self._open_create_snapshot() + + def action_delete_snapshot(self) -> None: + self._delete_selected_snapshot() + + def action_schedule(self) -> None: + self._open_schedule() + + def _get_datasets(self) -> list[str]: + return [d["name"] for d in list_datasets()] + + def _open_create_snapshot(self) -> None: + datasets = self._get_datasets() + if not datasets: + self.set_status("No datasets found") + return + + def cb(result): + if result: + dataset, name = result + ok, msg = create_snapshot(dataset, name) + self.set_status(msg) + self.refresh_snapshots() + + self.push_screen(CreateSnapshotModal(datasets), cb) + + def _delete_selected_snapshot(self) -> None: + t = self.query_one("#snap-table", DataTable) + if t.cursor_row is None: + return + try: + row = t.get_row_at(t.cursor_row) + snap_name = row[0] + except Exception: + return + + def cb(confirmed): + if confirmed: + ok, msg = destroy_snapshot(snap_name) + self.set_status(msg) + self.refresh_snapshots() + + self.push_screen( + ConfirmModal(f"Destroy snapshot:\n{snap_name}?"), cb + ) + + def _rollback_selected_snapshot(self) -> None: + t = self.query_one("#snap-table", DataTable) + if t.cursor_row is None: + return + try: + row = t.get_row_at(t.cursor_row) + snap_name = row[0] + except Exception: + return + + def cb(confirmed): + if confirmed: + ok, msg = rollback_snapshot(snap_name) + self.set_status(msg) + self.refresh_all() + + self.push_screen( + ConfirmModal( + f"ROLLBACK to {snap_name}?\n" + "This will DESTROY all newer snapshots\n" + "and changes since then!" + ), cb + ) + + def _open_schedule(self) -> None: + datasets = self._get_datasets() + if not datasets: + self.set_status("No datasets found") + return + + def cb(result): + if result: + dataset, cron_expr, keep = result + ok = set_schedule(dataset, cron_expr, keep) + self.set_status( + f"Schedule saved for {dataset}" if ok else "Failed to save schedule" + ) + self.refresh_schedules() + + self.push_screen(ScheduleModal(datasets), cb) + + def _snapshot_now_selected(self) -> None: + t = self.query_one("#sched-table", DataTable) + if t.cursor_row is None: + return + try: + row = t.get_row_at(t.cursor_row) + dataset = str(row[0]) + keep = int(str(row[2])) + except Exception: + self.set_status("Select a schedule row first") + return + ok, msg = snapshot_now(dataset, keep) + self.set_status(msg) + self.refresh_snapshots() + + def _remove_selected_schedule(self) -> None: + t = self.query_one("#sched-table", DataTable) + if t.cursor_row is None: + return + try: + row = t.get_row_at(t.cursor_row) + dataset = row[0] + except Exception: + return + + def cb(confirmed): + if confirmed: + ok = remove_schedule(dataset) + self.set_status( + f"Removed schedule for {dataset}" if ok else "Failed" + ) + self.refresh_schedules() + + self.push_screen( + ConfirmModal(f"Remove schedule for:\n{dataset}?"), cb + ) + + def _open_add_mount(self) -> None: + ctids = [c["id"] for c in list_lxc()] + if not ctids: + self.set_status("No LXC containers found") + return + + def cb(result): + if result: + ctid, host, target = result + ok, msg = add_lxc_mount(ctid, host, target) + self.set_status(msg) + self.refresh_mounts() + + self.push_screen(AddMountModal(ctids), cb) + + def _open_add_replication(self) -> None: + sources = self._get_datasets() + if not sources: + self.set_status("No datasets to replicate") + return + + def cb(result): + if result: + src, tgt = result + targets = load_replication_targets() + for t in targets: + if t["source"] == src and t["target"] == tgt: + self.set_status("Target already configured") + return + targets.append({"source": src, "target": tgt}) + if save_replication_targets(targets): + install_replication_script() + self.set_status(f"Added: {src} → {tgt}") + self.refresh_replication() + else: + self.set_status("Failed to save replication config") + + self.push_screen(AddReplicationModal(sources), cb) + + def _remove_selected_replication(self) -> None: + t = self.query_one("#rep-table", DataTable) + if t.cursor_row is None: + return + try: + row = t.get_row_at(t.cursor_row) + src, tgt = str(row[0]), str(row[1]) + except Exception: + return + + def cb(confirmed): + if confirmed: + targets = [ + x for x in load_replication_targets() + if not (x["source"] == src and x["target"] == tgt) + ] + if save_replication_targets(targets): + self.set_status(f"Removed: {src} → {tgt}") + self.refresh_replication() + + self.push_screen( + ConfirmModal( + f"Remove replication target?\n{src} → {tgt}\n" + "(This does not destroy the target dataset.)" + ), + cb, + ) + + @work(thread=True, exclusive=True, group="replicate") + def _do_replicate(self, source: str, target: str) -> None: + ok, msg = replicate_dataset(source, target) + self.app.call_from_thread(self.set_status, msg) + self.app.call_from_thread(self.refresh_replication) + self.app.call_from_thread(self.refresh_snapshots) + + def _replicate_selected(self) -> None: + t = self.query_one("#rep-table", DataTable) + if t.cursor_row is None: + self.set_status("Select a replication row first") + return + try: + row = t.get_row_at(t.cursor_row) + src, tgt = str(row[0]), str(row[1]) + except Exception: + return + self.set_status(f"Replicating {src} → {tgt} ... (this may take a while)") + self._do_replicate(src, tgt) + + def _scrub_action(self, action: str) -> None: + t = self.query_one("#scrub-table", DataTable) + if t.cursor_row is None: + self.set_status("Select a pool row first") + return + try: + row = t.get_row_at(t.cursor_row) + pool = str(row[0]) + except Exception: + return + if action == "start": + ok, msg = scrub_start(pool) + else: + ok, msg = scrub_stop(pool) + self.set_status(msg) + self.refresh_scrub() + + def _open_scrub_schedule(self) -> None: + t = self.query_one("#scrub-table", DataTable) + if t.cursor_row is None: + self.set_status("Select a pool row first") + return + try: + row = t.get_row_at(t.cursor_row) + pool = str(row[0]) + except Exception: + return + + def cb(cron_expr): + if cron_expr: + ok = set_scrub_schedule(pool, cron_expr) + self.set_status( + f"Scrub scheduled for {pool}" if ok else "Failed to save" + ) + self.refresh_scrub() + + self.push_screen(ScrubScheduleModal(), cb) + + def _unschedule_scrub(self) -> None: + t = self.query_one("#scrub-table", DataTable) + if t.cursor_row is None: + return + try: + row = t.get_row_at(t.cursor_row) + pool = str(row[0]) + except Exception: + return + + def cb(confirmed): + if confirmed: + ok = remove_scrub_schedule(pool) + self.set_status( + f"Unscheduled scrub for {pool}" if ok else "Failed" + ) + self.refresh_scrub() + + self.push_screen(ConfirmModal(f"Remove scrub schedule for {pool}?"), cb) + + def _remove_selected_mount(self) -> None: + t = self.query_one("#mp-table", DataTable) + if t.cursor_row is None: + return + try: + row = t.get_row_at(t.cursor_row) + ctid = row[0] + slot = row[1] + except Exception: + return + + def cb(confirmed): + if confirmed: + ok, msg = remove_lxc_mount(ctid, slot) + self.set_status(msg) + self.refresh_mounts() + + self.push_screen( + ConfirmModal(f"Remove {slot} from LXC {ctid}?\n(CT must be stopped for some changes)"), + cb + ) + + # ------------- Button handlers ------------- + + def on_button_pressed(self, event: Button.Pressed) -> None: + bid = event.button.id + if bid == "btn-new": + self._open_create_snapshot() + elif bid == "btn-del": + self._delete_selected_snapshot() + elif bid == "btn-rollback": + self._rollback_selected_snapshot() + elif bid == "btn-refresh": + self.refresh_snapshots() + elif bid == "btn-sched-add": + self._open_schedule() + elif bid == "btn-sched-now": + self._snapshot_now_selected() + elif bid == "btn-sched-del": + self._remove_selected_schedule() + elif bid == "btn-sched-refresh": + self.refresh_schedules() + elif bid == "btn-ds-refresh": + self.refresh_datasets() + elif bid == "btn-mp-add": + self._open_add_mount() + elif bid == "btn-mp-del": + self._remove_selected_mount() + elif bid == "btn-mp-refresh": + self.refresh_mounts() + elif bid == "btn-rep-add": + self._open_add_replication() + elif bid == "btn-rep-run": + self._replicate_selected() + elif bid == "btn-rep-del": + self._remove_selected_replication() + elif bid == "btn-rep-refresh": + self.refresh_replication() + elif bid == "btn-scrub-start": + self._scrub_action("start") + elif bid == "btn-scrub-stop": + self._scrub_action("stop") + elif bid == "btn-scrub-sched": + self._open_scrub_schedule() + elif bid == "btn-scrub-unsched": + self._unschedule_scrub() + elif bid == "btn-scrub-refresh": + self.refresh_scrub() + + +if __name__ == "__main__": + PVETui().run()