pounce/backend/app/api/drops.py
Yves Gugger 8dc6f85fb8
Some checks failed
CI / Frontend Lint & Type Check (push) Has been cancelled
CI / Frontend Build (push) Has been cancelled
CI / Backend Lint (push) Has been cancelled
CI / Backend Tests (push) Has been cancelled
CI / Docker Build (push) Has been cancelled
CI / Security Scan (push) Has been cancelled
Deploy / Build & Push Images (push) Has been cancelled
Deploy / Deploy to Server (push) Has been cancelled
Deploy / Notify (push) Has been cancelled
Deploy: 2025-12-19 09:11
2025-12-19 09:11:46 +01:00

334 lines
12 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Drops API - Zone File Analysis Endpoints
=========================================
API endpoints for accessing freshly dropped domains from:
- Switch.ch zone files (.ch, .li)
- ICANN CZDS zone files (.xyz, .org, .online, .info, .dev, .app; access to .com, .net, .club, .biz is pending approval)
"""
from datetime import datetime
from typing import Optional
import logging
from fastapi import APIRouter, Depends, HTTPException, Query, BackgroundTasks
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, update
from app.database import get_db
from app.api.deps import get_current_user
from app.models.zone_file import DroppedDomain
from app.services.zone_file import (
ZoneFileService,
get_dropped_domains,
get_zone_stats,
)
# Module-wide logger and the router every endpoint in this file attaches to.
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/drops", tags=["drops"])

# Supported TLDs, grouped by where their zone data comes from.
SWITCH_TLDS = ["ch", "li"]  # Switch.ch zones
CZDS_TLDS = [  # ICANN CZDS zones with approved access
    "xyz",
    "org",
    "online",
    "info",
    "dev",
    "app",
]
CZDS_PENDING = ["com", "net", "club", "biz"]  # ICANN CZDS zones awaiting approval
# Pending TLDs are deliberately left out: only these are accepted by the endpoints.
ALL_TLDS = [*SWITCH_TLDS, *CZDS_TLDS]
# ============================================================================
# PUBLIC ENDPOINTS (for stats)
# ============================================================================
@router.get("/stats")
async def api_get_zone_stats(
    db: AsyncSession = Depends(get_db)
):
    """
    Get zone file statistics.

    Public endpoint: it declares no authentication dependency.

    Returns:
        Whatever ``get_zone_stats`` returns for the current session,
        passed through unchanged.
        NOTE(review): previously described as "domain counts and last sync
        times for .ch and .li"; confirm whether CZDS zones are included too.

    Raises:
        HTTPException: 500 if the statistics could not be computed.
    """
    try:
        return await get_zone_stats(db)
    except HTTPException:
        # Do not re-wrap HTTP errors raised deliberately further down.
        raise
    except Exception as e:
        # Record the traceback server-side; the response only carries str(e).
        logger.exception("Failed to load zone file statistics")
        raise HTTPException(status_code=500, detail=str(e)) from e
# ============================================================================
# AUTHENTICATED ENDPOINTS
# ============================================================================
@router.get("")
async def api_get_drops(
    tld: Optional[str] = Query(None, description="Filter by TLD"),
    hours: int = Query(24, ge=1, le=48, description="Hours to look back (max 48h, we only store 48h)"),
    min_length: Optional[int] = Query(None, ge=1, le=63, description="Minimum domain length"),
    max_length: Optional[int] = Query(None, ge=1, le=63, description="Maximum domain length"),
    exclude_numeric: bool = Query(False, description="Exclude numeric-only domains"),
    exclude_hyphen: bool = Query(False, description="Exclude domains with hyphens"),
    keyword: Optional[str] = Query(None, description="Search keyword"),
    limit: int = Query(50, ge=1, le=200, description="Results per page"),
    offset: int = Query(0, ge=0, description="Offset for pagination"),
    db: AsyncSession = Depends(get_db),
    current_user = Depends(get_current_user)
):
    """
    Get recently dropped domains from zone files.

    Supports:
    - Switch.ch zones: .ch, .li
    - ICANN CZDS zones: .xyz, .org, .online, .info, .dev, .app

    Domains are detected by comparing daily zone file snapshots.
    Only available for authenticated users.

    The ``tld`` filter is matched case-insensitively and tolerates a leading
    dot and surrounding whitespace (``"CH"``, ``".ch"`` and ``"ch"`` are
    equivalent). A blank filter is treated as "no filter".

    Returns:
        The result of ``get_dropped_domains`` for the given filters,
        passed through unchanged.

    Raises:
        HTTPException: 400 if ``tld`` is not one of ``ALL_TLDS``;
            500 if the lookup fails.
    """
    if tld is not None:
        # Normalize before validating so "CH" / ".ch" are not rejected.
        tld = tld.strip().lstrip(".").lower() or None

    if tld and tld not in ALL_TLDS:
        raise HTTPException(
            status_code=400,
            detail=f"Unsupported TLD. Supported: {', '.join(ALL_TLDS)}"
        )

    try:
        return await get_dropped_domains(
            db=db,
            tld=tld,
            hours=hours,
            min_length=min_length,
            max_length=max_length,
            exclude_numeric=exclude_numeric,
            exclude_hyphen=exclude_hyphen,
            keyword=keyword,
            limit=limit,
            offset=offset,
        )
    except HTTPException:
        # Do not re-wrap HTTP errors raised deliberately further down.
        raise
    except Exception as e:
        # Record the traceback server-side; the response only carries str(e).
        logger.exception("Failed to load dropped domains (tld=%s)", tld)
        raise HTTPException(status_code=500, detail=str(e)) from e
@router.post("/sync/{tld}")
async def api_trigger_sync(
    tld: str,
    background_tasks: BackgroundTasks,
    db: AsyncSession = Depends(get_db),
    current_user = Depends(get_current_user)
):
    """
    Trigger a manual zone file sync for a specific TLD.

    Only available for admin users.
    This is normally run automatically by the scheduler.

    The sync itself runs as a background task with its own database session;
    the request-scoped ``db`` session is not used by it. The response is sent
    as soon as the task is queued, so a failing sync is only visible in the
    server log.

    Returns:
        ``{"status": "sync_started", "tld": <normalized tld>}``.

    Raises:
        HTTPException: 403 if the user has no truthy ``is_admin`` attribute;
            400 if ``tld`` is not one of ``ALL_TLDS``.
    """
    # Check if user is admin (a missing attribute counts as "not admin").
    if not getattr(current_user, 'is_admin', False):
        raise HTTPException(status_code=403, detail="Admin access required")

    # Accept "CH" / ".ch" as well as "ch".
    tld = tld.strip().lstrip(".").lower()
    if tld not in ALL_TLDS:
        raise HTTPException(
            status_code=400,
            detail=f"Unsupported TLD. Supported: {', '.join(ALL_TLDS)}"
        )

    async def run_sync():
        """Run the sync for ``tld`` in a fresh session; never raises."""
        from app.database import AsyncSessionLocal

        async with AsyncSessionLocal() as session:
            try:
                if tld in SWITCH_TLDS:
                    # Use Switch.ch zone transfer
                    service = ZoneFileService()
                    await service.run_daily_sync(session, tld)
                else:
                    # Use ICANN CZDS
                    from app.services.czds_client import CZDSClient

                    client = CZDSClient()
                    await client.sync_zone(session, tld)
            except Exception:
                # Best effort: nothing awaits this task, so log the full
                # traceback through the module logger instead of stdout.
                logger.exception("Zone sync failed for .%s", tld)

    background_tasks.add_task(run_sync)
    return {"status": "sync_started", "tld": tld}
# ============================================================================
# HELPER ENDPOINTS
# ============================================================================
@router.get("/tlds")
async def api_get_supported_tlds():
    """
    Get list of supported TLDs for zone file analysis.

    Returns:
        A dict with two lists of TLD descriptors (``tld``, ``name``, ``flag``,
        ``registry``, ``source``):
        - ``tlds``: zones that are currently supported
        - ``pending``: CZDS zones still awaiting approval

    The lists are static and are maintained by hand alongside the
    ``SWITCH_TLDS`` / ``CZDS_TLDS`` / ``CZDS_PENDING`` constants; keep them
    in sync when a TLD is added or approved.
    """
    return {
        "tlds": [
            # Switch.ch zones
            {"tld": "ch", "name": "Switzerland", "flag": "🇨🇭", "registry": "Switch", "source": "switch"},
            {"tld": "li", "name": "Liechtenstein", "flag": "🇱🇮", "registry": "Switch", "source": "switch"},
            # ICANN CZDS zones (approved)
            {"tld": "xyz", "name": "XYZ", "flag": "🌐", "registry": "XYZ.COM LLC", "source": "czds"},
            {"tld": "org", "name": "Organization", "flag": "🏛️", "registry": "PIR", "source": "czds"},
            {"tld": "online", "name": "Online", "flag": "💻", "registry": "Radix", "source": "czds"},
            # Every entry carries an icon; this one was previously an empty string.
            {"tld": "info", "name": "Information", "flag": "ℹ️", "registry": "Afilias", "source": "czds"},
            {"tld": "dev", "name": "Developer", "flag": "👨‍💻", "registry": "Google", "source": "czds"},
            {"tld": "app", "name": "Application", "flag": "📱", "registry": "Google", "source": "czds"},
        ],
        "pending": [
            # CZDS pending approval
            {"tld": "com", "name": "Commercial", "flag": "🏢", "registry": "Verisign", "source": "czds"},
            {"tld": "net", "name": "Network", "flag": "🌐", "registry": "Verisign", "source": "czds"},
            {"tld": "club", "name": "Club", "flag": "🎉", "registry": "GoDaddy", "source": "czds"},
            {"tld": "biz", "name": "Business", "flag": "💼", "registry": "GoDaddy", "source": "czds"},
        ],
    }
def _classify_availability(is_available, rdap_status):
    """Map the checker verdict plus RDAP status values to an availability label."""
    if is_available:
        # Reported as available, but a "pending" RDAP state means it cannot
        # be registered immediately.
        if rdap_status and any("pending" in str(s).lower() for s in rdap_status):
            return "pending_delete"
        return "available"

    # Domain exists - look for the specific lifecycle state.
    rdap_str = str(rdap_status).lower()
    if "pending delete" in rdap_str or "pendingdelete" in rdap_str:
        return "pending_delete"
    if "redemption" in rdap_str:
        return "redemption"
    return "taken"


@router.post("/check-status/{drop_id}")
async def api_check_drop_status(
    drop_id: int,
    db: AsyncSession = Depends(get_db),
    current_user = Depends(get_current_user)
):
    """
    Check the real-time availability status of a dropped domain.

    Looks up the drop, runs a full (non-quick) domain check, stores the
    outcome on the ``DroppedDomain`` row and returns it.

    Returns a dict whose ``availability_status`` is one of:
    - available: Domain can be registered NOW
    - pending_delete: Domain is in deletion phase (monitor it!)
    - redemption: Domain is in redemption period (owner can recover)
    - taken: Domain was re-registered

    This endpoint never produces ``unknown``; a check that fails surfaces as
    HTTP 500 instead.

    Raises:
        HTTPException: 404 if no drop has this id; 500 if the check or the
            database update fails.
    """
    from datetime import timezone

    from app.services.domain_checker import domain_checker

    # Get the drop from DB
    result = await db.execute(
        select(DroppedDomain).where(DroppedDomain.id == drop_id)
    )
    drop = result.scalar_one_or_none()
    if not drop:
        raise HTTPException(status_code=404, detail="Drop not found")

    full_domain = f"{drop.domain}.{drop.tld}"

    try:
        # Check with RDAP (not quick mode)
        check_result = await domain_checker.check_domain(full_domain, quick=False)

        rdap_status = check_result.raw_data.get("rdap_status", []) if check_result.raw_data else []
        availability_status = _classify_availability(check_result.is_available, rdap_status)

        # Same naive-UTC value datetime.utcnow() produced, without relying on
        # the deprecated call.
        checked_at = datetime.now(timezone.utc).replace(tzinfo=None)

        # Update the drop in DB
        await db.execute(
            update(DroppedDomain)
            .where(DroppedDomain.id == drop_id)
            .values(
                availability_status=availability_status,
                rdap_status=str(rdap_status) if rdap_status else None,
                last_status_check=checked_at,
            )
        )
        await db.commit()

        return {
            "id": drop_id,
            "domain": full_domain,
            "availability_status": availability_status,
            "rdap_status": rdap_status,
            "is_available": check_result.is_available,
            "can_register_now": availability_status == "available",
            "can_monitor": availability_status in ("pending_delete", "redemption"),
        }
    except HTTPException:
        # Do not re-wrap HTTP errors raised deliberately further down.
        raise
    except Exception as e:
        # logger.exception keeps the traceback that logger.error dropped.
        logger.exception("Status check failed for %s: %s", full_domain, e)
        raise HTTPException(status_code=500, detail=str(e)) from e
@router.post("/monitor/{drop_id}")
async def api_monitor_drop(
    drop_id: int,
    db: AsyncSession = Depends(get_db),
    current_user = Depends(get_current_user)
):
    """
    Add a dropped domain to the Sniper monitoring list.

    Creates an active ``SniperAlert`` of type ``"drop"`` with e-mail and push
    notification enabled, unless the user already has an active alert for the
    same domain.

    Returns:
        ``{"status": "already_monitoring", "domain": ...}`` if an active alert
        already exists, otherwise ``{"status": "monitoring", "domain": ...,
        "message": ...}``.

    Raises:
        HTTPException: 404 if no drop has this id; 400 if the user has
            reached the limit given by ``get_user_alert_limit``.
    """
    from sqlalchemy import func

    from app.models.sniper_alert import SniperAlert

    # Get the drop
    result = await db.execute(
        select(DroppedDomain).where(DroppedDomain.id == drop_id)
    )
    drop = result.scalar_one_or_none()
    if not drop:
        raise HTTPException(status_code=404, detail="Drop not found")

    full_domain = f"{drop.domain}.{drop.tld}"

    # Check if already monitoring. `.first()` rather than
    # `scalar_one_or_none()`: should duplicate active alerts ever exist, the
    # latter raises MultipleResultsFound and the request would fail with 500.
    existing = await db.execute(
        select(SniperAlert).where(
            SniperAlert.user_id == current_user.id,
            SniperAlert.domain == full_domain,
            SniperAlert.is_active == True,  # noqa: E712 - SQL expression
        )
    )
    if existing.scalars().first() is not None:
        return {"status": "already_monitoring", "domain": full_domain}

    # Check user limits
    from app.api.sniper_alerts import get_user_alert_limit

    limit = get_user_alert_limit(current_user)

    # Let the database count instead of loading every alert row.
    count_result = await db.execute(
        select(func.count())
        .select_from(SniperAlert)
        .where(
            SniperAlert.user_id == current_user.id,
            SniperAlert.is_active == True,  # noqa: E712 - SQL expression
        )
    )
    current_count = count_result.scalar_one()

    if current_count >= limit:
        raise HTTPException(
            status_code=400,
            detail=f"Monitor limit reached ({limit}). Upgrade to add more."
        )

    # Create sniper alert for this drop
    alert = SniperAlert(
        user_id=current_user.id,
        domain=full_domain,
        alert_type="drop",
        is_active=True,
        notify_email=True,
        notify_push=True,
    )
    db.add(alert)
    await db.commit()

    return {
        "status": "monitoring",
        "domain": full_domain,
        "message": f"You'll be notified when {full_domain} becomes available!"
    }