pounce/backend/app/api/yield_webhooks.py
yves.gugger 1705b5cc6e
Some checks failed
CI / Frontend Lint & Type Check (push) Has been cancelled
CI / Frontend Build (push) Has been cancelled
CI / Backend Lint (push) Has been cancelled
CI / Backend Tests (push) Has been cancelled
CI / Docker Build (push) Has been cancelled
CI / Security Scan (push) Has been cancelled
Deploy / Build & Push Images (push) Has been cancelled
Deploy / Deploy to Server (push) Has been cancelled
Deploy / Notify (push) Has been cancelled
feat: complete Yield feature setup
Backend:
- Add yield_webhooks.py for partner callbacks (generic, Awin, batch import)
- Add yield_routing.py for domain traffic routing with landing pages
- Add DB migrations for yield table indexes
- Add seed script with 30+ Swiss/German affiliate partners
- Register all new routers in API

Frontend:
- Add public /yield landing page with live analyzer demo
- Add Yield to header navigation

Documentation:
- Complete YIELD_SETUP.md with setup guide, API reference, and troubleshooting
2025-12-12 14:52:49 +01:00

458 lines
15 KiB
Python

"""
Webhook endpoints for Yield affiliate partner callbacks.
Partners call these endpoints to report:
- Clicks (redirect happened)
- Leads (form submitted, signup, etc.)
- Sales (purchase completed)
Each partner may have different authentication methods:
- HMAC signature verification
- API key in header
- IP whitelist
"""
import hashlib
import hmac
import json
import logging
from datetime import datetime
from decimal import Decimal
from typing import Optional
from fastapi import APIRouter, Depends, HTTPException, Request, Header, BackgroundTasks
from pydantic import BaseModel, Field
from sqlalchemy import select
from sqlalchemy.orm import Session
from app.api.deps import get_db
from app.config import get_settings
from app.models.yield_domain import YieldDomain, YieldTransaction, AffiliatePartner
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# Application settings, resolved once when this module is imported.
settings = get_settings()
# Every route declared in this module is served under the /yield-webhooks prefix.
router = APIRouter(prefix="/yield-webhooks", tags=["yield-webhooks"])
# Revenue split: User gets 70%, Pounce keeps 30%
# Stored as a Decimal so it can be multiplied directly with Decimal gross amounts.
USER_REVENUE_SHARE = Decimal("0.70")
# ============================================================================
# Schemas
# ============================================================================
class PartnerEvent(BaseModel):
    """Generic partner event payload.

    Request body accepted by the generic ``/{partner_slug}`` webhook.
    """
    # Kind of event being reported; the generic handler branches on
    # "click" versus "lead"/"sale".
    event_type: str = Field(..., description="click, lead, or sale")
    # Domain name the event is attributed to; the handler lower-cases it
    # before looking up the matching YieldDomain row.
    domain: str = Field(..., description="The yield domain that generated this event")
    # Identifier assigned by the partner; stored as partner_transaction_id.
    transaction_id: Optional[str] = Field(None, description="Partner's transaction ID")
    # Gross commission; a missing (or zero) amount is recorded as Decimal("0").
    amount: Optional[float] = Field(None, description="Gross commission amount")
    # The handler falls back to "CHF" when this is empty or null.
    currency: Optional[str] = Field("CHF", description="Currency code")
    # Optional attribution data
    geo_country: Optional[str] = None
    referrer: Optional[str] = None
    user_agent: Optional[str] = None
    # Optional metadata
    # NOTE(review): no handler visible in this module reads this field.
    metadata: Optional[dict] = None
class WebhookResponse(BaseModel):
    """Response for webhook calls.

    Returned by the generic webhook, the Awin postback and the manual
    confirmation endpoint.
    """
    # The handlers in this module always set this to True; failures are
    # reported by raising HTTPException instead.
    success: bool
    # Primary key of the YieldTransaction that was created, updated or confirmed.
    transaction_id: Optional[int] = None
    # Short human-readable summary of what was done.
    message: str
# ============================================================================
# Signature Verification Helpers
# ============================================================================
def verify_hmac_signature(
    payload: bytes,
    signature: str,
    secret: str,
    algorithm: str = "sha256"
) -> bool:
    """Verify the hex-encoded HMAC signature of a webhook payload.

    The check fails closed: a missing signature, a missing secret or an
    unsupported algorithm yields ``False`` rather than an exception or a
    silent downgrade to a weaker hash.

    Args:
        payload: Raw request body exactly as received.
        signature: Hex digest supplied by the caller. Surrounding whitespace
            is ignored and the comparison is case-insensitive, since hex
            digests are sometimes sent in upper case.
        secret: Shared secret used as the HMAC key.
        algorithm: ``"sha256"`` (default) or ``"sha1"``, case-insensitive.

    Returns:
        ``True`` only when ``signature`` matches the HMAC of ``payload``.
    """
    supported = {"sha256": hashlib.sha256, "sha1": hashlib.sha1}
    digestmod = supported.get((algorithm or "").strip().lower())
    if digestmod is None or not signature or not secret:
        return False

    expected = hmac.new(secret.encode("utf-8"), payload, digestmod).hexdigest()
    provided = signature.strip().lower()
    # Compare as bytes: compare_digest() raises TypeError for str arguments
    # containing non-ASCII characters, which an attacker-controlled header may.
    return hmac.compare_digest(provided.encode("utf-8"), expected.encode("ascii"))
def hash_ip(ip: str) -> str:
    """Return a shortened SHA-256 fingerprint of an IP address.

    The address is encoded to bytes, hashed with SHA-256, and the first
    32 hexadecimal characters of the digest are returned.
    """
    # NOTE(review): the hash is unsalted; confirm that this satisfies the
    # project's privacy requirements for stored IP data.
    fingerprint = hashlib.sha256(ip.encode())
    hex_digest = fingerprint.hexdigest()
    return hex_digest[:32]
# ============================================================================
# Generic Webhook Endpoint
# ============================================================================
# NOTE(review): this single-segment catch-all is declared before the
# "/batch-import" route further down in this module. FastAPI matches routes in
# declaration order, so POST /yield-webhooks/batch-import is dispatched here
# with partner_slug="batch-import". TODO: declare fixed-path routes first.
@router.post("/{partner_slug}", response_model=WebhookResponse)
async def receive_partner_webhook(
    partner_slug: str,
    event: PartnerEvent,
    request: Request,
    background_tasks: BackgroundTasks,
    db: Session = Depends(get_db),
    x_webhook_signature: Optional[str] = Header(None),
    x_api_key: Optional[str] = Header(None),
):
    """
    Receive webhook callback from affiliate partner.
    Partners POST events here when clicks, leads, or sales occur.

    Clicks are recorded as confirmed immediately and their net amount is added
    to the domain revenue; leads and sales are recorded as pending. A repeated
    delivery carrying the same partner transaction ID and event type is
    acknowledged without being recorded a second time.

    Raises:
        HTTPException: 404 if the partner is unknown/inactive or the domain is
            unknown/inactive; 400 if ``event_type`` is not click, lead or sale.
    """
    # 1. Find partner
    partner = db.query(AffiliatePartner).filter(
        AffiliatePartner.slug == partner_slug,
        AffiliatePartner.is_active == True,  # noqa: E712 - SQL expression, not a Python bool test
    ).first()
    if not partner:
        logger.warning(f"Webhook from unknown partner: {partner_slug}")
        raise HTTPException(status_code=404, detail="Unknown partner")

    # 2. Verify authentication (if configured)
    # Note: In production, store partner API keys in a secure location
    # For now, we accept webhooks if the partner exists
    # TODO: Add proper signature verification per partner
    # NOTE(review): x_webhook_signature and x_api_key are accepted but never
    # checked, so any caller that knows an active partner slug can post events.

    # 3. Validate the event type. Without this, an unexpected value such as
    # "Sale" or "refund" would be treated as an instantly confirmed event and
    # credited to revenue.
    event_type = event.event_type.strip().lower()
    if event_type not in ("click", "lead", "sale"):
        logger.warning(f"Webhook with unsupported event type: {event.event_type}")
        raise HTTPException(status_code=400, detail="Unsupported event type")

    # 4. Find yield domain
    yield_domain = db.query(YieldDomain).filter(
        YieldDomain.domain == event.domain.lower(),
        YieldDomain.status == "active",
    ).first()
    if not yield_domain:
        logger.warning(f"Webhook for unknown/inactive domain: {event.domain}")
        raise HTTPException(status_code=404, detail="Domain not found or inactive")

    # 5. Idempotency: partners retry deliveries, so an event that was already
    # stored must not be counted twice. The event type is part of the key so a
    # lead and a later sale sharing one partner ID are still both recorded.
    if event.transaction_id:
        duplicate = db.query(YieldTransaction).filter(
            YieldTransaction.partner_transaction_id == event.transaction_id,
            YieldTransaction.partner_slug == partner_slug,
            YieldTransaction.event_type == event_type,
        ).first()
        if duplicate:
            logger.info(
                f"Duplicate webhook ignored: {partner_slug} -> {event.domain} "
                f"({event_type}, partner_tx={event.transaction_id})"
            )
            return WebhookResponse(
                success=True,
                transaction_id=duplicate.id,
                message=f"Event {event_type} already recorded"
            )

    # 6. Calculate amounts
    gross_amount = Decimal(str(event.amount)) if event.amount else Decimal("0")
    net_amount = gross_amount * USER_REVENUE_SHARE

    # 7. Get client IP for hashing
    client_ip = request.client.host if request.client else None
    ip_hash = hash_ip(client_ip) if client_ip else None

    # 8. Create transaction
    is_click = event_type == "click"
    now = datetime.utcnow()
    transaction = YieldTransaction(
        yield_domain_id=yield_domain.id,
        event_type=event_type,
        partner_slug=partner_slug,
        partner_transaction_id=event.transaction_id,
        gross_amount=gross_amount,
        net_amount=net_amount,
        currency=event.currency or "CHF",
        referrer=event.referrer,
        user_agent=event.user_agent,
        geo_country=event.geo_country,
        ip_hash=ip_hash,
        status="confirmed" if is_click else "pending",
        confirmed_at=now if is_click else None,
    )
    db.add(transaction)

    # 9. Update domain aggregates
    if is_click:
        yield_domain.total_clicks += 1
        yield_domain.last_click_at = now
        # Clicks are confirmed on arrival, so their revenue counts immediately.
        yield_domain.total_revenue += net_amount
    else:
        # Leads and sales stay pending; revenue is added when they are confirmed.
        yield_domain.total_conversions += 1
        yield_domain.last_conversion_at = now

    db.commit()
    db.refresh(transaction)
    logger.info(
        f"Webhook processed: {partner_slug} -> {event.domain} "
        f"({event_type}, gross={gross_amount}, net={net_amount})"
    )
    return WebhookResponse(
        success=True,
        transaction_id=transaction.id,
        message=f"Event {event_type} recorded successfully"
    )
# ============================================================================
# Awin-Specific Webhook
# ============================================================================
class AwinEvent(BaseModel):
    """Awin network postback format.

    Request body accepted by the ``/awin/postback`` endpoint. Field names are
    camelCase, as declared here, rather than the snake_case used elsewhere.
    """
    # The handler first tries this as a domain name (lower-cased), then as a
    # numeric YieldDomain id.
    clickRef: str  # Our yield domain ID or domain name
    # Used to find an already-stored transaction for this postback.
    transactionId: str
    # Gross commission; the handler converts it to Decimal via str().
    commission: float
    commissionCurrency: str = "CHF"
    # Mapped case-insensitively to pending/confirmed/rejected; unrecognised
    # values are treated as pending.
    status: str  # "pending", "approved", "declined"
    # Anything other than "lead" (case-insensitive) is stored as "sale".
    transactionType: str  # "sale", "lead"
@router.post("/awin/postback", response_model=WebhookResponse)
async def receive_awin_postback(
    event: AwinEvent,
    request: Request,
    db: Session = Depends(get_db),
    x_awin_signature: Optional[str] = Header(None),
):
    """
    Receive postback from Awin affiliate network.
    Awin sends postbacks for tracked conversions.

    A postback either creates a new transaction or updates the status of a
    previously stored one. Domain revenue is adjusted only when a transaction
    actually changes into or out of the confirmed state, so repeated postbacks
    with the same status do not change the totals.

    Raises:
        HTTPException: 404 if ``clickRef`` matches no yield domain.
    """
    # NOTE(review): x_awin_signature is accepted but never verified.

    # Find domain by click reference
    yield_domain = db.query(YieldDomain).filter(
        YieldDomain.domain == event.clickRef.lower(),
    ).first()
    if not yield_domain:
        # Try to find by ID if clickRef is numeric
        try:
            domain_id = int(event.clickRef)
        except ValueError:
            domain_id = None
        if domain_id is not None:
            yield_domain = db.query(YieldDomain).filter(
                YieldDomain.id == domain_id,
            ).first()
    if not yield_domain:
        logger.warning(f"Awin postback for unknown domain: {event.clickRef}")
        raise HTTPException(status_code=404, detail="Domain not found")

    # Calculate amounts
    gross_amount = Decimal(str(event.commission))
    net_amount = gross_amount * USER_REVENUE_SHARE

    # Map Awin status to our status
    status_map = {
        "pending": "pending",
        "approved": "confirmed",
        "declined": "rejected",
    }
    status = status_map.get(event.status.lower(), "pending")

    # Create or update transaction
    existing_tx = db.query(YieldTransaction).filter(
        YieldTransaction.partner_transaction_id == event.transactionId,
        YieldTransaction.partner_slug.like("awin%"),
    ).first()
    if existing_tx:
        was_confirmed = existing_tx.status == "confirmed"
        if status == "confirmed" and not was_confirmed:
            # First approval: the approved commission is what gets credited,
            # so store it on the transaction to keep both figures in sync.
            existing_tx.gross_amount = gross_amount
            existing_tx.net_amount = net_amount
            existing_tx.confirmed_at = datetime.utcnow()
            yield_domain.total_revenue += net_amount
        elif was_confirmed and status != "confirmed":
            # Reversal of an approved transaction: remove exactly what was
            # credited, otherwise a later re-approval would count it twice.
            yield_domain.total_revenue -= existing_tx.net_amount
            existing_tx.confirmed_at = None
        # A repeated postback with an unchanged status leaves totals untouched.
        existing_tx.status = status
        transaction_id = existing_tx.id
    else:
        # Create new transaction
        transaction = YieldTransaction(
            yield_domain_id=yield_domain.id,
            event_type="lead" if event.transactionType.lower() == "lead" else "sale",
            partner_slug=f"awin_{yield_domain.active_route or 'unknown'}",
            partner_transaction_id=event.transactionId,
            gross_amount=gross_amount,
            net_amount=net_amount,
            currency=event.commissionCurrency,
            status=status,
            confirmed_at=datetime.utcnow() if status == "confirmed" else None,
        )
        db.add(transaction)
        # Update domain stats
        yield_domain.total_conversions += 1
        yield_domain.last_conversion_at = datetime.utcnow()
        if status == "confirmed":
            yield_domain.total_revenue += net_amount
        # Flush so the database assigns the primary key before it is returned.
        db.flush()
        transaction_id = transaction.id

    db.commit()
    logger.info(f"Awin postback processed: {event.transactionId} -> {status}")
    return WebhookResponse(
        success=True,
        transaction_id=transaction_id,
        message=f"Awin event processed ({status})"
    )
# ============================================================================
# Transaction Confirmation Endpoint (Admin/Internal)
# ============================================================================
@router.post("/confirm/{transaction_id}", response_model=WebhookResponse)
async def confirm_transaction(
    transaction_id: int,
    db: Session = Depends(get_db),
    x_internal_key: Optional[str] = Header(None),
):
    """
    Manually confirm a pending transaction.
    Internal endpoint for admin use or automated confirmation.

    Marks the transaction as confirmed, stamps ``confirmed_at`` and adds its
    net amount to the owning domain's revenue.

    Raises:
        HTTPException: 401 if the ``X-Internal-Key`` header is missing or does
            not match the configured key (or no key is configured); 404 if the
            transaction does not exist or is not pending.
    """
    # Basic auth check - in production, use proper admin auth
    internal_key = getattr(settings, 'internal_api_key', None) or settings.secret_key
    # Fail closed: a plain `!=` would let a request with no header through when
    # no key is configured (None != None is False). compare_digest on bytes
    # also keeps the comparison constant-time and safe for non-ASCII headers.
    authorized = (
        bool(x_internal_key)
        and isinstance(internal_key, str)
        and bool(internal_key)
        and hmac.compare_digest(
            x_internal_key.encode("utf-8"), internal_key.encode("utf-8")
        )
    )
    if not authorized:
        raise HTTPException(status_code=401, detail="Unauthorized")

    transaction = db.query(YieldTransaction).filter(
        YieldTransaction.id == transaction_id,
        YieldTransaction.status == "pending",
    ).first()
    if not transaction:
        raise HTTPException(status_code=404, detail="Transaction not found or not pending")

    # Confirm transaction
    transaction.status = "confirmed"
    transaction.confirmed_at = datetime.utcnow()

    # Update domain revenue
    yield_domain = db.query(YieldDomain).filter(
        YieldDomain.id == transaction.yield_domain_id
    ).first()
    if yield_domain:
        yield_domain.total_revenue += transaction.net_amount

    db.commit()
    logger.info(f"Transaction {transaction_id} confirmed manually")
    return WebhookResponse(
        success=True,
        transaction_id=transaction_id,
        message="Transaction confirmed"
    )
# ============================================================================
# Batch Transaction Import (for reconciliation)
# ============================================================================
class BatchTransactionItem(BaseModel):
    """Single transaction in batch import."""
    # Domain name; lower-cased by the import handler before lookup.
    domain: str
    # "click" increments the click counter; any other value is counted as a
    # conversion by the import handler.
    event_type: str
    # Together with transaction_id this identifies an already-imported row.
    partner_slug: str
    transaction_id: str
    # Gross commission; the net amount is derived from it by the handler.
    gross_amount: float
    currency: str = "CHF"
    # Only "confirmed" items are added to the domain revenue.
    status: str = "confirmed"
    # NOTE(review): not read by the import handler visible in this module.
    created_at: Optional[str] = None
class BatchImportRequest(BaseModel):
    """Batch transaction import request."""
    # Items are processed in the order given.
    transactions: list[BatchTransactionItem]
class BatchImportResponse(BaseModel):
    """Batch import response."""
    # True only when no item produced an error message.
    success: bool
    # Number of transactions newly created.
    imported: int
    # Number of items not imported (unknown domain, duplicate, or error).
    skipped: int
    # Error messages; the import handler returns at most the first 10.
    errors: list[str]
# NOTE(review): the single-segment "/{partner_slug}" route is declared earlier
# in this module and FastAPI matches routes in declaration order, so requests
# to /yield-webhooks/batch-import are captured by that handler instead of this
# one. TODO: declare this route before the catch-all.
@router.post("/batch-import", response_model=BatchImportResponse)
async def batch_import_transactions(
    request_data: BatchImportRequest,
    db: Session = Depends(get_db),
    x_internal_key: Optional[str] = Header(None),
):
    """
    Batch import transactions for reconciliation.
    Internal endpoint for importing partner reports.

    Each item is imported independently: items with an unknown domain, an
    unsupported event type, or a (partner_slug, transaction_id) pair that was
    already stored or already seen in this batch are skipped. All accepted
    items are committed together at the end.

    Raises:
        HTTPException: 401 if the ``X-Internal-Key`` header is missing or does
            not match the configured key (or no key is configured).
    """
    internal_key = getattr(settings, 'internal_api_key', None) or settings.secret_key
    # Fail closed: a plain `!=` would let a request with no header through when
    # no key is configured (None != None is False). compare_digest on bytes
    # also keeps the comparison constant-time and safe for non-ASCII headers.
    authorized = (
        bool(x_internal_key)
        and isinstance(internal_key, str)
        and bool(internal_key)
        and hmac.compare_digest(
            x_internal_key.encode("utf-8"), internal_key.encode("utf-8")
        )
    )
    if not authorized:
        raise HTTPException(status_code=401, detail="Unauthorized")

    imported = 0
    skipped = 0
    errors = []
    # Keys accepted so far in this request; catches in-batch duplicates even
    # when the session does not autoflush pending rows before the lookup query.
    seen_keys = set()

    for item in request_data.transactions:
        try:
            # Reject unknown event types instead of counting them as conversions.
            event_type = item.event_type.strip().lower()
            if event_type not in ("click", "lead", "sale"):
                errors.append(
                    f"Unsupported event type '{item.event_type}' for "
                    f"{item.domain}/{item.transaction_id}"
                )
                skipped += 1
                continue

            # Find domain
            yield_domain = db.query(YieldDomain).filter(
                YieldDomain.domain == item.domain.lower(),
            ).first()
            if not yield_domain:
                errors.append(f"Domain not found: {item.domain}")
                skipped += 1
                continue

            # Check for duplicate
            key = (item.partner_slug, item.transaction_id)
            if key in seen_keys:
                skipped += 1
                continue
            existing = db.query(YieldTransaction).filter(
                YieldTransaction.partner_transaction_id == item.transaction_id,
                YieldTransaction.partner_slug == item.partner_slug,
            ).first()
            if existing:
                skipped += 1
                continue

            # Create transaction
            gross = Decimal(str(item.gross_amount))
            net = gross * USER_REVENUE_SHARE
            tx = YieldTransaction(
                yield_domain_id=yield_domain.id,
                event_type=event_type,
                partner_slug=item.partner_slug,
                partner_transaction_id=item.transaction_id,
                gross_amount=gross,
                net_amount=net,
                currency=item.currency,
                status=item.status,
                confirmed_at=datetime.utcnow() if item.status == "confirmed" else None,
            )
            db.add(tx)
            seen_keys.add(key)

            # Update domain stats
            if event_type == "click":
                yield_domain.total_clicks += 1
            else:
                yield_domain.total_conversions += 1
            if item.status == "confirmed":
                yield_domain.total_revenue += net
            imported += 1
        except Exception as e:
            # Per-item boundary: one bad row must not abort the whole import.
            # Log it, because only the first few messages reach the response.
            # NOTE(review): a database error here may leave the session needing
            # a rollback, which would make the final commit fail — confirm.
            logger.exception(
                f"Batch import failed for {item.domain}/{item.transaction_id}"
            )
            errors.append(f"Error importing {item.domain}/{item.transaction_id}: {str(e)}")
            skipped += 1

    db.commit()
    return BatchImportResponse(
        success=len(errors) == 0,
        imported=imported,
        skipped=skipped,
        errors=errors[:10]  # Limit error messages
    )