""" Drop Status Checker ==================== Dedicated RDAP checker for dropped domains. Correctly identifies pending_delete, redemption, and available status. Extracts deletion date for countdown display. Uses IANA Bootstrap (rdap.org) as universal fallback for all TLDs. """ import asyncio import httpx import logging from dataclasses import dataclass from datetime import datetime from typing import Optional from app.services.http_client_pool import get_rdap_http_client logger = logging.getLogger(__name__) # ============================================================================ # RDAP CONFIGURATION # ============================================================================ # Preferred direct endpoints (faster, more reliable) PREFERRED_ENDPOINTS = { 'ch': 'https://rdap.nic.ch/domain/', 'li': 'https://rdap.nic.ch/domain/', 'de': 'https://rdap.denic.de/domain/', } # IANA Bootstrap - works for ALL TLDs (redirects to correct registry) IANA_BOOTSTRAP = 'https://rdap.org/domain/' # Rate limiting settings RDAP_TIMEOUT = 15 # seconds RATE_LIMIT_DELAY = 0.3 # 300ms between requests = ~3 req/s @dataclass class DropStatus: """Status of a dropped domain.""" domain: str status: str # 'available', 'dropping_soon', 'taken', 'unknown' rdap_status: list[str] can_register_now: bool should_monitor: bool message: str deletion_date: Optional[datetime] = None check_method: str = "rdap" async def _make_rdap_request(client: httpx.AsyncClient, url: str, domain: str) -> Optional[dict]: """Make a single RDAP request with proper error handling.""" try: resp = await client.get(url, timeout=RDAP_TIMEOUT) if resp.status_code == 404: # Domain not found = available return {"_available": True, "_status_code": 404} if resp.status_code == 200: data = resp.json() data["_status_code"] = 200 return data if resp.status_code == 429: logger.warning(f"RDAP rate limited for {domain}") return {"_rate_limited": True, "_status_code": 429} logger.warning(f"RDAP returned {resp.status_code} for {domain}") return None except httpx.TimeoutException: logger.debug(f"RDAP timeout for {domain} at {url}") return None except Exception as e: logger.debug(f"RDAP error for {domain}: {e}") return None async def check_drop_status(domain: str) -> DropStatus: """ Check the real status of a dropped domain via RDAP. Strategy: 1. Try preferred direct endpoint (if available for TLD) 2. Fall back to IANA Bootstrap (works for all TLDs) Returns: DropStatus with one of: - 'available': Domain can be registered NOW - 'dropping_soon': Domain is in pending delete/redemption - 'taken': Domain was re-registered - 'unknown': Could not determine status """ tld = domain.split('.')[-1].lower() # Try preferred endpoint first data = None check_method = "rdap" client = await get_rdap_http_client() if tld in PREFERRED_ENDPOINTS: url = f"{PREFERRED_ENDPOINTS[tld]}{domain}" data = await _make_rdap_request(client, url, domain) check_method = f"rdap_{tld}" # Fall back to IANA Bootstrap if no data yet if data is None: url = f"{IANA_BOOTSTRAP}{domain}" data = await _make_rdap_request(client, url, domain) check_method = "rdap_iana" # Still no data? Return unknown if data is None: return DropStatus( domain=domain, status='unknown', rdap_status=[], can_register_now=False, should_monitor=True, message="RDAP check failed - will retry later", check_method="failed", ) # Rate limited if data.get("_rate_limited"): return DropStatus( domain=domain, status='unknown', rdap_status=[], can_register_now=False, should_monitor=True, message="Rate limited - will retry later", check_method="rate_limited", ) # Domain available (404) if data.get("_available"): return DropStatus( domain=domain, status='available', rdap_status=[], can_register_now=True, should_monitor=False, message="Domain is available for registration!", check_method=check_method, ) # Domain exists - parse status rdap_status = data.get('status', []) status_lower = ' '.join(str(s).lower() for s in rdap_status) # Extract deletion date from events deletion_date = None events = data.get('events', []) for event in events: action = event.get('eventAction', '').lower() date_str = event.get('eventDate', '') if action in ('deletion', 'expiration') and date_str: try: deletion_date = datetime.fromisoformat(date_str.replace('Z', '+00:00')) except (ValueError, TypeError): pass # Check for pending delete / redemption status is_pending = any(x in status_lower for x in [ 'pending delete', 'pendingdelete', 'pending purge', 'pendingpurge', 'redemption period', 'redemptionperiod', 'pending restore', 'pendingrestore', 'pending renewal', 'pendingrenewal', ]) if is_pending: return DropStatus( domain=domain, status='dropping_soon', rdap_status=rdap_status, can_register_now=False, should_monitor=True, message="Domain is being deleted. Track it to get notified!", deletion_date=deletion_date, check_method=check_method, ) # Domain is actively registered return DropStatus( domain=domain, status='taken', rdap_status=rdap_status, can_register_now=False, should_monitor=False, message="Domain was re-registered", deletion_date=None, check_method=check_method, ) async def check_drops_batch( domains: list[tuple[int, str]], delay_between_requests: float = RATE_LIMIT_DELAY, max_concurrent: int = 3, ) -> list[tuple[int, DropStatus]]: """ Check multiple drops with rate limiting and concurrency control. Args: domains: List of (drop_id, full_domain) tuples delay_between_requests: Seconds to wait between requests max_concurrent: Maximum concurrent requests Returns: List of (drop_id, DropStatus) tuples """ semaphore = asyncio.Semaphore(max_concurrent) results = [] async def check_with_semaphore(drop_id: int, domain: str) -> tuple[int, DropStatus]: async with semaphore: try: status = await check_drop_status(domain) await asyncio.sleep(delay_between_requests) return (drop_id, status) except Exception as e: logger.error(f"Batch check failed for {domain}: {e}") return (drop_id, DropStatus( domain=domain, status='unknown', rdap_status=[], can_register_now=False, should_monitor=False, message=str(e), check_method="error", )) # Run with limited concurrency tasks = [check_with_semaphore(drop_id, domain) for drop_id, domain in domains] results = await asyncio.gather(*tasks) return list(results)