Source code for el1xr_opt.Modules.oM_ACOPF

# Developed by: Erik F. Alvarez
#
# Electric Power System Unit
# RISE
# erik.alvarez@ri.se
#
# AC optimal power flow analysis module for el1xr_opt (Phase 5a).
#
# A standalone, single-phase (balanced) AC OPF that runs on a network snapshot:
# the buses and branches from a case's electricity-network data plus a set of
# nodal net injections (e.g. taken from a chosen period of a solved operational
# model). It is decoupled from the main linear/MILP solve on purpose — a full-year
# AC MINLP is intractable — so AC feasibility (voltages, losses, reactive power)
# is checked on selected snapshots instead.
#
# Two formulations, chosen by ``formulation``:
#   * "nlp"  — exact polar-form AC OPF (non-convex), solved with Ipopt. Works for
#              any topology (radial or meshed).
#   * "socp" — second-order-cone (branch-flow / DistFlow) relaxation, convex,
#              solved with Gurobi. Requires a radial network; it also returns the
#              relaxation gap (how far P^2+Q^2 is from l*v), the standard exactness
#              check.
#
# Network data uses the existing el1xr columns: branches indexed by
# (InitialNode, FinalNode, Circuit) with Reactance and (optionally) Resistance;
# a lossless line (no Resistance) is allowed but then there are no active losses.
#
# This is the Phase-5a baseline (single-phase). Three-phase unbalanced AC OPF
# (Phase 5c) is a separate, larger effort; see docs/scope_acopf_community.md.

import math

from pyomo.environ import (ConcreteModel, Var, Constraint, Objective, Reals,
                           NonNegativeReals, minimize, cos, sin, value)


[docs] def build_branches(network_df): """Turn an el1xr ElectricityNetwork DataFrame into a list of branch dicts. Expects the (InitialNode, FinalNode, Circuit) multi-index (the unnamed index columns) and ``Reactance`` / optional ``Resistance`` columns. Per-unit values are used as given. A missing/zero reactance branch is skipped (not a real series element). """ branches = [] for idx, row in network_df.iterrows(): i, j, c = idx if isinstance(idx, tuple) else (idx, None, None) x = float(row.get("Reactance", 0.0) or 0.0) if x == 0.0: continue r = row.get("Resistance", 0.0) r = 0.0 if (r is None or (isinstance(r, float) and math.isnan(r))) else float(r) ttc = float(row.get("TTC", 0.0) or 0.0) branches.append({"i": i, "j": j, "c": c, "r": r, "x": x, "ttc": ttc}) return branches
def _buses(branches): bs = [] for b in branches: for n in (b["i"], b["j"]): if n not in bs: bs.append(n) return bs def _orient_radial(branches, slack): """Orient each branch parent->child away from the slack by BFS. Returns the oriented branch list (with 'up' = parent bus) or raises if the network is not a radial tree (needed by the DistFlow SOCP).""" adj = {} for k, b in enumerate(branches): adj.setdefault(b["i"], []).append((b["j"], k)) adj.setdefault(b["j"], []).append((b["i"], k)) seen = {slack} oriented = [] queue = [slack] used = set() while queue: u = queue.pop(0) for v, k in adj.get(u, []): if k in used: continue if v in seen: raise ValueError("network is not radial (cycle found); use formulation='nlp'") used.add(k) seen.add(v) ob = dict(branches[k]) ob["up"], ob["dn"] = u, v oriented.append(ob) queue.append(v) if len(oriented) != len(branches): raise ValueError("network is not radial (parallel/extra branches); use formulation='nlp'") return oriented
[docs] def solve_acopf_nlp(branches, Pd, Qd, slack, v0=1.0, vmin=0.9, vmax=1.1, qlim=3.0, plim=10.0, solver="ipopt", init_v=None, init_th=None): """Exact polar-form AC OPF. Pd/Qd: dict bus->net load (p.u.). Returns a result dict (objective = total slack-bus injection ~ losses + net import). ``init_v`` / ``init_th`` give a warm start (bus -> value); on a stressed radial feeder a flat start often fails, so passing the SOC relaxation's voltages is the reliable way to converge. """ buses = _buses(branches) # Series admittance y = 1/(r+jx) = gs + j bs (gs>0, bs<0). The bus-admittance # matrix has diagonal Y_ii = sum of series admittances, and off-diagonal # Y_ij = -y_ij. So the off-diagonal G/B used in the injection equations are the # NEGATED series values; the diagonal is the (positive) series sum. gs = {} bs = {} for br in branches: denom = br["r"] ** 2 + br["x"] ** 2 gs[(br["i"], br["j"])] = br["r"] / denom bs[(br["i"], br["j"])] = -br["x"] / denom nbr = {n: [] for n in buses} for br in branches: nbr[br["i"]].append(br["j"]) nbr[br["j"]].append(br["i"]) def _series_g(i, j): return gs.get((i, j), gs.get((j, i), 0.0)) def _series_b(i, j): return bs.get((i, j), bs.get((j, i), 0.0)) def goff(i, j): return -_series_g(i, j) # Ybus off-diagonal conductance def boff(i, j): return -_series_b(i, j) # Ybus off-diagonal susceptance Gii = {n: sum(_series_g(n, m) for m in nbr[n]) for n in buses} Bii = {n: sum(_series_b(n, m) for m in nbr[n]) for n in buses} init_v = init_v or {} init_th = init_th or {} m = ConcreteModel() m.V = Var(buses, bounds=(vmin, vmax), initialize=lambda mm, n: init_v.get(n, 1.0)) m.th = Var(buses, initialize=lambda mm, n: init_th.get(n, 0.0)) m.Pg = Var(buses, bounds=(-plim, plim), initialize=0.0) m.Qg = Var(buses, bounds=(-qlim, qlim), initialize=0.0) m.V[slack].fix(v0) m.th[slack].fix(0.0) def pinj(mm, i): return mm.Pg[i] - Pd.get(i, 0.0) == mm.V[i] ** 2 * Gii[i] + sum( mm.V[i] * mm.V[j] * (goff(i, j) * cos(mm.th[i] - mm.th[j]) + boff(i, j) * sin(mm.th[i] - mm.th[j])) for j in nbr[i]) m.pinj = Constraint(buses, rule=pinj) def qinj(mm, i): return mm.Qg[i] - Qd.get(i, 0.0) == -mm.V[i] ** 2 * Bii[i] + sum( mm.V[i] * mm.V[j] * (goff(i, j) * sin(mm.th[i] - mm.th[j]) - boff(i, j) * cos(mm.th[i] - mm.th[j])) for j in nbr[i]) m.qinj = Constraint(buses, rule=qinj) # only the slack injects (a single supply point); other buses are loads for n in buses: if n != slack: m.Pg[n].fix(0.0) m.Qg[n].fix(0.0) m.obj = Objective(expr=m.Pg[slack], sense=minimize) from pyomo.environ import SolverFactory res = SolverFactory(solver).solve(m) return { "formulation": "nlp", "objective": float(value(m.obj)), "V": {n: float(value(m.V[n])) for n in buses}, "theta": {n: float(value(m.th[n])) for n in buses}, "solver_status": str(res.solver.termination_condition), }
[docs] def solve_acopf_socp(branches, Pd, Qd, slack, v0=1.0, vmin=0.9, vmax=1.1, solver="gurobi"): """Branch-flow (DistFlow) SOC relaxation on a radial network. Returns a result dict including the per-branch relaxation gap l*v - (P^2+Q^2) (0 = exact).""" ob = _orient_radial(branches, slack) buses = _buses(branches) children = {n: [] for n in buses} for k, br in enumerate(ob): children[br["up"]].append(k) m = ConcreteModel() m.K = range(len(ob)) m.P = Var(m.K, within=Reals) # active flow into branch k (from parent) m.Q = Var(m.K, within=Reals) m.l = Var(m.K, within=NonNegativeReals) # squared current m.w = Var(buses, within=NonNegativeReals, initialize=1.0) # squared voltage m.w[slack].fix(v0 ** 2) for n in buses: if n != slack: m.w[n].setlb(vmin ** 2) m.w[n].setub(vmax ** 2) # active/reactive balance at each downstream bus def pbal(mm, k): br = ob[k] dn = br["dn"] out = sum(mm.P[kk] for kk in children[dn]) return mm.P[k] - br["r"] * mm.l[k] - out == Pd.get(dn, 0.0) m.pbal = Constraint(m.K, rule=pbal) def qbal(mm, k): br = ob[k] dn = br["dn"] out = sum(mm.Q[kk] for kk in children[dn]) return mm.Q[k] - br["x"] * mm.l[k] - out == Qd.get(dn, 0.0) m.qbal = Constraint(m.K, rule=qbal) def volt(mm, k): br = ob[k] return mm.w[br["dn"]] == mm.w[br["up"]] - 2 * (br["r"] * mm.P[k] + br["x"] * mm.Q[k]) \ + (br["r"] ** 2 + br["x"] ** 2) * mm.l[k] m.volt = Constraint(m.K, rule=volt) def soc(mm, k): br = ob[k] return mm.P[k] ** 2 + mm.Q[k] ** 2 <= mm.l[k] * mm.w[br["up"]] m.soc = Constraint(m.K, rule=soc) m.obj = Objective(expr=sum(ob[k]["r"] * m.l[k] for k in m.K), sense=minimize) # total losses from pyomo.environ import SolverFactory res = SolverFactory(solver).solve(m) gap = {} for k in m.K: lhs = value(m.P[k]) ** 2 + value(m.Q[k]) ** 2 rhs = value(m.l[k]) * value(m.w[ob[k]["up"]]) gap[(ob[k]["up"], ob[k]["dn"])] = float(rhs - lhs) return { "formulation": "socp", "objective": float(value(m.obj)), "V": {n: math.sqrt(max(0.0, float(value(m.w[n])))) for n in buses}, "losses": float(value(m.obj)), "relaxation_gap_max": max(gap.values()) if gap else 0.0, "relaxation_gap": gap, "solver_status": str(res.solver.termination_condition), }
[docs] def solve_acopf(network_df, Pd, Qd, slack, formulation="socp", warm_start=True, **kw): """Dispatch by formulation. Pd/Qd are dicts bus -> net load in per unit. For ``formulation='nlp'`` on a radial network the SOC relaxation is solved first (cheap, convex) and its voltages are used to warm-start the non-convex NLP, which otherwise often fails from a flat start on a stressed feeder. """ branches = build_branches(network_df) if formulation == "socp": return solve_acopf_socp(branches, Pd, Qd, slack, **kw) if formulation == "nlp": if warm_start and "init_v" not in kw: try: socp = solve_acopf_socp(branches, Pd, Qd, slack, v0=kw.get("v0", 1.0), vmin=kw.get("vmin", 0.9), vmax=kw.get("vmax", 1.1)) kw.setdefault("init_v", socp["V"]) except Exception as exc: # non-radial network or no cone solver: fall back to a flat start print(f" AC OPF: SOC warm start unavailable ({exc}); using a flat start") return solve_acopf_nlp(branches, Pd, Qd, slack, **kw) raise ValueError(f"unknown AC OPF formulation '{formulation}' (use 'socp' or 'nlp')")
[docs] def run_acopf_sweep(network_df, snapshots, slack, formulation="socp", vmin_check=0.95, vmax_check=1.05, vmin_solve=0.0, vmax_solve=1.5, **kw): """Run AC OPF over many snapshots and summarise voltage/loss violations. ``snapshots`` is a list of ``(label, Pd, Qd)`` (per-unit nodal loads). Each is solved with permissive voltage bounds (``vmin_solve``/``vmax_solve``) so the actual power-flow voltages are found, then checked against the nominal limits ``vmin_check``/``vmax_check`` to count violations. This is the Phase-5b sweep: one AC OPF per representative period, with a year/horizon violation summary. Returns a list of per-snapshot dicts: label, loss, vmin, vmax, violations, status. A snapshot that fails to solve (e.g. voltage collapse at high load) is recorded with status 'infeasible/error' rather than aborting the sweep. """ rows = [] for label, Pd, Qd in snapshots: try: r = solve_acopf(network_df, Pd, Qd, slack, formulation=formulation, vmin=vmin_solve, vmax=vmax_solve, **kw) V = r["V"] vlo, vhi = min(V.values()), max(V.values()) viol = sum(1 for v in V.values() if v < vmin_check - 1e-6 or v > vmax_check + 1e-6) status = r.get("solver_status", "") ok = "optimal" in str(status).lower() or "feasible" in str(status).lower() rows.append({"label": label, "loss_pu": r.get("losses", r["objective"]), "vmin": vlo, "vmax": vhi, "violations": viol, "status": status if ok else f"check:{status}"}) except Exception as exc: rows.append({"label": label, "loss_pu": None, "vmin": None, "vmax": None, "violations": None, "status": f"infeasible/error: {str(exc)[:60]}"}) return rows
[docs] def scaled_snapshots(Pd, Qd, factors): """Build snapshots by scaling a base load by each factor in ``factors`` (e.g. a daily/seasonal profile). ``factors`` is a dict label -> multiplier.""" snaps = [] for label, f in factors.items(): snaps.append((label, {n: p * f for n, p in Pd.items()}, {n: q * f for n, q in Qd.items()})) return snaps
[docs] def snapshots_from_case(dir_name, case_name, load_levels=None): """Build AC OPF snapshots from a case's demand data: per-node net load at each requested load level. Reads the case through the standard input source, maps demands to their nodes, and sums per node. ``load_levels`` defaults to all. This is the interface for running the sweep on a real case once it has a multi-bus network; on the current single-zone sample cases it returns one bus of load, so it is mainly the wiring for multi-bus distribution cases. """ import pandas as pd from .oM_InputSource import resolve_source src = resolve_source(dir_name, case_name) dem_cfg = src.read_dict("ElectricityDemand") vmax = src.read_data("VarMaxDemand") # wide: index (p,sc,n), cols = demands src.close() # demand -> node map node_of = {} if not dem_cfg.empty and "Node" in dem_cfg.columns: key = dem_cfg.columns[0] node_of = dict(zip(dem_cfg[key], dem_cfg["Node"])) levels = list(vmax.index) if load_levels is None else load_levels snaps = [] for lvl in levels: Pd = {} row = vmax.loc[lvl] for dem, val in row.items(): nd = node_of.get(dem) if nd is None or pd.isna(val): continue Pd[f"B_{nd}" if not str(nd).startswith("B") else nd] = Pd.get(nd, 0.0) + float(val) snaps.append((str(lvl), Pd, {n: 0.0 for n in Pd})) return snaps
[docs] def write_acopf_sweep(db_path, case, rows): """Write the per-snapshot sweep summary to a DuckDB table.""" import duckdb import pandas as pd df = pd.DataFrame(rows) df.insert(0, "case", case) con = duckdb.connect(db_path) try: duckdb.from_df(df, connection=con).create("oM_Result_ACOPF_Sweep") finally: con.close() return db_path
[docs] def write_acopf_results(db_path, case, result): """Append AC OPF results (per-bus voltages + summary) to a DuckDB file.""" import duckdb import pandas as pd volt = pd.DataFrame({"bus": list(result["V"].keys()), "voltage_pu": list(result["V"].values())}) summary = pd.DataFrame({ "key": ["case", "formulation", "objective", "solver_status", "relaxation_gap_max"], "value": [case, result["formulation"], result["objective"], result.get("solver_status", ""), result.get("relaxation_gap_max", "")], }) con = duckdb.connect(db_path) try: import duckdb as _d _d.from_df(volt, connection=con).create("oM_Result_ACOPF_Voltage") _d.from_df(summary, connection=con).create("oM_Result_ACOPF_Summary") finally: con.close() return db_path