Source code for sgis.geopandas_tools.polygons_as_rings

from collections.abc import Callable
from typing import Any

import numpy as np
import pandas as pd
from geopandas import GeoDataFrame
from geopandas import GeoSeries
from geopandas.array import GeometryArray
from numpy.typing import NDArray
from pyproj import CRS
from shapely import difference
from shapely import get_coordinates
from shapely import get_exterior_ring
from shapely import get_interior_ring
from shapely import get_num_interior_rings
from shapely import linearrings
from shapely import make_valid
from shapely import polygons
from shapely import unary_union
from shapely.geometry import LinearRing
from shapely.geometry import Polygon

from .conversion import to_gdf
from .conversion import to_geoseries


[docs] class PolygonsAsRings: """Convert polygons to linearrings, apply linestring functions, then convert back to polygons.""" def __init__( self, polys: GeoDataFrame | GeoSeries | GeometryArray, crs: CRS | Any | None = None, allow_multipart: bool = False, gridsize: int | None = None, ) -> None: """Initialize the PolygonsAsRings object with polygons and optional CRS information. Args: polys: GeoDataFrame, GeoSeries, or GeometryArray containing polygon geometries. crs: Coordinate Reference System to be used, defaults to None. allow_multipart: Allow multipart polygons if True, defaults to False. gridsize: Size of the grid for any grid operations, defaults to None. """ if not isinstance(polys, (pd.DataFrame, pd.Series, GeometryArray)): raise TypeError(type(polys)) self.polyclass = polys.__class__ if not isinstance(polys, pd.DataFrame): polys = to_gdf(polys, crs) if not allow_multipart and not (polys.geom_type == "Polygon").all(): raise ValueError( "All geometries must be single-type Polygons. Set allow_multipart=True to allow MultiPolygons", polys.geom_type.value_counts(), ) if not polys.geom_type.isin(["Polygon", "MultiPolygon"]).all(): raise ValueError( f"All geometries must be Polygons. Got {polys.geom_type.value_counts()}" ) self._index_mapper = dict(enumerate(polys.index)) self.gdf = polys.reset_index(drop=True) if crs is not None: self.crs = crs elif hasattr(polys, "crs"): self.crs = polys.crs else: self.crs = None if not len(self.gdf): self.rings = pd.Series() return exterior = pd.Series( get_exterior_ring(self.gdf.geometry.values), index=self._exterior_index, ) self.max_rings: int = np.max(get_num_interior_rings(self.gdf.geometry.values)) if not self.max_rings: self.rings = exterior return # series same length as number of potential inner rings interiors = pd.Series( ( [ [get_interior_ring(geom, i) for i in range(self.max_rings)] for geom in self.gdf.geometry ] ), ).explode() interiors.index = self._interiors_index interiors = interiors.dropna() self.rings = pd.concat([exterior, interiors])
[docs] def get_rings(self, agg: bool = False) -> GeoDataFrame | GeoSeries | np.ndarray: """Retrieve rings from the polygons, optionally aggregating them. Args: agg: If True, aggregate the rings into single geometries. Returns: The rings either aggregated or separated, in the type of the input polygons. """ gdf = self.gdf.copy() rings = self.rings.copy() if not len(rings): return GeoSeries() if agg: gdf.geometry = rings.groupby(level=1).agg(unary_union) else: rings.index = rings.index.get_level_values(1) rings.name = "geometry" gdf = gdf.drop(columns="geometry").join(rings) if issubclass(self.polyclass, pd.DataFrame): return GeoDataFrame(gdf, crs=self.crs) if issubclass(self.polyclass, pd.Series): return GeoSeries(gdf.geometry) return self.polyclass(gdf.geometry.values)
[docs] def apply_numpy_func_to_interiors( self, func: Callable, args: tuple | None = None, kwargs: dict | None = None, ) -> "PolygonsAsRings": """Apply a numpy function specifically to the interior rings of the polygons. Args: func: Numpy function to apply. args: Tuple of positional arguments for the function. kwargs: Dictionary of keyword arguments for the function. Returns: PolygonsAsRings: The instance itself after applying the function. """ kwargs = kwargs or {} args = args or () arr: NDArray[LinearRing] = self.rings.loc[self.is_interior].values index: pd.Index = self.rings.loc[self.is_interior].index results = pd.Series( np.array(func(arr, *args, **kwargs)), index=index, ) self.rings.loc[self.is_interior] = results return self
[docs] def apply_numpy_func( self, func: Callable, args: tuple | None = None, kwargs: dict | None = None, ) -> "PolygonsAsRings": """Apply a numpy function to all rings of the polygons. Args: func: Numpy function to apply. args: Tuple of positional arguments for the function. kwargs: Dictionary of keyword arguments for the function. Returns: PolygonsAsRings: The instance itself after applying the function. """ kwargs = kwargs or {} args = args or () results = np.array(func(self.rings.values, *args, **kwargs)) if len(results) != len(self.rings): raise ValueError( f"Different length of results. Got {len(results)} and {len(self.rings)} original rings" ) self.rings.loc[:] = results # type: ignore [call-overload] return self
[docs] def apply_geoseries_func( self, func: Callable, args: tuple | None = None, kwargs: dict | None = None, ) -> "PolygonsAsRings": """Apply a function that operates on a GeoSeries to the rings. Args: func: Function to apply that expects a GeoSeries. args: Tuple of positional arguments for the function. kwargs: Dictionary of keyword arguments for the function. Returns: PolygonsAsRings: The instance itself after applying the function. """ kwargs = kwargs or {} args = args or () self.rings.loc[:] = np.array( # type: ignore [call-overload] func( GeoSeries( self.rings.values, crs=self.crs, index=self.rings.index.get_level_values(1).map(self._index_mapper), ), *args, **kwargs, ) ) return self
[docs] def apply_gdf_func( self, func: Callable, args: tuple | None = None, kwargs: dict | None = None, ) -> "PolygonsAsRings": """Apply a function that operates on a GeoDataFrame to the rings. Args: func: Function to apply that expects a GeoDataFrame. args: Tuple of positional arguments for the function. kwargs: Dictionary of keyword arguments for the function. Returns: PolygonsAsRings: The instance itself after applying the function. """ kwargs = kwargs or {} args = args or () gdf = GeoDataFrame( {"geometry": self.rings.values}, crs=self.crs, index=self.rings.index.get_level_values(1).map(self._index_mapper), ).join(self.gdf.drop(columns="geometry")) assert len(gdf) == len(self.rings) gdf.index = self.rings.index self.rings.loc[:] = func( # type: ignore [call-overload] gdf, *args, **kwargs, ).geometry.values return self
@property def is_interior(self) -> bool: """Returns a boolean Series of whether the row is an interior ring.""" return self.rings.index.get_level_values(0) == 1 @property def is_exterior(self) -> bool: """Returns a boolean Series of whether the row is an exterior ring.""" return self.rings.index.get_level_values(0) == 0 @property def _interiors_index(self) -> pd.MultiIndex: """A three-leveled MultiIndex. Used to separate interior and exterior and sort the interior in the 'to_numpy' method. level 0: all 1s, indicating "is interior". level 1: gdf index repeated *self.max_rings* times. level 2: interior number index. 0 * len(gdf), 1 * len(gdf), 2 * len(gdf)... """ if not self.max_rings: return pd.MultiIndex() len_gdf = len(self.gdf) n_potential_interiors = len_gdf * self.max_rings gdf_index = sorted(list(self.gdf.index) * self.max_rings) interior_number_index = np.tile(np.arange(self.max_rings), len_gdf) one_for_interior = np.repeat(1, n_potential_interiors) return pd.MultiIndex.from_arrays( [one_for_interior, gdf_index, interior_number_index] ) @property def _exterior_index(self) -> pd.MultiIndex: """A three-leveled MultiIndex. Used to separate interior and exterior in the 'to_numpy' method. Only leve 1 is used for the exterior. level 0: all 0s, indicating "not interior". level 1: gdf index. level 2: All 0s. """ zero_for_not_interior = np.repeat(0, len(self.gdf)) return pd.MultiIndex.from_arrays( [zero_for_not_interior, self.gdf.index, zero_for_not_interior] )
[docs] def to_gdf(self) -> GeoDataFrame: """Return the GeoDataFrame with polygons.""" self.gdf.geometry = self.to_numpy() return self.gdf
[docs] def to_geoseries(self) -> GeoSeries: """Return the GeoSeries with polygons.""" self.gdf.geometry = self.to_numpy() return self.gdf.geometry
[docs] def to_numpy(self) -> NDArray[Polygon]: """Return a numpy array of polygons.""" if not len(self.rings): return np.array([]) exterior = self.rings.loc[self.is_exterior].sort_index() assert exterior.shape == (len(self.gdf),) nonempty_exteriors = exterior.loc[lambda x: x.notna()] empty_exteriors = exterior.loc[lambda x: x.isna()] nonempty_interiors = self.rings.loc[self.is_interior] if not len(nonempty_interiors): nonempty_exteriors.loc[:] = make_valid(polygons(nonempty_exteriors.values)) return pd.concat([empty_exteriors, nonempty_exteriors]).sort_index().values empty_interiors = pd.Series( [None for _ in range(len(self.gdf) * self.max_rings)], index=self._interiors_index, ).loc[lambda x: ~x.index.isin(nonempty_interiors.index)] interiors = ( pd.concat([nonempty_interiors, empty_interiors]) .sort_index() # make each ring level a column with same length and order as gdf .unstack(level=2) .sort_index() ) assert interiors.shape == (len(self.gdf), self.max_rings), interiors.shape interiors = interiors.loc[ interiors.index.get_level_values(1).isin( nonempty_exteriors.index.get_level_values(1) ) ] assert interiors.index.get_level_values(1).equals( nonempty_exteriors.index.get_level_values(1) ) # nan gives TypeError in shapely.polygons. None does not. for i, _ in enumerate(interiors.columns): interiors.loc[interiors.iloc[:, i].isna(), i] = None nonempty_exteriors.loc[nonempty_exteriors.isna()] = None # construct polygons with holes polys = make_valid( polygons( nonempty_exteriors.values, interiors.values, ) ) # interiors might have moved (e.g. snapped) so that they are not within the exterior # these interiors will not be holes, so we need to erase them manually interiors_as_polys = make_valid(polygons(interiors.values)) # merge interior polygons into 1d array interiors_as_polys = np.array( [ make_valid(unary_union(interiors_as_polys[i, :])) for i in range(interiors_as_polys.shape[0]) ] ) # erase rowwise nonempty_exteriors.loc[:] = make_valid(difference(polys, interiors_as_polys)) return pd.concat([empty_exteriors, nonempty_exteriors]).sort_index().values
[docs] def get_linearring_series(geoms: GeoDataFrame | GeoSeries) -> pd.Series: """Convert geometries into a series of LinearRings. Args: geoms: GeoDataFrame or GeoSeries from which to extract LinearRings. Returns: pd.Series: A series containing LinearRings. """ geoms = to_geoseries(geoms).explode(index_parts=False) coords, indices = get_coordinates(geoms, return_index=True) return pd.Series(linearrings(coords, indices=indices), index=geoms.index)
def _geoms_to_linearrings_fallback( exterior: pd.Series, interiors: pd.Series | None = None ) -> pd.Series: exterior.index = exterior.index.get_level_values(1) assert exterior.index.is_monotonic_increasing exterior = get_linearring_series(exterior) if interiors is None: return ( pd.Series( make_valid(polygons(exterior.values)), index=exterior.index, ) .groupby(level=0) .agg(lambda x: make_valid(unary_union(x))) ) interiors.index = interiors.index.get_level_values(1) new_interiors = [] for col in interiors: new_interiors.append(get_linearring_series(interiors[col])) all_none = [[None] * len(new_interiors)] * len(exterior) cols = list(interiors.columns) out_interiors = pd.DataFrame( all_none, columns=cols, index=exterior.index, ) out_interiors[cols] = pd.concat(new_interiors, axis=1) for col in out_interiors: out_interiors.loc[out_interiors[col].isna(), col] = None return ( pd.Series( make_valid(polygons(exterior.values, out_interiors.values)), index=exterior.index, ) .groupby(level=0) .agg(lambda x: make_valid(unary_union(x))) )