"""Background scheduler for domain checks, TLD price scraping, auctions, and notifications.""" import asyncio import logging from datetime import datetime, timedelta from typing import TYPE_CHECKING from apscheduler.schedulers.asyncio import AsyncIOScheduler from apscheduler.triggers.cron import CronTrigger from sqlalchemy import select, and_ from app.config import get_settings from app.database import AsyncSessionLocal from app.models.domain import Domain, DomainCheck from app.models.user import User from app.models.subscription import Subscription, SubscriptionTier, TIER_CONFIG from app.services.domain_checker import domain_checker from app.services.email_service import email_service from app.services.price_tracker import price_tracker if TYPE_CHECKING: from app.models.sniper_alert import SniperAlert from app.models.auction import DomainAuction logger = logging.getLogger(__name__) settings = get_settings() # Global scheduler instance scheduler = AsyncIOScheduler() async def scrape_tld_prices(): """Scheduled task to scrape TLD prices from public sources.""" from app.services.tld_scraper.aggregator import tld_aggregator logger.info("Starting scheduled TLD price scrape...") try: async with AsyncSessionLocal() as db: result = await tld_aggregator.run_scrape(db) logger.info( f"TLD scrape completed: {result.status}, " f"{result.tlds_scraped} TLDs, {result.prices_saved} prices saved" ) if result.errors: logger.warning(f"Scrape errors: {result.errors}") except Exception as e: logger.exception(f"TLD price scrape failed: {e}") async def check_domains_by_frequency(frequency: str): """Check availability of domains based on their subscription frequency. Args: frequency: One of 'daily', 'hourly', 'realtime' (10-min) """ logger.info(f"Starting {frequency} domain check...") start_time = datetime.utcnow() async with AsyncSessionLocal() as db: # Get users with matching check frequency tiers_for_frequency = [] for tier, config in TIER_CONFIG.items(): if config['check_frequency'] == frequency: tiers_for_frequency.append(tier) # Realtime includes hourly and daily too (more frequent = superset) elif frequency == 'realtime': tiers_for_frequency.append(tier) elif frequency == 'hourly' and config['check_frequency'] in ['hourly', 'realtime']: tiers_for_frequency.append(tier) # Get domains from users with matching subscription tier from sqlalchemy.orm import joinedload result = await db.execute( select(Domain) .join(User, Domain.user_id == User.id) .outerjoin(Subscription, Subscription.user_id == User.id) .where( (Subscription.tier.in_(tiers_for_frequency)) | (Subscription.id.is_(None) & (frequency == 'daily')) # Scout users (no subscription) = daily ) ) domains = result.scalars().all() logger.info(f"Checking {len(domains)} domains...") checked = 0 errors = 0 newly_available = [] for domain in domains: try: # Check domain availability check_result = await domain_checker.check_domain(domain.name) # Track if domain became available was_taken = not domain.is_available is_now_available = check_result.is_available if was_taken and is_now_available and domain.notify_on_available: newly_available.append(domain) # Update domain domain.status = check_result.status domain.is_available = check_result.is_available domain.registrar = check_result.registrar domain.expiration_date = check_result.expiration_date domain.last_checked = datetime.utcnow() # Create check record check = DomainCheck( domain_id=domain.id, status=check_result.status, is_available=check_result.is_available, response_data=str(check_result.to_dict()), checked_at=datetime.utcnow(), ) db.add(check) checked += 1 # Small delay to avoid rate limiting await asyncio.sleep(0.5) except Exception as e: logger.error(f"Error checking domain {domain.name}: {e}") errors += 1 await db.commit() elapsed = (datetime.utcnow() - start_time).total_seconds() logger.info( f"Domain check complete. Checked: {checked}, Errors: {errors}, " f"Newly available: {len(newly_available)}, Time: {elapsed:.2f}s" ) # Send notifications for newly available domains if newly_available: logger.info(f"Domains that became available: {[d.name for d in newly_available]}") await send_domain_availability_alerts(db, newly_available) async def check_all_domains(): """Legacy function - checks all domains (daily).""" await check_domains_by_frequency('daily') async def check_hourly_domains(): """Check domains for Trader users (hourly).""" await check_domains_by_frequency('hourly') async def check_realtime_domains(): """Check domains for Tycoon users (every 10 minutes).""" await check_domains_by_frequency('realtime') def setup_scheduler(): """Configure and start the scheduler.""" # Daily domain check for Scout users at configured hour scheduler.add_job( check_all_domains, CronTrigger(hour=settings.check_hour, minute=settings.check_minute), id="daily_domain_check", name="Daily Domain Check (Scout)", replace_existing=True, ) # Hourly domain check for Trader users scheduler.add_job( check_hourly_domains, CronTrigger(minute=0), # Every hour at :00 id="hourly_domain_check", name="Hourly Domain Check (Trader)", replace_existing=True, ) # 10-minute domain check for Tycoon users scheduler.add_job( check_realtime_domains, CronTrigger(minute='*/10'), # Every 10 minutes id="realtime_domain_check", name="10-Minute Domain Check (Tycoon)", replace_existing=True, ) # Daily TLD price scrape at 03:00 UTC scheduler.add_job( scrape_tld_prices, CronTrigger(hour=3, minute=0), id="daily_tld_scrape", name="Daily TLD Price Scrape", replace_existing=True, ) # Price change check at 04:00 UTC (after scrape completes) scheduler.add_job( check_price_changes, CronTrigger(hour=4, minute=0), id="daily_price_check", name="Daily Price Change Check", replace_existing=True, ) # Auction scrape every hour (at :30 to avoid conflict with other jobs) scheduler.add_job( scrape_auctions, CronTrigger(minute=30), # Every hour at :30 id="hourly_auction_scrape", name="Hourly Auction Scrape", replace_existing=True, ) logger.info( f"Scheduler configured:" f"\n - Scout domain check at {settings.check_hour:02d}:{settings.check_minute:02d} (daily)" f"\n - Trader domain check every hour at :00" f"\n - Tycoon domain check every 10 minutes" f"\n - TLD price scrape at 03:00 UTC" f"\n - Price change alerts at 04:00 UTC" f"\n - Auction scrape every hour at :30" ) def start_scheduler(): """Start the scheduler if not already running.""" if not scheduler.running: setup_scheduler() scheduler.start() logger.info("Scheduler started") def stop_scheduler(): """Stop the scheduler.""" if scheduler.running: scheduler.shutdown() logger.info("Scheduler stopped") async def run_manual_check(): """Run domain check manually (for testing or on-demand).""" await check_all_domains() async def run_manual_tld_scrape(): """Run TLD price scrape manually (for testing or on-demand).""" await scrape_tld_prices() async def send_domain_availability_alerts(db, domains: list[Domain]): """Send email alerts for newly available domains.""" if not email_service.is_enabled: logger.info("Email service not configured, skipping domain alerts") return alerts_sent = 0 for domain in domains: try: # Get domain owner result = await db.execute( select(User).where(User.id == domain.user_id) ) user = result.scalar_one_or_none() if user and user.email: success = await email_service.send_domain_available_alert( to_email=user.email, domain=domain.name, user_name=user.name, ) if success: alerts_sent += 1 except Exception as e: logger.error(f"Failed to send alert for {domain.name}: {e}") logger.info(f"Sent {alerts_sent} domain availability alerts") async def check_price_changes(): """Check for TLD price changes and send alerts.""" logger.info("Checking for TLD price changes...") try: async with AsyncSessionLocal() as db: # Detect changes in last 24 hours changes = await price_tracker.detect_price_changes(db, hours=24) if changes: logger.info(f"Found {len(changes)} significant price changes") # Send alerts (if email is configured) alerts_sent = await price_tracker.send_price_alerts(db, changes) logger.info(f"Sent {alerts_sent} price change alerts") else: logger.info("No significant price changes detected") except Exception as e: logger.exception(f"Price change check failed: {e}") async def scrape_auctions(): """Scheduled task to scrape domain auctions from public sources.""" from app.services.auction_scraper import auction_scraper logger.info("Starting scheduled auction scrape...") try: async with AsyncSessionLocal() as db: result = await auction_scraper.scrape_all_platforms(db) logger.info( f"Auction scrape completed: " f"{result['total_found']} found, {result['total_new']} new, " f"{result['total_updated']} updated" ) if result.get('errors'): logger.warning(f"Scrape errors: {result['errors']}") # Match new auctions against Sniper Alerts if result['total_new'] > 0: await match_sniper_alerts() except Exception as e: logger.exception(f"Auction scrape failed: {e}") async def match_sniper_alerts(): """Match active sniper alerts against current auctions and notify users.""" from app.models.sniper_alert import SniperAlert, SniperAlertMatch from app.models.auction import DomainAuction logger.info("Matching sniper alerts against new auctions...") try: async with AsyncSessionLocal() as db: # Get all active sniper alerts alerts_result = await db.execute( select(SniperAlert).where(SniperAlert.is_active == True) ) alerts = alerts_result.scalars().all() if not alerts: logger.info("No active sniper alerts to match") return # Get recent auctions (added in last 2 hours) cutoff = datetime.utcnow() - timedelta(hours=2) auctions_result = await db.execute( select(DomainAuction).where( and_( DomainAuction.is_active == True, DomainAuction.scraped_at >= cutoff, ) ) ) auctions = auctions_result.scalars().all() if not auctions: logger.info("No recent auctions to match against") return matches_created = 0 notifications_sent = 0 for alert in alerts: matching_auctions = [] for auction in auctions: if _auction_matches_alert(auction, alert): matching_auctions.append(auction) if matching_auctions: for auction in matching_auctions: # Check if this match already exists existing = await db.execute( select(SniperAlertMatch).where( and_( SniperAlertMatch.alert_id == alert.id, SniperAlertMatch.domain == auction.domain, ) ) ) if existing.scalar_one_or_none(): continue # Create new match match = SniperAlertMatch( alert_id=alert.id, domain=auction.domain, platform=auction.platform, current_bid=auction.current_bid, end_time=auction.end_time, auction_url=auction.auction_url, matched_at=datetime.utcnow(), ) db.add(match) matches_created += 1 # Update alert last_triggered alert.last_triggered = datetime.utcnow() # Send notification if enabled if alert.notify_email: try: user_result = await db.execute( select(User).where(User.id == alert.user_id) ) user = user_result.scalar_one_or_none() if user and email_service.is_enabled: # Send email with matching domains domains_list = ", ".join([a.domain for a in matching_auctions[:5]]) await email_service.send_email( to_email=user.email, subject=f"🎯 Sniper Alert: {len(matching_auctions)} matching domains found!", html_content=f"""

Your Sniper Alert "{alert.name}" matched!

We found {len(matching_auctions)} domains matching your criteria:

View all matches in your Command Center

""" ) notifications_sent += 1 except Exception as e: logger.error(f"Failed to send sniper alert notification: {e}") await db.commit() logger.info(f"Sniper alert matching complete: {matches_created} matches created, {notifications_sent} notifications sent") except Exception as e: logger.exception(f"Sniper alert matching failed: {e}") def _auction_matches_alert(auction: "DomainAuction", alert: "SniperAlert") -> bool: """Check if an auction matches the criteria of a sniper alert.""" domain_name = auction.domain.rsplit('.', 1)[0] if '.' in auction.domain else auction.domain # Check keyword filter if alert.keyword: if alert.keyword.lower() not in domain_name.lower(): return False # Check TLD filter if alert.tlds: allowed_tlds = [t.strip().lower() for t in alert.tlds.split(',')] if auction.tld.lower() not in allowed_tlds: return False # Check length filters if alert.min_length and len(domain_name) < alert.min_length: return False if alert.max_length and len(domain_name) > alert.max_length: return False # Check price filters if alert.min_price and auction.current_bid < alert.min_price: return False if alert.max_price and auction.current_bid > alert.max_price: return False # Check exclusion filters if alert.exclude_numbers: if any(c.isdigit() for c in domain_name): return False if alert.exclude_hyphens: if '-' in domain_name: return False if alert.exclude_chars: excluded = set(alert.exclude_chars.lower()) if any(c in excluded for c in domain_name.lower()): return False return True