CRITICAL FIXES:

- API: Added end_time > now() filter to all auction queries
- Scheduler: Cleanup expired auctions every 15 minutes
- Scheduler: Scrape auctions every 2 hours (was 1 hour)
- Scheduler: Sniper alert matching every 30 minutes

Affected endpoints:

- GET /auctions (search)
- GET /auctions/feed (unified)
- GET /auctions/hot
- GET /auctions/ending-soon (already had filter)

Updated MARKET_CONCEPT.md with:

- 3 pillars: Pounce Direct, Live Auctions, Drops Tomorrow
- Data freshness architecture
- Unicorn roadmap
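A minimal sketch of the freshness filter described above, assuming the DomainAuction model used in the scheduler module below (the actual endpoint code may differ):

    from datetime import datetime

    from sqlalchemy import select, and_

    from app.models.auction import DomainAuction

    def active_auctions_query():
        """Hypothetical helper: select only auctions that are still running,
        so endpoints never return listings that already ended."""
        return select(DomainAuction).where(
            and_(
                DomainAuction.is_active == True,
                DomainAuction.end_time > datetime.utcnow(),
            )
        )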
"""Background scheduler for domain checks, TLD price scraping, auctions, and notifications."""
|
|
import asyncio
|
|
import logging
|
|
from datetime import datetime, timedelta
|
|
from typing import TYPE_CHECKING
|
|
|
|
from apscheduler.schedulers.asyncio import AsyncIOScheduler
|
|
from apscheduler.triggers.cron import CronTrigger
|
|
from sqlalchemy import select, and_
|
|
|
|
from app.config import get_settings
|
|
from app.database import AsyncSessionLocal
|
|
from app.models.domain import Domain, DomainCheck
|
|
from app.models.user import User
|
|
from app.models.subscription import Subscription, SubscriptionTier, TIER_CONFIG
|
|
from app.services.domain_checker import domain_checker
|
|
from app.services.email_service import email_service
|
|
from app.services.price_tracker import price_tracker
|
|
|
|
if TYPE_CHECKING:
|
|
from app.models.sniper_alert import SniperAlert
|
|
from app.models.auction import DomainAuction
|
|
|
|
logger = logging.getLogger(__name__)
|
|
settings = get_settings()
|
|
|
|
# Global scheduler instance
|
|
scheduler = AsyncIOScheduler()
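# Note: AsyncIOScheduler runs coroutine jobs directly on the running asyncio
# event loop, so the async job functions below need no thread executor.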


async def scrape_tld_prices():
    """Scheduled task to scrape TLD prices from public sources."""
    from app.services.tld_scraper.aggregator import tld_aggregator

    logger.info("Starting scheduled TLD price scrape...")

    try:
        async with AsyncSessionLocal() as db:
            result = await tld_aggregator.run_scrape(db)

            logger.info(
                f"TLD scrape completed: {result.status}, "
                f"{result.tlds_scraped} TLDs, {result.prices_saved} prices saved"
            )

            if result.errors:
                logger.warning(f"Scrape errors: {result.errors}")

    except Exception as e:
        logger.exception(f"TLD price scrape failed: {e}")


async def check_domains_by_frequency(frequency: str):
    """Check availability of domains based on their subscription frequency.

    Args:
        frequency: One of 'daily', 'hourly', or 'realtime' (10-minute).
    """
    logger.info(f"Starting {frequency} domain check...")
    start_time = datetime.utcnow()

    async with AsyncSessionLocal() as db:
        # Select the tiers whose configured check frequency matches this job,
        # so each tier is checked on exactly one schedule (Scout=daily,
        # Trader=hourly, Tycoon=realtime).
        tiers_for_frequency = [
            tier for tier, config in TIER_CONFIG.items()
            if config['check_frequency'] == frequency
        ]
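        # TIER_CONFIG (imported above) is assumed to map each tier to a config
        # dict with at least a 'check_frequency' key, e.g. (illustrative values):
        #   {SubscriptionTier.SCOUT: {'check_frequency': 'daily', ...}, ...}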

        # Get domains from users with a matching subscription tier; users with
        # no subscription row are treated as Scout and fall into the daily run.
        result = await db.execute(
            select(Domain)
            .join(User, Domain.user_id == User.id)
            .outerjoin(Subscription, Subscription.user_id == User.id)
            .where(
                (Subscription.tier.in_(tiers_for_frequency)) |
                (Subscription.id.is_(None) & (frequency == 'daily'))
            )
        )
        domains = result.scalars().all()

        logger.info(f"Checking {len(domains)} domains...")

        checked = 0
        errors = 0
        newly_available = []

        for domain in domains:
            try:
                # Check domain availability
                check_result = await domain_checker.check_domain(domain.name)

                # Track whether the domain flipped from taken to available
                was_taken = not domain.is_available
                is_now_available = check_result.is_available

                if was_taken and is_now_available and domain.notify_on_available:
                    newly_available.append(domain)

                # Update the domain record
                domain.status = check_result.status
                domain.is_available = check_result.is_available
                domain.registrar = check_result.registrar
                domain.expiration_date = check_result.expiration_date
                domain.last_checked = datetime.utcnow()

                # Create a check record
                check = DomainCheck(
                    domain_id=domain.id,
                    status=check_result.status,
                    is_available=check_result.is_available,
                    response_data=str(check_result.to_dict()),
                    checked_at=datetime.utcnow(),
                )
                db.add(check)

                checked += 1

                # Small delay to avoid rate limiting
                await asyncio.sleep(0.5)

            except Exception as e:
                logger.error(f"Error checking domain {domain.name}: {e}")
                errors += 1

        await db.commit()

        elapsed = (datetime.utcnow() - start_time).total_seconds()
        logger.info(
            f"Domain check complete. Checked: {checked}, Errors: {errors}, "
            f"Newly available: {len(newly_available)}, Time: {elapsed:.2f}s"
        )

        # Send notifications for newly available domains
        if newly_available:
            logger.info(f"Domains that became available: {[d.name for d in newly_available]}")
            await send_domain_availability_alerts(db, newly_available)


async def check_all_domains():
    """Legacy entry point - runs the daily (Scout) domain check."""
    await check_domains_by_frequency('daily')


async def check_hourly_domains():
    """Check domains for Trader users (hourly)."""
    await check_domains_by_frequency('hourly')


async def check_realtime_domains():
    """Check domains for Tycoon users (every 10 minutes)."""
    await check_domains_by_frequency('realtime')


def setup_scheduler():
    """Configure the scheduler's jobs (started separately by start_scheduler)."""
    # Daily domain check for Scout users at the configured hour
    scheduler.add_job(
        check_all_domains,
        CronTrigger(hour=settings.check_hour, minute=settings.check_minute),
        id="daily_domain_check",
        name="Daily Domain Check (Scout)",
        replace_existing=True,
    )

    # Hourly domain check for Trader users
    scheduler.add_job(
        check_hourly_domains,
        CronTrigger(minute=0),  # Every hour at :00
        id="hourly_domain_check",
        name="Hourly Domain Check (Trader)",
        replace_existing=True,
    )

    # 10-minute domain check for Tycoon users
    scheduler.add_job(
        check_realtime_domains,
        CronTrigger(minute='*/10'),  # Every 10 minutes
        id="realtime_domain_check",
        name="10-Minute Domain Check (Tycoon)",
        replace_existing=True,
    )

    # Daily TLD price scrape at 03:00 UTC
    scheduler.add_job(
        scrape_tld_prices,
        CronTrigger(hour=3, minute=0),
        id="daily_tld_scrape",
        name="Daily TLD Price Scrape",
        replace_existing=True,
    )

    # Price change check at 04:00 UTC (after the scrape completes)
    scheduler.add_job(
        check_price_changes,
        CronTrigger(hour=4, minute=0),
        id="daily_price_check",
        name="Daily Price Change Check",
        replace_existing=True,
    )

    # Auction scrape every 2 hours (at :30 to avoid clashing with other jobs)
    scheduler.add_job(
        scrape_auctions,
        CronTrigger(hour='*/2', minute=30),  # Every 2 hours at :30
        id="auction_scrape",
        name="Auction Scrape (2h)",
        replace_existing=True,
    )

    # Cleanup expired auctions every 15 minutes (critical for data freshness)
    scheduler.add_job(
        cleanup_expired_auctions,
        CronTrigger(minute='*/15'),  # Every 15 minutes
        id="auction_cleanup",
        name="Expired Auction Cleanup (15m)",
        replace_existing=True,
    )

    # Sniper alert matching every 30 minutes
    scheduler.add_job(
        match_sniper_alerts,
        CronTrigger(minute='*/30'),  # Every 30 minutes
        id="sniper_matching",
        name="Sniper Alert Matching (30m)",
        replace_existing=True,
    )

    logger.info(
        f"Scheduler configured:"
        f"\n - Scout domain check at {settings.check_hour:02d}:{settings.check_minute:02d} (daily)"
        f"\n - Trader domain check every hour at :00"
        f"\n - Tycoon domain check every 10 minutes"
        f"\n - TLD price scrape at 03:00 UTC"
        f"\n - Price change alerts at 04:00 UTC"
        f"\n - Auction scrape every 2 hours at :30"
        f"\n - Expired auction cleanup every 15 minutes"
        f"\n - Sniper alert matching every 30 minutes"
    )


def start_scheduler():
    """Start the scheduler if it is not already running."""
    if not scheduler.running:
        setup_scheduler()
        scheduler.start()
        logger.info("Scheduler started")


def stop_scheduler():
    """Stop the scheduler."""
    if scheduler.running:
        scheduler.shutdown()
        logger.info("Scheduler stopped")
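

# Illustrative wiring (an assumption, not part of this module): in a FastAPI
# app, start_scheduler()/stop_scheduler() would typically be hooked into the
# application lifespan, e.g.:
#
#     from contextlib import asynccontextmanager
#     from fastapi import FastAPI
#
#     @asynccontextmanager
#     async def lifespan(app: FastAPI):
#         start_scheduler()
#         yield
#         stop_scheduler()
#
#     app = FastAPI(lifespan=lifespan)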


async def run_manual_check():
    """Run the domain check manually (for testing or on-demand)."""
    await check_all_domains()


async def run_manual_tld_scrape():
    """Run the TLD price scrape manually (for testing or on-demand)."""
    await scrape_tld_prices()


async def send_domain_availability_alerts(db, domains: list[Domain]):
    """Send email alerts for newly available domains."""
    if not email_service.is_enabled:
        logger.info("Email service not configured, skipping domain alerts")
        return

    alerts_sent = 0

    for domain in domains:
        try:
            # Get the domain owner
            result = await db.execute(
                select(User).where(User.id == domain.user_id)
            )
            user = result.scalar_one_or_none()

            if user and user.email:
                success = await email_service.send_domain_available_alert(
                    to_email=user.email,
                    domain=domain.name,
                    user_name=user.name,
                )
                if success:
                    alerts_sent += 1

        except Exception as e:
            logger.error(f"Failed to send alert for {domain.name}: {e}")

    logger.info(f"Sent {alerts_sent} domain availability alerts")


async def check_price_changes():
    """Check for TLD price changes and send alerts."""
    logger.info("Checking for TLD price changes...")

    try:
        async with AsyncSessionLocal() as db:
            # Detect changes in the last 24 hours
            changes = await price_tracker.detect_price_changes(db, hours=24)

            if changes:
                logger.info(f"Found {len(changes)} significant price changes")

                # Send alerts (if email is configured)
                alerts_sent = await price_tracker.send_price_alerts(db, changes)
                logger.info(f"Sent {alerts_sent} price change alerts")
            else:
                logger.info("No significant price changes detected")

    except Exception as e:
        logger.exception(f"Price change check failed: {e}")


async def cleanup_expired_auctions():
    """Mark expired auctions as inactive and delete very old ones.

    This is CRITICAL for data freshness! Without it, the Market page
    would show auctions that ended days ago.

    Runs every 15 minutes so users always see live data.
    """
    from app.models.auction import DomainAuction
    from sqlalchemy import update, delete

    logger.info("Starting expired auction cleanup...")

    try:
        async with AsyncSessionLocal() as db:
            now = datetime.utcnow()

            # 1. Mark ended auctions as inactive
            stmt = (
                update(DomainAuction)
                .where(
                    and_(
                        DomainAuction.end_time < now,
                        DomainAuction.is_active == True,
                    )
                )
                .values(is_active=False)
            )
            result = await db.execute(stmt)
            marked_inactive = result.rowcount

            # 2. Delete very old inactive auctions (ended > 7 days ago)
            cutoff = now - timedelta(days=7)
            stmt = delete(DomainAuction).where(
                and_(
                    DomainAuction.is_active == False,
                    DomainAuction.end_time < cutoff,
                )
            )
            result = await db.execute(stmt)
            deleted = result.rowcount

            await db.commit()

            if marked_inactive > 0 or deleted > 0:
                logger.info(f"Auction cleanup: {marked_inactive} marked inactive, {deleted} deleted")

    except Exception as e:
        logger.exception(f"Auction cleanup failed: {e}")


async def scrape_auctions():
    """Scheduled task to scrape domain auctions from public sources."""
    from app.services.auction_scraper import auction_scraper

    logger.info("Starting scheduled auction scrape...")

    try:
        async with AsyncSessionLocal() as db:
            result = await auction_scraper.scrape_all_platforms(db)

            logger.info(
                f"Auction scrape completed: "
                f"{result['total_found']} found, {result['total_new']} new, "
                f"{result['total_updated']} updated"
            )

            if result.get('errors'):
                logger.warning(f"Scrape errors: {result['errors']}")

            # Match new auctions against sniper alerts
            if result['total_new'] > 0:
                await match_sniper_alerts()

    except Exception as e:
        logger.exception(f"Auction scrape failed: {e}")


async def match_sniper_alerts():
    """Match active sniper alerts against current auctions and notify users."""
    from app.models.sniper_alert import SniperAlert, SniperAlertMatch
    from app.models.auction import DomainAuction

    logger.info("Matching sniper alerts against new auctions...")

    try:
        async with AsyncSessionLocal() as db:
            # Get all active sniper alerts
            alerts_result = await db.execute(
                select(SniperAlert).where(SniperAlert.is_active == True)
            )
            alerts = alerts_result.scalars().all()

            if not alerts:
                logger.info("No active sniper alerts to match")
                return

            # Get recent auctions (scraped in the last 2 hours)
            cutoff = datetime.utcnow() - timedelta(hours=2)
            auctions_result = await db.execute(
                select(DomainAuction).where(
                    and_(
                        DomainAuction.is_active == True,
                        DomainAuction.scraped_at >= cutoff,
                    )
                )
            )
            auctions = auctions_result.scalars().all()

            if not auctions:
                logger.info("No recent auctions to match against")
                return

            matches_created = 0
            notifications_sent = 0

            for alert in alerts:
                matching_auctions = [
                    auction for auction in auctions
                    if _auction_matches_alert(auction, alert)
                ]

                if not matching_auctions:
                    continue

                for auction in matching_auctions:
                    # Skip if this match already exists
                    existing = await db.execute(
                        select(SniperAlertMatch).where(
                            and_(
                                SniperAlertMatch.alert_id == alert.id,
                                SniperAlertMatch.domain == auction.domain,
                            )
                        )
                    )
                    if existing.scalar_one_or_none():
                        continue

                    # Create a new match
                    match = SniperAlertMatch(
                        alert_id=alert.id,
                        domain=auction.domain,
                        platform=auction.platform,
                        current_bid=auction.current_bid,
                        end_time=auction.end_time,
                        auction_url=auction.auction_url,
                        matched_at=datetime.utcnow(),
                    )
                    db.add(match)
                    matches_created += 1

                # Update the alert's last_triggered timestamp
                alert.last_triggered = datetime.utcnow()

                # Send a notification if enabled
                if alert.notify_email:
                    try:
                        user_result = await db.execute(
                            select(User).where(User.id == alert.user_id)
                        )
                        user = user_result.scalar_one_or_none()

                        if user and email_service.is_enabled:
                            # Send an email listing the matching domains
                            await email_service.send_email(
                                to_email=user.email,
                                subject=f"🎯 Sniper Alert: {len(matching_auctions)} matching domains found!",
                                html_content=f"""
                                <h2>Your Sniper Alert "{alert.name}" matched!</h2>
                                <p>We found {len(matching_auctions)} domains matching your criteria:</p>
                                <ul>
                                {"".join(f"<li><strong>{a.domain}</strong> - ${a.current_bid:.0f} on {a.platform}</li>" for a in matching_auctions[:10])}
                                </ul>
                                <p><a href="https://pounce.ch/command/alerts">View all matches in your Command Center</a></p>
                                """,
                            )
                            notifications_sent += 1
                    except Exception as e:
                        logger.error(f"Failed to send sniper alert notification: {e}")

            await db.commit()
            logger.info(f"Sniper alert matching complete: {matches_created} matches created, {notifications_sent} notifications sent")

    except Exception as e:
        logger.exception(f"Sniper alert matching failed: {e}")


def _auction_matches_alert(auction: "DomainAuction", alert: "SniperAlert") -> bool:
    """Check whether an auction matches the criteria of a sniper alert."""
    # Compare against the name without its TLD (e.g. "getai" for "getai.io")
    domain_name = auction.domain.rsplit('.', 1)[0] if '.' in auction.domain else auction.domain

    # Keyword filter: the keyword must appear in the name
    if alert.keyword and alert.keyword.lower() not in domain_name.lower():
        return False

    # TLD filter: comma-separated whitelist of allowed TLDs
    if alert.tlds:
        allowed_tlds = [t.strip().lower() for t in alert.tlds.split(',')]
        if auction.tld.lower() not in allowed_tlds:
            return False

    # Length filters (on the name without TLD)
    if alert.min_length and len(domain_name) < alert.min_length:
        return False
    if alert.max_length and len(domain_name) > alert.max_length:
        return False

    # Price filters (on the current bid)
    if alert.min_price and auction.current_bid < alert.min_price:
        return False
    if alert.max_price and auction.current_bid > alert.max_price:
        return False

    # Exclusion filters
    if alert.exclude_numbers and any(c.isdigit() for c in domain_name):
        return False

    if alert.exclude_hyphens and '-' in domain_name:
        return False

    if alert.exclude_chars:
        excluded = set(alert.exclude_chars.lower())
        if any(c in excluded for c in domain_name.lower()):
            return False

    return True
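

# Worked example (illustrative only): an alert with keyword="ai", tlds="com,io",
# max_length=6 and exclude_hyphens=True matches an auction for "getai.io"
# (name "getai": contains "ai", 5 chars, allowed TLD, no hyphen) but rejects
# "my-ai.com" (hyphen) and "aiplatformhub.com" (name longer than 6 characters).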