Previously, if the download succeeded but the file wasn't found, the function returned None immediately. It now raises FileNotFoundError so the retry logic kicks in properly.
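
A minimal sketch of the pattern (fetch_zone is a hypothetical helper; the real logic lives in download_czds_zone below):

import time

# Sketch only: raising inside the try block hands control back to the retry
# loop, whereas returning None would skip the remaining attempts.
def fetch_with_retry(max_retries: int = 3):
    for attempt in range(max_retries):
        try:
            path = fetch_zone()  # hypothetical: may "succeed" yet leave no file behind
            if not path.exists():
                raise FileNotFoundError(path)  # now retried like any other failure
            return path
        except Exception:
            if attempt == max_retries - 1:
                return None
            time.sleep((attempt + 1) * 30)  # linear backoff: 30s, 60s, ...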
607 lines · 20 KiB · Python
#!/usr/bin/env python3
"""
Pounce Zone File Sync - Daily Automated Zone File Synchronization

This script:
1. Downloads zone files from ICANN CZDS (app, dev, info, online, org, xyz)
2. Downloads zone files from Switch.ch via AXFR (.ch, .li)
3. Compares with yesterday's data to detect drops
4. Stores drops in the database for the Drops tab
5. Cleans up all temporary files and compresses domain lists

STORAGE STRATEGY (Ultra-Efficient):
- Raw zone files: DELETED immediately after parsing
- Domain lists: Stored COMPRESSED (.gz) - ~80% size reduction
- Only keeps current snapshot (no history)
- Drops stored in DB for 48h only

Run daily via cron at 06:00 UTC (after most registries update)
"""

import asyncio
import gzip
import logging
import subprocess
import sys
from datetime import datetime, timedelta
from pathlib import Path
from typing import Optional, Set

# Add parent to path for imports
sys.path.insert(0, str(Path(__file__).parent.parent))

from sqlalchemy import text
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker

# Configuration
CZDS_DIR = Path("/home/user/pounce_czds")
SWITCH_DIR = Path("/home/user/pounce_switch")
LOG_FILE = Path("/home/user/logs/zone_sync.log")

# Storage efficiency: compress domain lists
COMPRESS_DOMAIN_LISTS = True

# CZDS TLDs we have access to
CZDS_TLDS = ["app", "dev", "info", "online", "org", "xyz"]

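# Note: the TSIG keys below are the shared keys Switch publishes for open
# zone-data access (hence "public" in the key names), not private credentials.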
# Switch.ch AXFR config
SWITCH_CONFIG = {
    "ch": {
        "server": "zonedata.switch.ch",
        "key_name": "tsig-zonedata-ch-public-21-01.",
        "key_secret": "stZwEGApYumtXkh73qMLPqfbIDozWKZLkqRvcjKSpRnsor6A6MxixRL6C2HeSVBQNfMW4wer+qjS0ZSfiWiJ3Q=="
    },
    "li": {
        "server": "zonedata.switch.ch",
        "key_name": "tsig-zonedata-li-public-21-01.",
        "key_secret": "t8GgeCn+fhPaj+cRy/lakQPb6M45xz/NZwmcp4iqbBxKFCCH0/k3xNGe6sf3ObmoaKDBedge/La4cpPfLqtFkw=="
    }
}

# Setup logging: always log to console; add a file handler only when the log
# directory exists (avoids registering a duplicate console handler).
_handlers = [logging.StreamHandler()]
if LOG_FILE.parent.exists():
    _handlers.append(logging.FileHandler(LOG_FILE))
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s [%(levelname)s] %(message)s',
    handlers=_handlers,
)
logger = logging.getLogger(__name__)


class ZoneSyncResult:
    """Result of a zone sync operation"""
    def __init__(self, tld: str):
        self.tld = tld
        self.success = False
        self.domain_count = 0
        self.drops_count = 0
        self.error: Optional[str] = None
        self.duration_seconds = 0
        # (domain, tld) pairs collected by the sync functions for DB storage
        self.drops: list = []


async def get_db_session():
    """Create async database session"""
    from app.config import settings

    engine = create_async_engine(settings.database_url.replace("sqlite://", "sqlite+aiosqlite://"))
    async_session = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
    return async_session()


def download_czds_zone(tld: str, max_retries: int = 3) -> Optional[Path]:
    """Download a single CZDS zone file using pyCZDS with retry logic"""
    import time

    for attempt in range(max_retries):
        try:
            from pyczds.client import CZDSClient

            # Read credentials from .env
            env_file = Path(__file__).parent.parent / ".env"
            if not env_file.exists():
                env_file = Path("/home/user/pounce/backend/.env")

            env_content = env_file.read_text()
            username = password = None

            for line in env_content.splitlines():
                if line.startswith("CZDS_USERNAME="):
                    username = line.split("=", 1)[1].strip()
                elif line.startswith("CZDS_PASSWORD="):
                    password = line.split("=", 1)[1].strip()

            if not username or not password:
                logger.error("CZDS credentials not found in .env")
                return None

            client = CZDSClient(username, password)
            urls = client.get_zonefiles_list()

            # Find URL for this TLD
            target_url = None
            for url in urls:
                if f"{tld}.zone" in url or f"/{tld}." in url:
                    target_url = url
                    break

            if not target_url:
                logger.warning(f"No access to .{tld} zone file")
                return None

            logger.info(f"Downloading .{tld} from CZDS... (attempt {attempt + 1}/{max_retries})")
            client.get_zonefile(target_url, download_dir=str(CZDS_DIR))

            # Find the downloaded file
            gz_file = CZDS_DIR / f"{tld}.txt.gz"
            if gz_file.exists():
                return gz_file

            # Try alternative naming (pyCZDS sometimes uses different names)
            for f in CZDS_DIR.glob(f"*{tld}*.gz"):
                return f

            # File not found after download - raise exception to trigger retry
            raise FileNotFoundError(f"Downloaded file not found for .{tld} in {CZDS_DIR}")

        except Exception as e:
            logger.warning(f"CZDS download attempt {attempt + 1} failed for .{tld}: {e}")
            if attempt < max_retries - 1:
                wait_time = (attempt + 1) * 30  # linear backoff: 30s, then 60s
                logger.info(f"Retrying in {wait_time}s...")
                time.sleep(wait_time)
            else:
                logger.error(f"CZDS download failed for .{tld} after {max_retries} attempts")
                return None

    return None


def download_switch_zone(tld: str) -> Optional[Path]:
    """Download zone file from Switch.ch via AXFR"""
    config = SWITCH_CONFIG.get(tld)
    if not config:
        return None

    try:
        output_file = SWITCH_DIR / f"{tld}_zone.txt"

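        # Equivalent shell command for manual testing (secret abbreviated):
        #   dig @zonedata.switch.ch ch. AXFR -y hmac-sha512:tsig-zonedata-ch-public-21-01.:<secret>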
        cmd = [
            "dig", "@" + config["server"],
            f"{tld}.", "AXFR",
            "-y", f"hmac-sha512:{config['key_name']}:{config['key_secret']}"
        ]

        logger.info(f"Downloading .{tld} via AXFR from Switch.ch...")
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=600)

        if result.returncode != 0:
            logger.error(f"AXFR failed for .{tld}: {result.stderr}")
            return None

        output_file.write_text(result.stdout)
        return output_file

    except subprocess.TimeoutExpired:
        logger.error(f"AXFR timeout for .{tld}")
        return None
    except Exception as e:
        logger.error(f"AXFR failed for .{tld}: {e}")
        return None


def parse_czds_zone(gz_file: Path, tld: str) -> set:
    """Parse a gzipped CZDS zone file and extract unique root domains"""
    domains = set()

    try:
        with gzip.open(gz_file, 'rt', encoding='utf-8', errors='ignore') as f:
            for line in f:
                if line.startswith(';') or not line.strip():
                    continue

                parts = line.split()
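                # A zone record has at least four fields:
                #   owner TTL class type [rdata]
                # e.g. "example.app. 86400 in ns ns1.example.net."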
                if len(parts) >= 4:
                    name = parts[0].rstrip('.')

                    if name.lower().endswith(f'.{tld}'):
                        domain_name = name[:-(len(tld) + 1)]

                        # Only root domains (no subdomains)
                        if domain_name and '.' not in domain_name:
                            domains.add(domain_name.lower())

        return domains

    except Exception as e:
        logger.error(f"Failed to parse .{tld} zone file: {e}")
        return set()


def parse_switch_zone(zone_file: Path, tld: str) -> set:
    """Parse Switch.ch AXFR output and extract unique root domains"""
    domains = set()

    try:
        content = zone_file.read_text()

        for line in content.splitlines():
            if line.startswith(';') or not line.strip():
                continue

            parts = line.split()
            if len(parts) >= 4:
                name = parts[0].rstrip('.')

                # Skip the TLD itself
                if name.lower() == tld:
                    continue

                if name.lower().endswith(f'.{tld}'):
                    domain_name = name[:-(len(tld) + 1)]

                    # Only root domains
                    if domain_name and '.' not in domain_name:
                        domains.add(domain_name.lower())

        return domains

    except Exception as e:
        logger.error(f"Failed to parse .{tld} zone file: {e}")
        return set()


def save_domains(tld: str, domains: Set[str], directory: Path) -> Path:
    """Save domain list to compressed file for storage efficiency"""
    if COMPRESS_DOMAIN_LISTS:
        out_file = directory / f"{tld}_domains.txt.gz"
        # Remove old uncompressed file if exists
        old_file = directory / f"{tld}_domains.txt"
        if old_file.exists():
            old_file.unlink()
        # Write compressed
        with gzip.open(out_file, 'wt', encoding='utf-8', compresslevel=9) as f:
            f.write('\n'.join(sorted(domains)))
        return out_file
    else:
        out_file = directory / f"{tld}_domains.txt"
        out_file.write_text('\n'.join(sorted(domains)))
        return out_file


def load_previous_domains(tld: str, directory: Path) -> Set[str]:
    """Load previous day's domain list (compressed or uncompressed)"""
    # Try compressed first
    gz_file = directory / f"{tld}_domains.txt.gz"
    if gz_file.exists():
        try:
            with gzip.open(gz_file, 'rt', encoding='utf-8') as f:
                return set(f.read().splitlines())
        except Exception as e:
            logger.warning(f"Failed to read compressed domains for .{tld}: {e}")
            return set()

    # Fallback to uncompressed
    txt_file = directory / f"{tld}_domains.txt"
    if txt_file.exists():
        try:
            return set(txt_file.read_text().splitlines())
        except Exception:
            return set()

    return set()


def detect_drops(tld: str, today_domains: set, yesterday_domains: set) -> set:
    """Detect domains that were dropped (present yesterday, missing today)"""
    if not yesterday_domains:
        logger.info(f".{tld}: No previous data for comparison (first run)")
        return set()

    drops = yesterday_domains - today_domains
    return drops


async def store_drops_in_db(drops: list[tuple[str, str]], session: AsyncSession):
    """Store dropped domains in database using existing DroppedDomain model"""
    if not drops:
        return 0

    now = datetime.utcnow()

    # Delete old drops (older than 48 hours)
    cutoff = now - timedelta(hours=48)
    await session.execute(
        text("DELETE FROM dropped_domains WHERE dropped_date < :cutoff"),
        {"cutoff": cutoff}
    )

    # Insert new drops
    count = 0
    for domain, tld in drops:
        try:
            # Calculate domain properties
            length = len(domain)
            is_numeric = domain.isdigit()
            has_hyphen = '-' in domain

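            # INSERT OR REPLACE is SQLite-specific: a domain detected again
            # overwrites its existing row instead of violating a unique constraint.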
            await session.execute(
                text("""
                    INSERT OR REPLACE INTO dropped_domains
                    (domain, tld, dropped_date, length, is_numeric, has_hyphen, created_at)
                    VALUES (:domain, :tld, :dropped_date, :length, :is_numeric, :has_hyphen, :created_at)
                """),
                {
                    "domain": domain,
                    "tld": tld,
                    "dropped_date": now,
                    "length": length,
                    "is_numeric": is_numeric,
                    "has_hyphen": has_hyphen,
                    "created_at": now
                }
            )
            count += 1
        except Exception as e:
            logger.debug(f"Failed to insert drop {domain}.{tld}: {e}")

    await session.commit()
    return count


async def sync_czds_tld(tld: str) -> ZoneSyncResult:
    """Sync a single CZDS TLD"""
    result = ZoneSyncResult(tld)
    start = datetime.now()

    try:
        # Load previous domains for comparison
        yesterday_domains = load_previous_domains(tld, CZDS_DIR)

        # Download new zone file
        gz_file = download_czds_zone(tld)
        if not gz_file:
            result.error = "Download failed"
            return result

        # Parse zone file
        logger.info(f"Parsing .{tld} zone file...")
        today_domains = parse_czds_zone(gz_file, tld)

        if not today_domains:
            result.error = "Parsing failed - no domains extracted"
            return result

        result.domain_count = len(today_domains)

        # Detect drops
        drops = detect_drops(tld, today_domains, yesterday_domains)
        result.drops_count = len(drops)

        # Save current domains for tomorrow's comparison
        save_domains(tld, today_domains, CZDS_DIR)

        # Cleanup gz file
        if gz_file.exists():
            gz_file.unlink()

        # Update last download marker
        marker = CZDS_DIR / f".{tld}_last_download"
        marker.write_text(datetime.utcnow().isoformat())

        result.success = True
        logger.info(f"✅ .{tld}: {result.domain_count:,} domains, {result.drops_count:,} drops")

        # Collect drops on the result for DB storage
        result.drops = [(d, tld) for d in drops]

    except Exception as e:
        result.error = str(e)
        logger.error(f"❌ .{tld} sync failed: {e}")

    result.duration_seconds = (datetime.now() - start).total_seconds()
    return result


async def sync_switch_tld(tld: str) -> ZoneSyncResult:
    """Sync a single Switch.ch TLD"""
    result = ZoneSyncResult(tld)
    start = datetime.now()

    try:
        # Load previous domains for comparison
        yesterday_domains = load_previous_domains(tld, SWITCH_DIR)

        # Download new zone file
        zone_file = download_switch_zone(tld)
        if not zone_file:
            result.error = "AXFR failed"
            return result

        # Parse zone file
        logger.info(f"Parsing .{tld} zone file...")
        today_domains = parse_switch_zone(zone_file, tld)

        if not today_domains:
            result.error = "Parsing failed - no domains extracted"
            return result

        result.domain_count = len(today_domains)

        # Detect drops
        drops = detect_drops(tld, today_domains, yesterday_domains)
        result.drops_count = len(drops)

        # Save current domains for tomorrow's comparison
        save_domains(tld, today_domains, SWITCH_DIR)

        # Cleanup raw zone file (keep only domain list)
        if zone_file.exists():
            zone_file.unlink()

        result.success = True
        logger.info(f"✅ .{tld}: {result.domain_count:,} domains, {result.drops_count:,} drops")

        # Collect drops on the result for DB storage
        result.drops = [(d, tld) for d in drops]

    except Exception as e:
        result.error = str(e)
        logger.error(f"❌ .{tld} sync failed: {e}")

    result.duration_seconds = (datetime.now() - start).total_seconds()
    return result


def cleanup_stray_files(directory: Path, keep_extensions: Optional[list] = None):
    """Remove any stray/temporary files to save space"""
    if keep_extensions is None:
        keep_extensions = ['.txt.gz', '.txt']  # Only keep domain lists

    removed_count = 0
    removed_size = 0

    for f in directory.iterdir():
        if f.is_file():
            # Keep marker files
            if f.name.startswith('.'):
                continue
            # Keep domain list files
            if any(f.name.endswith(ext) for ext in keep_extensions):
                continue
            # Remove everything else (raw zone files, temp files)
            try:
                size = f.stat().st_size
                f.unlink()
                removed_count += 1
                removed_size += size
                logger.info(f"🗑️ Removed stray file: {f.name} ({size / (1024*1024):.1f} MB)")
            except Exception as e:
                logger.warning(f"Failed to remove {f.name}: {e}")

    return removed_count, removed_size


def get_directory_size(directory: Path) -> int:
    """Get total size of directory in bytes"""
    total = 0
    for f in directory.rglob('*'):
        if f.is_file():
            total += f.stat().st_size
    return total


def log_storage_stats():
    """Log current storage usage"""
    czds_size = get_directory_size(CZDS_DIR) if CZDS_DIR.exists() else 0
    switch_size = get_directory_size(SWITCH_DIR) if SWITCH_DIR.exists() else 0
    total = czds_size + switch_size

    logger.info(f"💾 STORAGE: CZDS={czds_size/(1024*1024):.1f}MB, Switch={switch_size/(1024*1024):.1f}MB, Total={total/(1024*1024):.1f}MB")
    return total


async def main():
    """Main sync process"""
    logger.info("=" * 60)
    logger.info("🚀 POUNCE ZONE SYNC - Starting daily synchronization")
    logger.info("=" * 60)

    start_time = datetime.now()

    # Ensure directories exist
    CZDS_DIR.mkdir(parents=True, exist_ok=True)
    SWITCH_DIR.mkdir(parents=True, exist_ok=True)
    LOG_FILE.parent.mkdir(parents=True, exist_ok=True)

    # Log initial storage
    logger.info("\n📊 Initial storage check...")
    initial_storage = log_storage_stats()

    all_drops = []
    results = []

    # Sync CZDS TLDs (sequentially to respect rate limits)
    logger.info("\n📦 Syncing ICANN CZDS zone files...")
    for tld in CZDS_TLDS:
        result = await sync_czds_tld(tld)
        results.append(result)
        all_drops.extend(result.drops)

        # Rate limit: wait between downloads
        if tld != CZDS_TLDS[-1]:
            logger.info("⏳ Waiting 5 seconds (rate limit)...")
            await asyncio.sleep(5)

    # Sync Switch.ch TLDs
    logger.info("\n🇨🇭 Syncing Switch.ch zone files...")
    for tld in ["ch", "li"]:
        result = await sync_switch_tld(tld)
        results.append(result)
        all_drops.extend(result.drops)

    # Store drops in database
    if all_drops:
        logger.info(f"\n💾 Storing {len(all_drops)} drops in database...")
        try:
            session = await get_db_session()
            stored = await store_drops_in_db(all_drops, session)
            await session.close()
            logger.info(f"✅ Stored {stored} drops in database")
        except Exception as e:
            logger.error(f"❌ Failed to store drops: {e}")

    # Cleanup stray files
    logger.info("\n🧹 Cleaning up temporary files...")
    czds_removed, czds_freed = cleanup_stray_files(CZDS_DIR)
    switch_removed, switch_freed = cleanup_stray_files(SWITCH_DIR)
    total_freed = czds_freed + switch_freed
    if total_freed > 0:
        logger.info(f"✅ Freed {total_freed / (1024*1024):.1f} MB ({czds_removed + switch_removed} files)")
    else:
        logger.info("✅ No stray files found")

    # Summary
    duration = (datetime.now() - start_time).total_seconds()

    logger.info("\n" + "=" * 60)
    logger.info("📊 SYNC SUMMARY")
    logger.info("=" * 60)

    total_domains = 0
    total_drops = 0
    success_count = 0

    for r in results:
        status = "✅" if r.success else "❌"
        logger.info(f" {status} .{r.tld}: {r.domain_count:,} domains, {r.drops_count:,} drops ({r.duration_seconds:.1f}s)")
        if r.success:
            total_domains += r.domain_count
            total_drops += r.drops_count
            success_count += 1

    logger.info("-" * 60)
    logger.info(f" TOTAL: {total_domains:,} domains across {success_count}/{len(results)} TLDs")
    logger.info(f" DROPS: {total_drops:,} new drops detected")
    logger.info(f" TIME: {duration:.1f} seconds")

    # Final storage stats
    logger.info("-" * 60)
    final_storage = log_storage_stats()
    if initial_storage > 0:
        change = final_storage - initial_storage
        logger.info(f" CHANGE: {'+' if change > 0 else ''}{change/(1024*1024):.1f} MB")
    logger.info("=" * 60)

    return 0 if success_count == len(results) else 1


if __name__ == "__main__":
    exit_code = asyncio.run(main())
    sys.exit(exit_code)