""" Drop Status Checker ==================== Dedicated RDAP checker for dropped domains. Correctly identifies pending_delete, redemption, and available status. Extracts deletion date for countdown display. """ import asyncio import httpx import logging from dataclasses import dataclass from datetime import datetime from typing import Optional logger = logging.getLogger(__name__) # RDAP endpoints for different TLDs RDAP_ENDPOINTS = { # ccTLDs 'ch': 'https://rdap.nic.ch/domain/', 'li': 'https://rdap.nic.ch/domain/', 'de': 'https://rdap.denic.de/domain/', # gTLDs via CentralNic 'online': 'https://rdap.centralnic.com/online/domain/', 'xyz': 'https://rdap.centralnic.com/xyz/domain/', 'club': 'https://rdap.nic.club/domain/', # gTLDs via Afilias/Donuts 'info': 'https://rdap.afilias.net/rdap/info/domain/', 'biz': 'https://rdap.afilias.net/rdap/biz/domain/', 'org': 'https://rdap.publicinterestregistry.org/rdap/org/domain/', # Google TLDs 'dev': 'https://rdap.nic.google/domain/', 'app': 'https://rdap.nic.google/domain/', } @dataclass class DropStatus: """Status of a dropped domain.""" domain: str status: str # 'available', 'dropping_soon', 'taken', 'unknown' rdap_status: list[str] can_register_now: bool should_monitor: bool message: str deletion_date: Optional[datetime] = None # When domain will be fully deleted async def check_drop_status(domain: str) -> DropStatus: """ Check the real status of a dropped domain via RDAP. Returns: DropStatus with one of: - 'available': Domain can be registered NOW - 'dropping_soon': Domain is in pending delete/redemption (monitor it!) - 'taken': Domain was re-registered - 'unknown': Could not determine status """ tld = domain.split('.')[-1].lower() endpoint = RDAP_ENDPOINTS.get(tld) if not endpoint: # Try generic lookup logger.warning(f"No RDAP endpoint for .{tld}, returning unknown") return DropStatus( domain=domain, status='unknown', rdap_status=[], can_register_now=False, should_monitor=False, message=f"No RDAP endpoint for .{tld}" ) url = f"{endpoint}{domain}" try: async with httpx.AsyncClient(timeout=10) as client: resp = await client.get(url) # 404 = Domain not found = AVAILABLE! if resp.status_code == 404: return DropStatus( domain=domain, status='available', rdap_status=[], can_register_now=True, should_monitor=False, message="Domain is available for registration!" ) # 200 = Domain exists in registry if resp.status_code == 200: data = resp.json() rdap_status = data.get('status', []) status_lower = ' '.join(str(s).lower() for s in rdap_status) # Extract deletion date from events deletion_date = None events = data.get('events', []) for event in events: action = event.get('eventAction', '').lower() date_str = event.get('eventDate', '') if action in ('deletion', 'expiration') and date_str: try: # Parse ISO date deletion_date = datetime.fromisoformat(date_str.replace('Z', '+00:00')) except (ValueError, TypeError): pass # Check for pending delete / redemption status is_pending = any(x in status_lower for x in [ 'pending delete', 'pendingdelete', 'pending purge', 'pendingpurge', 'redemption period', 'redemptionperiod', 'pending restore', 'pendingrestore', ]) if is_pending: return DropStatus( domain=domain, status='dropping_soon', rdap_status=rdap_status, can_register_now=False, should_monitor=True, message="Domain is being deleted. Track it to get notified when available!", deletion_date=deletion_date, ) # Domain is actively registered return DropStatus( domain=domain, status='taken', rdap_status=rdap_status, can_register_now=False, should_monitor=False, message="Domain was re-registered", deletion_date=None, ) # Other status code logger.warning(f"RDAP returned {resp.status_code} for {domain}") return DropStatus( domain=domain, status='unknown', rdap_status=[], can_register_now=False, should_monitor=False, message=f"RDAP returned HTTP {resp.status_code}" ) except httpx.TimeoutException: logger.warning(f"RDAP timeout for {domain}") return DropStatus( domain=domain, status='unknown', rdap_status=[], can_register_now=False, should_monitor=False, message="RDAP timeout" ) except Exception as e: logger.warning(f"RDAP error for {domain}: {e}") return DropStatus( domain=domain, status='unknown', rdap_status=[], can_register_now=False, should_monitor=False, message=str(e) ) # Rate limiting: max requests per second per TLD RATE_LIMITS = { 'default': 5, # 5 requests per second 'ch': 10, # Swiss registry is faster 'li': 10, } async def check_drops_batch( domains: list[tuple[int, str]], # List of (id, full_domain) delay_between_requests: float = 0.2, # 200ms = 5 req/s ) -> list[tuple[int, DropStatus]]: """ Check multiple drops with rate limiting. Args: domains: List of (drop_id, full_domain) tuples delay_between_requests: Seconds to wait between requests (default 200ms) Returns: List of (drop_id, DropStatus) tuples """ results = [] for drop_id, domain in domains: try: status = await check_drop_status(domain) results.append((drop_id, status)) except Exception as e: logger.error(f"Batch check failed for {domain}: {e}") results.append((drop_id, DropStatus( domain=domain, status='unknown', rdap_status=[], can_register_now=False, should_monitor=False, message=str(e), ))) # Rate limit await asyncio.sleep(delay_between_requests) return results