pounce/backend/app/api/auctions.py
yves.gugger 6a6e2460d5 feat: Unified Market Feed API + Pounce Direct Integration
🚀 MARKET CONCEPT IMPLEMENTATION

Backend:
- Added /auctions/feed unified endpoint combining Pounce Direct + external auctions
- Implemented Pounce Score v2.0 with market signals (length, TLD, bids, age)
- Added vanity filter for premium domains (non-auth users)
- Integrated DomainListing model for Pounce Direct

Frontend:
- Refactored terminal/market page with Pounce Direct hierarchy
- Updated public auctions page with Pounce Exclusive section
- Added api.getMarketFeed() to API client
- Converted /market to redirect to /auctions

Documentation:
- Created MARKET_CONCEPT.md with full unicorn roadmap
- Created ZONE_FILE_ACCESS.md with Verisign access guide
- Updated todos and progress tracking

Cleanup:
- Deleted empty legacy folders (dashboard, portfolio, settings, watchlist, careers)
2025-12-11 08:59:50 +01:00

"""
Smart Pounce - Domain Auction Aggregator
This module provides auction data from our database of scraped listings.
Data is scraped from public auction platforms - NO APIs used.
Data Sources (Web Scraping):
- ExpiredDomains.net (aggregator)
- GoDaddy Auctions (public listings)
- Sedo (public search)
- NameJet (public auctions)
PLUS Pounce Direct Listings (user-created marketplace):
- DNS-verified owner listings
- Instant buy option
- 0% commission
IMPORTANT:
- All data comes from web scraping of public pages
- No mock data - everything is real scraped data
- Data is cached in PostgreSQL/SQLite for performance
- Scraper runs on schedule (see scheduler.py)
Legal Note (Switzerland):
- No escrow/payment handling = no GwG/FINMA requirements
- Users click through to external platforms
- We only provide market intelligence
"""
import logging
from datetime import datetime, timedelta
from typing import Optional, List
from itertools import groupby
from fastapi import APIRouter, Depends, Query, HTTPException
from pydantic import BaseModel
from sqlalchemy import select, func, and_, or_
from sqlalchemy.ext.asyncio import AsyncSession
from app.database import get_db
from app.api.deps import get_current_user, get_current_user_optional
from app.models.user import User
from app.models.auction import DomainAuction, AuctionScrapeLog
from app.models.listing import DomainListing, ListingStatus, VerificationStatus
from app.services.valuation import valuation_service
from app.services.auction_scraper import auction_scraper
logger = logging.getLogger(__name__)
router = APIRouter()
# ============== Schemas ==============
class AuctionValuation(BaseModel):
"""Valuation details for an auction."""
estimated_value: float
value_ratio: float
potential_profit: float
confidence: str
valuation_formula: str
class AuctionListing(BaseModel):
"""A domain auction listing from the database."""
domain: str
platform: str
platform_url: str
current_bid: float
currency: str
num_bids: int
end_time: datetime
time_remaining: str
buy_now_price: Optional[float] = None
reserve_met: Optional[bool] = None
traffic: Optional[int] = None
age_years: Optional[int] = None
tld: str
affiliate_url: str
valuation: Optional[AuctionValuation] = None
class Config:
from_attributes = True
class AuctionSearchResponse(BaseModel):
"""Response for auction search."""
auctions: List[AuctionListing]
total: int
platforms_searched: List[str]
last_updated: datetime
data_source: str = "scraped"
valuation_note: str = (
"Values are estimated using our algorithm: "
"$50 × Length × TLD × Keyword × Brand factors. "
"See /portfolio/valuation/{domain} for detailed breakdown."
)
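# Illustrative reading of the valuation_note formula above. The multiplier
# values here are hypothetical (the real factors live in valuation_service):
# a short, keyword-rich .com such as "cloudpay.com" might work out to roughly
# $50 × 1.6 (length) × 2.0 (.com) × 1.5 (keyword) × 1.2 (brand) ≈ $288,
# while a long, hyphenated .xyz multiplies out far lower.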
class PlatformStats(BaseModel):
"""Statistics for an auction platform."""
platform: str
active_auctions: int
avg_bid: float
ending_soon: int
class ScrapeStatus(BaseModel):
"""Status of auction scraping."""
last_scrape: Optional[datetime]
total_auctions: int
platforms: List[str]
next_scrape: Optional[datetime]
class MarketFeedItem(BaseModel):
"""Unified market feed item - combines auctions and Pounce Direct listings."""
id: str
domain: str
tld: str
price: float
currency: str = "USD"
    price_type: str # "bid", "fixed", or "negotiable"
status: str # "auction" or "instant"
# Source info
source: str # "Pounce", "GoDaddy", "Sedo", etc.
is_pounce: bool = False
verified: bool = False
# Auction-specific
time_remaining: Optional[str] = None
end_time: Optional[datetime] = None
num_bids: Optional[int] = None
# Pounce Direct specific
slug: Optional[str] = None
seller_verified: bool = False
# URLs
url: str # Internal for Pounce, external for auctions
is_external: bool = True
# Scoring
pounce_score: int = 50
# Valuation (optional)
valuation: Optional[AuctionValuation] = None
class Config:
from_attributes = True
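# Illustrative (made-up) example of how /feed serializes a Pounce Direct
# listing as a MarketFeedItem (optional auction fields omitted):
#   {"id": "pounce-42", "domain": "getpounce.io", "tld": "io", "price": 1500.0,
#    "currency": "USD", "price_type": "fixed", "status": "instant",
#    "source": "Pounce", "is_pounce": true, "verified": true,
#    "seller_verified": true, "slug": "getpounce-io", "url": "/buy/getpounce-io",
#    "is_external": false, "pounce_score": 84}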
class MarketFeedResponse(BaseModel):
"""Response for unified market feed."""
items: List[MarketFeedItem]
total: int
pounce_direct_count: int
auction_count: int
sources: List[str]
last_updated: datetime
filters_applied: dict = {}
# ============== Helper Functions ==============
def _format_time_remaining(end_time: datetime) -> str:
"""Format time remaining in human-readable format."""
delta = end_time - datetime.utcnow()
if delta.total_seconds() <= 0:
return "Ended"
hours = int(delta.total_seconds() // 3600)
minutes = int((delta.total_seconds() % 3600) // 60)
if hours > 24:
days = hours // 24
return f"{days}d {hours % 24}h"
elif hours > 0:
return f"{hours}h {minutes}m"
else:
return f"{minutes}m"
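# Examples (illustrative): with 26 hours left this returns "1d 2h",
# with 45 minutes left it returns "45m", and past end_time it returns "Ended".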
def _get_affiliate_url(platform: str, domain: str, auction_url: str) -> str:
"""Get affiliate URL for a platform - links directly to the auction page."""
# Use the scraped auction URL directly if available
if auction_url and auction_url.startswith("http"):
return auction_url
# Fallback to platform-specific search/listing pages
platform_urls = {
"GoDaddy": f"https://auctions.godaddy.com/trpItemListing.aspx?domain={domain}",
"Sedo": f"https://sedo.com/search/?keyword={domain}",
"NameJet": f"https://www.namejet.com/Pages/Auctions/BackorderSearch.aspx?q={domain}",
"DropCatch": f"https://www.dropcatch.com/domain/{domain}",
"ExpiredDomains": f"https://www.expireddomains.net/domain-name-search/?q={domain}",
"Afternic": f"https://www.afternic.com/search?k={domain}",
"Dynadot": f"https://www.dynadot.com/market/auction/{domain}",
"Porkbun": f"https://porkbun.com/checkout/search?q={domain}",
}
return platform_urls.get(platform, f"https://www.google.com/search?q={domain}+domain+auction")
async def _convert_to_listing(
auction: DomainAuction,
db: AsyncSession,
include_valuation: bool = True
) -> AuctionListing:
"""Convert database auction to API response."""
valuation_data = None
if include_valuation:
try:
result = await valuation_service.estimate_value(auction.domain, db, save_result=False)
if "error" not in result:
estimated_value = result["estimated_value"]
value_ratio = round(estimated_value / auction.current_bid, 2) if auction.current_bid > 0 else 99
valuation_data = AuctionValuation(
estimated_value=estimated_value,
value_ratio=value_ratio,
potential_profit=round(estimated_value - auction.current_bid, 2),
confidence=result.get("confidence", "medium"),
valuation_formula=result.get("calculation", {}).get("formula", "N/A"),
)
except Exception as e:
logger.error(f"Valuation error for {auction.domain}: {e}")
return AuctionListing(
domain=auction.domain,
platform=auction.platform,
platform_url=auction.auction_url or "",
current_bid=auction.current_bid,
currency=auction.currency,
num_bids=auction.num_bids,
end_time=auction.end_time,
time_remaining=_format_time_remaining(auction.end_time),
buy_now_price=auction.buy_now_price,
reserve_met=auction.reserve_met,
traffic=auction.traffic,
age_years=auction.age_years,
tld=auction.tld,
affiliate_url=_get_affiliate_url(auction.platform, auction.domain, auction.auction_url),
valuation=valuation_data,
)
# ============== Endpoints ==============
@router.get("", response_model=AuctionSearchResponse)
async def search_auctions(
keyword: Optional[str] = Query(None, description="Search keyword in domain names"),
tld: Optional[str] = Query(None, description="Filter by TLD (e.g., 'com', 'io')"),
platform: Optional[str] = Query(None, description="Filter by platform"),
min_bid: Optional[float] = Query(None, ge=0, description="Minimum current bid"),
max_bid: Optional[float] = Query(None, ge=0, description="Maximum current bid"),
ending_soon: bool = Query(False, description="Only show auctions ending in < 1 hour"),
sort_by: str = Query("ending", enum=["ending", "bid_asc", "bid_desc", "bids", "value_ratio"]),
limit: int = Query(20, le=100),
offset: int = Query(0, ge=0),
current_user: Optional[User] = Depends(get_current_user_optional),
db: AsyncSession = Depends(get_db),
):
"""
Search domain auctions from our scraped database.
All data comes from web scraping of public auction pages.
NO mock data - everything is real scraped data.
Data Sources:
- ExpiredDomains.net (aggregator)
- GoDaddy Auctions (coming soon)
- Sedo (coming soon)
- NameJet (coming soon)
Smart Pounce Strategy:
- Look for value_ratio > 1.0 (estimated value exceeds current bid)
- Focus on auctions ending soon with low bid counts
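    Example request (parameter values are illustrative; assumes the router is
    mounted at /auctions, as the /auctions/feed endpoint suggests):
        GET /auctions?tld=com&max_bid=500&ending_soon=true&sort_by=value_ratio
    returns .com auctions under $500 ending within the hour, ranked by
    estimated value relative to the current bid.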
"""
# Build query
query = select(DomainAuction).where(DomainAuction.is_active == True)
# VANITY FILTER: For public (non-logged-in) users, only show premium-looking domains
# This ensures the first impression is high-quality, not spam domains
if current_user is None:
# Premium TLDs only (no .cc, .website, .info spam clusters)
premium_tlds = ['com', 'io', 'ai', 'co', 'de', 'ch', 'net', 'org', 'app', 'dev', 'xyz']
query = query.where(DomainAuction.tld.in_(premium_tlds))
# No domains with more than 15 characters (excluding TLD)
# Note: We filter further in Python for complex rules
if keyword:
query = query.where(DomainAuction.domain.ilike(f"%{keyword}%"))
if tld:
query = query.where(DomainAuction.tld == tld.lower().lstrip("."))
if platform:
query = query.where(DomainAuction.platform == platform)
if min_bid is not None:
query = query.where(DomainAuction.current_bid >= min_bid)
if max_bid is not None:
query = query.where(DomainAuction.current_bid <= max_bid)
if ending_soon:
cutoff = datetime.utcnow() + timedelta(hours=1)
query = query.where(DomainAuction.end_time <= cutoff)
    # Count total matches (before the Python-side vanity filter below, so it can overcount for public users)
count_query = select(func.count()).select_from(query.subquery())
total_result = await db.execute(count_query)
total = total_result.scalar() or 0
# Sort
if sort_by == "ending":
query = query.order_by(DomainAuction.end_time.asc())
elif sort_by == "bid_asc":
query = query.order_by(DomainAuction.current_bid.asc())
elif sort_by == "bid_desc":
query = query.order_by(DomainAuction.current_bid.desc())
elif sort_by == "bids":
query = query.order_by(DomainAuction.num_bids.desc())
else:
query = query.order_by(DomainAuction.end_time.asc())
# Pagination
query = query.offset(offset).limit(limit)
result = await db.execute(query)
auctions = list(result.scalars().all())
# VANITY FILTER PART 2: Apply Python-side filtering for public users
# This ensures only premium-looking domains are shown to non-logged-in users
if current_user is None:
def is_premium_domain(domain_name: str) -> bool:
"""Check if a domain looks premium/professional"""
# Extract just the domain part (without TLD)
parts = domain_name.rsplit('.', 1)
name = parts[0] if parts else domain_name
# Rule 1: No more than 15 characters
if len(name) > 15:
return False
# Rule 2: No more than 1 hyphen
if name.count('-') > 1:
return False
# Rule 3: No more than 2 digits total
digit_count = sum(1 for c in name if c.isdigit())
if digit_count > 2:
return False
# Rule 4: Must be at least 3 characters
if len(name) < 3:
return False
# Rule 5: No random-looking strings (too many consonants in a row)
consonants = 'bcdfghjklmnpqrstvwxyz'
consonant_streak = 0
max_streak = 0
for c in name.lower():
if c in consonants:
consonant_streak += 1
max_streak = max(max_streak, consonant_streak)
else:
consonant_streak = 0
if max_streak > 4:
return False
return True
auctions = [a for a in auctions if is_premium_domain(a.domain)]
# Convert to response with valuations
listings = []
for auction in auctions:
listing = await _convert_to_listing(auction, db, include_valuation=True)
listings.append(listing)
    # Sort by value_ratio if requested (valuation runs after DB pagination, so this only re-orders the current page)
if sort_by == "value_ratio":
listings.sort(
key=lambda x: x.valuation.value_ratio if x.valuation else 0,
reverse=True
)
# Get platforms searched
platforms_result = await db.execute(
select(DomainAuction.platform).distinct()
)
platforms = [p for (p,) in platforms_result.all()]
# Get last update time
last_update_result = await db.execute(
select(func.max(DomainAuction.updated_at))
)
last_updated = last_update_result.scalar() or datetime.utcnow()
return AuctionSearchResponse(
auctions=listings,
total=total,
platforms_searched=platforms or ["No data yet - scrape pending"],
last_updated=last_updated,
data_source="scraped from public auction sites",
)
@router.get("/ending-soon", response_model=List[AuctionListing])
async def get_ending_soon(
hours: int = Query(1, ge=1, le=24, description="Hours until end"),
limit: int = Query(10, le=50),
current_user: Optional[User] = Depends(get_current_user_optional),
db: AsyncSession = Depends(get_db),
):
"""
Get auctions ending soon - best opportunities for sniping.
Data is scraped from public auction sites - no mock data.
"""
cutoff = datetime.utcnow() + timedelta(hours=hours)
query = (
select(DomainAuction)
.where(
and_(
DomainAuction.is_active == True,
DomainAuction.end_time <= cutoff,
DomainAuction.end_time > datetime.utcnow(),
)
)
.order_by(DomainAuction.end_time.asc())
.limit(limit)
)
result = await db.execute(query)
auctions = list(result.scalars().all())
listings = []
for auction in auctions:
listing = await _convert_to_listing(auction, db, include_valuation=True)
listings.append(listing)
return listings
@router.get("/hot", response_model=List[AuctionListing])
async def get_hot_auctions(
limit: int = Query(10, le=50),
current_user: Optional[User] = Depends(get_current_user_optional),
db: AsyncSession = Depends(get_db),
):
"""
Get hottest auctions by bidding activity.
Data is scraped from public auction sites - no mock data.
"""
query = (
select(DomainAuction)
.where(DomainAuction.is_active == True)
.order_by(DomainAuction.num_bids.desc())
.limit(limit)
)
result = await db.execute(query)
auctions = list(result.scalars().all())
listings = []
for auction in auctions:
listing = await _convert_to_listing(auction, db, include_valuation=True)
listings.append(listing)
return listings
@router.get("/stats", response_model=List[PlatformStats])
async def get_platform_stats(
current_user: Optional[User] = Depends(get_current_user_optional),
db: AsyncSession = Depends(get_db),
):
"""
Get statistics for each auction platform.
Data is scraped from public auction sites - no mock data.
"""
# Get stats per platform
stats_query = (
select(
DomainAuction.platform,
func.count(DomainAuction.id).label("count"),
func.avg(DomainAuction.current_bid).label("avg_bid"),
)
.where(DomainAuction.is_active == True)
.group_by(DomainAuction.platform)
)
result = await db.execute(stats_query)
platform_data = result.all()
# Get ending soon counts
cutoff = datetime.utcnow() + timedelta(hours=1)
ending_query = (
select(
DomainAuction.platform,
func.count(DomainAuction.id).label("ending_count"),
)
.where(
and_(
DomainAuction.is_active == True,
DomainAuction.end_time <= cutoff,
)
)
.group_by(DomainAuction.platform)
)
ending_result = await db.execute(ending_query)
ending_data = {p: c for p, c in ending_result.all()}
stats = []
for platform, count, avg_bid in platform_data:
stats.append(PlatformStats(
platform=platform,
active_auctions=count,
avg_bid=round(avg_bid or 0, 2),
ending_soon=ending_data.get(platform, 0),
))
return sorted(stats, key=lambda x: x.active_auctions, reverse=True)
@router.get("/scrape-status", response_model=ScrapeStatus)
async def get_scrape_status(
current_user: Optional[User] = Depends(get_current_user_optional),
db: AsyncSession = Depends(get_db),
):
"""Get status of auction scraping."""
# Get last successful scrape
last_scrape_query = (
select(AuctionScrapeLog)
.where(AuctionScrapeLog.status == "success")
.order_by(AuctionScrapeLog.completed_at.desc())
.limit(1)
)
result = await db.execute(last_scrape_query)
last_log = result.scalar_one_or_none()
# Get total auctions
total_query = select(func.count(DomainAuction.id)).where(DomainAuction.is_active == True)
total_result = await db.execute(total_query)
total = total_result.scalar() or 0
# Get platforms
platforms_result = await db.execute(
select(DomainAuction.platform).distinct()
)
platforms = [p for (p,) in platforms_result.all()]
return ScrapeStatus(
last_scrape=last_log.completed_at if last_log else None,
total_auctions=total,
platforms=platforms or ["Pending initial scrape"],
next_scrape=datetime.utcnow() + timedelta(hours=1), # Approximation
)
@router.post("/trigger-scrape")
async def trigger_scrape(
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""
    Manually trigger auction scraping (requires authentication; intended to be admin-only).
    In production, this runs automatically every hour.
"""
try:
result = await auction_scraper.scrape_all_platforms(db)
return {
"status": "success",
"message": "Scraping completed",
"result": result,
}
except Exception as e:
logger.error(f"Manual scrape failed: {e}")
raise HTTPException(status_code=500, detail=f"Scrape failed: {str(e)}")
@router.post("/seed")
async def seed_auctions(
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""
Seed the database with realistic sample auction data.
Useful for development and demo purposes.
"""
try:
result = await auction_scraper.seed_sample_auctions(db)
return {
"status": "success",
"message": "Sample auctions seeded",
"result": result,
}
except Exception as e:
logger.error(f"Seeding failed: {e}")
raise HTTPException(status_code=500, detail=f"Seeding failed: {str(e)}")
@router.get("/opportunities")
async def get_smart_opportunities(
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""
Smart Pounce Algorithm - Find the best auction opportunities.
Analyzes auction data to find sweet spots:
- Auctions ending soon (snipe potential)
- Low bid counts (overlooked gems)
- Good price points
Opportunity Score = time_urgency × competition_factor × price_factor
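    Worked example (illustrative numbers): an auction with ~2 hours left
    (time 3.0), 4 bids (competition 2.0) and an $80 current bid (price 2.0)
    scores 3.0 × 2.0 × 2.0 = 12.0 and lands in the "Hot" bucket.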
"""
# Get active auctions
query = (
select(DomainAuction)
.where(DomainAuction.is_active == True)
.order_by(DomainAuction.end_time.asc())
.limit(100)
)
result = await db.execute(query)
auctions = list(result.scalars().all())
if not auctions:
return {
"opportunities": [],
"message": "No active auctions found.",
"strategy_tips": [
"🔄 Check back soon for new auctions",
"⏰ Best opportunities often appear as auctions near their end",
],
"generated_at": datetime.utcnow().isoformat(),
}
opportunities = []
for auction in auctions:
hours_left = (auction.end_time - datetime.utcnow()).total_seconds() / 3600
# Skip auctions that have ended or are too far out
if hours_left <= 0 or hours_left > 72:
continue
# Time urgency: Higher score for auctions ending soon
if hours_left < 1:
time_score = 5.0
urgency = "Ending in minutes!"
elif hours_left < 4:
time_score = 3.0
urgency = "Ending very soon"
elif hours_left < 12:
time_score = 2.0
urgency = "Ending today"
elif hours_left < 24:
time_score = 1.5
urgency = "Ending tomorrow"
else:
time_score = 1.0
urgency = "Active"
# Competition factor: Lower bids = better opportunity
if auction.num_bids < 3:
competition_score = 3.0
competition = "Almost no competition"
elif auction.num_bids < 10:
competition_score = 2.0
competition = "Low competition"
elif auction.num_bids < 20:
competition_score = 1.2
competition = "Moderate competition"
else:
competition_score = 0.8
competition = "High competition"
# Price factor: Reasonable price points are opportunities
if auction.current_bid < 100:
price_score = 2.0
price_range = "Budget-friendly"
elif auction.current_bid < 500:
price_score = 1.5
price_range = "Mid-range"
elif auction.current_bid < 2000:
price_score = 1.2
price_range = "Premium"
else:
price_score = 1.0
price_range = "High-value"
# Calculate overall opportunity score
opportunity_score = round(time_score * competition_score * price_score, 1)
        # Only include if the score is interesting (>= 3)
if opportunity_score < 3:
continue
listing = await _convert_to_listing(auction, db, include_valuation=False)
recommendation = (
"🔥 Hot" if opportunity_score >= 10 else
"⚡ Great" if opportunity_score >= 6 else
"👀 Watch"
)
opportunities.append({
"auction": listing.model_dump(),
"analysis": {
"opportunity_score": opportunity_score,
"time_score": time_score,
"competition_score": competition_score,
"price_score": price_score,
"urgency": urgency,
"competition": competition,
"price_range": price_range,
"recommendation": recommendation,
"reasoning": f"{urgency}{competition}{price_range}",
}
})
# Sort by opportunity score
opportunities.sort(key=lambda x: x["analysis"]["opportunity_score"], reverse=True)
return {
"opportunities": opportunities[:15],
"strategy_tips": [
"⏰ Auctions ending soon have snipe potential",
"📉 Low bid count = overlooked opportunities",
"💡 Set a max budget and stick to it",
],
"generated_at": datetime.utcnow().isoformat(),
}
def _get_opportunity_reasoning(value_ratio: float, hours_left: float, num_bids: int, score: float) -> str:
"""Generate human-readable reasoning for the opportunity."""
reasons = []
if value_ratio > 2:
reasons.append(f"Significantly undervalued ({value_ratio:.1f}× estimated value)")
elif value_ratio > 1:
reasons.append(f"Undervalued ({value_ratio:.1f}× estimated value)")
else:
reasons.append(f"Current bid exceeds our estimate ({value_ratio:.2f}×)")
if hours_left < 1:
reasons.append("⚡ Ending very soon - final chance to bid")
elif hours_left < 4:
reasons.append("⏰ Ending soon - limited time remaining")
if num_bids < 5:
reasons.append("📉 Very low competition - potential overlooked opportunity")
elif num_bids < 10:
reasons.append("📊 Moderate competition")
else:
reasons.append(f"🔥 High demand ({num_bids} bids)")
return " | ".join(reasons)
def _calculate_pounce_score_v2(domain: str, tld: str, num_bids: int = 0, age_years: int = 0, is_pounce: bool = False) -> int:
"""
Pounce Score v2.0 - Enhanced scoring algorithm.
Factors:
- Length (shorter = more valuable)
- TLD premium
- Market activity (bids)
- Age bonus
- Pounce Direct bonus (verified listings)
- Penalties (hyphens, numbers, etc.)
"""
score = 50 # Baseline
name = domain.rsplit('.', 1)[0] if '.' in domain else domain
# A) LENGTH BONUS (exponential for short domains)
length_scores = {1: 50, 2: 45, 3: 40, 4: 30, 5: 20, 6: 15, 7: 10}
score += length_scores.get(len(name), max(0, 15 - len(name)))
# B) TLD PREMIUM
tld_scores = {
'com': 20, 'ai': 25, 'io': 18, 'co': 12,
'ch': 15, 'de': 10, 'net': 8, 'org': 8,
'app': 10, 'dev': 10, 'xyz': 5
}
score += tld_scores.get(tld.lower(), 0)
# C) MARKET ACTIVITY (bids = demand signal)
if num_bids >= 20:
score += 15
elif num_bids >= 10:
score += 10
elif num_bids >= 5:
score += 5
elif num_bids >= 2:
score += 2
# D) AGE BONUS (established domains)
if age_years and age_years > 15:
score += 10
elif age_years and age_years > 10:
score += 7
elif age_years and age_years > 5:
score += 3
# E) POUNCE DIRECT BONUS (verified = trustworthy)
if is_pounce:
score += 10
# F) PENALTIES
if '-' in name:
score -= 25
if any(c.isdigit() for c in name) and len(name) > 3:
score -= 20
if len(name) > 15:
score -= 15
# G) CONSONANT CHECK (no gibberish like "xkqzfgh")
consonants = 'bcdfghjklmnpqrstvwxyz'
max_streak = 0
current_streak = 0
for c in name.lower():
if c in consonants:
current_streak += 1
max_streak = max(max_streak, current_streak)
else:
current_streak = 0
if max_streak > 4:
score -= 15
return max(0, min(100, score))
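# Worked examples (sample domains are illustrative; arithmetic follows the rules above):
#   _calculate_pounce_score_v2("deal.com", "com", num_bids=12, age_years=18)
#       -> 50 + 30 (4 chars) + 20 (.com) + 10 (bids) + 10 (age) = 120, capped at 100
#   _calculate_pounce_score_v2("getpounce.io", "io")
#       -> 50 + 6 (9 chars) + 18 (.io) = 74
#   _calculate_pounce_score_v2("my-shop24.xyz", "xyz")
#       -> 50 + 6 + 5 (.xyz) - 25 (hyphen) - 20 (digits) = 16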
def _is_premium_domain(domain_name: str) -> bool:
"""Check if a domain looks premium/professional (Vanity Filter)."""
parts = domain_name.rsplit('.', 1)
name = parts[0] if parts else domain_name
tld = parts[1].lower() if len(parts) > 1 else ""
# Premium TLDs only
premium_tlds = ['com', 'io', 'ai', 'co', 'de', 'ch', 'net', 'org', 'app', 'dev', 'xyz']
if tld and tld not in premium_tlds:
return False
# Length check
if len(name) > 15:
return False
if len(name) < 3:
return False
# Hyphen check
if name.count('-') > 1:
return False
# Digit check
if sum(1 for c in name if c.isdigit()) > 2:
return False
# Consonant cluster check
consonants = 'bcdfghjklmnpqrstvwxyz'
max_streak = 0
current_streak = 0
for c in name.lower():
if c in consonants:
current_streak += 1
max_streak = max(max_streak, current_streak)
else:
current_streak = 0
if max_streak > 4:
return False
return True
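# Examples (sample domains are illustrative):
#   _is_premium_domain("pounce.io")             -> True  (premium TLD, short, clean)
#   _is_premium_domain("cheap-hosting.website") -> False (.website is not a premium TLD)
#   _is_premium_domain("x2.io")                 -> False (name shorter than 3 characters)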
# ============== UNIFIED MARKET FEED ==============
@router.get("/feed", response_model=MarketFeedResponse)
async def get_market_feed(
# Source filter
source: str = Query("all", enum=["all", "pounce", "external"]),
# Search & filters
keyword: Optional[str] = Query(None, description="Search in domain names"),
tld: Optional[str] = Query(None, description="Filter by TLD"),
min_price: Optional[float] = Query(None, ge=0),
max_price: Optional[float] = Query(None, ge=0),
min_score: int = Query(0, ge=0, le=100),
ending_within: Optional[int] = Query(None, description="Auctions ending within X hours"),
verified_only: bool = Query(False, description="Only show verified Pounce listings"),
# Sort
sort_by: str = Query("score", enum=["score", "price_asc", "price_desc", "time", "newest"]),
# Pagination
limit: int = Query(50, le=200),
offset: int = Query(0, ge=0),
# Auth
current_user: Optional[User] = Depends(get_current_user_optional),
db: AsyncSession = Depends(get_db),
):
"""
🚀 UNIFIED MARKET FEED — The heart of Pounce
Combines:
- 💎 Pounce Direct: DNS-verified user listings (instant buy)
- 🏢 External Auctions: Scraped from GoDaddy, Sedo, NameJet, etc.
For non-authenticated users:
- Vanity filter applied (premium domains only)
- Pounce Score visible but limited details
For authenticated users (Trader/Tycoon):
- Full access to all domains
- Advanced filtering
- Valuation data
POUNCE EXCLUSIVE domains are highlighted and appear first.
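    Example request (parameter values are illustrative):
        GET /auctions/feed?source=all&tld=io&min_score=70&sort_by=score&limit=20
    returns the highest-scoring .io listings and auctions, with Pounce Direct
    surfaced ahead of external auctions at equal scores.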
"""
items: List[MarketFeedItem] = []
pounce_count = 0
auction_count = 0
# ═══════════════════════════════════════════════════════════════
# 1. POUNCE DIRECT LISTINGS (Our USP!)
# ═══════════════════════════════════════════════════════════════
if source in ["all", "pounce"]:
listing_query = select(DomainListing).where(
DomainListing.status == ListingStatus.ACTIVE.value
)
if keyword:
listing_query = listing_query.where(
DomainListing.domain.ilike(f"%{keyword}%")
)
if verified_only:
listing_query = listing_query.where(
DomainListing.verification_status == VerificationStatus.VERIFIED.value
)
if min_price is not None:
listing_query = listing_query.where(DomainListing.asking_price >= min_price)
if max_price is not None:
listing_query = listing_query.where(DomainListing.asking_price <= max_price)
result = await db.execute(listing_query)
listings = result.scalars().all()
for listing in listings:
domain_tld = listing.domain.rsplit('.', 1)[1] if '.' in listing.domain else ""
# Apply TLD filter
if tld and domain_tld.lower() != tld.lower().lstrip('.'):
continue
pounce_score = listing.pounce_score or _calculate_pounce_score_v2(
listing.domain, domain_tld, is_pounce=True
)
# Apply score filter
if pounce_score < min_score:
continue
items.append(MarketFeedItem(
id=f"pounce-{listing.id}",
domain=listing.domain,
tld=domain_tld,
price=listing.asking_price or 0,
currency=listing.currency or "USD",
price_type="fixed" if listing.price_type == "fixed" else "negotiable",
status="instant",
source="Pounce",
is_pounce=True,
verified=listing.is_verified,
seller_verified=listing.is_verified,
slug=listing.slug,
url=f"/buy/{listing.slug}",
is_external=False,
pounce_score=pounce_score,
))
pounce_count += 1
# ═══════════════════════════════════════════════════════════════
# 2. EXTERNAL AUCTIONS (Scraped from platforms)
# ═══════════════════════════════════════════════════════════════
if source in ["all", "external"]:
auction_query = select(DomainAuction).where(DomainAuction.is_active == True)
if keyword:
auction_query = auction_query.where(
DomainAuction.domain.ilike(f"%{keyword}%")
)
if tld:
auction_query = auction_query.where(
DomainAuction.tld == tld.lower().lstrip('.')
)
if min_price is not None:
auction_query = auction_query.where(DomainAuction.current_bid >= min_price)
if max_price is not None:
auction_query = auction_query.where(DomainAuction.current_bid <= max_price)
if ending_within:
cutoff = datetime.utcnow() + timedelta(hours=ending_within)
auction_query = auction_query.where(DomainAuction.end_time <= cutoff)
result = await db.execute(auction_query)
auctions = result.scalars().all()
for auction in auctions:
# Apply vanity filter for non-authenticated users
if current_user is None and not _is_premium_domain(auction.domain):
continue
pounce_score = _calculate_pounce_score_v2(
auction.domain,
auction.tld,
num_bids=auction.num_bids,
age_years=auction.age_years or 0,
is_pounce=False
)
# Apply score filter
if pounce_score < min_score:
continue
items.append(MarketFeedItem(
id=f"auction-{auction.id}",
domain=auction.domain,
tld=auction.tld,
price=auction.current_bid,
currency=auction.currency,
price_type="bid",
status="auction",
source=auction.platform,
is_pounce=False,
verified=False,
time_remaining=_format_time_remaining(auction.end_time),
end_time=auction.end_time,
num_bids=auction.num_bids,
url=_get_affiliate_url(auction.platform, auction.domain, auction.auction_url),
is_external=True,
pounce_score=pounce_score,
))
auction_count += 1
# ═══════════════════════════════════════════════════════════════
# 3. SORT (Pounce Direct always appears first within same score)
# ═══════════════════════════════════════════════════════════════
if sort_by == "score":
items.sort(key=lambda x: (-x.pounce_score, -int(x.is_pounce), x.domain))
elif sort_by == "price_asc":
items.sort(key=lambda x: (x.price, -int(x.is_pounce), x.domain))
elif sort_by == "price_desc":
items.sort(key=lambda x: (-x.price, -int(x.is_pounce), x.domain))
elif sort_by == "time":
# Pounce Direct first (no time limit), then by end time
def time_sort_key(x):
if x.is_pounce:
return (0, datetime.max)
return (1, x.end_time or datetime.max)
items.sort(key=time_sort_key)
elif sort_by == "newest":
items.sort(key=lambda x: (-int(x.is_pounce), x.domain))
total = len(items)
# Pagination
items = items[offset:offset + limit]
    # Unique sources present on the current (paginated) page
sources = list(set(item.source for item in items))
# Last update time
last_update_result = await db.execute(
select(func.max(DomainAuction.updated_at))
)
last_updated = last_update_result.scalar() or datetime.utcnow()
return MarketFeedResponse(
items=items,
total=total,
pounce_direct_count=pounce_count,
auction_count=auction_count,
sources=sources,
last_updated=last_updated,
filters_applied={
"source": source,
"keyword": keyword,
"tld": tld,
"min_price": min_price,
"max_price": max_price,
"min_score": min_score,
"ending_within": ending_within,
"verified_only": verified_only,
"sort_by": sort_by,
}
)