pounce/backend/app/api/auctions.py
yves.gugger 35d943a372
Some checks failed
CI / Frontend Lint & Type Check (push) Has been cancelled
CI / Frontend Build (push) Has been cancelled
CI / Backend Lint (push) Has been cancelled
CI / Backend Tests (push) Has been cancelled
CI / Docker Build (push) Has been cancelled
CI / Security Scan (push) Has been cancelled
Deploy / Build & Push Images (push) Has been cancelled
Deploy / Deploy to Server (push) Has been cancelled
Deploy / Notify (push) Has been cancelled
Premium overhaul based on review feedback
- Fix Command Center loading on mobile (add mobile sidebar menu)
- Rename 'Market' to 'Auctions' in navigation (clearer naming)
- Add Vanity Filter for public auctions (hide spam domains)
  - Premium TLDs only for public (.com, .io, .ai, etc.)
  - Max 15 chars, max 1 hyphen, max 2 digits
  - No random consonant strings
- Improve pricing page differentiation
  - Highlight 'Smart spam filter' for Trader
  - Show 'Curated list' vs 'Raw feed'
  - Add sublabels for key features
- Add background effects to Command Center
- Improve responsive design
2025-12-10 08:53:41 +01:00

714 lines
24 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Smart Pounce - Domain Auction Aggregator
This module provides auction data from our database of scraped listings.
Data is scraped from public auction platforms - NO APIS used.
Data Sources (Web Scraping):
- ExpiredDomains.net (aggregator)
- GoDaddy Auctions (public listings)
- Sedo (public search)
- NameJet (public auctions)
IMPORTANT:
- All data comes from web scraping of public pages
- No mock data - everything is real scraped data
- Data is cached in PostgreSQL/SQLite for performance
- Scraper runs on schedule (see scheduler.py)
Legal Note (Switzerland):
- No escrow/payment handling = no GwG/FINMA requirements
- Users click through to external platforms
- We only provide market intelligence
"""
import logging
from datetime import datetime, timedelta
from typing import Optional, List
from fastapi import APIRouter, Depends, Query, HTTPException
from pydantic import BaseModel
from sqlalchemy import select, func, and_
from sqlalchemy.ext.asyncio import AsyncSession
from app.database import get_db
from app.api.deps import get_current_user, get_current_user_optional
from app.models.user import User
from app.models.auction import DomainAuction, AuctionScrapeLog
from app.services.valuation import valuation_service
from app.services.auction_scraper import auction_scraper
logger = logging.getLogger(__name__)
router = APIRouter()
# ============== Schemas ==============
class AuctionValuation(BaseModel):
"""Valuation details for an auction."""
estimated_value: float
value_ratio: float
potential_profit: float
confidence: str
valuation_formula: str
class AuctionListing(BaseModel):
"""A domain auction listing from the database."""
domain: str
platform: str
platform_url: str
current_bid: float
currency: str
num_bids: int
end_time: datetime
time_remaining: str
buy_now_price: Optional[float] = None
reserve_met: Optional[bool] = None
traffic: Optional[int] = None
age_years: Optional[int] = None
tld: str
affiliate_url: str
valuation: Optional[AuctionValuation] = None
class Config:
from_attributes = True
class AuctionSearchResponse(BaseModel):
"""Response for auction search."""
auctions: List[AuctionListing]
total: int
platforms_searched: List[str]
last_updated: datetime
data_source: str = "scraped"
valuation_note: str = (
"Values are estimated using our algorithm: "
"$50 × Length × TLD × Keyword × Brand factors. "
"See /portfolio/valuation/{domain} for detailed breakdown."
)
class PlatformStats(BaseModel):
"""Statistics for an auction platform."""
platform: str
active_auctions: int
avg_bid: float
ending_soon: int
class ScrapeStatus(BaseModel):
"""Status of auction scraping."""
last_scrape: Optional[datetime]
total_auctions: int
platforms: List[str]
next_scrape: Optional[datetime]
# ============== Helper Functions ==============
def _format_time_remaining(end_time: datetime) -> str:
"""Format time remaining in human-readable format."""
delta = end_time - datetime.utcnow()
if delta.total_seconds() <= 0:
return "Ended"
hours = int(delta.total_seconds() // 3600)
minutes = int((delta.total_seconds() % 3600) // 60)
if hours > 24:
days = hours // 24
return f"{days}d {hours % 24}h"
elif hours > 0:
return f"{hours}h {minutes}m"
else:
return f"{minutes}m"
def _get_affiliate_url(platform: str, domain: str, auction_url: str) -> str:
"""Get affiliate URL for a platform - links directly to the auction page."""
# Use the scraped auction URL directly if available
if auction_url and auction_url.startswith("http"):
return auction_url
# Fallback to platform-specific search/listing pages
platform_urls = {
"GoDaddy": f"https://auctions.godaddy.com/trpItemListing.aspx?domain={domain}",
"Sedo": f"https://sedo.com/search/?keyword={domain}",
"NameJet": f"https://www.namejet.com/Pages/Auctions/BackorderSearch.aspx?q={domain}",
"DropCatch": f"https://www.dropcatch.com/domain/{domain}",
"ExpiredDomains": f"https://www.expireddomains.net/domain-name-search/?q={domain}",
"Afternic": f"https://www.afternic.com/search?k={domain}",
"Dynadot": f"https://www.dynadot.com/market/auction/{domain}",
"Porkbun": f"https://porkbun.com/checkout/search?q={domain}",
}
return platform_urls.get(platform, f"https://www.google.com/search?q={domain}+domain+auction")
async def _convert_to_listing(
auction: DomainAuction,
db: AsyncSession,
include_valuation: bool = True
) -> AuctionListing:
"""Convert database auction to API response."""
valuation_data = None
if include_valuation:
try:
result = await valuation_service.estimate_value(auction.domain, db, save_result=False)
if "error" not in result:
estimated_value = result["estimated_value"]
value_ratio = round(estimated_value / auction.current_bid, 2) if auction.current_bid > 0 else 99
valuation_data = AuctionValuation(
estimated_value=estimated_value,
value_ratio=value_ratio,
potential_profit=round(estimated_value - auction.current_bid, 2),
confidence=result.get("confidence", "medium"),
valuation_formula=result.get("calculation", {}).get("formula", "N/A"),
)
except Exception as e:
logger.error(f"Valuation error for {auction.domain}: {e}")
return AuctionListing(
domain=auction.domain,
platform=auction.platform,
platform_url=auction.auction_url or "",
current_bid=auction.current_bid,
currency=auction.currency,
num_bids=auction.num_bids,
end_time=auction.end_time,
time_remaining=_format_time_remaining(auction.end_time),
buy_now_price=auction.buy_now_price,
reserve_met=auction.reserve_met,
traffic=auction.traffic,
age_years=auction.age_years,
tld=auction.tld,
affiliate_url=_get_affiliate_url(auction.platform, auction.domain, auction.auction_url),
valuation=valuation_data,
)
# ============== Endpoints ==============
@router.get("", response_model=AuctionSearchResponse)
async def search_auctions(
keyword: Optional[str] = Query(None, description="Search keyword in domain names"),
tld: Optional[str] = Query(None, description="Filter by TLD (e.g., 'com', 'io')"),
platform: Optional[str] = Query(None, description="Filter by platform"),
min_bid: Optional[float] = Query(None, ge=0, description="Minimum current bid"),
max_bid: Optional[float] = Query(None, ge=0, description="Maximum current bid"),
ending_soon: bool = Query(False, description="Only show auctions ending in < 1 hour"),
sort_by: str = Query("ending", enum=["ending", "bid_asc", "bid_desc", "bids", "value_ratio"]),
limit: int = Query(20, le=100),
offset: int = Query(0, ge=0),
current_user: Optional[User] = Depends(get_current_user_optional),
db: AsyncSession = Depends(get_db),
):
"""
Search domain auctions from our scraped database.
All data comes from web scraping of public auction pages.
NO mock data - everything is real scraped data.
Data Sources:
- ExpiredDomains.net (aggregator)
- GoDaddy Auctions (coming soon)
- Sedo (coming soon)
- NameJet (coming soon)
Smart Pounce Strategy:
- Look for value_ratio > 1.0 (estimated value exceeds current bid)
- Focus on auctions ending soon with low bid counts
"""
# Build query
query = select(DomainAuction).where(DomainAuction.is_active == True)
# VANITY FILTER: For public (non-logged-in) users, only show premium-looking domains
# This ensures the first impression is high-quality, not spam domains
if current_user is None:
# Premium TLDs only (no .cc, .website, .info spam clusters)
premium_tlds = ['com', 'io', 'ai', 'co', 'de', 'ch', 'net', 'org', 'app', 'dev', 'xyz']
query = query.where(DomainAuction.tld.in_(premium_tlds))
# No domains with more than 15 characters (excluding TLD)
# Note: We filter further in Python for complex rules
if keyword:
query = query.where(DomainAuction.domain.ilike(f"%{keyword}%"))
if tld:
query = query.where(DomainAuction.tld == tld.lower().lstrip("."))
if platform:
query = query.where(DomainAuction.platform == platform)
if min_bid is not None:
query = query.where(DomainAuction.current_bid >= min_bid)
if max_bid is not None:
query = query.where(DomainAuction.current_bid <= max_bid)
if ending_soon:
cutoff = datetime.utcnow() + timedelta(hours=1)
query = query.where(DomainAuction.end_time <= cutoff)
# Count total
count_query = select(func.count()).select_from(query.subquery())
total_result = await db.execute(count_query)
total = total_result.scalar() or 0
# Sort
if sort_by == "ending":
query = query.order_by(DomainAuction.end_time.asc())
elif sort_by == "bid_asc":
query = query.order_by(DomainAuction.current_bid.asc())
elif sort_by == "bid_desc":
query = query.order_by(DomainAuction.current_bid.desc())
elif sort_by == "bids":
query = query.order_by(DomainAuction.num_bids.desc())
else:
query = query.order_by(DomainAuction.end_time.asc())
# Pagination
query = query.offset(offset).limit(limit)
result = await db.execute(query)
auctions = list(result.scalars().all())
# VANITY FILTER PART 2: Apply Python-side filtering for public users
# This ensures only premium-looking domains are shown to non-logged-in users
if current_user is None:
def is_premium_domain(domain_name: str) -> bool:
"""Check if a domain looks premium/professional"""
# Extract just the domain part (without TLD)
parts = domain_name.rsplit('.', 1)
name = parts[0] if parts else domain_name
# Rule 1: No more than 15 characters
if len(name) > 15:
return False
# Rule 2: No more than 1 hyphen
if name.count('-') > 1:
return False
# Rule 3: No more than 2 digits total
digit_count = sum(1 for c in name if c.isdigit())
if digit_count > 2:
return False
# Rule 4: Must be at least 3 characters
if len(name) < 3:
return False
# Rule 5: No random-looking strings (too many consonants in a row)
consonants = 'bcdfghjklmnpqrstvwxyz'
consonant_streak = 0
max_streak = 0
for c in name.lower():
if c in consonants:
consonant_streak += 1
max_streak = max(max_streak, consonant_streak)
else:
consonant_streak = 0
if max_streak > 4:
return False
return True
auctions = [a for a in auctions if is_premium_domain(a.domain)]
# Convert to response with valuations
listings = []
for auction in auctions:
listing = await _convert_to_listing(auction, db, include_valuation=True)
listings.append(listing)
# Sort by value_ratio if requested (after valuation)
if sort_by == "value_ratio":
listings.sort(
key=lambda x: x.valuation.value_ratio if x.valuation else 0,
reverse=True
)
# Get platforms searched
platforms_result = await db.execute(
select(DomainAuction.platform).distinct()
)
platforms = [p for (p,) in platforms_result.all()]
# Get last update time
last_update_result = await db.execute(
select(func.max(DomainAuction.updated_at))
)
last_updated = last_update_result.scalar() or datetime.utcnow()
return AuctionSearchResponse(
auctions=listings,
total=total,
platforms_searched=platforms or ["No data yet - scrape pending"],
last_updated=last_updated,
data_source="scraped from public auction sites",
)
@router.get("/ending-soon", response_model=List[AuctionListing])
async def get_ending_soon(
hours: int = Query(1, ge=1, le=24, description="Hours until end"),
limit: int = Query(10, le=50),
current_user: Optional[User] = Depends(get_current_user_optional),
db: AsyncSession = Depends(get_db),
):
"""
Get auctions ending soon - best opportunities for sniping.
Data is scraped from public auction sites - no mock data.
"""
cutoff = datetime.utcnow() + timedelta(hours=hours)
query = (
select(DomainAuction)
.where(
and_(
DomainAuction.is_active == True,
DomainAuction.end_time <= cutoff,
DomainAuction.end_time > datetime.utcnow(),
)
)
.order_by(DomainAuction.end_time.asc())
.limit(limit)
)
result = await db.execute(query)
auctions = list(result.scalars().all())
listings = []
for auction in auctions:
listing = await _convert_to_listing(auction, db, include_valuation=True)
listings.append(listing)
return listings
@router.get("/hot", response_model=List[AuctionListing])
async def get_hot_auctions(
limit: int = Query(10, le=50),
current_user: Optional[User] = Depends(get_current_user_optional),
db: AsyncSession = Depends(get_db),
):
"""
Get hottest auctions by bidding activity.
Data is scraped from public auction sites - no mock data.
"""
query = (
select(DomainAuction)
.where(DomainAuction.is_active == True)
.order_by(DomainAuction.num_bids.desc())
.limit(limit)
)
result = await db.execute(query)
auctions = list(result.scalars().all())
listings = []
for auction in auctions:
listing = await _convert_to_listing(auction, db, include_valuation=True)
listings.append(listing)
return listings
@router.get("/stats", response_model=List[PlatformStats])
async def get_platform_stats(
current_user: Optional[User] = Depends(get_current_user_optional),
db: AsyncSession = Depends(get_db),
):
"""
Get statistics for each auction platform.
Data is scraped from public auction sites - no mock data.
"""
# Get stats per platform
stats_query = (
select(
DomainAuction.platform,
func.count(DomainAuction.id).label("count"),
func.avg(DomainAuction.current_bid).label("avg_bid"),
)
.where(DomainAuction.is_active == True)
.group_by(DomainAuction.platform)
)
result = await db.execute(stats_query)
platform_data = result.all()
# Get ending soon counts
cutoff = datetime.utcnow() + timedelta(hours=1)
ending_query = (
select(
DomainAuction.platform,
func.count(DomainAuction.id).label("ending_count"),
)
.where(
and_(
DomainAuction.is_active == True,
DomainAuction.end_time <= cutoff,
)
)
.group_by(DomainAuction.platform)
)
ending_result = await db.execute(ending_query)
ending_data = {p: c for p, c in ending_result.all()}
stats = []
for platform, count, avg_bid in platform_data:
stats.append(PlatformStats(
platform=platform,
active_auctions=count,
avg_bid=round(avg_bid or 0, 2),
ending_soon=ending_data.get(platform, 0),
))
return sorted(stats, key=lambda x: x.active_auctions, reverse=True)
@router.get("/scrape-status", response_model=ScrapeStatus)
async def get_scrape_status(
current_user: Optional[User] = Depends(get_current_user_optional),
db: AsyncSession = Depends(get_db),
):
"""Get status of auction scraping."""
# Get last successful scrape
last_scrape_query = (
select(AuctionScrapeLog)
.where(AuctionScrapeLog.status == "success")
.order_by(AuctionScrapeLog.completed_at.desc())
.limit(1)
)
result = await db.execute(last_scrape_query)
last_log = result.scalar_one_or_none()
# Get total auctions
total_query = select(func.count(DomainAuction.id)).where(DomainAuction.is_active == True)
total_result = await db.execute(total_query)
total = total_result.scalar() or 0
# Get platforms
platforms_result = await db.execute(
select(DomainAuction.platform).distinct()
)
platforms = [p for (p,) in platforms_result.all()]
return ScrapeStatus(
last_scrape=last_log.completed_at if last_log else None,
total_auctions=total,
platforms=platforms or ["Pending initial scrape"],
next_scrape=datetime.utcnow() + timedelta(hours=1), # Approximation
)
@router.post("/trigger-scrape")
async def trigger_scrape(
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""
Manually trigger auction scraping (admin only for now).
In production, this runs automatically every hour.
"""
try:
result = await auction_scraper.scrape_all_platforms(db)
return {
"status": "success",
"message": "Scraping completed",
"result": result,
}
except Exception as e:
logger.error(f"Manual scrape failed: {e}")
raise HTTPException(status_code=500, detail=f"Scrape failed: {str(e)}")
@router.post("/seed")
async def seed_auctions(
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""
Seed the database with realistic sample auction data.
Useful for development and demo purposes.
"""
try:
result = await auction_scraper.seed_sample_auctions(db)
return {
"status": "success",
"message": "Sample auctions seeded",
"result": result,
}
except Exception as e:
logger.error(f"Seeding failed: {e}")
raise HTTPException(status_code=500, detail=f"Seeding failed: {str(e)}")
@router.get("/opportunities")
async def get_smart_opportunities(
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""
Smart Pounce Algorithm - Find the best auction opportunities.
Analyzes auction data to find sweet spots:
- Auctions ending soon (snipe potential)
- Low bid counts (overlooked gems)
- Good price points
Opportunity Score = time_urgency × competition_factor × price_factor
"""
# Get active auctions
query = (
select(DomainAuction)
.where(DomainAuction.is_active == True)
.order_by(DomainAuction.end_time.asc())
.limit(100)
)
result = await db.execute(query)
auctions = list(result.scalars().all())
if not auctions:
return {
"opportunities": [],
"message": "No active auctions found.",
"strategy_tips": [
"🔄 Check back soon for new auctions",
"⏰ Best opportunities often appear as auctions near their end",
],
"generated_at": datetime.utcnow().isoformat(),
}
opportunities = []
for auction in auctions:
hours_left = (auction.end_time - datetime.utcnow()).total_seconds() / 3600
# Skip auctions that have ended or are too far out
if hours_left <= 0 or hours_left > 72:
continue
# Time urgency: Higher score for auctions ending soon
if hours_left < 1:
time_score = 5.0
urgency = "Ending in minutes!"
elif hours_left < 4:
time_score = 3.0
urgency = "Ending very soon"
elif hours_left < 12:
time_score = 2.0
urgency = "Ending today"
elif hours_left < 24:
time_score = 1.5
urgency = "Ending tomorrow"
else:
time_score = 1.0
urgency = "Active"
# Competition factor: Lower bids = better opportunity
if auction.num_bids < 3:
competition_score = 3.0
competition = "Almost no competition"
elif auction.num_bids < 10:
competition_score = 2.0
competition = "Low competition"
elif auction.num_bids < 20:
competition_score = 1.2
competition = "Moderate competition"
else:
competition_score = 0.8
competition = "High competition"
# Price factor: Reasonable price points are opportunities
if auction.current_bid < 100:
price_score = 2.0
price_range = "Budget-friendly"
elif auction.current_bid < 500:
price_score = 1.5
price_range = "Mid-range"
elif auction.current_bid < 2000:
price_score = 1.2
price_range = "Premium"
else:
price_score = 1.0
price_range = "High-value"
# Calculate overall opportunity score
opportunity_score = round(time_score * competition_score * price_score, 1)
# Only include if score is interesting (> 3)
if opportunity_score < 3:
continue
listing = await _convert_to_listing(auction, db, include_valuation=False)
recommendation = (
"🔥 Hot" if opportunity_score >= 10 else
"⚡ Great" if opportunity_score >= 6 else
"👀 Watch"
)
opportunities.append({
"auction": listing.model_dump(),
"analysis": {
"opportunity_score": opportunity_score,
"time_score": time_score,
"competition_score": competition_score,
"price_score": price_score,
"urgency": urgency,
"competition": competition,
"price_range": price_range,
"recommendation": recommendation,
"reasoning": f"{urgency}{competition}{price_range}",
}
})
# Sort by opportunity score
opportunities.sort(key=lambda x: x["analysis"]["opportunity_score"], reverse=True)
return {
"opportunities": opportunities[:15],
"strategy_tips": [
"⏰ Auctions ending soon have snipe potential",
"📉 Low bid count = overlooked opportunities",
"💡 Set a max budget and stick to it",
],
"generated_at": datetime.utcnow().isoformat(),
}
def _get_opportunity_reasoning(value_ratio: float, hours_left: float, num_bids: int, score: float) -> str:
"""Generate human-readable reasoning for the opportunity."""
reasons = []
if value_ratio > 2:
reasons.append(f"Significantly undervalued ({value_ratio:.1f}× estimated value)")
elif value_ratio > 1:
reasons.append(f"Undervalued ({value_ratio:.1f}× estimated value)")
else:
reasons.append(f"Current bid exceeds our estimate ({value_ratio:.2f}×)")
if hours_left < 1:
reasons.append("⚡ Ending very soon - final chance to bid")
elif hours_left < 4:
reasons.append("⏰ Ending soon - limited time remaining")
if num_bids < 5:
reasons.append("📉 Very low competition - potential overlooked opportunity")
elif num_bids < 10:
reasons.append("📊 Moderate competition")
else:
reasons.append(f"🔥 High demand ({num_bids} bids)")
return " | ".join(reasons)