pounce/backend/app/api/contact.py
yves.gugger 6b6ec01484 feat: Add missing features - Stripe payments, password reset, rate limiting, contact form, newsletter
Backend:
- Add Stripe API endpoints (checkout, portal, webhook) in subscription.py
- Add password reset (forgot-password, reset-password) in auth.py
- Add email verification endpoints
- Add rate limiting with slowapi
- Add contact form and newsletter API (contact.py)
- Add webhook endpoint for Stripe (webhooks.py)
- Add NewsletterSubscriber model
- Extend User model with password reset and email verification tokens
- Extend email_service with new templates (password reset, verification, contact, newsletter)
- Update env.example with all new environment variables

Frontend:
- Add /forgot-password page
- Add /reset-password page with token handling
- Add /verify-email page with auto-verification
- Add forgot password link to login page
- Connect contact form to API
- Add API methods for all new endpoints

Documentation:
- Update README with new API endpoints
- Update environment variables documentation
- Update pages overview
2025-12-08 14:37:42 +01:00

237 lines
6.4 KiB
Python

"""
Contact and Newsletter API endpoints.
Endpoints:
- POST /contact - Submit contact form
- POST /newsletter/subscribe - Subscribe to newsletter
- POST /newsletter/unsubscribe - Unsubscribe from newsletter
- GET /newsletter/status - Check whether an email is subscribed
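Rate Limits (intended; no limiter decorator is applied to the endpoints in this module, so verify enforcement at the application level):
- Contact form: 5 requests per hour per IP
- Newsletter: 10 requests per hour per IP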
"""
import logging
import os
from datetime import datetime
from typing import Optional
from fastapi import APIRouter, HTTPException, status, BackgroundTasks, Request
from pydantic import BaseModel, EmailStr, Field
from sqlalchemy import select, delete
from slowapi import Limiter
from slowapi.util import get_remote_address
from app.api.deps import Database
from app.services.email_service import email_service
from app.models.newsletter import NewsletterSubscriber
# Module-level logger named after this module's import path.
logger = logging.getLogger(__name__)
# Router on which the endpoint functions in this module are registered
# through the @router.post / @router.get decorators.
router = APIRouter()
# Rate limiter for contact endpoints
# Requests are keyed by slowapi's get_remote_address helper.
# NOTE(review): no endpoint visible in this file is decorated with
# `limiter.limit(...)` — confirm the rate limits are enforced elsewhere.
limiter = Limiter(key_func=get_remote_address)
# ============== Schemas ==============
class ContactRequest(BaseModel):
    """Contact form submission."""

    # Sender's name: required, 2 to 100 characters.
    name: str = Field(..., max_length=100, min_length=2)
    # Sender's address, validated by pydantic's EmailStr.
    email: EmailStr
    # Subject line: required, 5 to 200 characters.
    subject: str = Field(..., max_length=200, min_length=5)
    # Message body: required, 20 to 5000 characters.
    message: str = Field(..., max_length=5000, min_length=20)
class NewsletterSubscribeRequest(BaseModel):
    """Newsletter subscription request."""

    # Address to subscribe, validated by pydantic's EmailStr.
    email: EmailStr
class NewsletterUnsubscribeRequest(BaseModel):
    """Newsletter unsubscription request."""

    # Address to unsubscribe, validated by pydantic's EmailStr.
    email: EmailStr
    # Optional token for one-click unsubscribe; omitted means no token check.
    token: Optional[str] = None
class MessageResponse(BaseModel):
    """Simple message response."""

    # Human-readable text returned to the client.
    message: str
    # Outcome flag; defaults to True when not set explicitly.
    success: bool = True
# ============== Contact Endpoints ==============
@router.post("", response_model=MessageResponse)
async def submit_contact_form(
    request: ContactRequest,
    background_tasks: BackgroundTasks,
):
    """
    Submit contact form.

    Queues ``email_service.send_contact_form`` as a background task with the
    submitted name, email, subject and message, then responds immediately
    without waiting for the email to be sent.

    Args:
        request: Validated contact form payload.
        background_tasks: Task queue that runs after the response is sent.

    Returns:
        MessageResponse confirming that the submission was accepted.

    Raises:
        HTTPException: 500 if the email task could not be queued.
    """
    # NOTE(review): no rate-limit decorator is applied to this endpoint in
    # this module; confirm that abuse protection is enforced elsewhere.
    try:
        # Only the queuing call is guarded; sending happens in the background
        # after the response and cannot be reported to this client.
        background_tasks.add_task(
            email_service.send_contact_form,
            name=request.name,
            email=request.email,
            subject=request.subject,
            message=request.message,
        )
    except Exception as exc:
        # logger.exception records the traceback along with the message.
        logger.exception("Failed to process contact form")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Failed to submit contact form. Please try again later.",
        ) from exc

    # The subject is free-form user input: %r escapes newlines and other
    # control characters so a crafted subject cannot forge extra log lines.
    logger.info(
        "Contact form submitted: %s - %r", request.email, request.subject
    )
    return MessageResponse(
        message="Thank you for your message! We'll get back to you soon.",
        success=True,
    )
# ============== Newsletter Endpoints ==============
@router.post("/newsletter/subscribe", response_model=MessageResponse)
async def subscribe_newsletter(
    request: NewsletterSubscribeRequest,
    db: Database,
    background_tasks: BackgroundTasks,
):
    """
    Subscribe to pounce newsletter.

    The address is lower-cased and looked up first. Three outcomes follow:
    - no record: a new active subscriber is stored with a fresh unsubscribe
      token and a welcome email is queued
    - active record: nothing is changed
    - inactive record: the subscription is reactivated and a welcome email
      is queued
    """
    address = request.email.lower()

    lookup = select(NewsletterSubscriber).where(
        NewsletterSubscriber.email == address
    )
    found = (await db.execute(lookup)).scalar_one_or_none()

    if not found:
        # First-time subscriber: persist a record with its own token.
        import secrets

        db.add(
            NewsletterSubscriber(
                email=address,
                is_active=True,
                unsubscribe_token=secrets.token_urlsafe(32),
            )
        )
        await db.commit()
        background_tasks.add_task(
            email_service.send_newsletter_welcome,
            to_email=address,
        )
        logger.info(f"Newsletter subscription: {address}")
        return MessageResponse(
            message="Thanks for subscribing! Check your inbox for a welcome email.",
            success=True,
        )

    if found.is_active:
        # Already subscribed: report success without touching the record.
        return MessageResponse(
            message="You're already subscribed to our newsletter!",
            success=True,
        )

    # Previously unsubscribed: switch the existing record back on.
    found.is_active = True
    found.subscribed_at = datetime.utcnow()
    await db.commit()
    background_tasks.add_task(
        email_service.send_newsletter_welcome,
        to_email=address,
    )
    return MessageResponse(
        message="Welcome back! You've been re-subscribed to our newsletter.",
        success=True,
    )
@router.post("/newsletter/unsubscribe", response_model=MessageResponse)
async def unsubscribe_newsletter(
    request: NewsletterUnsubscribeRequest,
    db: Database,
):
    """
    Unsubscribe from pounce newsletter.

    Looks up the subscriber by lower-cased email, additionally matching the
    unsubscribe token when one is supplied, and marks an active subscription
    as inactive. The response is the same whether or not a matching
    subscriber exists, so it cannot be used to probe for subscribed emails.

    Args:
        request: Email to unsubscribe plus an optional one-click token.
        db: Database session used for the lookup and the update.

    Returns:
        MessageResponse with a neutral confirmation message.
    """
    email_lower = request.email.lower()

    query = select(NewsletterSubscriber).where(
        NewsletterSubscriber.email == email_lower
    )
    # NOTE(review): the token is optional, so a request without one can
    # unsubscribe any address by email alone — confirm this is intended.
    if request.token:
        query = query.where(
            NewsletterSubscriber.unsubscribe_token == request.token
        )

    result = await db.execute(query)
    subscriber = result.scalar_one_or_none()

    # Only touch the record when it is currently active, so a repeated
    # unsubscribe does not overwrite the original unsubscribed_at timestamp.
    if subscriber and subscriber.is_active:
        subscriber.is_active = False
        subscriber.unsubscribed_at = datetime.utcnow()
        await db.commit()
        logger.info("Newsletter unsubscription: %s", email_lower)

    # Always return the same message (don't reveal if email exists).
    return MessageResponse(
        message="If you were subscribed, you have been unsubscribed.",
        success=True,
    )
@router.get("/newsletter/status")
async def check_newsletter_status(
    email: EmailStr,
    db: Database,
):
    """Report whether the given email has an active newsletter subscription."""
    # The lookup uses the lower-cased address; the response echoes the
    # address exactly as the caller supplied it.
    lookup = select(NewsletterSubscriber).where(
        NewsletterSubscriber.email == email.lower()
    )
    record = (await db.execute(lookup)).scalar_one_or_none()

    # A missing record counts as not subscribed.
    subscribed = False if record is None else record.is_active
    return {
        "email": email,
        "subscribed": subscribed,
    }