""" Analyze service orchestrator (Alpha Terminal). Implements the plan: - Quadrants: authority | market | risk | value - Analyzer registry (plugin-like) - Open-data-first (null + reason) """ from __future__ import annotations import json from datetime import datetime, timedelta, timezone from fastapi import HTTPException, status from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from app.models.domain_analysis_cache import DomainAnalysisCache from app.schemas.analyze import AnalyzeResponse, AnalyzeSection from app.services.analyze.base import AnalyzeContext from app.services.analyze.registry import get_default_analyzers from app.services.domain_checker import domain_checker from app.services.domain_health import get_health_checker DEFAULT_CACHE_TTL_SECONDS = 60 * 10 # conservative fallback (10m) def _utcnow() -> datetime: return datetime.now(timezone.utc) def _is_cache_valid(row: DomainAnalysisCache) -> bool: ttl = int(row.ttl_seconds or 0) if ttl <= 0: return False computed = row.computed_at if computed is None: return False if computed.tzinfo is None: # stored as naive UTC typically computed = computed.replace(tzinfo=timezone.utc) return computed + timedelta(seconds=ttl) > _utcnow() async def get_domain_analysis( db: AsyncSession, domain: str, *, fast: bool = False, refresh: bool = False, ) -> AnalyzeResponse: is_valid, error = domain_checker.validate_domain(domain) if not is_valid: raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=error) norm = domain_checker._normalize_domain(domain) # internal normalize # Cache lookup if not refresh: row = ( await db.execute(select(DomainAnalysisCache).where(DomainAnalysisCache.domain == norm)) ).scalar_one_or_none() if row and _is_cache_valid(row): payload = json.loads(row.payload_json) payload["cached"] = True return AnalyzeResponse.model_validate(payload) computed_at = _utcnow() # Core domain facts via RDAP/DNS/WHOIS (shared input for analyzers) check = await domain_checker.check_domain(norm, quick=False) # Health is expensive; compute once only when needed health = None if not fast: health = await get_health_checker().check_domain(norm) ctx = AnalyzeContext(db=db, domain=norm, computed_at=computed_at, fast=fast, check=check, health=health) analyzers = get_default_analyzers() ttl = DEFAULT_CACHE_TTL_SECONDS # Quadrants per plan (stable ordering) quadrants: dict[str, AnalyzeSection] = { "authority": AnalyzeSection(key="authority", title="Authority", items=[]), "market": AnalyzeSection(key="market", title="Market", items=[]), "risk": AnalyzeSection(key="risk", title="Risk", items=[]), "value": AnalyzeSection(key="value", title="Value", items=[]), } for a in analyzers: ttl = min(ttl, int(getattr(a, "ttl_seconds", DEFAULT_CACHE_TTL_SECONDS) or DEFAULT_CACHE_TTL_SECONDS)) contributions = await a.analyze(ctx) for c in contributions: if c.quadrant in quadrants: quadrants[c.quadrant].items.extend(c.items) resp = AnalyzeResponse( domain=norm, computed_at=computed_at, cached=False, sections=[quadrants["authority"], quadrants["market"], quadrants["risk"], quadrants["value"]], ) # Upsert cache (best-effort) payload = resp.model_dump(mode="json") payload_json = json.dumps(payload, separators=(",", ":"), ensure_ascii=False) existing = ( await db.execute(select(DomainAnalysisCache).where(DomainAnalysisCache.domain == norm)) ).scalar_one_or_none() if existing: existing.payload_json = payload_json existing.computed_at = computed_at.replace(tzinfo=None) existing.ttl_seconds = int(ttl or DEFAULT_CACHE_TTL_SECONDS) else: 
db.add( DomainAnalysisCache( domain=norm, payload_json=payload_json, computed_at=computed_at.replace(tzinfo=None), ttl_seconds=int(ttl or DEFAULT_CACHE_TTL_SECONDS), ) ) return resp
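
# --- Usage sketch (illustrative only, not part of this module) ----------------
# A minimal sketch of how an API route might call get_domain_analysis(). The
# route path, the `get_db` session dependency, and the query parameter names
# are assumptions made for illustration; adapt them to the actual API layer.
#
#   from fastapi import APIRouter, Depends
#
#   router = APIRouter()
#
#   @router.get("/analyze/{domain}", response_model=AnalyzeResponse)
#   async def analyze_domain(
#       domain: str,
#       fast: bool = False,
#       refresh: bool = False,
#       db: AsyncSession = Depends(get_db),  # hypothetical session dependency
#   ) -> AnalyzeResponse:
#       return await get_domain_analysis(db, domain, fast=fast, refresh=refresh)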