r"""
Transmission-line impedance and capacity allocation.
This module assigns branch impedance components (:math:`R`, :math:`X`) and
thermal-capacity limits (:math:`F_l^{\max}`) to all transmission lines in a
synthetic power grid. The statistical models follow
`Sadeghian et al. (2018) <https://ieeexplore.ieee.org/document/8585532>`_
(Section IV) and the SynGrid MATLAB function ``sg_line.m``.
Impedance magnitudes are drawn from a LogNormal distribution; line angles
from a Lévy stable distribution. A DCPF-based swapping heuristic
associates low impedance with high-flow lines, reproducing empirical
correlations. Capacity factors (gauge ratios) are assigned through a
2-D empirical PMF table.
.. todo::
`Reed et al. (2006) <https://www.tandfonline.com/doi/full/10.1081/STA-120037438>`_
argues that impedance magnitudes may follow a clipped
double-Pareto-logNormal distribution, which could be explored.
"""
import numpy as np
import networkx as nx
from scipy.stats import levy_stable
from typing import Dict, Tuple, List
from ..core.reference_data import get_reference_stats
from ..core.dcpf import DCPowerFlow
[docs]
class TransmissionLineAllocator:
r"""Allocate impedance and capacity limits to transmission lines.
The algorithm follows `Sadeghian et al. (2018)
<https://ieeexplore.ieee.org/document/8585532>`_ and the SynGrid
MATLAB toolbox (``sg_line.m``, ``sg_flow_lim.m``):
1. **Impedance generation** — magnitudes :math:`Z` from
LogNormal(:math:`\mu`, :math:`\sigma`), angles :math:`\varphi`
from a Lévy stable distribution
:math:`S(\alpha_s, \beta_s, \gamma_s, \delta_s)`. Then
:math:`X = Z \sin\varphi`, :math:`R = Z \cos\varphi`.
2. **DCPF-based swapping** — sort impedances ascending and flows
descending, then randomly swap ~20–30 % of assignments to
introduce variance while preserving the negative correlation
between impedance and flow.
3. **(Optional) Topology refinement** — iteratively add
low-impedance lines between max angle-difference bus pairs
and remove weak high-:math:`X` lines until the angle spread
is below a size-dependent threshold.
4. **Capacity assignment** — gauge ratios
:math:`\beta_l = F_l / F_l^{\max}` from
Exponential(:math:`\mu_\beta`) with overload injection;
assigned via 2-D table ``Tab_2D_FlBeta``.
Capacity limits: :math:`F_l^{\max} = F_l / \beta_l`.
Parameters
----------
graph : networkx.Graph
Power grid graph with nodal generation/load attributes.
ref_sys_id : int, optional
Reference system (1 = NYISO-2935, 2 = WECC-16994). Default 1.
Attributes
----------
stab_params : list of float
Lévy stable parameters :math:`[\alpha_s, \beta_s, \gamma_s, \delta_s]`.
tab_fl_beta : numpy.ndarray
2-D empirical PMF table for flow–beta assignment.
mu_beta : float
Mean of the exponential distribution for :math:`\beta`.
overload_b : float
Fraction of lines assigned overload (:math:`\beta > 1`).
"""
def __init__(self, graph: nx.Graph, ref_sys_id: int = 1):
self.graph = graph
self.ref_sys_id = ref_sys_id
try:
self.stats = get_reference_stats(ref_sys_id)
except ValueError:
self.stats = get_reference_stats(1)
self.stab_params = self.stats.get('stab', [1.374, -0.838, 2.965, 85.801])
self.tab_fl_beta = self.stats['Tab_2D_FlBeta']
self.mu_beta = self.stats.get('mu_beta', 0.27)
self.overload_b = self.stats.get('Overload_b', 0.00083)
[docs]
def _generate_phi(self, num_lines: int) -> np.ndarray:
r"""Generate line angles from a Lévy stable distribution.
Draws :math:`\varphi \sim S(\alpha_s, \beta_s, \gamma_s, \delta_s)`
and clips to :math:`[0.01, 89.99]` degrees. Out-of-range samples
are resampled up to 20 times before a hard clip.
Parameters
----------
num_lines : int
Number of transmission lines.
Returns
-------
numpy.ndarray, shape (num_lines,)
Line angle in degrees for each branch.
"""
alpha, beta, gamma, delta = self.stab_params
phi = levy_stable.rvs(alpha, beta, loc=delta, scale=gamma, size=num_lines)
# Ensure phi is within logical bounds [0, 90] degrees mostly
mask = (phi > 90) | (phi < 0)
max_retries = 20
count = 0
while np.any(mask) and count < max_retries:
n_resample = np.sum(mask)
new_vals = levy_stable.rvs(alpha, beta, loc=delta, scale=gamma, size=n_resample)
phi[mask] = new_vals
mask = (phi > 90) | (phi < 0)
count += 1
# Hard clip fallback if retries exhausted
phi = np.clip(phi, 0.01, 89.99)
return phi
[docs]
def _generate_beta(self, num_lines: int) -> np.ndarray:
r"""Generate gauge ratios from an exponential distribution.
Draws :math:`\beta \sim \mathrm{Exp}(\mu_\beta)` and resamples
values exceeding 1.0. A fraction ``overload_b`` of lines are then
injected with :math:`\beta \in (1.0, 1.2]` to model bottleneck
/ overloaded lines (Sadeghian et al., 2018, Sec. IV).
Parameters
----------
num_lines : int
Number of transmission lines.
Returns
-------
numpy.ndarray, shape (num_lines,)
Sorted gauge-ratio values.
"""
beta = np.random.exponential(scale=self.mu_beta, size=num_lines)
# Handle outliers (beta > 1)
mask = beta > 1.0
max_retries = 10
count = 0
while np.any(mask) and count < max_retries:
n_resample = np.sum(mask)
beta[mask] = np.random.exponential(scale=self.mu_beta, size=n_resample)
mask = beta > 1.0
count += 1
# Overload injection (simulating bottleneck lines)
n_overload = int(round(num_lines * self.overload_b))
if n_overload > 0:
indices = np.random.choice(num_lines, n_overload, replace=False)
# Overload_bet = 1 + 0.2 * rand
beta[indices] = 1.0 + 0.2 * np.random.rand(n_overload)
# Avoid zero or near-zero betas (division by zero protection)
small_mask = beta < 1e-4
n_small = np.sum(small_mask)
if n_small > 0:
beta[small_mask] = 0.01 + 0.099 * np.random.rand(n_small)
return np.sort(beta)
[docs]
def _assign_betas(self, flows: np.ndarray, betas: np.ndarray) -> np.ndarray:
r"""Assign gauge ratios to lines via 2-D bin matching.
Uses the empirical PMF ``Tab_2D_FlBeta`` to reproduce the joint
distribution :math:`f(\bar{F}_l, \beta_l)` from the reference
system. Lines are sorted by normalised flow and betas by value,
then paired randomly within matching bins (high-to-low traversal).
Parameters
----------
flows : numpy.ndarray, shape (m, 2)
``[line_index, normalised_flow]``.
betas : numpy.ndarray, shape (m,)
Sorted gauge-ratio values from :meth:`_generate_beta`.
Returns
-------
numpy.ndarray, shape (m, 3)
``[line_index, normalised_flow, beta]``.
"""
num_lines = len(flows)
if num_lines == 0: return np.array([])
tab_n = np.round(self.tab_fl_beta * num_lines).astype(int)
# Adjust for rounding errors to match total lines
diff = num_lines - np.sum(tab_n)
if diff != 0:
flat_idx = np.argmax(tab_n)
r, c = np.unravel_index(flat_idx, tab_n.shape)
tab_n[r, c] += diff
# Targets per Flow class (Rows) and Beta class (Cols)
n_flow_targets = np.sum(tab_n, axis=1)
n_beta_targets = np.sum(tab_n, axis=0)
# Sort flows (input is [idx, val])
sorted_flows = flows[flows[:, 1].argsort()]
# Binning Flows
flow_bins = []
curr = 0
for count in n_flow_targets:
end = curr + count
flow_bins.append(list(sorted_flows[curr:end]))
curr = end
# Binning Betas (already sorted)
beta_bins = []
curr = 0
for count in n_beta_targets:
end = curr + count
beta_bins.append(list(betas[curr:end]))
curr = end
# Assignment Loop: Rows(Flows) x Cols(Betas)
final_assignment = [] # [LineIdx, FlowVal, BetaVal]
# Iterate Rows (Flows) - typically high to low in MATLAB (14:-1:1)
for r in range(len(n_flow_targets)-1, -1, -1):
# Iterate Cols (Betas) - typically high to low in MATLAB (16:-1:1)
for c in range(len(n_beta_targets)-1, -1, -1):
count = tab_n[r, c]
if count > 0:
f_bin = flow_bins[r]
b_bin = beta_bins[c]
take = min(count, len(f_bin), len(b_bin))
for _ in range(take):
if f_bin and b_bin:
# Random sample without replacement
f_idx = np.random.randint(0, len(f_bin))
flow_data = f_bin.pop(f_idx) # [LineIdx, FlowVal]
b_idx = np.random.randint(0, len(b_bin))
beta_val = b_bin.pop(b_idx)
final_assignment.append([flow_data[0], flow_data[1], beta_val])
# Handle leftovers if any (due to bin mismatch/emptying)
rem_flows = [f for bin in flow_bins for f in bin]
rem_betas = [b for bin in beta_bins for b in bin]
for i in range(min(len(rem_flows), len(rem_betas))):
final_assignment.append([rem_flows[i][0], rem_flows[i][1], rem_betas[i]])
return np.array(final_assignment)
[docs]
def _refine_topology(self):
r"""Refine grid topology to reduce phase-angle spread.
Iteratively tightens the electrical diameter of the network
(ported from ``sg_flow_lim.m``):
1. Compute the DCPF and measure
:math:`\Delta\theta_{\max} = \max(\theta) - \min(\theta)`.
2. If :math:`\Delta\theta_{\max} > TT + 2` with
:math:`TT = 10^{0.3196 \log_{10} N + 0.8324}`,
add a low-impedance edge between the bus pair with the
largest angle difference.
3. Remove a random high-:math:`X` edge (top 20 %) whose
end-point degrees are both :math:`\ge 3`, preserving
graph connectivity.
4. Repeat for up to 10 iterations.
"""
n_nodes = self.graph.number_of_nodes()
# Target threshold for angle difference
tt_target = 10**(0.3196 * np.log10(n_nodes) + 0.8324)
max_iter = 10 # Safety limit to prevent infinite loops
for _ in range(max_iter):
dcpf = DCPowerFlow(self.graph)
flows, angles = dcpf.run()
theta_rad = np.array(list(angles.values()))
node_keys = list(angles.keys())
# Convert to degrees
theta_deg = np.degrees(theta_rad)
if len(theta_deg) == 0: break
angle_spread = np.max(theta_deg) - np.min(theta_deg)
# Condition: max(abs(DD)) > TT_target + 2
if angle_spread <= tt_target + 2:
break # Grid is sufficiently compact electrically
# --- 1. Add Strengthening Line ---
# Find pair with max difference
min_idx = np.argmin(theta_deg)
max_idx = np.argmax(theta_deg)
min_node = node_keys[min_idx]
max_node = node_keys[max_idx]
# Add edge if not exists
if not self.graph.has_edge(min_node, max_node):
# Low impedance parameters from MATLAB code
r_new = 0.001 + np.random.rand() * 0.001
x_new = 0.002 + np.random.rand() * 0.003
z_new = np.sqrt(r_new**2 + x_new**2)
self.graph.add_edge(min_node, max_node, r=r_new, x=x_new, z=z_new)
# --- 2. Remove Weak Line ---
# Candidates: Select from lines with high X (Impedance)
edges = list(self.graph.edges(data=True))
if not edges: break
# Sort by X descending (High X = Weak/Long line)
sorted_edges = sorted(edges, key=lambda e: e[2].get('x', 0), reverse=True)
n_edges = len(sorted_edges)
br_sl = 0.8 # Select top 20%
n_candidates = int(n_edges * (1.0 - br_sl))
n_candidates = max(n_candidates, 1)
# Candidates are the 'tail' of the list in MATLAB (high X)
candidates = sorted_edges[:n_candidates]
# Filter by degree to avoid isolating nodes
valid_candidates = []
min_deg = 3 if n_nodes >= 40 else 2
for u, v, d in candidates:
if self.graph.degree(u) >= min_deg and self.graph.degree(v) >= min_deg:
valid_candidates.append((u, v, d))
if valid_candidates:
# Pick random candidate to remove
idx = np.random.randint(0, len(valid_candidates))
u_rem, v_rem, d_rem = valid_candidates[idx]
self.graph.remove_edge(u_rem, v_rem)
# Verify Grid Integrity
# MATLAB: checks success flag. Here we check connectivity.
if not nx.is_connected(self.graph):
# Revert if disconnected
self.graph.add_edge(u_rem, v_rem, **d_rem)
[docs]
def allocate(self, refine_topology: bool = False) -> Dict[Tuple[int, int], float]:
r"""Run the full transmission-line allocation pipeline.
Executes the seven-step procedure:
1. Draw impedance magnitudes
:math:`Z \sim \text{LogNormal}(\mu, \sigma)`, clipped to
[0.001, 0.5] p.u.
2. Generate angles :math:`\varphi` (Lévy stable), compute
:math:`X = Z\sin\varphi`, :math:`R = Z\cos\varphi`.
3. Iterative DCPF swapping: sort :math:`Z` ascending / flows
descending, randomly swap ~20–30 % of assignments.
4. (Optional) Topology refinement via :meth:`_refine_topology`.
5. Final DCPF to obtain converged flows.
6. Generate and assign gauge ratios (:math:`\beta`) via
:meth:`_generate_beta` and :meth:`_assign_betas`.
7. Set capacity limits:
:math:`F_l^{\max} = F_l / \beta_l` with a minimum-capacity
fallback (5 + 100 · rand MW when :math:`\le 2`).
Parameters
----------
refine_topology : bool, optional
If ``True``, run topology refinement after step 3. Default
is ``False``.
Returns
-------
dict
Mapping ``(u, v)`` edge tuple to capacity limit (MW).
"""
edges = list(self.graph.edges())
m_lines = len(edges)
n_nodes = self.graph.number_of_nodes()
if m_lines == 0: return {}
# 1. Initial Impedance (Zpr)
# LogNormal distribution
zpr_pars = self.stats.get('Zpr_pars', [-2.38, 1.99, 1.99])
mu_len = zpr_pars[0]
sigma_len = zpr_pars[1]
zpr = np.random.lognormal(mu_len, sigma_len, m_lines)
# physical practice in the per-unit system, and for stability
zpr = np.clip(zpr, 0.001, 0.5)
# 2. Generate Phi and Initial X, R
phi = self._generate_phi(m_lines)
for i, (u, v) in enumerate(edges):
# X = Z * sin(phi), R = Z * cos(phi)
# Zpr is magnitude
x_val = zpr[i] * np.sin(np.deg2rad(phi[i]))
r_val = zpr[i] * np.cos(np.deg2rad(phi[i]))
self.graph[u][v]['x'] = max(x_val, 1e-5)
self.graph[u][v]['r'] = max(r_val, 1e-5)
self.graph[u][v]['z'] = zpr[i]
self.graph[u][v]['edge_idx'] = i
# 3. Iterative Refinement
iterations = 2 if n_nodes >= 300 else 1
dcpf = DCPowerFlow(self.graph)
for _ in range(iterations):
# Calculate Flow
flows, _ = dcpf.run()
# Map flows to current edges
flow_vals = np.zeros(m_lines)
for i, (u, v) in enumerate(edges):
flow_vals[i] = abs(flows.get((u, v), 0.0))
# Sort Z magnitudes (Ascending)
current_z = np.array([self.graph[u][v]['z'] for u, v in edges])
sorted_z = np.sort(current_z)
# Sort Flows (Descending)
# We want High Flow to get Low Z
flow_indices = np.argsort(-flow_vals)
# Create a pool of Z values ordered by flow rank
# z_pool[0] is smallest Z, assigned to flow_indices[0] (highest flow)
z_pool = sorted_z.copy()
# Swapping Logic
if n_nodes > 1200:
as_param = 0.3
an_param = 0.8
else:
as_param = 0.2
an_param = 0.2
n_swap = int(round(as_param * m_lines))
n_neib = int(round(an_param * m_lines))
for _ in range(n_swap):
# Pick random index
idx1 = np.random.randint(0, m_lines)
if idx1 == 0: idx1 = 1
# Pick neighbor
xch_disf = int(np.floor(n_neib * np.random.rand()))
xch_dis = min(xch_disf, (m_lines - 1 - idx1))
idx2 = idx1 + xch_dis
# Swap Z values
if idx2 < m_lines:
z_pool[idx1], z_pool[idx2] = z_pool[idx2], z_pool[idx1]
# Assign Z back to lines
for rank_i, line_idx in enumerate(flow_indices):
z_val = z_pool[rank_i]
u, v = edges[line_idx]
# Recalculate X, R using original Phi structure
p = phi[line_idx]
self.graph[u][v]['z'] = z_val
self.graph[u][v]['x'] = max(z_val * np.sin(np.deg2rad(p)), 1e-5)
self.graph[u][v]['r'] = max(z_val * np.cos(np.deg2rad(p)), 1e-5)
# 4. Topology Refinement (Optional)
if refine_topology:
self._refine_topology()
# Refresh edge list after topology changes
edges = list(self.graph.edges())
m_lines = len(edges)
# 5. Final Flow Calculation
flows, _ = dcpf.run()
flow_vals = np.array([abs(flows.get((u, v), 0.0)) for u, v in edges])
max_flow = np.max(flow_vals) if len(flow_vals) > 0 and np.max(flow_vals) > 0 else 1.0
# 6. Generate & Assign Capacity Factors (Beta)
betas = self._generate_beta(m_lines)
# Normalize flows
norm_flow_data = []
for i, val in enumerate(flow_vals):
norm_flow_data.append([i, val / max_flow])
norm_flow_data = np.array(norm_flow_data)
# Assign Betas
assigned = self._assign_betas(norm_flow_data, betas)
# 7. Calculate & Set Limits
line_caps = {}
for row in assigned:
idx = int(row[0])
beta = row[2]
u, v = edges[idx]
flow_mw = flow_vals[idx]
if beta < 1e-3: beta = 1e-3
limit = flow_mw / beta
# Min capacity check (MATLAB: <=2 -> 5 + 100*rand)
if limit <= 2.0:
limit = 5.0 + 100 * np.random.rand()
self.graph[u][v]['capacity'] = limit
line_caps[(u, v)] = limit
return line_caps