perf: Harden zone sync + scheduler concurrency
This commit is contained in:
@ -50,6 +50,8 @@ jobs:
|
|||||||
GH_OAUTH_SECRET: ${{ secrets.GH_OAUTH_SECRET }}
|
GH_OAUTH_SECRET: ${{ secrets.GH_OAUTH_SECRET }}
|
||||||
CZDS_USERNAME: ${{ secrets.CZDS_USERNAME }}
|
CZDS_USERNAME: ${{ secrets.CZDS_USERNAME }}
|
||||||
CZDS_PASSWORD: ${{ secrets.CZDS_PASSWORD }}
|
CZDS_PASSWORD: ${{ secrets.CZDS_PASSWORD }}
|
||||||
|
SWITCH_TSIG_CH_SECRET: ${{ secrets.SWITCH_TSIG_CH_SECRET }}
|
||||||
|
SWITCH_TSIG_LI_SECRET: ${{ secrets.SWITCH_TSIG_LI_SECRET }}
|
||||||
run: |
|
run: |
|
||||||
python3 - <<'PY'
|
python3 - <<'PY'
|
||||||
import os
|
import os
|
||||||
@ -110,6 +112,10 @@ jobs:
|
|||||||
# CZDS
|
# CZDS
|
||||||
"CZDS_USERNAME": os.environ["CZDS_USERNAME"],
|
"CZDS_USERNAME": os.environ["CZDS_USERNAME"],
|
||||||
"CZDS_PASSWORD": os.environ["CZDS_PASSWORD"],
|
"CZDS_PASSWORD": os.environ["CZDS_PASSWORD"],
|
||||||
|
|
||||||
|
# Switch TSIG (AXFR)
|
||||||
|
"SWITCH_TSIG_CH_SECRET": os.environ["SWITCH_TSIG_CH_SECRET"],
|
||||||
|
"SWITCH_TSIG_LI_SECRET": os.environ["SWITCH_TSIG_LI_SECRET"],
|
||||||
}
|
}
|
||||||
|
|
||||||
lines = []
|
lines = []
|
||||||
|
|||||||
@ -331,3 +331,5 @@ Empfehlungen:
|
|||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@ -134,9 +134,23 @@ class Settings(BaseSettings):
|
|||||||
# Switch.ch Zone Files (.ch, .li)
|
# Switch.ch Zone Files (.ch, .li)
|
||||||
switch_data_dir: str = "/data/switch" # Persistent storage
|
switch_data_dir: str = "/data/switch" # Persistent storage
|
||||||
|
|
||||||
|
# Switch.ch TSIG (DNS AXFR) credentials
|
||||||
|
# These should be provided via environment variables in production.
|
||||||
|
switch_tsig_ch_name: str = "tsig-zonedata-ch-public-21-01"
|
||||||
|
switch_tsig_ch_algorithm: str = "hmac-sha512"
|
||||||
|
switch_tsig_ch_secret: str = ""
|
||||||
|
|
||||||
|
switch_tsig_li_name: str = "tsig-zonedata-li-public-21-01"
|
||||||
|
switch_tsig_li_algorithm: str = "hmac-sha512"
|
||||||
|
switch_tsig_li_secret: str = ""
|
||||||
|
|
||||||
# Zone File Retention (days to keep historical snapshots)
|
# Zone File Retention (days to keep historical snapshots)
|
||||||
zone_retention_days: int = 3
|
zone_retention_days: int = 3
|
||||||
|
|
||||||
|
# Domain check scheduler tuning (external I/O heavy; keep conservative defaults)
|
||||||
|
domain_check_max_concurrent: int = 3
|
||||||
|
domain_check_delay_seconds: float = 0.3
|
||||||
|
|
||||||
class Config:
|
class Config:
|
||||||
env_file = ".env"
|
env_file = ".env"
|
||||||
env_file_encoding = "utf-8"
|
env_file_encoding = "utf-8"
|
||||||
|
|||||||
@ -88,7 +88,6 @@ async def check_domains_by_frequency(frequency: str):
|
|||||||
tiers_for_frequency.append(tier)
|
tiers_for_frequency.append(tier)
|
||||||
|
|
||||||
# Get domains from users with matching subscription tier
|
# Get domains from users with matching subscription tier
|
||||||
from sqlalchemy.orm import joinedload
|
|
||||||
result = await db.execute(
|
result = await db.execute(
|
||||||
select(Domain)
|
select(Domain)
|
||||||
.join(User, Domain.user_id == User.id)
|
.join(User, Domain.user_id == User.id)
|
||||||
@ -108,10 +107,32 @@ async def check_domains_by_frequency(frequency: str):
|
|||||||
newly_taken = [] # Track domains that became taken
|
newly_taken = [] # Track domains that became taken
|
||||||
status_changes = [] # All status changes for logging
|
status_changes = [] # All status changes for logging
|
||||||
|
|
||||||
for domain in domains:
|
# Concurrency control + polite pacing (prevents RDAP/WHOIS bans)
|
||||||
|
max_concurrent = max(1, int(getattr(settings, "domain_check_max_concurrent", 3) or 3))
|
||||||
|
delay = float(getattr(settings, "domain_check_delay_seconds", 0.3) or 0.3)
|
||||||
|
semaphore = asyncio.Semaphore(max_concurrent)
|
||||||
|
|
||||||
|
async def _check_one(d: Domain) -> tuple[Domain, object | None, Exception | None]:
|
||||||
|
async with semaphore:
|
||||||
try:
|
try:
|
||||||
# Check domain availability
|
res = await domain_checker.check_domain(d.name)
|
||||||
check_result = await domain_checker.check_domain(domain.name)
|
# small delay after each external request
|
||||||
|
await asyncio.sleep(delay)
|
||||||
|
return d, res, None
|
||||||
|
except Exception as e:
|
||||||
|
return d, None, e
|
||||||
|
|
||||||
|
# Process in chunks to avoid huge gather lists
|
||||||
|
chunk_size = 200
|
||||||
|
for i in range(0, len(domains), chunk_size):
|
||||||
|
chunk = domains[i : i + chunk_size]
|
||||||
|
results = await asyncio.gather(*[_check_one(d) for d in chunk])
|
||||||
|
|
||||||
|
for domain, check_result, err in results:
|
||||||
|
if err is not None or check_result is None:
|
||||||
|
logger.error(f"Error checking domain {domain.name}: {err}")
|
||||||
|
errors += 1
|
||||||
|
continue
|
||||||
|
|
||||||
# Track status transitions
|
# Track status transitions
|
||||||
was_available = domain.is_available
|
was_available = domain.is_available
|
||||||
@ -119,27 +140,28 @@ async def check_domains_by_frequency(frequency: str):
|
|||||||
|
|
||||||
# Detect transition: taken -> available (domain dropped!)
|
# Detect transition: taken -> available (domain dropped!)
|
||||||
if not was_available and is_now_available:
|
if not was_available and is_now_available:
|
||||||
status_changes.append({
|
status_changes.append(
|
||||||
'domain': domain.name,
|
{
|
||||||
'change': 'became_available',
|
"domain": domain.name,
|
||||||
'old_registrar': domain.registrar,
|
"change": "became_available",
|
||||||
})
|
"old_registrar": domain.registrar,
|
||||||
|
}
|
||||||
|
)
|
||||||
if domain.notify_on_available:
|
if domain.notify_on_available:
|
||||||
newly_available.append(domain)
|
newly_available.append(domain)
|
||||||
logger.info(f"🎯 Domain AVAILABLE: {domain.name} (was registered by {domain.registrar})")
|
logger.info(f"🎯 Domain AVAILABLE: {domain.name} (was registered by {domain.registrar})")
|
||||||
|
|
||||||
# Detect transition: available -> taken (someone registered it!)
|
# Detect transition: available -> taken (someone registered it!)
|
||||||
elif was_available and not is_now_available:
|
elif was_available and not is_now_available:
|
||||||
status_changes.append({
|
status_changes.append(
|
||||||
'domain': domain.name,
|
{
|
||||||
'change': 'became_taken',
|
"domain": domain.name,
|
||||||
'new_registrar': check_result.registrar,
|
"change": "became_taken",
|
||||||
})
|
"new_registrar": check_result.registrar,
|
||||||
|
}
|
||||||
|
)
|
||||||
if domain.notify_on_available: # Notify if alerts are on
|
if domain.notify_on_available: # Notify if alerts are on
|
||||||
newly_taken.append({
|
newly_taken.append({"domain": domain, "registrar": check_result.registrar})
|
||||||
'domain': domain,
|
|
||||||
'registrar': check_result.registrar,
|
|
||||||
})
|
|
||||||
logger.info(f"⚠️ Domain TAKEN: {domain.name} (now registered by {check_result.registrar})")
|
logger.info(f"⚠️ Domain TAKEN: {domain.name} (now registered by {check_result.registrar})")
|
||||||
|
|
||||||
# Update domain with fresh data
|
# Update domain with fresh data
|
||||||
@ -158,16 +180,8 @@ async def check_domains_by_frequency(frequency: str):
|
|||||||
checked_at=datetime.utcnow(),
|
checked_at=datetime.utcnow(),
|
||||||
)
|
)
|
||||||
db.add(check)
|
db.add(check)
|
||||||
|
|
||||||
checked += 1
|
checked += 1
|
||||||
|
|
||||||
# Small delay to avoid rate limiting
|
|
||||||
await asyncio.sleep(0.5)
|
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"Error checking domain {domain.name}: {e}")
|
|
||||||
errors += 1
|
|
||||||
|
|
||||||
await db.commit()
|
await db.commit()
|
||||||
|
|
||||||
elapsed = (datetime.utcnow() - start_time).total_seconds()
|
elapsed = (datetime.utcnow() - start_time).total_seconds()
|
||||||
|
|||||||
@ -17,28 +17,12 @@ from typing import Optional
|
|||||||
from sqlalchemy import select, func
|
from sqlalchemy import select, func
|
||||||
from sqlalchemy.ext.asyncio import AsyncSession
|
from sqlalchemy.ext.asyncio import AsyncSession
|
||||||
|
|
||||||
|
from app.config import get_settings
|
||||||
from app.models.zone_file import ZoneSnapshot, DroppedDomain
|
from app.models.zone_file import ZoneSnapshot, DroppedDomain
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
# ============================================================================
|
|
||||||
# TSIG KEYS (from Switch.ch documentation)
|
|
||||||
# ============================================================================
|
|
||||||
|
|
||||||
TSIG_KEYS = {
|
|
||||||
"ch": {
|
|
||||||
"name": "tsig-zonedata-ch-public-21-01",
|
|
||||||
"algorithm": "hmac-sha512",
|
|
||||||
"secret": "stZwEGApYumtXkh73qMLPqfbIDozWKZLkqRvcjKSpRnsor6A6MxixRL6C2HeSVBQNfMW4wer+qjS0ZSfiWiJ3Q=="
|
|
||||||
},
|
|
||||||
"li": {
|
|
||||||
"name": "tsig-zonedata-li-public-21-01",
|
|
||||||
"algorithm": "hmac-sha512",
|
|
||||||
"secret": "t8GgeCn+fhPaj+cRy1epox2Vj4hZ45ax6v3rQCkkfIQNg5fsxuU23QM5mzz+BxJ4kgF/jiQyBDBvL+XWPE6oCQ=="
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
ZONE_SERVER = "zonedata.switch.ch"
|
ZONE_SERVER = "zonedata.switch.ch"
|
||||||
|
|
||||||
# ============================================================================
|
# ============================================================================
|
||||||
@ -49,18 +33,36 @@ class ZoneFileService:
|
|||||||
"""Service for fetching and analyzing zone files"""
|
"""Service for fetching and analyzing zone files"""
|
||||||
|
|
||||||
def __init__(self, data_dir: Optional[Path] = None):
|
def __init__(self, data_dir: Optional[Path] = None):
|
||||||
from app.config import get_settings
|
|
||||||
settings = get_settings()
|
settings = get_settings()
|
||||||
self.data_dir = data_dir or Path(settings.switch_data_dir)
|
self.data_dir = data_dir or Path(settings.switch_data_dir)
|
||||||
self.data_dir.mkdir(parents=True, exist_ok=True)
|
self.data_dir.mkdir(parents=True, exist_ok=True)
|
||||||
|
self._settings = settings
|
||||||
|
# Store daily snapshots for N days (premium reliability)
|
||||||
|
self.snapshots_dir = self.data_dir / "snapshots"
|
||||||
|
self.snapshots_dir.mkdir(parents=True, exist_ok=True)
|
||||||
|
|
||||||
|
def _get_tsig_config(self, tld: str) -> dict:
|
||||||
|
"""Resolve TSIG config from settings/env (no secrets in git)."""
|
||||||
|
if tld == "ch":
|
||||||
|
return {
|
||||||
|
"name": self._settings.switch_tsig_ch_name,
|
||||||
|
"algorithm": self._settings.switch_tsig_ch_algorithm,
|
||||||
|
"secret": self._settings.switch_tsig_ch_secret,
|
||||||
|
}
|
||||||
|
if tld == "li":
|
||||||
|
return {
|
||||||
|
"name": self._settings.switch_tsig_li_name,
|
||||||
|
"algorithm": self._settings.switch_tsig_li_algorithm,
|
||||||
|
"secret": self._settings.switch_tsig_li_secret,
|
||||||
|
}
|
||||||
|
raise ValueError(f"Unknown TLD: {tld}")
|
||||||
|
|
||||||
def _get_key_file_path(self, tld: str) -> Path:
|
def _get_key_file_path(self, tld: str) -> Path:
|
||||||
"""Generate TSIG key file for dig command"""
|
"""Generate TSIG key file for dig command"""
|
||||||
key_path = self.data_dir / f"{tld}_zonedata.key"
|
key_path = self.data_dir / f"{tld}_zonedata.key"
|
||||||
key_info = TSIG_KEYS.get(tld)
|
key_info = self._get_tsig_config(tld)
|
||||||
|
if not (key_info.get("secret") or "").strip():
|
||||||
if not key_info:
|
raise RuntimeError(f"Missing Switch TSIG secret for .{tld} (set SWITCH_TSIG_{tld.upper()}_SECRET)")
|
||||||
raise ValueError(f"Unknown TLD: {tld}")
|
|
||||||
|
|
||||||
# Write TSIG key file in BIND format
|
# Write TSIG key file in BIND format
|
||||||
key_content = f"""key "{key_info['name']}" {{
|
key_content = f"""key "{key_info['name']}" {{
|
||||||
@ -76,7 +78,7 @@ class ZoneFileService:
|
|||||||
Fetch zone file via DNS AXFR transfer.
|
Fetch zone file via DNS AXFR transfer.
|
||||||
Returns set of domain names (without TLD suffix).
|
Returns set of domain names (without TLD suffix).
|
||||||
"""
|
"""
|
||||||
if tld not in TSIG_KEYS:
|
if tld not in ("ch", "li"):
|
||||||
raise ValueError(f"Unsupported TLD: {tld}. Only 'ch' and 'li' are supported.")
|
raise ValueError(f"Unsupported TLD: {tld}. Only 'ch' and 'li' are supported.")
|
||||||
|
|
||||||
logger.info(f"Starting zone transfer for .{tld}")
|
logger.info(f"Starting zone transfer for .{tld}")
|
||||||
@ -143,23 +145,61 @@ class ZoneFileService:
|
|||||||
|
|
||||||
async def get_previous_snapshot(self, db: AsyncSession, tld: str) -> Optional[set[str]]:
|
async def get_previous_snapshot(self, db: AsyncSession, tld: str) -> Optional[set[str]]:
|
||||||
"""Load previous day's domain set from cache file"""
|
"""Load previous day's domain set from cache file"""
|
||||||
cache_file = self.data_dir / f"{tld}_domains.txt"
|
# Prefer most recent snapshot file before today (supports N-day retention)
|
||||||
|
tld_dir = self.snapshots_dir / tld
|
||||||
|
if tld_dir.exists():
|
||||||
|
candidates = sorted([p for p in tld_dir.glob("*.domains.txt") if p.is_file()])
|
||||||
|
if candidates:
|
||||||
|
# Pick the latest snapshot file (by name sort = date sort)
|
||||||
|
latest = candidates[-1]
|
||||||
|
try:
|
||||||
|
content = latest.read_text()
|
||||||
|
return set(line.strip() for line in content.splitlines() if line.strip())
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning(f"Failed to load snapshot for .{tld} from {latest.name}: {e}")
|
||||||
|
|
||||||
|
# Fallback: legacy cache file
|
||||||
|
cache_file = self.data_dir / f"{tld}_domains.txt"
|
||||||
if cache_file.exists():
|
if cache_file.exists():
|
||||||
try:
|
try:
|
||||||
content = cache_file.read_text()
|
content = cache_file.read_text()
|
||||||
return set(line.strip() for line in content.splitlines() if line.strip())
|
return set(line.strip() for line in content.splitlines() if line.strip())
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.warning(f"Failed to load cache for .{tld}: {e}")
|
logger.warning(f"Failed to load cache for .{tld}: {e}")
|
||||||
|
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
def _cleanup_snapshot_files(self, tld: str) -> None:
|
||||||
|
"""Delete snapshot files older than retention window (best-effort)."""
|
||||||
|
keep_days = int(self._settings.zone_retention_days or 3)
|
||||||
|
cutoff = datetime.utcnow().date() - timedelta(days=keep_days)
|
||||||
|
tld_dir = self.snapshots_dir / tld
|
||||||
|
if not tld_dir.exists():
|
||||||
|
return
|
||||||
|
for p in tld_dir.glob("*.domains.txt"):
|
||||||
|
try:
|
||||||
|
# filename: YYYY-MM-DD.domains.txt
|
||||||
|
date_part = p.name.split(".")[0]
|
||||||
|
snap_date = datetime.fromisoformat(date_part).date()
|
||||||
|
if snap_date < cutoff:
|
||||||
|
p.unlink(missing_ok=True)
|
||||||
|
except Exception:
|
||||||
|
# Don't let cleanup break sync
|
||||||
|
continue
|
||||||
|
|
||||||
async def save_snapshot(self, db: AsyncSession, tld: str, domains: set[str]):
|
async def save_snapshot(self, db: AsyncSession, tld: str, domains: set[str]):
|
||||||
"""Save current snapshot to cache and database"""
|
"""Save current snapshot to cache and database"""
|
||||||
# Save to cache file
|
# Save to legacy cache file (fast path)
|
||||||
cache_file = self.data_dir / f"{tld}_domains.txt"
|
cache_file = self.data_dir / f"{tld}_domains.txt"
|
||||||
cache_file.write_text("\n".join(sorted(domains)))
|
cache_file.write_text("\n".join(sorted(domains)))
|
||||||
|
|
||||||
|
# Save a daily snapshot file for retention/debugging
|
||||||
|
tld_dir = self.snapshots_dir / tld
|
||||||
|
tld_dir.mkdir(parents=True, exist_ok=True)
|
||||||
|
today_str = datetime.utcnow().date().isoformat()
|
||||||
|
snapshot_file = tld_dir / f"{today_str}.domains.txt"
|
||||||
|
snapshot_file.write_text("\n".join(sorted(domains)))
|
||||||
|
self._cleanup_snapshot_files(tld)
|
||||||
|
|
||||||
# Save metadata to database
|
# Save metadata to database
|
||||||
checksum = self.compute_checksum(domains)
|
checksum = self.compute_checksum(domains)
|
||||||
snapshot = ZoneSnapshot(
|
snapshot = ZoneSnapshot(
|
||||||
|
|||||||
Reference in New Issue
Block a user