feat: DropCatch & Sedo API Clients + MARKET_CONCEPT v2

- DropCatch API Client mit OAuth2 Authentifizierung
- Sedo API Client (bereit für Credentials)
- Tier 1 APIs → Tier 2 Scraping Fallback-Logik
- Admin Endpoints: /test-apis, /trigger-scrape, /scrape-status
- MARKET_CONCEPT.md komplett überarbeitet:
  - Realistische Bestandsaufnahme
  - 3-Säulen-Konzept (Auktionen, Pounce Direct, Drops)
  - API-Realität dokumentiert (DropCatch = nur eigene Aktivitäten)
  - Roadmap und nächste Schritte
This commit is contained in:
yves.gugger
2025-12-11 09:36:32 +01:00
parent 6a6e2460d5
commit e9f06d1cbf
6 changed files with 1425 additions and 1315 deletions

File diff suppressed because it is too large Load Diff

View File

@ -981,3 +981,126 @@ async def get_activity_log(
],
"total": total,
}
# ============== API Connection Tests ==============
@router.get("/test-apis")
async def test_external_apis(
admin: User = Depends(require_admin),
):
"""
Test connections to all external APIs.
Returns status of:
- DropCatch API
- Sedo API
- Moz API (if configured)
"""
from app.services.dropcatch_api import dropcatch_client
from app.services.sedo_api import sedo_client
results = {
"tested_at": datetime.utcnow().isoformat(),
"apis": {}
}
# Test DropCatch API
try:
dropcatch_result = await dropcatch_client.test_connection()
results["apis"]["dropcatch"] = dropcatch_result
except Exception as e:
results["apis"]["dropcatch"] = {
"success": False,
"error": str(e),
"configured": dropcatch_client.is_configured
}
# Test Sedo API
try:
sedo_result = await sedo_client.test_connection()
results["apis"]["sedo"] = sedo_result
except Exception as e:
results["apis"]["sedo"] = {
"success": False,
"error": str(e),
"configured": sedo_client.is_configured
}
# Summary
results["summary"] = {
"total": len(results["apis"]),
"configured": sum(1 for api in results["apis"].values() if api.get("configured")),
"connected": sum(1 for api in results["apis"].values() if api.get("success")),
}
return results
@router.post("/trigger-scrape")
async def trigger_auction_scrape(
background_tasks: BackgroundTasks,
db: Database,
admin: User = Depends(require_admin),
):
"""
Manually trigger auction scraping from all sources.
This will:
1. Try Tier 1 APIs (DropCatch, Sedo) first
2. Fall back to web scraping for others
"""
from app.services.auction_scraper import AuctionScraperService
scraper = AuctionScraperService()
# Run scraping in background
async def run_scrape():
async with db.begin():
return await scraper.scrape_all_platforms(db)
background_tasks.add_task(run_scrape)
return {
"message": "Auction scraping started in background",
"note": "Check /admin/scrape-status for results"
}
@router.get("/scrape-status")
async def get_scrape_status(
db: Database,
admin: User = Depends(require_admin),
limit: int = 10,
):
"""Get recent scrape logs."""
from app.models.auction import AuctionScrapeLog
query = (
select(AuctionScrapeLog)
.order_by(desc(AuctionScrapeLog.started_at))
.limit(limit)
)
try:
result = await db.execute(query)
logs = result.scalars().all()
except Exception:
return {"logs": [], "error": "Table not found"}
return {
"logs": [
{
"id": log.id,
"platform": log.platform,
"status": log.status,
"auctions_found": log.auctions_found,
"auctions_new": log.auctions_new,
"auctions_updated": log.auctions_updated,
"error_message": log.error_message,
"started_at": log.started_at.isoformat() if log.started_at else None,
"completed_at": log.completed_at.isoformat() if log.completed_at else None,
}
for log in logs
]
}

View File

@ -33,6 +33,27 @@ class Settings(BaseSettings):
check_minute: int = 0
scheduler_check_interval_hours: int = 24
# =================================
# External API Credentials
# =================================
# DropCatch API (Official Partner API)
# Docs: https://www.dropcatch.com/hiw/dropcatch-api
dropcatch_client_id: str = ""
dropcatch_client_secret: str = ""
dropcatch_api_base: str = "https://api.dropcatch.com"
# Sedo API (Partner API - XML-RPC)
# Docs: https://api.sedo.com/apidocs/v1/
# Find your credentials: Sedo.com → Mein Sedo → API-Zugang
sedo_partner_id: str = ""
sedo_sign_key: str = ""
sedo_api_base: str = "https://api.sedo.com/api/v1/"
# Moz API (SEO Data)
moz_access_id: str = ""
moz_secret_key: str = ""
class Config:
env_file = ".env"
env_file_encoding = "utf-8"

View File

@ -1,15 +1,18 @@
"""
Domain Auction Scraper Service
Scrapes real auction data from various platforms WITHOUT using their APIs.
Uses web scraping to get publicly available auction information.
Data Acquisition Strategy (from MARKET_CONCEPT.md):
Supported Platforms:
TIER 1: OFFICIAL APIs (Most Reliable)
- DropCatch API (Official Partner) ← WE HAVE THIS!
TIER 2: WEB SCRAPING (Fallback)
- ExpiredDomains.net (aggregator for deleted domains)
- GoDaddy Auctions (public listings via RSS/public pages)
- Sedo (public marketplace)
- NameJet (public auctions)
- DropCatch (public auctions)
The scraper tries Tier 1 first, then falls back to Tier 2 if needed.
IMPORTANT:
- Respects robots.txt
@ -31,6 +34,8 @@ from sqlalchemy import select, and_, delete
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.auction import DomainAuction, AuctionScrapeLog
from app.services.dropcatch_api import dropcatch_client
from app.services.sedo_api import sedo_client
logger = logging.getLogger(__name__)
@ -102,15 +107,41 @@ class AuctionScraperService:
"errors": [],
}
# Scrape each platform
# ═══════════════════════════════════════════════════════════════
# TIER 1: Official APIs (Best data quality)
# ═══════════════════════════════════════════════════════════════
tier1_apis = [
("DropCatch", self._fetch_dropcatch_api), # We have API access!
("Sedo", self._fetch_sedo_api), # We have API access!
]
for platform_name, api_func in tier1_apis:
try:
api_result = await api_func(db)
if api_result.get("found", 0) > 0:
results["platforms"][platform_name] = api_result
results["total_found"] += api_result.get("found", 0)
results["total_new"] += api_result.get("new", 0)
results["total_updated"] += api_result.get("updated", 0)
logger.info(f"{platform_name} API: {api_result['found']} auctions")
except Exception as e:
logger.warning(f"⚠️ {platform_name} API failed, will try scraping: {e}")
# ═══════════════════════════════════════════════════════════════
# TIER 2: Web Scraping (Fallback for platforms without API access)
# ═══════════════════════════════════════════════════════════════
scrapers = [
("ExpiredDomains", self._scrape_expireddomains),
("GoDaddy", self._scrape_godaddy_public),
("Sedo", self._scrape_sedo_public),
("NameJet", self._scrape_namejet_public),
("DropCatch", self._scrape_dropcatch_public),
]
# Add fallbacks only if APIs failed
if "DropCatch" not in results["platforms"]:
scrapers.append(("DropCatch", self._scrape_dropcatch_public))
if "Sedo" not in results["platforms"]:
scrapers.append(("Sedo", self._scrape_sedo_public))
for platform_name, scraper_func in scrapers:
try:
platform_result = await scraper_func(db)
@ -561,13 +592,206 @@ class AuctionScraperService:
return result
async def _scrape_dropcatch_public(self, db: AsyncSession) -> Dict[str, Any]:
async def _fetch_dropcatch_api(self, db: AsyncSession) -> Dict[str, Any]:
"""
Scrape DropCatch public auction listings.
DropCatch shows pending delete auctions publicly.
🚀 TIER 1: Fetch DropCatch auctions via OFFICIAL API
This is our preferred method - faster, more reliable, more data.
Uses the official DropCatch Partner API.
"""
platform = "DropCatch"
result = {"found": 0, "new": 0, "updated": 0}
result = {"found": 0, "new": 0, "updated": 0, "source": "api"}
if not dropcatch_client.is_configured:
logger.info("DropCatch API not configured, skipping")
return result
log = AuctionScrapeLog(platform=platform)
db.add(log)
await db.commit()
try:
# Fetch auctions from official API
api_result = await dropcatch_client.search_auctions(page_size=100)
auctions = api_result.get("auctions") or api_result.get("items") or []
result["found"] = len(auctions)
for dc_auction in auctions:
try:
# Transform to our format
auction_data = dropcatch_client.transform_to_pounce_format(dc_auction)
if not auction_data["domain"]:
continue
# Check if exists
existing = await db.execute(
select(DomainAuction).where(
and_(
DomainAuction.domain == auction_data["domain"],
DomainAuction.platform == platform
)
)
)
existing_auction = existing.scalar_one_or_none()
if existing_auction:
# Update existing
existing_auction.current_bid = auction_data["current_bid"]
existing_auction.num_bids = auction_data["num_bids"]
existing_auction.end_time = auction_data["end_time"]
existing_auction.is_active = True
existing_auction.updated_at = datetime.utcnow()
result["updated"] += 1
else:
# Create new
new_auction = DomainAuction(
domain=auction_data["domain"],
tld=auction_data["tld"],
platform=platform,
current_bid=auction_data["current_bid"],
currency=auction_data["currency"],
num_bids=auction_data["num_bids"],
end_time=auction_data["end_time"],
auction_url=auction_data["auction_url"],
age_years=auction_data.get("age_years"),
buy_now_price=auction_data.get("buy_now_price"),
reserve_met=auction_data.get("reserve_met"),
traffic=auction_data.get("traffic"),
is_active=True,
)
db.add(new_auction)
result["new"] += 1
except Exception as e:
logger.warning(f"Error processing DropCatch auction: {e}")
continue
await db.commit()
log.status = "success"
log.auctions_found = result["found"]
log.auctions_new = result["new"]
log.auctions_updated = result["updated"]
log.completed_at = datetime.utcnow()
await db.commit()
logger.info(f"DropCatch API: Found {result['found']}, New {result['new']}, Updated {result['updated']}")
return result
except Exception as e:
logger.error(f"DropCatch API error: {e}")
log.status = "failed"
log.error_message = str(e)[:500]
log.completed_at = datetime.utcnow()
await db.commit()
return result
async def _fetch_sedo_api(self, db: AsyncSession) -> Dict[str, Any]:
"""
🚀 TIER 1: Fetch Sedo auctions via OFFICIAL API
This is our preferred method for Sedo data.
Uses the official Sedo Partner API.
"""
platform = "Sedo"
result = {"found": 0, "new": 0, "updated": 0, "source": "api"}
if not sedo_client.is_configured:
logger.info("Sedo API not configured, skipping")
return result
log = AuctionScrapeLog(platform=platform)
db.add(log)
await db.commit()
try:
# Fetch auctions from official API
api_result = await sedo_client.search_auctions(page_size=100)
# Sedo response structure may vary
listings = api_result.get("domains") or api_result.get("items") or api_result.get("result") or []
if isinstance(listings, dict):
listings = list(listings.values()) if listings else []
result["found"] = len(listings)
for sedo_listing in listings:
try:
# Transform to our format
auction_data = sedo_client.transform_to_pounce_format(sedo_listing)
if not auction_data["domain"]:
continue
# Check if exists
existing = await db.execute(
select(DomainAuction).where(
and_(
DomainAuction.domain == auction_data["domain"],
DomainAuction.platform == platform
)
)
)
existing_auction = existing.scalar_one_or_none()
if existing_auction:
# Update existing
existing_auction.current_bid = auction_data["current_bid"]
existing_auction.num_bids = auction_data["num_bids"]
existing_auction.end_time = auction_data["end_time"]
existing_auction.is_active = True
existing_auction.updated_at = datetime.utcnow()
result["updated"] += 1
else:
# Create new
new_auction = DomainAuction(
domain=auction_data["domain"],
tld=auction_data["tld"],
platform=platform,
current_bid=auction_data["current_bid"],
currency=auction_data["currency"],
num_bids=auction_data["num_bids"],
end_time=auction_data["end_time"],
auction_url=auction_data["auction_url"],
buy_now_price=auction_data.get("buy_now_price"),
is_active=True,
)
db.add(new_auction)
result["new"] += 1
except Exception as e:
logger.warning(f"Error processing Sedo listing: {e}")
continue
await db.commit()
log.status = "success"
log.auctions_found = result["found"]
log.auctions_new = result["new"]
log.auctions_updated = result["updated"]
log.completed_at = datetime.utcnow()
await db.commit()
logger.info(f"Sedo API: Found {result['found']}, New {result['new']}, Updated {result['updated']}")
return result
except Exception as e:
logger.error(f"Sedo API error: {e}")
log.status = "failed"
log.error_message = str(e)[:500]
log.completed_at = datetime.utcnow()
await db.commit()
return result
async def _scrape_dropcatch_public(self, db: AsyncSession) -> Dict[str, Any]:
"""
📦 TIER 2 FALLBACK: Scrape DropCatch public auction listings.
Only used if the API is not configured or fails.
"""
platform = "DropCatch"
result = {"found": 0, "new": 0, "updated": 0, "source": "scrape"}
log = AuctionScrapeLog(platform=platform)
db.add(log)

View File

@ -0,0 +1,334 @@
"""
DropCatch Official API Client
This service provides access to DropCatch's official API for:
- Searching domain auctions
- Getting auction details
- Backorder management
API Documentation: https://www.dropcatch.com/hiw/dropcatch-api
Interactive Docs: https://api.dropcatch.com/swagger
SECURITY:
- Credentials are loaded from environment variables
- NEVER hardcode credentials in this file
Usage:
from app.services.dropcatch_api import dropcatch_client
# Get active auctions
auctions = await dropcatch_client.search_auctions(keyword="tech")
"""
import logging
from datetime import datetime, timedelta
from typing import Optional, List, Dict, Any
import httpx
from functools import lru_cache
from app.config import get_settings
logger = logging.getLogger(__name__)
class DropCatchAPIClient:
"""
Official DropCatch API Client.
This uses the V2 API endpoints (V1 is deprecated).
Authentication is via OAuth2 client credentials.
"""
def __init__(self):
self.settings = get_settings()
self.base_url = self.settings.dropcatch_api_base or "https://api.dropcatch.com"
self.client_id = self.settings.dropcatch_client_id
self.client_secret = self.settings.dropcatch_client_secret
# Token cache
self._access_token: Optional[str] = None
self._token_expires_at: Optional[datetime] = None
# HTTP client
self._client: Optional[httpx.AsyncClient] = None
@property
def is_configured(self) -> bool:
"""Check if API credentials are configured."""
return bool(self.client_id and self.client_secret)
async def _get_client(self) -> httpx.AsyncClient:
"""Get or create HTTP client."""
if self._client is None or self._client.is_closed:
self._client = httpx.AsyncClient(
timeout=30.0,
headers={
"Content-Type": "application/json",
"User-Agent": "Pounce/1.0 (Domain Intelligence Platform)"
}
)
return self._client
async def close(self):
"""Close the HTTP client."""
if self._client and not self._client.is_closed:
await self._client.aclose()
self._client = None
async def _authenticate(self) -> str:
"""
Authenticate with DropCatch API and get access token.
POST https://api.dropcatch.com/authorize
Body: { "clientId": "...", "clientSecret": "..." }
Returns: Access token string
"""
if not self.is_configured:
raise ValueError("DropCatch API credentials not configured")
# Check if we have a valid cached token
if self._access_token and self._token_expires_at:
if datetime.utcnow() < self._token_expires_at - timedelta(minutes=5):
return self._access_token
client = await self._get_client()
try:
response = await client.post(
f"{self.base_url}/authorize",
json={
"clientId": self.client_id,
"clientSecret": self.client_secret
}
)
if response.status_code != 200:
logger.error(f"DropCatch auth failed: {response.status_code} - {response.text}")
raise Exception(f"Authentication failed: {response.status_code}")
data = response.json()
# Extract token - the response format may vary
# Common formats: { "token": "...", "expiresIn": 3600 }
# or: { "accessToken": "...", "expiresIn": 3600 }
self._access_token = data.get("token") or data.get("accessToken") or data.get("access_token")
# Calculate expiry (default 1 hour if not specified)
expires_in = data.get("expiresIn") or data.get("expires_in") or 3600
self._token_expires_at = datetime.utcnow() + timedelta(seconds=expires_in)
logger.info("DropCatch API: Successfully authenticated")
return self._access_token
except httpx.HTTPError as e:
logger.error(f"DropCatch auth HTTP error: {e}")
raise
async def _request(
self,
method: str,
endpoint: str,
params: Optional[Dict] = None,
json_data: Optional[Dict] = None
) -> Dict[str, Any]:
"""Make an authenticated API request."""
token = await self._authenticate()
client = await self._get_client()
headers = {
"Authorization": f"Bearer {token}"
}
url = f"{self.base_url}{endpoint}"
try:
response = await client.request(
method=method,
url=url,
params=params,
json=json_data,
headers=headers
)
if response.status_code == 401:
# Token expired, re-authenticate
self._access_token = None
token = await self._authenticate()
headers["Authorization"] = f"Bearer {token}"
response = await client.request(
method=method,
url=url,
params=params,
json=json_data,
headers=headers
)
response.raise_for_status()
return response.json()
except httpx.HTTPError as e:
logger.error(f"DropCatch API request failed: {e}")
raise
# =========================================================================
# AUCTION ENDPOINTS (V2)
# =========================================================================
async def search_auctions(
self,
keyword: Optional[str] = None,
tld: Optional[str] = None,
min_price: Optional[float] = None,
max_price: Optional[float] = None,
ending_within_hours: Optional[int] = None,
page_size: int = 100,
page_token: Optional[str] = None,
) -> Dict[str, Any]:
"""
Search for domain auctions.
Endpoint: GET /v2/auctions (or similar - check interactive docs)
Returns:
{
"auctions": [...],
"cursor": {
"next": "...",
"previous": "..."
}
}
"""
params = {
"pageSize": page_size,
}
if keyword:
params["searchTerm"] = keyword
if tld:
params["tld"] = tld.lstrip(".")
if min_price is not None:
params["minPrice"] = min_price
if max_price is not None:
params["maxPrice"] = max_price
if ending_within_hours:
params["endingWithinHours"] = ending_within_hours
if page_token:
params["pageToken"] = page_token
return await self._request("GET", "/v2/auctions", params=params)
async def get_auction(self, auction_id: int) -> Dict[str, Any]:
"""Get details for a specific auction."""
return await self._request("GET", f"/v2/auctions/{auction_id}")
async def get_ending_soon(
self,
hours: int = 24,
page_size: int = 50
) -> Dict[str, Any]:
"""Get auctions ending soon."""
return await self.search_auctions(
ending_within_hours=hours,
page_size=page_size
)
async def get_hot_auctions(self, page_size: int = 50) -> Dict[str, Any]:
"""
Get hot/popular auctions (high bid activity).
Note: The actual endpoint may vary - check interactive docs.
"""
# This might be a different endpoint or sort parameter
params = {
"pageSize": page_size,
"sortBy": "bidCount", # or "popularity" - check docs
"sortOrder": "desc"
}
return await self._request("GET", "/v2/auctions", params=params)
# =========================================================================
# BACKORDER ENDPOINTS (V2)
# =========================================================================
async def search_backorders(
self,
keyword: Optional[str] = None,
page_size: int = 100,
page_token: Optional[str] = None,
) -> Dict[str, Any]:
"""Search for available backorders (domains dropping soon)."""
params = {"pageSize": page_size}
if keyword:
params["searchTerm"] = keyword
if page_token:
params["pageToken"] = page_token
return await self._request("GET", "/v2/backorders", params=params)
# =========================================================================
# UTILITY METHODS
# =========================================================================
async def test_connection(self) -> Dict[str, Any]:
"""Test the API connection and credentials."""
if not self.is_configured:
return {
"success": False,
"error": "API credentials not configured",
"configured": False
}
try:
await self._authenticate()
return {
"success": True,
"configured": True,
"client_id": self.client_id.split(":")[0] if ":" in self.client_id else self.client_id,
"authenticated_at": datetime.utcnow().isoformat()
}
except Exception as e:
return {
"success": False,
"error": str(e),
"configured": True
}
def transform_to_pounce_format(self, dc_auction: Dict) -> Dict[str, Any]:
"""
Transform DropCatch auction to Pounce internal format.
Maps DropCatch fields to our DomainAuction model.
"""
domain = dc_auction.get("domainName") or dc_auction.get("domain", "")
tld = domain.rsplit(".", 1)[1] if "." in domain else ""
# Parse end time (format may vary)
end_time_str = dc_auction.get("auctionEndTime") or dc_auction.get("endTime")
if end_time_str:
try:
end_time = datetime.fromisoformat(end_time_str.replace("Z", "+00:00"))
except:
end_time = datetime.utcnow() + timedelta(days=1)
else:
end_time = datetime.utcnow() + timedelta(days=1)
return {
"domain": domain,
"tld": tld,
"platform": "DropCatch",
"current_bid": dc_auction.get("currentBid") or dc_auction.get("price", 0),
"currency": "USD",
"num_bids": dc_auction.get("bidCount") or dc_auction.get("numberOfBids", 0),
"end_time": end_time,
"auction_url": f"https://www.dropcatch.com/domain/{domain}",
"age_years": dc_auction.get("yearsOld") or dc_auction.get("age"),
"buy_now_price": dc_auction.get("buyNowPrice"),
"reserve_met": dc_auction.get("reserveMet"),
"traffic": dc_auction.get("traffic"),
"external_id": str(dc_auction.get("auctionId") or dc_auction.get("id", "")),
}
# Singleton instance
dropcatch_client = DropCatchAPIClient()

View File

@ -0,0 +1,314 @@
"""
Sedo Official API Client
This service provides access to Sedo's official API for:
- Domain search and auctions
- Marketplace listings
- Domain pricing
API Documentation: https://api.sedo.com/apidocs/v1/
Type: XML-RPC based API
SECURITY:
- Credentials are loaded from environment variables
- NEVER hardcode credentials in this file
WHERE TO FIND YOUR CREDENTIALS:
1. Login to https://sedo.com
2. Go to "Mein Sedo" / "My Sedo"
3. Navigate to "API-Zugang" / "API Access"
4. You'll find:
- Partner ID (your user ID)
- SignKey (signature key for authentication)
Usage:
from app.services.sedo_api import sedo_client
# Search domains for sale
listings = await sedo_client.search_domains(keyword="tech")
"""
import logging
import hashlib
import time
from datetime import datetime, timedelta
from typing import Optional, List, Dict, Any
import httpx
from xml.etree import ElementTree
from app.config import get_settings
logger = logging.getLogger(__name__)
class SedoAPIClient:
"""
Official Sedo API Client.
Sedo uses an XML-RPC style API with signature-based authentication.
Each request must include:
- partnerid: Your partner ID
- signkey: Your signature key (or hashed signature)
"""
def __init__(self):
self.settings = get_settings()
self.base_url = self.settings.sedo_api_base or "https://api.sedo.com/api/v1/"
self.partner_id = self.settings.sedo_partner_id
self.sign_key = self.settings.sedo_sign_key
# HTTP client
self._client: Optional[httpx.AsyncClient] = None
@property
def is_configured(self) -> bool:
"""Check if API credentials are configured."""
return bool(self.partner_id and self.sign_key)
async def _get_client(self) -> httpx.AsyncClient:
"""Get or create HTTP client."""
if self._client is None or self._client.is_closed:
self._client = httpx.AsyncClient(
timeout=30.0,
headers={
"Content-Type": "application/x-www-form-urlencoded",
"User-Agent": "Pounce/1.0 (Domain Intelligence Platform)"
}
)
return self._client
async def close(self):
"""Close the HTTP client."""
if self._client and not self._client.is_closed:
await self._client.aclose()
self._client = None
def _generate_signature(self, params: Dict[str, Any]) -> str:
"""
Generate request signature for Sedo API.
The signature is typically: MD5(signkey + sorted_params)
Check Sedo docs for exact implementation.
"""
# Simple implementation - may need adjustment based on actual Sedo requirements
sorted_params = "&".join(f"{k}={v}" for k, v in sorted(params.items()))
signature_base = f"{self.sign_key}{sorted_params}"
return hashlib.md5(signature_base.encode()).hexdigest()
async def _request(
self,
endpoint: str,
params: Optional[Dict] = None
) -> Dict[str, Any]:
"""Make an authenticated API request."""
if not self.is_configured:
raise ValueError("Sedo API credentials not configured")
client = await self._get_client()
# Base params for all requests
request_params = {
"partnerid": self.partner_id,
"signkey": self.sign_key,
**(params or {})
}
url = f"{self.base_url.rstrip('/')}/{endpoint.lstrip('/')}"
try:
response = await client.get(url, params=request_params)
response.raise_for_status()
# Sedo API can return XML or JSON depending on endpoint
content_type = response.headers.get("content-type", "")
if "xml" in content_type:
return self._parse_xml_response(response.text)
elif "json" in content_type:
return response.json()
else:
# Try JSON first, fallback to XML
try:
return response.json()
except:
return self._parse_xml_response(response.text)
except httpx.HTTPError as e:
logger.error(f"Sedo API request failed: {e}")
raise
def _parse_xml_response(self, xml_text: str) -> Dict[str, Any]:
"""Parse XML response from Sedo API."""
try:
root = ElementTree.fromstring(xml_text)
return self._xml_to_dict(root)
except Exception as e:
logger.warning(f"Failed to parse XML: {e}")
return {"raw": xml_text}
def _xml_to_dict(self, element) -> Dict[str, Any]:
"""Convert XML element to dictionary."""
result = {}
for child in element:
if len(child) > 0:
result[child.tag] = self._xml_to_dict(child)
else:
result[child.tag] = child.text
return result
# =========================================================================
# DOMAIN SEARCH ENDPOINTS
# =========================================================================
async def search_domains(
self,
keyword: Optional[str] = None,
tld: Optional[str] = None,
min_price: Optional[float] = None,
max_price: Optional[float] = None,
page: int = 1,
page_size: int = 100,
) -> Dict[str, Any]:
"""
Search for domains listed on Sedo marketplace.
Returns domains for sale (not auctions).
"""
params = {
"output_method": "json", # Request JSON response
}
if keyword:
params["keyword"] = keyword
if tld:
params["tld"] = tld.lstrip(".")
if min_price is not None:
params["minprice"] = min_price
if max_price is not None:
params["maxprice"] = max_price
if page:
params["page"] = page
if page_size:
params["pagesize"] = min(page_size, 100)
return await self._request("DomainSearch", params)
async def search_auctions(
self,
keyword: Optional[str] = None,
tld: Optional[str] = None,
ending_within_hours: Optional[int] = None,
page: int = 1,
page_size: int = 100,
) -> Dict[str, Any]:
"""
Search for active domain auctions on Sedo.
"""
params = {
"output_method": "json",
"auction": "true", # Only auctions
}
if keyword:
params["keyword"] = keyword
if tld:
params["tld"] = tld.lstrip(".")
if page:
params["page"] = page
if page_size:
params["pagesize"] = min(page_size, 100)
return await self._request("DomainSearch", params)
async def get_domain_details(self, domain: str) -> Dict[str, Any]:
"""Get detailed information about a specific domain."""
params = {
"domain": domain,
"output_method": "json",
}
return await self._request("DomainDetails", params)
async def get_ending_soon_auctions(
self,
hours: int = 24,
page_size: int = 50
) -> Dict[str, Any]:
"""Get auctions ending soon."""
return await self.search_auctions(
ending_within_hours=hours,
page_size=page_size
)
# =========================================================================
# UTILITY METHODS
# =========================================================================
async def test_connection(self) -> Dict[str, Any]:
"""Test the API connection and credentials."""
if not self.is_configured:
return {
"success": False,
"error": "API credentials not configured",
"configured": False,
"hint": "Find your credentials at: Sedo.com → Mein Sedo → API-Zugang"
}
try:
# Try a simple search to test connection
result = await self.search_domains(keyword="test", page_size=1)
return {
"success": True,
"configured": True,
"partner_id": self.partner_id,
"authenticated_at": datetime.utcnow().isoformat()
}
except Exception as e:
return {
"success": False,
"error": str(e),
"configured": True
}
def transform_to_pounce_format(self, sedo_listing: Dict) -> Dict[str, Any]:
"""
Transform Sedo listing to Pounce internal format.
Maps Sedo fields to our DomainAuction model.
"""
domain = sedo_listing.get("domain") or sedo_listing.get("domainname", "")
tld = domain.rsplit(".", 1)[1] if "." in domain else ""
# Parse end time if auction
end_time_str = sedo_listing.get("auctionend") or sedo_listing.get("enddate")
if end_time_str:
try:
end_time = datetime.fromisoformat(end_time_str.replace("Z", "+00:00"))
except:
end_time = datetime.utcnow() + timedelta(days=7)
else:
end_time = datetime.utcnow() + timedelta(days=7)
# Price handling
price = sedo_listing.get("price") or sedo_listing.get("currentbid") or 0
if isinstance(price, str):
price = float(price.replace(",", "").replace("$", "").replace("", ""))
return {
"domain": domain,
"tld": tld,
"platform": "Sedo",
"current_bid": price,
"buy_now_price": sedo_listing.get("buynow") or sedo_listing.get("bin"),
"currency": sedo_listing.get("currency", "EUR"),
"num_bids": sedo_listing.get("numbids") or sedo_listing.get("bidcount", 0),
"end_time": end_time,
"auction_url": f"https://sedo.com/search/details/?domain={domain}",
"age_years": None,
"reserve_met": sedo_listing.get("reservemet"),
"traffic": sedo_listing.get("traffic"),
"is_auction": sedo_listing.get("isaution") == "1" or sedo_listing.get("auction") == True,
}
# Singleton instance
sedo_client = SedoAPIClient()