""" OAuth authentication endpoints. Supports: - Google OAuth 2.0 - GitHub OAuth """ import base64 import hashlib import hmac import json import os import secrets import logging import time from datetime import datetime, timedelta from typing import Optional from urllib.parse import urlencode import httpx from fastapi import APIRouter, HTTPException, status, Query, Request from fastapi.responses import RedirectResponse from pydantic import BaseModel from sqlalchemy import select from app.api.deps import Database from app.config import get_settings from app.models.user import User from app.models.subscription import Subscription, SubscriptionTier, SubscriptionStatus, TIER_CONFIG from app.services.auth import AuthService from app.security import set_auth_cookie, should_use_secure_cookies logger = logging.getLogger(__name__) router = APIRouter() settings = get_settings() # ============== Config ============== GOOGLE_CLIENT_ID = os.getenv("GOOGLE_CLIENT_ID", "") GOOGLE_CLIENT_SECRET = os.getenv("GOOGLE_CLIENT_SECRET", "") GOOGLE_REDIRECT_URI = os.getenv("GOOGLE_REDIRECT_URI", "http://localhost:8000/api/v1/oauth/google/callback") GITHUB_CLIENT_ID = os.getenv("GITHUB_CLIENT_ID", "") GITHUB_CLIENT_SECRET = os.getenv("GITHUB_CLIENT_SECRET", "") GITHUB_REDIRECT_URI = os.getenv("GITHUB_REDIRECT_URI", "http://localhost:8000/api/v1/oauth/github/callback") FRONTEND_URL = os.getenv("SITE_URL", "http://localhost:3000") OAUTH_STATE_TTL_SECONDS = 600 # 10 minutes def _sanitize_redirect_path(redirect: Optional[str]) -> str: """ Only allow internal (relative) redirects. Prevents open-redirect and token/referrer exfil paths. """ default = "/terminal/radar" if not redirect: return default r = redirect.strip() if not r.startswith("/"): return default if r.startswith("//"): return default if "://" in r: return default if "\\" in r: return default if len(r) > 2048: return default return r def _b64url_encode(data: bytes) -> str: return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii") def _b64url_decode(data: str) -> bytes: pad = "=" * (-len(data) % 4) return base64.urlsafe_b64decode(data + pad) def _oauth_nonce_cookie_name(provider: str) -> str: return f"pounce_oauth_nonce_{provider}" def _set_oauth_nonce_cookie(response: RedirectResponse, provider: str, nonce: str) -> None: response.set_cookie( key=_oauth_nonce_cookie_name(provider), value=nonce, httponly=True, secure=should_use_secure_cookies(), samesite="lax", max_age=OAUTH_STATE_TTL_SECONDS, path="/api/v1/oauth", ) def _clear_oauth_nonce_cookie(response: RedirectResponse, provider: str) -> None: response.delete_cookie( key=_oauth_nonce_cookie_name(provider), path="/api/v1/oauth", ) def _create_oauth_state(provider: str, nonce: str, redirect_path: str) -> str: """ Signed, short-lived state payload. Also protects the redirect_path against tampering. """ if not settings.secret_key: raise RuntimeError("SECRET_KEY is required for OAuth state signing") payload = { "p": provider, "n": nonce, "r": redirect_path, "ts": int(time.time()), } payload_b64 = _b64url_encode( json.dumps(payload, separators=(",", ":"), ensure_ascii=False).encode("utf-8") ) sig = hmac.new( settings.secret_key.encode("utf-8"), payload_b64.encode("utf-8"), hashlib.sha256, ).digest() return f"{payload_b64}.{_b64url_encode(sig)}" def _verify_oauth_state(state: str, provider: str) -> tuple[str, str]: if not settings.secret_key: raise ValueError("OAuth state verification not available (missing SECRET_KEY)") if not state or "." not in state: raise ValueError("Invalid state format") payload_b64, sig_b64 = state.split(".", 1) expected_sig = _b64url_encode( hmac.new( settings.secret_key.encode("utf-8"), payload_b64.encode("utf-8"), hashlib.sha256, ).digest() ) if not hmac.compare_digest(expected_sig, sig_b64): raise ValueError("Invalid state signature") payload = json.loads(_b64url_decode(payload_b64).decode("utf-8")) if payload.get("p") != provider: raise ValueError("State provider mismatch") ts = int(payload.get("ts") or 0) if ts <= 0 or (int(time.time()) - ts) > OAUTH_STATE_TTL_SECONDS: raise ValueError("State expired") nonce = str(payload.get("n") or "") redirect_path = _sanitize_redirect_path(payload.get("r")) if not nonce: raise ValueError("Missing nonce") return nonce, redirect_path # ============== Schemas ============== class OAuthProviderInfo(BaseModel): """OAuth provider availability.""" google_enabled: bool github_enabled: bool class OAuthToken(BaseModel): """OAuth response with JWT token.""" access_token: str token_type: str = "bearer" expires_in: int is_new_user: bool = False # ============== Helper Functions ============== async def get_or_create_oauth_user( db: Database, email: str, name: Optional[str], provider: str, oauth_id: str, avatar: Optional[str] = None, ) -> tuple[User, bool]: """Get existing user or create new one from OAuth.""" is_new = False # First, check if user with this OAuth ID exists result = await db.execute( select(User).where( User.oauth_provider == provider, User.oauth_id == oauth_id, ) ) user = result.scalar_one_or_none() if user: return user, False # Check if user with this email exists (link accounts) result = await db.execute( select(User).where(User.email == email.lower()) ) user = result.scalar_one_or_none() if user: # Link OAuth to existing account user.oauth_provider = provider user.oauth_id = oauth_id if avatar: user.oauth_avatar = avatar user.is_verified = True # OAuth emails are verified await db.commit() return user, False # Create new user user = User( email=email.lower(), # Random password (won't be used), but keep it a valid bcrypt hash. hashed_password=AuthService.hash_password(secrets.token_urlsafe(32)), name=name, oauth_provider=provider, oauth_id=oauth_id, oauth_avatar=avatar, is_verified=True, # OAuth emails are pre-verified is_active=True, ) # Auto-admin for specific email - always admin + verified + Tycoon ADMIN_EMAILS = ["guggeryves@hotmail.com"] is_admin_user = user.email.lower() in [e.lower() for e in ADMIN_EMAILS] if is_admin_user: user.is_admin = True user.is_verified = True db.add(user) await db.commit() await db.refresh(user) # Create Tycoon subscription for admin users if is_admin_user: tycoon_config = TIER_CONFIG.get(SubscriptionTier.TYCOON, {}) subscription = Subscription( user_id=user.id, tier=SubscriptionTier.TYCOON, status=SubscriptionStatus.ACTIVE, max_domains=tycoon_config.get("domain_limit", 500), ) db.add(subscription) await db.commit() return user, True def create_jwt_for_user(user: User) -> tuple[str, int]: """Create JWT token for user.""" expires_minutes = settings.access_token_expire_minutes access_token = AuthService.create_access_token( data={"sub": str(user.id), "email": user.email}, expires_delta=timedelta(minutes=expires_minutes), ) return access_token, expires_minutes * 60 # ============== Endpoints ============== @router.get("/providers", response_model=OAuthProviderInfo) async def get_oauth_providers(): """Get available OAuth providers.""" return OAuthProviderInfo( google_enabled=bool(GOOGLE_CLIENT_ID and GOOGLE_CLIENT_SECRET), github_enabled=bool(GITHUB_CLIENT_ID and GITHUB_CLIENT_SECRET), ) # ============== Google OAuth ============== @router.get("/google/login") async def google_login(redirect: Optional[str] = Query(None)): """Redirect to Google OAuth.""" if not GOOGLE_CLIENT_ID: raise HTTPException( status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="Google OAuth not configured", ) redirect_path = _sanitize_redirect_path(redirect) nonce = secrets.token_urlsafe(16) state = _create_oauth_state("google", nonce, redirect_path) params = { "client_id": GOOGLE_CLIENT_ID, "redirect_uri": GOOGLE_REDIRECT_URI, "response_type": "code", "scope": "openid email profile", "state": state, "access_type": "offline", "prompt": "select_account", } url = f"https://accounts.google.com/o/oauth2/v2/auth?{urlencode(params)}" response = RedirectResponse(url=url) _set_oauth_nonce_cookie(response, "google", nonce) return response @router.get("/google/callback") async def google_callback( request: Request, code: str = Query(...), state: str = Query(""), db: Database = None, ): """Handle Google OAuth callback.""" if not GOOGLE_CLIENT_ID or not GOOGLE_CLIENT_SECRET: raise HTTPException( status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="Google OAuth not configured", ) try: nonce, redirect_path = _verify_oauth_state(state, "google") except Exception as e: logger.warning(f"Invalid OAuth state (google): {e}") return RedirectResponse(url=f"{FRONTEND_URL}/login?error=oauth_state_invalid") cookie_nonce = request.cookies.get(_oauth_nonce_cookie_name("google")) if not cookie_nonce or not hmac.compare_digest(cookie_nonce, nonce): logger.warning("OAuth nonce mismatch (google)") return RedirectResponse(url=f"{FRONTEND_URL}/login?error=oauth_state_invalid") try: # Exchange code for tokens async with httpx.AsyncClient() as client: token_response = await client.post( "https://oauth2.googleapis.com/token", data={ "client_id": GOOGLE_CLIENT_ID, "client_secret": GOOGLE_CLIENT_SECRET, "code": code, "redirect_uri": GOOGLE_REDIRECT_URI, "grant_type": "authorization_code", }, ) if token_response.status_code != 200: logger.error(f"Google token error: {token_response.text}") return RedirectResponse( url=f"{FRONTEND_URL}/login?error=oauth_failed" ) tokens = token_response.json() access_token = tokens.get("access_token") # Get user info user_response = await client.get( "https://www.googleapis.com/oauth2/v2/userinfo", headers={"Authorization": f"Bearer {access_token}"}, ) if user_response.status_code != 200: logger.error(f"Google user info error: {user_response.text}") return RedirectResponse( url=f"{FRONTEND_URL}/login?error=oauth_failed" ) user_info = user_response.json() # Get or create user user, is_new = await get_or_create_oauth_user( db=db, email=user_info.get("email"), name=user_info.get("name"), provider="google", oauth_id=user_info.get("id"), avatar=user_info.get("picture"), ) # Create JWT jwt_token, _ = create_jwt_for_user(user) # Redirect to frontend WITHOUT token in URL; set auth cookie instead. query = {"redirect": redirect_path} if is_new: query["new"] = "true" redirect_url = f"{FRONTEND_URL}/oauth/callback?{urlencode(query)}" response = RedirectResponse(url=redirect_url) _clear_oauth_nonce_cookie(response, "google") set_auth_cookie( response=response, token=jwt_token, max_age_seconds=settings.access_token_expire_minutes * 60, ) return response except Exception as e: logger.exception(f"Google OAuth error: {e}") return RedirectResponse( url=f"{FRONTEND_URL}/login?error=oauth_failed" ) # ============== GitHub OAuth ============== @router.get("/github/login") async def github_login(redirect: Optional[str] = Query(None)): """Redirect to GitHub OAuth.""" if not GITHUB_CLIENT_ID: raise HTTPException( status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="GitHub OAuth not configured", ) redirect_path = _sanitize_redirect_path(redirect) nonce = secrets.token_urlsafe(16) state = _create_oauth_state("github", nonce, redirect_path) params = { "client_id": GITHUB_CLIENT_ID, "redirect_uri": GITHUB_REDIRECT_URI, "scope": "user:email", "state": state, } url = f"https://github.com/login/oauth/authorize?{urlencode(params)}" response = RedirectResponse(url=url) _set_oauth_nonce_cookie(response, "github", nonce) return response @router.get("/github/callback") async def github_callback( request: Request, code: str = Query(...), state: str = Query(""), db: Database = None, ): """Handle GitHub OAuth callback.""" if not GITHUB_CLIENT_ID or not GITHUB_CLIENT_SECRET: raise HTTPException( status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="GitHub OAuth not configured", ) try: nonce, redirect_path = _verify_oauth_state(state, "github") except Exception as e: logger.warning(f"Invalid OAuth state (github): {e}") return RedirectResponse(url=f"{FRONTEND_URL}/login?error=oauth_state_invalid") cookie_nonce = request.cookies.get(_oauth_nonce_cookie_name("github")) if not cookie_nonce or not hmac.compare_digest(cookie_nonce, nonce): logger.warning("OAuth nonce mismatch (github)") return RedirectResponse(url=f"{FRONTEND_URL}/login?error=oauth_state_invalid") try: async with httpx.AsyncClient() as client: # Exchange code for token token_response = await client.post( "https://github.com/login/oauth/access_token", data={ "client_id": GITHUB_CLIENT_ID, "client_secret": GITHUB_CLIENT_SECRET, "code": code, "redirect_uri": GITHUB_REDIRECT_URI, }, headers={"Accept": "application/json"}, ) if token_response.status_code != 200: logger.error(f"GitHub token error: {token_response.text}") return RedirectResponse( url=f"{FRONTEND_URL}/login?error=oauth_failed" ) tokens = token_response.json() access_token = tokens.get("access_token") if not access_token: logger.error(f"GitHub no access token: {tokens}") return RedirectResponse( url=f"{FRONTEND_URL}/login?error=oauth_failed" ) # Get user info user_response = await client.get( "https://api.github.com/user", headers={ "Authorization": f"Bearer {access_token}", "Accept": "application/json", }, ) if user_response.status_code != 200: logger.error(f"GitHub user info error: {user_response.text}") return RedirectResponse( url=f"{FRONTEND_URL}/login?error=oauth_failed" ) user_info = user_response.json() # Get primary email (might need separate call) email = user_info.get("email") if not email: emails_response = await client.get( "https://api.github.com/user/emails", headers={ "Authorization": f"Bearer {access_token}", "Accept": "application/json", }, ) if emails_response.status_code == 200: emails = emails_response.json() for e in emails: if e.get("primary"): email = e.get("email") break if not email and emails: email = emails[0].get("email") if not email: return RedirectResponse( url=f"{FRONTEND_URL}/login?error=no_email" ) # Get or create user user, is_new = await get_or_create_oauth_user( db=db, email=email, name=user_info.get("name") or user_info.get("login"), provider="github", oauth_id=str(user_info.get("id")), avatar=user_info.get("avatar_url"), ) # Create JWT jwt_token, _ = create_jwt_for_user(user) query = {"redirect": redirect_path} if is_new: query["new"] = "true" redirect_url = f"{FRONTEND_URL}/oauth/callback?{urlencode(query)}" response = RedirectResponse(url=redirect_url) _clear_oauth_nonce_cookie(response, "github") set_auth_cookie( response=response, token=jwt_token, max_age_seconds=settings.access_token_expire_minutes * 60, ) return response except Exception as e: logger.exception(f"GitHub OAuth error: {e}") return RedirectResponse( url=f"{FRONTEND_URL}/login?error=oauth_failed" )