yves.gugger fb418b91ad feat: Professional redesign of user and admin backend
- Redesigned Sidebar with pounce puma logo and elegant premium styling
- Updated CommandCenterLayout with improved top bar styling
- Integrated Admin page into CommandCenterLayout for consistent experience
- Created reusable DataTable component with elegant styling
- Enhanced Dashboard with premium card designs and visual effects
- Improved Watchlist with modern gradient styling
- Added case-insensitive email handling in auth (from previous fix)

All tables now have consistent, elegant styling with:
- Gradient backgrounds
- Subtle borders and shadows
- Hover effects with accent color
- Responsive design
- Status badges and action buttons
2025-12-10 10:14:34 +01:00

121 lines
3.7 KiB
Python

"""Authentication service."""
from datetime import datetime, timedelta
from typing import Optional
import bcrypt
from jose import JWTError, jwt
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.config import get_settings
from app.models.user import User
from app.models.subscription import Subscription, SubscriptionTier, SubscriptionStatus, TIER_CONFIG
# Application settings, fetched once when this module is imported. The code
# below reads `secret_key`, `algorithm` and `access_token_expire_minutes`
# from this object when creating and decoding JWTs.
settings = get_settings()
class AuthService:
    """Stateless helpers for password hashing, JWT handling and user lookup.

    Every method is a ``@staticmethod``; the class only serves as a
    namespace and holds no per-instance state.
    """

    @staticmethod
    def _normalize_email(email: str) -> str:
        """Return *email* lower-cased with surrounding whitespace removed.

        Used for both storing and looking up users so that the two paths
        always agree on the canonical form of an address.
        """
        return email.lower().strip()

    @staticmethod
    def verify_password(plain_password: str, hashed_password: str) -> bool:
        """Check a plain-text password against a stored bcrypt hash.

        Args:
            plain_password: Password supplied by the caller.
            hashed_password: bcrypt hash as stored (UTF-8 text).

        Returns:
            ``True`` if the password matches the hash; ``False`` if it does
            not, or if the stored hash is missing/empty or rejected by
            bcrypt as invalid.
        """
        # An account without a stored hash can never pass a password check.
        if not hashed_password:
            return False
        try:
            return bcrypt.checkpw(
                plain_password.encode("utf-8"),
                hashed_password.encode("utf-8"),
            )
        except ValueError:
            # bcrypt signals unusable input (e.g. a malformed stored hash)
            # with ValueError; treat that as a failed verification rather
            # than letting it escape as an unhandled error.
            return False

    @staticmethod
    def hash_password(password: str) -> str:
        """Hash *password* with bcrypt (cost factor 12).

        Returns:
            The salted bcrypt hash decoded to a UTF-8 string.
        """
        salt = bcrypt.gensalt(rounds=12)
        digest = bcrypt.hashpw(password.encode("utf-8"), salt)
        return digest.decode("utf-8")

    @staticmethod
    def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
        """Create a signed JWT access token.

        Args:
            data: Claims to embed. The dict is copied, not mutated.
            expires_delta: Token lifetime. When ``None`` the lifetime is
                ``settings.access_token_expire_minutes`` minutes.

        Returns:
            The encoded token, signed with ``settings.secret_key`` using
            ``settings.algorithm``.
        """
        # Local import keeps this block self-contained; the module header
        # only imports `datetime` and `timedelta`.
        from datetime import timezone

        # Compare against None explicitly: a zero timedelta is falsy and
        # would otherwise be silently replaced by the default lifetime.
        if expires_delta is None:
            expires_delta = timedelta(minutes=settings.access_token_expire_minutes)

        claims = data.copy()
        # Timezone-aware "now"; datetime.utcnow() is deprecated and naive.
        claims["exp"] = datetime.now(timezone.utc) + expires_delta
        return jwt.encode(claims, settings.secret_key, algorithm=settings.algorithm)

    @staticmethod
    def decode_token(token: str) -> Optional[dict]:
        """Decode and validate a JWT.

        Returns:
            The decoded payload, or ``None`` if ``jwt.decode`` raises
            ``JWTError`` for the token.
        """
        try:
            return jwt.decode(token, settings.secret_key, algorithms=[settings.algorithm])
        except JWTError:
            return None

    @staticmethod
    async def get_user_by_email(db: "AsyncSession", email: str) -> "Optional[User]":
        """Fetch a user by email address.

        The lookup is case-insensitive and ignores surrounding whitespace in
        *email*, matching the normalization applied by ``create_user``.

        Returns:
            The matching ``User``, or ``None`` if there is none.
        """
        from sqlalchemy import func

        wanted = AuthService._normalize_email(email)
        result = await db.execute(
            select(User).where(func.lower(User.email) == wanted)
        )
        return result.scalar_one_or_none()

    @staticmethod
    async def get_user_by_id(db: "AsyncSession", user_id: int) -> "Optional[User]":
        """Fetch a user by primary key, or ``None`` if no such user exists."""
        result = await db.execute(select(User).where(User.id == user_id))
        return result.scalar_one_or_none()

    @staticmethod
    async def authenticate_user(db: "AsyncSession", email: str, password: str) -> "Optional[User]":
        """Authenticate a user by email and password.

        Returns:
            The ``User`` when the email resolves to a user and the password
            verifies against its stored hash; otherwise ``None``.
        """
        user = await AuthService.get_user_by_email(db, email)
        if not user:
            return None
        if not AuthService.verify_password(password, user.hashed_password):
            return None
        return user

    @staticmethod
    async def create_user(
        db: "AsyncSession",
        email: str,
        password: str,
        name: Optional[str] = None,
    ) -> "User":
        """Create a user together with a default Scout subscription.

        The email is stored normalized (lower-cased, stripped) and the
        password is stored as a bcrypt hash. The user and its subscription
        are committed together.

        Returns:
            The newly created, refreshed ``User``.

        Raises:
            Whatever the session raises on flush/commit; the session is
            rolled back before the exception propagates.
        """
        user = User(
            email=AuthService._normalize_email(email),
            hashed_password=AuthService.hash_password(password),
            name=name,
        )
        try:
            db.add(user)
            # Flush so the database assigns user.id before it is referenced
            # by the subscription row.
            await db.flush()

            # Default Scout subscription, created active.
            subscription = Subscription(
                user_id=user.id,
                tier=SubscriptionTier.SCOUT,
                status=SubscriptionStatus.ACTIVE,
            )
            db.add(subscription)
            await db.commit()
        except Exception:
            # Leave the session usable for the caller, then re-raise.
            await db.rollback()
            raise

        await db.refresh(user)
        return user
# Shared module-level instance. All AuthService methods are static, so this
# object carries no state; the methods can be called on it or on the class.
auth_service = AuthService()