pounce/backend/app/api/sniper_alerts.py
Yves Gugger 891d17362e
Some checks failed
CI / Frontend Lint & Type Check (push) Has been cancelled
CI / Frontend Build (push) Has been cancelled
CI / Backend Lint (push) Has been cancelled
CI / Backend Tests (push) Has been cancelled
CI / Docker Build (push) Has been cancelled
CI / Security Scan (push) Has been cancelled
Deploy / Build & Push Images (push) Has been cancelled
Deploy / Deploy to Server (push) Has been cancelled
Deploy / Notify (push) Has been cancelled
Major tier overhaul: Scout gets Portfolio+Listing, new limits, Yield live
2025-12-17 09:16:34 +01:00

459 lines
14 KiB
Python

"""
Sniper Alerts API - Hyper-personalized auction notifications
This implements "Strategie 4: Alerts nach Maß" from analysis_3.md:
"Der User kann extrem spezifische Filter speichern:
- Informiere mich NUR, wenn eine 4-Letter .com Domain droppt, die kein 'q' oder 'x' enthält."
Endpoints:
- GET /sniper-alerts - Get user's alerts
- POST /sniper-alerts - Create new alert
- PUT /sniper-alerts/{id} - Update alert
- DELETE /sniper-alerts/{id} - Delete alert
- GET /sniper-alerts/{id}/matches - Get matched auctions
- POST /sniper-alerts/{id}/test - Test alert against current auctions
"""
import logging
from datetime import datetime
from typing import Optional, List
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel, Field
from sqlalchemy import select, func, and_
from sqlalchemy.ext.asyncio import AsyncSession
from app.database import get_db
from app.api.deps import get_current_user
from app.models.user import User
from app.models.sniper_alert import SniperAlert, SniperAlertMatch
from app.models.auction import DomainAuction
logger = logging.getLogger(__name__)
router = APIRouter()
# ============== Schemas ==============
class SniperAlertCreate(BaseModel):
"""Create a new sniper alert."""
name: str = Field(..., min_length=1, max_length=100)
description: Optional[str] = Field(None, max_length=500)
# Filter criteria
tlds: Optional[str] = Field(None, description="Comma-separated TLDs: com,io,ai")
keywords: Optional[str] = Field(None, description="Must contain (comma-separated)")
exclude_keywords: Optional[str] = Field(None, description="Must not contain")
max_length: Optional[int] = Field(None, ge=1, le=63)
min_length: Optional[int] = Field(None, ge=1, le=63)
max_price: Optional[float] = Field(None, ge=0)
min_price: Optional[float] = Field(None, ge=0)
max_bids: Optional[int] = Field(None, ge=0, description="Max bids (low competition)")
ending_within_hours: Optional[int] = Field(None, ge=1, le=168)
platforms: Optional[str] = Field(None, description="Comma-separated platforms")
# Advanced
no_numbers: bool = False
no_hyphens: bool = False
exclude_chars: Optional[str] = Field(None, description="Chars to exclude: q,x,z")
# Notifications
notify_email: bool = True
notify_sms: bool = False
class SniperAlertUpdate(BaseModel):
"""Update a sniper alert."""
name: Optional[str] = Field(None, max_length=100)
description: Optional[str] = Field(None, max_length=500)
tlds: Optional[str] = None
keywords: Optional[str] = None
exclude_keywords: Optional[str] = None
max_length: Optional[int] = Field(None, ge=1, le=63)
min_length: Optional[int] = Field(None, ge=1, le=63)
max_price: Optional[float] = Field(None, ge=0)
min_price: Optional[float] = Field(None, ge=0)
max_bids: Optional[int] = Field(None, ge=0)
ending_within_hours: Optional[int] = Field(None, ge=1, le=168)
platforms: Optional[str] = None
no_numbers: Optional[bool] = None
no_hyphens: Optional[bool] = None
exclude_chars: Optional[str] = None
notify_email: Optional[bool] = None
notify_sms: Optional[bool] = None
is_active: Optional[bool] = None
class SniperAlertResponse(BaseModel):
"""Sniper alert response."""
id: int
name: str
description: Optional[str]
tlds: Optional[str]
keywords: Optional[str]
exclude_keywords: Optional[str]
max_length: Optional[int]
min_length: Optional[int]
max_price: Optional[float]
min_price: Optional[float]
max_bids: Optional[int]
ending_within_hours: Optional[int]
platforms: Optional[str]
no_numbers: bool
no_hyphens: bool
exclude_chars: Optional[str]
notify_email: bool
notify_sms: bool
is_active: bool
matches_count: int
notifications_sent: int
last_matched_at: Optional[datetime]
created_at: datetime
class Config:
from_attributes = True
class MatchResponse(BaseModel):
"""Alert match response."""
id: int
domain: str
platform: str
current_bid: float
end_time: datetime
auction_url: Optional[str]
matched_at: datetime
notified: bool
class Config:
from_attributes = True
# ============== Endpoints ==============
@router.get("", response_model=List[SniperAlertResponse])
async def get_sniper_alerts(
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""Get user's sniper alerts."""
result = await db.execute(
select(SniperAlert)
.where(SniperAlert.user_id == current_user.id)
.order_by(SniperAlert.created_at.desc())
)
alerts = list(result.scalars().all())
return [
SniperAlertResponse(
id=alert.id,
name=alert.name,
description=alert.description,
tlds=alert.tlds,
keywords=alert.keywords,
exclude_keywords=alert.exclude_keywords,
max_length=alert.max_length,
min_length=alert.min_length,
max_price=alert.max_price,
min_price=alert.min_price,
max_bids=alert.max_bids,
ending_within_hours=alert.ending_within_hours,
platforms=alert.platforms,
no_numbers=alert.no_numbers,
no_hyphens=alert.no_hyphens,
exclude_chars=alert.exclude_chars,
notify_email=alert.notify_email,
notify_sms=alert.notify_sms,
is_active=alert.is_active,
matches_count=alert.matches_count,
notifications_sent=alert.notifications_sent,
last_matched_at=alert.last_matched_at,
created_at=alert.created_at,
)
for alert in alerts
]
@router.post("", response_model=SniperAlertResponse)
async def create_sniper_alert(
data: SniperAlertCreate,
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""Create a new sniper alert."""
# Check alert limit based on subscription
user_alerts = await db.execute(
select(func.count(SniperAlert.id)).where(
SniperAlert.user_id == current_user.id
)
)
alert_count = user_alerts.scalar() or 0
from app.models.subscription import TIER_CONFIG, SubscriptionTier
tier = current_user.subscription.tier if current_user.subscription else SubscriptionTier.SCOUT
tier_config = TIER_CONFIG.get(tier, TIER_CONFIG[SubscriptionTier.SCOUT])
max_alerts = tier_config.get("sniper_limit", 2)
if alert_count >= max_alerts:
raise HTTPException(
status_code=403,
detail=f"Alert limit reached ({max_alerts}). Upgrade for more."
)
# SMS notifications are Tycoon only
if data.notify_sms and tier != "tycoon":
raise HTTPException(
status_code=403,
detail="SMS notifications are a Tycoon feature"
)
# Build filter criteria JSON
filter_criteria = {
"tlds": data.tlds.split(',') if data.tlds else None,
"keywords": data.keywords.split(',') if data.keywords else None,
"exclude_keywords": data.exclude_keywords.split(',') if data.exclude_keywords else None,
"max_length": data.max_length,
"min_length": data.min_length,
"max_price": data.max_price,
"min_price": data.min_price,
"max_bids": data.max_bids,
"ending_within_hours": data.ending_within_hours,
"platforms": data.platforms.split(',') if data.platforms else None,
"no_numbers": data.no_numbers,
"no_hyphens": data.no_hyphens,
"exclude_chars": data.exclude_chars.split(',') if data.exclude_chars else None,
}
alert = SniperAlert(
user_id=current_user.id,
name=data.name,
description=data.description,
filter_criteria=filter_criteria,
tlds=data.tlds,
keywords=data.keywords,
exclude_keywords=data.exclude_keywords,
max_length=data.max_length,
min_length=data.min_length,
max_price=data.max_price,
min_price=data.min_price,
max_bids=data.max_bids,
ending_within_hours=data.ending_within_hours,
platforms=data.platforms,
no_numbers=data.no_numbers,
no_hyphens=data.no_hyphens,
exclude_chars=data.exclude_chars,
notify_email=data.notify_email,
notify_sms=data.notify_sms,
)
db.add(alert)
await db.commit()
await db.refresh(alert)
return SniperAlertResponse(
id=alert.id,
name=alert.name,
description=alert.description,
tlds=alert.tlds,
keywords=alert.keywords,
exclude_keywords=alert.exclude_keywords,
max_length=alert.max_length,
min_length=alert.min_length,
max_price=alert.max_price,
min_price=alert.min_price,
max_bids=alert.max_bids,
ending_within_hours=alert.ending_within_hours,
platforms=alert.platforms,
no_numbers=alert.no_numbers,
no_hyphens=alert.no_hyphens,
exclude_chars=alert.exclude_chars,
notify_email=alert.notify_email,
notify_sms=alert.notify_sms,
is_active=alert.is_active,
matches_count=alert.matches_count,
notifications_sent=alert.notifications_sent,
last_matched_at=alert.last_matched_at,
created_at=alert.created_at,
)
@router.put("/{id}", response_model=SniperAlertResponse)
async def update_sniper_alert(
id: int,
data: SniperAlertUpdate,
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""Update a sniper alert."""
result = await db.execute(
select(SniperAlert).where(
and_(
SniperAlert.id == id,
SniperAlert.user_id == current_user.id,
)
)
)
alert = result.scalar_one_or_none()
if not alert:
raise HTTPException(status_code=404, detail="Alert not found")
# Update fields
update_fields = data.model_dump(exclude_unset=True)
for field, value in update_fields.items():
if hasattr(alert, field):
setattr(alert, field, value)
await db.commit()
await db.refresh(alert)
return SniperAlertResponse(
id=alert.id,
name=alert.name,
description=alert.description,
tlds=alert.tlds,
keywords=alert.keywords,
exclude_keywords=alert.exclude_keywords,
max_length=alert.max_length,
min_length=alert.min_length,
max_price=alert.max_price,
min_price=alert.min_price,
max_bids=alert.max_bids,
ending_within_hours=alert.ending_within_hours,
platforms=alert.platforms,
no_numbers=alert.no_numbers,
no_hyphens=alert.no_hyphens,
exclude_chars=alert.exclude_chars,
notify_email=alert.notify_email,
notify_sms=alert.notify_sms,
is_active=alert.is_active,
matches_count=alert.matches_count,
notifications_sent=alert.notifications_sent,
last_matched_at=alert.last_matched_at,
created_at=alert.created_at,
)
@router.delete("/{id}")
async def delete_sniper_alert(
id: int,
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""Delete a sniper alert."""
result = await db.execute(
select(SniperAlert).where(
and_(
SniperAlert.id == id,
SniperAlert.user_id == current_user.id,
)
)
)
alert = result.scalar_one_or_none()
if not alert:
raise HTTPException(status_code=404, detail="Alert not found")
await db.delete(alert)
await db.commit()
return {"success": True, "message": "Alert deleted"}
@router.get("/{id}/matches", response_model=List[MatchResponse])
async def get_alert_matches(
id: int,
limit: int = 50,
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""Get matched auctions for an alert."""
# Verify ownership
result = await db.execute(
select(SniperAlert).where(
and_(
SniperAlert.id == id,
SniperAlert.user_id == current_user.id,
)
)
)
alert = result.scalar_one_or_none()
if not alert:
raise HTTPException(status_code=404, detail="Alert not found")
matches_result = await db.execute(
select(SniperAlertMatch)
.where(SniperAlertMatch.alert_id == id)
.order_by(SniperAlertMatch.matched_at.desc())
.limit(limit)
)
matches = list(matches_result.scalars().all())
return [
MatchResponse(
id=m.id,
domain=m.domain,
platform=m.platform,
current_bid=m.current_bid,
end_time=m.end_time,
auction_url=m.auction_url,
matched_at=m.matched_at,
notified=m.notified,
)
for m in matches
]
@router.post("/{id}/test")
async def test_sniper_alert(
id: int,
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""Test alert against current auctions."""
# Verify ownership
result = await db.execute(
select(SniperAlert).where(
and_(
SniperAlert.id == id,
SniperAlert.user_id == current_user.id,
)
)
)
alert = result.scalar_one_or_none()
if not alert:
raise HTTPException(status_code=404, detail="Alert not found")
# Get active auctions
auctions_result = await db.execute(
select(DomainAuction)
.where(DomainAuction.is_active == True)
.limit(500)
)
auctions = list(auctions_result.scalars().all())
matches = []
for auction in auctions:
if alert.matches_domain(
auction.domain,
auction.tld,
auction.current_bid,
auction.num_bids
):
matches.append({
"domain": auction.domain,
"platform": auction.platform,
"current_bid": auction.current_bid,
"num_bids": auction.num_bids,
"end_time": auction.end_time.isoformat(),
})
return {
"alert_name": alert.name,
"auctions_checked": len(auctions),
"matches_found": len(matches),
"matches": matches[:20], # Limit to 20 for preview
"message": f"Found {len(matches)} matching auctions" if matches else "No matches found. Try adjusting your criteria.",
}