pounce/backend/app/scheduler.py
yves.gugger 20d321a394 feat: Add Sniper Alert auto-matching to auction scraper
SCHEDULER ENHANCEMENT:
- After each hourly auction scrape, automatically match new auctions
  against all active Sniper Alerts
- _auction_matches_alert() checks all filter criteria:
  - Keyword matching
  - TLD whitelist
  - Min/max length
  - Min/max price
  - Exclude numbers
  - Exclude hyphens
  - Exclude specific characters
- Creates SniperAlertMatch records for dashboard display
- Sends email notifications to users with matching alerts
- Updates alert's last_triggered timestamp

This implements the full Sniper Alert workflow from analysis_3.md:
'Der User kann extrem spezifische Filter speichern.
Wenn die Mail kommt, weiß der User: Das ist relevant.'
2025-12-10 13:09:37 +01:00

484 lines
18 KiB
Python

"""Background scheduler for domain checks, TLD price scraping, auctions, and notifications."""
import asyncio
import logging
from datetime import datetime, timedelta
from typing import TYPE_CHECKING
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
from sqlalchemy import select, and_
from app.config import get_settings
from app.database import AsyncSessionLocal
from app.models.domain import Domain, DomainCheck
from app.models.user import User
from app.models.subscription import Subscription, SubscriptionTier, TIER_CONFIG
from app.services.domain_checker import domain_checker
from app.services.email_service import email_service
from app.services.price_tracker import price_tracker
if TYPE_CHECKING:
from app.models.sniper_alert import SniperAlert
from app.models.auction import DomainAuction
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Application settings; check_hour / check_minute are read when the daily job is registered.
settings = get_settings()
# Global scheduler instance
scheduler = AsyncIOScheduler()
async def scrape_tld_prices():
    """Run one TLD price scrape and log its outcome.

    Any exception raised by the scrape is logged with its traceback and is
    not propagated to the caller.
    """
    # Imported at call time rather than at module import time.
    from app.services.tld_scraper.aggregator import tld_aggregator

    logger.info("Starting scheduled TLD price scrape...")
    try:
        async with AsyncSessionLocal() as session:
            outcome = await tld_aggregator.run_scrape(session)
            summary = (
                f"TLD scrape completed: {outcome.status}, "
                f"{outcome.tlds_scraped} TLDs, {outcome.prices_saved} prices saved"
            )
            logger.info(summary)
            scrape_errors = outcome.errors
            if scrape_errors:
                logger.warning(f"Scrape errors: {scrape_errors}")
    except Exception as exc:
        logger.exception(f"TLD price scrape failed: {exc}")
async def check_domains_by_frequency(frequency: str):
    """Check availability of watched domains for one check frequency.

    Selects the subscription tiers served by this run, re-checks every domain
    owned by users on those tiers, records a DomainCheck row per successful
    check, commits, and finally emails the owners of domains that changed
    from taken to available (only for domains with notify_on_available set).

    Args:
        frequency: One of 'daily', 'hourly', 'realtime' (10-min).
    """
    logger.info(f"Starting {frequency} domain check...")
    start_time = datetime.utcnow()

    # A run serves the tiers configured for exactly this frequency; the hourly
    # run additionally serves 'realtime' tiers. The 'realtime' run must NOT
    # pull in slower tiers: previously it selected every tier, so all
    # subscribed users' domains were re-checked every 10 minutes regardless
    # of their plan.
    served_frequencies = {frequency}
    if frequency == 'hourly':
        served_frequencies.add('realtime')
    tiers_for_frequency = [
        tier
        for tier, config in TIER_CONFIG.items()
        if config['check_frequency'] in served_frequencies
    ]

    async with AsyncSessionLocal() as db:
        tier_filter = Subscription.tier.in_(tiers_for_frequency)
        if frequency == 'daily':
            # Scout users (no subscription row) = daily
            tier_filter = tier_filter | Subscription.id.is_(None)

        # Get domains from users with a matching subscription tier
        result = await db.execute(
            select(Domain)
            .join(User, Domain.user_id == User.id)
            .outerjoin(Subscription, Subscription.user_id == User.id)
            .where(tier_filter)
        )
        domains = result.scalars().all()
        logger.info(f"Checking {len(domains)} domains...")

        checked = 0
        errors = 0
        newly_available = []
        for domain in domains:
            try:
                check_result = await domain_checker.check_domain(domain.name)

                # Track taken -> available transitions before overwriting state
                was_taken = not domain.is_available
                is_now_available = check_result.is_available
                if was_taken and is_now_available and domain.notify_on_available:
                    newly_available.append(domain)

                # Update domain with the latest check result
                domain.status = check_result.status
                domain.is_available = check_result.is_available
                domain.registrar = check_result.registrar
                domain.expiration_date = check_result.expiration_date
                domain.last_checked = datetime.utcnow()

                # Create check record
                db.add(
                    DomainCheck(
                        domain_id=domain.id,
                        status=check_result.status,
                        is_available=check_result.is_available,
                        response_data=str(check_result.to_dict()),
                        checked_at=datetime.utcnow(),
                    )
                )
                checked += 1

                # Small delay to avoid rate limiting
                await asyncio.sleep(0.5)
            except Exception as e:
                # One failing domain must not abort the whole run
                logger.error(f"Error checking domain {domain.name}: {e}")
                errors += 1

        await db.commit()

        elapsed = (datetime.utcnow() - start_time).total_seconds()
        logger.info(
            f"Domain check complete. Checked: {checked}, Errors: {errors}, "
            f"Newly available: {len(newly_available)}, Time: {elapsed:.2f}s"
        )

        # Send notifications for newly available domains
        if newly_available:
            logger.info(f"Domains that became available: {[d.name for d in newly_available]}")
            await send_domain_availability_alerts(db, newly_available)
async def check_all_domains():
    """Legacy entry point: run the 'daily' frequency domain check."""
    await check_domains_by_frequency(frequency='daily')
async def check_hourly_domains():
    """Run the 'hourly' frequency domain check (Trader users)."""
    await check_domains_by_frequency(frequency='hourly')
async def check_realtime_domains():
    """Run the 'realtime' frequency domain check (Tycoon users, every 10 minutes)."""
    await check_domains_by_frequency(frequency='realtime')
def setup_scheduler():
    """Register all recurring jobs on the global scheduler.

    Each job is added with replace_existing=True, so calling this again
    replaces jobs that share an id. This function does not start the scheduler.
    """
    # (callable, trigger, job id, human-readable name), in registration order.
    job_table = (
        # Daily domain check for Scout users at configured hour
        (
            check_all_domains,
            CronTrigger(hour=settings.check_hour, minute=settings.check_minute),
            "daily_domain_check",
            "Daily Domain Check (Scout)",
        ),
        # Hourly domain check for Trader users, every hour at :00
        (
            check_hourly_domains,
            CronTrigger(minute=0),
            "hourly_domain_check",
            "Hourly Domain Check (Trader)",
        ),
        # 10-minute domain check for Tycoon users
        (
            check_realtime_domains,
            CronTrigger(minute='*/10'),
            "realtime_domain_check",
            "10-Minute Domain Check (Tycoon)",
        ),
        # Daily TLD price scrape at 03:00 UTC
        (
            scrape_tld_prices,
            CronTrigger(hour=3, minute=0),
            "daily_tld_scrape",
            "Daily TLD Price Scrape",
        ),
        # Price change check at 04:00 UTC (after scrape completes)
        (
            check_price_changes,
            CronTrigger(hour=4, minute=0),
            "daily_price_check",
            "Daily Price Change Check",
        ),
        # Auction scrape every hour at :30 to avoid conflict with other jobs
        (
            scrape_auctions,
            CronTrigger(minute=30),
            "hourly_auction_scrape",
            "Hourly Auction Scrape",
        ),
    )
    for job_func, trigger, job_id, job_name in job_table:
        scheduler.add_job(
            job_func,
            trigger,
            id=job_id,
            name=job_name,
            replace_existing=True,
        )

    summary_lines = [
        "Scheduler configured:",
        f" - Scout domain check at {settings.check_hour:02d}:{settings.check_minute:02d} (daily)",
        " - Trader domain check every hour at :00",
        " - Tycoon domain check every 10 minutes",
        " - TLD price scrape at 03:00 UTC",
        " - Price change alerts at 04:00 UTC",
        " - Auction scrape every hour at :30",
    ]
    logger.info("\n".join(summary_lines))
def start_scheduler():
    """Configure and start the global scheduler; no-op when it is already running."""
    if scheduler.running:
        return
    setup_scheduler()
    scheduler.start()
    logger.info("Scheduler started")
def stop_scheduler():
    """Shut the global scheduler down; no-op when it is not running."""
    if not scheduler.running:
        return
    scheduler.shutdown()
    logger.info("Scheduler stopped")
async def run_manual_check():
    """Trigger the daily domain check on demand (e.g. for testing)."""
    await check_all_domains()
async def run_manual_tld_scrape():
    """Trigger the TLD price scrape on demand (e.g. for testing)."""
    await scrape_tld_prices()
async def send_domain_availability_alerts(db, domains: list[Domain]):
    """Email the owner of each given domain that it has become available.

    Does nothing (beyond logging) when the email service is not enabled.
    A failure for one domain is logged and does not stop the remaining ones.

    Args:
        db: Session used to look up each domain's owner.
        domains: Domains that were detected as newly available.
    """
    if not email_service.is_enabled:
        logger.info("Email service not configured, skipping domain alerts")
        return

    sent_count = 0
    for watched in domains:
        try:
            # Look up the domain owner
            owner_query = select(User).where(User.id == watched.user_id)
            owner = (await db.execute(owner_query)).scalar_one_or_none()
            if not owner or not owner.email:
                continue

            delivered = await email_service.send_domain_available_alert(
                to_email=owner.email,
                domain=watched.name,
                user_name=owner.name,
            )
            if delivered:
                sent_count += 1
        except Exception as exc:
            logger.error(f"Failed to send alert for {watched.name}: {exc}")

    logger.info(f"Sent {sent_count} domain availability alerts")
async def check_price_changes():
    """Detect TLD price changes from the last 24 hours and send alerts for them.

    Any exception is logged with its traceback and not propagated.
    """
    logger.info("Checking for TLD price changes...")
    try:
        async with AsyncSessionLocal() as session:
            # Detect changes in last 24 hours
            detected = await price_tracker.detect_price_changes(session, hours=24)
            if not detected:
                logger.info("No significant price changes detected")
                return

            logger.info(f"Found {len(detected)} significant price changes")
            # Send alerts (if email is configured)
            sent_count = await price_tracker.send_price_alerts(session, detected)
            logger.info(f"Sent {sent_count} price change alerts")
    except Exception as exc:
        logger.exception(f"Price change check failed: {exc}")
async def scrape_auctions():
    """Scrape domain auctions and, when new ones were found, match Sniper Alerts.

    Any exception is logged with its traceback and not propagated.
    """
    # Imported at call time rather than at module import time.
    from app.services.auction_scraper import auction_scraper

    logger.info("Starting scheduled auction scrape...")
    try:
        async with AsyncSessionLocal() as session:
            stats = await auction_scraper.scrape_all_platforms(session)
            found = stats['total_found']
            new_count = stats['total_new']
            updated = stats['total_updated']
            logger.info(
                f"Auction scrape completed: "
                f"{found} found, {new_count} new, "
                f"{updated} updated"
            )

            scrape_errors = stats.get('errors')
            if scrape_errors:
                logger.warning(f"Scrape errors: {scrape_errors}")

            # Only run Sniper Alert matching when the scrape produced new auctions
            if new_count > 0:
                await match_sniper_alerts()
    except Exception as exc:
        logger.exception(f"Auction scrape failed: {exc}")
def _render_sniper_alert_email(alert_name, auctions):
    """Build the (subject, html_content) pair for one sniper-alert notification.

    Domain, platform and alert name are HTML-escaped because they come from
    scraped or user-supplied data. At most 10 auctions are listed in the body;
    the counts in the subject and intro refer to all given auctions.
    """
    from html import escape

    def _bid_label(bid):
        # A missing bid would make the ':.0f' format raise and lose the whole email.
        return f"${bid:.0f}" if bid is not None else "n/a"

    items = "".join(
        f"<li><strong>{escape(str(a.domain))}</strong> - "
        f"{_bid_label(a.current_bid)} on {escape(str(a.platform))}</li>"
        for a in auctions[:10]
    )
    subject = f"🎯 Sniper Alert: {len(auctions)} matching domains found!"
    html_content = f"""
    <h2>Your Sniper Alert "{escape(str(alert_name))}" matched!</h2>
    <p>We found {len(auctions)} domains matching your criteria:</p>
    <ul>
    {items}
    </ul>
    <p><a href="https://pounce.ch/command/alerts">View all matches in your Command Center</a></p>
    """
    return subject, html_content


async def _notify_sniper_alert_owner(db, alert, auctions) -> bool:
    """Email the alert's owner about the given auctions; return True if an email was sent.

    Errors are logged and reported as False so one failing notification does
    not abort the matching run.
    """
    if not email_service.is_enabled:
        return False
    try:
        user_result = await db.execute(select(User).where(User.id == alert.user_id))
        user = user_result.scalar_one_or_none()
        if not user or not user.email:
            return False

        subject, html_content = _render_sniper_alert_email(alert.name, auctions)
        await email_service.send_email(
            to_email=user.email,
            subject=subject,
            html_content=html_content,
        )
        return True
    except Exception as e:
        logger.error(f"Failed to send sniper alert notification: {e}")
        return False


async def match_sniper_alerts():
    """Match active sniper alerts against recent auctions and notify users.

    For every active alert, auctions scraped within the last 2 hours are
    filtered with _auction_matches_alert(). Each (alert, domain) pair that has
    no SniperAlertMatch row yet gets one. Only alerts that gained at least one
    NEW match have last_triggered updated and (if notify_email is set) an
    email sent, so an auction seen on consecutive runs is not re-announced.
    All changes are committed once at the end; any exception is logged with
    its traceback and not propagated.
    """
    from app.models.sniper_alert import SniperAlert, SniperAlertMatch
    from app.models.auction import DomainAuction

    logger.info("Matching sniper alerts against new auctions...")
    try:
        async with AsyncSessionLocal() as db:
            # Get all active sniper alerts
            alerts_result = await db.execute(
                select(SniperAlert).where(SniperAlert.is_active == True)  # noqa: E712
            )
            alerts = alerts_result.scalars().all()
            if not alerts:
                logger.info("No active sniper alerts to match")
                return

            # Get recent auctions (added in last 2 hours)
            cutoff = datetime.utcnow() - timedelta(hours=2)
            auctions_result = await db.execute(
                select(DomainAuction).where(
                    and_(
                        DomainAuction.is_active == True,  # noqa: E712
                        DomainAuction.scraped_at >= cutoff,
                    )
                )
            )
            auctions = auctions_result.scalars().all()
            if not auctions:
                logger.info("No recent auctions to match against")
                return

            matches_created = 0
            notifications_sent = 0
            for alert in alerts:
                candidates = [a for a in auctions if _auction_matches_alert(a, alert)]

                new_matches = []
                # Guards against the same domain appearing twice in this batch
                # (e.g. listed on two platforms) independent of session autoflush.
                seen_domains = set()
                for auction in candidates:
                    if auction.domain in seen_domains:
                        continue
                    seen_domains.add(auction.domain)

                    # Check if this match already exists; first() tolerates
                    # pre-existing duplicate rows instead of raising.
                    existing = await db.execute(
                        select(SniperAlertMatch).where(
                            and_(
                                SniperAlertMatch.alert_id == alert.id,
                                SniperAlertMatch.domain == auction.domain,
                            )
                        )
                    )
                    if existing.scalars().first() is not None:
                        continue

                    # Create new match
                    db.add(
                        SniperAlertMatch(
                            alert_id=alert.id,
                            domain=auction.domain,
                            platform=auction.platform,
                            current_bid=auction.current_bid,
                            end_time=auction.end_time,
                            auction_url=auction.auction_url,
                            matched_at=datetime.utcnow(),
                        )
                    )
                    new_matches.append(auction)

                if not new_matches:
                    # Nothing new for this alert: do not touch last_triggered
                    # and do not re-send an email about known matches.
                    continue

                matches_created += len(new_matches)
                alert.last_triggered = datetime.utcnow()

                # Send notification if enabled
                if alert.notify_email and await _notify_sniper_alert_owner(db, alert, new_matches):
                    notifications_sent += 1

            await db.commit()
            logger.info(f"Sniper alert matching complete: {matches_created} matches created, {notifications_sent} notifications sent")
    except Exception as e:
        logger.exception(f"Sniper alert matching failed: {e}")
def _auction_matches_alert(auction: "DomainAuction", alert: "SniperAlert") -> bool:
"""Check if an auction matches the criteria of a sniper alert."""
domain_name = auction.domain.rsplit('.', 1)[0] if '.' in auction.domain else auction.domain
# Check keyword filter
if alert.keyword:
if alert.keyword.lower() not in domain_name.lower():
return False
# Check TLD filter
if alert.tlds:
allowed_tlds = [t.strip().lower() for t in alert.tlds.split(',')]
if auction.tld.lower() not in allowed_tlds:
return False
# Check length filters
if alert.min_length and len(domain_name) < alert.min_length:
return False
if alert.max_length and len(domain_name) > alert.max_length:
return False
# Check price filters
if alert.min_price and auction.current_bid < alert.min_price:
return False
if alert.max_price and auction.current_bid > alert.max_price:
return False
# Check exclusion filters
if alert.exclude_numbers:
if any(c.isdigit() for c in domain_name):
return False
if alert.exclude_hyphens:
if '-' in domain_name:
return False
if alert.exclude_chars:
excluded = set(alert.exclude_chars.lower())
if any(c in excluded for c in domain_name.lower()):
return False
return True