pounce/backend/app/api/auth.py
yves.gugger 58228e3d33
Some checks failed
CI / Frontend Lint & Type Check (push) Has been cancelled
CI / Frontend Build (push) Has been cancelled
CI / Backend Lint (push) Has been cancelled
CI / Backend Tests (push) Has been cancelled
CI / Docker Build (push) Has been cancelled
CI / Security Scan (push) Has been cancelled
Deploy / Build & Push Images (push) Has been cancelled
Deploy / Deploy to Server (push) Has been cancelled
Deploy / Notify (push) Has been cancelled
feat: integrate Pounce self-promotion & viral growth system
Pounce Eigenwerbung (from pounce_endgame.md):
- Add 'pounce_promo' as fallback partner for generic/unclear intent domains
- Create dedicated Pounce promo landing page with CTA to register
- Update footer on all yield pages: 'Monetized by Pounce • Own a domain? Start yielding'

Tech/Investment Domain Detection:
- Add 'investment_domains' category (invest, crypto, trading, domain, startup)
- Add 'tech_dev' category (developer, web3, fintech, proptech)
- Both categories have 'pounce_affinity' flag for higher Pounce conversion

Referral Tracking for Domain Owners:
- Add user fields: referred_by_user_id, referred_by_domain, referral_code
- Parse yield referral codes (yield_{user_id}_{domain_id}) on registration
- Domain owners earn lifetime commission when visitors sign up via their domain

DB Migrations:
- Add referral tracking columns to users table
2025-12-12 15:27:53 +01:00

455 lines
15 KiB
Python

"""
Authentication API endpoints.
Endpoints:
- POST /auth/register - Register new user
- POST /auth/login - Login and get JWT token
- GET /auth/me - Get current user info
- PUT /auth/me - Update current user
- POST /auth/forgot-password - Request password reset
- POST /auth/reset-password - Reset password with token
- POST /auth/verify-email - Verify email address
- POST /auth/resend-verification - Resend verification email
"""
import os
import secrets
import logging
from datetime import datetime, timedelta
from typing import Optional
from fastapi import APIRouter, HTTPException, status, BackgroundTasks, Request, Response
from pydantic import BaseModel, EmailStr
from sqlalchemy import select
from slowapi import Limiter
from slowapi.util import get_remote_address
from app.api.deps import Database, CurrentUser
from app.config import get_settings
from app.schemas.auth import UserCreate, UserLogin, UserResponse, LoginResponse
from app.services.auth import AuthService
from app.services.email_service import email_service
from app.models.user import User
from app.security import set_auth_cookie, clear_auth_cookie
logger = logging.getLogger(__name__)
router = APIRouter()
# Rate limiter for auth endpoints
limiter = Limiter(key_func=get_remote_address)
settings = get_settings()
# ============== Schemas ==============
class ForgotPasswordRequest(BaseModel):
"""Request password reset."""
email: EmailStr
class ResetPasswordRequest(BaseModel):
"""Reset password with token."""
token: str
new_password: str
class VerifyEmailRequest(BaseModel):
"""Verify email with token."""
token: str
class MessageResponse(BaseModel):
"""Simple message response."""
message: str
success: bool = True
class UpdateUserRequest(BaseModel):
"""Update user profile."""
name: Optional[str] = None
# ============== Endpoints ==============
@router.post("/register", response_model=UserResponse, status_code=status.HTTP_201_CREATED)
async def register(
user_data: UserCreate,
db: Database,
background_tasks: BackgroundTasks,
):
"""
Register a new user.
- Creates user account
- Sends verification email (if SMTP configured)
- Returns user info (without password)
"""
# Check if user exists
existing_user = await AuthService.get_user_by_email(db, user_data.email)
if existing_user:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Email already registered",
)
# Create user
user = await AuthService.create_user(
db=db,
email=user_data.email,
password=user_data.password,
name=user_data.name,
)
# Process yield referral if present
# Format: yield_{user_id}_{domain_id}
if user_data.ref and user_data.ref.startswith("yield_"):
try:
parts = user_data.ref.split("_")
if len(parts) >= 3:
referrer_user_id = int(parts[1])
# Store referral info
user.referred_by_user_id = referrer_user_id
user.referral_code = user_data.ref
# Try to get domain name from yield_domain_id
try:
from app.models.yield_domain import YieldDomain
yield_domain_id = int(parts[2])
yield_domain = await db.execute(
select(YieldDomain).where(YieldDomain.id == yield_domain_id)
)
yd = yield_domain.scalar_one_or_none()
if yd:
user.referred_by_domain = yd.domain
except Exception:
pass
await db.commit()
logger.info(f"User {user.email} referred by user {referrer_user_id}")
except Exception as e:
logger.warning(f"Failed to process referral code: {user_data.ref}, error: {e}")
# Auto-admin for specific email
ADMIN_EMAILS = ["guggeryves@hotmail.com"]
if user.email.lower() in [e.lower() for e in ADMIN_EMAILS]:
user.is_admin = True
user.is_verified = True # Auto-verify admins
await db.commit()
# Give admin Tycoon subscription (only if no subscription exists)
from app.models.subscription import Subscription, SubscriptionTier, SubscriptionStatus, TIER_CONFIG
from sqlalchemy import select
# Check if subscription already exists
existing_sub = await db.execute(
select(Subscription).where(Subscription.user_id == user.id)
)
if not existing_sub.scalar_one_or_none():
tycoon_config = TIER_CONFIG.get(SubscriptionTier.TYCOON, {})
subscription = Subscription(
user_id=user.id,
tier=SubscriptionTier.TYCOON,
status=SubscriptionStatus.ACTIVE,
max_domains=tycoon_config.get("domain_limit", 500),
)
db.add(subscription)
await db.commit()
# Generate verification token
verification_token = secrets.token_urlsafe(32)
user.email_verification_token = verification_token
user.email_verification_expires = datetime.utcnow() + timedelta(hours=24)
await db.commit()
# Send verification email in background
if email_service.is_configured:
site_url = os.getenv("SITE_URL", "http://localhost:3000")
verify_url = f"{site_url}/verify-email?token={verification_token}"
background_tasks.add_task(
email_service.send_email_verification,
to_email=user.email,
user_name=user.name or "there",
verification_url=verify_url,
)
return user
@router.post("/login", response_model=LoginResponse)
async def login(user_data: UserLogin, db: Database, response: Response):
"""
Authenticate user and return JWT token.
Note: Email verification is currently not enforced.
Set REQUIRE_EMAIL_VERIFICATION=true to enforce.
"""
from app.models.subscription import Subscription, SubscriptionTier, SubscriptionStatus, TIER_CONFIG
from sqlalchemy import select
user = await AuthService.authenticate_user(db, user_data.email, user_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect email or password",
headers={"WWW-Authenticate": "Bearer"},
)
# Auto-admin for specific email
ADMIN_EMAILS = ["guggeryves@hotmail.com"]
if user.email.lower() in [e.lower() for e in ADMIN_EMAILS]:
if not user.is_admin:
user.is_admin = True
user.is_verified = True # Auto-verify admins
await db.commit()
# Ensure admin has Tycoon subscription
sub_result = await db.execute(
select(Subscription).where(Subscription.user_id == user.id)
)
subscription = sub_result.scalar_one_or_none()
tycoon_config = TIER_CONFIG.get(SubscriptionTier.TYCOON, {})
if not subscription:
subscription = Subscription(
user_id=user.id,
tier=SubscriptionTier.TYCOON,
status=SubscriptionStatus.ACTIVE,
max_domains=tycoon_config.get("domain_limit", 500),
)
db.add(subscription)
await db.commit()
elif subscription.tier != SubscriptionTier.TYCOON:
subscription.tier = SubscriptionTier.TYCOON
subscription.max_domains = tycoon_config.get("domain_limit", 500)
subscription.status = SubscriptionStatus.ACTIVE
await db.commit()
# Optional: Check email verification
require_verification = os.getenv("REQUIRE_EMAIL_VERIFICATION", "false").lower() == "true"
if require_verification and not user.is_verified:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Please verify your email address before logging in",
)
access_token_expires = timedelta(minutes=settings.access_token_expire_minutes)
access_token = AuthService.create_access_token(
data={"sub": str(user.id), "email": user.email},
expires_delta=access_token_expires,
)
# Set HttpOnly cookie (preferred for browser clients)
set_auth_cookie(
response=response,
token=access_token,
max_age_seconds=settings.access_token_expire_minutes * 60,
)
# Do NOT return the token in the response body (prevents leaks via logs/JS storage)
return LoginResponse(expires_in=settings.access_token_expire_minutes * 60)
@router.post("/logout", response_model=MessageResponse)
async def logout(response: Response):
"""Clear auth cookie."""
clear_auth_cookie(response)
return MessageResponse(message="Logged out")
@router.get("/me", response_model=UserResponse)
async def get_current_user_info(current_user: CurrentUser):
"""Get current user information."""
return current_user
@router.put("/me", response_model=UserResponse)
async def update_current_user(
update_data: UpdateUserRequest,
current_user: CurrentUser,
db: Database,
):
"""Update current user information."""
if update_data.name is not None:
current_user.name = update_data.name
await db.commit()
await db.refresh(current_user)
return current_user
@router.post("/forgot-password", response_model=MessageResponse)
async def forgot_password(
request: ForgotPasswordRequest,
db: Database,
background_tasks: BackgroundTasks,
):
"""
Request password reset email.
- Always returns success (to prevent email enumeration)
- If email exists, sends reset link
- Reset token expires in 1 hour
"""
# Always return success (security: don't reveal if email exists)
success_message = "If an account with this email exists, a password reset link has been sent."
# Look up user
result = await db.execute(
select(User).where(User.email == request.email.lower())
)
user = result.scalar_one_or_none()
if not user:
# Return success anyway (security)
return MessageResponse(message=success_message)
# Generate reset token
reset_token = secrets.token_urlsafe(32)
user.password_reset_token = reset_token
user.password_reset_expires = datetime.utcnow() + timedelta(hours=1)
await db.commit()
# Send reset email in background
if email_service.is_configured:
site_url = os.getenv("SITE_URL", "http://localhost:3000")
reset_url = f"{site_url}/reset-password?token={reset_token}"
background_tasks.add_task(
email_service.send_password_reset,
to_email=user.email,
user_name=user.name or "there",
reset_url=reset_url,
)
logger.info(f"Password reset email queued for {user.email}")
else:
logger.warning(f"SMTP not configured, cannot send reset email for {user.email}")
return MessageResponse(message=success_message)
@router.post("/reset-password", response_model=MessageResponse)
async def reset_password(
request: ResetPasswordRequest,
db: Database,
):
"""
Reset password using token from email.
- Token must be valid and not expired
- Password must be at least 8 characters
- Invalidates token after use
"""
if len(request.new_password) < 8:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Password must be at least 8 characters",
)
# Find user with valid token
result = await db.execute(
select(User).where(
User.password_reset_token == request.token,
User.password_reset_expires > datetime.utcnow(),
)
)
user = result.scalar_one_or_none()
if not user:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Invalid or expired reset token",
)
# Update password
user.password_hash = AuthService.get_password_hash(request.new_password)
user.password_reset_token = None
user.password_reset_expires = None
await db.commit()
logger.info(f"Password reset successful for user {user.id}")
return MessageResponse(message="Password has been reset successfully. You can now log in.")
@router.post("/verify-email", response_model=MessageResponse)
async def verify_email(
request: VerifyEmailRequest,
db: Database,
):
"""
Verify email address using token from email.
- Token must be valid and not expired
- Marks user as verified
"""
# Find user with valid token
result = await db.execute(
select(User).where(
User.email_verification_token == request.token,
User.email_verification_expires > datetime.utcnow(),
)
)
user = result.scalar_one_or_none()
if not user:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Invalid or expired verification token",
)
# Mark as verified
user.is_verified = True
user.email_verification_token = None
user.email_verification_expires = None
await db.commit()
logger.info(f"Email verified for user {user.id}")
return MessageResponse(message="Email verified successfully. You can now log in.")
@router.post("/resend-verification", response_model=MessageResponse)
async def resend_verification(
request: ForgotPasswordRequest, # Reuse schema - just needs email
db: Database,
background_tasks: BackgroundTasks,
):
"""
Resend verification email.
- Rate limited to prevent abuse
- Always returns success (security)
"""
success_message = "If an unverified account with this email exists, a verification link has been sent."
# Look up user
result = await db.execute(
select(User).where(User.email == request.email.lower())
)
user = result.scalar_one_or_none()
if not user or user.is_verified:
return MessageResponse(message=success_message)
# Generate new verification token
verification_token = secrets.token_urlsafe(32)
user.email_verification_token = verification_token
user.email_verification_expires = datetime.utcnow() + timedelta(hours=24)
await db.commit()
# Send verification email
if email_service.is_configured:
site_url = os.getenv("SITE_URL", "http://localhost:3000")
verify_url = f"{site_url}/verify-email?token={verification_token}"
background_tasks.add_task(
email_service.send_email_verification,
to_email=user.email,
user_name=user.name or "there",
verification_url=verify_url,
)
return MessageResponse(message=success_message)