Source code for powergrid_synth.distribution.distribution_converter

"""
Conversion utilities for building Schweitzer-format feeder graphs
from pandapower and pypowsybl reference networks.
"""
from __future__ import annotations

from collections.abc import Sequence
from typing import Any, Optional

import networkx as nx
import numpy as np


[docs] def pandapower_to_feeders(net: Any) -> list[nx.Graph]: """Convert a pandapower network to a list of Schweitzer-format feeder graphs. Each connected component with at least three nodes is returned as a separate graph annotated with the attributes expected by :func:`~powergrid_synth.distribution.fit_params_from_feeders` and :class:`~powergrid_synth.distribution.SchweetzerFeederGenerator`: * **Node attributes**: ``h`` (hop distance from root), ``P_mw`` (positive = load, negative = generation), ``node_type`` (``'load'`` | ``'injection'`` | ``'intermediate'``), ``pf`` (power factor). * **Edge attributes**: ``length_km`` (cable length in km). The function handles lines, transformers, and closed bus–bus switches so that the full connectivity of the pandapower network is captured. Parameters ---------- net : pandapowerNet A pandapower network object. Returns ------- list[nx.Graph] One feeder graph per connected component (≥ 3 nodes). """ G = nx.Graph() # --- Nodes --------------------------------------------------------------- for bus_idx in net.bus.index: G.add_node(int(bus_idx)) # --- Edges: lines -------------------------------------------------------- for _, row in net.line.iterrows(): G.add_edge( int(row["from_bus"]), int(row["to_bus"]), length_km=row["length_km"], ) # --- Edges: transformers (treated as short connections) ------------------- if hasattr(net, "trafo") and len(net.trafo) > 0: for _, row in net.trafo.iterrows(): G.add_edge( int(row["hv_bus"]), int(row["lv_bus"]), length_km=0.01, ) # --- Edges: closed bus-bus switches -------------------------------------- if hasattr(net, "switch") and len(net.switch) > 0: for _, row in net.switch.iterrows(): if row.get("closed", True) and row.get("et") == "b": G.add_edge( int(row["bus"]), int(row["element"]), length_km=0.001, ) # --- Source buses (ext_grid) --------------------------------------------- source_buses = set(int(b) for b in net.ext_grid.bus.values) # --- Aggregate loads and generation per bus ------------------------------ bus_load: dict[int, float] = {} bus_load_q: dict[int, float] = {} for _, row in net.load.iterrows(): b = int(row["bus"]) bus_load[b] = bus_load.get(b, 0.0) + row["p_mw"] bus_load_q[b] = bus_load_q.get(b, 0.0) + row["q_mvar"] bus_gen: dict[int, float] = {} if hasattr(net, "sgen") and len(net.sgen) > 0: for _, row in net.sgen.iterrows(): b = int(row["bus"]) bus_gen[b] = bus_gen.get(b, 0.0) + row["p_mw"] # --- Build feeders per connected component ------------------------------- feeders: list[nx.Graph] = [] for comp in nx.connected_components(G): if len(comp) < 3: continue sub = G.subgraph(comp).copy() # Identify root: prefer an ext_grid bus inside this component roots_in_comp = source_buses & comp root = min(roots_in_comp) if roots_in_comp else min(comp) # BFS to assign hop distances hop = {root: 0} for u, v in nx.bfs_edges(sub, root): hop[v] = hop[u] + 1 nx.set_node_attributes(sub, hop, "h") # Assign node types and power for n in sub.nodes: p_load = bus_load.get(n, 0.0) p_gen = bus_gen.get(n, 0.0) if p_gen > 0 and p_load == 0: sub.nodes[n]["node_type"] = "injection" sub.nodes[n]["P_mw"] = -p_gen elif p_load > 0: sub.nodes[n]["node_type"] = "load" sub.nodes[n]["P_mw"] = p_load else: sub.nodes[n]["node_type"] = "intermediate" sub.nodes[n]["P_mw"] = 0.0 q = bus_load_q.get(n, 0.0) s = np.sqrt(p_load**2 + q**2) sub.nodes[n]["pf"] = (p_load / s) if s > 0 else 0.95 feeders.append(sub) return feeders
[docs] def feeder_summary(feeders: Sequence[nx.Graph]) -> list[dict[str, Any]]: """Return a summary dict for each feeder graph. Parameters ---------- feeders : sequence of nx.Graph Feeder graphs produced by :func:`pandapower_to_feeders`. Returns ------- list[dict] One dict per feeder with keys ``n_nodes``, ``n_edges``, ``max_hop``, ``is_tree``, ``total_load_mw``, ``total_gen_mw``, ``node_types``. """ from collections import Counter summaries = [] for f in feeders: types = Counter(f.nodes[n]["node_type"] for n in f.nodes) hops = [f.nodes[n]["h"] for n in f.nodes] total_load = sum(f.nodes[n]["P_mw"] for n in f.nodes if f.nodes[n]["P_mw"] > 0) total_gen = sum(-f.nodes[n]["P_mw"] for n in f.nodes if f.nodes[n]["P_mw"] < 0) summaries.append({ "n_nodes": f.number_of_nodes(), "n_edges": f.number_of_edges(), "max_hop": max(hops) if hops else 0, "is_tree": nx.is_tree(f), "total_load_mw": total_load, "total_gen_mw": total_gen, "node_types": dict(types), }) return summaries
[docs] def pypowsybl_to_feeders(network: Any) -> list[nx.Graph]: """Convert a pypowsybl Network to a list of Schweitzer-format feeder graphs. This is the pypowsybl counterpart of :func:`pandapower_to_feeders`. It extracts buses, lines, two-winding transformers, loads, and generators from the pypowsybl Network and builds annotated feeder graphs suitable for :func:`~powergrid_synth.distribution.fit_params_from_feeders` and :class:`~powergrid_synth.distribution.SchweetzerFeederGenerator`. Parameters ---------- network : pypowsybl.network.Network A pypowsybl Network object (loaded from CGMES, XIIDM, MATPOWER, etc.). Returns ------- list[nx.Graph] One feeder graph per connected component (≥ 3 nodes), with the same node/edge attributes as :func:`pandapower_to_feeders`. """ try: import pypowsybl as _ppl except ImportError as exc: raise ImportError( "pypowsybl is required for pypowsybl_to_feeders. " "Install it with: pip install powergrid_synth[export]" ) from exc G = nx.Graph() # --- Buses --------------------------------------------------------------- buses = network.get_buses() for bus_id in buses.index: G.add_node(bus_id) # --- Lines --------------------------------------------------------------- lines = network.get_lines() for line_id, row in lines.iterrows(): b1, b2 = row["bus1_id"], row["bus2_id"] if b1 not in G or b2 not in G: continue if not (row.get("connected1", True) and row.get("connected2", True)): continue G.add_edge(b1, b2, length_km=0.1) # pypowsybl doesn't store line length # --- Two-winding transformers (short connections) ------------------------ trafos = network.get_2_windings_transformers() for trafo_id, row in trafos.iterrows(): b1, b2 = row["bus1_id"], row["bus2_id"] if b1 not in G or b2 not in G: continue if not (row.get("connected1", True) and row.get("connected2", True)): continue G.add_edge(b1, b2, length_km=0.01) # --- Identify source buses (generators with highest output) -------------- gens = network.get_generators() gen_bus_p: dict[str, float] = {} for _, row in gens.iterrows(): bid = row["bus_id"] if bid in G and row.get("connected", True): gen_bus_p[bid] = gen_bus_p.get(bid, 0.0) + abs(row.get("target_p", 0.0)) # --- Loads per bus ------------------------------------------------------- loads = network.get_loads() bus_load: dict[str, float] = {} bus_load_q: dict[str, float] = {} for _, row in loads.iterrows(): bid = row["bus_id"] if bid in G and row.get("connected", True): bus_load[bid] = bus_load.get(bid, 0.0) + row.get("p0", 0.0) bus_load_q[bid] = bus_load_q.get(bid, 0.0) + row.get("q0", 0.0) # --- Build feeders per connected component ------------------------------- feeders: list[nx.Graph] = [] for comp in nx.connected_components(G): if len(comp) < 3: continue sub = G.subgraph(comp).copy() # Root: prefer generator bus with highest output; else highest-degree bus gens_in_comp = {b: p for b, p in gen_bus_p.items() if b in comp} if gens_in_comp: root = max(gens_in_comp, key=gens_in_comp.get) else: root = max(comp, key=lambda n: sub.degree(n)) # BFS to assign hop distances hop = {root: 0} for u, v in nx.bfs_edges(sub, root): hop[v] = hop[u] + 1 nx.set_node_attributes(sub, hop, "h") # Assign node types and power for n in sub.nodes: p_load = bus_load.get(n, 0.0) p_gen = gen_bus_p.get(n, 0.0) if p_gen > 0 and p_load == 0: sub.nodes[n]["node_type"] = "injection" sub.nodes[n]["P_mw"] = -p_gen elif p_load > 0: sub.nodes[n]["node_type"] = "load" sub.nodes[n]["P_mw"] = p_load else: sub.nodes[n]["node_type"] = "intermediate" sub.nodes[n]["P_mw"] = 0.0 q = bus_load_q.get(n, 0.0) s = np.sqrt(p_load**2 + q**2) sub.nodes[n]["pf"] = (p_load / s) if s > 0 else 0.95 feeders.append(sub) return feeders