#!/usr/bin/env python3 """ Pounce Zone File Sync - Daily Automated Zone File Synchronization This script: 1. Downloads zone files from ICANN CZDS (app, dev, info, online, org, xyz) 2. Downloads zone files from Switch.ch via AXFR (.ch, .li) 3. Compares with yesterday's data to detect drops 4. Stores drops in the database for the Drops tab 5. Cleans up all temporary files and compresses domain lists STORAGE STRATEGY (Ultra-Efficient): - Raw zone files: DELETED immediately after parsing - Domain lists: Stored COMPRESSED (.gz) - ~80% size reduction - Only keeps current snapshot (no history) - Drops stored in DB for 48h only Run daily via cron at 06:00 UTC (after most registries update) """ import asyncio import gzip import logging import subprocess import sys import shutil from datetime import datetime, timedelta from pathlib import Path from typing import Optional, Set # Add parent to path for imports sys.path.insert(0, str(Path(__file__).parent.parent)) from sqlalchemy import text from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession from sqlalchemy.orm import sessionmaker # Configuration CZDS_DIR = Path("/home/user/pounce_czds") SWITCH_DIR = Path("/home/user/pounce_switch") LOG_FILE = Path("/home/user/logs/zone_sync.log") # Storage efficiency: compress domain lists COMPRESS_DOMAIN_LISTS = True # CZDS TLDs we have access to CZDS_TLDS = ["app", "dev", "info", "online", "org", "xyz"] # Switch.ch AXFR config SWITCH_CONFIG = { "ch": { "server": "zonedata.switch.ch", "key_name": "tsig-zonedata-ch-public-21-01.", "key_secret": "stZwEGApYumtXkh73qMLPqfbIDozWKZLkqRvcjKSpRnsor6A6MxixRL6C2HeSVBQNfMW4wer+qjS0ZSfiWiJ3Q==" }, "li": { "server": "zonedata.switch.ch", "key_name": "tsig-zonedata-li-public-21-01.", "key_secret": "t8GgeCn+fhPaj+cRy/lakQPb6M45xz/NZwmcp4iqbBxKFCCH0/k3xNGe6sf3ObmoaKDBedge/La4cpPfLqtFkw==" } } # Setup logging logging.basicConfig( level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s', handlers=[ logging.StreamHandler(), logging.FileHandler(LOG_FILE) if LOG_FILE.parent.exists() else logging.StreamHandler() ] ) logger = logging.getLogger(__name__) class ZoneSyncResult: """Result of a zone sync operation""" def __init__(self, tld: str): self.tld = tld self.success = False self.domain_count = 0 self.drops_count = 0 self.error: Optional[str] = None self.duration_seconds = 0 async def get_db_session(): """Create async database session""" from app.config import settings engine = create_async_engine(settings.database_url.replace("sqlite://", "sqlite+aiosqlite://")) async_session = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) return async_session() def download_czds_zone(tld: str, max_retries: int = 3) -> Optional[Path]: """Download a single CZDS zone file using pyCZDS with retry logic""" import time for attempt in range(max_retries): try: from pyczds.client import CZDSClient # Read credentials from .env env_file = Path(__file__).parent.parent / ".env" if not env_file.exists(): env_file = Path("/home/user/pounce/backend/.env") env_content = env_file.read_text() username = password = None for line in env_content.splitlines(): if line.startswith("CZDS_USERNAME="): username = line.split("=", 1)[1].strip() elif line.startswith("CZDS_PASSWORD="): password = line.split("=", 1)[1].strip() if not username or not password: logger.error(f"CZDS credentials not found in .env") return None client = CZDSClient(username, password) urls = client.get_zonefiles_list() # Find URL for this TLD target_url = None for url in urls: if f"{tld}.zone" in url or f"/{tld}." in url: target_url = url break if not target_url: logger.warning(f"No access to .{tld} zone file") return None logger.info(f"Downloading .{tld} from CZDS... (attempt {attempt + 1}/{max_retries})") result = client.get_zonefile(target_url, download_dir=str(CZDS_DIR)) # Find the downloaded file gz_file = CZDS_DIR / f"{tld}.txt.gz" if gz_file.exists(): return gz_file # Try alternative naming for f in CZDS_DIR.glob(f"*{tld}*.gz"): return f return None except Exception as e: logger.warning(f"CZDS download attempt {attempt + 1} failed for .{tld}: {e}") if attempt < max_retries - 1: wait_time = (attempt + 1) * 30 # 30s, 60s, 90s backoff logger.info(f"Retrying in {wait_time}s...") time.sleep(wait_time) else: logger.error(f"CZDS download failed for .{tld} after {max_retries} attempts") return None return None def download_switch_zone(tld: str) -> Optional[Path]: """Download zone file from Switch.ch via AXFR""" config = SWITCH_CONFIG.get(tld) if not config: return None try: output_file = SWITCH_DIR / f"{tld}_zone.txt" cmd = [ "dig", "@" + config["server"], f"{tld}.", "AXFR", "-y", f"hmac-sha512:{config['key_name']}:{config['key_secret']}" ] logger.info(f"Downloading .{tld} via AXFR from Switch.ch...") result = subprocess.run(cmd, capture_output=True, text=True, timeout=600) if result.returncode != 0: logger.error(f"AXFR failed for .{tld}: {result.stderr}") return None output_file.write_text(result.stdout) return output_file except subprocess.TimeoutExpired: logger.error(f"AXFR timeout for .{tld}") return None except Exception as e: logger.error(f"AXFR failed for .{tld}: {e}") return None def parse_czds_zone(gz_file: Path, tld: str) -> set: """Parse a gzipped CZDS zone file and extract unique root domains""" domains = set() try: with gzip.open(gz_file, 'rt', encoding='utf-8', errors='ignore') as f: for line in f: if line.startswith(';') or not line.strip(): continue parts = line.split() if len(parts) >= 4: name = parts[0].rstrip('.') if name.lower().endswith(f'.{tld}'): domain_name = name[:-(len(tld) + 1)] # Only root domains (no subdomains) if domain_name and '.' not in domain_name: domains.add(domain_name.lower()) return domains except Exception as e: logger.error(f"Failed to parse .{tld} zone file: {e}") return set() def parse_switch_zone(zone_file: Path, tld: str) -> set: """Parse Switch.ch AXFR output and extract unique root domains""" domains = set() try: content = zone_file.read_text() for line in content.splitlines(): if line.startswith(';') or not line.strip(): continue parts = line.split() if len(parts) >= 4: name = parts[0].rstrip('.') # Skip the TLD itself if name.lower() == tld: continue if name.lower().endswith(f'.{tld}'): domain_name = name[:-(len(tld) + 1)] # Only root domains if domain_name and '.' not in domain_name: domains.add(domain_name.lower()) return domains except Exception as e: logger.error(f"Failed to parse .{tld} zone file: {e}") return set() def save_domains(tld: str, domains: Set[str], directory: Path) -> Path: """Save domain list to compressed file for storage efficiency""" if COMPRESS_DOMAIN_LISTS: out_file = directory / f"{tld}_domains.txt.gz" # Remove old uncompressed file if exists old_file = directory / f"{tld}_domains.txt" if old_file.exists(): old_file.unlink() # Write compressed with gzip.open(out_file, 'wt', encoding='utf-8', compresslevel=9) as f: f.write('\n'.join(sorted(domains))) return out_file else: out_file = directory / f"{tld}_domains.txt" out_file.write_text('\n'.join(sorted(domains))) return out_file def load_previous_domains(tld: str, directory: Path) -> Set[str]: """Load previous day's domain list (compressed or uncompressed)""" # Try compressed first gz_file = directory / f"{tld}_domains.txt.gz" if gz_file.exists(): try: with gzip.open(gz_file, 'rt', encoding='utf-8') as f: return set(f.read().splitlines()) except Exception as e: logger.warning(f"Failed to read compressed domains for .{tld}: {e}") return set() # Fallback to uncompressed txt_file = directory / f"{tld}_domains.txt" if txt_file.exists(): try: return set(txt_file.read_text().splitlines()) except Exception: return set() return set() def detect_drops(tld: str, today_domains: set, yesterday_domains: set) -> set: """Detect domains that were dropped (present yesterday, missing today)""" if not yesterday_domains: logger.info(f".{tld}: No previous data for comparison (first run)") return set() drops = yesterday_domains - today_domains return drops async def store_drops_in_db(drops: list[tuple[str, str]], session: AsyncSession): """Store dropped domains in database using existing DroppedDomain model""" if not drops: return 0 now = datetime.utcnow() # Delete old drops (older than 48 hours) cutoff = now - timedelta(hours=48) await session.execute( text("DELETE FROM dropped_domains WHERE dropped_date < :cutoff"), {"cutoff": cutoff} ) # Insert new drops count = 0 for domain, tld in drops: try: # Calculate domain properties length = len(domain) is_numeric = domain.isdigit() has_hyphen = '-' in domain await session.execute( text(""" INSERT OR REPLACE INTO dropped_domains (domain, tld, dropped_date, length, is_numeric, has_hyphen, created_at) VALUES (:domain, :tld, :dropped_date, :length, :is_numeric, :has_hyphen, :created_at) """), { "domain": domain, "tld": tld, "dropped_date": now, "length": length, "is_numeric": is_numeric, "has_hyphen": has_hyphen, "created_at": now } ) count += 1 except Exception as e: logger.debug(f"Failed to insert drop {domain}.{tld}: {e}") await session.commit() return count async def sync_czds_tld(tld: str) -> ZoneSyncResult: """Sync a single CZDS TLD""" result = ZoneSyncResult(tld) start = datetime.now() try: # Load previous domains for comparison yesterday_domains = load_previous_domains(tld, CZDS_DIR) # Download new zone file gz_file = download_czds_zone(tld) if not gz_file: result.error = "Download failed" return result # Parse zone file logger.info(f"Parsing .{tld} zone file...") today_domains = parse_czds_zone(gz_file, tld) if not today_domains: result.error = "Parsing failed - no domains extracted" return result result.domain_count = len(today_domains) # Detect drops drops = detect_drops(tld, today_domains, yesterday_domains) result.drops_count = len(drops) # Save current domains for tomorrow's comparison save_domains(tld, today_domains, CZDS_DIR) # Cleanup gz file if gz_file.exists(): gz_file.unlink() # Update last download marker marker = CZDS_DIR / f".{tld}_last_download" marker.write_text(datetime.utcnow().isoformat()) result.success = True logger.info(f"โœ… .{tld}: {result.domain_count:,} domains, {result.drops_count:,} drops") # Return drops for DB storage result.drops = [(d, tld) for d in drops] except Exception as e: result.error = str(e) logger.error(f"โŒ .{tld} sync failed: {e}") result.duration_seconds = (datetime.now() - start).total_seconds() return result async def sync_switch_tld(tld: str) -> ZoneSyncResult: """Sync a single Switch.ch TLD""" result = ZoneSyncResult(tld) start = datetime.now() try: # Load previous domains for comparison yesterday_domains = load_previous_domains(tld, SWITCH_DIR) # Download new zone file zone_file = download_switch_zone(tld) if not zone_file: result.error = "AXFR failed" return result # Parse zone file logger.info(f"Parsing .{tld} zone file...") today_domains = parse_switch_zone(zone_file, tld) if not today_domains: result.error = "Parsing failed - no domains extracted" return result result.domain_count = len(today_domains) # Detect drops drops = detect_drops(tld, today_domains, yesterday_domains) result.drops_count = len(drops) # Save current domains for tomorrow's comparison save_domains(tld, today_domains, SWITCH_DIR) # Cleanup raw zone file (keep only domain list) if zone_file.exists(): zone_file.unlink() result.success = True logger.info(f"โœ… .{tld}: {result.domain_count:,} domains, {result.drops_count:,} drops") # Return drops for DB storage result.drops = [(d, tld) for d in drops] except Exception as e: result.error = str(e) logger.error(f"โŒ .{tld} sync failed: {e}") result.duration_seconds = (datetime.now() - start).total_seconds() return result def cleanup_stray_files(directory: Path, keep_extensions: list = None): """Remove any stray/temporary files to save space""" if keep_extensions is None: keep_extensions = ['.txt.gz', '.txt'] # Only keep domain lists removed_count = 0 removed_size = 0 for f in directory.iterdir(): if f.is_file(): # Keep marker files if f.name.startswith('.'): continue # Keep domain list files if any(f.name.endswith(ext) for ext in keep_extensions): continue # Remove everything else (raw zone files, temp files) try: size = f.stat().st_size f.unlink() removed_count += 1 removed_size += size logger.info(f"๐Ÿ—‘๏ธ Removed stray file: {f.name} ({size / (1024*1024):.1f} MB)") except Exception as e: logger.warning(f"Failed to remove {f.name}: {e}") return removed_count, removed_size def get_directory_size(directory: Path) -> int: """Get total size of directory in bytes""" total = 0 for f in directory.rglob('*'): if f.is_file(): total += f.stat().st_size return total def log_storage_stats(): """Log current storage usage""" czds_size = get_directory_size(CZDS_DIR) if CZDS_DIR.exists() else 0 switch_size = get_directory_size(SWITCH_DIR) if SWITCH_DIR.exists() else 0 total = czds_size + switch_size logger.info(f"๐Ÿ’พ STORAGE: CZDS={czds_size/(1024*1024):.1f}MB, Switch={switch_size/(1024*1024):.1f}MB, Total={total/(1024*1024):.1f}MB") return total async def main(): """Main sync process""" logger.info("=" * 60) logger.info("๐Ÿš€ POUNCE ZONE SYNC - Starting daily synchronization") logger.info("=" * 60) start_time = datetime.now() # Ensure directories exist CZDS_DIR.mkdir(parents=True, exist_ok=True) SWITCH_DIR.mkdir(parents=True, exist_ok=True) LOG_FILE.parent.mkdir(parents=True, exist_ok=True) # Log initial storage logger.info("\n๐Ÿ“Š Initial storage check...") initial_storage = log_storage_stats() all_drops = [] results = [] # Sync CZDS TLDs (sequentially to respect rate limits) logger.info("\n๐Ÿ“ฆ Syncing ICANN CZDS zone files...") for tld in CZDS_TLDS: result = await sync_czds_tld(tld) results.append(result) if hasattr(result, 'drops'): all_drops.extend(result.drops) # Rate limit: wait between downloads if tld != CZDS_TLDS[-1]: logger.info("โณ Waiting 5 seconds (rate limit)...") await asyncio.sleep(5) # Sync Switch.ch TLDs logger.info("\n๐Ÿ‡จ๐Ÿ‡ญ Syncing Switch.ch zone files...") for tld in ["ch", "li"]: result = await sync_switch_tld(tld) results.append(result) if hasattr(result, 'drops'): all_drops.extend(result.drops) # Store drops in database if all_drops: logger.info(f"\n๐Ÿ’พ Storing {len(all_drops)} drops in database...") try: session = await get_db_session() stored = await store_drops_in_db(all_drops, session) await session.close() logger.info(f"โœ… Stored {stored} drops in database") except Exception as e: logger.error(f"โŒ Failed to store drops: {e}") # Cleanup stray files logger.info("\n๐Ÿงน Cleaning up temporary files...") czds_removed, czds_freed = cleanup_stray_files(CZDS_DIR) switch_removed, switch_freed = cleanup_stray_files(SWITCH_DIR) total_freed = czds_freed + switch_freed if total_freed > 0: logger.info(f"โœ… Freed {total_freed / (1024*1024):.1f} MB ({czds_removed + switch_removed} files)") else: logger.info("โœ… No stray files found") # Summary duration = (datetime.now() - start_time).total_seconds() logger.info("\n" + "=" * 60) logger.info("๐Ÿ“Š SYNC SUMMARY") logger.info("=" * 60) total_domains = 0 total_drops = 0 success_count = 0 for r in results: status = "โœ…" if r.success else "โŒ" logger.info(f" {status} .{r.tld}: {r.domain_count:,} domains, {r.drops_count:,} drops ({r.duration_seconds:.1f}s)") if r.success: total_domains += r.domain_count total_drops += r.drops_count success_count += 1 logger.info("-" * 60) logger.info(f" TOTAL: {total_domains:,} domains across {success_count}/{len(results)} TLDs") logger.info(f" DROPS: {total_drops:,} new drops detected") logger.info(f" TIME: {duration:.1f} seconds") # Final storage stats logger.info("-" * 60) final_storage = log_storage_stats() if initial_storage > 0: change = final_storage - initial_storage logger.info(f" CHANGE: {'+' if change > 0 else ''}{change/(1024*1024):.1f} MB") logger.info("=" * 60) return 0 if success_count == len(results) else 1 if __name__ == "__main__": exit_code = asyncio.run(main()) sys.exit(exit_code)