pounce/backend/scripts/sync_all_zones.py
#!/usr/bin/env python3
"""
Pounce Zone File Sync - Daily Automated Zone File Synchronization
This script:
1. Downloads zone files from ICANN CZDS (app, dev, info, online, org, xyz)
2. Downloads zone files from Switch.ch via AXFR (.ch, .li)
3. Compares with yesterday's data to detect drops
4. Stores drops in the database for the Drops tab
5. Cleans up all temporary files and compresses domain lists
STORAGE STRATEGY (Ultra-Efficient):
- Raw zone files: DELETED immediately after parsing
- Domain lists: Stored COMPRESSED (.gz) - ~80% size reduction
- Only keeps current snapshot (no history)
- Drops stored in DB for 48h only
Run daily via cron at 06:00 UTC (after most registries update)
"""
import asyncio
import gzip
import logging
import subprocess
import sys
import shutil
from datetime import datetime, timedelta
from pathlib import Path
from typing import Optional, Set
# Add parent to path for imports
sys.path.insert(0, str(Path(__file__).parent.parent))
from sqlalchemy import text
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
# Configuration
CZDS_DIR = Path("/home/user/pounce_czds")
SWITCH_DIR = Path("/home/user/pounce_switch")
LOG_FILE = Path("/home/user/logs/zone_sync.log")
# Storage efficiency: compress domain lists
COMPRESS_DOMAIN_LISTS = True
# CZDS TLDs we have access to
# Note: .org is HUGE (~10M domains, 442MB gz) - requires special handling
CZDS_TLDS = ["app", "dev", "info", "online", "xyz"] # org temporarily excluded due to memory
CZDS_TLDS_LARGE = ["org"] # Process separately with streaming
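# Note: CZDS_TLDS_LARGE is declared for a streaming code path, but main() below only
# iterates CZDS_TLDS, so .org is effectively skipped until that path is implemented.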
# Switch.ch AXFR config
SWITCH_CONFIG = {
    "ch": {
        "server": "zonedata.switch.ch",
        "key_name": "tsig-zonedata-ch-public-21-01.",
        "key_secret": "stZwEGApYumtXkh73qMLPqfbIDozWKZLkqRvcjKSpRnsor6A6MxixRL6C2HeSVBQNfMW4wer+qjS0ZSfiWiJ3Q=="
    },
    "li": {
        "server": "zonedata.switch.ch",
        "key_name": "tsig-zonedata-li-public-21-01.",
        "key_secret": "t8GgeCn+fhPaj+cRy/lakQPb6M45xz/NZwmcp4iqbBxKFCCH0/k3xNGe6sf3ObmoaKDBedge/La4cpPfLqtFkw=="
    }
}
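# The TSIG key names/secrets above authenticate the AXFR transfers performed in
# download_switch_zone() below (hmac-sha512); they appear to be the publicly
# documented zonedata access keys rather than private credentials.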
# Setup logging
_handlers = [logging.StreamHandler()]
if LOG_FILE.parent.exists():
    # Also log to a file when the log directory is available
    _handlers.append(logging.FileHandler(LOG_FILE))
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s [%(levelname)s] %(message)s',
    handlers=_handlers
)
logger = logging.getLogger(__name__)

class ZoneSyncResult:
    """Result of a zone sync operation"""

    def __init__(self, tld: str):
        self.tld = tld
        self.success = False
        self.domain_count = 0
        self.drops_count = 0
        self.error: Optional[str] = None
        self.duration_seconds = 0
        # Dropped domains as (domain, tld) tuples, filled in by the sync functions
        self.drops: list[tuple[str, str]] = []

async def get_db_session():
    """Create async database session - ROBUST VERSION for standalone script"""
    # Read DATABASE_URL directly from .env (avoids import issues when running standalone)
    env_file = Path("/home/user/pounce/backend/.env")
    if not env_file.exists():
        env_file = Path(__file__).parent.parent / ".env"
    db_url = None
    if env_file.exists():
        for line in env_file.read_text().splitlines():
            if line.startswith("DATABASE_URL="):
                db_url = line.split("=", 1)[1].strip().strip('"').strip("'")
                break
    # Default to SQLite if not found
    if not db_url:
        db_url = "sqlite:///./domainwatch.db"
        logger.warning(f"DATABASE_URL not found in .env, using default: {db_url}")
    # Convert to async driver
    if "sqlite://" in db_url and "aiosqlite" not in db_url:
        db_url = db_url.replace("sqlite://", "sqlite+aiosqlite://")
    elif "postgresql://" in db_url and "asyncpg" not in db_url:
        db_url = db_url.replace("postgresql://", "postgresql+asyncpg://")
    logger.info(f"Database URL: {db_url[:50]}...")
    engine = create_async_engine(db_url, echo=False)
    async_session = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
    return async_session()
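# Note: get_db_session() creates a fresh engine per call and never disposes it.
# That is acceptable for a short-lived cron script that opens a handful of sessions;
# a long-running service would want a single shared engine instead.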

def download_czds_zone(tld: str, max_retries: int = 3) -> Optional[Path]:
    """Download a single CZDS zone file using pyCZDS with retry logic"""
    import time
    for attempt in range(max_retries):
        try:
            from pyczds.client import CZDSClient
            # Read credentials from .env
            env_file = Path(__file__).parent.parent / ".env"
            if not env_file.exists():
                env_file = Path("/home/user/pounce/backend/.env")
            env_content = env_file.read_text()
            username = password = None
            for line in env_content.splitlines():
                if line.startswith("CZDS_USERNAME="):
                    username = line.split("=", 1)[1].strip()
                elif line.startswith("CZDS_PASSWORD="):
                    password = line.split("=", 1)[1].strip()
            if not username or not password:
                logger.error("CZDS credentials not found in .env")
                return None
            client = CZDSClient(username, password)
            urls = client.get_zonefiles_list()
            # Find URL for this TLD
            target_url = None
            for url in urls:
                if f"{tld}.zone" in url or f"/{tld}." in url:
                    target_url = url
                    break
            if not target_url:
                logger.warning(f"No access to .{tld} zone file")
                return None
            logger.info(f"Downloading .{tld} from CZDS... (attempt {attempt + 1}/{max_retries})")
            client.get_zonefile(target_url, download_dir=str(CZDS_DIR))
            # Find the downloaded file
            gz_file = CZDS_DIR / f"{tld}.txt.gz"
            if gz_file.exists():
                return gz_file
            # Try alternative naming (pyCZDS sometimes uses different names)
            for f in CZDS_DIR.glob(f"*{tld}*.gz"):
                return f
            # File not found after download - raise exception to trigger retry
            raise FileNotFoundError(f"Downloaded file not found for .{tld} in {CZDS_DIR}")
        except Exception as e:
            logger.warning(f"CZDS download attempt {attempt + 1} failed for .{tld}: {e}")
            if attempt < max_retries - 1:
                wait_time = (attempt + 1) * 30  # linear backoff: 30s, then 60s
                logger.info(f"Retrying in {wait_time}s...")
                time.sleep(wait_time)
            else:
                logger.error(f"CZDS download failed for .{tld} after {max_retries} attempts")
                return None
    return None

def download_switch_zone(tld: str) -> Optional[Path]:
    """Download zone file from Switch.ch via AXFR"""
    config = SWITCH_CONFIG.get(tld)
    if not config:
        return None
    try:
        output_file = SWITCH_DIR / f"{tld}_zone.txt"
        cmd = [
            "dig", "@" + config["server"],
            f"{tld}.", "AXFR",
            "-y", f"hmac-sha512:{config['key_name']}:{config['key_secret']}"
        ]
        logger.info(f"Downloading .{tld} via AXFR from Switch.ch...")
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=600)
        if result.returncode != 0:
            logger.error(f"AXFR failed for .{tld}: {result.stderr}")
            return None
        output_file.write_text(result.stdout)
        return output_file
    except subprocess.TimeoutExpired:
        logger.error(f"AXFR timeout for .{tld}")
        return None
    except Exception as e:
        logger.error(f"AXFR failed for .{tld}: {e}")
        return None
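# Zone-file parsing notes (applies to both parsers below): each resource-record line
# looks like "<owner> <ttl> <class> <type> <rdata...>", hence the `len(parts) >= 4`
# guard. Only the owner name (parts[0]) is used, and names containing a further dot
# (e.g. "www.example.ch.") are skipped so each registered domain is counted once.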

def parse_czds_zone(gz_file: Path, tld: str) -> set:
    """Parse a gzipped CZDS zone file and extract unique root domains"""
    domains = set()
    try:
        with gzip.open(gz_file, 'rt', encoding='utf-8', errors='ignore') as f:
            for line in f:
                if line.startswith(';') or not line.strip():
                    continue
                parts = line.split()
                if len(parts) >= 4:
                    name = parts[0].rstrip('.')
                    if name.lower().endswith(f'.{tld}'):
                        domain_name = name[:-(len(tld) + 1)]
                        # Only root domains (no subdomains)
                        if domain_name and '.' not in domain_name:
                            domains.add(domain_name.lower())
        return domains
    except Exception as e:
        logger.error(f"Failed to parse .{tld} zone file: {e}")
        return set()

def parse_switch_zone(zone_file: Path, tld: str) -> set:
    """Parse Switch.ch AXFR output and extract unique root domains"""
    domains = set()
    try:
        content = zone_file.read_text()
        for line in content.splitlines():
            if line.startswith(';') or not line.strip():
                continue
            parts = line.split()
            if len(parts) >= 4:
                name = parts[0].rstrip('.')
                # Skip the TLD itself
                if name.lower() == tld:
                    continue
                if name.lower().endswith(f'.{tld}'):
                    domain_name = name[:-(len(tld) + 1)]
                    # Only root domains
                    if domain_name and '.' not in domain_name:
                        domains.add(domain_name.lower())
        return domains
    except Exception as e:
        logger.error(f"Failed to parse .{tld} zone file: {e}")
        return set()

def save_domains(tld: str, domains: Set[str], directory: Path) -> Path:
    """Save domain list to compressed file for storage efficiency"""
    if COMPRESS_DOMAIN_LISTS:
        out_file = directory / f"{tld}_domains.txt.gz"
        # Remove old uncompressed file if exists
        old_file = directory / f"{tld}_domains.txt"
        if old_file.exists():
            old_file.unlink()
        # Write compressed
        with gzip.open(out_file, 'wt', encoding='utf-8', compresslevel=9) as f:
            f.write('\n'.join(sorted(domains)))
        return out_file
    else:
        out_file = directory / f"{tld}_domains.txt"
        out_file.write_text('\n'.join(sorted(domains)))
        return out_file

def load_previous_domains(tld: str, directory: Path) -> Set[str]:
    """Load previous day's domain list (compressed or uncompressed)"""
    # Try compressed first
    gz_file = directory / f"{tld}_domains.txt.gz"
    if gz_file.exists():
        try:
            with gzip.open(gz_file, 'rt', encoding='utf-8') as f:
                return set(f.read().splitlines())
        except Exception as e:
            logger.warning(f"Failed to read compressed domains for .{tld}: {e}")
            return set()
    # Fallback to uncompressed
    txt_file = directory / f"{tld}_domains.txt"
    if txt_file.exists():
        try:
            return set(txt_file.read_text().splitlines())
        except Exception:
            return set()
    return set()

def detect_drops(tld: str, today_domains: set, yesterday_domains: set) -> set:
    """Detect domains that were dropped (present yesterday, missing today)"""
    if not yesterday_domains:
        logger.info(f".{tld}: No previous data for comparison (first run)")
        return set()
    drops = yesterday_domains - today_domains
    return drops
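# Example: yesterday = {"alpha", "bravo", "charlie"}, today = {"alpha", "charlie"}
# -> detect_drops() returns {"bravo"}; newly registered names never appear in the result.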

async def store_drops_in_db(drops: list[tuple[str, str]], session: AsyncSession):
    """Store dropped domains in database using existing DroppedDomain model"""
    if not drops:
        return 0
    now = datetime.utcnow()
    # Delete old drops (older than 48 hours)
    cutoff = now - timedelta(hours=48)
    await session.execute(
        text("DELETE FROM dropped_domains WHERE dropped_date < :cutoff"),
        {"cutoff": cutoff}
    )
    # Insert new drops
    count = 0
    for domain, tld in drops:
        try:
            # Calculate domain properties
            length = len(domain)
            is_numeric = domain.isdigit()
            has_hyphen = '-' in domain
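            # Caveat: INSERT OR REPLACE is SQLite-specific syntax. get_db_session()
            # also accepts postgresql:// URLs; a PostgreSQL deployment would need an
            # equivalent "INSERT ... ON CONFLICT ... DO UPDATE" statement here.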
            await session.execute(
                text("""
                    INSERT OR REPLACE INTO dropped_domains
                    (domain, tld, dropped_date, length, is_numeric, has_hyphen, created_at)
                    VALUES (:domain, :tld, :dropped_date, :length, :is_numeric, :has_hyphen, :created_at)
                """),
                {
                    "domain": domain,
                    "tld": tld,
                    "dropped_date": now,
                    "length": length,
                    "is_numeric": is_numeric,
                    "has_hyphen": has_hyphen,
                    "created_at": now
                }
            )
            count += 1
        except Exception as e:
            logger.debug(f"Failed to insert drop {domain}.{tld}: {e}")
    await session.commit()
    return count

async def sync_czds_tld(tld: str) -> ZoneSyncResult:
    """Sync a single CZDS TLD"""
    result = ZoneSyncResult(tld)
    start = datetime.now()
    try:
        # Load previous domains for comparison
        yesterday_domains = load_previous_domains(tld, CZDS_DIR)
        # Download new zone file
        gz_file = download_czds_zone(tld)
        if not gz_file:
            result.error = "Download failed"
            return result
        # Parse zone file
        logger.info(f"Parsing .{tld} zone file...")
        today_domains = parse_czds_zone(gz_file, tld)
        if not today_domains:
            result.error = "Parsing failed - no domains extracted"
            return result
        result.domain_count = len(today_domains)
        # Detect drops
        drops = detect_drops(tld, today_domains, yesterday_domains)
        result.drops_count = len(drops)
        # Save current domains for tomorrow's comparison
        save_domains(tld, today_domains, CZDS_DIR)
        # Cleanup gz file
        if gz_file.exists():
            gz_file.unlink()
        # Update last download marker
        marker = CZDS_DIR / f".{tld}_last_download"
        marker.write_text(datetime.utcnow().isoformat())
        result.success = True
        logger.info(f"✅ .{tld}: {result.domain_count:,} domains, {result.drops_count:,} drops")
        # Return drops for DB storage
        result.drops = [(d, tld) for d in drops]
    except Exception as e:
        result.error = str(e)
        logger.error(f"❌ .{tld} sync failed: {e}")
    result.duration_seconds = (datetime.now() - start).total_seconds()
    return result

async def sync_switch_tld(tld: str) -> ZoneSyncResult:
    """Sync a single Switch.ch TLD"""
    result = ZoneSyncResult(tld)
    start = datetime.now()
    try:
        # Load previous domains for comparison
        yesterday_domains = load_previous_domains(tld, SWITCH_DIR)
        # Download new zone file
        zone_file = download_switch_zone(tld)
        if not zone_file:
            result.error = "AXFR failed"
            return result
        # Parse zone file
        logger.info(f"Parsing .{tld} zone file...")
        today_domains = parse_switch_zone(zone_file, tld)
        if not today_domains:
            result.error = "Parsing failed - no domains extracted"
            return result
        result.domain_count = len(today_domains)
        # Detect drops
        drops = detect_drops(tld, today_domains, yesterday_domains)
        result.drops_count = len(drops)
        # Save current domains for tomorrow's comparison
        save_domains(tld, today_domains, SWITCH_DIR)
        # Cleanup raw zone file (keep only domain list)
        if zone_file.exists():
            zone_file.unlink()
        result.success = True
        logger.info(f"✅ .{tld}: {result.domain_count:,} domains, {result.drops_count:,} drops")
        # Return drops for DB storage
        result.drops = [(d, tld) for d in drops]
    except Exception as e:
        result.error = str(e)
        logger.error(f"❌ .{tld} sync failed: {e}")
    result.duration_seconds = (datetime.now() - start).total_seconds()
    return result

def cleanup_stray_files(directory: Path, keep_extensions: list = None):
    """Remove any stray/temporary files to save space"""
    if keep_extensions is None:
        keep_extensions = ['.txt.gz', '.txt']  # Only keep domain lists
    removed_count = 0
    removed_size = 0
    for f in directory.iterdir():
        if f.is_file():
            # Keep marker files
            if f.name.startswith('.'):
                continue
            # Keep domain list files
            if any(f.name.endswith(ext) for ext in keep_extensions):
                continue
            # Remove everything else (raw zone files, temp files)
            try:
                size = f.stat().st_size
                f.unlink()
                removed_count += 1
                removed_size += size
                logger.info(f"🗑️ Removed stray file: {f.name} ({size / (1024*1024):.1f} MB)")
            except Exception as e:
                logger.warning(f"Failed to remove {f.name}: {e}")
    return removed_count, removed_size

def get_directory_size(directory: Path) -> int:
    """Get total size of directory in bytes"""
    total = 0
    for f in directory.rglob('*'):
        if f.is_file():
            total += f.stat().st_size
    return total

def log_storage_stats():
    """Log current storage usage"""
    czds_size = get_directory_size(CZDS_DIR) if CZDS_DIR.exists() else 0
    switch_size = get_directory_size(SWITCH_DIR) if SWITCH_DIR.exists() else 0
    total = czds_size + switch_size
    logger.info(f"💾 STORAGE: CZDS={czds_size/(1024*1024):.1f}MB, Switch={switch_size/(1024*1024):.1f}MB, Total={total/(1024*1024):.1f}MB")
    return total

async def main():
    """Main sync process"""
    logger.info("=" * 60)
    logger.info("🚀 POUNCE ZONE SYNC - Starting daily synchronization")
    logger.info("=" * 60)
    start_time = datetime.now()
    # Ensure directories exist
    CZDS_DIR.mkdir(parents=True, exist_ok=True)
    SWITCH_DIR.mkdir(parents=True, exist_ok=True)
    LOG_FILE.parent.mkdir(parents=True, exist_ok=True)
    # Log initial storage
    logger.info("\n📊 Initial storage check...")
    initial_storage = log_storage_stats()
    results = []
    total_drops_stored = 0

    # Helper to store drops immediately after each TLD
    async def store_tld_drops(drops: list, tld: str):
        nonlocal total_drops_stored
        if not drops:
            return 0
        try:
            session = await get_db_session()
            stored = await store_drops_in_db(drops, session)
            await session.close()
            total_drops_stored += stored
            logger.info(f" 💾 Stored {stored} .{tld} drops in database")
            return stored
        except Exception as e:
            logger.error(f" ❌ Failed to store .{tld} drops: {e}")
            return 0

    # Sync CZDS TLDs (sequentially to respect rate limits)
    logger.info("\n📦 Syncing ICANN CZDS zone files...")
    for tld in CZDS_TLDS:
        result = await sync_czds_tld(tld)
        results.append(result)
        # Store drops IMMEDIATELY after each TLD (crash-safe)
        if hasattr(result, 'drops') and result.drops:
            await store_tld_drops(result.drops, tld)
        # Rate limit: wait between downloads
        if tld != CZDS_TLDS[-1]:
            logger.info("⏳ Waiting 5 seconds (rate limit)...")
            await asyncio.sleep(5)
    # Sync Switch.ch TLDs
    logger.info("\n🇨🇭 Syncing Switch.ch zone files...")
    for tld in ["ch", "li"]:
        result = await sync_switch_tld(tld)
        results.append(result)
        # Store drops IMMEDIATELY
        if hasattr(result, 'drops') and result.drops:
            await store_tld_drops(result.drops, tld)
    # Cleanup stray files
    logger.info("\n🧹 Cleaning up temporary files...")
    czds_removed, czds_freed = cleanup_stray_files(CZDS_DIR)
    switch_removed, switch_freed = cleanup_stray_files(SWITCH_DIR)
    total_freed = czds_freed + switch_freed
    if total_freed > 0:
        logger.info(f"✅ Freed {total_freed / (1024*1024):.1f} MB ({czds_removed + switch_removed} files)")
    else:
        logger.info("✅ No stray files found")
    # Summary
    duration = (datetime.now() - start_time).total_seconds()
    logger.info("\n" + "=" * 60)
    logger.info("📊 SYNC SUMMARY")
    logger.info("=" * 60)
    total_domains = 0
    total_drops = 0
    success_count = 0
    for r in results:
        status = "✅" if r.success else "❌"
        logger.info(f" {status} .{r.tld}: {r.domain_count:,} domains, {r.drops_count:,} drops ({r.duration_seconds:.1f}s)")
        if r.success:
            total_domains += r.domain_count
            total_drops += r.drops_count
            success_count += 1
    logger.info("-" * 60)
    logger.info(f" TOTAL: {total_domains:,} domains across {success_count}/{len(results)} TLDs")
    logger.info(f" DROPS: {total_drops:,} detected, {total_drops_stored:,} stored in DB")
    logger.info(f" TIME: {duration:.1f} seconds")
    # Final storage stats
    logger.info("-" * 60)
    final_storage = log_storage_stats()
    if initial_storage > 0:
        change = final_storage - initial_storage
        logger.info(f" CHANGE: {'+' if change > 0 else ''}{change/(1024*1024):.1f} MB")
    logger.info("=" * 60)
    return 0 if success_count == len(results) else 1

if __name__ == "__main__":
    exit_code = asyncio.run(main())
    sys.exit(exit_code)