yves.gugger f0cc69ac95 feat: TLD price scraper, .ch domain fix, DB integration
Major changes:
- Add TLD price scraper with Porkbun API (886+ TLDs, no API key needed)
- Fix .ch domain checker using rdap.nic.ch custom RDAP
- Integrate database for TLD price history tracking
- Add admin endpoints for manual scrape and stats
- Extend scheduler with daily TLD price scrape job (03:00 UTC; sketch below)
- Update API to use DB data with static fallback
- Update README with complete documentation
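
A minimal sketch of how the daily 03:00 UTC job could be registered, assuming the project's scheduler is APScheduler-based; the function name, job id, and wiring here are illustrative assumptions, not the actual code from this commit:

from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger


def register_tld_price_job(scheduler: AsyncIOScheduler, scrape_job) -> None:
    # Run the TLD price scrape once per day at 03:00 UTC.
    # Job id and replace_existing are assumptions for idempotent re-registration.
    scheduler.add_job(
        scrape_job,
        CronTrigger(hour=3, minute=0, timezone="UTC"),
        id="tld_price_scrape",
        replace_existing=True,
    )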

New files:
- backend/app/services/tld_scraper/ (scraper package)
- TLD_TRACKING_PLAN.md (implementation plan)

API changes (example calls below):
- POST /admin/scrape-tld-prices - trigger manual scrape
- GET /admin/tld-prices/stats - database statistics
- GET /tld-prices/overview now uses DB data
2025-12-08 09:12:44 +01:00
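
For reference, a sketch of exercising the new endpoints with httpx; the base URL, the long scrape timeout, and unauthenticated access are assumptions, since deployment details are not shown here:

import asyncio

import httpx


async def main() -> None:
    # Base URL is an assumption; adjust to the actual deployment.
    async with httpx.AsyncClient(base_url="http://localhost:8000") as client:
        # Trigger a manual scrape; generous timeout since it hits an external API.
        scrape = await client.post("/admin/scrape-tld-prices", timeout=300)
        print(scrape.json())

        # Database statistics for stored TLD prices.
        stats = await client.get("/admin/tld-prices/stats")
        print(stats.json())

        # Overview endpoint, now served from DB data with static fallback.
        overview = await client.get("/tld-prices/overview")
        print(overview.json())


asyncio.run(main())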

"""TLD Price Aggregator - combines multiple scrapers and saves to database."""
import logging
from datetime import datetime
from dataclasses import dataclass, field
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.tld_price import TLDPrice, TLDInfo
from app.services.tld_scraper.base import TLDPriceData, ScraperError
from app.services.tld_scraper.porkbun import PorkbunScraper
logger = logging.getLogger(__name__)


@dataclass
class ScrapeResult:
    """Result of a scraping run."""

    started_at: datetime = field(default_factory=datetime.utcnow)
    completed_at: datetime | None = None
    status: str = "running"  # running, success, partial, failed
    sources_attempted: int = 0
    sources_succeeded: int = 0
    tlds_scraped: int = 0
    prices_saved: int = 0
    errors: list[str] = field(default_factory=list)

    def to_dict(self) -> dict:
        return {
            "started_at": self.started_at.isoformat(),
            "completed_at": self.completed_at.isoformat() if self.completed_at else None,
            "status": self.status,
            "sources_attempted": self.sources_attempted,
            "sources_succeeded": self.sources_succeeded,
            "tlds_scraped": self.tlds_scraped,
            "prices_saved": self.prices_saved,
            "errors": self.errors,
        }


class TLDPriceAggregator:
    """
    Aggregates TLD prices from multiple sources and saves to database.

    Primary source: Porkbun API (most reliable, 896+ TLDs)
    Future sources: Can add more scrapers as backup
    """

    def __init__(self):
        """Initialize the aggregator with available scrapers."""
        self.scrapers = [
            PorkbunScraper(),
            # Add more scrapers here as they become available
            # TLDListScraper(),  # Currently blocked
        ]

    async def run_scrape(self, db: AsyncSession) -> ScrapeResult:
        """
        Run a full scrape from all sources and save to database.

        Args:
            db: Database session

        Returns:
            ScrapeResult with statistics
        """
        result = ScrapeResult()
        all_prices: dict[str, TLDPriceData] = {}

        # Scrape from all sources
        for scraper in self.scrapers:
            result.sources_attempted += 1
            try:
                logger.info(f"Scraping from {scraper.name}...")
                prices = await scraper.scrape()
                if prices:
                    result.sources_succeeded += 1
                    # Store prices (later sources can override earlier ones)
                    for price in prices:
                        key = f"{price.tld}_{price.registrar}"
                        all_prices[key] = price
                    logger.info(f"Got {len(prices)} prices from {scraper.name}")
                else:
                    result.errors.append(f"{scraper.name}: No data returned")
            except ScraperError as e:
                error_msg = f"{scraper.name}: {str(e)}"
                result.errors.append(error_msg)
                logger.error(error_msg)
            except Exception as e:
                error_msg = f"{scraper.name}: Unexpected error - {str(e)}"
                result.errors.append(error_msg)
                logger.exception(error_msg)

        # Save to database
        if all_prices:
            result.tlds_scraped = len({p.tld for p in all_prices.values()})
            result.prices_saved = await self._save_prices(db, list(all_prices.values()))

        # Finalize result
        result.completed_at = datetime.utcnow()
        if result.sources_succeeded == result.sources_attempted:
            result.status = "success"
        elif result.sources_succeeded > 0:
            result.status = "partial"
        else:
            result.status = "failed"

        logger.info(
            f"Scrape completed: {result.status}, "
            f"{result.tlds_scraped} TLDs, "
            f"{result.prices_saved} prices saved"
        )
        return result

    async def _save_prices(self, db: AsyncSession, prices: list[TLDPriceData]) -> int:
        """
        Save scraped prices to database.

        Args:
            db: Database session
            prices: List of TLDPriceData to save

        Returns:
            Number of prices saved
        """
        saved_count = 0
        for price_data in prices:
            try:
                # Create new price record (for historical tracking)
                price_record = TLDPrice(
                    tld=price_data.tld,
                    registrar=price_data.registrar,
                    registration_price=price_data.registration_price,
                    renewal_price=price_data.renewal_price,
                    transfer_price=price_data.transfer_price,
                    currency=price_data.currency,
                    promo_price=price_data.promo_price,
                    recorded_at=price_data.scraped_at,
                )
                db.add(price_record)
                saved_count += 1
                # Also update/create TLDInfo if it doesn't exist
                await self._ensure_tld_info(db, price_data.tld)
            except Exception as e:
                logger.warning(f"Error saving price for {price_data.tld}: {e}")
                continue
        await db.commit()
        return saved_count

    async def _ensure_tld_info(self, db: AsyncSession, tld: str):
        """Ensure TLDInfo record exists for this TLD."""
        result = await db.execute(
            select(TLDInfo).where(TLDInfo.tld == tld)
        )
        existing = result.scalar_one_or_none()
        if not existing:
            # Create basic TLDInfo record
            tld_type = self._guess_tld_type(tld)
            info = TLDInfo(
                tld=tld,
                type=tld_type,
            )
            db.add(info)

    def _guess_tld_type(self, tld: str) -> str:
        """Guess TLD type based on length and pattern."""
        # Country codes are typically 2 characters
        if len(tld) == 2:
            return "ccTLD"
        # Common generic TLDs
        generic = {"com", "net", "org", "info", "biz", "name", "pro"}
        if tld in generic:
            return "generic"
        # New gTLDs
        return "gTLD"

    async def get_latest_prices(self, db: AsyncSession, tld: str | None = None) -> list[dict]:
        """
        Get latest prices from database.

        Args:
            db: Database session
            tld: Optional TLD to filter by

        Returns:
            List of price dictionaries
        """
        # Subquery to get the latest record per TLD/registrar combination
        # (greatest-n-per-group), then join back to fetch the full rows.
        subq = (
            select(
                TLDPrice.tld,
                TLDPrice.registrar,
                func.max(TLDPrice.recorded_at).label("max_date")
            )
            .group_by(TLDPrice.tld, TLDPrice.registrar)
            .subquery()
        )
        query = (
            select(TLDPrice)
            .join(
                subq,
                (TLDPrice.tld == subq.c.tld) &
                (TLDPrice.registrar == subq.c.registrar) &
                (TLDPrice.recorded_at == subq.c.max_date)
            )
        )
        if tld:
            query = query.where(TLDPrice.tld == tld.lower().lstrip("."))
        result = await db.execute(query.order_by(TLDPrice.tld))
        prices = result.scalars().all()
        return [
            {
                "tld": p.tld,
                "registrar": p.registrar,
                "registration_price": p.registration_price,
                "renewal_price": p.renewal_price,
                "transfer_price": p.transfer_price,
                "currency": p.currency,
                "promo_price": p.promo_price,
                "recorded_at": p.recorded_at.isoformat() if p.recorded_at else None,
            }
            for p in prices
        ]

    async def get_price_history(
        self,
        db: AsyncSession,
        tld: str,
        days: int = 365
    ) -> list[dict]:
        """
        Get price history for a TLD.

        Args:
            db: Database session
            tld: TLD to get history for
            days: Number of days of history

        Returns:
            List of historical price records
        """
        cutoff = datetime.utcnow() - timedelta(days=days)
        result = await db.execute(
            select(TLDPrice)
            .where(TLDPrice.tld == tld.lower().lstrip("."))
            .where(TLDPrice.recorded_at >= cutoff)
            .order_by(desc(TLDPrice.recorded_at))
        )
        prices = result.scalars().all()
        return [
            {
                "tld": p.tld,
                "registrar": p.registrar,
                "registration_price": p.registration_price,
                "renewal_price": p.renewal_price,
                "recorded_at": p.recorded_at.isoformat() if p.recorded_at else None,
            }
            for p in prices
        ]


# Singleton instance
tld_aggregator = TLDPriceAggregator()
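
A minimal usage sketch for the singleton, assuming an async session factory; the import paths for the session factory and the aggregator module are assumptions, since only the package directory backend/app/services/tld_scraper/ is named in the commit:

import asyncio

from app.db.session import async_session  # assumed location of the session factory
from app.services.tld_scraper.aggregator import tld_aggregator  # assumed module name


async def main() -> None:
    # Open a session, run a full scrape, and print the run statistics.
    async with async_session() as db:
        result = await tld_aggregator.run_scrape(db)
        print(result.to_dict())


asyncio.run(main())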