pounce/backend/app/api/blog.py

426 lines
12 KiB
Python

"""
Blog API endpoints.
Public endpoints for reading blog posts.
Admin endpoints for managing blog posts.
"""
import re
import unicodedata
from datetime import datetime
from typing import Optional

from fastapi import APIRouter, HTTPException, status, Depends
from pydantic import BaseModel
from sqlalchemy import select, func, desc
from sqlalchemy.orm import selectinload

from app.api.deps import Database, get_current_user, get_current_user_optional
from app.models.user import User
from app.models.blog import BlogPost
from app.services.html_sanitizer import sanitize_html
router = APIRouter()
# ============== Schemas ==============
class BlogPostCreate(BaseModel):
    """Request body for creating a blog post.

    Only ``title`` and ``content`` are required; every other field falls
    back to the default declared below when the client omits it.
    """

    # Required fields.
    title: str
    content: str

    # Optional presentation fields.
    excerpt: Optional[str] = None
    cover_image: Optional[str] = None

    # Optional classification fields.
    category: Optional[str] = None
    tags: Optional[list[str]] = None

    # Optional metadata fields.
    meta_title: Optional[str] = None
    meta_description: Optional[str] = None

    # New posts are drafts unless the client asks for publication.
    is_published: bool = False
class BlogPostUpdate(BaseModel):
    """Request body for partially updating a blog post.

    Every field is optional and defaults to ``None``.
    """

    # Main text fields.
    title: Optional[str] = None
    content: Optional[str] = None

    # Presentation fields.
    excerpt: Optional[str] = None
    cover_image: Optional[str] = None

    # Classification fields.
    category: Optional[str] = None
    tags: Optional[list[str]] = None

    # Metadata fields.
    meta_title: Optional[str] = None
    meta_description: Optional[str] = None

    # Publication flag.
    is_published: Optional[bool] = None
# ============== Helper Functions ==============
def generate_slug(title: str) -> str:
    """Generate a URL-friendly ASCII slug from a title.

    Accented characters are folded to their ASCII base letter (for example
    "Café" becomes "cafe") instead of being dropped. Runs of whitespace
    become a single hyphen, and anything that is not ``a-z``, ``0-9`` or a
    hyphen is removed.

    Args:
        title: Human-readable title.

    Returns:
        The slug. It may be an empty string when the title contains no
        characters that survive the filtering.
    """
    # Decompose accented characters so the base letter survives the ASCII
    # filter below; the combining marks are discarded by the encode step.
    text = unicodedata.normalize("NFKD", title)
    # Replace whitespace before the ASCII fold so Unicode whitespace still
    # acts as a word separator.
    text = re.sub(r"\s+", "-", text)
    text = text.encode("ascii", "ignore").decode("ascii").lower()
    # Drop everything that is not a lowercase letter, a digit or a hyphen.
    text = re.sub(r"[^a-z0-9\-]", "", text)
    # Collapse hyphen runs left behind by removed characters.
    text = re.sub(r"-+", "-", text)
    return text.strip("-")
async def require_admin(
    current_user: User = Depends(get_current_user),
) -> User:
    """Dependency that lets only admin users through.

    Returns:
        The user resolved by ``get_current_user`` when ``is_admin`` is truthy.

    Raises:
        HTTPException: 403 when the user is not an admin.
    """
    if current_user.is_admin:
        return current_user

    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="Admin privileges required",
    )
# ============== Public Endpoints ==============
@router.get("/posts")
async def list_blog_posts(
    db: Database,
    limit: int = 10,
    offset: int = 0,
    category: Optional[str] = None,
    tag: Optional[str] = None,
):
    """
    List published blog posts.

    Returns a paginated list of published posts (without content) plus the
    total number of posts matching the same filters.

    Args:
        db: Database session dependency.
        limit: Page size; clamped to the range 0..100.
        offset: Number of posts to skip; negative values are treated as 0.
        category: When given, only posts with exactly this category.
        tag: When given, only posts whose stored tag string contains this
            text (case-insensitive substring match).

    Returns:
        Dict with ``posts``, ``total`` and the effective ``limit`` and
        ``offset`` that were applied.
    """
    # This endpoint is public: keep out-of-range paging values away from the
    # database and bound the amount of data a single request can pull.
    max_page_size = 100
    limit = max(0, min(limit, max_page_size))
    offset = max(0, offset)

    # Build the filter list once so the page query and the count query can
    # never drift apart.
    filters = [BlogPost.is_published == True]  # noqa: E712 (SQL expression)
    if category:
        filters.append(BlogPost.category == category)
    if tag:
        # Escape LIKE wildcards so "%" and "_" in user input match literally.
        escaped_tag = (
            tag.replace("/", "//").replace("%", "/%").replace("_", "/_")
        )
        filters.append(BlogPost.tags.ilike(f"%{escaped_tag}%", escape="/"))

    page_query = (
        select(BlogPost)
        .options(selectinload(BlogPost.author))
        .where(*filters)
        # The id tie-breaker keeps pagination stable when several posts
        # share the same published_at value.
        .order_by(desc(BlogPost.published_at), desc(BlogPost.id))
        .offset(offset)
        .limit(limit)
    )
    posts = (await db.execute(page_query)).scalars().all()

    count_query = select(func.count(BlogPost.id)).where(*filters)
    total = (await db.execute(count_query)).scalar()

    return {
        "posts": [post.to_dict(include_content=False) for post in posts],
        "total": total,
        "limit": limit,
        "offset": offset,
    }
@router.get("/posts/featured")
async def get_featured_posts(
    db: Database,
    limit: int = 3,
):
    """Get the most recently published blog posts for the homepage.

    Args:
        db: Database session dependency.
        limit: Maximum number of posts to return; clamped to 0..100.

    Returns:
        Dict with a ``posts`` list (without content), newest first.
    """
    # Public endpoint: never pass a negative or unbounded LIMIT to the
    # database.
    max_featured = 100
    limit = max(0, min(limit, max_featured))

    query = (
        select(BlogPost)
        .options(selectinload(BlogPost.author))
        .where(BlogPost.is_published == True)  # noqa: E712 (SQL expression)
        .order_by(desc(BlogPost.published_at))
        .limit(limit)
    )
    posts = (await db.execute(query)).scalars().all()

    return {
        "posts": [post.to_dict(include_content=False) for post in posts]
    }
@router.get("/posts/categories")
async def get_categories(db: Database):
    """Return each category used by published posts with its post count.

    Posts without a category are excluded.
    """
    per_category_query = (
        select(BlogPost.category, func.count(BlogPost.id))
        .where(BlogPost.is_published == True, BlogPost.category.isnot(None))
        .group_by(BlogPost.category)
    )
    rows = (await db.execute(per_category_query)).all()

    summary = []
    for name, post_count in rows:
        summary.append({"name": name, "count": post_count})
    return {"categories": summary}
@router.get("/posts/{slug}")
async def get_blog_post(
    slug: str,
    db: Database,
):
    """
    Get a single published blog post by slug.

    Increments the post's view count as a side effect.

    Args:
        slug: URL slug of the post.
        db: Database session dependency.

    Returns:
        The post as a dict, including its content passed through
        ``sanitize_html``.

    Raises:
        HTTPException: 404 when no published post has this slug.
    """
    result = await db.execute(
        select(BlogPost)
        .options(selectinload(BlogPost.author))
        .where(
            BlogPost.slug == slug,
            BlogPost.is_published == True,  # noqa: E712 (SQL expression)
        )
    )
    post = result.scalar_one_or_none()
    if post is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Blog post not found",
        )

    # Increment in SQL rather than in Python: "view_count = view_count + 1"
    # is evaluated by the database, so concurrent requests cannot overwrite
    # each other's increments. COALESCE covers a NULL counter.
    post.view_count = func.coalesce(BlogPost.view_count, 0) + 1
    await db.commit()
    # The attribute is expired after the flush because its value was computed
    # server-side; reload it explicitly so to_dict() does not trigger an
    # implicit lazy load on the async session.
    await db.refresh(post, attribute_names=["view_count"])

    data = post.to_dict(include_content=True)
    # Content is sanitized again on output in addition to on write.
    data["content"] = sanitize_html(data.get("content") or "")
    return data
# ============== Admin Endpoints ==============
@router.get("/admin/posts")
async def admin_list_posts(
    db: Database,
    admin: User = Depends(require_admin),
    limit: int = 50,
    offset: int = 0,
    status_filter: Optional[str] = None,  # "published", "draft"
):
    """Admin: List all blog posts (including drafts), newest first.

    Args:
        db: Database session dependency.
        admin: Authenticated admin user (enforced by ``require_admin``).
        limit: Page size; negative values are treated as 0.
        offset: Number of posts to skip; negative values are treated as 0.
        status_filter: ``"published"`` or ``"draft"`` to restrict the
            listing; any other value applies no filter.

    Returns:
        Dict with ``posts`` (without content) and ``total``, the number of
        posts matching the same status filter.
    """
    # Keep negative paging values away from the database.
    limit = max(0, limit)
    offset = max(0, offset)

    # Build the filter once and share it between the page and count queries.
    filters = []
    if status_filter == "published":
        filters.append(BlogPost.is_published == True)  # noqa: E712
    elif status_filter == "draft":
        filters.append(BlogPost.is_published == False)  # noqa: E712

    page_query = (
        select(BlogPost)
        .options(selectinload(BlogPost.author))
        .where(*filters)
        .order_by(desc(BlogPost.created_at))
        .offset(offset)
        .limit(limit)
    )
    posts = (await db.execute(page_query)).scalars().all()

    count_query = select(func.count(BlogPost.id)).where(*filters)
    total = (await db.execute(count_query)).scalar()

    return {
        "posts": [post.to_dict(include_content=False) for post in posts],
        "total": total,
    }
@router.post("/admin/posts")
async def create_blog_post(
    data: BlogPostCreate,
    db: Database,
    admin: User = Depends(require_admin),
):
    """Admin: Create a new blog post.

    The slug is derived from the title. If that slug is already taken, a
    Unix timestamp is appended to make it unique. Content is passed through
    ``sanitize_html`` before it is stored.

    Args:
        data: Fields of the new post.
        db: Database session dependency.
        admin: Authenticated admin user; recorded as the post's author.

    Returns:
        The created post as a dict.
    """
    now = datetime.utcnow()

    # A title made only of characters the slug filter removes would produce
    # an empty slug, which the public "/posts/{slug}" route cannot address.
    slug = generate_slug(data.title) or "post"

    existing = await db.execute(
        select(BlogPost).where(BlogPost.slug == slug)
    )
    if existing.scalar_one_or_none():
        # Add timestamp to make unique
        slug = f"{slug}-{int(now.timestamp())}"

    post = BlogPost(
        title=data.title,
        slug=slug,
        content=sanitize_html(data.content),
        excerpt=data.excerpt,
        cover_image=data.cover_image,
        category=data.category,
        # Tags are stored as one comma-separated string.
        tags=",".join(data.tags) if data.tags else None,
        meta_title=data.meta_title,
        meta_description=data.meta_description,
        is_published=data.is_published,
        published_at=now if data.is_published else None,
        author_id=admin.id,
    )
    db.add(post)
    await db.commit()
    await db.refresh(post)
    return post.to_dict()
@router.get("/admin/posts/{post_id}")
async def admin_get_post(
    post_id: int,
    db: Database,
    admin: User = Depends(require_admin),
):
    """Admin: Fetch one post by id, whether it is published or a draft.

    Raises:
        HTTPException: 404 when no post has this id.
    """
    lookup = (
        select(BlogPost)
        .options(selectinload(BlogPost.author))
        .where(BlogPost.id == post_id)
    )
    post = (await db.execute(lookup)).scalar_one_or_none()
    if post:
        return post.to_dict()

    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail="Blog post not found",
    )
@router.patch("/admin/posts/{post_id}")
async def update_blog_post(
    post_id: int,
    data: BlogPostUpdate,
    db: Database,
    admin: User = Depends(require_admin),
):
    """Admin: Apply a partial update to a blog post.

    Only fields whose value in ``data`` is not ``None`` are written. The
    slug is left untouched even when the title changes.

    Raises:
        HTTPException: 404 when no post has this id.
    """
    lookup = select(BlogPost).where(BlogPost.id == post_id)
    post = (await db.execute(lookup)).scalar_one_or_none()
    if not post:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Blog post not found",
        )

    # Field name -> conversion applied before storing (None = store as-is).
    # Content is sanitized; tags are flattened to a comma-separated string.
    field_converters = {
        "title": None,
        "content": sanitize_html,
        "excerpt": None,
        "cover_image": None,
        "category": None,
        "tags": ",".join,
        "meta_title": None,
        "meta_description": None,
    }
    for field_name, convert in field_converters.items():
        new_value = getattr(data, field_name)
        if new_value is None:
            continue
        if convert is not None:
            new_value = convert(new_value)
        setattr(post, field_name, new_value)

    if data.is_published is not None:
        becomes_published = data.is_published and not post.is_published
        post.is_published = data.is_published
        # Stamp published_at on a draft -> published transition.
        if becomes_published:
            post.published_at = datetime.utcnow()

    await db.commit()
    await db.refresh(post)
    return post.to_dict()
@router.delete("/admin/posts/{post_id}")
async def delete_blog_post(
    post_id: int,
    db: Database,
    admin: User = Depends(require_admin),
):
    """Admin: Remove a blog post.

    Raises:
        HTTPException: 404 when no post has this id.
    """
    lookup = select(BlogPost).where(BlogPost.id == post_id)
    post = (await db.execute(lookup)).scalar_one_or_none()
    if not post:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Blog post not found",
        )

    await db.delete(post)
    await db.commit()

    confirmation = {"message": "Blog post deleted"}
    return confirmation
@router.post("/admin/posts/{post_id}/publish")
async def publish_blog_post(
    post_id: int,
    db: Database,
    admin: User = Depends(require_admin),
):
    """Admin: Publish a draft post.

    Calling this on a post that is already published keeps its existing
    ``published_at``, so repeating the request does not reorder public
    listings (which sort by ``published_at``).

    Args:
        post_id: Id of the post to publish.
        db: Database session dependency.
        admin: Authenticated admin user (enforced by ``require_admin``).

    Returns:
        Dict with a confirmation message and the ISO-formatted
        ``published_at``.

    Raises:
        HTTPException: 404 when no post has this id.
    """
    result = await db.execute(
        select(BlogPost).where(BlogPost.id == post_id)
    )
    post = result.scalar_one_or_none()
    if post is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Blog post not found",
        )

    # Stamp the date only on a draft -> published transition, matching the
    # PATCH endpoint. Also fill it in if it is missing, so the isoformat()
    # call below always has a value.
    if not post.is_published or post.published_at is None:
        post.published_at = datetime.utcnow()
    post.is_published = True

    await db.commit()
    return {
        "message": "Blog post published",
        "published_at": post.published_at.isoformat(),
    }
@router.post("/admin/posts/{post_id}/unpublish")
async def unpublish_blog_post(
    post_id: int,
    db: Database,
    admin: User = Depends(require_admin),
):
    """Admin: Turn a post back into a draft.

    Only the ``is_published`` flag is cleared; ``published_at`` is not
    modified.

    Raises:
        HTTPException: 404 when no post has this id.
    """
    lookup = select(BlogPost).where(BlogPost.id == post_id)
    post = (await db.execute(lookup)).scalar_one_or_none()
    if not post:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Blog post not found",
        )

    post.is_published = False
    await db.commit()

    confirmation = {"message": "Blog post unpublished"}
    return confirmation