perf: Bulk insert drops + add critical DB indexes

2025-12-21 16:13:46 +01:00
parent 93bd23c1cd
commit f36d55f814
3 changed files with 139 additions and 70 deletions

View File

@@ -105,6 +105,56 @@ async def apply_migrations(conn: AsyncConnection) -> None:
)
)
# ---------------------------------------------------------
# 2b) domains indexes (watchlist list/sort/filter)
# ---------------------------------------------------------
if await _table_exists(conn, "domains"):
await conn.execute(text("CREATE INDEX IF NOT EXISTS ix_domains_user_id ON domains(user_id)"))
await conn.execute(text("CREATE INDEX IF NOT EXISTS ix_domains_status ON domains(status)"))
await conn.execute(text("CREATE INDEX IF NOT EXISTS ix_domains_user_created_at ON domains(user_id, created_at)"))
# ---------------------------------------------------------
# 2c) zone_snapshots indexes (admin zone status + recency)
# ---------------------------------------------------------
if await _table_exists(conn, "zone_snapshots"):
await conn.execute(text("CREATE INDEX IF NOT EXISTS ix_zone_snapshots_tld ON zone_snapshots(tld)"))
await conn.execute(text("CREATE INDEX IF NOT EXISTS ix_zone_snapshots_snapshot_date ON zone_snapshots(snapshot_date)"))
await conn.execute(
text(
"CREATE INDEX IF NOT EXISTS ix_zone_snapshots_tld_snapshot_date "
"ON zone_snapshots(tld, snapshot_date)"
)
)
# ---------------------------------------------------------
# 2d) dropped_domains indexes + de-duplication
# ---------------------------------------------------------
if await _table_exists(conn, "dropped_domains"):
# Query patterns:
# - by time window (dropped_date) + optional tld + keyword
# - status updates (availability_status + last_status_check)
await conn.execute(text("CREATE INDEX IF NOT EXISTS ix_dropped_domains_tld ON dropped_domains(tld)"))
await conn.execute(text("CREATE INDEX IF NOT EXISTS ix_dropped_domains_dropped_date ON dropped_domains(dropped_date)"))
await conn.execute(
text(
"CREATE INDEX IF NOT EXISTS ix_dropped_domains_tld_dropped_date "
"ON dropped_domains(tld, dropped_date)"
)
)
await conn.execute(text("CREATE INDEX IF NOT EXISTS ix_dropped_domains_domain ON dropped_domains(domain)"))
await conn.execute(text("CREATE INDEX IF NOT EXISTS ix_dropped_domains_availability ON dropped_domains(availability_status)"))
await conn.execute(text("CREATE INDEX IF NOT EXISTS ix_dropped_domains_last_status_check ON dropped_domains(last_status_check)"))
# Enforce de-duplication per drop day (safe + idempotent).
# SQLite: Unique indexes are supported.
# Postgres: Unique indexes are supported; we avoid CONCURRENTLY here (runs in startup transaction).
await conn.execute(
text(
"CREATE UNIQUE INDEX IF NOT EXISTS ux_dropped_domains_domain_tld_dropped_date "
"ON dropped_domains(domain, tld, dropped_date)"
)
)
# ---------------------------------------------------
# 3) tld_prices composite index for trend computations
# ---------------------------------------------------
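Note: the hunk above guards each block with a _table_exists helper that is outside this diff. For readers following along, a minimal sketch of what such a helper might look like with SQLAlchemy's async connection and inspector (assumed implementation, not taken from this commit):

    from sqlalchemy import inspect
    from sqlalchemy.ext.asyncio import AsyncConnection

    async def _table_exists(conn: AsyncConnection, table_name: str) -> bool:
        """Return True if table_name exists in the connected database (sketch only)."""
        def _check(sync_conn) -> bool:
            # Inspector.has_table() works on both SQLite and Postgres.
            return inspect(sync_conn).has_table(table_name)
        return await conn.run_sync(_check)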

View File

@@ -391,29 +391,50 @@ async def run_health_checks():
try:
async with AsyncSessionLocal() as db:
# Get all watched domains (registered, not available)
result = await db.execute(
select(Domain).where(Domain.is_available == False)
)
result = await db.execute(select(Domain).where(Domain.is_available == False))
domains = result.scalars().all()
logger.info(f"Running health checks on {len(domains)} domains...")
if not domains:
return
# Prefetch caches to avoid N+1 queries
domain_ids = [d.id for d in domains]
caches_result = await db.execute(
select(DomainHealthCache).where(DomainHealthCache.domain_id.in_(domain_ids))
)
caches = caches_result.scalars().all()
cache_by_domain_id = {c.domain_id: c for c in caches}
health_checker = get_health_checker()
checked = 0
errors = 0
status_changes = []
for domain in domains:
try:
# Run health check
report = await health_checker.check_domain(domain.name)
max_concurrent = max(1, int(getattr(settings, "domain_check_max_concurrent", 3) or 3))
delay = float(getattr(settings, "domain_check_delay_seconds", 0.3) or 0.3)
semaphore = asyncio.Semaphore(max_concurrent)
# Check for status changes (if we have previous data)
# Get existing cache
cache_result = await db.execute(
select(DomainHealthCache).where(DomainHealthCache.domain_id == domain.id)
)
existing_cache = cache_result.scalar_one_or_none()
async def _check_one(d: Domain):
async with semaphore:
report = await health_checker.check_domain(d.name)
await asyncio.sleep(delay)
return d, report
chunk_size = 100
for i in range(0, len(domains), chunk_size):
chunk = domains[i : i + chunk_size]
results = await asyncio.gather(*[_check_one(d) for d in chunk], return_exceptions=True)
for item in results:
if isinstance(item, Exception):
errors += 1
continue
domain, report = item
existing_cache = cache_by_domain_id.get(domain.id)
old_status = existing_cache.status if existing_cache else None
new_status = report.status.value
@@ -446,7 +467,6 @@ async def run_health_checks():
existing_cache.ssl_data = ssl_json
existing_cache.checked_at = datetime.utcnow()
else:
# Create new cache entry
new_cache = DomainHealthCache(
domain_id=domain.id,
status=new_status,
@@ -458,16 +478,10 @@ async def run_health_checks():
checked_at=datetime.utcnow(),
)
db.add(new_cache)
cache_by_domain_id[domain.id] = new_cache
checked += 1
# Small delay to avoid overwhelming DNS servers
await asyncio.sleep(0.3)
except Exception as e:
logger.error(f"Health check failed for {domain.name}: {e}")
errors += 1
await db.commit()
elapsed = (datetime.utcnow() - start_time).total_seconds()
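Note: the scheduling pattern introduced above — a semaphore capping concurrency, chunks dispatched through asyncio.gather(..., return_exceptions=True), and a per-check delay — can be read in isolation from the domain model. The helper below is illustrative only (bounded_map, worker, and items are invented names, not part of this commit):

    import asyncio

    async def bounded_map(items, worker, max_concurrent=3, chunk_size=100, delay=0.3):
        """Run worker(item) for each item with at most max_concurrent in flight (illustrative)."""
        semaphore = asyncio.Semaphore(max_concurrent)

        async def _one(item):
            async with semaphore:
                result = await worker(item)
                await asyncio.sleep(delay)  # small delay so DNS servers are not hammered
                return item, result

        results = []
        for i in range(0, len(items), chunk_size):
            chunk = items[i : i + chunk_size]
            # return_exceptions=True keeps one failing check from aborting the whole run
            results.extend(await asyncio.gather(*[_one(x) for x in chunk], return_exceptions=True))
        return results

Chunking keeps only chunk_size coroutines alive at a time, while the semaphore guarantees the concurrency ceiling regardless of chunk size.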

View File

@@ -15,6 +15,8 @@ from pathlib import Path
from typing import Optional
from sqlalchemy import select, func
from sqlalchemy.dialects.postgresql import insert as pg_insert
from sqlalchemy.dialects.sqlite import insert as sqlite_insert
from sqlalchemy.ext.asyncio import AsyncSession
from app.config import get_settings
@@ -236,52 +238,55 @@ class ZoneFileService:
today = datetime.utcnow().replace(hour=0, minute=0, second=0, microsecond=0)
# Store all drops - no RDAP verification (prevents bans!)
dropped_records = []
batch_size = 1000
dropped_list = list(dropped)
for i in range(0, len(dropped_list), batch_size):
batch = dropped_list[i:i + batch_size]
rows = [
{
"domain": name,
"tld": tld,
"dropped_date": today,
"length": len(name),
"is_numeric": name.isdigit(),
"has_hyphen": "-" in name,
"availability_status": "unknown",
}
for name in dropped_list
]
for name in batch:
try:
record = DroppedDomain(
domain=name, # Just the name, not full domain!
tld=tld,
dropped_date=today,
length=len(name),
is_numeric=name.isdigit(),
has_hyphen='-' in name,
availability_status='unknown'
)
db.add(record)
dropped_records.append({
"domain": f"{name}.{tld}",
"length": len(name),
})
except Exception:
# Duplicate or other error - skip
pass
# Bulk insert with conflict-ignore (needs unique index, see db_migrations.py)
dialect = db.get_bind().dialect.name if db.get_bind() is not None else "unknown"
batch_size = 5000
inserted_total = 0
# Commit batch
try:
await db.commit()
except Exception:
await db.rollback()
for i in range(0, len(rows), batch_size):
batch = rows[i : i + batch_size]
if (i + batch_size) % 5000 == 0:
logger.info(f"Saved {min(i + batch_size, len(dropped_list)):,}/{len(dropped_list):,} drops")
if dialect == "postgresql":
stmt = (
pg_insert(DroppedDomain)
.values(batch)
.on_conflict_do_nothing(index_elements=["domain", "tld", "dropped_date"])
)
elif dialect == "sqlite":
# SQLite: INSERT OR IGNORE (unique index is still respected)
stmt = sqlite_insert(DroppedDomain).values(batch).prefix_with("OR IGNORE")
else:
# Fallback: best-effort plain insert; duplicates are handled by DB constraints if present.
stmt = pg_insert(DroppedDomain).values(batch)
# Final commit
try:
result = await db.execute(stmt)
# rowcount is driver-dependent; still useful for postgres/sqlite
inserted_total += int(getattr(result, "rowcount", 0) or 0)
await db.commit()
except Exception:
await db.rollback()
logger.info(f"Zone drops for .{tld}: {len(dropped_records):,} saved (verification pending)")
if (i + batch_size) % 20000 == 0:
logger.info(f"Saved {min(i + batch_size, len(rows)):,}/{len(rows):,} drops (inserted so far: {inserted_total:,})")
return dropped_records
logger.info(f"Zone drops for .{tld}: {inserted_total:,} inserted (out of {len(rows):,} diff)")
# Return a small preview list (avoid returning huge payloads)
preview = [{"domain": f"{r['domain']}.{tld}", "length": r["length"]} for r in rows[:200]]
return preview
async def run_daily_sync(self, db: AsyncSession, tld: str) -> dict:
"""