Parallel

class Parallel(processes, backend='multiprocessing', context='spawn', maxtasksperchild=10, chunksize=1, **kwargs)[source]

Bases: object

Run functions in parallel.

The main method is ‘map’, which runs a single function for each item of an iterable. If the items of the iterable are also iterables, starmap can be used.

The class also provides functions for reading and writing files in parallel in dapla.

Note that nothing gets printed during execution if running in a notebook. Tip for debugging: set processes=1 to run without parallelization.

Note that when using the default backend ‘multiprocessing’, all code except for imports and functions should be guarded by ‘if __name__ == “__main__”’ to avoid an infinite loop. This is not necessary when backend is set to ‘loky’. See joblib’s documentation: https://joblib.readthedocs.io/en/latest/parallel.html#parallel-reference-documentation

Parameters:
  • processes (int) – Number of parallel processes. Set to 1 to run without parallelization.

  • backend (str) – Defaults to “multiprocessing”. Other options are ‘loky’ and ‘threading’, offered through joblib’s Parallel class.

  • context (str) – Start method for the processes. Defaults to ‘spawn’ to avoid frozen processes.

  • maxtasksperchild (int) – Number of tasks a worker process can complete before it exits and is replaced with a fresh worker process, so that unused resources can be freed. Defaults to 10.

  • **kwargs – Keyword arguments to be passed to either multiprocessing.Pool or joblib.Parallel, depending on the backend. Not to be confused with the kwargs passed to functions in the map and starmap methods.

  • chunksize (int)
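The guard described in the note above, together with the ‘context’ and ‘maxtasksperchild’ parameters, can be illustrated with the standard library alone. The sketch below does not use sgis. It assumes that the ‘multiprocessing’ backend behaves roughly like a multiprocessing.Pool created from a ‘spawn’ context, which this page suggests but does not spell out. The name factorials_in_parallel is hypothetical.

```python
import math
import multiprocessing


def factorials_in_parallel(numbers, processes=2):
    """Compute factorials in worker processes started with 'spawn'."""
    # Assumption: this mirrors context='spawn' and maxtasksperchild=10.
    context = multiprocessing.get_context("spawn")
    with context.Pool(processes=processes, maxtasksperchild=10) as pool:
        return pool.map(math.factorial, numbers)


# Only imports and definitions live at module level. With 'spawn', every
# worker re-imports this module, so an unguarded call here would make each
# worker try to start workers of its own.
if __name__ == "__main__":
    print(factorials_in_parallel([1, 2, 3, 4]))  # [1, 2, 6, 24]
```

Moving the call out of the guard and running the file as a script is a quick way to see why the guard matters.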

chunkwise(func, iterable, args=None, kwargs=None, max_rows_per_chunk=None)[source]

Run a function in parallel on chunks of a (Geo)DataFrame.

Parameters:
  • func (Callable) – Function to run chunkwise. It should take (a chunk of) the iterable as first argument.

  • iterable (Collection[Iterable[Any]]) – Iterable to split into chunks, each of which is passed as first argument to ‘func’.

  • args (tuple | None) – Positional arguments in ‘func’ after the DataFrame.

  • kwargs (dict | None) – Additional keyword arguments in ‘func’.

  • max_rows_per_chunk (int | None) – Alternatively decide number of chunks by a maximum number of rows per chunk.

Return type:

Collection[Iterable[Any]]

map(func, iterable, args=None, kwargs=None)[source]

Run functions in parallel with items of an iterable as 0th argument.

Parameters:
  • func (Callable) – Function to be run.

  • iterable (Collection) – An iterable where each item will be passed to func as 0th positional argument.

  • args (tuple | None) – Positional arguments passed to ‘func’ starting from the 1st argument. The 0th argument will be reserved for the values of ‘iterable’.

  • kwargs (dict | None) – Keyword arguments passed to ‘func’. Must be passed as a dict, not unpacked into separate keyword arguments.

Return type:

list[Any]

Returns:

A list of the return values of the function, one for each item in ‘iterable’.
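How ‘args’ and ‘kwargs’ are bound can be pictured with a serial stand-in. This is a sketch of the calling convention described in the parameter list, not the library's implementation; serial_map is a hypothetical name, and nothing runs in parallel here.

```python
def serial_map(func, iterable, args=None, kwargs=None):
    """Call func(item, *args, **kwargs) for each item, one at a time."""
    args = args or ()
    kwargs = kwargs or {}
    # The item always fills the 0th position; 'args' start at position 1.
    return [func(item, *args, **kwargs) for item in iterable]


def x2(x, plus, minus):
    return x * 2 + plus - minus


results = serial_map(x2, [1, 2, 3], args=(2,), kwargs={"minus": 1})
print(results)  # [3, 5, 7]
```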

Examples:

Multiply each list element by 2.

>>> iterable = [1, 2, 3]
>>> def x2(x):
...     return x * 2
>>> p = sg.Parallel(4, backend="loky")
>>> results = p.map(x2, iterable)
>>> results
[2, 4, 6]

With args and kwargs.

>>> iterable = [1, 2, 3]
>>> def x2(x, plus, minus):
...     return x * 2 + plus - minus
>>> p = sg.Parallel(4, backend="loky")
...
>>> # these three are the same
>>> results1 = p.map(x2, iterable, args=(2, 1))
>>> results2 = p.map(x2, iterable, kwargs=dict(plus=2, minus=1))
>>> results3 = p.map(x2, iterable, args=(2,), kwargs=dict(minus=1))
>>> assert results1 == results2 == results3
...
>>> results1
[3, 5, 7]

If running in Jupyter, the function should be defined in another module. If using the multiprocessing backend, the code should also be guarded by if __name__ == “__main__”.

>>> from .file import x2
>>> if __name__ == "__main__":
...     p = sg.Parallel(4, backend="loky")
...     results = p.map(x2, iterable)
...     print(results)
[2, 4, 6]

read_geopandas(files, concat=True, ignore_index=True, strict=True, **kwargs)[source]

Read geospatial files from a list in parallel.

Parameters:
  • files (list[str]) – List of file paths.

  • concat (bool) – Whether to concat the results to a GeoDataFrame.

  • ignore_index (bool) – Defaults to True.

  • strict (bool) – If True (default), all files must exist.

  • chunksize – The size of the chunks of the iterable to distribute to workers.

  • **kwargs – Keyword arguments passed to sgis.read_geopandas.

Return type:

GeoDataFrame | list[GeoDataFrame]

Returns:

A GeoDataFrame, or a list of GeoDataFrames if concat is False.

read_pandas(files, concat=True, ignore_index=True, strict=True, **kwargs)[source]

Read tabular files from a list in parallel.

Parameters:
  • files (list[str]) – List of file paths.

  • concat (bool) – Whether to concat the results to a DataFrame.

  • ignore_index (bool) – Defaults to True.

  • strict (bool) – If True (default), all files must exist.

  • **kwargs – Keyword arguments passed to dapla.read_pandas.

Return type:

DataFrame | list[DataFrame]

Returns:

A DataFrame, or a list of DataFrames if concat is False.

starmap(func, iterable, args=None, kwargs=None)[source]

Run functions in parallel where items of the iterable are unpacked.

This requires the items of the iterable to be iterables as well. See https://docs.python.org/3/library/itertools.html#itertools.starmap

Parameters:
  • func (Callable) – Function to be run.

  • iterable (Collection[Iterable[Any]]) – An iterable of iterables, where each item will be unpacked as positional argument to the function.

  • args (tuple | None) – Positional arguments passed to ‘func’ starting at argument position n + 1, where n is the length of the iterables inside the iterable.

  • kwargs (dict | None) – Keyword arguments passed to ‘func’. Must be passed as a dict, not unpacked into separate keyword arguments.

Return type:

list[Any]

Returns:

A list of the return values of the function, one for each item in ‘iterable’.
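The position at which ‘args’ start can be pictured with a serial stand-in. This is a sketch of the calling convention described in the parameter list, not the library's implementation; serial_starmap is a hypothetical name, and nothing runs in parallel here.

```python
def serial_starmap(func, iterable, args=None, kwargs=None):
    """Call func(*item, *args, **kwargs) for each item, one at a time."""
    args = args or ()
    kwargs = kwargs or {}
    # Each item is unpacked first, so 'args' start after its n elements.
    return [func(*item, *args, **kwargs) for item in iterable]


def add(a, b, c, *, d):
    return a + b + c + d


results = serial_starmap(add, [(1, 2), (2, 3), (3, 4)], args=(1,), kwargs={"d": 10})
print(results)  # [14, 16, 18]
```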

Examples:

Add the two elements of each tuple.

>>> iterable = [(1, 2), (2, 3), (3, 4)]
>>> def add(x, y):
...     return x + y
>>> p = sg.Parallel(3, backend="loky")
>>> results = p.starmap(add, iterable)
>>> results
[3, 5, 7]

With args and kwargs. Since the iterables inside ‘iterable’ are of length 2, ‘args’ will start at argument number three, i.e. ‘c’.

>>> iterable = [(1, 2), (2, 3), (3, 4)]
>>> def add(a, b, c, *, d):
...     return a + b + c + d
>>> p = sg.Parallel(3, backend="loky")
>>> results = p.starmap(add, iterable, args=(1,), kwargs={"d": 0.1})
>>> results
[4.1, 6.1, 8.1]

If running in Jupyter, the function should be defined in another module. If using the multiprocessing backend, the code should also be guarded by if __name__ == “__main__”.

>>> from .file import add
>>> if __name__ == "__main__":
...     p = sg.Parallel(4, backend="loky")
...     results = p.starmap(add, iterable)
...     print(results)
[3, 5, 7]

write_municipality_data(in_data, out_data, municipalities, with_neighbors=False, funcdict=None, file_type='parquet', muni_number_col='KOMMUNENR', strict=False, write_empty=False, id_assign_func=<function clean_overlay>, verbose=True)[source]

Split multiple datasets into municipalities and write as separate files.

The files will be named as the municipality number. Each dataset in ‘in_data’ is intersected with ‘municipalities’ in parallel. The intersections themselves can also be run in parallel with the ‘processes_in_clip’ argument.

Parameters:
  • in_data (dict[str, str | GeoDataFrame]) – Dictionary with dataset names as keys and file paths or (Geo)DataFrames as values. Note that the files will be read in parallel if file paths are used.

  • out_data (str | dict[str, str]) – Either a single folder path or a dictionary with same keys as ‘in_data’ and folder paths as values. If a single folder is passed, the ‘in_data’ keys will be used as subfolders.

  • municipalities (GeoDataFrame) – GeoDataFrame of municipalities (or similar) by which to split the data.

  • with_neighbors (bool) – If True, the resulting data will include neighbor municipalities, as well as the municipality itself. Defaults to False.

  • funcdict (dict[str, Callable] | None) – Dictionary with the keys of ‘in_data’ and functions as values. The functions should take a GeoDataFrame as input and return a GeoDataFrame. The function will be executed right after the data is read.

  • file_type (str) – Defaults to parquet.

  • muni_number_col (str) – String column name with municipality number/identifier. Defaults to KOMMUNENR. If the column is not present in the data to be split, the data will be intersected with the municipalities.

  • strict (bool) – If False (default), the dictionaries ‘out_data’ and ‘funcdict’ do not have to have the same length as ‘in_data’.

  • write_empty (bool) – If False (default), municipalities with no data will be skipped. If True, an empty parquet file will be written.

  • id_assign_func (Callable | partial) – Function to assign ids (e.g. municipality number) to the dataframe for missing values.

  • verbose (bool) – Whether to print during execution.

Return type:

None
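The two accepted forms of ‘out_data’ and the file naming can be pictured with a small path-planning sketch. It makes several assumptions not confirmed here: that the file name is the municipality number plus the ‘file_type’ extension, that folders are joined with ‘/’, and that datasets missing from an ‘out_data’ dictionary are skipped. The function plan_output_paths and the sample names are hypothetical and not part of sgis.

```python
def plan_output_paths(in_data_names, out_data, muni_numbers, file_type="parquet"):
    """Return {dataset name: [one file path per municipality]}."""
    if isinstance(out_data, str):
        # A single folder: each dataset name becomes a subfolder.
        folders = {name: f"{out_data.rstrip('/')}/{name}" for name in in_data_names}
    else:
        # A dictionary: one folder per dataset name (assumed: others skipped).
        folders = {name: out_data[name].rstrip("/") for name in in_data_names if name in out_data}
    return {
        name: [f"{folder}/{number}.{file_type}" for number in muni_numbers]
        for name, folder in folders.items()
    }


paths = plan_output_paths(["roads", "buildings"], "out", ["0301", "4601"])
print(paths["roads"])  # ['out/roads/0301.parquet', 'out/roads/4601.parquet']
```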

chunkwise(func, iterable, args=None, kwargs=None, processes=1, max_rows_per_chunk=None, backend='loky')[source]

Run a function in parallel on chunks of a DataFrame.

This method is used to process large (Geo)DataFrames in manageable pieces, optionally in parallel.

Parameters:
  • func (Callable) – The function to apply to each chunk. This function must accept a DataFrame as its first argument and return a DataFrame.

  • iterable (Collection[Iterable[Any]]) – Iterable to be chunked and processed.

  • args (tuple | None) – Additional positional arguments to pass to ‘func’.

  • kwargs (dict | None) – Keyword arguments to pass to ‘func’.

  • processes (int) – The number of parallel jobs to run. Defaults to 1 (no parallel execution).

  • max_rows_per_chunk (int | None) – The maximum number of rows each chunk should contain.

  • backend (str) – The backend to use for parallel execution (e.g., ‘loky’, ‘multiprocessing’).

Return type:

Collection[Iterable[Any]]

Returns:

Iterable of iterable.
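The chunking idea can be sketched with a plain list standing in for the rows of a (Geo)DataFrame. The sketch assumes that ‘max_rows_per_chunk’, when given, determines the number of chunks, and that chunks are consecutive and of near-equal size. Neither detail is confirmed by this page. The names split_into_chunks and run_chunkwise are hypothetical, and the chunks are processed serially.

```python
import math


def split_into_chunks(rows, n_chunks=1, max_rows_per_chunk=None):
    """Split a list of rows into consecutive chunks of near-equal size."""
    if max_rows_per_chunk is not None:
        # Assumption: the row limit overrides the requested chunk count.
        n_chunks = math.ceil(len(rows) / max_rows_per_chunk)
    n_chunks = max(1, min(n_chunks, len(rows)))
    size, remainder = divmod(len(rows), n_chunks)
    chunks, start = [], 0
    for i in range(n_chunks):
        # The first 'remainder' chunks get one extra row.
        stop = start + size + (1 if i < remainder else 0)
        chunks.append(rows[start:stop])
        start = stop
    return chunks


def run_chunkwise(func, rows, args=(), kwargs=None, **split_kwargs):
    """Apply 'func' to each chunk, with the chunk as first argument."""
    kwargs = kwargs or {}
    chunks = split_into_chunks(rows, **split_kwargs)
    return [func(chunk, *args, **kwargs) for chunk in chunks]


rows = list(range(10))
print(split_into_chunks(rows, n_chunks=3))  # [[0, 1, 2, 3], [4, 5, 6], [7, 8, 9]]
print(run_chunkwise(sum, rows, max_rows_per_chunk=5))  # [10, 35]
```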

parallel_overlay(df1, df2, processes, how='intersection', max_rows_per_chunk=None, backend='loky', to_print=None, **kwargs)[source]

Perform spatial overlay operations on two GeoDataFrames in parallel.

This function splits the first GeoDataFrame into chunks, processes each chunk in parallel using the specified overlay operation with the second GeoDataFrame, and then concatenates the results.

Note that this function is most useful if df2 has few and simple geometries.

Parameters:
  • df1 (GeoDataFrame) – The first GeoDataFrame for the overlay operation.

  • df2 (GeoDataFrame) – The second GeoDataFrame for the overlay operation.

  • how (str) – Type of overlay operation (‘intersection’, ‘union’, etc.).

  • processes (int) – Number of parallel processes to use.

  • max_rows_per_chunk (int | None) – Maximum number of rows per chunk for processing. This helps manage memory usage.

  • backend (str) – The parallelization backend to use (‘loky’, ‘multiprocessing’, ‘threading’).

  • to_print (str | None) – Optional text to print to see progression.

  • **kwargs – Additional keyword arguments to pass to the overlay function.

Return type:

GeoDataFrame

Returns:

A GeoDataFrame containing the result of the overlay operation.
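The split, process and concatenate pattern described above can be sketched without geopandas. In this sketch, 1-D intervals stand in for geometries, a thread pool stands in for the chosen backend, and the chunk size defaults to an even split across ‘processes’. All of that is assumed for illustration, and the function names are hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor


def intersect_intervals(chunk, others):
    """Intersect each (start, stop) in 'chunk' with each interval in 'others'."""
    out = []
    for a_start, a_stop in chunk:
        for b_start, b_stop in others:
            start, stop = max(a_start, b_start), min(a_stop, b_stop)
            if start < stop:  # keep non-empty intersections only
                out.append((start, stop))
    return out


def chunked_intersection(left, right, processes, max_rows_per_chunk=None):
    """Split 'left' into chunks, intersect each with 'right', then concatenate."""
    size = max_rows_per_chunk or max(1, -(-len(left) // processes))
    chunks = [left[i:i + size] for i in range(0, len(left), size)]
    with ThreadPoolExecutor(max_workers=processes) as executor:
        # executor.map yields results in the order of 'chunks'.
        results = list(executor.map(intersect_intervals, chunks, [right] * len(chunks)))
    return [interval for chunk_result in results for interval in chunk_result]


left = [(0, 5), (4, 10), (12, 15), (20, 30)]
right = [(3, 13)]
overlaps = chunked_intersection(left, right, processes=2)
print(overlaps)  # [(3, 5), (4, 10), (12, 13)]
```

Every chunk is compared against all of ‘right’, which is consistent with the note that the approach pays off most when df2 has few and simple geometries.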

parallel_overlay_rowwise(df1, df2, processes, max_rows_per_chunk=None, backend='loky', to_print=None, **kwargs)[source]

Perform spatial clip on two GeoDataFrames in parallel.

This function splits the first GeoDataFrame into chunks, processes each chunk in parallel using the specified overlay operation with the second GeoDataFrame, and then concatenates the results.

Note that this function is most useful if df2 has few and simple geometries.

Parameters:
  • df1 (GeoDataFrame) – The first GeoDataFrame for the overlay operation.

  • df2 (GeoDataFrame) – The second GeoDataFrame for the overlay operation.

  • how – Type of overlay operation (‘intersection’, ‘union’, etc.).

  • processes (int) – Number of parallel processes to use.

  • max_rows_per_chunk (int | None) – Maximum number of rows per chunk for processing. This helps manage memory usage.

  • backend (str) – The parallelization backend to use (‘loky’, ‘multiprocessing’, ‘threading’).

  • to_print (str | None) – Optional text to print to see progression.

  • **kwargs – Additional keyword arguments to pass to the overlay function.

Return type:

GeoDataFrame

Returns:

A GeoDataFrame containing the result of the overlay operation.

parallel_sjoin(df1, df2, processes, max_rows_per_chunk=None, backend='loky', to_print=None, **kwargs)[source]

Perform spatial join on two GeoDataFrames in parallel.

This function splits the first GeoDataFrame into chunks, spatially joins each chunk with the second GeoDataFrame in parallel, and then concatenates the results.

Note that this function is most useful if df2 has few and simple geometries.

Parameters:
  • df1 (GeoDataFrame) – The first GeoDataFrame for the overlay operation.

  • df2 (GeoDataFrame) – The second GeoDataFrame for the overlay operation.

  • how – Type of overlay operation (‘intersection’, ‘union’, etc.).

  • processes (int) – Number of parallel processes to use.

  • max_rows_per_chunk (int | None) – Maximum number of rows per chunk for processing. This helps manage memory usage.

  • backend (str) – The parallelization backend to use (‘loky’, ‘multiprocessing’, ‘threading’).

  • to_print (str | None) – Optional text to print to see progression.

  • **kwargs – Additional keyword arguments to pass to the overlay function.

Return type:

GeoDataFrame

Returns:

A GeoDataFrame containing the result of the overlay operation.

write_municipality_data(data, out_folder, municipalities=None, with_neighbors=False, muni_number_col='KOMMUNENR', file_type='parquet', func=None, write_empty=False, id_assign_func=<function clean_overlay>, strict=True, verbose=True)[source]

Splits and writes data into municipality-specific files.

Parameters:
  • data (str | GeoDataFrame | DataFrame) – Path to the data file or a GeoDataFrame.

  • out_folder (str) – Path to the output directory where the municipality data is written.

  • municipalities (GeoDataFrame | list[str] | None) – Either a sequence of municipality numbers or a GeoDataFrame of municipality polygons and municipality numbers in the column ‘muni_number_col’. Defaults to None.

  • with_neighbors (bool) – If True, include data from neighboring municipalities for each municipality.

  • muni_number_col (str) – Column name for municipality codes in ‘municipalities’.

  • file_type (str) – Format of the output file.

  • func (Callable | None) – Function to process data before writing.

  • write_empty (bool) – If True, write empty files for municipalities without data.

  • clip – If True, clip the data to municipality boundaries. If False, the data is spatially joined.

  • max_rows_per_chunk – Maximum number of rows in each processed chunk.

  • processes_in_clip – Number of processes to use for clipping.

  • strict (bool) – If True (default) and the data has a municipality column, all municipality numbers in ‘data’ must be present in ‘municipalities’.

  • id_assign_func (Callable) – Function to assign ids (e.g. municipality number) to the dataframe for missing values.

  • verbose (bool) – Whether to print during execution.

Return type:

None

Returns:

None. The function writes files directly.