""" DNS Zone Manager for Pounce Yield. Manages CoreDNS zone files for yield domains. When a domain is activated for yield, we add it to the zone file. When deactivated, we remove it. """ import logging import os import re import subprocess from datetime import datetime from pathlib import Path from typing import Optional from app.config import get_settings logger = logging.getLogger(__name__) settings = get_settings() # CoreDNS zone file path ZONE_FILE = Path("/opt/coredns/zones/db.yield") SERVER_IP = "46.235.147.194" def _get_serial() -> str: """Generate zone serial in YYYYMMDDNN format.""" today = datetime.utcnow().strftime("%Y%m%d") # Read current serial and increment if same day if ZONE_FILE.exists(): content = ZONE_FILE.read_text() match = re.search(r"(\d{8})(\d{2})\s*;\s*Serial", content) if match: existing_date = match.group(1) existing_nn = int(match.group(2)) if existing_date == today: return f"{today}{(existing_nn + 1):02d}" return f"{today}01" def _generate_zone_file(domains: list[str]) -> str: """Generate the complete zone file content.""" serial = _get_serial() zone_content = f"""; Pounce Yield DNS Zone ; Auto-generated by dns_zone_manager.py ; Last updated: {datetime.utcnow().isoformat()}Z $TTL 300 $ORIGIN yield.pounce.ch. @ IN SOA ns1.pounce.ch. admin.pounce.ch. ( {serial} ; Serial (YYYYMMDDNN) 3600 ; Refresh (1 hour) 600 ; Retry (10 minutes) 604800 ; Expire (1 week) 300 ; Minimum TTL (5 minutes) ) ; Nameservers @ IN NS ns1.pounce.ch. @ IN NS ns2.pounce.ch. ; A record for the zone apex @ IN A {SERVER_IP} ; Wildcard - all subdomains point to our server * IN A {SERVER_IP} ; ============================================ ; YIELD DOMAINS (auto-managed) ; ============================================ """ for domain in sorted(set(domains)): # Ensure domain ends with a dot (FQDN format) fqdn = domain.lower().strip() if not fqdn.endswith("."): fqdn += "." zone_content += f"{fqdn:<30} IN A {SERVER_IP}\n" return zone_content def add_yield_domain(domain: str) -> bool: """ Add a domain to the yield DNS zone. Returns True if successful, False otherwise. """ if not ZONE_FILE.exists(): logger.warning(f"Zone file not found: {ZONE_FILE}. CoreDNS may not be installed.") return False try: # Read current domains from zone file content = ZONE_FILE.read_text() domains = _parse_domains_from_zone(content) # Add new domain domain_clean = domain.lower().strip().rstrip(".") if domain_clean not in domains: domains.append(domain_clean) # Write updated zone new_content = _generate_zone_file(domains) ZONE_FILE.write_text(new_content) # Reload CoreDNS _reload_coredns() logger.info(f"Added yield domain to DNS: {domain_clean}") else: logger.info(f"Domain already in DNS zone: {domain_clean}") return True except Exception as e: logger.error(f"Failed to add domain {domain} to DNS: {e}") return False def remove_yield_domain(domain: str) -> bool: """ Remove a domain from the yield DNS zone. Returns True if successful, False otherwise. """ if not ZONE_FILE.exists(): logger.warning(f"Zone file not found: {ZONE_FILE}") return False try: content = ZONE_FILE.read_text() domains = _parse_domains_from_zone(content) domain_clean = domain.lower().strip().rstrip(".") if domain_clean in domains: domains.remove(domain_clean) new_content = _generate_zone_file(domains) ZONE_FILE.write_text(new_content) _reload_coredns() logger.info(f"Removed yield domain from DNS: {domain_clean}") return True except Exception as e: logger.error(f"Failed to remove domain {domain} from DNS: {e}") return False def sync_yield_domains(domains: list[str]) -> bool: """ Sync all yield domains to the DNS zone. Replaces all existing yield domain entries with the provided list. """ if not ZONE_FILE.parent.exists(): logger.warning(f"Zone directory not found: {ZONE_FILE.parent}") return False try: clean_domains = [d.lower().strip().rstrip(".") for d in domains] new_content = _generate_zone_file(clean_domains) # Ensure directory exists ZONE_FILE.parent.mkdir(parents=True, exist_ok=True) ZONE_FILE.write_text(new_content) _reload_coredns() logger.info(f"Synced {len(clean_domains)} yield domains to DNS") return True except Exception as e: logger.error(f"Failed to sync yield domains: {e}") return False def _parse_domains_from_zone(content: str) -> list[str]: """Extract yield domain names from zone file content.""" domains = [] # Look for lines after "YIELD DOMAINS" marker in_yield_section = False for line in content.split("\n"): if "YIELD DOMAINS" in line: in_yield_section = True continue if in_yield_section and line.strip() and not line.strip().startswith(";"): # Parse: domain.tld. IN A IP match = re.match(r"^([a-z0-9.-]+)\.\s+IN\s+A\s+", line, re.IGNORECASE) if match: domain = match.group(1).rstrip(".") # Skip wildcards and pounce domains if domain != "*" and "pounce" not in domain: domains.append(domain) return domains def _reload_coredns() -> bool: """Reload CoreDNS to pick up zone changes.""" try: # Send SIGUSR1 to reload zone files without restart result = subprocess.run( ["systemctl", "reload", "coredns"], capture_output=True, text=True, timeout=10 ) if result.returncode != 0: # Try restart as fallback subprocess.run( ["systemctl", "restart", "coredns"], capture_output=True, text=True, timeout=30 ) return True except Exception as e: logger.warning(f"Could not reload CoreDNS: {e}") return False def check_coredns_status() -> dict: """Check if CoreDNS is running and healthy.""" status = { "installed": ZONE_FILE.parent.exists(), "running": False, "zone_file_exists": ZONE_FILE.exists(), "domain_count": 0, } try: result = subprocess.run( ["systemctl", "is-active", "coredns"], capture_output=True, text=True, timeout=5 ) status["running"] = result.stdout.strip() == "active" except Exception: pass if status["zone_file_exists"]: content = ZONE_FILE.read_text() status["domain_count"] = len(_parse_domains_from_zone(content)) return status