diff --git a/backend/app/db_migrations.py b/backend/app/db_migrations.py
index b33d9d6..ccdf82f 100644
--- a/backend/app/db_migrations.py
+++ b/backend/app/db_migrations.py
@@ -105,6 +105,56 @@ async def apply_migrations(conn: AsyncConnection) -> None:
         )
     )
 
+    # ---------------------------------------------------------
+    # 2b) domains indexes (watchlist list/sort/filter)
+    # ---------------------------------------------------------
+    if await _table_exists(conn, "domains"):
+        await conn.execute(text("CREATE INDEX IF NOT EXISTS ix_domains_user_id ON domains(user_id)"))
+        await conn.execute(text("CREATE INDEX IF NOT EXISTS ix_domains_status ON domains(status)"))
+        await conn.execute(text("CREATE INDEX IF NOT EXISTS ix_domains_user_created_at ON domains(user_id, created_at)"))
+
+    # ---------------------------------------------------------
+    # 2c) zone_snapshots indexes (admin zone status + recency)
+    # ---------------------------------------------------------
+    if await _table_exists(conn, "zone_snapshots"):
+        await conn.execute(text("CREATE INDEX IF NOT EXISTS ix_zone_snapshots_tld ON zone_snapshots(tld)"))
+        await conn.execute(text("CREATE INDEX IF NOT EXISTS ix_zone_snapshots_snapshot_date ON zone_snapshots(snapshot_date)"))
+        await conn.execute(
+            text(
+                "CREATE INDEX IF NOT EXISTS ix_zone_snapshots_tld_snapshot_date "
+                "ON zone_snapshots(tld, snapshot_date)"
+            )
+        )
+
+    # ---------------------------------------------------------
+    # 2d) dropped_domains indexes + de-duplication
+    # ---------------------------------------------------------
+    if await _table_exists(conn, "dropped_domains"):
+        # Query patterns:
+        #   - by time window (dropped_date) + optional tld + keyword
+        #   - status updates (availability_status + last_status_check)
+        await conn.execute(text("CREATE INDEX IF NOT EXISTS ix_dropped_domains_tld ON dropped_domains(tld)"))
+        await conn.execute(text("CREATE INDEX IF NOT EXISTS ix_dropped_domains_dropped_date ON dropped_domains(dropped_date)"))
+        await conn.execute(
+            text(
+                "CREATE INDEX IF NOT EXISTS ix_dropped_domains_tld_dropped_date "
+                "ON dropped_domains(tld, dropped_date)"
+            )
+        )
+        await conn.execute(text("CREATE INDEX IF NOT EXISTS ix_dropped_domains_domain ON dropped_domains(domain)"))
+        await conn.execute(text("CREATE INDEX IF NOT EXISTS ix_dropped_domains_availability ON dropped_domains(availability_status)"))
+        await conn.execute(text("CREATE INDEX IF NOT EXISTS ix_dropped_domains_last_status_check ON dropped_domains(last_status_check)"))
+
+        # Enforce de-duplication per drop day (safe + idempotent).
+        # SQLite: Unique indexes are supported.
+        # Postgres: Unique indexes are supported; we avoid CONCURRENTLY here (runs in startup transaction).
+        await conn.execute(
+            text(
+                "CREATE UNIQUE INDEX IF NOT EXISTS ux_dropped_domains_domain_tld_dropped_date "
+                "ON dropped_domains(domain, tld, dropped_date)"
+            )
+        )
+
     # ---------------------------------------------------
     # 3) tld_prices composite index for trend computations
     # ---------------------------------------------------
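Reviewer note: the blocks above guard on a `_table_exists` helper that is not shown in this diff. For context, a minimal sketch of what such a helper can look like with SQLAlchemy's async API (an illustration only, not the project's actual implementation):

```python
from sqlalchemy import inspect
from sqlalchemy.ext.asyncio import AsyncConnection


async def _table_exists(conn: AsyncConnection, table_name: str) -> bool:
    """Sketch: probe for a table by running the sync inspector on the async connection."""
    def _has_table(sync_conn) -> bool:
        return inspect(sync_conn).has_table(table_name)

    return await conn.run_sync(_has_table)
```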
diff --git a/backend/app/scheduler.py b/backend/app/scheduler.py
index 8d3c644..5dfa5d7 100644
--- a/backend/app/scheduler.py
+++ b/backend/app/scheduler.py
@@ -391,30 +391,51 @@ async def run_health_checks():
     try:
         async with AsyncSessionLocal() as db:
             # Get all watched domains (registered, not available)
-            result = await db.execute(
-                select(Domain).where(Domain.is_available == False)
-            )
+            result = await db.execute(select(Domain).where(Domain.is_available == False))
             domains = result.scalars().all()
-
+
             logger.info(f"Running health checks on {len(domains)} domains...")
-
+
+            if not domains:
+                return
+
+            # Prefetch caches to avoid N+1 queries
+            domain_ids = [d.id for d in domains]
+            caches_result = await db.execute(
+                select(DomainHealthCache).where(DomainHealthCache.domain_id.in_(domain_ids))
+            )
+            caches = caches_result.scalars().all()
+            cache_by_domain_id = {c.domain_id: c for c in caches}
+
             health_checker = get_health_checker()
             checked = 0
             errors = 0
             status_changes = []
-
-            for domain in domains:
-                try:
-                    # Run health check
-                    report = await health_checker.check_domain(domain.name)
-
-                    # Check for status changes (if we have previous data)
-                    # Get existing cache
-                    cache_result = await db.execute(
-                        select(DomainHealthCache).where(DomainHealthCache.domain_id == domain.id)
-                    )
-                    existing_cache = cache_result.scalar_one_or_none()
-
+
+            max_concurrent = max(1, int(getattr(settings, "domain_check_max_concurrent", 3) or 3))
+            delay = float(getattr(settings, "domain_check_delay_seconds", 0.3) or 0.3)
+            semaphore = asyncio.Semaphore(max_concurrent)
+
+            async def _check_one(d: Domain):
+                async with semaphore:
+                    report = await health_checker.check_domain(d.name)
+                    await asyncio.sleep(delay)
+                    return d, report
+
+            chunk_size = 100
+            for i in range(0, len(domains), chunk_size):
+                chunk = domains[i : i + chunk_size]
+                results = await asyncio.gather(*[_check_one(d) for d in chunk], return_exceptions=True)
+
+                for item in results:
+                    if isinstance(item, Exception):
+                        errors += 1
+                        continue
+
+                    domain, report = item
+
+                    existing_cache = cache_by_domain_id.get(domain.id)
+
                     old_status = existing_cache.status if existing_cache else None
                     new_status = report.status.value
@@ -446,7 +467,6 @@ async def run_health_checks():
                         existing_cache.ssl_data = ssl_json
                         existing_cache.checked_at = datetime.utcnow()
                     else:
-                        # Create new cache entry
                         new_cache = DomainHealthCache(
                             domain_id=domain.id,
                             status=new_status,
@@ -458,15 +478,9 @@ async def run_health_checks():
                             checked_at=datetime.utcnow(),
                         )
                         db.add(new_cache)
+                        cache_by_domain_id[domain.id] = new_cache
 
                     checked += 1
-
-                # Small delay to avoid overwhelming DNS servers
-                await asyncio.sleep(0.3)
-
-            except Exception as e:
-                logger.error(f"Health check failed for {domain.name}: {e}")
-                errors += 1
 
             await db.commit()
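The refactored loop above combines three things: the prefetched health caches remove the per-domain SELECT, a semaphore plus a per-call delay bounds pressure on DNS lookups, and `asyncio.gather(..., return_exceptions=True)` lets one failing domain increment `errors` without aborting the batch. The same concurrency pattern, reduced to a standalone sketch (the `bounded_map` name and signature are illustrative, not part of the codebase):

```python
from __future__ import annotations

import asyncio
from typing import Awaitable, Callable, Iterable, TypeVar

T = TypeVar("T")
R = TypeVar("R")


async def bounded_map(
    items: Iterable[T],
    func: Callable[[T], Awaitable[R]],
    max_concurrent: int = 3,
    delay: float = 0.3,
    chunk_size: int = 100,
) -> list[R | BaseException]:
    """Run func over items with at most max_concurrent calls in flight.

    Each call is followed by a short sleep (crude rate limiting); exceptions
    are returned in the result list instead of raised, as in the scheduler.
    """
    semaphore = asyncio.Semaphore(max_concurrent)
    pending = list(items)

    async def _one(item: T) -> R:
        async with semaphore:
            result = await func(item)
            await asyncio.sleep(delay)
            return result

    out: list[R | BaseException] = []
    for i in range(0, len(pending), chunk_size):
        chunk = pending[i : i + chunk_size]
        out.extend(await asyncio.gather(*[_one(x) for x in chunk], return_exceptions=True))
    return out
```

Chunking keeps at most `chunk_size` coroutines alive at a time and gives natural points to log progress or commit, at the cost of a small barrier between chunks.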
diff --git a/backend/app/services/zone_file.py b/backend/app/services/zone_file.py
index 56135f1..8bc9486 100644
--- a/backend/app/services/zone_file.py
+++ b/backend/app/services/zone_file.py
@@ -15,6 +15,8 @@ from pathlib import Path
 from typing import Optional
 
 from sqlalchemy import select, func
+from sqlalchemy.dialects.postgresql import insert as pg_insert
+from sqlalchemy.dialects.sqlite import insert as sqlite_insert
 from sqlalchemy.ext.asyncio import AsyncSession
 
 from app.config import get_settings
@@ -236,52 +238,55 @@ class ZoneFileService:
         today = datetime.utcnow().replace(hour=0, minute=0, second=0, microsecond=0)
 
-        # Store all drops - no RDAP verification (prevents bans!)
-        dropped_records = []
-        batch_size = 1000
         dropped_list = list(dropped)
-
-        for i in range(0, len(dropped_list), batch_size):
-            batch = dropped_list[i:i + batch_size]
-
-            for name in batch:
-                try:
-                    record = DroppedDomain(
-                        domain=name,  # Just the name, not full domain!
-                        tld=tld,
-                        dropped_date=today,
-                        length=len(name),
-                        is_numeric=name.isdigit(),
-                        has_hyphen='-' in name,
-                        availability_status='unknown'
-                    )
-                    db.add(record)
-                    dropped_records.append({
-                        "domain": f"{name}.{tld}",
-                        "length": len(name),
-                    })
-                except Exception:
-                    # Duplicate or other error - skip
-                    pass
-
-            # Commit batch
-            try:
-                await db.commit()
-            except Exception:
-                await db.rollback()
-
-            if (i + batch_size) % 5000 == 0:
-                logger.info(f"Saved {min(i + batch_size, len(dropped_list)):,}/{len(dropped_list):,} drops")
-
-        # Final commit
-        try:
+
+        rows = [
+            {
+                "domain": name,
+                "tld": tld,
+                "dropped_date": today,
+                "length": len(name),
+                "is_numeric": name.isdigit(),
+                "has_hyphen": "-" in name,
+                "availability_status": "unknown",
+            }
+            for name in dropped_list
+        ]
+
+        # Bulk insert with conflict-ignore (needs unique index, see db_migrations.py)
+        dialect = db.get_bind().dialect.name if db.get_bind() is not None else "unknown"
+        batch_size = 5000
+        inserted_total = 0
+
+        for i in range(0, len(rows), batch_size):
+            batch = rows[i : i + batch_size]
+
+            if dialect == "postgresql":
+                stmt = (
+                    pg_insert(DroppedDomain)
+                    .values(batch)
+                    .on_conflict_do_nothing(index_elements=["domain", "tld", "dropped_date"])
+                )
+            elif dialect == "sqlite":
+                # SQLite: INSERT OR IGNORE (unique index is still respected)
+                stmt = sqlite_insert(DroppedDomain).values(batch).prefix_with("OR IGNORE")
+            else:
+                # Fallback: best-effort plain insert; duplicates are handled by DB constraints if present.
+                stmt = pg_insert(DroppedDomain).values(batch)
+
+            result = await db.execute(stmt)
+            # rowcount is driver-dependent; still useful for postgres/sqlite
+            inserted_total += int(getattr(result, "rowcount", 0) or 0)
             await db.commit()
-        except Exception:
-            await db.rollback()
-
-        logger.info(f"Zone drops for .{tld}: {len(dropped_records):,} saved (verification pending)")
-
-        return dropped_records
+
+            if (i + batch_size) % 20000 == 0:
+                logger.info(f"Saved {min(i + batch_size, len(rows)):,}/{len(rows):,} drops (inserted so far: {inserted_total:,})")
+
+        logger.info(f"Zone drops for .{tld}: {inserted_total:,} inserted (out of {len(rows):,} in the diff)")
+
+        # Return a small preview list (avoid returning huge payloads)
+        preview = [{"domain": f"{r['domain']}.{tld}", "length": r["length"]} for r in rows[:200]]
+        return preview
 
     async def run_daily_sync(self, db: AsyncSession, tld: str) -> dict:
         """
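Since the bulk insert now leans on `ux_dropped_domains_domain_tld_dropped_date` for de-duplication, the interaction is easy to sanity-check in isolation. A self-contained sketch against in-memory SQLite (a toy `Drop` table stands in for `DroppedDomain`; assumes `aiosqlite` is installed):

```python
import asyncio
import datetime

from sqlalchemy import Column, Date, Integer, String, text
from sqlalchemy.dialects.sqlite import insert as sqlite_insert
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
from sqlalchemy.orm import DeclarativeBase


class Base(DeclarativeBase):
    pass


class Drop(Base):
    """Toy stand-in for DroppedDomain: only the columns the unique index covers."""
    __tablename__ = "drops"
    id = Column(Integer, primary_key=True)
    domain = Column(String)
    tld = Column(String)
    dropped_date = Column(Date)


async def main() -> None:
    engine = create_async_engine("sqlite+aiosqlite:///:memory:")
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)
        # Same shape as ux_dropped_domains_domain_tld_dropped_date in the migration.
        await conn.execute(
            text("CREATE UNIQUE INDEX IF NOT EXISTS ux_drops ON drops(domain, tld, dropped_date)")
        )

    rows = [{"domain": "example", "tld": "com", "dropped_date": datetime.date(2024, 1, 1)}]
    session_factory = async_sessionmaker(engine)
    async with session_factory() as db:
        for _ in range(2):  # second pass should be a silent no-op thanks to OR IGNORE
            await db.execute(sqlite_insert(Drop).values(rows).prefix_with("OR IGNORE"))
            await db.commit()
        count = (await db.execute(text("SELECT COUNT(*) FROM drops"))).scalar_one()
        assert count == 1, "duplicate row slipped past the unique index"


if __name__ == "__main__":
    asyncio.run(main())
```

The Postgres path behaves the same way: `on_conflict_do_nothing(index_elements=[...])` compiles to `ON CONFLICT (domain, tld, dropped_date) DO NOTHING`.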