""" ICANN CZDS (Centralized Zone Data Service) Client ================================================== Downloads zone files from ICANN CZDS, parses them, and detects dropped domains. Authentication: OAuth2 with username/password Zone Format: Standard DNS zone file format (.txt.gz) Usage: client = CZDSClient(username, password) await client.sync_all_zones(db) """ import asyncio import gzip import hashlib import logging import os import re import shutil from datetime import datetime, timedelta from pathlib import Path from typing import Optional import httpx from sqlalchemy import select, func, text from sqlalchemy.ext.asyncio import AsyncSession from app.config import get_settings from app.models.zone_file import ZoneSnapshot, DroppedDomain logger = logging.getLogger(__name__) settings = get_settings() # ============================================================================ # CONSTANTS # ============================================================================ CZDS_AUTH_URL = "https://account-api.icann.org/api/authenticate" CZDS_ZONES_URL = "https://czds-api.icann.org/czds/downloads/links" CZDS_DOWNLOAD_BASE = "https://czds-download.icann.org" # TLDs we have approved access to APPROVED_TLDS = ["xyz", "org", "online", "info", "dev", "app"] # Regex to extract domain names from zone file NS records # Format: example.tld. IN NS ns1.example.com. NS_RECORD_PATTERN = re.compile(r'^([a-z0-9][-a-z0-9]*)\.[a-z]+\.\s+\d*\s*IN\s+NS\s+', re.IGNORECASE) # ============================================================================ # CZDS CLIENT # ============================================================================ class CZDSClient: """Client for ICANN CZDS zone file downloads.""" def __init__( self, username: Optional[str] = None, password: Optional[str] = None, data_dir: Optional[Path] = None ): self.username = username or os.getenv("CZDS_USERNAME") or settings.czds_username self.password = password or os.getenv("CZDS_PASSWORD") or settings.czds_password self.data_dir = data_dir or Path(os.getenv("CZDS_DATA_DIR", "/tmp/pounce_czds")) self.data_dir.mkdir(parents=True, exist_ok=True) self._token: Optional[str] = None self._token_expires: Optional[datetime] = None async def _authenticate(self) -> str: """Authenticate with ICANN and get access token.""" if self._token and self._token_expires and datetime.utcnow() < self._token_expires: return self._token if not self.username or not self.password: raise ValueError("CZDS credentials not configured. 
# ============================================================================
# CZDS CLIENT
# ============================================================================

class CZDSClient:
    """Client for ICANN CZDS zone file downloads."""

    def __init__(
        self,
        username: Optional[str] = None,
        password: Optional[str] = None,
        data_dir: Optional[Path] = None
    ):
        self.username = username or os.getenv("CZDS_USERNAME") or settings.czds_username
        self.password = password or os.getenv("CZDS_PASSWORD") or settings.czds_password
        self.data_dir = data_dir or Path(os.getenv("CZDS_DATA_DIR", "/tmp/pounce_czds"))
        self.data_dir.mkdir(parents=True, exist_ok=True)
        self._token: Optional[str] = None
        self._token_expires: Optional[datetime] = None

    async def _authenticate(self) -> str:
        """Authenticate with ICANN and return an access token, cached until expiry."""
        if self._token and self._token_expires and datetime.utcnow() < self._token_expires:
            return self._token

        if not self.username or not self.password:
            raise ValueError("CZDS credentials not configured. Set CZDS_USERNAME and CZDS_PASSWORD.")

        logger.info("Authenticating with ICANN CZDS...")
        async with httpx.AsyncClient(timeout=30) as client:
            response = await client.post(
                CZDS_AUTH_URL,
                json={"username": self.username, "password": self.password},
                headers={"Content-Type": "application/json"}
            )

            if response.status_code != 200:
                logger.error(f"CZDS authentication failed: {response.status_code} {response.text}")
                raise RuntimeError(f"CZDS authentication failed: {response.status_code}")

            data = response.json()
            self._token = data.get("accessToken")
            if not self._token:
                raise RuntimeError("CZDS authentication response contained no accessToken")

            # Token expires in 24 hours; refresh after 23 hours to stay ahead of expiry
            self._token_expires = datetime.utcnow() + timedelta(hours=23)
            logger.info("CZDS authentication successful")
            return self._token

    async def get_available_zones(self) -> list[str]:
        """Get the list of zone files available for download."""
        token = await self._authenticate()

        async with httpx.AsyncClient(timeout=60) as client:
            response = await client.get(
                CZDS_ZONES_URL,
                headers={"Authorization": f"Bearer {token}"}
            )

            if response.status_code != 200:
                logger.error(f"Failed to get zone list: {response.status_code}")
                return []

            # Response is a list of download URLs
            urls = response.json()

            # Extract TLDs from URLs
            tlds = []
            for url in urls:
                # URL format: https://czds-download.icann.org/czds/downloads/xyz.zone
                match = re.search(r'/([a-z0-9-]+)\.zone$', url, re.IGNORECASE)
                if match:
                    tlds.append(match.group(1).lower())

            logger.info(f"Available zones: {tlds}")
            return tlds

    async def download_zone(self, tld: str) -> Optional[Path]:
        """Download the zone file for a specific TLD."""
        token = await self._authenticate()

        download_url = f"{CZDS_DOWNLOAD_BASE}/czds/downloads/{tld}.zone"
        output_path = self.data_dir / f"{tld}.zone.txt.gz"

        logger.info(f"Downloading zone file for .{tld}...")
        async with httpx.AsyncClient(timeout=600, follow_redirects=True) as client:
            try:
                async with client.stream(
                    "GET",
                    download_url,
                    headers={"Authorization": f"Bearer {token}"}
                ) as response:
                    if response.status_code != 200:
                        logger.error(f"Failed to download .{tld}: {response.status_code}")
                        return None

                    # Stream the (potentially multi-GB) body to disk in 1 MB chunks
                    with open(output_path, "wb") as f:
                        async for chunk in response.aiter_bytes(chunk_size=1024 * 1024):
                            f.write(chunk)

                file_size = output_path.stat().st_size / (1024 * 1024)
                logger.info(f"Downloaded .{tld} zone file: {file_size:.1f} MB")
                return output_path
            except Exception as e:
                logger.error(f"Error downloading .{tld}: {e}")
                return None

    def extract_zone_file(self, gz_path: Path) -> Path:
        """Extract a gzipped zone file and delete the archive."""
        output_path = gz_path.with_suffix('')  # Remove .gz

        logger.info(f"Extracting {gz_path.name}...")
        with gzip.open(gz_path, 'rb') as f_in:
            with open(output_path, 'wb') as f_out:
                shutil.copyfileobj(f_in, f_out)

        # Remove gz file to save space
        gz_path.unlink()
        return output_path
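    # parse_zone_file() below deliberately avoids a full DNS parser: rows are
    # whitespace-split and only second-level owner names are kept. A walk-through
    # of one illustrative row:
    #
    #   "sub.example.xyz. 3600 IN A 192.0.2.1"
    #     parts[0] -> "sub.example.xyz."; the trailing dot is stripped
    #     after removing ".xyz" the remainder "sub.example" still contains a
    #     dot, so it is treated as a subdomain and skipped; a row owned by
    #     "example.xyz." would add "example" to the set exactly once.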
    def parse_zone_file(self, zone_path: Path, tld: str) -> set[str]:
        """
        Parse a zone file and extract unique second-level domain names.

        Zone files contain various record types (NS, A/AAAA, ...). The owner
        name of any record is accepted; NS records are the most reliable
        indicator of an active delegation, and the set dedupes owners that
        appear on multiple records.

        Returns a set of domain names (without the TLD suffix).
        """
        logger.info(f"Parsing zone file for .{tld}...")

        domains = set()
        line_count = 0

        with open(zone_path, 'r', encoding='utf-8', errors='ignore') as f:
            for line in f:
                line_count += 1

                # Skip comments and empty lines
                if line.startswith(';') or not line.strip():
                    continue

                # Records are whitespace-separated fields, e.g.:
                #   example.tld. 86400 IN NS ns1.registrar.com.
                parts = line.split()
                if len(parts) >= 4:
                    # First column is the owner (domain) name
                    name = parts[0].rstrip('.')

                    # Must end with our TLD
                    if name.lower().endswith(f'.{tld}'):
                        # Strip the TLD suffix
                        domain_name = name[:-(len(tld) + 1)]

                        # Skip the TLD itself and subdomains
                        if domain_name and '.' not in domain_name:
                            domains.add(domain_name.lower())

        logger.info(f"Parsed .{tld}: {len(domains):,} unique domains from {line_count:,} lines")
        return domains

    def compute_checksum(self, domains: set[str]) -> str:
        """Compute the SHA256 checksum of the sorted domain list."""
        sorted_domains = "\n".join(sorted(domains))
        return hashlib.sha256(sorted_domains.encode()).hexdigest()

    async def get_previous_domains(self, tld: str) -> Optional[set[str]]:
        """Load the previous day's domain set from the cache file."""
        cache_file = self.data_dir / f"{tld}_domains.txt"
        if cache_file.exists():
            try:
                content = cache_file.read_text()
                return set(line.strip() for line in content.splitlines() if line.strip())
            except Exception as e:
                logger.warning(f"Failed to load cache for .{tld}: {e}")
        return None

    async def save_domains(self, tld: str, domains: set[str]):
        """Save the current domain set to the cache file."""
        cache_file = self.data_dir / f"{tld}_domains.txt"
        cache_file.write_text("\n".join(sorted(domains)))
        logger.info(f"Saved {len(domains):,} domains for .{tld}")

    async def process_drops(
        self,
        db: AsyncSession,
        tld: str,
        previous: set[str],
        current: set[str]
    ) -> list[dict]:
        """Find dropped domains (present yesterday, gone today) and store them."""
        dropped = previous - current
        if not dropped:
            logger.info(f"No dropped domains found for .{tld}")
            return []

        logger.info(f"Found {len(dropped):,} dropped domains for .{tld}")
        today = datetime.utcnow().replace(hour=0, minute=0, second=0, microsecond=0)

        # Batch inserts for performance
        dropped_records = []
        batch_size = 1000
        batch = []

        for name in dropped:
            record = DroppedDomain(
                domain=f"{name}.{tld}",
                tld=tld,
                dropped_date=today,
                length=len(name),
                is_numeric=name.isdigit(),
                has_hyphen='-' in name
            )
            batch.append(record)
            dropped_records.append({
                "domain": f"{name}.{tld}",
                "length": len(name),
                "is_numeric": name.isdigit(),
                "has_hyphen": '-' in name
            })

            if len(batch) >= batch_size:
                db.add_all(batch)
                await db.flush()
                batch = []

        # Add remaining
        if batch:
            db.add_all(batch)

        await db.commit()
        return dropped_records
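    # Drop detection is plain set arithmetic over consecutive snapshots.
    # With hypothetical data:
    #
    #   previous = {"alpha", "beta", "gamma"}
    #   current  = {"alpha", "delta"}
    #
    #   previous - current == {"beta", "gamma"}   # dropped (stored above)
    #   current - previous == {"delta"}           # new registrations (counted in sync_zone)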
    async def sync_zone(self, db: AsyncSession, tld: str) -> dict:
        """
        Sync a single zone file:
        1. Download zone file
        2. Extract and parse
        3. Compare with previous snapshot
        4. Store dropped domains
        5. Save new snapshot
        """
        logger.info(f"Starting sync for .{tld}")

        result = {
            "tld": tld,
            "status": "pending",
            "current_count": 0,
            "previous_count": 0,
            "dropped_count": 0,
            "new_count": 0,
            "error": None
        }

        try:
            # Download zone file
            gz_path = await self.download_zone(tld)
            if not gz_path:
                result["status"] = "download_failed"
                result["error"] = "Failed to download zone file"
                return result

            # Extract
            zone_path = self.extract_zone_file(gz_path)

            # Parse
            current_domains = self.parse_zone_file(zone_path, tld)
            result["current_count"] = len(current_domains)

            # Clean up zone file (can be very large)
            zone_path.unlink()

            # Get previous snapshot
            previous_domains = await self.get_previous_domains(tld)

            if previous_domains:
                result["previous_count"] = len(previous_domains)

                # Find dropped domains
                dropped = await self.process_drops(db, tld, previous_domains, current_domains)
                result["dropped_count"] = len(dropped)
                result["new_count"] = len(current_domains - previous_domains)

            # Save current snapshot
            await self.save_domains(tld, current_domains)

            # Save snapshot metadata
            checksum = self.compute_checksum(current_domains)
            snapshot = ZoneSnapshot(
                tld=tld,
                snapshot_date=datetime.utcnow(),
                domain_count=len(current_domains),
                checksum=checksum
            )
            db.add(snapshot)
            await db.commit()

            result["status"] = "success"
            logger.info(
                f"Sync complete for .{tld}: "
                f"{result['current_count']:,} domains, "
                f"{result['dropped_count']:,} dropped, "
                f"{result['new_count']:,} new"
            )
        except Exception as e:
            logger.exception(f"Error syncing .{tld}: {e}")
            result["status"] = "error"
            result["error"] = str(e)

        return result

    async def sync_all_zones(
        self,
        db: AsyncSession,
        tlds: Optional[list[str]] = None
    ) -> list[dict]:
        """
        Sync all approved zone files.

        Args:
            db: Database session
            tlds: Optional list of TLDs to sync. Defaults to APPROVED_TLDS.

        Returns:
            List of sync results, one per TLD.
        """
        tlds = tlds or APPROVED_TLDS
        logger.info(f"Starting CZDS sync for {len(tlds)} zones: {tlds}")

        results = []
        for tld in tlds:
            result = await self.sync_zone(db, tld)
            results.append(result)

            # Small delay between zones to be nice to ICANN servers
            await asyncio.sleep(2)

        # Summary
        success_count = sum(1 for r in results if r["status"] == "success")
        total_dropped = sum(r["dropped_count"] for r in results)
        logger.info(
            f"CZDS sync complete: "
            f"{success_count}/{len(tlds)} zones successful, "
            f"{total_dropped:,} total dropped domains"
        )
        return results


# ============================================================================
# STANDALONE SCRIPT
# ============================================================================

async def main():
    """Standalone script to run a CZDS sync."""
    import sys

    from app.database import AsyncSessionLocal, init_db

    # Initialize database
    await init_db()

    # Parse arguments
    tlds = sys.argv[1:] if len(sys.argv) > 1 else APPROVED_TLDS

    print("🌐 CZDS Zone File Sync")
    print(f"šŸ“‚ TLDs: {', '.join(tlds)}")
    print("-" * 50)

    client = CZDSClient()

    async with AsyncSessionLocal() as db:
        results = await client.sync_all_zones(db, tlds)

    print("\n" + "=" * 50)
    print("šŸ“Š RESULTS")
    print("=" * 50)
    for r in results:
        status_icon = "āœ…" if r["status"] == "success" else "āŒ"
        print(f"{status_icon} .{r['tld']}: {r['current_count']:,} domains, "
              f"{r['dropped_count']:,} dropped, {r['new_count']:,} new")
        if r["error"]:
            print(f"   āš ļø Error: {r['error']}")

    total_dropped = sum(r["dropped_count"] for r in results)
    print(f"\nšŸŽÆ Total dropped domains: {total_dropped:,}")


if __name__ == "__main__":
    asyncio.run(main())
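# Example invocation (the module path is illustrative; adjust to your layout):
#
#   CZDS_USERNAME="you@example.com" CZDS_PASSWORD="..." \
#       python -m app.services.czds_client xyz org
#
# With no TLD arguments, every entry in APPROVED_TLDS is synced.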