pounce/backend/app/api/domains.py
yves.gugger dc77b2110a feat: Complete Watchlist monitoring, Portfolio tracking & Listings marketplace
## Watchlist & Monitoring
-  Automatic domain monitoring based on subscription tier
-  Email alerts when domains become available
-  Health checks (DNS/HTTP/SSL) with caching
-  Expiry warnings for domains expiring in fewer than 30 days
-  Weekly digest emails
-  Instant alert toggle (optimistic UI updates)
-  Redesigned health check overlays with full details
- 🔒 'Not public' display for .ch/.de domains without public expiry

## Portfolio Management (NEW)
-  Track owned domains with purchase price & date
-  ROI calculation (unrealized & realized)
-  Domain valuation with auto-refresh
-  Renewal date tracking
-  Sale recording with profit calculation
-  List domains for sale directly from portfolio
-  Full portfolio summary dashboard

## Listings / For Sale
-  Renamed from 'Portfolio' to 'For Sale'
-  Fixed listing limits: Scout=0, Trader=5, Tycoon=50
-  Featured badge for Tycoon listings
-  Inquiries modal for sellers
-  Email notifications when buyer inquires
-  Inquiries column in listings table

## Scrapers & Data
-  Added 4 new registrar scrapers (Namecheap, Cloudflare, GoDaddy, Dynadot)
-  Increased scraping frequency to 2x daily (03:00 & 15:00 UTC)
-  Real historical data from database
-  Fixed RDAP/WHOIS for .ch/.de domains
-  Enhanced SSL certificate parsing

## Scheduler Jobs
-  Tiered domain checks (Scout=daily, Trader=hourly, Tycoon=10min)
-  Daily health checks (06:00 UTC)
-  Weekly expiry warnings (Mon 08:00 UTC)
-  Weekly digest emails (Sun 10:00 UTC)
-  Auction cleanup every 15 minutes

## UI/UX Improvements
-  Removed 'Back' buttons from Intel pages
-  Redesigned Radar page to match Market/Intel design
-  Less prominent check frequency footer
-  Consistent StatCard components across all pages
-  Ambient background glows
-  Better error handling

## Documentation
-  Updated README with monitoring section
-  Added env.example with all required variables
-  Updated Memory Bank (activeContext.md)
-  SMTP configuration requirements documented
2025-12-11 16:57:28 +01:00

426 lines
12 KiB
Python

"""Domain management API (requires authentication)."""
from datetime import datetime
from math import ceil
from fastapi import APIRouter, HTTPException, status, Query
from pydantic import BaseModel
from sqlalchemy import select, func
from app.api.deps import Database, CurrentUser
from app.models.domain import Domain, DomainCheck, DomainStatus
from app.models.subscription import TIER_CONFIG, SubscriptionTier
from app.schemas.domain import DomainCreate, DomainResponse, DomainListResponse
from app.services.domain_checker import domain_checker
from app.services.domain_health import get_health_checker, HealthStatus
# Router that collects the authenticated domain-monitoring endpoints defined in this module.
router = APIRouter()
@router.get("", response_model=DomainListResponse)
async def list_domains(
    current_user: CurrentUser,
    db: Database,
    page: int = Query(1, ge=1),
    per_page: int = Query(20, ge=1, le=100),
):
    """Return one page of the current user's monitored domains, newest first.

    Args:
        current_user: Authenticated user; only their domains are listed.
        db: Async database session.
        page: 1-based page number.
        per_page: Page size (1-100).

    Returns:
        A ``DomainListResponse`` with the page of domains, the total row
        count, the echoed paging parameters and the number of pages
        (reported as 1 when the user has no domains).
    """
    owned_by_user = Domain.user_id == current_user.id

    # Total row count, independent of which page was requested.
    total_result = await db.execute(
        select(func.count(Domain.id)).where(owned_by_user)
    )
    total = total_result.scalar()

    # Fetch only the requested window, most recently created first.
    page_stmt = (
        select(Domain)
        .where(owned_by_user)
        .order_by(Domain.created_at.desc())
        .offset((page - 1) * per_page)
        .limit(per_page)
    )
    rows = (await db.execute(page_stmt)).scalars().all()

    if total > 0:
        page_count = ceil(total / per_page)
    else:
        # An empty list is still presented as a single (empty) page.
        page_count = 1

    return DomainListResponse(
        domains=[DomainResponse.model_validate(row) for row in rows],
        total=total,
        page=page,
        per_page=per_page,
        pages=page_count,
    )
@router.post("", response_model=DomainResponse, status_code=status.HTTP_201_CREATED)
async def add_domain(
    domain_data: DomainCreate,
    current_user: CurrentUser,
    db: Database,
):
    """Add a domain to the current user's monitoring list.

    The domain is checked once immediately; the result is stored on the new
    ``Domain`` row and as its first ``DomainCheck`` record.

    Args:
        domain_data: Name of the domain and its notification preference.
        current_user: Authenticated user who will own the domain.
        db: Async database session.

    Returns:
        The newly created ``Domain`` (serialized as ``DomainResponse``).

    Raises:
        HTTPException: 403 when the user's domain limit is reached;
            400 when the user already monitors this domain.
    """
    # Resolve the domain limit: the subscription's own limit, or the Scout
    # tier default when the user has no subscription row.
    await db.refresh(current_user, ["subscription"])
    if current_user.subscription:
        limit = current_user.subscription.max_domains
    else:
        limit = TIER_CONFIG[SubscriptionTier.SCOUT]["domain_limit"]

    # Count in the database instead of loading every Domain row of the user
    # just to take len() of the relationship.
    count_query = select(func.count(Domain.id)).where(
        Domain.user_id == current_user.id
    )
    current_count = (await db.execute(count_query)).scalar() or 0
    if current_count >= limit:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=f"Domain limit reached ({limit}). Upgrade your subscription to add more domains.",
        )

    # Reject duplicates within this user's list.
    existing = await db.execute(
        select(Domain).where(
            Domain.user_id == current_user.id,
            Domain.name == domain_data.name,
        )
    )
    if existing.scalar_one_or_none():
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Domain already in your monitoring list",
        )

    # Run the initial availability check.
    check_result = await domain_checker.check_domain(domain_data.name)

    # One timestamp for both rows so the domain's last_checked matches the
    # checked_at of its first history entry exactly.
    checked_at = datetime.utcnow()

    domain = Domain(
        name=domain_data.name,
        user_id=current_user.id,
        status=check_result.status,
        is_available=check_result.is_available,
        registrar=check_result.registrar,
        expiration_date=check_result.expiration_date,
        notify_on_available=domain_data.notify_on_available,
        last_checked=checked_at,
    )
    db.add(domain)
    # Flush so domain.id is assigned before the check row references it.
    await db.flush()

    check = DomainCheck(
        domain_id=domain.id,
        status=check_result.status,
        is_available=check_result.is_available,
        response_data=str(check_result.to_dict()),
        checked_at=checked_at,
    )
    db.add(check)

    await db.commit()
    await db.refresh(domain)
    return domain
@router.get("/{domain_id}", response_model=DomainResponse)
async def get_domain(
    domain_id: int,
    current_user: CurrentUser,
    db: Database,
):
    """Fetch a single monitored domain owned by the current user.

    Raises:
        HTTPException: 404 when no domain with this id belongs to the user.
    """
    # Filtering on user_id as well keeps other users' domains invisible.
    lookup = select(Domain).where(
        Domain.id == domain_id,
        Domain.user_id == current_user.id,
    )
    found = (await db.execute(lookup)).scalar_one_or_none()
    if found:
        return found

    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail="Domain not found",
    )
@router.delete("/{domain_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_domain(
    domain_id: int,
    current_user: CurrentUser,
    db: Database,
):
    """Delete one of the current user's monitored domains.

    Responds with 204 and no body on success.

    Raises:
        HTTPException: 404 when no domain with this id belongs to the user.
    """
    lookup = select(Domain).where(
        Domain.id == domain_id,
        Domain.user_id == current_user.id,
    )
    target = (await db.execute(lookup)).scalar_one_or_none()
    if not target:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Domain not found",
        )

    await db.delete(target)
    await db.commit()
@router.post("/{domain_id}/refresh", response_model=DomainResponse)
async def refresh_domain(
    domain_id: int,
    current_user: CurrentUser,
    db: Database,
):
    """Re-run the availability check for one of the user's domains.

    The fresh result overwrites the status fields on the ``Domain`` row and
    is also appended as a new ``DomainCheck`` record.

    Returns:
        The updated ``Domain`` (serialized as ``DomainResponse``).

    Raises:
        HTTPException: 404 when no domain with this id belongs to the user.
    """
    lookup = select(Domain).where(
        Domain.id == domain_id,
        Domain.user_id == current_user.id,
    )
    domain = (await db.execute(lookup)).scalar_one_or_none()
    if not domain:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Domain not found",
        )

    outcome = await domain_checker.check_domain(domain.name)

    # Copy the fields of the check result that are mirrored on the domain row.
    for field in ("status", "is_available", "registrar", "expiration_date"):
        setattr(domain, field, getattr(outcome, field))
    domain.last_checked = datetime.utcnow()

    # Append this check to the domain's history.
    db.add(
        DomainCheck(
            domain_id=domain.id,
            status=outcome.status,
            is_available=outcome.is_available,
            response_data=str(outcome.to_dict()),
            checked_at=datetime.utcnow(),
        )
    )

    await db.commit()
    await db.refresh(domain)
    return domain
class NotifyUpdate(BaseModel):
    """Request body for the notification-settings PATCH endpoint."""

    # New value to store in the domain's notify_on_available flag.
    notify: bool
@router.patch("/{domain_id}/notify", response_model=DomainResponse)
async def update_notification_settings(
    domain_id: int,
    data: NotifyUpdate,
    current_user: CurrentUser,
    db: Database,
):
    """Set the ``notify_on_available`` flag on one of the user's domains.

    Returns:
        The updated ``Domain`` (serialized as ``DomainResponse``).

    Raises:
        HTTPException: 404 when no domain with this id belongs to the user.
    """
    lookup = select(Domain).where(
        Domain.id == domain_id,
        Domain.user_id == current_user.id,
    )
    target = (await db.execute(lookup)).scalar_one_or_none()
    if not target:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Domain not found",
        )

    target.notify_on_available = data.notify
    await db.commit()
    # Reload so the response reflects the persisted row.
    await db.refresh(target)
    return target
def _parse_expiration_date(raw_value):
    """Turn the request's ``expiration_date`` value into a datetime or None.

    Falsy values (missing key, ``null``, empty string) mean "clear the date"
    and yield ``None``. Anything else must be an ISO 8601 string; a trailing
    ``Z`` is accepted as UTC.

    Raises:
        HTTPException: 400 when the value is not a string or not valid ISO 8601.
    """
    if not raw_value:
        return None

    # A JSON body cannot carry a datetime, so any non-string value here is a
    # client error; reject it now instead of failing later at commit time.
    if not isinstance(raw_value, str):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Invalid date format: expiration_date must be an ISO 8601 string",
        )

    # Older datetime.fromisoformat versions do not accept the "Z" suffix.
    normalized = raw_value.strip().replace("Z", "+00:00")
    try:
        # NOTE(review): an input with an offset yields a timezone-aware value,
        # while other code in this module stores naive utcnow() values —
        # confirm the column accepts both.
        return datetime.fromisoformat(normalized)
    except ValueError as exc:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Invalid date format: {exc}",
        ) from exc


@router.patch("/{domain_id}/expiry", response_model=DomainResponse)
async def update_expiration_date(
    domain_id: int,
    data: dict,
    current_user: CurrentUser,
    db: Database,
):
    """Manually set or clear the expiration date of one of the user's domains.

    Useful for TLDs like .ch, .de that don't expose expiration via public
    WHOIS/RDAP. The date can be found in your registrar's control panel.

    Args:
        domain_id: Id of the domain to update.
        data: JSON object; its ``expiration_date`` key holds an ISO 8601
            string, or a falsy value / no key at all to clear the date.
        current_user: Authenticated user; must own the domain.
        db: Async database session.

    Returns:
        The updated ``Domain`` (serialized as ``DomainResponse``).

    Raises:
        HTTPException: 404 when no domain with this id belongs to the user;
            400 when ``expiration_date`` is not a valid ISO 8601 string.
    """
    result = await db.execute(
        select(Domain).where(
            Domain.id == domain_id,
            Domain.user_id == current_user.id,
        )
    )
    domain = result.scalar_one_or_none()
    if not domain:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Domain not found",
        )

    domain.expiration_date = _parse_expiration_date(data.get("expiration_date"))

    await db.commit()
    await db.refresh(domain)
    return domain
@router.get("/{domain_id}/history")
async def get_domain_history(
    domain_id: int,
    current_user: CurrentUser,
    db: Database,
    limit: int = Query(30, ge=1, le=365),
):
    """Return the most recent availability checks recorded for a domain.

    Access requires a subscription whose ``history_days`` is non-zero.

    Args:
        domain_id: Id of the domain; must belong to the current user.
        current_user: Authenticated user.
        db: Async database session.
        limit: Maximum number of check records to return (1-365). It is
            further capped at ``history_days`` when that value is positive.

    Returns:
        A dict with the domain name, the number of checks returned and the
        checks themselves, newest first.

    Raises:
        HTTPException: 404 when no domain with this id belongs to the user;
            403 when the user has no subscription or ``history_days`` is 0.
    """
    # Ownership check comes first, so a foreign domain id yields 404.
    lookup = select(Domain).where(
        Domain.id == domain_id,
        Domain.user_id == current_user.id,
    )
    domain = (await db.execute(lookup)).scalar_one_or_none()
    if not domain:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Domain not found",
        )

    await db.refresh(current_user, ["subscription"])
    subscription = current_user.subscription
    # A missing subscription is treated like a plan with no history access.
    history_days = subscription.history_days if subscription else 0
    if history_days == 0:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Check history requires Professional or Enterprise plan",
        )

    # Negative history_days (-1) means unlimited, so only positive values cap.
    # NOTE(review): history_days is applied as a cap on the number of rows,
    # not as a time window — confirm this is intended.
    if history_days > 0:
        limit = min(limit, history_days)

    def _serialize(entry):
        # status may be an enum member or already a plain value.
        return {
            "id": entry.id,
            "status": getattr(entry.status, "value", entry.status),
            "is_available": entry.is_available,
            "checked_at": entry.checked_at.isoformat(),
        }

    recent_stmt = (
        select(DomainCheck)
        .where(DomainCheck.domain_id == domain_id)
        .order_by(DomainCheck.checked_at.desc())
        .limit(limit)
    )
    entries = (await db.execute(recent_stmt)).scalars().all()

    return {
        "domain": domain.name,
        "total_checks": len(entries),
        "history": [_serialize(entry) for entry in entries],
    }
@router.get("/{domain_id}/health")
async def get_domain_health(
    domain_id: int,
    current_user: CurrentUser,
    db: Database,
):
    """Run a health check on one of the user's domains and return the report.

    The report comes from the checker returned by ``get_health_checker()``
    and is passed through unchanged as ``report.to_dict()``. According to the
    original endpoint documentation it covers four layers:
    - DNS: Nameservers, MX records, A records
    - HTTP: Website availability, parking detection
    - SSL: Certificate validity and expiration
    - Status signals and recommendations
    and carries a score (0-100) and a status.

    Raises:
        HTTPException: 404 when no domain with this id belongs to the user.
    """
    lookup = select(Domain).where(
        Domain.id == domain_id,
        Domain.user_id == current_user.id,
    )
    domain = (await db.execute(lookup)).scalar_one_or_none()
    if not domain:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Domain not found",
        )

    checker = get_health_checker()
    health_report = await checker.check_domain(domain.name)
    return health_report.to_dict()
@router.post("/health-check")
async def quick_health_check(
    current_user: CurrentUser,
    domain: str = Query(..., description="Domain to check"),
):
    """Run a health check on an arbitrary domain (it need not be in the watchlist).

    ``current_user`` is not read in the body; declaring it makes the endpoint
    require an authenticated caller.

    NOTE(review): the original documentation calls this a premium feature
    that checks the DNS, HTTP and SSL layers, but no subscription tier check
    is visible in this function — confirm whether one is enforced elsewhere.

    Returns:
        The health report as produced by ``report.to_dict()``.
    """
    checker = get_health_checker()
    health_report = await checker.check_domain(domain)
    return health_report.to_dict()