"""Admin API endpoints - for internal use only."""
from fastapi import APIRouter, HTTPException, status, BackgroundTasks
from pydantic import BaseModel, EmailStr
from sqlalchemy import func, select

from app.api.deps import Database
from app.models.user import User
from app.models.subscription import Subscription, SubscriptionTier, SubscriptionStatus, TIER_CONFIG

# NOTE(review): no route in this module declares an authentication dependency.
# Confirm the router is only mounted behind an admin-only guard before exposing it.
router = APIRouter()


@router.post("/scrape-tld-prices")
async def trigger_tld_scrape(background_tasks: BackgroundTasks, db: Database):
    """
    Manually trigger a TLD price scrape and return its summary.

    The scrape is awaited inside the request, so the response is only sent
    once ``tld_aggregator.run_scrape`` has finished. ``background_tasks`` is
    kept in the signature for compatibility but is not used.

    NOTE: In production, this should require admin authentication!

    Returns:
        A dict with the scrape status, counters, error list and ISO-formatted
        start/completion timestamps (``completed_at`` is ``None`` when the
        result carries no completion time).
    """
    # Imported at call time, as in the original module layout.
    from app.services.tld_scraper.aggregator import tld_aggregator

    result = await tld_aggregator.run_scrape(db)
    completed_at = result.completed_at

    return {
        "message": "TLD price scrape completed",
        "status": result.status,
        "tlds_scraped": result.tlds_scraped,
        "prices_saved": result.prices_saved,
        "sources_succeeded": result.sources_succeeded,
        "sources_attempted": result.sources_attempted,
        "errors": result.errors,
        "started_at": result.started_at.isoformat(),
        "completed_at": completed_at.isoformat() if completed_at else None,
    }


@router.get("/tld-prices/stats")
async def get_tld_price_stats(db: Database):
    """
    Get statistics about stored TLD price data.

    All figures come from a single aggregate query instead of one round trip
    per statistic.

    Returns:
        A dict with the total row count, the number of distinct TLDs and
        registrars, the newest and oldest ``recorded_at`` values as ISO
        strings (``None`` when the table is empty) and the number of whole
        days between them (``0`` when either bound is missing).
    """
    from app.models.tld_price import TLDPrice

    stats_query = select(
        func.count(TLDPrice.id),
        func.count(func.distinct(TLDPrice.tld)),
        func.count(func.distinct(TLDPrice.registrar)),
        func.min(TLDPrice.recorded_at),
        func.max(TLDPrice.recorded_at),
    )
    # An aggregate SELECT without GROUP BY always yields exactly one row.
    stats_row = (await db.execute(stats_query)).one()
    total_records, unique_tlds, unique_registrars, oldest_record, latest_record = stats_row

    has_range = bool(latest_record and oldest_record)

    return {
        "total_records": total_records,
        "unique_tlds": unique_tlds,
        "unique_registrars": unique_registrars,
        "latest_record": latest_record.isoformat() if latest_record else None,
        "oldest_record": oldest_record.isoformat() if oldest_record else None,
        "data_range_days": (latest_record - oldest_record).days if has_range else 0,
    }


class UpgradeUserRequest(BaseModel):
    """Request schema for upgrading a user."""

    # Address used to look the user up.
    email: EmailStr
    # Raw tier value; converted to SubscriptionTier by the endpoint.
    tier: str  # starter, professional, enterprise


@router.post("/upgrade-user")
async def upgrade_user(request: UpgradeUserRequest, db: Database):
    """
    Upgrade a user's subscription tier.

    Looks the user up by email, validates the requested tier, then creates
    the user's subscription or updates the existing one, marking it active
    with the tier's configured domain limit.

    NOTE: In production, this should require admin authentication!

    Raises:
        HTTPException: 404 when no user has the given email; 400 when
            ``request.tier`` is not a valid ``SubscriptionTier`` value.

    Returns:
        A dict describing the applied tier (name, domain limit, features).
    """
    user_result = await db.execute(
        select(User).where(User.email == request.email)
    )
    user = user_result.scalar_one_or_none()

    if not user:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"User with email {request.email} not found"
        )

    try:
        new_tier = SubscriptionTier(request.tier)
    except ValueError:
        # The enum's ValueError adds nothing for the client; drop the chain.
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Invalid tier: {request.tier}. Valid options: starter, professional, enterprise"
        ) from None

    # Looked up once and reused for both the subscription row and the response.
    config = TIER_CONFIG[new_tier]

    subscription_result = await db.execute(
        select(Subscription).where(Subscription.user_id == user.id)
    )
    subscription = subscription_result.scalar_one_or_none()

    if subscription is None:
        subscription = Subscription(
            user_id=user.id,
            tier=new_tier,
            status=SubscriptionStatus.ACTIVE,
            domain_limit=config["domain_limit"],
        )
        db.add(subscription)
    else:
        subscription.tier = new_tier
        subscription.domain_limit = config["domain_limit"]
        subscription.status = SubscriptionStatus.ACTIVE

    await db.commit()
    await db.refresh(subscription)

    return {
        "message": f"User {request.email} upgraded to {config['name']}",
        "user_id": user.id,
        "tier": new_tier.value,
        "tier_name": config["name"],
        "domain_limit": config["domain_limit"],
        "features": config["features"],
    }


def _serialize_user(user, subscription):
    """Build the JSON-ready dict for one user and its optional subscription."""
    subscription_payload = None
    if subscription:
        subscription_payload = {
            "tier": subscription.tier.value,
            "status": subscription.status.value,
            "domain_limit": subscription.domain_limit,
        }

    return {
        "id": user.id,
        "email": user.email,
        "name": user.name,
        "is_active": user.is_active,
        "created_at": user.created_at.isoformat(),
        "subscription": subscription_payload,
    }


@router.get("/users")
async def list_users(db: Database, limit: int = 50, offset: int = 0):
    """
    List all users with their subscriptions.

    Users are returned ordered by ``id`` so that ``offset``/``limit`` paging
    is stable across requests. Subscriptions for the whole page are fetched
    with a single query rather than one query per user.

    NOTE: In production, this should require admin authentication!

    Args:
        limit: Maximum number of users to return; must not be negative.
        offset: Number of users to skip; must not be negative.

    Raises:
        HTTPException: 400 when ``limit`` or ``offset`` is negative.

    Returns:
        A dict with the serialized ``users`` page and its ``count``.
    """
    if limit < 0 or offset < 0:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="limit and offset must not be negative"
        )

    page_result = await db.execute(
        select(User).order_by(User.id).offset(offset).limit(limit)
    )
    users = page_result.scalars().all()

    # NOTE(review): assumes at most one subscription per user, as upgrade_user
    # does; if several rows exist, the last one returned wins here.
    subscriptions_by_user = {}
    if users:
        subs_result = await db.execute(
            select(Subscription).where(
                Subscription.user_id.in_([user.id for user in users])
            )
        )
        subscriptions_by_user = {sub.user_id: sub for sub in subs_result.scalars()}

    user_list = [
        _serialize_user(user, subscriptions_by_user.get(user.id))
        for user in users
    ]

    return {"users": user_list, "count": len(user_list)}