pounce/backend/app/services/domain_checker.py
Yves Gugger 7c08e90a56
Some checks failed
Deploy Pounce (Auto) / deploy (push) Has been cancelled
fix: normalize transition timestamps across terminal
Convert timezone-aware datetimes to naive UTC before persisting (prevents Postgres 500s),
add deletion_date migrations, and unify transition countdown + tracked-state across Drops,
Watchlist, and Analyze panel.
2025-12-21 18:14:25 +01:00

842 lines
32 KiB
Python

"""
Advanced Domain Availability Checker
Uses multiple methods for maximum accuracy:
1. RDAP (Registration Data Access Protocol) - Modern, accurate, JSON format
2. Custom RDAP endpoints (for TLDs like .ch, .li with own RDAP servers)
3. DNS lookup - Fast availability check
4. WHOIS - Fallback for TLDs without RDAP
Performance optimized with caching and async operations.
"""
import asyncio
import logging
from datetime import datetime, timezone
from dataclasses import dataclass, field
from typing import Optional
from functools import lru_cache
import dns.resolver
import whois
import whodap
import httpx
from app.models.domain import DomainStatus
from app.services.http_client_pool import get_rdap_http_client
logger = logging.getLogger(__name__)
@dataclass
class DomainCheckResult:
    """Outcome of a single domain availability lookup."""

    domain: str
    status: DomainStatus
    is_available: bool
    registrar: Optional[str] = None
    expiration_date: Optional[datetime] = None
    creation_date: Optional[datetime] = None
    updated_date: Optional[datetime] = None
    name_servers: Optional[list[str]] = None
    error_message: Optional[str] = None
    check_method: str = "unknown"  # rdap, whois, dns
    raw_data: Optional[dict] = None

    def to_dict(self) -> dict:
        """Return a plain-dict view of the result.

        The status is reduced to its ``.value``, dates are rendered with
        ``isoformat()`` (``None`` when unset) and ``raw_data`` is left out.
        """

        def iso_or_none(moment):
            return moment.isoformat() if moment else None

        payload = {
            "domain": self.domain,
            "status": self.status.value,
            "is_available": self.is_available,
            "registrar": self.registrar,
        }
        # Insertion order matches the original literal's key order.
        for date_field in ("expiration_date", "creation_date", "updated_date"):
            payload[date_field] = iso_or_none(getattr(self, date_field))
        payload["name_servers"] = self.name_servers
        payload["error_message"] = self.error_message
        payload["check_method"] = self.check_method
        return payload
class DomainChecker:
    """
    Domain availability checker combining RDAP, WHOIS and DNS lookups.

    check_domain consults, in order: a custom RDAP endpoint (when the TLD has
    one), the rdap.org bootstrap service, WHOIS, and finally DNS.
    """

    # TLDs known to support RDAP via the whodap library.
    # NOTE(review): not referenced by any method in this module - confirm
    # whether external callers rely on it.
    RDAP_SUPPORTED_TLDS = {
        "com", "net", "org", "info", "biz", "mobi", "name", "pro",
        "app", "dev", "page", "new", "day", "eat", "fly", "how",
        "io", "co", "ai", "me", "tv", "cc", "ws",
        "xyz", "top", "site", "online", "tech", "store", "club",
        "de", "uk", "fr", "nl", "eu", "be", "at", "us",
    }

    # TLDs queried directly at the registry's own RDAP server; the domain name
    # is appended to the URL as-is.
    CUSTOM_RDAP_ENDPOINTS = {
        "ch": "https://rdap.nic.ch/domain/",    # Swiss .ch domains (SWITCH)
        "li": "https://rdap.nic.ch/domain/",    # Liechtenstein .li (same registry)
        "de": "https://rdap.denic.de/domain/",  # German .de domains (DENIC)
    }

    # Bootstrap service used for every other TLD (redirects to the registry).
    IANA_BOOTSTRAP_URL = "https://rdap.org/domain/"

    # TLDs for which the RDAP bootstrap is skipped and WHOIS is used instead.
    WHOIS_ONLY_TLDS = {
        "ru", "su", "ua", "by", "kz",
    }
def __init__(self):
    """Create the DNS resolver and the per-instance cache settings."""
    resolver = dns.resolver.Resolver()
    resolver.timeout = 3
    resolver.lifetime = 5
    self._dns_resolver = resolver
    # NOTE(review): the cache and its TTL are stored here but not consulted by
    # any method visible in this module.
    self._cache = {}
    self._cache_ttl = 300  # 5 minutes
def _normalize_domain(self, domain: str) -> str:
    """
    Reduce user input (bare name or URL) to a lower-case host name.

    Strips surrounding whitespace, an http:// or https:// scheme, anything
    from the first '/', '?' or '#' onwards, a ':port' suffix, a trailing
    root dot and a leading 'www.' label.

    Args:
        domain: Raw domain or URL as typed by the user.
    Returns:
        The normalized host name (may be empty for empty input).
    """
    domain = domain.lower().strip()

    for scheme in ('http://', 'https://'):
        if domain.startswith(scheme):
            domain = domain[len(scheme):]
            break

    # Cut at the first path, query or fragment delimiter, then drop a port.
    for delimiter in ('/', '?', '#', ':'):
        domain = domain.split(delimiter, 1)[0]

    # "example.com." (fully-qualified form) is the same name as "example.com".
    domain = domain.rstrip('.')

    # Only treat "www." as a host prefix when a registrable name remains;
    # otherwise "www.ch" would collapse to the bare TLD "ch".
    if domain.startswith('www.') and '.' in domain[4:]:
        domain = domain[4:]

    return domain
def _get_tld(self, domain: str) -> str:
    """Return the lower-cased label after the last dot of *domain*.

    A name without any dot is returned whole (lower-cased); a trailing dot
    yields an empty string.
    """
    _, _, last_label = domain.rpartition('.')
    return last_label.lower()
def _get_sld(self, domain: str) -> str:
    """
    Return the part of *domain* left of its final dot (the name without TLD).

    This is the complement of _get_tld, so f"{sld}.{tld}" rebuilds the
    original name. Returning only the first label would turn
    "sub.example.com" into "sub" and make _check_rdap look up "sub.com".

    Args:
        domain: Normalized domain name.
    Returns:
        Everything before the last dot, or *domain* itself when it has no dot.
    """
    head, dot, _ = domain.rpartition('.')
    return head if dot else domain
def _parse_datetime(self, date_str: str) -> Optional[datetime]:
    """
    Parse the timestamp formats seen in RDAP/WHOIS responses.

    Args:
        date_str: Timestamp text, e.g. "2028-09-14T07:00:00Z".
    Returns:
        The parsed datetime, or None when the value is empty, not a string
        or in an unrecognised format. Values ending in "Z" (or "+00:00") and
        date-only values come back naive; values carrying any other explicit
        UTC offset come back timezone-aware with that offset preserved.
        Callers that persist the value should normalise it themselves.
    """
    if not date_str or not isinstance(date_str, str):
        return None

    text = date_str.strip()

    # Common formats
    formats = (
        "%Y-%m-%dT%H:%M:%SZ",
        "%Y-%m-%dT%H:%M:%S.%fZ",
        "%Y-%m-%dT%H:%M:%S%z",
        "%Y-%m-%dT%H:%M:%S.%f%z",
        "%Y-%m-%d",
    )
    # Fold the two most common UTC spellings into a plain trailing "Z" so
    # that the first format matches them (kept for backward compatibility).
    candidate = text.replace('+00:00', 'Z').replace('.000Z', 'Z')
    for fmt in formats:
        try:
            return datetime.strptime(candidate, fmt)
        except ValueError:
            continue

    # Last resort: ISO 8601 parsing. The offset is kept as given; rewriting
    # it to "+00:00" would silently shift non-UTC timestamps.
    try:
        return datetime.fromisoformat(text.replace('Z', '+00:00'))
    except ValueError:
        return None
async def _check_custom_rdap(self, domain: str) -> Optional[DomainCheckResult]:
    """
    Check a domain against the registry-specific RDAP endpoint for its TLD.

    Args:
        domain: Normalized domain name.
    Returns:
        - AVAILABLE result on HTTP 404,
        - DROPPING_SOON result when the RDAP status list reports a pending
          delete / redemption / pending purge state,
        - TAKEN result for any other HTTP 200 answer,
        - None when the TLD has no custom endpoint, on any other HTTP status,
          on timeout or on any other error (caller should fall back).
        Registered and dropping results both carry whatever registrar, dates
        and nameservers the response exposes.
    """
    tld = self._get_tld(domain)
    endpoint = self.CUSTOM_RDAP_ENDPOINTS.get(tld)
    if endpoint is None:
        return None

    url = f"{endpoint}{domain}"
    try:
        client = await get_rdap_http_client()
        response = await client.get(url, timeout=10.0)

        if response.status_code == 404:
            # Domain not found = available
            return DomainCheckResult(
                domain=domain,
                status=DomainStatus.AVAILABLE,
                is_available=True,
                check_method="rdap_custom",
            )

        if response.status_code != 200:
            # Other status codes - try fallback
            logger.warning(f"Custom RDAP returned {response.status_code} for {domain}")
            return None

        # Domain exists in registry
        data = response.json()

        # Check if domain is pending deletion (dropped but not yet purged)
        domain_status = data.get("status") or []
        pending_delete_statuses = (
            "pending delete",
            "pendingdelete",
            "redemption period",
            "redemptionperiod",
            "pending purge",
            "pendingpurge",
        )
        is_pending_delete = any(
            marker in str(entry).lower()
            for entry in domain_status
            for marker in pending_delete_statuses
        )

        expiration_date = None
        creation_date = None
        updated_date = None
        registrar = None
        name_servers: list[str] = []

        # Parse events. "or" guards keep JSON nulls from raising and
        # discarding the whole result.
        for event in data.get("events") or []:
            if not isinstance(event, dict):
                continue
            action = str(event.get("eventAction") or "").lower()
            date_str = event.get("eventDate") or ""
            if not isinstance(date_str, str):
                continue
            if not expiration_date and any(x in action for x in ("expiration", "expire")):
                expiration_date = self._parse_datetime(date_str)
            if not creation_date and any(x in action for x in ("registration", "created")):
                creation_date = self._parse_datetime(date_str)
            if any(x in action for x in ("changed", "update")):
                updated_date = self._parse_datetime(date_str)

        # Parse nameservers
        for ns in data.get("nameservers") or []:
            if isinstance(ns, dict):
                ns_name = ns.get("ldhName") or ""
                if ns_name:
                    name_servers.append(str(ns_name).lower())

        # Parse registrar from entities: first entity with a registrar or
        # technical role wins; its vCard "fn"/"org" value is preferred over
        # the bare handle.
        for entity in data.get("entities") or []:
            if not isinstance(entity, dict):
                continue
            roles = entity.get("roles") or []
            if any(r in roles for r in ("registrar", "technical")):
                vcard = entity.get("vcardArray") or []
                if isinstance(vcard, list) and len(vcard) > 1:
                    for item in vcard[1]:
                        if isinstance(item, list) and len(item) > 3:
                            if item[0] in ("fn", "org") and item[3]:
                                registrar = str(item[3])
                                break
                if not registrar:
                    handle = entity.get("handle") or ""
                    if handle:
                        registrar = handle
            if registrar:
                break

        if is_pending_delete:
            logger.info(
                f"{domain} is in transition/pending delete (status: {domain_status})"
            )
            # Carry the parsed details, as _check_rdap_iana does, so callers
            # need no extra lookup for data the registry already sent.
            return DomainCheckResult(
                domain=domain,
                status=DomainStatus.DROPPING_SOON,  # In transition, not yet available
                is_available=False,  # Not yet registrable
                registrar=registrar,
                expiration_date=expiration_date,
                creation_date=creation_date,
                updated_date=updated_date,
                name_servers=name_servers if name_servers else None,
                check_method="rdap_custom",
                raw_data={"rdap_status": domain_status, "note": "pending_delete"},
            )

        # For .de domains: DENIC doesn't expose expiration via RDAP
        if tld == "de" and not expiration_date:
            logger.debug(f"No expiration in RDAP for {domain}, will try WHOIS")

        return DomainCheckResult(
            domain=domain,
            status=DomainStatus.TAKEN,
            is_available=False,
            registrar=registrar,
            expiration_date=expiration_date,
            creation_date=creation_date,
            updated_date=updated_date,
            name_servers=name_servers if name_servers else None,
            check_method="rdap_custom",
        )
    except httpx.TimeoutException:
        logger.warning(f"Custom RDAP timeout for {domain}")
        return None
    except Exception as e:
        # Best effort: any failure means "no answer", so the caller falls back.
        logger.warning(f"Custom RDAP error for {domain}: {e}")
        return None
async def _check_rdap_iana(self, domain: str) -> Optional[DomainCheckResult]:
    """
    Check a domain through the RDAP bootstrap service at IANA_BOOTSTRAP_URL.

    Args:
        domain: Normalized domain name.
    Returns:
        - AVAILABLE result on HTTP 404,
        - DROPPING_SOON result when the status list reports pending delete or
          redemption period,
        - TAKEN result for any other HTTP 200 answer,
        - None on rate limiting (429), any other status, timeout or error.

    NOTE(review): the bootstrap service answers with a redirect to the
    authoritative registry; this only works if the pooled client follows
    redirects - confirm in http_client_pool.
    """
    url = f"{self.IANA_BOOTSTRAP_URL}{domain}"
    try:
        client = await get_rdap_http_client()
        response = await client.get(url, timeout=15.0)

        if response.status_code == 404:
            return DomainCheckResult(
                domain=domain,
                status=DomainStatus.AVAILABLE,
                is_available=True,
                check_method="rdap_iana",
            )
        if response.status_code == 429:
            logger.warning(f"RDAP rate limited for {domain}")
            return None
        if response.status_code != 200:
            return None

        data = response.json()

        # Parse events for dates. RDAP also defines "registrar expiration"
        # and "reregistration" actions, which contain the substrings matched
        # here. An exact "expiration"/"registration" event always wins; the
        # looser match is only a fallback when nothing has been found yet.
        expiration_date = None
        creation_date = None
        for event in data.get('events') or []:
            if not isinstance(event, dict):
                continue
            action = str(event.get('eventAction') or '').lower()
            date_str = event.get('eventDate') or ''
            if not date_str:
                continue
            if 'expiration' in action:
                if action == 'expiration' or expiration_date is None:
                    expiration_date = self._parse_datetime(date_str) or expiration_date
            elif 'registration' in action:
                if action == 'registration' or creation_date is None:
                    creation_date = self._parse_datetime(date_str) or creation_date

        # Extract registrar (vCard "fn" of an entity with the registrar role)
        registrar = None
        for entity in data.get('entities') or []:
            if not isinstance(entity, dict):
                continue
            if 'registrar' in (entity.get('roles') or []):
                vcard = entity.get('vcardArray') or []
                if isinstance(vcard, list) and len(vcard) > 1:
                    for item in vcard[1]:
                        if isinstance(item, list) and len(item) > 3:
                            if item[0] == 'fn' and item[3]:
                                registrar = str(item[3])
                                break

        # Check status for pending delete
        status_list = data.get('status') or []
        status_str = ' '.join(str(s).lower() for s in status_list)
        is_dropping = any(x in status_str for x in (
            'pending delete', 'pendingdelete',
            'redemption period', 'redemptionperiod',
        ))

        return DomainCheckResult(
            domain=domain,
            status=DomainStatus.DROPPING_SOON if is_dropping else DomainStatus.TAKEN,
            is_available=False,
            registrar=registrar,
            expiration_date=expiration_date,
            creation_date=creation_date,
            check_method="rdap_iana",
        )
    except httpx.TimeoutException:
        logger.debug(f"IANA RDAP timeout for {domain}")
        return None
    except Exception as e:
        # Best effort: any failure means "no answer", so the caller falls back.
        logger.debug(f"IANA RDAP error for {domain}: {e}")
        return None
async def _check_rdap(self, domain: str) -> Optional[DomainCheckResult]:
    """
    Look up *domain* over RDAP using the whodap library.

    Returns a TAKEN result when the lookup succeeds, an AVAILABLE result when
    the library's error text mentions "not found" or "404", and None when the
    TLD has no RDAP server or the lookup fails for any other reason.
    """
    tld = self._get_tld(domain)
    sld = self._get_sld(domain)
    try:
        # whodap's lookup is blocking, so it runs in the default executor.
        loop = asyncio.get_event_loop()
        response = await loop.run_in_executor(None, whodap.lookup_domain, sld, tld)

        expiration_date = None
        creation_date = None
        updated_date = None
        registrar = None

        # Dates come from the response's event list.
        for event in response.events or []:
            fields = getattr(event, '__dict__', {})
            action = fields.get('eventAction', '')
            date_str = fields.get('eventDate', '')
            if not (action and date_str):
                continue
            kind = action.lower()
            if 'expiration' in kind and not expiration_date:
                expiration_date = self._parse_datetime(date_str)
            elif 'registration' in kind and not creation_date:
                creation_date = self._parse_datetime(date_str)
            elif 'changed' in kind and 'database' not in kind:
                updated_date = self._parse_datetime(date_str)

        # Registrar name comes from the vCard of entities with that role;
        # an "fn" entry ends the scan, an "org" entry is kept but may still
        # be replaced by a later "fn".
        for entity in response.entities or []:
            try:
                fields = getattr(entity, '__dict__', {})
                if 'registrar' not in fields.get('roles', []):
                    continue
                vcard = fields.get('vcardArray', [])
                if not (isinstance(vcard, list) and len(vcard) > 1):
                    continue
                for item in vcard[1]:
                    if not (isinstance(item, list) and len(item) > 3):
                        continue
                    if item[0] == 'fn' and item[3]:
                        registrar = str(item[3])
                        break
                    elif item[0] == 'org' and item[3]:
                        registrar = str(item[3])
            except Exception:
                continue

        return DomainCheckResult(
            domain=domain,
            status=DomainStatus.TAKEN,
            is_available=False,
            registrar=registrar,
            expiration_date=expiration_date,
            creation_date=creation_date,
            updated_date=updated_date,
            check_method="rdap",
        )
    except NotImplementedError:
        logger.debug(f"No RDAP server for TLD .{tld}")
        return None
    except Exception as e:
        error_msg = str(e).lower()
        if 'not found' in error_msg or '404' in error_msg:
            return DomainCheckResult(
                domain=domain,
                status=DomainStatus.AVAILABLE,
                is_available=True,
                check_method="rdap",
            )
        logger.debug(f"RDAP check failed for {domain}: {e}")
        return None
async def _check_whois(self, domain: str) -> DomainCheckResult:
    """
    Check a domain using WHOIS (fallback method).

    Args:
        domain: Normalized domain name.
    Returns:
        Always a DomainCheckResult, never raises:
        - AVAILABLE when the record has no domain_name, when the raw text or
          the raised error contains a "not found" phrase, or when registrar,
          creation date and nameservers are all missing,
        - TAKEN with the extracted registrar, dates and nameservers otherwise,
        - ERROR (with error_message set) for any other failure.
    """
    # Phrases registries use for "no such domain"; matched case-insensitively
    # against both the raw WHOIS text and exception messages.
    not_found_indicators = (
        'no match',
        'not found',
        'no entries',
        'no data',
        'status: free',
        'no entry',
        'we do not have an entry',
        'domain not found',
        'is available',
        'available for registration',
        'no object found',
        'object does not exist',
    )

    def available() -> DomainCheckResult:
        return DomainCheckResult(
            domain=domain,
            status=DomainStatus.AVAILABLE,
            is_available=True,
            check_method="whois",
        )

    def first_datetime(value) -> Optional[datetime]:
        """Return the first usable datetime from a scalar or list field."""
        # Fields may be a list or a single value. Strings are parsed and
        # anything else is dropped, so only datetimes reach the result and
        # to_dict()'s .isoformat() cannot fail on them.
        candidates = value if isinstance(value, (list, tuple)) else [value]
        for candidate in candidates:
            if isinstance(candidate, datetime):
                return candidate
            if isinstance(candidate, str):
                parsed = self._parse_datetime(candidate)
                if parsed is not None:
                    return parsed
        return None

    try:
        # python-whois is blocking, so it runs in the default executor.
        loop = asyncio.get_running_loop()
        w = await loop.run_in_executor(None, whois.whois, domain)

        # 1. No domain_name returned
        if w.domain_name is None:
            return available()

        # 2. Check the raw text for "not found" indicators
        raw_text = str(w.text).lower() if hasattr(w, 'text') and w.text else ""
        if any(indicator in raw_text for indicator in not_found_indicators):
            return available()

        # 3. No registrar, creation date or nameservers (likely available)
        if not w.registrar and not w.creation_date and not w.name_servers:
            return available()

        registrar = None
        if w.registrar:
            registrar = w.registrar if isinstance(w.registrar, str) else str(w.registrar)

        name_servers = None
        if w.name_servers:
            if isinstance(w.name_servers, list):
                name_servers = [str(ns).lower() for ns in w.name_servers]
            else:
                name_servers = [str(w.name_servers).lower()]

        return DomainCheckResult(
            domain=domain,
            status=DomainStatus.TAKEN,
            is_available=False,
            registrar=registrar,
            expiration_date=first_datetime(w.expiration_date),
            creation_date=first_datetime(w.creation_date),
            name_servers=name_servers,
            check_method="whois",
        )
    except Exception as e:
        # A "domain not found" type error indicates an available domain.
        error_str = str(e).lower()
        if any(phrase in error_str for phrase in not_found_indicators):
            return available()
        # Otherwise it's a real error
        return DomainCheckResult(
            domain=domain,
            status=DomainStatus.ERROR,
            is_available=False,
            error_message=str(e),
            check_method="whois",
        )
async def _check_dns(self, domain: str) -> bool:
    """
    Probe DNS for *domain* and report whether it looks unregistered.

    Returns False as soon as an A or NS lookup succeeds. Returns True when
    both lookups end in NXDOMAIN / NoAnswer / NoNameservers, and also when
    any other error occurs (the domain is then assumed possibly available).
    """
    no_record_errors = (
        dns.resolver.NXDOMAIN,
        dns.resolver.NoAnswer,
        dns.resolver.NoNameservers,
    )
    try:
        loop = asyncio.get_event_loop()
        # A first, then NS; the resolver is blocking, so each query runs in
        # the default executor.
        for record_type in ('A', 'NS'):
            try:
                await loop.run_in_executor(
                    None, self._dns_resolver.resolve, domain, record_type
                )
            except no_record_errors:
                continue
            return False  # Record found = taken
        return True  # No DNS records = likely available
    except Exception:
        return True  # On error, assume might be available
async def check_domain(self, domain: str, quick: bool = False) -> DomainCheckResult:
    """
    Check domain availability using the best available method.

    Order of attempts (after normalizing the name):
    1. Custom RDAP endpoint, when the TLD has one; if that yields nothing,
       the answer comes from DNS alone.
    2. Bootstrap RDAP, unless the TLD is listed as WHOIS-only.
    3. WHOIS.
    4. DNS-only check as the last resort.
    Any "available" answer from steps 1-3 is downgraded to TAKEN when DNS
    still resolves the name.

    Args:
        domain: Domain name to check
        quick: If True, only use DNS (faster but less accurate)
    Returns:
        DomainCheckResult with detailed availability info; an ERROR result
        with "Invalid domain format" when the name is empty or has no dot.
    """
    domain = self._normalize_domain(domain)
    if not domain or '.' not in domain:
        return DomainCheckResult(
            domain=domain,
            status=DomainStatus.ERROR,
            is_available=False,
            error_message="Invalid domain format",
        )

    tld = self._get_tld(domain)

    async def dns_only_result() -> DomainCheckResult:
        # Verdict based purely on whether DNS records exist.
        dns_available = await self._check_dns(domain)
        return DomainCheckResult(
            domain=domain,
            status=DomainStatus.AVAILABLE if dns_available else DomainStatus.TAKEN,
            is_available=dns_available,
            check_method="dns",
        )

    async def confirm_with_dns(result: DomainCheckResult) -> DomainCheckResult:
        # An "available" answer is only trusted when DNS agrees.
        if result.is_available and not await self._check_dns(domain):
            result.status = DomainStatus.TAKEN
            result.is_available = False
        return result

    # Quick DNS check
    if quick:
        return await dns_only_result()

    # Priority 1: custom RDAP endpoints (.ch, .li, .de)
    if tld in self.CUSTOM_RDAP_ENDPOINTS:
        custom_result = await self._check_custom_rdap(domain)
        if custom_result:
            await confirm_with_dns(custom_result)
            # If no expiration date from RDAP, try WHOIS as supplement
            # (DENIC .de doesn't expose expiration via RDAP)
            if not custom_result.is_available and not custom_result.expiration_date:
                try:
                    whois_result = await self._check_whois(domain)
                    if whois_result.expiration_date:
                        custom_result.expiration_date = whois_result.expiration_date
                        logger.debug(f"Got expiration from WHOIS for {domain}: {whois_result.expiration_date}")
                    if not custom_result.registrar and whois_result.registrar:
                        custom_result.registrar = whois_result.registrar
                except Exception as e:
                    logger.debug(f"WHOIS supplement failed for {domain}: {e}")
            return custom_result
        # Custom-endpoint TLDs skip bootstrap RDAP and WHOIS entirely.
        logger.info(f"Custom RDAP failed for {domain}, using DNS fallback")
        return await dns_only_result()

    # Priority 2: bootstrap RDAP
    if tld not in self.WHOIS_ONLY_TLDS:
        iana_result = await self._check_rdap_iana(domain)
        if iana_result:
            return await confirm_with_dns(iana_result)

    # Priority 3: WHOIS
    try:
        whois_result = await self._check_whois(domain)
        return await confirm_with_dns(whois_result)
    except Exception as e:
        logger.debug(f"WHOIS failed for {domain}: {e}")

    # Final fallback: DNS-only check
    return await dns_only_result()
async def check_multiple(self, domains: list[str], quick: bool = False) -> list[DomainCheckResult]:
    """
    Run check_domain for every entry of *domains* concurrently.

    Args:
        domains: List of domain names
        quick: Use quick DNS-only check
    Returns:
        List of DomainCheckResult, in the same order as *domains*
    """
    pending = (self.check_domain(name, quick=quick) for name in domains)
    return await asyncio.gather(*pending)
def validate_domain(self, domain: str) -> tuple[bool, str]:
    """
    Check the syntax of a domain name after normalizing it.

    Returns:
        Tuple of (is_valid, error_message); the message is empty when valid.
    """
    domain = self._normalize_domain(domain)
    if not domain:
        return False, "Domain cannot be empty"
    if '.' not in domain:
        return False, "Domain must include TLD (e.g., .com)"

    for label in domain.split('.'):
        if not label:
            return False, "Invalid domain format"
        if len(label) > 63:
            return False, "Domain label too long (max 63 characters)"
        if any(not (ch.isalnum() or ch == '-') for ch in label):
            return False, "Domain contains invalid characters"
        # label is non-empty here, so indexing both ends is safe.
        if label[0] == '-' or label[-1] == '-':
            return False, "Domain labels cannot start or end with hyphen"

    # Overall length is checked only after every label has passed.
    if len(domain) > 253:
        return False, "Domain name too long (max 253 characters)"
    return True, ""
# Singleton instance, created when this module is imported; check_all_domains
# in this module runs every lookup through it.
domain_checker = DomainChecker()
async def check_all_domains(db):
    """
    Check availability of all watched domains.

    This is triggered manually from admin panel or by scheduled job.

    Every Domain row is checked through the module-level domain_checker; its
    status fields are updated and a DomainCheck history row is added. All
    changes are committed once at the end.

    Args:
        db: Async database session (must support execute/add/commit).
    Returns:
        Dict with the counters "checked", "available", "taken" and "errors".
        "errors" counts both checks that raised and checks whose result
        status is DomainStatus.ERROR; the latter are still included in
        "checked" but no longer reported as "taken".
    """
    from app.models.domain import Domain, DomainCheck
    from app.utils.datetime import to_naive_utc
    from sqlalchemy import select

    logger.info("Starting check for all watched domains...")

    # Get all domains
    result = await db.execute(select(Domain))
    domains = result.scalars().all()
    if not domains:
        logger.info("No domains to check")
        return {"checked": 0, "available": 0, "taken": 0, "errors": 0}

    checked = 0
    available = 0
    taken = 0
    errors = 0

    for domain_obj in domains:
        try:
            check_result = await domain_checker.check_domain(domain_obj.name)

            # Naive UTC, same value datetime.utcnow() produced, without the
            # call that is deprecated since Python 3.12. One timestamp is
            # shared by the domain row and its history record.
            checked_at = datetime.now(timezone.utc).replace(tzinfo=None)

            # Update domain status
            domain_obj.status = check_result.status
            domain_obj.is_available = check_result.is_available
            domain_obj.last_checked = checked_at
            domain_obj.last_check_method = check_result.check_method
            if check_result.expiration_date:
                domain_obj.expiration_date = to_naive_utc(check_result.expiration_date)

            # Create check record
            db.add(
                DomainCheck(
                    domain_id=domain_obj.id,
                    status=check_result.status,
                    is_available=check_result.is_available,
                    response_data=str(check_result.to_dict()),
                    checked_at=checked_at,
                )
            )

            checked += 1
            if check_result.status == DomainStatus.ERROR:
                # A failed lookup says nothing about registration state.
                errors += 1
            elif check_result.is_available:
                available += 1
            else:
                taken += 1
            logger.info(f"Checked {domain_obj.name}: {check_result.status.value}")
        except Exception as e:
            # Best effort: one failing domain must not abort the whole run.
            logger.error(f"Error checking {domain_obj.name}: {e}")
            errors += 1

    await db.commit()
    logger.info(f"Domain check complete: {checked} checked, {available} available, {taken} taken, {errors} errors")
    return {
        "checked": checked,
        "available": available,
        "taken": taken,
        "errors": errors,
    }