# app.py — complete FastAPI spatial service
# requires: pip install fastapi uvicorn asyncpg redis pydantic
# requires: PostGIS database with a parcels_with_zoning table or view, Redis at localhost:6379
# run with: uvicorn app:app --reload
from __future__ import annotations

import json
import os
from contextlib import asynccontextmanager
from typing import Annotated, Any, Literal, Optional
from urllib.parse import urlencode

import asyncpg
import redis.asyncio as aioredis
from fastapi import Depends, FastAPI, HTTPException, Query, Request, Response
from fastapi.responses import JSONResponse
from pydantic import BaseModel, Field
from starlette.middleware.base import BaseHTTPMiddleware
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
# Connection strings can be overridden through the environment so that real
# credentials never need to be committed; the literals are local-dev defaults.
DATABASE_URL = os.environ.get(
    "DATABASE_URL", "postgresql://user:pass@localhost:5432/spatial_db"
)
REDIS_URL = os.environ.get("REDIS_URL", "redis://localhost:6379")
CACHE_TTL_S = 60  # lifetime of a cached /items response, in seconds
REQUESTS_PER_MIN = 120  # per-client request budget used by the rate limiter
API_TITLE = "Spatial Parcel Service"
API_VERSION = "1.0.0"
# ---------------------------------------------------------------------------
# Database and cache pools
# ---------------------------------------------------------------------------
# Process-wide handles. Both start as ``None`` and are assigned by ``lifespan``
# when the application starts.
db_pool: Optional[asyncpg.Pool] = None
redis_client: Optional[aioredis.Redis] = None
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Open the shared asyncpg pool and Redis client for the app's lifetime.

    Both handles are stored in the module-level ``db_pool`` and
    ``redis_client`` globals before control is yielded to the application,
    and both are closed again when the context exits.
    """
    global db_pool, redis_client
    db_pool = await asyncpg.create_pool(DATABASE_URL, min_size=5, max_size=20)
    redis_client = aioredis.from_url(REDIS_URL, decode_responses=True)
    try:
        yield
    finally:
        # Cleanup must also run when an exception or cancellation is thrown
        # in at the ``yield``; the nested ``finally`` makes sure Redis is
        # closed even if closing the database pool fails.
        try:
            await db_pool.close()
        finally:
            await redis_client.aclose()
# The ASGI application object; ``lifespan`` manages the shared DB/Redis handles.
app = FastAPI(
    title=API_TITLE,
    version=API_VERSION,
    lifespan=lifespan,
)
async def get_conn() -> asyncpg.Connection:
    """FastAPI dependency that lends one pooled connection to the caller.

    Implemented as an async generator: a connection is acquired from the
    module-level ``db_pool``, yielded, and released back to the pool once the
    generator is resumed or closed.
    """
    pool = db_pool
    connection = await pool.acquire()
    try:
        yield connection
    finally:
        await pool.release(connection)
async def get_redis() -> aioredis.Redis:
    """FastAPI dependency returning the module-level Redis client.

    The value is whatever ``redis_client`` currently holds, i.e. ``None``
    until ``lifespan`` has assigned it.
    """
    client = redis_client
    return client
# Annotated aliases: a parameter typed with one of these receives the value
# produced by the corresponding dependency function.
Connection = Annotated[
    asyncpg.Connection,
    Depends(get_conn),
]
RedisClient = Annotated[
    aioredis.Redis,
    Depends(get_redis),
]
# ---------------------------------------------------------------------------
# Rate limiting middleware
# ---------------------------------------------------------------------------
class RateLimitMiddleware(BaseHTTPMiddleware):
    """Fixed-window, per-client rate limiter backed by a Redis counter.

    Every request increments the key ``ratelimit:<client host>``. The first
    increment in a window arms an expiry of ``WINDOW_S`` seconds. Once the
    counter exceeds ``REQUESTS_PER_MIN`` the request is answered with HTTP
    429 instead of being passed on.
    """

    # Length of one counting window, in seconds.
    WINDOW_S = 60

    async def dispatch(self, request: Request, call_next):
        """Count the request and either reject it with 429 or forward it.

        Forwarded responses get ``X-RateLimit-Limit`` and
        ``X-RateLimit-Remaining`` headers.
        """
        # ``request.client`` can be None when the server provides no peer
        # address; bucket such requests together instead of crashing.
        client = request.client
        host = client.host if client is not None else "unknown"
        key = f"ratelimit:{host}"
        r = redis_client
        current = await r.incr(key)
        if current == 1:
            await r.expire(key, self.WINDOW_S)
        if current > REQUESTS_PER_MIN:
            ttl = await r.ttl(key)
            if ttl == -1:
                # The counter exists without an expiry (INCR succeeded but
                # EXPIRE never ran). Re-arm it so the client is not locked
                # out permanently.
                await r.expire(key, self.WINDOW_S)
                ttl = self.WINDOW_S
            # Report the time actually left in the window, at least 1 second.
            retry_after = max(ttl, 1)
            return JSONResponse(
                status_code=429,
                content={"detail": "Rate limit exceeded"},
                headers={"Retry-After": str(retry_after)},
            )
        resp = await call_next(request)
        resp.headers["X-RateLimit-Limit"] = str(REQUESTS_PER_MIN)
        resp.headers["X-RateLimit-Remaining"] = str(max(0, REQUESTS_PER_MIN - current))
        return resp


app.add_middleware(RateLimitMiddleware)
# ---------------------------------------------------------------------------
# Pydantic schemas (OGC API Features compatible)
# ---------------------------------------------------------------------------
class Geometry(BaseModel):
    """Geometry member of a feature: a ``type`` tag plus ``coordinates``."""

    # Any string is accepted; the model does not restrict the geometry kind.
    type: str = Field(...)
    # Typed as ``Any`` so the model imposes no structure on the coordinates.
    coordinates: Any = Field(...)
class Link(BaseModel):
    """Hyperlink entry: target ``href``, relation ``rel`` and media ``type``."""

    href: str = Field(...)
    rel: str = Field(...)
    # Media type of the link target; GeoJSON unless the caller overrides it.
    type: str = Field(default="application/geo+json")
class Feature(BaseModel):
    """Single feature: optional ``id``, a geometry and a property mapping."""

    # Fixed discriminator; only the literal "Feature" validates.
    type: Literal["Feature"] = Field(default="Feature")
    id: Optional[str] = Field(default=None)
    geometry: Geometry = Field(...)
    properties: dict[str, Any] = Field(...)
class FeatureCollection(BaseModel):
    """Response envelope: a list of features plus paging metadata.

    ``numberMatched`` and ``numberReturned`` are optional counters and
    ``links`` defaults to an empty list.
    """

    # Pydantic v2 configuration. Replaces the nested ``class Config``, which
    # v2 still accepts but reports as deprecated.
    model_config = {"populate_by_name": True}

    # Fixed discriminator; only the literal "FeatureCollection" validates.
    type: Literal["FeatureCollection"] = "FeatureCollection"
    features: list[Feature]
    numberMatched: Optional[int] = None
    numberReturned: Optional[int] = None
    links: list[Link] = Field(default_factory=list)
# ---------------------------------------------------------------------------
# OGC API landing page and conformance
# ---------------------------------------------------------------------------
@app.get("/", summary="Landing page")
async def landing_page(request: Request):
    """Return the service title, a description and links to the other endpoints.

    Link targets are absolute URLs built from the request's base URL with any
    trailing slash removed.
    """
    root = str(request.base_url).rstrip("/")
    entry_links = [
        dict(href=f"{root}/conformance", rel="conformance"),
        dict(href=f"{root}/collections", rel="data"),
        dict(href=f"{root}/openapi.json", rel="service-desc"),
    ]
    payload = {
        "title": API_TITLE,
        "description": "OGC API Features — parcel and zoning data",
        "links": entry_links,
    }
    return payload
@app.get("/conformance", summary="OGC conformance classes")
async def conformance():
    """Return the list of conformance-class URIs this service declares."""
    declared_classes = [
        "http://www.opengis.net/spec/ogcapi-features-1/1.0/conf/core",
        "http://www.opengis.net/spec/ogcapi-features-1/1.0/conf/geojson",
    ]
    return {"conformsTo": declared_classes}
@app.get("/collections", summary="Available feature collections")
async def collections(request: Request):
    """Describe the available collections (currently only ``parcels``).

    The single entry carries an ``items`` link pointing at the parcels items
    endpoint, built from the request's base URL.
    """
    root = str(request.base_url).rstrip("/")
    items_link = {
        "href": f"{root}/collections/parcels/items",
        "rel": "items",
        "type": "application/geo+json",
    }
    parcels = {
        "id": "parcels",
        "title": "Land parcels with zoning",
        "links": [items_link],
    }
    return {"collections": [parcels]}
# ---------------------------------------------------------------------------
# Core endpoint: /collections/parcels/items (bounding-box feature query)
# ---------------------------------------------------------------------------
def _parse_bbox(bbox: str) -> tuple[float, float, float, float]:
    """Parse ``minLon,minLat,maxLon,maxLat`` into four floats or raise HTTP 400."""
    try:
        min_lon, min_lat, max_lon, max_lat = map(float, bbox.split(","))
    except ValueError:
        # Raised both for non-numeric parts and for a wrong number of parts.
        raise HTTPException(
            status_code=400,
            detail="Invalid bbox: expected minLon,minLat,maxLon,maxLat",
        ) from None
    if not (-180 <= min_lon < max_lon <= 180 and -90 <= min_lat < max_lat <= 90):
        raise HTTPException(status_code=400, detail="bbox values out of range")
    return min_lon, min_lat, max_lon, max_lat


def _row_to_feature(row) -> Feature:
    """Convert one result row of the parcel query into a ``Feature``."""
    geometry = row["geometry"]
    if isinstance(geometry, str):
        # asyncpg returns json columns as text unless a codec is registered
        # on the connection; decode here so both setups work.
        geometry = json.loads(geometry)
    area = row["area_m2"]
    return Feature(
        id=str(row["parcel_id"]),
        geometry=Geometry(**geometry),
        properties={
            "parcel_id": row["parcel_id"],
            "zoning_code": row["zoning_code"],
            # A NULL area is passed through instead of crashing float().
            "area_m2": float(area) if area is not None else None,
        },
    )


@app.get(
    "/collections/parcels/items",
    response_model=FeatureCollection,
    summary="Return parcels within a bounding box (OGC API Features /items)",
)
async def get_features_within(
    request: Request,
    response: Response,
    conn: Connection,
    r: RedisClient,
    bbox: str = Query(
        ...,
        description="minLon,minLat,maxLon,maxLat in EPSG:4326",
        example="-122.45,37.75,-122.40,37.78",
    ),
    limit: int = Query(100, ge=1, le=1000),
    cursor: Optional[str] = Query(None, description="Pagination cursor (parcel_id)"),
):
    """Return one page of parcels whose geometry overlaps the bounding box.

    Results are ordered by ``parcel_id`` and paged with a keyset cursor: the
    ``cursor`` value is the last ``parcel_id`` of the previous page. Complete
    responses are cached in Redis for ``CACHE_TTL_S`` seconds.

    Args:
        request: Incoming request; its base URL is used to build the next link.
        response: Outgoing response; receives ``X-Cache`` and ``Cache-Control``.
        conn: Database connection supplied by the ``get_conn`` dependency.
        r: Redis client supplied by the ``get_redis`` dependency.
        bbox: ``minLon,minLat,maxLon,maxLat`` as a comma-separated string.
        limit: Maximum number of features to return (1 to 1000).
        cursor: Optional ``parcel_id`` after which the page starts.

    Returns:
        A ``FeatureCollection`` with the page, the total match count and,
        when the page is full, a ``next`` link.

    Raises:
        HTTPException: 400 for a malformed or out-of-range ``bbox`` or a
            non-integer ``cursor``.
    """
    min_lon, min_lat, max_lon, max_lat = _parse_bbox(bbox)

    # Validate the cursor before the cache lookup, so a bad cursor is rejected
    # without touching Redis and the cache key uses the normalized integer.
    cursor_id: Optional[int] = None
    if cursor:
        try:
            cursor_id = int(cursor)
        except ValueError:
            raise HTTPException(status_code=400, detail="Invalid cursor") from None

    # Cache key. Coordinates are rounded to 4 decimals, so bounding boxes that
    # differ only beyond that precision share one cache entry.
    cursor_part = f":cursor:{cursor_id}" if cursor_id is not None else ""
    cache_key = (
        f"parcels:{round(min_lon,4)}:{round(min_lat,4)}"
        f":{round(max_lon,4)}:{round(max_lat,4)}:limit:{limit}{cursor_part}"
    )

    # Cache lookup
    cached = await r.get(cache_key)
    if cached:
        response.headers["X-Cache"] = "HIT"
        response.headers["Cache-Control"] = f"public, max-age={CACHE_TTL_S}"
        return FeatureCollection.model_validate_json(cached)
    response.headers["X-Cache"] = "MISS"

    # Database query — cursor-based pagination
    if cursor_id is not None:
        rows = await conn.fetch(
            """
            SELECT parcel_id, zoning_code, area_m2,
                   ST_AsGeoJSON(geom)::json AS geometry
            FROM parcels_with_zoning
            WHERE geom && ST_MakeEnvelope($1, $2, $3, $4, 4326)
              AND parcel_id > $5
            ORDER BY parcel_id
            LIMIT $6
            """,
            min_lon, min_lat, max_lon, max_lat, cursor_id, limit,
        )
    else:
        # The limit is the fifth bound argument; $2 is the min latitude.
        rows = await conn.fetch(
            """
            SELECT parcel_id, zoning_code, area_m2,
                   ST_AsGeoJSON(geom)::json AS geometry
            FROM parcels_with_zoning
            WHERE geom && ST_MakeEnvelope($1, $2, $3, $4, 4326)
            ORDER BY parcel_id
            LIMIT $5
            """,
            min_lon, min_lat, max_lon, max_lat, limit,
        )

    # Total count (for numberMatched; run concurrently in production)
    count_row = await conn.fetchrow(
        """
        SELECT COUNT(*) AS n
        FROM parcels_with_zoning
        WHERE geom && ST_MakeEnvelope($1, $2, $3, $4, 4326)
        """,
        min_lon, min_lat, max_lon, max_lat,
    )

    features = [_row_to_feature(row) for row in rows]

    # Build next link if there may be more results. A full page is only a
    # hint: when it happens to be the last one, the next page is empty.
    links = []
    base = str(request.base_url).rstrip("/")
    if len(rows) == limit:
        next_cursor = str(rows[-1]["parcel_id"])
        next_params = urlencode({"bbox": bbox, "limit": limit, "cursor": next_cursor})
        links.append(Link(
            href=f"{base}/collections/parcels/items?{next_params}",
            rel="next",
        ))

    result = FeatureCollection(
        features=features,
        numberMatched=count_row["n"],
        numberReturned=len(features),
        links=links,
    )

    # Write to cache
    await r.set(cache_key, result.model_dump_json(), ex=CACHE_TTL_S)
    response.headers["Cache-Control"] = f"public, max-age={CACHE_TTL_S}"
    return result