import re
import warnings
from collections.abc import Callable
from typing import Any
import numpy as np
import pandas as pd
import shapely
from geopandas import GeoDataFrame
from geopandas import GeoSeries
from geopandas.array import GeometryArray
from numpy.typing import NDArray
from shapely import extract_unique_points
from shapely import get_coordinates
from shapely import get_parts
from shapely import linestrings
from shapely.errors import GEOSException
from shapely.geometry import LineString
from shapely.geometry import Point
from .buffer_dissolve_explode import buff
from .buffer_dissolve_explode import dissexp
from .conversion import coordinate_array
from .conversion import to_gdf
from .duplicates import get_intersections
from .duplicates import update_geometries
# from .general import sort_large_first as _sort_large_first
from .general import clean_geoms
from .general import sort_large_first
from .general import sort_small_first
from .general import to_lines
from .geometry_types import make_all_singlepart
from .geometry_types import to_single_geom_type
from .overlay import clean_overlay
from .polygon_operations import eliminate_by_longest
from .polygon_operations import get_cluster_mapper
from .polygon_operations import get_gaps
from .sfilter import sfilter_inverse
from .sfilter import sfilter_split
warnings.simplefilter(action="ignore", category=UserWarning)
warnings.simplefilter(action="ignore", category=RuntimeWarning)
PRECISION = 1e-3
BUFFER_RES = 50
def coverage_clean(
gdf: GeoDataFrame,
tolerance: int | float,
duplicate_action: str = "fix",
    grid_sizes: tuple[None | int, ...] = (None,),
n_jobs: int = 1,
) -> GeoDataFrame:
"""Fix thin gaps, holes, slivers and double surfaces.
The operations might raise GEOSExceptions, so it might be nessecary to set
the 'grid_sizes' argument, it might also be a good idea to run coverage_clean
twice to fill gaps resulting from these GEOSExceptions.
Rules:
- Holes (interiors) thinner than the tolerance are closed.
- Gaps between polygons are filled if thinner than the tolerance.
- Sliver polygons thinner than the tolerance are eliminated
into the neighbor polygon with the longest shared border.
- Double surfaces thinner than the tolerance are eliminated.
If duplicate_action is "fix", thicker double surfaces will
be updated.
- Line and point geometries are removed with no warning.
- MultiPolygons and GeometryCollections are exploded to Polygons.
- Index is reset.
Args:
gdf: GeoDataFrame to be cleaned.
tolerance: distance (usually meters) used as the minimum thickness
for polygons to be eliminated. Any gap, hole, sliver or double
surface that are empty after a negative buffer of tolerance / 2
are eliminated into the neighbor with the longest shared border.
duplicate_action: Either "fix", "error" or "ignore".
If "fix" (default), double surfaces thicker than the
tolerance will be updated from top to bottom (function update_geometries)
and then dissolved into the neighbor polygon with the longest shared border.
If "error", an Exception is raised if there are any double surfaces thicker
than the tolerance. If "ignore", double surfaces are kept as is.
grid_sizes: One or more grid_sizes used in overlay and dissolve operations that
might raise a GEOSException. Defaults to (None,), meaning no grid_sizes.
n_jobs: Number of threads.
Returns:
A GeoDataFrame with cleaned polygons.
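
    Examples:
        A minimal usage sketch with hypothetical data. It assumes a projected
        CRS in meters, so tolerance=1 means 1 meter. The 0.1 meter wide
        double surface where the two polygons overlap is thinner than the
        tolerance and is therefore eliminated.

        >>> import geopandas as gpd
        >>> from shapely.geometry import Polygon
        >>> gdf = gpd.GeoDataFrame(
        ...     {
        ...         "geometry": [
        ...             Polygon([(0, 0), (10, 0), (10, 10), (0, 10)]),
        ...             Polygon([(9.9, 0), (20, 0), (20, 10), (9.9, 10)]),
        ...         ]
        ...     },
        ...     crs=25833,
        ... )
        >>> cleaned = coverage_clean(gdf, tolerance=1)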
"""
if not len(gdf):
return gdf
_cleaning_checks(gdf, tolerance, duplicate_action)
if not gdf.index.is_unique:
gdf = gdf.reset_index(drop=True)
gdf = make_all_singlepart(gdf).loc[
lambda x: x.geom_type.isin(["Polygon", "MultiPolygon"])
]
try:
gdf = _safe_simplify(gdf, PRECISION)
except GEOSException:
pass
gdf = (
clean_geoms(gdf)
.pipe(make_all_singlepart)
.loc[lambda x: x.geom_type.isin(["Polygon", "MultiPolygon"])]
)
try:
gaps = get_gaps(gdf, include_interiors=True)
except GEOSException:
for i, grid_size in enumerate(grid_sizes):
try:
gaps = get_gaps(gdf, include_interiors=True, grid_size=grid_size)
if grid_size:
                    # buffer by the grid size so snapping doesn't create new gaps
gaps.geometry = gaps.buffer(grid_size)
break
except GEOSException as e:
if i == len(grid_sizes) - 1:
explore_geosexception(e, gdf)
raise e
gaps["_was_gap"] = 1
if duplicate_action == "ignore":
double = GeoDataFrame({"geometry": []}, crs=gdf.crs)
double["_double_idx"] = None
else:
double = get_intersections(gdf, n_jobs=n_jobs)
double["_double_idx"] = range(len(double))
gdf, slivers = split_out_slivers(gdf, tolerance)
gdf["_poly_idx"] = range(len(gdf))
thin_gaps_and_double = pd.concat([gaps, double]).loc[
lambda x: (x.buffer(-tolerance / 2).is_empty)
]
all_are_thin = double["_double_idx"].isin(thin_gaps_and_double["_double_idx"]).all()
if not all_are_thin and duplicate_action == "fix":
gdf, thin_gaps_and_double, slivers = _properly_fix_duplicates(
gdf,
double,
slivers,
thin_gaps_and_double,
tolerance,
n_jobs=n_jobs,
)
elif not all_are_thin and duplicate_action == "error":
raise ValueError("Large double surfaces.")
to_eliminate = pd.concat([thin_gaps_and_double, slivers], ignore_index=True)
to_eliminate = to_eliminate.loc[lambda x: ~x.buffer(-PRECISION / 10).is_empty]
to_eliminate = try_for_grid_size(
split_by_neighbors,
grid_sizes=grid_sizes,
args=(to_eliminate, gdf),
kwargs=dict(tolerance=tolerance),
)
to_eliminate["_eliminate_idx"] = range(len(to_eliminate))
to_eliminate["_cluster"] = get_cluster_mapper(to_eliminate.buffer(PRECISION))
gdf_geoms_idx = gdf[["_poly_idx", "geometry"]]
poly_idx_mapper = clean_overlay(
buff(
to_eliminate[["_eliminate_idx", "geometry"]],
tolerance,
resolution=BUFFER_RES,
),
gdf_geoms_idx,
geom_type="polygon",
n_jobs=n_jobs,
)
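    # assign each piece to be eliminated to the intersecting polygon with
    # the largest total overlap with the buffered pieces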
poly_idx_mapper["_area_per_poly"] = poly_idx_mapper.area
poly_idx_mapper["_area_per_poly"] = poly_idx_mapper.groupby("_poly_idx")[
"_area_per_poly"
].transform("sum")
poly_idx_mapper: pd.Series = (
poly_idx_mapper.sort_values("_area_per_poly", ascending=False)
.drop_duplicates("_eliminate_idx")
.set_index("_eliminate_idx")["_poly_idx"]
)
to_eliminate["_poly_idx"] = to_eliminate["_eliminate_idx"].map(poly_idx_mapper)
isolated = to_eliminate[lambda x: x["_poly_idx"].isna()]
intersecting = to_eliminate[lambda x: x["_poly_idx"].notna()]
for i, grid_size in enumerate(grid_sizes):
try:
without_double = update_geometries(
intersecting,
geom_type="polygon",
grid_size=grid_size,
n_jobs=n_jobs,
).drop(columns=["_eliminate_idx", "_double_idx"])
break
except GEOSException as e:
if i == len(grid_sizes) - 1:
explore_geosexception(e, gdf, intersecting, isolated)
raise e
not_really_isolated = isolated[["geometry", "_eliminate_idx", "_cluster"]].merge(
without_double.drop(columns=["geometry"]),
on="_cluster",
how="inner",
)
really_isolated = isolated.loc[
lambda x: ~x["_eliminate_idx"].isin(not_really_isolated["_eliminate_idx"])
]
is_gap = really_isolated["_was_gap"] == 1
isolated_gaps = really_isolated.loc[is_gap, ["geometry"]].sjoin_nearest(
gdf, max_distance=PRECISION
)
really_isolated = really_isolated[~is_gap]
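    # truly isolated slivers get new, unused _poly_idx values (one per
    # cluster) so they survive the dissolve as separate polygons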
really_isolated["_poly_idx"] = (
really_isolated["_cluster"] + gdf["_poly_idx"].max() + 1
)
cleaned = pd.concat(
[
gdf,
without_double,
not_really_isolated,
really_isolated,
isolated_gaps,
],
).drop(
columns=[
"_cluster",
"_was_gap",
"_eliminate_idx",
"index_right",
"_double_idx",
"_area_per_poly",
],
errors="ignore",
)
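    # rows with a _poly_idx that occurs only once need no dissolving; the
    # rest are dissolved by _poly_idx to merge eliminated pieces into their
    # polygons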
try:
        only_one = cleaned.groupby("_poly_idx")["_poly_idx"].transform("size") == 1
one_hit = cleaned[only_one].drop(columns="_poly_idx")
many_hits = cleaned[~only_one]
except IndexError:
assert not cleaned["_poly_idx"].notna().any(), cleaned
one_hit = cleaned[lambda x: x.index == min(x.index) - 1].drop(
columns="_poly_idx", errors="ignore"
)
many_hits = cleaned
for i, grid_size in enumerate(grid_sizes):
try:
many_hits = (
dissexp(
many_hits,
by="_poly_idx",
aggfunc="first",
dropna=True,
grid_size=grid_size,
n_jobs=n_jobs,
)
.sort_index()
.reset_index(drop=True)
)
break
except GEOSException as e:
if i == len(grid_sizes) - 1:
explore_geosexception(e, gdf, without_double, isolated, really_isolated)
raise e
cleaned = pd.concat([many_hits, one_hit], ignore_index=True)
gdf = gdf.drop(columns="_poly_idx")
for i, grid_size in enumerate(grid_sizes):
try:
cleaned = clean_overlay(
gdf,
cleaned,
how="update",
geom_type="polygon",
grid_size=grid_size,
n_jobs=n_jobs,
)
break
except GEOSException as e:
if i == len(grid_sizes) - 1:
explore_geosexception(
e,
gdf,
cleaned,
without_double,
isolated,
really_isolated,
)
raise e
cleaned = sort_large_first(cleaned)
# slivers on bottom
cleaned = pd.concat(split_out_slivers(cleaned, tolerance))
for i, grid_size in enumerate(grid_sizes):
try:
cleaned = update_geometries(
cleaned,
geom_type="polygon",
grid_size=grid_size,
n_jobs=n_jobs,
)
break
except GEOSException as e:
if i == len(grid_sizes) - 1:
explore_geosexception(
e,
gdf,
cleaned,
without_double,
isolated,
really_isolated,
)
raise e
# cleaned = _safe_simplify(cleaned, PRECISION)
# cleaned.geometry = shapely.make_valid(cleaned.geometry)
    # TODO check why polygons disappear in rare cases. For now, just add back the missing
    disappeared_polygons = sfilter_inverse(gdf, cleaned.buffer(-PRECISION))
    cleaned = pd.concat([cleaned, disappeared_polygons])
return to_single_geom_type(cleaned, "polygon")
def _safe_simplify(gdf: GeoDataFrame, tolerance: float | int, **kwargs) -> GeoDataFrame:
    """Simplify only if the resulting area is no more than 1 percent larger.

    Because simplifying can result in holes being filled.
    """
    area_then = gdf.area
    copied = gdf.copy()
    copied.geometry = shapely.make_valid(
        shapely.simplify(copied.geometry.values, tolerance=tolerance)
    )
    # revert geometries that grew more than 1 percent or became empty
    filt = (copied.area > area_then * 1.01) | (copied.geometry.is_empty)
    copied.loc[filt, copied._geometry_column_name] = gdf.loc[
        filt, copied._geometry_column_name
    ]
    return copied
def _remove_interior_slivers(gdf: GeoDataFrame, tolerance: int | float) -> GeoDataFrame:
gdf, slivers = split_out_slivers(gdf, tolerance)
slivers["_idx"] = range(len(slivers))
without_thick = clean_overlay(
to_lines(slivers), buff(gdf, PRECISION), how="difference"
)
return pd.concat(
[
gdf,
slivers[lambda x: x["_idx"].isin(without_thick["_idx"])].drop(
columns="_idx"
),
]
)
def remove_spikes(
gdf: GeoDataFrame, tolerance: int | float, n_jobs: int = 1
) -> GeoDataFrame:
"""Remove thin spikes from polygons.
Args:
gdf: A GeoDataFrame.
tolerance: Spike tolerance.
n_jobs: Number of threads.
Returns:
A GeoDataFrame.
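
    Examples:
        A minimal sketch with a hypothetical polygon: a square with a
        0.001 unit wide spike sticking out of its top edge.

        >>> import geopandas as gpd
        >>> from shapely.geometry import Polygon
        >>> spiky = Polygon(
        ...     [(0, 0), (10, 0), (10, 10), (5.001, 10), (5.0005, 100), (5, 10), (0, 10)]
        ... )
        >>> gdf = gpd.GeoDataFrame({"geometry": [spiky]}, crs=25833)
        >>> despiked = remove_spikes(gdf, tolerance=0.01)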
"""
return clean_overlay(
gdf, gdf[["geometry"]], how="intersection", grid_size=tolerance, n_jobs=n_jobs
)
def _properly_fix_duplicates(
gdf: GeoDataFrame,
double: GeoDataFrame,
slivers: GeoDataFrame,
thin_gaps_and_double: GeoDataFrame,
tolerance: int | float,
n_jobs: int,
) -> tuple[GeoDataFrame, GeoDataFrame, GeoDataFrame]:
gdf = _dissolve_thick_double_and_update(gdf, double, thin_gaps_and_double, n_jobs)
gdf, more_slivers = split_out_slivers(gdf, tolerance)
slivers = pd.concat([slivers, more_slivers], ignore_index=True)
gaps = get_gaps(gdf, include_interiors=True)
gaps["_was_gap"] = 1
assert "_double_idx" not in gaps
double = get_intersections(gdf)
double["_double_idx"] = range(len(double))
thin_gaps_and_double = pd.concat([gaps, double], ignore_index=True).loc[
lambda x: x.buffer(-tolerance / 2).is_empty
]
return gdf, thin_gaps_and_double, slivers
def _dissolve_thick_double_and_update(
gdf: GeoDataFrame, double: GeoDataFrame, thin_double: GeoDataFrame, n_jobs: int
) -> GeoDataFrame:
large = (
double.loc[~double["_double_idx"].isin(thin_double["_double_idx"])].drop(
columns="_double_idx"
)
# .pipe(sort_large_first)
# .sort_values("_poly_idx")
.pipe(update_geometries, geom_type="polygon", n_jobs=n_jobs)
)
return (
clean_overlay(gdf, large, how="update", geom_type="polygon", n_jobs=n_jobs)
# .pipe(sort_large_first)
# .sort_values("_poly_idx")
.pipe(update_geometries, geom_type="polygon", n_jobs=n_jobs)
)
def _cleaning_checks(
    gdf: GeoDataFrame, tolerance: int | float, duplicate_action: str
) -> GeoDataFrame | None:
if not len(gdf) or not tolerance:
return gdf
if tolerance < PRECISION:
raise ValueError(
f"'tolerance' must be larger than {PRECISION} to avoid "
"problems with floating point precision."
)
if duplicate_action not in ["fix", "error", "ignore"]:
raise ValueError("duplicate_action must be 'fix', 'error' or 'ignore'")
def split_out_slivers(
gdf: GeoDataFrame | GeoSeries, tolerance: float | int
) -> tuple[GeoDataFrame, GeoDataFrame] | tuple[GeoSeries, GeoSeries]:
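    """Split the geometries into a (thick, sliver) tuple.

    A geometry is a sliver if a negative buffer of tolerance / 2 leaves
    nothing. Slivers that don't intersect any thick geometry (buffered by
    PRECISION) are moved back to the first part, since they have no neighbor
    to be eliminated into.
    """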
is_sliver = gdf.buffer(-tolerance / 2).is_empty
slivers = gdf.loc[is_sliver]
gdf = gdf.loc[~is_sliver]
slivers, isolated = sfilter_split(slivers, gdf.buffer(PRECISION))
gdf = pd.concat([gdf, isolated])
return gdf, slivers
def try_for_grid_size(
func: Callable,
    grid_sizes: tuple[None | float | int, ...],
args: tuple | None = None,
kwargs: dict | None = None,
) -> Any:
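    """Run func(*args, grid_size=grid_size, **kwargs) for each grid size.

    Returns the first result that doesn't raise a GEOSException. If all grid
    sizes fail, the last GEOSException is raised.
    """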
args = args or ()
kwargs = kwargs or {}
for i, grid_size in enumerate(grid_sizes):
try:
return func(*args, grid_size=grid_size, **kwargs)
except GEOSException as e:
if i == len(grid_sizes) - 1:
raise e
def split_and_eliminate_by_longest(
gdf: GeoDataFrame | list[GeoDataFrame],
to_eliminate: GeoDataFrame,
tolerance: int | float,
    grid_sizes: tuple[None | float | int, ...] = (None,),
n_jobs: int = 1,
**kwargs,
) -> GeoDataFrame | tuple[GeoDataFrame]:
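    """Split geometries by neighbor borders, then eliminate by longest shared border.

    The geometries in 'to_eliminate' are split by the extended borders of
    their neighbors in 'gdf', deduplicated and eliminated into the neighbor
    polygon with the longest shared border. Pieces of 'to_eliminate' that
    are still not covered are dissolved and eliminated in a second pass.
    """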
if not len(to_eliminate):
return gdf
if not isinstance(gdf, (GeoDataFrame, GeoSeries)):
as_gdf = pd.concat(gdf, ignore_index=True)
else:
as_gdf = gdf
splitted = try_for_grid_size(
split_by_neighbors,
grid_sizes=grid_sizes,
args=(to_eliminate, as_gdf, tolerance),
).pipe(sort_small_first)
splitted = try_for_grid_size(
update_geometries,
grid_sizes=grid_sizes,
args=(splitted,),
kwargs=dict(geom_type="polygon", n_jobs=n_jobs),
)
gdf = try_for_grid_size(
eliminate_by_longest,
grid_sizes=grid_sizes,
args=(
gdf,
splitted,
),
kwargs=kwargs | {"n_jobs": n_jobs},
)
if not isinstance(gdf, (GeoDataFrame, GeoSeries)):
as_gdf = pd.concat(gdf, ignore_index=True)
else:
as_gdf = gdf
missing = try_for_grid_size(
clean_overlay,
grid_sizes=grid_sizes,
args=(
to_eliminate,
as_gdf,
),
kwargs=dict(
how="difference",
geom_type="polygon",
n_jobs=n_jobs,
),
).pipe(lambda x: dissexp(x, n_jobs=n_jobs))
return try_for_grid_size(
eliminate_by_longest,
grid_sizes=grid_sizes,
args=(gdf, missing),
kwargs=kwargs | {"n_jobs": n_jobs},
)
def split_by_neighbors(
df: GeoDataFrame,
split_by: GeoDataFrame,
tolerance: int | float,
grid_size: float | int | None = None,
) -> GeoDataFrame:
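    """Split the polygons in df by the borders of nearby geometries in split_by.

    The boundary segments of split_by within 'tolerance' of df are
    extracted, extended by 3 * tolerance, buffered and used to cut the
    polygons in df.
    """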
if not len(df):
return df
split_by = split_by.copy()
split_by.geometry = shapely.simplify(split_by.geometry, tolerance)
intersecting_lines = (
clean_overlay(
to_lines(split_by),
buff(df, tolerance),
how="intersection",
grid_size=grid_size,
)
.pipe(get_line_segments)
.reset_index(drop=True)
)
endpoints = intersecting_lines.boundary.explode(index_parts=False)
extended_lines = GeoDataFrame(
{
"geometry": extend_lines(
endpoints.loc[lambda x: ~x.index.duplicated(keep="first")].values,
endpoints.loc[lambda x: ~x.index.duplicated(keep="last")].values,
distance=tolerance * 3,
)
},
crs=df.crs,
)
buffered = buff(extended_lines, tolerance, single_sided=True)
return clean_overlay(df, buffered, how="identity", grid_size=grid_size)
def extend_lines(arr1, arr2, distance) -> NDArray[LineString]:
    if len(arr1) != len(arr2):
        raise ValueError("Arrays must have equal length.")
if not len(arr1):
return arr1
arr1, arr2 = arr2, arr1 # TODO fix
coords1 = coordinate_array(arr1)
coords2 = coordinate_array(arr2)
    dx = coords2[:, 0] - coords1[:, 0]
    dy = coords2[:, 1] - coords1[:, 1]
    len_xy = np.sqrt((dx**2.0) + (dy**2.0))
    # move each point away from its pair by 'distance' to extend the line
    # (zero-length pairs give NaN here and are skipped below)
    x = coords1[:, 0] - dx / len_xy * distance
    y = coords1[:, 1] - dy / len_xy * distance
new_points = np.array([None for _ in range(len(arr1))])
new_points[~np.isnan(x)] = shapely.points(x[~np.isnan(x)], y[~np.isnan(x)])
new_points[~np.isnan(x)] = make_lines_between_points(
arr2[~np.isnan(x)], new_points[~np.isnan(x)]
)
return new_points
def make_lines_between_points(
arr1: NDArray[Point], arr2: NDArray[Point]
) -> NDArray[LineString]:
if arr1.shape != arr2.shape:
raise ValueError(
f"Arrays must have equal shape. Got {arr1.shape} and {arr2.shape}"
)
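    # interleave the two point arrays by positional index so that each index
    # holds [start, end], then build one LineString per index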
coords: pd.DataFrame = pd.concat(
[
pd.DataFrame(get_coordinates(arr1), columns=["x", "y"]),
pd.DataFrame(get_coordinates(arr2), columns=["x", "y"]),
]
).sort_index()
return linestrings(coords.values, indices=coords.index)
def get_line_segments(lines: GeoDataFrame | GeoSeries) -> GeoDataFrame:
assert lines.index.is_unique
if isinstance(lines, GeoDataFrame):
geom_col = lines._geometry_column_name
multipoints = lines.assign(
**{geom_col: extract_unique_points(lines.geometry.values)}
)
segments = multipoints_to_line_segments(multipoints.geometry)
return segments.join(lines.drop(columns=geom_col))
multipoints = GeoSeries(extract_unique_points(lines.values), index=lines.index)
return multipoints_to_line_segments(multipoints)
def multipoints_to_line_segments(multipoints: GeoSeries) -> GeoDataFrame:
if not len(multipoints):
return GeoDataFrame({"geometry": multipoints}, index=multipoints.index)
try:
crs = multipoints.crs
except AttributeError:
crs = None
try:
point_df = multipoints.explode(index_parts=False)
except AttributeError:
points, indices = get_parts(multipoints, return_index=True)
if isinstance(multipoints.index, pd.MultiIndex):
indices = pd.MultiIndex.from_arrays(indices, names=multipoints.index.names)
point_df = pd.DataFrame({"geometry": GeometryArray(points)}, index=indices)
try:
point_df = point_df.to_frame("geometry")
except AttributeError:
pass
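    # pair each point with the next point in the same geometry; the last
    # point wraps around to the first, closing the ring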
point_df["next"] = point_df.groupby(level=0)["geometry"].shift(-1)
first_points = point_df.loc[lambda x: ~x.index.duplicated(), "geometry"]
is_last_point = point_df["next"].isna()
point_df.loc[is_last_point, "next"] = first_points
assert point_df["next"].notna().all()
point_df["geometry"] = [
LineString([x1, x2])
for x1, x2 in zip(point_df["geometry"], point_df["next"], strict=False)
]
return GeoDataFrame(point_df.drop(columns=["next"]), geometry="geometry", crs=crs)
def points_to_line_segments(points: GeoDataFrame) -> GeoDataFrame:
points = points.copy()
points["next"] = points.groupby(level=0)["geometry"].shift(-1)
first_points = points.loc[lambda x: ~x.index.duplicated(), "geometry"]
is_last_point = points["next"].isna()
points.loc[is_last_point, "next"] = first_points
assert points["next"].notna().all()
points["geometry"] = [
LineString([x1, x2])
for x1, x2 in zip(points["geometry"], points["next"], strict=False)
]
return GeoDataFrame(
points.drop(columns=["next"]), geometry="geometry", crs=points.crs
)
def explore_geosexception(
e: GEOSException, *gdfs: GeoDataFrame, logger: Any | None = None
) -> None:
"""Extract the coordinates of a GEOSException and show in map.
Args:
e: The exception thrown by a GEOS operation, which potentially contains coordinates information.
*gdfs: One or more GeoDataFrames to display for context in the map.
logger: An optional logger to log the error with visualization. If None, uses standard output.
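
    Examples:
        A sketch of the coordinate extraction only. The message below is made
        up; real GEOS messages vary.

        >>> import re
        >>> msg = "TopologyException: side location conflict at 261296.56 6573791.25"
        >>> re.findall(r"(\d+\.\d+)\s+(\d+\.\d+)", msg)
        [('261296.56', '6573791.25')]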
"""
from ..maps.maps import Explore
from ..maps.maps import explore
pattern = r"(\d+\.\d+)\s+(\d+\.\d+)"
matches = re.findall(pattern, str(e))
coords_in_error_message = [(float(match[0]), float(match[1])) for match in matches]
exception_point = to_gdf(coords_in_error_message, crs=gdfs[0].crs)
if len(exception_point):
exception_point["wkt"] = exception_point.to_wkt()
if logger:
logger.error(
e, Explore(exception_point, *gdfs, mask=exception_point.buffer(100))
)
else:
explore(exception_point, *gdfs, mask=exception_point.buffer(100))
else:
if logger:
logger.error(e, Explore(*gdfs))
else:
explore(*gdfs)