pounce/backend/app/api/admin.py
Yves Gugger 6a56360f56
Some checks failed
CI / Frontend Lint & Type Check (push) Has been cancelled
CI / Frontend Build (push) Has been cancelled
CI / Backend Lint (push) Has been cancelled
CI / Backend Tests (push) Has been cancelled
CI / Docker Build (push) Has been cancelled
CI / Security Scan (push) Has been cancelled
Deploy / Build & Push Images (push) Has been cancelled
Deploy / Deploy to Server (push) Has been cancelled
Deploy / Notify (push) Has been cancelled
Watchlist: Fixed layout order + TLD Detail: Full techy mobile design
2025-12-13 15:22:45 +01:00

1183 lines
36 KiB
Python

"""
Admin API endpoints for pounce administration.
Provides admin-only access to:
- User management
- TLD price scraping
- System statistics
- Newsletter management
- Domain/Portfolio overview
"""
from datetime import datetime, timedelta
from pathlib import Path
from typing import Optional
from fastapi import APIRouter, HTTPException, status, Depends, BackgroundTasks
from pydantic import BaseModel, EmailStr
from sqlalchemy import select, func, desc
from app.api.deps import Database, get_current_user
from app.config import get_settings
from app.models.user import User
from app.models.subscription import Subscription, SubscriptionTier, SubscriptionStatus, TIER_CONFIG
from app.models.domain import Domain
from app.models.portfolio import PortfolioDomain
from app.models.newsletter import NewsletterSubscriber
from app.models.tld_price import TLDPrice, TLDInfo
from app.models.auction import DomainAuction
from app.models.price_alert import PriceAlert
# Router that every admin endpoint in this module registers itself on.
router = APIRouter()
# Application settings, resolved once at import time and read by the scrape triggers.
settings = get_settings()
# ============== Admin Authentication ==============
async def require_admin(
    current_user: User = Depends(get_current_user),
) -> User:
    """Let the request through only when the caller is an administrator.

    Returns:
        The authenticated user, when its ``is_admin`` flag is truthy.

    Raises:
        HTTPException: 403 when the authenticated user is not an admin.
    """
    if current_user.is_admin:
        return current_user
    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="Admin privileges required",
    )
# ============== Scraping Ops (Server-only, free alternative to paid proxies) ==============
class PlaywrightCookiesUpload(BaseModel):
    """Request body with the Playwright cookies JSON for protected scrapers (e.g. NameJet)."""

    # One dict per cookie; the entries are written to disk as received.
    cookies: list[dict]
@router.post("/scraping/playwright-cookies")
async def upload_playwright_cookies(
    payload: PlaywrightCookiesUpload,
    admin: User = Depends(require_admin),
):
    """Replace the server's Playwright cookie jar file.

    The cookies are serialized as indented JSON to
    ``<package>/data/cookies/session_cookies.json``.

    Raises:
        HTTPException: 400 when ``payload.cookies`` is empty; 500 when the
            directory cannot be created or the file cannot be written.
    """
    import json

    # Validate first so that an invalid request never touches the filesystem.
    if not payload.cookies:
        raise HTTPException(status_code=400, detail="cookies must not be empty")

    cookie_dir = Path(__file__).parent.parent / "data" / "cookies"
    cookie_file = cookie_dir / "session_cookies.json"
    try:
        # Directory creation is part of the write, so its failures are
        # reported through the same handled error path.
        cookie_dir.mkdir(parents=True, exist_ok=True)
        cookie_file.write_text(json.dumps(payload.cookies, indent=2))
    except OSError as e:
        raise HTTPException(
            status_code=500,
            detail=f"Failed to write cookie file: {e}",
        ) from e

    return {
        "status": "ok",
        "cookies_count": len(payload.cookies),
        "updated_at": datetime.utcnow().isoformat(),
        "note": "Enable protected scraping with POUNCE_ENABLE_PROTECTED_SCRAPERS=true",
    }
@router.get("/scraping/playwright-cookies")
async def get_playwright_cookie_status(
    admin: User = Depends(require_admin),
):
    """Report whether the Playwright cookie jar exists, without exposing its contents.

    Returns ``{"exists": False}`` when the file is missing; otherwise the
    file path, its size in bytes and its modification time.
    """
    jar = Path(__file__).parent.parent / "data" / "cookies" / "session_cookies.json"
    if jar.exists():
        info = jar.stat()
        modified = datetime.utcfromtimestamp(info.st_mtime)
        return {
            "exists": True,
            "path": str(jar),
            "size_bytes": info.st_size,
            "modified_at": modified.isoformat() + "Z",
        }
    return {"exists": False}
# ============== Dashboard Stats ==============
@router.get("/stats")
async def get_admin_stats(
    db: Database,
    admin: User = Depends(require_admin)
):
    """Collect the counters shown on the admin dashboard.

    Runs one COUNT query per metric (and one per subscription tier) and
    returns them grouped by area.
    """

    async def fetch_scalar(statement):
        """Execute *statement* and return its single scalar value."""
        return (await db.execute(statement)).scalar()

    # --- Users ---
    user_total = await fetch_scalar(select(func.count(User.id)))
    user_active = await fetch_scalar(
        select(func.count(User.id)).where(User.is_active == True)  # noqa: E712 - SQL expression
    )
    user_verified = await fetch_scalar(
        select(func.count(User.id)).where(User.is_verified == True)  # noqa: E712
    )
    # Users created within the last seven days.
    seven_days_back = datetime.utcnow() - timedelta(days=7)
    user_recent = await fetch_scalar(
        select(func.count(User.id)).where(User.created_at >= seven_days_back)
    )

    # --- Active subscriptions per tier ---
    active_by_tier = {
        tier.value: await fetch_scalar(
            select(func.count(Subscription.id)).where(
                Subscription.tier == tier,
                Subscription.status == SubscriptionStatus.ACTIVE,
            )
        )
        for tier in SubscriptionTier
    }

    # --- Domains ---
    watched_count = await fetch_scalar(select(func.count(Domain.id)))
    portfolio_count = await fetch_scalar(select(func.count(PortfolioDomain.id)))

    # --- TLD pricing data ---
    distinct_tlds = await fetch_scalar(select(func.count(func.distinct(TLDPrice.tld))))
    price_rows = await fetch_scalar(select(func.count(TLDPrice.id)))

    # --- Newsletter, auctions, alerts ---
    active_newsletter = await fetch_scalar(
        select(func.count(NewsletterSubscriber.id)).where(NewsletterSubscriber.is_active == True)  # noqa: E712
    )
    auction_count = await fetch_scalar(select(func.count(DomainAuction.id)))
    active_alerts = await fetch_scalar(
        select(func.count(PriceAlert.id)).where(PriceAlert.is_active == True)  # noqa: E712
    )

    return {
        "users": {
            "total": user_total,
            "active": user_active,
            "verified": user_verified,
            "new_this_week": user_recent,
        },
        "subscriptions": active_by_tier,
        "domains": {
            "watched": watched_count,
            "portfolio": portfolio_count,
        },
        "tld_data": {
            "unique_tlds": distinct_tlds,
            "price_records": price_rows,
        },
        "newsletter_subscribers": active_newsletter,
        "auctions": auction_count,
        "price_alerts": active_alerts,
    }
# ============== User Management ==============
class UpdateUserRequest(BaseModel):
    """Partial update payload for a user; fields left as ``None`` are not changed."""

    name: Optional[str] = None
    is_active: Optional[bool] = None
    is_verified: Optional[bool] = None
    is_admin: Optional[bool] = None
class UpgradeUserRequest(BaseModel):
    """Payload identifying a user by email together with the tier to move them to."""

    # NOTE(review): no handler in the visible part of this file takes this model.
    email: EmailStr
    tier: str  # one of: scout, trader, tycoon
@router.get("/users")
async def list_users(
    db: Database,
    admin: User = Depends(require_admin),
    limit: int = 50,
    offset: int = 0,
    search: Optional[str] = None,
):
    """Return one page of users, newest first, optionally filtered by a search term.

    ``search`` is matched case-insensitively as a substring of the email or
    the name. ``total`` counts all users matching the filter, not only the
    returned page.
    """
    # Pre-aggregated domain counts so the page needs one query instead of one per user.
    per_user_domains = (
        select(
            Domain.user_id.label("user_id"),
            func.count(Domain.id).label("domain_count"),
        )
        .group_by(Domain.user_id)
        .subquery()
    )
    page_query = (
        select(
            User,
            Subscription,
            func.coalesce(per_user_domains.c.domain_count, 0).label("domain_count"),
        )
        .outerjoin(Subscription, Subscription.user_id == User.id)
        .outerjoin(per_user_domains, per_user_domains.c.user_id == User.id)
    )
    count_query = select(func.count(User.id))

    if search:
        # The same predicate drives both the page and the total.
        matches_search = User.email.ilike(f"%{search}%") | User.name.ilike(f"%{search}%")
        page_query = page_query.where(matches_search)
        count_query = count_query.where(matches_search)

    total = (await db.execute(count_query)).scalar() or 0
    page = await db.execute(
        page_query.order_by(desc(User.created_at)).offset(offset).limit(limit)
    )

    def subscription_summary(subscription):
        """Describe *subscription*, falling back to the Scout defaults when absent."""
        if subscription:
            return {
                "tier": subscription.tier.value,
                "tier_name": TIER_CONFIG.get(subscription.tier, {}).get("name", "Scout"),
                "status": subscription.status.value,
                "domain_limit": subscription.domain_limit,
            }
        return {
            "tier": "scout",
            "tier_name": "Scout",
            "status": None,
            "domain_limit": 5,
        }

    user_list = [
        {
            "id": user.id,
            "email": user.email,
            "name": user.name,
            "is_active": user.is_active,
            "is_verified": user.is_verified,
            "is_admin": user.is_admin,
            "created_at": user.created_at.isoformat(),
            "last_login": user.last_login.isoformat() if user.last_login else None,
            "domain_count": int(domain_count or 0),
            "subscription": subscription_summary(subscription),
        }
        for user, subscription, domain_count in page.all()
    ]
    return {"users": user_list, "total": total, "limit": limit, "offset": offset}
# ============== User Export ==============
# NOTE: This must come BEFORE /users/{user_id} to avoid route conflict
@router.get("/users/export")
async def export_users_csv(
    db: Database,
    admin: User = Depends(require_admin),
):
    """Export all users as CSV text, oldest first.

    Returns a dict with the CSV document under ``csv``, the number of
    exported rows under ``count`` and the export timestamp.

    Email and name are user-controlled, so cells that a spreadsheet would
    evaluate as a formula are neutralized with a leading single quote.
    """
    import csv
    import io

    def csv_safe(value: str) -> str:
        """Prefix *value* with ``'`` when it starts with a formula trigger character."""
        if value and value[0] in ("=", "+", "-", "@", "\t", "\r"):
            return "'" + value
        return value

    # Pre-aggregated domain counts avoid one query per user.
    domain_counts = (
        select(
            Domain.user_id.label("user_id"),
            func.count(Domain.id).label("domain_count"),
        )
        .group_by(Domain.user_id)
        .subquery()
    )
    result = await db.execute(
        select(
            User,
            Subscription,
            func.coalesce(domain_counts.c.domain_count, 0).label("domain_count"),
        )
        .outerjoin(Subscription, Subscription.user_id == User.id)
        .outerjoin(domain_counts, domain_counts.c.user_id == User.id)
        .order_by(User.created_at)
    )
    users_list = result.all()

    output = io.StringIO()
    writer = csv.writer(output)
    writer.writerow([
        "ID", "Email", "Name", "Active", "Verified", "Admin",
        "Created At", "Last Login", "Tier", "Domain Limit", "Domains Used"
    ])
    for user, subscription, domain_count in users_list:
        writer.writerow([
            user.id,
            csv_safe(user.email),
            csv_safe(user.name or ""),
            "Yes" if user.is_active else "No",
            "Yes" if user.is_verified else "No",
            "Yes" if user.is_admin else "No",
            user.created_at.strftime("%Y-%m-%d %H:%M"),
            user.last_login.strftime("%Y-%m-%d %H:%M") if user.last_login else "",
            # Users without a subscription row are reported with the Scout defaults.
            subscription.tier.value if subscription else "scout",
            subscription.domain_limit if subscription else 5,
            int(domain_count or 0),
        ])

    return {
        "csv": output.getvalue(),
        "count": len(users_list),
        "exported_at": datetime.utcnow().isoformat(),
    }
@router.get("/users/{user_id}")
async def get_user(
    user_id: int,
    db: Database,
    admin: User = Depends(require_admin),
):
    """Return one user's profile with their subscription, watched domains and portfolio.

    Raises:
        HTTPException: 404 when no user has the given id.
    """
    user = (
        await db.execute(select(User).where(User.id == user_id))
    ).scalar_one_or_none()
    if not user:
        raise HTTPException(status_code=404, detail="User not found")

    subscription = (
        await db.execute(select(Subscription).where(Subscription.user_id == user.id))
    ).scalar_one_or_none()
    watched = (
        await db.execute(select(Domain).where(Domain.user_id == user.id))
    ).scalars().all()
    holdings = (
        await db.execute(select(PortfolioDomain).where(PortfolioDomain.user_id == user.id))
    ).scalars().all()

    # The subscription section is null when the user has no subscription row.
    subscription_info = None
    if subscription:
        subscription_info = {
            "tier": subscription.tier.value,
            "status": subscription.status.value,
            "domain_limit": subscription.domain_limit,
            "stripe_subscription_id": subscription.stripe_subscription_id,
        }

    return {
        "id": user.id,
        "email": user.email,
        "name": user.name,
        "is_active": user.is_active,
        "is_verified": user.is_verified,
        "is_admin": user.is_admin,
        "stripe_customer_id": user.stripe_customer_id,
        "created_at": user.created_at.isoformat(),
        "updated_at": user.updated_at.isoformat(),
        "last_login": user.last_login.isoformat() if user.last_login else None,
        "subscription": subscription_info,
        "domains": [
            {
                "id": entry.id,
                "domain": entry.domain,
                "status": entry.status,
                "created_at": entry.created_at.isoformat(),
            }
            for entry in watched
        ],
        "portfolio": [
            {
                "id": item.id,
                "domain": item.domain,
                "purchase_price": item.purchase_price,
                "estimated_value": item.estimated_value,
            }
            for item in holdings
        ],
    }
@router.patch("/users/{user_id}")
async def update_user(
    user_id: int,
    request: UpdateUserRequest,
    db: Database,
    admin: User = Depends(require_admin),
):
    """Apply the provided fields to a user; fields sent as ``None`` are left untouched.

    Raises:
        HTTPException: 404 when no user has the given id.
    """
    lookup = await db.execute(select(User).where(User.id == user_id))
    user = lookup.scalar_one_or_none()
    if not user:
        raise HTTPException(status_code=404, detail="User not found")

    # Copy over only the fields the caller actually supplied.
    for field in ("name", "is_active", "is_verified", "is_admin"):
        new_value = getattr(request, field)
        if new_value is not None:
            setattr(user, field, new_value)

    await db.commit()
    await db.refresh(user)
    return {"message": f"User {user.email} updated", "user_id": user.id}
@router.delete("/users/{user_id}")
async def delete_user(
    user_id: int,
    db: Database,
    admin: User = Depends(require_admin),
):
    """Delete a non-admin user together with their blog posts and admin log entries.

    Raises:
        HTTPException: 404 when the user does not exist; 400 when the
            target user is an admin.
    """
    from app.models.admin_log import AdminActivityLog
    from app.models.blog import BlogPost

    user = (
        await db.execute(select(User).where(User.id == user_id))
    ).scalar_one_or_none()
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    if user.is_admin:
        raise HTTPException(status_code=400, detail="Cannot delete admin user")

    # Remember the address: the ORM object is gone after the delete is committed.
    user_email = user.email

    # Rows referencing the user are removed explicitly before the user itself.
    posts_of_user = BlogPost.__table__.delete().where(BlogPost.author_id == user_id)
    logs_of_user = AdminActivityLog.__table__.delete().where(
        AdminActivityLog.admin_id == user_id
    )
    await db.execute(posts_of_user)
    await db.execute(logs_of_user)

    # Remaining user data (domains, subscriptions, portfolio, price alerts) is
    # expected to go via cascade - the relationships are not visible here.
    await db.delete(user)
    await db.commit()

    await log_admin_activity(
        db, admin.id, "user_delete",
        f"Deleted user {user_email} and all their data"
    )
    return {"message": f"User {user_email} and all their data have been deleted"}
@router.post("/users/{user_id}/upgrade")
async def upgrade_user(
    user_id: int,
    tier: str,
    db: Database,
    admin: User = Depends(require_admin),
):
    """Set a user's subscription to the given tier and mark it active.

    A subscription row is created when the user has none.

    Raises:
        HTTPException: 404 when the user does not exist; 400 when ``tier``
            is not a valid ``SubscriptionTier`` value.
    """
    user = (
        await db.execute(select(User).where(User.id == user_id))
    ).scalar_one_or_none()
    if not user:
        raise HTTPException(status_code=404, detail="User not found")

    try:
        new_tier = SubscriptionTier(tier)
    except ValueError:
        raise HTTPException(
            status_code=400,
            detail=f"Invalid tier: {tier}. Valid: scout, trader, tycoon"
        )

    config = TIER_CONFIG.get(new_tier, {})
    tier_limit = config.get("domain_limit", 5)

    subscription = (
        await db.execute(select(Subscription).where(Subscription.user_id == user.id))
    ).scalar_one_or_none()

    # NOTE(review): this handler writes `domain_limit`, while bulk_upgrade_users
    # writes `max_domains` for the same purpose - confirm which attribute the
    # Subscription model actually maps.
    if subscription:
        subscription.tier = new_tier
        subscription.domain_limit = tier_limit
        subscription.status = SubscriptionStatus.ACTIVE
    else:
        subscription = Subscription(
            user_id=user.id,
            tier=new_tier,
            status=SubscriptionStatus.ACTIVE,
            domain_limit=tier_limit,
        )
        db.add(subscription)

    await db.commit()
    return {
        "message": f"User {user.email} upgraded to {config.get('name', tier)}",
        "tier": new_tier.value,
    }
# ============== Newsletter Management ==============
@router.get("/newsletter")
async def list_newsletter_subscribers(
    db: Database,
    admin: User = Depends(require_admin),
    limit: int = 100,
    offset: int = 0,
    active_only: bool = True,
):
    """Return one page of newsletter subscribers, most recent subscription first.

    With ``active_only`` set, both the page and ``total`` are restricted to
    active subscribers.
    """
    page_query = select(NewsletterSubscriber).order_by(desc(NewsletterSubscriber.subscribed_at))
    count_query = select(func.count(NewsletterSubscriber.id))
    if active_only:
        page_query = page_query.where(NewsletterSubscriber.is_active == True)  # noqa: E712 - SQL expression
        count_query = count_query.where(NewsletterSubscriber.is_active == True)  # noqa: E712

    page = await db.execute(page_query.offset(offset).limit(limit))
    subscribers = page.scalars().all()
    total = (await db.execute(count_query)).scalar()

    rows = []
    for entry in subscribers:
        unsubscribed = entry.unsubscribed_at
        rows.append(
            {
                "id": entry.id,
                "email": entry.email,
                "is_active": entry.is_active,
                "subscribed_at": entry.subscribed_at.isoformat(),
                "unsubscribed_at": unsubscribed.isoformat() if unsubscribed else None,
            }
        )
    return {"subscribers": rows, "total": total}
@router.get("/newsletter/export")
async def export_newsletter_emails(
    db: Database,
    admin: User = Depends(require_admin),
):
    """Return the email addresses of all active newsletter subscribers and their count."""
    active_emails = select(NewsletterSubscriber.email).where(
        NewsletterSubscriber.is_active == True  # noqa: E712 - SQL expression
    )
    # Single-column select: take the first (only) column of each row.
    emails = list((await db.execute(active_emails)).scalars().all())
    return {"emails": emails, "count": len(emails)}
# ============== TLD Management ==============
@router.post("/scrape-tld-prices")
async def trigger_tld_scrape(
    db: Database,
    admin: User = Depends(require_admin),
):
    """Start a TLD price scrape on demand.

    When the job queue is enabled and a Redis URL is configured the scrape
    is enqueued and the job id returned; otherwise it runs inline and the
    outcome of the run is returned.
    """
    use_queue = settings.enable_job_queue and settings.redis_url
    if use_queue:
        # Non-blocking path: hand the work to the job queue.
        from app.jobs.client import enqueue_job

        job_id = await enqueue_job("scrape_tld_prices")
        return {"message": "TLD price scrape enqueued", "job_id": job_id}

    # Inline fallback: the request waits for the scrape to finish.
    from app.services.tld_scraper.aggregator import tld_aggregator

    outcome = await tld_aggregator.run_scrape(db)
    finished = outcome.completed_at
    return {
        "message": "TLD price scrape completed",
        "status": outcome.status,
        "tlds_scraped": outcome.tlds_scraped,
        "prices_saved": outcome.prices_saved,
        "sources_succeeded": outcome.sources_succeeded,
        "sources_attempted": outcome.sources_attempted,
        "errors": outcome.errors,
        "started_at": outcome.started_at.isoformat(),
        "completed_at": finished.isoformat() if finished else None,
    }
@router.get("/tld-prices/stats")
async def get_tld_price_stats(
    db: Database,
    admin: User = Depends(require_admin),
):
    """Summarize the stored TLD price data.

    Reports record, TLD and registrar counts, plus the newest and oldest
    ``recorded_at`` values and the number of days between them (0 when
    either is missing).
    """

    async def one_value(statement):
        """Execute *statement* and return its first scalar."""
        return (await db.execute(statement)).scalar()

    total_records = await one_value(select(func.count(TLDPrice.id)))
    unique_tlds = await one_value(select(func.count(func.distinct(TLDPrice.tld))))
    unique_registrars = await one_value(
        select(func.count(func.distinct(TLDPrice.registrar)))
    )
    newest = await one_value(
        select(TLDPrice.recorded_at).order_by(desc(TLDPrice.recorded_at)).limit(1)
    )
    oldest = await one_value(
        select(TLDPrice.recorded_at).order_by(TLDPrice.recorded_at.asc()).limit(1)
    )

    if newest and oldest:
        span_days = (newest - oldest).days
    else:
        span_days = 0

    return {
        "total_records": total_records,
        "unique_tlds": unique_tlds,
        "unique_registrars": unique_registrars,
        "latest_record": newest.isoformat() if newest else None,
        "oldest_record": oldest.isoformat() if oldest else None,
        "data_range_days": span_days,
    }
# ============== System ==============
@router.get("/system/health")
async def system_health(
    db: Database,
    admin: User = Depends(require_admin),
):
    """Report the health of the database and the email/Stripe configuration.

    ``status`` is ``"healthy"`` only when the database answers a trivial
    query; any database error yields ``"degraded"`` and the error text is
    returned under ``database``.
    """
    import os

    # Use the shared service instance, as the test-email endpoint does; this
    # works whether `is_configured` is an instance, class or static method.
    from app.services.email_service import email_service

    try:
        await db.execute(select(1))
        db_status = "healthy"
    except Exception as e:
        # A health endpoint must report the failure rather than crash.
        db_status = f"error: {str(e)}"

    email_configured = email_service.is_configured()
    stripe_configured = bool(os.getenv("STRIPE_SECRET_KEY"))

    return {
        "status": "healthy" if db_status == "healthy" else "degraded",
        "database": db_status,
        "email_configured": email_configured,
        "stripe_configured": stripe_configured,
        "timestamp": datetime.utcnow().isoformat(),
    }
@router.post("/system/make-admin")
async def make_user_admin(
    email: str,
    db: Database,
):
    """Promote a user to admin during initial setup.

    This endpoint is unauthenticated because no admin exists before the
    first one is created. To keep it from being a standing privilege
    escalation hole, it only works while the system has no admin at all;
    afterwards it answers 403. Existing admins can still grant the role
    through ``PATCH /users/{user_id}``.

    Raises:
        HTTPException: 403 when an admin already exists; 404 when no user
            has the given email.
    """
    existing_admins = (
        await db.execute(
            select(func.count(User.id)).where(User.is_admin == True)  # noqa: E712 - SQL expression
        )
    ).scalar() or 0
    if existing_admins > 0:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="An admin already exists; admin rights must be granted by an existing admin",
        )

    result = await db.execute(select(User).where(User.email == email))
    user = result.scalar_one_or_none()
    if not user:
        raise HTTPException(status_code=404, detail="User not found")

    user.is_admin = True
    await db.commit()
    return {"message": f"User {email} is now an admin"}
# ============== Price Alerts ==============
@router.get("/price-alerts")
async def list_price_alerts(
    db: Database,
    admin: User = Depends(require_admin),
    limit: int = 100,
    offset: int = 0,
):
    """Return one page of active price alerts with their owners, newest first.

    ``total`` counts all active alerts, not only the returned page.
    """
    page_query = (
        select(PriceAlert, User)
        .join(User, PriceAlert.user_id == User.id)
        .where(PriceAlert.is_active == True)  # noqa: E712 - SQL expression
        .order_by(desc(PriceAlert.created_at))
        .offset(offset)
        .limit(limit)
    )
    pairs = (await db.execute(page_query)).all()

    active_count = select(func.count(PriceAlert.id)).where(PriceAlert.is_active == True)  # noqa: E712
    total = (await db.execute(active_count)).scalar()

    alerts = []
    for alert, owner in pairs:
        alerts.append(
            {
                "id": alert.id,
                "tld": alert.tld,
                # A falsy target price (missing or zero) is reported as null.
                "target_price": float(alert.target_price) if alert.target_price else None,
                "alert_type": alert.alert_type,
                "created_at": alert.created_at.isoformat(),
                "user": {
                    "id": owner.id,
                    "email": owner.email,
                    "name": owner.name,
                }
            }
        )
    return {"alerts": alerts, "total": total}
# ============== Domain Health ==============
@router.post("/domains/check-all")
async def trigger_domain_checks(
    background_tasks: BackgroundTasks,
    db: Database,
    admin: User = Depends(require_admin),
):
    """Schedule an availability check of every watched domain as a background task.

    Returns immediately; when there are no domains nothing is scheduled.
    """
    from app.services.domain_checker import check_all_domains

    domain_total = (await db.execute(select(func.count(Domain.id)))).scalar()
    if domain_total == 0:
        return {"message": "No domains to check", "domains_queued": 0}

    # NOTE(review): the request-scoped `db` session is handed to the background
    # task - confirm it is still open when the task actually runs.
    background_tasks.add_task(check_all_domains, db)

    return {
        "message": "Domain checks started",
        "domains_queued": domain_total,
        "started_at": datetime.utcnow().isoformat(),
    }
# ============== Email Test ==============
@router.post("/system/test-email")
async def test_email(
    db: Database,
    admin: User = Depends(require_admin),
):
    """Send a test message to the calling admin's own address.

    Raises:
        HTTPException: 400 when the email service is not configured; 500
            when sending fails.
    """
    from app.services.email_service import email_service

    if not email_service.is_configured():
        raise HTTPException(
            status_code=400,
            detail="Email service is not configured. Check SMTP settings."
        )

    try:
        await email_service.send_email(
            to_email=admin.email,
            subject="pounce Admin Panel - Test Email",
            html_content=f"""
<div style="font-family: Arial, sans-serif; max-width: 600px; margin: 0 auto;">
<h1 style="color: #22c55e;">✅ Email Test Successful</h1>
<p>This is a test email from the pounce Admin Panel.</p>
<p>If you received this, your SMTP configuration is working correctly.</p>
<hr style="border: none; border-top: 1px solid #e5e5e5; margin: 20px 0;">
<p style="color: #666; font-size: 12px;">
Sent at: {datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S UTC')}<br>
Admin: {admin.email}
</p>
</div>
""",
            text_content=f"Email Test Successful\n\nThis is a test email from the pounce Admin Panel.\nSent at: {datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S UTC')}"
        )
    except Exception as e:
        # Surface the underlying send error to the admin running the test.
        raise HTTPException(
            status_code=500,
            detail=f"Failed to send email: {str(e)}"
        )
    else:
        return {
            "message": "Test email sent successfully",
            "sent_to": admin.email,
            "timestamp": datetime.utcnow().isoformat(),
        }
# ============== Scheduler Status ==============
@router.get("/system/scheduler")
async def get_scheduler_status(
    db: Database,
    admin: User = Depends(require_admin),
):
    """Describe the scheduler's jobs and estimate when each kind of task last ran.

    The "last run" values are taken from the newest timestamps stored in
    the database (TLD price records, auction records, domain checks).
    """
    from app.scheduler import scheduler

    def iso_or_none(moment):
        """Format *moment* as ISO-8601, or return None when it is falsy."""
        return moment.isoformat() if moment else None

    async def newest(column, *conditions):
        """Return the largest stored value of *column* matching *conditions*."""
        statement = select(column)
        if conditions:
            statement = statement.where(*conditions)
        return (await db.execute(statement.order_by(desc(column)).limit(1))).scalar()

    jobs = [
        {
            "id": job.id,
            "name": job.name,
            "next_run": iso_or_none(job.next_run_time),
            "trigger": str(job.trigger),
        }
        for job in scheduler.get_jobs()
    ]

    last_tld_scrape = await newest(TLDPrice.recorded_at)
    last_auction_scrape = await newest(DomainAuction.scraped_at)
    # Domains that were never checked are excluded from the lookup.
    last_domain_check = await newest(Domain.last_checked, Domain.last_checked.isnot(None))

    return {
        "scheduler_running": scheduler.running,
        "jobs": jobs,
        "last_runs": {
            "tld_scrape": iso_or_none(last_tld_scrape),
            "auction_scrape": iso_or_none(last_auction_scrape),
            "domain_check": iso_or_none(last_domain_check),
        },
        "timestamp": datetime.utcnow().isoformat(),
    }
# ============== Bulk Operations ==============
class BulkUpgradeRequest(BaseModel):
    """Payload naming the users to move and the tier to move them to."""

    # Ids of the users to upgrade.
    user_ids: list[int]
    # Target tier; validated against SubscriptionTier by the handler.
    tier: str
@router.post("/users/bulk-upgrade")
async def bulk_upgrade_users(
    request: BulkUpgradeRequest,
    db: Database,
    admin: User = Depends(require_admin),
):
    """Move several users to one subscription tier in a single request.

    Users are processed one by one; a missing user or an error for one id
    is recorded under ``failed`` and does not stop the others. All changes
    are committed together at the end and the action is logged.

    Raises:
        HTTPException: 400 when ``request.tier`` is not a valid tier.
    """
    try:
        new_tier = SubscriptionTier(request.tier)
    except ValueError:
        raise HTTPException(
            status_code=400,
            detail=f"Invalid tier: {request.tier}. Valid: scout, trader, tycoon"
        )

    config = TIER_CONFIG.get(new_tier, {})
    upgraded = []
    failed = []

    for user_id in request.user_ids:
        try:
            user = (
                await db.execute(select(User).where(User.id == user_id))
            ).scalar_one_or_none()
            if not user:
                failed.append({"user_id": user_id, "reason": "User not found"})
                continue

            subscription = (
                await db.execute(
                    select(Subscription).where(Subscription.user_id == user.id)
                )
            ).scalar_one_or_none()

            # NOTE(review): this handler writes `max_domains`, while upgrade_user
            # writes `domain_limit` for the same purpose - confirm which
            # attribute the Subscription model actually maps.
            if subscription:
                subscription.tier = new_tier
                subscription.max_domains = config.get("domain_limit", 5)
                subscription.status = SubscriptionStatus.ACTIVE
            else:
                subscription = Subscription(
                    user_id=user.id,
                    tier=new_tier,
                    status=SubscriptionStatus.ACTIVE,
                    max_domains=config.get("domain_limit", 5),
                )
                db.add(subscription)

            upgraded.append({"user_id": user_id, "email": user.email})
        except Exception as e:
            # Record the failure and carry on with the remaining ids.
            failed.append({"user_id": user_id, "reason": str(e)})

    await db.commit()

    await log_admin_activity(
        db, admin.id, "bulk_upgrade",
        f"Upgraded {len(upgraded)} users to {new_tier.value}"
    )

    return {
        "message": "Bulk upgrade completed",
        "tier": new_tier.value,
        "upgraded": upgraded,
        "failed": failed,
        "total_upgraded": len(upgraded),
        "total_failed": len(failed),
    }
# ============== Activity Log ==============
async def log_admin_activity(
    db: Database,
    admin_id: int,
    action: str,
    details: str,
):
    """Record an admin action in the activity log on a best-effort basis.

    A failure to write the log entry never propagates to the caller, but it
    is no longer silent: the error is logged and the session is rolled back
    so later statements on ``db`` are not run on a failed transaction.

    Args:
        db: Session used to persist the entry.
        admin_id: Id of the admin who performed the action.
        action: Short action identifier (e.g. ``"user_delete"``).
        details: Human-readable description of what was done.
    """
    import logging

    from app.models.admin_log import AdminActivityLog

    logger = logging.getLogger(__name__)
    try:
        db.add(
            AdminActivityLog(
                admin_id=admin_id,
                action=action,
                details=details,
            )
        )
        await db.commit()
    except Exception:
        # Best effort by design: report the problem, then restore the session.
        logger.warning("Failed to record admin activity %r", action, exc_info=True)
        try:
            await db.rollback()
        except Exception:
            logger.warning(
                "Rollback after failed admin activity log also failed",
                exc_info=True,
            )
@router.get("/activity-log")
async def get_activity_log(
    db: Database,
    admin: User = Depends(require_admin),
    limit: int = 50,
    offset: int = 0,
):
    """Return one page of admin activity entries with the acting admin, newest first.

    Any error while reading the entries yields an empty result instead of a
    failure (the table may not exist yet); an error while counting yields a
    total of 0.
    """
    from app.models.admin_log import AdminActivityLog

    page_query = (
        select(AdminActivityLog, User)
        .join(User, AdminActivityLog.admin_id == User.id)
        .order_by(desc(AdminActivityLog.created_at))
        .offset(offset)
        .limit(limit)
    )
    try:
        entries = (await db.execute(page_query)).all()
    except Exception:
        # Table might not exist yet.
        return {"logs": [], "total": 0}

    try:
        total = (await db.execute(select(func.count(AdminActivityLog.id)))).scalar()
    except Exception:
        total = 0

    logs = []
    for entry, actor in entries:
        logs.append(
            {
                "id": entry.id,
                "action": entry.action,
                "details": entry.details,
                "created_at": entry.created_at.isoformat(),
                "admin": {
                    "id": actor.id,
                    "email": actor.email,
                    "name": actor.name,
                }
            }
        )
    return {"logs": logs, "total": total}
# ============== API Connection Tests ==============
@router.get("/test-apis")
async def test_external_apis(
    admin: User = Depends(require_admin),
):
    """
    Probe the external API clients and report their connection status.

    Currently probes, in order:
    - DropCatch API
    - Sedo API

    A client whose probe raises is reported with ``success: False``, the
    error text and its ``is_configured`` flag. A summary with the number of
    probed, configured and connected APIs is included.
    """
    from app.services.dropcatch_api import dropcatch_client
    from app.services.sedo_api import sedo_client

    results = {
        "tested_at": datetime.utcnow().isoformat(),
        "apis": {}
    }

    # Same probe-and-report handling for every client.
    for api_name, client in (("dropcatch", dropcatch_client), ("sedo", sedo_client)):
        try:
            results["apis"][api_name] = await client.test_connection()
        except Exception as e:
            results["apis"][api_name] = {
                "success": False,
                "error": str(e),
                "configured": client.is_configured
            }

    reports = list(results["apis"].values())
    results["summary"] = {
        "total": len(results["apis"]),
        "configured": sum(1 for report in reports if report.get("configured")),
        "connected": sum(1 for report in reports if report.get("success")),
    }
    return results
@router.post("/trigger-scrape")
async def trigger_auction_scrape(
    db: Database,
    admin: User = Depends(require_admin),
):
    """
    Start auction scraping across all platforms on demand.

    When the job queue is enabled and a Redis URL is configured the scrape
    is enqueued and the job id returned; otherwise it runs inline through
    ``AuctionScraperService.scrape_all_platforms`` and its result is
    returned.
    """
    status_hint = "Check /admin/scrape-status for results"

    queue_available = settings.enable_job_queue and settings.redis_url
    if queue_available:
        # Non-blocking path: hand the work to the job queue.
        from app.jobs.client import enqueue_job

        job_id = await enqueue_job("scrape_auctions")
        return {
            "message": "Auction scraping enqueued",
            "job_id": job_id,
            "note": status_hint,
        }

    # Inline fallback: the request waits for the scrape to finish.
    from app.services.auction_scraper import AuctionScraperService

    outcome = await AuctionScraperService().scrape_all_platforms(db)
    return {
        "message": "Auction scraping completed",
        "result": outcome,
        "note": status_hint,
    }
@router.get("/scrape-status")
async def get_scrape_status(
    db: Database,
    admin: User = Depends(require_admin),
    limit: int = 10,
):
    """Return the most recent auction scrape log entries, newest first.

    Any error while reading the log yields an empty list together with an
    ``error`` message instead of a failure.
    """
    from app.models.auction import AuctionScrapeLog

    def iso_or_none(moment):
        """Format *moment* as ISO-8601, or return None when it is falsy."""
        return moment.isoformat() if moment else None

    recent = (
        select(AuctionScrapeLog)
        .order_by(desc(AuctionScrapeLog.started_at))
        .limit(limit)
    )
    try:
        entries = (await db.execute(recent)).scalars().all()
    except Exception:
        return {"logs": [], "error": "Table not found"}

    logs = []
    for entry in entries:
        logs.append(
            {
                "id": entry.id,
                "platform": entry.platform,
                "status": entry.status,
                "auctions_found": entry.auctions_found,
                "auctions_new": entry.auctions_new,
                "auctions_updated": entry.auctions_updated,
                "error_message": entry.error_message,
                "started_at": iso_or_none(entry.started_at),
                "completed_at": iso_or_none(entry.completed_at),
            }
        )
    return {"logs": logs}