"""Domain management API (requires authentication).""" import json from datetime import datetime from math import ceil from fastapi import APIRouter, HTTPException, status, Query from pydantic import BaseModel from sqlalchemy import select, func, and_ from app.api.deps import Database, CurrentUser from app.models.domain import Domain, DomainCheck, DomainStatus, DomainHealthCache from app.models.subscription import TIER_CONFIG, SubscriptionTier from app.schemas.domain import DomainCreate, DomainResponse, DomainListResponse from app.services.domain_checker import domain_checker from app.services.domain_health import get_health_checker, HealthStatus router = APIRouter() def _safe_json_loads(value: str | None, default): if not value: return default try: return json.loads(value) except Exception: return default def _health_cache_to_report(domain: Domain, cache: DomainHealthCache) -> dict: """Convert DomainHealthCache row into the same shape as DomainHealthReport.to_dict().""" return { "domain": domain.name, "status": cache.status or "unknown", "score": cache.score or 0, "signals": _safe_json_loads(cache.signals, []), "recommendations": [], # not stored in cache (yet) "checked_at": cache.checked_at.isoformat() if cache.checked_at else datetime.utcnow().isoformat(), "dns": _safe_json_loads( cache.dns_data, {"has_ns": False, "has_a": False, "has_mx": False, "nameservers": [], "is_parked": False, "error": None}, ), "http": _safe_json_loads( cache.http_data, {"is_reachable": False, "status_code": None, "is_parked": False, "parking_keywords": [], "content_length": 0, "error": None}, ), "ssl": _safe_json_loads( cache.ssl_data, {"has_certificate": False, "is_valid": False, "expires_at": None, "days_until_expiry": None, "issuer": None, "error": None}, ), } @router.get("", response_model=DomainListResponse) async def list_domains( current_user: CurrentUser, db: Database, page: int = Query(1, ge=1), per_page: int = Query(20, ge=1, le=100), ): """Get list of monitored domains for current user.""" # Count total count_query = select(func.count(Domain.id)).where(Domain.user_id == current_user.id) total = (await db.execute(count_query)).scalar() # Get domains with pagination offset = (page - 1) * per_page query = ( select(Domain) .where(Domain.user_id == current_user.id) .order_by(Domain.created_at.desc()) .offset(offset) .limit(per_page) ) result = await db.execute(query) domains = result.scalars().all() return DomainListResponse( domains=[DomainResponse.model_validate(d) for d in domains], total=total, page=page, per_page=per_page, pages=ceil(total / per_page) if total > 0 else 1, ) @router.get("/health-cache") async def get_domains_health_cache( current_user: CurrentUser, db: Database, ): """ Get cached domain health reports for the current user (bulk). This avoids N requests from the frontend and returns the cached health data written by the scheduler job. """ result = await db.execute( select(Domain, DomainHealthCache) .outerjoin(DomainHealthCache, DomainHealthCache.domain_id == Domain.id) .where(Domain.user_id == current_user.id) ) rows = result.all() reports: dict[str, dict] = {} cached = 0 for domain, cache in rows: if cache is None: continue reports[str(domain.id)] = _health_cache_to_report(domain, cache) cached += 1 return { "reports": reports, "total_domains": len(rows), "cached_domains": cached, "timestamp": datetime.utcnow().isoformat(), } @router.post("", response_model=DomainResponse, status_code=status.HTTP_201_CREATED) async def add_domain( domain_data: DomainCreate, current_user: CurrentUser, db: Database, ): """Add a domain to monitoring list.""" # Check subscription limit await db.refresh(current_user, ["subscription", "domains"]) if current_user.subscription: limit = current_user.subscription.max_domains else: limit = TIER_CONFIG[SubscriptionTier.SCOUT]["domain_limit"] current_count = len(current_user.domains) if current_count >= limit: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail=f"Domain limit reached ({limit}). Upgrade your subscription to add more domains.", ) # Check if domain already exists for this user existing = await db.execute( select(Domain).where( Domain.user_id == current_user.id, Domain.name == domain_data.name, ) ) if existing.scalar_one_or_none(): raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Domain already in your monitoring list", ) # Check domain availability check_result = await domain_checker.check_domain(domain_data.name) # Create domain domain = Domain( name=domain_data.name, user_id=current_user.id, status=check_result.status, is_available=check_result.is_available, registrar=check_result.registrar, expiration_date=check_result.expiration_date, notify_on_available=domain_data.notify_on_available, last_checked=datetime.utcnow(), ) db.add(domain) await db.flush() # Create initial check record check = DomainCheck( domain_id=domain.id, status=check_result.status, is_available=check_result.is_available, response_data=str(check_result.to_dict()), checked_at=datetime.utcnow(), ) db.add(check) await db.commit() await db.refresh(domain) return domain @router.get("/{domain_id}", response_model=DomainResponse) async def get_domain( domain_id: int, current_user: CurrentUser, db: Database, ): """Get a specific domain.""" result = await db.execute( select(Domain).where( Domain.id == domain_id, Domain.user_id == current_user.id, ) ) domain = result.scalar_one_or_none() if not domain: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Domain not found", ) return domain @router.delete("/{domain_id}", status_code=status.HTTP_204_NO_CONTENT) async def delete_domain( domain_id: int, current_user: CurrentUser, db: Database, ): """Remove a domain from monitoring list.""" result = await db.execute( select(Domain).where( Domain.id == domain_id, Domain.user_id == current_user.id, ) ) domain = result.scalar_one_or_none() if not domain: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Domain not found", ) await db.delete(domain) await db.commit() @router.post("/{domain_id}/refresh", response_model=DomainResponse) async def refresh_domain( domain_id: int, current_user: CurrentUser, db: Database, ): """Manually refresh domain availability status.""" result = await db.execute( select(Domain).where( Domain.id == domain_id, Domain.user_id == current_user.id, ) ) domain = result.scalar_one_or_none() if not domain: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Domain not found", ) # Check domain check_result = await domain_checker.check_domain(domain.name) # Update domain domain.status = check_result.status domain.is_available = check_result.is_available domain.registrar = check_result.registrar domain.expiration_date = check_result.expiration_date domain.last_checked = datetime.utcnow() # Create check record check = DomainCheck( domain_id=domain.id, status=check_result.status, is_available=check_result.is_available, response_data=str(check_result.to_dict()), checked_at=datetime.utcnow(), ) db.add(check) await db.commit() await db.refresh(domain) return domain class NotifyUpdate(BaseModel): """Schema for updating notification settings.""" notify: bool @router.patch("/{domain_id}/notify", response_model=DomainResponse) async def update_notification_settings( domain_id: int, data: NotifyUpdate, current_user: CurrentUser, db: Database, ): """Update notification settings for a domain.""" result = await db.execute( select(Domain).where( Domain.id == domain_id, Domain.user_id == current_user.id, ) ) domain = result.scalar_one_or_none() if not domain: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Domain not found", ) domain.notify_on_available = data.notify await db.commit() await db.refresh(domain) return domain @router.patch("/{domain_id}/expiry", response_model=DomainResponse) async def update_expiration_date( domain_id: int, data: dict, current_user: CurrentUser, db: Database, ): """ Manually set the expiration date for a domain. Useful for TLDs like .ch, .de that don't expose expiration via public WHOIS/RDAP. The date can be found in your registrar's control panel. """ from datetime import datetime result = await db.execute( select(Domain).where( Domain.id == domain_id, Domain.user_id == current_user.id, ) ) domain = result.scalar_one_or_none() if not domain: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Domain not found", ) # Parse and set expiration date expiration_str = data.get('expiration_date') if expiration_str: try: if isinstance(expiration_str, str): # Parse ISO format expiration_str = expiration_str.replace('Z', '+00:00') domain.expiration_date = datetime.fromisoformat(expiration_str) else: domain.expiration_date = expiration_str except Exception as e: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=f"Invalid date format: {e}", ) else: domain.expiration_date = None await db.commit() await db.refresh(domain) return domain @router.get("/{domain_id}/history") async def get_domain_history( domain_id: int, current_user: CurrentUser, db: Database, limit: int = Query(30, ge=1, le=365), ): """Get check history for a domain (Professional and Enterprise plans).""" # Verify domain ownership result = await db.execute( select(Domain).where( Domain.id == domain_id, Domain.user_id == current_user.id, ) ) domain = result.scalar_one_or_none() if not domain: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Domain not found", ) # Check subscription for history access await db.refresh(current_user, ["subscription"]) if current_user.subscription: history_days = current_user.subscription.history_days if history_days == 0: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Check history requires Professional or Enterprise plan", ) # Limit based on plan (-1 means unlimited) if history_days > 0: limit = min(limit, history_days) else: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Check history requires Professional or Enterprise plan", ) # Get check history history_query = ( select(DomainCheck) .where(DomainCheck.domain_id == domain_id) .order_by(DomainCheck.checked_at.desc()) .limit(limit) ) history_result = await db.execute(history_query) checks = history_result.scalars().all() return { "domain": domain.name, "total_checks": len(checks), "history": [ { "id": check.id, "status": check.status.value if hasattr(check.status, 'value') else check.status, "is_available": check.is_available, "checked_at": check.checked_at.isoformat(), } for check in checks ] } @router.get("/{domain_id}/health") async def get_domain_health( domain_id: int, current_user: CurrentUser, db: Database, refresh: bool = Query(False, description="Force a live health check instead of using cache"), ): """ Get comprehensive health report for a domain. Checks 4 layers: - DNS: Nameservers, MX records, A records - HTTP: Website availability, parking detection - SSL: Certificate validity and expiration - Status signals and recommendations Returns: Health report with score (0-100) and status """ # Get domain result = await db.execute( select(Domain).where( Domain.id == domain_id, Domain.user_id == current_user.id, ) ) domain = result.scalar_one_or_none() if not domain: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Domain not found", ) # Prefer cached report for UI performance if not refresh: cache_result = await db.execute( select(DomainHealthCache).where(DomainHealthCache.domain_id == domain.id) ) cache = cache_result.scalar_one_or_none() if cache is not None: return _health_cache_to_report(domain, cache) # Live health check (slow) + update cache health_checker = get_health_checker() report = await health_checker.check_domain(domain.name) report_dict = report.to_dict() signals_json = json.dumps(report_dict.get("signals") or []) dns_json = json.dumps(report_dict.get("dns") or {}) http_json = json.dumps(report_dict.get("http") or {}) ssl_json = json.dumps(report_dict.get("ssl") or {}) cache_result = await db.execute( select(DomainHealthCache).where(DomainHealthCache.domain_id == domain.id) ) cache = cache_result.scalar_one_or_none() if cache is None: cache = DomainHealthCache(domain_id=domain.id) db.add(cache) cache.status = report_dict.get("status") or "unknown" cache.score = int(report_dict.get("score") or 0) cache.signals = signals_json cache.dns_data = dns_json cache.http_data = http_json cache.ssl_data = ssl_json cache.checked_at = datetime.utcnow() await db.commit() return report_dict @router.post("/health-check") async def quick_health_check( current_user: CurrentUser, domain: str = Query(..., description="Domain to check"), ): """ Quick health check for any domain (doesn't need to be in watchlist). Premium feature - checks DNS, HTTP, and SSL layers. """ # Run health check health_checker = get_health_checker() report = await health_checker.check_domain(domain) return report.to_dict()