pounce/backend/app/api/admin.py
yves.gugger b5a0d17689
Some checks failed
CI / Frontend Lint & Type Check (push) Has been cancelled
CI / Frontend Build (push) Has been cancelled
CI / Backend Lint (push) Has been cancelled
CI / Backend Tests (push) Has been cancelled
CI / Docker Build (push) Has been cancelled
CI / Security Scan (push) Has been cancelled
Deploy / Build & Push Images (push) Has been cancelled
Deploy / Deploy to Server (push) Has been cancelled
Deploy / Notify (push) Has been cancelled
feat: Complete admin dashboard and improved navigation
Navigation Header:
- Playfair Display font for 'pounce' logo (explicit style)
- Perfect vertical centering with h-full on containers
- Consistent text sizes (0.8125rem / 13px)

Admin Dashboard (/admin):
- Overview tab: User stats, subscription breakdown, quick actions
- Users tab: Search, list, upgrade tier, toggle admin
- Newsletter tab: List subscribers, export emails
- TLD tab: Price data stats, trigger manual scrape
- System tab: Health status, environment info

Backend Admin API:
- Added is_admin field to User model
- Admin authentication via require_admin dependency
- GET /admin/stats - Dashboard statistics
- GET/PATCH/DELETE /admin/users - User management
- POST /admin/users/{id}/upgrade - Tier upgrades
- GET /admin/newsletter - Subscriber list
- GET /admin/newsletter/export - Email export
- POST /admin/scrape-tld-prices - Manual TLD scrape
- GET /admin/tld-prices/stats - TLD data statistics
- GET /admin/system/health - System health check
- POST /admin/system/make-admin - Initial admin setup

Frontend API:
- Added AdminApiClient with all admin endpoints
- getAdminStats, getAdminUsers, updateAdminUser
- upgradeUser, getAdminNewsletter, exportNewsletterEmails
- triggerTldScrape, getSystemHealth, makeUserAdmin
2025-12-08 16:05:53 +01:00

577 lines
17 KiB
Python

"""
Admin API endpoints for pounce administration.
Provides admin-only access to:
- User management
- TLD price scraping
- System statistics
- Newsletter management
- Domain/Portfolio overview
"""
from datetime import datetime, timedelta
from typing import Optional
from fastapi import APIRouter, HTTPException, status, BackgroundTasks, Depends
from pydantic import BaseModel, EmailStr
from sqlalchemy import select, func, desc
from app.api.deps import Database, get_current_user
from app.models.user import User
from app.models.subscription import Subscription, SubscriptionTier, SubscriptionStatus, TIER_CONFIG
from app.models.domain import Domain
from app.models.portfolio import PortfolioDomain
from app.models.newsletter import NewsletterSubscriber
from app.models.tld_price import TLDPrice, TLDInfo
from app.models.auction import DomainAuction
from app.models.price_alert import PriceAlert
router = APIRouter()
# ============== Admin Authentication ==============
async def require_admin(
    current_user: User = Depends(get_current_user),
) -> User:
    """Gate an endpoint behind the ``is_admin`` flag of the authenticated user.

    Returns:
        The authenticated user, unchanged, when ``is_admin`` is truthy.

    Raises:
        HTTPException: 403 when the authenticated user is not an admin.
    """
    if current_user.is_admin:
        return current_user

    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="Admin privileges required"
    )
# ============== Dashboard Stats ==============
@router.get("/stats")
async def get_admin_stats(
    db: Database,
    admin: User = Depends(require_admin)
):
    """Collect the headline counters shown on the admin dashboard.

    Every figure is a single ``COUNT`` query; the queries run one after
    another on the request's database session.

    Returns:
        A dict with user, subscription, domain, TLD, newsletter, auction and
        price-alert counts. ``subscriptions`` maps each tier value to its
        number of ACTIVE subscriptions.
    """

    async def _count(target, *criteria):
        # SELECT count(<target>) [WHERE <criteria>...] -> scalar
        statement = select(func.count(target))
        if criteria:
            statement = statement.where(*criteria)
        outcome = await db.execute(statement)
        return outcome.scalar()

    # Users
    user_total = await _count(User.id)
    user_active = await _count(User.id, User.is_active == True)
    user_verified = await _count(User.id, User.is_verified == True)

    # Accounts created within the last 7 days
    cutoff = datetime.utcnow() - timedelta(days=7)
    user_recent = await _count(User.id, User.created_at >= cutoff)

    # Active subscriptions, one count per tier
    tier_counts = {
        tier.value: await _count(
            Subscription.id,
            Subscription.tier == tier,
            Subscription.status == SubscriptionStatus.ACTIVE,
        )
        for tier in SubscriptionTier
    }

    # Watched domains and portfolio entries
    watched_count = await _count(Domain.id)
    portfolio_count = await _count(PortfolioDomain.id)

    # TLD price data
    distinct_tlds = await _count(func.distinct(TLDPrice.tld))
    price_rows = await _count(TLDPrice.id)

    # Newsletter, auctions, alerts
    active_subscribers = await _count(
        NewsletterSubscriber.id, NewsletterSubscriber.is_active == True
    )
    auction_count = await _count(DomainAuction.id)
    active_alerts = await _count(PriceAlert.id, PriceAlert.is_active == True)

    return {
        "users": {
            "total": user_total,
            "active": user_active,
            "verified": user_verified,
            "new_this_week": user_recent,
        },
        "subscriptions": tier_counts,
        "domains": {
            "watched": watched_count,
            "portfolio": portfolio_count,
        },
        "tld_data": {
            "unique_tlds": distinct_tlds,
            "price_records": price_rows,
        },
        "newsletter_subscribers": active_subscribers,
        "auctions": auction_count,
        "price_alerts": active_alerts,
    }
# ============== User Management ==============
class UpdateUserRequest(BaseModel):
    """Request body for a partial user update.

    Every field defaults to ``None``; the update endpoint in this module
    only writes fields whose value is not ``None``.
    """

    # Display name of the user.
    name: Optional[str] = None
    # Account flags; each one mirrors the attribute of the same name on User.
    is_active: Optional[bool] = None
    is_verified: Optional[bool] = None
    is_admin: Optional[bool] = None
class UpgradeUserRequest(BaseModel):
    """Payload describing a tier change for the user with the given e-mail.

    NOTE(review): no endpoint in the visible part of this module takes this
    model as its body — confirm whether it is still used elsewhere.
    """

    # Address of the account to upgrade (validated as an e-mail address).
    email: EmailStr
    # Target tier name: scout, trader, tycoon
    tier: str
@router.get("/users")
async def list_users(
    db: Database,
    admin: User = Depends(require_admin),
    limit: int = 50,
    offset: int = 0,
    search: Optional[str] = None,
):
    """List users, newest first, with pagination and optional search.

    Args:
        db: Database session for this request.
        admin: Authenticated admin (enforced by ``require_admin``).
        limit: Maximum number of users to return.
        offset: Number of users to skip.
        search: Optional substring matched case-insensitively against the
            user's e-mail and name.

    Returns:
        A dict with ``users`` (one entry per user on the page, including the
        subscription summary and watched-domain count), ``total`` (number of
        users matching the search), and the echoed ``limit`` / ``offset``.

    Subscriptions and domain counts for the whole page are loaded with two
    batched queries rather than two queries per user. If a user has more
    than one subscription row, the first one returned is used instead of
    failing the whole listing.
    """
    page_query = select(User).order_by(desc(User.created_at))
    count_query = select(func.count(User.id))

    # Build the search filter once so the page and the total always agree.
    if search:
        pattern = f"%{search}%"
        matches = User.email.ilike(pattern) | User.name.ilike(pattern)
        page_query = page_query.where(matches)
        count_query = count_query.where(matches)

    page_result = await db.execute(page_query.offset(offset).limit(limit))
    users = page_result.scalars().all()

    total_result = await db.execute(count_query)
    total = total_result.scalar()

    # Batch-load related data for the page (skipped when the page is empty).
    user_ids = [user.id for user in users]
    subscriptions = {}
    domain_counts = {}
    if user_ids:
        sub_result = await db.execute(
            select(Subscription).where(Subscription.user_id.in_(user_ids))
        )
        for sub in sub_result.scalars():
            # Keep the first row per user; duplicates are ignored.
            subscriptions.setdefault(sub.user_id, sub)

        count_result = await db.execute(
            select(Domain.user_id, func.count(Domain.id))
            .where(Domain.user_id.in_(user_ids))
            .group_by(Domain.user_id)
        )
        domain_counts = {owner_id: count for owner_id, count in count_result.all()}

    user_list = []
    for user in users:
        subscription = subscriptions.get(user.id)
        if subscription:
            subscription_info = {
                "tier": subscription.tier.value,
                "tier_name": TIER_CONFIG.get(subscription.tier, {}).get("name", "Scout"),
                "status": subscription.status.value,
                "domain_limit": subscription.domain_limit,
            }
        else:
            # Users without a subscription row are reported as Scout.
            subscription_info = {
                "tier": "scout",
                "tier_name": "Scout",
                "status": None,
                "domain_limit": 5,
            }

        user_list.append({
            "id": user.id,
            "email": user.email,
            "name": user.name,
            "is_active": user.is_active,
            "is_verified": user.is_verified,
            "is_admin": user.is_admin,
            "created_at": user.created_at.isoformat(),
            "last_login": user.last_login.isoformat() if user.last_login else None,
            # Users with no domains are absent from the grouped result.
            "domain_count": domain_counts.get(user.id, 0),
            "subscription": subscription_info,
        })

    return {
        "users": user_list,
        "total": total,
        "limit": limit,
        "offset": offset,
    }
@router.get("/users/{user_id}")
async def get_user(
    user_id: int,
    db: Database,
    admin: User = Depends(require_admin),
):
    """Return one user's profile with subscription, domains and portfolio.

    Raises:
        HTTPException: 404 when no user has the given id.
    """
    user_row = await db.execute(select(User).where(User.id == user_id))
    user = user_row.scalar_one_or_none()
    if not user:
        raise HTTPException(status_code=404, detail="User not found")

    # Subscription (at most one row is expected per user)
    subscription_row = await db.execute(
        select(Subscription).where(Subscription.user_id == user.id)
    )
    subscription = subscription_row.scalar_one_or_none()

    # Watched domains
    domain_rows = await db.execute(
        select(Domain).where(Domain.user_id == user.id)
    )
    domains = domain_rows.scalars().all()

    # Portfolio entries
    portfolio_rows = await db.execute(
        select(PortfolioDomain).where(PortfolioDomain.user_id == user.id)
    )
    portfolio = portfolio_rows.scalars().all()

    # ``subscription`` is null in the response when the user has none.
    subscription_info = None
    if subscription:
        subscription_info = {
            "tier": subscription.tier.value,
            "status": subscription.status.value,
            "domain_limit": subscription.domain_limit,
            "stripe_subscription_id": subscription.stripe_subscription_id,
        }

    domain_items = []
    for entry in domains:
        domain_items.append({
            "id": entry.id,
            "domain": entry.domain,
            "status": entry.status,
            "created_at": entry.created_at.isoformat(),
        })

    portfolio_items = []
    for holding in portfolio:
        portfolio_items.append({
            "id": holding.id,
            "domain": holding.domain,
            "purchase_price": holding.purchase_price,
            "estimated_value": holding.estimated_value,
        })

    last_login = user.last_login.isoformat() if user.last_login else None

    return {
        "id": user.id,
        "email": user.email,
        "name": user.name,
        "is_active": user.is_active,
        "is_verified": user.is_verified,
        "is_admin": user.is_admin,
        "stripe_customer_id": user.stripe_customer_id,
        "created_at": user.created_at.isoformat(),
        "updated_at": user.updated_at.isoformat(),
        "last_login": last_login,
        "subscription": subscription_info,
        "domains": domain_items,
        "portfolio": portfolio_items,
    }
@router.patch("/users/{user_id}")
async def update_user(
    user_id: int,
    request: UpdateUserRequest,
    db: Database,
    admin: User = Depends(require_admin),
):
    """Apply the non-null fields of the request to the given user.

    Raises:
        HTTPException: 404 when no user has the given id.
    """
    lookup = await db.execute(select(User).where(User.id == user_id))
    user = lookup.scalar_one_or_none()
    if not user:
        raise HTTPException(status_code=404, detail="User not found")

    # A field left as None in the request means "leave unchanged".
    for field_name in ("name", "is_active", "is_verified", "is_admin"):
        new_value = getattr(request, field_name)
        if new_value is not None:
            setattr(user, field_name, new_value)

    await db.commit()
    # Reload so the attributes read below reflect the committed row.
    await db.refresh(user)

    return {"message": f"User {user.email} updated", "user_id": user.id}
@router.delete("/users/{user_id}")
async def delete_user(
    user_id: int,
    db: Database,
    admin: User = Depends(require_admin),
):
    """Delete a non-admin user.

    Raises:
        HTTPException: 404 when no user has the given id; 400 when the
            target user is an admin.
    """
    lookup = await db.execute(select(User).where(User.id == user_id))
    target = lookup.scalar_one_or_none()

    if not target:
        raise HTTPException(status_code=404, detail="User not found")
    if target.is_admin:
        raise HTTPException(status_code=400, detail="Cannot delete admin user")

    # Build the response text while the row is still loaded.
    confirmation = f"User {target.email} deleted"

    await db.delete(target)
    await db.commit()

    return {"message": confirmation}
@router.post("/users/{user_id}/upgrade")
async def upgrade_user(
    user_id: int,
    tier: str,
    db: Database,
    admin: User = Depends(require_admin),
):
    """Set a user's subscription tier, creating the subscription if needed.

    Args:
        user_id: Id of the user to change.
        tier: Target tier value; must be a valid ``SubscriptionTier`` value.
        db: Database session for this request.
        admin: Authenticated admin (enforced by ``require_admin``).

    Returns:
        A dict with a confirmation ``message`` and the new ``tier`` value.

    Raises:
        HTTPException: 404 when no user has the given id; 400 when ``tier``
            is not a valid tier value.
    """
    result = await db.execute(select(User).where(User.id == user_id))
    user = result.scalar_one_or_none()
    if not user:
        raise HTTPException(status_code=404, detail="User not found")

    # Validate tier
    try:
        new_tier = SubscriptionTier(tier)
    except ValueError:
        raise HTTPException(
            status_code=400,
            detail=f"Invalid tier: {tier}. Valid: scout, trader, tycoon"
        ) from None

    # Find or create subscription
    sub_result = await db.execute(
        select(Subscription).where(Subscription.user_id == user.id)
    )
    subscription = sub_result.scalar_one_or_none()

    config = TIER_CONFIG.get(new_tier, {})
    domain_limit = config.get("domain_limit", 5)

    if not subscription:
        subscription = Subscription(
            user_id=user.id,
            tier=new_tier,
            status=SubscriptionStatus.ACTIVE,
            domain_limit=domain_limit,
        )
        db.add(subscription)
    else:
        subscription.tier = new_tier
        subscription.domain_limit = domain_limit
        subscription.status = SubscriptionStatus.ACTIVE

    # Read the e-mail BEFORE committing: a commit may expire loaded
    # attributes, and re-loading one implicitly is not possible on an
    # async session.
    user_email = user.email

    await db.commit()

    return {
        "message": f"User {user_email} upgraded to {config.get('name', tier)}",
        "tier": new_tier.value,
    }
# ============== Newsletter Management ==============
@router.get("/newsletter")
async def list_newsletter_subscribers(
    db: Database,
    admin: User = Depends(require_admin),
    limit: int = 100,
    offset: int = 0,
    active_only: bool = True,
):
    """Return a page of newsletter subscribers, most recent first.

    ``total`` counts all rows matching the ``active_only`` filter, not just
    the rows on the returned page.
    """
    page_stmt = select(NewsletterSubscriber).order_by(
        desc(NewsletterSubscriber.subscribed_at)
    )
    total_stmt = select(func.count(NewsletterSubscriber.id))

    if active_only:
        still_subscribed = NewsletterSubscriber.is_active == True
        page_stmt = page_stmt.where(still_subscribed)
        total_stmt = total_stmt.where(still_subscribed)

    page_rows = await db.execute(page_stmt.offset(offset).limit(limit))
    subscribers = page_rows.scalars().all()

    total_rows = await db.execute(total_stmt)
    total = total_rows.scalar()

    payload = []
    for entry in subscribers:
        left_at = entry.unsubscribed_at.isoformat() if entry.unsubscribed_at else None
        payload.append({
            "id": entry.id,
            "email": entry.email,
            "is_active": entry.is_active,
            "subscribed_at": entry.subscribed_at.isoformat(),
            "unsubscribed_at": left_at,
        })

    return {
        "subscribers": payload,
        "total": total,
    }
@router.get("/newsletter/export")
async def export_newsletter_emails(
    db: Database,
    admin: User = Depends(require_admin),
):
    """Return the e-mail address of every active newsletter subscriber.

    Returns:
        A dict with the list of ``emails`` and its length as ``count``.
    """
    active_addresses = select(NewsletterSubscriber.email).where(
        NewsletterSubscriber.is_active == True
    )
    rows = await db.execute(active_addresses)

    # The statement selects a single column, so scalars() yields the addresses.
    emails = list(rows.scalars().all())

    return {
        "emails": emails,
        "count": len(emails),
    }
# ============== TLD Management ==============
@router.post("/scrape-tld-prices")
async def trigger_tld_scrape(
    db: Database,
    admin: User = Depends(require_admin),
):
    """Run a TLD price scrape now and report its outcome.

    The scrape is awaited inside the request, so the response is only sent
    once ``run_scrape`` has returned.
    """
    # Imported here rather than at module level, as in the original code.
    from app.services.tld_scraper.aggregator import tld_aggregator

    outcome = await tld_aggregator.run_scrape(db)

    finished = outcome.completed_at
    finished_iso = finished.isoformat() if finished else None

    return {
        "message": "TLD price scrape completed",
        "status": outcome.status,
        "tlds_scraped": outcome.tlds_scraped,
        "prices_saved": outcome.prices_saved,
        "sources_succeeded": outcome.sources_succeeded,
        "sources_attempted": outcome.sources_attempted,
        "errors": outcome.errors,
        "started_at": outcome.started_at.isoformat(),
        "completed_at": finished_iso,
    }
@router.get("/tld-prices/stats")
async def get_tld_price_stats(
    db: Database,
    admin: User = Depends(require_admin),
):
    """Summarise the stored TLD price records.

    Returns:
        Record count, number of distinct TLDs and registrars, the newest and
        oldest ``recorded_at`` timestamps (ISO strings or ``None``), and the
        number of whole days between them (0 when either is missing).
    """

    async def _first_value(statement):
        # Execute and return the first column of the first row (or None).
        return (await db.execute(statement)).scalar()

    record_count = await _first_value(select(func.count(TLDPrice.id)))
    tld_count = await _first_value(
        select(func.count(func.distinct(TLDPrice.tld)))
    )
    registrar_count = await _first_value(
        select(func.count(func.distinct(TLDPrice.registrar)))
    )

    # Newest and oldest timestamps via ORDER BY ... LIMIT 1
    newest = await _first_value(
        select(TLDPrice.recorded_at).order_by(TLDPrice.recorded_at.desc()).limit(1)
    )
    oldest = await _first_value(
        select(TLDPrice.recorded_at).order_by(TLDPrice.recorded_at.asc()).limit(1)
    )

    if newest and oldest:
        span_days = (newest - oldest).days
    else:
        span_days = 0

    return {
        "total_records": record_count,
        "unique_tlds": tld_count,
        "unique_registrars": registrar_count,
        "latest_record": newest.isoformat() if newest else None,
        "oldest_record": oldest.isoformat() if oldest else None,
        "data_range_days": span_days,
    }
# ============== System ==============
@router.get("/system/health")
async def system_health(
    db: Database,
    admin: User = Depends(require_admin),
):
    """Report database reachability and whether e-mail/Stripe are configured.

    Returns:
        A dict whose ``status`` is ``"healthy"`` when the probe query
        succeeds and ``"degraded"`` otherwise, plus the individual checks
        and a UTC timestamp.
    """
    # Database probe: any failure is reported in the response, not raised.
    try:
        await db.execute(select(1))
    except Exception as exc:
        database_state = f"error: {exc}"
    else:
        database_state = "healthy"

    # E-mail configuration
    from app.services.email_service import EmailService
    has_email = EmailService.is_configured()

    # Stripe counts as configured when the secret key env var is non-empty.
    import os
    has_stripe = bool(os.getenv("STRIPE_SECRET_KEY"))

    overall = "healthy" if database_state == "healthy" else "degraded"

    return {
        "status": overall,
        "database": database_state,
        "email_configured": has_email,
        "stripe_configured": has_stripe,
        "timestamp": datetime.utcnow().isoformat(),
    }
@router.post("/system/make-admin")
async def make_user_admin(
    email: str,
    db: Database,
):
    """
    Promote a user to admin during initial setup.

    This endpoint is unauthenticated so the very first admin can be created.
    To keep it from being a permanent privilege-escalation hole, it only
    works while NO admin account exists. Once an admin exists it returns 403
    and further admins must be granted through the authenticated user-update
    endpoint.

    Args:
        email: E-mail address of the existing user to promote.
        db: Database session for this request.

    Returns:
        A dict with a confirmation ``message``.

    Raises:
        HTTPException: 403 when an admin already exists; 404 when no user
            has the given e-mail.
    """
    # Bootstrap guard. Checked before the user lookup so that, once locked,
    # the endpoint does not reveal which e-mail addresses are registered.
    admin_count = await db.execute(
        select(func.count(User.id)).where(User.is_admin == True)
    )
    if admin_count.scalar():
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Initial admin setup is already complete",
        )

    result = await db.execute(select(User).where(User.email == email))
    user = result.scalar_one_or_none()
    if not user:
        raise HTTPException(status_code=404, detail="User not found")

    user.is_admin = True
    await db.commit()

    return {"message": f"User {email} is now an admin"}