pounce/backend/app/services/domain_health.py
yves.gugger 9acc40b658
Some checks failed
CI / Frontend Lint & Type Check (push) Has been cancelled
CI / Frontend Build (push) Has been cancelled
CI / Backend Lint (push) Has been cancelled
CI / Backend Tests (push) Has been cancelled
CI / Docker Build (push) Has been cancelled
CI / Security Scan (push) Has been cancelled
Deploy / Build & Push Images (push) Has been cancelled
Deploy / Deploy to Server (push) Has been cancelled
Deploy / Notify (push) Has been cancelled
feat: Complete Watchlist monitoring, Portfolio tracking & Listings marketplace
## Watchlist & Monitoring
-  Automatic domain monitoring based on subscription tier
-  Email alerts when domains become available
-  Health checks (DNS/HTTP/SSL) with caching
-  Expiry warnings for domains expiring in <30 days
-  Weekly digest emails
-  Instant alert toggle (optimistic UI updates)
-  Redesigned health check overlays with full details
- 🔒 'Not public' display for .ch/.de domains without public expiry

## Portfolio Management (NEW)
-  Track owned domains with purchase price & date
-  ROI calculation (unrealized & realized)
-  Domain valuation with auto-refresh
-  Renewal date tracking
-  Sale recording with profit calculation
-  List domains for sale directly from portfolio
-  Full portfolio summary dashboard

## Listings / For Sale
-  Renamed from 'Portfolio' to 'For Sale'
-  Fixed listing limits: Scout=0, Trader=5, Tycoon=50
-  Featured badge for Tycoon listings
-  Inquiries modal for sellers
-  Email notifications when buyer inquires
-  Inquiries column in listings table

## Scrapers & Data
-  Added 4 new registrar scrapers (Namecheap, Cloudflare, GoDaddy, Dynadot)
-  Increased scraping frequency to 2x daily (03:00 & 15:00 UTC)
-  Real historical data from database
-  Fixed RDAP/WHOIS for .ch/.de domains
-  Enhanced SSL certificate parsing

## Scheduler Jobs
-  Tiered domain checks (Scout=daily, Trader=hourly, Tycoon=10min)
-  Daily health checks (06:00 UTC)
-  Weekly expiry warnings (Mon 08:00 UTC)
-  Weekly digest emails (Sun 10:00 UTC)
-  Auction cleanup every 15 minutes

## UI/UX Improvements
-  Removed 'Back' buttons from Intel pages
-  Redesigned Radar page to match Market/Intel design
-  Less prominent check frequency footer
-  Consistent StatCard components across all pages
-  Ambient background glows
-  Better error handling

## Documentation
-  Updated README with monitoring section
-  Added env.example with all required variables
-  Updated Memory Bank (activeContext.md)
-  SMTP configuration requirements documented
2025-12-11 16:57:28 +01:00

588 lines
23 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
🏥 POUNCE DOMAIN HEALTH ENGINE
Advanced domain health analysis for premium intelligence.
Implements 4-layer analysis from analysis_2.md:
1. DNS Layer - Infrastructure check (nameservers, MX, A records)
2. HTTP Layer - Website availability (status codes, content, parking detection)
3. SSL Layer - Certificate validity
4. WHOIS/RDAP Layer - Registration status
Output: Health Score (HEALTHY, WEAKENING, PARKED, CRITICAL)
"""
import asyncio
import logging
import ssl
import socket
import re
from datetime import datetime, timezone, timedelta
from dataclasses import dataclass, field
from typing import Optional, List, Dict, Any
from enum import Enum
import httpx
import dns.resolver
import dns.exception
logger = logging.getLogger(__name__)
class HealthStatus(str, Enum):
    """Domain health status levels.

    Mixes in ``str`` so each member is also a plain string equal to its
    value; reports serialise the status through ``.value``.
    """
    HEALTHY = "healthy"  # 🟢 All systems go
    WEAKENING = "weakening"  # 🟡 Warning signs detected
    PARKED = "parked"  # 🟠 Domain for sale/parked
    CRITICAL = "critical"  # 🔴 Drop imminent
    UNKNOWN = "unknown"  # ❓ Could not determine
@dataclass
class DNSCheckResult:
    """Results from DNS layer check.

    Every field defaults to the "nothing found" value, so a freshly
    constructed instance describes a domain with no DNS data at all.
    """
    # NS lookup: whether any nameserver was returned, and their host names
    has_nameservers: bool = False
    nameservers: List[str] = field(default_factory=list)
    # MX lookup: whether any mail exchanger was returned, and their host names
    has_mx_records: bool = False
    mx_records: List[str] = field(default_factory=list)
    # A lookup: whether a usable address was returned, and the raw addresses
    has_a_record: bool = False
    a_records: List[str] = field(default_factory=list)
    is_parking_ns: bool = False  # Nameservers point to parking service
    # Message describing a lookup failure, if one was recorded
    error: Optional[str] = None
@dataclass
class HTTPCheckResult:
    """Results from HTTP layer check.

    Defaults describe a site that was never reached: no status code, not
    reachable, empty content.
    """
    # Status code of the final response, or None when no response arrived
    status_code: Optional[int] = None
    # Whether a response was received that the checker counts as reachable
    is_reachable: bool = False
    # Size of the response body
    content_length: int = 0
    # Parking/for-sale detection: flag plus the keywords that matched
    is_parked: bool = False
    parking_signals: List[str] = field(default_factory=list)
    # Final URL when the request was redirected, otherwise None
    redirect_url: Optional[str] = None
    # Request duration in milliseconds
    response_time_ms: Optional[float] = None
    # Failure description, if one was recorded
    error: Optional[str] = None
@dataclass
class SSLCheckResult:
    """Results from SSL layer check.

    Defaults describe a host with no certificate information available.
    """
    # Whether a certificate was presented on the TLS port
    has_ssl: bool = False
    # Whether the checker considers the certificate valid
    is_valid: bool = False
    # Certificate expiry timestamp, when it could be determined
    expires_at: Optional[datetime] = None
    # Whole days until expires_at; negative once the certificate has expired
    days_until_expiry: Optional[int] = None
    # Issuer organisation name, when it could be extracted
    issuer: Optional[str] = None
    is_expired: bool = False
    # Failure description, if one was recorded
    error: Optional[str] = None
@dataclass
class DomainHealthReport:
    """Complete health report for a domain.

    Bundles the overall verdict (status and 0-100 score) with the raw
    per-layer results and the human-readable signals/recommendations that
    explain the verdict.
    """
    domain: str
    status: HealthStatus
    score: int  # 0-100

    # Layer results (None when a layer was not evaluated)
    dns: Optional[DNSCheckResult] = None
    http: Optional[HTTPCheckResult] = None
    ssl: Optional[SSLCheckResult] = None

    # Summary
    signals: List[str] = field(default_factory=list)
    recommendations: List[str] = field(default_factory=list)

    # Metadata
    checked_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary for API response.

        The ``dns``, ``http`` and ``ssl`` sections are always present and
        always carry the same keys, whether or not the corresponding layer
        result exists; a missing layer is rendered with neutral defaults.

        Returns:
            A JSON-serialisable dictionary describing the report.
        """
        dns_result = self.dns
        http_result = self.http
        ssl_result = self.ssl

        # Flat structure for frontend compatibility
        if dns_result is not None:
            dns_section = {
                "has_ns": dns_result.has_nameservers,
                "has_a": dns_result.has_a_record,
                "has_mx": dns_result.has_mx_records,
                "nameservers": dns_result.nameservers,
                "is_parked": dns_result.is_parking_ns,
                "parking_provider": None,  # Could be enhanced later
                "error": dns_result.error,
            }
        else:
            dns_section = {
                "has_ns": False,
                "has_a": False,
                "has_mx": False,
                "nameservers": [],
                "is_parked": False,
                # Present here too so both variants expose the same keys
                "parking_provider": None,
                "error": None,
            }

        if http_result is not None:
            http_section = {
                "is_reachable": http_result.is_reachable,
                "status_code": http_result.status_code,
                "is_parked": http_result.is_parked,
                "parking_keywords": http_result.parking_signals,
                "content_length": http_result.content_length,
                "error": http_result.error,
            }
        else:
            http_section = {
                "is_reachable": False,
                "status_code": None,
                "is_parked": False,
                "parking_keywords": [],
                "content_length": 0,
                "error": None,
            }

        if ssl_result is not None:
            expires_at = ssl_result.expires_at
            ssl_section = {
                "has_certificate": ssl_result.has_ssl,
                "is_valid": ssl_result.is_valid,
                "expires_at": expires_at.isoformat() if expires_at else None,
                "days_until_expiry": ssl_result.days_until_expiry,
                "issuer": ssl_result.issuer,
                "error": ssl_result.error,
            }
        else:
            ssl_section = {
                "has_certificate": False,
                "is_valid": False,
                "expires_at": None,
                "days_until_expiry": None,
                "issuer": None,
                "error": None,
            }

        return {
            "domain": self.domain,
            "status": self.status.value,
            "score": self.score,
            "signals": self.signals,
            "recommendations": self.recommendations,
            "checked_at": self.checked_at.isoformat(),
            "dns": dns_section,
            "http": http_section,
            "ssl": ssl_section,
        }
class DomainHealthChecker:
    """
    Premium domain health analysis engine.

    Checks 4 layers to determine domain health:
    1. DNS: Is the infrastructure alive?
    2. HTTP: Is the website running?
    3. SSL: Is the certificate valid?
    4. (WHOIS handled by existing DomainChecker)
    """

    # Known parking/for-sale service nameservers
    PARKING_NAMESERVERS = {
        'sedoparking.com', 'afternic.com', 'domaincontrol.com',
        'parkingcrew.net', 'bodis.com', 'dsredirection.com',
        'above.com', 'domainsponsor.com', 'fastpark.net',
        'parkdomain.com', 'domainmarket.com', 'hugedomains.com',
    }

    # Keywords indicating parked/for-sale pages
    PARKING_KEYWORDS = [
        'domain is for sale', 'buy this domain', 'inquire now',
        'make an offer', 'domain zum verkauf', 'domain for sale',
        'this domain is parked', 'parked by', 'related links',
        'sponsored listings', 'domain parking', 'this website is for sale',
        'purchase this domain', 'acquire this domain',
    ]

    def __init__(self):
        """Create the checker with a DNS resolver using short timeouts."""
        self._dns_resolver = dns.resolver.Resolver()
        self._dns_resolver.timeout = 3
        self._dns_resolver.lifetime = 5

    async def check_domain(self, domain: str) -> DomainHealthReport:
        """
        Perform comprehensive health check on a domain.

        Args:
            domain: Domain name to check (e.g., "example.com")

        Returns:
            DomainHealthReport with status, score, and detailed results
        """
        domain = self._normalize_domain(domain)
        logger.info("🏥 Starting health check for: %s", domain)

        # Run all checks concurrently; a failing layer must not abort the others
        dns_result, http_result, ssl_result = await asyncio.gather(
            self._check_dns(domain),
            self._check_http(domain),
            self._check_ssl(domain),
            return_exceptions=True,
        )

        # gather() hands failures back as values. BaseException is checked so
        # that a cancelled sub-task is also replaced by an error result
        # instead of reaching the scoring code as an exception object.
        if isinstance(dns_result, BaseException):
            logger.warning("DNS check failed: %s", dns_result)
            dns_result = DNSCheckResult(error=str(dns_result))
        if isinstance(http_result, BaseException):
            logger.warning("HTTP check failed: %s", http_result)
            http_result = HTTPCheckResult(error=str(http_result))
        if isinstance(ssl_result, BaseException):
            logger.warning("SSL check failed: %s", ssl_result)
            ssl_result = SSLCheckResult(error=str(ssl_result))

        # Calculate health score and status
        report = self._calculate_health(domain, dns_result, http_result, ssl_result)
        logger.info(
            "✅ Health check complete: %s = %s (%s/100)",
            domain, report.status.value, report.score,
        )
        return report

    def _normalize_domain(self, domain: str) -> str:
        """Normalize user input to a bare, lower-case host name.

        Strips surrounding whitespace, an ``http://``/``https://`` scheme,
        a leading ``www.``, any path, query string or fragment, a port
        suffix and a trailing root dot.

        Args:
            domain: Raw domain or URL as entered by the user.

        Returns:
            The host name suitable for DNS, HTTP and TLS lookups.
        """
        domain = domain.strip().lower()
        for scheme in ('http://', 'https://'):
            if domain.startswith(scheme):
                domain = domain[len(scheme):]
                break
        if domain.startswith('www.'):
            domain = domain[4:]
        # Everything after the host is irrelevant for the checks
        for separator in ('/', '?', '#'):
            domain = domain.split(separator, 1)[0]
        # "example.com:8443" would break both DNS resolution and the socket connect
        host, colon, port = domain.rpartition(':')
        if colon and (not port or port.isdigit()):
            domain = host
        # A fully-qualified "example.com." is the same host
        return domain.rstrip('.')

    @classmethod
    def _is_parking_nameserver(cls, nameserver: str) -> bool:
        """Return True when *nameserver* belongs to a known parking service.

        Matches on whole DNS labels (the name equals a parking domain or is
        a sub-domain of one) so that e.g. ``ns1.notabove.com`` is not
        mistaken for ``above.com``.
        """
        name = nameserver.rstrip('.').lower()
        return any(
            name == parking or name.endswith('.' + parking)
            for parking in cls.PARKING_NAMESERVERS
        )

    async def _resolve(self, domain: str, record_type: str):
        """Run a blocking DNS query for *record_type* in the default executor."""
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            None, self._dns_resolver.resolve, domain, record_type
        )

    async def _check_dns(self, domain: str) -> DNSCheckResult:
        """
        Layer 1: DNS Infrastructure Check

        Checks:
        - NS records (nameservers)
        - MX records (mail)
        - A records (IP address)

        Args:
            domain: Normalized host name.

        Returns:
            DNSCheckResult; lookups that fail leave the matching fields at
            their "not found" defaults.
        """
        result = DNSCheckResult()

        # Check NS records
        try:
            ns_answers = await self._resolve(domain, 'NS')
            result.nameservers = [
                str(rdata.target).rstrip('.').lower() for rdata in ns_answers
            ]
            result.has_nameservers = bool(result.nameservers)
            # Check if nameservers point to parking service
            result.is_parking_ns = any(
                self._is_parking_nameserver(ns) for ns in result.nameservers
            )
        except (dns.resolver.NXDOMAIN, dns.resolver.NoAnswer):
            result.has_nameservers = False
        except dns.exception.Timeout:
            # Not the same as "no nameservers": record why nothing was found
            result.has_nameservers = False
            result.error = "timeout"
        except Exception as e:
            result.error = str(e)

        # Check MX records
        try:
            mx_answers = await self._resolve(domain, 'MX')
            result.mx_records = [
                str(rdata.exchange).rstrip('.').lower() for rdata in mx_answers
            ]
            result.has_mx_records = bool(result.mx_records)
        except (dns.resolver.NXDOMAIN, dns.resolver.NoAnswer, dns.exception.Timeout):
            result.has_mx_records = False
        except Exception as e:
            # Best effort: keep going, but leave a trace for debugging
            logger.debug("MX lookup failed for %s: %s", domain, e)

        # Check A records
        try:
            a_answers = await self._resolve(domain, 'A')
            result.a_records = [str(rdata.address) for rdata in a_answers]
            # Check for dead IPs (0.0.0.0 or 127.0.0.1): a domain pointing
            # only at those has no usable website address
            dead_ips = {'0.0.0.0', '127.0.0.1'}
            result.has_a_record = any(
                ip not in dead_ips for ip in result.a_records
            )
        except (dns.resolver.NXDOMAIN, dns.resolver.NoAnswer, dns.exception.Timeout):
            result.has_a_record = False
        except Exception as e:
            logger.debug("A lookup failed for %s: %s", domain, e)

        return result

    async def _check_http(self, domain: str) -> HTTPCheckResult:
        """
        Layer 2: HTTP Website Check

        Tries HTTPS first and falls back to plain HTTP.

        Checks:
        - HTTP status code
        - Response content
        - Parking/for-sale detection

        Args:
            domain: Normalized host name.

        Returns:
            HTTPCheckResult for the first scheme that produced a response,
            or a result carrying the last error when neither did.
        """
        result = HTTPCheckResult()
        loop = asyncio.get_running_loop()

        async with httpx.AsyncClient(
            timeout=10.0,
            follow_redirects=True,
            headers={
                "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
            },
        ) as client:
            for scheme in ('https', 'http'):
                url = f"{scheme}://{domain}"
                try:
                    started = loop.time()
                    response = await client.get(url)
                    finished = loop.time()

                    result.status_code = response.status_code
                    result.is_reachable = response.status_code < 500
                    result.content_length = len(response.content)
                    result.response_time_ms = (finished - started) * 1000

                    # Check for redirects
                    if response.history:
                        result.redirect_url = str(response.url)

                    # Check for parking keywords in content
                    content = response.text.lower()
                    result.parking_signals = [
                        keyword for keyword in self.PARKING_KEYWORDS
                        if keyword in content
                    ]
                    result.is_parked = bool(result.parking_signals)

                    # A failed HTTPS attempt must not leave its error on a
                    # result that ultimately succeeded over HTTP
                    result.error = None
                    break  # Success, no need to try other scheme
                except httpx.TimeoutException:
                    result.error = "timeout"
                except httpx.ConnectError:
                    result.error = "connection_refused"
                except Exception as e:
                    result.error = str(e)

        return result

    @staticmethod
    def _fetch_validated_cert(domain: str) -> dict:
        """Fetch the peer certificate on port 443 with full validation.

        Raises ``ssl.SSLCertVerificationError`` when the chain or host name
        does not verify, and ``OSError`` for connection problems.
        """
        context = ssl.create_default_context()
        with socket.create_connection((domain, 443), timeout=5) as sock:
            with context.wrap_socket(sock, server_hostname=domain) as ssock:
                return ssock.getpeercert()

    @staticmethod
    def _fetch_unvalidated_cert(domain: str) -> dict:
        """Fetch certificate details on port 443 without validating it.

        ``getpeercert()`` returns no parsed data when verification is
        disabled, so the DER bytes are decoded with the optional
        ``cryptography`` package. Without that package only placeholder
        values are returned.
        """
        context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        context.check_hostname = False
        context.verify_mode = ssl.CERT_NONE
        with socket.create_connection((domain, 443), timeout=5) as sock:
            with context.wrap_socket(sock, server_hostname=domain) as ssock:
                cert_der = ssock.getpeercert(binary_form=True)

        try:
            from cryptography import x509
            from cryptography.hazmat.backends import default_backend
        except ImportError:
            # Fallback: basic info without cryptography library
            return {'notAfter': None, 'issuer': None}

        cert_obj = x509.load_der_x509_certificate(cert_der, default_backend())

        # not_valid_after_utc only exists in newer cryptography releases;
        # older ones expose a naive datetime that is documented to be UTC
        expires_at = getattr(cert_obj, 'not_valid_after_utc', None)
        if expires_at is None:
            expires_at = cert_obj.not_valid_after.replace(tzinfo=timezone.utc)

        organizations = cert_obj.issuer.get_attributes_for_oid(
            x509.oid.NameOID.ORGANIZATION_NAME
        )
        issuer_name = organizations[0].value if organizations else 'Unknown'
        return {
            # Passed as a datetime to avoid a locale-dependent string round trip
            'expires_at': expires_at,
            'issuer': ((('organizationName', issuer_name),),),
        }

    @staticmethod
    def _cert_expiry(cert: dict) -> Optional[datetime]:
        """Return the certificate expiry as an aware UTC datetime, if known.

        Accepts either a ready-made ``expires_at`` datetime or the
        ``notAfter`` string produced by ``SSLSocket.getpeercert()``
        (e.g. ``'Dec 31 23:59:59 2024 GMT'``).

        Raises:
            ValueError: if ``notAfter`` is present but malformed.
        """
        expires_at = cert.get('expires_at')
        if isinstance(expires_at, datetime):
            return expires_at
        not_after = cert.get('notAfter')
        if not not_after:
            return None
        # cert_time_to_seconds parses in the C locale, unlike strptime('%b')
        return datetime.fromtimestamp(
            ssl.cert_time_to_seconds(not_after), tz=timezone.utc
        )

    @staticmethod
    def _cert_issuer(cert: dict) -> Optional[str]:
        """Extract the issuer's organizationName from a certificate dict.

        Handles the nested ``getpeercert()`` layout (a sequence of RDNs,
        each a tuple of ``(key, value)`` pairs) as well as a flat sequence
        of ``(key, value)`` tuples.
        """
        for rdn in cert.get('issuer') or ():
            if not isinstance(rdn, tuple) or not rdn:
                continue
            if isinstance(rdn[0], str):
                if rdn[0] == 'organizationName':
                    return rdn[1] if len(rdn) > 1 else None
                continue
            for pair in rdn:
                if (
                    isinstance(pair, tuple)
                    and len(pair) > 1
                    and pair[0] == 'organizationName'
                ):
                    return pair[1]
        return None

    async def _check_ssl(self, domain: str) -> SSLCheckResult:
        """
        Layer 3: SSL Certificate Check

        Checks:
        - Certificate exists
        - Certificate validity
        - Expiration date

        Uses two-stage approach:
        1. Try with full validation
        2. On validation failure, extract cert info without validation

        Args:
            domain: Normalized host name.

        Returns:
            SSLCheckResult; connection problems are reported through
            ``error`` rather than raised.
        """
        result = SSLCheckResult()
        loop = asyncio.get_running_loop()

        try:
            # First try with validation
            try:
                cert = await loop.run_in_executor(
                    None, self._fetch_validated_cert, domain
                )
            except ssl.SSLCertVerificationError:
                # Validation failed, try without validation to get cert info
                try:
                    cert = await loop.run_in_executor(
                        None, self._fetch_unvalidated_cert, domain
                    )
                except Exception:
                    result.has_ssl = True
                    result.is_valid = False
                    result.error = "Certificate exists but could not be parsed"
                    return result

            result.has_ssl = True
            # NOTE(review): a certificate that failed chain/host verification
            # is still reported as valid here unless it turns out to be
            # expired below - kept as in the original, confirm it is intended.
            result.is_valid = True

            # Parse expiration date
            try:
                expires_at = self._cert_expiry(cert)
            except (ValueError, TypeError):
                expires_at = None  # Assume valid if we can't parse

            if expires_at is not None:
                result.expires_at = expires_at
                remaining = expires_at - datetime.now(timezone.utc)
                result.days_until_expiry = remaining.days
                result.is_expired = result.days_until_expiry < 0
                result.is_valid = not result.is_expired

            # Get issuer
            result.issuer = self._cert_issuer(cert)

        except OSError as e:
            # Covers timeouts, refused connections, DNS failures and TLS
            # handshake errors (ssl.SSLError is an OSError)
            message = str(e)
            result.has_ssl = False
            if '443' in message or 'refused' in message.lower():
                result.error = "Port 443 not responding"
            else:
                result.error = "no_ssl"
        except Exception as e:
            result.error = str(e)

        return result

    def _calculate_health(
        self,
        domain: str,
        dns_result: DNSCheckResult,
        http_result: HTTPCheckResult,
        ssl_result: SSLCheckResult
    ) -> DomainHealthReport:
        """
        Calculate overall health status and score.

        Starts from 100 and subtracts penalties per layer.

        Scoring:
        - DNS layer: 30 points
        - HTTP layer: 40 points
        - SSL layer: 30 points

        Args:
            domain: Normalized host name the results belong to.
            dns_result: Outcome of the DNS layer.
            http_result: Outcome of the HTTP layer.
            ssl_result: Outcome of the SSL layer.

        Returns:
            DomainHealthReport with score, status, signals and
            recommendations filled in.
        """
        score = 100
        signals = []
        recommendations = []

        # =========================
        # DNS Scoring (30 points)
        # =========================
        if not dns_result.has_nameservers:
            score -= 30
            signals.append("🔴 No nameservers found (domain may not exist)")
        elif dns_result.is_parking_ns:
            score -= 15
            signals.append("🟠 Nameservers point to parking service")
            recommendations.append("Domain is parked - owner may be selling")
        else:
            if not dns_result.has_a_record:
                score -= 10
                signals.append("⚠️ No A record (no website configured)")
            if not dns_result.has_mx_records:
                score -= 5
                signals.append("⚠️ No MX records (no email configured)")

        # =========================
        # HTTP Scoring (40 points)
        # =========================
        status_code = http_result.status_code
        if status_code is not None and status_code >= 500:
            # Checked before reachability: a 5xx response is flagged as not
            # reachable by the HTTP layer, which would otherwise hide this
            # more specific diagnosis.
            score -= 30
            signals.append(f"🔴 Server error ({status_code})")
            recommendations.append("Server is having issues - monitor closely")
        elif not http_result.is_reachable:
            score -= 40
            signals.append("🔴 Website not reachable")
            if http_result.error == "timeout":
                signals.append("⚠️ Connection timeout")
            elif http_result.error == "connection_refused":
                signals.append("⚠️ Connection refused")
        elif status_code:
            if status_code >= 400:
                score -= 15
                signals.append(f"⚠️ Client error ({status_code})")
            if http_result.is_parked:
                score -= 10
                signals.append("🟠 Page contains for-sale indicators")
                recommendations.append(
                    f"Detected: {', '.join(http_result.parking_signals[:3])}"
                )
            if http_result.content_length < 500:
                score -= 5
                signals.append("⚠️ Very small page content")

        # =========================
        # SSL Scoring (30 points)
        # =========================
        if not ssl_result.has_ssl:
            score -= 10
            signals.append("⚠️ No SSL certificate")
        elif ssl_result.is_expired:
            score -= 30
            signals.append("🔴 SSL certificate expired!")
            recommendations.append("Certificate expired - owner neglecting domain")
        elif ssl_result.days_until_expiry is not None:
            if ssl_result.days_until_expiry < 7:
                score -= 15
                signals.append(f"⚠️ SSL expires in {ssl_result.days_until_expiry} days")
                recommendations.append("Certificate expiring soon - watch for neglect")
            elif ssl_result.days_until_expiry < 30:
                score -= 5
                signals.append(f" SSL expires in {ssl_result.days_until_expiry} days")

        # Ensure score is in valid range
        score = max(0, min(100, score))

        # Determine status from the score bands.
        # NOTE(review): a domain with no nameservers, an unreachable website
        # and no SSL loses 30 + 40 + 10 and lands on exactly 20, which this
        # threshold classifies as WEAKENING rather than CRITICAL - confirm.
        if score >= 80:
            status = HealthStatus.HEALTHY
        elif score >= 20:
            status = HealthStatus.WEAKENING
        else:
            status = HealthStatus.CRITICAL

        # Override to PARKED if clear signals (CRITICAL always wins)
        is_parked = dns_result.is_parking_ns or http_result.is_parked
        if is_parked and status != HealthStatus.CRITICAL:
            status = HealthStatus.PARKED

        return DomainHealthReport(
            domain=domain,
            status=status,
            score=score,
            dns=dns_result,
            http=http_result,
            ssl=ssl_result,
            signals=signals,
            recommendations=recommendations,
        )
# Process-wide checker, built lazily on the first request for it
_health_checker: Optional[DomainHealthChecker] = None


def get_health_checker() -> DomainHealthChecker:
    """Return the shared DomainHealthChecker, creating it on first use."""
    global _health_checker
    checker = _health_checker
    if checker is None:
        checker = DomainHealthChecker()
        _health_checker = checker
    return checker