# Source code for sgis.raster.image_collection

import datetime
import functools
import glob
import itertools
import os
import random
import re
import time
from abc import abstractmethod
from collections.abc import Callable
from collections.abc import Iterable
from collections.abc import Iterator
from collections.abc import Sequence
from concurrent.futures import ThreadPoolExecutor
from copy import deepcopy
from dataclasses import dataclass
from pathlib import Path
from typing import Any
from typing import ClassVar

import joblib
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import pyproj
import rasterio
from affine import Affine
from geopandas import GeoDataFrame
from geopandas import GeoSeries
from pandas.api.types import is_dict_like
from rasterio.enums import MergeAlg
from scipy import stats
from scipy.ndimage import binary_dilation
from scipy.ndimage import binary_erosion
from shapely import Geometry
from shapely import box
from shapely import unary_union
from shapely.geometry import MultiPolygon
from shapely.geometry import Point
from shapely.geometry import Polygon

try:
    import dapla as dp
except ImportError:
    pass


# Optional dependencies: fall back to placeholder classes so that this module
# can be imported (and non-cloud/non-xarray code paths used) without them.
try:
    from google.auth import exceptions
except ImportError:

    class exceptions:
        """Placeholder."""

        class RefreshError(Exception):
            """Placeholder."""


try:
    from gcsfs.core import GCSFile
except ImportError:

    class GCSFile:
        """Placeholder."""


try:
    from rioxarray.exceptions import NoDataInBounds
    from rioxarray.merge import merge_arrays
    from rioxarray.rioxarray import _generate_spatial_coords
except ImportError:
    pass

try:
    from xarray import DataArray
    from xarray import Dataset
    from xarray import combine_by_coords
except ImportError:

    class DataArray:
        """Placeholder."""

    class Dataset:
        """Placeholder."""

    def combine_by_coords(*args, **kwargs) -> None:
        # Fails only when actually called, not at import time.
        raise ImportError("xarray")


from ..geopandas_tools.bounds import get_total_bounds
from ..geopandas_tools.conversion import to_bbox
from ..geopandas_tools.conversion import to_gdf
from ..geopandas_tools.conversion import to_geoseries
from ..geopandas_tools.conversion import to_shapely
from ..geopandas_tools.general import get_common_crs
from ..helpers import _fix_path
from ..helpers import get_all_files
from ..helpers import get_numpy_func
from ..helpers import is_method
from ..helpers import is_property
from ..io._is_dapla import is_dapla
from ..io.opener import opener
from . import sentinel_config as config
from .base import _array_to_geojson
from .base import _gdf_to_arr
from .base import _get_res_from_bounds
from .base import _get_shape_from_bounds
from .base import _get_transform_from_bounds
from .base import _res_as_tuple
from .base import get_index_mapper
from .indices import ndvi
from .regex import _extract_regex_match_from_string
from .regex import _get_first_group_match
from .regex import _get_non_optional_groups
from .regex import _get_regexes_matches_for_df
from .regex import _RegexError
from .zonal import _aggregate
from .zonal import _make_geometry_iterrows
from .zonal import _no_overlap_df
from .zonal import _prepare_zonal
from .zonal import _zonal_post

# Select file-system functions once at import time: Google Cloud Storage
# (via dapla) when running on the Dapla platform, local disk otherwise.
if is_dapla():

    def _ls_func(*args, **kwargs) -> list[str]:
        return dp.FileClient.get_gcs_file_system().ls(*args, **kwargs)

    def _glob_func(*args, **kwargs) -> list[str]:
        return dp.FileClient.get_gcs_file_system().glob(*args, **kwargs)

    def _open_func(*args, **kwargs) -> GCSFile:
        return dp.FileClient.get_gcs_file_system().open(*args, **kwargs)

    def _read_parquet_func(*args, **kwargs) -> list[str]:
        return dp.read_pandas(*args, **kwargs)

else:
    _ls_func = functools.partial(get_all_files, recursive=False)
    _open_func = open
    _glob_func = glob.glob
    _read_parquet_func = pd.read_parquet

# Accepted shapes for date-range arguments: a single (start, stop) pair or a
# tuple of such pairs. Either side may be None (open-ended).
DATE_RANGES_TYPE = (
    tuple[str | pd.Timestamp | None, str | pd.Timestamp | None]
    | tuple[tuple[str | pd.Timestamp | None, str | pd.Timestamp | None], ...]
)

DEFAULT_FILENAME_REGEX = r"""
.*?
(?:_?(?P<date>\d{8}(?:T\d{6})?))?  # Optional underscore and date group
.*?
(?:_?(?P<band>B\d{1,2}A|B\d{1,2}))?  # Optional underscore and band group
\.(?:tif|tiff|jp2)$  # End with .tif, .tiff, or .jp2
"""

DEFAULT_IMAGE_REGEX = r"""
.*?
(?:_?(?P<date>\d{8}(?:T\d{6})?))?  # Optional underscore and date group
"""

# Keyword arguments that _ImageBase.__init__ accepts besides its named ones.
ALLOWED_INIT_KWARGS = [
    "image_class",
    "band_class",
    "image_regexes",
    "filename_regexes",
    "all_bands",
    "crs",
    "masking",
    "_merged",
    "date",
]

# Debug counter incremented on every Band.load call.
_LOAD_COUNTER: int = 0


def _get_child_paths_threaded(data: Sequence[str]) -> set[str]:
    # List the children of many directories concurrently; I/O-bound, so
    # threads suffice. The executor's shutdown waits for all futures.
    with ThreadPoolExecutor() as executor:
        all_paths: Iterator[set[str]] = executor.map(_ls_func, data)
    return set(itertools.chain.from_iterable(all_paths))
[docs] @dataclass class PixelwiseResults: """Container of results from pixelwise operation to be converted.""" row_indices: np.ndarray col_indices: np.ndarray results: list[Any] res: int | tuple[int, int] bounds: tuple[float, float, float, float] shape: tuple[int, int] crs: Any nodata: int | float | None
[docs] def to_tuple(self) -> tuple[int, int, Any]: """Return 3-length tuple of row indices, column indices and pixelwise results.""" return self.row_indices, self.col_indices, self.results
[docs] def to_dict(self) -> dict[tuple[int, int], Any]: """Return dictionary with row and column indices as keys and pixelwise results as values.""" return { (int(row), int(col)): value for row, col, value in zip( self.row_indices, self.col_indices, self.results, strict=True ) }
[docs] def to_geopandas(self, column: str = "value") -> GeoDataFrame: """Return GeoDataFrame with pixel geometries and values from the pixelwise operation.""" minx, miny = self.bounds[:2] resx, resy = _res_as_tuple(self.res) minxs = np.full(self.row_indices.shape, minx) + (minx * self.row_indices * resx) minys = np.full(self.col_indices.shape, miny) + (miny * self.col_indices * resy) maxxs = minxs + resx maxys = minys + resy return GeoDataFrame( { column: self.results, "geometry": [ box(minx, miny, maxx, maxy) for minx, miny, maxx, maxy in zip( minxs, minys, maxxs, maxys, strict=True ) ], }, index=[self.row_indices, self.col_indices], crs=self.crs, )
[docs] def to_numpy(self) -> np.ndarray | tuple[np.ndarray, ...]: """Reshape pixelwise results to 2d numpy arrays in the shape of the full arrays of the image bands.""" try: n_out_arrays = len(next(iter(self.results))) except TypeError: n_out_arrays = 1 out_arrays = [ np.full(self.shape, self.nodata).astype(np.float64) for _ in range(n_out_arrays) ] for row, col, these_results in zip( self.row_indices, self.col_indices, self.results, strict=True ): if these_results is None: continue for i, arr in enumerate(out_arrays): try: arr[row, col] = these_results[i] except TypeError: arr[row, col] = these_results for i, array in enumerate(out_arrays): all_are_integers = np.all(np.mod(array, 1) == 0) if all_are_integers: out_arrays[i] = array.astype(int) if len(out_arrays) == 1: return out_arrays[0] return tuple(out_arrays)
class ImageCollectionGroupBy:
    """Iterator and merger class returned from groupby.

    Can be iterated through like pandas.DataFrameGroupBy.
    Or use the methods merge_by_band or merge.
    """

    def __init__(
        self,
        data: Iterable[tuple[tuple[Any, ...], "ImageCollection"]],
        by: list[str],
        collection: "ImageCollection",
    ) -> None:
        """Initialiser.

        Args:
            data: Iterable of group values and ImageCollection groups.
            by: list of group attributes.
            collection: Ungrouped ImageCollection. Used to pass attributes to outputs.
        """
        self.data = list(data)
        self.by = by
        self.collection = collection

    def merge_by_band(
        self,
        bounds: tuple | Geometry | GeoDataFrame | GeoSeries | None = None,
        method: str | Callable = "mean",
        as_int: bool = True,
        indexes: int | tuple[int] | None = None,
        **kwargs,
    ) -> "ImageCollection":
        """Merge each group into separate Bands per band_id, returned as an ImageCollection."""
        images = self._run_func_for_collection_groups(
            _merge_by_band,
            method=method,
            bounds=bounds,
            as_int=as_int,
            indexes=indexes,
            **kwargs,
        )
        # Stamp each merged image with the group values it was built from,
        # so e.g. 'date' or 'tile' survives the merge.
        for img, (group_values, _) in zip(images, self.data, strict=True):
            for attr, group_value in zip(self.by, group_values, strict=True):
                try:
                    setattr(img, attr, group_value)
                except AttributeError:
                    # read-only property: set the backing attribute instead
                    setattr(img, f"_{attr}", group_value)
        collection = ImageCollection(
            images,
            level=self.collection.level,
            **self.collection._common_init_kwargs,
        )
        collection._merged = True
        return collection

    def merge(
        self,
        bounds: tuple | Geometry | GeoDataFrame | GeoSeries | None = None,
        method: str | Callable = "mean",
        as_int: bool = True,
        indexes: int | tuple[int] | None = None,
        **kwargs,
    ) -> "Image":
        """Merge each group into a single Band, returned as combined Image."""
        bands: list[Band] = self._run_func_for_collection_groups(
            _merge,
            method=method,
            bounds=bounds,
            as_int=as_int,
            indexes=indexes,
            **kwargs,
        )
        # Stamp each merged band with its group values (see merge_by_band).
        for band, (group_values, _) in zip(bands, self.data, strict=True):
            for attr, group_value in zip(self.by, group_values, strict=True):
                try:
                    setattr(band, attr, group_value)
                except AttributeError:
                    # only fall back to the private attribute if it exists
                    if hasattr(band, f"_{attr}"):
                        setattr(band, f"_{attr}", group_value)
        if "band_id" in self.by:
            # grouping by band_id guarantees each band knows its id
            for band in bands:
                assert band.band_id is not None
        image = Image(
            bands,
            **self.collection._common_init_kwargs,
        )
        image._merged = True
        return image

    def _run_func_for_collection_groups(self, func: Callable, **kwargs) -> list[Any]:
        # Run `func` on every group, serially or thread-parallel depending on
        # the parent collection's `processes` setting.
        if self.collection.processes == 1:
            return [func(group, **kwargs) for _, group in self]
        processes = min(self.collection.processes, len(self))
        if processes == 0:
            return []
        with joblib.Parallel(n_jobs=processes, backend="threading") as parallel:
            return parallel(joblib.delayed(func)(group, **kwargs) for _, group in self)

    def __iter__(self) -> Iterator[tuple[tuple[Any, ...], "ImageCollection"]]:
        """Iterate over the group values and the ImageCollection groups themselves."""
        return iter(self.data)

    def __len__(self) -> int:
        """Number of ImageCollection groups."""
        return len(self.data)

    def __repr__(self) -> str:
        """String representation."""
        return f"{self.__class__.__name__}({len(self)}, by={self.by})"
[docs] @dataclass(frozen=True) class BandMasking: """Frozen dict with forced keys.""" band_id: str values: Sequence[int] | dict[int, Any] def __getitem__(self, item: str) -> Any: """Index into attributes to mimick dict.""" return getattr(self, item)
class None_:
    """Default None for args that are not allowed to be None.

    Calling the class yields plain ``None``; callers can therefore detect
    "argument not given" by checking ``callable(arg) and arg() is None``.
    """

    def __new__(cls) -> None:
        """Always returns None."""
        return None
class _ImageBase:
    # VERBOSE regex patterns used to parse metadata out of image folder
    # names and band file names. Subclasses override these.
    image_regexes: ClassVar[str | tuple[str] | None] = (DEFAULT_IMAGE_REGEX,)
    filename_regexes: ClassVar[str | tuple[str]] = (DEFAULT_FILENAME_REGEX,)
    metadata_attributes: ClassVar[dict | None] = None
    masking: ClassVar[BandMasking | None] = None

    def __init__(self, *, metadata=None, bbox=None, **kwargs) -> None:
        """Shared initialiser: normalise metadata/bbox, compile regexes, apply allowed kwargs."""
        self._bounds = None
        self._path = None
        self._bbox = to_bbox(bbox) if bbox is not None else None
        self.metadata_attributes = self.metadata_attributes or {}
        if metadata is not None:
            self.metadata = self._metadata_to_nested_dict(metadata)
        else:
            self.metadata = {}

        self.image_patterns = self._compile_regexes("image_regexes")
        self.filename_patterns = self._compile_regexes("filename_regexes")

        # Only whitelisted extra kwargs are accepted; anything else raises.
        for key, value in kwargs.items():
            error_obj = ValueError(
                f"{self.__class__.__name__} got an unexpected keyword argument '{key}'"
            )
            if key in ALLOWED_INIT_KWARGS and key in dir(self):
                self._safe_setattr(key, value, error_obj)
            else:
                raise error_obj

        # attributes for debugging
        self._metadata_from_xml = False
        self._merged = False
        self._from_array = False
        self._from_geopandas = False

    def _safe_setattr(
        self, key: str, value: Any, error_obj: Exception | None = None
    ) -> None:
        # Properties get their backing '_'-prefixed attribute set instead;
        # methods can never be overwritten.
        if is_property(self, key):
            setattr(self, f"_{key}", value)
        elif is_method(self, key):
            if error_obj is None:
                raise AttributeError(f"Cannot set method '{key}'.")
            raise error_obj
        else:
            setattr(self, key, value)

    def _compile_regexes(self, regex_attr: str) -> tuple[re.Pattern]:
        # Compile the class-level regex attribute (string or tuple of
        # strings) into a tuple of VERBOSE-mode patterns.
        regexes: tuple[str] | str = getattr(self, regex_attr)
        if not regexes:
            return ()
        if isinstance(regexes, str):
            regexes = (regexes,)
        return tuple(re.compile(regexes, flags=re.VERBOSE) for regexes in regexes)

    @staticmethod
    def _metadata_to_nested_dict(
        metadata: str | Path | os.PathLike | dict | pd.DataFrame | None,
    ) -> dict[str, dict[str, Any]]:
        """Construct metadata dict from dictlike, DataFrame or file path.

        Extract metadata value:
        >>> self.metadata[self.path]['cloud_cover_percentage']
        """
        if isinstance(metadata, (str | Path | os.PathLike)):
            # path to a parquet file: read it first
            metadata = _read_parquet_func(metadata)
        if isinstance(metadata, pd.DataFrame):

            def is_scalar(x) -> bool:
                """Check if scalar because 'truth value of Series is ambigous'."""
                return not hasattr(x, "__len__") or len(x) <= 1

            def na_to_none(x) -> Any:
                """Convert to None rowwise because pandas doesn't always."""
                return x if not (is_scalar(x) and pd.isna(x)) else None

            # to nested dict because pandas indexing gives rare KeyError with long strings
            return {
                _fix_path(path): {
                    attr: na_to_none(value) for attr, value in row.items()
                }
                for path, row in metadata.iterrows()
            }
        elif is_dict_like(metadata):
            return {_fix_path(path): value for path, value in metadata.items()}
        # try to allow custom types with dict-like indexing
        return metadata

    @property
    def _common_init_kwargs(self) -> dict:
        # Kwargs propagated to derived objects (merged images, copies, ...).
        return {
            "processes": self.processes,
            "res": self.res,
            "bbox": self._bbox,
            "nodata": self.nodata,
            "metadata": self.metadata,
        }

    @property
    def path(self) -> str:
        # Raises a descriptive error for in-memory (array-built) objects.
        try:
            return self._path
        except AttributeError as e:
            raise PathlessImageError(self) from e

    @property
    def res(self) -> int:
        """Pixel resolution."""
        # if self._res is None:
        #     if self.has_array:
        #         self._res = _get_res_from_bounds(self.bounds, self.values.shape)
        #     else:
        #         with opener(self.path) as file:
        #             with rasterio.open(file) as src:
        #                 self._res = src.res
        return self._res

    @abstractmethod
    def union_all(self) -> Polygon | MultiPolygon:
        pass

    def assign(self, **kwargs) -> "_ImageBase":
        # Fluent attribute setter (property-aware via _safe_setattr).
        for key, value in kwargs.items():
            self._safe_setattr(key, value)
        return self

    def _name_regex_searcher(
        self, group: str, patterns: tuple[re.Pattern]
    ) -> str | None:
        # Extract a named regex group (e.g. 'date', 'band') from self.name,
        # falling back to the parent directory name for Bands.
        if not patterns or not any(pat.groups for pat in patterns):
            return None
        for pat in patterns:
            try:
                return _get_first_group_match(pat, self.name)[group]
            except (TypeError, KeyError):
                pass
        if isinstance(self, Band):
            for pat in patterns:
                try:
                    return _get_first_group_match(
                        pat, str(Path(self.path).parent.name)
                    )[group]
                except (TypeError, KeyError):
                    pass
        # Missing an optional group is fine; missing a mandatory one is not.
        if not any(group in _get_non_optional_groups(pat) for pat in patterns):
            return None
        band_text = (
            f" or {Path(self.path).parent.name!s}" if isinstance(self, Band) else ""
        )
        raise ValueError(
            f"Couldn't find group '{group}' in name {self.name}{band_text} with regex patterns {patterns}"
        )

    def _create_metadata_df(self, file_paths: Sequence[str]) -> pd.DataFrame:
        """Create a dataframe with file paths and image paths that match regexes.

        Used in __init__ to select relevant paths fast.
        """
        df = pd.DataFrame({"file_path": list(file_paths)})
        df["file_name"] = df["file_path"].apply(lambda x: Path(x).name)
        df["image_path"] = df["file_path"].apply(
            lambda x: _fix_path(str(Path(x).parent))
        )
        if not len(df):
            return df
        # drop rows that are themselves image directories
        df = df[~df["file_path"].isin(df["image_path"])]
        if self.filename_patterns:
            df = _get_regexes_matches_for_df(df, "file_name", self.filename_patterns)
            if not len(df):
                return df
            grouped = df.drop_duplicates("image_path").set_index("image_path")
            for col in ["file_path", "file_name"]:
                if col in df:
                    grouped[col] = df.groupby("image_path")[col].apply(tuple)
            grouped = grouped.reset_index()
        else:
            df["file_path"] = df.groupby("image_path")["file_path"].apply(tuple)
            df["file_name"] = df.groupby("image_path")["file_name"].apply(tuple)
            grouped = df.drop_duplicates("image_path")
        grouped["imagename"] = grouped["image_path"].apply(
            lambda x: _fix_path(Path(x).name)
        )
        if self.image_patterns and len(grouped):
            grouped = _get_regexes_matches_for_df(
                grouped, "imagename", self.image_patterns
            )
        return grouped

    def copy(self) -> "_ImageBase":
        """Copy the instance and its attributes."""
        copied = deepcopy(self)
        # deep-copy attribute values too, preferring their own .copy()
        for key, value in copied.__dict__.items():
            try:
                setattr(copied, key, value.copy())
            except AttributeError:
                setattr(copied, key, deepcopy(value))
            except TypeError:
                continue
        return copied

    def equals(self, other) -> bool:
        # Compare all public attributes.
        # NOTE(review): the print below looks like a debug leftover.
        for key, value in self.__dict__.items():
            if key.startswith("_"):
                continue
            if value != getattr(other, key):
                print(key, value, getattr(other, key))
                return False
        return True


class _ImageBandBase(_ImageBase):
    """Common parent class of Image and Band."""

    def intersects(
        self, geometry: GeoDataFrame | GeoSeries | Geometry | tuple | _ImageBase
    ) -> bool:
        # CRS mismatch is an error, not a silent reprojection.
        if hasattr(geometry, "crs") and not pyproj.CRS(self.crs).equals(
            pyproj.CRS(geometry.crs)
        ):
            raise ValueError(f"crs mismatch: {self.crs} and {geometry.crs}")
        return self.union_all().intersects(to_shapely(geometry))

    def union_all(self) -> Polygon:
        # Empty polygon when bounds are unset (TypeError from box(*None)).
        try:
            return box(*self.bounds)
        except TypeError:
            return Polygon()

    @property
    def centroid(self) -> Point:
        """Centerpoint of the object."""
        return self.union_all().centroid

    @property
    def year(self) -> str:
        # explicit _year wins over the value parsed from the date string
        if hasattr(self, "_year") and self._year:
            return self._year
        return str(self.date)[:4]

    @property
    def month(self) -> str:
        if hasattr(self, "_month") and self._month:
            return self._month
        # normalise '2021-01-02' / '2021/01/02' to '20210102' first
        return str(self.date).replace("-", "").replace("/", "")[4:6]

    @property
    def name(self) -> str | None:
        if hasattr(self, "_name") and self._name is not None:
            return self._name
        try:
            return Path(self.path).name
        except (ValueError, AttributeError, TypeError):
            return None

    @name.setter
    def name(self, value) -> None:
        self._name = value

    @property
    def stem(self) -> str | None:
        # file name without suffix, or None for pathless objects
        try:
            return Path(self.path).stem
        except (AttributeError, ValueError, TypeError):
            return None

    @property
    def level(self) -> str:
        return self._name_regex_searcher("level", self.image_patterns)

    def _get_metadata_attributes(self, metadata_attributes: dict) -> dict:
        """Search through xml files for missing metadata attributes."""
        self._metadata_from_xml = True
        missing_metadata_attributes = {
            attr: constructor_func
            for attr, constructor_func in metadata_attributes.items()
            if not hasattr(self, attr) or getattr(self, attr) is None
        }
        nonmissing_metadata_attributes = {
            attr: getattr(self, attr)
            for attr in metadata_attributes
            if attr not in missing_metadata_attributes
        }
        if not missing_metadata_attributes:
            return nonmissing_metadata_attributes

        # read all xml content once
        file_contents: dict[str, str] = {}
        for path in self._all_file_paths:
            if ".xml" not in path:
                continue
            with _open_func(path, "rb") as file:
                file_contents[path] = file.read().decode("utf-8")

        def is_last_xml(i: int) -> bool:
            return i == len(file_contents) - 1

        for attr, value in missing_metadata_attributes.items():
            results = None
            for i, file_content in enumerate(file_contents.values()):
                if isinstance(value, str) and value in dir(self):
                    # method or a hardcoded value
                    value: Callable | Any = getattr(self, value)
                if callable(value):
                    try:
                        results = value(file_content)
                    except _RegexError as e:
                        # only fail once every xml file has been tried
                        if is_last_xml(i):
                            raise e.__class__(self.path, list(file_contents), e) from e
                        continue
                    if results is not None:
                        break
                elif (
                    isinstance(value, str)
                    or hasattr(value, "__iter__")
                    and all(isinstance(x, str | re.Pattern) for x in value)
                ):
                    # value is a regex (or iterable of regexes)
                    try:
                        results = _extract_regex_match_from_string(file_content, value)
                    except _RegexError as e:
                        if is_last_xml(i):
                            raise e
                elif value is not None:
                    # hardcoded value
                    results = value
                    break
            missing_metadata_attributes[attr] = results
        return missing_metadata_attributes | nonmissing_metadata_attributes

    def _to_xarray(self, array: np.ndarray, transform: Affine) -> DataArray:
        """Convert the raster to an xarray.DataArray."""
        attrs = {"crs": self.crs}
        for attr in set(self.metadata_attributes).union({"date"}):
            try:
                attrs[attr] = getattr(self, attr)
            except Exception:
                pass
        if len(array.shape) == 2:
            height, width = array.shape
            dims = ["y", "x"]
        elif len(array.shape) == 3:
            height, width = array.shape[1:]
            dims = ["band", "y", "x"]
        elif not any(dim for dim in array.shape):
            # NOTE(review): this DataArray is built but never returned, so
            # execution falls through to _generate_spatial_coords with
            # 'width'/'height' undefined — looks like a missing 'return'.
            DataArray(
                name=self.name or self.__class__.__name__,
                attrs=attrs,
            )
        else:
            raise ValueError(
                f"Array should be 2 or 3 dimensional. Got shape {array.shape}"
            )
        coords = _generate_spatial_coords(transform, width, height)
        return DataArray(
            array,
            coords=coords,
            dims=dims,
            name=self.name or self.__class__.__name__,
            attrs=attrs,
        )
class Band(_ImageBandBase):
    """Band holding a single 2 dimensional array representing an image band."""

    # Default matplotlib colormap for plotting; may be overridden per
    # instance through the 'cmap' __init__ argument.
    cmap: ClassVar[str | None] = None
[docs] @classmethod def from_geopandas( cls, gdf: GeoDataFrame | GeoSeries, *, res: int | None = None, out_shape: tuple[int, int] | None = None, bounds: Any | None = None, fill: int = 0, all_touched: bool = False, merge_alg: Callable = MergeAlg.replace, default_value: int = 1, dtype: Any | None = None, **kwargs, ) -> None: """Create Band from a GeoDataFrame.""" if bounds is not None: bounds = to_bbox(bounds) if out_shape == (0,): arr = np.array([]) else: arr = _gdf_to_arr( gdf, res=res, bounds=bounds, fill=fill, all_touched=all_touched, merge_alg=merge_alg, default_value=default_value, dtype=dtype, out_shape=out_shape, ) if bounds is None: bounds = gdf.total_bounds obj = cls(arr, crs=gdf.crs, bounds=bounds, **kwargs) obj._from_geopandas = True return obj
    def __init__(
        self,
        data: str | np.ndarray | None = None,
        res: int | None_ = None_,
        crs: Any | None = None,
        bounds: tuple[float, float, float, float] | None = None,
        nodata: int | None = None,
        mask: "Band | None" = None,
        processes: int = 1,
        name: str | None = None,
        band_id: str | None = None,
        cmap: str | None = None,
        all_file_paths: list[str] | None = None,
        **kwargs,
    ) -> None:
        """Band initialiser.

        'data' is either a file path (lazy; array loaded later with .load())
        or a numpy array (then 'bounds' is required and 'res'/'crs' are
        derived/forbidden accordingly).
        """
        if data is None:
            # allowing 'path' to replace 'data' as argument
            # to make the print repr. valid as initialiser
            if "path" not in kwargs:
                raise TypeError("Must specify either 'data' or 'path'.")
            data = kwargs.pop("path")

        super().__init__(**kwargs)

        if isinstance(data, (str | Path | os.PathLike)) and any(
            arg is not None for arg in [crs, bounds]
        ):
            raise ValueError("Can only specify 'bounds' and 'crs' if data is an array.")

        self._mask = mask
        self._values = None
        self.nodata = nodata
        self._crs = crs
        bounds = to_bbox(bounds) if bounds is not None else None
        self._bounds = bounds
        self._all_file_paths = all_file_paths

        if isinstance(data, np.ndarray):
            if self._bounds is None:
                raise ValueError("Must specify bounds when data is an array.")
            # 'res' defaults to the None_ sentinel (callable returning None),
            # so both None and the sentinel count as "not given".
            if not (res is None or (callable(res) and res() is None)):
                # if not (res is None or (callable(res) and res() is None)) and _res_as_tuple(
                #     res
                # ) != _get_res_from_bounds(self._bounds, data.shape):
                raise ValueError(
                    f"Cannot specify 'res' when data is an array. {res} and {_get_res_from_bounds(self._bounds, data.shape)}"
                )
            self._crs = crs
            self.transform = _get_transform_from_bounds(self._bounds, shape=data.shape)
            self._from_array = True
            self.values = data
            # resolution is derived from bounds and array shape
            self._res = _get_res_from_bounds(self._bounds, self.values.shape)
        elif not isinstance(data, (str | Path | os.PathLike)):
            raise TypeError(
                "'data' must be string, Path-like or numpy.ndarray. "
                f"Got {type(data)}"
            )
        else:
            self._path = _fix_path(str(data))
            self._res = res if not (callable(res) and res() is None) else None

        if cmap is not None:
            self.cmap = cmap
        self._name = name
        self._band_id = band_id
        self.processes = processes

        if self._all_file_paths:
            # keep only sibling files of this band's parent directory
            self._all_file_paths = {_fix_path(path) for path in self._all_file_paths}
            parent = _fix_path(Path(self.path).parent)
            self._all_file_paths = {
                path for path in self._all_file_paths if parent in path
            }

        if self.metadata:
            # apply pre-read metadata for this path as attributes
            if self.path is not None:
                self.metadata = {
                    key: value
                    for key, value in self.metadata.items()
                    if key == self.path
                }
            this_metadata = self.metadata[self.path]
            for key, value in this_metadata.items():
                if key in dir(self):
                    # property: set the backing attribute
                    setattr(self, f"_{key}", value)
                else:
                    setattr(self, key, value)
        elif self.metadata_attributes and self.path is not None:
            # no metadata given: harvest it from sibling xml files
            if self._all_file_paths is None:
                self._all_file_paths = _get_all_file_paths(str(Path(self.path).parent))
            for key, value in self._get_metadata_attributes(
                self.metadata_attributes
            ).items():
                setattr(self, key, value)

    def __lt__(self, other: "Band") -> bool:
        """Makes Bands sortable by band_id."""
        return self.band_id < other.band_id
[docs] def value_counts(self) -> pd.Series: """Value count of each value of the band's array.""" try: values = self.values.data[self.values.mask == False] except AttributeError: values = self.values unique_values, counts = np.unique(values, return_counts=True) return pd.Series(counts, index=unique_values).sort_values(ascending=False)
    @property
    def values(self) -> np.ndarray:
        """The numpy array, if loaded."""
        if self._values is None:
            raise _ArrayNotLoadedError("array is not loaded.")
        return self._values

    @values.setter
    def values(self, new_val):
        # non-ndarray inputs (e.g. xarray) are converted on assignment
        if isinstance(new_val, np.ndarray):
            self._values = new_val
        else:
            self._values = self._to_numpy(new_val)

    @property
    def band_id(self) -> str:
        """Band id."""
        if self._band_id is not None:
            return self._band_id
        return self._name_regex_searcher("band", self.filename_patterns)

    @property
    def height(self) -> int:
        """Pixel heigth of the image band."""
        try:
            return self.values.shape[-2]
        except IndexError:
            # empty / 1d array
            return 0

    @property
    def width(self) -> int:
        """Pixel width of the image band."""
        try:
            return self.values.shape[-1]
        except IndexError:
            return 0

    @property
    def tile(self) -> str:
        """Tile name from filename_regex."""
        if hasattr(self, "_tile") and self._tile:
            return self._tile
        return self._name_regex_searcher(
            "tile", self.filename_patterns + self.image_patterns
        )

    @property
    def date(self) -> str:
        """Date string from filename_regex."""
        if hasattr(self, "_date") and self._date:
            return self._date
        return self._name_regex_searcher(
            "date", self.filename_patterns + self.image_patterns
        )

    @property
    def crs(self) -> pyproj.CRS | None:
        """Coordinate reference system."""
        if self._crs is None:
            # lazily read crs (and bounds) from the file
            self._add_crs_and_bounds()
        return pyproj.CRS(self._crs)

    @property
    def bounds(self) -> tuple[int, int, int, int] | None:
        """Bounds as tuple (minx, miny, maxx, maxy)."""
        if self._bounds is None:
            self._add_crs_and_bounds()
        return self._bounds

    def _add_crs_and_bounds(self) -> None:
        # Read crs/bounds from the raster file header (cheap: no pixel data).
        with opener(self.path) as file:
            with rasterio.open(file) as src:
                self._bounds = to_bbox(src.bounds)
                self._crs = src.crs
[docs] def get_n_largest( self, n: int, precision: float = 0.000001, column: str = "value" ) -> GeoDataFrame: """Get the largest values of the array as polygons in a GeoDataFrame.""" copied = self.copy() value_must_be_at_least = np.sort(np.ravel(copied.values))[-n] - (precision or 0) copied._values = np.where(copied.values >= value_must_be_at_least, 1, 0) df = copied.to_geopandas(column).loc[lambda x: x[column] == 1] df[column] = f"largest_{n}" return df
[docs] def get_n_smallest( self, n: int, precision: float = 0.000001, column: str = "value" ) -> GeoDataFrame: """Get the lowest values of the array as polygons in a GeoDataFrame.""" copied = self.copy() value_must_be_at_least = np.sort(np.ravel(copied.values))[n] - (precision or 0) copied._values = np.where(copied.values <= value_must_be_at_least, 1, 0) df = copied.to_geopandas(column).loc[lambda x: x[column] == 1] df[column] = f"smallest_{n}" return df
[docs] def clip( self, mask: GeoDataFrame | GeoSeries | Polygon | MultiPolygon, ) -> "Band": """Clip band values to geometry mask while preserving bounds.""" if not self.height or not self.width: return self fill: int = self.nodata or 0 mask_array: np.ndarray = Band.from_geopandas( gdf=to_gdf(mask)[["geometry"]], default_value=1, fill=fill, out_shape=self.values.shape, bounds=mask, ).values is_not_polygon = mask_array == fill if isinstance(self.values, np.ma.core.MaskedArray): self._values.mask |= is_not_polygon else: self._values = np.ma.array( self.values, mask=is_not_polygon, fill_value=self.nodata ) return self
    def load(
        self,
        bounds: tuple | Geometry | GeoDataFrame | GeoSeries | None = None,
        indexes: int | tuple[int] | None = None,
        masked: bool = True,
        file_system=None,
        **kwargs,
    ) -> "Band":
        """Load and potentially clip the array.

        The array is stored in the 'values' property.
        """
        global _LOAD_COUNTER
        _LOAD_COUNTER += 1

        # class-level masking config can be overridden per call
        _masking = kwargs.pop("_masking", self.masking)

        bounds_was_none = bounds is None
        bounds = _get_bounds(bounds, self._bbox, self.union_all())
        should_return_empty: bool = bounds is not None and bounds.area == 0
        if should_return_empty:
            # requested area doesn't intersect: empty, boundless band
            self._values = np.array([])
            self._bounds = None
            self.transform = None
            # activate setter
            self.values = self._values
            return self

        if self.has_array and bounds_was_none:
            # already loaded and nothing new requested
            return self

        if bounds is not None:
            minx, miny, maxx, maxy = to_bbox(bounds)
            # bounds = (int(minx), int(miny), math.ceil(maxx), math.ceil(maxy))
            bounds = minx, miny, maxx, maxy

        if indexes is None:
            indexes = 1

        # as tuple to ensure we get 3d array
        _indexes: tuple[int] = (indexes,) if isinstance(indexes, int) else indexes

        # allow setting a fixed out_shape for the array, in order to make mask same shape as values
        out_shape = kwargs.pop("out_shape", None)

        if self.has_array and [int(x) for x in bounds] != [int(x) for x in self.bounds]:
            raise ValueError(
                "Cannot re-load array with different bounds. "
                "Use .copy() to read with different bounds. "
                "Or .clip(mask) to clip.",
                self,
                self.values.shape,
                [int(x) for x in bounds],
                [int(x) for x in self.bounds],
            )

        with opener(self.path, file_system=file_system) as f:
            with rasterio.open(f, nodata=self.nodata) as src:
                self._res = src.res if not self.res else self.res
                if self.nodata is None or np.isnan(self.nodata):
                    self.nodata = src.nodata
                else:
                    # if our nodata doesn't fit the file's dtype, widen the
                    # dtype rasterio will read into
                    dtype_min_value = _get_dtype_min(src.dtypes[0])
                    dtype_max_value = _get_dtype_max(src.dtypes[0])
                    if self.nodata > dtype_max_value or self.nodata < dtype_min_value:
                        src._dtypes = tuple(
                            rasterio.dtypes.get_minimum_dtype(self.nodata)
                            for _ in range(len(_indexes))
                        )

                if bounds is None:
                    # full read; resample if the requested res differs
                    if self._res != src.res:
                        if out_shape is None:
                            out_shape = _get_shape_from_bounds(
                                to_bbox(src.bounds), self.res, indexes
                            )
                        self.transform = _get_transform_from_bounds(
                            to_bbox(src.bounds), shape=out_shape
                        )
                    else:
                        self.transform = src.transform
                    values = src.read(
                        indexes=indexes,
                        out_shape=out_shape,
                        masked=masked,
                        **kwargs,
                    )
                else:
                    # windowed read of just the requested bounds
                    window = rasterio.windows.from_bounds(
                        *bounds, transform=src.transform
                    )
                    if out_shape is None:
                        out_shape = _get_shape_from_bounds(bounds, self.res, indexes)
                    values = src.read(
                        indexes=indexes,
                        window=window,
                        boundless=False,
                        out_shape=out_shape,
                        masked=masked,
                        **kwargs,
                    )
                    assert out_shape == values.shape, (
                        out_shape,
                        values.shape,
                    )
                    # NOTE(review): shape[-2:] is conventionally
                    # (height, width); the names here are swapped — confirm
                    # intent for non-square windows.
                    width, height = values.shape[-2:]
                    if width and height:
                        self.transform = rasterio.transform.from_bounds(
                            *bounds, width, height
                        )

                if self.nodata is not None and not np.isnan(self.nodata):
                    # normalise the file's nodata to our configured nodata
                    if isinstance(values, np.ma.core.MaskedArray):
                        values.data[values.data == src.nodata] = self.nodata
                    else:
                        values[values == src.nodata] = self.nodata

        if _masking and not isinstance(values, np.ma.core.MaskedArray):
            mask_arr = _read_mask_array(self, bounds=bounds)
            values = np.ma.array(values, mask=mask_arr, fill_value=self.nodata)
        elif _masking:
            mask_arr = _read_mask_array(self, bounds=bounds)
            values.mask |= mask_arr

        if bounds is not None:
            self._bounds = to_bbox(bounds)

        self._values = values
        # trigger the setter
        self.values = values
        return self
@property
def has_array(self) -> bool:
    """Whether the array is loaded.

    Returns:
        True if ``self.values`` is a numpy ndarray or xarray DataArray,
        False otherwise (including when no array has been loaded yet).
    """
    try:
        # Accessing self.values can raise when the array is not loaded.
        if not isinstance(self.values, (np.ndarray | DataArray)):
            raise ValueError()
        return True
    except ValueError:  # also catches _ArrayNotLoadedError
        return False
def write(
    self,
    path: str | Path,
    driver: str = "GTiff",
    compress: str = "LZW",
    file_system=None,
    **kwargs,
) -> None:
    """Write the array as an image file.

    Args:
        path: Destination file path.
        driver: Rasterio driver name. Defaults to "GTiff".
        compress: Compression scheme. Defaults to "LZW".
        file_system: Optional file system passed to ``opener``.
        **kwargs: Extra entries merged into the rasterio profile
            (they override the defaults built here).

    Raises:
        ValueError: If no array is loaded or the Band has no crs.
    """
    if not hasattr(self, "_values"):
        raise ValueError(
            "Can only write image band from Band constructed from array."
        )
    if self.crs is None:
        raise ValueError("Cannot write None crs to image.")
    if self.nodata:
        # Append nodata so the minimum dtype also covers the nodata value.
        # TODO take out .data if masked?
        values_with_nodata = np.concatenate(
            [self.values.flatten(), np.array([self.nodata])]
        )
    else:
        values_with_nodata = self.values
    profile = {
        "driver": driver,
        "compress": compress,
        "dtype": rasterio.dtypes.get_minimum_dtype(values_with_nodata),
        "crs": self.crs,
        "transform": self.transform,
        "nodata": self.nodata,
        # 2d array -> single band; 3d array -> one band per first axis.
        "count": 1 if len(self.values.shape) == 2 else self.values.shape[0],
        "height": self.height,
        "width": self.width,
    } | kwargs
    with opener(path, "wb", file_system=file_system) as f:
        with rasterio.open(f, "w", **profile) as dst:
            if dst.nodata is None:
                dst.nodata = _get_dtype_min(dst.dtypes[0])
            if (
                isinstance(self.values, np.ma.core.MaskedArray)
                and dst.nodata is not None
            ):
                # NOTE(review): this mutates self.values in place before
                # writing — confirm callers do not rely on NaN/masked data
                # surviving a write().
                self.values.data[np.isnan(self.values.data)] = dst.nodata
                self.values.data[self.values.mask] = dst.nodata
            if len(self.values.shape) == 2:
                dst.write(self.values, indexes=1)
            else:
                for i in range(self.values.shape[0]):
                    dst.write(self.values[i], indexes=i + 1)
            if isinstance(self.values, np.ma.core.MaskedArray):
                dst.write_mask(self.values.mask)
    self._path = _fix_path(str(path))
def apply(self, func: Callable, **kwargs) -> "Band":
    """Apply a function to the Band.

    Args:
        func: Callable taking the Band as first argument. May return
            either a new Band or raw array values.
        **kwargs: Keyword arguments forwarded to ``func``.

    Returns:
        The Band returned by ``func``, or this Band with its values
        replaced by the function's output.
    """
    outcome = func(self, **kwargs)
    if not isinstance(outcome, Band):
        # Raw values returned: store them on this Band instead.
        self.values = outcome
        outcome = self
    return outcome
def sample(self, size: int = 1000, mask: Any = None, **kwargs) -> "Band":
    """Take a random spatial sample area of the Band.

    Args:
        size: Diameter of the sampled area (crs units). Defaults to 1000.
        mask: Optional geometry restricting where the sample point is drawn.
        **kwargs: Forwarded to ``load``.

    Returns:
        A copy of the Band loaded only within the sampled bounds.
        (Return annotation corrected from "Image": this method copies and
        returns a Band.)
    """
    copied = self.copy()
    if mask is not None:
        point = GeoSeries([copied.union_all()]).clip(mask).sample_points(1)
    else:
        point = GeoSeries([copied.union_all()]).sample_points(1)
    # Buffer the sample point to a circle of the requested diameter,
    # clipped to the Band's own extent.
    buffered = point.buffer(size / 2).clip(copied.union_all())
    copied = copied.load(bounds=buffered.total_bounds, **kwargs)
    return copied
def buffer(self, distance: int, copy: bool = True) -> "Band":
    """Buffer array points with the value 1 in a binary array.

    Args:
        distance: Number of array cells to buffer by.
        copy: Whether to copy the Band.

    Returns:
        Band with buffered values.
    """
    target = self if not copy else self.copy()
    target.values = array_buffer(target.values, distance)
    return target
def gradient(self, degrees: bool = False, copy: bool = True) -> "Band":
    """Get the slope of an elevation band.

    Calculates the absolute slope between the grid cells
    based on the image resolution.

    For multi-band images, the calculation is done for each band.

    Args:
        degrees: If False (default), the returned values will be in ratios,
            where a value of 1 means 1 meter up per 1 meter forward. If True,
            the values will be in degrees from 0 to 90.
        copy: Whether to copy or overwrite the original Raster.
            Defaults to True.

    Returns:
        The class instance with new array values, or a copy if copy is True.

    Examples:
    ---------
    Making an array where the gradient to the center is always 10.

    >>> import sgis as sg
    >>> import numpy as np
    >>> arr = np.array(
    ...     [
    ...         [100, 100, 100, 100, 100],
    ...         [100, 110, 110, 110, 100],
    ...         [100, 110, 120, 110, 100],
    ...         [100, 110, 110, 110, 100],
    ...         [100, 100, 100, 100, 100],
    ...     ]
    ... )

    Now let's create a Raster from this array with a resolution of 10.

    >>> band = sg.Band(arr, crs=None, bounds=(0, 0, 50, 50), res=10)

    The gradient will be 1 (1 meter up for every meter forward).
    The calculation is by default done in place to save memory.

    >>> band.gradient(copy=False)
    >>> band.values
    array([[0., 1., 1., 1., 0.],
           [1., 1., 1., 1., 1.],
           [1., 1., 0., 1., 1.],
           [1., 1., 1., 1., 1.],
           [0., 1., 1., 1., 0.]])
    """
    copied = self.copy() if copy else self
    copied._values = _get_gradient(copied, degrees=degrees, copy=copy)
    return copied
def zonal(
    self,
    polygons: GeoDataFrame,
    aggfunc: str | Callable | list[Callable | str],
    array_func: Callable | None = None,
    dropna: bool = True,
) -> GeoDataFrame:
    """Calculate zonal statistics in polygons.

    Args:
        polygons: A GeoDataFrame of polygon geometries.
        aggfunc: Function(s) of which to aggregate the values
            within each polygon.
        array_func: Optional calculation of the raster
            array before calculating the zonal statistics.
        dropna: If True (default), polygons with all missing
            values will be removed.

    Returns:
        A GeoDataFrame with aggregated values per polygon.
    """
    idx_mapper, idx_name = get_index_mapper(polygons)
    polygons, aggfunc, func_names = _prepare_zonal(polygons, aggfunc)
    poly_iter = _make_geometry_iterrows(polygons)
    kwargs = {
        "band": self,
        "aggfunc": aggfunc,
        "array_func": array_func,
        "func_names": func_names,
    }
    # Serial path avoids joblib overhead when no parallelism is requested.
    if self.processes == 1:
        aggregated = [_zonal_one_pair(i, poly, **kwargs) for i, poly in poly_iter]
    else:
        with joblib.Parallel(n_jobs=self.processes, backend="loky") as parallel:
            aggregated = parallel(
                joblib.delayed(_zonal_one_pair)(i, poly, **kwargs)
                for i, poly in poly_iter
            )
    return _zonal_post(
        aggregated,
        polygons=polygons,
        idx_mapper=idx_mapper,
        idx_name=idx_name,
        dropna=dropna,
    )
def to_geopandas(self, column: str = "value", dropna: bool = True) -> GeoDataFrame:
    """Create a GeoDataFrame from the image Band.

    Args:
        column: Name of resulting column that holds the raster values.
        dropna: Whether to remove values that are NA or equal to
            the nodata value.

    Returns:
        A GeoDataFrame with a geometry column and array values.

    Raises:
        ValueError: If the array is not loaded.
    """
    if not hasattr(self, "_values"):
        raise ValueError("Array is not loaded.")
    if isinstance(self.values, np.ma.core.MaskedArray):
        # Fill masked cells so they can be filtered out below.
        self.values.data[self.values.mask] = self.nodata or 0
    if self.values.shape[0] == 0:
        df = GeoDataFrame({"geometry": []}, crs=self.crs)
    else:
        df = GeoDataFrame(
            pd.DataFrame(
                _array_to_geojson(
                    self.values, self.transform, processes=self.processes
                ),
                columns=[column, "geometry"],
            ),
            geometry="geometry",
            crs=self.crs,
        )
    if dropna:
        return df[(df[column] != self.nodata) & (df[column].notna())]
    return df
def to_xarray(self) -> DataArray:
    """Convert the raster to an xarray.DataArray."""
    return self._to_xarray(
        self.values,
        transform=self.transform,
        # name=self.name or self.__class__.__name__.lower(),
    )
def to_numpy(self) -> np.ndarray | np.ma.core.MaskedArray:
    """Convert the raster to a numpy.ndarray.

    Returns a copy so callers cannot mutate the Band's own array.
    """
    return self._to_numpy(self.values).copy()
def _to_numpy(
    self, arr: np.ndarray | DataArray, masked: bool = True
) -> np.ndarray | np.ma.core.MaskedArray:
    """Coerce ``arr`` (e.g. an xarray.DataArray) to a numpy array.

    Args:
        arr: Array-like input; non-ndarray inputs are converted via
            ``to_numpy()``/``.values``.
        masked: If True, wrap as a MaskedArray when a null-mask can be
            derived from the input.

    Returns:
        A numpy ndarray, or a MaskedArray when ``masked`` is True and a
        mask was derived.
    """
    # Fix: mask_arr was previously only bound inside the first branch,
    # so a plain ndarray input with masked=True raised NameError in the
    # final condition below. Initialize it unconditionally.
    mask_arr = None
    if not isinstance(arr, np.ndarray):
        if masked:
            try:
                # xarray-style null mask, if available.
                mask_arr = arr.isnull().values
            except AttributeError:
                pass
        try:
            arr = arr.to_numpy()
        except AttributeError:
            arr = arr.values
        if mask_arr is not None:
            arr = np.ma.array(arr, mask=mask_arr, fill_value=self.nodata)
    if not isinstance(arr, np.ndarray):
        arr = np.array(arr)
    if (
        masked
        and not isinstance(arr, np.ma.core.MaskedArray)
        and mask_arr is not None
    ):
        arr = np.ma.array(arr, mask=mask_arr, fill_value=self.nodata)
    return arr

def __repr__(self) -> str:
    """String representation."""
    try:
        band_id = f"'{self.band_id}'" if self.band_id else None
    except (ValueError, AttributeError):
        band_id = None
    try:
        path = f"'{self.path}'"
    except (ValueError, AttributeError):
        path = None
    return (
        f"{self.__class__.__name__}(band_id={band_id}, res={self.res}, path={path})"
    )
class NDVIBand(Band):
    """Band for NDVI values."""

    # Green colormap suits vegetation-index rendering.
    cmap: str = "Greens"
def median_as_int_and_minimum_dtype(arr: np.ndarray) -> np.ndarray:
    """Take the median over axis 0, cast to int and shrink to the smallest dtype.

    Args:
        arr: Stacked array of band values.

    Returns:
        Integer median array in the minimum rasterio dtype that fits it.
    """
    medians = np.median(arr, axis=0).astype(int)
    smallest = rasterio.dtypes.get_minimum_dtype(medians)
    return medians.astype(smallest)
class Image(_ImageBandBase):
    """Image consisting of one or more Bands."""

    # Band class used when constructing bands from file paths.
    band_class: ClassVar[Band] = Band

    def __init__(
        self,
        data: str | Path | Sequence[Band] | None = None,
        res: int | None_ = None_,
        mask: "Band | None" = None,
        processes: int = 1,
        df: pd.DataFrame | None = None,
        nodata: int | None = None,
        all_file_paths: list[str] | None = None,
        **kwargs,
    ) -> None:
        """Image initialiser.

        Args:
            data: Image directory path, or a sequence of Bands.
            res: Resolution (``None_`` is a callable sentinel meaning
                "not set").
            mask: Optional mask Band.
            processes: Number of parallel workers.
            df: Optional pre-built metadata DataFrame.
            nodata: Optional nodata value.
            all_file_paths: Optional pre-listed file paths to search in.
            **kwargs: Passed to the base class; may contain 'bands' as an
                alias for 'data'.
        """
        if data is None:
            # allowing 'bands' to replace 'data' as argument
            # to make the print repr. valid as initialiser
            if "bands" not in kwargs:
                raise TypeError("Must specify either 'data' or 'bands'.")
            data = kwargs.pop("bands")
        super().__init__(**kwargs)
        self.nodata = nodata
        self.processes = processes
        self._crs = None
        self._bands = None
        self._mask = mask
        if isinstance(data, Band):
            data = [data]
        if hasattr(data, "__iter__") and all(isinstance(x, Band) for x in data):
            # Bands passed directly: no file scanning needed.
            self._construct_image_from_bands(data, res)
            return
        elif not isinstance(data, (str | Path | os.PathLike)):
            raise TypeError(
                f"'data' must be string, Path-like or a sequence of Band. Got {data}"
            )
        # None_ is a callable sentinel; resolve to a real None.
        self._res = res if not (callable(res) and res() is None) else None
        self._path = _fix_path(data)
        if all_file_paths is None and self.path:
            self._all_file_paths = _get_all_file_paths(self.path)
        elif self.path:
            # Keep only paths belonging to this image.
            name = Path(self.path).name
            all_file_paths = {_fix_path(x) for x in all_file_paths if name in x}
            self._all_file_paths = {x for x in all_file_paths if self.path in x}
        else:
            self._all_file_paths = None
        if df is None:
            if not self._all_file_paths:
                self._all_file_paths = [self.path]
            df = self._create_metadata_df(self._all_file_paths)
        df["image_path"] = df["image_path"].astype(str)
        cols_to_explode = ["file_path", "file_name"]
        try:
            df = df.explode(cols_to_explode, ignore_index=True)
        except ValueError:
            # Column lengths differ: explode one at a time and de-duplicate.
            for col in cols_to_explode:
                df = df.explode(col)
            df = df.loc[lambda x: ~x["file_name"].duplicated()].reset_index(
                drop=True
            )
        df = df.loc[lambda x: x["image_path"] == self.path]
        self._df = df
        if self.path is not None and self.metadata:
            self.metadata = {
                key: value
                for key, value in self.metadata.items()
                if self.path in key
            }
        if self.metadata:
            try:
                metadata = self.metadata[self.path]
            except KeyError:
                metadata = {}
            for key, value in metadata.items():
                # Properties get an underscore-prefixed backing attribute.
                if key in dir(self):
                    setattr(self, f"_{key}", value)
                else:
                    setattr(self, key, value)
        elif self.metadata_attributes and self.path is not None:
            for key, value in self._get_metadata_attributes(
                self.metadata_attributes
            ).items():
                setattr(self, key, value)
def clip(
    self, mask: GeoDataFrame | GeoSeries | Polygon | MultiPolygon, copy: bool = True
) -> "Image":
    """Clip band values to geometry mask while preserving bounds.

    Args:
        mask: Geometry to clip to.
        copy: Whether to copy the Image first. Defaults to True.

    Returns:
        The Image with cells outside the mask masked out.
    """
    copied = self.copy() if copy else self
    fill: int = self.nodata or 0
    # Rasterize the mask onto this image's grid: 1 inside, fill outside.
    mask_array: np.ndarray = Band.from_geopandas(
        gdf=to_gdf(mask)[["geometry"]],
        default_value=1,
        fill=fill,
        out_shape=next(iter(self)).values.shape,
        bounds=self.bounds,
    ).values
    is_not_polygon = mask_array == fill
    for band in copied:
        if isinstance(band.values, np.ma.core.MaskedArray):
            band._values.mask |= is_not_polygon
        else:
            band._values = np.ma.array(
                band.values, mask=is_not_polygon, fill_value=band.nodata
            )
    return copied
def load(
    self,
    bounds: tuple | Geometry | GeoDataFrame | GeoSeries | None = None,
    indexes: int | tuple[int] | None = None,
    file_system=None,
    **kwargs,
) -> "Image":
    """Load all image Bands with threading.

    Args:
        bounds: Optional bounds to read within.
        indexes: Optional band indexes to read.
        file_system: Optional file system passed to the readers.
        **kwargs: Forwarded to the band loader.

    Returns:
        The Image itself (annotation corrected from "ImageCollection";
        this method returns ``self``).
    """
    # Nothing to do if everything is already in memory and no subset asked.
    if bounds is None and indexes is None and all(band.has_array for band in self):
        return self
    if self.masking:
        mask_array: np.ndarray = _read_mask_array(
            self,
            bounds=bounds,
            indexes=indexes,
            file_system=file_system,
            **kwargs,
        )
    # Threading backend: _load_band mutates the bands in place.
    with joblib.Parallel(n_jobs=self.processes, backend="threading") as parallel:
        parallel(
            joblib.delayed(_load_band)(
                band,
                bounds=bounds,
                indexes=indexes,
                file_system=file_system,
                _masking=None,
                **kwargs,
            )
            for band in self
        )
    if self.masking:
        for band in self:
            if isinstance(band.values, np.ma.core.MaskedArray):
                band.values.mask |= mask_array
            else:
                band.values = np.ma.array(
                    band.values, mask=mask_array, fill_value=self.nodata
                )
    return self
def _construct_image_from_bands(
    self, data: Sequence[Band], res: int | None
) -> None:
    """Build this Image from already-constructed Bands.

    Args:
        data: The Bands to wrap.
        res: Resolution; if None it is inferred from the bands and must
            be identical across them.

    Raises:
        ValueError: If the bands disagree on resolution or on any
            metadata attribute.
    """
    self._bands = list(data)
    if res is None:
        res = {band.res for band in self.bands}
        if len(res) == 1:
            self._res = next(iter(res))
        else:
            raise ValueError(f"Different resolutions for the bands: {res}")
    else:
        self._res = res
    # Propagate metadata attributes shared by the bands onto the Image.
    for key in self.metadata_attributes:
        band_values = {getattr(band, key) for band in self if hasattr(band, key)}
        band_values = {x for x in band_values if x is not None}
        if len(band_values) > 1:
            raise ValueError(f"Different {key} values in bands: {band_values}")
        elif len(band_values):
            try:
                setattr(self, key, next(iter(band_values)))
            except AttributeError:
                # Read-only property: set the backing attribute instead.
                setattr(self, f"_{key}", next(iter(band_values)))
def copy(self) -> "Image":
    """Copy the instance and its attributes.

    Returns:
        A deep copy with each band's mask pointing at the copy's mask.
    """
    duplicate = super().copy()
    for band in duplicate:
        band._mask = duplicate._mask
    return duplicate
def apply(self, func: Callable, **kwargs) -> "Image":
    """Apply a function to each band of the Image.

    Args:
        func: Callable taking a Band as first argument.
        **kwargs: Forwarded to ``func``.

    Returns:
        The Image itself.
    """
    # NOTE(review): "loky" runs _band_apply in separate processes and the
    # returned bands are discarded here — confirm in-place band mutation
    # is not expected when processes > 1.
    with joblib.Parallel(n_jobs=self.processes, backend="loky") as parallel:
        parallel(joblib.delayed(_band_apply)(band, func, **kwargs) for band in self)
    return self
def ndvi(
    self, red_band: str, nir_band: str, padding: int = 0, copy: bool = True
) -> NDVIBand:
    """Calculate the NDVI for the Image.

    Args:
        red_band: band_id of the red band.
        nir_band: band_id of the near-infrared band.
        padding: Padding passed to the ndvi calculation.
        copy: Whether to copy the Image before loading bands.

    Returns:
        An NDVIBand with the computed values.
    """
    copied = self.copy() if copy else self
    red = copied[red_band].load()
    nir = copied[nir_band].load()
    arr: np.ndarray | np.ma.core.MaskedArray = ndvi(
        red.values, nir.values, padding=padding
    )
    return NDVIBand(
        arr,
        bounds=red.bounds,
        crs=red.crs,
        # res is derived from bounds/shape, so it is excluded here.
        **{k: v for k, v in red._common_init_kwargs.items() if k != "res"},
    )
def get_brightness(
    self,
    bounds: tuple | Geometry | GeoDataFrame | GeoSeries | None = None,
    rbg_bands: list[str] | None = None,
) -> Band:
    """Get a Band with a brightness score of the Image's RBG bands.

    Args:
        bounds: Optional bounds to load within.
        rbg_bands: Optional [red, blue, green] band ids; falls back to
            the class variable ``rbg_bands``.

    Returns:
        A Band with integer brightness values (ITU-R 601 luma weights).

    Raises:
        AttributeError: If ``rbg_bands`` is not given and the class has
            no ``rbg_bands`` attribute.
    """
    if rbg_bands is None:
        try:
            r, b, g = self.rbg_bands
        except AttributeError as err:
            # Fix: the message previously named a nonexistent 'rbd_bands'.
            raise AttributeError(
                "Must specify rbg_bands when there is no class variable 'rbg_bands'"
            ) from err
    else:
        r, b, g = rbg_bands
    red = self[r].load(bounds=bounds)
    blue = self[b].load(bounds=bounds)
    green = self[g].load(bounds=bounds)
    brightness = (
        0.299 * red.values + 0.587 * green.values + 0.114 * blue.values
    ).astype(int)
    return Band(
        brightness,
        bounds=red.bounds,
        crs=self.crs,
        **{k: v for k, v in self._common_init_kwargs.items() if k != "res"},
    )
def to_xarray(self) -> DataArray:
    """Convert the raster to an xarray.DataArray.

    Stacks all band arrays along a new first axis; spatial coordinates
    are taken from the first band's transform.
    """
    return self._to_xarray(
        np.array([band.values for band in self]),
        transform=self[0].transform,
    )
@property
def band_ids(self) -> list[str]:
    """The Band ids."""
    return [band.band_id for band in self]

@property
def file_paths(self) -> list[str]:
    """The Band file paths."""
    return [band.path for band in self]

@property
def bands(self) -> list[Band]:
    """The Image Bands.

    Lazily constructed from the metadata DataFrame's file paths, then
    filtered by the mask band, the filename/image regex patterns, and
    finally sorted when band ids allow it.
    """
    if self._bands is not None:
        return self._bands
    if self.masking:
        # Exclude the mask band's own file from the band list.
        mask_band_id = self.masking["band_id"]
        paths = [path for path in self._df["file_path"] if mask_band_id not in path]
    else:
        paths = self._df["file_path"]
    self._bands = [
        self.band_class(
            path,
            all_file_paths=self._all_file_paths,
            **self._common_init_kwargs,
        )
        for path in paths
    ]
    if (
        self.filename_patterns
        and any(_get_non_optional_groups(pat) for pat in self.filename_patterns)
        or self.image_patterns
        and any(_get_non_optional_groups(pat) for pat in self.image_patterns)
    ):
        # Non-optional regex groups demand a parseable band_id.
        self._bands = [band for band in self._bands if band.band_id is not None]
    if self.filename_patterns:
        self._bands = [
            band
            for band in self._bands
            if any(re.search(pat, band.name) for pat in self.filename_patterns)
        ]
    if self.image_patterns:
        self._bands = [
            band
            for band in self._bands
            if any(
                re.search(pat, Path(band.path).parent.name)
                for pat in self.image_patterns
            )
        ]
    if self._should_be_sorted:
        self._bands = list(sorted(self._bands))
    return self._bands

@property
def _should_be_sorted(self) -> bool:
    """Whether bands can be sorted by band id (regex group or parsed id)."""
    sort_groups = ["band", "band_id"]
    return (
        self.filename_patterns
        and any(
            group in _get_non_optional_groups(pat)
            for group in sort_groups
            for pat in self.filename_patterns
        )
        or all(band.band_id is not None for band in self)
    )

@property
def tile(self) -> str:
    """Tile name from filename_regex."""
    if hasattr(self, "_tile") and self._tile:
        return self._tile
    return self._name_regex_searcher(
        "tile", self.image_patterns + self.filename_patterns
    )

@property
def date(self) -> str:
    """Date from filename_regex."""
    if hasattr(self, "_date") and self._date:
        return self._date
    return self._name_regex_searcher(
        "date", self.image_patterns + self.filename_patterns
    )

@property
def crs(self) -> str | None:
    """Coordinate reference system of the Image."""
    if self._crs is not None:
        return self._crs
    if not len(self):
        return None
    self._crs = get_common_crs(self)
    return self._crs

@property
def bounds(self) -> tuple[int, int, int, int] | None:
    """Bounds of the Image (minx, miny, maxx, maxy)."""
    try:
        return get_total_bounds([band.bounds for band in self])
    except exceptions.RefreshError:
        # GCS token refresh hiccup: retry slowly, band by band.
        bounds = []
        for band in self:
            time.sleep(0.1)
            bounds.append(band.bounds)
        return get_total_bounds(bounds)
def to_geopandas(self, column: str = "value") -> GeoDataFrame:
    """Convert the array to a GeoDataFrame of grid polygons and values.

    Args:
        column: Name of the value column.

    Returns:
        Concatenated GeoDataFrame of all bands' cells.
    """
    frames = [band.to_geopandas(column=column) for band in self]
    return pd.concat(frames, ignore_index=True)
def sample(
    self, n: int = 1, size: int = 1000, mask: Any = None, **kwargs
) -> "Image":
    """Take a random spatial sample of the image.

    Args:
        n: Number of sample points. Defaults to 1.
        size: Diameter of each sampled area (crs units). Defaults to 1000.
        mask: Optional geometry restricting where points are drawn.
        **kwargs: Forwarded to band ``load``.

    Returns:
        A copied Image loaded within the sampled bounds.
    """
    copied = self.copy()
    if mask is not None:
        points = GeoSeries([self.union_all()]).clip(mask).sample_points(n)
    else:
        points = GeoSeries([self.union_all()]).sample_points(n)
    buffered = points.buffer(size / 2).clip(self.union_all())
    boxes = to_gdf([box(*arr) for arr in buffered.bounds.values], crs=self.crs)
    copied._bands = [band.load(bounds=boxes, **kwargs) for band in copied]
    copied._bounds = get_total_bounds([band.bounds for band in copied])
    return copied
def __getitem__(
    self, band: str | int | Sequence[str] | Sequence[int]
) -> "Band | Image":
    """Get bands by band_id or integer index or a sequence of such.

    Returns a Band if a string or int is passed,
    returns an Image if a sequence of strings or integers is passed.
    """
    if isinstance(band, str):
        return self._get_band(band)
    if isinstance(band, int):
        return self.bands[band]
    copied = self.copy()
    try:
        copied._bands = [copied._get_band(x) for x in band]
    except TypeError:
        # Not strings: fall back to integer indexing.
        try:
            copied._bands = [copied.bands[i] for i in band]
        except TypeError as e:
            raise TypeError(
                f"{self.__class__.__name__} indices should be string, int "
                f"or sequence of string or int. Got {band}."
            ) from e
    return copied

def __contains__(self, item: str | Sequence[str]) -> bool:
    """Check if the Image contains a band_id (str) or all band_ids in a sequence."""
    if isinstance(item, str):
        return item in self.band_ids
    return all(x in self.band_ids for x in item)

def __lt__(self, other: "Image") -> bool:
    """Makes Images sortable by date."""
    try:
        return self.date < other.date
    except Exception as e:
        # Print the offending paths/dates before re-raising to ease debugging.
        print("", self.path, self.date, other.path, other.date, sep="\n")
        raise e

def __iter__(self) -> Iterator[Band]:
    """Iterate over the Bands."""
    return iter(self.bands)

def __len__(self) -> int:
    """Number of bands in the Image."""
    return len(self.bands)

def __repr__(self) -> str:
    """String representation."""
    return f"{self.__class__.__name__}(bands={self.bands})"

def _get_band(self, band: str) -> Band:
    """Look up a single Band by band_id, trying B0/B aliases, then path.

    Raises:
        TypeError: If ``band`` is not a string.
        ValueError: If multiple bands match the exact band_id.
        KeyError: If no (or ambiguous) match is found after all fallbacks.
    """
    if not isinstance(band, str):
        raise TypeError(f"band must be string. Got {type(band)}")
    bands = [x for x in self.bands if x.band_id == band]
    if len(bands) == 1:
        return bands[0]
    if len(bands) > 1:
        raise ValueError(f"Multiple matches for band_id {band} for {self}")
    # Try matching with "B0"/"B" prefix normalization in both directions.
    bands = [x for x in self.bands if x.band_id == band.replace("B0", "B")]
    if len(bands) == 1:
        return bands[0]
    bands = [x for x in self.bands if x.band_id.replace("B0", "B") == band]
    if len(bands) == 1:
        return bands[0]
    try:
        # Last resort: interpret the string as a file path.
        more_bands = [x for x in self.bands if x.path == band]
    except PathlessImageError:
        more_bands = bands
    if len(more_bands) == 1:
        return more_bands[0]
    if len(bands) > 1:
        prefix = "Multiple"
    elif not bands:
        prefix = "No"
    raise KeyError(
        f"{prefix} matches for band {band} among paths {[Path(band.path).name for band in self.bands]}"
    )
class ImageCollection(_ImageBase):
    """Collection of Images.

    Loops though Images.
    """

    # Classes used when constructing contained Images and Bands.
    image_class: ClassVar[Image] = Image
    band_class: ClassVar[Band] = Band
    _metadata_attribute_collection_type: ClassVar[type] = pd.Series

    def __init__(
        self,
        data: str | Path | Sequence[Image] | Sequence[str | Path],
        res: int | None_ = None_,
        level: str | None_ | None = None_,
        processes: int = 1,
        metadata: str | dict | pd.DataFrame | None = None,
        nodata: int | None = None,
        **kwargs,
    ) -> None:
        """Initialiser.

        Args:
            data: Root directory path, sequence of Images, or sequence of
                image paths (optionally relative to kwargs' 'root').
            res: Resolution (``None_`` is a callable sentinel).
            level: Optional level substring used to filter file paths.
            processes: Number of parallel workers.
            metadata: Optional metadata source passed to the base class.
            nodata: Optional nodata value.
            **kwargs: Passed to the base class; may contain 'root'.
        """
        if data is not None and kwargs.get("root"):
            # 'data' holds names relative to 'root': join them.
            root = _fix_path(kwargs.pop("root"))
            data = [f"{root}/{name}" for name in data]
            _from_root = True
        else:
            _from_root = False
        super().__init__(metadata=metadata, **kwargs)
        # None_ is a callable sentinel; resolve to a real None.
        if callable(level) and level() is None:
            level = None
        self.nodata = nodata
        self.level = level
        self.processes = processes
        self._res = res if not (callable(res) and res() is None) else None
        self._crs = None
        self._df = None
        self._all_file_paths = None
        self._images = None
        if hasattr(data, "__iter__") and not isinstance(data, str):
            self._path = None
            if all(isinstance(x, Image) for x in data):
                self.images = [x.copy() for x in data]
                return
            elif all(isinstance(x, (str | Path | os.PathLike)) for x in data):
                # adding band paths (assuming 'data' is a sequence of image paths)
                try:
                    self._all_file_paths = _get_child_paths_threaded(data) | {
                        _fix_path(x) for x in data
                    }
                except FileNotFoundError as e:
                    if _from_root:
                        raise TypeError(
                            "When passing 'root', 'data' must be a sequence of image file names that have 'root' as parent path."
                        ) from e
                    raise e
                if self.level:
                    self._all_file_paths = [
                        path for path in self._all_file_paths if self.level in path
                    ]
                self._df = self._create_metadata_df(self._all_file_paths)
                return
        if not isinstance(data, (str | Path | os.PathLike)):
            raise TypeError("'data' must be string, Path-like or a sequence of Image.")
        self._path = _fix_path(str(data))
        self._all_file_paths = _get_all_file_paths(self.path)
        if self.level:
            self._all_file_paths = [
                path for path in self._all_file_paths if self.level in path
            ]
        self._df = self._create_metadata_df(self._all_file_paths)
def groupby(
    self, by: str | list[str], copy: bool = True, **kwargs
) -> ImageCollectionGroupBy:
    """Group the Collection by Image or Band attribute(s).

    Args:
        by: Attribute name(s) looked up on each Band, falling back to
            the Image when the Band lacks the attribute.
        copy: Whether to copy the collection into each group.
        **kwargs: Passed to ``pandas.DataFrame.groupby``.

    Returns:
        An ImageCollectionGroupBy of (group_values, collection) pairs.
    """
    # One row per band, remembering which image it came from.
    df = pd.DataFrame(
        [(i, img) for i, img in enumerate(self) for _ in img],
        columns=["_image_idx", "_image_instance"],
    )
    if isinstance(by, str):
        by = [by]
    for attr in by:
        if attr == "bounds":
            # need integers to check equality when grouping
            df[attr] = [
                tuple(int(x) for x in band.bounds) for img in self for band in img
            ]
            continue
        try:
            df[attr] = [getattr(band, attr) for img in self for band in img]
        except AttributeError:
            # Band lacks the attribute: take it from the Image instead.
            df[attr] = [getattr(img, attr) for img in self for _ in img]
    with joblib.Parallel(n_jobs=self.processes, backend="loky") as parallel:
        return ImageCollectionGroupBy(
            sorted(
                parallel(
                    joblib.delayed(_copy_and_add_df_parallel)(
                        group_values, group_df, self, copy
                    )
                    for group_values, group_df in df.groupby(by, **kwargs)
                )
            ),
            by=by,
            collection=self,
        )
def explode(self, copy: bool = True) -> "ImageCollection":
    """Make all Images single-banded.

    Args:
        copy: Whether to copy the collection first. Defaults to True.

    Returns:
        The collection with one Image per Band.
    """
    copied = self.copy() if copy else self
    copied.images = [
        self.image_class(
            [band],
            masking=self.masking,
            band_class=self.band_class,
            **self._common_init_kwargs,
            df=self._df,
            all_file_paths=self._all_file_paths,
        )
        for img in self
        for band in img
    ]
    for img in copied:
        assert len(img) == 1
        try:
            # The single band's path becomes the image path.
            img._path = _fix_path(img[0].path)
        except PathlessImageError:
            pass
    return copied
def apply(self, func: Callable, **kwargs) -> "ImageCollection":
    """Apply a function to all bands in each image of the collection.

    Args:
        func: Callable taking a Band as first argument.
        **kwargs: Forwarded to ``func``.

    Returns:
        The collection itself.
    """
    # NOTE(review): "loky" runs _band_apply in separate processes and the
    # results are discarded — confirm in-place band mutation is not
    # expected when processes > 1.
    with joblib.Parallel(n_jobs=self.processes, backend="loky") as parallel:
        parallel(
            joblib.delayed(_band_apply)(band, func, **kwargs)
            for img in self
            for band in img
        )
    return self
def pixelwise(
    self,
    func: Callable,
    kwargs: dict | None = None,
    index_aligned_kwargs: dict | None = None,
    masked: bool = True,
    processes: int | None = None,
) -> "PixelwiseResults":
    """Run a function for each pixel.

    The function should take a 1d array as first argument. This will be
    the pixel values for all bands in all images in the collection.

    Args:
        func: Callable applied to each pixel's value vector.
        kwargs: Constant keyword arguments for ``func``.
        index_aligned_kwargs: Keyword arguments aligned with the band axis.
        masked: Whether to skip nodata/masked pixels.
        processes: Worker count; defaults to the collection's setting.

    Returns:
        A PixelwiseResults wrapping the per-pixel outputs.
        (Annotation corrected: this method always returns PixelwiseResults.)
    """
    values = np.array([band.values for img in self for band in img])
    if (
        masked
        and self.nodata is not None
        and hasattr(next(iter(next(iter(self)))).values, "mask")
    ):
        # Masked arrays with an explicit nodata: combine both masks.
        mask_array = np.array(
            [
                (band.values.mask) | (band.values.data == self.nodata)
                for img in self
                for band in img
            ]
        )
    elif masked and self.nodata is not None:
        mask_array = np.array(
            [band.values == self.nodata for img in self for band in img]
        )
    elif masked:
        mask_array = np.array([band.values.mask for img in self for band in img])
    else:
        mask_array = None
    nonmissing_row_indices, nonmissing_col_indices, results = pixelwise(
        func=func,
        values=values,
        mask_array=mask_array,
        index_aligned_kwargs=index_aligned_kwargs,
        kwargs=kwargs,
        processes=processes or self.processes,
    )
    return PixelwiseResults(
        nonmissing_row_indices,
        nonmissing_col_indices,
        results,
        shape=values.shape[1:],
        res=self.res,
        bounds=self.bounds,
        crs=self.crs,
        nodata=self.nodata or np.nan,
    )
def get_unique_band_ids(self) -> list[str]:
    """Get a list of unique band_ids across all images."""
    seen: set[str] = set()
    for image in self:
        for band in image:
            seen.add(band.band_id)
    return list(seen)
def filter(
    self,
    bands: str | list[str] | None = None,
    date_ranges: DATE_RANGES_TYPE = None,
    bbox: GeoDataFrame | GeoSeries | Geometry | tuple[float] | None = None,
    intersects: GeoDataFrame | GeoSeries | Geometry | tuple[float] | None = None,
    max_cloud_cover: int | None = None,
    copy: bool = True,
) -> "ImageCollection":
    """Filter images and bands in the collection.

    Args:
        bands: Band id(s) each kept image must contain.
        date_ranges: (start, stop) date range(s) images must fall within.
        bbox: Bounds to intersect with; also stored for later clipping.
        intersects: Geometry images must intersect (without storing a bbox).
        max_cloud_cover: Keep only images below this cloud-cover percentage.
        copy: Whether to copy the collection first. Defaults to True.

    Returns:
        The filtered collection.
    """
    copied = self.copy() if copy else self
    if date_ranges:
        copied = copied._filter_dates(date_ranges)
    if max_cloud_cover is not None:
        copied.images = [
            image
            for image in copied.images
            if image.cloud_cover_percentage < max_cloud_cover
        ]
    if bbox is not None:
        copied = copied._filter_bounds(bbox)
        # Remember the bbox so later loads clip to it.
        copied._set_bbox(bbox)
    if intersects is not None:
        copied = copied._filter_bounds(intersects)
    if bands is not None:
        if isinstance(bands, str):
            bands = [bands]
        bands = set(bands)
        # 'bands in img' uses Image.__contains__: all ids must be present.
        copied.images = [img[bands] for img in copied.images if bands in img]
    return copied
def merge(
    self,
    bounds: tuple | Geometry | GeoDataFrame | GeoSeries | None = None,
    method: str | Callable = "mean",
    as_int: bool = True,
    indexes: int | tuple[int] | None = None,
    **kwargs,
) -> Band:
    """Merge all areas and all bands to a single Band.

    Args:
        bounds: Optional bounds to merge within.
        method: Merge method name or callable. "mean" is implemented as a
            rasterio "sum" merge divided by the dataset count.
        as_int: Whether to use integer division for the mean.
        indexes: Raster band index(es) to read. Defaults to 1.
        **kwargs: Forwarded to the merge implementation.

    Returns:
        A single merged Band.
    """
    bounds = _get_bounds(bounds, self._bbox, self.union_all())
    if bounds is not None:
        bounds = to_bbox(bounds)
    crs = self.crs
    if indexes is None:
        indexes = 1
    if isinstance(indexes, int):
        _indexes = (indexes,)
    else:
        _indexes = indexes
    if method == "mean":
        # rasterio has no "mean": sum first, divide by count afterwards.
        _method = "sum"
    else:
        _method = method
    if self.masking or method not in list(rasterio.merge.MERGE_METHODS) + ["mean"]:
        # Custom/masked merges go through the numpy implementation.
        arr = self._merge_with_numpy_func(
            method=method,
            bounds=bounds,
            as_int=as_int,
            **kwargs,
        )
    else:
        datasets = [_open_raster(path) for path in self.file_paths]
        arr, _ = rasterio.merge.merge(
            datasets,
            res=self.res,
            bounds=(bounds if bounds is not None else self.bounds),
            indexes=_indexes,
            method=_method,
            nodata=self.nodata,
            **kwargs,
        )
        if isinstance(indexes, int) and len(arr.shape) == 3 and arr.shape[0] == 1:
            arr = arr[0]
        if method == "mean":
            if as_int:
                arr = arr // len(datasets)
            else:
                arr = arr / len(datasets)
    if bounds is None:
        bounds = self.bounds
    # return self.band_class(
    band = Band(
        arr,
        bounds=bounds,
        crs=crs,
        **{k: v for k, v in self._common_init_kwargs.items() if k != "res"},
    )
    band._merged = True
    return band
def merge_by_band(
    self,
    bounds: tuple | Geometry | GeoDataFrame | GeoSeries | None = None,
    method: str = "mean",
    as_int: bool = True,
    indexes: int | tuple[int] | None = None,
    **kwargs,
) -> Image:
    """Merge all areas to a single tile, one band per band_id.

    Args:
        bounds: Optional bounds to merge within.
        method: Merge method; "mean" is a "sum" merge divided by count.
        as_int: Whether to use integer division for the mean.
        indexes: Raster band index(es) to read. Defaults to 1.
        **kwargs: Forwarded to the merge implementation.

    Returns:
        An Image with one merged Band per unique band_id.
    """
    bounds = _get_bounds(bounds, self._bbox, self.union_all())
    if bounds is not None:
        bounds = to_bbox(bounds)
    bounds = self.bounds if bounds is None else bounds
    out_bounds = bounds
    crs = self.crs
    if indexes is None:
        indexes = 1
    if isinstance(indexes, int):
        _indexes = (indexes,)
    else:
        _indexes = indexes
    if method == "mean":
        _method = "sum"
    else:
        _method = method
    arrs = []
    bands: list[Band] = []
    # Merge each band_id group separately.
    for (band_id,), band_collection in self.groupby("band_id"):
        if self.masking or method not in list(rasterio.merge.MERGE_METHODS) + [
            "mean"
        ]:
            arr = band_collection._merge_with_numpy_func(
                method=method,
                bounds=bounds,
                as_int=as_int,
                **kwargs,
            )
        else:
            datasets = [_open_raster(path) for path in band_collection.file_paths]
            arr, _ = rasterio.merge.merge(
                datasets,
                res=self.res,
                bounds=(bounds if bounds is not None else self.bounds),
                indexes=_indexes,
                method=_method,
                nodata=self.nodata,
                **kwargs,
            )
            if isinstance(indexes, int):
                arr = arr[0]
            if method == "mean":
                if as_int:
                    arr = arr // len(datasets)
                else:
                    arr = arr / len(datasets)
        arrs.append(arr)
        bands.append(
            self.band_class(
                arr,
                bounds=out_bounds,
                crs=crs,
                band_id=band_id,
                **{k: v for k, v in self._common_init_kwargs.items() if k != "res"},
            )
        )
    # return self.image_class(  # TODO
    image = Image(
        bands,
        band_class=self.band_class,
        **self._common_init_kwargs,
    )
    image._merged = True
    return image
def _merge_with_numpy_func(
    self,
    method: str | Callable,
    bounds: tuple | Geometry | GeoDataFrame | GeoSeries | None = None,
    as_int: bool = True,
    indexes: int | tuple[int] | None = None,
    **kwargs,
) -> np.ndarray:
    """Merge bands per bounds-group with a numpy reduction function.

    Args:
        method: numpy function name (resolved via get_numpy_func) or a
            callable applied along axis 0.
        bounds: Optional bounds to intersect each group with.
        as_int: Cast the reduced array to int and shrink its dtype.
        indexes: Raster band index(es), forwarded to band loading.
        **kwargs: Forwarded to band loading.

    Returns:
        The merged array (rioxarray merge of the per-group DataArrays).
    """
    arrs = []
    kwargs["indexes"] = indexes
    bounds = to_shapely(bounds) if bounds is not None else None
    numpy_func = get_numpy_func(method) if not callable(method) else method
    for (_bounds,), collection in self.groupby("bounds"):
        _bounds = (
            to_shapely(_bounds).intersection(bounds)
            if bounds is not None
            else to_shapely(_bounds)
        )
        if not _bounds.area:
            continue
        _bounds = to_bbox(_bounds)
        # Fix: the conditional load was fully commented out, leaving the
        # expression as `().values` (an empty tuple), which raised
        # AttributeError at runtime. Load bands lazily when needed.
        arr = np.array(
            [
                (
                    band.load(
                        bounds=(_bounds if _bounds is not None else None),
                        **kwargs,
                    )
                    if not band.has_array
                    else band
                ).values
                for img in collection
                for band in img
            ]
        )
        arr = numpy_func(arr, axis=0)
        if as_int:
            arr = arr.astype(int)
            min_dtype = rasterio.dtypes.get_minimum_dtype(arr)
            arr = arr.astype(min_dtype)
        if len(arr.shape) == 2:
            height, width = arr.shape
        elif len(arr.shape) == 3:
            height, width = arr.shape[1:]
        else:
            raise ValueError(arr.shape)
        transform = rasterio.transform.from_bounds(*_bounds, width, height)
        coords = _generate_spatial_coords(transform, width, height)
        arrs.append(
            DataArray(
                arr,
                coords=coords,
                dims=["y", "x"],
                attrs={"crs": self.crs},
            )
        )
    merged = merge_arrays(
        arrs,
        res=self.res,
        nodata=self.nodata,
    )
    return merged.to_numpy()
def sort_images(self, ascending: bool = True) -> "ImageCollection":
    """Sort Images by date, then file path if date attribute is missing.

    Args:
        ascending: Sort direction. Defaults to True.

    Returns:
        The collection itself with its images reordered.
    """
    dated = sorted(img for img in self if img.date is not None)
    undated_with_path = sorted(
        (img for img in self if img.date is None and img.path is not None),
        key=lambda img: img.path,
    )
    pathless = [img for img in self if img.date is None and img.path is None]
    self._images = dated + undated_with_path + pathless
    if not ascending:
        self._images = list(reversed(self.images))
    return self
def load(
    self,
    bounds: tuple | Geometry | GeoDataFrame | GeoSeries | None = None,
    indexes: int | tuple[int] | None = None,
    file_system=None,
    **kwargs,
) -> "ImageCollection":
    """Load all image Bands with threading.

    Args:
        bounds: Optional bounds to read within.
        indexes: Optional band indexes to read.
        file_system: Optional file system passed to the readers.
        **kwargs: Forwarded to the band loader.

    Returns:
        The collection itself.
    """
    # Skip work if everything is already loaded and no subset requested.
    if (
        bounds is None
        and indexes is None
        and all(band.has_array for img in self for band in img)
    ):
        return self
    with joblib.Parallel(n_jobs=self.processes, backend="threading") as parallel:
        if self.masking:
            # One mask per image, read in parallel.
            masks: list[np.ndarray] = parallel(
                joblib.delayed(_read_mask_array)(
                    img,
                    bounds=bounds,
                    indexes=indexes,
                    file_system=file_system,
                    **kwargs,
                )
                for img in self
            )
        # Threading backend: _load_band mutates the bands in place.
        parallel(
            joblib.delayed(_load_band)(
                band,
                bounds=bounds,
                indexes=indexes,
                file_system=file_system,
                _masking=None,
                **kwargs,
            )
            for img in self
            for band in img
        )
    if self.masking:
        for img, mask_array in zip(self, masks, strict=True):
            for band in img:
                if isinstance(band.values, np.ma.core.MaskedArray):
                    band.values.mask |= mask_array
                else:
                    band.values = np.ma.array(
                        band.values, mask=mask_array, fill_value=self.nodata
                    )
    return self
def clip(
    self,
    mask: Geometry | GeoDataFrame | GeoSeries,
    dropna: bool = True,
    copy: bool = True,
) -> "ImageCollection":
    """Clip all image Bands while preserving bounds.

    Args:
        mask: Geometry to clip to.
        dropna: Drop images whose bands end up all-zero/masked.
        copy: Whether to copy the collection first. Defaults to True.

    Returns:
        The clipped collection.

    Raises:
        ValueError: If bands sharing the same bounds have different shapes.
    """
    copied = self.copy() if copy else self
    copied._images = [img for img in copied if img.union_all()]
    fill: int = self.nodata or 0
    common_band_from_geopandas_kwargs = dict(
        gdf=to_gdf(mask)[["geometry"]],
        default_value=1,
        fill=fill,
    )
    # Integer-rounded bounds let images with the same extent share one
    # rasterized mask.
    for img in copied:
        img._rounded_bounds = tuple(int(x) for x in img.bounds)
    for bounds in {img._rounded_bounds for img in copied}:
        shapes = {
            band.values.shape
            for img in copied
            for band in img
            if img._rounded_bounds == bounds
        }
        if len(shapes) != 1:
            raise ValueError(f"Different shapes: {shapes}. For bounds {bounds}")
        mask_array: np.ndarray = Band.from_geopandas(
            **common_band_from_geopandas_kwargs,
            out_shape=next(iter(shapes)),
            bounds=bounds,
        ).values
        is_not_polygon = mask_array == fill
        for img in copied:
            if img._rounded_bounds != bounds:
                continue
            for band in img:
                if isinstance(band.values, np.ma.core.MaskedArray):
                    band._values.mask |= is_not_polygon
                else:
                    band._values = np.ma.array(
                        band.values, mask=is_not_polygon, fill_value=band.nodata
                    )
    # Remove the temporary helper attribute.
    for img in copied:
        del img._rounded_bounds
    if dropna:
        copied.images = [
            img for img in copied if any(np.sum(band.values) for band in img)
        ]
    return copied
def _set_bbox(
    self, bbox: GeoDataFrame | GeoSeries | Geometry | tuple[float]
) -> "ImageCollection":
    """Set the mask to be used to clip the images to."""
    self._bbox = to_bbox(bbox)
    # only update images when already instantiated
    if self._images is not None:
        for img in self._images:
            img._bbox = self._bbox
            if img.bands is None:
                continue
            for band in img:
                band._bbox = self._bbox
                # Shrink each band's bounds to the intersection with the bbox;
                # None if the band does not intersect at all.
                bounds = box(*band._bbox).intersection(box(*band.bounds))
                band._bounds = to_bbox(bounds) if not bounds.is_empty else None
    return self

def _filter_dates(
    self,
    date_ranges: DATE_RANGES_TYPE = None,
) -> "ImageCollection":
    # NOTE(review): the default None fails the isinstance check below and
    # raises TypeError — presumably callers always pass a tuple; confirm.
    if not isinstance(date_ranges, (tuple, list)):
        raise TypeError(
            "date_ranges should be a 2-length tuple of strings or None, "
            "or a tuple of tuples for multiple date ranges"
        )
    if not self.image_patterns:
        raise ValueError(
            "Cannot set date_ranges when the class's image_regexes attribute is None"
        )
    self.images = [img for img in self if _date_is_within(img.date, date_ranges)]
    return self

def _filter_bounds(
    self, other: GeoDataFrame | GeoSeries | Geometry | tuple
) -> "ImageCollection":
    # Keep only images whose union geometry intersects 'other'.
    if self._images is None:
        return self

    other = to_shapely(other)

    if self.processes == 1:
        intersects_list: pd.Series = GeoSeries(
            [img.union_all() for img in self]
        ).intersects(other)
    else:
        # Parallel intersection test per image.
        with joblib.Parallel(n_jobs=self.processes, backend="loky") as parallel:
            intersects_list: list[bool] = parallel(
                joblib.delayed(_intesects)(image, other) for image in self
            )
    self.images = [
        image
        for image, intersects in zip(self, intersects_list, strict=False)
        if intersects
    ]
    return self
def to_xarray(
    self,
    **kwargs,
) -> Dataset:
    """Convert the raster to an xarray.Dataset.

    Images are converted to 2d arrays for each unique bounds.
    The spatial dimensions will be labeled "x" and "y". The third
    dimension defaults to "date" if all images have date attributes.
    Otherwise defaults to the image name.

    Args:
        **kwargs: Further keyword arguments passed to the DataArray constructor.

    Returns:
        An xarray.Dataset combining one DataArray per (bounds, band_id) group.

    Raises:
        ValueError: If any band array is not loaded, or if the images have
            no dates and non-unique names.
    """
    if any(not band.has_array for img in self for band in img):
        raise ValueError("Arrays must be loaded.")
    if all(img.date for img in self):
        by = ["date"]
    elif not pd.Index([img.name for img in self]).is_unique:
        raise ValueError("Images must have unique names.")
    else:
        by = ["name"]
    xarrs: dict[str, DataArray] = {}
    for (bounds, band_id), collection in self.groupby(["bounds", "band_id"]):
        name = f"{band_id}_{'-'.join(str(int(x)) for x in bounds)}"
        first_band = collection[0][0]
        coords = _generate_spatial_coords(
            first_band.transform, first_band.width, first_band.height
        )
        # One 2d array per image in the group, stacked along the first axis.
        values = np.array([band.to_numpy() for img in collection for band in img])
        assert len(values) == len(collection)
        for attr in by:
            coords[attr] = [getattr(img, attr) for img in collection]
        # Bug fix: 'dims' was previously undefined (all assignments were
        # commented out), raising NameError. The leading dimension is the
        # grouping attribute (date or name), then the spatial y/x dims.
        dims = [*by, "y", "x"]
        xarrs[name] = DataArray(
            values,
            coords=coords,
            dims=dims,
            name=band_id,
            attrs={
                "crs": collection.crs,
                "band_id": band_id,
            },
            **kwargs,
        )

    return combine_by_coords(list(xarrs.values()))
def to_geopandas(self, column: str = "value") -> dict[str, GeoDataFrame]:
    """Convert each band in each Image to a GeoDataFrame.

    Args:
        column: Name of the value column in the resulting GeoDataFrames.

    Returns:
        Dict mapping band name (or a generated fallback name) to a
        GeoDataFrame. Only the first band seen per name is kept.
    """
    frames: dict[str, GeoDataFrame] = {}
    counter = 0
    for image in self:
        for band in image:
            counter += 1
            key = getattr(band, "name", None)
            if key is None:
                # Fall back to a unique generated name.
                key = f"{self.__class__.__name__}({counter})"
            if key not in frames:
                frames[key] = band.to_geopandas(column=column)
    return frames
def sample(self, n: int = 1, size: int = 500) -> "ImageCollection":
    """Sample one or more areas of a given size and set this as mask for the images.

    Args:
        n: Number of images to sample.
        size: Diameter (in CRS units) of the sampled circular areas.

    Returns:
        A copy of the collection holding at most n sampled images.
    """
    unioned = self.union_all()
    # Buffer inwards so sample areas fall fully inside the collection's
    # extent; fall back to the full union if the negative buffer empties it.
    buffered_in = unioned.buffer(-size / 2)
    if not buffered_in.is_empty:
        bbox = to_gdf(buffered_in)
    else:
        bbox = to_gdf(unioned)
    copied = self.copy()
    sampled_images = []
    # NOTE(review): if no images ever intersect the sampled areas this loop
    # does not terminate — presumably the mask always hits images; confirm.
    while len(sampled_images) < n:
        mask = to_bbox(bbox.sample_points(1).buffer(size))
        images = copied.filter(bbox=mask).images
        random.shuffle(images)
        # NOTE(review): list slicing never raises IndexError, so this
        # try/except is dead code.
        try:
            images = images[:n]
        except IndexError:
            pass
        sampled_images += images
    copied._images = sampled_images[:n]
    if copied._should_be_sorted:
        copied._images = list(sorted(copied._images))
    return copied
def sample_tiles(self, n: int) -> "ImageCollection":
    """Sample one or more tiles in a copy of the ImageCollection.

    Args:
        n: Number of distinct tiles to keep.

    Returns:
        A copy of the collection holding only images from the sampled tiles.
    """
    copied = self.copy()
    unique_tiles = list({image.tile for image in self})
    random.shuffle(unique_tiles)
    chosen = set(unique_tiles[:n])
    copied.images = [image for image in self if image.tile in chosen]
    return copied
def sample_images(self, n: int) -> "ImageCollection":
    """Sample one or more images in a copy of the ImageCollection.

    Args:
        n: Number of images to sample (without replacement).

    Returns:
        A copy of the collection holding the n sampled images.

    Raises:
        ValueError: If n is larger than the number of images.
    """
    copied = self.copy()
    images = copied.images
    if n > len(images):
        raise ValueError(
            f"n ({n}) is higher than number of images in collection ({len(images)})"
        )
    # random.sample draws n items without replacement, replacing the
    # previous shuffle-and-pop loop (which also mutated the list in place).
    copied.images = random.sample(images, n)
    return copied
def __iter__(self) -> Iterator[Image]:
    """Iterate over the images."""
    return iter(self.images)

def __len__(self) -> int:
    """Number of images."""
    return len(self.images)

def __getattr__(self, attr: str) -> Any:
    """Make iterable of metadata attribute."""
    # Attributes listed in metadata_attributes are collected from all
    # images; anything else falls through to normal attribute lookup.
    if attr in (self.metadata_attributes or {}):
        return self._metadata_attribute_collection_type(
            [getattr(img, attr) for img in self]
        )
    return super().__getattribute__(attr)

def __getitem__(
    self, item: int | slice | Sequence[int | bool]
) -> "Image | ImageCollection":
    """Select one Image by integer index, or multiple Images by slice, list of int."""
    if isinstance(item, int):
        return self.images[item]

    if isinstance(item, slice):
        copied = self.copy()
        copied.images = copied.images[item]
        return copied

    if isinstance(item, ImageCollection):
        # Select the images of 'self' matching the images of 'item',
        # by comparing stem against path (exactly one match required).
        def _get_from_single_element_list(lst: list[Any]) -> Any:
            # Unwrap a single-element list; error if not exactly one element.
            if len(lst) != 1:
                raise ValueError(lst)
            return next(iter(lst))

        copied = self.copy()
        copied._images = [
            _get_from_single_element_list(
                [img2 for img2 in copied if img2.stem in img.path]
            )
            for img in item
        ]
        return copied

    copied = self.copy()

    # A callable is applied per image to produce a boolean/index mask.
    if callable(item):
        item = [item(img) for img in copied]

    # check for base bool and numpy bool
    if all("bool" in str(type(x)) for x in item):
        copied.images = [img for x, img in zip(item, copied, strict=True) if x]
    else:
        copied.images = [copied.images[i] for i in item]

    return copied

@property
def date(self) -> Any:
    """List of image dates."""
    return self._metadata_attribute_collection_type([img.date for img in self])

@property
def image_paths(self) -> Any:
    """List of image paths."""
    return self._metadata_attribute_collection_type([img.path for img in self])

@property
def images(self) -> list["Image"]:
    """List of images in the Collection."""
    if self._images is not None:
        return self._images
    # only fetch images when they are needed
    self._images = _get_images(
        list(self._df["image_path"]),
        all_file_paths=self._all_file_paths,
        df=self._df,
        image_class=self.image_class,
        band_class=self.band_class,
        masking=self.masking,
        **self._common_init_kwargs,
    )
    # Drop images with no bands.
    self._images = [img for img in self if len(img)]
    if self._should_be_sorted:
        self._images = list(sorted(self._images))
    return self._images

@property
def _should_be_sorted(self) -> bool:
    """True if the ImageCollection has regexes that should make it sortable by date."""
    # NOTE(review): 'and'/'or' chaining means this can return a truthy
    # non-bool (e.g. a pattern list); callers use it in boolean context.
    sort_group = "date"
    return (
        self.filename_patterns
        and any(
            sort_group in pat.groupindex
            and sort_group in _get_non_optional_groups(pat)
            for pat in self.filename_patterns
        )
        or self.image_patterns
        and any(
            sort_group in pat.groupindex
            and sort_group in _get_non_optional_groups(pat)
            for pat in self.image_patterns
        )
        or all(getattr(img, sort_group) is not None for img in self)
    )

@images.setter
def images(self, new_value: list["Image"]) -> list["Image"]:
    new_value = list(new_value)
    if not new_value:
        self._images = new_value
        return
    # A list of Bands (one per image) replaces each image's bands.
    if all(isinstance(x, Band) for x in new_value):
        if len(new_value) != len(self):
            raise ValueError("'images' must have same length as number of images.")
        new_images = []
        for i, img in enumerate(self):
            img._bands = [new_value[i]]
            new_images.append(img)
        self._images = new_images
        return
    if not all(isinstance(x, Image) for x in new_value):
        raise TypeError("images should be a sequence of Image.")
    self._images = new_value
def union_all(self) -> Polygon | MultiPolygon:
    """(Multi)Polygon representing the union of all image bounds."""
    geometries = [image.union_all() for image in self]
    return unary_union(geometries)
@property
def bounds(self) -> tuple[int, int, int, int]:
    """Total bounds for all Images combined."""
    return get_total_bounds([img.bounds for img in self])

@property
def crs(self) -> Any:
    """Common coordinate reference system of the Images."""
    # Computed lazily and cached on first access.
    if self._crs is not None:
        return self._crs
    self._crs = get_common_crs([img.crs for img in self])
    return self._crs
def plot_pixels(
    self,
    by: str | list[str] | None = None,
    x_var: str = "date",
    y_label: str = "value",
    p: float = 0.95,
    ylim: tuple[float, float] | None = None,
    figsize: tuple[int] = (20, 8),
    rounding: int = 3,
) -> None:
    """Plot each individual pixel in a dotplot for all dates.

    Args:
        by: Band attributes to groupby. Defaults to "bounds" and "band_id"
            if all bands have no-None band_ids, otherwise defaults to
            "bounds".
        x_var: Attribute to use on the x-axis. Defaults to "date" if the
            ImageCollection is sortable by date, otherwise a range index.
            Can be set to "days_since_start".
        y_label: Label to use on the y-axis.
        p: p-value for the confidence interval.
        ylim: Limits of the y-axis. NOTE(review): currently unused.
        figsize: Figure size as tuple (width, height).
        rounding: rounding of title n
    """
    if by is None and all(band.band_id is not None for img in self for band in img):
        by = ["bounds", "band_id"]
    elif by is None:
        by = ["bounds"]

    alpha = 1 - p

    for group_values, subcollection in self.groupby(by):
        print("subcollection group values:", group_values)
        # Bug fix: first_date was previously only assigned inside the date
        # branch but passed to the plot function unconditionally, raising
        # NameError for non-date x axes.
        first_date = None
        if "date" in x_var and subcollection._should_be_sorted:
            subcollection._images = list(sorted(subcollection._images))
            dates = [band.date[:8] for img in subcollection for band in img]
            x = np.array(
                [datetime.datetime.strptime(d, "%Y%m%d").date() for d in dates]
            )
            first_date = pd.Timestamp(x[0])
            # Days since the earliest date, for the regression x-axis.
            x = (pd.to_datetime(dates) - pd.Timestamp(np.min(x))).days
        else:
            x = np.arange(0, sum(1 for img in subcollection for band in img))

        subcollection.pixelwise(
            _plot_pixels_1d,
            kwargs=dict(
                alpha=alpha,
                x_var=x_var,
                y_label=y_label,
                rounding=rounding,
                first_date=first_date,
                figsize=figsize,
            ),
            index_aligned_kwargs=dict(x=x),
        )
def __repr__(self) -> str:
    """String representation."""
    root = ""
    if self.path is not None:
        data = f"'{self.path}'"
    else:
        paths = [img.path for img in self]
        if all(path is not None for path in paths):
            data = paths
            parents = {str(Path(p).parent) for p in paths}
            if len(parents) == 1:
                # All images share a parent directory: show only file
                # names plus a single root.
                data = [Path(p).name for p in paths]
                root = f" root='{next(iter(parents))}',"
        else:
            data = list(self)
    return (
        f"{self.__class__.__name__}({data},{root} "
        f"res={self.res}, level='{self.level}')"
    )
class Sentinel2Config:
    """Holder of Sentinel 2 regexes, band_ids etc."""

    # Regexes matching image directory names and band file names.
    image_regexes: ClassVar[tuple[str, ...]] = (config.SENTINEL2_IMAGE_REGEX,)
    filename_regexes: ClassVar[tuple[str, ...]] = (config.SENTINEL2_FILENAME_REGEX,)
    # Metadata attribute name -> extractor: either a callable applied to the
    # xml text, or the name of a method on the instance.
    metadata_attributes: ClassVar[
        dict[str, Callable | functools.partial | str | tuple[str]]
    ] = {
        "processing_baseline": functools.partial(
            _extract_regex_match_from_string,
            regexes=(r"<PROCESSING_BASELINE>(.*?)</PROCESSING_BASELINE>",),
        ),
        "cloud_cover_percentage": "_get_cloud_cover_percentage",
        "is_refined": "_get_image_refining_flag",
        "boa_quantification_value": "_get_boa_quantification_value",
    }
    # Band id -> native resolution in meters (L1C product).
    l1c_bands: ClassVar[dict[str, int]] = {
        "B01": 60,
        "B02": 10,
        "B03": 10,
        "B04": 10,
        "B05": 20,
        "B06": 20,
        "B07": 20,
        "B08": 10,
        "B8A": 20,
        "B09": 60,
        "B10": 60,
        "B11": 20,
        "B12": 20,
    }
    # L2A has no B10 (cirrus) band.
    l2a_bands: ClassVar[dict[str, int]] = {
        key: res for key, res in l1c_bands.items() if key != "B10"
    }
    all_bands: ClassVar[dict[str, int]] = l1c_bands
    rbg_bands: ClassVar[tuple[str, ...]] = ("B04", "B02", "B03")
    ndvi_bands: ClassVar[tuple[str, ...]] = ("B04", "B08")
    # SCL (scene classification) values that should be masked out.
    masking: ClassVar[BandMasking] = BandMasking(
        band_id="SCL",
        values={
            2: "Topographic casted shadows",
            3: "Cloud shadows",
            8: "Cloud medium probability",
            9: "Cloud high probability",
            10: "Thin cirrus",
            11: "Snow or ice",
        },
    )

    def _get_image_refining_flag(self, xml_file: str) -> bool | None:
        """Return the Image_Refining flag from the xml text, or None if absent."""
        match_ = re.search(
            r'Image_Refining flag="(?:REFINED|NOT_REFINED)"',
            xml_file,
        )
        if match_ is None:
            return None
        # Check NOT_REFINED first since "REFINED" is a substring of it.
        if "NOT_REFINED" in match_.group(0):
            return False
        elif "REFINED" in match_.group(0):
            return True
        else:
            raise _RegexError(xml_file)

    def _get_boa_quantification_value(self, xml_file: str) -> int:
        """Return the BOA quantification value parsed from the xml text."""
        return int(
            _extract_regex_match_from_string(
                xml_file,
                (
                    r'<BOA_QUANTIFICATION_VALUE unit="none">-?(\d+)</BOA_QUANTIFICATION_VALUE>',
                ),
            )
        )

    def _get_cloud_cover_percentage(self, xml_file: str) -> float:
        """Return the cloud cover percentage parsed from the xml text."""
        return float(
            _extract_regex_match_from_string(
                xml_file,
                (
                    r"<Cloud_Coverage_Assessment>([\d.]+)</Cloud_Coverage_Assessment>",
                    r"<CLOUDY_PIXEL_OVER_LAND_PERCENTAGE>([\d.]+)</CLOUDY_PIXEL_OVER_LAND_PERCENTAGE>",
                ),
            )
        )
class Sentinel2CloudlessConfig(Sentinel2Config):
    """Holder of regexes, band_ids etc. for Sentinel 2 cloudless mosaic."""

    image_regexes: ClassVar[tuple[str, ...]] = (config.SENTINEL2_MOSAIC_IMAGE_REGEX,)
    filename_regexes: ClassVar[tuple[str, ...]] = (
        config.SENTINEL2_MOSAIC_FILENAME_REGEX,
    )
    # The mosaic product has no mask band.
    masking: ClassVar[None] = None
    # Mosaic band names drop the zero padding ("B02" -> "B2").
    all_bands: ClassVar[list[str]] = [
        x.replace("B0", "B") for x in Sentinel2Config.all_bands
    ]
    # Bug fix: these were previously built as set comprehensions (and
    # mis-annotated as dicts), losing the band order that matters for
    # red-green-blue composition. Keep them ordered tuples, consistent
    # with the parent class.
    rbg_bands: ClassVar[tuple[str, ...]] = tuple(
        key.replace("B0", "B") for key in Sentinel2Config.rbg_bands
    )
    ndvi_bands: ClassVar[tuple[str, ...]] = tuple(
        key.replace("B0", "B") for key in Sentinel2Config.ndvi_bands
    )
class Sentinel2Band(Sentinel2Config, Band):
    """Band with Sentinel2 specific name variables and regexes."""

    metadata_attributes = Sentinel2Config.metadata_attributes | {
        "boa_add_offset": "_get_boa_add_offset_dict",
    }

    def _get_boa_add_offset_dict(self, xml_file: str) -> int | None:
        """Return the BOA add-offset for this band, parsed from the xml text.

        Returns None when the xml contains no BOA_ADD_OFFSET entries.
        """
        pat = re.compile(
            r"""
            <BOA_ADD_OFFSET\s*
            band_id="(?P<band_id>\d+)"\s*
            >\s*(?P<value>-?\d+)\s*
            </BOA_ADD_OFFSET>
            """,
            flags=re.VERBOSE,
        )
        try:
            matches = [x.groupdict() for x in re.finditer(pat, xml_file)]
        except (TypeError, AttributeError, KeyError) as e:
            raise _RegexError(f"Could not find boa_add_offset info from {pat}") from e
        if not matches:
            return None
        dict_ = (
            pd.DataFrame(matches).set_index("band_id")["value"].astype(int).to_dict()
        )

        # some xml files have band ids in range index form
        # converting these to actual band ids (B01 etc.)
        is_integer_coded = [int(i) for i in dict_] == list(range(len(dict_)))
        if is_integer_coded:
            # the xml files contain 13 bandIds for both L1C and L2A
            # eventhough L2A doesn't have band B10
            all_bands = list(self.l1c_bands)
            if len(all_bands) != len(dict_):
                raise ValueError(
                    f"Different number of bands in xml file and config for {self.name}: {all_bands}, {list(dict_)}"
                )
            dict_ = {
                band_id: value
                for band_id, value in zip(all_bands, dict_.values(), strict=True)
            }
        try:
            return dict_[self.band_id]
        except KeyError as e:
            # Retry with progressively stripped band id forms
            # (e.g. "B08" -> "8", "B8A" -> "8").
            band_id = self.band_id.upper()
            for txt in ["B0", "B", "A"]:
                band_id = band_id.replace(txt, "")
                try:
                    return dict_[band_id]
                except KeyError:
                    continue
            raise KeyError(self.band_id, dict_) from e
class Sentinel2Image(Sentinel2Config, Image):
    """Image with Sentinel2 specific name variables and regexes."""

    # Bands of this image are constructed as Sentinel2Band.
    band_class: ClassVar[type[Sentinel2Band]] = Sentinel2Band

    def ndvi(
        self,
        red_band: str = "B04",
        nir_band: str = "B08",
        padding: int = 0,
        copy: bool = True,
    ) -> NDVIBand:
        """Calculate the NDVI for the Image.

        Args:
            red_band: Band id of the red band.
            nir_band: Band id of the near-infrared band.
            padding: Padding passed on to the parent implementation.
            copy: Whether to work on copies of the band arrays.

        Returns:
            An NDVIBand.
        """
        # Delegates to Image.ndvi with Sentinel 2 default band ids.
        return super().ndvi(
            red_band=red_band, nir_band=nir_band, padding=padding, copy=copy
        )
class Sentinel2Collection(Sentinel2Config, ImageCollection):
    """ImageCollection with Sentinel2 specific name variables and path regexes."""

    image_class: ClassVar[type[Sentinel2Image]] = Sentinel2Image
    band_class: ClassVar[type[Sentinel2Band]] = Sentinel2Band

    def __init__(self, data: str | Path | Sequence[Image], **kwargs) -> None:
        """ImageCollection with Sentinel2 specific name variables and path regexes.

        Args:
            data: Path to the collection root, or a sequence of Images.
            **kwargs: Passed on to ImageCollection.__init__. Must include
                'level'.

        Raises:
            ValueError: If 'level' is not given.
        """
        # NOTE(review): 'None_' appears to be a callable sentinel defined
        # elsewhere in this module; 'callable(level) and level() is None'
        # then means "level was not passed". TODO confirm.
        level = kwargs.get("level", None_)
        if callable(level) and level() is None:
            raise ValueError("Must specify level for Sentinel2Collection.")
        super().__init__(data=data, **kwargs)
class Sentinel2CloudlessBand(Sentinel2CloudlessConfig, Band):
    """Band for cloudless mosaic with Sentinel2 specific name variables and regexes."""

    # Pure mixin combination: all behavior comes from the bases.
class Sentinel2CloudlessImage(Sentinel2CloudlessConfig, Sentinel2Image):
    """Image for cloudless mosaic with Sentinel2 specific name variables and regexes."""

    band_class: ClassVar[type[Sentinel2CloudlessBand]] = Sentinel2CloudlessBand
    # Explicitly pin the Sentinel2Image implementation of ndvi
    # (presumably to make the MRO choice visible — confirm).
    ndvi = Sentinel2Image.ndvi
class Sentinel2CloudlessCollection(Sentinel2CloudlessConfig, ImageCollection):
    """ImageCollection with Sentinel2 specific name variables and regexes."""

    image_class: ClassVar[type[Sentinel2CloudlessImage]] = Sentinel2CloudlessImage
    # NOTE(review): annotation previously said Sentinel2Band, but the value
    # assigned is Sentinel2CloudlessBand.
    band_class: ClassVar[type[Sentinel2CloudlessBand]] = Sentinel2CloudlessBand
def concat_image_collections(collections: Sequence[ImageCollection]) -> ImageCollection:
    """Concatenate ImageCollections.

    Args:
        collections: The collections to concatenate. Must all share the
            same resolution.

    Returns:
        A single ImageCollection of the first collection's class, holding
        all images. 'level' is kept only if it is identical across all
        collections.

    Raises:
        ValueError: If the collections have different resolutions.
    """
    resolutions = {x.res for x in collections}
    if len(resolutions) > 1:
        # Bug fix: corrected misspelled error message ("resoultion").
        raise ValueError(f"resolution mismatch. {resolutions}")
    images = list(itertools.chain.from_iterable([x.images for x in collections]))
    levels = {x.level for x in collections}
    # Keep 'level' only when unambiguous.
    level = next(iter(levels)) if len(levels) == 1 else None
    first_collection = collections[0]

    out_collection = first_collection.__class__(
        images,
        level=level,
        band_class=first_collection.band_class,
        image_class=first_collection.image_class,
        **first_collection._common_init_kwargs,
    )
    # Merge and sort the file path caches of all collections.
    out_collection._all_file_paths = list(
        sorted(
            set(itertools.chain.from_iterable([x._all_file_paths for x in collections]))
        )
    )
    return out_collection
def _get_gradient(band: Band, degrees: bool = False, copy: bool = True) -> np.ndarray:
    """Compute the gradient (slope) of a 2d or 3d band array.

    NOTE(review): the return annotation previously said Band, but both
    branches return a numpy array.
    """
    copied = band.copy() if copy else band
    if len(copied.values.shape) == 3:
        # One slope array per 2d layer.
        return np.array(
            [_slope_2d(arr, copied.res, degrees=degrees) for arr in copied.values]
        )
    elif len(copied.values.shape) == 2:
        return _slope_2d(copied.values, copied.res, degrees=degrees)
    else:
        raise ValueError("array must be 2 or 3 dimensional")


def _slope_2d(array: np.ndarray, res: int | tuple[int], degrees: bool) -> np.ndarray:
    """Return the absolute gradient of a 2d array, optionally in degrees."""
    resx, resy = _res_as_tuple(res)

    gradient_x, gradient_y = np.gradient(array, resx, resy)

    # Sum of absolute per-axis gradients.
    gradient = abs(gradient_x) + abs(gradient_y)

    if not degrees:
        return gradient

    radians = np.arctan(gradient)
    degrees = np.degrees(radians)

    assert np.max(degrees) <= 90

    return degrees


def _clip_xarray(
    xarr: DataArray,
    mask: tuple[int, int, int, int],
    crs: Any,
    **kwargs,
) -> DataArray:
    """Clip a DataArray to a mask; empty numpy array if nothing overlaps."""
    # xarray needs a numpy array of polygons
    mask_arr: np.ndarray = to_geoseries(mask).values
    try:
        return xarr.rio.clip(
            mask_arr,
            crs=crs,
            **kwargs,
        )
    except NoDataInBounds:
        # NOTE(review): returns a plain empty ndarray, not a DataArray.
        return np.array([])


def _get_all_file_paths(path: str) -> set[str]:
    """Return all file paths under 'path', recursively."""
    if is_dapla():
        return {_fix_path(x) for x in sorted(set(_glob_func(path + "/**")))}
    else:
        # Local glob does not recurse with a single '**'; glob up to
        # five directory levels deep.
        return {
            _fix_path(x)
            for x in sorted(
                set(
                    _glob_func(path + "/**")
                    + _glob_func(path + "/**/**")
                    + _glob_func(path + "/**/**/**")
                    + _glob_func(path + "/**/**/**/**")
                    + _glob_func(path + "/**/**/**/**/**")
                )
            )
        }


def _get_images(
    image_paths: list[str],
    *,
    all_file_paths: list[str],
    df: pd.DataFrame,
    processes: int,
    image_class: type,
    band_class: type,
    bbox: GeoDataFrame | GeoSeries | Geometry | tuple[float] | None,
    masking: BandMasking | None,
    **kwargs,
) -> list[Image]:
    """Construct Image instances (threaded), optionally filtered by bbox."""
    with joblib.Parallel(n_jobs=processes, backend="threading") as parallel:
        images: list[Image] = parallel(
            joblib.delayed(image_class)(
                path,
                df=df,
                all_file_paths=all_file_paths,
                masking=masking,
                band_class=band_class,
                **kwargs,
            )
            for path in image_paths
        )
    if bbox is not None:
        # Keep only images intersecting the bbox.
        intersects_list = GeoSeries([img.union_all() for img in images]).intersects(
            to_shapely(bbox)
        )
        return [
            img
            for img, intersects in zip(images, intersects_list, strict=False)
            if intersects
        ]
    return images


class _ArrayNotLoadedError(ValueError):
    """Arrays are not loaded."""
class PathlessImageError(ValueError):
    """'path' attribute is needed but instance has no path."""

    def __init__(self, instance: _ImageBase) -> None:
        """Initialise error class."""
        self.instance = instance

    def __str__(self) -> str:
        """String representation."""
        # Describe how the instance was created (and thus why it has no
        # path), checking the creation flags in order.
        for attr, what in (
            ("_merged", "that have been merged"),
            ("_from_array", "from arrays"),
            ("_from_geopandas", "from GeoDataFrames"),
        ):
            if getattr(self.instance, attr):
                return (
                    f"{self.instance.__class__.__name__} instances {what} "
                    "have no 'path' until they are written to file."
                )
        raise ValueError(self.instance)
def _date_is_within(
    date: str | None,
    date_ranges: DATE_RANGES_TYPE,
) -> bool:
    """True if 'date' falls inside any of the given (min, max) ranges.

    None bounds are open-ended; date_ranges=None matches everything;
    date=None matches nothing (unless date_ranges is None).
    """
    if date_ranges is None:
        return True
    if date is None:
        return False

    date = pd.Timestamp(date)

    # A single (min, max) pair is normalized to a 1-tuple of pairs.
    if all(x is None or isinstance(x, str) for x in date_ranges):
        date_ranges = (date_ranges,)

    for date_range in date_ranges:
        date_min, date_max = date_range
        if date_min is not None:
            date_min = pd.Timestamp(date_min)
        if date_max is not None:
            date_max = pd.Timestamp(date_max)

        if (date_min is None or date >= date_min) and (
            date_max is None or date <= date_max
        ):
            return True

    return False


def _get_dtype_min(dtype: str | type) -> int | float:
    """Minimum representable value for an integer or float dtype."""
    try:
        return np.iinfo(dtype).min
    except ValueError:
        return np.finfo(dtype).min


def _get_dtype_max(dtype: str | type) -> int | float:
    """Maximum representable value for an integer or float dtype."""
    try:
        return np.iinfo(dtype).max
    except ValueError:
        return np.finfo(dtype).max


def _intesects(x, other) -> bool:
    # NOTE(review): misspelled name ("intesects") kept — internal callers
    # in this module use it.
    return box(*x.bounds).intersects(other)


def _copy_and_add_df_parallel(
    group_values: tuple[Any, ...],
    group_df: pd.DataFrame,
    self: ImageCollection,
    copy: bool,
) -> tuple[tuple[Any], ImageCollection]:
    """Build one (group key, sub-collection) pair for a groupby group."""
    copied = self.copy() if copy else self
    copied.images = [
        img.copy() if copy else img
        for img in group_df.drop_duplicates("_image_idx")["_image_instance"]
    ]
    # When grouping by band_id, also restrict each image's bands to the group.
    if "band_id" in group_df:
        band_ids = set(group_df["band_id"].values)
        for img in copied.images:
            img._bands = [band for band in img if band.band_id in band_ids]

    return (group_values, copied)


def _get_bounds(bounds, bbox, band_bounds: Polygon) -> None | Polygon:
    """Intersect the given bounds/bbox filters with the band's own bounds."""
    if bounds is None and bbox is None:
        return None
    elif bounds is not None and bbox is None:
        return to_shapely(bounds).intersection(band_bounds)
    elif bounds is None and bbox is not None:
        return to_shapely(bbox).intersection(band_bounds)
    else:
        return to_shapely(bounds).intersection(to_shapely(bbox))


def _get_single_value(values: tuple) -> Any:
    """The single unique value of 'values', or None if there are several."""
    if len(set(values)) == 1:
        return next(iter(values))
    else:
        return None


def _open_raster(path: str | Path) -> rasterio.io.DatasetReader:
    """Open a raster dataset from a local or remote path."""
    with opener(path) as file:
        return rasterio.open(file)
def _read_mask_array(self: Band | Image, **kwargs) -> np.ndarray:
    """Read the mask band of an image and return a boolean mask array.

    True where the mask band holds one of the 'masking' values
    (e.g. cloud classes).

    Raises:
        ValueError: If zero or multiple file paths match the mask band_id.
    """
    mask_band_id = self.masking["band_id"]
    mask_paths = [path for path in self._all_file_paths if mask_band_id in path]
    if len(mask_paths) > 1:
        raise ValueError(
            f"Multiple file_paths match mask band_id {mask_band_id} for {self.path}"
        )
    elif not mask_paths:
        raise ValueError(
            f"No file_paths match mask band_id {mask_band_id} for {self.path} among "
            + str([Path(x).name for x in _ls_func(self.path)])
        )
    # The mask band itself must not carry metadata extraction.
    band = Band(
        next(iter(mask_paths)),
        **{**self._common_init_kwargs, "metadata": None},
    )
    band.load(**kwargs)
    boolean_mask = np.isin(band.values, list(self.masking["values"]))
    return boolean_mask


# Thin module-level wrappers so the methods can be dispatched via joblib.
def _load_band(band: Band, **kwargs) -> Band:
    """Call band.load (joblib-friendly wrapper)."""
    return band.load(**kwargs)


def _band_apply(band: Band, func: Callable, **kwargs) -> Band:
    """Call band.apply (joblib-friendly wrapper)."""
    return band.apply(func, **kwargs)


def _clip_band(band: Band, mask, **kwargs) -> Band:
    """Call band.clip (joblib-friendly wrapper)."""
    return band.clip(mask, **kwargs)


def _merge_by_band(collection: ImageCollection, **kwargs) -> Image:
    """Call collection.merge_by_band (joblib-friendly wrapper)."""
    return collection.merge_by_band(**kwargs)


def _merge(collection: ImageCollection, **kwargs) -> Band:
    """Call collection.merge (joblib-friendly wrapper)."""
    return collection.merge(**kwargs)


def _zonal_one_pair(i: int, poly: Polygon, band: Band, aggfunc, array_func, func_names):
    """Zonal statistics for one (index, polygon) pair against one band."""
    clipped = band.copy().clip(poly)
    if not np.size(clipped.values):
        # No overlap: emit a row of missing values.
        return _no_overlap_df(func_names, i, date=band.date)
    return _aggregate(clipped.values, array_func, aggfunc, func_names, band.date, i)
def array_buffer(arr: np.ndarray, distance: int) -> np.ndarray:
    """Buffer array points with the value 1 in a binary array.

    Args:
        arr: The array. Must hold only 0s and 1s (or booleans).
        distance: Number of array cells to buffer by. Positive values
            dilate (grow) the 1-regions, negative values erode (shrink)
            them, and 0 returns the array unchanged.

    Returns:
        Array of the same dtype with buffered values.

    Raises:
        ValueError: If the array holds values other than 0/1/True/False.
    """
    if not np.all(np.isin(arr, (1, 0, True, False))):
        raise ValueError("Array must be all 0s and 1s or boolean.")

    dtype = arr.dtype
    arr = np.where(arr, 1, 0)

    if distance == 0:
        # Bug fix: a zero distance previously fell through both branches
        # and implicitly returned None.
        return arr.astype(dtype)

    # Square structuring element, (2*|distance| + 1) cells per side.
    structure = np.ones((2 * abs(distance) + 1, 2 * abs(distance) + 1))

    if distance > 0:
        return binary_dilation(arr, structure=structure).astype(dtype)
    return binary_erosion(arr, structure=structure).astype(dtype)
def _plot_pixels_1d(
    y: np.ndarray,
    x: np.ndarray,
    alpha: float,
    x_var: str,
    y_label: str,
    rounding: int,
    figsize: tuple,
    first_date: pd.Timestamp,
) -> None:
    """Scatter-plot one pixel's values over x, with an OLS fit and CI band.

    Args:
        y: Pixel values (one per time step / image).
        x: x-axis values aligned with y (days since start, or a range index).
        alpha: Significance level (e.g. 0.05 for a 95% CI).
        x_var: x-axis label; "date" triggers date tick labels.
        y_label: y-axis label.
        rounding: Decimals for the prediction numbers in the title.
        figsize: Figure size as (width, height).
        first_date: Start date used to build date tick labels.
    """
    # Ordinary least squares fit: y ~ coef * x + intercept.
    coef, intercept = np.linalg.lstsq(
        np.vstack([x, np.ones(x.shape[0])]).T,
        y,
        rcond=None,
    )[0]

    predicted = np.array([intercept + coef * x for x in x])
    predicted_start = predicted[0]
    predicted_end = predicted[-1]
    predicted_change = predicted_end - predicted_start

    # Degrees of freedom
    dof = len(x) - 2
    # Two-sided t critical value for the (1 - alpha) confidence interval.
    t_val = stats.t.ppf(1 - alpha / 2, dof)
    # Mean squared error of the residuals
    mse = np.sum((y - predicted) ** 2) / dof
    # Calculate the standard error of predictions
    pred_stderr = np.sqrt(
        mse * (1 / len(x) + (x - np.mean(x)) ** 2 / np.sum((x - np.mean(x)) ** 2))
    )
    # Calculate the confidence interval for predictions
    ci_lower = predicted - t_val * pred_stderr
    ci_upper = predicted + t_val * pred_stderr

    fig = plt.figure(figsize=figsize)
    ax = fig.add_subplot(1, 1, 1)
    ax.scatter(x, y, color="#2c93db")
    ax.plot(x, predicted, color="#e0436b")
    ax.fill_between(
        x,
        ci_lower,
        ci_upper,
        color="#e0436b",
        alpha=0.2,
        # Bug fix: the label previously printed the significance level
        # (e.g. "5% CI") instead of the confidence level (e.g. "95% CI").
        label=f"{int((1 - alpha) * 100)}% CI",
    )
    # NOTE(review): np.log is the natural log — for "significant decimals"
    # of the coefficient, log10 would be expected; confirm intent.
    plt.title(
        f"coef: {round(coef, int(np.log(1 / abs(coef))))}, "
        f"pred change: {round(predicted_change, rounding)}, "
        f"pred start: {round(predicted_start, rounding)}, "
        f"pred end: {round(predicted_end, rounding)}"
    )
    plt.xlabel(x_var)
    plt.ylabel(y_label)
    if x_var == "date":
        # Convert day offsets back to dates and label one tick per month.
        date_labels = pd.to_datetime(
            [first_date + pd.Timedelta(days=int(day)) for day in x]
        )
        _, unique_indices = np.unique(date_labels.strftime("%Y-%m"), return_index=True)
        unique_x = np.array(x)[unique_indices]
        unique_labels = date_labels[unique_indices].strftime("%Y-%m")
        ax.set_xticks(unique_x)
        ax.set_xticklabels(unique_labels, rotation=45, ha="right")
    plt.show()
def pixelwise(
    func: Callable,
    values: np.ndarray,
    mask_array: np.ndarray | None = None,
    index_aligned_kwargs: dict | None = None,
    kwargs: dict | None = None,
    processes: int = 1,
) -> tuple[np.ndarray, np.ndarray, list[Any]]:
    """Run a function for each pixel of a 3d array.

    Args:
        func: Function applied to the 1d series of values of each pixel.
        values: 3d array; the first axis is iterated per pixel.
        mask_array: Optional boolean mask aligned with 'values'. Masked
            entries are dropped from each pixel series; pixels where every
            entry is masked are skipped entirely.
        index_aligned_kwargs: Keyword arguments whose values align with the
            first axis of 'values'; filtered with the same per-pixel mask.
        kwargs: Extra keyword arguments passed through to 'func'.
        processes: Number of parallel jobs.

    Returns:
        Tuple of (row indices, col indices, results), index-aligned.
    """
    index_aligned_kwargs = index_aligned_kwargs or {}
    kwargs = kwargs or {}
    if mask_array is not None:
        # Skip pixels where all values along the first axis are masked.
        # (Idiom fix: '~np.all(...)' instead of comparing '== False'.)
        not_all_missing = ~np.all(mask_array, axis=0)
    else:
        mask_array = np.full(values.shape, False)
        not_all_missing = np.full(values.shape[1:], True)

    def select_pixel_values(row: int, col: int) -> np.ndarray:
        # One pixel's series with masked entries removed.
        return values[~mask_array[:, row, col], row, col]

    # loop through long 1d arrays of aligned row and col indices
    nonmissing_row_indices, nonmissing_col_indices = not_all_missing.nonzero()

    with joblib.Parallel(n_jobs=processes, backend="loky") as parallel:
        results: list[Any] = parallel(
            joblib.delayed(func)(
                select_pixel_values(row, col),
                **kwargs,
                **{
                    key: value[~mask_array[:, row, col]]
                    for key, value in index_aligned_kwargs.items()
                },
            )
            for row, col in (
                zip(nonmissing_row_indices, nonmissing_col_indices, strict=True)
            )
        )

    return nonmissing_row_indices, nonmissing_col_indices, results