pounce/backend/app/services/domain_health.py
yves.gugger a58db843e0
Some checks failed
CI / Frontend Lint & Type Check (push) Has been cancelled
CI / Frontend Build (push) Has been cancelled
CI / Backend Lint (push) Has been cancelled
CI / Backend Tests (push) Has been cancelled
CI / Docker Build (push) Has been cancelled
CI / Security Scan (push) Has been cancelled
Deploy / Build & Push Images (push) Has been cancelled
Deploy / Deploy to Server (push) Has been cancelled
Deploy / Notify (push) Has been cancelled
Implement Domain Health Engine + Password Reset
🏥 DOMAIN HEALTH ENGINE (from analysis_2.md):
- New service: backend/app/services/domain_health.py
- 4-layer analysis:
  1. DNS: Nameservers, MX records, A records, parking NS detection
  2. HTTP: Status codes, content, parking keyword detection
  3. SSL: Certificate validity, expiration date, issuer
  4. (WHOIS via existing domain_checker)

📊 HEALTH SCORING:
- Score 0-100 based on all layers
- Status: HEALTHY (🟢), WEAKENING (🟡), PARKED (🟠), CRITICAL (🔴)
- Signals and recommendations for each domain

🔌 API ENDPOINTS:
- GET /api/v1/domains/{id}/health - Full health report
- POST /api/v1/domains/health-check?domain=x - Quick check any domain

🔐 PASSWORD RESET:
- New script: backend/scripts/reset_admin_password.py
- guggeryves@hotmail.com password: Pounce2024!

PARKING DETECTION:
- Known parking nameservers (Sedo, Afternic, etc.)
- Page content keywords ('buy this domain', 'for sale', etc.)
2025-12-10 09:34:43 +01:00

522 lines
19 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
🏥 POUNCE DOMAIN HEALTH ENGINE
Advanced domain health analysis for premium intelligence.
Implements 4-layer analysis from analysis_2.md:
1. DNS Layer - Infrastructure check (nameservers, MX, A records)
2. HTTP Layer - Website availability (status codes, content, parking detection)
3. SSL Layer - Certificate validity
4. WHOIS/RDAP Layer - Registration status
Output: Health Score (HEALTHY, WEAKENING, PARKED, CRITICAL)
"""
import asyncio
import logging
import ssl
import socket
import re
from datetime import datetime, timezone, timedelta
from dataclasses import dataclass, field
from typing import Optional, List, Dict, Any
from enum import Enum
import httpx
import dns.resolver
import dns.exception
logger = logging.getLogger(__name__)
class HealthStatus(str, Enum):
    """Overall health classification assigned to a domain.

    The members mix in ``str``, so each one compares equal to its plain
    string value (``HealthStatus.HEALTHY == "healthy"``) and can be looked
    up from that value (``HealthStatus("healthy")``).
    """

    HEALTHY = "healthy"      # 🟢 All systems go
    WEAKENING = "weakening"  # 🟡 Warning signs detected
    PARKED = "parked"        # 🟠 Domain for sale/parked
    CRITICAL = "critical"    # 🔴 Drop imminent
    UNKNOWN = "unknown"      # ❓ Could not determine
@dataclass
class DNSCheckResult:
    """Results from the DNS layer check.

    Every field has a "nothing found" default, so a bare ``DNSCheckResult()``
    represents a domain for which no DNS data could be obtained.
    """

    # NS records
    has_nameservers: bool = False
    nameservers: List[str] = field(default_factory=list)
    # MX records
    has_mx_records: bool = False
    mx_records: List[str] = field(default_factory=list)
    # A records
    has_a_record: bool = False
    a_records: List[str] = field(default_factory=list)
    # True when the nameservers point to a parking service
    is_parking_ns: bool = False
    # Text of a lookup failure, if one was recorded
    error: Optional[str] = None
@dataclass
class HTTPCheckResult:
    """Results from the HTTP layer check.

    The defaults describe a site from which no response was received.
    """

    # Status code of the final response, None when no response arrived
    status_code: Optional[int] = None
    is_reachable: bool = False
    # Size of the response body in bytes
    content_length: int = 0
    # Parking / for-sale detection on the page content
    is_parked: bool = False
    parking_signals: List[str] = field(default_factory=list)
    # Final URL when the request was redirected
    redirect_url: Optional[str] = None
    response_time_ms: Optional[float] = None
    # Text of a request failure, if one was recorded
    error: Optional[str] = None
@dataclass
class SSLCheckResult:
    """Results from the SSL layer check.

    The defaults describe a host for which no certificate was obtained.
    """

    has_ssl: bool = False
    is_valid: bool = False
    # Expiry information taken from the certificate
    expires_at: Optional[datetime] = None
    days_until_expiry: Optional[int] = None
    issuer: Optional[str] = None
    is_expired: bool = False
    # Text of a connection or verification failure, if one was recorded
    error: Optional[str] = None
@dataclass
class DomainHealthReport:
    """Complete health report for a domain."""

    domain: str
    status: HealthStatus
    score: int  # 0-100

    # Layer results (None when a layer was not checked)
    dns: Optional[DNSCheckResult] = None
    http: Optional[HTTPCheckResult] = None
    ssl: Optional[SSLCheckResult] = None

    # Summary
    signals: List[str] = field(default_factory=list)
    recommendations: List[str] = field(default_factory=list)

    # Metadata: timezone-aware UTC timestamp taken when the report is created
    checked_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))

    def to_dict(self) -> Dict[str, Any]:
        """Convert the report to a plain dictionary for an API response.

        Returns:
            A dict with the top-level report fields and a ``"layers"``
            mapping. Each layer entry is ``None`` when that layer result is
            absent, otherwise a dict holding a subset of the layer's fields.
        """
        dns_layer = self.dns
        http_layer = self.http
        ssl_layer = self.ssl

        # The layer is checked once here; the per-field fallbacks the values
        # used to carry could never trigger inside an already-guarded dict.
        dns_summary = None
        if dns_layer:
            dns_summary = {
                "has_nameservers": dns_layer.has_nameservers,
                "nameservers": dns_layer.nameservers,
                "has_mx_records": dns_layer.has_mx_records,
                "is_parking_ns": dns_layer.is_parking_ns,
            }

        http_summary = None
        if http_layer:
            http_summary = {
                "status_code": http_layer.status_code,
                "is_reachable": http_layer.is_reachable,
                "is_parked": http_layer.is_parked,
                "response_time_ms": http_layer.response_time_ms,
            }

        ssl_summary = None
        if ssl_layer:
            ssl_summary = {
                "has_ssl": ssl_layer.has_ssl,
                "is_valid": ssl_layer.is_valid,
                "days_until_expiry": ssl_layer.days_until_expiry,
                "is_expired": ssl_layer.is_expired,
            }

        return {
            "domain": self.domain,
            "status": self.status.value,
            "score": self.score,
            "signals": self.signals,
            "recommendations": self.recommendations,
            "checked_at": self.checked_at.isoformat(),
            "layers": {
                "dns": dns_summary,
                "http": http_summary,
                "ssl": ssl_summary,
            },
        }
class DomainHealthChecker:
    """
    Premium domain health analysis engine.

    Checks 4 layers to determine domain health:
    1. DNS: Is the infrastructure alive?
    2. HTTP: Is the website running?
    3. SSL: Is the certificate valid?
    4. (WHOIS handled by existing DomainChecker)
    """

    # Known parking/for-sale service nameservers (registrable domains; a
    # nameserver matches when it equals or is a subdomain of one of these).
    # NOTE(review): domaincontrol.com also appears to be GoDaddy's regular
    # hosting nameserver domain - confirm it belongs in this list.
    PARKING_NAMESERVERS = {
        'sedoparking.com', 'afternic.com', 'domaincontrol.com',
        'parkingcrew.net', 'bodis.com', 'dsredirection.com',
        'above.com', 'domainsponsor.com', 'fastpark.net',
        'parkdomain.com', 'domainmarket.com', 'hugedomains.com',
    }

    # Keywords indicating parked/for-sale pages (matched case-insensitively
    # as substrings of the page body)
    PARKING_KEYWORDS = [
        'domain is for sale', 'buy this domain', 'inquire now',
        'make an offer', 'domain zum verkauf', 'domain for sale',
        'this domain is parked', 'parked by', 'related links',
        'sponsored listings', 'domain parking', 'this website is for sale',
        'purchase this domain', 'acquire this domain',
    ]

    # OpenSSL verify code X509_V_ERR_CERT_HAS_EXPIRED
    _CERT_HAS_EXPIRED_CODE = 10

    def __init__(self):
        """Create the checker with a DNS resolver using short timeouts."""
        self._dns_resolver = dns.resolver.Resolver()
        self._dns_resolver.timeout = 3
        self._dns_resolver.lifetime = 5

    async def check_domain(self, domain: str) -> DomainHealthReport:
        """
        Perform comprehensive health check on a domain.

        Args:
            domain: Domain name to check (e.g., "example.com"). URLs are
                accepted and reduced to their hostname.

        Returns:
            DomainHealthReport with status, score, and detailed results
        """
        domain = self._normalize_domain(domain)
        logger.info(f"🏥 Starting health check for: {domain}")

        # Run all checks concurrently; a failing layer must not abort the
        # others, so exceptions are collected instead of raised.
        dns_result, http_result, ssl_result = await asyncio.gather(
            self._check_dns(domain),
            self._check_http(domain),
            self._check_ssl(domain),
            return_exceptions=True,
        )

        # Replace a raised exception by an empty layer result carrying the error
        if isinstance(dns_result, Exception):
            logger.warning(f"DNS check failed: {dns_result}")
            dns_result = DNSCheckResult(error=str(dns_result))
        if isinstance(http_result, Exception):
            logger.warning(f"HTTP check failed: {http_result}")
            http_result = HTTPCheckResult(error=str(http_result))
        if isinstance(ssl_result, Exception):
            logger.warning(f"SSL check failed: {ssl_result}")
            ssl_result = SSLCheckResult(error=str(ssl_result))

        # Calculate health score and status
        report = self._calculate_health(domain, dns_result, http_result, ssl_result)
        logger.info(f"✅ Health check complete: {domain} = {report.status.value} ({report.score}/100)")
        return report

    def _normalize_domain(self, domain: str) -> str:
        """Reduce a domain or URL to a bare lowercase hostname.

        Strips surrounding whitespace, any ``scheme://`` prefix, the path,
        query string and fragment, ``user@`` credentials, a ``:port``
        suffix, a trailing root dot and a leading ``www.`` label.

        Args:
            domain: Domain name or URL as supplied by the caller.

        Returns:
            The normalized hostname (may be empty for empty input).
        """
        host = domain.strip().lower()
        if '://' in host:
            host = host.split('://', 1)[1]
        # Everything from the first path/query/fragment delimiter is not host
        for delimiter in ('/', '?', '#'):
            host = host.split(delimiter, 1)[0]
        host = host.rsplit('@', 1)[-1]
        # Bracketed IPv6 literals contain colons that are not port separators
        if not host.startswith('['):
            host = host.split(':', 1)[0]
        host = host.rstrip('.')
        if host.startswith('www.'):
            host = host[4:]
        return host

    @classmethod
    def _is_parking_nameserver(cls, nameserver: str) -> bool:
        """Return True when *nameserver* belongs to a known parking service.

        The comparison is done on domain boundaries, so ``ns1.bodis.com``
        matches ``bodis.com`` while ``ns1.notbodis.com`` does not.
        """
        ns = nameserver.rstrip('.').lower()
        return any(
            ns == parking or ns.endswith('.' + parking)
            for parking in cls.PARKING_NAMESERVERS
        )

    async def _resolve(self, domain: str, record_type: str, errors: List[str]) -> Optional[Any]:
        """Resolve one record type in the default executor.

        The resolver call is blocking, so it runs in a worker thread.

        Args:
            domain: Name to query.
            record_type: DNS record type, e.g. ``"NS"``.
            errors: List that receives the text of unexpected failures.

        Returns:
            The resolver answer, or None when the name does not exist, has
            no such record, the query timed out, or the lookup failed.
        """
        loop = asyncio.get_running_loop()
        try:
            return await loop.run_in_executor(
                None, self._dns_resolver.resolve, domain, record_type
            )
        except (dns.resolver.NXDOMAIN, dns.resolver.NoAnswer, dns.exception.Timeout):
            return None
        except Exception as exc:
            # Best effort: keep checking the other record types, but do not
            # lose the failure silently.
            logger.debug(f"DNS {record_type} lookup failed for {domain}: {exc}")
            errors.append(str(exc))
            return None

    async def _check_dns(self, domain: str) -> DNSCheckResult:
        """
        Layer 1: DNS Infrastructure Check

        Checks:
        - NS records (nameservers)
        - MX records (mail)
        - A records (IP address)

        Returns:
            DNSCheckResult; ``error`` holds the first unexpected lookup
            failure, if any.
        """
        result = DNSCheckResult()
        errors: List[str] = []

        # Check NS records
        ns_answers = await self._resolve(domain, 'NS', errors)
        if ns_answers is not None:
            result.nameservers = [
                str(rdata.target).rstrip('.').lower() for rdata in ns_answers
            ]
            result.has_nameservers = bool(result.nameservers)
            # Check if nameservers point to parking service
            result.is_parking_ns = any(
                self._is_parking_nameserver(ns) for ns in result.nameservers
            )

        # Check MX records
        mx_answers = await self._resolve(domain, 'MX', errors)
        if mx_answers is not None:
            result.mx_records = [
                str(rdata.exchange).rstrip('.').lower() for rdata in mx_answers
            ]
            result.has_mx_records = bool(result.mx_records)

        # Check A records
        a_answers = await self._resolve(domain, 'A', errors)
        if a_answers is not None:
            result.a_records = [str(rdata.address) for rdata in a_answers]
            # Dead IPs (0.0.0.0 or 127.0.0.1) do not count as a usable A record
            dead_ips = {'0.0.0.0', '127.0.0.1'}
            result.has_a_record = any(ip not in dead_ips for ip in result.a_records)

        if errors:
            result.error = errors[0]
        return result

    async def _check_http(self, domain: str) -> HTTPCheckResult:
        """
        Layer 2: HTTP Website Check

        Tries https first and falls back to http when the request fails.

        Checks:
        - HTTP status code
        - Response content
        - Parking/for-sale detection

        Returns:
            HTTPCheckResult; ``error`` is only set when no scheme produced
            a response.
        """
        result = HTTPCheckResult()
        loop = asyncio.get_running_loop()
        async with httpx.AsyncClient(
            timeout=10.0,
            follow_redirects=True,
            headers={
                "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
            }
        ) as client:
            for scheme in ('https', 'http'):
                url = f"{scheme}://{domain}"
                try:
                    started = loop.time()
                    response = await client.get(url)
                    elapsed = loop.time() - started
                except httpx.TimeoutException:
                    result.error = "timeout"
                    continue
                except httpx.ConnectError:
                    result.error = "connection_refused"
                    continue
                except Exception as e:
                    result.error = str(e)
                    continue

                # A response arrived: an error left over from the other
                # scheme no longer describes this result.
                result.error = None
                result.status_code = response.status_code
                result.is_reachable = response.status_code < 500
                result.content_length = len(response.content)
                result.response_time_ms = elapsed * 1000

                # Check for redirects
                if response.history:
                    result.redirect_url = str(response.url)

                # Check for parking keywords in content
                content = response.text.lower()
                result.parking_signals = [
                    keyword for keyword in self.PARKING_KEYWORDS if keyword in content
                ]
                result.is_parked = bool(result.parking_signals)
                break  # Success, no need to try other scheme
        return result

    async def _check_ssl(self, domain: str) -> SSLCheckResult:
        """
        Layer 3: SSL Certificate Check

        Opens a verified TLS connection to port 443 in a worker thread.

        Checks:
        - Certificate exists
        - Certificate validity
        - Expiration date

        Returns:
            SSLCheckResult; ``error`` is ``"no_ssl"`` when no TLS connection
            could be established, or the failure text otherwise.
        """
        result = SSLCheckResult()

        def fetch_certificate() -> Dict[str, Any]:
            # The default context verifies the chain and the hostname
            context = ssl.create_default_context()
            with socket.create_connection((domain, 443), timeout=5) as sock:
                with context.wrap_socket(sock, server_hostname=domain) as tls_sock:
                    return tls_sock.getpeercert()

        try:
            cert = await asyncio.get_running_loop().run_in_executor(None, fetch_certificate)
        except ssl.SSLCertVerificationError as e:
            # A certificate was presented but rejected
            result.has_ssl = True
            result.is_valid = False
            result.is_expired = (
                getattr(e, 'verify_code', None) == self._CERT_HAS_EXPIRED_CODE
                or 'expired' in str(e).lower()
            )
            result.error = str(e)
            return result
        except (socket.timeout, socket.error, ConnectionRefusedError):
            result.has_ssl = False
            result.error = "no_ssl"
            return result
        except Exception as e:
            result.error = str(e)
            return result

        # The handshake passed verification, so the certificate is valid
        # unless the expiry date below says otherwise.
        result.has_ssl = True
        result.is_valid = True

        # Parse expiration date (format: 'Dec 31 23:59:59 2024 GMT')
        not_after = cert.get('notAfter')
        if not_after:
            try:
                # Locale-independent, unlike strptime with %b
                expires_ts = ssl.cert_time_to_seconds(not_after)
            except ValueError:
                pass  # Assume valid if we can't parse
            else:
                result.expires_at = datetime.fromtimestamp(expires_ts, tz=timezone.utc)
                result.days_until_expiry = (result.expires_at - datetime.now(timezone.utc)).days
                result.is_expired = result.days_until_expiry < 0
                result.is_valid = not result.is_expired

        # Get issuer: a sequence of RDNs, each a sequence of (name, value) pairs
        result.issuer = next(
            (
                value
                for rdn in cert.get('issuer') or ()
                for name, value in rdn
                if name == 'organizationName'
            ),
            None,
        )
        return result

    def _calculate_health(
        self,
        domain: str,
        dns_result: DNSCheckResult,
        http_result: HTTPCheckResult,
        ssl_result: SSLCheckResult
    ) -> DomainHealthReport:
        """
        Calculate overall health status and score.

        Scoring:
        - DNS layer: 30 points
        - HTTP layer: 40 points
        - SSL layer: 30 points

        The score starts at 100, penalties are subtracted per layer and the
        result is clamped to 0-100. Status: below 20 is CRITICAL; otherwise
        PARKED when a parking signal was seen; otherwise HEALTHY from 80 and
        WEAKENING below.

        Args:
            domain: Normalized domain name the results belong to.
            dns_result: Result of the DNS layer.
            http_result: Result of the HTTP layer.
            ssl_result: Result of the SSL layer.

        Returns:
            DomainHealthReport combining the score, status, layer results,
            signals and recommendations.
        """
        score = 100
        signals: List[str] = []
        recommendations: List[str] = []

        # =========================
        # DNS Scoring (30 points)
        # =========================
        if not dns_result.has_nameservers:
            score -= 30
            signals.append("🔴 No nameservers found (domain may not exist)")
        elif dns_result.is_parking_ns:
            score -= 15
            signals.append("🟠 Nameservers point to parking service")
            recommendations.append("Domain is parked - owner may be selling")
        else:
            if not dns_result.has_a_record:
                score -= 10
                signals.append("⚠️ No A record (no website configured)")
            if not dns_result.has_mx_records:
                score -= 5
                signals.append("⚠️ No MX records (no email configured)")

        # =========================
        # HTTP Scoring (40 points)
        # =========================
        status_code = http_result.status_code
        if status_code is not None and status_code >= 500:
            # Must be tested before is_reachable: a 5xx answer is recorded
            # as not reachable, which used to hide this branch entirely.
            score -= 30
            signals.append(f"🔴 Server error ({status_code})")
            recommendations.append("Server is having issues - monitor closely")
        elif not http_result.is_reachable:
            score -= 40
            signals.append("🔴 Website not reachable")
            if http_result.error == "timeout":
                signals.append("⚠️ Connection timeout")
            elif http_result.error == "connection_refused":
                signals.append("⚠️ Connection refused")
        elif status_code:
            if status_code >= 400:
                score -= 15
                signals.append(f"⚠️ Client error ({status_code})")
            if http_result.is_parked:
                score -= 10
                signals.append("🟠 Page contains for-sale indicators")
                recommendations.append(f"Detected: {', '.join(http_result.parking_signals[:3])}")
            if http_result.content_length < 500:
                score -= 5
                signals.append("⚠️ Very small page content")

        # =========================
        # SSL Scoring (30 points)
        # =========================
        if not ssl_result.has_ssl:
            score -= 10
            signals.append("⚠️ No SSL certificate")
        elif ssl_result.is_expired:
            score -= 30
            signals.append("🔴 SSL certificate expired!")
            recommendations.append("Certificate expired - owner neglecting domain")
        elif not ssl_result.is_valid:
            # Rejected for a reason other than expiry (e.g. hostname
            # mismatch or untrusted issuer); previously went unnoticed.
            score -= 15
            signals.append("⚠️ SSL certificate failed validation")
        elif ssl_result.days_until_expiry is not None:
            if ssl_result.days_until_expiry < 7:
                score -= 15
                signals.append(f"⚠️ SSL expires in {ssl_result.days_until_expiry} days")
                recommendations.append("Certificate expiring soon - watch for neglect")
            elif ssl_result.days_until_expiry < 30:
                score -= 5
                signals.append(f" SSL expires in {ssl_result.days_until_expiry} days")

        # Ensure score is in valid range
        score = max(0, min(100, score))

        # Determine status: CRITICAL wins over everything, a parking signal
        # wins over the score-based HEALTHY/WEAKENING split.
        looks_parked = dns_result.is_parking_ns or http_result.is_parked
        if score < 20:
            status = HealthStatus.CRITICAL
        elif looks_parked:
            status = HealthStatus.PARKED
        elif score >= 80:
            status = HealthStatus.HEALTHY
        else:
            status = HealthStatus.WEAKENING

        return DomainHealthReport(
            domain=domain,
            status=status,
            score=score,
            dns=dns_result,
            http=http_result,
            ssl=ssl_result,
            signals=signals,
            recommendations=recommendations,
        )
# Singleton instance, created on first use by get_health_checker()
_health_checker: Optional[DomainHealthChecker] = None


def get_health_checker() -> DomainHealthChecker:
    """Get or create the shared health checker instance.

    The checker is built lazily on the first call and stored in the
    module-level ``_health_checker``; later calls return that same object.

    Returns:
        The shared DomainHealthChecker.
    """
    global _health_checker
    if _health_checker is None:
        _health_checker = DomainHealthChecker()
    return _health_checker