pounce/backend/app/api/auth.py
yves.gugger df4d87a643
Some checks failed
CI / Frontend Lint & Type Check (push) Has been cancelled
CI / Frontend Build (push) Has been cancelled
CI / Backend Lint (push) Has been cancelled
CI / Backend Tests (push) Has been cancelled
CI / Docker Build (push) Has been cancelled
CI / Security Scan (push) Has been cancelled
Deploy / Build & Push Images (push) Has been cancelled
Deploy / Deploy to Server (push) Has been cancelled
Deploy / Notify (push) Has been cancelled
feat: Auto-admin for specific email + POUNCE branding
- guggeryves@hotmail.com automatically gets admin rights on registration
- Changed navigation logo from 'pounce' to 'POUNCE' (uppercase)
- Adjusted letter-spacing for better visual appearance
2025-12-08 16:10:58 +01:00

362 lines
11 KiB
Python

"""
Authentication API endpoints.
Endpoints:
- POST /auth/register - Register new user
- POST /auth/login - Login and get JWT token
- GET /auth/me - Get current user info
- PUT /auth/me - Update current user
- POST /auth/forgot-password - Request password reset
- POST /auth/reset-password - Reset password with token
- POST /auth/verify-email - Verify email address
- POST /auth/resend-verification - Resend verification email
"""
import os
import secrets
import logging
from datetime import datetime, timedelta
from typing import Optional
from fastapi import APIRouter, HTTPException, status, BackgroundTasks, Request
from pydantic import BaseModel, EmailStr
from sqlalchemy import select
from slowapi import Limiter
from slowapi.util import get_remote_address
from app.api.deps import Database, CurrentUser
from app.config import get_settings
from app.schemas.auth import UserCreate, UserLogin, UserResponse, Token
from app.services.auth import AuthService
from app.services.email_service import email_service
from app.models.user import User
# Module-level logger; handlers below use it for audit-style messages.
logger = logging.getLogger(__name__)
# Router on which every endpoint in this module is registered via @router.<verb>.
router = APIRouter()
# Rate limiter for auth endpoints
# Keys requests by the client's remote address.
# NOTE(review): no handler in the reviewed portion of this file applies
# `@limiter.limit(...)`, so this object appears unused here - confirm whether
# limits are attached elsewhere.
limiter = Limiter(key_func=get_remote_address)
# Application settings, read once at import time (login reads
# `access_token_expire_minutes` from it).
settings = get_settings()
# ============== Schemas ==============
class ForgotPasswordRequest(BaseModel):
    """
    Request body carrying a single email address.

    Used by the forgot-password endpoint and reused by the
    resend-verification endpoint, which needs the same shape.
    """

    # Address of the account the caller is asking about.
    email: EmailStr
class ResetPasswordRequest(BaseModel):
    """
    Request body for completing a password reset.
    """

    # Reset token previously issued by the forgot-password endpoint.
    token: str
    # Replacement password; its minimum length is checked by the
    # reset-password handler, not by this schema.
    new_password: str
class VerifyEmailRequest(BaseModel):
    """
    Request body for confirming an email address.
    """

    # Verification token that was embedded in the emailed link.
    token: str
class MessageResponse(BaseModel):
    """
    Generic response consisting of a human-readable message and a flag.
    """

    # Text shown to the caller.
    message: str
    # Outcome flag; defaults to True when the handler does not set it.
    success: bool = True
class UpdateUserRequest(BaseModel):
    """
    Request body for editing the current user's profile.
    """

    # New display name; None (the default) means "leave unchanged".
    name: Optional[str] = None
# ============== Endpoints ==============
@router.post("/register", response_model=UserResponse, status_code=status.HTTP_201_CREATED)
async def register(
    user_data: UserCreate,
    db: Database,
    background_tasks: BackgroundTasks,
):
    """
    Create a new account and start email verification.

    Steps:
    - Reject the request with HTTP 400 if the email is already registered
    - Create the user through AuthService
    - Promote allow-listed addresses to admin
    - Store a 24-hour verification token on the user
    - Queue the verification email when the email service is configured

    Returns the newly created user (serialised via UserResponse).
    """
    # Refuse duplicate registrations up front.
    if await AuthService.get_user_by_email(db, user_data.email):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Email already registered",
        )

    user = await AuthService.create_user(
        db=db,
        email=user_data.email,
        password=user_data.password,
        name=user_data.name,
    )

    # Allow-listed addresses become admins immediately (case-insensitive match).
    # NOTE(review): the flag is set before the address has been verified, so
    # whoever registers an allow-listed address first becomes admin - confirm
    # this is intended.
    ADMIN_EMAILS = ["guggeryves@hotmail.com"]
    registered_email = user.email.lower()
    if any(registered_email == allowed.lower() for allowed in ADMIN_EMAILS):
        user.is_admin = True
        await db.commit()

    # Persist a fresh verification token valid for 24 hours.
    token = secrets.token_urlsafe(32)
    user.email_verification_token = token
    user.email_verification_expires = datetime.utcnow() + timedelta(hours=24)
    await db.commit()

    # Without a configured email service there is nothing left to do.
    if not email_service.is_configured:
        return user

    base_url = os.getenv("SITE_URL", "http://localhost:3000")
    background_tasks.add_task(
        email_service.send_email_verification,
        to_email=user.email,
        user_name=user.name or "there",
        verification_url=f"{base_url}/verify-email?token={token}",
    )
    return user
@router.post("/login", response_model=Token)
async def login(user_data: UserLogin, db: Database):
    """
    Check credentials and issue a bearer JWT.

    - HTTP 401 when AuthService rejects the email/password pair
    - HTTP 403 when the REQUIRE_EMAIL_VERIFICATION environment variable is
      "true" (case-insensitive) and the user is not verified; verification
      is not enforced otherwise
    - On success, returns a Token whose lifetime comes from
      settings.access_token_expire_minutes
    """
    user = await AuthService.authenticate_user(db, user_data.email, user_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect email or password",
            headers={"WWW-Authenticate": "Bearer"},
        )

    # Verification enforcement is opt-in via the environment.
    enforce_verification = (
        os.getenv("REQUIRE_EMAIL_VERIFICATION", "false").lower() == "true"
    )
    if enforce_verification and not user.is_verified:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Please verify your email address before logging in",
        )

    ttl_minutes = settings.access_token_expire_minutes
    jwt_token = AuthService.create_access_token(
        data={"sub": str(user.id), "email": user.email},
        expires_delta=timedelta(minutes=ttl_minutes),
    )
    # expires_in is reported in seconds.
    return Token(
        access_token=jwt_token,
        token_type="bearer",
        expires_in=ttl_minutes * 60,
    )
@router.get("/me", response_model=UserResponse)
async def get_current_user_info(current_user: CurrentUser):
    """
    Return the user resolved by the CurrentUser dependency, unchanged.
    """
    return current_user
@router.put("/me", response_model=UserResponse)
async def update_current_user(
    update_data: UpdateUserRequest,
    current_user: CurrentUser,
    db: Database,
):
    """
    Apply profile edits to the current user and return the refreshed record.

    Only `name` can be changed; a value of None leaves it untouched. The
    session is committed and the user refreshed in either case.
    """
    new_name = update_data.name
    if new_name is not None:
        current_user.name = new_name

    await db.commit()
    await db.refresh(current_user)
    return current_user
@router.post("/forgot-password", response_model=MessageResponse)
async def forgot_password(
    request: ForgotPasswordRequest,
    db: Database,
    background_tasks: BackgroundTasks,
):
    """
    Request password reset email.

    - Always returns the same success message (to prevent email enumeration)
    - If a user with the lower-cased email exists, stores a reset token that
      expires in 1 hour
    - Queues the reset email when the email service is configured; otherwise
      logs a warning (the token is still stored)

    Log lines identify the account by user id rather than by email address,
    so personal data is not written to the logs; this also matches how the
    reset-password and verify-email handlers log.
    """
    # Always return success (security: don't reveal if email exists)
    success_message = "If an account with this email exists, a password reset link has been sent."

    # Look up user by lower-cased address.
    result = await db.execute(
        select(User).where(User.email == request.email.lower())
    )
    user = result.scalar_one_or_none()
    if not user:
        # Return success anyway (security)
        return MessageResponse(message=success_message)

    # Generate reset token valid for one hour.
    reset_token = secrets.token_urlsafe(32)
    user.password_reset_token = reset_token
    user.password_reset_expires = datetime.utcnow() + timedelta(hours=1)
    await db.commit()

    # Send reset email in background
    if email_service.is_configured:
        site_url = os.getenv("SITE_URL", "http://localhost:3000")
        reset_url = f"{site_url}/reset-password?token={reset_token}"
        background_tasks.add_task(
            email_service.send_password_reset,
            to_email=user.email,
            user_name=user.name or "there",
            reset_url=reset_url,
        )
        # Lazy %-args: the message is only formatted if the record is emitted.
        logger.info("Password reset email queued for user %s", user.id)
    else:
        logger.warning(
            "SMTP not configured, cannot send reset email for user %s", user.id
        )
    return MessageResponse(message=success_message)
@router.post("/reset-password", response_model=MessageResponse)
async def reset_password(
    request: ResetPasswordRequest,
    db: Database,
):
    """
    Set a new password for the account that owns the supplied reset token.

    - HTTP 400 if the new password is shorter than 8 characters
    - HTTP 400 if no user has this token with an expiry still in the future
    - On success the password hash is replaced and the token and its expiry
      are cleared, so the token cannot be used again
    """
    # Validate the password before touching the database.
    if len(request.new_password) < 8:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Password must be at least 8 characters",
        )

    # Match on both the token and a not-yet-passed expiry.
    lookup = select(User).where(
        User.password_reset_token == request.token,
        User.password_reset_expires > datetime.utcnow(),
    )
    user = (await db.execute(lookup)).scalar_one_or_none()
    if not user:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Invalid or expired reset token",
        )

    # Store the new hash and invalidate the token.
    user.password_hash = AuthService.get_password_hash(request.new_password)
    user.password_reset_token = None
    user.password_reset_expires = None
    await db.commit()

    logger.info(f"Password reset successful for user {user.id}")
    return MessageResponse(
        message="Password has been reset successfully. You can now log in."
    )
@router.post("/verify-email", response_model=MessageResponse)
async def verify_email(
    request: VerifyEmailRequest,
    db: Database,
):
    """
    Confirm an email address with the token from the verification email.

    - HTTP 400 if no user has this token with an expiry still in the future
    - On success the user is flagged as verified and the token and its
      expiry are cleared
    """
    # Match on both the token and a not-yet-passed expiry.
    lookup = select(User).where(
        User.email_verification_token == request.token,
        User.email_verification_expires > datetime.utcnow(),
    )
    user = (await db.execute(lookup)).scalar_one_or_none()
    if not user:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Invalid or expired verification token",
        )

    # Flag the account and discard the token.
    user.is_verified = True
    user.email_verification_token = None
    user.email_verification_expires = None
    await db.commit()

    logger.info(f"Email verified for user {user.id}")
    return MessageResponse(
        message="Email verified successfully. You can now log in."
    )
@router.post("/resend-verification", response_model=MessageResponse)
async def resend_verification(
    request: ForgotPasswordRequest,  # Reuse schema - just needs email
    db: Database,
    background_tasks: BackgroundTasks,
):
    """
    Issue a fresh verification token and re-send the verification email.

    - Always returns the same success message, whether or not the account
      exists or is already verified (security)
    - For an existing, unverified account: stores a new 24-hour token and
      queues the email when the email service is configured

    NOTE(review): no rate-limit decorator is applied to this handler in the
    code shown; if abuse protection is expected, confirm it is enforced
    elsewhere.
    """
    success_message = "If an unverified account with this email exists, a verification link has been sent."

    # Find the account by lower-cased address.
    lookup = select(User).where(User.email == request.email.lower())
    user = (await db.execute(lookup)).scalar_one_or_none()

    # Unknown address: answer exactly as if the email had been sent.
    if not user:
        return MessageResponse(message=success_message)
    # Already verified: nothing to resend, same answer.
    if user.is_verified:
        return MessageResponse(message=success_message)

    # Replace any previous token with a new one valid for 24 hours.
    token = secrets.token_urlsafe(32)
    user.email_verification_token = token
    user.email_verification_expires = datetime.utcnow() + timedelta(hours=24)
    await db.commit()

    if email_service.is_configured:
        base_url = os.getenv("SITE_URL", "http://localhost:3000")
        background_tasks.add_task(
            email_service.send_email_verification,
            to_email=user.email,
            user_name=user.name or "there",
            verification_url=f"{base_url}/verify-email?token={token}",
        )
    return MessageResponse(message=success_message)