"""Background scheduler for daily domain checks.""" import asyncio import logging from datetime import datetime from apscheduler.schedulers.asyncio import AsyncIOScheduler from apscheduler.triggers.cron import CronTrigger from sqlalchemy import select from app.config import get_settings from app.database import AsyncSessionLocal from app.models.domain import Domain, DomainCheck from app.services.domain_checker import domain_checker logger = logging.getLogger(__name__) settings = get_settings() # Global scheduler instance scheduler = AsyncIOScheduler() async def check_all_domains(): """Check availability of all monitored domains.""" logger.info("Starting daily domain check...") start_time = datetime.utcnow() async with AsyncSessionLocal() as db: # Get all domains result = await db.execute(select(Domain)) domains = result.scalars().all() logger.info(f"Checking {len(domains)} domains...") checked = 0 errors = 0 newly_available = [] for domain in domains: try: # Check domain availability check_result = await domain_checker.check_domain(domain.name) # Track if domain became available was_taken = not domain.is_available is_now_available = check_result.is_available if was_taken and is_now_available and domain.notify_on_available: newly_available.append(domain) # Update domain domain.status = check_result.status domain.is_available = check_result.is_available domain.registrar = check_result.registrar domain.expiration_date = check_result.expiration_date domain.last_checked = datetime.utcnow() # Create check record check = DomainCheck( domain_id=domain.id, status=check_result.status, is_available=check_result.is_available, response_data=str(check_result.to_dict()), checked_at=datetime.utcnow(), ) db.add(check) checked += 1 # Small delay to avoid rate limiting await asyncio.sleep(0.5) except Exception as e: logger.error(f"Error checking domain {domain.name}: {e}") errors += 1 await db.commit() elapsed = (datetime.utcnow() - start_time).total_seconds() logger.info( f"Domain check complete. Checked: {checked}, Errors: {errors}, " f"Newly available: {len(newly_available)}, Time: {elapsed:.2f}s" ) # TODO: Send notifications for newly available domains if newly_available: logger.info(f"Domains that became available: {[d.name for d in newly_available]}") # await send_availability_notifications(newly_available) def setup_scheduler(): """Configure and start the scheduler.""" # Daily check at configured hour scheduler.add_job( check_all_domains, CronTrigger(hour=settings.check_hour, minute=settings.check_minute), id="daily_domain_check", name="Daily Domain Availability Check", replace_existing=True, ) logger.info( f"Scheduler configured. Daily check at {settings.check_hour:02d}:{settings.check_minute:02d}" ) def start_scheduler(): """Start the scheduler if not already running.""" if not scheduler.running: setup_scheduler() scheduler.start() logger.info("Scheduler started") def stop_scheduler(): """Stop the scheduler.""" if scheduler.running: scheduler.shutdown() logger.info("Scheduler stopped") async def run_manual_check(): """Run domain check manually (for testing or on-demand).""" await check_all_domains()