Source code for powergrid_synth.distribution.distribution_analysis

"""
Distribution analysis: fit Schweitzer parameters from reference feeders.

Provides routines to extract empirical distributions from a collection of
real feeders and fit the parametric families used in the Schweitzer model
(via MLE / method-of-moments / KL minimization).
"""

from __future__ import annotations

from typing import Dict, List, Optional, Sequence, Tuple

import networkx as nx
import numpy as np
from scipy import optimize, stats

from .distribution_params import (
    BetaParams,
    DistributionSynthParams,
    ExponentialClip,
    ExponentialParams,
    MixtureGammaParams,
    MixtureNormalParams,
    MixturePoissonParams,
    ModifiedCauchyParams,
    NegBinomParams,
    NormalParams,
    PowerLawClip,
    TLocationScaleParams,
)


# ------------------------------------------------------------------
# Extraction helpers
# ------------------------------------------------------------------

[docs] def _hop_distances(feeders: Sequence[nx.Graph]) -> np.ndarray: """Collect all hop distances (excluding source h=0 and root h=1).""" hops = [] for G in feeders: for n in G.nodes: h = G.nodes[n].get("h", 0) if h >= 2: hops.append(h - 2) # shift to 0-based for NegBinom fitting return np.array(hops)
[docs] def _degrees(feeders: Sequence[nx.Graph]) -> np.ndarray: """Collect all node degrees (excluding source/leaf fixed degrees).""" degs = [] for G in feeders: max_h = max(G.nodes[n]["h"] for n in G.nodes) for n in G.nodes: h = G.nodes[n]["h"] if h == 0 or h == max_h: continue # source / leaf — deterministic degs.append(G.degree(n)) return np.array(degs, dtype=float)
[docs] def _cable_lengths(feeders: Sequence[nx.Graph]) -> np.ndarray: """Collect all cable lengths in km.""" lens = [] for G in feeders: for e in G.edges: l = G.edges[e].get("length_km", None) if l is not None and l > 0: lens.append(l) return np.array(lens)
[docs] def _intermediate_fractions(feeders: Sequence[nx.Graph]) -> np.ndarray: """Fraction of nodes that are intermediate, per feeder.""" fracs = [] for G in feeders: n = G.number_of_nodes() n_int = sum( 1 for nd in G.nodes if G.nodes[nd].get("node_type") == "intermediate" ) fracs.append(n_int / n if n > 0 else 0.0) return np.array(fracs)
[docs] def _injection_fractions(feeders: Sequence[nx.Graph]) -> np.ndarray: """Fraction of nodes that are injection, per feeder.""" fracs = [] for G in feeders: n = G.number_of_nodes() n_inj = sum( 1 for nd in G.nodes if G.nodes[nd].get("node_type") == "injection" ) fracs.append(n_inj / n if n > 0 else 0.0) return np.array(fracs)
[docs] def _load_deviations(feeders: Sequence[nx.Graph]) -> np.ndarray: """Per-feeder deviations of load from uniform share.""" devs = [] for G in feeders: load_nodes = [ n for n in G.nodes if G.nodes[n].get("node_type") == "load" and G.nodes[n].get("P_mw", 0) > 0 ] if not load_nodes: continue total = sum(G.nodes[n]["P_mw"] for n in load_nodes) uniform = total / len(load_nodes) if total > 0 else 1.0 for n in load_nodes: devs.append((G.nodes[n]["P_mw"] - uniform) / total if total > 0 else 0.0) return np.array(devs)
# ------------------------------------------------------------------ # Fitting routines # ------------------------------------------------------------------
[docs] def fit_neg_binomial(data: np.ndarray) -> NegBinomParams: """Fit Negative Binomial to hop-distance data (0-shifted).""" mean_x = data.mean() var_x = data.var() if var_x <= mean_x: var_x = mean_x + 0.01 # ensure overdispersion for NB p = mean_x / var_x r = mean_x * p / (1 - p) return NegBinomParams(r=float(r), p=float(p))
[docs] def fit_mixture_gamma(data: np.ndarray) -> MixtureGammaParams: """Fit a two-component Gamma mixture via EM (simplified). Uses a basic EM iteration (50 steps) with Gamma MLE per component. Falls back to defaults if the data is insufficient. """ data = data[data > 0] if len(data) < 10: return MixtureGammaParams() # fallback to defaults # Initialize by splitting at median med = np.median(data) z = (data <= med).astype(float) pi = z.mean() a1, b1, a2, b2 = 1.5, 0.65, 4.4, 1.67 # safe defaults for _ in range(50): d1 = data[z > 0.5] d2 = data[z <= 0.5] if len(d1) < 2 or len(d2) < 2: break try: a1, _, b1 = stats.gamma.fit(d1, floc=0) a2, _, b2 = stats.gamma.fit(d2, floc=0) except (ValueError, RuntimeError): break # E-step pdf1 = stats.gamma.pdf(data, a1, scale=b1) * pi pdf2 = stats.gamma.pdf(data, a2, scale=b2) * (1 - pi) total = pdf1 + pdf2 total = np.where(total < 1e-300, 1e-300, total) z = pdf1 / total pi = z.mean() return MixtureGammaParams(pi=float(pi), a1=float(a1), b1=float(b1), a2=float(a2), b2=float(b2))
[docs] def fit_beta(data: np.ndarray) -> BetaParams: """Fit Beta distribution to fraction data in (0, 1).""" data = data[(data > 0) & (data < 1)] if len(data) < 2: return BetaParams() a, b, _, _ = stats.beta.fit(data, floc=0, fscale=1) return BetaParams(alpha=float(a), beta=float(b))
[docs] def fit_t_location_scale(data: np.ndarray) -> TLocationScaleParams: """Fit t-Location-Scale distribution via MLE.""" if len(data) < 3: return TLocationScaleParams() nu, mu, sigma = stats.t.fit(data) return TLocationScaleParams(mu=float(mu), sigma=float(sigma), nu=float(nu))
[docs] def fit_modified_cauchy(data: np.ndarray) -> ModifiedCauchyParams: """Fit positive-support Cauchy (x0, gamma) to cable-length data.""" data = data[data > 0] if len(data) < 3: return ModifiedCauchyParams() loc, scale = stats.cauchy.fit(data) return ModifiedCauchyParams(x0=float(loc), gamma=float(scale))
[docs] def fit_exponential_clip( feeders: Sequence[nx.Graph], ) -> ExponentialClip: """Fit exponential clipping g_max(h) = a * exp(b*h) from max lengths per hop.""" from collections import defaultdict max_by_h: Dict[int, float] = defaultdict(float) for G in feeders: for u, v in G.edges: h = max(G.nodes[u]["h"], G.nodes[v]["h"]) l = G.edges[u, v].get("length_km", 0) if l > max_by_h[h]: max_by_h[h] = l if len(max_by_h) < 2: return ExponentialClip() hs = np.array(sorted(max_by_h.keys()), dtype=float) vals = np.array([max_by_h[int(h)] for h in hs]) vals = np.where(vals > 0, vals, 1e-6) # Linear regression on log(vals) = log(a) + b*h log_vals = np.log(vals) coeffs = np.polyfit(hs, log_vals, 1) b = float(coeffs[0]) a = float(np.exp(coeffs[1])) return ExponentialClip(a=a, b=b)
[docs] def fit_power_law_clip( feeders: Sequence[nx.Graph], ) -> PowerLawClip: """Fit power-law clipping g_dmax(h) = a * h^b from max degrees per hop.""" from collections import defaultdict max_by_h: Dict[int, int] = defaultdict(int) for G in feeders: for n in G.nodes: h = G.nodes[n]["h"] d = G.degree(n) if d > max_by_h[h]: max_by_h[h] = d hs_raw = sorted(h for h in max_by_h if h > 0) if len(hs_raw) < 2: return PowerLawClip() hs = np.array(hs_raw, dtype=float) vals = np.array([max_by_h[int(h)] for h in hs], dtype=float) vals = np.where(vals > 0, vals, 1) log_h = np.log(hs) log_v = np.log(vals) coeffs = np.polyfit(log_h, log_v, 1) b = float(coeffs[0]) a = float(np.exp(coeffs[1])) return PowerLawClip(a=a, b=b)
# ------------------------------------------------------------------ # High-level fitting # ------------------------------------------------------------------
[docs] def fit_params_from_feeders( feeders: Sequence[nx.Graph], ) -> DistributionSynthParams: """Fit all Schweitzer parameters from a collection of reference feeders. Parameters ---------- feeders : sequence of nx.Graph Reference distribution feeder graphs, each annotated with ``h``, ``P_mw``, ``node_type``, ``length_km`` attributes. Returns ------- DistributionSynthParams Fitted parameter set ready for ``SchweetzerFeederGenerator``. """ hops = _hop_distances(feeders) degs = _degrees(feeders) lengths = _cable_lengths(feeders) int_frac = _intermediate_fractions(feeders) inj_frac = _injection_fractions(feeders) load_dev = _load_deviations(feeders) params = DistributionSynthParams( hop_dist=fit_neg_binomial(hops) if len(hops) > 0 else NegBinomParams(), degree_dist=fit_mixture_gamma(degs) if len(degs) > 0 else MixtureGammaParams(), degree_clip=fit_power_law_clip(feeders), intermediate_frac=fit_beta(int_frac) if len(int_frac) > 0 else BetaParams(1.64, 15.77), injection_frac=fit_beta(inj_frac) if len(inj_frac) > 0 else BetaParams(0.92, 20.53), load_deviation=fit_t_location_scale(load_dev) if len(load_dev) > 0 else TLocationScaleParams(), cable_length=fit_modified_cauchy(lengths) if len(lengths) > 0 else ModifiedCauchyParams(), length_clip=fit_exponential_clip(feeders), ) return params