244 lines
7.6 KiB
Python
244 lines
7.6 KiB
Python
"""
|
|
Drop Status Checker
====================
Dedicated RDAP checker for dropped domains.
Correctly identifies pending_delete, redemption, and available status.
Extracts deletion date for countdown display.

Uses IANA Bootstrap (rdap.org) as universal fallback for all TLDs.
|
|
"""
|
|
|
|
import asyncio
|
|
import httpx
|
|
import logging
|
|
from dataclasses import dataclass
|
|
from datetime import datetime
|
|
from typing import Optional
|
|
|
|
from app.services.http_client_pool import get_rdap_http_client
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# ----------------------------------------------------------------------------
# RDAP configuration
# ----------------------------------------------------------------------------

# Direct RDAP base URLs, keyed by TLD. These are queried first for their TLD,
# before the generic fallback below. Note that .li shares the .ch endpoint.
PREFERRED_ENDPOINTS = {
    "ch": "https://rdap.nic.ch/domain/",
    "li": "https://rdap.nic.ch/domain/",
    "de": "https://rdap.denic.de/domain/",
}

# Generic fallback base URL, used when the TLD has no preferred endpoint or
# the preferred endpoint produced no usable answer.
IANA_BOOTSTRAP = "https://rdap.org/domain/"

# Per-request timeout handed to the HTTP client (seconds).
RDAP_TIMEOUT = 15

# Default pause (seconds) that a batch worker sleeps after each successful
# check before releasing its concurrency slot.
RATE_LIMIT_DELAY = 0.3
|
|
|
|
|
|
@dataclass
class DropStatus:
    """Outcome of a single RDAP status check for one dropped domain."""

    # The domain name exactly as it was passed to the checker.
    domain: str
    # One of: 'available', 'dropping_soon', 'taken', 'unknown'.
    status: str
    # Status values taken from the RDAP response; empty when there was no
    # usable response (404, rate limit, failure).
    rdap_status: list[str]
    # True when the domain can be registered right away.
    can_register_now: bool
    # True when the domain should keep being re-checked later.
    should_monitor: bool
    # Human-readable explanation of the result.
    message: str
    # Deletion/expiration timestamp from the RDAP events, when one was found.
    deletion_date: Optional[datetime] = None
    # Which lookup path produced the result (e.g. "rdap_iana", "failed").
    check_method: str = "rdap"
|
|
|
|
|
|
async def _make_rdap_request(client: httpx.AsyncClient, url: str, domain: str) -> Optional[dict]:
    """
    Perform one RDAP GET and translate the HTTP outcome into a dict.

    Args:
        client: HTTP client used to issue the request.
        url: Full RDAP URL to query.
        domain: Domain name, used only for log messages.

    Returns:
        - the parsed JSON body plus ``"_status_code": 200`` on HTTP 200
        - ``{"_available": True, "_status_code": 404}`` on HTTP 404
        - ``{"_rate_limited": True, "_status_code": 429}`` on HTTP 429
        - ``None`` for any other status code, a timeout, or any other error
          (including a body that cannot be parsed as a JSON object)
    """
    # NOTE(review): the rdap.org fallback answers with redirects; this relies
    # on the shared client being configured to follow them - confirm.
    try:
        response = await client.get(url, timeout=RDAP_TIMEOUT)
        code = response.status_code

        if code == 200:
            payload = response.json()
            # Tag the payload so callers can tell it apart from the
            # synthetic 404/429 dicts below.
            payload["_status_code"] = 200
            return payload

        if code == 404:
            # The server does not know the domain: treated as available.
            return {"_available": True, "_status_code": 404}

        if code == 429:
            logger.warning(f"RDAP rate limited for {domain}")
            return {"_rate_limited": True, "_status_code": 429}

        logger.warning(f"RDAP returned {code} for {domain}")
    except httpx.TimeoutException:
        logger.debug(f"RDAP timeout for {domain} at {url}")
    except Exception as exc:
        # Best-effort lookup: any failure simply means "no data".
        logger.debug(f"RDAP error for {domain}: {exc}")

    return None
|
|
|
|
|
|
def _parse_rdap_event_date(raw_date: object) -> Optional[datetime]:
    """Parse an RDAP ``eventDate`` value; return None if missing or malformed."""
    if not isinstance(raw_date, str) or not raw_date:
        return None
    try:
        # Older Python versions' fromisoformat() rejects a trailing 'Z'.
        return datetime.fromisoformat(raw_date.replace('Z', '+00:00'))
    except (ValueError, TypeError):
        return None


def _extract_deletion_date(events: object) -> Optional[datetime]:
    """Return the 'deletion' event date, falling back to the 'expiration' date."""
    if not isinstance(events, list):
        return None

    found: dict = {}
    for event in events:
        if not isinstance(event, dict):
            continue
        action = str(event.get('eventAction') or '').lower()
        if action not in ('deletion', 'expiration'):
            continue
        parsed = _parse_rdap_event_date(event.get('eventDate'))
        if parsed is not None:
            # Within one action kind, the last parseable event wins.
            found[action] = parsed

    # An explicit deletion date is more precise than the expiration date, so
    # it takes precedence regardless of the order of events in the response.
    if 'deletion' in found:
        return found['deletion']
    return found.get('expiration')


def _is_pending_deletion(rdap_status: list) -> bool:
    """Tell whether any RDAP status value marks a pending delete/redemption state."""
    status_lower = ' '.join(str(s).lower() for s in rdap_status)
    return any(marker in status_lower for marker in (
        'pending delete', 'pendingdelete',
        'pending purge', 'pendingpurge',
        'redemption period', 'redemptionperiod',
        'pending restore', 'pendingrestore',
        'pending renewal', 'pendingrenewal',
    ))


def _unknown_drop_status(domain: str, message: str, check_method: str) -> DropStatus:
    """Build the 'unknown, retry later' result shared by the failure paths."""
    return DropStatus(
        domain=domain,
        status='unknown',
        rdap_status=[],
        can_register_now=False,
        should_monitor=True,
        message=message,
        check_method=check_method,
    )


async def check_drop_status(domain: str) -> DropStatus:
    """
    Check the real status of a dropped domain via RDAP.

    Strategy:
        1. Try the preferred direct endpoint (if one exists for the TLD).
        2. Fall back to the generic bootstrap endpoint when step 1 was
           skipped or returned no data. A rate-limit answer from the
           preferred endpoint counts as data and does NOT trigger the
           fallback.

    Args:
        domain: Full domain name; it is appended to the endpoint URL as-is.

    Returns:
        DropStatus with one of:
        - 'available': Domain can be registered NOW (RDAP answered 404)
        - 'dropping_soon': Domain is in pending delete/redemption; carries
          the deletion date when the response provides one
        - 'taken': Domain was re-registered
        - 'unknown': Lookup failed or was rate limited; should be retried
    """
    tld = domain.split('.')[-1].lower()
    client = await get_rdap_http_client()

    data = None
    check_method = "rdap_iana"

    # Try preferred endpoint first
    preferred_base = PREFERRED_ENDPOINTS.get(tld)
    if preferred_base is not None:
        data = await _make_rdap_request(client, f"{preferred_base}{domain}", domain)
        check_method = f"rdap_{tld}"

    # Fall back to the bootstrap endpoint if there is no data yet
    if data is None:
        data = await _make_rdap_request(client, f"{IANA_BOOTSTRAP}{domain}", domain)
        check_method = "rdap_iana"

    # Still no data? Return unknown
    if data is None:
        return _unknown_drop_status(domain, "RDAP check failed - will retry later", "failed")

    # Rate limited
    if data.get("_rate_limited"):
        return _unknown_drop_status(domain, "Rate limited - will retry later", "rate_limited")

    # Domain available (404)
    if data.get("_available"):
        return DropStatus(
            domain=domain,
            status='available',
            rdap_status=[],
            can_register_now=True,
            should_monitor=False,
            message="Domain is available for registration!",
            check_method=check_method,
        )

    # Domain exists - parse status. Tolerate a missing/null value and a bare
    # string so that a sloppy server response cannot crash the check.
    raw_status = data.get('status')
    if isinstance(raw_status, list):
        rdap_status = raw_status
    elif isinstance(raw_status, str):
        rdap_status = [raw_status]
    else:
        rdap_status = []

    if _is_pending_deletion(rdap_status):
        return DropStatus(
            domain=domain,
            status='dropping_soon',
            rdap_status=rdap_status,
            can_register_now=False,
            should_monitor=True,
            message="Domain is being deleted. Track it to get notified!",
            deletion_date=_extract_deletion_date(data.get('events')),
            check_method=check_method,
        )

    # Domain is actively registered
    return DropStatus(
        domain=domain,
        status='taken',
        rdap_status=rdap_status,
        can_register_now=False,
        should_monitor=False,
        message="Domain was re-registered",
        deletion_date=None,
        check_method=check_method,
    )
|
|
|
|
|
|
async def check_drops_batch(
    domains: list[tuple[int, str]],
    delay_between_requests: float = RATE_LIMIT_DELAY,
    max_concurrent: int = 3,
) -> list[tuple[int, DropStatus]]:
    """
    Check multiple drops with rate limiting and concurrency control.

    Each check runs inside a semaphore slot; after a successful check the
    worker sleeps ``delay_between_requests`` before releasing its slot.

    Args:
        domains: List of (drop_id, full_domain) tuples
        delay_between_requests: Seconds a worker waits after each successful
            check before freeing its slot
        max_concurrent: Maximum concurrent checks; values below 1 are
            treated as 1 (a zero-sized semaphore would block forever)

    Returns:
        List of (drop_id, DropStatus) tuples, in the same order as
        ``domains``. A check that raises is reported as an 'unknown' status
        with ``check_method="error"`` instead of aborting the batch.
    """
    if not domains:
        return []

    semaphore = asyncio.Semaphore(max(1, max_concurrent))

    async def check_with_semaphore(drop_id: int, domain: str) -> tuple[int, DropStatus]:
        async with semaphore:
            try:
                status = await check_drop_status(domain)
                await asyncio.sleep(delay_between_requests)
                return (drop_id, status)
            except Exception as exc:
                logger.error("Batch check failed for %s: %s", domain, exc)
                return (drop_id, DropStatus(
                    domain=domain,
                    status='unknown',
                    rdap_status=[],
                    can_register_now=False,
                    # Keep monitoring: like the other 'unknown' outcomes, an
                    # unexpected error says nothing about the domain itself.
                    should_monitor=True,
                    message=str(exc),
                    check_method="error",
                ))

    # gather() preserves input order; the semaphore bounds the concurrency.
    results = await asyncio.gather(
        *(check_with_semaphore(drop_id, domain) for drop_id, domain in domains)
    )
    return list(results)
|