pounce/backend/app/api/oauth.py
yves.gugger 6ce926d405 Premium service implementation & Tone of Voice consistency
🚀 PREMIUM DATA COLLECTOR:
- New script: backend/scripts/premium_data_collector.py
- Automated TLD price collection with quality scoring
- Automated auction scraping with validation
- Data quality reports (JSON + console output)
- Premium-ready score calculation (target: 80+)

 CRON AUTOMATION:
- New script: backend/scripts/setup_cron.sh
- TLD prices: Every 6 hours
- Auctions: Every 2 hours
- Quality reports: Daily at 1:00 AM

👤 ADMIN PRIVILEGES:
- guggeryves@hotmail.com always admin + verified
- Auto-creates Tycoon subscription for admin
- Works for OAuth and regular registration

🎯 TONE OF VOICE FIXES:
- 'Get Started Free' → 'Join the Hunt'
- 'Blog' → 'Briefings' (Footer + Pages)
- 'Loading...' → 'Acquiring targets...'
- 'Back to Blog' → 'Back to Briefings'
- Analysis report: TONE_OF_VOICE_ANALYSIS.md (85% consistent)
2025-12-10 09:22:29 +01:00

415 lines
13 KiB
Python

"""
OAuth authentication endpoints.
Supports:
- Google OAuth 2.0
- GitHub OAuth
"""
import os
import secrets
import logging
from datetime import datetime, timedelta
from typing import Optional
from urllib.parse import urlencode
import httpx
from fastapi import APIRouter, HTTPException, status, Query
from fastapi.responses import RedirectResponse
from pydantic import BaseModel
from sqlalchemy import select
from app.api.deps import Database
from app.config import get_settings
from app.models.user import User
from app.models.subscription import Subscription, SubscriptionTier, SubscriptionStatus, TIER_CONFIG
from app.services.auth import AuthService
logger = logging.getLogger(__name__)
router = APIRouter()
settings = get_settings()
# ============== Config ==============
GOOGLE_CLIENT_ID = os.getenv("GOOGLE_CLIENT_ID", "")
GOOGLE_CLIENT_SECRET = os.getenv("GOOGLE_CLIENT_SECRET", "")
GOOGLE_REDIRECT_URI = os.getenv("GOOGLE_REDIRECT_URI", "http://localhost:8000/api/v1/oauth/google/callback")
GITHUB_CLIENT_ID = os.getenv("GITHUB_CLIENT_ID", "")
GITHUB_CLIENT_SECRET = os.getenv("GITHUB_CLIENT_SECRET", "")
GITHUB_REDIRECT_URI = os.getenv("GITHUB_REDIRECT_URI", "http://localhost:8000/api/v1/oauth/github/callback")
FRONTEND_URL = os.getenv("SITE_URL", "http://localhost:3000")
# ============== Schemas ==============
class OAuthProviderInfo(BaseModel):
"""OAuth provider availability."""
google_enabled: bool
github_enabled: bool
class OAuthToken(BaseModel):
"""OAuth response with JWT token."""
access_token: str
token_type: str = "bearer"
expires_in: int
is_new_user: bool = False
# ============== Helper Functions ==============
async def get_or_create_oauth_user(
db: Database,
email: str,
name: Optional[str],
provider: str,
oauth_id: str,
avatar: Optional[str] = None,
) -> tuple[User, bool]:
"""Get existing user or create new one from OAuth."""
is_new = False
# First, check if user with this OAuth ID exists
result = await db.execute(
select(User).where(
User.oauth_provider == provider,
User.oauth_id == oauth_id,
)
)
user = result.scalar_one_or_none()
if user:
return user, False
# Check if user with this email exists (link accounts)
result = await db.execute(
select(User).where(User.email == email.lower())
)
user = result.scalar_one_or_none()
if user:
# Link OAuth to existing account
user.oauth_provider = provider
user.oauth_id = oauth_id
if avatar:
user.oauth_avatar = avatar
user.is_verified = True # OAuth emails are verified
await db.commit()
return user, False
# Create new user
user = User(
email=email.lower(),
hashed_password=secrets.token_urlsafe(32), # Random password (won't be used)
name=name,
oauth_provider=provider,
oauth_id=oauth_id,
oauth_avatar=avatar,
is_verified=True, # OAuth emails are pre-verified
is_active=True,
)
# Auto-admin for specific email - always admin + verified + Tycoon
ADMIN_EMAILS = ["guggeryves@hotmail.com"]
is_admin_user = user.email.lower() in [e.lower() for e in ADMIN_EMAILS]
if is_admin_user:
user.is_admin = True
user.is_verified = True
db.add(user)
await db.commit()
await db.refresh(user)
# Create Tycoon subscription for admin users
if is_admin_user:
tycoon_config = TIER_CONFIG.get(SubscriptionTier.TYCOON, {})
subscription = Subscription(
user_id=user.id,
tier=SubscriptionTier.TYCOON,
status=SubscriptionStatus.ACTIVE,
max_domains=tycoon_config.get("domain_limit", 500),
)
db.add(subscription)
await db.commit()
return user, True
def create_jwt_for_user(user: User) -> tuple[str, int]:
"""Create JWT token for user."""
expires_minutes = settings.access_token_expire_minutes
access_token = AuthService.create_access_token(
data={"sub": str(user.id), "email": user.email},
expires_delta=timedelta(minutes=expires_minutes),
)
return access_token, expires_minutes * 60
# ============== Endpoints ==============
@router.get("/providers", response_model=OAuthProviderInfo)
async def get_oauth_providers():
"""Get available OAuth providers."""
return OAuthProviderInfo(
google_enabled=bool(GOOGLE_CLIENT_ID and GOOGLE_CLIENT_SECRET),
github_enabled=bool(GITHUB_CLIENT_ID and GITHUB_CLIENT_SECRET),
)
# ============== Google OAuth ==============
@router.get("/google/login")
async def google_login(redirect: Optional[str] = Query(None)):
"""Redirect to Google OAuth."""
if not GOOGLE_CLIENT_ID:
raise HTTPException(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
detail="Google OAuth not configured",
)
# Store redirect URL in state
state = secrets.token_urlsafe(16)
if redirect:
state = f"{state}:{redirect}"
params = {
"client_id": GOOGLE_CLIENT_ID,
"redirect_uri": GOOGLE_REDIRECT_URI,
"response_type": "code",
"scope": "openid email profile",
"state": state,
"access_type": "offline",
"prompt": "select_account",
}
url = f"https://accounts.google.com/o/oauth2/v2/auth?{urlencode(params)}"
return RedirectResponse(url=url)
@router.get("/google/callback")
async def google_callback(
code: str = Query(...),
state: str = Query(""),
db: Database = None,
):
"""Handle Google OAuth callback."""
if not GOOGLE_CLIENT_ID or not GOOGLE_CLIENT_SECRET:
raise HTTPException(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
detail="Google OAuth not configured",
)
# Parse redirect from state
redirect_path = "/dashboard"
if ":" in state:
_, redirect_path = state.split(":", 1)
try:
# Exchange code for tokens
async with httpx.AsyncClient() as client:
token_response = await client.post(
"https://oauth2.googleapis.com/token",
data={
"client_id": GOOGLE_CLIENT_ID,
"client_secret": GOOGLE_CLIENT_SECRET,
"code": code,
"redirect_uri": GOOGLE_REDIRECT_URI,
"grant_type": "authorization_code",
},
)
if token_response.status_code != 200:
logger.error(f"Google token error: {token_response.text}")
return RedirectResponse(
url=f"{FRONTEND_URL}/login?error=oauth_failed"
)
tokens = token_response.json()
access_token = tokens.get("access_token")
# Get user info
user_response = await client.get(
"https://www.googleapis.com/oauth2/v2/userinfo",
headers={"Authorization": f"Bearer {access_token}"},
)
if user_response.status_code != 200:
logger.error(f"Google user info error: {user_response.text}")
return RedirectResponse(
url=f"{FRONTEND_URL}/login?error=oauth_failed"
)
user_info = user_response.json()
# Get or create user
user, is_new = await get_or_create_oauth_user(
db=db,
email=user_info.get("email"),
name=user_info.get("name"),
provider="google",
oauth_id=user_info.get("id"),
avatar=user_info.get("picture"),
)
# Create JWT
jwt_token, _ = create_jwt_for_user(user)
# Redirect to frontend with token
redirect_url = f"{FRONTEND_URL}/oauth/callback?token={jwt_token}&redirect={redirect_path}"
if is_new:
redirect_url += "&new=true"
return RedirectResponse(url=redirect_url)
except Exception as e:
logger.exception(f"Google OAuth error: {e}")
return RedirectResponse(
url=f"{FRONTEND_URL}/login?error=oauth_failed"
)
# ============== GitHub OAuth ==============
@router.get("/github/login")
async def github_login(redirect: Optional[str] = Query(None)):
"""Redirect to GitHub OAuth."""
if not GITHUB_CLIENT_ID:
raise HTTPException(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
detail="GitHub OAuth not configured",
)
# Store redirect URL in state
state = secrets.token_urlsafe(16)
if redirect:
state = f"{state}:{redirect}"
params = {
"client_id": GITHUB_CLIENT_ID,
"redirect_uri": GITHUB_REDIRECT_URI,
"scope": "user:email",
"state": state,
}
url = f"https://github.com/login/oauth/authorize?{urlencode(params)}"
return RedirectResponse(url=url)
@router.get("/github/callback")
async def github_callback(
code: str = Query(...),
state: str = Query(""),
db: Database = None,
):
"""Handle GitHub OAuth callback."""
if not GITHUB_CLIENT_ID or not GITHUB_CLIENT_SECRET:
raise HTTPException(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
detail="GitHub OAuth not configured",
)
# Parse redirect from state
redirect_path = "/dashboard"
if ":" in state:
_, redirect_path = state.split(":", 1)
try:
async with httpx.AsyncClient() as client:
# Exchange code for token
token_response = await client.post(
"https://github.com/login/oauth/access_token",
data={
"client_id": GITHUB_CLIENT_ID,
"client_secret": GITHUB_CLIENT_SECRET,
"code": code,
"redirect_uri": GITHUB_REDIRECT_URI,
},
headers={"Accept": "application/json"},
)
if token_response.status_code != 200:
logger.error(f"GitHub token error: {token_response.text}")
return RedirectResponse(
url=f"{FRONTEND_URL}/login?error=oauth_failed"
)
tokens = token_response.json()
access_token = tokens.get("access_token")
if not access_token:
logger.error(f"GitHub no access token: {tokens}")
return RedirectResponse(
url=f"{FRONTEND_URL}/login?error=oauth_failed"
)
# Get user info
user_response = await client.get(
"https://api.github.com/user",
headers={
"Authorization": f"Bearer {access_token}",
"Accept": "application/json",
},
)
if user_response.status_code != 200:
logger.error(f"GitHub user info error: {user_response.text}")
return RedirectResponse(
url=f"{FRONTEND_URL}/login?error=oauth_failed"
)
user_info = user_response.json()
# Get primary email (might need separate call)
email = user_info.get("email")
if not email:
emails_response = await client.get(
"https://api.github.com/user/emails",
headers={
"Authorization": f"Bearer {access_token}",
"Accept": "application/json",
},
)
if emails_response.status_code == 200:
emails = emails_response.json()
for e in emails:
if e.get("primary"):
email = e.get("email")
break
if not email and emails:
email = emails[0].get("email")
if not email:
return RedirectResponse(
url=f"{FRONTEND_URL}/login?error=no_email"
)
# Get or create user
user, is_new = await get_or_create_oauth_user(
db=db,
email=email,
name=user_info.get("name") or user_info.get("login"),
provider="github",
oauth_id=str(user_info.get("id")),
avatar=user_info.get("avatar_url"),
)
# Create JWT
jwt_token, _ = create_jwt_for_user(user)
# Redirect to frontend with token
redirect_url = f"{FRONTEND_URL}/oauth/callback?token={jwt_token}&redirect={redirect_path}"
if is_new:
redirect_url += "&new=true"
return RedirectResponse(url=redirect_url)
except Exception as e:
logger.exception(f"GitHub OAuth error: {e}")
return RedirectResponse(
url=f"{FRONTEND_URL}/login?error=oauth_failed"
)