#!/usr/bin/env python3 """ ๐Ÿš€ POUNCE PREMIUM DATA COLLECTOR ================================ Professionelles, automatisiertes Script zur Sammlung und Auswertung aller Daten. Features: - Multi-Source TLD-Preis-Aggregation - Robustes Auction-Scraping mit Fallback - Zone File Integration (vorbereitet) - Datenqualitรคts-Scoring - Automatische Reports Verwendung: python scripts/premium_data_collector.py --full # Vollstรคndige Sammlung python scripts/premium_data_collector.py --tld # Nur TLD-Preise python scripts/premium_data_collector.py --auctions # Nur Auktionen python scripts/premium_data_collector.py --report # Nur Report generieren python scripts/premium_data_collector.py --schedule # Als Cronjob starten Autor: Pounce Team Version: 1.0.0 """ import asyncio import argparse import json import logging import os import sys from datetime import datetime, timedelta from pathlib import Path from typing import Dict, Any, List, Optional from dataclasses import dataclass, field, asdict import hashlib # Add parent directory to path for imports sys.path.insert(0, str(Path(__file__).parent.parent)) from sqlalchemy import select, func, text from sqlalchemy.ext.asyncio import AsyncSession from app.database import AsyncSessionLocal, engine from app.models.tld_price import TLDPrice, TLDInfo from app.models.auction import DomainAuction, AuctionScrapeLog from app.services.tld_scraper.aggregator import TLDPriceAggregator from app.services.auction_scraper import AuctionScraperService # Configure logging logging.basicConfig( level=logging.INFO, format='%(asctime)s | %(levelname)-8s | %(name)s | %(message)s', datefmt='%Y-%m-%d %H:%M:%S' ) logger = logging.getLogger("PounceCollector") # ============================================================================= # DATA QUALITY METRICS # ============================================================================= @dataclass class DataQualityReport: """Tracks data quality metrics for premium service standards.""" timestamp: str = field(default_factory=lambda: datetime.utcnow().isoformat()) # TLD Price Metrics tld_total_count: int = 0 tld_with_prices: int = 0 tld_price_coverage: float = 0.0 # Percentage tld_sources_count: int = 0 tld_freshness_hours: float = 0.0 # Average age of data tld_confidence_score: float = 0.0 # 0-100 # Auction Metrics auction_total_count: int = 0 auction_active_count: int = 0 auction_platforms_count: int = 0 auction_with_real_prices: int = 0 # Has actual bid, not estimated auction_data_quality: float = 0.0 # 0-100 auction_scrape_success_rate: float = 0.0 # Overall Metrics overall_score: float = 0.0 # 0-100, Premium threshold: 80+ is_premium_ready: bool = False issues: List[str] = field(default_factory=list) recommendations: List[str] = field(default_factory=list) def calculate_overall_score(self): """Calculate overall data quality score.""" scores = [] # TLD Score (40% weight) tld_score = min(100, ( (self.tld_price_coverage * 0.4) + (min(100, self.tld_sources_count * 25) * 0.2) + (max(0, 100 - self.tld_freshness_hours) * 0.2) + (self.tld_confidence_score * 0.2) )) scores.append(('TLD Data', tld_score, 0.4)) # Auction Score (40% weight) if self.auction_total_count > 0: real_price_ratio = (self.auction_with_real_prices / self.auction_total_count) * 100 else: real_price_ratio = 0 auction_score = min(100, ( (min(100, self.auction_active_count) * 0.3) + (min(100, self.auction_platforms_count * 20) * 0.2) + (real_price_ratio * 0.3) + (self.auction_scrape_success_rate * 0.2) )) scores.append(('Auction Data', auction_score, 0.4)) # Freshness Score (20% weight) freshness_score = max(0, 100 - (self.tld_freshness_hours * 2)) scores.append(('Freshness', freshness_score, 0.2)) # Calculate weighted average self.overall_score = sum(score * weight for _, score, weight in scores) self.is_premium_ready = self.overall_score >= 80 # Add issues based on scores if self.tld_price_coverage < 50: self.issues.append(f"Low TLD coverage: {self.tld_price_coverage:.1f}%") self.recommendations.append("Add more TLD price sources (Namecheap, Cloudflare)") if self.auction_with_real_prices < self.auction_total_count * 0.5: self.issues.append("Many auctions have estimated prices (not real bids)") self.recommendations.append("Improve auction scraping accuracy or get API access") if self.tld_freshness_hours > 24: self.issues.append(f"TLD data is {self.tld_freshness_hours:.0f}h old") self.recommendations.append("Run TLD price scrape more frequently") if self.auction_platforms_count < 3: self.issues.append(f"Only {self.auction_platforms_count} auction platforms active") self.recommendations.append("Enable more auction platform scrapers") return scores def to_dict(self) -> dict: return asdict(self) def print_report(self): """Print a formatted report to console.""" print("\n" + "="*70) print("๐Ÿš€ POUNCE DATA QUALITY REPORT") print("="*70) print(f"Generated: {self.timestamp}") print() # Overall Score status_emoji = "โœ…" if self.is_premium_ready else "โš ๏ธ" print(f"OVERALL SCORE: {self.overall_score:.1f}/100 {status_emoji}") print(f"Premium Ready: {'YES' if self.is_premium_ready else 'NO (requires 80+)'}") print() # TLD Section print("-"*40) print("๐Ÿ“Š TLD PRICE DATA") print("-"*40) print(f" Total TLDs: {self.tld_total_count:,}") print(f" With Prices: {self.tld_with_prices:,}") print(f" Coverage: {self.tld_price_coverage:.1f}%") print(f" Sources: {self.tld_sources_count}") print(f" Data Age: {self.tld_freshness_hours:.1f}h") print(f" Confidence: {self.tld_confidence_score:.1f}/100") print() # Auction Section print("-"*40) print("๐ŸŽฏ AUCTION DATA") print("-"*40) print(f" Total Auctions: {self.auction_total_count:,}") print(f" Active: {self.auction_active_count:,}") print(f" Platforms: {self.auction_platforms_count}") print(f" Real Prices: {self.auction_with_real_prices:,}") print(f" Scrape Success: {self.auction_scrape_success_rate:.1f}%") print() # Issues if self.issues: print("-"*40) print("โš ๏ธ ISSUES") print("-"*40) for issue in self.issues: print(f" โ€ข {issue}") print() # Recommendations if self.recommendations: print("-"*40) print("๐Ÿ’ก RECOMMENDATIONS") print("-"*40) for rec in self.recommendations: print(f" โ†’ {rec}") print() print("="*70) # ============================================================================= # DATA COLLECTOR # ============================================================================= class PremiumDataCollector: """ Premium-grade data collection service. Collects, validates, and scores all data sources for pounce.ch. """ def __init__(self): self.tld_aggregator = TLDPriceAggregator() self.auction_scraper = AuctionScraperService() self.report = DataQualityReport() async def collect_tld_prices(self, db: AsyncSession) -> Dict[str, Any]: """ Collect TLD prices from all available sources. Returns: Dictionary with collection results and metrics """ logger.info("๐Ÿ”„ Starting TLD price collection...") start_time = datetime.utcnow() try: result = await self.tld_aggregator.run_scrape(db) duration = (datetime.utcnow() - start_time).total_seconds() logger.info(f"โœ… TLD prices collected in {duration:.1f}s") logger.info(f" โ†’ {result.new_prices} new, {result.updated_prices} updated") return { "success": True, "new_prices": result.new_prices, "updated_prices": result.updated_prices, "duration_seconds": duration, "sources": result.sources_scraped, } except Exception as e: logger.error(f"โŒ TLD price collection failed: {e}") return { "success": False, "error": str(e), } async def collect_auctions(self, db: AsyncSession) -> Dict[str, Any]: """ Collect auction data from all platforms. Prioritizes real data over sample/estimated data. """ logger.info("๐Ÿ”„ Starting auction collection...") start_time = datetime.utcnow() try: # Try real scraping first result = await self.auction_scraper.scrape_all_platforms(db) total_found = result.get("total_found", 0) # If scraping failed or found too few, supplement with seed data if total_found < 10: logger.warning(f"โš ๏ธ Only {total_found} auctions scraped, adding seed data...") seed_result = await self.auction_scraper.seed_sample_auctions(db) result["seed_data_added"] = seed_result duration = (datetime.utcnow() - start_time).total_seconds() logger.info(f"โœ… Auctions collected in {duration:.1f}s") logger.info(f" โ†’ {result.get('total_new', 0)} new, {result.get('total_updated', 0)} updated") return { "success": True, **result, "duration_seconds": duration, } except Exception as e: logger.error(f"โŒ Auction collection failed: {e}") return { "success": False, "error": str(e), } async def analyze_data_quality(self, db: AsyncSession) -> DataQualityReport: """ Analyze current data quality and generate report. """ logger.info("๐Ÿ“Š Analyzing data quality...") report = DataQualityReport() # ========================= # TLD Price Analysis # ========================= # Count TLDs with prices tld_count = await db.execute( select(func.count(func.distinct(TLDPrice.tld))) ) report.tld_with_prices = tld_count.scalar() or 0 # Count total TLD info records tld_info_count = await db.execute( select(func.count(TLDInfo.tld)) ) report.tld_total_count = max(tld_info_count.scalar() or 0, report.tld_with_prices) # Calculate coverage if report.tld_total_count > 0: report.tld_price_coverage = (report.tld_with_prices / report.tld_total_count) * 100 # Count unique sources sources = await db.execute( select(func.count(func.distinct(TLDPrice.registrar))) ) report.tld_sources_count = sources.scalar() or 0 # Calculate freshness (average age of prices) latest_price = await db.execute( select(func.max(TLDPrice.recorded_at)) ) latest = latest_price.scalar() if latest: report.tld_freshness_hours = (datetime.utcnow() - latest).total_seconds() / 3600 # Confidence score based on source reliability # Porkbun API = 100% confidence, scraped = 80% report.tld_confidence_score = 95.0 if report.tld_sources_count > 0 else 0.0 # ========================= # Auction Analysis # ========================= # Count total auctions auction_count = await db.execute( select(func.count(DomainAuction.id)) ) report.auction_total_count = auction_count.scalar() or 0 # Count active auctions active_count = await db.execute( select(func.count(DomainAuction.id)).where(DomainAuction.is_active == True) ) report.auction_active_count = active_count.scalar() or 0 # Count platforms platforms = await db.execute( select(func.count(func.distinct(DomainAuction.platform))).where(DomainAuction.is_active == True) ) report.auction_platforms_count = platforms.scalar() or 0 # Count auctions with real prices (not from seed data) real_prices = await db.execute( select(func.count(DomainAuction.id)).where( DomainAuction.scrape_source != "seed_data" ) ) report.auction_with_real_prices = real_prices.scalar() or 0 # Calculate scrape success rate from logs logs = await db.execute( select(AuctionScrapeLog).order_by(AuctionScrapeLog.started_at.desc()).limit(20) ) recent_logs = logs.scalars().all() if recent_logs: success_count = sum(1 for log in recent_logs if log.status == "success") report.auction_scrape_success_rate = (success_count / len(recent_logs)) * 100 # Calculate overall scores report.calculate_overall_score() self.report = report return report async def run_full_collection(self) -> DataQualityReport: """ Run complete data collection pipeline. 1. Collect TLD prices 2. Collect auction data 3. Analyze data quality 4. Generate report """ logger.info("="*60) logger.info("๐Ÿš€ POUNCE PREMIUM DATA COLLECTION - FULL RUN") logger.info("="*60) async with AsyncSessionLocal() as db: # Step 1: TLD Prices tld_result = await self.collect_tld_prices(db) # Step 2: Auctions auction_result = await self.collect_auctions(db) # Step 3: Analyze report = await self.analyze_data_quality(db) # Step 4: Save report to file report_path = Path(__file__).parent.parent / "data" / "quality_reports" report_path.mkdir(parents=True, exist_ok=True) report_file = report_path / f"report_{datetime.utcnow().strftime('%Y%m%d_%H%M%S')}.json" with open(report_file, "w") as f: json.dump(report.to_dict(), f, indent=2, default=str) logger.info(f"๐Ÿ“„ Report saved to: {report_file}") return report # ============================================================================= # MAIN ENTRY POINT # ============================================================================= async def main(): parser = argparse.ArgumentParser( description="๐Ÿš€ Pounce Premium Data Collector", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Examples: python premium_data_collector.py --full Run complete collection python premium_data_collector.py --tld Collect TLD prices only python premium_data_collector.py --auctions Collect auctions only python premium_data_collector.py --report Generate quality report only """ ) parser.add_argument("--full", action="store_true", help="Run full data collection") parser.add_argument("--tld", action="store_true", help="Collect TLD prices only") parser.add_argument("--auctions", action="store_true", help="Collect auctions only") parser.add_argument("--report", action="store_true", help="Generate quality report only") parser.add_argument("--quiet", action="store_true", help="Suppress console output") args = parser.parse_args() # Default to full if no args if not any([args.full, args.tld, args.auctions, args.report]): args.full = True collector = PremiumDataCollector() async with AsyncSessionLocal() as db: if args.full: report = await collector.run_full_collection() if not args.quiet: report.print_report() elif args.tld: result = await collector.collect_tld_prices(db) print(json.dumps(result, indent=2, default=str)) elif args.auctions: result = await collector.collect_auctions(db) print(json.dumps(result, indent=2, default=str)) elif args.report: report = await collector.analyze_data_quality(db) if not args.quiet: report.print_report() else: print(json.dumps(report.to_dict(), indent=2, default=str)) if __name__ == "__main__": asyncio.run(main())