Some checks failed
CI / Frontend Lint & Type Check (push) Has been cancelled
CI / Frontend Build (push) Has been cancelled
CI / Backend Lint (push) Has been cancelled
CI / Backend Tests (push) Has been cancelled
CI / Docker Build (push) Has been cancelled
CI / Security Scan (push) Has been cancelled
Deploy / Build & Push Images (push) Has been cancelled
Deploy / Deploy to Server (push) Has been cancelled
Deploy / Notify (push) Has been cancelled
730 lines
28 KiB
Python
730 lines
28 KiB
Python
"""
|
|
Advanced Domain Availability Checker

Uses multiple methods for maximum accuracy:
1. RDAP (Registration Data Access Protocol) - Modern, accurate, JSON format
2. Custom RDAP endpoints (for TLDs like .ch, .li with own RDAP servers)
3. DNS lookup - Fast availability check
4. WHOIS - Fallback for TLDs without RDAP

Performance optimized with caching and async operations.
|
|
"""
|
|
import asyncio
|
|
import logging
|
|
from datetime import datetime, timezone
|
|
from dataclasses import dataclass, field
|
|
from typing import Optional
|
|
from functools import lru_cache
|
|
|
|
import dns.resolver
|
|
import whois
|
|
import whodap
|
|
import httpx
|
|
|
|
from app.models.domain import DomainStatus
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@dataclass
class DomainCheckResult:
    """Outcome of one availability lookup for a single domain."""

    domain: str
    status: DomainStatus
    is_available: bool
    registrar: Optional[str] = None
    expiration_date: Optional[datetime] = None
    creation_date: Optional[datetime] = None
    updated_date: Optional[datetime] = None
    name_servers: Optional[list[str]] = None
    error_message: Optional[str] = None
    # Values set in this module: "rdap", "rdap_custom", "whois", "dns";
    # "unknown" when the caller does not specify one.
    check_method: str = "unknown"
    raw_data: Optional[dict] = None

    def to_dict(self) -> dict:
        """
        Build a plain dictionary from this result.

        ``status`` is emitted as ``status.value`` and the three date fields as
        ISO-8601 strings (``None`` when unset). ``raw_data`` is not included.
        """
        def _iso(moment):
            # Falsy (unset) dates are reported as None.
            return moment.isoformat() if moment else None

        payload = {
            "domain": self.domain,
            "status": self.status.value,
            "is_available": self.is_available,
            "registrar": self.registrar,
        }
        for date_field in ("expiration_date", "creation_date", "updated_date"):
            payload[date_field] = _iso(getattr(self, date_field))
        payload["name_servers"] = self.name_servers
        payload["error_message"] = self.error_message
        payload["check_method"] = self.check_method
        return payload
|
|
|
|
|
|
class DomainChecker:
    """
    Advanced domain availability checker.

    Lookup order implemented by ``check_domain``:
    custom RDAP endpoint > RDAP via whodap > WHOIS, with DNS used as a
    cross-check of "available" answers, as the only source in quick mode,
    and as the fallback when a custom RDAP endpoint gives no answer.
    """

    # TLDs known to support RDAP via whodap library
    # NOTE(review): no code visible in this file reads this set; check_domain
    # tries whodap for every TLD that is not in WHOIS_ONLY_TLDS or
    # CUSTOM_RDAP_ENDPOINTS. Confirm whether it is used elsewhere.
    RDAP_SUPPORTED_TLDS = {
        'com', 'net', 'org', 'info', 'biz', 'mobi', 'name', 'pro',
        'app', 'dev', 'page', 'new', 'day', 'eat', 'fly', 'how',
        'io', 'co', 'ai', 'me', 'tv', 'cc', 'ws',
        'xyz', 'top', 'site', 'online', 'tech', 'store', 'club',
        'de', 'uk', 'fr', 'nl', 'eu', 'be', 'at', 'us',
    }

    # TLDs with custom RDAP endpoints (not in whodap but have their own RDAP servers)
    # These registries have their own RDAP APIs that we query directly.
    # The domain name is appended to the URL, so each value must end with "/".
    # Note: 'de' is also listed in RDAP_SUPPORTED_TLDS; check_domain consults
    # this mapping first, so the custom endpoint is the one used for .de.
    CUSTOM_RDAP_ENDPOINTS = {
        'ch': 'https://rdap.nic.ch/domain/',  # Swiss .ch domains (SWITCH)
        'li': 'https://rdap.nic.ch/domain/',  # Liechtenstein .li (same registry)
        'de': 'https://rdap.denic.de/domain/',  # German .de domains (DENIC)
    }

    # TLDs that only support WHOIS (no RDAP at all); check_domain skips the
    # whodap lookup for these and goes straight to WHOIS.
    # Note: .ch and .li removed - they have custom RDAP!
    WHOIS_ONLY_TLDS = {
        'ru', 'su', 'ua', 'by', 'kz',
    }
|
|
|
|
def __init__(self):
    """
    Create the DNS resolver and the cache attributes.

    The resolver is configured with ``timeout = 3`` and ``lifetime = 5``.
    ``_cache`` / ``_cache_ttl`` are only initialised here; no other method
    visible in this file reads them.
    """
    resolver = dns.resolver.Resolver()
    resolver.timeout = 3
    resolver.lifetime = 5
    self._dns_resolver = resolver

    self._cache = {}
    self._cache_ttl = 300  # 5 minutes
|
|
|
|
def _normalize_domain(self, domain: str) -> str:
|
|
"""Normalize domain name."""
|
|
domain = domain.lower().strip()
|
|
if domain.startswith('http://'):
|
|
domain = domain[7:]
|
|
elif domain.startswith('https://'):
|
|
domain = domain[8:]
|
|
if domain.startswith('www.'):
|
|
domain = domain[4:]
|
|
domain = domain.split('/')[0]
|
|
return domain
|
|
|
|
def _get_tld(self, domain: str) -> str:
|
|
"""Extract TLD from domain."""
|
|
parts = domain.split('.')
|
|
return parts[-1].lower() if parts else ''
|
|
|
|
def _get_sld(self, domain: str) -> str:
|
|
"""Extract second-level domain (without TLD)."""
|
|
parts = domain.split('.')
|
|
return parts[0] if parts else domain
|
|
|
|
def _parse_datetime(self, date_str: str) -> Optional[datetime]:
|
|
"""Parse various datetime formats."""
|
|
if not date_str:
|
|
return None
|
|
|
|
# Common formats
|
|
formats = [
|
|
"%Y-%m-%dT%H:%M:%SZ",
|
|
"%Y-%m-%dT%H:%M:%S.%fZ",
|
|
"%Y-%m-%dT%H:%M:%S%z",
|
|
"%Y-%m-%dT%H:%M:%S.%f%z",
|
|
"%Y-%m-%d",
|
|
]
|
|
|
|
for fmt in formats:
|
|
try:
|
|
return datetime.strptime(date_str.replace('+00:00', 'Z').replace('.000Z', 'Z'), fmt)
|
|
except ValueError:
|
|
continue
|
|
|
|
# Try ISO format
|
|
try:
|
|
# Handle formats like "2028-09-14T07:00:00.000+00:00"
|
|
if '+' in date_str:
|
|
date_str = date_str.split('+')[0] + '+00:00'
|
|
return datetime.fromisoformat(date_str.replace('Z', '+00:00'))
|
|
except:
|
|
pass
|
|
|
|
return None
|
|
|
|
async def _check_custom_rdap(self, domain: str) -> Optional[DomainCheckResult]:
    """
    Check a domain against the registry-specific RDAP endpoint for its TLD.

    The endpoint is looked up in ``CUSTOM_RDAP_ENDPOINTS`` and the domain
    is appended to it.

    Args:
        domain: Normalised domain name.

    Returns:
        * an AVAILABLE result when the server answers 404,
        * a TAKEN result (with dates, name servers and registrar parsed
          from the JSON body where present) when it answers 200,
        * ``None`` when the TLD has no custom endpoint, the server answers
          with any other status, the request times out, or any other
          error occurs. Callers treat ``None`` as "no answer".
    """
    tld = self._get_tld(domain)

    endpoint = self.CUSTOM_RDAP_ENDPOINTS.get(tld)
    if endpoint is None:
        return None

    url = f"{endpoint}{domain}"

    try:
        async with httpx.AsyncClient(timeout=10.0) as client:
            response = await client.get(url, follow_redirects=True)

            if response.status_code == 404:
                # Domain not found = available
                return DomainCheckResult(
                    domain=domain,
                    status=DomainStatus.AVAILABLE,
                    is_available=True,
                    check_method="rdap_custom",
                )

            if response.status_code == 200:
                # Domain exists = taken
                data = response.json()

                expiration_date = None
                creation_date = None
                updated_date = None
                registrar = None

                # Parse events - different registries use different event actions
                # SWITCH (.ch/.li): uses "expiration"
                # DENIC (.de): uses "last changed" but no expiration in RDAP (only WHOIS)
                for event in data.get('events', []):
                    # eventAction may be missing or null; treat both as "".
                    action = (event.get('eventAction') or '').lower()
                    date_str = event.get('eventDate', '')

                    # "last update of RDAP database" is the server's refresh
                    # time, not a change to the domain; _check_rdap skips it
                    # too. Without this it matched 'update' below.
                    if 'database' in action:
                        continue

                    # Expiration date - first matching event wins
                    if not expiration_date and any(x in action for x in ('expiration', 'expire')):
                        expiration_date = self._parse_datetime(date_str)

                    # Creation/registration date - first matching event wins
                    if not creation_date and any(x in action for x in ('registration', 'created')):
                        creation_date = self._parse_datetime(date_str)

                    # Update date - last matching event wins
                    if any(x in action for x in ('changed', 'update')):
                        updated_date = self._parse_datetime(date_str)

                # Name servers: lower-cased ldhName of every dict entry that has one
                name_servers = [
                    ns['ldhName'].lower()
                    for ns in data.get('nameservers', [])
                    if isinstance(ns, dict) and ns.get('ldhName')
                ]

                # Registrar: first entity with role "registrar" or "technical"
                # that yields a name (vCard fn/org) or, failing that, a handle.
                for entity in data.get('entities', []):
                    roles = entity.get('roles', [])
                    if not any(r in roles for r in ('registrar', 'technical')):
                        continue

                    # Try vcardArray first
                    vcard = entity.get('vcardArray', [])
                    if isinstance(vcard, list) and len(vcard) > 1:
                        for item in vcard[1]:
                            if isinstance(item, list) and len(item) > 3:
                                if item[0] in ('fn', 'org') and item[3]:
                                    registrar = str(item[3])
                                    break

                    # Try handle as fallback
                    if not registrar:
                        handle = entity.get('handle', '')
                        if handle:
                            registrar = handle

                    if registrar:
                        break

                # For .de domains: DENIC doesn't expose expiration via RDAP.
                # Return what we have; check_domain supplements a missing
                # expiration date from WHOIS.
                if tld == 'de' and not expiration_date:
                    logger.debug(f"No expiration in RDAP for {domain}, will try WHOIS")

                return DomainCheckResult(
                    domain=domain,
                    status=DomainStatus.TAKEN,
                    is_available=False,
                    registrar=registrar,
                    expiration_date=expiration_date,
                    creation_date=creation_date,
                    updated_date=updated_date,
                    name_servers=name_servers if name_servers else None,
                    check_method="rdap_custom",
                )

            # Other status codes - let the caller try a fallback
            logger.warning(f"Custom RDAP returned {response.status_code} for {domain}")
            return None

    except httpx.TimeoutException:
        logger.warning(f"Custom RDAP timeout for {domain}")
        return None
    except Exception as e:
        # Best effort: malformed JSON, network errors etc. mean "no answer".
        logger.warning(f"Custom RDAP error for {domain}: {e}")
        return None
|
|
|
|
async def _check_rdap(self, domain: str) -> Optional[DomainCheckResult]:
    """
    Look the domain up through the whodap library.

    ``whodap.lookup_domain`` is a blocking call, so it runs in the default
    executor. It receives the first label of ``domain`` and its last label.

    Returns:
        * a TAKEN result (registrar and dates parsed where present) when
          the lookup succeeds,
        * an AVAILABLE result when the lookup raises an error whose text
          contains "not found" or "404",
        * ``None`` when whodap raises ``NotImplementedError`` (handled as
          "no RDAP server for this TLD") or any other error.
    """
    tld = self._get_tld(domain)
    sld = self._get_sld(domain)

    try:
        # Run RDAP lookup in thread pool
        loop = asyncio.get_event_loop()
        response = await loop.run_in_executor(None, whodap.lookup_domain, sld, tld)

        expiration_date = creation_date = updated_date = None
        registrar = None

        # Dates come from the response's event list
        for event in response.events or ():
            fields = getattr(event, '__dict__', {})
            action = fields.get('eventAction', '')
            date_str = fields.get('eventDate', '')
            if not (action and date_str):
                continue

            lowered = action.lower()
            if 'expiration' in lowered and not expiration_date:
                expiration_date = self._parse_datetime(date_str)
                continue
            if 'registration' in lowered and not creation_date:
                creation_date = self._parse_datetime(date_str)
                continue
            # Skip the RDAP database refresh timestamp
            if 'changed' in lowered and 'database' not in lowered:
                updated_date = self._parse_datetime(date_str)

        # Registrar name comes from entities carrying the "registrar" role
        for entity in response.entities or ():
            try:
                fields = getattr(entity, '__dict__', {})
                if 'registrar' not in fields.get('roles', []):
                    continue

                vcard = fields.get('vcardArray', [])
                if not (isinstance(vcard, list) and len(vcard) > 1):
                    continue

                for item in vcard[1]:
                    if not (isinstance(item, list) and len(item) > 3):
                        continue
                    kind, value = item[0], item[3]
                    if kind == 'fn' and value:
                        # A full name ends the scan of this vCard
                        registrar = str(value)
                        break
                    if kind == 'org' and value:
                        # An org is kept, but a later fn may still replace it
                        registrar = str(value)
            except Exception:
                # A malformed entity must not abort the whole lookup
                continue

        return DomainCheckResult(
            domain=domain,
            status=DomainStatus.TAKEN,
            is_available=False,
            registrar=registrar,
            expiration_date=expiration_date,
            creation_date=creation_date,
            updated_date=updated_date,
            check_method="rdap",
        )

    except NotImplementedError:
        # No RDAP server for this TLD
        logger.debug(f"No RDAP server for TLD .{tld}")
        return None
    except Exception as e:
        error_msg = str(e).lower()
        # A "not found" style error is read as: the domain is unregistered
        if 'not found' in error_msg or '404' in error_msg:
            return DomainCheckResult(
                domain=domain,
                status=DomainStatus.AVAILABLE,
                is_available=True,
                check_method="rdap",
            )
        logger.warning(f"RDAP check failed for {domain}: {e}")
        return None
|
|
|
|
async def _check_whois(self, domain: str) -> DomainCheckResult:
    """
    Check a domain using WHOIS (fallback method).

    ``whois.whois`` is a blocking call, so it runs in the default executor.
    The domain is reported AVAILABLE when any of these holds:

    1. the parsed record has no ``domain_name``;
    2. the raw response text contains one of the "not found" phrases;
    3. registrar, creation date and name servers are all empty;
    4. the lookup raises an error whose text contains one of the same
       phrases.

    Any other exception produces an ERROR result carrying the exception
    text; this method does not raise. Otherwise a TAKEN result is returned
    with registrar, dates and name servers taken from the record.
    """
    # One phrase list for both the response text and the exception text,
    # so the two paths cannot drift apart.
    not_found_phrases = (
        'no match',
        'not found',
        'no entries',
        'no data',
        'status: free',
        'no entry',
        'we do not have an entry',
        'domain not found',
        'is available',
        'available for registration',
        'no object found',
        'object does not exist',
    )

    def _available():
        return DomainCheckResult(
            domain=domain,
            status=DomainStatus.AVAILABLE,
            is_available=True,
            check_method="whois",
        )

    def _first(value):
        # The whois record may hold either a single value or a list of them.
        return value[0] if isinstance(value, list) else value

    try:
        loop = asyncio.get_running_loop()
        w = await loop.run_in_executor(None, whois.whois, domain)

        # 1. No domain_name returned
        if w.domain_name is None:
            return _available()

        # 2. Check the raw text for "not found" indicators
        raw_text = str(w.text).lower() if hasattr(w, 'text') and w.text else ""
        if any(phrase in raw_text for phrase in not_found_phrases):
            return _available()

        # 3. No registrar, no creation date and no name servers (likely available)
        if not w.registrar and not w.creation_date and not w.name_servers:
            return _available()

        # Extract data
        expiration = _first(w.expiration_date) if w.expiration_date else None
        creation = _first(w.creation_date) if w.creation_date else None

        registrar = None
        if w.registrar:
            registrar = w.registrar if isinstance(w.registrar, str) else str(w.registrar)

        name_servers = None
        if w.name_servers:
            if isinstance(w.name_servers, list):
                name_servers = [str(ns).lower() for ns in w.name_servers]
            else:
                name_servers = [str(w.name_servers).lower()]

        return DomainCheckResult(
            domain=domain,
            status=DomainStatus.TAKEN,
            is_available=False,
            registrar=registrar,
            expiration_date=expiration,
            creation_date=creation,
            name_servers=name_servers,
            check_method="whois",
        )

    except Exception as e:
        # A "domain not found" type error indicates the name is available
        error_str = str(e).lower()
        if any(phrase in error_str for phrase in not_found_phrases):
            return _available()

        # Otherwise it's a real error
        return DomainCheckResult(
            domain=domain,
            status=DomainStatus.ERROR,
            is_available=False,
            error_message=str(e),
            check_method="whois",
        )
|
|
|
|
async def _check_dns(self, domain: str) -> bool:
|
|
"""
|
|
Quick DNS check for domain existence.
|
|
|
|
Returns True if domain appears available (no DNS records).
|
|
"""
|
|
try:
|
|
loop = asyncio.get_event_loop()
|
|
|
|
# Try A record
|
|
try:
|
|
await loop.run_in_executor(
|
|
None,
|
|
lambda: self._dns_resolver.resolve(domain, 'A')
|
|
)
|
|
return False # Has A record = taken
|
|
except (dns.resolver.NXDOMAIN, dns.resolver.NoAnswer, dns.resolver.NoNameservers):
|
|
pass
|
|
|
|
# Try NS record
|
|
try:
|
|
await loop.run_in_executor(
|
|
None,
|
|
lambda: self._dns_resolver.resolve(domain, 'NS')
|
|
)
|
|
return False # Has NS record = taken
|
|
except (dns.resolver.NXDOMAIN, dns.resolver.NoAnswer, dns.resolver.NoNameservers):
|
|
pass
|
|
|
|
return True # No DNS records = likely available
|
|
|
|
except Exception:
|
|
return True # On error, assume might be available
|
|
|
|
async def check_domain(self, domain: str, quick: bool = False) -> DomainCheckResult:
    """
    Check domain availability using the best available method.

    Flow, after normalising the input:

    * empty name or no dot -> ERROR result ("Invalid domain format");
    * ``quick=True`` -> DNS only;
    * TLD has a custom RDAP endpoint -> that endpoint; if it gives no
      answer, DNS only (whodap and WHOIS are not tried for these TLDs);
    * otherwise RDAP via whodap (skipped for WHOIS-only TLDs), then WHOIS.

    Every "available" answer from RDAP or WHOIS is cross-checked with
    DNS and turned into TAKEN if DNS records exist. A TAKEN custom-RDAP
    result without an expiration date is supplemented from WHOIS
    (expiration date, and registrar if missing).

    Args:
        domain: Domain name to check
        quick: If True, only use DNS (faster but less accurate)

    Returns:
        DomainCheckResult with detailed availability info
    """
    domain = self._normalize_domain(domain)

    if not domain or '.' not in domain:
        return DomainCheckResult(
            domain=domain,
            status=DomainStatus.ERROR,
            is_available=False,
            error_message="Invalid domain format",
        )

    async def dns_only_result():
        # Result based purely on whether DNS records exist.
        no_records = await self._check_dns(domain)
        return DomainCheckResult(
            domain=domain,
            status=DomainStatus.AVAILABLE if no_records else DomainStatus.TAKEN,
            is_available=no_records,
            check_method="dns",
        )

    async def demote_if_dns_resolves(result):
        # An "available" answer is overruled when DNS records exist.
        if result.is_available and not await self._check_dns(domain):
            result.status = DomainStatus.TAKEN
            result.is_available = False
        return result

    tld = self._get_tld(domain)

    # Quick DNS check
    if quick:
        return await dns_only_result()

    # Priority 1: custom RDAP endpoints (for .ch, .li, .de etc.)
    if tld in self.CUSTOM_RDAP_ENDPOINTS:
        custom_result = await self._check_custom_rdap(domain)
        if custom_result:
            await demote_if_dns_resolves(custom_result)

            # No expiration date from RDAP: try WHOIS as supplement
            # (DENIC .de doesn't expose expiration via RDAP)
            if not custom_result.is_available and not custom_result.expiration_date:
                try:
                    whois_result = await self._check_whois(domain)
                    if whois_result.expiration_date:
                        custom_result.expiration_date = whois_result.expiration_date
                        logger.debug(f"Got expiration from WHOIS for {domain}: {whois_result.expiration_date}")
                    if not custom_result.registrar and whois_result.registrar:
                        custom_result.registrar = whois_result.registrar
                except Exception as e:
                    logger.debug(f"WHOIS supplement failed for {domain}: {e}")

            return custom_result

        # Custom RDAP gave no answer: whodap and WHOIS are skipped for
        # these TLDs, so DNS is all that is left.
        logger.info(f"Custom RDAP failed for {domain}, using DNS fallback")
        return await dns_only_result()

    # Priority 2: standard RDAP via whodap
    if tld not in self.WHOIS_ONLY_TLDS:
        rdap_result = await self._check_rdap(domain)
        if rdap_result:
            return await demote_if_dns_resolves(rdap_result)

    # Priority 3: WHOIS, validated with DNS
    whois_result = await self._check_whois(domain)
    return await demote_if_dns_resolves(whois_result)
|
|
|
|
async def check_multiple(self, domains: list[str], quick: bool = False) -> list[DomainCheckResult]:
    """
    Check several domains concurrently.

    One ``check_domain`` coroutine is created per entry and all of them are
    awaited together with ``asyncio.gather``; results come back in the
    same order as ``domains``.

    Args:
        domains: List of domain names
        quick: Use quick DNS-only check

    Returns:
        List of DomainCheckResult
    """
    return await asyncio.gather(
        *(self.check_domain(name, quick=quick) for name in domains)
    )
|
|
|
|
def validate_domain(self, domain: str) -> tuple[bool, str]:
    """
    Validate the format of a domain name.

    The input is normalised first. Each dot-separated label is then checked
    in this order: not empty, at most 63 characters, only alphanumerics
    or ``-``, no leading or trailing hyphen. The overall 253-character
    limit is checked last.

    Returns:
        Tuple of (is_valid, error_message); the message is "" when valid.
    """
    domain = self._normalize_domain(domain)

    if not domain:
        return False, "Domain cannot be empty"
    if '.' not in domain:
        return False, "Domain must include TLD (e.g., .com)"

    # (predicate that detects the problem, message to report), in check order
    label_rules = (
        (lambda label: not label,
         "Invalid domain format"),
        (lambda label: len(label) > 63,
         "Domain label too long (max 63 characters)"),
        (lambda label: any(not (ch.isalnum() or ch == '-') for ch in label),
         "Domain contains invalid characters"),
        (lambda label: label.startswith('-') or label.endswith('-'),
         "Domain labels cannot start or end with hyphen"),
    )

    for label in domain.split('.'):
        for is_violated, message in label_rules:
            if is_violated(label):
                return False, message

    if len(domain) > 253:
        return False, "Domain name too long (max 253 characters)"

    return True, ""
|
|
|
|
|
|
# Singleton instance: created once when this module is imported and used by
# check_all_domains() below.
domain_checker = DomainChecker()
|
|
|
|
|
|
async def check_all_domains(db):
    """
    Check availability of all watched domains.

    This is triggered manually from admin panel or by scheduled job.

    Every ``Domain`` row is checked one after another with the module-level
    ``domain_checker``. Its status, availability, ``last_checked`` and
    (when known) expiration date are updated and a ``DomainCheck`` record is
    added. Everything is committed once at the end.

    Args:
        db: Database session; ``execute`` and ``commit`` are awaited and
            ``add`` is called on it (presumably an SQLAlchemy AsyncSession,
            confirm with the caller).

    Returns:
        Dict with counters ``checked``, ``available``, ``taken`` and
        ``errors``. ``errors`` counts both checks that raised and checks
        that completed with ``DomainStatus.ERROR``; the latter are still
        included in ``checked`` but no longer reported as ``taken``.
    """
    from app.models.domain import Domain, DomainCheck
    from sqlalchemy import select

    logger.info("Starting check for all watched domains...")

    # Get all domains
    result = await db.execute(select(Domain))
    domains = result.scalars().all()

    if not domains:
        logger.info("No domains to check")
        return {"checked": 0, "available": 0, "taken": 0, "errors": 0}

    checked = 0
    available = 0
    taken = 0
    errors = 0

    for domain_obj in domains:
        try:
            check_result = await domain_checker.check_domain(domain_obj.domain)

            # Update domain status
            domain_obj.status = check_result.status.value
            domain_obj.is_available = check_result.is_available
            # Naive UTC timestamp, the same value datetime.utcnow() gave,
            # without calling the deprecated function.
            domain_obj.last_checked = datetime.now(timezone.utc).replace(tzinfo=None)

            if check_result.expiration_date:
                domain_obj.expiration_date = check_result.expiration_date

            # Create check record
            domain_check = DomainCheck(
                domain_id=domain_obj.id,
                status=check_result.status.value,
                is_available=check_result.is_available,
                check_method=check_result.check_method,
            )
            db.add(domain_check)

            checked += 1
            if check_result.status == DomainStatus.ERROR:
                # A failed lookup says nothing about availability, so it
                # must not be reported as "taken".
                errors += 1
            elif check_result.is_available:
                available += 1
            else:
                taken += 1

            logger.info(f"Checked {domain_obj.domain}: {check_result.status.value}")

        except Exception as e:
            # One failing domain must not stop the remaining checks
            logger.error(f"Error checking {domain_obj.domain}: {e}")
            errors += 1

    await db.commit()

    logger.info(f"Domain check complete: {checked} checked, {available} available, {taken} taken, {errors} errors")

    return {
        "checked": checked,
        "available": available,
        "taken": taken,
        "errors": errors,
    }
|