""" đŸĨ POUNCE DOMAIN HEALTH ENGINE Advanced domain health analysis for premium intelligence. Implements 4-layer analysis from analysis_2.md: 1. DNS Layer - Infrastructure check (nameservers, MX, A records) 2. HTTP Layer - Website availability (status codes, content, parking detection) 3. SSL Layer - Certificate validity 4. WHOIS/RDAP Layer - Registration status Output: Health Score (HEALTHY, WEAKENING, PARKED, CRITICAL) """ import asyncio import logging import ssl import socket import re import ipaddress from datetime import datetime, timezone, timedelta from dataclasses import dataclass, field from typing import Optional, List, Dict, Any from enum import Enum import httpx import dns.resolver import dns.exception logger = logging.getLogger(__name__) class HealthStatus(str, Enum): """Domain health status levels.""" HEALTHY = "healthy" # đŸŸĸ All systems go WEAKENING = "weakening" # 🟡 Warning signs detected PARKED = "parked" # 🟠 Domain for sale/parked CRITICAL = "critical" # 🔴 Drop imminent UNKNOWN = "unknown" # ❓ Could not determine @dataclass class DNSCheckResult: """Results from DNS layer check.""" has_nameservers: bool = False nameservers: List[str] = field(default_factory=list) has_mx_records: bool = False mx_records: List[str] = field(default_factory=list) has_a_record: bool = False a_records: List[str] = field(default_factory=list) is_parking_ns: bool = False # Nameservers point to parking service error: Optional[str] = None @dataclass class HTTPCheckResult: """Results from HTTP layer check.""" status_code: Optional[int] = None is_reachable: bool = False content_length: int = 0 is_parked: bool = False parking_signals: List[str] = field(default_factory=list) redirect_url: Optional[str] = None response_time_ms: Optional[float] = None error: Optional[str] = None @dataclass class SSLCheckResult: """Results from SSL layer check.""" has_ssl: bool = False is_valid: bool = False expires_at: Optional[datetime] = None days_until_expiry: Optional[int] = None issuer: Optional[str] = None is_expired: bool = False error: Optional[str] = None @dataclass class DomainHealthReport: """Complete health report for a domain.""" domain: str status: HealthStatus score: int # 0-100 # Layer results dns: Optional[DNSCheckResult] = None http: Optional[HTTPCheckResult] = None ssl: Optional[SSLCheckResult] = None # Summary signals: List[str] = field(default_factory=list) recommendations: List[str] = field(default_factory=list) # Metadata checked_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc)) def to_dict(self) -> Dict[str, Any]: """Convert to dictionary for API response.""" return { "domain": self.domain, "status": self.status.value, "score": self.score, "signals": self.signals, "recommendations": self.recommendations, "checked_at": self.checked_at.isoformat(), # Flat structure for frontend compatibility "dns": { "has_ns": self.dns.has_nameservers if self.dns else False, "has_a": self.dns.has_a_record if self.dns else False, "has_mx": self.dns.has_mx_records if self.dns else False, "nameservers": self.dns.nameservers if self.dns else [], "is_parked": self.dns.is_parking_ns if self.dns else False, "parking_provider": None, # Could be enhanced later "error": self.dns.error if self.dns else None, } if self.dns else { "has_ns": False, "has_a": False, "has_mx": False, "nameservers": [], "is_parked": False, "error": None }, "http": { "is_reachable": self.http.is_reachable if self.http else False, "status_code": self.http.status_code if self.http else None, "is_parked": self.http.is_parked if self.http else False, "parking_keywords": self.http.parking_signals if self.http else [], "content_length": self.http.content_length if self.http else 0, "error": self.http.error if self.http else None, } if self.http else { "is_reachable": False, "status_code": None, "is_parked": False, "parking_keywords": [], "content_length": 0, "error": None }, "ssl": { "has_certificate": self.ssl.has_ssl if self.ssl else False, "is_valid": self.ssl.is_valid if self.ssl else False, "expires_at": self.ssl.expires_at.isoformat() if self.ssl and self.ssl.expires_at else None, "days_until_expiry": self.ssl.days_until_expiry if self.ssl else None, "issuer": self.ssl.issuer if self.ssl else None, "error": self.ssl.error if self.ssl else None, } if self.ssl else { "has_certificate": False, "is_valid": False, "expires_at": None, "days_until_expiry": None, "issuer": None, "error": None }, } class DomainHealthChecker: """ Premium domain health analysis engine. Checks 4 layers to determine domain health: 1. DNS: Is the infrastructure alive? 2. HTTP: Is the website running? 3. SSL: Is the certificate valid? 4. (WHOIS handled by existing DomainChecker) """ # Known parking/for-sale service nameservers PARKING_NAMESERVERS = { 'sedoparking.com', 'afternic.com', 'domaincontrol.com', 'parkingcrew.net', 'bodis.com', 'dsredirection.com', 'above.com', 'domainsponsor.com', 'fastpark.net', 'parkdomain.com', 'domainmarket.com', 'hugedomains.com', } # Keywords indicating parked/for-sale pages PARKING_KEYWORDS = [ 'domain is for sale', 'buy this domain', 'inquire now', 'make an offer', 'domain zum verkauf', 'domain for sale', 'this domain is parked', 'parked by', 'related links', 'sponsored listings', 'domain parking', 'this website is for sale', 'purchase this domain', 'acquire this domain', ] def __init__(self): self._dns_resolver = dns.resolver.Resolver() self._dns_resolver.timeout = 3 self._dns_resolver.lifetime = 5 def _is_public_ip(self, ip: str) -> bool: try: addr = ipaddress.ip_address(ip) return bool(getattr(addr, "is_global", False)) except Exception: return False async def _ssrf_guard(self, domain: str) -> tuple[bool, str | None]: """ SSRF hardening for HTTP/SSL probes. We block domains that resolve exclusively to non-public IPs. """ loop = asyncio.get_event_loop() def _resolve_ips() -> list[str]: ips: list[str] = [] try: a = self._dns_resolver.resolve(domain, "A") ips.extend([str(r.address) for r in a]) except Exception: pass try: aaaa = self._dns_resolver.resolve(domain, "AAAA") ips.extend([str(r.address) for r in aaaa]) except Exception: pass # de-dup return list(dict.fromkeys([i.strip() for i in ips if i])) ips = await loop.run_in_executor(None, _resolve_ips) if not ips: return True, None # nothing to block; will fail naturally if unreachable if any(self._is_public_ip(ip) for ip in ips): return True, None return False, f"blocked_ssrf: resolved_non_public_ips={ips}" async def check_domain(self, domain: str) -> DomainHealthReport: """ Perform comprehensive health check on a domain. Args: domain: Domain name to check (e.g., "example.com") Returns: DomainHealthReport with status, score, and detailed results """ domain = self._normalize_domain(domain) logger.info(f"đŸĨ Starting health check for: {domain}") # Run all checks concurrently dns_task = asyncio.create_task(self._check_dns(domain)) http_task = asyncio.create_task(self._check_http(domain)) ssl_task = asyncio.create_task(self._check_ssl(domain)) dns_result, http_result, ssl_result = await asyncio.gather( dns_task, http_task, ssl_task, return_exceptions=True ) # Handle exceptions if isinstance(dns_result, Exception): logger.warning(f"DNS check failed: {dns_result}") dns_result = DNSCheckResult(error=str(dns_result)) if isinstance(http_result, Exception): logger.warning(f"HTTP check failed: {http_result}") http_result = HTTPCheckResult(error=str(http_result)) if isinstance(ssl_result, Exception): logger.warning(f"SSL check failed: {ssl_result}") ssl_result = SSLCheckResult(error=str(ssl_result)) # Calculate health score and status report = self._calculate_health(domain, dns_result, http_result, ssl_result) logger.info(f"✅ Health check complete: {domain} = {report.status.value} ({report.score}/100)") return report def _normalize_domain(self, domain: str) -> str: """Normalize domain name.""" domain = domain.lower().strip() if domain.startswith('http://'): domain = domain[7:] elif domain.startswith('https://'): domain = domain[8:] if domain.startswith('www.'): domain = domain[4:] domain = domain.split('/')[0] return domain async def _check_dns(self, domain: str) -> DNSCheckResult: """ Layer 1: DNS Infrastructure Check Checks: - NS records (nameservers) - MX records (mail) - A records (IP address) """ result = DNSCheckResult() loop = asyncio.get_event_loop() # Check NS records try: ns_answers = await loop.run_in_executor( None, lambda: self._dns_resolver.resolve(domain, 'NS') ) result.nameservers = [str(rdata.target).rstrip('.').lower() for rdata in ns_answers] result.has_nameservers = len(result.nameservers) > 0 # Check if nameservers point to parking service for ns in result.nameservers: for parking_ns in self.PARKING_NAMESERVERS: if parking_ns in ns: result.is_parking_ns = True break except (dns.resolver.NXDOMAIN, dns.resolver.NoAnswer, dns.exception.Timeout): result.has_nameservers = False except Exception as e: result.error = str(e) # Check MX records try: mx_answers = await loop.run_in_executor( None, lambda: self._dns_resolver.resolve(domain, 'MX') ) result.mx_records = [str(rdata.exchange).rstrip('.').lower() for rdata in mx_answers] result.has_mx_records = len(result.mx_records) > 0 except (dns.resolver.NXDOMAIN, dns.resolver.NoAnswer, dns.exception.Timeout): result.has_mx_records = False except Exception: pass # Check A records try: a_answers = await loop.run_in_executor( None, lambda: self._dns_resolver.resolve(domain, 'A') ) result.a_records = [str(rdata.address) for rdata in a_answers] result.has_a_record = len(result.a_records) > 0 # Check for dead IPs (0.0.0.0 or 127.0.0.1) dead_ips = {'0.0.0.0', '127.0.0.1'} if all(ip in dead_ips for ip in result.a_records): result.has_a_record = False except (dns.resolver.NXDOMAIN, dns.resolver.NoAnswer, dns.exception.Timeout): result.has_a_record = False except Exception: pass return result async def _check_http(self, domain: str) -> HTTPCheckResult: """ Layer 2: HTTP Website Check Checks: - HTTP status code - Response content - Parking/for-sale detection """ result = HTTPCheckResult() allowed, reason = await self._ssrf_guard(domain) if not allowed: result.error = reason return result async with httpx.AsyncClient( timeout=10.0, follow_redirects=False, headers={ "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36" } ) as client: for scheme in ['https', 'http']: url = f"{scheme}://{domain}" try: start = asyncio.get_event_loop().time() # Follow redirects manually with host/IP guard current_url = url for _ in range(0, 5): response = await client.get(current_url) if response.status_code in (301, 302, 303, 307, 308) and response.headers.get("location"): next_url = str(httpx.URL(current_url).join(response.headers["location"])) next_host = httpx.URL(next_url).host if not next_host: break ok, why = await self._ssrf_guard(next_host) if not ok: result.error = why return result current_url = next_url continue break end = asyncio.get_event_loop().time() result.status_code = response.status_code result.is_reachable = response.status_code < 500 result.content_length = len(response.content) result.response_time_ms = (end - start) * 1000 # Check for redirects if str(response.url) != url: result.redirect_url = str(response.url) # Check for parking keywords in content content = response.text.lower() for keyword in self.PARKING_KEYWORDS: if keyword in content: result.is_parked = True result.parking_signals.append(keyword) break # Success, no need to try other scheme except httpx.TimeoutException: result.error = "timeout" except httpx.ConnectError: result.error = "connection_refused" except Exception as e: result.error = str(e) return result async def _check_ssl(self, domain: str) -> SSLCheckResult: """ Layer 3: SSL Certificate Check Checks: - Certificate exists - Certificate validity - Expiration date Uses two-stage approach: 1. Try with full validation 2. On validation failure, extract cert info without validation """ result = SSLCheckResult() allowed, reason = await self._ssrf_guard(domain) if not allowed: result.error = reason return result loop = asyncio.get_event_loop() try: def get_ssl_info_validated(): """Try to get SSL info with full certificate validation.""" context = ssl.create_default_context() with socket.create_connection((domain, 443), timeout=5) as sock: with context.wrap_socket(sock, server_hostname=domain) as ssock: cert = ssock.getpeercert() return cert, True # cert, validated def get_ssl_info_unvalidated(): """Get SSL info without certificate validation (fallback).""" context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) context.check_hostname = False context.verify_mode = ssl.CERT_NONE with socket.create_connection((domain, 443), timeout=5) as sock: with context.wrap_socket(sock, server_hostname=domain) as ssock: # Get certificate in DER format and decode cert_der = ssock.getpeercert(binary_form=True) cert_pem = ssock.getpeercert() # This returns None when verify_mode=CERT_NONE # Use cryptography library if available, otherwise use openssl try: from cryptography import x509 from cryptography.hazmat.backends import default_backend cert_obj = x509.load_der_x509_certificate(cert_der, default_backend()) return { 'notAfter': cert_obj.not_valid_after_utc.strftime('%b %d %H:%M:%S %Y GMT'), 'notBefore': cert_obj.not_valid_before_utc.strftime('%b %d %H:%M:%S %Y GMT'), 'issuer': [(('organizationName', cert_obj.issuer.get_attributes_for_oid(x509.oid.NameOID.ORGANIZATION_NAME)[0].value if cert_obj.issuer.get_attributes_for_oid(x509.oid.NameOID.ORGANIZATION_NAME) else 'Unknown'),)] }, False # cert, not validated except ImportError: # Fallback: basic info without cryptography library return { 'notAfter': None, 'issuer': None }, False # First try with validation try: cert, validated = await loop.run_in_executor(None, get_ssl_info_validated) result.has_ssl = True result.is_valid = True except ssl.SSLCertVerificationError: # Validation failed, try without validation to get cert info try: cert, validated = await loop.run_in_executor(None, get_ssl_info_unvalidated) result.has_ssl = True result.is_valid = True # Certificate exists and is technically valid, just can't verify chain locally except Exception: result.has_ssl = True result.is_valid = False result.error = "Certificate exists but could not be parsed" return result # Parse expiration date not_after = cert.get('notAfter') if not_after: # Format: 'Dec 31 23:59:59 2024 GMT' try: expires = datetime.strptime(not_after, '%b %d %H:%M:%S %Y %Z') result.expires_at = expires.replace(tzinfo=timezone.utc) result.days_until_expiry = (result.expires_at - datetime.now(timezone.utc)).days result.is_expired = result.days_until_expiry < 0 result.is_valid = result.days_until_expiry >= 0 except Exception: result.is_valid = True # Assume valid if we can't parse # Get issuer issuer = cert.get('issuer') if issuer: for item in issuer: if isinstance(item, tuple) and len(item) > 0: if isinstance(item[0], tuple) and item[0][0] == 'organizationName': result.issuer = item[0][1] break elif isinstance(item[0], str) and item[0] == 'organizationName': result.issuer = item[1] if len(item) > 1 else None break except (socket.timeout, socket.error, ConnectionRefusedError, OSError) as e: if '443' in str(e) or 'refused' in str(e).lower(): result.has_ssl = False result.error = "Port 443 not responding" else: result.has_ssl = False result.error = "no_ssl" except Exception as e: result.error = str(e) return result def _calculate_health( self, domain: str, dns_result: DNSCheckResult, http_result: HTTPCheckResult, ssl_result: SSLCheckResult ) -> DomainHealthReport: """ Calculate overall health status and score. Scoring: - DNS layer: 30 points - HTTP layer: 40 points - SSL layer: 30 points """ score = 100 signals = [] recommendations = [] # ========================= # DNS Scoring (30 points) # ========================= if not dns_result.has_nameservers: score -= 30 signals.append("🔴 No nameservers found (domain may not exist)") elif dns_result.is_parking_ns: score -= 15 signals.append("🟠 Nameservers point to parking service") recommendations.append("Domain is parked - owner may be selling") else: if not dns_result.has_a_record: score -= 10 signals.append("âš ī¸ No A record (no website configured)") if not dns_result.has_mx_records: score -= 5 signals.append("âš ī¸ No MX records (no email configured)") # ========================= # HTTP Scoring (40 points) # ========================= if not http_result.is_reachable: score -= 40 signals.append("🔴 Website not reachable") if http_result.error == "timeout": signals.append("âš ī¸ Connection timeout") elif http_result.error == "connection_refused": signals.append("âš ī¸ Connection refused") elif http_result.status_code: if http_result.status_code >= 500: score -= 30 signals.append(f"🔴 Server error ({http_result.status_code})") recommendations.append("Server is having issues - monitor closely") elif http_result.status_code >= 400: score -= 15 signals.append(f"âš ī¸ Client error ({http_result.status_code})") if http_result.is_parked: score -= 10 signals.append("🟠 Page contains for-sale indicators") recommendations.append(f"Detected: {', '.join(http_result.parking_signals[:3])}") if http_result.content_length < 500: score -= 5 signals.append("âš ī¸ Very small page content") # ========================= # SSL Scoring (30 points) # ========================= if not ssl_result.has_ssl: score -= 10 signals.append("âš ī¸ No SSL certificate") elif ssl_result.is_expired: score -= 30 signals.append("🔴 SSL certificate expired!") recommendations.append("Certificate expired - owner neglecting domain") elif ssl_result.days_until_expiry is not None: if ssl_result.days_until_expiry < 7: score -= 15 signals.append(f"âš ī¸ SSL expires in {ssl_result.days_until_expiry} days") recommendations.append("Certificate expiring soon - watch for neglect") elif ssl_result.days_until_expiry < 30: score -= 5 signals.append(f"â„šī¸ SSL expires in {ssl_result.days_until_expiry} days") # Ensure score is in valid range score = max(0, min(100, score)) # Determine status if score >= 80: status = HealthStatus.HEALTHY elif score >= 50: if dns_result.is_parking_ns or http_result.is_parked: status = HealthStatus.PARKED else: status = HealthStatus.WEAKENING elif score >= 20: if dns_result.is_parking_ns or http_result.is_parked: status = HealthStatus.PARKED else: status = HealthStatus.WEAKENING else: status = HealthStatus.CRITICAL # Override to PARKED if clear signals if dns_result.is_parking_ns or http_result.is_parked: if status != HealthStatus.CRITICAL: status = HealthStatus.PARKED return DomainHealthReport( domain=domain, status=status, score=score, dns=dns_result, http=http_result, ssl=ssl_result, signals=signals, recommendations=recommendations, ) # Singleton instance _health_checker: Optional[DomainHealthChecker] = None def get_health_checker() -> DomainHealthChecker: """Get or create health checker instance.""" global _health_checker if _health_checker is None: _health_checker = DomainHealthChecker() return _health_checker