pounce/backend/app/scheduler.py
yves.gugger f0cc69ac95 feat: TLD price scraper, .ch domain fix, DB integration
Major changes:
- Add TLD price scraper with Porkbun API (886+ TLDs, no API key needed)
- Fix .ch domain checker using rdap.nic.ch custom RDAP
- Integrate database for TLD price history tracking
- Add admin endpoints for manual scrape and stats
- Extend scheduler with daily TLD price scrape job (03:00 UTC)
- Update API to use DB data with static fallback
- Update README with complete documentation

New files:
- backend/app/services/tld_scraper/ (scraper package)
- TLD_TRACKING_PLAN.md (implementation plan)

API changes:
- POST /admin/scrape-tld-prices - trigger manual scrape
- GET /admin/tld-prices/stats - database statistics
- GET /tld-prices/overview now uses DB data
2025-12-08 09:12:44 +01:00

163 lines
5.2 KiB
Python

"""Background scheduler for daily domain checks and TLD price scraping."""
import asyncio
import logging
from datetime import datetime
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
from sqlalchemy import select
from app.config import get_settings
from app.database import AsyncSessionLocal
from app.models.domain import Domain, DomainCheck
from app.services.domain_checker import domain_checker
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Application settings, read once at import time; setup_scheduler() uses
# settings.check_hour / settings.check_minute for the domain-check schedule.
settings = get_settings()
# Global scheduler instance
# Shared by setup_scheduler(), start_scheduler() and stop_scheduler() below.
scheduler = AsyncIOScheduler()
async def scrape_tld_prices():
    """Run one TLD price scrape and log its outcome.

    Opens a database session, passes it to ``tld_aggregator.run_scrape`` and
    logs the status and counters of the returned result. Any exception is
    logged with its traceback and swallowed, so a failed scrape never
    propagates to the caller.
    """
    # Imported at call time rather than at module load, as in the original
    # (presumably to avoid a circular import -- confirm before moving it).
    from app.services.tld_scraper.aggregator import tld_aggregator

    logger.info("Starting scheduled TLD price scrape...")

    try:
        async with AsyncSessionLocal() as session:
            outcome = await tld_aggregator.run_scrape(session)

            summary = (
                f"TLD scrape completed: {outcome.status}, "
                f"{outcome.tlds_scraped} TLDs, {outcome.prices_saved} prices saved"
            )
            logger.info(summary)

            # Partial failures are reported by the scraper on the result
            # object rather than raised.
            scrape_errors = outcome.errors
            if scrape_errors:
                logger.warning(f"Scrape errors: {scrape_errors}")
    except Exception as exc:
        logger.exception(f"TLD price scrape failed: {exc}")
async def check_all_domains():
    """Check availability of every monitored domain and persist the results.

    Loads all ``Domain`` rows in a single session, asks ``domain_checker``
    about each one, updates the row and adds a ``DomainCheck`` history
    record. All changes are committed once, after the loop.

    A failure while checking one domain is logged (with traceback) and
    counted; it does not stop the remaining checks. Domains that were not
    available before, are available now, and have ``notify_on_available``
    set are collected and logged. Sending real notifications is still a TODO.
    """
    logger.info("Starting daily domain check...")
    # NOTE(review): datetime.utcnow() is deprecated since Python 3.12, but it
    # is kept because the timestamp columns are presumably timezone-naive --
    # confirm against the models before switching to aware datetimes.
    start_time = datetime.utcnow()

    checked = 0
    errors = 0
    newly_available = []
    # Names are captured while the session is live so the summary log below
    # never touches ORM attributes after commit (they would be expired if the
    # session factory uses expire_on_commit=True, which is not visible here).
    newly_available_names = []

    async with AsyncSessionLocal() as db:
        # Get all domains
        result = await db.execute(select(Domain))
        domains = result.scalars().all()

        logger.info(f"Checking {len(domains)} domains...")

        for domain in domains:
            domain_name = domain.name
            try:
                # Check domain availability
                check_result = await domain_checker.check_domain(domain_name)

                # Track if domain became available (taken -> available)
                became_available = (
                    not domain.is_available
                    and check_result.is_available
                    and domain.notify_on_available
                )

                # One timestamp for both the row and its history record so
                # the two always agree.
                checked_at = datetime.utcnow()

                # Build the history record first: if serialising the result
                # fails, the domain row has not been touched yet.
                check = DomainCheck(
                    domain_id=domain.id,
                    status=check_result.status,
                    is_available=check_result.is_available,
                    response_data=str(check_result.to_dict()),
                    checked_at=checked_at,
                )

                # Update domain
                domain.status = check_result.status
                domain.is_available = check_result.is_available
                domain.registrar = check_result.registrar
                domain.expiration_date = check_result.expiration_date
                domain.last_checked = checked_at
                db.add(check)

                if became_available:
                    newly_available.append(domain)
                    newly_available_names.append(domain_name)

                checked += 1
            except Exception as e:
                # logger.exception keeps the traceback for diagnosis.
                logger.exception(f"Error checking domain {domain_name}: {e}")
                errors += 1

            # Small delay to avoid rate limiting. Applied after failures too,
            # since a failed lookup is often itself a rate-limit response.
            await asyncio.sleep(0.5)

        await db.commit()

    elapsed = (datetime.utcnow() - start_time).total_seconds()
    logger.info(
        f"Domain check complete. Checked: {checked}, Errors: {errors}, "
        f"Newly available: {len(newly_available)}, Time: {elapsed:.2f}s"
    )

    # TODO: Send notifications for newly available domains
    # (e.g. await send_availability_notifications(newly_available))
    if newly_available:
        logger.info(f"Domains that became available: {newly_available_names}")
def setup_scheduler():
    """Register the recurring jobs on the module-level scheduler.

    This only adds jobs; it does not start the scheduler (see
    ``start_scheduler``). Both jobs are added with ``replace_existing=True``,
    so an existing job with the same id is replaced.

    Jobs:
        daily_domain_check: runs ``check_all_domains`` every day at
            ``settings.check_hour``:``settings.check_minute``. No time zone
            is given, so the scheduler's own time zone applies.
        daily_tld_scrape: runs ``scrape_tld_prices`` every day at 03:00 UTC.
    """
    # Daily domain check at configured hour
    scheduler.add_job(
        check_all_domains,
        CronTrigger(hour=settings.check_hour, minute=settings.check_minute),
        id="daily_domain_check",
        name="Daily Domain Availability Check",
        replace_existing=True,
    )

    # Daily TLD price scrape at 03:00 UTC. The time zone is pinned
    # explicitly: without it the trigger follows the scheduler's time zone,
    # which would not match the "03:00 UTC" this job is logged as.
    scheduler.add_job(
        scrape_tld_prices,
        CronTrigger(hour=3, minute=0, timezone="UTC"),
        id="daily_tld_scrape",
        name="Daily TLD Price Scrape",
        replace_existing=True,
    )

    logger.info(
        "Scheduler configured:"
        f"\n  - Domain check at {settings.check_hour:02d}:{settings.check_minute:02d}"
        "\n  - TLD price scrape at 03:00 UTC"
    )
def start_scheduler():
    """Register the jobs and start the global scheduler.

    Does nothing when the scheduler is already running.
    """
    if scheduler.running:
        return

    setup_scheduler()
    scheduler.start()
    logger.info("Scheduler started")
def stop_scheduler():
    """Shut down the global scheduler.

    Does nothing when the scheduler is not running.
    """
    if not scheduler.running:
        return

    scheduler.shutdown()
    logger.info("Scheduler stopped")
async def run_manual_check():
    """Trigger the domain availability check immediately.

    On-demand entry point (testing or manual runs) that awaits the same
    ``check_all_domains`` coroutine the scheduled job uses.
    """
    await check_all_domains()
async def run_manual_tld_scrape():
    """Trigger the TLD price scrape immediately.

    On-demand entry point (testing or manual runs) that awaits the same
    ``scrape_tld_prices`` coroutine the scheduled job uses.
    """
    await scrape_tld_prices()