pounce/backend/app/services/price_tracker.py
yves.gugger 76a118ddbf
Some checks failed
CI / Frontend Lint & Type Check (push) Has been cancelled
CI / Frontend Build (push) Has been cancelled
CI / Backend Lint (push) Has been cancelled
CI / Backend Tests (push) Has been cancelled
CI / Docker Build (push) Has been cancelled
CI / Security Scan (push) Has been cancelled
Deploy / Build & Push Images (push) Has been cancelled
Deploy / Deploy to Server (push) Has been cancelled
Deploy / Notify (push) Has been cancelled
feat: implement Yield/Intent Routing feature (pounce_endgame)
Backend:
- Add YieldDomain, YieldTransaction, YieldPayout, AffiliatePartner models
- Create IntentDetector service for keyword-based intent classification
- Implement /api/v1/yield/* endpoints (dashboard, domains, transactions, partners)
- Support domain activation, DNS verification, and revenue tracking

Frontend:
- Add /terminal/yield page with dashboard and activate wizard
- Add YIELD to sidebar navigation under 'Monetize' section
- Add 4th pillar 'Yield' to landing page 'Beyond Hunting' section
- Extend API client with yield endpoints and types

Features:
- AI-powered intent detection (medical, finance, legal, realestate, etc.)
- Swiss/German geo-targeting with city recognition
- Revenue estimation based on intent category and geo
- DNS verification via nameservers or CNAME
- 70/30 revenue split tracking
2025-12-12 14:39:56 +01:00

269 lines
8.6 KiB
Python

"""TLD Price change tracking and alerting service."""
import logging
from datetime import datetime, timedelta
from dataclasses import dataclass
from typing import Optional
from sqlalchemy import select, func, and_
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.tld_price import TLDPrice
from app.models.user import User
from app.services.email_service import email_service
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
@dataclass
class PriceChange:
    """Represents a single TLD price change at one registrar.

    The record is a plain value object; ``is_significant`` and ``direction``
    are derived from ``change_percent`` only.
    """

    # Minimum absolute percentage change regarded as significant (inclusive).
    # Deliberately un-annotated so that @dataclass does not turn it into a
    # field / constructor argument.
    SIGNIFICANT_CHANGE_THRESHOLD = 5.0

    tld: str  # TLD the price belongs to
    registrar: str  # registrar offering that price
    old_price: float  # price before the change
    new_price: float  # price after the change
    change_amount: float  # absolute difference between new and old price
    change_percent: float  # signed change in percent
    detected_at: datetime  # timestamp associated with the new price

    @property
    def is_significant(self) -> bool:
        """Return True if the absolute change is at least 5 percent."""
        return abs(self.change_percent) >= self.SIGNIFICANT_CHANGE_THRESHOLD

    @property
    def direction(self) -> str:
        """Return ``"up"``, ``"down"`` or ``"stable"`` from the sign of the change."""
        if self.change_percent > 0:
            return "up"
        if self.change_percent < 0:
            return "down"
        return "stable"
class PriceTracker:
    """
    Tracks TLD price changes and sends alerts.

    Features:
    - Detect significant price changes (absolute change of at least
      SIGNIFICANT_CHANGE_THRESHOLD percent)
    - Send email alerts to active users
    - Generate price change reports
    """

    # Threshold for significant price change (percentage, inclusive)
    SIGNIFICANT_CHANGE_THRESHOLD = 5.0
    # How many of the most significant changes are alerted on / listed in reports
    TOP_CHANGES_LIMIT = 10

    async def detect_price_changes(
        self,
        db: AsyncSession,
        hours: int = 24,
    ) -> list[PriceChange]:
        """
        Detect price changes in the last N hours.

        For every (tld, registrar) pair the two most recent price rows are
        compared. A change is reported when the newest row was recorded within
        the look-back window and the relative difference reaches
        SIGNIFICANT_CHANGE_THRESHOLD.

        Args:
            db: Database session
            hours: Look back period in hours

        Returns:
            List of significant price changes, sorted by absolute percentage
            change (largest first)
        """
        # Function-scope imports: neither name is imported at module level.
        from datetime import timezone
        from itertools import groupby

        changes: list[PriceChange] = []

        # Naive UTC "now" -- same value datetime.utcnow() produced, without the
        # deprecation. NOTE(review): the comparison against recorded_at below
        # presumably relies on recorded_at being stored as naive UTC -- confirm.
        now = datetime.now(timezone.utc).replace(tzinfo=None)
        cutoff = now - timedelta(hours=hours)

        # PERF: Avoid N+1 queries (distinct(tld, registrar) + per-pair LIMIT 2).
        # We fetch the latest 2 rows per (tld, registrar) using a window function.
        ranked = (
            select(
                TLDPrice.tld.label("tld"),
                TLDPrice.registrar.label("registrar"),
                TLDPrice.registration_price.label("price"),
                TLDPrice.recorded_at.label("recorded_at"),
                func.row_number()
                .over(
                    partition_by=(TLDPrice.tld, TLDPrice.registrar),
                    order_by=TLDPrice.recorded_at.desc(),
                )
                .label("rn"),
            )
        ).subquery()

        rows = (
            await db.execute(
                select(
                    ranked.c.tld,
                    ranked.c.registrar,
                    ranked.c.price,
                    ranked.c.recorded_at,
                    ranked.c.rn,
                )
                .where(ranked.c.rn <= 2)
                .order_by(ranked.c.tld, ranked.c.registrar, ranked.c.rn)
            )
        ).all()

        # Rows arrive ordered by (tld, registrar, rn), so groupby sees each pair
        # as one consecutive run with the newest row (rn == 1) first.
        for (tld, registrar), grp in groupby(rows, key=lambda r: (r.tld, r.registrar)):
            pair = list(grp)
            if len(pair) < 2:
                # Only one observation: nothing to compare against.
                continue
            newest, previous = pair[0], pair[1]

            # Only consider if the newest price is within the requested window
            if newest.recorded_at is None or newest.recorded_at < cutoff:
                continue

            # A missing newest price cannot be compared (float(None) would
            # raise); a newest price of 0 is still a valid -100% change.
            if newest.price is None:
                continue

            # Missing or zero previous price: percentage change is undefined.
            if not previous.price:
                continue

            old_price = float(previous.price)
            new_price = float(newest.price)
            change_amount = new_price - old_price
            change_percent = (change_amount / old_price) * 100

            if abs(change_percent) >= self.SIGNIFICANT_CHANGE_THRESHOLD:
                changes.append(
                    PriceChange(
                        tld=tld,
                        registrar=registrar,
                        old_price=old_price,
                        new_price=new_price,
                        change_amount=change_amount,
                        change_percent=change_percent,
                        detected_at=newest.recorded_at,
                    )
                )

        # Sort by absolute change percentage (most significant first)
        changes.sort(key=lambda x: abs(x.change_percent), reverse=True)

        logger.info(
            "Detected %d significant price changes in last %sh", len(changes), hours
        )
        return changes

    async def send_price_alerts(
        self,
        db: AsyncSession,
        changes: list[PriceChange],
        min_tier: str = "professional",
    ) -> int:
        """
        Send price change alerts to subscribed users.

        One email is sent per (change, user) combination for the first
        TOP_CHANGES_LIMIT entries of ``changes``. A failure for one recipient
        is logged and does not stop the remaining sends.

        Args:
            db: Database session
            changes: List of price changes to alert about
            min_tier: Minimum subscription tier to receive alerts
                (currently unused: every active user is alerted)

        Returns:
            Number of alerts sent
        """
        if not changes:
            return 0

        if not email_service.is_enabled:
            logger.warning("Email service not configured, skipping price alerts")
            return 0

        # Get users with appropriate subscription
        # For now, send to all active users (can filter by tier later)
        result = await db.execute(
            # "== True" is intentional: it builds a SQL expression, not a
            # Python boolean comparison.
            select(User).where(User.is_active == True)  # noqa: E712
        )
        users = result.scalars().all()

        alerts_sent = 0
        for change in changes[: self.TOP_CHANGES_LIMIT]:
            for user in users:
                try:
                    success = await email_service.send_price_change_alert(
                        to_email=user.email,
                        tld=change.tld,
                        old_price=change.old_price,
                        new_price=change.new_price,
                        change_percent=change.change_percent,
                        registrar=change.registrar,
                    )
                    if success:
                        alerts_sent += 1
                except Exception as e:
                    # Best effort: keep alerting the remaining users.
                    logger.error("Failed to send alert to %s: %s", user.email, e)

        logger.info("Sent %d price change alerts", alerts_sent)
        return alerts_sent

    async def get_trending_changes(
        self,
        db: AsyncSession,
        days: int = 7,
        limit: int = 10,
    ) -> list[dict]:
        """
        Get trending price changes for dashboard display.

        Args:
            db: Database session
            days: Look back period
            limit: Max results to return

        Returns:
            List of trending price changes as plain dicts, most significant
            first, with ``detected_at`` rendered as an ISO 8601 string
        """
        changes = await self.detect_price_changes(db, hours=days * 24)

        return [
            {
                "tld": c.tld,
                "registrar": c.registrar,
                "old_price": c.old_price,
                "new_price": c.new_price,
                "change_percent": round(c.change_percent, 2),
                "direction": c.direction,
                "detected_at": c.detected_at.isoformat(),
            }
            for c in changes[:limit]
        ]

    async def generate_price_report(
        self,
        db: AsyncSession,
        days: int = 30,
    ) -> dict:
        """
        Generate a comprehensive price change report.

        Args:
            db: Database session
            days: Look back period

        Returns:
            Dict with counts, average increase/decrease (in percent, 0 when
            there are none), the biggest increase/decrease (None when there
            are none) and the top changes
        """
        changes = await self.detect_price_changes(db, hours=days * 24)

        increases = [c for c in changes if c.direction == "up"]
        decreases = [c for c in changes if c.direction == "down"]

        def average_percent(items: list[PriceChange]):
            """Mean change_percent rounded to 2 places, or 0 for an empty list."""
            if not items:
                return 0
            return round(sum(c.change_percent for c in items) / len(items), 2)

        def headline(items: list[PriceChange]):
            """Summary of the first (most significant) item, or None if empty."""
            if not items:
                return None
            return {
                "tld": items[0].tld,
                "change": round(items[0].change_percent, 2),
            }

        # `changes` is sorted by absolute change, so the first element of each
        # filtered list is the biggest move in that direction.
        return {
            "period_days": days,
            "total_changes": len(changes),
            "price_increases": len(increases),
            "price_decreases": len(decreases),
            "avg_increase": average_percent(increases),
            "avg_decrease": average_percent(decreases),
            "biggest_increase": headline(increases),
            "biggest_decrease": headline(decreases),
            "top_changes": [
                {
                    "tld": c.tld,
                    "change_percent": round(c.change_percent, 2),
                    "new_price": c.new_price,
                }
                for c in changes[: self.TOP_CHANGES_LIMIT]
            ],
        }
# Singleton instance: one module-level PriceTracker, created at import time
price_tracker = PriceTracker()