Yves Gugger 9fde9eab4a
Some checks failed
CI / Frontend Lint & Type Check (push) Has been cancelled
CI / Frontend Build (push) Has been cancelled
CI / Backend Lint (push) Has been cancelled
CI / Backend Tests (push) Has been cancelled
CI / Docker Build (push) Has been cancelled
CI / Security Scan (push) Has been cancelled
Deploy / Build & Push Images (push) Has been cancelled
Deploy / Deploy to Server (push) Has been cancelled
Deploy / Notify (push) Has been cancelled
Fix subscription tier and domain_limit issues
- Changed SubscriptionTier.STARTER to SCOUT (correct tier name)
- Removed domain_limit from Subscription constructor (it's a computed property)
- Database needs to be re-initialized with: python scripts/init_db.py && python scripts/seed_tld_prices.py
2025-12-09 08:41:45 +01:00

118 lines
3.6 KiB
Python

"""Authentication service."""
from datetime import datetime, timedelta
from typing import Optional
import bcrypt
from jose import JWTError, jwt
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.config import get_settings
from app.models.user import User
from app.models.subscription import Subscription, SubscriptionTier, SubscriptionStatus, TIER_CONFIG
settings = get_settings()
class AuthService:
"""Service for authentication operations."""
@staticmethod
def verify_password(plain_password: str, hashed_password: str) -> bool:
"""Verify a password against its hash."""
return bcrypt.checkpw(
plain_password.encode('utf-8'),
hashed_password.encode('utf-8')
)
@staticmethod
def hash_password(password: str) -> str:
"""Hash a password."""
salt = bcrypt.gensalt(rounds=12)
hashed = bcrypt.hashpw(password.encode('utf-8'), salt)
return hashed.decode('utf-8')
@staticmethod
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
"""Create a JWT access token."""
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=settings.access_token_expire_minutes)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, settings.secret_key, algorithm=settings.algorithm)
return encoded_jwt
@staticmethod
def decode_token(token: str) -> Optional[dict]:
"""Decode and validate a JWT token."""
try:
payload = jwt.decode(token, settings.secret_key, algorithms=[settings.algorithm])
return payload
except JWTError:
return None
@staticmethod
async def get_user_by_email(db: AsyncSession, email: str) -> Optional[User]:
"""Get user by email."""
result = await db.execute(select(User).where(User.email == email))
return result.scalar_one_or_none()
@staticmethod
async def get_user_by_id(db: AsyncSession, user_id: int) -> Optional[User]:
"""Get user by ID."""
result = await db.execute(select(User).where(User.id == user_id))
return result.scalar_one_or_none()
@staticmethod
async def authenticate_user(db: AsyncSession, email: str, password: str) -> Optional[User]:
"""Authenticate user with email and password."""
user = await AuthService.get_user_by_email(db, email)
if not user:
return None
if not AuthService.verify_password(password, user.hashed_password):
return None
return user
@staticmethod
async def create_user(
db: AsyncSession,
email: str,
password: str,
name: Optional[str] = None
) -> User:
"""Create a new user with default subscription."""
# Create user
user = User(
email=email,
hashed_password=AuthService.hash_password(password),
name=name,
)
db.add(user)
await db.flush()
# Create default Scout (free) subscription
subscription = Subscription(
user_id=user.id,
tier=SubscriptionTier.SCOUT,
status=SubscriptionStatus.ACTIVE,
)
db.add(subscription)
await db.commit()
await db.refresh(user)
return user
# Singleton instance
auth_service = AuthService()