import json
from pathlib import Path
from typing import Any
# ── Catalogue schema ──────────────────────────────────────────────────────────
CATALOGUE_SCHEMA = {
"$schema": "https://json-schema.org/draft/2020-12",
"type": "object",
"required": ["datasets"],
"properties": {
"datasets": {
"type": "array",
"items": {
"type": "object",
"required": ["id", "description", "location", "format",
"schema_version", "crs", "spatial_extent",
"temporal_extent", "columns", "owner"],
"properties": {
"id": {"type": "string"},
"description": {"type": "string"},
"location": {"type": "string"},
"format": {"type": "string",
"enum": ["GeoParquet", "GeoJSON",
"Shapefile", "PostGIS"]},
"schema_version": {"type": "integer", "minimum": 1},
"crs": {"type": "string"},
"spatial_extent": {
"type": "object",
"required": ["minx", "miny", "maxx", "maxy"],
"properties": {
"minx": {"type": "number"},
"miny": {"type": "number"},
"maxx": {"type": "number"},
"maxy": {"type": "number"}
}
},
"temporal_extent": {
"type": "object",
"required": ["start", "end"],
"properties": {
"start": {"type": "string", "format": "date"},
"end": {"type": "string", "format": "date"}
}
},
"columns": {
"type": "array",
"items": {
"type": "object",
"required": ["name", "type", "nullable"],
"properties": {
"name": {"type": "string"},
"type": {"type": "string"},
"nullable": {"type": "boolean"},
"description": {"type": "string"}
}
}
},
"owner": {"type": "string"},
"last_validated": {"type": "string"}
}
}
}
}
}
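# The schema above is not enforced automatically anywhere in this module. As a
# minimal sketch, it could be applied with the optional `jsonschema` package
# (the helper name `validate_catalogue` is ours, not part of any library):
def validate_catalogue(catalogue_path: str | Path) -> None:
    """Raise jsonschema.exceptions.ValidationError if the catalogue is malformed."""
    import jsonschema  # optional dependency: pip install jsonschema
    with open(catalogue_path, encoding="utf-8") as f:
        jsonschema.validate(instance=json.load(f), schema=CATALOGUE_SCHEMA)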
# ── Parquet type mapping ───────────────────────────────────────────────────────
# Maps catalogue type strings to sets of acceptable pyarrow/pandas dtype strings.
CATALOGUE_TO_ARROW_TYPES: dict[str, set[str]] = {
"int32": {"int32", "int32[pyarrow]"},
"int64": {"int64", "int64[pyarrow]"},
"float32": {"float32", "float[pyarrow]"},
"float64": {"float64", "double[pyarrow]"},
"string": {"object", "string", "large_string", "string[pyarrow]"},
"timestamp": {"datetime64[ns]", "datetime64[us]", "timestamp[us][pyarrow]",
"timestamp[ns][pyarrow]"},
"boolean": {"bool", "bool[pyarrow]"},
"geometry": {"object", "binary", "large_binary"}, # WKB stored as binary
}
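# For reference: the plain entries above match pyarrow's own type names, e.g.
# str(pa.float32()) == "float", str(pa.float64()) == "double", and
# str(pa.timestamp("us")) == "timestamp[us]"; the "[pyarrow]"-suffixed entries
# are the string representations of the corresponding pandas ArrowDtype.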
# ── Validation ─────────────────────────────────────────────────────────────────
class ValidationError:
"""A single validation finding."""
def __init__(self, level: str, field: str, message: str):
self.level = level # "error" | "warning"
self.field = field
self.message = message
def __repr__(self) -> str:
return f"[{self.level.upper()}] {self.field}: {self.message}"
def validate_geoparquet(
parquet_path: str | Path,
catalogue_path: str | Path,
dataset_id: str,
) -> dict[str, Any]:
"""
Validate a GeoParquet file against a catalogue entry.
Parameters
----------
parquet_path: Path to the .parquet file to validate.
catalogue_path: Path to the JSON catalogue file.
dataset_id: The catalogue dataset ID to validate against.
Returns
-------
A dict with keys:
"dataset_id" : str
"parquet_path": str
"valid" : bool (True only if no errors)
"findings" : list of ValidationError objects
"summary" : str
"""
    try:
        import pyarrow.parquet as pq
    except ImportError as exc:
        raise ImportError(
            "pyarrow is required for GeoParquet validation. "
            "Install it with: pip install pyarrow"
        ) from exc
findings: list[ValidationError] = []
# ── Load catalogue ──────────────────────────────────────────────────────
    with open(catalogue_path, encoding="utf-8") as f:
raw = json.load(f)
datasets_by_id = {d["id"]: d for d in raw.get("datasets", [])}
if dataset_id not in datasets_by_id:
findings.append(ValidationError(
"error", "catalogue",
f"Dataset '{dataset_id}' not found in catalogue at {catalogue_path}. "
f"Available IDs: {list(datasets_by_id.keys())}"
))
return _report(dataset_id, str(parquet_path), findings)
entry = datasets_by_id[dataset_id]
expected_columns = {col["name"]: col for col in entry["columns"]}
expected_extent = entry["spatial_extent"]
# ── Read Parquet schema ─────────────────────────────────────────────────
    try:
        file_metadata = pq.read_metadata(parquet_path)
        schema = pq.read_schema(parquet_path)
except Exception as exc:
findings.append(ValidationError(
"error", "parquet_file",
f"Could not read Parquet file: {exc}"
))
return _report(dataset_id, str(parquet_path), findings)
actual_columns = {field.name: str(field.type) for field in schema}
# ── Check 1: required columns present ──────────────────────────────────
for col_name, col_def in expected_columns.items():
if col_name not in actual_columns:
findings.append(ValidationError(
"error", f"column:{col_name}",
f"Required column '{col_name}' (type: {col_def['type']}) "
"is missing from the Parquet file."
))
# ── Check 2: column types match ─────────────────────────────────────────
for col_name, col_def in expected_columns.items():
if col_name not in actual_columns:
continue # already flagged above
catalogue_type = col_def["type"]
actual_type = actual_columns[col_name]
acceptable = CATALOGUE_TO_ARROW_TYPES.get(catalogue_type, set())
# Normalise: strip whitespace and lower-case for comparison
actual_norm = actual_type.strip().lower()
acceptable_norm = {t.lower() for t in acceptable}
if acceptable_norm and actual_norm not in acceptable_norm:
findings.append(ValidationError(
"error", f"column:{col_name}",
f"Type mismatch: catalogue records '{catalogue_type}', "
f"Parquet file has '{actual_type}'. "
f"Acceptable Arrow types: {sorted(acceptable)}"
))
# ── Check 3: spatial extent ─────────────────────────────────────────────
    # GeoParquet stores its bbox in the file-level key/value metadata under the
    # "geo" key; pyarrow exposes this mapping with *bytes* keys, so the lookup
    # must use b"geo", not "geo".
    file_meta = (file_metadata.metadata.get(b"geo")
                 if file_metadata.metadata else None)
if file_meta is None:
findings.append(ValidationError(
"warning", "spatial_extent",
"No 'geo' metadata key found in Parquet file. "
"Cannot validate spatial extent. "
"File may not be a valid GeoParquet file."
))
else:
try:
geo = json.loads(file_meta)
# GeoParquet spec: geo.columns.<primary>.bbox = [minx, miny, maxx, maxy]
primary_col = geo.get("primary_column", "geometry")
col_meta = geo.get("columns", {}).get(primary_col, {})
file_bbox = col_meta.get("bbox")
if file_bbox is None:
findings.append(ValidationError(
"warning", "spatial_extent",
f"No bbox found for primary geometry column '{primary_col}' "
"in GeoParquet metadata."
))
else:
                # GeoParquet allows a 4-element (2D) or 6-element (3D) bbox.
                if len(file_bbox) == 6:
                    fminx, fminy, fmaxx, fmaxy = (file_bbox[0], file_bbox[1],
                                                  file_bbox[3], file_bbox[4])
                else:
                    fminx, fminy, fmaxx, fmaxy = file_bbox
                cminx = expected_extent["minx"]
                cminy = expected_extent["miny"]
                cmaxx = expected_extent["maxx"]
                cmaxy = expected_extent["maxy"]
                tolerance = 0.001  # in CRS units (degrees for EPSG:4326)
if (fminx < cminx - tolerance or fminy < cminy - tolerance or
fmaxx > cmaxx + tolerance or fmaxy > cmaxy + tolerance):
findings.append(ValidationError(
"error", "spatial_extent",
f"File spatial extent [{fminx:.4f}, {fminy:.4f}, "
f"{fmaxx:.4f}, {fmaxy:.4f}] "
f"exceeds catalogued extent [{cminx}, {cminy}, {cmaxx}, {cmaxy}]. "
"Data may be outside the expected geographic region."
))
        except (ValueError, KeyError, TypeError) as exc:
            # ValueError also covers json.JSONDecodeError and a malformed bbox.
findings.append(ValidationError(
"warning", "spatial_extent",
f"Could not parse GeoParquet 'geo' metadata: {exc}"
))
return _report(dataset_id, str(parquet_path), findings)
def _report(dataset_id: str, parquet_path: str,
findings: list[ValidationError]) -> dict[str, Any]:
errors = [f for f in findings if f.level == "error"]
warnings = [f for f in findings if f.level == "warning"]
valid = len(errors) == 0
summary = (
f"{'VALID' if valid else 'INVALID'}: "
f"{len(errors)} error(s), {len(warnings)} warning(s)"
)
return {
"dataset_id": dataset_id,
"parquet_path": parquet_path,
"valid": valid,
"findings": findings,
"summary": summary,
}
# ── Usage example ──────────────────────────────────────────────────────────────
if __name__ == "__main__":
result = validate_geoparquet(
parquet_path = "data/land_cover_aus_2024.parquet",
catalogue_path = "catalogue.json",
dataset_id = "land_cover_aus_2024",
)
print(result["summary"])
for finding in result["findings"]:
print(f" {finding}")