#!/usr/bin/env python3
"""
PVE Storage Manager TUI

Manages ZFS snapshots, scheduling, and LXC container mountpoints
on RAID1_1TB and Samsung_860_EVO_256GB pools.
Run on Proxmox host as root.
"""

import subprocess
import re
import json
import shlex
from datetime import datetime
from pathlib import Path

from textual import work
from textual.app import App, ComposeResult
from textual.containers import Container, Horizontal, Vertical
from textual.screen import ModalScreen
from textual.widgets import (
    Header,
    Footer,
    DataTable,
    Button,
    Static,
    Input,
    Label,
    Select,
    TabbedContent,
    TabPane,
)
from textual.binding import Binding


# ----------------------- ZFS helpers -----------------------

def run(cmd: list[str]) -> tuple[int, str, str]:
    """Run a command (no shell) and return ``(rc, stdout, stderr)``.

    Output is captured and stripped.  Any failure to launch the command, or
    exceeding the 30 second timeout, is reported as ``(1, "", <message>)``
    rather than raised, so callers only ever have to look at the return code.
    """
    try:
        r = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
        return r.returncode, r.stdout.strip(), r.stderr.strip()
    except Exception as e:  # deliberate boundary: never let a tool failure crash the TUI
        return 1, "", str(e)


def _tsv_rows(args: list[str], columns: tuple[str, ...]) -> list[dict]:
    """Run *args* and parse tab-separated output into dicts keyed by *columns*.

    Lines with fewer fields than *columns* are skipped; extra fields are
    ignored.  Returns ``[]`` when the command fails.
    """
    rc, out, _ = run(args)
    if rc != 0:
        return []
    rows = []
    for line in out.splitlines():
        parts = line.split("\t")
        if len(parts) >= len(columns):
            rows.append(dict(zip(columns, parts)))
    return rows


def list_pools() -> list[str]:
    """Return the names of all imported zpools (empty list on failure)."""
    rc, out, _ = run(["zpool", "list", "-H", "-o", "name"])
    if rc != 0:
        return []
    return out.splitlines()


def list_datasets(pool: str | None = None) -> list[dict]:
    """List datasets as dicts with ``name``, ``used``, ``avail``, ``mountpoint``.

    When *pool* is given the listing is restricted to that pool (recursively).
    """
    args = ["zfs", "list", "-H", "-o", "name,used,avail,mountpoint"]
    if pool:
        args += ["-r", pool]
    return _tsv_rows(args, ("name", "used", "avail", "mountpoint"))


def list_snapshots(pool: str | None = None) -> list[dict]:
    """List snapshots as dicts with ``name``, ``used``, ``refer``, ``creation``.

    When *pool* is given the listing is restricted to that pool (recursively).
    """
    args = ["zfs", "list", "-t", "snapshot", "-H", "-o", "name,used,refer,creation"]
    if pool:
        args += ["-r", pool]
    return _tsv_rows(args, ("name", "used", "refer", "creation"))


def create_snapshot(dataset: str, name: str) -> tuple[bool, str]:
    """Create ``dataset@name``; return ``(ok, message)``."""
    full = f"{dataset}@{name}"
    rc, out, err = run(["zfs", "snapshot", full])
    return rc == 0, err or out or f"Created {full}"


def destroy_snapshot(snap: str) -> tuple[bool, str]:
    """Destroy the snapshot *snap* (full ``dataset@name``); return ``(ok, message)``."""
    rc, out, err = run(["zfs", "destroy", snap])
    return rc == 0, err or out or f"Destroyed {snap}"


def rollback_snapshot(snap: str) -> tuple[bool, str]:
    """Roll back to *snap* with ``-r`` (destroys newer snapshots); return ``(ok, message)``."""
    rc, out, err = run(["zfs", "rollback", "-r", snap])
    return rc == 0, err or out or f"Rolled back to {snap}"


def snapshot_now(dataset: str, keep: int = 7) -> tuple[bool, str]:
    """Create an auto-* snapshot immediately and prune older ones beyond keep."""
    ts = datetime.now().strftime("%Y%m%d-%H%M%S")
    full = f"{dataset}@auto-{ts}"
    rc, out, err = run(["zfs", "snapshot", full])
    if rc != 0:
        return False, err or out or f"snapshot failed for {full}"

    # Oldest first, so the head of the list is what gets pruned.
    rc, out, _ = run([
        "zfs", "list", "-H", "-t", "snapshot", "-o", "name",
        "-s", "creation", "-r", dataset,
    ])
    pruned = 0
    if rc == 0:
        # "-r" also returns children's snapshots; keep only this dataset's auto-* ones.
        auto_snaps = [
            s for s in out.splitlines() if s.startswith(f"{dataset}@auto-")
        ]
        excess = len(auto_snaps) - keep
        if excess > 0:
            for old in auto_snaps[:excess]:
                drc, _, _ = run(["zfs", "destroy", old])
                if drc == 0:
                    pruned += 1
    suffix = f" (pruned {pruned})" if pruned else ""
    return True, f"Created {full}{suffix}"


# ----------------------- Cron helpers -----------------------

CRON_MARKER_BEGIN = "# === PVE-TUI AUTO SNAPSHOT BEGIN ==="
CRON_MARKER_END = "# === PVE-TUI AUTO SNAPSHOT END ==="


def read_crontab() -> str:
    """Return the current user's crontab text, or ``""`` if ``crontab -l`` fails.

    NOTE(review): a failing ``crontab -l`` (including "no crontab for user")
    is indistinguishable from an empty crontab here.
    """
    rc, out, _ = run(["crontab", "-l"])
    return out if rc == 0 else ""


def write_crontab(content: str) -> bool:
    """Replace the current user's crontab with *content*; return success."""
    try:
        # Capture output so crontab's messages cannot scribble over the TUI.
        proc = subprocess.run(
            ["crontab", "-"],
            input=content,
            capture_output=True,
            text=True,
            timeout=30,
        )
    except (OSError, subprocess.SubprocessError):
        return False
    return proc.returncode == 0


def _normalize_cron_expr(cron_expr: str) -> str | None:
    """Return *cron_expr* with single-space separators, or ``None`` if invalid.

    Only classic five-field expressions are accepted because that is the only
    form the schedule parsers in this file can read back.
    """
    fields = cron_expr.split()
    if len(fields) != 5:
        return None
    return " ".join(fields)


def _managed_entries(cron: str, begin: str, end: str) -> list[str]:
    """Return non-blank, non-comment lines found between *begin* and *end* markers."""
    entries = []
    in_block = False
    for line in cron.splitlines():
        if begin in line:
            in_block = True
        elif end in line:
            in_block = False
        elif in_block and line.strip() and not line.startswith("#"):
            entries.append(line)
    return entries


def _rewrite_cron_block(cron: str, begin: str, end: str, is_stale, new_entry: str | None = None) -> str:
    """Return *cron* with stale managed entries dropped and *new_entry* added.

    Lines outside the ``begin``/``end`` block are preserved untouched.  Inside
    the block, lines for which ``is_stale(line)`` is true are removed.  When
    *new_entry* is given it is placed just before the first END marker; a
    missing block (or a block missing its END marker) is created/closed so the
    entry is never silently lost.
    """
    kept: list[str] = []
    in_block = False
    inserted = False
    for line in cron.splitlines():
        if begin in line:
            in_block = True
            kept.append(line)
            continue
        if end in line:
            if in_block and new_entry is not None and not inserted:
                kept.append(new_entry)
                inserted = True
            in_block = False
            kept.append(line)
            continue
        if in_block and is_stale(line):
            continue
        kept.append(line)

    if new_entry is not None and not inserted:
        if not in_block:  # no block at all: open one
            kept.append(begin)
        kept.append(new_entry)
        kept.append(end)
    return "\n".join(kept) + "\n"


def get_schedules() -> list[dict]:
    """Parse our managed cron entries."""
    schedules = []
    for line in _managed_entries(read_crontab(), CRON_MARKER_BEGIN, CRON_MARKER_END):
        # Format: <cron> /usr/local/bin/pve-tui-snapshot.sh <dataset> <keep>
        m = re.match(
            r"^(\S+\s+\S+\s+\S+\s+\S+\s+\S+)\s+(.*snapshot\.sh)\s+(\S+)\s+(\d+)",
            line,
        )
        if m:
            schedules.append({
                "cron": m.group(1),
                "dataset": m.group(3),
                "keep": m.group(4),
            })
    return schedules


def install_snapshot_script():
    """Write the cron-invoked snapshot/prune helper to /usr/local/bin (mode 0755)."""
    script = """#!/bin/bash
# Auto-generated by pve-tui. Creates snapshot and prunes old ones.
DATASET="$1"
KEEP="${2:-7}"
PREFIX="auto"

if [ -z "$DATASET" ]; then
    echo "Usage: $0 <dataset> [keep_count]"
    exit 1
fi

TS=$(date +%Y%m%d-%H%M%S)
/usr/sbin/zfs snapshot "${DATASET}@${PREFIX}-${TS}" || exit 1

# Prune: keep only last N auto- snapshots
SNAPS=$(/usr/sbin/zfs list -H -t snapshot -o name -s creation "$DATASET" | grep "@${PREFIX}-")
COUNT=$(echo "$SNAPS" | wc -l)
if [ "$COUNT" -gt "$KEEP" ]; then
    DEL=$((COUNT - KEEP))
    echo "$SNAPS" | head -n "$DEL" | while read -r S; do
        /usr/sbin/zfs destroy "$S"
    done
fi
"""
    path = Path("/usr/local/bin/pve-tui-snapshot.sh")
    path.write_text(script, encoding="utf-8")
    path.chmod(0o755)


def _is_dataset_entry(dataset: str):
    """Build a predicate matching cron lines that mention *dataset* as a whole word."""
    needle = f" {dataset} "
    # The trailing space lets the dataset match when it is the last token.
    return lambda line: needle in line + " "


def _is_scrub_entry(pool: str):
    """Build a predicate matching cron lines that end in ``zpool scrub <pool>``."""
    tail = f"zpool scrub {pool}"
    return lambda line: line.strip().endswith(tail)


def set_schedule(dataset: str, cron_expr: str, keep: int) -> bool:
    """Upsert a schedule in crontab.

    Returns ``False`` without touching anything when *cron_expr* is not a
    five-field expression or *keep* is less than 1 (a keep of 0 would make the
    cron job destroy every auto snapshot, including the one it just made).
    """
    normalized = _normalize_cron_expr(cron_expr)
    if normalized is None or keep < 1:
        return False
    install_snapshot_script()
    entry = f"{normalized} /usr/local/bin/pve-tui-snapshot.sh {dataset} {keep}"
    content = _rewrite_cron_block(
        read_crontab(),
        CRON_MARKER_BEGIN,
        CRON_MARKER_END,
        _is_dataset_entry(dataset),
        entry,
    )
    return write_crontab(content)


def remove_schedule(dataset: str) -> bool:
    """Remove the managed snapshot schedule for *dataset*; return write success."""
    content = _rewrite_cron_block(
        read_crontab(),
        CRON_MARKER_BEGIN,
        CRON_MARKER_END,
        _is_dataset_entry(dataset),
    )
    return write_crontab(content)


# ----------------------- LXC helpers -----------------------

def list_lxc() -> list[dict]:
    """Return containers from ``pct list`` as dicts with ``id``, ``status``, ``name``."""
    rc, out, _ = run(["pct", "list"])
    if rc != 0:
        return []
    rows = []
    for line in out.splitlines()[1:]:  # skip header
        parts = line.split(maxsplit=3)
        if len(parts) >= 3:
            # The last field is taken as the name whether or not an extra
            # column precedes it.
            rows.append({
                "id": parts[0],
                "status": parts[1],
                "name": parts[-1],
            })
    return rows


def get_lxc_mountpoints(ctid: str) -> list[dict]:
    """Return the ``mpN`` entries of container *ctid*.

    Each dict has ``key`` (e.g. ``mp0``), ``host`` (first comma-separated
    field), ``target`` (value of ``mp=``, or ``""``) and ``raw`` (full value).
    """
    rc, out, _ = run(["pct", "config", ctid])
    if rc != 0:
        return []
    mps = []
    for line in out.splitlines():
        m = re.match(r"^(mp\d+):\s*(.+)$", line)
        if not m:
            continue
        key, val = m.group(1), m.group(2)
        # val like: /host/path,mp=/ct/path,size=8G
        parts = val.split(",")
        target = ""
        for p in parts[1:]:
            if p.startswith("mp="):
                target = p[3:]
        mps.append({
            "key": key,
            "host": parts[0],
            "target": target,
            "raw": val,
        })
    return mps


def add_lxc_mount(ctid: str, host_path: str, ct_path: str) -> tuple[bool, str]:
    """Bind *host_path* at *ct_path* in container *ctid* using the lowest free mp slot."""
    existing = get_lxc_mountpoints(ctid)
    used = {int(m["key"][2:]) for m in existing}
    slot = 0
    while slot in used:
        slot += 1
    rc, out, err = run([
        "pct", "set", ctid, f"-mp{slot}", f"{host_path},mp={ct_path}"
    ])
    return rc == 0, err or out or f"Added mp{slot}"


def remove_lxc_mount(ctid: str, key: str) -> tuple[bool, str]:
    """Delete mountpoint *key* (e.g. ``mp0``) from container *ctid*."""
    rc, out, err = run(["pct", "set", ctid, "-delete", key])
    return rc == 0, err or out or f"Removed {key}"


# ----------------------- Replication helpers -----------------------

REPLICATION_CONFIG = Path("/etc/pve-tui/replication.json")
REPLICATION_SCRIPT = Path("/usr/local/bin/pve-tui-replicate.sh")


def load_replication_targets() -> list[dict]:
    """Load configured replication pairs from ``REPLICATION_CONFIG``.

    Returns only well-formed entries (dicts whose ``source`` and ``target``
    are strings).  A missing, unreadable or malformed file yields ``[]``.
    """
    try:
        data = json.loads(REPLICATION_CONFIG.read_text())
    except (OSError, ValueError):  # missing/unreadable file, bad JSON or encoding
        return []
    if not isinstance(data, list):
        return []
    return [
        entry for entry in data
        if isinstance(entry, dict)
        and isinstance(entry.get("source"), str)
        and isinstance(entry.get("target"), str)
    ]


def save_replication_targets(targets: list[dict]) -> bool:
    """Persist *targets* as JSON, creating the config directory; return success."""
    try:
        REPLICATION_CONFIG.parent.mkdir(parents=True, exist_ok=True)
        REPLICATION_CONFIG.write_text(json.dumps(targets, indent=2))
        return True
    except (OSError, TypeError, ValueError):
        return False


def list_dataset_snapshots(dataset: str) -> list[str]:
    """Snapshot short-names (no `dataset@` prefix) for an exact dataset, oldest first."""
    rc, out, _ = run([
        "zfs", "list", "-H", "-t", "snapshot", "-o", "name",
        "-s", "creation", dataset,
    ])
    if rc != 0:
        return []
    names = []
    for line in out.splitlines():
        if "@" in line:
            ds, snap = line.split("@", 1)
            if ds == dataset:
                names.append(snap)
    return names


def latest_snapshot(dataset: str) -> str | None:
    """Return the newest snapshot short-name of *dataset*, or ``None``."""
    snaps = list_dataset_snapshots(dataset)
    return snaps[-1] if snaps else None


def latest_common_snapshot(source: str, target: str) -> str | None:
    """Return the newest source snapshot name that also exists on *target*, or ``None``."""
    src = list_dataset_snapshots(source)
    tgt = set(list_dataset_snapshots(target))
    for s in reversed(src):
        if s in tgt:
            return s
    return None


def dataset_exists(dataset: str) -> bool:
    """Return whether ``zfs list`` can see *dataset*."""
    rc, _, _ = run(["zfs", "list", "-H", dataset])
    return rc == 0


def replicate_dataset(source: str, target: str) -> tuple[bool, str]:
    """Full or incremental zfs send|recv, automatically picking the right mode.

    Returns ``(ok, message)``.  On failure the message is the last line of the
    pipeline's stderr/stdout.
    """
    src_snaps = list_dataset_snapshots(source)
    if not src_snaps:
        return False, f"No snapshots on {source}; create one first."
    latest_src = src_snaps[-1]

    if dataset_exists(target):
        common = latest_common_snapshot(source, target)
        if not common:
            return False, (
                f"{target} exists but shares no snapshot with {source}. "
                f"Destroy {target} for a fresh full send, or align bases manually."
            )
        if common == latest_src:
            return True, f"Up-to-date: {source}@{common}"
        cmd = (
            f"zfs send -R -I {shlex.quote(source + '@' + common)} "
            f"{shlex.quote(source + '@' + latest_src)} | "
            f"zfs receive -F {shlex.quote(target)}"
        )
        mode = f"incremental {common} → {latest_src}"
    else:
        cmd = (
            f"zfs send -R {shlex.quote(source + '@' + latest_src)} | "
            f"zfs receive {shlex.quote(target)}"
        )
        mode = f"full → {latest_src}"

    # pipefail: without it the exit status is only that of `zfs receive`, so a
    # failing `zfs send` could be reported as a successful replication.
    proc = subprocess.run(
        ["bash", "-c", f"set -o pipefail; {cmd}"],
        capture_output=True,
        text=True,
    )
    if proc.returncode != 0:
        return False, (
            proc.stderr.strip() or proc.stdout.strip() or "replication failed"
        ).splitlines()[-1]
    return True, f"Replicated {source} → {target} ({mode})"


def install_replication_script() -> None:
    """Standalone replication helper for cron / manual use."""
    script = """#!/bin/bash
# Auto-generated by pve-tui. Incrementally replicates ZFS dataset.
# Usage: pve-tui-replicate.sh <source> <target>
set -eo pipefail
SOURCE="$1"
TARGET="$2"
if [ -z "$SOURCE" ] || [ -z "$TARGET" ]; then
    echo "Usage: $0 <source> <target>"
    exit 1
fi

LATEST=$(/usr/sbin/zfs list -H -t snapshot -o name -s creation "$SOURCE" | tail -n1 | awk -F@ '{print $2}')
if [ -z "$LATEST" ]; then
    echo "No snapshots on $SOURCE"; exit 1
fi

if /usr/sbin/zfs list -H "$TARGET" >/dev/null 2>&1; then
    # Find latest snapshot name that exists on both source and target.
    COMMON=""
    for s in $(/usr/sbin/zfs list -H -t snapshot -o name -s creation "$SOURCE" | awk -F@ '{print $2}' | tac); do
        if /usr/sbin/zfs list -H -t snapshot "${TARGET}@${s}" >/dev/null 2>&1; then
            COMMON="$s"; break
        fi
    done
    if [ -z "$COMMON" ]; then
        echo "No common snapshot between $SOURCE and $TARGET; aborting." >&2; exit 2
    fi
    if [ "$COMMON" = "$LATEST" ]; then
        echo "Up-to-date: $SOURCE@$COMMON"; exit 0
    fi
    /usr/sbin/zfs send -R -I "${SOURCE}@${COMMON}" "${SOURCE}@${LATEST}" | /usr/sbin/zfs receive -F "$TARGET"
    echo "Incremental ${COMMON} → ${LATEST}"
else
    /usr/sbin/zfs send -R "${SOURCE}@${LATEST}" | /usr/sbin/zfs receive "$TARGET"
    echo "Full → ${LATEST}"
fi
"""
    # The script contains non-ASCII arrows; do not depend on the locale encoding.
    REPLICATION_SCRIPT.write_text(script, encoding="utf-8")
    REPLICATION_SCRIPT.chmod(0o755)


# ----------------------- Scrub helpers -----------------------

CRON_SCRUB_BEGIN = "# === PVE-TUI AUTO SCRUB BEGIN ==="
CRON_SCRUB_END = "# === PVE-TUI AUTO SCRUB END ==="


def scrub_status(pool: str) -> dict:
    """Extract ``state``, ``scan`` and ``errors`` from ``zpool status <pool>``.

    Only the first line of each field is captured.  On failure the dict holds
    the placeholders ``"?"`` / ``""``.
    """
    rc, out, _ = run(["zpool", "status", pool])
    info = {"pool": pool, "state": "?", "scan": "", "errors": ""}
    if rc != 0:
        return info
    for line in out.splitlines():
        s = line.strip()
        for field in ("state", "scan", "errors"):
            if s.startswith(f"{field}:"):
                info[field] = s.split(":", 1)[1].strip()
                break
    return info


def scrub_start(pool: str) -> tuple[bool, str]:
    """Start a scrub on *pool*; return ``(ok, message)``."""
    rc, out, err = run(["zpool", "scrub", pool])
    return rc == 0, err or out or f"Started scrub on {pool}"


def scrub_stop(pool: str) -> tuple[bool, str]:
    """Stop a running scrub on *pool*; return ``(ok, message)``."""
    rc, out, err = run(["zpool", "scrub", "-s", pool])
    return rc == 0, err or out or f"Stopped scrub on {pool}"


def get_scrub_schedules() -> dict[str, str]:
    """Return pool -> cron expression for managed scrub entries."""
    out: dict[str, str] = {}
    for line in _managed_entries(read_crontab(), CRON_SCRUB_BEGIN, CRON_SCRUB_END):
        m = re.match(
            r"^(\S+\s+\S+\s+\S+\s+\S+\s+\S+)\s+/usr/sbin/zpool\s+scrub\s+(\S+)\s*$",
            line,
        )
        if m:
            out[m.group(2)] = m.group(1)
    return out


def set_scrub_schedule(pool: str, cron_expr: str) -> bool:
    """Upsert the managed scrub schedule for *pool*.

    Returns ``False`` without writing when *cron_expr* is not a five-field
    expression (the only form ``get_scrub_schedules`` can read back).
    """
    normalized = _normalize_cron_expr(cron_expr)
    if normalized is None:
        return False
    content = _rewrite_cron_block(
        read_crontab(),
        CRON_SCRUB_BEGIN,
        CRON_SCRUB_END,
        _is_scrub_entry(pool),
        f"{normalized} /usr/sbin/zpool scrub {pool}",
    )
    return write_crontab(content)


def remove_scrub_schedule(pool: str) -> bool:
    """Remove the managed scrub schedule for *pool*; return write success."""
    content = _rewrite_cron_block(
        read_crontab(),
        CRON_SCRUB_BEGIN,
        CRON_SCRUB_END,
        _is_scrub_entry(pool),
    )
    return write_crontab(content)


# ----------------------- Modals -----------------------

def _selected(select: Select) -> str | None:
    """Return the chosen option of *select* if it is a real string, else ``None``.

    A Select with nothing chosen reports a non-string placeholder value;
    passing that through ``str()`` would leak the placeholder's text into
    commands and crontab entries.
    """
    value = select.value
    return value if isinstance(value, str) else None


class ConfirmModal(ModalScreen[bool]):
    """Yes/No dialog; dismisses with ``True`` only when "Yes" is pressed."""

    def __init__(self, message: str):
        super().__init__()
        self.message = message

    def compose(self) -> ComposeResult:
        with Container(classes="modal-box"):
            yield Label(self.message, classes="modal-title")
            with Horizontal(classes="modal-buttons"):
                yield Button("Yes", variant="error", id="yes")
                yield Button("No", variant="primary", id="no")

    def on_button_pressed(self, event: Button.Pressed) -> None:
        self.dismiss(event.button.id == "yes")


class CreateSnapshotModal(ModalScreen[tuple[str, str] | None]):
    """Dialog returning ``(dataset, snapshot_name)`` or ``None`` when cancelled/incomplete."""

    def __init__(self, datasets: list[str]):
        super().__init__()
        self.datasets = datasets

    def compose(self) -> ComposeResult:
        with Container(classes="modal-box"):
            yield Label("Create Snapshot", classes="modal-title")
            yield Label("Dataset:")
            yield Select(
                [(d, d) for d in self.datasets],
                id="dataset",
                value=self.datasets[0] if self.datasets else None,
            )
            yield Label("Snapshot name:")
            default_name = f"manual-{datetime.now().strftime('%Y%m%d-%H%M%S')}"
            yield Input(value=default_name, id="snap-name")
            with Horizontal(classes="modal-buttons"):
                yield Button("Create", variant="success", id="create")
                yield Button("Cancel", id="cancel")

    def on_button_pressed(self, event: Button.Pressed) -> None:
        if event.button.id == "create":
            dataset = _selected(self.query_one("#dataset", Select))
            name = self.query_one("#snap-name", Input).value.strip()
            if dataset and name:
                self.dismiss((dataset, name))
                return
        self.dismiss(None)


class ScheduleModal(ModalScreen[tuple[str, str, int] | None]):
    """Dialog returning ``(dataset, cron_expr, keep)`` or ``None``."""

    PRESETS = [
        ("Hourly", "0 * * * *"),
        ("Every 6 hours", "0 */6 * * *"),
        ("Daily at midnight", "0 0 * * *"),
        ("Daily at 3am", "0 3 * * *"),
        ("Weekly (Sunday 2am)", "0 2 * * 0"),
        ("Custom...", "custom"),
    ]
    # Used when the "keep" field is not a positive integer.
    DEFAULT_KEEP = 7

    def __init__(self, datasets: list[str]):
        super().__init__()
        self.datasets = datasets

    def compose(self) -> ComposeResult:
        with Container(classes="modal-box"):
            yield Label("Schedule Auto-Snapshot", classes="modal-title")
            yield Label("Dataset:")
            yield Select(
                [(d, d) for d in self.datasets],
                id="dataset",
                value=self.datasets[0] if self.datasets else None,
            )
            yield Label("Frequency:")
            yield Select(
                [(name, val) for name, val in self.PRESETS],
                id="preset",
                value="0 3 * * *",
            )
            yield Label("Custom cron (if selected above):")
            yield Input(placeholder="e.g. 0 */4 * * *", id="custom-cron")
            yield Label("Keep last N snapshots:")
            yield Input(value="7", id="keep")
            with Horizontal(classes="modal-buttons"):
                yield Button("Save", variant="success", id="save")
                yield Button("Cancel", id="cancel")

    def on_button_pressed(self, event: Button.Pressed) -> None:
        if event.button.id == "save":
            dataset = _selected(self.query_one("#dataset", Select))
            preset = _selected(self.query_one("#preset", Select))
            custom = self.query_one("#custom-cron", Input).value.strip()
            keep_str = self.query_one("#keep", Input).value.strip()
            cron_expr = custom if preset == "custom" else preset
            try:
                keep = int(keep_str)
            except ValueError:
                keep = self.DEFAULT_KEEP
            if keep < 1:
                # keep=0 would make the cron job delete every auto snapshot.
                keep = self.DEFAULT_KEEP
            if dataset and cron_expr:
                self.dismiss((dataset, cron_expr, keep))
                return
        self.dismiss(None)


class AddReplicationModal(ModalScreen[tuple[str, str] | None]):
    """Dialog returning ``(source_dataset, target_dataset)`` or ``None``."""

    def __init__(self, sources: list[str]):
        super().__init__()
        self.sources = sources

    def compose(self) -> ComposeResult:
        with Container(classes="modal-box"):
            yield Label("Add Replication Target", classes="modal-title")
            yield Label("Source dataset:")
            yield Select(
                [(d, d) for d in self.sources],
                id="src",
                value=self.sources[0] if self.sources else None,
            )
            yield Label("Target dataset (must be on imported pool):")
            yield Input(placeholder="e.g. backup_ext/RAID1_1TB", id="tgt")
            with Horizontal(classes="modal-buttons"):
                yield Button("Add", variant="success", id="add")
                yield Button("Cancel", id="cancel")

    def on_button_pressed(self, event: Button.Pressed) -> None:
        if event.button.id == "add":
            src = _selected(self.query_one("#src", Select))
            tgt = self.query_one("#tgt", Input).value.strip()
            if src and tgt:
                self.dismiss((src, tgt))
                return
        self.dismiss(None)


class ScrubScheduleModal(ModalScreen[str | None]):
    """Dialog returning a cron expression for a scrub schedule, or ``None``."""

    PRESETS = [
        ("Monthly (1st @ 3am)", "0 3 1 * *"),
        ("Bi-weekly (1st & 15th @ 3am)", "0 3 1,15 * *"),
        ("Weekly (Sunday 3am)", "0 3 * * 0"),
        ("Custom...", "custom"),
    ]

    def compose(self) -> ComposeResult:
        with Container(classes="modal-box"):
            yield Label("Schedule Scrub", classes="modal-title")
            yield Label("Frequency:")
            yield Select(
                [(name, val) for name, val in self.PRESETS],
                id="preset",
                value="0 3 1 * *",
            )
            yield Label("Custom cron (if selected above):")
            yield Input(placeholder="e.g. 0 3 1 * *", id="custom-cron")
            with Horizontal(classes="modal-buttons"):
                yield Button("Save", variant="success", id="save")
                yield Button("Cancel", id="cancel")

    def on_button_pressed(self, event: Button.Pressed) -> None:
        if event.button.id == "save":
            preset = _selected(self.query_one("#preset", Select))
            custom = self.query_one("#custom-cron", Input).value.strip()
            cron_expr = custom if preset == "custom" else preset
            if cron_expr:
                self.dismiss(cron_expr)
                return
        self.dismiss(None)


class AddMountModal(ModalScreen[tuple[str, str, str] | None]):
    """Dialog returning ``(ctid, host_path, container_path)`` or ``None``."""

    def __init__(self, ctids: list[str]):
        super().__init__()
        self.ctids = ctids

    def compose(self) -> ComposeResult:
        with Container(classes="modal-box"):
            yield Label("Add LXC Mountpoint", classes="modal-title")
            yield Label("Container:")
            yield Select(
                [(c, c) for c in self.ctids],
                id="ctid",
                value=self.ctids[0] if self.ctids else None,
            )
            yield Label("Host path (on Proxmox):")
            yield Input(placeholder="/RAID1_1TB/MyData", id="host")
            yield Label("Container path (inside LXC):")
            yield Input(placeholder="/RAID1_1TB/MyData", id="target")
            with Horizontal(classes="modal-buttons"):
                yield Button("Add", variant="success", id="add")
                yield Button("Cancel", id="cancel")

    def on_button_pressed(self, event: Button.Pressed) -> None:
        if event.button.id == "add":
            ctid = _selected(self.query_one("#ctid", Select))
            host = self.query_one("#host", Input).value.strip()
            target = self.query_one("#target", Input).value.strip()
            if ctid and host and target:
                self.dismiss((ctid, host, target))
                return
        self.dismiss(None)


# ----------------------- Main App -----------------------

class PVETui(App):
    """Tabbed TUI over the ZFS, cron, LXC, replication and scrub helpers above."""

    CSS = """
    Screen {
        background: $surface;
    }
    TabbedContent {
        height: 1fr;
    }
    DataTable {
        height: 1fr;
    }
    .toolbar {
        height: auto;
        padding: 1;
        background: $panel;
    }
    .toolbar Button {
        margin-right: 1;
    }
    .status {
        dock: bottom;
        height: 1;
        background: $primary;
        color: $text;
        padding: 0 1;
    }
    .modal-box {
        align: center middle;
        background: $panel;
        border: thick $primary;
        padding: 1 2;
        width: 60;
        height: auto;
    }
    .modal-title {
        text-align: center;
        text-style: bold;
        margin-bottom: 1;
    }
    .modal-buttons {
        align: center middle;
        height: auto;
        margin-top: 1;
    }
    .modal-buttons Button {
        margin: 0 1;
    }
    Label {
        margin-top: 1;
    }
    """

    BINDINGS = [
        Binding("q", "quit", "Quit"),
        Binding("r", "refresh", "Refresh"),
        Binding("n", "new_snapshot", "New Snapshot"),
        Binding("d", "delete_snapshot", "Delete"),
        Binding("s", "schedule", "Schedule"),
    ]

    TITLE = "PVE Storage Manager"
    SUB_TITLE = "ZFS Snapshots + LXC Mountpoints"

    # Button id -> name of the method that handles it.
    _BUTTON_ACTIONS = {
        "btn-new": "_open_create_snapshot",
        "btn-del": "_delete_selected_snapshot",
        "btn-rollback": "_rollback_selected_snapshot",
        "btn-refresh": "refresh_snapshots",
        "btn-sched-add": "_open_schedule",
        "btn-sched-now": "_snapshot_now_selected",
        "btn-sched-del": "_remove_selected_schedule",
        "btn-sched-refresh": "refresh_schedules",
        "btn-ds-refresh": "refresh_datasets",
        "btn-mp-add": "_open_add_mount",
        "btn-mp-del": "_remove_selected_mount",
        "btn-mp-refresh": "refresh_mounts",
        "btn-rep-add": "_open_add_replication",
        "btn-rep-run": "_replicate_selected",
        "btn-rep-del": "_remove_selected_replication",
        "btn-rep-refresh": "refresh_replication",
        "btn-scrub-start": "_start_scrub",
        "btn-scrub-stop": "_stop_scrub",
        "btn-scrub-sched": "_open_scrub_schedule",
        "btn-scrub-unsched": "_unschedule_scrub",
        "btn-scrub-refresh": "refresh_scrub",
    }

    def compose(self) -> ComposeResult:
        yield Header()
        with TabbedContent(initial="snapshots"):
            with TabPane("Snapshots", id="snapshots"):
                with Horizontal(classes="toolbar"):
                    yield Button("New (n)", id="btn-new", variant="success")
                    yield Button("Delete (d)", id="btn-del", variant="error")
                    yield Button("Rollback", id="btn-rollback", variant="warning")
                    yield Button("Refresh (r)", id="btn-refresh")
                yield DataTable(id="snap-table", cursor_type="row")
            with TabPane("Schedules", id="schedules"):
                with Horizontal(classes="toolbar"):
                    yield Button("Add Schedule (s)", id="btn-sched-add", variant="success")
                    yield Button("Snapshot Now", id="btn-sched-now", variant="primary")
                    yield Button("Remove", id="btn-sched-del", variant="error")
                    yield Button("Refresh", id="btn-sched-refresh")
                yield DataTable(id="sched-table", cursor_type="row")
            with TabPane("Datasets", id="datasets"):
                with Horizontal(classes="toolbar"):
                    yield Button("Refresh", id="btn-ds-refresh")
                yield DataTable(id="ds-table", cursor_type="row")
            with TabPane("LXC Mounts", id="mounts"):
                with Horizontal(classes="toolbar"):
                    yield Button("Add Mount", id="btn-mp-add", variant="success")
                    yield Button("Remove Mount", id="btn-mp-del", variant="error")
                    yield Button("Refresh", id="btn-mp-refresh")
                yield DataTable(id="mp-table", cursor_type="row")
            with TabPane("Replication", id="replication"):
                with Horizontal(classes="toolbar"):
                    yield Button("Add Target", id="btn-rep-add", variant="success")
                    yield Button("Replicate Now", id="btn-rep-run", variant="primary")
                    yield Button("Remove", id="btn-rep-del", variant="error")
                    yield Button("Refresh", id="btn-rep-refresh")
                yield DataTable(id="rep-table", cursor_type="row")
            with TabPane("Scrub", id="scrub"):
                with Horizontal(classes="toolbar"):
                    yield Button("Start Scrub", id="btn-scrub-start", variant="success")
                    yield Button("Stop Scrub", id="btn-scrub-stop", variant="warning")
                    yield Button("Schedule", id="btn-scrub-sched")
                    yield Button("Unschedule", id="btn-scrub-unsched", variant="error")
                    yield Button("Refresh", id="btn-scrub-refresh")
                yield DataTable(id="scrub-table", cursor_type="row")
        yield Static("Ready", id="status", classes="status")
        yield Footer()

    def on_mount(self) -> None:
        """Create the columns of every table, then load all data once."""
        table_columns = {
            "#snap-table": ("Snapshot", "Used", "Refer", "Created"),
            "#sched-table": ("Dataset", "Cron", "Keep"),
            "#ds-table": ("Name", "Used", "Available", "Mountpoint"),
            "#mp-table": ("CTID", "Slot", "Host Path", "Container Path"),
            "#rep-table": ("Source", "Target", "Common", "Latest Src", "Status"),
            "#scrub-table": ("Pool", "State", "Last/Current Scan", "Errors", "Schedule"),
        }
        for table_id, columns in table_columns.items():
            self.query_one(table_id, DataTable).add_columns(*columns)
        self.refresh_all()

    def set_status(self, msg: str) -> None:
        """Show *msg* in the bottom status bar."""
        self.query_one("#status", Static).update(msg)

    def _selected_row(self, table_id: str) -> list | None:
        """Return the cells of the row under the cursor in *table_id*, or ``None``."""
        table = self.query_one(table_id, DataTable)
        if table.cursor_row is None:
            return None
        try:
            return table.get_row_at(table.cursor_row)
        except Exception:  # e.g. empty table: there is no row at the cursor
            return None

    # ------------- Refreshers -------------

    def refresh_all(self) -> None:
        """Reload every table and stamp the status bar with the time."""
        self.refresh_snapshots()
        self.refresh_schedules()
        self.refresh_datasets()
        self.refresh_mounts()
        self.refresh_replication()
        self.refresh_scrub()
        self.set_status(f"Refreshed at {datetime.now().strftime('%H:%M:%S')}")

    def refresh_snapshots(self) -> None:
        t = self.query_one("#snap-table", DataTable)
        t.clear()
        for snap in list_snapshots():
            t.add_row(snap["name"], snap["used"], snap["refer"], snap["creation"])

    def refresh_schedules(self) -> None:
        t = self.query_one("#sched-table", DataTable)
        t.clear()
        for s in get_schedules():
            t.add_row(s["dataset"], s["cron"], s["keep"])

    def refresh_datasets(self) -> None:
        t = self.query_one("#ds-table", DataTable)
        t.clear()
        for d in list_datasets():
            t.add_row(d["name"], d["used"], d["avail"], d["mountpoint"])

    def refresh_mounts(self) -> None:
        t = self.query_one("#mp-table", DataTable)
        t.clear()
        for lxc in list_lxc():
            for mp in get_lxc_mountpoints(lxc["id"]):
                t.add_row(lxc["id"], mp["key"], mp["host"], mp["target"])

    def refresh_replication(self) -> None:
        """Reload the replication table, deriving a status for each pair."""
        t = self.query_one("#rep-table", DataTable)
        t.clear()
        for tgt in load_replication_targets():
            src = tgt["source"]
            dst = tgt["target"]
            common = latest_common_snapshot(src, dst) or "-"
            latest = latest_snapshot(src) or "-"
            if not dataset_exists(dst):
                status = "target missing (full send needed)"
            elif latest == "-":
                # Without this check "-" == "-" would be reported as up-to-date.
                status = "no source snapshots"
            elif common == latest:
                status = "up-to-date"
            elif common == "-":
                status = "no common snap (mismatch)"
            else:
                status = "incremental pending"
            t.add_row(src, dst, common, latest, status)

    def refresh_scrub(self) -> None:
        t = self.query_one("#scrub-table", DataTable)
        t.clear()
        sched = get_scrub_schedules()
        for pool in list_pools():
            info = scrub_status(pool)
            t.add_row(
                pool,
                info["state"],
                info["scan"][:60] if info["scan"] else "-",
                info["errors"] or "-",
                sched.get(pool, "-"),
            )

    # ------------- Actions -------------

    def action_refresh(self) -> None:
        self.refresh_all()

    def action_new_snapshot(self) -> None:
        self._open_create_snapshot()

    def action_delete_snapshot(self) -> None:
        self._delete_selected_snapshot()

    def action_schedule(self) -> None:
        self._open_schedule()

    def _get_datasets(self) -> list[str]:
        """Return the names of all datasets."""
        return [d["name"] for d in list_datasets()]

    def _open_create_snapshot(self) -> None:
        datasets = self._get_datasets()
        if not datasets:
            self.set_status("No datasets found")
            return

        def cb(result):
            if result:
                dataset, name = result
                _, msg = create_snapshot(dataset, name)
                self.set_status(msg)
                self.refresh_snapshots()

        self.push_screen(CreateSnapshotModal(datasets), cb)

    def _delete_selected_snapshot(self) -> None:
        row = self._selected_row("#snap-table")
        if row is None:
            return
        snap_name = row[0]

        def cb(confirmed):
            if confirmed:
                _, msg = destroy_snapshot(snap_name)
                self.set_status(msg)
                self.refresh_snapshots()

        self.push_screen(
            ConfirmModal(f"Destroy snapshot:\n{snap_name}?"), cb
        )

    def _rollback_selected_snapshot(self) -> None:
        row = self._selected_row("#snap-table")
        if row is None:
            return
        snap_name = row[0]

        def cb(confirmed):
            if confirmed:
                _, msg = rollback_snapshot(snap_name)
                self.set_status(msg)
                self.refresh_all()

        self.push_screen(
            ConfirmModal(
                f"ROLLBACK to {snap_name}?\n"
                "This will DESTROY all newer snapshots\n"
                "and changes since then!"
            ),
            cb,
        )

    def _open_schedule(self) -> None:
        datasets = self._get_datasets()
        if not datasets:
            self.set_status("No datasets found")
            return

        def cb(result):
            if result:
                dataset, cron_expr, keep = result
                ok = set_schedule(dataset, cron_expr, keep)
                self.set_status(
                    f"Schedule saved for {dataset}" if ok else "Failed to save schedule"
                )
                self.refresh_schedules()

        self.push_screen(ScheduleModal(datasets), cb)

    def _snapshot_now_selected(self) -> None:
        """Take an immediate auto snapshot for the selected schedule row."""
        row = self._selected_row("#sched-table")
        try:
            dataset = str(row[0])
            keep = int(str(row[2]))
        except (TypeError, IndexError, ValueError):  # no row, or unparsable keep
            self.set_status("Select a schedule row first")
            return
        _, msg = snapshot_now(dataset, keep)
        self.set_status(msg)
        self.refresh_snapshots()

    def _remove_selected_schedule(self) -> None:
        row = self._selected_row("#sched-table")
        if row is None:
            return
        dataset = row[0]

        def cb(confirmed):
            if confirmed:
                ok = remove_schedule(dataset)
                self.set_status(
                    f"Removed schedule for {dataset}" if ok else "Failed"
                )
                self.refresh_schedules()

        self.push_screen(
            ConfirmModal(f"Remove schedule for:\n{dataset}?"), cb
        )

    def _open_add_mount(self) -> None:
        ctids = [c["id"] for c in list_lxc()]
        if not ctids:
            self.set_status("No LXC containers found")
            return

        def cb(result):
            if result:
                ctid, host, target = result
                _, msg = add_lxc_mount(ctid, host, target)
                self.set_status(msg)
                self.refresh_mounts()

        self.push_screen(AddMountModal(ctids), cb)

    def _remove_selected_mount(self) -> None:
        row = self._selected_row("#mp-table")
        if row is None:
            return
        ctid, slot = row[0], row[1]

        def cb(confirmed):
            if confirmed:
                _, msg = remove_lxc_mount(ctid, slot)
                self.set_status(msg)
                self.refresh_mounts()

        self.push_screen(
            ConfirmModal(
                f"Remove {slot} from LXC {ctid}?\n(CT must be stopped for some changes)"
            ),
            cb,
        )

    def _open_add_replication(self) -> None:
        sources = self._get_datasets()
        if not sources:
            self.set_status("No datasets to replicate")
            return

        def cb(result):
            if not result:
                return
            src, tgt = result
            targets = load_replication_targets()
            if any(t["source"] == src and t["target"] == tgt for t in targets):
                self.set_status("Target already configured")
                return
            targets.append({"source": src, "target": tgt})
            if save_replication_targets(targets):
                install_replication_script()
                self.set_status(f"Added: {src} → {tgt}")
                self.refresh_replication()
            else:
                self.set_status("Failed to save replication config")

        self.push_screen(AddReplicationModal(sources), cb)

    def _remove_selected_replication(self) -> None:
        row = self._selected_row("#rep-table")
        if row is None:
            return
        src, tgt = str(row[0]), str(row[1])

        def cb(confirmed):
            if confirmed:
                targets = [
                    x for x in load_replication_targets()
                    if not (x["source"] == src and x["target"] == tgt)
                ]
                if save_replication_targets(targets):
                    self.set_status(f"Removed: {src} → {tgt}")
                    self.refresh_replication()

        self.push_screen(
            ConfirmModal(
                f"Remove replication target?\n{src} → {tgt}\n"
                "(This does not destroy the target dataset.)"
            ),
            cb,
        )

    @work(thread=True, exclusive=True, group="replicate")
    def _do_replicate(self, source: str, target: str) -> None:
        """Run the replication in a worker thread, then update the UI from it."""
        _, msg = replicate_dataset(source, target)
        self.app.call_from_thread(self.set_status, msg)
        self.app.call_from_thread(self.refresh_replication)
        self.app.call_from_thread(self.refresh_snapshots)

    def _replicate_selected(self) -> None:
        row = self._selected_row("#rep-table")
        if row is None:
            self.set_status("Select a replication row first")
            return
        src, tgt = str(row[0]), str(row[1])
        self.set_status(f"Replicating {src} → {tgt} ... (this may take a while)")
        self._do_replicate(src, tgt)

    def _scrub_action(self, action: str) -> None:
        """Start (``action == "start"``) or stop (anything else) a scrub on the selected pool."""
        row = self._selected_row("#scrub-table")
        if row is None:
            self.set_status("Select a pool row first")
            return
        pool = str(row[0])
        _, msg = scrub_start(pool) if action == "start" else scrub_stop(pool)
        self.set_status(msg)
        self.refresh_scrub()

    def _start_scrub(self) -> None:
        self._scrub_action("start")

    def _stop_scrub(self) -> None:
        self._scrub_action("stop")

    def _open_scrub_schedule(self) -> None:
        row = self._selected_row("#scrub-table")
        if row is None:
            self.set_status("Select a pool row first")
            return
        pool = str(row[0])

        def cb(cron_expr):
            if cron_expr:
                ok = set_scrub_schedule(pool, cron_expr)
                self.set_status(
                    f"Scrub scheduled for {pool}" if ok else "Failed to save"
                )
                self.refresh_scrub()

        self.push_screen(ScrubScheduleModal(), cb)

    def _unschedule_scrub(self) -> None:
        row = self._selected_row("#scrub-table")
        if row is None:
            return
        pool = str(row[0])

        def cb(confirmed):
            if confirmed:
                ok = remove_scrub_schedule(pool)
                self.set_status(
                    f"Unscheduled scrub for {pool}" if ok else "Failed"
                )
                self.refresh_scrub()

        self.push_screen(ConfirmModal(f"Remove scrub schedule for {pool}?"), cb)

    # ------------- Button handlers -------------

    def on_button_pressed(self, event: Button.Pressed) -> None:
        """Dispatch a toolbar button press to its handler; unknown ids are ignored."""
        handler_name = self._BUTTON_ACTIONS.get(event.button.id)
        if handler_name is not None:
            getattr(self, handler_name)()


if __name__ == "__main__":
    PVETui().run()