Source code for powergrid_synth.core.dcpf

import numpy as np
import networkx as nx
from scipy.sparse import csc_matrix, linalg

[docs] class DCPowerFlow: r"""Lightweight DC Power Flow (DCPF) solver. Solves the linearised power flow equations .. math:: \mathbf{P} = \mathbf{B}\,\boldsymbol{\theta}, \qquad F_{ij} = \frac{\theta_i - \theta_j}{x_{ij}} where :math:`\mathbf{B}` is the bus susceptance matrix, :math:`\boldsymbol{\theta}` the vector of voltage angles (radians), :math:`\mathbf{P}` the vector of net real-power injections (per-unit on a 100 MVA base), and :math:`F_{ij}` the real-power flow on branch :math:`(i, j)`. The slack bus (reference angle :math:`\theta = 0`) is chosen as the bus hosting the largest generator. The solver uses a sparse direct factorisation via ``scipy.sparse.linalg.spsolve``. This class is used by :class:`~powergrid_synth.transmission.TransmissionLineAllocator` to compute the flow distribution needed for impedance swapping and capacity assignment, following the methodology of `Sadeghian et al. (2018) <https://ieeexplore.ieee.org/document/8585532>`_. Parameters ---------- graph : networkx.Graph Power grid graph. Each edge must have an ``'x'`` attribute (per-unit reactance). Each node may have ``'pg'`` (generation MW) and ``'pl'`` (load MW) attributes. See Also -------- powergrid_synth.transmission.TransmissionLineAllocator : Uses DCPF for impedance swapping and capacity assignment. """ def __init__(self, graph: nx.Graph): self.graph = graph self.nodes = list(graph.nodes()) self.node_to_idx = {n: i for i, n in enumerate(self.nodes)} self.n_bus = len(self.nodes)
[docs] def run(self): r"""Execute the DC power flow and return branch flows and bus angles. Algorithm --------- 1. Build the susceptance matrix :math:`\mathbf{B}` from branch reactances: :math:`B_{ij} = -1/x_{ij}`, :math:`B_{ii} = \sum_j 1/x_{ij}`. 2. Form the net injection vector :math:`P_i = (P_{g_i} - P_{L_i}) / S_{\text{base}}`. 3. Select the slack bus (largest generator), remove its row/column, and solve :math:`\mathbf{B}_{\text{red}}\, \boldsymbol{\theta}_{\text{red}} = \mathbf{P}_{\text{red}}`. 4. Compute branch flows :math:`F_{ij} = (\theta_i - \theta_j) / x_{ij} \times S_{\text{base}}` (MW). Returns ------- flows : dict[tuple[int, int], float] Branch power flows in MW, keyed by ``(u, v)`` node pairs. angles : dict[int, float] Bus voltage angles in radians, keyed by node id. """ # 1. Build B Matrix (Susceptance) # B_ij = -1/x_ij # B_ii = sum(1/x_ij) for all j connected to i row_ind = [] col_ind = [] data = [] # Diagonal elements accumulator b_diag = np.zeros(self.n_bus) edges = [] for u, v, d in self.graph.edges(data=True): idx_u = self.node_to_idx[u] idx_v = self.node_to_idx[v] # Reactance x. Avoid division by zero. x = d.get('x', 0.1) if x < 1e-6: x = 1e-6 b_val = 1.0 / x # Off-diagonals (B_ij = -1/x) row_ind.extend([idx_u, idx_v]) col_ind.extend([idx_v, idx_u]) data.extend([-b_val, -b_val]) # Add to diagonals b_diag[idx_u] += b_val b_diag[idx_v] += b_val edges.append((u, v, x)) # Add diagonal elements row_ind.extend(range(self.n_bus)) col_ind.extend(range(self.n_bus)) data.extend(b_diag) B = csc_matrix((data, (row_ind, col_ind)), shape=(self.n_bus, self.n_bus)) # 2. Build P Vector (Injections) # P_inj = P_gen - P_load P_inj = np.zeros(self.n_bus) # Identify Slack Bus (usually largest generator or first generator) max_pg = -1 slack_idx = 0 for i, n in enumerate(self.nodes): pg = self.graph.nodes[n].get('pg', 0.0) pl = self.graph.nodes[n].get('pl', 0.0) # Net injection (normalized to 100 MVA base if values are MW) # Assuming attributes are in MW and BaseMVA=100 P_inj[i] = (pg - pl) / 100.0 if pg > max_pg: max_pg = pg slack_idx = i # 3. Solve for Theta # Remove slack row/col to make system non-singular # We solve: B_reduced * theta_reduced = P_reduced # Mask for non-slack buses mask = np.ones(self.n_bus, dtype=bool) mask[slack_idx] = False B_red = B[mask, :][:, mask] P_red = P_inj[mask] try: theta_red = linalg.spsolve(B_red, P_red) except RuntimeError: # Fallback for singular matrix (islanded components) theta_red = np.zeros_like(P_red) # Reconstruct full theta vector theta = np.zeros(self.n_bus) theta[mask] = theta_red theta[slack_idx] = 0.0 # Slack angle reference # 4. Calculate Flows # F_ij = (theta_i - theta_j) / x_ij flows = {} angles = {self.nodes[i]: theta[i] for i in range(self.n_bus)} for u, v, x in edges: idx_u = self.node_to_idx[u] idx_v = self.node_to_idx[v] th_u = theta[idx_u] th_v = theta[idx_v] # Flow from u to v f = (th_u - th_v) / x flows[(u, v)] = f * 100.0 # Convert pu back to MW return flows, angles