🏥 DOMAIN HEALTH ENGINE (from analysis_2.md):
- New service: backend/app/services/domain_health.py
- 4-layer analysis:
  1. DNS: Nameservers, MX records, A records, parking NS detection
  2. HTTP: Status codes, content, parking keyword detection
  3. SSL: Certificate validity, expiration date, issuer
  4. (WHOIS via existing domain_checker)

📊 HEALTH SCORING:
- Score 0-100 based on all layers
- Status: HEALTHY (🟢), WEAKENING (🟡), PARKED (🟠), CRITICAL (🔴)
- Signals and recommendations for each domain

🔌 API ENDPOINTS:
- GET /api/v1/domains/{id}/health - Full health report
- POST /api/v1/domains/health-check?domain=x - Quick check of any domain

🔐 PASSWORD RESET:
- New script: backend/scripts/reset_admin_password.py
- Resets the password for guggeryves@hotmail.com (password value redacted here: credentials must not be recorded in commit messages, and the exposed password should be rotated)

PARKING DETECTION:
- Known parking nameservers (Sedo, Afternic, etc.)
- Page content keywords ('buy this domain', 'for sale', etc.)
522 lines
19 KiB
Python
522 lines
19 KiB
Python
"""
|
||
🏥 POUNCE DOMAIN HEALTH ENGINE

Advanced domain health analysis for premium intelligence.

Implements 4-layer analysis from analysis_2.md:
1. DNS Layer - Infrastructure check (nameservers, MX, A records)
2. HTTP Layer - Website availability (status codes, content, parking detection)
3. SSL Layer - Certificate validity
4. WHOIS/RDAP Layer - Registration status

Output: Health Score (HEALTHY, WEAKENING, PARKED, CRITICAL)
|
||
"""
|
||
import asyncio
import logging
import re
import socket
import ssl
from dataclasses import dataclass, field
from datetime import datetime, timezone, timedelta
from enum import Enum
from typing import Optional, List, Dict, Any
from urllib.parse import urlsplit

import dns.exception
import dns.resolver
import httpx
|
||
|
||
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
|
||
|
||
|
||
class HealthStatus(str, Enum):
    """Overall health classification assigned to a checked domain.

    The ``str`` mix-in makes every member compare equal to its plain string
    value, so members can be used wherever a string is expected.
    """

    HEALTHY = "healthy"      # 🟢 All systems go
    WEAKENING = "weakening"  # 🟡 Warning signs detected
    PARKED = "parked"        # 🟠 Domain for sale/parked
    CRITICAL = "critical"    # 🔴 Drop imminent
    UNKNOWN = "unknown"      # ❓ Could not determine
|
||
|
||
|
||
@dataclass
class DNSCheckResult:
    """Results from the DNS layer check.

    Every field has a "nothing found" default, so a bare ``DNSCheckResult()``
    describes a domain for which no DNS data could be obtained.
    """

    # True when at least one NS record was returned.
    has_nameservers: bool = False
    # Nameserver host names, lower-cased and without the trailing dot.
    nameservers: List[str] = field(default_factory=list)
    # True when at least one MX record was returned.
    has_mx_records: bool = False
    # Mail exchanger host names, lower-cased and without the trailing dot.
    mx_records: List[str] = field(default_factory=list)
    # True when the domain has an A record that is not a dead address.
    has_a_record: bool = False
    # IPv4 addresses from the A lookup, as strings.
    a_records: List[str] = field(default_factory=list)
    # Nameservers point to parking service
    is_parking_ns: bool = False
    # Text of an unexpected lookup error, if one occurred.
    error: Optional[str] = None
|
||
|
||
|
||
@dataclass
class HTTPCheckResult:
    """Results from the HTTP layer check.

    The defaults describe a site from which no response was received.
    """

    # Status code of the final response, or None when no response arrived.
    status_code: Optional[int] = None
    # True when a response with a status code below 500 was received.
    is_reachable: bool = False
    # Size of the response body in bytes.
    content_length: int = 0
    # True when the page text contains at least one parking keyword.
    is_parked: bool = False
    # The parking keywords that were found in the page text.
    parking_signals: List[str] = field(default_factory=list)
    # Final URL when the request was redirected, otherwise None.
    redirect_url: Optional[str] = None
    # Time taken by the request, in milliseconds.
    response_time_ms: Optional[float] = None
    # "timeout", "connection_refused", or the text of another failure.
    error: Optional[str] = None
|
||
|
||
|
||
@dataclass
class SSLCheckResult:
    """Results from the SSL layer check.

    The defaults describe a host for which no certificate was obtained.
    """

    # True when the host presented a certificate on port 443.
    has_ssl: bool = False
    # True when the certificate passed verification and has not expired.
    is_valid: bool = False
    # Timezone-aware (UTC) expiry timestamp, when it could be parsed.
    expires_at: Optional[datetime] = None
    # Whole days left until expiry; negative once the certificate has expired.
    days_until_expiry: Optional[int] = None
    # Issuer organization name, when the certificate carries one.
    issuer: Optional[str] = None
    # True when the certificate is known to be expired.
    is_expired: bool = False
    # "no_ssl" or the text of the failure that occurred.
    error: Optional[str] = None
|
||
|
||
|
||
@dataclass
class DomainHealthReport:
    """Complete health report for a domain.

    Combines the overall verdict (``status`` and ``score``) with the raw
    per-layer results and the human-readable findings derived from them.
    """

    domain: str
    status: HealthStatus
    score: int  # 0-100

    # Layer results (None when a layer was not checked)
    dns: Optional[DNSCheckResult] = None
    http: Optional[HTTPCheckResult] = None
    ssl: Optional[SSLCheckResult] = None

    # Summary
    signals: List[str] = field(default_factory=list)
    recommendations: List[str] = field(default_factory=list)

    # Metadata: creation time of the report, timezone-aware (UTC)
    checked_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))

    def to_dict(self) -> Dict[str, Any]:
        """Convert the report to a plain dictionary for the API response.

        Returns:
            A dictionary with the keys ``domain``, ``status`` (the enum's
            string value), ``score``, ``signals``, ``recommendations``,
            ``checked_at`` (ISO 8601 string) and ``layers``. Each entry of
            ``layers`` is a summary dictionary for that layer, or None when
            the layer result is missing.
        """
        dns_layer = None
        if self.dns:
            dns_layer = {
                "has_nameservers": self.dns.has_nameservers,
                "nameservers": self.dns.nameservers,
                "has_mx_records": self.dns.has_mx_records,
                "is_parking_ns": self.dns.is_parking_ns,
            }

        http_layer = None
        if self.http:
            http_layer = {
                "status_code": self.http.status_code,
                "is_reachable": self.http.is_reachable,
                "is_parked": self.http.is_parked,
                "response_time_ms": self.http.response_time_ms,
            }

        ssl_layer = None
        if self.ssl:
            ssl_layer = {
                "has_ssl": self.ssl.has_ssl,
                "is_valid": self.ssl.is_valid,
                "days_until_expiry": self.ssl.days_until_expiry,
                "is_expired": self.ssl.is_expired,
            }

        return {
            "domain": self.domain,
            "status": self.status.value,
            "score": self.score,
            "signals": self.signals,
            "recommendations": self.recommendations,
            "checked_at": self.checked_at.isoformat(),
            "layers": {
                "dns": dns_layer,
                "http": http_layer,
                "ssl": ssl_layer,
            },
        }
|
||
|
||
|
||
class DomainHealthChecker:
    """
    Premium domain health analysis engine.

    Checks 4 layers to determine domain health:
    1. DNS: Is the infrastructure alive?
    2. HTTP: Is the website running?
    3. SSL: Is the certificate valid?
    4. (WHOIS handled by existing DomainChecker)
    """

    # Known parking/for-sale service nameservers
    # NOTE(review): "domaincontrol.com" appears to be GoDaddy's general DNS
    # hosting domain, not only a parking service, so it may flag ordinary
    # sites as parked - confirm before relying on this entry.
    PARKING_NAMESERVERS = {
        'sedoparking.com', 'afternic.com', 'domaincontrol.com',
        'parkingcrew.net', 'bodis.com', 'dsredirection.com',
        'above.com', 'domainsponsor.com', 'fastpark.net',
        'parkdomain.com', 'domainmarket.com', 'hugedomains.com',
    }

    # Keywords indicating parked/for-sale pages
    PARKING_KEYWORDS = [
        'domain is for sale', 'buy this domain', 'inquire now',
        'make an offer', 'domain zum verkauf', 'domain for sale',
        'this domain is parked', 'parked by', 'related links',
        'sponsored listings', 'domain parking', 'this website is for sale',
        'purchase this domain', 'acquire this domain',
    ]

    def __init__(self):
        """Create the checker with a DNS resolver limited to short waits."""
        self._dns_resolver = dns.resolver.Resolver()
        # presumably seconds: per-server wait vs. total budget for one query
        # (dnspython semantics) - TODO confirm
        self._dns_resolver.timeout = 3
        self._dns_resolver.lifetime = 5

    async def check_domain(self, domain: str) -> DomainHealthReport:
        """
        Perform comprehensive health check on a domain.

        The DNS, HTTP and SSL checks run concurrently. A check that raises is
        replaced by an empty result carrying the error text, so one failing
        layer never aborts the whole report.

        Args:
            domain: Domain name to check (e.g., "example.com")

        Returns:
            DomainHealthReport with status, score, and detailed results
        """
        domain = self._normalize_domain(domain)
        logger.info("🏥 Starting health check for: %s", domain)

        # Run all checks concurrently
        dns_result, http_result, ssl_result = await asyncio.gather(
            self._check_dns(domain),
            self._check_http(domain),
            self._check_ssl(domain),
            return_exceptions=True,
        )

        # Handle exceptions. BaseException (not Exception) so that a
        # CancelledError returned by gather() is also replaced instead of
        # being passed on as if it were a result object.
        if isinstance(dns_result, BaseException):
            logger.warning("DNS check failed: %s", dns_result)
            dns_result = DNSCheckResult(error=str(dns_result))
        if isinstance(http_result, BaseException):
            logger.warning("HTTP check failed: %s", http_result)
            http_result = HTTPCheckResult(error=str(http_result))
        if isinstance(ssl_result, BaseException):
            logger.warning("SSL check failed: %s", ssl_result)
            ssl_result = SSLCheckResult(error=str(ssl_result))

        # Calculate health score and status
        report = self._calculate_health(domain, dns_result, http_result, ssl_result)

        logger.info(
            "✅ Health check complete: %s = %s (%s/100)",
            domain, report.status.value, report.score,
        )
        return report

    def _normalize_domain(self, domain: str) -> str:
        """Reduce user input to a bare, lower-case host name.

        Accepts a plain domain or an http(s) URL and removes surrounding
        whitespace, the scheme, userinfo, port, path, query string, fragment,
        a trailing dot and a leading ``www.``.

        Args:
            domain: Raw domain or URL, e.g. ``"https://www.Example.com/x"``.

        Returns:
            The host name, e.g. ``"example.com"`` (empty string for empty input).
        """
        candidate = domain.strip().lower()
        for prefix in ('http://', 'https://'):
            if candidate.startswith(prefix):
                candidate = candidate[len(prefix):]
                break

        try:
            # urlsplit only recognises a host after a "//" authority marker.
            host = urlsplit('//' + candidate).hostname or ''
        except ValueError:
            # Malformed authority (e.g. unbalanced IPv6 brackets): fall back
            # to cutting at the first path/query/fragment delimiter.
            host = re.split(r'[/?#]', candidate, maxsplit=1)[0]

        host = host.rstrip('.')
        if host.startswith('www.'):
            host = host[4:]
        return host

    @classmethod
    def _is_parking_nameserver(cls, nameserver: str) -> bool:
        """Return True when *nameserver* is, or is a subdomain of, a parking NS domain."""
        host = nameserver.rstrip('.').lower()
        # Match on label boundaries so "notabove.com" does not match "above.com".
        return any(
            host == parking_ns or host.endswith('.' + parking_ns)
            for parking_ns in cls.PARKING_NAMESERVERS
        )

    async def _check_dns(self, domain: str) -> DNSCheckResult:
        """
        Layer 1: DNS Infrastructure Check

        Checks:
        - NS records (nameservers)
        - MX records (mail)
        - A records (IP address)

        The blocking resolver calls run in the default executor. NXDOMAIN,
        NoAnswer and Timeout are treated as "record not present"; any other
        NS failure is stored in ``error``, other MX/A failures are logged.
        """
        result = DNSCheckResult()
        loop = asyncio.get_running_loop()
        resolve = self._dns_resolver.resolve
        record_missing = (
            dns.resolver.NXDOMAIN, dns.resolver.NoAnswer, dns.exception.Timeout,
        )

        # Check NS records
        try:
            ns_answers = await loop.run_in_executor(None, resolve, domain, 'NS')
            result.nameservers = [
                str(rdata.target).rstrip('.').lower() for rdata in ns_answers
            ]
            result.has_nameservers = len(result.nameservers) > 0

            # Check if nameservers point to parking service
            result.is_parking_ns = any(
                self._is_parking_nameserver(ns) for ns in result.nameservers
            )
        except record_missing:
            result.has_nameservers = False
        except Exception as exc:
            result.error = str(exc)

        # Check MX records
        try:
            mx_answers = await loop.run_in_executor(None, resolve, domain, 'MX')
            result.mx_records = [
                str(rdata.exchange).rstrip('.').lower() for rdata in mx_answers
            ]
            result.has_mx_records = len(result.mx_records) > 0
        except record_missing:
            result.has_mx_records = False
        except Exception as exc:
            # Best effort: keep the defaults, but leave a trace for debugging.
            logger.debug("MX lookup failed for %s: %s", domain, exc)

        # Check A records
        try:
            a_answers = await loop.run_in_executor(None, resolve, domain, 'A')
            result.a_records = [str(rdata.address) for rdata in a_answers]
            result.has_a_record = len(result.a_records) > 0

            # Check for dead IPs (0.0.0.0 or 127.0.0.1)
            dead_ips = {'0.0.0.0', '127.0.0.1'}
            if all(ip in dead_ips for ip in result.a_records):
                result.has_a_record = False
        except record_missing:
            result.has_a_record = False
        except Exception as exc:
            # Best effort: keep the defaults, but leave a trace for debugging.
            logger.debug("A lookup failed for %s: %s", domain, exc)

        return result

    async def _check_http(self, domain: str) -> HTTPCheckResult:
        """
        Layer 2: HTTP Website Check

        Checks:
        - HTTP status code
        - Response content
        - Parking/for-sale detection

        HTTPS is tried first, then plain HTTP. The first scheme that yields
        any response is used; if both fail, ``error`` holds the last failure.
        """
        result = HTTPCheckResult()
        loop = asyncio.get_running_loop()

        async with httpx.AsyncClient(
            timeout=10.0,
            follow_redirects=True,
            headers={
                "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
            },
        ) as client:
            for scheme in ('https', 'http'):
                url = f"{scheme}://{domain}"
                try:
                    started = loop.time()
                    response = await client.get(url)
                    finished = loop.time()

                    result.status_code = response.status_code
                    result.is_reachable = response.status_code < 500
                    result.content_length = len(response.content)
                    result.response_time_ms = (finished - started) * 1000

                    # Check for redirects
                    if response.history:
                        result.redirect_url = str(response.url)

                    # Check for parking keywords in content
                    content = response.text.lower()
                    for keyword in self.PARKING_KEYWORDS:
                        if keyword in content:
                            result.is_parked = True
                            result.parking_signals.append(keyword)

                    # A response arrived: drop any error left over from the
                    # failed attempt with the previous scheme.
                    result.error = None
                    break  # Success, no need to try other scheme

                except httpx.TimeoutException:
                    result.error = "timeout"
                except httpx.ConnectError:
                    result.error = "connection_refused"
                except Exception as exc:
                    result.error = str(exc)

        return result

    @staticmethod
    def _fetch_peer_certificate(domain: str) -> Dict[str, Any]:
        """Blocking helper: complete a verified TLS handshake on port 443 and return the peer certificate."""
        context = ssl.create_default_context()
        with socket.create_connection((domain, 443), timeout=5) as sock:
            with context.wrap_socket(sock, server_hostname=domain) as tls_sock:
                return tls_sock.getpeercert()

    @staticmethod
    def _issuer_organization(cert: Dict[str, Any]) -> Optional[str]:
        """Return the issuer's organizationName from a ``getpeercert()`` dict, or None."""
        # "issuer" is a sequence of RDNs, each a sequence of (name, value) pairs.
        for rdn in cert.get('issuer') or ():
            for name, value in rdn:
                if name == 'organizationName':
                    return value
        return None

    async def _check_ssl(self, domain: str) -> SSLCheckResult:
        """
        Layer 3: SSL Certificate Check

        Checks:
        - Certificate exists
        - Certificate validity
        - Expiration date

        A certificate that fails verification is reported as present but
        invalid; any other socket/TLS failure is reported as "no_ssl".
        """
        result = SSLCheckResult()
        loop = asyncio.get_running_loop()

        try:
            cert = await loop.run_in_executor(
                None, self._fetch_peer_certificate, domain
            )
            result.has_ssl = True

            # Parse expiration date
            not_after = cert.get('notAfter')
            if not_after:
                # Format: 'Dec 31 23:59:59 2024 GMT'
                try:
                    # Locale-independent, unlike strptime('%b ...').
                    expires_ts = ssl.cert_time_to_seconds(not_after)
                except ValueError:
                    result.is_valid = True  # Assume valid if we can't parse
                else:
                    result.expires_at = datetime.fromtimestamp(
                        expires_ts, tz=timezone.utc
                    )
                    result.days_until_expiry = (
                        result.expires_at - datetime.now(timezone.utc)
                    ).days
                    result.is_expired = result.days_until_expiry < 0
                    result.is_valid = result.days_until_expiry >= 0

            # Get issuer
            result.issuer = self._issuer_organization(cert)

        except ssl.SSLCertVerificationError as exc:
            result.has_ssl = True
            result.is_valid = False
            result.is_expired = 'expired' in str(exc).lower()
            result.error = str(exc)
        except OSError:
            # Covers socket timeouts, refused connections, DNS failures and
            # other TLS errors (all OSError subclasses).
            result.has_ssl = False
            result.error = "no_ssl"
        except Exception as exc:
            result.error = str(exc)

        return result

    @staticmethod
    def _score_dns(dns_result, signals: List[str], recommendations: List[str]) -> int:
        """Return the DNS penalty (0-30) and append its findings to the given lists."""
        if not dns_result.has_nameservers:
            signals.append("🔴 No nameservers found (domain may not exist)")
            return 30
        if dns_result.is_parking_ns:
            signals.append("🟠 Nameservers point to parking service")
            recommendations.append("Domain is parked - owner may be selling")
            return 15

        penalty = 0
        if not dns_result.has_a_record:
            penalty += 10
            signals.append("⚠️ No A record (no website configured)")
        if not dns_result.has_mx_records:
            penalty += 5
            signals.append("⚠️ No MX records (no email configured)")
        return penalty

    @staticmethod
    def _score_http(http_result, signals: List[str], recommendations: List[str]) -> int:
        """Return the HTTP penalty and append its findings to the given lists."""
        penalty = 0
        status_code = http_result.status_code

        # Server errors are tested first: is_reachable is False for every 5xx
        # response, so testing reachability first would hide this case.
        if status_code is not None and status_code >= 500:
            penalty += 30
            signals.append(f"🔴 Server error ({status_code})")
            recommendations.append("Server is having issues - monitor closely")
        elif not http_result.is_reachable:
            penalty += 40
            signals.append("🔴 Website not reachable")
            if http_result.error == "timeout":
                signals.append("⚠️ Connection timeout")
            elif http_result.error == "connection_refused":
                signals.append("⚠️ Connection refused")
        elif status_code is not None and status_code >= 400:
            penalty += 15
            signals.append(f"⚠️ Client error ({status_code})")

        if http_result.is_parked:
            penalty += 10
            signals.append("🟠 Page contains for-sale indicators")
            recommendations.append(
                f"Detected: {', '.join(http_result.parking_signals[:3])}"
            )

        # Only meaningful when a page was actually returned; an unreachable
        # site is already charged the full HTTP penalty above.
        if status_code is not None and http_result.content_length < 500:
            penalty += 5
            signals.append("⚠️ Very small page content")

        return penalty

    @staticmethod
    def _score_ssl(ssl_result, signals: List[str], recommendations: List[str]) -> int:
        """Return the SSL penalty (0-30) and append its findings to the given lists."""
        # NOTE(review): a certificate that fails verification for a reason
        # other than expiry (has_ssl=True, is_valid=False, is_expired=False)
        # currently costs nothing - confirm whether it should be penalized.
        if not ssl_result.has_ssl:
            signals.append("⚠️ No SSL certificate")
            return 10
        if ssl_result.is_expired:
            signals.append("🔴 SSL certificate expired!")
            recommendations.append("Certificate expired - owner neglecting domain")
            return 30

        days_left = ssl_result.days_until_expiry
        if days_left is None:
            return 0
        if days_left < 7:
            signals.append(f"⚠️ SSL expires in {days_left} days")
            recommendations.append("Certificate expiring soon - watch for neglect")
            return 15
        if days_left < 30:
            signals.append(f"ℹ️ SSL expires in {days_left} days")
            return 5
        return 0

    def _calculate_health(
        self,
        domain: str,
        dns_result: DNSCheckResult,
        http_result: HTTPCheckResult,
        ssl_result: SSLCheckResult
    ) -> DomainHealthReport:
        """
        Calculate overall health status and score.

        The score starts at 100 and each layer subtracts a penalty:
        - DNS layer: up to 30 points
        - HTTP layer: nominally 40 points
        - SSL layer: up to 30 points

        Status rules: a score below 20 is CRITICAL; otherwise any parking
        signal (nameservers or page content) yields PARKED; otherwise a score
        of 80 or more is HEALTHY and anything lower is WEAKENING.

        Args:
            domain: Normalized domain the results belong to.
            dns_result: Outcome of the DNS layer.
            http_result: Outcome of the HTTP layer.
            ssl_result: Outcome of the SSL layer.

        Returns:
            DomainHealthReport containing the score, status, layer results,
            signals and recommendations.
        """
        signals: List[str] = []
        recommendations: List[str] = []

        penalty = (
            self._score_dns(dns_result, signals, recommendations)
            + self._score_http(http_result, signals, recommendations)
            + self._score_ssl(ssl_result, signals, recommendations)
        )

        # Ensure score is in valid range
        score = max(0, min(100, 100 - penalty))

        # Determine status
        looks_parked = dns_result.is_parking_ns or http_result.is_parked
        if score < 20:
            status = HealthStatus.CRITICAL
        elif looks_parked:
            status = HealthStatus.PARKED
        elif score >= 80:
            status = HealthStatus.HEALTHY
        else:
            status = HealthStatus.WEAKENING

        return DomainHealthReport(
            domain=domain,
            status=status,
            score=score,
            dns=dns_result,
            http=http_result,
            ssl=ssl_result,
            signals=signals,
            recommendations=recommendations,
        )
|
||
|
||
|
||
# Singleton instance, created lazily by get_health_checker()
_health_checker: Optional[DomainHealthChecker] = None


def get_health_checker() -> DomainHealthChecker:
    """Get or create the shared health checker instance.

    The checker is built on the first call and stored in the module-level
    ``_health_checker`` variable; later calls return that same object.
    """
    global _health_checker
    if _health_checker is None:
        _health_checker = DomainHealthChecker()
    return _health_checker
|
||
|