from __future__ import annotations import json from dataclasses import dataclass from datetime import datetime from decimal import Decimal from typing import Any, Awaitable, Callable, Optional from sqlalchemy import and_, case, func, select from sqlalchemy.ext.asyncio import AsyncSession from app.api.tld_prices import get_trending_tlds from app.models.auction import DomainAuction from app.models.domain import Domain from app.models.listing import DomainListing, ListingStatus from app.models.listing import ListingInquiry, ListingInquiryMessage from app.models.portfolio import PortfolioDomain from app.models.sniper_alert import SniperAlert from app.models.subscription import Subscription, SubscriptionTier from app.models.user import User from app.models.yield_domain import YieldDomain, YieldTransaction from app.services.analyze.service import get_domain_analysis from app.services.domain_checker import domain_checker from app.services.hunt.brandables import check_domains, generate_cvcvc, generate_cvccv, generate_human from app.services.hunt.trends import fetch_google_trends_daily_rss from app.services.hunt.typos import generate_typos from app.services.zone_file import get_dropped_domains ToolHandler = Callable[[AsyncSession, User, dict[str, Any]], Awaitable[dict[str, Any]]] @dataclass(frozen=True) class ToolDef: name: str description: str json_schema: dict[str, Any] min_tier: SubscriptionTier = SubscriptionTier.TRADER handler: ToolHandler | None = None def _tier_level(tier: SubscriptionTier) -> int: if tier == SubscriptionTier.TYCOON: return 3 if tier == SubscriptionTier.TRADER: return 2 return 1 async def _get_subscription(db: AsyncSession, user: User) -> Subscription | None: res = await db.execute(select(Subscription).where(Subscription.user_id == user.id)) return res.scalar_one_or_none() def _require_tier(user_tier: SubscriptionTier, tool_tier: SubscriptionTier) -> None: if _tier_level(user_tier) < _tier_level(tool_tier): raise PermissionError(f"Tool requires {tool_tier.value} tier.") def _clamp_int(value: Any, *, lo: int, hi: int, default: int) -> int: try: v = int(value) except Exception: return default return max(lo, min(hi, v)) def _clamp_float(value: Any, *, lo: float, hi: float, default: float) -> float: try: v = float(value) except Exception: return default return max(lo, min(hi, v)) # ============================================================================ # TOOL IMPLEMENTATIONS (READ-ONLY) # ============================================================================ async def tool_get_subscription(db: AsyncSession, user: User, args: dict[str, Any]) -> dict[str, Any]: sub = await _get_subscription(db, user) if not sub: return {"tier": "scout", "features": {}, "limits": {}} cfg = sub.config return { "tier": sub.tier.value, "tier_name": cfg.get("name"), "features": cfg.get("features", {}), "limits": { "watchlist": sub.domain_limit, "portfolio": cfg.get("portfolio_limit"), "listings": cfg.get("listing_limit"), "sniper": cfg.get("sniper_limit"), "history_days": cfg.get("history_days"), "check_frequency": cfg.get("check_frequency"), }, } async def tool_get_dashboard_summary(db: AsyncSession, user: User, args: dict[str, Any]) -> dict[str, Any]: # Similar to /dashboard/summary but kept lightweight and tool-friendly. now = args.get("now") # ignored (server time used) _ = now from datetime import datetime, timedelta t = datetime.utcnow() active = and_(DomainAuction.is_active == True, DomainAuction.end_time > t) total_auctions = (await db.execute(select(func.count(DomainAuction.id)).where(active))).scalar() or 0 cutoff = t + timedelta(hours=24) ending = and_(DomainAuction.is_active == True, DomainAuction.end_time > t, DomainAuction.end_time <= cutoff) ending_soon_count = (await db.execute(select(func.count(DomainAuction.id)).where(ending))).scalar() or 0 ending_rows = ( await db.execute(select(DomainAuction).where(ending).order_by(DomainAuction.end_time.asc()).limit(10)) ).scalars().all() # Listings counts listing_counts = ( await db.execute( select(DomainListing.status, func.count(DomainListing.id)) .where(DomainListing.user_id == user.id) .group_by(DomainListing.status) ) ).all() by_status = {str(status): int(count) for status, count in listing_counts} trending = await get_trending_tlds(db) return { "market": { "total_auctions": total_auctions, "ending_soon_24h": ending_soon_count, "ending_soon_preview": [ { "domain": a.domain, "current_bid": a.current_bid, "platform": a.platform, "end_time": a.end_time.isoformat() if a.end_time else None, "auction_url": a.auction_url, } for a in ending_rows ], }, "listings": { "active": by_status.get(ListingStatus.ACTIVE.value, 0), "sold": by_status.get(ListingStatus.SOLD.value, 0), "draft": by_status.get(ListingStatus.DRAFT.value, 0), "total": sum(by_status.values()), }, "tlds": trending, "timestamp": t.isoformat(), } async def tool_hunt_trends(db: AsyncSession, user: User, args: dict[str, Any]) -> dict[str, Any]: geo = (args.get("geo") or "US").strip().upper() if len(geo) != 2: geo = "US" items_raw = await fetch_google_trends_daily_rss(geo=geo) return { "geo": geo, "items": [ { "title": i["title"], "approx_traffic": i.get("approx_traffic"), "published_at": i.get("published_at"), "link": i.get("link"), } for i in items_raw[:50] ], "timestamp": datetime.utcnow().isoformat(), } async def tool_hunt_brandables(db: AsyncSession, user: User, args: dict[str, Any]) -> dict[str, Any]: pattern = (args.get("pattern") or "cvcvc").strip().lower() if pattern not in ("cvcvc", "cvccv", "human"): pattern = "cvcvc" tlds = args.get("tlds") or ["com"] if not isinstance(tlds, list): tlds = ["com"] tlds = [str(t).lower().lstrip(".") for t in tlds if str(t).strip()] if not tlds: tlds = ["com"] max_checks = _clamp_int(args.get("max_checks"), lo=10, hi=80, default=40) limit = _clamp_int(args.get("limit"), lo=1, hi=25, default=15) candidates: list[str] = [] for _ in range(max_checks): if pattern == "cvcvc": sld = generate_cvcvc() elif pattern == "cvccv": sld = generate_cvccv() else: sld = generate_human() for t in tlds: candidates.append(f"{sld}.{t}") checked = await check_domains(candidates, concurrency=30) available = [c for c in checked if c.status == "available"] seen = set() out = [] for c in available: if c.domain in seen: continue seen.add(c.domain) out.append({"domain": c.domain, "status": c.status, "is_available": bool(c.is_available)}) if len(out) >= limit: break return {"pattern": pattern, "tlds": tlds, "items": out, "timestamp": datetime.utcnow().isoformat()} async def tool_hunt_typos(db: AsyncSession, user: User, args: dict[str, Any]) -> dict[str, Any]: brand = (args.get("brand") or "").strip() if not brand or len(brand) < 2: return {"error": "Missing brand"} limit = _clamp_int(args.get("limit"), lo=1, hi=30, default=15) tlds = args.get("tlds") or ["com"] if not isinstance(tlds, list): tlds = ["com"] tlds = [str(t).lower().lstrip(".") for t in tlds if str(t).strip()] if not tlds: tlds = ["com"] # generate_typos returns SLD strings; we format domains for UX slugs = generate_typos(brand, limit=limit) items = [] for sld in slugs: for t in tlds[:3]: items.append(f"{sld}.{t}") if len(items) >= limit: break if len(items) >= limit: break return {"brand": brand, "items": items, "timestamp": datetime.utcnow().isoformat()} async def tool_keyword_availability(db: AsyncSession, user: User, args: dict[str, Any]) -> dict[str, Any]: keyword = (args.get("keyword") or "").strip().lower() if not keyword: return {"error": "Missing keyword"} tlds = args.get("tlds") or ["com"] if not isinstance(tlds, list): tlds = ["com"] tlds = [str(t).lower().lstrip(".") for t in tlds if str(t).strip()] if not tlds: tlds = ["com"] limit = _clamp_int(args.get("limit"), lo=1, hi=25, default=10) candidates = [f"{keyword}.{t}" for t in tlds][:limit] results = [] for d in candidates: r = await domain_checker.check_domain(d) results.append({"domain": d, "status": r.status, "is_available": bool(getattr(r, "is_available", False))}) return {"keyword": keyword, "items": results, "timestamp": datetime.utcnow().isoformat()} async def tool_portfolio_summary(db: AsyncSession, user: User, args: dict[str, Any]) -> dict[str, Any]: rows = (await db.execute(select(PortfolioDomain).where(PortfolioDomain.user_id == user.id))).scalars().all() total = len(rows) active = sum(1 for d in rows if not d.is_sold and (d.status or "active") != "expired") sold = sum(1 for d in rows if bool(d.is_sold)) total_cost = float(sum(Decimal(str(d.purchase_price or 0)) for d in rows)) total_value = float(sum(Decimal(str(d.estimated_value or 0)) for d in rows if not d.is_sold)) roi = None if total_cost > 0: roi = round(((total_value - total_cost) / total_cost) * 100, 2) return { "total_domains": total, "active_domains": active, "sold_domains": sold, "total_cost": round(total_cost, 2), "total_estimated_value": round(total_value, 2), "overall_roi_percent": roi, "timestamp": datetime.utcnow().isoformat(), } async def tool_list_portfolio(db: AsyncSession, user: User, args: dict[str, Any]) -> dict[str, Any]: limit = _clamp_int(args.get("limit"), lo=1, hi=50, default=25) status = (args.get("status") or "").strip().lower() or None q = select(PortfolioDomain).where(PortfolioDomain.user_id == user.id) if status: q = q.where(PortfolioDomain.status == status) q = q.order_by(PortfolioDomain.created_at.desc()).limit(limit) rows = (await db.execute(q)).scalars().all() return { "items": [ { "id": d.id, "domain": d.domain, "status": d.status, "purchase_price": d.purchase_price, "estimated_value": d.estimated_value, "roi": d.roi, "renewal_date": d.renewal_date.isoformat() if d.renewal_date else None, "is_dns_verified": bool(getattr(d, "is_dns_verified", False) or False), } for d in rows ], "count": len(rows), "timestamp": datetime.utcnow().isoformat(), } async def tool_list_sniper_alerts(db: AsyncSession, user: User, args: dict[str, Any]) -> dict[str, Any]: limit = _clamp_int(args.get("limit"), lo=1, hi=50, default=25) rows = ( await db.execute( select(SniperAlert) .where(SniperAlert.user_id == user.id) .order_by(SniperAlert.created_at.desc()) .limit(limit) ) ).scalars().all() return { "items": [ { "id": a.id, "name": a.name, "description": a.description, "is_active": bool(a.is_active), "tlds": a.tlds, "keywords": a.keywords, "exclude_keywords": a.exclude_keywords, "max_price": a.max_price, "ending_within_hours": a.ending_within_hours, "notify_email": bool(a.notify_email), "notify_sms": bool(a.notify_sms), "created_at": a.created_at.isoformat() if a.created_at else None, } for a in rows ], "count": len(rows), "timestamp": datetime.utcnow().isoformat(), } async def tool_list_my_listings(db: AsyncSession, user: User, args: dict[str, Any]) -> dict[str, Any]: limit = _clamp_int(args.get("limit"), lo=1, hi=50, default=25) rows = ( await db.execute( select(DomainListing) .where(DomainListing.user_id == user.id) .order_by(DomainListing.updated_at.desc()) .limit(limit) ) ).scalars().all() return { "items": [ { "id": l.id, "domain": l.domain, "status": l.status, "asking_price": l.asking_price, "min_offer": l.min_offer, "currency": l.currency, "slug": l.slug, "verification_status": l.verification_status, "view_count": l.view_count, "inquiry_count": l.inquiry_count, "updated_at": l.updated_at.isoformat() if l.updated_at else None, } for l in rows ], "count": len(rows), "timestamp": datetime.utcnow().isoformat(), } async def tool_get_inbox_counts(db: AsyncSession, user: User, args: dict[str, Any]) -> dict[str, Any]: buyer_inqs = (await db.execute(select(ListingInquiry.id).where(ListingInquiry.buyer_user_id == user.id))).scalars().all() buyer_unread = 0 for inq_id in list(buyer_inqs)[:200]: msg = ( await db.execute( select(ListingInquiryMessage) .where(ListingInquiryMessage.inquiry_id == inq_id) .order_by(ListingInquiryMessage.created_at.desc()) .limit(1) ) ).scalar_one_or_none() if msg and msg.sender_user_id != user.id: buyer_unread += 1 listing_ids = (await db.execute(select(DomainListing.id).where(DomainListing.user_id == user.id))).scalars().all() seller_unread = 0 if listing_ids: new_count = ( await db.execute( select(func.count(ListingInquiry.id)).where( and_(ListingInquiry.listing_id.in_(listing_ids), ListingInquiry.status == "new") ) ) ).scalar() or 0 seller_unread += int(new_count) inqs = ( await db.execute( select(ListingInquiry.id, ListingInquiry.status) .where( and_( ListingInquiry.listing_id.in_(listing_ids), ListingInquiry.status.notin_(["closed", "spam"]), ) ) .order_by(ListingInquiry.created_at.desc()) .limit(200) ) ).all() for inq_id, st in inqs: if st == "new": continue msg = ( await db.execute( select(ListingInquiryMessage) .where(ListingInquiryMessage.inquiry_id == inq_id) .order_by(ListingInquiryMessage.created_at.desc()) .limit(1) ) ).scalar_one_or_none() if msg and msg.sender_user_id != user.id: seller_unread += 1 return {"buyer_unread": buyer_unread, "seller_unread": seller_unread, "total_unread": buyer_unread + seller_unread} async def tool_get_seller_inbox(db: AsyncSession, user: User, args: dict[str, Any]) -> dict[str, Any]: limit = _clamp_int(args.get("limit"), lo=1, hi=50, default=25) status_filter = (args.get("status") or "all").strip().lower() listings = (await db.execute(select(DomainListing).where(DomainListing.user_id == user.id))).scalars().all() listing_map = {l.id: l for l in listings} if not listing_map: return {"inquiries": [], "total": 0, "unread": 0} q = ( select(ListingInquiry) .where(ListingInquiry.listing_id.in_(list(listing_map.keys()))) .order_by(ListingInquiry.created_at.desc()) ) if status_filter and status_filter != "all": q = q.where(ListingInquiry.status == status_filter) inqs = (await db.execute(q.limit(limit))).scalars().all() unread = sum(1 for i in inqs if i.status == "new" or not i.read_at) items = [] for inq in inqs: listing = listing_map.get(inq.listing_id) if not listing: continue latest_msg = ( await db.execute( select(ListingInquiryMessage) .where(ListingInquiryMessage.inquiry_id == inq.id) .order_by(ListingInquiryMessage.created_at.desc()) .limit(1) ) ).scalar_one_or_none() items.append( { "id": inq.id, "domain": listing.domain, "listing_id": listing.id, "slug": listing.slug, "status": inq.status, "buyer_name": inq.name, "offer_amount": inq.offer_amount, "created_at": inq.created_at.isoformat() if inq.created_at else None, "has_unread_reply": bool(latest_msg and latest_msg.sender_user_id != user.id and inq.status not in ["closed", "spam"]), "last_message_preview": ( (latest_msg.body[:100] + "..." if len(latest_msg.body) > 100 else latest_msg.body) if latest_msg else ((inq.message or "")[:100]) ), "last_message_at": latest_msg.created_at.isoformat() if latest_msg and latest_msg.created_at else (inq.created_at.isoformat() if inq.created_at else None), } ) return {"inquiries": items, "total": len(items), "unread": unread, "timestamp": datetime.utcnow().isoformat()} async def tool_yield_dashboard(db: AsyncSession, user: User, args: dict[str, Any]) -> dict[str, Any]: now = datetime.utcnow() month_start = now.replace(day=1, hour=0, minute=0, second=0, microsecond=0) domains = ( await db.execute( select(YieldDomain) .where(YieldDomain.user_id == user.id) .order_by(YieldDomain.total_revenue.desc()) ) ).scalars().all() domain_ids = [d.id for d in domains] monthly_revenue = Decimal("0") monthly_clicks = 0 monthly_conversions = 0 if domain_ids: monthly_stats = ( await db.execute( select( func.coalesce( func.sum( case( (YieldTransaction.status.in_(["confirmed", "paid"]), YieldTransaction.net_amount), else_=0, ) ), 0, ).label("revenue"), func.sum(case((YieldTransaction.event_type == "click", 1), else_=0)).label("clicks"), func.sum( case( ( and_( YieldTransaction.event_type.in_(["lead", "sale"]), YieldTransaction.status.in_(["confirmed", "paid"]), ), 1, ), else_=0, ) ).label("conversions"), ).where(YieldTransaction.yield_domain_id.in_(domain_ids), YieldTransaction.created_at >= month_start) ) ).first() if monthly_stats: monthly_revenue = monthly_stats.revenue or Decimal("0") monthly_clicks = int(monthly_stats.clicks or 0) monthly_conversions = int(monthly_stats.conversions or 0) return { "stats": { "active_domains": sum(1 for d in domains if d.status == "active"), "total_domains": len(domains), "monthly_revenue": float(monthly_revenue), "monthly_clicks": monthly_clicks, "monthly_conversions": monthly_conversions, }, "top_domains": [ { "id": d.id, "domain": d.domain, "status": d.status, "dns_verified": bool(d.dns_verified), "total_revenue": float(d.total_revenue or 0), "total_clicks": int(d.total_clicks or 0), "total_conversions": int(d.total_conversions or 0), "detected_intent": d.detected_intent, } for d in domains[:10] ], "timestamp": now.isoformat(), } async def tool_list_watchlist(db: AsyncSession, user: User, args: dict[str, Any]) -> dict[str, Any]: page = _clamp_int(args.get("page"), lo=1, hi=50, default=1) per_page = _clamp_int(args.get("per_page"), lo=1, hi=50, default=20) offset = (page - 1) * per_page total = (await db.execute(select(func.count(Domain.id)).where(Domain.user_id == user.id))).scalar() or 0 rows = ( await db.execute( select(Domain) .where(Domain.user_id == user.id) .order_by(Domain.created_at.desc()) .offset(offset) .limit(per_page) ) ).scalars().all() return { "page": page, "per_page": per_page, "total": int(total), "domains": [ { "id": d.id, "name": d.name, "status": getattr(d.status, "value", d.status), "is_available": bool(d.is_available), "registrar": d.registrar, "created_at": d.created_at.isoformat() if d.created_at else None, "updated_at": d.updated_at.isoformat() if d.updated_at else None, } for d in rows ], } async def tool_analyze_domain(db: AsyncSession, user: User, args: dict[str, Any]) -> dict[str, Any]: domain = (args.get("domain") or "").strip() if not domain: return {"error": "Missing domain"} fast = bool(args.get("fast", False)) refresh = bool(args.get("refresh", False)) res = await get_domain_analysis(db, domain, fast=fast, refresh=refresh) return res.model_dump(mode="json") async def tool_market_feed(db: AsyncSession, user: User, args: dict[str, Any]) -> dict[str, Any]: # Read-only query against DomainAuction similar to /auctions/feed; keep it capped. from datetime import datetime, timedelta limit = _clamp_int(args.get("limit"), lo=1, hi=50, default=20) source = (args.get("source") or "all").lower() keyword = (args.get("keyword") or "").strip().lower() or None tld = (args.get("tld") or "").strip().lower().lstrip(".") or None sort_by = (args.get("sort_by") or "time").lower() ending_within = args.get("ending_within_hours") ending_within_h = _clamp_int(ending_within, lo=1, hi=168, default=0) if ending_within is not None else None now = datetime.utcnow() q = select(DomainAuction).where(DomainAuction.is_active == True, DomainAuction.end_time > now) if source in ("pounce", "external"): q = q.where(DomainAuction.source == source) if keyword: q = q.where(DomainAuction.domain.ilike(f"%{keyword}%")) if tld: q = q.where(DomainAuction.domain.ilike(f"%.{tld}")) if ending_within_h: q = q.where(DomainAuction.end_time <= (now + timedelta(hours=ending_within_h))) if sort_by == "score": q = q.order_by(DomainAuction.score.desc().nullslast(), DomainAuction.end_time.asc()) else: q = q.order_by(DomainAuction.end_time.asc()) auctions = (await db.execute(q.limit(limit))).scalars().all() return { "items": [ { "domain": a.domain, "current_bid": a.current_bid, "platform": a.platform, "end_time": a.end_time.isoformat() if a.end_time else None, "bids": a.bids, "score": a.score, "auction_url": a.auction_url, "source": a.source, } for a in auctions ], "count": len(auctions), "timestamp": now.isoformat(), } async def tool_get_drops(db: AsyncSession, user: User, args: dict[str, Any]) -> dict[str, Any]: tld = (args.get("tld") or None) hours = _clamp_int(args.get("hours"), lo=1, hi=48, default=24) limit = _clamp_int(args.get("limit"), lo=1, hi=100, default=50) offset = _clamp_int(args.get("offset"), lo=0, hi=10000, default=0) keyword = (args.get("keyword") or None) min_length = args.get("min_length") max_length = args.get("max_length") exclude_numeric = bool(args.get("exclude_numeric", False)) exclude_hyphen = bool(args.get("exclude_hyphen", False)) result = await get_dropped_domains( db=db, tld=(tld.lower().lstrip(".") if isinstance(tld, str) and tld.strip() else None), hours=hours, min_length=int(min_length) if min_length is not None else None, max_length=int(max_length) if max_length is not None else None, exclude_numeric=exclude_numeric, exclude_hyphen=exclude_hyphen, keyword=(str(keyword).strip() if keyword else None), limit=limit, offset=offset, ) return result def get_tool_defs() -> list[ToolDef]: return [ ToolDef( name="get_subscription", description="Get current user's subscription tier, features and limits.", json_schema={"type": "object", "properties": {}, "additionalProperties": False}, min_tier=SubscriptionTier.TRADER, handler=tool_get_subscription, ), ToolDef( name="get_dashboard_summary", description="Get a compact snapshot: ending auctions, listing stats, trending TLDs.", json_schema={"type": "object", "properties": {}, "additionalProperties": False}, min_tier=SubscriptionTier.TRADER, handler=tool_get_dashboard_summary, ), ToolDef( name="hunt_trends", description="Get Google Trends daily RSS items (Hunt > Trends).", json_schema={ "type": "object", "properties": {"geo": {"type": "string", "minLength": 2, "maxLength": 2}}, "additionalProperties": False, }, min_tier=SubscriptionTier.TRADER, handler=tool_hunt_trends, ), ToolDef( name="hunt_brandables", description="Generate brandable domains and check availability (Hunt > Forge).", json_schema={ "type": "object", "properties": { "pattern": {"type": "string", "enum": ["cvcvc", "cvccv", "human"]}, "tlds": {"type": "array", "items": {"type": "string"}}, "max_checks": {"type": "integer", "minimum": 10, "maximum": 80}, "limit": {"type": "integer", "minimum": 1, "maximum": 25}, }, "additionalProperties": False, }, min_tier=SubscriptionTier.TRADER, handler=tool_hunt_brandables, ), ToolDef( name="hunt_typos", description="Generate typo candidates for a brand (Hunt > Typos).", json_schema={ "type": "object", "properties": { "brand": {"type": "string"}, "tlds": {"type": "array", "items": {"type": "string"}}, "limit": {"type": "integer", "minimum": 1, "maximum": 30}, }, "required": ["brand"], "additionalProperties": False, }, min_tier=SubscriptionTier.TRADER, handler=tool_hunt_typos, ), ToolDef( name="keyword_availability", description="Check availability for keyword + TLD list (Hunt > Search).", json_schema={ "type": "object", "properties": { "keyword": {"type": "string"}, "tlds": {"type": "array", "items": {"type": "string"}}, "limit": {"type": "integer", "minimum": 1, "maximum": 25}, }, "required": ["keyword"], "additionalProperties": False, }, min_tier=SubscriptionTier.TRADER, handler=tool_keyword_availability, ), ToolDef( name="list_watchlist", description="List user's watchlist domains (monitored domains).", json_schema={ "type": "object", "properties": { "page": {"type": "integer", "minimum": 1, "maximum": 50}, "per_page": {"type": "integer", "minimum": 1, "maximum": 50}, }, "additionalProperties": False, }, min_tier=SubscriptionTier.TRADER, handler=tool_list_watchlist, ), ToolDef( name="portfolio_summary", description="Get portfolio summary (counts, total cost/value, ROI).", json_schema={"type": "object", "properties": {}, "additionalProperties": False}, min_tier=SubscriptionTier.TRADER, handler=tool_portfolio_summary, ), ToolDef( name="list_portfolio", description="List portfolio domains (owned domains).", json_schema={ "type": "object", "properties": { "status": {"type": "string"}, "limit": {"type": "integer", "minimum": 1, "maximum": 50}, }, "additionalProperties": False, }, min_tier=SubscriptionTier.TRADER, handler=tool_list_portfolio, ), ToolDef( name="list_sniper_alerts", description="List sniper alerts (user-defined filters).", json_schema={ "type": "object", "properties": {"limit": {"type": "integer", "minimum": 1, "maximum": 50}}, "additionalProperties": False, }, min_tier=SubscriptionTier.TRADER, handler=tool_list_sniper_alerts, ), ToolDef( name="list_my_listings", description="List seller's Pounce Direct listings (For Sale).", json_schema={ "type": "object", "properties": {"limit": {"type": "integer", "minimum": 1, "maximum": 50}}, "additionalProperties": False, }, min_tier=SubscriptionTier.TRADER, handler=tool_list_my_listings, ), ToolDef( name="get_inbox_counts", description="Get unified buyer/seller unread counts for inbox badge.", json_schema={"type": "object", "properties": {}, "additionalProperties": False}, min_tier=SubscriptionTier.TRADER, handler=tool_get_inbox_counts, ), ToolDef( name="get_seller_inbox", description="Seller inbox threads across all listings (preview list).", json_schema={ "type": "object", "properties": { "status": {"type": "string", "enum": ["all", "new", "read", "replied", "closed", "spam"]}, "limit": {"type": "integer", "minimum": 1, "maximum": 50}, }, "additionalProperties": False, }, min_tier=SubscriptionTier.TRADER, handler=tool_get_seller_inbox, ), ToolDef( name="analyze_domain", description="Run Pounce domain analysis (Authority/Market/Risk/Value) for a given domain.", json_schema={ "type": "object", "properties": { "domain": {"type": "string"}, "fast": {"type": "boolean"}, "refresh": {"type": "boolean"}, }, "required": ["domain"], "additionalProperties": False, }, min_tier=SubscriptionTier.TRADER, handler=tool_analyze_domain, ), ToolDef( name="market_feed", description="Get current auction feed (filters: source, keyword, tld, ending window).", json_schema={ "type": "object", "properties": { "source": {"type": "string", "enum": ["all", "pounce", "external"]}, "keyword": {"type": "string"}, "tld": {"type": "string"}, "sort_by": {"type": "string", "enum": ["time", "score"]}, "ending_within_hours": {"type": "integer", "minimum": 1, "maximum": 168}, "limit": {"type": "integer", "minimum": 1, "maximum": 50}, }, "additionalProperties": False, }, min_tier=SubscriptionTier.TRADER, handler=tool_market_feed, ), ToolDef( name="get_drops", description="Get recently dropped domains from zone files (auth required).", json_schema={ "type": "object", "properties": { "tld": {"type": "string"}, "hours": {"type": "integer", "minimum": 1, "maximum": 48}, "min_length": {"type": "integer", "minimum": 1, "maximum": 63}, "max_length": {"type": "integer", "minimum": 1, "maximum": 63}, "exclude_numeric": {"type": "boolean"}, "exclude_hyphen": {"type": "boolean"}, "keyword": {"type": "string"}, "limit": {"type": "integer", "minimum": 1, "maximum": 100}, "offset": {"type": "integer", "minimum": 0, "maximum": 10000}, }, "additionalProperties": False, }, min_tier=SubscriptionTier.TRADER, handler=tool_get_drops, ), ToolDef( name="yield_dashboard", description="Get Yield dashboard stats and top earning domains.", json_schema={"type": "object", "properties": {}, "additionalProperties": False}, min_tier=SubscriptionTier.TRADER, handler=tool_yield_dashboard, ), ] def tools_for_path(path: str) -> list[str]: """ Limit the visible tool list depending on the current Terminal page. This keeps prompts smaller and makes the model more decisive. """ p = (path or "").split("?")[0] if p.startswith("/terminal/hunt"): return [ "get_subscription", "get_dashboard_summary", "market_feed", "get_drops", "hunt_trends", "hunt_brandables", "hunt_typos", "keyword_availability", "analyze_domain", "list_watchlist", ] if p.startswith("/terminal/market"): return ["get_subscription", "market_feed", "analyze_domain"] if p.startswith("/terminal/watchlist"): return ["get_subscription", "list_watchlist", "analyze_domain"] if p.startswith("/terminal/portfolio"): return ["get_subscription", "portfolio_summary", "list_portfolio", "analyze_domain"] if p.startswith("/terminal/sniper"): return ["get_subscription", "list_sniper_alerts", "market_feed", "analyze_domain"] if p.startswith("/terminal/listing"): return ["get_subscription", "list_my_listings", "get_seller_inbox"] if p.startswith("/terminal/inbox"): return ["get_subscription", "get_inbox_counts", "get_seller_inbox"] if p.startswith("/terminal/yield"): return ["get_subscription", "yield_dashboard"] if p.startswith("/terminal/intel"): return ["get_subscription", "analyze_domain", "market_feed", "get_drops"] if p.startswith("/terminal/settings"): return ["get_subscription"] if p.startswith("/terminal/welcome"): return ["get_subscription", "get_dashboard_summary"] # default: allow a safe minimal set return ["get_subscription", "get_dashboard_summary", "analyze_domain"] async def execute_tool(db: AsyncSession, user: User, name: str, args: dict[str, Any], *, path: str) -> dict[str, Any]: defs = {t.name: t for t in get_tool_defs()} tool = defs.get(name) if tool is None or tool.handler is None: return {"error": f"Unknown tool: {name}"} # Enforce tool allowed on this page allowed = set(tools_for_path(path)) if name not in allowed: return {"error": f"Tool not allowed for path: {name}"} sub = await _get_subscription(db, user) user_tier = sub.tier if sub else SubscriptionTier.SCOUT try: _require_tier(user_tier, tool.min_tier) except PermissionError as e: return {"error": str(e)} try: return await tool.handler(db, user, args or {}) except Exception as e: return {"error": f"{type(e).__name__}: {e}"} def tool_catalog_for_prompt(path: str) -> list[dict[str, Any]]: allowed = set(tools_for_path(path)) out: list[dict[str, Any]] = [] for t in get_tool_defs(): if t.name in allowed: out.append( { "name": t.name, "description": t.description, "schema": t.json_schema, } ) return out