pounce/backend/app/services/drop_status_checker.py
Yves Gugger 0729c2426a
Some checks failed
CI / Frontend Lint & Type Check (push) Has been cancelled
CI / Frontend Build (push) Has been cancelled
CI / Backend Lint (push) Has been cancelled
CI / Backend Tests (push) Has been cancelled
CI / Docker Build (push) Has been cancelled
CI / Security Scan (push) Has been cancelled
Deploy / Build & Push Images (push) Has been cancelled
Deploy / Deploy to Server (push) Has been cancelled
Deploy / Notify (push) Has been cancelled
Deploy: 2025-12-19 12:41
2025-12-19 12:41:46 +01:00

248 lines
8.3 KiB
Python

"""
Drop Status Checker
====================
Dedicated RDAP checker for dropped domains.
Correctly identifies pending_delete, redemption, and available status.
Also extracts deletion date for countdown display.
"""
import httpx
import logging
from dataclasses import dataclass
from datetime import datetime
from typing import Optional
logger = logging.getLogger(__name__)
# RDAP base URLs keyed by lowercase TLD (without the leading dot).
# Every value ends with '/' so the domain name can be appended directly.
# NOTE(review): registry RDAP hosts change over time -- confirm these against
# the IANA RDAP bootstrap registry before relying on them.
RDAP_ENDPOINTS = {
    # ccTLDs (.ch and .li share the nic.ch server)
    "ch": "https://rdap.nic.ch/domain/",
    "li": "https://rdap.nic.ch/domain/",
    "de": "https://rdap.denic.de/domain/",

    # gTLDs via CentralNic (.club is served from its own nic.club host)
    "online": "https://rdap.centralnic.com/online/domain/",
    "xyz": "https://rdap.centralnic.com/xyz/domain/",
    "club": "https://rdap.nic.club/domain/",

    # gTLDs via Afilias/Donuts (.org uses the publicinterestregistry.org host)
    "info": "https://rdap.afilias.net/rdap/info/domain/",
    "biz": "https://rdap.afilias.net/rdap/biz/domain/",
    "org": "https://rdap.publicinterestregistry.org/rdap/org/domain/",

    # Google TLDs (one shared server)
    "dev": "https://rdap.nic.google/domain/",
    "app": "https://rdap.nic.google/domain/",
}
# Typical number of days a domain takes to be deleted once it shows a
# "pending delete" status, keyed by lowercase TLD (without the leading dot).
# These are rough, per-registry rules of thumb rather than exact values.
TLD_DELETE_PERIODS = {
    'ch': 40,  # .ch has ~40 day redemption
    'li': 40,
    # Everything below: 30 redemption + 5 pending delete
    **dict.fromkeys(
        ('com', 'net', 'org', 'info', 'biz', 'online', 'xyz', 'club', 'dev', 'app'),
        35,
    ),
}
@dataclass
class DropStatus:
    """Result of one RDAP lookup for a dropped domain.

    Attributes:
        domain: The domain name that was checked.
        status: One of 'available', 'dropping_soon', 'taken' or 'unknown'.
        rdap_status: Status values reported by the RDAP server (empty when
            the registry had no record or the lookup failed).
        can_register_now: True when the domain can be registered right away.
        should_monitor: True when the domain is worth tracking because it
            is expected to drop.
        message: Short human-readable summary of the outcome.
        deletion_date: ISO date on which the domain will be deleted, if the
            registry reported one.
        estimated_available: Estimated ISO date on which the domain becomes
            registrable, if it could be derived.
    """

    # --- always populated ---
    domain: str
    status: str
    rdap_status: list[str]
    can_register_now: bool
    should_monitor: bool
    message: str

    # --- only populated when the registry exposes a deletion event ---
    deletion_date: Optional[str] = None
    estimated_available: Optional[str] = None
def _parse_date(date_str: str) -> Optional[datetime]:
"""Parse ISO date string to datetime."""
if not date_str:
return None
try:
# Handle various formats
for fmt in ['%Y-%m-%dT%H:%M:%SZ', '%Y-%m-%dT%H:%M:%S', '%Y-%m-%d']:
try:
return datetime.strptime(date_str[:19].replace('Z', ''), fmt.replace('Z', ''))
except ValueError:
continue
return None
except Exception:
return None
def _unknown_drop_status(domain: str, message: str) -> DropStatus:
    """Build the 'could not determine' result shared by every failure path."""
    return DropStatus(
        domain=domain,
        status='unknown',
        rdap_status=[],
        can_register_now=False,
        should_monitor=False,
        message=message,
    )


def _last_deletion_date(events) -> Optional[str]:
    """Return the first 10 characters of the last dated 'deletion' event, or None."""
    if not isinstance(events, list):
        return None

    deletion_date = None
    for event in events:
        if not isinstance(event, dict):
            continue
        # Registries may send null instead of omitting the field.
        action = str(event.get('eventAction') or '').lower()
        event_date = event.get('eventDate')
        # Events without a usable date are skipped so they cannot erase a
        # date found earlier in the list.
        if 'deletion' in action and isinstance(event_date, str) and event_date:
            deletion_date = event_date[:10]
    return deletion_date


def _is_pending_removal(rdap_status) -> bool:
    """Tell whether any single RDAP status value signals pending delete/redemption."""
    markers = (
        'pending delete', 'pendingdelete',
        'pending purge', 'pendingpurge',
        'redemption period', 'redemptionperiod',
        'pending restore', 'pendingrestore',
    )
    # Each status value is tested on its own so a marker can never match
    # across the boundary between two unrelated values.
    return any(
        marker in str(value).lower()
        for value in rdap_status
        for marker in markers
    )


async def check_drop_status(domain: str) -> DropStatus:
    """
    Check the real status of a dropped domain via RDAP.

    The domain is trimmed, stripped of a trailing dot and lowercased before
    the TLD is looked up in ``RDAP_ENDPOINTS`` and the request URL is built.
    The ``domain`` field of the result always echoes the caller's original
    string.

    Args:
        domain: Domain name to check, e.g. ``"example.ch"``.

    Returns:
        DropStatus with one of:
        - 'available': Domain can be registered NOW (registry answered 404)
        - 'dropping_soon': Domain is in pending delete/redemption (monitor it!)
        - 'taken': Domain was re-registered (registry answered 200 without a
          pending delete/redemption status)
        - 'unknown': Could not determine status (no endpoint for the TLD,
          unexpected HTTP status, timeout, malformed response, or any other
          error -- the reason is carried in ``message``)

    Network and parsing errors are never raised to the caller; they are
    logged and reported as 'unknown'.
    """
    lookup_name = domain.strip().rstrip('.').lower()
    tld = lookup_name.split('.')[-1]
    endpoint = RDAP_ENDPOINTS.get(tld)
    if not endpoint:
        logger.warning("No RDAP endpoint for .%s, returning unknown", tld)
        return _unknown_drop_status(domain, f"No RDAP endpoint for .{tld}")

    url = f"{endpoint}{lookup_name}"
    headers = {
        'User-Agent': 'Mozilla/5.0 (compatible; PounceBot/1.0; +https://pounce.ch)',
        'Accept': 'application/rdap+json, application/json',
    }

    try:
        async with httpx.AsyncClient(timeout=10, headers=headers) as client:
            resp = await client.get(url)

        # 404 = Domain not found = AVAILABLE!
        # NOTE(review): a registry could also answer 404 for names it does
        # not expose (e.g. reserved ones) -- confirm per registry if needed.
        if resp.status_code == 404:
            return DropStatus(
                domain=domain,
                status='available',
                rdap_status=[],
                can_register_now=True,
                should_monitor=False,
                message="Available now!",
            )

        # Anything other than 200 (rate limiting, redirects, server errors)
        # tells us nothing about the domain itself.
        if resp.status_code != 200:
            logger.warning("RDAP returned %s for %s", resp.status_code, domain)
            return _unknown_drop_status(domain, f"RDAP {resp.status_code}")

        # 200 = Domain exists in registry
        data = resp.json()
        if not isinstance(data, dict):
            logger.warning("RDAP returned a non-object body for %s", domain)
            return _unknown_drop_status(domain, "Unexpected RDAP response")

        # A null or missing status is treated as "no status values"; a lone
        # string is wrapped so it is not iterated character by character.
        raw_status = data.get('status') or []
        rdap_status = [raw_status] if isinstance(raw_status, str) else list(raw_status)

        if _is_pending_removal(rdap_status):
            deletion_date = _last_deletion_date(data.get('events'))

            # For most TLDs the domain is available shortly after the
            # deletion date, so the estimate is that same day -- but only
            # when the reported value really is a YYYY-MM-DD date.
            estimated = None
            if deletion_date:
                try:
                    estimated = datetime.strptime(
                        deletion_date, '%Y-%m-%d'
                    ).strftime('%Y-%m-%d')
                except ValueError:
                    estimated = None

            return DropStatus(
                domain=domain,
                status='dropping_soon',
                rdap_status=rdap_status,
                can_register_now=False,
                should_monitor=True,
                message="Dropping soon - track to get notified!",
                deletion_date=deletion_date,
                estimated_available=estimated,
            )

        # Domain is actively registered
        return DropStatus(
            domain=domain,
            status='taken',
            rdap_status=rdap_status,
            can_register_now=False,
            should_monitor=False,
            message="Re-registered",
        )

    except httpx.TimeoutException:
        logger.warning("RDAP timeout for %s", domain)
        return _unknown_drop_status(domain, "Timeout")
    except Exception as e:
        # Deliberate best-effort boundary: one failing lookup must not abort
        # the caller, so every other error is logged and reported as unknown.
        logger.warning("RDAP error for %s: %s", domain, e)
        return _unknown_drop_status(domain, str(e))
async def batch_check_drops(domains: list[tuple[int, str]], delay: float = 0.5) -> list[tuple[int, DropStatus]]:
    """
    Check a list of domains one after another, pausing between lookups.

    Args:
        domains: List of (id, domain_name) tuples
        delay: Pause in seconds between two consecutive lookups (to avoid
            rate limiting); no pause follows the last one

    Returns:
        List of (id, DropStatus) tuples, in the same order as ``domains``
    """
    import asyncio

    total = len(domains)
    checked: list[tuple[int, DropStatus]] = []

    # Strictly sequential on purpose: parallel requests would trip the
    # registries' rate limiting (429).
    for position, (drop_id, domain) in enumerate(domains, start=1):
        checked.append((drop_id, await check_drop_status(domain)))

        # Pause before the next request, but not after the final one.
        if position < total:
            await asyncio.sleep(delay)

        # Progress line after every 50th domain.
        if position % 50 == 0:
            logger.info(f"Checked {position}/{total} drops...")

    return checked