#!/usr/bin/env python3
"""
Safe CZDS Zone File Sync Script
================================
Uses the official pyCZDS library to safely download zone files.

IMPORTANT Rate Limits:
- Max 1 download per TLD per 24 hours
- Max 3 direct downloads per TLD per 24h (or you get blocked!)
- Zone files are updated daily between 00:00-06:00 UTC

Run this script ONCE daily, after 06:00 UTC.
"""

import asyncio
import gzip
import hashlib
import logging
import os
import sys
import time
from datetime import datetime, timedelta, timezone
from pathlib import Path

# Add parent directory to path for imports
sys.path.insert(0, str(Path(__file__).parent.parent))

from pyczds.client import CZDSClient as PyCZDSClient

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Configuration
APPROVED_TLDS = ["xyz", "org", "online", "info", "dev", "app"]
DATA_DIR = Path(os.getenv("CZDS_DATA_DIR", "/home/user/pounce_czds"))
DELAY_BETWEEN_DOWNLOADS = 5  # seconds - be nice to ICANN servers


def _utcnow() -> datetime:
    """Return the current UTC time as a timezone-aware datetime.

    Replaces the deprecated ``datetime.utcnow()`` (naive) used previously.
    """
    return datetime.now(timezone.utc)


def get_credentials() -> tuple[str | None, str | None]:
    """Get CZDS credentials from environment or .env file.

    Environment variables take precedence; the .env fallback is a naive
    line scan (no quoting/export support) matching how the file is written.

    Returns:
        (username, password) — either may be None if not configured.
    """
    # Try environment first
    username = os.getenv("CZDS_USERNAME")
    password = os.getenv("CZDS_PASSWORD")

    if not username or not password:
        # Try loading from .env next to the project root
        env_file = Path(__file__).parent.parent / ".env"
        if env_file.exists():
            for line in env_file.read_text().splitlines():
                if line.startswith("CZDS_USERNAME="):
                    username = line.split("=", 1)[1].strip()
                elif line.startswith("CZDS_PASSWORD="):
                    password = line.split("=", 1)[1].strip()

    return username, password


def should_download_today(tld: str) -> bool:
    """Check if we already downloaded this TLD within the last ~24h.

    Uses a hidden per-TLD marker file containing the ISO timestamp of the
    last successful download. A 23h window (not 24h) gives slack so a daily
    cron job scheduled at the same time is never skipped by a few minutes.

    Returns:
        True if a download is allowed now, False if it should be skipped.
    """
    marker_file = DATA_DIR / f".{tld}_last_download"

    if marker_file.exists():
        try:
            last_download = datetime.fromisoformat(marker_file.read_text().strip())
        except ValueError:
            # Corrupt / hand-edited marker: treat as never downloaded
            # rather than crashing the whole sync run.
            logger.warning(f"Invalid marker file for .{tld}, ignoring it")
            return True

        # Markers written by older script versions are naive UTC timestamps;
        # attach UTC so aware/naive subtraction never raises.
        if last_download.tzinfo is None:
            last_download = last_download.replace(tzinfo=timezone.utc)

        # Only download once per 24h (ICANN rate limit)
        if _utcnow() - last_download < timedelta(hours=23):
            logger.info(f"â­ī¸ .{tld}: Already downloaded within 24h, skipping")
            return False

    return True


def mark_downloaded(tld: str) -> None:
    """Mark TLD as downloaded now by writing its marker file."""
    marker_file = DATA_DIR / f".{tld}_last_download"
    marker_file.write_text(_utcnow().isoformat())


def parse_zone_file(zone_path: Path, tld: str) -> set[str]:
    """
    Parse zone file and extract unique domain names.

    Handles both plain and gzip-compressed zone files. Only second-level
    names directly under *tld* are kept (the TLD apex and any deeper
    subdomains are skipped), lowercased and stripped of the TLD suffix.

    Args:
        zone_path: Path to the downloaded zone file (.zone or .zone.gz).
        tld: The TLD this zone belongs to (lowercase, no leading dot).

    Returns:
        Set of domain names (without TLD suffix).
    """
    logger.info(f"📖 Parsing zone file for .{tld}...")

    domains: set[str] = set()
    line_count = 0
    suffix = f'.{tld}'

    # Handle gzipped files transparently; errors='ignore' because zone
    # files occasionally contain bytes that are not valid UTF-8.
    if str(zone_path).endswith('.gz'):
        fh = gzip.open(zone_path, 'rt', encoding='utf-8', errors='ignore')
    else:
        fh = open(zone_path, 'r', encoding='utf-8', errors='ignore')

    with fh as f:
        for line in f:
            line_count += 1

            # Skip comments and empty lines
            if line.startswith(';') or not line.strip():
                continue

            # A resource record needs at least: name ttl/class type rdata
            parts = line.split()
            if len(parts) >= 4:
                name = parts[0].rstrip('.')

                # Must end with our TLD (the apex "tld." itself won't match)
                if name.lower().endswith(suffix):
                    # Strip the ".<tld>" suffix to get the bare label
                    domain_name = name[:-(len(tld) + 1)]
                    # Skip subdomains (anything still containing a dot)
                    if domain_name and '.' not in domain_name:
                        domains.add(domain_name.lower())

    logger.info(f"   Found {len(domains):,} unique domains from {line_count:,} lines")
    return domains


def compute_checksum(domains: set[str]) -> str:
    """Compute SHA256 checksum of sorted domain list."""
    sorted_domains = "\n".join(sorted(domains))
    return hashlib.sha256(sorted_domains.encode()).hexdigest()


def load_previous_domains(tld: str) -> set[str] | None:
    """Load previous day's domain set from cache file.

    Returns:
        Set of domains from the last successful sync, or None when no
        usable cache exists (first run or unreadable file).
    """
    cache_file = DATA_DIR / f"{tld}_domains.txt"

    if cache_file.exists():
        try:
            content = cache_file.read_text()
            return set(line.strip() for line in content.splitlines() if line.strip())
        except Exception as e:
            # Best-effort: an unreadable cache just means "first sync" again.
            logger.warning(f"Failed to load cache for .{tld}: {e}")

    return None


def save_domains(tld: str, domains: set[str]) -> None:
    """Save current domains to cache file (sorted, one per line)."""
    cache_file = DATA_DIR / f"{tld}_domains.txt"
    cache_file.write_text("\n".join(sorted(domains)))
    logger.info(f"💾 Saved {len(domains):,} domains for .{tld}")


def find_drops(previous: set[str], current: set[str]) -> set[str]:
    """Find dropped domains (present yesterday, missing today)."""
    return previous - current


async def save_drops_to_db(tld: str, dropped: set[str]) -> int:
    """Save dropped domains to database.

    Imports the app's database layer lazily so the script still works as a
    standalone tool when the app package is unavailable. Failures are
    logged, not raised — a DB outage must not abort the sync.

    Returns:
        Number of records saved (0 on empty input or failure).
    """
    if not dropped:
        return 0

    try:
        from app.database import AsyncSessionLocal, init_db
        from app.models.zone_file import DroppedDomain

        await init_db()

        # Midnight UTC as the drop date; stored naive because the DB
        # column presumably expects timezone-less UTC — TODO confirm.
        today = datetime.now(timezone.utc).replace(
            hour=0, minute=0, second=0, microsecond=0, tzinfo=None
        )

        async with AsyncSessionLocal() as db:
            # Batch insert
            records = []
            for name in dropped:
                record = DroppedDomain(
                    domain=f"{name}.{tld}",
                    tld=tld,
                    dropped_date=today,
                    length=len(name),
                    is_numeric=name.isdigit(),
                    has_hyphen='-' in name
                )
                records.append(record)

            db.add_all(records)
            await db.commit()

        logger.info(f"💾 Saved {len(dropped):,} dropped domains to database")
        return len(dropped)

    except Exception as e:
        logger.error(f"Failed to save drops to DB: {e}")
        return 0


def sync_single_tld(client: PyCZDSClient, tld: str, zone_urls: list[str]) -> dict:
    """
    Sync a single TLD zone file.

    Downloads the zone (respecting the 24h marker), parses it, diffs it
    against the previous snapshot, persists drops to the DB, saves the new
    snapshot, and deletes the (large) raw zone file on success.

    Returns:
        dict with sync results: tld, status, domain_count, dropped_count,
        new_count, error.
    """
    result = {
        "tld": tld,
        "status": "pending",
        "domain_count": 0,
        "dropped_count": 0,
        "new_count": 0,
        "error": None
    }

    # Check if we should download (24h rate limit)
    if not should_download_today(tld):
        result["status"] = "skipped"
        return result

    # Find URL for this TLD
    tld_url = next((url for url in zone_urls if f"/{tld}.zone" in url.lower()), None)
    if not tld_url:
        logger.warning(f"❌ No access to .{tld} zone file")
        result["status"] = "no_access"
        result["error"] = "No access to this TLD"
        return result

    try:
        logger.info(f"âŦ‡ī¸ Downloading .{tld} zone file...")

        # Download zone file using pyCZDS; returns the downloaded file path
        downloaded_file = client.get_zonefile(tld_url, download_dir=str(DATA_DIR))

        if not downloaded_file or not Path(downloaded_file).exists():
            result["status"] = "download_failed"
            result["error"] = "Download returned no file"
            return result

        downloaded_path = Path(downloaded_file)
        file_size_mb = downloaded_path.stat().st_size / (1024 * 1024)
        logger.info(f"   Downloaded: {file_size_mb:.1f} MB")

        # Parse zone file
        current_domains = parse_zone_file(downloaded_path, tld)
        result["domain_count"] = len(current_domains)

        # Load previous domains for the diff
        previous_domains = load_previous_domains(tld)

        if previous_domains:
            # Find drops (present yesterday, gone today)
            dropped = find_drops(previous_domains, current_domains)
            result["dropped_count"] = len(dropped)
            result["new_count"] = len(current_domains - previous_domains)

            logger.info(f"   📉 Dropped: {len(dropped):,}")
            logger.info(f"   📈 New: {result['new_count']:,}")

            # Save drops to database (async helper, run synchronously here)
            if dropped:
                asyncio.run(save_drops_to_db(tld, dropped))
        else:
            logger.info(f"   â„šī¸ First sync, no comparison available")

        # Save current domains for next comparison
        save_domains(tld, current_domains)

        # Mark as downloaded (starts the 24h window)
        mark_downloaded(tld)

        # Clean up downloaded zone file (can be huge). Intentionally only
        # done on success so a failed run leaves the file for debugging.
        downloaded_path.unlink()
        logger.info(f"   đŸ—‘ī¸ Cleaned up zone file")

        result["status"] = "success"
        logger.info(f"✅ .{tld} sync complete!")

    except Exception as e:
        logger.exception(f"❌ Error syncing .{tld}: {e}")
        result["status"] = "error"
        result["error"] = str(e)

    return result


def main():
    """Main sync function: authenticate, list zones, sync each approved TLD."""
    print("=" * 60)
    print("🌐 CZDS Zone File Sync (Safe Mode)")
    print("=" * 60)
    print(f"📅 Time: {_utcnow().isoformat()} UTC")
    print(f"📂 Data dir: {DATA_DIR}")
    print(f"📋 TLDs: {', '.join(APPROVED_TLDS)}")
    print("-" * 60)

    # Ensure data directory exists
    DATA_DIR.mkdir(parents=True, exist_ok=True)

    # Get credentials
    username, password = get_credentials()
    if not username or not password:
        print("❌ CZDS credentials not configured!")
        print("   Set CZDS_USERNAME and CZDS_PASSWORD in .env")
        sys.exit(1)

    print(f"👤 User: {username}")

    # Initialize pyCZDS client (authenticates on construction)
    try:
        client = PyCZDSClient(username, password)
        print("✅ Authenticated with ICANN CZDS")
    except Exception as e:
        print(f"❌ Authentication failed: {e}")
        sys.exit(1)

    # Get available zone files
    try:
        zone_urls = client.get_zonefiles_list()
        available_tlds = [url.split('/')[-1].replace('.zone', '') for url in zone_urls]
        print(f"✅ Available zones: {', '.join(available_tlds)}")
    except Exception as e:
        print(f"❌ Failed to get zone list: {e}")
        sys.exit(1)

    print("-" * 60)

    # Sync each TLD
    results = []
    for i, tld in enumerate(APPROVED_TLDS):
        print(f"\n[{i+1}/{len(APPROVED_TLDS)}] Processing .{tld}...")

        result = sync_single_tld(client, tld, zone_urls)
        results.append(result)

        # Delay between downloads (be nice to ICANN); no delay needed
        # after skips/failures or after the last TLD.
        if i < len(APPROVED_TLDS) - 1 and result["status"] == "success":
            print(f"   âŗ Waiting {DELAY_BETWEEN_DOWNLOADS}s before next download...")
            time.sleep(DELAY_BETWEEN_DOWNLOADS)

    # Summary
    print("\n" + "=" * 60)
    print("📊 SUMMARY")
    print("=" * 60)

    success_count = sum(1 for r in results if r["status"] == "success")
    total_drops = sum(r["dropped_count"] for r in results)

    for r in results:
        icon = "✅" if r["status"] == "success" else "â­ī¸" if r["status"] == "skipped" else "❌"
        print(f"{icon} .{r['tld']}: {r['status']} - {r['domain_count']:,} domains, {r['dropped_count']:,} dropped")
        if r["error"]:
            print(f"   âš ī¸ Error: {r['error']}")

    print("-" * 60)
    print(f"✅ Successful: {success_count}/{len(APPROVED_TLDS)}")
    print(f"📉 Total drops: {total_drops:,}")
    print("=" * 60)


if __name__ == "__main__":
    # Parse arguments: positional args restrict the run to specific TLDs
    if len(sys.argv) > 1:
        # Only sync specific TLDs
        APPROVED_TLDS = [tld.lower() for tld in sys.argv[1:]]
        print(f"đŸŽ¯ Syncing specific TLDs: {APPROVED_TLDS}")

    main()