""" Domain Listings API - Pounce Marketplace This implements the "Micro-Marktplatz" from analysis_3.md: - Create professional "For Sale" landing pages - DNS verification for ownership - Contact form for buyers - Analytics Endpoints: - GET /listings - Public: Browse active listings - GET /listings/{slug} - Public: View listing details - POST /listings/{slug}/inquire - Public: Contact seller - POST /listings - Auth: Create new listing - GET /listings/my - Auth: Get user's listings - PUT /listings/{id} - Auth: Update listing - DELETE /listings/{id} - Auth: Delete listing - POST /listings/{id}/verify-dns - Auth: Start DNS verification - GET /listings/{id}/verify-dns/check - Auth: Check verification status """ import logging import secrets import re from datetime import datetime, timedelta from typing import Optional, List from fastapi import APIRouter, Depends, Query, HTTPException, Request from pydantic import BaseModel, Field, EmailStr from sqlalchemy import select, func, and_ from sqlalchemy.ext.asyncio import AsyncSession from app.database import get_db from app.api.deps import get_current_user, get_current_user_optional from app.models.user import User from app.models.listing import DomainListing, ListingInquiry, ListingView, ListingStatus, VerificationStatus from app.services.valuation import valuation_service def _calculate_pounce_score(domain: str, is_pounce: bool = True) -> int: """ Calculate Pounce Score for a domain. Uses the same algorithm as Market Feed (_calculate_pounce_score_v2 in auctions.py). """ # Parse domain parts = domain.lower().rsplit(".", 1) if len(parts) != 2: return 50 name, tld = parts score = 50 # Baseline # A) LENGTH BONUS (exponential for short domains) length_scores = {1: 50, 2: 45, 3: 40, 4: 30, 5: 20, 6: 15, 7: 10} score += length_scores.get(len(name), max(0, 15 - len(name))) # B) TLD PREMIUM tld_scores = { 'com': 20, 'ai': 25, 'io': 18, 'co': 12, 'ch': 15, 'de': 10, 'net': 8, 'org': 8, 'app': 10, 'dev': 10, 'xyz': 5 } score += tld_scores.get(tld.lower(), 0) # C) POUNCE DIRECT BONUS (listings are always Pounce Direct) if is_pounce: score += 10 # D) PENALTIES if '-' in name: score -= 25 if any(c.isdigit() for c in name) and len(name) > 3: score -= 20 if len(name) > 15: score -= 15 # Clamp to 0-100 return max(0, min(100, score)) logger = logging.getLogger(__name__) router = APIRouter() # ============== Schemas ============== class ListingCreate(BaseModel): """Create a new domain listing.""" domain: str = Field(..., min_length=3, max_length=255) title: Optional[str] = Field(None, max_length=200) description: Optional[str] = None asking_price: Optional[float] = Field(None, ge=0) min_offer: Optional[float] = Field(None, ge=0) currency: str = Field("USD", max_length=3) price_type: str = Field("negotiable") # fixed, negotiable, make_offer show_valuation: bool = True allow_offers: bool = True class ListingUpdate(BaseModel): """Update a listing.""" title: Optional[str] = Field(None, max_length=200) description: Optional[str] = None asking_price: Optional[float] = Field(None, ge=0) min_offer: Optional[float] = Field(None, ge=0) price_type: Optional[str] = None show_valuation: Optional[bool] = None allow_offers: Optional[bool] = None status: Optional[str] = None class ListingResponse(BaseModel): """Listing response.""" id: int domain: str slug: str title: Optional[str] description: Optional[str] asking_price: Optional[float] min_offer: Optional[float] currency: str price_type: str pounce_score: Optional[int] estimated_value: Optional[float] verification_status: str is_verified: bool status: str show_valuation: bool allow_offers: bool view_count: int inquiry_count: int public_url: str created_at: datetime published_at: Optional[datetime] # Seller info (minimal for privacy) seller_verified: bool = False seller_member_since: Optional[datetime] = None class Config: from_attributes = True class ListingPublicResponse(BaseModel): """Public listing response (limited info).""" domain: str slug: str title: Optional[str] description: Optional[str] asking_price: Optional[float] currency: str price_type: str pounce_score: Optional[int] estimated_value: Optional[float] is_verified: bool allow_offers: bool public_url: str # Seller trust indicators seller_verified: bool seller_member_since: Optional[datetime] class Config: from_attributes = True class InquiryCreate(BaseModel): """Create an inquiry for a listing.""" name: str = Field(..., min_length=2, max_length=100) email: EmailStr phone: Optional[str] = Field(None, max_length=50) company: Optional[str] = Field(None, max_length=200) message: str = Field(..., min_length=10, max_length=2000) offer_amount: Optional[float] = Field(None, ge=0) class InquiryResponse(BaseModel): """Inquiry response for listing owner.""" id: int name: str email: str phone: Optional[str] company: Optional[str] message: str offer_amount: Optional[float] status: str created_at: datetime read_at: Optional[datetime] class Config: from_attributes = True class VerificationResponse(BaseModel): """DNS verification response.""" verification_code: str dns_record_type: str = "TXT" dns_record_name: str dns_record_value: str instructions: str status: str # ============== Helper Functions ============== def _generate_slug(domain: str) -> str: """Generate URL-friendly slug from domain.""" # Remove TLD for cleaner slug slug = domain.lower().replace('.', '-') # Remove any non-alphanumeric chars except hyphens slug = re.sub(r'[^a-z0-9-]', '', slug) return slug def _generate_verification_code() -> str: """Generate a unique verification code.""" return f"pounce-verify-{secrets.token_hex(16)}" # Security: Block phishing keywords (from analysis_3.md - Säule 3) BLOCKED_KEYWORDS = [ 'login', 'bank', 'verify', 'paypal', 'password', 'account', 'credit', 'social security', 'ssn', 'wire', 'transfer' ] def _check_content_safety(text: str) -> bool: """Check if content contains phishing keywords.""" text_lower = text.lower() return not any(keyword in text_lower for keyword in BLOCKED_KEYWORDS) # ============== Public Endpoints ============== @router.get("", response_model=List[ListingPublicResponse]) async def browse_listings( keyword: Optional[str] = Query(None), min_price: Optional[float] = Query(None, ge=0), max_price: Optional[float] = Query(None, ge=0), verified_only: bool = Query(False), sort_by: str = Query("newest", enum=["newest", "price_asc", "price_desc", "popular"]), limit: int = Query(20, le=50), offset: int = Query(0, ge=0), db: AsyncSession = Depends(get_db), ): """Browse active domain listings (public).""" query = select(DomainListing).where( DomainListing.status == ListingStatus.ACTIVE.value ) if keyword: query = query.where(DomainListing.domain.ilike(f"%{keyword}%")) if min_price is not None: query = query.where(DomainListing.asking_price >= min_price) if max_price is not None: query = query.where(DomainListing.asking_price <= max_price) if verified_only: query = query.where( DomainListing.verification_status == VerificationStatus.VERIFIED.value ) # Sorting if sort_by == "price_asc": query = query.order_by(DomainListing.asking_price.asc().nullslast()) elif sort_by == "price_desc": query = query.order_by(DomainListing.asking_price.desc().nullsfirst()) elif sort_by == "popular": query = query.order_by(DomainListing.view_count.desc()) else: # newest query = query.order_by(DomainListing.published_at.desc()) query = query.offset(offset).limit(limit) result = await db.execute(query) listings = list(result.scalars().all()) responses = [] for listing in listings: # Calculate pounce_score dynamically if not stored pounce_score = listing.pounce_score if pounce_score is None: pounce_score = _calculate_pounce_score(listing.domain) # Save it for future requests listing.pounce_score = pounce_score responses.append(ListingPublicResponse( domain=listing.domain, slug=listing.slug, title=listing.title, description=listing.description, asking_price=listing.asking_price, currency=listing.currency, price_type=listing.price_type, pounce_score=pounce_score, # Always return the score estimated_value=listing.estimated_value if listing.show_valuation else None, is_verified=listing.is_verified, allow_offers=listing.allow_offers, public_url=listing.public_url, seller_verified=listing.is_verified, seller_member_since=listing.user.created_at if listing.user else None, )) await db.commit() # Save any updated pounce_scores return responses # ============== Authenticated Endpoints (before dynamic routes!) ============== @router.get("/my", response_model=List[ListingResponse]) async def get_my_listings( current_user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db), ): """Get current user's listings.""" result = await db.execute( select(DomainListing) .where(DomainListing.user_id == current_user.id) .order_by(DomainListing.created_at.desc()) ) listings = list(result.scalars().all()) return [ ListingResponse( id=listing.id, domain=listing.domain, slug=listing.slug, title=listing.title, description=listing.description, asking_price=listing.asking_price, min_offer=listing.min_offer, currency=listing.currency, price_type=listing.price_type, pounce_score=listing.pounce_score, estimated_value=listing.estimated_value, verification_status=listing.verification_status, is_verified=listing.is_verified, status=listing.status, show_valuation=listing.show_valuation, allow_offers=listing.allow_offers, view_count=listing.view_count, inquiry_count=listing.inquiry_count, public_url=listing.public_url, created_at=listing.created_at, published_at=listing.published_at, seller_verified=current_user.is_verified, seller_member_since=current_user.created_at, ) for listing in listings ] # ============== Public Dynamic Routes ============== @router.get("/{slug}", response_model=ListingPublicResponse) async def get_listing_by_slug( slug: str, request: Request, db: AsyncSession = Depends(get_db), current_user: Optional[User] = Depends(get_current_user_optional), ): """Get listing details by slug (public).""" result = await db.execute( select(DomainListing).where( and_( DomainListing.slug == slug, DomainListing.status == ListingStatus.ACTIVE.value, ) ) ) listing = result.scalar_one_or_none() if not listing: raise HTTPException(status_code=404, detail="Listing not found") # Record view view = ListingView( listing_id=listing.id, ip_address=request.client.host if request.client else None, user_agent=request.headers.get("user-agent", "")[:500], referrer=request.headers.get("referer", "")[:500], user_id=current_user.id if current_user else None, ) db.add(view) # Increment view count listing.view_count += 1 # Calculate pounce_score dynamically if not stored (same as Market Feed) pounce_score = listing.pounce_score if pounce_score is None: pounce_score = _calculate_pounce_score(listing.domain) # Save it for future requests listing.pounce_score = pounce_score await db.commit() return ListingPublicResponse( domain=listing.domain, slug=listing.slug, title=listing.title, description=listing.description, asking_price=listing.asking_price, currency=listing.currency, price_type=listing.price_type, pounce_score=pounce_score, # Always return the score estimated_value=listing.estimated_value if listing.show_valuation else None, is_verified=listing.is_verified, allow_offers=listing.allow_offers, public_url=listing.public_url, seller_verified=listing.is_verified, seller_member_since=listing.user.created_at if listing.user else None, ) @router.post("/{slug}/inquire") async def submit_inquiry( slug: str, inquiry: InquiryCreate, request: Request, db: AsyncSession = Depends(get_db), ): """Submit an inquiry for a listing (public).""" # Find listing result = await db.execute( select(DomainListing).where( and_( DomainListing.slug == slug, DomainListing.status == ListingStatus.ACTIVE.value, ) ) ) listing = result.scalar_one_or_none() if not listing: raise HTTPException(status_code=404, detail="Listing not found") # Security: Check for phishing keywords if not _check_content_safety(inquiry.message): raise HTTPException( status_code=400, detail="Message contains blocked content. Please revise." ) # Rate limiting check (simple: max 3 inquiries per email per listing per day) today_start = datetime.utcnow().replace(hour=0, minute=0, second=0, microsecond=0) existing_count = await db.execute( select(func.count(ListingInquiry.id)).where( and_( ListingInquiry.listing_id == listing.id, ListingInquiry.email == inquiry.email.lower(), ListingInquiry.created_at >= today_start, ) ) ) if existing_count.scalar() >= 3: raise HTTPException( status_code=429, detail="Too many inquiries. Please try again tomorrow." ) # Create inquiry new_inquiry = ListingInquiry( listing_id=listing.id, name=inquiry.name, email=inquiry.email.lower(), phone=inquiry.phone, company=inquiry.company, message=inquiry.message, offer_amount=inquiry.offer_amount, ip_address=request.client.host if request.client else None, user_agent=request.headers.get("user-agent", "")[:500], ) db.add(new_inquiry) # Increment inquiry count listing.inquiry_count += 1 await db.commit() # Send email notification to seller try: from app.services.email_service import email_service from app.models.user import User # Get seller's email seller_result = await db.execute( select(User).where(User.id == listing.user_id) ) seller = seller_result.scalar_one_or_none() if seller and seller.email and email_service.is_configured(): await email_service.send_listing_inquiry( to_email=seller.email, domain=listing.domain, name=inquiry.name, email=inquiry.email, message=inquiry.message, company=inquiry.company, offer_amount=inquiry.offer_amount, ) logger.info(f"📧 Inquiry notification sent to {seller.email} for {listing.domain}") except Exception as e: logger.error(f"Failed to send inquiry notification: {e}") return { "success": True, "message": "Your inquiry has been sent to the seller.", } # ============== Listing Management (Authenticated) ============== @router.post("", response_model=ListingResponse) async def create_listing( data: ListingCreate, current_user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db), ): """Create a new domain listing.""" # Check if domain is already listed existing = await db.execute( select(DomainListing).where(DomainListing.domain == data.domain.lower()) ) if existing.scalar_one_or_none(): raise HTTPException(status_code=400, detail="This domain is already listed") # Check user's listing limit based on subscription user_listings = await db.execute( select(func.count(DomainListing.id)).where( DomainListing.user_id == current_user.id ) ) listing_count = user_listings.scalar() or 0 # Listing limits by tier (from pounce_pricing.md) # Load subscription separately to avoid async lazy loading issues from app.models.subscription import Subscription sub_result = await db.execute( select(Subscription).where(Subscription.user_id == current_user.id) ) subscription = sub_result.scalar_one_or_none() tier = subscription.tier if subscription else "scout" limits = {"scout": 0, "trader": 5, "tycoon": 50} max_listings = limits.get(tier, 0) if listing_count >= max_listings: raise HTTPException( status_code=403, detail=f"Listing limit reached ({max_listings}). Upgrade your plan for more." ) # Generate slug slug = _generate_slug(data.domain) # Check slug uniqueness slug_check = await db.execute( select(DomainListing).where(DomainListing.slug == slug) ) if slug_check.scalar_one_or_none(): slug = f"{slug}-{secrets.token_hex(4)}" # Get valuation try: valuation = await valuation_service.estimate_value(data.domain, db, save_result=False) pounce_score = min(100, int(valuation.get("score", 50))) estimated_value = valuation.get("value", 0) # Fixed: was 'estimated_value', service returns 'value' except Exception: pounce_score = 50 estimated_value = None # Create listing listing = DomainListing( user_id=current_user.id, domain=data.domain.lower(), slug=slug, title=data.title, description=data.description, asking_price=data.asking_price, min_offer=data.min_offer, currency=data.currency.upper(), price_type=data.price_type, show_valuation=data.show_valuation, allow_offers=data.allow_offers, pounce_score=pounce_score, estimated_value=estimated_value, verification_code=_generate_verification_code(), status=ListingStatus.DRAFT.value, ) db.add(listing) await db.commit() await db.refresh(listing) return ListingResponse( id=listing.id, domain=listing.domain, slug=listing.slug, title=listing.title, description=listing.description, asking_price=listing.asking_price, min_offer=listing.min_offer, currency=listing.currency, price_type=listing.price_type, pounce_score=listing.pounce_score, estimated_value=listing.estimated_value, verification_status=listing.verification_status, is_verified=listing.is_verified, status=listing.status, show_valuation=listing.show_valuation, allow_offers=listing.allow_offers, view_count=listing.view_count, inquiry_count=listing.inquiry_count, public_url=listing.public_url, created_at=listing.created_at, published_at=listing.published_at, seller_verified=current_user.is_verified, seller_member_since=current_user.created_at, ) @router.get("/{id}/inquiries", response_model=List[InquiryResponse]) async def get_listing_inquiries( id: int, current_user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db), ): """Get inquiries for a listing.""" # Verify ownership result = await db.execute( select(DomainListing).where( and_( DomainListing.id == id, DomainListing.user_id == current_user.id, ) ) ) listing = result.scalar_one_or_none() if not listing: raise HTTPException(status_code=404, detail="Listing not found") inquiries_result = await db.execute( select(ListingInquiry) .where(ListingInquiry.listing_id == id) .order_by(ListingInquiry.created_at.desc()) ) inquiries = list(inquiries_result.scalars().all()) return [ InquiryResponse( id=inq.id, name=inq.name, email=inq.email, phone=inq.phone, company=inq.company, message=inq.message, offer_amount=inq.offer_amount, status=inq.status, created_at=inq.created_at, read_at=inq.read_at, ) for inq in inquiries ] @router.put("/{id}", response_model=ListingResponse) async def update_listing( id: int, data: ListingUpdate, current_user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db), ): """Update a listing.""" result = await db.execute( select(DomainListing).where( and_( DomainListing.id == id, DomainListing.user_id == current_user.id, ) ) ) listing = result.scalar_one_or_none() if not listing: raise HTTPException(status_code=404, detail="Listing not found") # Update fields if data.title is not None: listing.title = data.title if data.description is not None: listing.description = data.description if data.asking_price is not None: listing.asking_price = data.asking_price if data.min_offer is not None: listing.min_offer = data.min_offer if data.price_type is not None: listing.price_type = data.price_type if data.show_valuation is not None: listing.show_valuation = data.show_valuation if data.allow_offers is not None: listing.allow_offers = data.allow_offers # Status change if data.status is not None: if data.status == "active" and listing.status == "draft": # Publish listing if not listing.is_verified: raise HTTPException( status_code=400, detail="Cannot publish without DNS verification" ) listing.status = ListingStatus.ACTIVE.value listing.published_at = datetime.utcnow() elif data.status in ["draft", "sold", "expired"]: listing.status = data.status await db.commit() await db.refresh(listing) return ListingResponse( id=listing.id, domain=listing.domain, slug=listing.slug, title=listing.title, description=listing.description, asking_price=listing.asking_price, min_offer=listing.min_offer, currency=listing.currency, price_type=listing.price_type, pounce_score=listing.pounce_score, estimated_value=listing.estimated_value, verification_status=listing.verification_status, is_verified=listing.is_verified, status=listing.status, show_valuation=listing.show_valuation, allow_offers=listing.allow_offers, view_count=listing.view_count, inquiry_count=listing.inquiry_count, public_url=listing.public_url, created_at=listing.created_at, published_at=listing.published_at, seller_verified=current_user.is_verified, seller_member_since=current_user.created_at, ) @router.delete("/{id}") async def delete_listing( id: int, current_user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db), ): """Delete a listing.""" result = await db.execute( select(DomainListing).where( and_( DomainListing.id == id, DomainListing.user_id == current_user.id, ) ) ) listing = result.scalar_one_or_none() if not listing: raise HTTPException(status_code=404, detail="Listing not found") await db.delete(listing) await db.commit() return {"success": True, "message": "Listing deleted"} @router.post("/{id}/verify-dns", response_model=VerificationResponse) async def start_dns_verification( id: int, current_user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db), ): """Start DNS verification for a listing.""" result = await db.execute( select(DomainListing).where( and_( DomainListing.id == id, DomainListing.user_id == current_user.id, ) ) ) listing = result.scalar_one_or_none() if not listing: raise HTTPException(status_code=404, detail="Listing not found") # Generate new code if needed if not listing.verification_code: listing.verification_code = _generate_verification_code() listing.verification_status = VerificationStatus.PENDING.value await db.commit() # Extract domain root for DNS domain_parts = listing.domain.split('.') if len(domain_parts) > 2: dns_name = f"_pounce.{'.'.join(domain_parts[-2:])}" else: dns_name = f"_pounce.{listing.domain}" return VerificationResponse( verification_code=listing.verification_code, dns_record_type="TXT", dns_record_name=dns_name, dns_record_value=listing.verification_code, instructions=f""" To verify ownership of {listing.domain}: 1. Go to your domain registrar's DNS settings 2. Add a new TXT record: - Name/Host: _pounce (or _pounce.{listing.domain}) - Value: {listing.verification_code} - TTL: 300 (or lowest available) 3. Wait 1-5 minutes for DNS propagation 4. Click "Check Verification" to complete This proves you control the domain's DNS, confirming ownership. """.strip(), status=listing.verification_status, ) @router.get("/{id}/verify-dns/check") async def check_dns_verification( id: int, current_user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db), ): """Check DNS verification status.""" result = await db.execute( select(DomainListing).where( and_( DomainListing.id == id, DomainListing.user_id == current_user.id, ) ) ) listing = result.scalar_one_or_none() if not listing: raise HTTPException(status_code=404, detail="Listing not found") if not listing.verification_code: raise HTTPException(status_code=400, detail="Start verification first") # Check DNS TXT record import dns.resolver try: domain_parts = listing.domain.split('.') if len(domain_parts) > 2: dns_name = f"_pounce.{'.'.join(domain_parts[-2:])}" else: dns_name = f"_pounce.{listing.domain}" answers = dns.resolver.resolve(dns_name, 'TXT') for rdata in answers: txt_value = str(rdata).strip('"') if txt_value == listing.verification_code: # Verified! listing.verification_status = VerificationStatus.VERIFIED.value listing.verified_at = datetime.utcnow() await db.commit() return { "verified": True, "status": "verified", "message": "DNS verification successful! You can now publish your listing.", } # Code not found return { "verified": False, "status": "pending", "message": "TXT record found but value doesn't match. Please check the value.", } except dns.resolver.NXDOMAIN: return { "verified": False, "status": "pending", "message": "DNS record not found. Please add the TXT record and wait for propagation.", } except dns.resolver.NoAnswer: return { "verified": False, "status": "pending", "message": "No TXT record found. Please add the record.", } except Exception as e: logger.error(f"DNS check failed for {listing.domain}: {e}") return { "verified": False, "status": "error", "message": "DNS check failed. Please try again in a few minutes.", }