Some checks failed
CI / Frontend Lint & Type Check (push) Has been cancelled
CI / Frontend Build (push) Has been cancelled
CI / Backend Lint (push) Has been cancelled
CI / Backend Tests (push) Has been cancelled
CI / Docker Build (push) Has been cancelled
CI / Security Scan (push) Has been cancelled
Deploy / Build & Push Images (push) Has been cancelled
Deploy / Deploy to Server (push) Has been cancelled
Deploy / Notify (push) Has been cancelled
Fix brandables API NameError, switch Trend Surfer to a working Google Trends RSS endpoint, and harden the HUNT UI against failed requests. Also add sharp for Next.js standalone image optimization and remove PostHog script crossOrigin to reduce CORS breakage.
248 lines
8.0 KiB
Python
248 lines
8.0 KiB
Python
"""HUNT (Discovery) endpoints."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from datetime import datetime, timezone
|
|
|
|
from fastapi import APIRouter, Depends, HTTPException, Query, Request
|
|
from slowapi import Limiter
|
|
from slowapi.util import get_remote_address
|
|
from sqlalchemy import and_, func, select
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from app.api.deps import get_current_user
|
|
from app.database import get_db
|
|
from app.models.auction import DomainAuction
|
|
from app.models.user import User
|
|
from app.schemas.hunt import (
|
|
BrandableRequest,
|
|
BrandableCandidate,
|
|
BrandableResponse,
|
|
HuntSniperItem,
|
|
HuntSniperResponse,
|
|
KeywordAvailabilityRequest,
|
|
KeywordAvailabilityResponse,
|
|
KeywordAvailabilityRow,
|
|
TrendsResponse,
|
|
TrendItem,
|
|
TypoCheckRequest,
|
|
TypoCheckResponse,
|
|
TypoCandidate,
|
|
)
|
|
from app.services.domain_checker import domain_checker
|
|
from app.services.hunt.brandables import check_domains, generate_cvcvc, generate_cvccv, generate_human
|
|
from app.services.hunt.trends import fetch_google_trends_daily_rss
|
|
from app.services.hunt.typos import generate_typos
|
|
|
|
router = APIRouter()
|
|
limiter = Limiter(key_func=get_remote_address)
|
|
|
|
|
|
def _utcnow() -> datetime:
|
|
return datetime.now(timezone.utc)
|
|
|
|
|
|
@router.get("/bargain-bin", response_model=HuntSniperResponse)
|
|
@limiter.limit("60/minute")
|
|
async def bargain_bin(
|
|
request: Request,
|
|
_user: User = Depends(get_current_user),
|
|
db: AsyncSession = Depends(get_db),
|
|
limit: int = Query(100, ge=1, le=500),
|
|
):
|
|
"""
|
|
Closeout Sniper (Chris logic):
|
|
price < $10 AND age_years >= 5 AND backlinks > 0
|
|
|
|
Uses ONLY real scraped auction data (DomainAuction.age_years/backlinks).
|
|
Items without required fields are excluded.
|
|
"""
|
|
now = _utcnow().replace(tzinfo=None)
|
|
base = and_(DomainAuction.is_active == True, DomainAuction.end_time > now) # noqa: E712
|
|
|
|
rows = (
|
|
await db.execute(
|
|
select(DomainAuction)
|
|
.where(base)
|
|
.where(DomainAuction.current_bid < 10)
|
|
.order_by(DomainAuction.end_time.asc())
|
|
.limit(limit * 3) # allow filtering
|
|
)
|
|
).scalars().all()
|
|
|
|
filtered_out = 0
|
|
items: list[HuntSniperItem] = []
|
|
for a in rows:
|
|
if a.age_years is None or a.backlinks is None:
|
|
filtered_out += 1
|
|
continue
|
|
if int(a.age_years) < 5 or int(a.backlinks) <= 0:
|
|
continue
|
|
items.append(
|
|
HuntSniperItem(
|
|
domain=a.domain,
|
|
platform=a.platform,
|
|
auction_url=a.auction_url,
|
|
current_bid=float(a.current_bid),
|
|
currency=a.currency,
|
|
end_time=a.end_time.replace(tzinfo=timezone.utc) if a.end_time and a.end_time.tzinfo is None else a.end_time,
|
|
age_years=int(a.age_years) if a.age_years is not None else None,
|
|
backlinks=int(a.backlinks) if a.backlinks is not None else None,
|
|
pounce_score=int(a.pounce_score) if a.pounce_score is not None else None,
|
|
)
|
|
)
|
|
if len(items) >= limit:
|
|
break
|
|
|
|
last_updated = (
|
|
await db.execute(select(func.max(DomainAuction.updated_at)).where(DomainAuction.is_active == True)) # noqa: E712
|
|
).scalar()
|
|
|
|
return HuntSniperResponse(
|
|
items=items,
|
|
total=len(items),
|
|
filtered_out_missing_data=int(filtered_out),
|
|
last_updated=last_updated.replace(tzinfo=timezone.utc) if last_updated and last_updated.tzinfo is None else last_updated,
|
|
)
|
|
|
|
|
|
@router.get("/trends", response_model=TrendsResponse)
|
|
@limiter.limit("30/minute")
|
|
async def trends(
|
|
request: Request,
|
|
_user: User = Depends(get_current_user),
|
|
geo: str = Query("US", min_length=2, max_length=2),
|
|
):
|
|
try:
|
|
items_raw = await fetch_google_trends_daily_rss(geo=geo)
|
|
except Exception:
|
|
# Don't 500 the whole UI when the public feed is temporarily unavailable.
|
|
raise HTTPException(status_code=502, detail="Google Trends feed unavailable")
|
|
items = [
|
|
TrendItem(
|
|
title=i["title"],
|
|
approx_traffic=i.get("approx_traffic"),
|
|
published_at=i.get("published_at"),
|
|
link=i.get("link"),
|
|
)
|
|
for i in items_raw[:50]
|
|
]
|
|
return TrendsResponse(geo=geo.upper(), items=items, fetched_at=_utcnow())
|
|
|
|
|
|
@router.post("/keywords", response_model=KeywordAvailabilityResponse)
|
|
@limiter.limit("30/minute")
|
|
async def keyword_availability(
|
|
request: Request,
|
|
payload: KeywordAvailabilityRequest,
|
|
_user: User = Depends(get_current_user),
|
|
):
|
|
# Normalize + cap work for UX/perf
|
|
keywords = []
|
|
for kw in payload.keywords[:25]:
|
|
k = kw.strip().lower().replace(" ", "")
|
|
if k:
|
|
keywords.append(kw)
|
|
|
|
tlds = [t.lower().lstrip(".") for t in payload.tlds[:20] if t.strip()]
|
|
if not tlds:
|
|
tlds = ["com"]
|
|
|
|
# Build candidate domains
|
|
candidates: list[tuple[str, str, str]] = []
|
|
domain_list: list[str] = []
|
|
for kw in keywords:
|
|
k = kw.strip().lower().replace(" ", "")
|
|
if not k:
|
|
continue
|
|
for t in tlds:
|
|
d = f"{k}.{t}"
|
|
candidates.append((kw, t, d))
|
|
domain_list.append(d)
|
|
|
|
checked = await check_domains(domain_list, concurrency=40)
|
|
by_domain = {c.domain: c for c in checked}
|
|
|
|
rows: list[KeywordAvailabilityRow] = []
|
|
for kw, t, d in candidates:
|
|
c = by_domain.get(d)
|
|
if not c:
|
|
rows.append(KeywordAvailabilityRow(keyword=kw, domain=d, tld=t, is_available=None, status="unknown"))
|
|
else:
|
|
rows.append(KeywordAvailabilityRow(keyword=kw, domain=d, tld=t, is_available=c.is_available, status=c.status))
|
|
return KeywordAvailabilityResponse(items=rows)
|
|
|
|
|
|
@router.post("/typos", response_model=TypoCheckResponse)
|
|
@limiter.limit("20/minute")
|
|
async def typo_check(
|
|
request: Request,
|
|
payload: TypoCheckRequest,
|
|
_user: User = Depends(get_current_user),
|
|
):
|
|
brand = payload.brand.strip()
|
|
typos = generate_typos(brand, limit=min(int(payload.limit) * 4, 400))
|
|
|
|
# Build domain list (dedup)
|
|
tlds = [t.lower().lstrip(".") for t in payload.tlds if t.strip()]
|
|
candidates: list[str] = []
|
|
seen = set()
|
|
for typo in typos:
|
|
for t in tlds:
|
|
d = f"{typo}.{t}"
|
|
if d not in seen:
|
|
candidates.append(d)
|
|
seen.add(d)
|
|
if len(candidates) >= payload.limit * 4:
|
|
break
|
|
if len(candidates) >= payload.limit * 4:
|
|
break
|
|
|
|
checked = await check_domains(candidates, concurrency=30)
|
|
available = [c for c in checked if c.status == "available"]
|
|
items = [TypoCandidate(domain=c.domain, is_available=c.is_available, status=c.status) for c in available[: payload.limit]]
|
|
return TypoCheckResponse(brand=brand, items=items)
|
|
|
|
|
|
@router.post("/brandables", response_model=BrandableResponse)
|
|
@limiter.limit("15/minute")
|
|
async def brandables(
|
|
request: Request,
|
|
payload: BrandableRequest,
|
|
_user: User = Depends(get_current_user),
|
|
):
|
|
pattern = payload.pattern.strip().lower()
|
|
if pattern not in ("cvcvc", "cvccv", "human"):
|
|
pattern = "cvcvc"
|
|
|
|
tlds = [t.lower().lstrip(".") for t in payload.tlds if t.strip()]
|
|
if not tlds:
|
|
tlds = ["com"]
|
|
|
|
# Generate + check up to max_checks; return only available
|
|
candidates: list[str] = []
|
|
for _ in range(int(payload.max_checks)):
|
|
if pattern == "cvcvc":
|
|
sld = generate_cvcvc()
|
|
elif pattern == "cvccv":
|
|
sld = generate_cvccv()
|
|
else:
|
|
sld = generate_human()
|
|
for t in tlds:
|
|
candidates.append(f"{sld}.{t}")
|
|
|
|
checked = await check_domains(candidates, concurrency=40)
|
|
available = [c for c in checked if c.status == "available"]
|
|
# De-dup by domain
|
|
seen = set()
|
|
out = []
|
|
for c in available:
|
|
if c.domain not in seen:
|
|
seen.add(c.domain)
|
|
out.append(BrandableCandidate(domain=c.domain, is_available=c.is_available, status=c.status))
|
|
if len(out) >= payload.limit:
|
|
break
|
|
|
|
return BrandableResponse(pattern=payload.pattern, items=out)
|
|
|