pounce/backend/app/services/drop_status_checker.py

244 lines
7.6 KiB
Python

"""
Drop Status Checker
====================
Dedicated RDAP checker for dropped domains.
Correctly identifies pending_delete, redemption, and available status.
Extracts deletion date for countdown display.
Uses IANA Bootstrap (rdap.org) as universal fallback for all TLDs.
"""
import asyncio
import httpx
import logging
from dataclasses import dataclass
from datetime import datetime
from typing import Optional
from app.services.http_client_pool import get_rdap_http_client
logger = logging.getLogger(__name__)
# ============================================================================
# RDAP CONFIGURATION
# ============================================================================
# Preferred direct endpoints (faster, more reliable)
PREFERRED_ENDPOINTS = {
'ch': 'https://rdap.nic.ch/domain/',
'li': 'https://rdap.nic.ch/domain/',
'de': 'https://rdap.denic.de/domain/',
}
# IANA Bootstrap - works for ALL TLDs (redirects to correct registry)
IANA_BOOTSTRAP = 'https://rdap.org/domain/'
# Rate limiting settings
RDAP_TIMEOUT = 15 # seconds
RATE_LIMIT_DELAY = 0.3 # 300ms between requests = ~3 req/s
@dataclass
class DropStatus:
"""Status of a dropped domain."""
domain: str
status: str # 'available', 'dropping_soon', 'taken', 'unknown'
rdap_status: list[str]
can_register_now: bool
should_monitor: bool
message: str
deletion_date: Optional[datetime] = None
check_method: str = "rdap"
async def _make_rdap_request(client: httpx.AsyncClient, url: str, domain: str) -> Optional[dict]:
"""Make a single RDAP request with proper error handling."""
try:
resp = await client.get(url, timeout=RDAP_TIMEOUT)
if resp.status_code == 404:
# Domain not found = available
return {"_available": True, "_status_code": 404}
if resp.status_code == 200:
data = resp.json()
data["_status_code"] = 200
return data
if resp.status_code == 429:
logger.warning(f"RDAP rate limited for {domain}")
return {"_rate_limited": True, "_status_code": 429}
logger.warning(f"RDAP returned {resp.status_code} for {domain}")
return None
except httpx.TimeoutException:
logger.debug(f"RDAP timeout for {domain} at {url}")
return None
except Exception as e:
logger.debug(f"RDAP error for {domain}: {e}")
return None
async def check_drop_status(domain: str) -> DropStatus:
"""
Check the real status of a dropped domain via RDAP.
Strategy:
1. Try preferred direct endpoint (if available for TLD)
2. Fall back to IANA Bootstrap (works for all TLDs)
Returns:
DropStatus with one of:
- 'available': Domain can be registered NOW
- 'dropping_soon': Domain is in pending delete/redemption
- 'taken': Domain was re-registered
- 'unknown': Could not determine status
"""
tld = domain.split('.')[-1].lower()
# Try preferred endpoint first
data = None
check_method = "rdap"
client = await get_rdap_http_client()
if tld in PREFERRED_ENDPOINTS:
url = f"{PREFERRED_ENDPOINTS[tld]}{domain}"
data = await _make_rdap_request(client, url, domain)
check_method = f"rdap_{tld}"
# Fall back to IANA Bootstrap if no data yet
if data is None:
url = f"{IANA_BOOTSTRAP}{domain}"
data = await _make_rdap_request(client, url, domain)
check_method = "rdap_iana"
# Still no data? Return unknown
if data is None:
return DropStatus(
domain=domain,
status='unknown',
rdap_status=[],
can_register_now=False,
should_monitor=True,
message="RDAP check failed - will retry later",
check_method="failed",
)
# Rate limited
if data.get("_rate_limited"):
return DropStatus(
domain=domain,
status='unknown',
rdap_status=[],
can_register_now=False,
should_monitor=True,
message="Rate limited - will retry later",
check_method="rate_limited",
)
# Domain available (404)
if data.get("_available"):
return DropStatus(
domain=domain,
status='available',
rdap_status=[],
can_register_now=True,
should_monitor=False,
message="Domain is available for registration!",
check_method=check_method,
)
# Domain exists - parse status
rdap_status = data.get('status', [])
status_lower = ' '.join(str(s).lower() for s in rdap_status)
# Extract deletion date from events
deletion_date = None
events = data.get('events', [])
for event in events:
action = event.get('eventAction', '').lower()
date_str = event.get('eventDate', '')
if action in ('deletion', 'expiration') and date_str:
try:
deletion_date = datetime.fromisoformat(date_str.replace('Z', '+00:00'))
except (ValueError, TypeError):
pass
# Check for pending delete / redemption status
is_pending = any(x in status_lower for x in [
'pending delete', 'pendingdelete',
'pending purge', 'pendingpurge',
'redemption period', 'redemptionperiod',
'pending restore', 'pendingrestore',
'pending renewal', 'pendingrenewal',
])
if is_pending:
return DropStatus(
domain=domain,
status='dropping_soon',
rdap_status=rdap_status,
can_register_now=False,
should_monitor=True,
message="Domain is being deleted. Track it to get notified!",
deletion_date=deletion_date,
check_method=check_method,
)
# Domain is actively registered
return DropStatus(
domain=domain,
status='taken',
rdap_status=rdap_status,
can_register_now=False,
should_monitor=False,
message="Domain was re-registered",
deletion_date=None,
check_method=check_method,
)
async def check_drops_batch(
domains: list[tuple[int, str]],
delay_between_requests: float = RATE_LIMIT_DELAY,
max_concurrent: int = 3,
) -> list[tuple[int, DropStatus]]:
"""
Check multiple drops with rate limiting and concurrency control.
Args:
domains: List of (drop_id, full_domain) tuples
delay_between_requests: Seconds to wait between requests
max_concurrent: Maximum concurrent requests
Returns:
List of (drop_id, DropStatus) tuples
"""
semaphore = asyncio.Semaphore(max_concurrent)
results = []
async def check_with_semaphore(drop_id: int, domain: str) -> tuple[int, DropStatus]:
async with semaphore:
try:
status = await check_drop_status(domain)
await asyncio.sleep(delay_between_requests)
return (drop_id, status)
except Exception as e:
logger.error(f"Batch check failed for {domain}: {e}")
return (drop_id, DropStatus(
domain=domain,
status='unknown',
rdap_status=[],
can_register_now=False,
should_monitor=False,
message=str(e),
check_method="error",
))
# Run with limited concurrency
tasks = [check_with_semaphore(drop_id, domain) for drop_id, domain in domains]
results = await asyncio.gather(*tasks)
return list(results)