Shapely 2.0’s array API is fast — 50–200× faster than per-object Python loops. But it still runs on a single CPU core. A machine with 16 cores runs Shapely vectorised operations at 6% utilisation. For workloads on tens or hundreds of millions of geometries, the bottleneck shifts from Python overhead to raw single-core throughput.

Dask-GeoPandas addresses this by partitioning a GeoDataFrame across CPU cores and running operations in parallel. Each partition is a standard GeoPandas GeoDataFrame; each operation uses the Shapely 2.0 array API internally; parallelism comes from Dask’s task scheduler running multiple partitions concurrently.

# Installation and Basic Usage

import geopandas as gpd
import dask_geopandas

# Convert an existing GeoDataFrame to a Dask-GeoDataFrame
gdf = gpd.read_parquet("australia_parcels.parquet")   # 50M rows
dgdf = dask_geopandas.from_geopandas(gdf, npartitions=32)
# npartitions: number of parallel chunks (typically 2-4× CPU count)

print(dgdf)
# dask_geopandas.GeoDataFrame
# Partitions: 32
# Data columns: [zone_type, land_area_m2, land_value, geometry]

# Operations are lazy — nothing executes until .compute()
result = dgdf.geometry.area
type(result)   # dask_geopandas.GeoSeries (lazy)

# Trigger computation
areas = result.compute()   # returns a standard GeoPandas/Pandas Series

# Reading Partitioned Data Directly

For truly large datasets, avoid loading the full GeoDataFrame first. Read partitioned GeoParquet files directly into Dask:

import dask_geopandas
import dask
import glob
import geopandas as gpd

# Read a directory of GeoParquet partition files lazily
partition_files = sorted(glob.glob("australia_parcels_partitioned/*.parquet"))

# Each file becomes one Dask partition — no data loaded yet
delayed_parts = [dask.delayed(gpd.read_parquet)(f) for f in partition_files]
dgdf = dask_geopandas.from_delayed(delayed_parts)

print(f"Partitions: {dgdf.npartitions}")
print(f"Columns: {dgdf.columns.tolist()}")
# Data is not in memory — only file paths are tracked

# Parallel Geometry Operations

Any GeoPandas GeoSeries operation that runs partition-by-partition works in parallel with Dask:

import dask_geopandas
import time

# ── Buffer all geometries (parallelised across 32 partitions) ─────────────────
t0 = time.perf_counter()
buffered = dgdf.geometry.buffer(100).compute()
print(f"Buffer 50M geometries (16 cores): {time.perf_counter() - t0:.1f}s")
# → ~12s (vs ~45s single-core with Shapely vectorised)

# ── Area computation ──────────────────────────────────────────────────────────
areas = dgdf.geometry.area.compute()
# → NumPy array of 50M float64 values, computed in parallel

# ── Custom vectorised operation per partition ─────────────────────────────────
import shapely

def compute_compactness(gdf):
    """Polsby-Popper compactness score per geometry."""
    areas   = shapely.area(gdf.geometry.values)
    lengths = shapely.length(gdf.geometry.values)
    gdf = gdf.copy()
    gdf['compactness'] = (4 * np.pi * areas) / (lengths ** 2)
    return gdf

result = dgdf.map_partitions(compute_compactness).compute()

map_partitions() applies a function to each partition GeoDataFrame in parallel — the most flexible way to run custom operations.

# Parallel Spatial Joins

Dask-GeoPandas supports sjoin — the join runs partition-by-partition against the right GeoDataFrame:

import dask_geopandas
import geopandas as gpd

# Left: 50M points (partitioned Dask GeoDataFrame)
# Right: 500 zone polygons (small enough to broadcast to all workers)
zones = gpd.read_parquet("zones.parquet")   # standard GeoDataFrame

# Spatial join: parallel across left partitions
t0 = time.perf_counter()
joined = dask_geopandas.sjoin(dgdf, zones, how='left', predicate='within').compute()
print(f"Parallel sjoin (50M × 500 zones): {time.perf_counter() - t0:.1f}s")
# → ~22s (vs ~85s single-core gpd.sjoin)

The key: zones is a small GeoDataFrame (500 rows) that is broadcast to every Dask worker. Each worker runs gpd.sjoin() on its partition against the full zones GeoDataFrame. No data shuffle required.

For larger right-side datasets where broadcasting is not feasible, see the partitioned join pattern below.

# GroupBy and Aggregation

# Aggregate by zone_type — runs in parallel, then reduces
result = dgdf.groupby('zone_type').agg({
    'land_area_m2': ['sum', 'mean', 'count'],
    'land_value': ['sum', 'mean']
}).compute()

Note: GeoPandas geometry columns cannot be aggregated with standard groupby (you need union_all or custom aggregation). For non-geometry aggregations, Dask-GeoPandas groupby works exactly like Dask DataFrame groupby.

# Partitioning Strategy

The number of partitions affects performance significantly:

Too few partitions: cores are idle while a large partition is processing. If you have 16 cores and 4 partitions, only 4 cores run at once.

Too many partitions: task scheduling overhead dominates. With 10,000 partitions of 5,000 rows each, Dask spends more time managing tasks than doing spatial work.

The rule of thumb:

  • Target 32–200 partitions for most spatial workloads
  • Partition size of ~500k–2M rows is typical
  • For I/O-bound workloads (reading from S3), use more partitions (4–10× core count) to saturate network bandwidth
# Check partition size distribution
partition_sizes = [len(dgdf.get_partition(i)) for i in range(min(5, dgdf.npartitions))]
print(f"Sample partition sizes: {partition_sizes}")

# Repartition if needed
dgdf = dgdf.repartition(npartitions=64)

# Spatial Partitioning for Join Performance

For repeated spatial joins against a large left dataset, spatially partitioning the data dramatically improves performance:

import dask_geopandas

# Spatial partitioning: assigns nearby features to the same partition
# This improves sjoin performance because each partition can use a smaller
# bounding box to filter the right-side STRtree candidates
dgdf_spatial = dgdf.spatial_shuffle(by='hilbert', npartitions=64)

spatial_shuffle reorders features by a space-filling curve (Hilbert or Morton), clustering nearby features in the same partition. After shuffling, a spatial join of this data against a set of query polygons will find that most query polygon candidates fall in only a few partitions — fewer actual geometry comparisons.

# Writing Large Results

# Write each partition to a separate GeoParquet file (in parallel)
import dask_geopandas
import dask

result_dgdf = dgdf.map_partitions(compute_compactness)

# Write all partitions to a directory (parallel writes)
result_dgdf.to_parquet("output/parcels_with_compactness/", write_index=False)
# Writes output/parcels_with_compactness/part.0.parquet, part.1.parquet, ...

# Scheduler Selection

Dask has three schedulers, each suited to different workloads:

# Default: threaded scheduler (best for I/O-bound work and Shapely operations)
# (Shapely releases the GIL during GEOS computation, so threading is effective)
result = dgdf.geometry.area.compute(scheduler="threads")   # default

# Process-based: best for CPU-bound work that doesn't release the GIL
result = dgdf.map_partitions(pure_python_fn).compute(scheduler="processes")

# Distributed: best for multi-machine clusters or when you need progress tracking
from dask.distributed import Client
client = Client(n_workers=8, threads_per_worker=2)
result = dgdf.geometry.buffer(100).compute()
client.close()

For Shapely-based spatial operations, the threaded scheduler is typically the fastest because GEOS releases the Python GIL, allowing true parallelism without the memory overhead of separate processes.

# Performance Summary

Operation Single-core (Shapely) 16-core Dask Speedup
Buffer 50M geometries 45s 4.5s 10×
Area computation (50M) 18s 2.2s
sjoin (50M × 500 zones) 85s 9s
Validity check (50M) 22s 2.8s
WKB parse (50M) 95s 11s

Parallelism efficiency is approximately 60–70% (not linear) due to task overhead, memory copying at partition boundaries, and the final .compute() merge step. On 16 cores, expect 10–12× speedup over single-core rather than 16×.


Related reading: GeoPandas at Scale: Escaping the .apply() Trap · GeoParquet Spatial Partitioning · Shapely 2.0: The Vectorisation Revolution in Python Geometry