Source code for sgis.geopandas_tools.cleaning

# %%
import warnings
from collections.abc import Callable
from typing import Any

import numpy as np
import pandas as pd
import shapely
from geopandas import GeoDataFrame
from geopandas import GeoSeries
from numpy.typing import NDArray
from shapely import Geometry
from shapely import STRtree
from shapely import extract_unique_points
from shapely import get_coordinates
from shapely import linearrings
from shapely import polygons
from shapely.errors import GEOSException
from shapely.geometry import LinearRing
from shapely.geometry import LineString
from shapely.geometry import Point

try:
    import numba
except ImportError:

[docs] class numba: """Placeholder."""
[docs] @staticmethod def njit(func) -> Callable: """Placeholder that does nothing.""" def wrapper(*args, **kwargs): return func(*args, **kwargs) return wrapper
from ..debug_config import _DEBUG_CONFIG from ..maps.maps import explore from .conversion import to_gdf from .conversion import to_geoseries from .duplicates import update_geometries from .general import clean_geoms from .geometry_types import make_all_singlepart from .geometry_types import to_single_geom_type from .overlay import clean_overlay from .polygon_operations import eliminate_by_longest from .polygon_operations import split_by_neighbors from .polygons_as_rings import PolygonsAsRings from .sfilter import sfilter from .sfilter import sfilter_inverse warnings.simplefilter(action="ignore", category=UserWarning) warnings.simplefilter(action="ignore", category=RuntimeWarning) PRECISION = 1e-3 BUFFER_RES = 50 # def explore(*args, **kwargs): # pass # def explore_locals(*args, **kwargs): # pass # def no_njit(func): # def wrapper(*args, **kwargs): # result = func(*args, **kwargs) # return result # return wrapper # numba.njit = no_njit
[docs] def coverage_clean( gdf: GeoDataFrame, tolerance: int | float, mask: GeoDataFrame | GeoSeries | Geometry | None = None, snap_to_anchors: bool = True, **kwargs, ) -> GeoDataFrame: """Fix thin gaps, holes, slivers and double surfaces. The operations might raise GEOSExceptions, so it might be nessecary to set the 'grid_sizes' argument, it might also be a good idea to run coverage_clean twice to fill gaps resulting from these GEOSExceptions. Rules: - Holes (interiors) thinner than the tolerance are closed. - Gaps between polygons are filled if thinner than the tolerance. - Sliver polygons thinner than the tolerance are eliminated into the neighbor polygon with the longest shared border. - Double surfaces thinner than the tolerance are eliminated. If duplicate_action is "fix", thicker double surfaces will be updated. - Line and point geometries are removed with no warning. - MultiPolygons and GeometryCollections are exploded to Polygons. - Index is reset. Args: gdf: GeoDataFrame to be cleaned. tolerance: distance (usually meters) used as the minimum thickness for polygons to be eliminated. Any gap, hole, sliver or double surface that are empty after a negative buffer of tolerance / 2 are eliminated into the neighbor with the longest shared border. mask: Mask to clip gdf to. snap_to_anchors: If True (default), snaps to anchor nodes in gdf. If False, only snaps to mask nodes (mask cannot be None in this case). **kwargs: Temporary backwards compatibility to avoid TypeErrors. Returns: A GeoDataFrame with cleaned polygons. """ if not len(gdf): return gdf gdf_original = gdf.copy() # more_than_one = get_num_geometries(gdf.geometry.values) > 1 # gdf.loc[more_than_one, gdf._geometry_column_name] = gdf.loc[ # more_than_one, gdf._geometry_column_name # ].apply(_unary_union_for_notna) if mask is not None: try: mask: GeoDataFrame = mask[["geometry"]].pipe(make_all_singlepart) except Exception: mask: GeoDataFrame = ( to_geoseries(mask).to_frame("geometry").pipe(make_all_singlepart) ) # mask: GeoDataFrame = close_all_holes( # dissexp_by_cluster(gdf[["geometry"]]) # ).pipe(make_all_singlepart) # mask = GeoDataFrame( # { # "geometry": [ # mask.union_all() # .buffer( # PRECISION, # resolution=1, # join_style=2, # ) # .buffer( # -PRECISION, # resolution=1, # join_style=2, # ) # ] # }, # crs=gdf.crs, # ).pipe(make_all_singlepart) # # gaps = shapely.union_all(get_gaps(mask).geometry.values) # # mask = shapely.get_parts(extract_unique_points(mask.geometry.values)) # # not_by_gaps = shapely.distance(mask, gaps) > PRECISION # # mask = GeoDataFrame({"geometry": mask[not_by_gaps]}) gdf = snap_polygons(gdf, tolerance, mask=mask, snap_to_anchors=snap_to_anchors) if mask is not None: missing_from_mask = clean_overlay( mask, gdf, how="difference", geom_type="polygon" ).loc[lambda x: x.buffer(-tolerance + PRECISION).is_empty] gdf, _ = eliminate_by_longest(gdf, missing_from_mask) missing_from_gdf = sfilter_inverse(gdf_original, gdf.buffer(-PRECISION)).loc[ lambda x: (~x.buffer(-PRECISION).is_empty) ] return pd.concat([gdf, missing_from_gdf], ignore_index=True).pipe( update_geometries, geom_type="polygon" )
def snap_polygons( gdf: GeoDataFrame, tolerance: int | float, mask: GeoDataFrame | GeoSeries | Geometry | None = None, snap_to_anchors: bool = True, ) -> GeoDataFrame: if not len(gdf): return gdf.copy() gdf_orig = gdf.copy() crs = gdf.crs gdf = ( clean_geoms(gdf) .pipe(make_all_singlepart, ignore_index=True) .pipe(to_single_geom_type, "polygon") ) gdf.crs = None gdf = gdf[lambda x: ~x.buffer(-tolerance / 2 - PRECISION).is_empty] # gdf = gdf[lambda x: ~x.buffer(-tolerance / 3).is_empty] # donuts_without_spikes = ( # gdf.geometry.buffer(tolerance / 2, resolution=1, join_style=2) # .buffer(-tolerance, resolution=1, join_style=2) # .buffer(tolerance / 2, resolution=1, join_style=2) # .pipe(to_lines) # .buffer(tolerance) # ) gdf.geometry = ( PolygonsAsRings(gdf.geometry.values) .apply_numpy_func( _snap_linearrings, kwargs=dict( tolerance=tolerance, mask=mask, snap_to_anchors=snap_to_anchors, ), ) .to_numpy() ) gdf = ( to_single_geom_type(make_all_singlepart(clean_geoms(gdf)), "polygon") .reset_index(drop=True) .set_crs(crs) ) missing = clean_overlay(gdf_orig, gdf, how="difference").loc[ lambda x: ~x.buffer(-tolerance / 2).is_empty ] if mask is None: mask = GeoDataFrame({"geometry": []}) explore( gdf, # gdf_orig, # thin, mask, missing, mask_p=to_gdf(mask.extract_unique_points().explode()).assign( wkt=lambda x: [g.wkt for g in x.geometry] ), gdf_p=to_gdf(gdf.extract_unique_points().explode()).assign( wkt=lambda x: [g.wkt for g in x.geometry] ), center=(5.36765872, 59.01199837, 1), ) explore( gdf, gdf_orig, # thin, mask, missing, mask_p=to_gdf(mask.extract_unique_points().explode()).assign( wkt=lambda x: [g.wkt for g in x.geometry] ), gdf_p=to_gdf(gdf.extract_unique_points().explode()).assign( wkt=lambda x: [g.wkt for g in x.geometry] ), center=(5.36820681, 59.01182298, 2), ) explore( gdf, gdf_orig, # thin, mask, missing, mask_p=to_gdf(mask.extract_unique_points().explode()).assign( wkt=lambda x: [g.wkt for g in x.geometry] ), gdf_p=to_gdf(gdf.extract_unique_points().explode()).assign( wkt=lambda x: [g.wkt for g in x.geometry] ), center=(5.37327042, 59.01099359, 5), ) explore( gdf, gdf_orig, # thin, mask, missing, mask_p=to_gdf(mask.extract_unique_points().explode()).assign( wkt=lambda x: [g.wkt for g in x.geometry] ), gdf_p=to_gdf(gdf.extract_unique_points().explode()).assign( wkt=lambda x: [g.wkt for g in x.geometry] ), center=(5.36853688, 59.01169013, 5), ) explore( gdf, # gdf_orig, missing, mask, mask_p=to_gdf(mask.extract_unique_points().explode()).assign( wkt=lambda x: [g.wkt for g in x.geometry] ), gdf_p=to_gdf(gdf.extract_unique_points().explode()).assign( wkt=lambda x: [g.wkt for g in x.geometry] ), center=(5.37142966, 59.009799, 0.01), max_zoom=40, ) explore( gdf, # gdf_orig, missing, mask, mask_p=to_gdf(mask.extract_unique_points().explode()).assign( wkt=lambda x: [g.wkt for g in x.geometry] ), gdf_p=to_gdf(gdf.extract_unique_points().explode()).assign( wkt=lambda x: [g.wkt for g in x.geometry] ), center=(5.36866312, 59.00842846, 0.01), max_zoom=40, ) explore( gdf, # gdf_orig, missing, mask, mask_p=to_gdf(mask.extract_unique_points().explode()).assign( wkt=lambda x: [g.wkt for g in x.geometry] ), gdf_p=to_gdf(gdf.extract_unique_points().explode()).assign( wkt=lambda x: [g.wkt for g in x.geometry] ), center=(5.37707146, 59.01065274, 0.4), max_zoom=40, ) explore( gdf, # gdf_orig, missing, mask, mask_p=to_gdf(mask.extract_unique_points().explode()).assign( wkt=lambda x: [g.wkt for g in x.geometry] ), gdf_p=to_gdf(gdf.extract_unique_points().explode()).assign( wkt=lambda x: [g.wkt for g in x.geometry] ), center=(-52074.0241, 6580847.4464, 0.1), max_zoom=40, ) explore( gdf, # gdf_orig, missing, mask, mask_p=to_gdf(mask.extract_unique_points().explode()).assign( wkt=lambda x: [g.wkt for g in x.geometry] ), gdf_p=to_gdf(gdf.extract_unique_points().explode()).assign( wkt=lambda x: [g.wkt for g in x.geometry] ), center=(5.38389153, 59.00548223, 1), max_zoom=40, ) # explore( # gdf_orig, # gdf, # dups=get_intersections(gdf, geom_type="polygon"), # msk=mask, # gaps=get_gaps(gdf), # updated=update_geometries(gdf, geom_type="polygon"), # # browser=False, # ) # gdf = update_geometries(gdf, geom_type="polygon") return gdf # .pipe(clean_clip, mask, geom_type="polygon") # @numba.njit def _snap_to_anchors( geoms, indices: NDArray[np.int32], anchors, anchor_indices, mask, mask_indices, was_midpoint, was_midpoint_mask, tolerance: int | float, ) -> tuple[NDArray, NDArray, NDArray]: coords, all_distances = _snap_to_anchors_inner( geoms, indices, anchors, anchor_indices, mask, mask_indices, was_midpoint, was_midpoint_mask, tolerance, ) not_inf = coords[:, 0] != np.inf all_distances = all_distances[not_inf] indices = indices[not_inf] coords = coords[not_inf] is_snapped = np.full(len(coords), False) n_coords = len(coords) range_indices = np.arange(len(coords)) range_index = -1 for index in np.unique(indices): cond = indices == index these_coords = coords[cond] # explore(ll=to_gdf(LineString(shapely.points(these_coords)), 25833)) # assert np.array_equal(these_coords[0], these_coords[-1]), these_coords these_range_indices = range_indices[cond] these_distances = all_distances[cond] for i in range(len(these_coords)): range_index += 1 if is_snapped[range_index]: print(i, "000") continue # distances = all_distances[range_index] distances = these_distances[i] # distances = these_distances[:, i] min_dist = np.min(distances) if min_dist > tolerance: # or min_dist == 0: print(i, "111", min_dist) continue is_snapped_now = False for j in np.argsort(distances): if distances[j] > tolerance: # TODO or distances[j] == 0: break if was_midpoint_mask[j]: continue anchor = anchors[j] ring = these_coords.copy() ring[i] = anchor # snap the nexts points to same anchor if neighboring points have same anchor # in order to properly check if the ring will be simple after snapping indices_with_same_anchor = [range_index] # these_coords = coords[indices==index] pos_counter = 0 # has_same_anchor_pos = True # has_same_anchor_neg = True while ( pos_counter + i < len(these_distances) - 1 ): # has_same_anchor_pos or has_same_anchor_neg: pos_counter += 1 # if indices[i + pos_counter] != index: # break # next_distances = all_distances[range_index + pos_counter] next_distances = these_distances[i + pos_counter] has_same_anchor_pos = False for j2 in np.argsort(next_distances): if was_midpoint_mask[j2]: continue if next_distances[j2] > tolerance: break has_same_anchor_pos = j2 == j # print( # "pos c", # i, # j, # j2, # pos_counter, # has_same_anchor_pos, # distances[j], # next_distances[j2], # ) break if has_same_anchor_pos: ring[i + pos_counter] = anchor indices_with_same_anchor.append(range_index + pos_counter) else: break # for j4 in np.arange( # indices_with_same_anchor[0], indices_with_same_anchor[-1] # ): # ring[j4 - range_index + i] = anchor # indices_with_same_anchor.append(j4) if i == 0: # snap points at the end of the line if same anchor neg_counter = 0 # has_same_anchor_neg = True while True: # has_same_anchor_pos or has_same_anchor_neg: neg_counter -= 1 # if indices[i + pos_counter] != index: # break this_range_index = these_range_indices[neg_counter] # next_distances = all_distances[this_range_index] next_distances = these_distances[neg_counter] has_same_anchor_neg = False for j3 in np.argsort(next_distances): if was_midpoint_mask[j3]: continue if next_distances[j3] > tolerance: break has_same_anchor_neg = j3 == j # print( # "neg c", # i, # j, # j3, # pos_counter, # # has_same_anchor, # distances[j], # next_distances[j3], # ) break if has_same_anchor_neg: ring[neg_counter] = anchor indices_with_same_anchor.append(this_range_index) else: break # for j5 in np.arange(0, indices_with_same_anchor[-1]): # ring[j5 - range_index + i] = anchor # indices_with_same_anchor.append(j5) indices_with_same_anchor = np.unique(indices_with_same_anchor) line_is_simple: bool = LineString(ring).is_simple # if i in [67, 68, 69, 173, 174, 175, 176, 177]: # or if Point(these_coords[i]).intersects( to_gdf([12.08375303, 67.50052183], 4326) .to_crs(25833) .buffer(10) .union_all() ): # for xxx, yyy in locals().items(): # if len(str(yyy)) > 50: # continue # print(xxx) # print(yyy) # print("prev:", was_midpoint_mask[j - 1]) # print(distances[np.argsort(distances)]) # print(anchors[np.argsort(distances)]) # print(ring) explore( out_coords=to_gdf( shapely.linestrings(coords, indices=indices), 25833 ), llll=to_gdf(LineString(ring), 25833), # this=to_gdf(this), # next_=to_gdf(next_), # line=to_gdf(LineString(np.array([this, next_])), 25833), geom=to_gdf(these_coords[i], 25833), prev=to_gdf(these_coords[i - 1], 25833), nxt=to_gdf(these_coords[i + 1], 25833), nxt2=to_gdf(these_coords[i + 2], 25833), anchor=to_gdf(anchor, 25833), # browser=True, ) # print( # "line_is_simple", line_is_simple, range_index, i, index, j # ) # , j2, j3, x) if not line_is_simple: # for j4 in range(len(ring)): # this_p = ring[j4] # for j5 in range(len(ring)): # that_p = ring[j5] # dist_ = np.sqrt( # (this_p[0] - that_p[0]) ** 2 # + (this_p[1] - that_p[1]) ** 2 # ) # if dist_ > 0 and dist_ < 1e-5: # print(this_p) # print(that_p) # ring[j5] = this_p print(LineString(ring).wkt) # explore( # out_coords=to_gdf( # shapely.linestrings(coords, indices=indices), 25833 # ), # llll=to_gdf(LineString(ring), 25833), # # this=to_gdf(this), # # next_=to_gdf(next_), # # line=to_gdf(LineString(np.array([this, next_])), 25833), # geom=to_gdf(these_coords[i], 25833), # prev=to_gdf(these_coords[i - 1], 25833), # nxt=to_gdf(these_coords[i + 1], 25833), # nxt2=to_gdf(these_coords[i + 2], 25833), # anchor=to_gdf(anchor, 25833), # # browser=True, # ) line_is_simple: bool = LineString(ring).is_simple if line_is_simple: # coords[i] = anchors[j] # is_snapped_to[j] = True # is_snapped[i] = True # explore( # out_coords=to_gdf( # shapely.linestrings(coords, indices=indices), 25833 # ), # llll=to_gdf(LineString(ring), 25833), # # this=to_gdf(this), # # next_=to_gdf(next_), # # line=to_gdf(LineString(np.array([this, next_])), 25833), # anc=to_gdf(anchors[j]), # geom=to_gdf(coords[i], 25833), # these=to_gdf(coords[i : i + n_points_with_same_anchor ], 25833), # prev=to_gdf(coords[i - 1], 25833), # prev2=to_gdf(coords[i - 2], 25833), # nxt=to_gdf(coords[i + n_points_with_same_anchor + 1], 25833), # nxt2=to_gdf(coords[i + n_points_with_same_anchor + 2], 25833), # nxt3=to_gdf(coords[i + n_points_with_same_anchor + 3], 25833), # ) # print(coords[i : i + n_points_with_same_anchor + 1]) for ( x ) in indices_with_same_anchor: # range(n_points_with_same_anchor): # print(range_index, i, index, j, j2, j3, x) coords[x] = anchor # s[j] is_snapped[x] = True # coords[i + x] = anchors[j] # is_snapped[i + x] = True # print(coords[i : i + n_points_with_same_anchor + 1]) is_snapped_now = True break # else: if not is_snapped_now: coords[range_index] = anchors[np.argmin(distances)] # is_snapped_to[np.argmin(distances)] = True if 0 and index == 0: # i > 30 and i < 40: print(i) explore( out_coords=to_gdf( shapely.linestrings(coords, indices=indices), 25833 ), llll=to_gdf(LineString(ring), 25833), pppp=to_gdf(shapely.points(ring), 25833).assign( wkt=lambda x: [g.wkt for g in x.geometry] ), # this=to_gdf(this), # next_=to_gdf(next_), # line=to_gdf(LineString(np.array([this, next_])), 25833), anc=to_gdf(anchors[j]).assign( wkt=lambda x: [g.wkt for g in x.geometry] ), geom=to_gdf(these_coords[i], 25833).assign( wkt=lambda x: [g.wkt for g in x.geometry] ), # these=to_gdf( # these_coords[i : i + n_points_with_same_anchor], 25833 # ).assign(wkt=lambda x: [g.wkt for g in x.geometry]), prev=to_gdf(these_coords[i - 1], 25833).assign( wkt=lambda x: [g.wkt for g in x.geometry] ), prev2=to_gdf(these_coords[i - 2], 25833).assign( wkt=lambda x: [g.wkt for g in x.geometry] ), nxt=to_gdf(these_coords[i + 1], 25833).assign( wkt=lambda x: [g.wkt for g in x.geometry] ), nxt2=to_gdf(these_coords[i + 2], 25833).assign( wkt=lambda x: [g.wkt for g in x.geometry] ), nxt3=to_gdf(these_coords[i + 3], 25833).assign( wkt=lambda x: [g.wkt for g in x.geometry] ), # browser=True, # nxt_n=to_gdf( # coords[i + n_points_with_same_anchor + 1], 25833 # ).assign(wkt=lambda x: [g.wkt for g in x.geometry]), # nxt_n2=to_gdf( # coords[i + n_points_with_same_anchor + 2], 25833 # ).assign(wkt=lambda x: [g.wkt for g in x.geometry]), # nxt_n3=to_gdf( # coords[i + n_points_with_same_anchor + 3], 25833 # ).assign(wkt=lambda x: [g.wkt for g in x.geometry]), ) # if ( # indices[i] == 48 # ): # and int(out_coords[i][0]) == 375502 and int(out_coords[i][1]) == 7490104: # print(geom, out_coords[i], out_coords[-3:]) # xxx += 1 # if xxx > 100 and i >= 2106: # print(locals()) # explore( # geom=to_gdf(geom, 25833), # out=to_gdf(out_coords[i], 25833), # anc=to_gdf(shapely.points(anchors), 25833), # llll=to_gdf( # shapely.geometry.LineString( # np.array(out_coords)[indices[: len(out_coords)] == 48] # ), # 25833, # ), # ) return coords, indices @numba.njit def _snap_to_anchors_inner( geoms, indices: NDArray[np.int32], anchors, anchor_indices, mask, mask_indices, was_midpoint, was_midpoint_mask, tolerance: int | float, ) -> tuple[NDArray, NDArray, NDArray]: # def orientation(p, q, r): # # Calculate orientation of the triplet (p, q, r). # # 0 -> collinear, 1 -> clockwise, 2 -> counterclockwise # val = (q[1] - p[1]) * (r[0] - q[0]) - (q[0] - p[0]) * (r[1] - q[1]) # if val == 0: # return 0 # return 1 if val > 0 else 2 # def on_segment(p, q, r): # # Check if point q lies on line segment pr # if min(p[0], r[0]) <= q[0] <= max(p[0], r[0]) and min(p[1], r[1]) <= q[ # 1 # ] <= max(p[1], r[1]): # return True # return False # def check_intersection(line1, line2): # """ # Check if two line segments intersect. # Parameters: # line1 : np.array : 2x2 array with endpoints of the first line segment [[x1, y1], [x2, y2]] # line2 : np.array : 2x2 array with endpoints of the second line segment [[x3, y3], [x4, y4]] # Returns: # bool : True if the lines intersect, False otherwise. # """ # p1, q1 = line1 # p2, q2 = line2 # # Find the four orientations needed for the general and special cases # o1 = orientation(p1, q1, p2) # o2 = orientation(p1, q1, q2) # o3 = orientation(p2, q2, p1) # o4 = orientation(p2, q2, q1) # # General case # if o1 != o2 and o3 != o4: # return True # # Special cases # # p1, q1, p2 are collinear and p2 lies on segment p1q1 # if o1 == 0 and on_segment(p1, p2, q1): # return True # # p1, q1, q2 are collinear and q2 lies on segment p1q1 # if o2 == 0 and on_segment(p1, q2, q1): # return True # # p2, q2, p1 are collinear and p1 lies on segment p2q2 # if o3 == 0 and on_segment(p2, p1, q2): # return True # # p2, q2, q1 are collinear and q1 lies on segment p2q2 # if o4 == 0 and on_segment(p2, q1, q2): # return True # return False out_coords = geoms.copy() # is_snapped = np.full(len(geoms), False) n_anchors = len(anchors) mask_n_minus_1 = len(mask) - 1 is_snapped_to = np.full(len(anchors), False) out_distances = np.full((len(geoms), n_anchors), tolerance * 3) for i in range(len(geoms)): # if is_snapped[i]: # continue geom = geoms[i] index = indices[i] # if i == 0 or index != indices[i - 1]: # i_for_this_index = 0 # else: # i_for_this_index += 1 is_snapped = False for j in range(len(mask)): mask_index = mask_indices[j] is_last = j == mask_n_minus_1 or mask_index != mask_indices[j + 1] if is_last: continue mask_point0 = mask[j] # if ( # not mask_is_snapped_to[j] # and np.sqrt( # (geom[0] - mask_point0[0]) ** 2 + (geom[1] - mask_point0[1]) ** 2 # ) # <= tolerance # ): # out_coords[i] = mask_point0 # mask_is_snapped_to[j] = True # is_snapped = True # break mask_point1 = mask[j + 1] segment_vector = mask_point1 - mask_point0 point_vector = geom - mask_point0 segment_length_squared = np.dot(segment_vector, segment_vector) if segment_length_squared == 0: closest_point = mask_point0 else: factor = np.dot(point_vector, segment_vector) / segment_length_squared factor = max(0, min(1, factor)) closest_point = mask_point0 + factor * segment_vector if np.linalg.norm(geom - closest_point) == 0 and was_midpoint[i]: out_coords[i] = np.array([np.inf, np.inf]) is_snapped = True break if is_snapped: continue distances = np.full(n_anchors, tolerance * 3) for j2 in range(n_anchors): anchor = anchors[j2] # if anchor_indices[j] == index: # continue dist = np.sqrt((geom[0] - anchor[0]) ** 2 + (geom[1] - anchor[1]) ** 2) distances[j2] = dist out_distances[i, j2] = dist if dist == 0 and not was_midpoint_mask[j2]: break return out_coords, out_distances @numba.njit def _build_anchors( geoms: NDArray[np.float64], indices: NDArray[np.int32], mask_coords: NDArray[np.float64], mask_indices: NDArray[np.int32], was_midpoint_mask: NDArray[bool], tolerance: int | float, ): anchors = list(mask_coords) anchor_indices = list(mask_indices) is_anchor_arr = np.full(len(geoms), False) was_midpoint_mask = list(was_midpoint_mask) for i in np.arange(len(geoms)): geom = geoms[i] index = indices[i] # distances = [] # for j, anchor in zip(anchor_indices, anchors): is_anchor = True for j in range(len(anchors)): # if indices[i] != indices[j]: # if i != j and indices[i] != indices[j]: anchor = anchors[j] dist = np.sqrt((geom[0] - anchor[0]) ** 2 + (geom[1] - anchor[1]) ** 2) if dist <= tolerance: is_anchor = False break # distances.append(dist) # distances = np.array(distances) is_anchor_arr[i] = is_anchor if is_anchor: # not len(distances) or np.min(distances) > tolerance: anchors.append(geom) anchor_indices.append(index) was_midpoint_mask.append(True) return anchors, anchor_indices, is_anchor_arr, was_midpoint_mask @numba.njit def _add_last_points_to_end( coords: NDArray[np.float64], indices: NDArray[np.int32], ) -> tuple[ NDArray[np.float64], NDArray[np.int32], ]: out_coords, out_indices = [coords[0]], [indices[0]] last_coords = [] prev = coords[0] first_coords = prev n_minus_1 = len(coords) - 1 for i in np.arange(1, len(coords)): idx = indices[i] xy = coords[i] distance_to_prev: float = np.sqrt( (xy[0] - prev[0]) ** 2 + (xy[1] - prev[1]) ** 2 ) if idx != indices[i - 1]: first_coords = xy out_coords.append(xy) out_indices.append(idx) elif not distance_to_prev: if i == n_minus_1 or idx != indices[i + 1]: last_coords.append(xy) prev = xy continue elif i == n_minus_1 or idx != indices[i + 1]: out_coords.append(xy) out_coords.append(first_coords) out_indices.append(idx) out_indices.append(idx) last_coords.append(xy) else: out_coords.append(xy) out_indices.append(idx) prev = xy return (out_coords, out_indices) @numba.njit def _add_last_points_to_end_with_third_arr( coords: NDArray[np.float64], indices: NDArray[np.int32], third_arr: NDArray[Any], ) -> tuple[ NDArray[np.float64], NDArray[np.int32], NDArray[Any], ]: out_coords, out_indices, out_third_arr = [coords[0]], [indices[0]], [third_arr[0]] last_coords = [] prev = coords[0] first_coords = prev n_minus_1 = len(coords) - 1 for i in np.arange(1, len(coords)): idx = indices[i] xy = coords[i] distance_to_prev: float = np.sqrt( (xy[0] - prev[0]) ** 2 + (xy[1] - prev[1]) ** 2 ) if idx != indices[i - 1]: first_coords = xy out_coords.append(xy) out_indices.append(idx) out_third_arr.append(third_arr[i]) elif not distance_to_prev: if i == n_minus_1 or idx != indices[i + 1]: last_coords.append(xy) prev = xy continue elif i == n_minus_1 or idx != indices[i + 1]: out_coords.append(xy) out_coords.append(first_coords) out_indices.append(idx) out_indices.append(idx) last_coords.append(xy) out_third_arr.append(third_arr[i]) out_third_arr.append(third_arr[i]) else: out_coords.append(xy) out_indices.append(idx) out_third_arr.append(third_arr[i]) prev = xy return (out_coords, out_indices, out_third_arr) @numba.njit def _remove_duplicate_points( coords: NDArray[np.float64], indices: NDArray[np.int32], third_arr: NDArray[Any], ): out_coords, out_indices, out_third_arr = [coords[0]], [indices[0]], [third_arr[0]] prev = coords[0] for i in np.arange(1, len(coords)): idx = indices[i] xy = coords[i] distance_to_prev: float = np.sqrt( (xy[0] - prev[0]) ** 2 + (xy[1] - prev[1]) ** 2 ) if not distance_to_prev and idx == indices[i - 1]: prev = xy continue if idx != indices[i - 1]: out_coords.append(xy) out_indices.append(idx) out_third_arr.append(third_arr[i]) prev = xy continue out_coords.append(xy) out_indices.append(idx) out_third_arr.append(third_arr[i]) prev = xy return out_coords, out_indices, out_third_arr def _snap_linearrings( geoms: NDArray[LinearRing], tolerance: int | float, mask: GeoDataFrame | None, snap_to_anchors: bool = True, ): if not len(geoms): return geoms points = GeoDataFrame( { "geometry": extract_unique_points(geoms), "_geom_idx": np.arange(len(geoms)), } ).explode(ignore_index=True) coords = get_coordinates(points.geometry.values) indices = points["_geom_idx"].values if mask is not None: mask_coords, mask_indices = get_coordinates( mask.geometry.values, return_index=True ) is_anchor = np.full(len(mask_coords), False) mask_coords, mask_indices, is_anchor = _remove_duplicate_points( mask_coords, mask_indices, is_anchor ) mask_coords, mask_indices = _add_last_points_to_end(mask_coords, mask_indices) mask_coords = np.array(mask_coords) mask_indices = np.array(mask_indices) is_anchor = np.full(len(mask_coords), False) mask_coords, mask_indices, is_anchor = _remove_duplicate_points( mask_coords, mask_indices, is_anchor ) mask_coords = np.array(mask_coords) mask_indices = np.array(mask_indices) original_mask_buffered = shapely.buffer( shapely.linearrings(mask_coords, indices=mask_indices), tolerance * 1.1, ) mask_coords, mask_indices, was_midpoint_mask, _ = ( _add_midpoints_to_segments_numba( mask_coords, mask_indices, get_coordinates( sfilter( points.geometry.drop_duplicates(), original_mask_buffered, ) ), tolerance * 1.1, ) ) mask_coords = np.array(mask_coords) mask_indices = np.array(mask_indices) mask_indices = (mask_indices + 1) * -1 is_anchor = np.full(len(coords), False) coords, indices, is_anchor = _remove_duplicate_points(coords, indices, is_anchor) coords, indices = _add_last_points_to_end(coords, indices) coords = np.array(coords) indices = np.array(indices) is_anchor = np.full(len(coords), False) coords, indices, is_anchor = _remove_duplicate_points(coords, indices, is_anchor) coords = np.array(coords) indices = np.array(indices) # if 0: # coords, indices, was_midpoint, _ = _add_midpoints_to_segments_numba( # coords, # indices, # mask_coords, # tolerance * 1.1, # + PRECISION * 100, # ) # was_midpoint = np.array(was_midpoint) # coords, is_snapped_to = _snap_to_anchors( # coords, # indices, # mask_coords, # mask_indices, # mask_coords, # mask_indices, # was_midpoint, # was_midpoint_mask, # tolerance + PRECISION * 20, # ) # indices = np.array(indices) # coords = np.array(coords) # indices = indices[coords[:, 0] != np.inf] # coords = coords[coords[:, 0] != np.inf] if snap_to_anchors: if mask is None: mask_coords = [coords[0]] mask_indices = [indices[0]] was_midpoint_mask = [False] anchors, anchor_indices, is_anchor, was_midpoint_anchors = _build_anchors( coords, indices, mask_coords, mask_indices, was_midpoint_mask, tolerance + PRECISION, # * 100 ) anchors = np.array(anchors) anchor_indices = np.array(anchor_indices) # anchors = np.round(anchors, 3) else: anchors, anchor_indices, was_midpoint_anchors = ( mask_coords, mask_indices, was_midpoint_mask, ) coords, indices, was_midpoint, _ = _add_midpoints_to_segments_numba( coords, indices, anchors, tolerance * 1.1, ) was_midpoint = np.array(was_midpoint) coords_up_here000 = ( pd.Series(_coords_to_rings(np.array(coords), np.array(indices), geoms)) .loc[lambda x: x.notna()] .values ) coords_up_here000 = to_gdf(polygons(coords_up_here000), 25833) coords, indices, was_midpoint = _add_last_points_to_end_with_third_arr( coords, indices, was_midpoint ) coords, indices, was_midpoint = _remove_duplicate_points( coords, indices, was_midpoint ) coords = np.array(coords) indices = np.array(indices) was_midpoint = np.array(was_midpoint) coords_up_here = ( pd.Series(_coords_to_rings(coords, indices, geoms)) .loc[lambda x: x.notna()] .values ) coords_up_here = to_gdf(polygons(coords_up_here), 25833) explore( coords=to_gdf(shapely.points(coords), 25833).assign( idx=indices, wkt=lambda x: [g.wkt for g in x.geometry] ), anchors=to_gdf(shapely.points(anchors), 25833).assign( idx=anchor_indices, wkt=lambda x: [g.wkt for g in x.geometry] ), # , straight_distances=straight_distances, distances_to_lines=distances_to_lines), coords_up_here000=coords_up_here000, coords_up_here=coords_up_here, geoms=to_gdf(polygons(geoms), 25833), msk=to_gdf(shapely.points(mask_coords), 25833).assign( was_midpoint_mask=was_midpoint_mask ), # center=_DEBUG_CONFIG["center"], ) coords, indices = _snap_to_anchors( coords, indices, anchors, anchor_indices, mask_coords, mask_indices, was_midpoint, was_midpoint_anchors, tolerance + PRECISION * 100, ) indices = np.array(indices) coords = np.array(coords) indices = indices[coords[:, 0] != np.inf] coords = coords[coords[:, 0] != np.inf] # coords_up_here111 = ( # pd.Series(_coords_to_rings(coords, indices, geoms)) # .loc[lambda x: x.notna()] # .values # ) # coords_up_here111 = to_gdf(polygons(coords_up_here111), 25833) # if 0: # # coords = get_coordinates(points.geometry.values) # # indices = points["_geom_idx"].values # is_anchor = np.full(len(coords), False) # coords, indices, is_anchor = _remove_duplicate_points( # coords, indices, is_anchor # ) # coords, indices = _add_last_points_to_end(coords, indices) # coords = np.array(coords) # indices = np.array(indices) # is_anchor = np.full(len(coords), False) # coords, indices, is_anchor = _remove_duplicate_points( # coords, indices, is_anchor # ) # coords = np.array(coords) # indices = np.array(indices) # display(pd.DataFrame(coords, index=indices, columns=[*"xy"])) # if 0: # mask_coords, mask_indices, , dist_to_closest_geom = ( # _add_midpoints_to_segments_numba( # mask_coords, # mask_indices, # # coords, # get_coordinates( # sfilter( # GeoSeries(shapely.points(coords)).drop_duplicates(), # original_mask_buffered, # ) # ), # tolerance * 1.1, # ) # ) # mask_coords = np.array(mask_coords) # mask_indices = np.array(mask_indices) # anchors, anchor_indices, is_anchor = _build_anchors( # coords, # indices, # mask_coords, # mask_indices, # # is_anchor, # tolerance + PRECISION, # * 100 # ) # anchors = np.array(anchors) # anchor_indices = np.array(anchor_indices) # coords, indices, was_midpoint, _ = _add_midpoints_to_segments_numba( # coords, # indices, # anchors, # tolerance * 1.1, # + PRECISION * 100, # # GeoDataFrame({"geometry": shapely.points(coords), "_geom_idx": indices}), # # GeoDataFrame({"geometry": shapely.points(anchors)}), # # tolerance, # + PRECISION * 100, # # None, # ) # print(len(coords), len(anchors), len(was_midpoint)) # indices = np.array(indices) # coords = np.array(coords) # was_midpoint = np.array(was_midpoint) # coords, is_snapped_to = _snap_to_anchors( # coords, # indices, # anchors, # anchor_indices, # mask_coords, # mask_indices, # was_midpoint, # was_midpoint_anchors, # tolerance + PRECISION * 20, # ) # indices = np.array(indices) # coords = np.array(coords) # indices = indices[coords[:, 0] != np.inf] # coords = coords[coords[:, 0] != np.inf] # coords = np.array(coords) # indices = np.array(indices) coords_down_here = ( pd.Series(_coords_to_rings(coords, indices, geoms)) .loc[lambda x: x.notna()] .values ) lines_down_here = to_gdf(shapely.buffer(coords_down_here, 0.1), 25833) coords_down_here = to_gdf(polygons(coords_down_here), 25833) try: explore( coords=to_gdf(shapely.points(coords), 25833).assign( idx=indices, wkt=lambda x: [g.wkt for g in x.geometry] ), anchors=to_gdf(shapely.points(anchors), 25833).assign( idx=anchor_indices, wkt=lambda x: [g.wkt for g in x.geometry] ), # , straight_distances=straight_distances, distances_to_lines=distances_to_lines), coords_up_here000=coords_up_here000, coords_up_here=coords_up_here, coords_down_here=coords_down_here, lines_down_here=lines_down_here, geoms=to_gdf(polygons(geoms), 25833), msk=to_gdf(shapely.points(mask_coords), 25833).assign( was_midpoint_mask=was_midpoint_mask ), ) explore( coords=to_gdf(shapely.points(coords), 25833).assign( idx=indices, wkt=lambda x: [g.wkt for g in x.geometry] ), anchors=to_gdf(shapely.points(anchors), 25833).assign( idx=anchor_indices, wkt=lambda x: [g.wkt for g in x.geometry] ), # , straight_distances=straight_distances, distances_to_lines=distances_to_lines), coords_up_here000=coords_up_here000, coords_up_here=coords_up_here, coords_down_here=coords_down_here, lines_down_here=lines_down_here, geoms=to_gdf(polygons(geoms), 25833), msk=to_gdf(shapely.points(mask_coords), 25833).assign( was_midpoint_mask=was_midpoint_mask ), center=(5.37707159, 59.01065276, 1), ) explore( coords=to_gdf(shapely.points(coords), 25833).assign( idx=indices, wkt=lambda x: [g.wkt for g in x.geometry] ), anchors=to_gdf(shapely.points(anchors), 25833).assign( idx=anchor_indices, wkt=lambda x: [g.wkt for g in x.geometry] ), # , straight_distances=straight_distances, distances_to_lines=distances_to_lines), coords_up_here000=coords_up_here000, coords_up_here=coords_up_here, coords_down_here=coords_down_here, lines_down_here=lines_down_here, geoms=to_gdf(polygons(geoms), 25833), msk=to_gdf(shapely.points(mask_coords), 25833).assign( was_midpoint_mask=was_midpoint_mask ), center=(5.37419946, 59.01138812, 15), ) explore( coords=to_gdf(shapely.points(coords), 25833).assign( idx=indices, wkt=lambda x: [g.wkt for g in x.geometry] ), anchors=to_gdf(shapely.points(anchors), 25833).assign( idx=anchor_indices, wkt=lambda x: [g.wkt for g in x.geometry] ), # , straight_distances=straight_distances, distances_to_lines=distances_to_lines), coords_up_here000=coords_up_here000, coords_up_here=coords_up_here, lines_down_here=lines_down_here, coords_down_here=coords_down_here, geoms=to_gdf(polygons(geoms), 25833), msk=to_gdf(shapely.points(mask_coords), 25833).assign( was_midpoint_mask=was_midpoint_mask ), center=(5.38389153, 59.00548223, 1), ) explore( coords=to_gdf(shapely.points(coords), 25833).assign( idx=indices, wkt=lambda x: [g.wkt for g in x.geometry] ), anchors=to_gdf(shapely.points(anchors), 25833).assign( idx=anchor_indices, wkt=lambda x: [g.wkt for g in x.geometry] ), # , straight_distances=straight_distances, distances_to_lines=distances_to_lines), coords_up_here000=coords_up_here000, coords_up_here=coords_up_here, coords_down_here=coords_down_here, lines_down_here=lines_down_here, geoms=to_gdf(polygons(geoms), 25833), msk=to_gdf(shapely.points(mask_coords), 25833).assign( was_midpoint_mask=was_midpoint_mask ), center=_DEBUG_CONFIG["center"], ) except GEOSException as e: print(e) return _coords_to_rings(coords, indices, geoms) def _coords_to_rings( coords: NDArray[np.float64], indices: NDArray[np.int32], original_geoms: NDArray[LinearRing], ) -> NDArray[LinearRing]: df = pd.DataFrame({"x": coords[:, 0], "y": coords[:, 1]}, index=indices).loc[ lambda x: x.groupby(level=0).size() > 2 ] to_int_idx = {idx: i for i, idx in enumerate(df.index.unique())} rings = pd.Series( linearrings(df.values, indices=df.index.map(to_int_idx)), index=df.index.unique(), ) missing = pd.Series( index=pd.Index(range(len(original_geoms))).difference(rings.index) ) return pd.concat([rings, missing]).sort_index().values @numba.njit def _add_midpoints_to_segments_numba( geoms: NDArray[np.float64], indices: NDArray[np.int32], anchors: NDArray[np.float64], tolerance: int | float, ): n_minus_1 = len(geoms) - 1 out_coords = [] out_indices = [] was_midpoint = [] out_distances = [] for i in range(len(geoms)): index = indices[i] is_last = i == n_minus_1 or index != indices[i + 1] if is_last: continue geom0 = geoms[i] geom1 = geoms[i + 1] closest_points = np.full((len(anchors) + 2, 2), np.inf) these_out_distances = np.full(len(anchors) + 2, np.inf) closest_points[-1] = geom1 closest_points[-2] = geom0 these_out_distances[-1] = 0 these_out_distances[-2] = 0 segment_vector = geom1 - geom0 segment_length_squared = np.dot(segment_vector, segment_vector) for j in range(len(anchors)): anchor = anchors[j] if segment_length_squared == 0: closest_point = geom0 else: point_vector = anchor - geom0 factor = np.dot(point_vector, segment_vector) / segment_length_squared factor = max(0, min(1, factor)) if factor < 1e-6: closest_point = geom0 elif factor > 1 - 1e-6: closest_point = geom1 else: closest_point = geom0 + factor * segment_vector dist = np.linalg.norm(anchor - closest_point) if dist <= tolerance and dist > PRECISION: closest_points[j] = closest_point these_out_distances[j] = dist # if ( # closest_point[0] == 905049.3317999999 # ): # and int(closest_point[1]) == 7877676: # print() # for xxx in closest_point: # print(xxx) # for xxx in geom0: # print(xxx) # for xxx in geom1: # print(xxx) # for xxx, yyy in locals().items(): # print(xxx, yyy) # ssss not_inf = closest_points[:, 0] != np.inf arr = closest_points[not_inf] these_out_distances = these_out_distances[not_inf] # sort by first and second column # could have used np.lexsort, but it's not numba compatible arr = arr[np.argsort(arr[:, 0])] any_unsorted = True while any_unsorted: any_unsorted = False for i in range(len(arr) - 1): if arr[i, 0] < arr[i + 1, 0]: continue if arr[i, 1] > arr[i + 1, 1]: copied = arr[i].copy() arr[i] = arr[i + 1] arr[i + 1] = copied copied = these_out_distances[i] these_out_distances[i] = these_out_distances[i + 1] these_out_distances[i + 1] = copied any_unsorted = True with_midpoints = [] these_out_distances2 = [] first_is_added = False last_is_added = False is_reverse = False for y in range(len(arr)): point = arr[y] if ( not first_is_added and np.sqrt((geom0[0] - point[0]) ** 2 + (geom0[1] - point[1]) ** 2) == 0 ): first_is_added = True with_midpoints.append(point) these_out_distances2.append(these_out_distances[y]) if last_is_added: is_reverse = True break else: continue elif ( not last_is_added and np.sqrt((geom1[0] - point[0]) ** 2 + (geom1[1] - point[1]) ** 2) == 0 ): last_is_added = True with_midpoints.append(point) these_out_distances2.append(these_out_distances[y]) if not first_is_added: is_reverse = True continue else: with_midpoints.append(point) break if first_is_added or last_is_added: with_midpoints.append(point) these_out_distances2.append(these_out_distances[y]) # these_out_distances2.append(these_out_distances[y]) # these_anchors2.append(these_anchors[y]) # with_midpoints = np.array(with_midpoints) if is_reverse: with_midpoints = with_midpoints[::-1] these_out_distances2 = these_out_distances2[::-1] # these_anchors2 = these_anchors2[::-1] # print(index, is_reverse, arr) # print(with_midpoints) # print(to_gdf(LineString([geom0, geom1]), 25833)) # print(to_gdf(shapely.points(closest_points))) # explore( # to_gdf(shapely.points(with_midpoints)).assign( # idx=lambda x: range(len(x)) # ), # "idx", # ) # explore( # l=to_gdf(LineString([geom0, geom1]), 25833), # # anchors=to_gdf(shapely.points(anchors)), # # anchors_in_dist=to_gdf(shapely.points(these_anchors)), # # closest_points=to_gdf(shapely.points(closest_points)), # with_midpoints=to_gdf(shapely.points(with_midpoints)), # anchors=to_gdf(shapely.points(anchors)), # arr=to_gdf(shapely.points(arr)), # # center=(-0.07034028, 1.80337784, 0.4), # ) with_midpoints_no_dups = [] these_out_distances_no_dups = [] for y2 in range(len(with_midpoints)): point = with_midpoints[y2] should_be_added = True for z in range(len(with_midpoints_no_dups)): out_point = with_midpoints_no_dups[z] if ( np.sqrt( (point[0] - out_point[0]) ** 2 + (out_point[1] - point[1]) ** 2 ) == 0 ): should_be_added = False break if should_be_added: with_midpoints_no_dups.append(point) these_out_distances_no_dups.append(these_out_distances2[y2]) n_minus_1_midpoints = len(with_midpoints_no_dups) - 1 for y3 in range(len(with_midpoints_no_dups)): point = with_midpoints_no_dups[y3] should_be_added = True for z2 in np.arange(len(out_coords))[::-1]: if out_indices[z2] != index: continue out_point = out_coords[z2] if ( np.sqrt( (point[0] - out_point[0]) ** 2 + (out_point[1] - point[1]) ** 2 ) == 0 ): should_be_added = False break if not should_be_added: continue out_coords.append(point) out_indices.append(index) out_distances.append(these_out_distances_no_dups[y3]) if y3 == 0 or y3 == n_minus_1_midpoints: was_midpoint.append(False) else: was_midpoint.append(True) return ( out_coords, out_indices, was_midpoint, out_distances, ) def _separate_single_neighbored_from_multi_neighoured_geometries( gdf: GeoDataFrame, neighbors: GeoDataFrame ) -> tuple[GeoDataFrame, GeoDataFrame]: """Split GeoDataFrame in two: those with 0 or 1 neighbors and those with 2 or more. Because single-neighbored polygons does not need splitting. """ tree = STRtree(neighbors.geometry.values) left, right = tree.query(gdf.geometry.values, predicate="intersects") pairs = pd.Series(right, index=left) has_more_than_one_neighbor = ( pairs.groupby(level=0).size().loc[lambda x: x > 1].index ) more_than_one_neighbor = gdf.iloc[has_more_than_one_neighbor] one_or_zero_neighbors = gdf.iloc[ pd.Index(range(len(gdf))).difference(has_more_than_one_neighbor) ] return one_or_zero_neighbors, more_than_one_neighbor def split_and_eliminate_by_longest( gdf: GeoDataFrame | tuple[GeoDataFrame], to_eliminate: GeoDataFrame, tolerance: float | int, ignore_index: bool = False, **kwargs, ) -> tuple[GeoDataFrame]: if isinstance(gdf, (list, tuple)): # concat, then break up the dataframes in the end was_multiple_gdfs = True original_cols = [df.columns for df in gdf] gdf = pd.concat(df.assign(**{"_df_idx": i}) for i, df in enumerate(gdf)) else: was_multiple_gdfs = False if 0: to_eliminate.geometry = to_eliminate.buffer( -PRECISION, resolution=1, join_style=2, ).buffer( PRECISION, resolution=1, join_style=2, ) to_eliminate = to_eliminate.loc[lambda x: ~x.is_empty] # now to split polygons to be eliminated to avoid weird shapes # split only the polygons with multiple neighbors single_neighbored, multi_neighbored = ( _separate_single_neighbored_from_multi_neighoured_geometries(to_eliminate, gdf) ) multi_neighbored = split_by_neighbors(multi_neighbored, gdf, tolerance=tolerance) to_eliminate = pd.concat([multi_neighbored, single_neighbored]) gdf, isolated = eliminate_by_longest( gdf, to_eliminate, ignore_index=ignore_index, **kwargs ) if not was_multiple_gdfs: return gdf, isolated gdfs = () for i, cols in enumerate(original_cols): df = gdf.loc[gdf["_df_idx"] == i, cols] gdfs += (df,) gdfs += (isolated,) return gdfs