pounce/backend/app/services/hidden_api_scrapers.py
Yves Gugger 42fc4fec52
feat: Enhanced auction scrapers with multiple sources
- Add GoDaddy RSS Feed scraper (bypasses Cloudflare)
- Enhanced ExpiredDomains scraper (multiple pages, TLDs)
- Improved hidden API scrapers integration
- Add automated scraper cron script (runs every 30 min)
- Playwright stealth mode installed on server

Sources now working:
- Dynadot REST API: ~100 auctions
- GoDaddy RSS: ~100 auctions
- ExpiredDomains: ~250 auctions

Total: 467 auctions in database
2025-12-11 20:58:04 +01:00


"""
Hidden JSON API Scrapers for Domain Auction Platforms.
These scrapers use undocumented but public JSON endpoints that are
much more reliable than HTML scraping.
Discovered Endpoints (December 2025):
- Namecheap: GraphQL API at aftermarketapi.namecheap.com
- Dynadot: REST API at dynadot-vue-api
- Sav.com: AJAX endpoint for auction listings
"""
import logging
from datetime import datetime, timedelta
from typing import Dict, Any, List, Optional
import httpx
logger = logging.getLogger(__name__)
# ═══════════════════════════════════════════════════════════════════════════════
# AFFILIATE LINKS — Monetization through referral commissions
# ═══════════════════════════════════════════════════════════════════════════════
AFFILIATE_CONFIG = {
"Namecheap": {
"base_url": "https://www.namecheap.com/market/",
"affiliate_param": "aff=pounce", # TODO: Replace with actual affiliate ID
"auction_url_template": "https://www.namecheap.com/market/domain/{domain}?aff=pounce",
},
"Dynadot": {
"base_url": "https://www.dynadot.com/market/",
"affiliate_param": "affiliate_id=pounce", # TODO: Replace with actual affiliate ID
"auction_url_template": "https://www.dynadot.com/market/auction/{domain}?affiliate_id=pounce",
},
"Sav": {
"base_url": "https://www.sav.com/auctions",
"affiliate_param": "ref=pounce", # TODO: Replace with actual affiliate ID
"auction_url_template": "https://www.sav.com/domain/{domain}?ref=pounce",
},
"GoDaddy": {
"base_url": "https://auctions.godaddy.com/",
"affiliate_param": "isc=cjcpounce", # TODO: Replace with actual CJ affiliate ID
"auction_url_template": "https://auctions.godaddy.com/trpItemListing.aspx?domain={domain}&isc=cjcpounce",
},
"DropCatch": {
"base_url": "https://www.dropcatch.com/",
"affiliate_param": None, # No affiliate program
"auction_url_template": "https://www.dropcatch.com/domain/{domain}",
},
"Sedo": {
"base_url": "https://sedo.com/",
"affiliate_param": "partnerid=pounce", # TODO: Replace with actual partner ID
"auction_url_template": "https://sedo.com/search/details/?domain={domain}&partnerid=pounce",
},
"NameJet": {
"base_url": "https://www.namejet.com/",
"affiliate_param": None, # No public affiliate program
"auction_url_template": "https://www.namejet.com/pages/Auctions/ViewAuctions.aspx?domain={domain}",
},
"ExpiredDomains": {
"base_url": "https://www.expireddomains.net/",
"affiliate_param": None, # Aggregator, links to actual registrars
"auction_url_template": "https://www.expireddomains.net/domain-name-search/?q={domain}",
},
}
def build_affiliate_url(platform: str, domain: str, original_url: Optional[str] = None) -> str:
"""
Build an affiliate URL for a given platform and domain.
If the platform has an affiliate program, the URL will include
the affiliate tracking parameter. Otherwise, returns the original URL.
"""
config = AFFILIATE_CONFIG.get(platform, {})
if config.get("auction_url_template"):
return config["auction_url_template"].format(domain=domain)
return original_url or f"https://www.google.com/search?q={domain}+auction"
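# Usage sketch (illustrative only, derived from the AFFILIATE_CONFIG entries above):
#
#   build_affiliate_url("Dynadot", "example.com")
#   # -> "https://www.dynadot.com/market/auction/example.com?affiliate_id=pounce"
#   build_affiliate_url("UnknownPlatform", "example.com")
#   # -> "https://www.google.com/search?q=example.com+auction"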
# ═══════════════════════════════════════════════════════════════════════════════
# NAMECHEAP SCRAPER — GraphQL API
# ═══════════════════════════════════════════════════════════════════════════════
class NamecheapApiScraper:
"""
Scraper for Namecheap Marketplace using their hidden GraphQL API.
Endpoint: https://aftermarketapi.namecheap.com/client/graphql
This is the endpoint their own frontend uses; the schema is unofficial and
may require a query hash (see the unified service notes below), so treat
this scraper as experimental.
"""
GRAPHQL_ENDPOINT = "https://aftermarketapi.namecheap.com/client/graphql"
# GraphQL query for fetching auctions
AUCTIONS_QUERY = """
query GetAuctions($filter: AuctionFilterInput, $pagination: PaginationInput, $sort: SortInput) {
auctions(filter: $filter, pagination: $pagination, sort: $sort) {
items {
id
domain
currentBid
minBid
bidCount
endTime
status
buyNowPrice
hasBuyNow
}
totalCount
pageInfo {
hasNextPage
endCursor
}
}
}
"""
async def fetch_auctions(
self,
limit: int = 100,
offset: int = 0,
keyword: Optional[str] = None,
tld: Optional[str] = None,
) -> Dict[str, Any]:
"""Fetch auctions from Namecheap GraphQL API."""
try:
async with httpx.AsyncClient(timeout=30.0) as client:
# Build filter
filter_input = {}
if keyword:
filter_input["searchTerm"] = keyword
if tld:
filter_input["tld"] = tld.lstrip(".")
variables = {
"filter": filter_input,
"pagination": {"limit": limit, "offset": offset},
"sort": {"field": "endTime", "direction": "ASC"},
}
response = await client.post(
self.GRAPHQL_ENDPOINT,
json={
"query": self.AUCTIONS_QUERY,
"variables": variables,
},
headers={
"Content-Type": "application/json",
"Accept": "application/json",
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36",
"Origin": "https://www.namecheap.com",
"Referer": "https://www.namecheap.com/market/",
},
)
if response.status_code != 200:
logger.error(f"Namecheap API error: {response.status_code}")
return {"items": [], "total": 0, "error": response.text}
data = response.json()
if "errors" in data:
logger.error(f"Namecheap GraphQL errors: {data['errors']}")
return {"items": [], "total": 0, "error": str(data["errors"])}
auctions_data = data.get("data", {}).get("auctions", {})
items = auctions_data.get("items", [])
# Transform to Pounce format
transformed = []
for item in items:
domain = item.get("domain", "")
tld_part = domain.rsplit(".", 1)[-1] if "." in domain else ""
transformed.append({
"domain": domain,
"tld": tld_part,
"platform": "Namecheap",
"current_bid": float(item.get("currentBid", 0)),
"min_bid": float(item.get("minBid", 0)),
"num_bids": int(item.get("bidCount", 0)),
"end_time": item.get("endTime"),
"buy_now_price": float(item.get("buyNowPrice")) if item.get("hasBuyNow") else None,
"auction_url": build_affiliate_url("Namecheap", domain),
"currency": "USD",
"is_active": True,
})
return {
"items": transformed,
"total": auctions_data.get("totalCount", 0),
"has_more": auctions_data.get("pageInfo", {}).get("hasNextPage", False),
}
except Exception as e:
logger.exception(f"Namecheap API scraper error: {e}")
return {"items": [], "total": 0, "error": str(e)}
# ═══════════════════════════════════════════════════════════════════════════════
# DYNADOT SCRAPER — REST JSON API
# ═══════════════════════════════════════════════════════════════════════════════
class DynadotApiScraper:
"""
Scraper for Dynadot Marketplace using their hidden JSON API.
Endpoints:
- /dynadot-vue-api/dynadot-service/marketplace-api
- /dynadot-vue-api/dynadot-service/main-site-api
Supports:
- EXPIRED_AUCTION: Expired auctions
- BACKORDER: Backorder listings
- USER_LISTING: User marketplace listings
"""
BASE_URL = "https://www.dynadot.com"
MARKETPLACE_API = "/dynadot-vue-api/dynadot-service/marketplace-api"
async def fetch_auctions(
self,
aftermarket_type: str = "EXPIRED_AUCTION",
page_size: int = 100,
page_index: int = 0,
keyword: Optional[str] = None,
) -> Dict[str, Any]:
"""Fetch auctions from Dynadot REST API."""
try:
async with httpx.AsyncClient(timeout=30.0) as client:
params = {
"command": "get_list",
"aftermarket_type": aftermarket_type,
"page_size": page_size,
"page_index": page_index,
"lang": "en",
}
if keyword:
params["keyword"] = keyword
response = await client.post(
f"{self.BASE_URL}{self.MARKETPLACE_API}",
params=params,
headers={
"Accept": "application/json",
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36",
"Referer": "https://www.dynadot.com/market",
},
)
if response.status_code != 200:
logger.error(f"Dynadot API error: {response.status_code}")
return {"items": [], "total": 0, "error": response.text}
data = response.json()
# Dynadot returns code: 200 for success
if data.get("code") not in [0, 200] and data.get("msg") != "success":
logger.error(f"Dynadot API error: {data}")
return {"items": [], "total": 0, "error": str(data)}
# Data can be in 'records' or 'list'
listings = data.get("data", {}).get("records", []) or data.get("data", {}).get("list", [])
# Transform to Pounce format
transformed = []
for item in listings:
domain = item.get("domain", "") or item.get("name", "") or item.get("utf8_name", "")
tld_part = domain.rsplit(".", 1)[-1] if "." in domain else ""
# Parse end time (Dynadot uses timestamp in milliseconds or string)
end_time = None
end_time_stamp = item.get("end_time_stamp")
if end_time_stamp:
try:
end_time = datetime.fromtimestamp(end_time_stamp / 1000)
except Exception:
pass
if not end_time:
end_time_str = item.get("end_time") or item.get("auction_end_time")
if end_time_str:
try:
# Format: "2025/12/12 08:00 PST"
end_time = datetime.strptime(end_time_str.split(" PST")[0], "%Y/%m/%d %H:%M")
except Exception:
end_time = datetime.utcnow() + timedelta(days=1)
# Parse bid price (can be string or number)
bid_price = item.get("bid_price") or item.get("current_bid") or item.get("price") or 0
if isinstance(bid_price, str):
bid_price = float(bid_price.replace(",", "").replace("$", ""))
transformed.append({
"domain": domain,
"tld": tld_part,
"platform": "Dynadot",
"current_bid": float(bid_price),
"min_bid": float(item.get("start_price", 0) or 0),
"num_bids": int(item.get("bids", 0) or item.get("bid_count", 0) or 0),
"end_time": end_time or datetime.utcnow() + timedelta(days=1),
"buy_now_price": float(item.get("accepted_bid_price")) if item.get("accepted_bid_price") else None,
"auction_url": build_affiliate_url("Dynadot", domain),
"currency": item.get("bid_price_currency", "USD"),
"is_active": True,
# Map to existing DomainAuction fields
"backlinks": int(item.get("links", 0) or 0),
"age_years": int(item.get("age", 0) or 0),
})
return {
"items": transformed,
"total": data.get("data", {}).get("total_count", len(transformed)),
"has_more": len(listings) >= page_size,
}
except Exception as e:
logger.exception(f"Dynadot API scraper error: {e}")
return {"items": [], "total": 0, "error": str(e)}
# ═══════════════════════════════════════════════════════════════════════════════
# SAV.COM SCRAPER — AJAX JSON API
# ═══════════════════════════════════════════════════════════════════════════════
class SavApiScraper:
"""
Scraper for Sav.com Auctions using their hidden AJAX endpoint.
Endpoint: /auctions/load_domains_ajax/{page}
Simple POST request that returns paginated auction data.
"""
BASE_URL = "https://www.sav.com"
AJAX_ENDPOINT = "/auctions/load_domains_ajax"
async def fetch_auctions(
self,
page: int = 0,
) -> Dict[str, Any]:
"""Fetch auctions from Sav.com AJAX API."""
try:
async with httpx.AsyncClient(timeout=30.0) as client:
response = await client.post(
f"{self.BASE_URL}{self.AJAX_ENDPOINT}/{page}",
headers={
"Accept": "application/json, text/html",
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36",
"Referer": "https://www.sav.com/domains/auctions",
"X-Requested-With": "XMLHttpRequest",
},
)
if response.status_code != 200:
logger.error(f"Sav API error: {response.status_code}")
return {"items": [], "total": 0, "error": response.text}
# The endpoint may return JSON or an HTML fragment - check the content type
content_type = response.headers.get("content-type", "")
if "application/json" in content_type:
data = response.json()
else:
# HTML response - fall back to BeautifulSoup parsing
logger.warning("Sav returned HTML instead of JSON, parsing...")
return await self._parse_html_response(response.text)
listings = data.get("domains", data.get("auctions", []))
# Transform to Pounce format
transformed = []
for item in listings:
domain = item.get("domain", "") or item.get("name", "")
tld_part = domain.rsplit(".", 1)[-1] if "." in domain else ""
# Parse end time
end_time_str = item.get("end_time") or item.get("ends_at")
end_time = None
if end_time_str:
try:
end_time = datetime.fromisoformat(end_time_str.replace("Z", "+00:00"))
except Exception:
end_time = datetime.utcnow() + timedelta(days=1)
transformed.append({
"domain": domain,
"tld": tld_part,
"platform": "Sav",
"current_bid": float(item.get("current_bid", 0) or item.get("price", 0)),
"min_bid": float(item.get("min_bid", 0) or 0),
"num_bids": int(item.get("bids", 0) or 0),
"end_time": end_time,
"buy_now_price": float(item.get("buy_now")) if item.get("buy_now") else None,
"auction_url": build_affiliate_url("Sav", domain),
"currency": "USD",
"is_active": True,
})
return {
"items": transformed,
"total": len(transformed),
"has_more": len(listings) >= 20, # Default page size
}
except Exception as e:
logger.exception(f"Sav API scraper error: {e}")
return {"items": [], "total": 0, "error": str(e)}
async def _parse_html_response(self, html: str) -> Dict[str, Any]:
"""Parse HTML response from Sav.com when JSON is not available."""
try:
from bs4 import BeautifulSoup
soup = BeautifulSoup(html, "html.parser")
# Find auction rows
rows = soup.select(".auction-row, .domain-row, tr[data-domain]")
transformed = []
for row in rows:
domain_el = row.select_one(".domain-name, .name, [data-domain]")
price_el = row.select_one(".price, .bid, .current-bid")
time_el = row.select_one(".time-left, .ends, .countdown")
bids_el = row.select_one(".bids, .bid-count")
if not domain_el:
continue
domain = domain_el.get_text(strip=True) or domain_el.get("data-domain", "")
tld_part = domain.rsplit(".", 1)[-1] if "." in domain else ""
price_text = price_el.get_text(strip=True) if price_el else "0"
price = float("".join(c for c in price_text if c.isdigit() or c == ".") or "0")
bids_text = bids_el.get_text(strip=True) if bids_el else "0"
bids = int("".join(c for c in bids_text if c.isdigit()) or "0")
transformed.append({
"domain": domain,
"tld": tld_part,
"platform": "Sav",
"current_bid": price,
"min_bid": 0,
"num_bids": bids,
"end_time": datetime.utcnow() + timedelta(days=1), # Estimate
"buy_now_price": None,
"auction_url": build_affiliate_url("Sav", domain),
"currency": "USD",
"is_active": True,
})
return {
"items": transformed,
"total": len(transformed),
"has_more": len(rows) >= 20,
}
except Exception as e:
logger.exception(f"Sav HTML parsing error: {e}")
return {"items": [], "total": 0, "error": str(e)}
# ═══════════════════════════════════════════════════════════════════════════════
# GODADDY SCRAPER — Hidden REST JSON API
# ═══════════════════════════════════════════════════════════════════════════════
class GoDaddyApiScraper:
"""
Scraper for GoDaddy Auctions using their hidden JSON API.
Discovered Endpoint:
https://auctions.godaddy.com/beta/findApiProxy/v4/aftermarket/find/auction/recommend
Parameters:
- paginationSize: number of results (max 150)
- paginationStart: offset
- sortBy: auctionBids:desc, auctionValuationPrice:desc, endingAt:asc
- endTimeAfter: ISO timestamp
- typeIncludeList: 14,16,38 (auction types)
"""
BASE_URL = "https://auctions.godaddy.com"
API_ENDPOINT = "/beta/findApiProxy/v4/aftermarket/find/auction/recommend"
async def fetch_auctions(
self,
limit: int = 100,
offset: int = 0,
sort_by: str = "auctionBids:desc",
ending_within_hours: Optional[int] = None,
) -> Dict[str, Any]:
"""Fetch auctions from GoDaddy hidden JSON API."""
try:
async with httpx.AsyncClient(timeout=30.0) as client:
params = {
"paginationSize": min(limit, 150),
"paginationStart": offset,
"sortBy": sort_by,
"typeIncludeList": "14,16,38", # All auction types
"endTimeAfter": datetime.utcnow().isoformat() + "Z",
}
if ending_within_hours:
end_before = (datetime.utcnow() + timedelta(hours=ending_within_hours)).isoformat() + "Z"
params["endTimeBefore"] = end_before
response = await client.get(
f"{self.BASE_URL}{self.API_ENDPOINT}",
params=params,
headers={
"Accept": "application/json",
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36",
"Referer": "https://auctions.godaddy.com/beta",
},
)
if response.status_code != 200:
logger.error(f"GoDaddy API error: {response.status_code}")
return {"items": [], "total": 0, "error": response.text}
data = response.json()
# GoDaddy returns listings in 'results' array
listings = data.get("results", [])
# Transform to Pounce format
transformed = []
for item in listings:
domain = item.get("fqdn", "") or item.get("domain", "")
tld_part = domain.rsplit(".", 1)[-1] if "." in domain else ""
# Parse end time
end_time = None
end_at = item.get("endingAt") or item.get("auctionEndTime")
if end_at:
try:
end_time = datetime.fromisoformat(end_at.replace("Z", "+00:00")).replace(tzinfo=None)
except Exception:
pass
# Parse price (can be in different fields)
price = (
item.get("price") or
item.get("currentBidPrice") or
item.get("auctionPrice") or
item.get("minBid") or 0
)
transformed.append({
"domain": domain,
"tld": tld_part,
"platform": "GoDaddy",
"current_bid": float(price) if price else 0,
"min_bid": float(item.get("minBid", 0) or 0),
"num_bids": int(item.get("bids", 0) or item.get("bidCount", 0) or 0),
"end_time": end_time or datetime.utcnow() + timedelta(days=1),
"buy_now_price": float(item.get("buyNowPrice")) if item.get("buyNowPrice") else None,
"auction_url": build_affiliate_url("GoDaddy", domain),
"currency": "USD",
"is_active": True,
"traffic": int(item.get("traffic", 0) or 0),
"domain_authority": int(item.get("domainAuthority", 0) or item.get("valuationPrice", 0) or 0),
})
return {
"items": transformed,
"total": data.get("totalRecordCount", len(transformed)),
"has_more": len(listings) >= limit,
}
except Exception as e:
logger.exception(f"GoDaddy API scraper error: {e}")
return {"items": [], "total": 0, "error": str(e)}
# ═══════════════════════════════════════════════════════════════════════════════
# GODADDY RSS SCRAPER — Public RSS Feed (NO Cloudflare!)
# ═══════════════════════════════════════════════════════════════════════════════
class GoDaddyRssScraper:
"""
Scraper for GoDaddy Auctions using their PUBLIC RSS feeds.
These RSS feeds are not behind Cloudflare, so they can be fetched reliably
without browser automation.
Feeds:
- https://auctions.godaddy.com/rss/ending.aspx (Ending Soon)
- https://auctions.godaddy.com/rss/new.aspx (New Auctions)
- https://auctions.godaddy.com/rss/closeouts.aspx (Closeouts)
"""
RSS_FEEDS = {
"ending": "https://auctions.godaddy.com/rss/ending.aspx",
"new": "https://auctions.godaddy.com/rss/new.aspx",
"closeouts": "https://auctions.godaddy.com/rss/closeouts.aspx",
}
async def fetch_auctions(
self,
feed_type: str = "ending", # "ending", "new", or "closeouts"
limit: int = 100,
) -> Dict[str, Any]:
"""Fetch auctions from GoDaddy RSS feeds."""
try:
import xml.etree.ElementTree as ET
feed_url = self.RSS_FEEDS.get(feed_type, self.RSS_FEEDS["ending"])
async with httpx.AsyncClient(timeout=30.0) as client:
response = await client.get(
feed_url,
headers={
"Accept": "application/rss+xml, application/xml, text/xml",
"User-Agent": "Mozilla/5.0 (compatible; PounceBot/1.0; +https://pounce.ch)",
},
)
if response.status_code != 200:
logger.error(f"GoDaddy RSS error: {response.status_code}")
return {"items": [], "total": 0, "error": f"HTTP {response.status_code}"}
# Parse RSS XML
root = ET.fromstring(response.text)
# Find all items in the RSS feed
items = root.findall(".//item")
transformed = []
for item in items[:limit]:
try:
title = item.find("title").text if item.find("title") is not None else ""
link = item.find("link").text if item.find("link") is not None else ""
description = item.find("description").text if item.find("description") is not None else ""
# Extract domain from title (format: "domain.com - $XX")
domain = ""
price = 0
if title:
# Title format: "example.com - $12" or "example.com"
parts = title.split(" - ")
domain = parts[0].strip().lower()
if len(parts) > 1:
price_str = parts[1].replace("$", "").replace(",", "").strip()
try:
price = float(price_str)
except Exception:
pass
# Try to extract price from description if not in title
if price == 0 and description:
import re
price_match = re.search(r'\$([0-9,]+(?:\.[0-9]+)?)', description)
if price_match:
price = float(price_match.group(1).replace(",", ""))
if not domain or "." not in domain:
continue
tld = domain.rsplit(".", 1)[-1]
# Add affiliate param to link
affiliate_url = link
if link and "?" in link:
affiliate_url = f"{link}&isc=cjcpounce"
elif link:
affiliate_url = f"{link}?isc=cjcpounce"
else:
affiliate_url = build_affiliate_url("GoDaddy", domain)
transformed.append({
"domain": domain,
"tld": tld,
"platform": "GoDaddy",
"current_bid": price,
"min_bid": price,
"num_bids": 0, # RSS doesn't provide bid count
"end_time": datetime.utcnow() + timedelta(hours=24), # Estimate
"buy_now_price": None,
"auction_url": affiliate_url,
"currency": "USD",
"is_active": True,
"source": f"RSS-{feed_type}",
})
except Exception as e:
logger.warning(f"Error parsing GoDaddy RSS item: {e}")
continue
logger.info(f"GoDaddy RSS ({feed_type}): Found {len(transformed)} auctions")
return {
"items": transformed,
"total": len(transformed),
"has_more": False,
}
except Exception as e:
logger.exception(f"GoDaddy RSS scraper error: {e}")
return {"items": [], "total": 0, "error": str(e)}
async def fetch_all_feeds(self) -> Dict[str, Any]:
"""Fetch from all GoDaddy RSS feeds."""
all_items = []
errors = []
for feed_type in ["ending", "new", "closeouts"]:
result = await self.fetch_auctions(feed_type=feed_type, limit=50)
all_items.extend(result.get("items", []))
if result.get("error"):
errors.append(f"{feed_type}: {result['error']}")
# Dedupe by domain
seen = set()
unique_items = []
for item in all_items:
if item["domain"] not in seen:
seen.add(item["domain"])
unique_items.append(item)
return {
"items": unique_items,
"total": len(unique_items),
"errors": errors if errors else None,
}
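# Hedged usage sketch: the RSS path needs no Cloudflare bypass, so it can be
# exercised standalone as a quick sanity check:
#
#   import asyncio
#   feeds = asyncio.run(GoDaddyRssScraper().fetch_all_feeds())
#   print(feeds["total"], "unique domains across ending/new/closeouts feeds")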
# ═══════════════════════════════════════════════════════════════════════════════
# PARK.IO SCRAPER — Backorder Service API
# ═══════════════════════════════════════════════════════════════════════════════
class ParkIoApiScraper:
"""
Scraper for Park.io domain backorders.
Park.io specializes in catching expiring domains - great for drops!
Endpoint: https://park.io/api/domains
"""
BASE_URL = "https://park.io"
API_ENDPOINT = "/api/domains"
async def fetch_pending_drops(
self,
limit: int = 100,
tld: Optional[str] = None,
) -> Dict[str, Any]:
"""Fetch pending domain drops from Park.io."""
try:
async with httpx.AsyncClient(timeout=30.0) as client:
params = {
"limit": limit,
"status": "pending", # Pending drops
}
if tld:
params["tld"] = tld.lstrip(".")
response = await client.get(
f"{self.BASE_URL}{self.API_ENDPOINT}",
params=params,
headers={
"Accept": "application/json",
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36",
},
)
if response.status_code != 200:
logger.error(f"Park.io API error: {response.status_code}")
return {"items": [], "total": 0, "error": response.text}
data = response.json()
domains = data.get("domains", []) if isinstance(data, dict) else data
# Transform to Pounce format
transformed = []
for item in domains:
domain = item.get("domain", "") or item.get("name", "")
tld_part = domain.rsplit(".", 1)[-1] if "." in domain else ""
# Parse drop date
drop_date = None
drop_at = item.get("drop_date") or item.get("expires_at")
if drop_at:
try:
drop_date = datetime.fromisoformat(drop_at.replace("Z", "+00:00")).replace(tzinfo=None)
except Exception:
drop_date = datetime.utcnow() + timedelta(days=1)
transformed.append({
"domain": domain,
"tld": tld_part,
"platform": "Park.io",
"current_bid": float(item.get("price", 99)), # Park.io default price
"min_bid": float(item.get("min_price", 99)),
"num_bids": int(item.get("backorders", 0) or 0), # Number of backorders
"end_time": drop_date or datetime.utcnow() + timedelta(days=1),
"buy_now_price": None, # Backorder, not auction
"auction_url": f"https://park.io/domains/{domain}",
"auction_type": "backorder",
"currency": "USD",
"is_active": True,
})
return {
"items": transformed,
"total": len(transformed),
"has_more": len(domains) >= limit,
}
except Exception as e:
logger.exception(f"Park.io API scraper error: {e}")
return {"items": [], "total": 0, "error": str(e)}
# ═══════════════════════════════════════════════════════════════════════════════
# NAMEJET SCRAPER — Hidden AJAX API
# ═══════════════════════════════════════════════════════════════════════════════
class NameJetApiScraper:
"""
Scraper for NameJet auctions using their AJAX endpoint.
NameJet is owned by GoDaddy but operates independently.
Uses a hidden AJAX endpoint for loading auction data.
"""
BASE_URL = "https://www.namejet.com"
AJAX_ENDPOINT = "/PreRelease/Auctions/LoadPage"
async def fetch_auctions(
self,
limit: int = 100,
page: int = 1,
sort_by: str = "EndTime",
) -> Dict[str, Any]:
"""Fetch auctions from NameJet AJAX API."""
try:
async with httpx.AsyncClient(timeout=30.0) as client:
# NameJet uses POST with form data
form_data = {
"page": page,
"rows": limit,
"sidx": sort_by,
"sord": "asc",
}
response = await client.post(
f"{self.BASE_URL}{self.AJAX_ENDPOINT}",
data=form_data,
headers={
"Accept": "application/json",
"Content-Type": "application/x-www-form-urlencoded",
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36",
"Referer": "https://www.namejet.com/PreRelease/Auctions",
"X-Requested-With": "XMLHttpRequest",
},
)
if response.status_code != 200:
logger.error(f"NameJet API error: {response.status_code}")
return {"items": [], "total": 0, "error": response.text}
# Try JSON first, fall back to HTML parsing
try:
data = response.json()
except Exception:
return await self._parse_html_response(response.text)
# NameJet returns 'rows' array with auction data
rows = data.get("rows", [])
# Transform to Pounce format
transformed = []
for item in rows:
# NameJet format: item.cell contains [domain, endTime, price, bids, ...]
cell = item.get("cell", [])
if len(cell) < 4:
continue
domain = cell[0] if isinstance(cell[0], str) else cell[0].get("domain", "")
tld_part = domain.rsplit(".", 1)[-1] if "." in domain else ""
# Parse end time
end_time = None
if len(cell) > 1 and cell[1]:
try:
end_time = datetime.strptime(cell[1], "%m/%d/%Y %H:%M:%S")
except Exception:
try:
end_time = datetime.strptime(cell[1], "%Y-%m-%d %H:%M")
except Exception:
pass
# Parse price
price = 0
if len(cell) > 2:
price_str = str(cell[2]).replace("$", "").replace(",", "")
try:
price = float(price_str)
except Exception:
pass
# Parse bids
bids = 0
if len(cell) > 3:
try:
bids = int(cell[3])
except Exception:
pass
transformed.append({
"domain": domain,
"tld": tld_part,
"platform": "NameJet",
"current_bid": price,
"min_bid": 0,
"num_bids": bids,
"end_time": end_time or datetime.utcnow() + timedelta(days=1),
"buy_now_price": None,
"auction_url": build_affiliate_url("NameJet", domain),
"currency": "USD",
"is_active": True,
})
return {
"items": transformed,
"total": data.get("records", len(transformed)),
"has_more": len(rows) >= limit,
}
except Exception as e:
logger.exception(f"NameJet API scraper error: {e}")
return {"items": [], "total": 0, "error": str(e)}
async def _parse_html_response(self, html: str) -> Dict[str, Any]:
"""Parse HTML response from NameJet when JSON is not available."""
try:
from bs4 import BeautifulSoup
soup = BeautifulSoup(html, "html.parser")
rows = soup.select("tr[data-domain], .auction-row")
transformed = []
for row in rows:
domain_el = row.select_one("td:first-child, .domain")
if not domain_el:
continue
domain = domain_el.get_text(strip=True)
tld_part = domain.rsplit(".", 1)[-1] if "." in domain else ""
transformed.append({
"domain": domain,
"tld": tld_part,
"platform": "NameJet",
"current_bid": 0,
"min_bid": 0,
"num_bids": 0,
"end_time": datetime.utcnow() + timedelta(days=1),
"buy_now_price": None,
"auction_url": build_affiliate_url("NameJet", domain),
"currency": "USD",
"is_active": True,
})
return {
"items": transformed,
"total": len(transformed),
"has_more": False,
}
except Exception as e:
logger.exception(f"NameJet HTML parsing error: {e}")
return {"items": [], "total": 0, "error": str(e)}
# ═══════════════════════════════════════════════════════════════════════════════
# UNIFIED SCRAPER — Combines all hidden API scrapers
# ═══════════════════════════════════════════════════════════════════════════════
class HiddenApiScraperService:
"""
Unified service that combines all hidden API scrapers.
Priority order:
1. GoDaddy JSON API (most reliable, 150 auctions/request)
2. Dynadot REST API (100 auctions/request)
3. NameJet AJAX (requires parsing)
4. Park.io (backorders)
5. Namecheap GraphQL (requires query hash - may fail)
6. Sav.com AJAX (HTML fallback)
All URLs include affiliate tracking for monetization.
"""
def __init__(self):
self.namecheap = NamecheapApiScraper()
self.dynadot = DynadotApiScraper()
self.sav = SavApiScraper()
self.godaddy = GoDaddyApiScraper()
self.godaddy_rss = GoDaddyRssScraper() # RSS fallback (NO Cloudflare!)
self.parkio = ParkIoApiScraper()
self.namejet = NameJetApiScraper()
async def scrape_all(self, limit_per_platform: int = 100) -> Dict[str, Any]:
"""
Scrape all platforms using hidden APIs.
Returns combined results with platform breakdown.
"""
results = {
"total_found": 0,
"platforms": {},
"errors": [],
"items": [],
}
# ═══════════════════════════════════════════════════════════
# TIER 0: RSS Feeds (Most Reliable - NO Cloudflare!)
# ═══════════════════════════════════════════════════════════
# Scrape GoDaddy RSS (Always works!)
try:
rss_data = await self.godaddy_rss.fetch_all_feeds()
rss_count = len(rss_data.get("items", []))
if rss_count > 0:
results["platforms"]["GoDaddy-RSS"] = {
"found": rss_count,
"total": rss_count,
}
results["items"].extend(rss_data.get("items", []))
results["total_found"] += rss_count
logger.info(f"✅ GoDaddy RSS: {rss_count} auctions")
except Exception as e:
results["errors"].append(f"GoDaddy-RSS: {str(e)}")
# ═══════════════════════════════════════════════════════════
# TIER 1: Most Reliable JSON APIs
# ═══════════════════════════════════════════════════════════
# Scrape GoDaddy JSON API (may have Cloudflare issues)
try:
godaddy_data = await self.godaddy.fetch_auctions(limit=limit_per_platform)
godaddy_count = len(godaddy_data.get("items", []))
if godaddy_count > 0:
results["platforms"]["GoDaddy-API"] = {
"found": godaddy_count,
"total": godaddy_data.get("total", 0),
}
results["items"].extend(godaddy_data.get("items", []))
results["total_found"] += godaddy_count
if godaddy_data.get("error"):
results["errors"].append(f"GoDaddy-API: {godaddy_data['error'][:100]}")
except Exception as e:
results["errors"].append(f"GoDaddy-API: {str(e)[:100]}")
# Scrape Dynadot
try:
dynadot_data = await self.dynadot.fetch_auctions(page_size=limit_per_platform)
results["platforms"]["Dynadot"] = {
"found": len(dynadot_data.get("items", [])),
"total": dynadot_data.get("total", 0),
}
results["items"].extend(dynadot_data.get("items", []))
results["total_found"] += len(dynadot_data.get("items", []))
if dynadot_data.get("error"):
results["errors"].append(f"Dynadot: {dynadot_data['error']}")
except Exception as e:
results["errors"].append(f"Dynadot: {str(e)}")
# ═══════════════════════════════════════════════════════════
# TIER 2: AJAX/HTML Scrapers
# ═══════════════════════════════════════════════════════════
# Scrape NameJet (NEW)
try:
namejet_data = await self.namejet.fetch_auctions(limit=limit_per_platform)
results["platforms"]["NameJet"] = {
"found": len(namejet_data.get("items", [])),
"total": namejet_data.get("total", 0),
}
results["items"].extend(namejet_data.get("items", []))
results["total_found"] += len(namejet_data.get("items", []))
if namejet_data.get("error"):
results["errors"].append(f"NameJet: {namejet_data['error']}")
except Exception as e:
results["errors"].append(f"NameJet: {str(e)}")
# Scrape Park.io (Backorders - NEW)
try:
parkio_data = await self.parkio.fetch_pending_drops(limit=limit_per_platform)
results["platforms"]["Park.io"] = {
"found": len(parkio_data.get("items", [])),
"total": parkio_data.get("total", 0),
}
results["items"].extend(parkio_data.get("items", []))
results["total_found"] += len(parkio_data.get("items", []))
if parkio_data.get("error"):
results["errors"].append(f"Park.io: {parkio_data['error']}")
except Exception as e:
results["errors"].append(f"Park.io: {str(e)}")
# Scrape Sav.com
try:
sav_data = await self.sav.fetch_auctions(page=0)
results["platforms"]["Sav"] = {
"found": len(sav_data.get("items", [])),
"total": sav_data.get("total", 0),
}
results["items"].extend(sav_data.get("items", []))
results["total_found"] += len(sav_data.get("items", []))
if sav_data.get("error"):
results["errors"].append(f"Sav: {sav_data['error']}")
except Exception as e:
results["errors"].append(f"Sav: {str(e)}")
# ═══════════════════════════════════════════════════════════
# TIER 3: Experimental (May require fixes)
# ═══════════════════════════════════════════════════════════
# Scrape Namecheap (GraphQL - needs query hash)
try:
namecheap_data = await self.namecheap.fetch_auctions(limit=limit_per_platform)
results["platforms"]["Namecheap"] = {
"found": len(namecheap_data.get("items", [])),
"total": namecheap_data.get("total", 0),
}
results["items"].extend(namecheap_data.get("items", []))
results["total_found"] += len(namecheap_data.get("items", []))
if namecheap_data.get("error"):
results["errors"].append(f"Namecheap: {namecheap_data['error']}")
except Exception as e:
results["errors"].append(f"Namecheap: {str(e)}")
return results
# Export instances
namecheap_scraper = NamecheapApiScraper()
dynadot_scraper = DynadotApiScraper()
sav_scraper = SavApiScraper()
godaddy_scraper = GoDaddyApiScraper()
godaddy_rss_scraper = GoDaddyRssScraper() # RSS fallback (always works!)
parkio_scraper = ParkIoApiScraper()
namejet_scraper = NameJetApiScraper()
hidden_api_scraper = HiddenApiScraperService()
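# Minimal manual test harness (a sketch, not used by the application): running
# this module directly scrapes all platforms once and prints the per-platform
# breakdown. Assumes the module is executed outside the FastAPI app context.
if __name__ == "__main__":
    import asyncio

    async def _main() -> None:
        results = await hidden_api_scraper.scrape_all(limit_per_platform=50)
        print(f"Total auctions found: {results['total_found']}")
        for platform, stats in results["platforms"].items():
            print(f"  {platform}: {stats['found']} found ({stats['total']} reported)")
        for err in results["errors"]:
            print(f"  error: {err}")

    asyncio.run(_main())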