pounce/backend/app/api/admin.py
yves.gugger f0cc69ac95 feat: TLD price scraper, .ch domain fix, DB integration
Major changes:
- Add TLD price scraper with Porkbun API (886+ TLDs, no API key needed)
- Fix .ch domain checker using rdap.nic.ch custom RDAP
- Integrate database for TLD price history tracking
- Add admin endpoints for manual scrape and stats
- Extend scheduler with daily TLD price scrape job (03:00 UTC)
- Update API to use DB data with static fallback
- Update README with complete documentation

New files:
- backend/app/services/tld_scraper/ (scraper package)
- TLD_TRACKING_PLAN.md (implementation plan)

API changes:
- POST /admin/scrape-tld-prices - trigger manual scrape
- GET /admin/tld-prices/stats - database statistics
- GET /tld-prices/overview now uses DB data
2025-12-08 09:12:44 +01:00

190 lines
6.1 KiB
Python

"""Admin API endpoints - for internal use only."""
from fastapi import APIRouter, HTTPException, status, BackgroundTasks
from pydantic import BaseModel, EmailStr
from sqlalchemy import select
from app.api.deps import Database
from app.models.user import User
from app.models.subscription import Subscription, SubscriptionTier, SubscriptionStatus, TIER_CONFIG
router = APIRouter()
@router.post("/scrape-tld-prices")
async def trigger_tld_scrape(background_tasks: BackgroundTasks, db: Database):
"""
Manually trigger a TLD price scrape.
This runs in the background and returns immediately.
Check logs for scrape results.
NOTE: In production, this should require admin authentication!
"""
from app.services.tld_scraper.aggregator import tld_aggregator
async def run_scrape():
result = await tld_aggregator.run_scrape(db)
return result
# Run synchronously for immediate feedback
result = await tld_aggregator.run_scrape(db)
return {
"message": "TLD price scrape completed",
"status": result.status,
"tlds_scraped": result.tlds_scraped,
"prices_saved": result.prices_saved,
"sources_succeeded": result.sources_succeeded,
"sources_attempted": result.sources_attempted,
"errors": result.errors,
"started_at": result.started_at.isoformat(),
"completed_at": result.completed_at.isoformat() if result.completed_at else None,
}
@router.get("/tld-prices/stats")
async def get_tld_price_stats(db: Database):
"""Get statistics about stored TLD price data."""
from sqlalchemy import func
from app.models.tld_price import TLDPrice
# Total records
total_result = await db.execute(select(func.count(TLDPrice.id)))
total_records = total_result.scalar()
# Unique TLDs
tlds_result = await db.execute(select(func.count(func.distinct(TLDPrice.tld))))
unique_tlds = tlds_result.scalar()
# Unique registrars
registrars_result = await db.execute(select(func.count(func.distinct(TLDPrice.registrar))))
unique_registrars = registrars_result.scalar()
# Latest record
latest_result = await db.execute(
select(TLDPrice.recorded_at).order_by(TLDPrice.recorded_at.desc()).limit(1)
)
latest_record = latest_result.scalar()
# Oldest record
oldest_result = await db.execute(
select(TLDPrice.recorded_at).order_by(TLDPrice.recorded_at.asc()).limit(1)
)
oldest_record = oldest_result.scalar()
return {
"total_records": total_records,
"unique_tlds": unique_tlds,
"unique_registrars": unique_registrars,
"latest_record": latest_record.isoformat() if latest_record else None,
"oldest_record": oldest_record.isoformat() if oldest_record else None,
"data_range_days": (latest_record - oldest_record).days if latest_record and oldest_record else 0,
}
class UpgradeUserRequest(BaseModel):
"""Request schema for upgrading a user."""
email: EmailStr
tier: str # starter, professional, enterprise
@router.post("/upgrade-user")
async def upgrade_user(request: UpgradeUserRequest, db: Database):
"""
Upgrade a user's subscription tier.
NOTE: In production, this should require admin authentication!
"""
# Find user
result = await db.execute(
select(User).where(User.email == request.email)
)
user = result.scalar_one_or_none()
if not user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"User with email {request.email} not found"
)
# Validate tier
try:
new_tier = SubscriptionTier(request.tier)
except ValueError:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Invalid tier: {request.tier}. Valid options: starter, professional, enterprise"
)
# Find or create subscription
result = await db.execute(
select(Subscription).where(Subscription.user_id == user.id)
)
subscription = result.scalar_one_or_none()
if not subscription:
# Create new subscription
subscription = Subscription(
user_id=user.id,
tier=new_tier,
status=SubscriptionStatus.ACTIVE,
domain_limit=TIER_CONFIG[new_tier]["domain_limit"],
)
db.add(subscription)
else:
# Update existing
subscription.tier = new_tier
subscription.domain_limit = TIER_CONFIG[new_tier]["domain_limit"]
subscription.status = SubscriptionStatus.ACTIVE
await db.commit()
await db.refresh(subscription)
config = TIER_CONFIG[new_tier]
return {
"message": f"User {request.email} upgraded to {config['name']}",
"user_id": user.id,
"tier": new_tier.value,
"tier_name": config["name"],
"domain_limit": config["domain_limit"],
"features": config["features"],
}
@router.get("/users")
async def list_users(db: Database, limit: int = 50, offset: int = 0):
"""
List all users with their subscriptions.
NOTE: In production, this should require admin authentication!
"""
result = await db.execute(
select(User).offset(offset).limit(limit)
)
users = result.scalars().all()
user_list = []
for user in users:
# Get subscription
sub_result = await db.execute(
select(Subscription).where(Subscription.user_id == user.id)
)
subscription = sub_result.scalar_one_or_none()
user_list.append({
"id": user.id,
"email": user.email,
"name": user.name,
"is_active": user.is_active,
"created_at": user.created_at.isoformat(),
"subscription": {
"tier": subscription.tier.value if subscription else None,
"status": subscription.status.value if subscription else None,
"domain_limit": subscription.domain_limit if subscription else 0,
} if subscription else None,
})
return {"users": user_list, "count": len(user_list)}