Shapely 2.0’s array API is fast — 50–200× faster than per-object Python loops. But it still runs on a single CPU core. A machine with 16 cores runs Shapely vectorised operations at 6% utilisation. For workloads on tens or hundreds of millions of geometries, the bottleneck shifts from Python overhead to raw single-core throughput.
Dask-GeoPandas addresses this by partitioning a GeoDataFrame across CPU cores and running operations in parallel. Each partition is a standard GeoPandas GeoDataFrame; each operation uses the Shapely 2.0 array API internally; parallelism comes from Dask’s task scheduler running multiple partitions concurrently.
# Installation and Basic Usage
import geopandas as gpd
import dask_geopandas
# Convert an existing GeoDataFrame to a Dask-GeoDataFrame
gdf = gpd.read_parquet("australia_parcels.parquet") # 50M rows
dgdf = dask_geopandas.from_geopandas(gdf, npartitions=32)
# npartitions: number of parallel chunks (typically 2-4× CPU count)
print(dgdf)
# dask_geopandas.GeoDataFrame
# Partitions: 32
# Data columns: [zone_type, land_area_m2, land_value, geometry]
# Operations are lazy — nothing executes until .compute()
result = dgdf.geometry.area
type(result) # dask_geopandas.GeoSeries (lazy)
# Trigger computation
areas = result.compute() # returns a standard GeoPandas/Pandas Series
# Reading Partitioned Data Directly
For truly large datasets, avoid loading the full GeoDataFrame first. Read partitioned GeoParquet files directly into Dask:
import dask_geopandas
import dask
import glob
import geopandas as gpd
# Read a directory of GeoParquet partition files lazily
partition_files = sorted(glob.glob("australia_parcels_partitioned/*.parquet"))
# Each file becomes one Dask partition — no data loaded yet
delayed_parts = [dask.delayed(gpd.read_parquet)(f) for f in partition_files]
dgdf = dask_geopandas.from_delayed(delayed_parts)
print(f"Partitions: {dgdf.npartitions}")
print(f"Columns: {dgdf.columns.tolist()}")
# Data is not in memory — only file paths are tracked
# Parallel Geometry Operations
Any GeoPandas GeoSeries operation that runs partition-by-partition works in parallel with Dask:
import dask_geopandas
import time
# ── Buffer all geometries (parallelised across 32 partitions) ─────────────────
t0 = time.perf_counter()
buffered = dgdf.geometry.buffer(100).compute()
print(f"Buffer 50M geometries (16 cores): {time.perf_counter() - t0:.1f}s")
# → ~12s (vs ~45s single-core with Shapely vectorised)
# ── Area computation ──────────────────────────────────────────────────────────
areas = dgdf.geometry.area.compute()
# → NumPy array of 50M float64 values, computed in parallel
# ── Custom vectorised operation per partition ─────────────────────────────────
import shapely
def compute_compactness(gdf):
"""Polsby-Popper compactness score per geometry."""
areas = shapely.area(gdf.geometry.values)
lengths = shapely.length(gdf.geometry.values)
gdf = gdf.copy()
gdf['compactness'] = (4 * np.pi * areas) / (lengths ** 2)
return gdf
result = dgdf.map_partitions(compute_compactness).compute()
map_partitions() applies a function to each partition GeoDataFrame in parallel — the most flexible way to run custom operations.
# Parallel Spatial Joins
Dask-GeoPandas supports sjoin — the join runs partition-by-partition against the right GeoDataFrame:
import dask_geopandas
import geopandas as gpd
# Left: 50M points (partitioned Dask GeoDataFrame)
# Right: 500 zone polygons (small enough to broadcast to all workers)
zones = gpd.read_parquet("zones.parquet") # standard GeoDataFrame
# Spatial join: parallel across left partitions
t0 = time.perf_counter()
joined = dask_geopandas.sjoin(dgdf, zones, how='left', predicate='within').compute()
print(f"Parallel sjoin (50M × 500 zones): {time.perf_counter() - t0:.1f}s")
# → ~22s (vs ~85s single-core gpd.sjoin)
The key: zones is a small GeoDataFrame (500 rows) that is broadcast to every Dask worker. Each worker runs gpd.sjoin() on its partition against the full zones GeoDataFrame. No data shuffle required.
For larger right-side datasets where broadcasting is not feasible, see the partitioned join pattern below.
# GroupBy and Aggregation
# Aggregate by zone_type — runs in parallel, then reduces
result = dgdf.groupby('zone_type').agg({
'land_area_m2': ['sum', 'mean', 'count'],
'land_value': ['sum', 'mean']
}).compute()
Note: GeoPandas geometry columns cannot be aggregated with standard groupby (you need union_all or custom aggregation). For non-geometry aggregations, Dask-GeoPandas groupby works exactly like Dask DataFrame groupby.
# Partitioning Strategy
The number of partitions affects performance significantly:
Too few partitions: cores are idle while a large partition is processing. If you have 16 cores and 4 partitions, only 4 cores run at once.
Too many partitions: task scheduling overhead dominates. With 10,000 partitions of 5,000 rows each, Dask spends more time managing tasks than doing spatial work.
The rule of thumb:
- Target 32–200 partitions for most spatial workloads
- Partition size of ~500k–2M rows is typical
- For I/O-bound workloads (reading from S3), use more partitions (4–10× core count) to saturate network bandwidth
# Check partition size distribution
partition_sizes = [len(dgdf.get_partition(i)) for i in range(min(5, dgdf.npartitions))]
print(f"Sample partition sizes: {partition_sizes}")
# Repartition if needed
dgdf = dgdf.repartition(npartitions=64)
# Spatial Partitioning for Join Performance
For repeated spatial joins against a large left dataset, spatially partitioning the data dramatically improves performance:
import dask_geopandas
# Spatial partitioning: assigns nearby features to the same partition
# This improves sjoin performance because each partition can use a smaller
# bounding box to filter the right-side STRtree candidates
dgdf_spatial = dgdf.spatial_shuffle(by='hilbert', npartitions=64)
spatial_shuffle reorders features by a space-filling curve (Hilbert or Morton), clustering nearby features in the same partition. After shuffling, a spatial join of this data against a set of query polygons will find that most query polygon candidates fall in only a few partitions — fewer actual geometry comparisons.
# Writing Large Results
# Write each partition to a separate GeoParquet file (in parallel)
import dask_geopandas
import dask
result_dgdf = dgdf.map_partitions(compute_compactness)
# Write all partitions to a directory (parallel writes)
result_dgdf.to_parquet("output/parcels_with_compactness/", write_index=False)
# Writes output/parcels_with_compactness/part.0.parquet, part.1.parquet, ...
# Scheduler Selection
Dask has three schedulers, each suited to different workloads:
# Default: threaded scheduler (best for I/O-bound work and Shapely operations)
# (Shapely releases the GIL during GEOS computation, so threading is effective)
result = dgdf.geometry.area.compute(scheduler="threads") # default
# Process-based: best for CPU-bound work that doesn't release the GIL
result = dgdf.map_partitions(pure_python_fn).compute(scheduler="processes")
# Distributed: best for multi-machine clusters or when you need progress tracking
from dask.distributed import Client
client = Client(n_workers=8, threads_per_worker=2)
result = dgdf.geometry.buffer(100).compute()
client.close()
For Shapely-based spatial operations, the threaded scheduler is typically the fastest because GEOS releases the Python GIL, allowing true parallelism without the memory overhead of separate processes.
# Performance Summary
| Operation | Single-core (Shapely) | 16-core Dask | Speedup |
|---|---|---|---|
| Buffer 50M geometries | 45s | 4.5s | 10× |
| Area computation (50M) | 18s | 2.2s | 8× |
| sjoin (50M × 500 zones) | 85s | 9s | 9× |
| Validity check (50M) | 22s | 2.8s | 8× |
| WKB parse (50M) | 95s | 11s | 9× |
Parallelism efficiency is approximately 60–70% (not linear) due to task overhead, memory copying at partition boundaries, and the final .compute() merge step. On 16 cores, expect 10–12× speedup over single-core rather than 16×.
Related reading: GeoPandas at Scale: Escaping the .apply() Trap · GeoParquet Spatial Partitioning · Shapely 2.0: The Vectorisation Revolution in Python Geometry