"""
Domain Auction Scraper Service (Pounce)

Hard rules (project requirement):
- No mock/demo data.
- No estimated / placeholder auction prices.
- Store auctions only when we have real `current_bid` and a real `end_time`
  (or a provider-provided time-left that can be converted deterministically).

Current data sources (works without scraping Cloudflare-protected providers):
- Dynadot: hidden JSON API (via `hidden_api_scraper`)
- ExpiredDomains provider auction pages (GoDaddy / Namecheap / Sedo): include Price, Bids, Endtime
- Park.io: public auctions table includes Price, Bids, Close Date
- Sav: auctions table endpoint includes Price, Bids, Time left

Optional sources:
- DropCatch Partner API (if configured)
- Sedo Partner API (if configured)
- Playwright (opt-in) for Cloudflare-protected providers like NameJet

Time convention: every datetime handled here is a *naive* datetime expressed in UTC.
"""

import asyncio
import logging
import math
import os
import re
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional

import httpx
from bs4 import BeautifulSoup
from sqlalchemy import and_, delete, func, select, update
from sqlalchemy.exc import IntegrityError, SQLAlchemyError
from sqlalchemy.ext.asyncio import AsyncSession

from app.models.auction import AuctionScrapeLog, DomainAuction
from app.services.dropcatch_api import dropcatch_client
from app.services.hidden_api_scrapers import build_affiliate_url, hidden_api_scraper
from app.services.sedo_api import sedo_client

try:
    from app.services.playwright_scraper import playwright_scraper

    PLAYWRIGHT_AVAILABLE = True
except ImportError:
    playwright_scraper = None
    PLAYWRIGHT_AVAILABLE = False

logger = logging.getLogger(__name__)

# Rate limiting: requests per minute per platform
RATE_LIMITS: Dict[str, int] = {
    "ExpiredDomains": 5,
    "Park.io": 10,
    "Sav": 10,
    "DropCatch": 10,
    "Sedo": 10,
    "NameJet": 5,
}

USER_AGENT = (
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
    "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
)

# Counters tracked per scrape run and per platform.
_COUNT_KEYS = ("found", "new", "updated")

# Seconds per unit accepted by `_parse_timeleft`.
_TIMELEFT_UNIT_SECONDS = {"d": 86400, "h": 3600, "m": 60, "s": 1}


def _utcnow() -> datetime:
    """Return the current time as a naive UTC datetime (same value as the deprecated `utcnow()`)."""
    return datetime.now(timezone.utc).replace(tzinfo=None)


class AuctionScraperService:
    """
    Orchestrates scraping across multiple sources and stores results in DB.
    """

    def __init__(self):
        # Lazily created shared HTTP client (see `_get_client`).
        self.http_client: Optional[httpx.AsyncClient] = None
        # Naive-UTC time of the last request per rate-limit bucket.
        self._last_request: Dict[str, datetime] = {}

    async def _get_client(self) -> httpx.AsyncClient:
        """Get or create HTTP client with appropriate headers (and optional proxy).

        The proxy is read from `SCRAPER_HTTP_PROXY`, falling back to `SCRAPER_PROXY_URL`.
        """
        if self.http_client is None or self.http_client.is_closed:
            proxy = os.getenv("SCRAPER_HTTP_PROXY") or os.getenv("SCRAPER_PROXY_URL")
            self.http_client = httpx.AsyncClient(
                timeout=30.0,
                follow_redirects=True,
                proxy=proxy,
                headers={
                    "User-Agent": USER_AGENT,
                    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
                    "Accept-Language": "en-US,en;q=0.5",
                    "Accept-Encoding": "gzip, deflate",
                    "DNT": "1",
                    "Connection": "keep-alive",
                    "Upgrade-Insecure-Requests": "1",
                },
            )
        return self.http_client

    async def _rate_limit(self, platform: str):
        """Enforce rate limiting per platform.

        Sleeps until at least `60 / RATE_LIMITS[platform]` seconds have passed since the
        previous call for the same platform (unknown platforms default to 10 req/min).
        """
        min_interval = 60 / RATE_LIMITS.get(platform, 10)
        last = self._last_request.get(platform)
        if last:
            elapsed = (_utcnow() - last).total_seconds()
            if elapsed < min_interval:
                await asyncio.sleep(min_interval - elapsed)
        self._last_request[platform] = _utcnow()

    # ----------------------------
    # Parsing & validation helpers
    # ----------------------------

    @staticmethod
    def _to_naive_utc(value: datetime) -> datetime:
        """Convert an aware datetime to UTC and drop tzinfo; naive values are returned unchanged."""
        if value.utcoffset() is not None:
            value = value.astimezone(timezone.utc)
        return value.replace(tzinfo=None)

    def _parse_datetime(self, value: Any) -> Optional[datetime]:
        """Parse datetime from common API formats (ISO strings, timestamps).

        Returns a naive UTC datetime, or None when the value cannot be parsed.
        Aware inputs are converted to UTC before the offset is dropped, so
        '2024-01-01T12:00:00+02:00' becomes 10:00 rather than 12:00.
        """
        if value is None:
            return None
        try:
            if isinstance(value, datetime):
                return self._to_naive_utc(value)
            if isinstance(value, (int, float)):
                # Numeric values are interpreted as Unix timestamps in seconds.
                return datetime.fromtimestamp(float(value), tz=timezone.utc).replace(tzinfo=None)
            if isinstance(value, str):
                raw = value.strip()
                if not raw:
                    return None
                # `fromisoformat` on older Pythons does not accept the 'Z' suffix.
                return self._to_naive_utc(datetime.fromisoformat(raw.replace("Z", "+00:00")))
        except (ValueError, OverflowError, OSError):
            return None
        return None

    def _to_float(self, value: Any) -> Optional[float]:
        """Parse float from strings like '$1,234.56'.

        Returns None for unparseable input and for non-finite results (NaN / infinity),
        which must never be stored as a price.
        """
        if value is None:
            return None
        if isinstance(value, str):
            cleaned = value.strip().replace(",", "")
            cleaned = cleaned.replace("$", "").replace("€", "").replace("£", "")
            if not cleaned:
                return None
            value = cleaned
        elif not isinstance(value, (int, float)):
            return None
        try:
            number = float(value)
        except (ValueError, OverflowError):
            return None
        return number if math.isfinite(number) else None

    def _parse_price_currency(self, text: str) -> Optional[tuple[float, str]]:
        """Parse price strings like '7,100 USD' or '$530.00' into (price, currency).

        The currency is taken from a three-letter uppercase code if present, otherwise
        from a '$' / '€' / '£' symbol, and defaults to 'USD'.
        """
        if not text:
            return None
        raw = text.strip()
        if not raw or raw.lower() in {"-", "n/a", "na"}:
            return None

        m_amount = re.search(r"([0-9][0-9,]*(?:\.[0-9]+)?)", raw)
        if not m_amount:
            return None
        amount = self._to_float(m_amount.group(1))
        if amount is None:
            return None

        currency = "USD"
        m_cur = re.search(r"\b([A-Z]{3})\b", raw)
        if m_cur:
            currency = m_cur.group(1).upper()
        elif "$" in raw:
            currency = "USD"
        elif "€" in raw:
            currency = "EUR"
        elif "£" in raw:
            currency = "GBP"
        return float(amount), currency

    def _parse_timeleft(self, text: str) -> Optional[timedelta]:
        """
        Parse relative time strings into a timedelta.

        Supported examples:
        - ExpiredDomains: '4d 20h 39m', '6m 48s', '23h 46m'
        - Sav: '6D 2H'

        Returns None for empty, placeholder ('-', 'n/a', 'ended') or non-positive durations.
        """
        if not text:
            return None
        raw = text.strip().lower()
        if not raw or raw in {"-", "n/a", "na", "ended"}:
            return None

        matches = re.findall(r"(\d+)\s*([dhms])", raw)
        if not matches:
            return None

        total_seconds = sum(int(amount) * _TIMELEFT_UNIT_SECONDS[unit] for amount, unit in matches)
        if total_seconds <= 0:
            return None
        return timedelta(seconds=total_seconds)

    @staticmethod
    def _parse_bid_count(text: str) -> Optional[int]:
        """Extract the digits from a 'Bids' table cell; an empty cell counts as 0 bids."""
        try:
            return int(re.sub(r"[^0-9]", "", text) or "0")
        except ValueError:
            return None

    @staticmethod
    def _absolute_url(href: Optional[str], base: str) -> Optional[str]:
        """Prefix site-relative links (starting with '/') with `base`; other values pass through."""
        if href and href.startswith("/"):
            return f"{base}{href}"
        return href

    def _sanitize_auction_payload(self, auction_data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """
        Ensure we only store real, complete auctions.

        Rules (strict):
        - domain/platform/auction_url must be present
        - current_bid must be > 0
        - end_time must be parseable
        - drop unknown keys (prevents accidental schema drift)

        Returns the normalized payload restricted to `DomainAuction` columns, or None
        when the auction must be skipped.
        """
        if not isinstance(auction_data, dict):
            return None

        # `or ""` keeps a None value from being stringified into the literal "None".
        domain = str(auction_data.get("domain") or "").strip().lower()
        platform = str(auction_data.get("platform") or "").strip()
        auction_url = str(auction_data.get("auction_url") or "").strip()
        if not domain or "." not in domain:
            return None
        if not platform or not auction_url:
            return None

        tld = auction_data.get("tld") or domain.rsplit(".", 1)[-1]
        tld = str(tld).strip().lower().lstrip(".")
        if not tld:
            return None

        current_bid = self._to_float(auction_data.get("current_bid"))
        if current_bid is None or current_bid <= 0:
            return None

        end_time = self._parse_datetime(auction_data.get("end_time"))
        if end_time is None:
            return None

        try:
            num_bids = int(auction_data.get("num_bids", 0) or 0)
        except (TypeError, ValueError, OverflowError):
            return None
        if num_bids < 0:
            return None

        # Normalize watchers (optional): an unparseable value is dropped, not fatal.
        num_watchers = auction_data.get("num_watchers")
        if num_watchers is not None:
            try:
                num_watchers = int(num_watchers)
            except (TypeError, ValueError, OverflowError):
                num_watchers = None

        allowed = {c.name for c in DomainAuction.__table__.columns}
        cleaned = {k: v for k, v in auction_data.items() if k in allowed}
        cleaned.update(
            {
                "domain": domain,
                "tld": tld,
                "platform": platform,
                "auction_url": auction_url,
                "current_bid": current_bid,
                # Optional floats: `_to_float` already yields float-or-None.
                "min_bid": self._to_float(auction_data.get("min_bid")),
                "buy_now_price": self._to_float(auction_data.get("buy_now_price")),
                "reserve_price": self._to_float(auction_data.get("reserve_price")),
                "num_bids": num_bids,
                "num_watchers": num_watchers,
                "end_time": end_time,
                "is_active": True,
            }
        )

        currency = cleaned.get("currency") or "USD"
        cleaned["currency"] = str(currency).strip().upper()
        return cleaned

    async def _find_auction(self, db: AsyncSession, domain: str, platform: str) -> Optional[DomainAuction]:
        """Return the stored auction for (domain, platform), or None."""
        found = await db.execute(
            select(DomainAuction).where(
                and_(
                    DomainAuction.domain == domain,
                    DomainAuction.platform == platform,
                )
            )
        )
        return found.scalar_one_or_none()

    @staticmethod
    def _apply_auction_update(existing: DomainAuction, cleaned: Dict[str, Any]) -> None:
        """Copy the sanitized payload onto an existing row and mark it freshly updated/active."""
        for key, value in cleaned.items():
            setattr(existing, key, value)
        existing.updated_at = _utcnow()
        existing.is_active = True

    async def _store_auction(self, db: AsyncSession, auction_data: Dict[str, Any]) -> str:
        """Store or update an auction in the database. Returns 'new', 'updated' or 'skipped'.

        The caller is responsible for committing.
        """
        cleaned = self._sanitize_auction_payload(auction_data)
        if cleaned is None:
            return "skipped"

        # AsyncSessionLocal is configured with autoflush=False.
        # Flush pending inserts/updates so the existence check can see them and we don't create duplicates.
        await db.flush()

        existing = await self._find_auction(db, cleaned["domain"], cleaned["platform"])
        if existing:
            self._apply_auction_update(existing, cleaned)
            return "updated"

        try:
            # Protect against concurrent inserts (e.g. cron overlap) when a unique index exists.
            # The savepoint confines a failed insert so the outer transaction stays usable.
            async with db.begin_nested():
                db.add(DomainAuction(**cleaned))
                await db.flush()
            return "new"
        except IntegrityError:
            # Another transaction inserted the same (platform, domain) in the meantime.
            existing = await self._find_auction(db, cleaned["domain"], cleaned["platform"])
            if not existing:
                return "skipped"
            self._apply_auction_update(existing, cleaned)
            return "updated"

    # ----------------------------
    # Scrape-log bookkeeping
    # ----------------------------

    async def _start_scrape_log(self, db: AsyncSession, platform: str) -> AuctionScrapeLog:
        """Create and commit the `AuctionScrapeLog` row for a scrape run that is about to start."""
        log = AuctionScrapeLog(platform=platform)
        db.add(log)
        await db.commit()
        return log

    async def _complete_scrape_log(self, db: AsyncSession, log: AuctionScrapeLog, result: Dict[str, Any]) -> None:
        """Commit the stored auctions, then record the run as successful with its counters."""
        await db.commit()
        log.completed_at = _utcnow()
        log.status = "success"
        log.auctions_found = result["found"]
        log.auctions_new = result["new"]
        log.auctions_updated = result["updated"]
        await db.commit()

    async def _fail_scrape_log(self, db: AsyncSession, log: AuctionScrapeLog, error: Exception) -> None:
        """Record the run as failed (error message truncated to 500 chars).

        Work stored before the failure is committed together with the log. If that
        commit fails because the session is already in a failed state, the session is
        rolled back (so later scrapers can keep using it) and only the failure is recorded.
        """
        message = str(error)[:500]

        def _mark_failed() -> None:
            log.completed_at = _utcnow()
            log.status = "failed"
            log.error_message = message

        _mark_failed()
        try:
            await db.commit()
        except SQLAlchemyError:
            await db.rollback()
            # The rollback discarded the attribute changes made above; apply them again.
            _mark_failed()
            await db.commit()

    @staticmethod
    def _tally(result: Dict[str, Any], status: str) -> None:
        """Count one `_store_auction` outcome; 'skipped' rows are not counted at all."""
        if status == "skipped":
            return
        result["found"] += 1
        result[status] += 1

    # ----------------------------
    # Source scrapers
    # ----------------------------

    async def _scrape_expireddomains_auction_page(
        self,
        db: AsyncSession,
        platform: str,
        url: str,
        limit: int = 200,
    ) -> Dict[str, Any]:
        """Scrape ExpiredDomains provider-specific auction pages (real Price/Bids/Endtime).

        Args:
            db: session used to store auctions and the scrape log.
            platform: provider name stored on each auction (e.g. "GoDaddy").
            url: ExpiredDomains listing page to fetch.
            limit: maximum number of table rows to process.

        Returns:
            Counters `{"found", "new", "updated"}`. Errors are logged and recorded on
            the scrape log rather than raised.
        """
        result = {"found": 0, "new": 0, "updated": 0}
        log = await self._start_scrape_log(db, platform)

        try:
            await self._rate_limit("ExpiredDomains")
            client = await self._get_client()
            resp = await client.get(url, timeout=20.0)
            if resp.status_code != 200:
                raise RuntimeError(f"HTTP {resp.status_code}")

            soup = BeautifulSoup(resp.text, "lxml")
            table = soup.select_one("table.base1")
            if not table:
                raise RuntimeError("ExpiredDomains table not found")

            # Columns are located by header text because their order differs per provider page.
            headers = [th.get_text(" ", strip=True) for th in table.select("thead th")]
            header_index = {h: i for i, h in enumerate(headers)}
            required = ["Domain", "Price", "Bids", "Endtime"]
            if not all(k in header_index for k in required):
                raise RuntimeError(f"Missing required columns: {required} in {headers}")

            # One reference time for the whole page keeps relative end times consistent.
            now = _utcnow()
            for row in table.select("tbody tr")[:limit]:
                cols = row.find_all("td")
                if len(cols) < len(headers):
                    continue

                domain_cell = cols[header_index["Domain"]]
                domain = domain_cell.get_text(" ", strip=True).lower()
                if not domain or "." not in domain:
                    continue

                parsed_price = self._parse_price_currency(cols[header_index["Price"]].get_text(" ", strip=True))
                if not parsed_price:
                    continue
                current_bid, currency = parsed_price
                if current_bid <= 0:
                    continue

                num_bids = self._parse_bid_count(cols[header_index["Bids"]].get_text(" ", strip=True))
                if num_bids is None:
                    continue

                delta = self._parse_timeleft(cols[header_index["Endtime"]].get_text(" ", strip=True))
                if not delta:
                    continue

                domain_link = domain_cell.find("a")
                href = self._absolute_url(
                    domain_link.get("href") if domain_link else None,
                    "https://www.expireddomains.net",
                )

                auction_data = {
                    "domain": domain,
                    "tld": domain.rsplit(".", 1)[-1].lower(),
                    "platform": platform,
                    "platform_auction_id": None,
                    "auction_url": href or build_affiliate_url(platform, domain),
                    "current_bid": current_bid,
                    "currency": currency,
                    "num_bids": num_bids,
                    "end_time": now + delta,
                    "scrape_source": f"expireddomains:{url}",
                }
                self._tally(result, await self._store_auction(db, auction_data))

            await self._complete_scrape_log(db, log, result)
        except Exception as e:
            logger.error("ExpiredDomains(%s) scrape failed: %s", platform, e)
            await self._fail_scrape_log(db, log, e)

        return result

    async def _scrape_expireddomains_godaddy(self, db: AsyncSession) -> Dict[str, Any]:
        """Scrape GoDaddy auctions (with bids) via their ExpiredDomains listing page."""
        return await self._scrape_expireddomains_auction_page(
            db=db,
            platform="GoDaddy",
            url="https://www.expireddomains.net/godaddy-domain-auctions-with-bids/",
        )

    async def _scrape_expireddomains_namecheap(self, db: AsyncSession) -> Dict[str, Any]:
        """Scrape Namecheap auctions via their ExpiredDomains listing page."""
        return await self._scrape_expireddomains_auction_page(
            db=db,
            platform="Namecheap",
            url="https://www.expireddomains.net/namecheap-auction-domains/",
        )

    async def _scrape_expireddomains_sedo(self, db: AsyncSession) -> Dict[str, Any]:
        """Scrape Sedo auctions via their ExpiredDomains listing page."""
        return await self._scrape_expireddomains_auction_page(
            db=db,
            platform="Sedo",
            url="https://www.expireddomains.net/sedo-auction-domains/",
        )

    async def _scrape_parkio_public(self, db: AsyncSession) -> Dict[str, Any]:
        """Scrape Park.io public auctions page (includes price + close date).

        Returns counters `{"found", "new", "updated"}`; errors are logged and recorded
        on the scrape log rather than raised.
        """
        platform = "Park.io"
        result = {"found": 0, "new": 0, "updated": 0}
        log = await self._start_scrape_log(db, platform)

        try:
            await self._rate_limit(platform)
            client = await self._get_client()
            resp = await client.get("https://park.io/auctions", timeout=20.0)
            if resp.status_code != 200:
                raise RuntimeError(f"HTTP {resp.status_code}")

            soup = BeautifulSoup(resp.text, "lxml")
            table = soup.select_one("table.table")
            if not table:
                raise RuntimeError("Park.io table not found")

            for row in table.select("tbody tr")[:200]:
                # Fixed layout: cols[1]=domain, cols[2]=price, cols[3]=bids, cols[4]=close date.
                cols = row.find_all("td")
                if len(cols) < 5:
                    continue

                domain = cols[1].get_text(" ", strip=True).lower()
                if not domain or "." not in domain:
                    continue

                parsed_price = self._parse_price_currency(cols[2].get_text(" ", strip=True))
                if not parsed_price:
                    continue
                current_bid, currency = parsed_price
                if current_bid <= 0:
                    continue

                num_bids = self._parse_bid_count(cols[3].get_text(" ", strip=True))
                if num_bids is None:
                    continue

                try:
                    # Park.io displays a naive timestamp. We treat it as UTC.
                    end_time = datetime.strptime(cols[4].get_text(" ", strip=True), "%Y-%m-%d %H:%M:%S")
                except ValueError:
                    continue

                link_el = cols[1].find("a", href=True)
                href = self._absolute_url(link_el["href"] if link_el else None, "https://park.io")

                auction_data = {
                    "domain": domain,
                    "tld": domain.rsplit(".", 1)[-1].lower(),
                    "platform": platform,
                    "auction_url": href or "https://park.io/auctions",
                    "current_bid": current_bid,
                    "currency": currency,
                    "num_bids": num_bids,
                    "end_time": end_time,
                    "scrape_source": "park.io:auctions",
                }
                self._tally(result, await self._store_auction(db, auction_data))

            await self._complete_scrape_log(db, log, result)
        except Exception as e:
            logger.error("Park.io scrape failed: %s", e)
            await self._fail_scrape_log(db, log, e)

        return result

    async def _scrape_sav_public(self, db: AsyncSession) -> Dict[str, Any]:
        """Scrape Sav auctions from their HTML table endpoint.

        Fetches pages 0-2 of the AJAX endpoint, pausing one second after each processed
        page. Returns counters `{"found", "new", "updated"}`; errors are logged and
        recorded on the scrape log rather than raised.
        """
        platform = "Sav"
        result = {"found": 0, "new": 0, "updated": 0}
        log = await self._start_scrape_log(db, platform)

        try:
            await self._rate_limit(platform)
            client = await self._get_client()
            now = _utcnow()

            for page in range(0, 3):
                resp = await client.post(
                    f"https://www.sav.com/auctions/load_domains_ajax/{page}",
                    headers={"X-Requested-With": "XMLHttpRequest"},
                    timeout=20.0,
                )
                if resp.status_code != 200:
                    continue

                rows = BeautifulSoup(resp.text, "html.parser").select("tr")
                if not rows:
                    continue

                for row in rows[:200]:
                    # Layout: cells[1]=domain link, cells[2]=price, cells[3]=bids, cells[6]=time left.
                    cells = row.find_all("td")
                    if len(cells) < 7:
                        continue

                    domain_link = cells[1].find("a")
                    domain = domain_link.get_text(" ", strip=True).lower() if domain_link else ""
                    if not domain or "." not in domain:
                        continue

                    parsed_price = self._parse_price_currency(cells[2].get_text(" ", strip=True))
                    if not parsed_price:
                        continue
                    current_bid, currency = parsed_price
                    if current_bid <= 0:
                        continue

                    num_bids = self._parse_bid_count(cells[3].get_text(" ", strip=True))
                    if num_bids is None:
                        continue

                    delta = self._parse_timeleft(cells[6].get_text(" ", strip=True))
                    if not delta:
                        continue

                    href = self._absolute_url(domain_link.get("href"), "https://www.sav.com")

                    auction_data = {
                        "domain": domain,
                        "tld": domain.rsplit(".", 1)[-1].lower(),
                        "platform": platform,
                        "auction_url": href or "https://www.sav.com/domains/auctions",
                        "current_bid": current_bid,
                        "currency": currency,
                        "num_bids": num_bids,
                        "end_time": now + delta,
                        "scrape_source": f"sav:load_domains_ajax:{page}",
                    }
                    self._tally(result, await self._store_auction(db, auction_data))

                await asyncio.sleep(1)

            await self._complete_scrape_log(db, log, result)
        except Exception as e:
            logger.error("Sav scrape failed: %s", e)
            await self._fail_scrape_log(db, log, e)

        return result

    # ----------------------------
    # Orchestration
    # ----------------------------

    @staticmethod
    def _merge_platform_stats(results: Dict[str, Any], platform: str, stats: Dict[str, Any]) -> None:
        """Add `stats` counters to the platform's bucket and to the grand totals.

        Counters are accumulated instead of assigned so that several sources reporting
        the same platform (e.g. the Sedo API and the ExpiredDomains Sedo page) do not
        overwrite each other. Extra metadata such as "source" is kept only while all
        contributors agree on it.
        """
        platforms = results["platforms"]
        extras = {k: v for k, v in stats.items() if k not in _COUNT_KEYS}
        bucket = platforms.get(platform)
        if bucket is None:
            bucket = {key: 0 for key in _COUNT_KEYS}
            bucket.update(extras)
            platforms[platform] = bucket
        else:
            for key in [k for k in bucket if k not in _COUNT_KEYS]:
                if extras.get(key) != bucket[key]:
                    del bucket[key]

        for key in _COUNT_KEYS:
            amount = stats.get(key, 0)
            bucket[key] = bucket.get(key, 0) + amount
            results[f"total_{key}"] += amount

    async def _store_scraped_items(self, db: AsyncSession, items: List[Dict[str, Any]], results: Dict[str, Any]) -> None:
        """Store pre-scraped auction payloads and count each stored one under its platform."""
        for item in items:
            action = await self._store_auction(db, item)
            if action == "skipped":
                continue
            platform = item.get("platform", "Unknown")
            self._merge_platform_stats(results, platform, {"found": 1, action: 1})

    async def _commit_or_rollback(self, db: AsyncSession, errors: List[str], label: str) -> None:
        """Commit; on a database error roll back and record it so later tiers still run."""
        try:
            await db.commit()
        except SQLAlchemyError as e:
            await db.rollback()
            errors.append(f"{label}: {str(e)}")

    async def scrape_all_platforms(self, db: AsyncSession) -> Dict[str, Any]:
        """Scrape all configured sources and store results in DB.

        Returns:
            `{"total_found", "total_new", "total_updated", "platforms": {name: counters},
            "errors": [str, ...]}`. Failures of individual sources are collected in
            "errors" and do not stop the remaining sources.
        """
        results: Dict[str, Any] = {
            "total_found": 0,
            "total_new": 0,
            "total_updated": 0,
            "platforms": {},
            "errors": [],
        }

        # TIER 0: Hidden APIs (Dynadot, etc.)
        try:
            hidden_api_result = await hidden_api_scraper.scrape_all(limit_per_platform=100)
            await self._store_scraped_items(db, hidden_api_result.get("items", []), results)
            for error in hidden_api_result.get("errors") or []:
                results["errors"].append(f"Hidden API: {error}")
        except Exception as e:
            results["errors"].append(f"Hidden APIs: {str(e)}")
        await self._commit_or_rollback(db, results["errors"], "Hidden APIs")

        # TIER 1: Official Partner APIs (if configured)
        for platform_name, api_func in [("DropCatch", self._fetch_dropcatch_api), ("Sedo", self._fetch_sedo_api)]:
            try:
                api_result = await api_func(db)
                if api_result.get("found", 0) > 0:
                    self._merge_platform_stats(results, platform_name, api_result)
            except Exception as e:
                results["errors"].append(f"{platform_name} API: {str(e)}")

        # TIER 2: Web scraping (non-Cloudflare, or via ExpiredDomains provider pages)
        scrapers = [
            ("GoDaddy", self._scrape_expireddomains_godaddy),
            ("Namecheap", self._scrape_expireddomains_namecheap),
            ("Sedo", self._scrape_expireddomains_sedo),
            ("Park.io", self._scrape_parkio_public),
            ("Sav", self._scrape_sav_public),
        ]
        for platform_name, fn in scrapers:
            try:
                self._merge_platform_stats(results, platform_name, await fn(db))
            except Exception as e:
                results["errors"].append(f"{platform_name}: {str(e)}")

        # TIER 3: Playwright (opt-in)
        playwright_enabled = os.getenv("POUNCE_ENABLE_PROTECTED_SCRAPERS", "false").lower() in ("1", "true", "yes")
        if PLAYWRIGHT_AVAILABLE and playwright_scraper and playwright_enabled:
            try:
                playwright_result = await playwright_scraper.scrape_all_protected()
                await self._store_scraped_items(db, playwright_result.get("items", []), results)
                for error in playwright_result.get("errors") or []:
                    results["errors"].append(f"Playwright: {error}")
            except Exception as e:
                results["errors"].append(f"Playwright: {str(e)}")
            await self._commit_or_rollback(db, results["errors"], "Playwright")

        await self._cleanup_ended_auctions(db)
        return results

    # ----------------------------
    # Tier 1 helpers (official APIs)
    # ----------------------------

    async def _fetch_dropcatch_api(self, db: AsyncSession) -> Dict[str, Any]:
        """Fetch auctions from the DropCatch Partner API, if the client is configured.

        Returns counters `{"found", "new", "updated", "source": "api"}`, where "found"
        counts stored auctions only (consistent with the other sources). Errors are
        logged and recorded on the scrape log rather than raised.
        """
        platform = "DropCatch"
        result = {"found": 0, "new": 0, "updated": 0, "source": "api"}
        if not dropcatch_client.is_configured:
            return result

        log = await self._start_scrape_log(db, platform)

        try:
            api_result = await dropcatch_client.search_auctions(page_size=100)
            auctions = api_result.get("auctions") or api_result.get("items") or []

            for dc_auction in auctions:
                auction_data = dropcatch_client.transform_to_pounce_format(dc_auction)
                self._tally(result, await self._store_auction(db, auction_data))

            await self._complete_scrape_log(db, log, result)
        except Exception as e:
            logger.error("DropCatch API fetch failed: %s", e)
            await self._fail_scrape_log(db, log, e)

        return result

    async def _fetch_sedo_api(self, db: AsyncSession) -> Dict[str, Any]:
        """Fetch auctions from the Sedo Partner API, if the client is configured.

        Returns counters `{"found", "new", "updated", "source": "api"}`, where "found"
        counts stored auctions only (consistent with the other sources). Errors are
        logged and recorded on the scrape log rather than raised.
        """
        platform = "Sedo"
        result = {"found": 0, "new": 0, "updated": 0, "source": "api"}
        if not sedo_client.is_configured:
            return result

        log = await self._start_scrape_log(db, platform)

        try:
            api_result = await sedo_client.search_auctions(page_size=100)
            listings = api_result.get("domains") or api_result.get("items") or api_result.get("result") or []
            # The listings may arrive as a mapping; only its values are auction payloads.
            if isinstance(listings, dict):
                listings = list(listings.values()) if listings else []

            for sedo_listing in listings:
                auction_data = sedo_client.transform_to_pounce_format(sedo_listing)
                self._tally(result, await self._store_auction(db, auction_data))

            await self._complete_scrape_log(db, log, result)
        except Exception as e:
            logger.error("Sedo API fetch failed: %s", e)
            await self._fail_scrape_log(db, log, e)

        return result

    # ----------------------------
    # DB cleanup / queries
    # ----------------------------

    async def _cleanup_ended_auctions(self, db: AsyncSession):
        """Mark auctions that have ended as inactive and delete very old inactive auctions.

        "Very old" means inactive auctions whose end time is more than 30 days in the past.
        """
        now = _utcnow()

        await db.execute(
            update(DomainAuction)
            .where(and_(DomainAuction.end_time < now, DomainAuction.is_active == True))  # noqa: E712
            .values(is_active=False)
        )

        cutoff = now - timedelta(days=30)
        await db.execute(
            delete(DomainAuction).where(
                and_(DomainAuction.is_active == False, DomainAuction.end_time < cutoff)  # noqa: E712
            )
        )

        await db.commit()

    async def get_active_auctions(
        self,
        db: AsyncSession,
        platform: Optional[str] = None,
        tld: Optional[str] = None,
        keyword: Optional[str] = None,
        min_bid: Optional[float] = None,
        max_bid: Optional[float] = None,
        ending_within_hours: Optional[int] = None,
        sort_by: str = "end_time",
        limit: int = 50,
        offset: int = 0,
    ) -> List[DomainAuction]:
        """Get active auctions from database with filters.

        Args:
            db: session to query.
            platform: exact platform name.
            tld: TLD with or without a leading dot, case-insensitive.
            keyword: case-insensitive substring of the domain.
            min_bid / max_bid: inclusive bounds on `current_bid`.
            ending_within_hours: only auctions ending within this many hours from now.
            sort_by: "end_time", "bid_asc", "bid_desc" or "bids"; other values leave
                the result unordered.
            limit / offset: pagination.
        """
        query = select(DomainAuction).where(DomainAuction.is_active == True)  # noqa: E712

        if platform:
            query = query.where(DomainAuction.platform == platform)
        if tld:
            query = query.where(DomainAuction.tld == tld.lower().lstrip("."))
        if keyword:
            query = query.where(DomainAuction.domain.ilike(f"%{keyword}%"))
        if min_bid is not None:
            query = query.where(DomainAuction.current_bid >= min_bid)
        if max_bid is not None:
            query = query.where(DomainAuction.current_bid <= max_bid)
        if ending_within_hours:
            cutoff = _utcnow() + timedelta(hours=ending_within_hours)
            query = query.where(DomainAuction.end_time <= cutoff)

        if sort_by == "end_time":
            query = query.order_by(DomainAuction.end_time.asc())
        elif sort_by == "bid_asc":
            query = query.order_by(DomainAuction.current_bid.asc())
        elif sort_by == "bid_desc":
            query = query.order_by(DomainAuction.current_bid.desc())
        elif sort_by == "bids":
            query = query.order_by(DomainAuction.num_bids.desc())

        result = await db.execute(query.offset(offset).limit(limit))
        return list(result.scalars().all())

    async def get_auction_count(self, db: AsyncSession) -> int:
        """Get total count of active auctions."""
        result = await db.execute(
            select(func.count(DomainAuction.id)).where(DomainAuction.is_active == True)  # noqa: E712
        )
        return result.scalar() or 0

    async def close(self):
        """Close HTTP client."""
        if self.http_client and not self.http_client.is_closed:
            await self.http_client.aclose()


# Global instance
auction_scraper = AuctionScraperService()