pounce/backend/scripts/sync_czds_safe.py
Yves Gugger 5b99145fb2
Some checks failed
CI / Frontend Lint & Type Check (push) Has been cancelled
CI / Frontend Build (push) Has been cancelled
CI / Backend Lint (push) Has been cancelled
CI / Backend Tests (push) Has been cancelled
CI / Docker Build (push) Has been cancelled
CI / Security Scan (push) Has been cancelled
Deploy / Build & Push Images (push) Has been cancelled
Deploy / Deploy to Server (push) Has been cancelled
Deploy / Notify (push) Has been cancelled
fix: banner position and Sedo affiliate links
2025-12-16 09:02:00 +01:00

366 lines
12 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
Safe CZDS Zone File Sync Script
================================
Uses the official pyCZDS library to safely download zone files.
IMPORTANT Rate Limits:
- Max 1 download per TLD per 24 hours
- Max 3 direct downloads per TLD per 24h (or you get blocked!)
- Zone files are updated daily between 00:00-06:00 UTC
Run this script ONCE daily, after 06:00 UTC.
"""
import asyncio
import gzip
import hashlib
import logging
import os
import sys
import time
from datetime import datetime, timedelta
from pathlib import Path
# Add parent directory to path for imports
sys.path.insert(0, str(Path(__file__).parent.parent))
from pyczds.client import CZDSClient as PyCZDSClient
# Emit timestamped, level-tagged log records at INFO and above.
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# Configuration
# TLDs processed by default; replaced by the command-line arguments when the
# script is started with any.
APPROVED_TLDS = ["xyz", "org", "online", "info", "dev", "app"]
# Directory holding downloaded zone files, domain caches and download markers.
# Override with the CZDS_DATA_DIR environment variable.
DATA_DIR = Path(os.environ.get("CZDS_DATA_DIR", "/home/user/pounce_czds"))
# Pause (seconds) between two consecutive downloads - be nice to ICANN servers.
DELAY_BETWEEN_DOWNLOADS = 5
def get_credentials():
    """Return the CZDS ``(username, password)`` pair.

    ``CZDS_USERNAME`` / ``CZDS_PASSWORD`` from the process environment take
    precedence. Whatever is still missing is looked up in the ``.env`` file
    located one directory above this script's directory. Either element of
    the returned tuple may be ``None`` (or empty) when it is configured in
    neither place; callers are expected to check for that.
    """
    username = os.getenv("CZDS_USERNAME")
    password = os.getenv("CZDS_PASSWORD")
    if username and password:
        return username, password

    env_file = Path(__file__).parent.parent / ".env"
    if not env_file.exists():
        return username, password

    wanted = ("CZDS_USERNAME", "CZDS_PASSWORD")
    file_values = {}
    for raw_line in env_file.read_text().splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#"):
            continue
        # Tolerate shell-style "export KEY=value" lines.
        line = line.removeprefix("export ").lstrip()
        key, sep, value = line.partition("=")
        key = key.strip()
        if not sep or key not in wanted:
            continue
        value = value.strip()
        # .env values are commonly quoted; the quotes are not part of the value.
        if len(value) >= 2 and value[0] == value[-1] and value[0] in "\"'":
            value = value[1:-1]
        file_values[key] = value  # last occurrence wins

    # A value already present in the environment is never overridden by .env.
    if not username:
        username = file_values.get("CZDS_USERNAME", username)
    if not password:
        password = file_values.get("CZDS_PASSWORD", password)
    return username, password
def should_download_today(tld: str) -> bool:
    """Return whether the zone file for *tld* may be downloaded now.

    The decision is based on the marker file ``.<tld>_last_download`` in
    ``DATA_DIR``, which holds the ISO timestamp of the last download.

    Returns:
        ``True`` when no marker exists or the recorded download is at least
        23 hours old, ``False`` (after logging the skip) otherwise.
    """
    marker_file = DATA_DIR / f".{tld}_last_download"
    try:
        stamp = marker_file.read_text().strip()
    except FileNotFoundError:
        # Never downloaded before.
        return True

    try:
        last_download = datetime.fromisoformat(stamp)
        if last_download.tzinfo is not None:
            # Normalise an aware timestamp to naive UTC so it can be compared
            # with utcnow() below.
            last_download = (last_download - last_download.utcoffset()).replace(tzinfo=None)
        elapsed = datetime.utcnow() - last_download
    except ValueError:
        # An empty or corrupt marker must not abort the whole run. Use the
        # marker's modification time as the best available estimate so the
        # download limit is still respected.
        logger.warning(f"Unreadable download marker for .{tld}, using its modification time")
        elapsed = timedelta(seconds=time.time() - marker_file.stat().st_mtime)

    # Only download once per 24h (23h leaves slack for a daily schedule).
    if elapsed < timedelta(hours=23):
        logger.info(f"⏭️ .{tld}: Already downloaded within 24h, skipping")
        return False
    return True
def mark_downloaded(tld: str):
    """Record the current UTC time as the last download time of *tld*.

    The ISO-formatted timestamp is written to ``.<tld>_last_download`` inside
    ``DATA_DIR``, replacing any earlier marker.
    """
    timestamp = datetime.utcnow().isoformat()
    (DATA_DIR / f".{tld}_last_download").write_text(timestamp)
def parse_zone_file(zone_path: Path, tld: str) -> set[str]:
    """Parse a zone file and extract the unique second-level domain names.

    Args:
        zone_path: Zone file to read; a path ending in ``.gz`` is read through
            gzip. Undecodable bytes are ignored.
        tld: Top-level domain the file belongs to (matched case-insensitively).

    Returns:
        Lower-cased domain labels without the TLD suffix. The TLD itself and
        names with further sub-labels are excluded.

    NOTE(review): NSEC3 records are skipped, so caches written by an earlier
    version may still contain NSEC3 hash labels; they would show up once as
    "dropped" on the first comparison after this change.
    """
    logger.info(f"📖 Parsing zone file for .{tld}...")
    suffix = f".{tld.lower()}"
    domains: set[str] = set()
    line_count = 0

    # Handle gzipped files
    if str(zone_path).endswith(".gz"):
        zone_file = gzip.open(zone_path, "rt", encoding="utf-8", errors="ignore")
    else:
        zone_file = open(zone_path, "r", encoding="utf-8", errors="ignore")

    with zone_file as f:
        for line in f:
            line_count += 1
            # Skip comments and empty lines
            if line.startswith(";") or not line.strip():
                continue
            parts = line.split()
            if len(parts) < 4:
                continue
            # In DNSSEC-signed zones the owner name of an NSEC3 record (and of
            # the RRSIG covering it) is a hash label directly below the TLD,
            # not a registered domain. Assumes the usual
            # "name ttl class type rdata" column layout.
            if any(field.lower() == "nsec3" for field in parts[1:5]):
                continue
            name = parts[0].rstrip(".").lower()
            # Must end with our TLD
            if not name.endswith(suffix):
                continue
            domain_name = name[: -len(suffix)]
            # Skip the TLD itself and subdomains
            if domain_name and "." not in domain_name:
                domains.add(domain_name)

    logger.info(f" Found {len(domains):,} unique domains from {line_count:,} lines")
    return domains
def compute_checksum(domains: set[str]) -> str:
    """Return the hex SHA-256 digest of the domains, sorted and newline-joined."""
    digest = hashlib.sha256()
    digest.update("\n".join(sorted(domains)).encode())
    return digest.hexdigest()
def load_previous_domains(tld: str) -> set[str] | None:
    """Load the previously saved domain set of *tld* from its cache file.

    The cache is ``<tld>_domains.txt`` in ``DATA_DIR`` with one domain per
    line; blank lines are ignored.

    Returns:
        The cached domains, or ``None`` when no cache exists or it cannot be
        read (the latter is logged as a warning).
    """
    cache_file = DATA_DIR / f"{tld}_domains.txt"
    try:
        # Stream line by line: caches can hold millions of entries, so avoid
        # keeping the whole text plus a list of lines in memory as well.
        with cache_file.open("r") as f:
            return {name for line in f if (name := line.strip())}
    except FileNotFoundError:
        # No cache yet - nothing to compare against.
        return None
    except (OSError, UnicodeError) as e:
        logger.warning(f"Failed to load cache for .{tld}: {e}")
        return None
def save_domains(tld: str, domains: set[str]):
    """Save the current domain set of *tld* to its cache file.

    The sorted domains are written newline-separated to ``<tld>_domains.txt``
    in ``DATA_DIR``. The data goes to a temporary sibling file first and is
    then renamed over the cache, so an interrupted run cannot leave a
    truncated baseline behind for the next comparison.
    """
    cache_file = DATA_DIR / f"{tld}_domains.txt"
    tmp_file = cache_file.with_name(cache_file.name + ".tmp")
    try:
        tmp_file.write_text("\n".join(sorted(domains)))
        tmp_file.replace(cache_file)
    finally:
        # After a successful rename the temp file is gone and this is a no-op;
        # after a failure it removes the partial file.
        tmp_file.unlink(missing_ok=True)
    logger.info(f"💾 Saved {len(domains):,} domains for .{tld}")
def find_drops(previous: set[str], current: set[str]) -> set[str]:
    """Return the domains contained in *previous* that are absent from *current*."""
    return previous.difference(current)
async def save_drops_to_db(tld: str, dropped: set[str]) -> int:
    """Save dropped domains to the database.

    Args:
        tld: Top-level domain the names belong to.
        dropped: Domain labels without the TLD suffix.

    Returns:
        The number of stored records. ``0`` is returned both when *dropped*
        is empty and when saving failed; failures are logged with their
        traceback and never raised.
    """
    if not dropped:
        return 0
    try:
        # Imported lazily so the rest of the script works without the
        # application package being importable.
        from app.database import AsyncSessionLocal, init_db
        from app.models.zone_file import DroppedDomain

        await init_db()
        # All records of one run share the same date stamp: midnight (UTC).
        today = datetime.utcnow().replace(hour=0, minute=0, second=0, microsecond=0)
        async with AsyncSessionLocal() as db:
            # Batch insert
            records = [
                DroppedDomain(
                    domain=f"{name}.{tld}",
                    tld=tld,
                    dropped_date=today,
                    length=len(name),
                    is_numeric=name.isdigit(),
                    has_hyphen="-" in name,
                )
                for name in dropped
            ]
            db.add_all(records)
            await db.commit()
        logger.info(f"💾 Saved {len(dropped):,} dropped domains to database")
        return len(dropped)
    except Exception as e:
        # Best effort: a database problem must not abort the zone sync, but
        # keep the traceback so the cause can be diagnosed.
        logger.exception(f"Failed to save drops to DB: {e}")
        return 0
def sync_single_tld(client: PyCZDSClient, tld: str, zone_urls: list[str]) -> dict:
    """Sync a single TLD zone file.

    Downloads the zone file, extracts its domains, compares them with the
    cached set of the previous run, stores dropped domains in the database,
    refreshes the cache and writes the download marker.

    Args:
        client: Authenticated pyCZDS client used for the download.
        tld: Top-level domain to sync.
        zone_urls: Zone file URLs the account has access to.

    Returns:
        Dict with the keys ``tld``, ``status`` (``"skipped"``,
        ``"no_access"``, ``"download_failed"``, ``"success"`` or
        ``"error"``), ``domain_count``, ``dropped_count``, ``new_count`` and
        ``error`` (message or ``None``). Exceptions raised while
        downloading or processing are logged and reported through
        ``status``/``error`` instead of being raised.
    """
    result = {
        "tld": tld,
        "status": "pending",
        "domain_count": 0,
        "dropped_count": 0,
        "new_count": 0,
        "error": None,
    }

    # Check if we should download
    if not should_download_today(tld):
        result["status"] = "skipped"
        return result

    # Find URL for this TLD
    tld_url = next((url for url in zone_urls if f"/{tld}.zone" in url.lower()), None)
    if not tld_url:
        logger.warning(f"❌ No access to .{tld} zone file")
        result["status"] = "no_access"
        result["error"] = "No access to this TLD"
        return result

    downloaded_path = None
    try:
        logger.info(f"⬇️ Downloading .{tld} zone file...")
        # Download zone file using pyCZDS
        # This returns the path to the downloaded file
        downloaded_file = client.get_zonefile(tld_url, download_dir=str(DATA_DIR))
        if not downloaded_file or not Path(downloaded_file).exists():
            result["status"] = "download_failed"
            result["error"] = "Download returned no file"
            return result

        downloaded_path = Path(downloaded_file)
        file_size_mb = downloaded_path.stat().st_size / (1024 * 1024)
        logger.info(f" Downloaded: {file_size_mb:.1f} MB")

        # Parse zone file
        current_domains = parse_zone_file(downloaded_path, tld)
        result["domain_count"] = len(current_domains)

        # Load previous domains
        previous_domains = load_previous_domains(tld)
        if previous_domains:
            # Find drops
            dropped = find_drops(previous_domains, current_domains)
            result["dropped_count"] = len(dropped)
            result["new_count"] = len(current_domains - previous_domains)
            logger.info(f" 📉 Dropped: {len(dropped):,}")
            logger.info(f" 📈 New: {result['new_count']:,}")
            # Save drops to database (async)
            # NOTE(review): asyncio.run() creates a fresh event loop per TLD;
            # confirm the app's async engine/session tolerates being reused
            # across loops.
            if dropped:
                saved_count = asyncio.run(save_drops_to_db(tld, dropped))
                if not saved_count:
                    # save_drops_to_db reports failure as 0. The baseline is
                    # refreshed below regardless, so make the loss visible
                    # instead of dropping it silently.
                    logger.warning(f" Dropped domains for .{tld} were not saved to the database")
                    result["error"] = "Dropped domains could not be saved to the database"
        else:
            logger.info(" First sync, no comparison available")

        # Save current domains for next comparison
        save_domains(tld, current_domains)
        # Mark as downloaded
        mark_downloaded(tld)

        result["status"] = "success"
        logger.info(f"✅ .{tld} sync complete!")
    except Exception as e:
        logger.exception(f"❌ Error syncing .{tld}: {e}")
        result["status"] = "error"
        result["error"] = str(e)
    finally:
        # Clean up downloaded zone file (can be huge) - also when parsing or
        # saving failed, so broken runs do not fill up the disk.
        if downloaded_path is not None:
            try:
                downloaded_path.unlink(missing_ok=True)
                logger.info(" 🗑️ Cleaned up zone file")
            except OSError as e:
                logger.warning(f" Could not remove zone file {downloaded_path}: {e}")
    return result
def main():
    """Run the sync for every TLD in ``APPROVED_TLDS`` and print a summary.

    Creates the data directory, authenticates against ICANN CZDS, fetches the
    list of accessible zone files and then processes the TLDs one by one,
    pausing ``DELAY_BETWEEN_DOWNLOADS`` seconds after each successful sync.
    Exits with status 1 when credentials are missing, authentication fails or
    the zone list cannot be retrieved.
    """
    print("=" * 60)
    print("🌐 CZDS Zone File Sync (Safe Mode)")
    print("=" * 60)
    print(f"📅 Time: {datetime.utcnow().isoformat()} UTC")
    print(f"📂 Data dir: {DATA_DIR}")
    print(f"📋 TLDs: {', '.join(APPROVED_TLDS)}")
    print("-" * 60)

    # Ensure data directory exists
    DATA_DIR.mkdir(parents=True, exist_ok=True)

    # Get credentials
    username, password = get_credentials()
    if not username or not password:
        print("❌ CZDS credentials not configured!")
        print(" Set CZDS_USERNAME and CZDS_PASSWORD in .env")
        sys.exit(1)
    print(f"👤 User: {username}")

    # Initialize pyCZDS client
    try:
        client = PyCZDSClient(username, password)
        print("✅ Authenticated with ICANN CZDS")
    except Exception as e:
        print(f"❌ Authentication failed: {e}")
        sys.exit(1)

    # Get available zone files
    try:
        zone_urls = client.get_zonefiles_list()
        available_tlds = [url.split('/')[-1].replace('.zone', '') for url in zone_urls]
        print(f"✅ Available zones: {', '.join(available_tlds)}")
    except Exception as e:
        print(f"❌ Failed to get zone list: {e}")
        sys.exit(1)
    print("-" * 60)

    # Sync each TLD
    results = []
    total = len(APPROVED_TLDS)
    for position, tld in enumerate(APPROVED_TLDS, start=1):
        print(f"\n[{position}/{total}] Processing .{tld}...")
        result = sync_single_tld(client, tld, zone_urls)
        results.append(result)
        # Delay between downloads (be nice to ICANN); not needed after the
        # last TLD or when nothing was downloaded successfully.
        if position < total and result["status"] == "success":
            print(f" ⏳ Waiting {DELAY_BETWEEN_DOWNLOADS}s before next download...")
            time.sleep(DELAY_BETWEEN_DOWNLOADS)

    # Summary
    print("\n" + "=" * 60)
    print("📊 SUMMARY")
    print("=" * 60)
    success_count = sum(1 for r in results if r["status"] == "success")
    total_drops = sum(r["dropped_count"] for r in results)
    # Every status other than success/skipped is shown as a failure.
    status_icons = {"success": "✅", "skipped": "⏭️"}
    for r in results:
        icon = status_icons.get(r["status"], "❌")
        print(f"{icon} .{r['tld']}: {r['status']} - {r['domain_count']:,} domains, {r['dropped_count']:,} dropped")
        if r["error"]:
            print(f" ⚠️ Error: {r['error']}")
    print("-" * 60)
    print(f"✅ Successful: {success_count}/{len(APPROVED_TLDS)}")
    print(f"📉 Total drops: {total_drops:,}")
    print("=" * 60)
if __name__ == "__main__":
    # Command-line arguments, when present, replace the default TLD list.
    requested_tlds = sys.argv[1:]
    if requested_tlds:
        APPROVED_TLDS = [arg.lower() for arg in requested_tlds]
        print(f"🎯 Syncing specific TLDs: {APPROVED_TLDS}")
    main()