pounce/backend/app/services/czds_client.py
Yves Gugger 85b1be691a fix: Disable RDAP verification to prevent bans, improve drops UI
- Disabled verify_drops scheduler job (caused RDAP rate limit bans)
- Zone files now saved without RDAP verification (zone diff is reliable)
- Added date-based zone file snapshots with 3-day retention
- Improved DropsTab UI with better status display:
  - "In Transition" with countdown timer for dropping_soon
  - "Available Now" with Buy button
  - "Re-registered" for taken domains
  - Track button for dropping_soon domains
- Added --shm-size=8g to backend container for multiprocessing
- Removed duplicate host cron job (scheduler handles everything)
2025-12-20 22:56:25 +01:00

530 lines
19 KiB
Python

"""
ICANN CZDS (Centralized Zone Data Service) Client
==================================================
Downloads zone files from ICANN CZDS, parses them, and detects dropped domains.
Authentication: OAuth2 with username/password
Zone Format: Standard DNS zone file format (.txt.gz)
Usage:
    client = CZDSClient(username, password)
    await client.sync_all_zones(db)
"""
import asyncio
import gzip
import hashlib
import logging
import os
import re
import shutil
from datetime import datetime, timedelta
from pathlib import Path
from typing import Optional
import httpx
from sqlalchemy import select, func, text
from sqlalchemy.ext.asyncio import AsyncSession
from app.config import get_settings
from app.models.zone_file import ZoneSnapshot, DroppedDomain
# Module-level logger and application settings; both are resolved once, at import time.
logger = logging.getLogger(__name__)
settings = get_settings()
# ============================================================================
# CONSTANTS
# ============================================================================
# Endpoint that receives the username/password JSON and returns the access token.
CZDS_AUTH_URL = "https://account-api.icann.org/api/authenticate"
# Endpoint queried (with the bearer token) for the list of zone download URLs.
CZDS_ZONES_URL = "https://czds-api.icann.org/czds/downloads/links"
# Base used to build "<base>/czds/downloads/<tld>.zone" when no explicit URL is given.
CZDS_DOWNLOAD_BASE = "https://czds-download-api.icann.org"
# TLDs we have approved access to (default target list for a full sync).
APPROVED_TLDS = ["xyz", "org", "online", "info", "dev", "app"]
# Regex to extract domain names from zone file NS records.
# Format: example.tld. IN NS ns1.example.com.
# Group 1 captures the label directly before the TLD; an optional numeric TTL may
# precede "IN". The TLD part only matches letters ([a-z]+), case-insensitively.
# NOTE(review): not referenced in the visible part of this module - presumably
# used by the zone parser; confirm before removing.
NS_RECORD_PATTERN = re.compile(r'^([a-z0-9][-a-z0-9]*)\.[a-z]+\.\s+\d*\s*IN\s+NS\s+', re.IGNORECASE)
# ============================================================================
# CZDS CLIENT
# ============================================================================
class CZDSClient:
    """Client for ICANN CZDS zone file downloads.

    Responsibilities visible in this class:
      * authenticate against the ICANN account API and cache the token,
      * list and download zone files,
      * extract/parse them via ``HighPerformanceZoneParser``,
      * diff the parsed domain set against the cached previous set and store
        the names that disappeared as ``DroppedDomain`` rows,
      * keep a plain-text cache plus dated snapshots in ``data_dir``.

    Attributes:
        username / password: CZDS credentials (argument, then environment,
            then application settings).
        data_dir: Directory for downloads and cached domain lists; created
            on construction if missing.
    """

    def __init__(
        self,
        username: Optional[str] = None,
        password: Optional[str] = None,
        data_dir: Optional[Path] = None
    ):
        """Resolve credentials and the working directory.

        Args:
            username: CZDS username; falls back to ``CZDS_USERNAME`` and then
                ``settings.czds_username``.
            password: CZDS password; falls back to ``CZDS_PASSWORD`` and then
                ``settings.czds_password``.
            data_dir: Working directory; falls back to ``CZDS_DATA_DIR`` and
                then ``/tmp/pounce_czds``.
        """
        self.username = username or os.getenv("CZDS_USERNAME") or settings.czds_username
        self.password = password or os.getenv("CZDS_PASSWORD") or settings.czds_password
        self.data_dir = data_dir or Path(os.getenv("CZDS_DATA_DIR", "/tmp/pounce_czds"))
        self.data_dir.mkdir(parents=True, exist_ok=True)

        # Cached bearer token and the (naive UTC) time after which it is refreshed.
        self._token: Optional[str] = None
        self._token_expires: Optional[datetime] = None

    @staticmethod
    def _new_result(tld: str, status: str = "pending", error: Optional[str] = None) -> dict:
        """Build the per-TLD result dict shared by sync_zone and sync_all_zones."""
        return {
            "tld": tld,
            "status": status,
            "current_count": 0,
            "previous_count": 0,
            "dropped_count": 0,
            "new_count": 0,
            "error": error
        }

    async def _authenticate(self) -> str:
        """Authenticate with ICANN and get access token.

        Returns the cached token while it is still considered fresh.

        Raises:
            ValueError: Credentials are not configured.
            RuntimeError: The API answered with a non-200 status, or the
                response did not contain an ``accessToken``.
        """
        if self._token and self._token_expires and datetime.utcnow() < self._token_expires:
            return self._token

        if not self.username or not self.password:
            raise ValueError("CZDS credentials not configured. Set CZDS_USERNAME and CZDS_PASSWORD.")

        logger.info("Authenticating with ICANN CZDS...")

        async with httpx.AsyncClient(timeout=30) as client:
            response = await client.post(
                CZDS_AUTH_URL,
                json={"username": self.username, "password": self.password},
                headers={"Content-Type": "application/json"}
            )

            if response.status_code != 200:
                logger.error(f"CZDS authentication failed: {response.status_code} {response.text}")
                raise RuntimeError(f"CZDS authentication failed: {response.status_code}")

            token = response.json().get("accessToken")

        # A 200 without a token would otherwise be returned as None and sent
        # as "Bearer None" on every later request.
        if not token:
            logger.error("CZDS authentication failed: response contained no accessToken")
            raise RuntimeError("CZDS authentication failed: no accessToken in response")

        self._token = token
        # Token expires in 24 hours, refresh after 23 hours
        self._token_expires = datetime.utcnow() + timedelta(hours=23)

        logger.info("CZDS authentication successful")
        return self._token

    async def get_available_zones(self) -> dict[str, str]:
        """
        Get list of zone files available for download.

        Returns dict mapping TLD to download URL. An empty dict is returned
        when the listing request does not answer with status 200.
        """
        token = await self._authenticate()

        async with httpx.AsyncClient(timeout=60) as client:
            response = await client.get(
                CZDS_ZONES_URL,
                headers={"Authorization": f"Bearer {token}"}
            )

        if response.status_code != 200:
            logger.error(f"Failed to get zone list: {response.status_code}")
            return {}

        # Response is a list of download URLs
        urls = response.json()

        # Extract TLDs and their URLs
        zones = {}
        for url in urls:
            # URL format: https://czds-download-api.icann.org/czds/downloads/xyz.zone
            match = re.search(r'/([a-z0-9-]+)\.zone$', url, re.IGNORECASE)
            if match:
                zones[match.group(1).lower()] = url

        logger.info(f"Available zones: {list(zones.keys())}")
        return zones

    async def download_zone(self, tld: str, download_url: Optional[str] = None) -> Optional[Path]:
        """
        Download a zone file for a specific TLD.

        The response is streamed into a ``.part`` file which is moved to its
        final name only after the stream completed, so a failed transfer never
        leaves a truncated archive at the returned path.

        Args:
            tld: The TLD to download
            download_url: Optional explicit download URL (from get_available_zones)

        Returns:
            Path of the downloaded ``<tld>.zone.txt.gz`` file, or None on a
            non-200 response or any download error.
        """
        token = await self._authenticate()

        # Use provided URL or construct one
        if not download_url:
            download_url = f"{CZDS_DOWNLOAD_BASE}/czds/downloads/{tld}.zone"

        output_path = self.data_dir / f"{tld}.zone.txt.gz"
        partial_path = output_path.with_name(output_path.name + ".part")

        logger.info(f"Downloading zone file for .{tld} from {download_url}...")

        async with httpx.AsyncClient(timeout=600, follow_redirects=True) as client:
            try:
                async with client.stream(
                    "GET",
                    download_url,
                    headers={"Authorization": f"Bearer {token}"}
                ) as response:
                    if response.status_code != 200:
                        logger.error(f"Failed to download .{tld}: {response.status_code}")
                        return None

                    # Stream to file
                    with open(partial_path, "wb") as f:
                        async for chunk in response.aiter_bytes(chunk_size=1024 * 1024):
                            f.write(chunk)

                partial_path.replace(output_path)

                file_size = output_path.stat().st_size / (1024 * 1024)
                logger.info(f"Downloaded .{tld} zone file: {file_size:.1f} MB")
                return output_path

            except Exception as e:
                logger.error(f"Error downloading .{tld}: {e}")
                # Drop whatever was written before the failure.
                partial_path.unlink(missing_ok=True)
                return None

    def extract_zone_file(self, gz_path: Path) -> Path:
        """
        Extract gzipped zone file to RAM drive for fastest access.
        Falls back to disk if RAM drive unavailable.

        The archive at ``gz_path`` is deleted after a successful extraction.
        """
        from app.services.zone_file_parser import HighPerformanceZoneParser

        parser = HighPerformanceZoneParser(use_ram_drive=True)
        output_path = parser.extract_to_ram(gz_path)

        # Remove gz file to save space
        gz_path.unlink()

        return output_path

    def parse_zone_file(self, zone_path: Path, tld: str) -> set[str]:
        """
        Parse zone file and extract unique domain names.

        Uses high-performance parallel parser with all CPU cores
        and RAM drive for maximum speed on large zone files.

        Returns set of domain names (without TLD suffix).
        """
        from app.services.zone_file_parser import HighPerformanceZoneParser

        # Use parallel parser with RAM drive
        parser = HighPerformanceZoneParser(use_ram_drive=True)
        try:
            return parser.parse_zone_file_parallel(zone_path, tld)
        finally:
            # Always release the RAM drive, even when parsing fails.
            parser.cleanup_ram_drive()

    def compute_checksum(self, domains: set[str]) -> str:
        """Compute SHA256 checksum of sorted domain list."""
        sorted_domains = "\n".join(sorted(domains))
        return hashlib.sha256(sorted_domains.encode()).hexdigest()

    async def get_previous_domains(self, tld: str) -> Optional[set[str]]:
        """Load previous day's domain set from cache file.

        Returns None when no cache file exists or it cannot be read.
        """
        cache_file = self.data_dir / f"{tld}_domains.txt"

        if cache_file.exists():
            try:
                content = cache_file.read_text()
                return {line.strip() for line in content.splitlines() if line.strip()}
            except (OSError, UnicodeError) as e:
                logger.warning(f"Failed to load cache for .{tld}: {e}")

        return None

    async def save_domains(self, tld: str, domains: set[str]):
        """Save current domains to cache file with date-based retention.

        Writes ``<tld>_domains.txt`` (used for the next diff), creates today's
        dated snapshot if it does not exist yet, and prunes old snapshots.
        """
        # Sort and join once; the domain set can be very large.
        payload = "\n".join(sorted(domains))

        # Save current file (for next sync comparison)
        cache_file = self.data_dir / f"{tld}_domains.txt"
        cache_file.write_text(payload)

        # Also save dated snapshot for retention
        today = datetime.now().strftime("%Y-%m-%d")
        dated_file = self.data_dir / f"{tld}_domains_{today}.txt"
        if not dated_file.exists():
            dated_file.write_text(payload)
            logger.info(f"Saved snapshot: {dated_file.name}")

        # Cleanup old snapshots (keep last N days)
        retention_days = getattr(get_settings(), 'zone_retention_days', 3)
        await self._cleanup_old_snapshots(tld, retention_days)

        logger.info(f"Saved {len(domains):,} domains for .{tld}")

    async def _cleanup_old_snapshots(self, tld: str, keep_days: int = 3):
        """Remove zone file snapshots older than keep_days.

        Only files named ``<tld>_domains_YYYY-MM-DD.txt`` with a valid date
        are considered; anything else in ``data_dir`` is left alone.
        """
        cutoff = datetime.now() - timedelta(days=keep_days)
        # Escape the TLD so it is always matched literally.
        pattern = re.compile(rf"^{re.escape(tld)}_domains_(\d{{4}}-\d{{2}}-\d{{2}})\.txt$")

        for file in self.data_dir.glob(f"{tld}_domains_*.txt"):
            match = pattern.match(file.name)
            if not match:
                continue
            try:
                file_date = datetime.strptime(match.group(1), "%Y-%m-%d")
            except ValueError:
                # Digits in the right shape but not a real date: not one of
                # our snapshots, so do not touch it.
                continue
            if file_date < cutoff:
                file.unlink(missing_ok=True)
                logger.info(f"Deleted old snapshot: {file.name}")

    async def process_drops(
        self,
        db: "AsyncSession",
        tld: str,
        previous: set[str],
        current: set[str]
    ) -> list[dict]:
        """
        Find dropped domains and store them directly.

        A domain counts as dropped when it is in ``previous`` but not in
        ``current``. Rows are inserted in batches of 1000, one commit per
        batch; a failing commit rolls back that batch only and is logged.

        NOTE: We do NOT verify availability here to avoid RDAP rate limits/bans.
        Records are stored with availability_status='unknown'.

        Returns:
            One ``{"domain", "length"}`` dict per dropped domain that was
            queued for insert (this includes batches whose commit failed; the
            number actually persisted is logged).
        """
        dropped = previous - current

        if not dropped:
            logger.info(f"No dropped domains found for .{tld}")
            return []

        logger.info(f"Found {len(dropped):,} dropped domains for .{tld}, saving to database...")

        today = datetime.utcnow().replace(hour=0, minute=0, second=0, microsecond=0)

        dropped_records = []
        batch_size = 1000
        dropped_list = list(dropped)
        total = len(dropped_list)
        saved = 0
        failed = 0

        for i in range(0, total, batch_size):
            batch = dropped_list[i:i + batch_size]
            queued = 0

            for name in batch:
                try:
                    db.add(DroppedDomain(
                        domain=name,  # Just the name, not full domain!
                        tld=tld,
                        dropped_date=today,
                        length=len(name),
                        is_numeric=name.isdigit(),
                        has_hyphen='-' in name,
                        availability_status='unknown'
                    ))
                except Exception as e:
                    # Best effort: one bad record must not abort the sync.
                    logger.debug(f"Skipping drop {name}.{tld}: {e}")
                    continue
                dropped_records.append({
                    "domain": f"{name}.{tld}",
                    "length": len(name),
                })
                queued += 1

            # Commit batch
            try:
                await db.commit()
            except Exception as e:
                await db.rollback()
                failed += queued
                logger.warning(
                    f"Failed to save batch of {queued:,} drops for .{tld} "
                    f"(offset {i:,}): {e}"
                )
            else:
                saved += queued

            if (i + batch_size) % 5000 == 0:
                logger.info(f"Saved {min(i + batch_size, total):,}/{total:,} drops")

        if failed:
            logger.warning(f"CZDS drops for .{tld}: {failed:,} could not be saved")
        logger.info(f"CZDS drops for .{tld}: {saved:,} saved (verification pending)")

        return dropped_records

    async def sync_zone(
        self,
        db: "AsyncSession",
        tld: str,
        download_url: Optional[str] = None
    ) -> dict:
        """
        Sync a single zone file:
        1. Download zone file
        2. Extract and parse
        3. Compare with previous snapshot
        4. Store dropped domains
        5. Save new snapshot

        Args:
            db: Database session
            tld: TLD to sync
            download_url: Optional explicit download URL

        Returns:
            Result dict with ``status`` ("success", "download_failed" or
            "error"), the domain counts and an ``error`` message. Exceptions
            are caught and reported through the dict, never raised.
        """
        logger.info(f"Starting sync for .{tld}")

        result = self._new_result(tld)

        try:
            # Download zone file
            gz_path = await self.download_zone(tld, download_url)
            if not gz_path:
                result["status"] = "download_failed"
                result["error"] = "Failed to download zone file"
                return result

            # Extract and parse. The extracted file can be very large, so it
            # is removed even when parsing fails. The parser may already have
            # deleted it during cleanup_ram_drive().
            zone_path: Optional[Path] = None
            try:
                zone_path = self.extract_zone_file(gz_path)
                current_domains = self.parse_zone_file(zone_path, tld)
            finally:
                if zone_path is not None:
                    zone_path.unlink(missing_ok=True)

            result["current_count"] = len(current_domains)

            # Get previous snapshot
            previous_domains = await self.get_previous_domains(tld)

            if previous_domains:
                result["previous_count"] = len(previous_domains)

                # Find dropped domains
                dropped = await self.process_drops(db, tld, previous_domains, current_domains)
                result["dropped_count"] = len(dropped)
                result["new_count"] = len(current_domains - previous_domains)

            # Save current snapshot
            await self.save_domains(tld, current_domains)

            # Save snapshot metadata
            snapshot = ZoneSnapshot(
                tld=tld,
                snapshot_date=datetime.utcnow(),
                domain_count=len(current_domains),
                checksum=self.compute_checksum(current_domains)
            )
            db.add(snapshot)
            await db.commit()

            result["status"] = "success"
            logger.info(
                f"Sync complete for .{tld}: "
                f"{result['current_count']:,} domains, "
                f"{result['dropped_count']:,} dropped, "
                f"{result['new_count']:,} new"
            )

        except Exception as e:
            logger.exception(f"Error syncing .{tld}: {e}")
            # The session is shared across TLDs; reset it so a failed
            # transaction does not poison the next zone.
            try:
                await db.rollback()
            except Exception as rollback_error:
                logger.warning(f"Rollback after failed .{tld} sync failed: {rollback_error}")
            result["status"] = "error"
            result["error"] = str(e)

        return result

    async def sync_all_zones(
        self,
        db: "AsyncSession",
        tlds: Optional[list[str]] = None
    ) -> list[dict]:
        """
        Sync all approved zone files.

        Args:
            db: Database session
            tlds: Optional list of TLDs to sync. Defaults to APPROVED_TLDS.

        Returns:
            List of sync results for each TLD.
        """
        target_tlds = tlds or APPROVED_TLDS

        # Get available zones with their download URLs
        available_zones = await self.get_available_zones()

        logger.info(f"Starting CZDS sync for {len(target_tlds)} zones: {target_tlds}")
        logger.info(f"Available zones: {list(available_zones.keys())}")

        results = []
        last_index = len(target_tlds) - 1
        for index, tld in enumerate(target_tlds):
            # Get the actual download URL for this TLD
            download_url = available_zones.get(tld)

            if not download_url:
                logger.warning(f"No download URL available for .{tld}")
                results.append(
                    self._new_result(tld, "not_available", f"No access to .{tld} zone")
                )
                continue

            results.append(await self.sync_zone(db, tld, download_url))

            # Small delay between zones to be nice to ICANN servers
            # (nothing follows the last zone, so no wait there).
            if index < last_index:
                await asyncio.sleep(2)

        # Summary
        success_count = sum(1 for r in results if r["status"] == "success")
        total_dropped = sum(r["dropped_count"] for r in results)

        logger.info(
            f"CZDS sync complete: "
            f"{success_count}/{len(target_tlds)} zones successful, "
            f"{total_dropped:,} total dropped domains"
        )

        return results
# ============================================================================
# STANDALONE SCRIPT
# ============================================================================
async def main():
    """Standalone script to run CZDS sync.

    TLDs may be passed as command-line arguments; without arguments all
    APPROVED_TLDS are synced. Prints a per-TLD summary to stdout.
    """
    import sys
    from app.database import AsyncSessionLocal, init_db

    # Initialize database
    await init_db()

    # Parse arguments
    tlds = sys.argv[1:] or APPROVED_TLDS

    print("🌐 CZDS Zone File Sync")
    print(f"📂 TLDs: {', '.join(tlds)}")
    print("-" * 50)

    client = CZDSClient()

    async with AsyncSessionLocal() as db:
        results = await client.sync_all_zones(db, tlds)

    print("\n" + "=" * 50)
    print("📊 RESULTS")
    print("=" * 50)

    for r in results:
        # Distinct markers so failed zones stand out from successful ones
        # (both branches were empty strings before).
        status_icon = "✅" if r["status"] == "success" else "❌"
        print(f"{status_icon} .{r['tld']}: {r['current_count']:,} domains, "
              f"{r['dropped_count']:,} dropped, {r['new_count']:,} new")
        if r["error"]:
            print(f"   ⚠️  Error: {r['error']}")

    total_dropped = sum(r["dropped_count"] for r in results)
    print(f"\n🎯 Total dropped domains: {total_dropped:,}")


if __name__ == "__main__":
    asyncio.run(main())