Source code for sgis.geopandas_tools.cleaning

import re
import warnings
from typing import Any

import numpy as np
import pandas as pd
import shapely
from geopandas import GeoDataFrame
from geopandas import GeoSeries
from geopandas.array import GeometryArray
from numpy.typing import NDArray
from shapely import extract_unique_points
from shapely import force_2d
from shapely import get_coordinates
from shapely import get_exterior_ring
from shapely import get_parts
from shapely import linearrings
from shapely import linestrings
from shapely import make_valid
from shapely import polygons
from shapely.errors import GEOSException
from shapely.geometry import LinearRing
from shapely.geometry import LineString
from shapely.geometry import Point

from ..networkanalysis.closing_network_holes import get_angle
from .buffer_dissolve_explode import buff
from .buffer_dissolve_explode import dissexp
from .buffer_dissolve_explode import dissexp_by_cluster
from .conversion import coordinate_array
from .conversion import to_geoseries
from .duplicates import get_intersections
from .duplicates import update_geometries
from .general import clean_geoms
from .general import sort_large_first
from .general import sort_small_first
from .general import to_lines
from .geometry_types import make_all_singlepart
from .overlay import clean_overlay
from .polygon_operations import close_all_holes
from .polygon_operations import eliminate_by_longest
from .polygon_operations import get_cluster_mapper
from .polygon_operations import get_gaps
from .polygons_as_rings import PolygonsAsRings
from .sfilter import sfilter

warnings.simplefilter(action="ignore", category=UserWarning)
warnings.simplefilter(action="ignore", category=RuntimeWarning)


PRECISION = 1e-4  # minimum allowed tolerance; also used as a tiny buffer/simplify distance
BUFFER_RES = 50  # 'resolution' (segments per quarter circle) passed to buffer operations


def coverage_clean(
    gdf: GeoDataFrame,
    tolerance: int | float,
    mask=None,
    *,
    duplicate_action: str = "fix",
    grid_sizes: tuple[None | int, ...] = (None,),
    logger=None,
) -> GeoDataFrame:
    """Fix thin gaps, holes, slivers and double surfaces.

    Rules:
    - Holes (interiors) thinner than the tolerance are closed.
    - Gaps between polygons are filled if thinner than the tolerance.
    - Sliver polygons thinner than the tolerance are eliminated into the
      neighbor polygon with the longest shared border.
    - Double surfaces thinner than the tolerance are eliminated. If
      duplicate_action is "fix", thicker double surfaces are updated from
      top to bottom of the GeoDataFrame's rows.
    - Line and point geometries are removed.
    - MultiPolygons are exploded to Polygons.
    - The index is reset.

    Args:
        gdf: GeoDataFrame to be cleaned.
        tolerance: Distance (usually meters) used as the minimum thickness of
            polygons to keep. Any gap, hole, sliver or double surface that is
            empty after a negative buffer of tolerance / 2 is eliminated into
            the neighbor with the longest shared border.
        mask: Unused.
        duplicate_action: Either "fix", "error" or "ignore". If "fix"
            (default), double surfaces thicker than the tolerance are updated
            from top to bottom (with update_geometries) and then dissolved
            into the neighbor polygon with the longest shared border. If
            "error", an Exception is raised if there are any double surfaces
            thicker than the tolerance. If "ignore", double surfaces are kept
            as is.
        grid_sizes: One or more grid_sizes used in overlay and dissolve
            operations that might raise a GEOSException. Defaults to (None,),
            meaning no grid_sizes.
        logger: Optional logger used if a GEOSException is raised.

    Returns:
        A GeoDataFrame with cleaned polygons.
    """
    if not len(gdf):
        return gdf

    _cleaning_checks(gdf, tolerance, duplicate_action)

    if not gdf.index.is_unique:
        gdf = gdf.reset_index(drop=True)

    gdf = make_all_singlepart(gdf).loc[
        lambda x: x.geom_type.isin(["Polygon", "MultiPolygon"])
    ]

    gdf = safe_simplify(gdf, PRECISION)

    gdf = (
        clean_geoms(gdf)
        .pipe(make_all_singlepart)
        .loc[lambda x: x.geom_type.isin(["Polygon", "MultiPolygon"])]
    )

    try:
        gaps = get_gaps(gdf, include_interiors=True)
    except GEOSException:
        for i, grid_size in enumerate(grid_sizes):
            try:
                gaps = get_gaps(gdf, include_interiors=True, grid_size=grid_size)
                if grid_size:
                    # in order to not get more gaps
                    gaps.geometry = gaps.buffer(grid_size)
                break
            except GEOSException as e:
                if i == len(grid_sizes) - 1:
                    explore_geosexception(e, gdf, logger=logger)
                    raise e

    gaps["_was_gap"] = 1

    if duplicate_action == "ignore":
        double = GeoDataFrame({"geometry": []}, crs=gdf.crs)
        double["_double_idx"] = None
    else:
        double = get_intersections(gdf)
        double["_double_idx"] = range(len(double))

    gdf, slivers = split_out_slivers(gdf, tolerance)

    gdf["_poly_idx"] = range(len(gdf))

    thin_gaps_and_double = pd.concat([gaps, double]).loc[
        lambda x: x.buffer(-tolerance / 2).is_empty
    ]

    all_are_thin = double["_double_idx"].isin(thin_gaps_and_double["_double_idx"]).all()

    if not all_are_thin and duplicate_action == "fix":
        gdf, thin_gaps_and_double, slivers = _properly_fix_duplicates(
            gdf, double, slivers, thin_gaps_and_double, tolerance
        )
    elif not all_are_thin and duplicate_action == "error":
        raise ValueError("Large double surfaces.")

    to_eliminate = pd.concat([thin_gaps_and_double, slivers], ignore_index=True)

    to_eliminate = safe_simplify(to_eliminate, PRECISION)
    to_eliminate = to_eliminate.loc[lambda x: ~x.buffer(-PRECISION / 10).is_empty]

    to_eliminate = try_for_grid_size(
        split_by_neighbors,
        grid_sizes=grid_sizes,
        args=(to_eliminate, gdf),
        kwargs=dict(tolerance=tolerance),
    )

    to_eliminate["_eliminate_idx"] = range(len(to_eliminate))

    to_eliminate["_cluster"] = get_cluster_mapper(to_eliminate.buffer(PRECISION))

    gdf_geoms_idx = gdf[["_poly_idx", "geometry"]]

    poly_idx_mapper: pd.Series = (
        clean_overlay(
            buff(
                to_eliminate[["_eliminate_idx", "geometry"]],
                tolerance,
                resolution=BUFFER_RES,
            ),
            gdf_geoms_idx,
            geom_type="polygon",
        )
        .pipe(sort_large_first)
        .drop_duplicates("_eliminate_idx")
        .set_index("_eliminate_idx")["_poly_idx"]
    )

    to_eliminate["_poly_idx"] = to_eliminate["_eliminate_idx"].map(poly_idx_mapper)
    isolated = to_eliminate[lambda x: x["_poly_idx"].isna()]
    intersecting = to_eliminate[lambda x: x["_poly_idx"].notna()]

    for i, grid_size in enumerate(grid_sizes):
        try:
            without_double = update_geometries(
                intersecting, geom_type="polygon", grid_size=grid_size
            ).drop(columns=["_eliminate_idx", "_double_idx"])
            break
        except GEOSException as e:
            if i == len(grid_sizes) - 1:
                explore_geosexception(e, gdf, intersecting, isolated, logger=logger)
                raise e

    not_really_isolated = isolated[["geometry", "_eliminate_idx", "_cluster"]].merge(
        without_double.drop(columns=["geometry"]),
        on="_cluster",
        how="inner",
    )

    really_isolated = isolated.loc[
        lambda x: ~x["_eliminate_idx"].isin(not_really_isolated["_eliminate_idx"])
    ]

    is_gap = really_isolated["_was_gap"] == 1

    isolated_gaps = really_isolated.loc[is_gap, ["geometry"]].sjoin_nearest(
        gdf, max_distance=PRECISION
    )
    really_isolated = really_isolated[~is_gap]

    really_isolated["_poly_idx"] = (
        really_isolated["_cluster"] + gdf["_poly_idx"].max() + 1
    )

    for i, grid_size in enumerate(grid_sizes):
        try:
            cleaned = (
                dissexp(
                    pd.concat(
                        [
                            gdf,
                            without_double,
                            not_really_isolated,
                            really_isolated,
                            isolated_gaps,
                        ]
                    ).drop(
                        columns=[
                            "_cluster",
                            "_was_gap",
                            "_eliminate_idx",
                            "index_right",
                            "_double_idx",
                        ],
                        errors="ignore",
                    ),
                    by="_poly_idx",
                    aggfunc="first",
                    dropna=True,
                    grid_size=grid_size,
                )
                .sort_index()
                .reset_index(drop=True)
            )
            break
        except GEOSException as e:
            if i == len(grid_sizes) - 1:
                explore_geosexception(
                    e, gdf, without_double, isolated, really_isolated, logger=logger
                )
                raise e

    cleaned_area_sum = cleaned.area.sum()

    for i, grid_size in enumerate(grid_sizes):
        try:
            cleaned = clean_overlay(
                gdf.drop(columns="_poly_idx"),
                cleaned,
                how="update",
                geom_type="polygon",
            )
            break
        except GEOSException as e:
            if i == len(grid_sizes) - 1:
                explore_geosexception(
                    e,
                    gdf,
                    cleaned,
                    without_double,
                    isolated,
                    really_isolated,
                    logger=logger,
                )
                raise e
            try:
                cleaned = update_geometries(
                    cleaned, geom_type="polygon", grid_size=grid_size
                )
            except GEOSException:
                pass

    # if logger and cleaned_area_sum > cleaned.area.sum() + 1:
    #     print("\ncleaned.area.sum() diff", cleaned_area_sum - cleaned.area.sum())
    #     logger.debug("cleaned.area.sum() diff", cleaned_area_sum - cleaned.area.sum())

    cleaned = sort_large_first(cleaned)

    for i, grid_size in enumerate(grid_sizes):
        try:
            cleaned = update_geometries(
                cleaned, geom_type="polygon", grid_size=grid_size
            )
            break
        except GEOSException as e:
            cleaned.geometry = shapely.simplify(
                cleaned.geometry, PRECISION * (10 * i + 1)
            )
            if i == len(grid_sizes) - 1:
                explore_geosexception(
                    e,
                    gdf,
                    cleaned,
                    without_double,
                    isolated,
                    really_isolated,
                    logger=logger,
                )
                raise e

    cleaned = safe_simplify(cleaned, PRECISION)
    cleaned.geometry = shapely.make_valid(cleaned.geometry)

    return cleaned
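

# A minimal usage sketch for coverage_clean, with hypothetical data: two
# squares separated by a 0.01 unit wide gap. The gap is empty after a
# negative buffer of tolerance / 2, so it is eliminated into the neighbor
# with the longest shared border.
def _demo_coverage_clean() -> GeoDataFrame:
    squares = GeoDataFrame(
        {"geometry": [shapely.box(0, 0, 1, 1), shapely.box(1.01, 0, 2, 1)]},
        crs=25833,  # any projected CRS where units are meters
    )
    return coverage_clean(squares, tolerance=0.1)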


def safe_simplify(gdf, tolerance: float | int) -> GeoDataFrame:
    """Simplify only if the resulting area is no more than 1 percent larger.

    Because simplifying can result in holes being filled.
    """
    area_then = gdf.area
    copied = gdf.copy()
    copied.geometry = shapely.make_valid(
        shapely.simplify(copied.geometry.values, tolerance=tolerance)
    )
    # keep the original geometry where simplifying inflated the area by more
    # than 1 percent (e.g. because a hole was filled)
    too_large = copied.area > area_then * 1.01
    copied.loc[too_large, copied._geometry_column_name] = gdf.loc[
        too_large, copied._geometry_column_name
    ]
    return copied
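

# A minimal sketch of the fallback behavior, with a hypothetical geometry: a
# square with a small hole. Simplifying with a coarse tolerance may fill the
# hole; rows whose area grows by more than 1 percent keep their original
# geometry.
def _demo_safe_simplify() -> GeoDataFrame:
    with_hole = shapely.difference(
        shapely.box(0, 0, 1, 1), shapely.box(0.4, 0.4, 0.6, 0.6)
    )
    gdf = GeoDataFrame({"geometry": [with_hole]}, crs=25833)
    return safe_simplify(gdf, tolerance=0.5)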


def remove_spikes(gdf: GeoDataFrame, tolerance: int | float) -> GeoDataFrame:
    """Remove thin spikes from polygons.

    Note that this function might be slow, so it should only be used if
    necessary.

    Args:
        gdf: GeoDataFrame of polygons.
        tolerance: Distance (usually meters) used as the minimum thickness.
            Any spike thinner than the tolerance will be removed.

    Returns:
        A GeoDataFrame of polygons without spikes thinner than the tolerance.
    """
    gdf.geometry = (
        PolygonsAsRings(gdf.geometry)
        .apply_numpy_func(_remove_spikes, args=(tolerance,))
        .to_numpy()
    )
    return gdf
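

# A minimal usage sketch with a hypothetical polygon: a unit square with a
# roughly 0.001 unit wide spike sticking out of the top edge. Spikes thinner
# than the tolerance are removed.
def _demo_remove_spikes() -> GeoDataFrame:
    spiked = shapely.Polygon(
        [(0, 0), (1, 0), (1, 1), (0.5, 1), (0.5, 3), (0.499, 1), (0, 1)]
    )
    gdf = GeoDataFrame({"geometry": [spiked]}, crs=25833)
    return remove_spikes(gdf, tolerance=0.01)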


# def remove_spikes(gdf: GeoDataFrame, tolerance: int | float) -> GeoDataFrame:
#     return clean_overlay(
#         gdf, gdf[["geometry"]], how="intersection", grid_size=tolerance
#     )


def _remove_spikes(
    geoms: NDArray[LinearRing], tolerance: int | float
) -> NDArray[LinearRing]:
    if not len(geoms):
        return geoms
    geoms = to_geoseries(geoms).reset_index(drop=True)

    points = (
        extract_unique_points(geoms).explode(index_parts=False).to_frame("geometry")
    )
    points = get_angle_between_indexed_points(points)

    def to_buffered_rings_without_spikes(x):
        polys = GeoSeries(make_valid(polygons(get_exterior_ring(x))))
        return (
            polys.buffer(-tolerance, resolution=BUFFER_RES)
            .explode(index_parts=False)
            .pipe(close_all_holes)
            .pipe(get_exterior_ring)
            .buffer(tolerance * 10)
        )

    buffered = to_buffered_rings_without_spikes(
        geoms.buffer(tolerance / 2, resolution=BUFFER_RES)
    )

    points_without_spikes = (
        extract_unique_points(geoms)
        .explode(index_parts=False)
        .loc[lambda x: x.index.isin(sfilter(x, buffered).index)]
    )

    # linearrings require at least 4 coordinate pairs, or three unique points
    points_without_spikes = points_without_spikes.loc[
        lambda x: x.groupby(level=0).size() >= 3
    ]

    # 'linearrings' needs an index from 0 to n - 1
    to_int_index = {
        ring_idx: i
        for i, ring_idx in enumerate(sorted(set(points_without_spikes.index)))
    }
    int_indices = points_without_spikes.index.map(to_int_index)

    as_lines = pd.Series(
        linearrings(
            get_coordinates(points_without_spikes.geometry.values),
            indices=int_indices,
        ),
        index=points_without_spikes.index.unique(),
    )

    # the missing polygons are thin and/or spiky; remove them
    missing = geoms.loc[~geoms.index.isin(as_lines.index)]
    missing = pd.Series(
        [None] * len(missing),
        index=missing.index.values,
    )

    return pd.concat([as_lines, missing]).sort_index()


def get_angle_between_indexed_points(point_df: GeoDataFrame) -> GeoDataFrame:
    """Get the angle from each point to the next, and its difference from the previous angle."""
    point_df["next"] = point_df.groupby(level=0)["geometry"].shift(-1)

    notna = point_df["next"].notna()

    this = coordinate_array(point_df.loc[notna, "geometry"].values)
    next_ = coordinate_array(point_df.loc[notna, "next"].values)

    point_df.loc[notna, "angle"] = get_angle(this, next_)
    point_df["prev_angle"] = point_df.groupby(level=0)["angle"].shift(1)
    point_df["angle_diff"] = point_df["angle"] - point_df["prev_angle"]

    return point_df
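

# A minimal sketch with hypothetical points: index level 0 groups points by
# ring, and the returned frame gets "angle", "prev_angle" and "angle_diff"
# columns describing the turn at each vertex.
def _demo_angle_between_indexed_points() -> GeoDataFrame:
    points = GeoDataFrame(
        {"geometry": [Point(0, 0), Point(1, 0), Point(1, 1)]},
        index=[0, 0, 0],  # all three points belong to the same ring
    )
    return get_angle_between_indexed_points(points)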


def _properly_fix_duplicates(gdf, double, slivers, thin_gaps_and_double, tolerance):
    gdf = _dissolve_thick_double_and_update(gdf, double, thin_gaps_and_double)
    gdf, more_slivers = split_out_slivers(gdf, tolerance)
    slivers = pd.concat([slivers, more_slivers], ignore_index=True)
    gaps = get_gaps(gdf, include_interiors=True)
    gaps["_was_gap"] = 1
    assert "_double_idx" not in gaps
    double = get_intersections(gdf)
    double["_double_idx"] = range(len(double))
    thin_gaps_and_double = pd.concat([gaps, double], ignore_index=True).loc[
        lambda x: x.buffer(-tolerance / 2).is_empty
    ]

    return gdf, thin_gaps_and_double, slivers


def _dissolve_thick_double_and_update(gdf, double, thin_double):
    large = (
        double.loc[~double["_double_idx"].isin(thin_double["_double_idx"])]
        .drop(columns="_double_idx")
        # .pipe(sort_large_first)
        .sort_values("_poly_idx")
        .pipe(update_geometries, geom_type="polygon")
    )
    return (
        clean_overlay(gdf, large, how="update")
        # .pipe(sort_large_first)
        .sort_values("_poly_idx")
        .pipe(update_geometries, geom_type="polygon")
    )


def _cleaning_checks(gdf, tolerance, duplicate_action):  # , spike_action):
    if not len(gdf) or not tolerance:
        return gdf
    if tolerance < PRECISION:
        raise ValueError(
            f"'tolerance' must be larger than {PRECISION} to avoid "
            "problems with floating point precision."
        )
    if duplicate_action not in ["fix", "error", "ignore"]:
        raise ValueError("duplicate_action must be 'fix', 'error' or 'ignore'")


def split_out_slivers(
    gdf: GeoDataFrame | GeoSeries, tolerance: float | int
) -> tuple[GeoDataFrame, GeoDataFrame] | tuple[GeoSeries, GeoSeries]:
    is_sliver = gdf.buffer(-tolerance / 2).is_empty
    slivers = gdf.loc[is_sliver]
    gdf = gdf.loc[~is_sliver]
    return gdf, slivers


def try_for_grid_size(
    func,
    grid_sizes: tuple[None | float | int, ...],
    args: tuple | None = None,
    kwargs: dict | None = None,
) -> Any:
    if args is None:
        args = ()
    if kwargs is None:
        kwargs = {}
    for i, grid_size in enumerate(grid_sizes):
        try:
            return func(*args, grid_size=grid_size, **kwargs)
        except GEOSException as e:
            if i == len(grid_sizes) - 1:
                raise e


def split_and_eliminate_by_longest(
    gdf: GeoDataFrame | list[GeoDataFrame],
    to_eliminate: GeoDataFrame,
    tolerance: int | float,
    grid_sizes: tuple[None | float | int, ...] = (None,),
    logger=None,
    **kwargs,
) -> GeoDataFrame | tuple[GeoDataFrame, ...]:
    if not len(to_eliminate):
        return gdf

    if not isinstance(gdf, (GeoDataFrame, GeoSeries)):
        as_gdf = pd.concat(gdf, ignore_index=True)
    else:
        as_gdf = gdf

    splitted = try_for_grid_size(
        split_by_neighbors,
        grid_sizes=grid_sizes,
        args=(to_eliminate, as_gdf, tolerance),
    ).pipe(sort_small_first)

    splitted = try_for_grid_size(
        update_geometries,
        grid_sizes=grid_sizes,
        args=(splitted,),
        kwargs=dict(geom_type="polygon"),
    )

    gdf = try_for_grid_size(
        eliminate_by_longest,
        grid_sizes=grid_sizes,
        args=(
            gdf,
            splitted,
        ),
        kwargs=kwargs,
    )

    if not isinstance(gdf, (GeoDataFrame, GeoSeries)):
        as_gdf = pd.concat(gdf, ignore_index=True)
    else:
        as_gdf = gdf

    missing = try_for_grid_size(
        clean_overlay,
        grid_sizes=grid_sizes,
        args=(
            to_eliminate,
            as_gdf,
        ),
        kwargs=dict(
            how="difference",
            geom_type="polygon",
        ),
    ).pipe(dissexp_by_cluster)

    return try_for_grid_size(
        eliminate_by_longest, grid_sizes=grid_sizes, args=(gdf, missing), kwargs=kwargs
    )


def split_by_neighbors(df, split_by, tolerance, grid_size=None) -> GeoDataFrame:
    if not len(df):
        return df

    split_by = split_by.copy()
    split_by.geometry = shapely.simplify(split_by.geometry, tolerance)

    intersecting_lines = (
        clean_overlay(
            to_lines(split_by), buff(df, tolerance), how="identity", grid_size=grid_size
        )
        .pipe(get_line_segments)
        .reset_index(drop=True)
    )

    endpoints = intersecting_lines.boundary.explode(index_parts=False)

    extended_lines = GeoDataFrame(
        {
            "geometry": extend_lines(
                endpoints.loc[lambda x: ~x.index.duplicated(keep="first")].values,
                endpoints.loc[lambda x: ~x.index.duplicated(keep="last")].values,
                distance=tolerance * 3,
            )
        },
        crs=df.crs,
    )

    buffered = buff(extended_lines, tolerance, single_sided=True)

    return clean_overlay(df, buffered, how="identity", grid_size=grid_size)


def extend_lines(arr1, arr2, distance) -> NDArray[LineString]:
    if len(arr1) != len(arr2):
        raise ValueError
    if not len(arr1):
        return arr1

    arr1, arr2 = arr2, arr1  # TODO fix

    coords1 = coordinate_array(arr1)
    coords2 = coordinate_array(arr2)

    dx = coords2[:, 0] - coords1[:, 0]
    dy = coords2[:, 1] - coords1[:, 1]
    len_xy = np.sqrt((dx**2.0) + (dy**2.0))
    x = coords1[:, 0] + (coords1[:, 0] - coords2[:, 0]) / len_xy * distance
    y = coords1[:, 1] + (coords1[:, 1] - coords2[:, 1]) / len_xy * distance

    new_points = np.array([None for _ in range(len(arr1))])
    new_points[~np.isnan(x)] = shapely.points(x[~np.isnan(x)], y[~np.isnan(x)])

    new_points[~np.isnan(x)] = make_lines_between_points(
        arr2[~np.isnan(x)], new_points[~np.isnan(x)]
    )

    return new_points


def make_lines_between_points(
    arr1: NDArray[Point], arr2: NDArray[Point]
) -> NDArray[LineString]:
    if arr1.shape != arr2.shape:
        raise ValueError(
            f"Arrays must have equal shape. Got {arr1.shape} and {arr2.shape}"
        )
    coords: pd.DataFrame = pd.concat(
        [
            pd.DataFrame(get_coordinates(arr1), columns=["x", "y"]),
            pd.DataFrame(get_coordinates(arr2), columns=["x", "y"]),
        ]
    ).sort_index()

    return linestrings(coords.values, indices=coords.index)


def get_line_segments(lines) -> GeoDataFrame:
    assert lines.index.is_unique
    if isinstance(lines, GeoDataFrame):
        multipoints = lines.assign(
            **{
                lines._geometry_column_name: force_2d(
                    extract_unique_points(lines.geometry.values)
                )
            }
        )
        return multipoints_to_line_segments(multipoints.geometry)

    multipoints = GeoSeries(extract_unique_points(lines.values), index=lines.index)

    return multipoints_to_line_segments(multipoints)


def multipoints_to_line_segments(multipoints: GeoSeries) -> GeoDataFrame:
    if not len(multipoints):
        return GeoDataFrame({"geometry": multipoints}, index=multipoints.index)

    try:
        crs = multipoints.crs
    except AttributeError:
        crs = None

    try:
        point_df = multipoints.explode(index_parts=False)
        if isinstance(point_df, GeoSeries):
            point_df = point_df.to_frame("geometry")
    except AttributeError:
        points, indices = get_parts(multipoints, return_index=True)
        if isinstance(multipoints.index, pd.MultiIndex):
            indices = pd.MultiIndex.from_arrays(indices, names=multipoints.index.names)
        point_df = pd.DataFrame({"geometry": GeometryArray(points)}, index=indices)

    point_df["next"] = point_df.groupby(level=0)["geometry"].shift(-1)

    first_points = point_df.loc[lambda x: ~x.index.duplicated(), "geometry"]
    is_last_point = point_df["next"].isna()

    point_df.loc[is_last_point, "next"] = first_points
    assert point_df["next"].notna().all()

    point_df["geometry"] = [
        LineString([x1, x2])
        for x1, x2 in zip(point_df["geometry"], point_df["next"], strict=False)
    ]
    return GeoDataFrame(point_df.drop(columns=["next"]), geometry="geometry", crs=crs)


def explore_geosexception(e: GEOSException, *gdfs, logger=None) -> None:
    from ..maps.maps import Explore
    from ..maps.maps import explore
    from .conversion import to_gdf

    pattern = r"(\d+\.\d+)\s+(\d+\.\d+)"

    matches = re.findall(pattern, str(e))
    coords_in_error_message = [(float(match[0]), float(match[1])) for match in matches]
    exception_point = to_gdf(coords_in_error_message, crs=gdfs[0].crs)
    if len(exception_point):
        exception_point["wkt"] = exception_point.to_wkt()
        if logger:
            logger.error(
                e, Explore(exception_point, *gdfs, mask=exception_point.buffer(100))
            )
        else:
            explore(exception_point, *gdfs, mask=exception_point.buffer(100))
    elif logger:
        logger.error(e, Explore(*gdfs))
    else:
        explore(*gdfs)
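

# A minimal sketch of the retry pattern used throughout this module, with
# hypothetical inputs: try_for_grid_size calls the given function with each
# grid_size in turn and returns the first result that does not raise a
# GEOSException.
def _demo_try_for_grid_size(a: GeoDataFrame, b: GeoDataFrame) -> GeoDataFrame:
    return try_for_grid_size(
        clean_overlay,
        grid_sizes=(None, 1e-6, 1e-4),  # no grid first, then coarser grids
        args=(a, b),
        kwargs=dict(how="intersection", geom_type="polygon"),
    )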