- Run auction cleanup every 5 minutes and treat end_time <= now as ended
- Add admin endpoints to upload/inspect Playwright cookies (free alternative to paid proxies)
- Add client-side guardrail to never render ended auctions in Terminal Market

"""Background scheduler for domain checks, TLD price scraping, auctions, and notifications."""
|
|
import asyncio
|
|
import logging
|
|
from datetime import datetime, timedelta
|
|
from typing import TYPE_CHECKING
|
|
|
|
from apscheduler.schedulers.asyncio import AsyncIOScheduler
|
|
from apscheduler.triggers.cron import CronTrigger
|
|
from sqlalchemy import select, and_
|
|
|
|
from app.config import get_settings
|
|
from app.database import AsyncSessionLocal
|
|
from app.models.domain import Domain, DomainCheck
|
|
from app.models.user import User
|
|
from app.models.subscription import Subscription, SubscriptionTier, TIER_CONFIG
|
|
from app.services.domain_checker import domain_checker
|
|
from app.services.email_service import email_service
|
|
from app.services.price_tracker import price_tracker
|
|
|
|
if TYPE_CHECKING:
|
|
from app.models.sniper_alert import SniperAlert
|
|
from app.models.auction import DomainAuction
|
|
|
|
logger = logging.getLogger(__name__)
|
|
settings = get_settings()
|
|
|
|
# Global scheduler instance
|
|
scheduler = AsyncIOScheduler()
|
|
|
|
|
|
async def scrape_tld_prices():
    """Scheduled task to scrape TLD prices from public sources."""
    from app.services.tld_scraper.aggregator import tld_aggregator

    logger.info("Starting scheduled TLD price scrape...")

    try:
        async with AsyncSessionLocal() as db:
            result = await tld_aggregator.run_scrape(db)

            logger.info(
                f"TLD scrape completed: {result.status}, "
                f"{result.tlds_scraped} TLDs, {result.prices_saved} prices saved"
            )

            if result.errors:
                logger.warning(f"Scrape errors: {result.errors}")

    except Exception as e:
        logger.exception(f"TLD price scrape failed: {e}")

async def check_domains_by_frequency(frequency: str):
    """Check availability of domains based on their subscription frequency.

    Args:
        frequency: One of 'daily', 'hourly', 'realtime' (10-min)
    """
    logger.info(f"Starting {frequency} domain check...")
    start_time = datetime.utcnow()

    async with AsyncSessionLocal() as db:
        # Collect the tiers covered by this run. A more frequent run is a
        # superset of the slower ones: the realtime run covers every tier,
        # and the hourly run also covers realtime tiers.
        tiers_for_frequency = []
        for tier, config in TIER_CONFIG.items():
            if config['check_frequency'] == frequency:
                tiers_for_frequency.append(tier)
            elif frequency == 'realtime':
                tiers_for_frequency.append(tier)
            elif frequency == 'hourly' and config['check_frequency'] in ['hourly', 'realtime']:
                tiers_for_frequency.append(tier)

        # Get domains from users with a matching subscription tier
        result = await db.execute(
            select(Domain)
            .join(User, Domain.user_id == User.id)
            .outerjoin(Subscription, Subscription.user_id == User.id)
            .where(
                (Subscription.tier.in_(tiers_for_frequency)) |
                (Subscription.id.is_(None) & (frequency == 'daily'))  # Scout users (no subscription) default to daily
            )
        )
        domains = result.scalars().all()

        logger.info(f"Checking {len(domains)} domains...")

        checked = 0
        errors = 0
        newly_available = []

        for domain in domains:
            try:
                # Check domain availability
                check_result = await domain_checker.check_domain(domain.name)

                # Track whether the domain just became available
                was_taken = not domain.is_available
                is_now_available = check_result.is_available

                if was_taken and is_now_available and domain.notify_on_available:
                    newly_available.append(domain)

                # Update domain
                domain.status = check_result.status
                domain.is_available = check_result.is_available
                domain.registrar = check_result.registrar
                domain.expiration_date = check_result.expiration_date
                domain.last_checked = datetime.utcnow()

                # Create check record
                check = DomainCheck(
                    domain_id=domain.id,
                    status=check_result.status,
                    is_available=check_result.is_available,
                    response_data=str(check_result.to_dict()),
                    checked_at=datetime.utcnow(),
                )
                db.add(check)

                checked += 1

                # Small delay to avoid rate limiting
                await asyncio.sleep(0.5)

            except Exception as e:
                logger.error(f"Error checking domain {domain.name}: {e}")
                errors += 1

        await db.commit()

        elapsed = (datetime.utcnow() - start_time).total_seconds()
        logger.info(
            f"Domain check complete. Checked: {checked}, Errors: {errors}, "
            f"Newly available: {len(newly_available)}, Time: {elapsed:.2f}s"
        )

        # Send notifications for newly available domains
        if newly_available:
            logger.info(f"Domains that became available: {[d.name for d in newly_available]}")
            await send_domain_availability_alerts(db, newly_available)

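# Illustrative shape of TIER_CONFIG assumed by the tier loop above. The real
# mapping lives in app.models.subscription and may carry more keys; the enum
# member names here are assumptions based on the tier names used in this file:
#
#     TIER_CONFIG = {
#         SubscriptionTier.SCOUT:  {'check_frequency': 'daily'},
#         SubscriptionTier.TRADER: {'check_frequency': 'hourly'},
#         SubscriptionTier.TYCOON: {'check_frequency': 'realtime'},
#     }
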
async def check_all_domains():
    """Legacy function - checks all domains (daily)."""
    await check_domains_by_frequency('daily')


async def check_hourly_domains():
    """Check domains for Trader users (hourly)."""
    await check_domains_by_frequency('hourly')


async def check_realtime_domains():
    """Check domains for Tycoon users (every 10 minutes)."""
    await check_domains_by_frequency('realtime')

async def send_weekly_digests():
    """
    Send weekly summary emails to all users.

    Includes: domains tracked, status changes, available domains, etc.
    """
    logger.info("📊 Sending weekly digest emails...")

    try:
        async with AsyncSessionLocal() as db:
            # Get all verified users
            users_result = await db.execute(
                select(User).where(User.is_verified == True)
            )
            users = users_result.scalars().all()

            sent = 0
            for user in users:
                try:
                    # Get the user's domains
                    domains_result = await db.execute(
                        select(Domain).where(Domain.user_id == user.id)
                    )
                    domains = domains_result.scalars().all()

                    if not domains:
                        continue

                    # Calculate stats
                    total_domains = len(domains)
                    available_domains = [d.name for d in domains if d.is_available]

                    # Get status changes from the last week
                    week_ago = datetime.utcnow() - timedelta(days=7)
                    checks_result = await db.execute(
                        select(DomainCheck)
                        .join(Domain, DomainCheck.domain_id == Domain.id)
                        .where(
                            and_(
                                Domain.user_id == user.id,
                                DomainCheck.checked_at >= week_ago,
                            )
                        )
                    )
                    checks = checks_result.scalars().all()

                    # Count status changes (simplified: count distinct checked domains)
                    status_changes = len(set(c.domain_id for c in checks))

                    if email_service.is_configured():
                        await email_service.send_weekly_digest(
                            to_email=user.email,
                            total_domains=total_domains,
                            status_changes=status_changes,
                            price_alerts=0,  # Could track this separately
                            available_domains=available_domains[:5],  # Limit to 5
                        )
                        sent += 1

                except Exception as e:
                    logger.error(f"Failed to send digest to {user.email}: {e}")

            logger.info(f"📧 Sent {sent} weekly digest emails")

    except Exception as e:
        logger.exception(f"Weekly digest failed: {e}")

async def check_expiring_domains():
    """
    Check for domains expiring soon and send warnings.

    Sends alerts for domains expiring within 30 days.
    """
    logger.info("📅 Checking for expiring domains...")

    try:
        async with AsyncSessionLocal() as db:
            # Get domains expiring within 30 days
            cutoff = datetime.utcnow() + timedelta(days=30)

            result = await db.execute(
                select(Domain)
                .where(
                    and_(
                        Domain.is_available == False,
                        Domain.expiration_date != None,
                        Domain.expiration_date <= cutoff,
                        Domain.expiration_date > datetime.utcnow(),  # Not yet expired
                        Domain.notify_on_available == True,  # User wants notifications
                    )
                )
            )
            expiring = result.scalars().all()

            if not expiring:
                logger.info("No domains expiring soon")
                return

            logger.info(f"Found {len(expiring)} domains expiring within 30 days")

            # Group by user and send alerts
            user_domains = {}
            for domain in expiring:
                if domain.user_id not in user_domains:
                    user_domains[domain.user_id] = []
                days_left = (domain.expiration_date - datetime.utcnow()).days
                user_domains[domain.user_id].append({
                    'name': domain.name,
                    'days_left': days_left,
                    'expiration_date': domain.expiration_date,
                })

            alerts_sent = 0
            for user_id, domains_list in user_domains.items():
                try:
                    user_result = await db.execute(
                        select(User).where(User.id == user_id)
                    )
                    user = user_result.scalar_one_or_none()

                    if user and user.email and email_service.is_configured():
                        # Build email content
                        domain_lines = "\n".join([
                            f"• {d['name']} - {d['days_left']} days left"
                            for d in sorted(domains_list, key=lambda x: x['days_left'])
                        ])

                        await email_service.send_email(
                            to_email=user.email,
                            subject=f"⏰ {len(domains_list)} domain{'s' if len(domains_list) > 1 else ''} expiring soon",
                            html_content=f"""
                                <h2 style="margin: 0 0 24px 0; font-size: 20px; font-weight: 600; color: #000000;">
                                    Domains expiring soon
                                </h2>
                                <p style="margin: 0 0 24px 0; font-size: 15px; color: #333333; line-height: 1.6;">
                                    The following domains on your watchlist are expiring within 30 days:
                                </p>
                                <div style="margin: 24px 0; padding: 20px; background: #fafafa; border-radius: 6px; border-left: 3px solid #f59e0b;">
                                    {"".join(f'<p style="margin: 8px 0; font-family: monospace;"><strong>{d["name"]}</strong> — <span style="color: {"#ef4444" if d["days_left"] <= 7 else "#f59e0b"};">{d["days_left"]} days left</span></p>' for d in sorted(domains_list, key=lambda x: x["days_left"]))}
                                </div>
                                <p style="margin: 24px 0 0 0; font-size: 14px; color: #666666;">
                                    Keep an eye on these domains — they may become available soon.
                                </p>
                            """,
                            text_content=f"Domains expiring soon:\n{domain_lines}",
                        )
                        alerts_sent += 1

                except Exception as e:
                    logger.error(f"Failed to send expiry alert to user {user_id}: {e}")

            logger.info(f"📧 Sent {alerts_sent} expiry warning emails")

    except Exception as e:
        logger.exception(f"Expiry check failed: {e}")

async def run_health_checks():
    """
    Run automated health checks on all watched domains.

    This runs 1x daily to update domain health status (DNS, HTTP, SSL).
    Health data is cached and used to detect weakening domains.
    """
    import json

    from app.services.domain_health import get_health_checker
    from app.models.domain import DomainHealthCache

    logger.info("🏥 Starting automated health checks...")
    start_time = datetime.utcnow()

    try:
        async with AsyncSessionLocal() as db:
            # Get all watched domains (registered, not available)
            result = await db.execute(
                select(Domain).where(Domain.is_available == False)
            )
            domains = result.scalars().all()

            logger.info(f"Running health checks on {len(domains)} domains...")

            health_checker = get_health_checker()
            checked = 0
            errors = 0
            status_changes = []

            for domain in domains:
                try:
                    # Run health check
                    report = await health_checker.check_domain(domain.name)

                    # Fetch the existing cache entry so we can detect status changes
                    cache_result = await db.execute(
                        select(DomainHealthCache).where(DomainHealthCache.domain_id == domain.id)
                    )
                    existing_cache = cache_result.scalar_one_or_none()

                    old_status = existing_cache.status if existing_cache else None
                    new_status = report.status.value

                    # Detect significant changes
                    if old_status and old_status != new_status:
                        status_changes.append({
                            'domain': domain.name,
                            'old_status': old_status,
                            'new_status': new_status,
                            'user_id': domain.user_id,
                        })
                        logger.info(f"⚠️ Status change: {domain.name} {old_status} → {new_status}")

                    # Serialize signal data to a JSON string
                    signals_json = json.dumps(report.signals) if report.signals else None

                    # Update or create cache
                    if existing_cache:
                        existing_cache.status = new_status
                        existing_cache.score = report.score
                        existing_cache.signals = signals_json
                        existing_cache.checked_at = datetime.utcnow()
                    else:
                        new_cache = DomainHealthCache(
                            domain_id=domain.id,
                            status=new_status,
                            score=report.score,
                            signals=signals_json,
                            checked_at=datetime.utcnow(),
                        )
                        db.add(new_cache)

                    checked += 1

                    # Small delay to avoid overwhelming DNS servers
                    await asyncio.sleep(0.3)

                except Exception as e:
                    logger.error(f"Health check failed for {domain.name}: {e}")
                    errors += 1

            await db.commit()

            elapsed = (datetime.utcnow() - start_time).total_seconds()
            logger.info(
                f"✅ Health checks complete. Checked: {checked}, Errors: {errors}, "
                f"Status changes: {len(status_changes)}, Time: {elapsed:.1f}s"
            )

            # Send alerts for domains whose health became critical
            if status_changes:
                await send_health_change_alerts(db, status_changes)

    except Exception as e:
        logger.exception(f"Health check job failed: {e}")

async def send_health_change_alerts(db, changes: list):
    """Send alerts when domains have significant health changes."""
    if not email_service.is_configured():
        return

    for change in changes:
        # Only alert on critical changes
        if change['new_status'] == 'critical':
            try:
                result = await db.execute(
                    select(User).where(User.id == change['user_id'])
                )
                user = result.scalar_one_or_none()

                if user and user.email:
                    # Use the domain-available template as a fallback (the domain might be dropping)
                    await email_service.send_domain_available(
                        to_email=user.email,
                        domain=change['domain'],
                        register_url="https://pounce.ch/terminal/watchlist",
                    )
                    logger.info(f"📧 Critical health alert sent for {change['domain']}")
            except Exception as e:
                logger.error(f"Failed to send health alert: {e}")

def setup_scheduler():
    """Configure and start the scheduler."""
    # Daily domain check for Scout users at the configured hour
    scheduler.add_job(
        check_all_domains,
        CronTrigger(hour=settings.check_hour, minute=settings.check_minute),
        id="daily_domain_check",
        name="Daily Domain Check (Scout)",
        replace_existing=True,
    )

    # Hourly domain check for Trader users
    scheduler.add_job(
        check_hourly_domains,
        CronTrigger(minute=0),  # Every hour at :00
        id="hourly_domain_check",
        name="Hourly Domain Check (Trader)",
        replace_existing=True,
    )

    # 10-minute domain check for Tycoon users
    scheduler.add_job(
        check_realtime_domains,
        CronTrigger(minute='*/10'),  # Every 10 minutes
        id="realtime_domain_check",
        name="10-Minute Domain Check (Tycoon)",
        replace_existing=True,
    )

    # Automated health checks 1x daily at 06:00 UTC
    scheduler.add_job(
        run_health_checks,
        CronTrigger(hour=6, minute=0),
        id="daily_health_check",
        name="Daily Health Check (All Domains)",
        replace_existing=True,
    )

    # Expiry warnings 1x weekly (Mondays at 08:00 UTC)
    scheduler.add_job(
        check_expiring_domains,
        CronTrigger(day_of_week='mon', hour=8, minute=0),
        id="weekly_expiry_check",
        name="Weekly Expiry Warning",
        replace_existing=True,
    )

    # Weekly digest (Sundays at 10:00 UTC)
    scheduler.add_job(
        send_weekly_digests,
        CronTrigger(day_of_week='sun', hour=10, minute=0),
        id="weekly_digest",
        name="Weekly Digest Email",
        replace_existing=True,
    )

    # TLD price scrape 2x daily for better historical data
    # Morning scrape at 03:00 UTC
    scheduler.add_job(
        scrape_tld_prices,
        CronTrigger(hour=3, minute=0),
        id="morning_tld_scrape",
        name="TLD Price Scrape (Morning 03:00 UTC)",
        replace_existing=True,
    )

    # Afternoon scrape at 15:00 UTC (captures price changes during US business hours)
    scheduler.add_job(
        scrape_tld_prices,
        CronTrigger(hour=15, minute=0),
        id="afternoon_tld_scrape",
        name="TLD Price Scrape (Afternoon 15:00 UTC)",
        replace_existing=True,
    )

    # Price change check at 04:00 UTC (after the morning scrape completes)
    scheduler.add_job(
        check_price_changes,
        CronTrigger(hour=4, minute=0),
        id="morning_price_check",
        name="Price Change Check (Morning)",
        replace_existing=True,
    )

    # Price change check at 16:00 UTC (after the afternoon scrape)
    scheduler.add_job(
        check_price_changes,
        CronTrigger(hour=16, minute=0),
        id="afternoon_price_check",
        name="Price Change Check (Afternoon)",
        replace_existing=True,
    )

    # Auction scrape every 2 hours (at :30 to avoid conflicts with other jobs)
    scheduler.add_job(
        scrape_auctions,
        CronTrigger(hour='*/2', minute=30),  # Every 2 hours at :30
        id="auction_scrape",
        name="Auction Scrape (2h)",
        replace_existing=True,
    )

    # Cleanup expired auctions every 5 minutes (critical for data freshness)
    scheduler.add_job(
        cleanup_expired_auctions,
        CronTrigger(minute='*/5'),  # Every 5 minutes
        id="auction_cleanup",
        name="Expired Auction Cleanup (5m)",
        replace_existing=True,
    )

    # Sniper alert matching every 30 minutes
    scheduler.add_job(
        match_sniper_alerts,
        CronTrigger(minute='*/30'),  # Every 30 minutes
        id="sniper_matching",
        name="Sniper Alert Matching (30m)",
        replace_existing=True,
    )

    logger.info(
        "Scheduler configured:"
        f"\n - Scout domain check at {settings.check_hour:02d}:{settings.check_minute:02d} (daily)"
        "\n - Trader domain check every hour at :00"
        "\n - Tycoon domain check every 10 minutes"
        "\n - TLD price scrape 2x daily at 03:00 & 15:00 UTC"
        "\n - Price change alerts at 04:00 & 16:00 UTC"
        "\n - Auction scrape every 2 hours at :30"
        "\n - Expired auction cleanup every 5 minutes"
        "\n - Sniper alert matching every 30 minutes"
    )

def start_scheduler():
    """Start the scheduler if not already running."""
    if not scheduler.running:
        setup_scheduler()
        scheduler.start()
        logger.info("Scheduler started")


def stop_scheduler():
    """Stop the scheduler."""
    if scheduler.running:
        scheduler.shutdown()
        logger.info("Scheduler stopped")

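# Illustrative wiring (not part of this module): the start/stop hooks are
# typically called from the application's startup and shutdown path. A minimal
# sketch assuming a FastAPI app; the module and app names are assumptions:
#
#     from contextlib import asynccontextmanager
#     from fastapi import FastAPI
#
#     @asynccontextmanager
#     async def lifespan(app: FastAPI):
#         start_scheduler()  # configure jobs and start APScheduler
#         yield
#         stop_scheduler()   # clean shutdown on app exit
#
#     app = FastAPI(lifespan=lifespan)
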
async def run_manual_check():
    """Run domain check manually (for testing or on-demand)."""
    await check_all_domains()


async def run_manual_tld_scrape():
    """Run TLD price scrape manually (for testing or on-demand)."""
    await scrape_tld_prices()

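# Illustrative usage (a sketch, assuming the app's database is reachable):
# the manual runners can be driven from a one-off script or a REPL.
#
#     import asyncio
#
#     asyncio.run(run_manual_check())       # one daily-style availability pass
#     asyncio.run(run_manual_tld_scrape())  # one TLD price scrape
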
async def send_domain_availability_alerts(db, domains: list[Domain]):
    """Send email alerts for newly available domains."""
    if not email_service.is_configured():
        logger.info("Email service not configured, skipping domain alerts")
        return

    alerts_sent = 0

    for domain in domains:
        try:
            # Get domain owner
            result = await db.execute(
                select(User).where(User.id == domain.user_id)
            )
            user = result.scalar_one_or_none()

            if user and user.email and domain.notify_on_available:
                # Create registration URL
                register_url = f"https://www.namecheap.com/domains/registration/results/?domain={domain.name}"

                success = await email_service.send_domain_available(
                    to_email=user.email,
                    domain=domain.name,
                    register_url=register_url,
                )
                if success:
                    alerts_sent += 1
                    logger.info(f"📧 Alert sent for {domain.name} to {user.email}")

        except Exception as e:
            logger.error(f"Failed to send alert for {domain.name}: {e}")

    logger.info(f"Sent {alerts_sent} domain availability alerts")

async def check_price_changes():
    """Check for TLD price changes and send alerts."""
    logger.info("Checking for TLD price changes...")

    try:
        async with AsyncSessionLocal() as db:
            # Detect changes in the last 24 hours
            changes = await price_tracker.detect_price_changes(db, hours=24)

            if changes:
                logger.info(f"Found {len(changes)} significant price changes")

                # Send alerts (if email is configured)
                alerts_sent = await price_tracker.send_price_alerts(db, changes)
                logger.info(f"Sent {alerts_sent} price change alerts")
            else:
                logger.info("No significant price changes detected")

    except Exception as e:
        logger.exception(f"Price change check failed: {e}")

async def cleanup_expired_auctions():
    """
    Mark expired auctions as inactive and delete very old ones.

    This is critical for data freshness: without it, the Market page
    would show auctions that ended days ago.

    Runs every 5 minutes so users always see live data.
    """
    from app.models.auction import DomainAuction
    from sqlalchemy import update, delete

    logger.info("Starting expired auction cleanup...")

    try:
        async with AsyncSessionLocal() as db:
            now = datetime.utcnow()

            # 1. Mark ended auctions as inactive (end_time <= now, so listings
            #    never linger at "0m" remaining)
            stmt = (
                update(DomainAuction)
                .where(
                    and_(
                        DomainAuction.end_time <= now,
                        DomainAuction.is_active == True
                    )
                )
                .values(is_active=False)
            )
            result = await db.execute(stmt)
            marked_inactive = result.rowcount

            # 2. Delete inactive auctions that ended more than 7 days ago
            cutoff = now - timedelta(days=7)
            stmt = delete(DomainAuction).where(
                and_(
                    DomainAuction.is_active == False,
                    DomainAuction.end_time < cutoff
                )
            )
            result = await db.execute(stmt)
            deleted = result.rowcount

            await db.commit()

            if marked_inactive > 0 or deleted > 0:
                logger.info(f"Auction cleanup: {marked_inactive} marked inactive, {deleted} deleted")

    except Exception as e:
        logger.exception(f"Auction cleanup failed: {e}")

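# For reference, the two statements above correspond roughly to this SQL
# (the table name is illustrative, and the INTERVAL syntax assumes Postgres;
# the real table name comes from the DomainAuction model):
#
#     UPDATE domain_auctions SET is_active = false
#     WHERE end_time <= :now AND is_active = true;
#
#     DELETE FROM domain_auctions
#     WHERE is_active = false AND end_time < :now - INTERVAL '7 days';
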
async def scrape_auctions():
    """Scheduled task to scrape domain auctions from public sources."""
    from app.services.auction_scraper import auction_scraper

    logger.info("Starting scheduled auction scrape...")

    try:
        async with AsyncSessionLocal() as db:
            result = await auction_scraper.scrape_all_platforms(db)

            logger.info(
                f"Auction scrape completed: "
                f"{result['total_found']} found, {result['total_new']} new, "
                f"{result['total_updated']} updated"
            )

            if result.get('errors'):
                logger.warning(f"Scrape errors: {result['errors']}")

            # Match new auctions against Sniper Alerts
            if result['total_new'] > 0:
                await match_sniper_alerts()

    except Exception as e:
        logger.exception(f"Auction scrape failed: {e}")

async def match_sniper_alerts():
    """Match active sniper alerts against current auctions and notify users."""
    from app.models.sniper_alert import SniperAlert, SniperAlertMatch
    from app.models.auction import DomainAuction

    logger.info("Matching sniper alerts against new auctions...")

    try:
        async with AsyncSessionLocal() as db:
            # Get all active sniper alerts
            alerts_result = await db.execute(
                select(SniperAlert).where(SniperAlert.is_active == True)
            )
            alerts = alerts_result.scalars().all()

            if not alerts:
                logger.info("No active sniper alerts to match")
                return

            # Get recent auctions (scraped in the last 2 hours)
            cutoff = datetime.utcnow() - timedelta(hours=2)
            auctions_result = await db.execute(
                select(DomainAuction).where(
                    and_(
                        DomainAuction.is_active == True,
                        DomainAuction.scraped_at >= cutoff,
                    )
                )
            )
            auctions = auctions_result.scalars().all()

            if not auctions:
                logger.info("No recent auctions to match against")
                return

            matches_created = 0
            notifications_sent = 0

            for alert in alerts:
                matching_auctions = []

                for auction in auctions:
                    if _auction_matches_alert(auction, alert):
                        matching_auctions.append(auction)

                if matching_auctions:
                    for auction in matching_auctions:
                        # Skip if this match already exists
                        existing = await db.execute(
                            select(SniperAlertMatch).where(
                                and_(
                                    SniperAlertMatch.alert_id == alert.id,
                                    SniperAlertMatch.domain == auction.domain,
                                )
                            )
                        )
                        if existing.scalar_one_or_none():
                            continue

                        # Create new match
                        match = SniperAlertMatch(
                            alert_id=alert.id,
                            domain=auction.domain,
                            platform=auction.platform,
                            current_bid=auction.current_bid,
                            end_time=auction.end_time,
                            auction_url=auction.auction_url,
                            matched_at=datetime.utcnow(),
                        )
                        db.add(match)
                        matches_created += 1

                    # Update the alert's last_triggered timestamp
                    alert.last_triggered = datetime.utcnow()

                    # Send notification if enabled
                    if alert.notify_email:
                        try:
                            user_result = await db.execute(
                                select(User).where(User.id == alert.user_id)
                            )
                            user = user_result.scalar_one_or_none()

                            if user and email_service.is_configured():
                                # Send email with the matching domains
                                await email_service.send_email(
                                    to_email=user.email,
                                    subject=f"🎯 Sniper Alert: {len(matching_auctions)} matching domains found!",
                                    html_content=f"""
                                        <h2>Your Sniper Alert "{alert.name}" matched!</h2>
                                        <p>We found {len(matching_auctions)} domains matching your criteria:</p>
                                        <ul>
                                            {"".join(f"<li><strong>{a.domain}</strong> - ${a.current_bid:.0f} on {a.platform}</li>" for a in matching_auctions[:10])}
                                        </ul>
                                        <p><a href="https://pounce.ch/command/alerts">View all matches in your Command Center</a></p>
                                    """
                                )
                                notifications_sent += 1
                        except Exception as e:
                            logger.error(f"Failed to send sniper alert notification: {e}")

            await db.commit()
            logger.info(f"Sniper alert matching complete: {matches_created} matches created, {notifications_sent} notifications sent")

    except Exception as e:
        logger.exception(f"Sniper alert matching failed: {e}")

def _auction_matches_alert(auction: "DomainAuction", alert: "SniperAlert") -> bool:
    """Check if an auction matches the criteria of a sniper alert."""
    domain_name = auction.domain.rsplit('.', 1)[0] if '.' in auction.domain else auction.domain

    # Check keyword filter
    if alert.keyword:
        if alert.keyword.lower() not in domain_name.lower():
            return False

    # Check TLD filter
    if alert.tlds:
        allowed_tlds = [t.strip().lower() for t in alert.tlds.split(',')]
        if auction.tld.lower() not in allowed_tlds:
            return False

    # Check length filters
    if alert.min_length and len(domain_name) < alert.min_length:
        return False
    if alert.max_length and len(domain_name) > alert.max_length:
        return False

    # Check price filters
    if alert.min_price and auction.current_bid < alert.min_price:
        return False
    if alert.max_price and auction.current_bid > alert.max_price:
        return False

    # Check exclusion filters
    if alert.exclude_numbers:
        if any(c.isdigit() for c in domain_name):
            return False

    if alert.exclude_hyphens:
        if '-' in domain_name:
            return False

    if alert.exclude_chars:
        excluded = set(alert.exclude_chars.lower())
        if any(c in excluded for c in domain_name.lower()):
            return False

    return True
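

# Illustrative sanity check: the matcher only reads plain attributes, so its
# filter semantics can be exercised with stand-in objects. The fields below
# mirror the columns used above; they are assumptions about the real models,
# not their full schema.
#
#     from types import SimpleNamespace
#
#     auction = SimpleNamespace(domain="getcoffee.com", tld="com", current_bid=50.0)
#     alert = SimpleNamespace(
#         keyword="coffee", tlds="com,net", min_length=None, max_length=12,
#         min_price=None, max_price=100, exclude_numbers=True,
#         exclude_hyphens=True, exclude_chars=None,
#     )
#     assert _auction_matches_alert(auction, alert)  # "coffee" in "getcoffee", .com allowed, $50 <= $100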