pounce/backend/app/api/listings.py
Yves Gugger 356db5afee
Some checks failed
CI / Frontend Lint & Type Check (push) Has been cancelled
CI / Frontend Build (push) Has been cancelled
CI / Backend Lint (push) Has been cancelled
CI / Backend Tests (push) Has been cancelled
CI / Docker Build (push) Has been cancelled
CI / Security Scan (push) Has been cancelled
Deploy / Build & Push Images (push) Has been cancelled
Deploy / Deploy to Server (push) Has been cancelled
Deploy / Notify (push) Has been cancelled
feat: Complete mobile redesign for Acquire page - terminal style
2025-12-13 17:54:28 +01:00

936 lines
30 KiB
Python

"""
Domain Listings API - Pounce Marketplace
This implements the "micro-marketplace" ("Micro-Marktplatz") from analysis_3.md:
- Create professional "For Sale" landing pages
- DNS verification for ownership
- Contact form for buyers
- Analytics
Endpoints:
- GET /listings - Public: Browse active listings
- GET /listings/{slug} - Public: View listing details
- POST /listings/{slug}/inquire - Public: Contact seller
- POST /listings - Auth: Create new listing
- GET /listings/my - Auth: Get user's listings
- PUT /listings/{id} - Auth: Update listing
- DELETE /listings/{id} - Auth: Delete listing
- POST /listings/{id}/verify-dns - Auth: Start DNS verification
- GET /listings/{id}/verify-dns/check - Auth: Check verification status
- GET /listings/{id}/inquiries - Auth: List inquiries received for a listing
"""
import logging
import secrets
import re
from datetime import datetime, timedelta
from typing import Optional, List
from fastapi import APIRouter, Depends, Query, HTTPException, Request
from pydantic import BaseModel, Field, EmailStr
from sqlalchemy import select, func, and_
from sqlalchemy.ext.asyncio import AsyncSession
from app.database import get_db
from app.api.deps import get_current_user, get_current_user_optional
from app.models.user import User
from app.models.listing import DomainListing, ListingInquiry, ListingView, ListingStatus, VerificationStatus
from app.services.valuation import valuation_service
def _calculate_pounce_score(domain: str, is_pounce: bool = True) -> int:
"""
Calculate Pounce Score for a domain.
Uses the same algorithm as Market Feed (_calculate_pounce_score_v2 in auctions.py).
"""
# Parse domain
parts = domain.lower().rsplit(".", 1)
if len(parts) != 2:
return 50
name, tld = parts
score = 50 # Baseline
# A) LENGTH BONUS (exponential for short domains)
length_scores = {1: 50, 2: 45, 3: 40, 4: 30, 5: 20, 6: 15, 7: 10}
score += length_scores.get(len(name), max(0, 15 - len(name)))
# B) TLD PREMIUM
tld_scores = {
'com': 20, 'ai': 25, 'io': 18, 'co': 12,
'ch': 15, 'de': 10, 'net': 8, 'org': 8,
'app': 10, 'dev': 10, 'xyz': 5
}
score += tld_scores.get(tld.lower(), 0)
# C) POUNCE DIRECT BONUS (listings are always Pounce Direct)
if is_pounce:
score += 10
# D) PENALTIES
if '-' in name:
score -= 25
if any(c.isdigit() for c in name) and len(name) > 3:
score -= 20
if len(name) > 15:
score -= 15
# Clamp to 0-100
return max(0, min(100, score))
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# Router on which every listing endpoint in this file is registered.
router = APIRouter()
# ============== Schemas ==============
class ListingCreate(BaseModel):
    """Request body accepted when a seller creates a new listing."""

    # Domain to list; must be 3-255 characters long.
    domain: str = Field(..., min_length=3, max_length=255)
    title: Optional[str] = Field(default=None, max_length=200)
    description: Optional[str] = None
    # Prices are optional but must be non-negative when supplied.
    asking_price: Optional[float] = Field(default=None, ge=0)
    min_offer: Optional[float] = Field(default=None, ge=0)
    # Currency code, at most 3 characters; defaults to USD.
    currency: str = Field(default="USD", max_length=3)
    # Expected values: fixed, negotiable, make_offer (not enforced here).
    price_type: str = Field(default="negotiable")
    show_valuation: bool = True
    allow_offers: bool = True
class ListingUpdate(BaseModel):
    """Partial update payload for a listing; every field is optional."""

    title: Optional[str] = Field(default=None, max_length=200)
    description: Optional[str] = None
    # Prices must be non-negative when supplied.
    asking_price: Optional[float] = Field(default=None, ge=0)
    min_offer: Optional[float] = Field(default=None, ge=0)
    price_type: Optional[str] = None
    show_valuation: Optional[bool] = None
    allow_offers: Optional[bool] = None
    # Requested status; this schema accepts any string and leaves the
    # decision about valid transitions to the endpoint.
    status: Optional[str] = None
class ListingResponse(BaseModel):
    """Full listing representation returned to the listing's owner."""

    # --- Identity ---
    id: int
    domain: str
    slug: str

    # --- Presentation and pricing ---
    title: Optional[str]
    description: Optional[str]
    asking_price: Optional[float]
    min_offer: Optional[float]
    currency: str
    price_type: str

    # --- Scoring ---
    pounce_score: Optional[int]
    estimated_value: Optional[float]

    # --- Verification and lifecycle ---
    verification_status: str
    is_verified: bool
    status: str
    show_valuation: bool
    allow_offers: bool

    # --- Counters and links ---
    view_count: int
    inquiry_count: int
    public_url: str
    created_at: datetime
    published_at: Optional[datetime]

    # --- Seller info (kept minimal for privacy) ---
    seller_verified: bool = False
    seller_member_since: Optional[datetime] = None

    class Config:
        # Allow the model to be populated from attribute-bearing objects.
        from_attributes = True
class ListingPublicResponse(BaseModel):
    """Reduced listing representation exposed on the public endpoints."""

    domain: str
    slug: str
    title: Optional[str]
    description: Optional[str]
    asking_price: Optional[float]
    currency: str
    price_type: str
    pounce_score: Optional[int]
    # The public endpoints in this file send None here when the seller has
    # disabled show_valuation.
    estimated_value: Optional[float]
    is_verified: bool
    allow_offers: bool
    public_url: str

    # --- Seller trust indicators ---
    seller_verified: bool
    seller_member_since: Optional[datetime]

    class Config:
        # Allow the model to be populated from attribute-bearing objects.
        from_attributes = True
class InquiryCreate(BaseModel):
    """Contact-form payload a prospective buyer submits for a listing."""

    # Sender's name, 2-100 characters.
    name: str = Field(..., min_length=2, max_length=100)
    email: EmailStr
    phone: Optional[str] = Field(default=None, max_length=50)
    company: Optional[str] = Field(default=None, max_length=200)
    # Free-text message, 10-2000 characters.
    message: str = Field(..., min_length=10, max_length=2000)
    # Optional offer; must be non-negative when supplied.
    offer_amount: Optional[float] = Field(default=None, ge=0)
class InquiryResponse(BaseModel):
    """Inquiry as shown to the owner of the listing it was sent to."""

    id: int

    # --- Sender details ---
    name: str
    email: str
    phone: Optional[str]
    company: Optional[str]

    # --- Content ---
    message: str
    offer_amount: Optional[float]

    # --- State ---
    status: str
    created_at: datetime
    read_at: Optional[datetime]

    class Config:
        # Allow the model to be populated from attribute-bearing objects.
        from_attributes = True
class VerificationResponse(BaseModel):
    """Details of the DNS record a seller must publish to prove ownership."""

    verification_code: str
    # Record type to create; defaults to TXT.
    dns_record_type: str = "TXT"
    # Record name and the value it must carry.
    dns_record_name: str
    dns_record_value: str
    # Human-readable steps for the seller.
    instructions: str
    # Verification status of the listing at response time.
    status: str
# ============== Helper Functions ==============
def _generate_slug(domain: str) -> str:
"""Generate URL-friendly slug from domain."""
# Remove TLD for cleaner slug
slug = domain.lower().replace('.', '-')
# Remove any non-alphanumeric chars except hyphens
slug = re.sub(r'[^a-z0-9-]', '', slug)
return slug
def _generate_verification_code() -> str:
"""Generate a unique verification code."""
return f"pounce-verify-{secrets.token_hex(16)}"
# Security: Block phishing keywords (from analysis_3.md - Säule 3)
BLOCKED_KEYWORDS = [
'login', 'bank', 'verify', 'paypal', 'password', 'account',
'credit', 'social security', 'ssn', 'wire', 'transfer'
]
def _check_content_safety(text: str) -> bool:
"""Check if content contains phishing keywords."""
text_lower = text.lower()
return not any(keyword in text_lower for keyword in BLOCKED_KEYWORDS)
# ============== Public Endpoints ==============
@router.get("", response_model=List[ListingPublicResponse])
async def browse_listings(
    keyword: Optional[str] = Query(None),
    min_price: Optional[float] = Query(None, ge=0),
    max_price: Optional[float] = Query(None, ge=0),
    verified_only: bool = Query(False),
    sort_by: str = Query("newest", enum=["newest", "price_asc", "price_desc", "popular"]),
    # ge=1 closes a gap in the page-size cap: a negative LIMIT is rejected by
    # some databases (PostgreSQL) and means "no limit" on others (SQLite),
    # which would bypass le=50.
    limit: int = Query(20, ge=1, le=50),
    offset: int = Query(0, ge=0),
    db: AsyncSession = Depends(get_db),
):
    """
    Browse active domain listings (public).

    Args:
        keyword: Case-insensitive substring filter on the domain.
        min_price: Lower bound (inclusive) on asking_price.
        max_price: Upper bound (inclusive) on asking_price.
        verified_only: Restrict to listings whose verification status is
            VERIFIED.
        sort_by: "newest" (published_at desc, the fallback for any other
            value), "price_asc" (NULL prices last), "price_desc" (NULL
            prices first) or "popular" (view_count desc).
        limit: Page size, 1-50.
        offset: Number of rows to skip.
        db: Database session.

    Returns:
        A list of ListingPublicResponse objects. estimated_value is None for
        listings whose seller disabled show_valuation.

    Side effects:
        Listings without a stored pounce_score get one computed and assigned;
        the commit at the end persists those scores.
    """
    query = select(DomainListing).where(
        DomainListing.status == ListingStatus.ACTIVE.value
    )
    if keyword:
        query = query.where(DomainListing.domain.ilike(f"%{keyword}%"))
    if min_price is not None:
        query = query.where(DomainListing.asking_price >= min_price)
    if max_price is not None:
        query = query.where(DomainListing.asking_price <= max_price)
    if verified_only:
        query = query.where(
            DomainListing.verification_status == VerificationStatus.VERIFIED.value
        )

    # Sorting
    if sort_by == "price_asc":
        query = query.order_by(DomainListing.asking_price.asc().nullslast())
    elif sort_by == "price_desc":
        query = query.order_by(DomainListing.asking_price.desc().nullsfirst())
    elif sort_by == "popular":
        query = query.order_by(DomainListing.view_count.desc())
    else:  # newest
        query = query.order_by(DomainListing.published_at.desc())

    query = query.offset(offset).limit(limit)
    result = await db.execute(query)
    listings = list(result.scalars().all())

    responses = []
    for listing in listings:
        # Calculate pounce_score dynamically if not stored
        pounce_score = listing.pounce_score
        if pounce_score is None:
            pounce_score = _calculate_pounce_score(listing.domain)
            # Save it for future requests
            listing.pounce_score = pounce_score

        # NOTE(review): listing.user is read here without an explicit eager
        # load in this query; presumably the relationship is configured for
        # eager loading - confirm against the model.
        responses.append(ListingPublicResponse(
            domain=listing.domain,
            slug=listing.slug,
            title=listing.title,
            description=listing.description,
            asking_price=listing.asking_price,
            currency=listing.currency,
            price_type=listing.price_type,
            pounce_score=pounce_score,  # Always return the score
            estimated_value=listing.estimated_value if listing.show_valuation else None,
            is_verified=listing.is_verified,
            allow_offers=listing.allow_offers,
            public_url=listing.public_url,
            seller_verified=listing.is_verified,
            seller_member_since=listing.user.created_at if listing.user else None,
        ))

    # Responses are fully built before the commit, so they do not depend on
    # ORM state after it.
    await db.commit()  # Save any updated pounce_scores
    return responses
# ============== Authenticated Endpoints (before dynamic routes!) ==============
@router.get("/my", response_model=List[ListingResponse])
async def get_my_listings(
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """
    Return every listing owned by the authenticated user, newest first.

    Listings of any status are included. The seller fields of each response
    are taken from the current user.
    """
    stmt = (
        select(DomainListing)
        .where(DomainListing.user_id == current_user.id)
        .order_by(DomainListing.created_at.desc())
    )
    owned = (await db.execute(stmt)).scalars().all()

    payload = []
    for item in owned:
        payload.append(
            ListingResponse(
                id=item.id,
                domain=item.domain,
                slug=item.slug,
                title=item.title,
                description=item.description,
                asking_price=item.asking_price,
                min_offer=item.min_offer,
                currency=item.currency,
                price_type=item.price_type,
                pounce_score=item.pounce_score,
                estimated_value=item.estimated_value,
                verification_status=item.verification_status,
                is_verified=item.is_verified,
                status=item.status,
                show_valuation=item.show_valuation,
                allow_offers=item.allow_offers,
                view_count=item.view_count,
                inquiry_count=item.inquiry_count,
                public_url=item.public_url,
                created_at=item.created_at,
                published_at=item.published_at,
                seller_verified=current_user.is_verified,
                seller_member_since=current_user.created_at,
            )
        )
    return payload
# ============== Public Dynamic Routes ==============
@router.get("/{slug}", response_model=ListingPublicResponse)
async def get_listing_by_slug(
    slug: str,
    request: Request,
    db: AsyncSession = Depends(get_db),
    current_user: Optional[User] = Depends(get_current_user_optional),
):
    """
    Return the public details of one active listing and record the view.

    Each successful call stores a ListingView row (client IP, user agent and
    referrer truncated to 500 characters, viewer id when logged in),
    increments the listing's view_count and, if no pounce_score is stored
    yet, computes and persists one.

    Raises:
        HTTPException: 404 when no ACTIVE listing has this slug.
    """
    lookup = select(DomainListing).where(
        and_(
            DomainListing.slug == slug,
            DomainListing.status == ListingStatus.ACTIVE.value,
        )
    )
    listing = (await db.execute(lookup)).scalar_one_or_none()
    if not listing:
        raise HTTPException(status_code=404, detail="Listing not found")

    # Record view
    headers = request.headers
    viewer_id = current_user.id if current_user else None
    db.add(
        ListingView(
            listing_id=listing.id,
            ip_address=request.client.host if request.client else None,
            user_agent=headers.get("user-agent", "")[:500],
            referrer=headers.get("referer", "")[:500],
            user_id=viewer_id,
        )
    )

    # Increment view count
    listing.view_count += 1

    # Calculate pounce_score dynamically if not stored, and keep it for
    # future requests.
    score = listing.pounce_score
    if score is None:
        score = _calculate_pounce_score(listing.domain)
        listing.pounce_score = score

    await db.commit()

    visible_estimate = listing.estimated_value if listing.show_valuation else None
    member_since = listing.user.created_at if listing.user else None
    return ListingPublicResponse(
        domain=listing.domain,
        slug=listing.slug,
        title=listing.title,
        description=listing.description,
        asking_price=listing.asking_price,
        currency=listing.currency,
        price_type=listing.price_type,
        pounce_score=score,  # Always return the score
        estimated_value=visible_estimate,
        is_verified=listing.is_verified,
        allow_offers=listing.allow_offers,
        public_url=listing.public_url,
        seller_verified=listing.is_verified,
        seller_member_since=member_since,
    )
@router.post("/{slug}/inquire")
async def submit_inquiry(
    slug: str,
    inquiry: InquiryCreate,
    request: Request,
    db: AsyncSession = Depends(get_db),
):
    """
    Submit an inquiry for a listing (public).

    Stores the inquiry, increments the listing's inquiry_count and then
    tries to e-mail the seller. The e-mail step is best-effort: a failure is
    logged with its traceback and does not affect the response.

    Args:
        slug: Slug of an ACTIVE listing.
        inquiry: Validated contact-form payload.
        request: Incoming request; client IP and user agent are stored.
        db: Database session.

    Returns:
        {"success": True, "message": ...} once the inquiry is stored.

    Raises:
        HTTPException: 404 if no active listing has this slug, 400 if the
            message contains a blocked keyword, 429 if this e-mail address
            already sent 3 inquiries for this listing since 00:00 UTC.
    """
    # Find listing
    result = await db.execute(
        select(DomainListing).where(
            and_(
                DomainListing.slug == slug,
                DomainListing.status == ListingStatus.ACTIVE.value,
            )
        )
    )
    listing = result.scalar_one_or_none()
    if not listing:
        raise HTTPException(status_code=404, detail="Listing not found")

    # Security: Check for phishing keywords
    if not _check_content_safety(inquiry.message):
        raise HTTPException(
            status_code=400,
            detail="Message contains blocked content. Please revise."
        )

    # Rate limiting check (simple: max 3 inquiries per email per listing per day)
    sender_email = inquiry.email.lower()
    today_start = datetime.utcnow().replace(hour=0, minute=0, second=0, microsecond=0)
    existing_count = await db.execute(
        select(func.count(ListingInquiry.id)).where(
            and_(
                ListingInquiry.listing_id == listing.id,
                ListingInquiry.email == sender_email,
                ListingInquiry.created_at >= today_start,
            )
        )
    )
    # "or 0" mirrors the count handling in create_listing and keeps the
    # comparison valid should the driver hand back None.
    if (existing_count.scalar() or 0) >= 3:
        raise HTTPException(
            status_code=429,
            detail="Too many inquiries. Please try again tomorrow."
        )

    # Create inquiry
    new_inquiry = ListingInquiry(
        listing_id=listing.id,
        name=inquiry.name,
        email=sender_email,
        phone=inquiry.phone,
        company=inquiry.company,
        message=inquiry.message,
        offer_amount=inquiry.offer_amount,
        ip_address=request.client.host if request.client else None,
        user_agent=request.headers.get("user-agent", "")[:500],
    )
    db.add(new_inquiry)

    # Increment inquiry count
    listing.inquiry_count += 1
    await db.commit()

    # Send email notification to seller (best-effort; the inquiry is already
    # committed, so a failure here must not fail the request).
    try:
        # Imported lazily, as before; User comes from the module-level import.
        from app.services.email_service import email_service

        # Get seller's email
        seller_result = await db.execute(
            select(User).where(User.id == listing.user_id)
        )
        seller = seller_result.scalar_one_or_none()
        if seller and seller.email and email_service.is_configured():
            await email_service.send_listing_inquiry(
                to_email=seller.email,
                domain=listing.domain,
                name=inquiry.name,
                email=inquiry.email,
                message=inquiry.message,
                company=inquiry.company,
                offer_amount=inquiry.offer_amount,
            )
            logger.info(f"📧 Inquiry notification sent to {seller.email} for {listing.domain}")
    except Exception as e:
        # logger.exception records the traceback, which the previous
        # logger.error call dropped.
        logger.exception("Failed to send inquiry notification: %s", e)

    return {
        "success": True,
        "message": "Your inquiry has been sent to the seller.",
    }
# ============== Listing Management (Authenticated) ==============
@router.post("", response_model=ListingResponse)
async def create_listing(
    data: ListingCreate,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """
    Create a new domain listing.

    SECURITY: Domain must be in user's portfolio before listing for sale.
    DNS verification happens in the verification step (separate endpoint).

    The listing is stored as a DRAFT with a fresh verification code. Its
    pounce_score and estimated_value come from the valuation service; if
    that call fails, the failure is logged, the score falls back to
    _calculate_pounce_score (the same heuristic the public endpoints use for
    listings without a stored score) and estimated_value is left empty.

    Args:
        data: Validated listing payload.
        current_user: Authenticated seller.
        db: Database session.

    Returns:
        ListingResponse for the newly created listing.

    Raises:
        HTTPException: 403 if the domain is not in the user's portfolio or
            the plan's listing limit is reached; 400 if the portfolio domain
            is marked sold or the domain is already listed.
    """
    from app.models.portfolio import PortfolioDomain

    domain_lower = data.domain.lower()

    # SECURITY CHECK: Domain must be in user's portfolio
    portfolio_result = await db.execute(
        select(PortfolioDomain).where(
            PortfolioDomain.domain == domain_lower,
            PortfolioDomain.user_id == current_user.id,
        )
    )
    portfolio_domain = portfolio_result.scalar_one_or_none()
    if not portfolio_domain:
        raise HTTPException(
            status_code=403,
            detail="Domain must be in your portfolio before listing for sale. Add it to your portfolio first.",
        )

    # Check if domain is sold
    if portfolio_domain.is_sold:
        raise HTTPException(
            status_code=400,
            detail="Cannot list a sold domain for sale.",
        )

    # Check if domain is already listed
    existing = await db.execute(
        select(DomainListing).where(DomainListing.domain == domain_lower)
    )
    if existing.scalar_one_or_none():
        raise HTTPException(status_code=400, detail="This domain is already listed")

    # Check user's listing limit based on subscription
    user_listings = await db.execute(
        select(func.count(DomainListing.id)).where(
            DomainListing.user_id == current_user.id
        )
    )
    listing_count = user_listings.scalar() or 0

    # Listing limits by tier (from pounce_pricing.md)
    # Load subscription separately to avoid async lazy loading issues
    from app.models.subscription import Subscription

    sub_result = await db.execute(
        select(Subscription).where(Subscription.user_id == current_user.id)
    )
    subscription = sub_result.scalar_one_or_none()
    # Users without a subscription row, and unknown tiers, get a limit of 0.
    tier = subscription.tier if subscription else "scout"
    limits = {"scout": 0, "trader": 5, "tycoon": 50}
    max_listings = limits.get(tier, 0)
    if listing_count >= max_listings:
        raise HTTPException(
            status_code=403,
            detail=f"Listing limit reached ({max_listings}). Upgrade your plan for more."
        )

    # Generate slug; on collision append 8 random hex characters.
    slug = _generate_slug(domain_lower)
    slug_check = await db.execute(
        select(DomainListing).where(DomainListing.slug == slug)
    )
    if slug_check.scalar_one_or_none():
        slug = f"{slug}-{secrets.token_hex(4)}"

    # Get valuation (best-effort: a failing valuation must not block listing)
    try:
        valuation = await valuation_service.estimate_value(domain_lower, db, save_result=False)
        # Clamp to 0-100, matching _calculate_pounce_score.
        pounce_score = max(0, min(100, int(valuation.get("score", 50))))
        estimated_value = valuation.get("value", 0)  # service returns 'value', not 'estimated_value'
    except Exception:
        # Previously swallowed silently with a constant score of 50, which
        # was then stored and never replaced by the heuristic score.
        logger.warning(
            "Valuation failed for %s; falling back to heuristic score",
            domain_lower,
            exc_info=True,
        )
        pounce_score = _calculate_pounce_score(domain_lower)
        estimated_value = None

    # Create listing
    listing = DomainListing(
        user_id=current_user.id,
        domain=domain_lower,
        slug=slug,
        title=data.title,
        description=data.description,
        asking_price=data.asking_price,
        min_offer=data.min_offer,
        currency=data.currency.upper(),
        price_type=data.price_type,
        show_valuation=data.show_valuation,
        allow_offers=data.allow_offers,
        pounce_score=pounce_score,
        estimated_value=estimated_value,
        verification_code=_generate_verification_code(),
        status=ListingStatus.DRAFT.value,
    )
    db.add(listing)
    await db.commit()
    # Reload so database-generated values (id, timestamps, defaults) are set.
    await db.refresh(listing)

    return ListingResponse(
        id=listing.id,
        domain=listing.domain,
        slug=listing.slug,
        title=listing.title,
        description=listing.description,
        asking_price=listing.asking_price,
        min_offer=listing.min_offer,
        currency=listing.currency,
        price_type=listing.price_type,
        pounce_score=listing.pounce_score,
        estimated_value=listing.estimated_value,
        verification_status=listing.verification_status,
        is_verified=listing.is_verified,
        status=listing.status,
        show_valuation=listing.show_valuation,
        allow_offers=listing.allow_offers,
        view_count=listing.view_count,
        inquiry_count=listing.inquiry_count,
        public_url=listing.public_url,
        created_at=listing.created_at,
        published_at=listing.published_at,
        seller_verified=current_user.is_verified,
        seller_member_since=current_user.created_at,
    )
@router.get("/{id}/inquiries", response_model=List[InquiryResponse])
async def get_listing_inquiries(
    id: int,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """
    List the inquiries received for one of the caller's listings, newest first.

    Raises:
        HTTPException: 404 when the listing does not exist or belongs to
            another user.
    """
    # Verify ownership: the lookup only matches listings of the caller.
    ownership_query = select(DomainListing).where(
        and_(
            DomainListing.id == id,
            DomainListing.user_id == current_user.id,
        )
    )
    owned_listing = (await db.execute(ownership_query)).scalar_one_or_none()
    if not owned_listing:
        raise HTTPException(status_code=404, detail="Listing not found")

    inquiry_query = (
        select(ListingInquiry)
        .where(ListingInquiry.listing_id == id)
        .order_by(ListingInquiry.created_at.desc())
    )
    rows = (await db.execute(inquiry_query)).scalars().all()

    payload = []
    for row in rows:
        payload.append(
            InquiryResponse(
                id=row.id,
                name=row.name,
                email=row.email,
                phone=row.phone,
                company=row.company,
                message=row.message,
                offer_amount=row.offer_amount,
                status=row.status,
                created_at=row.created_at,
                read_at=row.read_at,
            )
        )
    return payload
@router.put("/{id}", response_model=ListingResponse)
async def update_listing(
    id: int,
    data: ListingUpdate,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """
    Apply a partial update to one of the caller's listings.

    Only fields that are not None in the payload are written. For status:
    "active" is honoured only when the listing is currently "draft" and
    verified (this publishes it and stamps published_at); "draft", "sold"
    and "expired" are applied as given; any other request leaves the status
    unchanged.

    Raises:
        HTTPException: 404 when the listing does not exist or belongs to
            another user; 400 when publishing an unverified listing.
    """
    lookup = select(DomainListing).where(
        and_(
            DomainListing.id == id,
            DomainListing.user_id == current_user.id,
        )
    )
    listing = (await db.execute(lookup)).scalar_one_or_none()
    if not listing:
        raise HTTPException(status_code=404, detail="Listing not found")

    # Update fields: copy across every plain attribute the client supplied.
    plain_fields = (
        "title",
        "description",
        "asking_price",
        "min_offer",
        "price_type",
        "show_valuation",
        "allow_offers",
    )
    for field_name in plain_fields:
        supplied = getattr(data, field_name)
        if supplied is not None:
            setattr(listing, field_name, supplied)

    # Status change
    requested = data.status
    if requested is not None:
        if requested == "active" and listing.status == "draft":
            # Publish listing
            if not listing.is_verified:
                raise HTTPException(
                    status_code=400,
                    detail="Cannot publish without DNS verification"
                )
            listing.status = ListingStatus.ACTIVE.value
            listing.published_at = datetime.utcnow()
        elif requested in ["draft", "sold", "expired"]:
            listing.status = requested

    await db.commit()
    await db.refresh(listing)

    return ListingResponse(
        id=listing.id,
        domain=listing.domain,
        slug=listing.slug,
        title=listing.title,
        description=listing.description,
        asking_price=listing.asking_price,
        min_offer=listing.min_offer,
        currency=listing.currency,
        price_type=listing.price_type,
        pounce_score=listing.pounce_score,
        estimated_value=listing.estimated_value,
        verification_status=listing.verification_status,
        is_verified=listing.is_verified,
        status=listing.status,
        show_valuation=listing.show_valuation,
        allow_offers=listing.allow_offers,
        view_count=listing.view_count,
        inquiry_count=listing.inquiry_count,
        public_url=listing.public_url,
        created_at=listing.created_at,
        published_at=listing.published_at,
        seller_verified=current_user.is_verified,
        seller_member_since=current_user.created_at,
    )
@router.delete("/{id}")
async def delete_listing(
    id: int,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """
    Permanently remove one of the caller's listings.

    Returns:
        {"success": True, "message": "Listing deleted"} after the commit.

    Raises:
        HTTPException: 404 when the listing does not exist or belongs to
            another user.
    """
    lookup = select(DomainListing).where(
        and_(
            DomainListing.id == id,
            DomainListing.user_id == current_user.id,
        )
    )
    target = (await db.execute(lookup)).scalar_one_or_none()
    if not target:
        raise HTTPException(status_code=404, detail="Listing not found")

    await db.delete(target)
    await db.commit()

    return {"success": True, "message": "Listing deleted"}
@router.post("/{id}/verify-dns", response_model=VerificationResponse)
async def start_dns_verification(
    id: int,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """
    Start DNS verification for a listing.

    Ensures the listing has a verification code, marks it as pending
    (unless it is already verified) and returns the TXT record the seller
    has to publish.

    Args:
        id: Listing id (must belong to the caller).
        current_user: Authenticated seller.
        db: Database session.

    Returns:
        VerificationResponse with the record name/value and instructions.

    Raises:
        HTTPException: 404 when the listing does not exist or belongs to
            another user.
    """
    result = await db.execute(
        select(DomainListing).where(
            and_(
                DomainListing.id == id,
                DomainListing.user_id == current_user.id,
            )
        )
    )
    listing = result.scalar_one_or_none()
    if not listing:
        raise HTTPException(status_code=404, detail="Listing not found")

    # Generate new code if needed
    if not listing.verification_code:
        listing.verification_code = _generate_verification_code()
    # Re-requesting the instructions must not downgrade a listing that has
    # already passed verification.
    if listing.verification_status != VerificationStatus.VERIFIED.value:
        listing.verification_status = VerificationStatus.PENDING.value
    await db.commit()

    # Extract domain root for DNS: the record lives under the last two labels.
    # NOTE(review): this is wrong for multi-label public suffixes
    # ("example.co.uk" yields "_pounce.co.uk"); fixing it needs a public
    # suffix list, and check_dns_verification must stay in sync.
    domain_parts = listing.domain.split('.')
    if len(domain_parts) > 2:
        dns_name = f"_pounce.{'.'.join(domain_parts[-2:])}"
    else:
        dns_name = f"_pounce.{listing.domain}"

    # The host shown to the seller is the record name that is actually
    # checked (dns_name). It used to be built from listing.domain, which
    # differs for domains with more than two labels.
    instructions = "\n".join([
        f"To verify ownership of {listing.domain}:",
        "1. Go to your domain registrar's DNS settings",
        "2. Add a new TXT record:",
        f"- Name/Host: _pounce (or {dns_name})",
        f"- Value: {listing.verification_code}",
        "- TTL: 300 (or lowest available)",
        "3. Wait 1-5 minutes for DNS propagation",
        '4. Click "Check Verification" to complete',
        "This proves you control the domain's DNS, confirming ownership.",
    ])

    return VerificationResponse(
        verification_code=listing.verification_code,
        dns_record_type="TXT",
        dns_record_name=dns_name,
        dns_record_value=listing.verification_code,
        instructions=instructions,
        status=listing.verification_status,
    )
@router.get("/{id}/verify-dns/check")
async def check_dns_verification(
    id: int,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """
    Check DNS verification status.

    Looks up the TXT records at "_pounce.<last two labels of the domain>"
    and marks the listing verified when one of them equals the listing's
    verification code.

    Args:
        id: Listing id (must belong to the caller).
        current_user: Authenticated seller.
        db: Database session.

    Returns:
        A dict with "verified" (bool), "status" ("verified", "pending" or
        "error") and a human-readable "message".

    Raises:
        HTTPException: 404 when the listing does not exist or belongs to
            another user; 400 when no verification code has been issued.
    """
    result = await db.execute(
        select(DomainListing).where(
            and_(
                DomainListing.id == id,
                DomainListing.user_id == current_user.id,
            )
        )
    )
    listing = result.scalar_one_or_none()
    if not listing:
        raise HTTPException(status_code=404, detail="Listing not found")
    if not listing.verification_code:
        raise HTTPException(status_code=400, detail="Start verification first")

    # Check DNS TXT record (imports kept local, as before)
    import asyncio
    import dns.resolver

    # Same record-name rule as start_dns_verification.
    domain_parts = listing.domain.split('.')
    if len(domain_parts) > 2:
        dns_name = f"_pounce.{'.'.join(domain_parts[-2:])}"
    else:
        dns_name = f"_pounce.{listing.domain}"

    try:
        # dns.resolver.resolve blocks on network I/O; run it in the default
        # executor so the event loop keeps serving other requests.
        loop = asyncio.get_running_loop()
        answers = await loop.run_in_executor(
            None, dns.resolver.resolve, dns_name, 'TXT'
        )
    except dns.resolver.NXDOMAIN:
        return {
            "verified": False,
            "status": "pending",
            "message": "DNS record not found. Please add the TXT record and wait for propagation.",
        }
    except dns.resolver.NoAnswer:
        return {
            "verified": False,
            "status": "pending",
            "message": "No TXT record found. Please add the record.",
        }
    except Exception as e:
        # Timeouts, unreachable name servers, etc.
        logger.error(f"DNS check failed for {listing.domain}: {e}")
        return {
            "verified": False,
            "status": "error",
            "message": "DNS check failed. Please try again in a few minutes.",
        }

    # Only the lookup is guarded above, so a database failure below is no
    # longer reported to the user as a DNS failure.
    for rdata in answers:
        # Drop the surrounding quotes from the record's text form.
        txt_value = str(rdata).strip('"')
        if txt_value == listing.verification_code:
            # Verified!
            listing.verification_status = VerificationStatus.VERIFIED.value
            listing.verified_at = datetime.utcnow()
            await db.commit()
            return {
                "verified": True,
                "status": "verified",
                "message": "DNS verification successful! You can now publish your listing.",
            }

    # Code not found
    return {
        "verified": False,
        "status": "pending",
        "message": "TXT record found but value doesn't match. Please check the value.",
    }