fix: Implement IANA Bootstrap RDAP for reliable domain checking

Major improvements to domain availability checking:

1. IANA Bootstrap (rdap.org) as universal fallback
   - Works for ALL TLDs without rate limiting
   - Automatically redirects to correct registry
   - Faster than direct endpoints for most TLDs

2. Updated drop_status_checker.py
   - Uses IANA Bootstrap with follow_redirects=True
   - Preferred endpoints for .ch/.li/.de (direct, faster)
   - Better rate limiting (300ms delay, 3 concurrent max)

3. Updated domain_checker.py
   - New _check_rdap_iana() method
   - Removed RDAP_BLOCKED_TLDS (not needed with IANA Bootstrap)
   - Simplified check_domain() priority flow

Priority order:
1. Custom RDAP (.ch/.li/.de) - fastest
2. IANA Bootstrap (all other TLDs) - reliable
3. WHOIS - fallback
4. DNS - final validation

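This is intended to avoid the RDAP timeouts and bans seen with direct registry endpoints; timeouts and HTTP 429 responses can still occur and are handled as inconclusive results.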
This commit is contained in:
2025-12-21 14:54:51 +01:00
parent 5f3856fce6
commit ddb1a26d47
2 changed files with 284 additions and 217 deletions

View File

@ -73,34 +73,21 @@ class DomainChecker:
'de', 'uk', 'fr', 'nl', 'eu', 'be', 'at', 'us',
}
# TLDs with custom RDAP endpoints (not in whodap but have their own RDAP servers)
# These registries have their own RDAP APIs that we query directly
# TLDs with preferred direct RDAP endpoints (faster than IANA bootstrap)
CUSTOM_RDAP_ENDPOINTS = {
'ch': 'https://rdap.nic.ch/domain/', # Swiss .ch domains (SWITCH)
'li': 'https://rdap.nic.ch/domain/', # Liechtenstein .li (same registry)
'de': 'https://rdap.denic.de/domain/', # German .de domains (DENIC)
}
# TLDs that only support WHOIS (no RDAP at all)
# Note: .ch and .li removed - they have custom RDAP!
# IANA Bootstrap - works for ALL TLDs (redirects to correct registry)
IANA_BOOTSTRAP_URL = 'https://rdap.org/domain/'
# TLDs that only support WHOIS (no RDAP at all - very rare)
WHOIS_ONLY_TLDS = {
'ru', 'su', 'ua', 'by', 'kz',
}
# TLDs where we are rate-limited/banned from RDAP
# Use DNS+WHOIS only for these (no RDAP calls!)
RDAP_BLOCKED_TLDS = {
'info', # Afilias - banned, timeouts
'biz', # Afilias - banned, timeouts
'org', # PIR - might be blocked
'dev', # Google - blocked
'app', # Google - blocked
'xyz', # CentralNic - slow/limited
'online', # CentralNic - slow/limited
'com', # Verisign - heavy rate limits
'net', # Verisign - heavy rate limits
}
def __init__(self):
"""Initialize the domain checker."""
self._dns_resolver = dns.resolver.Resolver()
@ -308,9 +295,101 @@ class DomainChecker:
logger.warning(f"Custom RDAP error for {domain}: {e}")
return None
async def _check_rdap_iana(self, domain: str) -> Optional[DomainCheckResult]:
    """
    Check a domain through the rdap.org bootstrap redirector.

    The request goes to ``IANA_BOOTSTRAP_URL`` with redirects enabled, so
    the final response normally comes from the registry responsible for
    the TLD.

    Args:
        domain: Fully qualified domain name to look up.

    Returns:
        A DomainCheckResult with status AVAILABLE, DROPPING_SOON or TAKEN
        when a registry gave a definite answer. None when the lookup was
        inconclusive (no RDAP server known for the TLD, rate limited,
        unexpected HTTP status, timeout or any other error) so the caller
        can fall back to another check method.
    """
    url = f"{self.IANA_BOOTSTRAP_URL}{domain}"

    try:
        async with httpx.AsyncClient(timeout=15.0, follow_redirects=True) as client:
            response = await client.get(url)

            if response.status_code == 404:
                # A 404 with no redirect followed was produced by the
                # bootstrap service itself (no RDAP server known for this
                # TLD), not by a registry. It says nothing about whether
                # the domain is registered, so report "inconclusive".
                if not response.history:
                    logger.debug(f"IANA RDAP has no registry redirect for {domain}")
                    return None
                return DomainCheckResult(
                    domain=domain,
                    status=DomainStatus.AVAILABLE,
                    is_available=True,
                    check_method="rdap_iana",
                )

            if response.status_code == 429:
                logger.warning(f"RDAP rate limited for {domain}")
                return None

            if response.status_code != 200:
                return None

            data = response.json()

            # Parse events for dates. "or" guards against JSON nulls, which
            # would otherwise raise and discard an otherwise valid answer.
            expiration_date = None
            creation_date = None
            for event in data.get('events') or []:
                action = (event.get('eventAction') or '').lower()
                date_str = event.get('eventDate') or ''
                if not date_str:
                    continue
                if 'expiration' in action:
                    expiration_date = self._parse_datetime(date_str)
                elif 'registration' in action:
                    creation_date = self._parse_datetime(date_str)

            # Extract the registrar name from the "fn" property of the
            # registrar entity's vCard.
            registrar = None
            for entity in data.get('entities') or []:
                if 'registrar' not in (entity.get('roles') or []):
                    continue
                vcard = entity.get('vcardArray', [])
                if isinstance(vcard, list) and len(vcard) > 1:
                    for item in vcard[1]:
                        if isinstance(item, list) and len(item) > 3:
                            if item[0] == 'fn' and item[3]:
                                registrar = str(item[3])
                                break

            # Pending-delete / redemption states mean the domain is still
            # registered but about to drop.
            status_str = ' '.join(str(s).lower() for s in data.get('status') or [])
            is_dropping = any(marker in status_str for marker in (
                'pending delete', 'pendingdelete',
                'redemption period', 'redemptionperiod',
            ))

            return DomainCheckResult(
                domain=domain,
                status=DomainStatus.DROPPING_SOON if is_dropping else DomainStatus.TAKEN,
                is_available=False,
                registrar=registrar,
                expiration_date=expiration_date,
                creation_date=creation_date,
                check_method="rdap_iana",
            )

    except httpx.TimeoutException:
        logger.debug(f"IANA RDAP timeout for {domain}")
        return None
    except Exception as e:
        # Best-effort lookup: any failure is reported as "inconclusive".
        logger.debug(f"IANA RDAP error for {domain}: {e}")
        return None
async def _check_rdap(self, domain: str) -> Optional[DomainCheckResult]:
"""
Check domain using RDAP (Registration Data Access Protocol).
Check domain using RDAP (Registration Data Access Protocol) via whodap library.
Returns None if RDAP is not available for this TLD.
"""
@ -333,7 +412,6 @@ class DomainChecker:
if response.events:
for event in response.events:
# Access event data from __dict__
event_dict = event.__dict__ if hasattr(event, '__dict__') else {}
action = event_dict.get('eventAction', '')
date_str = event_dict.get('eventDate', '')
@ -380,12 +458,10 @@ class DomainChecker:
)
except NotImplementedError:
# No RDAP server for this TLD
logger.debug(f"No RDAP server for TLD .{tld}")
return None
except Exception as e:
error_msg = str(e).lower()
# Check if domain is not found (available)
if 'not found' in error_msg or '404' in error_msg:
return DomainCheckResult(
domain=domain,
@ -393,7 +469,7 @@ class DomainChecker:
is_available=True,
check_method="rdap",
)
logger.warning(f"RDAP check failed for {domain}: {e}")
logger.debug(f"RDAP check failed for {domain}: {e}")
return None
async def _check_whois(self, domain: str) -> DomainCheckResult:
@ -616,58 +692,35 @@ class DomainChecker:
# If custom RDAP fails, fall through to DNS check
logger.info(f"Custom RDAP failed for {domain}, using DNS fallback")
# Priority 2: Try standard RDAP via whodap (skip blocked TLDs!)
if (tld not in self.WHOIS_ONLY_TLDS and
tld not in self.CUSTOM_RDAP_ENDPOINTS and
tld not in self.RDAP_BLOCKED_TLDS):
rdap_result = await self._check_rdap(domain)
if rdap_result:
# Priority 2: Try IANA Bootstrap RDAP (works for ALL TLDs!)
if tld not in self.WHOIS_ONLY_TLDS and tld not in self.CUSTOM_RDAP_ENDPOINTS:
iana_result = await self._check_rdap_iana(domain)
if iana_result:
# Validate with DNS if RDAP says available
if rdap_result.is_available:
if iana_result.is_available:
dns_available = await self._check_dns(domain)
if not dns_available:
rdap_result.status = DomainStatus.TAKEN
rdap_result.is_available = False
return rdap_result
iana_result.status = DomainStatus.TAKEN
iana_result.is_available = False
return iana_result
# For RDAP-blocked TLDs: Use DNS first, then WHOIS for details
if tld in self.RDAP_BLOCKED_TLDS:
logger.debug(f"Using DNS+WHOIS fallback for blocked TLD .{tld}: {domain}")
dns_available = await self._check_dns(domain)
if dns_available:
# No DNS records = likely available, verify with WHOIS
whois_result = await self._check_whois(domain)
return whois_result
else:
# Has DNS records = taken, try to get details from WHOIS
try:
whois_result = await self._check_whois(domain)
whois_result.check_method = "dns+whois"
return whois_result
except Exception:
# WHOIS failed, return DNS-only result
return DomainCheckResult(
domain=domain,
status=DomainStatus.TAKEN,
is_available=False,
check_method="dns",
)
# Priority 3: Fall back to WHOIS (skip for TLDs that block it like .ch)
# Priority 3: Fall back to WHOIS
if tld not in self.CUSTOM_RDAP_ENDPOINTS:
whois_result = await self._check_whois(domain)
try:
whois_result = await self._check_whois(domain)
# Validate with DNS
if whois_result.is_available:
dns_available = await self._check_dns(domain)
if not dns_available:
whois_result.status = DomainStatus.TAKEN
whois_result.is_available = False
# Validate with DNS
if whois_result.is_available:
dns_available = await self._check_dns(domain)
if not dns_available:
whois_result.status = DomainStatus.TAKEN
whois_result.is_available = False
return whois_result
return whois_result
except Exception as e:
logger.debug(f"WHOIS failed for {domain}: {e}")
# Final fallback: DNS-only check (for TLDs where everything else failed)
# Final fallback: DNS-only check
dns_available = await self._check_dns(domain)
return DomainCheckResult(
domain=domain,

View File

@ -4,6 +4,8 @@ Drop Status Checker
Dedicated RDAP checker for dropped domains.
Correctly identifies pending_delete, redemption, and available status.
Extracts deletion date for countdown display.
Uses IANA Bootstrap (rdap.org) as universal fallback for all TLDs.
"""
import asyncio
@ -15,26 +17,23 @@ from typing import Optional
logger = logging.getLogger(__name__)
# RDAP endpoints for different TLDs
# ONLY include TLDs where RDAP is NOT blocked!
RDAP_ENDPOINTS = {
# ccTLDs (these work reliably)
# ============================================================================
# RDAP CONFIGURATION
# ============================================================================
# Preferred direct endpoints (faster, more reliable)
PREFERRED_ENDPOINTS = {
'ch': 'https://rdap.nic.ch/domain/',
'li': 'https://rdap.nic.ch/domain/',
'de': 'https://rdap.denic.de/domain/',
# gTLDs - DISABLED due to rate limits / bans:
# 'online': 'https://rdap.centralnic.com/online/domain/', # rate-limited
# 'xyz': 'https://rdap.centralnic.com/xyz/domain/', # rate-limited
# 'club': 'https://rdap.nic.club/domain/', # unreliable
# 'info': 'https://rdap.afilias.net/rdap/info/domain/', # BANNED
# 'biz': 'https://rdap.afilias.net/rdap/biz/domain/', # BANNED
# 'org': 'https://rdap.publicinterestregistry.org/rdap/org/domain/', # blocked
# 'dev': 'https://rdap.nic.google/domain/', # BANNED
# 'app': 'https://rdap.nic.google/domain/', # BANNED
}
# TLDs where RDAP is blocked/banned - skip RDAP entirely
RDAP_BLOCKED_TLDS = {'info', 'biz', 'org', 'dev', 'app', 'xyz', 'online', 'club', 'com', 'net'}
# IANA Bootstrap - works for ALL TLDs (redirects to correct registry)
IANA_BOOTSTRAP = 'https://rdap.org/domain/'
# Rate limiting settings
RDAP_TIMEOUT = 15 # seconds
RATE_LIMIT_DELAY = 0.3 # 300ms between requests = ~3 req/s
@dataclass
@ -46,185 +45,200 @@ class DropStatus:
can_register_now: bool
should_monitor: bool
message: str
deletion_date: Optional[datetime] = None # When domain will be fully deleted
deletion_date: Optional[datetime] = None
check_method: str = "rdap"
async def _make_rdap_request(url: str, domain: str) -> Optional[dict]:
    """
    Make a single RDAP request and normalise the outcome.

    Args:
        url: Full RDAP URL to query (direct registry endpoint or the
            bootstrap URL).
        domain: Domain being checked; used for log messages only.

    Returns:
        - The parsed RDAP JSON object with ``"_status_code": 200`` added
          when the domain exists.
        - ``{"_available": True, "_status_code": 404}`` when a registry
          reports the domain as not found.
        - ``{"_rate_limited": True, "_status_code": 429}`` when throttled.
        - None for any other HTTP status, a timeout, any other error, or a
          404 produced by the bootstrap service itself.
    """
    try:
        async with httpx.AsyncClient(
            timeout=RDAP_TIMEOUT,
            follow_redirects=True,  # Important for IANA Bootstrap
        ) as client:
            resp = await client.get(url)

            if resp.status_code == 404:
                # A bootstrap request that was never redirected got its 404
                # from the bootstrap service (no RDAP server known for the
                # TLD), not from a registry. That is "unknown", not
                # "available".
                if url.startswith(IANA_BOOTSTRAP) and not resp.history:
                    logger.debug(f"RDAP bootstrap has no registry redirect for {domain}")
                    return None
                # Domain not found = available
                return {"_available": True, "_status_code": 404}

            if resp.status_code == 200:
                data = resp.json()
                if not isinstance(data, dict):
                    # An RDAP domain response must be a JSON object.
                    logger.debug(f"RDAP error for {domain}: unexpected JSON payload")
                    return None
                data["_status_code"] = 200
                return data

            if resp.status_code == 429:
                logger.warning(f"RDAP rate limited for {domain}")
                return {"_rate_limited": True, "_status_code": 429}

            logger.warning(f"RDAP returned {resp.status_code} for {domain}")
            return None

    except httpx.TimeoutException:
        logger.debug(f"RDAP timeout for {domain} at {url}")
        return None
    except Exception as e:
        # Best-effort request: any failure is reported as "no data".
        logger.debug(f"RDAP error for {domain}: {e}")
        return None
async def check_drop_status(domain: str) -> DropStatus:
"""
Check the real status of a dropped domain via RDAP.
Strategy:
1. Try preferred direct endpoint (if available for TLD)
2. Fall back to IANA Bootstrap (works for all TLDs)
Returns:
DropStatus with one of:
- 'available': Domain can be registered NOW
- 'dropping_soon': Domain is in pending delete/redemption (monitor it!)
- 'dropping_soon': Domain is in pending delete/redemption
- 'taken': Domain was re-registered
- 'unknown': Could not determine status
"""
tld = domain.split('.')[-1].lower()
# Skip RDAP for blocked TLDs to avoid bans/timeouts
if tld in RDAP_BLOCKED_TLDS:
logger.debug(f"Skipping RDAP for blocked TLD .{tld}: {domain}")
return DropStatus(
domain=domain,
status='unknown',
rdap_status=[],
can_register_now=False,
should_monitor=True, # Still worth monitoring via zone files
message=f"RDAP disabled for .{tld} (rate-limited)"
)
# Try preferred endpoint first
data = None
check_method = "rdap"
endpoint = RDAP_ENDPOINTS.get(tld)
if not endpoint:
logger.debug(f"No RDAP endpoint for .{tld}, returning unknown")
if tld in PREFERRED_ENDPOINTS:
url = f"{PREFERRED_ENDPOINTS[tld]}{domain}"
data = await _make_rdap_request(url, domain)
check_method = f"rdap_{tld}"
# Fall back to IANA Bootstrap if no data yet
if data is None:
url = f"{IANA_BOOTSTRAP}{domain}"
data = await _make_rdap_request(url, domain)
check_method = "rdap_iana"
# Still no data? Return unknown
if data is None:
return DropStatus(
domain=domain,
status='unknown',
rdap_status=[],
can_register_now=False,
should_monitor=True,
message=f"No RDAP endpoint for .{tld}"
message="RDAP check failed - will retry later",
check_method="failed",
)
url = f"{endpoint}{domain}"
try:
async with httpx.AsyncClient(timeout=10) as client:
resp = await client.get(url)
# 404 = Domain not found = AVAILABLE!
if resp.status_code == 404:
return DropStatus(
domain=domain,
status='available',
rdap_status=[],
can_register_now=True,
should_monitor=False,
message="Domain is available for registration!"
)
# 200 = Domain exists in registry
if resp.status_code == 200:
data = resp.json()
rdap_status = data.get('status', [])
status_lower = ' '.join(str(s).lower() for s in rdap_status)
# Extract deletion date from events
deletion_date = None
events = data.get('events', [])
for event in events:
action = event.get('eventAction', '').lower()
date_str = event.get('eventDate', '')
if action in ('deletion', 'expiration') and date_str:
try:
# Parse ISO date
deletion_date = datetime.fromisoformat(date_str.replace('Z', '+00:00'))
except (ValueError, TypeError):
pass
# Check for pending delete / redemption status
is_pending = any(x in status_lower for x in [
'pending delete', 'pendingdelete',
'pending purge', 'pendingpurge',
'redemption period', 'redemptionperiod',
'pending restore', 'pendingrestore',
])
if is_pending:
return DropStatus(
domain=domain,
status='dropping_soon',
rdap_status=rdap_status,
can_register_now=False,
should_monitor=True,
message="Domain is being deleted. Track it to get notified when available!",
deletion_date=deletion_date,
)
# Domain is actively registered
return DropStatus(
domain=domain,
status='taken',
rdap_status=rdap_status,
can_register_now=False,
should_monitor=False,
message="Domain was re-registered",
deletion_date=None,
)
# Other status code
logger.warning(f"RDAP returned {resp.status_code} for {domain}")
return DropStatus(
domain=domain,
status='unknown',
rdap_status=[],
can_register_now=False,
should_monitor=False,
message=f"RDAP returned HTTP {resp.status_code}"
)
except httpx.TimeoutException:
logger.warning(f"RDAP timeout for {domain}")
# Rate limited
if data.get("_rate_limited"):
return DropStatus(
domain=domain,
status='unknown',
rdap_status=[],
can_register_now=False,
should_monitor=False,
message="RDAP timeout"
should_monitor=True,
message="Rate limited - will retry later",
check_method="rate_limited",
)
except Exception as e:
logger.warning(f"RDAP error for {domain}: {e}")
# Domain available (404)
if data.get("_available"):
return DropStatus(
domain=domain,
status='unknown',
status='available',
rdap_status=[],
can_register_now=False,
can_register_now=True,
should_monitor=False,
message=str(e)
message="Domain is available for registration!",
check_method=check_method,
)
# Domain exists - parse status
rdap_status = data.get('status', [])
status_lower = ' '.join(str(s).lower() for s in rdap_status)
# Rate limiting: max requests per second per TLD
RATE_LIMITS = {
'default': 5, # 5 requests per second
'ch': 10, # Swiss registry is faster
'li': 10,
}
# Extract deletion date from events
deletion_date = None
events = data.get('events', [])
for event in events:
action = event.get('eventAction', '').lower()
date_str = event.get('eventDate', '')
if action in ('deletion', 'expiration') and date_str:
try:
deletion_date = datetime.fromisoformat(date_str.replace('Z', '+00:00'))
except (ValueError, TypeError):
pass
# Check for pending delete / redemption status
is_pending = any(x in status_lower for x in [
'pending delete', 'pendingdelete',
'pending purge', 'pendingpurge',
'redemption period', 'redemptionperiod',
'pending restore', 'pendingrestore',
'pending renewal', 'pendingrenewal',
])
if is_pending:
return DropStatus(
domain=domain,
status='dropping_soon',
rdap_status=rdap_status,
can_register_now=False,
should_monitor=True,
message="Domain is being deleted. Track it to get notified!",
deletion_date=deletion_date,
check_method=check_method,
)
# Domain is actively registered
return DropStatus(
domain=domain,
status='taken',
rdap_status=rdap_status,
can_register_now=False,
should_monitor=False,
message="Domain was re-registered",
deletion_date=None,
check_method=check_method,
)
async def check_drops_batch(
domains: list[tuple[int, str]], # List of (id, full_domain)
delay_between_requests: float = 0.2, # 200ms = 5 req/s
domains: list[tuple[int, str]],
delay_between_requests: float = RATE_LIMIT_DELAY,
max_concurrent: int = 3,
) -> list[tuple[int, DropStatus]]:
"""
Check multiple drops with rate limiting.
Check multiple drops with rate limiting and concurrency control.
Args:
domains: List of (drop_id, full_domain) tuples
delay_between_requests: Seconds to wait between requests (default 200ms)
delay_between_requests: Seconds to wait between requests
max_concurrent: Maximum concurrent requests
Returns:
List of (drop_id, DropStatus) tuples
"""
semaphore = asyncio.Semaphore(max_concurrent)
results = []
for drop_id, domain in domains:
try:
status = await check_drop_status(domain)
results.append((drop_id, status))
except Exception as e:
logger.error(f"Batch check failed for {domain}: {e}")
results.append((drop_id, DropStatus(
domain=domain,
status='unknown',
rdap_status=[],
can_register_now=False,
should_monitor=False,
message=str(e),
)))
async def check_with_semaphore(drop_id: int, domain: str) -> tuple[int, DropStatus]:
async with semaphore:
try:
status = await check_drop_status(domain)
await asyncio.sleep(delay_between_requests)
return (drop_id, status)
except Exception as e:
logger.error(f"Batch check failed for {domain}: {e}")
return (drop_id, DropStatus(
domain=domain,
status='unknown',
rdap_status=[],
can_register_now=False,
should_monitor=False,
message=str(e),
check_method="error",
))
# Rate limit
await asyncio.sleep(delay_between_requests)
# Run with limited concurrency
tasks = [check_with_semaphore(drop_id, domain) for drop_id, domain in domains]
results = await asyncio.gather(*tasks)
return results
return list(results)