"""TLD Price change tracking and alerting service."""
import logging
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Optional

from sqlalchemy import select, func, and_
from sqlalchemy.ext.asyncio import AsyncSession

from app.models.tld_price import TLDPrice
from app.models.user import User
from app.services.email_service import email_service

logger = logging.getLogger(__name__)

# Default minimum absolute percentage change that counts as "significant".
# Shared by PriceChange.is_significant and PriceTracker so the two cannot drift.
SIGNIFICANT_CHANGE_THRESHOLD_PERCENT = 5.0


@dataclass
class PriceChange:
    """Represents a TLD price change between two recorded prices."""

    tld: str
    registrar: str
    old_price: float
    new_price: float
    change_amount: float  # new_price - old_price
    change_percent: float  # change_amount relative to old_price, in percent
    detected_at: datetime  # recorded_at of the newer price row

    @property
    def is_significant(self) -> bool:
        """Check if the price change is significant (absolute change >= 5%)."""
        return abs(self.change_percent) >= SIGNIFICANT_CHANGE_THRESHOLD_PERCENT

    @property
    def direction(self) -> str:
        """Get price change direction: "up", "down" or "stable"."""
        if self.change_percent > 0:
            return "up"
        if self.change_percent < 0:
            return "down"
        return "stable"


class PriceTracker:
    """
    Tracks TLD price changes and sends alerts.

    Features:
    - Detect significant price changes (>= 5%)
    - Send email alerts to active users
    - Generate price change reports
    """

    # Threshold for significant price change (percentage)
    SIGNIFICANT_CHANGE_THRESHOLD = SIGNIFICANT_CHANGE_THRESHOLD_PERCENT

    # Maximum number of changes (most significant first) that are alerted on
    MAX_ALERT_CHANGES = 10

    # Maximum number of entries in the "top_changes" section of the report
    REPORT_TOP_CHANGES = 10

    async def detect_price_changes(
        self,
        db: AsyncSession,
        hours: int = 24,
    ) -> list[PriceChange]:
        """
        Detect price changes in the last N hours.

        For every distinct TLD/registrar pair, the two most recent price
        rows are compared. Only the latest change per pair is considered,
        and one query is issued per pair.

        Args:
            db: Database session
            hours: Look back period in hours

        Returns:
            List of significant price changes, sorted by absolute
            percentage change (largest first)
        """
        # Naive UTC "now", equivalent to the deprecated datetime.utcnow().
        # NOTE(review): assumes TLDPrice.recorded_at is stored as naive UTC - confirm.
        now = datetime.now(timezone.utc).replace(tzinfo=None)
        cutoff = now - timedelta(hours=hours)

        # Get unique TLD/registrar combinations
        pair_rows = await db.execute(
            select(TLDPrice.tld, TLDPrice.registrar).distinct()
        )
        pairs = pair_rows.all()

        changes: list[PriceChange] = []
        for tld, registrar in pairs:
            # Get the two most recent prices for this TLD/registrar
            result = await db.execute(
                select(TLDPrice)
                .where(
                    and_(
                        TLDPrice.tld == tld,
                        TLDPrice.registrar == registrar,
                    )
                )
                .order_by(TLDPrice.recorded_at.desc())
                .limit(2)
            )
            prices = result.scalars().all()

            # A change needs both a current and a previous price
            if len(prices) < 2:
                continue

            new_price, old_price = prices[0], prices[1]

            # Check if change is within our time window
            if new_price.recorded_at < cutoff:
                continue

            old_value = old_price.registration_price
            new_value = new_price.registration_price

            # A missing price cannot be compared, and a zero old price
            # would make the percentage undefined (division by zero).
            if old_value is None or new_value is None or old_value == 0:
                continue

            change_amount = new_value - old_value
            change_percent = (change_amount / old_value) * 100

            # Only track significant changes
            if abs(change_percent) >= self.SIGNIFICANT_CHANGE_THRESHOLD:
                changes.append(
                    PriceChange(
                        tld=tld,
                        registrar=registrar,
                        old_price=old_value,
                        new_price=new_value,
                        change_amount=change_amount,
                        change_percent=change_percent,
                        detected_at=new_price.recorded_at,
                    )
                )

        # Sort by absolute change percentage (most significant first)
        changes.sort(key=lambda c: abs(c.change_percent), reverse=True)

        logger.info(
            "Detected %d significant price changes in last %sh",
            len(changes),
            hours,
        )
        return changes

    async def send_price_alerts(
        self,
        db: AsyncSession,
        changes: list[PriceChange],
        min_tier: str = "professional",
    ) -> int:
        """
        Send price change alerts to users.

        One email is attempted per (change, user) pair for the first
        MAX_ALERT_CHANGES entries of ``changes``. A failure for one
        recipient is logged and does not stop the remaining sends.

        Args:
            db: Database session
            changes: List of price changes to alert about
            min_tier: Minimum subscription tier to receive alerts.
                Currently not applied: every active user is alerted.

        Returns:
            Number of alerts sent
        """
        if not changes:
            return 0

        if not email_service.is_enabled:
            logger.warning("Email service not configured, skipping price alerts")
            return 0

        # Get users with appropriate subscription
        # For now, send to all active users (can filter by tier later)
        result = await db.execute(
            # `== True` builds a SQL expression here; `is True` would not.
            select(User).where(User.is_active == True)  # noqa: E712
        )
        users = result.scalars().all()

        alerts_sent = 0
        for change in changes[: self.MAX_ALERT_CHANGES]:
            for user in users:
                try:
                    success = await email_service.send_price_change_alert(
                        to_email=user.email,
                        tld=change.tld,
                        old_price=change.old_price,
                        new_price=change.new_price,
                        change_percent=change.change_percent,
                        registrar=change.registrar,
                    )
                except Exception as e:
                    # Best effort: keep alerting the remaining recipients
                    logger.error("Failed to send alert to %s: %s", user.email, e)
                else:
                    if success:
                        alerts_sent += 1

        logger.info("Sent %d price change alerts", alerts_sent)
        return alerts_sent

    async def get_trending_changes(
        self,
        db: AsyncSession,
        days: int = 7,
        limit: int = 10,
    ) -> list[dict]:
        """
        Get trending price changes for dashboard display.

        Args:
            db: Database session
            days: Look back period in days
            limit: Max results to return

        Returns:
            List of dicts describing the most significant price changes
        """
        changes = await self.detect_price_changes(db, hours=days * 24)

        return [
            {
                "tld": c.tld,
                "registrar": c.registrar,
                "old_price": c.old_price,
                "new_price": c.new_price,
                "change_percent": round(c.change_percent, 2),
                "direction": c.direction,
                "detected_at": c.detected_at.isoformat(),
            }
            for c in changes[:limit]
        ]

    async def generate_price_report(
        self,
        db: AsyncSession,
        days: int = 30,
    ) -> dict:
        """
        Generate a comprehensive price change report.

        Args:
            db: Database session
            days: Look back period in days

        Returns:
            Dict with counts, average and biggest increase/decrease, and
            the top changes for the period
        """
        changes = await self.detect_price_changes(db, hours=days * 24)

        # `changes` is sorted by absolute percentage (largest first), so the
        # first element of each filtered list is the biggest move in that
        # direction.
        increases = [c for c in changes if c.direction == "up"]
        decreases = [c for c in changes if c.direction == "down"]

        return {
            "period_days": days,
            "total_changes": len(changes),
            "price_increases": len(increases),
            "price_decreases": len(decreases),
            "avg_increase": self._average_percent(increases),
            "avg_decrease": self._average_percent(decreases),
            "biggest_increase": {
                "tld": increases[0].tld,
                "change": round(increases[0].change_percent, 2),
            } if increases else None,
            "biggest_decrease": {
                "tld": decreases[0].tld,
                "change": round(decreases[0].change_percent, 2),
            } if decreases else None,
            "top_changes": [
                {
                    "tld": c.tld,
                    "change_percent": round(c.change_percent, 2),
                    "new_price": c.new_price,
                }
                for c in changes[: self.REPORT_TOP_CHANGES]
            ],
        }

    @staticmethod
    def _average_percent(changes: list[PriceChange]) -> float:
        """Return the mean change_percent rounded to 2 places, or 0 if empty."""
        if not changes:
            return 0
        return round(sum(c.change_percent for c in changes) / len(changes), 2)


# Singleton instance
price_tracker = PriceTracker()