""" Zone File Retention Management ============================== Manages historical zone file snapshots with configurable retention period. Default: 3 days of history for reliable drop detection. Features: - Daily snapshots with timestamps - Automatic cleanup of old snapshots - Reliable diff calculation across multiple days """ import logging import shutil from datetime import datetime, timedelta from pathlib import Path from typing import Optional from app.config import get_settings logger = logging.getLogger(__name__) settings = get_settings() class ZoneRetentionManager: """ Manages zone file snapshots with retention policy. Directory structure: /data/czds/ xyz_domains.txt <- current/latest xyz_domains_2024-01-15.txt <- daily snapshot xyz_domains_2024-01-14.txt xyz_domains_2024-01-13.txt """ def __init__(self, data_dir: Optional[Path] = None, retention_days: int = 3): self.data_dir = data_dir or Path(settings.czds_data_dir) self.retention_days = retention_days or settings.zone_retention_days self.data_dir.mkdir(parents=True, exist_ok=True) def get_snapshot_path(self, tld: str, date: datetime) -> Path: """Get path for a dated snapshot.""" date_str = date.strftime("%Y-%m-%d") return self.data_dir / f"{tld}_domains_{date_str}.txt" def get_current_path(self, tld: str) -> Path: """Get path for current (latest) snapshot.""" return self.data_dir / f"{tld}_domains.txt" def save_snapshot(self, tld: str, domains: set[str], date: Optional[datetime] = None): """ Save a domain snapshot with date suffix and update current. Args: tld: The TLD (e.g., 'xyz', 'ch') domains: Set of domain names date: Optional date for snapshot (defaults to today) """ date = date or datetime.utcnow() # Save dated snapshot snapshot_path = self.get_snapshot_path(tld, date) content = "\n".join(sorted(domains)) snapshot_path.write_text(content) # Also update current pointer current_path = self.get_current_path(tld) current_path.write_text(content) logger.info(f"Saved .{tld} snapshot: {len(domains):,} domains -> {snapshot_path.name}") def load_snapshot(self, tld: str, date: Optional[datetime] = None) -> Optional[set[str]]: """ Load a snapshot from a specific date. Args: tld: The TLD date: Date to load (None = current/latest) Returns: Set of domain names or None if not found """ if date: path = self.get_snapshot_path(tld, date) else: path = self.get_current_path(tld) if not path.exists(): return None try: content = path.read_text() return set(line.strip() for line in content.splitlines() if line.strip()) except Exception as e: logger.warning(f"Failed to load snapshot {path.name}: {e}") return None def get_previous_snapshot(self, tld: str, days_ago: int = 1) -> Optional[set[str]]: """ Load snapshot from N days ago. Args: tld: The TLD days_ago: How many days back to look Returns: Set of domain names or None """ target_date = datetime.utcnow() - timedelta(days=days_ago) return self.load_snapshot(tld, target_date) def cleanup_old_snapshots(self, tld: Optional[str] = None) -> int: """ Remove snapshots older than retention period. 
        Args:
            tld: Optional TLD to clean (None = all TLDs)

        Returns:
            Number of files deleted
        """
        cutoff_date = datetime.utcnow() - timedelta(days=self.retention_days)
        deleted = 0

        # Pattern: *_domains_YYYY-MM-DD.txt
        pattern = f"{tld}_domains_*.txt" if tld else "*_domains_*.txt"

        for file_path in self.data_dir.glob(pattern):
            # Skip current files (no date suffix)
            name = file_path.stem
            if not any(c.isdigit() for c in name):
                continue

            # Extract the date from the filename
            try:
                # Get the date part (last 10 chars: YYYY-MM-DD)
                date_str = name[-10:]
                file_date = datetime.strptime(date_str, "%Y-%m-%d")

                if file_date < cutoff_date:
                    file_path.unlink()
                    deleted += 1
                    logger.info(f"Deleted old snapshot: {file_path.name}")
            except ValueError:
                # Not a dated snapshot; skip it
                continue

        if deleted > 0:
            logger.info(f"Cleaned up {deleted} old zone file snapshots")

        return deleted

    def get_available_snapshots(self, tld: str) -> list[datetime]:
        """
        List all available snapshot dates for a TLD.

        Args:
            tld: The TLD

        Returns:
            List of dates (sorted, newest first)
        """
        dates = []
        pattern = f"{tld}_domains_*.txt"

        for file_path in self.data_dir.glob(pattern):
            name = file_path.stem
            try:
                date_str = name[-10:]
                file_date = datetime.strptime(date_str, "%Y-%m-%d")
                dates.append(file_date)
            except ValueError:
                continue

        return sorted(dates, reverse=True)

    def get_storage_stats(self) -> dict:
        """Get storage statistics for zone files."""
        stats = {
            "total_files": 0,
            "total_size_mb": 0.0,
            "tlds": {},
        }

        for file_path in self.data_dir.glob("*_domains*.txt"):
            stats["total_files"] += 1
            size_mb = file_path.stat().st_size / (1024 * 1024)
            stats["total_size_mb"] += size_mb

            # Extract the TLD from the filename prefix
            name = file_path.stem
            tld = name.split("_")[0]
            if tld not in stats["tlds"]:
                stats["tlds"][tld] = {"files": 0, "size_mb": 0.0}
            stats["tlds"][tld]["files"] += 1
            stats["tlds"][tld]["size_mb"] += size_mb

        return stats


def migrate_existing_snapshots() -> int:
    """
    Migrate existing zone files to the dated snapshot format.
    Call this once during deployment.
    """
    # Instantiating the manager ensures the CZDS data directory exists.
    manager = ZoneRetentionManager()
    today = datetime.utcnow()
    migrated = 0

    for data_dir in [Path(settings.czds_data_dir), Path(settings.switch_data_dir)]:
        if not data_dir.exists():
            continue

        for file_path in data_dir.glob("*_domains.txt"):
            name = file_path.stem

            # Skip files that already carry a date suffix
            if any(c.isdigit() for c in name[-10:]):
                continue

            tld = name.replace("_domains", "")

            # Create a dated copy alongside the current file
            dated_path = data_dir / f"{tld}_domains_{today.strftime('%Y-%m-%d')}.txt"
            if not dated_path.exists():
                shutil.copy(file_path, dated_path)
                migrated += 1
                logger.info(f"Migrated {file_path.name} -> {dated_path.name}")

    return migrated
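

if __name__ == "__main__":
    # Minimal usage sketch (illustrative only, not part of the deployed flow):
    # exercises the save/diff/cleanup cycle behind the drop detection mentioned
    # in the module docstring. The 'example' TLD and the domain sets below are
    # hypothetical placeholders, and a throwaway temp directory is used so no
    # real zone data is touched.
    import tempfile

    logging.basicConfig(level=logging.INFO)
    manager = ZoneRetentionManager(data_dir=Path(tempfile.mkdtemp()), retention_days=3)

    # Simulate yesterday's and today's zone contents.
    yesterday = datetime.utcnow() - timedelta(days=1)
    manager.save_snapshot("example", {"alpha.example", "beta.example"}, date=yesterday)
    manager.save_snapshot("example", {"alpha.example"})

    # Drop detection: domains present yesterday but missing today.
    previous = manager.get_previous_snapshot("example", days_ago=1) or set()
    current = manager.load_snapshot("example") or set()
    print(f"Dropped domains: {sorted(previous - current)}")

    # Enforce the retention window after each run.
    manager.cleanup_old_snapshots("example")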