r"""
Generation dispatch for synthetic power grids.
This module implements the generation dispatch algorithm from
`Sadeghian et al. (2018) <https://ieeexplore.ieee.org/document/8585532>`_.
It partitions generator units into three groups — **uncommitted**
(:math:`\alpha = 0`), **partially committed** (:math:`0 < \alpha < 1`),
and **fully committed** (:math:`\alpha \approx 1`) — then assigns
participation factors (dispatch factors) and iteratively balances total
generation against total load. The dispatch factor is defined as
.. math:: \alpha_i = P_{g_i} / P_{g_i}^{\max}, \qquad i = 1, \ldots, N_G
The correlation between normalised capacity and dispatch factor is
reproduced via a 2-D empirical PMF table (``Tab_2D_Pg``).
Ported from the SynGrid MATLAB function ``sg_gen_dist.m``.
"""
import numpy as np
import networkx as nx
from typing import List, Dict, Tuple, Optional
from ..core.reference_data import get_reference_stats
[docs]
class GenerationDispatcher:
r"""Assign active-power dispatch to each generator bus.
The algorithm follows `Sadeghian et al. (2018)
<https://ieeexplore.ieee.org/document/8585532>`_ (Section III):
1. **Uncommitted units** (10–20 %): :math:`\alpha = 0`, selected via
targets drawn from Uniform[0, 0.6].
2. **Partially committed units** (40–50 %): selected via exponential
distribution on capacity; dispatch factors assigned through a
2-D bin-matching table ``Tab_2D_Pg`` (:math:`14 \times 10`).
3. **Fully committed units** (remainder): :math:`\alpha = 1`.
4. **Balancing loop**: iteratively adjusts dispatch to match total
load within 1 % tolerance.
Parameters
----------
graph : networkx.Graph
Power grid graph. Generator nodes must have ``'bus_type' == 'Gen'``
and ``'pg_max'`` (MW) attributes. Load nodes must have ``'pl'`` (MW).
ref_sys_id : int, optional
Reference system for statistical tables (1 = NYISO-2935,
2 = WECC-16994, 3 = additional reference). Default is 1.
Attributes
----------
alpha_mod : int
Loading-level flag from the reference system. When 0 all alphas
are Uniform[0, 1]; otherwise 0.5 % receive negative dispatch
(e.g., pumped-storage hydro).
mu_committed : float
Exponential-distribution parameter for committed-unit capacities.
tab_2d_pg : numpy.ndarray
2-D empirical PMF table (14 capacity bins × 10 alpha bins).
"""
def __init__(self, graph: nx.Graph, ref_sys_id: int = 1):
self.graph = graph
self.ref_sys_id = ref_sys_id
# Load stats
try:
self.stats = get_reference_stats(ref_sys_id)
except ValueError:
print(f"Warning: Invalid ref_sys_id {ref_sys_id}. Defaulting to 1.")
self.stats = get_reference_stats(1)
self.alpha_mod = self.stats['Alpha_mod']
self.mu_committed = self.stats['mu_committed']
self.tab_2d_pg = self.stats['Tab_2D_Pg']
[docs]
def _select_uncommitted(self, norm_pg_max: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
r"""Select generators to be uncommitted (:math:`\alpha = 0`).
Randomly selects 10–20 % of total generator units. Target
capacities are drawn from Uniform[0, 0.6] and the unit whose
normalised capacity is closest to each target is selected.
This reproduces the empirical observation that uncommitted units
tend to be small or medium-size (Sadeghian et al., 2018, Sec. III).
Parameters
----------
norm_pg_max : numpy.ndarray, shape (n, 2)
Array with columns ``[bus_id, normalised_capacity]``.
Returns
-------
uncommitted : numpy.ndarray, shape (m, 3)
Uncommitted units: ``[bus_id, norm_cap, alpha=0]``.
remaining : numpy.ndarray, shape (n-m, 2)
Units not selected.
"""
ng = len(norm_pg_max)
if ng == 0:
return np.array([]), norm_pg_max
# 10% to 20% of the total as uncommitted
ng_uncomm = int(round(ng * (0.10 + np.random.rand() * 0.10)))
uncommitted = []
remaining = norm_pg_max.copy()
for _ in range(ng_uncomm):
if len(remaining) == 0: break
t_uncomm = 0.6 * np.random.rand() # uniform distribution 0-0.6
dists = np.abs(remaining[:, 1] - t_uncomm)
idx = np.argmin(dists) # select the unit closest to the random number
row = remaining[idx]
uncommitted.append([row[0], row[1], 0.0]) # [ID, NormCap, Alpha=0]
remaining = np.delete(remaining, idx, axis=0)
return np.array(uncommitted), remaining
[docs]
def _select_committed(self, norm_pg_max: np.ndarray, total_units_count: int) -> Tuple[np.ndarray, np.ndarray]:
r"""Select generators to be partially committed (:math:`0 < \alpha < 1`).
Selects 40–50 % of *total* generator count. 99 % of these are
chosen by matching to targets drawn from an exponential distribution
with parameter :math:`\mu_{\text{committed}}`; the remaining 1 %
are drawn from the extreme tail Uniform[0.5, 1.0], capturing
super-large units (Sadeghian et al., 2018, Sec. III-A).
Parameters
----------
norm_pg_max : numpy.ndarray, shape (n, 2)
Remaining units after uncommitted selection:
``[bus_id, normalised_capacity]``.
total_units_count : int
Original total number of generator units (before any selection).
Returns
-------
committed : numpy.ndarray, shape (m, 2)
Committed units: ``[bus_id, norm_cap]``.
remaining : numpy.ndarray, shape (n-m, 2)
Units not selected (will become fully committed).
"""
ng_comm = int(round(total_units_count * (0.4 + np.random.rand() * 0.1)))
ng_comm = min(ng_comm, len(norm_pg_max))
if ng_comm <= 0:
return np.array([]), norm_pg_max
ng_99 = int(round(ng_comm * 0.99))
ng_01 = ng_comm - ng_99
committed = []
remaining = norm_pg_max.copy()
for _ in range(ng_99):
if len(remaining) == 0: break
t_unit = np.random.exponential(self.mu_committed) # the exp distribution of the commited units' capacities
idx = np.argmin(np.abs(remaining[:, 1] - t_unit))
committed.append(remaining[idx])
remaining = np.delete(remaining, idx, axis=0)
for _ in range(ng_01):
if len(remaining) == 0: break
t_comm = 0.5 + 0.5 * np.random.rand() # the extreme value of the 1% commited units' capacities
idx = np.argmin(np.abs(remaining[:, 1] - t_comm))
committed.append(remaining[idx])
remaining = np.delete(remaining, idx, axis=0)
return np.array(committed), remaining
[docs]
def _generate_alphas(self, n_comm: int) -> np.ndarray:
r"""Generate dispatch factors for partially committed units.
When ``alpha_mod == 0`` (e.g. NYISO), all :math:`\alpha` values are
drawn from Uniform[0, 1]. When ``alpha_mod != 0`` (e.g. WECC),
99.5 % are Uniform[0, 1] and 0.5 % are negative, representing
reverse dispatch such as pumped-storage hydro.
Parameters
----------
n_comm : int
Number of committed units requiring :math:`\alpha` values.
Returns
-------
numpy.ndarray, shape (n_comm, 1)
Dispatch-factor values.
"""
if n_comm == 0: return np.array([])
if self.alpha_mod == 0:
return np.random.rand(n_comm, 1)
else:
n_995 = int(round(n_comm * 0.995))
n_005 = n_comm - n_995
a1 = np.random.rand(n_995, 1) # 99.5% --- uniform[0, 1]
if n_005 > 0:
a2 = -np.random.rand(n_005, 1) # 0.5% --- negative dispatch value
return np.vstack((a1, a2))
return a1
[docs]
def _assign_alphas(self, units: np.ndarray, alphas: np.ndarray) -> np.ndarray:
r"""Assign dispatch factors to committed units via 2-D bin matching.
Units are sorted by normalised capacity and alphas by value, then
distributed into bins defined by ``Tab_2D_Pg`` (14 capacity bins
:math:`\times` 10 alpha bins). Within each bin, units and alphas
are paired randomly (high-to-low bin traversal). Any leftovers
are paired sequentially as a fallback.
This reproduces the empirical joint distribution
:math:`f(\bar{P}_{g}^{\max}, \alpha)` from the reference system
(Sadeghian et al., 2018, Table I).
Parameters
----------
units : numpy.ndarray, shape (n, 2)
``[bus_id, normalised_capacity]``.
alphas : numpy.ndarray, shape (n, 1)
Dispatch-factor values from :meth:`_generate_alphas`.
Returns
-------
numpy.ndarray, shape (m, 3)
``[bus_id, normalised_capacity, alpha]``.
"""
n_items = len(units)
if n_items == 0: return np.array([])
tab_n = np.round(self.tab_2d_pg * n_items).astype(int)
diff = n_items - np.sum(tab_n)
if diff != 0:
flat_idx = np.argmax(tab_n)
r, c = np.unravel_index(flat_idx, tab_n.shape)
tab_n[r, c] += diff
n_cap_targets = np.sum(tab_n, axis=1)
n_alpha_targets = np.sum(tab_n, axis=0)
sorted_units = units[units[:, 1].argsort()]
unit_bins = []
curr = 0
for count in n_cap_targets:
end = curr + count
unit_bins.append(list(sorted_units[curr:end]))
curr = end
sorted_alphas = np.sort(alphas.flatten())
alpha_bins = []
curr = 0
for count in n_alpha_targets:
end = curr + count
alpha_bins.append(list(sorted_alphas[curr:end]))
curr = end
final_list = []
for r in range(13, -1, -1):
for c in range(10):
count = tab_n[r, c]
if count > 0:
u_bin = unit_bins[r]
a_bin = alpha_bins[c]
take = min(count, len(u_bin), len(a_bin))
for _ in range(take):
if u_bin and a_bin:
u_idx = np.random.randint(0, len(u_bin))
unit = u_bin.pop(u_idx)
a_idx = np.random.randint(0, len(a_bin))
alpha_val = a_bin.pop(a_idx)
final_list.append([unit[0], unit[1], alpha_val])
# Leftovers fallback
remaining_units = [u for bin in unit_bins for u in bin]
remaining_alphas = [a for bin in alpha_bins for a in bin]
if remaining_units and remaining_alphas:
min_len = min(len(remaining_units), len(remaining_alphas))
for i in range(min_len):
final_list.append([remaining_units[i][0], remaining_units[i][1], remaining_alphas[i]])
return np.array(final_list) if final_list else np.array([])
[docs]
def dispatch(self) -> Dict[int, float]:
r"""Run the full generation dispatch pipeline.
Implements the algorithm of Sadeghian et al. (2018):
1. Collect generator buses and normalise capacities by
:math:`P^{\max}_{g_{\max}}`.
2. Partition generators into **uncommitted**
(:math:`\alpha = 0`), **partially committed**
(:math:`0 < \alpha < 1`), and **fully committed**
(:math:`\alpha = 1`).
3. Assign dispatch factors to partially committed units via
the 2-D bin-matching table ``Tab_2D_Pg``.
4. Iteratively balance total generation against total load
(1 % tolerance, up to 50 iterations) by scaling committed
:math:`\alpha` values and toggling uncommitted / full-load
units on or off.
5. Convert normalised dispatch back to MW:
:math:`P_{g_i} = \alpha_i \cdot \bar{P}_{g_i}^{\max} \cdot P^{\max}_{g_{\max}}`.
Returns
-------
dict
Mapping of generator bus ID to dispatched active power (MW).
"""
# 1. Prepare Buses into (Id, Gen) and (Id, Load)
gen_data = []
for n, d in self.graph.nodes(data=True):
if d.get('bus_type') == 'Gen':
gen_data.append([n, d.get('pg_max', 0.0)])
if not gen_data: return {}
gen_arr = np.array(gen_data)
max_pg = np.max(gen_arr[:, 1])
if max_pg == 0: max_pg = 1.0
norm_pg = gen_arr.copy()
norm_pg[:, 1] = norm_pg[:, 1] / max_pg
total_load = sum(d.get('pl', 0.0) for n, d in self.graph.nodes(data=True) if d.get('bus_type') == 'Load')
norm_total_load = total_load / max_pg
# 2. Partitioning --- uncommited, partially commited, and fully committed
uncomm_units, remaining = self._select_uncommitted(norm_pg)
comm_units_raw, remaining = self._select_committed(remaining, len(gen_arr))
full_units = []
for row in remaining:
full_units.append([row[0], row[1], 1.0])
full_units = np.array(full_units)
# 3. Alpha Assignment for the partially committed
if len(comm_units_raw) > 0:
alphas = self._generate_alphas(len(comm_units_raw))
comm_units = self._assign_alphas(comm_units_raw, alphas)
else:
comm_units = np.array([])
# 4. Balancing Logic
def calculate_current_total():
g_u = np.sum(uncomm_units[:, 1] * uncomm_units[:, 2]) if len(uncomm_units) > 0 else 0
g_c = np.sum(comm_units[:, 1] * comm_units[:, 2]) if len(comm_units) > 0 else 0
g_f = np.sum(full_units[:, 1] * full_units[:, 2]) if len(full_units) > 0 else 0
return g_u + g_c + g_f
for _ in range(50): # Max Iterations
current_gen = calculate_current_total()
diff = current_gen - norm_total_load
if abs(diff) < 0.01 * norm_total_load: # 1% Tolerance
break
if diff > 0: # Excess Gen
# 1. Try scaling down committed units
if len(comm_units) > 0:
comm_load = np.sum(comm_units[:, 1] * comm_units[:, 2])
# We need to reduce by 'diff'.
# new_comm_load = comm_load - diff
# ratio = new_comm_load / comm_load
if comm_load > 1e-6:
ratio = max(0, (comm_load - diff) / comm_load)
# Don't drop too drastically in one step to keep stability
ratio = max(ratio, 0.5)
comm_units[:, 2] *= ratio
continue
# 2. If committed scaling didn't help enough, turn off full units
if len(full_units) > 0:
# Find ON units
on_indices = np.where(full_units[:, 2] > 0.01)[0]
if len(on_indices) > 0:
# Turn off the largest one
subset_idx = np.argmax(full_units[on_indices, 1])
full_units[on_indices[subset_idx], 2] = 0.0
continue
# 3. Last resort: turn off committed units completely
if len(comm_units) > 0:
on_indices = np.where(comm_units[:, 2] > 0.01)[0]
if len(on_indices) > 0:
comm_units[on_indices[0], 2] = 0.0
else: # Deficit Gen (diff < 0)
# 1. Try scaling up committed units
if len(comm_units) > 0:
comm_load = np.sum(comm_units[:, 1] * comm_units[:, 2])
capacity = np.sum(comm_units[:, 1]) # Max possible if alpha=1
headroom = capacity - comm_load
if headroom > 1e-6:
# We need to increase by abs(diff)
# But we can't just multiply alphas linearly because they cap at 1.0
# Simple heuristic: multiply by 1.1 or calculated ratio
comm_units[:, 2] *= 1.1
comm_units[:, 2] = np.minimum(comm_units[:, 2], 1.0)
# If we actually gained something, continue
new_load = np.sum(comm_units[:, 1] * comm_units[:, 2])
if new_load > comm_load + 1e-6:
continue
# 2. Turn on Uncommitted units
if len(uncomm_units) > 0:
off_indices = np.where(uncomm_units[:, 2] < 0.01)[0]
if len(off_indices) > 0:
# Turn on largest available
subset_idx = np.argmax(uncomm_units[off_indices, 1])
uncomm_units[off_indices[subset_idx], 2] = 1.0 # Set to Full
continue
# 3. Turn on any Full units that were turned off
if len(full_units) > 0:
off_indices = np.where(full_units[:, 2] < 0.01)[0]
if len(off_indices) > 0:
full_units[off_indices[0], 2] = 1.0
continue
# 5. Result Export
result = {}
for group in [uncomm_units, comm_units, full_units]:
if len(group) > 0:
for row in group:
gen_mw = row[1] * row[2] * max_pg
result[int(row[0])] = gen_mw
return result