pounce/backend/app/services/zone_retention.py
Yves Gugger 77e3e9dc1f fix: Zone file persistence + .li TSIG key correction
Zone File Storage:
- Persistent storage in /data/pounce/zones/ (not /tmp)
- 3-day retention for historical snapshots
- Volume mounts in CI/CD pipeline
- New zone_retention.py for snapshot management

Bug Fix:
- Fixed wrong TSIG key for .li zone transfer
- Key was corrupted, causing BADSIG errors
- Now using official Switch.ch key

Config Changes:
- Added switch_data_dir setting
- Added zone_retention_days setting (default: 3)
- CZDS path now defaults to /data/czds
2025-12-20 21:21:37 +01:00

231 lines
7.4 KiB
Python

"""
Zone File Retention Management
==============================
Manages historical zone file snapshots with configurable retention period.
Default: 3 days of history for reliable drop detection.
Features:
- Daily snapshots with timestamps
- Automatic cleanup of old snapshots
- Reliable diff calculation across multiple days
"""
import logging
import shutil
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Optional

from app.config import get_settings
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# Application settings, fetched once when this module is imported. The code
# below reads czds_data_dir, switch_data_dir and zone_retention_days from it.
settings = get_settings()
class ZoneRetentionManager:
    """
    Manage dated zone-file snapshots under a retention policy.

    Directory structure:
        /data/czds/
            xyz_domains.txt             <- current/latest
            xyz_domains_2024-01-15.txt  <- daily snapshot
            xyz_domains_2024-01-14.txt
            xyz_domains_2024-01-13.txt

    Dates generated by this class are naive datetimes expressed in UTC;
    only the calendar date ends up in a snapshot's file name.
    """

    # strftime/strptime format of the date suffix in snapshot file names.
    _DATE_FORMAT = "%Y-%m-%d"

    def __init__(self, data_dir: Optional[Path] = None, retention_days: Optional[int] = None):
        """
        Create the manager and make sure its data directory exists.

        Args:
            data_dir: Directory holding the snapshots
                (defaults to settings.czds_data_dir).
            retention_days: Days of history to keep. None or 0 falls back to
                settings.zone_retention_days, so the configured value is
                actually honoured when the caller does not pass one.
        """
        self.data_dir = Path(data_dir or settings.czds_data_dir)
        self.retention_days = retention_days or settings.zone_retention_days
        self.data_dir.mkdir(parents=True, exist_ok=True)

    @staticmethod
    def _utcnow() -> datetime:
        """Return the current UTC time as a naive datetime."""
        # datetime.utcnow() is deprecated; this yields the same naive value,
        # which stays comparable with the naive dates parsed from file names.
        return datetime.now(timezone.utc).replace(tzinfo=None)

    @classmethod
    def _snapshot_date(cls, file_path: Path) -> Optional[datetime]:
        """Parse the trailing YYYY-MM-DD of a file stem; None if it is not a date."""
        try:
            return datetime.strptime(file_path.stem[-10:], cls._DATE_FORMAT)
        except ValueError:
            return None

    def get_snapshot_path(self, tld: str, date: datetime) -> Path:
        """Get path for a dated snapshot."""
        date_str = date.strftime(self._DATE_FORMAT)
        return self.data_dir / f"{tld}_domains_{date_str}.txt"

    def get_current_path(self, tld: str) -> Path:
        """Get path for current (latest) snapshot."""
        return self.data_dir / f"{tld}_domains.txt"

    def save_snapshot(self, tld: str, domains: set[str], date: Optional[datetime] = None):
        """
        Save a domain snapshot with date suffix and update current.

        Args:
            tld: The TLD (e.g., 'xyz', 'ch')
            domains: Set of domain names
            date: Optional date for snapshot (defaults to today, UTC)
        """
        if date is None:
            date = self._utcnow()

        # One domain per line, sorted so successive snapshots diff cleanly.
        content = "\n".join(sorted(domains))

        # Explicit encoding: do not depend on the process locale.
        snapshot_path = self.get_snapshot_path(tld, date)
        snapshot_path.write_text(content, encoding="utf-8")

        # The undated file always mirrors the most recently saved snapshot.
        self.get_current_path(tld).write_text(content, encoding="utf-8")

        logger.info(f"Saved .{tld} snapshot: {len(domains):,} domains -> {snapshot_path.name}")

    def load_snapshot(self, tld: str, date: Optional[datetime] = None) -> Optional[set[str]]:
        """
        Load a snapshot from a specific date.

        Args:
            tld: The TLD
            date: Date to load (None = current/latest)

        Returns:
            Set of domain names, or None if the file is missing or unreadable
        """
        if date is None:
            path = self.get_current_path(tld)
        else:
            path = self.get_snapshot_path(tld, date)

        try:
            content = path.read_text(encoding="utf-8")
        except FileNotFoundError:
            # A missing snapshot is an expected outcome, not worth a warning.
            return None
        except (OSError, UnicodeError) as e:
            logger.warning(f"Failed to load snapshot {path.name}: {e}")
            return None

        # Blank and whitespace-only lines are ignored.
        return {domain for line in content.splitlines() if (domain := line.strip())}

    def get_previous_snapshot(self, tld: str, days_ago: int = 1) -> Optional[set[str]]:
        """
        Load snapshot from N days ago.

        Args:
            tld: The TLD
            days_ago: How many days back to look

        Returns:
            Set of domain names or None
        """
        target_date = self._utcnow() - timedelta(days=days_ago)
        return self.load_snapshot(tld, target_date)

    def cleanup_old_snapshots(self, tld: Optional[str] = None) -> int:
        """
        Remove snapshots older than retention period.

        File dates parse to midnight while the cutoff carries the current
        time of day, so a snapshot dated exactly ``retention_days`` ago is
        removed as soon as any time has elapsed since midnight UTC.

        Args:
            tld: Optional TLD to clean (None = all TLDs)

        Returns:
            Number of files deleted
        """
        cutoff_date = self._utcnow() - timedelta(days=self.retention_days)

        # Pattern: *_domains_YYYY-MM-DD.txt (never matches the undated current file)
        pattern = f"{tld}_domains_*.txt" if tld else "*_domains_*.txt"

        deleted = 0
        # Materialize the listing first: do not unlink while the directory
        # scan is still in progress.
        for file_path in list(self.data_dir.glob(pattern)):
            file_date = self._snapshot_date(file_path)
            if file_date is None or file_date >= cutoff_date:
                # Not a dated snapshot, or still inside the retention window.
                continue
            file_path.unlink()
            deleted += 1
            logger.info(f"Deleted old snapshot: {file_path.name}")

        if deleted > 0:
            logger.info(f"Cleaned up {deleted} old zone file snapshots")
        return deleted

    def get_available_snapshots(self, tld: str) -> list[datetime]:
        """
        List all available snapshot dates for a TLD.

        Args:
            tld: The TLD

        Returns:
            List of dates (sorted, newest first)
        """
        candidates = (
            self._snapshot_date(file_path)
            for file_path in self.data_dir.glob(f"{tld}_domains_*.txt")
        )
        return sorted((d for d in candidates if d is not None), reverse=True)

    def get_storage_stats(self) -> dict:
        """
        Get storage statistics for zone files.

        Returns:
            Dict with "total_files", "total_size_mb" and a per-TLD "tlds"
            mapping of {"files", "size_mb"}; current and dated files both count.
        """
        stats = {
            "total_files": 0,
            "total_size_mb": 0.0,
            "tlds": {},
        }
        for file_path in self.data_dir.glob("*_domains*.txt"):
            size_mb = file_path.stat().st_size / (1024 * 1024)
            stats["total_files"] += 1
            stats["total_size_mb"] += size_mb

            # The TLD is everything before the first underscore of the stem.
            tld = file_path.stem.split("_")[0]
            per_tld = stats["tlds"].setdefault(tld, {"files": 0, "size_mb": 0.0})
            per_tld["files"] += 1
            per_tld["size_mb"] += size_mb
        return stats
def migrate_existing_snapshots() -> int:
    """
    Migrate existing zone files to dated snapshot format.

    For every undated ``<tld>_domains.txt`` in the CZDS and Switch data
    directories, create a copy named with today's UTC date unless that dated
    file already exists. The undated file is left in place.
    Call this once during deployment.

    Returns:
        Number of dated copies created
    """
    # Constructed for its side effect only: the constructor creates
    # settings.czds_data_dir when it does not exist yet.
    ZoneRetentionManager()

    date_suffix = datetime.now(timezone.utc).strftime("%Y-%m-%d")
    migrated = 0
    for data_dir in [Path(settings.czds_data_dir), Path(settings.switch_data_dir)]:
        if not data_dir.exists():
            continue
        # The glob only matches undated files, so no "already dated" check is
        # needed. A digit test here would wrongly skip IDN TLDs whose name
        # ends in a digit-bearing pair (e.g. xn--q9jyb4c).
        for file_path in data_dir.glob("*_domains.txt"):
            tld = file_path.stem.removesuffix("_domains")
            dated_path = data_dir / f"{tld}_domains_{date_suffix}.txt"
            if dated_path.exists():
                continue
            shutil.copy(file_path, dated_path)
            migrated += 1
            logger.info(f"Migrated {file_path.name} -> {dated_path.name}")
    return migrated