pounce/backend/app/api/drops.py
Yves Gugger 93a18820c2
Some checks failed
CI / Frontend Lint & Type Check (push) Has been cancelled
CI / Frontend Build (push) Has been cancelled
CI / Backend Lint (push) Has been cancelled
CI / Backend Tests (push) Has been cancelled
CI / Docker Build (push) Has been cancelled
CI / Security Scan (push) Has been cancelled
Deploy / Build & Push Images (push) Has been cancelled
Deploy / Deploy to Server (push) Has been cancelled
Deploy / Notify (push) Has been cancelled
Deploy: 2025-12-19 09:35
2025-12-19 09:35:11 +01:00

295 lines
10 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Drops API - Zone File Analysis Endpoints
=========================================
API endpoints for accessing freshly dropped domains from:
- Switch.ch zone files (.ch, .li)
- ICANN CZDS zone files (.xyz, .org, .online, .info, .dev, .app; .com, .net, .club and .biz are pending approval)
"""
from datetime import datetime
from typing import Optional
import logging
from fastapi import APIRouter, Depends, HTTPException, Query, BackgroundTasks
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, update
from app.database import get_db
from app.api.deps import get_current_user
from app.models.zone_file import DroppedDomain
from app.services.zone_file import (
ZoneFileService,
get_dropped_domains,
get_zone_stats,
)
# Module-level logger and the router that every endpoint in this file registers on.
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/drops", tags=["drops"])

# TLDs sourced from Switch.ch zone files.
SWITCH_TLDS = ["ch", "li"]
# ICANN CZDS TLDs with approved access.
CZDS_TLDS = ["xyz", "org", "online", "info", "dev", "app"]
# ICANN CZDS TLDs whose access is still pending approval.
CZDS_PENDING = ["com", "net", "club", "biz"]
# Every TLD the endpoints accept; pending CZDS TLDs are deliberately left out.
ALL_TLDS = [*SWITCH_TLDS, *CZDS_TLDS]
# ============================================================================
# PUBLIC ENDPOINTS (for stats)
# ============================================================================
@router.get("/stats")
async def api_get_zone_stats(
    db: AsyncSession = Depends(get_db),
):
    """
    Get zone file statistics.

    Public endpoint: it declares no authentication dependency. The payload is
    whatever ``get_zone_stats`` returns for the given session, passed through
    unchanged (per the original note: domain counts and last sync times;
    exact shape is defined by the service, not here).

    Args:
        db: Request-scoped async database session.

    Returns:
        The statistics object produced by ``get_zone_stats``.

    Raises:
        HTTPException: 500 when the lookup fails unexpectedly. An
            ``HTTPException`` raised by the service is propagated untouched.
    """
    try:
        return await get_zone_stats(db)
    except HTTPException:
        # Keep deliberate HTTP errors intact instead of rewrapping them as 500.
        raise
    except Exception as e:
        # Log with traceback; the message alone is rarely enough to diagnose.
        logger.exception("Failed to load zone stats")
        # NOTE(review): detail=str(e) exposes internal error text to clients;
        # kept for backward compatibility with existing consumers.
        raise HTTPException(status_code=500, detail=str(e)) from e
# ============================================================================
# AUTHENTICATED ENDPOINTS
# ============================================================================
@router.get("")
async def api_get_drops(
    tld: Optional[str] = Query(None, description="Filter by TLD"),
    hours: int = Query(24, ge=1, le=48, description="Hours to look back (max 48h, we only store 48h)"),
    min_length: Optional[int] = Query(None, ge=1, le=63, description="Minimum domain length"),
    max_length: Optional[int] = Query(None, ge=1, le=63, description="Maximum domain length"),
    exclude_numeric: bool = Query(False, description="Exclude numeric-only domains"),
    exclude_hyphen: bool = Query(False, description="Exclude domains with hyphens"),
    keyword: Optional[str] = Query(None, description="Search keyword"),
    limit: int = Query(50, ge=1, le=200, description="Results per page"),
    offset: int = Query(0, ge=0, description="Offset for pagination"),
    db: AsyncSession = Depends(get_db),
    current_user = Depends(get_current_user),
):
    """
    Get recently dropped domains from zone files.

    Supports:
    - Switch.ch zones: .ch, .li
    - ICANN CZDS zones: .xyz, .org, .online, .info, .dev, .app

    Only available for authenticated users (``get_current_user`` dependency).
    All filters are forwarded to ``get_dropped_domains``, whose result is
    returned unchanged.

    Args:
        tld: Optional TLD filter. Matched case-insensitively; surrounding
            whitespace and leading dots are ignored, so "CH" and ".ch" are
            treated like "ch". A blank value means "no filter".
        hours: Look-back window in hours (1-48).
        min_length: Optional minimum domain length (1-63).
        max_length: Optional maximum domain length (1-63).
        exclude_numeric: Exclude numeric-only domains.
        exclude_hyphen: Exclude domains containing hyphens.
        keyword: Optional search keyword.
        limit: Page size (1-200).
        offset: Pagination offset (>= 0).
        db: Request-scoped async database session.
        current_user: The authenticated user (unused beyond enforcing auth).

    Raises:
        HTTPException: 400 for a TLD outside ``ALL_TLDS``; 500 when the query
            fails unexpectedly. An ``HTTPException`` raised by the service is
            propagated untouched.
    """
    if tld is not None:
        # Entries in ALL_TLDS are lowercase and dot-free; normalise the input
        # to the same form. A blank value collapses to "no filter".
        tld = tld.strip().lstrip(".").lower() or None

    if tld is not None and tld not in ALL_TLDS:
        raise HTTPException(
            status_code=400,
            detail=f"Unsupported TLD. Supported: {', '.join(ALL_TLDS)}"
        )

    try:
        return await get_dropped_domains(
            db=db,
            tld=tld,
            hours=hours,
            min_length=min_length,
            max_length=max_length,
            exclude_numeric=exclude_numeric,
            exclude_hyphen=exclude_hyphen,
            keyword=keyword,
            limit=limit,
            offset=offset,
        )
    except HTTPException:
        # Keep deliberate HTTP errors intact instead of rewrapping them as 500.
        raise
    except Exception as e:
        # Log with traceback so the failure can be diagnosed server-side.
        logger.exception("Failed to load dropped domains (tld=%s)", tld)
        # NOTE(review): detail=str(e) exposes internal error text to clients;
        # kept for backward compatibility with existing consumers.
        raise HTTPException(status_code=500, detail=str(e)) from e
@router.post("/sync/{tld}")
async def api_trigger_sync(
    tld: str,
    background_tasks: BackgroundTasks,
    db: AsyncSession = Depends(get_db),
    current_user = Depends(get_current_user),
):
    """
    Trigger a manual zone file sync for a specific TLD.

    Only available for admin users. This is normally run automatically by
    the scheduler. The sync itself is queued as a background task; the
    response only confirms that it was scheduled.

    Args:
        tld: TLD to sync; must be one of ``ALL_TLDS``.
        background_tasks: FastAPI task queue the sync is added to.
        db: Request-scoped session (not used by the background task, which
            opens its own session).
        current_user: Authenticated user; must have a truthy ``is_admin``.

    Returns:
        ``{"status": "sync_started", "tld": tld}``.

    Raises:
        HTTPException: 403 if the user is not an admin; 400 for a TLD outside
            ``ALL_TLDS``.
    """
    # Users without an ``is_admin`` attribute are treated as non-admins.
    if not getattr(current_user, 'is_admin', False):
        raise HTTPException(status_code=403, detail="Admin access required")

    if tld not in ALL_TLDS:
        raise HTTPException(
            status_code=400,
            detail=f"Unsupported TLD. Supported: {', '.join(ALL_TLDS)}"
        )

    async def run_sync():
        """Run the sync in a dedicated session and log (not raise) failures."""
        from app.database import AsyncSessionLocal

        async with AsyncSessionLocal() as session:
            try:
                if tld in SWITCH_TLDS:
                    # Use Switch.ch zone transfer
                    service = ZoneFileService()
                    await service.run_daily_sync(session, tld)
                else:
                    # Use ICANN CZDS
                    from app.services.czds_client import CZDSClient

                    client = CZDSClient()
                    await client.sync_zone(session, tld)
            except Exception:
                # Best-effort task: nobody awaits the result, so record the
                # failure with its traceback through the module logger
                # instead of printing to stdout.
                logger.exception("Zone sync failed for .%s", tld)

    background_tasks.add_task(run_sync)
    return {"status": "sync_started", "tld": tld}
# ============================================================================
# HELPER ENDPOINTS
# ============================================================================
@router.get("/tlds")
async def api_get_supported_tlds():
    """
    Get list of supported TLDs for zone file analysis.

    Returns:
        A dict with two lists of TLD descriptors (``tld``, ``name``, ``flag``,
        ``registry``, ``source``): ``"tlds"`` for zones that are served and
        ``"pending"`` for CZDS zones still awaiting approval.
    """
    return {
        "tlds": [
            # Switch.ch zones
            {"tld": "ch", "name": "Switzerland", "flag": "🇨🇭", "registry": "Switch", "source": "switch"},
            {"tld": "li", "name": "Liechtenstein", "flag": "🇱🇮", "registry": "Switch", "source": "switch"},
            # ICANN CZDS zones (approved)
            {"tld": "xyz", "name": "XYZ", "flag": "🌐", "registry": "XYZ.COM LLC", "source": "czds"},
            {"tld": "org", "name": "Organization", "flag": "🏛️", "registry": "PIR", "source": "czds"},
            {"tld": "online", "name": "Online", "flag": "💻", "registry": "Radix", "source": "czds"},
            # The flag here was an empty string, unlike every other entry;
            # restored to an information glyph so clients always get an icon.
            {"tld": "info", "name": "Information", "flag": "ℹ️", "registry": "Afilias", "source": "czds"},
            {"tld": "dev", "name": "Developer", "flag": "👨‍💻", "registry": "Google", "source": "czds"},
            {"tld": "app", "name": "Application", "flag": "📱", "registry": "Google", "source": "czds"},
        ],
        "pending": [
            # CZDS pending approval
            {"tld": "com", "name": "Commercial", "flag": "🏢", "registry": "Verisign", "source": "czds"},
            {"tld": "net", "name": "Network", "flag": "🌐", "registry": "Verisign", "source": "czds"},
            {"tld": "club", "name": "Club", "flag": "🎉", "registry": "GoDaddy", "source": "czds"},
            {"tld": "biz", "name": "Business", "flag": "💼", "registry": "GoDaddy", "source": "czds"},
        ],
    }
@router.post("/check-status/{drop_id}")
async def api_check_drop_status(
    drop_id: int,
    db: AsyncSession = Depends(get_db),
    current_user = Depends(get_current_user),
):
    """
    Check the real-time availability status of a dropped domain.

    Looks up the drop, runs ``check_drop_status`` on ``<domain>.<tld>``,
    stores the outcome on the ``DroppedDomain`` row and returns it.

    Returns:
        A dict with ``id``, ``domain``, ``status``, ``rdap_status``,
        ``can_register_now``, ``should_track`` and ``message``. Status values:
        - available: Domain can be registered NOW
        - dropping_soon: Domain is in deletion phase (track it!)
        - taken: Domain was re-registered
        - unknown: Could not determine status

    Raises:
        HTTPException: 404 if no drop has the given id; 500 if the status
            check or the database update fails.
    """
    from datetime import timezone

    from app.services.drop_status_checker import check_drop_status

    # Get the drop from DB
    result = await db.execute(
        select(DroppedDomain).where(DroppedDomain.id == drop_id)
    )
    drop = result.scalar_one_or_none()
    if not drop:
        raise HTTPException(status_code=404, detail="Drop not found")

    full_domain = f"{drop.domain}.{drop.tld}"

    try:
        # Check with dedicated drop status checker
        status_result = await check_drop_status(full_domain)

        # datetime.utcnow() is deprecated; this yields the same naive UTC
        # value, so what is written to the column does not change.
        checked_at = datetime.now(timezone.utc).replace(tzinfo=None)

        # Update the drop in DB
        await db.execute(
            update(DroppedDomain)
            .where(DroppedDomain.id == drop_id)
            .values(
                availability_status=status_result.status,
                # Persisted as text; a falsy RDAP status is stored as NULL.
                rdap_status=str(status_result.rdap_status) if status_result.rdap_status else None,
                last_status_check=checked_at,
            )
        )
        await db.commit()

        return {
            "id": drop_id,
            "domain": full_domain,
            "status": status_result.status,
            "rdap_status": status_result.rdap_status,
            "can_register_now": status_result.can_register_now,
            "should_track": status_result.should_monitor,
            "message": status_result.message,
        }
    except Exception as e:
        # logger.exception keeps the traceback that logger.error dropped.
        logger.exception("Status check failed for %s", full_domain)
        # NOTE(review): detail=str(e) exposes internal error text to clients;
        # kept for backward compatibility with existing consumers.
        raise HTTPException(status_code=500, detail=str(e)) from e
@router.post("/track/{drop_id}")
async def api_track_drop(
    drop_id: int,
    db: AsyncSession = Depends(get_db),
    current_user = Depends(get_current_user),
):
    """
    Put a dropped domain on the current user's Watchlist.

    Creates a ``Domain`` row for ``<domain>.<tld>`` with
    ``notify_on_available`` switched on, unless the user already has a row
    with that name.

    Returns:
        ``{"status": "already_tracking", "domain": ...}`` when the domain is
        already on the user's Watchlist, otherwise
        ``{"status": "tracking", "domain": ..., "message": ...}``.

    Raises:
        HTTPException: 404 if no drop has the given id.
    """
    from app.models.domain import Domain, DomainStatus

    # Resolve the drop the caller refers to.
    drop_query = select(DroppedDomain).where(DroppedDomain.id == drop_id)
    drop = (await db.execute(drop_query)).scalar_one_or_none()
    if not drop:
        raise HTTPException(status_code=404, detail="Drop not found")

    full_domain = f"{drop.domain}.{drop.tld}"

    # Nothing to do if this user is already watching the same name.
    watch_query = select(Domain).where(
        Domain.user_id == current_user.id,
        Domain.name == full_domain,
    )
    already_watched = (await db.execute(watch_query)).scalar_one_or_none()
    if already_watched:
        return {"status": "already_tracking", "domain": full_domain}

    # Seed the Watchlist entry from the drop's last known availability.
    available_now = drop.availability_status == 'available'
    if available_now:
        initial_status = DomainStatus.AVAILABLE
    else:
        initial_status = DomainStatus.UNKNOWN

    watch_entry = Domain(
        user_id=current_user.id,
        name=full_domain,
        status=initial_status,
        is_available=available_now,
        notify_on_available=True,  # notifications are the point of tracking
    )
    db.add(watch_entry)
    await db.commit()

    return {
        "status": "tracking",
        "domain": full_domain,
        "message": f"Added {full_domain} to your Watchlist. You'll be notified when available!"
    }