diff --git a/edisgo/edisgo.py b/edisgo/edisgo.py index eaa1111e..7e1ad664 100755 --- a/edisgo/edisgo.py +++ b/edisgo/edisgo.py @@ -2595,6 +2595,225 @@ def histogram_voltage(self, timestep=None, title=True, **kwargs): title = None plots.histogram(data=data, title=title, timeindex=timestep, **kwargs) + def plot_voltage_over_dist(self, mv_id, lv_id, other, save_as=False, split_branches=False): + """ + Plot LV voltage over distance to the MV/LV transformer for one LV grid, + comparing this EDisGo object ("base") with another EDisGo object ("other"). + + Parameters + ---------- + mv_id : kept for API compatibility (currently unused) + lv_id : int or str identifying LV grid (int = index in mv_grid.lv_grids) + other : EDisGo + save_as : bool or str + If True -> writes default html. If str -> file path (.html or .png). + split_branches : kept for API compatibility (currently unused) + """ + # Local imports to avoid import overhead + from edisgo.tools.voltage_over_distance import ( + _get_v_res_df, + _infer_load_and_feedin_timesteps, + _series_at_t, + _shortest_distances_km, + make_voltage_over_distance_figure, + ) + + # Require results on both + v_a = _get_v_res_df(self) + v_b = _get_v_res_df(other) + if getattr(v_a, "empty", True) or getattr(v_b, "empty", True): + raise RuntimeError( + "Voltage results (results.v_res) are empty. " + "Run analyze() with timesteps/snapshots so voltage results are generated." + ) + # Resolve LV grids (index-based resolution) + lv_grids_a = list(self.topology.mv_grid.lv_grids) + lv_grids_b = list(other.topology.mv_grid.lv_grids) + + if isinstance(lv_id, int): + try: + lv_grid_a = lv_grids_a[lv_id] + lv_grid_b = lv_grids_b[lv_id] + except IndexError as e: + raise ValueError(f"lv_id={lv_id} out of range. base has {len(lv_grids_a)} LV grids.") from e + else: + # String matching fallback + target = str(lv_id) + def _match(lvs): + for g in lvs: + for cand in (getattr(g, "id", None), getattr(g, "name", None), str(g)): + if cand is not None and str(cand) == target: + return g + return None + lv_grid_a = _match(lv_grids_a) + lv_grid_b = _match(lv_grids_b) + if lv_grid_a is None or lv_grid_b is None: + raise ValueError(f"Could not resolve lv_id='{lv_id}' in one of the EDisGo objects.") + + # Determine LV transformer (source) bus + # Prefer station-like attributes; fallback to first bus if not found. + def _lv_source_bus(lv_grid): + for attr in ("station", "mv_lv_station", "lv_station"): + st = getattr(lv_grid, attr, None) + if st is not None: + bus = getattr(st, "bus", None) + if bus is not None: + return str(bus) + # fallback: take first bus in lv_grid + return str(lv_grid.buses_df.index[0]) + + source_a = _lv_source_bus(lv_grid_a) + source_b = _lv_source_bus(lv_grid_b) + + # Distances (Dijkstra) + # Edge weight attribute commonly is 'length' in eDisGo graphs; adjust if your graph uses 'length_km' + dist_a = _shortest_distances_km(lv_grid_a.graph, source_bus=source_a, weight="length") + dist_b = _shortest_distances_km(lv_grid_b.graph, source_bus=source_b, weight="length") + + # Worst-case timesteps + t_load_a, t_feed_a = _infer_load_and_feedin_timesteps(self, v_a) + t_load_b, t_feed_b = _infer_load_and_feedin_timesteps(other, v_b) + + # Voltages at those timesteps + v_a_load = _series_at_t(v_a, t_load_a) + v_a_feed = _series_at_t(v_a, t_feed_a) + v_b_load = _series_at_t(v_b, t_load_b) + v_b_feed = _series_at_t(v_b, t_feed_b) + + # Buses to plot: intersection + buses = pd.Index([str(b) for b in lv_grid_a.buses_df.index]).intersection( + pd.Index([str(b) for b in lv_grid_b.buses_df.index]) + ) + if len(buses) == 0: + raise RuntimeError("No overlapping LV buses found between base and other object.") + + fig = make_voltage_over_distance_figure( + title=f"LV voltage over distance (lv_id={lv_id})", + buses=buses, + dist_a=dist_a, + v_a_load=v_a_load, + v_a_feed=v_a_feed, + dist_b=dist_b, + v_b_load=v_b_load, + v_b_feed=v_b_feed, + band_low=0.90, + band_high=1.10, + ) + + if save_as: + if isinstance(save_as, str): + if save_as.lower().endswith(".html"): + fig.write_html(save_as) + elif save_as.lower().endswith(".png"): + fig.write_image(save_as) + else: + fig.write_html(save_as) + else: + fig.write_html("voltage_over_distance_lv.html") + + return fig + + def plot_voltage_over_dist_mv(self, mv_id, other, save_as=False): + """ + Plot MV voltage over distance to the HV/MV transformer, comparing two EDisGo objects. + + Parameters + ---------- + mv_id : kept for API compatibility (currently unused) + other : EDisGo + save_as : bool or str + If True -> writes default html. If str -> file path (.html or .png). + """ + from edisgo.tools.voltage_over_distance import ( + _get_v_res_df, + _infer_load_and_feedin_timesteps, + _series_at_t, + _shortest_distances_km, + make_voltage_over_distance_figure, + ) + + v_a = _get_v_res_df(self) + v_b = _get_v_res_df(other) + if getattr(v_a, "empty", True) or getattr(v_b, "empty", True): + raise RuntimeError( + "Voltage results (results.v_res) are empty. " + "Run analyze() with timesteps/snapshots so voltage results are generated." + ) + mv_a = self.topology.mv_grid + mv_b = other.topology.mv_grid + + # HV/MV transformer (source) bus: + # Use first MV bus as fallback if no explicit station bus is available. + def _mv_source_bus(edisgo_obj): + mv = edisgo_obj.topology.mv_grid + G = mv.graph + + # 1) Try transformer buses, but only accept if they exist in graph nodes + trafos = getattr(edisgo_obj.topology, "transformers_hvmv_df", None) + if trafos is not None and hasattr(trafos, "columns") and {"bus0", "bus1"}.issubset(trafos.columns): + for col in ("bus0", "bus1"): + b = str(trafos.iloc[0][col]) + if b in G: + return b + + # 2) Try common station bus attributes (if present) + for attr in ("station", "hvmv_station", "mv_station"): + st = getattr(mv, attr, None) + if st is not None: + bus = getattr(st, "bus", None) + if bus is not None and str(bus) in G: + return str(bus) + + # 3) Fallback: pick the first graph node (guaranteed to exist) + return str(next(iter(G.nodes))) + + + source_a = _mv_source_bus(self) + source_b = _mv_source_bus(other) + + dist_a = _shortest_distances_km(mv_a.graph, source_bus=source_a, weight="length") + dist_b = _shortest_distances_km(mv_b.graph, source_bus=source_b, weight="length") + + t_load_a, t_feed_a = _infer_load_and_feedin_timesteps(self, v_a) + t_load_b, t_feed_b = _infer_load_and_feedin_timesteps(other, v_b) + + v_a_load = _series_at_t(v_a, t_load_a) + v_a_feed = _series_at_t(v_a, t_feed_a) + v_b_load = _series_at_t(v_b, t_load_b) + v_b_feed = _series_at_t(v_b, t_feed_b) + + buses = pd.Index([str(b) for b in mv_a.buses_df.index]).intersection( + pd.Index([str(b) for b in mv_b.buses_df.index]) + ) + if len(buses) == 0: + raise RuntimeError("No overlapping MV buses found between base and other object.") + + fig = make_voltage_over_distance_figure( + title="MV voltage over distance", + buses=buses, + dist_a=dist_a, + v_a_load=v_a_load, + v_a_feed=v_a_feed, + dist_b=dist_b, + v_b_load=v_b_load, + v_b_feed=v_b_feed, + band_low=0.96, + band_high=1.06, + ) + + if save_as: + if isinstance(save_as, str): + if save_as.lower().endswith(".html"): + fig.write_html(save_as) + elif save_as.lower().endswith(".png"): + fig.write_image(save_as) + else: + fig.write_html(save_as) + else: + fig.write_html("voltage_over_distance_mv.html") + + return fig + def histogram_relative_line_load( self, timestep=None, title=True, voltage_level="mv_lv", **kwargs ): diff --git a/edisgo/tools/voltage_over_distance.py b/edisgo/tools/voltage_over_distance.py new file mode 100644 index 00000000..f02ce7d3 --- /dev/null +++ b/edisgo/tools/voltage_over_distance.py @@ -0,0 +1,187 @@ +# edisgo/tools/voltage_over_distance.py + +from __future__ import annotations + +from typing import Optional, Tuple + +import numpy as np +import pandas as pd +import plotly.graph_objects as go + + +def _get_v_res_df(edisgo_obj) -> pd.DataFrame: + """ + Support both v_res attribute and v_res() function (version differences). + Expects index = timesteps, columns = buses, values = voltage in p.u. + """ + if not hasattr(edisgo_obj, "results") or edisgo_obj.results is None: + raise RuntimeError("No results found. Run edisgo.analyze() first.") + + v_res = getattr(edisgo_obj.results, "v_res", None) + if v_res is None: + raise RuntimeError("No voltage results (results.v_res) found. Run edisgo.analyze() first.") + + return v_res() if callable(v_res) else v_res + + +def _infer_load_and_feedin_timesteps(edisgo_obj, v_df: Optional[pd.DataFrame] = None) -> Tuple[pd.Timestamp, pd.Timestamp]: + """ + Robust worst-case timestep inference. + + Preferred (always works after analyze): + - load case: timestep with minimum voltage across all buses (worst undervoltage) + - feed-in case: timestep with maximum voltage across all buses (worst overvoltage) + + Fallback (if voltage results not suitable): + - residual load = sum(loads) - sum(generation) + """ + # 1) Prefer voltage results (available after analyze) + if v_df is None: + try: + v_df = _get_v_res_df(edisgo_obj) + except Exception: + v_df = None + + if isinstance(v_df, pd.DataFrame) and len(v_df.index) > 0 and len(v_df.columns) > 0: + # row-wise min/max across buses + row_min = v_df.min(axis=1) + row_max = v_df.max(axis=1) + + # If all-NaN, fall back + if not row_min.dropna().empty and not row_max.dropna().empty: + t_load = pd.Timestamp(row_min.idxmin()) # worst undervoltage + t_feed = pd.Timestamp(row_max.idxmax()) # worst overvoltage + return t_load, t_feed + + # 2) Fallback: residual load from timeseries + ts = getattr(edisgo_obj, "timeseries", None) + if ts is None: + raise RuntimeError("Cannot infer worst-case timesteps: no voltage results and no timeseries.") + + loads_p = getattr(ts, "loads_active_power", None) + gens_p = getattr(ts, "generators_active_power", None) + + if loads_p is None or gens_p is None or len(loads_p.index) == 0 or len(gens_p.index) == 0: + raise RuntimeError( + "Cannot infer worst-case timesteps: voltage results are empty and " + "timeseries.loads_active_power / generators_active_power are missing or empty." + ) + + loads_sum = loads_p.sum(axis=1) + gens_sum = gens_p.sum(axis=1) + residual = (loads_sum - gens_sum).dropna() + + if residual.empty: + raise RuntimeError("Cannot infer worst-case timesteps: residual load series is empty.") + + t_load = pd.Timestamp(residual.idxmax()) + t_feed = pd.Timestamp(residual.idxmin()) + return t_load, t_feed + + + +def _series_at_t(v_df: pd.DataFrame, t: pd.Timestamp) -> pd.Series: + """ + Return bus-voltage series at timestep t. If exact t not found, use nearest. + """ + if t not in v_df.index: + # use nearest timestep if index is datetime-like + try: + nearest_pos = v_df.index.get_indexer([t], method="nearest")[0] + t = v_df.index[nearest_pos] + except Exception as e: + raise KeyError(f"Timestamp {t} not found in voltage results index.") from e + + s = v_df.loc[t] + # Ensure 1D + if isinstance(s, pd.DataFrame): + s = s.squeeze() + s = s.astype(float) + # normalize bus names to str + s.index = s.index.map(str) + return s + + +def _shortest_distances_km(G, source_bus: str, weight: Optional[str] = "length") -> pd.Series: + """ + Uses the networkx API exposed via the graph object already used by eDisGo. + Assumes edge attribute `weight` is in km (common is 'length' or 'length_km'). + """ + import networkx as nx + + source_bus = str(source_bus) + if source_bus not in G: + raise ValueError(f"Source bus '{source_bus}' not in graph.") + + dist = nx.single_source_dijkstra_path_length(G, source=source_bus, weight=weight) + # include all nodes, missing => NaN + return pd.Series({str(n): dist.get(n, np.nan) for n in G.nodes}, name="distance_km") + + +def make_voltage_over_distance_figure( + *, + title: str, + buses: pd.Index, + dist_a: pd.Series, + v_a_load: pd.Series, + v_a_feed: pd.Series, + dist_b: pd.Series, + v_b_load: pd.Series, + v_b_feed: pd.Series, + band_low: float, + band_high: float, +) -> go.Figure: + """ + Builds a plotly scatter figure with 4 traces: + base-load, base-feed-in, other-load, other-feed-in + """ + buses = pd.Index([str(b) for b in buses]) + + def _mk_df(dist: pd.Series, vv: pd.Series, label: str) -> pd.DataFrame: + df = pd.DataFrame({"bus": buses}) + df["distance_km"] = df["bus"].map(dist) + df["v_pu"] = df["bus"].map(vv) + df["label"] = label + return df.dropna(subset=["distance_km", "v_pu"]) + + df = pd.concat( + [ + _mk_df(dist_a, v_a_load, "base — load case"), + _mk_df(dist_a, v_a_feed, "base — feed-in case"), + _mk_df(dist_b, v_b_load, "other — load case"), + _mk_df(dist_b, v_b_feed, "other — feed-in case"), + ], + ignore_index=True, + ) + + fig = go.Figure() + + # tolerance band + fig.add_hrect( + y0=band_low, + y1=band_high, + line_width=0, + fillcolor="rgba(0,0,0,0.06)", + ) + + for label in df["label"].unique(): + sub = df[df["label"] == label] + fig.add_trace( + go.Scatter( + x=sub["distance_km"], + y=sub["v_pu"], + mode="markers", + name=label, + customdata=np.stack([sub["bus"]], axis=-1), + hovertemplate="bus=%{customdata[0]}
dist=%{x:.3f} km
v=%{y:.4f} p.u.", + ) + ) + + fig.update_layout( + title=title, + xaxis_title="Distance [km]", + yaxis_title="Voltage [p.u.]", + legend_title="Scenario", + ) + return fig + diff --git a/tests/test_edisgo.py b/tests/test_edisgo.py index bbdbddfb..4a0f575d 100755 --- a/tests/test_edisgo.py +++ b/tests/test_edisgo.py @@ -1411,6 +1411,39 @@ def test_plot_mv_grid_topology(self): self.edisgo.plot_mv_grid_topology() plt.close("all") + def test_plot_voltage_over_dist(self): + # needs analysis results + self.edisgo.analyze() + v_res = self.edisgo.results.v_res() if callable(self.edisgo.results.v_res) else self.edisgo.results.v_res + if getattr(v_res, "empty", True): + import pytest + pytest.skip("No voltage results (v_res empty) in this test fixture; skipping voltage-over-distance plot test.") + # create a second object by deepcopy (simple compare object) + import copy + other = copy.deepcopy(self.edisgo) + other.analyze() + + fig = self.edisgo.plot_voltage_over_dist(mv_id=None, lv_id=0, other=other) + assert fig is not None + + def test_plot_voltage_over_dist_mv(self): + self.edisgo.analyze() + v_res = self.edisgo.results.v_res() if callable(self.edisgo.results.v_res) else self.edisgo.results.v_res + if getattr(v_res, "empty", True): + import pytest + pytest.skip( + "No voltage results (v_res empty) in this test fixture; skipping voltage-over-distance plot test." + ) + import copy + other = copy.deepcopy(self.edisgo) + other.analyze() + v_res = self.edisgo.results.v_res() if callable(self.edisgo.results.v_res) else self.edisgo.results.v_res + if getattr(v_res, "empty", True): + import pytest + pytest.skip("No voltage results (v_res empty) in this test fixture; skipping voltage-over-distance plot test.") + fig = self.edisgo.plot_voltage_over_dist_mv(mv_id=None, other=other) + assert fig is not None + def test_plot_mv_voltages(self): self.setup_worst_case_time_series() plt.ion()