pounce/backend/app/api/auctions.py
yves.gugger cff0ba0984 feat: Add Admin Panel enhancements, Blog system, and OAuth
Admin Panel:
- User Detail Modal with full profile info
- Bulk tier upgrade for multiple users
- User export to CSV
- Price Alerts overview tab
- Domain Health Check trigger
- Email Test functionality
- Scheduler Status with job info and last runs
- Activity Log for admin actions
- Blog management tab with CRUD

Blog System:
- BlogPost model with full content management
- Public API: list, featured, categories, single post
- Admin API: create, update, delete, publish/unpublish
- Frontend blog listing page with categories
- Frontend blog detail page with styling
- View count tracking

OAuth:
- Google OAuth integration
- GitHub OAuth integration
- OAuth callback handling
- Provider selection on login/register

Other improvements:
- Domain checker with check_all_domains function
- Admin activity logging
- Breadcrumbs component
- Toast notification component
- Various UI/UX improvements
2025-12-09 16:52:54 +01:00

662 lines
22 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Smart Pounce - Domain Auction Aggregator
This module provides auction data from our database of scraped listings.
Data is scraped from public auction platforms - NO APIs used.
Data Sources (Web Scraping):
- ExpiredDomains.net (aggregator)
- GoDaddy Auctions (public listings)
- Sedo (public search)
- NameJet (public auctions)
IMPORTANT:
- All data comes from web scraping of public pages
- No mock data - everything is real scraped data
- Data is cached in PostgreSQL/SQLite for performance
- Scraper runs on schedule (see scheduler.py)
Legal Note (Switzerland):
- No escrow/payment handling = no GwG/FINMA requirements
- Users click through to external platforms
- We only provide market intelligence
"""
import logging
from datetime import datetime, timedelta
from typing import Optional, List
from fastapi import APIRouter, Depends, Query, HTTPException
from pydantic import BaseModel
from sqlalchemy import select, func, and_
from sqlalchemy.ext.asyncio import AsyncSession
from app.database import get_db
from app.api.deps import get_current_user, get_current_user_optional
from app.models.user import User
from app.models.auction import DomainAuction, AuctionScrapeLog
from app.services.valuation import valuation_service
from app.services.auction_scraper import auction_scraper
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# Router that the endpoint decorators in this module register their routes on.
router = APIRouter()
# ============== Schemas ==============
class AuctionValuation(BaseModel):
    """Valuation details for an auction.

    Built by ``_convert_to_listing`` from the valuation service result and the
    auction's current bid.
    """
    # Estimated value as reported by the valuation service ("estimated_value").
    estimated_value: float
    # estimated_value / current_bid rounded to 2 decimals; 99 when the bid is not positive.
    value_ratio: float
    # estimated_value - current_bid, rounded to 2 decimals (may be negative).
    potential_profit: float
    # Confidence label from the valuation result; "medium" when the result has none.
    confidence: str
    # Formula string from the result's "calculation" section; "N/A" when absent.
    valuation_formula: str
class AuctionListing(BaseModel):
    """A domain auction listing from the database.

    API representation of one ``DomainAuction`` row, optionally enriched with
    an ``AuctionValuation``.
    """
    domain: str
    # Name of the auction platform the listing was scraped from.
    platform: str
    # Scraped auction URL, or an empty string when none was stored.
    platform_url: str
    current_bid: float
    currency: str
    num_bids: int
    end_time: datetime
    # Human-readable countdown produced by _format_time_remaining
    # (e.g. "2d 3h", "5h 12m", "40m" or "Ended").
    time_remaining: str
    buy_now_price: Optional[float] = None
    reserve_met: Optional[bool] = None
    traffic: Optional[int] = None
    age_years: Optional[int] = None
    tld: str
    # Click-through link produced by _get_affiliate_url.
    affiliate_url: str
    # None when valuation was not requested or could not be computed.
    valuation: Optional[AuctionValuation] = None

    class Config:
        # Pydantic setting: allow building the model from an object's attributes.
        from_attributes = True
class AuctionSearchResponse(BaseModel):
    """Response for auction search."""
    # The requested page of listings.
    auctions: List[AuctionListing]
    # Number of auctions matching the filters, counted before pagination.
    total: int
    # Distinct platform names present in the auctions table, or a placeholder
    # message when the table is empty.
    platforms_searched: List[str]
    # Latest DomainAuction.updated_at, or the current UTC time when there is no data.
    last_updated: datetime
    # Free-text provenance label; search_auctions passes its own value for it.
    data_source: str = "scraped"
    # Static explanation of how the estimated values are derived.
    valuation_note: str = (
        "Values are estimated using our algorithm: "
        "$50 × Length × TLD × Keyword × Brand factors. "
        "See /portfolio/valuation/{domain} for detailed breakdown."
    )
class PlatformStats(BaseModel):
    """Statistics for an auction platform."""
    platform: str
    # Number of auctions flagged active for this platform.
    active_auctions: int
    # Average current bid across those auctions, rounded to 2 decimals.
    avg_bid: float
    # Number of active auctions whose end time falls within the next hour.
    ending_soon: int
class ScrapeStatus(BaseModel):
    """Status of auction scraping."""
    # Completion time of the most recent successful scrape log; None if there is none.
    last_scrape: Optional[datetime]
    # Number of auctions currently flagged active.
    total_auctions: int
    # Distinct platform names present in the auctions table, or a placeholder message.
    platforms: List[str]
    # Approximate time of the next scrape (get_scrape_status reports now + 1 hour).
    next_scrape: Optional[datetime]
# ============== Helper Functions ==============
def _format_time_remaining(end_time: datetime) -> str:
    """Format the time left until ``end_time`` as a short human-readable string.

    Args:
        end_time: When the auction ends. Naive values are compared against
            ``datetime.utcnow()`` (i.e. treated as UTC); timezone-aware values
            are compared against the current time in their own timezone.

    Returns:
        ``"Ended"`` when the end time is now or in the past, otherwise
        ``"<d>d <h>h"`` for one day or more, ``"<h>h <m>m"`` for one hour or
        more, and ``"<m>m"`` for less than an hour.
    """
    # Subtracting a naive "now" from an aware end_time raises TypeError, so
    # pick a "now" whose awareness matches the input.
    if end_time.tzinfo is None:
        now = datetime.utcnow()
    else:
        now = datetime.now(end_time.tzinfo)

    seconds_left = (end_time - now).total_seconds()
    if seconds_left <= 0:
        return "Ended"

    hours, minutes = divmod(int(seconds_left // 60), 60)
    # ">=" so that exactly one day reads "1d 0h" rather than "24h 0m".
    if hours >= 24:
        days, leftover_hours = divmod(hours, 24)
        return f"{days}d {leftover_hours}h"
    if hours > 0:
        return f"{hours}h {minutes}m"
    return f"{minutes}m"
def _get_affiliate_url(platform: str, domain: str, auction_url: Optional[str]) -> str:
    """Return the click-through URL for an auction.

    Args:
        platform: Platform name as stored on the auction (e.g. ``"Sedo"``).
        domain: Domain name of the auction. It comes from scraped pages, so it
            is percent-encoded before being placed into a fallback URL.
        auction_url: Scraped auction URL; may be empty or ``None``.

    Returns:
        ``auction_url`` unchanged when it is an absolute http(s) URL, otherwise
        a platform-specific search/listing URL for ``domain``, or a Google
        search URL when the platform is not known.
    """
    # Local import keeps this helper self-contained without touching the
    # module's import block.
    from urllib.parse import quote

    # Use the scraped auction URL directly if it is a real http(s) link.
    if auction_url and auction_url.startswith(("http://", "https://")):
        return auction_url

    # Ordinary domain names pass through quote() unchanged; anything unusual in
    # scraped data ("&", "/", spaces, ...) can no longer alter the URL structure.
    safe_domain = quote(domain, safe="")

    # Fallback to platform-specific search/listing pages.
    platform_urls = {
        "GoDaddy": f"https://auctions.godaddy.com/trpItemListing.aspx?domain={safe_domain}",
        "Sedo": f"https://sedo.com/search/?keyword={safe_domain}",
        "NameJet": f"https://www.namejet.com/Pages/Auctions/BackorderSearch.aspx?q={safe_domain}",
        "DropCatch": f"https://www.dropcatch.com/domain/{safe_domain}",
        "ExpiredDomains": f"https://www.expireddomains.net/domain-name-search/?q={safe_domain}",
        "Afternic": f"https://www.afternic.com/search?k={safe_domain}",
        "Dynadot": f"https://www.dynadot.com/market/auction/{safe_domain}",
        "Porkbun": f"https://porkbun.com/checkout/search?q={safe_domain}",
    }
    generic_search = f"https://www.google.com/search?q={safe_domain}+domain+auction"
    return platform_urls.get(platform, generic_search)
async def _convert_to_listing(
    auction: DomainAuction,
    db: AsyncSession,
    include_valuation: bool = True
) -> AuctionListing:
    """Build the API representation of a stored auction.

    Args:
        auction: The database row to convert.
        db: Session handed to the valuation service.
        include_valuation: When true, ask the valuation service for an
            estimate and attach it to the listing.

    Returns:
        An ``AuctionListing``. Its ``valuation`` stays ``None`` when valuation
        was not requested, the service reported an error, or the attempt
        raised (the failure is logged and otherwise ignored).
    """
    valuation_data = None

    if include_valuation:
        # Best effort: a failed valuation must never prevent the listing
        # itself from being returned.
        try:
            estimate = await valuation_service.estimate_value(auction.domain, db, save_result=False)
            if "error" not in estimate:
                worth = estimate["estimated_value"]
                if auction.current_bid > 0:
                    ratio = round(worth / auction.current_bid, 2)
                else:
                    # No positive bid to divide by: use the fixed placeholder ratio.
                    ratio = 99
                valuation_data = AuctionValuation(
                    estimated_value=worth,
                    value_ratio=ratio,
                    potential_profit=round(worth - auction.current_bid, 2),
                    confidence=estimate.get("confidence", "medium"),
                    valuation_formula=estimate.get("calculation", {}).get("formula", "N/A"),
                )
        except Exception as e:
            logger.error(f"Valuation error for {auction.domain}: {e}")

    fields = {
        "domain": auction.domain,
        "platform": auction.platform,
        "platform_url": auction.auction_url or "",
        "current_bid": auction.current_bid,
        "currency": auction.currency,
        "num_bids": auction.num_bids,
        "end_time": auction.end_time,
        "time_remaining": _format_time_remaining(auction.end_time),
        "buy_now_price": auction.buy_now_price,
        "reserve_met": auction.reserve_met,
        "traffic": auction.traffic,
        "age_years": auction.age_years,
        "tld": auction.tld,
        "affiliate_url": _get_affiliate_url(auction.platform, auction.domain, auction.auction_url),
        "valuation": valuation_data,
    }
    return AuctionListing(**fields)
# ============== Endpoints ==============
@router.get("", response_model=AuctionSearchResponse)
async def search_auctions(
    keyword: Optional[str] = Query(None, description="Search keyword in domain names"),
    tld: Optional[str] = Query(None, description="Filter by TLD (e.g., 'com', 'io')"),
    platform: Optional[str] = Query(None, description="Filter by platform"),
    min_bid: Optional[float] = Query(None, ge=0, description="Minimum current bid"),
    max_bid: Optional[float] = Query(None, ge=0, description="Maximum current bid"),
    ending_soon: bool = Query(False, description="Only show auctions ending in < 1 hour"),
    sort_by: str = Query("ending", enum=["ending", "bid_asc", "bid_desc", "bids", "value_ratio"]),
    limit: int = Query(20, ge=1, le=100),
    offset: int = Query(0, ge=0),
    current_user: Optional[User] = Depends(get_current_user_optional),
    db: AsyncSession = Depends(get_db),
):
    """
    Search domain auctions from our scraped database.

    All data comes from web scraping of public auction pages.
    NO mock data - everything is real scraped data.

    Data Sources:
    - ExpiredDomains.net (aggregator)
    - GoDaddy Auctions (coming soon)
    - Sedo (coming soon)
    - NameJet (coming soon)

    Smart Pounce Strategy:
    - Look for value_ratio > 1.0 (estimated value exceeds current bid)
    - Focus on auctions ending soon with low bid counts

    Filters are combined with AND. ``keyword`` is matched literally and
    case-insensitively anywhere in the domain name. ``total`` is the number of
    matching auctions before pagination.

    Note on ``sort_by="value_ratio"``: valuations are computed per listing
    after the page has been fetched, so the database page is ordered by end
    time and only the returned page is re-sorted by value ratio.
    """
    now = datetime.utcnow()

    # Build query
    query = select(DomainAuction).where(DomainAuction.is_active == True)  # noqa: E712
    if keyword:
        # Escape LIKE wildcards so "%" / "_" typed by the user match literally
        # instead of acting as pattern characters.
        escaped = (
            keyword.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
        )
        query = query.where(DomainAuction.domain.ilike(f"%{escaped}%", escape="\\"))
    if tld:
        query = query.where(DomainAuction.tld == tld.lower().lstrip("."))
    if platform:
        query = query.where(DomainAuction.platform == platform)
    if min_bid is not None:
        query = query.where(DomainAuction.current_bid >= min_bid)
    if max_bid is not None:
        query = query.where(DomainAuction.current_bid <= max_bid)
    if ending_soon:
        # Same window as /ending-soon: still running, and ending within the
        # hour. Without the lower bound, auctions that already ended but are
        # still flagged active would be reported as "ending soon".
        query = query.where(
            and_(
                DomainAuction.end_time > now,
                DomainAuction.end_time <= now + timedelta(hours=1),
            )
        )

    # Count total (before sorting and pagination)
    count_query = select(func.count()).select_from(query.subquery())
    total = (await db.execute(count_query)).scalar() or 0

    # Sort. "value_ratio" has no column to sort on, so it falls back to end
    # time here and is re-sorted below once valuations exist.
    orderings = {
        "ending": DomainAuction.end_time.asc(),
        "bid_asc": DomainAuction.current_bid.asc(),
        "bid_desc": DomainAuction.current_bid.desc(),
        "bids": DomainAuction.num_bids.desc(),
    }
    query = query.order_by(orderings.get(sort_by, DomainAuction.end_time.asc()))

    # Pagination
    query = query.offset(offset).limit(limit)
    result = await db.execute(query)
    auctions = result.scalars().all()

    # Convert to response with valuations
    listings = [
        await _convert_to_listing(auction, db, include_valuation=True)
        for auction in auctions
    ]

    # Sort by value_ratio if requested (after valuation); listings without a
    # valuation sort last.
    if sort_by == "value_ratio":
        listings.sort(
            key=lambda x: x.valuation.value_ratio if x.valuation else 0,
            reverse=True,
        )

    # Get platforms searched
    platforms_result = await db.execute(
        select(DomainAuction.platform).distinct()
    )
    platforms = [p for (p,) in platforms_result.all()]

    # Get last update time
    last_update_result = await db.execute(
        select(func.max(DomainAuction.updated_at))
    )
    last_updated = last_update_result.scalar() or datetime.utcnow()

    return AuctionSearchResponse(
        auctions=listings,
        total=total,
        platforms_searched=platforms or ["No data yet - scrape pending"],
        last_updated=last_updated,
        data_source="scraped from public auction sites",
    )
@router.get("/ending-soon", response_model=List[AuctionListing])
async def get_ending_soon(
    hours: int = Query(1, ge=1, le=24, description="Hours until end"),
    limit: int = Query(10, ge=1, le=50),
    current_user: Optional[User] = Depends(get_current_user_optional),
    db: AsyncSession = Depends(get_db),
):
    """
    Get auctions ending soon - best opportunities for sniping.

    Data is scraped from public auction sites - no mock data.

    Returns up to ``limit`` active auctions that are still running and end
    within the next ``hours`` hours, soonest first, each with a valuation
    when one could be computed.
    """
    # Take "now" once so both edges of the window use the same instant.
    now = datetime.utcnow()
    cutoff = now + timedelta(hours=hours)

    query = (
        select(DomainAuction)
        .where(
            and_(
                DomainAuction.is_active == True,  # noqa: E712
                DomainAuction.end_time <= cutoff,
                DomainAuction.end_time > now,
            )
        )
        .order_by(DomainAuction.end_time.asc())
        .limit(limit)
    )
    result = await db.execute(query)

    return [
        await _convert_to_listing(auction, db, include_valuation=True)
        for auction in result.scalars().all()
    ]
@router.get("/hot", response_model=List[AuctionListing])
async def get_hot_auctions(
    limit: int = Query(10, ge=1, le=50),
    current_user: Optional[User] = Depends(get_current_user_optional),
    db: AsyncSession = Depends(get_db),
):
    """
    Get hottest auctions by bidding activity.

    Data is scraped from public auction sites - no mock data.

    Returns up to ``limit`` active auctions ordered by number of bids
    (highest first), each with a valuation when one could be computed.
    """
    # limit is validated as >= 1: a zero/negative LIMIT is either an error or
    # "no limit" depending on the database backend.
    query = (
        select(DomainAuction)
        .where(DomainAuction.is_active == True)  # noqa: E712
        .order_by(DomainAuction.num_bids.desc())
        .limit(limit)
    )
    result = await db.execute(query)

    return [
        await _convert_to_listing(auction, db, include_valuation=True)
        for auction in result.scalars().all()
    ]
@router.get("/stats", response_model=List[PlatformStats])
async def get_platform_stats(
    current_user: Optional[User] = Depends(get_current_user_optional),
    db: AsyncSession = Depends(get_db),
):
    """
    Get statistics for each auction platform.

    Data is scraped from public auction sites - no mock data.

    Returns one entry per platform that has active auctions, sorted by number
    of active auctions (highest first). ``ending_soon`` counts auctions that
    are still running and end within the next hour.
    """
    # Get stats per platform
    stats_query = (
        select(
            DomainAuction.platform,
            func.count(DomainAuction.id).label("count"),
            func.avg(DomainAuction.current_bid).label("avg_bid"),
        )
        .where(DomainAuction.is_active == True)  # noqa: E712
        .group_by(DomainAuction.platform)
    )
    platform_rows = (await db.execute(stats_query)).all()

    # Get ending soon counts. The lower bound keeps auctions that already
    # ended (but are still flagged active) out of the count, matching the
    # window used by /ending-soon.
    now = datetime.utcnow()
    cutoff = now + timedelta(hours=1)
    ending_query = (
        select(
            DomainAuction.platform,
            func.count(DomainAuction.id).label("ending_count"),
        )
        .where(
            and_(
                DomainAuction.is_active == True,  # noqa: E712
                DomainAuction.end_time > now,
                DomainAuction.end_time <= cutoff,
            )
        )
        .group_by(DomainAuction.platform)
    )
    ending_by_platform = {
        name: ending_count
        for name, ending_count in (await db.execute(ending_query)).all()
    }

    stats = [
        PlatformStats(
            platform=name,
            active_auctions=count,
            # AVG yields NULL/None when there is nothing to average.
            avg_bid=round(avg_bid or 0, 2),
            ending_soon=ending_by_platform.get(name, 0),
        )
        for name, count, avg_bid in platform_rows
    ]
    return sorted(stats, key=lambda x: x.active_auctions, reverse=True)
@router.get("/scrape-status", response_model=ScrapeStatus)
async def get_scrape_status(
    current_user: Optional[User] = Depends(get_current_user_optional),
    db: AsyncSession = Depends(get_db),
):
    """Report when auctions were last scraped and how much data is available.

    ``last_scrape`` is the completion time of the newest scrape log with
    status "success" (``None`` if there is none). ``next_scrape`` is not read
    from the scheduler; it is simply reported as one hour from now.
    """
    # Newest successful scrape log entry, if any.
    latest_log = (
        await db.execute(
            select(AuctionScrapeLog)
            .where(AuctionScrapeLog.status == "success")
            .order_by(AuctionScrapeLog.completed_at.desc())
            .limit(1)
        )
    ).scalar_one_or_none()

    # Number of auctions currently flagged active.
    active_count = (
        await db.execute(
            select(func.count(DomainAuction.id)).where(DomainAuction.is_active == True)
        )
    ).scalar() or 0

    # Every platform name present in the auctions table.
    platform_rows = (
        await db.execute(select(DomainAuction.platform).distinct())
    ).all()
    platform_names = [row[0] for row in platform_rows]

    if latest_log:
        last_completed = latest_log.completed_at
    else:
        last_completed = None

    return ScrapeStatus(
        last_scrape=last_completed,
        total_auctions=active_count,
        platforms=platform_names or ["Pending initial scrape"],
        next_scrape=datetime.utcnow() + timedelta(hours=1),  # Approximation
    )
@router.post("/trigger-scrape")
async def trigger_scrape(
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """
    Manually trigger auction scraping.

    In production, this runs automatically every hour.

    Requires an authenticated user (``get_current_user``).
    NOTE(review): this endpoint was described as "admin only", but no admin
    check is performed here - any authenticated user can start a scrape.
    Confirm the intended policy and enforce it.

    Returns:
        A dict with ``status``, ``message`` and the scraper's ``result``.

    Raises:
        HTTPException: 500 when the scraper raises; the original exception is
            chained as the cause.
    """
    try:
        result = await auction_scraper.scrape_all_platforms(db)
    except Exception as e:
        # logger.exception records the traceback, which a plain error message loses.
        logger.exception("Manual scrape failed: %s", e)
        # NOTE(review): the detail exposes the raw exception text to the client.
        raise HTTPException(status_code=500, detail=f"Scrape failed: {str(e)}") from e

    return {
        "status": "success",
        "message": "Scraping completed",
        "result": result,
    }
@router.post("/seed")
async def seed_auctions(
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """
    Seed the database with realistic sample auction data.

    Useful for development and demo purposes.

    Requires an authenticated user (``get_current_user``).
    NOTE(review): nothing here restricts this to admins or to non-production
    environments, so any authenticated user can insert sample data alongside
    scraped data. Confirm whether that is intended.

    Returns:
        A dict with ``status``, ``message`` and the seeding ``result``.

    Raises:
        HTTPException: 500 when seeding raises; the original exception is
            chained as the cause.
    """
    try:
        result = await auction_scraper.seed_sample_auctions(db)
    except Exception as e:
        # logger.exception records the traceback, which a plain error message loses.
        logger.exception("Seeding failed: %s", e)
        # NOTE(review): the detail exposes the raw exception text to the client.
        raise HTTPException(status_code=500, detail=f"Seeding failed: {str(e)}") from e

    return {
        "status": "success",
        "message": "Sample auctions seeded",
        "result": result,
    }
# Only auctions ending within this many hours are considered opportunities.
_OPPORTUNITY_WINDOW_HOURS = 72

# Scoring tiers as (exclusive upper bound, score, label). The first tier whose
# bound is greater than the value wins; the *_DEFAULT pair applies otherwise.
_TIME_TIERS = (
    (1, 5.0, "Ending in minutes!"),
    (4, 3.0, "Ending very soon"),
    (12, 2.0, "Ending today"),
    (24, 1.5, "Ending tomorrow"),
)
_TIME_DEFAULT = (1.0, "Active")

_COMPETITION_TIERS = (
    (3, 3.0, "Almost no competition"),
    (10, 2.0, "Low competition"),
    (20, 1.2, "Moderate competition"),
)
_COMPETITION_DEFAULT = (0.8, "High competition")

_PRICE_TIERS = (
    (100, 2.0, "Budget-friendly"),
    (500, 1.5, "Mid-range"),
    (2000, 1.2, "Premium"),
)
_PRICE_DEFAULT = (1.0, "High-value")


def _pick_tier(value, tiers, default):
    """Return the (score, label) of the first tier whose upper bound exceeds ``value``."""
    for upper_bound, score, label in tiers:
        if value < upper_bound:
            return score, label
    return default


@router.get("/opportunities")
async def get_smart_opportunities(
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """
    Smart Pounce Algorithm - Find the best auction opportunities.

    Analyzes auction data to find sweet spots:
    - Auctions ending soon (snipe potential)
    - Low bid counts (overlooked gems)
    - Good price points

    Opportunity Score = time_urgency × competition_factor × price_factor

    Only active auctions that are still running and end within the next 72
    hours are considered (at most the 100 soonest-ending). Auctions scoring
    below 3 are dropped, and the 15 best-scoring ones are returned.

    Returns:
        A dict with ``opportunities`` (each holding the ``auction`` listing
        and its ``analysis``), ``strategy_tips`` and ``generated_at``; when no
        candidate auctions exist it also carries a ``message``.
    """
    # One reference instant for the query window and every per-auction score.
    now = datetime.utcnow()
    horizon = now + timedelta(hours=_OPPORTUNITY_WINDOW_HOURS)

    # Apply the time window in the query itself. Filtering only after
    # LIMIT 100 would let already-ended rows that are still flagged active
    # fill the whole page and hide every real opportunity.
    query = (
        select(DomainAuction)
        .where(
            and_(
                DomainAuction.is_active == True,  # noqa: E712
                DomainAuction.end_time > now,
                DomainAuction.end_time <= horizon,
            )
        )
        .order_by(DomainAuction.end_time.asc())
        .limit(100)
    )
    result = await db.execute(query)
    auctions = result.scalars().all()

    if not auctions:
        return {
            "opportunities": [],
            "message": "No active auctions found.",
            "strategy_tips": [
                "🔄 Check back soon for new auctions",
                "⏰ Best opportunities often appear as auctions near their end",
            ],
            "generated_at": datetime.utcnow().isoformat(),
        }

    opportunities = []
    for auction in auctions:
        hours_left = (auction.end_time - now).total_seconds() / 3600
        # Defensive re-check of the window the query already applied.
        if hours_left <= 0 or hours_left > _OPPORTUNITY_WINDOW_HOURS:
            continue

        # Time urgency: higher score for auctions ending soon.
        time_score, urgency = _pick_tier(hours_left, _TIME_TIERS, _TIME_DEFAULT)
        # Competition factor: fewer bids = better opportunity.
        competition_score, competition = _pick_tier(
            auction.num_bids, _COMPETITION_TIERS, _COMPETITION_DEFAULT
        )
        # Price factor: lower price points score higher.
        price_score, price_range = _pick_tier(
            auction.current_bid, _PRICE_TIERS, _PRICE_DEFAULT
        )

        # Calculate overall opportunity score
        opportunity_score = round(time_score * competition_score * price_score, 1)

        # Only include if the score is interesting (>= 3)
        if opportunity_score < 3:
            continue

        listing = await _convert_to_listing(auction, db, include_valuation=False)

        if opportunity_score >= 10:
            recommendation = "🔥 Hot"
        elif opportunity_score >= 6:
            recommendation = "⚡ Great"
        else:
            recommendation = "👀 Watch"

        opportunities.append({
            "auction": listing.model_dump(),
            "analysis": {
                "opportunity_score": opportunity_score,
                "time_score": time_score,
                "competition_score": competition_score,
                "price_score": price_score,
                "urgency": urgency,
                "competition": competition,
                "price_range": price_range,
                "recommendation": recommendation,
                # Separate the three labels; concatenating them directly
                # produced one unreadable run-together string.
                "reasoning": " | ".join((urgency, competition, price_range)),
            }
        })

    # Sort by opportunity score
    opportunities.sort(key=lambda x: x["analysis"]["opportunity_score"], reverse=True)

    return {
        "opportunities": opportunities[:15],
        "strategy_tips": [
            "⏰ Auctions ending soon have snipe potential",
            "📉 Low bid count = overlooked opportunities",
            "💡 Set a max budget and stick to it",
        ],
        "generated_at": datetime.utcnow().isoformat(),
    }
def _get_opportunity_reasoning(value_ratio: float, hours_left: float, num_bids: int, score: float) -> str:
    """Describe an opportunity in words.

    Produces up to three fragments - one about value, one about remaining
    time (only when under four hours are left) and one about bidding
    competition - joined with " | ". ``score`` is accepted but not used.
    """
    # Value: how the estimate compares with the current bid.
    value_note = (
        f"Significantly undervalued ({value_ratio:.1f}× estimated value)"
        if value_ratio > 2
        else f"Undervalued ({value_ratio:.1f}× estimated value)"
        if value_ratio > 1
        else f"Current bid exceeds our estimate ({value_ratio:.2f}×)"
    )

    # Time: only worth mentioning when the end is close.
    if hours_left < 1:
        time_note = "⚡ Ending very soon - final chance to bid"
    elif hours_left < 4:
        time_note = "⏰ Ending soon - limited time remaining"
    else:
        time_note = None

    # Competition: judged by the number of bids so far.
    if num_bids < 5:
        bids_note = "📉 Very low competition - potential overlooked opportunity"
    elif num_bids < 10:
        bids_note = "📊 Moderate competition"
    else:
        bids_note = f"🔥 High demand ({num_bids} bids)"

    fragments = (value_note, time_note, bids_note)
    return " | ".join(note for note in fragments if note is not None)