Zone File Storage:
- Persistent storage in /data/pounce/zones/ (not /tmp)
- 3-day retention for historical snapshots
- Volume mounts in CI/CD pipeline
- New zone_retention.py for snapshot management

Bug Fix:
- Fixed wrong TSIG key for .li zone transfer
- Key was corrupted, causing BADSIG errors
- Now using official Switch.ch key

Config Changes:
- Added switch_data_dir setting
- Added zone_retention_days setting (default: 3)
- CZDS path now defaults to /data/czds
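For reference, a minimal sketch of what the new settings could look like in app/config.py, assuming a pydantic-style Settings class; only the setting names and the stated defaults come from the notes above, and the switch_data_dir default value is an assumption:

from pydantic_settings import BaseSettings

class Settings(BaseSettings):
    czds_data_dir: str = "/data/czds"            # CZDS zone files now default here
    switch_data_dir: str = "/data/pounce/zones"  # assumed: persistent Switch zone storage
    zone_retention_days: int = 3                 # days of snapshot history to keep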
"""
|
|
Zone File Retention Management
|
|
==============================
|
|
Manages historical zone file snapshots with configurable retention period.
|
|
Default: 3 days of history for reliable drop detection.
|
|
|
|
Features:
|
|
- Daily snapshots with timestamps
|
|
- Automatic cleanup of old snapshots
|
|
- Reliable diff calculation across multiple days
|
|
"""
|
|
|
|
import logging
|
|
import shutil
|
|
from datetime import datetime, timedelta
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
|
|
from app.config import get_settings
|
|
|
|
logger = logging.getLogger(__name__)
|
|
settings = get_settings()
|
|
|
|
|
|
class ZoneRetentionManager:
    """
    Manages zone file snapshots with retention policy.

    Directory structure:
        /data/czds/
            xyz_domains.txt             <- current/latest
            xyz_domains_2024-01-15.txt  <- daily snapshot
            xyz_domains_2024-01-14.txt
            xyz_domains_2024-01-13.txt
    """

    def __init__(self, data_dir: Optional[Path] = None, retention_days: Optional[int] = None):
        self.data_dir = data_dir or Path(settings.czds_data_dir)
        # Default to the configured retention period; a hardcoded default of 3
        # would silently shadow the zone_retention_days setting.
        self.retention_days = retention_days or settings.zone_retention_days
        self.data_dir.mkdir(parents=True, exist_ok=True)

    def get_snapshot_path(self, tld: str, date: datetime) -> Path:
        """Get path for a dated snapshot."""
        date_str = date.strftime("%Y-%m-%d")
        return self.data_dir / f"{tld}_domains_{date_str}.txt"

    def get_current_path(self, tld: str) -> Path:
        """Get path for current (latest) snapshot."""
        return self.data_dir / f"{tld}_domains.txt"

    def save_snapshot(self, tld: str, domains: set[str], date: Optional[datetime] = None):
        """
        Save a domain snapshot with date suffix and update current.

        Args:
            tld: The TLD (e.g., 'xyz', 'ch')
            domains: Set of domain names
            date: Optional date for snapshot (defaults to today)
        """
        date = date or datetime.utcnow()

        # Save dated snapshot
        snapshot_path = self.get_snapshot_path(tld, date)
        content = "\n".join(sorted(domains))
        snapshot_path.write_text(content)

        # Also update current pointer
        current_path = self.get_current_path(tld)
        current_path.write_text(content)

        logger.info(f"Saved .{tld} snapshot: {len(domains):,} domains -> {snapshot_path.name}")

    def load_snapshot(self, tld: str, date: Optional[datetime] = None) -> Optional[set[str]]:
        """
        Load a snapshot from a specific date.

        Args:
            tld: The TLD
            date: Date to load (None = current/latest)

        Returns:
            Set of domain names or None if not found
        """
        if date:
            path = self.get_snapshot_path(tld, date)
        else:
            path = self.get_current_path(tld)

        if not path.exists():
            return None

        try:
            content = path.read_text()
            return set(line.strip() for line in content.splitlines() if line.strip())
        except Exception as e:
            logger.warning(f"Failed to load snapshot {path.name}: {e}")
            return None

    def get_previous_snapshot(self, tld: str, days_ago: int = 1) -> Optional[set[str]]:
        """
        Load snapshot from N days ago.

        Args:
            tld: The TLD
            days_ago: How many days back to look

        Returns:
            Set of domain names or None
        """
        target_date = datetime.utcnow() - timedelta(days=days_ago)
        return self.load_snapshot(tld, target_date)

    def cleanup_old_snapshots(self, tld: Optional[str] = None) -> int:
        """
        Remove snapshots older than retention period.

        Args:
            tld: Optional TLD to clean (None = all TLDs)

        Returns:
            Number of files deleted
        """
        cutoff_date = datetime.utcnow() - timedelta(days=self.retention_days)
        deleted = 0

        # Pattern: *_domains_YYYY-MM-DD.txt
        pattern = f"{tld}_domains_*.txt" if tld else "*_domains_*.txt"

        for file_path in self.data_dir.glob(pattern):
            # Skip current files (no date suffix)
            name = file_path.stem
            if not any(c.isdigit() for c in name):
                continue

            # Extract date from filename
            try:
                # Get the date part (last 10 chars: YYYY-MM-DD)
                date_str = name[-10:]
                file_date = datetime.strptime(date_str, "%Y-%m-%d")

                if file_date < cutoff_date:
                    file_path.unlink()
                    deleted += 1
                    logger.info(f"Deleted old snapshot: {file_path.name}")
            except (ValueError, IndexError):
                # Not a dated snapshot, skip
                continue

        if deleted > 0:
            logger.info(f"Cleaned up {deleted} old zone file snapshots")

        return deleted

    def get_available_snapshots(self, tld: str) -> list[datetime]:
        """
        List all available snapshot dates for a TLD.

        Args:
            tld: The TLD

        Returns:
            List of dates (sorted, newest first)
        """
        dates = []
        pattern = f"{tld}_domains_*.txt"

        for file_path in self.data_dir.glob(pattern):
            name = file_path.stem
            try:
                date_str = name[-10:]
                file_date = datetime.strptime(date_str, "%Y-%m-%d")
                dates.append(file_date)
            except (ValueError, IndexError):
                continue

        return sorted(dates, reverse=True)

    def get_storage_stats(self) -> dict:
        """Get storage statistics for zone files."""
        stats = {
            "total_files": 0,
            "total_size_mb": 0.0,
            "tlds": {},
        }

        for file_path in self.data_dir.glob("*_domains*.txt"):
            stats["total_files"] += 1
            size_mb = file_path.stat().st_size / (1024 * 1024)
            stats["total_size_mb"] += size_mb

            # Extract TLD
            name = file_path.stem
            tld = name.split("_")[0]
            if tld not in stats["tlds"]:
                stats["tlds"][tld] = {"files": 0, "size_mb": 0.0}
            stats["tlds"][tld]["files"] += 1
            stats["tlds"][tld]["size_mb"] += size_mb

        return stats


def migrate_existing_snapshots():
    """
    Migrate existing zone files to dated snapshot format.
    Call this once during deployment.
    """
    # Instantiating the manager also ensures the data directory exists.
    manager = ZoneRetentionManager()
    today = datetime.utcnow()
    migrated = 0

    for data_dir in [Path(settings.czds_data_dir), Path(settings.switch_data_dir)]:
        if not data_dir.exists():
            continue

        for file_path in data_dir.glob("*_domains.txt"):
            name = file_path.stem
            # Skip if already has date
            if any(c.isdigit() for c in name[-10:]):
                continue

            tld = name.replace("_domains", "")

            # Create dated copy
            dated_path = data_dir / f"{tld}_domains_{today.strftime('%Y-%m-%d')}.txt"
            if not dated_path.exists():
                shutil.copy(file_path, dated_path)
                migrated += 1
                logger.info(f"Migrated {file_path.name} -> {dated_path.name}")

    return migrated
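For context, a sketch of how a daily job might use this module for drop detection; the app.zone_retention import path and the fetch_zone helper are assumptions for illustration, not part of the codebase:

from app.zone_retention import ZoneRetentionManager, migrate_existing_snapshots

migrate_existing_snapshots()  # one-time migration of undated files

manager = ZoneRetentionManager()
domains = fetch_zone("xyz")  # hypothetical fetcher returning set[str]
manager.save_snapshot("xyz", domains)

previous = manager.get_previous_snapshot("xyz", days_ago=1)
if previous is not None:
    dropped = previous - domains  # present yesterday, missing today
    print(f"{len(dropped)} dropped .xyz domains")

manager.cleanup_old_snapshots()  # prune snapshots past the retention window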