pounce/backend/app/scheduler.py
yves.gugger 0d2cc356b1 fix: Data freshness - only show active auctions
CRITICAL FIXES:
- API: Added end_time > now() filter to all auction queries
- Scheduler: Cleanup expired auctions every 15 minutes
- Scheduler: Scrape auctions every 2 hours (was 1 hour)
- Scheduler: Sniper alert matching every 30 minutes

Affected endpoints:
- GET /auctions (search)
- GET /auctions/feed (unified)
- GET /auctions/hot
- GET /auctions/ending-soon (already had filter)

Updated MARKET_CONCEPT.md with:
- 3 pillars: Pounce Direct, Live Auctions, Drops Tomorrow
- Data freshness architecture
- Unicorn roadmap
2025-12-11 09:55:27 +01:00

556 lines
20 KiB
Python

"""Background scheduler for domain checks, TLD price scraping, auctions, and notifications."""
import asyncio
import logging
from datetime import datetime, timedelta
from typing import TYPE_CHECKING
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
from sqlalchemy import select, and_
from app.config import get_settings
from app.database import AsyncSessionLocal
from app.models.domain import Domain, DomainCheck
from app.models.user import User
from app.models.subscription import Subscription, SubscriptionTier, TIER_CONFIG
from app.services.domain_checker import domain_checker
from app.services.email_service import email_service
from app.services.price_tracker import price_tracker
if TYPE_CHECKING:
from app.models.sniper_alert import SniperAlert
from app.models.auction import DomainAuction
logger = logging.getLogger(__name__)
settings = get_settings()
# Global scheduler instance
scheduler = AsyncIOScheduler()
async def scrape_tld_prices():
"""Scheduled task to scrape TLD prices from public sources."""
from app.services.tld_scraper.aggregator import tld_aggregator
logger.info("Starting scheduled TLD price scrape...")
try:
async with AsyncSessionLocal() as db:
result = await tld_aggregator.run_scrape(db)
logger.info(
f"TLD scrape completed: {result.status}, "
f"{result.tlds_scraped} TLDs, {result.prices_saved} prices saved"
)
if result.errors:
logger.warning(f"Scrape errors: {result.errors}")
except Exception as e:
logger.exception(f"TLD price scrape failed: {e}")
async def check_domains_by_frequency(frequency: str):
"""Check availability of domains based on their subscription frequency.
Args:
frequency: One of 'daily', 'hourly', 'realtime' (10-min)
"""
logger.info(f"Starting {frequency} domain check...")
start_time = datetime.utcnow()
async with AsyncSessionLocal() as db:
# Get users with matching check frequency
tiers_for_frequency = []
for tier, config in TIER_CONFIG.items():
if config['check_frequency'] == frequency:
tiers_for_frequency.append(tier)
# Realtime includes hourly and daily too (more frequent = superset)
elif frequency == 'realtime':
tiers_for_frequency.append(tier)
elif frequency == 'hourly' and config['check_frequency'] in ['hourly', 'realtime']:
tiers_for_frequency.append(tier)
# Get domains from users with matching subscription tier
from sqlalchemy.orm import joinedload
result = await db.execute(
select(Domain)
.join(User, Domain.user_id == User.id)
.outerjoin(Subscription, Subscription.user_id == User.id)
.where(
(Subscription.tier.in_(tiers_for_frequency)) |
(Subscription.id.is_(None) & (frequency == 'daily')) # Scout users (no subscription) = daily
)
)
domains = result.scalars().all()
logger.info(f"Checking {len(domains)} domains...")
checked = 0
errors = 0
newly_available = []
for domain in domains:
try:
# Check domain availability
check_result = await domain_checker.check_domain(domain.name)
# Track if domain became available
was_taken = not domain.is_available
is_now_available = check_result.is_available
if was_taken and is_now_available and domain.notify_on_available:
newly_available.append(domain)
# Update domain
domain.status = check_result.status
domain.is_available = check_result.is_available
domain.registrar = check_result.registrar
domain.expiration_date = check_result.expiration_date
domain.last_checked = datetime.utcnow()
# Create check record
check = DomainCheck(
domain_id=domain.id,
status=check_result.status,
is_available=check_result.is_available,
response_data=str(check_result.to_dict()),
checked_at=datetime.utcnow(),
)
db.add(check)
checked += 1
# Small delay to avoid rate limiting
await asyncio.sleep(0.5)
except Exception as e:
logger.error(f"Error checking domain {domain.name}: {e}")
errors += 1
await db.commit()
elapsed = (datetime.utcnow() - start_time).total_seconds()
logger.info(
f"Domain check complete. Checked: {checked}, Errors: {errors}, "
f"Newly available: {len(newly_available)}, Time: {elapsed:.2f}s"
)
# Send notifications for newly available domains
if newly_available:
logger.info(f"Domains that became available: {[d.name for d in newly_available]}")
await send_domain_availability_alerts(db, newly_available)
async def check_all_domains():
"""Legacy function - checks all domains (daily)."""
await check_domains_by_frequency('daily')
async def check_hourly_domains():
"""Check domains for Trader users (hourly)."""
await check_domains_by_frequency('hourly')
async def check_realtime_domains():
"""Check domains for Tycoon users (every 10 minutes)."""
await check_domains_by_frequency('realtime')
def setup_scheduler():
"""Configure and start the scheduler."""
# Daily domain check for Scout users at configured hour
scheduler.add_job(
check_all_domains,
CronTrigger(hour=settings.check_hour, minute=settings.check_minute),
id="daily_domain_check",
name="Daily Domain Check (Scout)",
replace_existing=True,
)
# Hourly domain check for Trader users
scheduler.add_job(
check_hourly_domains,
CronTrigger(minute=0), # Every hour at :00
id="hourly_domain_check",
name="Hourly Domain Check (Trader)",
replace_existing=True,
)
# 10-minute domain check for Tycoon users
scheduler.add_job(
check_realtime_domains,
CronTrigger(minute='*/10'), # Every 10 minutes
id="realtime_domain_check",
name="10-Minute Domain Check (Tycoon)",
replace_existing=True,
)
# Daily TLD price scrape at 03:00 UTC
scheduler.add_job(
scrape_tld_prices,
CronTrigger(hour=3, minute=0),
id="daily_tld_scrape",
name="Daily TLD Price Scrape",
replace_existing=True,
)
# Price change check at 04:00 UTC (after scrape completes)
scheduler.add_job(
check_price_changes,
CronTrigger(hour=4, minute=0),
id="daily_price_check",
name="Daily Price Change Check",
replace_existing=True,
)
# Auction scrape every 2 hours (at :30 to avoid conflict with other jobs)
scheduler.add_job(
scrape_auctions,
CronTrigger(hour='*/2', minute=30), # Every 2 hours at :30
id="auction_scrape",
name="Auction Scrape (2h)",
replace_existing=True,
)
# Cleanup expired auctions every 15 minutes (CRITICAL for data freshness!)
scheduler.add_job(
cleanup_expired_auctions,
CronTrigger(minute='*/15'), # Every 15 minutes
id="auction_cleanup",
name="Expired Auction Cleanup (15m)",
replace_existing=True,
)
# Sniper alert matching every 30 minutes
scheduler.add_job(
match_sniper_alerts,
CronTrigger(minute='*/30'), # Every 30 minutes
id="sniper_matching",
name="Sniper Alert Matching (30m)",
replace_existing=True,
)
logger.info(
f"Scheduler configured:"
f"\n - Scout domain check at {settings.check_hour:02d}:{settings.check_minute:02d} (daily)"
f"\n - Trader domain check every hour at :00"
f"\n - Tycoon domain check every 10 minutes"
f"\n - TLD price scrape at 03:00 UTC"
f"\n - Price change alerts at 04:00 UTC"
f"\n - Auction scrape every 2 hours at :30"
f"\n - Expired auction cleanup every 15 minutes"
f"\n - Sniper alert matching every 30 minutes"
)
def start_scheduler():
"""Start the scheduler if not already running."""
if not scheduler.running:
setup_scheduler()
scheduler.start()
logger.info("Scheduler started")
def stop_scheduler():
"""Stop the scheduler."""
if scheduler.running:
scheduler.shutdown()
logger.info("Scheduler stopped")
async def run_manual_check():
"""Run domain check manually (for testing or on-demand)."""
await check_all_domains()
async def run_manual_tld_scrape():
"""Run TLD price scrape manually (for testing or on-demand)."""
await scrape_tld_prices()
async def send_domain_availability_alerts(db, domains: list[Domain]):
"""Send email alerts for newly available domains."""
if not email_service.is_enabled:
logger.info("Email service not configured, skipping domain alerts")
return
alerts_sent = 0
for domain in domains:
try:
# Get domain owner
result = await db.execute(
select(User).where(User.id == domain.user_id)
)
user = result.scalar_one_or_none()
if user and user.email:
success = await email_service.send_domain_available_alert(
to_email=user.email,
domain=domain.name,
user_name=user.name,
)
if success:
alerts_sent += 1
except Exception as e:
logger.error(f"Failed to send alert for {domain.name}: {e}")
logger.info(f"Sent {alerts_sent} domain availability alerts")
async def check_price_changes():
"""Check for TLD price changes and send alerts."""
logger.info("Checking for TLD price changes...")
try:
async with AsyncSessionLocal() as db:
# Detect changes in last 24 hours
changes = await price_tracker.detect_price_changes(db, hours=24)
if changes:
logger.info(f"Found {len(changes)} significant price changes")
# Send alerts (if email is configured)
alerts_sent = await price_tracker.send_price_alerts(db, changes)
logger.info(f"Sent {alerts_sent} price change alerts")
else:
logger.info("No significant price changes detected")
except Exception as e:
logger.exception(f"Price change check failed: {e}")
async def cleanup_expired_auctions():
"""
Mark expired auctions as inactive and delete very old ones.
This is CRITICAL for data freshness! Without this, the Market page
would show auctions that ended days ago.
Runs every 15 minutes to ensure users always see live data.
"""
from app.models.auction import DomainAuction
from sqlalchemy import update, delete
logger.info("Starting expired auction cleanup...")
try:
async with AsyncSessionLocal() as db:
now = datetime.utcnow()
# 1. Mark ended auctions as inactive
stmt = (
update(DomainAuction)
.where(
and_(
DomainAuction.end_time < now,
DomainAuction.is_active == True
)
)
.values(is_active=False)
)
result = await db.execute(stmt)
marked_inactive = result.rowcount
# 2. Delete very old inactive auctions (> 7 days)
cutoff = now - timedelta(days=7)
stmt = delete(DomainAuction).where(
and_(
DomainAuction.is_active == False,
DomainAuction.end_time < cutoff
)
)
result = await db.execute(stmt)
deleted = result.rowcount
await db.commit()
if marked_inactive > 0 or deleted > 0:
logger.info(f"Auction cleanup: {marked_inactive} marked inactive, {deleted} deleted")
except Exception as e:
logger.exception(f"Auction cleanup failed: {e}")
async def scrape_auctions():
"""Scheduled task to scrape domain auctions from public sources."""
from app.services.auction_scraper import auction_scraper
logger.info("Starting scheduled auction scrape...")
try:
async with AsyncSessionLocal() as db:
result = await auction_scraper.scrape_all_platforms(db)
logger.info(
f"Auction scrape completed: "
f"{result['total_found']} found, {result['total_new']} new, "
f"{result['total_updated']} updated"
)
if result.get('errors'):
logger.warning(f"Scrape errors: {result['errors']}")
# Match new auctions against Sniper Alerts
if result['total_new'] > 0:
await match_sniper_alerts()
except Exception as e:
logger.exception(f"Auction scrape failed: {e}")
async def match_sniper_alerts():
"""Match active sniper alerts against current auctions and notify users."""
from app.models.sniper_alert import SniperAlert, SniperAlertMatch
from app.models.auction import DomainAuction
logger.info("Matching sniper alerts against new auctions...")
try:
async with AsyncSessionLocal() as db:
# Get all active sniper alerts
alerts_result = await db.execute(
select(SniperAlert).where(SniperAlert.is_active == True)
)
alerts = alerts_result.scalars().all()
if not alerts:
logger.info("No active sniper alerts to match")
return
# Get recent auctions (added in last 2 hours)
cutoff = datetime.utcnow() - timedelta(hours=2)
auctions_result = await db.execute(
select(DomainAuction).where(
and_(
DomainAuction.is_active == True,
DomainAuction.scraped_at >= cutoff,
)
)
)
auctions = auctions_result.scalars().all()
if not auctions:
logger.info("No recent auctions to match against")
return
matches_created = 0
notifications_sent = 0
for alert in alerts:
matching_auctions = []
for auction in auctions:
if _auction_matches_alert(auction, alert):
matching_auctions.append(auction)
if matching_auctions:
for auction in matching_auctions:
# Check if this match already exists
existing = await db.execute(
select(SniperAlertMatch).where(
and_(
SniperAlertMatch.alert_id == alert.id,
SniperAlertMatch.domain == auction.domain,
)
)
)
if existing.scalar_one_or_none():
continue
# Create new match
match = SniperAlertMatch(
alert_id=alert.id,
domain=auction.domain,
platform=auction.platform,
current_bid=auction.current_bid,
end_time=auction.end_time,
auction_url=auction.auction_url,
matched_at=datetime.utcnow(),
)
db.add(match)
matches_created += 1
# Update alert last_triggered
alert.last_triggered = datetime.utcnow()
# Send notification if enabled
if alert.notify_email:
try:
user_result = await db.execute(
select(User).where(User.id == alert.user_id)
)
user = user_result.scalar_one_or_none()
if user and email_service.is_enabled:
# Send email with matching domains
domains_list = ", ".join([a.domain for a in matching_auctions[:5]])
await email_service.send_email(
to_email=user.email,
subject=f"🎯 Sniper Alert: {len(matching_auctions)} matching domains found!",
html_content=f"""
<h2>Your Sniper Alert "{alert.name}" matched!</h2>
<p>We found {len(matching_auctions)} domains matching your criteria:</p>
<ul>
{"".join(f"<li><strong>{a.domain}</strong> - ${a.current_bid:.0f} on {a.platform}</li>" for a in matching_auctions[:10])}
</ul>
<p><a href="https://pounce.ch/command/alerts">View all matches in your Command Center</a></p>
"""
)
notifications_sent += 1
except Exception as e:
logger.error(f"Failed to send sniper alert notification: {e}")
await db.commit()
logger.info(f"Sniper alert matching complete: {matches_created} matches created, {notifications_sent} notifications sent")
except Exception as e:
logger.exception(f"Sniper alert matching failed: {e}")
def _auction_matches_alert(auction: "DomainAuction", alert: "SniperAlert") -> bool:
"""Check if an auction matches the criteria of a sniper alert."""
domain_name = auction.domain.rsplit('.', 1)[0] if '.' in auction.domain else auction.domain
# Check keyword filter
if alert.keyword:
if alert.keyword.lower() not in domain_name.lower():
return False
# Check TLD filter
if alert.tlds:
allowed_tlds = [t.strip().lower() for t in alert.tlds.split(',')]
if auction.tld.lower() not in allowed_tlds:
return False
# Check length filters
if alert.min_length and len(domain_name) < alert.min_length:
return False
if alert.max_length and len(domain_name) > alert.max_length:
return False
# Check price filters
if alert.min_price and auction.current_bid < alert.min_price:
return False
if alert.max_price and auction.current_bid > alert.max_price:
return False
# Check exclusion filters
if alert.exclude_numbers:
if any(c.isdigit() for c in domain_name):
return False
if alert.exclude_hyphens:
if '-' in domain_name:
return False
if alert.exclude_chars:
excluded = set(alert.exclude_chars.lower())
if any(c in excluded for c in domain_name.lower()):
return False
return True