Source code for fleetrl.benchmarking.linear_optimization

from fleetrl.fleet_env.fleet_environment import FleetEnv
from fleetrl.benchmarking.benchmark import Benchmark

from stable_baselines3.common.vec_env import SubprocVecEnv, VecNormalize
from stable_baselines3.common.env_util import make_vec_env

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import datetime as dt

import pyomo.environ as pyo

[docs] class LinearOptimization(Benchmark): def __init__(self, n_steps: int, n_evs: int, n_episodes: int = 1, n_envs: int = 1, time_steps_per_hour: int = 4): self.n_steps = n_steps self.n_evs = n_evs self.n_episodes = n_episodes self.n_envs = n_envs self.time_steps_per_hour = time_steps_per_hour self.env_config = None
[docs] def run_benchmark(self, use_case: str, env_kwargs: dict, seed: int = None ) -> pd.DataFrame: lin_vec_env = make_vec_env(FleetEnv, n_envs=self.n_envs, vec_env_cls=SubprocVecEnv, env_kwargs=env_kwargs, seed=seed) lin_norm_vec_env = VecNormalize(venv=lin_vec_env, norm_obs=True, norm_reward=True, training=True, clip_reward=10.0) env_config = env_kwargs["env_config"] env = FleetEnv(env_config) # config needed for later use in plotting self.env_config = env_config # reading the input file as a pandas DataFrame df: pd.DataFrame = env.db # adjust length of df for n_steps # We don't need to optimise over an entire year if the eval period is just 48 hours # Lambda function filters out unnecessary data, but preserves database structure lin_norm_vec_env.reset() start_time = lin_norm_vec_env.env_method("get_start_time")[0] end_time = pd.to_datetime(start_time) + dt.timedelta(hours=self.n_steps) - dt.timedelta(minutes=15) first_date = df["date"].min() last_date = df["date"].min() + dt.timedelta(hours=self.n_steps) - dt.timedelta(minutes=15) df = df[df.groupby(by="ID").date.transform(lambda x: ((x <= end_time) & (x >= start_time)))].reset_index(drop=True) # Extracting information from the df ev_data = [df.loc[df["ID"] == i, "There"] for i in range(self.n_evs)] building_data = df["load"] # building load in kW # quarter hour resolution, n_steps is in hours length_time_load_pv = self.n_steps * 4 price_data = np.multiply(np.add(df["DELU"], env.ev_config.fixed_markup), env.ev_config.variable_multiplier) / 1000 tariff_data = np.multiply(df["tariff"], 1 - env.ev_config.feed_in_deduction) / 1000 pv_data = df["pv"] # pv power in kW soc_on_return = [df.loc[df["ID"] == i, "SOC_on_return"] for i in range(self.n_evs)] battery_capacity = env.ev_config.init_battery_cap # EV batt size in kWh p_trafo = env.load_calculation.grid_connection # Transformer rating in kW charging_eff = env.ev_config.charging_eff # charging losses discharging_eff = env.ev_config.discharging_eff # discharging losses init_soc = env.ev_config.def_soc # init SoC evse_max_power = env.load_calculation.evse_max_power # kW, max rating of the charger # create pyomo model model = pyo.ConcreteModel(name="sc_pyomo") model.timestep = pyo.Set(initialize=range(length_time_load_pv)) model.time_batt = pyo.Set(initialize=range(0, length_time_load_pv + 1)) model.ev_id = pyo.Set(initialize=range(self.n_evs)) # model parameters model.building_load = pyo.Param(model.timestep, initialize={i: building_data[i] for i in range(length_time_load_pv)}) model.pv = pyo.Param(model.timestep, initialize={i: pv_data[i] for i in range(length_time_load_pv)}) model.ev_availability = pyo.Param(model.timestep, model.ev_id, initialize={(i, j): ev_data[j].iloc[i] for i in range(length_time_load_pv) for j in range(self.n_evs)}) model.soc_on_return = pyo.Param(model.timestep, model.ev_id, initialize={(i, j): soc_on_return[j].iloc[i] for i in range(length_time_load_pv) for j in range(self.n_evs)}) model.price = pyo.Param(model.timestep, initialize={i: price_data[i] for i in range(length_time_load_pv)}) model.tariff = pyo.Param(model.timestep, initialize={i: tariff_data[i] for i in range(length_time_load_pv)}) # decision variables # this assumes only charging, I could also make bidirectional later model.soc = pyo.Var(model.time_batt, model.ev_id, bounds=(0, env.ev_config.target_soc)) model.charging_signal = pyo.Var(model.timestep, model.ev_id, within=pyo.NonNegativeReals, bounds=(0, 1)) model.discharging_signal = pyo.Var(model.timestep, model.ev_id, within=pyo.NonPositiveReals, bounds=(-1, 0)) model.positive_action = pyo.Var(model.timestep, model.ev_id, within=pyo.Binary) model.used_pv = pyo.Var(model.timestep, model.ev_id, within=pyo.NonNegativeReals) def grid_limit(m, i, ev): return ((m.charging_signal[i, ev] + m.discharging_signal[i, ev]) * evse_max_power + m.building_load[i] - m.pv[i] <= p_trafo) def mutual_exclusivity_charging(m, i, ev): return m.charging_signal[i, ev] <= m.positive_action[i, ev] def mutual_exclusivity_discharging(m, i, ev): return m.discharging_signal[i, ev] >= (m.positive_action[i, ev] - 1) def pv_use(m, i, ev): return m.used_pv[i, ev] <= m.charging_signal[i, ev] * evse_max_power def pv_avail(m, i, ev): return m.used_pv[i, ev] <= m.pv[i] / self.n_evs def no_charge_when_no_car(m, i, ev): if m.ev_availability[i, ev] == 0: return m.charging_signal[i, ev] == 0 else: return pyo.Constraint.Feasible def no_discharge_when_no_car(m, i, ev): if m.ev_availability[i, ev] == 0: return m.discharging_signal[i, ev] == 0 else: return pyo.Constraint.Feasible def soc_rules(m, i, ev): # last time step if i == length_time_load_pv - 1: return (m.soc[i + 1, ev] == m.soc[i, ev] + (m.charging_signal[i, ev] * charging_eff + m.discharging_signal[i, ev]) * evse_max_power * 1 / self.time_steps_per_hour / battery_capacity) # new arrival elif (m.ev_availability[i, ev] == 0) and (m.ev_availability[i + 1, ev] == 1): return m.soc[i + 1, ev] == m.soc_on_return[i + 1, ev] # departure in next time step elif (m.ev_availability[i, ev] == 1) and (m.ev_availability[i + 1, ev] == 0): return m.soc[i, ev] == env.ev_config.target_soc else: return pyo.Constraint.Feasible def charging_dynamics(m, i, ev): # last time step if i == length_time_load_pv - 1: return (m.soc[i + 1, ev] == m.soc[i, ev] + (m.charging_signal[i, ev] * charging_eff + m.discharging_signal[i, ev]) * evse_max_power * 1 / self.time_steps_per_hour / battery_capacity) # charging if (m.ev_availability[i, ev] == 1) and (m.ev_availability[i + 1, ev] == 1): return (m.soc[i + 1, ev] == m.soc[i, ev] + (m.charging_signal[i, ev] * charging_eff + m.discharging_signal[i, ev]) * evse_max_power * 1 / self.time_steps_per_hour / battery_capacity) elif (m.ev_availability[i, ev] == 1) and (m.ev_availability[i + 1, ev] == 0): return m.soc[i + 1, ev] == 0 elif m.ev_availability[i, ev] == 0: return m.soc[i, ev] == 0 else: return pyo.Constraint.Feasible def max_charging_limit(m, i, ev): return m.charging_signal[i, ev] * evse_max_power <= evse_max_power * m.ev_availability[i, ev] def max_discharging_limit(m, i, ev): return m.discharging_signal[i, ev] * evse_max_power * -1 <= evse_max_power * m.ev_availability[i, ev] def first_soc(m, i, ev): return m.soc[0, ev] == init_soc def no_departure_abuse(m, i, ev): if i == length_time_load_pv - 1: return pyo.Constraint.Feasible if (m.ev_availability[i, ev] == 0) and (m.ev_availability[i - 1, ev]) == 1: return m.discharging_signal[i, ev] == 0 elif (m.ev_availability[i, ev] == 1) and (m.ev_availability[i + 1, ev]) == 0: return m.discharging_signal[i, ev] == 0 else: return pyo.Constraint.Feasible # constraints model.cs1 = pyo.Constraint(model.timestep, model.ev_id, rule=first_soc) model.cs2 = pyo.Constraint(model.timestep, model.ev_id, rule=grid_limit) model.cs3 = pyo.Constraint(model.timestep, model.ev_id, rule=max_charging_limit) model.cs4 = pyo.Constraint(model.timestep, model.ev_id, rule=max_discharging_limit) model.cs5 = pyo.Constraint(model.timestep, model.ev_id, rule=soc_rules) model.cs6 = pyo.Constraint(model.timestep, model.ev_id, rule=charging_dynamics) model.cs8 = pyo.Constraint(model.timestep, model.ev_id, rule=mutual_exclusivity_charging) model.cs9 = pyo.Constraint(model.timestep, model.ev_id, rule=mutual_exclusivity_discharging) model.cs10 = pyo.Constraint(model.timestep, model.ev_id, rule=no_charge_when_no_car) model.cs11 = pyo.Constraint(model.timestep, model.ev_id, rule=no_discharge_when_no_car) model.cs12 = pyo.Constraint(model.timestep, model.ev_id, rule=pv_use) model.cs13 = pyo.Constraint(model.timestep, model.ev_id, rule=pv_avail) timestep_set = pyo.RangeSet(0, length_time_load_pv - 1) def obj_fun(m): return (sum([((m.charging_signal[i, ev] * evse_max_power - m.used_pv[i, ev]) / self.time_steps_per_hour) * m.price[i] + ((m.discharging_signal[i, ev] * evse_max_power * discharging_eff) / self.time_steps_per_hour) * m.tariff[i] for i in m.timestep for ev in range(self.n_evs)])) model.obj = pyo.Objective(rule=obj_fun, sense=pyo.minimize) opt = pyo.SolverFactory('glpk') # you can specify where a solver lies with the parameter 'executable' opt.options['mipgap'] = 0.005 res = opt.solve(model, tee=True) print(res) actions = [ np.array([model.charging_signal[i, j].value + model.discharging_signal[i, j].value for j in range(self.n_evs)]) for i in range(length_time_load_pv)] actions = pd.DataFrame({"action": actions}) actions.index = pd.date_range(start=start_time, end=end_time, freq="15T") actions["hid"] = actions.index.hour + actions.index.minute / 60 len_day = 24 * 4 action_plot = [] for i in range(len_day): action_plot.append(actions.groupby("hid").mean()["action"].reset_index(drop=True)[i].mean()) # plt.plot(action_plot) # plt.show() lin_norm_vec_env.reset() env_actions = actions.loc[(actions.index >= start_time) & (actions.index <= end_time), "action"].reset_index( drop=True) for i in range(self.n_steps * self.time_steps_per_hour): lin_norm_vec_env.step([np.multiply(np.ones(self.n_evs), env_actions[i])]) lin_log: pd.DataFrame = lin_norm_vec_env.env_method("get_log")[0] lin_log.reset_index(drop=True, inplace=True) lin_log = lin_log.iloc[0:-2] return lin_log
[docs] def plot_benchmark(self, lin_log: pd.DataFrame, ) -> None: lin_log["hour_id"] = (lin_log["Time"].dt.hour + lin_log["Time"].dt.minute / 60) mean_per_hid_lin = lin_log.groupby("hour_id").mean()["Charging energy"].reset_index(drop=True) mean_all_lin = [] for i in range(mean_per_hid_lin.__len__()): mean_all_lin.append(np.mean(mean_per_hid_lin[i])) mean = pd.DataFrame() mean["Linear optimization"] = np.multiply(mean_all_lin, 4) mean.plot() plt.xticks([0, 8, 16, 24, 32, 40, 48, 56, 64, 72, 80, 88] , ["00:00", "02:00", "04:00", "06:00", "08:00", "10:00", "12:00", "14:00", "16:00", "18:00", "20:00", "22:00"], rotation=45) plt.legend() plt.grid(alpha=0.2) plt.ylabel("Charging power in kW") price_lookahead = self.env_config["price_lookahead"] * int(self.env_config["include_price"]) bl_pv_lookahead = self.env_config["bl_pv_lookahead"] number_of_lookaheads = sum([int(self.env_config["include_pv"]), int(self.env_config["include_building"])]) # check observer module for building of observation list power_index = self.n_evs * 6 + 2 * (price_lookahead+1) + number_of_lookaheads * (bl_pv_lookahead+1) + 1 max_val = lin_log.loc[0, "Observation"][power_index] plt.ylim([-max_val * 1.2, max_val * 1.2]) plt.show()