feat: phase 2 observability, job queue, load testing, docker hardening

yves.gugger
2025-12-12 12:05:12 +01:00
parent ee4266d8f0
commit 5d23d34a8a
15 changed files with 446 additions and 24 deletions

View File

@ -11,11 +11,12 @@ Provides admin-only access to:
from datetime import datetime, timedelta
from pathlib import Path
from typing import Optional
from fastapi import APIRouter, HTTPException, status, BackgroundTasks, Depends
from fastapi import APIRouter, HTTPException, status, Depends
from pydantic import BaseModel, EmailStr
from sqlalchemy import select, func, desc
from app.api.deps import Database, get_current_user
from app.config import get_settings
from app.models.user import User
from app.models.subscription import Subscription, SubscriptionTier, SubscriptionStatus, TIER_CONFIG
from app.models.domain import Domain
@ -26,6 +27,7 @@ from app.models.auction import DomainAuction
from app.models.price_alert import PriceAlert
router = APIRouter()
settings = get_settings()
# ============== Admin Authentication ==============
async def require_admin(
@ -606,10 +608,18 @@ async def trigger_tld_scrape(
admin: User = Depends(require_admin),
):
"""Manually trigger a TLD price scrape."""
# Prefer job queue in production (non-blocking)
if settings.enable_job_queue and settings.redis_url:
from app.jobs.client import enqueue_job
job_id = await enqueue_job("scrape_tld_prices")
return {"message": "TLD price scrape enqueued", "job_id": job_id}
# Fallback: run inline
from app.services.tld_scraper.aggregator import tld_aggregator
result = await tld_aggregator.run_scrape(db)
return {
"message": "TLD price scrape completed",
"status": result.status,
@ -1099,7 +1109,6 @@ async def test_external_apis(
@router.post("/trigger-scrape")
async def trigger_auction_scrape(
background_tasks: BackgroundTasks,
db: Database,
admin: User = Depends(require_admin),
):
@ -1110,20 +1119,27 @@ async def trigger_auction_scrape(
1. Try Tier 1 APIs (DropCatch, Sedo) first
2. Fall back to web scraping for others
"""
# Prefer job queue in production (non-blocking)
if settings.enable_job_queue and settings.redis_url:
from app.jobs.client import enqueue_job
job_id = await enqueue_job("scrape_auctions")
return {
"message": "Auction scraping enqueued",
"job_id": job_id,
"note": "Check /admin/scrape-status for results",
}
# Fallback: run inline
from app.services.auction_scraper import AuctionScraperService
scraper = AuctionScraperService()
# Run scraping in background
async def run_scrape():
async with db.begin():
return await scraper.scrape_all_platforms(db)
background_tasks.add_task(run_scrape)
result = await scraper.scrape_all_platforms(db)
return {
"message": "Auction scraping started in background",
"note": "Check /admin/scrape-status for results"
"message": "Auction scraping completed",
"result": result,
"note": "Check /admin/scrape-status for results",
}
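For reference, a rough sketch of exercising the new enqueue path from outside. Only `POST /api/v1/auth/login` and the `/trigger-scrape` route come from this diff; the `/admin` mount prefix, the login body shape, and the base URL are assumptions for illustration:

```python
# Hypothetical smoke check of the enqueue path. Assumes the admin router is
# mounted under /api/v1/admin and that login accepts an email/password JSON
# body; the session cookie is persisted by the httpx.Client instance.
import httpx

client = httpx.Client(base_url="http://localhost:8000/api/v1")
client.post("/auth/login", json={"email": "admin@example.com", "password": "..."})

resp = client.post("/admin/trigger-scrape")
print(resp.json())
# With ENABLE_JOB_QUEUE=true and REDIS_URL set, this returns
#   {"message": "Auction scraping enqueued", "job_id": "...", "note": "Check /admin/scrape-status for results"}
# otherwise the scrape runs inline and the response carries the result payload.
```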

View File

@ -33,6 +33,23 @@ class Settings(BaseSettings):
check_minute: int = 0
scheduler_check_interval_hours: int = 24
enable_scheduler: bool = False # Run APScheduler jobs in this process (recommend: separate scheduler process)
# Job Queue / Redis (Phase 2)
redis_url: str = "" # e.g. redis://redis:6379/0
enable_job_queue: bool = False
# Observability (Phase 2)
enable_metrics: bool = True
metrics_path: str = "/metrics"
enable_db_query_metrics: bool = False
# Rate limiting storage (SlowAPI / limits). Use Redis in production.
rate_limit_storage_uri: str = "memory://"
# Database pooling (PostgreSQL)
db_pool_size: int = 5
db_max_overflow: int = 10
db_pool_timeout: int = 30
# =================================
# External API Credentials
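These are ordinary pydantic `BaseSettings` fields, so the new knobs can be driven purely through environment variables, which is how the docker-compose changes below wire them up. A minimal sketch, assuming the `Settings` class above is importable from `app.config` and that any required fields without defaults are supplied via the environment or `.env`:

```python
# Sketch: the Phase 2 settings map to upper-cased env vars via pydantic
# BaseSettings (default case-insensitive mapping, no env_prefix assumed).
import os

from app.config import Settings  # assumed import path for the class in this diff

os.environ["REDIS_URL"] = "redis://redis:6379/0"
os.environ["ENABLE_JOB_QUEUE"] = "true"
os.environ["RATE_LIMIT_STORAGE_URI"] = "redis://redis:6379/0"

settings = Settings()
assert settings.enable_job_queue is True
assert settings.rate_limit_storage_uri.startswith("redis://")
```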

View File

@ -7,11 +7,22 @@ from app.config import get_settings
settings = get_settings()
# Create async engine
engine = create_async_engine(
settings.database_url,
echo=settings.debug,
future=True,
)
engine_kwargs = {
"echo": settings.debug,
"future": True,
}
# Production hardening: enable connection pooling for Postgres
if settings.database_url.startswith("postgresql"):
engine_kwargs.update(
{
"pool_size": settings.db_pool_size,
"max_overflow": settings.db_max_overflow,
"pool_timeout": settings.db_pool_timeout,
"pool_pre_ping": True,
}
)
engine = create_async_engine(settings.database_url, **engine_kwargs)
# Create async session factory
AsyncSessionLocal = async_sessionmaker(
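With a Postgres DSN, the engine now uses an async-adapted `QueuePool` sized by the new settings; a tiny sketch of how to confirm that at runtime (standard SQLAlchemy pool attributes, nothing specific to this repo):

```python
# Sketch: inspect the pool the engine was created with (only meaningful when
# database_url starts with "postgresql"; SQLite keeps SQLAlchemy's defaults).
from app.database import engine

pool = engine.sync_engine.pool
print(pool.size())    # 5, from db_pool_size
print(pool.status())  # e.g. "Pool size: 5  Connections in use: 0 ..."
```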

View File

@ -0,0 +1,3 @@
"""Async job queue (ARQ / Redis)."""

View File

@ -0,0 +1,38 @@
"""ARQ client helper to enqueue jobs."""
from __future__ import annotations
from typing import Any
from arq.connections import RedisSettings, create_pool
from app.config import get_settings
_pool = None
async def _get_pool():
global _pool
if _pool is not None:
return _pool
settings = get_settings()
if not settings.redis_url:
raise RuntimeError("redis_url is not configured (set REDIS_URL)")
_pool = await create_pool(RedisSettings.from_dsn(settings.redis_url))
return _pool
async def enqueue_job(name: str, *args: Any, **kwargs: Any) -> str:
"""
Enqueue a job by name. Returns the job id.
"""
pool = await _get_pool()
job = await pool.enqueue_job(name, *args, **kwargs)
# arq returns None when the job cannot be enqueued (e.g. a job with the same id already exists)
if job is None:
raise RuntimeError(f"Failed to enqueue job: {name}")
return job.job_id
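`enqueue_job` only hands back the id; callers that want to poll for completion can rebuild a handle with arq's `Job` class. A sketch that reuses the module's pool helper (`_get_pool` is the private function defined above):

```python
# Sketch: look up a previously enqueued job by id and wait for its result.
from arq.jobs import Job

from app.jobs.client import _get_pool, enqueue_job


async def run_and_wait() -> None:
    job_id = await enqueue_job("scrape_tld_prices")
    pool = await _get_pool()
    job = Job(job_id, redis=pool)
    print(await job.status())             # e.g. JobStatus.queued
    print(await job.result(timeout=300))  # polls until the worker finishes or times out
```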

backend/app/jobs/tasks.py Normal file
View File

@ -0,0 +1,72 @@
"""Job functions executed by the ARQ worker."""
from __future__ import annotations
from datetime import datetime
from sqlalchemy import select
from app.database import AsyncSessionLocal, init_db
from app.models.auction import DomainAuction
from app.services.auction_scraper import auction_scraper
from app.services.pounce_score import calculate_pounce_score_v2
from app.services.tld_scraper.aggregator import tld_aggregator
async def scrape_auctions(ctx) -> dict: # arq passes ctx
"""Scrape auctions from all platforms and store results."""
await init_db()
async with AsyncSessionLocal() as db:
result = await auction_scraper.scrape_all_platforms(db)
await db.commit()
return {"status": "ok", "result": result, "timestamp": datetime.utcnow().isoformat()}
async def scrape_tld_prices(ctx) -> dict:
"""Scrape TLD prices from all sources and store results."""
await init_db()
async with AsyncSessionLocal() as db:
result = await tld_aggregator.run_scrape(db)
await db.commit()
return {
"status": "ok",
"tlds_scraped": result.tlds_scraped,
"prices_saved": result.prices_saved,
"sources_succeeded": result.sources_succeeded,
"sources_attempted": result.sources_attempted,
"timestamp": datetime.utcnow().isoformat(),
}
async def backfill_auction_scores(ctx, *, limit: int = 5000) -> dict:
"""
Backfill DomainAuction.pounce_score for legacy rows.
Safe to run multiple times; only fills NULL scores.
"""
await init_db()
updated = 0
async with AsyncSessionLocal() as db:
rows = (
await db.execute(
select(DomainAuction)
.where(DomainAuction.pounce_score == None) # noqa: E711
.limit(limit)
)
).scalars().all()
for auction in rows:
auction.pounce_score = calculate_pounce_score_v2(
auction.domain,
auction.tld,
num_bids=auction.num_bids or 0,
age_years=auction.age_years or 0,
is_pounce=False,
)
updated += 1
await db.commit()
return {"status": "ok", "updated": updated, "timestamp": datetime.utcnow().isoformat()}

View File

@ -0,0 +1,26 @@
"""ARQ worker configuration."""
from __future__ import annotations
from arq.connections import RedisSettings
from app.config import get_settings
from app.jobs import tasks
class WorkerSettings:
"""
Run with:
arq app.jobs.worker.WorkerSettings
"""
settings = get_settings()
redis_settings = RedisSettings.from_dsn(settings.redis_url or "redis://localhost:6379/0")
functions = [
tasks.scrape_auctions,
tasks.scrape_tld_prices,
tasks.backfill_auction_scores,
]
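If the periodic scrapes ever move off the separate APScheduler container, arq can also schedule them itself; a hedged sketch of what that could look like (not part of this commit, uses arq's `cron` helper):

```python
# Possible extension (not in this diff): run the scrapes on a cron schedule
# inside the ARQ worker instead of a separate scheduler process.
from arq import cron

from app.jobs import tasks
from app.jobs.worker import WorkerSettings


class WorkerSettingsWithCron(WorkerSettings):
    cron_jobs = [
        cron(tasks.scrape_auctions, hour=3, minute=0),     # once a day at 03:00
        cron(tasks.scrape_tld_prices, hour=4, minute=30),  # once a day at 04:30
    ]
```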

View File

@ -18,6 +18,7 @@ from app.api import api_router
from app.config import get_settings
from app.database import init_db
from app.scheduler import start_scheduler, stop_scheduler
from app.observability.metrics import instrument_app
# Configure logging
logging.basicConfig(
@ -32,7 +33,7 @@ settings = get_settings()
limiter = Limiter(
key_func=get_remote_address,
default_limits=["200/minute"], # Global default
storage_uri="memory://", # In-memory storage (use Redis in production)
storage_uri=settings.rate_limit_storage_uri, # Use Redis in production
)
@ -78,8 +79,8 @@ Domain availability monitoring and portfolio management service.
## Authentication
Most endpoints require authentication via Bearer token.
Get a token via POST /api/v1/auth/login
Most endpoints require authentication via an HttpOnly session cookie (recommended).
Login: POST /api/v1/auth/login
## Rate Limits
@ -98,6 +99,10 @@ For API issues, contact support@pounce.ch
redoc_url="/redoc",
)
# Observability (Prometheus metrics)
if settings.enable_metrics:
instrument_app(app, metrics_path=settings.metrics_path, enable_db_metrics=settings.enable_db_query_metrics)
# Add rate limiter to app state
app.state.limiter = limiter

View File

@ -0,0 +1,3 @@
"""Observability helpers (metrics, tracing)."""

View File

@ -0,0 +1,122 @@
"""Prometheus metrics for FastAPI + optional DB query metrics."""
from __future__ import annotations
import time
from typing import Optional
from fastapi import FastAPI, Request, Response
try:
from prometheus_client import Counter, Histogram, generate_latest, CONTENT_TYPE_LATEST
except Exception: # pragma: no cover
Counter = None # type: ignore
Histogram = None # type: ignore
generate_latest = None # type: ignore
CONTENT_TYPE_LATEST = "text/plain; version=0.0.4" # type: ignore
_instrumented = False
_db_instrumented = False
def _get_route_template(request: Request) -> str:
route = request.scope.get("route")
if route is not None and hasattr(route, "path"):
return str(route.path)
return request.url.path
def instrument_app(app: FastAPI, *, metrics_path: str = "/metrics", enable_db_metrics: bool = False) -> None:
"""
Add Prometheus request metrics and a `/metrics` endpoint.
- Low-cardinality path labels by using FastAPI route templates.
- Optional SQLAlchemy query timing metrics (off by default).
"""
global _instrumented
if _instrumented:
return
_instrumented = True
if Counter is None or Histogram is None:
# Dependency not installed; keep app working without metrics.
return
http_requests_total = Counter(
"http_requests_total",
"Total HTTP requests",
["method", "path", "status"],
)
http_request_duration_seconds = Histogram(
"http_request_duration_seconds",
"HTTP request duration (seconds)",
["method", "path"],
buckets=(0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10),
)
@app.middleware("http")
async def _metrics_middleware(request: Request, call_next):
start = time.perf_counter()
response: Optional[Response] = None
try:
response = await call_next(request)
return response
finally:
duration = time.perf_counter() - start
path = _get_route_template(request)
method = request.method
status = str(getattr(response, "status_code", 500))
http_requests_total.labels(method=method, path=path, status=status).inc()
http_request_duration_seconds.labels(method=method, path=path).observe(duration)
@app.get(metrics_path, include_in_schema=False)
async def _metrics_endpoint():
return Response(generate_latest(), media_type=CONTENT_TYPE_LATEST)
if enable_db_metrics:
_instrument_db_metrics()
def _instrument_db_metrics() -> None:
"""Attach SQLAlchemy event listeners to track query latencies."""
global _db_instrumented
if _db_instrumented:
return
_db_instrumented = True
if Counter is None or Histogram is None:
return
from sqlalchemy import event
from app.database import engine
db_queries_total = Counter(
"db_queries_total",
"Total DB queries executed",
["dialect"],
)
db_query_duration_seconds = Histogram(
"db_query_duration_seconds",
"DB query duration (seconds)",
["dialect"],
buckets=(0.001, 0.0025, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5),
)
dialect = engine.sync_engine.dialect.name
@event.listens_for(engine.sync_engine, "before_cursor_execute")
def _before_cursor_execute(conn, cursor, statement, parameters, context, executemany): # type: ignore[no-untyped-def]
conn.info.setdefault("_query_start_time", []).append(time.perf_counter())
@event.listens_for(engine.sync_engine, "after_cursor_execute")
def _after_cursor_execute(conn, cursor, statement, parameters, context, executemany): # type: ignore[no-untyped-def]
start_list = conn.info.get("_query_start_time") or []
if not start_list:
return
start = start_list.pop()
duration = time.perf_counter() - start
db_queries_total.labels(dialect=dialect).inc()
db_query_duration_seconds.labels(dialect=dialect).observe(duration)
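Once the stack is up, the instrumentation can be verified with a single request to the new endpoint (port and path are the defaults from the settings and compose file in this commit):

```python
# Sketch: scrape /metrics once and print the http_requests_total samples
# (assumes the backend is reachable on localhost:8000 with ENABLE_METRICS=true).
import httpx

resp = httpx.get("http://localhost:8000/metrics")
resp.raise_for_status()
for line in resp.text.splitlines():
    if line.startswith(("# HELP http_requests_total", "http_requests_total")):
        print(line)
# e.g. http_requests_total{method="GET",path="/health",status="200"} 3.0
```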

View File

@ -45,5 +45,12 @@ stripe>=7.0.0
# Rate Limiting
slowapi>=0.1.9
# Observability (Prometheus)
prometheus-client>=0.20.0
# Job Queue (Redis)
arq>=0.26.0
redis>=5.0.0
# Production Database (optional)
# asyncpg>=0.30.0 # Already included above

View File

@ -18,6 +18,20 @@ services:
timeout: 5s
retries: 5
# Redis (job queue + rate limiting storage)
redis:
image: redis:7-alpine
container_name: pounce-redis
restart: unless-stopped
command: ["redis-server", "--appendonly", "yes"]
volumes:
- redis_data:/data
healthcheck:
test: ["CMD", "redis-cli", "ping"]
interval: 10s
timeout: 5s
retries: 5
# FastAPI Backend
backend:
build:
@ -32,9 +46,15 @@ services:
SECRET_KEY: ${SECRET_KEY:-change-this-in-production}
CORS_ORIGINS: ${CORS_ORIGINS:-http://localhost:3000}
ENABLE_SCHEDULER: "false"
ENABLE_JOB_QUEUE: "true"
REDIS_URL: redis://redis:6379/0
RATE_LIMIT_STORAGE_URI: redis://redis:6379/0
ENABLE_METRICS: "true"
depends_on:
db:
condition: service_healthy
redis:
condition: service_healthy
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
interval: 30s
@ -57,6 +77,26 @@ services:
condition: service_healthy
command: ["python", "run_scheduler.py"]
# Worker (ARQ / Redis job queue)
worker:
build:
context: ./backend
dockerfile: Dockerfile
container_name: pounce-worker
restart: unless-stopped
environment:
DATABASE_URL: postgresql+asyncpg://pounce:${DB_PASSWORD:-changeme}@db:5432/pounce
SECRET_KEY: ${SECRET_KEY:-change-this-in-production}
ENABLE_SCHEDULER: "false"
ENABLE_JOB_QUEUE: "true"
REDIS_URL: redis://redis:6379/0
depends_on:
db:
condition: service_healthy
redis:
condition: service_healthy
command: ["arq", "app.jobs.worker.WorkerSettings"]
# Next.js Frontend
frontend:
build:
@ -73,4 +113,5 @@ services:
volumes:
postgres_data:
redis_data:

View File

@ -1,7 +1,7 @@
/** @type {import('next').NextConfig} */
const nextConfig = {
reactStrictMode: true,
// output: 'standalone', // Only needed for Docker deployment
output: 'standalone',
// Performance & SEO optimizations
poweredByHeader: false, // Remove X-Powered-By header for security

loadtest/README.md Normal file
View File

@ -0,0 +1,24 @@
## Load testing
This folder contains lightweight load-test scaffolding for catching API performance regressions.
### k6 (recommended)
1. Install k6 (macOS):
```bash
brew install k6
```
2. Run the smoke test against a running stack:
```bash
BASE_URL=http://localhost:8000 k6 run loadtest/k6/api-smoke.js
```
### Notes
- The scripts assume the FastAPI backend is reachable at `BASE_URL` and the API prefix is `/api/v1`.
- For authenticated endpoints, extend the script to log in first and replay the session cookie.

loadtest/k6/api-smoke.js Normal file
View File

@ -0,0 +1,37 @@
import http from 'k6/http'
import { check, sleep } from 'k6'
export const options = {
vus: 5,
duration: '30s',
thresholds: {
http_req_failed: ['rate<0.01'],
http_req_duration: ['p(95)<800'], // p95 < 800ms for these lightweight endpoints
},
}
const BASE_URL = __ENV.BASE_URL || 'http://localhost:8000'
export default function () {
// Health
const health = http.get(`${BASE_URL}/health`)
check(health, {
'health status 200': (r) => r.status === 200,
})
// Public: auctions feed (anonymous)
const feed = http.get(`${BASE_URL}/api/v1/auctions/feed?source=external&limit=20&offset=0&sort_by=score`)
check(feed, {
'feed status 200': (r) => r.status === 200,
})
// Public: trending TLDs
const trending = http.get(`${BASE_URL}/api/v1/tld-prices/trending`)
check(trending, {
'trending status 200': (r) => r.status === 200,
})
sleep(1)
}