root 5eb7572fa1 Move cron management to /etc/cron.d/pve-tui
Stop mutating root's crontab. All snapshot and scrub schedules now live
in a single TUI-owned file at /etc/cron.d/pve-tui, with a one-shot
migrate_legacy_crontab() that extracts pre-existing pve-tui blocks from
the crontab on startup.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-27 23:09:05 +08:00

1347 lines
44 KiB
Python

#!/usr/bin/env python3
"""
PVE Storage Manager TUI
Manages ZFS snapshots, scheduling, and LXC container mountpoints
on RAID1_1TB and Samsung_860_EVO_256GB pools.
Run on Proxmox host as root.
"""
import subprocess
import re
import json
import shlex
from datetime import datetime
from pathlib import Path
from textual import work
from textual.app import App, ComposeResult
from textual.containers import Container, Horizontal, Vertical
from textual.screen import ModalScreen
from textual.widgets import (
Header,
Footer,
DataTable,
Button,
Static,
Input,
Label,
Select,
TabbedContent,
TabPane,
)
from textual.binding import Binding
# ----------------------- ZFS helpers -----------------------
def run(cmd: list[str]) -> tuple[int, str, str]:
    """Execute *cmd* (argument list, no shell) and report its outcome.

    Returns ``(returncode, stdout, stderr)`` with both streams captured as
    text and stripped.  The call is limited to 30 seconds.  Any exception
    (missing binary, timeout, ...) is converted into ``(1, "", str(exc))``
    so callers only ever have to look at the return code.
    """
    try:
        completed = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            timeout=30,
        )
        stdout = completed.stdout.strip()
        stderr = completed.stderr.strip()
        return completed.returncode, stdout, stderr
    except Exception as exc:
        return 1, "", str(exc)
def list_pools() -> list[str]:
    """Return the pool names printed by ``zpool list``, or ``[]`` on failure."""
    status, stdout, _ = run(["zpool", "list", "-H", "-o", "name"])
    return stdout.splitlines() if status == 0 else []
def list_datasets(pool: str | None = None) -> list[dict]:
    """List ZFS datasets as dicts with name/used/avail/mountpoint keys.

    When *pool* is given the listing is restricted to that pool (``-r pool``).
    Lines with fewer than four tab-separated fields are ignored; a failing
    ``zfs list`` yields an empty list.
    """
    command = ["zfs", "list", "-H", "-o", "name,used,avail,mountpoint"]
    if pool:
        command.extend(["-r", pool])
    status, stdout, _ = run(command)
    if status != 0:
        return []
    keys = ("name", "used", "avail", "mountpoint")
    datasets = []
    for record in stdout.splitlines():
        fields = record.split("\t")
        if len(fields) < 4:
            continue
        # zip() stops at the four keys, so extra columns are dropped.
        datasets.append(dict(zip(keys, fields)))
    return datasets
def list_snapshots(pool: str | None = None) -> list[dict]:
    """List ZFS snapshots as dicts with name/used/refer/creation keys.

    When *pool* is given the listing is restricted to that pool (``-r pool``).
    Lines with fewer than four tab-separated fields are ignored; a failing
    ``zfs list`` yields an empty list.
    """
    command = [
        "zfs", "list", "-t", "snapshot", "-H", "-o",
        "name,used,refer,creation",
    ]
    if pool:
        command.extend(["-r", pool])
    status, stdout, _ = run(command)
    if status != 0:
        return []
    keys = ("name", "used", "refer", "creation")
    snapshots = []
    for record in stdout.splitlines():
        fields = record.split("\t")
        if len(fields) < 4:
            continue
        # zip() stops at the four keys, so extra columns are dropped.
        snapshots.append(dict(zip(keys, fields)))
    return snapshots
def create_snapshot(dataset: str, name: str) -> tuple[bool, str]:
    """Run ``zfs snapshot dataset@name``; return ``(success, message)``.

    The message is the command's stderr, else its stdout, else a default
    confirmation text.
    """
    snapshot = f"{dataset}@{name}"
    status, stdout, stderr = run(["zfs", "snapshot", snapshot])
    message = stderr or stdout or f"Created {snapshot}"
    return status == 0, message
def destroy_snapshot(snap: str) -> tuple[bool, str]:
    """Run ``zfs destroy`` on *snap*; return ``(success, message)``.

    The message is the command's stderr, else its stdout, else a default
    confirmation text.
    """
    status, stdout, stderr = run(["zfs", "destroy", snap])
    message = stderr or stdout or f"Destroyed {snap}"
    return status == 0, message
def rollback_snapshot(snap: str) -> tuple[bool, str]:
    """Run ``zfs rollback -r`` to *snap*; return ``(success, message)``.

    The message is the command's stderr, else its stdout, else a default
    confirmation text.
    """
    status, stdout, stderr = run(["zfs", "rollback", "-r", snap])
    message = stderr or stdout or f"Rolled back to {snap}"
    return status == 0, message
def snapshot_now(dataset: str, keep: int = 7) -> tuple[bool, str]:
    """Take an ``auto-<timestamp>`` snapshot of *dataset* and prune old ones.

    After the snapshot succeeds, the dataset's ``auto-`` snapshots are listed
    sorted by creation and the first ones beyond *keep* are destroyed.
    A failed listing or failed destroy does not turn the result into a
    failure; only the initial snapshot does.  Returns ``(success, message)``.
    """
    stamp = datetime.now().strftime("%Y%m%d-%H%M%S")
    snapshot = f"{dataset}@auto-{stamp}"
    status, stdout, stderr = run(["zfs", "snapshot", snapshot])
    if status != 0:
        return False, stderr or stdout or f"snapshot failed for {snapshot}"

    status, stdout, _ = run([
        "zfs", "list", "-H", "-t", "snapshot", "-o", "name",
        "-s", "creation", "-r", dataset,
    ])
    removed = 0
    if status == 0:
        # -r also returns child datasets' snapshots; keep only this dataset's.
        prefix = f"{dataset}@auto-"
        candidates = [n for n in stdout.splitlines() if n.startswith(prefix)]
        surplus = len(candidates) - keep
        if surplus > 0:
            removed = sum(
                1
                for victim in candidates[:surplus]
                if run(["zfs", "destroy", victim])[0] == 0
            )
    if removed:
        return True, f"Created {snapshot} (pruned {removed})"
    return True, f"Created {snapshot}"
# ----------------------- Cron helpers -----------------------
# All TUI-managed cron entries live in /etc/cron.d/pve-tui (single owner file).
# Root's crontab is never modified by this app, except by the one-shot
# migrate_legacy_crontab() which moves any pre-existing pve-tui entries into
# the new file.
# Cron drop-in file holding every snapshot and scrub schedule; it is rewritten
# in full by _write_cron_file().
CRON_FILE = Path("/etc/cron.d/pve-tui")
# Helper script referenced by the snapshot schedule lines; its content is
# written by install_snapshot_script().
SNAPSHOT_SCRIPT = Path("/usr/local/bin/pve-tui-snapshot.sh")
# Fixed preamble placed at the top of CRON_FILE: a "do not edit" banner plus
# the SHELL / PATH / MAILTO assignments.
CRON_HEADER = """# Auto-generated by pve-tui — do not edit manually.
# Manage entries via the TUI Schedules / Scrub tabs.
SHELL=/bin/bash
PATH=/usr/sbin:/usr/bin:/sbin:/bin
MAILTO=\"\"
"""
def _read_cron_lines() -> list[str]:
    """Return the lines of CRON_FILE, or ``[]`` if it is missing or unreadable."""
    if CRON_FILE.exists():
        try:
            content = CRON_FILE.read_text()
            return content.splitlines()
        except Exception:
            return []
    return []
def _parse_cron_file() -> tuple[list[dict], list[dict]]:
    """Parse CRON_FILE into ``(snapshot_entries, scrub_entries)``.

    Snapshot entries are dicts with ``cron``/``dataset``/``keep`` keys (all
    strings); scrub entries have ``cron``/``pool``.  Blank lines, comments and
    lines whose first word contains ``=`` (variable assignments) are skipped,
    as is any line matching neither pattern.
    """
    snapshot_pattern = re.compile(
        r"^(\S+\s+\S+\s+\S+\s+\S+\s+\S+)\s+\S+\s+/usr/local/bin/pve-tui-snapshot\.sh\s+(\S+)\s+(\d+)"
    )
    scrub_pattern = re.compile(
        r"^(\S+\s+\S+\s+\S+\s+\S+\s+\S+)\s+\S+\s+/usr/sbin/zpool\s+scrub\s+(\S+)\s*$"
    )
    snapshot_entries: list[dict] = []
    scrub_entries: list[dict] = []
    for raw_line in _read_cron_lines():
        entry = raw_line.strip()
        if not entry or entry.startswith("#"):
            continue
        if "=" in entry.split()[0]:
            continue
        snap_match = snapshot_pattern.match(entry)
        if snap_match is not None:
            schedule, dataset, keep = snap_match.groups()
            snapshot_entries.append(
                {"cron": schedule, "dataset": dataset, "keep": keep}
            )
            continue
        scrub_match = scrub_pattern.match(entry)
        if scrub_match is not None:
            schedule, pool = scrub_match.groups()
            scrub_entries.append({"cron": schedule, "pool": pool})
    return snapshot_entries, scrub_entries
def _write_cron_file(snapshots: list[dict], scrubs: list[dict]) -> bool:
    """Rewrite CRON_FILE from scratch with the given entries.

    The file consists of CRON_HEADER followed by an optional snapshot section
    and an optional scrub section, every job running as ``root``.  The file is
    written with mode 0644.  Returns False if creating, writing or chmod-ing
    the file raises.
    """
    lines = [CRON_HEADER.rstrip(), ""]
    if snapshots:
        lines.append("# Snapshot schedules (auto-prune keeps last N)")
        lines.extend(
            f"{item['cron']} root {SNAPSHOT_SCRIPT} {item['dataset']} {item['keep']}"
            for item in snapshots
        )
        lines.append("")
    if scrubs:
        lines.append("# Scrub schedules")
        lines.extend(
            f"{item['cron']} root /usr/sbin/zpool scrub {item['pool']}"
            for item in scrubs
        )
        lines.append("")
    content = "\n".join(lines) + "\n"
    try:
        CRON_FILE.parent.mkdir(parents=True, exist_ok=True)
        CRON_FILE.write_text(content)
        CRON_FILE.chmod(0o644)
    except Exception:
        return False
    return True
def install_snapshot_script() -> None:
    """Write the snapshot-and-prune bash helper to SNAPSHOT_SCRIPT (mode 0755).

    The script takes ``<dataset> [keep_count]`` (keep defaults to 7), creates
    an ``auto-<timestamp>`` snapshot and destroys the oldest ``auto-``
    snapshots once their count exceeds keep.  Errors from writing the file
    propagate to the caller.
    """
    destination = SNAPSHOT_SCRIPT
    content = """#!/bin/bash
# Auto-generated by pve-tui. Creates snapshot and prunes old ones.
DATASET="$1"
KEEP="${2:-7}"
PREFIX="auto"
if [ -z "$DATASET" ]; then
echo "Usage: $0 <dataset> [keep_count]"
exit 1
fi
TS=$(date +%Y%m%d-%H%M%S)
/usr/sbin/zfs snapshot "${DATASET}@${PREFIX}-${TS}"
SNAPS=$(/usr/sbin/zfs list -H -t snapshot -o name -s creation "$DATASET" | grep "@${PREFIX}-")
COUNT=$(echo "$SNAPS" | wc -l)
if [ "$COUNT" -gt "$KEEP" ]; then
DEL=$((COUNT - KEEP))
echo "$SNAPS" | head -n "$DEL" | while read S; do
/usr/sbin/zfs destroy "$S"
done
fi
"""
    destination.write_text(content)
    destination.chmod(0o755)
def get_schedules() -> list[dict]:
    """Return the snapshot schedule entries currently stored in CRON_FILE."""
    snapshot_entries, _ = _parse_cron_file()
    return snapshot_entries
def set_schedule(dataset: str, cron_expr: str, keep: int) -> bool:
    """Create or replace the snapshot schedule for *dataset*.

    Args:
        dataset: ZFS dataset name; must be a single whitespace-free word.
        cron_expr: Five-field cron time specification (e.g. ``0 3 * * *``).
        keep: Number of ``auto-`` snapshots to retain; must not be negative.

    Returns:
        True when the cron file was rewritten, False when the input cannot be
        represented in the cron file or the write failed.
    """
    # A cron.d line is "<5 time fields> <user> <command ...>".  Any other
    # field count shifts the user/command columns, and _parse_cron_file()
    # could not read the line back, so the entry would silently vanish on
    # the next rewrite.  Reject such input instead of writing it.
    fields = cron_expr.split()
    if len(fields) != 5:
        return False
    # The dataset is written unquoted and parsed back as a single \S+ token.
    if dataset.split() != [dataset]:
        return False
    # The keep count is parsed back with \d+, which cannot match a sign.
    if keep < 0:
        return False

    install_snapshot_script()
    snaps, scrubs = _parse_cron_file()
    snaps = [s for s in snaps if s["dataset"] != dataset]
    snaps.append(
        # Joining the fields normalizes stray tabs/newlines to single spaces.
        {"cron": " ".join(fields), "dataset": dataset, "keep": str(keep)}
    )
    return _write_cron_file(snaps, scrubs)
def remove_schedule(dataset: str) -> bool:
    """Drop every snapshot schedule for *dataset* and rewrite CRON_FILE.

    Returns the result of the rewrite (True on success).
    """
    snapshot_entries, scrub_entries = _parse_cron_file()
    remaining = [
        entry for entry in snapshot_entries if entry["dataset"] != dataset
    ]
    return _write_cron_file(remaining, scrub_entries)
def migrate_legacy_crontab() -> bool:
    """One-time: move pve-tui blocks from root's crontab into CRON_FILE.

    Nothing happens when CRON_FILE already exists, when ``crontab -l`` fails,
    or when the crontab contains no ``PVE-TUI`` marker.  Otherwise the lines
    between the ``PVE-TUI AUTO SNAPSHOT``/``SCRUB`` ``BEGIN``/``END`` markers
    are parsed into schedule entries, written to CRON_FILE, and removed
    (together with the markers) from the crontab.

    Returns:
        True if the entries were written to CRON_FILE, False if no migration
        ran or the new file could not be created.
    """
    if CRON_FILE.exists():
        return False
    rc, out, _ = run(["crontab", "-l"])
    if rc != 0 or "PVE-TUI" not in out:
        return False

    snaps: list[dict] = []
    scrubs: list[dict] = []
    new_crontab: list[str] = []
    in_snap = False
    in_scrub = False
    snap_re = re.compile(
        r"^(\S+\s+\S+\s+\S+\s+\S+\s+\S+)\s+.*pve-tui-snapshot\.sh\s+(\S+)\s+(\d+)"
    )
    scrub_re = re.compile(
        r"^(\S+\s+\S+\s+\S+\s+\S+\s+\S+)\s+/usr/sbin/zpool\s+scrub\s+(\S+)"
    )
    for line in out.splitlines():
        if "PVE-TUI AUTO SNAPSHOT BEGIN" in line:
            in_snap = True
            continue
        if "PVE-TUI AUTO SNAPSHOT END" in line:
            in_snap = False
            continue
        if "PVE-TUI AUTO SCRUB BEGIN" in line:
            in_scrub = True
            continue
        if "PVE-TUI AUTO SCRUB END" in line:
            in_scrub = False
            continue
        if in_snap:
            # Lines inside a managed block are dropped even if unparseable.
            m = snap_re.match(line)
            if m:
                snaps.append(
                    {"cron": m.group(1), "dataset": m.group(2), "keep": m.group(3)}
                )
            continue
        if in_scrub:
            m = scrub_re.match(line)
            if m:
                scrubs.append({"cron": m.group(1), "pool": m.group(2)})
            continue
        new_crontab.append(line)

    # Persist the new file BEFORE touching the crontab: if either step fails
    # the legacy entries must stay where they are, otherwise the schedules
    # would be lost.
    try:
        install_snapshot_script()
    except OSError:
        return False
    if not _write_cron_file(snaps, scrubs):
        return False

    try:
        # Capture output so crontab cannot scribble over the TUI screen.
        subprocess.run(
            ["crontab", "-"],
            input="\n".join(new_crontab) + "\n",
            capture_output=True,
            text=True,
            timeout=30,
        )
    except (OSError, subprocess.SubprocessError):
        # Best effort: the entries already live in CRON_FILE at this point.
        pass
    return True
# ----------------------- LXC helpers -----------------------
def list_lxc() -> list[dict]:
    """Parse ``pct list`` into dicts with ``id``/``status``/``name`` keys.

    The first output line (header) is skipped, as are rows with fewer than
    three whitespace-separated columns.  The name is taken from the last
    column of at most four.  Returns ``[]`` when ``pct list`` fails.
    """
    status, stdout, _ = run(["pct", "list"])
    if status != 0:
        return []
    containers = []
    for record in stdout.splitlines()[1:]:
        columns = record.split(maxsplit=3)
        if len(columns) < 3:
            continue
        containers.append({
            "id": columns[0],
            "status": columns[1],
            "name": columns[-1],
        })
    return containers
def get_lxc_mountpoints(ctid: str) -> list[dict]:
    """Return the ``mpN`` entries of container *ctid* from ``pct config``.

    Each entry is a dict with ``key`` (e.g. ``mp0``), ``host`` (first
    comma-separated field), ``target`` (value of the ``mp=`` option, or ``""``)
    and ``raw`` (the full value).  Returns ``[]`` when ``pct config`` fails.
    """
    status, stdout, _ = run(["pct", "config", ctid])
    if status != 0:
        return []
    pattern = re.compile(r"^(mp\d+):\s*(.+)$")
    mountpoints = []
    for config_line in stdout.splitlines():
        found = pattern.match(config_line)
        if found is None:
            continue
        slot_key, value = found.groups()
        # value looks like: /host/path,mp=/ct/path,size=8G
        host_path, *options = value.split(",")
        targets = [opt[3:] for opt in options if opt.startswith("mp=")]
        mountpoints.append({
            "key": slot_key,
            "host": host_path,
            # If mp= appears more than once the last occurrence wins.
            "target": targets[-1] if targets else "",
            "raw": value,
        })
    return mountpoints
def add_lxc_mount(ctid: str, host_path: str, ct_path: str) -> tuple[bool, str]:
    """Bind *host_path* into container *ctid* at *ct_path* via ``pct set``.

    The lowest ``mpN`` slot not reported by get_lxc_mountpoints() is used.
    Returns ``(success, message)`` where the message is stderr, else stdout,
    else a default confirmation text.
    """
    taken = {int(entry["key"][2:]) for entry in get_lxc_mountpoints(ctid)}
    # The smallest free slot is always within 0..len(taken).
    slot = min(set(range(len(taken) + 1)) - taken)
    mount_spec = f"{host_path},mp={ct_path}"
    status, stdout, stderr = run(["pct", "set", ctid, f"-mp{slot}", mount_spec])
    message = stderr or stdout or f"Added mp{slot}"
    return status == 0, message
def remove_lxc_mount(ctid: str, key: str) -> tuple[bool, str]:
    """Delete config entry *key* (e.g. ``mp0``) from container *ctid*.

    Returns ``(success, message)`` where the message is stderr, else stdout,
    else a default confirmation text.
    """
    command = ["pct", "set", ctid, "-delete", key]
    status, stdout, stderr = run(command)
    message = stderr or stdout or f"Removed {key}"
    return status == 0, message
# ----------------------- Replication helpers -----------------------
# JSON file storing the configured replication pairs; read by
# load_replication_targets() and written by save_replication_targets().
REPLICATION_CONFIG = Path("/etc/pve-tui/replication.json")
# Destination of the standalone bash helper written by
# install_replication_script().
REPLICATION_SCRIPT = Path("/usr/local/bin/pve-tui-replicate.sh")
def load_replication_targets() -> list[dict]:
    """Load the replication pairs stored in REPLICATION_CONFIG.

    Returns:
        A list of dicts that each carry string ``source`` and ``target``
        values.  A missing, unreadable or unparseable file yields ``[]``, as
        does valid JSON that is not a list; list items without a usable
        ``source``/``target`` are skipped so callers can index both keys
        safely.
    """
    if not REPLICATION_CONFIG.exists():
        return []
    try:
        data = json.loads(REPLICATION_CONFIG.read_text())
    except (OSError, ValueError):
        # ValueError covers both JSON syntax errors and decoding errors.
        return []
    if not isinstance(data, list):
        return []
    return [
        entry
        for entry in data
        if isinstance(entry, dict)
        and isinstance(entry.get("source"), str)
        and isinstance(entry.get("target"), str)
    ]
def save_replication_targets(targets: list[dict]) -> bool:
    """Serialize *targets* as indented JSON into REPLICATION_CONFIG.

    The parent directory is created when missing.  Returns False if anything
    in the process raises, True otherwise.
    """
    try:
        config_dir = REPLICATION_CONFIG.parent
        config_dir.mkdir(parents=True, exist_ok=True)
        payload = json.dumps(targets, indent=2)
        REPLICATION_CONFIG.write_text(payload)
    except Exception:
        return False
    return True
def list_dataset_snapshots(dataset: str) -> list[str]:
    """Return the snapshot short-names of exactly *dataset*.

    The names come from ``zfs list`` sorted by creation, with the
    ``dataset@`` prefix removed.  Snapshots whose dataset part differs from
    *dataset* are ignored.  Returns ``[]`` when the listing fails.
    """
    status, stdout, _ = run([
        "zfs", "list", "-H", "-t", "snapshot", "-o", "name",
        "-s", "creation", dataset,
    ])
    if status != 0:
        return []
    short_names = []
    for entry in stdout.splitlines():
        owner, separator, short = entry.partition("@")
        if separator and owner == dataset:
            short_names.append(short)
    return short_names
def latest_snapshot(dataset: str) -> str | None:
    """Return the last name from list_dataset_snapshots(), or None if empty."""
    names = list_dataset_snapshots(dataset)
    if not names:
        return None
    return names[-1]
def latest_common_snapshot(source: str, target: str) -> str | None:
    """Return the last *source* snapshot name that also exists on *target*.

    Snapshots are matched by short-name only.  Returns None when the two
    datasets share no snapshot name.
    """
    source_names = list_dataset_snapshots(source)
    target_names = set(list_dataset_snapshots(target))
    shared = (name for name in reversed(source_names) if name in target_names)
    return next(shared, None)
def dataset_exists(dataset: str) -> bool:
    """Report whether ``zfs list -H <dataset>`` exits successfully."""
    status = run(["zfs", "list", "-H", dataset])[0]
    return status == 0
def replicate_dataset(source: str, target: str) -> tuple[bool, str]:
    """Replicate *source* to *target* with ``zfs send | zfs receive``.

    A full send is used when *target* does not exist yet; otherwise an
    incremental send (``-I``) runs from the latest snapshot name both sides
    share up to the newest source snapshot.

    Returns:
        ``(success, message)``.  Failure cases are: no source snapshots, an
        existing target without a common snapshot, or a failing pipeline (the
        last line of its error output becomes the message).
    """
    src_snaps = list_dataset_snapshots(source)
    if not src_snaps:
        return False, f"No snapshots on {source}; create one first."
    latest_src = src_snaps[-1]
    latest_ref = shlex.quote(source + "@" + latest_src)

    if dataset_exists(target):
        common = latest_common_snapshot(source, target)
        if not common:
            return False, (
                f"{target} exists but shares no snapshot with {source}. "
                f"Destroy {target} for a fresh full send, or align bases manually."
            )
        if common == latest_src:
            return True, f"Up-to-date: {source}@{common}"
        pipeline = (
            f"zfs send -R -I {shlex.quote(source + '@' + common)} "
            f"{latest_ref} | "
            f"zfs receive -F {shlex.quote(target)}"
        )
        mode = f"incremental {common} → {latest_src}"
    else:
        pipeline = (
            f"zfs send -R {latest_ref} | "
            f"zfs receive {shlex.quote(target)}"
        )
        mode = f"full → {latest_src}"

    # Without pipefail the pipeline's status is that of `zfs receive` alone,
    # so a failing `zfs send` could go unnoticed.
    try:
        proc = subprocess.run(
            ["bash", "-c", "set -o pipefail; " + pipeline],
            capture_output=True,
            text=True,
        )
    except OSError as exc:
        return False, str(exc)
    if proc.returncode != 0:
        detail = proc.stderr.strip() or proc.stdout.strip() or "replication failed"
        return False, detail.splitlines()[-1]
    return True, f"Replicated {source} → {target} ({mode})"
def install_replication_script() -> None:
    """Write the standalone replication bash helper to REPLICATION_SCRIPT.

    The script takes ``<source-dataset> <target-dataset>`` and performs a full
    or incremental ``zfs send | zfs receive``, for cron or manual use.  The
    file is made executable (0755).  Errors from writing propagate.
    """
    destination = REPLICATION_SCRIPT
    content = """#!/bin/bash
# Auto-generated by pve-tui. Incrementally replicates ZFS dataset.
# Usage: pve-tui-replicate.sh <source-dataset> <target-dataset>
set -e
SOURCE="$1"
TARGET="$2"
if [ -z "$SOURCE" ] || [ -z "$TARGET" ]; then
echo "Usage: $0 <source-dataset> <target-dataset>"
exit 1
fi
LATEST=$(/usr/sbin/zfs list -H -t snapshot -o name -s creation "$SOURCE" | tail -n1 | awk -F@ '{print $2}')
if [ -z "$LATEST" ]; then
echo "No snapshots on $SOURCE"; exit 1
fi
if /usr/sbin/zfs list -H "$TARGET" >/dev/null 2>&1; then
# Find latest snapshot name that exists on both source and target.
COMMON=""
for s in $(/usr/sbin/zfs list -H -t snapshot -o name -s creation "$SOURCE" | awk -F@ '{print $2}' | tac); do
if /usr/sbin/zfs list -H -t snapshot "${TARGET}@${s}" >/dev/null 2>&1; then
COMMON="$s"; break
fi
done
if [ -z "$COMMON" ]; then
echo "No common snapshot between $SOURCE and $TARGET; aborting." >&2; exit 2
fi
if [ "$COMMON" = "$LATEST" ]; then
echo "Up-to-date: $SOURCE@$COMMON"; exit 0
fi
/usr/sbin/zfs send -R -I "${SOURCE}@${COMMON}" "${SOURCE}@${LATEST}" | /usr/sbin/zfs receive -F "$TARGET"
echo "Incremental ${COMMON} → ${LATEST}"
else
/usr/sbin/zfs send -R "${SOURCE}@${LATEST}" | /usr/sbin/zfs receive "$TARGET"
echo "Full → ${LATEST}"
fi
"""
    destination.write_text(content)
    destination.chmod(0o755)
# ----------------------- Scrub helpers -----------------------
def scrub_status(pool: str) -> dict:
    """Extract the state/scan/errors lines from ``zpool status <pool>``.

    Returns a dict with ``pool``, ``state`` (``"?"`` if unknown), ``scan`` and
    ``errors`` keys.  When the command fails the defaults are returned
    unchanged.
    """
    info = {"pool": pool, "state": "?", "scan": "", "errors": ""}
    status, stdout, _ = run(["zpool", "status", pool])
    if status != 0:
        return info
    for raw_line in stdout.splitlines():
        text = raw_line.strip()
        for field in ("state", "scan", "errors"):
            if text.startswith(f"{field}:"):
                info[field] = text.split(":", 1)[1].strip()
                break
    return info
def scrub_start(pool: str) -> tuple[bool, str]:
    """Run ``zpool scrub <pool>``; return ``(success, message)``.

    The message is stderr, else stdout, else a default confirmation text.
    """
    status, stdout, stderr = run(["zpool", "scrub", pool])
    message = stderr or stdout or f"Started scrub on {pool}"
    return status == 0, message
def scrub_stop(pool: str) -> tuple[bool, str]:
    """Run ``zpool scrub -s <pool>``; return ``(success, message)``.

    The message is stderr, else stdout, else a default confirmation text.
    """
    status, stdout, stderr = run(["zpool", "scrub", "-s", pool])
    message = stderr or stdout or f"Stopped scrub on {pool}"
    return status == 0, message
def get_scrub_schedules() -> dict[str, str]:
    """Map each pool to the cron expression of its scrub entry in CRON_FILE.

    If a pool appears more than once, the later entry wins.
    """
    _, scrub_entries = _parse_cron_file()
    schedule_by_pool: dict[str, str] = {}
    for entry in scrub_entries:
        schedule_by_pool[entry["pool"]] = entry["cron"]
    return schedule_by_pool
def set_scrub_schedule(pool: str, cron_expr: str) -> bool:
    """Create or replace the scrub schedule for *pool*.

    Args:
        pool: Pool name; must be a single whitespace-free word.
        cron_expr: Five-field cron time specification (e.g. ``0 3 1 * *``).

    Returns:
        True when the cron file was rewritten, False when the input cannot be
        represented in the cron file or the write failed.
    """
    # A cron.d line is "<5 time fields> <user> <command ...>".  Any other
    # field count shifts the user/command columns, and _parse_cron_file()
    # could not read the line back, so the entry would silently vanish on
    # the next rewrite.  Reject such input instead of writing it.
    fields = cron_expr.split()
    if len(fields) != 5:
        return False
    # The pool is written unquoted and parsed back as a single \S+ token.
    if pool.split() != [pool]:
        return False

    snaps, scrubs = _parse_cron_file()
    scrubs = [s for s in scrubs if s["pool"] != pool]
    # Joining the fields normalizes stray tabs/newlines to single spaces.
    scrubs.append({"cron": " ".join(fields), "pool": pool})
    return _write_cron_file(snaps, scrubs)
def remove_scrub_schedule(pool: str) -> bool:
    """Drop every scrub schedule for *pool* and rewrite CRON_FILE.

    Returns the result of the rewrite (True on success).
    """
    snapshot_entries, scrub_entries = _parse_cron_file()
    remaining = [entry for entry in scrub_entries if entry["pool"] != pool]
    return _write_cron_file(snapshot_entries, remaining)
# ----------------------- Modals -----------------------
class ConfirmModal(ModalScreen[bool]):
    """Yes/No dialog that dismisses with True only when "Yes" is pressed."""

    def __init__(self, message: str):
        super().__init__()
        # Text shown as the dialog title.
        self.message = message

    def compose(self) -> ComposeResult:
        """Build the message label and the Yes/No button row."""
        with Container(classes="modal-box"):
            yield Label(self.message, classes="modal-title")
            with Horizontal(classes="modal-buttons"):
                yield Button("Yes", variant="error", id="yes")
                yield Button("No", variant="primary", id="no")

    def on_button_pressed(self, event: Button.Pressed) -> None:
        """Close the dialog, reporting whether the "yes" button was used."""
        confirmed = event.button.id == "yes"
        self.dismiss(confirmed)
class CreateSnapshotModal(ModalScreen[tuple[str, str] | None]):
    """Dialog asking for a dataset and a snapshot name.

    Dismisses with ``(dataset, name)`` on "Create" when both are set,
    otherwise with None.
    """

    def __init__(self, datasets: list[str]):
        super().__init__()
        # Dataset names offered in the drop-down.
        self.datasets = datasets

    def compose(self) -> ComposeResult:
        """Build the dataset selector, the name input and the button row."""
        choices = [(d, d) for d in self.datasets]
        initial = self.datasets[0] if self.datasets else None
        with Container(classes="modal-box"):
            yield Label("Create Snapshot", classes="modal-title")
            yield Label("Dataset:")
            yield Select(choices, id="dataset", value=initial)
            yield Label("Snapshot name:")
            # Pre-fill a timestamped name so "Create" works without typing.
            stamp = datetime.now().strftime('%Y%m%d-%H%M%S')
            yield Input(value=f"manual-{stamp}", id="snap-name")
            with Horizontal(classes="modal-buttons"):
                yield Button("Create", variant="success", id="create")
                yield Button("Cancel", id="cancel")

    def on_button_pressed(self, event: Button.Pressed) -> None:
        """Dismiss with the entered values, or None on cancel/missing input."""
        if event.button.id != "create":
            self.dismiss(None)
            return
        dataset = self.query_one("#dataset", Select).value
        name = self.query_one("#snap-name", Input).value.strip()
        if not (dataset and name):
            self.dismiss(None)
            return
        self.dismiss((str(dataset), name))
class ScheduleModal(ModalScreen[tuple[str, str, int] | None]):
    """Dialog for configuring an auto-snapshot schedule.

    Dismisses with ``(dataset, cron_expr, keep)`` on "Save" when a dataset
    and a cron expression are available, otherwise with None.
    """

    # (label, cron expression) pairs; "custom" means: use the free-text input.
    PRESETS = [
        ("Hourly", "0 * * * *"),
        ("Every 6 hours", "0 */6 * * *"),
        ("Daily at midnight", "0 0 * * *"),
        ("Daily at 3am", "0 3 * * *"),
        ("Weekly (Sunday 2am)", "0 2 * * 0"),
        ("Custom...", "custom"),
    ]

    def __init__(self, datasets: list[str]):
        super().__init__()
        # Dataset names offered in the drop-down.
        self.datasets = datasets

    def compose(self) -> ComposeResult:
        """Build dataset/frequency selectors, custom-cron and keep inputs."""
        dataset_choices = [(d, d) for d in self.datasets]
        initial_dataset = self.datasets[0] if self.datasets else None
        frequency_choices = list(self.PRESETS)
        with Container(classes="modal-box"):
            yield Label("Schedule Auto-Snapshot", classes="modal-title")
            yield Label("Dataset:")
            yield Select(dataset_choices, id="dataset", value=initial_dataset)
            yield Label("Frequency:")
            yield Select(frequency_choices, id="preset", value="0 3 * * *")
            yield Label("Custom cron (if selected above):")
            yield Input(placeholder="e.g. 0 */4 * * *", id="custom-cron")
            yield Label("Keep last N snapshots:")
            yield Input(value="7", id="keep")
            with Horizontal(classes="modal-buttons"):
                yield Button("Save", variant="success", id="save")
                yield Button("Cancel", id="cancel")

    def on_button_pressed(self, event: Button.Pressed) -> None:
        """Dismiss with the chosen schedule, or None on cancel/missing input."""
        if event.button.id != "save":
            self.dismiss(None)
            return
        dataset = self.query_one("#dataset", Select).value
        preset = self.query_one("#preset", Select).value
        custom = self.query_one("#custom-cron", Input).value.strip()
        keep_text = self.query_one("#keep", Input).value.strip()
        cron_expr = preset
        if preset == "custom":
            cron_expr = custom
        try:
            keep = int(keep_text)
        except ValueError:
            # Non-numeric input falls back to the default retention.
            keep = 7
        if not (dataset and cron_expr):
            self.dismiss(None)
            return
        self.dismiss((str(dataset), str(cron_expr), keep))
class AddReplicationModal(ModalScreen[tuple[str, str] | None]):
    """Dialog asking for a replication source and target dataset.

    Dismisses with ``(source, target)`` on "Add" when both are set,
    otherwise with None.
    """

    def __init__(self, sources: list[str]):
        super().__init__()
        # Dataset names offered as replication source.
        self.sources = sources

    def compose(self) -> ComposeResult:
        """Build the source selector, the target input and the button row."""
        choices = [(d, d) for d in self.sources]
        initial = self.sources[0] if self.sources else None
        with Container(classes="modal-box"):
            yield Label("Add Replication Target", classes="modal-title")
            yield Label("Source dataset:")
            yield Select(choices, id="src", value=initial)
            yield Label("Target dataset (must be on imported pool):")
            yield Input(placeholder="e.g. backup_ext/RAID1_1TB", id="tgt")
            with Horizontal(classes="modal-buttons"):
                yield Button("Add", variant="success", id="add")
                yield Button("Cancel", id="cancel")

    def on_button_pressed(self, event: Button.Pressed) -> None:
        """Dismiss with the entered pair, or None on cancel/missing input."""
        result = None
        if event.button.id == "add":
            src = self.query_one("#src", Select).value
            tgt = self.query_one("#tgt", Input).value.strip()
            if src and tgt:
                result = (str(src), tgt)
        self.dismiss(result)
class ScrubScheduleModal(ModalScreen[str | None]):
    """Dialog for choosing a scrub schedule.

    Dismisses with the cron expression on "Save" when one is available,
    otherwise with None.
    """

    # (label, cron expression) pairs; "custom" means: use the free-text input.
    PRESETS = [
        ("Monthly (1st @ 3am)", "0 3 1 * *"),
        ("Bi-weekly (1st & 15th @ 3am)", "0 3 1,15 * *"),
        ("Weekly (Sunday 3am)", "0 3 * * 0"),
        ("Custom...", "custom"),
    ]

    def compose(self) -> ComposeResult:
        """Build the frequency selector, custom-cron input and button row."""
        frequency_choices = list(self.PRESETS)
        with Container(classes="modal-box"):
            yield Label("Schedule Scrub", classes="modal-title")
            yield Label("Frequency:")
            yield Select(frequency_choices, id="preset", value="0 3 1 * *")
            yield Label("Custom cron (if selected above):")
            yield Input(placeholder="e.g. 0 3 1 * *", id="custom-cron")
            with Horizontal(classes="modal-buttons"):
                yield Button("Save", variant="success", id="save")
                yield Button("Cancel", id="cancel")

    def on_button_pressed(self, event: Button.Pressed) -> None:
        """Dismiss with the chosen expression, or None on cancel/empty input."""
        result = None
        if event.button.id == "save":
            preset = self.query_one("#preset", Select).value
            custom = self.query_one("#custom-cron", Input).value.strip()
            cron_expr = custom if preset == "custom" else preset
            if cron_expr:
                result = str(cron_expr)
        self.dismiss(result)
class AddMountModal(ModalScreen[tuple[str, str, str] | None]):
    """Dialog asking for a container, a host path and a container path.

    Dismisses with ``(ctid, host, target)`` on "Add" when all three are set,
    otherwise with None.
    """

    def __init__(self, ctids: list[str]):
        super().__init__()
        # Container IDs offered in the drop-down.
        self.ctids = ctids

    def compose(self) -> ComposeResult:
        """Build the container selector, both path inputs and the button row."""
        choices = [(c, c) for c in self.ctids]
        initial = self.ctids[0] if self.ctids else None
        with Container(classes="modal-box"):
            yield Label("Add LXC Mountpoint", classes="modal-title")
            yield Label("Container:")
            yield Select(choices, id="ctid", value=initial)
            yield Label("Host path (on Proxmox):")
            yield Input(placeholder="/RAID1_1TB/MyData", id="host")
            yield Label("Container path (inside LXC):")
            yield Input(placeholder="/RAID1_1TB/MyData", id="target")
            with Horizontal(classes="modal-buttons"):
                yield Button("Add", variant="success", id="add")
                yield Button("Cancel", id="cancel")

    def on_button_pressed(self, event: Button.Pressed) -> None:
        """Dismiss with the entered values, or None on cancel/missing input."""
        if event.button.id != "add":
            self.dismiss(None)
            return
        ctid = self.query_one("#ctid", Select).value
        host = self.query_one("#host", Input).value.strip()
        target = self.query_one("#target", Input).value.strip()
        if not (ctid and host and target):
            self.dismiss(None)
            return
        self.dismiss((str(ctid), host, target))
# ----------------------- Main App -----------------------
class PVETui(App):
    """Main application: one tab per managed area plus a status line.

    Tabs: Snapshots, Schedules, Datasets, LXC Mounts, Replication and Scrub.
    Each tab has a toolbar of buttons and a DataTable that is filled by the
    matching ``refresh_*`` method.  Toolbar buttons are routed through
    on_button_pressed(); a few actions also have key bindings.
    """

    CSS = """
Screen {
background: $surface;
}
TabbedContent {
height: 1fr;
}
DataTable {
height: 1fr;
}
.toolbar {
height: auto;
padding: 1;
background: $panel;
}
.toolbar Button {
margin-right: 1;
}
.status {
dock: bottom;
height: 1;
background: $primary;
color: $text;
padding: 0 1;
}
.modal-box {
align: center middle;
background: $panel;
border: thick $primary;
padding: 1 2;
width: 60;
height: auto;
}
.modal-title {
text-align: center;
text-style: bold;
margin-bottom: 1;
}
.modal-buttons {
align: center middle;
height: auto;
margin-top: 1;
}
.modal-buttons Button {
margin: 0 1;
}
Label {
margin-top: 1;
}
"""
    BINDINGS = [
        Binding("q", "quit", "Quit"),
        Binding("r", "refresh", "Refresh"),
        Binding("n", "new_snapshot", "New Snapshot"),
        Binding("d", "delete_snapshot", "Delete"),
        Binding("s", "schedule", "Schedule"),
    ]
    TITLE = "PVE Storage Manager"
    SUB_TITLE = "ZFS Snapshots + LXC Mountpoints"

    def compose(self) -> ComposeResult:
        """Build header, the six tabs (toolbar + table each), status, footer."""
        yield Header()
        with TabbedContent(initial="snapshots"):
            with TabPane("Snapshots", id="snapshots"):
                with Horizontal(classes="toolbar"):
                    yield Button("New (n)", id="btn-new", variant="success")
                    yield Button("Delete (d)", id="btn-del", variant="error")
                    yield Button("Rollback", id="btn-rollback", variant="warning")
                    yield Button("Refresh (r)", id="btn-refresh")
                yield DataTable(id="snap-table", cursor_type="row")
            with TabPane("Schedules", id="schedules"):
                with Horizontal(classes="toolbar"):
                    yield Button("Add Schedule (s)", id="btn-sched-add", variant="success")
                    yield Button("Snapshot Now", id="btn-sched-now", variant="primary")
                    yield Button("Remove", id="btn-sched-del", variant="error")
                    yield Button("Refresh", id="btn-sched-refresh")
                yield DataTable(id="sched-table", cursor_type="row")
            with TabPane("Datasets", id="datasets"):
                with Horizontal(classes="toolbar"):
                    yield Button("Refresh", id="btn-ds-refresh")
                yield DataTable(id="ds-table", cursor_type="row")
            with TabPane("LXC Mounts", id="mounts"):
                with Horizontal(classes="toolbar"):
                    yield Button("Add Mount", id="btn-mp-add", variant="success")
                    yield Button("Remove Mount", id="btn-mp-del", variant="error")
                    yield Button("Refresh", id="btn-mp-refresh")
                yield DataTable(id="mp-table", cursor_type="row")
            with TabPane("Replication", id="replication"):
                with Horizontal(classes="toolbar"):
                    yield Button("Add Target", id="btn-rep-add", variant="success")
                    yield Button("Replicate Now", id="btn-rep-run", variant="primary")
                    yield Button("Remove", id="btn-rep-del", variant="error")
                    yield Button("Refresh", id="btn-rep-refresh")
                yield DataTable(id="rep-table", cursor_type="row")
            with TabPane("Scrub", id="scrub"):
                with Horizontal(classes="toolbar"):
                    yield Button("Start Scrub", id="btn-scrub-start", variant="success")
                    yield Button("Stop Scrub", id="btn-scrub-stop", variant="warning")
                    yield Button("Schedule", id="btn-scrub-sched")
                    yield Button("Unschedule", id="btn-scrub-unsched", variant="error")
                    yield Button("Refresh", id="btn-scrub-refresh")
                yield DataTable(id="scrub-table", cursor_type="row")
        yield Static("Ready", id="status", classes="status")
        yield Footer()

    def on_mount(self) -> None:
        """Run the legacy-crontab migration, set up columns, load all data."""
        migrated = migrate_legacy_crontab()
        columns = {
            "#snap-table": ("Snapshot", "Used", "Refer", "Created"),
            "#sched-table": ("Dataset", "Cron", "Keep"),
            "#ds-table": ("Name", "Used", "Available", "Mountpoint"),
            "#mp-table": ("CTID", "Slot", "Host Path", "Container Path"),
            "#rep-table": ("Source", "Target", "Common", "Latest Src", "Status"),
            "#scrub-table": ("Pool", "State", "Last/Current Scan", "Errors", "Schedule"),
        }
        for table_id, headers in columns.items():
            self.query_one(table_id, DataTable).add_columns(*headers)
        self.refresh_all()
        # refresh_all() ends by writing its own status text, so the migration
        # notice has to be set afterwards to stay visible.
        if migrated:
            self.set_status("Migrated legacy crontab entries → /etc/cron.d/pve-tui")

    def set_status(self, msg: str) -> None:
        """Show *msg* in the status line at the bottom of the screen."""
        self.query_one("#status", Static).update(msg)

    def refresh_all(self) -> None:
        """Reload every table and stamp the status line with the time."""
        self.refresh_snapshots()
        self.refresh_schedules()
        self.refresh_datasets()
        self.refresh_mounts()
        self.refresh_replication()
        self.refresh_scrub()
        self.set_status(f"Refreshed at {datetime.now().strftime('%H:%M:%S')}")

    def refresh_snapshots(self) -> None:
        """Reload the snapshot table from list_snapshots()."""
        t = self.query_one("#snap-table", DataTable)
        t.clear()
        for snap in list_snapshots():
            t.add_row(snap["name"], snap["used"], snap["refer"], snap["creation"])

    def refresh_schedules(self) -> None:
        """Reload the schedule table from get_schedules()."""
        t = self.query_one("#sched-table", DataTable)
        t.clear()
        for s in get_schedules():
            t.add_row(s["dataset"], s["cron"], s["keep"])

    def refresh_datasets(self) -> None:
        """Reload the dataset table from list_datasets()."""
        t = self.query_one("#ds-table", DataTable)
        t.clear()
        for d in list_datasets():
            t.add_row(d["name"], d["used"], d["avail"], d["mountpoint"])

    def refresh_mounts(self) -> None:
        """Reload the mountpoint table for every container from list_lxc()."""
        t = self.query_one("#mp-table", DataTable)
        t.clear()
        for lxc in list_lxc():
            for mp in get_lxc_mountpoints(lxc["id"]):
                t.add_row(lxc["id"], mp["key"], mp["host"], mp["target"])

    def refresh_replication(self) -> None:
        """Reload the replication table and derive a status per pair."""
        t = self.query_one("#rep-table", DataTable)
        t.clear()
        for tgt in load_replication_targets():
            src = tgt["source"]
            dst = tgt["target"]
            common = latest_common_snapshot(src, dst) or "-"
            latest = latest_snapshot(src) or "-"
            if not dataset_exists(dst):
                status = "target missing (full send needed)"
            elif latest == "-":
                # Without this check "-" == "-" below would claim up-to-date.
                status = "no snapshots on source"
            elif common == latest:
                status = "up-to-date"
            elif common == "-":
                status = "no common snap (mismatch)"
            else:
                status = "incremental pending"
            t.add_row(src, dst, common, latest, status)

    def refresh_scrub(self) -> None:
        """Reload the scrub table: one row per pool with status and schedule."""
        t = self.query_one("#scrub-table", DataTable)
        t.clear()
        sched = get_scrub_schedules()
        for pool in list_pools():
            info = scrub_status(pool)
            t.add_row(
                pool,
                info["state"],
                # Keep the scan column short; the full text can be long.
                info["scan"][:60] if info["scan"] else "-",
                info["errors"] or "-",
                sched.get(pool, "-"),
            )

    # ------------- Actions -------------
    def action_refresh(self) -> None:
        """Key binding ``r``: reload everything."""
        self.refresh_all()

    def action_new_snapshot(self) -> None:
        """Key binding ``n``: open the create-snapshot dialog."""
        self._open_create_snapshot()

    def action_delete_snapshot(self) -> None:
        """Key binding ``d``: delete the selected snapshot (with confirmation)."""
        self._delete_selected_snapshot()

    def action_schedule(self) -> None:
        """Key binding ``s``: open the schedule dialog."""
        self._open_schedule()

    def _get_datasets(self) -> list[str]:
        """Return the names of all datasets."""
        return [d["name"] for d in list_datasets()]

    def _selected_row(self, table_id: str, hint: str | None = None):
        """Return the row under the cursor of *table_id*, or None.

        When no row is available and *hint* is given, the hint is shown in
        the status line.
        """
        table = self.query_one(table_id, DataTable)
        row = None
        if table.cursor_row is not None:
            try:
                row = table.get_row_at(table.cursor_row)
            except Exception:
                # e.g. empty table: the cursor index has no matching row.
                row = None
        if row is None and hint:
            self.set_status(hint)
        return row

    def _open_create_snapshot(self) -> None:
        """Ask for dataset/name and create the snapshot."""
        datasets = self._get_datasets()
        if not datasets:
            self.set_status("No datasets found")
            return

        def cb(result):
            if result:
                dataset, name = result
                ok, msg = create_snapshot(dataset, name)
                self.set_status(msg)
                self.refresh_snapshots()

        self.push_screen(CreateSnapshotModal(datasets), cb)

    def _delete_selected_snapshot(self) -> None:
        """Destroy the selected snapshot after confirmation."""
        row = self._selected_row("#snap-table")
        if row is None:
            return
        snap_name = str(row[0])

        def cb(confirmed):
            if confirmed:
                ok, msg = destroy_snapshot(snap_name)
                self.set_status(msg)
                self.refresh_snapshots()

        self.push_screen(
            ConfirmModal(f"Destroy snapshot:\n{snap_name}?"), cb
        )

    def _rollback_selected_snapshot(self) -> None:
        """Roll back to the selected snapshot after confirmation."""
        row = self._selected_row("#snap-table")
        if row is None:
            return
        snap_name = str(row[0])

        def cb(confirmed):
            if confirmed:
                ok, msg = rollback_snapshot(snap_name)
                self.set_status(msg)
                self.refresh_all()

        self.push_screen(
            ConfirmModal(
                f"ROLLBACK to {snap_name}?\n"
                "This will DESTROY all newer snapshots\n"
                "and changes since then!"
            ), cb
        )

    def _open_schedule(self) -> None:
        """Ask for a snapshot schedule and store it."""
        datasets = self._get_datasets()
        if not datasets:
            self.set_status("No datasets found")
            return

        def cb(result):
            if result:
                dataset, cron_expr, keep = result
                ok = set_schedule(dataset, cron_expr, keep)
                self.set_status(
                    f"Schedule saved for {dataset}" if ok else "Failed to save schedule"
                )
                self.refresh_schedules()

        self.push_screen(ScheduleModal(datasets), cb)

    def _snapshot_now_selected(self) -> None:
        """Run snapshot_now() for the selected schedule row."""
        row = self._selected_row("#sched-table", "Select a schedule row first")
        if row is None:
            return
        try:
            dataset = str(row[0])
            keep = int(str(row[2]))
        except Exception:
            self.set_status("Select a schedule row first")
            return
        ok, msg = snapshot_now(dataset, keep)
        self.set_status(msg)
        self.refresh_snapshots()

    def _remove_selected_schedule(self) -> None:
        """Remove the selected snapshot schedule after confirmation."""
        row = self._selected_row("#sched-table")
        if row is None:
            return
        dataset = str(row[0])

        def cb(confirmed):
            if confirmed:
                ok = remove_schedule(dataset)
                self.set_status(
                    f"Removed schedule for {dataset}" if ok else "Failed"
                )
                self.refresh_schedules()

        self.push_screen(
            ConfirmModal(f"Remove schedule for:\n{dataset}?"), cb
        )

    def _open_add_mount(self) -> None:
        """Ask for container and paths, then add the mountpoint."""
        ctids = [c["id"] for c in list_lxc()]
        if not ctids:
            self.set_status("No LXC containers found")
            return

        def cb(result):
            if result:
                ctid, host, target = result
                ok, msg = add_lxc_mount(ctid, host, target)
                self.set_status(msg)
                self.refresh_mounts()

        self.push_screen(AddMountModal(ctids), cb)

    def _open_add_replication(self) -> None:
        """Ask for a source/target pair and add it to the replication config."""
        sources = self._get_datasets()
        if not sources:
            self.set_status("No datasets to replicate")
            return

        def cb(result):
            if result:
                src, tgt = result
                targets = load_replication_targets()
                for t in targets:
                    if t["source"] == src and t["target"] == tgt:
                        self.set_status("Target already configured")
                        return
                targets.append({"source": src, "target": tgt})
                if save_replication_targets(targets):
                    install_replication_script()
                    self.set_status(f"Added: {src} → {tgt}")
                    self.refresh_replication()
                else:
                    self.set_status("Failed to save replication config")

        self.push_screen(AddReplicationModal(sources), cb)

    def _remove_selected_replication(self) -> None:
        """Remove the selected pair from the replication config."""
        row = self._selected_row("#rep-table")
        if row is None:
            return
        src, tgt = str(row[0]), str(row[1])

        def cb(confirmed):
            if confirmed:
                targets = [
                    x for x in load_replication_targets()
                    if not (x["source"] == src and x["target"] == tgt)
                ]
                if save_replication_targets(targets):
                    self.set_status(f"Removed: {src} → {tgt}")
                    self.refresh_replication()

        self.push_screen(
            ConfirmModal(
                f"Remove replication target?\n{src} → {tgt}\n"
                "(This does not destroy the target dataset.)"
            ),
            cb,
        )

    @work(thread=True, exclusive=True, group="replicate")
    def _do_replicate(self, source: str, target: str) -> None:
        """Worker thread: replicate, then update the UI from the app thread."""
        ok, msg = replicate_dataset(source, target)
        self.app.call_from_thread(self.set_status, msg)
        self.app.call_from_thread(self.refresh_replication)
        self.app.call_from_thread(self.refresh_snapshots)

    def _replicate_selected(self) -> None:
        """Start replication of the selected pair in the background."""
        row = self._selected_row("#rep-table", "Select a replication row first")
        if row is None:
            return
        src, tgt = str(row[0]), str(row[1])
        self.set_status(f"Replicating {src} → {tgt} ... (this may take a while)")
        self._do_replicate(src, tgt)

    def _scrub_action(self, action: str) -> None:
        """Start (``action == "start"``) or stop a scrub on the selected pool."""
        row = self._selected_row("#scrub-table", "Select a pool row first")
        if row is None:
            return
        pool = str(row[0])
        if action == "start":
            ok, msg = scrub_start(pool)
        else:
            ok, msg = scrub_stop(pool)
        self.set_status(msg)
        self.refresh_scrub()

    def _open_scrub_schedule(self) -> None:
        """Ask for a scrub schedule for the selected pool and store it."""
        row = self._selected_row("#scrub-table", "Select a pool row first")
        if row is None:
            return
        pool = str(row[0])

        def cb(cron_expr):
            if cron_expr:
                ok = set_scrub_schedule(pool, cron_expr)
                self.set_status(
                    f"Scrub scheduled for {pool}" if ok else "Failed to save"
                )
                self.refresh_scrub()

        self.push_screen(ScrubScheduleModal(), cb)

    def _unschedule_scrub(self) -> None:
        """Remove the scrub schedule of the selected pool after confirmation."""
        row = self._selected_row("#scrub-table")
        if row is None:
            return
        pool = str(row[0])

        def cb(confirmed):
            if confirmed:
                ok = remove_scrub_schedule(pool)
                self.set_status(
                    f"Unscheduled scrub for {pool}" if ok else "Failed"
                )
                self.refresh_scrub()

        self.push_screen(ConfirmModal(f"Remove scrub schedule for {pool}?"), cb)

    def _remove_selected_mount(self) -> None:
        """Remove the selected mountpoint after confirmation."""
        row = self._selected_row("#mp-table")
        if row is None:
            return
        ctid = str(row[0])
        slot = str(row[1])

        def cb(confirmed):
            if confirmed:
                ok, msg = remove_lxc_mount(ctid, slot)
                self.set_status(msg)
                self.refresh_mounts()

        self.push_screen(
            ConfirmModal(f"Remove {slot} from LXC {ctid}?\n(CT must be stopped for some changes)"),
            cb
        )

    # ------------- Button handlers -------------
    def on_button_pressed(self, event: Button.Pressed) -> None:
        """Route a toolbar button press to its handler; unknown ids are ignored."""
        handlers = {
            "btn-new": self._open_create_snapshot,
            "btn-del": self._delete_selected_snapshot,
            "btn-rollback": self._rollback_selected_snapshot,
            "btn-refresh": self.refresh_snapshots,
            "btn-sched-add": self._open_schedule,
            "btn-sched-now": self._snapshot_now_selected,
            "btn-sched-del": self._remove_selected_schedule,
            "btn-sched-refresh": self.refresh_schedules,
            "btn-ds-refresh": self.refresh_datasets,
            "btn-mp-add": self._open_add_mount,
            "btn-mp-del": self._remove_selected_mount,
            "btn-mp-refresh": self.refresh_mounts,
            "btn-rep-add": self._open_add_replication,
            "btn-rep-run": self._replicate_selected,
            "btn-rep-del": self._remove_selected_replication,
            "btn-rep-refresh": self.refresh_replication,
            "btn-scrub-start": lambda: self._scrub_action("start"),
            "btn-scrub-stop": lambda: self._scrub_action("stop"),
            "btn-scrub-sched": self._open_scrub_schedule,
            "btn-scrub-unsched": self._unschedule_scrub,
            "btn-scrub-refresh": self.refresh_scrub,
        }
        handler = handlers.get(event.button.id)
        if handler is not None:
            handler()
# Start the TUI only when this file is executed as a script.
if __name__ == "__main__":
    app = PVETui()
    app.run()