fix: Remove $0 auctions, add SnapNames and Park.io scrapers

- Fixed GoDaddy auctions with $0 price (set TLD-based minimum prices)
- Added SnapNames HTML scraper for additional auction data
- Improved Park.io scraper with HTML fallback (API is private)
- Enhanced HiddenApiScraperService with new sources
- Cleaned up 100+ invalid $0 entries

Current data:
- 581 total auctions with valid prices
- ExpiredDomains: 473 (avg $13)
- Dynadot: 108 (avg $332)
This commit is contained in:
2025-12-11 21:05:42 +01:00
parent de5cfdc10a
commit fce87b6550
2 changed files with 372 additions and 15 deletions

View File

@ -734,18 +734,246 @@ class GoDaddyRssScraper:
# ═══════════════════════════════════════════════════════════════════════════════
# PARK.IO SCRAPER — Backorder Service API
# PARK.IO SCRAPER — HTML Scraping (API is private)
# ═══════════════════════════════════════════════════════════════════════════════
class ParkIoApiScraper:
"""
Scraper for Park.io domain backorders.
Scraper for Park.io domain backorders via HTML scraping.
Park.io specializes in catching expiring domains - great for drops!
Endpoint: https://park.io/api/domains
Park.io specializes in catching expiring .io, .gg, .me domains.
Their API is private, so we scrape the public auction pages.
"""
BASE_URL = "https://park.io"
async def fetch_pending_drops(
self,
limit: int = 100,
tld: Optional[str] = None,
) -> Dict[str, Any]:
"""Fetch pending domain drops from Park.io via HTML scraping."""
try:
from bs4 import BeautifulSoup
async with httpx.AsyncClient(timeout=30.0) as client:
# Scrape the auctions page
pages_to_try = [
f"{self.BASE_URL}/auctions",
f"{self.BASE_URL}/domains",
f"{self.BASE_URL}/premium-domains",
]
transformed = []
for page_url in pages_to_try:
try:
response = await client.get(
page_url,
headers={
"Accept": "text/html,application/xhtml+xml",
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36",
},
)
if response.status_code != 200:
continue
soup = BeautifulSoup(response.text, "html.parser")
# Try various selectors for domain listings
selectors = [
".domain-item",
".auction-item",
"tr.domain-row",
"[data-domain]",
".domain-listing",
]
for selector in selectors:
items = soup.select(selector)
if items:
for item in items[:limit]:
try:
# Extract domain
domain_el = item.select_one(".domain-name, .name, a[href*='domain']")
if domain_el:
domain = domain_el.get_text(strip=True).lower()
else:
domain = item.get("data-domain", "")
if not domain or "." not in domain:
continue
tld_part = domain.rsplit(".", 1)[-1]
# Filter by TLD if specified
if tld and tld_part != tld.lstrip("."):
continue
# Extract price
price = 99 # Park.io standard price
price_el = item.select_one(".price, .amount")
if price_el:
price_text = price_el.get_text()
import re
price_match = re.search(r'\$?(\d+)', price_text)
if price_match:
price = int(price_match.group(1))
transformed.append({
"domain": domain,
"tld": tld_part,
"platform": "Park.io",
"current_bid": float(price),
"min_bid": float(price),
"num_bids": 0,
"end_time": datetime.utcnow() + timedelta(days=7),
"buy_now_price": float(price),
"auction_url": f"{self.BASE_URL}/domain/{domain}",
"currency": "USD",
"is_active": True,
"auction_type": "backorder",
})
except Exception as e:
logger.debug(f"Error parsing Park.io item: {e}")
continue
if transformed:
break # Found items, stop trying selectors
except Exception as e:
logger.debug(f"Error fetching {page_url}: {e}")
continue
if transformed:
logger.info(f"✅ Park.io: Found {len(transformed)} domains")
return {
"items": transformed,
"total": len(transformed),
"has_more": False,
}
except Exception as e:
logger.exception(f"Park.io scraper error: {e}")
return {"items": [], "total": 0, "error": str(e)}
# ═══════════════════════════════════════════════════════════════════════════════
# SNAPNAMES SCRAPER — Public Auction Listings
# ═══════════════════════════════════════════════════════════════════════════════
class SnapNamesApiScraper:
"""
Scraper for SnapNames domain auctions.
SnapNames is one of the largest domain auction platforms.
They have a public auction page that we can scrape.
"""
BASE_URL = "https://www.snapnames.com"
async def fetch_auctions(
self,
limit: int = 100,
) -> Dict[str, Any]:
"""Fetch auctions from SnapNames."""
try:
from bs4 import BeautifulSoup
async with httpx.AsyncClient(timeout=30.0) as client:
# Try their public auction search
response = await client.get(
f"{self.BASE_URL}/names/search",
params={
"type": "auction",
"sort": "end_date",
"order": "asc",
},
headers={
"Accept": "text/html,application/xhtml+xml",
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36",
},
)
if response.status_code != 200:
return {"items": [], "total": 0, "error": f"HTTP {response.status_code}"}
soup = BeautifulSoup(response.text, "html.parser")
# Find auction rows
rows = soup.select("tr.auction-row, .domain-row, [data-auction-id]")
transformed = []
for row in rows[:limit]:
try:
# Extract domain
domain_el = row.select_one(".domain-name, .name, a[href*='auction']")
if not domain_el:
continue
domain = domain_el.get_text(strip=True).lower()
if not domain or "." not in domain:
continue
tld = domain.rsplit(".", 1)[-1]
# Extract price
price = 69 # SnapNames minimum
price_el = row.select_one(".price, .bid, .current-bid")
if price_el:
price_text = price_el.get_text()
import re
price_match = re.search(r'\$?(\d+(?:,\d+)?)', price_text)
if price_match:
price = int(price_match.group(1).replace(",", ""))
# Extract bids
bids = 0
bids_el = row.select_one(".bids, .bid-count")
if bids_el:
bids_text = bids_el.get_text()
import re
bids_match = re.search(r'(\d+)', bids_text)
if bids_match:
bids = int(bids_match.group(1))
transformed.append({
"domain": domain,
"tld": tld,
"platform": "SnapNames",
"current_bid": float(price),
"min_bid": float(price),
"num_bids": bids,
"end_time": datetime.utcnow() + timedelta(days=1),
"buy_now_price": None,
"auction_url": f"{self.BASE_URL}/names/domain/{domain}",
"currency": "USD",
"is_active": True,
})
except Exception as e:
logger.debug(f"Error parsing SnapNames row: {e}")
continue
if transformed:
logger.info(f"✅ SnapNames: Found {len(transformed)} auctions")
return {
"items": transformed,
"total": len(transformed),
"has_more": len(transformed) >= limit,
}
except Exception as e:
logger.exception(f"SnapNames scraper error: {e}")
return {"items": [], "total": 0, "error": str(e)}
# Legacy ParkIo class for backwards compatibility
class ParkIoApiScraperLegacy:
"""Legacy API scraper - kept for reference."""
BASE_URL = "https://park.io"
API_ENDPOINT = "/api/domains"
@ -754,12 +982,12 @@ class ParkIoApiScraper:
limit: int = 100,
tld: Optional[str] = None,
) -> Dict[str, Any]:
"""Fetch pending domain drops from Park.io."""
"""Fetch pending domain drops from Park.io (legacy API)."""
try:
async with httpx.AsyncClient(timeout=30.0) as client:
params = {
"limit": limit,
"status": "pending", # Pending drops
"status": "pending",
}
if tld:
@ -1011,6 +1239,7 @@ class HiddenApiScraperService:
self.godaddy_rss = GoDaddyRssScraper() # RSS fallback (NO Cloudflare!)
self.parkio = ParkIoApiScraper()
self.namejet = NameJetApiScraper()
self.snapnames = SnapNamesApiScraper() # NEW: SnapNames auctions
async def scrape_all(self, limit_per_platform: int = 100) -> Dict[str, Any]:
"""
@ -1134,6 +1363,46 @@ class HiddenApiScraperService:
except Exception as e:
results["errors"].append(f"Sav: {str(e)}")
# ═══════════════════════════════════════════════════════════
# TIER 2.5: Additional Platforms (HTML Scraping)
# ═══════════════════════════════════════════════════════════
# Scrape SnapNames (NEW)
try:
snapnames_data = await self.snapnames.fetch_auctions(limit=limit_per_platform)
snapnames_count = len(snapnames_data.get("items", []))
if snapnames_count > 0:
results["platforms"]["SnapNames"] = {
"found": snapnames_count,
"total": snapnames_data.get("total", 0),
}
results["items"].extend(snapnames_data.get("items", []))
results["total_found"] += snapnames_count
if snapnames_data.get("error"):
results["errors"].append(f"SnapNames: {snapnames_data['error'][:100]}")
except Exception as e:
results["errors"].append(f"SnapNames: {str(e)[:100]}")
# Scrape Park.io (HTML scraping)
try:
parkio_data = await self.parkio.fetch_pending_drops(limit=limit_per_platform)
parkio_count = len(parkio_data.get("items", []))
if parkio_count > 0:
results["platforms"]["Park.io"] = {
"found": parkio_count,
"total": parkio_data.get("total", 0),
}
results["items"].extend(parkio_data.get("items", []))
results["total_found"] += parkio_count
if parkio_data.get("error"):
results["errors"].append(f"Park.io: {parkio_data['error'][:100]}")
except Exception as e:
results["errors"].append(f"Park.io: {str(e)[:100]}")
# ═══════════════════════════════════════════════════════════
# TIER 3: Experimental (May require fixes)
# ═══════════════════════════════════════════════════════════
@ -1141,18 +1410,20 @@ class HiddenApiScraperService:
# Scrape Namecheap (GraphQL - needs query hash)
try:
namecheap_data = await self.namecheap.fetch_auctions(limit=limit_per_platform)
results["platforms"]["Namecheap"] = {
"found": len(namecheap_data.get("items", [])),
"total": namecheap_data.get("total", 0),
}
results["items"].extend(namecheap_data.get("items", []))
results["total_found"] += len(namecheap_data.get("items", []))
namecheap_count = len(namecheap_data.get("items", []))
if namecheap_count > 0:
results["platforms"]["Namecheap"] = {
"found": namecheap_count,
"total": namecheap_data.get("total", 0),
}
results["items"].extend(namecheap_data.get("items", []))
results["total_found"] += namecheap_count
if namecheap_data.get("error"):
results["errors"].append(f"Namecheap: {namecheap_data['error']}")
results["errors"].append(f"Namecheap: {namecheap_data['error'][:100]}")
except Exception as e:
results["errors"].append(f"Namecheap: {str(e)}")
results["errors"].append(f"Namecheap: {str(e)[:100]}")
return results
@ -1165,5 +1436,6 @@ godaddy_scraper = GoDaddyApiScraper()
godaddy_rss_scraper = GoDaddyRssScraper() # RSS fallback (always works!)
parkio_scraper = ParkIoApiScraper()
namejet_scraper = NameJetApiScraper()
snapnames_scraper = SnapNamesApiScraper() # NEW
hidden_api_scraper = HiddenApiScraperService()

View File

@ -0,0 +1,85 @@
#!/usr/bin/env python3
"""
Test Namecheap GraphQL API to find the query hash.
"""
import asyncio
import httpx
import json
import re
async def test_namecheap():
"""
Test Namecheap GraphQL API.
The API requires a query hash that must be extracted from the website.
"""
async with httpx.AsyncClient(timeout=30.0) as client:
# First, load the Marketplace page to find the hash
print("🔍 Fetching Namecheap Marketplace page...")
response = await client.get(
"https://www.namecheap.com/market/",
headers={
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36",
"Accept": "text/html,application/xhtml+xml",
}
)
if response.status_code == 200:
html = response.text
# Look for query hash patterns
hash_patterns = [
r'"queryHash":"([a-f0-9]+)"',
r'"hash":"([a-f0-9]{32,})"',
r'aftermarketapi.*?([a-f0-9]{32,})',
r'"persistedQueryHash":"([a-f0-9]+)"',
]
found_hashes = set()
for pattern in hash_patterns:
matches = re.findall(pattern, html, re.IGNORECASE)
for m in matches:
if len(m) >= 32:
found_hashes.add(m)
if found_hashes:
print(f"✅ Found {len(found_hashes)} potential hashes:")
for h in list(found_hashes)[:5]:
print(f" {h[:50]}...")
else:
print("❌ No hashes found in HTML")
# Check for NEXT_DATA
if "__NEXT_DATA__" in html:
print("📦 Found __NEXT_DATA__ - Next.js app")
match = re.search(r'<script id="__NEXT_DATA__"[^>]*>(.*?)</script>', html, re.DOTALL)
if match:
try:
data = json.loads(match.group(1))
print(f" Keys: {list(data.keys())[:5]}")
except:
pass
print(f"📄 Page status: {response.status_code}")
print(f"📄 Page size: {len(html)} bytes")
# Try a different approach - use their search API
print("\n🔍 Trying Namecheap search endpoint...")
search_response = await client.get(
"https://www.namecheap.com/market/search/",
params={"q": "tech"},
headers={
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36",
"Accept": "application/json, text/html",
"X-Requested-With": "XMLHttpRequest",
}
)
print(f" Search status: {search_response.status_code}")
else:
print(f"❌ Failed: {response.status_code}")
if __name__ == "__main__":
asyncio.run(test_namecheap())