"""Contains the NetworkAnalysis class.
The class has five analysis methods: od_cost_matrix, get_route, get_k_routes,
get_route_frequencies and service_area.
"""
from copy import copy
from copy import deepcopy
from datetime import datetime
from time import perf_counter
from typing import Any
import igraph
import numpy as np
import pandas as pd
from geopandas import GeoDataFrame
from igraph import Graph
from pandas import DataFrame
from pandas import MultiIndex
from ..geopandas_tools.general import _push_geom_col
from ._get_route import _get_k_routes
from ._get_route import _get_route
from ._get_route import _get_route_frequencies
from ._od_cost_matrix import _od_cost_matrix
from ._points import Destinations
from ._points import Origins
from ._service_area import _service_area
from .cutting_lines import split_lines_by_nearest_point
from .network import Network
from .networkanalysisrules import NetworkAnalysisRules
class NetworkAnalysis:
"""Class for doing network analysis.
The class takes a GeoDataFrame of line geometries and rules for the analyses,
and holds methods for doing network analysis based on GeoDataFrames of origin
and destination points.
The 'od_cost_matrix' method is the fastest, and returns a DataFrame with only
indices and travel costs between each origin-destination pair.
The 'get_route' method does the same, but also returns the line geometry of the
routes. 'get_k_routes' can be used to find multiple routes between each OD pair.
The service_area methods only take a set of origins, and return the lines that
can be reached within one or more breaks.
The 'get_route_frequencies' method is a bit different. It returns the individual
line segments that were visited with an added column for how many times the
segments were used.
Attributes:
network: A Network instance that holds the lines and nodes (points).
rules: NetworkAnalysisRules instance.
log: A DataFrame with information about each analysis run.
Examples:
---------
Read example data.
>>> import sgis as sg
>>> roads = sg.read_parquet_url("https://media.githubusercontent.com/media/statisticsnorway/ssb-sgis/main/tests/testdata/roads_oslo_2022.parquet")
Preparing the lines for directed network analysis.
>>> connected_roads = sg.get_connected_components(roads).query("connected == 1")
>>> directed_roads = sg.make_directed_network(
... connected_roads,
... direction_col="oneway",
... direction_vals_bft=("B", "FT", "TF"),
... minute_cols=("drivetime_fw", "drivetime_bw"),
... dropnegative=True,
... dropna=True,
... )
>>> rules = sg.NetworkAnalysisRules(weight="minutes", directed=True)
>>> nwa = sg.NetworkAnalysis(network=directed_roads, rules=rules, detailed_log=False)
>>> nwa
NetworkAnalysis(
network=Network(6364 km, percent_bidirectional=87),
rules=NetworkAnalysisRules(weight=minutes, directed=True, search_tolerance=250, search_factor=0, split_lines=False, ...),
log=True, detailed_log=False,
)
Now we're ready for network analysis.
"""
def __init__(
    self,
    network: GeoDataFrame,
    rules: NetworkAnalysisRules | dict,
    log: bool = True,
    detailed_log: bool = False,
) -> None:
    """Initialise NetworkAnalysis instance.

    Args:
        network: A GeoDataFrame of line geometries.
        rules: The rules for the analysis, given either as a
            NetworkAnalysisRules instance or as a dictionary with the
            rule parameters as keys.
        log: If True (default), a DataFrame with information about each
            analysis run is stored in the 'log' attribute.
        detailed_log: If True, the log DataFrame also gets columns for all
            arguments passed to the analysis method, plus standard deviation
            and the 25th/50th/75th percentiles of the weight column in the
            results. Defaults to False.
    """
    # Accept plain dicts / GeoDataFrames by wrapping them in the proper
    # classes. Rules are validated first, as in the documented behavior.
    rules = (
        rules
        if isinstance(rules, NetworkAnalysisRules)
        else NetworkAnalysisRules(**rules)
    )
    # Copy so later rule mutations don't leak back to the caller's object.
    self.rules = rules.copy()
    self.network = network if isinstance(network, Network) else Network(network)

    self._log = log
    self.detailed_log = detailed_log

    # Fail fast if network holes were filled but got no weight values.
    self._check_if_holes_are_nan()
    self.network.gdf = self.rules._validate_weight(self.network.gdf)
    self._update_wkts()
    self.rules._update_rules()

    if log:
        self.log = DataFrame()

    self._graph_updated_count = 0
    self._k_nearest_points = 50
def _check_if_holes_are_nan(self) -> None:
holes_are_nan: str = (
"Network holes have been filled by straigt lines, but the rows have "
f"NaN values in the {self.rules.weight!r} column. Either remove NaNs "
"or fill these values with a numeric value (e.g. 0)."
)
if (
hasattr(self.network.gdf, "hole")
and len(self.network.gdf.loc[lambda x: x["hole"] == 1])
and (
self.network.gdf.loc[lambda x: x["hole"] == 1, self.rules.weight]
.isna()
.all()
)
):
raise ValueError(holes_are_nan)
def od_cost_matrix(
    self,
    origins: GeoDataFrame,
    destinations: GeoDataFrame,
    *,
    rowwise: bool = False,
    destination_count: int | None = None,
    cutoff: int | float | None = None,
    lines: bool = False,
) -> DataFrame | GeoDataFrame:
    """Fast calculation of many-to-many travel costs.

    Finds the lowest cost (minutes, meters, etc.) from a set of origins to a
    set of destinations. The index of the origins and destinations are used as
    values for the returned columns 'origin' and 'destination'.

    Args:
        origins: GeoDataFrame of points from where the trips will originate
        destinations: GeoDataFrame of points from where the trips will terminate
        rowwise: if False (default), it will calculate the cost from each
            origins to each destination. If true, it will calculate the cost from
            origin 1 to destination 1, origin 2 to destination 2 and so on.
        destination_count: number of closest destinations to keep for each origin.
            If None (default), all trips will be included. The number of
            destinations might be higher than the destination_count if trips have
            equal cost.
        cutoff: the maximum cost (weight) for the trips. Defaults to None,
            meaning all rows will be included. NaNs will also be removed if cutoff
            is specified.
        lines: if True, returns a geometry column with straight lines between
            origin and destination. Defaults to False.

    Returns:
        A DataFrame with the weight column and the columns 'origin' and
        'destination', containing the indices of the origins and destinations
        GeoDataFrames. If lines is True, also returns a geometry column with
        straight lines between origin and destination.

    Examples:
    ---------
    Create the NetworkAnalysis instance.

    >>> import sgis as sg
    >>> roads = sg.read_parquet_url("https://media.githubusercontent.com/media/statisticsnorway/ssb-sgis/main/tests/testdata/roads_oslo_2022.parquet")
    >>> directed_roads = sg.get_connected_components(roads).loc[lambda x: x["connected"] == 1].pipe(sg.make_directed_network_norway, dropnegative=True)
    >>> rules = sg.NetworkAnalysisRules(weight="minutes", directed=True)
    >>> nwa = sg.NetworkAnalysis(network=directed_roads, rules=rules, detailed_log=False)

    Create some origin and destination points.

    >>> points = sg.read_parquet_url("https://media.githubusercontent.com/media/statisticsnorway/ssb-sgis/main/tests/testdata/points_oslo.parquet")
    >>> origins = points.loc[:99, ["geometry"]]
    >>> origins
                              geometry
    0   POINT (263122.700 6651184.900)
    1   POINT (272456.100 6653369.500)
    2   POINT (270082.300 6653032.700)
    3   POINT (259804.800 6650339.700)
    4   POINT (272876.200 6652889.100)
    ..                             ...
    95  POINT (270348.000 6651899.400)
    96  POINT (264845.600 6649005.800)
    97  POINT (263162.000 6650732.200)
    98  POINT (272322.700 6653729.100)
    99  POINT (265622.800 6644644.200)
    <BLANKLINE>
    [100 rows x 1 columns]

    >>> destinations = points.loc[100:199, ["geometry"]]
    >>> destinations
                               geometry
    100  POINT (265997.900 6647899.400)
    101  POINT (263835.200 6648677.700)
    102  POINT (265764.000 6644063.900)
    103  POINT (265970.700 6651258.500)
    104  POINT (264624.300 6649937.700)
    ..                              ...
    195  POINT (258175.600 6653694.300)
    196  POINT (258772.200 6652487.600)
    197  POINT (273135.300 6653198.100)
    198  POINT (270582.300 6652163.800)
    199  POINT (264980.800 6647231.300)
    <BLANKLINE>
    [100 rows x 1 columns]

    Travel time from 100 to 100 points.

    >>> od = nwa.od_cost_matrix(origins, destinations)
    >>> od
          origin  destination    minutes
    0          0          100   8.765621
    1          0          101   6.383407
    2          0          102  13.482324
    3          0          103   6.410121
    4          0          104   5.882124
    ...      ...          ...        ...
    9995      99          195  20.488644
    9996      99          196  16.721241
    9997      99          197  19.977029
    9998      99          198  15.233163
    9999      99          199   6.439002
    <BLANKLINE>
    [10000 rows x 3 columns]

    Assign aggregated values onto the origins (or destinations).

    >>> origins["minutes_min"] = od.groupby("origin")["minutes"].min()
    >>> origins["minutes_mean"] = od.groupby("origin")["minutes"].mean()
    >>> origins["n_missing"] = len(origins) - od.groupby("origin")["minutes"].count()
    >>> origins
                              geometry  minutes_min  minutes_mean  n_missing
    0   POINT (263122.700 6651184.900)     0.966702     11.628637          0
    1   POINT (272456.100 6653369.500)     2.754545     16.084722          0
    2   POINT (270082.300 6653032.700)     1.768334     15.304246          0
    3   POINT (259804.800 6650339.700)     2.776873     14.044023          0
    4   POINT (272876.200 6652889.100)     0.541074     17.565747          0
    ..                             ...          ...           ...        ...
    95  POINT (270348.000 6651899.400)     1.529400     15.427027          0
    96  POINT (264845.600 6649005.800)     1.336207     11.239592          0
    97  POINT (263162.000 6650732.200)     1.010721     11.904372          0
    98  POINT (272322.700 6653729.100)     3.175472     17.579399          0
    99  POINT (265622.800 6644644.200)     1.116209     12.185800          0
    <BLANKLINE>
    [100 rows x 4 columns]

    Join the results onto the 'origins' via the index.

    >>> joined = origins.join(od.set_index("origin"))
    >>> joined
                              geometry  destination    minutes
    0   POINT (263122.700 6651184.900)          100   8.765621
    0   POINT (263122.700 6651184.900)          101   6.383407
    0   POINT (263122.700 6651184.900)          102  13.482324
    0   POINT (263122.700 6651184.900)          103   6.410121
    0   POINT (263122.700 6651184.900)          104   5.882124
    ..                             ...          ...        ...
    99  POINT (265622.800 6644644.200)          195  20.488644
    99  POINT (265622.800 6644644.200)          196  16.721241
    99  POINT (265622.800 6644644.200)          197  19.977029
    99  POINT (265622.800 6644644.200)          198  15.233163
    99  POINT (265622.800 6644644.200)          199   6.439002
    <BLANKLINE>
    [10000 rows x 3 columns]

    Keep only travel times of 10 minutes or less. This is the same as using the
    cutoff parameter.

    >>> ten_min_or_less = od.loc[od.minutes <= 10]
    >>> joined = origins.join(ten_min_or_less.set_index("origin"))
    >>> joined
                              geometry  destination   minutes
    0   POINT (263122.700 6651184.900)        100.0  8.765621
    0   POINT (263122.700 6651184.900)        101.0  6.383407
    0   POINT (263122.700 6651184.900)        103.0  6.410121
    0   POINT (263122.700 6651184.900)        104.0  5.882124
    0   POINT (263122.700 6651184.900)        106.0  9.811828
    ..                             ...          ...       ...
    99  POINT (265622.800 6644644.200)        173.0  4.305523
    99  POINT (265622.800 6644644.200)        174.0  6.094040
    99  POINT (265622.800 6644644.200)        177.0  5.944194
    99  POINT (265622.800 6644644.200)        183.0  8.449906
    99  POINT (265622.800 6644644.200)        199.0  6.439002
    <BLANKLINE>
    [2195 rows x 3 columns]

    Keep the three fastest times from each origin. This is the same as using the
    destination_count parameter.

    >>> three_fastest = od.loc[od.groupby("origin")["minutes"].rank() <= 3]
    >>> joined = origins.join(three_fastest.set_index("origin"))
    >>> joined
                              geometry  destination   minutes
    0   POINT (263122.700 6651184.900)        135.0  0.966702
    0   POINT (263122.700 6651184.900)        175.0  2.202638
    0   POINT (263122.700 6651184.900)        188.0  2.931595
    1   POINT (272456.100 6653369.500)        171.0  2.918100
    1   POINT (272456.100 6653369.500)        184.0  2.754545
    ..                             ...          ...       ...
    98  POINT (272322.700 6653729.100)        184.0  3.175472
    98  POINT (272322.700 6653729.100)        189.0  3.179428
    99  POINT (265622.800 6644644.200)        102.0  1.648705
    99  POINT (265622.800 6644644.200)        134.0  1.116209
    99  POINT (265622.800 6644644.200)        156.0  1.368926
    <BLANKLINE>
    [294 rows x 3 columns]

    Use set_index to use column as identifier insted of the index.

    >>> origins["areacode"] = np.random.choice(["0301", "3401"], len(origins))
    >>> od = nwa.od_cost_matrix(
    ...     origins.set_index("areacode"),
    ...     destinations
    ... )
    >>> od
         origin  destination    minutes
    0      0301          100   8.765621
    1      0301          101   6.383407
    2      0301          102  13.482324
    3      0301          103   6.410121
    4      0301          104   5.882124
    ...     ...          ...        ...
    9995   3401          195  20.488644
    9996   3401          196  16.721241
    9997   3401          197  19.977029
    9998   3401          198  15.233163
    9999   3401          199   6.439002
    <BLANKLINE>
    [10000 rows x 3 columns]

    Travel time from 1000 to 1000 points rowwise.

    >>> points_reversed = points.iloc[::-1]
    >>> od = nwa.od_cost_matrix(points, points_reversed, rowwise=True)
    >>> od
         origin  destination    minutes
    0         0          999  14.692667
    1         1          998   8.452691
    2         2          997  16.370569
    3         3          996   9.486131
    4         4          995  16.521346
    ..      ...          ...        ...
    995     995            4  16.794610
    996     996            3   9.611700
    997     997            2  19.968743
    998     998            1   9.484374
    999     999            0  14.892648
    <BLANKLINE>
    [1000 rows x 3 columns]
    """
    if self._log:
        time_ = perf_counter()

    # Build (or reuse) the graph and snap origins/destinations onto it.
    self._prepare_network_analysis(origins, destinations, rowwise)

    # Temporary integer ids are used inside the graph; the mapping back to
    # the callers' indices happens below via idx_dict.
    ori = self.origins.gdf.set_index("temp_idx")
    des = self.destinations.gdf.set_index("temp_idx")

    results = _od_cost_matrix(
        graph=self.graph,
        origins=ori,
        destinations=des,
        weight=self.rules.weight,
        lines=lines,
        rowwise=rowwise,
    )

    if cutoff is not None:
        # NaN <= cutoff is False, so this also drops unreachable pairs.
        results = results.loc[results[self.rules.weight] <= cutoff]

    if destination_count:
        # rank() keeps ties, so more than 'destination_count' rows may
        # remain per origin when trips have equal cost.
        results = results.loc[
            results.groupby("origin")[self.rules.weight].rank() <= destination_count
        ]

    # Map the temporary ids back to the original indices.
    results["origin"] = results["origin"].map(self.origins.idx_dict)
    results["destination"] = results["destination"].map(self.destinations.idx_dict)

    if lines:
        # Put the geometry column last, per GeoDataFrame convention.
        results = _push_geom_col(results)

    if self.rules.split_lines:
        # Restore the network lines that were split around the points.
        self._unsplit_network()

    if self._log:
        minutes_elapsed = round((perf_counter() - time_) / 60, 1)
        self._runlog(
            "od_cost_matrix",
            results,
            minutes_elapsed,
            lines=lines,
            rowwise=rowwise,
        )

    return results
def get_route_frequencies(
    self,
    origins: GeoDataFrame,
    destinations: GeoDataFrame,
    weight_df: DataFrame | None = None,
    default_weight: int | float | None = None,
    rowwise: bool = False,
    strict: bool = False,
    frequency_col: str = "frequency",
) -> GeoDataFrame:
    """Finds the number of times each line segment was visited in all trips.

    Finds the route with the lowest cost (minutes, meters, etc.) from a set of
    origins to a set of destinations and summarises the number of times each
    segment was used. The aggregation is done on the line indices, which is much
    faster than getting the geometries and then dissolving.

    The trip frequencies can be weighted (multiplied) based on 'weight_df'. See
    example below.

    Args:
        origins: GeoDataFrame of points from where the routes will originate.
        destinations: GeoDataFrame of points from where the routes will terminate.
        weight_df: A long formatted DataFrame where each row contains the indices
            of an origin-destination pair and the number to multiply the frequency
            for this route by. The DataFrame can either contain three columns
            (origin index, destination index and weight. In that order) or only
            a weight column and a MultiIndex where level 0 is origin index and
            level 1 is destination index.
        default_weight: If set, OD pairs not represented in 'weight_df'
            will be given a default weight value.
        rowwise: if False (default), it will calculate the cost from each
            origins to each destination. If true, it will calculate the cost from
            origin 1 to destination 1, origin 2 to destination 2 and so on.
        strict: If True, all OD pairs must be in 'weight_df' if specified.
            Defaults to False.
        frequency_col: Name of column with the number of times each road was
            visited. Defaults to 'frequency'.

    Returns:
        A GeoDataFrame with all line segments that were visited at least once,
        with a column with the number of times the line segment was used in the
        individual routes.

    Note:
        The resulting lines will keep all columns of the 'gdf' of the Network.

    Raises:
        ValueError: If weight_df is not a DataFrame with one or three columns
            that contain weights and all indices of 'origins' and 'destinations'.

    Examples:
    ---------
    Create the NetworkAnalysis instance.

    >>> import sgis as sg
    >>> import pandas as pd
    >>> roads = sg.read_parquet_url("https://media.githubusercontent.com/media/statisticsnorway/ssb-sgis/main/tests/testdata/roads_oslo_2022.parquet")
    >>> directed_roads = sg.get_connected_components(roads).loc[lambda x: x["connected"] == 1].pipe(sg.make_directed_network_norway, dropnegative=True)
    >>> rules = sg.NetworkAnalysisRules(weight="minutes", directed=True)
    >>> nwa = sg.NetworkAnalysis(network=directed_roads, rules=rules, detailed_log=False)

    Get some points.

    >>> points = sg.read_parquet_url("https://media.githubusercontent.com/media/statisticsnorway/ssb-sgis/main/tests/testdata/points_oslo.parquet")
    >>> origins = points.iloc[:25]
    >>> destinations = points.iloc[25:50]

    Get number of times each road was visited for trips from 25 to 25 points.

    >>> frequencies = nwa.get_route_frequencies(origins, destinations)
    >>> frequencies[["source", "target", "frequency", "geometry"]]
           source target  frequency                                           geometry
    160188  77264  79112        1.0  LINESTRING Z (268641.225 6651871.624 111.355, ...
    153682  68376   4136        1.0  LINESTRING Z (268542.700 6652162.400 121.266, ...
    153679  75263  75502        1.0  LINESTRING Z (268665.600 6652165.400 117.466, ...
    153678  75262  75263        1.0  LINESTRING Z (268660.000 6652167.100 117.466, ...
    153677  47999  75262        1.0  LINESTRING Z (268631.500 6652176.800 118.166, ...
    ...       ...    ...        ...                                                ...
    151465  73801  73802      103.0  LINESTRING Z (265368.600 6647142.900 131.660, ...
    151464  73800  73801      103.0  LINESTRING Z (265362.800 6647137.100 131.660, ...
    151466  73802  73632      103.0  LINESTRING Z (265371.400 6647147.900 131.660, ...
    151463  73799  73800      123.0  LINESTRING Z (265359.600 6647135.400 131.660, ...
    152170  74418  74246      130.0  LINESTRING Z (264579.835 6651954.573 113.209, ...
    <BLANKLINE>
    [8556 rows x 4 columns]

    The frequencies can be weighted for each origin-destination pair by specifying
    'weight_df'. This can be a DataFrame with three columns, where the first two
    contain the indices of the origin and destination (in that order), and the
    third the number to multiply the frequency by. 'weight_df' can also be a
    DataFrame with a 2-leveled MultiIndex, where level 0 is the origin index and
    level 1 is the destination.

    Constructing a DataFrame with all od-pair combinations and give all rows a
    weight of 10.

    >>> od_pairs = pd.MultiIndex.from_product(
    ...     [origins.index, destinations.index], names=["origin", "destination"]
    ... )
    >>> weight_df = pd.DataFrame(index=od_pairs).reset_index()
    >>> weight_df["weight"] = 10
    >>> weight_df
         origin  destination  weight
    0         0           25      10
    1         0           26      10
    2         0           27      10
    3         0           28      10
    4         0           29      10
    ..      ...          ...     ...
    620      24           45      10
    621      24           46      10
    622      24           47      10
    623      24           48      10
    624      24           49      10
    <BLANKLINE>
    [625 rows x 3 columns]

    All frequencies will now be multiplied by 10.

    >>> frequencies = nwa.get_route_frequencies(origins, destinations, weight_df=weight_df)
    >>> frequencies[["source", "target", "frequency", "geometry"]]
           source target  frequency                                           geometry
    160188  77264  79112       10.0  LINESTRING Z (268641.225 6651871.624 111.355, ...
    153682  68376   4136       10.0  LINESTRING Z (268542.700 6652162.400 121.266, ...
    153679  75263  75502       10.0  LINESTRING Z (268665.600 6652165.400 117.466, ...
    153678  75262  75263       10.0  LINESTRING Z (268660.000 6652167.100 117.466, ...
    153677  47999  75262       10.0  LINESTRING Z (268631.500 6652176.800 118.166, ...
    ...       ...    ...        ...                                                ...
    151465  73801  73802     1030.0  LINESTRING Z (265368.600 6647142.900 131.660, ...
    151464  73800  73801     1030.0  LINESTRING Z (265362.800 6647137.100 131.660, ...
    151466  73802  73632     1030.0  LINESTRING Z (265371.400 6647147.900 131.660, ...
    151463  73799  73800     1230.0  LINESTRING Z (265359.600 6647135.400 131.660, ...
    152170  74418  74246     1300.0  LINESTRING Z (264579.835 6651954.573 113.209, ...
    <BLANKLINE>
    [8556 rows x 4 columns]

    'weight_df' can also be a DataFrame with one column (the weight) and a
    MultiIndex.

    >>> weight_df = pd.DataFrame(index=od_pairs)
    >>> weight_df["weight"] = 10
    >>> weight_df
            weight
    0  25       10
       26       10
       27       10
       28       10
       29       10
    ...        ...
    24 45       10
       46       10
       47       10
       48       10
       49       10
    <BLANKLINE>
    [625 rows x 1 columns]
    """
    if self._log:
        time_ = perf_counter()

    if weight_df is not None:
        # Normalise weight_df to one weight column per OD pair, indexed by
        # the (origin, destination) MultiIndex.
        weight_df: DataFrame = self._prepare_weight_df(weight_df)
        od_pairs: MultiIndex = self._create_od_pairs(
            origins, destinations, rowwise=rowwise
        )
        self._make_sure_unique(weight_df, od_pairs)
        weights_mapped = od_pairs.map(weight_df.iloc[:, 0])
        if default_weight:
            # With a default weight, pairs in weight_df that are not among
            # the OD pairs would be silently ignored - disallow that.
            if not weight_df.index.isin(od_pairs).all():
                raise ValueError(
                    "All origin-destination indices in 'weight_df' must "
                    "be in 'origins' and 'destinations'."
                )
            weights_mapped = weights_mapped.fillna(default_weight)
        elif strict:
            self._make_sure_index_match(weight_df, od_pairs)
        weight_df = DataFrame(index=od_pairs)
        weight_df["weight"] = weights_mapped

    self._prepare_network_analysis(origins, destinations, rowwise)

    if weight_df is not None:
        # map to temporary ids
        ori_idx_mapper = {v: k for k, v in self.origins.idx_dict.items()}
        des_idx_mapper = {v: k for k, v in self.destinations.idx_dict.items()}

        # Translate each (origin, destination) index pair to temp ids.
        def multiindex_mapper(x: tuple[int, int]) -> tuple[int, int]:
            return (
                ori_idx_mapper.get(x[0]),
                des_idx_mapper.get(x[1]),
            )

        weight_df.index = weight_df.index.map(multiindex_mapper)
    else:
        # No weighting requested: every OD pair gets a weight of 1.
        od_pairs = self._create_od_pairs(
            self.origins.gdf.set_index("temp_idx"),
            self.destinations.gdf.set_index("temp_idx"),
            rowwise=rowwise,
        )
        weight_df = DataFrame(index=od_pairs)
        weight_df["weight"] = 1

    results = _get_route_frequencies(
        graph=self.graph,
        roads=self.network.gdf,
        weight_df=weight_df,
    )

    if isinstance(results, GeoDataFrame):
        # Put the geometry column last, per GeoDataFrame convention.
        results = _push_geom_col(results)

    results = results.rename(columns={"frequency": frequency_col}).sort_values(
        frequency_col
    )

    if self.rules.split_lines:
        # Restore the network lines that were split around the points.
        self._unsplit_network()

    if self._log:
        minutes_elapsed = round((perf_counter() - time_) / 60, 1)
        self._runlog(
            "get_route_frequencies",
            results,
            minutes_elapsed,
        )

    return results
def get_route(
    self,
    origins: GeoDataFrame,
    destinations: GeoDataFrame,
    *,
    rowwise: bool = False,
    destination_count: int | None = None,
    cutoff: int | float | None = None,
) -> GeoDataFrame:
    """Returns the geometry of the low-cost route between origins and destinations.

    For every origin-destination pair, the route with the lowest cumulative
    cost (minutes, meters, etc.) is found. With a weight in meters this is
    the shortest route; with a weight in minutes, the fastest.

    Args:
        origins: GeoDataFrame of points from where the routes will originate
        destinations: GeoDataFrame of points from where the routes will terminate.
        rowwise: if False (default), routes are found from every origin to
            every destination. If True, only pairwise: origin 1 to
            destination 1, origin 2 to destination 2, and so on.
        destination_count: number of closest destinations to keep for each
            origin. None (default) keeps all trips. Ties in cost can leave
            more than 'destination_count' rows per origin.
        cutoff: the maximum cost (weight) for the trips. None (default) keeps
            all rows. Specifying a cutoff also removes rows with NaN cost.

    Returns:
        A DataFrame with the geometry of the routes between origin and
        destination, a weight column, and the columns 'origin' and
        'destination' holding the indices of the origins and destinations
        GeoDataFrames.

    Examples:
    ---------
    Create the NetworkAnalysis instance.

    >>> import sgis as sg
    >>> roads = sg.read_parquet_url("https://media.githubusercontent.com/media/statisticsnorway/ssb-sgis/main/tests/testdata/roads_oslo_2022.parquet")
    >>> directed_roads = sg.get_connected_components(roads).loc[lambda x: x["connected"] == 1].pipe(sg.make_directed_network_norway, dropnegative=True)
    >>> rules = sg.NetworkAnalysisRules(weight="minutes", directed=True)
    >>> nwa = sg.NetworkAnalysis(network=directed_roads, rules=rules, detailed_log=False)

    Get routes from 1 to 1000 points.

    >>> points = sg.read_parquet_url("https://media.githubusercontent.com/media/statisticsnorway/ssb-sgis/main/tests/testdata/points_oslo.parquet")
    >>> routes = nwa.get_route(points.iloc[[0]], points)
    >>> routes
        origin  destination    minutes                                           geometry
    0        1            2  12.930588  MULTILINESTRING Z ((272281.367 6653079.745 160...
    1        1            3  10.867076  MULTILINESTRING Z ((270054.367 6653367.774 144...
    2        1            4   8.075722  MULTILINESTRING Z ((259735.774 6650362.886 24....
    3        1            5  14.659333  MULTILINESTRING Z ((272281.367 6653079.745 160...
    4        1            6  14.406460  MULTILINESTRING Z ((257034.948 6652685.595 156...
    ..     ...          ...        ...                                                ...
    992      1          996  10.858519  MULTILINESTRING Z ((266881.100 6647824.860 132...
    993      1          997   7.461032  MULTILINESTRING Z ((262623.190 6652506.640 79....
    994      1          998  10.698588  MULTILINESTRING Z ((263489.330 6645655.330 11....
    995      1          999  10.109855  MULTILINESTRING Z ((269217.997 6650654.895 166...
    996      1         1000  14.657289  MULTILINESTRING Z ((264475.675 6644245.782 114...
    <BLANKLINE>
    [997 rows x 4 columns]
    """
    start_time = perf_counter() if self._log else None

    # Build (or reuse) the graph and snap the points onto it.
    self._prepare_network_analysis(origins, destinations, rowwise)

    pairs = self._create_od_pairs(
        self.origins.gdf.set_index("temp_idx"),
        self.destinations.gdf.set_index("temp_idx"),
        rowwise=rowwise,
    )
    results = _get_route(
        graph=self.graph,
        weight=self.rules.weight,
        roads=self.network.gdf,
        od_pairs=pairs,
    )

    weight_col = self.rules.weight
    if cutoff is not None:
        # NaN <= cutoff is False, so unreachable pairs are dropped too.
        results = results.loc[results[weight_col] <= cutoff]
    if destination_count:
        # rank() keeps ties, so equal-cost trips can exceed the count.
        ranks = results.groupby("origin")[weight_col].rank()
        results = results.loc[ranks <= destination_count]

    # Translate the temporary ids back to the callers' indices.
    results["origin"] = results["origin"].map(self.origins.idx_dict)
    results["destination"] = results["destination"].map(self.destinations.idx_dict)

    if self.rules.split_lines:
        # Restore the network lines that were split around the points.
        self._unsplit_network()

    if self._log:
        self._runlog(
            "get_route",
            results,
            round((perf_counter() - start_time) / 60, 1),
            rowwise=rowwise,
        )

    return results
def get_k_routes(
    self,
    origins: GeoDataFrame,
    destinations: GeoDataFrame,
    *,
    k: int,
    drop_middle_percent: int,
    rowwise: bool = False,
    destination_count: int | None = None,
    cutoff: int | float | None = None,
) -> GeoDataFrame:
    r"""Returns the geometry of 1 or more routes between origins and destinations.

    Finds the route with the lowest cost (minutes, meters, etc.) from a set of
    origins to a set of destinations. Then the middle part of the route is removed
    from the graph and the new low-cost path is found. Repeats k times. If k=1, it
    is identical to the get_route method.

    Args:
        origins: GeoDataFrame of points from where the routes will originate.
        destinations: GeoDataFrame of points from where the routes will terminate.
        k: the number of low-cost routes to find.
        drop_middle_percent: how many percent of the middle part of the routes
            that should be removed from the graph before the next k route is
            calculated. If set to 0, only the median edge will be removed.
            If set to 100, all but the first and last edge will be removed. The
            graph is copied for each od pair.
        rowwise: if False (default), it will calculate the cost from each
            origins to each destination. If true, it will calculate the cost from
            origin 1 to destination 1, origin 2 to destination 2 and so on.
        destination_count: number of closest destinations to keep for each origin.
            If None (default), all trips will be included. The number of
            destinations might be higher than the destination_count if trips have
            equal cost.
        cutoff: the maximum cost (weight) for the trips. Defaults to None,
            meaning all rows will be included. NaNs will also be removed if cutoff
            is specified.

    Returns:
        A DataFrame with the geometry of the k routes between origin and
        destination. Also returns the column 'k', a weight column and the columns
        'origin' and 'destination', containing the indices of the origins and
        destinations GeoDataFrames.

    Note:
        How many percent of the route to drop from the graph, will determine how
        many k routes will be found. If 100 percent of the route is dropped, it is
        very hard to find more than one path for each OD pair.

        If 'drop_middle_percent' is 1, the resulting routes might be very similar,
        depending on the layout of the network.

    Raises:
        ValueError: if drop_middle_percent is not between 0 and 100.

    Examples:
    ---------
    Create the NetworkAnalysis instance.

    >>> import sgis as sg
    >>> roads = sg.read_parquet_url('https://media.githubusercontent.com/media/statisticsnorway/ssb-sgis/main/tests/testdata/roads_oslo_2022.parquet')
    >>> directed_roads = sg.get_connected_components(roads).loc[lambda x: x["connected"] == 1].pipe(sg.make_directed_network_norway, dropnegative=True)
    >>> rules = sg.NetworkAnalysisRules(weight="minutes", directed=True)
    >>> nwa = sg.NetworkAnalysis(network=directed_roads, rules=rules, detailed_log=False)

    Getting 10 fastest routes from one point to another point.

    >>> points = sg.read_parquet_url('https://media.githubusercontent.com/media/statisticsnorway/ssb-sgis/main/tests/testdata/points_oslo.parquet')
    >>> point1 = points.iloc[[0]]
    >>> point2 = points.iloc[[1]]
    >>> k_routes = nwa.get_k_routes(
    ...     point1,
    ...     point2,
    ...     k=10,
    ...     drop_middle_percent=1
    ... )
    >>> k_routes
       origin  destination    minutes   k                                           geometry
    0       0            1  13.039830   1  MULTILINESTRING Z ((272281.367 6653079.745 160...
    1       0            1  14.084324   2  MULTILINESTRING Z ((272281.367 6653079.745 160...
    2       0            1  14.238108   3  MULTILINESTRING Z ((272281.367 6653079.745 160...
    3       0            1  14.897682   4  MULTILINESTRING Z ((271257.900 6654378.100 193...
    4       0            1  14.962593   5  MULTILINESTRING Z ((271257.900 6654378.100 193...
    5       0            1  15.423934   6  MULTILINESTRING Z ((272281.367 6653079.745 160...
    6       0            1  16.217271   7  MULTILINESTRING Z ((272281.367 6653079.745 160...
    7       0            1  16.483982   8  MULTILINESTRING Z ((272281.367 6653079.745 160...
    8       0            1  16.513253   9  MULTILINESTRING Z ((272281.367 6653079.745 160...
    9       0            1  16.551196  10  MULTILINESTRING Z ((272281.367 6653079.745 160...

    We got all 10 routes because only the middle 1 percent of the routes are
    removed in each iteration. Let's compare with dropping middle 50 and middle
    100 percent.

    >>> k_routes = nwa.get_k_routes(
    ...     point1,
    ...     point2,
    ...     k=10,
    ...     drop_middle_percent=50
    ... )
    >>> k_routes
       origin  destination    minutes  k                                           geometry
    0       0            1  13.039830  1  MULTILINESTRING Z ((272281.367 6653079.745 160...
    1       0            1  14.238108  2  MULTILINESTRING Z ((272281.367 6653079.745 160...
    2       0            1  20.139294  3  MULTILINESTRING Z ((272281.367 6653079.745 160...
    3       0            1  23.506778  4  MULTILINESTRING Z ((265226.515 6650674.617 88....

    >>> k_routes = nwa.get_k_routes(
    ...     point1,
    ...     point2,
    ...     k=10,
    ...     drop_middle_percent=100
    ... )
    >>> k_routes
       origin  destination   minutes  k                                           geometry
    0       0            1  13.03983  1  MULTILINESTRING Z ((272281.367 6653079.745 160...
    """
    # Validate before any expensive graph preparation.
    if not 0 <= drop_middle_percent <= 100:
        raise ValueError("'drop_middle_percent' should be between 0 and 100")

    if self._log:
        time_ = perf_counter()

    # Build (or reuse) the graph and snap origins/destinations onto it.
    self._prepare_network_analysis(origins, destinations, rowwise)

    od_pairs = self._create_od_pairs(
        self.origins.gdf.set_index("temp_idx"),
        self.destinations.gdf.set_index("temp_idx"),
        rowwise=rowwise,
    )
    results = _get_k_routes(
        graph=self.graph,
        weight=self.rules.weight,
        roads=self.network.gdf,
        od_pairs=od_pairs,
        k=k,
        drop_middle_percent=drop_middle_percent,
    )

    if cutoff is not None:
        # NaN <= cutoff is False, so unreachable pairs are dropped too.
        results = results.loc[results[self.rules.weight] <= cutoff]

    if destination_count:
        # rank() keeps ties, so more rows than 'destination_count' may remain.
        results = results.loc[
            results.groupby("origin")[self.rules.weight].rank() <= destination_count
        ]

    # Map the temporary ids back to the original indices.
    results["origin"] = results["origin"].map(self.origins.idx_dict)
    results["destination"] = results["destination"].map(self.destinations.idx_dict)

    if isinstance(results, GeoDataFrame):
        # Put the geometry column last, per GeoDataFrame convention.
        results = _push_geom_col(results)

    if self.rules.split_lines:
        # Restore the network lines that were split around the points.
        self._unsplit_network()

    if self._log:
        minutes_elapsed = round((perf_counter() - time_) / 60, 1)
        self._runlog(
            "get_k_routes",
            results,
            minutes_elapsed,
            rowwise=rowwise,
        )

    return results
def service_area(
    self,
    origins: GeoDataFrame,
    breaks: int | float | tuple[int | float],
    *,
    dissolve: bool = True,
) -> GeoDataFrame:
    """Returns the lines that can be reached within breaks (weight values).

    It finds all the network lines that can be reached within each break. Lines
    that are only partly within the break will not be included. The index of the
    origins is used as values in the 'origins' column.

    Args:
        origins: GeoDataFrame of points from where the service areas will
            originate.
        breaks: one or more integers or floats which will be the
            maximum weight for the service areas. Calculates multiple areas for
            each origin if multiple breaks.
        dissolve: If True (default), each service area will be dissolved into
            one long multilinestring. If False, the individual line segments will
            be returned.

    Returns:
        A GeoDataFrame with one row per break per origin, with the origin index and
        a dissolved line geometry. If dissolve is False, it will return each line
        that is part of the service area.

    See Also:
        precice_service_area: Equivalent method where lines are also cut to get
            precise results.

    Examples:
    ---------
    Create the NetworkAnalysis instance.

    >>> import sgis as sg
    >>> roads = sg.read_parquet_url("https://media.githubusercontent.com/media/statisticsnorway/ssb-sgis/main/tests/testdata/roads_oslo_2022.parquet")
    >>> directed_roads = sg.get_connected_components(roads).loc[lambda x: x["connected"] == 1].pipe(sg.make_directed_network_norway, dropnegative=True)
    >>> rules = sg.NetworkAnalysisRules(weight="minutes", directed=True)
    >>> nwa = sg.NetworkAnalysis(network=directed_roads, rules=rules, detailed_log=False)

    10 minute service area for three origin points.

    >>> points = sg.read_parquet_url("https://media.githubusercontent.com/media/statisticsnorway/ssb-sgis/main/tests/testdata/points_oslo.parquet")
    >>> service_areas = nwa.service_area(
    ...     points.loc[:2],
    ...     breaks=10,
    ... )
    >>> service_areas
      origin  minutes                                           geometry
    0      0       10  MULTILINESTRING Z ((264348.673 6648271.134 17....
    1      1       10  MULTILINESTRING Z ((266909.769 6651075.250 114...
    2      2       10  MULTILINESTRING Z ((266909.769 6651075.250 114...

    Service areas of 5, 10 and 15 minutes from two origin points.

    >>> service_areas = nwa.service_area(
    ...     points.iloc[:2],
    ...     breaks=[5, 10, 15],
    ... )
    >>> service_areas
      origin  minutes                                           geometry
    0      0        5  MULTILINESTRING Z ((265378.000 6650581.600 85....
    1      0       10  MULTILINESTRING Z ((264348.673 6648271.134 17....
    2      0       15  MULTILINESTRING Z ((263110.060 6658296.870 154...
    3      1        5  MULTILINESTRING Z ((273330.930 6653248.870 208...
    4      1       10  MULTILINESTRING Z ((266909.769 6651075.250 114...
    5      1       15  MULTILINESTRING Z ((264348.673 6648271.134 17....
    """
    if self._log:
        time_ = perf_counter()
    self._prepare_network_analysis(origins)
    # coerce the breaks to a sorted list of numbers
    breaks = self._sort_breaks(breaks)
    results = _service_area(
        graph=self.graph,
        origins=self.origins.gdf,
        breaks=breaks,
        weight=self.rules.weight,
        lines=self.network.gdf,
        nodes=self.network.nodes,
        directed=self.rules.directed,
        precice=False,
    )
    # all-NaN geometry means nothing was reached from any origin
    if not all(results.geometry.isna()):
        # a line may be reached from the same origin within several breaks
        results = results.drop_duplicates(["src_tgt_wt", "origin"])
        if dissolve:
            results = results.dissolve(by=["origin", self.rules.weight]).loc[
                :, [results.geometry.name]
            ]
    results = results.reset_index()
    # add missing rows as NaNs
    missing = self.origins.gdf.loc[
        ~self.origins.gdf["temp_idx"].isin(results["origin"])
    ].rename(columns={"temp_idx": "origin"})[["origin"]]
    if len(missing):
        missing[results.geometry.name] = pd.NA
        results = pd.concat([results, missing], ignore_index=True)
    # map the temporary node ids back to the original origin index values
    results["origin"] = results["origin"].map(self.origins.idx_dict)
    results = _push_geom_col(results)
    if self.rules.split_lines:
        self._unsplit_network()
    if self._log:
        minutes_elapsed = round((perf_counter() - time_) / 60, 1)
        self._runlog(
            "service_area",
            results,
            minutes_elapsed,
            breaks=breaks,
            dissolve=dissolve,
        )
    return results
[docs]
def precice_service_area(
    self,
    origins: GeoDataFrame,
    breaks: int | float | tuple[int | float],
    *,
    dissolve: bool = True,
) -> GeoDataFrame:
    """Precise, but slow version of the service_area method.

    It finds all the network lines that can be reached within each break. Lines
    that are partly within the break will be split at the point where the weight
    value is exactly correct. Note that this takes more time than the regular
    'service_area' method.

    Args:
        origins: GeoDataFrame of points from where the service areas will
            originate.
        breaks: one or more integers or floats which will be the
            maximum weight for the service areas. Calculates multiple areas for
            each origin if multiple breaks.
        dissolve: If True (default), each service area will be dissolved into
            one long multilinestring. If False, the individual line segments will
            be returned.

    Returns:
        A GeoDataFrame with one row per break per origin, with a dissolved line
        geometry. If dissolve is False, it will return all the columns of the
        network.gdf as well.

    See Also:
        service_area: Faster method where lines are not cut to get precise
            results.

    Examples:
    ---------
    Create the NetworkAnalysis instance.

    >>> import sgis as sg
    >>> roads = sg.read_parquet_url("https://media.githubusercontent.com/media/statisticsnorway/ssb-sgis/main/tests/testdata/roads_oslo_2022.parquet")
    >>> directed_roads = sg.get_connected_components(roads).loc[lambda x: x["connected"] == 1].pipe(sg.make_directed_network_norway, dropnegative=True)
    >>> rules = sg.NetworkAnalysisRules(weight="minutes", directed=True)
    >>> nwa = sg.NetworkAnalysis(network=directed_roads, rules=rules, detailed_log=False)

    10 minute service area for one origin point.

    >>> points = sg.read_parquet_url("https://media.githubusercontent.com/media/statisticsnorway/ssb-sgis/main/tests/testdata/points_oslo.parquet")
    >>> sa = nwa.precice_service_area(
    ...     points.iloc[[0]],
    ...     breaks=10,
    ... )
    >>> sa
      idx  minutes                                           geometry
    0   1       10  MULTILINESTRING Z ((264348.673 6648271.134 17....

    Service areas of 5, 10 and 15 minutes from two origin points.

    >>> sa = nwa.precice_service_area(
    ...     points.iloc[:2],
    ...     breaks=[5, 10, 15],
    ... )
    >>> sa
      idx  minutes                                           geometry
    0   1        5  MULTILINESTRING Z ((265378.000 6650581.600 85....
    1   1       10  MULTILINESTRING Z ((264348.673 6648271.134 17....
    2   1       15  MULTILINESTRING Z ((263110.060 6658296.870 154...
    3   2        5  MULTILINESTRING Z ((273330.930 6653248.870 208...
    4   2       10  MULTILINESTRING Z ((266909.769 6651075.250 114...
    5   2       15  MULTILINESTRING Z ((264348.673 6648271.134 17....
    """
    if self._log:
        time_ = perf_counter()
    self._prepare_network_analysis(origins)
    # coerce the breaks to a sorted list of numbers
    breaks = self._sort_breaks(breaks)
    results = _service_area(
        graph=self.graph,
        origins=self.origins.gdf,
        breaks=breaks,
        weight=self.rules.weight,
        lines=self.network.gdf,
        nodes=self.network.nodes,
        directed=self.rules.directed,
        precice=True,
    )
    # all-NaN geometry means nothing was reached from any origin
    if not all(results.geometry.isna()):
        if dissolve:
            results = results.dissolve(by=["origin", self.rules.weight]).loc[
                :, [results.geometry.name]
            ]
        else:
            results = results.dissolve(
                by=["src_tgt_wt", "origin", self.rules.weight]
            )
    results = results.reset_index()
    # add missing rows as NaNs
    missing = self.origins.gdf.loc[
        ~self.origins.gdf["temp_idx"].isin(results["origin"])
    ].rename(columns={"temp_idx": "origin"})[["origin"]]
    if len(missing):
        missing[results.geometry.name] = pd.NA
        results = pd.concat([results, missing], ignore_index=True)
    # The temporary origin ids are not part of the result. Previously the
    # column was mapped through idx_dict and then immediately dropped; the
    # map was dead work, so the column is now just dropped.
    results = results.drop("origin", axis=1)
    results = _push_geom_col(results)
    if self.rules.split_lines:
        self._unsplit_network()
    if self._log:
        minutes_elapsed = round((perf_counter() - time_) / 60, 1)
        self._runlog(
            "service_area",
            results,
            minutes_elapsed,
            breaks=breaks,
            dissolve=dissolve,
        )
    return results
@staticmethod
def _prepare_weight_df(weight_df: DataFrame) -> DataFrame:
"""Copy weight_df, convert to MultiIndex (if needed), check if correct shape.
The weight_df needs to have a very specific shape and index. If a 3-columned df
is given, convert the first two to a MultiIndex.
"""
error_message = (
"'weight_df' should be a DataFrame with the columns "
"'origin', 'destination' and 'weight', where the first "
"two contain the indices of the origins and destinations "
"and the weight column contains the number to multiply "
"the trip frequency for this origin-destination pair."
)
if not isinstance(weight_df, (DataFrame | pd.Series)):
raise ValueError(error_message)
if isinstance(weight_df, pd.Series):
weight_df = weight_df.to_frame()
weight_df = weight_df.copy()
if len(weight_df.columns) == 3:
weight_df = weight_df.set_index(list(weight_df.columns[:2]))
if len(weight_df.columns) != 1 and isinstance(weight_df.index, MultiIndex):
raise ValueError(error_message)
return weight_df
@staticmethod
def _make_sure_unique(weight_df: DataFrame, od_pairs: MultiIndex) -> None:
"""It's nesseccary with unique index when using weight_df."""
if not weight_df.index.is_unique:
raise ValueError("'weight_df' must contain only unique OD combinations.")
if not od_pairs.is_unique:
raise ValueError(
"'origins' and 'destinations must contain only unique "
"indices when weight_df is specified."
)
@staticmethod
def _make_sure_index_match(
weight_df: DataFrame,
od_pairs: MultiIndex,
) -> None:
"""Make sure this index matches the index of origins and destinations."""
if not od_pairs.isin(weight_df.index).all():
if not od_pairs.isin(weight_df.index).any():
raise ValueError(
"None of the origin-destination pair indices are in 'weight_df'."
)
raise ValueError(
"Not all origin-destination pair indices are in 'weight_df'."
)
@staticmethod
def _create_od_pairs(
origins: GeoDataFrame, destinations: GeoDataFrame, rowwise: bool
) -> MultiIndex:
"""Get all OD combinaions without identical origin-destination geometry.
Returns a MultiIndex to be iterated over in get_route, get_k_routes and
get_route_frequencies. In get_route_frequencies, the MultiIndex is turned
into a DataFrame with a weight column.
"""
if rowwise:
od_pairs = MultiIndex.from_arrays([origins.index, destinations.index])
else:
od_pairs = MultiIndex.from_product([origins.index, destinations.index])
geoms_ori = od_pairs.get_level_values(0).map(origins.geometry)
geoms_des = od_pairs.get_level_values(1).map(destinations.geometry)
no_identical_geoms = od_pairs[geoms_ori != geoms_des]
if not len(no_identical_geoms) and len(origins) and len(destinations):
raise ValueError("All origin-destination pairs have identical geometries.")
return no_identical_geoms
def _log_df_template(self, method: str, minutes_elapsed: float) -> DataFrame:
"""Creates a DataFrame with one row and the main columns.
To be run after each network analysis.
Args:
method: the name of the network analysis method used
minutes_elapsed: time use of the method
Returns:
A one-row DataFrame with log info columns
"""
data = {
"endtime": pd.to_datetime(datetime.now()).floor("S").to_pydatetime(),
"minutes_elapsed": minutes_elapsed,
"method": method,
"origins_count": pd.NA,
"destinations_count": pd.NA,
"percent_missing": pd.NA,
"cost_mean": pd.NA,
}
if self.rules.directed:
data["percent_bidirectional"] = self.network.percent_bidirectional
df = DataFrame(data, index=[0])
if not self.detailed_log:
return df
for key, value in self.rules.__dict__.items():
if key.startswith("_") or key.endswith("_"):
continue
df = pd.concat([df, pd.DataFrame({key: [value]})], axis=1)
return df
def _runlog(
    self,
    fun: str,
    results: DataFrame | GeoDataFrame,
    minutes_elapsed: float,
    **kwargs,
) -> None:
    """Append one row describing an analysis run to self.log.

    Args:
        fun: Name of the analysis method that was run.
        results: The (Geo)DataFrame the method produced.
        minutes_elapsed: Runtime of the method in minutes.
        **kwargs: Method-specific parameters, logged only when detailed_log.
    """
    df = self._log_df_template(fun, minutes_elapsed)
    df["origins_count"] = len(self.origins.gdf)
    # cost statistics only make sense when the results have a weight column
    if self.rules.weight in results.columns:
        df["percent_missing"] = results[self.rules.weight].isna().mean() * 100
        df["cost_mean"] = results[self.rules.weight].mean()
        if self.detailed_log:
            df["cost_p25"] = results[self.rules.weight].quantile(0.25)
            df["cost_median"] = results[self.rules.weight].median()
            df["cost_p75"] = results[self.rules.weight].quantile(0.75)
            df["cost_std"] = results[self.rules.weight].std()
    if fun == "service_area":
        # service_area has no destinations; missing means rows without geometry
        df["percent_missing"] = results[results.geometry.name].isna().mean() * 100
    else:
        df["destinations_count"] = len(self.destinations.gdf)
    if self.detailed_log:
        # store the method parameters as readable strings
        for key, value in kwargs.items():
            if isinstance(value, np.ndarray):
                value = list(value)
            if isinstance(value, (list | tuple)):
                value = [str(x) for x in value]
                value = ", ".join(value)
            df[key] = value
    self.log = pd.concat([self.log, df], ignore_index=True)
def _prepare_network_analysis(
    self,
    origins: GeoDataFrame,
    destinations: GeoDataFrame | None = None,
    rowwise: bool = False,
) -> None:
    """Prepares the weight column, node ids, origins, destinations and graph.

    Updates the graph only if it is not yet created or some part of the
    analysis input has changed. This method is run inside od_cost_matrix,
    get_route and service_area.

    Args:
        origins: Origin points for the analysis.
        destinations: Destination points, or None for service_area.
        rowwise: If True, origins and destinations are paired row by row and
            must have the same length.

    Raises:
        ValueError: If rowwise is True and the point sets differ in length.
    """
    if rowwise and len(origins) != len(destinations):
        raise ValueError(
            "'origins' and 'destinations' must have the same length when "
            "rowwise=True"
        )
    self.network.gdf = self.rules._validate_weight(self.network.gdf)
    self.origins = Origins(origins)
    # temporary ids must not collide with the network's node ids
    self.origins._make_temp_idx(
        start=max(self.network.nodes.node_id.astype(int)) + 1
    )
    if destinations is not None:
        self.destinations = Destinations(destinations)
        # destination ids continue after the origin ids
        self.destinations._make_temp_idx(
            start=max(self.origins.gdf.temp_idx.astype(int)) + 1
        )
    else:
        self.destinations = None
    # rebuild the graph only when the rules, network or points have changed
    if not self._graph_is_up_to_date() or not self.network._nodes_are_up_to_date():
        self.network._update_nodes_if()
        edges, weights, ids = self._get_edges_and_weights()
        self.graph = self._make_graph(
            edges=edges,
            weights=weights,
            edge_ids=ids,
            directed=self.rules.directed,
        )
        self._add_missing_vertices()
        self._graph_updated_count += 1
    # record the current state so the next run can detect changes
    self._update_wkts()
    self.rules._update_rules()
def _get_edges_and_weights(
    self,
) -> tuple[list[tuple[str, str]], list[float], list[str]]:
    """Creates lists of edges and weights which will be used to make the graph.

    Edges and weights between origins and nodes, and between nodes and
    destinations, are also added.

    Returns:
        A tuple of (edges, weights, edge_ids), where edges are (source,
        target) node id tuples, with one weight and one 'src_tgt_wt' id per
        edge.
    """
    if self.rules.split_lines:
        self._split_lines()
        # splitting created new lines, so node ids and the temporary point
        # ids must be remade before building edges
        self.network._make_node_ids()
        self.origins._make_temp_idx(
            start=max(self.network.nodes.node_id.astype(int)) + 1
        )
        if self.destinations is not None:
            self.destinations._make_temp_idx(
                start=max(self.origins.gdf.temp_idx.astype(int)) + 1
            )
    edges: list[tuple[str, str]] = self.network.get_edges()
    weights = list(self.network.gdf[self.rules.weight])
    # id used to relate graph edges back to network lines
    self.network.gdf["src_tgt_wt"] = self.network._create_edge_ids(edges, weights)
    # connect the origin points to their nearby network nodes
    edges_start, weights_start = self.origins._get_edges_and_weights(
        nodes=self.network.nodes,
        rules=self.rules,
        k=self._k_nearest_points,
    )
    edges = edges + edges_start
    weights = weights + weights_start
    if self.destinations is None:
        edge_ids = self.network._create_edge_ids(edges, weights)
        return edges, weights, edge_ids
    # connect the network nodes to the destination points
    edges_end, weights_end = self.destinations._get_edges_and_weights(
        nodes=self.network.nodes,
        rules=self.rules,
        k=self._k_nearest_points,
    )
    edges = edges + edges_end
    weights = weights + weights_end
    edge_ids = self.network._create_edge_ids(edges, weights)
    return edges, weights, edge_ids
def _split_lines(self) -> None:
    """Split network lines at the nearest origin/destination points.

    Lines within 'search_tolerance' of a point are split so the points
    connect to the network exactly where they are. The pre-split versions of
    the affected lines are stored on the network so '_unsplit_network' can
    restore them afterwards, and the weight of each piece is scaled by its
    share of the original line length.
    """
    if self.destinations is not None:
        points = pd.concat(
            [self.origins.gdf, self.destinations.gdf], ignore_index=True
        )
    else:
        points = self.origins.gdf
    points = points.drop_duplicates(points.geometry.name)
    # original line length, used to scale the split pieces' weight below
    self.network.gdf["meters_"] = self.network.gdf.length
    # create an id from before the split, used to revert the split later
    self.network.gdf["temp_idx__"] = range(len(self.network.gdf))
    lines = split_lines_by_nearest_point(
        gdf=self.network.gdf,
        points=points,
        max_distance=self.rules.search_tolerance,
        splitted_col="splitted",
    )
    # save the pre-split originals of the lines that were split, so
    # '_unsplit_network' can swap them back in later
    splitted = lines.loc[lines["splitted"] == 1, "temp_idx__"]
    self.network._not_splitted = self.network.gdf.loc[
        self.network.gdf["temp_idx__"].isin(splitted)
    ]
    # adjust weight to new length
    lines[self.rules.weight] = lines[self.rules.weight] * (
        lines.length / lines["meters_"]
    )
    self.network.gdf = lines
def _unsplit_network(self):
"""Remove the splitted lines and add the unsplitted ones."""
lines = self.network.gdf.loc[self.network.gdf["splitted"] != 1]
self.network.gdf = pd.concat(
[lines, self.network._not_splitted], ignore_index=True
).drop("temp_idx__", axis=1)
del self.network._not_splitted
def _add_missing_vertices(self):
"""Adds the missing points.
Nodes that had no nodes within the search_tolerance are added to the graph.
To not get an error when running the distance calculation.
"""
# TODO: either check if any() beforehand, or add fictional edges before
# making the graph, to make things faster
# (this method took 64.660 out of 500 seconds)
self.graph.add_vertices(
[
idx
for idx in self.origins.gdf["temp_idx"]
if idx not in self.graph.vs["name"]
]
)
if self.destinations is not None:
self.graph.add_vertices(
[
idx
for idx in self.destinations.gdf["temp_idx"]
if idx not in self.graph.vs["name"]
]
)
@staticmethod
def _make_graph(
    edges: list[tuple[str, ...]] | np.ndarray,
    weights: list[float] | np.ndarray,
    edge_ids: np.ndarray,
    directed: bool,
) -> "Graph":
    """Creates an igraph Graph from a list of edges and weights.

    Args:
        edges: Sequence of (source, target) node id tuples.
        weights: One weight value per edge.
        edge_ids: One 'src_tgt_wt' id per edge.
        directed: Whether to build a directed graph.

    Returns:
        An igraph Graph with weight/src_tgt_wt/source/target edge attributes.

    Raises:
        ValueError: If edges and weights differ in length, or if any weight
            is negative.
    """
    # explicit check instead of 'assert', which is stripped under 'python -O'
    if len(edges) != len(weights):
        raise ValueError("'edges' and 'weights' must have the same length.")
    graph = igraph.Graph.TupleList(edges, directed=directed)
    graph.es["weight"] = weights
    graph.es["src_tgt_wt"] = edge_ids
    graph.es["edge_tuples"] = edges
    graph.es["source"] = [edge[0] for edge in edges]
    graph.es["target"] = [edge[1] for edge in edges]
    # count negatives in one pass (also avoids min() raising on zero edges)
    n = sum(1 for weight in graph.es["weight"] if weight < 0)
    if n:
        raise ValueError(
            f"The graph has been built with {n} negative weight values."
        )
    return graph
def _graph_is_up_to_date(self) -> bool:
"""Checks if the network or rules have changed.
Returns False if the rules of the graphmaking has changed,
or if the points have changed.
"""
if not hasattr(self, "graph") or not hasattr(self, "wkts"):
return False
if self.rules._rules_have_changed():
return False
if self.network.gdf["src_tgt_wt"].isna().any():
return False
for points in ["origins", "destinations"]:
if self[points] is None:
continue
if points not in self.wkts:
return False
if self._points_have_changed(self[points].gdf, what=points):
return False
return True
def _points_have_changed(self, points: GeoDataFrame, what: str) -> bool:
"""Check if the origins or destinations have changed.
This method is best stored in the NetworkAnalysis class,
since the point classes are instantiated each time an analysis is run.
"""
if not np.array_equal(self.wkts[what], points.geometry.to_wkt().values):
return True
if not all(x in self.graph.vs["name"] for x in list(points.temp_idx.values)):
return True
return False
def _update_wkts(self) -> None:
"""Creates a dict of wkt lists.
This method is run after the graph is created. If the wkts haven't updated
since the last run, the graph doesn't have to be remade.
"""
self.wkts = {}
self.wkts["network"] = self.network.gdf.geometry.to_wkt().values
if not hasattr(self, "origins"):
return
self.wkts["origins"] = self.origins.gdf.geometry.to_wkt().values
if self.destinations is not None:
self.wkts["destinations"] = self.destinations.gdf.geometry.to_wkt().values
@staticmethod
def _sort_breaks(breaks: str | list | tuple | int | float) -> list[float | int]:
if isinstance(breaks, str):
breaks = float(breaks)
if hasattr(breaks, "__iter__"):
return list(sorted(list(breaks)))
if isinstance(breaks, (int | float)):
return [breaks]
raise ValueError(
"'breaks' should be integer, float, string or an iterable of "
f" one of these. Got {type(breaks)!r}"
)
def __repr__(self) -> str:
"""The print representation."""
# drop the 'weight_to_nodes_' parameters in the repr of rules to avoid clutter
rules = (
f"{self.rules.__class__.__name__}(weight={self.rules.weight}, "
f"directed={self.rules.directed}, "
f"search_tolerance={self.rules.search_tolerance}, "
f"search_factor={self.rules.search_factor}, "
f"split_lines={self.rules.split_lines}, "
)
# add one 'weight_to_nodes_' parameter if used,
# else inform that there are more parameters with '...'
if self.rules.nodedist_multiplier:
x = f"nodedist_multiplier={self.rules.nodedist_multiplier}"
elif self.rules.nodedist_kmh:
x = f"nodedist_kmh={self.rules.nodedist_kmh}"
else:
x = "..."
return (
f"{self.__class__.__name__}(\n"
f" network={self.network.__repr__()},\n"
f" rules={rules}{x}),\n"
f" log={self._log}, detailed_log={self.detailed_log},"
"\n)"
)
def __getitem__(self, item: str) -> Any:
    """Make self['origins'] equivalent to self.origins.

    Args:
        item: Name of an instance attribute.

    Returns:
        The value of the attribute.
    """
    return getattr(self, item)
[docs]
def copy(self, deep: bool = True) -> "NetworkAnalysis":
    """Returns a (deep) copy of the class instance.

    Args:
        deep: Whether to return a deep or shallow copy. Defaults to True.
    """
    return deepcopy(self) if deep else copy(self)