Yves Gugger 9acb90c067 Initial commit: Pounce - Domain Monitoring System
- FastAPI backend mit Domain-Check, TLD-Pricing, User-Management
- Next.js frontend mit modernem UI
- Sortierbare TLD-Tabelle mit Mini-Charts
- Domain availability monitoring
- Subscription tiers (Starter, Professional, Enterprise)
- Authentication & Authorization
- Scheduler für automatische Domain-Checks
2025-12-08 07:26:57 +01:00

119 lines
3.6 KiB
Python

"""Authentication service."""
from datetime import datetime, timedelta
from typing import Optional
import bcrypt
from jose import JWTError, jwt
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.config import get_settings
from app.models.user import User
from app.models.subscription import Subscription, SubscriptionTier, SubscriptionStatus, TIER_CONFIG
settings = get_settings()
class AuthService:
"""Service for authentication operations."""
@staticmethod
def verify_password(plain_password: str, hashed_password: str) -> bool:
"""Verify a password against its hash."""
return bcrypt.checkpw(
plain_password.encode('utf-8'),
hashed_password.encode('utf-8')
)
@staticmethod
def hash_password(password: str) -> str:
"""Hash a password."""
salt = bcrypt.gensalt(rounds=12)
hashed = bcrypt.hashpw(password.encode('utf-8'), salt)
return hashed.decode('utf-8')
@staticmethod
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
"""Create a JWT access token."""
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=settings.access_token_expire_minutes)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, settings.secret_key, algorithm=settings.algorithm)
return encoded_jwt
@staticmethod
def decode_token(token: str) -> Optional[dict]:
"""Decode and validate a JWT token."""
try:
payload = jwt.decode(token, settings.secret_key, algorithms=[settings.algorithm])
return payload
except JWTError:
return None
@staticmethod
async def get_user_by_email(db: AsyncSession, email: str) -> Optional[User]:
"""Get user by email."""
result = await db.execute(select(User).where(User.email == email))
return result.scalar_one_or_none()
@staticmethod
async def get_user_by_id(db: AsyncSession, user_id: int) -> Optional[User]:
"""Get user by ID."""
result = await db.execute(select(User).where(User.id == user_id))
return result.scalar_one_or_none()
@staticmethod
async def authenticate_user(db: AsyncSession, email: str, password: str) -> Optional[User]:
"""Authenticate user with email and password."""
user = await AuthService.get_user_by_email(db, email)
if not user:
return None
if not AuthService.verify_password(password, user.hashed_password):
return None
return user
@staticmethod
async def create_user(
db: AsyncSession,
email: str,
password: str,
name: Optional[str] = None
) -> User:
"""Create a new user with default subscription."""
# Create user
user = User(
email=email,
hashed_password=AuthService.hash_password(password),
name=name,
)
db.add(user)
await db.flush()
# Create default starter subscription
subscription = Subscription(
user_id=user.id,
tier=SubscriptionTier.STARTER,
status=SubscriptionStatus.ACTIVE,
domain_limit=TIER_CONFIG[SubscriptionTier.STARTER]["domain_limit"],
)
db.add(subscription)
await db.commit()
await db.refresh(user)
return user
# Singleton instance
auth_service = AuthService()