Source code for powergrid_synth.transmission.capacity_allocator

"""
Assigns generation capacities to generator buses using a statistical approach
based on Elyas et al. (2017), "On the Statistical Settings of Generation and
Load in a Synthetic Grid Modeling" (https://arxiv.org/abs/1706.09294).

The methodology uses the exponential distribution of individual generation
capacities and the non-trivial correlation between generation capacity and
nodal degree observed in realistic grids. An analogous approach for load
allocation is implemented in :mod:`load_allocator`.

Ported from the MATLAB SynGrid toolbox (``sg_refsys_stat.m``).
"""

import numpy as np
import networkx as nx
import math
from typing import List, Dict, Tuple, Optional
from ..core.reference_data import get_reference_stats

[docs] class CapacityAllocator: """ Assigns generation capacities (PgMax) to generator buses in the grid. Implements the three-stage methodology from Elyas et al. (2017): 1. **Total generation:** Compute the aggregate generation capacity from a scaling law fitted to realistic grids: ``log Pg_tot(N) = -0.21 * log^2(N) + 2.06 * log(N) + 0.66``. 2. **Individual capacities:** Sample N_g individual capacities from an exponential distribution (with ~1% super-large outliers to capture the observed heavy tail). 3. **Correlated assignment:** Assign capacities to generator buses using a 14x14 empirical 2D probability table (``Tab_2D_Pgmax``) that encodes the joint distribution of normalized capacity and normalized nodal degree. This preserves the observed Pearson correlation rho(P_bar, k_bar) in [0.25, 0.5] between capacity and degree. Reference systems with pre-computed ``Tab_2D_Pgmax`` tables are available for NYISO-2935 (id=1), WECC-16944 (id=2), and a third system (id=3). A heuristic diagonal-bias table (id=0) is provided as a fallback. Parameters ---------- graph : nx.Graph NetworkX graph with ``'bus_type'`` node attributes already set (by :class:`BusTypeAllocator`). ref_sys_id : int Reference system for statistical tables (0=heuristic, 1=NYISO-2935, 2=WECC-16944, 3=additional reference). References ---------- .. [1] S. H. Elyas, Z. Wang, R. J. Thomas, "On the Statistical Settings of Generation and Load in a Synthetic Grid Modeling," arXiv:1706.09294, 2017. """ def __init__(self, graph: nx.Graph, ref_sys_id: int = 1): """ Parameters ---------- graph : nx.Graph NetworkX graph whose nodes carry a ``'bus_type'`` attribute (``'Gen'``, ``'Load'``, or ``'Conn'``). This attribute must be set before instantiation, typically via :class:`BusTypeAllocator`. ref_sys_id : int, optional Selects which reference system statistics to use for the 2D probability table ``Tab_2D_Pgmax``: - 0 — heuristic diagonal-bias table (no real-grid data). - 1 — NYISO-2935 (default). - 2 — WECC-16944. - 3 — additional reference system. """ self.graph = graph self.ref_sys_id = ref_sys_id self.nodes = list(graph.nodes()) self.n_nodes = len(self.nodes) # Identify Generator Buses (Assuming 'bus_type' attribute exists) # 1=Gen, 2=Load, 3=Conn # If attribute missing, defaults to empty list (user must run BusTypeAllocator first) self.gen_buses = [n for n, d in graph.nodes(data=True) if d.get('bus_type') == 'Gen'] self.n_gen = len(self.gen_buses)
[docs] def _generate_heuristic_tab_2d(self) -> np.ndarray: """ Generate a heuristic 14x14 joint-probability table as a fallback. When no reference system data is selected (``ref_sys_id=0``), this method produces a synthetic table using a Gaussian-decay diagonal bias: ``weight(r, c) = exp(-0.5 * |r - c|)``, where rows represent capacity classes and columns represent degree classes (both ordered low-to-high). The matrix is normalized to sum to 1. This simulates the positive correlation between nodal degree and generation capacity observed in realistic grids, without relying on empirical data from a specific reference system. Returns ------- np.ndarray A 14x14 probability matrix (sums to 1). """ # Create a matrix with some diagonal weight to simulate correlation # Rows = Capacity Classes (low to high), Cols = Degree Classes (low to high) matrix = np.zeros((14, 14)) for r in range(14): for c in range(14): # Distance from diagonal dist = abs(r - c) # higher probability near diagonal matrix[r, c] = np.exp(-0.5 * dist) # Normalize to sum to 1 return matrix / np.sum(matrix)
[docs] def _get_default_tab_2d(self) -> np.ndarray: """ Return the ``Tab_2D_Pgmax`` table for the selected reference system. The table is a 14x14 matrix representing the empirical joint PDF ``Pr((P_bar_gn_max, k_bar_n) in A)`` discretized into 14 capacity classes (rows, low-to-high) and 14 nodal-degree classes (columns, low-to-high). Tables for real reference systems are loaded from :func:`reference_data.get_reference_stats` (ported from the MATLAB SynGrid file ``sg_refsys_stat.m``). Returns ------- np.ndarray A 14x14 probability matrix. """ # Option 0: Use the heuristic generator (previous default) if self.ref_sys_id == 0: return self._generate_heuristic_tab_2d() try: stats = get_reference_stats(self.ref_sys_id) return stats['Tab_2D_Pgmax'] except ValueError as e: print(f"Warning: {e} Defaulting to Reference System 1.") stats = get_reference_stats(1) return stats['Tab_2D_Pgmax']
[docs] def _initial_generation_distribution(self, total_gen: float) -> Tuple[np.ndarray, float, np.ndarray]: """ Sample individual generation capacities from the empirical distribution. Following Elyas et al. (2017), more than 99% of generation units in realistic grids follow an exponential distribution. About 1% have extremely large capacities that fall outside the exponential range. The procedure is: 1. Draw ``N_g`` samples from ``Exp(mu)`` where ``mu = total_gen / N_g``. 2. Replace ~1% of the samples with "super-large" values drawn uniformly from ``[max(P), 3 * max(P)]``. 3. If the sum deviates more than 5% above or 10% below ``total_gen``, rescale all values proportionally. 4. Normalize by the maximum capacity. Parameters ---------- total_gen : float Target total generation capacity (MW). Returns ------- p_caps : np.ndarray Raw (possibly rescaled) capacity values, shape ``(N_g,)``. max_r_pgmax : float Maximum capacity value, used for normalization. normalized_r_pgmax : np.ndarray Capacities normalized to [0, 1] by dividing by ``max_r_pgmax``. """ if self.n_gen == 0: return np.array([]), 0.0 # Mean capacity per generator mu = total_gen / self.n_gen # Base distribution: Exponential p_caps = np.random.exponential(scale=mu, size=self.n_gen) # Super large capacities (approx 1%) n_super = int(round(0.01 * self.n_gen)) if n_super > 0: max_p = np.max(p_caps) # Uniform random [1*max, 3*max] (based on: max(P) + 2*rand*max(P)) p_super = max_p + (2.0 * np.random.rand(n_super) * max_p) # Replace random indices with super large values indices = np.random.choice(self.n_gen, size=n_super, replace=False) p_caps[indices] = p_super # Scaling check current_sum = np.sum(p_caps) if current_sum > 1.05 * total_gen or current_sum < 0.90 * total_gen: p_caps = p_caps * (total_gen / current_sum) max_r_pgmax = np.max(p_caps) normalized_r_pgmax = p_caps / max_r_pgmax return p_caps, max_r_pgmax, normalized_r_pgmax
[docs] def _assignment_logic(self, norm_deg_pairs: np.ndarray, norm_caps: np.ndarray, tab_2d: np.ndarray) -> np.ndarray: """ Assign normalized capacities to buses via 2D-binning (Stage 3). This implements the correlated assignment algorithm from Elyas et al. (2017) that preserves the empirical joint distribution of normalized capacity and normalized nodal degree. The steps are: 1. **Scale 2D table to counts:** multiply ``tab_2d`` by ``N_g`` and round to integer targets ``n_rc`` for each (capacity-class *r*, degree-class *c*) cell. Adjust rounding so that the total equals ``N_g`` exactly. 2. **Compute marginals:** column sums give degree-bin targets; row sums give capacity-bin targets. 3. **Bin by degree:** sort generators by normalized degree, partition into 14 degree bins according to column-sum targets. 4. **Bin by capacity:** sort normalized capacities, partition into 14 capacity bins according to row-sum targets. 5. **Nested assignment loop:** for each degree bin *c* = 1..14 and each capacity bin *r* = 14..1 (high to low): randomly sample ``n_rc`` capacities from capacity bin *r* (without replacement) and assign them to unassigned generators in degree bin *c*. 6. **Reassemble** all bins into the output array. Parameters ---------- norm_deg_pairs : np.ndarray Shape ``(N_g, 2)`` — columns are ``[NodeID, NormalizedDegree]``. norm_caps : np.ndarray Shape ``(N_g,)`` — normalized capacity values in [0, 1]. tab_2d : np.ndarray A 14x14 joint-probability matrix (rows=capacity classes, columns=degree classes). Returns ------- np.ndarray Shape ``(N_g, 3)`` — columns are ``[NodeID, NormalizedDegree, NormalizedCapacity]``. """ ng = len(norm_caps) # 1. Calculate target counts for the 2D table tab_2d_ng = np.round(tab_2d * ng).astype(int) # Fix rounding errors to match exactly ng current_sum = np.sum(tab_2d_ng) diff = ng - current_sum if diff != 0: # Add/Sub from the max element to assume least disruption # Flatten find max linear index flat_idx = np.argmax(tab_2d_ng) # unravel r_idx, c_idx = np.unravel_index(flat_idx, tab_2d_ng.shape) tab_2d_ng[r_idx, c_idx] += diff # 2. Binning Targets # Column sums = targets for Degree bins n_k_targets = np.sum(tab_2d_ng, axis=0) # 1x14 # Row sums = targets for Capacity bins n_g_targets = np.sum(tab_2d_ng, axis=1) # 1x14 # 3. Sort and Bin Nodes by Degree # Sort by degree (column 1) sorted_nodes = norm_deg_pairs[norm_deg_pairs[:, 1].argsort()] k_bins = [] current_idx = 0 for count in n_k_targets: end_idx = current_idx + count k_bins.append(sorted_nodes[current_idx:end_idx].copy()) current_idx = end_idx # 4. Sort and Bin Capacities # Sort capacities ascending sorted_caps = np.sort(norm_caps) g_bins = [] current_idx = 0 for count in n_g_targets: end_idx = current_idx + count # Keep as list for easy popping g_bins.append(list(sorted_caps[current_idx:end_idx])) current_idx = end_idx # 5. Assignment Loop (The 2D allocation) # Iterate Columns (Degree bins) 1..14 for k_idx in range(14): # kk # Iterate Rows (Capacity bins) 14..1 (High to Low) # MATLAB: for gg=14:-1:1 for g_idx in range(13, -1, -1): # gg count = tab_2d_ng[g_idx, k_idx] if count > 0: # Take 'count' capacities from G_bin[g_idx] # Since G_bin is sorted, random sampling minimizes bias within the bin caps_to_assign = [] source_bin = g_bins[g_idx] # Safety check if bin ran out due to rounding weirdness (shouldn't happen with logic above) take = min(count, len(source_bin)) for _ in range(take): # Random sample without replacement strategy from the bin # pop(0) takes smallest, pop(-1) takes largest in that bin # MATLAB uses datasample which is random rand_i = np.random.randint(0, len(source_bin)) caps_to_assign.append(source_bin.pop(rand_i)) # Assign to nodes in K_bin[k_idx] # We need to fill the 'NormCapacity' column (index 2 in final output, but we construct it) # K_bin currently: [NodeID, NormDeg] # We need to find 'take' nodes in K_bin that don't have capacity yet # Actually, K_bins are partitioned disjointly. We just fill them up. # We add a 3rd column for Capacity to k_bins data structures # Initialize if not present if k_bins[k_idx].shape[1] < 3: # Append column of -1s k_bins[k_idx] = np.hstack((k_bins[k_idx], np.full((len(k_bins[k_idx]), 1), -1.0))) # Find empty slots (-1) empty_slots = np.where(k_bins[k_idx][:, 2] == -1.0)[0] # Fill 'take' slots fill_indices = empty_slots[:take] k_bins[k_idx][fill_indices, 2] = caps_to_assign # 6. Reassemble # Stack all K bins final_list = [] for bin_arr in k_bins: if bin_arr.shape[1] == 3: # valid bin final_list.append(bin_arr) else: # Bin might be empty or untouched? pass if not final_list: return np.array([]) gen_setting = np.vstack(final_list) return gen_setting
[docs] def allocate(self, tab_2d: Optional[np.ndarray] = None) -> Dict[int, float]: """ Run the full generation-capacity allocation pipeline. Executes the three-stage methodology from Elyas et al. (2017): 1. Compute total generation capacity from the scaling law: ``Pg_tot = 10^(-0.21 * log10(N)^2 + 2.06 * log10(N) + 0.66)``. 2. Sample individual capacities, normalize them and the generator nodal degrees by their respective maxima so that both lie in [0, 1]: ``P_bar = P / max(P)``, ``k_bar = k / max(k)``. 3. Assign normalized capacities to generator buses via 2D binning using ``Tab_2D_Pgmax``. 4. Denormalize: ``Pg_max = P_bar * max(P)``. Parameters ---------- tab_2d : np.ndarray or None, optional Custom 14x14 probability matrix. If *None*, the default table for the selected ``ref_sys_id`` is used. Returns ------- dict[int, float] Mapping from generator node ID to its assigned capacity ``Pg_max`` (MW). """ if self.n_gen == 0: print("No generator buses found. Skipping capacity allocation.") return {} if tab_2d is None: tab_2d = self._get_default_tab_2d() # 1. Calculate Total Generation # Total_Generation = 10^(-0.21*(log10(N))^2+2.06*log10(N)+0.66) log10_n = np.log10(self.n_nodes) exponent = -0.21 * (log10_n**2) + 2.06 * log10_n + 0.66 total_gen = 10**exponent ref_label = "Heuristic (Default)" if self.ref_sys_id == 0 else f"Reference System {self.ref_sys_id}" print(f"Allocating Capacity for {self.n_gen} generators.") print(f"Total System Capacity Target: {total_gen:.2f} MW using {ref_label}") # 2. Get Node Degrees for Generators gen_degrees = [] for n in self.gen_buses: gen_degrees.append([n, self.graph.degree(n)]) gen_degrees = np.array(gen_degrees) max_node_degree = np.max(gen_degrees[:, 1]) # Normalized Node Degree [NodeID, NormDeg] norm_gen_degrees = gen_degrees.copy().astype(float) norm_gen_degrees[:, 1] = norm_gen_degrees[:, 1] / max_node_degree # 3. Initial Generation Distribution # We need actual values P_caps AND normalized values p_caps_actual, max_r_pgmax, norm_r_pgmax = self._initial_generation_distribution(total_gen) # 4. Assignment # Returns [NodeID, NormDeg, NormCap] norm_assignment = self._assignment_logic(norm_gen_degrees, norm_r_pgmax, tab_2d) # 5. Denormalize # Map NodeID -> NormCap * MaxCap final_allocation = {} for row in norm_assignment: node_id = int(row[0]) norm_cap = row[2] actual_cap = norm_cap * max_r_pgmax final_allocation[node_id] = actual_cap return final_allocation