pounce/backend/app/services/yield_dns.py
Yves Gugger 5d81f8d71e
Some checks are pending
CI / Frontend Lint & Type Check (push) Waiting to run
CI / Frontend Build (push) Blocked by required conditions
CI / Backend Lint (push) Waiting to run
CI / Backend Tests (push) Blocked by required conditions
CI / Docker Build (push) Blocked by required conditions
CI / Security Scan (push) Waiting to run
Deploy / Build & Push Images (push) Waiting to run
Deploy / Deploy to Server (push) Blocked by required conditions
Deploy / Notify (push) Blocked by required conditions
fix(yield): Check multiple DNS servers to handle propagation delays
2025-12-18 16:08:36 +01:00

195 lines
5.6 KiB
Python

"""
Yield DNS verification helpers.
Production-grade DNS checks for the Yield Connect flow:
- Option A: A-record pointing directly to our server IP (simplest!)
- Option B: CNAME/ALIAS to yield.pounce.ch
- Option C: Nameserver delegation to our nameservers (requires DNS server)
"""
from __future__ import annotations
from dataclasses import dataclass
from typing import Optional
import dns.resolver
# Our server IP - domains with A-record pointing here are verified
YIELD_SERVER_IP = "46.235.147.194"
@dataclass(frozen=True)
class YieldDNSCheckResult:
verified: bool
method: Optional[str] # "a_record" | "cname" | "nameserver" | None
actual_ns: list[str]
actual_a: list[str]
cname_ok: bool
error: Optional[str]
def _resolver(nameserver: str | None = None) -> dns.resolver.Resolver:
"""Create a DNS resolver, optionally using a specific nameserver."""
r = dns.resolver.Resolver()
if nameserver:
r.nameservers = [nameserver]
r.timeout = 3
r.lifetime = 5
return r
# Multiple public DNS servers to check (for propagation consistency)
PUBLIC_DNS_SERVERS = ['8.8.8.8', '8.8.4.4', '9.9.9.9', '1.1.1.1']
def _normalize_host(host: str) -> str:
return host.rstrip(".").lower().strip()
def _resolve_ns(domain: str) -> list[str]:
r = _resolver()
answers = r.resolve(domain, "NS")
return sorted({_normalize_host(str(rr.target)) for rr in answers})
def _resolve_cname(domain: str) -> list[str]:
r = _resolver()
answers = r.resolve(domain, "CNAME")
return sorted({_normalize_host(str(rr.target)) for rr in answers})
def _resolve_a(host: str) -> list[str]:
r = _resolver()
answers = r.resolve(host, "A")
return sorted({str(rr) for rr in answers})
def _resolve_a_from_multiple_dns(host: str) -> set[str]:
"""
Resolve A records from multiple public DNS servers.
Returns the union of all IPs found (handles propagation delays).
"""
all_ips: set[str] = set()
for ns in PUBLIC_DNS_SERVERS:
try:
r = _resolver(ns)
answers = r.resolve(host, "A")
for rr in answers:
all_ips.add(str(rr))
except Exception:
continue
return all_ips
def verify_yield_dns(domain: str, expected_nameservers: list[str], cname_target: str) -> YieldDNSCheckResult:
"""
Verify that a domain is connected for Yield.
We accept (in order of simplicity):
1. A-record pointing directly to our server IP (46.235.147.194)
2. CNAME/ALIAS to `cname_target` (yield.pounce.ch)
3. Nameserver delegation (NS contains all expected nameservers)
"""
domain = _normalize_host(domain)
expected_ns = sorted({_normalize_host(ns) for ns in expected_nameservers if ns})
target = _normalize_host(cname_target)
actual_ns: list[str] = []
actual_a: list[str] = []
if not domain:
return YieldDNSCheckResult(
verified=False,
method=None,
actual_ns=[],
actual_a=[],
cname_ok=False,
error="Domain is empty",
)
# Option A: Direct A-record to our server IP (SIMPLEST!)
# Check multiple DNS servers to handle propagation delays
try:
all_ips = _resolve_a_from_multiple_dns(domain)
actual_a = sorted(all_ips)
if YIELD_SERVER_IP in all_ips:
return YieldDNSCheckResult(
verified=True,
method="a_record",
actual_ns=[],
actual_a=actual_a,
cname_ok=False,
error=None,
)
except Exception as e:
return YieldDNSCheckResult(
verified=False,
method=None,
actual_ns=[],
actual_a=[],
cname_ok=False,
error=str(e),
)
# Option B: CNAME to yield.pounce.ch
if target:
try:
cnames = _resolve_cname(domain)
if any(c == target for c in cnames):
return YieldDNSCheckResult(
verified=True,
method="cname",
actual_ns=[],
actual_a=actual_a,
cname_ok=True,
error=None,
)
except (dns.resolver.NXDOMAIN, dns.resolver.NoAnswer):
pass
except Exception:
pass
# Also check if A-records match target's A-records (ALIAS/ANAME flattening)
try:
target_as = set(_resolve_a(target))
if target_as and actual_a and set(actual_a).issubset(target_as):
return YieldDNSCheckResult(
verified=True,
method="cname",
actual_ns=[],
actual_a=actual_a,
cname_ok=True,
error=None,
)
except Exception:
pass
# Option C: NS delegation (requires DNS server on Port 53)
if expected_ns:
try:
actual_ns = _resolve_ns(domain)
if set(expected_ns).issubset(set(actual_ns)):
return YieldDNSCheckResult(
verified=True,
method="nameserver",
actual_ns=actual_ns,
actual_a=actual_a,
cname_ok=False,
error=None,
)
except (dns.resolver.NXDOMAIN, dns.resolver.NoAnswer):
pass
except Exception:
pass
# Not verified
return YieldDNSCheckResult(
verified=False,
method=None,
actual_ns=actual_ns,
actual_a=actual_a,
cname_ok=False,
error=None,
)