""" Zone File Service for .ch and .li domains ========================================== Uses DNS AXFR zone transfer to fetch domain lists from Switch.ch Compares daily snapshots to find freshly dropped domains. Storage: We only store the diff (dropped/new domains) to minimize disk usage. """ import asyncio import hashlib import logging from datetime import datetime, timedelta from pathlib import Path from typing import Optional from sqlalchemy import select, func from sqlalchemy.dialects.postgresql import insert as pg_insert from sqlalchemy.dialects.sqlite import insert as sqlite_insert from sqlalchemy.ext.asyncio import AsyncSession from app.config import get_settings from app.models.zone_file import ZoneSnapshot, DroppedDomain logger = logging.getLogger(__name__) ZONE_SERVER = "zonedata.switch.ch" # ============================================================================ # ZONE FILE SERVICE # ============================================================================ class ZoneFileService: """Service for fetching and analyzing zone files""" def __init__(self, data_dir: Optional[Path] = None): settings = get_settings() self.data_dir = data_dir or Path(settings.switch_data_dir) self.data_dir.mkdir(parents=True, exist_ok=True) self._settings = settings # Store daily snapshots for N days (premium reliability) self.snapshots_dir = self.data_dir / "snapshots" self.snapshots_dir.mkdir(parents=True, exist_ok=True) def _get_tsig_config(self, tld: str) -> dict: """Resolve TSIG config from settings/env (no secrets in git).""" if tld == "ch": return { "name": self._settings.switch_tsig_ch_name, "algorithm": self._settings.switch_tsig_ch_algorithm, "secret": self._settings.switch_tsig_ch_secret, } if tld == "li": return { "name": self._settings.switch_tsig_li_name, "algorithm": self._settings.switch_tsig_li_algorithm, "secret": self._settings.switch_tsig_li_secret, } raise ValueError(f"Unknown TLD: {tld}") def _get_key_file_path(self, tld: str) -> Path: """Generate TSIG key file for dig command""" key_path = self.data_dir / f"{tld}_zonedata.key" key_info = self._get_tsig_config(tld) if not (key_info.get("secret") or "").strip(): raise RuntimeError(f"Missing Switch TSIG secret for .{tld} (set SWITCH_TSIG_{tld.upper()}_SECRET)") # Write TSIG key file in BIND format key_content = f"""key "{key_info['name']}" {{ algorithm {key_info['algorithm']}; secret "{key_info['secret']}"; }}; """ key_path.write_text(key_content) return key_path async def fetch_zone_file(self, tld: str) -> set[str]: """ Fetch zone file via DNS AXFR transfer. Returns set of domain names (without TLD suffix). """ if tld not in ("ch", "li"): raise ValueError(f"Unsupported TLD: {tld}. Only 'ch' and 'li' are supported.") logger.info(f"Starting zone transfer for .{tld}") key_file = self._get_key_file_path(tld) # Build dig command cmd = [ "dig", "-k", str(key_file), f"@{ZONE_SERVER}", "+noall", "+answer", "+noidnout", "+onesoa", "AXFR", f"{tld}." ] try: # Run dig command (this can take a while for large zones) process = await asyncio.create_subprocess_exec( *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) stdout, stderr = await asyncio.wait_for( process.communicate(), timeout=600 # 10 minutes timeout for large zones ) if process.returncode != 0: error_msg = stderr.decode() if stderr else "Unknown error" logger.error(f"Zone transfer failed for .{tld}: {error_msg}") raise RuntimeError(f"Zone transfer failed: {error_msg}") # Parse output to extract domain names domains = set() for line in stdout.decode().splitlines(): parts = line.split() if len(parts) >= 1: domain = parts[0].rstrip('.') # Only include actual domain names (not the TLD itself) if domain and domain != tld and '.' in domain: # Extract just the name part (before the TLD) name = domain.rsplit('.', 1)[0] if name: domains.add(name.lower()) logger.info(f"Zone transfer complete for .{tld}: {len(domains)} domains") return domains except asyncio.TimeoutError: logger.error(f"Zone transfer timed out for .{tld}") raise RuntimeError("Zone transfer timed out") except FileNotFoundError: logger.error("dig command not found. Please install bind-utils or dnsutils.") raise RuntimeError("dig command not available") def compute_checksum(self, domains: set[str]) -> str: """Compute SHA256 checksum of sorted domain list""" sorted_domains = "\n".join(sorted(domains)) return hashlib.sha256(sorted_domains.encode()).hexdigest() async def get_previous_snapshot(self, db: AsyncSession, tld: str) -> Optional[set[str]]: """Load previous day's domain set from cache file""" # Prefer most recent snapshot file before today (supports N-day retention) tld_dir = self.snapshots_dir / tld if tld_dir.exists(): candidates = sorted([p for p in tld_dir.glob("*.domains.txt") if p.is_file()]) if candidates: # Pick the latest snapshot file (by name sort = date sort) latest = candidates[-1] try: content = latest.read_text() return set(line.strip() for line in content.splitlines() if line.strip()) except Exception as e: logger.warning(f"Failed to load snapshot for .{tld} from {latest.name}: {e}") # Fallback: legacy cache file cache_file = self.data_dir / f"{tld}_domains.txt" if cache_file.exists(): try: content = cache_file.read_text() return set(line.strip() for line in content.splitlines() if line.strip()) except Exception as e: logger.warning(f"Failed to load cache for .{tld}: {e}") return None def _cleanup_snapshot_files(self, tld: str) -> None: """Delete snapshot files older than retention window (best-effort).""" keep_days = int(self._settings.zone_retention_days or 3) cutoff = datetime.utcnow().date() - timedelta(days=keep_days) tld_dir = self.snapshots_dir / tld if not tld_dir.exists(): return for p in tld_dir.glob("*.domains.txt"): try: # filename: YYYY-MM-DD.domains.txt date_part = p.name.split(".")[0] snap_date = datetime.fromisoformat(date_part).date() if snap_date < cutoff: p.unlink(missing_ok=True) except Exception: # Don't let cleanup break sync continue async def save_snapshot(self, db: AsyncSession, tld: str, domains: set[str]): """Save current snapshot to cache and database""" # Save to legacy cache file (fast path) cache_file = self.data_dir / f"{tld}_domains.txt" cache_file.write_text("\n".join(sorted(domains))) # Save a daily snapshot file for retention/debugging tld_dir = self.snapshots_dir / tld tld_dir.mkdir(parents=True, exist_ok=True) today_str = datetime.utcnow().date().isoformat() snapshot_file = tld_dir / f"{today_str}.domains.txt" snapshot_file.write_text("\n".join(sorted(domains))) self._cleanup_snapshot_files(tld) # Save metadata to database checksum = self.compute_checksum(domains) snapshot = ZoneSnapshot( tld=tld, snapshot_date=datetime.utcnow(), domain_count=len(domains), checksum=checksum ) db.add(snapshot) await db.commit() logger.info(f"Saved snapshot for .{tld}: {len(domains)} domains") async def process_drops( self, db: AsyncSession, tld: str, previous: set[str], current: set[str] ) -> list[dict]: """ Find dropped domains and store them directly. NOTE: We do NOT verify availability via RDAP here to avoid rate limits/bans. Zone file diff is already a reliable signal that the domain was dropped. """ dropped = previous - current if not dropped: logger.info(f"No dropped domains found for .{tld}") return [] logger.info(f"Found {len(dropped):,} dropped domains for .{tld}, saving to database...") today = datetime.utcnow().replace(hour=0, minute=0, second=0, microsecond=0) dropped_list = list(dropped) rows = [ { "domain": name, "tld": tld, "dropped_date": today, "length": len(name), "is_numeric": name.isdigit(), "has_hyphen": "-" in name, "availability_status": "unknown", } for name in dropped_list ] # Bulk insert with conflict-ignore (needs unique index, see db_migrations.py) dialect = db.get_bind().dialect.name if db.get_bind() is not None else "unknown" batch_size = 5000 inserted_total = 0 for i in range(0, len(rows), batch_size): batch = rows[i : i + batch_size] if dialect == "postgresql": stmt = ( pg_insert(DroppedDomain) .values(batch) .on_conflict_do_nothing(index_elements=["domain", "tld", "dropped_date"]) ) elif dialect == "sqlite": # SQLite: INSERT OR IGNORE (unique index is still respected) stmt = sqlite_insert(DroppedDomain).values(batch).prefix_with("OR IGNORE") else: # Fallback: best-effort plain insert; duplicates are handled by DB constraints if present. stmt = pg_insert(DroppedDomain).values(batch) result = await db.execute(stmt) # rowcount is driver-dependent; still useful for postgres/sqlite inserted_total += int(getattr(result, "rowcount", 0) or 0) await db.commit() if (i + batch_size) % 20000 == 0: logger.info(f"Saved {min(i + batch_size, len(rows)):,}/{len(rows):,} drops (inserted so far: {inserted_total:,})") logger.info(f"Zone drops for .{tld}: {inserted_total:,} inserted (out of {len(rows):,} diff)") # Return a small preview list (avoid returning huge payloads) preview = [{"domain": f"{r['domain']}.{tld}", "length": r["length"]} for r in rows[:200]] return preview async def run_daily_sync(self, db: AsyncSession, tld: str) -> dict: """ Run daily zone file sync: 1. Fetch current zone file 2. Compare with previous snapshot 3. Store dropped domains 4. Save new snapshot """ logger.info(f"Starting daily sync for .{tld}") # Get previous snapshot previous = await self.get_previous_snapshot(db, tld) # Fetch current zone current = await self.fetch_zone_file(tld) result = { "tld": tld, "current_count": len(current), "previous_count": len(previous) if previous else 0, "dropped": [], "new_count": 0 } if previous: # Find dropped domains result["dropped"] = await self.process_drops(db, tld, previous, current) result["new_count"] = len(current - previous) # Save current snapshot await self.save_snapshot(db, tld, current) logger.info(f"Daily sync complete for .{tld}: {len(result['dropped'])} dropped, {result['new_count']} new") return result # ============================================================================ # API FUNCTIONS # ============================================================================ async def get_dropped_domains( db: AsyncSession, tld: Optional[str] = None, hours: int = 24, min_length: Optional[int] = None, max_length: Optional[int] = None, exclude_numeric: bool = False, exclude_hyphen: bool = False, keyword: Optional[str] = None, limit: int = 100, offset: int = 0 ) -> dict: """ Get recently dropped domains with filters. Only returns drops from last 24-48h (we don't store older data). """ cutoff = datetime.utcnow() - timedelta(hours=hours) query = select(DroppedDomain).where(DroppedDomain.dropped_date >= cutoff) count_query = select(func.count(DroppedDomain.id)).where(DroppedDomain.dropped_date >= cutoff) if tld: query = query.where(DroppedDomain.tld == tld) count_query = count_query.where(DroppedDomain.tld == tld) if min_length: query = query.where(DroppedDomain.length >= min_length) count_query = count_query.where(DroppedDomain.length >= min_length) if max_length: query = query.where(DroppedDomain.length <= max_length) count_query = count_query.where(DroppedDomain.length <= max_length) if exclude_numeric: query = query.where(DroppedDomain.is_numeric == False) count_query = count_query.where(DroppedDomain.is_numeric == False) if exclude_hyphen: query = query.where(DroppedDomain.has_hyphen == False) count_query = count_query.where(DroppedDomain.has_hyphen == False) if keyword: query = query.where(DroppedDomain.domain.ilike(f"%{keyword}%")) count_query = count_query.where(DroppedDomain.domain.ilike(f"%{keyword}%")) # Get total count total_result = await db.execute(count_query) total = total_result.scalar() or 0 # Get items with pagination query = query.order_by(DroppedDomain.dropped_date.desc(), DroppedDomain.length) query = query.offset(offset).limit(limit) result = await db.execute(query) items = result.scalars().all() return { "total": total, "items": [ { "id": item.id, "domain": item.domain, "tld": item.tld, "dropped_date": item.dropped_date.isoformat(), "length": item.length, "is_numeric": item.is_numeric, "has_hyphen": item.has_hyphen, "availability_status": getattr(item, 'availability_status', 'unknown') or 'unknown', "last_status_check": item.last_status_check.isoformat() if getattr(item, 'last_status_check', None) else None, "deletion_date": item.deletion_date.isoformat() if getattr(item, 'deletion_date', None) else None, } for item in items ] } async def get_zone_stats(db: AsyncSession) -> dict: """Get zone file statistics""" # Get latest snapshots ch_query = select(ZoneSnapshot).where(ZoneSnapshot.tld == "ch").order_by(ZoneSnapshot.snapshot_date.desc()).limit(1) li_query = select(ZoneSnapshot).where(ZoneSnapshot.tld == "li").order_by(ZoneSnapshot.snapshot_date.desc()).limit(1) ch_result = await db.execute(ch_query) li_result = await db.execute(li_query) ch_snapshot = ch_result.scalar_one_or_none() li_snapshot = li_result.scalar_one_or_none() # Count drops from last 24h only cutoff_24h = datetime.utcnow() - timedelta(hours=24) drops_query = select(func.count(DroppedDomain.id)).where(DroppedDomain.dropped_date >= cutoff_24h) drops_result = await db.execute(drops_query) daily_drops = drops_result.scalar() or 0 return { "ch": { "domain_count": ch_snapshot.domain_count if ch_snapshot else 0, "last_sync": ch_snapshot.snapshot_date.isoformat() if ch_snapshot else None }, "li": { "domain_count": li_snapshot.domain_count if li_snapshot else 0, "last_sync": li_snapshot.snapshot_date.isoformat() if li_snapshot else None }, "daily_drops": daily_drops } async def cleanup_old_drops(db: AsyncSession, hours: int = 48) -> int: """ Delete dropped domains older than specified hours. Default: Keep only last 48h for safety margin (24h display + 24h buffer). Returns number of deleted records. """ from sqlalchemy import delete cutoff = datetime.utcnow() - timedelta(hours=hours) # Delete old drops stmt = delete(DroppedDomain).where(DroppedDomain.dropped_date < cutoff) result = await db.execute(stmt) await db.commit() deleted = result.rowcount if deleted > 0: logger.info(f"Cleaned up {deleted} old dropped domains (older than {hours}h)") return deleted async def cleanup_old_snapshots(db: AsyncSession, keep_days: int = 7) -> int: """ Delete zone snapshots older than specified days. Keep at least 7 days of metadata for debugging. Returns number of deleted records. """ from sqlalchemy import delete cutoff = datetime.utcnow() - timedelta(days=keep_days) stmt = delete(ZoneSnapshot).where(ZoneSnapshot.snapshot_date < cutoff) result = await db.execute(stmt) await db.commit() deleted = result.rowcount if deleted > 0: logger.info(f"Cleaned up {deleted} old zone snapshots (older than {keep_days}d)") return deleted async def verify_drops_availability( db: AsyncSession, batch_size: int = 50, max_checks: int = 200 ) -> dict: """ Verify availability of dropped domains and update their status. This runs periodically to check the real RDAP status of drops. Updates availability_status and deletion_date fields. Rate limited: ~200ms between requests = ~5 req/sec Args: db: Database session batch_size: Number of domains to check per batch max_checks: Maximum domains to check per run (to avoid overload) Returns: dict with stats: checked, available, dropping_soon, taken, errors """ from sqlalchemy import update, bindparam, case from app.services.drop_status_checker import check_drops_batch from app.config import get_settings logger.info(f"Starting drops status update (max {max_checks} checks)...") # Get drops that haven't been checked recently (prioritize unchecked and short domains) cutoff = datetime.utcnow() - timedelta(hours=24) check_cutoff = datetime.utcnow() - timedelta(hours=2) # Re-check every 2 hours # Prioritization (fast + predictable): # 1) never checked first # 2) then oldest check first # 3) then unknown status # 4) then shortest domains first unknown_first = case((DroppedDomain.availability_status == "unknown", 0), else_=1) never_checked_first = case((DroppedDomain.last_status_check.is_(None), 0), else_=1) query = ( select(DroppedDomain) .where(DroppedDomain.dropped_date >= cutoff) .where( (DroppedDomain.last_status_check.is_(None)) # Never checked | (DroppedDomain.last_status_check < check_cutoff) # Not checked recently ) .order_by( never_checked_first.asc(), DroppedDomain.last_status_check.asc().nullsfirst(), unknown_first.asc(), DroppedDomain.length.asc(), ) .limit(max_checks) ) result = await db.execute(query) drops = result.scalars().all() if not drops: logger.info("No drops need status update") return {"checked": 0, "available": 0, "dropping_soon": 0, "taken": 0, "errors": 0} checked = 0 stats = {"available": 0, "dropping_soon": 0, "taken": 0, "unknown": 0} errors = 0 logger.info(f"Checking {len(drops)} dropped domains (batch mode)...") settings = get_settings() delay = float(getattr(settings, "domain_check_delay_seconds", 0.3) or 0.3) max_concurrent = int(getattr(settings, "domain_check_max_concurrent", 3) or 3) # Build (drop_id, domain) tuples for batch checker domain_tuples: list[tuple[int, str]] = [(d.id, f"{d.domain}.{d.tld}") for d in drops] # Process in batches to bound memory + keep DB commits reasonable now = datetime.utcnow() for start in range(0, len(domain_tuples), batch_size): batch = domain_tuples[start : start + batch_size] results = await check_drops_batch( batch, delay_between_requests=delay, max_concurrent=max_concurrent, ) # Prepare bulk updates updates: list[dict] = [] for drop_id, status_result in results: checked += 1 stats[status_result.status] = stats.get(status_result.status, 0) + 1 updates.append( { "id": drop_id, "availability_status": status_result.status, "rdap_status": str(status_result.rdap_status)[:255] if status_result.rdap_status else None, "last_status_check": now, "deletion_date": status_result.deletion_date, } ) # Bulk update using executemany stmt = ( update(DroppedDomain) .where(DroppedDomain.id == bindparam("id")) .values( availability_status=bindparam("availability_status"), rdap_status=bindparam("rdap_status"), last_status_check=bindparam("last_status_check"), deletion_date=bindparam("deletion_date"), ) ) await db.execute(stmt, updates) await db.commit() logger.info(f"Checked {min(start + batch_size, len(domain_tuples))}/{len(domain_tuples)}: {stats}") # Final commit # (already committed per batch) logger.info( f"Drops status update complete: " f"{checked} checked, {stats['available']} available, " f"{stats['dropping_soon']} dropping_soon, {stats['taken']} taken, {errors} errors" ) return { "checked": checked, "available": stats['available'], "dropping_soon": stats['dropping_soon'], "taken": stats['taken'], "errors": errors }