""" Drop Status Checker ==================== Dedicated RDAP checker for dropped domains. Correctly identifies pending_delete, redemption, and available status. Also extracts deletion date for countdown display. """ import httpx import logging from dataclasses import dataclass from datetime import datetime from typing import Optional logger = logging.getLogger(__name__) # RDAP endpoints for different TLDs RDAP_ENDPOINTS = { # ccTLDs 'ch': 'https://rdap.nic.ch/domain/', 'li': 'https://rdap.nic.ch/domain/', 'de': 'https://rdap.denic.de/domain/', # gTLDs via CentralNic 'online': 'https://rdap.centralnic.com/online/domain/', 'xyz': 'https://rdap.centralnic.com/xyz/domain/', 'club': 'https://rdap.nic.club/domain/', # gTLDs via Afilias/Donuts 'info': 'https://rdap.afilias.net/rdap/info/domain/', 'biz': 'https://rdap.afilias.net/rdap/biz/domain/', 'org': 'https://rdap.publicinterestregistry.org/rdap/org/domain/', # Google TLDs 'dev': 'https://rdap.nic.google/domain/', 'app': 'https://rdap.nic.google/domain/', } # Typical deletion periods after "pending delete" status (in days) TLD_DELETE_PERIODS = { 'ch': 40, # .ch has ~40 day redemption 'li': 40, 'com': 35, # 30 redemption + 5 pending delete 'net': 35, 'org': 35, 'info': 35, 'biz': 35, 'online': 35, 'xyz': 35, 'club': 35, 'dev': 35, 'app': 35, } @dataclass class DropStatus: """Status of a dropped domain.""" domain: str status: str # 'available', 'dropping_soon', 'taken', 'unknown' rdap_status: list[str] can_register_now: bool should_monitor: bool message: str deletion_date: Optional[str] = None # ISO format date when domain will be deleted estimated_available: Optional[str] = None # Estimated date when registrable def _parse_date(date_str: str) -> Optional[datetime]: """Parse ISO date string to datetime.""" if not date_str: return None try: # Handle various formats for fmt in ['%Y-%m-%dT%H:%M:%SZ', '%Y-%m-%dT%H:%M:%S', '%Y-%m-%d']: try: return datetime.strptime(date_str[:19].replace('Z', ''), fmt.replace('Z', '')) except ValueError: continue return None except Exception: return None async def check_drop_status(domain: str) -> DropStatus: """ Check the real status of a dropped domain via RDAP. Returns: DropStatus with one of: - 'available': Domain can be registered NOW - 'dropping_soon': Domain is in pending delete/redemption (monitor it!) - 'taken': Domain was re-registered - 'unknown': Could not determine status """ tld = domain.split('.')[-1].lower() endpoint = RDAP_ENDPOINTS.get(tld) if not endpoint: logger.warning(f"No RDAP endpoint for .{tld}, returning unknown") return DropStatus( domain=domain, status='unknown', rdap_status=[], can_register_now=False, should_monitor=False, message=f"No RDAP endpoint for .{tld}" ) url = f"{endpoint}{domain}" try: async with httpx.AsyncClient(timeout=10) as client: resp = await client.get(url) # 404 = Domain not found = AVAILABLE! if resp.status_code == 404: return DropStatus( domain=domain, status='available', rdap_status=[], can_register_now=True, should_monitor=False, message="Available now!" ) # 200 = Domain exists in registry if resp.status_code == 200: data = resp.json() rdap_status = data.get('status', []) status_lower = ' '.join(str(s).lower() for s in rdap_status) # Extract deletion date from events deletion_date = None expiration_date = None events = data.get('events', []) for event in events: action = event.get('eventAction', '').lower() date_str = event.get('eventDate', '') if 'deletion' in action: deletion_date = date_str[:10] if date_str else None elif 'expiration' in action: expiration_date = date_str[:10] if date_str else None # Check for pending delete / redemption status is_pending = any(x in status_lower for x in [ 'pending delete', 'pendingdelete', 'pending purge', 'pendingpurge', 'redemption period', 'redemptionperiod', 'pending restore', 'pendingrestore', ]) if is_pending: # Calculate estimated availability estimated = None if deletion_date: try: del_dt = datetime.strptime(deletion_date, '%Y-%m-%d') # For most TLDs, domain is available shortly after deletion date estimated = del_dt.strftime('%Y-%m-%d') except Exception: pass return DropStatus( domain=domain, status='dropping_soon', rdap_status=rdap_status, can_register_now=False, should_monitor=True, message="Dropping soon - track to get notified!", deletion_date=deletion_date, estimated_available=estimated, ) # Domain is actively registered return DropStatus( domain=domain, status='taken', rdap_status=rdap_status, can_register_now=False, should_monitor=False, message="Re-registered" ) # Other status code logger.warning(f"RDAP returned {resp.status_code} for {domain}") return DropStatus( domain=domain, status='unknown', rdap_status=[], can_register_now=False, should_monitor=False, message=f"RDAP {resp.status_code}" ) except httpx.TimeoutException: return DropStatus( domain=domain, status='unknown', rdap_status=[], can_register_now=False, should_monitor=False, message="Timeout" ) except Exception as e: logger.warning(f"RDAP error for {domain}: {e}") return DropStatus( domain=domain, status='unknown', rdap_status=[], can_register_now=False, should_monitor=False, message=str(e) ) async def batch_check_drops(domains: list[tuple[int, str]]) -> list[tuple[int, DropStatus]]: """ Check status for multiple domains in parallel. Args: domains: List of (id, domain_name) tuples Returns: List of (id, DropStatus) tuples """ import asyncio async def check_one(item: tuple[int, str]) -> tuple[int, DropStatus]: drop_id, domain = item status = await check_drop_status(domain) return (drop_id, status) # Limit concurrency to avoid overwhelming RDAP servers semaphore = asyncio.Semaphore(10) async def limited_check(item): async with semaphore: return await check_one(item) results = await asyncio.gather(*[limited_check(d) for d in domains]) return results