Some checks failed
CI / Frontend Lint & Type Check (push) Has been cancelled
CI / Frontend Build (push) Has been cancelled
CI / Backend Lint (push) Has been cancelled
CI / Backend Tests (push) Has been cancelled
CI / Docker Build (push) Has been cancelled
CI / Security Scan (push) Has been cancelled
Deploy / Build & Push Images (push) Has been cancelled
Deploy / Deploy to Server (push) Has been cancelled
Deploy / Notify (push) Has been cancelled
202 lines
5.6 KiB
Python
202 lines
5.6 KiB
Python
"""
|
|
DB backup utilities (4B Ops).
|
|
|
|
Supports:
|
|
- SQLite: file copy + integrity_check verification
|
|
- Postgres: pg_dump custom format + pg_restore --list verification
|
|
|
|
This is real ops code: it will fail loudly if the platform tooling isn't available.
|
|
"""
|
|
|
|
from __future__ import annotations

import os
import shutil
import subprocess
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Optional

from sqlalchemy.engine.url import make_url

from app.config import get_settings
|
|
|
|
|
|
settings = get_settings()
|
|
|
|
|
|
@dataclass(frozen=True)
class BackupResult:
    """Immutable record describing one completed backup attempt."""

    # Absolute path of the backup artifact on disk.
    path: str
    # Size of the artifact in bytes at creation time.
    size_bytes: int
    # UTC ISO-8601 timestamp (with a trailing "Z") of when the backup ran.
    created_at: str
    # True when the post-write verification step passed (or was skipped).
    verified: bool
    # Raw verification output (e.g. PRAGMA integrity_check result), if any.
    verification_detail: Optional[str] = None
|
|
|
|
|
|
def _backup_root() -> Path:
    """Resolve the backup directory from settings, creating it if absent."""
    configured = Path(settings.backup_dir)
    # Relative paths are anchored at the backend working directory by default.
    root = configured if configured.is_absolute() else (Path.cwd() / configured).resolve()
    root.mkdir(parents=True, exist_ok=True)
    return root
|
|
|
|
|
|
def _timestamp() -> str:
|
|
return datetime.utcnow().strftime("%Y%m%dT%H%M%SZ")
|
|
|
|
|
|
def _cleanup_old_backups(root: Path, retention_days: int) -> int:
|
|
if retention_days <= 0:
|
|
return 0
|
|
cutoff = datetime.utcnow() - timedelta(days=retention_days)
|
|
removed = 0
|
|
for p in root.glob("*"):
|
|
if not p.is_file():
|
|
continue
|
|
try:
|
|
mtime = datetime.utcfromtimestamp(p.stat().st_mtime)
|
|
if mtime < cutoff:
|
|
p.unlink()
|
|
removed += 1
|
|
except Exception:
|
|
continue
|
|
return removed
|
|
|
|
|
|
def _sqlite_path_from_url(database_url: str) -> Path:
    """Extract the on-disk SQLite file path from a SQLAlchemy database URL.

    Raises:
        RuntimeError: If the URL has no database path component.
    """
    db_file = make_url(database_url).database
    if not db_file:
        raise RuntimeError("SQLite database path missing in DATABASE_URL")
    resolved = Path(db_file)
    if resolved.is_absolute():
        return resolved
    # Relative paths are interpreted against the backend working directory.
    return (Path.cwd() / resolved).resolve()
|
|
|
|
|
|
def _verify_sqlite(path: Path) -> tuple[bool, str]:
|
|
import sqlite3
|
|
|
|
conn = sqlite3.connect(str(path))
|
|
try:
|
|
row = conn.execute("PRAGMA integrity_check;").fetchone()
|
|
ok = bool(row and str(row[0]).lower() == "ok")
|
|
return ok, str(row[0]) if row else "no result"
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
def _pg_dump_backup(database_url: str, out_file: Path) -> None:
    """Dump a Postgres database to *out_file* in pg_dump custom format.

    Connection details come from *database_url*; the password (if present)
    travels via the ``PGPASSWORD`` environment variable so it never appears
    on the command line.

    Raises:
        RuntimeError: If the URL has no database name or pg_dump exits non-zero.
    """
    url = make_url(database_url)
    if not url.database:
        raise RuntimeError("Postgres database name missing in DATABASE_URL")

    env = os.environ.copy()
    if url.password:
        env["PGPASSWORD"] = str(url.password)

    cmd = [
        "pg_dump",
        "--format=custom",
        "--no-owner",
        "--no-privileges",
        "--file",
        str(out_file),
    ]
    # Only pass connection flags the URL actually specifies.
    for flag, value in (("--host", url.host), ("--port", url.port), ("--username", url.username)):
        if value:
            cmd += [flag, str(value)]
    cmd.append(str(url.database))

    proc = subprocess.run(cmd, env=env, capture_output=True, text=True)
    if proc.returncode != 0:
        raise RuntimeError(f"pg_dump failed: {proc.stderr.strip() or proc.stdout.strip()}")
|
|
|
|
|
|
def _verify_pg_dump(out_file: Path) -> tuple[bool, str]:
|
|
# Basic size check
|
|
if out_file.stat().st_size < 1024:
|
|
return False, "backup file too small"
|
|
|
|
proc = subprocess.run(
|
|
["pg_restore", "--list", str(out_file)],
|
|
capture_output=True,
|
|
text=True,
|
|
)
|
|
if proc.returncode != 0:
|
|
return False, proc.stderr.strip() or proc.stdout.strip() or "pg_restore failed"
|
|
return True, "pg_restore --list OK"
|
|
|
|
|
|
def create_backup(*, verify: bool = True) -> BackupResult:
    """Create a database backup and (optionally) verify it.

    Dispatches on the configured DATABASE_URL driver:

    - ``sqlite``: copies the DB file, then runs ``PRAGMA integrity_check``.
    - ``postgresql``: runs ``pg_dump --format=custom``, then parses the dump
      with ``pg_restore --list``.

    Backups older than the retention window are pruned first.

    Args:
        verify: When True (default), a failed verification raises instead of
            returning an unverified artifact.

    Returns:
        BackupResult describing the artifact that was written.

    Raises:
        RuntimeError: On a missing source DB, tool failure, failed
            verification, or an unsupported driver.
    """
    root = _backup_root()
    _cleanup_old_backups(root, settings.backup_retention_days)

    db_url = settings.database_url
    driver = make_url(db_url).drivername
    # Timezone-aware now (datetime.utcnow() is deprecated since Python 3.12);
    # the replace() keeps the historical "...Z" suffix format.
    created_at = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")

    if driver.startswith("sqlite"):
        src = _sqlite_path_from_url(db_url)
        if not src.exists():
            raise RuntimeError(f"SQLite DB file not found: {src}")
        out = root / f"sqlite-backup-{_timestamp()}{src.suffix or '.db'}"
        # copy2 preserves mtime, which the retention cleanup keys off.
        shutil.copy2(src, out)
        ok = True
        detail = None
        if verify:
            ok, detail = _verify_sqlite(out)
            if not ok:
                raise RuntimeError(f"SQLite backup verification failed: {detail}")
        # NOTE(review): with verify=False the result still reports
        # verified=True (pre-existing behavior) — confirm callers expect this.
        return BackupResult(
            path=str(out),
            size_bytes=out.stat().st_size,
            created_at=created_at,
            verified=ok,
            verification_detail=detail,
        )

    if driver.startswith("postgresql"):
        out = root / f"pg-backup-{_timestamp()}.dump"
        _pg_dump_backup(db_url, out)
        ok = True
        detail = None
        if verify:
            ok, detail = _verify_pg_dump(out)
            if not ok:
                raise RuntimeError(f"Postgres backup verification failed: {detail}")
        return BackupResult(
            path=str(out),
            size_bytes=out.stat().st_size,
            created_at=created_at,
            verified=ok,
            verification_detail=detail,
        )

    raise RuntimeError(f"Unsupported database driver for backups: {driver}")
|
|
|
|
|
|
def list_backups(limit: int = 20) -> list[dict]:
    """Return metadata for the newest backup files, most recent first.

    Args:
        limit: Maximum number of entries. NOTE(review): values <= 0 are
            clamped to 1 by ``max(1, limit)``, so at least one entry is
            returned whenever any backup exists — confirm this is intended.

    Returns:
        List of dicts with ``name``, ``path``, ``size_bytes``, and
        ``modified_at`` (UTC ISO-8601 with a trailing "Z").
    """
    root = _backup_root()
    files = [p for p in root.glob("*") if p.is_file()]
    files.sort(key=lambda p: p.stat().st_mtime, reverse=True)
    out: list[dict] = []
    for p in files[: max(1, limit)]:
        st = p.stat()
        # datetime.utcfromtimestamp() is deprecated since Python 3.12; build
        # the identical "...Z" string from an aware UTC datetime instead.
        modified = datetime.fromtimestamp(st.st_mtime, tz=timezone.utc)
        out.append(
            {
                "name": p.name,
                "path": str(p),
                "size_bytes": st.st_size,
                "modified_at": modified.isoformat().replace("+00:00", "Z"),
            }
        )
    return out
|
|
|