pounce/backend/app/api/listings.py
yves.gugger 18d50e96f4 feat: Add For Sale Marketplace + Sniper Alerts
BACKEND - New Models:
- DomainListing: For sale landing pages with DNS verification
- ListingInquiry: Contact form submissions from buyers
- ListingView: Analytics tracking
- SniperAlert: Hyper-personalized auction filters
- SniperAlertMatch: Matched auctions for alerts

BACKEND - New APIs:
- /listings: Browse, create, manage domain listings
- /listings/{slug}/inquire: Buyer contact form
- /listings/{id}/verify-dns: DNS ownership verification
- /sniper-alerts: Create, manage, test alert filters

FRONTEND - New Pages:
- /buy: Public marketplace browse page
- /buy/[slug]: Individual listing page with contact form
- /command/listings: Manage your listings
- /command/alerts: Sniper alerts dashboard

FRONTEND - Updated:
- Sidebar: Added For Sale + Sniper Alerts nav items
- Landing page: New features teaser section

DOCS:
- DATABASE_MIGRATIONS.md: Complete SQL for new tables

From analysis_3.md:
- Strategie 2: Micro-Marktplatz (For Sale Pages)
- Strategie 4: Alerts nach Maß (Sniper Alerts)
- Säule 2: DNS Ownership Verification
2025-12-10 11:44:56 +01:00

815 lines
26 KiB
Python

"""
Domain Listings API - Pounce Marketplace
This implements the "Micro-Marktplatz" from analysis_3.md:
- Create professional "For Sale" landing pages
- DNS verification for ownership
- Contact form for buyers
- Analytics
Endpoints:
- GET /listings - Public: Browse active listings
- GET /listings/{slug} - Public: View listing details
- POST /listings/{slug}/inquire - Public: Contact seller
- POST /listings - Auth: Create new listing
- GET /listings/my - Auth: Get user's listings
- PUT /listings/{id} - Auth: Update listing
- DELETE /listings/{id} - Auth: Delete listing
- POST /listings/{id}/verify-dns - Auth: Start DNS verification
- GET /listings/{id}/verify-dns/check - Auth: Check verification status
- GET /listings/{id}/inquiries - Auth: View inquiries received for a listing
"""
import logging
import secrets
import re
from datetime import datetime, timedelta
from typing import Optional, List
from fastapi import APIRouter, Depends, Query, HTTPException, Request
from pydantic import BaseModel, Field, EmailStr
from sqlalchemy import select, func, and_
from sqlalchemy.ext.asyncio import AsyncSession
from app.database import get_db
from app.api.deps import get_current_user, get_current_user_optional
from app.models.user import User
from app.models.listing import DomainListing, ListingInquiry, ListingView, ListingStatus, VerificationStatus
from app.services.valuation import valuation_service
logger = logging.getLogger(__name__)
router = APIRouter()
# ============== Schemas ==============
class ListingCreate(BaseModel):
    """Request body for creating a new domain listing.

    Only ``domain`` is required; every other field has a default.
    """
    # Domain being offered for sale (3-255 characters).
    domain: str = Field(..., min_length=3, max_length=255)
    # Optional headline, at most 200 characters.
    title: Optional[str] = Field(None, max_length=200)
    description: Optional[str] = None
    # Prices must be non-negative when supplied.
    asking_price: Optional[float] = Field(None, ge=0)
    min_offer: Optional[float] = Field(None, ge=0)
    # Currency code, at most 3 characters; defaults to "USD".
    currency: str = Field("USD", max_length=3)
    price_type: str = Field("negotiable")  # expected: fixed, negotiable, make_offer (not enforced by this schema)
    show_valuation: bool = True
    allow_offers: bool = True
class ListingUpdate(BaseModel):
    """Request body for a partial listing update.

    Every field is optional and defaults to ``None``; the update endpoint
    treats ``None`` as "leave this field unchanged".
    """
    title: Optional[str] = Field(None, max_length=200)
    description: Optional[str] = None
    # Prices must be non-negative when supplied.
    asking_price: Optional[float] = Field(None, ge=0)
    min_offer: Optional[float] = Field(None, ge=0)
    price_type: Optional[str] = None
    show_valuation: Optional[bool] = None
    allow_offers: Optional[bool] = None
    # Requested listing status; the allowed transitions are decided by the
    # update endpoint, not by this schema.
    status: Optional[str] = None
class ListingResponse(BaseModel):
    """Full listing representation returned by the authenticated endpoints.

    Includes owner-facing data (id, minimum offer, status, verification
    state and view/inquiry counters) in addition to the public fields.
    """
    id: int
    domain: str
    slug: str
    title: Optional[str]
    description: Optional[str]
    asking_price: Optional[float]
    min_offer: Optional[float]
    currency: str
    price_type: str
    # Valuation data stored on the listing.
    pounce_score: Optional[int]
    estimated_value: Optional[float]
    # DNS ownership verification state.
    verification_status: str
    is_verified: bool
    status: str
    show_valuation: bool
    allow_offers: bool
    # Analytics counters.
    view_count: int
    inquiry_count: int
    public_url: str
    created_at: datetime
    published_at: Optional[datetime]
    # Seller info (minimal for privacy)
    seller_verified: bool = False
    seller_member_since: Optional[datetime] = None

    class Config:
        # pydantic option: allow populating the model from object attributes.
        from_attributes = True
class ListingPublicResponse(BaseModel):
    """Reduced listing representation returned by the public endpoints.

    Leaves out owner-only fields such as the id, minimum offer, status and
    the view/inquiry counters.
    """
    domain: str
    slug: str
    title: Optional[str]
    description: Optional[str]
    asking_price: Optional[float]
    currency: str
    price_type: str
    # The public endpoints send None for these two fields when the listing's
    # show_valuation flag is off.
    pounce_score: Optional[int]
    estimated_value: Optional[float]
    is_verified: bool
    allow_offers: bool
    public_url: str
    # Seller trust indicators
    seller_verified: bool
    seller_member_since: Optional[datetime]

    class Config:
        # pydantic option: allow populating the model from object attributes.
        from_attributes = True
class InquiryCreate(BaseModel):
    """Request body submitted by a prospective buyer for a listing."""
    # Buyer's name, 2-100 characters.
    name: str = Field(..., min_length=2, max_length=100)
    # Validated as an e-mail address by pydantic's EmailStr.
    email: EmailStr
    phone: Optional[str] = Field(None, max_length=50)
    company: Optional[str] = Field(None, max_length=200)
    # Free-text message, 10-2000 characters.
    message: str = Field(..., min_length=10, max_length=2000)
    # Optional offer; must be non-negative when supplied.
    offer_amount: Optional[float] = Field(None, ge=0)
class InquiryResponse(BaseModel):
    """Inquiry as shown to the listing owner, including buyer contact details."""
    id: int
    name: str
    email: str
    phone: Optional[str]
    company: Optional[str]
    message: str
    offer_amount: Optional[float]
    status: str
    created_at: datetime
    read_at: Optional[datetime]

    class Config:
        # pydantic option: allow populating the model from object attributes.
        from_attributes = True
class VerificationResponse(BaseModel):
    """DNS verification instructions returned when verification is started."""
    # Token the owner must publish in DNS.
    verification_code: str
    # Record type to create; defaults to "TXT".
    dns_record_type: str = "TXT"
    # Name and value of the record to create.
    dns_record_name: str
    dns_record_value: str
    # Human-readable setup steps.
    instructions: str
    # Current verification status of the listing.
    status: str
# ============== Helper Functions ==============
def _generate_slug(domain: str) -> str:
"""Generate URL-friendly slug from domain."""
# Remove TLD for cleaner slug
slug = domain.lower().replace('.', '-')
# Remove any non-alphanumeric chars except hyphens
slug = re.sub(r'[^a-z0-9-]', '', slug)
return slug
def _generate_verification_code() -> str:
"""Generate a unique verification code."""
return f"pounce-verify-{secrets.token_hex(16)}"
# Security: Block phishing keywords (from analysis_3.md - Säule 3)
BLOCKED_KEYWORDS = [
'login', 'bank', 'verify', 'paypal', 'password', 'account',
'credit', 'social security', 'ssn', 'wire', 'transfer'
]
def _check_content_safety(text: str) -> bool:
"""Check if content contains phishing keywords."""
text_lower = text.lower()
return not any(keyword in text_lower for keyword in BLOCKED_KEYWORDS)
# ============== Public Endpoints ==============
@router.get("", response_model=List[ListingPublicResponse])
async def browse_listings(
    keyword: Optional[str] = Query(None),
    min_price: Optional[float] = Query(None, ge=0),
    max_price: Optional[float] = Query(None, ge=0),
    verified_only: bool = Query(False),
    sort_by: str = Query("newest", enum=["newest", "price_asc", "price_desc", "popular"]),
    limit: int = Query(20, ge=1, le=50),
    offset: int = Query(0, ge=0),
    db: AsyncSession = Depends(get_db),
):
    """Browse active domain listings (public).

    Args:
        keyword: Case-insensitive substring to look for in the domain name.
        min_price: Lower bound (inclusive) on the asking price.
        max_price: Upper bound (inclusive) on the asking price.
        verified_only: Restrict results to DNS-verified listings.
        sort_by: One of ``newest``, ``price_asc``, ``price_desc`` or
            ``popular``; any other value falls back to ``newest``.
        limit: Page size, between 1 and 50.
        offset: Number of rows to skip.
        db: Database session.

    Returns:
        A list of ``ListingPublicResponse`` objects. Valuation fields are
        ``None`` for listings whose ``show_valuation`` flag is off.
    """
    query = select(DomainListing).where(
        DomainListing.status == ListingStatus.ACTIVE.value
    )

    if keyword:
        # Escape LIKE wildcards so the keyword is matched literally instead
        # of "%" / "_" in user input acting as patterns.
        escaped = (
            keyword.replace("/", "//").replace("%", "/%").replace("_", "/_")
        )
        query = query.where(
            DomainListing.domain.ilike(f"%{escaped}%", escape="/")
        )
    if min_price is not None:
        query = query.where(DomainListing.asking_price >= min_price)
    if max_price is not None:
        query = query.where(DomainListing.asking_price <= max_price)
    if verified_only:
        query = query.where(
            DomainListing.verification_status == VerificationStatus.VERIFIED.value
        )

    # Sorting
    if sort_by == "price_asc":
        primary_order = DomainListing.asking_price.asc().nullslast()
    elif sort_by == "price_desc":
        primary_order = DomainListing.asking_price.desc().nullsfirst()
    elif sort_by == "popular":
        primary_order = DomainListing.view_count.desc()
    else:  # newest
        primary_order = DomainListing.published_at.desc()
    # The id tiebreaker keeps offset pagination stable when several rows
    # share the same primary sort value.
    query = query.order_by(primary_order, DomainListing.id.desc())

    query = query.offset(offset).limit(limit)
    result = await db.execute(query)

    # NOTE(review): seller_verified mirrors the listing's own is_verified
    # flag here, while the authenticated endpoints report the user's
    # is_verified. Confirm which one is intended.
    # NOTE(review): listing.user is read per row; this assumes the
    # relationship is loaded eagerly. TODO confirm against the model.
    return [
        ListingPublicResponse(
            domain=listing.domain,
            slug=listing.slug,
            title=listing.title,
            description=listing.description,
            asking_price=listing.asking_price,
            currency=listing.currency,
            price_type=listing.price_type,
            pounce_score=listing.pounce_score if listing.show_valuation else None,
            estimated_value=listing.estimated_value if listing.show_valuation else None,
            is_verified=listing.is_verified,
            allow_offers=listing.allow_offers,
            public_url=listing.public_url,
            seller_verified=listing.is_verified,
            seller_member_since=listing.user.created_at if listing.user else None,
        )
        for listing in result.scalars().all()
    ]
def _build_listing_response(listing: DomainListing, owner: User) -> ListingResponse:
    """Serialize ``listing`` for its owner.

    The seller trust fields are taken from ``owner`` (the authenticated
    user), not from the listing itself.
    """
    return ListingResponse(
        id=listing.id,
        domain=listing.domain,
        slug=listing.slug,
        title=listing.title,
        description=listing.description,
        asking_price=listing.asking_price,
        min_offer=listing.min_offer,
        currency=listing.currency,
        price_type=listing.price_type,
        pounce_score=listing.pounce_score,
        estimated_value=listing.estimated_value,
        verification_status=listing.verification_status,
        is_verified=listing.is_verified,
        status=listing.status,
        show_valuation=listing.show_valuation,
        allow_offers=listing.allow_offers,
        view_count=listing.view_count,
        inquiry_count=listing.inquiry_count,
        public_url=listing.public_url,
        created_at=listing.created_at,
        published_at=listing.published_at,
        seller_verified=owner.is_verified,
        seller_member_since=owner.created_at,
    )


# NOTE: this authenticated route is deliberately declared ahead of
# GET "/{slug}". Routes are matched in registration order, so with "/{slug}"
# first a request for "/my" would be captured as slug="my" and this handler
# would never run.
@router.get("/my", response_model=List[ListingResponse])
async def get_my_listings(
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """Return every listing owned by the current user, newest first."""
    result = await db.execute(
        select(DomainListing)
        .where(DomainListing.user_id == current_user.id)
        .order_by(DomainListing.created_at.desc())
    )
    return [
        _build_listing_response(listing, current_user)
        for listing in result.scalars().all()
    ]


@router.get("/{slug}", response_model=ListingPublicResponse)
async def get_listing_by_slug(
    slug: str,
    request: Request,
    db: AsyncSession = Depends(get_db),
    current_user: Optional[User] = Depends(get_current_user_optional),
):
    """Get the public details of an active listing and record the view.

    Args:
        slug: Listing slug taken from the URL.
        request: Incoming request; client IP, user agent and referrer are
            stored with the view record.
        db: Database session.
        current_user: The viewer, when authenticated.

    Returns:
        The listing as ``ListingPublicResponse``.

    Raises:
        HTTPException: 404 when no active listing has this slug.
    """
    result = await db.execute(
        select(DomainListing).where(
            and_(
                DomainListing.slug == slug,
                DomainListing.status == ListingStatus.ACTIVE.value,
            )
        )
    )
    listing = result.scalar_one_or_none()
    if not listing:
        raise HTTPException(status_code=404, detail="Listing not found")

    # Snapshot the response before committing: depending on session
    # configuration a commit can expire loaded attributes, and they cannot be
    # reloaded implicitly from async code.
    show_valuation = listing.show_valuation
    response = ListingPublicResponse(
        domain=listing.domain,
        slug=listing.slug,
        title=listing.title,
        description=listing.description,
        asking_price=listing.asking_price,
        currency=listing.currency,
        price_type=listing.price_type,
        pounce_score=listing.pounce_score if show_valuation else None,
        estimated_value=listing.estimated_value if show_valuation else None,
        is_verified=listing.is_verified,
        allow_offers=listing.allow_offers,
        public_url=listing.public_url,
        seller_verified=listing.is_verified,
        seller_member_since=listing.user.created_at if listing.user else None,
    )

    # Record the view and bump the counter in the same transaction.
    db.add(
        ListingView(
            listing_id=listing.id,
            ip_address=request.client.host if request.client else None,
            user_agent=request.headers.get("user-agent", "")[:500],
            referrer=request.headers.get("referer", "")[:500],
            user_id=current_user.id if current_user else None,
        )
    )
    listing.view_count += 1
    await db.commit()

    return response


@router.post("/{slug}/inquire")
async def submit_inquiry(
    slug: str,
    inquiry: InquiryCreate,
    request: Request,
    db: AsyncSession = Depends(get_db),
):
    """Submit a buyer inquiry for an active listing (public).

    Args:
        slug: Slug of the listing being inquired about.
        inquiry: Validated contact form data.
        request: Incoming request; client IP and user agent are stored.
        db: Database session.

    Returns:
        A dict with ``success`` and a confirmation ``message``.

    Raises:
        HTTPException: 404 when the listing is missing or not active, 400
            when the message contains a blocked keyword, 429 when the same
            e-mail already sent three inquiries for this listing since
            00:00 UTC.
    """
    lookup = await db.execute(
        select(DomainListing).where(
            and_(
                DomainListing.slug == slug,
                DomainListing.status == ListingStatus.ACTIVE.value,
            )
        )
    )
    listing = lookup.scalar_one_or_none()
    if not listing:
        raise HTTPException(status_code=404, detail="Listing not found")

    # Security: reject messages containing phishing keywords.
    if not _check_content_safety(inquiry.message):
        raise HTTPException(
            status_code=400,
            detail="Message contains blocked content. Please revise."
        )

    sender_email = inquiry.email.lower()

    # Simple rate limit: max 3 inquiries per email per listing per day.
    day_start = datetime.utcnow().replace(hour=0, minute=0, second=0, microsecond=0)
    count_result = await db.execute(
        select(func.count(ListingInquiry.id)).where(
            and_(
                ListingInquiry.listing_id == listing.id,
                ListingInquiry.email == sender_email,
                ListingInquiry.created_at >= day_start,
            )
        )
    )
    if count_result.scalar() >= 3:
        raise HTTPException(
            status_code=429,
            detail="Too many inquiries. Please try again tomorrow."
        )

    db.add(
        ListingInquiry(
            listing_id=listing.id,
            name=inquiry.name,
            email=sender_email,
            phone=inquiry.phone,
            company=inquiry.company,
            message=inquiry.message,
            offer_amount=inquiry.offer_amount,
            ip_address=request.client.host if request.client else None,
            user_agent=request.headers.get("user-agent", "")[:500],
        )
    )
    listing.inquiry_count += 1
    await db.commit()

    # TODO: Send email notification to seller
    return {
        "success": True,
        "message": "Your inquiry has been sent to the seller.",
    }


# ============== Authenticated Endpoints ==============
@router.post("", response_model=ListingResponse)
async def create_listing(
    data: ListingCreate,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """Create a new domain listing in draft status.

    Args:
        data: Listing details supplied by the seller.
        current_user: Authenticated owner of the new listing.
        db: Database session.

    Returns:
        The stored listing as ``ListingResponse``.

    Raises:
        HTTPException: 400 when the domain is too short after trimming or is
            already listed, 403 when the user's tier limit is reached.
    """
    # Normalize once so the duplicate check, the slug, the valuation and the
    # stored value all refer to the same string.
    domain = data.domain.strip().lower()
    if len(domain) < 3:
        raise HTTPException(status_code=400, detail="Invalid domain name")

    # Check if domain is already listed
    existing = await db.execute(
        select(DomainListing).where(DomainListing.domain == domain)
    )
    if existing.scalar_one_or_none():
        raise HTTPException(status_code=400, detail="This domain is already listed")

    # Check user's listing limit based on subscription
    count_result = await db.execute(
        select(func.count(DomainListing.id)).where(
            DomainListing.user_id == current_user.id
        )
    )
    listing_count = count_result.scalar() or 0

    # Listing limits by tier; unknown tiers get the "scout" limit.
    tier = current_user.subscription.tier if current_user.subscription else "scout"
    limits = {"scout": 2, "trader": 10, "tycoon": 50}
    max_listings = limits.get(tier, 2)
    if listing_count >= max_listings:
        raise HTTPException(
            status_code=403,
            detail=f"Listing limit reached ({max_listings}). Upgrade your plan for more."
        )

    # Generate the slug; add a random suffix when it is already taken.
    slug = _generate_slug(domain)
    slug_check = await db.execute(
        select(DomainListing).where(DomainListing.slug == slug)
    )
    if slug_check.scalar_one_or_none():
        slug = f"{slug}-{secrets.token_hex(4)}"

    # Valuation is best-effort: a failure must not block listing creation,
    # but it is logged instead of being silently discarded.
    try:
        valuation = await valuation_service.estimate_value(domain, db, save_result=False)
        pounce_score = max(0, min(100, int(valuation.get("score", 50))))
        estimated_value = valuation.get("estimated_value", 0)
    except Exception:
        logger.warning(
            "Valuation failed for %s; using default score", domain, exc_info=True
        )
        pounce_score = 50
        estimated_value = None

    listing = DomainListing(
        user_id=current_user.id,
        domain=domain,
        slug=slug,
        title=data.title,
        description=data.description,
        asking_price=data.asking_price,
        min_offer=data.min_offer,
        currency=data.currency.upper(),
        price_type=data.price_type,
        show_valuation=data.show_valuation,
        allow_offers=data.allow_offers,
        pounce_score=pounce_score,
        estimated_value=estimated_value,
        verification_code=_generate_verification_code(),
        status=ListingStatus.DRAFT.value,
    )
    db.add(listing)
    await db.commit()
    await db.refresh(listing)

    return _build_listing_response(listing, current_user)
@router.get("/{id}/inquiries", response_model=List[InquiryResponse])
async def get_listing_inquiries(
    id: int,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """List the inquiries received for one of the current user's listings.

    Responds with 404 when the listing does not exist or belongs to someone
    else. Inquiries are returned newest first.
    """
    # Ownership check: the listing must exist and belong to the caller.
    owned_by_caller = and_(
        DomainListing.id == id,
        DomainListing.user_id == current_user.id,
    )
    owner_lookup = await db.execute(select(DomainListing).where(owned_by_caller))
    if not owner_lookup.scalar_one_or_none():
        raise HTTPException(status_code=404, detail="Listing not found")

    inquiry_query = (
        select(ListingInquiry)
        .where(ListingInquiry.listing_id == id)
        .order_by(ListingInquiry.created_at.desc())
    )
    rows = (await db.execute(inquiry_query)).scalars().all()

    payload = []
    for row in rows:
        payload.append(
            InquiryResponse(
                id=row.id,
                name=row.name,
                email=row.email,
                phone=row.phone,
                company=row.company,
                message=row.message,
                offer_amount=row.offer_amount,
                status=row.status,
                created_at=row.created_at,
                read_at=row.read_at,
            )
        )
    return payload
@router.put("/{id}", response_model=ListingResponse)
async def update_listing(
    id: int,
    data: ListingUpdate,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """Update one of the current user's listings.

    Fields left as ``None`` in ``data`` are not modified. A status change is
    validated before anything is modified: ``active`` can only be reached
    from ``draft`` and requires DNS verification; ``draft``, ``sold`` and
    ``expired`` may be set freely; re-sending the current status is a no-op.

    Args:
        id: Primary key of the listing.
        data: Partial update payload.
        current_user: Authenticated owner.
        db: Database session.

    Returns:
        The updated listing as ``ListingResponse``.

    Raises:
        HTTPException: 404 when the listing does not exist or is not owned
            by the caller; 400 when the requested status is unknown, the
            transition is not allowed, or the listing is unverified.
    """
    result = await db.execute(
        select(DomainListing).where(
            and_(
                DomainListing.id == id,
                DomainListing.user_id == current_user.id,
            )
        )
    )
    listing = result.scalar_one_or_none()
    if not listing:
        raise HTTPException(status_code=404, detail="Listing not found")

    # Validate the requested status up front so that a rejected request
    # fails loudly instead of being silently ignored, and leaves the loaded
    # object untouched.
    new_status = data.status
    if new_status is not None and new_status == listing.status:
        new_status = None  # nothing to change
    if new_status is not None:
        if new_status == "active":
            if listing.status != "draft":
                raise HTTPException(
                    status_code=400,
                    detail=f"Cannot change status from '{listing.status}' to 'active'"
                )
            if not listing.is_verified:
                raise HTTPException(
                    status_code=400,
                    detail="Cannot publish without DNS verification"
                )
        elif new_status not in ("draft", "sold", "expired"):
            raise HTTPException(
                status_code=400,
                detail=f"Invalid status '{new_status}'"
            )

    # Copy over every plain field the client actually supplied.
    for field_name in (
        "title",
        "description",
        "asking_price",
        "min_offer",
        "price_type",
        "show_valuation",
        "allow_offers",
    ):
        value = getattr(data, field_name)
        if value is not None:
            setattr(listing, field_name, value)

    if new_status == "active":
        # Publishing stamps the publication time.
        listing.status = ListingStatus.ACTIVE.value
        listing.published_at = datetime.utcnow()
    elif new_status is not None:
        listing.status = new_status

    await db.commit()
    await db.refresh(listing)

    return ListingResponse(
        id=listing.id,
        domain=listing.domain,
        slug=listing.slug,
        title=listing.title,
        description=listing.description,
        asking_price=listing.asking_price,
        min_offer=listing.min_offer,
        currency=listing.currency,
        price_type=listing.price_type,
        pounce_score=listing.pounce_score,
        estimated_value=listing.estimated_value,
        verification_status=listing.verification_status,
        is_verified=listing.is_verified,
        status=listing.status,
        show_valuation=listing.show_valuation,
        allow_offers=listing.allow_offers,
        view_count=listing.view_count,
        inquiry_count=listing.inquiry_count,
        public_url=listing.public_url,
        created_at=listing.created_at,
        published_at=listing.published_at,
        seller_verified=current_user.is_verified,
        seller_member_since=current_user.created_at,
    )
@router.delete("/{id}")
async def delete_listing(
    id: int,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """Delete one of the current user's listings.

    Responds with 404 when the listing does not exist or belongs to another
    user; otherwise removes it and returns a success payload.
    """
    owned_by_caller = and_(
        DomainListing.id == id,
        DomainListing.user_id == current_user.id,
    )
    lookup = await db.execute(select(DomainListing).where(owned_by_caller))
    target = lookup.scalar_one_or_none()
    if not target:
        raise HTTPException(status_code=404, detail="Listing not found")

    await db.delete(target)
    await db.commit()
    return {"success": True, "message": "Listing deleted"}
@router.post("/{id}/verify-dns", response_model=VerificationResponse)
async def start_dns_verification(
    id: int,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """Start (or re-display) DNS verification for a listing.

    Generates a verification code when the listing has none, marks the
    listing as pending unless it is already verified, and returns the TXT
    record the owner has to create.

    Args:
        id: Primary key of the listing.
        current_user: Authenticated owner.
        db: Database session.

    Returns:
        ``VerificationResponse`` with the record name, value and
        instructions.

    Raises:
        HTTPException: 404 when the listing does not exist or is not owned
            by the caller.
    """
    result = await db.execute(
        select(DomainListing).where(
            and_(
                DomainListing.id == id,
                DomainListing.user_id == current_user.id,
            )
        )
    )
    listing = result.scalar_one_or_none()
    if not listing:
        raise HTTPException(status_code=404, detail="Listing not found")

    # Generate new code if needed
    if not listing.verification_code:
        listing.verification_code = _generate_verification_code()

    # Asking for the instructions again must not downgrade a listing that
    # has already passed verification.
    if listing.verification_status != VerificationStatus.VERIFIED.value:
        listing.verification_status = VerificationStatus.PENDING.value
    await db.commit()

    # Extract domain root for DNS.
    # NOTE(review): taking the last two labels assumes a single-label TLD;
    # for a name like example.co.uk this yields "_pounce.co.uk", which does
    # not match the "_pounce.<domain>" hint in the instructions below.
    domain_parts = listing.domain.split('.')
    if len(domain_parts) > 2:
        dns_name = f"_pounce.{'.'.join(domain_parts[-2:])}"
    else:
        dns_name = f"_pounce.{listing.domain}"

    return VerificationResponse(
        verification_code=listing.verification_code,
        dns_record_type="TXT",
        dns_record_name=dns_name,
        dns_record_value=listing.verification_code,
        instructions=f"""
To verify ownership of {listing.domain}:
1. Go to your domain registrar's DNS settings
2. Add a new TXT record:
- Name/Host: _pounce (or _pounce.{listing.domain})
- Value: {listing.verification_code}
- TTL: 300 (or lowest available)
3. Wait 1-5 minutes for DNS propagation
4. Click "Check Verification" to complete
This proves you control the domain's DNS, confirming ownership.
""".strip(),
        status=listing.verification_status,
    )
def _verification_record_names(domain: str) -> List[str]:
    """Return the TXT record names to query for ``domain``, most specific first."""
    names = [f"_pounce.{domain}"]
    labels = domain.split('.')
    if len(labels) > 2:
        # Name built from the last two labels only. The verify-dns endpoint
        # reports this one as dns_record_name, so it is still honoured.
        names.append(f"_pounce.{'.'.join(labels[-2:])}")
    return names


@router.get("/{id}/verify-dns/check")
async def check_dns_verification(
    id: int,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """Check whether the listing's verification TXT record is published.

    Queries ``_pounce.<listing domain>`` first and, for names with more than
    two labels, also ``_pounce.<last two labels>``. Querying the full domain
    is what lets names under multi-label suffixes (for example
    ``example.co.uk``) verify. A matching TXT value marks the listing as
    verified.

    Args:
        id: Primary key of the listing.
        current_user: Authenticated owner.
        db: Database session.

    Returns:
        A dict with ``verified`` (bool), ``status`` (``verified``,
        ``pending`` or ``error``) and a human-readable ``message``.

    Raises:
        HTTPException: 404 when the listing does not exist or is not owned
            by the caller, 400 when verification was never started.
    """
    result = await db.execute(
        select(DomainListing).where(
            and_(
                DomainListing.id == id,
                DomainListing.user_id == current_user.id,
            )
        )
    )
    listing = result.scalar_one_or_none()
    if not listing:
        raise HTTPException(status_code=404, detail="Listing not found")
    if not listing.verification_code:
        raise HTTPException(status_code=400, detail="Start verification first")

    # Check DNS TXT record
    import asyncio
    import dns.resolver

    loop = asyncio.get_running_loop()
    expected_code = listing.verification_code
    matched = False
    record_seen = False
    not_found_message = None
    try:
        for dns_name in _verification_record_names(listing.domain):
            try:
                # dns.resolver.resolve blocks; run it in the default executor
                # so the event loop keeps serving other requests.
                answers = await loop.run_in_executor(
                    None, dns.resolver.resolve, dns_name, 'TXT'
                )
            except dns.resolver.NXDOMAIN:
                if not_found_message is None:
                    not_found_message = (
                        "DNS record not found. Please add the TXT record and wait for propagation."
                    )
                continue
            except dns.resolver.NoAnswer:
                if not_found_message is None:
                    not_found_message = "No TXT record found. Please add the record."
                continue

            record_seen = True
            if any(str(rdata).strip('"') == expected_code for rdata in answers):
                matched = True
                break
    except Exception as e:
        logger.error("DNS check failed for %s: %s", listing.domain, e)
        return {
            "verified": False,
            "status": "error",
            "message": "DNS check failed. Please try again in a few minutes.",
        }

    if matched:
        # Persist outside the DNS try-block so a database failure is not
        # misreported as a DNS failure.
        listing.verification_status = VerificationStatus.VERIFIED.value
        listing.verified_at = datetime.utcnow()
        await db.commit()
        return {
            "verified": True,
            "status": "verified",
            "message": "DNS verification successful! You can now publish your listing.",
        }

    if record_seen:
        # A TXT record exists but none of its values is the expected code.
        return {
            "verified": False,
            "status": "pending",
            "message": "TXT record found but value doesn't match. Please check the value.",
        }

    return {
        "verified": False,
        "status": "pending",
        "message": not_found_message,
    }