"""TLD Price Aggregator - combines multiple scrapers and saves to database."""
import logging
from dataclasses import dataclass, field
from datetime import datetime, timedelta

from sqlalchemy import desc, func, select
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.asyncio import AsyncSession

from app.models.tld_price import TLDPrice, TLDInfo
from app.services.tld_scraper.base import TLDPriceData, ScraperError
from app.services.tld_scraper.porkbun import PorkbunScraper
from app.services.tld_scraper.namecheap import NamecheapScraper
from app.services.tld_scraper.cloudflare import CloudflareScraper
from app.services.tld_scraper.godaddy import GoDaddyScraper
from app.services.tld_scraper.dynadot import DynadotScraper

logger = logging.getLogger(__name__)


@dataclass
class ScrapeResult:
    """Statistics and outcome of a single scraping run.

    ``status`` is one of ``"running"``, ``"success"``, ``"partial"`` or
    ``"failed"``. Timestamps are naive values produced by
    ``datetime.utcnow()``.
    """

    # NOTE(review): datetime.utcnow() is deprecated since Python 3.12; moving
    # to timezone-aware values would change the isoformat() output and must
    # be coordinated with the type of the database columns.
    started_at: datetime = field(default_factory=datetime.utcnow)
    completed_at: datetime | None = None
    status: str = "running"  # running, success, partial, failed
    sources_attempted: int = 0
    sources_succeeded: int = 0
    tlds_scraped: int = 0
    prices_saved: int = 0
    errors: list[str] = field(default_factory=list)

    def to_dict(self) -> dict:
        """Return a plain-dict view of the result.

        Timestamps are rendered as ISO-8601 strings (``completed_at`` is
        ``None`` while the run is unfinished). ``errors`` is a copy, so
        mutating the returned dict does not alter this result.
        """
        return {
            "started_at": self.started_at.isoformat(),
            "completed_at": (
                self.completed_at.isoformat() if self.completed_at else None
            ),
            "status": self.status,
            "sources_attempted": self.sources_attempted,
            "sources_succeeded": self.sources_succeeded,
            "tlds_scraped": self.tlds_scraped,
            "prices_saved": self.prices_saved,
            "errors": list(self.errors),
        }


class TLDPriceAggregator:
    """
    Aggregates TLD prices from multiple sources and saves to database.

    Primary source: Porkbun API (most reliable, 896+ TLDs)
    Additional sources: GoDaddy, Namecheap, Cloudflare and Dynadot scrapers
    """

    # Legacy generic TLDs; anything else longer than two characters is
    # classified as a new gTLD.
    _GENERIC_TLDS = frozenset(
        {"com", "net", "org", "info", "biz", "name", "pro"}
    )

    def __init__(self):
        """Initialize the aggregator with available scrapers.

        Scraper priority:
        1. Porkbun (API) - Most TLDs, official API
        2. GoDaddy (static) - Largest registrar, promo pricing detection
        3. Namecheap (static) - Popular alternative
        4. Cloudflare (static) - At-cost baseline
        5. Dynadot (static) - Competitive pricing reference
        """
        self.scrapers = [
            PorkbunScraper(),     # Primary: 896+ TLDs via official API
            GoDaddyScraper(),     # Largest registrar, good for promo detection
            NamecheapScraper(),   # Popular TLDs + budget options
            CloudflareScraper(),  # At-cost (wholesale) baseline
            DynadotScraper(),     # Competitive pricing, 80+ TLDs
        ]

    async def run_scrape(self, db: AsyncSession) -> ScrapeResult:
        """
        Run a full scrape from all sources and save to database.

        Each scraper is run in turn; a failing scraper is recorded in
        ``errors`` and does not stop the remaining scrapers.

        Args:
            db: Database session

        Returns:
            ScrapeResult with statistics. ``status`` is ``"success"`` when
            every attempted source returned data, ``"partial"`` when only
            some did, and ``"failed"`` when none did.
        """
        result = ScrapeResult()
        all_prices: dict[str, TLDPriceData] = {}

        # Scrape from all sources
        for scraper in self.scrapers:
            result.sources_attempted += 1
            try:
                logger.info("Scraping from %s...", scraper.name)
                prices = await scraper.scrape()

                if prices:
                    result.sources_succeeded += 1
                    # Keyed by TLD + registrar: a later entry replaces an
                    # earlier one only for the same TLD/registrar pair.
                    for price in prices:
                        key = f"{price.tld}_{price.registrar}"
                        all_prices[key] = price
                    logger.info(
                        "Got %s prices from %s", len(prices), scraper.name
                    )
                else:
                    result.errors.append(f"{scraper.name}: No data returned")

            except ScraperError as e:
                error_msg = f"{scraper.name}: {str(e)}"
                result.errors.append(error_msg)
                logger.error(error_msg)
            except Exception as e:
                # Boundary handler: one broken scraper must not abort the run.
                error_msg = f"{scraper.name}: Unexpected error - {str(e)}"
                result.errors.append(error_msg)
                logger.exception(error_msg)

        # Save to database
        if all_prices:
            result.tlds_scraped = len({p.tld for p in all_prices.values()})
            result.prices_saved = await self._save_prices(
                db, list(all_prices.values())
            )

        # Finalize result
        result.completed_at = datetime.utcnow()
        if result.sources_succeeded == result.sources_attempted:
            result.status = "success"
        elif result.sources_succeeded > 0:
            result.status = "partial"
        else:
            result.status = "failed"

        logger.info(
            "Scrape completed: %s, %s TLDs, %s prices saved",
            result.status,
            result.tlds_scraped,
            result.prices_saved,
        )

        return result

    async def _save_prices(
        self, db: AsyncSession, prices: list[TLDPriceData]
    ) -> int:
        """
        Save scraped prices to database.

        A new ``TLDPrice`` row is added for every entry (historical
        tracking), and a ``TLDInfo`` row is ensured once per distinct TLD.
        Everything is committed in a single transaction at the end.

        Args:
            db: Database session
            prices: List of TLDPriceData to save

        Returns:
            Number of price records added to the session
        """
        saved_count = 0
        # Track TLDs we've already ensured exist (to avoid duplicate inserts)
        ensured_tlds: set[str] = set()

        for price_data in prices:
            try:
                # Create new price record (for historical tracking)
                price_record = TLDPrice(
                    tld=price_data.tld,
                    registrar=price_data.registrar,
                    registration_price=price_data.registration_price,
                    renewal_price=price_data.renewal_price,
                    transfer_price=price_data.transfer_price,
                    currency=price_data.currency,
                    promo_price=price_data.promo_price,
                    recorded_at=price_data.scraped_at,
                )
                db.add(price_record)
                saved_count += 1

                # Also create TLDInfo if it doesn't exist (only once per TLD)
                if price_data.tld not in ensured_tlds:
                    await self._ensure_tld_info(db, price_data.tld)
                    ensured_tlds.add(price_data.tld)

            except Exception as e:
                # Best effort: skip the offending entry and keep going.
                logger.warning(
                    f"Error saving price for {price_data.tld}: {e}"
                )

        await db.commit()
        return saved_count

    async def _ensure_tld_info(self, db: AsyncSession, tld: str):
        """Ensure TLDInfo record exists for this TLD.

        The insert runs inside a SAVEPOINT so that a failed flush (for
        example a duplicate key caused by a concurrent writer) only rolls
        back the TLDInfo insert. Without the savepoint the failed flush
        would invalidate the whole session transaction and the caller's
        final commit would fail, discarding every pending price record.

        This method never raises; failures are logged.

        Args:
            db: Database session
            tld: TLD whose TLDInfo row should exist
        """
        try:
            result = await db.execute(
                select(TLDInfo).where(TLDInfo.tld == tld)
            )
            if result.scalar_one_or_none() is not None:
                return

            # begin_nested() flushes pending state, then emits SAVEPOINT;
            # an error inside the block rolls back to that savepoint only.
            async with db.begin_nested():
                db.add(TLDInfo(tld=tld, type=self._guess_tld_type(tld)))
                await db.flush()  # Flush immediately to catch duplicates
        except IntegrityError:
            # Another writer created the row first - nothing left to do.
            logger.debug("TLDInfo for %s already exists", tld)
        except Exception as e:
            logger.warning("Could not ensure TLDInfo for %s: %s", tld, e)

    def _guess_tld_type(self, tld: str) -> str:
        """Guess TLD type based on length and pattern.

        The input is normalized the same way as in the query methods
        (lower-cased, leading dots stripped) before classification.

        Args:
            tld: TLD, with or without a leading dot

        Returns:
            ``"ccTLD"`` for two-character TLDs, ``"generic"`` for the
            legacy generic TLDs, otherwise ``"gTLD"``.
        """
        normalized = tld.lower().lstrip(".")

        # Country codes are typically 2 characters
        if len(normalized) == 2:
            return "ccTLD"

        # Common generic TLDs
        if normalized in self._GENERIC_TLDS:
            return "generic"

        # New gTLDs
        return "gTLD"

    async def get_latest_prices(
        self, db: AsyncSession, tld: str | None = None
    ) -> list[dict]:
        """
        Get latest prices from database.

        Args:
            db: Database session
            tld: Optional TLD to filter by (case-insensitive, leading dot
                allowed)

        Returns:
            List of price dictionaries, ordered by TLD, holding the most
            recent record for each TLD/registrar combination
        """
        # Subquery to get latest record per TLD/registrar combination
        subq = (
            select(
                TLDPrice.tld,
                TLDPrice.registrar,
                func.max(TLDPrice.recorded_at).label("max_date"),
            )
            .group_by(TLDPrice.tld, TLDPrice.registrar)
            .subquery()
        )

        query = select(TLDPrice).join(
            subq,
            (TLDPrice.tld == subq.c.tld)
            & (TLDPrice.registrar == subq.c.registrar)
            & (TLDPrice.recorded_at == subq.c.max_date),
        )

        if tld:
            query = query.where(TLDPrice.tld == tld.lower().lstrip("."))

        result = await db.execute(query.order_by(TLDPrice.tld))
        prices = result.scalars().all()

        return [
            {
                "tld": p.tld,
                "registrar": p.registrar,
                "registration_price": p.registration_price,
                "renewal_price": p.renewal_price,
                "transfer_price": p.transfer_price,
                "currency": p.currency,
                "promo_price": p.promo_price,
                "recorded_at": (
                    p.recorded_at.isoformat() if p.recorded_at else None
                ),
            }
            for p in prices
        ]

    async def get_price_history(
        self,
        db: AsyncSession,
        tld: str,
        days: int = 365
    ) -> list[dict]:
        """
        Get price history for a TLD.

        Args:
            db: Database session
            tld: TLD to get history for (case-insensitive, leading dot
                allowed)
            days: Number of days of history

        Returns:
            List of historical price records, newest first
        """
        cutoff = datetime.utcnow() - timedelta(days=days)

        result = await db.execute(
            select(TLDPrice)
            .where(TLDPrice.tld == tld.lower().lstrip("."))
            .where(TLDPrice.recorded_at >= cutoff)
            .order_by(desc(TLDPrice.recorded_at))
        )
        prices = result.scalars().all()

        return [
            {
                "tld": p.tld,
                "registrar": p.registrar,
                "registration_price": p.registration_price,
                "renewal_price": p.renewal_price,
                "recorded_at": (
                    p.recorded_at.isoformat() if p.recorded_at else None
                ),
            }
            for p in prices
        ]


# Singleton instance
tld_aggregator = TLDPriceAggregator()