pounce/backend/app/api/sniper_alerts.py
yves.gugger 18d50e96f4 feat: Add For Sale Marketplace + Sniper Alerts
BACKEND - New Models:
- DomainListing: For sale landing pages with DNS verification
- ListingInquiry: Contact form submissions from buyers
- ListingView: Analytics tracking
- SniperAlert: Hyper-personalized auction filters
- SniperAlertMatch: Matched auctions for alerts

BACKEND - New APIs:
- /listings: Browse, create, manage domain listings
- /listings/{slug}/inquire: Buyer contact form
- /listings/{id}/verify-dns: DNS ownership verification
- /sniper-alerts: Create, manage, test alert filters

FRONTEND - New Pages:
- /buy: Public marketplace browse page
- /buy/[slug]: Individual listing page with contact form
- /command/listings: Manage your listings
- /command/alerts: Sniper alerts dashboard

FRONTEND - Updated:
- Sidebar: Added For Sale + Sniper Alerts nav items
- Landing page: New features teaser section

DOCS:
- DATABASE_MIGRATIONS.md: Complete SQL for new tables

From analysis_3.md:
- Strategie 2: Micro-Marktplatz (For Sale Pages)
- Strategie 4: Alerts nach Maß (Sniper Alerts)
- Säule 2: DNS Ownership Verification
2025-12-10 11:44:56 +01:00

458 lines
14 KiB
Python

"""
Sniper Alerts API - Hyper-personalized auction notifications
This implements "Strategie 4: Alerts nach Maß" (Strategy 4: tailor-made alerts) from analysis_3.md:
"The user can save extremely specific filters:
- Notify me ONLY when a 4-letter .com domain drops that contains no 'q' or 'x'."
Endpoints:
- GET /sniper-alerts - Get user's alerts
- POST /sniper-alerts - Create new alert
- PUT /sniper-alerts/{id} - Update alert
- DELETE /sniper-alerts/{id} - Delete alert
- GET /sniper-alerts/{id}/matches - Get matched auctions
- POST /sniper-alerts/{id}/test - Test alert against current auctions
"""
import logging
from datetime import datetime
from typing import Optional, List
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel, Field
from sqlalchemy import select, func, and_
from sqlalchemy.ext.asyncio import AsyncSession
from app.database import get_db
from app.api.deps import get_current_user
from app.models.user import User
from app.models.sniper_alert import SniperAlert, SniperAlertMatch
from app.models.auction import DomainAuction
# Module-level logger named after this module's import path.
logger = logging.getLogger(__name__)
# Router that the endpoints below register on. Routes are declared relative to
# it (e.g. ""), so it is presumably mounted under /sniper-alerts by the
# including application -- confirm where the router is included.
router = APIRouter()
# ============== Schemas ==============
class SniperAlertCreate(BaseModel):
    """Request body accepted when a user creates a sniper alert.

    Only ``name`` is mandatory. Filters that are left out default to ``None``
    (or ``False`` for the boolean switches). List-like filters are transported
    as comma-separated strings.
    """

    name: str = Field(..., min_length=1, max_length=100)
    description: Optional[str] = Field(default=None, max_length=500)

    # --- Filter criteria -------------------------------------------------
    tlds: Optional[str] = Field(default=None, description="Comma-separated TLDs: com,io,ai")
    keywords: Optional[str] = Field(default=None, description="Must contain (comma-separated)")
    exclude_keywords: Optional[str] = Field(default=None, description="Must not contain")
    # Length bounds are limited to 1..63 by the field constraints.
    max_length: Optional[int] = Field(default=None, ge=1, le=63)
    min_length: Optional[int] = Field(default=None, ge=1, le=63)
    max_price: Optional[float] = Field(default=None, ge=0)
    min_price: Optional[float] = Field(default=None, ge=0)
    max_bids: Optional[int] = Field(default=None, ge=0, description="Max bids (low competition)")
    # Constrained to 1..168 hours (168 h = 7 days).
    ending_within_hours: Optional[int] = Field(default=None, ge=1, le=168)
    platforms: Optional[str] = Field(default=None, description="Comma-separated platforms")

    # --- Advanced character rules ----------------------------------------
    no_numbers: bool = False
    no_hyphens: bool = False
    exclude_chars: Optional[str] = Field(default=None, description="Chars to exclude: q,x,z")

    # --- Notification channels -------------------------------------------
    notify_email: bool = True
    notify_sms: bool = False
class SniperAlertUpdate(BaseModel):
    """Request body for a partial update of a sniper alert.

    Every field is optional. The update endpoint dumps this model with
    ``exclude_unset=True``, so a field that is absent from the request leaves
    the stored value untouched.
    """

    # Same bounds as SniperAlertCreate.name: an update must not be able to
    # blank out a name that creation would have rejected.
    name: Optional[str] = Field(None, min_length=1, max_length=100)
    description: Optional[str] = Field(None, max_length=500)

    # Filter criteria (comma-separated strings for list-like filters).
    tlds: Optional[str] = None
    keywords: Optional[str] = None
    exclude_keywords: Optional[str] = None
    max_length: Optional[int] = Field(None, ge=1, le=63)
    min_length: Optional[int] = Field(None, ge=1, le=63)
    max_price: Optional[float] = Field(None, ge=0)
    min_price: Optional[float] = Field(None, ge=0)
    max_bids: Optional[int] = Field(None, ge=0)
    ending_within_hours: Optional[int] = Field(None, ge=1, le=168)
    platforms: Optional[str] = None

    # Advanced character rules.
    no_numbers: Optional[bool] = None
    no_hyphens: Optional[bool] = None
    exclude_chars: Optional[str] = None

    # Notification channels.
    notify_email: Optional[bool] = None
    notify_sms: Optional[bool] = None

    # Lets the owner switch the alert on or off; not part of the create schema.
    is_active: Optional[bool] = None
class SniperAlertResponse(BaseModel):
    """Serialized form of a sniper alert as returned by the API.

    Carries the stored filter settings plus the read-only bookkeeping
    fields (``matches_count``, ``notifications_sent``, ``last_matched_at``,
    ``created_at``).
    """

    # pydantic v2 configuration; replaces the deprecated nested ``Config``
    # class. ``from_attributes`` lets the model be populated from ORM objects.
    model_config = {"from_attributes": True}

    id: int
    name: str
    description: Optional[str]

    # Filter criteria.
    tlds: Optional[str]
    keywords: Optional[str]
    exclude_keywords: Optional[str]
    max_length: Optional[int]
    min_length: Optional[int]
    max_price: Optional[float]
    min_price: Optional[float]
    max_bids: Optional[int]
    ending_within_hours: Optional[int]
    platforms: Optional[str]
    no_numbers: bool
    no_hyphens: bool
    exclude_chars: Optional[str]

    # Notification channels and state.
    notify_email: bool
    notify_sms: bool
    is_active: bool

    # Bookkeeping.
    matches_count: int
    notifications_sent: int
    last_matched_at: Optional[datetime]
    created_at: datetime
class MatchResponse(BaseModel):
    """Serialized form of one auction that matched a sniper alert."""

    # pydantic v2 configuration; replaces the deprecated nested ``Config``
    # class. ``from_attributes`` lets the model be populated from ORM objects.
    model_config = {"from_attributes": True}

    id: int
    domain: str
    platform: str
    current_bid: float
    end_time: datetime
    auction_url: Optional[str]
    matched_at: datetime
    notified: bool
# ============== Endpoints ==============
@router.get("", response_model=List[SniperAlertResponse])
async def get_sniper_alerts(
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """List the authenticated user's sniper alerts, newest first."""
    query = (
        select(SniperAlert)
        .where(SniperAlert.user_id == current_user.id)
        .order_by(SniperAlert.created_at.desc())
    )
    rows = (await db.execute(query)).scalars().all()

    # SniperAlertResponse is configured with from_attributes, so it can read
    # each field straight off the ORM row instead of being fed one by one.
    return [SniperAlertResponse.model_validate(row) for row in rows]
def _split_csv_filter(raw: Optional[str]) -> Optional[List[str]]:
    """Turn a comma-separated filter string into a list of clean tokens.

    Whitespace around each token is stripped and empty tokens are dropped, so
    ``"com, io,"`` becomes ``["com", "io"]``. Returns ``None`` when the input
    is missing or contains no usable token.
    """
    if not raw:
        return None
    tokens = [token.strip() for token in raw.split(",")]
    return [token for token in tokens if token] or None


@router.post("", response_model=SniperAlertResponse)
async def create_sniper_alert(
    data: SniperAlertCreate,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """Create a new sniper alert for the authenticated user.

    Raises:
        HTTPException: 403 when the user already owns as many alerts as the
            subscription tier allows, or when SMS notifications are requested
            on a tier other than "tycoon".
    """
    # Check alert limit based on subscription.
    count_result = await db.execute(
        select(func.count(SniperAlert.id)).where(
            SniperAlert.user_id == current_user.id
        )
    )
    alert_count = count_result.scalar() or 0

    # Users without a subscription are treated as the lowest tier.
    tier = current_user.subscription.tier if current_user.subscription else "scout"
    limits = {"scout": 2, "trader": 10, "tycoon": 50}
    max_alerts = limits.get(tier, 2)  # unknown tiers fall back to the scout limit

    if alert_count >= max_alerts:
        raise HTTPException(
            status_code=403,
            detail=f"Alert limit reached ({max_alerts}). Upgrade for more."
        )

    # SMS notifications are Tycoon only.
    if data.notify_sms and tier != "tycoon":
        raise HTTPException(
            status_code=403,
            detail="SMS notifications are a Tycoon feature"
        )

    # Structured copy of the filters. List-like values are normalized so that
    # input such as "com, io" does not store tokens with stray whitespace or
    # empty entries. The raw strings are still stored unchanged in the scalar
    # columns below and echoed back in the response.
    filter_criteria = {
        "tlds": _split_csv_filter(data.tlds),
        "keywords": _split_csv_filter(data.keywords),
        "exclude_keywords": _split_csv_filter(data.exclude_keywords),
        "max_length": data.max_length,
        "min_length": data.min_length,
        "max_price": data.max_price,
        "min_price": data.min_price,
        "max_bids": data.max_bids,
        "ending_within_hours": data.ending_within_hours,
        "platforms": _split_csv_filter(data.platforms),
        "no_numbers": data.no_numbers,
        "no_hyphens": data.no_hyphens,
        "exclude_chars": _split_csv_filter(data.exclude_chars),
    }

    alert = SniperAlert(
        user_id=current_user.id,
        name=data.name,
        description=data.description,
        filter_criteria=filter_criteria,
        tlds=data.tlds,
        keywords=data.keywords,
        exclude_keywords=data.exclude_keywords,
        max_length=data.max_length,
        min_length=data.min_length,
        max_price=data.max_price,
        min_price=data.min_price,
        max_bids=data.max_bids,
        ending_within_hours=data.ending_within_hours,
        platforms=data.platforms,
        no_numbers=data.no_numbers,
        no_hyphens=data.no_hyphens,
        exclude_chars=data.exclude_chars,
        notify_email=data.notify_email,
        notify_sms=data.notify_sms,
    )
    db.add(alert)
    await db.commit()
    # Reload so that values assigned on insert (id, created_at, ...) are
    # available for the response.
    await db.refresh(alert)

    # The response schema is configured with from_attributes, so it can be
    # built directly from the ORM object.
    return SniperAlertResponse.model_validate(alert)
# Fields mirrored into SniperAlert.filter_criteria when an alert is created.
_ALERT_FILTER_FIELDS = frozenset({
    "tlds", "keywords", "exclude_keywords", "max_length", "min_length",
    "max_price", "min_price", "max_bids", "ending_within_hours",
    "platforms", "no_numbers", "no_hyphens", "exclude_chars",
})

# Fields that SniperAlertResponse declares as non-optional; writing None into
# them would produce a row the API can no longer serialize.
_ALERT_NON_NULLABLE_FIELDS = frozenset({
    "name", "no_numbers", "no_hyphens", "notify_email", "notify_sms", "is_active",
})


def _filter_criteria_from_alert(alert: SniperAlert) -> dict:
    """Build the structured ``filter_criteria`` dict from an alert's columns.

    Uses the same keys as the dict written at creation time. Comma-separated
    strings are split into lists with surrounding whitespace stripped and
    empty tokens dropped; a missing or empty string becomes ``None``.
    """
    def as_list(raw):
        if not raw:
            return None
        tokens = [token.strip() for token in raw.split(",")]
        return [token for token in tokens if token] or None

    return {
        "tlds": as_list(alert.tlds),
        "keywords": as_list(alert.keywords),
        "exclude_keywords": as_list(alert.exclude_keywords),
        "max_length": alert.max_length,
        "min_length": alert.min_length,
        "max_price": alert.max_price,
        "min_price": alert.min_price,
        "max_bids": alert.max_bids,
        "ending_within_hours": alert.ending_within_hours,
        "platforms": as_list(alert.platforms),
        "no_numbers": alert.no_numbers,
        "no_hyphens": alert.no_hyphens,
        "exclude_chars": as_list(alert.exclude_chars),
    }


@router.put("/{id}", response_model=SniperAlertResponse)
async def update_sniper_alert(
    id: int,
    data: SniperAlertUpdate,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """Partially update one of the authenticated user's sniper alerts.

    Only fields present in the request body are applied. An explicit ``null``
    clears an optional filter; for non-optional fields (name and the boolean
    flags) it is ignored.

    Raises:
        HTTPException: 404 when no alert with this id belongs to the user;
            403 when SMS notifications are being switched on for a tier other
            than "tycoon".
    """
    result = await db.execute(
        select(SniperAlert).where(
            SniperAlert.id == id,
            SniperAlert.user_id == current_user.id,
        )
    )
    alert = result.scalar_one_or_none()
    if not alert:
        raise HTTPException(status_code=404, detail="Alert not found")

    # exclude_unset keeps explicit nulls, so filter out the ones that target
    # non-optional fields and treat them as "not provided".
    changes = {
        field: value
        for field, value in data.model_dump(exclude_unset=True).items()
        if not (value is None and field in _ALERT_NON_NULLABLE_FIELDS)
    }

    # Apply the same SMS gate as alert creation. Only the act of switching SMS
    # on is checked, so a client that re-sends an already-enabled flag is not
    # rejected.
    if changes.get("notify_sms") and not alert.notify_sms:
        tier = current_user.subscription.tier if current_user.subscription else "scout"
        if tier != "tycoon":
            raise HTTPException(
                status_code=403,
                detail="SMS notifications are a Tycoon feature"
            )

    for field, value in changes.items():
        if hasattr(alert, field):
            setattr(alert, field, value)

    # Keep the structured copy of the filters in sync with the columns that
    # were just changed; otherwise it would keep the pre-update values.
    if _ALERT_FILTER_FIELDS.intersection(changes):
        alert.filter_criteria = _filter_criteria_from_alert(alert)

    await db.commit()
    await db.refresh(alert)

    # The response schema is configured with from_attributes, so it can be
    # built directly from the ORM object.
    return SniperAlertResponse.model_validate(alert)
@router.delete("/{id}")
async def delete_sniper_alert(
    id: int,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """Remove one of the authenticated user's sniper alerts.

    Raises:
        HTTPException: 404 when no alert with this id belongs to the user.
    """
    # Both criteria are ANDed: the alert must exist and be owned by the caller.
    owned_alert = select(SniperAlert).where(
        SniperAlert.id == id,
        SniperAlert.user_id == current_user.id,
    )
    target = (await db.execute(owned_alert)).scalar_one_or_none()
    if not target:
        raise HTTPException(status_code=404, detail="Alert not found")

    await db.delete(target)
    await db.commit()

    return {"success": True, "message": "Alert deleted"}
# Upper bound for the ``limit`` query parameter of the matches endpoint.
_MAX_ALERT_MATCHES_LIMIT = 500


@router.get("/{id}/matches", response_model=List[MatchResponse])
async def get_alert_matches(
    id: int,
    limit: int = 50,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """Return the most recent matched auctions for one of the user's alerts.

    Args:
        id: Alert id from the path.
        limit: Maximum number of matches to return. Values outside
            0..500 are clamped into that range.

    Raises:
        HTTPException: 404 when no alert with this id belongs to the user.
    """
    # Verify ownership before exposing any matches.
    result = await db.execute(
        select(SniperAlert).where(
            SniperAlert.id == id,
            SniperAlert.user_id == current_user.id,
        )
    )
    alert = result.scalar_one_or_none()
    if not alert:
        raise HTTPException(status_code=404, detail="Alert not found")

    # Do not pass the raw query parameter to SQL: a negative LIMIT is an error
    # on PostgreSQL (and means "no limit" on SQLite), and an unbounded value
    # lets one request pull the entire match history.
    limit = max(0, min(limit, _MAX_ALERT_MATCHES_LIMIT))

    matches_result = await db.execute(
        select(SniperAlertMatch)
        .where(SniperAlertMatch.alert_id == id)
        .order_by(SniperAlertMatch.matched_at.desc())
        .limit(limit)
    )

    # MatchResponse is configured with from_attributes, so each ORM row can be
    # validated directly.
    return [MatchResponse.model_validate(row) for row in matches_result.scalars().all()]
@router.post("/{id}/test")
async def test_sniper_alert(
    id: int,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """Dry-run an alert against currently active auctions.

    Loads up to 500 active auctions, evaluates the alert's ``matches_domain``
    for each of them and returns a summary with at most 20 sample matches.

    Raises:
        HTTPException: 404 when no alert with this id belongs to the user.
    """
    # The alert must exist and be owned by the caller.
    owned_alert = select(SniperAlert).where(
        SniperAlert.id == id,
        SniperAlert.user_id == current_user.id,
    )
    alert = (await db.execute(owned_alert)).scalar_one_or_none()
    if not alert:
        raise HTTPException(status_code=404, detail="Alert not found")

    # "== True" is intentional: it builds a SQL expression, not a Python bool.
    active_auctions = (
        select(DomainAuction)
        .where(DomainAuction.is_active == True)  # noqa: E712
        .limit(500)
    )
    auctions = (await db.execute(active_auctions)).scalars().all()

    hits = [
        {
            "domain": auction.domain,
            "platform": auction.platform,
            "current_bid": auction.current_bid,
            "num_bids": auction.num_bids,
            "end_time": auction.end_time.isoformat(),
        }
        for auction in auctions
        if alert.matches_domain(
            auction.domain,
            auction.tld,
            auction.current_bid,
            auction.num_bids,
        )
    ]

    if hits:
        message = f"Found {len(hits)} matching auctions"
    else:
        message = "No matches found. Try adjusting your criteria."

    return {
        "alert_name": alert.name,
        "auctions_checked": len(auctions),
        "matches_found": len(hits),
        "matches": hits[:20],  # preview only; matches_found holds the full count
        "message": message,
    }