pounce/backend/app/services/dns_zone_manager.py
Yves Gugger 800379b581
Some checks failed
CI / Frontend Lint & Type Check (push) Has been cancelled
CI / Frontend Build (push) Has been cancelled
CI / Backend Lint (push) Has been cancelled
CI / Backend Tests (push) Has been cancelled
CI / Docker Build (push) Has been cancelled
CI / Security Scan (push) Has been cancelled
Deploy / Build & Push Images (push) Has been cancelled
Deploy / Deploy to Server (push) Has been cancelled
Deploy / Notify (push) Has been cancelled
feat: HTTP-based Yield routing (no DNS server required)
- Updated yield_dns.py to support A-record verification (simplest method!)
- A-record pointing to 46.235.147.194 is now the primary verification method
- Added Nginx catch-all config for yield domains
- DNS schema updated with method and actual_a fields
- CoreDNS installed but Port 53 blocked by hosting provider
2025-12-18 14:55:59 +01:00

256 lines
7.4 KiB
Python

"""
DNS Zone Manager for Pounce Yield.
Manages CoreDNS zone files for yield domains.
When a domain is activated for yield, we add it to the zone file.
When deactivated, we remove it.
"""
import logging
import os
import re
import subprocess
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional

from app.config import get_settings
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Application settings, fetched once at import time.
# NOTE(review): not referenced anywhere in the visible part of this module.
settings = get_settings()
# CoreDNS zone file path
ZONE_FILE = Path("/opt/coredns/zones/db.yield")
# IPv4 address written into every A record of the generated zone.
SERVER_IP = "46.235.147.194"
def _get_serial() -> str:
    """Return the next SOA serial for the zone, in YYYYMMDDNN form.

    The serial is derived from today's UTC date and the serial currently
    stored in ``ZONE_FILE`` (if any). The result is always strictly greater
    than the stored serial so that secondaries and caches see a change:

    * no zone file, or no parsable serial -> ``<today>01``
    * stored serial older than today      -> ``<today>01``
    * stored serial from today or later   -> stored serial + 1

    Incrementing the full 10-digit number (rather than only the two-digit
    counter) keeps the serial at 10 digits when the counter passes 99 and
    prevents it from going backwards if the stored date is ahead of today.

    Returns:
        The serial as a 10-digit decimal string.

    Raises:
        OSError: if the zone file exists but cannot be read.
    """
    today = datetime.now(timezone.utc).strftime("%Y%m%d")
    first_of_today = int(f"{today}01")

    try:
        content = ZONE_FILE.read_text()
    except FileNotFoundError:
        return str(first_of_today)

    # The lookbehind stops the pattern from matching the tail of a longer
    # digit run and misreading the date part.
    match = re.search(r"(?<!\d)(\d{8})(\d{2})\s*;\s*Serial", content)
    if match is None:
        return str(first_of_today)

    current = int(match.group(1) + match.group(2))
    return str(max(first_of_today, current + 1))
def _generate_zone_file(domains: list[str]) -> str:
    """Render the complete zone file text for the given yield domains.

    The output consists of a fixed header (SOA, NS, apex and wildcard A
    records for ``yield.pounce.ch.``) followed by one ``IN A`` record per
    yield domain, each pointing at ``SERVER_IP``.

    Domain names are normalised (lower-cased, surrounding whitespace and
    trailing dots removed) *before* de-duplication, so ``Example.com`` and
    ``example.com.`` produce a single record. Blank names are dropped, and
    names containing anything other than letters, digits, dots and hyphens
    are skipped with a warning: writing them verbatim would let embedded
    whitespace or newlines inject arbitrary lines into the zone.

    Args:
        domains: Domain names, with or without a trailing dot.

    Returns:
        The zone file content as a single string.
    """
    serial = _get_serial()
    # Naive UTC timestamp so that the literal "Z" suffix in the header stays
    # correct (an aware datetime would render "+00:00" before the "Z").
    generated_at = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
    header = f"""; Pounce Yield DNS Zone
; Auto-generated by dns_zone_manager.py
; Last updated: {generated_at}Z
$TTL 300
$ORIGIN yield.pounce.ch.
@ IN SOA ns1.pounce.ch. admin.pounce.ch. (
{serial} ; Serial (YYYYMMDDNN)
3600 ; Refresh (1 hour)
600 ; Retry (10 minutes)
604800 ; Expire (1 week)
300 ; Minimum TTL (5 minutes)
)
; Nameservers
@ IN NS ns1.pounce.ch.
@ IN NS ns2.pounce.ch.
; A record for the zone apex
@ IN A {SERVER_IP}
; Wildcard - all subdomains point to our server
* IN A {SERVER_IP}
; ============================================
; YIELD DOMAINS (auto-managed)
; ============================================
"""
    # Same character set the zone parser in this module accepts, so every
    # record written here can be read back.
    valid_name = re.compile(r"[a-z0-9]([a-z0-9.-]*[a-z0-9])?")

    names = set()
    for raw in domains:
        name = raw.lower().strip().rstrip(".")
        if not name:
            continue
        if ".." in name or not valid_name.fullmatch(name):
            logger.warning(f"Skipping invalid yield domain: {raw!r}")
            continue
        names.add(name)

    # Records are written as FQDNs (trailing dot) so they are not expanded
    # relative to $ORIGIN.
    records = [f"{name + '.':<30} IN A {SERVER_IP}\n" for name in sorted(names)]
    return header + "".join(records)
def add_yield_domain(domain: str) -> bool:
    """
    Add a domain to the yield DNS zone.

    The name is normalised (lower-cased, surrounding whitespace and trailing
    dots removed) and validated before anything is written: blank names and
    names containing characters other than letters, digits, dots and hyphens
    are rejected, because writing them verbatim could corrupt the zone file
    or inject extra records.

    If the domain is already present the zone file is left untouched.
    Otherwise the zone file is regenerated with the new domain and CoreDNS
    is asked to reload. A failed reload does not change the return value.

    Args:
        domain: Domain name, with or without a trailing dot.

    Returns:
        True if the domain is in the zone file after the call, False if the
        zone file is missing, the name is invalid, or reading/writing the
        zone failed.
    """
    if not ZONE_FILE.exists():
        logger.warning(f"Zone file not found: {ZONE_FILE}. CoreDNS may not be installed.")
        return False

    domain_clean = (domain or "").lower().strip().rstrip(".")
    is_valid = (
        ".." not in domain_clean
        and re.fullmatch(r"[a-z0-9]([a-z0-9.-]*[a-z0-9])?", domain_clean) is not None
    )
    if not is_valid:
        logger.warning(f"Refusing to add invalid domain to DNS: {domain!r}")
        return False

    try:
        # Read current domains from zone file
        domains = _parse_domains_from_zone(ZONE_FILE.read_text())
        if domain_clean in domains:
            logger.info(f"Domain already in DNS zone: {domain_clean}")
            return True

        # Write updated zone, then have CoreDNS pick it up
        domains.append(domain_clean)
        ZONE_FILE.write_text(_generate_zone_file(domains))
        _reload_coredns()
        logger.info(f"Added yield domain to DNS: {domain_clean}")
        return True
    except Exception as e:
        logger.error(f"Failed to add domain {domain} to DNS: {e}")
        return False
def remove_yield_domain(domain: str) -> bool:
    """
    Drop a domain from the yield DNS zone.

    The name is lower-cased and stripped of surrounding whitespace and
    trailing dots before lookup. When the domain is present, the zone file
    is regenerated without it and CoreDNS is asked to reload; when it is
    absent, nothing is written and the call still counts as a success.

    Args:
        domain: Domain name, with or without a trailing dot.

    Returns:
        True on success (including "was not in the zone"), False if the
        zone file is missing or any step raised.
    """
    zone_missing = not ZONE_FILE.exists()
    if zone_missing:
        logger.warning(f"Zone file not found: {ZONE_FILE}")
        return False

    try:
        known = _parse_domains_from_zone(ZONE_FILE.read_text())
        target = domain.lower().strip().rstrip(".")
        if target not in known:
            # Nothing to remove; absence is treated as success.
            return True

        known.remove(target)
        ZONE_FILE.write_text(_generate_zone_file(known))
        _reload_coredns()
        logger.info(f"Removed yield domain from DNS: {target}")
        return True
    except Exception as err:
        logger.error(f"Failed to remove domain {domain} from DNS: {err}")
        return False
def sync_yield_domains(domains: list[str]) -> bool:
    """
    Sync all yield domains to the DNS zone.

    Replaces all existing yield domain entries with the provided list.
    Names are lower-cased and stripped of surrounding whitespace and
    trailing dots; empty/None entries are dropped and duplicates collapsed
    (first occurrence wins), so the logged count matches the distinct names
    handed to the zone generator.

    The zone directory must already exist; this function does not create it.

    Args:
        domains: Domain names, with or without trailing dots.

    Returns:
        True if the zone file was rewritten, False if the zone directory is
        missing or any step raised.
    """
    if not ZONE_FILE.parent.exists():
        logger.warning(f"Zone directory not found: {ZONE_FILE.parent}")
        return False
    try:
        normalized = (d.lower().strip().rstrip(".") for d in domains if d)
        # dict.fromkeys de-duplicates while preserving input order.
        clean_domains = list(dict.fromkeys(name for name in normalized if name))

        ZONE_FILE.write_text(_generate_zone_file(clean_domains))
        _reload_coredns()
        logger.info(f"Synced {len(clean_domains)} yield domains to DNS")
        return True
    except Exception as e:
        logger.error(f"Failed to sync yield domains: {e}")
        return False
def _parse_domains_from_zone(content: str) -> list[str]:
    """Extract yield domain names from zone file content.

    Only lines after the first line containing the ``YIELD DOMAINS`` marker
    are considered. Within that section, blank lines and ``;`` comments are
    skipped, and every line of the form ``name. IN A ...`` contributes its
    name without the trailing dot.

    Names are lower-cased and de-duplicated (first occurrence wins), because
    the matching is case-insensitive while callers compare against
    lower-cased names. Names containing ``pounce`` are excluded.

    NOTE(review): the ``pounce`` substring filter also drops unrelated
    domains that merely contain that word, so such a domain written to the
    zone is not read back - confirm whether a narrower filter is intended.

    Args:
        content: Full text of a zone file.

    Returns:
        Domain names in file order, without trailing dots.
    """
    # A wildcard ("*") can never match this pattern, so no explicit check
    # for it is needed.
    record = re.compile(r"^([a-z0-9.-]+)\.\s+IN\s+A\s+", re.IGNORECASE)

    found: dict[str, None] = {}
    in_yield_section = False
    for line in content.split("\n"):
        if "YIELD DOMAINS" in line:
            in_yield_section = True
            continue
        if not in_yield_section:
            continue

        stripped = line.strip()
        if not stripped or stripped.startswith(";"):
            continue

        match = record.match(line)
        if match is None:
            continue

        domain = match.group(1).rstrip(".").lower()
        # An owner made only of dots would leave an empty name here.
        if domain and "pounce" not in domain:
            found.setdefault(domain)
    return list(found)
def _reload_coredns() -> bool:
    """Ask systemd to reload CoreDNS so it picks up zone changes.

    Runs ``systemctl reload coredns`` first; if that exits non-zero, falls
    back to ``systemctl restart coredns``. Failures are logged as warnings
    and never raised, so callers can treat the reload as best-effort.

    Returns:
        True if either the reload or the fallback restart exited with
        status 0; False if both failed or the command could not be run
        (e.g. ``systemctl`` missing or timing out).
    """
    try:
        # Prefer a reload over a restart so the service is not stopped.
        reload_result = subprocess.run(
            ["systemctl", "reload", "coredns"],
            capture_output=True,
            text=True,
            timeout=10,
        )
        if reload_result.returncode == 0:
            return True

        # Try restart as fallback
        restart_result = subprocess.run(
            ["systemctl", "restart", "coredns"],
            capture_output=True,
            text=True,
            timeout=30,
        )
        if restart_result.returncode == 0:
            return True

        logger.warning(
            f"Could not reload CoreDNS: restart exited with status "
            f"{restart_result.returncode}: {restart_result.stderr.strip()}"
        )
        return False
    except Exception as e:
        logger.warning(f"Could not reload CoreDNS: {e}")
        return False
def check_coredns_status() -> dict:
    """Report whether CoreDNS appears installed and running.

    This is a best-effort probe: problems querying systemd or reading the
    zone file are logged and leave the corresponding field at its default
    instead of raising.

    Returns:
        A dict with the keys:

        * ``installed``: the zone directory exists.
        * ``running``: ``systemctl is-active coredns`` printed ``active``.
        * ``zone_file_exists``: the zone file exists.
        * ``domain_count``: number of yield domains parsed from the zone
          file (0 if it is missing or unreadable).
    """
    status = {
        "installed": ZONE_FILE.parent.exists(),
        "running": False,
        "zone_file_exists": ZONE_FILE.exists(),
        "domain_count": 0,
    }

    try:
        result = subprocess.run(
            ["systemctl", "is-active", "coredns"],
            capture_output=True,
            text=True,
            timeout=5,
        )
        status["running"] = result.stdout.strip() == "active"
    except Exception as e:
        # e.g. systemctl not available or the call timed out; keep
        # "running" at False but leave a trace for debugging.
        logger.debug(f"Could not query CoreDNS service state: {e}")

    if status["zone_file_exists"]:
        try:
            content = ZONE_FILE.read_text()
        except Exception as e:
            # The file may be unreadable or may have vanished since the
            # existence check above.
            logger.warning(f"Could not read zone file {ZONE_FILE}: {e}")
        else:
            status["domain_count"] = len(_parse_domains_from_zone(content))

    return status