Some checks failed
CI / Frontend Lint & Type Check (push) Has been cancelled
CI / Frontend Build (push) Has been cancelled
CI / Backend Lint (push) Has been cancelled
CI / Backend Tests (push) Has been cancelled
CI / Docker Build (push) Has been cancelled
CI / Security Scan (push) Has been cancelled
Deploy / Build & Push Images (push) Has been cancelled
Deploy / Deploy to Server (push) Has been cancelled
Deploy / Notify (push) Has been cancelled
- Fix 2 indentation errors in domain_checker.py (lines 200, 205)
- Fix 3 indentation errors in domain_health.py (lines 412, 436, 447)
- Fix NameError in playwright_scraper.py when playwright is not installed
- Add fallback type definitions for Browser, BrowserContext, Page
588 lines
23 KiB
Python
588 lines
23 KiB
Python
"""
🏥 POUNCE DOMAIN HEALTH ENGINE

Advanced domain health analysis for premium intelligence.

Implements the 4-layer analysis from analysis_2.md:
1. DNS Layer - Infrastructure check (nameservers, MX, A records)
2. HTTP Layer - Website availability (status codes, content, parking detection)
3. SSL Layer - Certificate validity
4. WHOIS/RDAP Layer - Registration status (handled by the existing DomainChecker,
   not by this module)

Output: a health status (HEALTHY, WEAKENING, PARKED, CRITICAL) together with
a 0-100 score.
"""
|
||
import asyncio
|
||
import logging
|
||
import ssl
|
||
import socket
|
||
import re
|
||
from datetime import datetime, timezone, timedelta
|
||
from dataclasses import dataclass, field
|
||
from typing import Optional, List, Dict, Any
|
||
from enum import Enum
|
||
|
||
import httpx
|
||
import dns.resolver
|
||
import dns.exception
|
||
|
||
# Module-level logger, named after this module so records can be filtered per file.
logger = logging.getLogger(__name__)
|
||
|
||
|
||
class HealthStatus(str, Enum):
    """Overall verdict assigned to a domain after all layers were checked.

    The members are also plain strings (the class mixes in ``str``), so a
    member compares equal to its lowercase value.
    """

    # 🟢 All systems go
    HEALTHY = "healthy"
    # 🟡 Warning signs detected
    WEAKENING = "weakening"
    # 🟠 Domain for sale/parked
    PARKED = "parked"
    # 🔴 Drop imminent
    CRITICAL = "critical"
    # ❓ Could not determine
    UNKNOWN = "unknown"
|
||
|
||
|
||
@dataclass
class DNSCheckResult:
    """Outcome of the DNS layer: which record types were found for the domain."""

    # NS records
    has_nameservers: bool = False
    nameservers: List[str] = field(default_factory=list)

    # MX records (mail)
    has_mx_records: bool = False
    mx_records: List[str] = field(default_factory=list)

    # A records (IP addresses)
    has_a_record: bool = False
    a_records: List[str] = field(default_factory=list)

    # True when the nameservers point to a parking service
    is_parking_ns: bool = False

    # Text of a lookup failure, if one was recorded
    error: Optional[str] = None
|
||
|
||
|
||
@dataclass
class HTTPCheckResult:
    """Outcome of the HTTP layer: reachability, response size and parking hints."""

    # Status code of the final response; None when no response was received
    status_code: Optional[int] = None
    is_reachable: bool = False
    # Size of the response body in bytes
    content_length: int = 0

    # Parking / for-sale detection
    is_parked: bool = False
    parking_signals: List[str] = field(default_factory=list)

    # Final URL when the request was redirected
    redirect_url: Optional[str] = None
    response_time_ms: Optional[float] = None

    # Short failure reason, if one was recorded
    error: Optional[str] = None
|
||
|
||
|
||
@dataclass
class SSLCheckResult:
    """Outcome of the SSL layer: certificate presence, validity and expiry."""

    # A certificate was presented on port 443
    has_ssl: bool = False
    is_valid: bool = False

    # Expiry information; None when it could not be read
    expires_at: Optional[datetime] = None
    days_until_expiry: Optional[int] = None

    # Organization name of the certificate issuer
    issuer: Optional[str] = None
    is_expired: bool = False

    # Short failure reason, if one was recorded
    error: Optional[str] = None
|
||
|
||
|
||
@dataclass
class DomainHealthReport:
    """Complete health report for a domain."""

    domain: str
    status: HealthStatus
    score: int  # 0-100

    # Layer results (None when a layer was not checked)
    dns: Optional[DNSCheckResult] = None
    http: Optional[HTTPCheckResult] = None
    ssl: Optional[SSLCheckResult] = None

    # Summary
    signals: List[str] = field(default_factory=list)
    recommendations: List[str] = field(default_factory=list)

    # Metadata: creation time of the report, timezone-aware UTC
    checked_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary for API response.

        The per-layer sections are flattened for frontend compatibility and
        always carry the same keys, whether or not the layer result is
        present; a missing layer is reported with neutral defaults.

        Returns:
            A dict with the keys ``domain``, ``status``, ``score``,
            ``signals``, ``recommendations``, ``checked_at``, ``dns``,
            ``http`` and ``ssl``. ``checked_at`` and ``ssl.expires_at`` are
            ISO-8601 strings.
        """
        if self.dns is not None:
            dns_section = {
                "has_ns": self.dns.has_nameservers,
                "has_a": self.dns.has_a_record,
                "has_mx": self.dns.has_mx_records,
                "nameservers": self.dns.nameservers,
                "is_parked": self.dns.is_parking_ns,
                "parking_provider": None,  # Could be enhanced later
                "error": self.dns.error,
            }
        else:
            # Same keys as above (including "parking_provider") so consumers
            # never have to special-case a missing DNS layer.
            dns_section = {
                "has_ns": False,
                "has_a": False,
                "has_mx": False,
                "nameservers": [],
                "is_parked": False,
                "parking_provider": None,
                "error": None,
            }

        if self.http is not None:
            http_section = {
                "is_reachable": self.http.is_reachable,
                "status_code": self.http.status_code,
                "is_parked": self.http.is_parked,
                "parking_keywords": self.http.parking_signals,
                "content_length": self.http.content_length,
                "error": self.http.error,
            }
        else:
            http_section = {
                "is_reachable": False,
                "status_code": None,
                "is_parked": False,
                "parking_keywords": [],
                "content_length": 0,
                "error": None,
            }

        if self.ssl is not None:
            expires_at = self.ssl.expires_at
            ssl_section = {
                "has_certificate": self.ssl.has_ssl,
                "is_valid": self.ssl.is_valid,
                "expires_at": expires_at.isoformat() if expires_at else None,
                "days_until_expiry": self.ssl.days_until_expiry,
                "issuer": self.ssl.issuer,
                "error": self.ssl.error,
            }
        else:
            ssl_section = {
                "has_certificate": False,
                "is_valid": False,
                "expires_at": None,
                "days_until_expiry": None,
                "issuer": None,
                "error": None,
            }

        return {
            "domain": self.domain,
            "status": self.status.value,
            "score": self.score,
            "signals": self.signals,
            "recommendations": self.recommendations,
            "checked_at": self.checked_at.isoformat(),
            # Flat structure for frontend compatibility
            "dns": dns_section,
            "http": http_section,
            "ssl": ssl_section,
        }
|
||
|
||
|
||
class DomainHealthChecker:
    """
    Premium domain health analysis engine.

    Checks 4 layers to determine domain health:
    1. DNS: Is the infrastructure alive?
    2. HTTP: Is the website running?
    3. SSL: Is the certificate valid?
    4. (WHOIS handled by existing DomainChecker)

    The three layers implemented here run concurrently in ``check_domain``
    and are folded into a single 0-100 score by ``_calculate_health``.
    """

    # Known parking/for-sale service nameservers
    PARKING_NAMESERVERS = {
        'sedoparking.com', 'afternic.com', 'domaincontrol.com',
        'parkingcrew.net', 'bodis.com', 'dsredirection.com',
        'above.com', 'domainsponsor.com', 'fastpark.net',
        'parkdomain.com', 'domainmarket.com', 'hugedomains.com',
    }

    # Keywords indicating parked/for-sale pages
    PARKING_KEYWORDS = [
        'domain is for sale', 'buy this domain', 'inquire now',
        'make an offer', 'domain zum verkauf', 'domain for sale',
        'this domain is parked', 'parked by', 'related links',
        'sponsored listings', 'domain parking', 'this website is for sale',
        'purchase this domain', 'acquire this domain',
    ]

    # A records that only contain these addresses are treated as "no A record"
    _DEAD_IPS = frozenset({'0.0.0.0', '127.0.0.1'})

    # Upper bound of what the HTTP layer may subtract from the score
    _HTTP_MAX_PENALTY = 40

    def __init__(self):
        self._dns_resolver = dns.resolver.Resolver()
        self._dns_resolver.timeout = 3
        self._dns_resolver.lifetime = 5

    async def check_domain(self, domain: str) -> DomainHealthReport:
        """
        Perform comprehensive health check on a domain.

        Args:
            domain: Domain name to check (e.g., "example.com"). URLs are
                accepted too; they are reduced to the bare host first.

        Returns:
            DomainHealthReport with status, score, and detailed results
        """
        domain = self._normalize_domain(domain)
        logger.info(f"🏥 Starting health check for: {domain}")

        # Run all checks concurrently; a failing layer must not abort the others
        dns_result, http_result, ssl_result = await asyncio.gather(
            self._check_dns(domain),
            self._check_http(domain),
            self._check_ssl(domain),
            return_exceptions=True,
        )

        # With return_exceptions=True a result may be any BaseException
        # (e.g. CancelledError of a single child), not only Exception.
        if isinstance(dns_result, BaseException):
            logger.warning(f"DNS check failed: {dns_result}")
            dns_result = DNSCheckResult(error=str(dns_result))
        if isinstance(http_result, BaseException):
            logger.warning(f"HTTP check failed: {http_result}")
            http_result = HTTPCheckResult(error=str(http_result))
        if isinstance(ssl_result, BaseException):
            logger.warning(f"SSL check failed: {ssl_result}")
            ssl_result = SSLCheckResult(error=str(ssl_result))

        # Calculate health score and status
        report = self._calculate_health(domain, dns_result, http_result, ssl_result)

        logger.info(f"✅ Health check complete: {domain} = {report.status.value} ({report.score}/100)")
        return report

    def _normalize_domain(self, domain: str) -> str:
        """Reduce user input (domain or URL) to a bare lowercase host name.

        Strips surrounding whitespace, an ``http://``/``https://`` scheme,
        path, query string, fragment, userinfo, a numeric port, a trailing
        dot and a leading ``www.``.
        """
        host = domain.lower().strip()

        for scheme in ('http://', 'https://'):
            if host.startswith(scheme):
                host = host[len(scheme):]
                break

        # Everything after the first of these separators is not the host
        for separator in ('/', '?', '#'):
            host = host.split(separator, 1)[0]

        # "user:pass@host" -> "host"
        host = host.rsplit('@', 1)[-1]

        # "host:8080" -> "host"; names with several colons are left alone
        name, colon, port = host.rpartition(':')
        if colon and ':' not in name and (not port or port.isdigit()):
            host = name

        host = host.rstrip('.')
        if host.startswith('www.'):
            host = host[4:]
        return host

    @classmethod
    def _is_parking_nameserver(cls, nameserver: str) -> bool:
        """Return True if *nameserver* is, or is a subdomain of, a known parking service.

        Matching is done on whole DNS labels, so "ns1.notabove.com" does not
        match "above.com".
        """
        host = nameserver.rstrip('.').lower()
        return any(
            host == parking or host.endswith('.' + parking)
            for parking in cls.PARKING_NAMESERVERS
        )

    async def _resolve_records(self, domain: str, record_type: str):
        """Run the blocking dnspython lookup in the default executor."""
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            None, self._dns_resolver.resolve, domain, record_type
        )

    async def _check_dns(self, domain: str) -> DNSCheckResult:
        """
        Layer 1: DNS Infrastructure Check

        Checks:
        - NS records (nameservers)
        - MX records (mail)
        - A records (IP address)

        The three lookups are independent: a failure of one is recorded (or
        logged) and the remaining ones are still attempted.
        """
        result = DNSCheckResult()
        # Outcomes that simply mean "no such record"
        no_record = (dns.resolver.NXDOMAIN, dns.resolver.NoAnswer, dns.exception.Timeout)

        # Check NS records
        try:
            ns_answers = await self._resolve_records(domain, 'NS')
            result.nameservers = [str(rdata.target).rstrip('.').lower() for rdata in ns_answers]
            result.has_nameservers = len(result.nameservers) > 0
            # Check if nameservers point to parking service
            result.is_parking_ns = any(
                self._is_parking_nameserver(ns) for ns in result.nameservers
            )
        except no_record:
            result.has_nameservers = False
        except Exception as e:
            result.error = str(e)

        # Check MX records
        try:
            mx_answers = await self._resolve_records(domain, 'MX')
            result.mx_records = [str(rdata.exchange).rstrip('.').lower() for rdata in mx_answers]
            result.has_mx_records = len(result.mx_records) > 0
        except no_record:
            result.has_mx_records = False
        except Exception as e:
            # Best effort: keep the default (no MX) but leave a trace
            logger.debug(f"MX lookup failed for {domain}: {e}")

        # Check A records
        try:
            a_answers = await self._resolve_records(domain, 'A')
            result.a_records = [str(rdata.address) for rdata in a_answers]
            # Records that only point to dead IPs (0.0.0.0 or 127.0.0.1) do not count
            result.has_a_record = any(ip not in self._DEAD_IPS for ip in result.a_records)
        except no_record:
            result.has_a_record = False
        except Exception as e:
            # Best effort: keep the default (no A record) but leave a trace
            logger.debug(f"A lookup failed for {domain}: {e}")

        return result

    async def _check_http(self, domain: str) -> HTTPCheckResult:
        """
        Layer 2: HTTP Website Check

        Checks:
        - HTTP status code
        - Response content
        - Parking/for-sale detection

        HTTPS is tried first and plain HTTP only if HTTPS produced no
        response. ``error`` describes the last failed attempt and is cleared
        once any attempt returns a response.
        """
        result = HTTPCheckResult()
        loop = asyncio.get_running_loop()

        async with httpx.AsyncClient(
            timeout=10.0,
            follow_redirects=True,
            headers={
                "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
            }
        ) as client:
            for scheme in ('https', 'http'):
                url = f"{scheme}://{domain}"
                try:
                    started = loop.time()
                    response = await client.get(url)
                    finished = loop.time()

                    result.status_code = response.status_code
                    result.is_reachable = response.status_code < 500
                    result.content_length = len(response.content)
                    result.response_time_ms = (finished - started) * 1000

                    # Check for redirects
                    if response.history:
                        result.redirect_url = str(response.url)

                    # Check for parking keywords in content
                    content = response.text.lower()
                    for keyword in self.PARKING_KEYWORDS:
                        if keyword in content:
                            result.is_parked = True
                            result.parking_signals.append(keyword)

                    # A response was received: drop the error left behind by
                    # a failed attempt with the other scheme.
                    result.error = None
                    break  # Success, no need to try other scheme

                except httpx.TimeoutException:
                    result.error = "timeout"
                except httpx.ConnectError:
                    result.error = "connection_refused"
                except Exception as e:
                    result.error = str(e)

        return result

    @staticmethod
    def _fetch_validated_cert(domain: str) -> Dict[str, Any]:
        """Blocking: TLS handshake on port 443 with full certificate validation."""
        context = ssl.create_default_context()
        with socket.create_connection((domain, 443), timeout=5) as sock:
            with context.wrap_socket(sock, server_hostname=domain) as tls_sock:
                return tls_sock.getpeercert()

    @staticmethod
    def _fetch_unvalidated_cert(domain: str) -> Dict[str, Any]:
        """Blocking: read the certificate without validating it (fallback).

        Returns a dict shaped like ``SSLSocket.getpeercert()`` as far as
        ``notAfter``, ``notBefore`` and ``issuer`` are concerned. Without the
        optional ``cryptography`` package only empty placeholders are returned.
        """
        context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        context.check_hostname = False
        context.verify_mode = ssl.CERT_NONE
        with socket.create_connection((domain, 443), timeout=5) as sock:
            with context.wrap_socket(sock, server_hostname=domain) as tls_sock:
                # Only the DER form is usable when the certificate was not validated
                cert_der = tls_sock.getpeercert(binary_form=True)

        try:
            from cryptography import x509
            from cryptography.hazmat.backends import default_backend
        except ImportError:
            # Fallback: basic info without cryptography library
            return {'notAfter': None, 'issuer': None}

        cert_obj = x509.load_der_x509_certificate(cert_der, default_backend())
        # The *_utc accessors do not exist in older cryptography releases
        not_after = getattr(cert_obj, 'not_valid_after_utc', None) or cert_obj.not_valid_after
        not_before = getattr(cert_obj, 'not_valid_before_utc', None) or cert_obj.not_valid_before
        org_attrs = cert_obj.issuer.get_attributes_for_oid(x509.oid.NameOID.ORGANIZATION_NAME)
        organization = org_attrs[0].value if org_attrs else 'Unknown'
        return {
            'notAfter': not_after.strftime('%b %d %H:%M:%S %Y GMT'),
            'notBefore': not_before.strftime('%b %d %H:%M:%S %Y GMT'),
            'issuer': [(('organizationName', organization),)],
        }

    @staticmethod
    def _apply_cert_details(result, cert) -> None:
        """Copy expiry date and issuer organization from *cert* onto *result*.

        *cert* is a dict in ``getpeercert()`` layout; *result* is updated in
        place (``expires_at``, ``days_until_expiry``, ``is_expired``,
        ``is_valid``, ``issuer``).
        """
        # Parse expiration date, format: 'Dec 31 23:59:59 2024 GMT'
        not_after = cert.get('notAfter')
        if not_after:
            try:
                expires = datetime.strptime(not_after, '%b %d %H:%M:%S %Y %Z')
            except (ValueError, TypeError):
                result.is_valid = True  # Assume valid if we can't parse
            else:
                result.expires_at = expires.replace(tzinfo=timezone.utc)
                result.days_until_expiry = (result.expires_at - datetime.now(timezone.utc)).days
                result.is_expired = result.days_until_expiry < 0
                result.is_valid = result.days_until_expiry >= 0

        # Get issuer: a sequence of RDNs, each a tuple of (name, value) pairs
        for rdn in cert.get('issuer') or ():
            if not (isinstance(rdn, tuple) and len(rdn) > 0):
                continue
            first = rdn[0]
            if isinstance(first, tuple) and first[0] == 'organizationName':
                result.issuer = first[1]
                break
            if isinstance(first, str) and first == 'organizationName':
                result.issuer = rdn[1] if len(rdn) > 1 else None
                break

    async def _check_ssl(self, domain: str) -> SSLCheckResult:
        """
        Layer 3: SSL Certificate Check

        Checks:
        - Certificate exists
        - Certificate validity
        - Expiration date

        Uses two-stage approach:
        1. Try with full validation
        2. On validation failure, extract cert info without validation
        """
        result = SSLCheckResult()
        loop = asyncio.get_running_loop()

        try:
            # First try with validation
            try:
                cert = await loop.run_in_executor(None, self._fetch_validated_cert, domain)
            except ssl.SSLCertVerificationError:
                # Validation failed, try without validation to get cert info
                try:
                    cert = await loop.run_in_executor(None, self._fetch_unvalidated_cert, domain)
                except Exception:
                    result.has_ssl = True
                    result.is_valid = False
                    result.error = "Certificate exists but could not be parsed"
                    return result

            # NOTE(review): a certificate that failed chain/hostname validation
            # is still reported as valid here unless it is expired - this keeps
            # the existing behavior; confirm that it is intended.
            result.has_ssl = True
            result.is_valid = True
            self._apply_cert_details(result, cert)

        except OSError as e:
            # socket.timeout, ConnectionRefusedError and ssl.SSLError are all OSError
            result.has_ssl = False
            if '443' in str(e) or 'refused' in str(e).lower():
                result.error = "Port 443 not responding"
            else:
                result.error = "no_ssl"
        except Exception as e:
            result.error = str(e)

        return result

    @staticmethod
    def _score_dns(dns_result, signals: List[str], recommendations: List[str]) -> int:
        """Return the DNS penalty (0-30) and append its findings to the lists."""
        if not dns_result.has_nameservers:
            signals.append("🔴 No nameservers found (domain may not exist)")
            return 30
        if dns_result.is_parking_ns:
            signals.append("🟠 Nameservers point to parking service")
            recommendations.append("Domain is parked - owner may be selling")
            return 15

        penalty = 0
        if not dns_result.has_a_record:
            penalty += 10
            signals.append("⚠️ No A record (no website configured)")
        if not dns_result.has_mx_records:
            penalty += 5
            signals.append("⚠️ No MX records (no email configured)")
        return penalty

    @classmethod
    def _score_http(cls, http_result, signals: List[str], recommendations: List[str]) -> int:
        """Return the HTTP penalty (0-40) and append its findings to the lists.

        A 5xx response is scored as a server error rather than as "not
        reachable": the server did answer, even though ``is_reachable`` is
        False for such responses.
        """
        status_code = http_result.status_code
        server_error = status_code is not None and status_code >= 500

        if not http_result.is_reachable and not server_error:
            signals.append("🔴 Website not reachable")
            if http_result.error == "timeout":
                signals.append("⚠️ Connection timeout")
            elif http_result.error == "connection_refused":
                signals.append("⚠️ Connection refused")
            return cls._HTTP_MAX_PENALTY

        if not status_code:
            return 0

        penalty = 0
        if server_error:
            penalty += 30
            signals.append(f"🔴 Server error ({status_code})")
            recommendations.append("Server is having issues - monitor closely")
        elif status_code >= 400:
            penalty += 15
            signals.append(f"⚠️ Client error ({status_code})")

        if http_result.is_parked:
            penalty += 10
            signals.append("🟠 Page contains for-sale indicators")
            recommendations.append(f"Detected: {', '.join(http_result.parking_signals[:3])}")

        if http_result.content_length < 500:
            penalty += 5
            signals.append("⚠️ Very small page content")

        # Never exceed the budget documented for this layer
        return min(penalty, cls._HTTP_MAX_PENALTY)

    @staticmethod
    def _score_ssl(ssl_result, signals: List[str], recommendations: List[str]) -> int:
        """Return the SSL penalty (0-30) and append its findings to the lists."""
        if not ssl_result.has_ssl:
            signals.append("⚠️ No SSL certificate")
            return 10
        if ssl_result.is_expired:
            signals.append("🔴 SSL certificate expired!")
            recommendations.append("Certificate expired - owner neglecting domain")
            return 30

        days_left = ssl_result.days_until_expiry
        if days_left is None:
            return 0
        if days_left < 7:
            signals.append(f"⚠️ SSL expires in {days_left} days")
            recommendations.append("Certificate expiring soon - watch for neglect")
            return 15
        if days_left < 30:
            signals.append(f"ℹ️ SSL expires in {days_left} days")
            return 5
        return 0

    def _calculate_health(
        self,
        domain: str,
        dns_result: DNSCheckResult,
        http_result: HTTPCheckResult,
        ssl_result: SSLCheckResult
    ) -> DomainHealthReport:
        """
        Calculate overall health status and score.

        Scoring (points that can be lost per layer, starting from 100):
        - DNS layer: 30 points
        - HTTP layer: 40 points
        - SSL layer: 30 points

        Status:
        - score below 20: CRITICAL
        - otherwise PARKED if the nameservers or the page content indicate parking
        - otherwise HEALTHY from 80 points, WEAKENING below
        """
        signals: List[str] = []
        recommendations: List[str] = []

        score = 100
        score -= self._score_dns(dns_result, signals, recommendations)
        score -= self._score_http(http_result, signals, recommendations)
        score -= self._score_ssl(ssl_result, signals, recommendations)

        # Ensure score is in valid range
        score = max(0, min(100, score))

        # Determine status; clear parking signals override everything but CRITICAL
        parked = dns_result.is_parking_ns or http_result.is_parked
        if score < 20:
            status = HealthStatus.CRITICAL
        elif parked:
            status = HealthStatus.PARKED
        elif score >= 80:
            status = HealthStatus.HEALTHY
        else:
            status = HealthStatus.WEAKENING

        return DomainHealthReport(
            domain=domain,
            status=status,
            score=score,
            dns=dns_result,
            http=http_result,
            ssl=ssl_result,
            signals=signals,
            recommendations=recommendations,
        )
|
||
|
||
|
||
# Module-wide checker instance; stays None until it is first requested
_health_checker: Optional[DomainHealthChecker] = None


def get_health_checker() -> DomainHealthChecker:
    """Return the shared DomainHealthChecker, building it on first use."""
    global _health_checker
    if _health_checker is not None:
        return _health_checker
    _health_checker = DomainHealthChecker()
    return _health_checker
|
||
|