"""
Advanced Domain Availability Checker

Uses multiple methods for maximum accuracy:
1. RDAP (Registration Data Access Protocol) - Modern, accurate, JSON format
2. Custom RDAP endpoints (for TLDs like .ch, .li with own RDAP servers)
3. DNS lookup - Fast availability check
4. WHOIS - Fallback for TLDs without RDAP

Blocking lookups (whodap, python-whois, dnspython) are pushed to the event
loop's default executor so that several checks can be awaited concurrently.
"""
import asyncio
import logging
from dataclasses import dataclass, field
from datetime import datetime, timezone
from functools import lru_cache
from typing import Optional

import dns.resolver
import httpx
import whodap
import whois

from app.models.domain import DomainStatus

logger = logging.getLogger(__name__)


@dataclass
class DomainCheckResult:
    """Result of a domain availability check."""

    domain: str
    status: DomainStatus
    is_available: bool
    registrar: Optional[str] = None
    expiration_date: Optional[datetime] = None
    creation_date: Optional[datetime] = None
    updated_date: Optional[datetime] = None
    name_servers: Optional[list[str]] = None
    error_message: Optional[str] = None
    check_method: str = "unknown"  # rdap, rdap_custom, whois, dns
    raw_data: Optional[dict] = None

    def to_dict(self) -> dict:
        """Convert to a plain dictionary (dates as ISO strings, no raw_data)."""

        def iso(value: Optional[datetime]) -> Optional[str]:
            return value.isoformat() if value else None

        return {
            "domain": self.domain,
            "status": self.status.value,
            "is_available": self.is_available,
            "registrar": self.registrar,
            "expiration_date": iso(self.expiration_date),
            "creation_date": iso(self.creation_date),
            "updated_date": iso(self.updated_date),
            "name_servers": self.name_servers,
            "error_message": self.error_message,
            "check_method": self.check_method,
        }


class DomainChecker:
    """
    Advanced domain availability checker.

    Priority: custom RDAP > RDAP (whodap) > WHOIS > DNS.
    An "available" answer from RDAP/WHOIS is always cross-checked against DNS.
    """

    # TLDs known to support RDAP via whodap library
    RDAP_SUPPORTED_TLDS = {
        'com', 'net', 'org', 'info', 'biz', 'mobi', 'name', 'pro',
        'app', 'dev', 'page', 'new', 'day', 'eat', 'fly', 'how',
        'io', 'co', 'ai', 'me', 'tv', 'cc', 'ws',
        'xyz', 'top', 'site', 'online', 'tech', 'store', 'club',
        'de', 'uk', 'fr', 'nl', 'eu', 'be', 'at', 'us',
    }

    # TLDs with custom RDAP endpoints (not in whodap but have their own RDAP servers)
    # These registries have their own RDAP APIs that we query directly
    CUSTOM_RDAP_ENDPOINTS = {
        'ch': 'https://rdap.nic.ch/domain/',  # Swiss .ch domains
        'li': 'https://rdap.nic.ch/domain/',  # Liechtenstein .li (same registry)
    }

    # TLDs that only support WHOIS (no RDAP at all)
    # Note: .ch and .li removed - they have custom RDAP!
    WHOIS_ONLY_TLDS = {
        'ru', 'su', 'ua', 'by', 'kz',
    }

    # Lower-case phrases that mean "no such registration" when they show up
    # in a WHOIS response body or in the text of a WHOIS exception.
    _WHOIS_NOT_FOUND_MARKERS = (
        'no match',
        'not found',
        'no entries',
        'no data',
        'status: free',
        'no entry',
        'we do not have an entry',
        'domain not found',
        'is available',
        'available for registration',
        'no object found',
        'object does not exist',
    )

    # strptime patterns tried in order by _parse_datetime.
    _DATETIME_FORMATS = (
        "%Y-%m-%dT%H:%M:%SZ",
        "%Y-%m-%dT%H:%M:%S.%fZ",
        "%Y-%m-%dT%H:%M:%S%z",
        "%Y-%m-%dT%H:%M:%S.%f%z",
        "%Y-%m-%d",
    )

    def __init__(self):
        """Initialize the domain checker."""
        self._dns_resolver = dns.resolver.Resolver()
        self._dns_resolver.timeout = 3
        self._dns_resolver.lifetime = 5
        # NOTE(review): _cache / _cache_ttl are initialised here but nothing
        # in this module reads them - confirm whether caching is still planned.
        self._cache = {}
        self._cache_ttl = 300  # 5 minutes

    def _normalize_domain(self, domain: str) -> str:
        """
        Normalize user input to a bare host name.

        Lower-cases and trims the value, then removes an http(s) scheme, a
        leading "www.", any path / query string / fragment, a port and a
        trailing root dot.
        """
        domain = domain.lower().strip()
        for scheme in ('http://', 'https://'):
            if domain.startswith(scheme):
                domain = domain[len(scheme):]
                break
        if domain.startswith('www.'):
            domain = domain[4:]
        # None of these characters can be part of a host name, so everything
        # from the first occurrence onwards is URL residue.
        for separator in ('/', '?', '#', ':'):
            domain = domain.split(separator, 1)[0]
        # "example.com." is the same name as "example.com"; without this the
        # TLD would be extracted as an empty string.
        return domain.rstrip('.')

    def _get_tld(self, domain: str) -> str:
        """Extract the TLD (the last dot-separated label) from domain."""
        return domain.rsplit('.', 1)[-1].lower()

    def _get_sld(self, domain: str) -> str:
        """
        Return the domain without its TLD.

        Everything left of the last dot is kept, so that
        ``f"{sld}.{tld}"`` always reproduces the name that was asked for
        ("example.co.uk" -> "example.co", not "example").
        """
        return domain.rsplit('.', 1)[0]

    def _parse_datetime(self, date_str: str) -> Optional[datetime]:
        """
        Parse the timestamp formats seen in RDAP / WHOIS data.

        Returns None for empty or unparseable input. A value that already is
        a datetime is returned unchanged.

        NOTE: timestamps written with a literal "Z" (or "+00:00", which is
        rewritten to "Z") come back as naive datetimes, while timestamps with
        any other explicit offset come back timezone-aware.
        """
        if not date_str:
            return None
        if isinstance(date_str, datetime):
            return date_str

        normalized = date_str.replace('+00:00', 'Z').replace('.000Z', 'Z')
        for fmt in self._DATETIME_FORMATS:
            try:
                return datetime.strptime(normalized, fmt)
            except ValueError:
                continue

        # Last resort: ISO 8601 variants not covered above (space separator,
        # missing seconds, ...). The offset is parsed as written.
        try:
            return datetime.fromisoformat(date_str.replace('Z', '+00:00'))
        except ValueError:
            return None

    @staticmethod
    def _as_dict(obj) -> dict:
        """Return obj as a dict: dicts as-is, objects via __dict__, else {}."""
        if isinstance(obj, dict):
            return obj
        return obj.__dict__ if hasattr(obj, '__dict__') else {}

    def _extract_event_dates(
        self, events
    ) -> tuple[Optional[datetime], Optional[datetime], Optional[datetime]]:
        """
        Pull (expiration, creation, updated) dates out of RDAP event dicts.

        The first expiration and first registration event win; for
        changed/update events the last one wins. Events that refer to the
        RDAP database itself are ignored because they do not describe the
        domain.
        """
        expiration_date = None
        creation_date = None
        updated_date = None

        for event in events:
            if not isinstance(event, dict):
                continue
            action = (event.get('eventAction') or '').lower()
            date_str = event.get('eventDate') or ''
            if not action or not date_str:
                continue

            if 'expiration' in action and not expiration_date:
                expiration_date = self._parse_datetime(date_str)
            elif 'registration' in action and not creation_date:
                creation_date = self._parse_datetime(date_str)
            elif ('changed' in action or 'update' in action) and 'database' not in action:
                updated_date = self._parse_datetime(date_str)

        return expiration_date, creation_date, updated_date

    @staticmethod
    def _registrar_from_vcard(vcard) -> Optional[str]:
        """Return the 'fn' value of a jCard array, else its first 'org' value."""
        if not isinstance(vcard, list) or len(vcard) <= 1:
            return None
        properties = vcard[1]
        if not isinstance(properties, (list, tuple)):
            return None

        org_name = None
        for item in properties:
            if not isinstance(item, list) or len(item) <= 3 or not item[3]:
                continue
            if item[0] == 'fn':
                return str(item[3])
            if item[0] == 'org' and org_name is None:
                org_name = str(item[3])
        return org_name

    def _extract_registrar(self, entities) -> Optional[str]:
        """Return the name of the entity with the 'registrar' role, if any."""
        registrar = None
        for entity in entities:
            if not isinstance(entity, dict):
                continue
            if 'registrar' not in (entity.get('roles') or []):
                continue
            name = self._registrar_from_vcard(entity.get('vcardArray'))
            if name:
                registrar = name
        return registrar

    async def _check_custom_rdap(self, domain: str) -> Optional[DomainCheckResult]:
        """
        Check domain using custom RDAP endpoints (e.g., nic.ch for .ch/.li domains).

        These are registries that have their own RDAP servers not covered by whodap.

        Returns:
            AVAILABLE on HTTP 404, TAKEN (with parsed details) on HTTP 200,
            and None when the TLD has no custom endpoint, the server answers
            with any other status, or the request fails - the caller then
            falls back to another method.
        """
        tld = self._get_tld(domain)
        endpoint = self.CUSTOM_RDAP_ENDPOINTS.get(tld)
        if endpoint is None:
            return None

        url = f"{endpoint}{domain}"

        try:
            async with httpx.AsyncClient(timeout=10.0) as client:
                response = await client.get(url, follow_redirects=True)

                if response.status_code == 404:
                    # Domain not found = available
                    return DomainCheckResult(
                        domain=domain,
                        status=DomainStatus.AVAILABLE,
                        is_available=True,
                        check_method="rdap_custom",
                    )

                if response.status_code == 200:
                    # Domain exists = taken
                    data = response.json()

                    expiration_date, creation_date, updated_date = (
                        self._extract_event_dates(data.get('events') or [])
                    )
                    name_servers = [
                        ns['ldhName'].lower()
                        for ns in (data.get('nameservers') or [])
                        if isinstance(ns, dict) and ns.get('ldhName')
                    ]
                    registrar = self._extract_registrar(data.get('entities') or [])

                    return DomainCheckResult(
                        domain=domain,
                        status=DomainStatus.TAKEN,
                        is_available=False,
                        registrar=registrar,
                        expiration_date=expiration_date,
                        creation_date=creation_date,
                        updated_date=updated_date,
                        name_servers=name_servers if name_servers else None,
                        check_method="rdap_custom",
                    )

                # Other status codes - try fallback
                logger.warning(
                    "Custom RDAP returned %s for %s", response.status_code, domain
                )
                return None

        except httpx.TimeoutException:
            logger.warning("Custom RDAP timeout for %s", domain)
            return None
        except Exception as e:
            # Best effort: any failure here just means "use the next method".
            logger.warning("Custom RDAP error for %s: %s", domain, e)
            return None

    async def _check_rdap(self, domain: str) -> Optional[DomainCheckResult]:
        """
        Check domain using RDAP (Registration Data Access Protocol) via whodap.

        Returns:
            TAKEN when whodap returns a record, AVAILABLE when the lookup
            fails with a "not found" / "404" error, and None when whodap has
            no RDAP server for the TLD or the lookup fails for another reason.
        """
        tld = self._get_tld(domain)
        sld = self._get_sld(domain)

        try:
            # whodap.lookup_domain is blocking - run it in the thread pool
            loop = asyncio.get_running_loop()
            response = await loop.run_in_executor(
                None, whodap.lookup_domain, sld, tld
            )

            expiration_date = None
            creation_date = None
            updated_date = None
            registrar = None

            if response.events:
                # whodap wraps the JSON in objects; the RDAP keys live in __dict__
                expiration_date, creation_date, updated_date = (
                    self._extract_event_dates(
                        [self._as_dict(event) for event in response.events]
                    )
                )

            if response.entities:
                registrar = self._extract_registrar(
                    [self._as_dict(entity) for entity in response.entities]
                )

            return DomainCheckResult(
                domain=domain,
                status=DomainStatus.TAKEN,
                is_available=False,
                registrar=registrar,
                expiration_date=expiration_date,
                creation_date=creation_date,
                updated_date=updated_date,
                check_method="rdap",
            )

        except NotImplementedError:
            # No RDAP server for this TLD
            logger.debug("No RDAP server for TLD .%s", tld)
            return None
        except Exception as e:
            error_msg = str(e).lower()
            # Check if domain is not found (available)
            if 'not found' in error_msg or '404' in error_msg:
                return DomainCheckResult(
                    domain=domain,
                    status=DomainStatus.AVAILABLE,
                    is_available=True,
                    check_method="rdap",
                )
            logger.warning("RDAP check failed for %s: %s", domain, e)
            return None

    def _coerce_whois_date(self, value):
        """
        Reduce a python-whois date field to a single value.

        Lists are reduced to their first element and strings (which
        python-whois returns when it could not parse the date itself) are run
        through _parse_datetime. Any other value is passed through unchanged;
        empty values become None.
        """
        if isinstance(value, list):
            value = value[0] if value else None
        if isinstance(value, str):
            return self._parse_datetime(value)
        return value or None

    async def _check_whois(self, domain: str) -> DomainCheckResult:
        """
        Check domain using WHOIS (fallback method).

        Never returns None: failures that do not look like a "not found"
        answer are reported as a result with status ERROR and the exception
        text in error_message.
        """

        def available() -> DomainCheckResult:
            return DomainCheckResult(
                domain=domain,
                status=DomainStatus.AVAILABLE,
                is_available=True,
                check_method="whois",
            )

        try:
            loop = asyncio.get_running_loop()
            w = await loop.run_in_executor(None, whois.whois, domain)

            # 1. No domain_name returned
            if w.domain_name is None:
                return available()

            # 2. Check the raw text for "not found" indicators
            raw = getattr(w, 'text', None)
            raw_text = str(raw).lower() if raw else ""
            if any(marker in raw_text for marker in self._WHOIS_NOT_FOUND_MARKERS):
                return available()

            # 3. No registrar, no creation date and no name servers (likely available)
            if not w.registrar and not w.creation_date and not w.name_servers:
                return available()

            # Extract data
            name_servers = None
            if w.name_servers:
                if isinstance(w.name_servers, list):
                    name_servers = [str(ns).lower() for ns in w.name_servers]
                else:
                    name_servers = [str(w.name_servers).lower()]

            return DomainCheckResult(
                domain=domain,
                status=DomainStatus.TAKEN,
                is_available=False,
                registrar=str(w.registrar) if w.registrar else None,
                expiration_date=self._coerce_whois_date(w.expiration_date),
                creation_date=self._coerce_whois_date(w.creation_date),
                name_servers=name_servers,
                check_method="whois",
            )

        except Exception as e:
            # Some registries signal "no such domain" through an error whose
            # text carries one of the not-found phrases.
            error_str = str(e).lower()
            if any(marker in error_str for marker in self._WHOIS_NOT_FOUND_MARKERS):
                return available()

            # Otherwise it's a real error
            return DomainCheckResult(
                domain=domain,
                status=DomainStatus.ERROR,
                is_available=False,
                error_message=str(e),
                check_method="whois",
            )

    async def _check_dns(self, domain: str) -> bool:
        """
        Quick DNS check for domain existence.

        Returns True if domain appears available (no A and no NS records).
        Any unexpected resolver failure (e.g. a timeout) is also reported as
        True, i.e. "might be available".
        """
        no_record_errors = (
            dns.resolver.NXDOMAIN,
            dns.resolver.NoAnswer,
            dns.resolver.NoNameservers,
        )
        try:
            loop = asyncio.get_running_loop()
            for record_type in ('A', 'NS'):
                try:
                    await loop.run_in_executor(
                        None, self._dns_resolver.resolve, domain, record_type
                    )
                except no_record_errors:
                    continue
                return False  # Has a record of this type = taken
            return True  # No DNS records = likely available
        except Exception as e:
            # Deliberate best effort: on error, assume might be available
            logger.debug("DNS check failed for %s: %s", domain, e)
            return True

    async def _validate_with_dns(self, result: DomainCheckResult) -> DomainCheckResult:
        """Downgrade an 'available' result to TAKEN if the name resolves in DNS."""
        if result.is_available and not await self._check_dns(result.domain):
            result.status = DomainStatus.TAKEN
            result.is_available = False
        return result

    async def check_domain(self, domain: str, quick: bool = False) -> DomainCheckResult:
        """
        Check domain availability using the best available method.

        Priority:
        1. Custom RDAP (for TLDs like .ch, .li with own RDAP servers)
        2. RDAP via whodap (most accurate, modern protocol)
        3. WHOIS (fallback for TLDs without RDAP)
        4. DNS (quick check only, or final validation)

        Args:
            domain: Domain name to check (URLs are normalized to a host name)
            quick: If True, only use DNS (faster but less accurate)

        Returns:
            DomainCheckResult with detailed availability info. Input without
            a dot yields a result with status ERROR.
        """
        domain = self._normalize_domain(domain)

        if not domain or '.' not in domain:
            return DomainCheckResult(
                domain=domain,
                status=DomainStatus.ERROR,
                is_available=False,
                error_message="Invalid domain format",
            )

        tld = self._get_tld(domain)
        has_custom_rdap = tld in self.CUSTOM_RDAP_ENDPOINTS

        # Quick DNS check
        if quick:
            return await self._dns_only_result(domain)

        # Priority 1: Try custom RDAP endpoints (for .ch, .li, etc.)
        if has_custom_rdap:
            custom_result = await self._check_custom_rdap(domain)
            if custom_result:
                return await self._validate_with_dns(custom_result)
            # If custom RDAP fails, fall through to DNS check
            logger.info("Custom RDAP failed for %s, using DNS fallback", domain)

        # Priority 2: Try standard RDAP via whodap
        if tld not in self.WHOIS_ONLY_TLDS and not has_custom_rdap:
            rdap_result = await self._check_rdap(domain)
            if rdap_result:
                return await self._validate_with_dns(rdap_result)

        # Priority 3: Fall back to WHOIS (skip for TLDs that block it like .ch)
        if not has_custom_rdap:
            whois_result = await self._check_whois(domain)
            return await self._validate_with_dns(whois_result)

        # Final fallback: DNS-only check (for TLDs where everything else failed)
        return await self._dns_only_result(domain)

    async def _dns_only_result(self, domain: str) -> DomainCheckResult:
        """Build a result from the DNS check alone (check_method "dns")."""
        dns_available = await self._check_dns(domain)
        return DomainCheckResult(
            domain=domain,
            status=DomainStatus.AVAILABLE if dns_available else DomainStatus.TAKEN,
            is_available=dns_available,
            check_method="dns",
        )

    async def check_multiple(self, domains: list[str], quick: bool = False) -> list[DomainCheckResult]:
        """
        Check multiple domains concurrently.

        Args:
            domains: List of domain names
            quick: Use quick DNS-only check

        Returns:
            List of DomainCheckResult, in the same order as domains
        """
        tasks = [self.check_domain(d, quick=quick) for d in domains]
        return await asyncio.gather(*tasks)

    def validate_domain(self, domain: str) -> tuple[bool, str]:
        """
        Validate domain format (after normalization).

        Returns:
            Tuple of (is_valid, error_message); error_message is "" when valid
        """
        domain = self._normalize_domain(domain)

        if not domain:
            return False, "Domain cannot be empty"

        if '.' not in domain:
            return False, "Domain must include TLD (e.g., .com)"

        for part in domain.split('.'):
            if not part:
                return False, "Invalid domain format"
            if len(part) > 63:
                return False, "Domain label too long (max 63 characters)"
            if not all(c.isalnum() or c == '-' for c in part):
                return False, "Domain contains invalid characters"
            if part.startswith('-') or part.endswith('-'):
                return False, "Domain labels cannot start or end with hyphen"

        if len(domain) > 253:
            return False, "Domain name too long (max 253 characters)"

        return True, ""


# Singleton instance
domain_checker = DomainChecker()


async def check_all_domains(db):
    """
    Check availability of all watched domains.

    This is triggered manually from admin panel or by scheduled job.

    Every Domain row is checked one after another; its status fields are
    updated and a DomainCheck record is added. All changes are committed once
    at the end.

    Args:
        db: database session exposing awaitable execute() / commit() and add()

    Returns:
        Dict with the counters "checked", "available", "taken" and "errors".
        "errors" covers both checks that raised and checks that came back
        with status ERROR.
    """
    # Imported here rather than at module level, as in the original code.
    from app.models.domain import Domain, DomainCheck
    from sqlalchemy import select

    logger.info("Starting check for all watched domains...")

    # Get all domains
    result = await db.execute(select(Domain))
    domains = result.scalars().all()

    if not domains:
        logger.info("No domains to check")
        return {"checked": 0, "available": 0, "taken": 0, "errors": 0}

    checked = 0
    available = 0
    taken = 0
    errors = 0

    for domain_obj in domains:
        try:
            check_result = await domain_checker.check_domain(domain_obj.domain)

            # Update domain status
            domain_obj.status = check_result.status.value
            domain_obj.is_available = check_result.is_available
            # Naive UTC timestamp, same value datetime.utcnow() produced,
            # without relying on the deprecated call.
            domain_obj.last_checked = datetime.now(timezone.utc).replace(tzinfo=None)

            if check_result.expiration_date:
                domain_obj.expiration_date = check_result.expiration_date

            # Create check record
            domain_check = DomainCheck(
                domain_id=domain_obj.id,
                status=check_result.status.value,
                is_available=check_result.is_available,
                check_method=check_result.check_method,
            )
            db.add(domain_check)

            checked += 1
            if check_result.status == DomainStatus.ERROR:
                # A failed lookup says nothing about availability, so it must
                # not inflate the "taken" counter.
                errors += 1
            elif check_result.is_available:
                available += 1
            else:
                taken += 1

            logger.info(
                "Checked %s: %s", domain_obj.domain, check_result.status.value
            )

        except Exception as e:
            # Keep going: one broken domain must not abort the whole run.
            logger.error("Error checking %s: %s", domain_obj.domain, e)
            errors += 1

    await db.commit()

    logger.info(
        "Domain check complete: %s checked, %s available, %s taken, %s errors",
        checked, available, taken, errors,
    )

    return {
        "checked": checked,
        "available": available,
        "taken": taken,
        "errors": errors,
    }