"""
Smart Pounce - Domain Auction Aggregator

This module provides auction data from our database of scraped listings.
Data is scraped from public auction platforms - NO APIS used.

Data Sources (Web Scraping):
- ExpiredDomains.net (aggregator)
- GoDaddy Auctions (public listings)
- Sedo (public search)
- NameJet (public auctions)

IMPORTANT:
- All data comes from web scraping of public pages
- No mock data - everything is real scraped data
- Data is cached in PostgreSQL/SQLite for performance
- Scraper runs on schedule (see scheduler.py)

Legal Note (Switzerland):
- No escrow/payment handling = no GwG/FINMA requirements
- Users click through to external platforms
- We only provide market intelligence
"""
import logging
from datetime import datetime, timedelta
from typing import Optional, List
from urllib.parse import quote

from fastapi import APIRouter, Depends, Query, HTTPException
from pydantic import BaseModel
from sqlalchemy import select, func, and_
from sqlalchemy.ext.asyncio import AsyncSession

from app.database import get_db
from app.api.deps import get_current_user, get_current_user_optional
from app.models.user import User
from app.models.auction import DomainAuction, AuctionScrapeLog
from app.services.valuation import valuation_service
from app.services.auction_scraper import auction_scraper

logger = logging.getLogger(__name__)
router = APIRouter()


# ============== Schemas ==============

class AuctionValuation(BaseModel):
    """Valuation details for an auction."""

    estimated_value: float
    # estimated_value / current_bid (a fixed sentinel when there is no bid).
    value_ratio: float
    # estimated_value - current_bid.
    potential_profit: float
    confidence: str
    valuation_formula: str


class AuctionListing(BaseModel):
    """A domain auction listing from the database."""

    domain: str
    platform: str
    platform_url: str
    current_bid: float
    currency: str
    num_bids: int
    end_time: datetime
    # Human-readable countdown, e.g. "2d 3h", "5h 12m", "42m" or "Ended".
    time_remaining: str
    buy_now_price: Optional[float] = None
    reserve_met: Optional[bool] = None
    traffic: Optional[int] = None
    age_years: Optional[int] = None
    tld: str
    # Scraped auction URL, or a platform search URL when none was scraped.
    affiliate_url: str
    valuation: Optional[AuctionValuation] = None

    class Config:
        from_attributes = True


class AuctionSearchResponse(BaseModel):
    """Response for auction search."""

    auctions: List[AuctionListing]
    # Number of auctions matching the filters (before pagination).
    total: int
    platforms_searched: List[str]
    last_updated: datetime
    data_source: str = "scraped"
    valuation_note: str = (
        "Values are estimated using our algorithm: "
        "$50 × Length × TLD × Keyword × Brand factors. "
        "See /portfolio/valuation/{domain} for detailed breakdown."
    )


class PlatformStats(BaseModel):
    """Statistics for an auction platform."""

    platform: str
    active_auctions: int
    avg_bid: float
    ending_soon: int


class ScrapeStatus(BaseModel):
    """Status of auction scraping."""

    last_scrape: Optional[datetime]
    total_auctions: int
    platforms: List[str]
    next_scrape: Optional[datetime]


# ============== Helper Functions ==============

def _seconds_until(end_time: datetime) -> float:
    """Return the number of seconds from now until ``end_time``.

    Naive datetimes are compared against ``datetime.utcnow()`` (the convention
    used throughout this module); timezone-aware datetimes are compared against
    an aware "now" so the subtraction never mixes naive and aware values.
    The result is negative when ``end_time`` is in the past.
    """
    if end_time.tzinfo is not None:
        now = datetime.now(end_time.tzinfo)
    else:
        now = datetime.utcnow()
    return (end_time - now).total_seconds()


def _escape_like(term: str) -> str:
    """Escape LIKE wildcards in ``term`` using a backslash escape character.

    The backslash itself is escaped first so the escapes added for ``%`` and
    ``_`` are not doubled.
    """
    return term.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")


def _format_time_remaining(end_time: datetime) -> str:
    """Format time remaining in human-readable format.

    Returns:
        "Ended" when ``end_time`` is now or in the past, otherwise
        "<d>d <h>h" (24 hours or more), "<h>h <m>m" (1 hour or more)
        or "<m>m".
    """
    seconds_left = _seconds_until(end_time)
    if seconds_left <= 0:
        return "Ended"

    hours, remainder = divmod(int(seconds_left), 3600)
    minutes = remainder // 60

    # ">=" so that exactly 24 hours renders as "1d 0h" rather than "24h 0m".
    if hours >= 24:
        days, hours = divmod(hours, 24)
        return f"{days}d {hours}h"
    if hours > 0:
        return f"{hours}h {minutes}m"
    return f"{minutes}m"


def _get_affiliate_url(platform: str, domain: str, auction_url: Optional[str]) -> str:
    """Get affiliate URL for a platform.

    Args:
        platform: Platform name as stored on the auction row.
        domain: Domain name being auctioned.
        auction_url: Scraped auction URL; may be empty or None.

    Returns:
        The scraped auction URL when present, otherwise a search URL on the
        platform, otherwise a generic web search for the domain.
    """
    # Use the scraped auction URL directly
    if auction_url:
        return auction_url

    # Percent-encode the domain so unusual characters cannot break or extend
    # the query string; ordinary ASCII domains are left unchanged.
    encoded = quote(domain, safe="")

    # Fallback to platform search
    platform_urls = {
        "GoDaddy": f"https://auctions.godaddy.com/trpItemListing.aspx?domain={encoded}",
        "Sedo": f"https://sedo.com/search/?keyword={encoded}",
        "NameJet": f"https://www.namejet.com/Pages/Auctions/BackorderSearch.aspx?q={encoded}",
        "ExpiredDomains": f"https://www.expireddomains.net/domain-name-search/?q={encoded}",
        "Afternic": f"https://www.afternic.com/search?k={encoded}",
    }
    return platform_urls.get(platform, f"https://www.google.com/search?q={encoded}+auction")


async def _convert_to_listing(
    auction: DomainAuction,
    db: AsyncSession,
    include_valuation: bool = True,
    valuation_result: Optional[dict] = None,
) -> AuctionListing:
    """Convert database auction to API response.

    Args:
        auction: Auction row to convert.
        db: Session passed through to the valuation service.
        include_valuation: When False, the listing carries no valuation.
        valuation_result: Optional raw result already obtained from
            ``valuation_service.estimate_value`` for this auction. When given
            it is used instead of calling the service again.

    Returns:
        The listing. ``valuation`` is None when valuation was not requested,
        the result contains an "error" key, or building it raised (the
        failure is logged; valuation is best-effort).
    """
    valuation_data = None

    if include_valuation:
        try:
            result = valuation_result
            if result is None:
                result = await valuation_service.estimate_value(
                    auction.domain, db, save_result=False
                )

            if "error" not in result:
                estimated_value = result["estimated_value"]
                # 99 is a sentinel ratio for auctions without a positive bid.
                if auction.current_bid > 0:
                    value_ratio = round(estimated_value / auction.current_bid, 2)
                else:
                    value_ratio = 99

                valuation_data = AuctionValuation(
                    estimated_value=estimated_value,
                    value_ratio=value_ratio,
                    potential_profit=round(estimated_value - auction.current_bid, 2),
                    confidence=result.get("confidence", "medium"),
                    valuation_formula=result.get("calculation", {}).get("formula", "N/A"),
                )
        except Exception as e:
            # Best-effort: a failed valuation must not fail the listing.
            logger.exception("Valuation error for %s: %s", auction.domain, e)

    return AuctionListing(
        domain=auction.domain,
        platform=auction.platform,
        platform_url=auction.auction_url or "",
        current_bid=auction.current_bid,
        currency=auction.currency,
        num_bids=auction.num_bids,
        end_time=auction.end_time,
        time_remaining=_format_time_remaining(auction.end_time),
        buy_now_price=auction.buy_now_price,
        reserve_met=auction.reserve_met,
        traffic=auction.traffic,
        age_years=auction.age_years,
        tld=auction.tld,
        affiliate_url=_get_affiliate_url(auction.platform, auction.domain, auction.auction_url),
        valuation=valuation_data,
    )


# ============== Endpoints ==============

@router.get("", response_model=AuctionSearchResponse)
async def search_auctions(
    keyword: Optional[str] = Query(None, description="Search keyword in domain names"),
    tld: Optional[str] = Query(None, description="Filter by TLD (e.g., 'com', 'io')"),
    platform: Optional[str] = Query(None, description="Filter by platform"),
    min_bid: Optional[float] = Query(None, ge=0, description="Minimum current bid"),
    max_bid: Optional[float] = Query(None, ge=0, description="Maximum current bid"),
    ending_soon: bool = Query(False, description="Only show auctions ending in < 1 hour"),
    sort_by: str = Query("ending", enum=["ending", "bid_asc", "bid_desc", "bids", "value_ratio"]),
    limit: int = Query(20, ge=0, le=100),
    offset: int = Query(0, ge=0),
    current_user: Optional[User] = Depends(get_current_user_optional),
    db: AsyncSession = Depends(get_db),
):
    """
    Search domain auctions from our scraped database.

    All data comes from web scraping of public auction pages.
    NO mock data - everything is real scraped data.

    Data Sources:
    - ExpiredDomains.net (aggregator)
    - GoDaddy Auctions (coming soon)
    - Sedo (coming soon)
    - NameJet (coming soon)

    Smart Pounce Strategy:
    - Look for value_ratio > 1.0 (estimated value exceeds current bid)
    - Focus on auctions ending soon with low bid counts

    Note: ``sort_by=value_ratio`` orders only the returned page, because
    valuations are computed after the paginated database query.
    """
    # Build query
    query = select(DomainAuction).where(DomainAuction.is_active == True)  # noqa: E712

    if keyword:
        # Escape LIKE wildcards so the keyword is matched literally.
        pattern = f"%{_escape_like(keyword)}%"
        query = query.where(DomainAuction.domain.ilike(pattern, escape="\\"))

    if tld:
        query = query.where(DomainAuction.tld == tld.lower().lstrip("."))

    if platform:
        query = query.where(DomainAuction.platform == platform)

    if min_bid is not None:
        query = query.where(DomainAuction.current_bid >= min_bid)

    if max_bid is not None:
        query = query.where(DomainAuction.current_bid <= max_bid)

    if ending_soon:
        now = datetime.utcnow()
        # Bound below by "now" so auctions that already ended are not
        # reported as ending soon (same window as /ending-soon).
        query = query.where(
            and_(
                DomainAuction.end_time > now,
                DomainAuction.end_time <= now + timedelta(hours=1),
            )
        )

    # Count total
    count_query = select(func.count()).select_from(query.subquery())
    total_result = await db.execute(count_query)
    total = total_result.scalar() or 0

    # Sort ("value_ratio" and unknown values fall back to end time here)
    if sort_by == "bid_asc":
        query = query.order_by(DomainAuction.current_bid.asc())
    elif sort_by == "bid_desc":
        query = query.order_by(DomainAuction.current_bid.desc())
    elif sort_by == "bids":
        query = query.order_by(DomainAuction.num_bids.desc())
    else:
        query = query.order_by(DomainAuction.end_time.asc())

    # Pagination
    query = query.offset(offset).limit(limit)

    result = await db.execute(query)
    auctions = list(result.scalars().all())

    # Convert to response with valuations (sequentially: one shared session)
    listings = [
        await _convert_to_listing(auction, db, include_valuation=True)
        for auction in auctions
    ]

    # Sort by value_ratio if requested (after valuation)
    if sort_by == "value_ratio":
        listings.sort(
            key=lambda x: x.valuation.value_ratio if x.valuation else 0,
            reverse=True,
        )

    # Get platforms searched
    platforms_result = await db.execute(
        select(DomainAuction.platform).distinct()
    )
    platforms = [p for (p,) in platforms_result.all()]

    # Get last update time
    last_update_result = await db.execute(
        select(func.max(DomainAuction.updated_at))
    )
    last_updated = last_update_result.scalar() or datetime.utcnow()

    return AuctionSearchResponse(
        auctions=listings,
        total=total,
        platforms_searched=platforms or ["No data yet - scrape pending"],
        last_updated=last_updated,
        data_source="scraped from public auction sites",
    )


@router.get("/ending-soon", response_model=List[AuctionListing])
async def get_ending_soon(
    hours: int = Query(1, ge=1, le=24, description="Hours until end"),
    limit: int = Query(10, ge=0, le=50),
    current_user: Optional[User] = Depends(get_current_user_optional),
    db: AsyncSession = Depends(get_db),
):
    """
    Get auctions ending soon - best opportunities for sniping.

    Data is scraped from public auction sites - no mock data.
    """
    # Read the clock once so both window bounds use the same instant.
    now = datetime.utcnow()
    cutoff = now + timedelta(hours=hours)

    query = (
        select(DomainAuction)
        .where(
            and_(
                DomainAuction.is_active == True,  # noqa: E712
                DomainAuction.end_time <= cutoff,
                DomainAuction.end_time > now,
            )
        )
        .order_by(DomainAuction.end_time.asc())
        .limit(limit)
    )

    result = await db.execute(query)
    auctions = list(result.scalars().all())

    return [
        await _convert_to_listing(auction, db, include_valuation=True)
        for auction in auctions
    ]


@router.get("/hot", response_model=List[AuctionListing])
async def get_hot_auctions(
    limit: int = Query(10, ge=0, le=50),
    current_user: Optional[User] = Depends(get_current_user_optional),
    db: AsyncSession = Depends(get_db),
):
    """
    Get hottest auctions by bidding activity.

    Data is scraped from public auction sites - no mock data.
    """
    query = (
        select(DomainAuction)
        .where(DomainAuction.is_active == True)  # noqa: E712
        .order_by(DomainAuction.num_bids.desc())
        .limit(limit)
    )

    result = await db.execute(query)
    auctions = list(result.scalars().all())

    return [
        await _convert_to_listing(auction, db, include_valuation=True)
        for auction in auctions
    ]


@router.get("/stats", response_model=List[PlatformStats])
async def get_platform_stats(
    current_user: Optional[User] = Depends(get_current_user_optional),
    db: AsyncSession = Depends(get_db),
):
    """
    Get statistics for each auction platform.

    Data is scraped from public auction sites - no mock data.

    Returns one entry per platform with active auctions, sorted by the
    number of active auctions (descending).
    """
    # Get stats per platform
    stats_query = (
        select(
            DomainAuction.platform,
            func.count(DomainAuction.id).label("count"),
            func.avg(DomainAuction.current_bid).label("avg_bid"),
        )
        .where(DomainAuction.is_active == True)  # noqa: E712
        .group_by(DomainAuction.platform)
    )

    result = await db.execute(stats_query)
    platform_data = result.all()

    # Get ending soon counts (within the next hour, not already ended)
    now = datetime.utcnow()
    cutoff = now + timedelta(hours=1)
    ending_query = (
        select(
            DomainAuction.platform,
            func.count(DomainAuction.id).label("ending_count"),
        )
        .where(
            and_(
                DomainAuction.is_active == True,  # noqa: E712
                DomainAuction.end_time <= cutoff,
                DomainAuction.end_time > now,
            )
        )
        .group_by(DomainAuction.platform)
    )

    ending_result = await db.execute(ending_query)
    ending_data = {p: c for p, c in ending_result.all()}

    stats = [
        PlatformStats(
            platform=platform,
            active_auctions=count,
            avg_bid=round(avg_bid or 0, 2),
            ending_soon=ending_data.get(platform, 0),
        )
        for platform, count, avg_bid in platform_data
    ]

    return sorted(stats, key=lambda x: x.active_auctions, reverse=True)


@router.get("/scrape-status", response_model=ScrapeStatus)
async def get_scrape_status(
    current_user: Optional[User] = Depends(get_current_user_optional),
    db: AsyncSession = Depends(get_db),
):
    """Get status of auction scraping."""
    # Get last successful scrape
    last_scrape_query = (
        select(AuctionScrapeLog)
        .where(AuctionScrapeLog.status == "success")
        .order_by(AuctionScrapeLog.completed_at.desc())
        .limit(1)
    )
    result = await db.execute(last_scrape_query)
    last_log = result.scalar_one_or_none()

    # Get total auctions
    total_query = select(func.count(DomainAuction.id)).where(
        DomainAuction.is_active == True  # noqa: E712
    )
    total_result = await db.execute(total_query)
    total = total_result.scalar() or 0

    # Get platforms
    platforms_result = await db.execute(
        select(DomainAuction.platform).distinct()
    )
    platforms = [p for (p,) in platforms_result.all()]

    return ScrapeStatus(
        last_scrape=last_log.completed_at if last_log else None,
        total_auctions=total,
        platforms=platforms or ["Pending initial scrape"],
        next_scrape=datetime.utcnow() + timedelta(hours=1),  # Approximation
    )


@router.post("/trigger-scrape")
async def trigger_scrape(
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """
    Manually trigger auction scraping (admin only for now).

    In production, this runs automatically every hour.

    Raises:
        HTTPException: 500 when the scraper raises.
    """
    # NOTE(review): the docstring says "admin only", but this handler only
    # requires an authenticated user - no admin check is performed here.
    # Confirm whether get_current_user enforces it or a role check is missing.
    try:
        result = await auction_scraper.scrape_all_platforms(db)
    except Exception as e:
        logger.exception("Manual scrape failed: %s", e)
        raise HTTPException(status_code=500, detail=f"Scrape failed: {str(e)}") from e

    return {
        "status": "success",
        "message": "Scraping completed",
        "result": result,
    }


@router.get("/opportunities")
async def get_smart_opportunities(
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """
    Smart Pounce Algorithm - Find the best auction opportunities.

    Analyzes scraped auction data (NO mock data) to find:
    - Auctions ending soon with low bids
    - Domains with high estimated value vs current bid

    Opportunity Score = value_ratio × time_factor × bid_factor

    Considers the 50 active, not-yet-ended auctions that end soonest and
    returns the 10 with the highest score.
    """
    # Get active auctions. Rows whose end time has passed are excluded:
    # they would otherwise get the maximum time_factor and rank first.
    query = (
        select(DomainAuction)
        .where(
            and_(
                DomainAuction.is_active == True,  # noqa: E712
                DomainAuction.end_time > datetime.utcnow(),
            )
        )
        .order_by(DomainAuction.end_time.asc())
        .limit(50)
    )

    result = await db.execute(query)
    auctions = list(result.scalars().all())

    if not auctions:
        return {
            "opportunities": [],
            "message": "No active auctions. Trigger a scrape to fetch latest data.",
            "valuation_method": "Our algorithm calculates: $50 × Length × TLD × Keyword × Brand factors.",
            "strategy_tips": [
                "🔄 Click 'Trigger Scrape' to fetch latest auction data",
                "🎯 Look for value_ratio > 1.0 (undervalued domains)",
                "⏰ Auctions ending soon often have best opportunities",
            ],
            "generated_at": datetime.utcnow().isoformat(),
        }

    opportunities = []

    for auction in auctions:
        try:
            valuation = await valuation_service.estimate_value(
                auction.domain, db, save_result=False
            )
        except Exception as e:
            # One failing valuation should not fail the whole endpoint.
            logger.exception("Valuation error for %s: %s", auction.domain, e)
            continue

        if "error" in valuation:
            continue

        estimated_value = valuation["estimated_value"]
        current_bid = auction.current_bid

        # 10 is the ratio assumed for auctions without a positive bid.
        value_ratio = estimated_value / current_bid if current_bid > 0 else 10
        hours_left = _seconds_until(auction.end_time) / 3600
        time_factor = 2.0 if hours_left < 1 else (1.5 if hours_left < 4 else 1.0)
        bid_factor = 1.5 if auction.num_bids < 10 else 1.0

        opportunity_score = value_ratio * time_factor * bid_factor

        # Reuse the valuation fetched above instead of asking the service twice.
        listing = await _convert_to_listing(
            auction, db, include_valuation=True, valuation_result=valuation
        )

        opportunities.append({
            "auction": listing.model_dump(),
            "analysis": {
                "estimated_value": estimated_value,
                "current_bid": current_bid,
                "value_ratio": round(value_ratio, 2),
                "potential_profit": round(estimated_value - current_bid, 2),
                "opportunity_score": round(opportunity_score, 2),
                "time_factor": time_factor,
                "bid_factor": bid_factor,
                "recommendation": (
                    "Strong buy" if opportunity_score > 5 else
                    "Consider" if opportunity_score > 2 else
                    "Monitor"
                ),
                "reasoning": _get_opportunity_reasoning(
                    value_ratio, hours_left, auction.num_bids, opportunity_score
                ),
            }
        })

    opportunities.sort(key=lambda x: x["analysis"]["opportunity_score"], reverse=True)

    return {
        "opportunities": opportunities[:10],
        "data_source": "Real scraped auction data (no mock data)",
        "valuation_method": (
            "Our algorithm calculates: $50 × Length × TLD × Keyword × Brand factors. "
            "See /portfolio/valuation/{domain} for detailed breakdown of any domain."
        ),
        "strategy_tips": [
            "🎯 Focus on value_ratio > 1.0 (estimated value exceeds current bid)",
            "⏰ Auctions ending in < 1 hour often have best snipe opportunities",
            "📉 Low bid count (< 10) might indicate overlooked gems",
            "💡 Premium TLDs (.com, .ai, .io) have highest aftermarket demand",
        ],
        "generated_at": datetime.utcnow().isoformat(),
    }


def _get_opportunity_reasoning(value_ratio: float, hours_left: float, num_bids: int, score: float) -> str:
    """Generate human-readable reasoning for the opportunity.

    Args:
        value_ratio: Estimated value divided by current bid.
        hours_left: Hours until the auction ends.
        num_bids: Number of bids placed so far.
        score: Overall opportunity score (accepted for interface
            compatibility; not used in the text).

    Returns:
        The applicable reasons joined with " | ".
    """
    reasons = []

    if value_ratio > 2:
        reasons.append(f"Significantly undervalued ({value_ratio:.1f}× estimated value)")
    elif value_ratio > 1:
        reasons.append(f"Undervalued ({value_ratio:.1f}× estimated value)")
    else:
        reasons.append(f"Current bid exceeds our estimate ({value_ratio:.2f}×)")

    if hours_left < 1:
        reasons.append("⚡ Ending very soon - final chance to bid")
    elif hours_left < 4:
        reasons.append("⏰ Ending soon - limited time remaining")

    if num_bids < 5:
        reasons.append("📉 Very low competition - potential overlooked opportunity")
    elif num_bids < 10:
        reasons.append("📊 Moderate competition")
    else:
        reasons.append(f"🔥 High demand ({num_bids} bids)")

    return " | ".join(reasons)