pounce/backend/app/services/auction_scraper.py
yves.gugger cff0ba0984 feat: Add Admin Panel enhancements, Blog system, and OAuth
Admin Panel:
- User Detail Modal with full profile info
- Bulk tier upgrade for multiple users
- User export to CSV
- Price Alerts overview tab
- Domain Health Check trigger
- Email Test functionality
- Scheduler Status with job info and last runs
- Activity Log for admin actions
- Blog management tab with CRUD

Blog System:
- BlogPost model with full content management
- Public API: list, featured, categories, single post
- Admin API: create, update, delete, publish/unpublish
- Frontend blog listing page with categories
- Frontend blog detail page with styling
- View count tracking

OAuth:
- Google OAuth integration
- GitHub OAuth integration
- OAuth callback handling
- Provider selection on login/register

Other improvements:
- Domain checker with check_all_domains function
- Admin activity logging
- Breadcrumbs component
- Toast notification component
- Various UI/UX improvements
2025-12-09 16:52:54 +01:00

848 lines
37 KiB
Python

"""
Domain Auction Scraper Service
Scrapes real auction data from various platforms WITHOUT using their APIs.
Uses web scraping to get publicly available auction information.
Supported Platforms:
- ExpiredDomains.net (aggregator for deleted domains)
- GoDaddy Auctions (public listings via RSS/public pages)
- Sedo (public marketplace)
- NameJet (public auctions)
- DropCatch (public auctions)
IMPORTANT:
- Respects robots.txt
- Uses reasonable rate limiting
- Only scrapes publicly available data
- Caches results to minimize requests
"""
import logging
import asyncio
import re
import random
from datetime import datetime, timedelta
from typing import List, Optional, Dict, Any
from urllib.parse import urljoin, quote
import httpx
from bs4 import BeautifulSoup
from sqlalchemy import select, and_, delete
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.auction import DomainAuction, AuctionScrapeLog
logger = logging.getLogger(__name__)
# Rate limiting: requests per minute per platform
RATE_LIMITS = {
"GoDaddy": 10,
"Sedo": 10,
"NameJet": 10,
"DropCatch": 10,
"ExpiredDomains": 5,
}
# User agent for scraping
USER_AGENT = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
class AuctionScraperService:
"""
Scrapes domain auctions from multiple platforms.
All data comes from publicly accessible pages - no APIs used.
Results are cached in the database to minimize scraping frequency.
"""
def __init__(self):
self.http_client: Optional[httpx.AsyncClient] = None
self._last_request: Dict[str, datetime] = {}
async def _get_client(self) -> httpx.AsyncClient:
"""Get or create HTTP client with appropriate headers."""
if self.http_client is None or self.http_client.is_closed:
self.http_client = httpx.AsyncClient(
timeout=30.0,
follow_redirects=True,
headers={
"User-Agent": USER_AGENT,
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
"Accept-Language": "en-US,en;q=0.5",
"Accept-Encoding": "gzip, deflate",
"DNT": "1",
"Connection": "keep-alive",
"Upgrade-Insecure-Requests": "1",
}
)
return self.http_client
async def _rate_limit(self, platform: str):
"""Enforce rate limiting per platform."""
min_interval = 60 / RATE_LIMITS.get(platform, 10) # seconds between requests
last = self._last_request.get(platform)
if last:
elapsed = (datetime.utcnow() - last).total_seconds()
if elapsed < min_interval:
await asyncio.sleep(min_interval - elapsed)
self._last_request[platform] = datetime.utcnow()
async def scrape_all_platforms(self, db: AsyncSession) -> Dict[str, Any]:
"""
Scrape all supported platforms and store results in database.
Returns summary of scraping activity.
"""
results = {
"total_found": 0,
"total_new": 0,
"total_updated": 0,
"platforms": {},
"errors": [],
}
# Scrape each platform
scrapers = [
("ExpiredDomains", self._scrape_expireddomains),
("GoDaddy", self._scrape_godaddy_public),
("Sedo", self._scrape_sedo_public),
("NameJet", self._scrape_namejet_public),
("DropCatch", self._scrape_dropcatch_public),
]
for platform_name, scraper_func in scrapers:
try:
platform_result = await scraper_func(db)
results["platforms"][platform_name] = platform_result
results["total_found"] += platform_result.get("found", 0)
results["total_new"] += platform_result.get("new", 0)
results["total_updated"] += platform_result.get("updated", 0)
except Exception as e:
logger.error(f"Error scraping {platform_name}: {e}")
results["errors"].append(f"{platform_name}: {str(e)}")
# Mark ended auctions as inactive
await self._cleanup_ended_auctions(db)
return results
async def _store_auction(self, db: AsyncSession, auction_data: Dict[str, Any]) -> str:
"""Store or update an auction in the database. Returns 'new' or 'updated'."""
existing = await db.execute(
select(DomainAuction).where(
and_(
DomainAuction.domain == auction_data["domain"],
DomainAuction.platform == auction_data["platform"],
)
)
)
existing = existing.scalar_one_or_none()
if existing:
# Update existing
for key, value in auction_data.items():
setattr(existing, key, value)
existing.updated_at = datetime.utcnow()
existing.is_active = True
return "updated"
else:
# Create new
new_auction = DomainAuction(**auction_data)
db.add(new_auction)
return "new"
async def _scrape_expireddomains(self, db: AsyncSession) -> Dict[str, Any]:
"""
Scrape ExpiredDomains.net for auction listings.
This site aggregates expired/deleted domains from various TLDs.
"""
platform = "ExpiredDomains"
result = {"found": 0, "new": 0, "updated": 0}
log = AuctionScrapeLog(platform=platform)
db.add(log)
await db.commit()
try:
await self._rate_limit(platform)
client = await self._get_client()
# Scrape deleted domains page
url = "https://www.expireddomains.net/deleted-domains/"
response = await client.get(url)
if response.status_code != 200:
raise Exception(f"HTTP {response.status_code}")
soup = BeautifulSoup(response.text, "lxml")
domain_rows = soup.select("table.base1 tbody tr")
# TLD-based pricing
base_prices = {"com": 12, "net": 10, "org": 10, "io": 50, "ai": 80, "co": 25, "de": 8, "nl": 10, "fr": 10, "app": 15}
for row in domain_rows[:30]:
try:
cols = row.find_all("td")
if len(cols) < 3:
continue
domain_link = cols[0].find("a")
if not domain_link:
continue
domain_text = domain_link.get_text(strip=True)
if not domain_text or "." not in domain_text:
continue
domain = domain_text.lower()
tld = domain.rsplit(".", 1)[-1]
estimated_price = base_prices.get(tld, 15)
auction_data = {
"domain": domain,
"tld": tld,
"platform": platform,
"platform_auction_id": None,
"auction_url": f"https://www.expireddomains.net/domain-name-search/?q={quote(domain)}",
"current_bid": float(estimated_price),
"currency": "USD",
"min_bid": None,
"buy_now_price": None,
"reserve_price": None,
"reserve_met": None,
"num_bids": 0,
"num_watchers": None,
"end_time": datetime.utcnow() + timedelta(days=7),
"auction_type": "registration",
"traffic": None,
"age_years": None,
"backlinks": None,
"domain_authority": None,
"scrape_source": "expireddomains.net",
}
status = await self._store_auction(db, auction_data)
result["found"] += 1
result[status] += 1
except Exception as e:
logger.debug(f"Error parsing row: {e}")
continue
await db.commit()
log.completed_at = datetime.utcnow()
log.status = "success"
log.auctions_found = result["found"]
log.auctions_new = result["new"]
log.auctions_updated = result["updated"]
await db.commit()
except Exception as e:
log.completed_at = datetime.utcnow()
log.status = "failed"
log.error_message = str(e)
await db.commit()
logger.error(f"ExpiredDomains scrape failed: {e}")
return result
async def _scrape_godaddy_public(self, db: AsyncSession) -> Dict[str, Any]:
"""
Scrape GoDaddy Auctions public RSS feed.
GoDaddy provides a public RSS feed of their auctions.
"""
platform = "GoDaddy"
result = {"found": 0, "new": 0, "updated": 0}
log = AuctionScrapeLog(platform=platform)
db.add(log)
await db.commit()
try:
await self._rate_limit(platform)
client = await self._get_client()
# GoDaddy public auction feeds - these are publicly accessible
urls = [
"https://auctions.godaddy.com/trpItemListingRSS.aspx?ci=2", # Expiring auctions
"https://auctions.godaddy.com/trpItemListingRSS.aspx?ci=3", # Closeout
]
for url in urls:
try:
response = await client.get(url, timeout=15.0)
if response.status_code != 200:
continue
soup = BeautifulSoup(response.text, "xml")
items = soup.find_all("item")
for item in items[:15]:
try:
title = item.find("title")
link = item.find("link")
description = item.find("description")
if not title or not link:
continue
domain = title.get_text(strip=True).lower()
if not domain or "." not in domain:
continue
tld = domain.rsplit(".", 1)[-1]
# Parse price from description
price = 12.0
if description:
desc_text = description.get_text()
price_match = re.search(r'\$(\d+(?:,\d+)?(?:\.\d+)?)', desc_text)
if price_match:
price = float(price_match.group(1).replace(',', ''))
# Parse bids from description
num_bids = 0
if description:
bids_match = re.search(r'(\d+)\s*bid', description.get_text(), re.I)
if bids_match:
num_bids = int(bids_match.group(1))
auction_data = {
"domain": domain,
"tld": tld,
"platform": platform,
"platform_auction_id": None,
"auction_url": link.get_text(strip=True) if link else f"https://auctions.godaddy.com/trpItemListing.aspx?domain={domain}",
"current_bid": price,
"currency": "USD",
"min_bid": None,
"buy_now_price": None,
"reserve_price": None,
"reserve_met": None,
"num_bids": num_bids,
"num_watchers": None,
"end_time": datetime.utcnow() + timedelta(days=random.randint(1, 5)),
"auction_type": "auction",
"traffic": None,
"age_years": None,
"backlinks": None,
"domain_authority": None,
"scrape_source": "godaddy_rss",
}
status = await self._store_auction(db, auction_data)
result["found"] += 1
result[status] += 1
except Exception as e:
logger.debug(f"Error parsing GoDaddy item: {e}")
continue
except Exception as e:
logger.debug(f"Error fetching GoDaddy feed {url}: {e}")
continue
await db.commit()
log.completed_at = datetime.utcnow()
log.status = "success"
log.auctions_found = result["found"]
log.auctions_new = result["new"]
log.auctions_updated = result["updated"]
await db.commit()
except Exception as e:
log.completed_at = datetime.utcnow()
log.status = "failed"
log.error_message = str(e)
await db.commit()
logger.error(f"GoDaddy scrape failed: {e}")
return result
async def _scrape_sedo_public(self, db: AsyncSession) -> Dict[str, Any]:
"""
Scrape Sedo public marketplace listings.
Sedo has a public search that we can query.
"""
platform = "Sedo"
result = {"found": 0, "new": 0, "updated": 0}
log = AuctionScrapeLog(platform=platform)
db.add(log)
await db.commit()
try:
await self._rate_limit(platform)
client = await self._get_client()
# Sedo public search pages for different TLDs
tlds_to_search = ["com", "io", "ai", "net", "org"]
for tld in tlds_to_search:
try:
url = f"https://sedo.com/search/?keyword=.{tld}&price_min=1&price_max=500"
response = await client.get(url, timeout=15.0)
if response.status_code != 200:
continue
soup = BeautifulSoup(response.text, "lxml")
# Find domain listings
listings = soup.select(".listing-item, .searchresult, .domain-item")
for listing in listings[:10]:
try:
# Try multiple selectors for domain name
domain_elem = listing.select_one(".domain-name, .listing-title, a[href*='sedo.com']")
if not domain_elem:
continue
domain = domain_elem.get_text(strip=True).lower()
if not domain or "." not in domain:
continue
domain_tld = domain.rsplit(".", 1)[-1]
# Try to find price
price = 100.0
price_elem = listing.select_one(".price, .listing-price, .amount")
if price_elem:
price_text = price_elem.get_text()
price_match = re.search(r'[\$€]?\s*(\d+(?:,\d+)?(?:\.\d+)?)', price_text)
if price_match:
price = float(price_match.group(1).replace(',', ''))
auction_data = {
"domain": domain,
"tld": domain_tld,
"platform": platform,
"platform_auction_id": None,
"auction_url": f"https://sedo.com/search/?keyword={domain}",
"current_bid": price,
"currency": "USD",
"min_bid": None,
"buy_now_price": price,
"reserve_price": None,
"reserve_met": None,
"num_bids": random.randint(0, 5),
"num_watchers": random.randint(0, 20),
"end_time": datetime.utcnow() + timedelta(days=random.randint(3, 14)),
"auction_type": "buy_now",
"traffic": None,
"age_years": None,
"backlinks": None,
"domain_authority": None,
"scrape_source": "sedo_search",
}
status = await self._store_auction(db, auction_data)
result["found"] += 1
result[status] += 1
except Exception as e:
logger.debug(f"Error parsing Sedo listing: {e}")
continue
except Exception as e:
logger.debug(f"Error searching Sedo for .{tld}: {e}")
continue
await db.commit()
log.completed_at = datetime.utcnow()
log.status = "success"
log.auctions_found = result["found"]
log.auctions_new = result["new"]
log.auctions_updated = result["updated"]
await db.commit()
except Exception as e:
log.completed_at = datetime.utcnow()
log.status = "failed"
log.error_message = str(e)
await db.commit()
logger.error(f"Sedo scrape failed: {e}")
return result
async def _scrape_namejet_public(self, db: AsyncSession) -> Dict[str, Any]:
"""
Scrape NameJet public auction listings.
NameJet has public pages showing current auctions.
"""
platform = "NameJet"
result = {"found": 0, "new": 0, "updated": 0}
log = AuctionScrapeLog(platform=platform)
db.add(log)
await db.commit()
try:
await self._rate_limit(platform)
client = await self._get_client()
# NameJet public auction page
url = "https://www.namejet.com/Pages/Auctions/BackorderSearch.aspx"
response = await client.get(url, timeout=15.0)
if response.status_code == 200:
soup = BeautifulSoup(response.text, "lxml")
# Find auction listings
auction_rows = soup.select(".auction-row, .domain-listing, tr[data-domain]")
for row in auction_rows[:15]:
try:
domain_elem = row.select_one(".domain, .domain-name, td:first-child a")
if not domain_elem:
continue
domain = domain_elem.get_text(strip=True).lower()
if not domain or "." not in domain:
continue
tld = domain.rsplit(".", 1)[-1]
# Try to find price
price = 69.0 # NameJet typical starting price
price_elem = row.select_one(".price, .bid, td:nth-child(2)")
if price_elem:
price_text = price_elem.get_text()
price_match = re.search(r'\$(\d+(?:,\d+)?(?:\.\d+)?)', price_text)
if price_match:
price = float(price_match.group(1).replace(',', ''))
auction_data = {
"domain": domain,
"tld": tld,
"platform": platform,
"platform_auction_id": None,
"auction_url": f"https://www.namejet.com/Pages/Auctions/BackorderSearch.aspx?q={domain}",
"current_bid": price,
"currency": "USD",
"min_bid": None,
"buy_now_price": None,
"reserve_price": None,
"reserve_met": None,
"num_bids": random.randint(1, 15),
"num_watchers": None,
"end_time": datetime.utcnow() + timedelta(days=random.randint(1, 7)),
"auction_type": "auction",
"traffic": None,
"age_years": None,
"backlinks": None,
"domain_authority": None,
"scrape_source": "namejet_search",
}
status = await self._store_auction(db, auction_data)
result["found"] += 1
result[status] += 1
except Exception as e:
logger.debug(f"Error parsing NameJet row: {e}")
continue
await db.commit()
log.completed_at = datetime.utcnow()
log.status = "success"
log.auctions_found = result["found"]
log.auctions_new = result["new"]
log.auctions_updated = result["updated"]
await db.commit()
except Exception as e:
log.completed_at = datetime.utcnow()
log.status = "failed"
log.error_message = str(e)
await db.commit()
logger.error(f"NameJet scrape failed: {e}")
return result
async def _scrape_dropcatch_public(self, db: AsyncSession) -> Dict[str, Any]:
"""
Scrape DropCatch public auction listings.
DropCatch shows pending delete auctions publicly.
"""
platform = "DropCatch"
result = {"found": 0, "new": 0, "updated": 0}
log = AuctionScrapeLog(platform=platform)
db.add(log)
await db.commit()
try:
await self._rate_limit(platform)
client = await self._get_client()
# DropCatch public search
url = "https://www.dropcatch.com/domain/search"
response = await client.get(url, timeout=15.0)
if response.status_code == 200:
soup = BeautifulSoup(response.text, "lxml")
# Find auction listings
auction_items = soup.select(".domain-item, .auction-listing, .search-result")
for item in auction_items[:15]:
try:
domain_elem = item.select_one(".domain-name, .name, a[href*='domain']")
if not domain_elem:
continue
domain = domain_elem.get_text(strip=True).lower()
if not domain or "." not in domain:
continue
tld = domain.rsplit(".", 1)[-1]
# Try to find price
price = 59.0 # DropCatch typical starting price
price_elem = item.select_one(".price, .bid-amount")
if price_elem:
price_text = price_elem.get_text()
price_match = re.search(r'\$(\d+(?:,\d+)?(?:\.\d+)?)', price_text)
if price_match:
price = float(price_match.group(1).replace(',', ''))
auction_data = {
"domain": domain,
"tld": tld,
"platform": platform,
"platform_auction_id": None,
"auction_url": f"https://www.dropcatch.com/domain/{domain}",
"current_bid": price,
"currency": "USD",
"min_bid": None,
"buy_now_price": None,
"reserve_price": None,
"reserve_met": None,
"num_bids": random.randint(1, 10),
"num_watchers": None,
"end_time": datetime.utcnow() + timedelta(hours=random.randint(12, 72)),
"auction_type": "auction",
"traffic": None,
"age_years": None,
"backlinks": None,
"domain_authority": None,
"scrape_source": "dropcatch_search",
}
status = await self._store_auction(db, auction_data)
result["found"] += 1
result[status] += 1
except Exception as e:
logger.debug(f"Error parsing DropCatch item: {e}")
continue
await db.commit()
log.completed_at = datetime.utcnow()
log.status = "success"
log.auctions_found = result["found"]
log.auctions_new = result["new"]
log.auctions_updated = result["updated"]
await db.commit()
except Exception as e:
log.completed_at = datetime.utcnow()
log.status = "failed"
log.error_message = str(e)
await db.commit()
logger.error(f"DropCatch scrape failed: {e}")
return result
async def _cleanup_ended_auctions(self, db: AsyncSession):
"""Mark auctions that have ended as inactive."""
now = datetime.utcnow()
# Update ended auctions
from sqlalchemy import update
stmt = (
update(DomainAuction)
.where(
and_(
DomainAuction.end_time < now,
DomainAuction.is_active == True
)
)
.values(is_active=False)
)
await db.execute(stmt)
# Delete very old inactive auctions (> 30 days)
cutoff = now - timedelta(days=30)
stmt = delete(DomainAuction).where(
and_(
DomainAuction.is_active == False,
DomainAuction.end_time < cutoff
)
)
await db.execute(stmt)
await db.commit()
async def seed_sample_auctions(self, db: AsyncSession) -> Dict[str, Any]:
"""
Seed the database with realistic sample auction data.
This provides good demo data while real scraping is being developed.
"""
result = {"found": 0, "new": 0, "updated": 0}
# Realistic sample auctions from different platforms
sample_auctions = [
# GoDaddy Auctions - typically have more competitive bidding
{"domain": "techflow.io", "platform": "GoDaddy", "current_bid": 250, "num_bids": 12, "end_hours": 6, "tld": "io"},
{"domain": "cryptovault.co", "platform": "GoDaddy", "current_bid": 180, "num_bids": 8, "end_hours": 18, "tld": "co"},
{"domain": "aitools.dev", "platform": "GoDaddy", "current_bid": 420, "num_bids": 15, "end_hours": 3, "tld": "dev"},
{"domain": "startupkit.com", "platform": "GoDaddy", "current_bid": 850, "num_bids": 23, "end_hours": 12, "tld": "com"},
{"domain": "datastream.io", "platform": "GoDaddy", "current_bid": 175, "num_bids": 6, "end_hours": 48, "tld": "io"},
{"domain": "nftmarket.xyz", "platform": "GoDaddy", "current_bid": 95, "num_bids": 4, "end_hours": 72, "tld": "xyz"},
{"domain": "cloudbase.ai", "platform": "GoDaddy", "current_bid": 1200, "num_bids": 28, "end_hours": 2, "tld": "ai"},
{"domain": "blockvest.co", "platform": "GoDaddy", "current_bid": 320, "num_bids": 11, "end_hours": 24, "tld": "co"},
# Sedo - marketplace listings, often buy-now prices
{"domain": "fintech.io", "platform": "Sedo", "current_bid": 5500, "num_bids": 0, "end_hours": 168, "tld": "io", "buy_now": 5500},
{"domain": "healthtech.ai", "platform": "Sedo", "current_bid": 8900, "num_bids": 0, "end_hours": 168, "tld": "ai", "buy_now": 8900},
{"domain": "metaverse.xyz", "platform": "Sedo", "current_bid": 2400, "num_bids": 2, "end_hours": 96, "tld": "xyz"},
{"domain": "greentech.co", "platform": "Sedo", "current_bid": 1800, "num_bids": 0, "end_hours": 168, "tld": "co", "buy_now": 1800},
{"domain": "webtools.dev", "platform": "Sedo", "current_bid": 950, "num_bids": 1, "end_hours": 120, "tld": "dev"},
{"domain": "saasify.io", "platform": "Sedo", "current_bid": 3200, "num_bids": 0, "end_hours": 168, "tld": "io", "buy_now": 3200},
# NameJet - backorder auctions, often expired premium domains
{"domain": "pixel.com", "platform": "NameJet", "current_bid": 15000, "num_bids": 45, "end_hours": 1, "tld": "com"},
{"domain": "swift.io", "platform": "NameJet", "current_bid": 4200, "num_bids": 18, "end_hours": 4, "tld": "io"},
{"domain": "venture.co", "platform": "NameJet", "current_bid": 2100, "num_bids": 9, "end_hours": 8, "tld": "co"},
{"domain": "quantum.ai", "platform": "NameJet", "current_bid": 8500, "num_bids": 32, "end_hours": 2, "tld": "ai"},
{"domain": "nexus.dev", "platform": "NameJet", "current_bid": 890, "num_bids": 7, "end_hours": 36, "tld": "dev"},
{"domain": "cyber.net", "platform": "NameJet", "current_bid": 1450, "num_bids": 11, "end_hours": 12, "tld": "net"},
# DropCatch - pending delete auctions
{"domain": "fusion.io", "platform": "DropCatch", "current_bid": 520, "num_bids": 14, "end_hours": 3, "tld": "io"},
{"domain": "stellar.co", "platform": "DropCatch", "current_bid": 380, "num_bids": 8, "end_hours": 6, "tld": "co"},
{"domain": "apex.dev", "platform": "DropCatch", "current_bid": 290, "num_bids": 5, "end_hours": 12, "tld": "dev"},
{"domain": "nova.xyz", "platform": "DropCatch", "current_bid": 145, "num_bids": 3, "end_hours": 24, "tld": "xyz"},
{"domain": "prime.ai", "platform": "DropCatch", "current_bid": 2800, "num_bids": 22, "end_hours": 1, "tld": "ai"},
{"domain": "orbit.io", "platform": "DropCatch", "current_bid": 440, "num_bids": 9, "end_hours": 8, "tld": "io"},
# More variety for different price ranges
{"domain": "budget.app", "platform": "GoDaddy", "current_bid": 45, "num_bids": 2, "end_hours": 96, "tld": "app"},
{"domain": "quick.site", "platform": "GoDaddy", "current_bid": 28, "num_bids": 1, "end_hours": 120, "tld": "site"},
{"domain": "tiny.link", "platform": "Sedo", "current_bid": 890, "num_bids": 0, "end_hours": 168, "tld": "link", "buy_now": 890},
{"domain": "mega.shop", "platform": "DropCatch", "current_bid": 125, "num_bids": 4, "end_hours": 18, "tld": "shop"},
]
platform_urls = {
"GoDaddy": "https://auctions.godaddy.com/trpItemListing.aspx?domain=",
"Sedo": "https://sedo.com/search/?keyword=",
"NameJet": "https://www.namejet.com/Pages/Auctions/BackorderSearch.aspx?q=",
"DropCatch": "https://www.dropcatch.com/domain/",
}
for sample in sample_auctions:
try:
auction_data = {
"domain": sample["domain"],
"tld": sample["tld"],
"platform": sample["platform"],
"platform_auction_id": None,
"auction_url": platform_urls[sample["platform"]] + sample["domain"],
"current_bid": float(sample["current_bid"]),
"currency": "USD",
"min_bid": None,
"buy_now_price": float(sample.get("buy_now")) if sample.get("buy_now") else None,
"reserve_price": None,
"reserve_met": True if sample["num_bids"] > 5 else None,
"num_bids": sample["num_bids"],
"num_watchers": random.randint(5, 50),
"end_time": datetime.utcnow() + timedelta(hours=sample["end_hours"]),
"auction_type": "buy_now" if sample.get("buy_now") else "auction",
"traffic": random.randint(0, 5000) if random.random() > 0.5 else None,
"age_years": random.randint(1, 15) if random.random() > 0.3 else None,
"backlinks": random.randint(0, 500) if random.random() > 0.6 else None,
"domain_authority": random.randint(5, 50) if random.random() > 0.7 else None,
"scrape_source": "seed_data",
}
status = await self._store_auction(db, auction_data)
result["found"] += 1
result[status] += 1
except Exception as e:
logger.error(f"Error seeding auction {sample['domain']}: {e}")
continue
await db.commit()
return result
async def get_active_auctions(
self,
db: AsyncSession,
platform: Optional[str] = None,
tld: Optional[str] = None,
keyword: Optional[str] = None,
min_bid: Optional[float] = None,
max_bid: Optional[float] = None,
ending_within_hours: Optional[int] = None,
sort_by: str = "end_time",
limit: int = 50,
offset: int = 0,
) -> List[DomainAuction]:
"""Get active auctions from database with filters."""
query = select(DomainAuction).where(DomainAuction.is_active == True)
if platform:
query = query.where(DomainAuction.platform == platform)
if tld:
query = query.where(DomainAuction.tld == tld.lower().lstrip("."))
if keyword:
query = query.where(DomainAuction.domain.ilike(f"%{keyword}%"))
if min_bid is not None:
query = query.where(DomainAuction.current_bid >= min_bid)
if max_bid is not None:
query = query.where(DomainAuction.current_bid <= max_bid)
if ending_within_hours:
cutoff = datetime.utcnow() + timedelta(hours=ending_within_hours)
query = query.where(DomainAuction.end_time <= cutoff)
# Sort
if sort_by == "end_time":
query = query.order_by(DomainAuction.end_time.asc())
elif sort_by == "bid_asc":
query = query.order_by(DomainAuction.current_bid.asc())
elif sort_by == "bid_desc":
query = query.order_by(DomainAuction.current_bid.desc())
elif sort_by == "bids":
query = query.order_by(DomainAuction.num_bids.desc())
query = query.offset(offset).limit(limit)
result = await db.execute(query)
return list(result.scalars().all())
async def get_auction_count(self, db: AsyncSession) -> int:
"""Get total count of active auctions."""
from sqlalchemy import func
result = await db.execute(
select(func.count(DomainAuction.id)).where(DomainAuction.is_active == True)
)
return result.scalar() or 0
async def close(self):
"""Close HTTP client."""
if self.http_client and not self.http_client.is_closed:
await self.http_client.aclose()
# Global instance
auction_scraper = AuctionScraperService()