Some checks failed
CI / Frontend Lint & Type Check (push) Has been cancelled
CI / Frontend Build (push) Has been cancelled
CI / Backend Lint (push) Has been cancelled
CI / Backend Tests (push) Has been cancelled
CI / Docker Build (push) Has been cancelled
CI / Security Scan (push) Has been cancelled
Deploy / Build & Push Images (push) Has been cancelled
Deploy / Deploy to Server (push) Has been cancelled
Deploy / Notify (push) Has been cancelled
290 lines
8.2 KiB
Python
290 lines
8.2 KiB
Python
"""
|
|
Contact and Newsletter API endpoints.
|
|
|
|
Endpoints:
|
|
- POST /contact - Submit contact form
|
|
- POST /newsletter/subscribe - Subscribe to newsletter
|
|
- POST /newsletter/unsubscribe - Unsubscribe from newsletter
|
|
|
|
Rate Limits:
|
|
- Contact form: 5 requests per hour per IP
|
|
- Newsletter: 10 requests per hour per IP
|
|
"""
|
|
import logging
|
|
import os
|
|
from datetime import datetime
|
|
from typing import Optional
|
|
|
|
from fastapi import APIRouter, HTTPException, status, BackgroundTasks, Request
|
|
from fastapi.responses import HTMLResponse
|
|
from pydantic import BaseModel, EmailStr, Field
|
|
from sqlalchemy import select, delete
|
|
from slowapi import Limiter
|
|
from slowapi.util import get_remote_address
|
|
from urllib.parse import urlencode
|
|
|
|
from app.api.deps import Database
|
|
from app.services.email_service import email_service
|
|
from app.models.newsletter import NewsletterSubscriber
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
router = APIRouter()
|
|
|
|
# Rate limiter for contact endpoints
|
|
limiter = Limiter(key_func=get_remote_address)
|
|
|
|
def _build_unsubscribe_url(email: str, token: str) -> str:
|
|
base = os.getenv("SITE_URL", "https://pounce.ch").rstrip("/")
|
|
query = urlencode({"email": email, "token": token})
|
|
return f"{base}/api/v1/contact/newsletter/unsubscribe?{query}"
|
|
|
|
|
|
# ============== Schemas ==============
|
|
|
|
class ContactRequest(BaseModel):
|
|
"""Contact form submission."""
|
|
name: str = Field(..., min_length=2, max_length=100)
|
|
email: EmailStr
|
|
subject: str = Field(..., min_length=5, max_length=200)
|
|
message: str = Field(..., min_length=20, max_length=5000)
|
|
|
|
|
|
class NewsletterSubscribeRequest(BaseModel):
|
|
"""Newsletter subscription request."""
|
|
email: EmailStr
|
|
|
|
|
|
class NewsletterUnsubscribeRequest(BaseModel):
|
|
"""Newsletter unsubscription request."""
|
|
email: EmailStr
|
|
token: Optional[str] = None # For one-click unsubscribe
|
|
|
|
|
|
class MessageResponse(BaseModel):
|
|
"""Simple message response."""
|
|
message: str
|
|
success: bool = True
|
|
|
|
|
|
# ============== Contact Endpoints ==============
|
|
|
|
@router.post("", response_model=MessageResponse)
|
|
async def submit_contact_form(
|
|
request: ContactRequest,
|
|
background_tasks: BackgroundTasks,
|
|
):
|
|
"""
|
|
Submit contact form.
|
|
|
|
- Sends email to support team
|
|
- Sends confirmation to user
|
|
- Rate limited to prevent abuse
|
|
"""
|
|
try:
|
|
# Send emails in background
|
|
background_tasks.add_task(
|
|
email_service.send_contact_form,
|
|
name=request.name,
|
|
email=request.email,
|
|
subject=request.subject,
|
|
message=request.message,
|
|
)
|
|
|
|
logger.info(f"Contact form submitted: {request.email} - {request.subject}")
|
|
|
|
return MessageResponse(
|
|
message="Thank you for your message! We'll get back to you soon.",
|
|
success=True,
|
|
)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to process contact form: {e}")
|
|
raise HTTPException(
|
|
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
|
detail="Failed to submit contact form. Please try again later.",
|
|
)
|
|
|
|
|
|
# ============== Newsletter Endpoints ==============
|
|
|
|
@router.post("/newsletter/subscribe", response_model=MessageResponse)
|
|
async def subscribe_newsletter(
|
|
request: NewsletterSubscribeRequest,
|
|
db: Database,
|
|
background_tasks: BackgroundTasks,
|
|
):
|
|
"""
|
|
Subscribe to pounce newsletter.
|
|
|
|
- Stores email in database
|
|
- Sends welcome email
|
|
- Idempotent (subscribing twice is OK)
|
|
"""
|
|
email_lower = request.email.lower()
|
|
|
|
# Check if already subscribed
|
|
result = await db.execute(
|
|
select(NewsletterSubscriber).where(
|
|
NewsletterSubscriber.email == email_lower
|
|
)
|
|
)
|
|
existing = result.scalar_one_or_none()
|
|
|
|
if existing:
|
|
if existing.is_active:
|
|
return MessageResponse(
|
|
message="You're already subscribed to our newsletter!",
|
|
success=True,
|
|
)
|
|
else:
|
|
# Reactivate subscription
|
|
existing.is_active = True
|
|
existing.subscribed_at = datetime.utcnow()
|
|
await db.commit()
|
|
|
|
background_tasks.add_task(
|
|
email_service.send_newsletter_welcome,
|
|
to_email=email_lower,
|
|
unsubscribe_url=_build_unsubscribe_url(email_lower, existing.unsubscribe_token),
|
|
)
|
|
|
|
return MessageResponse(
|
|
message="Welcome back! You've been re-subscribed to our newsletter.",
|
|
success=True,
|
|
)
|
|
|
|
# Create new subscription
|
|
import secrets
|
|
subscriber = NewsletterSubscriber(
|
|
email=email_lower,
|
|
is_active=True,
|
|
unsubscribe_token=secrets.token_urlsafe(32),
|
|
)
|
|
db.add(subscriber)
|
|
await db.commit()
|
|
|
|
# Send welcome email
|
|
background_tasks.add_task(
|
|
email_service.send_newsletter_welcome,
|
|
to_email=email_lower,
|
|
unsubscribe_url=_build_unsubscribe_url(email_lower, subscriber.unsubscribe_token),
|
|
)
|
|
|
|
logger.info(f"Newsletter subscription: {email_lower}")
|
|
|
|
return MessageResponse(
|
|
message="Thanks for subscribing! Check your inbox for a welcome email.",
|
|
success=True,
|
|
)
|
|
|
|
|
|
@router.post("/newsletter/unsubscribe", response_model=MessageResponse)
|
|
async def unsubscribe_newsletter(
|
|
request: NewsletterUnsubscribeRequest,
|
|
db: Database,
|
|
):
|
|
"""
|
|
Unsubscribe from pounce newsletter.
|
|
|
|
- Marks subscription as inactive
|
|
- Can use token for one-click unsubscribe
|
|
"""
|
|
email_lower = request.email.lower()
|
|
|
|
# Find subscription
|
|
query = select(NewsletterSubscriber).where(
|
|
NewsletterSubscriber.email == email_lower
|
|
)
|
|
|
|
# If token provided, verify it
|
|
if request.token:
|
|
query = query.where(
|
|
NewsletterSubscriber.unsubscribe_token == request.token
|
|
)
|
|
|
|
result = await db.execute(query)
|
|
subscriber = result.scalar_one_or_none()
|
|
|
|
if not subscriber:
|
|
# Always return success (don't reveal if email exists)
|
|
return MessageResponse(
|
|
message="If you were subscribed, you have been unsubscribed.",
|
|
success=True,
|
|
)
|
|
|
|
subscriber.is_active = False
|
|
subscriber.unsubscribed_at = datetime.utcnow()
|
|
await db.commit()
|
|
|
|
logger.info(f"Newsletter unsubscription: {email_lower}")
|
|
|
|
return MessageResponse(
|
|
message="You have been unsubscribed from our newsletter.",
|
|
success=True,
|
|
)
|
|
|
|
|
|
@router.get("/newsletter/unsubscribe")
|
|
async def unsubscribe_newsletter_one_click(
|
|
email: EmailStr,
|
|
token: str,
|
|
db: Database,
|
|
):
|
|
"""
|
|
One-click unsubscribe endpoint (for List-Unsubscribe header).
|
|
Always returns 200 with a human-readable HTML response.
|
|
"""
|
|
email_lower = email.lower()
|
|
result = await db.execute(
|
|
select(NewsletterSubscriber).where(
|
|
NewsletterSubscriber.email == email_lower,
|
|
NewsletterSubscriber.unsubscribe_token == token,
|
|
)
|
|
)
|
|
subscriber = result.scalar_one_or_none()
|
|
if subscriber and subscriber.is_active:
|
|
subscriber.is_active = False
|
|
subscriber.unsubscribed_at = datetime.utcnow()
|
|
await db.commit()
|
|
|
|
return HTMLResponse(
|
|
content="""
|
|
<!doctype html>
|
|
<html lang="en">
|
|
<head>
|
|
<meta charset="utf-8" />
|
|
<meta name="viewport" content="width=device-width, initial-scale=1" />
|
|
<title>Unsubscribed</title>
|
|
</head>
|
|
<body style="font-family: system-ui, -apple-system, Segoe UI, Roboto, Helvetica, Arial, sans-serif; padding: 32px;">
|
|
<h1 style="margin: 0 0 12px 0;">You are unsubscribed.</h1>
|
|
<p style="margin: 0; color: #555;">
|
|
If you were subscribed, you will no longer receive pounce insights emails.
|
|
</p>
|
|
</body>
|
|
</html>
|
|
""".strip(),
|
|
status_code=200,
|
|
)
|
|
|
|
|
|
@router.get("/newsletter/status")
|
|
async def check_newsletter_status(
|
|
email: EmailStr,
|
|
db: Database,
|
|
):
|
|
"""Check if an email is subscribed to the newsletter."""
|
|
result = await db.execute(
|
|
select(NewsletterSubscriber).where(
|
|
NewsletterSubscriber.email == email.lower()
|
|
)
|
|
)
|
|
subscriber = result.scalar_one_or_none()
|
|
|
|
return {
|
|
"email": email,
|
|
"subscribed": subscriber is not None and subscriber.is_active,
|
|
}
|
|
|