import functools
from collections.abc import Callable
from dataclasses import dataclass
from typing import Any
import geopandas as gpd
import numpy as np
import pandas as pd
from geopandas import GeoDataFrame
from geopandas import GeoSeries
from pyproj import CRS
from shapely import Geometry
from shapely import box
from shapely import extract_unique_points
from shapely.geometry import Polygon
from ..parallel.parallel import Parallel
from .conversion import to_bbox
from .conversion import to_gdf
from .general import clean_clip
from .general import get_common_crs
[docs]
@dataclass
class Gridlooper:
"""Run functions in a loop cellwise based on a grid.
Args:
gridsize: Size of the grid cells in units of the crs (meters, degrees).
mask: Geometry object to create a grid around.
gridbuffer: Units to buffer each gridcell by. For edge cases.
Defaults to 0.
clip: If True (default) geometries are clipped by the grid cells.
If False, all geometries that intersect will be selected in each iteration.
verbose: Whether to print progress. Defaults to False.
keep_geom_type: Whether to keep only the input geometry types after clipping.
Defaults to True.
Examples:
---------
Get some points and some polygons.
>>> import sgis as sg
>>> points = sg.read_parquet_url("https://media.githubusercontent.com/media/statisticsnorway/ssb-sgis/main/tests/testdata/points_oslo.parquet")
>>> points["idx"] = points.index
>>> buffered = sg.buff(points, 100)
>>> buffered
idx geometry
0 0 POLYGON ((263222.700 6651184.900, 263222.651 6...
1 1 POLYGON ((272556.100 6653369.500, 272556.051 6...
2 2 POLYGON ((270182.300 6653032.700, 270182.251 6...
3 3 POLYGON ((259904.800 6650339.700, 259904.751 6...
4 4 POLYGON ((272976.200 6652889.100, 272976.151 6...
.. ... ...
995 995 POLYGON ((266901.700 6647844.500, 266901.651 6...
996 996 POLYGON ((261374.000 6653593.400, 261373.951 6...
997 997 POLYGON ((263642.900 6645427.000, 263642.851 6...
998 998 POLYGON ((269326.700 6650628.000, 269326.651 6...
999 999 POLYGON ((264670.300 6644239.500, 264670.251 6...
[1000 rows x 2 columns]
Instantiate a gridlooper.
>>> looper = sg.Gridlooper(gridsize=200, mask=buffered, concat=True, parallelizer=sg.Parallel(1, backend="multiprocessing"))
Run the function clean_overlay in a gridloop.
>>> results = looper.run(
... sg.clean_overlay,
... points,
... buffered,
... )
>>> results
idx_1 idx_2 geometry
0 220 220 POINT (254575.200 6661631.500)
1 735 735 POINT (256337.400 6649931.700)
2 575 575 POINT (256369.200 6650413.300)
3 39 39 POINT (256142.300 6650526.300)
4 235 235 POINT (256231.300 6650720.200)
... ... ... ...
1481 711 795 POINT (272845.500 6655048.800)
1482 711 711 POINT (272845.500 6655048.800)
1483 757 757 POINT (273507.600 6652806.600)
1484 457 457 POINT (273524.400 6652979.900)
1485 284 284 POINT (273650.800 6653000.500)
[1486 rows x 3 columns]
"""
gridsize: int
mask: GeoDataFrame | GeoSeries | Geometry
gridbuffer: int = 0
parallelizer: Parallel | None = None
concat: bool = False
clip: bool = True
keep_geom_type: bool = True
verbose: bool = False
def __post_init__(self) -> None:
"""Fix types."""
if not isinstance(self.mask, GeoDataFrame):
self.mask = to_gdf(self.mask)
[docs]
def run(self, func: Callable, *args, **kwargs) -> Any | list[Any]:
"""Run a function for each grid cell in a loop.
Returns a list of the return values from the function,
or a (Geo)DataFrame if self.concat is True.
"""
def _intersects_mask(df: GeoDataFrame) -> pd.Series:
return df.index.isin(df.sjoin(self.mask).index)
grid: GeoSeries = (
make_grid(self.mask, gridsize=self.gridsize).loc[_intersects_mask].geometry
)
n = len(grid)
buffered_grid = grid.buffer(self.gridbuffer, resolution=1, join_style=2)
if self.parallelizer is not None:
func_with_clip = functools.partial(
_clip_and_run_func,
func=func,
args=args,
kwargs=kwargs,
keep_geom_type=self.keep_geom_type,
clip=self.clip,
)
results = self.parallelizer.map(func_with_clip, buffered_grid)
if not self.gridbuffer or not self.clip:
return self._return(results, args, kwargs)
out = []
for cell_res, unbuffered in zip(results, grid, strict=True):
out.append(
_clip_back_to_unbuffered_grid(
cell_res, unbuffered, self.keep_geom_type
)
)
return self._return(out, args, kwargs)
results = []
for i, (unbuffered, buffered) in enumerate(
zip(grid, buffered_grid, strict=False)
):
cell_kwargs = {
key: _clip_if_isinstance(
value, buffered, self.keep_geom_type, self.clip
)
for key, value in kwargs.items()
}
cell_args = tuple(
_clip_if_isinstance(value, buffered, self.keep_geom_type, self.clip)
for value in args
)
cell_res = func(*cell_args, **cell_kwargs)
# clip back to original
if self.gridbuffer and self.clip:
cell_res = _clip_back_to_unbuffered_grid(
cell_res, unbuffered, self.keep_geom_type
)
results.append(cell_res)
if self.verbose:
print(f"Done with {i+1} of {n} grid cells", end="\r")
return self._return(results, args, kwargs)
def _return(
self, results: list[Any], args: tuple[Any], kwargs: dict[str, Any]
) -> list[Any] | GeoDataFrame:
if self.concat and len(results):
return pd.concat(results, ignore_index=True)
elif self.concat:
crs = get_common_crs(list(args) + list(kwargs.values()))
return GeoDataFrame({"geometry": []}, crs=crs)
else:
return results
[docs]
def gridloop(
func: Callable,
gridsize: int,
mask: GeoDataFrame | GeoSeries | Geometry,
gridbuffer: int = 0,
clip: bool = True,
keep_geom_type: bool = True,
verbose: bool = False,
args: tuple | None = None,
kwargs: dict | None = None,
parallelizer: Parallel | None = None,
) -> list[Any]:
"""Runs a function in a loop cellwise based on a grid.
Creates grid from a mask, and runs the function for each cell
with all GeoDataFrame keyword arguments clipped to the cell
extent.
Args:
func: Function to run cellwise.
mask: Geometry object to create a grid around.
gridsize: Size of the grid cells in units of the crs (meters, degrees).
gridbuffer: Units to buffer each gridcell by. For edge cases.
Defaults to 0.
clip: If True (default) geometries are clipped by the grid cells.
If False, all geometries that intersect will be selected in each iteration.
verbose: Whether to print progress. Defaults to False.
keep_geom_type: Whether to keep only the input geometry types after clipping.
Defaults to True.
args: Positional arguments to pass to the function. Arguments
of type GeoDataFrame or GeoSeries will be clipped by the grid cells in
a loop.
kwargs: Keyword arguments to pass to the function. Arguments
of type GeoDataFrame or GeoSeries will be clipped by the grid cells in
a loop.
parallelizer: Optional instance of sgis.Parallel, to run the function in parallel.
Returns:
List of results with the same length as number of grid cells.
Raises:
TypeError: If args or kwargs has a wrong type
Examples:
---------
Get some points and some polygons.
>>> import sgis as sg
>>> points = sg.read_parquet_url("https://media.githubusercontent.com/media/statisticsnorway/ssb-sgis/main/tests/testdata/points_oslo.parquet")
>>> points["idx"] = points.index
>>> buffered = sg.buff(points, 100)
>>> buffered
idx geometry
0 0 POLYGON ((263222.700 6651184.900, 263222.651 6...
1 1 POLYGON ((272556.100 6653369.500, 272556.051 6...
2 2 POLYGON ((270182.300 6653032.700, 270182.251 6...
3 3 POLYGON ((259904.800 6650339.700, 259904.751 6...
4 4 POLYGON ((272976.200 6652889.100, 272976.151 6...
.. ... ...
995 995 POLYGON ((266901.700 6647844.500, 266901.651 6...
996 996 POLYGON ((261374.000 6653593.400, 261373.951 6...
997 997 POLYGON ((263642.900 6645427.000, 263642.851 6...
998 998 POLYGON ((269326.700 6650628.000, 269326.651 6...
999 999 POLYGON ((264670.300 6644239.500, 264670.251 6...
[1000 rows x 2 columns]
Run the function clean_overlay where the data is clipped to a grid
of 1000x1000 meters. Args are the first two arguments of clean_overlay,
kwargs are additional keyword arguments.
>>> resultslist = sg.gridloop(
... func=sg.clean_overlay,
... mask=buffered,
... gridsize=1000,
... args=(points, buffered),
... kwargs={"how": "intersection"}
... )
>>> type(resultslist)
list
>>> results = pd.concat(resultslist, ignore_index=True)
>>> results
idx_1 idx_2 geometry
0 220 220 POINT (254575.200 6661631.500)
1 735 735 POINT (256337.400 6649931.700)
2 575 575 POINT (256369.200 6650413.300)
3 39 39 POINT (256142.300 6650526.300)
4 235 235 POINT (256231.300 6650720.200)
... ... ... ...
1481 711 795 POINT (272845.500 6655048.800)
1482 711 711 POINT (272845.500 6655048.800)
1483 757 757 POINT (273507.600 6652806.600)
1484 457 457 POINT (273524.400 6652979.900)
1485 284 284 POINT (273650.800 6653000.500)
[1486 rows x 3 columns]
"""
if kwargs is None:
kwargs = {}
elif not isinstance(kwargs, dict):
raise TypeError("kwargs should be a dict")
if args is None:
args = ()
elif not isinstance(args, tuple):
raise TypeError("args should be a tuple")
if not isinstance(mask, GeoDataFrame):
mask = to_gdf(mask)
def _intersects_mask(df: GeoDataFrame) -> pd.Series:
return df.index.isin(df.sjoin(mask).index)
grid: GeoSeries = make_grid(mask, gridsize=gridsize).loc[_intersects_mask].geometry
n = len(grid)
buffered_grid = grid.buffer(gridbuffer, resolution=1, join_style=2)
if parallelizer is not None:
func_with_clip = functools.partial(
_clip_and_run_func,
func=func,
args=args,
kwargs=kwargs,
keep_geom_type=keep_geom_type,
clip=clip,
)
results = parallelizer.map(func_with_clip, buffered_grid)
if not gridbuffer or not clip:
return results
out = []
for cell_res, unbuffered in zip(results, grid, strict=True):
out.append(
_clip_back_to_unbuffered_grid(cell_res, unbuffered, keep_geom_type)
)
return out
results = []
for i, (unbuffered, buffered) in enumerate(zip(grid, buffered_grid, strict=False)):
cell_kwargs = {
key: _clip_if_isinstance(value, buffered, keep_geom_type, clip)
for key, value in kwargs.items()
}
cell_args = tuple(
_clip_if_isinstance(value, buffered, keep_geom_type, clip) for value in args
)
cell_res = func(*cell_args, **cell_kwargs)
# clip back to original
if gridbuffer and clip:
cell_res = _clip_back_to_unbuffered_grid(
cell_res, unbuffered, keep_geom_type
)
results.append(cell_res)
if verbose:
print(f"Done with {i+1} of {n} grid cells", end="\r")
return results
def _clip_and_run_func(
grid_cell: Polygon,
func: Callable,
args: tuple,
kwargs: dict,
keep_geom_type: bool,
clip: bool,
) -> Any:
cell_args = tuple(
_clip_if_isinstance(value, grid_cell, keep_geom_type, clip) for value in args
)
cell_kwargs = {
key: _clip_if_isinstance(value, grid_cell, keep_geom_type, clip)
for key, value in kwargs.items()
}
return func(*cell_args, **cell_kwargs)
def _clip_if_isinstance(
value: Any, cell: Geometry, keep_geom_type: bool, clip: bool
) -> Any:
if not isinstance(value, (gpd.GeoDataFrame | gpd.GeoSeries | Geometry)):
return value
if isinstance(value, (gpd.GeoDataFrame | gpd.GeoSeries)):
if clip:
return clean_clip(value, cell, keep_geom_type=keep_geom_type)
return value.loc[value.intersects(cell)]
return value.intersection(cell).make_valid()
def _clip_back_to_unbuffered_grid(
results: Any, mask: GeoDataFrame, keep_geom_type: bool
) -> Any:
if isinstance(results, (gpd.GeoDataFrame | gpd.GeoSeries | Geometry)):
return _clip_if_isinstance(results, mask, keep_geom_type, clip=True)
elif isinstance(results, (pd.DataFrame | pd.Series | np.ndarray)):
return results
try:
for key, value in results.items():
results[key] = _clip_if_isinstance(value, mask, keep_geom_type, clip=True)
except AttributeError:
try:
return [
_clip_if_isinstance(res, mask, keep_geom_type, clip=True)
for res in results
]
except TypeError:
pass
return results
[docs]
def make_grid_from_bbox(
minx: int | float,
miny: int | float,
maxx: int | float,
maxy: int | float,
*_,
gridsize: int | float,
crs: CRS | int | str,
) -> GeoDataFrame:
"""Creates a polygon grid from a bounding box.
Creates a GeoDataFrame of grid cells of a given size within the given
maxumum and mimimum x and y values.
Args:
minx: Minumum x coordinate.
miny: Minumum y coordinate.
maxx: Maximum x coordinate.
maxy: Maximum y coordinate.
gridsize: Length of the grid walls.
crs: Coordinate reference system.
Returns:
GeoDataFrame with grid geometries.
"""
xs0, ys0, xs1, ys1 = [], [], [], []
for x0 in np.arange(minx, maxx + gridsize, gridsize):
for y0 in np.arange(miny, maxy + gridsize, gridsize):
x1 = x0 - gridsize
y1 = y0 + gridsize
xs0.append(x0)
ys0.append(y0)
xs1.append(x1)
ys1.append(y1)
grid_cells = box(xs0, ys0, xs1, ys1)
return gpd.GeoDataFrame(grid_cells, columns=["geometry"], crs=crs)
[docs]
def make_grid(
obj: GeoDataFrame | GeoSeries | Geometry | tuple,
gridsize: int | float,
*,
crs: CRS | None = None,
clip_to_bounds: bool = False,
) -> GeoDataFrame:
"""Create a polygon grid around geometries.
Creates a GeoDataFrame of grid cells of a given size around the bounds of
a given GeoDataFrame. The corners are rounded to the nearest integer.
Args:
obj: GeoDataFrame, GeoSeries, shapely geometry or bounding box
(an iterable with four values (minx, miny, maxx, maxy)).
gridsize: Length of the grid cell walls.
crs: Coordinate reference system if 'obj' is not GeoDataFrame or GeoSeries.
clip_to_bounds: Whether to clip the grid to the total bounds of the geometries.
Defaults to False.
Returns:
GeoDataFrame with grid polygons.
Raises:
ValueError: crs can only be None if obj is GeoDataFrame/GeoSeries.
"""
if isinstance(obj, (GeoDataFrame | GeoSeries)):
crs = obj.crs or crs
if hasattr(obj, "__len__") and not len(obj):
return GeoDataFrame({"geometry": []}, crs=crs)
minx, miny, maxx, maxy = to_bbox(obj)
minx = int(minx) if minx > 0 else int(minx - 1)
miny = int(miny) if miny > 0 else int(miny - 1)
grid = make_grid_from_bbox(minx, miny, maxx, maxy, gridsize=gridsize, crs=crs)
if clip_to_bounds:
grid = grid.clip(to_bbox(obj))
return grid
[docs]
def make_ssb_grid(
gdf: GeoDataFrame | GeoSeries, gridsize: int = 1000, add: int | float = 1
) -> GeoDataFrame:
"""Creates a polygon grid around a GeoDataFrame with an SSB id column.
Creates a grid that follows the grids produced by Statistics Norway.
The GeoDataFrame must have 25833 as crs (UTM 33 N).
Courtesy https://gis.stackexchange.com/questions/269243/creating-polygon-grid-using-geopandas
Args:
gdf: A GeoDataFrame.
gridsize: Size of the grid in meters.
add: Number of grid cells to add on each side,
to make sure all data is covered by the grid.
Returns:
GeoDataFrame with grid geometries and a column 'SSBID'.
Raises:
ValueError: If the GeoDataFrame does not have 25833 as crs.
TypeError: if gdf has wrong type.
"""
if not isinstance(gdf, (GeoDataFrame | GeoSeries)):
raise TypeError("gdf must be GeoDataFrame og GeoSeries.")
if not gdf.crs.equals(25833):
raise ValueError(
"Geodataframe must have crs = 25833. Use df.set_crs(25833) to set "
"projection or df.to_crs(25833) for transforming."
)
minx, miny, maxx, maxy = gdf.total_bounds
minx = minx - add * gridsize
miny = miny - add * gridsize
maxx = maxx + add * gridsize
maxy = maxy + add * gridsize
# Adjust for SSB-grid
if minx > 0:
minx = int(minx / int(gridsize)) * int(gridsize)
else:
minx = int((minx - gridsize) / int(gridsize)) * int(gridsize)
if minx > 0:
miny = int(miny / int(gridsize)) * int(gridsize)
else:
miny = int((miny - gridsize) / int(gridsize)) * int(gridsize)
cols = list(np.arange(minx, maxx + gridsize, gridsize))
rows = list(np.arange(miny, maxy + gridsize, gridsize))
polygons = []
for x in cols[:-1]:
for y in rows[:-1]:
polygons.append(
Polygon(
[
(x, y),
(x + gridsize, y),
(x + gridsize, y + gridsize),
(x, y + gridsize),
]
)
)
grid = gpd.GeoDataFrame({"geometry": polygons}, crs=25833)
# Make SSB id
grid["ostc"] = (
(np.floor((grid.geometry.centroid.x + 2000000) / gridsize) * gridsize).apply(
int
)
).apply(str)
grid["nordc"] = (
(np.floor((grid.geometry.centroid.y) / gridsize) * gridsize).apply(int)
).apply(str)
grid["SSBID"] = grid["ostc"] + grid["nordc"]
return grid[["SSBID", "geometry"]]
[docs]
def add_grid_id(
gdf: GeoDataFrame, gridsize: int, out_column: str = "SSBID"
) -> GeoDataFrame:
"""Adds an SSB grid ID column to a GeoDataFrame of points.
The GeoDataFrame must have 25833 as crs (UTM 33 N).
Args:
gdf: A GeoDataFrame.
gridsize: Size of the grid in meters.
out_column: Name of column for the grid id.
Returns:
The input GeoDataFrame with a new grid id column.
Raises:
ValueError: If the GeoDataFrame does not have 25833 as crs.
"""
if gdf.crs != 25833:
raise ValueError(
"Geodataframe must have crs = 25833. Use df.set_crs(25833) to set "
"projection or df.to_crs(25833) for transforming."
)
midlrdf = gdf.copy()
midlrdf["ostc"] = (
(np.floor((midlrdf.geometry.x + 2000000) / gridsize) * gridsize).apply(int)
).apply(str)
midlrdf["nordc"] = (
(np.floor((midlrdf.geometry.y) / gridsize) * gridsize).apply(int)
).apply(str)
midlrdf[out_column] = midlrdf["ostc"] + midlrdf["nordc"]
midlrdf = midlrdf.drop(columns=["nordc", "ostc"])
return midlrdf
[docs]
def bounds_to_polygon(
gdf: GeoDataFrame | GeoSeries, copy: bool = True
) -> GeoDataFrame | GeoSeries:
"""Creates a box around the geometry in each row of a GeoDataFrame.
Args:
gdf: The GeoDataFrame.
copy: Defaults to True.
Returns:
GeoDataFrame of box polygons with length and index of 'gdf'.
Examples:
---------
>>> import sgis as sg
>>> gdf = sg.to_gdf([MultiPoint([(0, 0), (1, 1)]), Point(0, 0)])
>>> gdf
geometry
0 MULTIPOINT (0.00000 0.00000, 1.00000 1.00000)
1 POINT (0.00000 0.00000)
>>> sg.bounds_to_polygon(gdf)
geometry
0 POLYGON ((1.00000 0.00000, 1.00000 1.00000, 0....
1 POLYGON ((0.00000 0.00000, 0.00000 0.00000, 0....
"""
if isinstance(gdf, GeoSeries):
return GeoSeries([box(*arr) for arr in gdf.bounds.values], index=gdf.index)
if copy:
gdf = gdf.copy()
gdf.geometry = [box(*arr) for arr in gdf.bounds.values]
return gdf
[docs]
def bounds_to_points(
gdf: GeoDataFrame | GeoSeries, copy: bool = True
) -> GeoDataFrame | GeoSeries:
"""Creates a 4-noded multipoint around the geometry in each row of a GeoDataFrame.
Args:
gdf: The GeoDataFrame.
copy: Defaults to True.
Returns:
GeoDataFrame of multipoints with same length and index as 'gdf'.
Examples:
---------
>>> import sgis as sg
>>> from shapely.geometry import MultiPoint, Point
>>> gdf = sg.to_gdf([MultiPoint([(0, 0), (1, 1)]), Point(0, 0)])
>>> gdf
geometry
0 MULTIPOINT (0.00000 0.00000, 1.00000 1.00000)
1 POINT (0.00000 0.00000)
>>> sg.bounds_to_points(gdf)
geometry
0 MULTIPOINT (1.00000 0.00000, 1.00000 1.00000, ...
1 MULTIPOINT (0.00000 0.00000)
"""
as_bounds = bounds_to_polygon(gdf, copy=copy)
if isinstance(gdf, GeoSeries):
return GeoSeries(extract_unique_points(as_bounds), index=gdf.index)
gdf.geometry = extract_unique_points(as_bounds.geometry)
return gdf
[docs]
def get_total_bounds(
*geometries: GeoDataFrame | GeoSeries | Geometry, strict: bool = False
) -> tuple[float, float, float, float]:
"""Get a combined total bounds of multiple geometry objects."""
xs, ys = [], []
for obj in geometries:
try:
minx, miny, maxx, maxy = to_bbox(obj)
xs += [minx, maxx]
ys += [miny, maxy]
except Exception as e:
try:
for x in obj:
minx, miny, maxx, maxy = to_bbox(x)
xs += [minx, maxx]
ys += [miny, maxy]
except Exception as e2:
if strict:
raise e2 from e
else:
continue
return min(xs), min(ys), max(xs), max(ys)