pounce/backend/app/services/price_tracker.py
yves.gugger fb080375a0 feat: add email notifications and price change alerts
New features:
- Email service for domain availability alerts
- Price tracker for detecting significant price changes (>5%)
- Automated email notifications for:
  - Domain becomes available
  - TLD price changes
  - Weekly digest summaries
- New scheduler job for price change alerts (04:00 UTC)

Updated documentation:
- README: email notifications section, new services
- DEPLOYMENT: email setup, troubleshooting, scheduler jobs

New files:
- backend/app/services/email_service.py
- backend/app/services/price_tracker.py
2025-12-08 09:27:03 +01:00

255 lines
8.0 KiB
Python

"""TLD Price change tracking and alerting service."""
import logging
from datetime import datetime, timedelta
from dataclasses import dataclass
from typing import Optional
from sqlalchemy import select, func, and_
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.tld_price import TLDPrice
from app.models.user import User
from app.services.email_service import email_service
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
@dataclass
class PriceChange:
    """Represents a single TLD price change observed at one registrar.

    The percentage is signed: positive values are price increases and
    negative values are price decreases.
    """

    # Absolute percentage at or above which a change counts as significant.
    # Deliberately left un-annotated so the dataclass does not treat it as a
    # field (it must not become a constructor argument).
    SIGNIFICANT_CHANGE_THRESHOLD = 5.0

    tld: str
    registrar: str
    old_price: float
    new_price: float
    # Signed difference between new and old price, as supplied by the caller.
    change_amount: float
    # Signed change expressed in percent, as supplied by the caller.
    change_percent: float
    detected_at: datetime

    @property
    def is_significant(self) -> bool:
        """Return True when the absolute change is at least 5% (inclusive)."""
        return abs(self.change_percent) >= self.SIGNIFICANT_CHANGE_THRESHOLD

    @property
    def direction(self) -> str:
        """Return "up", "down" or "stable" based on the sign of the change."""
        if self.change_percent > 0:
            return "up"
        if self.change_percent < 0:
            return "down"
        return "stable"
class PriceTracker:
    """
    Tracks TLD price changes and sends alerts.

    Features:
    - Detect significant price changes (absolute change of at least
      SIGNIFICANT_CHANGE_THRESHOLD percent)
    - Send email alerts to active users
    - Generate price change reports
    """

    # Threshold for significant price change (absolute percentage, inclusive)
    SIGNIFICANT_CHANGE_THRESHOLD = 5.0

    async def detect_price_changes(
        self,
        db: AsyncSession,
        hours: int = 24,
    ) -> list[PriceChange]:
        """
        Detect significant price changes recorded in the last N hours.

        For every distinct TLD/registrar pair only the two most recent price
        rows are compared; the pair is reported when the newest row was
        recorded inside the look-back window and the registration price moved
        by at least SIGNIFICANT_CHANGE_THRESHOLD percent. Earlier changes
        inside the window are not reported.

        Args:
            db: Database session
            hours: Look back period in hours

        Returns:
            List of significant price changes, largest absolute percentage
            first
        """
        changes: list[PriceChange] = []
        # NOTE(review): naive UTC timestamp; presumably TLDPrice.recorded_at is
        # stored as naive UTC as well - confirm before switching to
        # timezone-aware datetimes, since mixing the two raises TypeError.
        cutoff = datetime.utcnow() - timedelta(hours=hours)

        # Get unique TLD/registrar combinations. Materialize the rows up
        # front because further queries are issued on the same session
        # inside the loop.
        pair_result = await db.execute(
            select(TLDPrice.tld, TLDPrice.registrar).distinct()
        )

        for tld, registrar in pair_result.all():
            # Get the two most recent prices for this TLD/registrar
            result = await db.execute(
                select(TLDPrice)
                .where(
                    and_(
                        TLDPrice.tld == tld,
                        TLDPrice.registrar == registrar,
                    )
                )
                .order_by(TLDPrice.recorded_at.desc())
                .limit(2)
            )
            prices = result.scalars().all()
            if len(prices) < 2:
                continue

            latest, previous = prices[0], prices[1]

            # Check if change is within our time window
            if latest.recorded_at < cutoff:
                continue

            old_value = previous.registration_price
            new_value = latest.registration_price

            # A missing or zero baseline cannot yield a percentage, and a
            # missing new price cannot be compared at all.
            if not old_value or new_value is None:
                continue

            change_amount = new_value - old_value
            change_percent = (change_amount / old_value) * 100

            # Only track significant changes
            if abs(change_percent) >= self.SIGNIFICANT_CHANGE_THRESHOLD:
                changes.append(
                    PriceChange(
                        tld=tld,
                        registrar=registrar,
                        old_price=old_value,
                        new_price=new_value,
                        change_amount=change_amount,
                        change_percent=change_percent,
                        detected_at=latest.recorded_at,
                    )
                )

        # Sort by absolute change percentage (most significant first)
        changes.sort(key=lambda change: abs(change.change_percent), reverse=True)
        logger.info(
            "Detected %d significant price changes in last %sh",
            len(changes),
            hours,
        )
        return changes

    async def send_price_alerts(
        self,
        db: AsyncSession,
        changes: list[PriceChange],
        min_tier: str = "professional",
        max_changes: int = 10,
    ) -> int:
        """
        Send price change alerts to active users.

        One email is sent per (change, user) combination. A failure to send
        one email is logged and does not stop the remaining sends.

        Args:
            db: Database session
            changes: List of price changes to alert about
            min_tier: Minimum subscription tier to receive alerts.
                Currently ignored: alerts go to all active users.
            max_changes: Only the first ``max_changes`` entries of
                ``changes`` are alerted on (default 10)

        Returns:
            Number of alerts sent successfully
        """
        if not changes:
            return 0

        if not email_service.is_enabled:
            logger.warning("Email service not configured, skipping price alerts")
            return 0

        # Get users with appropriate subscription
        # For now, send to all active users (can filter by tier later)
        result = await db.execute(
            # "== True" is intentional: it builds a SQL expression.
            select(User).where(User.is_active == True)  # noqa: E712
        )
        users = result.scalars().all()

        alerts_sent = 0
        # A negative cap is treated as zero rather than slicing from the end.
        for change in changes[: max(max_changes, 0)]:
            for user in users:
                try:
                    success = await email_service.send_price_change_alert(
                        to_email=user.email,
                        tld=change.tld,
                        old_price=change.old_price,
                        new_price=change.new_price,
                        change_percent=change.change_percent,
                        registrar=change.registrar,
                    )
                except Exception as e:
                    # Best effort: keep going, but preserve the traceback.
                    logger.exception(
                        "Failed to send alert to %s: %s", user.email, e
                    )
                    continue
                if success:
                    alerts_sent += 1

        logger.info("Sent %d price change alerts", alerts_sent)
        return alerts_sent

    async def get_trending_changes(
        self,
        db: AsyncSession,
        days: int = 7,
        limit: int = 10,
    ) -> list[dict]:
        """
        Get trending price changes for dashboard display.

        Args:
            db: Database session
            days: Look back period in days
            limit: Max results to return

        Returns:
            List of dicts (tld, registrar, old_price, new_price,
            change_percent rounded to 2 decimals, direction, detected_at as
            an ISO-8601 string), most significant change first
        """
        changes = await self.detect_price_changes(db, hours=days * 24)
        return [
            {
                "tld": c.tld,
                "registrar": c.registrar,
                "old_price": c.old_price,
                "new_price": c.new_price,
                "change_percent": round(c.change_percent, 2),
                "direction": c.direction,
                "detected_at": c.detected_at.isoformat(),
            }
            for c in changes[:limit]
        ]

    @staticmethod
    def _average_percent(changes):
        """Return the mean change_percent rounded to 2 decimals, or 0 if empty."""
        if not changes:
            return 0
        return round(sum(c.change_percent for c in changes) / len(changes), 2)

    async def generate_price_report(
        self,
        db: AsyncSession,
        days: int = 30,
    ) -> dict:
        """
        Generate a comprehensive price change report.

        Args:
            db: Database session
            days: Look back period in days

        Returns:
            Dict with the period, counts of increases and decreases, their
            average percentages, the biggest increase and decrease (None
            when there is none) and up to 10 top changes
        """
        changes = await self.detect_price_changes(db, hours=days * 24)
        increases = [c for c in changes if c.direction == "up"]
        decreases = [c for c in changes if c.direction == "down"]

        # Pick the extremes explicitly instead of relying on the ordering
        # produced by detect_price_changes.
        top_increase = (
            max(increases, key=lambda c: c.change_percent) if increases else None
        )
        top_decrease = (
            min(decreases, key=lambda c: c.change_percent) if decreases else None
        )

        return {
            "period_days": days,
            "total_changes": len(changes),
            "price_increases": len(increases),
            "price_decreases": len(decreases),
            "avg_increase": self._average_percent(increases),
            "avg_decrease": self._average_percent(decreases),
            "biggest_increase": {
                "tld": top_increase.tld,
                "change": round(top_increase.change_percent, 2),
            } if top_increase is not None else None,
            "biggest_decrease": {
                "tld": top_decrease.tld,
                "change": round(top_decrease.change_percent, 2),
            } if top_decrease is not None else None,
            "top_changes": [
                {
                    "tld": c.tld,
                    "change_percent": round(c.change_percent, 2),
                    "new_price": c.new_price,
                }
                for c in changes[:10]
            ],
        }
# Singleton instance: a module-level PriceTracker created when this module is
# imported.
price_tracker = PriceTracker()