Source code for powergrid_synth.distribution.distribution_input_model

"""
Input model for the Schweitzer feeder generator.

Uses a 3-D kernel density estimate (KDE) over (N_nodes, P_load, P_gen) to
sample realistic combinations of feeder size and loading, as described in
Section III-A of Schweitzer et al. (2017).
"""

from __future__ import annotations

from dataclasses import dataclass, field
from typing import List, Optional, Sequence, Tuple

import networkx as nx
import numpy as np
from scipy.stats import gaussian_kde


[docs] @dataclass class FeederInputSample: """A single draw from the 3-D input model.""" n_nodes: int total_load_mw: float total_gen_mw: float
[docs] class DistributionInputModel: """3-D KDE input model for feeder size and loading. The model is fitted on reference feeders and then sampled to produce realistic ``(n_nodes, total_load, total_gen)`` triples. Parameters ---------- bw_method : str or float, optional Bandwidth method passed to :class:`scipy.stats.gaussian_kde`. seed : int or None, optional Random seed for reproducibility. """ def __init__( self, bw_method: Optional[str | float] = None, seed: Optional[int] = None, ): self.bw_method = bw_method self.rng = np.random.default_rng(seed) self._kde: Optional[gaussian_kde] = None self._data: Optional[np.ndarray] = None
[docs] def fit(self, feeders: Sequence[nx.Graph]) -> "DistributionInputModel": """Fit the KDE from a collection of reference feeder graphs. Each graph must have node attributes ``P_mw`` (positive for load, negative for generation). Parameters ---------- feeders : sequence of nx.Graph Reference feeder graphs. Returns ------- self """ rows: List[Tuple[float, float, float]] = [] for G in feeders: n = G.number_of_nodes() total_load = sum( G.nodes[nd].get("P_mw", 0.0) for nd in G.nodes if G.nodes[nd].get("P_mw", 0.0) > 0 ) total_gen = sum( abs(G.nodes[nd].get("P_mw", 0.0)) for nd in G.nodes if G.nodes[nd].get("P_mw", 0.0) < 0 ) rows.append((float(n), total_load, total_gen)) data = np.array(rows).T # shape (3, n_feeders) self._data = data self._kde = self._fit_kde(data) return self
[docs] def fit_from_arrays( self, n_nodes: Sequence[int], total_load: Sequence[float], total_gen: Sequence[float], ) -> "DistributionInputModel": """Fit directly from arrays (no graph objects needed). Parameters ---------- n_nodes : array-like of int total_load : array-like of float total_gen : array-like of float Returns ------- self """ data = np.array([ np.asarray(n_nodes, dtype=float), np.asarray(total_load, dtype=float), np.asarray(total_gen, dtype=float), ]) self._data = data self._kde = self._fit_kde(data) return self
[docs] def _fit_kde(self, data: np.ndarray) -> gaussian_kde: """Fit KDE with regularization for near-singular covariance.""" try: return gaussian_kde(data, bw_method=self.bw_method) except np.linalg.LinAlgError: # Add small jitter to break degeneracy scale = np.std(data, axis=1, keepdims=True) scale = np.where(scale < 1e-10, 1.0, scale) jitter = self.rng.normal(0, 1e-6, size=data.shape) * scale return gaussian_kde(data + jitter, bw_method=self.bw_method)
[docs] def sample(self, n_samples: int = 1) -> List[FeederInputSample]: """Draw samples from the fitted input model. Parameters ---------- n_samples : int Number of samples to draw. Returns ------- list of FeederInputSample """ if self._kde is None: raise RuntimeError("Model not fitted. Call fit() first.") samples = self._kde.resample(n_samples, seed=self.rng) results = [] for i in range(n_samples): n = max(3, int(round(samples[0, i]))) # minimum 3 nodes load = max(0.0, float(samples[1, i])) gen = max(0.0, float(samples[2, i])) results.append(FeederInputSample(n, load, gen)) return results
[docs] def pdf(self, n_nodes: float, total_load: float, total_gen: float) -> float: """Evaluate the KDE density at a given point. Parameters ---------- n_nodes, total_load, total_gen : float Coordinates in the 3-D input space. Returns ------- float Estimated probability density. """ if self._kde is None: raise RuntimeError("Model not fitted. Call fit() first.") result = self._kde(np.array([[n_nodes], [total_load], [total_gen]])) return float(result[0])