""" ICANN CZDS (Centralized Zone Data Service) Client ================================================== Downloads zone files from ICANN CZDS, parses them, and detects dropped domains. Authentication: OAuth2 with username/password Zone Format: Standard DNS zone file format (.txt.gz) Usage: client = CZDSClient(username, password) await client.sync_all_zones(db) """ import asyncio import gzip import hashlib import logging import os import re import shutil from datetime import datetime, timedelta from pathlib import Path from typing import Optional import httpx from sqlalchemy import select, func, text from sqlalchemy.ext.asyncio import AsyncSession from app.config import get_settings from app.models.zone_file import ZoneSnapshot, DroppedDomain logger = logging.getLogger(__name__) settings = get_settings() # ============================================================================ # CONSTANTS # ============================================================================ CZDS_AUTH_URL = "https://account-api.icann.org/api/authenticate" CZDS_ZONES_URL = "https://czds-api.icann.org/czds/downloads/links" CZDS_DOWNLOAD_BASE = "https://czds-download-api.icann.org" # TLDs we have approved access to APPROVED_TLDS = ["xyz", "org", "online", "info", "dev", "app"] # Regex to extract domain names from zone file NS records # Format: example.tld. IN NS ns1.example.com. NS_RECORD_PATTERN = re.compile(r'^([a-z0-9][-a-z0-9]*)\.[a-z]+\.\s+\d*\s*IN\s+NS\s+', re.IGNORECASE) # ============================================================================ # CZDS CLIENT # ============================================================================ class CZDSClient: """Client for ICANN CZDS zone file downloads.""" def __init__( self, username: Optional[str] = None, password: Optional[str] = None, data_dir: Optional[Path] = None ): self.username = username or os.getenv("CZDS_USERNAME") or settings.czds_username self.password = password or os.getenv("CZDS_PASSWORD") or settings.czds_password self.data_dir = data_dir or Path(os.getenv("CZDS_DATA_DIR", "/tmp/pounce_czds")) self.data_dir.mkdir(parents=True, exist_ok=True) self._token: Optional[str] = None self._token_expires: Optional[datetime] = None async def _authenticate(self) -> str: """Authenticate with ICANN and get access token.""" if self._token and self._token_expires and datetime.utcnow() < self._token_expires: return self._token if not self.username or not self.password: raise ValueError("CZDS credentials not configured. Set CZDS_USERNAME and CZDS_PASSWORD.") logger.info("Authenticating with ICANN CZDS...") async with httpx.AsyncClient(timeout=30) as client: response = await client.post( CZDS_AUTH_URL, json={"username": self.username, "password": self.password}, headers={"Content-Type": "application/json"} ) if response.status_code != 200: logger.error(f"CZDS authentication failed: {response.status_code} {response.text}") raise RuntimeError(f"CZDS authentication failed: {response.status_code}") data = response.json() self._token = data.get("accessToken") # Token expires in 24 hours, refresh after 23 hours self._token_expires = datetime.utcnow() + timedelta(hours=23) logger.info("CZDS authentication successful") return self._token async def get_available_zones(self) -> dict[str, str]: """ Get list of zone files available for download. Returns dict mapping TLD to download URL. """ token = await self._authenticate() async with httpx.AsyncClient(timeout=60) as client: response = await client.get( CZDS_ZONES_URL, headers={"Authorization": f"Bearer {token}"} ) if response.status_code != 200: logger.error(f"Failed to get zone list: {response.status_code}") return {} # Response is a list of download URLs urls = response.json() # Extract TLDs and their URLs zones = {} for url in urls: # URL format: https://czds-download-api.icann.org/czds/downloads/xyz.zone match = re.search(r'/([a-z0-9-]+)\.zone$', url, re.IGNORECASE) if match: tld = match.group(1).lower() zones[tld] = url logger.info(f"Available zones: {list(zones.keys())}") return zones async def download_zone(self, tld: str, download_url: Optional[str] = None) -> Optional[Path]: """ Download a zone file for a specific TLD. Args: tld: The TLD to download download_url: Optional explicit download URL (from get_available_zones) """ token = await self._authenticate() # Use provided URL or construct one if not download_url: download_url = f"{CZDS_DOWNLOAD_BASE}/czds/downloads/{tld}.zone" output_path = self.data_dir / f"{tld}.zone.txt.gz" logger.info(f"Downloading zone file for .{tld} from {download_url}...") async with httpx.AsyncClient(timeout=600, follow_redirects=True) as client: try: async with client.stream( "GET", download_url, headers={"Authorization": f"Bearer {token}"} ) as response: if response.status_code != 200: logger.error(f"Failed to download .{tld}: {response.status_code}") return None # Stream to file with open(output_path, "wb") as f: async for chunk in response.aiter_bytes(chunk_size=1024 * 1024): f.write(chunk) file_size = output_path.stat().st_size / (1024 * 1024) logger.info(f"Downloaded .{tld} zone file: {file_size:.1f} MB") return output_path except Exception as e: logger.error(f"Error downloading .{tld}: {e}") return None def extract_zone_file(self, gz_path: Path) -> Path: """ Extract gzipped zone file to RAM drive for fastest access. Falls back to disk if RAM drive unavailable. """ from app.services.zone_file_parser import HighPerformanceZoneParser parser = HighPerformanceZoneParser(use_ram_drive=True) output_path = parser.extract_to_ram(gz_path) # Remove gz file to save space gz_path.unlink() return output_path def parse_zone_file(self, zone_path: Path, tld: str) -> set[str]: """ Parse zone file and extract unique domain names. Uses high-performance parallel parser with all CPU cores and RAM drive for maximum speed on large zone files. Returns set of domain names (without TLD suffix). """ from app.services.zone_file_parser import HighPerformanceZoneParser # Use parallel parser with RAM drive parser = HighPerformanceZoneParser(use_ram_drive=True) try: domains = parser.parse_zone_file_parallel(zone_path, tld) return domains finally: parser.cleanup_ram_drive() def compute_checksum(self, domains: set[str]) -> str: """Compute SHA256 checksum of sorted domain list.""" sorted_domains = "\n".join(sorted(domains)) return hashlib.sha256(sorted_domains.encode()).hexdigest() async def get_previous_domains(self, tld: str) -> Optional[set[str]]: """Load previous day's domain set from cache file.""" cache_file = self.data_dir / f"{tld}_domains.txt" if cache_file.exists(): try: content = cache_file.read_text() return set(line.strip() for line in content.splitlines() if line.strip()) except Exception as e: logger.warning(f"Failed to load cache for .{tld}: {e}") return None async def save_domains(self, tld: str, domains: set[str]): """Save current domains to cache file with date-based retention.""" from app.config import get_settings settings = get_settings() # Save current file (for next sync comparison) cache_file = self.data_dir / f"{tld}_domains.txt" cache_file.write_text("\n".join(sorted(domains))) # Also save dated snapshot for retention today = datetime.now().strftime("%Y-%m-%d") dated_file = self.data_dir / f"{tld}_domains_{today}.txt" if not dated_file.exists(): dated_file.write_text("\n".join(sorted(domains))) logger.info(f"Saved snapshot: {dated_file.name}") # Cleanup old snapshots (keep last N days) retention_days = getattr(settings, 'zone_retention_days', 3) await self._cleanup_old_snapshots(tld, retention_days) logger.info(f"Saved {len(domains):,} domains for .{tld}") async def _cleanup_old_snapshots(self, tld: str, keep_days: int = 3): """Remove zone file snapshots older than keep_days.""" import re from datetime import timedelta cutoff = datetime.now() - timedelta(days=keep_days) pattern = re.compile(rf"^{tld}_domains_(\d{{4}}-\d{{2}}-\d{{2}})\.txt$") for file in self.data_dir.glob(f"{tld}_domains_*.txt"): match = pattern.match(file.name) if match: file_date = datetime.strptime(match.group(1), "%Y-%m-%d") if file_date < cutoff: file.unlink() logger.info(f"Deleted old snapshot: {file.name}") async def process_drops( self, db: AsyncSession, tld: str, previous: set[str], current: set[str] ) -> list[dict]: """ Find dropped domains and store them directly. NOTE: We do NOT verify availability here to avoid RDAP rate limits/bans. Verification happens separately in the 'verify_drops' scheduler job which runs in small batches throughout the day. """ dropped = previous - current if not dropped: logger.info(f"No dropped domains found for .{tld}") return [] logger.info(f"Found {len(dropped):,} dropped domains for .{tld}, saving to database...") today = datetime.utcnow().replace(hour=0, minute=0, second=0, microsecond=0) # Store all drops - availability will be verified separately dropped_records = [] batch_size = 1000 dropped_list = list(dropped) for i in range(0, len(dropped_list), batch_size): batch = dropped_list[i:i + batch_size] for name in batch: try: record = DroppedDomain( domain=name, # Just the name, not full domain! tld=tld, dropped_date=today, length=len(name), is_numeric=name.isdigit(), has_hyphen='-' in name, availability_status='unknown' # Will be verified later ) db.add(record) dropped_records.append({ "domain": f"{name}.{tld}", "length": len(name), }) except Exception as e: # Duplicate or other error - skip pass # Commit batch try: await db.commit() except Exception: await db.rollback() if (i + batch_size) % 5000 == 0: logger.info(f"Saved {min(i + batch_size, len(dropped_list)):,}/{len(dropped_list):,} drops") # Final commit try: await db.commit() except Exception: await db.rollback() logger.info(f"CZDS drops for .{tld}: {len(dropped_records):,} saved (verification pending)") return dropped_records async def sync_zone( self, db: AsyncSession, tld: str, download_url: Optional[str] = None ) -> dict: """ Sync a single zone file: 1. Download zone file 2. Extract and parse 3. Compare with previous snapshot 4. Store dropped domains 5. Save new snapshot Args: db: Database session tld: TLD to sync download_url: Optional explicit download URL """ logger.info(f"Starting sync for .{tld}") result = { "tld": tld, "status": "pending", "current_count": 0, "previous_count": 0, "dropped_count": 0, "new_count": 0, "error": None } try: # Download zone file gz_path = await self.download_zone(tld, download_url) if not gz_path: result["status"] = "download_failed" result["error"] = "Failed to download zone file" return result # Extract zone_path = self.extract_zone_file(gz_path) # Parse current_domains = self.parse_zone_file(zone_path, tld) result["current_count"] = len(current_domains) # Clean up zone file (can be very large) # Note: Parser may have already deleted the file during cleanup_ram_drive() if zone_path.exists(): zone_path.unlink() # Get previous snapshot previous_domains = await self.get_previous_domains(tld) if previous_domains: result["previous_count"] = len(previous_domains) # Find dropped domains dropped = await self.process_drops(db, tld, previous_domains, current_domains) result["dropped_count"] = len(dropped) result["new_count"] = len(current_domains - previous_domains) # Save current snapshot await self.save_domains(tld, current_domains) # Save snapshot metadata checksum = self.compute_checksum(current_domains) snapshot = ZoneSnapshot( tld=tld, snapshot_date=datetime.utcnow(), domain_count=len(current_domains), checksum=checksum ) db.add(snapshot) await db.commit() result["status"] = "success" logger.info( f"Sync complete for .{tld}: " f"{result['current_count']:,} domains, " f"{result['dropped_count']:,} dropped, " f"{result['new_count']:,} new" ) except Exception as e: logger.exception(f"Error syncing .{tld}: {e}") result["status"] = "error" result["error"] = str(e) return result async def sync_all_zones( self, db: AsyncSession, tlds: Optional[list[str]] = None, parallel: bool = True, max_concurrent: int = 3 ) -> list[dict]: """ Sync all approved zone files. Args: db: Database session tlds: Optional list of TLDs to sync. Defaults to APPROVED_TLDS. parallel: If True, download zones in parallel (faster) max_concurrent: Max concurrent downloads (be nice to ICANN) Returns: List of sync results for each TLD. """ target_tlds = tlds or APPROVED_TLDS # Get available zones with their download URLs available_zones = await self.get_available_zones() logger.info(f"Starting CZDS sync for {len(target_tlds)} zones: {target_tlds}") logger.info(f"Available zones: {list(available_zones.keys())}") # Build list of TLDs with URLs sync_tasks = [] results = [] for tld in target_tlds: download_url = available_zones.get(tld) if not download_url: logger.warning(f"No download URL available for .{tld}") results.append({ "tld": tld, "status": "not_available", "current_count": 0, "previous_count": 0, "dropped_count": 0, "new_count": 0, "error": f"No access to .{tld} zone" }) continue sync_tasks.append((tld, download_url)) if parallel and len(sync_tasks) > 1: # Parallel sync with semaphore to limit concurrency semaphore = asyncio.Semaphore(max_concurrent) async def sync_with_semaphore(tld: str, url: str) -> dict: async with semaphore: return await self.sync_zone(db, tld, url) # Run all syncs in parallel parallel_results = await asyncio.gather( *[sync_with_semaphore(tld, url) for tld, url in sync_tasks], return_exceptions=True ) for (tld, _), result in zip(sync_tasks, parallel_results): if isinstance(result, Exception): logger.error(f"Sync failed for .{tld}: {result}") results.append({ "tld": tld, "status": "error", "current_count": 0, "previous_count": 0, "dropped_count": 0, "new_count": 0, "error": str(result) }) else: results.append(result) else: # Sequential sync (original behavior) for tld, download_url in sync_tasks: result = await self.sync_zone(db, tld, download_url) results.append(result) await asyncio.sleep(2) # Summary success_count = sum(1 for r in results if r["status"] == "success") total_dropped = sum(r["dropped_count"] for r in results) error_count = sum(1 for r in results if r["status"] == "error") logger.info( f"CZDS sync complete: " f"{success_count}/{len(target_tlds)} zones successful, " f"{error_count} errors, " f"{total_dropped:,} total dropped domains" ) # Send alert on errors if error_count > 0: await self._send_sync_alert(results) return results async def _send_sync_alert(self, results: list[dict]): """Send email alert when sync has errors.""" try: from app.services.email_service import email_service errors = [r for r in results if r["status"] == "error"] if not errors: return error_details = "\n".join([ f" ⢠.{r['tld']}: {r.get('error', 'Unknown error')}" for r in errors ]) success_count = sum(1 for r in results if r["status"] == "success") total_dropped = sum(r["dropped_count"] for r in results) await email_service.send_email( to_email=settings.smtp_from_email, # Send to admin subject=f"ā ļø CZDS Zone Sync Alert - {len(errors)} errors", html_content=f"""
Status: {success_count} success, {len(errors)} errors
Total Drops: {total_dropped:,}
{error_details}
Time: {datetime.utcnow().isoformat()}
""" ) logger.info("Sent sync error alert email") except Exception as e: logger.error(f"Failed to send sync alert: {e}") # ============================================================================ # STANDALONE SCRIPT # ============================================================================ async def main(): """Standalone script to run CZDS sync.""" import sys from app.database import AsyncSessionLocal, init_db # Initialize database await init_db() # Parse arguments tlds = sys.argv[1:] if len(sys.argv) > 1 else APPROVED_TLDS print(f"š CZDS Zone File Sync") print(f"š TLDs: {', '.join(tlds)}") print("-" * 50) client = CZDSClient() async with AsyncSessionLocal() as db: results = await client.sync_all_zones(db, tlds) print("\n" + "=" * 50) print("š RESULTS") print("=" * 50) for r in results: status_icon = "ā " if r["status"] == "success" else "ā" print(f"{status_icon} .{r['tld']}: {r['current_count']:,} domains, " f"{r['dropped_count']:,} dropped, {r['new_count']:,} new") if r["error"]: print(f" ā ļø Error: {r['error']}") total_dropped = sum(r["dropped_count"] for r in results) print(f"\nšÆ Total dropped domains: {total_dropped:,}") if __name__ == "__main__": asyncio.run(main())