pounce/backend/app/services/czds_client.py
Yves Gugger f9e6025dc4 feat: Premium infrastructure improvements
1. Parallel Zone Downloads (3x faster)
   - CZDS zones now download in parallel with semaphore
   - Configurable max_concurrent (default: 3)
   - Added timing logs for performance monitoring

2. Email Alerts for Ops
   - New send_ops_alert() in email service
   - Automatic alerts on zone sync failures
   - Critical alerts on complete job crashes
   - Severity levels: info, warning, error, critical

3. Admin Zone Sync Dashboard
   - New "Zone Sync" tab in admin panel
   - Real-time status for all TLDs
   - Manual sync trigger buttons
   - Shows drops today, total drops, last sync time
   - Health status indicators (healthy/stale/never)
   - API endpoint: GET /admin/zone-sync/status
2025-12-21 13:25:08 +01:00

"""
ICANN CZDS (Centralized Zone Data Service) Client
==================================================
Downloads zone files from ICANN CZDS, parses them, and detects dropped domains.
Authentication: OAuth2 with username/password
Zone Format: Standard DNS zone file format (.txt.gz)
Usage:
client = CZDSClient(username, password)
await client.sync_all_zones(db)
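    # Or sync a single TLD:
    result = await client.sync_zone(db, "xyz")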
"""

import asyncio
import hashlib
import logging
import os
import re
from datetime import datetime, timedelta
from pathlib import Path
from typing import Optional

import httpx
from sqlalchemy.ext.asyncio import AsyncSession

from app.config import get_settings
from app.models.zone_file import ZoneSnapshot, DroppedDomain

logger = logging.getLogger(__name__)
settings = get_settings()

# ============================================================================
# CONSTANTS
# ============================================================================

CZDS_AUTH_URL = "https://account-api.icann.org/api/authenticate"
CZDS_ZONES_URL = "https://czds-api.icann.org/czds/downloads/links"
CZDS_DOWNLOAD_BASE = "https://czds-download-api.icann.org"

# TLDs we have approved access to
APPROVED_TLDS = ["xyz", "org", "online", "info", "dev", "app"]

# Regex to extract domain names from zone file NS records
# Format: example.tld. IN NS ns1.example.com.
NS_RECORD_PATTERN = re.compile(r'^([a-z0-9][-a-z0-9]*)\.[a-z]+\.\s+\d*\s*IN\s+NS\s+', re.IGNORECASE)
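# Example: in the line "example.xyz. 86400 IN NS ns1.example.com." the pattern
# matches and group(1) captures "example"; the TTL field ("86400") is optional.
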
# ============================================================================
# CZDS CLIENT
# ============================================================================
class CZDSClient:
    """Client for ICANN CZDS zone file downloads."""

    def __init__(
        self,
        username: Optional[str] = None,
        password: Optional[str] = None,
        data_dir: Optional[Path] = None
    ):
        self.username = username or os.getenv("CZDS_USERNAME") or settings.czds_username
        self.password = password or os.getenv("CZDS_PASSWORD") or settings.czds_password
        self.data_dir = data_dir or Path(os.getenv("CZDS_DATA_DIR", "/tmp/pounce_czds"))
        self.data_dir.mkdir(parents=True, exist_ok=True)
        self._token: Optional[str] = None
        self._token_expires: Optional[datetime] = None

    async def _authenticate(self) -> str:
        """Authenticate with ICANN and get access token."""
        if self._token and self._token_expires and datetime.utcnow() < self._token_expires:
            return self._token
        if not self.username or not self.password:
            raise ValueError("CZDS credentials not configured. Set CZDS_USERNAME and CZDS_PASSWORD.")
        logger.info("Authenticating with ICANN CZDS...")
        async with httpx.AsyncClient(timeout=30) as client:
            response = await client.post(
                CZDS_AUTH_URL,
                json={"username": self.username, "password": self.password},
                headers={"Content-Type": "application/json"}
            )
            if response.status_code != 200:
                logger.error(f"CZDS authentication failed: {response.status_code} {response.text}")
                raise RuntimeError(f"CZDS authentication failed: {response.status_code}")
            data = response.json()
        self._token = data.get("accessToken")
        # Token expires in 24 hours, refresh after 23 hours
        self._token_expires = datetime.utcnow() + timedelta(hours=23)
        logger.info("CZDS authentication successful")
        return self._token

    async def get_available_zones(self) -> dict[str, str]:
        """
        Get list of zone files available for download.

        Returns dict mapping TLD to download URL.
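
        Example:
            {"xyz": "https://czds-download-api.icann.org/czds/downloads/xyz.zone"}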
"""
token = await self._authenticate()
async with httpx.AsyncClient(timeout=60) as client:
response = await client.get(
CZDS_ZONES_URL,
headers={"Authorization": f"Bearer {token}"}
)
if response.status_code != 200:
logger.error(f"Failed to get zone list: {response.status_code}")
return {}
# Response is a list of download URLs
urls = response.json()
# Extract TLDs and their URLs
zones = {}
for url in urls:
# URL format: https://czds-download-api.icann.org/czds/downloads/xyz.zone
match = re.search(r'/([a-z0-9-]+)\.zone$', url, re.IGNORECASE)
if match:
tld = match.group(1).lower()
zones[tld] = url
logger.info(f"Available zones: {list(zones.keys())}")
return zones
    async def download_zone(self, tld: str, download_url: Optional[str] = None) -> Optional[Path]:
        """
        Download a zone file for a specific TLD.

        Args:
            tld: The TLD to download
            download_url: Optional explicit download URL (from get_available_zones)
        """
        token = await self._authenticate()
        # Use provided URL or construct one
        if not download_url:
            download_url = f"{CZDS_DOWNLOAD_BASE}/czds/downloads/{tld}.zone"
        output_path = self.data_dir / f"{tld}.zone.txt.gz"
        logger.info(f"Downloading zone file for .{tld} from {download_url}...")
        async with httpx.AsyncClient(timeout=600, follow_redirects=True) as client:
            try:
                async with client.stream(
                    "GET",
                    download_url,
                    headers={"Authorization": f"Bearer {token}"}
                ) as response:
                    if response.status_code != 200:
                        logger.error(f"Failed to download .{tld}: {response.status_code}")
                        return None
                    # Stream to file
                    with open(output_path, "wb") as f:
                        async for chunk in response.aiter_bytes(chunk_size=1024 * 1024):
                            f.write(chunk)
                file_size = output_path.stat().st_size / (1024 * 1024)
                logger.info(f"Downloaded .{tld} zone file: {file_size:.1f} MB")
                return output_path
            except Exception as e:
                logger.error(f"Error downloading .{tld}: {e}")
                return None

    def extract_zone_file(self, gz_path: Path) -> Path:
        """
        Extract gzipped zone file to RAM drive for fastest access.

        Falls back to disk if RAM drive unavailable.
        """
        from app.services.zone_file_parser import HighPerformanceZoneParser

        parser = HighPerformanceZoneParser(use_ram_drive=True)
        output_path = parser.extract_to_ram(gz_path)
        # Remove gz file to save space
        gz_path.unlink()
        return output_path

    def parse_zone_file(self, zone_path: Path, tld: str) -> set[str]:
        """
        Parse zone file and extract unique domain names.

        Uses high-performance parallel parser with all CPU cores
        and RAM drive for maximum speed on large zone files.

        Returns set of domain names (without TLD suffix).
        """
        from app.services.zone_file_parser import HighPerformanceZoneParser

        # Use parallel parser with RAM drive
        parser = HighPerformanceZoneParser(use_ram_drive=True)
        try:
            domains = parser.parse_zone_file_parallel(zone_path, tld)
            return domains
        finally:
            parser.cleanup_ram_drive()

    def compute_checksum(self, domains: set[str]) -> str:
        """Compute SHA256 checksum of sorted domain list."""
        sorted_domains = "\n".join(sorted(domains))
        return hashlib.sha256(sorted_domains.encode()).hexdigest()

    async def get_previous_domains(self, tld: str) -> Optional[set[str]]:
        """Load previous day's domain set from cache file."""
        cache_file = self.data_dir / f"{tld}_domains.txt"
        if cache_file.exists():
            try:
                content = cache_file.read_text()
                return set(line.strip() for line in content.splitlines() if line.strip())
            except Exception as e:
                logger.warning(f"Failed to load cache for .{tld}: {e}")
        return None

    async def save_domains(self, tld: str, domains: set[str]):
        """Save current domains to cache file with date-based retention."""
        # Save current file (for next sync comparison)
        cache_file = self.data_dir / f"{tld}_domains.txt"
        cache_file.write_text("\n".join(sorted(domains)))
        # Also save dated snapshot for retention
        today = datetime.now().strftime("%Y-%m-%d")
        dated_file = self.data_dir / f"{tld}_domains_{today}.txt"
        if not dated_file.exists():
            dated_file.write_text("\n".join(sorted(domains)))
            logger.info(f"Saved snapshot: {dated_file.name}")
        # Cleanup old snapshots (keep last N days)
        retention_days = getattr(settings, 'zone_retention_days', 3)
        await self._cleanup_old_snapshots(tld, retention_days)
        logger.info(f"Saved {len(domains):,} domains for .{tld}")

    async def _cleanup_old_snapshots(self, tld: str, keep_days: int = 3):
        """Remove zone file snapshots older than keep_days."""
        cutoff = datetime.now() - timedelta(days=keep_days)
        pattern = re.compile(rf"^{tld}_domains_(\d{{4}}-\d{{2}}-\d{{2}})\.txt$")
        for file in self.data_dir.glob(f"{tld}_domains_*.txt"):
            match = pattern.match(file.name)
            if match:
                file_date = datetime.strptime(match.group(1), "%Y-%m-%d")
                if file_date < cutoff:
                    file.unlink()
                    logger.info(f"Deleted old snapshot: {file.name}")

    async def process_drops(
        self,
        db: AsyncSession,
        tld: str,
        previous: set[str],
        current: set[str]
    ) -> list[dict]:
        """
        Find dropped domains and store them directly.

        NOTE: We do NOT verify availability here, to avoid RDAP rate
        limits/bans. Verification happens separately in the 'verify_drops'
        scheduler job, which runs in small batches throughout the day.
        """
        dropped = previous - current
        if not dropped:
            logger.info(f"No dropped domains found for .{tld}")
            return []
        logger.info(f"Found {len(dropped):,} dropped domains for .{tld}, saving to database...")
        today = datetime.utcnow().replace(hour=0, minute=0, second=0, microsecond=0)
        # Store all drops - availability will be verified separately
        dropped_records = []
        batch_size = 1000
        dropped_list = list(dropped)
        for i in range(0, len(dropped_list), batch_size):
            batch = dropped_list[i:i + batch_size]
            for name in batch:
                try:
                    record = DroppedDomain(
                        domain=name,  # Just the name, not the full domain!
                        tld=tld,
                        dropped_date=today,
                        length=len(name),
                        is_numeric=name.isdigit(),
                        has_hyphen='-' in name,
                        availability_status='unknown'  # Will be verified later
                    )
                    db.add(record)
                    dropped_records.append({
                        "domain": f"{name}.{tld}",
                        "length": len(name),
                    })
                except Exception:
                    # Duplicate or other error - skip
                    pass
            # Commit batch
            try:
                await db.commit()
            except Exception:
                await db.rollback()
            if (i + batch_size) % 5000 == 0:
                logger.info(f"Saved {min(i + batch_size, len(dropped_list)):,}/{len(dropped_list):,} drops")
        # Final commit
        try:
            await db.commit()
        except Exception:
            await db.rollback()
        logger.info(f"CZDS drops for .{tld}: {len(dropped_records):,} saved (verification pending)")
        return dropped_records

    async def sync_zone(
        self,
        db: AsyncSession,
        tld: str,
        download_url: Optional[str] = None
    ) -> dict:
        """
        Sync a single zone file:
        1. Download zone file
        2. Extract and parse
        3. Compare with previous snapshot
        4. Store dropped domains
        5. Save new snapshot

        Args:
            db: Database session
            tld: TLD to sync
            download_url: Optional explicit download URL
        """
        logger.info(f"Starting sync for .{tld}")
        result = {
            "tld": tld,
            "status": "pending",
            "current_count": 0,
            "previous_count": 0,
            "dropped_count": 0,
            "new_count": 0,
            "error": None
        }
        try:
            # Download zone file
            gz_path = await self.download_zone(tld, download_url)
            if not gz_path:
                result["status"] = "download_failed"
                result["error"] = "Failed to download zone file"
                return result
            # Extract
            zone_path = self.extract_zone_file(gz_path)
            # Parse
            current_domains = self.parse_zone_file(zone_path, tld)
            result["current_count"] = len(current_domains)
            # Clean up zone file (can be very large).
            # Note: parser may have already deleted the file during cleanup_ram_drive()
            if zone_path.exists():
                zone_path.unlink()
            # Get previous snapshot
            previous_domains = await self.get_previous_domains(tld)
            if previous_domains:
                result["previous_count"] = len(previous_domains)
                # Find dropped domains
                dropped = await self.process_drops(db, tld, previous_domains, current_domains)
                result["dropped_count"] = len(dropped)
                result["new_count"] = len(current_domains - previous_domains)
            # Save current snapshot
            await self.save_domains(tld, current_domains)
            # Save snapshot metadata
            checksum = self.compute_checksum(current_domains)
            snapshot = ZoneSnapshot(
                tld=tld,
                snapshot_date=datetime.utcnow(),
                domain_count=len(current_domains),
                checksum=checksum
            )
            db.add(snapshot)
            await db.commit()
            result["status"] = "success"
            logger.info(
                f"Sync complete for .{tld}: "
                f"{result['current_count']:,} domains, "
                f"{result['dropped_count']:,} dropped, "
                f"{result['new_count']:,} new"
            )
        except Exception as e:
            logger.exception(f"Error syncing .{tld}: {e}")
            result["status"] = "error"
            result["error"] = str(e)
        return result

    async def sync_all_zones(
        self,
        db: AsyncSession,
        tlds: Optional[list[str]] = None,
        parallel: bool = True,
        max_concurrent: int = 3
    ) -> list[dict]:
        """
        Sync all approved zone files.

        Args:
            db: Database session
            tlds: Optional list of TLDs to sync. Defaults to APPROVED_TLDS.
            parallel: If True, download zones in parallel (faster)
            max_concurrent: Max concurrent downloads (to be nice to ICANN)

        Returns:
            List of sync results for each TLD.
        """
        target_tlds = tlds or APPROVED_TLDS
        start_time = datetime.utcnow()
        # Get available zones with their download URLs
        available_zones = await self.get_available_zones()
        logger.info(f"Starting CZDS sync for {len(target_tlds)} zones: {target_tlds}")
        logger.info(f"Available zones: {list(available_zones.keys())}")
        logger.info(f"Mode: {'PARALLEL' if parallel else 'SEQUENTIAL'} (max {max_concurrent} concurrent)")
        # Prepare tasks with their download URLs
        tasks_to_run = []
        unavailable_results = []
        for tld in target_tlds:
            download_url = available_zones.get(tld)
            if not download_url:
                logger.warning(f"No download URL available for .{tld}")
                unavailable_results.append({
                    "tld": tld,
                    "status": "not_available",
                    "current_count": 0,
                    "previous_count": 0,
                    "dropped_count": 0,
                    "new_count": 0,
                    "error": f"No access to .{tld} zone"
                })
            else:
                tasks_to_run.append((tld, download_url))
        results = unavailable_results.copy()
        if parallel and len(tasks_to_run) > 1:
            # Parallel execution with semaphore for rate limiting
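            # NOTE: all tasks share a single AsyncSession (db); SQLAlchemy sessions
            # are not guaranteed safe for concurrent use, so the speedup here comes
            # from overlapping downloads and parsing. A session-per-task refactor
            # would be the safer shape for the database writes.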
            semaphore = asyncio.Semaphore(max_concurrent)

            async def sync_with_semaphore(tld: str, url: str) -> dict:
                async with semaphore:
                    return await self.sync_zone(db, tld, url)

            # Run all tasks in parallel
            parallel_results = await asyncio.gather(
                *[sync_with_semaphore(tld, url) for tld, url in tasks_to_run],
                return_exceptions=True
            )
            # Process results
            for i, result in enumerate(parallel_results):
                tld = tasks_to_run[i][0]
                if isinstance(result, Exception):
                    logger.error(f"Parallel sync failed for .{tld}: {result}")
                    results.append({
                        "tld": tld,
                        "status": "error",
                        "current_count": 0,
                        "previous_count": 0,
                        "dropped_count": 0,
                        "new_count": 0,
                        "error": str(result)
                    })
                else:
                    results.append(result)
        else:
            # Sequential execution (fallback)
            for tld, download_url in tasks_to_run:
                result = await self.sync_zone(db, tld, download_url)
                results.append(result)
                await asyncio.sleep(2)
        # Summary
        elapsed = (datetime.utcnow() - start_time).total_seconds()
        success_count = sum(1 for r in results if r["status"] == "success")
        total_dropped = sum(r["dropped_count"] for r in results)
        logger.info(
            f"CZDS sync complete in {elapsed:.1f}s: "
            f"{success_count}/{len(target_tlds)} zones successful, "
            f"{total_dropped:,} total dropped domains"
        )
        return results


# ============================================================================
# STANDALONE SCRIPT
# ============================================================================

async def main():
    """Standalone script to run CZDS sync."""
    import sys

    from app.database import AsyncSessionLocal, init_db

    # Initialize database
    await init_db()
    # Parse arguments
    tlds = sys.argv[1:] if len(sys.argv) > 1 else APPROVED_TLDS
    print("🌐 CZDS Zone File Sync")
    print(f"📂 TLDs: {', '.join(tlds)}")
    print("-" * 50)
    client = CZDSClient()
    async with AsyncSessionLocal() as db:
        results = await client.sync_all_zones(db, tlds)
    print("\n" + "=" * 50)
    print("📊 RESULTS")
    print("=" * 50)
    for r in results:
        status_icon = "✅" if r["status"] == "success" else "❌"
        print(f"{status_icon} .{r['tld']}: {r['current_count']:,} domains, "
              f"{r['dropped_count']:,} dropped, {r['new_count']:,} new")
        if r["error"]:
            print(f"   ⚠️ Error: {r['error']}")
    total_dropped = sum(r["dropped_count"] for r in results)
    print(f"\n🎯 Total dropped domains: {total_dropped:,}")


if __name__ == "__main__":
    asyncio.run(main())